如何实现Transformer的解码器及输出?

摘要:我们在《Transformer的结构拆解》那篇文章中介绍过,Transformer 可以分为四个部分:输入、输出、编码器、解码器。上篇文章介绍了编码器的实现,这篇文章介绍一下解码器的实现。 我们回顾一下 Transformer 的解码器的构
我们在《Transformer的结构拆解》那篇文章中介绍过,Transformer 可以分为四个部分:输入、输出、编码器、解码器。上篇文章介绍了编码器的实现,这篇文章介绍一下解码器的实现。 我们回顾一下 Transformer 的解码器的构成。它由 N 个解码器层堆叠而成,每个解码器层由三个子层连接结构组成。 第一个子层连接结构包括一个掩码多头自注意力子层和层归一化以及一个残差连接; 第二个子层连接结构包括一个多头交叉注意力子层和层归一化以及一个残差连接; 第三个子层连接结构包括一个前馈网络子层和层归一化以及一个残差连接。 1 解码器层 Transformer 解码器层是实现序列生成的基础单元,每层由掩码多头自注意力、编码器 - 解码器交叉注意力与前馈网络三个子层构成,且每个子层均通过残差连接与层归一化封装为子层连接结构。其中掩码机制保证了自回归生成的合法性,交叉注意力实现了源序列与目标序列的语义对齐,共同完成翻译、文本生成等生成式任务。 1.1 解码器层的代码实现 class DecoderLayer(nn.Module): def __init__(self, size, self_attn, src_attn, feed_forward, dropout): """ size: 模型的特征维度(通常与词嵌入维度 d_model 相同),用来确定层归一化的输入维度 self_attn: 解码器的自注意力机制实例 src_attn: 编码器-解码器交叉注意力机制实例 feed_forward: 位置前馈神经网络实例 dropout: dropout 置零概率,用于正则化 """ super(DecoderLayer, self).__init__() self.size = size self.self_attn = self_attn self.src_attn = src_attn self.feed_forward = feed_forward self.sublayer = clones(SublayerConnection(size, dropout), 3) def forward(self, x, memory, src_mask, tgt_mask): """ x: 目标序列嵌入 memory: 编码器输出 src_mask: 源掩码 tgt_mask: 目标掩码 """ m = memory # 自注意力子层 x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask)) # 编码器-解码器交叉注意力子层 x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask)) # 前馈层 return self.sublayer[2](x, self.feed_forward) 自注意力子层是把目标序列嵌入当作 query、key、value 计算自注意力。编码器-解码器交叉注意力子层是把自注意力子层的输出作为 query,将编码器的输出同时作为 key 和 value。解码器通过交叉注意力机制,能够关注编码器输出中与当前解码位置相关的信息。这种设计使得解码器能够利用源序列的信息来指导目标序列的生成。 (1)为什么有两个掩码tgt_mask和src_mask? x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))在解码器的自注意力计算中,会通过前瞻掩码(Look-ahead Mask,包含在tgt_mask中)对目标序列进行遮掩,其核心目的是实现自回归生成,保证模型在生成第 \(i\) 个位置的输出时,只能依赖前 \(i-1\) 个位置的信息,绝对无法访问未来位置的信息。 讲得再通俗一点,Transformer 在训练时并非逐词生成,而是采用Teacher Forcing机制直接传入完整的目标序列来快速计算损失。但我们绝不允许模型在生成当前词元时“偷看”后面的真实结果,这相当于直接抄答案,违背了生成逻辑。因此必须通过掩码将未来信息全部遮蔽。也就是说,模型只能根据已生成的第1个词元(Token)推导第2个词元,第2个及之后的所有信息都不允许被当前步骤使用。 x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))也是用了一个掩码,但此处的作用和自注意力不一样。src_mask是用来遮掩编码器输出的源序列用的,它遮掩的不是未来信息,它只遮掩源序列里的填充符(<pad>),确保交叉注意力在对齐语义时,只关注源序列中的有效文本信息,避免模型被无意义的填充噪声干扰,保证源序列与目标序列的语义对齐精准度。 填充符是哪来的? Transformer 训练时是批次训练(一次输多个句子),但每个句子长度不一样。如:I love AI(3 个词),Hello(1 个词)。必须用 <pad> 补齐到相同长度才能批量计算:I love AI,Hello <pad> <pad>。 关于掩码张量,我们在《掩码张量》中介绍过。 (2)验证一下解码器层的代码实现 head = 8 size = 512 d_model = 512 d_ff = 64 dropout = 0.2 self_attn = src_attn = MultiHeadedAttention(head, d_model, dropout) ff = PositionwiseFeedForward(d_model, d_ff, dropout) # x 是来自目标序列的词嵌入表示,但形式和源序列的词嵌入表示相同,这里使用pe_result充当 x = pe_result # 编码器的输出 memory = en_result # 实际source_mask和target_mask并不相同,这里为了方便演示,让它们都相同 mask = Variable(torch.zeros(2, 4, 4)) source_mask = target_mask = mask dl = DecoderLayer(size, self_attn, src_attn, ff, dropout) dl_result = dl(x, memory, source_mask, target_mask) print(dl_result) print(dl_result.shape) memory = en_result中的en_result是手撕 Transformer (3):编码器的实现中的编码器的输出。 运行结果: tensor([[[ 14.5528, 9.9080, 26.9711, ..., 25.4314, -27.4445, 0.3006], [ 10.5142, -0.6420, -32.8790, ..., -17.5598, -2.8126, 14.4887], [ 6.1937, 26.9177, -9.3980, ..., -29.2055, -13.7845, 76.0141], [-44.3466, -41.3957, 13.9992, ..., -5.7149, -7.1333, 0.1480]], [[-21.2524, 36.1790, 54.1334, ..., 53.2883, -6.1286, -5.1146], [ 0.0000, -5.5930, -3.8052, ..., 30.2352, 3.6395, -13.5134], [ 13.7671, -23.4760, -2.0049, ..., -7.4957, 28.7890, 25.3961], [ 2.4194, 18.3551, -11.6830, ..., 20.9146, -32.0714, 51.2123]]], grad_fn=<AddBackward0>) torch.Size([2, 4, 512]) 解码器层的输出形状也是torch.Size([2, 4, 512]) 2 解码器 2.1 解码器的代码实现 class Decoder(nn.Module): def __init__(self, layer, N): """ layer: 单个解码器层的实例,通常是 DecoderLayer 类的对象 N: 解码器堆叠的层,控制解码器的深度,决定模型的表达能力,原始 Transformer 论文中建议使用 N=6 """ super(Decoder, self).__init__() self.layers = clones(layer, N) self.norm = LayerNorm(layer.size) def forward(self, x, memory, src_mask, tgt_mask): # 遍历N个解码器层 for layer in self.layers: x = layer(x, memory, src_mask, tgt_mask) # 层归一化 return self.norm(x) 验证一下 size = 512 d_model = 512 head = 8 d_ff = 64 dropout = 0.2 c = copy.deepcopy attn = MultiHeadedAttention(head, d_model) ff = PositionwiseFeedForward(d_model, d_ff, dropout) layer = DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout) N = 8 x = pe_result memory = en_result mask = Variable(torch.zeros(2, 4, 4)) source_mask = target_mask = mask de = Decoder(layer, N) de_result = de(x, memory, source_mask, target_mask) print(de_result) print(de_result.shape) 运行结果: tensor([[[-0.5926, 1.2526, -0.0355, ..., 1.3078, 2.0725, -0.3516], [-0.1689, 0.0543, 0.7917, ..., -0.3255, 0.8230, 1.7012], [-1.9878, 1.3672, -1.5412, ..., -0.9789, 0.0725, -0.2123], [-0.9310, 1.6301, -0.6347, ..., 1.3257, -1.9393, 0.1986]], [[ 0.4990, -0.1874, 0.5215, ..., -2.5730, 0.0910, 1.1869], [-1.0347, -0.4227, 0.3293, ..., 0.9881, -0.4895, -1.4625], [ 1.1816, -0.5605, -0.0725, ..., 1.3171, 0.3150, -1.1374], [ 0.1681, -0.2411, 0.5406, ..., -0.2607, 1.2019, 0.4188]]], grad_fn=<AddBackward0>) torch.Size([2, 4, 512]) 3 输出部分 如下图所示,Transformer 的输出部分由线性层和 Softmax 组成。 线性层的作用是把解码器输出的高维特征转换成和目标词表维度相同的原始分数(logits)。比如,解码器输出的特征维度是d_model=512,但我们要预测的是词表中的词,比如词表中有 30000 个词,那这个线性层的作用就是把 512 的特征映射成 30000 维的向量,每个维度代表该词的logits。这个 logits 是可正可负的。Softmax 层的作用是把 logits 转换成概率分布。最终输出每个词作为下一个词的概率。例如:P(我)=0.1,P(你)=0.7,P(他)=0.2,模型通常取概率最高的词(如“你”)作为预测输出。上述过程可以用公式表达: \[\text{Output}=\text{Softmax}(W \cdot h+b) \] 3.1 输出部分的代码实现 class Generator(nn.Module): def __init__(self, d_model, vocab): """ d_model: 词嵌入的维度 vocab: 词表大小 """ super(Generator, self).__init__() self.proj = nn.Linear(d_model, vocab) # (out_features, in_features),注意nn.Linear内部有转置操作 def forward(self, x): # log_softmax是对softmax结果取了对数 # dim=-1 表明这个softmax操作是在最后一个维度进行的,即词汇表维度 return log_softmax(self.proj(x), dim=-1) 代码中没有直接使用softmax,而是使用了log_softmax函数,这是在算完softmax之后又取了对数。这是因为,当词汇表很大时,softmax 计算的概率值可能非常小(接近 0),这会导致数值下溢问题,即浮点数精度不足以表示这些极小值。取对数后,极小的概率值会变成较大的负数(例如,log(1e-100) ≈ -230),而不是被舍入为 0。这样可以保留更多的数值信息,提高计算稳定性。同时也是为了和交叉熵损失函数配合,这和交叉熵损失函数的公式有关,这里不展开讲。log_softmax 的梯度计算比 softmax 更简洁。对于 log_softmax(x)_i 关于 \(x_j\) 的梯度:当 \(i = j\) 时,梯度为 \(1 - \text{softmax}(x)_i\);当 \(i \neq j\) 时,梯度为 \(-\text{softmax}(x)_j\)。这种形式在反向传播时计算更高效,且数值更稳定。 3.1.1 nn.Linear 的代码演示 import torch import torch.nn as nn m = nn.Linear(20, 30) input = torch.randn(128, 20) output = m(input) print(output.size()) 运行结果: torch.Size([128, 30]) 相当于一个 128×20 的矩阵乘以了一个 20×30 的矩阵。 3.1.2 验证一下输出部分的代码实现 d_model = 512 vocab_size = 1000 x = de_result gen = Generator(d_model, vocab_size) gen_result = gen(x) print(gen_result) print(gen_result.shape) 运行结果: tensor([[[-6.7541, -7.9885, -6.8159, ..., -6.2267, -6.3681, -5.6904], [-7.7249, -7.5576, -7.3687, ..., -7.2493, -6.2787, -6.6030], [-8.2810, -7.7648, -7.0530, ..., -7.2627, -6.9457, -7.0046], [-6.3292, -7.3185, -6.8771, ..., -7.0422, -7.5640, -7.7301]], [[-5.2091, -8.0264, -7.4655, ..., -7.0698, -7.6706, -7.0742], [-7.8060, -6.8962, -6.3360, ..., -7.7206, -7.6472, -6.3008], [-6.5726, -7.5838, -6.6203, ..., -7.1430, -6.8337, -7.5951], [-7.2082, -7.5446, -6.4408, ..., -7.0181, -7.5636, -7.5240]]], grad_fn=<LogSoftmaxBackward0>) torch.Size([2, 4, 1000]) 可以看到原本512的维度变成了1000维。