目录
本文是 PyTorch 分布式系列的第五篇,以几篇官方文档的翻译为基础,加入了自己的一些思考,带领大家进入DistributedDataParallel,在后续会用5~6篇左右做深入分析。
本系列其他文章如下:
[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)
[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)
[源码解析] PyTorch如何实现前向传播(3) --- 具体实现
[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎
[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构
[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑
[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法
[源码解析] PyTorch 分布式(1)------历史和概述
[源码解析] PyTorch 分布式(2) ----- DataParallel(上)
[源码解析] PyTorch 分布式(3) ----- DataParallel(下)
[源码解析] PyTorch 分布式(4)------分布式应用基础概念
因为DistributedDataParallel 是数据并行,所以我们首先通过两个图,复习一下什么是数据并行。
第一个图片来自 https://www.cnblogs.com/yh-blog/p/12877922.html,其原始出处未知。
我们可以看到,模型并行与数据并行的区别。
第二张图来自fairscale github源码,清晰的给出了一个数据并行的运行模式,具体包括:
模型分片,本地前向计算,本地反向传播,AllReduce来同步梯度,本地更新梯度这几步。
Torch.distributed 包 为多个计算节点的 PyTorch 提供多进程并行通信原语,可以并行化跨进程和跨集群的计算。torch.nn.parallel.DistributedDataParallel
基于torch.distributed 包的功能提供了一个同步分布式训练wrapper,这个wrapper可以对 PyTorch 模型封装进行训练。其核心功能是基于多进程级别的通信,与Multiprocessing package - torch.multiprocessing 和 DataParrallel 提供的并行性有明显区别。
以下是 DDP 的整体架构,大家可以看到ddp在整个架构之中的位置,依赖项等等。图片来自来自源码。
我们通过一个图来说明 DDP 的运行逻辑。
图片来自 https://www.telesens.co/2019/04/04/distributed-data-parallel-training-using-pytorch-on-aws/
具体逻辑如下:
既然 DataParallel 可以进行数据并行训练,那么为什么还需要提出 DistributedDataParallel呢?这里我们就需要知道两种方法的实现原理与区别:
大型模型训练。
如果模型太大而无法容纳在单个 GPU 上,则必须使用模型并行将其拆分到多个 GPU 中。
如果数据太大而无法容纳在一台计算机上,则需要使用数据并行。
如果您的模型需要跨越多台机器,或者您的用例不适合数据并行性范式,请参阅 RPC API ,以获得更多通用的分布式训练支持。
多进程还是多线程:
GIL-thrashing
)。对于严重依赖 Python 运行时的模型(比如说包含 RNN
层或大量小组件的 models
)这尤其重要。DataParallel
通常也比DistributedDataParallel
慢,这是因为跨线程的 GIL 争用,每次迭代复制的模型以及分散输入和收集输出所带来的额外开销。DDP 与DP在具体实现上的区别如下:
关于优化器:
optimizer
,每个进程都独立完成所有优化步骤,这和非分布式训练一样。optimizer
,在主线程执行。其对各 GPU
上梯度进行求和,而在主 GPU
进行参数更新,之后再将模型参数 broadcast
到其他 GPU
。关于梯度。
broadcast
模型到所有进程(其他GPU)进行下一步训练。关于传播数据:
broadcast
),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。相较于 DataParallel
来说,torch.distributed
传输的数据量更少,因此速度更快,效率更高。Pytorch
中分布式的基本使用流程如下:
init_process_group
初始化进程组,同时初始化 distributed
包,然后才能使用 distributed
包的其他函数。new_group
创建子分组。DDP(model, device_ids=device_ids)
创建 DistributedDataParalle 模型。Sampler
。torch.distributed.launch
在每个主机上执行脚本,开始训练。destory_process_group()
销毁进程组。首先,我们使用 https://pytorch.org/tutorials/intermediate/ddp_tutorial.html 来看看。
在示例的最开始,我们首先要正确设置进程组。
init_process_group 的参数解释如下:
"gloo" 说明后端使用 "gloo"。
rank 是本进程对应的rank,如果是0,则说明本进程是 master 进程,负责广播模型状态等工作。
world_size 指的是总的并行进程数目,如果连接的进程数小于world_size,进程就会阻塞在 init_process_group之上,如果达到了 world_size,程序才会继续运行。如果 batch_size = 16,那么总体的batch size 就是 16 * world_size。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size) # 这条命令之后,master进程就处于等待状态
def cleanup():
dist.destroy_process_group()
现在,让我们创建一个简单模块,用 DDP 包装它,并用一些虚拟输入数据馈送它。请注意,由于 DDP 将模型状态从 rank 0 进程广播到 DDP 构造函数中的所有其他进程,因此对于所有 DDP 进程来说,它们的起始模型参数是一样的,用户无需担心不同的 DDP 进程从不同的模型参数初始值开始。
+-----------+
| |
| Rank 0 |
| |
+-----+-----+
|
| Model Parameters
|
|
+---------------+---------v----------------------+
| | |
| | |
| | |
| | |
v v v
+----+-----+ +----+-----+ +---+-------+
| | | | | |
| Rank 1 | | Rank 2 | ...... | Rank n |
| | | | | |
+----------+ +----------+ +-----------+
DDP 包装了较低级别的分布式通信细节,并提供了一个干净的 API,就好像它是一个本地模型一样。梯度同步通信发生在反向传播期间,并与反向计算重叠。当backward()
返回时,param.grad
已经包含同步梯度张量。因为DDP 封装了分布式通信原语,所以模型参数的梯度可以进行 all-reduce。
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
# create model and move it to GPU with id rank
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
def run_demo(demo_fn, world_size):
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
具体如下图
+--------------------------+ +------------------------+
| torch.optim.SGD | | DDP |
| | parameters() | |
| | | +------------+ |
| | <-----------------+ | | |
| | | | ToyModel | |
| | | | | |
| | | +------------+ |
| | | |
+--------------------------+ +--------+---------------+
|
|
| forward outputs
|
|
v
+-------------------------+
| nn.MSELoss() |
| |
| |
| |
| |
+-------------------------+
在 DDP 中,构造函数、前向传递和后向传递是分布式同步点。我们期望不同的进程会启动相同数量的同步操作,并在大致相同的时间以相同的顺序到达这些同步点。否则,进度快的进程可能会提前到达同步点,如果快进程等待落后者的时间过长,那么先到的进程会超时。
因此,用户需要负责平衡进程间的工作负载分布。有时,由于网络延迟,资源争用,不可预测的工作负载峰值等原因,处理速度的偏差是不可避免的。为避免在这些情况下超时,请确保在调用 init_process_group 时。timeout
这个参数传递足够大的值 。
一般来说,用户可以使用torch.save
和torch.load
作为checkpoints,以便从检查点恢复训练。
在使用 DDP 时,一种优化是只在一个进程中保存模型,然后在所有进程中加载模型,从而减少写入开销(这其实很像数据库中的读写分离)。因为所有进程都从相同的参数开始,并且在反向传递中同步梯度,所以优化器应该将参数设置为相同的值。如果使用此优化,请确保在保存完成之前所有进程都不会开始加载。
此外,在加载模块时,您需要提供适当的map_location
参数,以防止一个进程进入他人的设备。如果map_location
缺失,torch.load
将首先将模块加载到 CPU,然后将每个参数复制到它之前保存的地方,这将导致同一台机器上的所有进程使用相同的一组设备。
有关更高级的故障恢复和弹性支持,请参阅TorchElastic。后续也会有专门系列介绍弹性部分。
从下图可以看出来,Rank 0 负责保存模型到存储之上,其他 Rank 会加载模型到其本地。
+-----------+
| |
| Rank 0 |
| |
+-----+-----+
|
save | Model Parameters
|
|
v
+-------+------+
| |
+-----------+ Model file +---------------------+
| | | |
| +---+----------+ |
| | |
| | |
| | |
| | |
|load |load load |
| | |
| | |
| | |
| | |
v v v
+----+-----+ +----+-----+ +---+-------+
| | | | | |
| Rank 1 | | Rank 2 | ...... | Rank n |
| | | | | |
+----------+ +----------+ +-----------+
具体如下:
def demo_checkpoint(rank, world_size):
print(f"Running DDP checkpoint example on rank {rank}.")
setup(rank, world_size)
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process
# 0 saves it.
dist.barrier()
# configure map_location properly
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location))
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn = nn.MSELoss()
loss_fn(outputs, labels).backward()
optimizer.step()
# Not necessary to use a dist.barrier() to guard the file deletion below
# as the AllReduce ops in the backward pass of DDP already served as
# a synchronization.
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
https://pytorch.org/tutorials/intermediate/ddp_tutorial.html 后半部分是与模型并行的结合,我们一起来看看。
DDP 也适用于多 GPU 模型。DDP 在使用大数据训练大模型时候特别有用。
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1):
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0)
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1)
def forward(self, x):
x = x.to(self.dev0)
x = self.relu(self.net1(x))
x = x.to(self.dev1)
return self.net2(x)
注意,当把一个多GPU 模型传递给DDP时候,不能设置device_ids
和output_device
。
输入和输出数据将通过应用程序或模型forward()
方法来放置在适当的设备中。
def demo_model_parallel(rank, world_size):
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
# setup mp_model and devices for this process
dev0 = (rank * 2) % world_size
dev1 = (rank * 2 + 1) % world_size
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
optimizer.zero_grad()
# outputs will be on dev1
outputs = ddp_mp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(dev1)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_basic, world_size)
run_demo(demo_checkpoint, world_size)
run_demo(demo_model_parallel, world_size)
请注意,这里没有使用 Sampler,正常在使用之中,需要用DistributedSampler来配合 DDP 使用,DistributedSampler 会把数据集样本针对每个进程来划分,这样每个进程就读取到了自己应该使用的样本,而且 DistributedSampler 会为 DDP 模式使用 set_epoch 来shuffle数据集。
前面提到,如果应用程序需要跨机器边界进行扩展,需要使用多机 DistributedDataParallel 和 启动脚本。torch.nn.parallel.DistributedDataParallel()
支持多个通过网络互联的机器,用户必须为每个进程显式启动一个主训练脚本。
我们下面就看看这个启动脚本 https://github.com/pytorch/examples/blob/master/distributed/ddp/README.md。以下就是这个md文件的翻译。
在本教程中,我们将演示如何构建分布式模型训练应用程序,这样它可以在多个节点上方便地启动。这里每个节点都有多个 GPU,并且使用 PyTorch 的分布式启动程序脚本 https://github.com/pytorch/pytorch/blob/master/torch/distributed/launch.py 启动实用程序torch.distributed.launch,此脚本程序可用于为每个节点启动多个进程以进行分布式训练,它在每个训练节点上产生多个分布式训练进程。
这个工具可以用作CPU训练或者GPU 训练,如果被用于GPU,每个GPU产生一个进程Process。该工具既可以用来做单节点多GPU训练,也可用于多节点多GPU训练。
在 单节点分布式训练 或 多节点分布式训练 的两种情况下,该工具将为每个节点启动给定数量的进程(--nproc_per_node)。如果用于GPU培训,则此数字需要小于或等于当前系统上的GPU数量(nproc_per_node),每个进程将在从GPU 0到GPU(nproc_per_node - 1)的单个GPU上运行。
多个worker通过处理大型数据集的不同部分来训练同一个全局模型,每个worker将独立计算局部梯度(也称为子梯度 sub-gradients),然后使用 AllReduce 原语来同步梯度。因为同一个程序在所有应用上运行,但每个应用都在训练数据集的不同部分上运行,所以在 HPC 术语中,这种执行模型称为单程序多数据或 SPMD,
一个分布式数据并行 (DDP) 应用程序可以在多个节点上执行,其中每个节点可以由多个 GPU 设备组成。每个节点依次可以运行 DDP 应用程序的多个副本,每个副本在多个 GPU 上处理其模型。
设N为运行应用程序的节点数, G为每个节点的 GPU 数。同时在所有节点上运行的应用程序进程总数称为 World Size,简写为W。在每个节点上运行的进程数称为Local World Size,简写为L。
每个应用进程都分配了两个 ID:local rank 取值在 [0, L -1] 中,global rank 取值在 [0, W -1] 之中。
为了阐明上面定义的术语,我们考虑在两个节点上启动 DDP 应用程序的情况,每个节点都有四个 GPU。然后我们希望每个进程跨越(span)两个 GPU。进程到节点的映射如下图所示:
下面图片也出自于 https://github.com/pytorch/examples/blob/master/distributed/ddp/README.md。
虽然有很多方法可以将进程映射到节点,但一个好的经验法则是让一个进程跨越(span)单个 GPU。这使得 DDP 应用程序能够拥有与 GPU 一样多的并行读取流,并且在现实中也提供了 I/O 和计算成本之间的良好平衡。
无论 DDP 应用程序采用何种启动方式,每个进程都需要一种机制来了解其全局和本地等级。所以,所有进程会创建一个ProcessGroup
,基于ProcessGroup
可以使它们能够参与诸如 AllReduce 之类的集合通信操作。
有一种便捷的方法可以启动多个 DDP 进程,并且可以初始化所有参数(这些数值是建立一个ProcessGroup
所需要的),这就是使用PyTorch 提供的分布式 脚本launch.py
。
这个 Launcher 可以在本地torch
安装目录的distributed
子目录下找到。这是在任何操作系统上获取launch.py
路径的快捷方法 :
python -c " from os import path; import torch; print(path.join(path.dirname(torch.__file__), 'distributed', 'launch.py')) "
这将打印如下内容:
/home/username/miniconda3/envs/pytorch/lib/python3.8/site-packages/torch/distributed/launch.py
当 DDP 应用程序通过 launch.py
启动时,它通过环境变量将 world size、 global rank、local rank,master address 和端口作为命令行参数传递给每个实例。要使用 Launcher,应用程序需要遵守以下约定:
torch.multiprocessing.spawn
启动子进程。为简单起见,应用程序可以假设每个进程映射到单个 GPU,但在下一节中,我们还将展示如何用更通用的办法来执行进程到 GPU 的映射。
此示例 DDP 应用程序基于 DDP 教程 的 “Hello, World” 应用。
DDP 应用程序采用两个命令行参数:
--local_rank
: 此参数将通过 launch.py
传入。--local_world_size
:这是明确传递的,通常是数字 \(1\) 或每个节点的 GPU 数量。应用程序解析这些并调用spmd_main
入口点:
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int, default=0)
parser.add_argument("--local_world_size", type=int, default=1)
args = parser.parse_args()
spmd_main(args.local_world_size, args.local_rank)
在 spmd_main
之中,进程组使用后端(NCCL 或 Gloo)进行初始化。集合点(rendezvous )所需的其余信息来自launch.py
设置的环境变量:
def spmd_main(local_world_size, local_rank):
# These are the parameters used to initialize the process group
env_dict = {
key: os.environ[key]
for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
}
print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
dist.init_process_group(backend="nccl")
print(
f"[{os.getpid()}] world_size = {dist.get_world_size()}, "
+ f"rank = {dist.get_rank()}, backend={dist.get_backend()}"
)
demo_basic(local_world_size, local_rank)
# Tear down the process group
dist.destroy_process_group()
给定 local rank 和 world size,训练函数demo_basic
将通过device_ids
在本地节点的一组 GPU 上初始化DistributedDataParallel
模型:
def demo_basic(local_world_size, local_rank):
# setup devices for this process. For local_world_size = 2, num_gpus = 8,
# rank 0 uses GPUs [0, 1, 2, 3] and
# rank 1 uses GPUs [4, 5, 6, 7].
n = torch.cuda.device_count() // local_world_size
device_ids = list(range(local_rank * n, (local_rank + 1) * n))
print(
f"[{os.getpid()}] rank = {dist.get_rank()}, "
+ f"world_size = {dist.get_world_size()}, n = {n}, device_ids = {device_ids}"
)
model = ToyModel().cuda(device_ids[0])
ddp_model = DDP(model, device_ids)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(device_ids[0])
loss_fn(outputs, labels).backward()
optimizer.step()
该应用程序可以通过launch.py
以下方式在一个 8 GPU 的节点上启动,每个 GPU 一个进程:
python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=8 example.py --local_world_size=8
并产生类似于下图所示的输出:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
[238627] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '0', 'WORLD_SIZE': '8'}
[238630] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '3', 'WORLD_SIZE': '8'}
[238628] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '1', 'WORLD_SIZE': '8'}
[238634] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '7', 'WORLD_SIZE': '8'}
[238631] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '4', 'WORLD_SIZE': '8'}
[238632] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '5', 'WORLD_SIZE': '8'}
[238629] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '2', 'WORLD_SIZE': '8'}
[238633] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '6', 'WORLD_SIZE': '8'}
[238633] world_size = 8, rank = 6, backend=nccl
[238628] world_size = 8, rank = 1, backend=nccl
[238629] world_size = 8, rank = 2, backend=nccl
[238631] world_size = 8, rank = 4, backend=nccl
[238630] world_size = 8, rank = 3, backend=nccl
[238632] world_size = 8, rank = 5, backend=nccl
[238634] world_size = 8, rank = 7, backend=nccl
[238627] world_size = 8, rank = 0, backend=nccl
[238633] rank = 6, world_size = 8, n = 1, device_ids = [6]
[238628] rank = 1, world_size = 8, n = 1, device_ids = [1]
[238632] rank = 5, world_size = 8, n = 1, device_ids = [5]
[238634] rank = 7, world_size = 8, n = 1, device_ids = [7]
[238629] rank = 2, world_size = 8, n = 1, device_ids = [2]
[238630] rank = 3, world_size = 8, n = 1, device_ids = [3]
[238631] rank = 4, world_size = 8, n = 1, device_ids = [4]
[238627] rank = 0, world_size = 8, n = 1, device_ids = [0]
同样,它可以使用一个跨越(span)所有 8 个 GPU 的单进程来启动:
python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=1 example.py --local_world_size=1
为当前主机创建 nproc_per_node 个进程,每个进程独立执行训练脚本,同时还为每个进程分配一个 local_rank 参数,表示当前进程在当前主机上的编号。
比如 node_rank = 2, local_rank = 0,表示 node_rank 第2个节点,上第一个进程。
依次产生以下输出
[262816] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '0', 'WORLD_SIZE': '1'}
[262816]: world_size = 1, rank = 0, backend=nccl
[262816] rank = 0, world_size = 1, n = 8, device_ids = [0, 1, 2, 3, 4, 5, 6, 7]
作为分布式数据并行应用程序的作者,您的代码需要了解两种类型的资源:计算节点和每个节点内的 GPU。但是需要跟踪GPU集如何映射到应用程序进程,这个簿记(bookkeeping )工作可能既乏味又容易出错。
所以我们希望通过按照本示例所示的方法,使用 launcher 来构建您的应用程序,这样可以显著简化分布式训练的设置。
知道了启动脚本的作用依然不够,我们还需要知道其内部做了什么。
launch.py 位于 torch/distributed/launch.py,但是实际上,它的大部分功能都被转移到了 torch/distributed/run.py 之中。
def main(args=None):
logger.warn(
"The module torch.distributed.launch is deprecated "
"and going to be removed in future."
"Migrate to torch.distributed.run"
)
args = parse_args(args)
run(args)
所以我们要看看 run.py。
可以看到,run.py 的基本思路就是:使用 config_from_args 来从命令行之中提取信息,构建了对应的配置,执行语句和其参数,然后调用 elastic_launch 来执行。由此可见,弹性训练是未来趋势。我们后续也有系列来分析弹性训练。
def run(args):
if args.standalone:
args.rdzv_backend = "c10d"
args.rdzv_endpoint = "localhost:29400"
args.rdzv_id = str(uuid.uuid4())
log.info(
f"\n**************************************\n"
f"Rendezvous info:\n"
f"--rdzv_backend={args.rdzv_backend} "
f"--rdzv_endpoint={args.rdzv_endpoint} "
f"--rdzv_id={args.rdzv_id}\n"
f"**************************************\n"
)
config, cmd, cmd_args = config_from_args(args)
elastic_launch(
config=config,
entrypoint=cmd,
)(*cmd_args)
run.py 也可以独立运行,比如。
>>> python -m torch.distributed.run
--nnodes=$NUM_NODES
--nproc_per_node=$NUM_TRAINERS
--rdzv_id=$JOB_ID
--rdzv_backend=c10d
--rdzv_endpoint=$HOST_NODE_ADDR
YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
因为run.py 有很多配置参数,所以我们大致看一下。
Node
- 物理实例或容器;映射到与 job manager 所协调的单元。
Worker
- 分布式培训环境中的worker。
WorkerGroup
- 执行相同功能的一组worker(例如trainer)。
LocalWorkerGroup
- 在同一节点上运行的工作组中的workers子集。
RANK
- 工作组中worker的rank,是全局rank,可以认为是一个全局GPU资源列表。
LOCAL_RANK
- 本地工作组中,某个worker 的 rank,可以认为是当前节点上的GPU资源列表。
GROUP_RANK
- worker group的rank。介于0和“最大节点数”之间的数字。如果每个节点运行一个单一工作组,那就是这个节点的rank。
ROLE_RANK
- 对于具有相同角色worker来说,他们之间共享的rank,角色在“WorkerSpec”中被指定。
WORLD_SIZE
- 工作组中worker的总数。因为节点会加入/离开,所以WORLD_SIZE
会变化,不能依赖 WORLD_SIZE
的稳定性进行编码。
LOCAL_WORLD_SIZE
- 本地工作组的大小,即本地运行的worker数目,等于在torch.distributed.run
运行时候指定的--nproc_per_node
。目前,torch/distributed/run.py 仅支持同构的 LOCAL_WORLD_SIZE
。也就是说,假设所有节点运行相同数量的本地工作者(每个角色)。
ROLE_WORLD_SIZE
- 具有同样角色的workers总数,在 WorkerSpec
之中被指定。
rdzv_id
- 用户定义的id,用于唯一标识作业的工作组。这个id在每个节点加入特定工作组时候使用。
rdzv_backend
-rendezvous 的后端(例如“c10d”)。这通常是一个强一致性的键值存储。
rdzv_endpoint
- rendezvous 后端端点;通常以“<host>:<port>
”的形式出现。
run_id
: 用户定义的id,它唯一地标识分布式应用程序的一个实例。它通常映射到作业id并用于
允许节点加入正确的分布式应用程序。
TORCHELASTIC_RESTART_COUNT
- 迄今为止,工作组重启的次数。
TORCHELASTIC_MAX_RESTARTS
- 配置的最大重启数目。
TORCHELASTIC_RUN_ID
- 与 rendezvous run_id
相等,即唯一的job id。
我们后面会有专门系列来介绍弹性训练,所以就此略过。下一篇我们开始介绍通信所需要的store概念,敬请期待。
https://github.com/pytorch/examples/blob/master/distributed/ddp/README.md
https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
手机扫一扫
移动阅读更方便
你可能感兴趣的文章