kafka单partition lag飙升,怎么解决

W 发表于: 2022-09-22   最后更新时间: 2022-09-22 11:21:34   74 游览

先说明下,有问题的kafka 是3个partition。

每次隔个三天五天的,lag监控就会报警,每次去看lag的消费情况,基本都是单partition的lag上升,去看业务日志,发现消费改partition的消费者不消费了(日志不继续打了,感觉是僵死了),消费的进程还在(机器资源没有明显上升,cpu、内存、io等都没)。

这时重启一下消费者脚本就可继续消费,lag会慢慢下降,想问下这个是什么问题,困扰很久了,一直报警。辛苦各位大佬帮忙解答下。

业务线用的kafka集群版本是kafka_2.11-0.10.1.0,代码引入的客户端版本是

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.1.0</version>
</dependency>
发表于 2022-09-22
W
添加评论

kafka消费者线程已经从你的java进程里结束了,所以会导致这个进程永远不会重连kafka进行消费了。

解决:kafka获取到消息之后,业务处理时,捕获异常,不要向上抛

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try{
            // 你的业务
        }
        catch (Exception e) {
            // 忽视异常,不要向上抛
        }
    }
}
W -> 半兽人 7天前

先谢大佬回答。除了这种情况,还有额外其他可能的情况吗?因为项目代码就是这样写的,而且数据量应该是比较大的,最起码不小,应该不会存在因partition没有数据而终止。此外,在lag升高时,看日志并没有明显的异常日志,应该也不会存在因为异常导致的中断情况。再次感谢大佬回复!

W -> 半兽人 7天前

下附业务消费者代码,仔细看了下,业务逻辑内部有捕获异常代码。

// 消费者代码
public boolean waitTeminal() {
        try {
            check();
            final ConsumerConnector consumer = createConsumer();

            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topicFrom, threadnum); // 一次从主题中获取一个数据

            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
                    .createMessageStreams(topicCountMap);

            List<KafkaStream<byte[], byte[]>> msgStreamList = messageStreams
                    .get(topicFrom);

            logger.debug(String.format("The Kafka Stream List @size: %s",
                    msgStreamList.size()));
            for (int i = 0; i < msgStreamList.size(); i++) {
                final KafkaStream<byte[], byte[]> stream = msgStreamList.get(i);
                ConsumerPool.execute(new Runnable() {

                    @Override
                    public void run() {

                        logger.info(String.format(
                                "kafka consumer started! commitBuffer: %s",
                                String.valueOf(commitBuffer)));

                        ConsumerIterator<byte[], byte[]> iterator = stream
                                .iterator();

                        int commitCount = 0;// 批量提交计数器
                        List<E> events = new ArrayList<>();
                        while (iterator.hasNext()) {
                            long startTime = System.currentTimeMillis();
                            // 消息
                            byte[] msg = iterator.next().message();
                            if (msg == null || msg.length <= 0) {
                                continue;
                            }
                            // 事件校验
                            E e = null;
                            try {
                                e = parse(new String(msg).trim());
                            } catch (Exception except) {
                                logger.error("json格式不合法:" + Arrays.toString(msg), except);
                                continue;
                            }

                            if (!validate(e)) {
                                continue;
                            }

                            if (events.size() < 5) {
                                events.add(e);
                            }

                            if (events.size() == 5){
                                List<O> os = transferList(events);
                                for (O out : os){
                                    if (null != out) {
                                        BusinessLogger.sendEvent(topicTo,
                                                kafkaPartitionBy(out), getBytes(out));
                                    }
                                }
                                events.clear();
                            }
//                            O out = transfer(e);

                            // commit计数累加
                            commitCount++;
                            if (commitCount >= commitBuffer) {
                                consumer.commitOffsets();
                                commitCount = 0;
                                logger.info("@batchCommitSize:"
                                        + commitBuffer
                                        + "\t@times:"
                                        + (System.currentTimeMillis() - startTime));
                            }
                        }

                    }
                });
            }

            return true;
        } catch (Exception e) {
            e.printStackTrace();
            logger.info(String
                    .format("the committing was wrong cause top reason"));
            return false;
        }
    }
 // 业务逻辑代码,这里这样做,是因为之前认为是单线程消费导致处理不过来导致的,之后想采用多线程的方式处理,但是offse不好控制,就简单这样写了,效果是有的,报警确实比较少了,但是还会有。
 @Override
    public List<Event> transferList(List<Event> events) {
        List<Event> results = new ArrayList<>();

        Future<Event> eventFuture0 = addTask(events.get(0));
        Future<Event> eventFuture1 = addTask(events.get(1));
        Future<Event> eventFuture2 = addTask(events.get(2));
        Future<Event> eventFuture3 = addTask(events.get(3));
        Future<Event> eventFuture4 = addTask(events.get(4));

        try {
            results.add(eventFuture0.get());
            results.add(eventFuture1.get());
            results.add(eventFuture2.get());
            results.add(eventFuture3.get());
            results.add(eventFuture4.get());
            logger.info("eventResult = " + JSON.toJSONString(results));
            return results;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventFuture0.isCancelled();
            eventFuture1.isCancelled();
            eventFuture2.isCancelled();
            eventFuture3.isCancelled();
            eventFuture4.isCancelled();
        }

        return null;
    }
半兽人 -> W 7天前

首先,你这个客户端版本太老了,已经弃用很久了,不排除版本兼容性问题。

参考新版的消费者方式:kafka消费者Java客户端
或者我之前写的例子:kafka客户端 - java

其次,多线程消费导致的,这块非常容易出问题,线程池必须是阻塞式的,就是你多线程池子虽然设置的上限是5个,但是其实你可以无限往里面丢(比如默认有1万的队列),但是在里面排队,真正处理的是5个,也会导致这个问题。

W -> 半兽人 7天前

再次感谢大佬回答,
1、我也担心客户端版本的影响,所以贴出了版本信息,有没有办法能确定客户端的版本是否有影响哇。
2、多线程消费在写这块代码的时候,考虑到了无线往里面丢的情况,但是我认为这里的代码是不会发生的。因为处理业务逻辑之前是同步分批次拿五条数据,一块丢进去,多线程处理完毕再出来,循环取后五个(外层是同步的,同步取数据赛给业务逻辑多线程处理)。对于整体的处理速度确实有提升,而且跑了差不多一个多月了,并未发现数据上的异常(应该是可行的)。

半兽人 -> W 7天前

验证不了哎,这个太底层了。

jdk版本,客户端版本与服务端不匹配都会造成这种现象。
我给你说的是最常见的2种情况导致的,据我所知目前是没有其他的原因会导致这个现象了。

W -> 半兽人 7天前

公司的kafka版本还不能乱升,我再调研调研,再试试,谢谢大佬了。

你的答案

查看kafka相关的其他问题或提一个您自己的问题
提问