kafka消费端消费一批消息时有时候开头几条消息没有被消费到是什么问题?

—— Only、 发表于: 2019-10-29   最后更新时间: 2019-11-04  

消费端在消费消息时,有时一批消息开头几条消息消费端感觉像没有拉取到,猜测会不会是上一次拉去消息的时候把后面几个偏移量消费了?但是这不应该把。不知道具体是什么问题,该如何找问题呢?我是用的同步+异步的方式提交的偏移量。新建了一个servlet项目启动时初始化servlet的init方法来启动的线程。servlet的destroy调用时也就是项目关闭时去结束while循环,然后调用consumer.commitSync();同步提交的方法。

代码如下

 public Consumer(String servers, String topicName, boolean flag) {
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("group.id", GROUPID);
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("auto.offset.reset", "earliest");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");//获取记录数
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");//处理消息最大间隔时间
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<String, String>(props);
    this.topic = topicName;
    this.flag=flag;
    this.consumer.subscribe(Arrays.asList(topic));
}

@Override
public void run() {
    logger.info("---------kafka消费端开始消费---------");
    Gson gson = new Gson();
    try {
        while(flag){
            msgList = consumer.poll(1000);
            if(null!=msgList&&msgList.count()>0){
                for (ConsumerRecord<String, String> record : msgList) {
                    String reqJson = record.value();
                    ProposalSaveFor3105RequestDto baseRequestDto = null;
                    try {
                        baseRequestDto = gson.fromJson(reqJson,ProposalSaveFor3105RequestDto.class);
                        String proposalNo = baseRequestDto.getPolicyDataDto().getInsuredItemProcutDtoList().get(0).getInsuredDto().getProposalNo();
                        logger.info("kafka拉取了单号:"+proposalNo);
                    }catch (Exception e){
                        e.printStackTrace();
                        logger.error("kafka消费端异常---转换报文报错---报文内容:"+reqJson+",异常信息:",e);
                    }
                    if(baseRequestDto!=null){
                        try {
                            KafkaProposalSaveFor3105ServiceImpl service = new KafkaProposalSaveFor3105ServiceImpl();
                            service.saveProposal(baseRequestDto);
                        }catch (Exception e){
                            e.printStackTrace();
                            logger.error("kafka消费端业务逻辑执行异常---异常信息:",e);
                        }
                    }
                }
                consumer.commitAsync();
            }
        }
    } catch (Exception e) {
        logger.error("kafka消费端消费信息时异常---异常信息:",e);
        e.printStackTrace();
    } finally {
        try {
            consumer.commitSync();
        }catch (Exception e1){
            e1.printStackTrace();
        }finally {
            consumer.close();
        }
    }
}

程序没有报错。
偶尔会出现这样的问题,不知道是不是我消费端写的哪里有什么问题。求大神分析分析,谢谢。



您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: 使用kafka connect ,mysql作为输入,输出也是mysql 报错record value schema is missing
下一条: kafka集群controller节点连接不上受管节点

  • 消费者是定时拉起来消费,消费完成后就关闭?等待下次消费?

    • 消费者是批量拉取消息,也就是说在你停止项目的时候,如果此时刚拉取了消息,提交offsr,但程序只处理一部分,那剩下的消息就会漏掉。

        • commitAsync()commitSync()是批量行为,你如果一条一条的消费,那就一条一条的提交。比如:

          public <T> void poll(String topicName, Object obj, Class<T> clas) {
          
                  // 订阅一个主题
                  consumer.subscribe(Arrays.asList(topicName));
                  while (ConsumerService.flag) {
                      ConsumerRecords<String, String> records = consumer.poll(100);
          
                      for (TopicPartition partition : records.partitions()) {
                          List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                          for (ConsumerRecord<String, String> record : partitionRecords) {
                              long start = System.currentTimeMillis();
                              try {
                                  consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
                              } catch (Exception e) {
                                  logger.warn("kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},commit time:{},error:{}", topicName, Thread.currentThread().getName(), (System.currentTimeMillis() - start), record.value(), e);
                                  break;
                              } catch (Throwable e) {
                                  logger.warn("fatal Error:kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},message:{},error:{}", topicName, Thread.currentThread().getName(), (System.currentTimeMillis() - start), record.value(), e);
                                  break;
                              }
          
                              // 调用业务逻辑
                              try {
                                  BizClassUtils.get(obj).doBiz(JSON.parseObject(record.value(), clas));
                              } catch (Exception e) {
                                  logger.error("a message Exception: message:{},topicName:{},error:{}", record.value(), e);
                              } catch (Throwable e) {
                                  logger.error("a message throwable: message:{},topicName:{},error:{}", record.value(), e);
                              } finally {
                                  long endTime = (System.currentTimeMillis() - start);
                                  if (endTime > 20000)
                                      logger.debug("Business processed single a message used time:{}ms,message total:{}", (System.currentTimeMillis() - start), partitionRecords.size());
                              }
                          }
                      }
                  }
                  if (ConsumerService.flag == false)
                      logger.info("【kafka消费者线程结束】....");
              }
          

          参考:https://www.orchome.com/1056

            • 你的代码看了,没问题,你是没有停止的情况下也有消息丢失。
              但是只有在强杀的情况下,就会跟上面我说的场景一样,程序只处理一部分,剩下的消息就会漏掉。下次启动跳过了这些消息。

                • 重新验证了一下,可能不是消费端丢失消息。分区只有一个,而且消费端打出的日志中偏移量并没有中断过,偏移量是连续正确的,但是生产端发送了5条消息,但消费端只消费了3条消息,说明应该是生产端发送消息时丢失了2条消息,可以得出这样的结论吗?

                    • 谢谢。有个新问题想咨询一下,昨天我查看日志发现我的主题所有分区都从头开始重新消费了一次,因为kafka服务不是我在维护,怀疑可能是谁操作了kafka服务,请问知道可能做了什么操作吗?

                        • 新开个问题吧

                          1. 你的消费者重启过吗(客户端决定从哪里开始消费)
                          2. kafka 0.9版本之后,之前是存储在zk上,消费者的offset的位置存储在__consumer_offsets,如果它清空了,有可能会导致。

                            各位大佬支支招,感谢