Kafka consumer auto-commit fails — how should Kafka be configured?

野心永恒 posted: 2019-08-28   Last updated: 2019-08-28


  1. The business scenario is as follows:
    Each time a task is dispatched, I start a thread and instantiate a consumer. The consumer subscribes to the topic for the current task, and once all of the task's messages have been consumed I stop the thread. Recently, however, Kafka has been printing ERROR logs like this:
    11:25:34 [Thread-18] ERROR o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=rzx-consumer-group] Offset commit failed on partition topic1566791132867-0 at offset 10: The coordinator is not aware of this member.
    11:25:34 [Thread-18] WARN  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=rzx-consumer-group] Synchronous auto-commit of offsets {topic1566791132867-0=OffsetAndMetadata{offset=10, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    
    Every message is in fact consumed successfully, and messages on the same topic processed afterwards are not consumed twice, so this looks like a consumer configuration problem. The current consumer configuration is:
         props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 12695150);
         props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 12695150);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    
    How should I adjust the configuration?
    Thanks.
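As the WARN message itself suggests, the usual first step is to give the poll loop more headroom: raise `max.poll.interval.ms` and/or lower `max.poll.records`. A minimal sketch in the style of the config above (the numeric values are illustrative, not recommendations — tune them to how long one batch actually takes to process):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerTuning {
    public static Properties tuned(Properties props) {
        // Illustrative values: allow up to 10 minutes between poll() calls...
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
        // ...and return smaller batches so each loop iteration finishes sooner.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        return props;
    }
}
```

This only papers over slow processing, though; the root cause in this thread (committing after the consumer has already left the group) is addressed in the answer below by closing the consumer properly and committing manually.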





  • When you stop the thread, call the Kafka consumer's close() method so it leaves the group cleanly.

    • This error is reported when the consumer commits an offset while the cluster is rebalancing the group's consumers.
      The client auto-commits on its own schedule at the configured interval, so by the time the last auto-commit is attempted the consumer may already have been told to shut down (e.g. by a k8s termination signal) and given up its group membership.
      Suggestions:

      1. poll() is itself a long poll that periodically fetches new messages, so there is no need to create a new consumer for every task.
      2. Switch to manual offset commits.
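Putting the suggestions above together, here is a rough sketch of a long-lived consumer thread with manual commits and a clean shutdown. The bootstrap address, deserializers, and handle() method are placeholders for your own setup; only the group id and topic name are taken from the question:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TaskConsumer implements Runnable {
    private volatile boolean running = true;

    public void shutdown() { running = false; }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rzx-consumer-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // manual commits
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList("topic1566791132867"));
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    handle(record); // your per-message processing
                }
                if (!records.isEmpty()) {
                    consumer.commitSync(); // commit only after processing succeeded
                }
            }
        } finally {
            consumer.close(); // commits nothing here; leaves the group cleanly
        }
    }

    private void handle(ConsumerRecord<String, String> record) {
        // process one message
    }
}
```

Because the commit now happens synchronously inside the poll loop, it can no longer race with a rebalance triggered after shutdown; and because the consumer stays alive across tasks, you avoid the constant join/leave churn that causes rebalances in the first place.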