kafka connector自带的producer如何更改配置?

一如乞人不需要形象 发表于: 2021-10-09   最后更新时间: 2021-10-09 13:47:54   242 游览

关于前段时间提问的“kafka 生产消息报错 RecordTooLargeException”这个问题,最近找到了答案,之所以怎么更改都会报错,是因为kafka connector里除了produce source record之外,还会有三个topic:

  • config.storage.topic
  • offset.storage.topic
  • status.storage.topic

之前的问题实际是offset.storage.topic里消息过大,抛出的异常,所以producer.override.max.request.size配置了,也没有用。

那么问题来了,如何更改这三个topic的producer config,看了源码,关于max.request.size的配置,是没有显式配置的,那么用的就是producer的默认配置,所以改kafka broker端producer.properties就行了吗?有没有其他方式?



kafka服务端核心的2个上限配置:

replica.socket.receive.buffer.bytes

用于网络请求的socket接收缓存区

message.max.bytes

服务器可以接收的消息的最大大小

配置来自:Kafka Broker配置

第一个没改过,不知道改什么值合适,第二个早就改了. 看报错的log,明确表示是max.request.size值设置的过小。

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 3175237 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
    at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$SetCallbackFuture.get(KafkaOffsetBackingStore.java:228)
    at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$SetCallbackFuture.get(KafkaOffsetBackingStore.java:161)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:498)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:113)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:47)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:86)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

看kafka connector的源码。org.apache.kafka.connect.storage.KafkaOffsetBackingStore,在new 一个新的producer时,没有配置 max.request.size,所以用的是默认值1048576,导致producer 大于1048576时,会报错. 现在就是不知道,如何修改这个值的默认值。

这不又回到上个问题了。
你在connect-distributed.properties中配置:

producer.max.request.size=15728640

应该是生效的呀。

配置了,但只对于task生产source record时生效,在connect.log里可以看到两种producer config的log,一种是刚起connect service时,是默认值,一种是task启动时,是想要的值。

是不是之前多了override,之前的一直没有生效。

producer.override.max.request.size

哈哈哈,我两个都配了,头都炸了

那现在好了吗?

没有,下午就测了,不行了 根据code来看,得改producer 的那个配置的默认值才行了

不可思议,你把启动命令贴一下,直接设置

max.request.size=15728640

也试试。

/bin/sh -c '/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties

connect-distributed.properties里设置了

connector.client.config.override.policy=All
producer.max.request.size=104857600
producer.override.max.request.size=104857600

不是,主要是task的producer生效了

根据log来看,是可以了,不知道是哪个参数生效的,我得一个个排除 :)

producer.max.request.size=104857600
producer.override.max.request.size=104857600
max.request.size=104857600

坐等你最终的结果。

结论:
max.request.size=104857600 是connector内部topic的prodcuer的配置.
producer.max.request.size=104857600 是connector source record 的prodcuer的配置.

你的答案

查看kafka相关的其他问题或提一个您自己的问题

找不到想要的答案?提一个您自己的问题。

我要提问
提问