22. 算法与编码题
面向大模型应用工程师/Agent 开发工程师的编码面试题,覆盖基础算法、RAG、Agent、模型推理、系统设计五大方向。每题均提供完整可运行的 Python 代码。
基础算法
Q1: 实现 BPE(Byte Pair Encoding)分词算法
难度:⭐⭐
题目描述:
实现一个简化版的 BPE 分词算法。给定一个训练语料,学习合并规则,然后用学到的词表对新文本进行分词。
解题思路:
- 将训练文本拆分为字符序列(每个词以特殊结束符
</w>结尾) - 统计所有相邻 token 对的频率
- 选择频率最高的 pair 合并为一个新 token
- 重复指定次数的合并,得到最终词表
- 用学到的合并规则对新文本分词
完整代码:
from collections import Counter, defaultdict
class BPETokenizer:
"""简化版 BPE 分词器"""
def __init__(self):
self.merges = [] # 合并规则列表,按顺序应用
self.vocab = set()
def _get_pair_freqs(self, word_freqs: dict) -> Counter:
"""统计所有相邻 token 对的频率"""
pairs = Counter()
for tokens, freq in word_freqs.items():
for i in range(len(tokens) - 1):
pairs[(tokens[i], tokens[i + 1])] += freq
return pairs
def _merge_pair(self, pair: tuple, word_freqs: dict) -> dict:
"""将指定 pair 在所有词中合并"""
new_word_freqs = {}
bigram = pair
replacement = pair[0] + pair[1]
for tokens, freq in word_freqs.items():
new_tokens = []
i = 0
while i < len(tokens):
if i < len(tokens) - 1 and tokens[i] == bigram[0] and tokens[i + 1] == bigram[1]:
new_tokens.append(replacement)
i += 2
else:
new_tokens.append(tokens[i])
i += 1
new_word_freqs[tuple(new_tokens)] = freq
return new_word_freqs
def train(self, corpus: list[str], num_merges: int = 10):
"""
训练 BPE
Args:
corpus: 文本列表,如 ["low lower lowest"]
num_merges: 合并次数
"""
# 统计词频,每个词拆成字符序列加结束符
word_freqs = Counter()
for text in corpus:
for word in text.strip().split():
chars = tuple(list(word) + ["</w>"])
word_freqs[chars] += 1
# 初始化词表
self.vocab = set()
for tokens in word_freqs:
self.vocab.update(tokens)
# 迭代合并
for _ in range(num_merges):
pair_freqs = self._get_pair_freqs(word_freqs)
if not pair_freqs:
break
best_pair = pair_freqs.most_common(1)[0][0]
self.merges.append(best_pair)
word_freqs = self._merge_pair(best_pair, word_freqs)
# 新 token 加入词表
self.vocab.add(best_pair[0] + best_pair[1])
def tokenize(self, text: str) -> list[str]:
"""用学到的合并规则对文本分词"""
all_tokens = []
for word in text.strip().split():
tokens = list(word) + ["</w>"]
for pair in self.merges:
new_tokens = []
i = 0
while i < len(tokens):
if i < len(tokens) - 1 and tokens[i] == pair[0] and tokens[i + 1] == pair[1]:
new_tokens.append(pair[0] + pair[1])
i += 2
else:
new_tokens.append(tokens[i])
i += 1
tokens = new_tokens
all_tokens.extend(tokens)
return all_tokens
# ---- 测试 ----
if __name__ == "__main__":
corpus = [
"low low low low low",
"lower lower lower",
"newest newest newest newest",
"widest widest widest",
]
tokenizer = BPETokenizer()
tokenizer.train(corpus, num_merges=10)
print("合并规则:", tokenizer.merges)
print("词表大小:", len(tokenizer.vocab))
print("分词结果:", tokenizer.tokenize("low lower newest"))复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 训练时间 | O(N × M × num_merges),N=词数,M=平均词长 |
| 推理时间 | O(T × num_merges),T=输入 token 数 |
| 空间 | O(V),V=词表大小 |
追问:
- 真实的 BPE 实现(如 GPT-2 的 tiktoken)如何处理 UTF-8 字节级别的编码?字节级 BPE 有什么优势?
- SentencePiece 使用的 Unigram 模型与 BPE 的核心区别是什么?各自在什么场景下更优?
Q2: 实现一个简单的向量相似度搜索(余弦相似度 + 暴力搜索)
难度:⭐
题目描述:
实现一个简易的向量数据库,支持添加向量和基于余弦相似度的 Top-K 搜索。
解题思路:
- 存储向量及其关联的文本/元数据
- 余弦相似度 = (A·B) / (‖A‖ × ‖B‖)
- 暴力遍历所有向量计算相似度,取 Top-K
完整代码:
import numpy as np
from dataclasses import dataclass, field
@dataclass
class VectorRecord:
id: int
vector: np.ndarray
text: str
metadata: dict = field(default_factory=dict)
class SimpleVectorStore:
"""暴力搜索的简单向量数据库"""
def __init__(self):
self.records: list[VectorRecord] = []
self._next_id = 0
@staticmethod
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""计算余弦相似度"""
dot = np.dot(a, b)
norm = np.linalg.norm(a) * np.linalg.norm(b)
if norm == 0:
return 0.0
return dot / norm
def add(self, vectors: list[np.ndarray], texts: list[str],
metadatas: list[dict] | None = None):
"""批量添加向量"""
for i, (vec, txt) in enumerate(zip(vectors, texts)):
meta = metadatas[i] if metadatas else {}
self.records.append(VectorRecord(
id=self._next_id, vector=vec, text=txt, metadata=meta
))
self._next_id += 1
def search(self, query: np.ndarray, top_k: int = 3) -> list[dict]:
"""返回与 query 最相似的 top_k 条记录"""
scored = []
for rec in self.records:
sim = self._cosine_similarity(query, rec.vector)
scored.append({
"id": rec.id,
"text": rec.text,
"score": sim,
"metadata": rec.metadata,
})
scored.sort(key=lambda x: x["score"], reverse=True)
return scored[:top_k]
# ---- 测试 ----
if __name__ == "__main__":
np.random.seed(42)
dim = 128
store = SimpleVectorStore()
# 添加一些随机向量
texts = ["机器学习基础", "深度学习进阶", "自然语言处理", "计算机视觉", "强化学习"]
vectors = [np.random.randn(dim) for _ in texts]
store.add(vectors, texts)
# 查询
query = np.random.randn(dim)
results = store.search(query, top_k=3)
for r in results:
print(f"[{r['score']:.4f}] {r['text']}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 添加 | O(1) per vector |
| 搜索 | O(N × D),N=向量数,D=维度 |
| 空间 | O(N × D) |
追问:
- 当向量规模达到百万级时,暴力搜索不可行,有哪些加速方案?(IVF、HNSW、PQ 等)
- 余弦相似度和欧氏距离在什么场景下等价?L2 归一化后两者的关系是什么?
Q3: 实现 HNSW(Hierarchical Navigable Small World)索引的简化版
难度:⭐⭐⭐
题目描述:
实现一个简化版的 HNSW 索引,支持插入向量和近似最近邻搜索。要求包含多层图结构和贪心搜索逻辑。
解题思路:
- HNSW 是多层的 NSW 图:底层包含所有节点,上层节点逐层稀疏
- 插入时随机决定节点最高层,从最高层开始贪心搜索找到最近邻
- 搜索时从顶层的入口点开始,逐层下降,每层做贪心 BFS
完整代码:
import heapq
import math
import random
from collections import defaultdict
import numpy as np
class HNSWIndex:
"""简化版 HNSW 索引"""
def __init__(self, dim: int, max_layers: int = 4, ef_construction: int = 50,
m: int = 16, m_max: int = 32):
self.dim = dim
self.max_layers = max_layers
self.ef_construction = ef_construction
self.m = m # 每层连接数
self.m_max = m_max # 每层最大连接数
self.ml = 1.0 / math.log(m) # 层间概率因子
# layer -> {node_id: [neighbor_ids]}
self.graphs: dict[int, dict[int, list[int]]] = defaultdict(lambda: defaultdict(list))
self.vectors: dict[int, np.ndarray] = {}
self.entry_point: int | None = None
self.max_level: int = -1
self._next_id = 0
@staticmethod
def _distance(a: np.ndarray, b: np.ndarray) -> float:
return float(np.linalg.norm(a - b))
def _random_level(self) -> int:
"""随机生成节点的最高层级"""
level = 0
while random.random() < (1.0 / self.m) and level < self.max_layers - 1:
level += 1
return level
def _search_layer(self, query: np.ndarray, entry_points: list[int],
ef: int, layer: int) -> list[int]:
"""在指定层做贪心 beam search,返回 ef 个最近邻"""
visited = set(entry_points)
# candidates: min-heap by distance; results: max-heap by distance (negated)
candidates = []
results = []
for ep in entry_points:
d = self._distance(query, self.vectors[ep])
heapq.heappush(candidates, (d, ep))
heapq.heappush(results, (-d, ep))
while candidates:
d_c, c = heapq.heappop(candidates)
# 当前最远结果的距离
d_f = -results[0][0]
if d_c > d_f:
break
for neighbor in self.graphs[layer].get(c, []):
if neighbor in visited:
continue
visited.add(neighbor)
d_n = self._distance(query, self.vectors[neighbor])
if d_n < d_f or len(results) < ef:
heapq.heappush(candidates, (d_n, neighbor))
heapq.heappush(results, (-d_n, neighbor))
if len(results) > ef:
heapq.heappop(results)
return [nid for _, nid in sorted(results, key=lambda x: -x[0])]
def insert(self, vector: np.ndarray) -> int:
"""插入一个向量,返回 node_id"""
node_id = self._next_id
self._next_id += 1
self.vectors[node_id] = vector
level = self._random_level()
if self.entry_point is None:
self.entry_point = node_id
self.max_level = level
return node_id
# 从顶层贪心搜索到 level+1 层
ep = self.entry_point
for l in range(self.max_level, level, -1):
eps = self._search_layer(vector, [ep], ef=1, layer=l)
ep = eps[0] if eps else ep
# 从 level 层到底层,搜索并建立双向连接
for l in range(min(level, self.max_level), -1, -1):
neighbors = self._search_layer(vector, [ep], self.ef_construction, layer=l)
# 选择最近的 m 个作为连接
selected = neighbors[:self.m]
self.graphs[l][node_id] = list(selected)
for nb in selected:
self.graphs[l][nb].append(node_id)
# 修剪超出 m_max 的连接
if len(self.graphs[l][nb]) > self.m_max:
# 按距离排序保留最近的 m_max
dists = [(self._distance(self.vectors[nb], self.vectors[nb2]), nb2)
for nb2 in self.graphs[l][nb]]
dists.sort()
self.graphs[l][nb] = [nid for _, nid in dists[:self.m_max]]
ep = neighbors[0] if neighbors else ep
if level > self.max_level:
self.max_level = level
self.entry_point = node_id
return node_id
def search(self, query: np.ndarray, top_k: int = 5, ef: int = 50) -> list[tuple[int, float]]:
"""搜索 top_k 最近邻,返回 (node_id, distance) 列表"""
if self.entry_point is None:
return []
ep = self.entry_point
# 从顶层贪心到第 1 层
for l in range(self.max_level, 0, -1):
eps = self._search_layer(query, [ep], ef=1, layer=l)
ep = eps[0] if eps else ep
# 在第 0 层做 beam search
candidates = self._search_layer(query, [ep], ef=max(ef, top_k), layer=0)
# 按距离排序返回 top_k
scored = [(nid, self._distance(query, self.vectors[nid])) for nid in candidates]
scored.sort(key=lambda x: x[1])
return scored[:top_k]
# ---- 测试 ----
if __name__ == "__main__":
random.seed(42)
np.random.seed(42)
dim = 64
index = HNSWIndex(dim=dim, m=8, m_max=16, ef_construction=40, max_layers=3)
# 插入 500 个随机向量
n = 500
all_vecs = np.random.randn(n, dim).astype(np.float32)
for i in range(n):
index.insert(all_vecs[i])
# 查询
query = np.random.randn(dim).astype(np.float32)
results = index.search(query, top_k=5, ef=50)
print("HNSW 搜索结果 (top-5):")
for nid, dist in results:
print(f" node={nid}, dist={dist:.4f}")
# 暴力验证
brute = sorted(range(n), key=lambda i: np.linalg.norm(all_vecs[i] - query))[:5]
print("\n暴力搜索 top-5:", brute)复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 插入 | O(m × log(N) × D) 平均 |
| 搜索 | O(log(N) × D) 平均,远优于暴力 O(N×D) |
| 空间 | O(N × m × L),L=平均层数 |
追问:
- HNSW 的
ef_search参数如何影响搜索精度和速度的权衡?实际生产中如何调优? - HNSW 与 IVF-PQ 的主要区别是什么?在什么数据规模下各自更合适?
Q4: 实现文本分块算法(固定大小 + 递归分块)
难度:⭐⭐
题目描述:
实现两种文本分块策略:
- 固定大小分块(Fixed-size Chunking):按固定 token 数切分,支持重叠
- 递归分块(Recursive Character Splitting):按分隔符层级递归切分
解题思路:
- 固定大小:按 chunk_size 步进,step = chunk_size - overlap 确保重叠
- 递归分块:先用
\n\n分割,若超长则用\n,再超长用空格,最后用字符切分
完整代码:
def fixed_size_chunks(text: str, chunk_size: int = 100, overlap: int = 20) -> list[str]:
"""
固定大小分块(按字符数)
Args:
text: 输入文本
chunk_size: 每块字符数
overlap: 重叠字符数
Returns:
分块列表
"""
if not text or chunk_size <= 0:
return []
step = chunk_size - overlap
if step <= 0:
raise ValueError("overlap 必须小于 chunk_size")
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunks.append(text[start:end])
start += step
return chunks
def recursive_split(text: str, chunk_size: int = 200, chunk_overlap: int = 50,
separators: list[str] | None = None) -> list[str]:
"""
递归字符分割(类似 LangChain 的 RecursiveCharacterTextSplitter)
Args:
text: 输入文本
chunk_size: 最大块大小(字符数)
chunk_overlap: 重叠大小
separators: 分隔符优先级列表
Returns:
分块列表
"""
if separators is None:
separators = ["\n\n", "\n", " ", ""]
def _split_recursive(t: str, seps: list[str]) -> list[str]:
if len(t) <= chunk_size:
return [t] if t.strip() else []
sep = seps[0] if seps else ""
remaining_seps = seps[1:] if len(seps) > 1 else []
if sep == "":
# 最后手段:按字符硬切
return fixed_size_chunks(t, chunk_size, chunk_overlap)
# 按当前分隔符分割
parts = t.split(sep)
# 合并小块
current = ""
chunks = []
for part in parts:
candidate = current + sep + part if current else part
if len(candidate) <= chunk_size:
current = candidate
else:
if current:
chunks.append(current)
# 如果单个 part 仍然超长,递归用下一级分隔符
if len(part) > chunk_size:
chunks.extend(_split_recursive(part, remaining_seps))
current = ""
else:
current = part
if current:
chunks.append(current)
# 添加重叠
if chunk_overlap > 0 and len(chunks) > 1:
overlapped = [chunks[0]]
for i in range(1, len(chunks)):
prev_tail = chunks[i - 1][-chunk_overlap:]
overlapped.append(prev_tail + sep + chunks[i])
return overlapped
return chunks
result = _split_recursive(text.strip(), separators)
return [c for c in result if c.strip()]
# ---- 测试 ----
if __name__ == "__main__":
sample = """这是一个测试文档。
它包含了多个段落。每个段落都有不同的内容。
这是第一段的一些详细信息。
这是第二段。它讨论了另一个话题。
这里有一些额外的细节,使得这段文字变得比较长,需要被正确地分块处理。
这是第三段。"""
print("=== 固定大小分块 ===")
for i, c in enumerate(fixed_size_chunks(sample, chunk_size=50, overlap=10)):
print(f"[块{i}] ({len(c)}字符): {c!r}")
print("\n=== 递归分块 ===")
for i, c in enumerate(recursive_split(sample, chunk_size=80, chunk_overlap=20)):
print(f"[块{i}] ({len(c)}字符): {c!r}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 固定大小 | O(N),N=文本长度 |
| 递归分块 | O(N × S),S=分隔符层级数 |
| 空间 | O(N) |
追问:
- 在 RAG 场景中,chunk_size 和 chunk_overlap 如何影响检索效果?有没有自适应的分块策略?
- 语义分块(Semantic Chunking)的原理是什么?与固定/递归分块相比有何优劣?
RAG 相关
Q5: 实现一个简单的 RAG Pipeline
难度:⭐⭐
题目描述:
实现一个完整的 RAG 流水线:文档加载 → 文本分块 → Embedding 编码 → 向量检索 → 上下文注入 → 生成回答。可使用模拟的 Embedding 和 LLM。
解题思路:
- 文档加载器将文件读入
- 分块器将长文档切分为小块
- Embedding 模型将文本块转为向量,存入向量库
- 用户提问时,将问题编码为向量,检索最相关的 K 个文本块
- 将检索结果拼入 Prompt,送入 LLM 生成回答
完整代码:
import numpy as np
from dataclasses import dataclass, field
# ---- 组件定义 ----
class FakeEmbedding:
"""模拟 Embedding 模型(用哈希生成固定维度的伪向量)"""
def __init__(self, dim: int = 64):
self.dim = dim
def encode(self, texts: list[str]) -> np.ndarray:
vectors = []
for t in texts:
seed = hash(t) % (2**31)
rng = np.random.RandomState(seed)
vec = rng.randn(self.dim).astype(np.float32)
vec /= np.linalg.norm(vec) # L2 归一化
vectors.append(vec)
return np.array(vectors)
class FakeLLM:
"""模拟 LLM,简单拼接上下文返回"""
def generate(self, prompt: str, context: str, question: str) -> str:
return (
f"根据以下参考资料回答问题:\n\n"
f"【参考资料】\n{context}\n\n"
f"【问题】{question}\n\n"
f"【回答】基于提供的 {prompt.count('[文档片段]')} 段参考资料,"
f"以下是关于「{question}」的回答:{context[:100]}..."
)
@dataclass
class Chunk:
text: str
metadata: dict = field(default_factory=dict)
embedding: np.ndarray | None = None
class SimpleVectorStore:
"""简易向量存储与检索"""
def __init__(self):
self.chunks: list[Chunk] = []
def add(self, chunks: list[Chunk]):
self.chunks.extend(chunks)
def search(self, query_vec: np.ndarray, top_k: int = 3) -> list[tuple[Chunk, float]]:
results = []
for chunk in self.chunks:
sim = np.dot(query_vec, chunk.embedding)
results.append((chunk, float(sim)))
results.sort(key=lambda x: x[1], reverse=True)
return results[:top_k]
# ---- RAG Pipeline ----
class RAGPipeline:
def __init__(self, chunk_size: int = 100, chunk_overlap: int = 20, top_k: int = 3):
self.embedder = FakeEmbedding(dim=64)
self.llm = FakeLLM()
self.store = SimpleVectorStore()
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.top_k = top_k
def _chunk_text(self, text: str) -> list[str]:
step = self.chunk_size - self.chunk_overlap
chunks = []
i = 0
while i < len(text):
chunks.append(text[i:i + self.chunk_size])
i += step
return chunks
def ingest(self, documents: list[str]):
"""文档加载 + 分块 + Embedding + 存储"""
all_chunks = []
for doc_idx, doc in enumerate(documents):
text_chunks = self._chunk_text(doc)
embeddings = self.embedder.encode(text_chunks)
for i, (text, emb) in enumerate(zip(text_chunks, embeddings)):
chunk = Chunk(
text=text,
metadata={"doc_id": doc_idx, "chunk_index": i},
embedding=emb,
)
all_chunks.append(chunk)
self.store.add(all_chunks)
print(f"已摄入 {len(all_chunks)} 个文本块")
def query(self, question: str) -> str:
"""检索 + 生成"""
# 1. 问题编码
q_vec = self.embedder.encode([question])[0]
# 2. 向量检索
results = self.store.search(q_vec, top_k=self.top_k)
# 3. 构建上下文
context_parts = []
for chunk, score in results:
context_parts.append(f"[文档片段] (相似度:{score:.4f})\n{chunk.text}")
context = "\n\n".join(context_parts)
# 4. 生成
return self.llm.generate(prompt=context, context=context, question=question)
# ---- 测试 ----
if __name__ == "__main__":
documents = [
"RAG(检索增强生成)是一种结合信息检索和文本生成的技术。它通过从外部知识库中检索相关文档,将其作为上下文输入到大语言模型中,从而生成更准确、更有依据的回答。",
"向量数据库是 RAG 系统的关键组件。它将文本转换为高维向量,并通过近似最近邻算法快速检索相似内容。常见的向量数据库包括 FAISS、Milvus、Pinecone 等。",
"Embedding 模型将文本映射到稠密向量空间。好的 Embedding 模型能使语义相似的文本在向量空间中距离更近。常用模型有 OpenAI text-embedding-ada-002、BGE、E5 等。",
]
rag = RAGPipeline(chunk_size=80, chunk_overlap=20, top_k=2)
rag.ingest(documents)
answer = rag.query("什么是 RAG?")
print(answer)复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 摄入 | O(N × D),N=文本块数,D=Embedding 维度 |
| 查询 | O(N × D + K × D) |
| 空间 | O(N × D) |
追问:
- 如何处理多模态文档(表格、图片)的 RAG?与纯文本 RAG 的 Pipeline 有何不同?
- RAG 中的查询改写(Query Rewriting)有哪些常见技术?HyDE、Multi-query 分别是什么?
Q6: 实现 BM25 检索算法
难度:⭐⭐
题目描述:
实现 BM25 文本检索算法,支持文档索引和查询打分。BM25 是传统信息检索中的经典算法,在 RAG 中常与向量检索混合使用。
解题思路:
- 对文档集进行分词和索引
- BM25 评分公式:
score(q,d) = Σ IDF(qi) × (f(qi,d) × (k1+1)) / (f(qi,d) + k1 × (1 - b + b × |d|/avgdl)) - IDF = log((N - n(qi) + 0.5) / (n(qi) + 0.5) + 1)
完整代码:
import math
from collections import Counter, defaultdict
from dataclasses import dataclass
def simple_tokenize(text: str) -> list[str]:
"""简易中文/英文分词(按字符和空格)"""
import re
# 英文按空格,中文按字(简化处理)
tokens = []
for seg in re.findall(r'[\u4e00-\u9fff]+|[a-zA-Z]+|[0-9]+', text.lower()):
if re.match(r'[\u4e00-\u9fff]', seg):
tokens.extend(list(seg)) # 中文按字切分
else:
tokens.append(seg)
return tokens
class BM25:
"""BM25 检索算法"""
def __init__(self, k1: float = 1.5, b: float = 0.75):
self.k1 = k1
self.b = b
self.doc_count = 0
self.avgdl = 0.0
self.doc_freqs: dict[str, int] = defaultdict(int) # term -> 包含该 term 的文档数
self.doc_lens: list[int] = []
self.tf: list[dict[str, int]] = [] # 每篇文档的 term freq
self.docs: list[str] = [] # 原始文档
self.idf: dict[str, float] = {}
def index(self, documents: list[str]):
"""索引文档列表"""
self.docs = documents
total_len = 0
for doc in documents:
tokens = simple_tokenize(doc)
total_len += len(tokens)
tf = Counter(tokens)
self.tf.append(dict(tf))
self.doc_lens.append(len(tokens))
# 更新文档频率
seen = set(tokens)
for t in seen:
self.doc_freqs[t] += 1
self.doc_count = len(documents)
self.avgdl = total_len / self.doc_count if self.doc_count > 0 else 0
# 预计算 IDF
for term, df in self.doc_freqs.items():
self.idf[term] = math.log(
(self.doc_count - df + 0.5) / (df + 0.5) + 1
)
def _score_doc(self, query_tokens: list[str], doc_idx: int) -> float:
score = 0.0
doc_tf = self.tf[doc_idx]
dl = self.doc_lens[doc_idx]
for qt in query_tokens:
if qt not in doc_tf:
continue
f = doc_tf[qt]
idf = self.idf.get(qt, 0)
numerator = f * (self.k1 + 1)
denominator = f + self.k1 * (1 - self.b + self.b * dl / self.avgdl)
score += idf * (numerator / denominator)
return score
def search(self, query: str, top_k: int = 3) -> list[dict]:
"""搜索并返回 top_k 结果"""
tokens = simple_tokenize(query)
scored = []
for i in range(self.doc_count):
score = self._score_doc(tokens, i)
scored.append({
"doc_index": i,
"text": self.docs[i],
"score": score,
})
scored.sort(key=lambda x: x["score"], reverse=True)
return scored[:top_k]
# ---- 测试 ----
if __name__ == "__main__":
docs = [
"RAG 是检索增强生成技术,用于提升大语言模型的回答质量。",
"BM25 是经典的文本检索算法,基于词频和逆文档频率。",
"向量数据库存储高维向量,支持近似最近邻搜索。",
"Embedding 模型将文本映射到向量空间。",
"BM25 在关键词匹配方面表现优异,与向量检索互补。",
]
bm25 = BM25()
bm25.index(docs)
results = bm25.search("BM25 检索算法", top_k=3)
for r in results:
print(f"[{r['score']:.4f}] {r['text']}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 索引 | O(N × L),N=文档数,L=平均长度 |
| 查询 | O(N × Q),Q=查询词数 |
| 空间 | O(N × V),V=词表大小 |
追问:
- BM25 与向量检索在什么场景下会互补?混合检索(Hybrid Search)的融合策略有哪些(RRF 等)?
- BM25 的 k1 和 b 参数分别控制什么?如何通过实验调优?
Q7: 实现 Reranking 逻辑(交叉编码器打分 + 排序)
难度:⭐⭐
题目描述:
实现一个 Reranking 模块:给定查询和候选文档列表,用交叉编码器方式计算相关性分数并重新排序。由于真实模型较重,这里用模拟打分器演示完整流程。
解题思路:
- 初次检索(召回)返回较多候选(如 top-20)
- Reranker 将 (query, doc) 拼接后精细打分
- 按 reranker 分数重新排序,返回 top-K
完整代码:
import numpy as np
from dataclasses import dataclass
@dataclass
class RetrievalResult:
doc_id: int
text: str
retrieval_score: float
rerank_score: float = 0.0
class CrossEncoderReranker:
"""
模拟交叉编码器 Reranker
实际实现中会调用 BGE-Reranker / Cohere Rerank 等模型
"""
def __init__(self, dim: int = 64):
self.dim = dim
# 模拟一个打分用的线性权重
np.random.seed(0)
self.weights = np.random.randn(dim)
def _encode_pair(self, query: str, doc: str) -> np.ndarray:
"""模拟交叉编码:将 query+doc 拼接后编码为向量"""
combined = query + " [SEP] " + doc
seed = hash(combined) % (2**31)
rng = np.random.RandomState(seed)
vec = rng.randn(self.dim).astype(np.float32)
return vec
def score(self, query: str, docs: list[str]) -> list[float]:
"""对每个 (query, doc) 对打分"""
scores = []
for doc in docs:
vec = self._encode_pair(query, doc)
s = float(np.dot(vec, self.weights))
scores.append(s)
return scores
def rerank(self, query: str, results: list[RetrievalResult], top_k: int = 3) -> list[RetrievalResult]:
"""重排序"""
docs = [r.text for r in results]
scores = self.score(query, docs)
for i, s in enumerate(scores):
results[i].rerank_score = s
results.sort(key=lambda x: x.rerank_score, reverse=True)
return results[:top_k]
class RAGWithReranking:
"""带 Reranking 的 RAG 流程"""
def __init__(self):
self.reranker = CrossEncoderReranker()
# 模拟向量召回结果
self.candidate_pool: list[RetrievalResult] = []
def recall(self, query_vec: np.ndarray, top_k: int = 10) -> list[RetrievalResult]:
"""模拟向量召回(返回 top_k 较大值)"""
# 用 mock 数据模拟
return self.candidate_pool[:top_k]
def query(self, query: str, recall_results: list[RetrievalResult],
final_top_k: int = 3) -> list[RetrievalResult]:
"""完整流程:召回 → Rerank → 返回"""
# Rerank
reranked = self.reranker.rerank(query, recall_results, top_k=final_top_k)
return reranked
# ---- 测试 ----
if __name__ == "__main__":
docs = [
"BM25 是经典的关键词检索算法",
"余弦相似度用于衡量向量之间的方向相似性",
"HNSW 是一种高效的近似最近邻索引算法",
"RAG 通过检索外部知识来增强生成模型的回答",
"Transformer 架构是现代 NLP 的基础",
"向量数据库支持高效的相似性搜索",
"BM25 基于词频和逆文档频率计算相关性得分",
"深度学习在 NLP 中的应用越来越广泛",
]
# 模拟召回阶段的结果(按原始分数排序)
candidates = []
for i, doc in enumerate(docs):
score = np.random.random()
candidates.append(RetrievalResult(
doc_id=i, text=doc, retrieval_score=score
))
candidates.sort(key=lambda x: x.retrieval_score, reverse=True)
print("=== 召回阶段 (top-6) ===")
for c in candidates[:6]:
print(f" [{c.retrieval_score:.4f}] {c.text}")
# Rerank
reranker = CrossEncoderReranker()
reranked = reranker.rerank("什么是 BM25 检索算法?", candidates[:6], top_k=3)
print("\n=== Rerank 后 (top-3) ===")
for r in reranked:
print(f" [rerank:{r.rerank_score:.4f} recall:{r.retrieval_score:.4f}] {r.text}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 打分 | O(K × D),K=候选数,D=模型维度 |
| 排序 | O(K log K) |
| 总查询 | 召回 O(log N) + Rerank O(K × D) |
追问:
- 真实场景中,BGE-Reranker 与 Cohere Rerank 的架构有什么区别?各自的延迟和效果如何?
- Reranking 的候选数量 K 应该设多大?太大会怎样?太小呢?
Agent 相关
Q8: 实现 ReAct 循环(Thought → Action → Observation)
难度:⭐⭐⭐
题目描述:
实现 ReAct(Reasoning and Acting)Agent 框架。Agent 能够循环执行:思考(Thought)→ 选择动作(Action)→ 执行并观察结果(Observation),直到得出最终答案。
解题思路:
- 维护工具注册表(函数名 → 可调用对象)
- 每轮循环:构建包含历史的 Prompt → LLM 输出 Thought + Action → 解析 Action → 执行 → 拿到 Observation
- 若 LLM 输出
FINISH则结束循环
完整代码:
import json
import re
from dataclasses import dataclass, field
from typing import Callable, Any
@dataclass
class Tool:
name: str
description: str
func: Callable[..., str]
class FakeLLM:
"""模拟 LLM,根据问题返回预设的 Thought/Action"""
def __init__(self):
self.call_count = 0
def generate(self, prompt: str) -> str:
self.call_count += 1
# 简单模拟:第一次思考并调用工具,第二次结束
if self.call_count == 1:
return (
"Thought: 用户问的是天气,我需要调用天气查询工具。\n"
"Action: search_weather\n"
'Action Input: {"city": "北京"}'
)
elif self.call_count == 2:
return (
"Thought: 我已经获得了天气信息,可以回答用户了。\n"
"Action: finish\n"
'Action Input: {"answer": "北京今天天气晴朗,温度25°C。"}'
)
return "Thought: 我可以回答了。\nAction: finish\nAction Input: {\"answer\": \"无法确定。\"}"
class ReActAgent:
"""ReAct Agent 框架"""
def __init__(self, tools: list[Tool], max_steps: int = 10):
self.tools = {t.name: t for t in tools}
self.max_steps = max_steps
self.llm = FakeLLM()
self.history: list[dict] = []
def _build_prompt(self, query: str) -> str:
tool_desc = "\n".join(
f"- {t.name}: {t.description}" for t in self.tools.values()
)
history_str = ""
for h in self.history:
history_str += f"\n{h['thought']}\n{h['action']}: {h['action_input']}\nObservation: {h['observation']}"
return (
f"你是一个 ReAct Agent。你可以使用以下工具:\n{tool_desc}\n\n"
f"请严格按照以下格式回答:\n"
f"Thought: <你的思考>\nAction: <工具名>\nAction Input: <JSON参数>\n\n"
f"当你准备好最终回答时:\nThought: <你的思考>\nAction: finish\nAction Input: {{\"answer\": \"...\"}}"
f"\n\n问题: {query}{history_str}"
)
def _parse_response(self, text: str) -> dict:
thought = ""
action = ""
action_input = ""
thought_match = re.search(r'Thought:\s*(.+?)(?:\n|$)', text)
if thought_match:
thought = thought_match.group(1).strip()
action_match = re.search(r'Action:\s*(.+?)(?:\n|$)', text)
if action_match:
action = action_match.group(1).strip()
input_match = re.search(r'Action Input:\s*(.+?)(?:\n|$)', text)
if input_match:
action_input = input_match.group(1).strip()
return {"thought": thought, "action": action, "action_input": action_input}
def _execute_tool(self, action: str, action_input: str) -> str:
if action == "finish":
return action_input
tool = self.tools.get(action)
if not tool:
return f"错误:未知工具 '{action}'"
try:
params = json.loads(action_input)
return tool.func(**params)
except Exception as e:
return f"工具执行错误: {e}"
def run(self, query: str) -> str:
"""执行 ReAct 循环"""
for step in range(self.max_steps):
prompt = self._build_prompt(query)
response = self.llm.generate(prompt)
parsed = self._parse_response(response)
if parsed["action"] == "finish":
try:
final = json.loads(parsed["action_input"])
return final.get("answer", parsed["action_input"])
except json.JSONDecodeError:
return parsed["action_input"]
# 执行工具
observation = self._execute_tool(parsed["action"], parsed["action_input"])
# 记录历史
self.history.append({
"thought": parsed["thought"],
"action": parsed["action"],
"action_input": parsed["action_input"],
"observation": observation,
})
return "达到最大步数限制,未能得出答案。"
# ---- 测试 ----
if __name__ == "__main__":
def search_weather(city: str) -> str:
weather_data = {"北京": "晴朗 25°C", "上海": "多云 28°C", "广州": "阵雨 30°C"}
return weather_data.get(city, f"未找到 {city} 的天气数据")
def search_knowledge(query: str) -> str:
return f"关于「{query}」的知识:这是一个测试知识库的回答。"
tools = [
Tool(name="search_weather", description="查询城市天气", func=search_weather),
Tool(name="search_knowledge", description="查询知识库", func=search_knowledge),
]
agent = ReActAgent(tools=tools, max_steps=5)
answer = agent.run("北京今天天气怎么样?")
print("最终回答:", answer)
print("\n执行历史:")
for i, h in enumerate(agent.history):
print(f" 步骤{i+1}: Thought={h['thought']}")
print(f" Action={h['action']}, Input={h['action_input']}")
print(f" Observation={h['observation']}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 每步时间 | O(1) 框架开销 + LLM 推理时间 |
| 总时间 | O(steps × LLM_latency) |
| 空间 | O(steps × context_length) |
追问:
- 如何防止 ReAct Agent 进入死循环或反复调用同一工具?有哪些常见的退出策略?
- ReAct 与 Plan-and-Execute、Reflexion 等 Agent 模式的核心区别是什么?
Q9: 实现 Function Calling 的参数解析和执行器
难度:⭐⭐
题目描述:
实现一个 Function Calling 系统,包括:工具注册(JSON Schema 定义)、LLM 输出解析、参数校验、函数执行、结果封装。
解题思路:
- 用 JSON Schema 定义函数签名(名称、描述、参数类型、必填字段)
- 解析 LLM 的 function_call 输出(JSON 格式)
- 校验参数类型和必填项
- 执行函数并格式化返回
完整代码:
import json
from dataclasses import dataclass, field
from typing import Any, Callable
from datetime import datetime
@dataclass
class ParameterDef:
name: str
type: str # "string", "integer", "float", "boolean", "array"
description: str
required: bool = True
enum: list | None = None
@dataclass
class FunctionDef:
name: str
description: str
parameters: list[ParameterDef]
handler: Callable[..., Any]
def to_schema(self) -> dict:
"""生成 OpenAI 风格的 function schema"""
props = {}
required = []
for p in self.parameters:
prop = {"type": p.type, "description": p.description}
if p.enum:
prop["enum"] = p.enum
props[p.name] = prop
if p.required:
required.append(p.name)
return {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": props,
"required": required,
}
}
class FunctionCallExecutor:
"""Function Calling 执行器"""
def __init__(self):
self.functions: dict[str, FunctionDef] = {}
def register(self, func_def: FunctionDef):
self.functions[func_def.name] = func_def
def get_tools_schema(self) -> list[dict]:
"""获取所有工具的 schema(供 LLM 使用)"""
return [fd.to_schema() for fd in self.functions.values()]
def _validate_params(self, func_def: FunctionDef, params: dict) -> tuple[bool, str]:
"""校验参数"""
for p in func_def.parameters:
if p.required and p.name not in params:
return False, f"缺少必填参数: {p.name}"
if p.name in params:
val = params[p.name]
type_map = {
"string": str, "integer": int, "float": float,
"boolean": bool, "array": list,
}
expected_type = type_map.get(p.type)
if expected_type and not isinstance(val, expected_type):
# 尝试类型转换
try:
if p.type == "integer":
params[p.name] = int(val)
elif p.type == "float":
params[p.name] = float(val)
elif p.type == "string":
params[p.name] = str(val)
except (ValueError, TypeError):
return False, f"参数 {p.name} 类型错误,期望 {p.type}"
if p.enum and params[p.name] not in p.enum:
return False, f"参数 {p.name} 的值必须在 {p.enum} 中"
return True, "OK"
def execute(self, function_call: dict) -> dict:
"""
执行 LLM 返回的 function_call
Args:
function_call: {"name": "...", "arguments": "{...}"}
Returns:
{"result": ..., "error": ...}
"""
name = function_call.get("name", "")
args_str = function_call.get("arguments", "{}")
if name not in self.functions:
return {"error": f"未知函数: {name}"}
func_def = self.functions[name]
try:
params = json.loads(args_str) if isinstance(args_str, str) else args_str
except json.JSONDecodeError as e:
return {"error": f"参数 JSON 解析失败: {e}"}
valid, msg = self._validate_params(func_def, params)
if not valid:
return {"error": f"参数校验失败: {msg}"}
try:
result = func_def.handler(**params)
return {"result": result}
except Exception as e:
return {"error": f"函数执行失败: {e}"}
# ---- 测试 ----
if __name__ == "__main__":
executor = FunctionCallExecutor()
# 注册工具
executor.register(FunctionDef(
name="get_weather",
description="获取指定城市的天气信息",
parameters=[
ParameterDef("city", "string", "城市名称", required=True),
ParameterDef("unit", "string", "温度单位", required=False, enum=["celsius", "fahrenheit"]),
],
handler=lambda city, unit="celsius": f"{city}天气:晴朗,25{'°C' if unit == 'celsius' else '°F'}"
))
executor.register(FunctionDef(
name="send_email",
description="发送邮件",
parameters=[
ParameterDef("to", "string", "收件人邮箱", required=True),
ParameterDef("subject", "string", "邮件主题", required=True),
ParameterDef("body", "string", "邮件正文", required=True),
],
handler=lambda to, subject, body: f"邮件已发送至 {to},主题:{subject}"
))
# 打印 schema
print("=== 工具 Schema ===")
print(json.dumps(executor.get_tools_schema(), ensure_ascii=False, indent=2))
# 模拟 LLM 输出的 function_call
print("\n=== 执行 function_call ===")
calls = [
{"name": "get_weather", "arguments": '{"city": "北京"}'},
{"name": "send_email", "arguments": '{"to": "test@example.com", "subject": "测试", "body": "Hello"}'},
{"name": "get_weather", "arguments": '{"city": "上海", "unit": "fahrenheit"}'},
{"name": "unknown_func", "arguments": '{}'},
]
for call in calls:
result = executor.execute(call)
print(f" {call['name']}({call['arguments']}) => {result}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 注册 | O(1) |
| 校验 | O(P),P=参数数量 |
| 执行 | 取决于具体函数 |
追问:
- OpenAI 的 function calling 和 tool_choice 参数如何控制工具调用行为?parallel function calling 是什么?
- 如何处理函数执行结果过长的情况?截断策略有哪些?
Q10: 实现一个简单的 Agent 记忆系统(短期 + 长期记忆)
难度:⭐⭐⭐
题目描述:
实现 Agent 的双层记忆系统:
- 短期记忆(Working Memory):当前对话轮次的上下文
- 长期记忆(Long-term Memory):跨会话持久化的关键信息
解题思路:
- 短期记忆:维护一个滑动窗口的对话历史
- 长期记忆:提取关键信息(事实、偏好、摘要)存入结构化存储
- 查询时融合短期上下文和相关长期记忆
完整代码:
import json
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any
@dataclass
class MemoryEntry:
content: str
timestamp: float
memory_type: str # "fact", "preference", "summary", "conversation"
metadata: dict = field(default_factory=dict)
importance: float = 0.5 # 0-1 重要性评分
class ShortTermMemory:
"""短期记忆:滑动窗口对话历史"""
def __init__(self, max_turns: int = 10):
self.max_turns = max_turns
self.messages: deque[dict] = deque(maxlen=max_turns * 2) # user + assistant
def add(self, role: str, content: str):
self.messages.append({
"role": role,
"content": content,
"timestamp": time.time(),
})
def get_context(self) -> list[dict]:
return list(self.messages)
def get_context_string(self) -> str:
lines = []
for msg in self.messages:
lines.append(f"{msg['role']}: {msg['content']}")
return "\n".join(lines)
def clear(self):
self.messages.clear()
class LongTermMemory:
"""长期记忆:结构化持久存储"""
def __init__(self):
self.entries: list[MemoryEntry] = []
def add(self, content: str, memory_type: str = "fact",
importance: float = 0.5, metadata: dict | None = None):
self.entries.append(MemoryEntry(
content=content,
timestamp=time.time(),
memory_type=memory_type,
metadata=metadata or {},
importance=importance,
))
def search(self, query: str, top_k: int = 5, memory_type: str | None = None) -> list[MemoryEntry]:
"""简易搜索:按关键词匹配 + 重要性排序"""
results = []
query_lower = query.lower()
for entry in self.entries:
if memory_type and entry.memory_type != memory_type:
continue
# 简单关键词匹配分数
content_lower = entry.content.lower()
match_score = sum(1 for word in query_lower.split() if word in content_lower)
combined_score = match_score * 0.6 + entry.importance * 0.4
if match_score > 0:
results.append((combined_score, entry))
results.sort(key=lambda x: x[0], reverse=True)
return [entry for _, entry in results[:top_k]]
def get_all(self, memory_type: str | None = None) -> list[MemoryEntry]:
if memory_type:
return [e for e in self.entries if e.memory_type == memory_type]
return list(self.entries)
class MemoryExtractor:
"""记忆提取器:从对话中提取关键信息存入长期记忆"""
def extract(self, user_msg: str, assistant_msg: str) -> list[dict]:
"""模拟提取(实际用 LLM 做信息抽取)"""
extracted = []
# 模拟:检测偏好
preference_keywords = ["喜欢", "不喜欢", "偏好", "习惯", "最爱"]
for kw in preference_keywords:
if kw in user_msg:
extracted.append({
"content": f"用户表达偏好:{user_msg[:50]}",
"type": "preference",
"importance": 0.7,
})
break
# 模拟:检测事实
fact_keywords = ["我是", "我在", "我的", "叫做", "名字"]
for kw in fact_keywords:
if kw in user_msg:
extracted.append({
"content": f"用户事实:{user_msg[:80]}",
"type": "fact",
"importance": 0.8,
})
break
return extracted
class AgentMemorySystem:
"""Agent 记忆系统"""
def __init__(self, max_short_term_turns: int = 10):
self.short_term = ShortTermMemory(max_turns=max_short_term_turns)
self.long_term = LongTermMemory()
self.extractor = MemoryExtractor()
def on_user_message(self, content: str):
"""处理用户消息"""
self.short_term.add("user", content)
def on_assistant_message(self, content: str, user_msg: str = ""):
"""处理助手消息,同时提取长期记忆"""
self.short_term.add("assistant", content)
if user_msg:
memories = self.extractor.extract(user_msg, content)
for mem in memories:
self.long_term.add(
content=mem["content"],
memory_type=mem["type"],
importance=mem["importance"],
)
def build_context(self, query: str = "", long_term_top_k: int = 3) -> str:
"""构建融合短期和长期记忆的上下文"""
parts = []
# 长期记忆
if query:
relevant = self.long_term.search(query, top_k=long_term_top_k)
if relevant:
parts.append("=== 相关历史记忆 ===")
for entry in relevant:
parts.append(f"[{entry.memory_type}] {entry.content}")
parts.append("")
# 短期记忆(当前对话)
context = self.short_term.get_context_string()
if context:
parts.append("=== 当前对话 ===")
parts.append(context)
return "\n".join(parts)
def summarize_and_consolidate(self):
"""将短期记忆摘要后存入长期记忆"""
context = self.short_term.get_context_string()
if context.strip():
self.long_term.add(
content=f"对话摘要:{context[:200]}",
memory_type="summary",
importance=0.6,
)
self.short_term.clear()
# ---- 测试 ----
if __name__ == "__main__":
mem = AgentMemorySystem(max_short_term_turns=5)
# 模拟多轮对话
conversations = [
("你好,我叫张明,在字节跳动工作。", "你好张明!很高兴认识你。"),
("我喜欢用 Python 编程,不太喜欢 Java。", "了解!Python 确实很流行。"),
("你能推荐一些学习资源吗?", "当然,推荐你看看 LangChain 文档。"),
("我在做一个 RAG 项目。", "RAG 很有意思,有什么需要帮忙的吗?"),
("我想了解一下向量数据库。", "向量数据库是 RAG 的核心组件..."),
]
for user_msg, asst_msg in conversations:
mem.on_user_message(user_msg)
mem.on_assistant_message(asst_msg, user_msg=user_msg)
# 查看长期记忆
print("=== 长期记忆 ===")
for entry in mem.long_term.get_all():
print(f" [{entry.memory_type}] (重要性:{entry.importance}) {entry.content}")
# 搜索相关记忆
print("\n=== 搜索「编程偏好」 ===")
results = mem.long_term.search("编程 Python 偏好")
for r in results:
print(f" [{r.memory_type}] {r.content}")
# 构建上下文
print("\n=== 融合上下文 ===")
ctx = mem.build_context(query="用户偏好")
print(ctx)复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 添加(短期) | O(1) |
| 添加(长期) | O(1) |
| 搜索(长期) | O(N × Q),N=记忆条数,Q=查询词数 |
| 上下文构建 | O(top_k + W),W=短期窗口大小 |
追问:
- 实际生产中,长期记忆通常存储在向量数据库中,检索时用语义相似度而非关键词匹配。如何实现?
- MemGPT(现名 Letta)的虚拟内存机制是如何工作的?与上述方案有何区别?
模型推理相关
Q11: 实现 KV Cache 的简化版
难度:⭐⭐⭐
题目描述:
实现 Transformer 自回归解码中 KV Cache 的简化版本。演示缓存如何避免重复计算,加速逐 token 生成。
解题思路:
- 自回归生成时,每生成一个新 token 都需要计算 attention
- 不用 cache 时,每次需重新计算所有 token 的 K、V
- 用 cache 时,将之前的 K、V 缓存起来,只计算新 token 的 Q、K、V,再拼接
完整代码:
import numpy as np
class SimpleAttention:
"""简化版自注意力(单头)"""
def __init__(self, d_model: int):
self.d_model = d_model
np.random.seed(42)
self.W_q = np.random.randn(d_model, d_model) * 0.01
self.W_k = np.random.randn(d_model, d_model) * 0.01
self.W_v = np.random.randn(d_model, d_model) * 0.01
def forward(self, x: np.ndarray) -> np.ndarray:
"""
标准自注意力(无缓存)
Args:
x: (seq_len, d_model)
Returns:
output: (seq_len, d_model)
"""
Q = x @ self.W_q
K = x @ self.W_k
V = x @ self.W_v
scores = Q @ K.T / np.sqrt(self.d_model)
# 因果掩码
seq_len = x.shape[0]
mask = np.triu(np.ones((seq_len, seq_len)), k=1) * -1e9
scores = scores + mask
weights = self._softmax(scores)
return weights @ V
def forward_with_cache(self, x_new: np.ndarray,
kv_cache: dict | None = None) -> tuple[np.ndarray, dict]:
"""
带 KV Cache 的自注意力
Args:
x_new: (1, d_model) 新 token 的嵌入
kv_cache: {"k": (cached_len, d_model), "v": (cached_len, d_model)}
Returns:
output: (1, d_model)
updated_cache
"""
q_new = x_new @ self.W_q # (1, d_model)
k_new = x_new @ self.W_k # (1, d_model)
v_new = x_new @ self.W_v # (1, d_model)
if kv_cache is not None and kv_cache.get("k") is not None:
# 拼接缓存
K = np.concatenate([kv_cache["k"], k_new], axis=0) # (cached+1, d_model)
V = np.concatenate([kv_cache["v"], v_new], axis=0)
else:
K = k_new
V = v_new
# Q 只有当前 token,K/V 包含所有历史
scores = q_new @ K.T / np.sqrt(self.d_model) # (1, cached+1)
weights = self._softmax(scores) # (1, cached+1)
output = weights @ V # (1, d_model)
new_cache = {"k": K, "v": V}
return output, new_cache
@staticmethod
def _softmax(x: np.ndarray) -> np.ndarray:
e = np.exp(x - np.max(x, axis=-1, keepdims=True))
return e / e.sum(axis=-1, keepdims=True)
# ---- 测试 ----
if __name__ == "__main__":
d_model = 32
seq_len = 10
attn = SimpleAttention(d_model)
# 模拟输入序列
x = np.random.randn(seq_len, d_model).astype(np.float32)
# 方式1:无缓存,一次性计算
output_full = attn.forward(x)
print(f"无缓存输出 shape: {output_full.shape}")
print(f"最后一个 token 的输出(前5维): {output_full[-1, :5]}")
# 方式2:有缓存,逐 token 计算
kv_cache = {"k": None, "v": None}
for i in range(seq_len):
out, kv_cache = attn.forward_with_cache(x[i:i+1, :], kv_cache)
print(f"\n有缓存最终输出(前5维): {out[0, :5]}")
print(f"KV Cache 形状 - K: {kv_cache['k'].shape}, V: {kv_cache['v'].shape}")
# 验证一致性
diff = np.abs(output_full[-1, :] - out[0, :])
print(f"\n两种方式的输出差异(最大): {diff.max():.8f}")
print(f"输出一致: {diff.max() < 1e-5}")
# 性能对比示意
print("\n=== 计算量对比 ===")
for s in [100, 500, 1000]:
ops_no_cache = s * s * d_model
ops_cache = s * 1 * d_model
print(f"序列长度 {s}: 无缓存={ops_no_cache:,} ops, 有缓存={ops_cache:,} ops, "
f"加速比={ops_no_cache/ops_cache:.0f}x")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 无缓存(生成 N tokens) | O(N² × D) |
| 有缓存(生成 N tokens) | O(N × D)(每次只需 1×D 的注意力) |
| 缓存空间 | O(N × D × L × H),L=层数,H=头数 |
追问:
- 在多头注意力和多层 Transformer 中,KV Cache 的总内存占用如何计算?对于 7B 模型、序列长度 4096,大约需要多少显存?
- MQA(Multi-Query Attention)和 GQA(Grouped-Query Attention)如何减少 KV Cache 的大小?
Q12: 实现贪心解码和采样解码
难度:⭐⭐
题目描述:
实现两种常见的文本解码策略:
- 贪心解码(Greedy Decoding):每步选概率最高的 token
- 采样解码(Sampling):按概率分布随机采样,支持 temperature 和 top-k/top-p
解题思路:
- 贪心解码直接 argmax
- 采样解码通过 temperature 调整分布平坦度
- Top-k 保留概率最高的 k 个 token
- Top-p(Nucleus Sampling)保留累积概率达到 p 的最小 token 集合
完整代码:
import numpy as np
class TextDecoder:
"""文本解码器"""
def __init__(self, vocab: list[str]):
self.vocab = vocab
self.vocab_size = len(vocab)
def _softmax(self, logits: np.ndarray) -> np.ndarray:
e = np.exp(logits - np.max(logits))
return e / e.sum()
def _apply_temperature(self, logits: np.ndarray, temperature: float) -> np.ndarray:
if temperature <= 0:
temperature = 1e-8
return logits / temperature
def _apply_top_k(self, probs: np.ndarray, k: int) -> np.ndarray:
top_k_indices = np.argsort(probs)[-k:]
mask = np.zeros_like(probs)
mask[top_k_indices] = 1.0
probs = probs * mask
return probs / probs.sum()
def _apply_top_p(self, probs: np.ndarray, p: float) -> np.ndarray:
sorted_indices = np.argsort(probs)[::-1]
sorted_probs = probs[sorted_indices]
cumsum = np.cumsum(sorted_probs)
# 找到累积概率超过 p 的位置
cutoff = np.searchsorted(cumsum, p) + 1
mask = np.zeros_like(probs)
mask[sorted_indices[:cutoff]] = 1.0
probs = probs * mask
return probs / probs.sum()
def greedy_decode(self, logits: np.ndarray) -> int:
"""贪心解码:选概率最高的"""
return int(np.argmax(logits))
def sample_decode(self, logits: np.ndarray, temperature: float = 1.0,
top_k: int = 0, top_p: float = 1.0) -> int:
"""采样解码"""
logits = self._apply_temperature(logits, temperature)
probs = self._softmax(logits)
if top_k > 0:
probs = self._apply_top_k(probs, top_k)
if top_p < 1.0:
probs = self._apply_top_p(probs, top_p)
return int(np.random.choice(len(probs), p=probs))
def generate(self, get_logits_fn, max_tokens: int = 20,
strategy: str = "greedy", temperature: float = 1.0,
top_k: int = 0, top_p: float = 1.0,
eos_token_id: int | None = None) -> list[int]:
"""
生成序列
Args:
get_logits_fn: 接受 token_ids 列表,返回 (vocab_size,) 的 logits
"""
token_ids = []
for _ in range(max_tokens):
logits = get_logits_fn(token_ids)
if strategy == "greedy":
next_id = self.greedy_decode(logits)
else:
next_id = self.sample_decode(logits, temperature, top_k, top_p)
token_ids.append(next_id)
if eos_token_id is not None and next_id == eos_token_id:
break
return token_ids
# ---- 测试 ----
if __name__ == "__main__":
vocab = ["<pad>", "<eos>", "我", "喜欢", "学习", "机器", "学习", "深度", "自然", "语言", "处理", "模型", "AI", "很好", "今天", "天气", "不错"]
decoder = TextDecoder(vocab)
# 模拟 logits 生成函数(带简单的上下文依赖)
np.random.seed(42)
def mock_get_logits(token_ids: list[int]) -> np.ndarray:
logits = np.random.randn(len(vocab))
# 让某些 token 更可能在特定位置出现
if len(token_ids) == 0:
logits[2] = 5.0 # "我"
elif len(token_ids) == 1:
logits[3] = 4.0 # "喜欢"
elif len(token_ids) == 2:
logits[5] = 4.5 # "机器"
logits[7] = 4.0 # "深度"
return logits
# 贪心解码
print("=== 贪心解码 ===")
ids = decoder.generate(mock_get_logits, max_tokens=5, strategy="greedy")
print(f"Token IDs: {ids}")
print(f"Tokens: {[vocab[i] for i in ids]}")
# 采样解码(不同温度)
for temp in [0.1, 0.7, 1.0, 1.5]:
np.random.seed(42)
ids = decoder.generate(mock_get_logits, max_tokens=5, strategy="sample", temperature=temp)
print(f"温度 {temp}: {[vocab[i] for i in ids]}")
# Top-k + Top-p
print("\n=== Top-k + Top-p 采样 ===")
np.random.seed(42)
ids = decoder.generate(mock_get_logits, max_tokens=5, strategy="sample",
temperature=0.8, top_k=3, top_p=0.9)
print(f"top-k=3, top-p=0.9: {[vocab[i] for i in ids]}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 贪心解码 | O(V),V=词表大小 |
| 采样解码 | O(V log V)(排序用于 top-k/p) |
| 生成 N tokens | O(N × V) 或 O(N × V log V) |
追问:
- Beam Search 与贪心解码的区别是什么?Beam Search 的 beam_size 对生成质量有什么影响?
- Speculative Decoding(投机解码)的原理是什么?如何保证输出分布与原始模型完全一致?
Q13: 实现简单的 Token 计数器
难度:⭐
题目描述:
实现一个 Token 计数器,支持:按规则估算中英文 token 数、按模型区分计算方式、计算 API 调用费用。
解题思路:
- 不同模型的 token 化方式不同(如 GPT 系列约 1 token ≈ 4 英文字符或 1-2 个中文字)
- 提供基于规则的快速估算和基于实际 tokenizer 的精确计算
- 根据模型的定价计算费用
完整代码:
from dataclasses import dataclass
@dataclass
class ModelPricing:
name: str
input_price_per_1k: float # 每 1000 tokens 的输入价格(美元)
output_price_per_1k: float # 每 1000 tokens 的输出价格
chars_per_token_en: float # 英文每 token 约多少字符
chars_per_token_zh: float # 中文每 token 约多少字符
# 常见模型定价(2024 年参考价)
MODEL_PRICING = {
"gpt-4o": ModelPricing("gpt-4o", 0.0025, 0.01, 4.0, 1.5),
"gpt-4o-mini": ModelPricing("gpt-4o-mini", 0.00015, 0.0006, 4.0, 1.5),
"gpt-3.5-turbo": ModelPricing("gpt-3.5-turbo", 0.0005, 0.0015, 4.0, 1.5),
"claude-3-opus": ModelPricing("claude-3-opus", 0.015, 0.075, 3.5, 1.3),
"claude-3-sonnet": ModelPricing("claude-3-sonnet", 0.003, 0.015, 3.5, 1.3),
}
class TokenCounter:
"""Token 计数器"""
def __init__(self, model: str = "gpt-4o"):
self.model = model
self.pricing = MODEL_PRICING.get(model)
if not self.pricing:
raise ValueError(f"未知模型: {model}")
def _count_chars(self, text: str) -> tuple[int, int]:
"""统计中英文字符数"""
en_chars = 0
zh_chars = 0
for ch in text:
if '\u4e00' <= ch <= '\u9fff' or '\u3000' <= ch <= '\u303f':
zh_chars += 1
else:
en_chars += 1
return en_chars, zh_chars
def estimate_tokens(self, text: str) -> int:
"""估算 token 数"""
en_chars, zh_chars = self._count_chars(text)
en_tokens = en_chars / self.pricing.chars_per_token_en
zh_tokens = zh_chars / self.pricing.chars_per_token_zh
return int(en_tokens + zh_tokens) + 3 # +3 for special tokens
def count_messages(self, messages: list[dict]) -> int:
"""计算消息列表的 token 数"""
total = 0
for msg in messages:
total += 4 # 消息格式开销
total += self.estimate_tokens(msg.get("role", ""))
total += self.estimate_tokens(msg.get("content", ""))
total += 2 # reply priming
return total
def estimate_cost(self, input_tokens: int, output_tokens: int) -> dict:
"""估算 API 调用费用"""
input_cost = (input_tokens / 1000) * self.pricing.input_price_per_1k
output_cost = (output_tokens / 1000) * self.pricing.output_price_per_1k
return {
"model": self.model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"input_cost_usd": round(input_cost, 6),
"output_cost_usd": round(output_cost, 6),
"total_cost_usd": round(input_cost + output_cost, 6),
}
def analyze(self, messages: list[dict], estimated_output_tokens: int = 500) -> dict:
"""完整分析"""
input_tokens = self.count_messages(messages)
cost = self.estimate_cost(input_tokens, estimated_output_tokens)
return {
"input_tokens": input_tokens,
"estimated_output_tokens": estimated_output_tokens,
"cost": cost,
"model_pricing": {
"input_per_1k": self.pricing.input_price_per_1k,
"output_per_1k": self.pricing.output_price_per_1k,
}
}
# ---- 测试 ----
if __name__ == "__main__":
# 测试不同文本
texts = [
"Hello, how are you today?",
"你好,今天天气怎么样?",
"这是一段混合 text,包含 English 和中文 characters。",
"请帮我分析一下这段代码的功能:def hello(): print('world')",
]
for model in ["gpt-4o", "gpt-4o-mini", "claude-3-sonnet"]:
counter = TokenCounter(model=model)
print(f"\n=== {model} ===")
for text in texts:
tokens = counter.estimate_tokens(text)
print(f" [{tokens} tokens] {text[:40]}...")
# 消息分析
print("\n=== API 调用费用分析 ===")
messages = [
{"role": "system", "content": "你是一个专业的AI助手,擅长编程和算法。"},
{"role": "user", "content": "请解释一下 Transformer 的注意力机制是如何工作的?"},
{"role": "assistant", "content": "Transformer 的注意力机制是通过计算 Query、Key、Value 三个矩阵来实现的..."},
{"role": "user", "content": "那多头注意力和单头注意力有什么区别呢?"},
]
for model in ["gpt-4o", "gpt-4o-mini", "claude-3-sonnet"]:
counter = TokenCounter(model=model)
result = counter.analyze(messages, estimated_output_tokens=800)
cost = result["cost"]
print(f" {model}: 输入 {cost['input_tokens']} tokens, "
f"输出 {cost['estimated_output_tokens']} tokens, "
f"费用 ${cost['total_cost_usd']:.4f}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| token 估算 | O(N),N=文本字符数 |
| 消息统计 | O(N × M),M=消息数 |
| 空间 | O(1) |
追问:
- 如何精确计算 token 数?tiktoken 和 HuggingFace Tokenizers 库的使用方法是什么?
- 在 Agent 系统中,如何设计 token 预算管理?当上下文即将超出限制时,有哪些截断策略?
系统设计相关
Q14: 实现一个限流器(Token Bucket)
难度:⭐⭐
题目描述:
实现 Token Bucket(令牌桶)限流器,支持配置桶容量和令牌添加速率,用于保护 LLM API 调用。
解题思路:
- 桶有固定容量,以固定速率添加令牌
- 每次请求消耗一个令牌,无令牌时拒绝
- 支持多维限流(如每分钟请求数 + 每分钟 token 数)
完整代码:
import time
import threading
from dataclasses import dataclass
class TokenBucket:
"""令牌桶限流器"""
def __init__(self, capacity: float, refill_rate: float):
"""
Args:
capacity: 桶容量(最大令牌数)
refill_rate: 每秒补充的令牌数
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.tokens = capacity
self.last_refill = time.monotonic()
self._lock = threading.Lock()
def _refill(self):
"""补充令牌"""
now = time.monotonic()
elapsed = now - self.last_refill
new_tokens = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_refill = now
def acquire(self, tokens: int = 1) -> bool:
"""尝试获取令牌,成功返回 True"""
with self._lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_and_acquire(self, tokens: int = 1, timeout: float = 10.0) -> bool:
"""等待直到获取到令牌或超时"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if self.acquire(tokens):
return True
# 计算需要等待的时间
with self._lock:
self._refill()
if self.tokens >= tokens:
return True
wait_time = (tokens - self.tokens) / self.refill_rate
time.sleep(min(wait_time, 0.1))
return False
@property
def available_tokens(self) -> float:
with self._lock:
self._refill()
return self.tokens
class MultiDimensionRateLimiter:
"""多维度限流器(如 RPM + TPM)"""
def __init__(self):
self.limiters: dict[str, TokenBucket] = {}
def add_limiter(self, name: str, capacity: float, refill_rate: float):
self.limiters[name] = TokenBucket(capacity, refill_rate)
def acquire(self, **kwargs) -> tuple[bool, str]:
"""
尝试获取所有维度的令牌
kwargs: limiter_name=tokens_needed
"""
# 先检查所有
for name, tokens_needed in kwargs.items():
if name not in self.limiters:
return False, f"未知限流器: {name}"
if self.limiters[name].tokens < tokens_needed:
return False, f"限流: {name} 不足"
# 全部满足才扣减
for name, tokens_needed in kwargs.items():
self.limiters[name].acquire(tokens_needed)
return True, "OK"
def status(self) -> dict:
return {
name: round(lb.available_tokens, 2)
for name, lb in self.limiters.items()
}
# ---- 测试 ----
if __name__ == "__main__":
# 单维限流
print("=== 单维令牌桶限流 ===")
bucket = TokenBucket(capacity=5, refill_rate=2) # 容量5,每秒补充2个
for i in range(8):
ok = bucket.acquire()
print(f" 请求 {i+1}: {'✅ 通过' if ok else '❌ 拒绝'} "
f"(剩余: {bucket.available_tokens:.1f})")
# 等待恢复
print("\n等待 1 秒...")
time.sleep(1)
print(f"补充后剩余: {bucket.available_tokens:.1f}")
# 多维限流(模拟 API 限流)
print("\n=== 多维限流(RPM + TPM)===")
limiter = MultiDimensionRateLimiter()
limiter.add_limiter("rpm", capacity=10, refill_rate=10/60) # 10 次/分钟
limiter.add_limiter("tpm", capacity=1000, refill_rate=1000/60) # 1000 tokens/分钟
requests = [
{"rpm": 1, "tpm": 150},
{"rpm": 1, "tpm": 300},
{"rpm": 1, "tpm": 200},
{"rpm": 1, "tpm": 400},
]
for i, req in enumerate(requests):
ok, msg = limiter.acquire(**req)
status = limiter.status()
print(f" 请求 {i+1} {req}: {'✅' if ok else '❌'} {msg} | 余量: {status}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| acquire | O(1)(单维度) |
| 多维 acquire | O(D),D=维度数 |
| 空间 | O(D) |
追问:
- 令牌桶(Token Bucket)和漏桶(Leaky Bucket)、滑动窗口(Sliding Window)的区别是什么?各自适合什么场景?
- 在分布式系统中,如何实现跨节点的限流?Redis + Lua 脚本的方案如何工作?
Q15: 实现一个简单的 Prompt 模板引擎
难度:⭐
题目描述:
实现一个 Prompt 模板引擎,支持变量替换、条件渲染、循环渲染、模板嵌套等常见功能。
解题思路:
- 用
{{variable}}语法做变量替换 - 用
{% if condition %}...{% endif %}做条件渲染 - 用
{% for item in items %}...{% endfor %}做循环 - 用
{% include template_name %}做模板嵌套
完整代码:
import re
from typing import Any
class PromptTemplate:
"""Prompt 模板引擎"""
def __init__(self, template: str, name: str = ""):
self.template = template
self.name = name
def render(self, **kwargs) -> str:
"""渲染模板"""
result = self.template
# 处理 for 循环
result = self._render_for(result, kwargs)
# 处理 if 条件
result = self._render_if(result, kwargs)
# 处理变量替换
result = self._render_vars(result, kwargs)
return result.strip()
def _render_vars(self, template: str, context: dict) -> str:
"""{{variable}} 变量替换,支持 {{var|default_value}}"""
def replace_var(match):
expr = match.group(1).strip()
if "|" in expr:
var_name, default = expr.split("|", 1)
var_name = var_name.strip()
default = default.strip()
else:
var_name = expr
default = match.group(0) # 未找到则保留原样
# 支持嵌套属性 a.b
value = context
for key in var_name.split("."):
if isinstance(value, dict):
value = value.get(key, None)
else:
value = getattr(value, key, None)
if value is None:
return default
return str(value)
return re.sub(r'\{\{(.+?)\}\}', replace_var, template)
def _render_for(self, template: str, context: dict) -> str:
"""{% for item in items %}...{% endfor %} 循环渲染"""
pattern = r'\{%\s*for\s+(\w+)\s+in\s+(\w+)\s*%\}(.*?)\{%\s*endfor\s*%\}'
def replace_for(match):
item_var = match.group(1)
list_var = match.group(2)
body = match.group(3)
items = context.get(list_var, [])
parts = []
for i, item in enumerate(items):
local_ctx = {**context, item_var: item, "loop_index": i}
rendered = self._render_vars(body, local_ctx)
parts.append(rendered)
return "".join(parts)
return re.sub(pattern, replace_for, template, flags=re.DOTALL)
def _render_if(self, template: str, context: dict) -> str:
"""{% if condition %}...{% else %}...{% endif %} 条件渲染"""
# 处理 if-else
pattern = r'\{%\s*if\s+(\w+)\s*%\}(.*?)\{%\s*else\s*%\}(.*?)\{%\s*endif\s*%\}'
def replace_if_else(match):
cond_var = match.group(1)
true_body = match.group(2)
false_body = match.group(3)
value = context.get(cond_var)
return true_body if value else false_body
result = re.sub(pattern, replace_if_else, template, flags=re.DOTALL)
# 处理只有 if
pattern2 = r'\{%\s*if\s+(\w+)\s*%\}(.*?)\{%\s*endif\s*%\}'
def replace_if(match):
cond_var = match.group(1)
body = match.group(2)
value = context.get(cond_var)
return body if value else ""
return re.sub(pattern2, replace_if, result, flags=re.DOTALL)
class PromptManager:
"""Prompt 管理器:管理多个模板"""
def __init__(self):
self.templates: dict[str, PromptTemplate] = {}
def register(self, name: str, template: str):
self.templates[name] = PromptTemplate(template, name=name)
def render(self, name: str, **kwargs) -> str:
if name not in self.templates:
raise ValueError(f"模板不存在: {name}")
return self.templates[name].render(**kwargs)
# ---- 测试 ----
if __name__ == "__main__":
manager = PromptManager()
# 1. 基础变量替换
manager.register("basic", "你好,{{name}}!你来自 {{city|未知城市}}。")
print("=== 基础替换 ===")
print(manager.render("basic", name="张明"))
print(manager.render("basic", name="李华", city="上海"))
# 2. 条件渲染
manager.register("cond", """你是一个{{role|助手}}。
{% if has_tools %}
你可以使用以下工具:{{tools}}
{% endif %}
{% if verbose %}
请详细回答用户的问题。
{% else %}
请简洁回答。
{% endif %}""")
print("\n=== 条件渲染 ===")
print(manager.render("cond", role="编程助手", has_tools=True,
tools="search, calculator", verbose=False))
# 3. 循环渲染
manager.register("rag", """请根据以下参考资料回答问题。
{% for doc in documents %}
[文档{{loop_index}}] {{doc}}
{% endfor %}
问题:{{question}}""")
print("\n=== RAG 模板 ===")
docs = ["RAG 是检索增强生成技术", "向量数据库是 RAG 的核心组件", "Embedding 模型将文本映射到向量空间"]
print(manager.render("rag", documents=docs, question="什么是 RAG?"))
# 4. 嵌套对象
manager.register("agent", """Agent 配置:
名称: {{agent.name}}
模型: {{agent.model}}
温度: {{agent.temperature}}
用户输入: {{input}}""")
print("\n=== 嵌套对象 ===")
agent_config = {"name": "MyAgent", "model": "gpt-4o", "temperature": 0.7}
print(manager.render("agent", agent=agent_config, input="帮我查天气"))复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 变量替换 | O(N × V),N=模板长度,V=变量数 |
| 循环渲染 | O(N × I),I=迭代次数 |
| 空间 | O(渲染结果大小) |
追问:
- Jinja2 模板引擎相比自实现的模板引擎有哪些高级功能?在 Prompt 工程中哪些特性最实用?
- 如何实现 Prompt 版本管理?如何做 A/B 测试来比较不同 Prompt 的效果?
Transformer 与模型推理
Q12: 实现一个简单的 Transformer Attention 层(Self-Attention + Multi-Head)
难度:⭐⭐
题目描述:
实现一个简化版的 Transformer Self-Attention 层,支持:
- 单头 Self-Attention(Scaled Dot-Product Attention)
- Multi-Head Attention
解题思路:
- Scaled Dot-Product Attention:
Attention(Q, K, V) = softmax(QK^T / sqrt(d_k)) * V - Multi-Head:将 Q/K/V 线性投影到多个子空间,分别计算 Attention 后拼接
- 使用矩阵乘法高效计算
完整代码:
import numpy as np
def softmax(x: np.ndarray, axis: int = -1) -> np.ndarray:
"""数值稳定的 softmax"""
e_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
return e_x / np.sum(e_x, axis=axis, keepdims=True)
class ScaledDotProductAttention:
"""单头 Scaled Dot-Product Attention"""
def __init__(self, d_k: int):
self.d_k = d_k
self.scale = np.sqrt(d_k)
def forward(self, Q: np.ndarray, K: np.ndarray, V: np.ndarray,
mask: np.ndarray | None = None) -> np.ndarray:
"""
Args:
Q: (batch, seq_q, d_k)
K: (batch, seq_k, d_k)
V: (batch, seq_k, d_v)
mask: (batch, seq_q, seq_k), 0 表示需要屏蔽的位置
Returns:
output: (batch, seq_q, d_v)
attention_weights: (batch, seq_q, seq_k)
"""
# QK^T / sqrt(d_k)
scores = np.matmul(Q, K.transpose(0, 2, 1)) / self.scale # (batch, seq_q, seq_k)
# 应用 mask
if mask is not None:
scores = np.where(mask == 0, -1e9, scores)
# softmax
attn_weights = softmax(scores, axis=-1) # (batch, seq_q, seq_k)
# 加权求和
output = np.matmul(attn_weights, V) # (batch, seq_q, d_v)
return output, attn_weights
class MultiHeadAttention:
"""Multi-Head Attention"""
def __init__(self, d_model: int, num_heads: int):
assert d_model % num_heads == 0, "d_model 必须能被 num_heads 整除"
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
# 随机初始化权重矩阵
scale = np.sqrt(2.0 / d_model)
self.W_Q = np.random.randn(d_model, d_model) * scale
self.W_K = np.random.randn(d_model, d_model) * scale
self.W_V = np.random.randn(d_model, d_model) * scale
self.W_O = np.random.randn(d_model, d_model) * scale
self.attention = ScaledDotProductAttention(self.d_k)
def _split_heads(self, x: np.ndarray) -> np.ndarray:
"""将最后一维拆分为多个头: (batch, seq, d_model) -> (batch, num_heads, seq, d_k)"""
batch, seq, _ = x.shape
x = x.reshape(batch, seq, self.num_heads, self.d_k)
return x.transpose(0, 2, 1, 3)
def _merge_heads(self, x: np.ndarray) -> np.ndarray:
"""合并多头: (batch, num_heads, seq, d_k) -> (batch, seq, d_model)"""
batch, _, seq, _ = x.shape
x = x.transpose(0, 2, 1, 3)
return x.reshape(batch, seq, self.d_model)
def forward(self, Q: np.ndarray, K: np.ndarray, V: np.ndarray,
mask: np.ndarray | None = None) -> np.ndarray:
"""
Args:
Q, K, V: (batch, seq, d_model)
mask: (batch, seq_q, seq_k)
Returns:
output: (batch, seq, d_model)
"""
# 线性投影
Q_proj = np.matmul(Q, self.W_Q) # (batch, seq, d_model)
K_proj = np.matmul(K, self.W_K)
V_proj = np.matmul(V, self.W_V)
# 拆分多头
Q_heads = self._split_heads(Q_proj) # (batch, num_heads, seq, d_k)
K_heads = self._split_heads(K_proj)
V_heads = self._split_heads(V_proj)
# 每个头独立计算 attention
batch = Q.shape[0]
outputs = []
for h in range(self.num_heads):
out, _ = self.attention.forward(
Q_heads[:, h], K_heads[:, h], V_heads[:, h], mask
)
outputs.append(out)
# 拼接所有头
concat = np.stack(outputs, axis=1) # (batch, num_heads, seq, d_k)
merged = self._merge_heads(concat) # (batch, seq, d_model)
# 输出投影
output = np.matmul(merged, self.W_O)
return output
# ---- 测试 ----
if __name__ == "__main__":
np.random.seed(42)
batch_size, seq_len, d_model, num_heads = 2, 10, 64, 8
mha = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
# 随机输入
x = np.random.randn(batch_size, seq_len, d_model)
output = mha.forward(x, x, x) # Self-Attention: Q=K=V=x
print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
assert output.shape == x.shape, "输出形状应与输入一致"
print("Multi-Head Attention 测试通过 ✓")
# 带 mask 的 Attention
mask = np.ones((batch_size, seq_len, seq_len))
mask[:, :, 5:] = 0 # 屏蔽后 5 个位置(模拟 causal mask)
output_masked = mha.forward(x, x, x, mask=mask)
print(f"带 mask 的输出形状: {output_masked.shape}")
print("带 mask 的 Multi-Head Attention 测试通过 ✓")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 时间 | O(seq² × d_model + seq × d_model²),seq² 来自 QK^T |
| 空间 | O(seq² × num_heads) 存储注意力权重 |
| 参数量 | 4 × d_model² (W_Q, W_K, W_V, W_O) |
追问:
- 为什么需要除以
sqrt(d_k)?不缩放会导致什么问题? - FlashAttention 是如何优化注意力计算的显存和速度的?
- 在实际 Transformer 中,Q、K、V 的投影是独立的还是共享的?为什么?
Q13: 实现一个简化版的 PagedAttention ⭐⭐⭐
题目描述:
PagedAttention 是 vLLM 的核心技术,通过分页管理 KV Cache 提升 GPU 显存利用率。实现一个 CPU 版的简化 PagedAttention:
- 将 KV Cache 划分为固定大小的 Block
- 通过 Block Table 管理逻辑块到物理块的映射
- 支持动态分配和释放 Block
解题思路:
- 将 KV Cache 划分为固定大小的 Block(如每块存 16 个 token 的 K/V)
- 用 Block Table 记录每个请求的逻辑块 → 物理块映射
- 分配时从空闲池取块,释放时归还
完整代码:
import numpy as np
from dataclasses import dataclass, field
@dataclass
class KVBlock:
"""一个 KV Cache Block"""
block_id: int
block_size: int # 每块存储的 token 数
num_heads: int
head_dim: int
# K: (num_heads, block_size, head_dim), V 同理
k_cache: np.ndarray = field(default=None)
v_cache: np.ndarray = field(default=None)
num_filled: int = 0 # 当前已填充的 token 数
def __post_init__(self):
if self.k_cache is None:
self.k_cache = np.zeros(
(self.num_heads, self.block_size, self.head_dim), dtype=np.float32
)
if self.v_cache is None:
self.v_cache = np.zeros(
(self.num_heads, self.block_size, self.head_dim), dtype=np.float32
)
def is_full(self) -> bool:
return self.num_filled >= self.block_size
def available_slots(self) -> int:
return self.block_size - self.num_filled
def write(self, k: np.ndarray, v: np.ndarray):
"""
写入 KV
k, v: (num_heads, seq_len, head_dim)
"""
n = k.shape[1]
end = min(self.num_filled + n, self.block_size)
actual = end - self.num_filled
self.k_cache[:, self.num_filled:end, :] = k[:, :actual, :]
self.v_cache[:, self.num_filled:end, :] = v[:, :actual, :]
self.num_filled = end
def read(self) -> tuple[np.ndarray, np.ndarray]:
"""读取已填充部分的 KV"""
return (self.k_cache[:, :self.num_filled, :],
self.v_cache[:, :self.num_filled, :])
class PagedKVCache:
"""简化版 PagedAttention KV Cache 管理器"""
def __init__(self, num_blocks: int, block_size: int,
num_heads: int, head_dim: int):
self.block_size = block_size
self.num_heads = num_heads
self.head_dim = head_dim
# 物理块池
self.blocks: dict[int, KVBlock] = {}
for i in range(num_blocks):
self.blocks[i] = KVBlock(
block_id=i, block_size=block_size,
num_heads=num_heads, head_dim=head_dim
)
self.free_blocks: list[int] = list(range(num_blocks))
# 每个请求的 Block Table: request_id -> [物理块 ID 列表]
self.block_tables: dict[str, list[int]] = {}
def allocate(self, request_id: str, num_tokens: int) -> list[int]:
"""为请求分配所需数量的块"""
num_blocks_needed = (num_tokens + self.block_size - 1) // self.block_size
if len(self.free_blocks) < num_blocks_needed:
raise MemoryError(f"显存不足:需要 {num_blocks_needed} 块,"
f"剩余 {len(self.free_blocks)} 块")
allocated = []
for _ in range(num_blocks_needed):
block_id = self.free_blocks.pop(0)
allocated.append(block_id)
self.block_tables[request_id] = allocated
return allocated
def free(self, request_id: str):
"""释放请求的所有块"""
if request_id in self.block_tables:
for block_id in self.block_tables[request_id]:
self.blocks[block_id].num_filled = 0
self.blocks[block_id].k_cache[:] = 0
self.blocks[block_id].v_cache[:] = 0
self.free_blocks.append(block_id)
del self.block_tables[request_id]
def write_kv(self, request_id: str, k: np.ndarray, v: np.ndarray):
"""
写入 KV Cache
k, v: (num_heads, seq_len, head_dim)
"""
block_ids = self.block_tables.get(request_id, [])
offset = 0
remaining = k.shape[1]
for block_id in block_ids:
block = self.blocks[block_id]
if block.is_full():
continue
available = block.available_slots()
write_len = min(remaining, available)
block.write(
k[:, offset:offset + write_len, :],
v[:, offset:offset + write_len, :]
)
offset += write_len
remaining -= write_len
if remaining <= 0:
break
def read_kv(self, request_id: str) -> tuple[np.ndarray, np.ndarray]:
"""读取请求的所有 KV Cache"""
block_ids = self.block_tables.get(request_id, [])
k_parts, v_parts = [], []
for block_id in block_ids:
k, v = self.blocks[block_id].read()
k_parts.append(k)
v_parts.append(v)
return np.concatenate(k_parts, axis=1), np.concatenate(v_parts, axis=1)
def get_block_table(self, request_id: str) -> list[int]:
return self.block_tables.get(request_id, [])
@property
def free_block_count(self) -> int:
return len(self.free_blocks)
class PagedAttention:
"""使用 PagedKVCache 的 Attention 计算"""
def __init__(self, kv_cache: PagedKVCache):
self.kv_cache = kv_cache
def forward(self, request_id: str, query: np.ndarray) -> np.ndarray:
"""
query: (num_heads, 1, head_dim) - 单个 token 的查询
返回: (num_heads, 1, head_dim) - attention 输出
"""
# 从分页 KV Cache 中读取所有 K/V
K, V = self.kv_cache.read_kv(request_id)
# 标准 Scaled Dot-Product Attention
d_k = query.shape[-1]
scores = np.matmul(query, K.transpose(0, 2, 1)) / np.sqrt(d_k)
weights = self._softmax(scores, axis=-1)
output = np.matmul(weights, V)
return output
@staticmethod
def _softmax(x: np.ndarray, axis: int = -1) -> np.ndarray:
e_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
return e_x / np.sum(e_x, axis=axis, keepdims=True)
# ---- 测试 ----
if __name__ == "__main__":
np.random.seed(42)
num_blocks = 20
block_size = 16
num_heads = 4
head_dim = 32
# 初始化
kv_cache = PagedKVCache(num_blocks, block_size, num_heads, head_dim)
paged_attn = PagedAttention(kv_cache)
# 模拟 3 个请求
req1_len = 35 # 需要 3 个块
req2_len = 10 # 需要 1 个块
req3_len = 50 # 需要 4 个块
# 分配
blocks1 = kv_cache.allocate("req1", req1_len)
blocks2 = kv_cache.allocate("req2", req2_len)
blocks3 = kv_cache.allocate("req3", req3_len)
print(f"req1 分配 {len(blocks1)} 块: {blocks1}")
print(f"req2 分配 {len(blocks2)} 块: {blocks2}")
print(f"req3 分配 {len(blocks3)} 块: {blocks3}")
print(f"剩余空闲块: {kv_cache.free_block_count}")
# 写入 KV
k1 = np.random.randn(num_heads, req1_len, head_dim)
v1 = np.random.randn(num_heads, req1_len, head_dim)
kv_cache.write_kv("req1", k1, v1)
k2 = np.random.randn(num_heads, req2_len, head_dim)
v2 = np.random.randn(num_heads, req2_len, head_dim)
kv_cache.write_kv("req2", k2, v2)
# 模拟生成阶段:逐 token 计算
for step in range(5):
q = np.random.randn(num_heads, 1, head_dim)
output = paged_attn.forward("req1", q)
# 新 token 的 KV
new_k = np.random.randn(num_heads, 1, head_dim)
new_v = np.random.randn(num_heads, 1, head_dim)
# 需要新块
if all(kv_cache.blocks[bid].is_full() for bid in kv_cache.get_block_table("req1")):
kv_cache.allocate("req1", 1) # 按需分配新块
kv_cache.write_kv("req1", new_k, new_v)
print(f"\n生成后 req1 块表: {kv_cache.get_block_table('req1')}")
k_read, v_read = kv_cache.read_kv("req1")
print(f"req1 实际存储 token 数: {k_read.shape[1]}")
# 释放
kv_cache.free("req2")
print(f"\n释放 req2 后,剩余空闲块: {kv_cache.free_block_count}")
kv_cache.free("req1")
kv_cache.free("req3")
print(f"全部释放后,剩余空闲块: {kv_cache.free_block_count}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 分配 | O(num_blocks_needed),常数时间 |
| 写入 | O(num_tokens × num_heads × head_dim) |
| 读取 | O(total_tokens × num_heads × head_dim) |
| 空间 | O(num_blocks × block_size × num_heads × head_dim),无碎片浪费 |
追问:
- 真实 vLLM 中如何处理不同请求共享相同前缀(如 System Prompt)的 KV Cache?(Prefix Caching)
- PagedAttention 相比预分配连续内存的方式,显存利用率能提升多少?为什么?
- Block Size 如何选择?过大或过小各有什么问题?
上下文管理与评估
Q14: 实现一个对话上下文压缩器(滑动窗口 + 摘要压缩) ⭐⭐
题目描述:
实现一个对话上下文压缩器,支持两种策略:
- 滑动窗口:保留最近 N 轮对话
- 摘要压缩:对旧对话生成摘要,保留摘要 + 最近 N 轮
解题思路:
- 滑动窗口:直接截取最近 N 轮
- 摘要压缩:用 LLM 将旧对话压缩为摘要,拼接摘要 + 最近对话
完整代码:
from dataclasses import dataclass, field
@dataclass
class Message:
role: str # "user" / "assistant" / "system"
content: str
class SlidingWindowCompressor:
"""滑动窗口压缩器"""
def __init__(self, max_turns: int = 10):
self.max_turns = max_turns
def compress(self, messages: list[Message]) -> list[Message]:
"""保留 system prompt + 最近 max_turns 轮对话"""
system_msgs = [m for m in messages if m.role == "system"]
chat_msgs = [m for m in messages if m.role != "system"]
# 按轮次(user+assistant)计算
max_messages = self.max_turns * 2
recent = chat_msgs[-max_messages:] if len(chat_msgs) > max_messages else chat_msgs
return system_msgs + recent
class SummaryCompressor:
"""摘要压缩器"""
def __init__(self, llm, max_recent_turns: int = 5):
self.llm = llm
self.max_recent_turns = max_recent_turns
async def compress(self, messages: list[Message],
current_summary: str = "") -> tuple[list[Message], str]:
"""
压缩对话:
1. 将旧对话与已有摘要合并生成新摘要
2. 返回 [system_prompt + 摘要消息 + 最近对话]
Returns:
(压缩后的消息列表, 新的摘要文本)
"""
system_msgs = [m for m in messages if m.role == "system"]
chat_msgs = [m for m in messages if m.role != "system"]
max_recent = self.max_recent_turns * 2
if len(chat_msgs) <= max_recent:
return messages, current_summary
old_msgs = chat_msgs[:-max_recent]
recent_msgs = chat_msgs[-max_recent:]
# 生成摘要
old_text = "\n".join(f"[{m.role}] {m.content}" for m in old_msgs)
prompt = f"""请将以下对话压缩为简洁摘要,保留关键信息(用户需求、重要结论、待办事项)。
已有摘要:{current_summary or '(无)'}
旧对话:
{old_text}
请用 3-5 句话输出摘要:"""
new_summary = await self.llm.generate(prompt)
# 构建压缩后的消息列表
result = system_msgs + [
Message(role="system", content=f"[对话摘要] {new_summary}")
] + recent_msgs
return result, new_summary
class HybridCompressor:
"""混合压缩器:先滑动窗口,再按需摘要"""
def __init__(self, llm, window_size: int = 10, summary_threshold: int = 20):
self.window = SlidingWindowCompressor(max_turns=window_size)
self.summary = SummaryCompressor(llm, max_recent_turns=window_size)
self.summary_threshold = summary_threshold
self._summaries: dict[str, str] = {} # session_id -> summary
async def compress(self, session_id: str,
messages: list[Message]) -> list[Message]:
"""智能压缩:短对话用窗口,长对话用摘要"""
chat_count = sum(1 for m in messages if m.role != "system")
if chat_count <= self.summary_threshold:
# 短对话:滑动窗口
return self.window.compress(messages)
else:
# 长对话:摘要压缩
current_summary = self._summaries.get(session_id, "")
compressed, new_summary = await self.summary.compress(
messages, current_summary
)
self._summaries[session_id] = new_summary
return compressed
# ---- 测试 ----
if __name__ == "__main__":
import asyncio
class MockLLM:
async def generate(self, prompt: str) -> str:
return "[摘要] 用户讨论了天气查询、机票预订和旅行计划。已确认北京出发。"
async def test():
# 构造长对话
messages = [
Message("system", "你是一个旅行助手"),
Message("user", "你好,我想查一下北京的天气"),
Message("assistant", "北京今天晴,气温15-25°C"),
Message("user", "好的,帮我查一下去上海的机票"),
Message("assistant", "找到3个航班:东航MU5101、国航CA1501..."),
Message("user", "选东航的吧"),
Message("assistant", "好的,已为您选择东航MU5101"),
Message("user", "那帮我订个酒店"),
Message("assistant", "上海浦东有以下酒店推荐..."),
Message("user", "就选第一个吧"),
Message("assistant", "好的,已为您预订"),
Message("user", "谢谢"),
Message("assistant", "不客气,祝您旅途愉快!"),
]
# 测试滑动窗口
window = SlidingWindowCompressor(max_turns=3)
windowed = window.compress(messages)
print(f"=== 滑动窗口 (保留最近3轮) ===")
print(f"原始消息数: {len(messages)}")
print(f"压缩后消息数: {len(windowed)}")
for m in windowed:
print(f" [{m.role}] {m.content[:50]}")
# 测试摘要压缩
llm = MockLLM()
summarizer = SummaryCompressor(llm, max_recent_turns=2)
summarized, summary = await summarizer.compress(messages)
print(f"\n=== 摘要压缩 (保留最近2轮) ===")
print(f"压缩后消息数: {len(summarized)}")
for m in summarized:
print(f" [{m.role}] {m.content[:60]}")
asyncio.run(test())复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 滑动窗口 | O(max_turns),截取操作 |
| 摘要压缩 | O(N),N=旧对话长度(主要开销在 LLM 调用) |
| 空间 | O(max_turns) 或 O(摘要长度 + max_recent_turns) |
追问:
- 摘要压缩时如何保证关键信息不丢失?有哪些改进策略?
- 如何处理对话中的结构化信息(如工具调用结果)?直接摘要可能丢失哪些信息?
- 在生产环境中,如何确定最佳的窗口大小和摘要触发阈值?
Q15: 实现一个简单的 RAG 评估器(计算 Faithfulness、Relevancy) ⭐⭐
题目描述:
实现一个 RAG 系统评估器,计算两个核心指标:
- Faithfulness(忠实度):回答是否忠实于检索到的上下文
- Answer Relevancy(回答相关性):回答是否与问题相关
解题思路:
- Faithfulness:将回答拆分为声明(claims),检查每个声明是否能从上下文中推导
- Answer Relevancy:从回答中反向生成问题,计算与原始问题的相似度
完整代码:
import numpy as np
from dataclasses import dataclass
@dataclass
class RAGEvaluationResult:
faithfulness: float # 0-1, 回答是否忠实于上下文
answer_relevancy: float # 0-1, 回答是否与问题相关
faithfulness_details: list # 每个声明的检查结果
answer_relevancy_details: list # 每个生成问题的相似度
class RAGEvaluator:
"""RAG 系统评估器"""
def __init__(self, llm, embedder):
self.llm = llm
self.embedder = embedder
async def evaluate(self, question: str, context: str,
answer: str) -> RAGEvaluationResult:
"""完整的 RAG 评估"""
faith_score, faith_details = await self._evaluate_faithfulness(context, answer)
relevancy_score, relevancy_details = await self._evaluate_relevancy(question, answer)
return RAGEvaluationResult(
faithfulness=faith_score,
answer_relevancy=relevancy_score,
faithfulness_details=faith_details,
answer_relevancy_details=relevancy_details,
)
async def _evaluate_faithfulness(self, context: str,
answer: str) -> tuple[float, list]:
"""
评估忠实度:
1. 将回答拆分为独立声明
2. 检查每个声明是否能从上下文推导
"""
# Step 1: 拆分声明
claims = await self._extract_claims(answer)
# Step 2: 逐个验证
details = []
supported_count = 0
for claim in claims:
is_supported = await self._check_claim_support(context, claim)
details.append({
"claim": claim,
"supported": is_supported,
})
if is_supported:
supported_count += 1
score = supported_count / len(claims) if claims else 0.0
return score, details
async def _extract_claims(self, answer: str) -> list[str]:
"""将回答拆分为独立的声明/事实"""
prompt = f"""请将以下回答拆分为独立的事实性声明,每行一个声明。
回答:{answer}
声明列表(每行一个):"""
result = await self.llm.generate(prompt)
claims = [line.strip() for line in result.strip().split("\n") if line.strip()]
return claims
async def _check_claim_support(self, context: str, claim: str) -> bool:
"""检查声明是否能从上下文推导"""
prompt = f"""请判断以下声明是否能从给定的上下文中推导出来。
上下文:{context}
声明:{claim}
如果声明能从上下文推导,请回答 "YES";否则回答 "NO"。
只回答 YES 或 NO:"""
result = await self.llm.generate(prompt)
return "YES" in result.upper()
async def _evaluate_relevancy(self, question: str,
answer: str) -> tuple[float, list]:
"""
评估回答相关性:
1. 从回答中生成 N 个可能的问题
2. 计算这些问题与原始问题的平均相似度
"""
# 从回答反向生成问题
gen_questions = await self._generate_questions(answer, n=3)
# 计算相似度
q_emb = self.embedder.encode(question)
similarities = []
details = []
for gq in gen_questions:
gq_emb = self.embedder.encode(gq)
sim = self._cosine_similarity(q_emb, gq_emb)
similarities.append(sim)
details.append({"generated_question": gq, "similarity": sim})
score = np.mean(similarities) if similarities else 0.0
return float(score), details
async def _generate_questions(self, answer: str, n: int = 3) -> list[str]:
"""从回答中生成可能的问题"""
prompt = f"""根据以下回答,生成 {n} 个可能引出该回答的问题,每行一个。
回答:{answer}
可能的问题(每行一个):"""
result = await self.llm.generate(prompt)
questions = [line.strip() for line in result.strip().split("\n") if line.strip()]
return questions[:n]
@staticmethod
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
dot = np.dot(a, b)
norm = np.linalg.norm(a) * np.linalg.norm(b)
return float(dot / norm) if norm > 0 else 0.0
# ---- 测试 ----
if __name__ == "__main__":
import asyncio
class MockLLM:
async def generate(self, prompt: str) -> str:
if "拆分为独立" in prompt:
return "RAG 代表检索增强生成\nRAG 可以减少幻觉\nRAG 使用向量数据库检索"
elif "是否能从" in prompt:
return "YES" if "检索" in prompt else "NO"
elif "可能引出" in prompt:
return "什么是 RAG?\nRAG 有什么优势?\nRAG 如何工作?"
return ""
class MockEmbedder:
def encode(self, text: str) -> np.ndarray:
np.random.seed(hash(text) % (2**31))
return np.random.randn(128)
async def test():
evaluator = RAGEvaluator(MockLLM(), MockEmbedder())
question = "什么是 RAG?它有什么优势?"
context = "RAG(检索增强生成)是一种结合检索和生成的技术,可以减少幻觉。"
answer = "RAG 代表检索增强生成,它可以减少幻觉,并使用向量数据库进行检索。"
result = await evaluator.evaluate(question, context, answer)
print(f"Faithfulness: {result.faithfulness:.2f}")
print(f"Answer Relevancy: {result.answer_relevancy:.2f}")
print(f"\nFaithfulness 详情:")
for d in result.faithfulness_details:
status = "✓" if d["supported"] else "✗"
print(f" {status} {d['claim']}")
print(f"\nRelevancy 详情:")
for d in result.answer_relevancy_details:
print(f" Q: {d['generated_question']} -> sim={d['similarity']:.3f}")
asyncio.run(test())复杂度分析:
| 维度 | 复杂度 |
|---|---|
| Faithfulness | O(C × L),C=声明数,L=LLM 调用开销 |
| Relevancy | O(N × D),N=生成问题数,D=Embedding 维度 |
| 空间 | O(context_length + answer_length) |
追问:
- RAGAS 框架中还有哪些评估指标?它们分别评估什么?
- LLM-as-Judge 的评估方式有哪些偏差?如何缓解?
- 如何构建一个可靠的评估数据集(Golden Dataset)?
Q16: 实现一个简化版的 LoRA 层 ⭐⭐⭐
题目描述:
实现 LoRA(Low-Rank Adaptation)的核心机制:
- 冻结原始权重,添加低秩适配器
- 前向传播:
output = W @ x + (A @ B) @ x - 支持 LoRA 的创建、前向计算和参数统计
解题思路:
- 原始权重
W不参与梯度更新 - 添加两个小矩阵
A (d, r)和B (r, d),其中r << d - 训练时只更新 A 和 B,推理时可以合并到 W
完整代码:
import numpy as np
from dataclasses import dataclass
@dataclass
class LoRAConfig:
rank: int = 8 # 低秩维度
alpha: float = 16.0 # 缩放因子
dropout: float = 0.0 # Dropout 概率
@property
def scaling(self) -> float:
return self.alpha / self.rank
class LoRALinear:
"""带 LoRA 适配器的线性层"""
def __init__(self, in_features: int, out_features: int,
config: LoRAConfig = LoRAConfig()):
self.in_features = in_features
self.out_features = out_features
self.config = config
# 原始权重(冻结)
self.W = np.random.randn(out_features, in_features) * np.sqrt(2.0 / in_features)
self.W_frozen = True
# LoRA 适配器
self.A = np.random.randn(out_features, config.rank) * 0.01
self.B = np.zeros((config.rank, in_features)) # B 初始化为零
# 梯度
self.dA = np.zeros_like(self.A)
self.dB = np.zeros_like(self.B)
def forward(self, x: np.ndarray) -> np.ndarray:
"""
前向传播: output = W @ x + scaling * (A @ B) @ x
Args:
x: (in_features,) 或 (batch, in_features)
Returns:
output: (out_features,) 或 (batch, out_features)
"""
# 原始权重输出
base_output = x @ self.W.T
# LoRA 输出
lora_output = x @ (self.A @ self.B).T * self.config.scaling
return base_output + lora_output
def merge_weights(self) -> np.ndarray:
"""将 LoRA 权重合并到原始权重(推理时使用)"""
return self.W + self.A @ self.B * self.config.scaling
def count_parameters(self) -> dict:
"""统计参数量"""
original = self.in_features * self.out_features
lora = self.A.size + self.B.size
return {
"original_params": original,
"lora_params": lora,
"trainable_ratio": lora / original,
"compression_ratio": original / lora,
}
def backward(self, x: np.ndarray, grad_output: np.ndarray):
"""
反向传播(简化版,只计算 LoRA 梯度)
Args:
x: (batch, in_features)
grad_output: (batch, out_features)
"""
batch_size = x.shape[0]
# dB = (grad_output @ A).T @ x / batch_size * scaling
self.dB = (grad_output @ self.A).T @ x / batch_size * self.config.scaling
# dA = grad_output.T @ (x @ B.T) / batch_size * scaling
self.dA = grad_output.T @ (x @ self.B.T) / batch_size * self.config.scaling
# 返回对输入的梯度(用于上游传播)
grad_x = (grad_output @ (self.A @ self.B)) * self.config.scaling
if not self.W_frozen:
grad_x += grad_output @ self.W
else:
grad_x += grad_output @ self.W # 不更新 W,但梯度仍需传播
return grad_x
def step(self, lr: float = 0.001):
"""参数更新(只更新 LoRA 参数)"""
self.A -= lr * self.dA
self.B -= lr * self.dB
class LoRAModel:
"""带 LoRA 的简单模型(2层 MLP)"""
def __init__(self, input_dim: int, hidden_dim: int, output_dim: int,
lora_config: LoRAConfig = LoRAConfig()):
self.layer1 = LoRALinear(input_dim, hidden_dim, lora_config)
self.layer2 = LoRALinear(hidden_dim, output_dim, lora_config)
def forward(self, x: np.ndarray) -> np.ndarray:
h = self.layer1.forward(x)
h = np.maximum(h, 0) # ReLU
return self.layer2.forward(h)
def total_params(self) -> dict:
p1 = self.layer1.count_parameters()
p2 = self.layer2.count_parameters()
return {
"total_original": p1["original_params"] + p2["original_params"],
"total_lora": p1["lora_params"] + p2["lora_params"],
"trainable_ratio": (p1["lora_params"] + p2["lora_params"]) /
(p1["original_params"] + p2["original_params"]),
}
# ---- 测试 ----
if __name__ == "__main__":
np.random.seed(42)
# 创建 LoRA 线性层
config = LoRAConfig(rank=8, alpha=16)
layer = LoRALinear(in_features=256, out_features=256, config=config)
# 参数统计
params = layer.count_parameters()
print("=== LoRA 线性层参数统计 ===")
print(f"原始参数量: {params['original_params']:,}")
print(f"LoRA 参数量: {params['lora_params']:,}")
print(f"可训练比例: {params['trainable_ratio']:.2%}")
print(f"压缩比: {params['compression_ratio']:.1f}x")
# 前向传播
x = np.random.randn(4, 256) # batch=4
output = layer.forward(x)
print(f"\n输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
# 权重合并
merged_W = layer.merge_weights()
merged_output = x @ merged_W.T
diff = np.abs(output - merged_output).max()
print(f"\n合并权重后最大误差: {diff:.10f}")
assert diff < 1e-10, "合并权重应产生相同结果"
# 完整模型
model = LoRAModel(256, 512, 10, config)
total = model.total_params()
print(f"\n=== 完整模型 ===")
print(f"总原始参数: {total['total_original']:,}")
print(f"总 LoRA 参数: {total['total_lora']:,}")
print(f"可训练比例: {total['trainable_ratio']:.2%}")
# 前向传播
model_output = model.forward(x)
print(f"模型输出形状: {model_output.shape}")复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 前向传播 | O(d² + d×r×2),与原线性层相比多 O(d×r) |
| 可训练参数 | 2×d×r vs 原始 d²,当 r=8, d=4096 时仅 0.4% |
| 空间 | 额外 2×d×r 存储 A 和 B |
追问:
- LoRA 的 rank 和 alpha 如何选择?过大或过小各有什么影响?
- QLoRA 是如何在 LoRA 基础上进一步减少显存占用的?
- LoRA 可以应用在 Transformer 的哪些层?效果有什么差异?
Prompt 工程与 Agent
Q17: 实现一个 Prompt 模板引擎(支持变量替换、条件分支、循环) ⭐⭐
题目描述:
实现一个 Prompt 模板引擎,支持:
- 变量替换:
{{variable}},支持默认值{{variable|default}} - 条件分支:
{% if condition %}...{% else %}...{% endif %} - 循环:
{% for item in list %}...{% endfor %}
解题思路:
- 变量替换:正则匹配
{{...}},从上下文中取值 - 条件分支:正则匹配
{% if %},根据变量值判断 - 循环:正则匹配
{% for %},迭代列表并渲染
完整代码:
import re
from typing import Any
class PromptTemplateEngine:
"""Prompt 模板引擎"""
def __init__(self):
self._templates: dict[str, str] = {}
def register(self, name: str, template: str):
"""注册模板"""
self._templates[name] = template
def render(self, name: str, **kwargs: Any) -> str:
"""渲染模板"""
template = self._templates.get(name)
if template is None:
raise ValueError(f"模板 '{name}' 不存在")
return self._render(template, kwargs)
def _render(self, template: str, context: dict) -> str:
"""渲染模板字符串"""
# 1. 处理循环
template = self._process_loops(template, context)
# 2. 处理条件分支
template = self._process_conditionals(template, context)
# 3. 变量替换
template = self._substitute_variables(template, context)
return template.strip()
def _substitute_variables(self, template: str, context: dict) -> str:
"""变量替换: {{var}} 或 {{var|default}}"""
def replacer(match):
expr = match.group(1).strip()
parts = expr.split("|", 1)
var_name = parts[0].strip()
default = parts[1].strip() if len(parts) > 1 else ""
value = self._resolve(var_name, context)
if value is not None:
return str(value)
return default
return re.sub(r"\{\{(.+?)\}\}", replacer, template)
def _resolve(self, path: str, context: dict) -> Any:
"""解析嵌套变量路径,如 agent.name"""
parts = path.split(".")
current = context
for part in parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return None
return current
def _process_conditionals(self, template: str, context: dict) -> str:
"""处理条件分支: {% if %}...{% else %}...{% endif %}"""
pattern = r"\{%\s*if\s+(\w+)\s%\}(.*?)(?:\{%\s*else\s%\}(.*?))?\{%\s*endif\s%\}"
def replacer(match):
var_name = match.group(1)
true_block = match.group(2)
false_block = match.group(3) or ""
value = context.get(var_name)
if value and value not in (False, 0, "", None, []):
return true_block
return false_block
# 循环处理嵌套条件
while "{% if " in template:
new_template = re.sub(pattern, replacer, template, flags=re.DOTALL)
if new_template == template:
break
template = new_template
return template
def _process_loops(self, template: str, context: dict) -> str:
"""处理循环: {% for item in list %}...{% endfor %}"""
pattern = r"\{%\s*for\s+(\w+)\s+in\s+(\w+)\s%\}(.*?)\{%\s*endfor\s%\}"
def replacer(match):
item_name = match.group(1)
list_name = match.group(2)
body = match.group(3)
items = context.get(list_name, [])
if not isinstance(items, (list, tuple)):
return ""
result_parts = []
for i, item in enumerate(items):
loop_context = {**context, item_name: item, "loop_index": i + 1}
rendered = self._render(body, loop_context)
result_parts.append(rendered)
return "\n".join(result_parts)
while "{% for " in template:
new_template = re.sub(pattern, replacer, template, flags=re.DOTALL)
if new_template == template:
break
template = new_template
return template
# ---- 测试 ----
if __name__ == "__main__":
engine = PromptTemplateEngine()
# 1. 基础变量替换
engine.register("basic", "你好,{{name}}!你来自 {{city|未知城市}}。")
print("=== 基础替换 ===")
print(engine.render("basic", name="张明"))
print(engine.render("basic", name="李华", city="上海"))
# 2. 条件渲染
engine.register("cond", """你是一个{{role|助手}}。
{% if has_tools %}
你可以使用以下工具:{{tools}}
{% endif %}
{% if verbose %}
请详细回答用户的问题。
{% else %}
请简洁回答。
{% endif %}""")
print("\n=== 条件渲染 ===")
print(engine.render("cond", role="编程助手", has_tools=True,
tools="search, calculator", verbose=False))
# 3. 循环渲染
engine.register("rag", """请根据以下参考资料回答问题。
{% for doc in documents %}
[文档{{loop_index}}] {{doc}}
{% endfor %}
问题:{{question}}""")
print("\n=== RAG 模板 ===")
docs = ["RAG 是检索增强生成技术", "向量数据库是 RAG 的核心组件", "Embedding 模型将文本映射到向量空间"]
print(engine.render("rag", documents=docs, question="什么是 RAG?"))
# 4. 嵌套对象
engine.register("agent", """Agent 配置:
名称: {{agent.name}}
模型: {{agent.model}}
温度: {{agent.temperature}}
用户输入: {{input}}""")
print("\n=== 嵌套对象 ===")
agent_config = {"name": "MyAgent", "model": "gpt-4o", "temperature": 0.7}
print(engine.render("agent", agent=agent_config, input="帮我查天气"))复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 变量替换 | O(N × V),N=模板长度,V=变量数 |
| 循环渲染 | O(N × I),I=迭代次数 |
| 空间 | O(渲染结果大小) |
追问:
- Jinja2 模板引擎相比自实现的模板引擎有哪些高级功能?在 Prompt 工程中哪些特性最实用?
- 如何实现 Prompt 版本管理?如何做 A/B 测试来比较不同 Prompt 的效果?
Q18: 实现一个简化版的 Agent ReAct 循环(带工具调用和错误处理) ⭐⭐⭐
题目描述:
实现 ReAct(Reasoning + Acting)Agent 循环:
- Thought:LLM 进行推理,决定下一步行动
- Action:调用工具执行操作
- Observation:获取工具执行结果
- 循环直到得到最终答案或达到最大步数
解题思路:
- 维护对话历史(Thought/Action/Observation 三元组)
- 每轮让 LLM 生成 Thought 和 Action
- 解析 Action,调用对应工具
- 将 Observation 拼入上下文,继续下一轮
完整代码:
import json
import re
import traceback
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable
@dataclass
class Tool:
name: str
description: str
func: Callable
parameters: dict = field(default_factory=dict)
def execute(self, **kwargs) -> str:
try:
return str(self.func(**kwargs))
except Exception as e:
return f"工具执行错误: {type(e).__name__}: {e}"
@dataclass
class ReActStep:
step_number: int
thought: str
action: str | None = None
action_input: dict | None = None
observation: str | None = None
is_final: bool = False
error: str | None = None
class ReActAgent:
"""ReAct Agent 循环"""
def __init__(self, llm, tools: list[Tool], max_steps: int = 10,
max_retries: int = 2):
self.llm = llm
self.tools = {t.name: t for t in tools}
self.max_steps = max_steps
self.max_retries = max_retries
async def run(self, query: str) -> dict:
"""
运行 ReAct 循环
Returns: {"answer": str, "steps": list[ReActStep], "success": bool}
"""
steps: list[ReActStep] = []
history: list[dict] = []
for step_num in range(1, self.max_steps + 1):
# 1. LLM 生成 Thought + Action
llm_output = await self._think(query, history)
# 2. 解析输出
parsed = self._parse_output(llm_output)
step = ReActStep(
step_number=step_num,
thought=parsed.get("thought", ""),
action=parsed.get("action"),
action_input=parsed.get("action_input", {}),
is_final=parsed.get("action") is None,
)
# 3. 如果是最终答案
if step.is_final:
step.observation = parsed.get("answer", "")
steps.append(step)
return {
"answer": step.observation,
"steps": steps,
"success": True,
"total_steps": step_num,
}
# 4. 执行工具(带重试)
observation = await self._execute_with_retry(step)
step.observation = observation
steps.append(step)
# 5. 更新历史
history.append({
"step": step_num,
"thought": step.thought,
"action": step.action,
"action_input": step.action_input,
"observation": observation,
})
# 达到最大步数
return {
"answer": f"达到最大步数 {self.max_steps},未能得出最终答案。",
"steps": steps,
"success": False,
"total_steps": len(steps),
}
async def _think(self, query: str, history: list[dict]) -> str:
"""让 LLM 推理并决定下一步"""
tools_desc = "\n".join(
f"- {t.name}: {t.description}" for t in self.tools.values()
)
history_text = ""
for h in history:
history_text += f"Step {h['step']}:\n"
history_text += f" Thought: {h['thought']}\n"
history_text += f" Action: {h['action']}({h['action_input']})\n"
history_text += f" Observation: {h['observation']}\n\n"
prompt = f"""你是一个智能助手,使用 ReAct 模式回答问题。
可用工具:
{tools_desc}
请按以下格式输出:
Thought: [你的推理过程]
Action: [工具名称]
Action Input: {{"参数名": "参数值"}}
如果你已经有足够信息回答问题,直接输出:
Thought: [总结推理过程]
Answer: [最终答案]
问题:{query}
{history_text}"""
return await self.llm.generate(prompt)
def _parse_output(self, text: str) -> dict:
"""解析 LLM 输出"""
result = {}
# 提取 Thought
thought_match = re.search(r"Thought:\s*(.+?)(?=\n(?:Action|Answer):|\Z)",
text, re.DOTALL)
if thought_match:
result["thought"] = thought_match.group(1).strip()
# 提取 Answer(最终答案)
answer_match = re.search(r"Answer:\s*(.+)", text, re.DOTALL)
if answer_match:
result["answer"] = answer_match.group(1).strip()
return result
# 提取 Action
action_match = re.search(r"Action:\s*(\w+)", text)
if action_match:
result["action"] = action_match.group(1).strip()
# 提取 Action Input
input_match = re.search(r"Action Input:\s*(\{.+?\})", text, re.DOTALL)
if input_match:
try:
result["action_input"] = json.loads(input_match.group(1))
except json.JSONDecodeError:
result["action_input"] = {}
return result
async def _execute_with_retry(self, step: ReActStep) -> str:
"""带重试的工具执行"""
tool = self.tools.get(step.action)
if tool is None:
return f"错误:工具 '{step.action}' 不存在。可用工具:{list(self.tools.keys())}"
for attempt in range(self.max_retries + 1):
try:
result = tool.execute(**(step.action_input or {}))
if "错误" not in result or attempt == self.max_retries:
return result
# 如果是错误且还有重试机会
step.error = result
except Exception as e:
if attempt == self.max_retries:
return f"工具执行失败(已重试{self.max_retries}次): {e}"
step.error = str(e)
return "工具执行失败"
# ---- 测试 ----
if __name__ == "__main__":
import asyncio
import math
# 定义工具
def calculator(expression: str) -> str:
"""计算数学表达式"""
allowed = set("0123456789+-*/.() ")
if not all(c in allowed for c in expression):
return "错误:不支持的字符"
return str(eval(expression))
def search(query: str) -> str:
"""模拟搜索"""
results = {
"北京天气": "北京今天晴,气温 15-25°C,空气质量良",
"上海天气": "上海今天多云,气温 18-28°C,有轻微雾霾",
"RAG": "RAG(检索增强生成)是结合信息检索和文本生成的AI技术",
}
for key, value in results.items():
if key in query:
return value
return f"未找到关于'{query}'的结果"
def get_time(timezone: str = "Asia/Shanghai") -> str:
"""获取当前时间"""
return f"当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ({timezone})"
tools = [
Tool(name="calculator", description="计算数学表达式。输入: expression (数学表达式字符串)",
func=calculator, parameters={"expression": "str"}),
Tool(name="search", description="搜索信息。输入: query (搜索关键词)",
func=search, parameters={"query": "str"}),
Tool(name="get_time", description="获取当前时间。输入: timezone (时区名称)",
func=get_time, parameters={"timezone": "str"}),
]
# Mock LLM
class MockLLM:
def __init__(self):
self.call_count = 0
async def generate(self, prompt: str) -> str:
self.call_count += 1
if self.call_count == 1:
return """Thought: 用户想知道北京天气和当前时间。我需要先搜索天气信息。
Action: search
Action Input: {"query": "北京天气"}"""
elif self.call_count == 2:
return """Thought: 已经获取到天气信息,现在获取当前时间。
Action: get_time
Action Input: {"timezone": "Asia/Shanghai"}"""
else:
return """Thought: 已经获取到所有信息,可以回答用户了。
Answer: 北京今天天气晴朗,气温 15-25°C,空气质量良好。当前时间为 2024-01-15 14:30:00 (北京时间)。建议穿着轻便外套出行。"""
async def test():
agent = ReActAgent(
llm=MockLLM(),
tools=tools,
max_steps=5,
max_retries=2,
)
result = await agent.run("北京今天天气怎么样?现在几点了?")
print("=== ReAct Agent 执行结果 ===")
print(f"成功: {result['success']}")
print(f"总步数: {result['total_steps']}")
print(f"\n最终答案:\n{result['answer']}")
print(f"\n=== 执行步骤 ===")
for step in result["steps"]:
print(f"\nStep {step.step_number}:")
print(f" Thought: {step.thought}")
if step.action:
print(f" Action: {step.action}({step.action_input})")
if step.observation:
obs = step.observation[:100] + "..." if len(step.observation) > 100 else step.observation
print(f" Observation: {obs}")
if step.is_final:
print(f" [最终答案]")
asyncio.run(test())复杂度分析:
| 维度 | 复杂度 |
|---|---|
| 时间 | O(S × L),S=步数,L=LLM 调用延迟 |
| 空间 | O(S × H),H=每步历史大小 |
| Token 消耗 | O(S × context_length),随步数线性增长 |
追问:
- ReAct 模式与 Plan-and-Execute 模式有什么区别?各适用于什么场景?
- 如何处理 Agent 的幻觉问题?(如调用不存在的工具、参数错误等)
- 如何实现 Agent 的并行工具调用?与串行调用相比有什么优缺点?