“AI 技术+人才”如何成为企业增长新引擎?戳此了解>>> 了解详情
写点什么

Kafka 设计解析(三):Kafka High Availability (下)

  • 2015-06-11
  • 本文字数:9712 字

    阅读完需:约 32 分钟

Kafka 是由 LinkedIn 开发的一个分布式的消息系统,使用 Scala 编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如 Cloudera、Apache Storm、Spark 都支持与 Kafka 集成。InfoQ 一直在紧密关注Kafka 的应用以及发展,“Kafka 剖析”专栏将会从架构设计、实现、应用场景、性能等方面深度解析Kafka。

本文在上篇文章基础上,更加深入讲解了Kafka 的HA 机制,主要阐述了HA 相关各种场景,如Broker failover、Controller failover、Topic 创建/ 删除、Broker 启动、Follower 从Leader fetch 数据等详细处理过程。同时介绍了Kafka 提供的与Replication 相关的工具,如重新分配Partition 等。

Broker Failover 过程

Controller 对 Broker failure 的处理过程

  1. Controller 在 ZooKeeper 的/brokers/ids节点上注册 Watch。一旦有 Broker 宕机(本文用宕机代表任何让 Kafka 认为其 Broker die 的情景,包括但不限于机器断电,网络不可用,GC 导致的 Stop The World,进程 crash 等),其在 ZooKeeper 对应的 Znode 会自动被删除,ZooKeeper 会 fire Controller 注册的 Watch,Controller 即可获取最新的幸存的 Broker 列表。
  2. Controller 决定 set_p,该集合包含了宕机的所有 Broker 上的所有 Partition。
  3. 对 set_p 中的每一个 Partition: 3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该 Partition 当前的 ISR。

3.2 决定该 Partition 的新 Leader。如果当前 ISR 中有至少一个 Replica 还幸存,则选择其中一个作为新 Leader,新的 ISR 则包含当前 ISR 中所有幸存的 Replica。否则选择该 Partition 中任意一个幸存的 Replica 作为新的 Leader 以及 ISR(该场景下可能会有潜在的数据丢失)。如果该 Partition 的所有 Replica 都宕机了,则将新的 Leader 设置为 -1。

3.3 将新的 Leader,ISR 和新的leader_epochcontroller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有 Controller 版本在 3.1 至 3.3 的过程中无变化时才会执行,否则跳转到 3.1。
4. 直接通过 RPC 向 set_p 相关的 Broker 发送 LeaderAndISRRequest 命令。Controller 可以在一个 RPC 操作中发送多个命令从而提高效率。 Broker failover 顺序图如下所示。

LeaderAndIsrRequest 结构如下

LeaderAndIsrResponse 结构如下

创建 / 删除 Topic

  1. Controller 在 ZooKeeper 的/brokers/topics节点上注册 Watch,一旦某个 Topic 被创建或删除,则 Controller 会通过 Watch 得到新创建 / 删除的 Topic 的 Partition/Replica 分配。
  2. 对于删除 Topic 操作,Topic 工具会将该 Topic 名字存于/admin/delete_topics。若delete.topic.enable为 true,则 Controller 注册在/admin/delete_topics上的 Watch 被 fire,Controller 通过回调向对应的 Broker 发送 StopReplicaRequest,若为 false 则 Controller 不会在/admin/delete_topics上注册 Watch,也就不会对该事件作出反应。
  3. 对于创建 Topic 操作,Controller 从/brokers/ids读取当前所有可用的 Broker 列表,对于 set_p 中的每一个 Partition: 3.1 从分配给该 Partition 的所有 Replica(称为 AR)中任选一个可用的 Broker 作为新的 Leader,并将 AR 设置为新的 ISR(因为该 Topic 是新创建的,所以 AR 中所有的 Replica 都没有数据,可认为它们都是同步的,也即都在 ISR 中,任意一个 Replica 都可作为 Leader)

3.2 将新的 Leader 和 ISR 写入/brokers/topics/[topic]/partitions/[partition]
4. 直接通过 RPC 向相关的 Broker 发送 LeaderAndISRRequest。 创建 Topic 顺序图如下所示。

Broker 响应请求流程

Broker 通过kafka.network.SocketServer及相关模块接受各种请求并作出响应。整个网络通信模块基于 Java NIO 开发,并采用 Reactor 模式,其中包含 1 个 Acceptor 负责接受客户请求,N 个 Processor 负责读写数据,M 个 Handler 处理业务逻辑。

Acceptor 的主要职责是监听并接受客户端(请求发起方,包括但不限于 Producer,Consumer,Controller,Admin Tool)的连接请求,并建立和客户端的数据传输通道,然后为该客户端指定一个 Processor,至此它对该客户端该次请求的任务就结束了,它可以去响应下一个客户端的连接请求了。其核心代码如下。

Processor 主要负责从客户端读取数据并将响应返回给客户端,它本身并不处理具体的业务逻辑,并且其内部维护了一个队列来保存分配给它的所有 SocketChannel。Processor 的 run 方法会循环从队列中取出新的 SocketChannel 并将其SelectionKey.OP_READ注册到 selector 上,然后循环处理已就绪的读(请求)和写(响应)。Processor 读取完数据后,将其封装成 Request 对象并将其交给 RequestChannel。

RequestChannel 是 Processor 和 KafkaRequestHandler 交换数据的地方,它包含一个队列 requestQueue 用来存放 Processor 加入的 Request,KafkaRequestHandler 会从里面取出 Request 来处理;同时它还包含一个 respondQueue,用来存放 KafkaRequestHandler 处理完 Request 后返还给客户端的 Response。

Processor 会通过 processNewResponses 方法依次将 requestChannel 中 responseQueue 保存的 Response 取出,并将对应的SelectionKey.OP_WRITE事件注册到 selector 上。当 selector 的 select 方法返回时,对检测到的可写通道,调用 write 方法将 Response 返回给客户端。

KafkaRequestHandler 循环从 RequestChannel 中取 Request 并交给kafka.server.KafkaApis处理具体的业务逻辑。

LeaderAndIsrRequest 响应过程

对于收到的 LeaderAndIsrRequest,Broker 主要通过 ReplicaManager 的 becomeLeaderOrFollower 处理,流程如下:

  1. 若请求中 controllerEpoch 小于当前最新的 controllerEpoch,则直接返回 ErrorMapping.StaleControllerEpochCode。
  2. 对于请求中 partitionStateInfos 中的每一个元素,即((topic, partitionId), partitionStateInfo): 2.1 若 partitionStateInfo 中的 leader epoch 大于当前 ReplicManager 中存储的 (topic, partitionId) 对应的 partition 的 leader epoch,则:

2.1.1 若当前 brokerid(或者说 replica id)在 partitionStateInfo 中,则将该 partition 及 partitionStateInfo 存入一个名为 partitionState 的 HashMap 中

2.1.2 否则说明该 Broker 不在该 Partition 分配的 Replica list 中,将该信息记录于 log 中

2.2 否则将相应的 Error code(ErrorMapping.StaleLeaderEpochCode)存入 Response 中
3. 筛选出 partitionState 中 Leader 与当前 Broker ID 相等的所有记录存入 partitionsTobeLeader 中,其它记录存入 partitionsToBeFollower 中。
4. 若 partitionsTobeLeader 不为空,则对其执行 makeLeaders 方。
5. 若 partitionsToBeFollower 不为空,则对其执行 makeFollowers 方法。
6. 若 highwatermak 线程还未启动,则将其启动,并将 hwThreadInitialized 设为 true。
7. 关闭所有 Idle 状态的 Fetcher。

LeaderAndIsrRequest 处理过程如下图所示

Broker 启动过程

Broker 启动后首先根据其 ID 在 ZooKeeper 的/brokers/idszonde 下创建临时子节点( Ephemeral node ),创建成功后 Controller 的 ReplicaStateMachine 注册其上的 Broker Change Watch 会被 fire,从而通过回调 KafkaController.onBrokerStartup 方法完成以下步骤:

  1. 向所有新启动的 Broker 发送 UpdateMetadataRequest,其定义如下。
  2. 将新启动的 Broker 上的所有 Replica 设置为 OnlineReplica 状态,同时这些 Broker 会为这些 Partition 启动 high watermark 线程。
  3. 通过 partitionStateMachine 触发 OnlinePartitionStateChange。

Controller Failover

Controller 也需要 Failover。每个 Broker 都会在 Controller Path (/controller) 上注册一个 Watch。当前 Controller 失败时,对应的 Controller Path 会自动消失(因为它是 Ephemeral Node),此时该 Watch 被 fire,所有“活”着的 Broker 都会去竞选成为新的 Controller(创建新的 Controller Path),但是只会有一个竞选成功(这点由 ZooKeeper 保证)。竞选成功者即为新的 Leader,竞选失败者则重新在新的 Controller Path 上注册 Watch。因为 ZooKeeper 的 Watch 是一次性的,被 fire 一次之后即失效,所以需要重新注册。

Broker 成功竞选为新 Controller 后会触发 KafkaController.onControllerFailover 方法,并在该方法中完成如下操作:

  1. 读取并增加 Controller Epoch。
  2. 在 ReassignedPartitions Patch(/admin/reassign_partitions) 上注册 Watch。
  3. 在 PreferredReplicaElection Path(/admin/preferred_replica_election) 上注册 Watch。
  4. 通过 partitionStateMachine 在 Broker Topics Patch(/brokers/topics) 上注册 Watch。
  5. delete.topic.enable设置为 true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 Watch。
  6. 通过 replicaStateMachine 在 Broker Ids Patch(/brokers/ids) 上注册 Watch。
  7. 初始化 ControllerContext 对象,设置当前所有 Topic,“活”着的 Broker 列表,所有 Partition 的 Leader 及 ISR 等。
  8. 启动 replicaStateMachine 和 partitionStateMachine。
  9. 将 brokerState 状态设置为 RunningAsController。
  10. 将每个 Partition 的 Leadership 信息发送给所有“活”着的 Broker。
  11. auto.leader.rebalance.enable配置为 true(默认值是 true),则启动 partition-rebalance 线程。
  12. delete.topic.enable设置为 true 且 Delete Topic Patch(/admin/delete_topics) 中有值,则删除相应的 Topic。

Partition 重新分配

管理工具发出重新分配 Partition 请求后,会将相应信息写到/admin/reassign_partitions上,而该操作会触发 ReassignedPartitionsIsrChangeListener,从而通过执行回调函数 KafkaController.onPartitionReassignment 来完成以下操作:

  1. 将 ZooKeeper 中的 AR(Current Assigned Replicas)更新为 OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
  2. 强制更新 ZooKeeper 中的 leader epoch,向 AR 中的每个 Replica 发送 LeaderAndIsrRequest。
  3. 将 RAR - OAR 中的 Replica 设置为 NewReplica 状态。
  4. 等待直到 RAR 中所有的 Replica 都与其 Leader 同步。
  5. 将 RAR 中所有的 Replica 都设置为 OnlineReplica 状态。
  6. 将 Cache 中的 AR 设置为 RAR。
  7. 若 Leader 不在 RAR 中,则从 RAR 中重新选举出一个新的 Leader 并发送 LeaderAndIsrRequest。若新的 Leader 不是从 RAR 中选举而出,则还要增加 ZooKeeper 中的 leader epoch。
  8. 将 OAR - RAR 中的所有 Replica 设置为 OfflineReplica 状态,该过程包含两部分。第一,将 ZooKeeper 上 ISR 中的 OAR - RAR 移除并向 Leader 发送 LeaderAndIsrRequest 从而通知这些 Replica 已经从 ISR 中移除;第二,向 OAR - RAR 中的 Replica 发送 StopReplicaRequest 从而停止不再分配给该 Partition 的 Replica。
  9. 将 OAR - RAR 中的所有 Replica 设置为 NonExistentReplica 状态从而将其从磁盘上删除。
  10. 将 ZooKeeper 中的 AR 设置为 RAR。
  11. 删除/admin/reassign_partition

注意:最后一步才将 ZooKeeper 中的 AR 更新,因为这是唯一一个持久存储 AR 的地方,如果 Controller 在这一步之前 crash,新的 Controller 仍然能够继续完成该过程。

以下是 Partition 重新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition 重新分配过程中 ZooKeeper 中的 AR 和 Leader/ISR 路径如下

AR leader/isr Sttep {1,2,3} 1/{1,2,3} (initial state) {1,2,3,4,5,6} 1/{1,2,3} (step 2) {1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4) {1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7) {1,2,3,4,5,6} 4/{4,5,6} (step 8) {4,5,6} 4/{4,5,6} (step 10)### Follower 从 Leader Fetch 数据

Follower 通过向 Leader 发送 FetchRequest 获取消息,FetchRequest 结构如下

从 FetchRequest 的结构可以看出,每个 Fetch 请求都要指定最大等待时间和最小获取字节数,以及由 TopicAndPartition 和 PartitionFetchInfo 构成的 Map。实际上,Follower 从 Leader 数据和 Consumer 从 Broker Fetch 数据,都是通过 FetchRequest 请求完成,所以在 FetchRequest 结构中,其中一个字段是 clientID,并且其默认值是 ConsumerConfig.DefaultClientId。

Leader 收到 Fetch 请求后,Kafka 通过 KafkaApis.handleFetchRequest 响应该请求,响应过程如下:

  1. replicaManager 根据请求读出数据存入 dataRead 中。
  2. 如果该请求来自 Follower 则更新其相应的 LEO(log end offset)以及相应 Partition 的 High Watermark
  3. 根据 dataRead 算出可读消息长度(单位为字节)并存入 bytesReadable 中。
  4. 满足下面 4 个条件中的 1 个,则立即将相应的数据返回
  • Fetch 请求不希望等待,即 fetchRequest.macWait <= 0
  • Fetch 请求不要求一定能取到消息,即 fetchRequest.numPartitions <= 0,也即 requestInfo 为空
  • 有足够的数据可供返回,即 bytesReadable >= fetchRequest.minBytes
  • 读取数据时发生异常
  1. 若不满足以上 4 个条件,FetchRequest 将不会立即返回,并将该请求封装成 DelayedFetch。检查该 DeplayedFetch 是否满足,若满足则返回请求,否则将该请求加入 Watch 列表

Leader 通过以 FetchResponse 的形式将消息返回给 Follower,FetchResponse 结构如下

Replication 工具

Topic Tool

$KAFKA_HOME/bin/kafka-topics.sh,该工具可用于创建、删除、修改、查看某个 Topic,也可用于列出所有 Topic。另外,该工具还可修改某个 Topic 的以下配置。

复制代码
unclean.leader.election.enable
delete.retention.ms
segment.jitter.ms
retention.ms
flush.ms
segment.bytes
flush.messages
segment.ms
retention.bytes
cleanup.policy
segment.index.bytes
min.cleanable.dirty.ratio
max.message.bytes
file.delete.delay.ms
min.insync.replicas
index.interval.bytes

Replica Verification Tool

$KAFKA_HOME/bin/kafka-replica-verification.sh,该工具用来验证所指定的一个或多个 Topic 下每个 Partition 对应的所有 Replica 是否都同步。可通过topic-white-list这一参数指定所需要验证的所有 Topic,支持正则表达式。

Preferred Replica Leader Election Tool

用途

有了 Replication 机制后,每个 Partition 可能有多个备份。某个 Partition 的 Replica 列表叫作 AR(Assigned Replicas),AR 中的第一个 Replica 即为“Preferred Replica”。创建一个新的 Topic 或者给已有 Topic 增加 Partition 时,Kafka 保证 Preferred Replica 被均匀分布到集群中的所有 Broker 上。理想情况下,Preferred Replica 会被选为 Leader。以上两点保证了所有 Partition 的 Leader 被均匀分布到了集群当中,这一点非常重要,因为所有的读写操作都由 Leader 完成,若 Leader 分布过于集中,会造成集群负载不均衡。但是,随着集群的运行,该平衡可能会因为 Broker 的宕机而被打破,该工具就是用来帮助恢复 Leader 分配的平衡。

事实上,每个 Topic 从失败中恢复过来后,它默认会被设置为 Follower 角色,除非某个 Partition 的 Replica 全部宕机,而当前 Broker 是该 Partition 的 AR 中第一个恢复回来的 Replica。因此,某个 Partition 的 Leader(Preferred Replica)宕机并恢复后,它很可能不再是该 Partition 的 Leader,但仍然是 Preferred Replica。

原理

1. 在 ZooKeeper 上创建/admin/preferred_replica_election节点,并存入需要调整 Preferred Replica 的 Partition 信息。

2. Controller 一直 Watch 该节点,一旦该节点被创建,Controller 会收到通知,并获取该内容。

3. Controller 读取 Preferred Replica,如果发现该 Replica 当前并非是 Leader 并且它在该 Partition 的 ISR 中,Controller 向该 Replica 发送 LeaderAndIsrRequest,使该 Replica 成为 Leader。如果该 Replica 当前并非是 Leader,且不在 ISR 中,Controller 为了保证没有数据丢失,并不会将其设置为 Leader。

用法

复制代码
$KAFKA_HOME/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

在包含 8 个 Broker 的 Kafka 集群上,创建 1 个名为 topic1,replication-factor 为 3,Partition 数为 8 的 Topic,使用如下命令查看其 Partition/Replica 分布。

复制代码
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181

查询结果如下图所示,从图中可以看到,Kafka 将所有 Replica 均匀分布到了整个集群,并且 Leader 也均匀分布。

手动停止部分 Broker,topic1 的 Partition/Replica 分布如下图所示。从图中可以看到,由于 Broker 1/2/4 都被停止,Partition 0 的 Leader 由原来的 1 变为 3,Partition 1 的 Leader 由原来的 2 变为 5,Partition 2 的 Leader 由原来的 3 变为 6,Partition 3 的 Leader 由原来的 4 变为 7。

再重新启动 ID 为 1 的 Broker,topic1 的 Partition/Replica 分布如下。可以看到,虽然 Broker 1 已经启动(Partition 0 和 Partition5 的 ISR 中有 1),但是 1 并不是任何一个 Parititon 的 Leader,而 Broker 5/6/7 都是 2 个 Partition 的 Leader,即 Leader 的分布不均衡——一个 Broker 最多是 2 个 Partition 的 Leader,而最少是 0 个 Partition 的 Leader。

运行该工具后,topic1 的 Partition/Replica 分布如下图所示。由图可见,除了 Partition 1 和 Partition 3 由于 Broker 2 和 Broker 4 还未启动,所以其 Leader 不是其 Preferred Repliac 外,其它所有 Partition 的 Leader 都是其 Preferred Replica。同时,与运行该工具前相比,Leader 的分配更均匀——一个 Broker 最多是 2 个 Parittion 的 Leader,最少是 1 个 Partition 的 Leader。

启动 Broker 2 和 Broker 4,Leader 分布与上一步相比并未变化,如下图所示。

再次运行该工具,所有 Partition 的 Leader 都由其 Preferred Replica 承担,Leader 分布更均匀——每个 Broker 承担 1 个 Partition 的 Leader 角色。

除了手动运行该工具使 Leader 分配均匀外,Kafka 还提供了自动平衡 Leader 分配的功能,该功能可通过将auto.leader.rebalance.enable设置为 true 开启,它将周期性检查 Leader 分配是否平衡,若不平衡度超过一定阈值则自动由 Controller 尝试将各 Partition 的 Leader 设置为其 Preferred Replica。检查周期由leader.imbalance.check.interval.seconds指定,不平衡度阈值由leader.imbalance.per.broker.percentage指定。

Kafka Reassign Partitions Tool

用途

该工具的设计目标与 Preferred Replica Leader Election Tool 有些类似,都旨在促进 Kafka 集群的负载均衡。不同的是,Preferred Replica Leader Election 只能在 Partition 的 AR 范围内调整其 Leader,使 Leader 分布均匀,而该工具还可以调整 Partition 的 AR。

Follower 需要从 Leader Fetch 数据以保持与 Leader 同步,所以仅仅保持 Leader 分布的平衡对整个集群的负载均衡来说是不够的。另外,生产环境下,随着负载的增大,可能需要给 Kafka 集群扩容。向 Kafka 集群中增加 Broker 非常简单方便,但是对于已有的 Topic,并不会自动将其 Partition 迁移到新加入的 Broker 上,此时可用该工具达到此目的。某些场景下,实际负载可能远小于最初预期负载,此时可用该工具将分布在整个集群上的 Partition 重装分配到某些机器上,然后可以停止不需要的 Broker 从而实现节约资源的目的。

需要说明的是,该工具不仅可以调整 Partition 的 AR 位置,还可调整其 AR 数量,即改变该 Topic 的 replication factor。

原理

该工具只负责将所需信息存入 ZooKeeper 中相应节点,然后退出,不负责相关的具体操作,所有调整都由 Controller 完成。

1. 在 ZooKeeper 上创建/admin/reassign_partitions节点,并存入目标 Partition 列表及其对应的目标 AR 列表。

2. Controller 注册在/admin/reassign_partitions上的 Watch 被 fire,Controller 获取该列表。

3. 对列表中的所有 Partition,Controller 会做如下操作:

  • 启动RAR - AR中的 Replica,即新分配的 Replica。(RAR = Reassigned Replicas, AR = Assigned Replicas)
  • 等待新的 Replica 与 Leader 同步
  • 如果 Leader 不在 RAR 中,从 RAR 中选出新的 Leader
  • 停止并删除AR - RAR中的 Replica,即不再需要的 Replica
  • 删除/admin/reassign_partitions节点

用法

该工具有三种使用模式

  • generate 模式,给定需要重新分配的 Topic,自动生成 reassign plan(并不执行)
  • execute 模式,根据指定的 reassign plan 重新分配 Partition
  • verify 模式,验证重新分配 Partition 是否成功

下面这个例子将使用该工具将 Topic 的所有 Partition 重新分配到 Broker 4/5/6/7 上,步骤如下:

1. 使用 generate 模式,生成 reassign plan

指定需要重新分配的 Topic ({“topics”:[{“topic”:“topic1”}],“version”:1}),并存入/tmp/topics-to-move.json文件中,然后执行如下命令

复制代码
$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--topics-to-move-json-file /tmp/topics-to-move.json
--broker-list "4,5,6,7" --generate

结果如下图所示

2. 使用 execute 模式,执行 reassign plan

将上一步生成的 reassignment plan 存入/tmp/reassign-plan.json文件中,并执行

复制代码
$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--reassignment-json-file /tmp/reassign-plan.json --execute

此时,ZooKeeper 上/admin/reassign_partitions节点被创建,且其值与/tmp/reassign-plan.json文件的内容一致。

3. 使用 verify 模式,验证 reassign 是否完成

执行 verify 命令

复制代码
$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--reassignment-json-file /tmp/reassign-plan.json --verify

结果如下所示,从图中可看出 topic1 的所有 Partititon 都根据 reassign plan 重新分配成功。

接下来用 Topic Tool 再次验证。

复制代码
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1

结果如下图所示,从图中可看出 topic1 的所有 Partition 都被重新分配到 Broker 4/5/6/7,且每个 Partition 的 AR 与 reassign plan 一致。

需要说明的是,在使用 execute 之前,并不一定要使用 generate 模式自动生成 reassign plan,使用 generate 模式只是为了方便。事实上,某些场景下,generate 模式生成的 reassign plan 并不一定能满足需求,此时用户可以自己设置 reassign plan。

State Change Log Merge Tool

用途

该工具旨在从整个集群的 Broker 上收集状态改变日志,并生成一个集中的格式化的日志以帮助诊断状态改变相关的故障。每个 Broker 都会将其收到的状态改变相关的的指令存于名为state-change.log的日志文件中。某些情况下,Partition 的 Leader election 可能会出现问题,此时我们需要对整个集群的状态改变有个全局的了解从而诊断故障并解决问题。该工具将集群中相关的state-change.log日志按时间顺序合并,同时支持用户输入时间范围和目标 Topic 及 Partition 作为过滤条件,最终将格式化的结果输出。

用法

复制代码
bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
--logs /opt/kafka_2.11-0.8.2.1/logs/state-change.log
--topic topic1 --partitions 0,1,2,3,4,5,6,7

作者简介

郭俊(Jason),硕士,从事大数据平台研发工作,精通 Kafka 等分布式消息系统,Storm 等流式处理系统及数据库性能调优。

新浪微博:郭俊_Jason

微信: habren

公众号:大数据架构

个人博客: http://www.jasongj.com

下篇预告

下篇文章将详细介绍 Kafka High Level Consumer,Consumer Group,Consumer Group Rebalance 和 Simple Consumer,以及未来新版本中对 Kafka High Level Consumer 的重新设计——使用 Consumer Controller 解决 Split Brain 和 Herd 等问题。


感谢郭蕾对本文的策划和审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入 InfoQ 读者交流群)。

2015-06-11 00:0325853

评论

发布
暂无评论
发现更多内容

率先布局 RWA 赛道,PoseiSwap 成为最具先进性的 DEX

股市老人

Linux系统下如何在防火墙开放指定端口

百度搜索:蓝易云

Linux 运维 服务器 云服务器 运维、

AI 2.0来袭,AIGC如何重新定义招聘?

用友BIP

招聘

架构师的核心工作:价值放大

高鹏

Java 深度思考 架构 架构师

率先布局 RWA 赛道,PoseiSwap 成为最具先进性的 DEX

大瞿科技

Java程序性能分析:内存

javalover123

Java 性能优化 性能 性能分析 性能监控

业务系统技术债治理终极指南

高鹏

Java 架构

支付宝小程序云产品发布会:6大产品20项亮点全揭秘

陈橘又青

小程序 云开发

机器学习洞察 | 分布式训练让机器学习更加快速准确

亚马逊云科技 (Amazon Web Services)

机器学习 分布式

websocket和http有什么不同?以及websocket协议如何实现?

百度搜索:蓝易云

Linux 运维 HTTP websocket

Boundless Hackathon @Stanford 主题黑客松活动闭幕,一文回顾

大瞿科技

PyTorch模型创建与nn.Module

timerring

PyTorch

率先布局 RWA 赛道,PoseiSwap 成为最具先进性的 DEX

EOSdreamer111

C++中set的用法学习

攻城狮Wayne

代码随想录 Day13 - 栈与队列(下)

jjn0703

Linux系统Tomcat安装与配置。

百度搜索:蓝易云

tomcat Linux 运维 云服务器 云服务器ECS

AI写代码靠谱吗?

石云升

ChatGPT AI编码

一些有用的资料

Joseph295

什么是区块链?| 社区征文

TiAmo

区块链 以太坊 年中技术盘点

10分钟入门Flink--了解Flink

不焦躁的程序员

Java 大数据 flink

华为云CodeArts DevSecOps系列插件——助力更高效的软件研发

华为云PaaS服务小智

华为 软件开发 华为云 华为开发者大会

Istio与Mcp Server服务器讲解与搭建演示

谐云

istio

Linux查看进程PID的方法?

百度搜索:蓝易云

Linux 运维 云服务器 PID 虚拟主机

Ubuntu、CentOS修改时区、设置24小时时间格式教程。

百度搜索:蓝易云

云计算 Linux ubuntu centos 运维

vue基础知识

猫九

Vue

langchain:Prompt在手,天下我有

程序那些事

#LangChain AI大语言模型 大语言模型

OpenTiny 前端组件库正式开源啦!面向未来,为开发者而生

OpenTiny社区

开源 前端 UI组件库

权衡矩阵-《敏捷实战-破解敏捷落地的60个难题》读后感

Bruce Talk

Centos8升级到Centos 8 stream教程。

百度搜索:蓝易云

Linux 运维 服务器 云服务器 ECS

MyBatis查询所有

猫九

mybatis

D3可视化

猫九

D3

Kafka设计解析(三):Kafka High Availability (下)_语言 & 开发_郭俊_InfoQ精选文章