13.5. 多GPU训练
在 Colab 中打开 Notebook
在 Colab 中打开 Notebook
在 Colab 中打开 Notebook
在 Colab 中打开 Notebook
在 SageMaker Studio Lab 中打开 Notebook

到目前为止,我们讨论了如何在CPU和GPU上高效地训练模型。在 13.3节 中,我们甚至展示了深度学习框架如何在CPU和GPU之间自动并行化计算和通信。我们还在 6.7节 中展示了如何使用 nvidia-smi 命令列出计算机上所有可用的GPU。但我们没有讨论的是如何真正地并行化深度学习训练。相反,我们只是简单地引用了这样一个观点:以某种方式在多个设备之间拆分数据。本节将详细介绍如何从零开始并行地训练网络。有关如何利用高级API中功能的详细信息,请参阅 13.6节。我们假设你熟悉小批量随机梯度下降算法,例如 12.5节 中描述的那些算法。

13.5.1. 问题拆分

让我们从一个简单的计算机视觉问题和一个略微过时的网络开始。例如,网络具有多个卷积层和池化层,最后可能有几个全连接层。也就是说,我们从一个看起来很像LeNet (LeCun et al., 1998) 或AlexNet (Krizhevsky et al., 2012)的网络开始。假设有多个GPU(如果是台式服务器,则为2个;在AWS g4dn.12xlarge实例上为4个;在p3.16xlarge上为8个;在p2.16xlarge上为16个),我们希望以一种方式划分训练,以便获得良好的加速,同时受益于简单且可复现的设计选择。毕竟,多个GPU可以同时增加内存计算能力。简而言之,对于一个我们想要分类的小批量训练数据,我们有以下选择。

首先,我们可以将网络划分到多个GPU上。也就是说,每个GPU接收流入特定层的数据,处理多个后续层的数据,然后将数据发送到下一个GPU。与单个GPU可以处理的数据相比,这使我们能够处理具有更大网络的模型。此外,每个GPU的内存占用可以得到很好的控制(它是总网络内存占用的一部分)。

然而,层(以及GPU)之间的接口需要紧密的同步。这可能很棘手,特别是如果层之间的计算工作负载没有正确匹配。对于大量的GPU,这个问题会更加严重。层之间的接口也需要大量的数据传输,例如激活和梯度。这可能会压垮GPU总线的带宽。此外,计算密集型但顺序的操作很难划分。这方面最好的尝试可以参见 Mirhoseini et al. (2017)。这仍然是一个难题,并且不清楚在重要问题上是否能实现良好的(线性)扩展。除非有非常好的框架或操作系统支持将多个GPU链接在一起,否则我们不推荐这种方式。

其次,我们可以按层拆分工作。例如,我们可以在4个GPU上拆分问题,每个GPU为16个通道生成数据,而不是在单个GPU上计算64个通道。同样,对于全连接层,我们可以拆分输出单元的数量。图 13.5.1(取自 Krizhevsky et al. (2012))说明了这种设计,当时使用这种策略来处理GPU内存非常小(当时为2GB)的问题。只要通道(或单元)的数量不是太小,这就可以在计算方面实现良好的扩展。此外,多个GPU可以处理越来越大的网络,因为可用内存呈线性扩展。

../_images/alexnet-original.svg

图 13.5.1 由于GPU内存有限,原始AlexNet设计中的模型并行。

但是,我们需要*非常大量*的同步或屏障操作,因为每个层都依赖于所有其他层的结果。此外,需要传输的数据量可能比在GPU之间分配层时更大。因此,由于其带宽成本和复杂性,我们不推荐这种方法。

最后,我们可以在多个GPU之间划分数据。这样,所有GPU都执行相同类型的工作,只是处理不同的观测数据。在每个小批量训练数据之后,梯度会在所有GPU上进行聚合。这是最简单的方法,可以在任何情况下应用。我们只需要在每个小批量之后进行同步。也就是说,在计算其他梯度的同时开始交换梯度参数是非常可取的。此外,更多的GPU会导致更大的小批量大小,从而提高训练效率。但是,增加更多的GPU并不能让我们训练更大的模型。

../_images/splitting.svg

图 13.5.2 在多个GPU上进行并行化。从左到右:原始问题、网络划分、逐层划分、数据并行。

图 13.5.2 描述了在多个GPU上进行并行的不同方式的比较。总的来说,数据并行是最方便的方法,前提是我们能够使用具有足够大内存的GPU。关于分布式训练的分区详细描述,另请参阅 (Li et al., 2014)。在深度学习的早期,GPU内存曾经是一个问题。到现在,除了极少数特殊情况外,这个问题已经得到了解决。在下文中,我们将重点关注数据并行。

13.5.2. 数据并行

假设一台机器上有 \(k\) 个GPU。给定要训练的模型,每个GPU将独立维护一套完整的模型参数,尽管不同GPU上的参数值是相同且同步的。例如,图 13.5.3 说明了当 \(k=2\) 时使用数据并行进行训练的过程。

../_images/data-parallel.svg

图 13.5.3 在两个GPU上使用数据并行计算小批量随机梯度下降。

通常,训练过程如下:

  • 在训练的任何一次迭代中,给定一个随机的小批量数据,我们将其中的样本分成 \(k\) 部分,并均匀地分发到各个GPU上。

  • 每个GPU根据其分配到的小批量子集,计算模型参数的损失和梯度。

  • \(k\) 个GPU的局部梯度聚合起来,以获得当前的小批量随机梯度。

  • 将聚合后的梯度重新分发到每个GPU。

  • 每个GPU使用这个小批量随机梯度来更新它所维护的整套模型参数。

请注意,在实践中,当在 \(k\) 个GPU上进行训练时,我们会将小批量大小*增加* \(k\) 倍,这样每个GPU的工作量就和只在单个GPU上训练时一样。在一台16-GPU的服务器上,这会显著增加小批量大小,我们可能需要相应地提高学习率。另外请注意,8.5节 中的批量规范化需要进行调整,例如,为每个GPU保留一个单独的批量规范化系数。接下来,我们将使用一个简单的网络来说明多GPU训练。

%matplotlib inline
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
%matplotlib inline
from mxnet import autograd, gluon, np, npx
from d2l import mxnet as d2l

npx.set_np()

13.5.3. 一个简单的网络

我们使用在 7.6节 中介绍的LeNet(稍作修改)。我们从头开始定义它,以详细说明参数交换和同步。

# Initialize model parameters
scale = 0.01
W1 = torch.randn(size=(20, 1, 3, 3)) * scale
b1 = torch.zeros(20)
W2 = torch.randn(size=(50, 20, 5, 5)) * scale
b2 = torch.zeros(50)
W3 = torch.randn(size=(800, 128)) * scale
b3 = torch.zeros(128)
W4 = torch.randn(size=(128, 10)) * scale
b4 = torch.zeros(10)
params = [W1, b1, W2, b2, W3, b3, W4, b4]

# Define the model
def lenet(X, params):
    h1_conv = F.conv2d(input=X, weight=params[0], bias=params[1])
    h1_activation = F.relu(h1_conv)
    h1 = F.avg_pool2d(input=h1_activation, kernel_size=(2, 2), stride=(2, 2))
    h2_conv = F.conv2d(input=h1, weight=params[2], bias=params[3])
    h2_activation = F.relu(h2_conv)
    h2 = F.avg_pool2d(input=h2_activation, kernel_size=(2, 2), stride=(2, 2))
    h2 = h2.reshape(h2.shape[0], -1)
    h3_linear = torch.mm(h2, params[4]) + params[5]
    h3 = F.relu(h3_linear)
    y_hat = torch.mm(h3, params[6]) + params[7]
    return y_hat

# Cross-entropy loss function
loss = nn.CrossEntropyLoss(reduction='none')
# Initialize model parameters
scale = 0.01
W1 = np.random.normal(scale=scale, size=(20, 1, 3, 3))
b1 = np.zeros(20)
W2 = np.random.normal(scale=scale, size=(50, 20, 5, 5))
b2 = np.zeros(50)
W3 = np.random.normal(scale=scale, size=(800, 128))
b3 = np.zeros(128)
W4 = np.random.normal(scale=scale, size=(128, 10))
b4 = np.zeros(10)
params = [W1, b1, W2, b2, W3, b3, W4, b4]

# Define the model
def lenet(X, params):
    h1_conv = npx.convolution(data=X, weight=params[0], bias=params[1],
                              kernel=(3, 3), num_filter=20)
    h1_activation = npx.relu(h1_conv)
    h1 = npx.pooling(data=h1_activation, pool_type='avg', kernel=(2, 2),
                     stride=(2, 2))
    h2_conv = npx.convolution(data=h1, weight=params[2], bias=params[3],
                              kernel=(5, 5), num_filter=50)
    h2_activation = npx.relu(h2_conv)
    h2 = npx.pooling(data=h2_activation, pool_type='avg', kernel=(2, 2),
                     stride=(2, 2))
    h2 = h2.reshape(h2.shape[0], -1)
    h3_linear = np.dot(h2, params[4]) + params[5]
    h3 = npx.relu(h3_linear)
    y_hat = np.dot(h3, params[6]) + params[7]
    return y_hat

# Cross-entropy loss function
loss = gluon.loss.SoftmaxCrossEntropyLoss()
[22:00:38] ../src/storage/storage.cc:196: Using Pooled (Naive) StorageManager for CPU

13.5.4. 数据同步

为了进行高效的多GPU训练,我们需要两个基本操作。首先,我们需要能够将参数列表分发到多个设备上并附加梯度(get_params)。没有参数,就不可能在GPU上评估网络。其次,我们需要能够将多个设备上的参数求和,即我们需要一个 allreduce 函数。

def get_params(params, device):
    new_params = [p.to(device) for p in params]
    for p in new_params:
        p.requires_grad_()
    return new_params
def get_params(params, device):
    new_params = [p.copyto(device) for p in params]
    for p in new_params:
        p.attach_grad()
    return new_params

让我们通过将模型参数复制到一个GPU上来试试。

new_params = get_params(params, d2l.try_gpu(0))
print('b1 weight:', new_params[1])
print('b1 grad:', new_params[1].grad)
b1 weight: tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       device='cuda:0', requires_grad=True)
b1 grad: None
new_params = get_params(params, d2l.try_gpu(0))
print('b1 weight:', new_params[1])
print('b1 grad:', new_params[1].grad)
b1 weight: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.] @gpu(0)
b1 grad: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.] @gpu(0)
[22:00:39] ../src/storage/storage.cc:196: Using Pooled (Naive) StorageManager for GPU

由于我们还没有进行任何计算,偏置参数的梯度仍然为零。现在让我们假设有一个向量分布在多个GPU上。下面的 allreduce 函数将所有向量相加,并将结果广播回所有GPU。请注意,为了让这个工作,我们需要将数据复制到累积结果的设备上。

def allreduce(data):
    for i in range(1, len(data)):
        data[0][:] += data[i].to(data[0].device)
    for i in range(1, len(data)):
        data[i][:] = data[0].to(data[i].device)
def allreduce(data):
    for i in range(1, len(data)):
        data[0][:] += data[i].copyto(data[0].ctx)
    for i in range(1, len(data)):
        data[0].copyto(data[i])

让我们通过在不同设备上创建具有不同值的向量并聚合它们来测试一下。

data = [torch.ones((1, 2), device=d2l.try_gpu(i)) * (i + 1) for i in range(2)]
print('before allreduce:\n', data[0], '\n', data[1])
allreduce(data)
print('after allreduce:\n', data[0], '\n', data[1])
before allreduce:
 tensor([[1., 1.]], device='cuda:0')
 tensor([[2., 2.]], device='cuda:1')
after allreduce:
 tensor([[3., 3.]], device='cuda:0')
 tensor([[3., 3.]], device='cuda:1')
data = [np.ones((1, 2), ctx=d2l.try_gpu(i)) * (i + 1) for i in range(2)]
print('before allreduce:\n', data[0], '\n', data[1])
allreduce(data)
print('after allreduce:\n', data[0], '\n', data[1])
before allreduce:
 [[1. 1.]] @gpu(0)
 [[2. 2.]] @gpu(1)
after allreduce:
 [22:00:40] ../src/storage/storage.cc:196: Using Pooled (Naive) StorageManager for GPU
[[3. 3.]] @gpu(0)
 [[3. 3.]] @gpu(1)

13.5.5. 数据分发

我们需要一个简单的工具函数来将一个小批量数据均匀地分发到多个GPU上。例如,在两个GPU上,我们希望将一半的数据复制到每个GPU上。由于使用深度学习框架的内置函数更方便、更简洁,我们用它来在一个 \(4 \times 5\) 的矩阵上进行尝试。

data = torch.arange(20).reshape(4, 5)
devices = [torch.device('cuda:0'), torch.device('cuda:1')]
split = nn.parallel.scatter(data, devices)
print('input :', data)
print('load into', devices)
print('output:', split)
input : tensor([[ 0,  1,  2,  3,  4],
        [ 5,  6,  7,  8,  9],
        [10, 11, 12, 13, 14],
        [15, 16, 17, 18, 19]])
load into [device(type='cuda', index=0), device(type='cuda', index=1)]
output: (tensor([[0, 1, 2, 3, 4],
        [5, 6, 7, 8, 9]], device='cuda:0'), tensor([[10, 11, 12, 13, 14],
        [15, 16, 17, 18, 19]], device='cuda:1'))
data = np.arange(20).reshape(4, 5)
devices = [npx.gpu(0), npx.gpu(1)]
split = gluon.utils.split_and_load(data, devices)
print('input :', data)
print('load into', devices)
print('output:', split)
input : [[ 0.  1.  2.  3.  4.]
 [ 5.  6.  7.  8.  9.]
 [10. 11. 12. 13. 14.]
 [15. 16. 17. 18. 19.]]
load into [gpu(0), gpu(1)]
output: [array([[0., 1., 2., 3., 4.],
       [5., 6., 7., 8., 9.]], ctx=gpu(0)), array([[10., 11., 12., 13., 14.],
       [15., 16., 17., 18., 19.]], ctx=gpu(1))]

为了以后重用,我们定义一个 split_batch 函数,它同时分割数据和标签。

#@save
def split_batch(X, y, devices):
    """Split `X` and `y` into multiple devices."""
    assert X.shape[0] == y.shape[0]
    return (nn.parallel.scatter(X, devices),
            nn.parallel.scatter(y, devices))
#@save
def split_batch(X, y, devices):
    """Split `X` and `y` into multiple devices."""
    assert X.shape[0] == y.shape[0]
    return (gluon.utils.split_and_load(X, devices),
            gluon.utils.split_and_load(y, devices))

13.5.6. 训练

现在我们可以在一个小批量数据上实现多GPU训练。它的实现主要基于本节描述的数据并行方法。我们将使用我们刚才讨论的辅助函数 allreducesplit_and_load 来在多个GPU之间同步数据。请注意,我们不需要编写任何特定的代码来实现并行化。由于计算图在一个小批量内没有跨设备的依赖关系,它会*自动地*并行执行。

def train_batch(X, y, device_params, devices, lr):
    X_shards, y_shards = split_batch(X, y, devices)
    # Loss is calculated separately on each GPU
    ls = [loss(lenet(X_shard, device_W), y_shard).sum()
          for X_shard, y_shard, device_W in zip(
              X_shards, y_shards, device_params)]
    for l in ls:  # Backpropagation is performed separately on each GPU
        l.backward()
    # Sum all gradients from each GPU and broadcast them to all GPUs
    with torch.no_grad():
        for i in range(len(device_params[0])):
            allreduce([device_params[c][i].grad for c in range(len(devices))])
    # The model parameters are updated separately on each GPU
    for param in device_params:
        d2l.sgd(param, lr, X.shape[0]) # Here, we use a full-size batch
def train_batch(X, y, device_params, devices, lr):
    X_shards, y_shards = split_batch(X, y, devices)
    with autograd.record():  # Loss is calculated separately on each GPU
        ls = [loss(lenet(X_shard, device_W), y_shard)
              for X_shard, y_shard, device_W in zip(
                  X_shards, y_shards, device_params)]
    for l in ls:  # Backpropagation is performed separately on each GPU
        l.backward()
    # Sum all gradients from each GPU and broadcast them to all GPUs
    for i in range(len(device_params[0])):
        allreduce([device_params[c][i].grad for c in range(len(devices))])
    # The model parameters are updated separately on each GPU
    for param in device_params:
        d2l.sgd(param, lr, X.shape[0])  # Here, we use a full-size batch

现在,我们可以定义训练函数。它与前面章节中使用的函数略有不同:我们需要分配GPU并将所有模型参数复制到所有设备上。显然,每个批量都使用 train_batch 函数来处理多个GPU。为方便起见(以及代码的简洁性),我们在单个GPU上计算准确率,但这效率*低下*,因为其他GPU处于空闲状态。

def train(num_gpus, batch_size, lr):
    train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size)
    devices = [d2l.try_gpu(i) for i in range(num_gpus)]
    # Copy model parameters to `num_gpus` GPUs
    device_params = [get_params(params, d) for d in devices]
    num_epochs = 10
    animator = d2l.Animator('epoch', 'test acc', xlim=[1, num_epochs])
    timer = d2l.Timer()
    for epoch in range(num_epochs):
        timer.start()
        for X, y in train_iter:
            # Perform multi-GPU training for a single minibatch
            train_batch(X, y, device_params, devices, lr)
            torch.cuda.synchronize()
        timer.stop()
        # Evaluate the model on GPU 0
        animator.add(epoch + 1, (d2l.evaluate_accuracy_gpu(
            lambda x: lenet(x, device_params[0]), test_iter, devices[0]),))
    print(f'test acc: {animator.Y[0][-1]:.2f}, {timer.avg():.1f} sec/epoch '
          f'on {str(devices)}')
def train(num_gpus, batch_size, lr):
    train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size)
    devices = [d2l.try_gpu(i) for i in range(num_gpus)]
    # Copy model parameters to `num_gpus` GPUs
    device_params = [get_params(params, d) for d in devices]
    num_epochs = 10
    animator = d2l.Animator('epoch', 'test acc', xlim=[1, num_epochs])
    timer = d2l.Timer()
    for epoch in range(num_epochs):
        timer.start()
        for X, y in train_iter:
            # Perform multi-GPU training for a single minibatch
            train_batch(X, y, device_params, devices, lr)
            npx.waitall()
        timer.stop()
        # Evaluate the model on GPU 0
        animator.add(epoch + 1, (d2l.evaluate_accuracy_gpu(
            lambda x: lenet(x, device_params[0]), test_iter, devices[0]),))
    print(f'test acc: {animator.Y[0][-1]:.2f}, {timer.avg():.1f} sec/epoch '
          f'on {str(devices)}')

让我们看看这在单个GPU上的效果如何。我们首先使用256的批量大小和0.2的学习率。

train(num_gpus=1, batch_size=256, lr=0.2)
test acc: 0.83, 3.0 sec/epoch on [device(type='cuda', index=0)]
../_images/output_multiple-gpus_f17d18_93_1.svg
train(num_gpus=1, batch_size=256, lr=0.2)
test acc: 0.83, 5.5 sec/epoch on [gpu(0)]
../_images/output_multiple-gpus_f17d18_96_1.svg

通过保持批量大小和学习率不变,并将GPU数量增加到2,我们可以看到测试准确率与之前的实验大致相同。就优化算法而言,它们是相同的。不幸的是,这里没有获得有意义的加速:模型太小了;此外我们只有一个小数据集,我们实现多GPU训练的略显粗糙的方法受到了显著的Python开销的影响。接下来,我们将遇到更复杂的模型和更复杂的并行化方法。尽管如此,让我们看看在Fashion-MNIST上会发生什么。

train(num_gpus=2, batch_size=256, lr=0.2)
test acc: 0.84, 2.8 sec/epoch on [device(type='cuda', index=0), device(type='cuda', index=1)]
../_images/output_multiple-gpus_f17d18_102_1.svg
train(num_gpus=2, batch_size=256, lr=0.2)
test acc: 0.85, 6.9 sec/epoch on [gpu(0), gpu(1)]
../_images/output_multiple-gpus_f17d18_105_1.svg

13.5.7. 小结

  • 有多种方法可以在多个GPU上拆分深度网络的训练。我们可以按层之间、跨层或跨数据进行拆分。前两种需要紧密协调的数据传输。数据并行是最简单的策略。

  • 数据并行训练很简单。然而,为了提高效率,它增加了有效的小批量大小。

  • 在数据并行中,数据被分割到多个GPU上,每个GPU执行自己的前向和后向操作,随后梯度被聚合,结果被广播回所有GPU。

  • 对于更大的小批量,我们可以使用稍大的学习率。

13.5.8. 练习

  1. 当在 \(k\) 个GPU上训练时,将小批量大小从 \(b\) 改为 \(k \cdot b\),即按GPU的数量进行扩展。

  2. 比较不同学习率下的准确率。它如何随着GPU数量的变化而变化?

  3. 实现一个更高效的 allreduce 函数,在不同的GPU上聚合不同的参数?为什么它更高效?

  4. 实现多GPU测试准确率的计算。