kafka手动提交offest异常问题

小白 发表于: 2021-01-11   最后更新时间: 2021-01-11  

flume定义了kafkasource消费数据, 手动提交偏移量的方式 , kafka版本 2.1.0-cdh6.2.1 topic的分区数是1 topic的数据量增量一天千万左右, 为了保证顺序所以用了单分区

数据量较大的情况下出现

2020-12-24 17:13:38,725 (PollableSourceRunner-DirectKafkaSouce-source1) [ERROR - org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:812)] [Consumer clientId=consumer-1, groupId=group61] Offset commit failed on partition A_PREPAY_FLOW-0 at offset 431122116: The request timed out.
2020-12-24 17:13:38,727 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown(AbstractCoordinator.java:706)] [Consumer clientId=consumer-1, groupId=group61] Group coordinator hnbigdata006:9092 (id: 2147483323 rack: null) is unavailable or invalid, will attempt rediscovery

2020-12-24 17:13:38,830 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)] [Consumer clientId=consumer-1, groupId=group61] Discovered group coordinator hnbigdata006:9092 (id: 2147483323 rack: null)
2020-12-24 17:13:38,830 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown(AbstractCoordinator.java:706)] [Consumer clientId=consumer-1, groupId=group61] Group coordinator hnbigdata006:9092 (id: 2147483323 rack: null) is unavailable or invalid, will attempt rediscovery
2020-12-24 17:13:38,932 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)] [Consumer clientId=consumer-1, groupId=group61] Discovered group coordinator hnbigdata006:9092 (id: 2147483323 rack: null)

请大佬帮忙指导下



您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: 请问一下,每次服务启动创建不同消费者组的话,对kafka会不会有影响
下一条: flink程序消费kafka速度起不来,请问下如何调整消费速度?

  • kafka一条消息默认处理是30秒,如果超过30秒就会被踢出group,重新选举,也就是说,你要在30秒内手动提交一次。

    增加kafka超时时间:session.timeout.msoffsets.commit.timeout.msrequest.timeout.ms

    • 那我的数据在提交offest之前已经入库了, 这里offest提交失败, 也就是偏移量信息写入__consume_offests失败, 那下一轮消费拿到的还是这批数据了? 数据重复了?

        • 是的,会导致重复消费。
          要么先提交offset在处理消息,但当程序被强杀,或崩溃则会导致消息丢失。
          我选择的是后者(程序做好压测,不随意更改线程和并发,不会崩溃)