写点什么

面向大数据与云计算的阿里经济体核心调度系统 Fuxi 2.0 全揭秘

  • 2020-05-22
  • 本文字数:16934 字

    阅读完需:约 56 分钟

面向大数据与云计算的阿里经济体核心调度系统Fuxi 2.0全揭秘

伏羲(Fuxi)是十年前最初创立飞天平台时的三大服务之一(分布式存储 Pangu,分布式计算 MaxCompute,分布式调度 Fuxi),当时的设计初衷是为了解决大规模分布式资源的调度问题(本质上是多目标的最优匹配问题)。


随阿里经济体和阿里云丰富的业务需求(尤其是双十一)和磨练,伏羲的内涵不断扩大,从单一的资源调度器(对标开源系统的 YARN)扩展成大数据的核心调度服务,覆盖数据调度(Data Placement)、资源调度(Resouce Management)、计算调度(Application Manager)、和本地微(自治)调度(即正文中的单机调度)等多个领域,并在每一个细分领域致力于打造超越业界主流的差异化能力。


过去十年来,伏羲在技术能力上每年都有一定的进展和突破(如 2013 年的 5K,15 年的 Sortbenchmark 世界冠军,17 年的超大规模离在/在离混布能力,2019 年的 Yugong 发布并论文被 VLDB 接受等等)。本文试从面向大数据/云计算的调度挑战出发,介绍各个子领域的关键进展,并回答什么是“伏羲 2.0”。

1. 引言

过去 10 年,是云计算的 10 年,伴随云计算的爆炸式增长,大数据行业的工作方式也发生了很大的变化:从传统的自建自运维 hadoop 集群,变成更多的依赖云上的弹性低成本计算资源。海量大数据客户的信任和托付,对阿里大数据系统来说,是很大的责任,但也催生出了大规模、多场景、低成本、免运维的 MaxCompute 通用计算系统。


同样的 10 年,伴随着阿里年年双 11,MaxCompute 同样支撑了阿里内部大数据的蓬勃发展,从原来的几百台,到现在的 10 万台物理机规模。


双线需求,殊途同归,海量资源池,如何自动匹配到大量不同需求的异地客户计算需求上,需要调度系统的工作。本文主要介绍阿里大数据的调度系统 FUXI 往 2.0 的演进。先给大家介绍几个概念:


  • 首先,数据从哪里来?数据往往伴随着在线业务系统产生。而在线系统,出于延迟和容灾的考虑,往往遍布北京、上海、深圳等多个地域,如果是跨国企业,还可能遍布欧美等多个大陆的机房。这也造成了我们的数据天然分散的形态。而计算,也可能发生在任意一个地域和机房。可是网络,是他们中间的瓶颈,跨地域的网络,在延迟和带宽上,远远无法满足大数据计算的需求。如何平衡计算资源、数据存储、跨域网络这几点之间的平衡,需要做好“数据调度”。

  • 其次,有了数据,计算还需要 CPU,内存,甚至 GPU 等资源,当不同的公司,或者单个公司内部不同的部门,同时需要计算资源,而计算资源紧张时,如何平衡不同的用户,不同的作业?作业也可能长短不一,重要程度不尽相同,今天和明天的需求也大相径庭。除了用户和作业,计算资源本身可能面临硬件故障,但用户不想受影响。所有这些,都需要“资源调度”。

  • 有了数据和计算资源,如何完成用户的计算任务,比如一个 SQL query?这需要将一个大任务,分成几个步骤,每个步骤又切分成成千上万个小任务,并行同时计算,才能体现出分布式系统的加速优势。但小任务切粗切细,在不同的机器上有快有慢,上下步骤如何交接数据,同时避开各自故障和长尾,这些都需要“计算调度”。

  • 很多不同用户的不同小任务,经过层层调度,最后汇集到同一台物理机上,如何避免单机上真正运行时,对硬件资源使用的各种不公平,避免老实人吃亏。避免重要关键任务受普通任务影响,这都需要内核层面的隔离保障机制。同时还要兼顾隔离性和性能、成本的折中考虑。这都需要“单机调度”。



2013 年,伏羲在飞天 5K 项目中对系统架构进行了第一次大重构,解决了规模、性能、利用率、容错等线上问题,并取得世界排序大赛 Sortbenchmark 四项冠军,这标志着 Fuxi 1.0 的成熟。


2019 年,伏羲再次出发,从技术上对系统进行了第二次重构,发布 Fuxi 2.0 版本:阿里自研的新一代高性能、分布式的数据、资源、计算、单机调度系统。Fuxi 2.0 进行了全面的技术升级,在全区域数据排布、去中心化调度、在线离线混合部署、动态计算等方面全方位满足新业务场景下的调度需求。


伏羲 2.0 成果概览


• 业内首创跨地域多数据中心的数据调度方案-Yugong,通过 3%的冗余存储,节省 80%的跨地域网络带宽


• 业内领先的去中心化资源调度架构,单集群支持 10 万服务器*10 万并发 job 的高频调度


• 动态 DAG 闯入传统 SQL 优化盲区,TPC-DS 性能提升 27%,conditional join 性能提升 3X。


• 创新性的数据动态 shuffle 和全局跨级优化,取代业界磁盘 shuffle;线上千万 job,整体性能提升 20%,成本下降 15%,出错率降低一个数量级


• 在线离线规模化混合部署,在线集群利用率由 10%提升到 40%,双十一大促节省 4200 台 F53 资源,且同时保障在线离线业务稳定。

2. 数据调度 2.0 - 跨地域的数据调度

阿里巴巴在全球都建有数据中心,每个地区每天会产生一份当地的交易订单信息,存在就近的数据中心。北京的数据中心,每天会运行一个定时任务来统计当天全球所有的订单信息,需要从其他数据中心读取这些交易数据。当数据的产生和消费不在一个数据中心时,我们称之为跨数据中心数据依赖(下文简称跨中心依赖)。



MaxCompute 上每天运行着数以千万计的作业,处理 EB 级别的数据。这些计算和数据分布在全球的数据中心,复杂的业务依赖关系产生了大量的跨中心依赖。相比于数据中心内的网络,跨数据中心网络(尤其是跨域的网络)是非常昂贵的,同时具有带宽小、延迟高、稳定性低的特点。比如网络延迟,数据中心内部网络的网络延迟一般在 100 微秒以下,而跨地域的网络延迟则高达数十毫秒,相差百倍以上。因此,如何高效地将跨中心依赖转化为数据中心内部的数据依赖,减少跨数据中心网络带宽消耗,从而降低成本、提高系统效率,对 MaxCompute 这样超大规模计算平台而言,具有极其重要的意义。



为了解决这个问题,我们在数据中心上增加了一层调度层,用于在数据中心之间调度数据和计算。这层调度独立于数据中心内部的调度,目的是实现跨地域维度上存储冗余–计算均衡–长传带宽–性能最优之间的最佳平衡。这层调度层包括跨数据中心数据缓存、业务整体排布、作业粒度调度。


首先是对访问频次高的数据进行跨数据中心缓存,在缓存空间有限的约束下,选择合适的数据进行换入换出。不同于其他缓存系统,MaxCompute 的数据(分区)以表的形式组织在一起,每张表每天产生一个或多个分区,作业访问数据也有一些特殊规律,比如一般访问的是连续分区、生成时间越新的分区访问概率越大。


其次是业务的整体排布策略。数据和计算以业务为单位组织在一起(MaxCompute 中称之为 project),每个 project 被分配在一个数据中心,包括数据存储和计算作业。如果将 project 看做一个整体,可以根据作业对数据的依赖关系计算出 project 之间的相互依赖关系。如果能将有互相数据依赖的 project 放在一个数据中心,就可以减少跨中心依赖。但 project 间的依赖往往复杂且不断变化,很难有一劳永逸的排布策略,并且 project 排布需要对 project 进行整体迁移,周期较长,且需要消耗大量的带宽。


最后,当 project 之间的互相依赖集中在极少数几个作业上,并且作业的输入数据量远大于输出数据量时,比起数据缓存和 project 整体迁移,更好的办法是将这些作业调度到数据所在的数据中心,再将作业的输出远程写回原数据中心,即作业粒度调度。如何在作业运行之前就预测到作业的输入输出数据量和资源消耗,另一方面当作业调度到 remote 数据中心后,如何保证作业运行不会变慢,不影响用户体验,这都是作业粒度调度要解决的问题。


本质上,数据缓存、业务排布、作业粒度调度三者都在解同一个问题,即在跨地域多数据中心系统中减少跨中心依赖量、优化作业的 data locality、减少网络带宽消耗。

1.2.1 跨数据中心数据缓存策略

我们首次提出了跨地域、跨数据中心数据缓存这一概念,通过集群的存储换集群间带宽,在有限的冗余存储下,找到存储和带宽最佳的 tradeoff。通过深入的分析 MaxCompute 的作业、数据的特点,我们设计了一种高效的算法,根据作业历史的 workload、数据的大小和分布,自动进行缓存的换入换出。


我们研究了多种数据缓存算法,并对其进行了对比试验,下图展示了不同缓存策略的收益,横轴是冗余存储空间,纵轴是带宽消耗。从图中可以看出,随着冗余存储的增加,带宽成本不断下降,但收益比逐渐降低,我们最终采用的 k-probe 算法在存储和带宽间实现了很好的平衡。


1.2.2 以 project 为粒度的多集群业务排布算法

随着上层业务的不断发展,业务的资源需求和数据需求也在不断变化。比如一个集群的跨中心依赖增长迅速,无法完全通过数据缓存来转化为本地读取,这就会造成大量的跨数据中心流量。因此我们需要定期对业务的排布进行分析,根据业务对计算资源、数据资源的需求情况,以及集群、机房的规划,通过业务的迁移来降低跨中心依赖以及均衡各集群压力。


下图展示了某个时刻业务迁移的收益分析:左图横轴为迁移的 project 数量,纵轴为带宽减少比例,可以看出大约移动 60 个 project 就可以减少约 30%的带宽消耗。右图统计了不同排布下(迁移 0 个、20 个、50 个 project)的最优带宽消耗,横轴为冗余存储,纵轴为带宽。


1.2.3 跨数据中心计算调度机制

我们打破了计算资源按照数据中心进行规划的限制,理论上允许作业跑在任何一个数据中心。我们将调度粒度拆解到作业粒度,根据每个作业的数据需求、资源需求,为其找到一个最合适的数据中心。在对作业进行调度之前需要知道这个作业的输入和输出,目前我们有两种方式获得这一信息,对于周期性作业,通过对作业历史运行数据进行分析推测出作业的输入输出;对于偶发的作业,我们发现其产生较大跨域流量时,动态的将其调度到数据所在的数据中心上运行。另外,调度计算还要考虑作业对计算资源的需求,防止作业全部调度到热点数据所在的数据中心,造成任务堆积。

1.3 线上效果

线上三种策略相辅相成,数据缓存主要解决周期类型作业、热数据的依赖;作业粒度调度主要解决临时作业、历史数据的依赖;并周期性地通过业务整体排布进行全局优化,用来降低跨中心依赖。整体来看,通过三种策略的共同作用,降低了约 90%的跨地域数据依赖,通过约 3%的冗余存储节省了超过 80%的跨数据中心带宽消耗,将跨中心依赖转化为本地读取的比例提高至 90%。下图以机房为单位展示了带宽的收益:


3. 资源调度 2.0 - 去中心化的多调度器架构

2019 年双十一,MaxCompute 平台产生的数据量已接近 EB 级别,作业规模达到了千万,有几十亿的 worker 跑在几百万核的计算单元上,在超大规模(单集群超过万台),高并发的场景下,如何快速地给不同的计算任务分配资源,实现资源的高速流转,需要一个聪明的“大脑”,而这就是集群的资源管理与调度系统(简称资源调度系统)。


资源调度系统负责连接成千上万的计算节点,将数据中心海量的异构资源抽象,并提供给上层的分布式应用,像使用一台电脑一样使用集群资源,它的核心能力包括规模、性能、稳定性、调度效果、多租户间的公平性等等。一个成熟的资源调度系统需要在以下五个方面进行权衡,做到“既要又要”,非常具有挑战性。



13 年的 5K 项目初步证明了伏羲规模化能力,此后资源调度系统不断演进,并通过 MaxCompute 平台支撑了阿里集团的大数据计算资源需求,在核心调度指标上保持着对开源系统的领先性,比如 1)万台规模集群,调度延时控制在了 10 微秒级别,worker 启动延时控制在 30 毫秒;2)支持任意多级租户的资源动态调节能力(支持十万级别的租户);3)极致稳定,调度服务全年 99.99%的可靠性,并做到服务秒级故障恢复。

2.1 单调度器的局限性

2.1.1 线上的规模与压力

大数据计算的场景与需求正在快速增长(下图是过去几年 MaxComputer 平台计算和数据的增长趋势)。单集群早已突破万台规模,急需提供十万台规模的能力。



但规模的增长将带来复杂度的极速上升,机器规模扩大一倍,资源请求并发度也会翻一番。在保持既有性能、稳定性、调度效果等核心能力不下降的前提下,可以通过对调度器持续性能优化来扩展集群规模(这也是伏羲资源调度 1.0 方向),但受限于单机的物理限制,这种优化总会存在天花板,因此需要从架构上优化来彻底规模和性能的可扩展性问题。

2.1.2 调度需求的多样性

伏羲支持了各种各样的大数据计算引擎,除了离线计算(SQL、MR),还包括实时计算、图计算,以及近几年迅速发展面向人工智能领域的机器学习引擎。



场景的不同对资源调度的需求也不相同,比如,SQL 类型的作业通常体积小、运行时间短,对资源匹配的要求低,但对调度延时要求高,而机器学习的作业一般体积大、运行时间长,调度结果的好坏可能对运行时间产生直接影响,因此也能容忍通过较长的调度延时换取更优的调度结果。资源调度需求这种多样性,决定了单一调度器很难做到“面面俱到”,需要各个场景能定制各自的调度策略,并进行独立优化。

2.1.3 灰度发布与工程效率

资源调度系统是分布式系统中最复杂最重要的的模块之一,需要有严苛的生产发布流程来保证其线上稳定运行。单一的调度器对开发人员要求高,出问题之后影响范围大,测试发布周期长,严重影响了调度策略迭代的效率,在快速改进各种场景调度效果的过程中,这些弊端逐渐显现,因此急需从架构上改进,让资源调度具备线上的灰度能力,从而幅提升工程效率。

2.2 去中心化的多调度器架构

为了解决上述规模和扩展性问题,更好地满足多种场景的调度需求,同时从架构上支持灰度能力,伏羲资源调度 2.0 在 1.0 的基础上对调度架构做了大规模的重构,引入了去中心化的多调度器架构。



我们将系统中最核心的资源管理和资源调度逻辑进行了拆分解耦,使两者同时具备了多 partition 的可扩展能力(如下图所示),其中:


• 资源调度器(Scheduler):负责核心的机器资源和作业资源需求匹配的调度逻辑,可以横向扩展。


• 资源管理和仲裁服务(ResourceManagerService,简称 RMS):负责机器资源和状态管理,对各个 Scheduler 的调度结果进行仲裁,可以横向扩展。


• 调度协调服务(Coordinator):管理资源调度系统的配置信息,Meta 信息,以及对机器资源、Scheduler、RMS 的可用性和服务角色间的可见性做仲裁。不可横向扩展,但有秒级多机主备切换能力。


• 调度信息收集监控服务(FuxiEye):统计集群中每台机的运行状态信息,给 Scheduler 提供调度决策支持,可以横向扩展。


• 用户接口服务(ApiServer):为资源调度系统提供外部调用的总入口,会根据 Coordinator 提供的 Meta 信息将用户请求路由到资源调度系统具体的某一个服务上,可以横向扩展。


2.3 上线数据

以下是 10w 规模集群/10 万作业并发场景调度器核心指标(5 个 Scheduler、5 个 RMS,单 RMS 负责 2w 台机器,单 Scheduler 并发处理 2w 个作业)。通过数据可以看到,集群 10w 台机器的调度利用率超过了 99%,关键调度指标,单 Scheduler 向 RMS commit 的 slot 的平均数目达到了 1w slot/s。


在保持原有单调度器各项核心指标稳定不变的基础上,去中心化的多调度器框架实现了机器规模和应用并发度的双向扩展,彻底解决了集群的可扩展性问题。



目前资源调度的新架构已全面上线,各项指标持续稳定。在多调度器架构基础上,我们把机器学习场景调度策略进行了分离,通过独立的调度器来进行持续的优化。同时通过测试专用的调度器,我们也让资源调度具备了灰度能力,调度策略的开发和上线周期显著缩短。

4. 计算调度 2.0 - 从静态到动态

分布式作业的执行与单机作业的最大区别,在于数据的处理需要拆分到不同的计算节点上,“分而治之”的执行。这个“分”,包括数据的切分,聚合以及对应的不同逻辑运行阶段的区分,也包括在逻辑运行阶段间数据的 shuffle 传输。每个分布式作业的中心管理点,也就是 application master (AM)。这个管理节点也经常被称为 DAG (Directional Acyclic Graph, 有向无环图) 组件,是因为其最重要的责任,就是负责协调分布式系统中的作业执行流程,包括计算节点的调度以及数据流(shuffle)。


对于作业的逻辑阶段和各个计算节点的管理, 以及 shuffle 策略的选择/执行,是一个分布式作业能够正确完成重要前提。这一特点,无论是传统的 MR 作业,分布式 SQL 作业,还是分布式的机器学习/深度学习作业,都是一脉相承的,为了帮助更好的理解计算调度(DAG 和 Shuffle)在大数据平台中的位置,我们可以通过 MaxCompute 分布式 SQL 的执行过程做为例子来了解:



在这么一个简单的例子中,用户有一张订单表 order_data,存储了海量的交易信息,用户想所有查询花费超过 1000 的交易订单按照 userid 聚合后,每个用户的花费之和是多少。于是提交了如下 SQL query:


INSERT OVERWRITE TABLE resultSELECT userid, SUM(spend) FROM  order_dataWHERE spend > 1000GROUP BY userid;
复制代码


这个 SQL 经过编译优化之后生成了优化执行计划,提交到 fuxi 管理的分布式集群中执行。我们可以看到,这个简单的 SQL 经过编译优化,被转换成一个具有 M->R 两个逻辑节点的 DAG 图,也就是传统上经典的 MR 类型作业。而这个图在提交给 fuxi 系统后,根据每个逻辑节点需要的并发度,数据传输边上的 shuffle 方式,调度时间等等信息,就被物化成右边的物理执行图。物理图上的每个节点都代表了一个具体的执行实例,实例中包含了具体处理数据的算子,特别的作为一个典型的分布式作业,其中包含了数据交换的算子 shuffle——负责依赖外部存储和网络交换节点间的数据。一个完整的计算调度,包含了上图中的 DAG 的调度执行以及数据 shuffle 的过程。


阿里计算平台的 fuxi 计算调度,经过十年的发展和不断迭代,成为了作为阿里集团内部以及阿里云上大数据计算的重要基础设施。今天计算调度同时服务了以 MaxCompute SQL 和 PAI 为代表的多种计算引擎,在近 10 万台机器上日均运行着千万界别的分布式 DAG 作业,每天处理 EB 数量级的数据。一方面随着业务规模和需要处理的数据量的爆发,这个系统需要服务的分布式作业规模也在不断增长;另一方面,业务逻辑以及数据来源的多样性,计算调度在阿里已经很早就跨越了不同规模上的可用/够用的前中期阶段,2.0 上我们开始探索更加前沿的智能化执行阶段。



在云上和阿里集团的大数据实践中,我们发现对于计算调度需要同时具备超大规模和智能化的需求,以此为基本诉求我们开了 Fuxi 计算调度 2.0 的研发。下面就为大家从 DAG 调度和数据 shuffle 两个方面分别介绍计算调度 2.0 的工作。

4.1 Fuxi DAG 2.0–动态、灵活的分布式计算生态

4.1.1 DAG 调度的挑战

传统的分布式作业 DAG,一般是在作业提交前静态指定的,这种指定方式,使得作业的运行没有太多动态调整的空间。放在 DAG 的逻辑图与物理图的背景中来说,这要求分布式系统在运行作业前,必须事先了解作业逻辑和处理数据各种特性,并能够准确回答作业运行过程,各个节点和连接边的物理特性问题,然而在现实情况中,许多和运行过程中数据特性相关的问题,都只有个在执行过程中才能被最准确的获得。静态的 DAG 执行,可能导致选中的是非最优的执行计划,从而导致各种运行时的效率低下,甚至作业失败。这里我们可以用一个分布式 SQL 中很常见的例子来说明:


SELECT a.spend, a.userid, b.ageFROM    (            SELECT  spend, userid            FROM    order_data            WHERE   spend > 1000        ) aJOIN    (            SELECT  userid, age            FROM    user            WHERE   age > 60        ) bON      a.userid = b.userid;
复制代码


上面是一个简单的 join 的例子,目的是获取 60 岁以上用户花费大于 1000 的详细信息,由于年纪和花费在两张表中,所以此时需要做一次 join。一般来说 join 有两种实现方式:


一是 Sorted Merge Join(如下图左侧的所示):也就是对于 a 和 b 两个子句执行后的数据按照 join key(userid)进行分区,然后在下游节点按照相同的 key 进行 Merge Join 操作,实现 Merge Join 需要对两张表都要做 shuffle 操作——也就是进行一次数据狡猾,特别的如果有数据倾斜(例如某个 userid 对应的交易记录特别多),这时候 MergeJoin 过程就会出现长尾,影响执行效率;


二是实现方式是 Map join(Hash join)的方式(如下图右侧所示):上述 sql 中如果 60 岁以上的用户信息较少,数据可以放到一个计算节点的内存中,那对于这个超小表可以不做 shuffle,而是直接将其全量数据 broadcast 到每个处理大表的分布式计算节点上,大表不用进行 shuffle 操作,通过在内存中直接建立 hash 表,完成 join 操作,由此可见 map join 优化能大量减少 (大表) shuffle 同时避免数据倾斜,能够提升作业性能。但是如果选择了 map join 的优化,执行过程中发现小表数据量超过了内存限制(大于 60 岁的用户很多),这个时候 query 执行就会由于 oom 而失败,只能重新执行。



但是在实际执行过程中,具体数据量的大小,需要在上游节点完成后才能被感知,因此在提交作业前很难准确的判断是否可以采用 Map join 优化,从上图可以看出在 Map Join 和 Sorted Merge Join 上 DAG 图是两种结构,因此这需要 DAG 调度在执行过程中具有足够的动态性,能够动态的修改 DAG 图来达到执行效率的最优。我们在阿里集团和云上海量业务的实践中发现,类似 map join 优化的这样的例子是很普遍的,从这些例子可以看出,随着大数据平台优化的深入进行,对于 DAG 系统的动态性要求越来越高。


由于业界大部分 DAG 调度框架都在逻辑图和物理图之间没有清晰的分层,缺少执行过程中的动态性,无法满足多种计算模式的需求。例如 spark 社区很早提出了运行时调整 Join 策略的需求(Join: Determine the join strategy (broadcast join or shuffle join) at runtime),但是目前仍然没有解决。


除此上述用户体感明显的场景之外,随着 MaxCompute 计算引擎本身更新换代和优化器能力的增强,以及 PAI 平台的新功能演进,上层的计算引擎自身能力在不断的增强。对于 DAG 组件在作业管理,DAG 执行等方面的动态性,灵活性等方面的需求也日益强烈。在这样的一个大的背景下,为了支撑计算平台下个 10 年的发展,伏羲团队启动了 DAG 2.0 的项目,在更好的支撑上层计算需求。

4.1.2 DAG2.0 动态灵活统一的执行框架

DAG2.0 通过逻辑图和物理图的清晰分层,可扩展的状态机管理,插件式的系统管理,以及基于事件驱动的调度策略等基座设计,实现了对计算平台上多种计算模式的统一管理,并更好的提供了作业执行过程中在不同层面上的动态调整能力。作业执行的动态性和统一 DAG 执行框架是 DAG2.0 的两个主要特色:


作业执行的动态性


如前所诉,分布式作业执行的许多物理特性相关的问题,在作业运行前是无法被感知的。例如一个分布式作业在运行前,能够获得的只有原始输入的一些基本特性(数据量等), 对于一个较深的 DAG 执行而言,这也就意味着只有根节点的物理计划(并发度选择等) 可能相对合理,而下游的节点和边的物理特性只能通过一些特定的规则来猜测。这就带来了执行过程中的不确定性,因此,要求一个好的分布式作业执行系统,需要能够根据中间运行结果的特点,来进行执行过程中的动态调整。


而 DAG/AM 作为分布式作业唯一的中心节点和调度管控节点,是唯一有能力收集并聚合相关数据信息,并基于这些数据特性来做作业执行的动态调整。这包括简单的物理执行图调整(比如动态的并发度调整),也包括复杂一点的调整比如对 shuffle 方式和数据编排方式重组。除此以外,数据的不同特点也会带来逻辑执行图调整的需求:对于逻辑图的动态调整,在分布式作业处理中是一个全新的方向,也是我们在 DAG 2.0 里面探索的新式解决方案。


还是以 map join 优化作为例子,由于 map join 与默认 join 方式(sorted merge join)对应的其实是两种不同优化器执行计划,在 DAG 层面,对应的是两种不同的逻辑图。DAG2.0 的动态逻辑图能力很好的支持了这种运行过程中根据中间数据特性的动态优化,而通过与上层引擎优化器的深度合作,在 2.0 上实现了业界首创的 conditional join 方案。如同下图展示,在对于 join 使用的算法无法被事先确定的时候,分布式调度执行框架可以允许优化提交一个 conditional DAG,这样的 DAG 同时包括使用两种不同 join 的方式对应的不同执行计划支路。在实际执行时,AM 根据上游产出数据量,动态选择一条支路执行(plan A or plan B)。这样子的动态逻辑图执行流程,能够保证每次作业运行时,根据实际产生的中间数据特性,选择最优的执行计划。在这个例子中:


  • 当 M1 输出的数据量较小时,允许其输出被全量载入下游单个计算节点的内存,DAG 就会选择优化的 map join(plan A),来避免额外的 shuffle 和排序。

  • 当 M1 输出的数据量大到一定程度,已经不属于 map join 的适用范围,DAG 就可以自动选择走 merge join,来保证作业的成功执行。



除了 map join 这个典型场景外,借助 DAG2.0 的动态调度能力,MaxCompute 在解决其他用户痛点上也做了很多探索,并取得了不错的效果。例如智能动态并发度调整:在执行过程中依据分区数据统计调整,动态调整并发度;自动合并小分区,避免不必要的资源使用,节约用户资源使用;切分大分区,避免不必要的长尾出现等等。


统一的 AM/DAG 执行框架


除了动态性在 SQL 执行中带来的重大性能提升外,DAG 2.0 抽象分层的点,边,图架构上,也使其能通过对点和边上不同物理特性的描述,对接不同的计算模式。业界各种分布式数据处理引擎,包括 SPARK, FLINK, HIVE, SCOPE, TENSORFLOW 等等,其分布式执行框架的本源都可以归结于 Dryad 提出的 DAG 模型。我们认为对于图的抽象分层描述,将允许在同一个 DAG 系统中,对于离线/实时/流/渐进计算等多种模型都可以有一个好的描述。


如果我们对分布式 SQL 进行细分的话,可以看见业界对于不同场景上的优化经常走在两个极端:要么优化 throughput (大规模,相对高延时),要么优化 latency(中小数据量,迅速完成)。前者以 Hive 为典型代表,后者则以 Spark 以及各种分布式 MPP 解决方案为代表。而在阿里分布式系统的发展过程中,历史上同样出现了两种对比较为显著的执行方式:SQL 线离线(batch)作业与准实时(interactive)作业。这两种模式的资源管理和作业执行,过去是搭建在两套完全分开的代码实现上的。这除了导致两套代码和功能无法复用以外,两种计算模式的非黑即白,使得彼此在资源利用率和执行性能之间无法 tradeoff。而在 DAG 2.0 模型上,通过对点/边物理特性的映射,实现了这两种计算模式比较自然的融合和统一。离线作业和准实时作业在逻辑节点和逻辑边上映射不同的物理特性后,都能得到准确的描述:


  • 离线作业:每个节点按需去申请资源,一个逻辑节点代表一个调度单位;节点间连接边上传输的数据,通过落盘的方式来保证可靠性;

  • 准实时作业:整个作业的所有节点都统一在一个调度单位内进行 gang scheduling;节点间连接边上通过网络/内存直连传输数据,并利用数据 pipeline 来追求最优的性能。


在此统一离线作业与准实时作业的到一套架构的基础上,这种统一的描述方式,使得探索离线作业高资源利用率,以及准实时作业的高性能之间的 tradeoff 成为可能:当调度单位可以自由调整,就可以实现一种全新的混合的计算模式,我们称之为 Bubble 执行模式。



这种混合 Bubble 模式,使得 DAG 的用户,也就是上层计算引擎的开发者(比如 MaxCompute 的优化器),能够结合执行计划的特点,以及引擎终端用户对资源使用和性能的敏感度,来灵活选择在执行计划中切出 Bubble 子图。在 Bubble 内部充分利用网络直连和计算节点预热等方式提升性能,没有切入 Bubble 的节点则依然通过传统离线作业模式运行。在统一的新模型之上,计算引擎和执行框架可以在两个极端之间,根据具体需要,选择不同的平衡点。

4.1.3 效果

DAG2.0 的动态性使得很多执行优化可以运行时决定,使得实际执行的效果更优。例如,在阿里内部的作业中,动态的 conditional join 相比静态的执行计划,整体获得了将近 3X 的性能提升。



混合 Bubble 执行模式平衡了离线作业高资源利用率以及准实时作业的高性能,这在 1TB TPCH 测试集上有显著的体现,


  • Bubble 相对离线作业:在多使用 20%资源的情况下,Bubble 模式性能提升将近一倍;

  • Bubble 相对准实时模式:在节省了 2.6X 资源情况下, Bubble 性能仅下降 15%;

4.2 Fuxi Shuffle 2.0 - 磁盘内存网络的最佳使用

4.2.1 背景

大数据计算作业中,节点间的数据传递称为 shuffle, 主流分布式计算系统都提供了数据 shuffle 服务的子系统。如前述 DAG 计算模型中,task 间的上下游数据传输就是典型的 shuffle 过程。


在数据密集型作业中,shuffle 阶段的时间和资源使用占比非常高,有其他大数据公司研究显示,在大数据计算平台上 Shuffle 阶段均是在所有作业的资源使用中占比超过 50%. 根据统计在 MaxCompute 生产中 shuffle 占作业运行时间和资源消耗的 30-70%,因此优化 shuffle 流程不但可以提升作业执行效率,而且可以整体上降低资源使用,节约成本,提升 MaxCompute 在云计算市场的竞争优势。


从 shuffle 介质来看,最广泛使用的 shuffle 方式是基于磁盘文件的 shuffle. 这种模式这种方式简单,直接,通常只依赖于底层的分布式文件系统,适用于所有类型作业。而在典型的常驻内存的实时/准实时计算中,通常使用网络直连 shuffle 的方式追求极致性能。Fuxi Shuffle 在 1.0 版本中将这两种 shuffle 模式进行了极致优化,保障了日常和高峰时期作业的高效稳定运行。

挑战

我们先以使用最广泛的,基于磁盘文件系统的离线作业 shuffle 为例。


通常每个 mapper 生成一个磁盘文件,包含了这个 mapper 写给下游所有 reducer 的数据。而一个 reducer 要从所有 mapper 所写的文件中,读取到属于自己的那一小块。右侧则是一个系统中典型规模的 MR 作业,当每个 mapper 处理 256MB 数据,而下游 reducer 有 10000 个时,平均每个 reducer 读取来自每个 mapper 的数据量就是 25.6KB, 在机械硬盘 HDD 为介质的存储系统中,属于典型的读碎片现象,因为假设我们的磁盘 iops 能达到 1000, 对应的 throughput 也只有 25MB/s, 严重影响性能和磁盘压力。



分布式作业中并发度的提升往往是加速作业运行的最重要手段之一。但处理同样的数据量,并发度越高意味着上述碎片读现象越严重。通常情况下选择忍受一定的碎片 IO 现象而在集群规模允许的情况下提升并发度,还是更有利于作业的性能。所以碎片 IO 现象在线上普遍存在,磁盘也处于较高的压力水位。


一个线上的例子是,某些主流集群单次读请求 size 为 50-100KB, Disk util 指标长期维持在 90%的警戒线上。这些限制了对作业规模的进一步追求。


我们不禁考虑,作业并发度和磁盘效率真的不能兼得吗?

4.2.2 Fuxi 的答案:Fuxi Shuffle 2.0

引入 Shuffle Service - 高效管理 shuffle 资源


为了针对性地解决上述碎片读问题及其引发的一连串负面效应,我们全新打造了基于 shuffle service 的 shuffle 模式。Shuffle service 的最基本工作方式是,在集群每台机器部署一个 shuffle


agent 节点,用来归集写给同一 reducer 的 shuffle 数据。如下图:



可以看到,mapper 生成 shuffle 数据的过程变为 mapper 将 shuffle 数据通过网络传输给每个 reducer 对应的 shuffle agent, 而 shuffle agent 归集一个 reducer 来自所有 mapper 的数据,并追加到 shuffle 磁盘文件中,两个过程是流水线并行化起来的。


Shuffle agent 的归集功能将 reducer 的 input 数据从碎片变为了连续数据文件,对 HDD 介质相当友好。由此,整个 shuffle 过程中对磁盘的读写均为连续访问。从标准的 TPCH 等测试中可以看到不同场景下性能可取得百分之几十到几倍的提升,且大幅降低磁盘压力、提升 CPU 等资源利用率。


Shuffle Service 的容错机制


Shuffle service 的归集思想在公司内外都有不同的工作展现类似的思想,但都限于“跑分”和小范围使用。因为这种模式对于各环节的错误天生处理困难。


以 shuffle agent 文件丢失/损坏是大数据作业的常见问题为例,传统的文件系统 shuffle 可以直接定位到出错的数据文件来自哪个 mapper,只要重跑这个 mapper 即可恢复。但在前述 shuffle service 流程中,由于 shuffle agent 输出的 shuffle 这个文件包含了来自所有 mapper 的 shuffle 数据,损坏文件的重新生成需要以重跑所有 mapper 为代价。如果这种机制应用于所有线上作业,显然是不可接受的。


我们设计了数据双副本机制解决了这个问题,使得大多数通常情况下 reducer 可以读取到高效的 agent 生成的数据,而当少数 agent 数据丢失的情况,可以读取备份数据,备份数据的重新生成只依赖特定的上游 mapper.



具体来说,mapper 产生的每份 shuffle 数据除了发送给对于 shuffle agent 外,也会按照与传统文件系统 shuffle 数据类似的格式,在本地写一个备份。按前面所述,这份数据写的代价较小但读取的性能不佳,但由于仅在 shuffle agent 那个副本出错时才会读到备份数据,所以对作业整体性能影响很小,也不会引起集群级别的磁盘压力升高。


有效的容错机制使得 shuffle service 相对于文件系统 shuffle,在提供更好的作业性能的同时,因 shuffle 数据出错的 task 重试比例降低了一个数量级,给线上全面投入使用打好了稳定性基础。


线上生产环境的极致性能稳定性


在前述基础功能之上,Fuxi 线上的 shuffle 系统应用了更多功能和优化,在性能、成本、稳定性等方便取得了进一步的提升。举例如下。


  1. 流控和负载均衡


前面的数据归集模型中,shuffle agent 作为新角色衔接了 mapper 的数据发送与数据落盘。分布式集群中磁盘、网络等问题可能影响这条链路上的数据传输,节点本身的压力也可能影响 shuffle agent 的工作状态。当因集群热点等原因使得 shuffle agent 负载过重时,我们提供了必要的流控措施缓解网络和磁盘的压力;和模型中一个 reducer 有一个 shuffle agent 收集数据不同,我们使用了多个 shuffle agent 承担同样的工作,当发生数据倾斜时,这个方式可以有效地将压力分散到多个节点上。从线上表现看,这些措施消除了绝大多数的 shuffle 期间拥塞流控和集群负载不均现象。


  1. 故障 shuffle


agent 的切换


各种软硬件故障导致 shuffle agent 对某个 reducer 的数据工作不正常时,后续数据可以实时切换到其他正常 shuffle agent. 这样,就会有更多的数据可以从 shuffle agent 侧读到,而减少低效的备份副本访问。


  1. Shuffle agent 数据的回追


很多时候发生 shuffle


agent 切换时(如机器下线),原 shuffle agent 生成的数据可能已经丢失或访问不到。在后续数据发送到新的 shuffle agent 同时,Fuxi 还会将丢失的部分数据从备份副本中 load 起来并同样发送给新的 shuffle agent, 使得后续 reducer 所有的数据都可以读取自 shuffle agent 侧,极大地提升了容错情况下的作业性能。


  1. 新 shuffle 模式的探索


前述数据归集模型及全面扩展优化,在线上集群中单位资源处理的数据量提升了约 20%, 而因出错重试的发生频率降至原来文件系统 shuffle 的 5%左右。但这就是最高效的 shuffle 方式了吗?


我们在生产环境对部分作业应用了一种新的 shuffle 模型,这种模型中 mapper 的发送端和 reducer 的接收端都通过一个 agent 节点来中转 shuffle 流量。线上已经有部分作业使用此种方式并在性能上得到了进一步的提升。


内存数据 shuffle


离线大数据作业可能承担了主要的计算数据量,但流行的大数据计算系统中有非常多的场景是通过实时/准实时方式运行的,作业全程的数据流动发生在网络和内存,从而在有限的作业规模下取得极致的运行性能,如大家熟悉的 Spark, Flink 等系统。


Fuxi DAG 也提供了实时/准实时作业运行环境,传统的 shuffle 方式是通过网络直连,也能收到明显优于离线 shuffle 的性能。这种方式下,要求作业中所有节点都要调度起来才能开始运行,限制了作业的规模。而实际上多数场景计算逻辑生成 shuffle 数据的速度不足以填满 shuffle 带宽,运行中的计算节点等待数据的现象明显,性能提升付出了资源浪费的代价。


我们将 shuffle service 应用到内存存储中,以替换 network 传输的 shuffle 方式。一方面,这种模式解耦了上下游调度,整个作业不再需要全部节点同时拉起;另一方面通过精确预测数据的读写速度并适时调度下游节点,可以取得与 network 传输 shuffle 相当的作业性能,而资源消耗降低 50%以上。这种 shuffle 方式还使得 DAG 系统中多种运行时调整 DAG 的能力可以应用到实时/准实时作业中。

4.2.3 收益

Fuxi Shuffle 2.0 全面上线生产集群,处理同样数据量的作业资源比原来节省 15%,仅 shuffle 方式的变化就使得磁盘压力降低 23%,作业运行中发生错误重试的比例降至原来的 5%。



对使用内存 shuffle 的准实时作业,我们在 TPCH 等标准测试集中与网络 shuffle 性能相当,资源使用只有原来的 30%左右,且支持了更大的作业规模,和 DAG 2.0 系统更多的动态调度功能应用至准实时作业。

5. 单机调度

大量分布式作业汇集到一台机器上,如何将单机有限的各种资源合理分配给每个作业使用,从而达到作业运行质量、资源利用率、作业稳定性的多重保障,是单机调度要解决的任务。


典型的互联网公司业务一般区分为离线业务与在线业务两种类型。在阿里巴巴,我们也同样有在线业务如淘宝、天猫、钉钉、Blink 等,这类业务的特点是对响应延迟特别敏感,一旦服务抖动将会出现添加购物车失败、下单失败、浏览卡顿、钉钉消息发送失败等各种异常情况,严重影响用户体验,同时为了应对在 618、双 11 等各种大促的情况,需要提前准备大量的机器。由于以上种种原因,日常状态这些机器的资源利用率不足 10%,产生资源浪费的情况。与此同时,阿里的离线业务又是另外一幅风景,MaxCompute 计算平台承担了阿里所有大数据离线计算业务类型,各个集群资源利用率常态超负载运行,数据量和计算量每年都在保持高速增长。


一方面是在线业务资源利用率不足,另一方面是离线计算长期超负载运行,那么能否将在线业务与离线计算进行混合部署,提升资源利用率同时大幅降低成本,实现共赢。

5.1 三大挑战

如何保障在线服务质量


在线集群的平均 CPU 利用率只有 10%左右,混部的目标就是将剩余的资源提供给 MaxCompute 进行离线计算使用,从而达到节约成本的目的。那么,如何能够保障资源利用率提升的同时又能够保护在线服务不受影响呢?


如何保障离线稳定


当资源发生冲突时,第一反应往往是保护在线,牺牲离线。毕竟登不上淘宝天猫下不了单可是大故障。可是,离线如果无限制的牺牲下去,服务质量将会出现大幅度下降。试想,我在 dataworks 上跑个 SQL,之前一分钟就出结果,现在十几分钟甚至一个小时都跑不出来,大数据分析的同学估计也受不了了。


如何衡量资源质量


电商业务通过富容器的方式集成多种容器粒度的分析手段,但是前文描述过离线作业的特点,如何能够精准的对离线作业资源使用进行资源画像分析,如果能够评估资源受干扰的程度,混部集群的稳定性等问题,是对我们的又一个必须要解决的挑战

5.2 资源隔离分级管理

单机的物理资源总是有限的,按照资源特性可以大体划分为可伸缩资源与不可伸缩资源两大类。CPU、Net、IO 等属于可伸缩资源,Memory 属于不可伸缩资源,不同类型的资源有不同层次的资源隔离方案。另一方面,通用集群中作业类型种类繁多,不同作业类型对资源的诉求是不同的。这里包括在线、离线两个大类的资源诉求,同时也包含了各自内部不同层次的优先级二次划分需求,十分复杂。


基于此,Fuxi2.0 提出了一套基于资源优先级的资源划分逻辑,在资源利用率、多层次资源保障复杂需求寻找到了解决方案。



下面我们将针对 CPU 分级管理进行深入描述,其他维度资源管理策略我们将在今后的文章中进行深入介绍。


CPU 分级管理


通过精细的组合多种内核策略,将 CPU 区分为高、中、低三类优先级



隔离策略如下图所示



基于不同类型的资源对应不同的优先级作业


5.3 资源画像

Fuxi 作为资源调度模块,对资源使用情况的精准画像是衡量资源分配,调查/分析/解决解决资源问题的关键。针对在线作业的资源情况,集团和业界都有较多的解决方案。这类通用的资源采集角色存在以下无法解决的问题无法应用于离线作业资源画像的数据采集阶段


  1. 采集时间精度过低。大部分信息是分钟级别,而 MaxCompute 作业大部分运行时间在秒级。

  2. 无法定位 MaxCompute 信息。MaxCompute 是基于 Cgroup 资源隔离,因此以上工具无法针对作业进行针对性采集

  3. 采集指标不足。有大量新内核新增的微观指标需要进行收集,过去是不支持的



为此,我们提出了 FuxiSensor 的资源画像方案,架构如上图所示,同时利用 SLS 进行数据的收集和分析。在集群、Job 作业、机器、worker 等不同层次和粒度实现了资源信息的画像,实现了秒级的数据采集精度。在混部及 MaxCompute 的实践中,成为资源问题监控、报警、稳定性数据分析、作业异常诊断、资源监控状况的统一入口,成为混部成功的关键指标。

5.4 线上效果

日常资源利用率由 10%提升到 40%以上



在线抖动小于 5%


5.5 单机调度小结

为了解决三大挑战,通过完善的各维度优先级隔离策略,将在线提升到高优先级资源维度,我们保障了在线的服务质量稳定;通过离线内部优先级区分及各种管理策略,实现了离线质量的稳定性保障;通过细粒度资源画像信息,实现了资源使用的评估与分析,最终实现了混部在阿里的大规模推广与应用,从而大量提升了集群资源利用率,为离线计算节省了大量成本。

6. 展望

从 2009 到 2019 年历经十年的锤炼,伏羲系统仍然在不断的演化,满足不断涌现的业务新需求,引领分布式调度技术的发展。接下来,我们会从以下几个方面继续创新:


  • 资源调度 FuxiMaster 将基于机器学习,实现智能化调度策略和动态精细的资源管理模式,进一步提高集群资源利用率,提供更强大灵活的分布式集群资源管理服务。

  • 新一代 DAG2.0 继续利用动态性精耕细作,优化各种不同类型的作业;与 SQL 深入合作,解决线上痛点,推动 SQL 引擎深度优化,提升性能的同时也让 SQL 作业运行更加智能化;探索机器学习场景的 DAG 调度,改善训练作业的效率,提升 GPU 使用率。

  • 数据 Shuffle2.0 则一方面优化 shuffle 流程,追求性能、成本、稳定性的极致,另一方面与 DAG 2.0 深入结合,提升更多场景;同时探索新的软硬件架构带来的新的想象空间。

  • 智能化的精细单机资源管控,基于资源画像信息通过对历史数据分析产生未来趋势预测,通过多种资源管控手段进行精准的资源控制,实现资源利用率和不同层次服务质量的完美均衡。


最后,我们热忱欢迎集团各个团队一起交流探讨,共同打造世界一流的分布式调度系统!


作者介绍:


李超,阿里云智能资深技术专家


2020-05-22 14:193917

评论

发布
暂无评论
发现更多内容

低代码开发:学校低成本数字化转型的新引擎

不在线第一只蜗牛

低代码 数字化

浅析软件开发技术的发展历程与展望:从过去到现在,探索未来趋势

快乐非自愿限量之名

软件开发 项目开发

Java并发编程基础(下)

FunTester

我是怎么用静态IP代理为Google账号保驾护航的

陈橘又青

如何保障服务的高可用:提升可观测性

SFLYQ

高可用 后端 可观测性

C#中使用IntPtr.Size属性来判断当前系统是32位还是64位

百度搜索:蓝易云

C# 云计算 Linux 运维 云服务器

Chatbot具体需要如何搭建

百度搜索:蓝易云

云计算 Linux 运维 chatbot 云服务器

AWS安全组是什么?有什么用?

行云管家

云计算 AWS 安全组 亚马逊云

教你如何拿Merlin Chain空投,附视频教程

石头财经

NGINX Agent 的可观测性和远程配置

NGINX开源社区

鸿蒙Next怎么升级更便捷

FinFish

纯血鸿蒙 鸿蒙化改造 鸿蒙app 鸿蒙app升级 混合app开发

数字化,网络化和未来的AI

agnostic

人工智能 信息化

管理者请注意,要珍惜有愤怒情绪的员工

芃篙君

管理

监管人工智能——未来之路

孤傲小二~阿沐

比特币原生 L2 解决方案 Merlin Chain梅林链科普(bitget wallet)

股市老人

hal库中串口常用函数介绍

百度搜索:蓝易云

云计算 Linux 运维 云服务器 HAL

Qualcomm’s “core”QCN9274 leads WiFi 7 to break through the boundaries of wireless connections

wallysSK

你所在的行业,有必要做小程序么?

天津汇柏科技有限公司

小程序 小程序开发 开发小程序

程序员必会的6个数据可视化库

伤感汤姆布利柏

ETL、ELT区别以及如何正确运用

RestCloud

ETL 数据集成 ELT

除了Sora,还有哪些AI软件工具值得推荐?这30个一定要知道!

彭宏豪95

AI软件 AIGC AI工具

教你如何拿Merlin Chain空投,附视频教程

BlockChain先知

比特币原生 L2 解决方案 Merlin Chain梅林链科普(bitget wallet)

股市老人

可扩展性是什么意思?为什么企业采购软件时候需要考虑可扩展性?

行云管家

软件 可扩展性 采购

orca市值机器人/刷量机器人/做市机器人

区块链技术

一座“超级工厂”:让中国没有流不通的数据

脑极体

数据

深入解析 Java 面向对象编程与类属性应用

小万哥

Java 程序人生 编程语言 软件工程 后端开发

细粒度的代码权限怎么做?极狐GitLab 代码所有者来帮忙

极狐GitLab

一文搞懂设计模式—模板方法模式

Java随想录

Java 设计模式

面向大数据与云计算的阿里经济体核心调度系统Fuxi 2.0全揭秘_AI&大模型_李超_InfoQ精选文章