Kafka如何发送大消息(超过 15MB)?

识趣 发表于: 2021-11-08   最后更新时间: 2021-11-08 23:11:29   1,245 游览

我使用 Java Producer API 向 Kafka V. 0.8 发送字符串消息。
如果消息大小约为 15 MB,我会收到一个 MessageSizeTooLargeException异常。

我试图将 message.max.bytes 设置为 40 MB,但我仍然遇到异常。 小消息工作没有问题。

(异常出现在生产者中,我在这个应用程序中没有消费者。)

我怎么样才能摆脱这个异常?

我的生产者配置

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

错误日志:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
发表于 2021-11-08

你需要调整三个(或四个)属性:

  • 消费者端:fetch.message.max.bytes - 这将决定消费者可以获取的最大消息大小。

  • Broker方面:replica.fetch.max.byte - 这将允许kafka之中的副本在集群内发送消息,并确保消息被正确复制。如果这个值太小,那么消息将永远不会被复制,因此,消费者将永远不会看到消息,因为消息永远不会被提交(完全复制)。

  • Broker方面: message.max.bytes - 这是broker可以从生产者接收的最大消息大小。

  • Broker方面 (每个topic): max.message.bytes - 这是broker允许附加到主题上的最大的消息大小。这个大小在压缩前是有效的。(默认是broker的message.max.byte。)

第2点比较难发现,因为你不会从Kafka那里得到任何异常、信息或警告,所以当你发送大的信息时,一定要考虑到这一点。

你的答案

查看kafka相关的其他问题或提一个您自己的问题