Kafka 设计解析(二):Kafka High Availability (上)

阅读数:42865 2015 年 4 月 24 日 23:21

Kafka 在 0.8 以前的版本中,并不提供 High Availablity 机制,一旦一个或多个 Broker 宕机,则宕机期间其上所有 Partition 都无法继续提供服务。若该 Broker 永远不能再恢复,亦或磁盘故障,则其上数据将丢失。而 Kafka 的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对 Failover 要求非常高。因此,Kafka 从 0.8 开始提供 High Availability 机制。本文从 Data Replication 和 Leader Election 两方面介绍了 Kafka 的 HA 机制。

Kafka 为何需要 High Available

为何需要 Replication

在 Kafka 在 0.8 以前的版本中,是没有 Replication 的,一旦某一个 Broker 宕机,则其上所有的 Partition 数据都不可被消费,这与 Kafka 数据持久性及 Delivery Guarantee 的设计目标相悖。同时 Producer 都不能再将数据存于这些 Partition 中。

  • 如果 Producer 使用同步模式则 Producer 会在尝试重新发送message.send.max.retries(默认值为 3)次后抛出 Exception,用户可以选择停止发送后续数据也可选择继续选择发送。而前者会造成数据的阻塞,后者会造成本应发往该 Broker 的数据的丢失。
  • 如果 Producer 使用异步模式,则 Producer 会尝试重新发送message.send.max.retries(默认值为 3)次后记录该异常并继续发送后续数据,这会造成数据丢失并且用户只能通过日志发现该问题。同时,Kafka 的 Producer 并未对异步模式提供 callback 接口。

由此可见,在没有 Replication 的情况下,一旦某机器宕机或者某个 Broker 停止工作则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言 Replication 机制的引入非常重要。

为何需要 Leader Election

注意:本文所述 Leader Election 主要指 Replica 之间的 Leader Election。

引入 Replication 之后,同一个 Partition 可能会有多个 Replica,而这时需要在这些 Replication 之间选出一个 Leader,Producer 和 Consumer 只与这个 Leader 交互,其它 Replica 作为 Follower 从 Leader 中复制数据。

因为需要保证同一个 Partition 的多个 Replica 之间的数据一致性(其中一个宕机后其它 Replica 必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。如果没有一个 Leader,所有 Replica 都可同时读 / 写数据,那就需要保证多个 Replica 之间互相(N×N 条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了 Replication 实现的复杂性,同时也增加了出现异常的几率。而引入 Leader 后,只有 Leader 负责数据读写,Follower 只向 Leader 顺序 Fetch 数据(N 条通路),系统更加简单且高效。

Kafka HA 设计解析

如何将所有 Replica 均匀分布到整个集群

为了更好的做负载均衡,Kafka 尽量将所有的 Partition 均匀分配到整个集群上。一个典型的部署方式是一个 Topic 的 Partition 数量大于 Broker 的数量。同时为了提高 Kafka 的容错能力,也需要将同一个 Partition 的 Replica 尽量分散到不同的机器。实际上,如果所有的 Replica 都在同一个 Broker 上,那一旦该 Broker 宕机,该 Partition 的所有 Replica 都无法工作,也就达不到 HA 的效果。同时,如果某个 Broker 宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有 Broker 上。

Kafka 分配 Replica 的算法如下:

  1. 将所有 Broker(假设共 n 个 Broker)和待分配的 Partition 排序
  2. 将第 i 个 Partition 分配到第(i mod n)个 Broker 上
  3. 将第 i 个 Partition 的第 j 个 Replica 分配到第((i + j) mode n)个 Broker 上

Data Replication

Kafka 的 Data Replication 需要解决如下问题:

  • 怎样 Propagate 消息
  • 在向 Producer 发送 ACK 前需要保证有多少个 Replica 已经收到该消息
  • 怎样处理某个 Replica 不工作的情况
  • 怎样处理 Failed Replica 恢复回来的情况

Propagate 消息

Producer 在发布消息到某个 Partition 时,先通过 ZooKeeper 找到该 Partition 的 Leader,然后无论该 Topic 的 Replication Factor 为多少(也即该 Partition 有多少个 Replica),Producer 只将该消息发送到该 Partition 的 Leader。Leader 会将该消息写入其本地 Log。每个 Follower 都从 Leader pull 数据。这种方式上,Follower 存储的数据顺序与 Leader 保持一致。Follower 在收到该消息并写入其 Log 后,向 Leader 发送 ACK。一旦 Leader 收到了 ISR 中的所有 Replica 的 ACK,该消息就被认为已经 commit 了,Leader 将增加 HW 并且向 Producer 发送 ACK。

为了提高性能,每个 Follower 在接收到数据后就立马向 Leader 发送 ACK,而非等到数据写入 Log 中。因此,对于已经 commit 的消息,Kafka 只能保证它被存于多个 Replica 的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被 Consumer 消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。在将来的版本中,Kafka 会考虑提供更高的持久性。

Consumer 读消息也是从 Leader 读取,只有被 commit 过的消息(offset 低于 HW 的消息)才会暴露给 Consumer。

Kafka Replication 的数据流如下图所示:

ACK 前需要保证有多少个备份

和大部分分布式系统一样,Kafka 处理失败需要明确定义一个 Broker 是否“活着”。对于 Kafka 而言,Kafka 存活包含两个条件,一是它必须维护与 ZooKeeper 的 session(这个通过 ZooKeeper 的 Heartbeat 机制来实现)。二是 Follower 必须能够及时将 Leader 的消息复制过来,不能“落后太多”。

Leader 会跟踪与其保持同步的 Replica 列表,该列表称为 ISR(即 in-sync Replica)。如果一个 Follower 宕机,或者落后太多,Leader 将把它从 ISR 中移除。这里所描述的“落后太多”指 Follower 复制的消息落后于 Leader 后的条数超过预定值(该值可在 $KAFKA_HOME/config/server.properties 中通过replica.lag.max.messages配置,其默认值是 4000)或者 Follower 超过一定时间(该值可在 $KAFKA_HOME/config/server.properties 中通过replica.lag.time.max.ms来配置,其默认值是 10000)未向 Leader 发送 fetch 请求。

Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,完全同步复制要求所有能工作的 Follower 都复制完,这条消息才会被认为 commit,这种复制方式极大的影响了吞吐率(高吞吐率是 Kafka 非常重要的一个特性)。而异步复制方式下,Follower 异步的从 Leader 复制数据,数据只要被 Leader 写入 log 就被认为已经 commit,这种情况下如果 Follower 都复制完都落后于 Leader,而如果 Leader 突然宕机,则会丢失数据。而 Kafka 的这种使用 ISR 的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower 可以批量的从 Leader 复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了 Follower 与 Leader 的差距。

需要说明的是,Kafka 只解决 fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被 ISR 里的所有 Follower 都从 Leader 复制过去才会被认为已提交。这样就避免了部分数据被写进了 Leader,还没来得及被任何 Follower 复制就宕机了,而造成数据丢失(Consumer 无法消费这些数据)。而对于 Producer 而言,它可以选择是否等待消息 commit,这可以通过request.required.acks来设置。这种机制确保了只要 ISR 有一个或以上的 Follower,一条被 commit 的消息就不会丢失。

Leader Election 算法

上文说明了 Kafka 是如何做 Replication 的,另外一个很重要的问题是当 Leader 宕机了,怎样在 Follower 中选举出新的 Leader。因为 Follower 可能落后许多或者 crash 了,所以必须确保选择“最新”的 Follower 作为新的 Leader。一个基本的原则就是,如果 Leader 不在了,新的 Leader 必须拥有原来的 Leader commit 过的所有消息。这就需要作一个折衷,如果 Leader 在标明一条消息被 commit 前等待更多的 Follower 确认,那在它宕机之后就有更多的 Follower 可以作为新的 Leader,但这也会造成吞吐率的下降。

一种非常常用的选举 leader 的方式是“Majority Vote”(“少数服从多数”),但 Kafka 并未采用这种方式。这种模式下,如果我们有 2f+1 个 Replica(包含 Leader 和 Follower),那在 commit 之前必须保证有 f+1 个 Replica 复制完消息,为了保证正确选出新的 Leader,fail 的 Replica 不能超过 f 个。因为在剩下的任意 f+1 个 Replica 里,至少有一个 Replica 包含有最新的所有消息。这种方式有个很大的优势,系统的 latency 只取决于最快的几个 Broker,而非最慢那个。Majority Vote 也有一些劣势,为了保证 Leader Election 的正常进行,它所能容忍的 fail 的 follower 个数比较少。如果要容忍 1 个 follower 挂掉,必须要有 3 个以上的 Replica,如果要容忍 2 个 Follower 挂掉,必须要有 5 个以上的 Replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的 Replica,而大量的 Replica 又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在 ZooKeeper 这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如 HDFS 的 HA Feature 是基于 majority-vote-based journal ,但是它的数据存储并没有使用这种方式。

实际上,Leader Election 算法非常多,比如 ZooKeeper 的 Zab , Raft Viewstamped Replication 。而 Kafka 所使用的 Leader Election 算法更像微软的 PacificA 算法。

Kafka 在 ZooKeeper 中动态维护了一个 ISR(in-sync replicas),这个 ISR 里的所有 Replica 都跟上了 leader,只有 ISR 里的成员才有被选为 Leader 的可能。在这种模式下,对于 f+1 个 Replica,一个 Partition 能在保证不丢失已经 commit 的消息的前提下容忍 f 个 Replica 的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍 f 个 Replica 的失败,Majority Vote 和 ISR 在 commit 前需要等待的 Replica 数量是一样的,但是 ISR 需要的总的 Replica 的个数几乎是 Majority Vote 的一半。

虽然 Majority Vote 与 ISR 相比有不需等待最慢的 Broker 这一优势,但是 Kafka 作者认为 Kafka 可以通过 Producer 选择是否被 commit 阻塞来改善这一问题,并且节省下来的 Replica 和磁盘使得 ISR 模式仍然值得。

如何处理所有 Replica 都不工作

上文提到,在 ISR 中至少有一个 follower 时,Kafka 可以确保已经 commit 的数据不丢失,但如果某个 Partition 的所有 Replica 都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

  • 等待 ISR 中的任一个 Replica“活”过来,并且选它作为 Leader
  • 选择第一个“活”过来的 Replica(不一定是 ISR 中的)作为 Leader

这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待 ISR 中的 Replica“活”过来,那不可用的时间就可能会相对较长。而且如果 ISR 中的所有 Replica 都无法“活”过来了,或者数据都丢失了,这个 Partition 将永远不可用。选择第一个“活”过来的 Replica 作为 Leader,而这个 Replica 不是 ISR 中的 Replica,那即使它并不保证已经包含了所有已 commit 的消息,它也会成为 Leader 而作为 consumer 的数据源(前文有说明,所有读写都由 Leader 完成)。Kafka0.8.* 使用了第二种方式。根据 Kafka 的文档,在以后的版本中,Kafka 支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。

如何选举 Leader

最简单最直观的方案是,所有 Follower 都在 ZooKeeper 上设置一个 Watch,一旦 Leader 宕机,其对应的 ephemeral znode 会自动删除,此时所有 Follower 都尝试创建该节点,而创建成功者(ZooKeeper 保证只有一个能创建成功)即是新的 Leader,其它 Replica 即为 Follower。

但是该方法会有 3 个问题:

  • split-brain 这是由 ZooKeeper 的特性引起的,虽然 ZooKeeper 能保证所有 Watch 按顺序触发,但并不能保证同一时刻所有 Replica“看”到的状态是一样的,这就可能造成不同 Replica 的响应不一致
  • herd effect 如果宕机的那个 Broker 上的 Partition 比较多,会造成多个 Watch 被触发,造成集群内大量的调整
  • ZooKeeper 负载过重 每个 Replica 都要为此在 ZooKeeper 上注册一个 Watch,当集群规模增加到几千个 Partition 时 ZooKeeper 负载会过重。

Kafka 0.8.* 的 Leader Election 方案解决了上述问题,它在所有 broker 中选出一个 controller,所有 Partition 的 Leader 选举都由 controller 决定。controller 会将 Leader 的改变直接通过 RPC 的方式(比 ZooKeeper Queue 的方式更高效)通知需为为此作为响应的 Broker。同时 controller 也负责增删 Topic 以及 Replica 的重新分配。

HA 相关 ZooKeeper 结构

首先声明本节所示 ZooKeeper 结构中,实线框代表路径名是固定的,而虚线框代表路径名与业务相关

admin (该目录下 znode 只有在有相关操作时才会存在,操作结束时会将其删除)

/admin/preferred_replica_election 数据结构

{
   "fields":[
      {
         "name":"version",
         "type":"int",
         "doc":"version id"
      },
      {
         "name":"partitions",
         "type":{
            "type":"array",
            "items":{
               "fields":[
                  {
                     "name":"topic",
                     "type":"string",
                     "doc":"topic of the partition for which preferred replica election should be triggered"
                  },
                  {
                     "name":"partition",
                     "type":"int",
                     "doc":"the partition for which preferred replica election should be triggered"
                  }
               ],
            }
            "doc":"an array of partitions for which preferred replica election should be triggered"
         }
      }
   ]
}

Example:

{
  "version": 1,
  "partitions":
     [
        {
            "topic": "topic1",
            "partition": 8         
        },
        {
            "topic": "topic2",
            "partition": 16        
        }
     ]            
}

/admin/reassign_partitions用于将一些 Partition 分配到不同的 broker 集合上。对于每个待重新分配的 Partition,Kafka 会在该 znode 上存储其所有的 Replica 和相应的 Broker id。该 znode 由管理进程创建并且一旦重新分配成功它将会被自动移除。其数据结构如下:

{ 
"fields":[ 
{ 
"name":"version", 
"type":"int", 
"doc":"version id" 
}, 
{ 
"name":"partitions", 
"type":{ 
"type":"array", 
"items":{ 
"fields":[ 
{ 
"name":"topic", 
"type":"string", 
"doc":"topic of the partition to be reassigned" 
}, 
{ 
"name":"partition", 
"type":"int", 
"doc":"the partition to be reassigned" 
}, 
{ 
"name":"replicas", 
"type":"array", 
"items":"int", 
"doc":"a list of replica ids" 
} 
], 
} 
"doc":"an array of partitions to be reassigned to new replicas" 
} 
} 
] 
}
Example:
{
  "version": 1,
  "partitions":
     [
        {
            "topic": "topic3",
            "partition": 1,
            "replicas": [1, 2, 3]
        }
     ]            
}

/admin/delete_topics 数据结构:

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "topics",
       "type": { "type": "array", "items": "string", "doc": "an array of topics to be deleted"}
      } ]
}

Example:
{
  "version": 1,
  "topics": ["topic4", "topic5"]
}

brokers

broker(即/brokers/ids/[brokerId])存储“活着”的 broker 信息。数据结构如下:

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "host", "type": "string", "doc": "ip address or host name of the broker"},
      {"name": "port", "type": "int", "doc": "port of the broker"},
      {"name": "jmx_port", "type": "int", "doc": "port for jmx"}
    ]
}

Example:
{
    "jmx_port":-1,
    "host":"node1",
    "version":1,
    "port":9092
}

topic 注册信息(/brokers/topics/[topic]),存储该 topic 的所有 partition 的所有 replica 所在的 broker id,第一个 replica 即为 preferred replica,对一个给定的 partition,它在同一个 broker 上最多只有一个 replica, 因此 broker id 可作为 replica id。数据结构如下:

Schema:
{ "fields" :
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "partitions",
       "type": {"type": "map",
                "values": {"type": "array", "items": "int", "doc": "a list of replica ids"},
                "doc": "a map from partition id to replica list"},
      }
    ]
}
Example:
{
    "version":1,
    "partitions":
        {"12":[6],
        "8":[2],
        "4":[6],
        "11":[5],
        "9":[3],
        "5":[7],
        "10":[4],
        "6":[8],
        "1":[3],
        "0":[2],
        "2":[4],
        "7":[1],
        "3":[5]}
}

partition state(/brokers/topics/[topic]/partitions/[partitionId]/state) 结构如下:

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "isr",
       "type": {"type": "array",
                "items": "int",
                "doc": "an array of the id of replicas in isr"}
      },
      {"name": "leader", "type": "int", "doc": "id of the leader replica"},
      {"name": "controller_epoch", "type": "int", "doc": "epoch of the controller that last updated the leader and isr info"},
      {"name": "leader_epoch", "type": "int", "doc": "epoch of the leader"}
    ]
}

Example:
{
    "controller_epoch":29,
    "leader":2,
    "version":1,
    "leader_epoch":48,
    "isr":[2]
}

controller

/controller -> int (broker id of the controller)存储当前 controller 的信息

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "brokerid", "type": "int", "doc": "broker id of the controller"}
    ]
}
Example:
{
    "version":1,
  "brokerid":8
}

/controller_epoch -> int (epoch)直接以整数形式存储 controller epoch,而非像其它 znode 一样以 JSON 字符串形式存储。

broker failover 过程简介

  1. Controller 在 ZooKeeper 注册 Watch,一旦有 Broker 宕机(这是用宕机代表任何让系统认为其 die 的情景,包括但不限于机器断电,网络不可用,GC 导致的 Stop The World,进程 crash 等),其在 ZooKeeper 对应的 znode 会自动被删除,ZooKeeper 会 fire Controller 注册的 watch,Controller 读取最新的幸存的 Broker。
  2. Controller 决定 set_p,该集合包含了宕机的所有 Broker 上的所有 Partition。
  3. 对 set_p 中的每一个 Partition

    3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该 Partition 当前的 ISR

    3.2 决定该 Partition 的新 Leader。如果当前 ISR 中有至少一个 Replica 还幸存,则选择其中一个作为新 Leader,新的 ISR 则包含当前 ISR 中所有幸存的 Replica。否则选择该 Partition 中任意一个幸存的 Replica 作为新的 Leader 以及 ISR(该场景下可能会有潜在的数据丢失)。如果该 Partition 的所有 Replica 都宕机了,则将新的 Leader 设置为 -1。

    3.3 将新的 Leader,ISR 和新的leader_epochcontroller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其 version 在 3.1 至 3.3 的过程中无变化时才会执行,否则跳转到 3.1

  4. 直接通过 RPC 向 set_p 相关的 Broker 发送 LeaderAndISRRequest 命令。Controller 可以在一个 RPC 操作中发送多个命令从而提高效率。

    broker failover 顺序图如下所示。

作者简介

郭俊(Jason),硕士,从事大数据平台研发工作,精通 Kafka 等分布式消息系统及 Storm 等流式处理系统。

新浪微博:郭俊_Jason 微信:habren 个人博客: http://www.jasongj.com

下篇预告

下篇文章将详细介绍 Kafka HA 相关的异常情况处理,例如,怎样处理 Broker failover,Follower 如何从 Leader fetch 消息,如何重新分配 Replica,如何处理 Controller failure 等。


感谢郭蕾对本文的策划和审校。

给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流。

评论

发布