当轮询kafka的consumer重启的时候,可以从最新的offset开始消费

十禾 发表于: 2021-02-08   最后更新时间: 2021-02-08  

需求:当轮询kafka的consumer重启的时候,可以从最新的offset开始消费

遇到问题:(kafka版本 2.12-2.2.0)当代码执行到poll方法时报错,且没有重置offset。在kafka服务端找不到这个consumer,是否assign不能注册consumer?可以使用subscribe()方法注册consumer的同时执行seekToEnd()吗?

更正问题!!!
通过assign()方法订阅分区失败,kafka服务器查不到对应consumer,导致后面poll方法报错。为什么assign分区会失败呢

相关代码

List<TopicPartition> finalTopicPartitions = new ArrayList<>();
for (String topicName : topicNames) {
    List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topicName);
    List<TopicPartition> topicPartitions = partitionInfos.stream().map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition())).collect(Collectors.toList());
    finalTopicPartitions.addAll(topicPartitions);
}
kafkaConsumer.assign(finalTopicPartitions);
kafkaConsumer.seekToEnd(finalTopicPartitions);
// 轮询kafka,拉取任务
while (true) {
    ...

    // 从kafka获取content,并根据策略筛选条件进行筛选
    ConsumerRecords records = kafkaConsumer.poll(pollTimeout); 
 }

screenshot

相关配置

kafka:
  "auto.offset.reset": latest

报错信息

[ERROR] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.handle(ConsumerCoordinator.java:1167)] [Consumer clientId=consumer-1034-1, groupId=1034] Offset commit failed on partition test-0 at offset 15: The coordinator is not aware of this member.


您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: 启动kafka的时候 连接zkServer被拒绝的问题
下一条: kafka 创建多少分区合适呢?

  • 你的消费者组名被占用了,还有其他的程序在使用。

    • 假设程序A和程序B以同一个消费者组名订阅这个分区的话,只有一个程序能消费到这我理解,怎么会有占用呢?然后我刚才去除别的实例的干扰,又查了下kafka服务端,发现虽然我调用了assign()方法,但是服务端并没有消费者,是否说明没订阅成功,为什么会没订阅成功呢?

        • 1、因为你们用了同一个名字,所以你指定的offset的就报了刚才的错误呀,不是其成员。
          2、消费者启成功后,生产者是否有持续的消息写到该topic呢?

            • 不好意思,回复完了。确实是因为没有消息写进来,看似没有订阅成功,应该是kafka sdk里面的订阅是个lazy操作,已经可以每次开始消费都从最新的offset了,问题解决了。就是1看不懂