Skip to content

13. 模型部署与推理优化

面向大模型应用工程师/Agent开发工程师的高频面试题,难度标记:⭐基础 ⭐⭐进阶 ⭐⭐⭐高级


一、量化基础

Q1: FP16、BF16、INT8、INT4 各有什么特点和适用场景? ⭐⭐

答:

数据类型位宽特点适用场景
FP1616bit浮点数,精度较高,范围±65504训练/推理默认精度
BF1616bit1bit符号+8bit指数+7bit尾数,范围与FP32一致训练更稳定,A100/H100原生支持
INT88bit整数,模型体积减半推理加速,精度损失可控
INT44bit整数,模型体积减至1/4资源受限场景,需配合GPTQ/AWQ

BF16相比FP16的核心优势是动态范围大(指数位与FP32相同),训练时不容易溢出。INT8量化需要校准数据集确定scale/zero-point,INT4量化则需要更复杂的算法(如GPTQ基于Hessian矩阵的逐层量化)来最小化精度损失。

python
# 模型内存估算公式(以参数量为例)
def estimate_memory(params_b, dtype_bytes=2):
    """估算模型加载所需GPU显存(GB)"""
    return params_b * 1e9 * dtype_bytes / 1e9

print(f"7B FP16: {estimate_memory(7, 2):.1f} GB")   # 14.0 GB
print(f"7B INT8: {estimate_memory(7, 1):.1f} GB")    # 7.0 GB
print(f"7B INT4: {estimate_memory(7, 0.5):.1f} GB")  # 3.5 GB

Q2: GPTQ、AWQ、GGUF 三种量化方法有什么区别? ⭐⭐⭐

答:

GPTQ(GPU友好):基于OBQ框架,逐层量化权重,利用Hessian矩阵的逆来最小化量化误差。特点是逐层独立量化,需要少量校准数据(128条),量化后模型适合GPU部署。缺点是量化速度较慢,且依赖CUDA。

AWQ(Activation-aware Weight Quantization):核心思想是不是所有权重同等重要,对激活值大的通道给予更高精度保护。通过per-channel scaling因子保护显著权重,在INT4下精度通常优于GPTQ。量化速度快,且更适合边缘设备部署。

GGUF(CPU/混合推理友好):由llama.cpp项目定义的文件格式,支持多种量化方案(Q4_K_M、Q5_K_S等)。特点是纯CPU可推理,也支持CPU+GPU混合offload,适合消费级硬件。K-quant方案对不同层使用不同bit量化,平衡速度和精度。

python
# GPTQ 量化示例(使用 AutoGPTQ)
from auto_gptq import AutoGPTQForCausalLM
model = AutoGPTQForCausalLM.from_quantized("TheBloke/Llama-2-7B-GPTQ", device="cuda:0")

# AWQ 量化示例(使用 AutoAWQ)
from awq import AutoAWQForCausalLM
model = AutoAWQForCausalLM.from_quantized("TheBloke/Llama-2-7B-AWQ", fuse_layers=True)

# GGUF 推理示例(使用 llama-cpp-python)
from llama_cpp import Llama
llm = Llama(model_path="llama-2-7b-Q4_K_M.gguf", n_ctx=4096, n_gpu_layers=35)

选择建议:GPU服务器选AWQ(精度好)或GPTQ(生态成熟);消费级硬件/边缘部署选GGUF。


Q3: 如何评估量化后的精度损失? ⭐⭐

答:

评估量化精度损失需要从多个维度综合考量:

  1. 基准测试(Benchmark):在MMLU、HumanEval、GSM8K等标准评测集上对比量化前后分数。一般INT8损失<1%,INT4损失1-3%。
  2. 困惑度(Perplexity):在WikiText-2等数据集上计算PPL变化,是最基础的指标。
  3. 任务级评估:针对实际业务场景构造评测集,关注边界case——长文本理解、数学推理、代码生成等对量化更敏感的任务。
  4. 输出一致性:对比相同prompt下量化前后输出的语义相似度(可用embedding cosine similarity)。
python
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

def compare_outputs(original_model, quantized_model, tokenizer, prompts):
    """对比量化前后输出的一致性"""
    results = []
    for prompt in prompts:
        inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
        with torch.no_grad():
            orig_out = original_model.generate(**inputs, max_new_tokens=100, do_sample=False)
            quant_out = quantized_model.generate(**inputs, max_new_tokens=100, do_sample=False)
        orig_text = tokenizer.decode(orig_out[0], skip_special_tokens=True)
        quant_text = tokenizer.decode(quant_out[0], skip_special_tokens=True)
        results.append({"prompt": prompt, "match": orig_text == quant_text,
                        "orig": orig_text[:100], "quant": quant_text[:100]})
    return results

二、KV Cache

Q4: KV Cache 的原理是什么?为什么它在大模型推理中至关重要? ⭐⭐

答:

Transformer自回归生成时,每个新token需要与所有已生成token做attention计算。如果不缓存,生成第N个token时需要重新计算前N-1个token的K和V矩阵,时间复杂度为O(N²)。

KV Cache 的原理:将已计算过的每一层每个token的Key和Value向量缓存起来,生成新token时只需计算当前token的Q,与缓存的K、V做attention即可。这样每步只需增量计算,时间复杂度降为O(N)。

内存计算公式

KV Cache大小 = 2 × num_layers × num_kv_heads × head_dim × seq_len × dtype_bytes

以LLaMA-2-7B为例(32层、32头、128维、FP16):

单条序列4096长度 = 2 × 32 × 32 × 128 × 4096 × 2B ≈ 2.1GB

这意味着batch_size=16时仅KV Cache就需要约33GB,成为推理内存瓶颈

python
def calc_kv_cache_gb(num_layers, num_kv_heads, head_dim, seq_len, batch_size=1, dtype_bytes=2):
    """计算KV Cache内存占用"""
    per_token = 2 * num_layers * num_kv_heads * head_dim * dtype_bytes  # bytes
    total = per_token * seq_len * batch_size
    return total / 1e9

# LLaMA-2-7B: 32层, 32 KV heads, 128 dim
print(f"单序列4096: {calc_kv_cache_gb(32, 32, 128, 4096):.2f} GB")
# LLaMA-2-70B: 80层, 8 KV heads (GQA), 128 dim
print(f"单序列4096: {calc_kv_cache_gb(80, 8, 128, 4096):.2f} GB")

Q5: GQA(Grouped Query Attention)如何减少 KV Cache 开销? ⭐⭐

答:

在标准Multi-Head Attention (MHA)中,Q/K/V各有 num_heads 个头。GQA的核心思想是让多个Q头共享同一组K/V头

  • MHA:32个Q头对应32个K头和32个V头 → KV Cache完整大小
  • GQA:32个Q头只对应8个K/V头(每4个Q头共享1组KV) → KV Cache减少75%
  • MQA(极端GQA):所有Q头共享1个K/V头 → KV Cache最小但精度下降较多

GQA在精度和效率之间取得平衡。LLaMA-2-70B使用8个KV头(GQA_ratio=4),KV Cache仅为MHA的1/4,但精度几乎无损。推理时将K/V头复制到与Q头数量匹配后计算attention,额外开销极小。

python
import torch
import torch.nn as nn

class GroupedQueryAttention(nn.Module):
    def __init__(self, hidden_dim, num_q_heads, num_kv_heads):
        super().__init__()
        self.num_q_heads = num_q_heads
        self.num_kv_heads = num_kv_heads
        self.num_groups = num_q_heads // num_kv_heads
        self.head_dim = hidden_dim // num_q_heads
        self.q_proj = nn.Linear(hidden_dim, num_q_heads * self.head_dim)
        self.k_proj = nn.Linear(hidden_dim, num_kv_heads * self.head_dim)
        self.v_proj = nn.Linear(hidden_dim, num_kv_heads * self.head_dim)

    def forward(self, x, kv_cache=None):
        B, L, _ = x.shape
        q = self.q_proj(x).view(B, L, self.num_q_heads, self.head_dim)
        k = self.k_proj(x).view(B, L, self.num_kv_heads, self.head_dim)
        v = self.v_proj(x).view(B, L, self.num_kv_heads, self.head_dim)
        # 将KV头复制到与Q头数量匹配
        k = k.repeat_interleave(self.num_groups, dim=2)  # [B, L, num_q_heads, head_dim]
        v = v.repeat_interleave(self.num_groups, dim=2)
        return q, k, v

Q6: 长序列场景下如何优化 KV Cache 的内存占用? ⭐⭐⭐

答:

长序列(如128K+)下KV Cache可轻松超过百GB,主要优化手段包括:

  1. PagedAttention(vLLM):借鉴OS虚拟内存分页思想,将KV Cache划分为固定大小的block(如16 tokens),通过block table做逻辑到物理的映射。消除了预分配连续内存的浪费,内存利用率从约50%提升到>95%。

  2. KV Cache量化:对KV Cache做INT8/FP8量化,内存直接减半。研究表明KV Cache对量化的容忍度高于模型权重。

  3. Sliding Window Attention:如Mistral使用的方案,只保留最近W个token的KV Cache(如4096),超出窗口的丢弃。适合不需要超长距离依赖的场景。

  4. StreamingLLM:保留attention sink(前几个token)+ 最近窗口内的token,中间部分丢弃。可以无限长度推理但会丢失中间信息。

  5. Token Pruning/Eviction:基于attention score动态淘汰不重要的token的KV Cache,如H2O(Heavy Hitter Oracle)方法。

python
# PagedAttention 的核心概念示意
class PagedKVCache:
    def __init__(self, block_size=16, num_blocks=1024):
        self.block_size = block_size
        # 预分配物理block池
        self.k_blocks = torch.zeros(num_blocks, block_size, 32, 128)  # 物理块
        self.v_blocks = torch.zeros(num_blocks, block_size, 32, 128)
        self.free_blocks = list(range(num_blocks))  # 空闲块链表

    def allocate(self, seq_len):
        """按需分配block,不需要连续内存"""
        needed = (seq_len + self.block_size - 1) // self.block_size
        blocks = [self.free_blocks.pop() for _ in range(needed)]
        return blocks  # 返回block table(逻辑→物理映射)

    def free(self, block_table):
        """释放block"""
        self.free_blocks.extend(block_table)

三、推理框架

Q7: vLLM 的核心原理和优势是什么? ⭐⭐

答:

vLLM是UC Berkeley开源的高性能LLM推理引擎,核心创新是PagedAttention

核心问题:传统推理框架为每个请求预分配 max_seq_len 大小的连续KV Cache内存。由于实际生成长度远小于最大长度,导致严重的内存碎片化和浪费(实测浪费60-80%)。

PagedAttention解决方案

  • KV Cache被划分为固定大小的物理block(如16 tokens)
  • 每个序列维护一个block table(类似页表),记录逻辑block到物理block的映射
  • 物理block按需分配,不需要连续内存
  • 序列结束时block可立即回收复用

vLLM的其他优势

  • Continuous Batching:动态插入新请求,不等当前batch全部完成
  • 高效调度器:基于先来先服务+抢占策略管理请求
  • 张量并行:支持多GPU推理
  • OpenAI兼容API:一行命令即可部署为API服务
python
# vLLM 最简部署
from vllm import LLM, SamplingParams

llm = LLM(model="meta-llama/Llama-2-7b-chat-hf", tensor_parallel_size=1,
          gpu_memory_utilization=0.9, max_model_len=4096)
params = SamplingParams(temperature=0.7, top_p=0.9, max_tokens=512)
outputs = llm.generate(["你好,请介绍一下自己。"], params)
print(outputs[0].outputs[0].text)

# OpenAI兼容API服务启动: python -m vllm.entrypoints.openai.api_server --model Llama-2-7b-chat-hf

Q8: Continuous Batching 与 Static Batching 的区别是什么? ⭐⭐

答:

Static Batching(静态批处理):将多个请求组成一个batch,等所有请求都完成生成后才释放资源。短请求必须等待最长请求完成,GPU利用率低。

Continuous Batching(连续批处理):也叫iteration-level batching。核心区别是每个decode step都可以插入新请求或移除已完成请求

具体流程:

  1. 某个请求生成了EOS → 立即从batch中移除,释放KV Cache
  2. 队列中有新请求 → 立即插入当前batch(prefill后加入)
  3. 整个batch的吞吐量不再受最长请求限制

以实际效果为例:Static Batching下3个请求(10/50/100 tokens),需要等100步全部完成;Continuous Batching下,10步后请求1完成并插入新请求,50步后请求2完成并插入新请求,GPU始终满载。

TensorRT-LLMvLLMTGI 均支持Continuous Batching。这是现代推理框架的标配特性,对在线服务的吞吐量提升通常在2-8倍


Q9: TensorRT-LLM 与 vLLM 各自适合什么场景? ⭐⭐

答:

维度vLLMTensorRT-LLM
定位通用开源推理引擎NVIDIA官方优化引擎
优化重点内存管理(PagedAttention)计算图优化(算子融合、Kernel调优)
量化支持GPTQ/AWQ/FP8FP8/INT8/INT4,NVIDIA原生优化
易用性简单,pip install即可较复杂,需要编译engine
性能中高,通用性好极致延迟优化,throughput更高
适用场景快速原型、中小规模服务大规模生产环境、极致性能要求
生态活跃开源社区NVIDIA闭源优化+开源包装

选择建议

  • 原型验证/中小团队 → vLLM(开箱即用)
  • 大规模生产/延迟敏感 → TensorRT-LLM(需投入工程成本)
  • 需要NVIDIA GPU专用优化(如FP8、Hopper架构特性)→ TensorRT-LLM
  • 需要支持多种模型/快速迭代 → vLLM

四、推理加速

Q10: Speculative Decoding 的原理是什么? ⭐⭐⭐

答:

自回归生成的核心瓶颈是串行解码——每生成一个token都需要完整的一次forward pass。Speculative Decoding(推测解码)的核心思想是:用小模型快速"猜"多个token,再用大模型一次性验证

算法流程

  1. Draft阶段:小模型(如7B)快速自回归生成K个候选token(如K=5)
  2. Verify阶段:大模型(如70B)对这K个token做一次前向计算(并行验证)
  3. Accept/Reject:从左到右逐个检查,接受与大模型分布一致的token,遇到第一个不一致的则reject并从大模型分布重新采样

关键保证:通过巧妙的acceptance-rejection采样策略,输出分布与直接用大模型生成完全一致(无精度损失)。

加速效果:取决于draft模型与target模型的"一致性"。通常可获得2-3倍吞吐提升。draft模型越接近target模型,接受率越高。

python
# Speculative Decoding 核心逻辑示意
def speculative_decode(target_model, draft_model, prompt, K=5):
    tokens = tokenize(prompt)
    while not is_done(tokens):
        # 1. Draft: 小模型快速生成K个候选
        draft_tokens = []
        draft_probs = []
        for _ in range(K):
            p = draft_model.forward_prob(tokens + draft_tokens)
            t = sample(p)
            draft_tokens.append(t)
            draft_probs.append(p)

        # 2. Verify: 大模型一次性验证K个token
        target_probs = target_model.forward_probs(tokens + draft_tokens)  # 一次前向

        # 3. Accept/Reject
        accepted = 0
        for i in range(K):
            if random() < min(1, target_probs[i][draft_tokens[i]] / draft_probs[i][draft_tokens[i]]):
                accepted += 1
            else:
                # Reject: 从target分布重新采样
                resample = sample(adjust_distribution(target_probs[i], draft_probs[i]))
                tokens = tokens + draft_tokens[:accepted] + [resample]
                break
        else:
            tokens = tokens + draft_tokens  # 全部接受
    return tokens

Q11: Prefill 和 Decode 阶段有什么区别?如何分别优化? ⭐⭐⭐

答:

LLM推理分为两个阶段,它们的计算特性截然不同:

Prefill(预填充)阶段

  • 处理输入prompt的所有token,生成KV Cache
  • 特点:计算密集(Compute-bound),大量矩阵乘法可并行
  • 优化方向:FlashAttention、张量并行、batch prefill

Decode(解码)阶段

  • 逐个生成输出token,每步只处理1个新token
  • 特点:内存带宽瓶颈(Memory-bound),需要加载全部模型权重只做少量计算
  • 优化方向:量化减少权重体积、GQA减少KV Cache带宽、Continuous Batching提高GPU利用率

Prefill/Decode分离优化(Splitwise/DistServe)

  • 将Prefill和Decode部署在不同GPU集群上
  • Prefill节点追求高算力利用率,可接受batch较大
  • Decode节点追求低延迟,需要高带宽
  • 避免prefill长请求阻塞decode请求导致延迟抖动

Chunked Prefill

  • 将长prompt的prefill切成多个chunk,与decode请求交替调度
  • 避免长prefill独占GPU导致其他decode请求饥饿

Q12: Prefix Caching 是什么?在什么场景下特别有效? ⭐⭐

答:

Prefix Caching(前缀缓存)是指缓存相同前缀的KV Cache,当新请求的prompt前缀与已缓存的前缀匹配时,直接复用缓存的KV Cache,跳过prefill计算。

适用场景

  1. System Prompt共享:所有请求共享相同的system prompt(如1000+ tokens),只需prefill一次
  2. Few-shot Learning:相同的示例前缀被多个请求复用
  3. 多轮对话:历史对话部分的KV Cache可复用
  4. RAG场景:检索到的相同文档片段被多个请求引用

实现方式

  • vLLM Automatic Prefix Caching:基于token序列的hash自动匹配前缀
  • RadixAttention(SGLang):使用Radix Tree管理前缀树,高效查找最长公共前缀
python
# vLLM 启用 Prefix Caching
from vllm import LLM
llm = LLM(
    model="meta-llama/Llama-2-7b-chat-hf",
    enable_prefix_caching=True,  # 启用前缀缓存
    block_size=16
)

# SGLang 使用 RadixAttention
# python -m sglang.launch_server --model Llama-2-7b --enable-radix-attention

效果:在system prompt较长的场景下,首个请求之后的请求prefill时间可减少50-90%


五、部署架构

Q13: 大模型服务的高可用架构如何设计? ⭐⭐

答:

大模型服务的高可用设计需要覆盖基础设施层、推理引擎层、应用层三个层面:

基础设施层

  • 多副本部署:同一模型至少部署2个副本,避免单点故障
  • 多可用区(AZ):副本分布在不同可用区,防机房级故障
  • GPU健康监控:监控GPU显存、温度、ECC错误,异常自动摘除

推理引擎层

  • 健康检查:定期发送测试请求验证服务可用性
  • 优雅退出:收到SIGTERM后等待正在处理的请求完成再关闭
  • 超时控制:设置请求级超时(如120s),避免慢请求占用资源

应用层

  • 负载均衡:使用Nginx/Envoy做L7负载均衡,支持least-connections策略
  • 熔断降级:下游超时率超过阈值时自动熔断,返回降级结果
  • 请求队列:使用Redis/Kafka缓冲突发流量,异步处理
                    ┌─ Load Balancer (Nginx/Envoy) ─┐
                    │                                │
        ┌───────────┴──┐    ┌───────────┴──┐    ┌───┴───────────┐
        │  vLLM Pod 1  │    │  vLLM Pod 2  │    │  vLLM Pod 3   │
        │  (GPU×2)      │    │  (GPU×2)      │    │  (GPU×2)       │
        │  AZ-1         │    │  AZ-2         │    │  AZ-1          │
        └──────────────┘    └──────────────┘    └───────────────┘

Q14: 如何对大模型服务进行 A/B 测试和灰度发布? ⭐⭐

答:

A/B 测试:同时部署新旧两个版本,按流量比例分配请求,通过指标对比决定是否全量切换。

灰度发布:逐步扩大新版本的流量比例(如1% → 5% → 20% → 100%),每阶段观察关键指标。

实现方案:

python
# 基于权重的路由逻辑(简化示意)
import random

class ModelRouter:
    def __init__(self):
        self.routes = {
            "v1": {"endpoint": "http://vllm-v1:8000", "weight": 90},
            "v2": {"endpoint": "http://vllm-v2:8000", "weight": 10},
        }

    def route(self, request):
        """按权重路由请求"""
        total = sum(r["weight"] for r in self.routes.values())
        rand = random.uniform(0, total)
        cumulative = 0
        for name, config in self.routes.items():
            cumulative += config["weight"]
            if rand <= cumulative:
                # 记录路由决策用于后续分析
                request.metadata["model_version"] = name
                return self._forward(config["endpoint"], request)

    def promote(self, version, new_weight):
        """灰度调整权重"""
        self.routes[version]["weight"] = new_weight
        # 自动归一化其他版本权重

关键指标对比:延迟P50/P99、吞吐量、用户满意度评分、任务完成率、幻觉率。灰度期间需设置自动回滚条件(如错误率>2%或延迟P99超标则自动回退)。


Q15: 如何计算大模型推理服务的成本? ⭐⭐

答:

大模型推理成本主要包括GPU硬件成本运营成本

成本计算公式

单次请求成本 = GPU小时单价 × 推理时间(小时) × GPU数量
每百万token成本 = 单次请求成本 × (1,000,000 / 单次请求token数)

以A100-80G为例($3/小时):推理7B模型每秒约50 tokens

每百万token成本 = $3 × (1,000,000 / 50) / 3600 ≈ $16.7(单卡)
考虑batch=16后: $16.7 / 16 ≈ $1.04/百万tokens

成本降低手段

  1. 量化:INT4量化使7B模型可在单卡运行,13B模型可挤入单卡
  2. Continuous Batching:提升GPU利用率,batch越大单位成本越低
  3. 模型选择:用更小的模型+精心设计的prompt替代大模型
  4. Spot/竞价实例:非实时场景使用Spot实例,成本降低60-70%
  5. 智能路由:简单请求路由到小模型,复杂请求路由到大模型
  6. 缓存:对相同/相似请求缓存结果,直接跳过推理
python
# 智能路由:根据请求复杂度选择模型
class SmartRouter:
    def __init__(self):
        self.models = {
            "small": {"endpoint": "http://7b:8000", "cost_per_1k": 0.001},
            "large": {"endpoint": "http://70b:8000", "cost_per_1k": 0.015},
        }

    def route(self, request):
        complexity = self.estimate_complexity(request)
        if complexity < 0.3:
            return self.models["small"]  # 简单请求 → 小模型
        elif complexity > 0.7:
            return self.models["large"]  # 复杂请求 → 大模型
        else:
            # 中间地带:先用小模型,置信度低时fallback到大模型
            result = self._call(self.models["small"], request)
            if result.confidence < 0.8:
                result = self._call(self.models["large"], request)
            return result

    def estimate_complexity(self, request):
        """基于规则/分类器评估请求复杂度"""
        # 简单启发式:长度、关键词、意图分类等
        pass

六、实战难题

Q16: 线上服务出现 GPU OOM,如何排查和解决? ⭐⭐⭐

答:

排查步骤

  1. 确认OOM类型:是模型加载OOM还是推理过程OOM(运行时OOM通常是KV Cache导致)
  2. 检查显存分布:模型权重 + KV Cache + 激活值 + 框架开销
  3. 检查并发量:连续批处理的batch size过大导致KV Cache超出显存
  4. 检查序列长度:是否有异常长的输入/输出序列

解决手段

python
# 1. 限制并发和序列长度
from vllm import LLM
llm = LLM(
    model="model-name",
    gpu_memory_utilization=0.9,    # 预留10%防止碎片
    max_model_len=4096,             # 限制最大序列长度
    max_num_seqs=32,                # 限制最大并发序列数
    enforce_eager=True,             # 禁用CUDA Graph减少显存峰值
)

# 2. 启用KV Cache量化
llm = LLM(model="model-name", kv_cache_dtype="fp8")  # KV Cache用FP8

# 3. 设置请求级超时和长度限制
MAX_INPUT_TOKENS = 3000
MAX_OUTPUT_TOKENS = 1024

防御措施

  • 输入端做token数量校验,超长输入截断或拒绝
  • 设置output token限制,防止模型"跑飞"
  • 监控GPU显存使用率,设置告警阈值(如>90%)
  • 使用抢占机制:低优先级请求可被高优先级请求抢占,释放其KV Cache

Q17: 推理服务的延迟P99突然飙升,如何排查? ⭐⭐⭐

答:

P99飙升通常不是模型本身的问题,而是系统性瓶颈。排查路径:

第一步:定位瓶颈在哪个阶段

python
import time

class LatencyTracker:
    def track(self, request):
        t0 = time.time()
        # 1. 排队时间
        queue_time = request.start_process_time - request.arrival_time
        # 2. Prefill时间(处理输入)
        prefill_time = ...  # 从推理引擎获取
        # 3. Decode时间(生成输出)
        decode_time = ...
        # 4. 网络/序列化时间
        network_time = total_time - queue_time - prefill_time - decode_time

        return {"queue": queue_time, "prefill": prefill_time,
                "decode": decode_time, "network": network_time}

常见原因及对策

现象原因对策
队列时间高并发超载扩容/限流
Prefill时间高长输入+无chunked prefill启用chunked prefill
Decode时间高batch过大/输出过长限制batch size和max_tokens
周期性飙升GPU thermal throttling散热优化/降低功耗
偶发性飙升CUDA kernel编译/JIT预热、固定batch shape
持续性飙升显存swap到host减少并发/量化

关键工具:NVIDIA NSight Systems、PyTorch Profiler、推理框架自带的metrics endpoint。


Q18: 多模型服务如何复用GPU资源? ⭐⭐⭐

答:

实际业务中往往需要部署多个模型(不同规模/不同能力),但为每个模型分配独立GPU非常浪费。主要方案:

方案一:模型切换(Model Switching)

  • 单卡上按需加载不同模型,通过LRU策略管理显存
  • 适合模型不同时高频使用的场景
  • 缺点:模型加载有冷启动延迟(数十秒级)

方案二:模型并行(Model Parallelism)

  • 多个模型共享同一组GPU,通过tensor/pipeline parallelism分布
  • 适合总参数量在GPU容量范围内的场景

方案三:NVIDIA MPS/MIG

  • MPS(Multi-Process Service):多个进程共享同一GPU,按SM分配算力
  • MIG(Multi-Instance GPU):A100/H100支持将GPU划分为最多7个独立实例,每个实例有独立显存和算力
yaml
# 使用Kubernetes + GPU资源管理的部署方案
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-small  # 7B模型
spec:
  replicas: 2
  template:
    spec:
      containers:
      - name: vllm
        resources:
          limits:
            nvidia.com/gpu: 1
            # 通过 nodeSelector 或 MIG 策略分配
        env:
        - name: MODEL_NAME
          value: "qwen-7b-chat"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-large  # 72B模型,使用更多GPU
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: vllm
        resources:
          limits:
            nvidia.com/gpu: 4  # 张量并行
        env:
        - name: MODEL_NAME
          value: "qwen-72b-chat"

Q19: 如何实现大模型推理的请求级优先级调度? ⭐⭐⭐

答:

不同业务场景对延迟的要求不同(如实时聊天 vs 批量处理),需要请求级优先级调度

核心设计

  1. 请求携带优先级标签(P0/P1/P2)
  2. 推理引擎的调度器支持优先级队列
  3. 高优先级请求可抢占低优先级请求的GPU资源
python
import heapq
from dataclasses import dataclass, field
from enum import IntEnum

class Priority(IntEnum):
    REALTIME = 0    # 最高优先级:实时交互
    NORMAL = 1      # 普通优先级:标准API调用
    BATCH = 2       # 最低优先级:离线批量处理

@dataclass(order=True)
class PrioritizedRequest:
    priority: int
    arrival_time: float = field(compare=False)
    request: object = field(compare=False)

class PriorityScheduler:
    def __init__(self, max_running=32):
        self.queue = []  # 优先级队列
        self.running = {}  # 正在运行的请求
        self.max_running = max_running

    def submit(self, request, priority):
        heapq.heappush(self.queue,
                       PrioritizedRequest(priority, time.time(), request))

    def schedule(self):
        """调度决策:有空位则取出最高优先级请求"""
        if len(self.running) < self.max_running and self.queue:
            req = heapq.heappop(self.queue)
            self.running[req.request.id] = req
            return req.request
        # 可选:P0请求到达时,抢占P2请求
        if self.queue and self.queue[0].priority == Priority.REALTIME:
            lowest = max(self.running.values(), key=lambda r: r.priority)
            if lowest.priority > Priority.REALTIME:
                self._preempt(lowest)  # 暂停低优先级请求,保存KV Cache
                return self.queue[0].request

关键考虑

  • 抢占时需要保存被抢占请求的KV Cache到CPU内存(swap),恢复时再加载回来
  • P0请求的SLA通常要求<500ms首token延迟,需要预留buffer capacity
  • 监控各优先级队列的等待时间和吞吐量,确保公平性

Q20: 如何评估和优化推理服务的首Token延迟(TTFT)? ⭐⭐⭐

答:

TTFT(Time To First Token)是用户体验最关键的指标,主要由Prefill阶段决定。

TTFT的组成

TTFT = 排队时间 + 网络传输 + Prefill计算时间
Prefill时间 ≈ (prompt_tokens × hidden_dim² × 2) / GPU_TFLOPS

优化手段

  1. FlashAttention v2/v3:减少attention计算的HBM访问次数,prefill提速1.5-2x
  2. Prefix Caching:命中缓存的请求直接跳过已缓存部分的prefill
  3. Chunked Prefill:将长prompt分块,避免长请求独占GPU影响其他请求的TTFT
  4. Tensor Parallelism:多GPU分担计算,prefill时间与GPU数近似成反比
  5. 输入侧优化:精简system prompt、压缩few-shot示例
python
# 优化system prompt减少prefill时间
# 差:冗长的system prompt(2000 tokens)
system_bad = "You are a helpful assistant. " * 200 + detailed_instructions

# 好:精简的system prompt(200 tokens),效果相当
system_good = """你是一个AI助手。遵循以下规则:
1. 简洁准确回答
2. 不确定时说明
3. 引用来源"""

监控指标

  • TTFT P50/P95/P99
  • Prefill tokens/second
  • 排队深度和等待时间
  • Prefix cache命中率

七、总结速查表

主题核心要点
量化INT4(GPTQ/AWQ)省显存,GGUF适合CPU,精度损失需多维评估
KV CacheGQA减少KV头数,PagedAttention消除内存碎片,长序列用sliding window
推理框架vLLM通用易用,TRT-LLM极致性能,Continuous Batching是标配
推理加速Speculative Decoding用小模型加速,Prefix Caching复用前缀KV
部署架构多副本多AZ,灰度发布权重路由,智能路由降本
成本优化量化+batching+缓存+模型路由,Spot实例降60-70%
实战

八、进阶题目

题目 8 ⭐⭐⭐:FlashAttention 的原理是什么?

Q:请详细解释 FlashAttention 的核心原理,包括 IO-aware 算法设计、HBM 与 SRAM 的关系、tiling 策略,以及 FlashAttention v1/v2/v3 的演进。

A:

FlashAttention 是一种 IO-aware 的精确注意力算法,核心思想是减少对高带宽内存(HBM)的访问次数,而非减少浮点运算量。

背景问题: 标准注意力需要将完整的 S×S 注意力矩阵(S 为序列长度)写入 HBM,再进行 softmax,HBM 访问次数为 O(S²),成为瓶颈。

IO-aware 设计:

  • GPU 内存分为 HBM(大但慢,~1.5TB/s)和 SRAM(小但快,~19TB/s)
  • 标准注意力在 HBM 和 SRAM 之间来回搬运数据,带宽受限
  • FlashAttention 通过 tiling(分块) 策略,将 Q/K/V 分成小块,每块在 SRAM 中完成计算,避免将完整的注意力矩阵写回 HBM

Tiling 策略:

  1. 将 Q 分成 Tr 块,K/V 分成 Tc 块
  2. 外层循环遍历 K/V 块,内层循环遍历 Q 块
  3. 每个块在 SRAM 中完成 matmul → mask → softmax → 与 V 相乘的全流程
  4. 使用 online softmax(维护运行中的 max 和 sum)实现分块 softmax 的精确计算
  5. 通过额外的 rescaling 因子合并各块结果,保证数值精确等价

FlashAttention v1 → v2 → v3 演进:

  • v1:首次实现 IO-aware 注意力,HBM 访问从 O(S²) 降到 O(S²d²/M)(M 为 SRAM 大小),消除中间注意力矩阵的存储
  • v2:优化并行策略,将外层循环改为遍历 Q 块(而非 K/V),更好地利用 GPU 的并行性;减少非 matmul 操作,prefill 速度提升约 2x
  • v3:针对 Hopper 架构(H100)优化,利用异步 warp 特性和 TMA(Tensor Memory Accelerator),进一步隐藏内存延迟,接近硬件理论峰值
python
# FlashAttention 使用示例(PyTorch 2.0+ 原生支持)
import torch
import torch.nn.functional as F

# 方式1:PyTorch 2.0+ 原生 scaled_dot_product_attention
def attention_with_flash(q, k, v, is_causal=False):
    """PyTorch 会自动选择 FlashAttention 后端"""
    return F.scaled_dot_product_attention(
        q, k, v,
        is_causal=is_causal,
        # 可强制指定后端
        # backend=torch.backends.cuda.SDPBackend.FLASH_ATTENTION
    )

# 方式2:直接使用 flash-attn 库
from flash_attn import flash_attn_func

def flash_attention_direct(q, k, v, causal=False):
    """
    q, k, v: (batch, seqlen, nheads, headdim)
    注意输入维度顺序与 PyTorch 不同!
    """
    return flash_attn_func(q, k, v, causal=causal)

# 方式3:HuggingFace Transformers 中启用
# 在模型加载时指定 attn_implementation="flash_attention_2"
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3-8B",
    attn_implementation="flash_attention_2",  # 启用 FlashAttention
    torch_dtype=torch.float16,
)

# 性能对比
# 序列长度 4096, batch_size 8, nheads 32, headdim 128
# 标准注意力: ~120ms, 显存 ~16GB(存储完整 S×S 矩阵)
# FlashAttention: ~45ms, 显存 ~4GB(不存储注意力矩阵)

追问:

  1. FlashAttention 为什么是"精确"的? online softmax 算法通过维护全局 max 和 sum 的增量更新,使得分块计算的结果与一次性 softmax 完全等价(忽略浮点精度差异),并非近似算法。
  2. FlashAttention 能减少 FLOPs 吗? 不能,FLOPs 不变甚至略增(多了 rescaling),但减少了 IO 操作,因为现代 GPU 的注意力是 memory-bound 而非 compute-bound。
  3. FlashAttention 对 decode 阶段有效吗? 效果有限,因为 decode 阶段 Q 只有 1 个 token,不再是 memory-bound,这就是 FlashDecoding 要解决的问题。

题目 9 ⭐⭐⭐:SGLang 框架的核心特性是什么?

Q:请介绍 SGLang 推理框架的核心特性,包括 RadixAttention、Constrained Decoding、Frontend Language 设计,以及它与 vLLM 的对比。

A:

SGLang(Structured Generation Language)是一个面向 LLM 的高性能推理框架,核心理念是将前端语言设计与后端运行时优化深度结合

1. RadixAttention(基数注意力):

  • 传统 Prefix Caching 按固定前缀匹配,粒度粗
  • RadixAttention 使用 Radix Tree(基数树) 管理所有请求的 KV Cache
  • 自动发现任意长度的公共前缀/后缀,实现更细粒度的缓存复用
  • 支持 LRU 淘汰策略,自动管理缓存空间
  • 对多轮对话、few-shot、tree-of-thought 等场景收益显著

2. Constrained Decoding(受限解码):

  • 原生支持 JSON Schema、正则表达式约束下的生成
  • 将约束编译为有限状态机(FSM),在每一步解码时 mask 掉不合法的 token
  • 相比外部约束方案(如 Outlines),SGLang 将 FSM 状态跟踪集成到调度器中,零额外开销
  • 保证输出 100% 符合 schema,无需后处理重试

3. Frontend Language:

  • 提供 Python DSL 描述复杂的 LLM 调用流程
  • 支持 fork/join 并发、条件分支、循环控制流
  • 编译器自动将 DSL 优化为高效的执行计划

4. 与 vLLM 对比:

特性SGLangvLLM
缓存策略RadixAttention(自动前缀匹配)Prefix Caching(手动/自动)
受限解码原生 FSM 集成需外部工具
前端语言Python DSL无(纯 API)
Continuous Batching支持支持
PagedAttention支持支持(原创)
多模态支持支持
社区生态较新但增长快成熟庞大
python
# SGLang 使用示例
import sglang as sgl

# 1. 启动服务器(RadixAttention 默认启用)
# python -m sglang.launch_server --model meta-llama/Llama-3-8B-Instruct

# 2. 前端 DSL 示例:多路并发 + 汇总
@sgl.function
def multi_query(s, question):
    # 并发执行3个不同角度的回答
    s += sgl.fork("a1", generate, f"从技术角度回答:{question}")
    s += sgl.fork("a2", generate, f"从商业角度回答:{question}")
    s += sgl.fork("a3", generate, f"从用户体验角度回答:{question}")
    sgl.join_all(s)

    # 汇总三个回答
    combined = s["a1"] + "\n" + s["a2"] + "\n" + s["a3"]
    s += sgl.fork("summary", generate, f"总结以下内容:\n{combined}")
    sgl.join(s, "summary")

# 3. Constrained Decoding with JSON Schema
from sglang import RuntimeEndpoint, gen

json_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "integer"},
        "skills": {"type": "array", "items": {"type": "string"}}
    },
    "required": ["name", "age", "skills"]
}

@sgl.function
def extract_json(s, text):
    s += sgl.system("Extract structured info from text.")
    s += sgl.user(text)
    s += sgl.assistant(sgl.gen(
        "result",
        max_tokens=256,
        json_schema=json_schema  # 保证输出合法JSON
    ))

# 4. 简单客户端调用
import sglang as sgl

@sgl.function
def chat(s, message):
    s += sgl.system("You are a helpful assistant.")
    s += sgl.user(message)
    s += sgl.assistant(sgl.gen("answer", max_tokens=512))

state = chat.run(message="What is FlashAttention?")
print(state["answer"])

追问:

  1. RadixAttention 如何处理 eviction? 使用 LRU 策略,当 KV Cache 内存不足时,淘汰最近最少使用的节点。支持 tree-aware eviction,优先淘汰分支末尾。
  2. SGLang 的 FSM 约束解码如何处理 Unicode 和 BPE 分词? BPE 分词可能导致一个字符跨多个 token,SGLang 使用 token-level FSM 转换,预计算每个 token 对应的 FSM 状态转移,支持所有 tokenizer。
  3. 什么时候选 SGLang vs vLLM? 需要复杂 LLM 编排(多轮、分支、工具调用)或需要 JSON 约束输出时选 SGLang;简单服务且看重成熟生态选 vLLM。

题目 10 ⭐⭐⭐:Tensor Parallelism vs Pipeline Parallelism 有什么区别?

Q:请对比 Tensor Parallelism 和 Pipeline Parallelism 的原理、通信开销、适用场景,并介绍混合并行策略。

A:

模型并行是将大模型拆分到多个 GPU 上的核心技术,TP 和 PP 是两种主要方式。

Tensor Parallelism(TP,张量并行):

  • 将每一层的权重矩阵按列或行切分到不同 GPU
  • 每个 GPU 持有部分权重,独立计算后通过 AllReduce 通信合并结果
  • 一个 Transformer 层的计算分布在所有 TP GPU 上,需要多次同步
  • 通信量:每层 2 次 AllReduce,通信量 = 2 × hidden_size × seq_len × dtype_size
  • 优点:延迟低(所有 GPU 同时参与单层计算)
  • 缺点:通信频繁,跨节点时网络带宽成为瓶颈

Pipeline Parallelism(PP,流水线并行):

  • 将模型按层分组,不同 GPU 负责不同层
  • 数据在 GPU 间顺序传递,形成流水线
  • 存在 pipeline bubble:流水线填充和排空时 GPU 空闲
  • bubble 率 ≈ (p-1)/(m+p-1),p 为 micro-batch 数,m 为流水线阶段数
  • 优点:通信量小(只在相邻阶段传递 activations)
  • 缺点:bubble 导致 GPU 利用率降低;需要足够大的 micro-batch 数

关键对比:

维度Tensor ParallelismPipeline Parallelism
切分维度层内(权重矩阵)层间(模型层)
通信模式AllReduce(每层多次)P2P(相邻阶段)
通信量大(每层都要同步)小(只传 activations)
GPU 利用率高(同步计算)受 bubble 影响
最佳场景节点内(NVLink)跨节点(带宽有限)

混合并行(Megatron-LM 风格): 典型部署:节点内用 TP(8 GPU),节点间用 PP

python
# 概念性实现:混合并行策略
import torch
import torch.distributed as dist

class TensorParallelLinear(torch.nn.Module):
    """将线性层按列切分到 TP 组"""
    def __init__(self, in_features, out_features, tp_size, tp_rank):
        super().__init__()
        assert out_features % tp_size == 0
        self.local_out = out_features // tp_size
        self.weight = torch.nn.Parameter(
            torch.randn(in_features, self.local_out)
        )
        self.tp_group = None  # 由外部设置通信组

    def forward(self, x):
        # 每个 GPU 计算部分输出
        local_out = x @ self.weight
        # AllReduce 汇总所有 GPU 的结果
        dist.all_reduce(local_out, group=self.tp_group)
        return local_out

class PipelineStage(torch.nn.Module):
    """流水线并行的一个阶段"""
    def __init__(self, layers, stage_id, num_stages):
        super().__init__()
        self.layers = torch.nn.ModuleList(layers)
        self.stage_id = stage_id
        self.num_stages = num_stages

    def forward(self, x):
        if self.stage_id > 0:
            # 从上一个阶段接收 activations
            x = dist.recv(x, src=self.stage_id - 1)
        for layer in self.layers:
            x = layer(x)
        if self.stage_id < self.num_stages - 1:
            # 发送给下一个阶段
            dist.send(x, dst=self.stage_id + 1)
        return x

# Megatron-LM 风格的混合并行配置
# 假设 2 个节点,每节点 8 GPU
TP_SIZE = 8    # 节点内 TP
PP_SIZE = 2    # 跨节点 PP
DP_SIZE = 1    # 数据并行(如有更多GPU)
# 总 GPU = TP × PP × DP = 8 × 2 × 1 = 16

追问:

  1. TP 的 AllReduce 为什么在节点内更合适? NVLink 带宽可达 900GB/s(H100),而跨节点 RDMA 通常 100-400Gb/s,差一个数量级。频繁的 AllReduce 在跨节点时成为严重瓶颈。
  2. 如何减小 Pipeline Bubble? 增加 micro-batch 数量(p >> m),使用 GPipe 的 1F1B 调度策略,或使用 Interleaved 1F1B 进一步降低 bubble。
  3. Sequence Parallelism 是什么? 在 TP 的基础上,对 LayerNorm 和 Dropout 等非张量并行操作也进行序列维度切分,减少这些操作的显存冗余。

题目 11 ⭐⭐:什么是模型蒸馏?和量化有什么区别?

Q:请解释模型蒸馏的概念和原理,以及它与量化技术的本质区别。

A:

模型蒸馏(Knowledge Distillation): 将大模型(Teacher)的知识迁移到小模型(Student),目标是让小模型获得接近大模型的能力。

核心方法:

  1. Logits Distillation:Student 学习 Teacher 的输出概率分布(soft targets),而非只学 hard label
  2. KD Loss:L = α × L_CE(y, student_logits) + (1-α) × KL(teacher_logits/T, student_logits/T)
    • T 为温度参数(通常 T=2~5),软化概率分布,暴露更多"暗知识"
    • Teacher 在错误类别上的相对概率包含了类间相似性等隐含知识
  3. Feature Distillation:Student 学习 Teacher 中间层的特征表示
  4. 数据增强:用 Teacher 生成 synthetic 数据,用这些数据训练 Student

蒸馏 vs 量化:

维度模型蒸馏量化
本质改变模型架构(大→小)降低数值精度(FP16→INT4)
架构变化需要新模型(参数量减少)保持相同架构
精度损失取决于 Student 容量,可能较大通常较小(INT4 几乎无损)
训练需求需要 Teacher 推理 + Student 训练PTQ 几乎不需要训练
压缩比高(可 10x+ 参数量缩减)中(约 4x 显存缩减)
部署灵活性独立小模型,部署简单需要支持低精度的硬件/框架
可组合可以蒸馏后再量化可以量化后再蒸馏
python
# 模型蒸馏示例
import torch
import torch.nn as nn
import torch.nn.functional as F

class DistillationLoss(nn.Module):
    """标准知识蒸馏损失"""
    def __init__(self, temperature=4.0, alpha=0.7):
        super().__init__()
        self.T = temperature
        self.alpha = alpha
        self.ce_loss = nn.CrossEntropyLoss()
        self.kl_loss = nn.KLDivLoss(reduction='batchmean')

    def forward(self, student_logits, teacher_logits, labels):
        # Hard loss: Student 对真实标签的交叉熵
        hard_loss = self.ce_loss(student_logits, labels)

        # Soft loss: Student 输出与 Teacher soft targets 的 KL 散度
        student_soft = F.log_softmax(student_logits / self.T, dim=-1)
        teacher_soft = F.softmax(teacher_logits / self.T, dim=-1)
        soft_loss = self.kl_loss(student_soft, teacher_soft) * (self.T ** 2)

        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss

# 训练循环示例
teacher = load_teacher_model("llama-3-70B")  # 大模型
student = load_student_model("llama-3-8B")   # 小模型
teacher.eval()  # Teacher 不更新梯度
criterion = DistillationLoss(temperature=4.0, alpha=0.7)

for batch in dataloader:
    input_ids, labels = batch
    with torch.no_grad():
        teacher_logits = teacher(input_ids).logits
    student_logits = student(input_ids).logits
    loss = criterion(student_logits, teacher_logits, labels)
    loss.backward()
    optimizer.step()

# 实际应用场景
# 1. Llama-3-70B → Llama-3-8B(同架构蒸馏,保持能力)
# 2. GPT-4 → 开源小模型(API 蒸馏,成本高但效果好)
# 3. 多 Teacher 蒸馏:融合多个 Teacher 的知识到一个 Student

追问:

  1. 蒸馏时 Teacher 和 Student 必须同架构吗? 不必须,但同架构蒸馏效果通常更好。跨架构蒸馏(如 Transformer→MLP)需要额外的特征对齐。
  2. 温度 T 的作用? T=1 时 softmax 输出接近 one-hot,暗知识丢失;T 增大时概率分布更平滑,暴露更多类间关系。但 T 过大会使分布过于均匀,信息量下降。
  3. 蒸馏和剪枝(Pruning)的关系? 剪枝是移除不重要的权重,可与蒸馏结合:先剪枝得到小模型,再用蒸馏恢复精度。

题目 12 ⭐⭐:边缘部署和端侧推理怎么做?

Q:请介绍如何将 LLM 部署到边缘设备/端侧,包括 llama.cpp、GGML/GGUF 格式、ONNX Runtime、Core ML 等方案。

A:

端侧部署 LLM 的核心挑战:有限的内存、算力和功耗。主流方案如下:

1. llama.cpp:

  • 纯 C/C++ 实现的 LLM 推理引擎,无需 GPU
  • 支持多种量化格式(Q4_0, Q4_K_M, Q5_K_M, Q8_0 等)
  • 针对 CPU SIMD 指令集优化(AVX2, AVX-512, ARM NEON)
  • 支持 Apple Silicon 的 Metal GPU 加速
  • 适用于:macOS/Linux/Windows/Android/iOS

2. GGML/GGUF 格式:

  • GGML:Georgi Gerganov 的张量库,用于 CPU 推理
  • GGUF:GGML 的改进格式,取代旧的 GGML 格式
    • 自描述元数据(模型架构、量化类型等嵌入文件)
    • 支持更大的 vocabulary(突破 token 数限制)
    • 更灵活的扩展性
  • 量化级别:Q2_K(最小,质量差)→ Q4_K_M(推荐平衡点)→ Q8_0(接近原精度)

3. ONNX Runtime:

  • 微软的跨平台推理引擎
  • 支持 CPU、CUDA、TensorRT、DirectML、Core ML 等后端
  • 通过 Optimum 库导出 HuggingFace 模型为 ONNX
  • 适合 Windows 生态和跨平台需求

4. Core ML(Apple):

  • Apple 原生推理框架,针对 iOS/macOS 优化
  • 支持 Neural Engine 加速(A14+ 芯片)
  • 通过 coremltools 将模型转换为 .mlpackage
  • 量化支持:INT4/INT8 weight-only quantization

5. 端侧量化方案:

  • Weight-only INT4(最常用,GPTQ/AWQ 风格)
  • 混合精度量化(关键层保持高精度,其余层低精度)
  • 激活量化(INT8 动态量化,配合权重 INT4)
python
# 1. 使用 llama.cpp 量化和推理
# 安装:git clone https://github.com/ggerganov/llama.cpp && cd llama.cpp && make
# 转换模型(HuggingFace → GGUF)
# python convert_hf_to_gguf.py /path/to/model --outfile model-f16.gguf
# 量化:./llama-quantize model-f16.gguf model-Q4_K_M.gguf Q4_K_M
# 推理:./llama-cli -m model-Q4_K_M.gguf -p "Hello" -n 100

# 2. Python 绑定
from llama_cpp import Llama

llm = Llama(
    model_path="./models/llama-3-8B-Q4_K_M.gguf",
    n_ctx=2048,        # 上下文长度(越小越省内存)
    n_threads=4,       # CPU 线程数
    n_gpu_layers=0,    # 0=纯CPU,>0 部分卸载到GPU
)
output = llm("What is AI?", max_tokens=100)
print(output["choices"][0]["text"])

# 3. ONNX Runtime 推理
from optimum.onnxruntime import ORTModelForCausalLM
from transformers import AutoTokenizer

# 导出并加载 ONNX 模型
model = ORTModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3-8B",
    export=True,              # 自动导出为ONNX
    provider="CPUExecutionProvider"  # 或 CUDAExecutionProvider
)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3-8B")
inputs = tokenizer("Hello world", return_tensors="pt")
outputs = model.generate(**inputs, max_new_tokens=50)

# 4. Core ML 转换(概念示例)
import coremltools as ct
import torch

# 先将 PyTorch 模型导出为 TorchScript
traced_model = torch.jit.trace(pytorch_model, example_input)
# 转换为 Core ML
mlmodel = ct.convert(
    traced_model,
    inputs=[ct.TensorType(shape=example_input.shape)],
    compute_precision=ct.precision.FLOAT16,
    minimum_deployment_target=ct.target.iOS17,
)
mlmodel.save("model.mlpackage")

追问:

  1. Q4_K_M 和 Q4_0 的区别? Q4_0 是最简单的 4-bit 量化;Q4_K_M 使用 K-Quant 方法,对不同层使用不同精度(重要层高精度),精度更好但略慢。
  2. llama.cpp 的 Metal 加速效果如何? 在 M2 Max 上,7B Q4_K_M 约 40-60 tokens/s(CPU 约 15-20 tps),提升约 2-3x。主要瓶颈是内存带宽而非计算。
  3. 端侧模型大小限制? iPhone 15 Pro 有 8GB 内存,可运行 Q4 量化的 7B 模型(约 4-5GB)。更大模型需要考虑模型分片或流式加载。

题目 13 ⭐⭐⭐:什么是 FlashDecoding?

Q:请解释 FlashDecoding 的原理,它与 FlashAttention 的区别,以及为什么 decode 阶段需要专门的优化。

A:

问题背景: LLM 推理分为 prefill(处理输入)和 decode(逐 token 生成)两个阶段。

Decode 阶段的特殊性:

  • Decode 时 Q 只有 1 个 token(当前生成的),K/V 是完整的 KV Cache
  • 计算量:1 × d × S(d 为 head_dim,S 为序列长度)
  • 这是典型的 memory-bound 操作——需要从 HBM 加载整个 KV Cache,但计算量很小
  • FlashAttention 的 tiling 对 decode 效果有限,因为 Q 没有多行可以并行

FlashDecoding 的核心思想: 将 KV Cache 沿序列维度切分成多块,每块由一个 GPU warp 独立处理,最后通过 reduce 操作(类似 online softmax)合并结果。

与 FlashAttention 的区别:

维度FlashAttentionFlashDecoding
优化目标Prefill(长 Q 序列)Decode(单 token Q)
并行维度Q 维度(行并行)KV 维度(列并行)
Tiling 策略Q 分块 × KV 分块KV 分块,Q 不分
通信模式无(单 pass 内完成)Reduce(合并各 KV 块结果)
加速比prefill 2-4xdecode 5-40x(长序列)

关键实现:

  1. 将 KV Cache 均分为 M 个 chunk
  2. 每个 chunk 的 warp 独立计算局部 attention(使用 online softmax)
  3. 所有 warp 的结果通过 reduce(log-sum-exp)合并为最终输出
  4. M 个 warp 充分利用 GPU 并行性,即使 Q 只有 1 个 token
python
# FlashDecoding 概念实现(简化版)
import torch
import torch.nn.functional as F

def flash_decoding_concept(q, k_cache, v_cache, num_chunks=8):
    """
    q: (batch, 1, nheads, headdim) - decode阶段Q只有1个token
    k_cache: (batch, seqlen, nheads, headdim) - 完整KV Cache
    v_cache: (batch, seqlen, nheads, headdim)
    """
    batch, seqlen, nheads, headdim = k_cache.shape
    chunk_size = seqlen // num_chunks

    # 存储每个chunk的局部结果
    outputs = []
    max_scores = []
    sum_exps = []

    for i in range(num_chunks):
        start = i * chunk_size
        end = start + chunk_size

        k_chunk = k_cache[:, start:end]  # (batch, chunk_size, nheads, headdim)
        v_chunk = v_cache[:, start:end]

        # 局部attention计算
        scale = headdim ** -0.5
        # q: (batch, 1, nheads, headdim) @ k_chunk^T: (batch, headdim, nheads, chunk_size)
        scores = (q.transpose(-2, -3) @ k_chunk.transpose(-2, -3).transpose(-2, -1)) * scale
        # scores: (batch, nheads, 1, chunk_size)

        local_max = scores.max(dim=-1, keepdim=True).values
        local_exp = torch.exp(scores - local_max)
        local_sum = local_exp.sum(dim=-1, keepdim=True)
        local_output = (local_exp @ v_chunk.transpose(-2, -3)).transpose(-2, -3)

        max_scores.append(local_max)
        sum_exps.append(local_sum)
        outputs.append(local_output)

    # Reduce: 合并所有chunk的结果(类似online softmax的merge)
    global_max = torch.cat(max_scores, dim=-1).max(dim=-1, keepdim=True).values
    # 重新缩放每个chunk的exp sum
    corrected_exps = []
    corrected_outputs = []
    for i in range(num_chunks):
        correction = torch.exp(max_scores[i] - global_max)
        corrected_exps.append(sum_exps[i] * correction)
        corrected_outputs.append(outputs[i] * correction)

    total_exp = sum(corrected_exps)
    final_output = sum(corrected_outputs) / total_exp

    return final_output

# 实际使用:FlashDecoding 已集成在 flash-attn v2+ 和 vLLM 中
# 在 vLLM 中自动启用,无需额外配置
from vllm import LLM, SamplingParams

llm = LLM(model="meta-llama/Llama-3-8B")
# 长序列decode场景下自动使用FlashDecoding加速
outputs = llm.generate(["Write a long essay about AI..."] * 8,
                       SamplingParams(max_tokens=2048))

追问:

  1. FlashDecoding 的加速比与什么有关? 与 KV Cache 长度成正比。序列越长,标准 decode 越 memory-bound,FlashDecoding 通过并行化 KV 维度的收益越大。128K 序列可加速 20-40x。
  2. FlashDecoding 和 Continuous Batching 的关系? 两者互补。Continuous Batching 解决不同请求间的效率问题,FlashDecoding 解决单请求内长序列的效率问题。
  3. 为什么不直接用 FlashAttention 做 decode? FlashAttention 的 tiling 策略假设 Q 有多行可以并行,decode 时 Q 只有 1 行,无法充分利用 GPU 并行性。

题目 14 ⭐⭐:多模态模型部署有什么特殊考量?

Q:在部署多模态 LLM(如 LLaVA、Qwen-VL、GPT-4V)时,相比纯文本模型有哪些特殊的考量?

A:

多模态模型的部署比纯文本 LLM 复杂得多,主要体现在以下几个方面:

1. 视觉编码器的额外开销:

  • 多模态模型通常包含一个视觉编码器(如 ViT)+ 一个 LLM
  • ViT 推理有独立的计算开销,不参与 token-by-token decode
  • 一张图片经过 ViT 编码后通常产生数百个 visual tokens(如 CLIP ViT-L 产生 576 个 token)
  • 这些 visual tokens 会占用大量 KV Cache 空间

2. 图像预处理开销:

  • 图像需要 resize、crop、normalize 等预处理
  • 不同模型的预处理流程不同(分辨率、channel 顺序等)
  • 预处理通常在 CPU 上进行,可能成为瓶颈

3. 多模态 KV Cache 管理:

  • visual tokens 的 KV Cache 与 text tokens 混合存储
  • Prefix Caching 需要同时处理图像和文本的缓存
  • 图像 token 数量不固定(取决于分辨率和 patch size)

4. 动态分辨率和 Token 数量:

  • 支持动态分辨率的模型,输入图像大小不同导致 token 数量不同
  • 影响 batch 内的 padding 和 GPU 利用率
  • 需要更灵活的调度策略

5. 显存规划:

  • ViT 模型本身占用显存(~1-3GB)
  • 高分辨率图像产生大量 visual tokens
  • 需要预留足够显存给视觉编码器和 visual KV Cache
python
# 多模态模型部署示例(vLLM)
from vllm import LLM, SamplingParams
from PIL import Image

# 部署多模态模型
llm = LLM(
    model="llava-hf/llava-v1.6-mistral-7b-hf",
    max_model_len=4096,       # 需要考虑visual tokens占用
    gpu_memory_utilization=0.85,
    limit_mm_per_prompt={"image": 5},  # 每个请求最多5张图片
)

# 带图片的请求
image = Image.open("photo.jpg")
prompt = {
    "prompt": "<image>\nDescribe this image in detail.",
    "multi_modal_data": {"image": image}
}
outputs = llm.generate([prompt], SamplingParams(max_tokens=256))

# 多图片请求
images = [Image.open(f"img_{i}.jpg") for i in range(3)]
multi_image_prompt = {
    "prompt": "<image><image><image>\nCompare these three images.",
    "multi_modal_data": {"image": images}
}

# 图像预处理优化(独立服务)
import torchvision.transforms as T
from concurrent.futures import ThreadPoolExecutor

preprocess = T.Compose([
    T.Resize(336, interpolation=T.InterpolationMode.BICUBIC),
    T.CenterCrop(336),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def preprocess_batch(images, num_workers=4):
    """并行预处理多张图片"""
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        processed = list(executor.map(preprocess, images))
    return torch.stack(processed)

# 显存规划示例
# LLaVA 1.6 7B:
#   - LLM (7B, FP16): ~14GB
#   - ViT (CLIP ViT-L): ~1.2GB
#   - KV Cache (4096 tokens, FP16): ~1GB
#   - Visual tokens KV (576 per image × 5 images = 2880): ~0.7GB
#   - 总计: ~17GB

追问:

  1. Visual tokens 压缩方法? 常见方案包括:Perceiver Resampler(将 N 个 visual tokens 压缩到 M 个)、Average Pooling、Cross-Attention 等。LLaVA 1.5+ 使用简单的 MLP projection,不压缩。
  2. 多模态模型的 Prefix Caching 如何处理? 图像编码结果需要与 text prefix 一起缓存。SGLang 的 RadixAttention 可以自动处理,vLLM 需要通过 mm_cache 配置。
  3. 视频输入如何处理? 通常采样关键帧(如每秒 1 帧),每帧独立编码后拼接。token 数量随视频长度线性增长,需要 sliding window 或压缩策略控制。

九、进阶推理框架与调度

Q21: SGLang 框架的核心特性有哪些? ⭐⭐⭐

答:

SGLang 是 UC Berkeley 开源的高效 LLM 推理引擎,核心特性包括:

  1. RadixAttention:基于 Radix Tree(基数树)管理 KV Cache,自动实现前缀共享。相同前缀的请求共享已计算的 KV Cache,无需手动配置 Prefix Caching,对多轮对话、few-shot 等场景自动优化。

  2. Constrained Decoding:原生支持正则表达式和 JSON Schema 约束解码,通过有限状态机(FSM)在每一步 token 采样时只允许符合约束的 token 被选中,保证输出格式 100% 合规,且几乎没有额外开销。

  3. Frontend Language:提供 Python DSL 用于编排复杂 LLM 调用流程(多轮生成、分支、循环),支持自动并行化,比手写 API 调用更高效。

  4. 与 vLLM 对比:SGLang 在多轮对话和共享前缀场景下吞吐量提升显著(最高 5x),而 vLLM 更适合单轮大批量推理。SGLang 的 RadixAttention 是自动的,vLLM 的 Prefix Caching 需要显式开启。

python
# SGLang 使用示例
import sglang as sgl

@sgl.function
def multi_turn_chat(s, question1, question2):
    s += sgl.system("You are a helpful assistant.")
    s += sgl.user(question1)
    s += sgl.assistant(sgl.gen("answer1", max_tokens=256))
    s += sgl.user(question2)
    s += sgl.assistant(sgl.gen("answer2", max_tokens=256))

# JSON 约束解码
@sgl.function
def json_extract(s, text):
    s += sgl.user(f"Extract info as JSON from: {text}")
    s += sgl.assistant(sgl.gen("result", regex=r'\{"name": "[^"]+", "age": \d+\}'))

# 启动服务
# python -m sglang.launch_server --model-path meta-llama/Llama-2-7b-chat-hf --port 30000

追问:

  1. RadixAttention 的淘汰策略? 使用 LRU 策略管理 Radix Tree 节点,当内存不足时淘汰最久未使用的前缀节点。
  2. Constrained Decoding 会影响生成质量吗? 不会改变模型概率分布,只是在采样时 mask 掉不合法 token。但如果约束过紧,可能导致采样到低概率 token,影响语义质量。
  3. SGLang 的 Frontend Language 如何实现自动并行? 通过分析 DSL 中各生成调用的依赖关系,无依赖的调用自动并发执行。

Q22: 什么是 Continuous Batching 的详细实现? ⭐⭐⭐

答:

Continuous Batching(又称 Iteration-level Batching)是 LLM 推理的核心调度策略。与 Static Batching 不同,它在每个 decode step 级别动态调度请求。

Static Batching 的问题:批次中最长序列决定整个批次的计算时间,短序列需等待长序列完成,GPU 利用率低。且批次固定,新请求必须等当前批次全部完成。

Continuous Batching 的实现细节

  1. 调度循环:每个 decode step 结束后,检查是否有请求完成(生成 EOS),完成的请求立即从批次中移除,腾出位置给等待队列中的新请求。
  2. Prefill 抢占:新请求的 Prefill 可以插入当前批次(与 Decode 并行),但需要权衡 Prefill 的计算量对 Decode 延迟的影响。
  3. Preemption 机制:当显存不足时,可以选择抢占低优先级请求(swap 到 CPU 或重算)。
python
# Continuous Batching 核心逻辑伪代码
class ContinuousBatchScheduler:
    def __init__(self, max_batch_size):
        self.max_batch = max_batch_size
        self.running = []      # 正在推理的请求
        self.waiting = []      # 等待队列
    
    def step(self):
        # 1. 检查完成的请求,移出批次
        self.running = [r for r in self.running if not r.is_finished()]
        
        # 2. 从等待队列填充空位
        while len(self.running) < self.max_batch and self.waiting:
            new_req = self.waiting.pop(0)
            if self.can_allocate_kv_cache(new_req):
                new_req.run_prefill()  # 先做 prefill
                self.running.append(new_req)
        
        # 3. 对所有 running 请求做一步 decode
        if self.running:
            tokens = [r.next_token() for r in self.running]
            self.model.decode_step(tokens)

    def submit(self, request):
        self.waiting.append(request)

# vLLM 的实际实现更复杂,包含:
# - Chunked Prefill(将长 prefill 分块)
# - Priority-based scheduling
# - Swap/Prefill/Decode 三个阶段的精细调度

追问:

  1. Continuous Batching 对延迟的影响? 降低了平均延迟(请求无需等待批次中最慢的请求),但单个请求的 decode 延迟可能因 batch 内请求增多而略微增加。
  2. 如何处理变长序列的 padding? 使用 PagedAttention 将 KV Cache 分页管理,避免连续内存分配,无需 padding 到相同长度。
  3. Prefill 和 Decode 混合调度的公平性问题? 需要限制 Prefill 请求的比例,避免新请求的 Prefill 抢占已有请求的 Decode 时间。

Q23: 如何实现模型的 A/B 测试? ⭐⭐

答:

模型 A/B 测试是评估新模型版本效果的关键手段,需要在流量分配、指标收集和统计分析三个层面设计。

1. 流量分配: 通过 API Gateway 或负载均衡器按比例分流。可以基于用户 ID hash 保证同一用户始终访问同一版本,或按请求随机分配。

2. 指标收集: 记录每个请求的模型版本、输入、输出、延迟、token 数量等,以及业务指标(用户满意度、任务完成率等)。

3. 统计显著性: 使用假设检验(如 t-test、Mann-Whitney U)判断两组指标差异是否显著,通常要求 p-value < 0.05。

python
# A/B 测试流量分配(基于用户 ID hash)
import hashlib

def assign_group(user_id: str, groups: dict[str, float] = None) -> str:
    """根据用户 ID hash 分配实验组"""
    if groups is None:
        groups = {"control": 0.5, "treatment": 0.5}
    
    hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 1000
    cumulative = 0
    for group, ratio in groups.items():
        cumulative += ratio * 1000
        if hash_val < cumulative:
            return group
    return list(groups.keys())[-1]

# FastAPI 中间件实现
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class ABTestMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, ab_config):
        super().__init__(app)
        self.config = ab_config  # {"model_v1": 0.7, "model_v2": 0.3}
    
    async def dispatch(self, request: Request, call_next):
        user_id = request.headers.get("X-User-ID", "anonymous")
        group = assign_group(user_id, self.config)
        request.state.model_version = group
        response = await call_next(request)
        response.headers["X-Model-Version"] = group
        return response

# 统计显著性检验
from scipy import stats
import numpy as np

def check_significance(control_metrics, treatment_metrics, alpha=0.05):
    """检验两组指标是否有显著差异"""
    t_stat, p_value = stats.ttest_ind(control_metrics, treatment_metrics)
    return {
        "p_value": p_value,
        "significant": p_value < alpha,
        "control_mean": np.mean(control_metrics),
        "treatment_mean": np.mean(treatment_metrics),
        "lift": (np.mean(treatment_metrics) - np.mean(control_metrics)) / np.mean(control_metrics)
    }

追问:

  1. 如何保证实验的公平性? 需要确保两组用户的分布一致(年龄、地域等),可以用 A/A 测试验证分流是否均匀。
  2. 多组实验如何设计? 使用 Multi-Armed Bandit 或正交实验设计,避免实验之间的交互影响。
  3. 样本量如何确定? 使用 Power Analysis,根据预期 effect size、显著性水平和统计功效计算所需样本量。

Q24: 什么是 Speculative Decoding 的详细原理? ⭐⭐⭐

答:

Speculative Decoding(推测解码)通过小模型快速生成候选 token,大模型一次性验证,从而加速推理。

核心流程

  1. Draft 阶段:小模型(draft model)自回归生成 K 个候选 token,速度很快(参数量小或非自回归)。
  2. Verify 阶段:大模型(target model)一次性对 K 个候选 token 进行前向传播,通过概率对比决定接受或拒绝。
  3. Accept/Reject:从左到右逐个比较,如果 draft token 的概率在 target 模型的采样范围内,则接受;否则从 target 模型的分布中重新采样。

数学保证:通过 rejection sampling,最终输出的分布与直接用 target 模型生成的分布完全一致,不损失任何质量。

python
# Speculative Decoding 核心算法
import torch
from torch.nn import functional as F

def speculative_decode(target_model, draft_model, prefix_ids, K=5):
    """
    Speculative Decoding 实现
    target_model: 大模型
    draft_model: 小模型(草稿模型)
    K: 每次推测的 token 数
    """
    generated = list(prefix_ids)
    
    while True:
        # Step 1: Draft model 生成 K 个候选
        draft_tokens = []
        draft_probs = []
        draft_input = torch.tensor([generated]).to(target_model.device)
        
        for _ in range(K):
            with torch.no_grad():
                logits = draft_model(draft_input).logits[:, -1, :]
                probs = F.softmax(logits, dim=-1)
                token = torch.multinomial(probs, 1)
            draft_tokens.append(token.item())
            draft_probs.append(probs[0, token.item()].item())
            draft_input = torch.cat([draft_input, token.unsqueeze(0)], dim=1)
        
        # Step 2: Target model 一次性验证 K 个 token
        verify_input = torch.tensor([generated + draft_tokens]).to(target_model.device)
        with torch.no_grad():
            target_logits = target_model(verify_input).logits
        
        # Step 3: Accept/Reject 逐个检查
        n_accepted = 0
        for i in range(K):
            pos = len(generated) + i - 1
            target_probs = F.softmax(target_logits[0, pos], dim=-1)
            target_prob = target_probs[draft_tokens[i]].item()
            draft_prob = draft_probs[i]
            
            if torch.rand(1).item() < min(1, target_prob / draft_prob):
                n_accepted += 1
                generated.append(draft_tokens[i])
            else:
                # 从 target 分布重新采样
                adjusted_probs = torch.clamp(target_probs - probs_draft, min=0)
                adjusted_probs /= adjusted_probs.sum()
                new_token = torch.multinomial(adjusted_probs, 1).item()
                generated.append(new_token)
                break
        
        if generated[-1] == target_model.config.eos_token_id:
            break
    
    return generated

# 使用 HuggingFace Transformers 的原生支持
# from transformers import AutoModelForCausalLM
# target = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-70b-hf")
# draft = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf")
# assistant_model=draft 参数启用推测解码
# output = target.generate(input_ids, assistant_model=draft, num_assistant_tokens=5)

追问:

  1. 加速比如何计算? 理论加速比 ≈ αK / (1 + ατ),其中 α 是平均接受率,K 是推测长度,τ 是 draft/target 的耗时比。α 越高、K 越大、τ 越小,加速越明显。
  2. Draft Model 的选择? 可以是同系列小模型(Llama-2-7B 作为 Llama-2-70B 的 draft)、Medusa(多头并行预测)、Eagle(基于特征的自回归 draft)。
  3. 何时效果不佳? 当 draft model 与 target model 分布差异大时(接受率低),加速效果有限,甚至可能更慢。

Q25: 如何优化 LLM 的首 Token 延迟(TTFT)? ⭐⭐

答:

TTFT(Time To First Token)是用户体验的关键指标,主要由 Prefill 阶段决定。优化方向包括:

1. Prefill 计算优化

  • 使用 Flash Attention 减少 Prefill 的内存访问开销
  • 对输入做 tokenization 预处理优化
  • 利用 Tensor Parallelism 分散计算

2. Prefix Caching

  • 缓存系统 prompt 的 KV Cache,避免重复计算
  • 对多轮对话,复用历史对话的 KV Cache

3. Chunked Prefill

  • 将长 prompt 分块与 Decode 交错执行,避免长 Prefill 阻塞其他请求

4. 投机 Prefill(Prompt Lookup Decoding)

  • 对有固定模式的输入(如模板),预计算并缓存
python
# TTFT 优化:Prefix Caching 示例(vLLM)
from vllm import LLM, SamplingParams

llm = LLM(
    model="meta-llama/Llama-2-7b-chat-hf",
    enable_prefix_caching=True,   # 开启自动前缀缓存
    gpu_memory_utilization=0.9,
)

# 系统 prompt 会在首次请求后被缓存,后续请求复用
system_prompt = "You are a helpful assistant specialized in medical Q&A. ..."

# 多个请求共享同一 system prompt 的 KV Cache
prompts = [
    system_prompt + "\nUser: What is diabetes?\nAssistant:",
    system_prompt + "\nUser: What is hypertension?\nAssistant:",
    system_prompt + "\nUser: What is asthma?\nAssistant:",
]

import time
start = time.time()
outputs = llm.generate(prompts, SamplingParams(max_tokens=256))
# 第一个请求需要完整 Prefill,后续请求跳过 system prompt 的计算
for output in outputs:
    print(f"TTFT: {time.time() - start:.3f}s | Output: {output.outputs[0].text[:50]}...")

# Chunked Prefill 配置(vLLM)
# llm = LLM(
#     model="meta-llama/Llama-2-7b-chat-hf",
#     enable_chunked_prefill=True,
#     max_num_batched_tokens=512,  # 每次 Prefill 最多处理 512 tokens
# )

追问:

  1. TTFT 与吞吐量如何权衡? 开启 Chunked Prefill 可以降低单个请求的排队等待时间,但 Prefill 分块会增加总计算量。需要根据 SLA 要求调整 max_num_batched_tokens
  2. Prefix Caching 的命中率如何监控? vLLM 提供 prefix_cache_hit_rate 指标,可以通过 Prometheus 采集。命中率低说明 prompt 多样性高,优化空间有限。
  3. 首 Token 延迟过高但不是 Prefill 的原因? 可能是请求排队时间长(服务过载),或网络延迟,需要区分 TTFT = 排队时间 + Prefill 时间 + 网络时间。

Q26: 如何实现模型的负载均衡? ⭐⭐

答:

大模型服务的负载均衡需要考虑 GPU 实例的特殊性:请求处理时间差异大、显存是稀缺资源、预热成本高。

1. 健康检查: 不仅检查进程存活,还要检查 GPU 状态(显存使用、温度)和推理能力(发送测试请求)。

2. 权重分配策略

  • Round Robin:简单但不考虑实例负载差异
  • Least Connections:选择当前请求数最少的实例
  • TTFT-based:选择响应最快的实例(考虑了实际负载)

3. 故障转移: 实例健康检查失败后,将流量切换到其他实例,需要考虑正在进行中的请求的优雅处理。

python
# 自定义负载均衡器(基于响应时间的加权选择)
import time
import asyncio
import aiohttp
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class BackendInstance:
    url: str
    weight: float = 1.0
    active_requests: int = 0
    avg_latency_ms: float = 0.0
    is_healthy: bool = True
    last_health_check: float = 0
    _latency_samples: list = field(default_factory=list)

    def update_latency(self, latency_ms: float):
        self._latency_samples.append(latency_ms)
        if len(self._latency_samples) > 100:
            self._latency_samples.pop(0)
        self.avg_latency_ms = sum(self._latency_samples) / len(self._latency_samples)

class LLMRouter:
    def __init__(self, backends: list[BackendInstance]):
        self.backends = backends
    
    async def select_backend(self) -> BackendInstance:
        """基于综合评分选择最优后端"""
        healthy = [b for b in self.backends if b.is_healthy]
        if not healthy:
            raise RuntimeError("No healthy backends available")
        
        # 评分 = 权重 / (活跃请求数 × 平均延迟)
        def score(b: BackendInstance) -> float:
            load_factor = max(b.active_requests, 1) * max(b.avg_latency_ms, 1)
            return b.weight / load_factor
        
        return max(healthy, key=score)
    
    async def health_check_loop(self, interval: float = 10.0):
        """定期健康检查"""
        async with aiohttp.ClientSession() as session:
            while True:
                for backend in self.backends:
                    try:
                        async with session.get(
                            f"{backend.url}/health", timeout=aiohttp.ClientTimeout(total=5)
                        ) as resp:
                            backend.is_healthy = resp.status == 200
                    except Exception:
                        backend.is_healthy = False
                await asyncio.sleep(interval)

# Kubernetes 配置示例
# apiVersion: v1
# kind: Service
# metadata:
#   name: llm-service
# spec:
#   selector:
#     app: llm-server
#   ports:
#   - port: 80
#     targetPort: 8000
# ---
# apiVersion: autoscaling/v2
# kind: HorizontalPodAutoscaler
# metadata:
#   name: llm-hpa
# spec:
#   scaleTargetRef:
#     apiVersion: apps/v1
#     kind: Deployment
#     name: llm-server
#   minReplicas: 2
#   maxReplicas: 10
#   metrics:
#   - type: Pods
#     pods:
#       metric:
#         name: gpu_memory_usage
#       target:
#         type: AverageValue
#         averageValue: "80"

追问:

  1. GPU 实例的预热时间很长,如何处理扩缩容? 使用 Predictive Scaling(基于历史流量模式预测),保持最小实例数,缩容时设置长冷却期。
  2. 多模型场景下如何路由? 根据请求中的 model 字段路由到对应的模型服务,使用 Service Mesh(如 Istio)管理路由规则。
  3. 如何处理会话粘性? 对多轮对话,通过用户 ID hash 路由到同一实例,确保 Prefix Caching 命中率。

Q27: 什么是 Chunked Prefill?为什么能优化长序列推理? ⭐⭐⭐

答:

Chunked Prefill 是将长 prompt 的 Prefill 计算分成多个小块(chunk),与 Decode 请求交错执行的调度策略,由 Sarathi-Serve 等系统提出。

问题背景: 传统调度中,一个长 prompt 的 Prefill 会独占 GPU 一段时间(可能数百毫秒到数秒),导致正在 Decode 的请求被阻塞,产生延迟尖峰。

Chunked Prefill 原理

  1. 分块:将长 prompt 分成固定大小的 chunk(如 512 tokens)
  2. 交错调度:每个调度 step 中,只执行一个 chunk 的 Prefill + 所有 running 请求的 Decode
  3. 填充气泡:当 batch 较小时,Prefill chunk 可以"填充"Decode 的计算空隙,提高 GPU 利用率

优势

  • 消除长 Prefill 导致的延迟尖峰(Decode 请求的 TTBT 更稳定)
  • 更好的 GPU 利用率(Prefill 和 Decode 交错,减少气泡)
  • 与 Continuous Batching 配合,实现更细粒度的调度
python
# Chunked Prefill 调度逻辑伪代码
class ChunkedPrefillScheduler:
    def __init__(self, chunk_size=512, max_batch_size=32):
        self.chunk_size = chunk_size
        self.max_batch = max_batch_size
        self.running = []    # Decode 阶段的请求
        self.prefilling = [] # Prefill 阶段的请求(带 chunk 进度)
        self.waiting = []    # 等待队列
    
    def schedule_step(self):
        batch = []
        
        # 1. 加入所有 Decode 请求
        for req in self.running:
            batch.append(("decode", req))
        
        # 2. 加入一个 Prefill chunk(如果还有空间)
        if len(batch) < self.max_batch:
            if self.prefilling:
                req = self.prefilling[0]
                chunk = req.get_next_chunk(self.chunk_size)
                batch.append(("prefill", req, chunk))
                if req.is_prefill_done():
                    self.prefilling.pop(0)
                    self.running.append(req)
            elif self.waiting:
                req = self.waiting.pop(0)
                chunk = req.get_next_chunk(self.chunk_size)
                batch.append(("prefill", req, chunk))
                if not req.is_prefill_done():
                    self.prefilling.append(req)
                else:
                    self.running.append(req)
        
        # 3. 执行混合批次
        self.execute_batch(batch)

# vLLM 配置 Chunked Prefill
# from vllm import LLM
# llm = LLM(
#     model="meta-llama/Llama-2-7b-chat-hf",
#     enable_chunked_prefill=True,
#     max_num_batched_tokens=2048,   # 每 step 总 token 上限
#     max_num_seqs=32,               # 最大并发序列数
# )

# Sarathi-Serve 的核心思想是 "merge" 策略:
# 将 Prefill chunk 和 Decode step 合并到同一个 GPU kernel 中执行
# 通过调整 chunk 大小,可以精确控制延迟-吞吐量的权衡

追问:

  1. Chunk 大小如何选择? 太小会增加调度开销和 kernel launch 次数,太大会导致 Decode 延迟抖动。通常 256-1024 tokens,需要根据模型和硬件调优。
  2. Chunked Prefill 对 Prefill 本身有加速吗? 没有直接加速,甚至因为分块略有开销。但它通过减少 Decode 延迟的抖动,改善了整体服务质量。
  3. 与 Prefix Caching 的关系? Prefix Caching 命中的部分不需要 Prefill,可以直接跳过。两者是互补的优化。

Q28: 如何计算和优化 LLM 推理的吞吐量? ⭐⭐

答:

LLM 推理吞吐量通常以 tokens/s 衡量,包括单请求吞吐量和系统总吞吐量。

吞吐量计算

单请求吞吐量 = 生成 token 数 / 端到端时间
系统吞吐量 = 所有请求的总 token 数 / 时间窗口
Prefill 吞吐量 = 输入 token 数 / Prefill 时间(compute-bound)
Decode 吞吐量 = 输出 token 数 / Decode 时间(memory-bound)

优化方向

  1. 批处理大小:增大 batch size 提高 GPU 利用率,但受显存限制
  2. 模型并行:Tensor Parallelism + Pipeline Parallelism 分散计算
  3. 量化:INT8/INT4 减少显存占用,允许更大 batch
  4. KV Cache 优化:PagedAttention 减少碎片,Prefix Caching 减少重复计算
  5. Speculative Decoding:提高 Decode 阶段的有效吞吐
python
# 吞吐量基准测试脚本
import time
from dataclasses import dataclass
from vllm import LLM, SamplingParams

@dataclass
class ThroughputMetrics:
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_time_s: float = 0
    
    @property
    def input_throughput(self) -> float:
        return self.total_input_tokens / self.total_time_s if self.total_time_s > 0 else 0
    
    @property
    def output_throughput(self) -> float:
        return self.total_output_tokens / self.total_time_s if self.total_time_s > 0 else 0

def benchmark_throughput(model_name: str, prompts: list[str], max_tokens: int = 256):
    """测量模型吞吐量"""
    llm = LLM(model=model_name, gpu_memory_utilization=0.9, max_model_len=4096)
    sampling_params = SamplingParams(max_tokens=max_tokens, temperature=0.0)
    
    # 预热
    llm.generate(["warmup"], sampling_params)
    
    # 正式测试
    start = time.perf_counter()
    outputs = llm.generate(prompts, sampling_params)
    elapsed = time.perf_counter() - start
    
    metrics = ThroughputMetrics(total_time_s=elapsed)
    for output in outputs:
        metrics.total_input_tokens += len(output.prompt_token_ids)
        metrics.total_output_tokens += len(output.outputs[0].token_ids)
    
    print(f"Batch Size: {len(prompts)}")
    print(f"Input Throughput: {metrics.input_throughput:.1f} tokens/s")
    print(f"Output Throughput: {metrics.output_throughput:.1f} tokens/s")
    print(f"Total Time: {elapsed:.2f}s")
    return metrics

# 不同 batch size 的吞吐量对比
# batch_sizes = [1, 4, 8, 16, 32, 64]
# for bs in batch_sizes:
#     prompts = [f"Explain quantum computing in detail. Request {i}" for i in range(bs)]
#     metrics = benchmark_throughput("meta-llama/Llama-2-7b-chat-hf", prompts)

# 瓶颈分析公式:
# Decode 受限于显存带宽:peak_throughput = memory_bandwidth / model_size_per_token
# 例如 A100 (80GB) 显存带宽 2TB/s,Llama-2-7B 每 token 约 14GB
# 理论最大 Decode 吞吐 ≈ 2000 / 14 ≈ 142 tokens/s (batch=1)
# 实际需要乘以 GPU 利用率(通常 50-70%)

追问:

  1. 如何确定系统是 Compute-bound 还是 Memory-bound? Prefill 阶段通常是 Compute-bound(大矩阵乘法),Decode 阶段通常是 Memory-bound(逐 token 读取 KV Cache)。可以通过 GPU 利用率和显存带宽利用率判断。
  2. Batch size 越大吞吐量越高吗? 在显存允许范围内,增大 batch 通常提高吞吐量。但超过某个点后,KV Cache 管理开销增加,且可能导致 OOM。
  3. 如何在延迟和吞吐量之间平衡? 设置合理的 max_num_seqs(最大并发请求数)和 max_num_batched_tokens(每 step 最大 token 数),在 SLA 约束下最大化吞吐。

十、Context Caching 与 Prompt 缓存

Q: 什么是 Context Caching?解决什么问题?

Context Caching 是指将 LLM 请求中重复出现的长前缀(如 System Prompt、Few-shot 示例、知识库上下文)缓存起来,避免每次请求都重新处理。

解决的核心问题

RAG 应用的典型请求:
├── System Prompt: 800 token(每次都一样)
├── 知识库上下文: 3000 token(同一文档可能多次查询)
├── Few-shot 示例: 500 token(每次都一样)
└── 用户问题: 50 token(每次不同)

每次请求都要 Prefill 4300 token → 浪费算力和时间!

Context Caching 后:
├── 缓存命中部分: 4300 token → 直接复用 KV Cache(跳过 Prefill)
└── 新增部分: 50 token → 只处理这部分

Prefill 从 4300 token 降到 50 token → 延迟降低 90%+!

各厂商实现

厂商名称特点
AnthropicPrompt Caching自动缓存,5 分钟 TTL,写入费 1.25x,读取费 0.1x
DeepSeekContext Caching自动缓存,无额外费用,命中时输入费减半
GoogleContext Caching手动创建缓存对象,按时间收费
OpenAI无(2025 年初)通过 GPT Cache 等第三方实现

Anthropic Prompt Caching 示例

python
import anthropic

client = anthropic.Anthropic()

response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "你是一个专业的法律顾问..." * 100,  # 2000 token 的 System Prompt
            "cache_control": {"type": "ephemeral"}  # 标记为可缓存
        }
    ],
    messages=[{"role": "user", "content": "什么是知识产权?"}]
)

# 首次请求:cache_creation_input_tokens: 2000(写入缓存)
# 后续请求:cache_read_input_tokens: 2000(读取缓存,费用降为 1/10)

DeepSeek Context Caching

python
# DeepSeek 自动缓存,无需代码改动
# 首次请求:input_tokens: 4300, cache_miss: true
# 后续请求:input_tokens: 50, cache_hit: true
# 费用:缓存命中部分按半价计算

# 查看缓存状态
print(response.usage)
# prompt_tokens: 4300
# prompt_cache_hit_tokens: 4250  ← 命中 4250 token
# prompt_cache_miss_tokens: 50   ← 只有 50 token 未命中

Q: Context Caching 的底层实现原理是什么?

Context Caching 的本质是复用 KV Cache

传统流程:
Input tokens → [Prefill 计算] → KV Cache → [Decode 生成]
                 ↑ 每次都算

缓存流程:
首次请求:
Input tokens → [Prefill 计算] → KV Cache → 存入缓存池

后续请求:
新 token → [只对新 token 做 Prefill] → 拼接缓存的 KV → [Decode 生成]
           ↑ 只算增量

技术细节

  1. 前缀匹配:缓存基于 token 序列的前缀匹配

    请求 1: [System(800) + Doc(3000) + Q1(50)]
    请求 2: [System(800) + Doc(3000) + Q2(50)]
                         ↑ 前 3800 token 匹配,复用 KV Cache
  2. 哈希索引:对 token 序列计算哈希,快速查找缓存

    python
    def compute_cache_key(tokens: list[int]) -> str:
        # 对前缀 token 序列计算哈希
        return hashlib.sha256(bytes(tokens)).hexdigest()[:16]
  3. 显存管理:缓存的 KV Cache 存储在 GPU 显存中

    GPU 显存分配:
    ├── 模型权重: 14GB (7B 模型 FP16)
    ├── 活跃 KV Cache: 2GB
    ├── 缓存 KV Cache: 8GB  ← 用于 Context Caching
    └── 空闲: 8GB
  4. 淘汰策略:LRU 或 TTL-based

    缓存池:
    ├── [System Prompt KV]     TTL: 5min  使用次数: 1000
    ├── [RAG Doc A KV]         TTL: 3min  使用次数: 50
    ├── [Few-shot KV]          TTL: 5min  使用次数: 800
    └── [RAG Doc B KV]         TTL: 0     ← 过期,淘汰

Q: 如何最大化 Context Caching 的命中率?

策略 1: 固定前缀结构

python
# ❌ 差:随机顺序导致缓存失效
messages = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": user_query},
    {"role": "system", "content": context_docs},  # 位置不固定
]

# ✅ 好:固定顺序,前缀稳定
messages = [
    {"role": "system", "content": system_prompt + "\n\n" + context_docs},  # 固定前缀
    {"role": "user", "content": user_query},  # 只有这部分变化
]

策略 2: 分层缓存

python
# 将内容按变化频率分层
cache_layers = {
    "system_prompt": {"ttl": 300, "content": "..."},      # 5 分钟,几乎不变
    "few_shot": {"ttl": 300, "content": "..."},           # 5 分钟,几乎不变
    "rag_docs": {"ttl": 60, "content": "..."},            # 1 分钟,按文档变化
    "user_query": {"ttl": 0, "content": "..."},           # 不缓存,每次都变
}

# 构建请求时,按缓存层顺序排列
messages = build_messages(
    system=cache_layers["system_prompt"]["content"],
    context=cache_layers["few_shot"]["content"] + cache_layers["rag_docs"]["content"],
    user=user_query
)

策略 3: 批量请求复用

python
# 同一文档的多个问题,放在一起处理
async def batch_query(doc: str, questions: list[str]):
    # 构建公共前缀
    system = f"请基于以下文档回答问题:\n\n{doc}"

    results = []
    for q in questions:
        # 第一次请求会缓存 system 部分
        # 后续请求直接命中缓存
        result = await llm.generate(system=system, user=q)
        results.append(result)

    return results

命中率目标

  • RAG 应用:80-95%(System Prompt + 知识库文档可缓存)
  • 多轮对话:60-80%(对话历史不断增长,只有前缀可缓存)
  • 单次查询:0%(无重复前缀)

Q: Context Caching 的成本模型如何计算?

Anthropic 定价示例(Claude Sonnet):

标准价格:
├── 输入: $3 / 1M tokens
└── 输出: $15 / 1M tokens

Prompt Caching 价格:
├── 缓存写入: $3.75 / 1M tokens (1.25x)
├── 缓存读取: $0.30 / 1M tokens (0.1x)  ← 省 90%!
└── 输出: $15 / 1M tokens (不变)

成本计算

python
def calculate_cost(
    input_tokens: int,
    output_tokens: int,
    cache_hit_tokens: int,
    cache_miss_tokens: int
) -> dict:
    """计算带缓存的 API 费用"""

    # 标准价格
    standard_cost = (input_tokens * 3 + output_tokens * 15) / 1_000_000

    # 缓存价格
    cache_cost = (
        cache_miss_tokens * 3.75 +   # 写入缓存(首次)
        cache_hit_tokens * 0.30 +     # 读取缓存(命中)
        output_tokens * 15            # 输出
    ) / 1_000_000

    return {
        "standard_cost": f"${standard_cost:.4f}",
        "cache_cost": f"${cache_cost:.4f}",
        "savings": f"{(1 - cache_cost/standard_cost) * 100:.1f}%"
    }

# 示例:RAG 应用,4000 token 输入,500 token 输出
# 首次请求(缓存写入):
#   standard: $0.0195, cache: $0.0225(更贵!因为写入费 1.25x)
#
# 第 2 次请求(缓存命中 3800 token):
#   standard: $0.0195, cache: $0.0091(省 53%!)
#
# 第 10 次请求:
#   累计节省: ~70%

盈亏平衡点

当 cache_hit_tokens > 0 时,缓存总是更便宜(读取费 < 标准费)

但首次请求(只有写入):
写入费 1.25x > 标准费 1x

所以:同一前缀至少被使用 2 次,缓存才划算

盈亏平衡公式:
n * 标准费 > 写入费 + (n-1) * 读取费
n > 1.25x / (1x - 0.1x) = 1.39 次

结论:同一前缀被使用 ≥2 次,缓存就有正收益

Q: 自建 Context Caching 系统如何设计?

python
import hashlib
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class CacheEntry:
    key: str                    # 缓存键(token 前缀的哈希)
    kv_cache: bytes             # 序列化的 KV Cache
    token_count: int            # 缓存的 token 数
    created_at: float           # 创建时间
    last_used: float            # 最后使用时间
    use_count: int              # 使用次数
    ttl: float                  # 过期时间

class ContextCacheManager:
    """Context Caching 管理器"""

    def __init__(self, max_cache_size_gb: float = 8.0):
        self.cache: dict[str, CacheEntry] = {}
        self.max_cache_size = max_cache_size_gb * 1024 * 1024 * 1024  # bytes
        self.current_size = 0

    def compute_key(self, tokens: list[int]) -> str:
        """计算缓存键(前缀哈希)"""
        return hashlib.sha256(bytes(tokens[:1000])).hexdigest()[:16]

    def find_longest_prefix(self, tokens: list[int]) -> Optional[CacheEntry]:
        """查找最长前缀匹配"""
        best_match = None
        best_length = 0

        for entry in self.cache.values():
            # 检查是否过期
            if time.time() > entry.created_at + entry.ttl:
                continue

            # 检查前缀匹配
            entry_tokens = entry.token_count
            if entry_tokens > best_length and entry_tokens <= len(tokens):
                # 验证 token 序列匹配(抽样检查)
                if self._verify_prefix(tokens, entry_tokens, entry.key):
                    best_match = entry
                    best_length = entry_tokens

        return best_match

    def store(self, tokens: list[int], kv_cache: bytes, ttl: float = 300):
        """存储缓存"""
        key = self.compute_key(tokens)
        size = len(kv_cache)

        # 检查空间,必要时淘汰
        while self.current_size + size > self.max_cache_size:
            self._evict()

        entry = CacheEntry(
            key=key,
            kv_cache=kv_cache,
            token_count=len(tokens),
            created_at=time.time(),
            last_used=time.time(),
            use_count=0,
            ttl=ttl
        )

        self.cache[key] = entry
        self.current_size += size

    def _evict(self):
        """LRU 淘汰"""
        if not self.cache:
            return

        # 找到最久未使用的
        oldest_key = min(self.cache, key=lambda k: self.cache[k].last_used)
        entry = self.cache.pop(oldest_key)
        self.current_size -= len(entry.kv_cache)

    def _verify_prefix(self, tokens: list[int], length: int, cached_key: str) -> bool:
        """验证前缀匹配"""
        # 简化实现:比较哈希
        return self.compute_key(tokens[:length]) == cached_key

    def get_stats(self) -> dict:
        """获取缓存统计"""
        return {
            "entries": len(self.cache),
            "size_mb": self.current_size / 1024 / 1024,
            "total_use_count": sum(e.use_count for e in self.cache.values()),
            "hit_rate": self._calculate_hit_rate()
        }

十一、部署实战指南

Q: vLLM 部署实战:从零到生产 ⭐⭐

安装

bash
pip install vllm

启动服务

bash
# 基础启动
python -m vllm.entrypoints.openai.api_server \
    --model Qwen/Qwen2.5-7B-Instruct \
    --host 0.0.0.0 \
    --port 8000

# 生产配置
python -m vllm.entrypoints.openai.api_server \
    --model Qwen/Qwen2.5-7B-Instruct \
    --host 0.0.0.0 \
    --port 8000 \
    --tensor-parallel-size 2 \          # 多卡并行
    --max-model-len 32768 \             # 最大上下文
    --gpu-memory-utilization 0.9 \      # GPU 显存使用率
    --max-num-seqs 64 \                 # 最大并发请求
    --quantization awq \               # 量化
    --dtype bfloat16 \
    --enable-prefix-caching \           # 前缀缓存
    --swap-space 4 \                    # CPU swap 空间 (GB)
    --disable-log-requests              # 生产环境关闭请求日志

Docker 部署

dockerfile
FROM vllm/vllm-openai:latest

COPY ./model /model

CMD ["--model", "/model", \
     "--host", "0.0.0.0", \
     "--port", "8000", \
     "--tensor-parallel-size", "2", \
     "--gpu-memory-utilization", "0.9"]
yaml
# docker-compose.yml
services:
  vllm:
    image: vllm/vllm-openai:latest
    runtime: nvidia
    ports:
      - "8000:8000"
    volumes:
      - ./model:/model
    command: >
      --model /model
      --host 0.0.0.0
      --tensor-parallel-size 2
      --gpu-memory-utilization 0.9
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 2
              capabilities: [gpu]

客户端调用

python
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed"  # vLLM 不需要 API key
)

response = client.chat.completions.create(
    model="Qwen/Qwen2.5-7B-Instruct",
    messages=[{"role": "user", "content": "你好"}],
    temperature=0.7,
    max_tokens=512,
    stream=True  # 流式输出
)

for chunk in response:
    print(chunk.choices[0].delta.content, end="")

Q: 自建 vs API:怎么选? ⭐⭐

成本对比(以 7B 模型为例):

方案月成本适用场景
自建 (1×A100)~$800/月高调用量,数据隐私要求高
自建 (1×4090)~$300/月低延迟要求,本地部署
DeepSeek API$0.14/1M tokens调用量中等,不想运维
GPT-4o API$2.5/1M tokens需要最强能力
Claude API$3/1M tokens编程任务

盈亏平衡点

假设 7B 模型自建成本 $300/月(4090)

DeepSeek API: $0.14/1M tokens
$300 ÷ $0.14 × 1M = 2.14B tokens/月

当月调用量 > 2.1B tokens 时,自建更划算
约等于: 每天 70M tokens ≈ 每天 35M 汉字

决策矩阵

因素自建API
数据隐私✅ 完全控制❌ 数据出境风险
延迟✅ 本地 <10ms❌ 网络 50-200ms
可用性⚠️ 需自己保证✅ 厂商保证 99.9%
成本(低量)❌ 固定成本高✅ 按量付费
成本(高量)✅ 边际成本低❌ 费用线性增长
模型能力⚠️ 受限于模型✅ 最新最强模型
定制化✅ 可微调⚠️ 有限微调

Q: 推理服务如何做监控和告警? ⭐⭐

python
# 关键监控指标
MONITORING_METRICS = {
    # 延迟指标
    "ttft_ms": "首 Token 延迟(Time to First Token)",
    "tpot_ms": "每 Token 延迟(Time per Output Token)",
    "e2e_latency_ms": "端到端延迟",
    "latency_p50": "P50 延迟",
    "latency_p95": "P95 延迟",
    "latency_p99": "P99 延迟",
    
    # 吞吐指标
    "qps": "每秒请求数",
    "tokens_per_second": "每秒 Token 数",
    "concurrent_requests": "并发请求数",
    
    # 资源指标
    "gpu_utilization": "GPU 利用率",
    "gpu_memory_used": "GPU 显存使用",
    "queue_depth": "队列深度",
    
    # 质量指标
    "error_rate": "错误率",
    "timeout_rate": "超时率",
    "cache_hit_rate": "缓存命中率",
}

# Prometheus 告警规则
ALERTS = """
groups:
  - name: llm_alerts
    rules:
      - alert: HighLatency
        expr: llm_latency_p99_seconds > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P99 延迟超过 5 秒"
      
      - alert: HighErrorRate
        expr: rate(llm_errors_total[5m]) / rate(llm_requests_total[5m]) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "错误率超过 5%"
      
      - alert: GPUOOM
        expr: nvidia_gpu_memory_used_bytes / nvidia_gpu_memory_total_bytes > 0.95
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "GPU 显存使用超过 95%"
"""

Q: 如何实现模型的 A/B 测试? ⭐⭐

python
import random

class ModelABRouter:
    """模型 A/B 测试路由"""
    
    def __init__(self, config: dict):
        self.config = config  # {"model_a": 0.7, "model_b": 0.3}
    
    def route(self, request) -> str:
        """按流量比例路由"""
        rand = random.random()
        cumulative = 0
        for model, weight in self.config.items():
            cumulative += weight
            if rand < cumulative:
                return model
        return list(self.config.keys())[-1]

# Nginx 配置
"""
upstream model_a {
    server 127.0.0.1:8001;
}
upstream model_b {
    server 127.0.0.1:8002;
}

split_clients "${request_id}" $backend {
    70% model_a;
    30% model_b;
}

server {
    listen 80;
    location /v1/chat/completions {
        proxy_pass http://$backend;
    }
}
"""

Q: 如何实现自动扩缩容? ⭐⭐

yaml
# Kubernetes HPA 配置
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-service
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-service
  minReplicas: 1
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Pods
      pods:
        metric:
          name: gpu_utilization
        target:
          type: AverageValue
          averageValue: "80"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 2
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 1
          periodSeconds: 120

Q: 多模型如何共享 GPU 资源? ⭐⭐⭐

方案一:时分复用(推荐)

python
# 多个模型共享同一组 GPU
# 通过 vLLM 的模型切换实现

from vllm import LLM

# 加载模型 A
llm_a = LLM(model="model_a", tensor_parallel_size=2)

# 用完后卸载,加载模型 B
del llm_a
torch.cuda.empty_cache()

llm_b = LLM(model="model_b", tensor_parallel_size=2)

方案二:MPS(NVIDIA Multi-Process Service)

bash
# 允许多个进程共享同一 GPU
nvidia-cuda-mps-control -d

# 多个 vLLM 实例共享 GPU
CUDA_MPS_ACTIVE_THREAD_PERCENTAGE=50 python -m vllm.entrypoints.openai.api_server --model model_a --port 8001 &
CUDA_MPS_ACTIVE_THREAD_PERCENTAGE=50 python -m vllm.entrypoints.openai.api_server --model model_b --port 8002 &

方案三:模型合并(Model Merging)

python
# 将多个 LoRA 合并到基座模型
from peft import PeftModel

base_model = AutoModelForCausalLM.from_pretrained("base_model")

# 加载不同的 LoRA
lora_a = PeftModel.from_pretrained(base_model, "lora_a")
lora_b = PeftModel.from_pretrained(base_model, "lora_b")

# 运行时切换 LoRA
# vLLM 支持 LoRA 动态加载

LLM 应用 & Agent 开发面试准备