Kafka node migration causes cluster errors

Posted: 2021-10-15   Last updated: 2021-10-23 05:35:58   921 views

We run two Kafka clusters in production:

  • Cluster A version: 0.10.0.1
  • Cluster B version: 0.10.2.1

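Cluster B recently ran short of resources, so I decommissioned three machines from cluster A in order to add them to cluster B. The decommissioning steps were:

  1. Migrate the topic replicas hosted on the three machines to other brokers.
  2. Run bin/kafka-server-stop.sh on each of the three machines to shut down the Kafka service.
  3. Remove the Kafka installation directory, data directory, and log directory on the three machines.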
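Step 1 above is normally carried out with bin/kafka-reassign-partitions.sh, which takes a JSON plan listing the new replica set of each partition. The following is only a minimal sketch of how such a plan could be generated, not the procedure actually used in this thread. The function name build_reassignment, the topic name orders, and the broker ids are all hypothetical.

```python
import json


def build_reassignment(current, retiring, remaining):
    """Build a reassignment plan that moves replicas off retiring brokers.

    current:   {(topic, partition): [broker ids]} describing today's layout
    retiring:  broker ids that are about to be removed
    remaining: broker ids that stay in the cluster
    """
    retiring = set(retiring)
    # Keep only distinct brokers that are really staying, in the given order.
    remaining = [b for b in dict.fromkeys(remaining) if b not in retiring]
    partitions = []
    cursor = 0  # round-robin position in `remaining`
    for (topic, partition), replicas in sorted(current.items()):
        if not retiring.intersection(replicas):
            continue  # this partition has no replica on a retiring broker
        if len(remaining) < len(replicas):
            raise ValueError("not enough remaining brokers for %s-%d" % (topic, partition))
        new_replicas = [b for b in replicas if b not in retiring]
        while len(new_replicas) < len(replicas):
            candidate = remaining[cursor % len(remaining)]
            cursor += 1
            if candidate not in new_replicas:
                new_replicas.append(candidate)
        partitions.append(
            {"topic": topic, "partition": partition, "replicas": new_replicas}
        )
    return {"version": 1, "partitions": partitions}


if __name__ == "__main__":
    # Hypothetical layout: brokers 10, 11 and 12 are the ones being retired.
    layout = {("orders", 0): [10, 11, 1], ("orders", 1): [1, 2, 3]}
    print(json.dumps(build_reassignment(layout, [10, 11, 12], [1, 2, 3]), indent=2))
```

The resulting JSON would be saved to a file and passed to bin/kafka-reassign-partitions.sh with --reassignment-json-file and --execute, then checked with --verify before the brokers are stopped.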

After the decommissioning, cluster A ran normally.

Then I added the three machines to cluster B one at a time. After they joined, server.log on all three machines showed the following WARN entries:

[2021-10-15 11:12:12,140] INFO [Kafka Server 12], started (kafka.server.KafkaServer)
[2021-10-15 11:12:12,323] WARN Attempting to send response via channel for which there is no open connection, connection id 5 (kafka.network.Processor)
[2021-10-15 11:12:18,445] WARN Attempting to send response via channel for which there is no open connection, connection id 1 (kafka.network.Processor)
[2021-10-15 11:12:29,527] WARN Attempting to send response via channel for which there is no open connection, connection id 3 (kafka.network.Processor)
[2021-10-15 11:12:31,585] WARN Attempting to send response via channel for which there is no open connection, connection id 1 (kafka.network.Processor)
[2021-10-15 11:12:31,728] WARN Attempting to send response via channel for which there is no open connection, connection id 10 (kafka.network.Processor)
[2021-10-15 11:12:57,526] WARN Attempting to send response via channel for which there is no open connection, connection id 0 (kafka.network.Processor)

At the same time, every broker in cluster A started logging the following ERROR:

[2021-10-15 11:13:02,187] ERROR Closing socket for xxx:9092-xxxx:49691 because of error (kafka.network.Processor)
kafka.network.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 2
        at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
        at kafka.network.Processor.run(SocketServer.scala:413)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2
        at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
        at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44)
        at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60)
        at org.apache.kafka.common.requests.MetadataRequest.parse(MetadataRequest.java:96)
        at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:48)
        at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:92)
        ... 10 more
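The trace above fails inside MetadataRequest.parse with "Invalid version for API key 3: 2", meaning some peer sent a Metadata request at a version this 0.10.0.1 broker has no schema for. One way to narrow down the sender is to tally the remote endpoints in these ERROR entries. The sketch below assumes the "Closing socket for <local>-<remote>" layout seen in the log above (hosts are masked as xxx there). The function name summarize_rejections is hypothetical.

```python
import re
from collections import Counter

# "Closing socket for <local host>:<port>-<remote host>:<port> because of error"
CLOSE_RE = re.compile(
    r"ERROR Closing socket for (?P<local>\S+?:\d+)-(?P<remote>\S+?):(?P<rport>\d+) because of error"
)
API_RE = re.compile(
    r"Error getting request for apiKey: (?P<key>\d+) and apiVersion: (?P<version>\d+)"
)


def summarize_rejections(lines):
    """Count rejected requests per (remote host, api key, api version)."""
    counts = Counter()
    remote = None  # remote host from the most recent "Closing socket" line
    for line in lines:
        match = CLOSE_RE.search(line)
        if match:
            remote = match.group("remote")
            continue
        match = API_RE.search(line)
        if match and remote is not None:
            key = int(match.group("key"))
            version = int(match.group("version"))
            counts[(remote, key, version)] += 1
            remote = None  # wait for the next "Closing socket" line
    return counts


if __name__ == "__main__":
    sample = [
        "[2021-10-15 11:13:02,187] ERROR Closing socket for xxx:9092-xxxx:49691 "
        "because of error (kafka.network.Processor)",
        "kafka.network.InvalidRequestException: Error getting request for "
        "apiKey: 3 and apiVersion: 2",
    ]
    for (host, key, version), n in summarize_rejections(sample).items():
        print(host, key, version, n)
```

If the remote hosts turn out to be the three migrated machines, or clients whose bootstrap list still names them, that would point at which side is still talking to cluster A.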

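Was the way I decommissioned the brokers from cluster A wrong? Why was there no problem after the three brokers were taken offline, while adding them to cluster B causes errors in cluster A?

Has anyone run into something like this?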

发表于 2021-10-15

You have probably connected to the wrong ZooKeeper.

-> 半兽人 2 years ago

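I have checked several times and the ZooKeeper connection is not wrong: all three brokers are configured with cluster B's ZooKeeper path. Besides ZooKeeper, is there any other metadata that needs to be cleaned up?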

半兽人 -> 2 years ago

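Kafka identifies a broker by its broker ID and binds it to a cluster through ZooKeeper.

If starting the brokers in B makes A react, the problem is in ZooKeeper.

The error you posted is an API version mismatch, which means the members of the cluster are not all running the same version.

It is caused by stale data in your ZooKeeper. Either delete that data or switch to a new broker.id.

P.S. Please add the ZooKeeper connection settings of both Kafka clusters. If they are internal IPs, it does no harm to post them publicly. Keep them as close to the real values as possible, because I need to confirm the cluster.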

-> 半兽人 2 years ago

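Both clusters use the same ZooKeeper ensemble, each configured with a different root path.

ZooKeeper configuration of cluster A: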

zookeeper.connect=1.zookeeper1001.bip.com.cn:2181,2.zookeeper1001.bip.com.cn:2181,3.zookeeper1001.bip.com.cn:2181/kafka/1004

ZooKeeper configuration of cluster B:

zookeeper.connect=1.zookeeper1001.bip.com.cn:2181,2.zookeeper1001.bip.com.cn:2181,3.zookeeper1001.bip.com.cn:2181/kafka/2021
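The two zookeeper.connect values above can be compared mechanically to confirm that the clusters share the same hosts and differ only in the root path. This is a minimal sketch, assuming the usual layout of a comma-separated host:port list followed by one optional chroot path at the end. The function names parse_zk_connect and compare_zk_connect are hypothetical.

```python
def parse_zk_connect(value):
    """Split a zookeeper.connect value into (frozenset of host:port, chroot)."""
    value = value.strip()
    slash = value.find("/")
    if slash == -1:
        hosts_part, chroot = value, "/"  # no chroot means the ZooKeeper root
    else:
        hosts_part, chroot = value[:slash], value[slash:]
    hosts = frozenset(h.strip() for h in hosts_part.split(",") if h.strip())
    return hosts, chroot.rstrip("/") or "/"


def compare_zk_connect(a, b):
    """Report whether two zookeeper.connect values share hosts and chroot."""
    hosts_a, chroot_a = parse_zk_connect(a)
    hosts_b, chroot_b = parse_zk_connect(b)
    return {
        "same_ensemble": hosts_a == hosts_b,
        "same_chroot": chroot_a == chroot_b,
        "chroots": (chroot_a, chroot_b),
    }


if __name__ == "__main__":
    hosts = (
        "1.zookeeper1001.bip.com.cn:2181,"
        "2.zookeeper1001.bip.com.cn:2181,"
        "3.zookeeper1001.bip.com.cn:2181"
    )
    print(compare_zk_connect(hosts + "/kafka/1004", hosts + "/kafka/2021"))
```

Running the same comparison against the server.properties actually loaded on each migrated machine, rather than the intended values, would rule out a leftover config file pointing at /kafka/1004.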

半兽人 -> 2 years ago

I don't see any other problem there. This is beyond what I know.

-> 半兽人 2 years ago

We will keep looking to see whether something else is causing it. Thank you very much!
