呵呵哒

0 声望

这家伙太懒,什么都没留下

个人动态
  • 呵呵哒 回复 半兽人kafka2.4.0的MirrorMaker2配置后可以同步到目标kafka集群, 但是只能同步到MirrorMaker2在目标集群自动生成的topic上, 无法同步到指定topic上,如何处理? 中 :

    不是警告, 我发现这个还是因为重启后同步数据结果目标集群消费不到了, 我切回去才看到进程显示已杀死,的确就是没起来. 我猜测其实是重启mm2后程序检测到目标集群对应的topic都有, 无法创建新的topic, 之前没有删除前缀的时候, 每次重启mm2都会在上一个自动创建的topic基础上再创建一个, 比如说第一次创建了A.test, 第二次就会创建A.A.test, 以此类推. 取消前缀后应该是无法创建, 所以进程才会自动挂掉

    5月前
  • 呵呵哒 回复 半兽人kafka2.4.0的MirrorMaker2配置后可以同步到目标kafka集群, 但是只能同步到MirrorMaker2在目标集群自动生成的topic上, 无法同步到指定topic上,如何处理? 中 :

    单向我也试了, 一开始发送的数据会连着发很多遍, 我猜测可能同步生成的topic不止一个, 如果都替换掉前缀, 那就相当于多条监控的进程都往一个topic里发, 导致数据发送多次
    后来删除一些多余配置后, 配置内容如下:

    clusters = A, B
    A.bootstrap.servers = kafka1:9092
    B.bootstrap.servers = kafka2:9092
    A->B.enabled = true
    A->B.topics = .*
    A->B.groups = .*
    *B->A.enabled = true
    *B->A.topics = .*
    *B->A.groups = .*
    
    #replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    
    replication.policy.separator=
    source.cluster.alias=
    target.cluster.alias=
    
    replication.factor=1
    checkpoints.topic.replication.factor=1
    heartbeats.topic.replication.factor=1
    offset-syncs.topic.replication.factor=1
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    config.storage.replication.factor=1
    refresh.topics.enabled = true
    refresh.topics.interval.seconds = 1
    refresh.groups.enabled = true
    refresh.groups.interval.seconds = 2
    sync.topic.configs.enabled = true
    sync.topic.acls.enabled = false
    sync.group.offsets.enabled = true
    sync.group.offsets.interval.seconds = 1
    

    结果一开始同步发现没问题, 但是关闭mm2再重启就会报错, 报错内容如下:

    [2023-12-26 17:25:40,327] WARN Could not create topic test_a. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
    org.apache.kafka.common.errors.TopicExistsException: Topic 'test_a' already exists.
    [2023-12-26 17:25:40,328] WARN Could not create topic heartbeats. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
    org.apache.kafka.common.errors.TopicExistsException: Topic 'heartbeats' already exists.
    [2023-12-26 17:25:40,328] WARN Could not create topic test_mm. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
    org.apache.kafka.common.errors.TopicExistsException: Topic 'test_mm' already exists.
    [2023-12-26 17:25:40,328] WARN Could not create topic -test_mm. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
    org.apache.kafka.common.errors.TopicExistsException: Topic '-test_mm' already exists.
    [2023-12-26 17:25:40,328] WARN Could not create topic -heartbeats. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
    org.apache.kafka.common.errors.TopicExistsException: Topic '-heartbeats' already exists.
    [2023-12-26 17:25:41,100] INFO refreshing idle consumers group offsets at target cluster took 113 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    [2023-12-26 17:25:41,102] INFO sync idle consumer group offset from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    [2023-12-26 17:25:41,429] INFO refreshing consumer groups took 99 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    已杀死
    

    这就应该是没有开启B->A的问题, 所以单开也不能完全解决这个问题, 我其实是想搞一个灾备的集群, 将A集群的数据同时同步到B集群, 这样就算A集群挂掉也能无缝衔接B集群, 数据和消费偏移量都一致

    5月前
  • 半兽人 回复 呵呵哒kafka2.4.0的MirrorMaker2配置后可以同步到目标kafka集群, 但是只能同步到MirrorMaker2在目标集群自动生成的topic上, 无法同步到指定topic上,如何处理? 中 :

    前缀为了防止无限循环:
    去掉前缀就只能A->B,如果是A、B双向复制,那A集群和B集群都会读取topic1,来回同步。

    5月前
  • 呵呵哒 回复 半兽人kafka2.4.0的MirrorMaker2配置后可以同步到目标kafka集群, 但是只能同步到MirrorMaker2在目标集群自动生成的topic上, 无法同步到指定topic上,如何处理? 中 :

    kafka3.0版本的这个参数需要三节点同步三节点, 我个人在我笔记本上得搭建六个虚拟机, 目前没带动只能再试试, 删除前缀的操作死循环了, 目标kafka集群的topic不停同步数据, 没成功, 谢谢大佬指点, 我试试3.0怎么实现吧

    5月前
  • 赞了 呵呵哒CDH6.3.2集群配套的2.2.1版本kafka配置SASL_SCRAM认证配置的问题 的评论!

    没用过CDH额,这个配置是默认生成的,不过文件内容很简单,只是为了方便你命令形态的东西,写到里面变成固定的,不用每次带那么多参数了,你可以下载一个官方的版本的kafka来获取。

    producer.properties的默认内容如下:

    cat config/producer.properties
    
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # see kafka.producer.ProducerConfig for more details
    
    ############################# Producer Basics #############################
    
    # list of brokers used for bootstrapping knowledge about the rest of the cluster
    # format: host1:port1,host2:port2 ...
    bootstrap.servers=localhost:9092
    
    # specify the compression codec for all data generated: none, gzip, snappy, lz4
    compression.type=none
    
    # name of the partitioner class for partitioning events; default partition spreads data randomly
    #partitioner.class=
    
    # the maximum amount of time the client will wait for the response of a request
    #request.timeout.ms=
    
    # how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
    #max.block.ms=
    
    # the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
    #linger.ms=
    
    # the maximum size of a request in bytes
    #max.request.size=
    
    # the default batch size in bytes when batching multiple records sent to a partition
    #batch.size=
    
    # the total bytes of memory the producer can use to buffer records waiting to be sent to the server
    #buffer.memory=
    
    7月前