如何实时监控kafka队列是否有新数据

克里斯蒂安 发表于: 2019-11-18   最后更新时间: 2019-11-18  

监控kafka队列是否有新数据进来

我有个人任务要从定时改为实时,有木有什么比较高效的办法能让我实时监听kafka的队列
只要有新数据进来我就立刻拿走,谢谢



您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka集群宕机一台后无法消费
下一条: kafka-simple-consumer-shell.sh脚本不存在了,如何解析__consumer_offsets日志

  • java客户端实例:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    

    其中

    consumer.poll(100);
    

    为长轮询,每个100ms去kafka里拉取消息,如果有,就将直接返回。

    例子来自:https://www.orchome.com/451

    • 我用的springboot自带的监听注解 @KafkaListener(topics = "test") ,这种是一监听到新消息就会收到消息,还是有个默认的拉取消息间隔时间?