目录
GPipe是一个基于 Lingvo (Lingvo 是 Google 基于 TensorFlow 二次开发的重点针对序列模型的框架)开发的,支持超大规模模型的神经网络训练并行库,本文介绍其基本功能和流水线机制。
GPipe是一个基于 Lingvo (Lingvo 是 Google 基于 TensorFlow 二次开发的重点针对序列模型的框架 https://github.com/tensorflow/lingvo)开发的,支持超大规模模型的神经网络训练并行库,其特点如下:
深度学习框架本质上是一个基于张量(Tensor)之间的计算(Operator)表达式所组成的计算图(Graph)编译执行引擎,提供了一系列张量的定义、一元操作、二元操作等数学原语,并根据反向传播算法(Back Propagation)进行梯度自动求导以及模型更新。在大量数据分批次流入计算图进行模型训练之后,使得模型学习到数据中的内在关联关系,从而获得对应场景中的“智能”感知与判断能力。
DNN训练的目标是在尽可能短的时间内获得一个高精度的模型。这一目标可以通过两个指标来实现:
GPU最主要提供的是两种资源:计算资源 和 显存带宽资源。所以训练大型模型有两个基本挑战:显存效率和计算效率。
深度学习框架性能优化的最终目标是深度学习模型训练最快,从而使得完成训练的时间最短,节省模型训练开发周期和用户的时间成本。
业界采用一些并行机制达到优化的目的。
本节以下主要参考如下文章:
Efficient Large-Scale Language Model Training on GPU Clusters
DeepSpeed: Extreme-scale model training for everyone
[译] DeepSpeed:所有人都能用的超大规模模型训练工具
PipeDream: Fast and Efficient Pipeline Parallel DNN Training。
在 "Efficient Large-Scale Language Model Training on GPU Clusters" 论文中, NVIDIA 介绍了分布式训练超大规模模型的三种必须的并行技术:
数据并行(Data Parallelism)是最常见的方法。其特点如下:
数据并行性具有几个明显的优势,包括计算效率高和实现起来工作量小,这使得数据并行训练在一些流行的具有高计算通信比的模型上运行良好,但有几个重要的趋势威胁着它的有效性:
模型并行在传统上用于训练过程中太大而无法保存在工作者内存或缓存中的模型。其特点如下:
就其本质而言,模型并行性的计算和通信因模型结构而异,因此在实现上有很大的工作量。
然而,即使模型并行能够训练非常大的模型,传统的模型并行也会导致计算资源的严重利用率不足,因为它一次只主动使用一个worker(如果每个层被分配给一个worker),或者不能重叠计算和通信(如果每个层被分区)。
流水并行(Pipeline Model Parallelism)在有的论文里叫做流水线级别的模型并行,其特点是:
但是流水线并行依然有一些问题:
给定一个特定的神经网络模型和一批计算资源,从任务到设备之间的映射有多种方式,但不同的映射方案运行效率不同。哪种方案最优既取决于作业本身的特性,也取决于底层硬件的拓扑。
神经网络由很多局部计算搭建组成,一般来说,同一个神经网络的不同算子可能适合不同的并行模式。一个局部计算是采用数据并行,还是模型并行取决于这个局部任务的计算传输比。比如:
因此,对于每一个任务选择最优的并行模式是一个非常复杂的问题,需要具体情况具体分析。
目前已有的深度学习框架,大多数提供了对数据并行的原生支持,但是对模型并行支持的还不完善。如果用户想要将模型参数分配到不同设备上,往往会遇到需要人工指定模型切分方式、手工编写数据通信逻辑代码等问题。
我们就看看 Pytorch 如何手动指定,主要摘录(翻译):
https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html
PyTorch以Tensor为基本单元,更符合算法工程师写Python脚本的直觉,以面向对象的方式进行模型搭建和训练。对Tensor进行赋值、切片,就像numpy一样易用。
PyTorch 是单卡视角,一个设备上的 Tensor、模型脚本跟另一个设备上的 Tensor、模型脚本并无直接关系,对于每个设备上的模型脚本都完全对称的(Mirror)最简单的数据并行来说,PyTorch 这样的设计没有什么明显的缺陷。每个设备上的脚本运行到相同 batch 的模型更新部分(Optimizer),统一做一次模型同步(AllReduce 操作)就完成了数据并行,这就是 PyTorch 的 DDP(DistributedDataParallel)模块。
但在分布式情况下想把一个Tensor切分到不同机器上,需要手动构建传输过程,相当于直接对物理编程,所以对分布式使用的门槛更高。
PyTorch 模型并行将单个模型拆分到不同的GPU上,而不是在每个GPU上复制整个模型(具体来说,假设模型 m
包含10层。如果使用 DataParallel
,则每个GPU都具有这10层中每个层的副本,而如果在两个GPU上使用模型并行时,每个GPU可以托管5层)。
模型并行的高级思想是将模型的不同子网络放置在不同的设备上,并相应地实现该forward
方法以跨设备移动中间输出。由于模型的一部分只在任何单个设备上运行,因此一组设备可以共同服务于一个更大的模型。
让我们从包含两个线性层的玩具模型(toy model)开始。要在两个GPU上运行此模型,只需将每个线性层放在不同的GPU上,然后移动输入(input)和中间输出(intermediate outputs)以匹配层设备(layer devices)。
import torch
import torch.nn as nn
import torch.optim as optim
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = torch.nn.Linear(10, 10).to('cuda:0') # 将net1放置在第1个GPU上
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to('cuda:1') # 将net2放置在第2个GPU上
def forward(self, x):
x = self.relu(self.net1(x.to('cuda:0')))
return self.net2(x.to('cuda:1'))
请注意对于 ToyModel
,除了五个用于将线性层(linear layers)和张量(tensors)放置在适当的设备上的to(device)
调用之外,以上内容与在单个GPU上实现该功能非常相似。这是模型中唯一需要更改地方(即to(device)
)。 backward()
和 torch.optim
会自动关注梯度(gradients),就好像模型是一个GPU一样。调用损失函数时,只需确保标签(label)与输出(output)在同一设备(on the same device)上。
model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.paraeters(), lr=0.001)
optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to('cuda:1') # ToyMode 的 output 是在 'cuda:1' 上,此处的 label 也应该置于 'cuda:1' 上
loss_fn(outputs,labels).backward()
optimizer.step()
只需更改几行,就可以在多个GPU上运行现有的单GPU模块。以下代码显示了如何分解 torchvision.models.reset50()
为两个GPU。思想是从现有 ResNet
模块继承,并在构建过程中将层拆分为两个GPU。然后,覆盖 forward
方法来缝合两个子网,通过相应地移动中间输出。
from torchvision.models.resnet import ResNet, Bottleneck
num_classes = 1000
class ModelParallelResNet50(ResNet):
def __init__(self, *args, **kwargs):
super(ModelParallelResNet50, self).__init__(
Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)
self.seq1 = nn.Sequential(
self.conv1,
self.bn1,
self.relu,
self.maxpool,
self.layer1,
self.layer2
).to('cuda:0') # 放置在第1个GPU上
self.seq2 = nn.Sequential(
self.layer3,
self.layer4,
self.avgpool,
).to('cuda:1') # 放置在第2个GPU上
self.fc.to('cuda:1')
def forward(self, x):
x = self.seq2(self.seq1(x).to('cuda:1'))
return self.fc(x.view(x.size(0), -1))
对于模型太大而无法放入单个GPU的情况,上述实现解决了该问题。但是,你可能已经注意到,如果模型合适,它(model parallel)将比在单个GPU上运行要慢。这是因为在任何时间点,两个GPU中只有一个在工作,而另一个在那儿什么也没做。在 layer2
和 layer3
之间,中间输出需要从 cuda:0
复制到 cuda:1
,这使得性能进一步恶化。
在整个执行过程中,两个GPU中的一个会处于空闲状态。为了解决这个问题,有一种选择是将每个批次进一步划分为拆分流水线,以便当一个拆分到达第二子网时,可以将下一个拆分馈入第一子网。这样,两个连续的拆分可以在两个GPU上同时运行。
在以下实验中,我们将每批次 120-image 进一步划分为 20-image 。当PyTorch异步启动CUDA操作时,该实现无需生成多个线程即可实现并发。
class PipelineParallelResNet50(ModelParallelResNet50):
def __init__(self, split_size=20, *args, **kwargs):
super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
self.split_size = split_size
def forward(self, x):
splits = iter(x.split(self.split_size, dim=0))
s_next = next(splits)
s_prev = self.seq1(s_next).to('cuda:1')
ret = []
for s_next in splits:
# A. s_prev runs on cuda:1
s_prev = self.seq2(s_prev)
ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))
# B. s_next runs on cuda:0, which can run concurrently with A
s_prev = self.seq1(s_next).to('cuda:1')
s_prev = self.seq2(s_prev)
ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))
return torch.cat(ret)
setup = "model = PipelineParallelResNet50()"
pp_run_times = timeit.repeat(
stmt, setup, number=1, repeat=num_repeat, globals=globals())
pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)
plot([mp_mean, rn_mean, pp_mean],
[mp_std, rn_std, pp_std],
['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'],
'mp_vs_rn_vs_pp.png')
请注意,设备到设备的张量复制操作在源设备和目标设备上的当前流(current streams)上同步。如果创建多个流,则必须确保复制操作正确同步。在完成复制操作之前写入源张量或读取/写入目标张量可能导致不确定的行为。上面的实现仅在源设备和目标设备上都使用默认流,因此没有必要强制执行其他同步。
因为每个模型的并行策略候选集合是指数级的,纯手工从中挑出一种合适的并行策略,需要耗费算法工程师大量的时间以及计算资源,而且算法工程师需要考虑的相关事宜太多,比如:如何分配内存,层之间如何交互,如何减少通信代价,分割的张量不能破坏原有数学模型,如何确定确定张量shape,如何确定输入输出等等。
所以自动并行技术(如何从框架层次自动解决并行策略选择问题)成为一个研究热点。
自动并行通过建立代价模型来预测并挑选一个较优的并行策略(暂时无法保证是最优的策略,因为挑出最优的策略是个NP-Hard的问题),有希望将算法工程师从并行策略的选择和配置中解放出来。
因此,目前分布式模型训练有几个必要并行技术:
下面我们结合 Gpipe代码看看这些技术如何使用。
Lingvo 的核心组件如下:
在Lingvo中,网络是一个层的嵌套结构。Lingvo中的大多数类都是[Lingvo/core/base_layer.py] BaseLayer 的子类。
Params
:用来配置类,定义了配置所需要的keys,这些keys在对象创建时都应该被定义。Params对象还可以包含用于配置子层的Params对象。每个layer类都会有一个params的classmethod,这个方法将会创建一个新的params对象,并且通过定义的keys来配置这个layer,同时为这些keys给出一些合理的默认值。
Params对象中属性包括:
cls
: tParams对象关联的python类。这可以用来构造类的实例;name
: 该层的名称;dtype
: 创建变量时使用的默认数据类型。__init__
constructor :所有子层和变量都应该在这里创建。
CreateVariable
:创建变量的方法。每个Layer负责创建和管理它自己的variable。
CreateChild
:创建子层的方法。
FProp
: 所有的layers都有一个FProp() 函数, 实现该层的前向传播,在计算的前向step时将会被调用。 因为可以在分布式训练时在不同的设备上执行,出于性能的考虑,Lingvo通过theta参数访问variable,而不是通过self.vars或者self.theta。
FPropMeta
: 返回该层关于FProp
计算的元数据。其中 meta.flops
在得到一个输入张量时给出估计的floating point operations数目。
对于模型算法的落地,有两个指标特别重要:
接下来,我们需要看看如何计算模型训练的内存大小,以及如何计算算力(后续流水线并行需要)。
我们主要参考了 ZeRO: Memory Optimization Towards Training A Trillion Parameter Models 这篇论文的思路。
在模型训练期间,大部分内存被以下三种情况之一消耗:
输入数据所占用的显存其实并不大,这是因为我们往往采用迭代器的方式读取数据,这意味着我们其实并不是一次性的将所有数据读入显存,而这保证每次输入所占用的显存与整个网络参数来比是微不足道的。
我们逐一分析。
对于激活函数,有如下特点:
模型自身的参数指的就是各个网络层的 Weight 和Bias,这部分显存在模型加载完成之后就会被占用。另外需要注意到的是,有些层是有参数的,如CNN, RNN; 而有些层是无参数的, 如激活层, 池化层等。
优化器参数指的是模型在优化过程即反向传播中所产生的参数, 这部分参数主要指的就是 dw, 即梯度,在SGD中, 其大小与参数一样, 因此在优化期间, 模型参数所占用的显存会翻倍。
值得注意的是,不同的优化器其所需保存的优化参数不同, 对于 Adam, 由于其还需要保存其余参数, 模型的参数量会在优化区间翻 4 倍。
对于OGP状态,让我们以ADAM为例。使用ADAM对具有Ψ个参数的模型进行混合精度训练。
总的来说,OGP状态有2Ψ+2Ψ+KΨ=16Ψ字节(混合精度ADAM的K=12)的内存需求。
具体如下:
蓝色是参数,橙色是梯度,绿色是优化器状态。
在内存消耗公式中,Ψ表示模型大小(参数个数),K表示优化器状态的内存乘数,Nd表示数据并行度。在这个例子中,我们假设模型大小为Ψ=75亿,基于Adam优化器的混合精度训练,数据并行度为Nd=64(即64个GPU),K=12。
对于一个拥有15亿个参数的GPT-2这样的模型,这导致了至少24gb的内存需求,远远高于单独保存fp16参数所需的3gb内存。
临时缓冲区是用于存储临时结果的缓冲区,例如,对于参数为15亿的GPT-2模型, fp32缓冲区将需要6GB的内存。
前向传播时所需的计算力就是由FLOPs体现,那么FLOPs该怎么计算呢?
我们知道,在一个模型进行前向传播的时候,会进行卷积、池化、BatchNorm、Relu、Upsample等操作。这些操作的进行都会有其对应的计算力消耗产生,其中,卷积所对应的计算力消耗是所占比重最高的。所以,我们以卷积操作为例,看看卷积所对应的计算力。
推导过程:卷积层 wx + b 需要计算两部分,首先考虑前半部分 wx 的计算量:
令 :
则对于输出 feature map 上的单个 Unit 有:
k * k * c 次乘法,以及 k * k * c - 1 次加法
如果输出 feature map 的分辨率是 H * W ,且输出 o 个 feature map,则输出 feature map 包含 Unit的总数就是 H * W * o。
因此,该卷积层在计算 wx 时有:
k * k * c * H * W * o 次乘法 --(1)
(k * k * c - 1) * H * W * o 次加法 --(2)
再考虑偏置项 b 包含的计算量:
由于 b 只存在加法运算,输出 feature map 上的每个 Unit 做一次偏置项加法。因此,该卷积层在计算偏置项时总共包含:
H * W * o 次加法 --(3)
将该卷积层的 wx 和 b 两部分的计算次数累计起来就有:
式(1) 次乘法:
k * k * c * H * W * o 次乘法
式(2) + 式(3) 次加法:
(k * k * c - 1) * H * W * o + H * W * o = k * k * c * H * W * o
可见,式(2) + 式(3) = 式 (1)
因此,对于带偏置项的卷积层,该层的计算力消耗 为:
k * k * c * H * W * o
在 Lingvo 之中,具体计算算力,就是通过每个类的 FPropMeta
完成,这些方法都是每个类根据自己的特点实现。我们具体找几个例子看看如何计算FLOPS。
Conv2DLayerNoPadding如下计算:
@classmethod
def FPropMeta(cls, p, inputs):
py_utils.CheckShapes((inputs,))
b, h, w, c = inputs
fh, fw, ic, oc = p.filter_shape
assert ic == c
sh, sw = p.filter_stride
if p.padding == 'SAME':
oh = sympy.ceiling(h / sh)
ow = sympy.ceiling(w / sw)
else:
oh = sympy.ceiling((h - fh + 1) / sh)
ow = sympy.ceiling((w - fw + 1) / sw)
flops = b * oh * ow * fh * fw * ic * oc * 2 # mul/add counts as 2 flop.
outputs = tshape.Shape([b, oh, ow, oc])
return py_utils.NestedMap(flops=flops, out_shapes=(outputs,))
DropoutLayer 如下计算:
@classmethod
def FPropMeta(cls, p, inputs, *args):
py_utils.CheckShapes((inputs,))
flops_per_element = 10 # Approximately 10 flops per element.
return py_utils.NestedMap(
flops=inputs.num_elements() * flops_per_element, out_shapes=(inputs,))
BatchNormLayer 的 FLOPS 如下计算。
@classmethod
def FPropMeta(cls, p, inputs, padding=None):
py_utils.CheckShapes((inputs,))
return py_utils.NestedMap(
flops=inputs.num_elements() * _BN_FLOPS_PER_ELEMENT,
out_shapes=(inputs,))
ActivationLayer 如下计算:
@classmethod
def FPropMeta(cls, p, inputs):
py_utils.CheckShapes((inputs,))
return py_utils.NestedMap(
flops=inputs.num_elements() * GetFlops(p.activation),
out_shapes=(inputs,))
无论是数据并行还是模型并行,都会在相应的机器之间进行全连接的通信,当机器数量增大时,通信开销和时延会大到难以忍受。
比如参数服务器使用中,三段式流程如下:Pull weight ---> Compute new weight ---> Push new weight
。
如果三段式流程串行的进行通信和计算,无论这个通信是快是慢,这个时间开销都会导致在分布式环境下每个iteration的时间比单机版要长(Ethernet的带宽高低或者latency大小只会影响这个时间的长短,但并不能把这个时间降到零)。所以,把通信和计算重叠(overlap)起来以便 “掩盖” 通信时间几乎是一个必须的步骤。如何重叠计算和传输从而提高设备利用率就非常有挑战。
原则上我们可以通过并行计算在GPU或者TPU上训练更大的DNN模型。但是由于DNN的顺序性,这种方法可能导致在计算期间只有一个加速器处于活动状态,不能充分利用设备的计算能力。
回到神经网络的训练过程上,怎么设计系统来重叠计算和通信?
在后向传播之中有两个特点可以利用:
因此,根据这个特点,人们引入了流水线并行。
数据并行与模型并行都是让设备执行同一个层次的计算,流水并行则是把任务划分为几个有明确先后顺序的阶段,把不同的阶段分给不同的计算设备,使得单设备只负责网络中部分层的计算。模型网络分布在各个设备上是非对称的,各个设备“接力”执行网络的一部分。每个阶段(stage) 和下一个阶段之间仅有相邻的某一个 Tensor 数据需要传输,每台机器的数据传输量跟总的网络大小、机器总数、并行规模无关。
在这种多设备接力完成一个网络计算的模式下,可以支持更大的模型或者支持更大的 Batch Size。如果通信和计算重叠(overlap)好,又可以解决了机器之间的通信开销的问题,
总的来说,流水并行在通常大模型训练情况下具有优势。流水并行的数据传输量少,仅为阶段之间需要传输的数据量之和,不像数据并行与模型并行那样大,传输量与整个计算图都有关,因此对于带宽较小的机器,会趋于使用流水并行。但某些情况下,流水并行与模型并行的结合则会优于单一的模型并行与流水并行。同时,在数据并行与模型并行中也存在计算时间掩盖传输时间的优化。
从概念上讲,GPipe是一个分布式机器学习库,它使用同步随机梯度下降和流水线并行的方式进行训练,适用于任何由多个有序的层组成的深度神经网络(Deep Neural Networks, DNN)。 Gpipe通过跨不同的加速器来分割模型,并自动将一小批训练示例划分成更小的批量。 该模型允许GPipe的加速器进行并行操作,最大限度地提高了训练过程中的可扩展性。
GPipe 有几个关键要点:
Network partition(网络分片):将一个N层的网络划分成K个partition, 每个partition在单独的TPU上执行,partition之间需要插入一些网络通信操作。
Pipeline parallelism(流水线并行):把CPU里的流水线并发技术用在了深度学习上,主要是把计算和网络通信两种操作,更好地重排列。即自动将mini-batch的训练样本分成更小的micro-batch,并在pipeline中运行,使TPU核心能够并行操作。
Gradient Accumulation(梯度累积) :梯度一直在micro-batch中累积,因此分区数量不会影响模型质量。
Re-Materialization(重计算) :Re-Materialization具体是指在前向计算过程中,GPipe只记录stage划分处的输出,在计算梯度时,GPipe会重新执行前向计算逻辑,从而得到各个算子的前向结果,然后再计算梯度结果。跟OpenAI开源的gradient-checkpointing一样,只不过GPipe是在TPU上实现的,OpenAI的只能运行在GPU上。
我们根据论文思路,提出了几个问题,争取以后按图索骥。
如何划分 stage?
依据什么分割做流水?
如何做Re-Materialization?
神经网络有一个特点:对不同的输入,其运行时间相差不大,因此可以预估其算力,时间,参数大小等等。Gpipe就是依据算力对图进行了分割,从而把不同层分配到不同的设备上。
PartitionSequentialLayers 把一个包括sequential layers的层分解,目的是让每一个partition都大致拥有同样的 flops。最终目的是让每个 GPU 都拥有尽量同样的算力。
逻辑是:
如果params只是一个layer,那么就把这个layer构建成一个包含sub-layers的列表 subs;
利用 FPropMeta 计算出来这个 subs 列表的shapes和总flops,赋值给了 histo;
利用 histo 计算出来一个层代价(layer's cost)的归一化累积直方图;
构建一个parts变量;
把parts转换成一个 FeatureExtractionLayer param 列表;
def PartitionSequentialLayers(params, num_partitions, *shapes):
r"""Partition a layer composed of sequential layers.
This routine strives to partition layers so that each partition costs roughly
the same flops given the input shapes.
Args:
params: A layer param or a list of layer param.
num_partitions: The desired number of partitions.
*shapes: A tuple of tshape.Shape representing input tensors to the first
layer.
Returns:
A list of FeatureExtractionLayer params.
"""
# Recursively concatenate SequentialLayer into a list.
# SequentialLayer 是一个层,其作用是把若干层按顺序连接起来
def FlattenSeq(p):
if isinstance(p, list): # 已经是列表则返回
return p
if p.cls not in [builder_layers.SequentialLayer, FeatureExtractionLayer]:
return [p.Copy()]
subs = []
for _ in range(p.repeat): # 把p包含的所有层都组装成一个层列表
for s in p.sub:
subs += FlattenSeq(s)
return subs
# 如果params是一个layer,那么就依据这个layer,构建一个包含sub-layers的新列表subs,如果是列表则直接返回
subs = FlattenSeq(params)
assert len(shapes) == 1
tf.logging.info('num_partitions: {} input_shape: {}'.format(
num_partitions, shapes[0]))
# 利用 FPropMeta 计算出来这个 subs 列表的shapes和总flops,赋值给了 histo
# Computes the estimate cost for each sub layer.
# 假设有7个sub-layers,其flops分别是 10,40,30,10,20,50,10
total, histo, output_shapes = 0, [], []
for i, s in enumerate(subs):
s.name = 'cell_%03d' % i
meta = s.cls.FPropMeta(s, *shapes) #
total += meta.flops
histo.append(total)
output_shapes.append(meta.out_shapes)
shapes = meta.out_shapes
tf.logging.vlog(1, 'len %d histogram = %s', len(subs), histo)
# 则对应的histo 为:[10,50,80,90,110,160, 170],total为170
# 利用 histo 计算出来一个层代价(layer's cost)的归一化累积直方图
# Computes the normalized cumulative histogram of the layer's cost.
histo_pct = [float(x / total) for x in histo]
tf.logging.vlog(1, 'cost pct = %s', histo_pct)
# histo_pct 为 [1/17,5/17,8/17,9/17,11/17,16/17, 1],
# 假设 num_partitions = 3
# 构建一个parts变量,该变量是一个num_partitions大小的数组,数组每个item也是一个数组
# 依据直方图把subs分到parts中的每个item之中,这样每个parts[i]都拥有部分layers,目的是让最终 parts 每个item的算力尽量相同
# i-th sub layer is put into partition j, where j is roughly i-th cumulative
# histogram times num_partitions.
parts = [[] for _ in range(num_partitions)]
parts_cost = [0] * num_partitions
pre_hist_cost = 0
for i, s in enumerate(subs):
# 从histogram数组中找出s对应cost的index,j也就是s对应的partition
# 对于i,s,则 histo_pct[i] * num_partitions 分别为: [3/17, 15/17, 24/17, 27/17, 33/17, 48/17,3],j分别为[0,0,1,1,1,2,2]
j = min(int(histo_pct[i] * num_partitions), num_partitions - 1)
# The boundary at parts[j] where j > 0
if j > 0 and not parts[j]:
parts_cost[j - 1] = histo_pct[i - 1] - pre_hist_cost
pre_hist_cost = histo_pct[i - 1]
parts[j].append(s) # 把s加入到对应的partition
# 三个桶内容分别为:[1,2],[3,4,5],[6,7]
# 对应每个桶的flops为: [60,280,330]
# 把parts转换成一个 FeatureExtractionLayer 列表
parts_cost[num_partitions - 1] = 1.0 - pre_hist_cost
seqs = []
for i, pa in enumerate(parts):
tf.logging.info('Partition %d #subs %d #cost %.3f', i, len(pa),
parts_cost[i])
seqs.append(FeatureExtractionLayer.Params().Set(name='d%d' % i, sub=pa))
return seqs
上面代码中使用了FeatureExtractionLayer,其功能就是返回一个层的序列。
FeatureExtractionLayer 从一个层序列中提取特征,具体特点是:
把一些层连接成一个序列;
可以 得到&传递 激活点;
class FeatureExtractionLayer(base_layer.BaseLayer):
"""A layer that extrac features from a sequence of layers.
FeatureExtractionLayer is a layer which connects a few layers in a sequence.
It is also capable of fetching and forwarding activation endpoints.
# TODO(huangyp): Make it a sublayer of builder_layers.SequentialLayer
"""
@classmethod
def Params(cls):
p = super().Params()
p.Define('variable_name_prefix', '',
'Prefix for variable names in sub layers')
p.Define('sub', [], 'A list of layers\' params.')
p.Define('num_act_inputs', 0, 'Number of activation inputs.')
p.Define('num_act_outputs', 0, 'Number of activation outputs.')
p.Define('act_fetch_layers', [],
'Names of fetch layers that cached extra activations')
return p
def init(self, params):
super().init(params)
p = self.params
assert p.num_act_inputs >= 0
assert p.num_act_outputs >= 0
p.act_fetch_layers = p.act_fetch_layers or []
assert p.num_act_outputs == p.num_act_inputs + len(p.act_fetch_layers)
self._seq = []
for sub in p.sub:
assert sub.name
sub.name = p.variable_name_prefix + sub.name
self.CreateChild(sub.name, sub)
self._seq.append((sub.name, self.children[sub.name])) # 把一些层连接成一个序列
def FProp(self, theta, *args): # 实现该层的前向传播,在计算的前向step时将会被调用
p = self.params
assert len(args) > p.num_act_inputs
out_args = args[:-p.num_act_inputs] if p.num_act_inputs > 0 else args
extra_args = args[-p.num_act_inputs:] if p.num_act_inputs > 0 else ()
for (name, ch) in self._seq:
th = theta[name]
out_args = _ToTuple(out_args)
out_args = ch.FProp(th, *out_args)
# Append fetched activations to fprop outputs.
for fetch_layer in p.act_fetch_layers:
assert fetch_layer in self.children
activation = self.children[fetch_layer].activation # 子层激活点
if isinstance(activation, (tuple, list)):
activation = activation[0] # 如果是list,得到相应激活点
extra_args += (activation,) # 把激活点添加进来
if extra_args:
out_args = _ToTuple(out_args) + extra_args # 最终返回所有激活点
return out_args
@classmethod
def FPropMeta(cls, p, *args): # 返回该层关于FProp
计算的元数据
assert len(args) > p.num_act_inputs
seq_args = args[:-p.num_act_inputs] if p.num_act_inputs > 0 else args
extra_args = args[-p.num_act_inputs:] if p.num_act_inputs > 0 else ()
total = 0
act_fetch_metas = {}
for sub in p.sub:
meta = sub.cls.FPropMeta(sub, *seq_args)
if sub.name in p.act_fetch_layers:
act_fetch_metas[sub.name] = meta.out_shapes[0]
total += meta.flops
seq_args = meta.out_shapes
for fetch_layer in p.act_fetch_layers:
extra_args += (act_fetch_metas[fetch_layer],)
return py_utils.NestedMap(flops=total, out_shapes=seq_args + extra_args)
计算过程如下图,里面具体数值请参见上面几段代码之中的举例:
+--------------+ +--------------+ +---------------+
| | | | | |
| sub-layer 1 | | sub-layer 2 | | sub-layer n |
| | | | | |
+-------+------+ +-------+------+ +--------+------+
| | |
|FPropMeta |FPropMeta |FPropMeta
| | |
v v v
flops 1 flops 2 flops n
+ + +
| | |
| | |
+--------------------------------------+
|
|
v
for i, s in enumerate(subs):
total += meta.flops
histo.append(total)
histo=[10,50,80,90,110,160,170]
+
|
|
v
Computes the normalized cumulative histogram of the layer's cost
histo_pct = [float(x / total) for x in histo]
histo_pct=[1/17,5/17,8/17,9/17,11/17,16/17,1]
+
|
|
+
Assign layers to partition based on histogram
[1,2],[3,4,5],[6,7]
+
|
|
v
+----------------------+----------------------------+
| parts |
| |
| +--------------+ +------------+ +-------------+ |
| | sub-layer 1 | |sub-layer 3 | | sub-layer 6 | |
| | | | | | | |
| | sub-layer 2 | |sub-layer 4 | | sub-layer 7 | |
| | | | | | | |
| | | |sub-layer 5 | | | |
| +--------------+ +------------+ +-------------+ |
+---------------------------------------------------+
该层的目的是:用 Round-robin 策略把 cell_tpl之中的每个children cell 部署到 工作设备之上。
Params 包括:
初始化函数的逻辑是:
遍历before_tpl,对于每个item调用CreateChild构建其子层,把item添加到 _before_layers 之中;
遍历cell_tpl,对于每个item调用CreateChild构建其子层,把item添加到 _cells 之中;
def init(self, params):
super().init(params)
p = self.params
self._before_layers = []
self._cells = []
# 遍历before_tpl,对于每个item调用CreateChild构建其子层,把item添加到 _before_layers 之中
for l in p.before_tpl:
self.CreateChild(l.name, l)
self._before_layers.append((l.name, self.children[l.name]))
# 遍历cell_tpl,对于每个item调用CreateChild构建其子层,把item添加到 _cells 之中
for l in p.cell_tpl:
self.CreateChild(l.name, l)
self._cells.append((l.name, self.children[l.name]))
构建变量。逻辑如下:
如果使用 tpu,则
cluster.WorkerDeviceInModelSplit(0)
来构建 before_tpl_device,即用集群的第一个设备作为 before_tpl_device;遍历 _before_layers,把其中每个变量部署在 before_tpl_device;
遍历 _cells,把其中每个变量部署在 cell_devices;
def _CreateChildrenVariables(self):
p = self.params
num_cells = len(p.cell_tpl)
before_tpl_device = ''
cell_devices = [''] * num_cells
if py_utils.use_tpu(): # 如果使用 tpu
# 利用 `cluster.WorkerDeviceInModelSplit(0)` 来构建 before_tpl_device,即用集群的第一个设备作为 before_tpl_device
cluster = self.cluster
before_tpl_device = cluster.WorkerDeviceInModelSplit(0)
# 遍历集群的其他设备,分配给cell_devices
cell_devices = [
cluster.WorkerDeviceInModelSplit(i) for i in range(num_cells)
]
# 遍历 _before_layers,把其中每个变量部署在 before_tpl_device
for unused_name, l in self._before_layers:
with tf.device(before_tpl_device):
l.InstantiateVariables()
# 遍历 _cells,把其中每个变量部署在 cell_devices
for i, (unused_name, l) in enumerate(self._cells):
with tf.device(cell_devices[i]):
l.InstantiateVariables()
super()._CreateChildrenVariables()
前向传播代码,具体逻辑如下:
遍历 _before_layers,对于其中每层调用其FProp;
遍历 _cells,对于其中每层,在cluster.WorkerDeviceInModelSplit(i)
之上调用其FProp;
def FProp(self, theta, *args):
"""Round-robin every children cells in cell_tpl among worker devices.
Args:
theta: A NestedMap object containing weights' values of this layer and its
children layers.
*args: Input args
Returns:
A list contains one tensor of [batch_size, feature_height, feature_width,
channel].
"""
num_layers = len(self.params.cell_tpl)
cluster = self.cluster
# 遍历 _before_layers,对于其中每层调用其FProp
for (name, l) in self._before_layers:
l_theta = theta[name]
args = _ToTuple(args)
args = l.FProp(l_theta, *args)
# 遍历 _cells,对于其中每层,在`cluster.WorkerDeviceInModelSplit(i)`之上调用其FProp
for i in range(num_layers):
with tf.device(cluster.WorkerDeviceInModelSplit(i)):
cell_name, cell = self._cells[i]
args = _ToTuple(args)
args = cell.FProp(theta[cell_name], *args)
return args
SeqLayer 全部代码如下:
class SeqLayer(base_layer.BaseLayer):
"""Round-robin every children cells in cell_tpl among worker devices."""
@classmethod
def Params(cls):
p = super().Params()
p.Define('before_tpl', [],
'Config for the CNN layers that runs before pipelining.')
p.Define('cell_tpl', [], 'A list of FeatureExtractionLayer layers.')
return p
def __init__(self, params):
super().__init__(params)
p = self.params
self._before_layers = []
self._cells = []
for l in p.before_tpl:
self.CreateChild(l.name, l)
self._before_layers.append((l.name, self.children[l.name]))
for l in p.cell_tpl:
self.CreateChild(l.name, l)
self._cells.append((l.name, self.children[l.name]))
def _CreateChildrenVariables(self):
p = self.params
num_cells = len(p.cell_tpl)
before_tpl_device = ''
cell_devices = [''] * num_cells
if py_utils.use_tpu():
cluster = self.cluster
before_tpl_device = cluster.WorkerDeviceInModelSplit(0)
cell_devices = [
cluster.WorkerDeviceInModelSplit(i) for i in range(num_cells)
]
for unused_name, l in self._before_layers:
with tf.device(before_tpl_device):
l.InstantiateVariables()
for i, (unused_name, l) in enumerate(self._cells):
with tf.device(cell_devices[i]):
l.InstantiateVariables()
super()._CreateChildrenVariables()
def FProp(self, theta, *args):
"""Round-robin every children cells in cell_tpl among worker devices.
Args:
theta: A NestedMap object containing weights' values of this layer and its
children layers.
*args: Input args
Returns:
A list contains one tensor of [batch_size, feature_height, feature_width,
channel].
"""
num_layers = len(self.params.cell_tpl)
cluster = self.cluster
for (name, l) in self._before_layers:
l_theta = theta[name]
args = _ToTuple(args)
args = l.FProp(l_theta, *args)
for i in range(num_layers):
with tf.device(cluster.WorkerDeviceInModelSplit(i)):
cell_name, cell = self._cells[i]
args = _ToTuple(args)
args = cell.FProp(theta[cell_name], *args)
return args
PipeliningLayer 是 SeqLayer 的派生类。
_CalculateOutputShapes 计算出中间层的output shape。具体逻辑如下:
遍历 _before_layers,对其中每层调用其FPropMeta,得到 output shapes,插入 state_shapes 数组之中;
遍历 _cells,对其中每层调用其FPropMeta,得到 output shapes,插入 state_shapes 数组之中;
def _CalculateOutputShapes(self, input_shapes):
"""Calcuate the output shape of intermediate layers.
Given the FPropMeta function in each FeatureExtractionLayer, calcuates
the shapes of outputs of that layer. This is used to recover the shape
information in StackedRecurrent.
Args:
input_shapes: NestedMap or tuple of input TensorShapes.
Returns:
Return a list of K + 1 NestedMaps or lists of tShape where K is
the number of partitions.
"""
p = self.params
shapes = []
# Converts TensorShape to tshape.Shape.
def _ToTShape(x):
if x is None:
return None
return tshape.Shape(x.as_list())
shapes = py_utils.Transform(_ToTShape, input_shapes)
shapes = _ToTuple(shapes)
state_shapes = []
# 遍历_before_layers,对其中每层调用其FPropMeta,得到 output shapes,插入 state_shapes 数组之中
for (_, cell) in self._before_layers:
shapes = cell.FPropMeta(cell.params, *shapes).out_shapes
state_shapes.append(shapes[0] if p.nested_map_fprop else shapes)
# 遍历 _cells,对其中每层调用其FPropMeta,得到 output shapes,插入 state_shapes 数组之中
for (_, cell) in self._cells:
shapes = cell.FPropMeta(cell.params, *shapes).out_shapes
state_shapes.append(shapes[0] if p.nested_map_fprop else shapes)
return state_shapes
_get_state_dtype 的作用是得到数据类型。
def _get_state_dtype(self, *args):
if self.params.state_dtype:
return self.params.state_dtype
if self.params.nested_map_fprop:
inputs = args[0].Filter(lambda x: x is not None)
return py_utils.Flatten(inputs)[0].dtype
return args[0].dtype
Gpipe 会首先将一个小批量的训练样本(mini-batch)分割成更小的小批量(micro-batches),然后将每组小批量的执行通过管道传送到单元上。
_get_input_shapes作用是得到输入的shapes,具体逻辑如下:
从 args 得到输入 input_tensors;
遍历 input_tensors,找出第一个不为空的张量,获取这个张量的 batch size,赋给 mini_batch_size;
从参数中得到 micro_batch_size,设置到 micro_batch_size;
如果 micro_batch_size 没有意义,则:
建立一个 input_shapes 集合,遍历 input_tensors,对于每个张量,得到其shapes列表 input_shape,并且设置 input_shape 的 batch_dim 为 micro_batch_size;
如果设置了 p.nested_map_fprop,则把 input_shapes 构建成一个递归嵌套的结构;
返回 input_shapes;
def _get_input_shapes(self, *args):
p = self.params
if p.nested_map_fprop:
assert len(args) == 1
assert isinstance(args[0], py_utils.NestedMap)
input_tensors = py_utils.Flatten(args[0])
else:
input_tensors = _ToTuple(args)
# 遍历 input_tensors,找出第一个不为空的张量,获取这个张量的 batch size,赋给 mini_batch_size
# Get batch size from the first tensor which is not None.
mini_batch_size = None
for input_tensor in input_tensors:
if input_tensor is not None:
mini_batch_size = input_tensor.get_shape().as_list()[p.batch_dim]
assert mini_batch_size is not None
micro_batch_size = p.micro_batch_size
if not micro_batch_size: # 如果 micro_batch_size 没有意义
# 如果 p.num_micro_batches 大于 mini_batch_size,则 p.num_micro_batches 为 mini_batch_size
if p.num_micro_batches > mini_batch_size:
p.num_micro_batches = mini_batch_size
# 把 micro_batch_size 设置为 mini_batch_size // p.num_micro_batches
micro_batch_size = mini_batch_size // p.num_micro_batches
if mini_batch_size is not None:
if micro_batch_size * p.num_micro_batches != mini_batch_size:
raise ValueError('micro_batch_size * num_micro_batches != batch_size.')
# 遍历 input_tensors,对于每个张量,得到其shapes列表 input_shape,并且设置 input_shape 的 batch_dim 为 micro_batch_size
input_shapes = ()
for input_tensor in input_tensors:
if input_tensor is not None:
input_shape = input_tensor.get_shape().as_list()
input_shape[p.batch_dim] = micro_batch_size
input_shapes += (tf.TensorShape(input_shape),)
else:
input_shapes += (None,)
# 如果设置了 p.nested_map_fprop,则把 input_shapes 构建成一个递归嵌套的结构
if p.nested_map_fprop:
input_shapes = py_utils.Pack(args[0], input_shapes)
return input_shapes
前向传播函数,用流水线方式在多个设备上运行多个 children cells。具体逻辑如下:
做一些准备工作,比如:
遍历处理中间层:
为流水线中间(去除头尾)的各个设备设定一些变量;
在第一个设备上执行如下操作:
在中间设备上执行recurrent.StackedRecurrent操作 ;
在最后一个设备上把micro_batches的形状聚合,最终得到输出张量:
如果嵌套,则返回最后一个形状;
否则遍历输出,聚合各个输出的形状;
def FProp(self, theta, *args):
"""Run multiple cells in different devices in a pipelining manner.
Args:
theta: A NestedMap object containing weights' values of this layer and its
children layers.
*args: Non-keyworded variable length argument list of input tensors.
Returns:
A list of output tensors
"""
p = self.params
if self.do_eval and self.cluster.num_devices_per_split == 1: # 如果设置了 do_eval 并且集群的 num_devices_per_split 为 1
outputs = copy.copy(args)
# 就直接串行执行
for (name, l) in self._before_layers + self._cells:
outputs = _ToTuple(outputs)
outputs = l.FProp(theta[name], *outputs)
return outputs
num_cells = len(p.cell_tpl) # 得到 children cell个数
cluster = self.cluster # 得到集群
input_shapes = self._get_input_shapes(args) state_dtype = self._get_state_dtype(args)
state_shapes = self._CalculateOutputShapes(input_shapes)
tf.logging.info('state_shapes={}'.format(state_shapes))
def GetCellFn(i): # 对于第 i 个层,返回一个对应的函数,这个函数将在 StackedRecurrent 内部执行
"""Get the ith feature extraction layer."""
def CellFn(theta, state0, inputs):
"""A cell fn is exectued inside of StackedRecurrent."""
# 没有深入研究StackedRecurrent,只从此函数看,作用是利用cell.FProp计算输出,并且得到一个state,其中包括输出和micro batch tensor
del state0
def _FPropInputSetShape(name, t_shape): # 给输入设置shape
if t_shape is None:
return None
inputs[name].set_shape(t_shape.ToTensorShape().as_list())
return inputs[name]
if p.nested_map_fprop:
# pylint: disable=protected-access
fprop_inputs = state_shapes[i]._RecursiveMap(_FPropInputSetShape)
# pylint: enable=protected-access
else:
fprop_inputs = []
for input_idx, input_shape in enumerate(state_shapes[i]):
name = 's{}'.format(input_idx)
fprop_inputs.append(_FPropInputSetShape(name, input_shape))
with py_utils.RemoveAssertContext(remove=True):
with CellFnFPropOpReplacementWrapper():
tf.logging.info('cell {} input {}'.format(i, fprop_inputs))
mb_tensor = inputs[_MICRO_BATCH_STATE_NAME] # 得到输入的 micro batch tensor
SetOverWriteGlobalStep(mb_tensor)
_, cell = self._cells[i]
fprop_inputs = _ToTuple(fprop_inputs)
outputs = cell.FProp(theta, *fprop_inputs) # 计算输出
if p.nested_map_fprop:
assert py_utils.IsCompatible(outputs, state_shapes[i + 1])
state1 = outputs.Filter(lambda x: x is not None)
else:
state1 = py_utils.NestedMap()
outputs = _ToTuple(outputs)
assert len(outputs) == len(state_shapes[i + 1])
for output_idx in range(len(outputs)):
if outputs[output_idx] is not None:
name = 's{}'.format(output_idx)
state1[name] = outputs[output_idx]
state1[_MICRO_BATCH_STATE_NAME] = mb_tensor
return state1, py_utils.NestedMap()
return CellFn
cell_fns = []
accumulator_layers = [] # 为了梯度累积
thetas = []
init_states = []
devices = []
for cell_idx in range(num_cells): # 遍历 children cell
cell_name, cell = self._cells[cell_idx] # 得到具体一个 cell
accumulator_layers.append(cell) # 把cell加入到累积层中
cell_fns.append(GetCellFn(cell_idx)) # 对于每个cell,得到对应的function
thetas.append(theta[cell_name]) # 添加 theta
# 返回一个带有形状t_shape的,类型为state_dtype的张量,并且所有元素都设为零.
def _TfZeros(t_shape):
if t_shape is None:
return None
return tf.zeros(t_shape.ToTensorShape().as_list(), dtype=state_dtype)
# 为后续的 StackedRecurrent 运行设置其初始状态
if p.nested_map_fprop:
init_state = py_utils.Transform(_TfZeros, state_shapes[cell_idx + 1])
init_state = init_state.Filter(lambda x: x is not None)
else:
init_state = py_utils.NestedMap()
for output_idx, state in enumerate(state_shapes[cell_idx + 1]):
state = _TfZeros(state)
if state is not None:
name = 's{}'.format(output_idx)
init_state[name] = state
init_state[_MICRO_BATCH_STATE_NAME] = tf.cast(0, dtype=state_dtype)
init_states.append(init_state)
# 把cell_idx对应的设备加入到devices列表
devices.append(cluster.WorkerDeviceInModelSplit(cell_idx))
cell_grads = [None] * num_cells
cell_outs = [lambda x: x] * num_cells
cell_out_grads = [lambda x: x] * num_cells
with tf.device(devices[0]):
previous = _ToTuple(args)
for (name, l) in self._before_layers: # 遍历_before_layers,运行每层的FProp,最终得到 previous
previous = l.FProp(theta[name], *previous)
previous = _ToTuple(previous)
def _StackAndSplit(x): # 把张量分割成
# Split tensors into microbatches.
if x is None:
return None
# tf.split按照行或者列分割一个矩阵
return tf.stack(tf.split(x, p.num_micro_batches, axis=p.batch_dim))
# 对于 previous 继续操作,构建出 inputs,即利用_StackAndSplit分割张量
if p.nested_map_fprop: # 嵌套情况,只选取previous[0]做处理
inputs = py_utils.Transform(_StackAndSplit, previous[0]) #利用_StackAndSplit分割张量
inputs = inputs.Filter(lambda x: x is not None)
else: # 非嵌套
inputs = py_utils.NestedMap()
for output_idx, output_tensor in enumerate(previous): # 遍历第一层的输出
output_tensor = _StackAndSplit(output_tensor) # 利用_StackAndSplit分割张量
if output_tensor is not None:
name = 's{}'.format(output_idx)
inputs[name] = output_tensor
gs_tensor = py_utils.GetGlobalStep()
# 为流水线后续设备设置其输入
inputs[_MICRO_BATCH_STATE_NAME] = tf.stack([
tf.cast(gs_tensor * p.num_micro_batches + t, dtype=state_dtype)
for t in range(p.num_micro_batches)
])
tf.logging.info('pipeline input = {}'.format(inputs))
output_state, _ = recurrent.StackedRecurrent(
devices=devices,
cell_fns=cell_fns,
cell_grads=cell_grads,
cell_outs=cell_outs,
cell_out_grads=cell_out_grads,
thetas=thetas,
init_states=init_states,
inputs=inputs,
accumulator_layers=accumulator_layers,
unused_acc_state=True)
with tf.device(devices[-1]):
def _ReshapeRetVal(name, t_shape): # 把micro_batches的形状聚合,得到最终输出
"""Restore shape for tensors in microbatches."""
if t_shape is None:
return None
output_tensor = output_state[name]
if p.batch_dim != 0:
perm = list(range(1, p.batch_dim + 1)) + [0]
perm += list(range(p.batch_dim + 1, t_shape.rank + 1))
output_tensor = tf.transpose(output_tensor, perm=perm)
output_shape = t_shape.ToTensorShape().as_list()
output_shape[p.batch_dim] *= p.num_micro_batches
output_tensor = tf.reshape(output_tensor, output_shape)
return output_tensor
# Construct the final return values from output_state.
if p.nested_map_fprop: # 如果嵌套,则返回最后一个形状
# pylint: disable=protected-access
output_tensors = state_shapes[-1]._RecursiveMap(_ReshapeRetVal) # 聚合形状
# pylint: enable=protected-access
else:
output_tensors = []
# 遍历输出,聚合各个输出的形状
for output_idx, state_shape in enumerate(state_shapes[-1]):
output_name = 's{}'.format(output_idx)
output_tensor = _ReshapeRetVal(output_name, state_shape) # 聚合形状
output_tensors.append(output_tensor)
if len(output_tensors) == 1:
output_tensors = output_tensors[0]
else:
output_tensors = tuple(output_tensors)
tf.logging.info('pipeline output = {}'.format(output_tensors))
return output_tensors
具体代码如下:
class PipeliningLayer(SeqLayer):
"""Pipelining a sequence of layers on multiple devices."""
@classmethod
def Params(cls):
p = super().Params()
p.Define('num_micro_batches', 1, 'Number of micro batches.')
p.Define('micro_batch_size', None, 'Size of a micro batch.')
p.Define('batch_dim', 0, 'The batch dimension.')
p.Define('state_dtype', None, 'Externally specify dtype for states.')
p.Define(
'nested_map_fprop', False, 'Whether arguments and returns of '
'cell fprop functions are nested maps')
return p
具体FProp函数逻辑如下图:
+--------------------------------------------------------------+
| FProp _CalculateOutputShapes |
| + |
| | |
| | |
| v |
| state_shapes |
| + |
| | |
| | |
| | |
| v |
| for cell_idx in range(num_cells): |
| + |
| | |
| | |
| v |
| devices.append(WorkerDeviceInModelSplit(cell_idx)) |
| + |
| | |
| | |
| v |
| with tf.device(devices[0]) |
| + |
| | |
| | |
| v |
| recurrent.StackedRecurrent(cell_outs) |
| + |
| | |
| | |
| v |
| with tf.device(devices[-1]) |
| + |
| | |
| | |
| v |
| output_tensors |
| |
+--------------------------------------------------------------+
Device流水线逻辑如下:
devices[0]
+
|
|
|
v
+----------------------+-------------------------+
|Pipeline |
| devices[1] |
| + |
| | |
| | |
| v |
| cell_grads[1~n] devices[2] |
| + |
| cell_outs[1~n] | |
| | |
| cell_out_grads[1~n] v |
| devices[3] |
| + |
| | |
| | |
| v |
| devices[4] |
| |
+----------------------+-------------------------+
|
|
|
v
devices[-1]
源码中给出的例子是 GPipeBatchMajorTransformerStack,目前看来,继承PipeliningLayer即可。
class GPipeBatchMajorTransformerStack(PipeliningLayer):
"""Stacked self- multi-head attention and fully connected layers.
With optional layer normalization applied to the final output.
See 'Attention Is All You Need' https://arxiv.org/abs/1706.03762
for details.
Implements a gipe stack for the batch major transformer variant.
"""
GPipeBatchMajorTransformerStack 的 FProp 返回一个输出张量的列表,其中下面代码调用了PipeliningLayer的功能。
logits = super().FProp(theta, source_input, source_paddings, target_input,
target_paddings, encoder_self_atten_segment_mask,
decoder_self_atten_segment_mask,
decoder_cross_atten_segment_mask, source_segment_pos,
target_segment_pos)
具体代码如下:
def FProp(self,
theta,
source_input,
source_paddings,
target_input=None,
target_paddings=None,
source_segment_id=None,
target_segment_id=None,
labels=None,
label_weights=None,
source_segment_pos=None,
target_segment_pos=None):
p = self.params
if p.num_decoder_layers > 0:
assert target_input is not None
assert target_paddings is not None
target_time = tf.shape(target_input)[1]
batch = tf.shape(target_input)[0]
encoder_self_atten_segment_mask = None
decoder_self_atten_segment_mask = None
decoder_cross_atten_segment_mask = None
# Prepare segment masks from segment ids.
if p.packed_input:
dtype = py_utils.FPropDtype(p)
assert source_segment_id is not None, (
'Need to specify src_segment_id if packed input is supported.')
assert source_segment_pos is not None, (
'Need to specify src_segment_pos for packed input and embeddings.')
encoder_self_atten_segment_mask = batch_major_attention.SegmentMask(
source_segment_id, source_segment_id, dtype, False)
if target_segment_id is not None:
decoder_self_atten_segment_mask = batch_major_attention.SegmentMask(
target_segment_id, target_segment_id, dtype, False)
causal_padding = tf.expand_dims(
tf.tile(
tf.expand_dims(
batch_major_attention.CausalPadding(
target_time, dtype=dtype), 0), [batch, 1, 1]), 1)
decoder_self_atten_segment_mask = tf.math.maximum(
causal_padding, decoder_self_atten_segment_mask)
decoder_cross_atten_segment_mask = batch_major_attention.SegmentMask(
target_segment_id, source_segment_id, dtype, False)
# FProp through the gpipe pipeline.
# 这里调用了基类的PipeliningLayer,完成流水线操作。
logits = super().FProp(theta, source_input, source_paddings, target_input,
target_paddings, encoder_self_atten_segment_mask,
decoder_self_atten_segment_mask,
decoder_cross_atten_segment_mask, source_segment_pos,
target_segment_pos)
label_weights = tf.reshape(label_weights, [-1])
target_probs = None
if p.label_smoothing:
target_probs = self.smoother.FProp(
theta.smoother, target_paddings, labels, target_ids=None)
target_probs = tf.reshape(target_probs, [-1, p.softmax_tpl.num_classes])
reshaped_logits = tf.reshape(logits, [-1, p.softmax_tpl.num_classes])
tgt_labels = tf.reshape(labels, [-1])
num_splits = len(p.splits)
softmax = self.children['cell_{}'.format(num_splits - 1)].softmax
softmax_theta = theta['cell_{}'.format(num_splits - 1)].softmax
per_example_xent, _ = softmax.XentLossFromLogits(
softmax_theta,
reshaped_logits,
class_weights=tf.reshape(label_weights, [-1]),
class_ids=tgt_labels,
class_probabilities=target_probs)
xent_shape = tf.shape(logits)[:2]
per_example_xent = tf.reshape(per_example_xent, xent_shape)
return per_example_xent, logits
DeepSpeed: Extreme-scale model training for everyone
[译] DeepSpeed:所有人都能用的超大规模模型训练工具
GPT-3模型为何难以复现?这也许是分布式AI框架的最优设计
深度学习中parameters个数和FLOPS计算(以CNN中经典的AlexNet网络结构为例)
论文解读系列第十三篇:ZeRO——面向万亿级参数的模型训练方法
Tensorflow: Model parallelism 模型并行计算
https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html
https://arxiv.org/pdf/1802.09941.pdf
https://www.microsoft.com/en-us/research/uploads/prod/2019/08/fiddle_pipedream_sosp19.pdf
论文解读系列第五篇:微软斯坦福等PipeDream快速训练大规模神经网络
微软提出 DNN 并行训练新方法 PipeDream,比传统方法快四倍
训练时显存优化技术——OP合并与gradient checkpoint
TF-Replicator, GPipe, Mesh-Tensorflow 三个库对比
深度神经网络训练中的数据并行(Data Parallelism)总结
https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html
https://github.com/pytorch/tutorials/blob/master/intermediate_source/model_parallel_tutorial.py
Model.zero_grad() or optimizer.zero_grad()?
A trick to use bigger batches for training: gradient accumulation
Training Neural Nets on Larger Batches: Practical Tips for 1-GPU, Multi-GPU & Distributed setups
手机扫一扫
移动阅读更方便
你可能感兴趣的文章