kafka 从seek的offset开始读取数据,这时服务器断开,该如何从中断的offset这里开始读取呢?

Game ID:白烛葵 发表于: 2021-03-16   最后更新时间: 2021-03-16 17:27:41   143 浏览
0

kafka consumer通过seek的方式可以定位到历史数据,并从那里开始读取。在读取一段时间后服务器自动重启了。如果重新运行这个程序的话,又会seek到之前的位置重复消费。我该如何从上次程序中断时的offset那里继续消费呢?

package com.example.demo.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;

/**
 * Demo Kafka consumer for the {@code tstest} topic, using manual partition
 * assignment and auto-commit.
 *
 * <p>Two entry modes are provided:
 * <ul>
 *   <li>{@link #hello()} assigns every partition and then seeks each one to the
 *       first offset whose timestamp is at or after {@link #START_TIME}. Because
 *       it seeks on every start, re-running it replays from that timestamp.</li>
 *   <li>{@link #hi()} assigns every partition and polls without seeking, so the
 *       starting position is left to the broker-side committed offset for the
 *       group (or to {@code auto.offset.reset} when none exists).</li>
 * </ul>
 */
public class Consumer {

    private static final String TOPIC = "tstest";
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
    /** Wall-clock time (JVM default time zone) that {@link #hello()} rewinds to. */
    private static final String START_TIME = "2021-03-16 17:16:00";
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);

    public static void main(String[] args) throws ParseException {
        // Call hello() instead to replay from START_TIME; hi() continues without seeking.
        hi();
    }

    /**
     * Converts a {@code yyyy-MM-dd HH:mm:ss} string to epoch milliseconds.
     *
     * @param time the time string, interpreted in the JVM default time zone
     * @return milliseconds since the epoch
     * @throws ParseException if {@code time} does not match the pattern
     */
    private static long timeToTimestamp(String time) throws ParseException {
        return new SimpleDateFormat(TIME_PATTERN).parse(time).getTime();
    }

    /**
     * Consumes the topic starting from {@link #START_TIME}: assigns all
     * partitions, seeks each to the offset resolved for that timestamp, then
     * polls and prints records until the process is stopped or polling fails.
     *
     * @throws ParseException if {@link #START_TIME} cannot be parsed
     */
    private static void hello() throws ParseException {
        long fetchDataTime = timeToTimestamp(START_TIME);
        // try-with-resources guarantees the consumer is closed if anything below throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildProps())) {
            List<TopicPartition> topicPartitions = assignAllPartitions(consumer, TOPIC);
            seekToTimestamp(consumer, topicPartitions, fetchDataTime);
            pollForever(consumer);
        }
    }

    /**
     * Consumes the topic without seeking: assigns all partitions, then polls
     * and prints records until the process is stopped or polling fails.
     *
     * @throws ParseException never thrown by the current body; kept so the
     *                        signature stays compatible with existing callers
     */
    private static void hi() throws ParseException {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildProps())) {
            assignAllPartitions(consumer, TOPIC);
            pollForever(consumer);
        }
    }

    /** Builds the consumer configuration shared by both entry modes. */
    private static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "tstest");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /**
     * Looks up every partition of {@code topic} and assigns them all to the consumer.
     *
     * @return the assigned partitions
     * @throws IllegalStateException if no partition metadata is available for the topic
     */
    private static List<TopicPartition> assignAllPartitions(KafkaConsumer<String, String> consumer,
                                                            String topic) {
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        // Fail with a clear message instead of an NPE here or an opaque error from poll().
        if (partitionInfos == null || partitionInfos.isEmpty()) {
            throw new IllegalStateException("No partitions found for topic " + topic);
        }
        List<TopicPartition> topicPartitions = new ArrayList<>(partitionInfos.size());
        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        consumer.assign(topicPartitions);
        return topicPartitions;
    }

    /**
     * Seeks each given partition to the offset the broker resolves for
     * {@code timestampMs}, printing the chosen offset per partition.
     * Partitions for which no offset is resolved are left unseeked.
     */
    private static void seekToTimestamp(KafkaConsumer<String, String> consumer,
                                        List<TopicPartition> topicPartitions,
                                        long timestampMs) {
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (TopicPartition topicPartition : topicPartitions) {
            timestampsToSearch.put(topicPartition, timestampMs);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsetsByPartition =
                consumer.offsetsForTimes(timestampsToSearch);

        DateFormat df = new SimpleDateFormat(TIME_PATTERN);
        System.out.println("开始设置各分区初始偏移量...");
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsByPartition.entrySet()) {
            // The value is null when the requested time is later than the newest
            // indexed record timestamp in that partition.
            OffsetAndTimestamp offsetTimestamp = entry.getValue();
            if (offsetTimestamp == null) {
                continue;
            }
            long offset = offsetTimestamp.offset();
            System.out.println("partition = " + entry.getKey().partition() +
                    ", time = " + df.format(new Date(offsetTimestamp.timestamp())) +
                    ", offset = " + offset);
            // Set the position the next poll will read from.
            consumer.seek(entry.getKey(), offset);
        }
        System.out.println("设置各分区初始偏移量结束...");
    }

    /** Polls in an endless loop and prints every record received. */
    private static void pollForever(KafkaConsumer<String, String> consumer) {
        while (true) {
            // poll(Duration) replaces the deprecated poll(long) overload.
            ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}


发表于 1月前

  • 不要seek了,如果你提交了offset,消费者组重新启动会从最后一次提交的地方开始。

    • 我也感觉是这样的,但是实际上跑出来没有从上次的地方继续,是参数哪里设置错误了吗。

      我把consumer的代码贴上。首先执行了hello,让它从5点16的数据开始消费。然后按了停止,改为执行hi。好像因为auto.offset.reset默认是latest所以没有消费到数据。

        • GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
          tstest ts 0 3350064 3350064 0 - - -
          tstest tstest 2 3333998 3333998 0 - - -
          tstest tstest 1 3333398 3333398 0 - - -
          tstest tstest 0 3332604 3332604 0 - - -
          tstest ts 2 3350248 3350248 0 - - -
          tstest ts 1 3349688 3349688 0 - - -

          • 找不到想要的答案?

            我要提问
            相关