写点什么

干货 | 百万 QPS,秒级延迟,携程基于实时流的大数据基础层建设

2021 年 4 月 11 日

干货 | 百万QPS,秒级延迟,携程基于实时流的大数据基础层建设

一、背景

2017 年 9 月携程金融成立,本着践行金融助力旅行的使命,开始全面开展集团风控和金融业务,需要在携程 DC 构建统一的金融数据中心,实现多地多机房间的数据融合,满足离线和在线需求;涉及数千张 mysql 表到离线数仓 、实时数仓、在线缓存的同步工作。由于跨地域、实时性、准确性、完整性要求高,集团内二次开发的 DataX(业界常用的离线同步方案)无法支持。以 mysql-hive 同步为例,DataX 通过直连 MySQL 批量拉取数据,存在以下问题:


1)性能瓶颈:随着业务规模的增长,离线批量拉取的数据规模越来越大,影响 mysql-hive 镜像表的产出时间,进而影响数仓下游任务。对于一些需要 mysql-hive 小时级镜像的场景更加捉襟见肘。

2)影响线上业务:离线批量拉取数据,可能引起慢查询,影响业务库的线上服务。

3)无法保证幂等:由于线上库在实时更新,在批量拉取 SQL 不变的情况下,每次执行可能产生不一样的结果。比如指定了 create_time 范围,但一批记录的部分字段(比如支付状态)时刻在变化。也即无法产出一个明确的 mysql-hive 镜像 , 对于一些对时点要求非常高的场景(比如离线对账) 无法接受。

4)缺乏对 DELETE 的支持:业务库做了 DELETE 操作后,只有整表全量拉取,才能在 Hive 镜像里体现。

二、方案概述

基于上述背景,我们设计了一套基于 binlog 实时流的数据基础层构建方案,并取得了预期效果。架构如图,各模块简介:


1)webUI 做 binlog 采集的配置,以及 mysql->hive,mysql→实时数仓,mysql→在线缓存的镜像配置工作。

2)canal 负责 binlog 采集 ,写入 kafka ;其中 kafka 在多地部署,并通过专线实现 topic 的实时同步。

3)spark-streaming 负责将 binlog 写入 HDFS。

4)merge 离线调度的 ETL 作业,负责将 HDFS 增量和 snap 合并成新的 snap。

5)mirror 负责将 binlog 事件更新到实时数仓、在线缓存。

6)基础服务:包括历史数据的重放,数据校验,全链路监控,明文检测等功能。


三、详细介绍

本章将以 mysql-hive 镜像为例,对技术方案做详细介绍。

3.1.binlog 采集

canal 是阿里巴巴开源的 Mysql binlog 增量订阅和消费组件,在业界有非常广泛的应用,通过实时增量采集 binlog ,可以降低对 mysql 的压力,细粒度的还原数据的变更过程,我们选型 canal 作为 binlog 采集的基础组件,根据应用场景做了二次开发,其中 raw binlog → simple binlog 的消息格式转换是重点。

下面是 binlog 采集的架构图:



canal 在 1.1.4 版本引入了 canal-admin 工程,支持面向 WebUI 的管理能力;我们采用原生的 canal-admin 对 binlog 采集进行管理 ,采集粒度是 mysql instance 级别。


Canal Server 会向 canalAdmin 拉取所属集群下的所有 mysql instance 列表,针对每个 mysql instance 采集任务,canal server 通过在 zookeeper 创建临时节点的方式实现 HA,并通过 zookeeper 实现 binlog position 的共享。


canal 1.1.1 版本引入 MQProducer 原生支持 kafka 消息投递 , 图中 instance active 从 mysql 获取实时的增量 raw binlog 数据,在 MQProducer 环节进行 raw binlog → simple binlog 的消息转换,发送至 kafka。我们按照 instance 创建了对应的 kafka topic,而非每个 database 一个 topic , 主要考虑到同一个 mysql instance 下有多个 database,过多的 topic (partition) 导致 kafka 随机 IO 增加,影响吞吐。发送 Kafka 时以 schemaName+tableName 作为 partitionKey,结合 producer 的参数控制,保证同一个表的 binlog 消息按顺序写入 kafka。


参考 producer 参数控制:

max.in.flight.requests.per.connection=1retries=0acks=all
复制代码

topic level 的配置:

topic partition 3副本, 且min.insync.replicas=2
复制代码


从保证数据的顺序性、容灾等方面考虑,我们设计了一个轻量级的 SimpleBinlog 消息格式:


  • binlogOffset:全局序列 ID,由 ${timestamp}${seq} 组成,该字段用于全局排序,方便 Hive 做 row_number 取出最新镜像,其中 seq 是同一个时间戳下自增的数字,长度为 6。

  • executeTime:binlog 的执行时间。

  • eventType:事件类型:INSERT,UPDATE,DELETE。

  • schemaName:库名,在后续的 spark-streaming,mirror 处理时,可以根据分库的规则,只提取出前缀,比如(ordercenter_001 → ordercenter) 以屏蔽分库问题。

  • tableName:表名,在后续的 spark-streaming,mirror 处理时,可以根据分表规则,只提取出前缀,比如(orderinfo_001 → orderinfo ) 以屏蔽分表问题。

  • source:用于区分 simple binlog 的来源,实时采集的 binlog 为 BINLOG, 重放的历史数据为 MOCK 。

  • version:版本

  • content:本次变更的内容,INSERT,UPDATE 取 afterColumnList,DELETE 取 beforeColumnList。


金融当前部署了 4 组 canal 集群,每组 2 个物理机节点,跨机房 DR 部署,承担了数百个 mysql instance binlog 采集工作。Canal server 自带的性能监控基于 Prometheus 实现,我们通过实现 PrometheusScraper 主动拉取核心指标,推送到集团内部的 Watcher 监控系统上,配置相关报警,其中各 mysql instance 的 binlog 采集延迟是全链路监控的重要指标。


系统上线初期遇到过 canal-server instance 脑裂的问题,具体场景是 active instance 所在的 canal-server ,因网络问题与 zookeeper 的链接超时,这时候 standby instance 会抢占创建临时节点,成为新的 active;也就出现了 2 个 active 同时采集并推送 binlog 的情况。解决办法是 active instance 与 zookeeper 链接超时后,立即自 kill,再次发起下一轮抢占。

3.2 历史数据重放

有两个场景需要我们采集历史数据:

1)首次做 mysql-hive 镜像 ,需要从 mysql 加载历史数据;

2)系统故障(丢数等极端情况),需要从 mysql 恢复数据。


有两种方案:

1)从 mysql 批量拉取历史数据,上传到 HDFS 。需要考虑批量拉取的数据与 binlog 采集产出的 mysql-hive 镜像的格式差异,比如去重主键的选择,排序字段的选择等问题。

2)流式方式, 批量从 mysql 拉取历史数据,转换为 simple binlog 消息流写入 kafka,同实时采集的 simple binlog 流复用后续的处理流程。在合并产生 mysql-hive 镜像表时,需要确保这部分数据不会覆盖实时采集的 simple binlog 数据。


我们选用了更简单易维护的方案 2,并开发了一个 binlog-mock 服务,可以根据用户给出的库、表(前缀)以及条件,按批次(比如每次 select 10000 行)从 mysql 查询数据,组装成 simple_binlog 消息发送 kafka。


对于 mock 的历史数据,需要注意:

1)保证不覆盖后续实时采集的 binlog:simple binlog 消息里 binlogOffset 字段用于全局排序,它由 ${timestamp}+${seq}组成,mock 的这部分数据 timestamp 为发起 SQL 查询的时间戳向前移 5 分钟,seq 为 000000;  

2)落到哪个分区:我们根据 binlog 事件时间(executeTime) 判断数据所属哪个 dt 分区,mock 的这部分数据 executeTime 为用户指定的一个值,默认为 ${yesterday}。

3.3 Write2HDFS 

我们采用 spark-streaming 将 kafka 消息持久化到 HDFS,每 5 分钟一个批次,一个批次的数据处理完成(持久化到 HDFS)后再提交 consumer offset,保证消息被 at-least-once 处理;同时也考虑了分库分表问题、数据倾斜问题:


屏蔽分库分表:以订单表为例,mysql 数据存储在 ordercenter_00 ... ordercenter_99 100 个库,每个库下面又有 orderinfo_00...orderinfo_99 100 张表,库前缀 schemaNamePrefix=ordercenter,表前缀 tableNamePrefix=orderinfo,统一映射到 tableName=${schemaNamePrefix}_${tableNamePrefix}里; 根据 binlog executeTime 字段生成对应的分区 dt,确保同一个库表同一天的数据落到同一个分区目录里:  base_path/ods_binlog_source.db/${database_prefix}_${table_prefix}/dt={binlogDt}/binlog-{timestamp}-{rdd.id}


防止数据倾斜:  系统上线初期经常出现数据倾斜问题,排查发现某些时间段个别表由于业务跑批等产生的 binlog 量特别大,一张表一个批次的数据需要写入同一个 HDFS 文件,单个 HDFS 文件的写入速度成为瓶颈。因此增加了一个环节(Step2),过滤出当前批次里的“大表",将这些大表的数据分散写入多个 HDFS 文件里。 


base_path/ods_binlog_source.db/${database_prefix}_${table_prefix}/dt={binlogDt}/binlog-{timestamp}-{rdd.id}-[${randomInt}]


3.4 生成镜像

3.4.1  数据就绪检查     

spark-streaming 作业每 5 分钟一个批次将 kafka simple_binlog 消息持久化到 HDFS,merge 任务是每天执行一次。每天 0 点 15 分,开始进行数据就绪检查。我们对消息的全链路进行了监控,包括 binlog 采集延迟 t1 、kafka 同步延迟 t2 、spark-streaming consumer 延迟 t3。假设当前时间为凌晨 0 点 30 分,设为 t4,若 t4>(t1+t2+t3) 说明 T-1 日数据已全部落入 HDFS,即可执行下游的 ETL 作业(merge)。



3.4.2  Merge

HDFS 上的 simple binlog 数据就绪后,下一步就是对相应 MySQL 业务表数据进行还原。以下是 Merge 的执行流程,步骤如下:


1)加载 T-1 分区的 simple binlog 数据

数据就绪检查通过后,通过 MSCK REPAIR PARTITION 加载 T-1 分区的 simple_binlog 数据,注意:这个表是原始的 simple binlog 数据,并未平铺具体 mysql 表的字段。如果是首次做 mysql-hive 镜像,历史数据重放的 simple binlog 也会落入 T-1 分区。


2)检查 Schema ,并抽取 T-1 增量

请求 mirror 后台,获取最新的 mysql schema,如果发生了变更则更新 mysql-hive 镜像表(snap),让下游无感知;同时根据 mysql schema 的 field 列表 、以及"hive 主键" 等配置信息,从上述 simple_binlog 分区抽取出 mysql 表的 T-1 日明细数据 (delta)。


3)判断业务库是否发生了归档操作,以决定后续合并时是否忽略 DELETE 事件。

业务 DELETE 数据有 2 种情况:业务修单等引起的正常 DELETE,需要同步变更到 Hive;业务库归档历史数据产生的 DELETE,这类 DELETE 操作需要忽略掉。


系统上线初期,我们等待业务或 DBA 通知,然后手工处理,比较繁琐,很多时候会有通知不到位的情况,导致 Hive 数据缺失历史数据。为了解决这个问题,在 Merge 之前进行程自动判断,参考规则如下:


a)业务归档通常是大批量的 DELETE(百万+),因此可以设置一个阈值,比如 500W 或日增量的 7 倍。 


b)业务归档的时间段通常比较久,比如设置阈值为 30 天。如果满足了条件 1,且删除的这些数据在 30 天以前,则属于归档产生的 DELETE。


4)对增量数据(delta)和当前快照(snap T-2)进行合并去重,得到最新 snap T-1。



下面通过一个例子说明 merge 的过程,假设订单 order 表共有 id,order_no,amount 三个字段,id 是全局唯一建;  snap 表 t3 是 mysql-hive 镜像,merge 过程如图展示。


1)加载目标(dt=T-1)分区里的 simple binlog 数据,表格式如 t1;

2)请求 mirror 后台获取 mysql 的最新 schema,从 t1 抽取数据到临时表 t2; 

3)snap 表 t3 与 mysql schema 进行适配(本例无变更);   

4)对增量表 t2、存量 snap t3 进行 union(对 t3 自动增加 type 列,值为 INSERT),得到临时表 t4;

5)对 t4 表按唯一键 id 进行 row_number,分组按 binlogOffset 降序排序,序号为 1 的即为最新数据。


3.4.3  check

在数据 merge 完成后,为了保证 mysql-hive 镜像表中数据准确性,会对 hive 表和 mysql 表进行字段和数据量对比,做好最后一道防线。我们在配置 mysql-hive 镜像时,会指定一个检查条件,通常是按 createTime 字段对比 7 天的数据;mirror 后台每天凌晨会预先从 mysql 统计出过去 7 日增量,离线任务通过脚本(http)获取上述数据,和 snap 表进行校验。实践中遇到一些问题:


1)T-1 的 binlog 落在 T 分区的情况

check 服务根据 createTime 生成查询条件去 check mysql 和 Hive 数据,由于业务 sql 里的 createTime 和 binlog executeTime 不一致,分别为凌晨时刻的前后 1 秒,会导致 Hive 里漏掉这条数据,这种情况可以通过一起加载 T 日分区的 binlog 数据,重新 merge。


2)业务表迁移,原表停止更新,虽然 mysql 和 hive 数据量一致,但已经不符合要求了,这种情况可以通过波动率发现。

3.5 其他    

在实践中,可根据需要在 binlog 采集以及后续的消息流里引入一些数据治理工作。比如:

1)明文检测:binlog 采集环节对核心库表数据做实时明文检测,可以避免敏感数据流入数仓;

2)标准化:一些字段的标准化操作,比如 id 映射、不同密文的映射;

3)元数据:mysql→hive 镜像是数仓 ODS 的核心,可以根据采集配置信息,实现二者映射关系的双向检索,便于数仓溯源。这块是金融元数据管理的重要组成部分。

通过消费 binlog 实现 mysql 到实时数仓(kudu、es)、在线缓存(redis)的镜像逻辑相对简单,限于篇幅,本文不再赘述。

四、总结与展望

金融基于 binlog 的数据基础层构建方案,顺利完成了预期目标:


1)金融数据中心建设(ODS 层):数千张 mysql 表到携程 DC 的镜像, 全部 T+1 1:30 产出;

2)金融实时数仓建设:金融核心 mysql 表到 kudu 的镜像,支持实时分析、分表合并查询等偏实时的运营场景;

3)金融在线缓存服务:异地多活,缓存近 1000G 业务数据;支撑整个消金入口、风控业务近 100W/min 的请求。


该方案已经成为金融在线和离线服务的基石,并在持续扩充使用场景。未来会在自动化配置(整合 mirror-admin 和 canal-admin,实现一键构造)、智能运维(数据 check 异常的识别与恢复)、元数据管理方面做更多的投入。


本文介绍了携程金融构建大数据基础层的技术方案,着重介绍了 binlog 采集和 mysql-hive 镜像的设计,以及实践中遇到的一些问题及解决办法。希望能给大家带来一些参考价值,也欢迎大家一起来交流。


本文转载自:携程技术(ID:ctriptech)

原文链接:干货 | 百万QPS,秒级延迟,携程基于实时流的大数据基础层建设

2021 年 4 月 11 日 07:002562

评论

发布
暂无评论
发现更多内容

Flink 支持的重启策略有哪些

古月木易

flink

django-admin和manage.py用法

BigYoung

Python django django-admin manage.py

什么是死锁?如何解决死锁?

古月木易

死锁

一文了解greenplum

数据社

数据库 greenplum MPP

关于微服务架构的思考和认知

任小龙

微服务和DDD总结

周冬辉

微服务 DDD

第十周学习总结

赵龙

Kubernetes 网络通讯模型解析

ninetyhe

Django如何编写自定义manage.py 命令

BigYoung

Python django manage.py

解决 WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED

邵俊达

Linux SSH

anyRTC 4.0 以心铸造,以梦相承

anyRTC开发者

anyRTC 4.0 官网升级

报警不响,黄金万两的“稳定性成熟度”干货

滴滴普惠出行

Jira 和 Confluence 企业最佳部署方式

Atlassian

项目管理 敏捷开发 Atlassian Jira

数据库的那些事

数据社

数据库 大数据

微服务架构的思考

Season

幂律分布 - 世界是不公平的

石云升

幂律分布 正态分布 二八法则

什么是死锁?如何解决死锁?

奈学教育

聊聊数据库

数据社

数据库 大数据

CDH部署指南

数据社

大数据 CDH

架构师训练营第十章作业

叮叮董董

芯片破壁者(十一):回看日本半导体的倾塌

脑极体

Dubbo微服务框架请求流程

GalaxyCreater

架构

Flink 支持的重启策略有哪些

奈学教育

flink

凉了!张三同学没答好「进程间通信」,被面试官挂了....

小林coding

操作系统 计算机基础 进程

一周信创舆情观察(8.3~8.9)

统小信uos

数据中台建设方法论

数据社

大数据 数据中台

架构师训练营 week10 summary

Nick

架构师训练营 week10 homework

Nick

第十周作业

方堃

架构师训练营第十章总结

叮叮董董

第十周命题作业

赵龙

干货 | 百万QPS,秒级延迟,携程基于实时流的大数据基础层建设-InfoQ