kafka手动提交offest异常问题

小白 发表于: 2021-01-11   最后更新时间: 2021-01-11 16:18:08   2,162 游览

flume定义了kafkasource消费数据, 手动提交偏移量的方式 , kafka版本 2.1.0-cdh6.2.1 topic的分区数是1 topic的数据量增量一天千万左右, 为了保证顺序所以用了单分区

数据量较大的情况下出现

2020-12-24 17:13:38,725 (PollableSourceRunner-DirectKafkaSouce-source1) [ERROR - org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:812)] [Consumer clientId=consumer-1, groupId=group61] Offset commit failed on partition A_PREPAY_FLOW-0 at offset 431122116: The request timed out.
2020-12-24 17:13:38,727 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown(AbstractCoordinator.java:706)] [Consumer clientId=consumer-1, groupId=group61] Group coordinator hnbigdata006:9092 (id: 2147483323 rack: null) is unavailable or invalid, will attempt rediscovery

2020-12-24 17:13:38,830 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)] [Consumer clientId=consumer-1, groupId=group61] Discovered group coordinator hnbigdata006:9092 (id: 2147483323 rack: null)
2020-12-24 17:13:38,830 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown(AbstractCoordinator.java:706)] [Consumer clientId=consumer-1, groupId=group61] Group coordinator hnbigdata006:9092 (id: 2147483323 rack: null) is unavailable or invalid, will attempt rediscovery
2020-12-24 17:13:38,932 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)] [Consumer clientId=consumer-1, groupId=group61] Discovered group coordinator hnbigdata006:9092 (id: 2147483323 rack: null)

请大佬帮忙指导下

发表于 2021-01-11
添加评论

kafka一条消息默认处理是30秒,如果超过30秒就会被踢出group,重新选举,也就是说,你要在30秒内手动提交一次。

增加kafka超时时间:session.timeout.msoffsets.commit.timeout.msrequest.timeout.ms

小白 -> 半兽人 3年前

业务处理好像都在30秒内可以完成的, 这个问题是否和我broker节点异常有关呢?

半兽人 -> 小白 3年前

大兄弟,broker都挂了,那超时不是正常的么。。

小白 -> 半兽人 3年前

那我的数据在提交offest之前已经入库了, 这里offest提交失败, 也就是偏移量信息写入__consume_offests失败, 那下一轮消费拿到的还是这批数据了? 数据重复了?

半兽人 -> 小白 3年前

是的,会导致重复消费。
要么先提交offset在处理消息,但当程序被强杀,或崩溃则会导致消息丢失。
我选择的是后者(程序做好压测,不随意更改线程和并发,不会崩溃)

你的答案

查看kafka相关的其他问题或提一个您自己的问题