Kafka Broker配置

原创
半兽人 发表于: 2016-11-07   最后更新时间: 2023-02-08 21:00:16  
{{totalSubscript}} 订阅, 88,226 游览

3.1 Broker配置

基本配置如下:

  • broker.id
  • log.dirs
  • zookeeper.connect

下文将详细论述了主题级别配置和默认值。

kafka >= 0.10版

名称 描述 类型 默认 有效值 重要程度
zookeeper.connect hostname:port的形式指定ZooKeeper连接字符串,其中host和port是ZooKeeper服务器的主机和端口。为了使得单个ZooKeeper机器宕机时通过其他ZooKeeper节点进行连接,你也可以以hostname1:port1,hostname2:port2,hostname3:port3的形式指定多个,提高可用性。
也可以将ZooKeeper chroot路径作为其ZooKeeper连接字符串的一部分,将其数据放在全局ZooKeeper命名空间的某个路径下。例如,要提供一个/chroot/path的chroot路径,你可以将连接字符串设为hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
string null
advertised.host.name 已弃用:当advertised.listenerslisteners没设置时候才使用。请改用advertised.listeners。Hostname发布到Zookeeper供客户端使用。在IaaS环境中,Broker可能需要绑定不同的接口。如果没有设置,将会使用host.name(如果配置了)。否则将从java.net.InetAddress.getCanonicalHostName()获取。 string null
advertised.listeners 发布给Zookeeper供客户端使用的监听地址(如果与 listeners 配置属性不同)。在IaaS环境中,broker可能需要绑定不同的接口。如果没设置,则使用listeners。与listeners不同是,0.0.0.0元地址是无效的。
同样与listeners不同的是,此属性中可以有重复的端口,因此可以将一个listener配置为通告另一个listener的地址。 这在使用外部负载平衡器的某些情况下很有用。
string null
advertised.port 过时的:当advertised.listenerslisteners没有设置才使用。请改用advertised.listeners。端口发布到Zookeeper供客户端使用,在IaaS环境中,broker可能需要绑定到不同的端口。如果没有设置,将和broker绑定的同一个端口。 int null
auto.create.topics.enable 启用自动创建topic boolean true
auto.leader.rebalance.enable 启用自动平衡leader。如果需要,后台线程会定期检查并触发leader平衡。 boolean true
background.threads 用于各种后台处理任务的线程数 int 10 [1,...]
broker.id 服务器的broker id。如果未设置,将生成一个独一无二的broker id。要避免zookeeper生成的broker id和用户配置的broker id冲突,从reserved.broker.max.id + 1开始生成。 int -1
compression.type 为给定topic指定最终的压缩类型。支持标准的压缩编码器('gzip', 'snappy', 'lz4')。也接受'未压缩',就是没有压缩。保留由producer设置的原始的压缩编码。 string producer
delete.topic.enable 启用删除topic。如果此配置已关闭,通过管理工具删除topic将没有任何效果 boolean false
host.name 不赞成:当listeners没有设置才会使用。请改用listeners。如果设置,它将只绑定到此地址。如果没有设置,它将绑定到所有接口 string ""
leader.imbalance.check.interval.seconds 由控制器触发分区再平衡检查的频率 long 300
leader.imbalance.per.broker.percentage 允许每个broker的leader比例不平衡。如果每个broker的值高于此值,控制器将触发leader平衡,该值以百分比的形式指定。 int 10
listeners 监听列表 - 监听的URL列表和协议(逗号分隔)。如果侦听协议不是安全协议,则还必须设置 listener.security.protocol.map。
监听的协议名称和端口号必须是唯一的。
将hostname留空则绑定到默认接口,将hostname留空则绑定到默认接口
合法的listener列表是:
PLAINTEXT://myhost:9092,SSL://:9091
CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
string PLAINTEXT://:9092
log.dir 保存日志数据的目录 (补充log.dirs属性) string /tmp/kafka-logs
log.dirs 保存日志数据的目录。如果未设置,则使用log.dir中的值 string null
log.flush.interval.messages 消息刷新到磁盘之前,累计在日志分区的消息数 long 9223372036854775807 [1,...]
log.flush.interval.ms topic中的消息在刷新到磁盘之前保存在内存中的最大时间(以毫秒为单位),如果未设置,则使用log.flush.scheduler.interval.ms中的值 null
log.flush.offset.checkpoint.interval.ms 我们更新的持续记录的最后一次刷新的频率。作为日志的恢复点。 int 60000 [0,...]
log.flush.scheduler.interval.ms 日志刷新的频率(以毫秒为单位)检查是否有任何日志需要刷新到磁盘 long 9223372036854775807
log.retention.bytes 删除日志之前的最大大小 long -1
log.retention.hours 删除日志文件保留的小时数(以小时为单位)。第三级是log.retention.ms属性 int 168
log.retention.minutes 删除日志文件之前保留的分钟数(以分钟为单位)。次于log.retention.ms属性。如果没设置,则使用log.retention.hours的值。 int null
log.retention.ms 删除日志文件之前保留的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes的值。 long null
log.roll.hours 新建一个日志段的最大时间(以小时为单位),次于log.roll.ms属性。 int 168 [1,...]
log.roll.jitter.hours 从logRollTimeMillis(以小时为单位)减去最大抖动,次于log.roll.jitter.ms属性。 int 0 [0,...]
log.roll.ms 新建一个日志段之前的最大事时间(以毫秒为单位)。如果未设置,则使用log.roll.hours的值。 long null
log.segment.bytes 单个日志文件的最大大小 int 1073741824 [14,...]
log.segment.delete.delay.ms 从文件系统中删除文件之前的等待的时间 long 60000 [0,...]
message.max.bytes 服务器可以接收的消息的最大大小 int 1000012 [0,...]
min.insync.replicas 当producer设置acks为"all"(或"-1")时。min.insync.replicas指定必须应答成功写入的replicas最小数。如果不能满足最小值,那么producer抛出一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
当一起使用时,min.insync.replicas和acks提供最大的耐用性保证。一个典型的场景是创建一个复制因子3的topic,设置min.insync.replicas为2,并且ack是“all”。如果多数副本没有接到写入时,将会抛出一个异常。
int 1 [1,...]
num.io.threads 服务器用于执行网络请求的io线程数 int 8 [1,...]
num.network.threads 服务器用于处理网络请求的线程数。 int 3 [1,...]
num.recovery.threads.per.data.dir 每个数据的目录线程数,用于启动时日志恢复和关闭时flush。 int 1 [1,...]
num.replica.fetchers 从源broker复制消息的提取线程数。递增该值可提高follower broker的I/O的并发。 int 1
offset.metadata.max.bytes offset提交关联元数据条目的最大大小 int 4096
offsets.commit.required.acks commit之前需要的应答数,通常,不应覆盖默认的(-1) short -1
offsets.commit.timeout.ms Offset提交延迟,直到所有副本都收到提交或超时。 这类似于生产者请求超时。 int 5000 [1,...]
offsets.load.buffer.size 当加载offset到缓存时,从offset段读取的批量大小。 int 5242880 [1,...]
offsets.retention.check.interval.ms 检查过期的offset的频率。 long 600000 [1,...]
offsets.retention.minutes 当一个消费者组失去其所有消费者后(即为空时),其偏移量在被丢弃前将被保留这个保留期。对于独立的消费者(使用手动分配),偏移量将在最后一次提交的时间加上这个保留期后过期。 int 10080 [1,...]
offsets.topic.compression.codec 压缩编码器的offset topic - 压缩可以用于实现“原子”提交 int 0
offsets.topic.num.partitions offset commit topic的分区数(部署之后不应更改) int 50 [1,...]
offsets.topic.replication.factor offset topic复制因子(ps:就是备份数,设置的越高来确保可用性)。为了确保offset topic有效的复制因子,第一次请求offset topic时,活的broker的数量必须最少最少是配置的复制因子数。 如果不是,offset topic将创建失败或获取最小的复制因子(活着的broker,复制因子的配置) short 3 [1,...]
offsets.topic.segment.bytes offset topic段字节应该相对较小一点,以便于加快日志压缩和缓存加载 int 104857600 [1,...]
port 不赞成:当listener没有设置才使用。请改用listeners。该port监听和接收连接。 int 9092
queued.max.requests 在阻塞网络线程之前允许的排队请求数 int 500 [1,...]
quota.consumer.default 过时的:当默认动态的quotas没有配置或在Zookeeper时。如果每秒获取的字节比此值高,所有消费者将通过clientId/consumer区分限流。 long 9223372036854775807 [1,...]
quota.producer.default 过时的:当默认动态的quotas没有配置,或在zookeeper时。如果生产者每秒比此值高,所有生产者将通过clientId区分限流。 long 9223372036854775807 [1,...]
replica.fetch.min.bytes Minimum 每个获取响应的字节数。如果没有满足字节数,等待replicaMaxWaitTimeMs。 int 1
replica.fetch.wait.max.ms 跟随者副本发出每个获取请求的最大等待时间,此值应始终小于replica.lag.time.max.ms,以防止低吞吐的topic的ISR频繁的收缩。 int 500
replica.high.watermark.
checkpoint.interval.ms
达到高“水位”保存到磁盘的频率。 long 5000
replica.lag.time.max.ms 如果一个追随者没有发送任何获取请求或至少在这个时间的这个leader的没有消费完。该leader将从isr中移除这个追随者。 long 10000
replica.socket.receive.buffer.bytes 用于网络请求的socket接收缓存区 int 65536
replica.socket.timeout.ms 网络请求的socket超时,该值最少是replica.fetch.wait.max.ms int 30000
request.timeout.ms 该配置控制客户端等待请求的响应的最大时间,。如果超过时间还没收到消费。客户端将重新发送请求,如果重试次数耗尽,则请求失败。 int 30000
socket.receive.buffer.bytes socket服务的SO_RCVBUF缓冲区。如果是-1,则默认使用OS的。 int 102400
socket.request.max.bytes socket请求的最大字节数 int 104857600 [1,...]
socket.send.buffer.bytes socket服务的SO_SNDBUF缓冲区。如果是-1,则默认使用OS的。 int 102400
unclean.leader.election.enable 是否启用不在ISR中的副本参与选举leader的最后的手段。这样做有可能丢失数据。 boolean true
zookeeper.connection.timeout.ms 连接zookeeper的最大等待时间,如果未设置,则使用zookeeper.session.timeout.ms。 int null
zookeeper.session.timeout.ms Zookeeper会话的超时时间 int 6000
zookeeper.set.acl 设置客户端使用安全的ACL boolean false
broker.id.generation.enable 启用自动生成broker id。启用该配置时应检查reserved.broker.max.id。 boolean true 中等
broker.rack broker机架,用于机架感知副本分配的失败容错。例如:RACK1, us-east-1d string null 中等
connections.max.idle.ms 闲置连接超时:闲置时间超过该设置,则服务器socket处理线程将关闭这个连接。 long 600000 中等
controlled.shutdown.enable 启用服务器的关闭控制。 boolean true 中等
controlled.shutdown.max.retries 控制因多种原因导致的shutdown失败,当这样失败发生,尝试重试的次数 int 3 中等
controlled.shutdown.retry.backoff.ms 在每次重试之前,系统需要时间从导致先前故障的状态(控制器故障转移,复制延迟等)恢复。 此配置是重试之前等待的时间数。 long 5000 中等
controller.socket.timeout.ms 控制器到broker通道的sockt超时时间 int 30000
default.replication.factor 自动创建topic的默认的副本数 int 1
fetch.purgatory.purge.interval.requests 拉取请求清洗间隔(请求数) int 1000
group.max.session.timeout.ms 已注册的消费者允许的最大会话超时时间,设置的时候越长使消费者有更多时间去处理心跳之间的消息。但察觉故障的时间也拉长了。 int 300000
group.min.session.timeout.ms 已经注册的消费者允许最小的会话超时时间,更短的时间去快速的察觉到故障,代价是频繁的心跳,这可能会占用大量的broker资源。 int 6000
inter.broker.protocol.version 指定broker内部通讯使用的版本。通常在更新broker时使用。有效的值为:0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1。查看ApiVersion找到的全部列表。 string 0.10.1-IV2
log.cleaner.backoff.ms 当没有日志要清理时,休眠的时间 long 15000 [0,...]
log.cleaner.dedupe.buffer.size 用于日志去重的内存总量(所有cleaner线程) long 134217728
log.cleaner.delete.retention.ms 删除记录保留多长时间? long 86400000
log.cleaner.enable 在服务器上启用日志清洗处理?如果使用的任何topic的cleanup.policy=compact包含内部的offset topic,应启动。如果禁用,那些topic将不会被压缩并且会不断的增大。 boolean true
log.cleaner.io.buffer.load.factor 日志cleaner去重缓冲负载因子。去重缓冲区的百分比,较高的值将允许同时清除更多的日志,但将会导致更多的hash冲突。 double 0.9
log.cleaner.io.buffer.size 所有日志清洁器线程I/O缓存的总内存 int 524288 [0,...]
log.cleaner.io.max.bytes.per.second 日志清理器限制,以便其读写i/o平均小与此值。 double 1.7976931348623157E308
log.cleaner.min.cleanable.ratio 脏日志与日志的总量的最小比率,以符合清理条件 double 0.5
log.cleaner.min.compaction.lag.ms 一条消息在日志保留不压缩的最小时间,仅适用于正在压缩的日志。 long 0
log.cleaner.threads 用于日志清除的后台线程数 int 1 [0,...]
log.cleanup.policy 超过保留时间段的默认清除策略。逗号分隔的有效的策略列表。有效的策略有:“delete”和“compact” list [delete] [compact, delete]
log.index.interval.bytes 添加一个条目到offset的间隔 int 4096(4 kibibytes) [0,...]
log.index.size.max.bytes offset index的最大大小(字节) int 10485760 [4,...]
log.message.format.version 指定追加到日志中的消息格式版本。例如: 0.8.2, 0.9.0.0, 0.10.0。通过设置一个特定消息格式版本,用户需要保证磁盘上所有现有的消息小于或等于指定的版本。错误的设置将导致旧版本的消费者中断,因为消费者接收一个不理解的消息格式。 string 0.10.1-IV2
log.message.timestamp.difference.max.ms 如果log.message.timestamp.type=CreateTime,broker接收消息时的时间戳和消息中指定的时间戳之间允许的最大差异。如果时间戳超过此阈值,则消息将被拒绝。如果log.message.timestamp.type=LogAppendTime,则此配置忽略。 long 9223372036854775807 [0,...]
log.message.timestamp.type 定义消息中的时间戳是消息创建时间或日志追加时间。该值可设置为CreateTimeLogAppendTime string CreateTime [CreateTime, LogAppendTime]
log.preallocate 在创建新段时预分配文件?如果你在Windowns上使用kafka,你可能需要设置它为true。 boolean false
log.retention.check.interval.ms 日志清除程序检查日志是否满足被删除的频率(以毫秒为单位) long 300000 [1,...]
max.connections.per.ip 允许每个ip地址的最大连接数。 int 2147483647 [1,...]
max.connections.per.ip.overrides per-ip或hostname覆盖默认最大连接数 string ""
num.partitions topic的默认分区数 int 1 [1,...]
principal.builder.class 实现PrincipalBuilder接口类的完全限定名,该接口目前用于构建与SSL SecurityProtocol连接的Principal。 class class org.apache.kafka.
common.security.auth
.DefaultPrincipalBuilder
producer.purgatory.purge.interval.requests 生产者请求purgatory的清洗间隔(请求数) int 1000
replica.fetch.backoff.ms 当拉取分区发生错误时休眠的时间 1000 [0,...]
replica.fetch.max.bytes 拉取每个分区的消息的字节数。这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。 int 1048576 [0,...]
replica.fetch.response.max.bytes 预计整个获取响应的最大字节数,这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。 int 10485760 [0,...]
reserved.broker.max.id broker.id的最大数 int 1000 [0,...]
sasl.enabled.mechanisms 可用的SASL机制列表,包含任何可用的安全提供程序的机制。默认情况下只有GSSAPI是启用的。 list [GSSAPI]
sasl.kerberos.kinit.cmd Kerberos kinit 命令路径。 string /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 登录线程在刷新尝试的休眠时间。 long 60000
sasl.kerberos.principal.to.local.rules principal名称映射到一个短名称(通常是操作系统用户名)。按顺序,使用与principal名称匹配的第一个规则将其映射其到短名称。忽略后面的规则。默认情况下,{username}/{hostname}@{REALM} 映射到 {username}。 list [DEFAULT]
sasl.kerberos.service.name Kafka运行的Kerberos principal名称。 可以在JAAS或Kafka的配置文件中定义。 string null
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动的百分比 time. double 0.05
sasl.kerberos.ticket.renew.window.factor 登录线程休眠,直到从上次刷新到ticket的到期的时间已到达(指定窗口因子),在此期间它将尝试更新ticket。 double 0.8
sasl.mechanism.inter.broker.protocol SASL机制,用于broker之间的通讯,默认是GSSAPI。 string GSSAPI
security.inter.broker.protocol broker之间的通讯协议,有效值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 string PLAINTEXT
ssl.cipher.suites 密码套件列表。认证,加密,MAC和秘钥交换算法的组合,用于使用TLS或SSL的网络协议交涉网络连接的安全设置,默认情况下,支持所有可用的密码套件。 list null
ssl.client.auth 配置请求客户端的broker认证。常见的设置:
ssl.client.auth=required 需要客户端认证。
ssl.client.auth=requested 客户端认证可选,不同于requested ,客户端可选择不提供自身的身份验证信息
* ssl.client.auth=none 不需要客户端身份认证
string none [required, requested, none]
ssl.enabled.protocols 已启用的SSL连接协议列表。 list [TLSv1.2, TLSv1.1, TLSv1]
ssl.key.password 秘钥库文件中的私钥密码。对客户端是可选的。 password null
ssl.keymanager.algorithm 用于SSL连接的密钥管理工厂算法。默认值是Java虚拟机的密钥管理工厂算法。 string SunX509
ssl.keystore.location 密钥仓库文件的位置。客户端可选,并可用于客户端的双向认证。 string null
ssl.keystore.password 密钥仓库文件的仓库密码。客户端可选,只有ssl.keystore.location配置了才需要。 password null
ssl.keystore.type 密钥仓库文件的格式。客户端可选。 string JKS
ssl.protocol 用于生成SSLContext,默认是TLS,适用于大多数情况。允许使用最新的JVM,LS, TLSv1.1 和TLSv1.2。 SSL,SSLv2和SSLv3 老的JVM也可能支持,由于有已知的安全漏洞,不建议使用。 string TLS
ssl.provider 用于SSL连接的安全提供程序的名称。默认值是JVM的安全程序。 string null
ssl.trustmanager.algorithm 信任管理工厂用于SSL连接的算法。默认为Java虚拟机配置的信任算法。 string PKIX
ssl.truststore.location 信任仓库文件的位置 string null
ssl.truststore.password 信任仓库文件的密码 password null
ssl.truststore.type 信任仓库文件的文件格式 string JKS
authorizer.class.name 用于认证的授权程序类 string ""
metric.reporters 度量报告的类列表,通过实现MetricReporter接口,允许插入新度量标准类。JmxReporter包含注册JVM统计。 list []
metrics.num.samples 维持计算度量的样本数。 int 2 [1,...]
metrics.sample.window.ms 计算度量样本的时间窗口 long 30000 [1,...]
quota.window.num 在内存中保留客户端限额的样本数 int 11 [1,...]
quota.window.size.seconds 每个客户端限额的样本时间跨度 int 1 [1,...]
replication.quota.window.num 在内存中保留副本限额的样本数 int 11 [1,...]
replication.quota.window.size.seconds 每个副本限额样本数的时间跨度 int 1 [1,...]
ssl.endpoint.identification.algorithm 端点身份标识算法,使用服务器证书验证服务器主机名。 string null
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null
zookeeper.sync.time.ms ZK follower可落后与leader多久。 int 2000

kafka <= 0.8.2

可查看之前的版本:Kafka Broker配置(0.8.2)

以下是kafka新版本的增量配置

kafka >= 1.0

名称 描述 类型 默认 有效值 重要程度 更新模式
group.initial.rebalance.delay.ms 分组协调器在执行第一次重新平衡之前,等待更多消费者加入新组的时间。延迟时间越长,意味着重新平衡的次数可能越少,但会增加处理开始前的时间。 int 3000 只读
transaction.abort.timed.out.transaction.cleanup.interval.ms 回滚已超时的事务的时间间隔。 int 10000 (10 seconds) [1,...] 只读
transaction.remove.expired.transaction.cleanup.interval.ms 删除因transactional.id.expiration.ms过期的事务的时间间隔。 int 3600000 (1 hour) [1,...] 只读
transaction.max.timeout.ms 事务的最大允许超时时间。如果客户端请求的交易时间超过了这个时间,那么broker将在InitProducerIdRequest中返回一个错误。这可以防止客户端的超时时间过大,从而阻滞消费者从事务中包含的主题中读取。 int 900000 (15 minutes) [1,...] 只读
transaction.state.log.load.buffer.size 在将生产者id和事务加载到缓存中时,从事务日志段读取的批次大小(软限制,如果消息太大,则重写) int 5242880 [1,...] 只读
transaction.state.log.min.isr 覆盖事务topic的min.insync.replicas配置。 int 2 [1,...] 只读
transaction.state.log.num.partitions 事务topic的分区数(部署后不应改变)。 int 50 [1,...] 只读
transaction.state.log.replication.factor 事务topic的复制因子(设置较高来确保可用性)。内部topic创建将失败,直到集群规模满足该复制因子要求。 short 3 [1,...] 只读
transaction.state.log.segment.bytes 事务topic段的字节数应保持相对较小,以利于加快日志压缩和缓存加载速度 int 104857600 (100 mebibytes) [1,...] 只读
transactional.id.expiration.ms 事务协调器在没有收到当前事务的任何事务状态更新的情况下,在其事务id过期前等待的时间,单位为ms。这个设置也会影响生产者id过期 - 一旦这个时间在给定的生产者id最后一次写入后过去,生产者id就会过期。请注意,如果由于主题的保留设置而删除了生产者id的最后一次写入,那么生产者id可能会更快过期。 int 604800000 (7 days) [1,...] 只读

kafka >= 2.0

名称 描述 类型 默认 有效值 重要程度 动态更新模式
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null 中间 只读
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler class null 中间 只读
sasl.login.class 实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin class null 中间 只读

kafka >= 2.5

名称 描述 类型 默认 有效值 重要程度 更新模式
zookeeper.clientCnxnSocket 当使用TLS连接到ZooKeeper时,通常设置为org.apache.zookeeper.ClientCnxnSocketNetty。覆盖任何同名的zookeeper.clientCnxnSocket设置的显式值。 string null 中间 只读
zookeeper.ssl.client.enable 设置客户端连接到ZooKeeper时使用TLS。显式的值会覆盖任何通过zookeeper.client.secure设置的值(注意名称不同)。如果两者都没有设置,默认为false;当为true时,必须设置zookeeper.clientCnxnSocket(通常为org.apache.zookeeper.ClientCnxnSocketNetty);其他需要设置的值可能包括zookeeper.ssl.cipher.suiteszookeeper.ssl.crl.enablezookeeper.ssl.enabled.protocolszookeeper.ssl.endpoint. identification.algorithmzookeeper.ssl.keystore.locationzookeeper.ssl.keystore.passwordzookeeper.ssl.keystore.typezookeeper.ssl. ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type boolean false 中间 只读
zookeeper.ssl.keystore.location 当使用客户端证书与TLS连接到ZooKeeper时的keystore位置。覆盖任何通过zookeeper.ssl.keyStore.location系统属性设置的显式值(注意是驼峰大小)。 password null 中间 只读
zookeeper.ssl.keystore.password 当使用客户端证书与TLS连接到ZooKeeper时的keystore密码。覆盖任何通过zookeeper.ssl.keyStore.password系统属性设置的显式值(注意驼峰大写)。 注意,ZooKeeper不支持与keystore密码不同的密钥密码,所以一定要将keystore中的密钥密码设置为与keystore密码相同,否则连接Zookeeper的尝试将失败。 password null 中间 只读
zookeeper.ssl.keystore.type 当使用客户端证书与TLS连接到ZooKeeper时的keystore类型。覆盖任何通过zookeeper.ssl.keyStore.type系统属性设置的显式值(注意骆驼大写)。默认值为null意味着该类型将根据keystore的文件扩展名自动检测。 string null 中间 只读
zookeeper.ssl.protocol 指定ZooKeeper TLS协商中使用的协议。一个显式的值会覆盖任何通过同名的zookeeper.ssl.protocol系统设置的值。 string TLSv1.2 只读
zookeeper.ssl.cipher.suites 指定在ZooKeeper TLS协商中使用的密码套件(csv),覆盖任何通过zookeeper.ssl.ciphersuites系统属性设置的显式值(注意单字 "ciphersuites")。覆盖任何通过zookeeper.ssl.ciphersuites系统属性设置的显式值(注意 "ciphersuites "这个单字)。默认值为 "null "意味着启用的密码套件列表是由正在使用的Java运行时决定的。 boolean false 只读
zookeeper.ssl.crl.enable 指定是否启用ZooKeeper TLS协议中的证书撤销列表。覆盖任何通过zookeeper.ssl.crl系统属性设置的显式值(注意是短名)。 boolean false 只读
zookeeper.ssl.enabled.protocols 指定ZooKeeper TLS协商(csv)中启用的协议。覆盖任何通过zookeeper.ssl.enabledProtocols系统属性设置的显式值(注意骆驼大写)。默认值为 "null "意味着启用的协议将是zookeeper.ssl.protocol配置属性的值。 list null 只读
zookeeper.ssl.endpoint.identification.algorithm 指定是否在ZooKeeper TLS协商过程中启用主机名验证,(不区分大小写)"https "表示启用ZooKeeper主机名验证,显式的空白值表示禁用(仅为测试目的建议禁用)。明确的值会覆盖任何通过zookeeper.ssl.hostnameVerification系统属性设置的 "true "或 "false "值(注意不同的名称和值;true意味着https,false意味着空白)。 string HTTPS 只读
zookeeper.ssl.ocsp.enable 指定是否启用ZooKeeper TLS协议中的在线证书状态协议。覆盖任何通过zookeeper.ssl.ocsp系统属性设置的显式值(注意是短名)。 boolean false 只读

kafka >= 2.7

名称 描述 类型 默认 有效值 重要程度 更新模式
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。默认的SSL引擎工厂只支持带X.509证书的PEM格式。 password null 中间 每个broker
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 long 127000 (127 seconds) 中间 只读
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。如果在超时之前没有建立连接,客户端将关闭socket通道。 long 10000 (10 seconds) 中间 只读

kafka >= 3.0.0版

名称 描述 类型 默认 有效值 重要程度
node.id process.role为非空时,与该进程所扮演的角色相关的节点ID。这是在KRaft模式下运行时的必要配置。 int -1
process.roles 这个进程所扮演的角色:broker、controller,或者 broker,controller(两者都是)。此配置只适用于KRaft(Kafka Raft)模式的集群(而不是ZooKeeper)。对于Zookeeper集群,让这个配置无需定义或为空。 list "" [broker, controller]
controller.listener.names 以逗号分隔的controller使用的监听器的名称列表。如果在KRaft模式下运行,这是必须的。基于ZK的controller将不使用这个配置。 string null
metadata.log.dir 这个配置决定了我们将KRaft模式下的集群的元数据日志放在哪里。如果没有设置,元数据日志将被放置在log.dirs中的第一个日志目录。 string null
broker.heartbeat.interval.ms broker之间心跳之间的时间间隔(毫秒)。在KRaft模式下运行时使用。 int 2000 (2秒) 中间
broker.session.timeout.ms 如果没有心跳,broker持续的时间,以毫秒计。在KRaft模式下运行时使用。 int 9000 (9秒) 中间
controller.quorum.voters 选民的id/endpoint信息的map列表,逗号分隔{id}@{host}:{port}。例如:1@localhost:9092,2@localhost:9093,3@localhost:9094 list “” 非空列表
controller.quorum.election.backoff.max.ms 这用于二进制指数退避机制,有助于防止选举陷入僵局。 int 1000 (1秒)
controller.quorum.election.timeout.ms 在触发新的选举之前,无法从leader那里获取的最大等待时间(毫秒)。 int 1000 (1秒)
controller.quorum.fetch.timeout.ms 在成为候选人并触发选民选举之前,没有从现任leader那里成功获取的最长时间;在四处询问是否有新的领导人纪元之前,没有从法定人数的大多数人那里获得获取的最长时间。 int 2000 (2秒)
control.plane.listener.name 用于controller和broker之间通信的监听器的名称。broker将使用control.plane.listener.name来定位监听器列表中的endpoint(端点),以监听来自controller的连接。
例如,如果一个broker的配置是:

listeners = INTERNAL://192.1.1.8:9092, EXTERNAL:/10.1.1.5:9093, CONTROLLER://192.1.1.8:9094
listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL
control.plane.listener.name = CONTROLLER

启动时,broker将开始监听"192.1.1.8:9094",安全协议为 "SSL"。
在controller方面,当它通过zookeeper发现一个broker发布的端点时,它将使用control.plane.listener.name来寻找端点,它将用它来建立与broker的连接。
例如,如果broker在zookeeper上发布的端点是:
"endpoints" : ["INTERNAL://broker1.example.com:9092", "EXTERNAL://broker1.example.com:9093", "CONTROLLER://broker1.example.com:9094" ]

而控制器的配置是:
listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL
control.plane.listener.name = CONTROLLER

那么controller将使用 "broker1.example.com:9094"和安全协议 "SSL"来连接到broker。

如果没有明确配置,默认值将为空,将没有专门的端点用于controller连接。
string null

更多关于broker配置的详情,可以在scala类中的kafka.server.KafkaConfig找到。

更新于 2023-02-08
在线,1小时前登录

FV_Kaja 2年前

大佬,我想问下,一个从broker服务宕机了后,kafka后续做了什么操作,是在其他存活的topic中新建被挂掉的分区和备份吗,但是备份数量不能超过broker数量,这块有点小白

半兽人 -> FV_Kaja 2年前

kafka的分区和副本是什么关系你要先清楚,备份的是分区。副本数不能超过broker的数量。1个副本只在一个broker备份一次。
参考:kafka副本和leader选举

FV_Kaja -> 半兽人 2年前

1、是这样,我在3个brocker的集群上,建的topic是单分区单备份,分区和备份落的broker宕机了,那会影响后续的消息接收吗

2、如果在其他健康的broker上重建分区,那后续重启了这台宕机的服务器,是不是不会有数据再落过来

半兽人 -> FV_Kaja 2年前

你先了解下基础入门吧,做一个遍这个例子就知道了:
kafka集群安装搭建

还有问题到问题专区提问吧。

不二の 3年前

大佬你好,请教下副本因子副本个数有什么联系吗

不二の -> 不二の 3年前

应该是Kafka每个topic的partition有N个副本,其中N是topic的复制因子

半兽人 -> 不二の 3年前

副本因子就是副本个数额。

不二の -> 半兽人 3年前

好的,谢谢大佬

kiga.top 3年前

请教一下日志删除的问题,我的配置如下

log.retention.hours=72
log.retention.check.interval.ms=180000
log.segment.ms=86400000
log.cleaner.delete.retention.ms = 20000

设置了保存时间为3天,但是logs文件还是按照7天去存储
不清楚哪个配置失效了

半兽人 -> kiga.top 3年前

1、所有的broker都配置了吗,并且要都重启。
2、你看看kafka broker回收日志
3、文件大小你看看是不是变小了,只是保留的空文件。

kiga.top -> 半兽人 3年前

1、都重启过没有生效。
2、zookeeper-gc.log.0.current ,都是GC的信息,没看到回收日志相关信息。
3、文件并没有改变,log文件都是1G大小。

# 执行如下命令行  他才开始标记segment log文件.delete  然后删除日志,保留三天内的数据
- bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=259200000
江流。 3年前

大佬,你好,我想问一下,我把kafka和zookeeper更新到kafka_2.12-2.6.0和apache-zookeeper-3.5.8-bin,之后莫名的提示消息发送成功但是kafka-manager工具查不出来消息,也无法消费,但是offset有偏移

半兽人 -> 江流。 3年前

先用命令检查下环境是否都正常
https://www.orchome.com/454

Src 3年前

log.index.interval.bytes 配置项 多写了一个列

半兽人 -> Src 3年前

感谢,已更新。

# 3年前

大佬,我想问一下,目前一个topic设置了24个分区,消费者有4个节点,每个节点配的消费线程数concurrency是12,合理吗,现在偶尔会堆积,消费者消费消息后会重新发送消息,偶尔会发送超时,Expiring 28 record(s) for xxx-11: 30302 ms has passed since batch creation plus linger time

半兽人 -> # 3年前

集群负载高了,优先关注下网络io和磁盘io、

  1. 集群高负载(或网络抖动)导致这样的现象。
  2. 消费者重新发送回kafka,异步还是同步,应答机制是什么,是所有节点需要应答。

解决:

  1. 增加集群,分摊负载
  2. 发送机制和应答机制改成异步和紧主节点应答或不应答。
  3. 减少分区的副本数量(内部要复制备份数据)
  4. 调大过期时间...
# -> 半兽人 3年前

重新发送消息是同步的,确认机制是调用callback,所有的节点都是一样的,减少分区的副本数量不太理解

为什么我在 server.properties 中没有找到 delete.topic.enable 这个配置

自由如风 4年前

advertised.listeners:
它和listeners类似,该参数也是用于发布给clients的监听器,
不过该参数主要用于IaaS环境,比如云上的机器通常都配有多块网卡(私网网卡和公网网卡)。
对于这种机器,用户可以设置该参数绑定公网IP供外部clients使用,
然后配置listeners来绑定私网IP供broker间通信使用。
当然不设置该参数也是可以的,只是云上的机器很容易出现clients无法获取数据的问题,
原因就是listeners绑定的是默认网卡,而默认网卡通常都是绑定私网IP的。
在实际使用场景中,对于配有多块网卡的机器而言,这个参数通常都是需要配置的。
(摘自《Apache Kafka 实战》)

—— Only、 4年前

大佬,请问broker相当于服务器个数,partitions的个数不管broker的个数是多少,都可以随便定义吗? 网上说partitions的个数与broker个数一般是相等或者partition是broker个数的倍数比较好,是这样吗?

broker就是kafka应用程序本身了,parition可以理解为队列,1个kafka服务可以有多个partition,kakfa的副本容错都是基于parition的,客户端也是根据分区数来建立长连接来消费这些parition。数量的比例好坏都是基于服务器的物理性能来决定的。

比如:1台kafka服务器,有5个parition,这5个parition的有的数据量大,有的数据量小,有的并发高,有的并发少。
但是,它们共享是物理层的带宽能力,cpu能力,内存能力。如果你总体就10M带宽,你怎么分,都是这么多,5个partition,10个parition,都是10M。所以还是要看物理能力,业务压力等。

等你有具体业务场景,到问答专区详细提问,我在帮你具体分析。

感谢回复,还有点疑问,
1.生产端和消费端配置的bootstrap.servers的地址就是broker即kafka服务是吗?
2.我现在有三个kafka服务节点,十几台消费端服务器,分区应该配多少呢?

1、是的
2、6个分区

J -> 半兽人 4年前

上面的朋友 十几台Consumer,你建议6个Partition。那他其余的Consumer不是白白浪费了?
每个Partition只能对应一台Consumer,为什么建议是是6台?

半兽人 -> J 4年前

sorry,没有看仔细,其余的consumer是会空着,如果消费者是十几个,匹配相对的分区。
broker数 x 2 = 分区数是个最简单的计算方式,broker物理机的网络、内存资源等是服务端的瓶颈,但大部分瓶颈通常都是消费者的消费能力,消费的慢。kafka客户端是批量将消息丢给消费者(一般是2000-3000条),如果这一个批次的消息,你消费的过慢,则可继续扩展消费者,无非是与broker多建立一些长连接。

喵帕斯~ 5年前

能补充下默认值吗

喵帕斯~ -> 喵帕斯~ 5年前

看错,往右划就可以看到

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章