【ArchSummit架构师峰会】探讨数据与人工智能相互驱动的关系>>> 了解详情
写点什么

基于 Kafka 实现分布式事件驱动

  • 2019-02-20
  • 本文字数:6042 字

    阅读完需:约 20 分钟

基于Kafka实现分布式事件驱动

事件驱动是一种灵活的系统设计方法,在事件驱动的系统中,当数据发生变化时系统会产生、发布一个对应的事件,其它对这个事件感兴趣的部分会接收到通知,并进行相应的处理。事件驱动设计最大的好处在我看来有两点:一是它为系统提供了很好的扩展能力,比如我们可以对某类事件增加一个订阅者来对系统进行扩展,最主要的是我们并不需要修改任何已有的代码,它完全符合开闭原则;二是它实现了模块间的低偶合,系统间各个部分不是强依赖关系,而是通过事件把整个系统串联起来。


当然,任何事务都有两面性,事件驱动也有其不好的方面。首先,实现一套这样的系统复杂度就很高,对开发人员的要求也很高;再次,对系统的整体把控会很困难,想象一下面对几百个类别的事件,并且没有一个统一的地方可以让我们看到整个业务处理流程,会是什么心情?所以当我们决定采用事件驱动实现系统中,一定要维护好相关的文档,并保持它们的有效性。


我们再来看看事件驱动架构的一些其它的优点:


  • 更好的响应性

  • 事件驱动中,事件的响应是异步处理的,所以它具有更好的响应性。

  • 更好的容错性

  • 业务主流程在发布事件之后便结束了,扩展流程的延后处理可以异步不断的失败重试,直到成功为止,系统整体容错性更强。

设计篇

首先,我们需要定义什么是事件?从业务角度看,事件包括以下属性:


事件的定义
属性字段类型说明
标识IDstring系统内部每个事件都需要一个唯一的标识。
类型eventTypestring数据发生变化产生事件,不同类型的数据变化产生不同类型的事件。比如会员下单、会员注册、用户修改手机号等等。
时间eventTimedatetime即数据发生变化的时间。
上下文contextstring事件发生时的上下文信息。比如会员修改手机号事件,需要原号码和新号码,会员ID等信息。


接下来,我们看看如何设计一套基于事件驱动的系统,你知道设计模式中的观察者模式吗?


观察者模式:定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知所有观察者对象,使它们能够自动更新自己。


观察者模式天生就是事件驱动的一个实现,但是直接使用它有很多的弊端。首先,它是基于主题的,有多少类事件就需要多少个主题类,这可能会导致类爆炸;其次,观察者模式是同步实现的,这样我们可能会牺牲掉响应性和容错性等优势。


所以我们需要对观察者模式稍作改进,我们分别从事件发发布和消费两个方面来分析。

事件的发布

本文的标题是《基于 Kafka 实现事件驱动架构》,很明显,我们使用 kafka 作为消息中间件来传递事件消息。所以,像修改会员手机号码的代码可能实现如下:


@Transactional(readOnly = false, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)@Overridepublic void changePhoneNumber(String newNumber) {    userDao.updatePhone(this.getUserId(), newNumber); // 本地数据库修改
// 发布 用户手机号码变更 事件 Event event = new Event(...); // 创建一个事件对象,表示用户修改手机号码 ProducerRecord record = new ProducerRecord(...event); // 根据event生成kakfa record
Future<RecordMetadata> f = kafkaProducer.send(record); try { f.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); }}
复制代码


这段代码正确吗?从逻辑上看,它完全正确。但从可靠性角度看它是有问题的。Kafka 和数据库是两个异构系统,我们不能仅仅通过一个本地事务保证他们之间的数据一致性。例如,推送 Kafka 成功了,但是在提交 DB 事务的时候失败了呢(比如说事务超时滚)?这样 kafka 中就会存在一个脏数据,因为本地数据库事务已经回滚了。


分布式系统数据一致性一直就是复杂的问题,常用的方案有两阶段提交、三阶段提交、zookeeper 的 zab 协议、proxs、raft 等算法,这不是本文的重点。我们采用一个简单易懂的方式来解决上面的问题。我们引入一张 DB 事件表,在发布事件时将事件信息存入这个事件表,将事件的发布和业务处理包装在同一个本地事务中。


create table if not exists `event_queue` (  `id`          bigint      not null  auto_increment comment '主键',  `event_id`    char(32)    not null comment '事件ID',  `event_type`  char(12)    not null comment '事件类型',  `event_time`  datetime    not null comment '事件发生时间',  `context`     mediumtext  not null comment '事件内容',  primary key (`id`),  unique key(`event_id`)) engine=innodb default charset=utf8 comment='事件队列表';
复制代码


发布事件,就是向这个事件表中增加一条记录,修改会员手机号码的代码现在变成了:


@Transactional(readOnly = false, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)@Overridepublic void changePhoneNumber(String newNumber) {    userDao.updatePhone(this.getUserId(), newNumber); // 本地数据库修改
// 发布 用户手机号码变更 事件 Event event = new Event(...); // 创建一个事件对象,表示用户修改手机号码 eventDao.insert(event); // 向事件表创建一条新记录。}
复制代码


由于事件消息现在被暂存进了 DB,我们还需要将它取出来推到 Kafka,为此我们需要起一个线程,不断的读取事件表中的记录发送给 Kafka,并在成功发送之后将记录从 DB 中删除。如果删除 DB 的时候失败了,那么消息会被重新推送到 kafka,意为着我们实现的是 At least once 的递交语义,对于业务上不接受重复的场景,在消费端需要做好幂等处理。


讲到这里,关于事件的分布已经接近尾声,但还有一个问题:性能。如果一个系统的负载很高,一秒内产生成千上万个事件,那我们的事件表就会成为瓶颈,因为只用了一个线程来处理事件表向 Kafka 的推送,集群中只有一个实例能发辉作用,无法实现弹性。为了解决这个问题,我们可以对事件表进行分表,并使用多线程并发处理,而且这些线程可以分布在不同的集群实例中。但这样使设计变得更复杂了,现在我们需要解决一个新的问题:如何保证一个事件表,最多只被一个线程处理?我们需要保证一个事件表同一时刻只能被一个线程处理,同时在实例宕机后,其它实例可以起线程接替它的工作。这句话我们换一种方式来描述更容易理解:


  1. 集群有 M 个实例,需要进行 N 个任务(任务是把事件分表中的事件信息推送到 kafka)

  2. 一个任务最多可以分配给 1 个实例,1 个实例可以同时执行多个任务。

  3. 如果一个实例宕机了,分配给它的任务需要重新在其它实例上分配。

  4. N 个任务固定不变,实例可以动态增加或减少,需要实现实例之间的均衡负载。


如果你熟悉像 HBase、ES 这类分布式系统的话,不难理解我们需要在集群中选出一个实例作为 Master,由它来负责任务在集群中的分配工作。我们借助 Zookeeper,所有实例在启动时创建一个 EPHEMERAL 类型的 master 节点,创建成功的实例成为 Master,其它实例则监听 master 节点,当 Master 实例宕机后重新竞选。



每个实例启动后,会在 workers 节点下创建一个临时节点,表示自己作为一个 Worker 加入集群;Worker 同时会监听自己创建的子节点,接收由 Master 分配给自己的任务。Master 会监听 workers 下子节点的变化,当实例下线或有新的实例加入集群中时,Master 会收到通知并重新进行任务的分配。分配的具体信息保存在 Worker 实例创建的子节点中,Master 通过直接修改这些子节点的内容实现分配。


从事件的发布来看,系统的架构是这样的:



这里有个细节需要说明:因为 Kafka 只保证 partition 级别的有序性,我们的事件分表数必须大于或等于 partition 的数量,否则事件的顺序得不到保证

事件的消费

因为我们使用了 Kafka 作为事件消息中间件,事件的消费简单很多。每个实例在启动时启一个 Kafka Consumer 即可,像实例间的负载、可用性、故障转移等等问题,Kafka 已经帮我们解决了,我们只需要从 Kafka 中获取事件消息,并通知相应的订阅者即可。



订阅者需要实现BaseSubscriber接口,另外在启动时,需要把事件与订阅者的关系维护在SubscriberConfig类中:


BaseSubscriber sub = ... // your implementationSubscriberConfig.instance().addSubscriber("event_type", sub);
复制代码


系统整体的设计是面向扩展的,我们可以通过调整集群应用实例数、事件表分表数量和 kafka partitions 数量来提高系统整体的吞吐量。事件表分表越多,事件消息从 DB 到 kafka 的延迟就更低;应用实例越多,系统单位时间内能承受的事件上限也越多,另外也能更好的负载 kafka 消息的消费。


每一个应用,作为事件发布者,其产生的事件最终都被推送到一个 Kafka Topic;但作为消费者,可以订阅不同的 Topic,这些 Topic 可以是自己的推送的,也可以是其它应用推送的事件。

实现篇

附上完整源码地址https://github.com/OuYangLiang/libevent,目前只支持了分表,还不支持分库。


这里我们只对部分核心代码作一个简单的介绍:


SimpleLock是一个基于 Zookeeper 的简单分布式锁实现,可以参考这里,我们使用SimpleLock来实现 Master 的竞选。


EventSubmitter是一个线程,负责把事件表中的事件信息推送到 Kafka broker。初始化时需要传入一个 int 参数,表示处理哪一个事件分表。它被实现成一个响应中断的线程,因为当 Master 重新分配任务后,Worker 需要先停掉当前进行中的任务。


Master类是 Master 实例的主要实现。实例在启动时会调Master类的start方法,Master 实例监听 workers 节点,当有新实例加入或实例下线时,Master 实例会调用onWorkerChange方法进行重新分配,onWorkerChange方法实现了一个简单的分配算法,只有任务变更的 Worker 实例会收到分配通知。


Worker类是 Worker 实例的主要实现,实例在启动时会调Worker类的start方法。集群中的每一个实例都是 Worker,会在 workers 节点下创建一个临时的节点表示自己,同时监听该节点,接受 Master 分配给自己的任务。当 Worker 接收到分配通知时,会先停止当前在运行的所有任务,再根据 worker 节点的内容开始执行新分配的任务。

示例

来看一个具体的事例,假设我们要以天为维度,统计每天的下单量和下单金额。现在,我们已经有了订单表:


create table if not exists `order` (  `order_id`        bigint      not null  auto_increment comment '主键',  `user_id`         bigint      not null comment '客户id',  `order_time`      datetime    not null comment '订单时间',  `order_amount`    int         not null comment '订单金额,单位:分',  primary key (`order_id`)) engine=innodb default charset=utf8 comment='订单表';
复制代码


这个需求我们可以简单的使用 sql 来做,比如:


select date(order_time) as day, count(*) as total_num, sum(order_amount) as total_amount from `order`group by date(order_time)
复制代码


但是在生产环境中这么做往往不现实,比如性能问题、或者我们对订单表做了分表、或者几个月前的数据库了备份,而你正好需要查询这些数据,等等。实现这个需求更好的方式是采用事件驱动,在下单的时候发布一个事件,然后异步的维护一个查询表,这样之间的种种问题都将不复存在。先创建一个查询表,如下:


create table if not exists `daily_order_report` (  `id`              bigint      not null  auto_increment comment '主键',  `day`             date      not null comment '统计日',  `order_num`       bigint      not null comment '订单数量',  `order_total`     bigint      not null comment '订单总金额,单位:分',  primary key (`id`),  unique key(`day`)) engine=innodb default charset=utf8 comment='订单日报表';
复制代码


在下单的时候,我们需要发布一个下单事件


@Transactional(readOnly = false, propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)@Overridepublic void createOrder(Order order) {    orderDao.insert(order);
//发布下单事件 publisher.publish("order_created", new Date(), order.json(), order.getUserId().intValue() % Configuration.instance().getNumOfEventTables());}
复制代码


之后,我们需要实现一个订阅者,在接收到下单事件后,根据订单的日期做相应的统计:


@Componentpublic class DailyOrderReportSubscriber implements BaseSubscriber {
@Autowired private OrderRepos repos;
@Override public void onEvent(Event e) { Order order = Order.fromJson(e.getContext()); DailyOrderReport report = repos.selectDailyOrderReportByKey(new java.sql.Date(order.getOrderTime().getTime()));
if (null == report) { report = new DailyOrderReport(); report.setDay(new java.sql.Date(order.getOrderTime().getTime())); report.setOrderNum(1l); report.setOrderTotal(new Long(order.getOrderAmount()));
repos.createDailyOrderReport(report); } else { report.setOrderNum(report.getOrderNum() + 1); report.setOrderTotal(report.getOrderTotal() + order.getOrderAmount());
repos.updateDailyOrderReport(report); } }}
复制代码


随机创建 10 个订单后,我们的报表情况如下:


mysql> select * from `order`;+----------+---------+---------------------+--------------+| order_id | user_id | order_time          | order_amount |+----------+---------+---------------------+--------------+|       21 |       3 | 2018-09-24 01:06:43 |          251 ||       22 |       2 | 2018-09-24 01:06:43 |          371 ||       23 |       5 | 2018-09-24 01:06:43 |          171 ||       24 |       0 | 2018-09-24 01:06:43 |          904 ||       25 |       3 | 2018-09-24 01:06:43 |           55 ||       26 |       5 | 2018-09-24 01:06:44 |          315 ||       27 |       8 | 2018-09-24 01:06:44 |          543 ||       28 |       8 | 2018-09-24 01:06:44 |          537 ||       29 |       2 | 2018-09-24 01:06:44 |          123 ||       30 |       3 | 2018-09-24 01:06:45 |          938 |+----------+---------+---------------------+--------------+10 rows in set (0.00 sec)
mysql> select * from daily_order_report;+----+------------+-----------+-------------+| id | day | order_num | order_total |+----+------------+-----------+-------------+| 2 | 2018-09-24 | 10 | 4208 |+----+------------+-----------+-------------+1 row in set (0.00 sec)
mysql>
复制代码




作者简介:


欧阳亮,满帮集团运满满 CRM 团队负责人,架构师。关注微服务、大数据、实时计算等领域。


2019-02-20 14:598863

评论 2 条评论

发布
用户头像
这特么也能写篇文章??
2019-04-01 17:16
回复
用户头像
讲解搭配实例,好给力
2019-02-24 10:35
回复
没有更多了
发现更多内容

从0到1搭建大数据平台之调度系统

数据社

大数据 工作流调度

读书笔记:Google软件测试之道【一】

Man

测试 测试文化

如何从0到1搭建大数据平台

数据社

大数据 中台

自主管理——对人性的假设

zhongzhq

自主管理 组织

从0到1搭建大数据平台之计算存储系统

数据社

大数据 中台 计算引擎

LeetCode 328. Odd Even Linked List

liu_liu

算法 LeetCode

话题讨论 | 哪本极具影响力的书,是每位程序员都应该读的?

InfoQ写作社区官方

写作平台 话题讨论

实时计算的业务劣势、思维误区和改进之道

KAMI

大数据 flink 方法论 实时计算

周子衡 | 数字资产、数字支付及跨境活动——以美元数字化为例

CECBC

加密货币 数字资产

一位区块链产品经理讲述“区块链”的通知 重点方向包括区块链安全

CECBC

物联网 区块链技术 联盟链

读书时,如何提炼文章架构形成思维导图

dd多了个多

读书笔记 读书感悟

从0到1搭建大数据平台之数据采集系统

数据社

大数据 数据采集

架构师训练营第九周作业

一剑

原创 | 使用JPA实现DDD持久化- O:对象的世界(1/3)

编程道与术

Java hibernate DDD JDBC jpa

区块链在这些生活场景中悄然落地了......

CECBC

区块链 落地应用

写作社区划线笔记新功能全新上线!给你带来不一样的写作学习体验~

InfoQ写作社区官方

写作平台 玩转写作平台 热门活动

湾区金科沙龙,华青融天技术总监吴伟平详解旁路式应用性能监控

DT极客

我是如何写读书笔记的

dd多了个多

读书笔记

成功的9大步骤:从手动测试转为自动化测试

禅道项目管理

测试 自动化测试

读书笔记:Google软件测试之道【二】

Man

测试 测试文化

糟糕,你写的 BUG 要被存1000年了!

华为云开发者联盟

GitHub 开源 代码 bug 卤化银胶片

什么是零代码?零代码开发可以带来的好处

代码制造者

可视化 零代码 编程效率

读书,区分一二三四手知识

dd多了个多

读书笔记

作业1

chenzt

前端面试vue部分(1)——谈谈你对MVVM的理解

dd多了个多

面试 Vue 大前端 Web

30秒,2种方法解决SQL Server的内存管理问题

华为云开发者联盟

数据库 sql 内存 服务器 华为云

读书笔记:Google软件测试之道【三】

Man

测试 测试文化

VIPKID 在线教育场景下的实时计算技术落地和实践

Apache Flink

flink

Java 垃圾回收

dongge

Flink x Zeppelin ,Hive Streaming 实战解析

Apache Flink

flink hive Zeppelin

汇丰坠落:世间已无「日不落」

钛禾产业观察

汇丰 财经

基于Kafka实现分布式事件驱动_语言 & 开发_欧阳亮_InfoQ精选文章