小鱼杰杰

0 声望

这家伙太懒,什么都没留下

个人动态
  • 南山南的忧伤 回复 小鱼杰杰kafka socket.request.max.bytes=104857600,为什么larger than 524288? 中 :

    有后续吗 我遇到跟你一样的问题

    2年前
  • 半兽人 回复 小鱼杰杰kafka消息滞留 中 :

    你咋用的流的方式写消费者呢?

    4年前
  • 小鱼杰杰 回复 半兽人kafka消息滞留 中 :

    package com.ai.linkgroup.statistics.mq;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.ai.linkgroup.statistics.util.ZipUtils;
    import com.common.system.mq.serializer.StringSerializer;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    /**
     * Receives data through the MQ interface.
     *
     * @author blueskyevil->70942, 2017-09-21
     */
      public class KafkaReceiver implements Watcher
      {
      private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
      private ConsumerConnector consumer=null;
      // param
      private String zkConfig;
      private String group;
      private int consumers;
      private KeeperState stateValue;
      private ConsumerConfig cc;
      // 主题对应的消息队列
      private Map> topicMap;
      // 主题对应的消息队列
      private Map topicConsumers;

      /**
       * Builds the Kafka consumer configuration from the injected ZooKeeper address,
       * group name and consumer count, and creates the empty per-topic registries.
       * The consumer connects through ZooKeeper, so {@code zkConfig} is read here.
       */
      public void init()
      {
          // Reference: https://blog.csdn.net/u010463032/article/details/78892858
          Properties props = new Properties();
          // ZooKeeper connect string in hostname:port form; several nodes may be listed
          // so that a single node going down does not break the connection.
          props.setProperty("zookeeper.connect", zkConfig);
          // Consumer group name; processes sharing the same name form one consumer group.
          props.setProperty("group.id", group);
          // Properties is a String-to-String table: a boxed Integer stored with put() is
          // invisible to getProperty(), so the count is stored in its String form.
          props.setProperty("consumer.numbers", String.valueOf(consumers));
          props.setProperty("auto.commit.enable", "true");
          props.setProperty("auto.commit.interval.ms", "60000");
          // NOTE(review): "derializer.class" looks like a misspelt key; it is kept as-is
          // because the intended key cannot be determined from this file - confirm.
          props.setProperty("derializer.class", "kafka.serializer.DefaultDecoder");
          // Additional parameters.
          // ZooKeeper session timeout: without a heartbeat inside this window the consumer
          // is considered dead. Too low gives false positives; too high delays detection
          // of a real failure.
          props.setProperty("zookeeper.session.timeout.ms", "70000");
          props.setProperty("rebalance.backoff.ms", "20000");
          props.setProperty("rebalance.max.retries", "10");
          // Maximum time to wait while establishing the ZooKeeper connection.
          props.setProperty("zookeeper.connection.timeout.ms", "30000");

          cc = new ConsumerConfig(props);

          topicMap = new ConcurrentHashMap<String, BlockingQueue<String>>();
          topicConsumers = new ConcurrentHashMap<String, ConsumerConnector>();
      }

      /**
       * Returns the queue that delivers the messages of a topic whose payload is
       * compressed (the reader unzips every message). The reader is started when the
       * topic has no registered queue yet or when the last ZooKeeper state seen by
       * {@link #process} is {@code Expired}. The queue capacity is 500.
       *
       * @param topic message topic
       * @return message queue of the topic
       */
        public BlockingQueue<String> receiveMsg(String topic)
        {
            // Read the registry once: a second lookup could return null if the topic is
            // shut down between the check and the return.
            BlockingQueue<String> registered = topicMap.get(topic);
            if (null == registered || stateValue == Event.KeeperState.Expired)
            {
                return initReadMsg(topic, true);
            }
            return registered;
        }

        /**
         * Returns the queue that delivers the messages of a topic whose payload is not
         * compressed (messages are queued as received). The reader is started when the
         * topic has no registered queue yet or when the last ZooKeeper state seen by
         * {@link #process} is {@code Expired}. The queue capacity is 500.
         *
         * @param topic message topic
         * @return message queue of the topic
         */
        public BlockingQueue<String> receiveMsgUnzip(String topic)
        {
            // add by peiyj: guards against the client having been closed unexpectedly.
            // Read the registry once: a second lookup could return null if the topic is
            // shut down between the check and the return.
            BlockingQueue<String> registered = topicMap.get(topic);
            if (null == registered || stateValue == Event.KeeperState.Expired)
            {
                return initReadMsg(topic, false);
            }
            return registered;
        }

        /**
         * Starts the reader for a topic: creates a consumer connector and a queue,
         * registers both and launches the reader thread. When the topic is already
         * registered and the ZooKeeper session is not reported as expired, the
         * registered queue is returned and nothing is created.
         *
         * @param topic message topic
         * @param zipFlag {@code true} when the messages are compressed and must be unzipped
         * @return the queue registered for the topic; when the connector cannot be
         *         created, an empty unregistered queue, so that the next
         *         {@code receiveMsg} call tries again
         */
        private synchronized BlockingQueue<String> initReadMsg(String topic,boolean zipFlag)
        {
            BlockingQueue<String> registered = topicMap.get(topic);
            if (null != registered && stateValue != Event.KeeperState.Expired)
            {
                // Another caller registered the topic while this one waited for the
                // monitor: hand out that queue, not a fresh one that nobody fills.
                return registered;
            }

            // Release the connector that is about to be replaced; otherwise it keeps its
            // resources and keeps feeding the queue that is being discarded.
            ConsumerConnector stale = topicConsumers.remove(topic);
            if (null != stale)
            {
                try
                {
                    stale.shutdown();
                    logger.info("consumer...shutdown stale connector topic={}",topic);
                }
                catch (Exception e)
                {
                    logger.error("initReadMsg shutdown stale connector err.......",e);
                }
            }
            // NOTE(review): stateValue stays Expired until process() receives another
            // event, so every call made in the meantime rebuilds the connector - confirm
            // whether that is intended.

            // Default message queue capacity is 500
            BlockingQueue<String> msgQ = new ArrayBlockingQueue<String>(500);
            try
            {
                // Consumer
                consumer = Consumer.createJavaConsumerConnector(cc);
                logger.info("consumer...createJavaConsumerConnector topic={}",topic);
            }
            catch (Exception e)
            {
                logger.error("initReadMsg err.......",e);
                // Leave the topic unregistered so that the next call retries instead of
                // starting a reader that has no connector to read from.
                topicMap.remove(topic);
                return msgQ;
            }

            // One consumer connector per topic
            topicConsumers.put(topic, consumer);
            topicMap.put(topic, msgQ);
            Thread readMsg = new Thread(new ReadMsg(topic, msgQ,zipFlag), "["+topic+"]KafkaStream Reader");
            readMsg.start();
            return msgQ;
        }

        /**
         * Shuts down the consumer of a topic and removes the topic from both registries.
         * Synchronized like {@code initReadMsg}, which updates the same two registries.
         * A topic without a registered consumer is only unregistered.
         *
         * @param topic message topic
         */
        public synchronized void shutDownConsumer(String topic)
        {
            try
            {
                // Unregister first so that the entries disappear even when shutdown()
                // fails; the reader thread stops once its queue is no longer registered.
                ConsumerConnector registered = topicConsumers.remove(topic);
                topicMap.remove(topic);
                if (null == registered)
                {
                    logger.info("consumer...shutdown skipped, no consumer for topic={}",topic);
                    return;
                }

                // Consumer
                registered.shutdown();

                logger.info("consumer...shutdown topic={}",topic);
            }
            catch (Exception e)
            {
                logger.error("shutDownConsumer err......",e);
            }
        }

        /**

      • 读取mq消息线程
        */
        private class ReadMsg implements Runnable
        {
        private String topic;
        private boolean zipFlag;
        private BlockingQueue msgQ;

        public ReadMsg(String topic,BlockingQueue msgQ,boolean zipFlag)
        {

         this.topic = topic;
         this.msgQ = msgQ;
         this.zipFlag=zipFlag;
        

        }

        public void run()
        {

         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
         // value表示consumer thread线程数量
         topicCountMap.put(topic, new Integer(consumers));
         while (true) 
         {
             try 
             {    
                 if(null==topicMap.get(topic))
                 {
                     break;
                 }
        
                 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = topicConsumers.get(topic).createMessageStreams(topicCountMap);
        
                 logger.info("consumer...createMessageStreams topic={}",topic);
        
                 for (KafkaStream<byte[], byte[]> m_stream : consumerMap.get(topic)) 
                 {
                     ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
                     while (it.hasNext()) 
                     {
                         //MQ offset 移动
                         MessageAndMetadata<byte[], byte[]> mm = it.next();
                         String msg = String.valueOf(new StringSerializer().<String>deserialize(mm.message()));
                         if(zipFlag)
                         {
                             msg =  ZipUtils.unzip(msg);
                             logger.debug("control receive topic={},msg={}",topic,msg);
                             msgQ.put(msg);
                         }
                         else
                         {
                             logger.debug("topic={},msg={}",topic,msg);
                             msgQ.put(msg);
                         }
                     }
                 }
             } 
             catch (Exception e)
             {
                 e.printStackTrace();
                 logger.error("KafkaConsumer Reader Exception", e);
             }
        
             try 
             {
                 Thread.sleep(2000);
             } 
             catch (InterruptedException e) 
             {
                 logger.error("ReadMsg sleep InterruptedException......",e);
             }
         }
        

        }
        }

        /**
         * Sets the ZooKeeper connect string that init() passes as "zookeeper.connect".
         *
         * @param zkConfig ZooKeeper connect string
         */
        public void setZkConfig(String zkConfig)
        {
        this.zkConfig = zkConfig;
        }

        /**
         * Sets the consumer group name that init() passes as "group.id".
         *
         * @param group consumer group name
         */
        public void setGroup(String group)
        {
        this.group = group;
        }

        /**
         * Sets the consumer count: init() stores it as "consumer.numbers" and the reader
         * uses it as the number of message streams requested per topic.
         *
         * @param consumers number of consumers
         */
        public void setConsumers(int consumers)
        {
        this.consumers = consumers;
        }

        /**
         * Watcher callback: remembers the state carried by the event so that the
         * receive methods can compare it with {@code Expired}.
         *
         * @param event event delivered to this watcher
         */
        @Override
        public void process(WatchedEvent event)
        {
            final KeeperState reported = event.getState();
            this.stateValue = reported;
        }
    

    }

    4年前
  • 半兽人 回复 小鱼杰杰kafka消息滞留 中 :

    ReadMsg类 贴一下

    4年前
  • 半兽人 回复 小鱼杰杰kafka消息滞留 中 :

    我先读读你的代码。

    4年前
  • 发表了 kafka消息滞留  
    4年前
  • 关注了用户 半兽人 · 4年前