
01
Introduction
This is the eighth and final article in the 《手撕Transformer》 ("Transformer from Scratch") series. The encoder and decoder are combined to create a model capable of translating German into English.

02
Embedding Layer
The embedding layer provides each token in the vocabulary with a corresponding vector representation. This is the first layer every sequence must pass through. Each token in each sequence is embedded into a vector of length d_model. The input to this layer has shape (batch_size, seq_length); the output has shape (batch_size, seq_length, d_model).
class Embeddings(nn.Module):
    def __init__(self, vocab_size: int, d_model: int):
        """
        Args:
            vocab_size: size of vocabulary
            d_model: dimension of embeddings
        """
        # inherit from nn.Module
        super().__init__()

        # embedding look-up table (lut)
        self.lut = nn.Embedding(vocab_size, d_model)

        # dimension of embeddings
        self.d_model = d_model

    def forward(self, x: Tensor):
        """
        Args:
            x: input Tensor (batch_size, seq_length)

        Returns:
            embedding vector
        """
        # scale embeddings by constant sqrt(d_model)
        return self.lut(x) * math.sqrt(self.d_model)
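As a quick sanity check (a minimal sketch; the vocabulary size, batch size, and sequence length here are arbitrary), the layer can be applied to a batch of random token indices:

# hypothetical shape check for the Embeddings layer
emb = Embeddings(vocab_size=1000, d_model=512)
tokens = torch.randint(0, 1000, (32, 10))  # (batch_size, seq_length)
print(emb(tokens).shape)                   # torch.Size([32, 10, 512])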
03
Positional Encoding Layer
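The positional encoding layer adds a fixed sinusoidal signal to the embeddings so the model can distinguish token positions. For position k and embedding dimension pair (2i, 2i+1), the encodings from the original Transformer paper are:

PE(k, 2i) = sin(k / 10000^(2i/d_model))
PE(k, 2i+1) = cos(k / 10000^(2i/d_model))

The div_term in the code below is the 1/10000^(2i/d_model) factor computed in log space for numerical stability; dropout is applied after the encodings are added to the embeddings.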
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, dropout: float = 0.1, max_length: int = 5000):
        """
        Args:
            d_model: dimension of embeddings
            dropout: randomly zeroes-out some of the input
            max_length: max sequence length
        """
        # inherit from Module
        super().__init__()

        # initialize dropout
        self.dropout = nn.Dropout(p=dropout)

        # create tensor of 0s
        pe = torch.zeros(max_length, d_model)

        # create position column
        k = torch.arange(0, max_length).unsqueeze(1)

        # calc divisor for positional encoding
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))

        # calc sine on even indices
        pe[:, 0::2] = torch.sin(k * div_term)

        # calc cosine on odd indices
        pe[:, 1::2] = torch.cos(k * div_term)

        # add dimension
        pe = pe.unsqueeze(0)

        # buffers are saved in state_dict but not trained by the optimizer
        self.register_buffer("pe", pe)

    def forward(self, x: Tensor):
        """
        Args:
            x: embeddings (batch_size, seq_length, d_model)

        Returns:
            embeddings + positional encodings (batch_size, seq_length, d_model)
        """
        # add positional encoding to the embeddings
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)

        # perform dropout
        return self.dropout(x)
04
Multi-Head Attention
The embedded and positionally encoded sequences are copied three times and passed to the multi-head attention layer to create the Query, Key, and Value tensors, each transformed by its own linear layer. All three have shape (batch_size, seq_length, d_model). These tensors are then split across heads into shape (batch_size, n_heads, seq_length, d_key), where d_key = d_model / n_heads. Each sequence now has n_heads representations, which can attend to different aspects of the sequence during training.

The Query tensor is multiplied by the transposed Key tensor, and the product is scaled by dividing by √(d_key); applying softmax then yields a probability distribution. The product represents each sequence's relationship to itself, or, in the decoder's cross-attention mechanism, the target sequence's relationship to the source sequence. These distributions have shape (batch_size, n_heads, Q_length, K_length). Depending on how a sequence is padded, the padded positions are masked; in the decoder's self-attention mechanism, positions are additionally masked so that each token can only attend to earlier tokens, which is what makes the decoder autoregressive.
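As a quick illustration (a standalone sketch, separate from the model code), the subsequent mask used in the decoder's self-attention can be built with torch.tril; positions marked False are filled with -1e10 before the softmax, so each token attends only to itself and earlier tokens:

seq_length = 5

# lower-triangular causal mask: True = may attend, False = masked out
trg_sub_mask = torch.tril(torch.ones((seq_length, seq_length))).bool()
print(trg_sub_mask)
# tensor([[ True, False, False, False, False],
#         [ True,  True, False, False, False],
#         [ True,  True,  True, False, False],
#         [ True,  True,  True,  True, False],
#         [ True,  True,  True,  True,  True]])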

The implementation is as follows:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int = 512, n_heads: int = 8, dropout: float = 0.1):
        """
        Args:
            d_model: dimension of embeddings
            n_heads: number of self attention heads
            dropout: probability of dropout occurring
        """
        super().__init__()
        assert d_model % n_heads == 0    # ensure an even num of heads
        self.d_model = d_model           # 512 dim
        self.n_heads = n_heads           # 8 heads
        self.d_key = d_model // n_heads  # assume d_value equals d_key | 512/8=64

        self.Wq = nn.Linear(d_model, d_model)  # query weights
        self.Wk = nn.Linear(d_model, d_model)  # key weights
        self.Wv = nn.Linear(d_model, d_model)  # value weights
        self.Wo = nn.Linear(d_model, d_model)  # output weights

        self.dropout = nn.Dropout(p=dropout)   # initialize dropout layer

    def forward(self, query: Tensor, key: Tensor, value: Tensor, mask: Tensor = None):
        """
        Args:
            query: query vector (batch_size, q_length, d_model)
            key: key vector (batch_size, k_length, d_model)
            value: value vector (batch_size, s_length, d_model)
            mask: mask for decoder

        Returns:
            output: attention values (batch_size, q_length, d_model)
            attn_probs: softmax scores (batch_size, n_heads, q_length, k_length)
        """
        batch_size = key.size(0)

        # calculate query, key, and value tensors
        Q = self.Wq(query)  # (32, 10, 512) x (512, 512) = (32, 10, 512)
        K = self.Wk(key)    # (32, 10, 512) x (512, 512) = (32, 10, 512)
        V = self.Wv(value)  # (32, 10, 512) x (512, 512) = (32, 10, 512)

        # split each tensor into n-heads to compute attention

        # query tensor
        Q = Q.view(batch_size,    # (32, 10, 512) -> (32, 10, 8, 64)
                   -1,            # -1 = q_length
                   self.n_heads,
                   self.d_key
                   ).permute(0, 2, 1, 3)  # (32, 10, 8, 64) -> (32, 8, 10, 64) = (batch_size, n_heads, q_length, d_key)
        # key tensor
        K = K.view(batch_size,    # (32, 10, 512) -> (32, 10, 8, 64)
                   -1,            # -1 = k_length
                   self.n_heads,
                   self.d_key
                   ).permute(0, 2, 1, 3)  # (32, 10, 8, 64) -> (32, 8, 10, 64) = (batch_size, n_heads, k_length, d_key)
        # value tensor
        V = V.view(batch_size,    # (32, 10, 512) -> (32, 10, 8, 64)
                   -1,            # -1 = v_length
                   self.n_heads,
                   self.d_key
                   ).permute(0, 2, 1, 3)  # (32, 10, 8, 64) -> (32, 8, 10, 64) = (batch_size, n_heads, v_length, d_key)

        # computes attention
        # scaled dot product -> QK^{T}
        scaled_dot_prod = torch.matmul(Q,  # (32, 8, 10, 64) x (32, 8, 64, 10) -> (32, 8, 10, 10) = (batch_size, n_heads, q_length, k_length)
                                       K.permute(0, 1, 3, 2)
                                       ) / math.sqrt(self.d_key)  # sqrt(64)

        # fill those positions of product as (-1e10) where mask positions are 0
        if mask is not None:
            scaled_dot_prod = scaled_dot_prod.masked_fill(mask == 0, -1e10)

        # apply softmax
        attn_probs = torch.softmax(scaled_dot_prod, dim=-1)

        # multiply by values to get attention
        A = torch.matmul(self.dropout(attn_probs), V)  # (32, 8, 10, 10) x (32, 8, 10, 64) -> (32, 8, 10, 64)
        # (batch_size, n_heads, q_length, k_length) x (batch_size, n_heads, v_length, d_key) -> (batch_size, n_heads, q_length, d_key)

        # reshape attention back to (32, 10, 512)
        A = A.permute(0, 2, 1, 3).contiguous()                  # (32, 8, 10, 64) -> (32, 10, 8, 64)
        A = A.view(batch_size, -1, self.n_heads * self.d_key)  # (32, 10, 8, 64) -> (32, 10, 8*64) -> (32, 10, 512) = (batch_size, q_length, d_model)

        # push through the final weight layer
        output = self.Wo(A)  # (32, 10, 512) x (512, 512) = (32, 10, 512)

        return output, attn_probs
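A shape check for the module (a minimal sketch with random tensors; for self-attention the same tensor is passed as query, key, and value):

# hypothetical smoke test for MultiHeadAttention
mha = MultiHeadAttention(d_model=512, n_heads=8)
x = torch.randn(32, 10, 512)       # (batch_size, seq_length, d_model)
output, attn_probs = mha(x, x, x)  # self-attention, no mask
print(output.shape)                # torch.Size([32, 10, 512])
print(attn_probs.shape)            # torch.Size([32, 8, 10, 10])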
05
Feed-Forward Network (FFN)
After layer normalization and a residual connection, the output of the attention mechanism is passed to the FFN. The FFN consists of two linear layers with a ReLU activation between them. The first layer has shape (d_model, d_ffn). Each sequence tensor, of shape (batch_size, seq_length, d_model), passes through this layer, which allows the model to learn more about each sequence. At this point the tensor has shape (batch_size, seq_length, d_ffn) and is passed through ReLU. It then passes through the second layer, of shape (d_ffn, d_model), which contracts the tensor back to its original shape (batch_size, seq_length, d_model). The output then goes through layer normalization with a residual addition.
The code is as follows:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model: int, d_ffn: int, dropout: float = 0.1):
        """
        Args:
            d_model: dimension of embeddings
            d_ffn: dimension of feed-forward network
            dropout: probability of dropout occurring
        """
        super().__init__()

        self.w_1 = nn.Linear(d_model, d_ffn)
        self.w_2 = nn.Linear(d_ffn, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Args:
            x: output from attention (batch_size, seq_length, d_model)

        Returns:
            expanded-and-contracted representation (batch_size, seq_length, d_model)
        """
        # w_1(x).relu(): (batch_size, seq_length, d_model) x (d_model, d_ffn) -> (batch_size, seq_length, d_ffn)
        # w_2(w_1(x).relu()): (batch_size, seq_length, d_ffn) x (d_ffn, d_model) -> (batch_size, seq_length, d_model)
        return self.w_2(self.dropout(self.w_1(x).relu()))
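The expand-and-contract behavior can be confirmed with a quick shape check (a minimal sketch with random inputs):

# hypothetical shape check for the FFN
ffn = PositionwiseFeedForward(d_model=512, d_ffn=2048)
x = torch.randn(32, 10, 512)
print(ffn(x).shape)  # torch.Size([32, 10, 512]); expanded to 2048 internally, then contracted back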
06
Layer Normalization and Residual Connections
For an input tensor of shape (batch_size, seq_length, d_model), layer normalization normalizes each d_model vector. Each vector is standardized using a modified z-score formula, which helps prevent problems during gradient descent.

Residual addition adds the embedding vector that entered a layer back to that layer's output. This enriches the embedding vectors with the information gained from multi-head attention and the FFN.
Neither layer normalization nor residual addition changes the shape of its input. Both are implemented directly in the encoder and decoder modules; nn.LayerNorm is used for simplicity instead of the custom module created earlier in this series.
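The pattern, exactly as it appears inside the encoder and decoder layers below, looks like this (a minimal sketch using random stand-in tensors):

d_model = 512
layer_norm = nn.LayerNorm(d_model)
dropout = nn.Dropout(0.1)

x = torch.randn(32, 10, d_model)   # embeddings entering the sublayer
_x = torch.randn(32, 10, d_model)  # stand-in for the sublayer's output

# residual add and norm: the shape is unchanged
out = layer_norm(x + dropout(_x))
print(out.shape)  # torch.Size([32, 10, 512])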
07
Encoder
Each encoder layer includes all of the layers described above. It is responsible for enriching the embeddings of the source sequences. The input has shape (batch_size, seq_length, d_model). The embedded sequences are passed directly to the multi-head attention mechanism. After passing through the Nx layers of the encoder stack, the output is an enriched representation of each sequence that contains as much context as possible. Its shape is (batch_size, seq_length, d_model).

The code is as follows:
class EncoderLayer(nn.Module):
    def __init__(self, d_model: int, n_heads: int, d_ffn: int, dropout: float):
        """
        Args:
            d_model: dimension of embeddings
            n_heads: number of heads
            d_ffn: dimension of feed-forward network
            dropout: probability of dropout occurring
        """
        super().__init__()

        # multi-head attention sublayer
        self.attention = MultiHeadAttention(d_model, n_heads, dropout)
        # layer norm for multi-head attention
        self.attn_layer_norm = nn.LayerNorm(d_model)

        # position-wise feed-forward network
        self.positionwise_ffn = PositionwiseFeedForward(d_model, d_ffn, dropout)
        # layer norm for position-wise ffn
        self.ffn_layer_norm = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, src: Tensor, src_mask: Tensor):
        """
        Args:
            src: positionally embedded sequences (batch_size, seq_length, d_model)
            src_mask: mask for the sequences (batch_size, 1, 1, seq_length)

        Returns:
            src: sequences after self-attention (batch_size, seq_length, d_model)
        """
        # pass embeddings through multi-head attention
        _src, attn_probs = self.attention(src, src, src, src_mask)

        # residual add and norm
        src = self.attn_layer_norm(src + self.dropout(_src))

        # position-wise feed-forward network
        _src = self.positionwise_ffn(src)

        # residual add and norm
        src = self.ffn_layer_norm(src + self.dropout(_src))

        return src, attn_probs

class Encoder(nn.Module):
    def __init__(self, d_model: int, n_layers: int,
                 n_heads: int, d_ffn: int, dropout: float = 0.1):
        """
        Args:
            d_model: dimension of embeddings
            n_layers: number of encoder layers
            n_heads: number of heads
            d_ffn: dimension of feed-forward network
            dropout: probability of dropout occurring
        """
        super().__init__()

        # create n_layers encoders
        self.layers = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ffn, dropout)
                                     for layer in range(n_layers)])

        self.dropout = nn.Dropout(dropout)

    def forward(self, src: Tensor, src_mask: Tensor):
        """
        Args:
            src: embedded sequences (batch_size, seq_length, d_model)
            src_mask: mask for the sequences (batch_size, 1, 1, seq_length)

        Returns:
            src: sequences after self-attention (batch_size, seq_length, d_model)
        """
        # pass the sequences through each encoder
        for layer in self.layers:
            src, attn_probs = layer(src, src_mask)

        self.attn_probs = attn_probs

        return src
08
Decoder
Each decoder layer has two responsibilities: (1) learning an autoregressive representation of the shifted target sequence, and (2) learning how the target sequence relates to the encoder's enriched embeddings. Like the encoder, the decoder stack has Nx decoder layers. As mentioned earlier, the encoder output is passed to every decoder layer.

The input to the first decoder layer is shifted right, then embedded and positionally encoded. It has shape (batch_size, seq_length, d_model). It passes through the first attention mechanism, in which the model learns an autoregressive representation of the sequence with itself. The output of this mechanism keeps its shape and is passed to the second, cross-attention mechanism, where it attends to the encoder's enriched embeddings; the output once again keeps its original shape.
After passing through the FFN, the tensor passes through a final linear layer of shape (d_model, vocab_size). This produces a tensor of shape (batch_size, seq_length, vocab_size), which can then be passed through a softmax function; the highest probability is the prediction for each token.
The code is as follows. Note that each decoder layer returns the masked self-attention scores separately from the cross-attention scores, and the decoder stores both so they can be visualized later during inference:
class DecoderLayer(nn.Module):
    def __init__(self, d_model: int, n_heads: int, d_ffn: int, dropout: float):
        """
        Args:
            d_model: dimension of embeddings
            n_heads: number of heads
            d_ffn: dimension of feed-forward network
            dropout: probability of dropout occurring
        """
        super().__init__()

        # masked multi-head attention sublayer
        self.masked_attention = MultiHeadAttention(d_model, n_heads, dropout)
        # layer norm for masked multi-head attention
        self.masked_attn_layer_norm = nn.LayerNorm(d_model)

        # multi-head attention sublayer
        self.attention = MultiHeadAttention(d_model, n_heads, dropout)
        # layer norm for multi-head attention
        self.attn_layer_norm = nn.LayerNorm(d_model)

        # position-wise feed-forward network
        self.positionwise_ffn = PositionwiseFeedForward(d_model, d_ffn, dropout)
        # layer norm for position-wise ffn
        self.ffn_layer_norm = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, trg: Tensor, src: Tensor, trg_mask: Tensor, src_mask: Tensor):
        """
        Args:
            trg: embedded sequences (batch_size, trg_seq_length, d_model)
            src: embedded sequences (batch_size, src_seq_length, d_model)
            trg_mask: mask for the sequences (batch_size, 1, trg_seq_length, trg_seq_length)
            src_mask: mask for the sequences (batch_size, 1, 1, src_seq_length)

        Returns:
            trg: sequences after self-attention (batch_size, trg_seq_length, d_model)
            attn_probs: cross-attention softmax scores (batch_size, n_heads, trg_seq_length, src_seq_length)
            masked_attn_probs: masked self-attention softmax scores (batch_size, n_heads, trg_seq_length, trg_seq_length)
        """
        # pass trg embeddings through masked multi-head attention
        _trg, masked_attn_probs = self.masked_attention(trg, trg, trg, trg_mask)

        # residual add and norm
        trg = self.masked_attn_layer_norm(trg + self.dropout(_trg))

        # pass trg and src embeddings through multi-head attention
        _trg, attn_probs = self.attention(trg, src, src, src_mask)

        # residual add and norm
        trg = self.attn_layer_norm(trg + self.dropout(_trg))

        # position-wise feed-forward network
        _trg = self.positionwise_ffn(trg)

        # residual add and norm
        trg = self.ffn_layer_norm(trg + self.dropout(_trg))

        return trg, attn_probs, masked_attn_probs

class Decoder(nn.Module):
    def __init__(self, vocab_size: int, d_model: int, n_layers: int,
                 n_heads: int, d_ffn: int, dropout: float = 0.1):
        """
        Args:
            vocab_size: size of the target vocabulary
            d_model: dimension of embeddings
            n_layers: number of decoder layers
            n_heads: number of heads
            d_ffn: dimension of feed-forward network
            dropout: probability of dropout occurring
        """
        super().__init__()

        # create n_layers decoders
        self.layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ffn, dropout)
                                     for layer in range(n_layers)])

        self.dropout = nn.Dropout(dropout)

        # set output layer
        self.Wo = nn.Linear(d_model, vocab_size)

    def forward(self, trg: Tensor, src: Tensor, trg_mask: Tensor, src_mask: Tensor):
        """
        Args:
            trg: embedded sequences (batch_size, trg_seq_length, d_model)
            src: encoded sequences from encoder (batch_size, src_seq_length, d_model)
            trg_mask: mask for the sequences (batch_size, 1, trg_seq_length, trg_seq_length)
            src_mask: mask for the sequences (batch_size, 1, 1, src_seq_length)

        Returns:
            output: sequences after decoder (batch_size, trg_seq_length, vocab_size)
        """
        # pass the sequences through each decoder
        for layer in self.layers:
            trg, attn_probs, masked_attn_probs = layer(trg, src, trg_mask, src_mask)

        # store the attention scores from the final layer so they can be visualized later
        self.attn_probs = attn_probs
        self.masked_attn_probs = masked_attn_probs

        return self.Wo(trg)
09
The Transformer
The encoder and decoder can be combined in a single module to create the Transformer model. The module is initialized with the encoder, the decoder, and the source and target embeddings.
The forward pass requires the source sequences and the shifted target sequences. The source sequences are embedded and passed through the encoder. The encoder output and the embedded target sequences are then passed through the decoder. The functions that create the source and target masks are also part of this module.
class Transformer(nn.Module):
    def __init__(self, encoder: Encoder, decoder: Decoder,
                 src_embed: Embeddings, trg_embed: Embeddings,
                 src_pad_idx: int, trg_pad_idx: int, device):
        """
        Args:
            encoder: encoder stack
            decoder: decoder stack
            src_embed: source embeddings and encodings
            trg_embed: target embeddings and encodings
            src_pad_idx: padding index
            trg_pad_idx: padding index
            device: cuda or cpu

        Returns:
            output: sequences after decoder (batch_size, trg_seq_length, vocab_size)
        """
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.trg_embed = trg_embed
        self.device = device
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx

    def make_src_mask(self, src: Tensor):
        """
        Args:
            src: raw sequences with padding (batch_size, seq_length)

        Returns:
            src_mask: mask for each sequence (batch_size, 1, 1, seq_length)
        """
        # assign 1 to tokens that need attended to and 0 to padding tokens, then add 2 dimensions
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        return src_mask

    def make_trg_mask(self, trg: Tensor):
        """
        Args:
            trg: raw sequences with padding (batch_size, seq_length)

        Returns:
            trg_mask: mask for each sequence (batch_size, 1, seq_length, seq_length)
        """
        seq_length = trg.shape[1]

        # assign True to tokens that need attended to and False to padding tokens, then add 2 dimensions
        trg_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)  # (batch_size, 1, 1, seq_length)

        # generate subsequent mask
        trg_sub_mask = torch.tril(torch.ones((seq_length, seq_length), device=self.device)).bool()  # (seq_length, seq_length)

        # bitwise "and" operator | 0 & 0 = 0, 1 & 1 = 1, 1 & 0 = 0
        trg_mask = trg_mask & trg_sub_mask

        return trg_mask

    def forward(self, src: Tensor, trg: Tensor):
        """
        Args:
            trg: raw target sequences (batch_size, trg_seq_length)
            src: raw src sequences (batch_size, src_seq_length)

        Returns:
            output: sequences after decoder (batch_size, trg_seq_length, output_dim)
        """
        # create source and target masks
        src_mask = self.make_src_mask(src)  # (batch_size, 1, 1, src_seq_length)
        trg_mask = self.make_trg_mask(trg)  # (batch_size, 1, trg_seq_length, trg_seq_length)

        # push the src through the encoder layers
        src = self.encoder(self.src_embed(src), src_mask)  # (batch_size, src_seq_length, d_model)

        # decoder output and attention probabilities
        output = self.decoder(self.trg_embed(trg), src, trg_mask, src_mask)

        return output
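To see what make_trg_mask produces, here is a standalone sketch (the padding index and token values are hypothetical) that combines the padding mask and the subsequent mask the same way the method does:

PAD_IDX = 1                                        # hypothetical padding index
trg = torch.tensor([[4, 6, 9, PAD_IDX, PAD_IDX]])  # (batch_size=1, seq_length=5)

pad_mask = (trg != PAD_IDX).unsqueeze(1).unsqueeze(2)  # (1, 1, 1, 5)
sub_mask = torch.tril(torch.ones((5, 5))).bool()       # (5, 5)
trg_mask = pad_mask & sub_mask                         # (1, 1, 5, 5)

print(trg_mask.squeeze())
# tensor([[ True, False, False, False, False],
#         [ True,  True, False, False, False],
#         [ True,  True,  True, False, False],
#         [ True,  True,  True, False, False],
#         [ True,  True,  True, False, False]])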
10
Building the Model
The simple function below initializes the encoder, decoder, positional encodings, and embedding layers, then passes them to the Transformer module to create a trainable model.
def make_model(device, src_vocab, trg_vocab, n_layers: int = 3, d_model: int = 512,
               d_ffn: int = 2048, n_heads: int = 8, dropout: float = 0.1,
               max_length: int = 5000):
    """
    Construct a model when provided parameters.

    Args:
        src_vocab: source vocabulary
        trg_vocab: target vocabulary
        n_layers: Number of Encoder and Decoders
        d_model: dimension of embeddings
        d_ffn: dimension of feed-forward network
        n_heads: number of heads
        dropout: probability of dropout occurring
        max_length: maximum sequence length for positional encodings

    Returns:
        Transformer model based on hyperparameters
    """
    # create the encoder
    encoder = Encoder(d_model, n_layers, n_heads, d_ffn, dropout)

    # create the decoder
    decoder = Decoder(len(trg_vocab), d_model, n_layers, n_heads, d_ffn, dropout)

    # create source embedding matrix
    src_embed = Embeddings(len(src_vocab), d_model)

    # create target embedding matrix
    trg_embed = Embeddings(len(trg_vocab), d_model)

    # create a positional encoding matrix
    pos_enc = PositionalEncoding(d_model, dropout, max_length)

    # create the Transformer model
    model = Transformer(encoder, decoder,
                        nn.Sequential(src_embed, pos_enc),
                        nn.Sequential(trg_embed, pos_enc),
                        src_pad_idx=src_vocab.get_stoi()["<pad>"],
                        trg_pad_idx=trg_vocab.get_stoi()["<pad>"],
                        device=device)

    # initialize parameters with Xavier/Glorot
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)

    return model
11
Translating German to English: Data Preprocessing
The previous article trained a Transformer model on a small dataset to translate German into English. This article uses the Multi30k dataset from torchtext.datasets, which provides training, validation, and test sets. All of the custom functions for loading tokenizers, building vocabularies, processing data, and generating batches can be found in the appendix.

# global variables used later in the script
spacy_de, spacy_en = load_tokenizers()
vocab_src, vocab_trg = load_vocab(spacy_de, spacy_en)
Loaded English and German tokenizers.
Building German Vocabulary...
Building English Vocabulary...
Vocabulary sizes:
    Source: 8147
    Target: 6082
BOS_IDX = vocab_trg['<bos>']
EOS_IDX = vocab_trg['<eos>']
PAD_IDX = vocab_trg['<pad>']
# raw data
train_data_raw, val_data_raw, test_data_raw = datasets.Multi30k(language_pair=("de", "en"))
# processed data
train_data = data_process(train_data_raw)
val_data = data_process(val_data_raw)
test_data = data_process(test_data_raw)
MAX_PADDING = 20
BATCH_SIZE = 128

train_iter = DataLoader(to_map_style_dataset(train_data), batch_size=BATCH_SIZE,
                        shuffle=True, drop_last=True, collate_fn=generate_batch)

valid_iter = DataLoader(to_map_style_dataset(val_data), batch_size=BATCH_SIZE,
                        shuffle=True, drop_last=True, collate_fn=generate_batch)

test_iter = DataLoader(to_map_style_dataset(test_data), batch_size=BATCH_SIZE,
                       shuffle=True, drop_last=True, collate_fn=generate_batch)
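One batch can be pulled from the iterator as a sanity check (a minimal sketch; the values depend on shuffling, but the shapes are fixed by MAX_PADDING):

# hypothetical preview of a single batch
src_batch, trg_batch = next(iter(train_iter))
print(src_batch.shape)  # torch.Size([128, 20]) = (BATCH_SIZE, MAX_PADDING)
print(trg_batch.shape)  # torch.Size([128, 20])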
12
Translating German to English: Creating the Model
The next step is to create the model that will be trained on this data. The make_model function is called with the desired parameters, and model.cuda() ensures the model trains on the GPU if one is available. The hyperparameter values below were chosen empirically.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = make_model(device, vocab_src, vocab_trg,
                   n_layers=3, n_heads=8, d_model=256,
                   d_ffn=512, max_length=50)
model.cuda()
The model's total number of trainable parameters can also be displayed to gauge its size.
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')
The output is as follows:
The model has 9,159,362 trainable parameters.
13
Translating German to English: Training Setup
The Adam optimizer and a cross-entropy loss are used; setting ignore_index=PAD_IDX keeps padding tokens from contributing to the loss.

LEARNING_RATE = 0.0005

optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
The model can be trained with the functions below, which perform the steps executed during each epoch. Note the teacher forcing: the decoder receives the target shifted right (trg[:, :-1]) and is trained to predict the next token at every position (trg[:, 1:]). The model updates its parameters based on the loss function, and the function returns the average loss across the batches in the epoch.
def train(model, iterator, optimizer, criterion, clip):
    """
    Train the model on the given data.

    Args:
        model: Transformer model to be trained
        iterator: data to be trained on
        optimizer: optimizer for updating parameters
        criterion: loss function for updating parameters
        clip: value to help prevent exploding gradients

    Returns:
        loss for the epoch
    """
    # set the model to training mode
    model.train()

    epoch_loss = 0

    # loop through each batch in the iterator
    for i, batch in enumerate(iterator):

        # set the source and target batches
        src, trg = batch

        # zero the gradients
        optimizer.zero_grad()

        # logits for each output
        logits = model(src, trg[:, :-1])

        # expected output
        expected_output = trg[:, 1:]

        # calculate the loss
        loss = criterion(logits.contiguous().view(-1, logits.shape[-1]),
                         expected_output.contiguous().view(-1))

        # backpropagation
        loss.backward()

        # clip the weights
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        # update the weights
        optimizer.step()

        # update the loss
        epoch_loss += loss.item()

    # return the average loss for the epoch
    return epoch_loss / len(iterator)
def evaluate(model, iterator, criterion):
    """
    Evaluate the model on the given data.

    Args:
        model: Transformer model to be trained
        iterator: data to be evaluated
        criterion: loss function for assessing outputs

    Returns:
        loss for the data
    """
    # set the model to evaluation mode
    model.eval()

    epoch_loss = 0

    # evaluate without updating gradients
    with torch.no_grad():

        # loop through each batch in the iterator
        for i, batch in enumerate(iterator):

            # set the source and target batches
            src, trg = batch

            # logits for each output
            logits = model(src, trg[:, :-1])

            # expected output
            expected_output = trg[:, 1:]

            # calculate the loss
            loss = criterion(logits.contiguous().view(-1, logits.shape[-1]),
                             expected_output.contiguous().view(-1))

            # update the loss
            epoch_loss += loss.item()

    # return the average loss for the epoch
    return epoch_loss / len(iterator)
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs
14
Translating German to English: Training the Model
N_EPOCHS = 10
CLIP = 1

best_valid_loss = float('inf')

# loop through each epoch
for epoch in range(N_EPOCHS):

    start_time = time.time()

    # calculate the train loss and update the parameters
    train_loss = train(model, train_iter, optimizer, criterion, CLIP)

    # calculate the loss on the validation set
    valid_loss = evaluate(model, valid_iter, criterion)

    end_time = time.time()

    # calculate how long the epoch took
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    # save the model when it performs better than the previous run
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'transformer-model.pt')

    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')
Epoch: 01 | Time: 0m 21s
    Train Loss: 4.534 | Train PPL: 93.169
     Val. Loss: 3.474 | Val. PPL: 32.280
Epoch: 02 | Time: 0m 13s
    Train Loss: 3.219 | Train PPL: 24.992
     Val. Loss: 2.735 | Val. PPL: 15.403
Epoch: 03 | Time: 0m 13s
    Train Loss: 2.544 | Train PPL: 12.733
     Val. Loss: 2.225 | Val. PPL: 9.250
Epoch: 04 | Time: 0m 14s
    Train Loss: 2.096 | Train PPL: 8.131
     Val. Loss: 1.980 | Val. PPL: 7.246
Epoch: 05 | Time: 0m 13s
    Train Loss: 1.801 | Train PPL: 6.055
     Val. Loss: 1.829 | Val. PPL: 6.229
Epoch: 06 | Time: 0m 14s
    Train Loss: 1.588 | Train PPL: 4.896
     Val. Loss: 1.743 | Val. PPL: 5.717
Epoch: 07 | Time: 0m 13s
    Train Loss: 1.427 | Train PPL: 4.166
     Val. Loss: 1.700 | Val. PPL: 5.476
Epoch: 08 | Time: 0m 13s
    Train Loss: 1.295 | Train PPL: 3.650
     Val. Loss: 1.679 | Val. PPL: 5.358
Epoch: 09 | Time: 0m 13s
    Train Loss: 1.184 | Train PPL: 3.268
     Val. Loss: 1.677 | Val. PPL: 5.349
Epoch: 10 | Time: 0m 13s
    Train Loss: 1.093 | Train PPL: 2.984
     Val. Loss: 1.677 | Val. PPL: 5.351
# load the weights
model.load_state_dict(torch.load('transformer-model.pt'))

# calculate the loss on the test set
test_loss = evaluate(model, test_iter, criterion)

print(f'Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f}')
The result is as follows:
Test Loss: 1.692 | Test PPL: 5.430
Although the loss decreased substantially, it says nothing yet about how successful the model is at translating German into English. This can be assessed in two ways. The first is to give the model a sentence and preview its translation during inference. The second is to compute its accuracy with another metric, such as BLEU, the standard metric for translation tasks.
15
Translating German to English: Inference
def translate_sentence(sentence, model, device, max_length=50):
    """
    Translate a German sentence to its English equivalent.

    Args:
        sentence: German sentence to be translated to English; list or str
        model: Transformer model used for translation
        device: device to perform translation on
        max_length: maximum token length for translation

    Returns:
        src: return the tokenized input
        trg_input: return the input to the decoder before the final output
        trg_output: return the final translation, shifted right
        attn_probs: return the attention scores for the decoder heads
        masked_attn_probs: return the masked attention scores for the decoder heads
    """
    model.eval()

    # tokenize and index the provided string
    if isinstance(sentence, str):
        src = ['<bos>'] + [token.text.lower() for token in spacy_de(sentence)] + ['<eos>']
    else:
        src = ['<bos>'] + sentence + ['<eos>']

    # convert to integers
    src_indexes = [vocab_src[token] for token in src]

    # convert list to tensor
    src_tensor = torch.tensor(src_indexes).int().unsqueeze(0).to(device)

    # set <bos> token for target generation
    trg_indexes = [vocab_trg.get_stoi()['<bos>']]

    # generate new tokens
    for i in range(max_length):

        # convert the list to a tensor
        trg_tensor = torch.tensor(trg_indexes).int().unsqueeze(0).to(device)

        # generate the next token
        with torch.no_grad():

            # generate the logits
            logits = model.forward(src_tensor, trg_tensor)

            # select the newly predicted token
            pred_token = logits.argmax(2)[:, -1].item()

        # if <eos> token or max length, stop generating
        if pred_token == vocab_trg.get_stoi()['<eos>'] or i == (max_length - 1):

            # decoder input
            trg_input = vocab_trg.lookup_tokens(trg_indexes)

            # decoder output
            trg_output = vocab_trg.lookup_tokens(logits.argmax(2).squeeze(0).tolist())

            return src, trg_input, trg_output, model.decoder.attn_probs, model.decoder.masked_attn_probs

        # else, continue generating
        else:
            # add the token
            trg_indexes.append(pred_token)
# 'a woman with a large purse is walking by a gate'
src = ['eine', 'frau', 'mit', 'einer', 'großen', 'geldbörse', 'geht', 'an', 'einem', 'tor', 'vorbei', '.']

src, trg_input, trg_output, attn_probs, masked_attn_probs = translate_sentence(src, model, device)

print(f'source = {src}')
print(f'target input = {trg_input}')
print(f'target output = {trg_output}')
source = ['<bos>', 'eine', 'frau', 'mit', 'einer', 'großen', 'geldbörse', 'geht', 'an', 'einem', 'tor', 'vorbei', '.', '<eos>']
target input = ['<bos>', 'a', 'woman', 'with', 'a', 'large', 'purse', 'walking', 'past', 'a', 'gate', '.']
target output = ['a', 'woman', 'with', 'a', 'large', 'purse', 'walking', 'past', 'a', 'gate', '.', '<eos>']
The decoder's cross-attention between the source sentence and the translation can be visualized for each head with the display_attention helper from the appendix:

display_attention(src, trg_input, attn_probs)

The masked self-attention over the target sequence can be visualized the same way:

display_attention(trg_input, trg_input, masked_attn_probs)

16
Translating German to English: Evaluating the Model
BLEU scores range from 0 to 100 (torchtext reports them on a 0-1 scale) and can be interpreted roughly as follows:

< 10: almost useless
10-19: hard to understand
20-29: understandable, but with significant grammatical errors
30-39: understandable to good
40-49: high quality
50-59: high quality, adequate, and fluent
> 60: better than human quality
To calculate the BLEU score, the model's predictions and their expected values must be generated. This can be done with the function below, which relies on the translate_sentence function.
def compute_metrics(model, iterator):
    """
    Generate predictions for the provided iterator.

    Args:
        model: Transformer model to be trained
        iterator: data to be evaluated

    Returns:
        predictions: list of predictions, which are tokenized strings
        labels: list of expected output, which are tokenized strings
    """
    # set the model to evaluation mode
    model.eval()

    predictions = []
    labels = []

    # evaluate without updating gradients
    with torch.no_grad():

        # loop through each batch in the iterator
        for i, batch in enumerate(iterator):

            # set the source and target batches
            src, trg = batch

            # predict the output
            src_out, trg_input, trg_output, attn_probs, masked_attn_probs = translate_sentence(
                vocab_src.lookup_tokens(src.tolist()), model, device)

            # prediction | remove <eos> token
            predictions.append(trg_output[:-1])

            # expected output | add extra dim for calculation
            labels.append([vocab_trg.lookup_tokens(trg.tolist())])

    # return the predictions and expected outputs
    return predictions, labels
The test_data created earlier can be passed to the compute_metrics function. The resulting predictions and labels can then be passed to bleu_score from torchtext.data.metrics to calculate the BLEU score.
from torchtext.data.metrics import bleu_score

# generate predictions and expected outputs on the test set
predictions, labels = compute_metrics(model, test_data)

bleu_score(predictions, labels)
0.3588869571685791
This corresponds to roughly 35.9 on the 0-100 scale above, which falls in the "understandable to good" range and is an acceptable result for this tutorial.
With this example complete, the 《手撕Transformer》 series comes to a close.
Appendix

Packages
!pip install -q portalocker

# importing required libraries
import math
import copy
import time
import random
import spacy
import numpy as np
import os

# torch packages
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor
import torch.optim as optim

# load and build datasets
import torchtext
from torchtext.data.functional import to_map_style_dataset
from torch.nn.functional import pad
from torch.utils.data import DataLoader
from torchtext.vocab import build_vocab_from_iterator
import torchtext.datasets as datasets
import portalocker

# visualization packages
from mpl_toolkits import mplot3d
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
Loading the Tokenizers
def load_tokenizers():
    """
    Load the German and English tokenizers provided by spaCy.

    Returns:
        spacy_de: German tokenizer
        spacy_en: English tokenizer
    """
    try:
        spacy_de = spacy.load("de_core_news_sm")
    except OSError:
        os.system("python -m spacy download de_core_news_sm")
        spacy_de = spacy.load("de_core_news_sm")

    try:
        spacy_en = spacy.load("en_core_web_sm")
    except OSError:
        os.system("python -m spacy download en_core_web_sm")
        spacy_en = spacy.load("en_core_web_sm")

    print("Loaded English and German tokenizers.")
    return spacy_de, spacy_en
Tokenize the Sequences
def tokenize(text: str, tokenizer):
    """
    Split a string into its tokens using the provided tokenizer.

    Args:
        text: string
        tokenizer: tokenizer for the language

    Returns:
        tokenized list of strings
    """
    return [tok.text.lower() for tok in tokenizer.tokenizer(text)]
Yield Tokens
def yield_tokens(data_iter, tokenizer, index: int):
    """
    Return the tokens for the appropriate language.

    Args:
        data_iter: iterator of German-English sentence pairs
        tokenizer: tokenizer for the language
        index: index of the language in the tuple | (de=0, en=1)

    Yields:
        sequences based on index
    """
    for from_tuple in data_iter:
        yield tokenizer(from_tuple[index])
Building the Vocabulary
def build_vocabulary(spacy_de, spacy_en, min_freq: int = 2):
    """
    Build the German and English vocabularies from the Multi30k dataset.

    Args:
        spacy_de: German tokenizer
        spacy_en: English tokenizer
        min_freq: minimum frequency needed to include a word in the vocabulary

    Returns:
        vocab_src: German vocabulary
        vocab_trg: English vocabulary
    """

    def tokenize_de(text: str):
        """
        Call the German tokenizer.

        Args:
            text: string

        Returns:
            tokenized list of strings
        """
        return tokenize(text, spacy_de)

    def tokenize_en(text: str):
        """
        Call the English tokenizer.

        Args:
            text: string

        Returns:
            tokenized list of strings
        """
        return tokenize(text, spacy_en)

    print("Building German Vocabulary...")

    # load train, val, and test data pipelines
    train, val, test = datasets.Multi30k(language_pair=("de", "en"))

    # generate source vocabulary
    vocab_src = build_vocab_from_iterator(
        yield_tokens(train + val + test, tokenize_de, index=0),  # tokens for each German sentence (index 0)
        min_freq=min_freq,
        specials=["<bos>", "<eos>", "<pad>", "<unk>"],
    )

    print("Building English Vocabulary...")

    # generate target vocabulary
    vocab_trg = build_vocab_from_iterator(
        yield_tokens(train + val + test, tokenize_en, index=1),  # tokens for each English sentence (index 1)
        min_freq=min_freq,
        specials=["<bos>", "<eos>", "<pad>", "<unk>"],
    )

    # set default token for out-of-vocabulary words (OOV)
    vocab_src.set_default_index(vocab_src["<unk>"])
    vocab_trg.set_default_index(vocab_trg["<unk>"])

    return vocab_src, vocab_trg
Load the Vocabulary
def load_vocab(spacy_de, spacy_en, min_freq: int = 2):
    """
    Args:
        spacy_de: German tokenizer
        spacy_en: English tokenizer
        min_freq: minimum frequency needed to include a word in the vocabulary

    Returns:
        vocab_src: German vocabulary
        vocab_trg: English vocabulary
    """
    if not os.path.exists("vocab.pt"):
        # build the German/English vocabulary if it does not exist
        vocab_src, vocab_trg = build_vocabulary(spacy_de, spacy_en, min_freq)
        # save it to a file
        torch.save((vocab_src, vocab_trg), "vocab.pt")
    else:
        # load the vocab if it exists
        vocab_src, vocab_trg = torch.load("vocab.pt")

    print("Finished.\nVocabulary sizes:")
    print("\tSource:", len(vocab_src))
    print("\tTarget:", len(vocab_trg))
    return vocab_src, vocab_trg
Indexing Sequences
def data_process(raw_data):
    """
    Process raw sentences by tokenizing and converting to integers based on
    the vocabulary.

    Args:
        raw_data: German-English sentence pairs

    Returns:
        data: tokenized data converted to index based on vocabulary
    """
    data = []

    # loop through each sentence pair
    for (raw_de, raw_en) in raw_data:

        # tokenize the sentence and convert each word to an integer
        de_tensor_ = torch.tensor([vocab_src[token.text.lower()] for token in spacy_de.tokenizer(raw_de)],
                                  dtype=torch.long)
        en_tensor_ = torch.tensor([vocab_trg[token.text.lower()] for token in spacy_en.tokenizer(raw_en)],
                                  dtype=torch.long)

        # append tensor representations
        data.append((de_tensor_, en_tensor_))

    return data
Generating Batches
def generate_batch(data_batch):
    """
    Process indexed-sequences by adding <bos>, <eos>, and <pad> tokens.

    Args:
        data_batch: German-English indexed-sentence pairs

    Returns:
        two batches: one for German and one for English
    """
    de_batch, en_batch = [], []

    # for each sentence
    for (de_item, en_item) in data_batch:

        # add <bos> and <eos> indices before and after the sentence
        de_temp = torch.cat([torch.tensor([BOS_IDX]), de_item, torch.tensor([EOS_IDX])], dim=0).to(device)
        en_temp = torch.cat([torch.tensor([BOS_IDX]), en_item, torch.tensor([EOS_IDX])], dim=0).to(device)

        # add padding
        de_batch.append(pad(de_temp,
                            (0,                           # dimension to pad
                             MAX_PADDING - len(de_temp),  # amount of padding to add
                             ),
                            value=PAD_IDX,
                            ))

        # add padding
        en_batch.append(pad(en_temp,
                            (0,                           # dimension to pad
                             MAX_PADDING - len(en_temp),  # amount of padding to add
                             ),
                            value=PAD_IDX,
                            ))

    return torch.stack(de_batch), torch.stack(en_batch)
Displaying Attention
def display_attention(sentence: list, translation: list, attention: Tensor,
                      n_heads: int = 8, n_rows: int = 4, n_cols: int = 2):
    """
    Display the attention matrix for each head of a sequence.

    Args:
        sentence: German sentence to be translated to English; list
        translation: English sentence predicted by the model
        attention: attention scores for the heads
        n_heads: number of heads
        n_rows: number of rows
        n_cols: number of columns
    """
    # ensure the number of rows and columns are equal to the number of heads
    assert n_rows * n_cols == n_heads

    # figure size
    fig = plt.figure(figsize=(15, 25))

    # visualize each head
    for i in range(n_heads):

        # create a plot
        ax = fig.add_subplot(n_rows, n_cols, i + 1)

        # select the respective head and make it a numpy array for plotting
        _attention = attention.squeeze(0)[i, :, :].cpu().detach().numpy()

        # plot the matrix
        cax = ax.matshow(_attention, cmap='bone')

        # set the size of the labels
        ax.tick_params(labelsize=12)

        # set the indices for the tick marks
        ax.set_xticks(range(len(sentence)))
        ax.set_yticks(range(len(translation)))

        # if the provided sequences are sentences or indices
        if isinstance(sentence[0], str):
            ax.set_xticklabels([t.lower() for t in sentence], rotation=45)
            ax.set_yticklabels(translation)
        elif isinstance(sentence[0], int):
            ax.set_xticklabels(sentence)
            ax.set_yticklabels(translation)

    plt.show()


