Uber 机器学习平台 Michelangelo 是如何使用 Spark 模型的?

阅读数:257 2019 年 11 月 8 日 08:00

Uber机器学习平台Michelangelo是如何使用Spark模型的?

Michelangelo 是 Uber 的机器学习(ML)平台,可以训练并服务于整个公司范围内生产环境中的数千种模型。该平台被设计成了一个端到端的工作流,目前支持经典的机器学习、时间序列预测和深度学习模型,可以涵盖大量的用例,从生成市场预测、响应客户支持工单到准确计算预计到达时间(EAT)以及使用自然语言处理(NLP)模型在驾驶员 App 中提供一键式聊天功能。

本文最初发布于 Uber 工程博客,由 InfoQ 中文站翻译并分享。

Uber机器学习平台Michelangelo是如何使用Spark模型的?

Michelangelo 是 Uber 的机器学习(ML)平台,可以训练并服务于整个公司范围内生产环境中的数千种模型。该平台被设计成了一个端到端的工作流,目前支持经典的机器学习、时间序列预测和深度学习模型,可以涵盖大量的用例,从生成市场预测响应客户支持工单准确计算预计到达时间(EAT)以及使用自然语言处理(NLP)模型在驾驶员 App 中提供一键式聊天功能

大多数 Michelangelo 模型都是基于 Apache Spark MLlib 的,这是一个可伸缩的 Apache Spark 机器学习库。为了处理高 QPS 的在线服务,Michelangelo 最初仅通过内部定制的模型序列化和表示支持 Spark MLlib 模型的一个子集,这使得客户无法灵活地试验任意复杂的模型管道,限制了 Michelangelo 的扩展速度。为了解决这些问题,我们改进了 Michelangelo 对 Spark MLlib 的使用,特别是在模型表示、持久性和在线服务方面。

在 Michelangelo 中改进 Spark MLlib 用法的动机

我们最初开发 Michelangelo 是为了为生产环境提供可扩展的机器学习模型。它对基于 Spark 的数据摄取调度、模型训练和评估以及批量和在线模型服务部署提供端到端的支持,在 Uber 获得了广泛的认可。

Uber机器学习平台Michelangelo是如何使用Spark模型的?

图 1 为了将 Spark 用于数据预处理和低延迟服务,并使用 GPU 进行分布式深度学习训练,Michelangelo 针对深度学习用例使用了一致的 Spark 管道架构。

最近,Michelangelo 已经发展到可以处理更多的用例,包括提供在核心 Michelangelo 之外训练过的模型。例如,深度学习模型的扩展和端到端加速训练需要在不同环境中执行操作步骤,以便利用 Spark 的分布式计算转换、Spark Pipelines 在 CPU 上的低延迟服务以及使用 Horovod (Uber 的分布深度学习框架)在 GPU 集群上的分布式深度学习训练。为了使这些需求更容易实现,并保证训练和服务的一致性,有一个一致的架构和模型表示很重要,它可以利用我们现有的低延迟的基于 JVM 的模型服务基础设施,同时提供正确的抽象来封装这些需求。

Uber机器学习平台Michelangelo是如何使用Spark模型的?

图 2 具有活动 Spark 会话的外部环境可以无缝地反序列化来自 Michelangelo 的经过训练的管道模型,并序列化供 Michelangelo 生态系统的其余部分使用的模型。Apache Spark 是 Apache 软件基金会在美国和 / 或其他国家的注册商标。使用这个标识并不意味着 Apache 软件基金会的认可。

另一个动机是使数据科学家能够使用 PySpark 在熟悉的 Jupyter 笔记本(数据科学工作台)中构建和试验任意复杂的模型,同时仍然能够利用 Michelangelo 生态系统可靠地进行分布式训练、部署和服务。这也为集成学习多任务学习技术所需的更复杂的模型结构提供了可能性,同时让用户可以动态地实现数据操作和自定义评估。

因此,我们回顾了 Michelangelo 对 Spark MLlib 和 Spark ML 管道的使用,概括了其模型持久性和在线服务机制,目的是在不影响可伸缩性的情况下实现可扩展性和互操作性。

Michelangelo 最初推出的是一个统一的架构,它负责管理紧耦合的工作流和用于训练和服务的 Spark 作业。Michelangelo 对每种支持的模型类型都有特定的管道模型定义,并使用内部定制的 protobuf 表示经过训练的服务模型。离线服务通过 Spark 处理;在线服务使用添加到 Spark 内部版本的自定义 API 来处理,以实现高效的单行预测。

Uber机器学习平台Michelangelo是如何使用Spark模型的?

图 3 向本地 Spark 序列化和反序列化转变在模型持久化管道阶段(Transformer/Estimator)层面上实现了灵活性和跨环境兼容性。Apache Spark 是 Apache 软件基金会在美国和 / 或其他国家的注册商标。使用这个标识并不意味着 Apache 软件基金会的认可。

最初的架构支持通用机器学习模型(如广义线性模型 GLM 和梯度增强决策树模型 GBDT)的大规模训练和服务,但是自定义的 protobuf 表示使添加对新 Spark 转换器的支持变得困难,并排除了在 Michelangelo 之外训练的模型。当新版本的 Spark 可用时,Spark 的自定义内部版本也使得每次的升级迭代变得复杂。为了提高对新转换器的支持速度,并允许客户将自己的模型引入 Michelangelo,我们考虑了如何改进模型表示并更加顺畅地添加在线服务接口。

Michelangelo 的架构和模型表示法的演变

Uber机器学习平台Michelangelo是如何使用Spark模型的?

图 4 Michelangelo 的架构必须处理由不同功能需求带来的复杂性,并保持训练和服务环境之间的一致性。

Uber 的机器学习工作流通常很复杂,涉及不同的团队、库、数据格式和语言;为了使模型表示和在线服务接口可以恰当地演化,我们需要考虑所有这些维度。

Uber机器学习平台Michelangelo是如何使用Spark模型的?

图 5 部署并提供机器学习管道模型服务包括模型前面的所有转换和操作步骤。

要部署用于服务的机器学习模型,需要部署整个管道,包括模型前面的转换工作流。通常还需要打包数据转换、特征提取、预处理甚至预测后转换。原始预测通常需要解释或转换回标签,在某些情况下需要转换到不同的维度空间,比如日志空间,以便下游服务使用。它也可以用于通过额外的数据加强原始预测,如它们的置信区间,通过概率校准来校准概率。我们想要一个能够反映 Spark MLlib 模型固有管道步骤的模型表示,并允许 Michelangelo 与外部工具进行无缝交互。

选择一种最新的模型表示

在评估可供选择的模型表示时,我们评估了不同的需求,包括:

  • 表示广义转换序列的能力(必需)
  • 处理在线用例的轻量级服务的可扩展性(必需)
  • 支持使用非 Michelangelo 原生 Spark 工具替换存储在 Michelangelo 中的模型(必需)
  • 训练时和服务时的模型解释偏差风险低(非常想要)
  • Spark 更新速度要快,并且易于编写新的估计器 / 转换器(非常想要)

我们考虑的一种方法是使用 MLeap ,这是一个独立的库,它通过一个专用的运行时来执行管道,提供了管道和模型序列化(到 Bundle.ML )和反序列化支持。MLeap 具有所需的表达能力和对轻量级在线服务的支持。但是,它有自己专有的持久化格式,这限制了与序列化和反序列化普通 Spark MLlib 模型的工具集的互操作性。

MLeap 还引入了服务时行为偏离训练时评估的风险,因为在技术上,正在提供服务的模型加载时的格式与训练时内存中的格式不同。MLeap 还降低了 Spark 更新的速度,因为除了 Spark MLlib 本地使用的方法之外,还必须为每个转换器和估计器添加单独的 MLeap 保存 / 加载方法。Databricks 的 ML 模型导出 dbml-local 提供了类似的方法。

我们考虑的另一种方法就是将训练模型导出到一个标准的格式,如预测模型标记语言(PMML) 或可移植分析格式(PFA),它们都具备我们需要的表达能力并且可以和Spark 交互,Spark 直接提供了PMML 支持,而 aardpfark 可以将 Spark 模型导出为 PFA。然而,这些表示还是存在服务时行为偏离方面的风险,我们认为这一风险高于 MLeap,因为一般标准在特定的实现中通常会有不同的解释。这些标准在 Spark 更新速度方面也带来了更大的阻碍,因为根据 Spark 变化的性质,标准本身可能需要更新。

我们发现,最直接的方法是使用标准的 Spark ML 管道序列化来表示模型。Spark ML 管道展示了我们想要的表达能力,允许与 Michelangelo 之外的 Spark 工具集交互,展现出了低风险的模型解释偏差,对 Spark 的更新速度影响较小。它还有助于编写自定义的转换器和估计器。

我们看到,使用 Spark 管道序列化的主要挑战是它与在线服务需求的不兼容性(Nick Pentreath 在 2018 年的 Spark AI 峰会上也讨论了这一点)。这种方法会启动一个本地 Spark 会话,并用它来加载一个训练好的 Spark MLlib 模型,这相当于在单台主机上运行一个小型集群,内存开销和延迟都很大,使它不适合许多在线服务场景要求的 p99 毫秒级延迟。虽然现有的用于提供服务的 Spark API 集在性能上还不足以满足 Uber 的用例,但我们发现,我们可以在这种开箱即用的体验中进行许多直接的更改,以满足我们的需求。

为了提供用于在线服务的轻量级接口,我们将 anOnlineTransformer添加到可以提供在线服务的转换器中,包括利用低级 Spark 预测方法的单个方法和小型方法列表。我们还调整了模型加载的性能,以满足我们的目标开销要求。

使用增强型转换器和估计器的管道

为了实现一个可以由 Michelangelo 在线训练并提供服务的 Transformer 或 Estimator,我们构建了一个 OnlineTransformer 接口,它扩展了开箱即用的 Spark Transformer 接口,并执行两个方法:1)Transform(instance: Dataset[Any]) ;2)ScoreInstance(instance: Map[String, Any])。

Uber机器学习平台Michelangelo是如何使用Spark模型的?

Transform(instance: Dataset[Object]) 作为分布式批处理服务的入口,提供了开箱即用的基于数据集的执行模型。ScoreInstance(instance: Map[String, Object]): Map[String, Object] 作为较为轻量级的 API,用于低延迟、实时服务场景中出现的针对单一特征值映射集的单行预测请求。ScoreInstance 背后的动机是提供一个更轻量级的 API,它可以绕过依赖于 Spark SQL Engine 的 Catalyst Optimizer 的 Dataset 所带来的巨大开销,从而对每个请求进行查询规划和优化。如上所述,这对于实时服务场景(如市场营销和欺诈检测)非常重要,其中 p99 延迟的 SLA 通常是毫秒级的。

当加载 Spark PipelineModel 时,任何具有相似类(包含 OnlineTransformer 特性)的 Transformer 都会映射到该类。这使得现有的训练好的 Spark 模型(由支持的转换器组成)获得了提供在线服务的能力,而且没有任何额外的工作。注意,OnlineTransformer 还实现了 Spark 的 MLWritable 和 MLReadable 接口,这为 Spark 免费提供了对序列化和反序列化的本地支持。

保持在线和离线服务一致性

转向标准的 PipelineModel 驱动的架构,通过消除 PipelineModel 之外的任何自定义预评分和后评分实现,进一步加强了在线和离线服务准确性之间的一致性。在每个管道阶段,实现自定义评分方法时的标准实践是首先实现一个公共评分函数。在离线转换中,它可以作为 DataFrame 输入上的一组 Spark 用户定义函数(UDF)运行,相同的评分函数也可以应用于在线 scoreInstance 和 scoreInstances 方法。在线和离线评分一致性将通过单元测试和端到端集成测试进一步保证。

性能优化

我们最初的度量结果显示,与我们自定义的 protobuf 表示的加载延迟相比,原生 Spark 管道加载延迟非常高,如下表所示:

Uber机器学习平台Michelangelo是如何使用Spark模型的?

这种序列化模型加载时间上的性能差异对于在线服务场景是不可接受的。模型实际上被分片到每个在线预测服务实例中,并在每个服务实例启动时、新模型部署期间或接收到针对特定模型的预测请求时加载。在我们的多租户模型服务设置中,过长的加载时间会影响服务器资源的敏捷性和健康状况监控。我们分析了加载延迟的来源,并进行了一些调优更改。

影响所有转换器加载时间的一个开销来源是 Spark 本地使用 sc.textFile 读取转换器元数据;从一个小的单行文件生成一个字符串 RDD 非常慢。对于本地文件,用 Java I/O 替换这段代码要快得多:

复制代码
[loadMetadata in src/main/scala/org/apache/spark/ml/util/ReadWrite.scala]

在我们的用例中(例如,LogisticRegression、StringIndexer 和 LinearRegression),影响许多转换器的另一个开销来源是对与转换器相关的少量数据使用 Spark 分布式 read/select 命令。对于这些情况,我们使用 ParquetUtil.read 代替 sparkSession.read.parquet;直接进行 Parquet read/getRecord 大大降低了转换器的加载时间。

树集成(Tree ensemble)转换器有一些特殊的调优机会。加载树集成模型需要将序列化到磁盘的模型元数据文件读取到磁盘,这会触发小文件的 groupByKey、sortByKey 以及 Spark 的分布式 read/select/sort/collect 操作,这些操作非常慢。我们直接用更快的 Parquet read/getRecord 代替了它们。在树集成模型保存方面,我们合并了树集成节点和元数据权重 DataFrame,以避免写入大量读起来很慢的小文件。

通过这些调优工作,我们能够将基准示例的本地 Spark 模型加载时间从 8 到 44 倍减少到仅比从自定义 protobuf 加载慢 2 到 3 倍,这相当于比 Spark 原生模型快 4 到 15 倍。考虑到使用标准表示的好处,这种水平的开销是可以接受的。

Uber机器学习平台Michelangelo是如何使用Spark模型的?

需要注意的是,Michelangelo 在线服务创建了一个本地的 SparkContext 来处理任何未加密的转换器的负载,因此,在线服务不需要 SparkContext。我们发现,当没有模型加载处于活动状态时,让 SparkContext 继续运行会对性能产生负面影响并导致延迟,例如,通过 SparkContext 清理器的操作。为了避免这种影响,我们在没有运行负载时停止 SparkContext。

可服务管道的灵活结构

使用管道模型作为 Michelangelo 的模型表示,用户可以灵活地组合和扩展可服务组件单元,使它们在线和离线服务时保持一致。然而,这并没有完全封装管道模型在机器学习工作流的各个阶段使用时的操作需求差异。有些操作步骤或概念本质上与机器学习工作流的特定阶段相关,而与其他阶段完全无关。例如,当用户对模型进行评估和迭代时,常常需要进行超参数优化、交叉验证以及生成模型解释及评估所需的特定元数据等操作。这些步骤允许用户帮助生成模型、与模型交互以及评估管道模型,但是一旦准备好进行产品化,就不应该将这些步骤合并到模型服务中。

同时,在机器学习工作流不同阶段的需求差异,促使我们开发了一个基于通用编排引擎的工作流和操作员框架。除了组合自定义可服务管道模型的灵活性之外,这还允许用户以有向图或工作流的形式组合和编排自定义操作的执行,以实现最终的可服务管道模型,如图 6 所示:

Uber机器学习平台Michelangelo是如何使用Spark模型的?

图 6 Michelangelo 基于 Operator Framework 的工作流提供了另一种程度的灵活性,通过优化的执行计划来方便地定制操作,从而生成可服务的、序列化的 Michelangelo 管道模型以及有用的工件。Apache Spark 是 Apache 软件基金会在美国和 / 或其他国家的注册商标。使用这个标记并不意味着 Apache 软件基金会的认可。Docker 和 Docker 标识是 Docker 公司在美国和 / 或其他国家的商标或注册商标。Docker 公司及其他各方也可以在本协议中使用的其他条款中享有商标权。本标记的使用并不意味着 Docker 的认可。TensorFlow、TensorFlow 标识及任何相关标识均为谷歌公司的商标。

未来展望

在这一点上,Spark 原生模型表示已经在 Michelangelo 生产环境中运行了一年多,作为一种健壮且可扩展的方法,在我们公司范围内提供 ML 模型。

得益于这种演变和 Michelangelo 平台的其他更新,Uber 的技术栈可以支持新的用例(如灵活的试验和在 Uber 的数据科学工作台中训练模型)、Jupyter 笔记本环境以及使用 TFTransformers 的端到端深度学习。为了介绍我们的经验并帮助其他人扩展他们自己的 ML 建模解决方案,我们在 2019 年 4 月的 Spark AI 峰会上讨论了这些更新,并提交了 SPIP 和 JIRA,把我们对 Spark MLlib 的更改开源。

原文链接:

Evolving Michelangelo Model Representation for Flexibility at Scale

评论

发布