kafka如何做到先启动producer生产消息,然后再启动consumer消费消息呢?

素墨月羽 发表于: 2019-06-21   最后更新时间: 2019-06-29  

标题可能描述的不清楚,具体情况是我现在可以消费到消息,但是只能先把consumer启动起来,并且在轮询时间内由producer发送消息(或者通过命令行发送消息也可以),这样consumer才能接收到消息,consumer的auto.offset.reset未设置,如果设置成earliest是可以不启动producer,只启动consumer直接拉取全部消息的。

我以为consumer是poll模式拉取消息,那应该producer生产的消息,只要没有被其他consumer消费掉(在我本机调试,可以保证没有其他消费者),consumer启动就可以直接拉取到offset至HW之间的消息才是吧。

比较典型的应用场景就是比如线上的consumer挂了,那重启consumer之后应该能直接拉取到宕机这段时间生产者消费的消息吧,但是目前我好像做不到。

producer代码如下

public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer producer = new KafkaProducer(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord record = new ProducerRecord<>("test", String.valueOf(i));
            try {
                producer.send(record).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.print("消息已发送");
    }

consumer代码如下

public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
                , "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
                , "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList("test"));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(50000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.print(record.value());
        }

        System.out.print("消费消息");
    }


您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容





发表于: 2月前   最后更新时间: 1月前   游览量:251
上一条: kafka消费topic过程中出现的个别bug情况如何处理?
下一条: 1个kafka client 访问2个不同的有验证权限的kafka 服务,java.security.auth.login.config 该如何配置,以满足2个配置文件

  • 顺序不能变,比如订报纸,当消费者向报社告知他需要的时候,报社才会知道有这个消费者,所以之后的消息会发给这个消费者。

    • 明白了,大佬举的列子可以说是非常形象生动了,所以线上使用kafka的话,单机consumer肯定是不行的吧,一旦挂了,肯定会丢一些消息了