你的kafka数据是不是写到/tmp
目录了,这个目录会被系统给回收。
先用命令消费一下topic,看看是否还有数据在。
(ps:我在想你seek之后,消费者对象是不是要重新创建。)
你好,我现在碰到一个问题,current-offset=484338,log-end-offset=494902,lag=10564,但客户端无法poll任何数据,也不阻塞。
topic只有一个0分区,分组也是原来的分组,max-poll-records=10,每条消息处理成功后,我会手动提交offset,代码:consumer.commitSync(offsets);如果处理失败,则会seek回原来的offset,代码: consumer.seek(new TopicPartition(record.topic(), partition),offset);,然后break循环重新poll,循环往复。
之前测试好好的,结果第二天上班之后,就再也poll不到数据了,重启程序也不行,请问大佬,问题出在哪呢?代码如下:
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
System.out.println("reading...");
while (recordIterator.hasNext()){
ConsumerRecord<String, String> record = recordIterator.next();
String key = record.key();
String value = record.value();
long offset = record.offset();
int partition = record.partition();
try {
if (offset == 25) {
int i = 1 / 0;
}
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
offsets.put(new TopicPartition(record.topic(), partition), new OffsetAndMetadata(offset+1));
consumer.commitSync(offsets);
System.out.println("partition:" + partition + ",offset:" + offset + ",key:" + key+ ",value:" + value);
}
catch (Exception e) {
System.out.println(offset+":"+e.getMessage());
consumer.seek(new TopicPartition(record.topic(), partition),offset);
Thread.sleep(1000);
break;
}
}
}