spring-kafka消费消息出现异常时,怎么处理?

想喝好几罐八宝粥的男孩 发表于: 2021-03-16   最后更新时间: 2021-03-16 14:59:49   3,199 游览

利用spring boot集成的kafka在消费kafka 消息时,如果出现kafka集群崩溃或数据库连接故障时,有什么好的方法进行消息逻辑幂等性处理吗?我用到的方法是一直重复消费该条消息,但是该处理逻辑会导致后面的消息出现严重挤压,如果将失败的消息发送到失败主题中去,会出现入库数据存在时差错误,目前kafka即可群数据量较大,不方便存储位移到redis或者数据库中,大家有什么好的方法吗?

发表于 2021-03-16
添加评论

首先,消费者不支持幂等的,要靠你自己来判断。

其次:

如果将失败的消息发送到失败主题中去,会出现入库数据存在时差错误
  1. 如果数据库有问题(阻塞),当下失败的消息就无法处理,那后续的消息也无法处理,你应该直到数据库恢复,重新消费这些失败的消息。

  2. 数据库是正常的,那你这个消息因为某种原因,就它失败了,放到异常topic中,我认为是合理的,当下,无论你反复的处理,也是异常的,人工干预一下即可。

一般只要粗暴的关注第方案1,停止offset提交,从失败的地方开始重新消费。

建议:不要写过多的逻辑,反而引起系统的不稳定(我们运行6、7年了,基本没出过异常)。

我们数据库是采用的HBASe数据库,当发送失败消息到另一个主题中时,可能会把原来hbase正确的数据替换掉,能否设置一下消费重试次数,如果达到次数重试阈值的话,则停止消费,或出现kafka集群故障或者网络延迟大规模故障的话,该方案显得可行吗?

我们目前数据库采用的是hbase数据库,采用方案发送失败消息到另一个主题中,让程序消费失败主题的数据,可能会存在将正确数据进行更新;目前我们的方案是一直seek某条位移消息,当kafka集群大规模故障或网络出现延迟故障时,kafka集群消息会出现一定的挤压,你说的方案和我目前采用的方案一致,现在能否设置一下重试消费次数阈值?如果达到重试阈值,就停止消费;若达不到则进行重试;麻烦请回复一下,这样是否妥当。

kafka消费者的消息是没有重试的,批量丢到客户端之后,标记消息是否已消费是靠offset:

  • 自动提交offset
  • 手动提交offset

如果没提交,那消费者重启,则还是从该位置进行消费(重复消费)。

所以一切的核心还是至于你的拿到消息之后,如果处理这条消息的。

所以你自己实现逻辑:

  • 发送失败,继续尝试去发送,直到你说的3次,你确实是可以停止消费(可能会导致消息堆积更严重,这个还要实践)。

最后,我还是认为失败应该是几年都不会发生的事情,如果有阻塞(业务),你肯定需要提前准备好相对充足的资源来满足你的业务。

你的答案

查看kafka相关的其他问题或提一个您自己的问题