A pseudo-cluster of three Kafka nodes on one machine: about ten minutes after startup, the nodes can no longer communicate with each other and keep repeating the exception below

Posted: 2019-07-03   Last updated: 2019-07-03

Description

A cluster of three Kafka nodes on one machine: about ten minutes after the cluster starts, the nodes can no longer communicate with each other, leaving the whole cluster unusable.

Error log

[2019-07-02 22:01:21,077] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1269536234, epoch=189475) to node 3: java.io.IOException: Connection to 3 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-07-02 22:01:21,080] WARN [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-45=(offset=53605, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[2])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1269536234, epoch=189475)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:241)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2019-07-02 22:01:53,114] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1269536234, epoch=INITIAL) to node 3: java.net.SocketTimeoutException: Failed to connect within 30000 ms. (org.apache.kafka.clients.FetchSessionHandler)



  • The symptoms look network-related. If the error appears right after startup, check the Kafka startup logs for exceptions.

    • I checked the startup logs and there are no exceptions; the cluster starts normally. But roughly ten-odd minutes after a normal start this exception appears, inter-node communication breaks down completely, and the node logs just keep printing the exception above. Also, my three nodes all run on different ports of the same server, so in theory node-to-node connectivity failures shouldn't even be possible. I have no idea what is going on.
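The startup-log check suggested above can be scripted. A minimal sketch; the two sample lines are inlined so the pipeline runs as-is, and in practice you would feed it your broker's real server.log (path depends on your install):

```shell
# Filter broker log output for ERROR/FATAL entries. Sample lines are
# inlined here; in practice replace the printf with: cat /path/to/server.log
sample='[2019-07-02 21:50:01,000] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
[2019-07-02 22:01:25,003] ERROR [KafkaServer id=3] Fatal error during KafkaServer startup'
printf '%s\n' "$sample" | grep -E '\] (ERROR|FATAL) '
```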

        • ####################### Broker 1 configuration

          broker.id=1
          port=9091
          default.replication.factor=2
          listeners=SASL_PLAINTEXT://devhost:9091
          advertised.listeners=SASL_PLAINTEXT://devhost:9091
          security.inter.broker.protocol=SASL_PLAINTEXT
          sasl.enabled.mechanisms=PLAIN
          sasl.mechanism.inter.broker.protocol=PLAIN
          authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
          allow.everyone.if.no.acl.found=true
          num.network.threads=3
          num.io.threads=8
          socket.send.buffer.bytes=102400
          socket.receive.buffer.bytes=102400
          socket.request.max.bytes=104857600
          log.dirs=/data/kafka/kafka-logs-1
          num.partitions=10
          num.recovery.threads.per.data.dir=2
          offsets.topic.replication.factor=3
          transaction.state.log.replication.factor=1
          transaction.state.log.min.isr=1
          delete.topic.enable=true
          auto.create.topics.enable=true
          log.flush.interval.messages=10000

          log.flush.interval.ms=1000

          log.retention.hours=72
          log.segment.bytes=1073741824
          log.retention.check.interval.ms=300000
          zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/hd/kafka
          zookeeper.connection.timeout.ms=6000
          group.initial.rebalance.delay.ms=0

          ####################### Broker 2 configuration

          broker.id=2
          port=9092
          default.replication.factor=2
          listeners=SASL_PLAINTEXT://devhost:9092
          advertised.listeners=SASL_PLAINTEXT://devhost:9092
          security.inter.broker.protocol=SASL_PLAINTEXT
          sasl.enabled.mechanisms=PLAIN
          sasl.mechanism.inter.broker.protocol=PLAIN
          authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
          allow.everyone.if.no.acl.found=true
          num.network.threads=3
          num.io.threads=8
          socket.send.buffer.bytes=102400
          socket.receive.buffer.bytes=102400
          socket.request.max.bytes=104857600
          log.dirs=/data/kafka/kafka-logs-2
          num.partitions=10
          num.recovery.threads.per.data.dir=2
          offsets.topic.replication.factor=3
          transaction.state.log.replication.factor=1
          transaction.state.log.min.isr=1
          delete.topic.enable=true
          auto.create.topics.enable=true
          log.flush.interval.messages=10000

          log.flush.interval.ms=1000

          log.retention.hours=72
          log.segment.bytes=1073741824
          log.retention.check.interval.ms=300000
          zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/hd/kafka
          zookeeper.connection.timeout.ms=6000
          group.initial.rebalance.delay.ms=0

          ####################### Broker 3 configuration

          broker.id=3
          port=9093
          default.replication.factor=2
          listeners=SASL_PLAINTEXT://devhost:9093
          advertised.listeners=SASL_PLAINTEXT://devhost:9093
          security.inter.broker.protocol=SASL_PLAINTEXT
          sasl.enabled.mechanisms=PLAIN
          sasl.mechanism.inter.broker.protocol=PLAIN
          authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
          allow.everyone.if.no.acl.found=true
          super.users=User:admin
          num.network.threads=3
          num.io.threads=8
          socket.send.buffer.bytes=102400
          socket.receive.buffer.bytes=102400
          socket.request.max.bytes=104857600
          log.dirs=/data/kafka/kafka-logs-3
          num.partitions=10
          num.recovery.threads.per.data.dir=2
          offsets.topic.replication.factor=3
          transaction.state.log.replication.factor=1
          transaction.state.log.min.isr=1
          delete.topic.enable=true
          auto.create.topics.enable=true
          log.flush.interval.messages=10000

          log.flush.interval.ms=1000

          log.retention.hours=72
          log.segment.bytes=1073741824
          log.retention.check.interval.ms=300000
          zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/hd/kafka
          zookeeper.connection.timeout.ms=6000
          group.initial.rebalance.delay.ms=0

          Above are the configurations of the three nodes, mostly left at their default values. Only broker.id, port and log.dirs differ between the three nodes (broker 3 additionally sets super.users=User:admin); all other properties are identical.
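One observation on these configs: none of the three overrides the replica fetcher's socket timeout, so the default applies, and as far as I can tell that default is exactly the 30000 ms that appears in the SocketTimeoutException later in this thread. Raising it would likely only delay the symptom rather than fix the root cause:

```
# Default value shown; not set in any of the three broker configs above.
# This is the timeout behind "Failed to connect within 30000 ms".
replica.socket.timeout.ms=30000
```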

            • The cluster does use the SASL_PLAINTEXT security protocol, but it had been running fine until yesterday, so authentication should not be the cause. I'd also like to ask another question: this cluster is deployed in our development environment for developers to use, with ten to twenty components connected to it. Because developers debug features on their own machines, they connect to and disconnect from this Kafka cluster very frequently, and every reconnect triggers consumer-group rebalancing and related work. With dozens of developers churning like this, the cluster often becomes unusable and we have no choice but to restart it each time. Do you have any tuning advice or suggestions for this situation?

                • We also run one locally for development, with at least 30 project teams using it. Doesn't yours fail at a fixed ten minutes after start?
                  Also, this error is still only at warning level, which means it can recover on its own. I suspect the real root cause hasn't been found yet.
                  Check the firewall as well.
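A quick way to act on the firewall suggestion is to probe the three broker ports directly. A sketch using bash's /dev/tcp; the hostname devhost and the ports are taken from the posted configs:

```shell
# Probe each broker port. If the broker process is alive but its port is
# unreachable, that points at a firewall rule or a wedged listener.
for p in 9091 9092 9093; do
  if timeout 3 bash -c ">/dev/tcp/devhost/$p" 2>/dev/null; then
    echo "port $p: reachable"
  else
    echo "port $p: NOT reachable"
  fi
done
```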

                    • One more question: why does one node in the Kafka cluster accumulate a large number of connections in CLOSE_WAIT state, so that the other nodes can no longer communicate with it and the log keeps reporting this exception:
                      java.net.SocketTimeoutException: Failed to connect within 30000 ms
                      at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
                      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
                      at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
                      at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
                      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:241)
                      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
                      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
                      at scala.Option.foreach(Option.scala:257)
                      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
                      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
                      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
                      In that situation, after repeated communication timeouts with the node, why doesn't the Kafka cluster evict it? The other, healthy nodes just keep retrying the connection, and because every attempt times out the whole cluster becomes unavailable. If Kafka instead declared the node failed after several missed inter-node heartbeats and removed it from the cluster, the cluster could recover on its own and an operator could restart the failed node afterwards. Wouldn't that be better? Can this behaviour be obtained through Kafka configuration parameters, or does Kafka simply have no such mechanism?
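For the CLOSE_WAIT symptom described above, counting stuck sockets per local port identifies the leaking broker. A sketch; a captured `ss -tan` sample is inlined so the awk pipeline runs anywhere, and in production you would feed it from the live command instead:

```shell
# Count CLOSE-WAIT sockets per local port. Replace the printf with a real
# `ss -tan` to run this live; $1 is the TCP state, $4 the local addr:port.
sample='ESTAB      0 0 127.0.0.1:9091 127.0.0.1:53100
CLOSE-WAIT 0 0 127.0.0.1:9093 127.0.0.1:53210
CLOSE-WAIT 0 0 127.0.0.1:9093 127.0.0.1:53211'
printf '%s\n' "$sample" | awk '$1 == "CLOSE-WAIT" {split($4, a, ":"); n[a[2]]++} END {for (p in n) print p, n[p]}'
```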

                        • What we see now: one node has a large number of connections in CLOSE_WAIT state, the other (healthy) nodes keep logging "java.net.SocketTimeoutException: Failed to connect within 30000 ms", and all three broker processes are still alive, yet the whole cluster is unusable: producers cannot send and consumers cannot consume. Only after killing the problem node does the cluster return to normal production and consumption. My question is: in this situation, where communication between the problem node and the healthy nodes keeps timing out, why doesn't Kafka evict the problem node from the cluster so that the cluster can recover?
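On the eviction question: in ZooKeeper-backed Kafka, the controller judges broker liveness by the broker's ephemeral registration in ZooKeeper, not by whether replica fetches to it succeed. A broker whose request handling is wedged (for example by piled-up CLOSE_WAIT sockets) but whose ZK session stays alive is therefore still considered up, which matches the behaviour described above. The only knob on that detection path is the session timeout, and it would not help here because the stuck broker's ZK session remains healthy (default value shown):

```
# Broker "failure" = loss of its ZooKeeper session. A wedged-but-alive
# broker keeps this session, so the controller never evicts it.
zookeeper.session.timeout.ms=6000
```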