10.7. Sequence-to-Sequence Learning for Machine Translation

In sequence-to-sequence problems such as machine translation (as discussed in Section 10.5), where inputs and outputs each consist of variable-length unaligned sequences, we generally rely on encoder-decoder architectures (Section 10.6). In this section, we will demonstrate the application of an encoder-decoder architecture, where both the encoder and decoder are implemented as RNNs, to the task of machine translation (Cho et al., 2014, Sutskever et al., 2014).

Here, the encoder RNN will take a variable-length sequence as input and transform it into a fixed-shape hidden state. Later, in Chapter 11, we will introduce attention mechanisms, which allow us to access encoded inputs without having to compress the entire input into a single fixed-length representation.

Then, to generate the output sequence one token at a time, the decoder model, consisting of a separate RNN, will predict each successive target token given both the input sequence and the preceding tokens in the output. During training, the decoder will typically be conditioned upon the preceding tokens in the official "ground truth" label. However, at test time, we will want to condition each output of the decoder on the tokens already predicted. Note that if we ignore the encoder, the decoder in a sequence-to-sequence architecture behaves just like a normal language model. Fig. 10.7.1 illustrates how to use two RNNs for sequence-to-sequence learning in machine translation.


Fig. 10.7.1 Sequence-to-sequence learning with an RNN encoder and an RNN decoder.

In Fig. 10.7.1, the special "<eos>" token marks the end of the sequence. Our model can stop making predictions once this token is generated. At the initial time step of the RNN decoder, there are two special design decisions to be aware of: First, we begin every input with a special beginning-of-sequence "<bos>" token. Second, we may feed the final hidden state of the encoder into the decoder at every single decoding time step (Cho et al., 2014). In some other designs, such as that of Sutskever et al. (2014), the final hidden state of the RNN encoder is used to initialize the hidden state of the decoder only at the first decoding step.

# PyTorch
import collections
import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
# MXNet
import collections
import math
from mxnet import autograd, gluon, init, np, npx
from mxnet.gluon import nn, rnn
from d2l import mxnet as d2l

npx.set_np()
# JAX
import collections
import math
from functools import partial
import jax
import optax
from flax import linen as nn
from jax import numpy as jnp
from d2l import jax as d2l
# TensorFlow
import collections
import math
import tensorflow as tf
from d2l import tensorflow as d2l

10.7.1. Teacher Forcing

While running the encoder on the input sequence is relatively straightforward, handling the input and output of the decoder requires more care. The most common approach is sometimes called *teacher forcing*. Here, the original target sequence (token labels) is fed into the decoder as input. More concretely, the special beginning-of-sequence token and the original target sequence, excluding the final token, are concatenated as input to the decoder, while the decoder output (labels for training) is the original target sequence, shifted by one token: "<bos>", "Ils", "regardent", "." \(\rightarrow\) "Ils", "regardent", ".", "<eos>" (Fig. 10.7.1).

Our implementation in Section 10.5.3 prepared training data for teacher forcing, where shifting tokens for self-supervised learning is similar to the training of language models in Section 9.3. An alternative approach is to feed the token *predicted* at the previous time step into the decoder as the current input.
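The token shift used for teacher forcing can be sketched directly; the French tokens below are the ones from Fig. 10.7.1:

```python
# Teacher forcing: decoder input = <bos> plus the target without its last
# token; decoder label = the target itself, shifted left by one token.
tgt = ['Ils', 'regardent', '.', '<eos>']
dec_input = ['<bos>'] + tgt[:-1]
dec_label = tgt
```

At every position, the label is exactly the token the decoder should emit after seeing the corresponding input token.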

In the following, we explain the design depicted in Fig. 10.7.1 in greater detail. We will train this model for machine translation on the English-French dataset introduced in Section 10.5.

10.7.2. Encoder

Recall that the encoder transforms a variable-length input sequence into a fixed-shape *context variable* \(\mathbf{c}\) (see Fig. 10.7.1).

Consider a single-sequence example (batch size 1). Suppose the input sequence is \(x_1, \ldots, x_T\), such that \(x_t\) is the \(t^{\textrm{th}}\) token. At time step \(t\), the RNN transforms the input feature vector \(\mathbf{x}_t\) for \(x_t\) and the hidden state \(\mathbf{h}_{t-1}\) from the previous time step into the current hidden state \(\mathbf{h}_t\). We can use a function \(f\) to express the transformation of the RNN's recurrent layer:

(10.7.1)\[\mathbf{h}_t = f(\mathbf{x}_t, \mathbf{h}_{t-1}).\]

In general, the encoder transforms the hidden states at all time steps into a context variable through a customized function \(q\):

(10.7.2)\[\mathbf{c} = q(\mathbf{h}_1, \ldots, \mathbf{h}_T).\]

For example, in Fig. 10.7.1, the context variable is just the hidden state \(\mathbf{h}_T\) of the encoder RNN after processing the final token of the input sequence.
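A scalar toy version of (10.7.1) and (10.7.2) shows how the recursion runs and how \(q\) can simply pick \(\mathbf{h}_T\); the update rule \(f\) below is a made-up stand-in, not a learned RNN:

```python
# Toy unroll of h_t = f(x_t, h_{t-1}); f here is an arbitrary stand-in.
def f(x, h):
    return 0.5 * h + x

xs = [1.0, 2.0, 3.0]   # input features x_1, ..., x_T
h, hs = 0.0, []
for x in xs:
    h = f(x, h)        # current state from current input and previous state
    hs.append(h)
c = hs[-1]             # q(h_1, ..., h_T) = h_T, as in Fig. 10.7.1
```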

In this example, we have used a unidirectional RNN to design the encoder, where the hidden state only depends on the input subsequence at and before the time step of the hidden state. We can also construct encoders using bidirectional RNNs. In this case, the hidden state depends on the subsequence before and after the time step (including the input at the current time step), which encodes the information of the entire sequence.

Now let's implement the RNN encoder. Note that we use an *embedding layer* to obtain the feature vector for each token in the input sequence. The weight of an embedding layer is a matrix, where the number of rows corresponds to the size of the input vocabulary (vocab_size) and the number of columns corresponds to the feature vector's dimension (embed_size). For any input token index \(i\), the embedding layer fetches the \(i^{\textrm{th}}\) row (starting from 0) of the weight matrix to return its feature vector. Here we implement the encoder with a multilayer GRU.
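The row-lookup view of the embedding layer can be checked with a tiny framework-free sketch; the weight values here are arbitrary stand-ins for learned parameters:

```python
# Minimal sketch of an embedding layer as a row lookup in a weight matrix
# of shape (vocab_size, embed_size). Not a real framework implementation.
vocab_size, embed_size = 10, 4
W = [[0.01 * (r * embed_size + c) for c in range(embed_size)]
     for r in range(vocab_size)]          # stand-in for learned weights

def embed(indices):
    # Token index i maps to the i-th row of W (0-based)
    return [W[i] for i in indices]

vecs = embed([3, 1, 3])
```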

# PyTorch
def init_seq2seq(module):  #@save
    """Initialize weights for sequence-to-sequence learning."""
    if type(module) == nn.Linear:
        nn.init.xavier_uniform_(module.weight)
    if type(module) == nn.GRU:
        for param in module._flat_weights_names:
            if "weight" in param:
                nn.init.xavier_uniform_(module._parameters[param])

class Seq2SeqEncoder(d2l.Encoder):  #@save
    """The RNN encoder for sequence-to-sequence learning."""
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = d2l.GRU(embed_size, num_hiddens, num_layers, dropout)
        self.apply(init_seq2seq)

    def forward(self, X, *args):
        # X shape: (batch_size, num_steps)
        embs = self.embedding(X.t().type(torch.int64))
        # embs shape: (num_steps, batch_size, embed_size)
        outputs, state = self.rnn(embs)
        # outputs shape: (num_steps, batch_size, num_hiddens)
        # state shape: (num_layers, batch_size, num_hiddens)
        return outputs, state
# MXNet
class Seq2SeqEncoder(d2l.Encoder):  #@save
    """The RNN encoder for sequence-to-sequence learning."""
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = d2l.GRU(num_hiddens, num_layers, dropout)
        self.initialize(init.Xavier())

    def forward(self, X, *args):
        # X shape: (batch_size, num_steps)
        embs = self.embedding(d2l.transpose(X))
        # embs shape: (num_steps, batch_size, embed_size)
        outputs, state = self.rnn(embs)
        # outputs shape: (num_steps, batch_size, num_hiddens)
        # state shape: (num_layers, batch_size, num_hiddens)
        return outputs, state
# JAX
class Seq2SeqEncoder(d2l.Encoder):  #@save
    """The RNN encoder for sequence-to-sequence learning."""
    vocab_size: int
    embed_size: int
    num_hiddens: int
    num_layers: int
    dropout: float = 0

    def setup(self):
        self.embedding = nn.Embed(self.vocab_size, self.embed_size)
        self.rnn = d2l.GRU(self.num_hiddens, self.num_layers, self.dropout)

    def __call__(self, X, *args, training=False):
        # X shape: (batch_size, num_steps)
        embs = self.embedding(d2l.transpose(X).astype(jnp.int32))
        # embs shape: (num_steps, batch_size, embed_size)
        outputs, state = self.rnn(embs, training=training)
        # outputs shape: (num_steps, batch_size, num_hiddens)
        # state shape: (num_layers, batch_size, num_hiddens)
        return outputs, state
# TensorFlow
class Seq2SeqEncoder(d2l.Encoder):  #@save
    """The RNN encoder for sequence-to-sequence learning."""
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0):
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embed_size)
        self.rnn = d2l.GRU(num_hiddens, num_layers, dropout)

    def call(self, X, *args):
        # X shape: (batch_size, num_steps)
        embs = self.embedding(tf.transpose(X))
        # embs shape: (num_steps, batch_size, embed_size)
        outputs, state = self.rnn(embs)
        # outputs shape: (num_steps, batch_size, num_hiddens)
        # state shape: (num_layers, batch_size, num_hiddens)
        return outputs, state

Let's use a concrete example to illustrate the above encoder implementation. Below, we instantiate a two-layer GRU encoder whose number of hidden units is 16. Given a minibatch of sequence inputs X (batch size \(=4\); number of time steps \(=9\)), the hidden states of the last layer at all the time steps (enc_outputs returned by the encoder's recurrent layers) are a tensor of shape (number of time steps, batch size, number of hidden units).

# PyTorch
vocab_size, embed_size, num_hiddens, num_layers = 10, 8, 16, 2
batch_size, num_steps = 4, 9
encoder = Seq2SeqEncoder(vocab_size, embed_size, num_hiddens, num_layers)
X = torch.zeros((batch_size, num_steps))
enc_outputs, enc_state = encoder(X)
d2l.check_shape(enc_outputs, (num_steps, batch_size, num_hiddens))
# MXNet
vocab_size, embed_size, num_hiddens, num_layers = 10, 8, 16, 2
batch_size, num_steps = 4, 9
encoder = Seq2SeqEncoder(vocab_size, embed_size, num_hiddens, num_layers)
X = np.zeros((batch_size, num_steps))
enc_outputs, enc_state = encoder(X)
d2l.check_shape(enc_outputs, (num_steps, batch_size, num_hiddens))
# JAX
vocab_size, embed_size, num_hiddens, num_layers = 10, 8, 16, 2
batch_size, num_steps = 4, 9
encoder = Seq2SeqEncoder(vocab_size, embed_size, num_hiddens, num_layers)
X = jnp.zeros((batch_size, num_steps))
(enc_outputs, enc_state), _ = encoder.init_with_output(d2l.get_key(), X)

d2l.check_shape(enc_outputs, (num_steps, batch_size, num_hiddens))
# TensorFlow
vocab_size, embed_size, num_hiddens, num_layers = 10, 8, 16, 2
batch_size, num_steps = 4, 9
encoder = Seq2SeqEncoder(vocab_size, embed_size, num_hiddens, num_layers)
X = tf.zeros((batch_size, num_steps))
enc_outputs, enc_state = encoder(X)
d2l.check_shape(enc_outputs, (num_steps, batch_size, num_hiddens))

Since we are using a GRU here, the shape of the multilayer hidden states at the final time step is (number of hidden layers, batch size, number of hidden units).

d2l.check_shape(enc_state, (num_layers, batch_size, num_hiddens))  # PyTorch
d2l.check_shape(enc_state, (num_layers, batch_size, num_hiddens))  # MXNet
d2l.check_shape(enc_state, (num_layers, batch_size, num_hiddens))  # JAX
# TensorFlow: the state is a list of per-layer tensors
d2l.check_len(enc_state, num_layers)
d2l.check_shape(enc_state[0], (batch_size, num_hiddens))

10.7.3. Decoder

Given a target output sequence \(y_1, y_2, \ldots, y_{T'}\), for each time step \(t'\) (we use \(t^\prime\) to differentiate it from input sequence time steps), the decoder assigns a predicted probability to each possible token occurring at step \(t'+1\), conditioned upon the previous tokens in the target \(y_1, \ldots, y_{t'}\) and the context variable \(\mathbf{c}\), i.e., \(P(y_{t'+1} \mid y_1, \ldots, y_{t'}, \mathbf{c})\).

To predict the subsequent token \(t^\prime+1\) in the target sequence, the RNN decoder takes the target token \(y_{t^\prime}\) from the previous step, the RNN hidden state \(\mathbf{s}_{t^\prime-1}\) from the previous time step, and the context variable \(\mathbf{c}\) as its input, and transforms them into the hidden state \(\mathbf{s}_{t^\prime}\) at the current time step. We can use a function \(g\) to express the transformation of the decoder's hidden layer:

(10.7.3)\[\mathbf{s}_{t^\prime} = g(y_{t^\prime}, \mathbf{c}, \mathbf{s}_{t^\prime-1}).\]

After obtaining the hidden state of the decoder, we can use an output layer and the softmax operation to compute the predictive distribution \(p(y_{t^{\prime}+1} \mid y_1, \ldots, y_{t^\prime}, \mathbf{c})\) over the subsequent output token \(t^\prime+1\).

Following Fig. 10.7.1, when implementing the decoder as follows, we directly use the hidden state at the final time step of the encoder to initialize the hidden state of the decoder. This requires that the RNN encoder and the RNN decoder have the same number of layers and hidden units. To further incorporate the encoded input sequence information, the context variable is concatenated with the decoder input at all the time steps. To predict the probability distribution of the output token, we use a fully connected layer to transform the hidden state at the final layer of the RNN decoder.

# PyTorch
class Seq2SeqDecoder(d2l.Decoder):
    """The RNN decoder for sequence to sequence learning."""
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = d2l.GRU(embed_size+num_hiddens, num_hiddens,
                           num_layers, dropout)
        self.dense = nn.LazyLinear(vocab_size)
        self.apply(init_seq2seq)

    def init_state(self, enc_all_outputs, *args):
        return enc_all_outputs

    def forward(self, X, state):
        # X shape: (batch_size, num_steps)
        # embs shape: (num_steps, batch_size, embed_size)
        embs = self.embedding(X.t().type(torch.int32))
        enc_output, hidden_state = state
        # context shape: (batch_size, num_hiddens)
        context = enc_output[-1]
        # Broadcast context to (num_steps, batch_size, num_hiddens)
        context = context.repeat(embs.shape[0], 1, 1)
        # Concat at the feature dimension
        embs_and_context = torch.cat((embs, context), -1)
        outputs, hidden_state = self.rnn(embs_and_context, hidden_state)
        outputs = self.dense(outputs).swapaxes(0, 1)
        # outputs shape: (batch_size, num_steps, vocab_size)
        # hidden_state shape: (num_layers, batch_size, num_hiddens)
        return outputs, [enc_output, hidden_state]
# MXNet
class Seq2SeqDecoder(d2l.Decoder):
    """The RNN decoder for sequence to sequence learning."""
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = d2l.GRU(num_hiddens, num_layers, dropout)
        self.dense = nn.Dense(vocab_size, flatten=False)
        self.initialize(init.Xavier())

    def init_state(self, enc_all_outputs, *args):
        return enc_all_outputs

    def forward(self, X, state):
        # X shape: (batch_size, num_steps)
        # embs shape: (num_steps, batch_size, embed_size)
        embs = self.embedding(d2l.transpose(X))
        enc_output, hidden_state = state
        # context shape: (batch_size, num_hiddens)
        context = enc_output[-1]
        # Broadcast context to (num_steps, batch_size, num_hiddens)
        context = np.tile(context, (embs.shape[0], 1, 1))
        # Concat at the feature dimension
        embs_and_context = np.concatenate((embs, context), -1)
        outputs, hidden_state = self.rnn(embs_and_context, hidden_state)
        outputs = self.dense(outputs).swapaxes(0, 1)
        # outputs shape: (batch_size, num_steps, vocab_size)
        # hidden_state shape: (num_layers, batch_size, num_hiddens)
        return outputs, [enc_output, hidden_state]
# JAX
class Seq2SeqDecoder(d2l.Decoder):
    """The RNN decoder for sequence to sequence learning."""
    vocab_size: int
    embed_size: int
    num_hiddens: int
    num_layers: int
    dropout: float = 0

    def setup(self):
        self.embedding = nn.Embed(self.vocab_size, self.embed_size)
        self.rnn = d2l.GRU(self.num_hiddens, self.num_layers, self.dropout)
        self.dense = nn.Dense(self.vocab_size)

    def init_state(self, enc_all_outputs, *args):
        return enc_all_outputs

    def __call__(self, X, state, training=False):
        # X shape: (batch_size, num_steps)
        # embs shape: (num_steps, batch_size, embed_size)
        embs = self.embedding(d2l.transpose(X).astype(jnp.int32))
        enc_output, hidden_state = state
        # context shape: (batch_size, num_hiddens)
        context = enc_output[-1]
        # Broadcast context to (num_steps, batch_size, num_hiddens)
        context = jnp.tile(context, (embs.shape[0], 1, 1))
        # Concat at the feature dimension
        embs_and_context = jnp.concatenate((embs, context), -1)
        outputs, hidden_state = self.rnn(embs_and_context, hidden_state,
                                         training=training)
        outputs = self.dense(outputs).swapaxes(0, 1)
        # outputs shape: (batch_size, num_steps, vocab_size)
        # hidden_state shape: (num_layers, batch_size, num_hiddens)
        return outputs, [enc_output, hidden_state]
# TensorFlow
class Seq2SeqDecoder(d2l.Decoder):
    """The RNN decoder for sequence to sequence learning."""
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0):
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embed_size)
        self.rnn = d2l.GRU(num_hiddens, num_layers, dropout)
        self.dense = tf.keras.layers.Dense(vocab_size)

    def init_state(self, enc_all_outputs, *args):
        return enc_all_outputs

    def call(self, X, state):
        # X shape: (batch_size, num_steps)
        # embs shape: (num_steps, batch_size, embed_size)
        embs = self.embedding(tf.transpose(X))
        enc_output, hidden_state = state
        # context shape: (batch_size, num_hiddens)
        context = enc_output[-1]
        # Broadcast context to (num_steps, batch_size, num_hiddens)
        context = tf.tile(tf.expand_dims(context, 0), (embs.shape[0], 1, 1))
        # Concat at the feature dimension
        embs_and_context = tf.concat((embs, context), -1)
        outputs, hidden_state = self.rnn(embs_and_context, hidden_state)
        outputs = tf.transpose(self.dense(outputs), (1, 0, 2))
        # outputs shape: (batch_size, num_steps, vocab_size)
        # hidden_state shape: (num_layers, batch_size, num_hiddens)
        return outputs, [enc_output, hidden_state]

To illustrate the implemented decoder, below we instantiate it with the same hyperparameters as the aforementioned encoder. As we can see, the output shape of the decoder becomes (batch size, number of time steps, vocabulary size), where the final dimension of the tensor stores the predicted token distribution.

# PyTorch
decoder = Seq2SeqDecoder(vocab_size, embed_size, num_hiddens, num_layers)
state = decoder.init_state(encoder(X))
dec_outputs, state = decoder(X, state)
d2l.check_shape(dec_outputs, (batch_size, num_steps, vocab_size))
d2l.check_shape(state[1], (num_layers, batch_size, num_hiddens))
# MXNet
decoder = Seq2SeqDecoder(vocab_size, embed_size, num_hiddens, num_layers)
state = decoder.init_state(encoder(X))
dec_outputs, state = decoder(X, state)
d2l.check_shape(dec_outputs, (batch_size, num_steps, vocab_size))
d2l.check_shape(state[1], (num_layers, batch_size, num_hiddens))
# JAX
decoder = Seq2SeqDecoder(vocab_size, embed_size, num_hiddens, num_layers)
state = decoder.init_state(encoder.init_with_output(d2l.get_key(), X)[0])
(dec_outputs, state), _ = decoder.init_with_output(d2l.get_key(), X,
                                                   state)


d2l.check_shape(dec_outputs, (batch_size, num_steps, vocab_size))
d2l.check_shape(state[1], (num_layers, batch_size, num_hiddens))
# TensorFlow
decoder = Seq2SeqDecoder(vocab_size, embed_size, num_hiddens, num_layers)
state = decoder.init_state(encoder(X))
dec_outputs, state = decoder(X, state)
d2l.check_shape(dec_outputs, (batch_size, num_steps, vocab_size))
d2l.check_len(state[1], num_layers)
d2l.check_shape(state[1][0], (batch_size, num_hiddens))

The layers in the above RNN encoder-decoder model are summarized in Fig. 10.7.2.


Fig. 10.7.2 Layers in an RNN encoder-decoder model.

10.7.4. Encoder-Decoder for Sequence-to-Sequence Learning

Putting it all together into code yields the following:

# PyTorch
class Seq2Seq(d2l.EncoderDecoder):  #@save
    """The RNN encoder--decoder for sequence to sequence learning."""
    def __init__(self, encoder, decoder, tgt_pad, lr):
        super().__init__(encoder, decoder)
        self.save_hyperparameters()

    def validation_step(self, batch):
        Y_hat = self(*batch[:-1])
        self.plot('loss', self.loss(Y_hat, batch[-1]), train=False)

    def configure_optimizers(self):
        # Adam optimizer is used here
        return torch.optim.Adam(self.parameters(), lr=self.lr)
# MXNet
class Seq2Seq(d2l.EncoderDecoder):  #@save
    """The RNN encoder--decoder for sequence to sequence learning."""
    def __init__(self, encoder, decoder, tgt_pad, lr):
        super().__init__(encoder, decoder)
        self.save_hyperparameters()

    def validation_step(self, batch):
        Y_hat = self(*batch[:-1])
        self.plot('loss', self.loss(Y_hat, batch[-1]), train=False)

    def configure_optimizers(self):
        # Adam optimizer is used here
        return gluon.Trainer(self.parameters(), 'adam',
                             {'learning_rate': self.lr})
# JAX
class Seq2Seq(d2l.EncoderDecoder):  #@save
    """The RNN encoder--decoder for sequence to sequence learning."""
    encoder: nn.Module
    decoder: nn.Module
    tgt_pad: int
    lr: float

    def validation_step(self, params, batch, state):
        l, _ = self.loss(params, batch[:-1], batch[-1], state)
        self.plot('loss', l, train=False)

    def configure_optimizers(self):
        # Adam optimizer is used here
        return optax.adam(learning_rate=self.lr)
# TensorFlow
class Seq2Seq(d2l.EncoderDecoder):  #@save
    """The RNN encoder--decoder for sequence to sequence learning."""
    def __init__(self, encoder, decoder, tgt_pad, lr):
        super().__init__(encoder, decoder)
        self.save_hyperparameters()

    def validation_step(self, batch):
        Y_hat = self(*batch[:-1])
        self.plot('loss', self.loss(Y_hat, batch[-1]), train=False)

    def configure_optimizers(self):
        # Adam optimizer is used here
        return tf.keras.optimizers.Adam(learning_rate=self.lr)

10.7.5. Loss Function with Masking

At each time step, the decoder predicts a probability distribution for the output tokens. As with language modeling, we can apply softmax to obtain the distribution and calculate the cross-entropy loss for optimization. Recall from Section 10.5 that special padding tokens are appended to the end of sequences so that sequences of varying lengths can be efficiently loaded in minibatches of the same shape. However, prediction of padding tokens should be excluded from loss calculations. To this end, we can mask irrelevant entries with zero values, so that multiplying any irrelevant prediction by zero equals zero.

# PyTorch
@d2l.add_to_class(Seq2Seq)
def loss(self, Y_hat, Y):
    l = super(Seq2Seq, self).loss(Y_hat, Y, averaged=False)
    mask = (Y.reshape(-1) != self.tgt_pad).type(torch.float32)
    return (l * mask).sum() / mask.sum()
# MXNet
@d2l.add_to_class(Seq2Seq)
def loss(self, Y_hat, Y):
    l = super(Seq2Seq, self).loss(Y_hat, Y, averaged=False)
    mask = (Y.reshape(-1) != self.tgt_pad).astype(np.float32)
    return (l * mask).sum() / mask.sum()
# JAX
@d2l.add_to_class(Seq2Seq)
@partial(jax.jit, static_argnums=(0, 5))
def loss(self, params, X, Y, state, averaged=False):
    Y_hat = state.apply_fn({'params': params}, *X,
                           rngs={'dropout': state.dropout_rng})
    Y_hat = Y_hat.reshape((-1, Y_hat.shape[-1]))
    Y = Y.reshape((-1,))
    fn = optax.softmax_cross_entropy_with_integer_labels
    l = fn(Y_hat, Y)
    mask = (Y.reshape(-1) != self.tgt_pad).astype(jnp.float32)
    return (l * mask).sum() / mask.sum(), {}
# TensorFlow
@d2l.add_to_class(Seq2Seq)
def loss(self, Y_hat, Y):
    l = super(Seq2Seq, self).loss(Y_hat, Y, averaged=False)
    mask = tf.cast(tf.reshape(Y, -1) != self.tgt_pad, tf.float32)
    return tf.reduce_sum(l * mask) / tf.reduce_sum(mask)
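The effect of the mask can be checked on toy numbers; the per-token loss values below are made up for illustration:

```python
# Masked averaging: zero out losses at <pad> positions, then divide by the
# number of non-pad tokens. Values here are arbitrary stand-ins.
pad = 0
labels = [5, 7, 2, pad, pad]          # two trailing padding tokens
losses = [0.4, 0.9, 0.2, 3.0, 3.0]    # hypothetical per-token losses
mask = [1.0 if y != pad else 0.0 for y in labels]
masked_mean = sum(l * m for l, m in zip(losses, mask)) / sum(mask)
```

The large losses at the padding positions contribute nothing, and the average is taken over the three real tokens only.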

10.7.6. Training

Now we can create and train an RNN encoder-decoder model for sequence-to-sequence learning on the machine translation dataset.

# PyTorch
data = d2l.MTFraEng(batch_size=128)
embed_size, num_hiddens, num_layers, dropout = 256, 256, 2, 0.2
encoder = Seq2SeqEncoder(
    len(data.src_vocab), embed_size, num_hiddens, num_layers, dropout)
decoder = Seq2SeqDecoder(
    len(data.tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
model = Seq2Seq(encoder, decoder, tgt_pad=data.tgt_vocab['<pad>'],
                lr=0.005)
trainer = d2l.Trainer(max_epochs=30, gradient_clip_val=1, num_gpus=1)
trainer.fit(model, data)
# MXNet
data = d2l.MTFraEng(batch_size=128)
embed_size, num_hiddens, num_layers, dropout = 256, 256, 2, 0.2
encoder = Seq2SeqEncoder(
    len(data.src_vocab), embed_size, num_hiddens, num_layers, dropout)
decoder = Seq2SeqDecoder(
    len(data.tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
model = Seq2Seq(encoder, decoder, tgt_pad=data.tgt_vocab['<pad>'],
                lr=0.005)
trainer = d2l.Trainer(max_epochs=30, gradient_clip_val=1, num_gpus=1)
trainer.fit(model, data)
# JAX
data = d2l.MTFraEng(batch_size=128)
embed_size, num_hiddens, num_layers, dropout = 256, 256, 2, 0.2
encoder = Seq2SeqEncoder(
    len(data.src_vocab), embed_size, num_hiddens, num_layers, dropout)
decoder = Seq2SeqDecoder(
    len(data.tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
model = Seq2Seq(encoder, decoder, tgt_pad=data.tgt_vocab['<pad>'],
                lr=0.005, training=True)
trainer = d2l.Trainer(max_epochs=30, gradient_clip_val=1, num_gpus=1)
trainer.fit(model, data)
# TensorFlow
data = d2l.MTFraEng(batch_size=128)
embed_size, num_hiddens, num_layers, dropout = 256, 256, 2, 0.2
with d2l.try_gpu():
    encoder = Seq2SeqEncoder(
        len(data.src_vocab), embed_size, num_hiddens, num_layers, dropout)
    decoder = Seq2SeqDecoder(
        len(data.tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
    model = Seq2Seq(encoder, decoder, tgt_pad=data.tgt_vocab['<pad>'],
                    lr=0.005)
trainer = d2l.Trainer(max_epochs=30, gradient_clip_val=1)
trainer.fit(model, data)

10.7.7. Prediction

To predict the output sequence at each step, the predicted token from the previous time step is fed into the decoder as an input. One simple strategy is to sample whichever token the decoder has assigned the highest probability when predicting at each step. As in training, at the initial time step the beginning-of-sequence ("<bos>") token is fed into the decoder. This prediction process is illustrated in Fig. 10.7.3. When the end-of-sequence ("<eos>") token is predicted, the prediction of the output sequence is complete.


Fig. 10.7.3 Predicting the output sequence token by token using an RNN encoder-decoder.

In the next section, we will introduce more sophisticated strategies based on beam search (Section 10.8).
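The greedy loop described above can be sketched with a stand-in decoder; the lookup table below is a hypothetical replacement for "pick the highest-probability next token":

```python
# Greedy decoding with a toy deterministic "decoder": feed the previous
# prediction back in, stop at <eos> or after num_steps tokens.
step = {'<bos>': 'Ils', 'Ils': 'regardent', 'regardent': '.', '.': '<eos>'}
num_steps = 10
outputs, tok = [], '<bos>'
for _ in range(num_steps):
    tok = step[tok]          # stand-in for argmax over the vocabulary
    if tok == '<eos>':       # prediction of the output sequence is complete
        break
    outputs.append(tok)
```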

# PyTorch
@d2l.add_to_class(d2l.EncoderDecoder)  #@save
def predict_step(self, batch, device, num_steps,
                 save_attention_weights=False):
    batch = [a.to(device) for a in batch]
    src, tgt, src_valid_len, _ = batch
    enc_all_outputs = self.encoder(src, src_valid_len)
    dec_state = self.decoder.init_state(enc_all_outputs, src_valid_len)
    outputs, attention_weights = [tgt[:, (0)].unsqueeze(1), ], []
    for _ in range(num_steps):
        Y, dec_state = self.decoder(outputs[-1], dec_state)
        outputs.append(Y.argmax(2))
        # Save attention weights (to be covered later)
        if save_attention_weights:
            attention_weights.append(self.decoder.attention_weights)
    return torch.cat(outputs[1:], 1), attention_weights
# MXNet
@d2l.add_to_class(d2l.EncoderDecoder)  #@save
def predict_step(self, batch, device, num_steps,
                 save_attention_weights=False):
    batch = [a.as_in_context(device) for a in batch]
    src, tgt, src_valid_len, _ = batch
    enc_all_outputs = self.encoder(src, src_valid_len)
    dec_state = self.decoder.init_state(enc_all_outputs, src_valid_len)
    outputs, attention_weights = [np.expand_dims(tgt[:, 0], 1), ], []
    for _ in range(num_steps):
        Y, dec_state = self.decoder(outputs[-1], dec_state)
        outputs.append(Y.argmax(2))
        # Save attention weights (to be covered later)
        if save_attention_weights:
            attention_weights.append(self.decoder.attention_weights)
    return np.concatenate(outputs[1:], 1), attention_weights
# JAX
@d2l.add_to_class(d2l.EncoderDecoder)  #@save
def predict_step(self, params, batch, num_steps,
                 save_attention_weights=False):
    src, tgt, src_valid_len, _ = batch
    enc_all_outputs, inter_enc_vars = self.encoder.apply(
        {'params': params['encoder']}, src, src_valid_len, training=False,
        mutable='intermediates')
    # Save encoder attention weights if inter_enc_vars containing encoder
    # attention weights is not empty. (to be covered later)
    enc_attention_weights = []
    if bool(inter_enc_vars) and save_attention_weights:
        # Encoder Attention Weights saved in the intermediates collection
        enc_attention_weights = inter_enc_vars[
            'intermediates']['enc_attention_weights'][0]

    dec_state = self.decoder.init_state(enc_all_outputs, src_valid_len)
    outputs, attention_weights = [jnp.expand_dims(tgt[:,0], 1), ], []
    for _ in range(num_steps):
        (Y, dec_state), inter_dec_vars = self.decoder.apply(
            {'params': params['decoder']}, outputs[-1], dec_state,
            training=False, mutable='intermediates')
        outputs.append(Y.argmax(2))
        # Save attention weights (to be covered later)
        if save_attention_weights:
            # Decoder Attention Weights saved in the intermediates collection
            dec_attention_weights = inter_dec_vars[
                'intermediates']['dec_attention_weights'][0]
            attention_weights.append(dec_attention_weights)
    return jnp.concatenate(outputs[1:], 1), (attention_weights,
                                        enc_attention_weights)
# TensorFlow
@d2l.add_to_class(d2l.EncoderDecoder)  #@save
def predict_step(self, batch, device, num_steps,
                 save_attention_weights=False):
    src, tgt, src_valid_len, _ = batch
    enc_all_outputs = self.encoder(src, src_valid_len, training=False)
    dec_state = self.decoder.init_state(enc_all_outputs, src_valid_len)
    outputs, attention_weights = [tf.expand_dims(tgt[:, 0], 1), ], []
    for _ in range(num_steps):
        Y, dec_state = self.decoder(outputs[-1], dec_state, training=False)
        outputs.append(tf.argmax(Y, 2))
        # Save attention weights (to be covered later)
        if save_attention_weights:
            attention_weights.append(self.decoder.attention_weights)
    return tf.concat(outputs[1:], 1), attention_weights

10.7.8. Evaluation of Predicted Sequences

We can evaluate a predicted sequence by comparing it with the target sequence (the ground truth). But what precisely is the appropriate measure for comparing similarity between two sequences?

Bilingual evaluation understudy (BLEU), though originally proposed for evaluating machine translation results (Papineni et al., 2002), has been extensively used for measuring the quality of output sequences for different applications. In principle, for any \(n\)-gram (Section 9.3.1.1) in the predicted sequence, BLEU evaluates whether this \(n\)-gram appears in the target sequence.

\(p_n\) 表示 \(n\)-gram 的精度,定义为预测序列和目标序列中匹配的 \(n\)-gram 数量与预测序列中 \(n\)-gram 数量的比率。为了解释,给定一个目标序列 \(A\), \(B\), \(C\), \(D\), \(E\), \(F\),和一个预测序列 \(A\), \(B\), \(B\), \(C\), \(D\),我们有 \(p_1 = 4/5\)\(p_2 = 3/4\)\(p_3 = 1/3\),和 \(p_4 = 0\)。现在让 \(\textrm{len}_{\textrm{label}}\)\(\textrm{len}_{\textrm{pred}}\) 分别是目标序列和预测序列中的词元数量。那么,BLEU 定义为

(10.7.4)\[\exp\left(\min\left(0, 1 - \frac{\textrm{len}_{\textrm{label}}}{\textrm{len}_{\textrm{pred}}}\right)\right) \prod_{n=1}^k p_n^{1/2^n},\]

where \(k\) is the longest \(n\)-gram for matching.

Based on the definition of BLEU in (10.7.4), whenever the predicted sequence is the same as the target sequence, BLEU is 1. Moreover, since matching longer \(n\)-grams is more difficult, BLEU assigns a greater weight when a longer \(n\)-gram has high precision. Specifically, when \(p_n\) is fixed, \(p_n^{1/2^n}\) increases as \(n\) grows (the original paper uses \(p_n^{1/n}\)). Furthermore, since predicting shorter sequences tends to yield higher \(p_n\) values, the coefficient before the product term in (10.7.4) penalizes shorter predicted sequences. For example, when \(k=2\), given the target sequence \(A\), \(B\), \(C\), \(D\), \(E\), \(F\) and the predicted sequence \(A\), \(B\), although \(p_1 = p_2 = 1\), the penalty factor \(\exp(1-6/2) \approx 0.14\) lowers the BLEU.
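Plugging the worked example above (target \(A, B, C, D, E, F\), prediction \(A, B, B, C, D\)) into (10.7.4) with \(k=2\) gives:

```python
import math

# BLEU by hand for the worked example: p_1 = 4/5, p_2 = 3/4,
# len_label = 6, len_pred = 5, k = 2.
p1, p2 = 4 / 5, 3 / 4
len_label, len_pred = 6, 5
penalty = math.exp(min(0, 1 - len_label / len_pred))   # brevity penalty
bleu_score = penalty * p1 ** (1 / 2) * p2 ** (1 / 4)   # weights 1 / 2^n
```

The result is roughly 0.68: the slightly short prediction is penalized by \(\exp(-0.2)\), and the \(n\)-gram precisions enter with exponents \(1/2\) and \(1/4\).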

We implement the BLEU measure as follows.

def bleu(pred_seq, label_seq, k):  #@save
    """Compute the BLEU."""
    pred_tokens, label_tokens = pred_seq.split(' '), label_seq.split(' ')
    len_pred, len_label = len(pred_tokens), len(label_tokens)
    score = math.exp(min(0, 1 - len_label / len_pred))
    for n in range(1, min(k, len_pred) + 1):
        num_matches, label_subs = 0, collections.defaultdict(int)
        for i in range(len_label - n + 1):
            label_subs[' '.join(label_tokens[i: i + n])] += 1
        for i in range(len_pred - n + 1):
            if label_subs[' '.join(pred_tokens[i: i + n])] > 0:
                num_matches += 1
                label_subs[' '.join(pred_tokens[i: i + n])] -= 1
        score *= math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n))
    return score

In the end, we use the trained RNN encoder-decoder to translate a few English sentences into French and compute the BLEU of the results.

# PyTorch
engs = ['go .', 'i lost .', 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
preds, _ = model.predict_step(
    data.build(engs, fras), d2l.try_gpu(), data.num_steps)
for en, fr, p in zip(engs, fras, preds):
    translation = []
    for token in data.tgt_vocab.to_tokens(p):
        if token == '<eos>':
            break
        translation.append(token)
    print(f'{en} => {translation}, bleu,'
          f'{bleu(" ".join(translation), fr, k=2):.3f}')
go . => ['va', '!'], bleu,1.000
i lost . => ["j'ai", 'perdu', '.'], bleu,1.000
he's calm . => ['elle', 'court', '.'], bleu,0.000
i'm home . => ['je', 'suis', 'chez', 'moi', '.'], bleu,1.000
# MXNet
engs = ['go .', 'i lost .', 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
preds, _ = model.predict_step(
    data.build(engs, fras), d2l.try_gpu(), data.num_steps)
for en, fr, p in zip(engs, fras, preds):
    translation = []
    for token in data.tgt_vocab.to_tokens(p):
        if token == '<eos>':
            break
        translation.append(token)
    print(f'{en} => {translation}, bleu,'
          f'{bleu(" ".join(translation), fr, k=2):.3f}')
go . => ['va', '!'], bleu,1.000
i lost . => ["j'ai", 'perdu', '.'], bleu,1.000
he's calm . => ['il', 'est', 'mouillé', '.'], bleu,0.658
i'm home . => ['je', 'suis', 'chez', 'moi', '.'], bleu,1.000
# JAX
engs = ['go .', 'i lost .', 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
preds, _ = model.predict_step(trainer.state.params, data.build(engs, fras),
                              data.num_steps)
for en, fr, p in zip(engs, fras, preds):
    translation = []
    for token in data.tgt_vocab.to_tokens(p):
        if token == '<eos>':
            break
        translation.append(token)
    print(f'{en} => {translation}, bleu,'
          f'{bleu(" ".join(translation), fr, k=2):.3f}')
go . => ['va', '.'], bleu,0.000
i lost . => ['j’ai', 'perdu', '.'], bleu,0.687
he's calm . => ['il', '<unk>', 'perdu', '.'], bleu,0.000
i'm home . => ['je', 'suis', '<unk>', '.'], bleu,0.512
# TensorFlow
engs = ['go .', 'i lost .', 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
preds, _ = model.predict_step(
    data.build(engs, fras), d2l.try_gpu(), data.num_steps)
for en, fr, p in zip(engs, fras, preds):
    translation = []
    for token in data.tgt_vocab.to_tokens(p):
        if token == '<eos>':
            break
        translation.append(token)
    print(f'{en} => {translation}, bleu,'
          f'{bleu(" ".join(translation), fr, k=2):.3f}')
go . => ['poursuis', '.'], bleu,0.000
i lost . => ['je', 'refuse', '.'], bleu,0.000
he's calm . => ['nous', '<unk>', '.'], bleu,0.000
i'm home . => ['je', 'suis', 'chez', '.'], bleu,0.704

10.7.9. Summary

Following the design of the encoder-decoder architecture, we can use two RNNs to design a model for sequence-to-sequence learning. In encoder-decoder training, the teacher forcing approach feeds original output sequences (in contrast to predictions) into the decoder. When implementing the encoder and the decoder, we can use multilayer RNNs. We can use masks to filter out irrelevant computations, such as when calculating the loss. For evaluating output sequences, BLEU is a popular measure that matches \(n\)-grams between the predicted sequence and the target sequence.

10.7.10. Exercises

  1. Can you adjust the hyperparameters to improve the translation results?

  2. Rerun the experiment without using masks in the loss calculation. What results do you observe? Why?

  3. If the encoder and the decoder differ in the number of layers or the number of hidden units, how can we initialize the hidden state of the decoder?

  4. In training, replace teacher forcing with feeding the prediction at the previous time step into the decoder. How does this influence the performance?

  5. Rerun the experiment by replacing GRU with LSTM.

  6. Are there any other ways to design the output layer of the decoder?