Perry

0 声望

这家伙太懒,什么都没留下

个人动态
  • 半兽人 回复 Perrykafka消费者Java客户端 中 :

    我看配置没问题,你用官方的例子跑一下看看,别线程

    4年前
  • Perrykafka消费者Java客户端 发表评论:
    props.put("security.protocol", "SASL_PLAINTEXT");
               props.put("sasl.mechanism", "PLAIN");
               props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule "
                        + "required username=\"" + userName + "\" password=\"" + password + "\";");                
                   props.put("bootstrap.servers", bootstrapServer);
            // 分组
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            // 自动提交间隔(毫秒),默认为1000
            props.put("auto.commit.interval.ms", "1000");
            // 从topic的开始位置消费所有消息.
            props.put("auto.offset.reset", "latest");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     void start() {
            IS_RUNNING = true;
            KafkaConsumer<string, string=""> kafkaConsumer = new KafkaConsumer&lt;&gt;(
                    bulidProperties(bootstrapServer, this.groupId));
            kafkaConsumer.subscribe(Arrays.asList(topic));
            Runnable runnable = () -&gt; {
                try {
                    while (IS_RUNNING) {
                        ConsumerRecords<string, string=""> records = kafkaConsumer.poll(100);
                        for (ConsumerRecord<string, string=""> record : records) {
                            messageCallback.notify(String.format("%s_comsumer", topic), record.key(), record.value());
                        }
                    }
                } finally {
                    kafkaConsumer.close();
                }
            };
            Thread consumeThrd = new Thread(runnable, String.format("%s_comsumer", topic));
            consumeThrd.start();
        }
    

    kafka版本2.11-0.11.0.0
    您好,目前产生一个问题,正常消费状态,如果单独把kafka,zk重启,会重复消费7天数据,也就是默认配置的168小时的数据,无论设置offsets.retention.minutes=14400,还是设置消费端,只要重启就会重新消费168小时的数据,实在搞不懂为什么了?

    4年前