kafka2.4.0的MirrorMaker2配置后可以同步到目标kafka集群, 但是只能同步到MirrorMaker2在目标集群自动生成的topic上, 无法同步到指定topic上,如何处理?

呵呵哒 发表于: 2023-12-21   最后更新时间: 2023-12-21 17:01:19   987 游览

我自己在本地装了两个centos7的虚拟机, 部署的kafka版本为2.7.0, 现在想把kafka1作为源集群, kafka2作为目标集群, 将kafka1中指定topic的数据和对应groupid的消费偏移量同步到kafka2中相同的topic和groupid上, 实现数据和消费偏移量的实时同步。

配置文件 connect-mirror-maker.properties 内容如下

# 集群配置
clusters = A, B

# 集群A配置
A.bootstrap.servers = kafka1:9092
A->B.enabled = true                  # 启用从集群A到集群B的复制
A->B.topics = .*                     # 复制所有的主题从集群A到集群B
A->B.groups = .*                     # 复制所有的消费者组从集群A到集群B

# 集群B配置
B.bootstrap.servers = kafka2:9092
B->A.enabled = true                  # 启用从集群B到集群A的复制
B->A.topics = .*                     # 复制所有的主题从集群B到集群A
B->A.groups = .*                     # 复制所有的消费者组从集群B到集群A

# 复制策略配置
# replication.policy.class的取值有两个 DefaultReplicationPolicy 和 CustomReplicationPolicy
# replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy # 这是默认值
replication.factor=1                 # 复制因子,指定每个主题的复制因子

# 数据检查点配置
checkpoints.topic.replication.factor=1            # 检查点主题的复制因子
heartbeats.topic.replication.factor=1             # 心跳主题的复制因子
offset-syncs.topic.replication.factor=1           # 偏移同步主题的复制因子
offset.storage.replication.factor=1               # 偏移存储的复制因子
status.storage.replication.factor=1               # 状态存储的复制因子
config.storage.replication.factor=1               # 配置存储的复制因子

# 刷新配置
refresh.topics.enabled = true                     # 启用主题刷新
refresh.topics.interval.seconds = 3               # 刷新主题的时间间隔(秒)
refresh.groups.enabled = true                     # 启用消费者组刷新
refresh.groups.interval.seconds = 3               # 刷新消费者组的时间间隔(秒)

# 同步配置
sync.topic.configs.enabled = true                 # 启用主题配置同步
sync.topic.acls.enabled = false                   # 启用主题ACL同步
sync.group.offsets.enabled = true                 # 启用消费者组偏移同步
sync.group.offsets.interval.seconds = 3           # 同步消费者组偏移的时间间隔(秒)

replication.policy.class取值有两个, 分别是 DefaultReplicationPolicy 和 CustomReplicationPolicy。

  • DefaultReplicationPolicy: 默认取值, 这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,目的是为了避免双向同步的场景出现死循环, 官方的解释是为了避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。
  • CustomReplicationPolicy: 使用自定义复制策略类来完成此操作。

上述的配置文件需要在源集群和目标集群中都创建, 并且两个集群都要启动, 启动命令如下:

bin/connect-mirror-maker.sh config/connect-mirror-maker.properties --clusters A
bin/connect-mirror-maker.sh config/connect-mirror-maker.properties --clusters B

我的问题是, kafka官方网站对CustomReplicationPolicy策略写的不多, 而我需要把数据同步到跟源集群topic名一致的topic里, 如何配置配置文件才能实现?

ps: kafka权威指南第一版和第二版对使用MirrorMaker2的描述非常简略, 网上的参考资料需要通过修改kafka源码来实现, 我这边没有部署和开发java的条件(运维平台只有服务器的linux命令行)

发表于 2023-12-21

从 Kafka 3.0.0 开始,只需设置:

replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

也可以通过以下设置,删除前缀:

"replication.policy.separator": ""
"source.cluster.alias": "",
"target.cluster.alias": "",
呵呵哒 -> 半兽人 8月前

kafka3.0版本的这个参数需要三节点同步三节点, 我个人在我笔记本上得搭建六个虚拟机, 目前没带动只能再试试, 删除前缀的操作死循环了, 目标kafka集群的topic不停同步数据, 没成功, 谢谢大佬指点, 我试试3.0怎么实现吧

半兽人 -> 呵呵哒 8月前

前缀为了防止无限循环:
去掉前缀就只能A->B,如果是A、B双向复制,那A集群和B集群都会读取topic1,来回同步。

呵呵哒 -> 半兽人 8月前

单向我也试了, 一开始发送的数据会连着发很多遍, 我猜测可能同步生成的topic不止一个, 如果都替换掉前缀, 那就相当于多条监控的进程都往一个topic里发, 导致数据发送多次
后来删除一些多余配置后, 配置内容如下:

clusters = A, B
A.bootstrap.servers = kafka1:9092
B.bootstrap.servers = kafka2:9092
A->B.enabled = true
A->B.topics = .*
A->B.groups = .*
*B->A.enabled = true
*B->A.topics = .*
*B->A.groups = .*

#replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy

replication.policy.separator=
source.cluster.alias=
target.cluster.alias=

replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
refresh.topics.enabled = true
refresh.topics.interval.seconds = 1
refresh.groups.enabled = true
refresh.groups.interval.seconds = 2
sync.topic.configs.enabled = true
sync.topic.acls.enabled = false
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 1

结果一开始同步发现没问题, 但是关闭mm2再重启就会报错, 报错内容如下:

[2023-12-26 17:25:40,327] WARN Could not create topic test_a. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
org.apache.kafka.common.errors.TopicExistsException: Topic 'test_a' already exists.
[2023-12-26 17:25:40,328] WARN Could not create topic heartbeats. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
org.apache.kafka.common.errors.TopicExistsException: Topic 'heartbeats' already exists.
[2023-12-26 17:25:40,328] WARN Could not create topic test_mm. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
org.apache.kafka.common.errors.TopicExistsException: Topic 'test_mm' already exists.
[2023-12-26 17:25:40,328] WARN Could not create topic -test_mm. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
org.apache.kafka.common.errors.TopicExistsException: Topic '-test_mm' already exists.
[2023-12-26 17:25:40,328] WARN Could not create topic -heartbeats. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
org.apache.kafka.common.errors.TopicExistsException: Topic '-heartbeats' already exists.
[2023-12-26 17:25:41,100] INFO refreshing idle consumers group offsets at target cluster took 113 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2023-12-26 17:25:41,102] INFO sync idle consumer group offset from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2023-12-26 17:25:41,429] INFO refreshing consumer groups took 99 ms (org.apache.kafka.connect.mirror.Scheduler:95)
已杀死

这就应该是没有开启B->A的问题, 所以单开也不能完全解决这个问题, 我其实是想搞一个灾备的集群, 将A集群的数据同时同步到B集群, 这样就算A集群挂掉也能无缝衔接B集群, 数据和消费偏移量都一致

半兽人 -> 呵呵哒 8月前

这是警告,不是报错,验证过数据的一致性了吗?

呵呵哒 -> 半兽人 8月前

不是警告, 我发现这个还是因为重启后同步数据结果目标集群消费不到了, 我切回去才看到进程显示已杀死,的确就是没起来. 我猜测其实是重启mm2后程序检测到目标集群对应的topic都有, 无法创建新的topic, 之前没有删除前缀的时候, 每次重启mm2都会在上一个自动创建的topic基础上再创建一个, 比如说第一次创建了A.test, 第二次就会创建A.A.test, 以此类推. 取消前缀后应该是无法创建, 所以进程才会自动挂掉

你的答案

查看kafka相关的其他问题或提一个您自己的问题