Kafka Java client is not consuming some messages

lioY3 posted on: 2023-06-13   Last updated: 2023-06-13 08:34:18   1,075 views

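1. Scenario and problem

Two consumers in the same group are deployed to consume from a third-party Kafka cluster. After receiving a message they process the data and forward it to our own single-node Kafka. The daily volume is about 400,000 messages, and part of the data (about 70,000 to 80,000 messages) is never consumed and is effectively lost. With Kafka Tool I can find these messages in the source Kafka topic, but the consumer never receives them and never logs them.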

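2. Versions and environment

Source Kafka: 0.10.0.0 cluster; the source topic has 12 partitions
Target Kafka: 0.10.0.1 single node; the target topic has 10 partitions
Consumer + producer program: Spring Boot 2.3.12.RELEASE + Spring Kafka 2.5.14.RELEASE (kafka-clients 2.5.1)

3. Relevant code and configuration

Configuration: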

## consumer
spring.kafka.consumer.bootstrap-servers=
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.group-id=kafka-consumer-group-20230606
spring.kafka.consumer.max-poll-records=50
spring.kafka.consumer.properties.max.poll.interval.ms=300000
spring.kafka.consumer.properties.session.timeout.ms=60000
spring.kafka.consumer.properties.request.timeout.ms=18500000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
## producer
spring.kafka.producer.bootstrap-servers=
spring.kafka.producer.retries=3
spring.kafka.producer.acks=1
spring.kafka.producer.batch-size=4096
spring.kafka.producer.buffer-memory=40960
spring.kafka.producer.properties.linger.ms=0
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
## listener
spring.kafka.listener.type=BATCH
spring.kafka.listener.ack-mode=MANUAL
spring.kafka.listener.concurrency=12

Code:

@KafkaListener(containerFactory = "batchFactory", topics = "ORIGIN_TOPIC")
public void doMessage(ConsumerRecords<String, String> consumerRecords, Acknowledgment ack) {
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        if (consumerRecord != null) {
            try {
                log.info("message:{}", consumerRecord.value());
                // Process the received data and send it to the target topic
                String data = consumerRecord.value() + "sssss";
                kafkaTemplate.send("target_topic", data);
            } catch (Exception e) {
                log.error("Failed to send data to targetTopic: {}", e.getMessage(), e);
            }
        }
    }
    ack.acknowledge();
}

4. Error messages

Error 1:

2023-06-11T00:51:56.178+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is java.lang.IllegalStateException: Correlation id for response (922079) does not match request (922069), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-1, correlationId=922069)
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Correlation id for response (922079) does not match request (922069), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-1, correlationId=922069)
    at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:949)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:727)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:840)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:559)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:984)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2378)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2373)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2359)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2173)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
    ... 3 common frames omitted

Error 2:

2023-06-12T10:10:04.542+08:00 WARN ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] [o.a.k.c.consumer.internals.ConsumerCoordinator:1165] - [Consumer clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-4, groupId=quanzhougaozhi-ivehcletransfer-group-0516] Offset commit failed on partition deploycontrol_high_speed-3 at offset 11409204: The request timed out.

Error 3:

2023-05-17T20:33:01.077+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#1-9-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading array of size 9092, only 641 bytes available
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading array of size 9092, only 641 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
    at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:313)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:726)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:840)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:559)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1246)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1146)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
    ... 3 common frames omitted

Error 4:

2023-05-18T00:58:33.372+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-10-C-1] [o.a.k.c.consumer.internals.ConsumerCoordinator:1167] - [Consumer clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-11, groupId=quanzhougaozhi-ivehcletransfer-group-0516] Offset commit failed on partition deploycontrol_high_speed-4 at offset 10469635: The coordinator is not aware of this member.
2023-05-18T00:58:33.373+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-10-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1213)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1140)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:984)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2378)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2373)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2359)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2173)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
    ... 3 common frames omitted

Error 5:
When I inspect the consumer group with Kafka Tool, I normally see all 12 partitions (0 to 11), but sometimes a few partitions (3, 4, 6) are missing.

5. What I have tried

I have tried the following, but the effect is unclear:
(1) Downgrading Spring Boot and Spring Kafka to Spring Boot 2.1.8.RELEASE + Spring Kafka 2.2.8.RELEASE
(2) Using kafka-clients 0.10.0.0 on the client side

Posted on 2023-06-13

Caused by: java.lang.IllegalStateException: Correlation id for response (922079) does not match request (922069), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-1, correlationId=922069)

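The response received does not match the request that was sent. This may be a side effect of the sheer number of exceptions, with state getting mixed up across the multiple consumer threads in Spring Boot.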
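To show what that check is doing, here is a simplified model of matching responses to requests on one connection. It is only a sketch and not Kafka's actual code: the class InFlightRequests and its methods are hypothetical names, and it assumes responses come back in the order the requests were sent. The two ids in main are the ones from the log above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified model of in-order request/response matching on one connection (hypothetical, not Kafka code). */
final class InFlightRequests {
    // Correlation ids of requests that were sent but not yet answered, oldest first.
    private final Deque<Integer> pending = new ArrayDeque<>();

    /** Record that a request with this correlation id was sent. */
    void send(int correlationId) {
        pending.addLast(correlationId);
    }

    /** Match a response against the oldest outstanding request; fail if the ids differ. */
    void receive(int responseCorrelationId) {
        Integer expected = pending.pollFirst();
        if (expected == null) {
            throw new IllegalStateException(
                    "Response (" + responseCorrelationId + ") arrived with no request in flight");
        }
        if (expected != responseCorrelationId) {
            throw new IllegalStateException(
                    "Correlation id for response (" + responseCorrelationId
                            + ") does not match request (" + expected + ")");
        }
    }

    /** Number of requests still waiting for a response. */
    int outstanding() {
        return pending.size();
    }

    public static void main(String[] args) {
        InFlightRequests connection = new InFlightRequests();
        connection.send(922069);          // id of the OFFSET_COMMIT request in the log
        try {
            connection.receive(922079);   // id of the response in the log
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, any response that is not for the oldest outstanding request on the connection is treated as a fatal protocol error, which is why the exception surfaces as an IllegalStateException rather than a retriable one.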

So let's focus on the core exception:

The core exception is: org.apache.kafka.clients.consumer.CommitFailedException

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

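The time between two calls to poll(), that is, the time spent processing one batch, exceeded max.poll.interval.ms (300000 ms, i.e. 5 minutes, in your configuration, which is also the client default). Kafka then treats the consumer as dead and removes it from the group, and because it is no longer a member, its offset commit fails.
That failure triggers a rebalance, in which partitions are reassigned among the consumers. Your consumer program runs 12 consumer threads, and this is the point where things get mixed up.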
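To put numbers on the poll() timing constraint named in the exception message, here is a small back-of-the-envelope sketch using the values from the posted configuration (max.poll.interval.ms=300000, max.poll.records=50). PollBudget is a hypothetical helper, not a Kafka or Spring API, and the 7000 ms per-record cost is just an assumed example of a slow send, for instance when kafkaTemplate.send blocks because the producer buffer is full.

```java
/** Back-of-the-envelope budget for one poll() batch (hypothetical helper, not a Kafka API). */
final class PollBudget {

    /** Average time each record may take before a full batch exceeds max.poll.interval.ms. */
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        return maxPollIntervalMs / maxPollRecords;
    }

    /** True if a full batch at the given per-record cost finishes inside the interval. */
    static boolean batchFits(long maxPollIntervalMs, int maxPollRecords, long perRecordCostMs) {
        return perRecordCostMs * maxPollRecords < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // spring.kafka.consumer.properties.max.poll.interval.ms
        int maxPollRecords = 50;           // spring.kafka.consumer.max-poll-records

        // 300000 / 50 = 6000 ms available per record on average
        System.out.println(perRecordBudgetMs(maxPollIntervalMs, maxPollRecords));

        // Assumed slow case: 7000 ms per record -> 350000 ms per batch, which does not fit
        System.out.println(batchFits(maxPollIntervalMs, maxPollRecords, 7_000L));
    }
}
```

If the per-record time while forwarding can ever get near that budget, the two knobs are the ones the exception message names: a larger max.poll.interval.ms or a smaller max.poll.records.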

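Solution:

  1. After you receive a batch of messages, forwarding it to the new Kafka cluster has to finish before the next poll() is due, i.e. within max.poll.interval.ms. You can raise that timeout, although this treats the symptom rather than the cause. See the first answer to this post: https://www.orchome.com/893
  2. If CommitFailedException still occurs after raising the timeout, reduce the number of messages fetched per poll (max.poll.records). With fewer messages per batch there is less to send to the new Kafka, which eases the timeout problem.
  3. Judging from your settings, spring.kafka.listener.concurrency=12 is one of the things making the errors worse. The lower the concurrency, the fewer messages are processed at the same time, the less likely the threads are to block while sending to the new cluster, and the lower the chance of errors. You are running 2 consumer programs now; if you need more parallelism, add consumer programs instead of raising this parameter.
  4. Coming back to spring.kafka.listener.concurrency=12: you have 12 partitions in total, so a single consumer program with 12 threads can take all of them, and the other consumer program may get only a few partitions or none at all. With 2 consumer programs, concurrency should not exceed 6; with 3, it should not exceed 4. I suggest simply using the default of 1. Processing will not be much slower, and you can add consumer programs to increase parallelism.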
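The sizing rule in points 3 and 4 can be sketched as simple arithmetic. ConcurrencySizing is a hypothetical helper, not part of Spring Kafka. It relies only on the fact that within one consumer group a partition is assigned to at most one consumer, so any consumer threads beyond the partition count get nothing to read.

```java
/** Rough sizing helper for listener concurrency (hypothetical, not part of Spring Kafka). */
final class ConcurrencySizing {

    /** Smallest per-instance concurrency that gives the group at least one thread per partition. */
    static int threadsPerInstance(int partitions, int instances) {
        if (partitions <= 0 || instances <= 0) {
            throw new IllegalArgumentException("partitions and instances must be positive");
        }
        return (partitions + instances - 1) / instances; // ceiling division
    }

    /** Consumer threads in the group that cannot be assigned any partition. */
    static int idleConsumers(int partitions, int instances, int concurrency) {
        return Math.max(0, instances * concurrency - partitions);
    }

    public static void main(String[] args) {
        int partitions = 12; // partitions of the source topic

        System.out.println(threadsPerInstance(partitions, 2)); // 6
        System.out.println(threadsPerInstance(partitions, 3)); // 4

        // Current setup: 2 programs x concurrency 12 = 24 consumers for 12 partitions
        System.out.println(idleConsumers(partitions, 2, 12));  // 12
    }
}
```

With the current setup, half of the 24 consumer threads in the group have no partition, and which program ends up holding the assigned ones depends on how the group assigns partitions after each rebalance.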