在吞吐量要求比较严格的场景下, kafka生产者批量发送消息时,如何提高发送消息的速度。

傍窗观雨淞 发表于: 2021-05-16   最后更新时间: 2021-05-16 17:08:45   172 游览

如题, 在注重吞吐量的场景, 如何提高发送消息的速度?
我目前的做法是,打算将ack设置为1, 设大以下配置参数的值linger.msbatch.sizebuffer.memorymax.in.flight.requests.per.connection
以及代码中处理逻辑如下:

    // 多线程中每个线程都如此发送
    List<Foo> fooList = getFooList();
    for (List<Foo> partitionFooList : Lists.partition(fooList, 50)) {
        List<Future<RecordMetadata> futureList = new ArrayList<>();
        for (Foo foo : partitionFooList) {
            ProducerRecord<String, Foo> record = new ProducerRecord<>(topic, foo);
            Future<RecordMetadata> future = kafkaProducer.send(record);
            futureList.add(future);
        }
        try {
            for (Futrue<RecordMetadata> future : futureList) {
                future.get();    
            }
        } catch (InterruptedException | ExecutionException e) {
            log.error("error while send message for foo", e);
        }
    }

这是我目前的做法, 请问你有什么更好的解决办法吗? 以及linger.msbatch.sizebuffer.memorymax.in.flight.requests.per.connection增大到多少合适,有什么好的度量或者考察的方法吗?



发表于 1月前

你这些调整确实是针对吞吐进行提高的。

但是,从你的程序看下来,其实你是阻塞式的提交(get()),等待消息结果通知,性能会大大压缩,ack=1在副本有多个的情况下,会提高结果的。

你发现没有,你的程序和kafka集群其实只有一个连接,你所有的都共用这一个。

kafkaProducer.send

,由它来统一批次消息,缓存消息,然后发送给kafka的。

当然,一个通道并不意味着慢,如果你纯异步提交的话,已经可以达到非常高的吞吐了(接近带宽和kafka),如果你的机器和网络特别强劲,调整你提供的参数,加大批次数量,也可提升性能。

参考:
https://www.orchome.com/42
https://www.orchome.com/303#item-4

你的意思我总结下来主要是以下这一点:
1)取消阻塞? 我之所以使用get()是因为想提高消息发送可靠性,这一点你有什么更好的办法吗?

可是如果不这么做的话,我发送失败自身都是无感知的。

https://www.orchome.com/303#item-4
这里有异步callback的方式。

我想请教一下,发送相同数目的消息, 异步回调的方式会比同步发送的方式快很多吗? 有没有相关的基准测试。

5-10倍的差距。

我现在测试环境采用异步回调的方式发送消息,多线程发送,每个线程需要花7~8 s才能发送1000个消息,这是不是算发送消息算慢的?

我单个消息一般在5KB~10KB消息之间, 起了8个线程去发送消息

慢,不过跟你的消息大小有关。
你测试的方式不对吧,你先用默认生产者发送,1个线程,写个轮询,测试一下。
起多线程我上面已经说过了,其实发送的时候,还是共用一个kafkaProducer

你要有对比。

你的意思是开多个生产者去发送吗?

调了一下午的参数,kafka都快不起来 T T。

没那么玄幻的,默认参数已经满足99%的场景了。

你先去测试异步和同步,加上你说的开多个生产者(但是我记得多个通道,效果其实不明显)。

最后在找一个场景去调优(你已经确定同步还是异步之后)。

增大了batch.size之后,快很多, 但是Failed to allocate memory within the configured max blocking time 60000 ms. 出现异常

以下是我的生产者配置参数:

properties:
 linger.ms: 30
 max.in.flight.requests: 10
 buffer.memory: 536870912
 batch.size: 1048576

我试过默认参数,发送消息还是很慢。

找不到想要的答案?

我要提问
提问