2.3版本的kafka消费组消费lag突然增加

沉淪伱の噯 发表于: 2020-06-08   最后更新时间: 2020-06-08 20:33:08   2,181 游览

提问说明

1、根据流量监控,发现周六从中午12点开始,消费就开始有延时了,然后在周日凌晨快2点延时开始恢复。我查看kafka 的日志server.log,发现在这个时间段内日志在不断报错,报错如下:

[2020-06-06 12:59:59,754] WARN [SocketServer brokerId=2] Unexpected error from /10.93.222.159; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1818848801 larger than 104857600)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:890)
    at kafka.network.Processor.run(SocketServer.scala:789)
    at java.lang.Thread.run(Thread.java:748)

2、我的kakfa版本是2.3,用的是rsyslog写kafka,rsyslog是8.25版本,用的是librdkakfa是0.9.1版本。

3、我观察写入的流量没有问题,就是消费开始突然有延时,然后自己恢复了。而期间这么久,日志里就报这个错,在延时开始恢复的时候这个报错也没了,时间都对的上。这个报错我看是说写入日志量超过了kafka接收的单次最大量。

所以有两个问题:

  1. 报这个错是不是日志没法写入就丢了呢
  2. 这种大量报写入失败,会影响消费吗
发表于 2020-06-08

kafka默认配置一次socket请求最多处理100MB数据,属性是server.properteis中的socket.request.max.bytes控制。你的请求数量大小约1.7G,远远大于kafka单次接收的能力了,所以发出warning(警告)。

  1. 可以增加socket.request.max.bytes
  2. rsyslog限制批次大小batchsize(不清楚rsyslog是不是对应这个)

最后,生产者管生产,是不会影响消费者的(前提你的内存是足够的,因为消费者的消息会优先从内存里获取)。

Treasuremy -> 半兽人 3年前

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group graylog2

大佬,请教您一个问题,执行这个命令时报了一个错误

Error: Executing consumer group command failed due to org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.

这个错误可能是什么原因引起的呢

半兽人 -> Treasuremy 3年前

请到问题专区提问,这是别人的问题区额。
ps:你的消费者组不存在

我今天研究了下librdkafka,发现它的默认配置 batch.size=1000000,batch.num.messages=10000,按照这个默认配置算,根本达不到kakfa里socket.request.max.bytes配置的100M。我有多个rsyslog服务器在发送数据,难道这个将多个写入同一个topic的请求合并算了吗?

客户端会把topic相同的,分区也相同的合并,对kafka节点来说,是总的。

我没太懂你的意思,客户端合并是什么意思呢,我这里的rsyslog是分布在每一台服务器上,每个都是单独发给kafka,只不过topic是相同的

librdkafka就是客户端,简单点客户端发送消息到kafka服务端,kafka单节点同一时刻接收的socket总和超过了kafka设置默认最大的100m,所以发出警告。
(socket总和里,包含了客户端发送的,副本同步的等)

谢谢您,我大概明白了,还有几个小细节想请教下您。
1、这个socket总和里只有一个topic的数据还是包含多个topic呢。
2、还有这个socket总和是否有一个队列存放数据,kafka间隔一段时间接收一次数据呢。如果是的话,是否可以调小这个间隔时间来接收数据呢

1、包含多个
2、如果调大socket,注意jvm也要跟着增加,防止内存溢出。

kafka间隔一段时间接受消息?没有吧,只能是在客户端控制输入,调小batch的消息数量,或增加kafka节点,分解压力。

多谢,明白了

你的答案

查看kafka相关的其他问题或提一个您自己的问题