Kafka Consumer配置

原创
半兽人 发表于: 2017-04-02   最后更新时间: 2023-06-09 10:02:06  
{{totalSubscript}} 订阅, 51,132 游览

3.4 kafka消费者配置

在0.9.0.0中,我们引入了新的Java消费者来替代早期基于Scala的简单和高级消费者。新老客户端的配置如下。

3.4.1 新消费者配置

新消费者配置:(注意,右面是可拖动的)

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
bootstrap.servers host/port,用于和kafka集群建立初始化连接。因为这些服务器地址仅用于初始化连接,并通过现有配置的来发现全部的kafka集群成员(集群随时会变化),所以此列表不需要包含完整的集群地址(但尽量多配置几个,以防止配置的服务器宕机)。 list high
key.deserializer key的解析序列化接口实现类(Deserializer)。 class high
value.deserializer value的解析序列化接口实现类(Deserializer) class high
fetch.min.bytes 服务器哦拉取请求返回的最小数据量,如果数据不足,请求将等待数据积累。默认设置为1字节,表示只要单个字节的数据可用或者读取等待请求超时,就会应答读取请求。将此值设置的越大将导致服务器等待数据累积的越长,这可能以一些额外延迟为代价提高服务器吞吐量。 int 1 [0,...] high
group.id 此消费者所属消费者组的唯一标识。如果消费者用于订阅或offset管理策略的组管理功能,则此属性是必须的。 string "" high
heartbeat.interval.ms 当使用Kafka的分组管理功能时,心跳到消费者协调器之间的预计时间。心跳用于确保消费者的会话保持活动状态,并当有新消费者加入或离开组时方便重新平衡。该值必须必比session.timeout.ms小,通常不高于1/3。它可以调整的更低,以控制正常重新平衡的预期时间。 int 3000(3秒) high
max.partition.fetch.bytes 服务器将返回每个分区的最大数据量。如果拉取的第一个非空分区中第一个消息大于此限制,则仍然会返回消息,以确保消费者可以正常的工作。broker接受的最大消息大小通过message.max.bytes(broker config)或max.message.bytes (topic config)定义。参阅fetch.max.bytes以限制消费者请求大小。 int 1048576 [0,...] high
session.timeout.ms 用于发现消费者故障的超时时间。消费者周期性的发送心跳到broker,表示其还活着。如果会话超时期满之前没有收到心跳,那么broker将从分组中移除消费者,并启动重新平衡。请注意,该值必须在broker配置的group.min.session.timeout.msgroup.max.session.timeout.ms允许的范围内。 int 45000(45s) high
ssl.key.password 密钥存储文件中的私钥的密码。 客户端可选 password null high
ssl.keystore.location 密钥存储文件的位置, 这对于客户端是可选的,并且可以用于客户端的双向认证。 string null high
ssl.keystore.password 密钥仓库文件的仓库密码。客户端可选,只有ssl.keystore.location配置了才需要。 password null high
ssl.truststore.location 信任仓库文件的位置 string null high
ssl.truststore.password 信任仓库文件的密码 password null high
auto.offset.reset 当Kafka中没有初始offset或如果当前的offset不存在时(例如,该数据被删除了),该怎么办。
earliest:自动将偏移重置为最早的偏移
latest:自动将偏移重置为最新偏移
none:如果消费者组找到之前的offset,则向消费者抛出异常
其他:抛出异常给消费者。
string latest [latest, earliest, none] medium
connections.max.idle.ms 指定在多少毫秒之后关闭闲置的连接 long 540000 medium
enable.auto.commit 如果为true,消费者的offset将在后台周期性的提交 boolean true medium
exclude.internal.topics 内部topic的记录(如偏移量)是否应向消费者公开。如果设置为true,则从内部topic接受记录的唯一方法是订阅它。 boolean true medium
fetch.max.bytes 服务器为拉取请求返回的最大数据值。这不是绝对的最大值,如果在第一次非空分区拉取的第一条消息大于该值,该消息将仍然返回,以确保消费者继续工作。接收的最大消息大小通过message.max.bytes (broker config) 或 max.message.bytes (topic config)定义。注意,消费者是并行执行多个提取的。 int 52428800 [0,...] medium
max.poll.interval.ms 使用消费者组管理时poll()调用之间的最大延迟。消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。 int 300000 [1,...] medium
max.poll.records 在单次调用poll()中返回的最大消息数。 int 500 [1,...] medium
partition.assignment.strategy 当使用组管理时,客户端将使用分区分配策略的类名来分配消费者实例之间的分区所有权 list class org.apache.kafka
.clients.consumer
.RangeAssignor
medium
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 int 65536 [-1,...] medium
request.timeout.ms 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽则客户端将重新发送请求。 int 305000 [0,...] medium
sasl.jaas.config JAAS配置文件中SASL连接登录上下文参数。 这里描述JAAS配置文件格式。 该值的格式为: '(=)*;' password null medium
sasl.kerberos.service.name Kafka运行Kerberos principal名。可以在Kafka的JAAS配置文件或在Kafka的配置文件中定义。 string null medium
sasl.mechanism 用于客户端连接的SASL机制。安全提供者可用的机制。GSSAPI是默认机制。 string GSSAPI medium
security.protocol 用于与broker通讯的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 string PLAINTEXT medium
send.buffer.bytes 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 int 131072 [-1,...] medium
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1.1,TLSv1 medium
ssl.keystore.type key仓库文件的文件格式,客户端可选。 string JKS medium
ssl.protocol 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中的允许值为TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 string TLS medium
ssl.provider 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 string null medium
ssl.truststore.type 信任存储文件的文件格式。 string JKS medium
auto.commit.interval.ms 如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。 int 5000 [0,...] low
check.crcs 自动检查CRC32记录的消耗。 这样可以确保消息发生时不会在线或磁盘损坏。 此检查增加了一些开销,因此在寻求极致性能的情况下可能会被禁用。 boolean true low
client.id 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,来跟踪ip/port的请求源。 string "" low
fetch.max.wait.ms 如果没有足够的数据满足fetch.min.bytes,服务器将在接收到提取请求之前阻止的最大时间。 int 500 [0,...] low
interceptor.classes 用作拦截器的类的列表。 你可实现ConsumerInterceptor接口以允许拦截(也可能变化)消费者接收的消息。 默认情况下,没有拦截器。 list null low
metadata.max.age.ms 在一定时间段之后(以毫秒为单位的),强制更新元数据,即使没有任何分区领导变化,任何新的broker或分区。 long 300000 [0,...] low
metric.reporters 用作度量记录员类的列表。实现MetricReporter接口以允许插入通知新的度量创建的类。JmxReporter始终包含在注册JMX统计信息中。 list "" low
metrics.num.samples 保持的样本数以计算度量。 int 2 [1,...] low
metrics.recording.level 最高的记录级别。 string INFO [INFO, DEBUG] low
metrics.sample.window.ms The window of time a metrics sample is computed over. long 30000 [0,...] low
reconnect.backoff.ms 尝试重新连接指定主机之前等待的时间,避免频繁的连接主机,这种机制适用于消费者向broker发送的所有请求。 long 50 [0,...] low
retry.backoff.ms 尝试重新发送失败的请求到指定topic分区之前的等待时间。避免在某些故障情况下,频繁的重复发送。 long 100 [0,...] low
sasl.kerberos.kinit.cmd Kerberos kinit命令路径。 string /usr/bin/kinit low
sasl.kerberos.min.time.before.relogin 尝试/恢复之间的登录线程的休眠时间。 long 60000 low
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动百分比。 double 0.05 low
sasl.kerberos.ticket.renew.window.factor 登录线程将休眠,直到从上次刷新到ticket的指定的时间窗口因子到期,此时将尝试续订ticket。 double 0.8 low
ssl.cipher.suites 密码套件列表,用于TLS或SSL网络协议的安全设置,认证,加密,MAC和密钥交换算法的明明组合。默认情况下,支持所有可用的密码套件。 list null low
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null low
ssl.keymanager.algorithm 密钥管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的密钥管理器工厂算法。 string SunX509 low
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null low
ssl.trustmanager.algorithm 信任管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 string PKIX low

kafka >= 2.0.0

名称 描述 类型 默认 有效值 重要程度
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null 中间
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler class null 中间
sasl.login.class 实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin class null 中间

kafka >= 2.1.0

名称 描述 类型 默认 有效值 重要程度
client.dns.lookup 控制客户端如何使用DNS查询。如果设置为 use_all_dns_ips,则依次连接到每个返回的IP地址,直到成功建立连接。断开连接后,使用下一个IP。一旦所有的IP都被使用过一次,客户端就会再次从主机名中解析IP(s)(然而,JVM和操作系统都会缓存DNS名称查询)。如果设置为 resolve_canonical_bootstrap_servers_only,则将每个引导地址解析成一个canonical名称列表。在bootstrap阶段之后,这和use_all_dns_ips的行为是一样的。如果设置为 default(已弃用),则尝试连接到查找返回的第一个IP地址,即使查找返回多个IP地址。 string use_all_dns_ips [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only] 中间

kafka >= 2.7

名称 描述 类型 默认 有效值 重要程度
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。默认的SSL引擎工厂只支持带X.509证书的PEM格式。 password null
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 long 127000 (127 seconds) 中间
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。如果在超时之前没有建立连接,客户端将关闭socket通道。 long 10000 (10 seconds) 中间

3.4.2 旧消费者配置

旧消费者配置如下:

  • group.id
  • zookeeper.connect
PROPERTY DEFAULT DESCRIPTION
group.id 标识消费者所属消费者组(独一的)。通过设置相同的组ID,多个消费者表明属于该消费者组的一部分。
zookeeper.connect 指定ZooKeeper连接字符串,格式为hostname:port,其中host和port是ZooKeeper服务器的主机和端口。 为了使ZooKeeper宕机时连接到其他ZooKeeper节点,你还可以以hostname1:host1,hostname2:port2,hostname3:port3的形式指定多个主机。
还可以设置ZooKeeper chroot路径,作为其ZooKeeper连接字符串的一部分,将其数据放置在全局ZooKeeper命名空间中的某个路径下。 如果是这样,消费者应该在其连接字符串中使用相同的chroot路径。 例如,要给出/chroot/path的chroot路径,你需要将该值设置为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
consumer.id null 如果未设置将自动生成。
socket.timeout.ms 30 * 1000 网络请求socker的超时时间。实际的超时是 max.fetch.wait+socket.timeout.ms的时间。
socket.receive.buffer.bytes 64 * 1024 网络请求socker的接收缓存大小
fetch.message.max.bytes 1024 * 1024 每个拉取请求的每个topic分区尝试获取的消息的字节大小。这些字节将被读入每个分区的内存,因此这有助于控制消费者使用的内存。 拉取请求的大小至少与服务器允许的最大消息的大小一样大,否则生产者可能发送大于消费者可以拉取的消息。
num.consumer.fetchers 1 用于拉取数据的拉取线程数。
auto.commit.enable true 如果为true,请定期向ZooKeeper提交消费者已经获取的消息的偏移量。 当进程失败时,将使用这种承诺偏移量作为新消费者开始的位置。
auto.commit.interval.ms 60 * 1000 消费者offset提交到zookeeper的频率(以毫秒为单位)
queued.max.message.chunks 2 消费缓存消息块的最大大小。每个块可以达到fetch.message.max.bytes。
rebalance.max.retries 4 当新的消费者加入消费者组时,消费者集合尝试“重新平衡”负载,并为每个消费者分配分区。如果消费者集合在分配时发生时发生变化,则重新平衡将失败并重试。此设置控制尝试之前的最大尝试次数。
fetch.min.bytes 1 拉取请求返回最小的数据量。如果没有足够的数据,请求将等待数据积累,然后应答请求。
fetch.wait.max.ms 100 如果没有足够的数据(fetch.min.bytes),服务器将在返回请求数据之前阻塞的最长时间。
rebalance.backoff.ms 2000 重新平衡时重试之间的回退时间。如果未设置,则使用zookeeper.sync.time.ms中的值。
refresh.leader.backoff.ms 200 回退时间等待,然后再尝试选举一个刚刚失去leader的分区。
auto.offset.reset largest 如果ZooKeeper中没有初始偏移量,或偏移值超出范围,该怎么办?
最小:自动将偏移重置为最小偏移
最大:自动将偏移重置为最大偏移
* 其他任何事情:抛出异常消费者
consumer.timeout.ms -1 如果在指定的时间间隔后没有消息可用,则向用户发出超时异常
exclude.internal.topics true 来自内部topic的消息(如偏移量)是否应该暴露给消费者。
client.id group id value 客户端ID是每个请求中发送的用户指定的字符串,用于帮助跟踪调用。 它应该逻辑地标识发出请求的应用程序。
zookeeper.session.timeout.ms 6000 ZooKeeper会话超时。如果消费者在这段时间内没有对ZooKeeper心跳,那么它被认为是死亡的,并且会发生重新平衡。
zookeeper.connection.timeout.ms 6000 与zookeeper建立连接时客户端等待的最长时间。
zookeeper.sync.time.ms 2000 ZK follower可以罗ZK leader多久
offsets.storage zookeeper 选择存储偏移量的位置(zookeeper或kafka)。
offsets.channel.backoff.ms 1000 重新连接offset通道或重试失败的偏移提取/提交请求时的回退周期。
offsets.channel.socket.timeout.ms 10000 读取offset拉取/提交响应的Socker的超时时间。此超时也用于查询offset manager的ConsumerMetadata请求。
offsets.commit.max.retries 5 失败时重试偏移提交的最大次数。此重试计数仅适用于停机期间的offset提交,它不适用于自动提交线程的提交。它也不适用于在提交offset之前查询偏移协调器的尝试。即如果消费者元数据请求由于任何原因而失败,则将重试它,并且重试不计入该限制。
dual.commit.enabled true 如果使用“kafka”作为offsets.storage,则可以向ZooKeeper(除Kafka之外)进行双重提交offset。在从基于zookeeper的offset存储迁移到kafka存储的时候可以这么做。对于任何给定的消费者组,在该组中的所有实例已迁移到提交到broker(而不是直接到ZooKeeper)的新的版本之后,可以关闭这个。
partition.assignment.strategy range 在“range”或“roundrobin”策略之间选择将分区分配给消费者流。

循环分区分配器分配所有可用的分区和所有可用的消费者线程。然后,继续从分区到消费者线程进行循环任务。如果所有消费者实例的订阅是相同的,则分区将被均匀分布。(即,分区所有权计数将在所有消费者线程之间的差异仅在一个delta之内。)循环分配仅在以下情况下被允许:(a)每个主题在消费者实例中具有相同数量的流(b)订阅的topic的对于组内的每个消费者实例都是相同的。

范围(Range)分区基于每个topic。对于每个主题,我们按数字顺序排列可用的分区,并以字典顺序排列消费者线程。然后,我们将分区数除以消费者流(线程)的总数来确定分配给每个消费者的分区数。如果不均匀分割,那么前几个消费者将会有多的分区。

有关消费者配置的更多详细信息,请参见scaf类kafka.consumer.ConsumerConfig。

更新于 2023-06-09

坏脾气先森 4年前

大佬,kafka flower 没有从leader同步数据,日志里面也只显示设置flower的高水位为0,然后就没其他信息了,,这种情况一般导致的原因是什么呢?看官方解释说是线程gc,但是发现当前的broker上面还有其他分区的leader。

你得通过命令,看到整个集群的状态,我才能帮你分析。

## 查询集群描述
bin/kafka-topics.sh --describe --zookeeper

你可以到问题专区里提问,这里施展不开

消费者从kafka读取数据的时候每次都是三条数据,我看了我的max.poll.records = 500,设置了一分钟调取一次数据,用不用我把消费者配置贴出来

勇闯天涯 6年前

消费者在消费过程中丢失消息是怎么回事?什么原因会导致消费过程中丢失数据?

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章