使用Acknowledgment.nack来暂停kafka消费,如果DB挂了

一如乞人不需要形象 发表于: 2023-01-05   最后更新时间: 2023-01-05 10:50:32   701 游览

我们现在有个业务,消费kafka持久化到DB,此时如果DB挂了,消费会报错,期望的是,每次拉完数据,检测DB是否正常,如果DB挂了,那么等待一段时间再重新拉数据,可以用Acknowledgment.nack来实现吗?例如:

public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    if (isConsumeNeedStop) {
        ack.nack(0, 10000);
        LOGGER.info(errorInfo);
    } else {
        try {
            records.forEach(record -> consume(record));
        } finally {
            ack.acknowledge();
        }
    } 
}

如果可以的话,nack方法的第一个参数填什么?

发表于 2023-01-05

如果没报超时,可以实现:

nack的方法

  • nack(long sleep)
  • nack(int index, long sleep).

index我在想是不是一个消息批次的offset位置,就是说一个批次比如有1000条,你是全部重复消费还是怎么样。
我现在没办法尝试,我希望你测试后可以给我分享下结果。

这边尝试过的同事跟我反馈是,offset设置0,就是全部重试,offset是多少,就丢掉多少数据

那你应该是完美解决你的问题了,可以通过record.offset()定位你当前失败的位置。

你的答案

查看kafka相关的其他问题或提一个您自己的问题