Kafka Topic配置

原创
半兽人 发表于: 2017-09-30   最后更新时间: 2021-01-11 22:14:25  
{{totalSubscript}} 订阅, 47,667 游览

3.2 Topic配置

与topic相关的配置,服务器的默认值,也可可选择的覆盖指定的topic。如果没有给出指定topic的配置,则将使用服务器默认值。 可以通过-config选项在topic创建时设置。此示例使用自定义最大消息的大小和刷新率,创建一个名为my-topic的topic:

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

也可以使用alter configs命令修改或设置。 此示例修改更新my-topic的最大的消息大小:

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
    --alter --add-config max.message.bytes=128000

你可以执行以下命令验证结果

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe

移除:

> bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes

以下是topic级配置。是服务器的默认配置。服务器默认配置值仅适用于topic。

名称 描述 类型 默认 有效的值 服务器默认属性 重要程度
cleanup.policy “delete”或“compact”。指定在旧的日志段的保留策略。默认策略(“delete”),将达到保留时间或大小限制的日志废弃。 “compact”则压缩日志。 list delete [compact, delete] log.cleanup.policy medium
compression.type 针对指定的topic设置最终的压缩方式。标准的压缩格式有'gzip', 'snappy', lz4。还可以设置'uncompressed',就是不压缩;设置为'producer'这意味着保留生产者设置的原始压缩编解码。 string producer [uncompressed, snappy, lz4, gzip, producer] compression.type medium
delete.retention.ms 保留删除消息压缩topic的删除标记的时间。此设置还给出消费者如果从offset 0开始读取并确保获得最终阶段的有效快照的时间范围(否则,在完成扫描之前可能已经回收了)。 long 86400000 [0,...] log.cleaner.delete.retention.ms medium
file.delete.delay.ms 从文件系统中删除文件之前等待的时间 long 60000 [0,...] log.segment.delete.delay.ms medium
flush.messages 此设置允许指定我们强制fsync写入日志的数据的间隔。例如,如果这被设置为1,我们将在每个消息之后fsync; 如果是5,我们将在每五个消息之后fsync。一般,我们建议不要设置它,使用复制特性来保持持久性,并允许操作系统的后台刷新功能更高效。可以在每个topic的基础上覆盖此设置(请参阅每个主题的配置部分)。 medium long 9223372036854775807 [0,...] log.flush.interval.messages
flush.ms 此设置允许我们强制fsync写入日志的数据的时间间隔。例如,如果这设置为1000,那么在1000ms过去之后,我们将fsync。 一般,我们建议不要设置它,并使用复制来保持持久性,并允许操作系统的后台刷新功能,因为它更有效率 long 9223372036854775807 [0,...] log.flush.interval.ms medium
follower.replication.throttled.replicas follower复制限流列表。该列表应以[PartitionId]的形式描述一组副本:[BrokerId],[PartitionId]:[BrokerId]:...或者通配符'*'可用于限制此topic的所有副本。 list "" [partitionId],[brokerId]:[partitionId],[brokerId]:... follower.replication.throttled.replicas medium
index.interval.bytes 此设置控制Kafka向其offset索引添加索引条目的频率。默认设置确保我们大致每4096个字节索引消息。 更多的索引允许读取更接近日志中的确切位置,但使索引更大。你不需要改变这个值。 int 4096 [0,...] log.index.interval.bytes medium
leader.replication.throttled.replicas 在leader方面进行限制的副本列表。该列表设置以[PartitionId]的形式描述限制副本:[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...或使用通配符‘*’限制该topic的所有副本。 list "" [partitionId],[brokerId]:[partitionId],[brokerId]:... leader.replication.throttled.replicas medium
max.message.bytes kafka允许的最大的消息批次大小。如果增加此值,并且消费者的版本比0.10.2老,那么消费者的提取的大小也必须增加,以便他们可以获取大的消息批次。
在最新的消息格式版本中,消息总是分组批量来提高效率。在之前的消息格式版本中,未压缩的记录不会分组批量,并且此限制仅适用于该情况下的单个消息。
int 1000012 [0,...] message.max.bytes medium
message.format.version 指定消息附加到日志的消息格式版本。该值应该是一个有效的ApiVersion。例如:0.8.2, 0.9.0.0, 0.10.0,更多细节检查ApiVersion。通过设置特定的消息格式版本,并且磁盘上的所有现有消息都小于或等于指定版本。不正确地设置此值将导致消费者使用旧版本,因为他们将接收到“不认识”的格式的消息。 string 0.11.0-IV2 log.message.format.version medium
min.cleanable.dirty.ratio 此配置控制日志压缩程序将尝试清除日志的频率(假设启用了日志压缩)。默认情况下,我们将避免清理超过50%日志被压缩的日志。 该比率限制日志中浪费的最大空间重复(在最多50%的日志中可以是重复的50%)。更高的比率意味着更少,更有效的清洁,但意味着日志中的浪费更多。 double 0.5 [0,...,1] log.cleaner.min.cleanable.ratio medium
min.compaction.lag.ms 消息在日志中保持不压缩的最短时间。仅适用于正在压缩的日志。 long 0 [0,...] log.cleaner.min.compaction.lag.ms medium
min.insync.replicas 当生产者设置应答为"all"(或“-1”)时,此配置指定了成功写入的副本应答的最小数。如果没满足此最小数,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)
当min.insync.replicas和acks强制更大的耐用性时。典型的情况是创建一个副本为3的topic,将min.insync.replicas设置为2,并设置acks为“all”。如果多数副本没有收到写入,这将确保生产者引发异常。
int 1 [1,...] min.insync.replicas medium
preallocate 如果我们在创建新的日志段时在磁盘上预分配该文件,那么设为True。 boolean false log.preallocate medium
retention.bytes 如果我们使用“删除”保留策略,则此配置将控制日志可以增长的最大大小,之后我们将丢弃旧的日志段以释放空间。默认情况下,没有设置大小限制则仅限于时间限制。 long -1 log.retention.bytes medium
retention.ms 如果我们使用“删除”保留策略,则此配置控制我们将保留日志的最长时间,然后我们将丢弃旧的日志段以释放空间。这代表SLA消费者必须读取数据的时间长度。 long 604800000 log.retention.ms medium
segment.bytes 此配置控制日志的段文件大小。一次保留和清理一个文件,因此较大的段大小意味着较少的文件,但对保留率的粒度控制较少。 int 1073741824 [14,...] log.segment.bytes medium
segment.index.bytes 此配置控制offset映射到文件位置的索引的大小。我们预先分配此索引文件,并在日志滚动后收缩它。通常不需要更改此设置。 int 10485760 [0,...] log.index.size.max.bytes medium
segment.jitter.ms 从计划的段滚动时间减去最大随机抖动,以避免异常的段滚动 long 0 [0,...] log.roll.jitter.ms medium
segment.ms 此配置控制Kafka强制日志滚动的时间段,以确保保留可以删除或压缩旧数据,即使段文件未满。 long 604800000 [0,...] log.roll.ms medium
unclean.leader.election.enable 是否将不在ISR中的副本作为最后的手段选举为leader,即使这样做可能会导致数据丢失。 boolean false unclean.leader.election.enable medium

kafka新版本新增的配置

kafka > 2.0

名称 描述 类型 默认 有效的值 服务器默认属性 重要程度
message.downconversion.enable 此配置控制是否启用消息格式的向下转换以满足消费请求。当设置为false时,broker不会对期待旧消息格式的消费者执行向下转换。broker 会对来自此类旧客户端的消费请求作出 UNSUPPORTED_VERSION 错误响应。这个配置不适用于复制到followers时可能需要的任何消息格式转换。 boolean false log.message.downconversion.enable
更新于 2021-01-11
在线,1小时前登录

孫悟空 3年前

请问一下我的分区默认只能储存100条数据,被消费完还是这样,请问有什么配置可以修改吗

Lee 5年前

max.message.bytes  kafka允许的最大的消息批次大小。如果增加此值,则消费者的版本比0.10.2老,
-----
max.message.bytes  kafka允许的最大的消息批次大小。如果增加此值,【并且】消费者的版本比0.10.2老,

半兽人 -> Lee 5年前

赞,已更新。

小脑袋贼大 6年前

你好,我的Kafka环境只有一个Broker,但是配了2块硬盘,我创建了一个Topic,请问这个Topic会创建在哪个硬盘上?或者说我怎么才能指定Topic创建在某个硬盘上,谢谢。

無名 -> 小脑袋贼大 6年前

你指定的日志存储目录。

跟硬盘无关吧,要看你指定的数据存储目录在文件系统上的哪个目录里了

Sirius 6年前

问下topic在创建成功之后再设置replication-factor是不是不可以,topic更新的内容是不是只能在列表里面的项。

Sirius -> 半兽人 6年前

ok,跪谢大神~

答人 -> 半兽人 5年前

消费者,怎么设置解压数据的

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章