// SASL/PLAIN authentication over a plaintext (unencrypted) connection.
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
// NOTE(review): credentials are concatenated into the JAAS string; a quote or
// backslash in userName/password would break the config — confirm inputs are sanitized.
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule "
+ "required username=\"" + userName + "\" password=\"" + password + "\";");
props.put("bootstrap.servers", bootstrapServer);
// Consumer group id — offsets are committed per group.
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
// Auto-commit interval in milliseconds. NOTE(review): the original comment
// claimed the Kafka default is 1000 ms; the actual broker/client default for
// auto.commit.interval.ms is 5000 ms.
props.put("auto.commit.interval.ms", "1000");
// NOTE(review): the original comment said "consume all messages from the
// beginning of the topic", but the configured value is "latest": when no valid
// committed offset exists (e.g. after offset expiry), the consumer starts from
// the END of the partition, not the beginning.
props.put("auto.offset.reset", "latest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
void start() {
IS_RUNNING = true;
KafkaConsumer<string, string=""> kafkaConsumer = new KafkaConsumer<>(
bulidProperties(bootstrapServer, this.groupId));
kafkaConsumer.subscribe(Arrays.asList(topic));
Runnable runnable = () -> {
try {
while (IS_RUNNING) {
ConsumerRecords<string, string=""> records = kafkaConsumer.poll(100);
for (ConsumerRecord<string, string=""> record : records) {
messageCallback.notify(String.format("%s_comsumer", topic), record.key(), record.value());
}
}
} finally {
kafkaConsumer.close();
}
};
Thread consumeThrd = new Thread(runnable, String.format("%s_comsumer", topic));
consumeThrd.start();
}
Kafka 版本:2.11-0.11.0.0(即 Scala 2.11 / Kafka 0.11.0.0)。
您好,目前遇到一个问题:消费者处于正常消费状态时,如果单独重启 Kafka 和 ZooKeeper,重启后就会重复消费最近 7 天的数据,也就是默认日志保留配置(168 小时)范围内的全部数据。无论是在 broker 端设置 offsets.retention.minutes=14400,还是调整消费端配置,只要重启就会重新消费这 168 小时的数据,实在搞不懂是什么原因?