当轮询kafka的consumer重启的时候,可以从最新的offset开始消费

十禾 发表于: 2021-02-08   最后更新时间: 2021-02-08 17:11:40   2,128 游览

需求:当轮询kafka的consumer重启的时候,可以从最新的offset开始消费

遇到问题:(kafka版本 2.12-2.2.0)当代码执行到poll方法时报错,且没有重置offset。在kafka服务端找不到这个consumer,是否assign不能注册consumer?可以使用subscribe()方法注册consumer的同时执行seekToEnd()吗?

更正问题!!!
通过assign()方法订阅分区失败,kafka服务器查不到对应consumer,导致后面poll方法报错。为什么assign分区会失败呢

相关代码

List<TopicPartition> finalTopicPartitions = new ArrayList<>();
for (String topicName : topicNames) {
    List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topicName);
    List<TopicPartition> topicPartitions = partitionInfos.stream().map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition())).collect(Collectors.toList());
    finalTopicPartitions.addAll(topicPartitions);
}
kafkaConsumer.assign(finalTopicPartitions);
kafkaConsumer.seekToEnd(finalTopicPartitions);
// 轮询kafka,拉取任务
while (true) {
    ...

    // 从kafka获取content,并根据策略筛选条件进行筛选
    ConsumerRecords records = kafkaConsumer.poll(pollTimeout); 
 }

screenshot

相关配置

kafka:
  "auto.offset.reset": latest

报错信息

[ERROR] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.handle(ConsumerCoordinator.java:1167)] [Consumer clientId=consumer-1034-1, groupId=1034] Offset commit failed on partition test-0 at offset 15: The coordinator is not aware of this member.
发表于 2021-02-08
¥1.0

你的消费者组名被占用了,还有其他的程序在使用。

十禾 -> 半兽人 3年前

假设程序A和程序B以同一个消费者组名订阅这个分区的话,只有一个程序能消费到这我理解,怎么会有占用呢?然后我刚才去除别的实例的干扰,又查了下kafka服务端,发现虽然我调用了assign()方法,但是服务端并没有消费者,是否说明没订阅成功,为什么会没订阅成功呢?

十禾 -> 十禾 3年前

上面补充了查询kafka服务端的图

半兽人 -> 十禾 3年前

1、因为你们用了同一个名字,所以你指定的offset的就报了刚才的错误呀,不是其成员。
2、消费者启成功后,生产者是否有持续的消息写到该topic呢?

十禾 -> 半兽人 3年前

不好意思,回复完了。确实是因为没有消息写进来,看似没有订阅成功,应该是kafka sdk里面的订阅是个lazy操作,已经可以每次开始消费都从最新的offset了,问题解决了。就是1看不懂

十禾 -> 半兽人 3年前

谢谢

这个代码不对, 你应该使用kafkaConmsumer.endOffsets来读取 LEO(更准确的说应该是ISR里的HW), 如若你要使用seekToEnd(), 你在这个方法之前一定要先提前调用consumer.poll(0) 应消费的元数据保存在__consumer_offstes中的位置, 在这之后, 你才可以正常使用seekToBegin/seekToEnd

你的答案

查看kafka相关的其他问题或提一个您自己的问题