kafka消费者只能消费前面几条数据,后面新生产的数据死活获取不到是什么原因?

today 发表于: 2021-03-04   最后更新时间: 2021-03-04 09:31:01   2,351 游览

kafka库里面已经存在200多条数据,但是用代码去消费,只能消费到前面6条。后面新的数据一直消费不到。有大佬遇到过吗?

发表于 2021-03-04
添加评论

兄弟,你得把你的核心代码贴出来呀,都不知道你是怎么做的,咋帮你定位问题呢。。

你可以尝试用官方的例子,直接消费,看看是否正常:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

参考来自:https://www.orchome.com/451

today -> 半兽人 3年前
class ConsumerRunner implements Runnable {

    private KafkaConsumer<String, String> consumer;

    private String clientId;

    ConsumerRunner(String clientId) {
        Properties props = new Properties();
        // kafka地址 brokers集群地址用,隔开
        props.put("bootstrap.servers", kafkaAddress);
        // groupid
        props.put("group.id", kafkaGroupId);
        // session超时时间
        props.put("session.timeout.ms", kafkaConfig.get("session.timeout.ms"));
        // 是否开启自动提交
        props.put("enable.auto.commit", kafkaConfig.get("enable.auto.commit"));
        // 自动提交的时间间隔
        props.put("auto.commit.interval.ms", kafkaConfig.get("auto.commit.interval.ms"));
        // key的解码方式
        props.put("key.deserializer", kafkaConfig.get("key.deserializer"));
        // value的解码方式
        props.put("value.deserializer", kafkaConfig.get("value.deserializer"));
        //security.protocol
        props.put("security.protocol", kafkaConfig.get("security.protocol"));
        // sasl.mechanism
        props.put("sasl.mechanism", kafkaConfig.get("sasl.mechanism"));
        String loginInfo = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + kafkaAccount + "\"  password=\"" + kafkaPassword + "\";";
        // sasl.jaas.config
        props.put("sasl.jaas.config", loginInfo);
        // Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项:latest, earliest, none
        props.put("auto.offset.reset", kafkaConfig.get("auto.offset.reset"));
        // 批量一次最大拉取数据量
        props.put("max.poll.records", kafkaConfig.get("max.poll.records"));
        // 心跳
        props.put("heartbeat.interval.ms", kafkaConfig.get("heartbeat.interval.ms"));
        // 处理逻辑最大时间
        props.put("max.poll.interval.ms", kafkaConfig.get("max.poll.interval.ms"));
        // 请求响应的最长等待时间
        props.put("request.timeout.ms", kafkaConfig.get("request.timeout.ms"));
        this.consumer = new KafkaConsumer<>(props);
        this.clientId = clientId;
    }

    @Override
    public void run() {
        try {
            Thread.currentThread().setName(clientId);
            consumer.subscribe(Collections.singleton(kafkaTopic));
            // 轮询
            while (GlobalVar.ThirdYCPoliceInstance.getInstance().isKafkaIsRunning() && !Thread.currentThread().isInterrupted()) {
                try {
                   ConsumerRecords<String, String> records = consumer.poll(100);
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        logger.info("partitionRecords:{}", JSON.toJSONString(partitionRecords));
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            logger.info("kafka监听器收到了消息:{}", String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s", record.topic(),
                                    record.partition(), record.offset(), record.key(), record.value()));
                          if (StringUtils.isNotBlank(record.value())) {
                               kafkaMessageService.wholeHandlerKafkaMsg(record.value());
                            }
                        }
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        logger.info("kafka lastOffSet:{}", lastOffset);
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                } catch (Exception e) {
                    logger.error("kafka消费失败!", e);
                }
            }
        } catch (Exception e) {
            logger.error("kafka消费运行失败!", e);
        } finally {
            consumer.close();
        }
    }
}
today -> 半兽人 3年前

这个是消费者日志里面的配置信息

[ConsumerConfig values: 
  auto.commit.interval.ms = 1500
  auto.offset.reset = earliest
  bootstrap.servers = [111.111.111.111:9092]
  check.crcs = true
  client.id = 
  connections.max.idle.ms = 540000
  enable.auto.commit = false
  exclude.internal.topics = true
  fetch.max.bytes = 52428800
  fetch.max.wait.ms = 500
  fetch.min.bytes = 1
  group.id = TestGroup
  heartbeat.interval.ms = 10000
  interceptor.classes = null
  internal.leave.group.on.close = true
  isolation.level = read_uncommitted
  key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
  max.partition.fetch.bytes = 1048576
  max.poll.interval.ms = 60000
  max.poll.records = 30
  metadata.max.age.ms = 300000
  metric.reporters = []
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
  receive.buffer.bytes = 65536
  reconnect.backoff.max.ms = 1000
  reconnect.backoff.ms = 50
  request.timeout.ms = 65000
  retry.backoff.ms = 100
  sasl.jaas.config = [hidden]
  sasl.kerberos.kinit.cmd = /usr/bin/kinit
  sasl.kerberos.min.time.before.relogin = 60000
  sasl.kerberos.service.name = null
  sasl.kerberos.ticket.renew.jitter = 0.05
  sasl.kerberos.ticket.renew.window.factor = 0.8
  sasl.mechanism = PLAIN
  security.protocol = SASL_PLAINTEXT
  send.buffer.bytes = 131072
  session.timeout.ms = 30000
  ssl.cipher.suites = null
  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
  ssl.endpoint.identification.algorithm = null
  ssl.key.password = null
  ssl.keymanager.algorithm = SunX509
  ssl.keystore.location = null
  ssl.keystore.password = null
  ssl.keystore.type = JKS
  ssl.protocol = TLS
  ssl.provider = null
  ssl.secure.random.implementation = null
  ssl.trustmanager.algorithm = PKIX
  ssl.truststore.location = null
  ssl.truststore.password = null
  ssl.truststore.type = JKS
  value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
]
半兽人 -> today 3年前

怀疑这段代码

long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
logger.info("kafka lastOffSet:{}", lastOffset);
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));

其中

partitionRecords.size()

这个明显不是offset的值,换成

consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
today -> 半兽人 3年前

这段代码我也有怀疑过。后面我注释掉。改成自动提交,也还是一样的效果

半兽人 -> today 3年前

查询一下,消费者组的消费情况,贴一下结果:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
today -> 半兽人 3年前

大佬。有没有别的办法确认?因为这个kafka是客户那边的。咨询了下他没给我

半兽人 -> today 3年前

你找一台安装kafka的机器,执行就可以了,会调到那里的。

today -> 半兽人 3年前

客户回了。提示错误。Error:Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException:Call(callName=findCoordinator,deadLineMs=1614796617143) timed out at 1614796617143 after 1 attempt(s)

today -> 半兽人 3年前

kafka运行的日志能看出点问题出来吗?我看日志好像一直在Preparing to read 0 bytes of data for partition 420502007003_1609815065726-0 with offset 6

半兽人 -> today 3年前

你的客户很敷衍那,我提供的命令肯定要修改成你们的ip和端口,消费者组也要换成你们的。
感觉他们直接拿着localhost:9092直接执行的。

today -> 半兽人 3年前

哈哈,是这样的。没办法,我们求着跟他们对接。。。大佬,我问下这种情况会不会是权限哪里有问题?因为刚开始请求客户kafka的时候,会提示Not authorized to access group:testGroup 后面客户说授权了我就可以拿到数据,但只能拿到几条

today -> 半兽人 3年前

大佬,还有辙吗?代码可以拿到消费信息或者拿到最新的偏移量是多少吗

半兽人 -> today 3年前

1、认证成功之后,就跟认证无关了。
2、后面,客户是不是没有新的消息产生那?
3、你的线程写的有问题,最好先在main方法里跑,如果有>1个线程在执行你的逻辑,会导致混乱。

today -> 半兽人 3年前

就是说我能拿到几条消息了应该就不是权限的问题了。客户是有新的消息产生的,有新推消息。我试过重置偏移量到最新开始读

consumer.poll(0); consumer.seekToBeginning(consumer.partitionsFor(kafkaTopic).stream() .map(partitionInfo -> new TopicPartition(kafkaTopic, partitionInfo.partition())) .collect(Collectors.toList()));

也还是读到6的偏移量那边。我再修改代码看看吧。谢谢了

today -> today 3年前

大佬,咨询下,有台能连上对方kafka的电脑,我安装个kafka。对方kafka是有账号验证的。能通过命令来查看到一些消费信息这些的吗?

半兽人 -> today 3年前

能,命令带上认证就可以了。

https://www.orchome.com/1960#item-0-4

周边几篇文章都是,选择正确的加密算法就可以了。

你的答案

查看kafka相关的其他问题或提一个您自己的问题