Kafka消费端,配置设置为手动提交时,单线程拉取消息,多线程处理消息,然后单线程手动提交时,如何提交?

傍窗观雨淞 发表于: 2021-04-27   最后更新时间: 2021-05-12 14:01:39   1,417 游览

由于处理消息是在多线程中处理的,消息处理完成的时间顺序可能得不到保证,具体如下图所示:

kafka

但是我看kafkaConsumer的api同步提交只有这个方法commitSync(final Map<topicpartition, offsetandmetadata=""> offsets), key为topic和partition, value为offset, 也就是说,我的对于同一个topic的同一个partition, 一次提交只能提交一个offset吗? 是不是我的理解有误? 或者说在此场景下,有没有更好的提交offset的办法?

Kafka消费端,配置设置为手动提交时,单线程拉取消息,多线程处理消息,然后单线程手动提交时,如何提交?

在线,5小时前登录
发表于 2021-04-27
¥1.0

是的,因为多线程,处理的时候,后面可能会比前面的快,导致提交offset错乱。

kafka的消息是否被消费,是基于offset的(下标),kafka的消息没有任何状态,这也是kafka高速的原因之一。

比如消费者拉取500条消息,这个时候,有几种情况。

  1. 串行,有序提交offset,然后处理。(消息最多丢失一条)
  2. 并行,将这500条消息,一次性提交,然后在处理。(如果这个时候程序down掉,会丢失这500条的部分消息)
  3. 并行,先处理这500条,完成之后,一次性提交。(如果在提交之前,这个时候程序down掉,其他消费者接管之后,会重复消费这500条)

这几种方式,根据你业务可承受的风险,进行选择。

好的 谢谢!

请教一下,kafka拉取消息是可以控制单次拉取消息的大小和或者消息条数吗? 相关的配置参数是什么?

另外请教一下,如果我消费端第一次拉的消息是1234, 然后没有提交, 之后第二次去拉消息,拉到的消息会不会还会包含1234?

当下的这个消费者不会再拉一遍了。当这个消费失效,其他消费者才会拉取。

你是指如果我再用这个消费者去拉取消息(也就是多次调用poll方法), 这个消费者不会再去拉取消息吗?

还是说不会同一个消费者第二次拉取的消息,不会拉取到和第一次拉取相同的消息?

还是说同一个消费者第二次拉取的消息,不会拉取到和第一次拉取相同的消息?

同一个消费者第二次拉取的消息,不会拉取到和第一次拉取相同的消息

好的,感谢,请问前一次拉取到哪里的offset是存储在消费者客户端吗?

存储在kafka主题为__consumer_offsets里。

我理解的是,完成提交之后的消息的offset才会存储在_consumer_offsets中,对于拉取了但是未提交的offset也会存储在这个主题中吗?

没提交的不会的。在消费者客户端的临时缓存里。

你的答案

查看kafka相关的其他问题或提一个您自己的问题