半兽人 posted on: 2015-03-10   Last updated: 2021-12-13 11:19:24  

Consumer Offset Tracking

The high-level consumer tracks the maximum offset it has consumed in each partition and periodically commits its offset vector so that it can resume from those offsets in the event of a restart. Kafka provides the option to store all the offsets for a given consumer group in a designated broker (for that group) called the offset manager. That is, any consumer instance in that consumer group should send its offset commits and fetches to that offset manager (broker). The high-level consumer handles this automatically. If you use the simple consumer, you will need to manage offsets manually. Committing offsets to Kafka is currently unsupported in the Java simple consumer, which can only commit or fetch offsets in ZooKeeper. If you use the Scala simple consumer, you can discover the offset manager and explicitly commit or fetch offsets to the offset manager. A consumer can look up its offset manager by issuing a ConsumerMetadataRequest to any Kafka broker and reading the ConsumerMetadataResponse, which will contain the offset manager. The consumer can then proceed to commit or fetch offsets from the offset manager broker. If the offset manager moves, the consumer will need to rediscover it. If you wish to manage your offsets manually, you can take a look at these code samples that explain how to issue OffsetCommitRequest and OffsetFetchRequest.

When the offset manager receives an OffsetCommitRequest, it appends the request to a special compacted Kafka topic named __consumer_offsets. The offset manager sends a successful offset commit response to the consumer only after all the replicas of the offsets topic receive the offsets. In case the offsets fail to replicate within a configurable timeout, the offset commit will fail and the consumer may retry the commit after backing off. (This is done automatically by the high-level consumer.) The brokers periodically compact the offsets topic since it only needs to maintain the most recent offset commit per partition. The offset manager also caches the offsets in an in-memory table in order to serve offset fetches quickly.
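To make the compaction step concrete, here is a minimal sketch of why a compacted topic is enough for offset storage: only the latest commit per key has to survive. The key shape `(group, topic, partition)` and the function name `compact` are assumptions made for illustration; this is not Kafka's log cleaner.

```python
from typing import Dict, List, Tuple

# Hypothetical key type for an offset commit: (group, topic, partition).
OffsetKey = Tuple[str, str, int]
Record = Tuple[OffsetKey, int]


def compact(log: List[Record]) -> List[Record]:
    """Keep only the most recent record for each key, preserving log order."""
    latest_index: Dict[OffsetKey, int] = {}
    for index, (key, _offset) in enumerate(log):
        latest_index[key] = index  # a later record replaces an earlier one
    return [
        record
        for index, record in enumerate(log)
        if latest_index[record[0]] == index
    ]


# Three commits, two of them for the same partition.
log = [
    (("foobar", "events", 0), 10),
    (("foobar", "events", 1), 7),
    (("foobar", "events", 0), 25),
]
compacted = compact(log)
```

After compaction the older commit for partition 0 is gone, and a consumer restarting from the compacted log still sees the same latest offsets.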

When the offset manager receives an offset fetch request, it simply returns the last committed offset vector from the offsets cache. In case the offset manager was just started or if it just became the offset manager for a new set of consumer groups (by becoming a leader for a partition of the offsets topic), it may need to load the offsets topic partition into the cache. In this case, the offset fetch will fail with an OffsetsLoadInProgress exception and the consumer may retry the OffsetFetchRequest after backing off. (This is done automatically by the high-level consumer.)
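The fetch path described above can be sketched with a small in-memory model. Everything here is hypothetical (the class `OffsetManagerSketch`, its methods, and the `backoff` callback); it only illustrates the cache lookup, the loading state, and the client-side retry, and does not reflect the broker's real code.

```python
from typing import Callable, Dict, List, Optional, Tuple

OffsetKey = Tuple[str, str, int]  # assumed shape: (group, topic, partition)


class OffsetsLoadInProgress(Exception):
    """Raised while the offsets topic partition is still being loaded."""


class OffsetManagerSketch:
    def __init__(self) -> None:
        self.log: List[Tuple[OffsetKey, int]] = []  # stands in for __consumer_offsets
        self.cache: Dict[OffsetKey, int] = {}
        self.loading = False

    def become_leader(self, existing_log: List[Tuple[OffsetKey, int]]) -> None:
        # Taking over a partition: the cache must be rebuilt before serving.
        self.log = list(existing_log)
        self.cache = {}
        self.loading = True

    def finish_loading(self) -> None:
        for key, offset in self.log:
            self.cache[key] = offset  # the last commit per key wins
        self.loading = False

    def fetch(self, key: OffsetKey) -> Optional[int]:
        if self.loading:
            raise OffsetsLoadInProgress("offsets cache is still loading")
        return self.cache.get(key)


def fetch_with_retry(
    manager: OffsetManagerSketch,
    key: OffsetKey,
    attempts: int = 3,
    backoff: Callable[[], None] = lambda: None,
) -> Optional[int]:
    """Retry the fetch, calling `backoff` between attempts."""
    for _ in range(attempts):
        try:
            return manager.fetch(key)
        except OffsetsLoadInProgress:
            backoff()
    raise OffsetsLoadInProgress("gave up after %d attempts" % attempts)
```

In a real client the `backoff` callback would sleep for a while; the high-level consumer performs this retry on its own.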


Kafka consumers in earlier releases store their offsets by default in ZooKeeper. It is possible to migrate these consumers to commit offsets into Kafka by following these steps:

  1. Set offsets.storage=kafka and dual.commit.enabled=true in your consumer config.
  2. Do a rolling bounce of your consumers and then verify that your consumers are healthy.
  3. Set dual.commit.enabled=false in your consumer config.
  4. Do a rolling bounce of your consumers and then verify that your consumers are healthy.

A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be performed using the above steps if you set offsets.storage=zookeeper.
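As an illustration, the two phases of the migration steps could look like this in a consumer properties file. Only the two settings named in the steps are shown; the rest of the consumer configuration is assumed to stay unchanged.

```properties
# Phase 1 (steps 1-2): commit offsets to both Kafka and ZooKeeper,
# then do a rolling bounce and check that consumers are healthy.
offsets.storage=kafka
dual.commit.enabled=true

# Phase 2 (steps 3-4): stop the dual commit, then bounce again.
# offsets.storage=kafka
# dual.commit.enabled=false
```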


The following gives the ZooKeeper structures and algorithms used for co-ordination between consumers and brokers.


When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a ZooKeeper znode for each possible value of xyz. For example /topics/[topic] would be a directory named /topics containing a sub-directory for each topic name. Numerical ranges are also given such as [0...5] to indicate the subdirectories 0, 1, 2, 3, 4. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world".

Broker Node Registry

/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)

This is a list of all present broker nodes, each of which provides a unique logical broker id that identifies it to consumers (the id must be given as part of the broker's configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say, because two servers are configured with the same broker id) is an error.

Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shut down or dies (thus notifying consumers that it is no longer available).
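The registration behaviour can be sketched with an in-memory stand-in for ZooKeeper. `FakeZooKeeper`, `NodeExistsError`, `register_broker`, and the session strings are all hypothetical names for illustration; a real deployment would use a ZooKeeper client instead.

```python
from typing import Dict, List, Optional, Tuple


class NodeExistsError(Exception):
    """Raised when a znode path is already taken."""


class FakeZooKeeper:
    """In-memory stand-in for ZooKeeper; not a real client."""

    def __init__(self) -> None:
        # path -> (value, owning session for ephemeral nodes or None)
        self.nodes: Dict[str, Tuple[dict, Optional[str]]] = {}

    def create(self, path: str, value: dict, session: Optional[str] = None) -> None:
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes[path] = (value, session)

    def close_session(self, session: str) -> None:
        # Ephemeral znodes vanish together with the session that created them.
        self.nodes = {
            path: entry for path, entry in self.nodes.items() if entry[1] != session
        }


def register_broker(zk: FakeZooKeeper, broker_id: int, host: str, port: int,
                    session: str) -> None:
    """Create the ephemeral znode /brokers/ids/[broker_id]."""
    zk.create("/brokers/ids/%d" % broker_id, {"host": host, "port": port}, session)


def live_broker_ids(zk: FakeZooKeeper) -> List[int]:
    prefix = "/brokers/ids/"
    return sorted(int(path[len(prefix):]) for path in zk.nodes if path.startswith(prefix))
```

Registering the same broker id twice fails in this model, and closing a broker's session removes its entry, which is how consumers would notice that the broker has gone away.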

Broker Topic Registry

/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)

Each broker registers itself under the topics it maintains and stores the number of partitions for that topic.

Consumers and Consumer Groups

Consumers of topics also register themselves in ZooKeeper, in order to coordinate with each other and balance the consumption of data. Consumers can also store their offsets in ZooKeeper by setting offsets.storage=zookeeper. However, this offset storage mechanism will be deprecated in a future release. Therefore, it is recommended to migrate offsets storage to Kafka.

Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id. For example if one consumer is your foobar process, which is run across three machines, then you might assign this group of consumers the id "foobar". This group id is provided in the configuration of the consumer, and is your way to tell the consumer which group it belongs to.

The consumers in a group divide up the partitions as fairly as possible, and each partition is consumed by exactly one consumer in a consumer group.

Consumer Id Registry

In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes. Consumer ids are registered in the following directory.

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

Each of the consumers in the group registers under its group and creates a znode with its consumer_id. The value of the znode contains a map of <topic, #streams>. This id is simply used to identify each of the consumers that are currently active within a group. This is an ephemeral node, so it will disappear if the consumer process dies.

Consumer Offsets

Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory if offsets.storage=zookeeper.

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value (persistent node)

Partition Owner Registry

Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming.

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
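The three consumer-side znode layouts above can be summarised with a few path-building helpers. The function names are made up for illustration; only the path shapes come from the text.

```python
def consumer_id_path(group_id: str, consumer_id: str) -> str:
    """Ephemeral znode holding the consumer's map of topic to stream count."""
    return "/consumers/%s/ids/%s" % (group_id, consumer_id)


def offset_path(group_id: str, topic: str, broker_id: int, partition_id: int) -> str:
    """Persistent znode holding the offset consumed so far."""
    return "/consumers/%s/offsets/%s/%d-%d" % (group_id, topic, broker_id, partition_id)


def owner_path(group_id: str, topic: str, broker_id: int, partition_id: int) -> str:
    """Ephemeral znode holding the id of the consumer that owns the partition."""
    return "/consumers/%s/owners/%s/%d-%d" % (group_id, topic, broker_id, partition_id)
```

The offsets node is persistent so that a restarted consumer can resume, while the id and owner nodes are ephemeral so that they disappear with the consumer that created them.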

Cluster Id

The cluster id is a unique and immutable identifier assigned to a Kafka cluster. The cluster id can have a maximum of 22 characters and the allowed characters are defined by the regular expression [a-zA-Z0-9_\-]+, which corresponds to the characters used by the URL-safe Base64 variant with no padding. Conceptually, it is auto-generated when a cluster is started for the first time.

Implementation-wise, it is generated when a broker with version 0.10.1 or later is successfully started for the first time. The broker tries to get the cluster id from the /cluster/id znode during startup. If the znode does not exist, the broker generates a new cluster id and creates the znode with this cluster id.
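A minimal sketch of the get-or-create behaviour follows, under two assumptions: the id is a random UUID encoded as URL-safe Base64 without padding (which yields exactly 22 characters), and a plain dict stands in for ZooKeeper. The function names and the raw-string znode value are illustrative, not the broker's actual storage format.

```python
import base64
import re
import uuid
from typing import Dict

# URL-safe Base64 alphabet: letters, digits, '-' and '_'.
CLUSTER_ID_PATTERN = re.compile(r"[a-zA-Z0-9_\-]+")


def generate_cluster_id() -> str:
    """Encode 16 random bytes as URL-safe Base64 and drop the '==' padding."""
    raw = uuid.uuid4().bytes
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def get_or_create_cluster_id(znodes: Dict[str, str]) -> str:
    """Read /cluster/id, creating it on first startup (dict stands in for ZooKeeper)."""
    if "/cluster/id" not in znodes:
        znodes["/cluster/id"] = generate_cluster_id()
    return znodes["/cluster/id"]
```

Because the id is only generated when the znode is missing, every later broker startup reads the same value, which is what makes the id immutable for the cluster.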

Broker node registration

The broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes information about its host name and port. The broker also registers the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker.

Consumer registration algorithm

When a consumer starts, it does the following:

  1. Register itself in the consumer id registry under its group.
  2. Register a watch on changes (new consumers joining or any existing consumers leaving) under the consumer id registry. (Each change triggers rebalancing among all consumers within the group to which the changed consumer belongs.)
  3. Register a watch on changes (new brokers joining or any existing brokers leaving) under the broker id registry. (Each change triggers rebalancing among all consumers in all consumer groups.)
  4. If the consumer creates a message stream using a topic filter, it also registers a watch on changes (new topics being added) under the broker topic registry. (Each change will trigger re-evaluation of the available topics to determine which topics are allowed by the topic filter. A new allowed topic will trigger rebalancing among all consumers within the consumer group.)
  5. Force itself to rebalance within its consumer group.

Consumer rebalancing algorithm

The consumer rebalancing algorithm allows all the consumers in a group to come into consensus on which consumer is consuming which partitions. Consumer rebalancing is triggered on each addition or removal of both broker nodes and other consumers within the same group. For a given topic and a given consumer group, broker partitions are divided evenly among consumers within the group. A partition is always consumed by a single consumer. This design simplifies the implementation. Had we allowed a partition to be concurrently consumed by multiple consumers, there would be contention on the partition and some kind of locking would be required. If there are more consumers than partitions, some consumers won't get any data at all. During rebalancing, we try to assign partitions to consumers in such a way that reduces the number of broker nodes each consumer has to connect to.

Each consumer does the following during rebalancing:

   1. For each topic T that Ci subscribes to
   2.   let PT be all partitions producing topic T
   3.   let CG be all consumers in the same group as Ci that consume topic T
   4.   sort PT (so partitions on the same broker are clustered together)
   5.   sort CG
   6.   let i be the index position of Ci in CG and let N = size(PT)/size(CG)
   7.   assign partitions from i*N to (i+1)*N - 1 to consumer Ci
   8.   remove current entries owned by Ci from the partition owner registry
   9.   add newly assigned partitions to the partition owner registry
        (we may need to re-try this until the original partition owner releases its ownership)
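The assignment part of these steps (sorting, finding the index, and taking a contiguous range) can be sketched as follows. The steps do not say what happens when size(PT) is not a multiple of size(CG); this sketch assumes the leftover partitions go one each to the first consumers in sorted order. The function name and the `(broker_id, partition_id)` tuple are illustrative, and the registry updates of the last two steps are left out.

```python
from typing import List, Tuple

Partition = Tuple[int, int]  # assumed shape: (broker_id, partition_id)


def assign_partitions(consumer_id: str,
                      partitions: List[Partition],
                      consumers: List[str]) -> List[Partition]:
    """Return the contiguous range of partitions that consumer_id should own."""
    pt = sorted(partitions)   # partitions on the same broker end up adjacent
    cg = sorted(consumers)    # every consumer computes the same ordering
    i = cg.index(consumer_id)
    n, extra = divmod(len(pt), len(cg))
    # Assumption: the first `extra` consumers each take one leftover partition.
    start = i * n + min(i, extra)
    count = n + (1 if i < extra else 0)
    return pt[start:start + count]


partitions = [(1, 0), (0, 1), (0, 0), (1, 1), (1, 2)]
consumers = ["c2", "c1"]
c1_share = assign_partitions("c1", partitions, consumers)
c2_share = assign_partitions("c2", partitions, consumers)
```

Since every consumer sorts the same inputs, each one can compute its own share independently and the shares never overlap. With more consumers than partitions, the trailing consumers receive an empty list.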

When rebalancing is triggered at one consumer, rebalancing should be triggered in other consumers within the same group about the same time.
