最新发布《数智时代的AI人才粮仓模型解读白皮书(2024版)》,立即领取! 了解详情
写点什么

深度学习利器:分布式 TensorFlow 及实例分析

  • 2016-12-07
  • 本文字数:6594 字

    阅读完需:约 22 分钟

TensorFlow 发展及使用简介

2015 年 11 月 9 日谷歌开源了人工智能系统 TensorFlow,同时成为 2015 年最受关注的开源项目之一。TensorFlow 的开源大大降低了深度学习在各个行业中的应用难度。TensorFlow 的近期里程碑事件主要有:

2016 年 11 月 09 日:TensorFlow 开源一周年。

2016 年 09 月 27 日:TensorFlow 支持机器翻译模型。

2016 年 08 月 30 日:TensorFlow 支持使用 TF-Slim 接口定义复杂模型。

2016 年 08 月 24 日:TensorFlow 支持自动学习生成文章摘要模型。

2016 年 06 月 29 日:TensorFlow 支持 Wide & Deep Learning。

2016 年 06 月 27 日:TensorFlow v0.9 发布,改进了移动设备的支持。

2016 年 05 月 12 日:发布 SyntaxNet,最精确的自然语言处理模型。

2016 年 04 月 29 日:DeepMind 模型迁移到 TensorFlow。

2016 年 04 月 14 日:发布了分布式 TensorFlow。

TensorFlow 是一种基于图计算的开源软件库,图中节点表示数学运算,图中的边表示多维数组(Tensor)。TensorFlow 是跨平台的深度学习框架,支持 CPU 和 GPU 的运算,支持台式机、服务器、移动平台的计算,并从 r0.12 版本开始支持 Windows 平台。Tensorflow 提供了各种安装方式,包括 Pip 安装,Virtualenv 安装,Anaconda 安装,docker 安装,源代码安装。 本文主要介绍 Pip 的安装方式,Pip 是一个 Python 的包安装及管理工具。Linux 系统下,使用 Pip 的安装流程如下:

yum install python-pip python-dev

export TF_BINARY_URL=https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl

pip install --upgrade $TF_BINARY_URL

安装完毕后,TensorFlow 会安装到 /usr/lib/python2.7/site-packages/tensorflow 目录下。使用 TensorFlow 之前,我们需要先熟悉下常用 API。

tf.random_uniform([1], -1.0, 1.0):构建一个 tensor, 该 tensor 的 shape 为 [1],该值符合 [-1, 1) 的均匀分布。其中 [1] 表示一维数组,里面包含 1 个元素。

tf.Variable(initial_value=None):构建一个新变量,该变量会加入到 TensorFlow 框架中的图集合中。

tf.zeros([1]):构建一个 tensor, 该 tensor 的 shape 为 [1], 里面所有元素为 0。

tf.square(x, name=None):计算 tensor 的平方值。

tf.reduce_mean(input_tensor):计算 input_tensor 中所有元素的均值。

tf.train.GradientDescentOptimizer(0.5):构建一个梯度下降优化器,0.5 为学习速率。学习率决定我们迈向(局部)最小值时每一步的步长,设置的太小,那么下降速度会很慢,设的太大可能出现直接越过最小值的现象。所以一般调到目标函数的值在减小而且速度适中的情况。

optimizer.minimize(loss):构建一个优化算子操作。使用梯度下降法计算损失方程的最小值。loss 为需要被优化的损失方程。

tf.initialize_all_variables():初始化所有 TensorFlow 的变量。

tf.Session():创建一个 TensorFlow 的 session,在该 session 种会运行 TensorFlow 的图计算模型。

sess.run():在 session 中执行图模型的运算操作。如果参数为 tensor 时,可以用来求 tensor 的值。

下面为使用 TensorFlow 中的梯度下降法构建线性学习模型的使用示例:

复制代码
#导入 TensorFlow python API 库
import tensorflow as tf
import numpy as np
#随机生成 100 点(x,y)
x_data = np.random.rand(100).astype(np.float32)
y_data = x_data * 0.1 + 0.3
#构建线性模型的 tensor 变量 W, b
W = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
b = tf.Variable(tf.zeros([1]))
y = W * x_data + b
#构建损失方程,优化器及训练模型操作 train
loss = tf.reduce_mean(tf.square(y - y_data))
optimizer = tf.train.GradientDescentOptimizer(0.5)
train = optimizer.minimize(loss)
#构建变量初始化操作 init
init = tf.initialize_all_variables()
#构建 TensorFlow session
sess = tf.Session()
#初始化所有 TensorFlow 变量
sess.run(init)
#训练该线性模型,每隔 20 次迭代,输出模型参数
for step in range(201):
sess.run(train)
if step % 20 == 0:
print(step, sess.run(W), sess.run(b))

分布式 TensorFlow 应用架构

2016 年 4 月 14 日,Google 发布了分布式 TensorFlow,能够支持在几百台机器上并行训练。分布式的 TensorFlow 由高性能的 gRPC 库作为底层技术支持。TensorFlow 集群由一系列的任务组成,这些任务执行 TensorFlow 的图计算。每个任务会关联到 TensorFlow 的一个服务,该服务用于创建 TensorFlow 会话及执行图计算。TensorFlow 集群也可以划分为一个或多个作业,每个作业可以包含一个或多个任务。在一个 TensorFlow 集群中,通常一个任务运行在一个机器上。如果该机器支持多 GPU 设备,可以在该机器上运行多个任务,由应用程序控制任务在哪个 GPU 设备上运行。

常用的深度学习训练模型为数据并行化,即 TensorFlow 任务采用相同的训练模型在不同的小批量数据集上进行训练,然后在参数服务器上更新模型的共享参数。TensorFlow 支持同步训练和异步训练两种模型训练方式。

异步训练即 TensorFlow 上每个节点上的任务为独立训练方式,不需要执行协调操作,如下图所示:

同步训练为 TensorFlow 上每个节点上的任务需要读入共享参数,执行并行化的梯度计算,然后将所有共享参数进行合并,如下图所示:

分布式 TensorFlow 应用开发 API 主要包括:

tf.train.ClusterSpec({“ps”: ps_hosts, “worker”: worker_hosts}): 创建 TensorFlow 集群描述信息,其中 ps,worker 为作业名称,ps_hosts,worker_hosts 为该作业的任务所在节点的地址信息。示例如下:

cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222", "worker1.example.com:2222", "worker2.example.com:2222"], "ps": ["ps0.example.com:2222", "ps1.example.com:2222"]})tf.train.Server(cluster, job_name, task_index):创建一个 TensorFlow 服务,用于运行相应作业上的计算任务,运行的任务在 task_index 指定的机器上启动。

tf.device(device_name_or_function):设定在指定的设备上执行 Tensor 运算,示例如下:

复制代码
#指定在 task0 所在的机器上执行 Tensor 的操作运算
with tf.device("/job:ps/task:0"):
weights_1 = tf.Variable(...)
biases_1 = tf.Variable(...)

分布式 TensorFlow MNIST 模型训练

MNIST 是一个手写数字的图片数据库,可从网站 http://yann.lecun.com/exdb/mnist/ 下载相关数据,其中的每一张图片为 0 到 9 之间的手写数字灰度图片,大小为 28*28 像素,如下图所示:

MNIST 数据集主要包含训练样本 60000 个,测试样本 10000 个。图像数据主要为图片的像素数据,图像数据标签主要表示该图片的类别。由以下四个文件组成:

train-images-idx3-ubyte.gz (训练图像数据 60000 个)

train-labels-idx1-ubyte.gz (训练图像数据标签 60000 个)

t10k-images-idx3-ubyte.gz (测试图像数据 10000 个)

t10k-labels-idx1-ubyte.gz (测试图像数据标签 10000 个)

本文采用如下的结构对 MNIST 数据集进行分布式训练,由三个节点组成。ww01 节点为 Parameter Server,ww02 节点为 Worker0,ww03 节点为 Worker1。其中 Parameter Server 执行参数更新任务,Worker0,Worker1 执行图模型训练计算任务,如下图所示。分布式 MNIST 训练模型在执行十万次迭代后,收敛精度达到 97.77%。

在 ww01 节点执行如下命令,启动参数服务 /job:ps/task:0:

python asyncmnist.py --ps_hosts=ww01:2222 --worker_hosts=ww02:2222,ww03:2222 --job_name=ps --task_index=0

在 ww02 节点执行如下命令,启动模型运算 /job:worker/task:0:

python asyncmnist.py --ps_hosts=ww01:2222 --worker_hosts=ww02:2222,ww03:2222 --job_name=worker --task_index=0

在 ww03 节点执行如下命令,启动模型运算 /job:worker/task:1:

python asyncmnist.py --ps_hosts=ww01:2222 --worker_hosts=ww02:2222,ww03:2222 --job_name=worker --task_index=1

分布式 MNIST 的训练模型如下:

复制代码
import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
# TensorFlow 集群描述信息,ps_hosts 表示参数服务节点信息,worker_hosts 表示 worker 节点信息
tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
# TensorFlow Server 模型描述信息,包括作业名称,任务编号,隐含层神经元数量,MNIST 数据目录以及每次训练数据大小(默认一个批次为 100 个图片)
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100, "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "MNIST_data", "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")
FLAGS = tf.app.flags.FLAGS
#图片像素大小为 28*28 像素
IMAGE_PIXELS = 28
def main(_):
#从命令行参数中读取 TensorFlow 集群描述信息
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# 创建 TensorFlow 集群描述对象
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# 为本地执行 Task,创建 TensorFlow 本地 Server 对象.
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
#如果是参数服务,直接启动即可
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
#分配操作到指定的 worker 上执行,默认为该节点上的 cpu0
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
# 定义 TensorFlow 隐含层参数变量,为全连接神经网络隐含层
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units], stddev=1.0 / IMAGE_PIXELS), name="hid_w")
hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
# 定义 TensorFlow softmax 回归层的参数变量
sm_w = tf.Variable(tf.truncated_normal([FLAGS.hidden_units, 10], tddev=1.0 / math.sqrt(FLAGS.hidden_units)), name="sm_w")
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
#定义模型输入数据变量(x 为图片像素数据,y_ 为手写数字分类)
x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
y_ = tf.placeholder(tf.float32, [None, 10])
#定义隐含层及神经元计算模型
hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
hid = tf.nn.relu(hid_lin)
#定义 softmax 回归模型,及损失方程
y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
#定义全局步长,默认值为 0
global_step = tf.Variable(0)
#定义训练模型,采用 Adagrad 梯度下降法
train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)
#定义模型精确度验证模型,统计模型精确度
correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
#对模型定期做 checkpoint,通常用于模型回复
saver = tf.train.Saver()
#定义收集模型统计信息的操作
summary_op = tf.merge_all_summaries()
#定义操作初始化所有模型变量
init_op = tf.initialize_all_variables()
#创建一个监管程序,用于构建模型检查点以及计算模型统计信息。
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), logdir="/tmp/train_logs", init_op=init_op, summary_op=summary_op, saver=saver, global_step=global_step, save_model_secs=600)
#读入 MNIST 训练数据集
mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
#创建 TensorFlow session 对象,用于执行 TensorFlow 图计算
with sv.managed_session(server.target) as sess:
step = 0
while not sv.should_stop() and step < 1000:
# 读入 MNIST 的训练数据,默认每批次为 100 个图片
batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
train_feed = {x: batch_xs, y_: batch_ys}
#执行分布式 TensorFlow 模型训练
_, step = sess.run([train_op, global_step], feed_dict=train_feed)
#每隔 100 步长,验证模型精度
if step % 100 == 0:
print "Done step %d" % step
print(sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
# 停止 TensorFlow Session
sv.stop()
if __name__ == "__main__":
tf.app.run()

梯度下降法在分布式 TensorFlow 中的性能比较分析

2016 年谷歌在 ICLR(the International Conference on Learning Representations) Workshop 上发表了论文 REVISITING DISTRIBUTED SYNCHRONOUS SGD。基于 ImageNet 数据集,该论文对异步随机梯度下降法(Async-SGD)和同步随机梯度下降法(Sync-SGD)进行了比较分析。

Dean 在 2012 年提出了分布式随机梯度下降法,模型参数可以分布式地存储在不同的服务器上,称之为参数服务器(Parameter Server,PS),以及 Worker 节点可以并发地处理训练数据并且能够和参数服务通信获取模型参数。异步随机梯度下降法(Async-SGD),主要由以下几个步骤组成:

  • 针对当前批次的训练数据,从参数服务器获取模型的最新参数。
  • 基于上述获取到的模型参数,计算损失方程的梯度。
  • 将上述计算得到的梯度发送回参数服务器,并相应地更新模型参数。

同步随机梯度下降法(Sync-SGD)与 Sync-SGD 的主要差异在于参数服务器将等待所有 Worker 发送相应的梯度值,并聚合这些梯度值,最后把更新后的梯度值发送回节点。

Async-SGD 的主要问题是每个 Worker 节点计算的梯度值发送回参数服务器会有参数更新冲突,一定程度影响算法的收敛速度。Sync-SGD 算法能够保证在数据集上执行的是真正的随机梯度下降法,消除掉了参数的更新冲突。但同步随机梯度下降法同时带来的问题是训练数据的批量数据会比较大,参数服务器上参数的更新时间依赖于最慢的 worker 节点。

为了解决有些 worker 节点比较慢的问题,我们可以使用多一点的 Worker 节点,这样 Worker 节点数变为 N+N*5%,N 为集群 Worker 节点数。Sync-SGD 可以设定为在接受到 N 个 Worker 节点的参数后,可以直接更新参数服务器上的模型参数,进入下一个批次的模型训练。慢节点上训练出来的参数是会被丢弃掉。我们称这种方法为 Sync-SGD with backups。

2015 年,Abadi 使用 TensorFlow 的 Async-SGD, Sync-SGD,Sync-SGD with backups 训练模型对 ImageNet 的 Benchmark 问题进行了实验分析。要对该训练数据进行 1000 种图片的分类训练,实验环境为 50 到 200 个的 worker 节点,每个 worker 节点上运行 k40 GPU。使用分布式 TensorFlow 后大大缩短了模型训练时间,Async-SGD 算法实验结果如下,其中 200 个节点的训练时间比采用 25 个节点的运算时间缩短了 8 倍,如下图所示。

下图为 50 个 Worker 节点的 Async-SGD, Sync-SGD,Sync-SGD with backups 模型训练结果的比较。

从结果中可以看出增加 2 个 backup 节点,Sync-SGD with backups 模型可以快速提升模型训练速度。同时 Sync-SGD 模型比 Async-SGD 模型大概提升了 25% 的训练速度,以及 0.48% 的精确度。随着数据集的增加,分布式训练的架构变得越来越重要。而分布式 TensorFlow 正是解决该问题的利器,有效地提升了大规模模型训练的效率,提供了企业级的深度学习解决方案。

作者简介

武维(微信:allawnweiwu):西安交通大学博士,现为 IBM Spectrum Computing 研发工程师。主要从事大数据,深度学习,云计算等领域的研发工作。

公众号推荐:

跳进 AI 的奇妙世界,一起探索未来工作的新风貌!想要深入了解 AI 如何成为产业创新的新引擎?好奇哪些城市正成为 AI 人才的新磁场?《中国生成式 AI 开发者洞察 2024》由 InfoQ 研究中心精心打造,为你深度解锁生成式 AI 领域的最新开发者动态。无论你是资深研发者,还是对生成式 AI 充满好奇的新手,这份报告都是你不可错过的知识宝典。欢迎大家扫码关注「AI前线」公众号,回复「开发者洞察」领取。

2016-12-07 03:5126245

评论

发布
暂无评论
发现更多内容

图文并茂!你管这破玩意儿叫TCP?

热爱java的分享家

Java TCP 程序人生 编程语言 经验分享

秀出新天际的SpringBoot笔记,让开发像搭积木一样简单

热爱java的分享家

Java 架构 程序人生 编程语言 经验分享

混沌工程:分布式系统稳定性的“疫苗”

中原银行

微服务 云原生 混沌工程

Flux 源码之reactor 核心原理及概述

漫游指南

reactor Flux

Go语言学习查缺补漏ing Day4

Regan Yue

Go 语言 11月日更

前端如何低门槛开发iOS、Android、小程序多端应用

YonBuilder低代码开发平台

React性能优化

CRMEB

欢迎 Apache ShardingSphere 社区海外新晋 Committer!

SphereEx

开源社区 ShardingSphere SphereEx Committer

如何使用SAP CRM Marketing Survey创建一个市场问卷调查

Jerry Wang

SAP abap 11月日更 Survey

Shopee 末端物流智能提效之路

Shopee技术团队

人工智能 算法 后端 供应链 物流

TDengine助力顺丰科技大数据监控改造

TDengine

tdengine 时序数据库

Docker心经

卫先生

Python 编译器 编译器远程连接docker docker常用命令 docker总结

业务流程建模,你真的做对了吗

明道云

人脸识别实战:使用Python OpenCV 和深度学习进行人脸识别

AI浩

人脸识别

供应链安全隐患迫在眉睫,2021年全球APT攻击暗藏何种趋势?

科技热闻

Alibaba 新产 SpringCloud Aliababa(全彩第四版)开源

Geek_1df311

Java 编程 架构 微服务

第四范式x英特尔“AI应用与异构内存编程挑战赛”圆满收官

第四范式开发者社区

前端避坑指南丨辛辛苦苦开发的APP竟然被判定为简单网页打包?

YonBuilder低代码开发平台

太顶了!华为高工用一份423页的网络协议笔记把计算机网络讲清了

热爱java的分享家

Java 面试 程序人生 网络协议 经验分享

我们是如何使用 PingCode Flow 实现研发自动化管理的?

PingCode

项目管理 敏捷开发 PingCode

百万关注的CSRF攻击是什么意思?

喀拉峻

黑客 网络安全 安全 信息安全

Java 处理表格,真的很爽!

程序员鱼皮

Java

保持清洁的Git提交记录,三招就够了

Geek_1df311

Java 开源 架构 git 学习

什么是色彩心理学?

坚果

心理学 11月日更

WeTest.net全球能力开放:锻造高品质产品,构建全球竞争力

WeTest

这一次,飞书改变「飞书」

ToB行业头条

热爱代码且发量惊人,一名反“内卷”研发工程师的日常

尔达Erda

程序员 开发者 技术人生 成长笔记

如果你正在准备面试TCP,看这一篇就够了

热爱java的分享家

Java 架构 程序人生 编程语言 经验分享

使用ABAP代码返回S/4HANA Material上维护的Attachment明细

Jerry Wang

SAP S/4HANA 11月日更 Material

打造数字化软件工厂 —— 一站式 DevOps 平台全景解读

CODING DevOps

DevOps 研发管理 CODING

大咖说·未来组织的底层逻辑

大咖说

云计算

深度学习利器:分布式TensorFlow及实例分析_Google_武维_InfoQ精选文章