Kafka with manually managed offsets: poll() fails to fetch and just keeps looping?

执念 posted: 2019-12-13   last updated: 2019-12-13

Multiple instances, offsets managed manually and stored in Redis. When the project starts, it reads the offset, positions the consumer with seek(), and polls for messages. Everything worked when first debugged, but checking a few days later, poll() no longer fetches any messages.

Kafka version: 0.10.1.1, multi-instance environment, manually managed offsets.

Multi-threaded consumption; each thread consumes a fixed partition:

    @Override
    public void run() {
        consumer.assign(Arrays.asList(topicPartition));
        String key = StartInit.REDIS_KEY_MSG + "topic:" + topicPartition.topic() + "partition:" + topicPartition.partition();
        String lockKey = REDIS_LOCK_KEY + "topic:" + topicPartition.topic() + "partition:" + topicPartition.partition();
        long position = getOffset(key);
        log.info("topic [{}], partition [{}] thread start, consume begin offset is [{}] ...", topicPartition.topic(), topicPartition.partition(), position);
        consumer.seek(topicPartition, position);
        try {
            boolean running = true;
            while (running) {
                long offsetNewest;
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                    // process the batch inside a transaction
                    CustomTransactionCallback customTransactionCallback = new CustomTransactionCallback(0L, records);
                    transactionTemplate.execute(customTransactionCallback);
                    offsetNewest = customTransactionCallback.getOffsetNewest();
                } catch (Exception e) {
                    log.error("find exception ...", e);
                    // on failure, skip this batch and poll again
                    continue;
                }
                consumer.commitSync();
                log.info("topic [{}], partition [{}], offsetNewest is [{}] ...", topicPartition.topic(), topicPartition.partition(), offsetNewest);
                boolean locked = redisManager.lock(lockKey, 180, 3);
                if (locked) {
                    try {
                        long offsetDb = getOffset(key);
                        if (offsetDb < offsetNewest) {
                            // update the offset stored in Redis
                            stringRedisTemplate.opsForValue().set(key, String.valueOf(offsetNewest));
                        }
                    } finally {
                        redisLockService.unlock(lockKey);
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
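The getOffset helper used above is not shown in the post. A minimal sketch of just the value-parsing part, assuming the stored Redis value is the offset as a decimal string and that a missing key means "start from offset 0" (both are assumptions; the real helper may differ):

```java
// Sketch of the value parsing a Redis-backed getOffset helper might do.
// Assumes the Redis value, when present, is a decimal offset string,
// and that a missing key means "start from the beginning" (offset 0).
public class OffsetParser {
    /** Parse the raw Redis value into a seek position; null/blank means offset 0. */
    public static long parseOffset(String redisValue) {
        if (redisValue == null || redisValue.trim().isEmpty()) {
            return 0L;
        }
        return Long.parseLong(redisValue.trim());
    }

    public static void main(String[] args) {
        System.out.println(parseOffset(null));    // missing key
        System.out.println(parseOffset("12345")); // stored offset
    }
}
```

Note that if the key can be absent on first start, seeking to 0 replays the whole partition; seekToBeginning()/seekToEnd() are the usual alternatives depending on intent.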
    2019-12-13 09:34:24.613 [kafka_msg_consumer_2] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-0
    2019-12-13 09:34:28.034 [kafka_msg_consumer_1] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
    2019-12-13 09:34:28.066 [kafka_msg_consumer_1] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
    2019-12-13 09:34:28.081 [kafka_msg_consumer_0] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-2
    2019-12-13 09:34:28.081 [kafka_msg_consumer_1] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
    2019-12-13 09:34:28.096 [kafka_msg_consumer_2] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-0
    2019-12-13 09:34:28.113 [kafka_msg_consumer_1] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
    2019-12-13 09:34:28.113 [kafka_msg_consumer_0] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-2
    2019-12-13 09:34:28.144 [kafka_msg_consumer_1] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
    2019-12-13 09:34:28.144 [kafka_msg_consumer_0] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-2
    2019-12-13 09:34:28.159 [kafka_msg_consumer_2] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-0
    .....
    
    Thanks in advance for any answers.



  • 1. Make sure the Kafka client version matches the broker version (this is essential).
    2. This is a bug in your Kafka version; the release you are on has the most bugs, including this error. See: https://issues.apache.org/jira/browse/KAFKA-6292
    3. If neither of the above applies, see also: https://issues.apache.org/jira/browse/KAFKA-6292?jql=ORDER%20BY%20lastViewed%20DESC

    • One more question: in the code above, a batch is polled and then processing fails with a business exception. What is the right way to handle that? As written, the code simply discards that batch, which surely isn't acceptable? Please advise.

        • Offsets are managed manually here, with the consume position recorded and controlled by hand, so committing offsets no longer has any effect; whether they're committed or not doesn't matter. Doesn't that contradict your reply? Please advise.

            • I see what you mean. You should commit the message first and then hand it to the business layer; whether the business layer fails is the business layer's problem, as long as your commit is sound.
              Also, even though the commit itself no longer has any effect, the business layer has already processed that message, so you will get duplicate consumption (you may already be guarding against that with Redis). If you want to recover, destroy the consumer, create a new one, and pull the messages again.
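Since the position here is controlled manually with seek(), one alternative to destroying and recreating the consumer is to seek back to the first offset of the failed batch, so the next poll() re-delivers the same records instead of dropping them. A rough sketch (the rewind computation is pure; the commented lines show how the surrounding loop might use it and are an assumption, not code from the post):

```java
import java.util.List;

// Sketch: when processing a polled batch fails, rewind to the batch's first
// offset so the next poll() re-delivers the same records instead of dropping
// them. The offsets here stand in for record.offset() values of the failed
// ConsumerRecords batch.
public class BatchRewind {
    /** Offset to seek back to: the smallest offset in the failed batch. */
    public static long rewindPosition(List<Long> batchOffsets) {
        long min = Long.MAX_VALUE;
        for (long o : batchOffsets) {
            min = Math.min(min, o);
        }
        return min;
    }

    // In the consumer loop this would be used roughly as:
    //   catch (Exception e) {
    //       consumer.seek(topicPartition, rewindPosition(offsetsOf(records)));
    //       continue; // the next poll() fetches the same batch again
    //   }
}
```

If the failure is permanent (a poison message), this retries forever, so a retry cap or dead-letter path would still be needed.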

                • Thanks! So is it fair to understand it like this: after the consumer polls a batch of messages, each message is handed straight to other threads for processing, and once they're all handed off, the Redis offset is updated (effectively committing the offset), so consumption and business logic are separated across threads?
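That hand-off pattern might be sketched roughly as below. FakeRecord stands in for ConsumerRecord, and the returned value represents the offset that would be written to Redis. This sketch waits for the workers before advancing the offset; advancing immediately after hand-off, as described above, risks losing in-flight messages if the process crashes, so whether that is acceptable depends on the business layer:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the "poll, hand off to workers, then advance the stored offset"
// pattern from the discussion. FakeRecord stands in for ConsumerRecord; the
// Redis update is represented by the returned next-offset value.
public class HandOffSketch {
    static class FakeRecord {
        final long offset;
        final String value;
        FakeRecord(long offset, String value) { this.offset = offset; this.value = value; }
    }

    /** Submit every record to the pool, wait, and return the offset to store. */
    public static long dispatchAndAdvance(List<FakeRecord> batch, ExecutorService pool) {
        long last = -1L;
        for (FakeRecord r : batch) {
            pool.submit(() -> { /* business processing happens here */ });
            last = r.offset;
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Store last + 1: the next position to seek to after a restart.
        return last + 1;
    }
}
```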