kafka poll records数据为空?

技颠 发表于: 2020-09-09   最后更新时间: 2020-09-09 18:42:47   3,298 游览

提问说明

你好,我现在遇到了一个问题,我在消费我的topic里面数据的时候,很多次都进入了:TimeUnit.MILLISECONDS.sleep(100);,但是此时这个topic下面的各个partition是有数据的,这个时候为什么没有拉去到数据呢?然后我把topic换成别人的topic后,我发现消费速度就很快,很少进入:TimeUnit.MILLISECONDS.sleep(100);,这两个topic都是同一个kafka,都是5个partition。示例代码如下,期待您的回复:

import com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class T1 {

public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "102.31.117.188:9092");
    properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");

    KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new BytesDeserializer());
    consumer.subscribe(Lists.newArrayList("sw-segments"));
    try {
        while (true) {
            long l = System.currentTimeMillis();
            ConsumerRecords<String, Bytes> records = consumer.poll(100);
            long l1 = System.currentTimeMillis();
            if (records.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(100);
            } else {
                System.out.println(String.format("thread:%s,time: %s,size:%s time:%s", Thread.currentThread().getName(), System.currentTimeMillis() / 1000, records.count(), l1 - l));
            }
        }
    } finally {
        consumer.close();
    }
}
}
发表于 2020-09-09
添加评论

kafka是批量拉取消息的,你的topic可能没有另一个topic的持续和消息紧凑,你可以从头开始消费进行测试,保障消息充足。

技颠 -> 半兽人 3年前

我是一次性放了10000000这么多数据进去的,应该不存在生产者生产数据不足的情况。没有另一个topic的持续和消息紧凑,这个我怎么保证这个消息的紧凑呢?

半兽人 -> 技颠 3年前

如果消息充足的情况下,是不会进行任何休眠(5个分区确定平分了10000000消息吗,别有的分区是空了)
持续消息紧凑我指的是新的消息没有消费者拉取的快,就会为空,也就会触发你的休眠。

技颠 -> 半兽人 3年前

你好,我进行了从头开始消费测试,发现还是有一个数据的消费速度快,而我自己的数据慢了很多。

半兽人 -> 技颠 3年前

速度快慢不重要,因为消息体的大小有影响,重要的是没有休眠间断。

技颠 -> 半兽人 3年前

关键是消费速度快的那个topic的数据消息体要比速度慢的消息还要大一些(通过record.serializedValueSize())这个计算,通过测试发现确实在consumer.poll(100)这个操作之后,我自身的topic返回空的消息的频率要比别人的topic要多一些。还有一个,就是我的消息是有key的,他们的消息没有key,这个会有这么大影响吗?

技颠 -> 半兽人 3年前

你好,我发现和有没有key没有关系,数据都是一次性写入,也都是从头开始消费的,只是有时候我这么没有拉到数据的频率高一些(但实际是有数据的)。当没有拉到数据的时候,耗时就比较久一点,但实际上拉到数据的时候,每次拉取的时间就非常短。

技颠 -> 半兽人 3年前

返回的消息为空是因为org.apache.kafka.clients.consumer.KafkaConsumer#poll(org.apache.kafka.common.utils.Timer, boolean)中的timer超时了,但是这个topic的数据都是同一条数据(内容一样),正常的时候拉取500条只用1毫秒,但有时候会超时。有什么因素能导致这个频繁超时呢?

半兽人 -> 技颠 3年前

我给你解释一下poll(100)吧,客户端是主动向kafka拉取消息的,100是去向kafka拉取的频率时间,当每次去kafka,都能拿到消息,那么,这个100ms就不会起作用,但,当拉取的消息是空的时候,kafka客户端就会认为,kafka的消息已经被我消费完了,那我就隔一段时间(100ms)再去向kafka拿,这个就是所谓的kafka长轮询。

从你上面的配置来看,500条可能一个批次的消息了,如果拿了小与500条的,那是不是说明消息不够了,自然就会触发100ms的等待时间。

我不太清楚你的环境情况,分区副本情况,分区的分布情况,所以很难去帮你分析更多具体的信息,但是核心就是我上面说的。

你设置了自己的休眠时间,如果是测试目的,可以,如果你故意的,那就没必要了,你直接改100ms就可以了。

不过,你可以缩小你的测试范围,比如分区数,分区的副本数,然后分区在你集群中的分配情况,从1个分区开始往上进行测试,让场景先变得单一点。好定位问题。

技颠 -> 半兽人 3年前

好的,谢谢。我修改了每次拉取100条,效果是好一点了(1分钟能拉取的数据量更多了一些)。还是会有偶尔拉取为空的情况,我再找找为什么会有这个偶尔超时的情况。

你的答案

查看kafka相关的其他问题或提一个您自己的问题