半兽人 发表于: 2015-03-10   最后更新时间: 2020-11-23  


Kafka replicates the log for each topic's partitions across a configurable number of servers (you can set this replication factor on a topic-by-topic basis). This allows automatic failover to these replicas when a server in the cluster fails so messages remain available in the presence of failures.

Other messaging systems provide some replication-related features, but, in our (totally biased) opinion, this appears to be a tacked-on thing, not heavily used, and with large downsides: slaves are inactive, throughput is heavily impacted, it requires fiddly manual configuration, etc. Kafka is meant to be used with replication by default—in fact we implement un-replicated topics as replicated topics where the replication factor is one.
其他消息系统提供一些副本相关的功能,但是,在我们看来(有偏见),这似乎是一个附加的东西,没有大量的使用,这有很大的缺点:slave不活跃,吞吐量受到严重影响,它需要的精确的手动配置等。kafka使用的是默认副本 — 就是不需要副本的topic的复制因子就是1。

The unit of replication is the topic partition. Under non-failure conditions, each partition in Kafka has a single leader and zero or more followers. The total number of replicas including the leader constitute the replication factor. All reads and writes go to the leader of the partition. Typically, there are many more partitions than brokers and the leaders are evenly distributed among brokers. The logs on the followers are identical to the leader's log—all have the same offsets and messages in the same order (though, of course, at any given time the leader may have a few as-yet unreplicated messages at the end of its log).
副本以topic的分区为单位。在正常情况下,kafka每个分区都有一个单独的leader,0个或多个follower。副本的总数包括leader。所有的读取和写入到该分区的leader。通常,分区数比broker多,leader均匀分布在broker。follower的日志完全等同于leader的日志 — 相同的顺序相同的偏移量和消息(当然,在任何一个时间点上,leader比follower多几条消息,尚未同步到follower)

Followers consume messages from the leader just as a normal Kafka consumer would and apply them to their own log. Having the followers pull from the leader has the nice property of allowing the follower to naturally batch together log entries they are applying to their log.

As with most distributed systems automatically handling failures requires having a precise definition of what it means for a node to be "alive". For Kafka node liveness has two conditions

  1. A node must be able to maintain its session with ZooKeeper (via ZooKeeper's heartbeat mechanism)
  2. If it is a slave it must replicate the writes happening on the leader and not fall "too far" behind

We refer to nodes satisfying these two conditions as being "in sync" to avoid the vagueness of "alive" or "failed". The leader keeps track of the set of "in sync" nodes. If a follower dies, gets stuck, or falls behind, the leader will remove it from the list of in sync replicas. The definition of, how far behind is too far, is controlled by the replica.lag.max.messages configuration and the definition of a stuck replica is controlled by the replica.lag.time.max.ms configuration.

In distributed systems terminology we only attempt to handle a "fail/recover" model of failures where nodes suddenly cease working and then later recover (perhaps without knowing that they have died). Kafka does not handle so-called "Byzantine" failures in which nodes produce arbitrary or malicious responses (perhaps due to bugs or foul play).

We can now more precisely define that a message is considered committed when all in sync replicas for that partition have applied it to their log. Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand, have the option of either waiting for the message to be committed or not, depending on their preference for tradeoff between latency and durability. This preference is controlled by the acks setting that the producer uses. Note that topics have a setting for the "minimum number" of in-sync replicas that is checked when the producer requests acknowledgment that a message has been written to the full set of in-sync replicas. If a less stringent acknowledgement is requested by the producer, then the message can be committed, and consumed, even if the number of in-sync replicas is lower than the minimum (e.g. it can be as low as just the leader).

The guarantee that Kafka offers is that a committed message will not be lost, as long as there is at least one in sync replica alive, at all times.

Kafka will remain available in the presence of node failures after a short fail-over period, but may not remain available in the presence of network partitions.



At its heart a Kafka partition is a replicated log. The replicated log is one of the most basic primitives in distributed data systems, and there are many approaches for implementing one. A replicated log can be used by other systems as a primitive for implementing other distributed systems in the state-machine style.

A replicated log models the process of coming into consensus on the order of a series of values (generally numbering the log entries 0, 1, 2, ...). There are many ways to implement this, but the simplest and fastest is with a leader who chooses the ordering of values provided to it. As long as the leader remains alive, all followers need to only copy the values and ordering, the leader chooses.
副本日志模拟了对一系列值顺序进入的过程(通常日志编号是 0,1,2,……)。有很多方法可以实现这一点,但最简单和最快的是leader提供选择的排序值,只要leader活着,所有的followers只需要复制和排序。

Of course if leaders didn't fail we wouldn't need followers! When the leader does die we need to choose a new leader from among the followers. But followers themselves may fall behind or crash so we must ensure we choose an up-to-date follower. The fundamental guarantee a log replication algorithm must provide is that if we tell the client a message is committed, and the leader fails, the new leader we elect must also have that message. This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.

If you choose the number of acknowledgements required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap, then this is called a Quorum.

A common approach to this tradeoff is to use a majority vote for both the commit decision and the leader election. This is not what Kafka does, but let's explore it anyway to understand the tradeoffs. Let's say we have 2f+1 replicas. If f+1 replicas must receive a message prior to a commit being declared by the leader, and if we elect a new leader by electing the follower with the most complete log from at least f+1 replicas, then, with no more than f failures, the leader is guaranteed to have all committed messages. This is because among any f+1 replicas, there must be at least one replica that contains all committed messages. That replica's log will be the most complete and therefore will be selected as the new leader. There are many remaining details that each algorithm must handle (such as precisely defined what makes a log more complete, ensuring log consistency during leader failure or changing the set of servers in the replica set) but we will ignore these for now.
一种常见的方法,用多数投票决定leader选举。kafka不是这样做的,但先让我们了解这个权衡,假如,我们有2f+1副本,如果f+1副本在leader提交之前必须收到消息,并且如果我们选举新的leader,至少从f+1副本选出最完整日志的follwer,并且不大于f的失败,leader担保所有已提交的信息。这是因为任何f+1副本中,必须至少有一个副本,其中包含所有已提交的消息。该副本的日志是最完整的,因此选定为新的leader。有许多其余细节,每个算法必须处理(如 精确的定义是什么让一个日志更加完整,确保日志一致性,leader故障期间或更改服务器的副本集),但我们现在不讲这些。

This majority vote approach has a very nice property: the latency is dependent on only the fastest servers. That is, if the replication factor is three, the latency is determined by the faster slave not the slower one.

There are a rich variety of algorithms in this family including ZooKeeper's Zab, Raft, and Viewstamped Replication. The most similar academic publication we are aware of to Kafka's actual implementation is PacificA from Microsoft.
有各种丰富的算法,包括zookeeper的Zab、 Raft和 Viewstamped Replication。kafka实现的最相似的学术理论是微软的PacificA。

The downside of majority vote is that it doesn't take many failures to leave you with no electable leaders. To tolerate one failure requires three copies of the data, and to tolerate two failures requires five copies of the data. In our experience having only enough redundancy to tolerate a single failure is not enough for a practical system, but doing every write five times, with 5x the disk space requirements and 1/5th the throughput, is not very practical for large volume data problems. This is likely why quorum algorithms more commonly appear for shared cluster configuration such as ZooKeeper but are less common for primary data storage. For example in HDFS the namenode's high-availability feature is built on a majority-vote-based journal, but this more expensive approach is not used for the data itself.
多数投票的缺点是,故障数还不太多的情况下会让你没有候选人可选,要容忍1个故障需要3个数据副本,容忍2个故障需要5个数据副本。实际的系统以我们的经验只能容忍单个故障的冗余是不够的,但是如果5个数据副本,每个写5次,5倍的磁盘空间要求,1/5的吞吐量,这对于大数据量系统是不实用的,这可能是quorum算法更通常在共享集群配置。如zookeeper,主要用于数据存储的系统是不太常见的。例如,在HDFS namenode的高可用性特性是建立在majority-vote-based journal,但这更昂贵的方法不能用于数据本身。

Kafka takes a slightly different approach to choosing its quorum set. Instead of majority vote, Kafka dynamically maintains a set of in-sync replicas (ISR) that are caught-up to the leader. Only members of this set are eligible for election as leader. A write to a Kafka partition is not considered committed until all in-sync replicas have received the write. This ISR set is persisted to ZooKeeper whenever it changes. Because of this, any replica in the ISR is eligible to be elected leader. This is an important factor for Kafka's usage model where there are many partitions and ensuring leadership balance is important. With this ISR model and f+1 replicas, a Kafka topic can tolerate f failures without losing committed messages.

For most use cases we hope to handle, we think this tradeoff is a reasonable one. In practice, to tolerate f failures, both the majority vote and the ISR approach will wait for the same number of replicas to acknowledge before committing a message (e.g. to survive one failure a majority quorum needs three replicas and one acknowledgement and the ISR approach requires two replicas and one acknowledgement). The ability to commit without the slowest servers is an advantage of the majority vote approach. However, we think it is ameliorated by allowing the client to choose whether they block on the message commit or not, and the additional throughput and disk space due to the lower required replication factor is worth it.

Another important design distinction is that Kafka does not require that crashed nodes recover with all their data intact. It is not uncommon for replication algorithms in this space to depend on the existence of "stable storage" that cannot be lost in any failure-recovery scenario without potential consistency violations. There are two primary problems with this assumption. First, disk errors are the most common problem we observe in real operation of persistent data systems and they often do not leave data intact. Secondly, even if this were not a problem, we do not want to require the use of fsync on every write for our consistency guarantees as this can reduce performance by two to three orders of magnitude. Our protocol for allowing a replica to rejoin the ISR ensures that before rejoining, it must fully re-sync again even if it lost unflushed data in its crash.

unclean leader选举:如果他们都死了怎么办?

Note that Kafka's guarantee with respect to data loss is predicated on at least on replica remaining in sync. If all the nodes replicating a partition die, this guarantee no longer holds.

However a practical system needs to do something reasonable when all the replicas die. If you are unlucky enough to have this occur, it is important to consider what will happen. There are two behaviors that could be implemented:

  1. Wait for a replica in the ISR to come back to life and choose this replica as the leader (hopefully it still has all its data).
  2. Choose the first replica (not necessarily in the ISR) that comes back to life as the leader.
    选择第一个副本 (不一定在 ISR),作为leader。

This is a simple tradeoff between availability and consistency. If we wait for replicas in the ISR, then we will remain unavailable as long as those replicas are down. If such replicas were destroyed or their data was lost, then we are permanently down. If, on the other hand, a non-in-sync replica comes back to life and we allow it to become leader, then its log becomes the source of truth even though it is not guaranteed to have every committed message. In our current release we choose the second strategy and favor choosing a potentially inconsistent replica when all replicas in the ISR are dead.This behavior can be disabled using configuration property unclean.leader.election.enable, to support use cases where downtime is preferable to inconsistency.


This dilemma is not specific to Kafka. It exists in any quorum-based scheme. For example in a majority voting scheme, if a majority of servers suffer a permanent failure, then you must either choose to lose 100% of your data or violate consistency by taking what remains on an existing server as your new source of truth.

可用性和耐久性保证 (Availability and Durability Guarantees)

When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0,1 or all (-1) replicas. Note that "acknowledgement by all replicas" does not guarantee that the full set of assigned replicas have received the message. By default, when acks=all, acknowledgement happens as soon as all the current in-sync replicas have received the message. For example, if a topic is configured with only two replicas and one fails (i.e., only one in sync replica remains), then writes that specify acks=all will succeed. However, these writes could be lost if the remaining replica also fails. Although this ensures maximum availability of the partition, this behavior may be undesirable to some users who prefer durability over availability. Therefore, we provide two topic-level configurations that can be used to prefer message durability over availability:
当写入到kakfa时,生产者可以选择是否等待0,1 或 全部副本(-1)的消息确认。需要注意的是“所有副本确认”并不能保证全部分配副本已收到消息。默认情况下,当acks=all时,只要当前所有在同步中的副本收到消息,就会进行确认。例如:如果一个topic有2个副本,有一个故障(即,只剩下一个同步副本),即使写入是 acks=all 也将会成功。如果剩下的副本也故障了那么这些写入就会丢失。虽然这可以确保分区的最大可用性,这种方式可能不受欢迎,一些用户喜欢耐久性超过可用性。因此,我们提供两种配置。

Disable unclean leader election - if all replicas become unavailable, then the partition will remain unavailable until the most recent leader becomes available again. This effectively prefers unavailability over the risk of message loss. See the previous section on Unclean Leader Election for clarification.
禁用unclean leader选举 - 如果所有副本不可用,那份分区将一直不可用,直到最近的leader再次变得可用,这种宁愿不可用,而不是冒着丢失消息的风险。

Specify a minimum ISR size - the partition will only accept writes if the size of the ISR is above a certain minimum, in order to prevent the loss of messages that were written to just a single replica, which subsequently becomes unavailable. This setting only takes effect if the producer uses required.acks=-1 and guarantees that the message will be acknowledged by at least this many in-sync replicas. This setting offers a trade-off between consistency and availability. A higher setting for minimum ISR size guarantees better consistency since the message is guaranteed to be written to more replicas which reduces the probability that it will be lost. However, it reduces availability since the partition will be unavailable for writes if the number of in-sync replicas drops below the minimum threshold.
指定一个最小的ISR大小 — 如果ISR的大小高于最小值,则该分区才接受写入,以预防消息丢失,防止消息写到单个副本上,则让其变为不可用。如果生产者使用的是acks=all并保证最少这些同步分本已确认,则设置才生效。该设置提供一致性和可用性之间的权衡。ISR的大小设置的越高更好的保证一致性,因为消息写到更多的副本以减少消息丢失的风险。但是,这样降低了可用性,因为如果同步副本数低于最小的阈值,则该分区将不可写入。


The above discussion on replicated logs really covers only a single log, i.e. one topic partition. However a Kafka cluster will manage hundreds or thousands of these partitions. We attempt to balance partitions within a cluster in a round-robin fashion to avoid clustering all partitions for high-volume topics on a small number of nodes. Likewise we try to balance leadership so that each node is the leader for a proportional share of its partitions.

It is also important to optimize the leadership election process as that is the critical window of unavailability. A naive implementation of leader election would end up running an election per partition for all partitions a node hosted when that node failed. Instead, we elect one of the brokers as the "controller". This controller detects failures at the broker level and is responsible for changing the leader of all affected partitions in a failed broker. The result is that we are able to batch together many of the required leadership change notifications which makes the election process far cheaper and faster for a large number of partitions. If the controller fails, one of the surviving brokers will become the new controller.

您需要解锁本帖隐藏内容请: 点击这里

上一条: kafka消息传递保障
下一条: kafka日志压缩

  • 大佬,新版本如果所有副本死了好像改用了第一种策略
    This is a simple tradeoff between availability and consistency. If we wait for replicas in the ISR, then we will remain unavailable as long as those replicas are down. If such replicas were destroyed or their data was lost, then we are permanently down. If, on the other hand, a non-in-sync replica comes back to life and we allow it to become leader, then its log becomes the source of truth even though it is not guaranteed to have every committed message. By default from version, Kafka chooses the first strategy and favor waiting for a consistent replica. This behavior can be changed using configuration property unclean.leader.election.enable, to support use cases where uptime is preferable to consistency.

    假如 我有3个broker,新建一个topic,分区指定3个 ,然后启动一个生产者写入一些数据,再执行./kafka-run-class.sh kafka.tools.GetOffsetShell
    此时显示的三个分区,是不是都独自在一个broker上? 那如果我指定了5个分区,它的数据保存位置怎么分呢?
    另外怎么知道 每个分区都保存在哪些个broker上?

    • 会不会出现一种情况,3个broker节点,大部分或是全部数据都集中在一个broker节点上?比如说生产者指定写入分区1,假设分区1是broker:2,那broker2就无论如何都不能宕机了。

        总结一下第三段我的疑问,副本以topic的分区为单位,那么一个broker上可以有多少分区? 一个broker上的一个leader的follower数量应该有多少?这个数量是副本数吗? 有一台主机,上面只有一个broker:1 ,那么所有运行在这个主机上的分区,follower 都听从这个broker:1 指挥吗?此时 我再拿一台主机 2020.09.09 12:00:00 新建一个broker: 2,broker启动后follower数量会马上与broker:1的follower一致吗?新启动的broker:2 通过重分区 将集群的旧topic同步到broker:2 上,那此时leader 应该是broker:1 如何改变为broker:2?

        • 1、broker不限制分区数量。


            设置的broker配置参数 default.replication.factor=3时,应该在创建Topic的元数据失败超时异常。
            但是参数调成 default.replication.factor=2,就可以写入消息。

            • [admin@slave1 kafka_2.11-2.1.0]$ bin/kafka-topics.sh --create --zookeeper --replication-factor 3 --partitions 3 --topic xueshan
              Error while executing topic command : Replication factor: 3 larger than available brokers: 2.
              [2020-01-08 16:21:57,432] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.



                It is not uncommon for replication algorithms in this space to depend on the existence of "stable storage" that cannot be lost in any failure-recovery scenario without potential consistency violations.
                在任何故障恢复场景下都不违反一致性的、依赖"稳定存储"的复制算法是不太常见的。in this space未翻译,待纠正。

                A naive implementation of leader election would end up running an election per partition for all partitions a node hosted when that node failed.

                if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.