Skip to content

22. 算法与编码题

面向大模型应用工程师/Agent 开发工程师的编码面试题,覆盖基础算法、RAG、Agent、模型推理、系统设计五大方向。每题均提供完整可运行的 Python 代码。


基础算法


Q1: 实现 BPE(Byte Pair Encoding)分词算法

难度:⭐⭐

题目描述:

实现一个简化版的 BPE 分词算法。给定一个训练语料,学习合并规则,然后用学到的词表对新文本进行分词。

解题思路:

  1. 将训练文本拆分为字符序列(每个词以特殊结束符 </w> 结尾)
  2. 统计所有相邻 token 对的频率
  3. 选择频率最高的 pair 合并为一个新 token
  4. 重复指定次数的合并,得到最终词表
  5. 用学到的合并规则对新文本分词

完整代码:

python
from collections import Counter, defaultdict


class BPETokenizer:
    """简化版 BPE 分词器"""

    def __init__(self):
        self.merges = []  # 合并规则列表,按顺序应用
        self.vocab = set()

    def _get_pair_freqs(self, word_freqs: dict) -> Counter:
        """统计所有相邻 token 对的频率"""
        pairs = Counter()
        for tokens, freq in word_freqs.items():
            for i in range(len(tokens) - 1):
                pairs[(tokens[i], tokens[i + 1])] += freq
        return pairs

    def _merge_pair(self, pair: tuple, word_freqs: dict) -> dict:
        """将指定 pair 在所有词中合并"""
        new_word_freqs = {}
        bigram = pair
        replacement = pair[0] + pair[1]
        for tokens, freq in word_freqs.items():
            new_tokens = []
            i = 0
            while i < len(tokens):
                if i < len(tokens) - 1 and tokens[i] == bigram[0] and tokens[i + 1] == bigram[1]:
                    new_tokens.append(replacement)
                    i += 2
                else:
                    new_tokens.append(tokens[i])
                    i += 1
            new_word_freqs[tuple(new_tokens)] = freq
        return new_word_freqs

    def train(self, corpus: list[str], num_merges: int = 10):
        """
        训练 BPE
        Args:
            corpus: 文本列表,如 ["low lower lowest"]
            num_merges: 合并次数
        """
        # 统计词频,每个词拆成字符序列加结束符
        word_freqs = Counter()
        for text in corpus:
            for word in text.strip().split():
                chars = tuple(list(word) + ["</w>"])
                word_freqs[chars] += 1

        # 初始化词表
        self.vocab = set()
        for tokens in word_freqs:
            self.vocab.update(tokens)

        # 迭代合并
        for _ in range(num_merges):
            pair_freqs = self._get_pair_freqs(word_freqs)
            if not pair_freqs:
                break
            best_pair = pair_freqs.most_common(1)[0][0]
            self.merges.append(best_pair)
            word_freqs = self._merge_pair(best_pair, word_freqs)
            # 新 token 加入词表
            self.vocab.add(best_pair[0] + best_pair[1])

    def tokenize(self, text: str) -> list[str]:
        """用学到的合并规则对文本分词"""
        all_tokens = []
        for word in text.strip().split():
            tokens = list(word) + ["</w>"]
            for pair in self.merges:
                new_tokens = []
                i = 0
                while i < len(tokens):
                    if i < len(tokens) - 1 and tokens[i] == pair[0] and tokens[i + 1] == pair[1]:
                        new_tokens.append(pair[0] + pair[1])
                        i += 2
                    else:
                        new_tokens.append(tokens[i])
                        i += 1
                tokens = new_tokens
            all_tokens.extend(tokens)
        return all_tokens


# ---- 测试 ----
if __name__ == "__main__":
    corpus = [
        "low low low low low",
        "lower lower lower",
        "newest newest newest newest",
        "widest widest widest",
    ]
    tokenizer = BPETokenizer()
    tokenizer.train(corpus, num_merges=10)
    print("合并规则:", tokenizer.merges)
    print("词表大小:", len(tokenizer.vocab))
    print("分词结果:", tokenizer.tokenize("low lower newest"))

复杂度分析:

维度复杂度
训练时间O(N × M × num_merges),N=词数,M=平均词长
推理时间O(T × num_merges),T=输入 token 数
空间O(V),V=词表大小

追问:

  1. 真实的 BPE 实现(如 GPT-2 的 tiktoken)如何处理 UTF-8 字节级别的编码?字节级 BPE 有什么优势?
  2. SentencePiece 使用的 Unigram 模型与 BPE 的核心区别是什么?各自在什么场景下更优?

Q2: 实现一个简单的向量相似度搜索(余弦相似度 + 暴力搜索)

难度:⭐

题目描述:

实现一个简易的向量数据库,支持添加向量和基于余弦相似度的 Top-K 搜索。

解题思路:

  1. 存储向量及其关联的文本/元数据
  2. 余弦相似度 = (A·B) / (‖A‖ × ‖B‖)
  3. 暴力遍历所有向量计算相似度,取 Top-K

完整代码:

python
import numpy as np
from dataclasses import dataclass, field


@dataclass
class VectorRecord:
    id: int
    vector: np.ndarray
    text: str
    metadata: dict = field(default_factory=dict)


class SimpleVectorStore:
    """暴力搜索的简单向量数据库"""

    def __init__(self):
        self.records: list[VectorRecord] = []
        self._next_id = 0

    @staticmethod
    def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        """计算余弦相似度"""
        dot = np.dot(a, b)
        norm = np.linalg.norm(a) * np.linalg.norm(b)
        if norm == 0:
            return 0.0
        return dot / norm

    def add(self, vectors: list[np.ndarray], texts: list[str],
            metadatas: list[dict] | None = None):
        """批量添加向量"""
        for i, (vec, txt) in enumerate(zip(vectors, texts)):
            meta = metadatas[i] if metadatas else {}
            self.records.append(VectorRecord(
                id=self._next_id, vector=vec, text=txt, metadata=meta
            ))
            self._next_id += 1

    def search(self, query: np.ndarray, top_k: int = 3) -> list[dict]:
        """返回与 query 最相似的 top_k 条记录"""
        scored = []
        for rec in self.records:
            sim = self._cosine_similarity(query, rec.vector)
            scored.append({
                "id": rec.id,
                "text": rec.text,
                "score": sim,
                "metadata": rec.metadata,
            })
        scored.sort(key=lambda x: x["score"], reverse=True)
        return scored[:top_k]


# ---- 测试 ----
if __name__ == "__main__":
    np.random.seed(42)
    dim = 128
    store = SimpleVectorStore()

    # 添加一些随机向量
    texts = ["机器学习基础", "深度学习进阶", "自然语言处理", "计算机视觉", "强化学习"]
    vectors = [np.random.randn(dim) for _ in texts]
    store.add(vectors, texts)

    # 查询
    query = np.random.randn(dim)
    results = store.search(query, top_k=3)
    for r in results:
        print(f"[{r['score']:.4f}] {r['text']}")

复杂度分析:

维度复杂度
添加O(1) per vector
搜索O(N × D),N=向量数,D=维度
空间O(N × D)

追问:

  1. 当向量规模达到百万级时,暴力搜索不可行,有哪些加速方案?(IVF、HNSW、PQ 等)
  2. 余弦相似度和欧氏距离在什么场景下等价?L2 归一化后两者的关系是什么?

Q3: 实现 HNSW(Hierarchical Navigable Small World)索引的简化版

难度:⭐⭐⭐

题目描述:

实现一个简化版的 HNSW 索引,支持插入向量和近似最近邻搜索。要求包含多层图结构和贪心搜索逻辑。

解题思路:

  1. HNSW 是多层的 NSW 图:底层包含所有节点,上层节点逐层稀疏
  2. 插入时随机决定节点最高层,从最高层开始贪心搜索找到最近邻
  3. 搜索时从顶层的入口点开始,逐层下降,每层做贪心 BFS

完整代码:

python
import heapq
import math
import random
from collections import defaultdict

import numpy as np


class HNSWIndex:
    """简化版 HNSW 索引"""

    def __init__(self, dim: int, max_layers: int = 4, ef_construction: int = 50,
                 m: int = 16, m_max: int = 32):
        self.dim = dim
        self.max_layers = max_layers
        self.ef_construction = ef_construction
        self.m = m            # 每层连接数
        self.m_max = m_max    # 每层最大连接数
        self.ml = 1.0 / math.log(m)  # 层间概率因子

        # layer -> {node_id: [neighbor_ids]}
        self.graphs: dict[int, dict[int, list[int]]] = defaultdict(lambda: defaultdict(list))
        self.vectors: dict[int, np.ndarray] = {}
        self.entry_point: int | None = None
        self.max_level: int = -1
        self._next_id = 0

    @staticmethod
    def _distance(a: np.ndarray, b: np.ndarray) -> float:
        return float(np.linalg.norm(a - b))

    def _random_level(self) -> int:
        """随机生成节点的最高层级"""
        level = 0
        while random.random() < (1.0 / self.m) and level < self.max_layers - 1:
            level += 1
        return level

    def _search_layer(self, query: np.ndarray, entry_points: list[int],
                      ef: int, layer: int) -> list[int]:
        """在指定层做贪心 beam search,返回 ef 个最近邻"""
        visited = set(entry_points)
        # candidates: min-heap by distance; results: max-heap by distance (negated)
        candidates = []
        results = []
        for ep in entry_points:
            d = self._distance(query, self.vectors[ep])
            heapq.heappush(candidates, (d, ep))
            heapq.heappush(results, (-d, ep))

        while candidates:
            d_c, c = heapq.heappop(candidates)
            # 当前最远结果的距离
            d_f = -results[0][0]
            if d_c > d_f:
                break
            for neighbor in self.graphs[layer].get(c, []):
                if neighbor in visited:
                    continue
                visited.add(neighbor)
                d_n = self._distance(query, self.vectors[neighbor])
                if d_n < d_f or len(results) < ef:
                    heapq.heappush(candidates, (d_n, neighbor))
                    heapq.heappush(results, (-d_n, neighbor))
                    if len(results) > ef:
                        heapq.heappop(results)

        return [nid for _, nid in sorted(results, key=lambda x: -x[0])]

    def insert(self, vector: np.ndarray) -> int:
        """插入一个向量,返回 node_id"""
        node_id = self._next_id
        self._next_id += 1
        self.vectors[node_id] = vector
        level = self._random_level()

        if self.entry_point is None:
            self.entry_point = node_id
            self.max_level = level
            return node_id

        # 从顶层贪心搜索到 level+1 层
        ep = self.entry_point
        for l in range(self.max_level, level, -1):
            eps = self._search_layer(vector, [ep], ef=1, layer=l)
            ep = eps[0] if eps else ep

        # 从 level 层到底层,搜索并建立双向连接
        for l in range(min(level, self.max_level), -1, -1):
            neighbors = self._search_layer(vector, [ep], self.ef_construction, layer=l)
            # 选择最近的 m 个作为连接
            selected = neighbors[:self.m]
            self.graphs[l][node_id] = list(selected)
            for nb in selected:
                self.graphs[l][nb].append(node_id)
                # 修剪超出 m_max 的连接
                if len(self.graphs[l][nb]) > self.m_max:
                    # 按距离排序保留最近的 m_max
                    dists = [(self._distance(self.vectors[nb], self.vectors[nb2]), nb2)
                             for nb2 in self.graphs[l][nb]]
                    dists.sort()
                    self.graphs[l][nb] = [nid for _, nid in dists[:self.m_max]]
            ep = neighbors[0] if neighbors else ep

        if level > self.max_level:
            self.max_level = level
            self.entry_point = node_id

        return node_id

    def search(self, query: np.ndarray, top_k: int = 5, ef: int = 50) -> list[tuple[int, float]]:
        """搜索 top_k 最近邻,返回 (node_id, distance) 列表"""
        if self.entry_point is None:
            return []
        ep = self.entry_point
        # 从顶层贪心到第 1 层
        for l in range(self.max_level, 0, -1):
            eps = self._search_layer(query, [ep], ef=1, layer=l)
            ep = eps[0] if eps else ep
        # 在第 0 层做 beam search
        candidates = self._search_layer(query, [ep], ef=max(ef, top_k), layer=0)
        # 按距离排序返回 top_k
        scored = [(nid, self._distance(query, self.vectors[nid])) for nid in candidates]
        scored.sort(key=lambda x: x[1])
        return scored[:top_k]


# ---- 测试 ----
if __name__ == "__main__":
    random.seed(42)
    np.random.seed(42)
    dim = 64
    index = HNSWIndex(dim=dim, m=8, m_max=16, ef_construction=40, max_layers=3)

    # 插入 500 个随机向量
    n = 500
    all_vecs = np.random.randn(n, dim).astype(np.float32)
    for i in range(n):
        index.insert(all_vecs[i])

    # 查询
    query = np.random.randn(dim).astype(np.float32)
    results = index.search(query, top_k=5, ef=50)
    print("HNSW 搜索结果 (top-5):")
    for nid, dist in results:
        print(f"  node={nid}, dist={dist:.4f}")

    # 暴力验证
    brute = sorted(range(n), key=lambda i: np.linalg.norm(all_vecs[i] - query))[:5]
    print("\n暴力搜索 top-5:", brute)

复杂度分析:

维度复杂度
插入O(m × log(N) × D) 平均
搜索O(log(N) × D) 平均,远优于暴力 O(N×D)
空间O(N × m × L),L=平均层数

追问:

  1. HNSW 的 ef_search 参数如何影响搜索精度和速度的权衡?实际生产中如何调优?
  2. HNSW 与 IVF-PQ 的主要区别是什么?在什么数据规模下各自更合适?

Q4: 实现文本分块算法(固定大小 + 递归分块)

难度:⭐⭐

题目描述:

实现两种文本分块策略:

  1. 固定大小分块(Fixed-size Chunking):按固定 token 数切分,支持重叠
  2. 递归分块(Recursive Character Splitting):按分隔符层级递归切分

解题思路:

  1. 固定大小:按 chunk_size 步进,step = chunk_size - overlap 确保重叠
  2. 递归分块:先用 \n\n 分割,若超长则用 \n,再超长用空格,最后用字符切分

完整代码:

python
def fixed_size_chunks(text: str, chunk_size: int = 100, overlap: int = 20) -> list[str]:
    """
    固定大小分块(按字符数)
    Args:
        text: 输入文本
        chunk_size: 每块字符数
        overlap: 重叠字符数
    Returns:
        分块列表
    """
    if not text or chunk_size <= 0:
        return []
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError("overlap 必须小于 chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start += step
    return chunks


def recursive_split(text: str, chunk_size: int = 200, chunk_overlap: int = 50,
                    separators: list[str] | None = None) -> list[str]:
    """
    递归字符分割(类似 LangChain 的 RecursiveCharacterTextSplitter)
    Args:
        text: 输入文本
        chunk_size: 最大块大小(字符数)
        chunk_overlap: 重叠大小
        separators: 分隔符优先级列表
    Returns:
        分块列表
    """
    if separators is None:
        separators = ["\n\n", "\n", " ", ""]

    def _split_recursive(t: str, seps: list[str]) -> list[str]:
        if len(t) <= chunk_size:
            return [t] if t.strip() else []

        sep = seps[0] if seps else ""
        remaining_seps = seps[1:] if len(seps) > 1 else []

        if sep == "":
            # 最后手段:按字符硬切
            return fixed_size_chunks(t, chunk_size, chunk_overlap)

        # 按当前分隔符分割
        parts = t.split(sep)
        # 合并小块
        current = ""
        chunks = []
        for part in parts:
            candidate = current + sep + part if current else part
            if len(candidate) <= chunk_size:
                current = candidate
            else:
                if current:
                    chunks.append(current)
                # 如果单个 part 仍然超长,递归用下一级分隔符
                if len(part) > chunk_size:
                    chunks.extend(_split_recursive(part, remaining_seps))
                    current = ""
                else:
                    current = part
        if current:
            chunks.append(current)

        # 添加重叠
        if chunk_overlap > 0 and len(chunks) > 1:
            overlapped = [chunks[0]]
            for i in range(1, len(chunks)):
                prev_tail = chunks[i - 1][-chunk_overlap:]
                overlapped.append(prev_tail + sep + chunks[i])
            return overlapped
        return chunks

    result = _split_recursive(text.strip(), separators)
    return [c for c in result if c.strip()]


# ---- 测试 ----
if __name__ == "__main__":
    sample = """这是一个测试文档。

它包含了多个段落。每个段落都有不同的内容。
这是第一段的一些详细信息。

这是第二段。它讨论了另一个话题。
这里有一些额外的细节,使得这段文字变得比较长,需要被正确地分块处理。

这是第三段。"""

    print("=== 固定大小分块 ===")
    for i, c in enumerate(fixed_size_chunks(sample, chunk_size=50, overlap=10)):
        print(f"[块{i}] ({len(c)}字符): {c!r}")

    print("\n=== 递归分块 ===")
    for i, c in enumerate(recursive_split(sample, chunk_size=80, chunk_overlap=20)):
        print(f"[块{i}] ({len(c)}字符): {c!r}")

复杂度分析:

维度复杂度
固定大小O(N),N=文本长度
递归分块O(N × S),S=分隔符层级数
空间O(N)

追问:

  1. 在 RAG 场景中,chunk_size 和 chunk_overlap 如何影响检索效果?有没有自适应的分块策略?
  2. 语义分块(Semantic Chunking)的原理是什么?与固定/递归分块相比有何优劣?

RAG 相关


Q5: 实现一个简单的 RAG Pipeline

难度:⭐⭐

题目描述:

实现一个完整的 RAG 流水线:文档加载 → 文本分块 → Embedding 编码 → 向量检索 → 上下文注入 → 生成回答。可使用模拟的 Embedding 和 LLM。

解题思路:

  1. 文档加载器将文件读入
  2. 分块器将长文档切分为小块
  3. Embedding 模型将文本块转为向量,存入向量库
  4. 用户提问时,将问题编码为向量,检索最相关的 K 个文本块
  5. 将检索结果拼入 Prompt,送入 LLM 生成回答

完整代码:

python
import numpy as np
from dataclasses import dataclass, field


# ---- 组件定义 ----

class FakeEmbedding:
    """模拟 Embedding 模型(用哈希生成固定维度的伪向量)"""

    def __init__(self, dim: int = 64):
        self.dim = dim

    def encode(self, texts: list[str]) -> np.ndarray:
        vectors = []
        for t in texts:
            seed = hash(t) % (2**31)
            rng = np.random.RandomState(seed)
            vec = rng.randn(self.dim).astype(np.float32)
            vec /= np.linalg.norm(vec)  # L2 归一化
            vectors.append(vec)
        return np.array(vectors)


class FakeLLM:
    """模拟 LLM,简单拼接上下文返回"""

    def generate(self, prompt: str, context: str, question: str) -> str:
        return (
            f"根据以下参考资料回答问题:\n\n"
            f"【参考资料】\n{context}\n\n"
            f"【问题】{question}\n\n"
            f"【回答】基于提供的 {prompt.count('[文档片段]')} 段参考资料,"
            f"以下是关于「{question}」的回答:{context[:100]}..."
        )


@dataclass
class Chunk:
    text: str
    metadata: dict = field(default_factory=dict)
    embedding: np.ndarray | None = None


class SimpleVectorStore:
    """简易向量存储与检索"""

    def __init__(self):
        self.chunks: list[Chunk] = []

    def add(self, chunks: list[Chunk]):
        self.chunks.extend(chunks)

    def search(self, query_vec: np.ndarray, top_k: int = 3) -> list[tuple[Chunk, float]]:
        results = []
        for chunk in self.chunks:
            sim = np.dot(query_vec, chunk.embedding)
            results.append((chunk, float(sim)))
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:top_k]


# ---- RAG Pipeline ----

class RAGPipeline:
    def __init__(self, chunk_size: int = 100, chunk_overlap: int = 20, top_k: int = 3):
        self.embedder = FakeEmbedding(dim=64)
        self.llm = FakeLLM()
        self.store = SimpleVectorStore()
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.top_k = top_k

    def _chunk_text(self, text: str) -> list[str]:
        step = self.chunk_size - self.chunk_overlap
        chunks = []
        i = 0
        while i < len(text):
            chunks.append(text[i:i + self.chunk_size])
            i += step
        return chunks

    def ingest(self, documents: list[str]):
        """文档加载 + 分块 + Embedding + 存储"""
        all_chunks = []
        for doc_idx, doc in enumerate(documents):
            text_chunks = self._chunk_text(doc)
            embeddings = self.embedder.encode(text_chunks)
            for i, (text, emb) in enumerate(zip(text_chunks, embeddings)):
                chunk = Chunk(
                    text=text,
                    metadata={"doc_id": doc_idx, "chunk_index": i},
                    embedding=emb,
                )
                all_chunks.append(chunk)
        self.store.add(all_chunks)
        print(f"已摄入 {len(all_chunks)} 个文本块")

    def query(self, question: str) -> str:
        """检索 + 生成"""
        # 1. 问题编码
        q_vec = self.embedder.encode([question])[0]
        # 2. 向量检索
        results = self.store.search(q_vec, top_k=self.top_k)
        # 3. 构建上下文
        context_parts = []
        for chunk, score in results:
            context_parts.append(f"[文档片段] (相似度:{score:.4f})\n{chunk.text}")
        context = "\n\n".join(context_parts)
        # 4. 生成
        return self.llm.generate(prompt=context, context=context, question=question)


# ---- 测试 ----
if __name__ == "__main__":
    documents = [
        "RAG(检索增强生成)是一种结合信息检索和文本生成的技术。它通过从外部知识库中检索相关文档,将其作为上下文输入到大语言模型中,从而生成更准确、更有依据的回答。",
        "向量数据库是 RAG 系统的关键组件。它将文本转换为高维向量,并通过近似最近邻算法快速检索相似内容。常见的向量数据库包括 FAISS、Milvus、Pinecone 等。",
        "Embedding 模型将文本映射到稠密向量空间。好的 Embedding 模型能使语义相似的文本在向量空间中距离更近。常用模型有 OpenAI text-embedding-ada-002、BGE、E5 等。",
    ]

    rag = RAGPipeline(chunk_size=80, chunk_overlap=20, top_k=2)
    rag.ingest(documents)

    answer = rag.query("什么是 RAG?")
    print(answer)

复杂度分析:

维度复杂度
摄入O(N × D),N=文本块数,D=Embedding 维度
查询O(N × D + K × D)
空间O(N × D)

追问:

  1. 如何处理多模态文档(表格、图片)的 RAG?与纯文本 RAG 的 Pipeline 有何不同?
  2. RAG 中的查询改写(Query Rewriting)有哪些常见技术?HyDE、Multi-query 分别是什么?

Q6: 实现 BM25 检索算法

难度:⭐⭐

题目描述:

实现 BM25 文本检索算法,支持文档索引和查询打分。BM25 是传统信息检索中的经典算法,在 RAG 中常与向量检索混合使用。

解题思路:

  1. 对文档集进行分词和索引
  2. BM25 评分公式:score(q,d) = Σ IDF(qi) &times; (f(qi,d) &times; (k1+1)) / (f(qi,d) + k1 &times; (1 - b + b &times; |d|/avgdl))
  3. IDF = log((N - n(qi) + 0.5) / (n(qi) + 0.5) + 1)

完整代码:

python
import math
from collections import Counter, defaultdict
from dataclasses import dataclass


def simple_tokenize(text: str) -> list[str]:
    """简易中文/英文分词(按字符和空格)"""
    import re
    # 英文按空格,中文按字(简化处理)
    tokens = []
    for seg in re.findall(r'[\u4e00-\u9fff]+|[a-zA-Z]+|[0-9]+', text.lower()):
        if re.match(r'[\u4e00-\u9fff]', seg):
            tokens.extend(list(seg))  # 中文按字切分
        else:
            tokens.append(seg)
    return tokens


class BM25:
    """BM25 检索算法"""

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1
        self.b = b
        self.doc_count = 0
        self.avgdl = 0.0
        self.doc_freqs: dict[str, int] = defaultdict(int)  # term -> 包含该 term 的文档数
        self.doc_lens: list[int] = []
        self.tf: list[dict[str, int]] = []  # 每篇文档的 term freq
        self.docs: list[str] = []  # 原始文档
        self.idf: dict[str, float] = {}

    def index(self, documents: list[str]):
        """索引文档列表"""
        self.docs = documents
        total_len = 0
        for doc in documents:
            tokens = simple_tokenize(doc)
            total_len += len(tokens)
            tf = Counter(tokens)
            self.tf.append(dict(tf))
            self.doc_lens.append(len(tokens))
            # 更新文档频率
            seen = set(tokens)
            for t in seen:
                self.doc_freqs[t] += 1

        self.doc_count = len(documents)
        self.avgdl = total_len / self.doc_count if self.doc_count > 0 else 0

        # 预计算 IDF
        for term, df in self.doc_freqs.items():
            self.idf[term] = math.log(
                (self.doc_count - df + 0.5) / (df + 0.5) + 1
            )

    def _score_doc(self, query_tokens: list[str], doc_idx: int) -> float:
        score = 0.0
        doc_tf = self.tf[doc_idx]
        dl = self.doc_lens[doc_idx]
        for qt in query_tokens:
            if qt not in doc_tf:
                continue
            f = doc_tf[qt]
            idf = self.idf.get(qt, 0)
            numerator = f * (self.k1 + 1)
            denominator = f + self.k1 * (1 - self.b + self.b * dl / self.avgdl)
            score += idf * (numerator / denominator)
        return score

    def search(self, query: str, top_k: int = 3) -> list[dict]:
        """搜索并返回 top_k 结果"""
        tokens = simple_tokenize(query)
        scored = []
        for i in range(self.doc_count):
            score = self._score_doc(tokens, i)
            scored.append({
                "doc_index": i,
                "text": self.docs[i],
                "score": score,
            })
        scored.sort(key=lambda x: x["score"], reverse=True)
        return scored[:top_k]


# ---- 测试 ----
if __name__ == "__main__":
    docs = [
        "RAG 是检索增强生成技术,用于提升大语言模型的回答质量。",
        "BM25 是经典的文本检索算法,基于词频和逆文档频率。",
        "向量数据库存储高维向量,支持近似最近邻搜索。",
        "Embedding 模型将文本映射到向量空间。",
        "BM25 在关键词匹配方面表现优异,与向量检索互补。",
    ]
    bm25 = BM25()
    bm25.index(docs)
    results = bm25.search("BM25 检索算法", top_k=3)
    for r in results:
        print(f"[{r['score']:.4f}] {r['text']}")

复杂度分析:

维度复杂度
索引O(N × L),N=文档数,L=平均长度
查询O(N × Q),Q=查询词数
空间O(N × V),V=词表大小

追问:

  1. BM25 与向量检索在什么场景下会互补?混合检索(Hybrid Search)的融合策略有哪些(RRF 等)?
  2. BM25 的 k1 和 b 参数分别控制什么?如何通过实验调优?

Q7: 实现 Reranking 逻辑(交叉编码器打分 + 排序)

难度:⭐⭐

题目描述:

实现一个 Reranking 模块:给定查询和候选文档列表,用交叉编码器方式计算相关性分数并重新排序。由于真实模型较重,这里用模拟打分器演示完整流程。

解题思路:

  1. 初次检索(召回)返回较多候选(如 top-20)
  2. Reranker 将 (query, doc) 拼接后精细打分
  3. 按 reranker 分数重新排序,返回 top-K

完整代码:

python
import numpy as np
from dataclasses import dataclass


@dataclass
class RetrievalResult:
    doc_id: int
    text: str
    retrieval_score: float
    rerank_score: float = 0.0


class CrossEncoderReranker:
    """
    模拟交叉编码器 Reranker
    实际实现中会调用 BGE-Reranker / Cohere Rerank 等模型
    """

    def __init__(self, dim: int = 64):
        self.dim = dim
        # 模拟一个打分用的线性权重
        np.random.seed(0)
        self.weights = np.random.randn(dim)

    def _encode_pair(self, query: str, doc: str) -> np.ndarray:
        """模拟交叉编码:将 query+doc 拼接后编码为向量"""
        combined = query + " [SEP] " + doc
        seed = hash(combined) % (2**31)
        rng = np.random.RandomState(seed)
        vec = rng.randn(self.dim).astype(np.float32)
        return vec

    def score(self, query: str, docs: list[str]) -> list[float]:
        """对每个 (query, doc) 对打分"""
        scores = []
        for doc in docs:
            vec = self._encode_pair(query, doc)
            s = float(np.dot(vec, self.weights))
            scores.append(s)
        return scores

    def rerank(self, query: str, results: list[RetrievalResult], top_k: int = 3) -> list[RetrievalResult]:
        """重排序"""
        docs = [r.text for r in results]
        scores = self.score(query, docs)
        for i, s in enumerate(scores):
            results[i].rerank_score = s
        results.sort(key=lambda x: x.rerank_score, reverse=True)
        return results[:top_k]


class RAGWithReranking:
    """带 Reranking 的 RAG 流程"""

    def __init__(self):
        self.reranker = CrossEncoderReranker()
        # 模拟向量召回结果
        self.candidate_pool: list[RetrievalResult] = []

    def recall(self, query_vec: np.ndarray, top_k: int = 10) -> list[RetrievalResult]:
        """模拟向量召回(返回 top_k 较大值)"""
        # 用 mock 数据模拟
        return self.candidate_pool[:top_k]

    def query(self, query: str, recall_results: list[RetrievalResult],
              final_top_k: int = 3) -> list[RetrievalResult]:
        """完整流程:召回 → Rerank → 返回"""
        # Rerank
        reranked = self.reranker.rerank(query, recall_results, top_k=final_top_k)
        return reranked


# ---- 测试 ----
if __name__ == "__main__":
    docs = [
        "BM25 是经典的关键词检索算法",
        "余弦相似度用于衡量向量之间的方向相似性",
        "HNSW 是一种高效的近似最近邻索引算法",
        "RAG 通过检索外部知识来增强生成模型的回答",
        "Transformer 架构是现代 NLP 的基础",
        "向量数据库支持高效的相似性搜索",
        "BM25 基于词频和逆文档频率计算相关性得分",
        "深度学习在 NLP 中的应用越来越广泛",
    ]

    # 模拟召回阶段的结果(按原始分数排序)
    candidates = []
    for i, doc in enumerate(docs):
        score = np.random.random()
        candidates.append(RetrievalResult(
            doc_id=i, text=doc, retrieval_score=score
        ))
    candidates.sort(key=lambda x: x.retrieval_score, reverse=True)

    print("=== 召回阶段 (top-6) ===")
    for c in candidates[:6]:
        print(f"  [{c.retrieval_score:.4f}] {c.text}")

    # Rerank
    reranker = CrossEncoderReranker()
    reranked = reranker.rerank("什么是 BM25 检索算法?", candidates[:6], top_k=3)

    print("\n=== Rerank 后 (top-3) ===")
    for r in reranked:
        print(f"  [rerank:{r.rerank_score:.4f} recall:{r.retrieval_score:.4f}] {r.text}")

复杂度分析:

维度复杂度
打分O(K × D),K=候选数,D=模型维度
排序O(K log K)
总查询召回 O(log N) + Rerank O(K × D)

追问:

  1. 真实场景中,BGE-Reranker 与 Cohere Rerank 的架构有什么区别?各自的延迟和效果如何?
  2. Reranking 的候选数量 K 应该设多大?太大会怎样?太小呢?

Agent 相关


Q8: 实现 ReAct 循环(Thought → Action → Observation)

难度:⭐⭐⭐

题目描述:

实现 ReAct(Reasoning and Acting)Agent 框架。Agent 能够循环执行:思考(Thought)→ 选择动作(Action)→ 执行并观察结果(Observation),直到得出最终答案。

解题思路:

  1. 维护工具注册表(函数名 → 可调用对象)
  2. 每轮循环:构建包含历史的 Prompt → LLM 输出 Thought + Action → 解析 Action → 执行 → 拿到 Observation
  3. 若 LLM 输出 FINISH 则结束循环

完整代码:

python
import json
import re
from dataclasses import dataclass, field
from typing import Callable, Any


@dataclass
class Tool:
    name: str
    description: str
    func: Callable[..., str]


class FakeLLM:
    """模拟 LLM,根据问题返回预设的 Thought/Action"""

    def __init__(self):
        self.call_count = 0

    def generate(self, prompt: str) -> str:
        self.call_count += 1
        # 简单模拟:第一次思考并调用工具,第二次结束
        if self.call_count == 1:
            return (
                "Thought: 用户问的是天气,我需要调用天气查询工具。\n"
                "Action: search_weather\n"
                'Action Input: {"city": "北京"}'
            )
        elif self.call_count == 2:
            return (
                "Thought: 我已经获得了天气信息,可以回答用户了。\n"
                "Action: finish\n"
                'Action Input: {"answer": "北京今天天气晴朗,温度25°C。"}'
            )
        return "Thought: 我可以回答了。\nAction: finish\nAction Input: {\"answer\": \"无法确定。\"}"


class ReActAgent:
    """ReAct Agent 框架"""

    def __init__(self, tools: list[Tool], max_steps: int = 10):
        self.tools = {t.name: t for t in tools}
        self.max_steps = max_steps
        self.llm = FakeLLM()
        self.history: list[dict] = []

    def _build_prompt(self, query: str) -> str:
        tool_desc = "\n".join(
            f"- {t.name}: {t.description}" for t in self.tools.values()
        )
        history_str = ""
        for h in self.history:
            history_str += f"\n{h['thought']}\n{h['action']}: {h['action_input']}\nObservation: {h['observation']}"

        return (
            f"你是一个 ReAct Agent。你可以使用以下工具:\n{tool_desc}\n\n"
            f"请严格按照以下格式回答:\n"
            f"Thought: <你的思考>\nAction: <工具名>\nAction Input: <JSON参数>\n\n"
            f"当你准备好最终回答时:\nThought: <你的思考>\nAction: finish\nAction Input: {{\"answer\": \"...\"}}"
            f"\n\n问题: {query}{history_str}"
        )

    def _parse_response(self, text: str) -> dict:
        thought = ""
        action = ""
        action_input = ""

        thought_match = re.search(r'Thought:\s*(.+?)(?:\n|$)', text)
        if thought_match:
            thought = thought_match.group(1).strip()

        action_match = re.search(r'Action:\s*(.+?)(?:\n|$)', text)
        if action_match:
            action = action_match.group(1).strip()

        input_match = re.search(r'Action Input:\s*(.+?)(?:\n|$)', text)
        if input_match:
            action_input = input_match.group(1).strip()

        return {"thought": thought, "action": action, "action_input": action_input}

    def _execute_tool(self, action: str, action_input: str) -> str:
        if action == "finish":
            return action_input
        tool = self.tools.get(action)
        if not tool:
            return f"错误:未知工具 '{action}'"
        try:
            params = json.loads(action_input)
            return tool.func(**params)
        except Exception as e:
            return f"工具执行错误: {e}"

    def run(self, query: str) -> str:
        """执行 ReAct 循环"""
        for step in range(self.max_steps):
            prompt = self._build_prompt(query)
            response = self.llm.generate(prompt)
            parsed = self._parse_response(response)

            if parsed["action"] == "finish":
                try:
                    final = json.loads(parsed["action_input"])
                    return final.get("answer", parsed["action_input"])
                except json.JSONDecodeError:
                    return parsed["action_input"]

            # 执行工具
            observation = self._execute_tool(parsed["action"], parsed["action_input"])

            # 记录历史
            self.history.append({
                "thought": parsed["thought"],
                "action": parsed["action"],
                "action_input": parsed["action_input"],
                "observation": observation,
            })

        return "达到最大步数限制,未能得出答案。"


# ---- 测试 ----
if __name__ == "__main__":
    def search_weather(city: str) -> str:
        weather_data = {"北京": "晴朗 25°C", "上海": "多云 28°C", "广州": "阵雨 30°C"}
        return weather_data.get(city, f"未找到 {city} 的天气数据")

    def search_knowledge(query: str) -> str:
        return f"关于「{query}」的知识:这是一个测试知识库的回答。"

    tools = [
        Tool(name="search_weather", description="查询城市天气", func=search_weather),
        Tool(name="search_knowledge", description="查询知识库", func=search_knowledge),
    ]

    agent = ReActAgent(tools=tools, max_steps=5)
    answer = agent.run("北京今天天气怎么样?")
    print("最终回答:", answer)
    print("\n执行历史:")
    for i, h in enumerate(agent.history):
        print(f"  步骤{i+1}: Thought={h['thought']}")
        print(f"         Action={h['action']}, Input={h['action_input']}")
        print(f"         Observation={h['observation']}")

复杂度分析:

维度复杂度
每步时间O(1) 框架开销 + LLM 推理时间
总时间O(steps × LLM_latency)
空间O(steps × context_length)

追问:

  1. 如何防止 ReAct Agent 进入死循环或反复调用同一工具?有哪些常见的退出策略?
  2. ReAct 与 Plan-and-Execute、Reflexion 等 Agent 模式的核心区别是什么?

Q9: 实现 Function Calling 的参数解析和执行器

难度:⭐⭐

题目描述:

实现一个 Function Calling 系统,包括:工具注册(JSON Schema 定义)、LLM 输出解析、参数校验、函数执行、结果封装。

解题思路:

  1. 用 JSON Schema 定义函数签名(名称、描述、参数类型、必填字段)
  2. 解析 LLM 的 function_call 输出(JSON 格式)
  3. 校验参数类型和必填项
  4. 执行函数并格式化返回

完整代码:

python
import json
from dataclasses import dataclass, field
from typing import Any, Callable
from datetime import datetime


@dataclass
class ParameterDef:
    name: str
    type: str           # "string", "integer", "float", "boolean", "array"
    description: str
    required: bool = True
    enum: list | None = None


@dataclass
class FunctionDef:
    name: str
    description: str
    parameters: list[ParameterDef]
    handler: Callable[..., Any]

    def to_schema(self) -> dict:
        """生成 OpenAI 风格的 function schema"""
        props = {}
        required = []
        for p in self.parameters:
            prop = {"type": p.type, "description": p.description}
            if p.enum:
                prop["enum"] = p.enum
            props[p.name] = prop
            if p.required:
                required.append(p.name)
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": props,
                "required": required,
            }
        }


class FunctionCallExecutor:
    """Function Calling 执行器"""

    def __init__(self):
        self.functions: dict[str, FunctionDef] = {}

    def register(self, func_def: FunctionDef):
        self.functions[func_def.name] = func_def

    def get_tools_schema(self) -> list[dict]:
        """获取所有工具的 schema(供 LLM 使用)"""
        return [fd.to_schema() for fd in self.functions.values()]

    def _validate_params(self, func_def: FunctionDef, params: dict) -> tuple[bool, str]:
        """校验参数"""
        for p in func_def.parameters:
            if p.required and p.name not in params:
                return False, f"缺少必填参数: {p.name}"
            if p.name in params:
                val = params[p.name]
                type_map = {
                    "string": str, "integer": int, "float": float,
                    "boolean": bool, "array": list,
                }
                expected_type = type_map.get(p.type)
                if expected_type and not isinstance(val, expected_type):
                    # 尝试类型转换
                    try:
                        if p.type == "integer":
                            params[p.name] = int(val)
                        elif p.type == "float":
                            params[p.name] = float(val)
                        elif p.type == "string":
                            params[p.name] = str(val)
                    except (ValueError, TypeError):
                        return False, f"参数 {p.name} 类型错误,期望 {p.type}"
                if p.enum and params[p.name] not in p.enum:
                    return False, f"参数 {p.name} 的值必须在 {p.enum} 中"
        return True, "OK"

    def execute(self, function_call: dict) -> dict:
        """
        执行 LLM 返回的 function_call
        Args:
            function_call: {"name": "...", "arguments": "{...}"}
        Returns:
            {"result": ..., "error": ...}
        """
        name = function_call.get("name", "")
        args_str = function_call.get("arguments", "{}")

        if name not in self.functions:
            return {"error": f"未知函数: {name}"}

        func_def = self.functions[name]

        try:
            params = json.loads(args_str) if isinstance(args_str, str) else args_str
        except json.JSONDecodeError as e:
            return {"error": f"参数 JSON 解析失败: {e}"}

        valid, msg = self._validate_params(func_def, params)
        if not valid:
            return {"error": f"参数校验失败: {msg}"}

        try:
            result = func_def.handler(**params)
            return {"result": result}
        except Exception as e:
            return {"error": f"函数执行失败: {e}"}


# ---- 测试 ----
if __name__ == "__main__":
    executor = FunctionCallExecutor()

    # 注册工具
    executor.register(FunctionDef(
        name="get_weather",
        description="获取指定城市的天气信息",
        parameters=[
            ParameterDef("city", "string", "城市名称", required=True),
            ParameterDef("unit", "string", "温度单位", required=False, enum=["celsius", "fahrenheit"]),
        ],
        handler=lambda city, unit="celsius": f"{city}天气:晴朗,25{'°C' if unit == 'celsius' else '°F'}"
    ))

    executor.register(FunctionDef(
        name="send_email",
        description="发送邮件",
        parameters=[
            ParameterDef("to", "string", "收件人邮箱", required=True),
            ParameterDef("subject", "string", "邮件主题", required=True),
            ParameterDef("body", "string", "邮件正文", required=True),
        ],
        handler=lambda to, subject, body: f"邮件已发送至 {to},主题:{subject}"
    ))

    # 打印 schema
    print("=== 工具 Schema ===")
    print(json.dumps(executor.get_tools_schema(), ensure_ascii=False, indent=2))

    # 模拟 LLM 输出的 function_call
    print("\n=== 执行 function_call ===")
    calls = [
        {"name": "get_weather", "arguments": '{"city": "北京"}'},
        {"name": "send_email", "arguments": '{"to": "test@example.com", "subject": "测试", "body": "Hello"}'},
        {"name": "get_weather", "arguments": '{"city": "上海", "unit": "fahrenheit"}'},
        {"name": "unknown_func", "arguments": '{}'},
    ]

    for call in calls:
        result = executor.execute(call)
        print(f"  {call['name']}({call['arguments']}) => {result}")

复杂度分析:

维度复杂度
注册O(1)
校验O(P),P=参数数量
执行取决于具体函数

追问:

  1. OpenAI 的 function calling 和 tool_choice 参数如何控制工具调用行为?parallel function calling 是什么?
  2. 如何处理函数执行结果过长的情况?截断策略有哪些?

Q10: 实现一个简单的 Agent 记忆系统(短期 + 长期记忆)

难度:⭐⭐⭐

题目描述:

实现 Agent 的双层记忆系统:

  • 短期记忆(Working Memory):当前对话轮次的上下文
  • 长期记忆(Long-term Memory):跨会话持久化的关键信息

解题思路:

  1. 短期记忆:维护一个滑动窗口的对话历史
  2. 长期记忆:提取关键信息(事实、偏好、摘要)存入结构化存储
  3. 查询时融合短期上下文和相关长期记忆

完整代码:

python
import json
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MemoryEntry:
    content: str
    timestamp: float
    memory_type: str      # "fact", "preference", "summary", "conversation"
    metadata: dict = field(default_factory=dict)
    importance: float = 0.5  # 0-1 重要性评分


class ShortTermMemory:
    """短期记忆:滑动窗口对话历史"""

    def __init__(self, max_turns: int = 10):
        self.max_turns = max_turns
        self.messages: deque[dict] = deque(maxlen=max_turns * 2)  # user + assistant

    def add(self, role: str, content: str):
        self.messages.append({
            "role": role,
            "content": content,
            "timestamp": time.time(),
        })

    def get_context(self) -> list[dict]:
        return list(self.messages)

    def get_context_string(self) -> str:
        lines = []
        for msg in self.messages:
            lines.append(f"{msg['role']}: {msg['content']}")
        return "\n".join(lines)

    def clear(self):
        self.messages.clear()


class LongTermMemory:
    """长期记忆:结构化持久存储"""

    def __init__(self):
        self.entries: list[MemoryEntry] = []

    def add(self, content: str, memory_type: str = "fact",
            importance: float = 0.5, metadata: dict | None = None):
        self.entries.append(MemoryEntry(
            content=content,
            timestamp=time.time(),
            memory_type=memory_type,
            metadata=metadata or {},
            importance=importance,
        ))

    def search(self, query: str, top_k: int = 5, memory_type: str | None = None) -> list[MemoryEntry]:
        """简易搜索:按关键词匹配 + 重要性排序"""
        results = []
        query_lower = query.lower()
        for entry in self.entries:
            if memory_type and entry.memory_type != memory_type:
                continue
            # 简单关键词匹配分数
            content_lower = entry.content.lower()
            match_score = sum(1 for word in query_lower.split() if word in content_lower)
            combined_score = match_score * 0.6 + entry.importance * 0.4
            if match_score > 0:
                results.append((combined_score, entry))
        results.sort(key=lambda x: x[0], reverse=True)
        return [entry for _, entry in results[:top_k]]

    def get_all(self, memory_type: str | None = None) -> list[MemoryEntry]:
        if memory_type:
            return [e for e in self.entries if e.memory_type == memory_type]
        return list(self.entries)


class MemoryExtractor:
    """记忆提取器:从对话中提取关键信息存入长期记忆"""

    def extract(self, user_msg: str, assistant_msg: str) -> list[dict]:
        """模拟提取(实际用 LLM 做信息抽取)"""
        extracted = []

        # 模拟:检测偏好
        preference_keywords = ["喜欢", "不喜欢", "偏好", "习惯", "最爱"]
        for kw in preference_keywords:
            if kw in user_msg:
                extracted.append({
                    "content": f"用户表达偏好:{user_msg[:50]}",
                    "type": "preference",
                    "importance": 0.7,
                })
                break

        # 模拟:检测事实
        fact_keywords = ["我是", "我在", "我的", "叫做", "名字"]
        for kw in fact_keywords:
            if kw in user_msg:
                extracted.append({
                    "content": f"用户事实:{user_msg[:80]}",
                    "type": "fact",
                    "importance": 0.8,
                })
                break

        return extracted


class AgentMemorySystem:
    """Agent 记忆系统"""

    def __init__(self, max_short_term_turns: int = 10):
        self.short_term = ShortTermMemory(max_turns=max_short_term_turns)
        self.long_term = LongTermMemory()
        self.extractor = MemoryExtractor()

    def on_user_message(self, content: str):
        """处理用户消息"""
        self.short_term.add("user", content)

    def on_assistant_message(self, content: str, user_msg: str = ""):
        """处理助手消息,同时提取长期记忆"""
        self.short_term.add("assistant", content)
        if user_msg:
            memories = self.extractor.extract(user_msg, content)
            for mem in memories:
                self.long_term.add(
                    content=mem["content"],
                    memory_type=mem["type"],
                    importance=mem["importance"],
                )

    def build_context(self, query: str = "", long_term_top_k: int = 3) -> str:
        """构建融合短期和长期记忆的上下文"""
        parts = []

        # 长期记忆
        if query:
            relevant = self.long_term.search(query, top_k=long_term_top_k)
            if relevant:
                parts.append("=== 相关历史记忆 ===")
                for entry in relevant:
                    parts.append(f"[{entry.memory_type}] {entry.content}")
                parts.append("")

        # 短期记忆(当前对话)
        context = self.short_term.get_context_string()
        if context:
            parts.append("=== 当前对话 ===")
            parts.append(context)

        return "\n".join(parts)

    def summarize_and_consolidate(self):
        """将短期记忆摘要后存入长期记忆"""
        context = self.short_term.get_context_string()
        if context.strip():
            self.long_term.add(
                content=f"对话摘要:{context[:200]}",
                memory_type="summary",
                importance=0.6,
            )
            self.short_term.clear()


# ---- 测试 ----
if __name__ == "__main__":
    mem = AgentMemorySystem(max_short_term_turns=5)

    # 模拟多轮对话
    conversations = [
        ("你好,我叫张明,在字节跳动工作。", "你好张明!很高兴认识你。"),
        ("我喜欢用 Python 编程,不太喜欢 Java。", "了解!Python 确实很流行。"),
        ("你能推荐一些学习资源吗?", "当然,推荐你看看 LangChain 文档。"),
        ("我在做一个 RAG 项目。", "RAG 很有意思,有什么需要帮忙的吗?"),
        ("我想了解一下向量数据库。", "向量数据库是 RAG 的核心组件..."),
    ]

    for user_msg, asst_msg in conversations:
        mem.on_user_message(user_msg)
        mem.on_assistant_message(asst_msg, user_msg=user_msg)

    # 查看长期记忆
    print("=== 长期记忆 ===")
    for entry in mem.long_term.get_all():
        print(f"  [{entry.memory_type}] (重要性:{entry.importance}) {entry.content}")

    # 搜索相关记忆
    print("\n=== 搜索「编程偏好」 ===")
    results = mem.long_term.search("编程 Python 偏好")
    for r in results:
        print(f"  [{r.memory_type}] {r.content}")

    # 构建上下文
    print("\n=== 融合上下文 ===")
    ctx = mem.build_context(query="用户偏好")
    print(ctx)

复杂度分析:

维度复杂度
添加(短期)O(1)
添加(长期)O(1)
搜索(长期)O(N × Q),N=记忆条数,Q=查询词数
上下文构建O(top_k + W),W=短期窗口大小

追问:

  1. 实际生产中,长期记忆通常存储在向量数据库中,检索时用语义相似度而非关键词匹配。如何实现?
  2. MemGPT(现名 Letta)的虚拟内存机制是如何工作的?与上述方案有何区别?

模型推理相关


Q11: 实现 KV Cache 的简化版

难度:⭐⭐⭐

题目描述:

实现 Transformer 自回归解码中 KV Cache 的简化版本。演示缓存如何避免重复计算,加速逐 token 生成。

解题思路:

  1. 自回归生成时,每生成一个新 token 都需要计算 attention
  2. 不用 cache 时,每次需重新计算所有 token 的 K、V
  3. 用 cache 时,将之前的 K、V 缓存起来,只计算新 token 的 Q、K、V,再拼接

完整代码:

python
import numpy as np


class SimpleAttention:
    """简化版自注意力(单头)"""

    def __init__(self, d_model: int):
        self.d_model = d_model
        np.random.seed(42)
        self.W_q = np.random.randn(d_model, d_model) * 0.01
        self.W_k = np.random.randn(d_model, d_model) * 0.01
        self.W_v = np.random.randn(d_model, d_model) * 0.01

    def forward(self, x: np.ndarray) -> np.ndarray:
        """
        标准自注意力(无缓存)
        Args:
            x: (seq_len, d_model)
        Returns:
            output: (seq_len, d_model)
        """
        Q = x @ self.W_q
        K = x @ self.W_k
        V = x @ self.W_v
        scores = Q @ K.T / np.sqrt(self.d_model)
        # 因果掩码
        seq_len = x.shape[0]
        mask = np.triu(np.ones((seq_len, seq_len)), k=1) * -1e9
        scores = scores + mask
        weights = self._softmax(scores)
        return weights @ V

    def forward_with_cache(self, x_new: np.ndarray,
                           kv_cache: dict | None = None) -> tuple[np.ndarray, dict]:
        """
        带 KV Cache 的自注意力
        Args:
            x_new: (1, d_model) 新 token 的嵌入
            kv_cache: {"k": (cached_len, d_model), "v": (cached_len, d_model)}
        Returns:
            output: (1, d_model)
            updated_cache
        """
        q_new = x_new @ self.W_q   # (1, d_model)
        k_new = x_new @ self.W_k   # (1, d_model)
        v_new = x_new @ self.W_v   # (1, d_model)

        if kv_cache is not None and kv_cache.get("k") is not None:
            # 拼接缓存
            K = np.concatenate([kv_cache["k"], k_new], axis=0)  # (cached+1, d_model)
            V = np.concatenate([kv_cache["v"], v_new], axis=0)
        else:
            K = k_new
            V = v_new

        # Q 只有当前 token,K/V 包含所有历史
        scores = q_new @ K.T / np.sqrt(self.d_model)  # (1, cached+1)
        weights = self._softmax(scores)                 # (1, cached+1)
        output = weights @ V                            # (1, d_model)

        new_cache = {"k": K, "v": V}
        return output, new_cache

    @staticmethod
    def _softmax(x: np.ndarray) -> np.ndarray:
        e = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return e / e.sum(axis=-1, keepdims=True)


# ---- 测试 ----
if __name__ == "__main__":
    d_model = 32
    seq_len = 10
    attn = SimpleAttention(d_model)

    # 模拟输入序列
    x = np.random.randn(seq_len, d_model).astype(np.float32)

    # 方式1:无缓存,一次性计算
    output_full = attn.forward(x)
    print(f"无缓存输出 shape: {output_full.shape}")
    print(f"最后一个 token 的输出(前5维): {output_full[-1, :5]}")

    # 方式2:有缓存,逐 token 计算
    kv_cache = {"k": None, "v": None}
    for i in range(seq_len):
        out, kv_cache = attn.forward_with_cache(x[i:i+1, :], kv_cache)

    print(f"\n有缓存最终输出(前5维): {out[0, :5]}")
    print(f"KV Cache 形状 - K: {kv_cache['k'].shape}, V: {kv_cache['v'].shape}")

    # 验证一致性
    diff = np.abs(output_full[-1, :] - out[0, :])
    print(f"\n两种方式的输出差异(最大): {diff.max():.8f}")
    print(f"输出一致: {diff.max() < 1e-5}")

    # 性能对比示意
    print("\n=== 计算量对比 ===")
    for s in [100, 500, 1000]:
        ops_no_cache = s * s * d_model
        ops_cache = s * 1 * d_model
        print(f"序列长度 {s}: 无缓存={ops_no_cache:,} ops, 有缓存={ops_cache:,} ops, "
              f"加速比={ops_no_cache/ops_cache:.0f}x")

复杂度分析:

维度复杂度
无缓存(生成 N tokens)O(N² × D)
有缓存(生成 N tokens)O(N × D)(每次只需 1×D 的注意力)
缓存空间O(N × D × L × H),L=层数,H=头数

追问:

  1. 在多头注意力和多层 Transformer 中,KV Cache 的总内存占用如何计算?对于 7B 模型、序列长度 4096,大约需要多少显存?
  2. MQA(Multi-Query Attention)和 GQA(Grouped-Query Attention)如何减少 KV Cache 的大小?

Q12: 实现贪心解码和采样解码

难度:⭐⭐

题目描述:

实现两种常见的文本解码策略:

  1. 贪心解码(Greedy Decoding):每步选概率最高的 token
  2. 采样解码(Sampling):按概率分布随机采样,支持 temperature 和 top-k/top-p

解题思路:

  1. 贪心解码直接 argmax
  2. 采样解码通过 temperature 调整分布平坦度
  3. Top-k 保留概率最高的 k 个 token
  4. Top-p(Nucleus Sampling)保留累积概率达到 p 的最小 token 集合

完整代码:

python
import numpy as np


class TextDecoder:
    """文本解码器"""

    def __init__(self, vocab: list[str]):
        self.vocab = vocab
        self.vocab_size = len(vocab)

    def _softmax(self, logits: np.ndarray) -> np.ndarray:
        e = np.exp(logits - np.max(logits))
        return e / e.sum()

    def _apply_temperature(self, logits: np.ndarray, temperature: float) -> np.ndarray:
        if temperature <= 0:
            temperature = 1e-8
        return logits / temperature

    def _apply_top_k(self, probs: np.ndarray, k: int) -> np.ndarray:
        top_k_indices = np.argsort(probs)[-k:]
        mask = np.zeros_like(probs)
        mask[top_k_indices] = 1.0
        probs = probs * mask
        return probs / probs.sum()

    def _apply_top_p(self, probs: np.ndarray, p: float) -> np.ndarray:
        sorted_indices = np.argsort(probs)[::-1]
        sorted_probs = probs[sorted_indices]
        cumsum = np.cumsum(sorted_probs)
        # 找到累积概率超过 p 的位置
        cutoff = np.searchsorted(cumsum, p) + 1
        mask = np.zeros_like(probs)
        mask[sorted_indices[:cutoff]] = 1.0
        probs = probs * mask
        return probs / probs.sum()

    def greedy_decode(self, logits: np.ndarray) -> int:
        """贪心解码:选概率最高的"""
        return int(np.argmax(logits))

    def sample_decode(self, logits: np.ndarray, temperature: float = 1.0,
                      top_k: int = 0, top_p: float = 1.0) -> int:
        """采样解码"""
        logits = self._apply_temperature(logits, temperature)
        probs = self._softmax(logits)

        if top_k > 0:
            probs = self._apply_top_k(probs, top_k)
        if top_p < 1.0:
            probs = self._apply_top_p(probs, top_p)

        return int(np.random.choice(len(probs), p=probs))

    def generate(self, get_logits_fn, max_tokens: int = 20,
                 strategy: str = "greedy", temperature: float = 1.0,
                 top_k: int = 0, top_p: float = 1.0,
                 eos_token_id: int | None = None) -> list[int]:
        """
        生成序列
        Args:
            get_logits_fn: 接受 token_ids 列表,返回 (vocab_size,) 的 logits
        """
        token_ids = []
        for _ in range(max_tokens):
            logits = get_logits_fn(token_ids)
            if strategy == "greedy":
                next_id = self.greedy_decode(logits)
            else:
                next_id = self.sample_decode(logits, temperature, top_k, top_p)
            token_ids.append(next_id)
            if eos_token_id is not None and next_id == eos_token_id:
                break
        return token_ids


# ---- 测试 ----
if __name__ == "__main__":
    vocab = ["<pad>", "<eos>", "我", "喜欢", "学习", "机器", "学习", "深度", "自然", "语言", "处理", "模型", "AI", "很好", "今天", "天气", "不错"]
    decoder = TextDecoder(vocab)

    # 模拟 logits 生成函数(带简单的上下文依赖)
    np.random.seed(42)
    def mock_get_logits(token_ids: list[int]) -> np.ndarray:
        logits = np.random.randn(len(vocab))
        # 让某些 token 更可能在特定位置出现
        if len(token_ids) == 0:
            logits[2] = 5.0   # "我"
        elif len(token_ids) == 1:
            logits[3] = 4.0   # "喜欢"
        elif len(token_ids) == 2:
            logits[5] = 4.5   # "机器"
            logits[7] = 4.0   # "深度"
        return logits

    # 贪心解码
    print("=== 贪心解码 ===")
    ids = decoder.generate(mock_get_logits, max_tokens=5, strategy="greedy")
    print(f"Token IDs: {ids}")
    print(f"Tokens: {[vocab[i] for i in ids]}")

    # 采样解码(不同温度)
    for temp in [0.1, 0.7, 1.0, 1.5]:
        np.random.seed(42)
        ids = decoder.generate(mock_get_logits, max_tokens=5, strategy="sample", temperature=temp)
        print(f"温度 {temp}: {[vocab[i] for i in ids]}")

    # Top-k + Top-p
    print("\n=== Top-k + Top-p 采样 ===")
    np.random.seed(42)
    ids = decoder.generate(mock_get_logits, max_tokens=5, strategy="sample",
                           temperature=0.8, top_k=3, top_p=0.9)
    print(f"top-k=3, top-p=0.9: {[vocab[i] for i in ids]}")

复杂度分析:

维度复杂度
贪心解码O(V),V=词表大小
采样解码O(V log V)(排序用于 top-k/p)
生成 N tokensO(N × V) 或 O(N × V log V)

追问:

  1. Beam Search 与贪心解码的区别是什么?Beam Search 的 beam_size 对生成质量有什么影响?
  2. Speculative Decoding(投机解码)的原理是什么?如何保证输出分布与原始模型完全一致?

Q13: 实现简单的 Token 计数器

难度:⭐

题目描述:

实现一个 Token 计数器,支持:按规则估算中英文 token 数、按模型区分计算方式、计算 API 调用费用。

解题思路:

  1. 不同模型的 token 化方式不同(如 GPT 系列约 1 token ≈ 4 英文字符或 1-2 个中文字)
  2. 提供基于规则的快速估算和基于实际 tokenizer 的精确计算
  3. 根据模型的定价计算费用

完整代码:

python
from dataclasses import dataclass


@dataclass
class ModelPricing:
    name: str
    input_price_per_1k: float   # 每 1000 tokens 的输入价格(美元)
    output_price_per_1k: float  # 每 1000 tokens 的输出价格
    chars_per_token_en: float   # 英文每 token 约多少字符
    chars_per_token_zh: float   # 中文每 token 约多少字符


# 常见模型定价(2024 年参考价)
MODEL_PRICING = {
    "gpt-4o": ModelPricing("gpt-4o", 0.0025, 0.01, 4.0, 1.5),
    "gpt-4o-mini": ModelPricing("gpt-4o-mini", 0.00015, 0.0006, 4.0, 1.5),
    "gpt-3.5-turbo": ModelPricing("gpt-3.5-turbo", 0.0005, 0.0015, 4.0, 1.5),
    "claude-3-opus": ModelPricing("claude-3-opus", 0.015, 0.075, 3.5, 1.3),
    "claude-3-sonnet": ModelPricing("claude-3-sonnet", 0.003, 0.015, 3.5, 1.3),
}


class TokenCounter:
    """Token 计数器"""

    def __init__(self, model: str = "gpt-4o"):
        self.model = model
        self.pricing = MODEL_PRICING.get(model)
        if not self.pricing:
            raise ValueError(f"未知模型: {model}")

    def _count_chars(self, text: str) -> tuple[int, int]:
        """统计中英文字符数"""
        en_chars = 0
        zh_chars = 0
        for ch in text:
            if '\u4e00' <= ch <= '\u9fff' or '\u3000' <= ch <= '\u303f':
                zh_chars += 1
            else:
                en_chars += 1
        return en_chars, zh_chars

    def estimate_tokens(self, text: str) -> int:
        """估算 token 数"""
        en_chars, zh_chars = self._count_chars(text)
        en_tokens = en_chars / self.pricing.chars_per_token_en
        zh_tokens = zh_chars / self.pricing.chars_per_token_zh
        return int(en_tokens + zh_tokens) + 3  # +3 for special tokens

    def count_messages(self, messages: list[dict]) -> int:
        """计算消息列表的 token 数"""
        total = 0
        for msg in messages:
            total += 4  # 消息格式开销
            total += self.estimate_tokens(msg.get("role", ""))
            total += self.estimate_tokens(msg.get("content", ""))
        total += 2  # reply priming
        return total

    def estimate_cost(self, input_tokens: int, output_tokens: int) -> dict:
        """估算 API 调用费用"""
        input_cost = (input_tokens / 1000) * self.pricing.input_price_per_1k
        output_cost = (output_tokens / 1000) * self.pricing.output_price_per_1k
        return {
            "model": self.model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "input_cost_usd": round(input_cost, 6),
            "output_cost_usd": round(output_cost, 6),
            "total_cost_usd": round(input_cost + output_cost, 6),
        }

    def analyze(self, messages: list[dict], estimated_output_tokens: int = 500) -> dict:
        """完整分析"""
        input_tokens = self.count_messages(messages)
        cost = self.estimate_cost(input_tokens, estimated_output_tokens)
        return {
            "input_tokens": input_tokens,
            "estimated_output_tokens": estimated_output_tokens,
            "cost": cost,
            "model_pricing": {
                "input_per_1k": self.pricing.input_price_per_1k,
                "output_per_1k": self.pricing.output_price_per_1k,
            }
        }


# ---- 测试 ----
if __name__ == "__main__":
    # 测试不同文本
    texts = [
        "Hello, how are you today?",
        "你好,今天天气怎么样?",
        "这是一段混合 text,包含 English 和中文 characters。",
        "请帮我分析一下这段代码的功能:def hello(): print('world')",
    ]

    for model in ["gpt-4o", "gpt-4o-mini", "claude-3-sonnet"]:
        counter = TokenCounter(model=model)
        print(f"\n=== {model} ===")
        for text in texts:
            tokens = counter.estimate_tokens(text)
            print(f"  [{tokens} tokens] {text[:40]}...")

    # 消息分析
    print("\n=== API 调用费用分析 ===")
    messages = [
        {"role": "system", "content": "你是一个专业的AI助手,擅长编程和算法。"},
        {"role": "user", "content": "请解释一下 Transformer 的注意力机制是如何工作的?"},
        {"role": "assistant", "content": "Transformer 的注意力机制是通过计算 Query、Key、Value 三个矩阵来实现的..."},
        {"role": "user", "content": "那多头注意力和单头注意力有什么区别呢?"},
    ]

    for model in ["gpt-4o", "gpt-4o-mini", "claude-3-sonnet"]:
        counter = TokenCounter(model=model)
        result = counter.analyze(messages, estimated_output_tokens=800)
        cost = result["cost"]
        print(f"  {model}: 输入 {cost['input_tokens']} tokens, "
              f"输出 {cost['estimated_output_tokens']} tokens, "
              f"费用 ${cost['total_cost_usd']:.4f}")

复杂度分析:

维度复杂度
token 估算O(N),N=文本字符数
消息统计O(N × M),M=消息数
空间O(1)

追问:

  1. 如何精确计算 token 数?tiktoken 和 HuggingFace Tokenizers 库的使用方法是什么?
  2. 在 Agent 系统中,如何设计 token 预算管理?当上下文即将超出限制时,有哪些截断策略?

系统设计相关


Q14: 实现一个限流器(Token Bucket)

难度:⭐⭐

题目描述:

实现 Token Bucket(令牌桶)限流器,支持配置桶容量和令牌添加速率,用于保护 LLM API 调用。

解题思路:

  1. 桶有固定容量,以固定速率添加令牌
  2. 每次请求消耗一个令牌,无令牌时拒绝
  3. 支持多维限流(如每分钟请求数 + 每分钟 token 数)

完整代码:

python
import time
import threading
from dataclasses import dataclass


class TokenBucket:
    """令牌桶限流器"""

    def __init__(self, capacity: float, refill_rate: float):
        """
        Args:
            capacity: 桶容量(最大令牌数)
            refill_rate: 每秒补充的令牌数
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def _refill(self):
        """补充令牌"""
        now = time.monotonic()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now

    def acquire(self, tokens: int = 1) -> bool:
        """尝试获取令牌,成功返回 True"""
        with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_and_acquire(self, tokens: int = 1, timeout: float = 10.0) -> bool:
        """等待直到获取到令牌或超时"""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self.acquire(tokens):
                return True
            # 计算需要等待的时间
            with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    return True
                wait_time = (tokens - self.tokens) / self.refill_rate
            time.sleep(min(wait_time, 0.1))
        return False

    @property
    def available_tokens(self) -> float:
        with self._lock:
            self._refill()
            return self.tokens


class MultiDimensionRateLimiter:
    """多维度限流器(如 RPM + TPM)"""

    def __init__(self):
        self.limiters: dict[str, TokenBucket] = {}

    def add_limiter(self, name: str, capacity: float, refill_rate: float):
        self.limiters[name] = TokenBucket(capacity, refill_rate)

    def acquire(self, **kwargs) -> tuple[bool, str]:
        """
        尝试获取所有维度的令牌
        kwargs: limiter_name=tokens_needed
        """
        # 先检查所有
        for name, tokens_needed in kwargs.items():
            if name not in self.limiters:
                return False, f"未知限流器: {name}"
            if self.limiters[name].tokens < tokens_needed:
                return False, f"限流: {name} 不足"

        # 全部满足才扣减
        for name, tokens_needed in kwargs.items():
            self.limiters[name].acquire(tokens_needed)
        return True, "OK"

    def status(self) -> dict:
        return {
            name: round(lb.available_tokens, 2)
            for name, lb in self.limiters.items()
        }


# ---- 测试 ----
if __name__ == "__main__":
    # 单维限流
    print("=== 单维令牌桶限流 ===")
    bucket = TokenBucket(capacity=5, refill_rate=2)  # 容量5,每秒补充2个
    for i in range(8):
        ok = bucket.acquire()
        print(f"  请求 {i+1}: {'✅ 通过' if ok else '❌ 拒绝'} "
              f"(剩余: {bucket.available_tokens:.1f})")

    # 等待恢复
    print("\n等待 1 秒...")
    time.sleep(1)
    print(f"补充后剩余: {bucket.available_tokens:.1f}")

    # 多维限流(模拟 API 限流)
    print("\n=== 多维限流(RPM + TPM)===")
    limiter = MultiDimensionRateLimiter()
    limiter.add_limiter("rpm", capacity=10, refill_rate=10/60)   # 10 次/分钟
    limiter.add_limiter("tpm", capacity=1000, refill_rate=1000/60)  # 1000 tokens/分钟

    requests = [
        {"rpm": 1, "tpm": 150},
        {"rpm": 1, "tpm": 300},
        {"rpm": 1, "tpm": 200},
        {"rpm": 1, "tpm": 400},
    ]

    for i, req in enumerate(requests):
        ok, msg = limiter.acquire(**req)
        status = limiter.status()
        print(f"  请求 {i+1} {req}: {'✅' if ok else '❌'} {msg} | 余量: {status}")

复杂度分析:

维度复杂度
acquireO(1)(单维度)
多维 acquireO(D),D=维度数
空间O(D)

追问:

  1. 令牌桶(Token Bucket)和漏桶(Leaky Bucket)、滑动窗口(Sliding Window)的区别是什么?各自适合什么场景?
  2. 在分布式系统中,如何实现跨节点的限流?Redis + Lua 脚本的方案如何工作?

Q15: 实现一个简单的 Prompt 模板引擎

难度:⭐

题目描述:

实现一个 Prompt 模板引擎,支持变量替换、条件渲染、循环渲染、模板嵌套等常见功能。

解题思路:

  1. &#123;&#123;variable&#125;&#125; 语法做变量替换
  2. {% if condition %}...{% endif %} 做条件渲染
  3. {% for item in items %}...{% endfor %} 做循环
  4. {% include template_name %} 做模板嵌套

完整代码:

python
import re
from typing import Any


class PromptTemplate:
    """Prompt 模板引擎"""

    def __init__(self, template: str, name: str = ""):
        self.template = template
        self.name = name

    def render(self, **kwargs) -> str:
        """渲染模板"""
        result = self.template
        # 处理 for 循环
        result = self._render_for(result, kwargs)
        # 处理 if 条件
        result = self._render_if(result, kwargs)
        # 处理变量替换
        result = self._render_vars(result, kwargs)
        return result.strip()

    def _render_vars(self, template: str, context: dict) -> str:
        """{{variable}} 变量替换,支持 {{var|default_value}}"""
        def replace_var(match):
            expr = match.group(1).strip()
            if "|" in expr:
                var_name, default = expr.split("|", 1)
                var_name = var_name.strip()
                default = default.strip()
            else:
                var_name = expr
                default = match.group(0)  # 未找到则保留原样

            # 支持嵌套属性 a.b
            value = context
            for key in var_name.split("."):
                if isinstance(value, dict):
                    value = value.get(key, None)
                else:
                    value = getattr(value, key, None)
                if value is None:
                    return default
            return str(value)

        return re.sub(r'\{\{(.+?)\}\}', replace_var, template)

    def _render_for(self, template: str, context: dict) -> str:
        """{% for item in items %}...{% endfor %} 循环渲染"""
        pattern = r'\{%\s*for\s+(\w+)\s+in\s+(\w+)\s*%\}(.*?)\{%\s*endfor\s*%\}'
        def replace_for(match):
            item_var = match.group(1)
            list_var = match.group(2)
            body = match.group(3)
            items = context.get(list_var, [])
            parts = []
            for i, item in enumerate(items):
                local_ctx = {**context, item_var: item, "loop_index": i}
                rendered = self._render_vars(body, local_ctx)
                parts.append(rendered)
            return "".join(parts)
        return re.sub(pattern, replace_for, template, flags=re.DOTALL)

    def _render_if(self, template: str, context: dict) -> str:
        """{% if condition %}...{% else %}...{% endif %} 条件渲染"""
        # 处理 if-else
        pattern = r'\{%\s*if\s+(\w+)\s*%\}(.*?)\{%\s*else\s*%\}(.*?)\{%\s*endif\s*%\}'
        def replace_if_else(match):
            cond_var = match.group(1)
            true_body = match.group(2)
            false_body = match.group(3)
            value = context.get(cond_var)
            return true_body if value else false_body
        result = re.sub(pattern, replace_if_else, template, flags=re.DOTALL)

        # 处理只有 if
        pattern2 = r'\{%\s*if\s+(\w+)\s*%\}(.*?)\{%\s*endif\s*%\}'
        def replace_if(match):
            cond_var = match.group(1)
            body = match.group(2)
            value = context.get(cond_var)
            return body if value else ""
        return re.sub(pattern2, replace_if, result, flags=re.DOTALL)


class PromptManager:
    """Prompt 管理器:管理多个模板"""

    def __init__(self):
        self.templates: dict[str, PromptTemplate] = {}

    def register(self, name: str, template: str):
        self.templates[name] = PromptTemplate(template, name=name)

    def render(self, name: str, **kwargs) -> str:
        if name not in self.templates:
            raise ValueError(f"模板不存在: {name}")
        return self.templates[name].render(**kwargs)


# ---- 测试 ----
if __name__ == "__main__":
    manager = PromptManager()

    # 1. 基础变量替换
    manager.register("basic", "你好,{{name}}!你来自 {{city|未知城市}}。")
    print("=== 基础替换 ===")
    print(manager.render("basic", name="张明"))
    print(manager.render("basic", name="李华", city="上海"))

    # 2. 条件渲染
    manager.register("cond", """你是一个{{role|助手}}
{% if has_tools %}
你可以使用以下工具:{{tools}}
{% endif %}
{% if verbose %}
请详细回答用户的问题。
{% else %}
请简洁回答。
{% endif %}""")
    print("\n=== 条件渲染 ===")
    print(manager.render("cond", role="编程助手", has_tools=True,
                         tools="search, calculator", verbose=False))

    # 3. 循环渲染
    manager.register("rag", """请根据以下参考资料回答问题。

{% for doc in documents %}
[文档{{loop_index}}] {{doc}}
{% endfor %}

问题:{{question}}""")
    print("\n=== RAG 模板 ===")
    docs = ["RAG 是检索增强生成技术", "向量数据库是 RAG 的核心组件", "Embedding 模型将文本映射到向量空间"]
    print(manager.render("rag", documents=docs, question="什么是 RAG?"))

    # 4. 嵌套对象
    manager.register("agent", """Agent 配置:
名称: {{agent.name}}
模型: {{agent.model}}
温度: {{agent.temperature}}

用户输入: {{input}}""")
    print("\n=== 嵌套对象 ===")
    agent_config = {"name": "MyAgent", "model": "gpt-4o", "temperature": 0.7}
    print(manager.render("agent", agent=agent_config, input="帮我查天气"))

复杂度分析:

维度复杂度
变量替换O(N × V),N=模板长度,V=变量数
循环渲染O(N × I),I=迭代次数
空间O(渲染结果大小)

追问:

  1. Jinja2 模板引擎相比自实现的模板引擎有哪些高级功能?在 Prompt 工程中哪些特性最实用?
  2. 如何实现 Prompt 版本管理?如何做 A/B 测试来比较不同 Prompt 的效果?

Transformer 与模型推理


Q12: 实现一个简单的 Transformer Attention 层(Self-Attention + Multi-Head)

难度:⭐⭐

题目描述:

实现一个简化版的 Transformer Self-Attention 层,支持:

  1. 单头 Self-Attention(Scaled Dot-Product Attention)
  2. Multi-Head Attention

解题思路:

  1. Scaled Dot-Product AttentionAttention(Q, K, V) = softmax(QK^T / sqrt(d_k)) * V
  2. Multi-Head:将 Q/K/V 线性投影到多个子空间,分别计算 Attention 后拼接
  3. 使用矩阵乘法高效计算

完整代码:

python
import numpy as np


def softmax(x: np.ndarray, axis: int = -1) -> np.ndarray:
    """数值稳定的 softmax"""
    e_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
    return e_x / np.sum(e_x, axis=axis, keepdims=True)


class ScaledDotProductAttention:
    """单头 Scaled Dot-Product Attention"""

    def __init__(self, d_k: int):
        self.d_k = d_k
        self.scale = np.sqrt(d_k)

    def forward(self, Q: np.ndarray, K: np.ndarray, V: np.ndarray,
                mask: np.ndarray | None = None) -> np.ndarray:
        """
        Args:
            Q: (batch, seq_q, d_k)
            K: (batch, seq_k, d_k)
            V: (batch, seq_k, d_v)
            mask: (batch, seq_q, seq_k), 0 表示需要屏蔽的位置
        Returns:
            output: (batch, seq_q, d_v)
            attention_weights: (batch, seq_q, seq_k)
        """
        # QK^T / sqrt(d_k)
        scores = np.matmul(Q, K.transpose(0, 2, 1)) / self.scale  # (batch, seq_q, seq_k)

        # 应用 mask
        if mask is not None:
            scores = np.where(mask == 0, -1e9, scores)

        # softmax
        attn_weights = softmax(scores, axis=-1)  # (batch, seq_q, seq_k)

        # 加权求和
        output = np.matmul(attn_weights, V)  # (batch, seq_q, d_v)
        return output, attn_weights


class MultiHeadAttention:
    """Multi-Head Attention"""

    def __init__(self, d_model: int, num_heads: int):
        assert d_model % num_heads == 0, "d_model 必须能被 num_heads 整除"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # 随机初始化权重矩阵
        scale = np.sqrt(2.0 / d_model)
        self.W_Q = np.random.randn(d_model, d_model) * scale
        self.W_K = np.random.randn(d_model, d_model) * scale
        self.W_V = np.random.randn(d_model, d_model) * scale
        self.W_O = np.random.randn(d_model, d_model) * scale

        self.attention = ScaledDotProductAttention(self.d_k)

    def _split_heads(self, x: np.ndarray) -> np.ndarray:
        """将最后一维拆分为多个头: (batch, seq, d_model) -> (batch, num_heads, seq, d_k)"""
        batch, seq, _ = x.shape
        x = x.reshape(batch, seq, self.num_heads, self.d_k)
        return x.transpose(0, 2, 1, 3)

    def _merge_heads(self, x: np.ndarray) -> np.ndarray:
        """合并多头: (batch, num_heads, seq, d_k) -> (batch, seq, d_model)"""
        batch, _, seq, _ = x.shape
        x = x.transpose(0, 2, 1, 3)
        return x.reshape(batch, seq, self.d_model)

    def forward(self, Q: np.ndarray, K: np.ndarray, V: np.ndarray,
                mask: np.ndarray | None = None) -> np.ndarray:
        """
        Args:
            Q, K, V: (batch, seq, d_model)
            mask: (batch, seq_q, seq_k)
        Returns:
            output: (batch, seq, d_model)
        """
        # 线性投影
        Q_proj = np.matmul(Q, self.W_Q)  # (batch, seq, d_model)
        K_proj = np.matmul(K, self.W_K)
        V_proj = np.matmul(V, self.W_V)

        # 拆分多头
        Q_heads = self._split_heads(Q_proj)  # (batch, num_heads, seq, d_k)
        K_heads = self._split_heads(K_proj)
        V_heads = self._split_heads(V_proj)

        # 每个头独立计算 attention
        batch = Q.shape[0]
        outputs = []
        for h in range(self.num_heads):
            out, _ = self.attention.forward(
                Q_heads[:, h], K_heads[:, h], V_heads[:, h], mask
            )
            outputs.append(out)

        # 拼接所有头
        concat = np.stack(outputs, axis=1)  # (batch, num_heads, seq, d_k)
        merged = self._merge_heads(concat)   # (batch, seq, d_model)

        # 输出投影
        output = np.matmul(merged, self.W_O)
        return output


# ---- 测试 ----
if __name__ == "__main__":
    np.random.seed(42)
    batch_size, seq_len, d_model, num_heads = 2, 10, 64, 8

    mha = MultiHeadAttention(d_model=d_model, num_heads=num_heads)

    # 随机输入
    x = np.random.randn(batch_size, seq_len, d_model)
    output = mha.forward(x, x, x)  # Self-Attention: Q=K=V=x

    print(f"输入形状: {x.shape}")
    print(f"输出形状: {output.shape}")
    assert output.shape == x.shape, "输出形状应与输入一致"
    print("Multi-Head Attention 测试通过 ✓")

    # 带 mask 的 Attention
    mask = np.ones((batch_size, seq_len, seq_len))
    mask[:, :, 5:] = 0  # 屏蔽后 5 个位置(模拟 causal mask)
    output_masked = mha.forward(x, x, x, mask=mask)
    print(f"带 mask 的输出形状: {output_masked.shape}")
    print("带 mask 的 Multi-Head Attention 测试通过 ✓")

复杂度分析:

维度复杂度
时间O(seq² × d_model + seq × d_model²),seq² 来自 QK^T
空间O(seq² × num_heads) 存储注意力权重
参数量4 × d_model² (W_Q, W_K, W_V, W_O)

追问:

  1. 为什么需要除以 sqrt(d_k)?不缩放会导致什么问题?
  2. FlashAttention 是如何优化注意力计算的显存和速度的?
  3. 在实际 Transformer 中,Q、K、V 的投影是独立的还是共享的?为什么?

Q13: 实现一个简化版的 PagedAttention ⭐⭐⭐

题目描述:

PagedAttention 是 vLLM 的核心技术,通过分页管理 KV Cache 提升 GPU 显存利用率。实现一个 CPU 版的简化 PagedAttention:

  1. 将 KV Cache 划分为固定大小的 Block
  2. 通过 Block Table 管理逻辑块到物理块的映射
  3. 支持动态分配和释放 Block

解题思路:

  1. 将 KV Cache 划分为固定大小的 Block(如每块存 16 个 token 的 K/V)
  2. 用 Block Table 记录每个请求的逻辑块 → 物理块映射
  3. 分配时从空闲池取块,释放时归还

完整代码:

python
import numpy as np
from dataclasses import dataclass, field


@dataclass
class KVBlock:
    """一个 KV Cache Block"""
    block_id: int
    block_size: int        # 每块存储的 token 数
    num_heads: int
    head_dim: int
    # K: (num_heads, block_size, head_dim), V 同理
    k_cache: np.ndarray = field(default=None)
    v_cache: np.ndarray = field(default=None)
    num_filled: int = 0    # 当前已填充的 token 数

    def __post_init__(self):
        if self.k_cache is None:
            self.k_cache = np.zeros(
                (self.num_heads, self.block_size, self.head_dim), dtype=np.float32
            )
        if self.v_cache is None:
            self.v_cache = np.zeros(
                (self.num_heads, self.block_size, self.head_dim), dtype=np.float32
            )

    def is_full(self) -> bool:
        return self.num_filled >= self.block_size

    def available_slots(self) -> int:
        return self.block_size - self.num_filled

    def write(self, k: np.ndarray, v: np.ndarray):
        """
        写入 KV
        k, v: (num_heads, seq_len, head_dim)
        """
        n = k.shape[1]
        end = min(self.num_filled + n, self.block_size)
        actual = end - self.num_filled
        self.k_cache[:, self.num_filled:end, :] = k[:, :actual, :]
        self.v_cache[:, self.num_filled:end, :] = v[:, :actual, :]
        self.num_filled = end

    def read(self) -> tuple[np.ndarray, np.ndarray]:
        """读取已填充部分的 KV"""
        return (self.k_cache[:, :self.num_filled, :],
                self.v_cache[:, :self.num_filled, :])


class PagedKVCache:
    """简化版 PagedAttention KV Cache 管理器"""

    def __init__(self, num_blocks: int, block_size: int,
                 num_heads: int, head_dim: int):
        self.block_size = block_size
        self.num_heads = num_heads
        self.head_dim = head_dim

        # 物理块池
        self.blocks: dict[int, KVBlock] = {}
        for i in range(num_blocks):
            self.blocks[i] = KVBlock(
                block_id=i, block_size=block_size,
                num_heads=num_heads, head_dim=head_dim
            )

        self.free_blocks: list[int] = list(range(num_blocks))

        # 每个请求的 Block Table: request_id -> [物理块 ID 列表]
        self.block_tables: dict[str, list[int]] = {}

    def allocate(self, request_id: str, num_tokens: int) -> list[int]:
        """为请求分配所需数量的块"""
        num_blocks_needed = (num_tokens + self.block_size - 1) // self.block_size
        if len(self.free_blocks) < num_blocks_needed:
            raise MemoryError(f"显存不足:需要 {num_blocks_needed} 块,"
                              f"剩余 {len(self.free_blocks)} 块")

        allocated = []
        for _ in range(num_blocks_needed):
            block_id = self.free_blocks.pop(0)
            allocated.append(block_id)

        self.block_tables[request_id] = allocated
        return allocated

    def free(self, request_id: str):
        """释放请求的所有块"""
        if request_id in self.block_tables:
            for block_id in self.block_tables[request_id]:
                self.blocks[block_id].num_filled = 0
                self.blocks[block_id].k_cache[:] = 0
                self.blocks[block_id].v_cache[:] = 0
                self.free_blocks.append(block_id)
            del self.block_tables[request_id]

    def write_kv(self, request_id: str, k: np.ndarray, v: np.ndarray):
        """
        写入 KV Cache
        k, v: (num_heads, seq_len, head_dim)
        """
        block_ids = self.block_tables.get(request_id, [])
        offset = 0
        remaining = k.shape[1]

        for block_id in block_ids:
            block = self.blocks[block_id]
            if block.is_full():
                continue
            available = block.available_slots()
            write_len = min(remaining, available)
            block.write(
                k[:, offset:offset + write_len, :],
                v[:, offset:offset + write_len, :]
            )
            offset += write_len
            remaining -= write_len
            if remaining <= 0:
                break

    def read_kv(self, request_id: str) -> tuple[np.ndarray, np.ndarray]:
        """读取请求的所有 KV Cache"""
        block_ids = self.block_tables.get(request_id, [])
        k_parts, v_parts = [], []
        for block_id in block_ids:
            k, v = self.blocks[block_id].read()
            k_parts.append(k)
            v_parts.append(v)
        return np.concatenate(k_parts, axis=1), np.concatenate(v_parts, axis=1)

    def get_block_table(self, request_id: str) -> list[int]:
        return self.block_tables.get(request_id, [])

    @property
    def free_block_count(self) -> int:
        return len(self.free_blocks)


class PagedAttention:
    """使用 PagedKVCache 的 Attention 计算"""

    def __init__(self, kv_cache: PagedKVCache):
        self.kv_cache = kv_cache

    def forward(self, request_id: str, query: np.ndarray) -> np.ndarray:
        """
        query: (num_heads, 1, head_dim) - 单个 token 的查询
        返回: (num_heads, 1, head_dim) - attention 输出
        """
        # 从分页 KV Cache 中读取所有 K/V
        K, V = self.kv_cache.read_kv(request_id)

        # 标准 Scaled Dot-Product Attention
        d_k = query.shape[-1]
        scores = np.matmul(query, K.transpose(0, 2, 1)) / np.sqrt(d_k)
        weights = self._softmax(scores, axis=-1)
        output = np.matmul(weights, V)
        return output

    @staticmethod
    def _softmax(x: np.ndarray, axis: int = -1) -> np.ndarray:
        e_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
        return e_x / np.sum(e_x, axis=axis, keepdims=True)


# ---- 测试 ----
if __name__ == "__main__":
    np.random.seed(42)

    num_blocks = 20
    block_size = 16
    num_heads = 4
    head_dim = 32

    # 初始化
    kv_cache = PagedKVCache(num_blocks, block_size, num_heads, head_dim)
    paged_attn = PagedAttention(kv_cache)

    # 模拟 3 个请求
    req1_len = 35  # 需要 3 个块
    req2_len = 10  # 需要 1 个块
    req3_len = 50  # 需要 4 个块

    # 分配
    blocks1 = kv_cache.allocate("req1", req1_len)
    blocks2 = kv_cache.allocate("req2", req2_len)
    blocks3 = kv_cache.allocate("req3", req3_len)
    print(f"req1 分配 {len(blocks1)} 块: {blocks1}")
    print(f"req2 分配 {len(blocks2)} 块: {blocks2}")
    print(f"req3 分配 {len(blocks3)} 块: {blocks3}")
    print(f"剩余空闲块: {kv_cache.free_block_count}")

    # 写入 KV
    k1 = np.random.randn(num_heads, req1_len, head_dim)
    v1 = np.random.randn(num_heads, req1_len, head_dim)
    kv_cache.write_kv("req1", k1, v1)

    k2 = np.random.randn(num_heads, req2_len, head_dim)
    v2 = np.random.randn(num_heads, req2_len, head_dim)
    kv_cache.write_kv("req2", k2, v2)

    # 模拟生成阶段:逐 token 计算
    for step in range(5):
        q = np.random.randn(num_heads, 1, head_dim)
        output = paged_attn.forward("req1", q)
        # 新 token 的 KV
        new_k = np.random.randn(num_heads, 1, head_dim)
        new_v = np.random.randn(num_heads, 1, head_dim)
        # 需要新块
        if all(kv_cache.blocks[bid].is_full() for bid in kv_cache.get_block_table("req1")):
            kv_cache.allocate("req1", 1)  # 按需分配新块
        kv_cache.write_kv("req1", new_k, new_v)

    print(f"\n生成后 req1 块表: {kv_cache.get_block_table('req1')}")
    k_read, v_read = kv_cache.read_kv("req1")
    print(f"req1 实际存储 token 数: {k_read.shape[1]}")

    # 释放
    kv_cache.free("req2")
    print(f"\n释放 req2 后,剩余空闲块: {kv_cache.free_block_count}")

    kv_cache.free("req1")
    kv_cache.free("req3")
    print(f"全部释放后,剩余空闲块: {kv_cache.free_block_count}")

复杂度分析:

维度复杂度
分配O(num_blocks_needed),常数时间
写入O(num_tokens × num_heads × head_dim)
读取O(total_tokens × num_heads × head_dim)
空间O(num_blocks × block_size × num_heads × head_dim),无碎片浪费

追问:

  1. 真实 vLLM 中如何处理不同请求共享相同前缀(如 System Prompt)的 KV Cache?(Prefix Caching)
  2. PagedAttention 相比预分配连续内存的方式,显存利用率能提升多少?为什么?
  3. Block Size 如何选择?过大或过小各有什么问题?

上下文管理与评估


Q14: 实现一个对话上下文压缩器(滑动窗口 + 摘要压缩) ⭐⭐

题目描述:

实现一个对话上下文压缩器,支持两种策略:

  1. 滑动窗口:保留最近 N 轮对话
  2. 摘要压缩:对旧对话生成摘要,保留摘要 + 最近 N 轮

解题思路:

  1. 滑动窗口:直接截取最近 N 轮
  2. 摘要压缩:用 LLM 将旧对话压缩为摘要,拼接摘要 + 最近对话

完整代码:

python
from dataclasses import dataclass, field


@dataclass
class Message:
    role: str       # "user" / "assistant" / "system"
    content: str


class SlidingWindowCompressor:
    """滑动窗口压缩器"""

    def __init__(self, max_turns: int = 10):
        self.max_turns = max_turns

    def compress(self, messages: list[Message]) -> list[Message]:
        """保留 system prompt + 最近 max_turns 轮对话"""
        system_msgs = [m for m in messages if m.role == "system"]
        chat_msgs = [m for m in messages if m.role != "system"]

        # 按轮次(user+assistant)计算
        max_messages = self.max_turns * 2
        recent = chat_msgs[-max_messages:] if len(chat_msgs) > max_messages else chat_msgs

        return system_msgs + recent


class SummaryCompressor:
    """摘要压缩器"""

    def __init__(self, llm, max_recent_turns: int = 5):
        self.llm = llm
        self.max_recent_turns = max_recent_turns

    async def compress(self, messages: list[Message],
                       current_summary: str = "") -> tuple[list[Message], str]:
        """
        压缩对话:
        1. 将旧对话与已有摘要合并生成新摘要
        2. 返回 [system_prompt + 摘要消息 + 最近对话]

        Returns:
            (压缩后的消息列表, 新的摘要文本)
        """
        system_msgs = [m for m in messages if m.role == "system"]
        chat_msgs = [m for m in messages if m.role != "system"]

        max_recent = self.max_recent_turns * 2
        if len(chat_msgs) <= max_recent:
            return messages, current_summary

        old_msgs = chat_msgs[:-max_recent]
        recent_msgs = chat_msgs[-max_recent:]

        # 生成摘要
        old_text = "\n".join(f"[{m.role}] {m.content}" for m in old_msgs)
        prompt = f"""请将以下对话压缩为简洁摘要,保留关键信息(用户需求、重要结论、待办事项)。

已有摘要:{current_summary or '(无)'}

旧对话:
{old_text}

请用 3-5 句话输出摘要:"""

        new_summary = await self.llm.generate(prompt)

        # 构建压缩后的消息列表
        result = system_msgs + [
            Message(role="system", content=f"[对话摘要] {new_summary}")
        ] + recent_msgs

        return result, new_summary


class HybridCompressor:
    """混合压缩器:先滑动窗口,再按需摘要"""

    def __init__(self, llm, window_size: int = 10, summary_threshold: int = 20):
        self.window = SlidingWindowCompressor(max_turns=window_size)
        self.summary = SummaryCompressor(llm, max_recent_turns=window_size)
        self.summary_threshold = summary_threshold
        self._summaries: dict[str, str] = {}  # session_id -> summary

    async def compress(self, session_id: str,
                       messages: list[Message]) -> list[Message]:
        """智能压缩:短对话用窗口,长对话用摘要"""
        chat_count = sum(1 for m in messages if m.role != "system")

        if chat_count <= self.summary_threshold:
            # 短对话:滑动窗口
            return self.window.compress(messages)
        else:
            # 长对话:摘要压缩
            current_summary = self._summaries.get(session_id, "")
            compressed, new_summary = await self.summary.compress(
                messages, current_summary
            )
            self._summaries[session_id] = new_summary
            return compressed


# ---- 测试 ----
if __name__ == "__main__":
    import asyncio

    class MockLLM:
        async def generate(self, prompt: str) -> str:
            return "[摘要] 用户讨论了天气查询、机票预订和旅行计划。已确认北京出发。"

    async def test():
        # 构造长对话
        messages = [
            Message("system", "你是一个旅行助手"),
            Message("user", "你好,我想查一下北京的天气"),
            Message("assistant", "北京今天晴,气温15-25°C"),
            Message("user", "好的,帮我查一下去上海的机票"),
            Message("assistant", "找到3个航班:东航MU5101、国航CA1501..."),
            Message("user", "选东航的吧"),
            Message("assistant", "好的,已为您选择东航MU5101"),
            Message("user", "那帮我订个酒店"),
            Message("assistant", "上海浦东有以下酒店推荐..."),
            Message("user", "就选第一个吧"),
            Message("assistant", "好的,已为您预订"),
            Message("user", "谢谢"),
            Message("assistant", "不客气,祝您旅途愉快!"),
        ]

        # 测试滑动窗口
        window = SlidingWindowCompressor(max_turns=3)
        windowed = window.compress(messages)
        print(f"=== 滑动窗口 (保留最近3轮) ===")
        print(f"原始消息数: {len(messages)}")
        print(f"压缩后消息数: {len(windowed)}")
        for m in windowed:
            print(f"  [{m.role}] {m.content[:50]}")

        # 测试摘要压缩
        llm = MockLLM()
        summarizer = SummaryCompressor(llm, max_recent_turns=2)
        summarized, summary = await summarizer.compress(messages)
        print(f"\n=== 摘要压缩 (保留最近2轮) ===")
        print(f"压缩后消息数: {len(summarized)}")
        for m in summarized:
            print(f"  [{m.role}] {m.content[:60]}")

    asyncio.run(test())

复杂度分析:

维度复杂度
滑动窗口O(max_turns),截取操作
摘要压缩O(N),N=旧对话长度(主要开销在 LLM 调用)
空间O(max_turns) 或 O(摘要长度 + max_recent_turns)

追问:

  1. 摘要压缩时如何保证关键信息不丢失?有哪些改进策略?
  2. 如何处理对话中的结构化信息(如工具调用结果)?直接摘要可能丢失哪些信息?
  3. 在生产环境中,如何确定最佳的窗口大小和摘要触发阈值?

Q15: 实现一个简单的 RAG 评估器(计算 Faithfulness、Relevancy) ⭐⭐

题目描述:

实现一个 RAG 系统评估器,计算两个核心指标:

  1. Faithfulness(忠实度):回答是否忠实于检索到的上下文
  2. Answer Relevancy(回答相关性):回答是否与问题相关

解题思路:

  1. Faithfulness:将回答拆分为声明(claims),检查每个声明是否能从上下文中推导
  2. Answer Relevancy:从回答中反向生成问题,计算与原始问题的相似度

完整代码:

python
import numpy as np
from dataclasses import dataclass


@dataclass
class RAGEvaluationResult:
    faithfulness: float         # 0-1, 回答是否忠实于上下文
    answer_relevancy: float     # 0-1, 回答是否与问题相关
    faithfulness_details: list  # 每个声明的检查结果
    answer_relevancy_details: list  # 每个生成问题的相似度


class RAGEvaluator:
    """RAG 系统评估器"""

    def __init__(self, llm, embedder):
        self.llm = llm
        self.embedder = embedder

    async def evaluate(self, question: str, context: str,
                       answer: str) -> RAGEvaluationResult:
        """完整的 RAG 评估"""
        faith_score, faith_details = await self._evaluate_faithfulness(context, answer)
        relevancy_score, relevancy_details = await self._evaluate_relevancy(question, answer)

        return RAGEvaluationResult(
            faithfulness=faith_score,
            answer_relevancy=relevancy_score,
            faithfulness_details=faith_details,
            answer_relevancy_details=relevancy_details,
        )

    async def _evaluate_faithfulness(self, context: str,
                                      answer: str) -> tuple[float, list]:
        """
        评估忠实度:
        1. 将回答拆分为独立声明
        2. 检查每个声明是否能从上下文推导
        """
        # Step 1: 拆分声明
        claims = await self._extract_claims(answer)

        # Step 2: 逐个验证
        details = []
        supported_count = 0
        for claim in claims:
            is_supported = await self._check_claim_support(context, claim)
            details.append({
                "claim": claim,
                "supported": is_supported,
            })
            if is_supported:
                supported_count += 1

        score = supported_count / len(claims) if claims else 0.0
        return score, details

    async def _extract_claims(self, answer: str) -> list[str]:
        """将回答拆分为独立的声明/事实"""
        prompt = f"""请将以下回答拆分为独立的事实性声明,每行一个声明。

回答:{answer}

声明列表(每行一个):"""
        result = await self.llm.generate(prompt)
        claims = [line.strip() for line in result.strip().split("\n") if line.strip()]
        return claims

    async def _check_claim_support(self, context: str, claim: str) -> bool:
        """检查声明是否能从上下文推导"""
        prompt = f"""请判断以下声明是否能从给定的上下文中推导出来。

上下文:{context}

声明:{claim}

如果声明能从上下文推导,请回答 "YES";否则回答 "NO"。
只回答 YES 或 NO:"""
        result = await self.llm.generate(prompt)
        return "YES" in result.upper()

    async def _evaluate_relevancy(self, question: str,
                                   answer: str) -> tuple[float, list]:
        """
        评估回答相关性:
        1. 从回答中生成 N 个可能的问题
        2. 计算这些问题与原始问题的平均相似度
        """
        # 从回答反向生成问题
        gen_questions = await self._generate_questions(answer, n=3)

        # 计算相似度
        q_emb = self.embedder.encode(question)
        similarities = []
        details = []
        for gq in gen_questions:
            gq_emb = self.embedder.encode(gq)
            sim = self._cosine_similarity(q_emb, gq_emb)
            similarities.append(sim)
            details.append({"generated_question": gq, "similarity": sim})

        score = np.mean(similarities) if similarities else 0.0
        return float(score), details

    async def _generate_questions(self, answer: str, n: int = 3) -> list[str]:
        """从回答中生成可能的问题"""
        prompt = f"""根据以下回答,生成 {n} 个可能引出该回答的问题,每行一个。

回答:{answer}

可能的问题(每行一个):"""
        result = await self.llm.generate(prompt)
        questions = [line.strip() for line in result.strip().split("\n") if line.strip()]
        return questions[:n]

    @staticmethod
    def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        dot = np.dot(a, b)
        norm = np.linalg.norm(a) * np.linalg.norm(b)
        return float(dot / norm) if norm > 0 else 0.0


# ---- 测试 ----
if __name__ == "__main__":
    import asyncio

    class MockLLM:
        async def generate(self, prompt: str) -> str:
            if "拆分为独立" in prompt:
                return "RAG 代表检索增强生成\nRAG 可以减少幻觉\nRAG 使用向量数据库检索"
            elif "是否能从" in prompt:
                return "YES" if "检索" in prompt else "NO"
            elif "可能引出" in prompt:
                return "什么是 RAG?\nRAG 有什么优势?\nRAG 如何工作?"
            return ""

    class MockEmbedder:
        def encode(self, text: str) -> np.ndarray:
            np.random.seed(hash(text) % (2**31))
            return np.random.randn(128)

    async def test():
        evaluator = RAGEvaluator(MockLLM(), MockEmbedder())

        question = "什么是 RAG?它有什么优势?"
        context = "RAG(检索增强生成)是一种结合检索和生成的技术,可以减少幻觉。"
        answer = "RAG 代表检索增强生成,它可以减少幻觉,并使用向量数据库进行检索。"

        result = await evaluator.evaluate(question, context, answer)

        print(f"Faithfulness: {result.faithfulness:.2f}")
        print(f"Answer Relevancy: {result.answer_relevancy:.2f}")
        print(f"\nFaithfulness 详情:")
        for d in result.faithfulness_details:
            status = "✓" if d["supported"] else "✗"
            print(f"  {status} {d['claim']}")
        print(f"\nRelevancy 详情:")
        for d in result.answer_relevancy_details:
            print(f"  Q: {d['generated_question']} -> sim={d['similarity']:.3f}")

    asyncio.run(test())

复杂度分析:

维度复杂度
FaithfulnessO(C × L),C=声明数,L=LLM 调用开销
RelevancyO(N × D),N=生成问题数,D=Embedding 维度
空间O(context_length + answer_length)

追问:

  1. RAGAS 框架中还有哪些评估指标?它们分别评估什么?
  2. LLM-as-Judge 的评估方式有哪些偏差?如何缓解?
  3. 如何构建一个可靠的评估数据集(Golden Dataset)?

Q16: 实现一个简化版的 LoRA 层 ⭐⭐⭐

题目描述:

实现 LoRA(Low-Rank Adaptation)的核心机制:

  1. 冻结原始权重,添加低秩适配器
  2. 前向传播:output = W @ x + (A @ B) @ x
  3. 支持 LoRA 的创建、前向计算和参数统计

解题思路:

  1. 原始权重 W 不参与梯度更新
  2. 添加两个小矩阵 A (d, r)B (r, d),其中 r << d
  3. 训练时只更新 A 和 B,推理时可以合并到 W

完整代码:

python
import numpy as np
from dataclasses import dataclass


@dataclass
class LoRAConfig:
    rank: int = 8           # 低秩维度
    alpha: float = 16.0     # 缩放因子
    dropout: float = 0.0    # Dropout 概率

    @property
    def scaling(self) -> float:
        return self.alpha / self.rank


class LoRALinear:
    """带 LoRA 适配器的线性层"""

    def __init__(self, in_features: int, out_features: int,
                 config: LoRAConfig = LoRAConfig()):
        self.in_features = in_features
        self.out_features = out_features
        self.config = config

        # 原始权重(冻结)
        self.W = np.random.randn(out_features, in_features) * np.sqrt(2.0 / in_features)
        self.W_frozen = True

        # LoRA 适配器
        self.A = np.random.randn(out_features, config.rank) * 0.01
        self.B = np.zeros((config.rank, in_features))  # B 初始化为零

        # 梯度
        self.dA = np.zeros_like(self.A)
        self.dB = np.zeros_like(self.B)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """
        前向传播: output = W @ x + scaling * (A @ B) @ x
        Args:
            x: (in_features,) 或 (batch, in_features)
        Returns:
            output: (out_features,) 或 (batch, out_features)
        """
        # 原始权重输出
        base_output = x @ self.W.T

        # LoRA 输出
        lora_output = x @ (self.A @ self.B).T * self.config.scaling

        return base_output + lora_output

    def merge_weights(self) -> np.ndarray:
        """将 LoRA 权重合并到原始权重(推理时使用)"""
        return self.W + self.A @ self.B * self.config.scaling

    def count_parameters(self) -> dict:
        """统计参数量"""
        original = self.in_features * self.out_features
        lora = self.A.size + self.B.size
        return {
            "original_params": original,
            "lora_params": lora,
            "trainable_ratio": lora / original,
            "compression_ratio": original / lora,
        }

    def backward(self, x: np.ndarray, grad_output: np.ndarray):
        """
        反向传播(简化版,只计算 LoRA 梯度)
        Args:
            x: (batch, in_features)
            grad_output: (batch, out_features)
        """
        batch_size = x.shape[0]

        # dB = (grad_output @ A).T @ x / batch_size * scaling
        self.dB = (grad_output @ self.A).T @ x / batch_size * self.config.scaling

        # dA = grad_output.T @ (x @ B.T) / batch_size * scaling
        self.dA = grad_output.T @ (x @ self.B.T) / batch_size * self.config.scaling

        # 返回对输入的梯度(用于上游传播)
        grad_x = (grad_output @ (self.A @ self.B)) * self.config.scaling
        if not self.W_frozen:
            grad_x += grad_output @ self.W
        else:
            grad_x += grad_output @ self.W  # 不更新 W,但梯度仍需传播

        return grad_x

    def step(self, lr: float = 0.001):
        """参数更新(只更新 LoRA 参数)"""
        self.A -= lr * self.dA
        self.B -= lr * self.dB


class LoRAModel:
    """带 LoRA 的简单模型(2层 MLP)"""

    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int,
                 lora_config: LoRAConfig = LoRAConfig()):
        self.layer1 = LoRALinear(input_dim, hidden_dim, lora_config)
        self.layer2 = LoRALinear(hidden_dim, output_dim, lora_config)

    def forward(self, x: np.ndarray) -> np.ndarray:
        h = self.layer1.forward(x)
        h = np.maximum(h, 0)  # ReLU
        return self.layer2.forward(h)

    def total_params(self) -> dict:
        p1 = self.layer1.count_parameters()
        p2 = self.layer2.count_parameters()
        return {
            "total_original": p1["original_params"] + p2["original_params"],
            "total_lora": p1["lora_params"] + p2["lora_params"],
            "trainable_ratio": (p1["lora_params"] + p2["lora_params"]) /
                               (p1["original_params"] + p2["original_params"]),
        }


# ---- 测试 ----
if __name__ == "__main__":
    np.random.seed(42)

    # 创建 LoRA 线性层
    config = LoRAConfig(rank=8, alpha=16)
    layer = LoRALinear(in_features=256, out_features=256, config=config)

    # 参数统计
    params = layer.count_parameters()
    print("=== LoRA 线性层参数统计 ===")
    print(f"原始参数量: {params['original_params']:,}")
    print(f"LoRA 参数量: {params['lora_params']:,}")
    print(f"可训练比例: {params['trainable_ratio']:.2%}")
    print(f"压缩比: {params['compression_ratio']:.1f}x")

    # 前向传播
    x = np.random.randn(4, 256)  # batch=4
    output = layer.forward(x)
    print(f"\n输入形状: {x.shape}")
    print(f"输出形状: {output.shape}")

    # 权重合并
    merged_W = layer.merge_weights()
    merged_output = x @ merged_W.T
    diff = np.abs(output - merged_output).max()
    print(f"\n合并权重后最大误差: {diff:.10f}")
    assert diff < 1e-10, "合并权重应产生相同结果"

    # 完整模型
    model = LoRAModel(256, 512, 10, config)
    total = model.total_params()
    print(f"\n=== 完整模型 ===")
    print(f"总原始参数: {total['total_original']:,}")
    print(f"总 LoRA 参数: {total['total_lora']:,}")
    print(f"可训练比例: {total['trainable_ratio']:.2%}")

    # 前向传播
    model_output = model.forward(x)
    print(f"模型输出形状: {model_output.shape}")

复杂度分析:

维度复杂度
前向传播O(d² + d×r×2),与原线性层相比多 O(d×r)
可训练参数2×d×r vs 原始 d²,当 r=8, d=4096 时仅 0.4%
空间额外 2×d×r 存储 A 和 B

追问:

  1. LoRA 的 rank 和 alpha 如何选择?过大或过小各有什么影响?
  2. QLoRA 是如何在 LoRA 基础上进一步减少显存占用的?
  3. LoRA 可以应用在 Transformer 的哪些层?效果有什么差异?

Prompt 工程与 Agent


Q17: 实现一个 Prompt 模板引擎(支持变量替换、条件分支、循环) ⭐⭐

题目描述:

实现一个 Prompt 模板引擎,支持:

  1. 变量替换:&#123;&#123;variable&#125;&#125;,支持默认值 &#123;&#123;variable|default&#125;&#125;
  2. 条件分支:{% if condition %}...{% else %}...{% endif %}
  3. 循环:{% for item in list %}...{% endfor %}

解题思路:

  1. 变量替换:正则匹配 &#123;&#123;...&#125;&#125;,从上下文中取值
  2. 条件分支:正则匹配 {% if %},根据变量值判断
  3. 循环:正则匹配 {% for %},迭代列表并渲染

完整代码:

python
import re
from typing import Any


class PromptTemplateEngine:
    """Prompt 模板引擎"""

    def __init__(self):
        self._templates: dict[str, str] = {}

    def register(self, name: str, template: str):
        """注册模板"""
        self._templates[name] = template

    def render(self, name: str, **kwargs: Any) -> str:
        """渲染模板"""
        template = self._templates.get(name)
        if template is None:
            raise ValueError(f"模板 '{name}' 不存在")
        return self._render(template, kwargs)

    def _render(self, template: str, context: dict) -> str:
        """渲染模板字符串"""
        # 1. 处理循环
        template = self._process_loops(template, context)
        # 2. 处理条件分支
        template = self._process_conditionals(template, context)
        # 3. 变量替换
        template = self._substitute_variables(template, context)
        return template.strip()

    def _substitute_variables(self, template: str, context: dict) -> str:
        """变量替换: {{var}} 或 {{var|default}}"""
        def replacer(match):
            expr = match.group(1).strip()
            parts = expr.split("|", 1)
            var_name = parts[0].strip()
            default = parts[1].strip() if len(parts) > 1 else ""

            value = self._resolve(var_name, context)
            if value is not None:
                return str(value)
            return default

        return re.sub(r"\{\{(.+?)\}\}", replacer, template)

    def _resolve(self, path: str, context: dict) -> Any:
        """解析嵌套变量路径,如 agent.name"""
        parts = path.split(".")
        current = context
        for part in parts:
            if isinstance(current, dict) and part in current:
                current = current[part]
            else:
                return None
        return current

    def _process_conditionals(self, template: str, context: dict) -> str:
        """处理条件分支: {% if %}...{% else %}...{% endif %}"""
        pattern = r"\{%\s*if\s+(\w+)\s%\}(.*?)(?:\{%\s*else\s%\}(.*?))?\{%\s*endif\s%\}"

        def replacer(match):
            var_name = match.group(1)
            true_block = match.group(2)
            false_block = match.group(3) or ""
            value = context.get(var_name)
            if value and value not in (False, 0, "", None, []):
                return true_block
            return false_block

        # 循环处理嵌套条件
        while "{% if " in template:
            new_template = re.sub(pattern, replacer, template, flags=re.DOTALL)
            if new_template == template:
                break
            template = new_template
        return template

    def _process_loops(self, template: str, context: dict) -> str:
        """处理循环: {% for item in list %}...{% endfor %}"""
        pattern = r"\{%\s*for\s+(\w+)\s+in\s+(\w+)\s%\}(.*?)\{%\s*endfor\s%\}"

        def replacer(match):
            item_name = match.group(1)
            list_name = match.group(2)
            body = match.group(3)

            items = context.get(list_name, [])
            if not isinstance(items, (list, tuple)):
                return ""

            result_parts = []
            for i, item in enumerate(items):
                loop_context = {**context, item_name: item, "loop_index": i + 1}
                rendered = self._render(body, loop_context)
                result_parts.append(rendered)

            return "\n".join(result_parts)

        while "{% for " in template:
            new_template = re.sub(pattern, replacer, template, flags=re.DOTALL)
            if new_template == template:
                break
            template = new_template
        return template


# ---- 测试 ----
if __name__ == "__main__":
    engine = PromptTemplateEngine()

    # 1. 基础变量替换
    engine.register("basic", "你好,{{name}}!你来自 {{city|未知城市}}。")
    print("=== 基础替换 ===")
    print(engine.render("basic", name="张明"))
    print(engine.render("basic", name="李华", city="上海"))

    # 2. 条件渲染
    engine.register("cond", """你是一个{{role|助手}}
{% if has_tools %}
你可以使用以下工具:{{tools}}
{% endif %}
{% if verbose %}
请详细回答用户的问题。
{% else %}
请简洁回答。
{% endif %}""")
    print("\n=== 条件渲染 ===")
    print(engine.render("cond", role="编程助手", has_tools=True,
                         tools="search, calculator", verbose=False))

    # 3. 循环渲染
    engine.register("rag", """请根据以下参考资料回答问题。

{% for doc in documents %}
[文档{{loop_index}}] {{doc}}
{% endfor %}

问题:{{question}}""")
    print("\n=== RAG 模板 ===")
    docs = ["RAG 是检索增强生成技术", "向量数据库是 RAG 的核心组件", "Embedding 模型将文本映射到向量空间"]
    print(engine.render("rag", documents=docs, question="什么是 RAG?"))

    # 4. 嵌套对象
    engine.register("agent", """Agent 配置:
名称: {{agent.name}}
模型: {{agent.model}}
温度: {{agent.temperature}}

用户输入: {{input}}""")
    print("\n=== 嵌套对象 ===")
    agent_config = {"name": "MyAgent", "model": "gpt-4o", "temperature": 0.7}
    print(engine.render("agent", agent=agent_config, input="帮我查天气"))

复杂度分析:

维度复杂度
变量替换O(N × V),N=模板长度,V=变量数
循环渲染O(N × I),I=迭代次数
空间O(渲染结果大小)

追问:

  1. Jinja2 模板引擎相比自实现的模板引擎有哪些高级功能?在 Prompt 工程中哪些特性最实用?
  2. 如何实现 Prompt 版本管理?如何做 A/B 测试来比较不同 Prompt 的效果?

Q18: 实现一个简化版的 Agent ReAct 循环(带工具调用和错误处理) ⭐⭐⭐

题目描述:

实现 ReAct(Reasoning + Acting)Agent 循环:

  1. Thought:LLM 进行推理,决定下一步行动
  2. Action:调用工具执行操作
  3. Observation:获取工具执行结果
  4. 循环直到得到最终答案或达到最大步数

解题思路:

  1. 维护对话历史(Thought/Action/Observation 三元组)
  2. 每轮让 LLM 生成 Thought 和 Action
  3. 解析 Action,调用对应工具
  4. 将 Observation 拼入上下文,继续下一轮

完整代码:

python
import json
import re
import traceback
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable


@dataclass
class Tool:
    name: str
    description: str
    func: Callable
    parameters: dict = field(default_factory=dict)

    def execute(self, **kwargs) -> str:
        try:
            return str(self.func(**kwargs))
        except Exception as e:
            return f"工具执行错误: {type(e).__name__}: {e}"


@dataclass
class ReActStep:
    step_number: int
    thought: str
    action: str | None = None
    action_input: dict | None = None
    observation: str | None = None
    is_final: bool = False
    error: str | None = None


class ReActAgent:
    """ReAct Agent 循环"""

    def __init__(self, llm, tools: list[Tool], max_steps: int = 10,
                 max_retries: int = 2):
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.max_steps = max_steps
        self.max_retries = max_retries

    async def run(self, query: str) -> dict:
        """
        运行 ReAct 循环
        Returns: {"answer": str, "steps": list[ReActStep], "success": bool}
        """
        steps: list[ReActStep] = []
        history: list[dict] = []

        for step_num in range(1, self.max_steps + 1):
            # 1. LLM 生成 Thought + Action
            llm_output = await self._think(query, history)

            # 2. 解析输出
            parsed = self._parse_output(llm_output)
            step = ReActStep(
                step_number=step_num,
                thought=parsed.get("thought", ""),
                action=parsed.get("action"),
                action_input=parsed.get("action_input", {}),
                is_final=parsed.get("action") is None,
            )

            # 3. 如果是最终答案
            if step.is_final:
                step.observation = parsed.get("answer", "")
                steps.append(step)
                return {
                    "answer": step.observation,
                    "steps": steps,
                    "success": True,
                    "total_steps": step_num,
                }

            # 4. 执行工具(带重试)
            observation = await self._execute_with_retry(step)
            step.observation = observation
            steps.append(step)

            # 5. 更新历史
            history.append({
                "step": step_num,
                "thought": step.thought,
                "action": step.action,
                "action_input": step.action_input,
                "observation": observation,
            })

        # 达到最大步数
        return {
            "answer": f"达到最大步数 {self.max_steps},未能得出最终答案。",
            "steps": steps,
            "success": False,
            "total_steps": len(steps),
        }

    async def _think(self, query: str, history: list[dict]) -> str:
        """让 LLM 推理并决定下一步"""
        tools_desc = "\n".join(
            f"- {t.name}: {t.description}" for t in self.tools.values()
        )

        history_text = ""
        for h in history:
            history_text += f"Step {h['step']}:\n"
            history_text += f"  Thought: {h['thought']}\n"
            history_text += f"  Action: {h['action']}({h['action_input']})\n"
            history_text += f"  Observation: {h['observation']}\n\n"

        prompt = f"""你是一个智能助手,使用 ReAct 模式回答问题。

可用工具:
{tools_desc}

请按以下格式输出:
Thought: [你的推理过程]
Action: [工具名称]
Action Input: {{"参数名": "参数值"}}

如果你已经有足够信息回答问题,直接输出:
Thought: [总结推理过程]
Answer: [最终答案]

问题:{query}

{history_text}"""

        return await self.llm.generate(prompt)

    def _parse_output(self, text: str) -> dict:
        """解析 LLM 输出"""
        result = {}

        # 提取 Thought
        thought_match = re.search(r"Thought:\s*(.+?)(?=\n(?:Action|Answer):|\Z)",
                                   text, re.DOTALL)
        if thought_match:
            result["thought"] = thought_match.group(1).strip()

        # 提取 Answer(最终答案)
        answer_match = re.search(r"Answer:\s*(.+)", text, re.DOTALL)
        if answer_match:
            result["answer"] = answer_match.group(1).strip()
            return result

        # 提取 Action
        action_match = re.search(r"Action:\s*(\w+)", text)
        if action_match:
            result["action"] = action_match.group(1).strip()

        # 提取 Action Input
        input_match = re.search(r"Action Input:\s*(\{.+?\})", text, re.DOTALL)
        if input_match:
            try:
                result["action_input"] = json.loads(input_match.group(1))
            except json.JSONDecodeError:
                result["action_input"] = {}

        return result

    async def _execute_with_retry(self, step: ReActStep) -> str:
        """带重试的工具执行"""
        tool = self.tools.get(step.action)
        if tool is None:
            return f"错误:工具 '{step.action}' 不存在。可用工具:{list(self.tools.keys())}"

        for attempt in range(self.max_retries + 1):
            try:
                result = tool.execute(**(step.action_input or {}))
                if "错误" not in result or attempt == self.max_retries:
                    return result
                # 如果是错误且还有重试机会
                step.error = result
            except Exception as e:
                if attempt == self.max_retries:
                    return f"工具执行失败(已重试{self.max_retries}次): {e}"
                step.error = str(e)

        return "工具执行失败"


# ---- 测试 ----
if __name__ == "__main__":
    import asyncio
    import math

    # 定义工具
    def calculator(expression: str) -> str:
        """计算数学表达式"""
        allowed = set("0123456789+-*/.() ")
        if not all(c in allowed for c in expression):
            return "错误:不支持的字符"
        return str(eval(expression))

    def search(query: str) -> str:
        """模拟搜索"""
        results = {
            "北京天气": "北京今天晴,气温 15-25°C,空气质量良",
            "上海天气": "上海今天多云,气温 18-28°C,有轻微雾霾",
            "RAG": "RAG(检索增强生成)是结合信息检索和文本生成的AI技术",
        }
        for key, value in results.items():
            if key in query:
                return value
        return f"未找到关于'{query}'的结果"

    def get_time(timezone: str = "Asia/Shanghai") -> str:
        """获取当前时间"""
        return f"当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ({timezone})"

    tools = [
        Tool(name="calculator", description="计算数学表达式。输入: expression (数学表达式字符串)",
             func=calculator, parameters={"expression": "str"}),
        Tool(name="search", description="搜索信息。输入: query (搜索关键词)",
             func=search, parameters={"query": "str"}),
        Tool(name="get_time", description="获取当前时间。输入: timezone (时区名称)",
             func=get_time, parameters={"timezone": "str"}),
    ]

    # Mock LLM
    class MockLLM:
        def __init__(self):
            self.call_count = 0

        async def generate(self, prompt: str) -> str:
            self.call_count += 1
            if self.call_count == 1:
                return """Thought: 用户想知道北京天气和当前时间。我需要先搜索天气信息。
Action: search
Action Input: {"query": "北京天气"}"""
            elif self.call_count == 2:
                return """Thought: 已经获取到天气信息,现在获取当前时间。
Action: get_time
Action Input: {"timezone": "Asia/Shanghai"}"""
            else:
                return """Thought: 已经获取到所有信息,可以回答用户了。
Answer: 北京今天天气晴朗,气温 15-25°C,空气质量良好。当前时间为 2024-01-15 14:30:00 (北京时间)。建议穿着轻便外套出行。"""

    async def test():
        agent = ReActAgent(
            llm=MockLLM(),
            tools=tools,
            max_steps=5,
            max_retries=2,
        )

        result = await agent.run("北京今天天气怎么样?现在几点了?")

        print("=== ReAct Agent 执行结果 ===")
        print(f"成功: {result['success']}")
        print(f"总步数: {result['total_steps']}")
        print(f"\n最终答案:\n{result['answer']}")

        print(f"\n=== 执行步骤 ===")
        for step in result["steps"]:
            print(f"\nStep {step.step_number}:")
            print(f"  Thought: {step.thought}")
            if step.action:
                print(f"  Action: {step.action}({step.action_input})")
            if step.observation:
                obs = step.observation[:100] + "..." if len(step.observation) > 100 else step.observation
                print(f"  Observation: {obs}")
            if step.is_final:
                print(f"  [最终答案]")

    asyncio.run(test())

复杂度分析:

维度复杂度
时间O(S × L),S=步数,L=LLM 调用延迟
空间O(S × H),H=每步历史大小
Token 消耗O(S × context_length),随步数线性增长

追问:

  1. ReAct 模式与 Plan-and-Execute 模式有什么区别?各适用于什么场景?
  2. 如何处理 Agent 的幻觉问题?(如调用不存在的工具、参数错误等)
  3. 如何实现 Agent 的并行工具调用?与串行调用相比有什么优缺点?

LLM 应用 & Agent 开发面试准备