Yelp 的实时流技术之二:将 MySQL 表数据变更实时流到 Kafka 中

阅读数:10449 2016 年 9 月 11 日 17:48

这是关于 Yelp 的实时流数据基础设施系列文章的第二篇。这个系列会深度讲解我们如何用“确保只有一次”的方式把 MySQL 数据库中的改动实时地以流的方式传输出去,我们如何自动跟踪表模式变化,如何处理和转换流,以及最终如何把这些数据存储到 Redshift 或 Salesforce 之类的数据仓库中去。

阅读本系列的第一篇:

伴随着我们技术团队的扩张,我们意识到必须将系统从单一庞大的程序向服务转变,这样才会更容易使用,更方便扩展。向这样的架构迁移有许多优点,也有许多问题,最大的问题之一是:我们怎样才能尽快向服务提供它们需要的数据?

设想这样一个系统,里面有数以百计的服务,每个服务又自己管理着 MySQL 中的数以百万行计的数据,那服务之间为发送数据而进行的通信就会成为一个非常大的技术难题。正如上一篇文章中提到的,很快就会碰到“ N+1 次查询问题”。对于我们来说,解决方案就是构建一个名为 MySQLStreamer 的系统,来监控数据库中的数据变更,并将数据变更发送给所有订阅了这些内容的服务。

MySQLStreamer 是一个数据库变更捕获和发布系统,它负责捕获数据库的每一条数据变更,将它们打包成消息并发布到Kafka 中。“流表二象性”的概念就是讲述可以重放表中的每一条变更操作来将整张表重建为某个时间点的镜像,或者发送表中的每一条变更来重新生成流。

要理解我们是如何捕获和发布数据变更的,就必须了解我们的数据库基础架构。在 Yelp,我们把绝大多数数据都保存在 MySQL 集群中。为了处理海量访问和管理读请求负载,Yelp 必须维护几十个在地理上按区域分布的读副本。

数据复制的原理

作为数据复制的基础,MySQL 主库上发生的数据变更操作会写到一种特定格式的二进制日志文件中。当一个从库连上主库时,从库会读取主库的二进制文件来获取数据变更信息。这个过程由从库上运行的两个线程完成,一个是 IO 线程,一个是 SQL 线程,如下图所示。IO 线程主要负责从主库中读回二进制事件,在读到了二进制事件之后,就把它们保存在从库本地的 Relay 日志文件中。SQL 线程就接着再把这些事件从 Relay 日志中读取出来,按收到的顺序把它们有顺序地重放写入从库中。

MySQL 数据复制原理

要注意的是从库上 SQL 线程执行的事件并不一定是主库二进制文件中的最新事件,这个在上图中表现为复制延迟。Yelp 的 MySQLStreamer 就做为一个从库,把更新操作持久化到 Apache Kafka 中,而不是象普通的 MySQL 从库一样持久化到数据表中。

MySQL 数据复制类型

MySQL 数据库有两种复制类型:

  • 基于语句的复制(Statement-based replication,SBR)
  • 基于行的复制(Row-based replication,RBR)

在基于语句的复制中,主库写到二进制日志文件中的是 SQL 语句,从库的 SQL 线程则直接在从库上重新执行这些语句。使用基于语句的复制有一些缺点,其中最重要的是有可能导致主从库之间的数据不一致。因为从库上的 SQL 线程只是简单的负责重新执行从主库上拷贝回来的操作语句,但事实上相同的语句在不同的时间不同的数据库上有可能会产生不同的结果。比如下面的语句:

INSERT INTO places (name, location)
(SELECT name, location FROM business)

这种情况下你本来是想查询出某些行并将它们插入到另一张表中,但是不带 ORDER BY 子句的语句在查询多次的情况下,查出的多条结果的顺序不一定每次都相同。而且,如果一个字段有 AUTO_INCREMENT 属性的话,每次执行相同的语句就可能会产生不同的自增值。另一个明显的例子就是使用 RAND() 或者 NOW() 函数,在不同的 MySQL 实例上运行时就会产生不同的结果。由于这些限制的存在,我们的 MySQLStreamer 就要求使用基于行的数据复制。在基于行的复制中,每个事件都会表明表中的单条记录是如何被改变的。UPDATE 和 DELETE 语句的日志中都会包含对应数据行在修改之前的原始值。这样,重放这些数据变更记录时就会保持数据一致性。

现在我们知道了数据复制是什么,那又为什么需要 MySQLStreamer 呢?

Yelp 的实时流平台的一个重要用途就是将数据变更流出去,从而让下游的子系统可以处理它们并保持数据更新。有两类我们要知道的 SQL 变更事件:

  • 数据定义语言(Data Definition Language,DDL ):定义或者修改数据库结构或模式;
  • 数据操作语言(Data Manipulation Language,DML ):修改数据行;

MySQLStreamer 则负责:

  • 不断从 MySQL 二进制文件中查看最新的日志,读取这两种类型的事件;
  • 根据事件类型不同而进行相应处理,将 DML 事件发布到 Kafka Topic 中;

MySQLStreamer 会发布四种不同的事件类型:插入、更新、删除和刷新。前三种对应着相同类型的 DML 操作。刷新事件由我们的初始化 Topic 过程产生,在后文中会详细描述。对于每种事件类型,我们都会包含完整的数据行内容。更新事件包括相应数据行的更新前和更新后的全部字段值。这对处理环形操作非常重要。设想一个地理编码服务在处理了业务更新信息后,如果该行数据的地址变了,于是触发了对经度和纬度的更新。如果没有更新前的值,服务就不得不保存非常多的状态来判断该行的地址是不是已经修改完了,从而可以忽略掉经度和纬度的重复更新。可是如果有了更改前和更改后的内容,只需要做一个简单的对比操作就可以打破这样的环形操作了,不必保存任何状态。

MySQLStreamer 事件类型

数据库拓扑

MySQLStreamer 由以下三种数据库组成:

数据库拓扑

源数据库

源数据库中存储上游数据的所有变更事件。MySQLStreamer 会不断查看它的变更状态,把变更事件以流的形式传送给下游的消费者。MySQLStreamer 的二进制日志流解析模块就负责解析二进制日志,找出新事件。我们的流解析模块是个对 python-mysql-replication 包的 BinLogStreamReader 的进一步抽象,这个 API 主要提供了三个功能:跟踪下一个事件、读取下一个事件、从流的指定位置开始重新读取事件。

模式跟踪数据库

模式跟踪数据库和只有模式没有数据的从库差不多。它最初是只由源库中的模式生成,然后不断的在上面执行源库中的 DDL 操作来保持更新。这意味着它会略过数据更改,只保存所有表的表结构。我们通过 Schematizer 服务来在需要时从这个数据库中获取 CREATE TABLE 语句,并生成 Avro 模式。模式信息对于把二进制文件中的字段名与值映射起来也是非常必要的。由于存在复制延迟,MySQLStreamer 当前处理的复制数据的位置所对应的数据库模式也不一定是与主库的最新状态完全一致的。因此,MySQLStreamer 使用的模式定义不能简单地从主库上获取。我们决定用一个数据库来存储这个信息,来避免重新实现 MySQL 的 DDL 引擎。

由于 DDL 操作是非事务型的,因此假如系统执行一条 SQL 语句失败之后,数据库可能就处于崩溃的状态。为防止这样的事情发生,我们用事务的方式来处理整个数据库。我们在执行任何 DDL 操作之前先生成检查点,把整个模式跟踪数据库的全部模式创建语句都导出来,并保存在状态数据库中。然后再执行 DDL 事件。如果成功,就把刚才保存的模式导出文件删掉,再生成一个检查点。一个检查点通常包括保存的二进制文件名和位置,以及相关的 Kafka 偏移量信息。如果失败了,在 MySQLStreamer 重启之后,它会检查是否存在模式导出文件。如果有,就执行出错的 DDL 事件之前的模式导出文件来重建整个模式跟踪数据库。一旦回放完毕,MySQLStreamer 就重启日志事件查看服务,从检查点位置开始恢复数据复制,从而最终追上主库,保持数据最新。

状态数据库

状态数据库保存 MySQLStreamer 的内部状态,它包含三张表,分别保存不同的状态信息:

DATA_EVENT_CHECKPOINT

保存每个 Topic 及相应的最后推送的偏移量信息。

GLOBAL_EVENT_STATE

这张表中保存的最重要的信息就是位置,具体是这样定义的:

位置信息

要保证数据复制在发生故障的情况下仍然是安全的,前提之一就是每个事务都必须有唯一的标识符。它不仅在故障恢复的过程中有用,还在分层复制架构中有用。全局事务标识符(Global Transaction IDentifier,GTID)就是一个这样的标识符,在一套主从复制环境中,它是在所有服务器中全局唯一的。我们的代码已经支持 GTID 了,但我们使用的 MySQL 的版本还没支持。所以我们只好寻找其它替代方案来保存这样的状态,要求必须是在整个复制体系中都非常容易解释的,于是我们就想到了 Yelp 现有的心跳守护进程。这个 Python 守护进程会负责周期性地向数据库中更新心跳信息,内容是一个序列号和一个时间戳。这个心跳会随后被复制到所有的从库中。MySQLStreamer 从心跳信息中提取序列号和时间戳,再附上它现在正在处理的日志文件名和日志内偏移量,把这些信息保存在 global_event_state 表中。当主库由于某些原因出故障时, 一个脚本程序会利用心跳序列号和时间戳信息从新的主库中找出日志文件名和偏移量。

MYSQL_DUMPS

保存模式跟踪数据库导出的模式文件,用于在出错之后将数据库恢复到一个旧的稳定状态。

MySQLStreamer 是怎么工作的?

MySQLStreamer 工作机制

在 MySQLStreamer 进程启动时,它必须先去 ZooKeeper 上获取一个锁,然后才可以开始消息处理。获取锁的操作目的是避免在一个集群上会运行多个 MySQLStreamer 实例。在一个集群上运行多实例的问题在于日志复制本身是有顺序的,在某些场景下我们必须保留表内和表间的日志顺序。保证只有一个 MySQLStreamer 实例在处理日志,就可以保留日志顺序,避免消息被重复处理。

如前文所述,MySQLStreamer 利用 Binlog Parser 从源数据库中获取事件。对于数据事件,就从中提取表的模式信息,并发送给 Schematizer 服务。Schematizer 服务会返回相应的 Avro 模式和 Kafka Topic。Schematizer 服务是幂等的,不管你调用多少次,只要你发给它的是相同的建表语句,那它就会返回相同的 Avro 模式和 Kafka Topic。

MySQLStreamer 用收到的 Avro 模式将数据事件打包,然后发布到 Kafka Topic 中。Yelp 数据管道的 Kafka 生产者会为所有发布到 Kafka 中的事件维护一个内部队列。当它从 Binlog Parser 中收到了一个模式事件时,MySQLStreamer 就先把内部队列里的所有事件全都刷出去,再生成一个检查点,以备万一发生故障时恢复用。然后它再把模式事件应用到模式跟踪数据库上。

对数据事件的错误处理与模式事件略有不同。我们会在处理任何数据事件之前先生成检查点,然后在成功发布一批消息之后再继续生成检查点。我们信任成功的回复,而不是失败的。如果我们处理失败了,恢复时就去获取最新的检查点和 Kafka 高水位信息,将刚才没有成功发布的消息再发布一次。对于 Kafka 一端的设置,我们会要求收到所有处于复制状态的副本的响应消息才认为写入是成功的,而且我们也把min.isr 设置得比较高,牺牲了可用性来换取一致性。通过一整套的验证错误和故障恢复手段,我们可以保证消息是确定只发布一次的。

初始化一个 Topic

Yelp 公司成立于 2004 年,有趣的是许多表和公司的年纪一样大。我们需要有方法来用表中的现有内容来初始化相应的 Kafka Topic。我们设计了一套流程,来一边处理新的复制事件,一边完成一致性的 Topic 初始化过程。

在讨论具体的初始化过程之前,先看看数据复制拓朴架构。参考上图,有一个主库,它有一个叫做中间主库(Intermediate Master)的从库。中间主库下级还有许多的从库,叫本地主库(Local Master)。MySQLStreamer 连接的从库叫做刷新主库(Refresh Primary),它实际上是某一个本地主库的从库。刷新主库的数据复制连接是基于行的,而其它的复制全都是基于语句的。

初始化过程首先要在 MySQLStreamer 的刷新主库上创建一张与原始表一样的表,区别只是要用 MySQL 的 Blackhole 引擎。

blackhole_table = create_blackhole_table(original_table)
while remaining_data(original_table):
    lock_table(original_table)
    copy_batch(original_table, blackhole_table, batch_size)
    unlock_table(original_table)
    wait_for_replication()
drop_table(blackhole_table)

初始化 Topic 过程伪码

MySQL 的 Blackhole 引擎就象是 Linux 系统上的'/dev/null'一样。我们使用 Blackhole 引擎的主要原因是写入这种表的数据都不会被保存下来,但却可以生成二进制日志,可供复制使用。这样我们就可以重现出原始表中的二进制日志,但不必担心数据要多存一份。

创建完 Blackhole 引擎的表之后,需要把本地主库上的原始表锁住,以避免导数据的过程中会发生数据更新操作。然后我们就把数据一批批的从原始表中拷贝到 Blackhole 表中。如上图所示,MySQLStreamer 是连接到某个叶子节点上的,原因是我们不希望初始化过程中产生的日志会被复制到集群中的其它节点上。但我们还是希望在初始化的过程中原始表可以被更新,因此在批量拷数据的操作之间,我们会把原始表解锁一段时间,让本地主库跟上数据复制进度。锁住原始表会导致从本地主库到刷新主库之间的数据复制操作暂时失效,但这样可以保证我们拷到 Blackhole 表中的数据在那个时刻是与本地主库一致的。解锁可以让复制跟上进度,自然也会拖慢初始化过程的速度。但这个过程实际上是非常快的,因为数据没有离开过 MySQL 服务器。每一批导数据操作而导致的复制延迟大概都是微秒级的。

这个过程的所有复杂之处都在于数据库。在 MySQLStreamer 内部,我们的代码会把 Blackhole 表上的 insert 操作当成原始表上的刷新事件来处理。Topic 中的多个刷新事件之间会被 Insert、Update、Delete 等正常的复制事件间隔开,因为在初始化过程中我们会不断地解锁原始表,所以复制过程中的这些操作都会发布出来。从语义上来说,许多消费者程序都把刷新事件当成 Update 来处理。

关键点

技术团队的时间是非常宝贵的。技术人员应该遵守一个好原则:任何重复性工作都应该自动化掉。伴随着 Yelp 的技术团队扩张,我们需要实现一个单一、有弹性的基础架构来方便实现众多应用程序。有了数据管道之后,我们就可以为检索数据方便而创建索引、将数据保存到数据仓库中、与其它的内部服务共享转换后的数据……等等。事实证明数据管道价值巨大,让我们朝着目标中的自动化状态迈进了一大步。MySQLStreamer 则是数据管道非常重要的一个组成部分,它从 MySQL 二进制文件中提取所有的数据变更操作事件,再把这些事件发布到 Kafka 中。当改动信息都发布到 Kafka Topic 中之后,下游消费者程序就很容易根据自己的特定用途来使用数据了。

鸣谢

感谢整个业务分析与指标组(Business Analytics and Metrics,B.A.M),数据库项目组和所有为 MySQLStreamer 做出了贡献的同事们:Abrar Sheikh、Cheng Chen、Jenni Snyder 和 Justin Cunningham 等。

这是关于 Yelp 的实时流数据基础设施系列文章的第二篇。这个系列会深度讲解我们如何用“确保只有一次”的方式把 MySQL 数据库中的改动实时地以流的方式传输出去,我们如何自动跟踪表模式变化,如何处理和转换流,以及最终如何把这些数据存储到 Redshift 或 Salesforce 之类的数据仓库中去。这一篇中介绍的是 MySQLStreamer,它从 MySQL 二进制文件中提取所有的数据变更操作事件,再把这些事件发布到 Kafka 中。

查看英文原文 Streaming MySQL tables in real-time to Kafka

评论

发布