kafka如何在保留topic的情况下清空消息?

開開新新 发表于: 2021-02-24   最后更新时间: 2021-02-24 13:50:21   3,982 游览

情况是这样,我们就一个kafka集群,开发的测试topic和正式topic都在一个集群,测试产生的topic 偶尔会需要清空,但是topic需要保留。

这样的需求有什么安全 可靠的解决方法吗?

不能重启kafka,也不能影响其他的topic。

发表于 2021-02-24

我使用kafka-configs.sh 让单个topic过期了,但是offset 没有变化,怎么让offset重置?

半兽人 -> 開開新新 3年前

topic过期?提供一下完整命令。
另外,offset调整一般是针对消费者的,如果消费者的offset是最新的,那就不会在继续消费了,坐等topic里的数据过期就可以了。

開開新新 -> 半兽人 3年前
./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name MpMall-dev --entity-type topics --add-config retention.ms=1

我刚才执行了

./kafka-consumer-groups.sh --bootstrap-server 192.168.10.146:9092 --group group.MallAssist --reset-offsets --to-earliest --topic MpMall-dev --execute

提示

Error: Assignments can only be reset if the group 'group.MallAssist' is inactive, but the current state is Stable.

换了--to-earliest--to-offset 0,也不行

./kafka-consumer-groups.sh --bootstrap-server 192.168.10.146:9092 --group group.MallAssist --reset-offsets --topic MpMall-dev --execute

提示

[2021-02-24 14:22:47,517] WARN New offset (0) is lower than earliest offset for topic partition MpMall-dev-0. Value will be set to 189511 (kafka.admin.ConsumerGroupCommand$)
[2021-02-24 14:22:47,518] WARN New offset (0) is lower than earliest offset for topic partition MpMall-dev-1. Value will be set to 5240 (kafka.admin.ConsumerGroupCommand$)
[2021-02-24 14:22:47,518] WARN New offset (0) is lower than earliest offset for topic partition MpMall-dev-2. Value will be set to 208 (kafka.admin.ConsumerGroupCommand$)

这里又产生两个问题, 如何让group 的状态变为inactive? 如何知道earliest offset是多少?

半兽人 -> 開開新新 3年前

1、把消费者停了。
2、该topic下最小的offset比0大(而且你应该设置成最大,而不是最小)

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

拿到CURRENT-OFFSET的值,设置成它。

话说,你已经改掉了topic的回收时间,就不需要在重置offset了,等等就行了。

開開新新 -> 半兽人 3年前

你的意思是 retention.ms=1,这样操作后,offset也会跟着重置吗? 但是我刚才看了一下 没有变化。

--to-offset 0,提示

[2021-02-24 14:22:47,517] WARN New offset (0) is lower than earliest offset for topic partition MpMall-dev-0. Value will be set to 189511 (kafka.admin.ConsumerGroupCommand$)

--to-offset 189512, 提示

[2021-02-24 14:23:26,553] WARN New offset (189512) is higher than latest offset for topic partition MpMall-dev-0. Value will be set to 189511 (kafka.admin.ConsumerGroupCommand$)
[2021-02-24 14:23:26,554] WARN New offset (189512) is higher than latest offset for topic partition MpMall-dev-1. Value will be set to 5240 (kafka.admin.ConsumerGroupCommand$)
[2021-02-24 14:23:26,554] WARN New offset (189512) is higher than latest offset for topic partition MpMall-dev-2. Value will be set to 208 (kafka.admin.ConsumerGroupCommand$)

我一次性都问了。回到正题,对于清空topic,有没有其他操作推荐的?

半兽人 -> 開開新新 3年前

就是依靠kafka自身的清理机制就好了。

把消费者组的offset设置为最新的场景是这样的,当某个topic有100万的积压消息,但是消费者组A已经不需要在消费这100万了,把offset设置为最新,那A消费者组跳过了这100万的消息,开始接受新的消息。随着时间的流逝,过了7天,这些数据默认就被kafka回收掉了。

半兽人 -> 開開新新 3年前

offset不会重置,过期了呀,数据就被kafka回收了,消费者再去消费,都是新消息了。

開開新新 -> 半兽人 3年前

哦 懂了,编号下移一位。 那有没有那种"删除文件夹内所有文件"的操作? 也就是我上面提到的。 类似新建一个topic,没有offset,size 为0这样的。

開開新新 -> 半兽人 3年前

我设置 retention.ms=1,offset没有变化。然后你说的这句 我也没理解。

offset的图片我发群里了

半兽人 -> 開開新新 3年前

据我所知,是没有的(除了删除)。

你的答案

查看kafka相关的其他问题或提一个您自己的问题