为什么已有 Spark 和 Dask,阿里还要开源自研分布式科学计算引擎 Mars?

阅读数:1785 2019 年 9 月 17 日 08:00

为什么已有Spark和Dask,阿里还要开源自研分布式科学计算引擎Mars?

随着数据应用类型的愈加丰富和数据规模的不断扩大,单机早已无法满足超大规模数据计算和分析的需求,分布式才是如今大数据领域的核心关键词。作为数据分析领域非常优秀的 Python 计算库,Numpy、Scipy 能够非常高效地执行单机数值计算,但却无法支持分布式计算,难以满足对海量数据进行分析的需求。为了解决这一痛点,涌现了一批为分布式场景下大规模数据科学计算而生的新计算库 / 计算引擎,比如 Numba、Dask、谷歌开源的 JAX、阿里开源的 Mars 等。

阿里的 Mars 自开源之初,就经常被拿来与 Dask 做对比,或被看作 Numpy 的替代者,那么 Mars 与 Dask 到底有何不同?Mars 的终极目标真的是要取代 Numpy 吗?同时,当前有向无环图和分布式已经有 Spark 作为后端了,为什么还需要 Mars?近日,我们有幸采访了阿里巴巴技术专家、Mars 主要开发者之一斯文骏老师,对上述问题一一作了解答。此外,他还将在 QCon 上海 2019 带来《Mars:大规模张量计算系统的构建实践》的演讲分享,对 Mars 感兴趣的同学敬请关注。

Mars 从何而来?

InfoQ:你们一开始决定开发 Mars 的初衷是什么?Mars 和您之前参与开发的 PyODPS 这个项目之间有什么渊源或关联吗?(据了解,这两个项目的主要负责人是同一个人)

斯文骏:我们最初开发 PyODPS 的目的是为了方便数据开发者使用 MaxCompute,其出发点和 Mars 是一致的,都是能让数据开发者使用与 Python 通用 Library(Numpy、Scipy、Pandas)类似的用法来使用 MaxCompute。但在开发和推广 PyODPS 的过程中,我们发现 PyODPS 存在不少局限性。PyODPS 仅仅在用户端执行,其工作原理是将用户表达式编译为 SQL,因而对于矩阵运算和 Pandas 中的 Index 变换等操作存在实现困难。即便上述操作可以使用 SQL 编写,SQL Engine 对其执行效率也难以保证。因此,我们决定从更底层出发,开发一套兼容 Python 生态要求的分布式 Library,且支持在 MaxCompute 中运行,这便是 Mars。

相比 PyODPS 只支持 DataFrame,且 DataFrame 与 Pandas DataFrame 存在一定的差异。Mars 直接依照 Numpy、Scipy 和 Pandas 接口编写,因而更符合 Python 开发者的习惯。

InfoQ:为什么业界需要分布式的科学计算引擎?原来 Python 中非常有名的 Numpy、Scipy 这些科学计算库存在什么问题?对大规模数据做科学计算有哪些难点?

斯文骏:Numpy、Scipy 等计算库能够非常高效地在单机上进行数值计算,这也是 Python 为什么能够在数据分析领域迅速增长的重要原因。但随着数据规模的不断扩大,现有单机库逐渐难以应对在如此规模的数据中进行数据分析的需求。而分布式计算库需要在传统科学计算库之上加上分布式计算引擎的能力,具体包括:

  1. 根据计算的特点对数据进行拆分。例如,矩阵乘法和 SVD 在分布式计算的场景中,对于数据切分的要求不同。进行数据拆分和物理执行图的建立都是分布式计算库的职责;
  2. 执行过程中,执行图常常深度较大,且 Barrier 较少,调度器需要利用这一特性以减少中途的 IO 和等待代价;
  3. 分布式计算引擎共有的要求,例如计算、IO 的高效并行化,故障恢复,等等。

InfoQ:Mars 架构设计的理念和原则是什么样的?它在性能、易用性、泛化性、可靠性等方面做了哪些权衡?

斯文骏:Mars 需要实现的是细粒度图调度和执行。用户使用 Mars 类型编写代码后,将形成一张逻辑上的粗粒度图。该 Graph 在提交到 Scheduler 后,会形成一张细粒度图,此后细粒度图的节点将被分配到各个 Worker 中执行。

我们基于自己实现的 Actor Model 搭建分布式调度服务。Actor Model 使用 Gevent 实现异步操作,并包装所有通信,包括进程间和机器间的调用。每个 Mars 服务都由多个进程组成,以减少 Python GIL 对执行效率的影响。为减少大块数据在 Worker 内各进程间的复制成本,Mars 使用 Arrow 进行序列化,并使用 Plasma Store 在各进程间共享所需的数据。当 Plasma Store 充满时,会 Spill 一部分旧数据到磁盘以使计算能继续进行下去。

Mars 目前支持 Worker 进程以及 Worker 两级 Failover。当 Worker 进程 Fail,Worker Daemon 会重建进程,并重跑受影响的节点。当整个 Worker Fail,Scheduler 会收到通知,根据丢失的 Worker 以及数据血缘关系确定需要重跑的节点,重新分配初始节点并将相关节点再次提交执行。

Mars 的定位

InfoQ:Mars 的很多接口格式跟 Numpy 是一致的,有人认为这是为了让开发者能够从 Numpy 无痛迁移到 Mars,也有人把 Mars 看作并行化 / 分布式化的 Numpy,Mars 开源的官方文章中对二者的性能也做了直接比较,所以 Mars 的终极目标是替代 Numpy 吗?

斯文骏:Numpy 是非常强大的 Python 数值计算库,也是 Python 数值计算的事实标准。Mars 对 Numpy 的兼容并不意味着要替换 Numpy,而是在大规模分布式计算的情景下为用户提供科学计算的能力,这一能力在单机上的执行仍然需要依赖 Numpy、Scipy、Pandas 或者 API 类似 Numpy、Pandas 的 Cupy、Cudf 等单机库。当然,Numpy 并不是在所有情形下执行效率都是最优的,所以会有 Numexpr/Numba/Jax 等在 Numpy 基础上实现加速的 Library,而使用 Mars 在某些场合下也能达到这样的效果。

InfoQ:Mars、Dask 和 Ray 做的都是 Python 并行数据分析,能否跟我们详细地解读一下,在实现并行计算的原理或方法上三者有哪些不同?三者各有什么优势和劣势?

斯文骏:Mars 和 Dask 都是使用 Python 编写的以离线数据分析为目标的分布式并行计算库,且都拥有和 Numpy/Pandas 相近的 API。但 Mars 和 Dask 在设计思路上有明显的差异。

表达式和计算图方面,Mars 构建了一整套 Tensor/DataFrame 表达方式,采用 Protobuf/JSON 记录计算图,这使得 Mars 可以使用非 Python 客户端提交作业,也可以方便地采用 Numexpr/Cupy 等库对不同运算符采用个性化的优化。而 Dask 则采用 pickle 序列化 Python Dict 及 Python Function 的形式,对客户端 / 服务端的 Python 及相关 Library 的版本一致性要求很高,同时优化难度也较高。

分布式框架方面,Mars 实现了自己的轻量化 Actor 模型,并在此基础上搭建 Scheduler 和 Worker。Mars 支持多个 Scheduler 以降低单点负载,同时通过多进程减少 Python GIL 的影响。而 Dask 则使用线程模型,单机调度效率较低。目前 Benchmark 的结果,Mars 单机执行效率全面高过 Dask,分布式执行中的大规模矩阵乘法等作业执行效率可达到 Dask 的 3 倍以上。同时,Mars 还支持进程和 Worker 级别的 Failover,使作业执行更加可靠。

Ray 主要是为增强学习开发,提供了一套非常灵活的高效率分布式执行框架,但其并未直接提供分布式 DataFrame 等支持,也没有提供图优化、调度策略以及磁盘 Spill 等支持,用户上手会比较困难。现有基于 Ray 的 Modin 等库虽然提供了 DataFrame 功能,但也存在计算规模较低等问题。

InfoQ:有人认为有向无环图和分布式已经有 Spark 作为后端了,为什么还需要 Mars?Mars 的设计有何优势?

斯文骏:Spark 和 Mars 在调度方式上是有显著差异的。Spark 的 DAG 是一种粗粒度图,两个节点内的各 Partition 根据 Narrow 或者 Wide Dependency 建立一对一的连接或者全连接。Spark 根据 Wide Dependency 切分 DAG 为 Stage,每个 Stage 需要依次执行。而 Mars 的 DAG 则是细粒度图,每个 Chunk 可以有自己的依赖。这就意味着 Mars 在执行过程中,可以更精细地控制每个 Chunk 的执行和数据释放,从而获得更大的并行度和更高的效率。

在很多场景中,全连接事实上是不必要的。以矩阵乘法为例,分布式矩阵乘法可分为 Chunk 相乘阶段和乘积累加阶段。在 Chunk 相乘阶段,Tensor a 中的每个 Chunk 并不是与 Tensor b 中的所有 Chunk 相乘,其所需的连接个数仅与 Tensor b 的某个维度的大小相同。因而,在执行中,无需等待所有 Chunk 乘法执行完成,即可执行后续的加法,待加法完成后,加法所依赖的乘积即可被释放,用于其他计算。因而 Mars 所采用的细粒度调度可以有效减少 IO 和 Stage 等待的开销,从而拥有更高的执行效率。

InfoQ:能否帮我们理清 Mars 和 TensorFlow、JAX、MXNet 等机器学习框架之间的关系?未来 Mars 有没有可能应用到上述这些深度学习框架里?

斯文骏:Mars 和 TensorFlow/PyTorch/MXNet 等机器学习框架的出发点不同。Mars 的出发点在于解决数据分析的规模问题。TensorFlow 等机器学习框架的关注点不在规模,而在如何方便地实现机器学习 / 深度学习算法。其提供的分布式功能需要用户自行指定各个节点的职能。为构建生态,Mars 正在考虑引入上述框架,使用户可以使用 Mars 产生的数据进行后续的机器学习,并将结果回流到 Mars。

JAX 是一套 Numpy 加速库,Mars 正在尝试引入以提高执行速度。

InfoQ:据了解,Mars 是阿里 MaxCompute 平台的一部分,请问它在整个 MaxCompute 平台中扮演的是什么样的角色?和其他计算引擎组件之间是如何配合的?

斯文骏:Mars 可以独立部署执行,也可以使用 PyODPS 通过 MaxCompute 调度执行,从 MaxCompute 表中读取数据,执行结果回流到表中。目前 MaxCompute 中运行 Mars 处于内测阶段。

开源以来 Mars 的新变化

InfoQ:自开源以来,Mars 又做了哪些改进?能否具体介绍一下。

斯文骏:自从开源以来,Mars 已经从 0.1 演进到 0.2,目前主干代码为 0.3。自开源以来的主要更改包括:

  1. 初步增加 DataFrame 支持,已支持一元、二元运算、merge/join、reduction、iloc 等操作;
  2. 支持 Worker 级别 Failover,可在个别 Worker fail 的情况下使作业正常完成执行;
  3. 支持 Shuffle,并在此基础上增加对 Tensor Fancy Indexing 等复杂操作的支持;
  4. 支持 Eager Mode,可避免用户手动调用 Execute,方便调试;
  5. Worker 级别 Storage 重构,统一管理各级存储,未来将可扩展到 GPU Memory 以及其他存储介质;
  6. 支持使用 Kubernetes 部署。

InfoQ:在早前一篇文章中,你们曾说过 Mars 会完全以开源的方式运作,而不是简单把代码放出来。开源这大半年来,Mars 项目的推广情况如何?公司内部和外部使用情况如何?现在有多少开发者参与到代码贡献中了?

斯文骏:Mars 目前所有开发、CI 和 Code Review 都在 Github 上进行,目前主要是 5 位阿里内部开发者参与 Mars 的开发。近一年来我们工作的重心主要在 Mars 本身的完善而非推广,阿里内部和外部的使用主要是试用性质。近期我们会加大推广的力度。

InfoQ:您认为目前 Mars 还存在哪些问题?未来改进的重点是什么?能否分享一下接下来 Mars 在技术和开源社区两个方面的规划和 Roadmap。

斯文骏:技术上,Mars 未来会继续增强对 Numpy/Pandas API 的支持,进一步提高执行效率,同时建立接口以方便与其他数据开发 / 机器学习工具交换数据。Mars 也会加强社区建设,近期将提供一系列代码解析文章,并通过建立开发者讨论组等形式加强开发者之间的联络。

采访嘉宾:斯文骏,阿里巴巴计算平台事业部技术专家,硕士研究生毕业后加入阿里巴巴,先后参与机器学习平台和 PyODPS 开发,2017 年参与启动 Mars 开发,为 Mars 分布式引擎主要开发者之一。

在 QCon 上海 2019 的演讲中,斯文骏老师将基于 Mars 以往的实践介绍系统架构和提升执行效率方面的实践,点击了解详情

评论

发布