大数跨境
0
0

使用Pytorch手写Qwen3-0.6B推理

使用Pytorch手写Qwen3-0.6B推理 睐芯科技LightSense
2025-11-25
14

 

学习了Transformer基本概念之后需要实践一下,为了更好的理解过程,选择了使用Pytorch手写大模型的推理。本篇文章分享一波编写过程。

模型使用开源的Qwen3-0.6B,huggingface的仓库地址是:

https://huggingface.co/Qwen/Qwen3-0.6B/tree/main。

准备环境

第一步需要下载我们的模型,直接在huggingface里下载就可以。我们需要下载下面这几个文件:

  • • config.json 这个里面描述了模型的基本配置项
  • • model.safetensors 模型文件,这个格式是huggingface搞的,我们可以用对应的safetensors包去加载文件
  • • tokenizer.json 包含分词所需要的信息
  • • vocab.json token到id的映射

接下来配置好python环境就可以开始写了

分析模型结构

接下来要做的就是分析模型结构和输入参数。使用safetensors包的load_file加载模型,读取权重:

state_dict = load_file('model/model.safetensors')
for k,p in state_dict.items():
  print(f"k:{k} p:{p.shape}")

输出太长了我分开贴一下:

k:model.embed_tokens.weight p:torch.Size([151936, 1024])
k:model.norm.weight p:torch.Size([1024])
k:lm_head.weight p:torch.Size([151936, 1024])

这3个表示model下面有

  • • embed_tokens:处理词嵌入,把token转为张量,矩阵shape为[vocab_size, hidden_size]
  • • norm 归一化:线性变换,shape是[hidden_size]

模型最外层有一个lm_head负责输出,是一个线性变换,把模型内部状态映射为词汇表上的概率,矩阵shape为[vocab_size,hidden_size]

第二部分权重定义就是多个layer,因为Qwen3是decode only的结构,所以每个layer都是一个Decoder:

k:model.layers.0.input_layernorm.weight p:torch.Size([1024])
k:model.layers.0.mlp.down_proj.weight p:torch.Size([1024, 3072])
k:model.layers.0.mlp.gate_proj.weight p:torch.Size([3072, 1024])
k:model.layers.0.mlp.up_proj.weight p:torch.Size([3072, 1024])
k:model.layers.0.post_attention_layernorm.weight p:torch.Size([1024])
k:model.layers.0.self_attn.k_norm.weight p:torch.Size([128])
k:model.layers.0.self_attn.k_proj.weight p:torch.Size([1024, 1024])
k:model.layers.0.self_attn.o_proj.weight p:torch.Size([1024, 2048])
k:model.layers.0.self_attn.q_norm.weight p:torch.Size([128])
k:model.layers.0.self_attn.q_proj.weight p:torch.Size([2048, 1024])
k:model.layers.0.self_attn.v_proj.weight p:torch.Size([1024, 1024])

在Qwen3里,这样的Decoder一共有28个,也就是model.layers.0 - model.layers.27

简要看一下相关的权重定义:

  • • input_layernorm:把输入参数做归一化,防止梯度爆炸,shape为[hidden_size]
  • • mlp:这个是Qwen3的FFN层,叫做MLP,包括3个计算,分别是down_proj,gate_proj,up_proj。都是线性变换
  • • post_attention_layernorm:注意力之后,FNN之前的归一化,shape为[hidden_size]
  • • self_attn:自注意力相关的参数
    • • k_proj、q_proj、v_proj:把token的隐藏向量站换为KQV向量,所以他们的shape均为[hidden_size,hidden_size],但是因为Q有16头,KV是8头,使用了分组查询注意力机制,每2个Q共享一组KV,所以Q相关的向量运算需要扩维,所以q_proj.weight的shape为[num_attention_heads/num_key_value_heads*hidden_size, hidden_size],也就是[2048,1024]
    • • k_norm、q_norm:线性变换,对K和Q向量做归一化,目的是稳定注意力分数的分布,提升稳定性。此变换只对每个头内部的 head_dim 维度进行归一化,所以shape为[head_dim]
    • • o_proj:输出投影,把softmax计算出来的结果投影回原来的维度,shape是[1024,2048]

定义模型

分析完模型结果,我们开始编写python代码定义模型结构。

模型

我们使用pytorch定义每一层的参数和forward方法,先从参数开始。正规写法参数需要解析本地的config.json,因为是demo,我们直接定一个Config对象,把字段和值写死:

class SelfQwen3Config:
  architectures = ["Qwen3ForCausalLM"]
  attention_bias = False
  attention_dropout = 0.0
  bos_token_id = 151643
  eos_token_id = 151645
  head_dim = 128
  hidden_act = "silu"
  hidden_size = 1024
  initializer_range = 0.02
  intermediate_size = 3072
  max_position_embeddings = 40960
  max_window_layers = 28
  model_type = "qwen3"
  num_attention_heads = 16
  num_hidden_layers = 28
  num_key_value_heads = 8
  rms_norm_eps = 1e-06
  rope_scaling = None
  rope_theta = 1000000
  sliding_window = None
  tie_word_embeddings = True
  torch_dtype = "bfloat16"
  transformers_version = "4.51.0"
  use_cache = True
  use_sliding_window = False
  vocab_size = 151936

定义模型类,根据模型权重的定义定义相关参数,这里 DecoderLayer、SelfQwen3RMSNorm自己定义,其他的直接用 torch 包里面的工具即可:

class SelfQwen3Model(nn.Module):
  def __init__(self, config):
    print("init Qwen3Model")
    super().__init__()
    self.config = config
    self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
    # 28个decoder
    self.layers = nn.ModuleList([DecoderLayer(config) for _ in range(config.num_hidden_layers)])
    self.norm = SelfQwen3RMSNorm(config.hidden_size)
    self.lm_head = nn.Linear(config.hidden_size, config.vocab_size,bias=False)
    self.lm_head.weight = self.embed_tokens.weight

接下来写forward方法,也就是前向推理过程,模型推理过程需要几个参数

  • • token_ids:词分割后映射后的id向量
  • • attention_mask:注意力掩码
  • • position_ids:位置编码向量
def forward(self,input_ids,attention_mask=None,position_ids=None):
  bsz,q_len=inputs.shape
  #生成位置编码
  positions_ids=torch.arange(q_len, dtype=torch.long, device=input_ids.device).unsqueeze(0)
  #生成注意力掩码
  attention_mask = torch.ones((bsz, q_len), device=input_ids.device, dtype=torch.bool)
  seq_len = input_ids.size(1)
  #生成因果注意力掩码
  causal_mask = torch.triu(
    torch.full((seq_len, seq_len), float('-inf'), dtype=torch.float32, device=input_ids.device),
    diagonal=1
  )
  causal_mask = causal_mask.unsqueeze(0).unsqueeze(0)
  #词嵌入
  hidden_states = self.embed_tokens(input_ids)
  # decoder
  for layer in self.layers:
    hidden_states = layer(hidden_states, position_ids=position_ids, attention_mask=causal_mask)
  #层归一化
  hidden_states = self.norm(hidden_states)
  #映射词表概率
  logits = self.lm_head(hidden_states)
  return logits

Decoder

接着定义Decoder,这里先定义好子层的class:

class DecoderLayer(nn.Module):
  def __init__(self, config):
  super().__init__()
  # 注意力层
  self.self_attn = Attention(config)
  # 归一化
  self.post_attention_layernorm = SelfQwen3RMSNorm(config.hidden_size)
  # 前馈层
  self.mlp=MLP(config)
  # 归一化
  self.input_layernorm=SelfQwen3RMSNorm(config.hidden_size)

前向推理过程:

Qwen3也适用Pre layer normalization的结构

所以推理流程也为 层归一化->注意力机制->层归一化->FNN,每次层归一化+计算之后再做一次残差连接,代码如下:

def forward(self, hidden_states,position_ids=None, attention_mask=None):
  residual = hidden_states
  # 层归一化
  hidden_states = self.input_layernorm(hidden_states)
  # 注意力机制
  hidden_states = self.self_attn(hidden_states, position_ids, attention_mask)
  # 残差连接
  hidden_states = residual + hidden_states
  residual = hidden_states
  # 层归一化
  hidden_states = self.post_attention_layernorm(hidden_states)
  # fnn
  hidden_states = self.mlp(hidden_states)
  # 残差连接
  hidden_states = residual + hidden_states
  return hidden_states

子层定义:

层归一化

def __init__(self, hidden_size, eps=1e-6):
  super().__init__()
  self.weight = nn.Parameter(torch.ones(hidden_size))
  self.variance_epsilon = eps

eps为归一化公式需要的参数,Qwen3层归一化公式为:

让AI实现一下这个算法:

def forward(self, hidden_states):
  input_dtype = hidden_states.dtype
  hidden_states = hidden_states.to(torch.float32)
  variance = hidden_states.pow(2).mean(-1, keepdim=True)
  hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
  return self.weight * hidden_states.to(input_dtype)

注意力子层

定义,主要根据config.json的配置和权重配置来定义,主要是shape参数需要传对:

class Attention(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.config = config
    self.hidden_size = config.hidden_size
    self.num_heads = config.num_attention_heads
    self.num_key_value_heads = config.num_key_value_heads
    self.head_dim = config.head_dim
    self.num_key_value_groups = self.num_heads

    self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
    self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=False)
    self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=False)
    self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

    self.q_norm = SelfQwen3RMSNorm(self.head_dim, eps=config.rms_norm_eps)
    self.k_norm = SelfQwen3RMSNorm(self.head_dim, eps=config.rms_norm_eps)
    self.rope_theta = config.rope_theta

Qwen3注意力机制运算过程如下图:

这个流程基本保持了Transformer定义的注意力机制流程,中间也加入了一些Qwen3的机制(也是现在主流大模型采用的机制),例如QK归一化、RoPE位置编码算法。其中KV向量每次运算后会进行缓存,下次直接从缓存使用,不需要重复生成,可以提高推理的速度。

接下来就是推理流程代码,demo里我们忽略掉KV cache的逻辑,这个只会推理稍慢,不会影响推理结果的正确性:

def forward(self, hidden_states, position_ids=None, attention_mask=None):
  bsz, q_len, _ = hidden_states.size()
  # 得到QKV
  query_states = self.q_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
  key_states = self.k_proj(hidden_states).view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
  value_states = self.v_proj(hidden_states).view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 
  # QK归一化
  query_states = self.q_norm(query_states)
  key_states = self.k_norm(key_states)
  # 应用位置编码
  query_states, key_states = apply_rotary_pos_emb(
    query_states, key_states, position_ids, self.head_dim, self.rope_theta
  )
  # 多头分组查询
  if self.num_key_value_groups > 1:
    # head_dim维度扩大
    key_states = key_states.unsqueeze(2).expand(-1, -1, self.num_key_value_groups, -1, -1).flatten(1, 2)
    value_states = value_states.unsqueeze(2).expand(-1, -1, self.num_key_value_groups, -1, -1).flatten(1, 2)
  # 计算注意力得分
  attn_weights = torch.matmul(query_states, key_states.transpose(-2, -1)) / (self.head_dim ** 0.5)   
  if attention_mask is not None:
    attn_weights = attn_weights + attention_mask
  # 计算权重  
  attn_weights = torch.softmax(attn_weights, dim=-1, dtype=torch.float32).to(hidden_states.dtype)
  # 加权求和
  attn_output = torch.matmul(attn_weights, value_states)

  # 合并多头
  attn_output = attn_output.transpose(1, 2).contiguous().view(bsz, q_len, -1)
  # 输出投影
  attn_output = self.o_proj(attn_output)
  return attn_output

多头分组查询这里需要着重解释下:

Qwen3的num_attention_heads是16,num_key_value_heads是8,也就是说,每2个Query头共享一组KV头。

但是在atten的计算过程里,我们需要每个Query头计算他和所有K的相似度。这就要求Q和K的头数是一致的。所以要把KV头扩维成16个头。

打印数据可以看到当前KV的shape为[1,8,9,128]的,但是计算实际上需要[1,15,9.128]的四维张量。

做法就是第2维后扩维到2,表示每个Q头有2个KV头,最后用flatten合并第1,2维,这样第1维的shape就是16了。

这里位置编码算法使用的 RoPE:旋转位置编码,把相对位置信息注入注意力机制的一种方法。其数学表达为

python实现为:

def apply_rotary_pos_emb(q, k, position_ids, head_dim, rope_theta=1000000.0):
  device = q.device
  inv_freq = 1.0 / (rope_theta ** (torch.arange(0, head_dim, 2, dtype=torch.float32, device=device) / head_dim))
  freqs = position_ids.unsqueeze(-1).float() * inv_freq.unsqueeze(0).unsqueeze(0)
  emb = torch.cat([freqs, freqs], dim=-1)
  cos = emb.cos().unsqueeze(1).to(q.dtype)
  sin = emb.sin().unsqueeze(1).to(q.dtype)

  def rotate_half(x):
      x1, x2 = x.chunk(2, dim=-1)
      return torch.cat((-x2, x1), dim=-1)

  q_embed = (q * cos) + (rotate_half(q) * sin)
  k_embed = (k * cos) + (rotate_half(k) * sin)
  return q_embed, k_embed

前馈层MLP

Qwen3前馈层计算相对不复杂,公式为

直接定义:

class MLP(nn.Module):
  def __init__(self,config):
    super().__init__()
    self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
    self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
    self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)
    
  def forward(self, x):
    ret=self.down_proj(F.silu(self.gate_proj(x)) * self.up_proj(x))
    return ret

外层推理逻辑

接着定义最外面的推理逻辑,处理模型输入和输出。推理流程如下:

  • • 分词我们用 tokenizers 包实现,拼接的message为下面格式,问答需要标注为assistant
"<|im_start|>user明天做点啥<|im_end|><|im_start|>assistant"
  • • 停止条件为token返回了eos_token_id
  • • 每次推理结束后加上上次生成的内容再次进行推理
  • • 因为模型权重里定义了model.layers.0,我们又只定义了一层模型类,没有model字段,所以解析的时候需要把权重字段里的model去掉

代码如下:

model = SelfQwen3Model(SelfQwen3Config())
# 模型放到model目录下
state_dict = load_file('model/model.safetensors')
new_state_dict = {}
for k, v in state_dict.items():
  if k.startswith("model."):
    new_state_dict[k[len("model") + 1:]] = v
  else:
    new_state_dict[k] = v
#加载权重,运行模型
model.load_state_dict(new_state_dict, strict=True)
model.eval()

#分词 得到词向量
tokenizer = Tokenizer.from_file(str("model/tokenizer.json"))
message="<|im_start|>user明天做点啥<|im_end|><|im_start|>assistant"
input_ids =tokenizer.encode(message).ids
input_ids = torch.tensor([input_ids], dtype=torch.long)

#推理
with torch.no_grad():
  while True:
    logits = model(input_ids)
    next_token = torch.argmax(logits[:, -1, :], dim=-1, keepdim=True)
    if next_token.item() == 151645:
      break
    input_ids = torch.cat([input_ids, next_token], dim=1)

  #输出
  output_text = tokenizer.decode(input_ids[0].tolist(), skip_special_tokens=True)
  print(output_text)

执行我们的python,可以看到Qwen3开始回答输入的提问了!

总结

到这里我们就使用pytorch手写了Qwen3-0.6B的推理过程,也加深了Transformer结构的认识和每一层的理解。

 


【声明】内容源于网络
0
0
睐芯科技LightSense
1234
内容 795
粉丝 0
睐芯科技LightSense 1234
总阅读1.8k
粉丝0
内容795