
Distributed TensorFlow source code reading notes 1: Parameter device placement

  • 2019-11-29


First, a quick review of the in-graph and between-graph replication concepts in TensorFlow:


In-graph replication: only one client is built. This client builds a single Graph that contains one copy of the model parameters, placed on the ps nodes, together with multiple replicas of the model's computation part, each replica placed on one worker, so that multiple workers can train the replicated model simultaneously. The drawback is obvious: once the single worker that created the client dies, the whole system goes down, so fault tolerance is poor and this mode is rarely used in practice.


Between-graph replication: every worker creates its own client, usually in the same process as the task's main program. Each client builds an identical Graph, but the parameters are still placed on the ps nodes. Fault tolerance is much better in this mode: if one worker's client dies, the other workers keep running. A minimal launch sketch is given below.
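To make the between-graph picture concrete, here is a minimal TF 1.x sketch of how a single task of such a job could be launched. The host names, ports and the one-variable toy loss are made up for illustration; in a real job, job_name and task_index would normally come from command-line flags.

import tensorflow as tf

cluster = tf.train.ClusterSpec({
    "ps": ["ps0:2222", "ps1:2222"],
    "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]})

job_name, task_index = "worker", 0  # normally parsed from flags
server = tf.train.Server(cluster, job_name=job_name, task_index=task_index)

if job_name == "ps":
  server.join()  # ps tasks only host variables and serve requests
else:
  # Every worker builds its own copy of the graph: between-graph replication.
  with tf.device(tf.train.replica_device_setter(
      worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
    w = tf.Variable(0.5)
    loss = tf.square(w - 1.0)  # toy loss, stands in for a real model
    global_step = tf.train.get_or_create_global_step()
    train_op = tf.train.GradientDescentOptimizer(0.1).minimize(
        loss, global_step=global_step)

  with tf.train.MonitoredTrainingSession(
      master=server.target, is_chief=(task_index == 0)) as sess:
    for _ in range(100):
      sess.run(train_op)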


Let's focus on the between-graph mode. With only one ps node there is nothing to discuss about parameter placement, but when there are multiple ps nodes, how should variable storage and updates be distributed across them? The tf.train.replica_device_setter API provides the answer: different device functions can be used to build different parameter placement policies. By default it assigns the parameter variables in the graph to the ps nodes in round-robin order and places the computation ops on the current worker node. First, an example of how to use tf.train.replica_device_setter:


# To build a cluster with two ps jobs on hosts ps0 and ps1, and 3 worker
# jobs on hosts worker0, worker1 and worker2.
cluster_spec = {
    "ps": ["ps0:2222", "ps1:2222"],
    "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}
with tf.device(tf.train.replica_device_setter(cluster=cluster_spec)):
  # Build your graph
  v1 = tf.Variable(...)  # assigned to /job:ps/task:0
  v2 = tf.Variable(...)  # assigned to /job:ps/task:1
  v3 = tf.Variable(...)  # assigned to /job:ps/task:0

So how is replica_device_setter implemented internally? Let's look at its source:
@tf_export(v1=["train.replica_device_setter"])
def replica_device_setter(ps_tasks=0,
                          ps_device="/job:ps",
                          worker_device="/job:worker",
                          merge_devices=True,
                          cluster=None,
                          ps_ops=None,
                          ps_strategy=None):
  """Return a `device function` to use when building a Graph for replicas.

  By default, only Variable ops are placed on ps tasks, and the placement
  strategy is round-robin over all ps tasks. A custom `ps_strategy` may be
  used to do more intelligent placement, such as
  `tf.contrib.training.GreedyLoadBalancingStrategy`.

  Args:
    ps_tasks: Number of tasks in the `ps` job.  Ignored if `cluster` is
      provided.
    ps_device: String.  Device of the `ps` job.  If empty no `ps` job is
      used.  Defaults to `ps`.
    worker_device: String.  Device of the `worker` job.  If empty no `worker`
      job is used.
    merge_devices: `Boolean`. If `True`, merges or only sets a device if the
      device constraint is completely unset. merges device specification
      rather than overriding them.
    cluster: `ClusterDef` proto or `ClusterSpec`.
    ps_ops: List of strings representing `Operation` types that need to be
      placed on `ps` devices.  If `None`, defaults to `STANDARD_PS_OPS`.
    ps_strategy: A callable invoked for every ps `Operation` (i.e. matched by
      `ps_ops`), that takes the `Operation` and returns the ps task index to
      use.  If `None`, defaults to a round-robin strategy across all `ps`
      devices.

  Returns:
    A function to pass to `tf.device()`.

  Raises:
    TypeError if `cluster` is not a dictionary or `ClusterDef` protocol
      buffer, or if `ps_strategy` is provided but not a callable.
  """
  if cluster is not None:
    if isinstance(cluster, server_lib.ClusterSpec):
      cluster_spec = cluster.as_dict()
    else:
      cluster_spec = server_lib.ClusterSpec(cluster).as_dict()
    # Get ps_job_name from ps_device by stripping "/job:".
    ps_job_name = pydev.DeviceSpec.from_string(ps_device).job
    if ps_job_name not in cluster_spec or cluster_spec[ps_job_name] is None:
      return None
    ps_tasks = len(cluster_spec[ps_job_name])

  if ps_tasks == 0:
    return None

  if ps_ops is None:
    # TODO(sherrym): Variables in the LOCAL_VARIABLES collection should not be
    # placed in the parameter server.
    ps_ops = list(STANDARD_PS_OPS)

  if not merge_devices:
    logging.warning(
        "DEPRECATION: It is recommended to set merge_devices=true in "
        "replica_device_setter")
  if ps_strategy is None:
    ps_strategy = _RoundRobinStrategy(ps_tasks)
  if not six.callable(ps_strategy):
    raise TypeError("ps_strategy must be callable")
  chooser = _ReplicaDeviceChooser(ps_tasks, ps_device, worker_device,
                                  merge_devices, ps_ops, ps_strategy)
  return chooser.device_function


class _RoundRobinStrategy(object):
  """Returns the next ps task index for placement in round-robin order.

  This class is not to be used directly by users.  See instead
  `replica_device_setter()` below.
  """

  def __init__(self, num_tasks):
    """Create a new `_RoundRobinStrategy`.

    Args:
      num_tasks: Number of ps tasks to cycle among.
    """
    self._num_tasks = num_tasks
    self._next_task = 0

  def __call__(self, unused_op):
    """Choose a ps task index for the given `Operation`.

    Args:
      unused_op: An `Operation` to be placed on ps.

    Returns:
      The next ps task index to use for the `Operation`. Returns the next
      index, in the range `[offset, offset + num_tasks)`.
    """
    task = self._next_task
    self._next_task = (self._next_task + 1) % self._num_tasks
    return task


As the code shows, the default way parameters are distributed across ps nodes is _RoundRobinStrategy, which returns ps task indices in round-robin order. replica_device_setter returns a device function, and it is this device function that decides on which device each op is placed.


Let's use a small test of replica_device_setter to illustrate how the device function works internally:


"""Tests for device function for replicated training."""
from __future__ import absolute_importfrom __future__ import divisionfrom __future__ import print_function
from tensorflow.core.framework import node_def_pb2from tensorflow.python.framework import device as pydevfrom tensorflow.python.framework import opsfrom tensorflow.python.ops import variablesfrom tensorflow.python.platform import testfrom tensorflow.python.training import device_setterfrom tensorflow.python.training import server_lib

class _RoundRobinStrategy(object): """Returns the next ps task index for placement in round-robin order.
This class is not to be used directly by users. See instead `replica_device_setter()` below. """
def __init__(self, num_tasks): """Create a new `_RoundRobinStrategy`.
Args: num_tasks: Number of ps tasks to cycle among. """ self._num_tasks = num_tasks self._next_task = 0
def __call__(self, unused_op): """Choose a ps task index for the given `Operation`.
Args: unused_op: An `Operation` to be placed on ps.
Returns: The next ps task index to use for the `Operation`. Returns the next index, in the range `[offset, offset + num_tasks)`. """ task = self._next_task self._next_task = (self._next_task + 1) % self._num_tasks return task

class DeviceSetterTest(test.TestCase): _cluster_spec = server_lib.ClusterSpec({ "ps": ["ps0:2222", "ps1:2222"], "worker": ["worker0:2222", "worker1:2222", "worker2:2222"] })
_ps_tasks = 2 _ps_device = "/job:ps" _worker_device = "/job:worker" _merge_devices = True _ps_ops = ("Variable", "VariableV2", "AutoReloadVariable", "MutableHashTable", "MutableHashTableV2", "MutableHashTableOfTensors", "MutableHashTableOfTensorsV2", "MutableDenseHashTable", "MutableDenseHashTableV2", "VarHandleOp", "BoostedTreesEnsembleResourceHandleOp") _ps_strategy = _RoundRobinStrategy(_ps_tasks)
def testCPUOverride(self): with ops.device( device_setter.replica_device_setter(cluster=self._cluster_spec)): # with ops.device("/cpu:0"): v = variables.Variable([1, 2]) with ops.device("/cpu:0"): w = variables.Variable([2, 1]) print(self.device_function(w.op)) k = variables.Variable([1, 2]) print(self.device_function(k.op)) with ops.device("/cpu:0"): a = v + w print(self.device_function(a.op)) # self.assertDeviceEqual("/job:ps/task:0/cpu:0", v.device) # self.assertDeviceEqual("/job:ps/task:0/cpu:0", v.initializer.device) # self.assertDeviceEqual("/job:ps/task:1", w.device) # self.assertDeviceEqual("/job:ps/task:1", w.initializer.device) # self.assertDeviceEqual("/job:worker/cpu:0", a.device)
def device_function(self, op): """Choose a device for `op`.
Args: op: an `Operation`.
Returns: The device to use for the `Operation`. """ # If we don't return early here, either merge_devices is True, or op.device # is empty (in which case merging is a no-op). So we can always merge below. if not self._merge_devices and op.device: return op.device
current_device = pydev.DeviceSpec.from_string(op.device or "")
# The ps_device will be used for specified ops (ps_ops) whenever it is # present and ps_tasks is non-zero. However, its task number will only be # set (using ps_strategy) if there is a job field in ps_device that won't be # changed by the job field (if present) in current_device. node_def = op if isinstance(op, node_def_pb2.NodeDef) else op.node_def if self._ps_tasks and self._ps_device and node_def.op in self._ps_ops: ps_device = pydev.DeviceSpec.from_string(self._ps_device) print("ps_device:", ps_device.to_string()) current_job, ps_job = current_device.job, ps_device.job print("cur_device:", current_device.to_string()) if ps_job and (not current_job or current_job == ps_job): ps_device = ps_device.replace(task=self._ps_strategy(op))
ps_device = ps_device.make_merged_spec(current_device) print("merge_ps_device:", ps_device.to_string()) return ps_device.to_string()
worker_device = pydev.DeviceSpec.from_string(self._worker_device or "") worker_device = worker_device.make_merged_spec(current_device) return worker_device.to_string()

if __name__ == "__main__": setter_test = DeviceSetterTest() setter_test.testCPUOverride() # test.main()
Output:

ps_device: /job:ps
cur_device: /job:ps/task:0
merge_ps_device: /job:ps/task:0
/job:ps/task:0
_____________
ps_device: /job:ps
cur_device: /job:ps/task:1/device:CPU:0
merge_ps_device: /job:ps/task:1/device:CPU:0
/job:ps/task:1/device:CPU:0
_____________
ps_device: /job:ps
cur_device: /job:ps/task:0
merge_ps_device: /job:ps/task:0
/job:ps/task:0
_________________
/job:worker/device:CPU:0


It is clear from the output that device_function simply looks at the nature of the current op and returns the device on which that op should run: if the op's type is one of the _ps_ops, the op is placed on a ps node, otherwise it is placed on the worker node.
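The DeviceSpec merging that produces strings like /job:ps/task:1/device:CPU:0 can also be reproduced in isolation. A small standalone sketch of my own, using the same pydev.DeviceSpec API that the test above imports:

from tensorflow.python.framework import device as pydev

# ps_device starts as just the job; the ps_strategy fills in the task index.
ps_device = pydev.DeviceSpec.from_string("/job:ps").replace(task=1)

# The op already carried a /cpu:0 constraint from the inner ops.device scope.
current_device = pydev.DeviceSpec.from_string("/device:CPU:0")

merged = ps_device.make_merged_spec(current_device)
print(merged.to_string())  # /job:ps/task:1/device:CPU:0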


When there are multiple parameters, the default placement policy is round-robin: parameters are assigned to the ps nodes one by one in the order they are created. This does not necessarily balance the ps load. With two ps nodes it can happen that all the weight variables land on ps0 and all the bias variables b land on ps1, which clearly puts far more load on ps0. If a more reasonable distribution is needed, use the tf.contrib.training.GreedyLoadBalancingStrategy policy: a simple greedy strategy that places each variable, according to its size in bytes, on the ps node that currently has the least placed memory, which gives better load balancing. A usage sketch is given below, followed by the strategy's source code.
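A small usage sketch of my own showing how this strategy could be plugged into replica_device_setter. The cluster and variable shapes are invented; in TF 1.x both GreedyLoadBalancingStrategy and byte_size_load_fn live under tf.contrib.training:

import tensorflow as tf

cluster_spec = {
    "ps": ["ps0:2222", "ps1:2222"],
    "worker": ["worker0:2222"]}

# Each ps op is sent to the ps task with the smallest accumulated byte size.
greedy = tf.contrib.training.GreedyLoadBalancingStrategy(
    num_tasks=2, load_fn=tf.contrib.training.byte_size_load_fn)

with tf.device(tf.train.replica_device_setter(
    cluster=cluster_spec, ps_strategy=greedy)):
  w1 = tf.get_variable("w1", shape=[1024, 1024])  # ~4 MB -> ps task 0
  b1 = tf.get_variable("b1", shape=[1024])        # ~4 KB -> ps task 1
  w2 = tf.get_variable("w2", shape=[1024, 1024])  # ~4 MB -> ps task 1
  # Round-robin would have put w1 and w2 on the same ps task.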



"""Strategies for placing variables on parameter servers."""from __future__ import absolute_importfrom __future__ import divisionfrom __future__ import print_function
import hashlibimport numpy as np
from tensorflow.python.framework import tensor_shape
class GreedyLoadBalancingStrategy(object): """Returns the least-loaded ps task for op placement.
The load is calculated by a user-specified load function passed in at construction. There are no units for load, and the load function is responsible for providing an internally consistent measure.
Note that this strategy is very sensitive to the exact order in which ps ops (typically variables) are created, as it greedily places ops on the least-loaded ps at the point each op is processed.
One reasonable heuristic is the `byte_size_load_fn`, which estimates load as the number of bytes that would be used to store and transmit the entire variable. More advanced load functions could consider the difference in access patterns across ops, or trade off CPU-intensive ops with RAM-intensive ops with network bandwidth.
This class is intended to be used as a `ps_strategy` in `tf.train.replica_device_setter`. """
def __init__(self, num_tasks, load_fn): """Create a new `LoadBalancingStrategy`.
Args: num_tasks: Number of ps tasks to cycle among. load_fn: A callable that takes an `Operation` and returns a numeric load value for that op. """ self._num_tasks = num_tasks self._load_fn = load_fn self._ps_loads = np.zeros(num_tasks)
def __call__(self, op): """Choose a ps task index for the given `Operation`.
Args: op: A `Operation` to be placed on ps.
Returns: The next ps task index to use for the `Operation`. Greedily places the op on the least-loaded ps task so far, as determined by the load function. """ task = np.argmin(self._ps_loads) self._ps_loads[task] += self._load_fn(op) return task

def byte_size_load_fn(op): """Load function that computes the byte size of a single-output `Operation`.
This is intended to be used with `"Variable"` ops, which have a single `Tensor` output with the contents of the variable. However, it can also be used for calculating the size of any op that has a single output.
Intended to be used with `GreedyLoadBalancingStrategy`.
Args: op: An `Operation` with a single output, typically a "Variable" op.
Returns: The number of bytes in the output `Tensor`.
Raises: ValueError: if `op` does not have a single output, or if the shape of the single output is not fully-defined. """ if len(op.outputs) != 1: raise ValueError("Op %s must have a single output" % op) output = op.outputs[0] elem_size = output.dtype.size shape = output.get_shape() if not shape.is_fully_defined(): # Due to legacy behavior, scalar "Variable" ops have output Tensors that # have unknown shape when the op is created (and hence passed to this # load function for placement), even though the scalar shape is set # explicitly immediately afterward. shape = tensor_shape.TensorShape(op.get_attr("shape")) shape.assert_is_fully_defined() return shape.num_elements() * elem_size


The GreedyLoadBalancingStrategy source above is fairly straightforward. The strategy uses each parameter's byte size, shape.num_elements() * elem_size, to pick a ps node: each variable is placed on the ps node whose accumulated placed memory is currently the smallest. This also means you can define your own placement policy, for example by first sorting the variables by size and then, as each one is created, placing it on the currently least-loaded ps node. A toy custom strategy is sketched below.
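As a purely illustrative example (not from the article or from TensorFlow itself): any callable that maps an Operation to a ps task index can serve as ps_strategy, so a hand-rolled strategy can be as simple as hashing the variable name, which makes placement independent of creation order:

import hashlib


class HashNameStrategy(object):
  """Toy ps_strategy: choose the ps task from a hash of the op name."""

  def __init__(self, num_tasks):
    self._num_tasks = num_tasks

  def __call__(self, op):
    # Stable assignment: the same variable name always maps to the same task.
    digest = hashlib.md5(op.name.encode("utf-8")).hexdigest()
    return int(digest, 16) % self._num_tasks

# Used exactly like the built-in strategies, e.g.
#   tf.train.replica_device_setter(cluster=cluster_spec,
#                                  ps_strategy=HashNameStrategy(num_tasks=2))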


Everything discussed so far involves parameters of modest size, where each ps node can hold a variable on its own. What about extremely large variables, for example embedding tables with tens of millions or even hundreds of millions of rows? TensorFlow provides variable partitioning for this case: with a partitioner, such a huge variable can be split into several pieces that are distributed across different ps nodes, as sketched below.
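The figure from the original post is not reproduced here. As an illustration under assumed sizes and names, in TF 1.x a huge embedding table can be sharded across the ps tasks with a partitioner, for example:

import tensorflow as tf

cluster_spec = {
    "ps": ["ps0:2222", "ps1:2222", "ps2:2222", "ps3:2222"],
    "worker": ["worker0:2222"]}

with tf.device(tf.train.replica_device_setter(cluster=cluster_spec)):
  # Split the table row-wise into 4 shards; each shard is a separate
  # variable, so the device setter spreads the shards over the ps tasks.
  embeddings = tf.get_variable(
      "embeddings", shape=[10000000, 64],
      partitioner=tf.fixed_size_partitioner(num_shards=4))

  ids = tf.placeholder(tf.int64, shape=[None])
  # embedding_lookup understands partitioned variables and routes each id
  # to the shard that owns it.
  embedded = tf.nn.embedding_lookup(embeddings, ids)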





This article is reproduced from Alex-zhai's Zhihu account.


Original link: https://zhuanlan.zhihu.com/p/90234576


