简单总结一下主流深度学习框架 Tensorflow 和 Pytorch 的分布式训练策略。

# 分布式训练概述

随着计算机算力逐渐提高,神经网络的参数量规模也逐步增大,之前一个情感分析的LSTM模型可能只需要1GB不到的显存,现在一个 BERT 起步就是 11GB,CV领域更是夸张。伴随着参数量和数据量的增大,单机单卡训练以及不能满足科研的需求,因此分布式训练逐渐进入人们视野。

分布式训练与研发中的分布式系统相似,将资源分布在一个集群内运行,提高并行效率。具体分为两种情况:模型并行数据并行

模型并行指将模型的不同层分别运行在不同的GPU中,每个GPU只计算和更新自己负责的隐藏层,以解决模型参数量过大出现的 OOM 问题。

数据并行指在多个卡上运行同一份模型,但是输入不同的数据,实现多个 batch 同时并行计算,加快模型训练速度。数据并行根据并行计算的方式不同由分为同步并行异步并行

  • 同步并行指每个卡上的数据计算完后,通过某些通信方式统一计算结果,并更新梯度,训练时间取决于计算最慢的那张卡,多卡间的数据同步策略也会影响训练速度。
  • 异步并行指每个卡独自进行数据计算和梯度更新,涉及模型间参数及时同步的问题(略复杂)。

另外,分布式训练还有单机多卡和多机多卡(集群)区别,单机多卡往往是单进程多卡训练,而多机多卡涉及到进程间通信问题,通常是通过TCP或MPI进行数据通信。目前大多数情况下论文训练都是通过 单机多卡的异步数据并行 进行分布式训练。

需要注意的是,由于分布式训练需要不同卡(甚至不同进程)间进行通信,所以一个模型如果想要改成分布式训练,或多或少都需要进行代码修改,分布式训练与直接写一个普通的模型还是有一定区别的。

# 一些概念定义

在分布式中涉及一些集群间的通信方法:

scatter: 分割资源,将数据按某一维度分割成若干部分,分给集群中的终端。

broadcast: 广播资源,将数据复制给集群内的每一个终端。

gather: 聚集资源,将集群中的资源集中到某一个终端。

all_gather: 集群聚集资源,通过某种通信方式,将集群中的资源集中在每个终端

reduce: 汇总资源,将集群中的资源以某种计算方法汇总到某个终端(默认是求和)。

all_reduce: 集群汇总资源,通过某种通信方式,将集群中的资源以某种计算方法汇总到每个终端(默认求和)。

Pytorch中的图例很好的解释了这几种集群通信的区别:

# TensorFlow 分布式

参考版本 TensorFlow 2.0

v1.12 后, tf 将自己的分布式训练进行了更新,共四种不同的分布式策略,统一使用 tf.distribute 进行管理。

就在总结的过程中,tf 2.0 手册又更新了一个 CentralStrorageStrategy ,看来 TensorFlow 还是很重视分布式训练这一块功能的。

# MirroredStrategy

MirroredStrategy 是一种单机多卡同步训练方法。Mirrored 会在每张卡复制一份模型,各卡计算梯度后进行all-reduce,独立更新梯度。多卡间通信默认使用 Nvidia NCCL(通信方式的差异将在另外一篇博客中单独总结),每张卡计算完 tensor 后将对所有卡可见,之后进行梯度更新。

MirroredStrategy 使用方法相对简单,以 Keras 为例,其他接口可参阅官方文档

# 规定使用的 GPU
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
# 在分布式策略下创建模型
with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  model.compile(loss='mse', optimizer='sgd')
# 输入数据,进行训练
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
1
2
3
4
5
6
7
8
9
10
11

需要注意的是, MirroredStrategy 是将整个 batch 平均分给各个卡(在上面的例子中,gpu0 和 gpu1 各得到 batch_size 为 5),因此如果想规定每个卡训练的batch数量,可以使用 strategy.num_replicas_in_sync 来计算整个 batch:

# Compute global batch size using number of replicas.
# 每张卡得到的batch
BATCH_SIZE_PER_REPLICA = 5
# 整体 batch
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
# 另外,当batch扩大时,一般会根据batch来扩大学习率
LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
1
2
3
4
5
6
7
8
9
10

MirroredStrategy 是分布式策略中最简单的一种,当然它的局限性也较大,只能在单机上运行,当单机GPU数量不足时则略显乏力。

注:其实 keras 自己有单机多卡的训练接口 keras.utils.multi_gpu_model ,不得不说 TensorFlow 的接口真的是“百花齐放”,一种目的可以用10种方法来实现,期待2.0能对接口有一个整齐的定义。

# CentralStrorageStrategy

就在总结这篇文章的第3天,TensorFlow 2.0 文档又更新了新的分布式策略 CentralStrorageStrategy,等于现在有5种分布式方法了。

根据官方介绍,CentralStorageStrategy 是一种单机同步数据并行训练方法, 与 Mirrored 不同的是,它不再将变量进行复制,它将所有变量储存在CPU中,将所有计算方法拷贝到每个GPU中。但是如果只有一块GPU参与计算,所有的变量和计算方法都将存于GPU中。

从文档解释来看,CentralStorage 中的 GPU 将只负责计算工作,它根据自身的计算方法从 CPU 中获取权重,然后将计算结果返回。变量的更新操作由 CPU 来维护(但是计算应该还是在 GPU 中,可能默认第一块 GPU 负责梯度更新),不涉及复杂的多卡通信算法,瓶颈将存在于GPU与CPU间的通信带宽与 Pytorch 的 DP 方法类似,GPU 与 CPU 间频繁的变量交换可能会对分布式效率产生影响。

截止目前(2019.5.10)官方还未对该分布式有更多说明,API手册也没找到相关接口代码,对于其细节需要等Google更新后再进一步探究。

# MultiWorkerMirroredStrategy

以下简称 MWM, 和 Mirrored 相似,均为同步数据并行策略。MWM可以进行多机多卡的分布式训练,与最传统的 Param Servers 架构不同的是,MWM 没有参数服务器,所有的 GPU 都是 worker,所有的worker 拥有一份复制的模型,计算梯度后进行 All_reduce 然后独立更新。多卡间通信默认为自动调整(AUTO),将根据卡数量和结构在NCCLRING 中进行切换。(目前只有 MWM 支持 Ring-based)

MWM 使用前需要定义 workers 列表:

# workers 数量
NUM_WORKERS = 1
# 每个 workers 地址
IP_ADDRS = ['localhost']
# workers 端口,与地址相对应
PORTS = [12345]
# 将配置写入环境变量
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ['%s:%d' % (IP_ADDRS[w], PORTS[w]) for w in range(NUM_WORKERS)]
    },
    'task': {'type': 'worker', 'index': 0}
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14

列表中第一个定义的 worker 将作为 leader,负责变量的初始化及分发工作。(多卡通信方式的不同将对架构产生较大的影响)

截至目前(2019.5.10), MWM会使用workers内的所有GPU(暂不能指定),并且会假定所有 workers 的GPU数量是相等的(显然现在不相等的workers会报错),未来会移除这样的假设。

官方给了一个使用 Estimator 进行 MWM 训练的例子,由于本人对Estimator接口不是很熟悉,因此不做更多说明,有意可移步官方教程。另外,如果想用 Keras 实现 MWM 训练,可以先关闭 eager, 然后参照 Mirrored 的例子进行训练:

tf.compat.v1.disable_eager_execution()
1

另一个事情: Another thing to note is that when using MultiWorkerMirorredStrategy for multiple workers with Keras, currently the user will have to explicitly shard or shuffle the data for different workers, but we will change this in the future to automatically shard the input data intelligently.

# TPUStrategy

TPUStrategy 顾名思义,使用 TPU 进行分布式训练。不过 TPU 资源不是人人都能接触到,就不做过多解释了,会用 TPU 的也不会对分布式训练方法感到陌生。

# ParameterServerStrategy

Param 是 TensorFlow 最初的分布式训练方法,它由若干个 param servers 和若干 worker servers 构成,param 用于储存参数变量,worker用于计算。 Param 严格来说是一个异步数据并行训练方法,每个 worker 自己负责参数的更新工作。

Param 训练的过程略显复杂,需要结合 K8s 来创建不同的 param servers 和 worker servers 以及对应的 clusters(这也是 Horovod 吐槽的地方),官方似乎也在逐渐摒弃这种训练方法,移除了对应的“新手教程”,将 Param 移到了 tensorflow/ecosystem中,真正成为了 K8s 中的一部分。因此想要了解 Param 工作原理需要对 k8s + kubeflow 有一定了解,才能熟练运用 Param 进行训练。

# Pytorch

参考版本:1.1.0

本人对 Pytorch 了解较少,因此此处的介绍更多来自于对官方文档的理解

Pytorch 自出生后就备受好评,其动态图计算和更加python化的代码风格受到众多人追捧,其分布式计算方法也继承了 torch 简洁明了的特点,相较于 TensorFlow 复杂且没什么用处的分布式逻辑,pytorch 分布式相当简洁明了。

# 模型并行

模型并行在实际中应用较少,pytorch 使用 to(device_id) 方法来指定模型的位置:

import torch
import torch.nn as nn
import torch.optim as optim
class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = torch.nn.Linear(10, 10).to('cuda:0')
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to('cuda:1')
    def forward(self, x):
        x = self.relu(self.net1(x.to('cuda:0')))
        return self.net2(x.to('cuda:1'))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# DataParallel

Pytorch 使用 nn.DataParallel(model) 实现单机多卡数据并行训练

# 定义 model
class Model(nn.Module):
    ...
# 数据
batch_size = 30
data_size = 100
rand_loader = DataLoader(dataset=OurDataSet,batch_size=batch_size, shuffle=True)
# 创建模型
model = Model(input_size, output_size)
# 开启数据并行,指定显卡
if torch.cuda.device_count() > 1:
  model = nn.DataParallel(model, device_ids=[0, 1, 2])
# 指定 leader,用于集中处理
device = torch.device("cuda:0")
model.to(device)
# 运行
for data in rand_loader:
    input = data.to(device)
    output = model(input)
    print("Outside: input size", input.size(),"output_size", output.size())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

DataParallel 会将模型复制到每个 GPU 中,并将输入的 batch 平分给每个 GPU (例如此处batch 为30,共2张卡,则每张卡得到batch 为15),计算出最后结果后返回给 leader GPU进行汇总。DP 策略的瓶颈在于 leader GPU 的通信带宽,因为所有卡的计算结果都需要汇总到 leader,由主卡更新参数后,再将新的参数广播给所有的卡。

DataParallel 是 Pytorch 最简单的单机多卡训练模式,也是应用最广泛的训练方法。

# DistributedDataParallel

在 DataParallel 前加一个 Distributed 就产生了新的数据并行训练方法,简称 DDP. 之前收集各种资料时看到有人说 DP 是用于单机多卡,而 DDP 用于多机多卡,实际不然。 DPP 相较于 DP 最明显的变化在于前面多了个 D (废话)—— 它由原先的单进程多卡模式变为了多进程模式,各个进程相互独立,由 DP 的进程内变量通信转化为了进程间的通信,从系统层面提高了通信效率。

官方强烈建议使用 DPP 进行分布式训练,即使是在单机多卡上 DDP (1 process on 1 gpu) 的表现也显著优于 DP. 我查阅了许多资料来求证为什么 DDP 的表现要优于 DP,官方文档中对于 DP 和 DDP 有这样一段描述:

In the single-machine synchronous case, torch.distributed or the torch.nn.parallel.DistributedDataParallel() wrapper may still have advantages over other approaches to data-parallelism, including torch.nn.DataParallel() :

  • Each process maintains its own optimizer and performs a complete optimization step with each iteration. While this may appear redundant, since the gradients have already been gathered together and averaged across processes and are thus the same for every process, this means that no parameter broadcast step is needed, reducing time spent transferring tensors between nodes.
  • Each process contains an independent Python interpreter, eliminating the extra interpreter overhead and “GIL-thrashing” that comes from driving several execution threads, model replicas, or GPUs from a single Python process. This is especially important for models that make heavy use of the Python runtime, including models with recurrent layers or many small components.

结合我对于两种不同架构的理解,总结一下其大意是:

  1. DDP 中每个进程拥有自己独立的模型和优化过程,梯度计算通过 All_Reduce 进行共享(NCCL方式);而 DP 架构在每张卡计算结果后需要与主卡进行变量交换,主卡带宽吞吐量将成为主要瓶颈。因此 DDP 架构的通信效率会高很多。

  2. DDP 中每个进程独立享有CPU资源,因此在运行时更快,而 DP 中所有的worker共享了一个 GPU 资源(单进程)。

在实战中,DDP 运行有两种方式:

  1. 1 process on all GPUs

    在 1-on-all 运行模式中,每个节点(物理机)只运行一个进程,进程调用该节点的所有 GPU 资源,这与 tensorflow 里的 MWM 工作模式一致。

  2. 1 process on 1 GPU

1-on-1 模式中一个进程只调用一个 GPU 资源,因此如果需要用N个GPU就要运行N个进程,尽管进程管理略复杂但是资源自由度最高,这也是官方最推荐的运行方式。

本人没有深入了解过 Pytorch 开发,因此不再举例分布式训练方法,有兴趣的读者可以去官方教程(下方参考资料里也有)查看相关API介绍。


参考资料:

[1] tensorflow 分布式训练向导

[2] tensorflow 分布式 api 手册

[3] pytorch 分布式训练 DP 向导

[4] pytorch 分布式训练 DDP 向导

[5] pytorch 分布式训练 API 手册

[5] pytorch 关于 DP 与 DDP 解释