写点什么

过去一年,Spotify 最大一次数据流优化实践

  • 2021-03-04
  • 本文字数:4882 字

    阅读完需:约 16 分钟

过去一年,Spotify最大一次数据流优化实践

Spotify 是一家在线音乐流服务平台,2006 年 4 月由 Daniel Ek 和 Martin Lorentzon 在瑞典创立。目前是全球最大的流音乐服务商之一,与环球音乐集团、索尼音乐娱乐、华纳音乐集团和腾讯音乐娱乐集团四大唱片公司及其它唱片公司合作授权、由数字版权管理保护的音乐,用户规模截至 2019 年 10 月有 2.48 亿。Spotify 采用免费增值模式,基本服务是免费的,而附加功能是通过付费订阅提供。Spotify 靠流媒体订阅 Premium 用户和第三方的广告来作为其营收收入。本文介绍了 Spotify 过去一年最大一次数据流作业优化实践。


本文,我们将讨论 Spotify 如何使用 Sort Merge Bucket(排序合并桶,SMB)联接技术优化和加速 Dataflow 作业Wrapped 2019中的元素,以用于Wrapped 2020。本文介绍了 SMB 的设计和实现,以及如何将 SMB 集成到数据管道中。

引言

Shuffle 是许多大型数据转换,例如 join、GroupByKey 或其他 reduce 操作的核心构件。遗憾的是,这也是许多管道中最昂贵的一步。SortMergeBucket 是一种通过在生产者端做预先工作来减少 Shuffle 的优化。直观地说,对于经常添加到已知 key 上的常见数据集,例如,用户 ID 上包含元数据的用户事件,我们可以将其写到 bucket 文件中,并根据该 key 记录进行分桶和排序。shuffle 知道哪些文件包含 key 的子集和次序,就可以从匹配的 bucket 文件中合并排序值,从而完全消除移动 Key-value 对代价高昂的磁盘和网络 I/O。Andrea Nardelli 在2018 年的硕士论文中进行了关于 Sort Merge Bucket 的最初研究,之后我们开始研究将这个想法推广为Scio 模块

设计与实现

Spotify 的大部分数据管道都是用Scio编写的,Scio 是Apache Beam的 Scala API,它在Google Cloud Dataflow服务上运行。我们用 Java 实现了 SMB,以更接近原生的 Beam SDK(甚至与 Beam 社区一起编写并合作了一份设计文档),和其他许多 I/O 一样, Scio 也提供了语法糖 Scala。 这个设计被模块化到以下所列的主要组件中——我们将从两个顶级 SMBPTransform开始: SortedBucketSink 和 SortedBucketSource 的写和读操作。

SortedBucketSink

这个转换以 SMB 格式写入一个 PCollection<T>(其中 T 有一个对应的 FileOperations<T> 实例)。它首先使用BucketMetadata提供的逻辑提取 Key 和分配 bucket ID,按 ID 对 key-values 进行分组,对所有 values 进行排序,然后使用 FileOperations 实例将它们写入与 bucket ID 相对应的文件中。


除了 bucket 文件之外,还将一个 JSON 文件写入输出目录,这代表从 BucketMetadata 读取源所需的信息:bucket 的数量、散列方案以及从每条记录中提取 key 的指令(例如,对于 Avro 记录,我们可以用包含 key 的 GenericRecord 字段的名称来对该指令进行编码)。


SortedBucketSource

该转换是从一个或多个 SMB 格式的源读取,使用相同的 key 和散列方案编写。它打开了来自每个源的相应 bucket 的文件句柄(使用该输入类型的 FileOperations<T>),并在保持排序的同时合并它们。这些结果作为 CoGbkResult 对象在每个 key 组中发出,就像 Beam 用于常规 Cogroup 操作的类一样,这样用户就可以使用正确的参数来提取每个源的结果。


FileOperations

FileOperations 抽象化了单个 bucket 文件的读写。因为我们需要精确地控制每个文件中的元素及其顺序,因此我们无法利用现有 Beam 文件 I/O,因为它们是在 PCollection 层上操作的,元素的位置和顺序也被抽象出来。取而代之的是, SMB 文件操作是在 BoundedSource(输入)和 ParDo(输出)的 底层进行的。当前, Avro、 BigQuery TableRow JSON 和 TensorFlow TFRecord/Example 记录得到了支持。我们还打算添加其他格式,如 Parquet。

BucketMetadata

该类对元素的 keying 和 bucketing 进行了抽象,包含诸如 key 字段、类、 bucket 的数量,碎片和散列函数。在写入时,元数据与数据文件一起序列化为 JSON 文件,用于在读取 SMB 源代码时检查兼容性。

优化和变体

在过去一年半的时间里,我们在 Spotify 的各种用例中采用了 SMB,并且在处理数据管道的规模和复杂性方面做了很多改进。


  • 日期分区:在 Spotify,事件数据以小时或日分区的方式写入 Google Cloud Services(GCS)。一个常见的数据工程用例是通过管道读取多个分区:例如,计算过去七天的流计数。这在单一 PTransform 中很容易实现,可以通过使用通配符文件模式来匹配跨多个目录的文件,从而实现非 SMB 读取。但是,与 Beam 中的大多数文件 I/O 不同, SMBRead API 要求以目录而非文件模式来指定输入(因为我们需要检查目录中的 metadata. json 文件以及实际的记录文件)。另外,它必须匹配不同分区和不同源的 bucket 文件,同时确保 CoGbkResult 的输出能够正确地将来自一个源的所有分区的数据分组到同一个 TupleTag key 中。在 SMBRead API 上我们做了改进,可以为每个源接收一个或多个目录。


  • 分片:虽然我们在 bucket 分配过程中使用的 Murmur 类散列函数通常可以确保记录在分片 bucket 时均匀分布,但在某些情况下,如果 key 空间倾斜,一个或多个 bucket 可能会不成比例地大,并且在分组和排序记录时可能会出现 OOM 错误。在这种情况下,我们允许用户指定一些碎片来进一步分片每个 bucket 文件。在 bucket 分配步骤中,每个 bundle 会随机生成一个介于 [0,numShards) 之间的值。由于这个值是与 bucket ID 完全正交计算的,所以它可以在不同文件之间拆分大的 key 组。因为每个 shard 仍然是按排序顺序写入的,所以在读的时候只需将它们组合起来就行了。


  • 并行性:由于 SMB sink 中的 bucket 数总是 2 的幂数,所以我们可以根据用户指定的所需的并行度来制定不同 bucket 数的源的连接方案。举例来说,如果用户想用 4 个 bucket 联接 Source 1,用 2 个 bucket 联接 Source 2,则可以指定其中之一:


  • 最低限度的并行性,或“合并最大的 bucket”策略:将创建 2 个并行的读取器。每个读取器将从源 A 读取 2 个桶,从源 B 读取 1 个桶,将它们合并在一起。因为 bucket ID 是通过取 key 的整数散列值模数所需的 bucket 数量来分配的,所以从数学上讲,合并后的 bucket 的 key 空间是重叠的。

  • 最大限度的并行性,即“最少 bucket 复制”策略。将创建 4 个并行的读取器。每个读取器将从源 A 中读取 1 个 bucket,从源 B 中读取 1 个 bucket,在合并每个 key 组后,读取器将不得不对 key 进行最大数量的 bucket 为模数的重新散列,以避免发出重复值。因此,即使这种策略实现了较高的并行性,仍有一些计算重复值和重新散列的开销来消除重复值。

  • 自动并行。基于 Runner 提供的运行时所需的分割大小值,在最小和最大数量之间创建若干个读取器。



  • SortedBucketTransform:一个常见的使用模式是,管道丰富了现有的数据集,方法是将一个已有数据集与一个或多个其他源进行连接,然后将其写入输出位置。在 SMB 中,我们决定使用一种独特的 PTransform 对此进行专门的支持,该 PTransform 使用相同的 keying 和 bucketing 模式对输出进行读取、转换和写入。使用同一个 worker 对每个 bucket 进行读取/转换/写入逻辑操作,就可以避免对 bucket 的数据进行重新洗牌数据和计算:由于 key 是相同的,因此我们知道从输入的 bucket M 中转换的元素在输出中也对应 bucket M,与其读取顺序相同。



  • 外部排序:我们对 Beam 的外部排序器扩展做了很多改进,包括用原生文件 I/O 替换 Hadoop 序列文件,解除 2GB 内存限制,减少磁盘使用和编码器开销。

采用核心数据生产者

因为 SMB 需要以一种特殊的方式对数据进行分类和排序,采用自然是从那个数据的生产者开始。大多数 Spotify 的数据处理依赖于几个核心数据集,它们是各个业务领域的真实数据源,例如流媒体活动、用户元数据和流上下文。为了把一年的数据转换成 SMB 格式,我们和数据集维护者一起工作。


由于 SortedBucketSink 主要通过一些额外的设置来替换 vanilla Avro sink,因此实现起来很简单。利用 Avrosink 的 sharding 选项可以控制输出文件的数量和大小。在迁移到 SMB 之后,我们没有注意到 vCPU、 vRAM 或实际时间的任何显著变化,因为分片需要一个完整的 shuffle,这与 SMB sink 的额外成本相似。我们随后还必须调整其他一些设置:


  • 同意将 user_id 作为一个十六进制字符串作为 bucket 和排序 key,因为我们需要在所有 SMB 数据集中使用相同的 key 类型和语义。


  • 将压缩设置为 DEFLATE,级别为 6,与 Scio 中默认的 Avro sink 保持一致。作为数据被 bucket 化并按 key 排序的一个很好的副作用,我们观察到,由于类似记录的配置,良好的压缩会使存储量减少 50%。


  • 确保输出文件向后兼容。在 SMB 输出文件的名称中,除了“bucket-X-shard-Y”外,还包含了相同模式的记录。这样,就可以使用已有的管道,而无需更改任何代码;它们只是在某些连接情况下没有利用加速功能。

采用 Wrapped 2020

一旦核心数据集以 SMB 格式提供,我们就开始 Wrapped 2020,建立在 Wrapped 2019 活动留下的工作基础上。设计这个架构是为了可重用,这是好的开始。然而,数据源是一个大型、昂贵的 Bigtable 集群,必须进一步扩大规模以处理 Wrapped 作业的负载。通过将 Bigtable 迁移到 SMB 源,我们希望节省成本和时间。今年我们还需要处理过滤和聚合流的新的复杂需求。这样就需要在用户的监听历史中添加一个包含流上下文信息的大数据集。因为每一个联接都有很大的规模,所以这几乎是不可能的,或者至少是非常昂贵的。取而代之的是,我们试图使用 SMB 来完全消除此连接,并避免将 Bigtable 用作监听历史的源。


为了计算 Wrapped 2020,我们必须从三个主要数据源读取流媒体活动、用户元数据以及流媒体上下文。这三个源拥有我们需要的所有数据来生成每个人的 Wrapped,同时根据监听背景进行过滤。在此之前,Bigtable 中已经有 5 年的监听历史记录,并通过 user_id 进行了按 key 排序。现在,我们可以通过 SMB 从这三个源读取已经按 user_id key 进行排序的数据。然后,我们将每个 key 一年的数据进行汇总,计算出每个用户的 Wrapped。


由于 3 个主要源中有 1 个是按小时分区的,而另外 2 个是按日分区的,所以在一个作业中读一年的数据是有问题的,因为按小时分区的数据源有很多并行读取。取而代之的是,我们先运行一些较小的作业,这些作业会汇总出每个用户每周或每天的播放次数、 播放毫秒和其他信息。然后,我们再把所有这些更小的分区汇总成一个数据分区,这个数据分区将保存一年。



SMB 让这一切变得相对简单。利用 sortMergeTransform,我们可以将三个数据源结合起来,分别读取每个按 user_id key 排序的数据,并以 SMB 格式写入 Wrapped 输出(播放次数、播放毫秒、播放上下文等等)。



最后,我们运行聚合作业,使用 sortMergeGroupByKey 读取 SMB 的所有 Wrapped 周分区,合并一年的数据,然后将结果写到输出中,以便以后的作业计算剩余的 Wrapped。此处的灵活性的关键之处在于,聚合作业可以采用任何周和日分区的混合形式,这在运行它们时具有很大的逻辑意义。在实践中,最终结果如下:



在今年的 Wrapped 项目中,这最终为我们节省了很多成本。通过利用 SMB,我们可以在不使用传统的 shuffle 或 Bigtable 的情况下,成功连接了大概总共 1PB 的数据。据我们估计,与往年基于 Bigtable 的方法相比,今年的 Dataflow 成本降低了 50%左右。另外,为了支持繁重的 Wrapped 作业,我们避免将 Bigtable 集群的容量增加到普通容量的两到三倍(峰值时可以达到 1500 个节点左右)。这在今年的活动中是一个巨大的胜利,因为我们能以一种比以往更经济的方式带来精彩的体验。

结语

通过采用 SMB,我们能够执行以前无法实现或成本高昂,或需要像 Bigtable 这样的定制工作方案的超大规模连接。在节省成本的同时,我们也开创了更多优化工作流程的方法。我们还需要做大量的工作。我们期望将更多的工作流迁移到 SMB,同时还可以处理更多的边缘情况,比如数据偏移、复合 key 以及更多的文件格式。


作者介绍:


本文作者为 Neville Li、Claire McGinty、Sahith Nallapareddy、Joel Östlund。


原文链接:


Https://engineering.atspotify.com/2021/02/11/how-spotify-optimized-the-largest-dataflow-job-ever-for-wrapped-2020/

2021-03-04 17:252514
用户头像
赵钰莹 极客邦科技 总编辑

发布了 894 篇内容, 共 681.0 次阅读, 收获喜欢 2694 次。

关注

评论

发布
暂无评论
发现更多内容

如何提高团队的工作效率?

ShineScrum

Scrum 敏捷 团队效率

达人专栏 | 还不会用 Apache Dolphinscheduler?大佬用时一个月写出的最全入门教程【三】

白鲸开源

Apache 大数据 开源 DolphinScheduler workflow

《银行保险机构消费者权益保护管理办法》,如何影响行业与个人?

易观分析

金融消费

市场份额不断提升,百度智能云稳居金融云解决方案市场第一阵营

百度开发者中心

敏捷团队教练工作坊 (Coaching Agile Teams) | 6月11日

ShineScrum

敏捷 教练 敏捷教练 cat 高管教练

IP归属地火了,IP地址黑灰产浮出水面 要如何预防?

郑州埃文科技

网络安全 IP地址 网络灰黑产

AIRIOT物联网低代码平台如何配置Modbus RTU协议?

AIRIOT

驱动配置

为什么要上云,您的团队适合上云吗?Atlassian白皮书给你答案

龙智—DevSecOps解决方案

Atlassian atlassian云版

编写实用有效的产品帮助文档,提升客户满意度

小炮

帮助文档

英特尔宣布加入OpenCloudOS操作系统社区

TencentOS

卷入上海疫情,被封40天,我的一点心得

大数据梦想家

程序员 程序人生 疫情防控

云启资本宣布加入 OpenCloudOS 操作系统社区,将自身在开源领域近十年的经验贡献社区

TencentOS

ShardingSphere 在金融支付场景下的实践与调优

SphereEx

Apache 数据库 开源 ShardingSphere SphereEx

ironSource开通业内首家微信客户服务平台, 为中国客户提供本地支持

极客天地

7 款最棒的 React 移动端 UI 组件库 - 特别针对国内使用场景推荐

蒋川

UI 前端框架 React 移动端 组件库

千万奖金的首届昇腾AI创新大赛来了,OpenI启智社区提供开发环境和全部算力

OpenI启智社区

人工智能 昇腾AI大赛

域成员服务器怎么会突然脱域?

BUG侦探

脱域 域信任关系 windows更新

云原生架构及演进

云智慧AIOps社区

云原生 k8s 构架

OpenCloudOS 云原生演进之路

TencentOS

易安联参编《SASE技术与应用场景白皮书》正式发布

权说安全

网络安全 sase

一文了解游戏美术开发流程,以及可能遇到的问题

龙智—DevSecOps解决方案

perforce Helix Core Helix DAM

如何 DIY 一款属于自己的【3D 重力感应 动态壁纸 】,看完这篇文章你也可以学会

呆呆敲代码的小Y

android Unity 壁纸 动态壁纸

百度吴甜提出大模型落地关键路径 业内首发行业大模型

百度开发者中心

面试题:关于HDFS,你的沉淀是什么?

Joseph295

中兴通讯宣布加入OpenCloudOS操作系统社区,为自主操作系统做出重要贡献

TencentOS

微信小程序和 uniapp 的区别是什么?

CRMEB

关于2022年12代C/C++Linux服务器开发高级架构师课程体系分析

C++后台开发

后端开发 Linux服务器开发 C++后台开发 Linux后台开发 服务器开发架构师

Jira工时管理插件线上安装量过百,龙智产品赢得全球企业信赖

龙智—DevSecOps解决方案

Jira插件 龙智 龙智自研插件

2022年中国互联网母婴行业年度分析

易观分析

母婴产品

物联网时代,如何保障嵌入式系统安全?

龙智—DevSecOps解决方案

klocwork perforce Helix QAC

手把手教你如何高效落地单项目管理 | 一看既会

阿里云云效

云计算 阿里云 项目管理 敏捷开发 单项目管理

过去一年,Spotify最大一次数据流优化实践_AI&大模型_Geek_e883bb_InfoQ精选文章