写点什么

美团 DB 数据同步到数据仓库的架构与实践

  • 2020-02-25
  • 本文字数:4588 字

    阅读完需:约 15 分钟

美团DB数据同步到数据仓库的架构与实践

背景

在数据仓库建模中,未经任何加工处理的原始业务层数据,我们称之为 ODS(Operational Data Store)数据。在互联网企业中,常见的 ODS 数据有业务日志数据(Log)和业务 DB 数据(DB)两类。对于业务 DB 数据来说,从 MySQL 等关系型数据库的业务数据进行采集,然后导入到 Hive 中,是进行数据仓库生产的重要环节。


如何准确、高效地把 MySQL 数据同步到 Hive 中?一般常用的解决方案是批量取数并 Load:直连 MySQL 去 Select 表中的数据,然后存到本地文件作为中间存储,最后把文件 Load 到 Hive 表中。这种方案的优点是实现简单,但是随着业务的发展,缺点也逐渐暴露出来:


  • 性能瓶颈:随着业务规模的增长,Select From MySQL -> Save to Localfile -> Load to Hive 这种数据流花费的时间越来越长,无法满足下游数仓生产的时间要求。

  • 直接从 MySQL 中 Select 大量数据,对 MySQL 的影响非常大,容易造成慢查询,影响业务线上的正常服务。

  • 由于 Hive 本身的语法不支持更新、删除等 SQL 原语,对于 MySQL 中发生 Update/Delete 的数据无法很好地进行支持。


为了彻底解决这些问题,我们逐步转向 CDC (Change Data Capture) + Merge 的技术方案,即实时 Binlog 采集 + 离线处理 Binlog 还原业务数据这样一套解决方案。Binlog 是 MySQL 的二进制日志,记录了 MySQL 中发生的所有数据变更,MySQL 集群自身的主从同步就是基于 Binlog 做的。


本文主要从 Binlog 实时采集和离线处理 Binlog 还原业务数据两个方面,来介绍如何实现 DB 数据准确、高效地进入数仓。

整体架构


整体的架构如上图所示。在 Binlog 实时采集方面,我们采用了阿里巴巴的开源项目 Canal,负责从 MySQL 实时拉取 Binlog 并完成适当解析。Binlog 采集后会暂存到 Kafka 上供下游消费。整体实时采集部分如图中红色箭头所示。


离线处理 Binlog 的部分,如图中黑色箭头所示,通过下面的步骤在 Hive 上还原一张 MySQL 表:


  1. 采用 Linkedin 的开源项目 Camus,负责每小时把 Kafka 上的 Binlog 数据拉取到 Hive 上。

  2. 对每张 ODS 表,首先需要一次性制作快照(Snapshot),把 MySQL 里的存量数据读取到 Hive 上,这一过程底层采用直连 MySQL 去 Select 数据的方式。

  3. 对每张 ODS 表,每天基于存量数据和当天增量产生的 Binlog 做 Merge,从而还原出业务数据。


我们回过头来看看,背景中介绍的批量取数并 Load 方案遇到的各种问题,为什么用这种方案能解决上面的问题呢?


  • 首先,Binlog 是流式产生的,通过对 Binlog 的实时采集,把部分数据处理需求由每天一次的批处理分摊到实时流上。无论从性能上还是对 MySQL 的访问压力上,都会有明显地改善。

  • 第二,Binlog 本身记录了数据变更的类型(Insert/Update/Delete),通过一些语义方面的处理,完全能够做到精准的数据还原。

Binlog 实时采集

对 Binlog 的实时采集包含两个主要模块:一是 CanalManager,主要负责采集任务的分配、监控报警、元数据管理以及和外部依赖系统的对接;二是真正执行采集任务的 Canal 和 CanalClient。



当用户提交某个 DB 的 Binlog 采集请求时,CanalManager 首先会调用 DBA 平台的相关接口,获取这一 DB 所在 MySQL 实例的相关信息,目的是从中选出最适合 Binlog 采集的机器。然后把采集实例(Canal Instance)分发到合适的 Canal 服务器上,即 CanalServer 上。在选择具体的 CanalServer 时,CanalManager 会考虑负载均衡、跨机房传输等因素,优先选择负载较低且同地域传输的机器。


CanalServer 收到采集请求后,会在 ZooKeeper 上对收集信息进行注册。注册的内容包括:


  • 以 Instance 名称命名的永久节点。

  • 在该永久节点下注册以自身 ip:port 命名的临时节点。


这样做的目的有两个:


  • 高可用:CanalManager 对 Instance 进行分发时,会选择两台 CanalServer,一台是 Running 节点,另一台作为 Standby 节点。Standby 节点会对该 Instance 进行监听,当 Running 节点出现故障后,临时节点消失,然后 Standby 节点进行抢占。这样就达到了容灾的目的。

  • 与 CanalClient 交互:CanalClient 检测到自己负责的 Instance 所在的 Running CanalServer 后,便会进行连接,从而接收到 CanalServer 发来的 Binlog 数据。


对 Binlog 的订阅以 MySQL 的 DB 为粒度,一个 DB 的 Binlog 对应了一个 Kafka Topic。底层实现时,一个 MySQL 实例下所有订阅的 DB,都由同一个 Canal Instance 进行处理。这是因为 Binlog 的产生是以 MySQL 实例为粒度的。CanalServer 会抛弃掉未订阅的 Binlog 数据,然后 CanalClient 将接收到的 Binlog 按 DB 粒度分发到 Kafka 上。

离线还原 MySQL 数据

完成 Binlog 采集后,下一步就是利用 Binlog 来还原业务数据。首先要解决的第一个问题是把 Binlog 从 Kafka 同步到 Hive 上。


Kafka2Hive

整个 Kafka2Hive 任务的管理,在美团数据平台的 ETL 框架下进行,包括任务原语的表达和调度机制等,都同其他 ETL 类似。而底层采用 LinkedIn 的开源项目 Camus,并进行了有针对性的二次开发,来完成真正的 Kafka2Hive 数据传输工作。

对 Camus 的二次开发

Kafka 上存储的 Binlog 未带 Schema,而 Hive 表必须有 Schema,并且其分区、字段等的设计,都要便于下游的高效消费。对 Camus 做的第一个改造,便是将 Kafka 上的 Binlog 解析成符合目标 Schema 的格式。


对 Camus 做的第二个改造,由美团的 ETL 框架所决定。在我们的任务调度系统中,目前只对同调度队列的任务做上下游依赖关系的解析,跨调度队列是不能建立依赖关系的。而在 MySQL2Hive 的整个流程中,Kafka2Hive 的任务需要每小时执行一次(小时队列),Merge 任务每天执行一次(天队列)。而 Merge 任务的启动必须要严格依赖小时 Kafka2Hive 任务的完成。


为了解决这一问题,我们引入了 Checkdone 任务。Checkdone 任务是天任务,主要负责检测前一天的 Kafka2Hive 是否成功完成。如果成功完成了,则 Checkdone 任务执行成功,这样下游的 Merge 任务就可以正确启动了。

Checkdone 的检测逻辑

Checkdone 是怎样检测的呢?每个 Kafka2Hive 任务成功完成数据传输后,由 Camus 负责在相应的 HDFS 目录下记录该任务的启动时间。Checkdone 会扫描前一天的所有时间戳,如果最大的时间戳已经超过了 0 点,就说明前一天的 Kafka2Hive 任务都成功完成了,这样 Checkdone 就完成了检测。


此外,由于 Camus 本身只是完成了读 Kafka 然后写 HDFS 文件的过程,还必须完成对 Hive 分区的加载才能使下游查询到。因此,整个 Kafka2Hive 任务的最后一步是加载 Hive 分区。这样,整个任务才算成功执行。


每个 Kafka2Hive 任务负责读取一个特定的 Topic,把 Binlog 数据写入 original_binlog 库下的一张表中,即前面图中的 original_binlog.*db*,其中存储的是对应到一个 MySQL DB 的全部 Binlog。



上图说明了一个 Kafka2Hive 完成后,文件在 HDFS 上的目录结构。假如一个 MySQL DB 叫做 user,对应的 Binlog 存储在 original_binlog.user 表中。ready 目录中,按天存储了当天所有成功执行的 Kafka2Hive 任务的启动时间,供 Checkdone 使用。每张表的 Binlog,被组织到一个分区中,例如 userinfo 表的 Binlog,存储在 table_name=userinfo 这一分区中。每个 table_name 一级分区下,按 dt 组织二级分区。图中的 xxx.lzo 和 xxx.lzo.index 文件,存储的是经过 lzo 压缩的 Binlog 数据。

Merge

Binlog 成功入仓后,下一步要做的就是基于 Binlog 对 MySQL 数据进行还原。Merge 流程做了两件事,首先把当天生成的 Binlog 数据存放到 Delta 表中,然后和已有的存量数据做一个基于主键的 Merge。Delta 表中的数据是当天的最新数据,当一条数据在一天内发生多次变更时,Delta 表中只存储最后一次变更后的数据。


把 Delta 数据和存量数据进行 Merge 的过程中,需要有唯一键来判定是否是同一条数据。如果同一条数据既出现在存量表中,又出现在 Delta 表中,说明这一条数据发生了更新,则选取 Delta 表的数据作为最终结果;否则说明没有发生任何变动,保留原来存量表中的数据作为最终结果。Merge 的结果数据会 Insert Overwrite 到原表中,即图中的 origindb.*table*。

Merge 流程举例

下面用一个例子来具体说明 Merge 的流程。



数据表共 id、value 两列,其中 id 是主键。在提取 Delta 数据时,对同一条数据的多次更新,只选择最后更新的一条。所以对 id=1 的数据,Delta 表中记录最后一条更新后的值 value=120。Delta 数据和存量数据做 Merge 后,最终结果中,新插入一条数据(id=4),两条数据发生了更新(id=1 和 id=2),一条数据未变(id=3)。


默认情况下,我们采用 MySQL 表的主键作为这一判重的唯一键,业务也可以根据实际情况配置不同于 MySQL 的唯一键。


上面介绍了基于 Binlog 的数据采集和 ODS 数据还原的整体架构。下面主要从两个方面介绍我们解决的实际业务问题。

实践一:分库分表的支持

随着业务规模的扩大,MySQL 的分库分表情况越来越多,很多业务的分表数目都在几千个这样的量级。而一般数据开发同学需要把这些数据聚合到一起进行分析。如果对每个分表都进行手动同步,再在 Hive 上进行聚合,这个成本很难被我们接受。因此,我们需要在 ODS 层就完成分表的聚合。



首先,在 Binlog 实时采集时,我们支持把不同 DB 的 Binlog 写入到同一个 Kafka Topic。用户可以在申请 Binlog 采集时,同时勾选同一个业务逻辑下的多个物理 DB。通过在 Binlog 采集层的汇集,所有分库的 Binlog 会写入到同一张 Hive 表中,这样下游在进行 Merge 时,依然只需要读取一张 Hive 表。


第二,Merge 任务的配置支持正则匹配。通过配置符合业务分表命名规则的正则表达式,Merge 任务就能了解自己需要聚合哪些 MySQL 表的 Binlog,从而选取相应分区的数据来执行。


这样通过两个层面的工作,就完成了分库分表在 ODS 层的合并。


这里面有一个技术上的优化,在进行 Kafka2Hive 时,我们按业务分表规则对表名进行了处理,把物理表名转换成了逻辑表名。例如 userinfo123 这张表名会被转换为 userinfo,其 Binlog 数据存储在 original_binlog.user 表的 table_name=userinfo 分区中。这样做的目的是防止过多的 HDFS 小文件和 Hive 分区造成的底层压力。

实践二:删除事件的支持

Delete 操作在 MySQL 中非常常见,由于 Hive 不支持 Delete,如果想把 MySQL 中删除的数据在 Hive 中删掉,需要采用“迂回”的方式进行。


对需要处理 Delete 事件的 Merge 流程,采用如下两个步骤:


  • 首先,提取出发生了 Delete 事件的数据,由于 Binlog 本身记录了事件类型,这一步很容易做到。将存量数据(表 A)与被删掉的数据(表 B)在主键上做左外连接(Left outer join),如果能够全部 join 到双方的数据,说明该条数据被删掉了。因此,选择结果中表 B 对应的记录为 NULL 的数据,即是应当被保留的数据。

  • 然后,对上面得到的被保留下来的数据,按照前面描述的流程做常规的 Merge。


总结与展望

作为数据仓库生产的基础,美团数据平台提供的基于 Binlog 的 MySQL2Hive 服务,基本覆盖了美团内部的各个业务线,目前已经能够满足绝大部分业务的数据同步需求,实现 DB 数据准确、高效地入仓。在后面的发展中,我们会集中解决 CanalManager 的单点问题,并构建跨机房容灾的架构,从而更加稳定地支撑业务的发展。


本文主要从 Binlog 流式采集和基于 Binlog 的 ODS 数据还原两方面,介绍了这一服务的架构,并介绍了我们在实践中遇到的一些典型问题和解决方案。希望能够给其他开发者一些参考价值,同时也欢迎大家和


2020-02-25 20:331664

评论

发布
暂无评论
发现更多内容

IMC总决赛精彩对战应接不暇,英特尔酷睿极致性能燃爆比赛现场!

E科讯

阿里大牛说:你凭什么搞不懂SpringBoot,Cloud,Nginx与Docker

小Q

Java 学习 编程 架构 面试

Teambition 网盘 VS 阿里云盘:阿里这个浓眉大眼的也开始玩赛马了?

郭旭东

阿里云 阿里云网盘

数字投票时代即将到来

CECBC

数字投票

Flutter Bloc模式

码爷

flutter ios 程序员

云图说|多模态AI开发套件HiLens Kit:超强算力彰显云上实力

华为云开发者联盟

人工智能 开发者 物联网 机器人 华为云

手把手教你本地 k8s 集群搭建云原生 Tekton CICD 流水线

比伯

Java 大数据 编程 架构 计算机

高交会科技盛宴:“科技改变生活,创新驱动发展”

13530558032

美国区块链政策大盘点

CECBC

区块链 政策 货币

LeetCode题解:剑指 Offer 22. 链表中倒数第k个节点,双指针,JavaScript,详细注释

Lee Chen

算法 大前端 LeetCode

轻松云上揽胜中华,靠的就是这份聪明的“地图”!

华为云开发者联盟

MySQL 数据库 postgresql AI 地图

握草!美团P8整理的280页超详细Docker实战文档简直太香了,让你对如日中天的Docker有更深入的了解。

Java架构之路

Java 程序员 架构 面试 编程语言

亲测三遍!8步搭建一个属于自己的网站

华为云开发者联盟

MySQL Linux 开发者 网站 华为云

【涂鸦物联网足迹】涂鸦云平台接口列表

IoT云工坊

人工智能 接口 物联网 API 智能家居

“双11”购物狂欢节,所有女生走进了谁的直播间?

博睿数据

APM AIOPS 拨测 直播 用户体验

双11购物节国外剁手党同狂欢 阿里云视频云电商直播实时字幕

阿里云CloudImagine

云直播 直播 直播带货 语音识别

加快脑动脉瘤检测,AI来了

华为云开发者联盟

人工智能 学习 算法 华为云 医疗AI

《迅雷链精品课》第三课:区块链主流框架分析

迅雷链

区块链 区块链方案 区块链+ 区块链应用

微众银行大数据平台建设方案

康月牙

大数据 开源 金融 平台 微众银行

HTTPDNS开源 Android SDK,赋能更多开发者参与共建

移动研发平台EMAS

android 阿里云 开源 httpdns 移动研发平台

JVM真香系列:方法区、堆、栈之间到底有什么关系

田维常

Java JVM 堆栈 方法区 Java虚拟机

java-File对象

Isuodut

Github标星67.9k的微服务架构以及架构设计模式笔记我真的爱了

Java架构之路

Java 程序员 架构 面试 编程语言

数据结构与算法系列之递归(GO)

书旅

数据结构与算法 Go 语言

【Swift实现代码】iOS架构模式之MVP

码爷

ios swift 架构

企业级软件的核心价值

Philips

敏捷开发 企业应用

区块链有了几个新“标准”!

CECBC

区块链 版权保护

DeFi质押挖矿系统开发技术

薇電13242772558

区块链 defi

为什么我就面试阿里P6,好不容易过2面,3面来个架构师来吊打我?

小Q

Java 学习 程序员 架构 面试

《精通Tomcat:Java Web应用开发、框架分析与案例实战》.pdf

田维常

tomcat

多线程并发主题-ThreadLocalRandom类

公众号:程序猿成神之路

Java 并发编程 线程

美团DB数据同步到数据仓库的架构与实践_文化 & 方法_美团技术团队_InfoQ精选文章