kafka consumer不消费消息,consumer.poll(100)不返回数据

野心永恒 发表于: 2019-07-25   最后更新时间: 2021-09-16 19:58:41   8,546 游览

使用kafka consumer不消费消息,poll方法不返回数据

这是创建消费者的代码:

Properties props = new Properties();
props.put("bootstrap.servers", "172.16.0.93:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("zombie1"));

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  }
}

我使用命令,可以看到消息已经推送到了kafka

bin/kafka-console-consumer.sh --bootstrap-server 172.16.0.93:9092 --topic zombie1 --from-beginning

大神,有没有遇到这样的现象?求救啊

发表于 2019-07-25

刚才刚到了这篇https://www.orchome.com/1619 ,但是我这边是打开的,配置如下:

listeners=PLAINTEXT://172.16.0.93:9092

消费前输出如下:

[2019-07-25 14:47:34,604][org.apache.kafka.clients.consumer.ConsumerConfig:165] [INFO ] ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = test
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [172.16.0.93:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = latest

[2019-07-25 14:47:34,719][org.apache.kafka.common.utils.AppInfoParser:82] [INFO ] Kafka version : 0.9.0.1
[2019-07-25 14:47:34,719][org.apache.kafka.common.utils.AppInfoParser:83] [INFO ] Kafka commitId : 23c69d62a0cabf06
半兽人 -> 野心永恒 4年前

这样就可以了,你在往里面发消息,就能消费到了。

野心永恒 -> 半兽人 4年前

那就是说我之前发的消息都消费不到了,是吗?

半兽人 -> 野心永恒 4年前

嗯,你要调整一个参数,可以消费到之前的,但是没必要。
就跟订报纸一样,一开始报社是不知道你,从当你从订的那刻,报社就知道你了

野心永恒 -> 半兽人 4年前

哦哦。好的。谢谢大神

半兽人 -> 野心永恒 4年前

结贴吧。

你的答案

查看kafka相关的其他问题或提一个您自己的问题