东亚银行、岚图汽车带你解锁 AIGC 时代的数字化人才培养各赛道新模式! 了解详情
写点什么

Kafka 设计解析(三):Kafka High Availability (下)

  • 2015-06-11
  • 本文字数:9712 字

    阅读完需:约 32 分钟

Kafka 是由 LinkedIn 开发的一个分布式的消息系统,使用 Scala 编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如 Cloudera、Apache Storm、Spark 都支持与 Kafka 集成。InfoQ 一直在紧密关注Kafka 的应用以及发展,“Kafka 剖析”专栏将会从架构设计、实现、应用场景、性能等方面深度解析Kafka。

本文在上篇文章基础上,更加深入讲解了Kafka 的HA 机制,主要阐述了HA 相关各种场景,如Broker failover、Controller failover、Topic 创建/ 删除、Broker 启动、Follower 从Leader fetch 数据等详细处理过程。同时介绍了Kafka 提供的与Replication 相关的工具,如重新分配Partition 等。

Broker Failover 过程

Controller 对 Broker failure 的处理过程

  1. Controller 在 ZooKeeper 的/brokers/ids节点上注册 Watch。一旦有 Broker 宕机(本文用宕机代表任何让 Kafka 认为其 Broker die 的情景,包括但不限于机器断电,网络不可用,GC 导致的 Stop The World,进程 crash 等),其在 ZooKeeper 对应的 Znode 会自动被删除,ZooKeeper 会 fire Controller 注册的 Watch,Controller 即可获取最新的幸存的 Broker 列表。
  2. Controller 决定 set_p,该集合包含了宕机的所有 Broker 上的所有 Partition。
  3. 对 set_p 中的每一个 Partition: 3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该 Partition 当前的 ISR。

3.2 决定该 Partition 的新 Leader。如果当前 ISR 中有至少一个 Replica 还幸存,则选择其中一个作为新 Leader,新的 ISR 则包含当前 ISR 中所有幸存的 Replica。否则选择该 Partition 中任意一个幸存的 Replica 作为新的 Leader 以及 ISR(该场景下可能会有潜在的数据丢失)。如果该 Partition 的所有 Replica 都宕机了,则将新的 Leader 设置为 -1。

3.3 将新的 Leader,ISR 和新的leader_epochcontroller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有 Controller 版本在 3.1 至 3.3 的过程中无变化时才会执行,否则跳转到 3.1。
4. 直接通过 RPC 向 set_p 相关的 Broker 发送 LeaderAndISRRequest 命令。Controller 可以在一个 RPC 操作中发送多个命令从而提高效率。 Broker failover 顺序图如下所示。

LeaderAndIsrRequest 结构如下

LeaderAndIsrResponse 结构如下

创建 / 删除 Topic

  1. Controller 在 ZooKeeper 的/brokers/topics节点上注册 Watch,一旦某个 Topic 被创建或删除,则 Controller 会通过 Watch 得到新创建 / 删除的 Topic 的 Partition/Replica 分配。
  2. 对于删除 Topic 操作,Topic 工具会将该 Topic 名字存于/admin/delete_topics。若delete.topic.enable为 true,则 Controller 注册在/admin/delete_topics上的 Watch 被 fire,Controller 通过回调向对应的 Broker 发送 StopReplicaRequest,若为 false 则 Controller 不会在/admin/delete_topics上注册 Watch,也就不会对该事件作出反应。
  3. 对于创建 Topic 操作,Controller 从/brokers/ids读取当前所有可用的 Broker 列表,对于 set_p 中的每一个 Partition: 3.1 从分配给该 Partition 的所有 Replica(称为 AR)中任选一个可用的 Broker 作为新的 Leader,并将 AR 设置为新的 ISR(因为该 Topic 是新创建的,所以 AR 中所有的 Replica 都没有数据,可认为它们都是同步的,也即都在 ISR 中,任意一个 Replica 都可作为 Leader)

3.2 将新的 Leader 和 ISR 写入/brokers/topics/[topic]/partitions/[partition]
4. 直接通过 RPC 向相关的 Broker 发送 LeaderAndISRRequest。 创建 Topic 顺序图如下所示。

Broker 响应请求流程

Broker 通过kafka.network.SocketServer及相关模块接受各种请求并作出响应。整个网络通信模块基于 Java NIO 开发,并采用 Reactor 模式,其中包含 1 个 Acceptor 负责接受客户请求,N 个 Processor 负责读写数据,M 个 Handler 处理业务逻辑。

Acceptor 的主要职责是监听并接受客户端(请求发起方,包括但不限于 Producer,Consumer,Controller,Admin Tool)的连接请求,并建立和客户端的数据传输通道,然后为该客户端指定一个 Processor,至此它对该客户端该次请求的任务就结束了,它可以去响应下一个客户端的连接请求了。其核心代码如下。

Processor 主要负责从客户端读取数据并将响应返回给客户端,它本身并不处理具体的业务逻辑,并且其内部维护了一个队列来保存分配给它的所有 SocketChannel。Processor 的 run 方法会循环从队列中取出新的 SocketChannel 并将其SelectionKey.OP_READ注册到 selector 上,然后循环处理已就绪的读(请求)和写(响应)。Processor 读取完数据后,将其封装成 Request 对象并将其交给 RequestChannel。

RequestChannel 是 Processor 和 KafkaRequestHandler 交换数据的地方,它包含一个队列 requestQueue 用来存放 Processor 加入的 Request,KafkaRequestHandler 会从里面取出 Request 来处理;同时它还包含一个 respondQueue,用来存放 KafkaRequestHandler 处理完 Request 后返还给客户端的 Response。

Processor 会通过 processNewResponses 方法依次将 requestChannel 中 responseQueue 保存的 Response 取出,并将对应的SelectionKey.OP_WRITE事件注册到 selector 上。当 selector 的 select 方法返回时,对检测到的可写通道,调用 write 方法将 Response 返回给客户端。

KafkaRequestHandler 循环从 RequestChannel 中取 Request 并交给kafka.server.KafkaApis处理具体的业务逻辑。

LeaderAndIsrRequest 响应过程

对于收到的 LeaderAndIsrRequest,Broker 主要通过 ReplicaManager 的 becomeLeaderOrFollower 处理,流程如下:

  1. 若请求中 controllerEpoch 小于当前最新的 controllerEpoch,则直接返回 ErrorMapping.StaleControllerEpochCode。
  2. 对于请求中 partitionStateInfos 中的每一个元素,即((topic, partitionId), partitionStateInfo): 2.1 若 partitionStateInfo 中的 leader epoch 大于当前 ReplicManager 中存储的 (topic, partitionId) 对应的 partition 的 leader epoch,则:

2.1.1 若当前 brokerid(或者说 replica id)在 partitionStateInfo 中,则将该 partition 及 partitionStateInfo 存入一个名为 partitionState 的 HashMap 中

2.1.2 否则说明该 Broker 不在该 Partition 分配的 Replica list 中,将该信息记录于 log 中

2.2 否则将相应的 Error code(ErrorMapping.StaleLeaderEpochCode)存入 Response 中
3. 筛选出 partitionState 中 Leader 与当前 Broker ID 相等的所有记录存入 partitionsTobeLeader 中,其它记录存入 partitionsToBeFollower 中。
4. 若 partitionsTobeLeader 不为空,则对其执行 makeLeaders 方。
5. 若 partitionsToBeFollower 不为空,则对其执行 makeFollowers 方法。
6. 若 highwatermak 线程还未启动,则将其启动,并将 hwThreadInitialized 设为 true。
7. 关闭所有 Idle 状态的 Fetcher。

LeaderAndIsrRequest 处理过程如下图所示

Broker 启动过程

Broker 启动后首先根据其 ID 在 ZooKeeper 的/brokers/idszonde 下创建临时子节点( Ephemeral node ),创建成功后 Controller 的 ReplicaStateMachine 注册其上的 Broker Change Watch 会被 fire,从而通过回调 KafkaController.onBrokerStartup 方法完成以下步骤:

  1. 向所有新启动的 Broker 发送 UpdateMetadataRequest,其定义如下。
  2. 将新启动的 Broker 上的所有 Replica 设置为 OnlineReplica 状态,同时这些 Broker 会为这些 Partition 启动 high watermark 线程。
  3. 通过 partitionStateMachine 触发 OnlinePartitionStateChange。

Controller Failover

Controller 也需要 Failover。每个 Broker 都会在 Controller Path (/controller) 上注册一个 Watch。当前 Controller 失败时,对应的 Controller Path 会自动消失(因为它是 Ephemeral Node),此时该 Watch 被 fire,所有“活”着的 Broker 都会去竞选成为新的 Controller(创建新的 Controller Path),但是只会有一个竞选成功(这点由 ZooKeeper 保证)。竞选成功者即为新的 Leader,竞选失败者则重新在新的 Controller Path 上注册 Watch。因为 ZooKeeper 的 Watch 是一次性的,被 fire 一次之后即失效,所以需要重新注册。

Broker 成功竞选为新 Controller 后会触发 KafkaController.onControllerFailover 方法,并在该方法中完成如下操作:

  1. 读取并增加 Controller Epoch。
  2. 在 ReassignedPartitions Patch(/admin/reassign_partitions) 上注册 Watch。
  3. 在 PreferredReplicaElection Path(/admin/preferred_replica_election) 上注册 Watch。
  4. 通过 partitionStateMachine 在 Broker Topics Patch(/brokers/topics) 上注册 Watch。
  5. delete.topic.enable设置为 true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 Watch。
  6. 通过 replicaStateMachine 在 Broker Ids Patch(/brokers/ids) 上注册 Watch。
  7. 初始化 ControllerContext 对象,设置当前所有 Topic,“活”着的 Broker 列表,所有 Partition 的 Leader 及 ISR 等。
  8. 启动 replicaStateMachine 和 partitionStateMachine。
  9. 将 brokerState 状态设置为 RunningAsController。
  10. 将每个 Partition 的 Leadership 信息发送给所有“活”着的 Broker。
  11. auto.leader.rebalance.enable配置为 true(默认值是 true),则启动 partition-rebalance 线程。
  12. delete.topic.enable设置为 true 且 Delete Topic Patch(/admin/delete_topics) 中有值,则删除相应的 Topic。

Partition 重新分配

管理工具发出重新分配 Partition 请求后,会将相应信息写到/admin/reassign_partitions上,而该操作会触发 ReassignedPartitionsIsrChangeListener,从而通过执行回调函数 KafkaController.onPartitionReassignment 来完成以下操作:

  1. 将 ZooKeeper 中的 AR(Current Assigned Replicas)更新为 OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
  2. 强制更新 ZooKeeper 中的 leader epoch,向 AR 中的每个 Replica 发送 LeaderAndIsrRequest。
  3. 将 RAR - OAR 中的 Replica 设置为 NewReplica 状态。
  4. 等待直到 RAR 中所有的 Replica 都与其 Leader 同步。
  5. 将 RAR 中所有的 Replica 都设置为 OnlineReplica 状态。
  6. 将 Cache 中的 AR 设置为 RAR。
  7. 若 Leader 不在 RAR 中,则从 RAR 中重新选举出一个新的 Leader 并发送 LeaderAndIsrRequest。若新的 Leader 不是从 RAR 中选举而出,则还要增加 ZooKeeper 中的 leader epoch。
  8. 将 OAR - RAR 中的所有 Replica 设置为 OfflineReplica 状态,该过程包含两部分。第一,将 ZooKeeper 上 ISR 中的 OAR - RAR 移除并向 Leader 发送 LeaderAndIsrRequest 从而通知这些 Replica 已经从 ISR 中移除;第二,向 OAR - RAR 中的 Replica 发送 StopReplicaRequest 从而停止不再分配给该 Partition 的 Replica。
  9. 将 OAR - RAR 中的所有 Replica 设置为 NonExistentReplica 状态从而将其从磁盘上删除。
  10. 将 ZooKeeper 中的 AR 设置为 RAR。
  11. 删除/admin/reassign_partition

注意:最后一步才将 ZooKeeper 中的 AR 更新,因为这是唯一一个持久存储 AR 的地方,如果 Controller 在这一步之前 crash,新的 Controller 仍然能够继续完成该过程。

以下是 Partition 重新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition 重新分配过程中 ZooKeeper 中的 AR 和 Leader/ISR 路径如下

AR leader/isr Sttep {1,2,3} 1/{1,2,3} (initial state) {1,2,3,4,5,6} 1/{1,2,3} (step 2) {1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4) {1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7) {1,2,3,4,5,6} 4/{4,5,6} (step 8) {4,5,6} 4/{4,5,6} (step 10)### Follower 从 Leader Fetch 数据

Follower 通过向 Leader 发送 FetchRequest 获取消息,FetchRequest 结构如下

从 FetchRequest 的结构可以看出,每个 Fetch 请求都要指定最大等待时间和最小获取字节数,以及由 TopicAndPartition 和 PartitionFetchInfo 构成的 Map。实际上,Follower 从 Leader 数据和 Consumer 从 Broker Fetch 数据,都是通过 FetchRequest 请求完成,所以在 FetchRequest 结构中,其中一个字段是 clientID,并且其默认值是 ConsumerConfig.DefaultClientId。

Leader 收到 Fetch 请求后,Kafka 通过 KafkaApis.handleFetchRequest 响应该请求,响应过程如下:

  1. replicaManager 根据请求读出数据存入 dataRead 中。
  2. 如果该请求来自 Follower 则更新其相应的 LEO(log end offset)以及相应 Partition 的 High Watermark
  3. 根据 dataRead 算出可读消息长度(单位为字节)并存入 bytesReadable 中。
  4. 满足下面 4 个条件中的 1 个,则立即将相应的数据返回
  • Fetch 请求不希望等待,即 fetchRequest.macWait <= 0
  • Fetch 请求不要求一定能取到消息,即 fetchRequest.numPartitions <= 0,也即 requestInfo 为空
  • 有足够的数据可供返回,即 bytesReadable >= fetchRequest.minBytes
  • 读取数据时发生异常
  1. 若不满足以上 4 个条件,FetchRequest 将不会立即返回,并将该请求封装成 DelayedFetch。检查该 DeplayedFetch 是否满足,若满足则返回请求,否则将该请求加入 Watch 列表

Leader 通过以 FetchResponse 的形式将消息返回给 Follower,FetchResponse 结构如下

Replication 工具

Topic Tool

$KAFKA_HOME/bin/kafka-topics.sh,该工具可用于创建、删除、修改、查看某个 Topic,也可用于列出所有 Topic。另外,该工具还可修改某个 Topic 的以下配置。

复制代码
unclean.leader.election.enable
delete.retention.ms
segment.jitter.ms
retention.ms
flush.ms
segment.bytes
flush.messages
segment.ms
retention.bytes
cleanup.policy
segment.index.bytes
min.cleanable.dirty.ratio
max.message.bytes
file.delete.delay.ms
min.insync.replicas
index.interval.bytes

Replica Verification Tool

$KAFKA_HOME/bin/kafka-replica-verification.sh,该工具用来验证所指定的一个或多个 Topic 下每个 Partition 对应的所有 Replica 是否都同步。可通过topic-white-list这一参数指定所需要验证的所有 Topic,支持正则表达式。

Preferred Replica Leader Election Tool

用途

有了 Replication 机制后,每个 Partition 可能有多个备份。某个 Partition 的 Replica 列表叫作 AR(Assigned Replicas),AR 中的第一个 Replica 即为“Preferred Replica”。创建一个新的 Topic 或者给已有 Topic 增加 Partition 时,Kafka 保证 Preferred Replica 被均匀分布到集群中的所有 Broker 上。理想情况下,Preferred Replica 会被选为 Leader。以上两点保证了所有 Partition 的 Leader 被均匀分布到了集群当中,这一点非常重要,因为所有的读写操作都由 Leader 完成,若 Leader 分布过于集中,会造成集群负载不均衡。但是,随着集群的运行,该平衡可能会因为 Broker 的宕机而被打破,该工具就是用来帮助恢复 Leader 分配的平衡。

事实上,每个 Topic 从失败中恢复过来后,它默认会被设置为 Follower 角色,除非某个 Partition 的 Replica 全部宕机,而当前 Broker 是该 Partition 的 AR 中第一个恢复回来的 Replica。因此,某个 Partition 的 Leader(Preferred Replica)宕机并恢复后,它很可能不再是该 Partition 的 Leader,但仍然是 Preferred Replica。

原理

1. 在 ZooKeeper 上创建/admin/preferred_replica_election节点,并存入需要调整 Preferred Replica 的 Partition 信息。

2. Controller 一直 Watch 该节点,一旦该节点被创建,Controller 会收到通知,并获取该内容。

3. Controller 读取 Preferred Replica,如果发现该 Replica 当前并非是 Leader 并且它在该 Partition 的 ISR 中,Controller 向该 Replica 发送 LeaderAndIsrRequest,使该 Replica 成为 Leader。如果该 Replica 当前并非是 Leader,且不在 ISR 中,Controller 为了保证没有数据丢失,并不会将其设置为 Leader。

用法

复制代码
$KAFKA_HOME/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

在包含 8 个 Broker 的 Kafka 集群上,创建 1 个名为 topic1,replication-factor 为 3,Partition 数为 8 的 Topic,使用如下命令查看其 Partition/Replica 分布。

复制代码
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181

查询结果如下图所示,从图中可以看到,Kafka 将所有 Replica 均匀分布到了整个集群,并且 Leader 也均匀分布。

手动停止部分 Broker,topic1 的 Partition/Replica 分布如下图所示。从图中可以看到,由于 Broker 1/2/4 都被停止,Partition 0 的 Leader 由原来的 1 变为 3,Partition 1 的 Leader 由原来的 2 变为 5,Partition 2 的 Leader 由原来的 3 变为 6,Partition 3 的 Leader 由原来的 4 变为 7。

再重新启动 ID 为 1 的 Broker,topic1 的 Partition/Replica 分布如下。可以看到,虽然 Broker 1 已经启动(Partition 0 和 Partition5 的 ISR 中有 1),但是 1 并不是任何一个 Parititon 的 Leader,而 Broker 5/6/7 都是 2 个 Partition 的 Leader,即 Leader 的分布不均衡——一个 Broker 最多是 2 个 Partition 的 Leader,而最少是 0 个 Partition 的 Leader。

运行该工具后,topic1 的 Partition/Replica 分布如下图所示。由图可见,除了 Partition 1 和 Partition 3 由于 Broker 2 和 Broker 4 还未启动,所以其 Leader 不是其 Preferred Repliac 外,其它所有 Partition 的 Leader 都是其 Preferred Replica。同时,与运行该工具前相比,Leader 的分配更均匀——一个 Broker 最多是 2 个 Parittion 的 Leader,最少是 1 个 Partition 的 Leader。

启动 Broker 2 和 Broker 4,Leader 分布与上一步相比并未变化,如下图所示。

再次运行该工具,所有 Partition 的 Leader 都由其 Preferred Replica 承担,Leader 分布更均匀——每个 Broker 承担 1 个 Partition 的 Leader 角色。

除了手动运行该工具使 Leader 分配均匀外,Kafka 还提供了自动平衡 Leader 分配的功能,该功能可通过将auto.leader.rebalance.enable设置为 true 开启,它将周期性检查 Leader 分配是否平衡,若不平衡度超过一定阈值则自动由 Controller 尝试将各 Partition 的 Leader 设置为其 Preferred Replica。检查周期由leader.imbalance.check.interval.seconds指定,不平衡度阈值由leader.imbalance.per.broker.percentage指定。

Kafka Reassign Partitions Tool

用途

该工具的设计目标与 Preferred Replica Leader Election Tool 有些类似,都旨在促进 Kafka 集群的负载均衡。不同的是,Preferred Replica Leader Election 只能在 Partition 的 AR 范围内调整其 Leader,使 Leader 分布均匀,而该工具还可以调整 Partition 的 AR。

Follower 需要从 Leader Fetch 数据以保持与 Leader 同步,所以仅仅保持 Leader 分布的平衡对整个集群的负载均衡来说是不够的。另外,生产环境下,随着负载的增大,可能需要给 Kafka 集群扩容。向 Kafka 集群中增加 Broker 非常简单方便,但是对于已有的 Topic,并不会自动将其 Partition 迁移到新加入的 Broker 上,此时可用该工具达到此目的。某些场景下,实际负载可能远小于最初预期负载,此时可用该工具将分布在整个集群上的 Partition 重装分配到某些机器上,然后可以停止不需要的 Broker 从而实现节约资源的目的。

需要说明的是,该工具不仅可以调整 Partition 的 AR 位置,还可调整其 AR 数量,即改变该 Topic 的 replication factor。

原理

该工具只负责将所需信息存入 ZooKeeper 中相应节点,然后退出,不负责相关的具体操作,所有调整都由 Controller 完成。

1. 在 ZooKeeper 上创建/admin/reassign_partitions节点,并存入目标 Partition 列表及其对应的目标 AR 列表。

2. Controller 注册在/admin/reassign_partitions上的 Watch 被 fire,Controller 获取该列表。

3. 对列表中的所有 Partition,Controller 会做如下操作:

  • 启动RAR - AR中的 Replica,即新分配的 Replica。(RAR = Reassigned Replicas, AR = Assigned Replicas)
  • 等待新的 Replica 与 Leader 同步
  • 如果 Leader 不在 RAR 中,从 RAR 中选出新的 Leader
  • 停止并删除AR - RAR中的 Replica,即不再需要的 Replica
  • 删除/admin/reassign_partitions节点

用法

该工具有三种使用模式

  • generate 模式,给定需要重新分配的 Topic,自动生成 reassign plan(并不执行)
  • execute 模式,根据指定的 reassign plan 重新分配 Partition
  • verify 模式,验证重新分配 Partition 是否成功

下面这个例子将使用该工具将 Topic 的所有 Partition 重新分配到 Broker 4/5/6/7 上,步骤如下:

1. 使用 generate 模式,生成 reassign plan

指定需要重新分配的 Topic ({“topics”:[{“topic”:“topic1”}],“version”:1}),并存入/tmp/topics-to-move.json文件中,然后执行如下命令

复制代码
$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--topics-to-move-json-file /tmp/topics-to-move.json
--broker-list "4,5,6,7" --generate

结果如下图所示

2. 使用 execute 模式,执行 reassign plan

将上一步生成的 reassignment plan 存入/tmp/reassign-plan.json文件中,并执行

复制代码
$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--reassignment-json-file /tmp/reassign-plan.json --execute

此时,ZooKeeper 上/admin/reassign_partitions节点被创建,且其值与/tmp/reassign-plan.json文件的内容一致。

3. 使用 verify 模式,验证 reassign 是否完成

执行 verify 命令

复制代码
$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--reassignment-json-file /tmp/reassign-plan.json --verify

结果如下所示,从图中可看出 topic1 的所有 Partititon 都根据 reassign plan 重新分配成功。

接下来用 Topic Tool 再次验证。

复制代码
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1

结果如下图所示,从图中可看出 topic1 的所有 Partition 都被重新分配到 Broker 4/5/6/7,且每个 Partition 的 AR 与 reassign plan 一致。

需要说明的是,在使用 execute 之前,并不一定要使用 generate 模式自动生成 reassign plan,使用 generate 模式只是为了方便。事实上,某些场景下,generate 模式生成的 reassign plan 并不一定能满足需求,此时用户可以自己设置 reassign plan。

State Change Log Merge Tool

用途

该工具旨在从整个集群的 Broker 上收集状态改变日志,并生成一个集中的格式化的日志以帮助诊断状态改变相关的故障。每个 Broker 都会将其收到的状态改变相关的的指令存于名为state-change.log的日志文件中。某些情况下,Partition 的 Leader election 可能会出现问题,此时我们需要对整个集群的状态改变有个全局的了解从而诊断故障并解决问题。该工具将集群中相关的state-change.log日志按时间顺序合并,同时支持用户输入时间范围和目标 Topic 及 Partition 作为过滤条件,最终将格式化的结果输出。

用法

复制代码
bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
--logs /opt/kafka_2.11-0.8.2.1/logs/state-change.log
--topic topic1 --partitions 0,1,2,3,4,5,6,7

作者简介

郭俊(Jason),硕士,从事大数据平台研发工作,精通 Kafka 等分布式消息系统,Storm 等流式处理系统及数据库性能调优。

新浪微博:郭俊_Jason

微信: habren

公众号:大数据架构

个人博客: http://www.jasongj.com

下篇预告

下篇文章将详细介绍 Kafka High Level Consumer,Consumer Group,Consumer Group Rebalance 和 Simple Consumer,以及未来新版本中对 Kafka High Level Consumer 的重新设计——使用 Consumer Controller 解决 Split Brain 和 Herd 等问题。


感谢郭蕾对本文的策划和审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入 InfoQ 读者交流群)。

2015-06-11 00:0325873

评论

发布
暂无评论
发现更多内容

教你如何使用Zig实现Cmpp协议

华为云开发者联盟

云计算 华为云 华为云开发者联盟 华为云短信服务 企业号2024年4月PK榜

云服务器干嘛的?带你掌握云计算的优势

一只扑棱蛾子

云服务器

【荣誉】第七在线出席ToB商业头条行业大会 斩获创新力产品奖

第七在线

去哪儿完成鸿蒙原生应用Beta版本开发,带来一站式在线旅行体验

最新动态

一文读懂模块化赛道新的头部公链Meta Earth

大瞿科技

行云防水堡-打造企业数据安全新防线

行云管家

网络安全 数据安全 防水堡

提升团队工程交付能力,从“看见”工程活动和研发模式开始

阿里云云效

阿里云 云原生 云效

天翼云超大规模高性能云基础底座、“息壤”获国资委权威认可!

编程猫

我们是如何测试人工智能的(六)推荐系统拆解

测试人

人工智能 软件测试 自动化测试 测试开发

解锁ETLCloud中Kettle的用法

RestCloud

kettle 数据同步 ETL 数据集成

做跨境电商,为什么要建独立站

Noah

人工智能,应该如何测试?(四)模型全生命周期流程与测试图

霍格沃兹测试开发学社

人工智能,应该如何测试?(八)企业级智能客服测试大模型 RAG

霍格沃兹测试开发学社

一文读懂模块化赛道新的头部公链Meta Earth

加密眼界

人工智能,应该如何测试?(六)推荐系统拆解

霍格沃兹测试开发学社

提升团队工程交付能力,从“看见”工程活动和研发模式开始

阿里巴巴云原生

阿里云 云原生 云效

企业级依赖管理: 深入解读 Maven BOM

LightGao

maven 设计模式 架构设计 软件系统 java 架构

企业智能体(Agent)来袭!拥有“无限可能”的数智化AI底座

行云创新

AI 智能体 agent

日志系统:一条SQL更新语句是如何执行的?

TimeFriends

为什么中小企业普遍选择IT运维外包了?

Ogcloud

IT运维 IT外包 IT外包公司 IT外包服务 IT运维外包

KaiwuDB 成功入选《2023 ToB 行业影响力价值榜 · 创新力产品榜》

KaiwuDB

数据库

广州等级保护测评公司一览表2024

行云管家

等保 堡垒机 等级保护 等保测评

2024年LED显示屏租赁屏市场

Dylan

商业 LED显示屏 全彩LED显示屏 led显示屏厂家 舞台表演

人工智能,应该如何测试?(七)大模型客服系统测试

霍格沃兹测试开发学社

DACI决策框架,给低效能企业一个机会

填空时光

项目管理 效能提升 效能工具 决策管理

亚马逊店铺引流:海外云手机的利用方法

Ogcloud

云手机 海外云手机 云手机海外版 国外云手机 美国云手机

思考-使用JSON结构映射业务数据与数据库表结构

alexgaoyh

json 数据库 系统设计 映射

BSN-DID研究--主题二:发证方函数

BSN研习社

区块链 BSN did

5个为什么要做外贸网站推广的理由

九凌网络

IT外包服务助推企业产业融通

Ogcloud

IT IT外包 IT外包公司 IT外包服务 IT外包企业

Sam Altman 联手苹果前首席设计官打造 AI 设备;特斯拉将推出无人驾驶出租车丨 RTE 开发者日报 Vol.178

声网

Kafka设计解析(三):Kafka High Availability (下)_语言 & 开发_郭俊_InfoQ精选文章