kafka集群网络请求空闲率长期低于1%,所有broker日志报错:Attempting to send response via channel for which there is no open connection, connection id

冰海落花 发表于: 2020-08-15   最后更新时间: 2020-08-16  

kafka集群本来正常,重启节点后,网络请求空闲率NetworkProcessorAvgIdlePercent突然变的很低,低于1%,虽然节点正常,但是集群性能很低。

kafka版本:2.1.1-1.1.0

9 broker 40c 256g 物理机 40topic 每个40-80分区,3副本

主要用于日志收集,所有容器应用有个agent采集日志然后发送到kafka

broker配置如下:

broker.id=1
zookeeper.connect=
log.dirs=/data/data1/kafka_1.1.0_data_bak,/data/data2/kafka_1.1.0_data_bak,/data/data3/kafka_1.1.0_data_bak,/data/data4/kafka_1.1.0_data_bak,/data/data5/kafka_1.1.0_data_bak
message.max.bytes=52428800
num.network.threads=16
num.io.threads=32
listeners=PLAINTEXT://
socket.send.buffer.bytes=10485760
socket.receive.buffer.bytes=10485760
socket.request.max.bytes=104857600
num.partitions=40
log.segment.bytes=1073741824
log.retention.hours=120
log.retention.check.interval.ms=300000
log.cleaner.enable=true
log.flush.interval.messages=1000
log.flush.scheduler.interval.ms=1000
auto.create.topics.enable=false
default.replication.factor=3
replica.socket.receive.buffer.bytes=10485760
replica.fetch.max.bytes=52428800
num.replica.fetchers=4
zookeeper.session.timeout.ms=120000
zookeeper.connection.timeout.ms=60000
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
auto.leader.rebalance.enable=true
num.recovery.threads.per.data.dir=1
delete.topic.enable=true

所有broker server.log日志一直报错:

Attempting to send response via channel for which there is no open connection, connection id xxxx-xxxx53695-854953 (kafka.network.Processor)

以下尝试均失败:

  1. 修改data过期时间为1小时,然后重启节点
  2. 重新搭建一个新集群,即删除zk和kafka所有日志,将自动创建topic改为手动,依次创建后,网络请求空闲率就越来越低
  3. 修改参数num.network.threads为40,请求空闲率虽然上升了,但是主机cpu打到98%,然后改为16

之前集群都是正常的,我猜是应用容器越来越多,采集的日志也越来越多,kafka支撑不住了,需要扩容吗?

也可能是应用agent连接kafka的连接超时时间太短吗,其中agent的连接kafka核心代码如下:

/** 重试等待时间,毫秒 */
private final String DEF_RETRY_BACKOFF_MS = "100";
/** 请求超时时间,毫秒 */
private final String DEF_REQUEST_TIMEOUT_MS = "600";
try{
            reCreateProducer(properties);
        }catch (Throwable t){
            Logger.error("创建kafka生产者实例异常,进行重试创建...",t);
            Thread redo = new Thread(new Runnable() {
                int time = 1;
                public void run() {
                    while (true){
                        if(time>5){
                            return ;
                        }
                        //构造实例
                        Logger.debug("重试创建kafka生产者实例{0}",time++);
                        try {
                            reCreateProducer(properties);
                            Thread.sleep(3000);
                        } catch (Throwable e) {
                            Logger.error("创建kafka生产者实例异常",e);
                        }
                    }
                }
            });

请论坛里的专家帮看下



您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka 5台节点,8核32g的配置,-Xmx1G -Xms1G -XX:MaxGCPauseMillis=20,这三个参数该怎么修改
下一条: 使用spring kafka 本地调试循环发送1W+消息时,会出现producer超时被强制关闭异常

  • 1、副本3个,日志2个副本就可以了吧。(相互备份,3倍消耗)
    2、自动创建和手动创建topic应该没啥区别吧,就创建一次。
    3、超时时间,是会影响,主要是如果老是超时,你就会要重试,一条消息如果重试几次,网络请求自然就更多了。

    • 谢谢大佬回复,现在情况如下:

      1. 现在白天业务繁忙的时候,网络等待空闲率都低于3%,晚上没业务时可以达到30-60%,白天网络请求队列基本是满的(500)。
      2. 应用agent连接kafka默认超时时间是600ms,但是配置文件里写的是15秒(只有没配置采用600ms),这个推荐配置是多少?
      3. 网上有答案:如果太快重新连接, 响应未完成时重新连接,broker将未完成的响应发送到具有重用端口的新连接。新连接将以相关标识(connection id )不匹配结束,会出现WRNING,其中send response via channel for which there is no open connection,connection id broker ip:16667-应用ip:59454-1976888 (kafka.network.Processor) 里面的59454是应用连接kafka的端口吧,1976888是啥,id吗
        4.使用lsof -n|awk '{print $2}'|sort|uniq -c|sort -nr|grep查看kafka进程打开的文件数竟然有300多万,但是主机配置ulimit -a显示1024000,主机监控报表显示句柄数正常,5万不到,执行命令的时候需要几分钟才能返回,是不是命令返回的是几分钟内打开的文件数总和?
        5.能否通过扩容节点解决目前的问题呢?
        • 15秒-30秒,默认超时时间即可,集群越热,超时时间应越长,否则会导致雪崩。

          是可以通过扩容解决目前的问题。

          but,从你提供的数据分析(网络连接过多)我个人觉得你的客户端程序写的有问题,kakfa的生产者是长连接,一旦建立,就用该连接持续发送消息,你是不是发完一条消息就关了,重新建立?

            • 应用的客户端运行了2年了,7月份出过一次故障把文件句柄耗光了,之前从未出现过这种问题。现在应用也没反应有啥问题,也能消费,但是总感觉是个不定时炸弹。

              现在发现所有kafka进程有211个线程,32个线程cpu利用率都在99.1以上(和num.io.threads=32配置一致),其他线程cpu利用率低于0.5,通过jstack发现:

              "kafka-request-handler-29" #111 daemon prio=5 os_prio=0 tid=0x00007f0d59653800 nid=0x7dd1 runnable [0x00007f0a46c4a000]
                 java.lang.Thread.State: RUNNABLE
                      at java.util.concurrent.locks.ReentrantReadWriteLock$Sync.tryReleaseShared(ReentrantReadWriteLock.java:440)
                      at java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared(AbstractQueuedSynchronizer.java:1341)
                      at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.unlock(ReentrantReadWriteLock.java:881)
                      at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:252)
                      at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
                      at kafka.server.MetadataCache.kafka$server$MetadataCache$$getAliveEndpoint(MetadataCache.scala:107)
                      at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getEndpoints$1.apply$mcVI$sp(MetadataCache.scala:57)
                      at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getEndpoints$1.apply(MetadataCache.scala:56)
                      at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getEndpoints$1.apply(MetadataCache.scala:56)
                      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
                      at kafka.server.MetadataCache.kafka$server$MetadataCache$$getEndpoints(MetadataCache.scala:56)
                      at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getPartitionMetadata$1$$anonfun$apply$1.apply(MetadataCache.scala:84)
                      at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getPartitionMetadata$1$$anonfun$apply$1.apply(MetadataCache.scala:69)
                      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
                      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
                      at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
                      at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
                      at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
                      at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
                      at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
                      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
                      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
                      at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getPartitionMetadata$1.apply(MetadataCache.scala:69)
                      at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getPartitionMetadata$1.apply(MetadataCache.scala:68)
                      at scala.Option.map(Option.scala:146)
                      at kafka.server.MetadataCache.kafka$server$MetadataCache$$getPartitionMetadata(MetadataCache.scala:68)
                      at kafka.server.MetadataCache$$anonfun$getTopicMetadata$1$$anonfun$apply$8.apply(MetadataCache.scala:117)
                      at kafka.server.MetadataCache$$anonfun$getTopicMetadata$1$$anonfun$apply$8.apply(MetadataCache.scala:116)
                      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
                      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
                      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
                      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
                      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
                      at kafka.server.MetadataCache$$anonfun$getTopicMetadata$1.apply(MetadataCache.scala:116)
                      at kafka.server.MetadataCache$$anonfun$getTopicMetadata$1.apply(MetadataCache.scala:116)
                      at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
                      at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
                      at kafka.server.MetadataCache.getTopicMetadata(MetadataCache.scala:115)
                      at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:937)
                      at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1015)
                      at kafka.server.KafkaApis.handle(KafkaApis.scala:107)
                      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
                      at java.lang.Thread.run(Thread.java:745)
              

              这有哪些可能造成的呢。

                • kafka集群安装完成之后,就会进行一轮压测。压力固定的情况下,连接和openfile是相对持平的。
                  建议你也拿客户端进行一轮压测来观察,是否是客户端的问题。

                    另外主机参数配置了net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_tw_recycle = 1,不知道和这有关没,之前没配置的时候有次主机句柄和连接数暴涨,主机都挂了了。