Kafka消费者拉取消息可否对消息不进行处理,而是让同消费组的其他消费者处理消息?

傍窗观雨淞 发表于: 2021-04-22   最后更新时间: 2021-04-22 13:08:05   155 游览

场景:一个主从的分布式系统中, leader节点是往kafka中发送消息,follower节点是从kafka中消费消息。现在假设leader节点宕机, 我希望从follower节点中选中新的leader节点,但是因为了follower节点其中拉取大量的消息正在处理,follower节点成为新的leader节点时可能还有很多消息拉取了,但是还没来得及处理(我是想设置手动提交,消息处理好之后进行提交),这时候,我希望这些消息能让同消费者组的其他消费者来消费, 我应该如何做?有什么好办法吗?

我个人的理解是只有当follower节点宕机时,kafka才会重平衡,这些消息就可以被消费到,但现在的问题是,followe节点并没有宕机而是变成leader节点。



发表于 19天前

我需要明确几个问题。
1、leader和follower指的是你自己的程序,里面leader用于生产,follower用于消费?
2、如果我上面理解的没错的话,你后面说的我就完全理解不了。

是的,我自己的分布式的leader节点,和followe节点。

我个人的理解是只有当follower节点宕机时,kafka才会重平衡,这些消息就可以被消费到,但现在的问题是,followe节点并没有宕机而是变成leader节点。

那这句的意思是?

另外我为了提高消费消息的速度,采用的方式是将拉取和消息处理提交解耦开来的,单线程去拉取消息,多线程去处理消息,之后再进行手动提交

假设场景,当leader宕机之后,follwer进入选举期:

  • 选举期间,停止所有消费
  • 选举期间,继续消费

ps:因为follower也不知道谁即将成为leader,只有2种情况

我觉得,消费者一次拉取2000-3000条消息,处理完成之后,才会继续拉取,所以当一个follower成为leader期间,顶多也就一个批次的消息需要处理了,成为leader之后,关闭消费就好了,个人觉得这个是最好的方式。

我有一个leader节点,多个follower节点,这些follower节点在同一个消费组里面,我在follower节点消费消息是采用拉取和处理消息解耦开的方式,单线程去拉取消息,多线程去处理消息,之后再进行手动提交。 这里我假设的场景是leader节点宕机,我需要从follower节点选出新的leader节点,而leader只负责生产消息,所以从follower节点转变为leader节点之前,我需要把follower节点中那些拉取了,但是没处理的消息进行处理,处理的逻辑我希望是这些消息可以让同消费组的其他follower节点去处理这些消息。

单线程拉取消息是对的,因为一个批次的消息2000-3000条(基于消息大小),然后丢给多线程并行处理。但是你提交offset是基于下标的,并行处理你就不要关心是否处理成功了,让业务去处理。主进程(你的单进程拉取消息的),就拉取分发,提交offset。

我的follower节点是计算密集型的, 可能处理2000-3000条消息要一定时间,是否应该减少单批次拉取的消息数目?

你的意思是不提交follower成为了leader,拉取到的消息也不处理了,让给别的消费者处理?
如果是这样的话,你不提交offset就好了。

是的, 也就是说直接对于未处理的消息不提交offset, 然后直接关闭kafka consumer, 因为consumer数目变化, kafka会重平衡,这些消息又可以被其他follower节点消费到了。

是这样吗?

好的,谢谢! :smile:

请问一下,主线程的提交你有什么建议, 因为我是在多线程中处理的,消息处理完成可能是,提交的offset也可能是乱序的,但是我看kafkaConsumer的api同步提交只有这个方法commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets), key为topic和partition, value为offset, 也就是说,我的对于同一个topic的同一个partition, 一次提交只能提交一个offset吗? 是不是我的理解有误? 或者说在此场景下,有没有更好的提交offset的办法?

找不到想要的答案?

我要提问
相关
提问