[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush
阅读原文时间:2022年02月15日阅读:1

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

目录

NVIDIA Megatron 是一个基于 PyTorch 的分布式训练框架,用来训练超大Transformer语言模型,其通过综合应用了数据并行,Tensor并行和Pipeline并行来复现 GPT3,值得我们深入分析其背后机理。本系列有 5 篇文章,通过论文和源码和大家一起学习研究。本文将看看 Megatron 如何给流水线各个阶段安排执行执行序列。

本系列其他文章为:

[源码解析] 模型并行分布式训练Megatron (1) --- 论文 & 基础

[源码解析] 模型并行分布式训练Megatron (2) --- 整体架构

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

[源码解析] 模型并行分布式训练 Megatron (4) --- 如何设置各种并行

在流水线训练之中,如何给流水线各个阶段安排执行执行序列是一个关键,所以这里我们看看如何做schedule。

对于 Megatron 来说,在训练时候,get_forward_backward_func 获取pipeline 的schedule,这里分为 flush 和 interleaving 两种, 因为时间所限,我们只分析 flush 的schedule,有兴趣的读者可以自行研究 interleaving。

def get_forward_backward_func():
    args = get_args()
    if mpu.get_pipeline_model_parallel_world_size() > 1:
        if args.virtual_pipeline_model_parallel_size is not None:
            forward_backward_func = forward_backward_pipelining_with_interleaving
        else:
            forward_backward_func = forward_backward_pipelining_without_interleaving
    else:
        forward_backward_func = forward_backward_no_pipelining
    return forward_backward_func

概括来说,Megatron 是基于 PipeDream-2BW 之上实现了定期刷新。

  • PipeDream-2BW 在流水线之中维护了两个版本的模型权重,“2BW” 是 双缓冲权重(double-buffered weights)”,PipeDream-2BW 会为每个微批次生成一个新的模型版本K(K>d),但是因为有些剩余后向传递仍然依赖于旧版本模型,所以新的模型版本无法立即取代旧版本,但是由于只保存了两个版本,所以极大降低了内存占用。
  • PipeDream-flush 则在 PipeDream-2BW 之上添加了一个全局同步的流水线更新刷新操作,思路类似 GPipe。这种方法通过牺牲吞吐量的能力部分下降的代价来减少了内存占用(即只维护一个版本的模型权重)。

Memory-Efficient Pipeline-Parallel DNN TrainingEfficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM 是 Megatron 对应的相关论文,我们就从论文开始研究。注:下面论述内容是基于原论文发表时间,因为各个开源系统也在演进,所以其针对其他开源系统的论述在今天看来不一定完全正确。

2.1 引论

近来,一些工作提出了流水线模型并行以加速模型并行训练。例如 GPipe(Huang等人,2019年)和PipeDream(Harlap等人 2018年;Narayanan等人,2019年)把多个输入顺序推送到一系列worker之中来训练,每个worker负责一个模型分区,这允许不同worker并行处理不同的输入。

  • 由于特定输入的向前和向后传播之间的权重版本不一致,Native 流水线可能会造成模型不收敛,现有技术权衡了内存占用和吞吐量,以不同的方式来避免这种情况。
  • GPipe维护单一权重版本,但是会定期进行流水线刷新(图1a),具体刷新时间是在流水线训练完输入要更新权重时候,由于资源空闲,这些刷新限制了总体吞吐量。
  • PipeDream不会定期Flush流水线,但会存储多个权重版本,这增加了吞吐量,但也增加了内存占用,由于内存限制,无法训练大型模型。

所以,有效地训练大型模型需要一种同时具有高吞吐量和低内存占用的方法。此外,流水线并行系统的性能取决于DNN模型operators在 worker 上的划分方式。这具有挑战性,原因有三:

  • 内存容量限制:与模型分区相关的参数和中间激活需要能够放置在加速器的主设备内存之中。
  • 异构网络互连:如今的训练部署具有异构网络拓扑性,同一服务器上的设备之间具有更高的带宽链路。
  • 运算符如何放置的大搜索空间:随着模型尺寸的增加,拆分运算符图在计算上变得非常昂贵,因为不同分区方式数量是指数级的。

图1a。不同流水线并行执行的timeline。后向传播的时间假定为向前传播的两倍;向前传播以蓝色显示,后向传播以绿色显示。数字表示微批次ID,时间沿x轴显示,每个worker的利用率沿y轴显示。GPipe维护单一权重版本,但会定期刷新flush流水线。PipeDream不引入周期性流水线刷新,但维护多个权重版本。

在论文中,作者介绍了PipeDream-2BW,一个高效的DNN模型流水线并行训练系统。PipeDream-2BW通过两个关键贡献实现了高吞吐和低内存占用。首先,作者提出了双缓冲权重更新(2BW),这是一种在避免流水线刷新的同时减少训练内存占用的技术。

作者利用了这样一个事实,即每个输入生成的梯度不需要立即应用于权重,而是可以累积为“合并(coalesced)”梯度,以限制保留的权重版本的数量。2BW没有在使用最近更新的权重之前刷新流水线,而是将新权重用于新进入流水线的输入,同时将以前的权重版本(称为阴影版本)用于已在训练中的输入(in-flight inputs)。

每个 worker 的权重双缓冲产生了一种流水线方案,其吞吐量高于GPipe(无流水线刷新),内存效率高于PipeDream(这里是2个权重版本,而在PipeDream 的一个depth-d流水线中,最差的情况是d个权重版本)。

作者还介绍了2BW的一种变体(称为PipeDream Flush),它在吞吐量上进行权衡,以获得更低的内存占用和更高的性能。

2.2 背景

在本节中,作者简要概述DNN模型分布式训练的相关技术。

  • 数据并行

    • 数据并行用于扩展模型训练。利用数据并行性(Xing等人,2015),每个worker都有整个模型的副本,输入数据集在worker之间分片。Worker 定期聚合他们的梯度,以确保所有 Worker 都看到一致版本的权重。数据并行性不能训练没法放入单个worker的大型模型,但可以用于较小的模型分区(model partitions)。
    • 数据并行scale-out通常工作良好,但存在两个限制:a)超出某一点,每个GPU的batch size变得太小,降低了GPU利用率并增加了通信成本;b)可以使用的最大设备数量是batch size,这限制了可用于训练的加速器数量。于是人们提出了各种模型并行技术来解决这两个挑战。
  • 模型并行。对于不适合单个worker的大型模型,一般来说使用模型并行训练。

    • 利用模型并行性(Dean等人,2012年;Chilimbi等人,2014年),模型中的权重参数在可用worker上进行分割(每个transformer层内的矩阵乘法在多个GPU上分割),worker之间交流中间激活和梯度。层间模型并行性未充分利用资源,因为在任何时间点最多只有一个工作进程处于活动状态。
    • Tensor(intra-layer)模型并行性(Shoeybi et al.,2019)容易导致关键路径中all-to-all通信过于昂贵。因为 tensor 并行所需的all-reduce通信需要通过服务器间链路,这比多GPU服务器中可用的高带宽NVLink要慢,于是容易将模型分区的数量限制为单个服务器中的GPU数量。而且高度的模型并行会创建大量的小矩阵乘法 (GEMMs),这可能会降低GPU的利用率。
    • FlexFlow(Jia et al.,2018)展示了如何使用模型和数据并行性拆分模型图,但在使用模型并行性时仍然存在资源利用率低的问题。
  • 流水线并行。为了解决模型并行性的缺点,最近的工作如PipeDream和GPipe提出了流水线并行性。

    • 通过流水线并行,将多个输入(而不是1个)注入到由层间(inter-layer)模型分区组成的训练中。这确保了计算资源得到更好的利用。
    • 一个批(batch)被分割成更小的微批(microbatches),并在这些微批之间以流水线方式执行。可以用各种方式将层分配给worker,并且输入的向前和向后传播可以使用各种不同计划。
    • 然而,简单的流水线可能会导致特定输入的前后传递之间的权重版本不匹配。具体来说,如果立刻用最新的权重版本来进行权重更新,那么在流水线之中,一个输入可能会看到的是向后传播更新的权重,而不是它在向前传播时候看到的权重,从而导致不正确的梯度计算。
    • 层分配和调度策略导致不同的性能权衡。不管计划如何,为了保持严格的优化器语义,优化器步骤需要跨设备同步,从而在每个批处理结束时进行流水线刷新,允许微批处理完成执行(此时不加入新的微批)用来Flush流水线的时间最高可以达到50%,这取决于注入流水线的微批次数量。微批次数量与流水线尺寸的比率越大,流水线Flush所花费的时间越短。因此,为了实现高效率,通常需要更大的batch size。

用户可以使用各种技术训练他们的大型模型,每种技术都有不同的权衡。此外,这些技术可以结合使用。然而,结合这些技术会产生复杂(non-trivial)的交互,为了获得良好的性能,需要仔细地进行推理,才能做到在保持严格的优化器语义的同时最大化给定batch size的大型模型训练吞吐量。

要实现大规模的吞吐量,需要沿着多个轴进行创新和精心设计:高效的内核实现使大部分训练受计算限制,而不是内存限制;应该在设备上对计算图进行智能分区,以减少通过网络链路发送的字节数,同时限制设备空闲时间;使用特定领域的通信优化和快速硬件(最先进的GPU以及相同和不同服务器上GPU之间的高带宽链路)。

此外,论文作者还研究了影响吞吐量的各种成分之间的相互作用,包括经验和分析。基于这些研究,论文作者就如何配置分布式培训提供以下指导原则:

  • 不同形式的并行以复杂的方式相互作用:并行化策略影响通信量、执行内核的计算效率,以及 Worker 因流水线刷新(流水线气泡)而等待计算的空闲时间。例如,张量和流水线模型并行性的次优组合可以导致高达2×更低的吞吐量,即使服务器之间的网络链路带宽较高;张量模型并行性在多GPU服务器中是有效的,但流水线模型并行性必须用于更大的模型。

  • 用于流水线并行性的计划会影响通信量、流水线气泡大小以及用于存储激活的内存。

    • 超参数的值(如微批次大小)会影响内存占用、在辅助进程上执行的内核的算术效率以及流水线气泡大小。微批次大小的最佳值取决于具体问题,一个合适取值可以将吞吐量提高15%。
    • 分布式培训是通信密集型的。如果节点间互连较慢或更多的通信密集型分区将阻碍性能的扩展。
    • 论文没有研究如何自动探索并行策略的搜索空间(如FlexFlow、PipeDream、Tarnawski 和DAPPLE),而是建议使用那些在实践中效果良好的启发式方法。

2.3 流水线权重问题

我们这里回顾一下流水线权重问题。下图是朴素流水线执行情况,本质上是一种 async SGD。

2.3.1 问题1

遇到的第一个问题是:在一般情况下,当计算第二个迭代时候,我们需要基于第一个迭代更新之后的模型来计算。但是如下图所示,对于机器 1,当第二轮迭代开始时候(红色圆圈的深蓝色2号),第一轮迭代的反向传播(浅绿色1号格)还没有开始。

2.3.2 问题2

第二个问题是:对于机器2,当它进行第5个mini-batch的前向传播时候(第二行蓝色5),它基于更新两次的权重来进行前向计算(第二行蓝色5之前有两个绿色格子,意味着权重被更新了两次)。

但进行第5个mini-batch的反向传播(第二行浅绿色5)时候,用到的权重是更新了4次的(第二行前面浅绿色的1,2,3,4一共会更新权重4次)。这与单节点深度学习假设冲突,会导致训练效果下降。

PipeDream 作者为了解决这些问题,提出了 Weight Stashing,以确保相同输入的向前和后向传播中使用相同的权重版本(原论文图1b)。具体就是每个机器多备份几个版本的权重,前向传播用哪个权重计算,反向传播还用这个权重计算。

就上图来说,机器1需要保存4个版本的权重,机器2需要保存3个版本的权重,机器3需要保存2个版本的权重,机器4需要保存1个版本的权重。在最坏的情况下,储存的权重版本总数是d,其中d是流水线深度,这对于大型模型来说内存占用太高了。使用PipeDream的默认权重更新语义,每个阶段的权重更新都有不同的延迟项,并且不会在流水线内执行累积。

2.3.3 问题3

另外一个问题是:现在做前向传播时候,每个机器计算时候,其基于的权重被更新的不同次数,比如第5个mini-batch(深蓝色的5),在机器 1 计算 5 时候,基于的权重是更新一次的(其前面有一个绿色),但是机器 2 计算 5 时候,基于的权重是更新两次的(其前面有两个绿色)。

解决思路是:每次前向传播时候,每个机器基于更新最少的权重来计算,比如对于机器2,就忽略绿色2更新的权重,对于机器3,就忽略绿色2,3两次更新之后的权重,它们都使用被绿色1更新一次之后的权重(图上矩形框黄色 1 )。

2.4 PipeDream-2BW 系统设计

PipeDream-2BW使用内存高效的流水线并行性来训练不适合单个加速器的大型模型。它的双缓冲权重更新(2BW)和刷新机制确保了高吞吐量、低内存占用和类似于数据并行的权重更新语义。PipeDream-2BW将模型拆分为多个Worker上的多个阶段,并对每个阶段进行相同次数的复制(在同一阶段的副本之间进行数据并行更新)。这种平行流水线适用于每层重复固定次数的模型(例如transformer模型)。

2.4.1 GPipe

GPipe维护模型权重的单一版本。输入批次被分成更小的微批次。权重梯度是累积的,不会立即应用,并且定期flush 流水线,以确保不需要保持多个权重版本。GPipe提供了类似于数据并行的权重更新语义。原论文图1a显示了GPipe执行的时间线。周期性流水线Flush可能会很昂贵,从而限制吞吐量。缓解这一开销的一种方法是在流水线内进行额外的累积,但这并不总是切实可行的:a)在large scale factors下,能支持的最小batch size较大(与scale factor成比例),且大批量会影响所有模型的收敛性,b)GPipe需要保持与批大小成比例的激活存储。

2.4.2 Double-Buffered Weight Updates (2BW)

PipeDream-2BW结合1F1B调度(Narayanan等人,2019年)使用了一种新颖的双缓冲权重更新(2BW)方案,其中每个 worker 在不同输入的向前和向后传递之间交替,以确保在特定输入的向前和向后传递中使用相同的权重版本(论文原图2)。2BW的内存占用比PipeDream和GPipe低,并且避免了GPipe昂贵的流水线刷新。

梯度是以较小的mi-crobatches粒度计算的。对于任何输入微批次,PipeDream-2BW对输入的向前和向后传播使用相同的权重版本。在以批的粒度应用更新之前,会在多个微批次上累积更新,从而限制生成和维护的权重版本的数量。图2显示了2BW的时间线示例。

PipeDream-2BW 为每m个微批次生成一个新的权重版本(m≥ d, d是流水线深度)。为了简单起见,作者首先假设m=d(图2中的d=4)。新权重版本不能立即使用。特别是,进行中的输入(in-flight)不能使用最新的权重版本进行向后传播(例如,在t=21时, worker 3上的输入7),因为这些输入的向前传递已在不同阶段使用较旧的权重版本启动。

因此,新生成的权重版本需要缓冲以备将来使用。但是,需要维护的权重版本总数最多为2,因为用于生成新权重版本的权重版本可以立即丢弃(通过该阶段的未来输入不再使用旧权重版本)。例如,在图2中,每个 worker 在处理完输入8 的 backward pass后都可以丢弃W(0),因为所有后续输入的前向传递和后向传递都使用更高的权重版本。

给定输入微批次k(基于1开始的索引)使用的权重版本为 \(max(⌊(k − 1)/m⌋ − 1, 0)\),其中m是批次中的微批次数(图2中的4)。对于输入k的向前和向后传播,此权重版本相同。m可以是任何 ≥ d 的数字,额外的梯度累积(较大的m)会增加全局 batch size。

论文原图2。时间轴显示PipeDream-2BW的双缓冲权重更新 (2BW) 方案,时间轴沿x轴进行。在不丧失通用性的情况下,假设向后传播的时间是向前传播的两倍。PipeDream-2BW在每个worker上只存储两个权重版本,减少了总内存占用,同时不再需要昂贵的流水线暂停。\(W_i^{(v)}\)表示worker i上的具有版本v的权重(包含从输入v生成的权重梯度)。在方格绿色框中会生成新的权重版本; \(W_4^{(4)}\)首先用在输入9的向前传播之中。

上图中的 Before 意思是丢弃版本之前系统的两个权重buffer,After 意思是做了丢弃动作之后系统的两个权重buffer。

2.4.2. Weight Updates with Flushes (PipeDream-Flush)

作者还提出了第二个内存高效的流水线计划,称为PipeDream Flush。它的内存占用比2BW和vanilla优化器语义更低,但以较低的吞吐量为代价。该计划重用PipeDream(Narayanan等人,2019年)的1F1B计划,但保持单一权重版本,并引入定期流水线刷新,以确保权重更新期间的一致权重版本。具有两个流水线阶段的PipeDream-Flush和GPipe的时间表如图3所示。

为何要选择 1F1B?因为它将n-flight microbatches 数量缩减到流水线深度 d,而不是GPipe的微批次数目 m,所以 1F1B 是memory-efficient。为了降低bubble time,一般来说,m >> d。

内存占用。使用PipeDream Flush,in-flight “活动”输入激活的总数小于或等于流水线深度,这使其内存占用比GPipe低,GPipe必须保持输入激活与梯度平均(m)的微批次数量成比例。PipeDream Flush的内存占用也低于PipeDream-2BW,因为它只需要维护一个权重版本(而PipeDream-2BW只需要2个)。

Semantics。定期流水线刷新确保可以使用最新权重版本计算的梯度执行权重更新。这将使权重更新用如下方式进行:\(W^{(t+1)} = W^{(t)} − ν · ∇f(W^{(t)})\)。

论文原图3。GPipe和PipeDream的时间表将分为两个阶段。GPipe和PipeDream Flush都使用管道刷新;PipeDream-Flush在稳定状态下交替进行向前和向后传播,通过限制仅保留进行中(in-flight)微批次的激活来保持较低的内存占用。

我们前面提到,当没有设置 virtual_pipeline_model_parallel_size 时候,就是使用 Flush 方法得到流水线schedule,具体函数是 forward_backward_pipelining_without_interleaving。

def get_forward_backward_func():
    args = get_args()
    if mpu.get_pipeline_model_parallel_world_size() > 1:
        if args.virtual_pipeline_model_parallel_size is not None:
            forward_backward_func = forward_backward_pipelining_with_interleaving
        else:
            # Flush schedule
            forward_backward_func = forward_backward_pipelining_without_interleaving
    else:
        forward_backward_func = forward_backward_no_pipelining
    return forward_backward_func

为何要选择 1F1B?论文作者提到,因为它将in-flight microbatches 数量缩减到流水线深度 d,而不是GPipe的微批次数目 m,所以 1F1B 是memory-efficient。为了降低bubble time,一般来说,m >> d。

3.1 总体思路

3.1.1 缺省计划

GPipe提出了一个执行计划,其中首先执行一个批次中所有微批次的正向传播,然后执行所有微批次的反向传播(如图3所示)。我们可以量化GPipe流水线气泡的大小( )。我们将批次中的微批次数量表示为,流水线阶段的数量(用于流水线并行的设备数量)为,每次迭代的理想时间为 (假设完美或理想的缩放),以及执行单个微批次前进和后退通道的时间 和。

在此计划中,流水线气泡包含:

  • 在批次开始时的 − 1 个前向传播。
  • 在批次结束时候的 − 1 个向后传播。

在流水线中花费的总时间 = (−1)·( +),于是此任务的处理时间为 =·( +)。因此,在流水线气泡中花费的计算时间的理想占比(fraction)为:

\[Bubble\ time\ fraction (pipeline\ bubble\ size) = \frac{t_{pb}}{t_{id}} = \frac{p-1}{m}
\]

图3 : GPipe流水线计划,所有微批次(以数字表示)均为前向传播(蓝色),然后为后向传播(绿色)。灰色区域表示流水线气泡。为简单起见,我们假设前向传播的时间是后向传播的两倍。流水线计划的效率不取决于此时间因素。本例中的每个批次由8个微批次组成,每个蓝色或绿色框中的数字是给相应微批次的唯一标识符(比如,第一批由1− 8个微批次组成,第二批由微批次9− 16组成等)。优化器在流水线刷新时进行步进(step)并更新权重参数,以确保严格的优化器语义。

为了使气泡时间占比(fraction)很小,我们需要 ≫ 。但是对于这么大的, 这种方法具有很高的内存占用,因为它需要将中间激活(或在使用激活重新编译时仅为每个流水线阶段输入激活)保存在内存中,以供所有 个微批次在训练迭代的整个生命周期中都使用到。

3.1.2 PipeDream计划

PipeDream-Flush 把一个迭代分成三个阶段:

  • 预热前向传播阶段(warmup forward passes):在这里,除了最后一个stage,每个worker 会做前向计算,进行不同数目的前向传播,并且向其下游发送激活,一直到最后一个stage被激发。该计划将执行中的(in-flight)微批次数量(未完成反向传播且需要保持激活的微批次数量)限制在流水线深度之内,而不是一个批次中的微批次数量。

  • 稳定 1F1B 阶段(Run 1F1B in steady state):进入稳定状态之后,每个 worker 都进行1F1B 操作。

  • 冷却反向传播阶段(Cooldown backward passes):此阶段会把执行中的(in-flight)的微批次执行完毕,只是执行反向计算和向反向计算下游发送梯度。

这个新计划在气泡中花费的时间与GPipe是相同的,但是未完成的向前传播的数量最多和流水线阶段的数量一样。因此,该计划要求将激活减少到 或更少的微批次(GPipe计划则是 m 个微批次)。因此,当 ≫ 的时候, PipeDream-Flush 的内存效率比GPipe高得多。

我们首先给出具体代码如下,后续会逐步分析。

def forward_backward_pipelining_without_interleaving(forward_step_func, data_iterator,
                                                     model, optimizer, timers,
                                                     forward_only):
    """Run non-interleaved 1F1B schedule, with communication between pipeline
    stages.

    Returns dictionary with losses if the last stage, empty dict otherwise."""
    timers = get_timers()

    assert len(model) == 1
    model = model[0]

    # Compute number of warmup microbatches.
    num_microbatches = get_num_microbatches()
    num_warmup_microbatches = \
        (mpu.get_pipeline_model_parallel_world_size() -
         mpu.get_pipeline_model_parallel_rank() - 1)
    num_warmup_microbatches = min(
        num_warmup_microbatches,
        num_microbatches)
    num_microbatches_remaining = \
        num_microbatches - num_warmup_microbatches

    unwrapped_model = unwrap_model(
        model, (torchDDP, LocalDDP, Float16Module))
    model_type = unwrapped_model.model_type
    rank = mpu.get_pipeline_model_parallel_rank()
    recv_tensor_shapes = get_tensor_shapes(rank-1, model_type)
    send_tensor_shapes = get_tensor_shapes(rank, model_type)

    # Input, output tensors only need to be saved when doing backward passes
    input_tensors = None
    output_tensors = None
    if not forward_only:
        input_tensors = []
        output_tensors = []
    losses_reduced = []

    # Run warmup forward passes.
    for i in range(num_warmup_microbatches):
        input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
        output_tensor = forward_step(forward_step_func, data_iterator, model,
                                     input_tensor, losses_reduced)
        send_forward(output_tensor, send_tensor_shapes, timers=timers)

        if not forward_only:
            input_tensors.append(input_tensor)
            output_tensors.append(output_tensor)

    # Before running 1F1B, need to receive first forward tensor.
    # If all microbatches are run in warmup / cooldown phase, then no need to
    # receive this tensor here.
    if num_microbatches_remaining > 0:
        input_tensor = recv_forward(recv_tensor_shapes, timers=timers)

    # Run 1F1B in steady state.
    for i in range(num_microbatches_remaining):
        last_iteration = (i == (num_microbatches_remaining - 1))

        output_tensor = forward_step(forward_step_func, data_iterator, model,
                                     input_tensor, losses_reduced)
        if forward_only:
            send_forward(output_tensor, send_tensor_shapes, timers=timers)

            if not last_iteration:
                input_tensor = recv_forward(recv_tensor_shapes, timers=timers)

        else:
            output_tensor_grad = \
                send_forward_recv_backward(output_tensor,
                                           send_tensor_shapes,
                                           timers=timers)

            # Add input_tensor and output_tensor to end of list.
            input_tensors.append(input_tensor)
            output_tensors.append(output_tensor)

            # Pop input_tensor and output_tensor from the start of the list for
            # the backward pass.
            input_tensor = input_tensors.pop(0)
            output_tensor = output_tensors.pop(0)

            input_tensor_grad = \
                backward_step(optimizer, input_tensor, output_tensor,
                              output_tensor_grad)

            if last_iteration:
                input_tensor = None
                send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
            else:
                input_tensor = \
                    send_backward_recv_forward(
                        input_tensor_grad, recv_tensor_shapes, timers=timers)

    # Run cooldown backward passes.
    if not forward_only:
        for i in range(num_warmup_microbatches):
            input_tensor = input_tensors.pop(0)
            output_tensor = output_tensors.pop(0)

            output_tensor_grad = recv_backward(send_tensor_shapes, timers=timers)

            input_tensor_grad = \
                backward_step(optimizer, input_tensor, output_tensor,
                              output_tensor_grad)

            send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)

    return losses_reduced

3.2 启动阶段

这是在每个 worker 之上都会做的,每个worker 的rank 不同,具体逻辑如下:

  • 首先需要确定本worker在热身阶段需要执行的微批次数目,是min((world-size - rank - 1), num_microbatches),因为rank是依次递增,所以热身所需的微批次会逐次递减,直到为0,这样就会直接进入稳定阶段进行计算,比如 world size 为5,rank区间为0~4,微批次数目为4,则从前往后几个stage的热身批次为 5 - 0 - 1 = 4, 5 - 1 - 1 = 3, 5 - 2 - 1 = 2, 5 - 3 - 1 = 1, 5 - 4 - 1 = 0(就直接进入稳定状态)。

  • 其次计算稳定阶段所需要计算的微批次。

  • 当需要进行反向传播时候,需要建立两个FIFO队列,input_tensors 保存来自上游的激活,output_tensors 保存来自下游的激活。

    timers = get_timers()

    assert len(model) == 1
    model = model[0]

    Compute number of warmup microbatches.

    num_microbatches = get_num_microbatches() # 得到微批次数目

    需要确定本worker在热身阶段需要执行的微批次数目,是min((world-size - rank - 1), num_microbatches)

    因为rank是依次递增,所以热身所需的微批次会逐次递减,直到为0,这样就会直接进入稳定阶段进行计算

    比如 world size 为5,rank区间为0~4,微批次数目为4,则从前往后几个stage的热身批次为 5 - 0 - 1, 5 - 1 - 1, 5 - 2 - 1, 5 - 3 - 1, 5 - 4 - 1。

    num_warmup_microbatches = <br /> (mpu.get_pipeline_model_parallel_world_size() -
    mpu.get_pipeline_model_parallel_rank() - 1)
    num_warmup_microbatches = min(
    num_warmup_microbatches,
    num_microbatches)

    计算稳定阶段所需要计算的微批次

    num_microbatches_remaining = <br /> num_microbatches - num_warmup_microbatches

    unwrapped_model = unwrap_model(
    model, (torchDDP, LocalDDP, Float16Module))
    model_type = unwrapped_model.model_type
    rank = mpu.get_pipeline_model_parallel_rank()
    recv_tensor_shapes = get_tensor_shapes(rank-1, model_type)
    send_tensor_shapes = get_tensor_shapes(rank, model_type)

    Input, output tensors only need to be saved when doing backward passes

    当需要进行反向传播时候,需要建立两个队列,input_tensors 保存来自上游的激活,output_tensors 保存来自下游的激活

    input_tensors = None
    output_tensors = None
    if not forward_only:
    input_tensors = []
    output_tensors = []
    losses_reduced = []

3.3 热身阶段

热身阶段会根据本worker在热身阶段需要执行的微批次数目,依次进行处理:

  • 从上游获取输入激活。

  • 本地进行前向计算,上游输入的激活就是本stage的输入。

  • 向下游发送本地激活。

  • 如果需要反向传播,则每个 worker 在 input_tensor 之中保存上游激活,在output_tensor 之中保存发送给下游的激活。

  • 早期阶段会运行尽可能多的向前传播,这样后期阶段可以立即从1F1B开始。

    Run warmup forward passes.

    for i in range(num_warmup_microbatches):
    # 从上游获取输入激活
    input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
    # 本地进行前向计算,上游输入的激活就是本stage的输入
    output_tensor = forward_step(forward_step_func, data_iterator, model,
    input_tensor, losses_reduced)
    # 向下游发送本地激活
    send_forward(output_tensor, send_tensor_shapes, timers=timers)

    if not forward_only:
        input_tensors.append(input_tensor) # 保存上游激活
        output_tensors.append(output_tensor) # 保存本地计算的激活,就是发送给下游的激活

其中,第一个stage因为没有上游,所以recv_forward将会返回None,其他情况下将返回一个上游激活。

def recv_forward(tensor_shapes, timers):
    input_tensors = []
    for tensor_shape in tensor_shapes:
        if tensor_shape is None:
            input_tensors.append(None)
        else:
            input_tensors.append(p2p_communication.recv_forward(tensor_shape,
                                                                timers=timers))
    return input_tensors

3.4 通信模块

3.4.1 基础通信方法

pipeline parallelism需要inter-stage的P2P通信,其主要实现是_communnicate函数,_communicate 函数主要是封装了 PyTorch 的基础通信函数,给流水线并行提供了stage之间的双向P2P通信。在此基础之上,又封装了一些API方法。这个函数的注释写得不错,解释得非常清楚。这里需要注意的是:每个层怎么知道自己在流水线之中上下游 rank 是什么?这是通过例如这样的调用mpu.get_pipeline_model_parallel_next_rank() 来知道的。

_communicate 具体代码如下:

def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
                 tensor_shape,
                 use_ring_exchange=False,
                 dtype_=None):
    """Communicate tensors between stages. Used as helper method in other
    communication methods that are used in megatron/schedules.py.

    Takes the following arguments:
        tensor_send_next: tensor to send to next rank (no tensor sent if
                          set to None).
        tensor_send_prev: tensor to send to prev rank (no tensor sent if
                          set to None).
        recv_prev: boolean for whether tensor should be received from
                   previous rank.
        recv_next: boolean for whether tensor should be received from
                   next rank.
        tensor_shape: shape of tensor to receive (this method assumes that all
                      tensors sent and received in a single function call are
                      the same shape).
        use_ring_exchange: boolean for whether torch.distributed.ring_exchange()
                           API should be used.
        dtype_: optional, this is used when the tensor that needs to be
                communicated is different from args.params_dtype.
    Returns:
        (tensor_recv_prev, tensor_recv_next)
    """
    args = get_args()

    # Create placeholder tensors for receive in forward and backward directions
    # if needed.
    tensor_recv_prev = None
    tensor_recv_next = None

    # Some legacy inference code doesn't set the tensor shape, do so now
    # for the normal values for gpt/bert. This could be removed if inference
    # code is changed to provide tensor_shape.
    if tensor_shape is None:
        tensor_shape = (args.seq_length, args.micro_batch_size, args.hidden_size)

    override_scatter_gather_tensors_in_pipeline = False
    if args.scatter_gather_tensors_in_pipeline:
        tensor_chunk_shape = reduce(operator.mul, tensor_shape, 1)
        if tensor_chunk_shape % mpu.get_tensor_model_parallel_world_size() == 0:
            tensor_chunk_shape = tensor_chunk_shape // \
                mpu.get_tensor_model_parallel_world_size()
        else:
            tensor_chunk_shape = tensor_shape
            override_scatter_gather_tensors_in_pipeline = True
    else:
        tensor_chunk_shape = tensor_shape
    dtype = args.params_dtype
    if args.fp32_residual_connection:
        dtype = torch.float

    requires_grad = True
    if dtype_ is not None:
        dtype = dtype_
        requires_grad = False

    # 如果需要接受张量,则先分配空张量,接受的张量会存在此处
    if recv_prev:
        tensor_recv_prev = torch.empty(tensor_chunk_shape,
                                       requires_grad=requires_grad,
                                       device=torch.cuda.current_device(),
                                       dtype=dtype)
    if recv_next:
        tensor_recv_next = torch.empty(tensor_chunk_shape,
                                       requires_grad=requires_grad,
                                       device=torch.cuda.current_device(),
                                       dtype=dtype)

    # Split tensor into smaller chunks if using scatter-gather optimization.
    if not override_scatter_gather_tensors_in_pipeline and \
            args.scatter_gather_tensors_in_pipeline:
        if tensor_send_next is not None:
            tensor_send_next = mpu.split_tensor_into_1d_equal_chunks(tensor_send_next)

        if tensor_send_prev is not None:
            tensor_send_prev = mpu.split_tensor_into_1d_equal_chunks(tensor_send_prev)

    # Send tensors in both the forward and backward directions as appropriate.
    if use_ring_exchange:
        # 如果需要,则使用ring exchange,这个是新版本PyTorch才有
        torch.distributed.ring_exchange(tensor_send_prev=tensor_send_prev,
                                        tensor_recv_prev=tensor_recv_prev,
                                        tensor_send_next=tensor_send_next,
                                        tensor_recv_next=tensor_recv_next,
                                        group=mpu.get_pipeline_model_parallel_group())
    else:
        # 先根据目标rank生成对应的torch.distributed.P2POp,放入列表
        ops = []
        if tensor_send_prev is not None:
            send_prev_op = torch.distributed.P2POp(
                torch.distributed.isend, tensor_send_prev,
                mpu.get_pipeline_model_parallel_prev_rank())
            ops.append(send_prev_op)
        if tensor_recv_prev is not None:
            recv_prev_op = torch.distributed.P2POp(
                torch.distributed.irecv, tensor_recv_prev,
                mpu.get_pipeline_model_parallel_prev_rank())
            ops.append(recv_prev_op)
        if tensor_send_next is not None:
            send_next_op = torch.distributed.P2POp(
                torch.distributed.isend, tensor_send_next,
                mpu.get_pipeline_model_parallel_next_rank())
            ops.append(send_next_op)
        if tensor_recv_next is not None:
            recv_next_op = torch.distributed.P2POp(
                torch.distributed.irecv, tensor_recv_next,
                mpu.get_pipeline_model_parallel_next_rank())
            ops.append(recv_next_op)

        # 然后做批量异步send/recv
        if len(ops) > 0:
            reqs = torch.distributed.batch_isend_irecv(ops)
            for req in reqs:
                req.wait() # 用wait来同步

    # To protect against race condition when using batch_isend_irecv().
    torch.cuda.synchronize()

    # If using scatter-gather optimization, gather smaller chunks.
    # 特殊优化,21年论文中提到,大概因为做了all-reduce,因此可以先split发送,下游gather成统一数据
    # 有兴趣读者可以深入研究论文和代码,
    if not override_scatter_gather_tensors_in_pipeline and \
            args.scatter_gather_tensors_in_pipeline:
        if recv_prev:
            tensor_recv_prev = mpu.gather_split_1d_tensor(
                tensor_recv_prev).view(tensor_shape).requires_grad_()

        if recv_next:
            tensor_recv_next = mpu.gather_split_1d_tensor(
                tensor_recv_next).view(tensor_shape).requires_grad_()

    return tensor_recv_prev, tensor_recv_next

3.4.2 API

在 _communicate 的基础之上,封装了众多API函数,主要就是依据参数的不同来做不同处理,比如:

def send_backward_recv_forward(input_tensor_grad, tensor_shape=None, timers=None):
    """Batched send and recv with previous rank in pipeline."""
    if mpu.is_pipeline_first_stage():
        input_tensor = None
    else:
        input_tensor, _ = _communicate(
            tensor_send_next=None,
            tensor_send_prev=input_tensor_grad,
            recv_prev=True,
            recv_next=False,
            tensor_shape=tensor_shape)
    return input_tensor

3.4.3 流水线上下游

以下若干函数用来确定流水线上下游,结合前一篇文章我们知道,假如本进程是 rank 2,则流水线进程组 ranks 是 [g2, g6, g10, g14],那么其下游就是 rank 6。

def get_pipeline_model_parallel_first_rank():
    return _PIPELINE_GLOBAL_RANKS[0]

def get_pipeline_model_parallel_last_rank():
    last_rank_local = get_pipeline_model_parallel_world_size() - 1
    return _PIPELINE_GLOBAL_RANKS[last_rank_local]

def get_pipeline_model_parallel_next_rank():
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline + 1) % world_size]

def get_pipeline_model_parallel_prev_rank():
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline - 1) % world_size]

3.5 稳定阶段

稳定阶段的总体逻辑如下:前向计算 -> 发送激活给前向计算下游 & 从下游接受梯度 -> 后向计算 -> 给上游发送本worker计算的梯度 & 从上游接受激活。

3.5.1 逻辑

稳定阶段具体逻辑如下:

  1. forward_step :拿到一个微批次(上游激活),进行本地前向计算。

  2. send_forward:

  3. 每个 worker 在 input_tensor 之中保存上游激活,在output_tensor 之中保存发送给下游的激活。

  4. backward_step : 本地后向计算。

  5. send_backward:

  6. 跳回1继续处理下一个微批次(上游激活)。

    Before running 1F1B, need to receive first forward tensor.

    If all microbatches are run in warmup / cooldown phase, then no need to

    receive this tensor here.

    if num_microbatches_remaining > 0:
    # 需要在稳定状态下运行,所以得拿到前面层的激活值
    input_tensor = recv_forward(recv_tensor_shapes, timers=timers)

    Run 1F1B in steady state.

    for i in range(num_microbatches_remaining):
    last_iteration = (i == (num_microbatches_remaining - 1))

    # 前向计算
    output_tensor = forward_step(forward_step_func, data_iterator, model,
                                 input_tensor, losses_reduced)
    if forward_only:
        send_forward(output_tensor, send_tensor_shapes, timers=timers)
    if not last_iteration:
        input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
    else: # 发送中间激活给下游,并且从下游获取其反向梯度 output_tensor_grad = \ send_forward_recv_backward(output_tensor, send_tensor_shapes, timers=timers)
    # Add input_tensor and output_tensor to end of list.
    input_tensors.append(input_tensor) # 保存上游激活到队列
    output_tensors.append(output_tensor) # 保存本地计算的激活,就是发送给下游的激活到队列
    
    # Pop input_tensor and output_tensor from the start of the list for
    # the backward pass.
    input_tensor = input_tensors.pop(0) # 从队列中弹出第一个未处理的(就是最早未处理的)上游激活
    output_tensor = output_tensors.pop(0) # 从队列弹出对应的本地激活
    
    # 反向计算,利用(上游激活,本地激活,下游梯度)来对最早的未处理的微批次进行反向计算,得到本地梯度
    input_tensor_grad = \
        backward_step(optimizer, input_tensor, output_tensor,
                      output_tensor_grad) # 下游传来的梯度在这里
    
    if last_iteration:
        input_tensor = None
        # 如果是最后一个微批次,把本地梯度 input_tensor_grad 传递给前向计算的上游
        send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
    else:
        # 如果不是最后一个微批次,把本地梯度 input_tensor_grad 传递给前向计算的上游,还需要从上游再获取一个激活值
        input_tensor = \
            send_backward_recv_forward(
                input_tensor_grad, recv_tensor_shapes, timers=timers)</code></pre></li>

3.5.2 串行

其中,send_forward_recv_backward 这个从名字就能看到逻辑,这个函数先发送给下游,再从下游接受。

def send_forward_recv_backward(output_tensors, tensor_shapes, timers):
    if not isinstance(output_tensors, list):
        output_tensors = [output_tensors]
    output_tensor_grads = []
    for (output_tensor, tensor_shape) in zip(output_tensors, tensor_shapes):
        if tensor_shape is None:
            output_tensor_grads.append(None)
            continue
        # 发送自己的激活,然后得到下游传上来的梯度
        output_tensor_grad = p2p_communication.send_forward_recv_backward(
                output_tensor, tensor_shape, timers=timers)
        output_tensor_grads.append(output_tensor_grad)
    return output_tensor_grads #返回梯度

可以发现,对于单个 worker,都是阻塞进行,因为 send 和 recv 都是阻塞,这样通信和计算必须串行,不能重叠。因为前面热身阶段已经把前向传递一直从 worker 0 传送到 worker d,所以 worker d 可以直接拿到 input,就进行处理,然后直接进行反向计算,然后返回给上游。所以串行也无所谓。我们从论文之中的图例也可以看出来:

图:PipeDream-Flush在稳定状态下交替进行向前和向后传播,通过将激活隐藏限制为仅执行中(in-flight)的微批次来保持较低的内存占用。从图上可以看到:

  • Worker 1的执行序列是:1 FW(warmup), 2 FW, 1 BW,3 FW,2 BW,4 FW,3 BW,4 BW(cooldown)
  • Worker 2的执行序列是:1 FW,1BW, 2 FW, 2 BW, 3 FW, 3 BW, 4 FW, 4 BW,worker 2直接就进入了稳定状态。

3.6 冷却阶段

冷却阶段和热身阶段对称,也执行num_warmup_microbatches个步骤,但是只做反向传播。这个阶段因为是清理未完毕的反向传播,所以只是从队列中pop。具体就是弹出上游激活和传递给下游的激活,然后进行梯度计算。

# Run cooldown backward passes.
if not forward_only:
    for i in range(num_warmup_microbatches):
        input_tensor = input_tensors.pop(0)
        output_tensor = output_tensors.pop(0)

        output_tensor_grad = recv_backward(send_tensor_shapes, timers=timers)
        input_tensor_grad = \
            backward_step(optimizer, input_tensor, output_tensor,
                          output_tensor_grad)

        send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)

return losses_reduced

3.7 Flush 体现在哪里?

我们需要看看 megatron/training.py。就是一次训练step的流程。这里在 update_successful, grad_norm, num_zeros_in_grad = optimizer.step() 时候会调用优化器进行参数更新,此时,内部两个激活值队列也全部清空过了,所以在这个时间点上,flush也就完成了。

def train_step(forward_step_func, data_iterator,
               model, optimizer, lr_scheduler):
    """Single training step."""
    args = get_args()
    timers = get_timers()

    # 1. 把梯度归零
    # Set grad to zero.
    if args.DDP_impl == 'local' and args.use_contiguous_buffers_in_local_ddp:
        for partition in model:
            partition.zero_grad_buffer()
    optimizer.zero_grad()

    # 2. 进行前向,后向传播,对于本章来说,就是调用forward_backward_pipelining_without_interleaving
    forward_backward_func = get_forward_backward_func()
    losses_reduced = forward_backward_func(
        forward_step_func, data_iterator, model,
        optimizer, timers, forward_only=False)

    # 到了这里,整个流水线处理完毕,loss 和 梯度都计算完毕
    # Empty unused memory
    if args.empty_unused_memory_level >= 1:
        torch.cuda.empty_cache()

    # 3. 数据并行的all-reduce
    # All-reduce if needed.
    if args.DDP_impl == 'local':
        for model_module in model:
            model_module.allreduce_gradients()

    # All-reduce word_embeddings' grad across first and last stages to ensure
    # that word_embeddings parameters stay in sync.
    # This should only run for models that support pipelined model parallelism
    # (BERT and GPT-2).
    # 4. 嵌入层 all-reduce,嵌入层也进行了权重分享,所以要进行all-reduce来确保参数统一
    if mpu.is_rank_in_embedding_group(ignore_virtual=True) and \
            mpu.get_pipeline_model_parallel_world_size() > 1:
        if mpu.is_pipeline_first_stage(ignore_virtual=True):
            unwrapped_model = model[0]
        elif mpu.is_pipeline_last_stage(ignore_virtual=True):
            unwrapped_model = model[-1]
        else:  # We do not support the interleaved schedule for T5 yet.
            unwrapped_model = model[0]
        unwrapped_model = unwrap_model(
            unwrapped_model, (torchDDP, LocalDDP, Float16Module))

        if unwrapped_model.share_word_embeddings:
            word_embeddings_weight = unwrapped_model.word_embeddings_weight()
            if args.DDP_impl == 'local':
                grad = word_embeddings_weight.main_grad
            else:
                grad = word_embeddings_weight.grad
            torch.distributed.all_reduce(grad, group=mpu.get_embedding_group())

    # Update parameters.
    # 5. 更新参数,这里才进行Flush,
    update_successful, grad_norm, num_zeros_in_grad = optimizer.step()

    # Update learning rate.
    if update_successful:
        increment = get_num_microbatches() * \
                    args.micro_batch_size * \
                    args.data_parallel_size
        lr_scheduler.step(increment=increment)
        skipped_iter = 0
    else:
        skipped_iter = 1

    # Empty unused memory
    if args.empty_unused_memory_level >= 2:
        torch.cuda.empty_cache()

    if mpu.is_pipeline_last_stage(ignore_virtual=True):
        # Average loss across microbatches.
        loss_reduced = {}
        for key in losses_reduced[0]:
            losses_reduced_for_key = [x[key] for x in losses_reduced]
            loss_reduced[key] = sum(losses_reduced_for_key) / len(losses_reduced_for_key)
        return loss_reduced, skipped_iter, grad_norm, num_zeros_in_grad
    return {}, skipped_iter, grad_norm, num_zeros_in_grad

至此,NVIDIA Megetron 分析完毕,我们接下来使用 NVIDIA HugeCTR 看看如何处理大型稀疏嵌入。

[细读经典]Megatron论文和代码详细分析(2)

[细读经典]Megatron论文和代码详细分析(1)

Megatron-LM源码阅读(一)

Megatron-LM源码阅读(二)

megatron学习总结

GTC 2020: Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism

PipeDream-Flush

PipeDream: 数据并行+流水线

PipeDream-interleaved