Uber 推出数据湖集成神器 DBEvents,支持 MySQL、Cassandra 等

阅读数:2306 2019 年 3 月 26 日

在全球市场保持 Uber 平台的可靠性和实时性是一项 7*24 小时不能间断的任务。当旧金山的人们进入梦乡时,巴黎的上班族们正发送着 Uber 车辆订单准备出门工作。而同一时刻在地球的另一端,孟买的居民可能正在用 Uber Eats 订购晚餐。

我们在 Uber 的大数据平台上促成各种互动,使用我们的Marketplace来匹配乘客和司机;食客、餐馆和配送伙伴;货车司机和运货人。从数据的角度来洞察这些交互有助于我们为全球用户提供优质且有意义的产品体验。

食客们希望食物能及时送达,乘客也希望在最短的时间内被接到,我们的数据必须尽可能快的反映出现场发生的事件。但随着四面八方的数据汇入我们的数据湖,在这种规模下保持数据的新鲜度成为了一项重大的挑战。

虽然现在已经有一些为公司提供 24 小时数据新鲜度的方案,但对于 Uber 的实时性需求来说还是过时了。此外,对于 Uber 的数据规模和运营规模,这种方案无法保证可靠运行。

为了满足我们的特殊需求,我们开发了 DBEvents——一种专为高数据质量和新鲜度而设计的变更数据获取系统。变更数据获取系统(CDC, Change Data Capture System)可以用来确定哪些数据发生了变更,以便采取一些操作,比如获取或是复制。DBEvents 有助于引导、获取已有表格的快照,以及增量的流式更新。

作为 Uber 其他软件(例如MarmarayHudi)的补充,DBEvents 从 MySQL、Apache Cassandra 和 Schemaless 中获取数据,以更新我们的 Hadoop 数据湖。这个解决方案可管理 PB 级的数据并可在全球范围内运营,帮助我们为内部数据客户提供最好的服务。

快照数据获取

从历史上看,Uber 的数据获取一般会先确认要获取的数据集,然后使用 MapReduce 或是 Apache Spark 运行一个大型处理作业,从源数据库或表中高并发地读取数据。接下来,我们会将这个作业的输出发送到离线的数据湖,如 HDFS 或 Apache Hive。我们把这个过程称为快照,根据数据集大小的不同,一般会花费几分钟到几小时的时间,这对于我们内部客户的需求来说还不够快。

当一个作业开始获取数据时,它会分散成多个并行任务,与上游的表格(如 MySQL)建立并行的连接并拉取数据。从 MySQL 读取大量数据会对其实时应用流量施加很大的压力。我们可以使用专用的服务器执行 ETL 操作来减轻压力,但这会影响到数据的完整性,也会因为这个备份的数据库服务器增加额外的硬件成本。

获取数据库或表的时间会随着数据量的增加而延长,并在某些时刻无法再满足业务的需求。由于大多数的数据库每天仅更新部分数据,只有极少量的新纪录会被添加,而整个快照过程会一遍又一遍地读取和写入整张表的数据,包括未修改的行,这会导致计算和存储资源无法被有效利用。

DBEvents 的要求

为了 Uber 对更新鲜更快速的数据洞察需求,我们需要设计一种更好的方式来将数据提取到数据湖中。当我们开始设计 DBEvents 时,为最终的解决方案定义了三个业务要求:新鲜度、质量和效率。

新鲜度

数据的新鲜度指它更新得有多频繁。假设在时间 t1 更新 MySQL 表中的一行。数据提取作业在时间 t1+1 时刻开始运行,并消耗 N 个单位时间完成作业。则用户可以在 t1+1+N 时刻获得数据。这里,数据的新鲜度延迟是 N+1,即数据实际更新到数据湖中并可被获取的时间延迟。

Uber 有很多用例都要求 N+1 尽可能的小,最好是几分钟。这些用例包括欺诈检测,因为即使是最轻微的延迟也会影响到客户的体验。出于这些原因,我们在 DBEvents 中将数据新鲜度的要求排在了最高优先级。

质量

如果我们无法描述或理解数据湖中的数据,它们是无法发挥出价值的。设想不同的上游服务对不同的表有不同的 schema。尽管这些表在创建时都有一个 schema,但这些 schema 会随着用例的变化而变化。如果对于摄入的数据没有一个一致的方法来定义和更新 schema,数据湖很快就会变成一个数据沼泽——一大堆难以被理解使用的数据。

另外,随着表 schema 的演进,搞明白为什么要增加字段或是弃用原有字段很重要。如果不了解每列数据代表的含义,就很难理解数据的意义。因此,确保数据的高质量是 DBEvents 另一个优先考虑的要求。

效率

在 Uber,我们有数以千计的微服务来负责业务逻辑的不同部分以及不同的业务线。在大多数情况下,每个微服务都有一个或多个备用数据库来存储长期的数据。可以想象,这会导致成百上千的表格都需要被读取,这需要大量的计算和存储资源。

因此,DBEvents 的第三个设计目标是使系统更高效。通过对存储和计算资源的使用优化,我们最终降低了数据中心使用和工程时间的成本,并能在未来加入更多的数据源。

设计 DBEvents

结合这三个需求,我们构建了 Uber 的变更数据获取系统 DBEvents,用于增量地感知并获取数据的变更,从而改善我们平台的使用体验。

数据集的获取可以分为两个步骤:

  1. 引导:一个表格在某个时间点快照的表现

  2. 增量获取:对表格进行增量的获取并实施变更(上游发生的)

引导(Bootstrap)

我们开发了一个可插拔数据源的库,来引导例如 Cassandra,Schemaless 和 MySQL 等外部数据源通过我们的摄取平台 Marmaray 将数据导入数据湖。这个库提供了有效引导数据集所需的语意,同时提供了能够添加任何数据源的可插拔架构。每个外部数据源都会将其原始数据的快照备份至 HDFS。

快照备份完成后,Marmaray 会调用库,依次读取备份数据,并将其作为 Marmaray 可用的 Spark RDD 来提供。在执行可选的去重、部分行合并及其他操作后,Marmaray 会将 RDD 持久化到 Apache Hive。

为了提高获取超大表格时的效率和可靠性,引导的过程是增量的。你可以为数据集设计批次的大小,也可以增量式(可能是并行的)地进行引导,从而避免过大的作业。

image

图 1:我们的可插拔源引导库从 HDFS 备份中读取,为取数平台 Marmaray 准备数据集

MySQL 引导案例

为 MySQL 数据库创建备份一般会先在文件系统中为数据创建一个副本,然后使用本地文件格式将其存储在另一个存储引擎中。这种逐位复制文件的方式被称为物理备份。由于存在索引,被复制的物理文件通常包含重复数据,这会使磁盘上数据集的大小明显增加。

作为 DBEvents 体系的一部分,我们开发并开源了一个名为StorageTapper的服务,它从 MySQL 数据库读取数据,将其转换为模式化的版本,并将事件发布到不同的目的地,例如 HDFS 或者 Apache Kafka。这种在目标存储系统上生成事件的方法使我们能够创建逻辑备份。逻辑备份依赖 StorageTapper 基于原始数据库创建的事件从而在目标系统上重新创建数据集,而不是使用数据集的直接备份。

除了比物理备份具有更高的效率外,逻辑备份还有以下优点:

  • 它们很容易被原始存储服务以外的系统处理,因为数据格式是标准、可用的。
  • 它们不依赖特定版本的 MySQL,从而能提供更好的数据完整性。
  • 它们非常紧凑,不会复制重复的数据。

image

图 2:StorageTapper 从 MySQL 读取二进制变更日志,对 Apache Avro 中的时间进行编码,并将它们发送至 Apache Kafka 或是在 HDFS 中备份。可以用这些事件在其他系统(例如 Apache Hive)中重构数据集。

实现新鲜度

为了使我们的数据足够新鲜,我们需要以小批量的方式来增量地消费和修改数据集。我们的数据湖使用的是 HDFS(一种 append-only 系统)来存储 PB 级的数据。而大部分的分析数据都是用 Apache Parquet 文件格式编写的,这种方式适用于大规模的列扫描,但是无法更新。遗憾的是,由于 HDFS 是 append-only 模式而 Apache Parquet 是不可修改的,用户如果想更新数据集就必须要批量重写整个数据集(在使用 Hive 时,是重写数据集的大量分区)。

为了快速获取数据,我们使用了 Apache Hudi——一个由 Uber 设计的用于管理 HDFS 中所有原始数据集的开源库,它能减少对不可变的数据湖执行 upsert 操作所花费的时间。Apache Hudi 能在数据集上提供原子的 upsert 操作和增量数据流。

MySQL 增量提取案例

除了引导,我们还能使用 StorageTapper 从 MySQL 源执行增量提取。在我们的用例中,StorageTapper 从 MySQL 的二进制日志中读取事件,其中记录了数据库所做的变更。二进制日志中包含了所有 INSERT、UPDATE、DELETE 和 DDL 这些我们称之为二进制日志事件的操作。这些事件会按照数据库变更发生的顺序依次写入日志中。

StorageTapper 读取这些事件,用Apache Avro 的格式对其进行编码,并将它们发送至 Apache Kafka。每条二进制日志事件都是 Kafka 中的一条消息,每条消息对应一整行表格的数据。由于发送到 Apache Kafka 的事件能反映出对原始数据库做变更的顺序,当我们将 Kafka 中的消息应用到另一个数据库时,就会获得与原始数据完全一致的副本。这个方法会比直接从 MySQL 转发数据到另一个数据库使用更少的计算资源。

保证质量

为了确保数据高质量,我们需要先使用 schema 来为数据湖中的数据集定义结构。Uber 使用了一种内部的 schema 管理服务 Schema-Service,它能保证数据湖中的每个数据集都有关联的 schema,并且使 schema 的任何变更都遵从变更规则。这些变更规则保证了 schema 后向的兼容性,以避免影响到这类数据集的消费者。

Schema-Service 使用 Apache Avro 的格式来存储 schema 并执行 schema 的变更。此 schema 通常是上游表 schema 的 1:1 展现。只要变更被接受为向后兼容,自助服务工具就能允许内部用户修改 schema。一旦 schema 以 Apache Avro 的格式变更,一个 DDL 语句就会作用到表来改变实际的 schema。

我们通过 Schema 编码过程将数据模式化(schematized)。schema 执行库(heatpipe)会将数据模式化或编码,就像能对数据进行 schema 检查的瘦客户端。schema 执行库还会向每个变更日志中添加元数据,使其全局标准化,不用考虑数据从哪儿来或是要写到哪里去。确保数据都有 schema 且 schema 都是最新的,意味着我们可以找到并使用数据湖中所有的数据。

image

图 3:DBEvents 的 heatpipe 库对数据进行编码,Schema-Service 是所有 schema 的网关。这是实现将所有数据模式化的方法。

MySQL schema 执行案例

如上所述,用户可以通过 Schema-Service 请求变更 MySQL 的 schema,这能使变更生效并保证它们是向后兼容的。如果请求成功,就可以使用新版本的 schema。每当 StorageTapper 在 MySQL 二进制日志中读取 ALERT TABLE 语句时,它都会检测到这些 schema 的变更。这会触发 StorageTapper 开始用新的 schema 去处理未来的事件。

有效的资源利用

我们发现在较早的 pipeline 中有一些低效率的问题:

  • 计算利用率:大型作业快照整个表并以一定频率重新加载的操作是非常低效的,尤其是在只有少数记录更新的情况下。

  • 上游稳定性:由于经常要加载整个表,作业会对源施加压力,例如大量读取 MySQL 表。

  • 数据准确性:不预先检查数据质量,会导致数据质量变低并且无法给数据湖用户带来良好的体验。

  • 延迟:源表中发生变化的时间到数据湖中可查询的时间之间的延迟很大,这会降低数据的新鲜度。

Hudi 仅消费和应用上游表中更新的行和变更日志来提升我们用 DBEvents 使能的 pipeline 的效率。Hudi 的设计使用增量更新来替代快照,会用更少的计算资源,从而可以改善许多低效率的问题。同时通过读取变更日志,Hudi 不再需要加载整张表,因此可以减轻对上游数据源的压力。

图 4 清晰地描述了这些解决方案是如何在 DBEvents 的增量架构中协同工作的。在 Uber,我们从不同的数据源中拉取数据。每个源都有一个自定义的实现去读取变更日志时间并提供增量变更。举个例子,MySQL 的变更日志通过 StorageTapper 拉取并推送至 Apache Kafka,而如前面提到的,Cassandra 的变更日志则是通过 Cassandra 的变更数据捕获(CDC)功能结合 Uber 特有的集成能力来实现的。

image

图 4:在 DBEvents 中,每种源类型都以统一的消息格式向 Kafka 中发送变更日志事件。

Marmaray 是 Uber 开源的、通用的数据获取和分发库。在较高的层面上,Marmaray 为我们的 DBEvents pipeline 提供了下列功能,以提高整个架构的效率:

  • 通过我们的 schema 管理库和服务,提供高质量的、模式化的数据。
  • 从各类数据存储提取数据至我们的 Apache Hadoop 数据湖。
  • 使用 Uber 内部的工作流编排服务构建 pipeline 来消化和处理获取到的数据,并基于 HDFS 和 Apache Hive 中的数据存储和计算各类业务指标。

无论数据源是什么,单个摄取 pipeline 都会执行相同的有向无环图作业(DAG)。这个过程会依据特定的源来确定运行时的摄取行为,类似于策略设计模式

标准化变更日志事件

我们的目标之一是以一种能被其他内部数据消费者使用的方式(如流式作业和自定义的 pipeline)来标准化变更日志事件。

在标准化 DBEvents 中的变更日志之前,我们需要先解决一些问题:

  • 在模式化负载时如何发现错误,以及我们应该如何处理?
  • 负载有可能很复杂,只有全量负载的一小部分能被更新。我们如何准确的知道更新了什么?
  • 如果这个负载是上游数据库或表中行变更的日志,那么该行的主键是什么?
  • 由于我们使用 Apache Kafka 作为消息总线来发送和接收变更日志,我们如何强制事件使用单调递增的时间戳?

为了回答 DBEvents 用例的这些问题,我们定义了一组 Apache Hadoop 的元数据标题并添加到每个 Kafka 的消息中。通过这种设计,元数据和数据都能使用 heatpipe(使用 Apache Avro)进行编码并通过 Kafka 进行传输。这使我们能标准化一个能被这类事件所有消费者使用的全局的元数据集。这个元数据单独描述每个更新,以及这些更新与之前的更新有怎样的联系。元数据也会遵循 schema 规则被写入 Apache Hive 中一个名为 MetadataStruct 的特殊列。之后用户就可以轻松地查询 MetadataStruct 来获取有关行状态的更详细的信息。

下面,我们会重点介绍在事件中标准化的一些关键元数据字段:

Hadoop 元数据字段
元数据字段 描述
Row Key Row Key 字段是每个源表的唯一键,用于标识行,并根据结果合并部分变更日志。
Reference Key Reference Key 是收到的变更日志的版本,这个版本必须是单调递增的。该键可以用来判断此数据是否是某个特定行最近的更新。
Changelog Columns Changelog Columns 字段是一个数组 <record{“name”:string, “ref_key”:long, “Hadoop_Changelog_Fields”:array}> ,它包含了 column names、ref_key 和 all_changed_fieldnames,会在当前的消息事件中被更新。
Source Source 字段反映了生成变更日志的源表的类型。比如 Apache Kafka、Schemaless、Apache Cassandra 和 MySQL。
Timestamp Timestamp 字段以毫秒为单位标记事件的创建时间。Timestamp 有多种用途,最主要的是用于监控时延和完整性。(我们所说的事件的创建是指,当 StorageTapper 这类数据模式化服务在把事件推向 Kafka 前,实际模式化该事件的时间点。)
isDeleted [True/False]。这是一个布尔值,用于支持 Hive 表中 row_key 的删除。
Error Exception Error Exception 是一个字符串,用于捕获在发送当前的变更日志时遇到的异常或问题(无错时是 null)。如果源数据有任何 schema 的问题,Error Exception 会反映出收到的异常,之后就能用来追踪源的问题或修复 / 重发消息。
Error Source Data Error Source Data 是一个包含了实际数据源错误的字符串(无错时是 null)。如果出现任何有问题的消息,我们就不能将这个字段获取到主表中,应将它移至相应的错误表。可以使用这个数据来和生产者一起进行修复。
ForceUpdate [True/False]。ForceUpdate 是一个布尔值,它可以确保变更日志作用在已有数据之上。在许多情况下,比最新看到的 ref_key 更早的 ref_key 会被视为重复的并跳过。设置这个标志后,会忽略 hadoop_ref_key 字段直接应用变更日志。
Data Center Data Center 字段指的是产生事件的原始数据中心。这个字段非常有利于追踪消息以及调试任何潜在的问题,尤其是在 active-active 或 all-active 架构下。Heatpipe 会根据发布消息的数据中心自动填充这个值。

如上表所述,标准化的元数据使我们的架构具备鲁棒性和普适性。元数据提供了充足的信息使我们能完整地了解每个事件的状态。举个例子,如果事件的模式化或编码有任何问题,定义错误的字段就会被填充,如图 5 所示,我们就可以决定下一步采取什么操作。在 Uber,我们会将错误以及造成问题的实际负载都一起写入错误表中。

image

图 5:所有不符合 schema 标准的数据都会被写入 DBEvents 的错误表

错误表有很多用途:

  • 数据的生产者可以发现未通过 schema 检查的数据并在之后进行修复或发布更新。
  • 数据的操作和工具可以使用错误表来查找和调试丢失的数据。
  • 写入错误表可以确保我们的系统没有任何的数据丢失,因为数据不在实际表中就在错误表中。

下一步工作

有了 DBEvents 提供的增量变更流,我们就能为数据湖提供更快、更新、更高质量的数据。使用这些数据,才能确保 Uber 的服务更有效的运作,如车辆共享和 Uber Eats。

在未来,我们还打算通过以下功能来强化这个项目:

  • 自助服务集成:我们希望能极大地简化数据集加载至 Apache Hive 的过程。为此,我们需要对 DBEvents 的架构做一些增强,这样每个数据源都能无缝地触发引导和增量获取。这需要源系统、摄取系统和数据源监控框架之间的互相集成。
  • 延迟和完整性监控:虽然我们已经构建了模块来提供这部分的信息,但只在 Kafka 数据源做了具体实现。我们希望能为所有类型的数据源增加这部分的功能。

致谢

特别感谢 Reza Shiftehfar,Evan Richards,Yevgeniy Firsov,Shriniket Kale,Basanth Roy,Jintao Guan 以及其他团队成员的贡献!

更多内容,请关注 AI 前线

image