Spark 上的深度学习框架再添新兵:Yahoo 开源 TensorFlowOnSpark

  • 刘志勇

2017 年 2 月 15 日

话题:语言 & 开发架构深度学习Spark

前言

2 月 13 日,雅虎宣布开源TensorFlowOnSpark。TensorFlowOnSpark 为 Apache Hadoop 和 Apache Spark 集群带来可扩展的深度学习。 通过结合深入学习框架 TensorFlow 和大数据框架 Apache Spark 、Apache Hadoop 的显着特征,TensorFlowOnSpark 能够在 GPU 和 CPU 服务器集群上实现分布式深度学习。

Yahoo Big ML 团队成员 Lee Yang、Jun Shi、Bobbie Chern 和 Andy Feng 日前合著了一篇文章,详细介绍了他们开源的 TensorFlowOnSpark 的方方面面。InfoQ 翻译并整理本文。

正文

深度学习(DL)在最近几年快马加鞭地发展。在 Yahoo,我们发现,为了从海量数据中获得洞察力,需要部署分布式深度学习。现有的 DL 框架通常需要为深度学习设置单独的集群,迫使我们为机器学习流程创建多个程序(见图 1)。拥有独立的集群需要我们在它们之间传递大型数据集,从而引起不必要的系统复杂性和端到端的学习延迟。

去年我们通过开发和发布CaffeOnSpark来解决 scaleout 问题,我们的开源框架,支持在相同的 Spark 和 Hadoop 集群进行分布式深度学习和大数据处理。我们在 Yahoo 使用 CaffeOnSpark 来改善我们的NSFW 图像检测,比如自动从现场直播等自动识别电竞比赛等。借助社区的宝贵意见和贡献,CaffeOnSpark 已经升级,支持 LSTM,带有一个新的数据层,可用于训练和测试交错,还有一个 Python API 以及在 Docker 容器上的部署。对我们来说,这些极大提升了用户体验。但对于那些使用深层学习框架TensorFlow的用户怎么办呢 ?于是我们仿效之前的做法,开发了 TensorFlowOnSpark。

在 TensorFlow 的首次发布后,谷歌在 2016 年 4 月发布了增强的 TensorFlow 与分布式深度学习功能。在 2016 年 10 月,TensorFlow 宣布支持 HDFS。然而,在 Google 云之外,用户仍然需要一个专用于 TensorFlow 应用程序的集群。TensorFlow 程序不能部署在现有的大数据集群上,从而增加了那些希望大规模利用这种技术的成本和延迟。

为了打破这个限制,一些社区项目将 TensorFlow 连接到 Spark 集群。SparkNet在 Spark 执行器添加了运行 TensorFlow 网络的能力。DataBricks 提出TensorFrame,用来使用 TensorFlow 程序操纵 Apache Spark 的 DataFrames(数据帧)。虽然这些方法是在正确的方向迈出了一步,但我们检查其代码后,发现我们无法使多个 TensorFlow 进程直接相互通信,我们也无法实现异步分布式学习,我们还必须花费大量精力来迁移现有的 TensorFlow 程序。

TensorFlowOnSpark

我们的新框架 TensorFlowOnSpark(TFoS),支持 TensorFlow 在 Spark 和 Hadoop 集群上分布式执行。如上图 2 所示,TensorFlowOnSpark 被设计为与 SparkSQL、MLlib 和其他 Spark 库一起在一个单独流水线或程序(如 Python notebook)中运行。

TensorFlowOnSpark 支持所有类型的 TensorFlow 程序,可以实现异步和同步的训练和推理。它支持模型并行性和数据的并行处理,以及 TensorFlow 工具(如 Spark 集群上的 TensorBoard)。

任何 TensorFlow 程序都可以轻松地修改为在 TensorFlowOnSpark 上运行。通常情况下,需要改变的 Python 代码少于 10 行。许多 Yahoo 平台使用 TensorFlow 的开发人员很容易迁移 TensorFlow 程序,以便在 TensorFlowOnSpark 上执行。

TensorFlowOnSpark 支持 TensorFlow 进程(计算节点和参数服务节点)之间的直接张量通信。过程到过程的直接通信机制使 TensorFlowOnSpark 程序能够在增加的机器上很轻松的进行扩展。如图 3 所示,TensorFlowOnSpark 不涉及张量通信中的 Spark 驱动程序,因此实现了与独立 TensorFlow 集群类似的可扩展性。

TensorFlowOnSpark 提供两种不同的模式来提取训练和推理数据:

  1. **TensorFlow QueueRunners:**TensorFlowOnSpark 利用 TensorFlow 的file readersQueueRunners直接从 HDFS 文件中读取数据。Spark 不涉及访问数据。
  2. Spark Feeding :Spark RDD 数据被传输到每个 Spark 执行器里,随后的数据将通过feed_dict传入 TensorFlow 图。

图 4 说明初始图像分类中同时进行的分布式训练如何使用 TFoS 中通过 QueueRunners 的一个简单设置进行扩展:每个节点一个 GPU、一个读入以及批处理为 32。四个 TFoS 工作同时进行,训练 100,000 步。两天后,当这些工作完成时,这些工作的前 5 个准确度分别为 0.730、0.814、0.854 和 0.879。精确度达到 0.730 的单计算节点工作需要 46 小时,对于双计算节点则需要 22.5 小时,4 计算节点需要 13 小时,8 计算节点工需要 7.5 小时。TFoS 因此实现了接近模型训练的近线性可扩展性。这是非常令人鼓舞的,虽然 TFoS 可扩展性会因不同的型号和超级数而有所不同。

分布式 TensorFlow 的 RDMA

在 Yahoo 的 Hadoop 集群上,GPU 节点通过以太网和 Infiniband 连接。Infiniband 提供更快的连接,并支持通过 RDMA 直接访问其他服务器的内存。然而,当前 TensorFlow 版本仅支持使用 gRPC}通过以太网的分布式学习。为了加快分布式学习,我们增强了 TensorFlow C ++ 层,以支持 Infiniband 上的 RDMA。

为结合我们发布的 TFoS,我们除了默认的“GRPC”协议外,还引入了新的 TensorFlow 服务器协议。任何分布式 TensorFlow 程序可以通过指定利用tf.train.ServerDef()tf.train.Server()中的protocol="grpc_rdma"来使用增强版的 TensorFlow。

使用此新协议,就需要创建 RDMA 汇集管理器以确保张量直接写入远程服务器的内存。我们最小化张量缓冲区的创建:Tensor 缓冲区在开始时分配一次,然后在一个 TensorFlow 作业的所有训练步骤中重复使用。从我们早期的实验与大型模型(如VGG-19 网络)来看,业已证明,与现有 GRPC 相比,我们的 TDMA 实现在训练时间上显著加速了。

由于支持 RDMA 是一个高度要求的能力(见 TensorFlow issue#2916),我们决定把现有的实现版本作为一个 alpha 版向 TensorFlow 社区开放。在接下来的几周内,我们将进一步优化 RDMA 实现,并分享一些详细的基准测试结果。

简单的 CLI 和 API

TFoS 程序由标准的 Apache Spark 命令spark-submit来启动。如下图所示,用户可以在 CLI 中指定 Spark 执行器的数目,每个执行器的 GPU 数量和参数服务器的数目。用户还可以指定是否要使用 TensorBoard(-tensorboard)和 / 或 RDMA(-rdma)。

      spark-submit –master ${MASTER} \ 
      ${TFoS_HOME}/examples/slim/train_image_classifier.py \ 
      –model_name inception_v3 \
      –train_dir hdfs://default/slim_train \ 
      –dataset_dir hdfs://default/data/imagenet \
      –dataset_name imagenet \
      –dataset_split_name train \
      –cluster_size ${NUM_EXEC} \
      –num_gpus ${NUM_GPU} \
      –num_ps_tasks ${NUM_PS} \
      –sync_replicas \
      –replicas_to_aggregate ${NUM_WORKERS} \
      –tensorboard \
      –rdma  

TFoS 提供了一个高层次的 Python API(在我们示例 Python notebook说明):

  • TFCluster.reserve() … construct a TensorFlow cluster from Spark executors
  • TFCluster.start() … launch Tensorflow program on the executors
  • TFCluster.train() or TFCluster.inference() … feed RDD data to TensorFlow processes
  • TFCluster.shutdown() … shutdown Tensorflow execution on executors

开放源码

TensorFlowOnSparkTensorFlow 的 RDMA 增强包、多个示例程序(包括 MNIST,cifar10,创建以来,VGG)来说明 TensorFlow 方案 TensorFlowOnSpark,并充分利用 RDMA 的简单转换过程。亚马逊机器映像也对 AWS EC2 应用 TensorFlowOnSpark。


感谢杜小芳对本文的审校。

给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ@丁晓昀),微信(微信号:InfoQChina)关注我们。

语言 & 开发架构深度学习Spark