kafka手动管理offset,poll 请求拉取失败,一直轮询 ?

执念 发表于: 2019-12-13   最后更新时间: 2019-12-13 11:45:04   4,081 游览

多实例,手动管理offset,存储到redis,项目启动初始化时,读取offset,使用seek订阅消费,poll 拉取消息,刚调试的时候正常,过几天看下,poll 拉取消息失败

Kafka version : 0.10.1.1 , 多实例环境,手动管理offset

多线程消费,每个线程消费固定的 partition

 @Override
    public void run() {
        consumer.assign(Arrays.asList(topicPartition));
        String key = StartInit.REDIS_KEY_MSG + "topic:" + topicPartition.topic() + "partition:" + topicPartition.partition();
        String lockKey = REDIS_LOCK_KEY + "topic:" + topicPartition.topic() + "partition:" + topicPartition.partition();
        long position = getOffset(key);
        log.info("topic [{}], partition [{}] thread start,consume begin offset is [{}]  ...", topicPartition.topic(), topicPartition.partition(), position);
        consumer.seek(topicPartition, position);
        try {
            boolean running = true;
            while (running) {
                long offsetNewest;
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                    //保证事务
                    CustomTransactionCallback customTransactionCallback = new CustomTransactionCallback(0L, records);
                    transactionTemplate.execute(customTransactionCallback);
                    offsetNewest = customTransactionCallback.getOffsetNewest();
                } catch (Exception e) {
                    log.error("find exception [{}] ...", e);
                    continue;
                }
                consumer.commitSync();
                log.info("topic [{}], partition [{}], offsetNewest is [{}] ...", topicPartition.topic(), topicPartition.partition(), offsetNewest);
                boolean locked = redisManager.lock(lockKey, 180, 3);
                if (locked) {
                    try {
                        long offsetDb = getOffset(key);
                        if (offsetDb < offsetNewest) {
                            // 更新 redis
                            stringRedisTemplate.opsForValue().set(key, String.valueOf(offsetNewest));
                        }
                    } finally {
                        redisLockService.unlock(lockKey);
                    }
                }
            }
        } finally {
            consumer.close();
        }

    }
  1. 2019-12-13 09:34:24.613 [kafka_msg_consumer_2] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-0
    2019-12-13 09:34:28.034 [kafka_msg_consumer_1] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
    2019-12-13 09:34:28.066 [kafka_msg_consumer_1] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
    2019-12-13 09:34:28.081 [kafka_msg_consumer_0] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-2
    2019-12-13 09:34:28.081 [kafka_msg_consumer_1] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
    2019-12-13 09:34:28.096 [kafka_msg_consumer_2] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-0
    2019-12-13 09:34:28.113 [kafka_msg_consumer_1] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
    2019-12-13 09:34:28.113 [kafka_msg_consumer_0] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-2
    2019-12-13 09:34:28.144 [kafka_msg_consumer_1] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
    2019-12-13 09:34:28.144 [kafka_msg_consumer_0] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-2
    2019-12-13 09:34:28.159 [kafka_msg_consumer_2] WARN  o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-0
    .....
    
    谢谢解答。
发表于 2019-12-13

1、确保kafka客户端版本和服务版本一致(务必)。
2、kafka版本的bug,你现在的版本bug最多,包含此错。具体参考:https://issues.apache.org/jira/browse/KAFKA-6292
3、如果以上都不是,可参考更多:https://issues.apache.org/jira/browse/KAFKA-6292?jql=ORDER%20BY%20lastViewed%20DESC

执念 -> 半兽人 4年前

检查了下,服务器版本和客户端版本不一致,谢谢大佬~

执念 -> 半兽人 4年前

再咨询一下, 根据以上代码 poll 了一批数据处理,因为业务处理出现异常,这里应该怎么处理比较好呢; 上述的代码的逻辑应该是直接抛弃了这批数据,这样肯定不妥?麻烦赐教

半兽人 -> 执念 4年前

kafka客户端不要涉及到业务,否则会非常复杂,让业务自己去处理,让业务决定是否重新丢回到kafka里,还是告警。
https://www.orchome.com/2046

执念 -> 半兽人 4年前

目前是手动管理 offset, 手动记录并控制消费位置,相当于offset 提交已经没有作用了, 提不提交也无所谓了。这种情况感觉和您的回复不符吧?望赐教。

半兽人 -> 执念 4年前

我知道,你需要先把消息commit之后,然后丢给业务层,业务层报不报错,是业务层的问题,你只要保证你的commit没问题。
另外,提交已经没有作用了,但是这条消息业务层已经处理了,说明你这样会重复消费(你可能用redis保证了),那如果想恢复,就销毁掉消费者,重新创建,重新拉消息。

执念 -> 半兽人 4年前

感谢! 是不是可以这样理解,consumer poll 一堆消息之后,每条消息都直接扔给其他线程去处理,扔完之后,直接更新 redis 的 offset(相当于是提交了offset),相当于消费 和 业务逻辑通过线程分开了

半兽人 -> 执念 4年前

是的

我用spring-kafka 2.2.4实现和你类似的功能,设置enable-auto-commit = false,且不做offset提交,但是跑了一段时间出现消费者停止了消费,有看到日志 org.apache.kafka.clients.FetchSessionHandler:394 - [Consumer clientId=consumer-2, groupId=ctk_rk_ad_report_high] Node 2 sent an invalid full fetch response with omitted=,不知道你有没有遇到过类似的情况

你的答案

查看kafka相关的其他问题或提一个您自己的问题