
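The Kafka cluster had been running normally, but after the nodes were restarted the network processor idle ratio (NetworkProcessorAvgIdlePercent)
suddenly dropped very low, below 1%. The nodes are up, but cluster performance is very poor.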
Kafka version: 2.11-1.1.0 (Scala 2.11 build of Kafka 1.1.0)
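9 brokers on physical machines (40 cores, 256 GB RAM each); 40 topics with 40-80 partitions each, replication factor 3.
The cluster is used mainly for log collection: every containerized application runs an agent that collects its logs and sends them to Kafka.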
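For scale, the topic layout above implies roughly how many partition replicas each broker hosts. The sketch below is only the arithmetic, assuming replicas are spread evenly across the 9 brokers (the actual assignment is not shown here); the class and method names are illustrative.

```java
public class PartitionLoad {
    // Figures from the cluster description: 9 brokers, 40 topics,
    // 40-80 partitions per topic, replication factor 3.
    static final int BROKERS = 9;
    static final int TOPICS = 40;
    static final int REPLICATION_FACTOR = 3;

    /** Total partition replicas in the cluster for a given per-topic partition count. */
    static int totalReplicas(int partitionsPerTopic) {
        return TOPICS * partitionsPerTopic * REPLICATION_FACTOR;
    }

    /** Average replicas per broker, assuming an even spread (an assumption, not measured). */
    static double replicasPerBroker(int partitionsPerTopic) {
        return (double) totalReplicas(partitionsPerTopic) / BROKERS;
    }

    public static void main(String[] args) {
        // Lower and upper bound of the stated 40-80 partitions per topic.
        for (int p : new int[] {40, 80}) {
            System.out.printf("%d partitions/topic: %d replicas total, ~%.0f per broker%n",
                    p, totalReplicas(p), replicasPerBroker(p));
        }
    }
}
```

This prints 4800 replicas (about 533 per broker) at 40 partitions per topic and 9600 replicas (about 1067 per broker) at 80.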
Broker configuration:
broker.id=1
zookeeper.connect=
log.dirs=/data/data1/kafka_1.1.0_data_bak,/data/data2/kafka_1.1.0_data_bak,/data/data3/kafka_1.1.0_data_bak,/data/data4/kafka_1.1.0_data_bak,/data/data5/kafka_1.1.0_data_bak
message.max.bytes=52428800
num.network.threads=16
num.io.threads=32
listeners=PLAINTEXT://
socket.send.buffer.bytes=10485760
socket.receive.buffer.bytes=10485760
socket.request.max.bytes=104857600
num.partitions=40
log.segment.bytes=1073741824
log.retention.hours=120
log.retention.check.interval.ms=300000
log.cleaner.enable=true
log.flush.interval.messages=1000
log.flush.scheduler.interval.ms=1000
auto.create.topics.enable=false
default.replication.factor=3
replica.socket.receive.buffer.bytes=10485760
replica.fetch.max.bytes=52428800
num.replica.fetchers=4
zookeeper.session.timeout.ms=120000
zookeeper.connection.timeout.ms=60000
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
auto.leader.rebalance.enable=true
num.recovery.threads.per.data.dir=1
delete.topic.enable=true
The server.log on every broker keeps reporting this error:
Attempting to send response via channel for which there is no open connection, connection id xxxx-xxxx53695-854953 (kafka.network.Processor)
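None of the following attempts helped:
- Changed the data retention to 1 hour, then restarted the nodes.
- Rebuilt the cluster from scratch: deleted all ZooKeeper and Kafka data, switched topic creation from automatic to manual, and created the topics one by one. Afterwards the network processor idle percentage kept falling.
- Changed num.network.threads to 40. The idle percentage went up, but host CPU hit 98%, so I set it back to 16.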
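The cluster used to be fine. My guess is that as the number of application containers grows, the volume of collected logs grows with it and Kafka can no longer keep up. Do I need to add brokers?
Or could the agent's timeout for talking to Kafka be too short? Here is the core code the agent uses to connect to Kafka: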
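/** Retry backoff, in milliseconds */
private final String DEF_RETRY_BACKOFF_MS = "100";
/** Request timeout, in milliseconds */
private final String DEF_REQUEST_TIMEOUT_MS = "600";

try {
    reCreateProducer(properties);
} catch (Throwable t) {
    Logger.error("Failed to create Kafka producer instance, retrying...", t);
    Thread redo = new Thread(new Runnable() {
        int time = 1;

        public void run() {
            // Retry at most 5 times
            while (time <= 5) {
                Logger.debug("Retrying creation of Kafka producer instance, attempt {0}", time);
                time++;
                try {
                    // Build the producer instance
                    reCreateProducer(properties);
                    return; // stop retrying once the producer has been created
                } catch (Throwable e) {
                    Logger.error("Failed to create Kafka producer instance", e);
                }
                try {
                    Thread.sleep(3000); // wait before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    });
    redo.start();
}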
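Assuming the two constants above are passed to the producer as `request.timeout.ms` and `retry.backoff.ms` (the snippet does not show how `properties` is built), the sketch below puts them next to the Kafka Java producer's defaults: 30000 ms for `request.timeout.ms` and 100 ms for `retry.backoff.ms`. The class and method names are illustrative only, and no Kafka dependency is used, just the raw property keys. In the Kafka Java client, a request that exceeds `request.timeout.ms` causes the client to disconnect from that broker, so a very short value could leave brokers holding responses for connections that are already closed.

```java
import java.util.Properties;

public class AgentProducerTimeouts {
    // Values copied from the agent code (milliseconds, stored as strings).
    static final String DEF_RETRY_BACKOFF_MS = "100";
    static final String DEF_REQUEST_TIMEOUT_MS = "600";

    // Kafka Java producer defaults for the same two settings.
    static final long DEFAULT_REQUEST_TIMEOUT_MS = 30_000L;
    static final long DEFAULT_RETRY_BACKOFF_MS = 100L;

    /** Timeout-related producer properties, assuming the agent maps the constants this way. */
    static Properties agentTimeouts() {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", DEF_REQUEST_TIMEOUT_MS);
        props.setProperty("retry.backoff.ms", DEF_RETRY_BACKOFF_MS);
        return props;
    }

    /** The agent's request timeout as a percentage of the producer default. */
    static double requestTimeoutPercentOfDefault(Properties props) {
        long requestTimeout = Long.parseLong(props.getProperty("request.timeout.ms"));
        return 100.0 * requestTimeout / DEFAULT_REQUEST_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        Properties props = agentTimeouts();
        System.out.printf("request.timeout.ms = %s ms (%.1f%% of the %d ms default)%n",
                props.getProperty("request.timeout.ms"),
                requestTimeoutPercentOfDefault(props), DEFAULT_REQUEST_TIMEOUT_MS);
        System.out.printf("retry.backoff.ms = %s ms (default %d ms)%n",
                props.getProperty("retry.backoff.ms"), DEFAULT_RETRY_BACKOFF_MS);
    }
}
```

With these values the agent's request timeout is 2% of the default, while the retry backoff matches the default.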
I'd appreciate it if the experts on the forum could take a look.