大数跨境

使用PyTorch实现Transformer的Encoder编码器

使用PyTorch实现Transformer的Encoder编码器 AI探索时代
2025-01-07
2
导读:神经网络一定要多动手,多实践;把理论和实践相结合才能学得更好,更快

 神经网络一定要多动手,多实践;把理论和实践相结合才能学得更好,更快。



在前面关于Transformer架构的Encoder-Decoder,编码器-解码器结构的文章中介绍过,编码器和解码器是Transformer的核心结构,也是Transformer的载体;但而今天就来揭秘一下Transformer的编码器具体是怎么实现的。


今天使用PyTorch神经网络框架实现Transformer的编码器;Transformer是一种实现神经网络的架构或者叫算法;而PyTorch是FaceBook(现在的Meta公司)开发的一款神经网络框架,由python语言实现,里面封装了神经网络常用的一些结构与算法;其底层是C语言实现,因此效率较高。






记忆PyTorch实现的Transformer编码器




Transformer的编码器器,核心就在于其论文中的这张架构图:



如上图所示,Encoder是由一个输入Embedding,一个Positional Encoding和以及多个Encoder-block组成;而每个block都由一个多头注意力层(Multi-Head Attenttion),一个前馈全连接层(Feed Forward)和两个规范化层和残差连接层(Add & Norm)组成。



在神经网络中 每一层的输入都来自上一层的输出 每一层的输出 都来自上一层的输入  只有两个特殊点 输入层和输出层 输入层的输入来自用户 输出层的输出就是神经网络的执行结果 


pytorch框架,从代码结构看,神经网络类主要有两个模块;一个是init方法 另一个是forward,init主要用来实现一些初始化工作,而forward 才是神经网络主要的实现方法。


而神经网络整体来看,就是一个数学模型,不断的进行数学计算,包括向量计算,数值运算等。


它很好的诠释了机器学习的核心思路:“将现实问题转化为数学问题,通过求解数学问题,从而解决现实问题。”而如何转化和求解 或者说 如何设计编码器与解码器,这个就因架构而异了。



因此,我们就需要使用PyTorch来实现这一个一个的网络层,代码如下所示:


使用的python版本是3.9 pytorch等其它模块采用默认版本即可;此代码可以直接执行;如果执行过程中报缺少包,可自行按照提示安装即可。

# pytorch 实现transformerimport copy
import numpy as npimport torchimport torch.nn as nnimport mathimport torch.nn.functional as F
from torch.autograd import Variable

class Embedding(nn.Module): def __init__(self, vocab, embe_dim): """ :param vocab: 词表的大小 :param embe_dim: 词嵌入的维度 """ super(Embedding, self).__init__() self.lut = nn.Embedding(vocab, embe_dim, padding_idx=0) self.embe_dim = embe_dim
def forward(self, x): """ :param x: 输入给模型的文本通过词汇映射后的张量 :return: """ return self.lut(x) * math.sqrt(self.d_model)

class TextEmbedding(nn.Module): def __init__(self, vocab, d_model): """ :param vocab: 词表大小 :param d_model: 词嵌入的维度 """ super(TextEmbedding, self).__init__() self.lut = nn.Embedding(vocab, d_model, padding_idx=0) print('lut', self.lut.weight) # self.lut.weight.data = torch.zeros(2, 5) print('lut n', self.lut.weight) self.d_model = d_model
# self.lut = nn.Embedding(100, 10) word_embed = self.lut(Variable(torch.LongTensor([50]))) print('word_embed', word_embed)
def forward(self, x): """ :param x: 输入给模型的文本通过词汇映射后的张量 :return: """ return self.lut(x) * math.sqrt(self.d_model)
# 位置编码class PositionalEncoding(nn.Module): def __init__(self, d_model, dropout, max_len=5000): """ :param d_model: 词嵌入维度 :param dropout: 丢失率 :param max_len: 每个句子的最长长度 """
super(PositionalEncoding, self).__init__() # 实例化dropout层 self.dpot = nn.Dropout(p=dropout) # 初始化位置编码矩阵 pe = torch.zeros(max_len, d_model) # 初始化绝对位置矩阵 # position 矩阵size为(max_len, 1) position = torch.arange(0, max_len).unsqueeze(1)
# 将绝对位置矩阵和位置编码矩阵特征融合 # 定义一个变换矩阵 跳跃式初始化 div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term)
# 将二维张量扩充成三维张量 pe = pe.unsqueeze((0)) self.register_buffer('pe', pe)
def forward(self, x): """ :param x: 文本的词嵌入表示 :return: """ x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False) return self.dpot(x)

""" 掩码张量 掩码张量一般只有0和1 代表位置被遮掩或不被遮掩 在transformer中, 掩码张量的作用在应用attention时有一些生成的attention 变量中的值有可能已知了未来的信息而得到的未来信息被看到是因为 训练时会把整个输出结果都一次性进行Embedding"""

def subsequent_mask(size): atten_shape = (1, size, size) # 对角线下就是负 对角线上就是正 对角线就是0 mask = np.triu(np.ones(atten_shape), k=1).astype('uint8') return torch.from_numpy(1 - mask)

def attention(query, key, value, mask=None, dropout=None): """
:param query: :param key: :param value: :param mask: 掩码张量 :param dropout: :return: query 在key和value作用下的表示 """
d_k = query.size(-1) # 按照注意力公式 将query 与 key转置相乘 这里的key是将最后两个维度进行转置 再除以缩放系数 # 得到注意力的得分张量 score score = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k) if mask is not None: # 使用tensor 的masked_fill方法,将掩码张量和scores张量每个位置一一比较 score = score.masked_fill(mask == 0, -1e9)
p_atten = F.softmax(score, dim=-1)
if dropout is not None: p_atten = dropout(p_atten)
# 返回注意力表示 return torch.matmul(p_atten, value.float()), p_atten

def clones(module, N): """ 生成相同的网络层的克隆函数 :param module: 目标网络层 :param N: 克隆数量 :return: """ return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

class MultiHeadAtten(nn.Module): def __init__(self, head, embedding_dim, dropout=0.1): """ :param head: 头数 :param embedding_dim: 词嵌入维度 :param dropout: """ super(MultiHeadAtten, self).__init__()
# 在函数中 首先使用了一个测试中常用的assert语句, 判断h是否能被此嵌入维度整除 assert embedding_dim % head == 0
# 得到每个头获得的分割词向量维度d_K self.d_k = embedding_dim // head
# 获得多头 self.head = head # 克隆四个全连接层对象 通过nn的Linear实例化 self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)
self.attn = None self.dpot = nn.Dropout(p=dropout)
def forward(self, query, key, value, mask=None): # 前向逻辑函数 if mask is not None: # 扩展维度 代表多头中的第i个头 mask = mask.unsqueeze(1)
batch_size = query.size(0)
""" 首先就进入多头处理环节 首先利用zip将输入QKV与三个全连接层组到一起 然后使用for循环 将输入QKV分别传到线性层中 做完线性变换后,开始为每个头风格输入 使用view 方法对线性变换的结果进行维度重塑 这样就意味着每个头可以获得一部分词特征组成的句子 其中的-1代表自适应维度 计算机会根据这种变换自动计算这里的值 然后对第二维和第三维进行置换操作 lis = [query, key, value] r = [] for step.model in enumerate(self.linears): r.append(model(lis[step]) """ query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2) for model, x in zip(self.linears, (query, key, value))]
""" 得到每个头的输入后 接下来就是将他们传入到attention中 """ x, self.attn = attention(query, key, value, mask=mask, dropout=self.dpot)
x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)
return self.linears[-1](x)

class FeedForward(nn.Module): def __init__(self, d_model, d_ff, dropout=0.1): """ :param d_model: 线性层输入维度 :param d_ff: 线性层输出维度 :param dropout: """ super(FeedForward, self).__init__() self.w1 = nn.Linear(d_model, d_ff) self.w2 = nn.Linear(d_ff, d_model) self.dpot = nn.Dropout(dropout)
def forward(self, x): return self.w2(self.dpot(F.relu(self.w1(x))))

# 规范化层class LayerNorm(nn.Module): def __init__(self, features, eps=1e-6): """ :param features: 代表词嵌入的维度 :param eps: """ super(LayerNorm, self).__init__() self.a2 = nn.Parameter(torch.ones(features)) self.b2 = nn.Parameter(torch.zeros(features))
# 防止分母为0 self.eps = eps
def forward(self, x): print('x', type(x)) # 对输入变量x 求其最后一个维度的均值 并保持输出维度与输入维度一致 mean = x.mean(-1, keepdim=True)
# 接着再求最后一个维度的标准差 std = x.std(-1, keepdim=True) # 然后就是规范化公式 用x减去均值除以标准差获得规范化的结果 return self.a2 * (x - mean) / (std + self.eps) + self.b2

# 残差连接实现class ResConnect(nn.Module): def __init__(self, size, dropout=0.1): super(ResConnect, self).__init__() self.norm = LayerNorm(size) self.dpot = nn.Dropout(p=dropout)
def forward(self, x, sublayer): """ 接受上一个层或子层的输入作为第一个参数 将该子层连接中的子层函数作为第二个参数
:param x: :param sublayer: :return: """ return x + self.dpot(sublayer(self.norm(x)))

class SublayerConnection(nn.Module): def __init__(self, size, dropout=0.1): super(SublayerConnection, self).__init__() self.norm = LayerNorm(size) self.dpot = nn.Dropout(p=dropout)
def forward(self, x, sublayer): return x + self.dpot(sublayer(self.norm(x)))

if __name__ == '__main__': vocab = 500 d_model = 512 dropout = 0.2 inputs = torch.randint(low=0, high=100, size=(5, 10), dtype=torch.long)
# 文本嵌入层 TE = TextEmbedding(vocab, d_model)
# 位置编码层 PE = PositionalEncoding(d_model, dropout, max_len=10)
TER = TE(inputs)
print('TER', TER.shape)
PER = PE(TER)
print('PER', PER.shape)
# 实例化多头注意力机制 head = 8 MHA = MultiHeadAtten(head=head, embedding_dim=d_model, dropout=dropout) MHAR = lambda x: MHA(x, x, x) SLC1 = SublayerConnection(d_model, dropout=dropout) SLC1R = SLC1(PER, MHAR) print('SLC1R', SLC1R.shape)
FF = FeedForward(d_model, 1024, dropout=dropout) SLC2 = SublayerConnection(d_model, dropout=dropout) SLC2R = SLC2(SLC1R, FF) print(SLC2R.shape)


也可以在公众号回复——编码器,获得代码。


pytorch实现Transoformer编码器
https://blog.csdn.net/qq_43462005/article/details/115640339








【声明】内容源于网络
0
0
AI探索时代
专注AI工程化落地,让你的技术不再纸上谈兵
内容 0
粉丝 0
AI探索时代 专注AI工程化落地,让你的技术不再纸上谈兵
总阅读0
粉丝0
内容0