靠谱的人

0 声望

这家伙太懒,什么都没留下

个人动态
  • 半兽人 回复 靠谱的人kafka消费者Java客户端 中 :

    到问题专区里提问吧,可以深入。

    3年前
  • 半兽人 回复 靠谱的人kafka消费者Java客户端 中 :

    你的kafka数据是不是写到/tmp目录了,这个目录会被系统给回收。
    先用命令消费一下topic,看看是否还有数据在。
    (ps:我在想你seek之后,消费者对象是不是要重新创建。)

    3年前
  • 靠谱的人 回复 半兽人kafka消费者Java客户端 中 :

    你好,我现在碰到一个问题,current-offset=484338,log-end-offset=494902,lag=10564,但客户端无法poll任何数据,也不阻塞。

    topic只有一个0分区,分组也是原来的分组,max-poll-records=10,每条消息处理成功后,我会手动提交offset,代码:consumer.commitSync(offsets);如果处理失败,则会seek回原来的offset,代码: consumer.seek(new TopicPartition(record.topic(), partition),offset);,然后break循环重新poll,循环往复。

    之前测试好好的,结果第二天上班之后,就再也poll不到数据了,重启程序也不行,请问大佬,问题出在哪呢?代码如下:

    while (true){
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
        System.out.println("reading...");
        while (recordIterator.hasNext()){
            ConsumerRecord<String, String> record = recordIterator.next();
            String key = record.key();
            String value = record.value();
            long offset = record.offset();
            int partition = record.partition();
    
            try {
                if (offset == 25) {
                    int i = 1 / 0;
                }
    
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
                offsets.put(new TopicPartition(record.topic(), partition), new OffsetAndMetadata(offset+1));
                consumer.commitSync(offsets);
    
                System.out.println("partition:" + partition + ",offset:" + offset + ",key:" + key+ ",value:"  + value);
            }
            catch (Exception e) {
                System.out.println(offset+":"+e.getMessage());
                consumer.seek(new TopicPartition(record.topic(), partition),offset);
                Thread.sleep(1000);
                break;
            }
        }
    }
    
    3年前