Kafka consumer: after seeking by timestamp, offset commits always fail

流苏 posted: 2019-10-21   last updated: 2019-10-22

After I make a Kafka consumer start from a given timestamp, it keeps failing to commit offsets, so whenever I start another consumer it reads from the very beginning of the topic instead of continuing from where the previous one stopped. Could someone take a look? Many thanks!

// Auto-commit is enabled:  spring.kafka.consumer.enable-auto-commit=true
@RequestMapping(value = "/incrementConsumer")
public void incrementConsumer() {
    ConsumerFactory consumerFactory = containerFactory.getConsumerFactory();
    Map<String, Object> properties = consumerFactory.getConfigurationProperties();

    for (Object key : properties.keySet()) {
        System.out.println("key:" + key + "---" + "value:" + properties.get(key));
    }

    Consumer consumer = consumerFactory.createConsumer();
    Map<TopicPartition, Long> startMap = new HashMap<>();
    List<PartitionInfo> partitionInfoList = consumer.partitionsFor("testIncrement");
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (PartitionInfo par : partitionInfoList) {
        topicPartitions.add(new TopicPartition(par.topic(), par.partition()));
        startMap.put(new TopicPartition("testIncrement", par.partition()), TableMessage.all_get_time.getTime());
    }

    DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    consumer.assign(topicPartitions);
    Map<TopicPartition, OffsetAndTimestamp> startOffsetMap = consumer.offsetsForTimes(startMap);
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
    System.out.println("startMap====" + startMap);
    System.out.println("startOffsetMap====" + startOffsetMap);

    OffsetAndTimestamp offsetTimestamp = null;

    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : startOffsetMap.entrySet()) {
        // If the requested timestamp is later than that of the newest indexed record,
        // the value for the partition is null
        offsetTimestamp = entry.getValue();
        if (offsetTimestamp != null) {
            int partition = entry.getKey().partition();
            long timestamp = offsetTimestamp.timestamp();
            long offset = offsetTimestamp.offset();
            System.out.println("partition = " + partition +
                    ", time = " + df.format(new Date(timestamp)) +
                    ", offset = " + offset);
            // Seek to the offset resolved from the timestamp
            consumer.seek(entry.getKey(), offset);
        } else {
            consumer.seek(entry.getKey(), endOffsets.get(entry.getKey()));
        }
    }

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
        ObjectMapper objectMapper = new ObjectMapper();
        System.out.print(records.count());
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key() + record.value());
        }
    }
}

Here is the error output:


ERROR [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition topic01-0 at offset 205: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

WARN [Consumer clientId=consumer-1, groupId=console-consumer-56648] Asynchronous auto-commit of offsets {topic01-0=OffsetAndMetadata{offset=205, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

I also tried committing manually, but the commit call itself throws:

consumer.commitSync(); // this line throws

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
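The error message itself names the two knobs to turn: a longer `max.poll.interval.ms` or fewer records per `poll()`. A hedged sketch of what that tuning might look like in a Spring Boot `application.properties` (the property names are the standard Spring Kafka / Kafka consumer settings; the values are illustrative only, not recommendations):

```properties
# Allow more time between poll() calls before the group coordinator
# considers this consumer dead (Kafka's default is 300000 ms = 5 minutes)
spring.kafka.consumer.properties.max.poll.interval.ms=600000
# Fetch fewer records per poll() so each batch is processed well within that window
spring.kafka.consumer.max-poll-records=100
```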



  • Friend, after seeking by timestamp, the consumer starts consuming each partition from the offset that corresponds to that timestamp.
    What do you mean by "reopening" a consumer?

    • My requirement is this: I start a consumer from a given timestamp; if that consumer crashes, I start another one, which needs to resume reading from the offset where the crash happened. But the first consumer can never commit its offsets, whether automatically or manually, so the consumer I start afterwards always reads from the very beginning of the topic instead of resuming from the crash point.

        • 1. Reading messages by timestamp is a failure-recovery mechanism for re-reading messages, not part of a normal business flow; this scenario is almost never needed.
          2. Many people over-engineer the client side, ending up with a design far more complex than anything they actually use.
          3. As long as a consumer is shut down cleanly, no messages are lost when it goes down (kill -9 can lose some).
          Conclusion: deploy consumers the ordinary way. For important business, reduce the number of records fetched per poll; then even in a physical-level failure you lose at most a few records (at some cost in performance).

          As for the commit error: by the time you commit the offset, the consumer has already been removed from the group and a rebalance is in progress, so the commit cannot complete (the default timeout is 30 seconds).
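The advice above boils down to committing while the consumer is still a live group member, i.e. calling commitSync() right after each poll() inside the loop rather than after it. A minimal sketch of that pattern using the kafka-clients MockConsumer (a broker-free test double shipped with the client library, so this runs without a cluster); the topic name and record contents are made up for illustration:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class CommitInLoopSketch {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("testIncrement", 0);

        // MockConsumer stands in for the real KafkaConsumer; no broker needed.
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(Collections.singletonList(tp));

        Map<TopicPartition, Long> beginning = new HashMap<>();
        beginning.put(tp, 0L);
        consumer.updateBeginningOffsets(beginning);

        // Pretend the broker already holds three records at offsets 0..2.
        consumer.addRecord(new ConsumerRecord<>("testIncrement", 0, 0L, "k0", "v0"));
        consumer.addRecord(new ConsumerRecord<>("testIncrement", 0, 1L, "k1", "v1"));
        consumer.addRecord(new ConsumerRecord<>("testIncrement", 0, 2L, "k2", "v2"));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key() + "=" + record.value());
        }
        // Commit immediately after processing the batch, while still a live member.
        consumer.commitSync();

        // A consumer restarted later would resume from this committed position.
        long committed = consumer.committed(Collections.singleton(tp)).get(tp).offset();
        System.out.println("committed offset = " + committed);  // 3
    }
}
```

With a real KafkaConsumer the shape is the same: poll, process, commitSync, repeat; the commit then always happens well inside `max.poll.interval.ms`, so the group coordinator still recognizes the member.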