[源码解析] 深度学习流水线并行 PipeDream(4)--- 运行时引擎
阅读原文时间:2021年09月11日阅读:1

[源码解析] 深度学习流水线并行 PipeDream(4)--- 运行时引擎

目录

在前文中,我们介绍了PipeDream的总体架构,Profile阶段,计算分区阶段和模型转换阶段,本文我们介绍运行时执行引擎,这是一个统一基础设施层。

流水线并行其他文章链接如下:

[源码解析] 深度学习流水线并行Gpipe(1)---流水线基本实现

[源码解析] 深度学习流水线并行GPipe (2) ----- 梯度累积

[源码解析] 深度学习流水线并行 GPipe(3) ----重计算

[源码解析] 深度学习流水线并行之PipeDream(1)--- Profile阶段

[源码解析] 深度学习流水线并行 PipeDream(2)--- 计算分区

[源码解析] 深度学习流水线并行 PipeDream(3)--- 转换模型

1.1 前文回顾

在前文,我们经历了三个阶段:profile,计算分区,模型转换,目前就得到了若干python文件&配置文件。PipeDream 加载这些文件之后,就可以进行训练。

所以从本文开始,我们介绍训练所需要的各种支撑系统,比如运行时执行引擎。主要是看看一个深度学习训练运行时应该包括什么功能。

1.2 运行时系统

结合之前的分析和我们先思考为何要实现一个运行时,以及针对深度学习(流水线并行)需要实现什么功能。

1.2.1 PyTorch 的特点

首先看看PyTorch 的特点:

  • PyTorch 本身实现了 autograd 功能,这样反向传播就做到了自动微分。
  • 在分布式数据并行实现上,PyTorch 实现了 DDP 功能。
  • 在分布式模型并行等方面,PyTorch 也提供了 RPC 功能作为支撑基础。但是,RPC功能是在 PyTorch 1.5 版本中引入的,时间是2020-06-12。
  • 针对 DDP 和 RPC,PyTorch 也相应实现了 distributed.autograd 功能,对用户屏蔽了大量分布式细节,让用户对分布式训练尽量无感(我们后文会有专门系列来分析PyTorch的分布式)。

1.2.2 PyTorch RPC

RPC 功能是在 PyTorch 1.5 版本中正式引入的,时间是2020-06-12,具体如下。

Distributed RPC framework APIs [Now Stable]

The torch.distributed.rpc package aims at supporting a wide range of distributed training paradigms that do not fit into DistributedDataParallel. Examples include parameter server training, distributed model parallelism, and distributed pipeline parallelism. Features in the torch.distributed.rpc package can be categorized into four main sets of APIs.

  • The RPC API allows running a function on a specified destination worker with given arguments and fetches the return value or creates a distributed reference to the return value.
  • The RRef (Remote REFerence) serves as a reference to an object on another worker. A worker holding an RRef can explicitly request copies of the object, and it can also share the light-weight RRef with other workers without worrying about reference counting. This is especially useful when multiple workers need to repeatedly access different versions of the same remote object.
  • With Distributed Autograd, applications can automatically compute gradients even if a model is split on multiple workers using RPC. This is achieved by stitching together local autograd graphs at RPC boundaries in the forward pass and reaching out to participants to transparently launch local autograd in the backward pass.
  • The Distributed Optimizer uses gradients computed by Distributed Autograd to update model parameters. Its constructor takes a local optimizer (e.g., SGD, Adagrad, etc.) and a list of parameter RRefs, and its step() function automatically uses the local optimizer to update parameters on all distinct RRef owner workers.

但是 PipeDream 论文是在 2019 年发布,这就意味着 PipeDream无法精准利用 PyTorch RPC,只能自己实现通信逻辑,即对计算图的支撑。

1.2.3 PipeDream的特性

其次看看PipeDream的特性:

  • PipeDream是把模型并行,数据并行结合在一起,实现了流水线并行。
  • PipeDream实际上是把一个完整的深度训练模型拆分开来,各个子模型(子图)分别放在不同节点之上。

1.2.4 结合起来

综合以上两点,这就意味着,对于PipeDream来说,单纯的 DDP,模型并行和 autograd 功能无法满足我们的需求,必须结合起来使用。

PipeDream需要自己实现至少:

  • 如何在多个阶段(节点)之间进行通信,这可能会使用到 PyTorch RPC 功能,但是因为开始时候没有稳定版本,只能自己实现一个分布式计算图,这样就用到了 PyTorch distributed 的 P2P 功能。
  • 因为通信需要,所以自己管理每个阶段(节点)的发送、接受rank,就是配置和管理各个阶段(节点)的生产者,消费者。这样也意味着需要找到每个阶段(节点)的输入,输出。
  • 因为 P2P 通信功能需要,所以需要给每个张量配置一个唯一的标识(对应下文的tag)。
  • 如何在单个阶段(若干节点)上进行数据并行,这应该会用到 PyTorch DDP 功能。
  • 因为用到数据并行,所以需要自己管理每个阶段的并行数目。
  • 因为需要结合模型并行和数据并行,所以需要自己管理进程工作组。
  • 因为在不同节点(机器)上运行,所以每个机器独立运行训练脚本时候,需要对自己训练job进行独立配置。

所以,下面我们结合这些功能点,做具体分析。

2.1 如何调用

通过源码中的样例我们可以看到,可以在多个节点上分别运行 main_with_runtime.py 脚本,每个脚本启动参数不同,比如 rank 就不同,这样各个节点之上就运行了不同的阶段所对应的模型。

python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 0 --local_rank 0 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 1 --local_rank 1 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 2 --local_rank 2 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 3 --local_rank 3 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo

2.2 总体逻辑

使用 runtime 的总体逻辑以如下文件为例 :runtime/translation/main_with_runtime.py。主要逻辑是:

  • 解析输入参数
  • 加载,生成模型
  • 依据模块来构建模型
  • 依据参数进行配置比如输入大小,batch size等
  • 遍历模型的每个层(跳过最后loss层)
    • 遍历每层的输入,构建输入张量
    • 通过调用stage对应的forward函数,构建出输出
    • 遍历每层的输出,设置其类型和形状
  • 构建输出值张量类型
  • 加载配置文件
  • 构建一个 StageRuntime
  • 建立 optimizer
  • 加载 dataset
  • 进行训练,保存checkpoint

总体代码如下:

def main():
    # 解析输入参数
    global args, best_prec1
    args = parser.parse_args()

    # Special case handling for GNMT model
    l2_promote()

    torch.cuda.set_device(args.local_rank)

    # build tokenizer
    tokenizer = Tokenizer(os.path.join(args.data_dir, config.VOCAB_FNAME))

    # define loss function
    criterion = build_gnmt_criterion(
        vocab_size=tokenizer.vocab_size, padding_idx=config.PAD, smoothing=0.1)

    # create stages of the model
    # 加载,生成模型
    module = importlib.import_module(args.module)
    args.arch = module.arch()
    # 依据模块来构建模型
    model = module.model(criterion)

    # 依据参数进行配置比如输入大小,batch size等
    input_size = [args.max_length_train, args.batch_size]
    training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
                              "input2": input_size, "target": [args.max_length_train * args.batch_size],
                              "target_length": [args.batch_size]}
    dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
              "target": torch.int64, "target_length": torch.int32}
    inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
    target_tensor_names = {"target", "target_length"}

    # 遍历模型的每个层(跳过最后loss层)
    for module_id, (stage, inputs, outputs) in enumerate(model[:-1]):  # Skip last layer (loss).
        input_tensors = []
        # 遍历每层的输入,构建输入张量
        for module_input in inputs:
            if module_input in inputs_module_destinations:
                inputs_module_destinations[module_input] = module_id

            input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
                                      dtype=dtypes[module_input])#.cuda()
            input_tensors.append(input_tensor)
        #stage.cuda()
        # PyTorch should not maintain metadata for a backward pass on
        # synthetic inputs. Without the following line, the runtime is
        # as much as 1.5x slower in a full DP configuration.
        with torch.no_grad():
            # 通过调用stage对应的forward函数,构建出输出
            output_tensors = stage(*tuple(input_tensors))
        if not type(output_tensors) is tuple:
            output_tensors = [output_tensors]
        # 遍历每层的输出,设置其类型和形状
        for output, output_tensor in zip(outputs,
                                         list(output_tensors)):
            # output 是 ['out2', 'out1']
            training_tensor_shapes[output] = list(output_tensor.size())
            dtypes[output] = output_tensor.dtype

    # 构建输出值张量类型
    eval_tensor_shapes = {}
    for key in training_tensor_shapes:
        eval_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])
        training_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])

    # 加载配置文件
    configuration_maps = {
        'module_to_stage_map': None,
        'stage_to_rank_map': None,
        'stage_to_depth_map': None
    }
    if args.config_path is not None:
        json_config_file = json.load(open(args.config_path, 'r'))
        configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
        configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
        configuration_maps['stage_to_rank_map'] = {
            int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
        configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)

    # 构建一个 StageRuntime
    r = runtime.StageRuntime(
        model=model, distributed_backend=args.distributed_backend,
        fp16=args.fp16, loss_scale=args.loss_scale,
        training_tensor_shapes=training_tensor_shapes,
        eval_tensor_shapes=eval_tensor_shapes,
        training_tensor_dtypes=dtypes,
        inputs_module_destinations=inputs_module_destinations,
        target_tensor_names=target_tensor_names,
        configuration_maps=configuration_maps,
        master_addr=args.master_addr,
        rank=args.rank, local_rank=args.local_rank,
        num_ranks_in_server=args.num_ranks_in_server,
        verbose_freq=args.verbose_frequency,
        model_type=runtime.TRANSLATION,
        enable_recompute=args.recompute)

    # stage needed to determine if current stage is the first stage
    # num_stages needed to determine if current stage is the last stage
    # num_ranks needed to determine number of warmup_minibatches in case of pipelining
    args.stage = r.stage
    args.num_stages = r.num_stages
    args.num_ranks = r.num_ranks
    if not is_first_stage():
        args.synthetic_data = True

    # define optimizer
    if args.no_input_pipelining:
        num_versions = 1
    else:
        # number of versions is the total number of machines following the current
        # stage, shared amongst all replicas in this stage
        num_versions = r.num_warmup_minibatches + 1

    # if specified, resume from checkpoint
    if args.resume:
        checkpoint_file_path = "%s.%d.pth.tar" % (args.resume, r.stage)
        assert os.path.isfile(checkpoint_file_path)
        print("=> loading checkpoint '{}'".format(checkpoint_file_path))
        checkpoint = torch.load(checkpoint_file_path)
        args.start_epoch = checkpoint['epoch']
        best_prec1 = checkpoint['best_prec1']
        r.load_state_dict(checkpoint['state_dict'])
        print("=> loaded checkpoint '{}' (epoch {})"
                .format(checkpoint_file_path, checkpoint['epoch']))

    # TODO: make this configurable by args
    # 建立 optimizer
    use_adam_optimizer = True
    if use_adam_optimizer:
        optimizer = adam.AdamWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, betas=(0.9,0.999),
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
            macrobatch=args.macrobatch)
    else:
        optimizer = sgd.SGDWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, momentum=args.momentum,
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)

    if args.resume:
        optimizer.load_state_dict(checkpoint['optimizer'])

    cudnn.benchmark = True

    # 加载 dataset
    train_dataset = LazyParallelDataset(
        src_fname=os.path.join(args.data_dir, config.SRC_TRAIN_FNAME),
        tgt_fname=os.path.join(args.data_dir, config.TGT_TRAIN_FNAME),
        tokenizer=tokenizer,
        min_len=args.min_length_train,
        max_len=args.max_length_train,
        sort=False,
        max_size=None)

    val_dataset = ParallelDataset(
        src_fname=os.path.join(args.data_dir, config.SRC_VAL_FNAME),
        tgt_fname=os.path.join(args.data_dir, config.TGT_VAL_FNAME),
        tokenizer=tokenizer,
        min_len=args.min_length_train,
        max_len=args.max_length_train,
        sort=True)

    distributed_sampler = False
    if configuration_maps['stage_to_rank_map'] is not None:
        num_ranks_in_first_stage = len(configuration_maps['stage_to_rank_map'][0])
        if num_ranks_in_first_stage > 1:
            distributed_sampler = True

    # TODO: fix random seeds
    train_loader = train_dataset.get_loader(
        batch_size=args.batch_size, seeds=range(args.epochs),
        batch_first=False, shuffle=True,
        bucketing=not args.no_bucketing, num_workers=args.workers,
        world_size=r.num_ranks_in_first_stage,
        rank=r.rank_in_stage if r.stage == 0 else 0
    )

    val_loader = val_dataset.get_loader(
        batch_size=args.batch_size, batch_first=False,
        shuffle=True, num_workers=args.workers,
        world_size=r.num_ranks_in_first_stage,
        seeds=range(args.epochs),
        rank=r.rank_in_stage if r.stage == 0 else 0
    )

    # if checkpoint is loaded, start by running validation
    if args.resume:
        assert args.start_epoch > 0
        validate(val_loader, r, args.start_epoch-1)

    # 进行训练,保存checkpoint
    for epoch in range(args.start_epoch, args.epochs):
        if distributed_sampler:
            train_loader.sampler.set_epoch(epoch)
        adjust_learning_rate(optimizer, epoch, args.epochs, r, args.lr_policy)

        # train or run forward pass only for one epoch
        if args.forward_only:
            validate(val_loader, r, epoch)
        else:
            train(train_loader, r, optimizer, epoch)

            # evaluate on validation set
            prec1 = validate(val_loader, r, epoch)
            if r.stage != r.num_stages: prec1 = 0

            # remember best prec@1 and save checkpoint
            best_prec1 = max(prec1, best_prec1)

            should_save_checkpoint = args.checkpoint_dir_not_nfs or r.rank_in_stage == 0
            if args.checkpoint_dir and should_save_checkpoint:
                save_checkpoint({
                    'epoch': epoch + 1,
                    'arch': args.arch,
                    'state_dict': r.state_dict(),
                    'best_prec1': best_prec1,
                    'optimizer' : optimizer.state_dict(),
                    'tokenizer': tokenizer.get_state()
                }, args.checkpoint_dir, r.stage, epoch)

我们先来看看如何加载模型。

3.1 模型文件

模型文件在上文中生成,所以这里加载模型文件,我们以 ../translation/models/gnmt/gpus=4/ 下的模型文件为例。

这里的__init__文件如下:

from .gnmt import GNMTSplit
from .stage0 import Stage0
from .stage1 import Stage1
from .stage2 import Stage2
from .stage3 import Stage3

def arch():
    return "gnmt"

def model(criterion):
    return [
        (Stage0(), ["input0", "input1"], ["out2", "out1"]),
        (Stage1(), ["out2", "input1", "input2", "out1"], ["out3", "out7"]),
        (Stage2(), ["out3", "out7"], ["out8", "out9", "out10"]),
        (Stage3(), ["out8", "out9", "out10"], ["out12"]),
        (criterion, ["out12"], ["loss"])
    ]

def full_model():
    return GNMTSplit()

具体每个 item 的格式如下:

(stage, inputs, outputs)

所以就需要按照这个格式来加载。

3.2 加载

具体加载方法如下:

# create stages of the model
module = importlib.import_module(args.module)
args.arch = module.arch()

得到module如下:

module = {module} <module 'translation.models.gnmt.gpus=4' from '../translation/models/gnmt/gpus=4/__init__.py'>
 GNMTSplit = {type} <class 'translation.models.gnmt.gpus=4.gnmt.GNMTSplit'>
 Stage0 = {type} <class 'translation.models.gnmt.gpus=4.stage0.Stage0'>
 Stage1 = {type} <class 'translation.models.gnmt.gpus=4.stage1.Stage1'>
 Stage2 = {type} <class 'translation.models.gnmt.gpus=4.stage2.Stage2'>
 Stage3 = {type} <class 'translation.models.gnmt.gpus=4.stage3.Stage3'>
 gnmt = {module} <module 'translation.models.gnmt.gpus=4.gnmt' from '../translation/models/gnmt/gpus=4/gnmt.py'>
 stage0 = {module} <module 'translation.models.gnmt.gpus=4.stage0' from '../translation/models/gnmt/gpus=4/stage0.py'>
 stage1 = {module} <module 'translation.models.gnmt.gpus=4.stage1' from '../translation/models/gnmt/gpus=4/stage1.py'>
 stage2 = {module} <module 'translation.models.gnmt.gpus=4.stage2' from '../translation/models/gnmt/gpus=4/stage2.py'>
 stage3 = {module} <module 'translation.models.gnmt.gpus=4.stage3' from '../translation/models/gnmt/gpus=4/stage3.py'>

3.3 构建模型

接下来会依据模块来构建模型。

model = module.model(criterion)

这里 criterion 是 LabelSmoothing() 。

在 model(criterion) 调用之中,会逐一调用 Stage0() ~ Stage3(),构建每个层。

比如Stage3 会调用到 __init__ 函数。

class Stage3(torch.nn.Module):
    def __init__(self):
        super(Stage3, self).__init__()
        self.layer5 = torch.nn.LSTM(2048, 1024)
        self.layer8 = Classifier(1024, 32320)

得到了model,具体如下。

model = {list: 5} 

0 = {tuple: 3}
 0 = {Stage0} Stage0(\n  (layer4): Embedding(32320, 1024, padding_idx=0)\n  (layer5): EmuBidirLSTM(\n    (bidir): LSTM(1024, 1024, bidirectional=True)\n    (layer1): LSTM(1024, 1024)\n    (layer2): LSTM(1024, 1024)\n  )\n  (layer6): Dropout(p=0.2, inplace=False)\n  (layer7): LSTM(2048, 1024)\n  (layer9): Dropout(p=0.2, inplace=False)\n)
 1 = {list: 2} ['input0', 'input1']
 2 = {list: 2} ['out2', 'out1']
 __len__ = {int} 3

1 = {tuple: 3}
 0 = {Stage1} Stage1(\n  (layer6): LSTM(1024, 1024)\n  (layer9): Embedding(32320, 1024, padding_idx=0)\n  (layer11): Dropout(p=0.2, inplace=False)\n  (layer12): LSTM(1024, 1024)\n  (layer15): RecurrentAttention(\n    (rnn): LSTM(1024, 1024)\n    (attn): BahdanauAttention(\n      (linear_q): Linear(in_features=1024, out_features=1024, bias=False)\n      (linear_k): Linear(in_features=1024, out_features=1024, bias=False)\n      (dropout): Dropout(p=0, inplace=False)\n    )\n    (dropout): Dropout(p=0, inplace=False)\n  )\n)
 1 = {list: 4} ['out2', 'input1', 'input2', 'out1']
 2 = {list: 2} ['out3', 'out7']
 __len__ = {int} 3

2 = {tuple: 3}
 0 = {Stage2} Stage2(\n  (layer7): Dropout(p=0.2, inplace=False)\n  (layer9): LSTM(2048, 1024)\n  (layer11): Dropout(p=0.2, inplace=False)\n  (layer13): LSTM(2048, 1024)\n  (layer16): Dropout(p=0.2, inplace=False)\n)
 1 = {list: 2} ['out3', 'out7']
 2 = {list: 3} ['out8', 'out9', 'out10']
 __len__ = {int} 3

3 = {tuple: 3}
 0 = {Stage3} Stage3(\n  (layer5): LSTM(2048, 1024)\n  (layer8): Classifier(\n    (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n  )\n)
 1 = {list: 3} ['out8', 'out9', 'out10']
 2 = {list: 1} ['out12']
 __len__ = {int} 3

4 = {tuple: 3} (LabelSmoothing(), ['out12'], ['loss'])
 0 = {LabelSmoothing} LabelSmoothing()
 1 = {list: 1} ['out12']
 2 = {list: 1} ['loss']
 __len__ = {int} 3

__len__ = {int} 5

3.4 输入输出

模型加载完之后,开始设置输入和输出,具体逻辑是:

  • 依据参数进行配置
  • 遍历模型的每个层(跳过最后loss层)做如下操作:
    • 遍历每层的输入,构建输入张量。
    • 通过调用stage对应的forward函数,构建出输出。
    • 遍历每层的输出,设置类型。
    • 构建张量形状。

需要注意的是每个子模块的格式如下:

(
Stage0(),
["input0", "input1"], # 输入
["out2", "out1"] # 输出
)

代码注释如下:

# 依据参数进行配置比如输入大小,batch size等
input_size = [args.max_length_train, args.batch_size]
training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
                          "input2": input_size, "target": [args.max_length_train * args.batch_size],
                          "target_length": [args.batch_size]}
dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
          "target": torch.int64, "target_length": torch.int32}
inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
target_tensor_names = {"target", "target_length"}

# 遍历模型的每个层(跳过最后loss层)
for module_id, (stage, inputs, outputs) in enumerate(model[:-1]):  # Skip last layer (loss).
    input_tensors = []
    # 遍历每层的输入,构建输入张量
    for module_input in inputs:
        if module_input in inputs_module_destinations:
            inputs_module_destinations[module_input] = module_id

        input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
                                  dtype=dtypes[module_input]).cuda()
        input_tensors.append(input_tensor)
    stage.cuda()
    # PyTorch should not maintain metadata for a backward pass on
    # synthetic inputs. Without the following line, the runtime is
    # as much as 1.5x slower in a full DP configuration.
    with torch.no_grad():
        # 通过调用stage对应的forward函数,构建出输出
        output_tensors = stage(*tuple(input_tensors))
    if not type(output_tensors) is tuple:
        output_tensors = [output_tensors]  

    # 遍历每层的输出,设置其类型和形状
    for output, output_tensor in zip(outputs,
                                     list(output_tensors)):
        # output 是 ['out2', 'out1']
        training_tensor_shapes[output] = list(output_tensor.size())
        dtypes[output] = output_tensor.dtype

    # 构建输出值张量类型
    eval_tensor_shapes = {}
    for key in training_tensor_shapes:
        eval_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])
        training_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])

得到了输出的形状和类型。

dtypes = {dict: 13}
 'input0' = {dtype} torch.int64
 'input1' = {dtype} torch.int64
 'input2' = {dtype} torch.int64
 'target' = {dtype} torch.int64
 'target_length' = {dtype} torch.int32
 'out2' = {dtype} torch.float32
 'out1' = {dtype} torch.float32
 'out3' = {dtype} torch.float32
 'out7' = {dtype} torch.float32
 'out8' = {dtype} torch.float32
 'out9' = {dtype} torch.float32
 'out10' = {dtype} torch.float32
 'out12' = {dtype} torch.float32
 __len__ = {int} 13

training_tensor_shapes = {dict: 13}
 'input0' = {tuple: 2} (50, 128)
 'input1' = {tuple: 1} 128
 'input2' = {tuple: 2} (50, 128)
 'target' = {tuple: 1} 6400
 'target_length' = {tuple: 1} 128
 'out2' = {tuple: 3} (50, 128, 1024)
 'out1' = {tuple: 3} (50, 128, 1024)
 'out3' = {tuple: 3} (50, 128, 1024)
 'out7' = {tuple: 3} (50, 128, 1024)
 'out8' = {tuple: 3} (50, 128, 1024)
 'out9' = {tuple: 3} (50, 128, 1024)
 'out10' = {tuple: 3} (50, 128, 1024)
 'out12' = {tuple: 3} (50, 128, 32320)
 __len__ = {int} 13

eval_tensor_shapes = {dict: 13} {
 'input0' = {tuple: 2} (50, 128)
 'input1' = {tuple: 1} 128
 'input2' = {tuple: 2} (50, 128)
 'target' = {tuple: 1} 6400
 'target_length' = {tuple: 1} 128
 'out2' = {tuple: 3} (50, 128, 1024)
 'out1' = {tuple: 3} (50, 128, 1024)
 'out3' = {tuple: 3} (50, 128, 1024)
 'out7' = {tuple: 3} (50, 128, 1024)
 'out8' = {tuple: 3} (50, 128, 1024)
 'out9' = {tuple: 3} (50, 128, 1024)
 'out10' = {tuple: 3} (50, 128, 1024)
 'out12' = {tuple: 3} (50, 128, 32320)
 __len__ = {int} 13

3.5 配置

加载上文生成的配置文件。

configuration_maps = {
    'module_to_stage_map': None,
    'stage_to_rank_map': None,
    'stage_to_depth_map': None
}
if args.config_path is not None:
    json_config_file = json.load(open(args.config_path, 'r'))
    configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
    configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
    configuration_maps['stage_to_rank_map'] = {
        int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
    configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)

对应的文件是:

{
    "module_to_stage_map": [0, 1, 2, 3, 3],
    "stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}

得到:

configuration_maps = {dict: 3}
 'module_to_stage_map' = {list: 5} [0, 1, 2, 3, 3]
 'stage_to_rank_map' = {dict: 4} {0: [0], 1: [1], 2: [2], 3: [3]}
 'stage_to_depth_map' = {NoneType} None
 __len__ = {int} 3

为了演示,我们这里用如下参数进行启动 main_with_runtime.py。

--module translation.models.gnmt.gpus=4 --data_dir=wmt16_ende_data_bpe_clean   --config_path pipedream-pipedream/runtime/translation/models/gnmt/gpus=4/mp_conf.json --local_rank 3 --rank 3 --master_addr 127.0.0.1

在main函数中用如下办法构建了Runtime。Runtime是执行引擎,提供一个统一的、可扩展的基础设施层。

r = runtime.StageRuntime(
    model=model, distributed_backend=args.distributed_backend,
    fp16=args.fp16, loss_scale=args.loss_scale,
    training_tensor_shapes=training_tensor_shapes,
    eval_tensor_shapes=eval_tensor_shapes,
    training_tensor_dtypes=dtypes,
    inputs_module_destinations=inputs_module_destinations,
    target_tensor_names=target_tensor_names,
    configuration_maps=configuration_maps,
    master_addr=args.master_addr,
    rank=args.rank, local_rank=args.local_rank,
    num_ranks_in_server=args.num_ranks_in_server,
    verbose_freq=args.verbose_frequency,
    model_type=runtime.TRANSLATION,
    enable_recompute=args.recompute)

4.1 StageRuntime

StageRuntime定义如下,可以看到其主要成员变量为在此stage内部进行前向后向操作所需要的元数据,比如:

张量,梯度,分布式后端,loss scale,训练数据的张量类型,输出值张量形状等等。

class StageRuntime:
    def __init__(self, model, distributed_backend, fp16, loss_scale,
                 training_tensor_shapes, eval_tensor_shapes,
                 training_tensor_dtypes, inputs_module_destinations,
                 target_tensor_names, configuration_maps, master_addr,
                 rank, local_rank, num_ranks_in_server, verbose_freq,
                 model_type, enable_recompute=False):
        # Metadata needed for forward and backward pass within this stage.
        self.tensors = []
        self.gradients = {}
        self.distributed_backend = distributed_backend
        self.fp16 = fp16
        self.loss_scale = loss_scale
        self.training_tensor_shapes = training_tensor_shapes
        self.eval_tensor_shapes = eval_tensor_shapes
        self.training_tensor_dtypes = training_tensor_dtypes
        self.model_type = model_type
        self.target_tensor_names = target_tensor_names

        self.initialize(model, inputs_module_destinations, configuration_maps,
                        master_addr, rank, local_rank, num_ranks_in_server)

        self.verbose_freq = verbose_freq
        self.forward_only = False

        self.forward_stats = runtime_utilities.RuntimeStats(forward=True)
        self.backward_stats = runtime_utilities.RuntimeStats(forward=False)

        # Enable recomputation to prevent the need to save activations
        # computed from the forward pass for the backward pass.
        self.enable_recompute = enable_recompute

        # Disable recomputation for the last stage.
        if rank == num_ranks_in_server - 1:
            self.enable_recompute = False

4.2 初始化

初始化函数代码很长,我们逐段进行分析。

4.2.1 设置tag

在函数开始,遍历模型每一层的输入和输出,设置 tensor_tag,就是给每一个tensor一个独立唯一的tag,tag经过层层传递,最终会用到 distributed_c10d.py 中的 recv 函数。tensor_tag 会在通信过程中被使用,被p2p用作确定标识。

def recv(tensor,
         src=None,
         group=None,
         tag=0):
    """
    Receives a tensor synchronously.

    Args:
        tensor (Tensor): Tensor to fill with received data.
        src (int, optional): Source rank. Will receive from any
            process if unspecified.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used.
        tag (int, optional): Tag to match recv with remote send

    Returns:
        Sender rank
        -1, if not part of the group

    """
    _check_single_tensor(tensor, "tensor")
    if _rank_not_in_group(group):
        return -1

    if group is None:
        pg = _get_default_group()
    else:
        pg = group

    if src is None:
        work = pg.recv_anysource([tensor], tag)
        work.wait()
        src_rank = work._source_rank()
        if group is None or group is GroupMember.WORLD:
            return src_rank
        else:
            return _get_global_rank(pg, src_rank)
    else:
        if group is None or group is GroupMember.WORLD:
            pg.recv([tensor], src, tag).wait()
        else:
            group_src_rank = _get_group_rank(pg, src)
            pg.recv([tensor], group_src_rank, tag).wait()
        return src

具体设置 tag 的代码如下:

def initialize(self, model, inputs_module_destinations,
               configuration_maps, master_addr, rank,
               local_rank, num_ranks_in_server):
    self.send_ranks = {}
    self.receive_ranks = {}
    self.rank = rank
    self.local_rank = local_rank
    self.stage = None
    self.tensor_tags = {}
    self.forward_minibatch_id = 0
    self.backward_minibatch_id = 0
    self.criterion_input_name = str(model[-1][1][0])

    tensor_tag = 1
    # 遍历模型中每一层,每一层的格式是 (_, input_tensors, output_tensors)
    for (_, input_tensors, output_tensors) in model:
        # 遍历输入
        for input_tensor in input_tensors:
            if input_tensor not in self.tensor_tags:
                self.tensor_tags[input_tensor] = tensor_tag
                tensor_tag += 1 # 设置 tag
        # 遍历输出
        for output_tensor in output_tensors:
            if output_tensor not in self.tensor_tags:
                self.tensor_tags[output_tensor] = tensor_tag
                tensor_tag += 1 # 设置 tag

    for target_tensor_name in sorted(self.target_tensor_names):
        self.tensor_tags[target_tensor_name] = tensor_tag
        tensor_tag += 1 # 设置 tag
    self.tensor_tags["ack"] = tensor_tag
    tensor_tag += 1 # 设置 tag

输入是:

target_tensor_names = {set: 2} {'target_length', 'target'}
 {str} 'target_length'
 {str} 'target'
 __len__ = {int} 2

model = {list: 5}
  0 = {Stage0} Stage0(\n  (layer4): Embedding(32320, 1024, padding_idx=0)\n  (layer5): EmuBidirLSTM(\n    (bidir): LSTM(1024, 1024, bidirectional=True)\n    (layer1): LSTM(1024, 1024)\n    (layer2): LSTM(1024, 1024)\n  )\n  (layer6): Dropout(p=0.2, inplace=False)\n  (layer7): LS
  1 = {list: 2} ['input0', 'input1']
  2 = {list: 2} ['out2', 'out1']
  __len__ = {int} 3

 1 = {tuple: 3}
  0 = {Stage1} Stage1(\n  (layer6): LSTM(1024, 1024)\n  (layer9): Embedding(32320, 1024, padding_idx=0)\n  (layer11): Dropout(p=0.2, inplace=False)\n  (layer12): LSTM(1024, 1024)\n  (layer15): RecurrentAttention(\n    (rnn): LSTM(1024, 1024)\n    (attn): BahdanauAttention(\n      (linear_q): Linear(in_features=1024, out_features=1024, bias=False)\n      (linear_k): Linear(in_features=1024, out_features=1024, bias=False)\n      (dropout): Dropout(p=0, inplace=False)\n    )\n    (dropout): Dropout(p=0, inplace=False)\n  )\n)
  1 = {list: 4} ['out2', 'input1', 'input2', 'out1']
  2 = {list: 2} ['out3', 'out7']
  __len__ = {int} 3

 2 = {tuple: 3}
  0 = {Stage2} Stage2(\n  (layer7): Dropout(p=0.2, inplace=False)\n  (layer9): LSTM(2048, 1024)\n  (layer11): Dropout(p=0.2, inplace=False)\n  (layer13): LSTM(2048, 1024)\n  (layer16): Dropout(p=0.2, inplace=False)\n)
  1 = {list: 2} ['out3', 'out7']
  2 = {list: 3} ['out8', 'out9', 'out10']
  __len__ = {int} 3

 3 = {tuple: 3}
  0 = {Stage3} Stage3(\n  (layer5): LSTM(2048, 1024)\n  (layer8): Classifier(\n    (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n  )\n)
  1 = {list: 3} ['out8', 'out9', 'out10']
  2 = {list: 1} ['out12']
  __len__ = {int} 3

 4 = {tuple: 3}
  0 = {LabelSmoothing} LabelSmoothing()
  1 = {list: 1} ['out12']
  2 = {list: 1} ['loss']
  __len__ = {int} 3
 __len__ = {int} 5

得到:

tensor_tags = {dict: 15}
 'input0' = {int} 1
 'input1' = {int} 2
 'out2' = {int} 3
 'out1' = {int} 4
 'input2' = {int} 5
 'out3' = {int} 6
 'out7' = {int} 7
 'out8' = {int} 8
 'out9' = {int} 9
 'out10' = {int} 10
 'out12' = {int} 11
 'loss' = {int} 12
 'target' = {int} 13
 'target_length' = {int} 14
 'ack' = {int} 15
 __len__ = {int} 15

4.2.2 配置map

回忆一下配置文件中的部分定义:

  • module_to_stage_map 就是 :本模型被划分为哪些stage。
  • stage_to_rank_map 就是 :每个stage对应了哪些rank,rank 就代表了具体的 worker 进程,比如本stage被几个rank进行数据并行。

我们给出一个样例,对应的文件内容如下:

{
    "module_to_stage_map": [0, 1, 2, 2],
    "stage_to_rank_map": {"0": [0, 1, 4, 5, 8, 9, 12, 13], "1": [2, 6, 10, 14], "2": [3, 7, 11, 15]}
}

针对我们本文的模型,配置文件如下:

{
    "module_to_stage_map": [0, 1, 2, 3, 3],
    "stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}

加载到内存中为:

module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]
rank_to_stage_map = {dict: 4} {0: 0, 1: 1, 2: 2, 3: 3}

因为有时候也需要反过来查找,所以程序接下来进行反向配置,得到如下。

stage_to_module_map = {defaultdict: 4}
 default_factory = {type} <class 'list'>
 0 = {list: 1} [0]
 1 = {list: 1} [1]
 2 = {list: 1} [2]
 3 = {list: 2} [3, 4]
 __len__ = {int} 4
stage_to_rank_map = {dict: 4}
 0 = {list: 1} [0]
 1 = {list: 1} [1]
 2 = {list: 1} [2]
 3 = {list: 1} [3]
 __len__ = {int} 4

4.2.3 找到自己的配置

因为在命令行设置了本地的 local_rank 和 rank,所以接下来runtime从配置文件中依据rank找到自己的东西,对自己进一步做配置。

stage_to_module_map = collections.defaultdict(list)
for module in range(len(module_to_stage_map)):
    # 这里配置了哪个stage拥有哪些module
    stage_to_module_map[module_to_stage_map[module]].append(module)

rank_to_stage_map = {}
for stage in stage_to_rank_map:
    for rank in stage_to_rank_map[stage]:
      # 配置了哪个 rank 拥有哪些 stage
        rank_to_stage_map[rank] = stage

# Now, use this mapping to determine the modules contained in
# each stage.
assert 0 <= self.rank < len(rank_to_stage_map)
self.num_ranks = len(rank_to_stage_map) # 就是得到了world_size,因为有多少个rank,就是有多少个训练进程,就是world size
self.num_stages = len(stage_to_module_map) # 多少个阶段
self.stage = rank_to_stage_map[self.rank] # 通过自己的rank得到自己的stage
self.rank_in_stage = stage_to_rank_map[self.stage].index(self.rank)  # 本rank在stage之中排第几个
self.num_ranks_in_stage = len(stage_to_rank_map[self.stage])#得到自己stage的rank数目,就是数据并行数目,可以得到本层的数据并行次数
self.num_ranks_in_first_stage = len(stage_to_rank_map[0])
self.num_ranks_in_previous_stage = 0
self.ranks_in_previous_stage = []
if self.stage > 0:
    self.num_ranks_in_previous_stage = len(
        stage_to_rank_map[self.stage - 1])
    self.ranks_in_previous_stage = stage_to_rank_map[self.stage - 1]
self.num_ranks_in_next_stage = 0
self.ranks_in_next_stage = []
if self.stage < self.num_stages - 1:
    self.num_ranks_in_next_stage = len(
        stage_to_rank_map[self.stage + 1])
    self.ranks_in_next_stage = stage_to_rank_map[self.stage + 1]

modules = stage_to_module_map[self.stage] # 这里得到 [3,4],后续会用到。

self.modules_with_dependencies = ModulesWithDependencies(
    [model[module] for module in modules])
self.is_criterion = self.stage == (self.num_stages - 1)
if stage_to_depth_map is not None:
    self.num_warmup_minibatches = stage_to_depth_map[
        str(self.stage)]
else:
    self.num_warmup_minibatches = self.num_ranks - 1
    for i in range(self.stage):
        self.num_warmup_minibatches -= len(
            stage_to_rank_map[i])
    self.num_warmup_minibatches = self.num_warmup_minibatches // \
        self.num_ranks_in_stage

变量为:

self = {StageRuntime}
 backward_minibatch_id = {int} 0
 criterion_input_name = {str} 'out12'
 distributed_backend = {NoneType} None
 eval_tensor_shapes = {dict: 13} {'input0': (50, 128), 'input1': (128,), 'input2': (50, 128), 'target': (6400,), 'target_length': (128,), 'out2': (50, 128, 1024), 'out1': (50, 128, 1024), 'out3': (50, 128, 1024), 'out7': (50, 128, 1024), 'out8': (50, 128, 1024), 'out9': (50, 128, 1024), '
 forward_minibatch_id = {int} 0
 fp16 = {bool} False
 gradients = {dict: 0} {}
 is_criterion = {bool} True
 local_rank = {int} 3
 loss_scale = {int} 1
 model_type = {str} 'translation'
 modules_with_dependencies = {ModulesWithDependencies}
  _all_input_names = {list: 2} [['out8', 'out9', 'out10'], ['out12']]
  _all_output_names = {list: 2} [['out12'], ['loss']]
  _modules = {list: 2}
   0 = {Stage3} Stage3(\n  (layer5): LSTM(2048, 1024)\n  (layer8): Classifier(\n    (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n  )\n)
   1 = {LabelSmoothing} LabelSmoothing()
   __len__ = {int} 2
 num_ranks = {int} 4
 num_ranks_in_first_stage = {int} 1
 num_ranks_in_next_stage = {int} 0
 num_ranks_in_previous_stage = {int} 1
 num_ranks_in_stage = {int} 1
 num_stages = {int} 4
 num_warmup_minibatches = {int} 0
 rank = {int} 3
 rank_in_stage = {int} 0
 ranks_in_next_stage = {list: 0} []
 ranks_in_previous_stage = {list: 1} [2]
 receive_ranks = {dict: 0} {}
 send_ranks = {dict: 0} {}
 stage = {int} 3
 target = {str} 'python-ce/helpers/pydev/_pydevd_bundle/pydevd_resolver.py", line 178, in _getPyDictionary\n    attr = getattr(var, n)\n  File "../runtime.py", line 295, in target\n    r
 target_tensor_names = {set: 2} {'target', 'target_length'}
 tensor_tags = {dict: 15} {'input0': 1, 'input1': 2, 'out2': 3, 'out1': 4, 'input2': 5, 'out3': 6, 'out7': 7, 'out8': 8, 'out9': 9, 'out10': 10, 'out12': 11, 'loss': 12, 'target': 13, 'target_length': 14, 'ack': 15}
 tensors = {list: 0} []
 training_tensor_dtypes = {dict: 13} {'input0': torch.int64, 'input1': torch.int64, 'input2': torch.int64, 'target': torch.int64, 'target_length': torch.int32, 'out2': torch.float32, 'out1': torch.float32, 'out3': torch.float32, 'out7': torch.float32, 'out8': torch.float32, 'out9': torch.floa
 training_tensor_shapes = {dict: 13} {'input0': (50, 128), 'input1': (128,), 'input2': (50, 128), 'target': (6400,), 'target_length': (128,), 'out2': (50, 128, 1024), 'out1': (50, 128, 1024), 'out3': (50, 128, 1024), 'out7': (50, 128, 1024), 'out8': (50, 128, 1024), 'out9': (50, 128, 1024), '

我们看看几个变量如何使用。

4.2.3.1 num_ranks

首先,看看 num_ranks 如何使用。在后续代码中有使用,比如:

world_size=self.num_ranks # 依据 num_ranks 得到 world_size

self.num_warmup_minibatches = self.num_ranks - 1 # 依据 num_ranks 得到热身batch数目
4.2.3.2 rank_in_stage

其次,再看看 rank_in_stage 如何使用?

前面有

self.rank_in_stage = stage_to_rank_map[self.stage].index(self.rank)  # 本rank在stage之中排第几个

rank_in_stage 会传递给 Comm 模块。

            self.comm_handler.initialize(
                self.receive_ranks,
                self.send_ranks,
                self.tensor_tags,
                self.target_tensor_names,
                self.training_tensor_dtypes,
                self.rank_in_stage, # 在这里作为参数传入,在函数里面代表本节点,后续会详细介绍
                self.num_ranks_in_stage,
                self.ranks_in_previous_stage,
                self.ranks_in_next_stage)

4.2.4 设置通信模块

接下来对通信模块进行配置。

# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
master_port = 12345
self.comm_handler = communication.CommunicationHandler(
    master_addr=master_addr,
    master_port=master_port,
    rank=self.rank,
    local_rank=self.local_rank,
    num_ranks_in_server=num_ranks_in_server,
    world_size=self.num_ranks,
    fp16=self.fp16,
    backend=self.distributed_backend)

配置代码如下,构建了 CommunicationHandler,这个模块是为后续“设置生产者和消费者”服务的,所以我们暂时把后续代码一起放在这里。

        else:
            ......
            # To determine where tensors should be sent and received, first
            # determine the "producing" and "consuming" module IDs of each
            # tensor. We then use the corresponding machine ranks to send
            # and receive tensors.
            master_port = 12345
            self.comm_handler = communication.CommunicationHandler(
                master_addr=master_addr,
                master_port=master_port,
                rank=self.rank,
                local_rank=self.local_rank,
                num_ranks_in_server=num_ranks_in_server,
                world_size=self.num_ranks,
                fp16=self.fp16,
                backend=self.distributed_backend)

            # 设置生产者和消费者部分,我们下面会详细分析
            # 设置接受ranks
            for i in range(len(model)): # 遍历层
                for j in range(i+1, len(model)): # 遍历 i 层之后的若干层
                    for tensor_name in model[i][2]:  # 找出前面层 output 的tensor
                        if tensor_name in model[j][1]: # 看看 output 在不在input之中
                            if module_to_stage_map[i] == \
                                module_to_stage_map[j]:
                                continue
                            # For now, assume that each stage is served by only
                            # a single machine.
                            if module_to_stage_map[j] == self.stage:
                                self.receive_ranks[tensor_name] = \
                                    stage_to_rank_map[module_to_stage_map[i]]
                            if module_to_stage_map[i] == self.stage:
                                self.send_ranks[tensor_name] = \
                                    stage_to_rank_map[module_to_stage_map[j]]

            # 设置发送ranks
            for model_inputs in inputs_module_destinations.keys():
                destination_stage = module_to_stage_map[
                    inputs_module_destinations[model_inputs]]

                if destination_stage > self.stage:
                    self.send_ranks[model_inputs] = \
                        self.ranks_in_next_stage

                if 0 < self.stage <= destination_stage:
                    self.receive_ranks[model_inputs] = \
                        self.ranks_in_previous_stage

                if destination_stage > 0:
                    if model_inputs not in self.tensor_tags:
                        self.tensor_tags[model_inputs] = tensor_tag
                        tensor_tag += 1

4.2.5 设置生产者和消费者

接下来对发送,接受的rank进行配置,receive_ranks 和 send_ranks 就是在本阶段各个张量对应的发送,接收目标 rank。

前面已经提到,在 PipeDream开发时候,PyTorch 并没有发布稳定的RPC,所以 PipeDream (2019年发布论文)只能自己实现一套通信逻辑关系,或者说是分布式计算图。生产者和消费者就是分布式计算图的重要组成部分。

逻辑抽象如下:

  • 遍历模型的 model,假定是 model [i],注意,这里的 model[i] 是具体的 layer。一个stage可以包括多个layer,比如 [layer1, layer 2, layer3],这个stage又可以在多个rank上进行数据并行,比如 rank 1 和 rank 2 都会运行 [layer1, layer 2, layer3]。

  • 对于每个model [i],遍历model [i] 之后的model,假定是 model [j]。

  • 对于model [i] 的输出进行遍历,假定是 tensor_name。

    • 如果 tensor_name 也在 modle[j] 的输入之中,即 tensor_name即在 model[i] 的输出,也在 module[j]的输入,就说明他们之间可以建立联系。因为如果一个 张量只有输入或者只有输出,就不需要为这个张量建立任何 通信机制。

      • 如果 model[i] 和 modle[j] 在同一个stage 之中,就是同一个节点 或者 若干节点但是用 DDP 控制,这样就用不到 通信机制。
      • 如果 tensor_name 是 modle[j]的输入,且module[j] 位于本节点上,说明本节点的 receive_ranks 就包括 module[j] 的输入(当然也可能包括其他model的输入)。
        • 所以tensor_name的输入rank包括model[j] 对应的rank。
      • tensor_name 是module[i] 的输出,且 module[i] 位于本节点上,说明 本节点的 send_ranks 就包括 module[i] 的输出(当然也可能包括其他model的输出)。
        • 所以tensor_name的输出rank包括 model[i] 对应的rank。

具体代码如下:

            # To determine where tensors should be sent and received, first
            # determine the "producing" and "consuming" module IDs of each
            # tensor. We then use the corresponding machine ranks to send
            # and receive tensors.

            for i in range(len(model)): # 遍历层
                for j in range(i+1, len(model)): # 遍历 i 层之后的若干层
                    for tensor_name in model[i][2]: # 找出前面层 output 的tensor
                        if tensor_name in model[j][1]: # 看看 tensor_name 在不在input之中,即tensor_name 是不是 modle[j]的输入
                            # tensor_name即在 model[i] 的输出,也在 module[j]的输入,就说明他们之间可以建立联系
                            if module_to_stage_map[i] == \
                                module_to_stage_map[j]: # 两个module在一个node上,不用通信机制
                                continue
                            # For now, assume that each stage is served by only
                            # a single machine.
                            # tensor_name 是 modle[j]的输入,且module[j]位于本节点上,说明可以和本节点的 receive_ranks 建立联系
                            if module_to_stage_map[j] == self.stage:
                                # 所以tensor_name的输入rank包括rank i
                                self.receive_ranks[tensor_name] = \
                                    stage_to_rank_map[module_to_stage_map[i]]
                            # tensor_name 是module[i]的输出,且module[i]位于本节点上,说明可以和本节点的 send_ranks 建立联系
                            if module_to_stage_map[i] == self.stage:
                                # 所以tensor_name的输出rank包括rank j
                                self.send_ranks[tensor_name] = \
                                    stage_to_rank_map[module_to_stage_map[j]]

            for model_inputs in inputs_module_destinations.keys():
                destination_stage = module_to_stage_map[
                    inputs_module_destinations[model_inputs]]
                if destination_stage > self.stage:
                    self.send_ranks[model_inputs] = \
                        self.ranks_in_next_stage

                if 0 < self.stage <= destination_stage:
                    self.receive_ranks[model_inputs] = \
                        self.ranks_in_previous_stage

                if destination_stage > 0:
                    if model_inputs not in self.tensor_tags:
                        self.tensor_tags[model_inputs] = tensor_tag
                        tensor_tag += 1

得到变量如下:

num_ranks = {int} 4
num_ranks_in_first_stage = {int} 1
num_ranks_in_next_stage = {int} 0
num_ranks_in_previous_stage = {int} 1
num_ranks_in_stage = {int} 1
num_stages = {int} 4
num_warmup_minibatches = {int} 0
rank = {int} 3
rank_in_stage = {int} 0
ranks_in_next_stage = {list: 0} []
ranks_in_previous_stage = {list: 1} [2]
receive_ranks = {dict: 3}  # 这里就是每个tensor对应的接收目标rank
 'out8' = {list: 1} [2]
 'out9' = {list: 1} [2]
 'out10' = {list: 1} [2]
 __len__ = {int} 3
send_ranks = {dict: 0} {} # 这里就是每个tensor对应的发送目标rank
 __len__ = {int} 0
stage = {int} 3

4.2.6 设置module

接下来会处理module相关操作,这里具体会:

  • 首先使用 ModulesWithDependencies 对模型进行继续处理,把输入,输出配置出来。
  • 然后调用 cuda 把模型和参数移动到 GPU。
  • 如果需要进行处理,针对 fp16 进行转换。

关于 ModulesWithDependencies 部分,我们重点说明。

之前我们代码中有如下,就是得倒本stage对应的modules index。

modules = stage_to_module_map[self.stage] # 这里得到 [3,4],后续会用到。

stage_to_module_map 就是设置 stage 到 modules 的关系,目的是为了得到本stage所对应的modules。

回忆一下配置文件,本stage(数值为 3)对应的是 index 为 3,4 的两个 module,就是下面的 3 ,3

module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]

接下来要通过如下代码拿到本stage具体的modules,包括每个module的输入,输出。

    modules = self.modules_with_dependencies.modules()
    for i in range(len(modules)):
        modules[i] = modules[i].cuda()
        if self.fp16:
            import apex.fp16_utils as fp16_utils
            modules[i] = fp16_utils.BN_convert_float(modules[i].half())

运行之后如下

modules = {list: 2}
 0 = {Stage3} Stage3(\n  (layer5): LSTM(2048, 1024)\n  (layer8): Classifier(\n    (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n  )\n)
 1 = {LabelSmoothing} LabelSmoothing()
 __len__ = {int} 2

具体 ModulesWithDependencies 如下:

class ModulesWithDependencies:
    def __init__(self, modules_with_dependencies):
        self._modules = []
        self._all_input_names = []
        self._all_output_names = []
        for (module, input_names, output_names) in modules_with_dependencies:
            self._modules.append(module)
            self._all_input_names.append(input_names)
            self._all_output_names.append(output_names)

    def modules(self):
        return self._modules

    def all_input_names(self):
        return self._all_input_names

    def all_output_names(self):
        return self._all_output_names

    def is_input_tensor(self, tensor_name):
        for module_input_names in self._all_input_names:
            if tensor_name in module_input_names:
                return True
        return False

4.2.7 设置group

接下来针对每个stage的并行数目,建立group。

ranks就是每个stage的并行 rank,比如 stage 0 对应的就是 [0, 1, 2]。

{
    "module_to_stage_map": [0, 1, 1],
    "stage_to_rank_map": {"0": [0, 1, 2], "1": [3]} # 每个stage的rank,这里目的是得到并行的机器
}

遍历stage,针对每个stage,调用new_group() 建立进程组。new_group() 函数使用所有进程的任意子集来创建新的进程组,该方法返回一个分组句柄,可作为 collectives (用于特定编程模式中的信息交换)相关分布式函数的 group 参数 。

这里就是最开始问题中提到的:为了数据并行,每个stage都需要建立并且管理自己的进程组。

        # Initialize all groups in the same order on every worker.
        if stage_to_rank_map is not None:
            groups = []
            for stage in range(self.num_stages): # 遍历stage
                ranks = stage_to_rank_map[stage] # 与stage的数据并行对应,比如得到 [0, 1, 2]
                if len(ranks) > 1: # 与后面的 ddp 相对应
                    groups.append(dist.new_group(ranks=ranks))
                else:
                    groups.append(None)
            group = groups[self.stage]
        else:
            group = None

4.2.8 设置数据并行

最后调用 DistributedDataParallel 进行处理。这里参数 process_group=group 就是前面 “设定group” 返回的。

就是针对每一个group建立一套 DistributedDataParallel。

# self.modules_with_dependencies contains a list of PyTorch
# modules, along with a list of user-defined input and output
# tensor names. We use our module_executor.ModuleExecutor
# class to wrap these dependencies, and use run_forward and
# run_backward methods downstream.
num_parameters = 0
for i in range(len(modules)):
    if group is not None:
        if ((i < (len(modules)-1) and self.is_criterion)
            or not self.is_criterion):
            num_parameters += \
                sum(x.size()[0] * x.size()[1]
                    if len(x.size()) > 1 else x.size()[0]
                    for x in modules[i].parameters() if x.size())

            # 建立分布式数据并行
            modules[i] = torch.nn.parallel.DistributedDataParallel(
                modules[i],
                process_group=group,
                device_ids=[local_rank],
                output_device=local_rank)
if self.num_ranks_in_stage > 1:
    module_size = 4. * num_parameters
    print("Replicating stage: ranks=%d, module_size=%.3f" % (
        self.num_ranks_in_stage, module_size))

关于 DistributedDataParallel,我们以后有专门系列会进行分析。

4.2.9 初始化通信函数

最后,针对这个通信模块,进行初始化。

        if self.comm_handler is not None:
            self.comm_handler.initialize(
                self.receive_ranks,
                self.send_ranks,
                self.tensor_tags,
                self.target_tensor_names,
                self.training_tensor_dtypes,
                self.rank_in_stage,
                self.num_ranks_in_stage,
                self.ranks_in_previous_stage,
                self.ranks_in_next_stage)

我们还是使用论文中的图片为例来看看运行时引擎初始化之后的结果:

如果针对本文再细化,则是:

                                         +----------------------------------------+
                                         | Stage 2                   StageRuntime |
                                         |                                        |
                                         |           CommunicationHandler         |
                                         |                                        |
                                         |      +----------------------------+    |
                                         |      | +------------------------+ |    |
                                         |      | |Rank 2                  | |    |
                                         |      | |                        | |    |
                                         |      | |                        | |    |
+-----------------------------+          |      | |  Layer 3 +---> Layer 4 | |    |
| Stage 1        StageRuntime |          |      | |                        | |    |       +---------------------------+
|                             |          |      | |                        | |    |       | Stage 3      StageRuntime |
|                             |          |      | +------------------------+ |    |       |                           |
|     CommunicationHandler    |          |      | +------------------------+ |    |       |   CommunicationHandler    |
|                             |          |      | |Rank 3                  | |    |       |                           |
|  +-----------------------+  |          | DDP  | |                        | |    |       | +-----------------------+ |
|  |Rank 1                 |  +---------------->+ |                        | +----------> | | Rank 4                | |
|  |                       |  |          |      | |  Layer 3 +---> Layer 4 | |    |       | |                       | |
|  | Layer 1 +---> Layer 2 |  |          |      | |                        | |    |       | | Layer 5 +---> Layer 6 | |
|  |                       |  |          |      | |                        | |    |       | |                       | |
|  |                       |  |          |      | +------------------------+ |    |       | |                       | |
|  +-----------------------+  |          |      | +------------------------+ |    |       | +-----------------------+ |
|                             |          |      | |Rank 4                  | |    |       |                           |
|                             |          |      | |                        | |    |       |                           |
+-----------------------------+          |      | |                        | |    |       +---------------------------+
                                         |      | |  Layer 3 +---> Layer 4 | |    |
                                         |      | |                        | |    |
                                         |      | |                        | |    |
                                         |      | +------------------------+ |    |
                                         |      +----------------------------+    |
                                         +----------------------------------------+

手机如下:

4.3 功能函数

我们这里只是介绍基础功能函数。另外有几个业务功能函数,比如 run_forward 会在1F1B文章中一并介绍。

以下这几个功能函数都是调用通讯模块完成功能。

4.3.1 receive_tensors_forward

receive_tensors_forward 就是在前向传播中,从前面层获取张量。

前向传播中,张量记录在本实例的 self.tensors 之中。

    def receive_tensors_forward(self):
        if self.forward_only and len(self.tensors) > 0:
            self.tensors.pop(0) # 弹出以前
        self.tensors.append({})
        if self.loader_iter is not None: # 前向传播第一层,需要加载数据
            input = next(self.loader_iter) # 加载新的
            if self.model_type == TRANSLATION:
                (input, target) = input
                src, src_length = input
                tgt, tgt_length = target

                self.tensors[-1]["input0"] = src.cuda(non_blocking=True)
                self.tensors[-1]["input1"] = torch.LongTensor(src_length).cuda(
                    non_blocking=True)
                self.tensors[-1]["input2"] = tgt[:-1].cuda(non_blocking=True)
                self.tensors[-1]["target"] = tgt[1:].cuda().contiguous().view(-1)
                self.tensors[-1]["target_length"] = \
                    torch.tensor([int(sum(torch.LongTensor(tgt_length) - 1))],
                                 dtype=torch.int).cuda()
            elif self.model_type == IMAGE_CLASSIFICATION:
                (input, target) = input
                if self.fp16:
                    input = input.half()
                self.tensors[-1]["input0"] = input.cuda(non_blocking=True)
                self.tensors[-1]["target"] = target.cuda(non_blocking=True)
            elif self.model_type == SPEECH_TO_TEXT:
                input, target, input_percentages, target_sizes = input
                input_sizes = input_percentages.mul_(int(input.size(3))).int()
                self.tensors[-1]["input0"] = input.cuda(non_blocking=True)
                self.tensors[-1]["input1"] = input_sizes.cuda(non_blocking=True)
                self.tensors[-1]["target"] = target.cuda(non_blocking=True)
                self.tensors[-1]["target_length"] = target_sizes.cuda(
                    non_blocking=True)
        else:
            # Receive all required tensors from upstream machines.
            for input_name in self.receive_ranks: # 遍历本stage对应的接受rank,从前面层获取
                if input_name == "ack":
                    continue

                self.tensors[-1][input_name] = \
                    self.comm_handler.recv(
                        input_name,
                        forward_minibatch_id=self.forward_minibatch_id,
                        backward_minibatch_id=self.backward_minibatch_id,
                        backward=False)

                self.forward_stats.stats['receive_tensors_size'] += \
                    (self.tensors[-1][input_name].element_size() *
                     self.tensors[-1][input_name].nelement())

            # Used to track where to receive forward from.
            self.comm_handler.increment_messaging_index(
                sending=False)

4.3.2 send_tensors_forward

send_tensors_forward就是在前向传播中,向后面层发送张量。

    def send_tensors_forward(self):
        # Send all required tensors downstream.
        for output_name in self.send_ranks:  # 遍历本stage对应的发送rank,进行发送
            if output_name == "ack":
                continue

            self.comm_handler.send(
                output_name,
                self.tensors[-1][output_name],
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=False)

            self.forward_stats.stats['send_tensors_size'] += \
                (self.tensors[-1][output_name].element_size() *
                 self.tensors[-1][output_name].nelement())

4.3.3 receive_tensors_backward

后向传播中,梯度保存在 self.gradients。

receive_tensors_backward 就是在后向传播中,从前面层获取张量。

注意,这里对应的是self.send_ranks,就是前向过程中的发送rank,它们在反向过程中就是接受rank

    def receive_tensors_backward(self):
        # Receive all required gradients from downstream
        # machines.
        for output_name in self.send_ranks: # 遍历本stage对应的发送rank(前向),进行接受
             if output_name in self.target_tensor_names:
                continue

             # 获取梯度
             self.gradients[output_name] = \
                self.comm_handler.recv(
                    output_name,
                    forward_minibatch_id=self.forward_minibatch_id,
                    backward_minibatch_id=self.backward_minibatch_id,
                    backward=True)

             self.backward_stats.stats['receive_tensors_size'] += \
                 (self.gradients[output_name].element_size() *
                  self.gradients[output_name].nelement())

4.3.4 send_tensors_backward

后向传播中,梯度保存在 self.gradients。

send_tensors_forward就是在后向传播中,向后面层发送梯度张量。

注意,这里对应的是self.receive_ranks,就是前向过程中的接受rank,它们在反向过程中就是发送rank

    def send_tensors_backward(self):
        # Send all required gradients upstream.
        for input_name in self.receive_ranks: # 遍历本stage对应的接受rank,进行发送
            if input_name in self.target_tensor_names:
                continue

            self.comm_handler.send(
                input_name,
                self.gradients[input_name],
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)

            self.backward_stats.stats['send_tensors_size'] += \
                (self.gradients[input_name].element_size() *
                 self.gradients[input_name].nelement())

        if self.num_ranks_in_previous_stage > 0:
            # Used to track where to send tensors in the
            # backward pass.
            self.comm_handler.increment_messaging_index(
                sending=True)

4.3.5 run_ack

run_ack就是在传播中,给前面层,后面层回应一个确认。

    def run_ack(self):
        # No need for ack if running on a single worker.
        if self.rank is None:
            return

        # Receive ack from next stage. Send ack to previous stage.
        if self.stage < (self.num_stages-1):
            self.comm_handler.recv(
                "ack",
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)
        if self.stage > 0:
            self.comm_handler.send(
                "ack",
                torch.zeros(self.tensor_shapes["ack"],
                            dtype=torch.int64).cuda(),
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)

            # Used to track where to receive forward from.
            self.comm_handler.increment_messaging_index(sending=True)

        self.backward_minibatch_id += 1

至此,运行时引擎我们介绍完毕其静态信息和初始化,下一篇我们介绍通信模块。

https://pytorch.org/docs/stable/rpc.html

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章