NVIDIA 初创加速计划,免费加速您的创业启动 了解详情
写点什么

基于 Kafka 实现分布式事件驱动

  • 2019-02-20
  • 本文字数:6042 字

    阅读完需:约 20 分钟

基于Kafka实现分布式事件驱动

事件驱动是一种灵活的系统设计方法,在事件驱动的系统中,当数据发生变化时系统会产生、发布一个对应的事件,其它对这个事件感兴趣的部分会接收到通知,并进行相应的处理。事件驱动设计最大的好处在我看来有两点:一是它为系统提供了很好的扩展能力,比如我们可以对某类事件增加一个订阅者来对系统进行扩展,最主要的是我们并不需要修改任何已有的代码,它完全符合开闭原则;二是它实现了模块间的低偶合,系统间各个部分不是强依赖关系,而是通过事件把整个系统串联起来。


当然,任何事务都有两面性,事件驱动也有其不好的方面。首先,实现一套这样的系统复杂度就很高,对开发人员的要求也很高;再次,对系统的整体把控会很困难,想象一下面对几百个类别的事件,并且没有一个统一的地方可以让我们看到整个业务处理流程,会是什么心情?所以当我们决定采用事件驱动实现系统中,一定要维护好相关的文档,并保持它们的有效性。


我们再来看看事件驱动架构的一些其它的优点:


  • 更好的响应性

  • 事件驱动中,事件的响应是异步处理的,所以它具有更好的响应性。

  • 更好的容错性

  • 业务主流程在发布事件之后便结束了,扩展流程的延后处理可以异步不断的失败重试,直到成功为止,系统整体容错性更强。

设计篇

首先,我们需要定义什么是事件?从业务角度看,事件包括以下属性:


事件的定义
属性字段类型说明
标识IDstring系统内部每个事件都需要一个唯一的标识。
类型eventTypestring数据发生变化产生事件,不同类型的数据变化产生不同类型的事件。比如会员下单、会员注册、用户修改手机号等等。
时间eventTimedatetime即数据发生变化的时间。
上下文contextstring事件发生时的上下文信息。比如会员修改手机号事件,需要原号码和新号码,会员ID等信息。


接下来,我们看看如何设计一套基于事件驱动的系统,你知道设计模式中的观察者模式吗?


观察者模式:定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知所有观察者对象,使它们能够自动更新自己。


观察者模式天生就是事件驱动的一个实现,但是直接使用它有很多的弊端。首先,它是基于主题的,有多少类事件就需要多少个主题类,这可能会导致类爆炸;其次,观察者模式是同步实现的,这样我们可能会牺牲掉响应性和容错性等优势。


所以我们需要对观察者模式稍作改进,我们分别从事件发发布和消费两个方面来分析。

事件的发布

本文的标题是《基于 Kafka 实现事件驱动架构》,很明显,我们使用 kafka 作为消息中间件来传递事件消息。所以,像修改会员手机号码的代码可能实现如下:


@Transactional(readOnly = false, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)@Overridepublic void changePhoneNumber(String newNumber) {    userDao.updatePhone(this.getUserId(), newNumber); // 本地数据库修改
// 发布 用户手机号码变更 事件 Event event = new Event(...); // 创建一个事件对象,表示用户修改手机号码 ProducerRecord record = new ProducerRecord(...event); // 根据event生成kakfa record
Future<RecordMetadata> f = kafkaProducer.send(record); try { f.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); }}
复制代码


这段代码正确吗?从逻辑上看,它完全正确。但从可靠性角度看它是有问题的。Kafka 和数据库是两个异构系统,我们不能仅仅通过一个本地事务保证他们之间的数据一致性。例如,推送 Kafka 成功了,但是在提交 DB 事务的时候失败了呢(比如说事务超时滚)?这样 kafka 中就会存在一个脏数据,因为本地数据库事务已经回滚了。


分布式系统数据一致性一直就是复杂的问题,常用的方案有两阶段提交、三阶段提交、zookeeper 的 zab 协议、proxs、raft 等算法,这不是本文的重点。我们采用一个简单易懂的方式来解决上面的问题。我们引入一张 DB 事件表,在发布事件时将事件信息存入这个事件表,将事件的发布和业务处理包装在同一个本地事务中。


create table if not exists `event_queue` (  `id`          bigint      not null  auto_increment comment '主键',  `event_id`    char(32)    not null comment '事件ID',  `event_type`  char(12)    not null comment '事件类型',  `event_time`  datetime    not null comment '事件发生时间',  `context`     mediumtext  not null comment '事件内容',  primary key (`id`),  unique key(`event_id`)) engine=innodb default charset=utf8 comment='事件队列表';
复制代码


发布事件,就是向这个事件表中增加一条记录,修改会员手机号码的代码现在变成了:


@Transactional(readOnly = false, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)@Overridepublic void changePhoneNumber(String newNumber) {    userDao.updatePhone(this.getUserId(), newNumber); // 本地数据库修改
// 发布 用户手机号码变更 事件 Event event = new Event(...); // 创建一个事件对象,表示用户修改手机号码 eventDao.insert(event); // 向事件表创建一条新记录。}
复制代码


由于事件消息现在被暂存进了 DB,我们还需要将它取出来推到 Kafka,为此我们需要起一个线程,不断的读取事件表中的记录发送给 Kafka,并在成功发送之后将记录从 DB 中删除。如果删除 DB 的时候失败了,那么消息会被重新推送到 kafka,意为着我们实现的是 At least once 的递交语义,对于业务上不接受重复的场景,在消费端需要做好幂等处理。


讲到这里,关于事件的分布已经接近尾声,但还有一个问题:性能。如果一个系统的负载很高,一秒内产生成千上万个事件,那我们的事件表就会成为瓶颈,因为只用了一个线程来处理事件表向 Kafka 的推送,集群中只有一个实例能发辉作用,无法实现弹性。为了解决这个问题,我们可以对事件表进行分表,并使用多线程并发处理,而且这些线程可以分布在不同的集群实例中。但这样使设计变得更复杂了,现在我们需要解决一个新的问题:如何保证一个事件表,最多只被一个线程处理?我们需要保证一个事件表同一时刻只能被一个线程处理,同时在实例宕机后,其它实例可以起线程接替它的工作。这句话我们换一种方式来描述更容易理解:


  1. 集群有 M 个实例,需要进行 N 个任务(任务是把事件分表中的事件信息推送到 kafka)

  2. 一个任务最多可以分配给 1 个实例,1 个实例可以同时执行多个任务。

  3. 如果一个实例宕机了,分配给它的任务需要重新在其它实例上分配。

  4. N 个任务固定不变,实例可以动态增加或减少,需要实现实例之间的均衡负载。


如果你熟悉像 HBase、ES 这类分布式系统的话,不难理解我们需要在集群中选出一个实例作为 Master,由它来负责任务在集群中的分配工作。我们借助 Zookeeper,所有实例在启动时创建一个 EPHEMERAL 类型的 master 节点,创建成功的实例成为 Master,其它实例则监听 master 节点,当 Master 实例宕机后重新竞选。



每个实例启动后,会在 workers 节点下创建一个临时节点,表示自己作为一个 Worker 加入集群;Worker 同时会监听自己创建的子节点,接收由 Master 分配给自己的任务。Master 会监听 workers 下子节点的变化,当实例下线或有新的实例加入集群中时,Master 会收到通知并重新进行任务的分配。分配的具体信息保存在 Worker 实例创建的子节点中,Master 通过直接修改这些子节点的内容实现分配。


从事件的发布来看,系统的架构是这样的:



这里有个细节需要说明:因为 Kafka 只保证 partition 级别的有序性,我们的事件分表数必须大于或等于 partition 的数量,否则事件的顺序得不到保证

事件的消费

因为我们使用了 Kafka 作为事件消息中间件,事件的消费简单很多。每个实例在启动时启一个 Kafka Consumer 即可,像实例间的负载、可用性、故障转移等等问题,Kafka 已经帮我们解决了,我们只需要从 Kafka 中获取事件消息,并通知相应的订阅者即可。



订阅者需要实现BaseSubscriber接口,另外在启动时,需要把事件与订阅者的关系维护在SubscriberConfig类中:


BaseSubscriber sub = ... // your implementationSubscriberConfig.instance().addSubscriber("event_type", sub);
复制代码


系统整体的设计是面向扩展的,我们可以通过调整集群应用实例数、事件表分表数量和 kafka partitions 数量来提高系统整体的吞吐量。事件表分表越多,事件消息从 DB 到 kafka 的延迟就更低;应用实例越多,系统单位时间内能承受的事件上限也越多,另外也能更好的负载 kafka 消息的消费。


每一个应用,作为事件发布者,其产生的事件最终都被推送到一个 Kafka Topic;但作为消费者,可以订阅不同的 Topic,这些 Topic 可以是自己的推送的,也可以是其它应用推送的事件。

实现篇

附上完整源码地址https://github.com/OuYangLiang/libevent,目前只支持了分表,还不支持分库。


这里我们只对部分核心代码作一个简单的介绍:


SimpleLock是一个基于 Zookeeper 的简单分布式锁实现,可以参考这里,我们使用SimpleLock来实现 Master 的竞选。


EventSubmitter是一个线程,负责把事件表中的事件信息推送到 Kafka broker。初始化时需要传入一个 int 参数,表示处理哪一个事件分表。它被实现成一个响应中断的线程,因为当 Master 重新分配任务后,Worker 需要先停掉当前进行中的任务。


Master类是 Master 实例的主要实现。实例在启动时会调Master类的start方法,Master 实例监听 workers 节点,当有新实例加入或实例下线时,Master 实例会调用onWorkerChange方法进行重新分配,onWorkerChange方法实现了一个简单的分配算法,只有任务变更的 Worker 实例会收到分配通知。


Worker类是 Worker 实例的主要实现,实例在启动时会调Worker类的start方法。集群中的每一个实例都是 Worker,会在 workers 节点下创建一个临时的节点表示自己,同时监听该节点,接受 Master 分配给自己的任务。当 Worker 接收到分配通知时,会先停止当前在运行的所有任务,再根据 worker 节点的内容开始执行新分配的任务。

示例

来看一个具体的事例,假设我们要以天为维度,统计每天的下单量和下单金额。现在,我们已经有了订单表:


create table if not exists `order` (  `order_id`        bigint      not null  auto_increment comment '主键',  `user_id`         bigint      not null comment '客户id',  `order_time`      datetime    not null comment '订单时间',  `order_amount`    int         not null comment '订单金额,单位:分',  primary key (`order_id`)) engine=innodb default charset=utf8 comment='订单表';
复制代码


这个需求我们可以简单的使用 sql 来做,比如:


select date(order_time) as day, count(*) as total_num, sum(order_amount) as total_amount from `order`group by date(order_time)
复制代码


但是在生产环境中这么做往往不现实,比如性能问题、或者我们对订单表做了分表、或者几个月前的数据库了备份,而你正好需要查询这些数据,等等。实现这个需求更好的方式是采用事件驱动,在下单的时候发布一个事件,然后异步的维护一个查询表,这样之间的种种问题都将不复存在。先创建一个查询表,如下:


create table if not exists `daily_order_report` (  `id`              bigint      not null  auto_increment comment '主键',  `day`             date      not null comment '统计日',  `order_num`       bigint      not null comment '订单数量',  `order_total`     bigint      not null comment '订单总金额,单位:分',  primary key (`id`),  unique key(`day`)) engine=innodb default charset=utf8 comment='订单日报表';
复制代码


在下单的时候,我们需要发布一个下单事件


@Transactional(readOnly = false, propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)@Overridepublic void createOrder(Order order) {    orderDao.insert(order);
//发布下单事件 publisher.publish("order_created", new Date(), order.json(), order.getUserId().intValue() % Configuration.instance().getNumOfEventTables());}
复制代码


之后,我们需要实现一个订阅者,在接收到下单事件后,根据订单的日期做相应的统计:


@Componentpublic class DailyOrderReportSubscriber implements BaseSubscriber {
@Autowired private OrderRepos repos;
@Override public void onEvent(Event e) { Order order = Order.fromJson(e.getContext()); DailyOrderReport report = repos.selectDailyOrderReportByKey(new java.sql.Date(order.getOrderTime().getTime()));
if (null == report) { report = new DailyOrderReport(); report.setDay(new java.sql.Date(order.getOrderTime().getTime())); report.setOrderNum(1l); report.setOrderTotal(new Long(order.getOrderAmount()));
repos.createDailyOrderReport(report); } else { report.setOrderNum(report.getOrderNum() + 1); report.setOrderTotal(report.getOrderTotal() + order.getOrderAmount());
repos.updateDailyOrderReport(report); } }}
复制代码


随机创建 10 个订单后,我们的报表情况如下:


mysql> select * from `order`;+----------+---------+---------------------+--------------+| order_id | user_id | order_time          | order_amount |+----------+---------+---------------------+--------------+|       21 |       3 | 2018-09-24 01:06:43 |          251 ||       22 |       2 | 2018-09-24 01:06:43 |          371 ||       23 |       5 | 2018-09-24 01:06:43 |          171 ||       24 |       0 | 2018-09-24 01:06:43 |          904 ||       25 |       3 | 2018-09-24 01:06:43 |           55 ||       26 |       5 | 2018-09-24 01:06:44 |          315 ||       27 |       8 | 2018-09-24 01:06:44 |          543 ||       28 |       8 | 2018-09-24 01:06:44 |          537 ||       29 |       2 | 2018-09-24 01:06:44 |          123 ||       30 |       3 | 2018-09-24 01:06:45 |          938 |+----------+---------+---------------------+--------------+10 rows in set (0.00 sec)
mysql> select * from daily_order_report;+----+------------+-----------+-------------+| id | day | order_num | order_total |+----+------------+-----------+-------------+| 2 | 2018-09-24 | 10 | 4208 |+----+------------+-----------+-------------+1 row in set (0.00 sec)
mysql>
复制代码




作者简介:


欧阳亮,满帮集团运满满 CRM 团队负责人,架构师。关注微服务、大数据、实时计算等领域。


2019-02-20 14:598866

评论 2 条评论

发布
用户头像
这特么也能写篇文章??
2019-04-01 17:16
回复
用户头像
讲解搭配实例,好给力
2019-02-24 10:35
回复
没有更多了
发现更多内容

给弟弟的信第19封|年轻人要注意养生

大菠萝

28天写作

从 Discord 看未来社交的「超级群」模式

融云 RongCloud

Ajax+SSM实现客户端开发 实现简单的前后端分离

Bug终结者

Java ajax 前后端分离

30个类手写Spring核心原理之动态数据源切换(8)

Tom弹架构

Java spring 源码

从Hadoop框架讨论大数据生态

编程江湖

大数据 hadoop

【等保小知识】等保一级需要测评吗?

行云管家

网络安全 等保 等级保护 等保一级

🏆【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析

洛神灬殇

RocketMQ 消息队列 Apache RocketMQ 12月日更

盘点2021 | 技术十年-记录十年技术经历

高性能架构探索

技术人 工作经历 经历分享 盘点2021

JAVA 开发常用工具汇总

编程江湖

java编程

web技术分享| 白板SDK的几种图形检测算法

anyRTC开发者

前端 音视频 白板 web技术分享 图形检测算法

升级过log4j,却还没搞懂log4j漏洞的本质?

华为云开发者联盟

Java log4j 漏洞 JNDI rmi

2021数据技术嘉年华 | OceanBase 技术盛宴ON LINE ,我们不见不散!

OceanBase 数据库

数据库 OceanBase 社区版 技术嘉年华 DTC

6000 字干货详解:直播聊天室的无限用户优化

融云 RongCloud

高并发 直播 直播聊天室 海量用户

「猿桌派」即将开播,聚焦客户端埋点和大数据分析

融云 RongCloud

大数据 程序员 埋点

超市发:多措并举 提振销售 服务顾客

科技热闻

一站式云安全保障,就用行云管家!完美保障!

行云管家

云计算 云安全 企业上云 云资源 云管理

Apache APISIX 社区双周报 | 功能亮点更新进行中

API7.ai 技术团队

云原生 后端 开源社区 api 网关 Apache APISIX

DotNet工具箱之性能监控组件——CLRStats

为自己带盐

dotnet 28天写作 12月日更

【MongoDB学习笔记】-使用 MongoDB 进行 CRUD 操作(上)

恒生LIGHT云社区

数据库 mongodb

实用机器学习笔记二十:偏差和方差

打工人!

机器学习 深度学习 算法 学习笔记 12月日更

视频通信中的码率控制算法

拍乐云Pano

音视频 RTC 视频编码 码率控制

腾讯云原生数据库TDSQL-C斩获2021 PostgreSQL中国最佳数据库产品奖

科技热闻

10 个打造 React.js App 的最佳 UI 框架

编程江湖

前端开发

Linux云计算好学吗?Linux云计算运维学习资料,手把手教你学 条件测试语句和流程控制语句的使用

学神来啦

Linux centos Shell if linux云计算

30个类手写Spring核心原理之自定义ORM(下)(7)

Tom弹架构

Java spring 源码

(转)大数据开发之Hive中UDTF函数

@零度

大数据 hive

熟悉又陌生的白帽黑客组织OWASP

喀拉峻

黑客 网络安全 安全 OWASP

化繁为简--百度智能小程序主数据架构实战总结

百度Geek说

小程序 百度 架构 后端 数据

Linux之more命令

入门小站

Linux

酷炫3D效果在瘦设备上也能实现?|HDC2021技术分论坛

HarmonyOS开发者

HarmonyOS

在线JSON转Csharp工具

入门小站

工具

基于Kafka实现分布式事件驱动_语言 & 开发_欧阳亮_InfoQ精选文章