大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

阅读数:3 2020 年 2 月 22 日 09:00

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

复制数据到云

LiveRamp 正在大规模迁移其基础设施到 GCP。在之前的文章中,我们谈到了迁移以及使用谷歌作为云供应商的决定。在本文中,我想着重强调一个为完成本次迁移而必须解决的问题:复制我们的数据到云。

在大数据公司之中,LiveRamp 占据着一个相对小众的领域:我们所有的产品都与热数据类的大数据任务相关。我们确实有一些相对较大的历史数据集,但与我们在线生成和使用的大型数据集相比,这些只是次要问题,我们的数据集预计只能保留几天。并且由于我们的数据访问模式,大多数云数据迁移方案都不适合我们,尤其是像亚马逊的 Snowball 或谷歌的设备传输,这样数据传输方式对我们毫无用处。

更麻烦的是,保存这些热数据的托管设施的网络设备都相当陈旧,它们主要用于数据中心内部的数据传输。硬件升级可能会花费数十万甚至数百万美元,所以我们不得不利用现有资源,将硬件利用到极致,以获得 50Gbps 的 GCP(谷歌云计算平台)互连吞吐量。我们数据基础设施团队的任务是构建工具,从而让应用程序开发人员能够共享这些有限的资源,并在不中断客户服务的基础上将数据放入云中。

Data Replicator

我们所构建的用于满足这些需求的应用程序称为 Data Replicator(数据复制器)。本质上,Data Replicator 是目前大多数 Hadoop 版本都支持的 DistCp 任务的载体,但围绕它的却是一些非常复杂的问题,如分布式作业调度、监控和遵循最小惊喜原则等,我们将在后面详细讨论。

架构

我们使用 LiveRamp 的 daemon_lib 项目来编排一组 Kubernetes 的 pod,这些 pod 来自于一个数据复制请求的提交队列,每个请求都包含一个源文件路径和一个目标文件路径。我们检查数据源的大小:对于小数据集,我们直接使用本地进程将数据复制到其目的地,而对于大数据集,我们向 Hadoop 集群提交一个 Hadoop DistCp 任务,进而执行高度分布式的复制(稍后将详细介绍)。

数据库作为优先队列

像很多应用程序一样,Data Replicator 可以被抽象为 worker 从队列中获取任务并执行它们。然而,我们并没有使用现有的一些典型的分布式队列解决方案,比如 Google 的 Pub Sub 或 Apache 的 Kafka。相反,我们只是使用一些普通的 SQL(结构查询语言)来处理请求。这给了我们一些优势:

Data Replicator 任务可以在任何地方运行 30 秒到 24 小时,我们希望尽可能地避免向用户返回任何类型的失败(或者请求被删除等更糟糕的信息)。这意味着我们需要更显著的持久性和确认特性:只有在工作完成时执行删除操作,而不是在处理开始时。一个请求的生命周期如下:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

大多数分布式队列都不能很好地支持一些处理,例如处理可能需要很长时间,或者 worker 在工作中死亡。如果一个执行者在工作中死亡,请求将被标记为失败,并被放到“挂起”队列或者被手动取消。

同样,原始请求队列对长时间请求的状态的可见性也非常有限。我们希望为人和软件客户端提供一个清晰的系统状态视图。因为这只是 SQL,我们可以任意索引从而为外部应用程序、用户界面或执行器来提供这种可见性:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

具体来说,执行器希望选择优先级最高的挂起请求。大多数队列不支持从多达数千计的候选里获取一个更高优先级的请求。我们的索引如同一个优先队列,查询非常简洁:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

这种细致的索引可以让应用程序团队提交数十万个低优先级请求,而且不会降级系统的健康状况:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

最后也很重要的是,我们的 devops 团队和开发人员对 SQL 和 MySQL 都非常了解,学习一项新技术并构建与之相关的功能,这也意味着巨大的机会成本。选择使用 SQL,我们能最好地利用开发团队中现有的人才。

我们使用 LiveRamp 的 daemon_lib 项目来编排数据库检查流程、将个体标记为选中、处理它们,然后将它们标记为完成或失败。我们利用 Apache Curator 和 Apache ZooKeeper 的内置锁来协调读取和更新数据库的进程。我们还使用 Curator 的临时锁来注册正在完成的工作,这样,如果数据复制任务由于某种方式死亡,这将会被检测到,而其它 worker 会把这项任务返回到队列中。所有这一切给了我们一个非常有弹性的系统,可以确保我们在正确的时间做正确的事,并且不会有任何损失。

需要澄清的是,我们并没有为 Data Replicator 而构建一个守护服务的基础设施。daemon_lib 和数据库后端的队列在 LiveRamp 内部被广泛使用,我们也借助现有的工具快速构建了一个开箱即用的容错系统。

监控

系统健康的可视化是我们设计服务的驱动目标之一。通过使用 LiveRamp 的内部服务处理框架,我们得到了一个非常好的用户界面,该界面显示了所有 Data Replicator 请求的汇总:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

虽然这提供了系统健康的真实信息,但在 LiveRamp 我们更喜欢使用 DataDog 的控制面板来构建时间序列界面,因此我们决定将尽可能多的监控指标推送给 DataDog,并在那里构建一个全面的监控。我们从不同来源获取数据:

  • 我们以前介绍过 MapReduce 计数器在调试 MapReduce 作业时非常有用。Distcp 是一个 MapReduce 任务,因此我们将 Distcp 的计数器推送给 DataDog。
  • 因为我们的工作队列很容易查询,所以我们将挂起的工作队列也推送给 DataDog。
  • 执行者自己推送关于他们正在复制的文件的指标。
  • 我们依靠 GCP API 来提取云互连利用率指标

如下是我们构建的一些更常见的参考图表:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

这是一个关于系统健康的最基本的度量标准,显示了我们工作队列的大小。如果我们没有挂起或失败的请求,那么就表示我们的状态非常好。

当然,原始计数只是一个粗略的度量标准。我们支持请求优先级,这样团队就可以在不影响高优先级流量的情况下提交大量低优先级冷数据的复制任务。通过跟踪每个优先级的请求时间,我们可以确认对时间敏感的请求实际上得到了快速处理:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

我们最重要的系统限制是带宽。我们跟踪了互连带宽的历史使用情况:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

这提供了聚合利用率,但没有按应用程序分解。由于单个请求被客户端应用程序标记,我们实际上可以将带宽利用率归于服务消费者:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

通过这些工具,我们可以很直观地了解数据复制服务的健康状况。

DistCp 问题

我们在使用 DistCp 时遇到了一个有趣的接口问题:DistCp 的覆写选项。覆写标志位决定了当 DistCp 检测到其试图复制的文件已经在目标文件系统时所发生的情况:如果标志位为假,DistCp 不复制文件;如果标志位为真,它将删除已经存在的文件并复制源文件。

Distcp 不会检查文件的内容,因此用户必须知道他们的数据是不是最新的,以及是否需要覆写。我们希望使用者能够自己设置这个标志位,这样他们就可以在不知道数据集复制情况的前提下仍能够执行最有效的操作,尤其是在他们被要求去重复复制一些并未改变的数据的时候。

但是我们遇到了一个阻碍,正如 DistCp 文档中所描述的,覆写标志位实际上改变了被复制数据的目录结构!我们开始只是打算简单地把这个信息通知给用户,并让他们为自己的请求负责,但我们认为这严重违背了最小惊喜原则,最终只会给我们的用户和团队带来更多的痛苦,因此我们决定由自己负责,编写代码让 Data Replicator 的复制行为表现一致。

这些封装 DistCp 不一致的代码牵扯到了一些分支逻辑,但最终并没有像我们起初担心的那样复杂。真正困难的是如何证实我们想要覆盖的测试的确表现出了一致的行为。为了测试新代码,我们使用 LiveRamp 的 generative 库编写了一个单独的、可扩展的测试,创建了 100 多个潜在输入的组合,并确认创建的文件和目录结构符合我们的预期。我会在以后介绍更多关于使用 generative 以及基于属性的测试经验。

不同规模的任务

为保持用户界面的简洁性,即便是复制不同大小的文件,Data Replicator 都必须为用户提供相同的 API。所以,不论是复制 5TB 的冷数据还是复制 5MB 的导入,用户最终都会使用相同的服务调用。

虽然 MapReduce 和 DistCP 非常乐意启动一个 YARN 应用程序来处理一个哪怕只是复制 5MB 数据的请求,但启动一个 YARN 应用程序会产生相当高的开销:如果向 ResourceManager(资源管理器)提交一个 5MB 的复制任务,那么就会启动一个 ApplicationMaster(程序调度器),以及一个 Map 任务来执行实际的复制:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

为避免这种不必要的开销,当请求的输入文件为 1GB 时,我们将任务降低到“本地模式”。在这种模式下,worker 直接将数据从源文件系统复制到 GCS,而不需要启动 DistCP 应用程序:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

本地模式在减少系统开销方面非常成功;事实上,我们有 75% 的请求完全跳过了 DistCP,直接通过这些 worker 进行复制:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

我们遇到的最后一个瓶颈实际上与我们快速启动任务的能力有关。当从队列中拖出任务时,worker 会在构建请求时将队列“锁定”几秒钟。在请求数量极大但请求的工作量又很小的情况下,这会成为吞吐量的瓶颈。我们通过将 [状态,优先级] 队列中执行器的任务分片以及独立锁定分片的方式解决了这个问题:

大数据公司 LiveRamp 上云记(四):如何在迁移时处理数百万请求和 PB 级数据传输

有了这些,我们就能够处理尽可能多的请求。

Data Replicator 的未来

在 LiveRamp,我们喜欢编写代码,但我们更喜欢在不使用时将它们删除。虽然我们为 Data Replicator 服务感到骄傲,但我们不会永久性的安装它。我们正在全面迁移到 GCP ,一旦迁移完成,我们就不会受任何带宽限制,也就没有了使用这个服务的必要。

前三篇文章主要讨论了如何将我们现有的基础设施过渡到 GCP。在下一篇文章中,我们将讨论使用 GCP 最有趣的部分,即使用云原生技术重构我们的本地系统。

原文链接:

https://liveramp.com/engineering/migrating-a-big-data-environment-to-the-cloud-part-4%ef%bb%bf/

相关阅读:

大数据公司 LiveRamp 上云记(一):为什么选择 GCP?

大数据公司 LiveRamp 上云记(二):哪些功能可以直接迁移,哪些需要重新设计?

大数据公司 LiveRamp 上云记(三):如何在吞吐量有限的情况下处理数据复制

评论

发布