全局整体调用逻辑:
run_busy_loop()
├─ _process_input_queue() ← 从 input_queue 读取新请求/abort # 将请求传输到引擎侧,然后执行下面的函数
└─ _process_engine_step()
├─ step_fn() ← 即 step() 或 step_with_batch_queue()
│ ├─ scheduler.schedule() ← ① 调度,决定本轮送哪些请求
│ ├─ executor.execute_model(sched_output) ← ② 触发 GPU 前向
│ ├─ scheduler.get_grammar_bitmask() ← ③ 结构化输出的语法掩码
│ ├─ executor.sample_tokens(grammar_output) ← ④ 采样 这里会进行拒绝采样 & draft model的推测解码前向
│ └─ scheduler.update_from_output() ← ⑤ 处理结果、判断截止
└─ post_step()
└─ executor.take_draft_token_ids() ← ⑥ 取出 draft tokens(同步模式)
└─ scheduler.update_draft_token_ids() ← ⑦ 写入 request.spec_token_ids
每个函数的循环与终止条件:
_process_input_queue:
获取所有的请求,并处理完所有获取的请求。这里的处理主要指的是将请求送入到引擎侧
_process_engine_step:
单次执行,不循环
整体调用逻辑中,只有run_busy_loop是一直循环的,每个step结束之后会从run_busy_loop头部继续执行
验证和生成在同一次 sample_tokens 里完成
投机解码的一个完整推理步骤是这样流动的:
第 N 轮 execute_model
├─ execute_model()(带 draft tokens 作为输入,得到target model的验证tokens)
├─ sample_tokens():
│ ├─ self.sample() ← ①(拒绝采样的验证) 验证上一轮的 draft tokens
│ ├─ postprocess() ← ② 更新状态(已被接受的 token + KV cache 进度)
│ └─ propose_draft() ← ③ 生成下一轮的新 draft tokens
└─ 返回结果
***************************************************************************
第 N 轮(验证第 N-1 轮的 draft):
目标模型输入: [正常 token..., d0, d1, d2, d3]
↑上一轮 draft
rejection_sample → 接受 [d0, d1, bonus],拒绝 [d2, d3]
postprocess → last_sampled = bonus,num_computed += 3
propose_draft → 新 draft [d0', d1', d2', d3'](基于 bonus 的 hidden state)
写回 draft_tokens
第 N+1 轮(验证第 N 轮的 draft):
目标模型输入: [正常 token..., bonus, d0', d1', d2', d3']
...
draft token 验证与生成阶段
调用链路总览
EngineCore.step() # 引擎核心
└─> Executor.execute_model() # 执行器 (UniProc/MultiProc/Ray) ⭐️
└─> WorkerBase.execute_model() # Worker 基类
└─> GPUWorker.execute_model() # GPU Worker
└─> GPUModelRunner.execute_model() ← 目标模型前向
(返回 None,表示需要继续调用 sample_tokens)
└─> Executor.sample_tokens() ⭐️
└─> GPUWorker.sample_tokens()
└─> GPUModelRunner.sample_tokens() ← 采样 + 拒绝采样 + 草稿提案
execute_model
model_runner.execute_model()
│
│ 主模型前向
├──→ DeepseekV3ForCausalLM.forward()
│ └──→ 返回 hidden_states (最后一层输出)
# 这里将信息到GPUModelRunner类中的execute_model_state参数
# 这里也更新了kv_connector_output,获取已完成的请求传输状态
sample_tokens
sample_tokens(hidden_states)
└──→ 返回 sampled_token_ids
采样
sample(hidden_states, input_batch)
│
├──→ compute_logits(hidden_states[logits_indices]) # 根据hidden_state进行target model的logits的计算
│ │ logits_indices: 只取需要计算logit的位置
│ │ 在 spec-decode verify 阶段: 每个请求target model会生成 num_draft+1 个位置
│ └──→ LM head (Linear) → logits [num_logit_pos, vocab_size]
│
| # 这里在下面的拒采样之前会应用到sampler的apply_penalties去修改logits的数值,最下方有关于词频惩罚的说明
├──→ self.sampler(logits) # Gumbel-max 采样
│ └──→ 对target model的每个 logit 位置采样一个 token
│ → target_sampled [num_draft_tokens + num_reqs]
│ (每个请求: N个验证位置 + 1个bonus位置)
│
│ ┌── 非 spec-decode ──┐
│ │ num_sampled = 1 │ 每个请求固定接受1个token
│ └────────────────────┘
│
├──→ rejection_sample() # 仅 spec-decode 阶段调用
│ │
│ │ 输入:
│ │ target_sampled: 目标模型在每个位置采样的token
│ │ input_ids[logits_indices]: draft模型的token(偏移+1)
│ │ cu_num_logits: 各请求的token范围前缀和
│ │
│ └──→ _rejection_sample_kernel [Triton, 每个请求一个program]
│ │
│ │ 对每个 draft 位置 i 循环比较:
│ │ target_token[i] == draft_token[i] ?
│ │ ├─ 相等(accept): 存入 target_token[i], 继续
│ │ └─ 不等(reject): 存入 target_token[i] 作为修正
│ │ 停止后续比较
│ │ 全部接受时: 额外追加1个 bonus token
│ │
│ └──→ sampled [num_reqs, num_spec_steps+1]
│ num_sampled [num_reqs]
│ # 注意: 无论接受/拒绝,始终写入目标模型的token
│ # 保证输出分布与目标模型一致
│
└──→ get_num_sampled_and_rejected() [Triton]
│ 处理 chunked prefill 场景下的计数修正
└──→ (num_sampled [num_reqs], num_rejected [num_reqs])
后处理
postprocess(input_batch, sampled_tokens, num_sampled, num_rejected)
│
├──→ post_update() [input_batch.py, Triton kernel]
│ │
│ └──→ _post_update_kernel [每个请求一个Triton program]
│ │
│ ├── 写入 last_sampled_tokens[req_state_idx]
│ │ = sampled_tokens 中最后一个被接受的token
│ │ → 下一轮 prepare_eagle_inputs() 用它作为
│ │ Eagle/MTP 输入序列的最后一个位置
│ │
│ ├── 累加 output_bin_counts[num_sampled 个token]
│ │ → 用于重复惩罚(repetition penalty)统计
│ │
│ └── 更新 num_computed_tokens[req_state_idx]
│ += query_len - num_rejected
│ → KV cache manager 据此判断哪些槽位有效
│ → 被拒绝的draft token对应的KV槽位被标记为
│ "可覆盖",无需显式释放/重新分配
│
└──→ 更新 num_computed_prefill_tokens [CPU numpy]
= min(已计算 + num_scheduled_tokens, prefill总长度)
→ 用于 chunked prefill 进度追踪
MTP 投机解码提议
└──→ propose_draft(hidden_states) # 在sample_tokens函数的最后部分执行
│
└──→ 调用EagleSpeculator.propose() # 上一个函数只调用本函数
│
│ 首步: 用主模型的 hidden_states + sampled_tokens
│ 后续: 用前面draft model生成的的 hidden_states + sampled_tokens
├──→ run_model() ──→ DeepSeekMTP.forward()
│ │ │
│ │ └──→ DeepSeekMultiTokenPredictor.forward()
│ │ │
│ │ ├── embed_tokens(input_ids)
│ │ └── MTPLayer.forward(embeds, hidden_states)
│ │ ├── enorm(embeds) + hnorm(hidden)
│ │ ├── eh_proj(concat)
│ │ └── mtp_block(transformer层)
│ │
│ └──→ 返回 new_hidden_states
│
├──→ compute_logits(new_hidden_states) # 这里会加载对应的mtp layer层以及共享的lm_head
│ └──→ SharedHead.norm + head ──→ logits
│ # 这里做了两步操作:RMSNorm + Linear (hidden --> volcab_size)
│
├──→ sample draft_token
│
│ 后续步: 用 MTP 自己的 hidden_states + draft_token (循环)
└──→ 重复 run_model → compute_logits → sample
(共重复了 num_speculative_tokens-1 步,加上开头的1步一共num_speculative_tokens步)
prefill的draft 前向
请求输入序列长度:5(tokens: t0, t1, t2, t3, t4)
Target model prefill 后采样得到:t5
Hidden size(H):假设 4096
Draft model 前向的实际输入(已经过 prepare_eagle_inputs 右移):
eagle_input_ids = [t1, t2, t3, t4, t5],shape [5]
eagle_hidden_states(来自 target model 每个位置的输出),shape [5, 4096]
Step 1:Token Embedding
input_ids: [t1, t2, t3, t4, t5] shape: [5]
↓ embed_tokens(词表查找)
inputs_embeds: shape: [5, 4096]
Step 2:Mask position=0 的位置(MTP 特有)
inputs_embeds = torch.where(positions.unsqueeze(-1) == 0, 0, inputs_embeds)
对 position=0 的 token 的 embedding 置零(因为 position 0 的语义信息已经由 previous_hidden_states 携带,不需要重复),这是 MTP 的特殊设计:
positions: [0, 1, 2, 3, 4] shape: [5]
inputs_embeds: [0, e1, e2, e3, e4] shape: [5, 4096] ← position=0处被置零
Step 3:分别 RMSNorm + Concat + 线性投影
inputs_embeds: [0, e1, e2, e3, e4] shape: [5, 4096]
↓ enorm(RMSNorm)
inputs_embeds_normed: shape: [5, 4096]
previous_hidden_states: [h0, h1, h2, h3, h4] shape: [5, 4096]
↓ hnorm(RMSNorm)
hidden_states_normed: shape: [5, 4096]
↓ torch.cat([inputs_embeds_normed, hidden_states_normed], dim=-1)
concatenated: shape: [5, 8192] ← 2*H
↓ eh_proj(Linear: 8192 → 4096)
hidden_states: shape: [5, 4096] ← 压回 H
Step 4:经过一个完整的 Decoder Layer(Attention + FFN)
hidden_states: shape: [5, 4096]
↓ DeepseekV2DecoderLayer
├─ RMSNorm
├─ MLA / MHA Attention(KV cache 参与计算)
│ Q: [5, num_heads, head_dim]
│ K/V: 来自 KV cache(历史tokens)+ 当前5个token新增
│ attn_output: [5, 4096]
├─ 残差连接
├─ RMSNorm
├─ MoE / FFN
└─ 残差连接
↓
hidden_states: shape: [5, 4096] ← 维度不变
Step 5:取最后一个位置计算 logits,采样第一个 draft token
sample_hidden_states = last_hidden_states[last_token_indices]
logits = self.model.compute_logits(sample_hidden_states)
last_hidden_states: shape: [5, 4096]
↓ 取 last_token_indices(每个请求最后一个有效位置,这里是 index=4)
sample_hidden_states: shape: [1, 4096] ← 只取最后1个位置
↓ RMSNorm + lm_head(Linear: 4096 → vocab_size)
logits: shape: [1, vocab_size]
↓ gumbel_sample 采样
draft_token_0 = t6 shape: [1] ← 第0个 draft token
Decode 阶段对比(每次draft前向)
调用 prepare_eagle_decode 后,每个请求只保留 1个 token + 1个 hidden_state:
input_ids: [t6] shape: [1]
hidden_states: [h4] shape: [1, 4096] ← 取上一步 last_token 位置的 hidden_state
Step 1: embed(t6) shape: [1, 4096]
Step 2: enorm + hnorm + cat shape: [1, 8192]
Step 3: eh_proj shape: [1, 4096]
Step 4: Decoder Layer shape: [1, 4096]
关于KVcache
draft token 的 KV 不写入 prefix cache
new tokens 包含 verified 和 unverified draft tokens,
但只缓存 verified tokens(通过 request.num_tokens 来 cap)
注意num_computed_token是target model生成的KV
draft model负责产生token,在多步draft前向的时候也会保存和复用KV
在propose_draft过程中,除了第0步输入 的token长度是变长的,后续进行前向的输入token长度都是1+上一步的hidden_state
在后续步数中,,之前draft otken的KVcache是已经被保存的,因为在scheduler中显存已经申请了;
在拒绝采样之后会更新computed tokens,没有被采纳的token其KVcache可以被覆盖;
Scheduler._schedule_running_requests()
│
├── num_new_tokens = num_tokens_with_spec(含上轮draft) - num_computed_tokens
│
└── kv_cache_manager.allocate_slots(
num_new_tokens,
num_lookahead_tokens = num_spec_tokens ← 为下一轮 draft 预留
)
│
├── num_tokens_need_slot = computed + new + lookahead
├── 按 num_tokens_need_slot 分配物理 block
└── prefix cache 只写入到 request.num_tokens(不含 draft)
↑
确保被拒绝的 draft token 不污染 prefix cache
deepseek mtp
在propose_draft过程中,除了第0步输入 的token长度是变长的,后续进行前向的输入token长度都是1+上一步的hidden_state
在后续步数中,,之前draft otken的KVcache是已经被保存的,因为在scheduler中显存已经申请了;
在拒绝采样之后会更新computed tokens,没有被采纳的token其KVcache可以被覆盖;
总结:
输入:token + 上一轮的hidden_state
处理:
[token]-->embedding-->rmsNorm
--> cat --> linear --> mtp_block --> residual --> shared_head
[hidden_state]-->rmsNorm
具体细节在下方查看
Step 1: embed_tokens(input_ids)
input_ids [8]
↓ VocabParallelEmbedding
inputs_embeds [8, 7168]
Step 2: 掩码 position==0 的 embedding 置零
inputs_embeds [8, 7168] → 第0和第4行(position=0)被清零
Step 3: enorm(inputs_embeds)
inputs_embeds [8, 7168] → [8, 7168] (RMSNorm,形状不变)
Step 4: hnorm(previous_hidden_states)
previous_hidden_states [8, 7168] → [8, 7168] (RMSNorm,形状不变)
Step 5: concat + eh_proj
torch.cat([inputs_embeds, previous_hidden_states], dim=-1)
[8, 7168] + [8, 7168] → [8, 14336]
eh_proj (Linear: 14336 → 7168)
hidden_states [8, 14336] → [8, 7168]
Step 6: mtp_block (DeepseekV2DecoderLayer)
┌── input_layernorm(hidden_states)
│ residual [8, 7168] (clone)
│ hidden_states [8, 7168] (RMSNorm)
│
├── self_attn (MLA Attention)
│ positions [8]
│ hidden_states [8, 7168] → [8, 7168]
│ (内部: Q/K/V投影、RoPE、PagedAttention、输出投影)
│
├── post_attention_layernorm(hidden_states, residual)
│ residual += hidden_states (fused add) → [8, 7168]
│ hidden_states [8, 7168] (RMSNorm)
│
└── mlp (DeepseekV2MoE)
hidden_states [8, 7168]
gate(hidden_states) → router_logits [8, n_routed_experts]
experts(...) → [8, 7168]
+ shared_experts → [8, 7168]
hidden_states [8, 7168]
return hidden_states [8, 7168], residual [8, 7168]
Step 7: 残差连接
hidden_states = residual + hidden_states → [8, 7168]
Step 8: compute_logits
shared_head.norm(hidden_states) [8, 7168] → [8, 7168] (RMSNorm)
logits_processor(head, ...)
lm_head (Linear: 7168 → 129280)
logits [8, 7168] → [8, 129280]
mtp_layers
对于deepseek v3来说,其mtp_layers只有一个(mtp_start_layer_idx:61,num_mtp_layers:1)
DeepSeek-V3 checkpoint 结构
├── model.embed_tokens.weight ← 主模型 embedding
├── model.layers.0 ~ 60.* ← 主模型 61 层 Transformer
└── model.layers.61.* ← MTP 层权重
├── embed_tokens.weight ← 与主模型 embed_tokens 共享(路径重写)
├── enorm.weight
├── hnorm.weight
├── eh_proj.weight
├── shared_head.norm.weight
├── shared_head.head.weight ← lm_head,与主模型 lm_head 共享(DeepSeek-V3 报告)
└── self_attn.*, mlp.* ← 独立的 MTP Transformer block 权重
假设有多层的话,根据其实现逻辑,其embedding和shared_head还是只会共享一层;
其他
重复惩罚
举个例子
假设模型正在生成一段文字,用户设置了 frequency_penalty=0.5,词表中有:
token "the" (id=100)
token "cat" (id=200)
第1轮 decode → 采样出 "the"
postprocess() 执行:
output_bin_counts[req][100] += 1 → output_bin_counts[req][100] = 1
第2轮 decode → 进入 sample() 前,先修改 logits:
# _penalties_kernel 内:
logit["the"] -= frequency_penalty * output_bin_counts[req][100]
= logit["the"] - 0.5 * 1
= logit["the"] - 0.5 ← 被压低了 0.5
第3轮 "the" 又被选中,output_bin_counts[req][100] = 2,下一轮再压低 0.5×2=1.0。
效果:越生成越难再被选中,防止模型陷入循环重复。
主要代码
代码中的具体逻辑参考文件内注释,主要是:
vllm/v1/worker/gpu/model_runner.py
vllm/model_executor/models/deepseek_mtp.py
vllm/v1/core/sched/scheduler.py
vllm/v1/engine/core.py
vllm/v1/worker/gpu/spec_decode/eagle.py
vllm/v1/worker/gpu/spec_decode/rejection_sample.py