阿里云「飞天发布时刻」2024来啦!新产品、新特性、新能力、新方案,等你来探~ 了解详情
写点什么

Apache Kafka:下一代分布式消息系统

  • 2014-06-11
  • 本文字数:6236 字

    阅读完需:约 20 分钟

简介

Apache Kafka 是分布式发布 - 订阅消息系统。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka 是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

Apache Kafka 与传统消息系统相比,有以下不同:

  • 它被设计为一个分布式系统,易于向外扩展;
  • 它同时为发布和订阅提供高吞吐量;
  • 它支持多订阅者,当失败时能自动平衡消费者;
  • 它将消息持久化到磁盘,因此可用于批量消费,例如 ETL ,以及实时应用程序。

本文我将重点介绍 Apache Kafka 的架构、特性和特点,帮助我们理解 Kafka 为何比传统消息服务更好。

我将比较 Kafak 和传统消息服务 RabbitMQ 、Apache ActiveMQ 的特点,讨论一些 Kafka 优于传统消息服务的场景。在最后一节,我们将探讨一个进行中的示例应用,展示 Kafka 作为消息服务器的用途。这个示例应用的完整源代码在 GitHub 。关于它的详细讨论在本文的最后一节。

架构

首先,我介绍一下 Kafka 的基本概念。它的架构包括以下组件:

  • 话题(Topic)是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。
  • 生产者(Producer)是能够发布消息到话题的任何对象。
  • 已发布的消息保存在一组服务器中,它们被称为代理(Broker)或 Kafka 集群
  • 消费者可以订阅一个或多个话题,并从 Broker 拉数据,从而消费这些已发布的消息。

图 1:Kafka 生产者、消费者和代理环境

生产者可以选择自己喜欢的序列化方法对消息内容编码。为了提高效率,生产者可以在一个发布请求中发送一组消息。下面的代码演示了如何创建生产者并发送消息。

生产者示例代码:

复制代码
producer = new Producer(…);
message = new Message(“test message str”.getBytes());
set = new MessageSet(message);
producer.send(“topic1”, set);

为了订阅话题,消费者首先为话题创建一个或多个消息流。发布到该话题的消息将被均衡地分发到这些流。每个消息流为不断产生的消息提供了迭代接口。然后消费者迭代流中的每一条消息,处理消息的有效负载。与传统迭代器不同,消息流迭代器永不停止。如果当前没有消息,迭代器将阻塞,直到有新的消息发布到该话题。Kafka 同时支持点到点分发模型(Point-to-point delivery model),即多个消费者共同消费队列中某个消息的单个副本,以及发布 - 订阅模型(Publish-subscribe model),即多个消费者接收自己的消息副本。下面的代码演示了消费者如何使用消息。

消费者示例代码:

复制代码
streams[] = Consumer.createMessageStreams(“topic1”, 1)
for (message : streams[0]) {
bytes = message.payload();
// do something with the bytes
}

Kafka 的整体架构如图 2 所示。因为 Kafka 内在就是分布式的,一个 Kafka 集群通常包括多个代理。为了均衡负载,将话题分成多个分区,每个代理存储一或多个分区。多个生产者和消费者能够同时生产和获取消息。

图 2:Kafka 架构

Kafka 存储

Kafka 的存储布局非常简单。话题的每个分区对应一个逻辑日志。物理上,一个日志为相同大小的一组分段文件。每次生产者发布消息到一个分区,代理就将消息追加到最后一个段文件中。当发布的消息数量达到设定值或者经过一定的时间后,段文件真正写入磁盘中。写入完成后,消息公开给消费者。

与传统的消息系统不同,Kafka 系统中存储的消息没有明确的消息 Id。

消息通过日志中的逻辑偏移量来公开。这样就避免了维护配套密集寻址,用于映射消息 ID 到实际消息地址的随机存取索引结构的开销。消息 ID 是增量的,但不连续。要计算下一消息的 ID,可以在其逻辑偏移的基础上加上当前消息的长度。

消费者始终从特定分区顺序地获取消息,如果消费者知道特定消息的偏移量,也就说明消费者已经消费了之前的所有消息。消费者向代理发出异步拉请求,准备字节缓冲区用于消费。每个异步拉请求都包含要消费的消息偏移量。Kafka 利用 sendfile API 高效地从代理的日志段文件中分发字节给消费者。

图 3:Kafka 存储架构

Kafka 代理

与其它消息系统不同,Kafka 代理是无状态的。这意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护,代理完全不管。这种设计非常微妙,它本身包含了创新。

  • 从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka 创新性地解决了这个问题,它将一个简单的基于时间的 SLA 应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除。
  • 这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。

ZooKeeper 与 Kafka

考虑一下有多个服务器的分布式系统,每台服务器都负责保存数据,在数据上执行操作。这样的潜在例子包括分布式搜索引擎、分布式构建系统或者已知的系统如 Apache Hadoop 。所有这些分布式系统的一个常见问题是,你如何在任一时间点确定哪些服务器活着并且在工作中。最重要的是,当面对这些分布式计算的难题,例如网络失败、带宽限制、可变延迟连接、安全问题以及任何网络环境,甚至跨多个数据中心时可能发生的错误时,你如何可靠地做这些事。这些正是 Apache ZooKeeper 所关注的问题,它是一个快速、高可用、容错、分布式的协调服务。你可以使用 ZooKeeper 构建可靠的、分布式的数据结构,用于群组成员、领导人选举、协同工作流和配置服务,以及广义的分布式数据结构如锁、队列、屏障(Barrier)和锁存器(Latch)。许多知名且成功的项目依赖于 ZooKeeper,其中包括 HBase、Hadoop 2.0、Solr Cloud、Neo4J、 Apache Blur (Incubating)和 Accumulo。

ZooKeeper 是一个分布式的、分层级的文件系统,能促进客户端间的松耦合,并提供最终一致的,类似于传统文件系统中文件和目录的 Znode 视图。它提供了基本的操作,例如创建、删除和检查 Znode 是否存在。它提供了事件驱动模型,客户端能观察特定 Znode 的变化,例如现有 Znode 增加了一个新的子节点。ZooKeeper 运行多个 ZooKeeper 服务器,称为Ensemble,以获得高可用性。每个服务器都持有分布式文件系统的内存复本,为客户端的读取请求提供服务。

图 4:ZooKeeper Ensemble 架构

上图 4 展示了典型的 ZooKeeper ensemble,一台服务器作为 Leader,其它作为 Follower。当 Ensemble 启动时,先选出 Leader,然后所有 Follower 复制 Leader 的状态。所有写请求都通过 Leader 路由,变更会广播给所有 Follower。变更广播被称为原子广播

Kafka 中 ZooKeeper 的用途:正如 ZooKeeper 用于分布式系统的协调和促进,Kafka 使用 ZooKeeper 也是基于相同的原因。ZooKeeper 用于管理、协调 Kafka 代理。每个 Kafka 代理都通过 ZooKeeper 协调其它 Kafka 代理。当 Kafka 系统中新增了代理或者某个代理故障失效时,ZooKeeper 服务将通知生产者和消费者。生产者和消费者据此开始与其它代理协调工作。Kafka 整体系统架构如图 5 所示。

图 5:Kafka 分布式系统的总体架构

Apache Kafka 对比其它消息服务

让我们了解一下使用 Apache Kafka 的两个项目,以对比其它消息服务。这两个项目分别是 LinkedIn 和我的项目:

LinkedIn 的研究

LinkedIn 团队做了个实验研究,对比Kafka 与Apache ActiveMQ V5.4 和RabbitMQ V2.4 的性能。他们使用ActiveMQ 默认的消息持久化库 Kahadb 。LinkedIn 在两台 Linux 机器上运行他们的实验,每台机器的配置为 8 核 2GHz、16GB 内存,6 个磁盘使用 RAID10。两台机器通过 1GB 网络连接。一台机器作为代理,另一台作为生产者或者消费者。

生产者测试

LinkedIn 团队在所有系统中配置代理,异步将消息刷入其持久化库。对每个系统,运行一个生产者,总共发布 1000 万条消息,每条消息 200 字节。Kafka 生产者以 1 和 50 批量方式发送消息。ActiveMQ 和 RabbitMQ 似乎没有简单的办法来批量发送消息,LinkedIn 假定它的批量值为 1。结果如下面的图 6 所示:

图 6:LinkedIn 的生产者性能实验结果

Kafka 性能要好很多的主要原因包括:

  • Kafka 不等待代理的确认,以代理能处理的最快速度发送消息。
  • Kafka 有更高效的存储格式。平均而言,Kafka 每条消息有 9 字节的开销,而 ActiveMQ 有 144 字节。其原因是 JMS 所需的沉重消息头,以及维护各种索引结构的开销。LinkedIn 注意到 ActiveMQ 一个最忙的线程大部分时间都在存取 B-Tree 以维护消息元数据和状态。

消费者测试

为了做消费者测试,LinkedIn 使用一个消费者获取总共 1000 万条消息。LinkedIn 让所有系统每次拉请求都预获取大约相同数量的数据,最多 1000 条消息或者 200KB。对 ActiveMQ 和 RabbitMQ,LinkedIn 设置消费者确认模型为自动。结果如图 7 所示。

图 7:LinkedIn 的消费者性能实验结果

Kafka 性能要好很多的主要原因包括:

  • Kafka 有更高效的存储格式;在 Kafka 中,从代理传输到消费者的字节更少。
  • ActiveMQ 和 RabbitMQ 两个容器中的代理必须维护每个消息的传输状态。LinkedIn 团队注意到其中一个 ActiveMQ 线程在测试过程中,一直在将 KahaDB 页写入磁盘。与此相反,Kafka 代理没有磁盘写入动作。最后,Kafka 通过使用 sendfile API 降低了传输开销。

目前,我正在工作的一个项目提供实时服务,从消息中快速并准确地提取场外交易市场(OTC)定价内容。这是一个非常重要的项目,处理近 25 种资产类别的财务信息,包括债券、贷款和 ABS(资产担保证券)。项目的原始信息来源涵盖了欧洲、北美、加拿大和拉丁美洲的主要金融市场领域。下面是这个项目的一些统计,说明了解决方案中包括高效的分布式消息服务是多么重要:

  • 每天处理的消息数量超过1,300,000
  • 每天解析的 OTC 价格数量超过12,000,000
  • 支持超过 25 种资产类别;
  • 每天解析的独立票据超过70,000

消息包含 PDF、Word 文档、Excel 及其它格式。OTC 定价也可能要从附件中提取。

由于传统消息服务器的性能限制,当处理大附件时,消息队列变得非常大,我们的项目面临严重的问题,JMSqueue 一天需要启动 2-3 次。重启 JMS 队列可能丢失队列中的全部消息。项目需要一个框架,不论解析器(消费者)的行为如何,都能够保住消息。Kafka 的特性非常适用于我们项目的需求。

当前项目具备的特性:

  1. 使用 Fetchmail 获取远程邮件消息,然后由 Procmail 过滤并处理,例如单独分发基于附件的消息。
  2. 每条消息从单独的文件获取,该文件被处理(读取和删除)为一条消息插入到消息服务器中。
  3. 消息内容从消息服务队列中获取,用于解析和提取信息。

示例应用

这个示例应用是基于我在项目中使用的原始应用修改后的版本。我已经删除日志的使用和多线程特性,使示例应用的工件尽量简单。示例应用的目的是展示如何使用 Kafka 生产者和消费者的 API。应用包括一个生产者示例(简单的生产者代码,演示Kafka 生产者API 用法并发布特定话题的消息),消费者示例(简单的消费者代码,用于演示Kafka 消费者API 的用法)以及消息内容生成 API(在特定路径下生成消息内容到文件的 API)。下图展示了各组件以及它们与系统中其它组件间的关系。

图 8:示例应用组件架构

示例应用的结构与 Kafka 源代码中的例子程序相似。应用的源代码包含 Java 源程序文件夹‘src’和’config’文件夹,后者包括几个配置文件和一些 Shell 脚本,用于执行示例应用。要运行示例应用,请参照 ReadMe.md 文件或 GitHub 网站 Wiki 页面的说明。

程序构建可以使用 Apache Maven ,定制也很容易。如果有人想修改或定制示例应用的代码,有几个 Kafka 构建脚本已经过修改,可用于重新构建示例应用代码。关于如何定制示例应用的详细描述已经放在项目 GitHub 的 Wiki 页面

现在,让我们看看示例应用的核心工件。

Kafka 生产者代码示例

复制代码
/**
* Instantiates a new Kafka producer.
*
* @param topic the topic
* @param directoryPath the directory path
*/
public KafkaMailProducer(String topic, String directoryPath) {
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
this.directoryPath = directoryPath;
}
public void run() {
Path dir = Paths.get(directoryPath);
try {
new WatchDir(dir).start();
new ReadDir(dir).start();
} catch (IOException e) {
e.printStackTrace();
}
}

上面的代码片断展示了 Kafka 生产者 API 的基本用法,例如设置生产者的属性,包括发布哪个话题的消息,可以使用哪个序列化类以及代理的相关信息。这个类的基本功能是从邮件目录读取邮件消息文件,然后作为消息发布到 Kafka 代理。目录通过 java.nio.WatchService 类监视,一旦新的邮件消息 Dump 到该目录,就会被立即读取并作为消息发布到 Kafka 代理。

Kafka 消费者代码示例

复制代码
public KafkaMailConsumer(String topic) {
consumer =
Kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
/**
* Creates the consumer config.
*
* @return the consumer config
*/
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaMailProperties.zkConnect);
props.put("group.id", KafkaMailProperties.groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext())
System.out.println(new String(it.next().message()));
}

上面的代码演示了基本的消费者 API。正如我们前面提到的,消费者需要设置消费的消息流。在 Run 方法中,我们进行了设置,并在控制台打印收到的消息。在我的项目中,我们将其输入到解析系统以提取 OTC 定价。

在当前的质量保证系统中,我们使用 Kafka 作为消息服务器用于概念验证(Proof of Concept,POC)项目,它的整体性能优于 JMS 消息服务。其中一个我们感到非常兴奋的特性是消息的再消费(re-consumption),这让我们的解析系统可以按照业务需求重新解析某些消息。基于 Kafka 这些很好的效果,我们正计划使用它,而不是用 Nagios 系统,去做日志聚合与分析。

总结

Kafka 是一种处理大量数据的新型系统。Kafka 基于拉的消费模型让消费者以自己的速度处理消息。如果处理消息时出现了异常,消费者始终可以选择再消费该消息。

关于作者

Abhishek Sharma是金融领域产品的自然语言处理(NLP)、机器学习和解析程序员。他为多个公司提供算法设计和解析开发。Abhishek 的兴趣包括分布式系统、自然语言处理和使用机器算法进行大数据分析。

关注 IT 趋势,承载前沿、深入、有温度的内容。感兴趣的读者可以搜索 ID:laocuixiabian,或者扫描下方二维码加关注。

查看英文原文: Apache Kafka: Next Generation Distributed Messaging System

公众号推荐:

跳进 AI 的奇妙世界,一起探索未来工作的新风貌!想要深入了解 AI 如何成为产业创新的新引擎?好奇哪些城市正成为 AI 人才的新磁场?《中国生成式 AI 开发者洞察 2024》由 InfoQ 研究中心精心打造,为你深度解锁生成式 AI 领域的最新开发者动态。无论你是资深研发者,还是对生成式 AI 充满好奇的新手,这份报告都是你不可错过的知识宝典。欢迎大家扫码关注「AI前线」公众号,回复「开发者洞察」领取。

2014-06-11 11:25130382

评论 1 条评论

发布
暂无评论
发现更多内容

第三代分布式数据库来了,真香!

OceanBase 数据库

数据库 开源 分布式 oceanbase

双11“剁手”后,你的包裹收到了吗?

OceanBase 数据库

数据库 开源 分布式 oceanbase

深入浅出,一文吃透mysql索引

微客鸟窝

MySQL 11月日更

[Pulsar] ChunkMessageID介绍及其原理

Zike Yang

Apache Pulsar 11月日更

Go 的 Panics 处理

baiyutang

golang 11月日更

dart系列之:dart中的异步编程

程序那些事

flutter dart 程序那些事 11月日更

推动数字化转型,OceanBase 助力保险企业创造新价值

OceanBase 数据库

数据库 开源 技术宅 oceanbase 分布式,

我理解的CPI和PPI

石云升

学习笔记 11月日更

家庭太阳能发电,何时能告别“两极分化”?

脑极体

.NET6新东西--Lambda优化

喵叔

11月日更

Prometheus 都可以采集那些指标?-- 常用 Exporter 合集

耳东@Erdong

内容合集

【Flutter 专题】03 图解第一个程序 Hello World!

阿策小和尚

Flutter 小菜 0 基础学习 Flutter Android 小菜鸟 11月日更

【Three.js】随着元宇宙开启WEB3D之路

devpoint

JavaScript WebGL 3D three.js 11月日更

共建全栈国产化解决方案 | OceanBase持续与合作伙伴推进测试互认

OceanBase 数据库

数据库 分布式 双十一 oceanbase

给面试官上一课:HTTPS是先进行TCP三次握手,再进行TLS四次握手

热爱java的分享家

Java 架构 程序人生 编程语言 经验分享

黑客常备20个工具,你知道几个?

喀拉峻

FFmpeg[5] - 将视频文件转码成MP4格式(FFmpeg转封装2)

liuzhen007

11月日更

【死磕Java并发】-----Java内存模型之总结

chenssy

11月日更 死磕 Java 死磕 Java 并发

前端开发:VS Code编辑器新建Vue文件自定义模板的方法

三掌柜

11月日更

Android C++系列:Linux文件系统(二)

轻口味

c++ android jni 11月日更

惨遭GitHub直接封杀的阿里P8手敲出来这份565页凤凰架构分布式手册,有何神奇之处

热爱java的分享家

Java 面试 程序人生 编程语言 凤凰架构

“敏捷版”全链路压测

阿里巴巴云原生

阿里云 云原生 全链路压测 PTS

Python Qt GUI设计:QLabel标签类(基础篇—11)

不脱发的程序猿

Python PyQt GUI设计 QLabel标签类

参与tdengine开源的方式

williamcai

fork git 学习

阿里大规模业务混部下的全链路资源隔离技术演进

阿里巴巴云原生

阿里云 云原生 资源隔离 ACK 混部

两个排序数组的中位数,“最”有技术含量的解法!

老表

Python 算法 LeetCode 11月日更

千万级学生管理系统的考试试卷存储方案设计

Beyond Ryan

度小满启动“小微加油站”,让低息服务可持续

脑极体

MySQL事务的实现原理之Undo Log的分析

卢卡多多

Undo Log 11月日更

🍃【Spring专题】「实战系列」重新回顾一下异常重试框架Spring Retry的功能指南

洛神灬殇

spring 11月日更 Spring retry Guava retry

Prometheus Exporter (三)容器信息监控

耳东@Erdong

container Prometheus exporter 11月日更 cAdvisor

Apache Kafka:下一代分布式消息系统_语言 & 开发_Abhishek Sharma_InfoQ精选文章