kafka消费者只能消费前面几条数据,后面新生产的数据死活获取不到是什么原因?

today 发表于: 2021-03-04   最后更新时间: 2021-03-04 09:31:01   235 游览
0

kafka库里面已经存在200多条数据,但是用代码去消费,只能消费到前面6条。后面新的数据一直消费不到。有大佬遇到过吗?



发表于 1月前

  • 兄弟,你得把你的核心代码贴出来呀,都不知道你是怎么做的,咋帮你定位问题呢。。

    你可以尝试用官方的例子,直接消费,看看是否正常:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    

    参考来自:https://www.orchome.com/451

    • class ConsumerRunner implements Runnable {
      
          private KafkaConsumer<String, String> consumer;
      
          private String clientId;
      
          ConsumerRunner(String clientId) {
              Properties props = new Properties();
              // kafka地址 brokers集群地址用,隔开
              props.put("bootstrap.servers", kafkaAddress);
              // groupid
              props.put("group.id", kafkaGroupId);
              // session超时时间
              props.put("session.timeout.ms", kafkaConfig.get("session.timeout.ms"));
              // 是否开启自动提交
              props.put("enable.auto.commit", kafkaConfig.get("enable.auto.commit"));
              // 自动提交的时间间隔
              props.put("auto.commit.interval.ms", kafkaConfig.get("auto.commit.interval.ms"));
              // key的解码方式
              props.put("key.deserializer", kafkaConfig.get("key.deserializer"));
              // value的解码方式
              props.put("value.deserializer", kafkaConfig.get("value.deserializer"));
              //security.protocol
              props.put("security.protocol", kafkaConfig.get("security.protocol"));
              // sasl.mechanism
              props.put("sasl.mechanism", kafkaConfig.get("sasl.mechanism"));
              String loginInfo = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                      + kafkaAccount + "\"  password=\"" + kafkaPassword + "\";";
              // sasl.jaas.config
              props.put("sasl.jaas.config", loginInfo);
              // Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项:latest, earliest, none
              props.put("auto.offset.reset", kafkaConfig.get("auto.offset.reset"));
              // 批量一次最大拉取数据量
              props.put("max.poll.records", kafkaConfig.get("max.poll.records"));
              // 心跳
              props.put("heartbeat.interval.ms", kafkaConfig.get("heartbeat.interval.ms"));
              // 处理逻辑最大时间
              props.put("max.poll.interval.ms", kafkaConfig.get("max.poll.interval.ms"));
              // 请求响应的最长等待时间
              props.put("request.timeout.ms", kafkaConfig.get("request.timeout.ms"));
              this.consumer = new KafkaConsumer<>(props);
              this.clientId = clientId;
          }
      
          @Override
          public void run() {
              try {
                  Thread.currentThread().setName(clientId);
                  consumer.subscribe(Collections.singleton(kafkaTopic));
                  // 轮询
                  while (GlobalVar.ThirdYCPoliceInstance.getInstance().isKafkaIsRunning() && !Thread.currentThread().isInterrupted()) {
                      try {
                         ConsumerRecords<String, String> records = consumer.poll(100);
                          for (TopicPartition partition : records.partitions()) {
                              List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                              logger.info("partitionRecords:{}", JSON.toJSONString(partitionRecords));
                              for (ConsumerRecord<String, String> record : partitionRecords) {
                                  logger.info("kafka监听器收到了消息:{}", String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s", record.topic(),
                                          record.partition(), record.offset(), record.key(), record.value()));
                                if (StringUtils.isNotBlank(record.value())) {
                                     kafkaMessageService.wholeHandlerKafkaMsg(record.value());
                                  }
                              }
                              long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                              logger.info("kafka lastOffSet:{}", lastOffset);
                              consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                          }
                      } catch (Exception e) {
                          logger.error("kafka消费失败!", e);
                      }
                  }
              } catch (Exception e) {
                  logger.error("kafka消费运行失败!", e);
              } finally {
                  consumer.close();
              }
          }
      }
      
        • 这个是消费者日志里面的配置信息

          [ConsumerConfig values: 
            auto.commit.interval.ms = 1500
            auto.offset.reset = earliest
            bootstrap.servers = [111.111.111.111:9092]
            check.crcs = true
            client.id = 
            connections.max.idle.ms = 540000
            enable.auto.commit = false
            exclude.internal.topics = true
            fetch.max.bytes = 52428800
            fetch.max.wait.ms = 500
            fetch.min.bytes = 1
            group.id = TestGroup
            heartbeat.interval.ms = 10000
            interceptor.classes = null
            internal.leave.group.on.close = true
            isolation.level = read_uncommitted
            key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
            max.partition.fetch.bytes = 1048576
            max.poll.interval.ms = 60000
            max.poll.records = 30
            metadata.max.age.ms = 300000
            metric.reporters = []
            metrics.num.samples = 2
            metrics.recording.level = INFO
            metrics.sample.window.ms = 30000
            partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
            receive.buffer.bytes = 65536
            reconnect.backoff.max.ms = 1000
            reconnect.backoff.ms = 50
            request.timeout.ms = 65000
            retry.backoff.ms = 100
            sasl.jaas.config = [hidden]
            sasl.kerberos.kinit.cmd = /usr/bin/kinit
            sasl.kerberos.min.time.before.relogin = 60000
            sasl.kerberos.service.name = null
            sasl.kerberos.ticket.renew.jitter = 0.05
            sasl.kerberos.ticket.renew.window.factor = 0.8
            sasl.mechanism = PLAIN
            security.protocol = SASL_PLAINTEXT
            send.buffer.bytes = 131072
            session.timeout.ms = 30000
            ssl.cipher.suites = null
            ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
            ssl.endpoint.identification.algorithm = null
            ssl.key.password = null
            ssl.keymanager.algorithm = SunX509
            ssl.keystore.location = null
            ssl.keystore.password = null
            ssl.keystore.type = JKS
            ssl.protocol = TLS
            ssl.provider = null
            ssl.secure.random.implementation = null
            ssl.trustmanager.algorithm = PKIX
            ssl.truststore.location = null
            ssl.truststore.password = null
            ssl.truststore.type = JKS
            value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
          ]
          
            • 怀疑这段代码

              long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
              logger.info("kafka lastOffSet:{}", lastOffset);
              consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
              

              其中

              partitionRecords.size()
              

              这个明显不是offset的值,换成

              consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
              
                • 查询一下,消费者组的消费情况,贴一下结果:

                  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
                  
                    • 客户回了。提示错误。Error:Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException:Call(callName=findCoordinator,deadLineMs=1614796617143) timed out at 1614796617143 after 1 attempt(s)

                        • kafka运行的日志能看出点问题出来吗?我看日志好像一直在Preparing to read 0 bytes of data for partition 420502007003_1609815065726-0 with offset 6

                            • 你的客户很敷衍那,我提供的命令肯定要修改成你们的ip和端口,消费者组也要换成你们的。
                              感觉他们直接拿着localhost:9092直接执行的。

                                • 哈哈,是这样的。没办法,我们求着跟他们对接。。。大佬,我问下这种情况会不会是权限哪里有问题?因为刚开始请求客户kafka的时候,会提示Not authorized to access group:testGroup 后面客户说授权了我就可以拿到数据,但只能拿到几条

                                    • 1、认证成功之后,就跟认证无关了。
                                      2、后面,客户是不是没有新的消息产生那?
                                      3、你的线程写的有问题,最好先在main方法里跑,如果有>1个线程在执行你的逻辑,会导致混乱。

                                        • 就是说我能拿到几条消息了应该就不是权限的问题了。客户是有新的消息产生的,有新推消息。我试过重置偏移量到最新开始读

                                          consumer.poll(0); consumer.seekToBeginning(consumer.partitionsFor(kafkaTopic).stream() .map(partitionInfo -> new TopicPartition(kafkaTopic, partitionInfo.partition())) .collect(Collectors.toList()));
                                          

                                          也还是读到6的偏移量那边。我再修改代码看看吧。谢谢了

                                            • 大佬,咨询下,有台能连上对方kafka的电脑,我安装个kafka。对方kafka是有账号验证的。能通过命令来查看到一些消费信息这些的吗?

                                              • 找不到想要的答案?

                                                我要提问
                                                相关