kafka消费者突然不消费了报 Offset commit failed on partition risk_gateway_etl-0 at offset 2579636: This is not the correct coordinator.

小夕夕 发表于: 2021-09-24   最后更新时间: 2021-09-24 15:42:20   7,008 游览

kafka消费者突然不消费了,有错误日志输出

kafka集群环境是3个节点,2副本,消费者模块最近时不时的无法消费消息,其中一个消费者错误日志如下:

2021-09-23 10:28:20.460 ERROR [Thread-10] [] o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Offset commit failed on partition risk_gateway_etl-0 at offset 2579636: This is not the correct coordinator.
2021-09-23 10:28:20.460 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Group coordinator 172.16.196.85:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
2021-09-23 10:28:20.560 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Discovered group coordinator 172.16.196.84:9092 (id: 2147483647 rack: null)
2021-09-23 10:28:20.562 ERROR [Thread-10] [] o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Offset commit failed on partition risk_gateway_etl-0 at offset 2579636: The coordinator is not aware of this member.

另一个消费者错误日志如下:

而且另一个消费者在同一时间的错误日志是:

2021-09-23 10:28:19.247 ERROR [Thread-10] [] o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Offset commit failed on partition risk_gateway_etl-2 at offset 2577133: The request timed out.
2021-09-23 10:28:19.247 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Group coordinator 172.16.196.85:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
2021-09-23 10:28:19.479 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Discovered group coordinator 172.16.196.85:9092 (id: 2147483645 rack: null)
2021-09-23 10:28:20.459 ERROR [Thread-10] [] o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Offset commit failed on partition risk_gateway_etl-2 at offset 2577133: This is not the correct coordinator.
2021-09-23 10:28:20.459 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Group coordinator 172.16.196.85:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
2021-09-23 10:28:20.560 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Discovered group coordinator 172.16.196.84:9092 (id: 2147483647 rack: null)
2021-09-23 10:28:20.562 ERROR [Thread-10] [] o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Offset commit failed on partition risk_gateway_etl-2 at offset 2577133: The coordinator is not aware of this member.

另: 我在集群中其中一台broker里日志里看到如下信息:

[2021-09-23 10:28:10,998] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions elk-0(kafka.server.ReplicaFetcherManager)
[2021-09-23 10:28:10,998] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions List([elk-0, initOffset 10040090963 to broker BrokerEndPoint(0,172.16.196.84,9092)] ) (kafka.server.ReplicaF
etcherManager)
[2021-09-23 10:28:10,998] INFO [ReplicaAlterLogDirsManager on broker 1] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2021-09-23 10:28:11,007] INFO [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-09-23 10:28:11,007] INFO [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error sending fetch request (sessionId=1693859521, epoch=21103032) to node 2: java.nio.channels.ClosedSelectorException. (org.
apache.kafka.clients.FetchSessionHandler)
[2021-09-23 10:28:11,008] INFO [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-09-23 10:28:11,008] INFO [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Shutdown completed(kafka.server.ReplicaFetcherThread)

以上问题查了些资料,但没有收获,希望能在这里找到答案。

发表于 2021-09-24
  • 你可以在补充一下,你是如何进行的测试,部署情况,和kafka的版本等上下文信息。半兽人 2年前 回复
添加评论

The coordinator is not aware of this member.

这个错误一般是offset提交的时候,消息处理时间超过了30秒,kafka将该消费者从组中移除了,认为其已经无效。所以将组中移除,所以提交offset失败了。

可通过加大超时时间:

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,60000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,60000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,60000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,70000);

来解决这个问题。

相关的错误和解决方法和参考:https://www.orchome.com/6742

小夕夕 -> 半兽人 2年前

我看了下我的配置,目前配置的是25秒,那我把这4个参数都设置下看看

你的答案

查看kafka相关的其他问题或提一个您自己的问题