java kafka consumer.poll在运行一段时间后出现死循环,无法获取数据

MADAO 发表于: 2019-10-25   最后更新时间: 2019-10-25  

kafka服务端显示断开与消费者链接,但是消费者没有任何异常;

[2019-10-25 04:41:28,835] INFO [GroupCoordinator 0]: Member consumer-3-c80f9b26-ce89-4685-8afa-d5766e32fd47 in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,835] INFO [GroupCoordinator 0]: Preparing to rebalance group DataAcquireGroup in state PreparingRebalance with old generation 30 (__consumer_offsets-33) (reason: removing member consumer-3-c80f9b26-ce89-4685-8afa-d5766e32fd47 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,965] INFO [GroupCoordinator 0]: Member consumer-4-36f437e5-186b-470f-92cd-f02483b8fa7d in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,975] INFO [GroupCoordinator 0]: Member consumer-1-a0eaa01b-1fdf-40be-a9d3-b7e527bcb7fe in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,987] INFO [GroupCoordinator 0]: Member consumer-5-cf609397-08a1-4eef-b6cf-19a63ad24ad3 in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,989] INFO [GroupCoordinator 0]: Member consumer-2-331356e1-34ee-49a7-b352-ef2729a001bb in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

消费者代码:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
      ToolsUtil.linkedBlockingQueue.put(record);
    }
}

消费者配置:

bootstrap.servers=127.0.0.1:9092
group.id=DataAcquireGrouphuihui
enable.auto.commit=true
auto.commit.interval.ms=1000
max.poll.interval.ms=60000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.records=100

消费者正常运作10+小时后出现问题,且没有任何异常或者是警告输出 生产者运作正常



您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka follower 副本宕机同时有新的数据写入,重启后日志同步是全量同步还是增量同步
下一条: kafka broker和客户端的连接断开

  • 一切正常,进程被杀,一般是oom引起的,系统日志中会有。
    另外,你启动的消费者的程序是守护进程吧。

    • 首先进程没有被杀死,查看日志发现没有任何输出,CPU使用暴涨至100%,启动的程序是普通java程序,通过打印信息排查,执行到consumer.poll(100);后程序不动了,并没有被杀死,望大佬帮忙看看