Implementing ring all-reduce in TensorFlow

December 2, 2019 16:23


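The previous article introduced the steps and advantages of the ring all-reduce algorithm. How can ring all-reduce be used in TensorFlow code? There are currently two main approaches: 1. the TensorFlow Estimator interface together with the MultiWorkerMirroredStrategy API; 2. TensorFlow together with Horovod.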
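As a refresher on the algorithm itself, the pure-Python simulation below walks through the two phases of ring all-reduce (reduce-scatter, then all-gather) for n in-process "workers". It is an illustrative sketch, not how TensorFlow or Horovod implement it: there is no networking, and it assumes the vector length divides evenly by the number of workers.

```python
from typing import List


def ring_allreduce(data: List[List[float]]) -> List[List[float]]:
    """Simulate a sum all-reduce over n workers arranged in a ring.

    data[i] is worker i's local vector. For simplicity the vector length
    must be divisible by n, so it splits into n equal chunks.
    """
    n = len(data)
    if len(data[0]) % n != 0:
        raise ValueError("sketch assumes the vector length is divisible by n")
    size = len(data[0]) // n
    buf = [list(v) for v in data]  # copy: each worker's local buffer

    def chunk(c: int) -> range:
        return range(c * size, (c + 1) * size)

    def step(chunk_of, combine) -> None:
        # Every worker i sends chunk_of(i) to its right neighbour (i + 1) % n.
        # Payloads are snapshotted first so all sends happen "simultaneously".
        sends = [(i, chunk_of(i), [buf[i][k] for k in chunk(chunk_of(i))])
                 for i in range(n)]
        for i, c, payload in sends:
            dst = buf[(i + 1) % n]
            for k, val in zip(chunk(c), payload):
                dst[k] = combine(dst[k], val)

    # Phase 1, reduce-scatter: after n - 1 steps worker i holds the fully
    # summed chunk (i + 1) % n.
    for s in range(n - 1):
        step(lambda i: (i - s) % n, lambda old, new: old + new)
    # Phase 2, all-gather: the finished chunks travel around the ring once
    # more, overwriting the stale copies.
    for s in range(n - 1):
        step(lambda i: (i + 1 - s) % n, lambda old, new: new)
    return buf
```

For example, `ring_allreduce([[1, 2, 3], [10, 20, 30], [100, 200, 300]])` leaves every worker with `[111, 222, 333]`. Each worker sends 2(n - 1) messages of N/n elements, about 2N(n - 1)/n elements in total, which is why per-worker traffic stays roughly constant as the number of workers grows.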

TensorFlow Estimator with MultiWorkerMirroredStrategy

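tf.distribute.experimental.MultiWorkerMirroredStrategy implements synchronous multi-machine, multi-GPU training and creates a replica of every variable on each GPU. It uses CollectiveOps as the communication mechanism between machines. TensorFlow also adds some performance optimizations, for example a static optimization that converts multiple all-reductions on small tensors into fewer all-reductions on larger tensors.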
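The idea behind that static optimization can be illustrated with a small bucketing sketch: small gradient tensors are packed, in order, into buckets up to a byte threshold, and each bucket would then be flattened and reduced with a single all-reduce. This is a hypothetical illustration of the general technique (Horovod's Tensor Fusion follows a similar idea), not TensorFlow's actual implementation; the tensor names, sizes and the 1024-byte threshold are made up.

```python
from typing import Dict, List


def fuse_into_buckets(tensor_bytes: Dict[str, int], threshold: int) -> List[List[str]]:
    """Greedily pack tensors (in the given order) into buckets of at most
    `threshold` bytes. A tensor larger than the threshold gets its own bucket.
    Each bucket stands for one all-reduce on a concatenated buffer."""
    buckets: List[List[str]] = []
    current: List[str] = []
    current_size = 0
    for name, size in tensor_bytes.items():
        # Close the current bucket if adding this tensor would overflow it.
        if current and current_size + size > threshold:
            buckets.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        buckets.append(current)
    return buckets


# Hypothetical gradient sizes in bytes.
grads = {"w1": 400, "b1": 40, "w2": 800, "b2": 40, "w3": 5000, "b3": 40}
print(fuse_into_buckets(grads, threshold=1024))
```

Here six small all-reduces become four: `[['w1', 'b1'], ['w2', 'b2'], ['w3'], ['b3']]`. A larger threshold means fewer, bigger messages at the cost of waiting longer for a bucket to fill.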

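```python
import json
import os

import tensorflow as tf

# Placeholders: fill in the real addresses and ports of the two machines.
IP_ADDRS = ["xx.xxx.xx.xxxx", "xx.xxx.xx.xxxx"]
PORTS = [2222, 2222]
NUM_WORKERS = len(IP_ADDRS)  # one worker per machine


def model_fn(features, labels, mode):
    ...  # model definition omitted; must return a tf.estimator.EstimatorSpec


def input_fn():
    ...  # input pipeline omitted; should return a tf.data.Dataset


# TF_CONFIG must be set on every machine before the strategy is created.
# The 'cluster' section is identical on all machines; 'index' must be 0 on
# the first machine and 1 on the second.
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ['%s:%d' % (IP_ADDRS[w], PORTS[w]) for w in range(NUM_WORKERS)]
    },
    'task': {'type': 'worker', 'index': 0}
})

# Use MultiWorkerMirroredStrategy as the Estimator's training distribution strategy.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
config = tf.estimator.RunConfig(train_distribute=strategy)
classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
```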
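In the script above the cluster list is the same on every machine, but `task.index` has to differ per machine. A small helper such as the hypothetical `make_tf_config` below makes that explicit; it only builds the JSON string and does not touch TensorFlow.

```python
import json
from typing import List


def make_tf_config(hosts: List[str], task_index: int) -> str:
    """Build the TF_CONFIG JSON string for worker `task_index`.

    Every machine gets the same 'cluster' section; only 'task.index'
    differs, and it must match this machine's position in `hosts`.
    """
    if not 0 <= task_index < len(hosts):
        raise ValueError("task_index must be in [0, len(hosts))")
    return json.dumps({
        "cluster": {"worker": hosts},
        "task": {"type": "worker", "index": task_index},
    })


hosts = ["10.0.0.1:2222", "10.0.0.2:2222"]  # hypothetical addresses
print(make_tf_config(hosts, 1))  # what the second machine would use
```

On the second machine one would then run `os.environ['TF_CONFIG'] = make_tf_config(hosts, 1)` before creating the strategy; the range check catches an index that does not correspond to any listed worker.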

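Drawback:

At the time of writing, this interface could only be used together with TensorFlow's Estimator framework; when the training code is written with the low-level API, this approach cannot be used.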

TensorFlow with Horovod

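Horovod is an efficient distributed-training communication framework open-sourced by Uber. Horovod itself only handles network communication between nodes and gradient fusion (combining many small gradient tensors into larger buffers before communicating them); at runtime it relies on TensorFlow for the computation on each machine.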

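Horovod can be used with any TensorFlow code, whether it is written with the low-level API, the Estimator framework or Keras. This makes Horovod more convenient to use, without the restriction described above.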
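Conceptually, `hvd.DistributedOptimizer` wraps the original optimizer so that each worker's gradients are all-reduced (averaged, by default) before the update is applied, so every replica applies the same update. The pure-Python simulation below illustrates this with a one-parameter linear model `y = w * x` and two hypothetical data shards; the helper names are made up and no Horovod API is called.

```python
from typing import List, Sequence, Tuple

Batch = Sequence[Tuple[float, float]]


def local_gradient(w: float, batch: Batch) -> float:
    """Gradient of the mean squared error of y = w * x on one worker's batch."""
    return sum(2.0 * (w * x - y) * x for x, y in batch) / len(batch)


def allreduce_mean(values: List[float]) -> List[float]:
    """Stand-in for an averaging all-reduce: every worker gets the mean."""
    mean = sum(values) / len(values)
    return [mean] * len(values)


def data_parallel_step(weights: List[float], shards: List[Batch],
                       lr: float) -> List[float]:
    """One synchronous step: local gradients, all-reduce, identical update."""
    grads = [local_gradient(w, shard) for w, shard in zip(weights, shards)]
    averaged = allreduce_mean(grads)
    return [w - lr * g for w, g in zip(weights, averaged)]


# Two workers, each holding half of the data for y = 2x.
shards = [[(1.0, 2.0), (2.0, 4.0)], [(3.0, 6.0), (4.0, 8.0)]]
weights = [0.0, 0.0]  # identical starting point on both workers
for _ in range(200):
    weights = data_parallel_step(weights, shards, lr=0.01)
print(weights)  # both replicas converge to the same value near 2.0
```

With equal-sized shards the averaged gradient equals the full-batch gradient, so the two workers behave like one worker training on all the data, while each only computes on half of it.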

1. Horovod with the low-level API
Reference code: https://github.com/horovod/horovod/blob/master/examples/tensorflow_mnist.py

```python
import os
import errno

import numpy as np
import tensorflow as tf
import horovod.tensorflow as hvd
from tensorflow import keras

tf.logging.set_verbosity(tf.logging.INFO)


def create_model(feature, target, mode):
    ...  # model body omitted in this article; returns (predict, loss)


def train_input_generator(x_train, y_train, batch_size=64):
    ...  # generator body omitted in this article; yields (images, labels) batches


def main(_):
    # Horovod: initialize Horovod.
    hvd.init()

    cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets')
    if not os.path.exists(cache_dir):
        try:
            os.mkdir(cache_dir)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(cache_dir):
                pass
            else:
                raise

    # Download and load MNIST dataset.
    (x_train, y_train), (x_test, y_test) = \
        keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank())

    # The shape of downloaded data is (-1, 28, 28), hence we need to reshape it
    # into (-1, 784) to feed into our network. Also, need to normalize the
    # features between 0 and 1.
    x_train = np.reshape(x_train, (-1, 784)) / 255.0
    x_test = np.reshape(x_test, (-1, 784)) / 255.0

    # Build model...
    with tf.name_scope('input'):
        image = tf.placeholder(tf.float32, [None, 784], name='image')
        label = tf.placeholder(tf.float32, [None], name='label')
    predict, loss = create_model(image, label, tf.estimator.ModeKeys.TRAIN)

    # Horovod: adjust learning rate based on number of GPUs.
    opt = tf.train.RMSPropOptimizer(0.001 * hvd.size())

    # Horovod: add Horovod Distributed Optimizer.
    opt = hvd.DistributedOptimizer(opt)

    global_step = tf.train.get_or_create_global_step()
    train_op = opt.minimize(loss, global_step=global_step)

    hooks = [
        # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states
        # from rank 0 to all other processes. This is necessary to ensure consistent
        # initialization of all workers when training is started with random weights
        # or restored from a checkpoint.
        hvd.BroadcastGlobalVariablesHook(0),

        # Horovod: adjust number of steps based on number of GPUs.
        tf.train.StopAtStepHook(last_step=20000 // hvd.size()),

        tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': loss},
                                   every_n_iter=10),
    ]

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Horovod: save checkpoints only on worker 0 to prevent other workers from
    # corrupting them.
    checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None

    training_batch_generator = train_input_generator(x_train,
                                                     y_train, batch_size=100)
    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                           hooks=hooks,
                                           config=config) as mon_sess:
        while not mon_sess.should_stop():
            # Run a training step synchronously.
            image_, label_ = next(training_batch_generator)
            mon_sess.run(train_op, feed_dict={image: image_, label: label_})


if __name__ == "__main__":
    tf.app.run()
```
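The example ties several settings to Horovod's process information. The hypothetical helper below only restates those conventions as plain arithmetic (learning rate times the number of processes, steps divided by it, checkpoints on rank 0 only, one GPU per process chosen by local rank), with `rank`, `local_rank` and `size` standing in for `hvd.rank()`, `hvd.local_rank()` and `hvd.size()`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProcessSettings:
    learning_rate: float
    last_step: int
    checkpoint_dir: Optional[str]
    visible_device_list: str


def per_process_settings(rank: int, local_rank: int, size: int,
                         base_lr: float = 0.001,
                         total_steps: int = 20000) -> ProcessSettings:
    """Hypothetical helper mirroring the Horovod conventions in the example."""
    return ProcessSettings(
        # The effective batch is `size` times larger, so the LR is scaled linearly.
        learning_rate=base_lr * size,
        # Each global step consumes `size` batches, so fewer steps are needed.
        last_step=total_steps // size,
        # Only rank 0 writes checkpoints.
        checkpoint_dir="./checkpoints" if rank == 0 else None,
        # One GPU per process, selected by the rank within the machine.
        visible_device_list=str(local_rank),
    )


print(per_process_settings(rank=5, local_rank=1, size=8))
```

For the 8-process case this gives a learning rate of about 0.008, 2500 steps, no checkpoint directory and GPU `"1"`; rank 0 is the only process that gets `./checkpoints`.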
2. Horovod with Estimator
```python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import errno

import numpy as np
import tensorflow as tf
import horovod.tensorflow as hvd
from tensorflow import keras

tf.logging.set_verbosity(tf.logging.INFO)


def create_model(input_shape, num_classes):
    ...  # Keras model definition omitted in this article


def cnn_model_fn(features, labels, mode):
    # (784,) and 10 match the flattened MNIST images and the 10-class
    # one-hot labels used below.
    model = create_model(input_shape=(784,), num_classes=10)
    # numpy_input_fn feeds a dict, so the images are under the "x" key;
    # the NumPy data is float64, so cast it to float32 for the model.
    logits = model(tf.cast(features["x"], tf.float32))

    predictions = {
        # Generate predictions (for PREDICT and EVAL mode)
        "classes": tf.argmax(input=logits, axis=1),
        # Add `softmax_tensor` to the graph. It is used for PREDICT and by the
        # `logging_hook`.
        "probabilities": tf.nn.softmax(logits, name="softmax_tensor")
    }
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    # Calculate Loss (for both TRAIN and EVAL modes)
    onehot_labels = tf.one_hot(indices=tf.cast(labels, tf.int32), depth=10)
    loss = tf.losses.softmax_cross_entropy(
        onehot_labels=onehot_labels, logits=logits)

    # Configure the Training Op (for TRAIN mode)
    if mode == tf.estimator.ModeKeys.TRAIN:
        # Horovod: scale learning rate by the number of workers.
        optimizer = tf.train.MomentumOptimizer(
            learning_rate=0.001 * hvd.size(), momentum=0.9)

        # Horovod: add Horovod Distributed Optimizer.
        optimizer = hvd.DistributedOptimizer(optimizer)

        train_op = optimizer.minimize(
            loss=loss,
            global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss,
                                          train_op=train_op)

    # Add evaluation metrics (for EVAL mode)
    eval_metric_ops = {
        "accuracy": tf.metrics.accuracy(
            labels=labels, predictions=predictions["classes"])}
    return tf.estimator.EstimatorSpec(
        mode=mode, loss=loss, eval_metric_ops=eval_metric_ops)


def main(unused_argv):
    # Horovod: initialize Horovod.
    hvd.init()

    cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets')
    if not os.path.exists(cache_dir):
        try:
            os.mkdir(cache_dir)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(cache_dir):
                pass
            else:
                raise

    # Download and load MNIST dataset.
    (train_data, train_labels), (eval_data, eval_labels) = \
        keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank())
    train_data = np.reshape(train_data, (-1, 784)) / 255.0
    eval_data = np.reshape(eval_data, (-1, 784)) / 255.0

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Horovod: save checkpoints only on worker 0 to prevent other workers from
    # corrupting them.
    model_dir = './mnist_convnet_model' if hvd.rank() == 0 else None

    # Create the Estimator
    mnist_classifier = tf.estimator.Estimator(
        model_fn=cnn_model_fn, model_dir=model_dir,
        config=tf.estimator.RunConfig(session_config=config))

    # Set up logging for predictions
    # Log the values in the "Softmax" tensor with label "probabilities"
    tensors_to_log = {"probabilities": "softmax_tensor"}
    logging_hook = tf.train.LoggingTensorHook(
        tensors=tensors_to_log, every_n_iter=500)

    # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states from
    # rank 0 to all other processes. This is necessary to ensure consistent
    # initialization of all workers when training is started with random weights or
    # restored from a checkpoint.
    bcast_hook = hvd.BroadcastGlobalVariablesHook(0)

    # Train the model
    train_input_fn = tf.estimator.inputs.numpy_input_fn(
        x={"x": train_data},
        y=train_labels,
        batch_size=100,
        num_epochs=None,
        shuffle=True)

    # Horovod: adjust number of steps based on number of GPUs.
    mnist_classifier.train(
        input_fn=train_input_fn,
        steps=20000 // hvd.size(),
        hooks=[logging_hook, bcast_hook])

    # Evaluate the model and print results
    eval_input_fn = tf.estimator.inputs.numpy_input_fn(
        x={"x": eval_data},
        y=eval_labels,
        num_epochs=1,
        shuffle=False)
    eval_results = mnist_classifier.evaluate(input_fn=eval_input_fn)
    print(eval_results)


if __name__ == "__main__":
    tf.app.run()
```
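Both examples rely on `hvd.BroadcastGlobalVariablesHook(0)`. The sketch below shows why: averaged gradients keep the replicas in lockstep only if they start from identical variables. Workers are simulated as plain dicts with hypothetical variable names `w` and `b`, and `broadcast` merely copies rank 0's values; that is the documented effect of the hook, not its implementation.

```python
import random
from typing import Dict, List

State = Dict[str, float]


def init_worker(seed: int) -> State:
    """Random initialisation; different seeds give different replicas."""
    rng = random.Random(seed)
    return {"w": rng.uniform(-1.0, 1.0), "b": rng.uniform(-1.0, 1.0)}


def broadcast(states: List[State], root: int = 0) -> List[State]:
    """Every worker receives its own copy of the root worker's variables."""
    return [dict(states[root]) for _ in states]


def apply_same_update(states: List[State], grads: State, lr: float) -> List[State]:
    """Apply one (already all-reduced) gradient to every replica."""
    return [{k: v - lr * grads[k] for k, v in s.items()} for s in states]


states = [init_worker(seed) for seed in range(4)]
grads = {"w": 0.5, "b": -0.25}
unsynced = apply_same_update(states, grads, lr=0.1)
synced = apply_same_update(broadcast(states), grads, lr=0.1)
print(unsynced[0] == unsynced[1], all(s == synced[0] for s in synced))
```

Without the broadcast the replicas receive identical updates but stay different forever; after broadcasting from rank 0 they are identical and remain so.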

Advantages of Horovod

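  • Simple to launch. For a single machine with multiple GPUs:
    horovodrun -np 4 -H localhost:4 python train.py
    For multiple machines with multiple GPUs, the command does not have to be started on every machine; running it on the first machine is enough (horovodrun starts the remote processes over SSH, so the first machine needs passwordless SSH access to the others):
    horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py
  • The same code runs on a single GPU, on multiple GPUs in one machine, and on multiple GPUs across machines.
  • It works with many code styles, such as low-level TensorFlow, Estimator and Keras, and also supports other deep learning frameworks such as PyTorch and MXNet.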
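In the launch commands above, `-H host:slots` lists how many processes each machine can run, and `-np` must not exceed the total (4 × 4 = 16 in the multi-machine case). The sketch below is a simplified model that assumes ranks are filled host by host in the order given; horovodrun's real placement logic is not reproduced. It shows how a global rank relates to the `local_rank` that the examples use to pick a GPU; the helper names are hypothetical.

```python
from typing import List, Tuple


def parse_hosts(spec: str) -> List[Tuple[str, int]]:
    """Parse a string such as 'server1:4,server2:4' into (host, slots) pairs."""
    hosts = []
    for item in spec.split(","):
        name, slots = item.split(":")
        hosts.append((name, int(slots)))
    return hosts


def assign_ranks(spec: str, num_proc: int) -> List[Tuple[int, str, int]]:
    """Return (rank, host, local_rank) triples, filling hosts in order."""
    hosts = parse_hosts(spec)
    if num_proc > sum(slots for _, slots in hosts):
        raise ValueError("-np exceeds the number of available slots")
    placement = []
    rank = 0
    for host, slots in hosts:
        for local_rank in range(slots):
            if rank == num_proc:
                return placement
            placement.append((rank, host, local_rank))
            rank += 1
    return placement


for entry in assign_ranks("server1:4,server2:4,server3:4,server4:4", 16)[:6]:
    print(entry)
```

Under this model rank 5 lands on `server2` with local rank 1, so that process would pin GPU 1 on `server2` via `visible_device_list`.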
References:

https://www.tensorflow.org/guide/distribute_strategy
https://zhuanlan.zhihu.com/p/34172340
https://github.com/horovod/horo

This article is reposted from the Zhihu account of Alex-zhai.

Original article: https://zhuanlan.zhihu.com/p/69806200
