请教下kafka消费速度问题

发表于: 2021-12-29   最后更新时间: 2021-12-29  

您好,有个问题请教下。

环境:

kafka版本2.6.0,单节点(centOS7.6,4核16G),主题名“Topic1”,共30个分区,

自定义containerFactory命名为“batchFactory”,concurrency=30,其他配置均为spring-kafka.2.3.4.RELEASE默认,消费者组目前一共有4个。

问题描述:

由于业务对实时性要求比较高,且数据在整点前后几分钟时数据相对较大,目前整点数据量在2000条/每秒,根据对kafka写入时间与消费者读取消息时间记录后做对比,大部分数据在kafka写入50ms后便由消费者拉取,但某些分区偶尔会存在1-3秒延迟后才由消费者拉取(大多集中在数据量大时,30万条左右数据出现200条大于1秒延迟的数据,最大的将近4秒),请问下:

1)后续应该做哪些优化能提高消费速度?
2)kafka搭建集群会不会提升消费速度?
3)如果消费者做3节点集群,30个分区,每个节点设置10个线程,还是30个线程?

感谢。

已经做的优化:

(1)消费者引入线程池,数据在拉取到后立即异步处理,spring-kafka自动提交设置为false(默认)。
(2)分区提高到30个(原为12个),有一些提高。
(3)批量消费用List<ConsumerRecord<?,?>>,拉取到后再循环异步消费,因为fetch-min-size默认为1,不知道批量有没有生效,批量时max-poll-records设置为10

@KafkaListener(topics = {"Topic1"},containerFactory = "batchFactory")
    public void listen1(ConsumerRecord<?, ?> record) {
      String threadName = Thread.currentThread().getName();
        long start = new Date().getTime();
       //异步将一些kafka信息入库,为了比较kafka写入时间与当前线程读取消息时间
       apiAsyncService.addKafkaLog(start,record,threadName);
       //异步业务处理
       listenAsyncService.listen1Task(record);
    }


您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka 2.5.1 consumer支持幂等吗
下一条: kafka-consumer-perf-test.sh脚本遇到OutOfMemoryError

  • 1、kafka默认是批量拉取消息的(一次拉取约2000条,是根据消息大小的),所以一般瓶颈不在消费者拉取消息这块,一般是处理能力的问题,就是我拉取到了2000条给程序,程序处理这2000需要多久,然后kafka才会拉取下一个批次,你用异步做的,是对的,但是我担心你没有阻塞(就是一直往异步里丢,丢爆了,或者突然重启,导致业务丢失)。
    2、你这点量,kafka是没压力的。
    3、你现在不是让一个应用程序异步处理30个分区,而是把你的应用程序多起一些,放到不同的机器上,共同消化30个分区的消息。

    • 特别感谢您的回复。
      1.关于您提到的由于异步没有阻塞导致业务丢失的风险,解决方式只能是增加机器,以保证有足够的系统资源支撑所有异步任务吧?(这样的情况下,如果有一台忽然重启,是否也有这样的风险?有没有较好地应对这样异步任务的方式呢?还是说要保证业务不丢失,需要取消异步,横向扩展以保证处理速度?)
      2.关于第3点的消费者集群问题,由于我描述的不清楚,还要请教一下,30个分区,如果启动了3个节点的消费者程序,每个节点的分区是怎样分配的?(如果是默认配置concurrency=1,应该是每个节点分配10个分区吧,这样如果想要用3个节点保证并发量,是否应该设置每个节点的concurrency=10?)
      谢谢。

        • 1、强制(kill -9)或程序异常崩溃会丢消息的,正常停止应用不会丢消息的,几乎是不会出现崩溃情况(测试通过后,不要再随意调整线程数什么的了,很容易造成OOM,引起崩溃)。手动提交offset也可以解决,但是效率会大打折扣,崩溃时会造成重复消费的问题了(该消息处理了,但是在提交offset的时候,应用崩溃了)。 https://www.orchome.com/1056 这个例子是同步提交offset的,崩溃时最多丢1笔消息,线程运行5、6年了,没出过问题。

          2、springboot这个太久我忘了,不过concurrency=1你可以做个测试,设置成30个(对应你的分区数),那么你在启动一个新的应用程序时,如果它一直拿不到消息,则说明concurrency是对应的消费者分区,启动了30个消费者,占满了30个分区,如果新启动的消费者进程拿到了消息,则说明concurrency对应的是消息处理线程,什么意思呢,就是消费者是批量获取消息的,比如一次获取2000条消息,拿到之后,丢给消息处理线程,也就是说有30个消息处理程序同时在处理这2000条消息。

          也可以通过下面命令查看下消费者客户端占用情况:

          ## 显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
          bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
          
            • 第2点经过了一下测试,3台服务器(4核16G),kafka单节点,30个分区,每台机器(concurrency=10)负责的分区个数均为10。目前偶尔仍会有1-2秒的消费延迟,是否意味着需要增加kafka主题的分区个数来应对高并发的时刻?(因为分区新建后不可删除,所以先请教帮忙分析下,谢谢)

                • 你在多启动一个消费者,也设置concurrency=10),确保有4个消费者同时在,在观察一下分区情况,另外,你消息收到就打印,不要做任何处理,观察下延迟情况,告诉我结果。