Implementing ring all-reduce in TensorFlow

December 2, 2019


The previous article introduced the procedure and advantages of the ring all-reduce algorithm. So how do you implement ring all-reduce in TensorFlow code? There are currently two main approaches: 1. the TensorFlow Estimator API together with the MultiWorkerMirroredStrategy API; 2. TensorFlow together with Horovod.


TensorFlow Estimator with MultiWorkerMirroredStrategy


tf.distribute.experimental.MultiWorkerMirroredStrategy implements synchronous training across multiple machines and multiple GPUs, creating a copy of every model variable on each GPU. It uses CollectiveOps as the communication mechanism between machines. TensorFlow also applies some performance optimizations, such as a static optimization that converts multiple all-reductions on small tensors into fewer all-reductions on larger tensors.
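The collective implementation can also be pinned to ring all-reduce explicitly. A minimal sketch, assuming a TF 1.14/2.0-era build where the experimental CollectiveCommunication enum is available (RING and NCCL are the documented options besides the default AUTO):

import tensorflow as tf

# Request the RING collective implementation instead of letting TensorFlow
# choose automatically (AUTO) or use NCCL.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    communication=tf.distribute.experimental.CollectiveCommunication.RING)

The Estimator-based example below simply uses the default selection.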


import tensorflow as tf
import os
import json

NUM_WORKERS = 1
IP_ADDRS = ["xx.xxx.xx.xxxx", "xx.xxx.xx.xxxx"]  # placeholder worker addresses
PORTS = [22222222]  # placeholder port

def model_fn(...):
    .....

def input_fn(...):
    .....

# Every machine must set the TF_CONFIG environment variable; each worker
# uses the same 'cluster' spec but its own 'index' in 'task'.
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ['%s:%d' % (IP_ADDRS[w], PORTS[w]) for w in range(NUM_WORKERS)]
    },
    'task': {'type': 'worker', 'index': 0}
})

# Method for using MultiWorkerMirroredStrategy.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))


Drawback:


It can only be used together with TensorFlow's Estimator framework. When the training code is written against low-level APIs, this approach is not available.


TensorFlow with Horovod


Horovod is an efficient distributed-training communication framework open-sourced by Uber. Horovod itself is responsible only for inter-node network communication and gradient fusion; at runtime it is bound to TensorFlow, which performs the single-machine computation.
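The primitive behind this is an allreduce op, which can also be called directly. A minimal sketch, assuming TF 1.x graph mode and a script launched with horovodrun (one process per GPU):

import tensorflow as tf
import horovod.tensorflow as hvd

# Start Horovod; each launched process becomes one rank in the ring.
hvd.init()

# Every rank contributes its own values; hvd.allreduce runs an all-reduce
# across all ranks and, by default, averages the result.
local_tensor = tf.fill([4], float(hvd.rank()))
averaged = hvd.allreduce(local_tensor)

with tf.Session() as sess:
    # All ranks print the same averaged tensor.
    print('rank %d:' % hvd.rank(), sess.run(averaged))

hvd.DistributedOptimizer, used in the examples below, applies this same op to the gradients of each variable.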


Horovod can be combined with any style of TensorFlow code, whether written with low-level APIs, the Estimator framework, or Keras, which makes it more convenient to use and far less restrictive.


1. Horovod with low-level APIs


Reference code: https://github.com/horovod/horovod/blob/master/examples/tensorflow_mnist.py


import os
import errno
import tensorflow as tf
import horovod.tensorflow as hvd
import numpy as np
from tensorflow import keras

tf.logging.set_verbosity(tf.logging.INFO)

def conv_model(feature, target, mode):
    .....

def train_input_generator(x_train, y_train, batch_size=64):
    .....

def main(_):
    # Horovod: initialize Horovod.
    hvd.init()

    # Create the Keras dataset cache directory if it does not exist yet.
    cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets')
    if not os.path.exists(cache_dir):
        try:
            os.mkdir(cache_dir)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(cache_dir):
                pass
            else:
                raise

    # Download and load MNIST dataset.
    (x_train, y_train), (x_test, y_test) = \
        keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank())

    # The shape of downloaded data is (-1, 28, 28), hence we need to reshape it
    # into (-1, 784) to feed into our network. Also, need to normalize the
    # features between 0 and 1.
    x_train = np.reshape(x_train, (-1, 784)) / 255.0
    x_test = np.reshape(x_test, (-1, 784)) / 255.0

    # Build model...
    with tf.name_scope('input'):
        image = tf.placeholder(tf.float32, [None, 784], name='image')
        label = tf.placeholder(tf.float32, [None], name='label')
    predict, loss = conv_model(image, label, tf.estimator.ModeKeys.TRAIN)

    # Horovod: adjust learning rate based on number of GPUs.
    opt = tf.train.RMSPropOptimizer(0.001 * hvd.size())

    # Horovod: add Horovod Distributed Optimizer.
    opt = hvd.DistributedOptimizer(opt)

    global_step = tf.train.get_or_create_global_step()
    train_op = opt.minimize(loss, global_step=global_step)

    hooks = [
        # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states
        # from rank 0 to all other processes. This is necessary to ensure consistent
        # initialization of all workers when training is started with random weights
        # or restored from a checkpoint.
        hvd.BroadcastGlobalVariablesHook(0),

        # Horovod: adjust number of steps based on number of GPUs.
        tf.train.StopAtStepHook(last_step=20000 // hvd.size()),

        tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': loss},
                                   every_n_iter=10),
    ]

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Horovod: save checkpoints only on worker 0 to prevent other workers from
    # corrupting them.
    checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
    training_batch_generator = train_input_generator(x_train, y_train,
                                                     batch_size=100)
    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                           hooks=hooks,
                                           config=config) as mon_sess:
        while not mon_sess.should_stop():
            # Run a training step synchronously.
            image_, label_ = next(training_batch_generator)
            mon_sess.run(train_op, feed_dict={image: image_, label: label_})


if __name__ == "__main__":
    tf.app.run()


2. Horovod with Estimator

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import errno
import numpy as np
import tensorflow as tf
import horovod.tensorflow as hvd

from tensorflow import keras

tf.logging.set_verbosity(tf.logging.INFO)

def create_model(input_shape, num_classes):
    .....


def cnn_model_fn(features, labels, mode):
    model = create_model()
    logits = model(features)
    predictions = {
        # Generate predictions (for PREDICT and EVAL mode)
        "classes": tf.argmax(input=logits, axis=1),
        # Add `softmax_tensor` to the graph. It is used for PREDICT and by the
        # `logging_hook`.
        "probabilities": tf.nn.softmax(logits, name="softmax_tensor")
    }
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    # Calculate Loss (for both TRAIN and EVAL modes)
    onehot_labels = tf.one_hot(indices=tf.cast(labels, tf.int32), depth=10)
    loss = tf.losses.softmax_cross_entropy(
        onehot_labels=onehot_labels, logits=logits)

    # Configure the Training Op (for TRAIN mode)
    if mode == tf.estimator.ModeKeys.TRAIN:
        # Horovod: scale learning rate by the number of workers.
        optimizer = tf.train.MomentumOptimizer(
            learning_rate=0.001 * hvd.size(), momentum=0.9)

        # Horovod: add Horovod Distributed Optimizer.
        optimizer = hvd.DistributedOptimizer(optimizer)

        train_op = optimizer.minimize(
            loss=loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss,
                                          train_op=train_op)

    # Add evaluation metrics (for EVAL mode)
    eval_metric_ops = {
        "accuracy": tf.metrics.accuracy(
            labels=labels, predictions=predictions["classes"])}
    return tf.estimator.EstimatorSpec(
        mode=mode, loss=loss, eval_metric_ops=eval_metric_ops)


def main(unused_argv):
    # Horovod: initialize Horovod.
    hvd.init()

    # Create the Keras dataset cache directory if it does not exist yet.
    cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets')
    if not os.path.exists(cache_dir):
        try:
            os.mkdir(cache_dir)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(cache_dir):
                pass
            else:
                raise

    # Download and load MNIST dataset.
    (train_data, train_labels), (eval_data, eval_labels) = \
        keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank())

    train_data = np.reshape(train_data, (-1, 784)) / 255.0
    eval_data = np.reshape(eval_data, (-1, 784)) / 255.0

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Horovod: save checkpoints only on worker 0 to prevent other workers from
    # corrupting them.
    model_dir = './mnist_convnet_model' if hvd.rank() == 0 else None

    # Create the Estimator
    mnist_classifier = tf.estimator.Estimator(
        model_fn=cnn_model_fn, model_dir=model_dir,
        config=tf.estimator.RunConfig(session_config=config))

    # Set up logging for predictions
    # Log the values in the "Softmax" tensor with label "probabilities"
    tensors_to_log = {"probabilities": "softmax_tensor"}
    logging_hook = tf.train.LoggingTensorHook(
        tensors=tensors_to_log, every_n_iter=500)

    # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states from
    # rank 0 to all other processes. This is necessary to ensure consistent
    # initialization of all workers when training is started with random weights or
    # restored from a checkpoint.
    bcast_hook = hvd.BroadcastGlobalVariablesHook(0)

    # Train the model
    train_input_fn = tf.estimator.inputs.numpy_input_fn(
        x={"x": train_data},
        y=train_labels,
        batch_size=100,
        num_epochs=None,
        shuffle=True)

    # Horovod: adjust number of steps based on number of GPUs.
    mnist_classifier.train(
        input_fn=train_input_fn,
        steps=20000 // hvd.size(),
        hooks=[logging_hook, bcast_hook])

    # Evaluate the model and print results
    eval_input_fn = tf.estimator.inputs.numpy_input_fn(
        x={"x": eval_data}, y=eval_labels, num_epochs=1, shuffle=False)
    eval_results = mnist_classifier.evaluate(input_fn=eval_input_fn)
    print(eval_results)


if __name__ == "__main__":
    tf.app.run()


Advantages of Horovod


  • Launching is simple. Single-machine, multi-GPU launch command:

  • horovodrun -np 4 -H localhost:4 python train.py

  • Multi-machine, multi-GPU launch command; it does not need to be started on every machine, running it once on the first machine is enough:


horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py


  • The same code runs on a single GPU, on multiple GPUs in one machine, and on multiple GPUs across machines.


  • It works with every style of TensorFlow code (low-level APIs, Estimator, Keras; see the Keras sketch below) and also supports other deep-learning frameworks such as PyTorch and MXNet.
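For the Keras case, a minimal sketch of what the integration looks like, following the same pattern as the examples above. The toy model and random data here are stand-ins, not from the original article, and the horovod.tensorflow.keras API is assumed as of Horovod 0.18:

import numpy as np
import tensorflow as tf
import horovod.tensorflow.keras as hvd

hvd.init()

# Horovod: pin each process to one GPU, keyed by local rank.
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
tf.keras.backend.set_session(tf.Session(config=config))

# Stand-in data and model, for illustration only.
x_train = np.random.rand(1024, 784).astype('float32')
y_train = np.random.randint(10, size=(1024,))
model = tf.keras.Sequential([
    tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10, activation='softmax'),
])

# Horovod: scale the learning rate by the number of workers, then wrap the
# optimizer so gradient averaging happens via all-reduce.
opt = hvd.DistributedOptimizer(tf.keras.optimizers.SGD(0.01 * hvd.size()))
model.compile(loss='sparse_categorical_crossentropy', optimizer=opt,
              metrics=['accuracy'])

callbacks = [
    # Horovod: broadcast initial variables from rank 0 for consistent init.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Horovod: save checkpoints only on rank 0 to prevent corruption.
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

model.fit(x_train, y_train, batch_size=128, epochs=2,
          callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)

It is launched the same way as the other examples, e.g. horovodrun -np 4 -H localhost:4 python train_keras.py.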


References:


https://www.tensorflow.org/guide/distribute_strategy


https://zhuanlan.zhihu.com/p/34172340


https://github.com/horovod/horovod


This article is reposted from Alex-zhai's Zhihu account.


Original link: https://zhuanlan.zhihu.com/p/69806200

