Distributed TensorFlow Source Code Reading Notes 1: Parameter Device Placement

November 29, 2019, 08:00


Let's first review the concepts of in-graph and between-graph replication in TensorFlow:

In-graph replication: only a single client is built. This client constructs one Graph that contains a single set of model parameters, placed on the ps tasks, together with multiple replicas of the model's compute part, each replica placed on a different worker, so that several workers can train the replicated model at the same time. The drawback is obvious: if the one worker that creates the client dies, the whole system goes down. Because of this poor fault tolerance, in-graph replication is rarely used in practice.

Between-graph replication: every worker creates its own client, usually in the same process as that worker task. Each client builds an identical Graph, but the parameters are still placed on the ps tasks. Fault tolerance is much better here: if one worker's client dies, the other workers can keep running.
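To make the between-graph setup concrete, here is a minimal sketch of how each task is typically started (TF 1.x style; this is not from the original article, and job_name, task_index and the training helper are placeholders that would come from the task's own configuration):

import tensorflow as tf

cluster = tf.train.ClusterSpec({
    "ps": ["ps0:2222", "ps1:2222"],
    "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]})
# Every task, ps or worker, starts a server over the same cluster description.
server = tf.train.Server(cluster, job_name=job_name, task_index=task_index)

if job_name == "ps":
  # A ps task only hosts variables and serves requests from the workers.
  server.join()
else:
  # Each worker process is its own client: it builds the same graph and then
  # trains through a session connected to server.target.
  build_graph_and_train(server.target)  # hypothetical helper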

We will focus on the between-graph mode. With only one ps task there is nothing to decide about parameter placement, but when there are several ps tasks, how should variable storage and updates be divided among them? The tf.train.replica_device_setter API answers this question: it lets you build different parameter-placement policies through different device functions. By default it assigns the graph's parameter variables to the ps tasks in round-robin order and places the compute ops on the current worker. Here is an example of how to use tf.train.replica_device_setter:

# To build a cluster with two ps jobs on hosts ps0 and ps1, and 3 worker
# jobs on hosts worker0, worker1 and worker2.
cluster_spec = {
    "ps": ["ps0:2222", "ps1:2222"],
    "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}
with tf.device(tf.train.replica_device_setter(cluster=cluster_spec)):
  # Build your graph
  v1 = tf.Variable(...)  # assigned to /job:ps/task:0
  v2 = tf.Variable(...)  # assigned to /job:ps/task:1
  v3 = tf.Variable(...)  # assigned to /job:ps/task:0
So how is replica_device_setter implemented? Let's look at its source code:
@tf_export(v1=["train.replica_device_setter"])
def replica_device_setter(ps_tasks=0,
                          ps_device="/job:ps",
                          worker_device="/job:worker",
                          merge_devices=True,
                          cluster=None,
                          ps_ops=None,
                          ps_strategy=None):
  """Return a `device function` to use when building a Graph for replicas.

  By default, only Variable ops are placed on ps tasks, and the placement
  strategy is round-robin over all ps tasks. A custom `ps_strategy` may be used
  to do more intelligent placement, such as
  `tf.contrib.training.GreedyLoadBalancingStrategy`.

  Args:
    ps_tasks: Number of tasks in the `ps` job. Ignored if `cluster` is
      provided.
    ps_device: String. Device of the `ps` job. If empty no `ps` job is used.
      Defaults to `ps`.
    worker_device: String. Device of the `worker` job. If empty no `worker`
      job is used.
    merge_devices: `Boolean`. If `True`, merges or only sets a device if the
      device constraint is completely unset. merges device specification rather
      than overriding them.
    cluster: `ClusterDef` proto or `ClusterSpec`.
    ps_ops: List of strings representing `Operation` types that need to be
      placed on `ps` devices. If `None`, defaults to `STANDARD_PS_OPS`.
    ps_strategy: A callable invoked for every ps `Operation` (i.e. matched by
      `ps_ops`), that takes the `Operation` and returns the ps task index to
      use. If `None`, defaults to a round-robin strategy across all `ps`
      devices.

  Returns:
    A function to pass to `tf.device()`.

  Raises:
    TypeError if `cluster` is not a dictionary or `ClusterDef` protocol buffer,
    or if `ps_strategy` is provided but not a callable.
  """
  if cluster is not None:
    if isinstance(cluster, server_lib.ClusterSpec):
      cluster_spec = cluster.as_dict()
    else:
      cluster_spec = server_lib.ClusterSpec(cluster).as_dict()
    # Get ps_job_name from ps_device by stripping "/job:".
    ps_job_name = pydev.DeviceSpec.from_string(ps_device).job
    if ps_job_name not in cluster_spec or cluster_spec[ps_job_name] is None:
      return None
    ps_tasks = len(cluster_spec[ps_job_name])

  if ps_tasks == 0:
    return None

  if ps_ops is None:
    # TODO(sherrym): Variables in the LOCAL_VARIABLES collection should not be
    # placed in the parameter server.
    ps_ops = list(STANDARD_PS_OPS)

  if not merge_devices:
    logging.warning(
        "DEPRECATION: It is recommended to set merge_devices=true in "
        "replica_device_setter")
  if ps_strategy is None:
    ps_strategy = _RoundRobinStrategy(ps_tasks)
  if not six.callable(ps_strategy):
    raise TypeError("ps_strategy must be callable")
  chooser = _ReplicaDeviceChooser(ps_tasks, ps_device, worker_device,
                                  merge_devices, ps_ops, ps_strategy)
  return chooser.device_function


class _RoundRobinStrategy(object):
  """Returns the next ps task index for placement in round-robin order.

  This class is not to be used directly by users. See instead
  `replica_device_setter()` below.
  """

  def __init__(self, num_tasks):
    """Create a new `_RoundRobinStrategy`.

    Args:
      num_tasks: Number of ps tasks to cycle among.
    """
    self._num_tasks = num_tasks
    self._next_task = 0

  def __call__(self, unused_op):
    """Choose a ps task index for the given `Operation`.

    Args:
      unused_op: An `Operation` to be placed on ps.

    Returns:
      The next ps task index to use for the `Operation`. Returns the next
      index, in the range `[offset, offset + num_tasks)`.
    """
    task = self._next_task
    self._next_task = (self._next_task + 1) % self._num_tasks
    return task

As you can see, the default way parameters are distributed over the ps tasks is _RoundRobinStrategy, which returns ps task indices in round-robin order. replica_device_setter returns a device function, and it is this device function that determines which device each op is placed on.
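Since _RoundRobinStrategy ignores the op it is given, its behavior is easy to check in isolation. The snippet below is only an illustration; it imports a module-private class, so treat it as a quick experiment rather than a supported API:

from tensorflow.python.training.device_setter import _RoundRobinStrategy

strategy = _RoundRobinStrategy(num_tasks=2)
# The op argument is unused, so None is fine for this experiment.
print([strategy(None) for _ in range(5)])  # [0, 1, 0, 1, 0]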

Let's work through a small test of replica_device_setter to illustrate how the device function works internally:

"""Tests for device function for replicated training."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.core.framework import node_def_pb2
from tensorflow.python.framework import device as pydev
from tensorflow.python.framework import ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import test
from tensorflow.python.training import device_setter
from tensorflow.python.training import server_lib
class _RoundRobinStrategy(object):
"""Returns the next ps task index for placement in round-robin order.
{1}
This class is not to be used directly by users. See instead
`replica_device_setter()` below.
"""
def __init__(self, num_tasks):
"""Create a new `_RoundRobinStrategy`.
{1}
Args:
num_tasks: Number of ps tasks to cycle among.
"""
self._num_tasks = num_tasks
self._next_task = 0
def __call__(self, unused_op):
"""Choose a ps task index for the given `Operation`.
{1}
Args:
unused_op: An `Operation` to be placed on ps.
{1}
Returns:
The next ps task index to use for the `Operation`. Returns the next
index, in the range `[offset, offset + num_tasks)`.
"""
task = self._next_task
self._next_task = (self._next_task + 1) % self._num_tasks
return task
class DeviceSetterTest(test.TestCase):
_cluster_spec = server_lib.ClusterSpec({
"ps": ["ps0:2222", "ps1:2222"],
"worker": ["worker0:2222", "worker1:2222", "worker2:2222"]
})
_ps_tasks = 2
_ps_device = "/job:ps"
_worker_device = "/job:worker"
_merge_devices = True
_ps_ops = ("Variable", "VariableV2", "AutoReloadVariable",
"MutableHashTable", "MutableHashTableV2",
"MutableHashTableOfTensors", "MutableHashTableOfTensorsV2",
"MutableDenseHashTable", "MutableDenseHashTableV2",
"VarHandleOp", "BoostedTreesEnsembleResourceHandleOp")
_ps_strategy = _RoundRobinStrategy(_ps_tasks)
def testCPUOverride(self):
with ops.device(
device_setter.replica_device_setter(cluster=self._cluster_spec)):
# with ops.device("/cpu:0"):
v = variables.Variable([1, 2])
with ops.device("/cpu:0"):
w = variables.Variable([2, 1])
print(self.device_function(w.op))
k = variables.Variable([1, 2])
print(self.device_function(k.op))
with ops.device("/cpu:0"):
a = v + w
print(self.device_function(a.op))
# self.assertDeviceEqual("/job:ps/task:0/cpu:0", v.device)
# self.assertDeviceEqual("/job:ps/task:0/cpu:0", v.initializer.device)
# self.assertDeviceEqual("/job:ps/task:1", w.device)
# self.assertDeviceEqual("/job:ps/task:1", w.initializer.device)
# self.assertDeviceEqual("/job:worker/cpu:0", a.device)
def device_function(self, op):
"""Choose a device for `op`.
{1}
Args:
op: an `Operation`.
{1}
Returns:
The device to use for the `Operation`.
"""
# If we don't return early here, either merge_devices is True, or op.device
# is empty (in which case merging is a no-op). So we can always merge below.
if not self._merge_devices and op.device:
return op.device
current_device = pydev.DeviceSpec.from_string(op.device or "")
# The ps_device will be used for specified ops (ps_ops) whenever it is
# present and ps_tasks is non-zero. However, its task number will only be
# set (using ps_strategy) if there is a job field in ps_device that won't be
# changed by the job field (if present) in current_device.
node_def = op if isinstance(op, node_def_pb2.NodeDef) else op.node_def
if self._ps_tasks and self._ps_device and node_def.op in self._ps_ops:
ps_device = pydev.DeviceSpec.from_string(self._ps_device)
print("ps_device:", ps_device.to_string())
current_job, ps_job = current_device.job, ps_device.job
print("cur_device:", current_device.to_string())
if ps_job and (not current_job or current_job == ps_job):
ps_device = ps_device.replace(task=self._ps_strategy(op))
ps_device = ps_device.make_merged_spec(current_device)
print("merge_ps_device:", ps_device.to_string())
return ps_device.to_string()
worker_device = pydev.DeviceSpec.from_string(self._worker_device or "")
worker_device = worker_device.make_merged_spec(current_device)
return worker_device.to_string()
if __name__ == "__main__":
setter_test = DeviceSetterTest()
setter_test.testCPUOverride()
# test.main()
Running this prints (one block per call to device_function, for v, w, k and a in turn):
ps_device: /job:ps
cur_device: /job:ps/task:0
merge_ps_device: /job:ps/task:0
/job:ps/task:0
_____________
ps_device: /job:ps
cur_device: /job:ps/task:1/device:CPU:0
merge_ps_device: /job:ps/task:1/device:CPU:0
/job:ps/task:1/device:CPU:0
_____________
ps_device: /job:ps
cur_device: /job:ps/task:0
merge_ps_device: /job:ps/task:0
/job:ps/task:0
_________________
/job:worker/device:CPU:0

This makes it clear what device_function does: based on the type of the current op, it returns the device that op should run on. If the op's type is one of _ps_ops, the op is placed on a ps task; otherwise it is placed on the worker.
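Note that the test leaves worker_device at its default "/job:worker", so compute ops carry no task index. In real between-graph training each worker usually passes its own task index, roughly as in the sketch below (task_index, cluster_spec and inputs are assumed to be defined elsewhere):

with tf.device(tf.train.replica_device_setter(
    worker_device="/job:worker/task:%d" % task_index,
    cluster=cluster_spec)):
  weights = tf.Variable(tf.zeros([784, 10]))    # a ps op -> /job:ps/task:0
  biases = tf.Variable(tf.zeros([10]))          # a ps op -> /job:ps/task:1
  logits = tf.matmul(inputs, weights) + biases  # a compute op -> this worker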

So with multiple parameters, the default placement is round-robin: variables are assigned to the ps tasks one by one in creation order. This does not necessarily balance the load across the ps tasks. With two ps tasks, for example, you can end up with every weight variable on ps0 and every bias b on ps1, which clearly puts far more load on ps0. If you need a more sensible assignment, use tf.contrib.training.GreedyLoadBalancingStrategy, a simple greedy strategy that places each variable on the ps task currently holding the fewest bytes, based on the variable's size in memory, which gives better load balancing.
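Before reading its source, here is a sketch of how the greedy strategy is plugged in (TF 1.x contrib API; the variable shapes are only illustrative):

greedy = tf.contrib.training.GreedyLoadBalancingStrategy(
    num_tasks=2, load_fn=tf.contrib.training.byte_size_load_fn)

with tf.device(tf.train.replica_device_setter(
    cluster=cluster_spec, ps_strategy=greedy)):
  weights = tf.Variable(tf.zeros([1024, 256]))  # large -> least-loaded ps task
  biases = tf.Variable(tf.zeros([256]))         # small -> the other ps task

The source of GreedyLoadBalancingStrategy and its byte_size_load_fn is as follows: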

"""Strategies for placing variables on parameter servers.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import hashlib
import numpy as np
from tensorflow.python.framework import tensor_shape
class GreedyLoadBalancingStrategy(object):
"""Returns the least-loaded ps task for op placement.
The load is calculated by a user-specified load function passed in at
construction. There are no units for load, and the load function is
responsible for providing an internally consistent measure.
Note that this strategy is very sensitive to the exact order in which
ps ops (typically variables) are created, as it greedily places ops
on the least-loaded ps at the point each op is processed.
One reasonable heuristic is the `byte_size_load_fn`, which
estimates load as the number of bytes that would be used to store and
transmit the entire variable. More advanced load functions
could consider the difference in access patterns across ops, or trade
off CPU-intensive ops with RAM-intensive ops with network bandwidth.
This class is intended to be used as a `ps_strategy` in
`tf.train.replica_device_setter`.
"""
{1}
def __init__(self, num_tasks, load_fn):
"""Create a new `LoadBalancingStrategy`.
Args:
num_tasks: Number of ps tasks to cycle among.
load_fn: A callable that takes an `Operation` and returns a
numeric load value for that op.
"""
self._num_tasks = num_tasks
self._load_fn = load_fn
self._ps_loads = np.zeros(num_tasks)
{1}
def __call__(self, op):
"""Choose a ps task index for the given `Operation`.
Args:
op: A `Operation` to be placed on ps.
Returns:
The next ps task index to use for the `Operation`. Greedily
places the op on the least-loaded ps task so far, as determined
by the load function.
"""
task = np.argmin(self._ps_loads)
self._ps_loads[task] += self._load_fn(op)
return task
{1}
{1}
def byte_size_load_fn(op):
"""Load function that computes the byte size of a single-output `Operation`.
This is intended to be used with `"Variable"` ops, which have a single
`Tensor` output with the contents of the variable. However, it can also be
used for calculating the size of any op that has a single output.
Intended to be used with `GreedyLoadBalancingStrategy`.
Args:
op: An `Operation` with a single output, typically a "Variable" op.
Returns:
The number of bytes in the output `Tensor`.
Raises:
ValueError: if `op` does not have a single output, or if the shape of the
single output is not fully-defined.
"""
if len(op.outputs) != 1:
raise ValueError("Op %s must have a single output" % op)
output = op.outputs[0]
elem_size = output.dtype.size
shape = output.get_shape()
if not shape.is_fully_defined():
# Due to legacy behavior, scalar "Variable" ops have output Tensors that
# have unknown shape when the op is created (and hence passed to this
# load function for placement), even though the scalar shape is set
# explicitly immediately afterward.
shape = tensor_shape.TensorShape(op.get_attr("shape"))
shape.assert_is_fully_defined()
return shape.num_elements() * elem_size
{1}

The GreedyLoadBalancingStrategy source above is fairly easy to follow. The strategy uses each variable's size in bytes, shape.num_elements() * elem_size, and always places the op on the ps task whose accumulated load is currently smallest. You can also write your own placement strategy, for example creating the variables sorted by size and then placing each one on the ps task with the least load at that point.
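The contract is simply a callable that takes an Operation and returns a ps task index, so any heuristic can be passed through the ps_strategy argument. As one hypothetical illustration (a different heuristic from the size-based one above), the class below assigns each variable by hashing its op name, which keeps placement stable across graph rebuilds:

import hashlib

class HashNameStrategy(object):
  """Hypothetical ps_strategy: assign each ps op to a task by hashing its name."""

  def __init__(self, num_tasks):
    self._num_tasks = num_tasks

  def __call__(self, op):
    # A stable hash of the op name -> a task index in [0, num_tasks).
    digest = hashlib.md5(op.name.encode("utf-8")).hexdigest()
    return int(digest, 16) % self._num_tasks

# Plugged in the same way as the built-in strategies:
# tf.train.replica_device_setter(cluster=cluster_spec,
#                                ps_strategy=HashNameStrategy(num_tasks=2))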

Everything discussed so far assumes parameters small enough that a single ps task can hold each variable in full. What about huge variables, such as embedding tables with tens of millions or even hundreds of millions of entries? TensorFlow provides variable partitioning for this case: with a partitioner, such a variable is split into several parts that are distributed across different ps tasks.

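Here is a sketch of what this looks like with the TF 1.x partitioner API (the table size, shard count and the ids tensor are only placeholders):

with tf.device(tf.train.replica_device_setter(cluster=cluster_spec)):
  embeddings = tf.get_variable(
      "embeddings",
      shape=[50000000, 64],                   # a huge embedding table
      initializer=tf.zeros_initializer(),
      partitioner=tf.fixed_size_partitioner(num_shards=4))
  # embeddings is now a PartitionedVariable; its shards are separate variables
  # that the device setter spreads over the ps tasks, and embedding_lookup
  # handles the shards transparently.
  vectors = tf.nn.embedding_lookup(embeddings, ids)  # ids: an int id tensor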

References:

https://blog.csdn.net/tiangcs/article/details/85952007
https://blog.csdn.net/u012133034/article/details/81167040
https://www.youtube.com/watch ?

This article is reposted from Alex-zhai's Zhihu account.
Original link: https://zhuanlan.zhihu.com/p/90234576
