为什么使用ConsumerRecords获取到的timeStamp为-1

提问说明

创建topic时用--config指定creataTime或者LogApppendTime获取timeStamp都为-1

server.proerties中已加入message.timestamp.type配置。获取到的timeStamp仍为-1






发表于: 25天前   最后更新时间: 25天前   游览量:135
上一条: 到头了!
下一条: 已经是最后了!

  • log.message.timestamp.type配置好后,你是怎么获取的?
    • public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "172.30.79.134:6667,172.30.79.135:6667,192.168.1.219:6667,192.168.1.220:6667,192.168.1.221:6667");
      //props.put("bootstrap.servers", "47.107.148.97:9092");
      //props.put("bootstrap.servers", "106.12.73.246:9092");
      //props.put("config","message.timestamp.type=LogAppendTime");
      props.put("group.id", "test-consumer-group");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      Consumer consumer = new KafkaConsumer(props);
      //TopicPartition partition01 = new TopicPartition("gcgTest7", 0);
      //consumer.assign(Arrays.asList(new TopicPartition[]{partition01}));
      //consumer.seek(partition01,10);
      consumer.subscribe(Arrays.asList(new String[]{"testGcg1"}));
      while (true) {
      //--消费100ms内的数据
      ConsumerRecords rs = consumer.poll(100);
      //--遍历得到的结果集
      for (ConsumerRecord r : rs) {
      //--得到信息 打印
      System.out.println("--topic:[" + r.topic() + "]"
      + "--partition:[" + r.partition() + "]"
      + "--offset:[" + r.offset() + "]"
      + "--timestamp:[" + r.timestamp() + "]"
      + "--key:[" + r.key() + "]"
      + "--value:[" + r.value() + "]--" +
      "type="+r.timestampType());}
      }
      }
        • 你kafka什么版本呢,默认情况下,不需要任何配置,时间戳是可以获取到的,除非你的kafka版本过低。
          你可以关注我的微信公众号,里面分享了一个完整的例子。