21. 系统设计题
面向大模型应用工程师/Agent开发工程师的高频系统设计面试题。每道题包含:需求分析、架构设计、核心模块、技术选型、扩展性考虑。
一、RAG 系统设计题
Q1: 设计一个企业知识库问答系统 ⭐⭐
需求分析:
- 支持文档上传(PDF/Word/Markdown/HTML),增量更新
- 多轮对话,带上下文理解
- 准确率 > 90%,延迟 < 3s
- 支持权限隔离(不同部门看到不同文档)
- 日活 1000+ 用户
架构设计:
用户请求 → API Gateway → Query理解模块 → 检索模块 → 重排模块 → LLM生成 → 答案+引用
↓ ↓ ↓
意图识别/改写 向量DB+ES混合检索 Cross-Encoder
↑
文档处理流水线
解析→分块→Embedding→入库
核心模块与代码示例:
# 1. Document processing pipeline
class DocumentPipeline:
    """Parse -> chunk -> embed -> upsert pipeline for uploaded documents.

    Args:
        vector_db: async vector store exposing
            ``upsert(vector=, text=, metadata=)``. The original sketch read
            ``self.vector_db`` without ever assigning it. The store is now
            injectable. ``None`` keeps the old zero-argument constructor, but a
            store must be assigned before ``process`` is called.
    """

    def __init__(self, vector_db=None):
        self.parsers = {
            '.pdf': PDFParser(),
            '.docx': DocxParser(),
            '.md': MarkdownParser(),
        }
        self.chunker = SemanticChunker(
            chunk_size=512,
            overlap=64,
            separators=["\n\n", "\n", "。", ";"]
        )
        self.embedder = EmbeddingModel("bge-large-zh-v1.5")
        self.vector_db = vector_db

    async def process(self, file_path: str, metadata: dict):
        """Parse, chunk, embed and upsert one file.

        Args:
            file_path: path of the uploaded file. The extension selects the
                parser and is matched case-insensitively.
            metadata: caller metadata (e.g. ``dept`` for permission filtering),
                copied onto every chunk.

        Raises:
            KeyError: if the file extension has no registered parser.
        """
        # Lower-case the extension so "Report.PDF" resolves to the '.pdf' parser.
        ext = os.path.splitext(file_path)[1].lower()
        doc = self.parsers[ext].parse(file_path)
        chunks = self.chunker.chunk(doc)
        for chunk in chunks:
            # NOTE(review): encode() is called synchronously inside a coroutine;
            # if it is CPU/GPU-bound it blocks the event loop. Confirm.
            embedding = self.embedder.encode(chunk.text)
            await self.vector_db.upsert(
                vector=embedding,
                text=chunk.text,
                # "source" lets incremental re-indexing find and replace every
                # chunk of a changed file; caller metadata may override it.
                metadata={"source": file_path, **metadata,
                          "chunk_id": chunk.id, "page": chunk.page},
            )
# 2. Hybrid retrieval module
class HybridRetriever:
    """Permission-filtered hybrid (vector + keyword) retrieval followed by reranking.

    Args:
        vector_db: async vector store with ``search(vector=, filter=, top_k=)``.
        es_client: async Elasticsearch client.
        reranker: object with ``rerank(query=, documents=)``.
        embedder: query encoder with ``encode(text)``. The original read
            ``self.embedder`` without assigning it. It defaults to ``None``
            for signature compatibility but must be set before ``retrieve``.
    """

    def __init__(self, vector_db, es_client, reranker, embedder=None):
        self.vector_db = vector_db
        self.es = es_client
        self.reranker = reranker
        self.embedder = embedder

    async def retrieve(self, query: str, user_dept: str, top_k=10):
        """Return the ``top_k`` reranked passages visible to ``user_dept``."""
        # Both retrieval paths must apply the SAME visibility rule: the user's
        # own department plus documents shared with everyone ("all"). The
        # original keyword path filtered on user_dept only, which silently
        # dropped every shared document from BM25 recall.
        allowed_depts = [user_dept, "all"]
        filter_ = {"dept": {"$in": allowed_depts}}
        # Vector retrieval
        vec_results = await self.vector_db.search(
            vector=self.embedder.encode(query),
            filter=filter_, top_k=20
        )
        # Keyword retrieval. Elasticsearch has no top-level ``filter``
        # argument; permission filtering belongs in a bool query's
        # non-scoring ``filter`` clause.
        es_results = await self.es.search(
            index="knowledge",
            query={"bool": {
                "must": {"match": {"text": query}},
                "filter": {"terms": {"dept": allowed_depts}},
            }},
            size=20,
        )
        # Reciprocal rank fusion of both candidate lists.
        merged = reciprocal_rank_fusion(vec_results, es_results)
        # Cross-encoder rerank of the fused top 30.
        reranked = self.reranker.rerank(
            query=query,
            documents=[r.text for r in merged[:30]]
        )
        return reranked[:top_k]
# 技术选型:
| 模块 | 推荐方案 | 备选 |
|---|---|---|
| 向量数据库 | Milvus / Qdrant | Weaviate, Pinecone |
| 全文检索 | Elasticsearch | OpenSearch |
| Embedding | BGE-large-zh | M3E, text-embedding-3-large |
| 重排模型 | bge-reranker-v2-m3 | Cohere Rerank |
| LLM | GPT-4o / Qwen-Max | DeepSeek-V3, Claude |
| 文档解析 | Unstructured | LlamaParse |
扩展性考虑:
- 增量索引:文档变更触发CDC,异步更新向量库
- 缓存层:Redis缓存高频query的检索结果
- 评估闭环:用户反馈→标注→微调Embedding/重排模型
- 多语言:检测语言后路由到对应Embedding模型
Q2: 设计一个多模态RAG系统 ⭐⭐⭐
需求分析:
- 同时处理文本、图片、表格、图表
- 用户可以上传图片提问(如"这张图里的数据说明什么")
- 需要理解文档中的图表与文字的关联关系
- 支持跨模态检索(文字查图片,图片查文字)
架构设计:
输入(文本/图片/文档)
↓
多模态解析层:文档解析→文本块 / 图片提取 / 表格提取 / OCR
↓
多模态Embedding层:文本Embedding + 视觉Embedding (CLIP/SigLIP)
↓
统一向量空间存储 (Milvus, 多向量字段)
↓
检索层:跨模态检索 + 多路召回融合
↓
多模态LLM生成 (GPT-4o/Qwen-VL) ← 上下文拼接(文本+图片base64)
↓
回答 + 来源引用(含图片引用)
核心模块:
class MultiModalRAG:
    """Multimodal RAG over text, images and tables with cross-modal retrieval.

    Args:
        vdb: async multi-vector store (e.g. Milvus) exposing ``upsert(...)``
            and ``search(vector, top_k=...)``. The original read ``self.vdb``
            without assigning it. It is now injectable; ``None`` keeps the
            zero-argument constructor, but a store must be set before use.
    """

    def __init__(self, vdb=None):
        # Hub ids: bge-large-zh-v1.5 is published under the BAAI org. The
        # sentence-transformers CLIP ViT-L/14 checkpoint is "clip-ViT-L-14".
        # The bare names used before do not resolve on the Hugging Face Hub.
        self.text_embedder = SentenceTransformer("BAAI/bge-large-zh-v1.5")
        self.visual_embedder = SentenceTransformer("clip-ViT-L-14")
        self.vision_llm = VisionLLM("gpt-4o")
        self.doc_parser = MultiModalParser()  # built on pdfplumber + PIL
        self.vdb = vdb

    async def index_document(self, doc_path: str):
        """Parse a document and upsert its text, image and table items.

        Items of any other ``type`` are skipped.
        """
        parsed = self.doc_parser.parse(doc_path)
        for item in parsed:
            if item.type == "text":
                emb = self.text_embedder.encode(item.content)
                await self.vdb.upsert(
                    vector=emb, modality="text",
                    content=item.content, metadata=item.metadata
                )
            elif item.type == "image":
                img_emb = self.visual_embedder.encode(item.image)
                # Also store a VLM-generated caption so text queries can hit
                # images through the text vector space.
                caption = await self.vision_llm.describe(item.image)
                txt_emb = self.text_embedder.encode(caption)
                await self.vdb.upsert(
                    vector=img_emb, secondary_vector=txt_emb,
                    modality="image", image_path=item.path,
                    caption=caption, metadata=item.metadata
                )
            elif item.type == "table":
                # Tables are linearised to Markdown and embedded as text.
                table_text = item.to_markdown()
                emb = self.text_embedder.encode(table_text)
                await self.vdb.upsert(
                    vector=emb, modality="table",
                    content=table_text, metadata=item.metadata
                )

    async def query(self, question: str, image=None):
        """Answer *question*, optionally about an uploaded *image*.

        Returns:
            ``(answer, results)``: the VLM answer and the retrieved items.

        The branch tests ``image is not None`` rather than truthiness.
        ``bool()`` of a numpy array or tensor image raises ValueError.
        """
        if image is not None:
            # Image query: the visual embedding finds similar images and the
            # question's text embedding finds related passages. The two result
            # sets are then merged.
            q_vec = self.visual_embedder.encode(image)
            results = await self.vdb.search(q_vec, top_k=5)
            txt_vec = self.text_embedder.encode(question)
            txt_results = await self.vdb.search(txt_vec, top_k=5)
            results = self._merge(results, txt_results)
        else:
            q_vec = self.text_embedder.encode(question)
            results = await self.vdb.search(q_vec, top_k=10)
        # Multimodal prompt: text plus base64-encoded images.
        context = self._build_context(results)
        answer = await self.vision_llm.generate(
            prompt=f"基于以下参考资料回答:\n{context}\n\n问题:{question}"
        )
        return answer, results
# 技术选型:
| 模块 | 推荐方案 |
|---|---|
| 多模态Embedding | CLIP / SigLIP / E5-V |
| 文档解析 | LlamaParse / Unstructured / Marker |
| 表格提取 | pdfplumber + Camelot |
| 图片理解 | GPT-4o / Qwen2.5-VL |
| 向量存储 | Milvus (支持多向量字段) |
扩展性考虑:
- 图片去重与相似度聚类,减少冗余存储
- 表格结构化存储,支持SQL查询
- 异步预处理:文档上传后异步解析、索引
- 支持视频/音频:ASR转文字 + 关键帧提取
二、Agent 系统设计题
Q3: 设计一个自动化运维Agent ⭐⭐⭐
需求分析:
- 根据告警自动诊断根因并执行修复操作
- 支持自然语言指令(如"检查prod集群的CPU使用率")
- 操作前需人工确认(高危操作)
- 操作日志可审计,支持回滚
- 安全性:最小权限原则,沙箱执行
架构设计:
告警/用户指令
↓
意图解析 → 任务规划 (ReAct/Plan-and-Execute)
↓
工具调用层:K8s API / Shell / 监控查询 / 日志查询 / 数据库
↓ ↓
安全沙箱 审批网关(高危操作需人工确认)
↓ ↓
执行引擎 ←←←←←←←←←┘
↓
结果评估 → 是否需要后续操作 → 循环/结束
↓
操作日志 + 通知
核心模块:
class OpsAgent:
    """Ops automation agent that gates each step before running it in a sandbox.

    The LLM plans the steps. Each step then passes a safety check, human
    approval for high-risk steps, and sandboxed execution. A failed step either
    triggers LLM-driven recovery or escalation.
    """

    # Tool catalogue advertised to the planner; "risk" is the static risk tier.
    TOOLS = [
        {"name": "kubectl", "desc": "执行K8s命令", "risk": "high"},
        {"name": "query_metrics", "desc": "查询Prometheus指标", "risk": "low"},
        {"name": "query_logs", "desc": "查询日志(ES/Loki)", "risk": "low"},
        {"name": "run_sql", "desc": "执行SQL查询", "risk": "medium"},
        {"name": "execute_shell", "desc": "执行Shell命令", "risk": "high"},
        {"name": "restart_service", "desc": "重启服务", "risk": "high"},
    ]

    def __init__(self, llm, tools, safety_checker, approval_gateway):
        self.llm = llm
        self.tools = tools
        self.safety = safety_checker
        self.approval = approval_gateway
        # Conversation memory persists across handle() calls.
        self.memory = ConversationBufferMemory()

    async def handle(self, user_input: str):
        """Plan and execute *user_input*, returning a summary or a refusal string."""
        plan = await self.llm.plan(
            system_prompt=self._build_system_prompt(),
            user_input=user_input,
            tools=self.TOOLS,
            history=self.memory.get(),
        )

        for step in plan.steps:
            # Gate 1: static safety policy.
            if not self.safety.check(step):
                return f"安全检查未通过: {step.risk_reason}"

            # Gate 2: high-risk steps need a human approval with a dry-run preview.
            # NOTE(review): the request text is passed as ``user``; an approver
            # probably expects the requester's identity — confirm.
            needs_approval = step.risk_level == "high"
            if needs_approval and not await self.approval.request(
                action=step.description,
                user=user_input,
                dry_run=step.preview(),
            ):
                return "操作未获批准"

            # Sandboxed execution with a hard timeout.
            result = await self.tools.execute(
                tool=step.tool,
                params=step.params,
                sandbox=True,
                timeout=60,
            )
            self.memory.add(step, result)

            if result.status != "error":
                continue
            # On error: let the LLM attempt recovery, otherwise escalate.
            if not await self.llm.recover(step, result):
                return self._escalate(step, result)

        return self._summarize(plan, self.memory.get())

    def _build_system_prompt(self):
        """Return the fixed system prompt that encodes the agent's operating rules."""
        rules = (
            "1. 先诊断,后操作",
            "2. 高危操作必须先dry-run预览",
            "3. 每步操作必须有明确目的和预期结果",
            "4. 出错时尝试回滚",
            "5. 记录完整操作日志",
        )
        return "\n".join(("你是一个资深运维工程师Agent。", "规则:", *rules))
# 技术选型:
| 模块 | 推荐方案 |
|---|---|
| Agent框架 | LangGraph / AutoGen |
| LLM | GPT-4o / Claude 3.5 (强推理) |
| 沙箱 | Docker容器 / nsjail |
| 审批 | 飞书/钉钉审批流 + 超时自动拒绝 |
| 监控 | Prometheus + Grafana |
| 日志审计 | ELK / Loki |
扩展性考虑:
- Runbook知识库:历史故障修复方案作为few-shot
- 灰度执行:先在staging验证,再prod执行
- 自学习:成功案例自动沉淀为标准操作流程
- 多集群支持:统一抽象不同K8s集群/云厂商API
Q4: 设计一个代码助手Agent ⭐⭐
需求分析:
- IDE内嵌(VS Code插件),实时代码补全和对话
- 支持多文件上下文理解(整个项目级别)
- 代码生成、解释、重构、Bug修复、单测生成
- 延迟要求:补全 < 500ms,对话 < 3s
- 日活 10000+ 开发者
架构设计:
VS Code插件
├→ 补全请求 → 本地缓存/小模型快速补全 → 返回
└→ 对话请求 → 上下文收集 → Agent后端
↓
代码索引(AST+Embedding)
↓
检索相关代码片段
↓
LLM推理(GPT-4o/Claude)
↓
代码后处理(格式化/lint)
核心模块:
class CodeAssistantAgent:
    """IDE code assistant: low-latency completion plus project-aware chat.

    Args:
        cache: optional async cache with ``get(key)`` and
            ``set(key, value, ttl=)``, e.g. Redis or a local LRU wrapper. The
            original read ``self.cache`` without ever assigning it. With
            ``None``, completion caching is skipped.
    """

    def __init__(self, cache=None):
        self.indexer = CodeIndexer()  # code index
        self.retriever = CodeRetriever()  # code retrieval
        self.llm = LLMRouter()  # routes each task type to a different model
        self.context_builder = ContextBuilder()
        self.cache = cache

    async def complete(self, cursor_context: CursorContext):
        """Code completion: the latency-critical path (target < 500 ms)."""
        cache_key = None
        if self.cache is not None:
            cache_key = self._cache_key(cursor_context)
            cached = await self.cache.get(cache_key)
            # ``is not None``: an empty completion ("") is a valid cached
            # answer and must not trigger another model call.
            if cached is not None:
                return cached
        # Small/fast model for completions.
        prompt = self.context_builder.build_completion_prompt(
            prefix=cursor_context.prefix,
            suffix=cursor_context.suffix,
            language=cursor_context.language
        )
        result = await self.llm.route(
            task_type="completion",  # routed to Codestral / a local small model
            prompt=prompt, max_tokens=128
        )
        if self.cache is not None:
            await self.cache.set(cache_key, result, ttl=300)
        return result

    async def chat(self, message: str, workspace: WorkspaceContext):
        """Chat: the quality-first path (target < 3 s)."""
        # 1. Gather context: the active file plus the top-5 related files.
        active_file = workspace.active_file
        relevant_files = await self.retriever.find_relevant(
            query=message,
            file_tree=workspace.file_tree,
            recent_files=workspace.recent_files,
            top_k=5
        )
        # 2. Build project-level context.
        context = self.context_builder.build_chat_context(
            active_file=active_file,
            relevant_files=relevant_files,
            git_diff=workspace.git_diff,
            message=message
        )
        # 3. Generate with the strong chat model.
        response = await self.llm.route(
            task_type="chat",  # routed to GPT-4o / Claude
            prompt=context
        )
        # 4. Post-process: extract code blocks and format them.
        return self._postprocess(response, workspace)
class CodeIndexer:
    """AST + embedding code index: one vector per extracted symbol."""

    def index(self, repo_path: str):
        """Index every source file yielded for *repo_path*.

        ``ast.parse`` needs source *text*. The original passed the file object
        itself, which raises TypeError. Each file is read from ``file.path`` as
        UTF-8. Files that cannot be read, decoded or parsed are logged and
        skipped, so one bad file does not abort indexing of the whole
        repository.
        """
        import logging

        log = logging.getLogger(__name__)
        for file in self._walk_source_files(repo_path):
            try:
                with open(file.path, encoding="utf-8") as fh:
                    source = fh.read()
                # Python's ast only handles Python; other languages need a
                # language-specific parser (e.g. tree-sitter).
                tree = ast.parse(source, filename=file.path)
            except (OSError, UnicodeDecodeError, SyntaxError, ValueError) as err:
                log.warning("skipping %s: %s", file.path, err)
                continue
            for symbol in self._extract_symbols(tree):
                # Embed name + docstring + signature; the body is stored as payload.
                # ``or ''`` keeps a missing docstring from embedding the text "None".
                embedding = self.embedder.encode(
                    f"{symbol.name}\n{symbol.docstring or ''}\n{symbol.signature}"
                )
                self.vector_db.upsert(
                    vector=embedding,
                    symbol=symbol.name,
                    file=file.path,
                    body=symbol.body,
                    type=symbol.type  # function / class / method
                )
# 技术选型:
| 模块 | 推荐方案 |
|---|---|
| 补全模型 | Codestral / DeepSeek-Coder / StarCoder2 |
| 对话模型 | GPT-4o / Claude 3.5 Sonnet |
| 代码索引 | Tree-sitter (AST) + Embedding |
| IDE集成 | VS Code Extension API / LSP |
| 缓存 | Redis / 本地LRU |
扩展性考虑:
- 本地模型兜底:离线或API不可用时使用本地小模型
- 增量索引:文件保存时增量更新索引
- 个性化:根据用户编码风格微调补全
- 隐私保护:敏感代码不发送到云端,本地模型处理
三、模型服务平台设计
Q5: 设计一个高可用模型推理服务 ⭐⭐⭐
需求分析:
- 支持多个模型同时服务(LLM、Embedding、Reranker、VLM)
- P99延迟 < 2s(流式首token < 500ms)
- 可用性 99.95%,支持滚动升级
- GPU利用率 > 70%
- 支持自动扩缩容
架构设计:
客户端请求
↓
API Gateway (限流/鉴权/路由)
↓
请求调度器 (优先级队列/公平调度)
↓
模型路由 (根据模型名/版本路由)
↓
推理引擎集群
├→ vLLM (LLM, PagedAttention)
├→ TensorRT-LLM (高吞吐场景)
├→ Triton (Embedding/小模型)
└→ ONNX Runtime (传统模型)
↓
响应流式返回
核心模块:
class ModelServingPlatform:
    """Inference front door: rate-limit, route, schedule, then stream the output.

    Args:
        rate_limiter: object with async ``allow(user, model) -> bool``. The
            original called ``self.rate_limiter`` without assigning it.
    """

    def __init__(self, rate_limiter=None):
        self.router = ModelRouter()
        self.scheduler = RequestScheduler()
        self.health_checker = HealthChecker()
        self.metrics = MetricsCollector()
        self.rate_limiter = rate_limiter

    async def serve(self, request: InferenceRequest):
        """Async generator yielding the streamed output chunks for *request*.

        Raises:
            RateLimitError: if the rate limiter rejects the request.

        Metrics are recorded in ``finally``, so failed, timed-out and
        client-cancelled streams are observed too. Previously only streams that
        ran to completion were recorded, which hid exactly the slow or failing
        requests that dominate P99.
        """
        # 1. Rate limiting
        if not await self.rate_limiter.allow(request.user, request.model):
            raise RateLimitError()
        # 2. Route to a healthy instance (weighted RR / least conn / latency-first)
        instance = await self.router.route(
            model=request.model,
            version=request.version,
            strategy="least_connections"
        )
        # 3. Schedule (priority and preemption aware)
        future = await self.scheduler.schedule(
            request=request,
            instance=instance,
            priority=request.priority,
            timeout=request.timeout
        )
        # 4. Stream back; 5. always record metrics.
        try:
            async for chunk in future:
                yield chunk
        finally:
            # NOTE(review): assumes latency/output_tokens are readable on a
            # partially consumed stream — confirm with the scheduler API.
            self.metrics.record(
                model=request.model,
                latency=future.latency,
                tokens=future.output_tokens,
                instance=instance.id
            )
class ModelRouter:
"""智能路由,考虑负载、延迟、成本"""
async def route(self, model: str, version: str, strategy: str):
instances = await self.registry.get_healthy_instances(model, version)
if strategy == "least_connections":
return min(instances, key=lambda i: i.active_requests)
elif strategy == "latency_based":
return min(instances, key=lambda i: i.avg_latency)
elif strategy == "cost_optimized":
# 优先使用小模型/本地GPU,溢出到云端
for inst in sorted(instances, key=lambda i: i.cost_per_token):
if inst.available_capacity > 0:
return inst
return random.choice(instances)
class AutoScaler:
    """Scale model replicas from queue depth, P99 latency and GPU utilisation."""

    async def evaluate(self, model: str):
        """Evaluate the last 5 minutes of metrics and issue at most one scale call.

        Scale up (double, at least +1, capped at ``max_instances``) when the
        queue exceeds 10 requests per replica or P99 exceeds the model's
        threshold. Otherwise scale down (halve, floored at ``min_instances``)
        when GPU utilisation < 30% and the queue is under 2 per replica.

        Fixes vs. the sketch:
        * Both branches used to be evaluated independently. High latency with
          idle GPUs could double the fleet and immediately halve it again.
          Scale-up now takes precedence and returns.
        * Doubling 0 replicas stayed 0; scale-up now adds at least one.
        * No ``scale`` call is issued when the target equals the current size.
        """
        metrics = await self.metrics.get_recent(model, window="5m")
        instances = await self.registry.get_instances(model)
        current = len(instances)

        # Scale-up condition
        overloaded = (metrics.queue_depth > current * 10 or
                      metrics.p99_latency > self.thresholds[model].max_latency)
        if overloaded:
            new_count = min(max(current * 2, current + 1), self.max_instances[model])
            if new_count != current:
                await self.k8s.scale(model, new_count)
            return

        # Scale-down condition
        if (metrics.avg_gpu_util < 0.3 and
                metrics.queue_depth < current * 2):
            new_count = max(current // 2, self.min_instances[model])
            if new_count != current:
                await self.k8s.scale(model, new_count)
# 技术选型:
| 模块 | 推荐方案 |
|---|---|
| LLM推理引擎 | vLLM / TensorRT-LLM |
| 小模型服务 | Triton Inference Server |
| API Gateway | Kong / APISIX |
| 编排 | K8s + GPU Operator |
| 监控 | Prometheus + Grafana |
| 服务网格 | Istio (流量管理/熔断) |
扩展性考虑:
- 多级缓存:KV Cache复用、prompt前缀缓存(vLLM Automatic Prefix Caching)
- 量化部署:INT4/INT8量化,同一GPU服务更多请求
- 多区域部署:就近接入,跨区域容灾
- 模型版本管理:A/B测试,灰度发布,快速回滚
Q6: 设计一个Prompt管理平台 ⭐⭐
需求分析:
- 集中管理所有业务的Prompt模板
- 支持版本管理、A/B测试、灰度发布
- Prompt在线调试和效果评估
- 变量模板、多语言支持
- 权限控制和审计日志
架构设计:
Prompt管理界面 (Web UI)
↓
Prompt CRUD API → Prompt存储 (MySQL + Git)
↓
Prompt渲染引擎 (Jinja2变量替换)
↓
A/B实验引擎 (分流/指标收集)
↓
效果评估模块 (自动打标/人工评估)
↓
SDK (Python/Java/Go) ← 业务系统集成
核心模块:
class PromptManager:
    """Central prompt service: experiment-aware lookup, versioning and offline evaluation.

    Args:
        store: async prompt store (``get_latest``, ``get``, ``save``).
        renderer: template renderer with ``render(template, variables)``.
        experiment: A/B engine whose ``assign`` picks the config to serve.
        llm: async LLM client used by ``evaluate``. The original called
            ``self.llm`` without ever assigning it. ``None`` keeps the
            three-argument constructor working.
    """

    def __init__(self, store, renderer, experiment, llm=None):
        self.store = store  # prompt storage
        self.renderer = renderer  # template rendering
        self.experiment = experiment  # A/B experiments
        self.llm = llm

    async def get_prompt(self, prompt_key: str, user_id: str,
                         variables: dict) -> str:
        """Return the rendered prompt that *user_id* should receive for *prompt_key*."""
        # 1. Resolve the prompt config (subject to experiment bucketing)
        prompt_config = await self.experiment.assign(
            prompt_key=prompt_key,
            user_id=user_id
        )
        # 2. Render the template
        rendered = self.renderer.render(
            template=prompt_config.template,
            variables=variables
        )
        return rendered

    async def create_version(self, prompt_key: str, template: str,
                             metadata: dict):
        """Create the next version of *prompt_key*, storing a diff against its parent.

        The first version of a new key has no parent. It becomes version 1 and
        is diffed against an empty template. The original dereferenced
        ``current.version`` on ``None`` and crashed.
        NOTE(review): assumes ``store.get_latest`` returns ``None`` for an
        unknown key — confirm against the store implementation.
        """
        current = await self.store.get_latest(prompt_key)
        if current is None:
            parent_version, base_template, next_version = None, "", 1
        else:
            parent_version = current.version
            base_template = current.template
            next_version = current.version + 1
        new_version = PromptVersion(
            key=prompt_key,
            template=template,
            version=next_version,
            parent_version=parent_version,
            diff=self._compute_diff(base_template, template),
            **metadata
        )
        await self.store.save(new_version)
        return new_version

    async def evaluate(self, prompt_key: str, version: int,
                       test_cases: list[EvalCase], *,
                       pass_threshold: float = 0.7) -> EvalReport:
        """Score one prompt version against *test_cases*.

        Args:
            pass_threshold: a case passes when its score is strictly above this
                value. The default 0.7 preserves the original behaviour.

        Raises:
            ValueError: if *test_cases* is empty. The averages would otherwise
                divide by zero.
        """
        if not test_cases:
            raise ValueError("test_cases must not be empty")
        results = []
        prompt = await self.store.get(prompt_key, version)
        for case in test_cases:
            rendered = self.renderer.render(prompt.template, case.inputs)
            output = await self.llm.generate(rendered)
            score = await self._auto_score(
                output=output,
                expected=case.expected,
                criteria=case.eval_criteria
            )
            results.append(EvalResult(
                input=case.inputs, output=output,
                expected=case.expected, score=score
            ))
        return EvalReport(
            prompt_key=prompt_key,
            version=version,
            avg_score=sum(r.score for r in results) / len(results),
            pass_rate=sum(1 for r in results if r.score > pass_threshold) / len(results),
            results=results
        )
class ABExperimentEngine:
    """Prompt A/B experiment engine with deterministic, sticky traffic bucketing."""

    async def assign(self, prompt_key: str, user_id: str) -> PromptConfig:
        """Return the prompt config *user_id* should see for *prompt_key*.

        With no active experiment, the production config is returned. Otherwise
        the user is hashed into one of 100 buckets, and variants claim
        consecutive ``traffic_percent`` ranges. Leftover buckets go to the
        control arm.

        The bucket hashes ``"{prompt_key}:{user_id}"`` instead of the bare
        user id. With the bare id, the same users fell into the same low
        buckets of every experiment. Treatment users of one prompt's experiment
        were then systematically treatment users of all others, which
        confounded results. Salting keeps a user's assignment sticky within one
        prompt while decorrelating experiments. (This is hash bucketing, not
        consistent hashing.)
        """
        experiment = await self.get_active_experiment(prompt_key)
        if not experiment:
            return await self.store.get_production(prompt_key)
        bucket = self._hash(f"{prompt_key}:{user_id}") % 100
        for variant in experiment.variants:
            if bucket < variant.traffic_percent:
                return variant.prompt_config
            bucket -= variant.traffic_percent
        return experiment.control.prompt_config
# 技术选型:
| 模块 | 推荐方案 |
|---|---|
| 存储 | MySQL + Git版本控制 |
| 模板引擎 | Jinja2 / Mustache |
| A/B实验 | 自研 / Statsig / GrowthBook |
| 评估 | LLM-as-Judge + 人工标注 |
| SDK | Python / TypeScript |
| 前端 | React + Monaco Editor |
扩展性考虑:
- Prompt推荐:基于历史效果数据推荐最优Prompt
- 多模型适配:同一Prompt针对不同模型自动调整
- 批量评估:CI/CD集成,Prompt变更自动回归测试
- 成本估算:渲染后token数估算和费用预测
四、多Agent系统设计
Q7: 设计一个多Agent客服系统 ⭐⭐⭐
需求分析:
- 意图识别后路由到专业Agent(售前/售后/技术支持/退款)
- 复杂问题可跨Agent协作
- 知识库实时更新,新产品上线后快速生效
- 人机协同:超出能力时平滑转人工
- 满意度 > 85%
架构设计:
用户消息
↓
路由Agent (意图识别 + 情绪分析)
↓
┌───────┼───────┬──────────┐
售前Agent 售后Agent 技术Agent 退款Agent
↕ ↕ ↕ ↕
└─────────┴────────┴──────────┘
共享上下文总线
↓
工具层:订单系统/知识库/Ticket系统
↓
人工兜底 (转接人工客服)
核心模块:
class CustomerServiceSystem:
    """Multi-agent customer service: route, check sentiment, answer or hand off.

    Args:
        analyzer: sentiment analyzer with async ``analyze(message)``, returning
            an object with ``anger_score``. The original called
            ``self.analyzer`` without assigning it. It is now injectable;
            ``None`` keeps the zero-argument constructor.
    """

    def __init__(self, analyzer=None):
        self.router = RouterAgent()
        self.agents = {
            "presale": PresaleAgent(),
            "aftersale": AfterSaleAgent(),
            "tech_support": TechSupportAgent(),
            "refund": RefundAgent(),
        }
        self.context_bus = ContextBus()
        self.handoff = HumanHandoff()
        self.analyzer = analyzer

    async def handle(self, message: Message, session: Session):
        """Answer *message* within *session*, or transfer the session to a human."""
        # 1. Intent classification
        intent = await self.router.classify(
            message=message,
            history=session.history,
            user_profile=session.user_profile
        )
        # 2. Sentiment gate: very angry users go straight to a human
        sentiment = await self.analyzer.analyze(message)
        if sentiment.anger_score > 0.8:
            return await self.handoff.transfer(session, reason="高情绪")
        # 3. Dispatch. The router prompt allows "general" (and an LLM may emit
        #    anything), but only four specialist agents exist. An unknown
        #    domain used to raise KeyError; hand it to a human instead.
        agent = self.agents.get(intent.domain)
        if agent is None:
            return await self.handoff.transfer(session, reason="无匹配Agent")
        response = await agent.handle(
            message=message,
            context=self.context_bus.get(session.id),
            tools=self._get_tools(intent.domain)
        )
        # 4. Cross-agent collaboration, only when the requested domain exists.
        if response.needs_collaboration:
            collab_agent = self.agents.get(response.collab_domain)
            if collab_agent is not None:
                collab_response = await collab_agent.assist(
                    context=response.context,
                    question=response.collab_question
                )
                response = agent.merge(response, collab_response)
        # 5. Confidence gate: low-confidence answers go to a human
        if response.confidence < 0.6:
            return await self.handoff.transfer(
                session,
                reason="低置信度",
                partial_answer=response.text
            )
        # 6. Persist the turn to the shared context
        self.context_bus.update(session.id, message, response)
        return response
class RouterAgent:
    """Routing agent that classifies a customer message with a JSON-mode LLM call.

    The output covers the domain, the urgency and whether collaboration is needed.
    """

    SYSTEM_PROMPT = """你是客服路由专家。根据用户消息判断:
1. 主要意图领域:presale/aftersale/tech_support/refund/general
2. 紧急程度:low/medium/high
3. 是否需要多领域协作
返回JSON格式。"""

    async def classify(self, message, history, user_profile):
        """Build a prompt from the profile, the last 3 turns and the message, and parse the Intent."""
        prompt = "\n".join((
            f"用户画像:{user_profile.summary}",
            f"最近对话:{history[-3:]}",
            f"当前消息:{message.content}",
        ))
        raw = await self.llm.generate(
            system=self.SYSTEM_PROMPT,
            prompt=prompt,
            response_format={"type": "json_object"},
        )
        return Intent.parse(raw)
class PresaleAgent:
    """Pre-sales agent: product Q&A and recommendations grounded in the product KB."""

    def __init__(self):
        self.product_kb = ProductKnowledgeBase()
        # NOTE(review): these tools are never handed to the LLM in ``handle``,
        # and ``handle``'s own ``tools`` argument is unused as well.
        self.tools = [
            SearchProductsTool(),
            CompareProductsTool(),
            CheckStockTool(),
            GetPriceTool(),
        ]

    async def handle(self, message, context, tools):
        """Retrieve the top-3 KB hits and let the LLM answer with them as system context."""
        grounding = await self.product_kb.search(message, top_k=3)
        system_prompt = self._build_system_prompt(grounding)
        return await self.llm.generate(
            system=system_prompt,
            messages=context.messages + [message],
        )
# 技术选型:
| 模块 | 推荐方案 |
|---|---|
| Agent框架 | LangGraph / CrewAI |
| 意图识别 | 微调BERT + LLM兜底 |
| 知识库 | 向量DB + 结构化产品库 |
| 人工转接 | 飞书/钉钉 + 自建工单系统 |
| 对话存储 | Redis (会话) + MySQL (持久化) |
扩展性考虑:
- Agent热插拔:新业务Agent可动态注册
- 对话质检:自动评估对话质量,发现问题Agent
- 多语言:检测语言后路由到对应语言Agent
- 数据飞轮:对话数据→标注→微调→提升准确率
Q8: 设计一个多Agent研究助手 ⭐⭐⭐
需求分析:
- 用户输入研究主题,自动搜索、阅读、分析、生成研究报告
- 支持学术论文、网页、数据集等多种信息源
- Agent之间可讨论、质疑、迭代改进
- 输出结构化研究报告,附引用来源
- 单次研究耗时 < 10分钟
架构设计:
用户输入研究主题
↓
规划Agent (分解研究问题, 制定计划)
↓
┌───────┼───────┬──────────┐
搜索Agent 阅读Agent 分析Agent 写作Agent
(多源搜索) (文献精读) (数据分析) (报告撰写)
↕ ↕ ↕ ↕
└─────────┴────────┴──────────┘
共享研究笔记 (Blackboard)
↓
审核Agent (质量检查/事实核查)
↓
最终研究报告 + 参考文献
核心模块:
class ResearchAssistant:
    """Multi-agent research pipeline: plan, search, read, analyse, write, review."""

    def __init__(self):
        self.agents = {
            "planner": PlannerAgent(),
            "searcher": SearchAgent(),
            "reader": ReaderAgent(),
            "analyst": AnalystAgent(),
            "writer": WriterAgent(),
            "reviewer": ReviewerAgent(),
        }
        self.blackboard = ResearchBlackboard()

    async def research(self, topic: str, depth: str = "standard",
                       max_parallel_reads: int = 8):
        """Run one research job on *topic* and return a ResearchReport.

        Args:
            topic: the research theme entered by the user.
            depth: planning depth, forwarded to the planner and template lookup.
            max_parallel_reads: cap on concurrent ``reader.read`` calls.
                Must be >= 1.

        Changes vs. the sketch:
        * Sources are read concurrently (bounded) instead of one at a time.
          Reading 5 sources x N questions sequentially dominated the
          10-minute budget.
        * The reader is queried with the sub-question that produced each hit.
          The original read ``source.question``, an attribute search hits are
          not shown to have.
        * ``len(self.blackboard)`` raised TypeError because the blackboard
          defines no ``__len__``. The source count now uses
          ``blackboard.notes``.
        """
        # 1. Plan: decompose the topic into sub-questions
        plan = await self.agents["planner"].create_plan(
            topic=topic,
            depth=depth,
            template=self._get_template(depth)
        )
        self.blackboard.set_plan(plan)
        questions = list(plan.questions)

        # 2. Search every sub-question concurrently
        search_results = await asyncio.gather(
            *(self.agents["searcher"].search(question) for question in questions)
        )

        # 3. Read the top-5 sources per question with bounded concurrency.
        #    gather preserves input order, so notes are added deterministically.
        reader = self.agents["reader"]
        limiter = asyncio.Semaphore(max_parallel_reads)

        async def _read(source, question):
            async with limiter:
                return await reader.read(url=source.url, query=question)

        extracted_notes = await asyncio.gather(*(
            _read(source, question)
            for question, results in zip(questions, search_results)
            for source in results[:5]
        ))
        for extracted in extracted_notes:
            self.blackboard.add_note(extracted)

        # 4. Analyse and synthesise
        analysis = await self.agents["analyst"].analyze(
            notes=self.blackboard.get_all_notes(),
            plan=plan
        )
        self.blackboard.set_analysis(analysis)

        # 5. Draft the report
        draft = await self.agents["writer"].write(
            plan=plan,
            analysis=analysis,
            notes=self.blackboard.get_all_notes()
        )

        # 6. Review/revise loop (at most 3 rounds)
        for i in range(3):
            review = await self.agents["reviewer"].review(draft)
            if review.quality_score > 0.85:
                break
            draft = await self.agents["writer"].revise(
                draft=draft,
                feedback=review.feedback
            )

        return ResearchReport(
            title=topic,
            content=draft,
            references=self.blackboard.get_references(),
            metadata={"iterations": i + 1, "sources": len(self.blackboard.notes)}
        )
class ResearchBlackboard:
"""共享研究笔记板 - 多Agent协作的核心"""
def __init__(self):
self.notes: list[ResearchNote] = []
self.plan: ResearchPlan = None
self.analysis: Analysis = None
def add_note(self, note: ResearchNote):
# 去重检查
if not self._is_duplicate(note):
self.notes.append(note)
self._update_index(note)
def get_relevant_notes(self, query: str, top_k=10) -> list:
"""语义检索相关笔记"""
return self.vector_search(query, self.notes, top_k)
def get_references(self) -> list:
return [n.citation for n in self.notes if n.citation]技术选型:
| 模块 | 推荐方案 |
|---|---|
| Agent编排 | LangGraph (DAG工作流) |
| 搜索 | Tavily / Serper / ArXiv API |
| 网页阅读 | Jina Reader / Firecrawl |
| 论文阅读 | GPT-4o + PDF解析 |
| LLM | Claude 3.5 / GPT-4o |
| 报告生成 | Markdown + 可选PDF导出 |
扩展性考虑:
- 流式进度:实时展示研究进展给用户
- 交互式研究:用户中途可调整方向/补充信息
- 领域专业化:不同领域使用不同的分析策略和知识源
- 缓存复用:相似主题复用已有研究成果
五、实战难题
难题1: RAG系统中如何解决长文档的"大海捞针"问题? ⭐⭐⭐
问题: 一篇100页的PDF中,某个关键信息只出现在一个段落中,常规分块检索很容易遗漏。
解决方案:
class HierarchicalIndexer:
    """Hierarchical index with three levels: document, section, paragraph."""

    def index_document(self, doc):
        """Index *doc* at three granularities: document summary, section summaries, chunks."""
        # Level 1: one LLM summary for the whole document
        doc_summary = self.llm.summarize(doc.full_text, max_len=500)
        self.index.upsert("doc", doc.id, doc_summary, {
            "type": "summary", "doc_id": doc.id
        })
        # Level 2: one summary per section
        for section in doc.sections:
            section_summary = self.llm.summarize(section.text, max_len=200)
            self.index.upsert("section", section.id, section_summary, {
                "type": "section", "doc_id": doc.id, "section_id": section.id
            })
        # Level 3: fine-grained semantic chunks (raw text, not summarised)
        for chunk in self.chunker.chunk(doc, strategy="semantic"):
            self.index.upsert("chunk", chunk.id, chunk.text, {
                "type": "chunk", "doc_id": doc.id,
                "section_id": chunk.section_id, "page": chunk.page
            })

    async def retrieve(self, query, top_k=10):
        """Coarse-to-fine retrieval with a global safety net.

        A strict drill-down (docs, then sections, then chunks) only finds a
        "needle" if both the document and section summaries mention it.
        Summaries are exactly what tends to drop such details. Alongside the
        filtered path we therefore run an unfiltered chunk search and fuse both
        rankings with reciprocal rank fusion. A needle ranked highly by the
        global search then survives even when the drill-down misses its
        section. Empty intermediate levels skip the filtered searches instead
        of issuing ``$in: []`` queries.
        """
        drilled = []
        # 1) Locate candidate documents via their summaries
        doc_hits = await self.index.search(query, level="doc", top_k=3)
        if doc_hits:
            # 2) Sections within those documents
            section_hits = await self.index.search(
                query, level="section",
                filter={"doc_id": {"$in": [d.id for d in doc_hits]}}, top_k=5
            )
            if section_hits:
                # 3) Paragraphs within those sections
                drilled = await self.index.search(
                    query, level="chunk",
                    filter={"section_id": {"$in": [s.id for s in section_hits]}},
                    top_k=top_k
                )
        # Global fallback over all chunks
        global_hits = await self.index.search(query, level="chunk", top_k=top_k)
        return self._rrf_merge([drilled, global_hits], top_k)

    @staticmethod
    def _rrf_merge(rankings, top_k, k=60):
        """Reciprocal-rank-fuse ranked hit lists, de-duplicating by ``hit.id``."""
        scores = {}
        first_seen = {}
        for ranking in rankings:
            for rank, hit in enumerate(ranking):
                scores[hit.id] = scores.get(hit.id, 0.0) + 1.0 / (k + rank + 1)
                first_seen.setdefault(hit.id, hit)
        ordered = sorted(scores, key=scores.__getitem__, reverse=True)
        return [first_seen[hit_id] for hit_id in ordered[:top_k]]
# 补充手段:
- 生成假设性问题(Hypothetical Questions / doc2query):为每个chunk预先生成可能的提问,检索时用"问题-问题"匹配提升召回(注意与HyDE区分:HyDE是在查询时先让LLM生成假设性答案文档,再用其Embedding检索)
- 文档结构化提取:表格、列表等结构化信息单独索引
- Parent-Child索引:检索child chunk时返回parent上下文
难题2: 如何设计一个支持10万QPS的Embedding服务? ⭐⭐⭐
问题: 大规模RAG系统需要对所有查询和文档做Embedding,吞吐量瓶颈明显。
解决方案:
class HighThroughputEmbeddingService:
    """Embedding service built for throughput: two-level cache, dynamic batching, GPU replicas."""

    def __init__(self):
        # Two-level cache: in-process LRU in front of a shared Redis cache
        self.cache = LRUCache(maxsize=1_000_000)
        self.redis = RedisCache()
        # Dynamic batching: up to 256 texts, waiting at most 50 ms to fill a batch
        self.batcher = DynamicBatcher(
            max_batch_size=256,
            max_latency_ms=50,
        )
        # Model replicas: 8 GPU copies behind Triton (TensorRT-accelerated)
        self.model_pool = ModelPool(
            model="bge-large-zh-v1.5",
            replicas=8,
            backend="triton"
        )

    async def embed(self, texts: list[str]) -> list[list[float]]:
        """Return one embedding per input text, in input order.

        Lookup order: local LRU, then Redis, then batched model inference.
        Changes vs. the sketch:
        * Cache hits are tested with ``is not None``. Truthiness is ambiguous
          for numpy vectors and treats an empty list as a miss.
        * Redis hits are promoted into the local LRU so repeats stay in-process.
        * A text repeated within one request is looked up and embedded once.
        """
        results = [None] * len(texts)
        pending = {}  # cache key -> every index in ``texts`` holding that text

        # 1. Cache lookups
        for i, text in enumerate(texts):
            key = self._hash(text)
            if key in pending:
                # Same text already missed earlier in this request.
                pending[key].append(i)
                continue
            vector = self.cache.get(key)
            if vector is None:
                vector = await self.redis.get(key)
                if vector is not None:
                    self.cache.set(key, vector)
            if vector is None:
                pending[key] = [i]
            else:
                results[i] = vector

        if not pending:
            return results

        # 2. Dynamically batched inference over the unique misses
        keys = list(pending)
        embeddings = await self.batcher.process(
            texts=[texts[pending[key][0]] for key in keys],
            inference_fn=self.model_pool.infer
        )

        # 3. Fill results and both cache levels
        for key, emb in zip(keys, embeddings):
            for idx in pending[key]:
                results[idx] = emb
            self.cache.set(key, emb)
            await self.redis.set(key, emb, ttl=3600)
        return results
class DynamicBatcher:
    """Dynamic batching: trade a bounded wait for larger inference batches.

    Items from concurrent ``process`` calls share one queue. A single
    background worker drains it into batches of at most ``max_batch_size``
    items, waiting at most ``max_latency_ms`` after a batch's first item.
    """

    def __init__(self, max_batch_size, max_latency_ms):
        self.queue = asyncio.Queue()
        self.max_batch_size = max_batch_size
        self.max_latency_ms = max_latency_ms
        self._worker = None  # background task running _batch_worker

    async def process(self, texts, inference_fn):
        """Enqueue *texts* and return their results in the same order.

        The original never started the worker, so every caller awaited its
        futures forever. The worker is now started lazily and restarted if it
        has died. NOTE: the worker stays bound to the ``inference_fn`` it was
        started with, so use one batcher per model.
        Exceptions raised by ``inference_fn`` propagate to the callers whose
        items were in the failing batch.
        """
        loop = asyncio.get_running_loop()
        if self._worker is None or self._worker.done():
            self._worker = loop.create_task(self._batch_worker(inference_fn))
        futures = []
        for text in texts:
            future = loop.create_future()
            await self.queue.put((text, future))
            futures.append(future)
        return await asyncio.gather(*futures)

    async def _batch_worker(self, inference_fn):
        """Forever: collect a batch (size- or deadline-bounded), infer, resolve futures."""
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self.queue.get()]
            # Monotonic clock: immune to wall-clock adjustments.
            deadline = loop.time() + self.max_latency_ms / 1000
            while len(batch) < self.max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self.queue.get(), timeout=remaining)
                except asyncio.TimeoutError:
                    break
                batch.append(item)

            # Skip items whose caller has gone away (cancelled futures);
            # set_result on them would raise InvalidStateError.
            batch = [(text, fut) for text, fut in batch if not fut.done()]
            if not batch:
                continue

            try:
                embeddings = await inference_fn([text for text, _ in batch])
                if len(embeddings) != len(batch):
                    raise RuntimeError(
                        f"inference_fn returned {len(embeddings)} results "
                        f"for {len(batch)} inputs"
                    )
            except Exception as exc:
                # Fail the waiting callers instead of leaving their futures
                # pending forever, and keep serving later batches.
                for _, fut in batch:
                    if not fut.done():
                        fut.set_exception(exc)
                continue

            for (_, fut), emb in zip(batch, embeddings):
                if not fut.done():
                    fut.set_result(emb)
# 关键优化点:
- 模型量化:FP16→INT8,吞吐量提升2x
- TensorRT优化:编译优化,推理速度提升3-5x
- 缓存命中率:热点query缓存命中率可达60%+
- 水平扩展:根据QPS自动扩缩GPU副本数
难题3: Agent执行过程中如何处理"幻觉"导致的错误操作? ⭐⭐⭐
问题: Agent调用工具时,LLM可能产生幻觉(如编造参数、误判结果),在自动化运维等场景可能造成严重后果。
解决方案:
class SafeAgentExecutor:
    """Execute an LLM-generated plan behind layered guards against hallucinated actions.

    For each step:
    (1) Tool parameters are validated against the tool schema. One LLM replan
        is allowed, and the replanned step is validated again.
    (2) Steps at or above ``RiskLevel.MEDIUM`` get a sandbox dry-run plus an
        LLM sanity verdict.
    (3) After execution, an LLM consistency verdict is taken and the step is
        rolled back if the verdict is negative.
    LLM verdicts must start with yes/是; anything else fails closed.
    """

    def __init__(self, llm, tools, validator, sandbox):
        self.llm = llm
        self.tools = tools
        self.validator = validator
        self.sandbox = sandbox

    @staticmethod
    def _is_affirmative(verdict) -> bool:
        """True only if the LLM verdict *starts* with "yes" (case-insensitive) or "是".

        The original tested ``"no" in verdict.lower()``. That matches "not",
        "know", "normal" and "yes, no issues", while anything else passed. The
        consistency check keyed on "不一致" silently passed every English
        answer. Parsing the leading token and failing closed on unrecognised
        answers is stricter and far less noisy.
        """
        return str(verdict).strip().lower().startswith(("yes", "是"))

    async def _validate(self, step):
        """Validate *step*'s params against its tool's JSON schema."""
        return await self.validator.validate_params(
            tool=step.tool,
            params=step.params,
            schema=self.tools.get_schema(step.tool)
        )

    async def execute(self, task: str):
        """Plan *task* and run it step by step.

        Returns:
            ``Success(results)`` with each executed step's result in order, or
            ``Error(...)`` at the first failed guard. NOTE(review): this
            replaces ``plan.results``, which nothing in the executor populated.
        """
        plan = await self.llm.plan(task)
        results = []
        for step in plan.steps:
            # Guard 1: parameter validation
            validation = await self._validate(step)
            if not validation.valid:
                # Ask the LLM for a corrected step, then re-check it. The
                # original executed the replanned step unvalidated.
                step = await self.llm.replan_step(
                    step, validation.error, max_retries=2
                )
                validation = await self._validate(step)
                if not validation.valid:
                    return Error(f"参数校验未通过: {validation.error}")

            # Guard 2: sandbox dry-run for medium/high risk
            if step.risk_level >= RiskLevel.MEDIUM:
                preview = await self.sandbox.dry_run(
                    tool=step.tool, params=step.params
                )
                sanity = await self.llm.evaluate(
                    f"操作预览:{preview},预期效果:{step.expected_result},"
                    f"是否合理?回答yes/no及原因。"
                )
                if not self._is_affirmative(sanity):
                    return Error(f"预执行检查未通过: {sanity}")

            # Guard 3: execute, then verify against the expected outcome
            result = await self.tools.execute(step.tool, step.params)
            consistency = await self.llm.evaluate(
                f"操作结果:{result},预期:{step.expected_result},"
                f"是否一致?回答yes/no及原因。"
            )
            if not self._is_affirmative(consistency):
                await self.tools.rollback(step)
                return Error(f"结果不符合预期,已回滚: {consistency}")
            results.append(result)
        return Success(results)
# 核心策略:
- Schema强制验证:工具参数必须通过JSON Schema校验
- Dry-run机制:高危操作必须先预执行
- 结果一致性检查:LLM评估执行结果是否符合预期
- 自动回滚:检测异常后自动回滚
- 操作确认:高危操作需人工二次确认
难题4: 如何解决多Agent系统中的通信死锁和资源竞争? ⭐⭐⭐
问题: 多Agent并行执行时,Agent A等待Agent B的结果,Agent B也在等待Agent A,造成死锁。
解决方案:
class AgentOrchestrator:
    """DAG-based agent orchestration: no cyclic waits, per-node and global timeouts."""

    def __init__(self):
        self.graph = DAG()
        self.resource_locks = {}  # resource lock bookkeeping
        self.timeout = 300  # global wall-clock budget (seconds) for one workflow

    async def execute(self, workflow: WorkflowSpec):
        """Run *workflow* and return ``{node_id: result_or_ErrorResult}``.

        Raises:
            CyclicDependencyError: if the dependency graph contains a cycle,
                detected statically before anything runs.
            asyncio.TimeoutError: if the whole workflow exceeds
                ``self.timeout``. The original declared this budget but never
                enforced it.
        """
        # 1. Static check for cyclic dependencies
        dag = self._build_dag(workflow)
        if not dag.is_acyclic():
            raise CyclicDependencyError(dag.find_cycle())
        # 2. Run in topological batches under the global timeout
        return await asyncio.wait_for(self._run_batches(dag), timeout=self.timeout)

    async def _run_batches(self, dag):
        """Run topologically ordered batches; nodes in one batch run concurrently."""
        results = {}
        for batch in dag.topological_sort():
            # Every dependency belongs to an earlier batch, so its result exists.
            tasks = [
                self._execute_with_timeout(
                    agent_node,
                    {dep: results[dep] for dep in agent_node.dependencies},
                )
                for agent_node in batch
            ]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            for node, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    # A failed node does not stop independent nodes...
                    results[node.id] = ErrorResult(result)
                    # ...unless it is critical: trigger the degradation path.
                    if node.critical:
                        return self._handle_critical_failure(node, result)
                else:
                    results[node.id] = result
        return results

    async def _execute_with_timeout(self, node, deps):
        """Run *node* with a per-attempt timeout and exponential backoff between attempts.

        ``node.max_retries`` is the total number of attempts, and at least one
        is always made. The original loop never ran for ``max_retries == 0``
        and silently returned ``None``. Only timeouts are retried. The final
        timeout is re-raised.
        """
        attempts = max(1, node.max_retries)
        for attempt in range(attempts):
            try:
                return await asyncio.wait_for(
                    node.agent.execute(deps),
                    timeout=node.timeout
                )
            except asyncio.TimeoutError:
                if attempt == attempts - 1:
                    raise
                await asyncio.sleep(2 ** attempt)
class ResourceManager:
    """Per-resource asyncio locks so concurrent agents do not race on shared state."""

    def __init__(self):
        # resource_id -> asyncio.Lock, created on first request
        self.locks = {}

    async def acquire(self, resource_id: str, agent_id: str,
                      timeout: int = 30):
        """Acquire the lock for *resource_id* on behalf of *agent_id*.

        Returns:
            A ``ResourceLock(resource_id, agent_id)`` handle.
        Raises:
            ResourceBusyError: if the lock is not obtained within *timeout* seconds.

        NOTE(review): nothing in this class releases the lock. Presumably
        ``ResourceLock`` does; confirm, otherwise every acquisition leaks.
        """
        lock = self.locks.get(resource_id)
        if lock is None:
            lock = self.locks[resource_id] = asyncio.Lock()
        try:
            await asyncio.wait_for(lock.acquire(), timeout=timeout)
        except asyncio.TimeoutError:
            raise ResourceBusyError(resource_id, agent_id)
        return ResourceLock(resource_id, agent_id)
# 关键设计原则:
- DAG编排:用有向无环图定义Agent依赖关系,静态检测循环
- 超时机制:每个Agent执行有明确超时,避免无限等待
- 资源锁:共享资源(如数据库、文件)需加锁
- 降级策略:非关键Agent失败时可降级执行
难题5: 如何在生产环境中监控和调试Agent的执行过程? ⭐⭐
问题: Agent行为不确定性大,生产环境出问题后难以复现和排查。
解决方案:
class AgentTracer:
    """Agent execution tracing: persist one TraceSpan per traced call and replay runs."""

    def __init__(self):
        self.trace_store = TraceStore()  # ClickHouse / ES backed
        self.metrics = PrometheusMetrics()

    def trace_agent_run(self, agent_id: str, run_id: str | None = None):
        """Decorator factory recording a TraceSpan for every call of the wrapped coroutine.

        The run id is resolved per call, in this order:
        1. an explicit ``run_id=`` keyword passed to the wrapped function (as
           in ``agent.run(..., run_id=...)``);
        2. the decoration-time *run_id*;
        3. a fresh uuid4.
        The original froze *run_id* at decoration time. Every invocation of a
        decorated function was then filed under one run, and ``debug_run``
        mixed unrelated executions.

        Persisting the span is best-effort. A failing trace store is logged
        and never replaces the agent's own return value or exception.
        """
        import logging
        import uuid

        log = logging.getLogger(__name__)

        def decorator(fn):
            @wraps(fn)
            async def wrapper(*args, **kwargs):
                current_run = kwargs.get("run_id") or run_id or str(uuid.uuid4())
                span = TraceSpan(
                    run_id=current_run,
                    agent_id=agent_id,
                    start_time=time.time()
                )
                # ``debug_run`` reads span.name and span.input; record them here.
                span.name = fn.__name__
                span.input = self._sanitize({"args": args, "kwargs": kwargs})
                try:
                    result = await fn(*args, **kwargs)
                    span.status = "success"
                    span.output = self._sanitize(result)
                    return result
                except Exception as e:
                    span.status = "error"
                    span.error = str(e)
                    raise
                finally:
                    span.end_time = time.time()
                    span.latency = span.end_time - span.start_time
                    try:
                        # Attach the LLM calls made during this run
                        span.llm_calls = self._get_llm_calls(current_run)
                        span.token_usage = self._calc_tokens(span.llm_calls)
                        await self.trace_store.save(span)
                        self.metrics.observe(agent_id, span)
                    except Exception:
                        log.exception(
                            "failed to record trace span (agent=%s, run=%s)",
                            agent_id, current_run,
                        )
            return wrapper
        return decorator

    async def debug_run(self, run_id: str):
        """Replay a stored run as an ExecutionReport, showing the last LLM call of each span."""
        trace = await self.trace_store.get(run_id)
        report = []
        for span in trace.spans:
            report.append({
                "step": span.name,
                "input": span.input,
                "llm_prompt": span.llm_calls[-1].prompt if span.llm_calls else None,
                "llm_response": span.llm_calls[-1].response if span.llm_calls else None,
                "output": span.output,
                "latency": span.latency,
                "tokens": span.token_usage,
                "status": span.status,
            })
        return ExecutionReport(run_id=run_id, steps=report)
class AgentEvaluator:
    """Offline agent evaluation over a fixed test set, keeping each run's trace."""

    async def evaluate_batch(self, test_cases: list[TestCase]):
        """Run, trace and judge every case sequentially, then wrap the results in an EvalReport.

        Expects ``self.agent``, ``self.tracer`` and ``self.judge`` to be
        attached externally; there is no ``__init__`` here.
        """
        async def score_case(case):
            # Fresh run id per case so its trace can be fetched in isolation.
            run_id = str(uuid.uuid4())
            output = await self.agent.run(case.input, run_id=run_id)
            trace = await self.tracer.debug_run(run_id)
            score = await self.judge.evaluate(
                input=case.input,
                output=output,
                expected=case.expected,
                criteria=case.criteria,
            )
            return EvalResult(case=case, output=output, score=score, trace=trace)

        results = []
        for case in test_cases:
            results.append(await score_case(case))
        return EvalReport(results=results)
# 最佳实践:
- 全链路追踪:每次Agent运行生成完整trace,记录所有LLM调用
- 结构化日志:统一日志格式,便于查询和分析
- 关键指标监控:成功率、延迟、token消耗、工具调用成功率
- 回放调试:可回放任意一次执行的完整过程
- 定期评估:用测试集定期评估Agent质量,检测回归
六、设计题通用答题框架
面试时拿到任何系统设计题,都可以套用以下框架:
1. 需求澄清 (2分钟)
- 功能需求(做什么)
- 非功能需求(性能/可用性/规模)
- 约束条件(预算/时间/技术栈)
2. 高层架构 (5分钟)
- 画出核心模块和数据流
- 确定主要技术选型
3. 核心模块详细设计 (10分钟)
- 每个模块的职责、接口、关键算法
- 代码示例
4. 扩展性讨论 (3分钟)
- 如何水平扩展
- 故障处理和降级策略
- 未来演进方向
七、Prompt 管理平台设计
Q9: 设计一个 Prompt 管理平台 ⭐⭐
需求分析:
- 支持 Prompt 的版本控制(类似 Git 的版本管理)
- A/B 测试:同时运行多个 Prompt 版本,对比效果
- 效果评估:自动计算准确率、延迟、token 消耗等指标
- 团队协作:多人协作编辑,权限管理,审核流程
- 支持 Prompt 模板化(变量替换、条件分支)
架构设计:
前端(Prompt编辑器 + 评估Dashboard)
↓
API Gateway → Prompt管理服务 → Prompt存储(版本化)
↓ ↓
A/B测试引擎 模板渲染引擎
↓ ↓
流量分配 → 多版本LLM调用 变量注入 + 条件渲染
↓
效果收集 → 评估服务 → 指标存储(ClickHouse)
↓
Dashboard展示(版本对比、趋势图、统计检验)
核心模块:
from dataclasses import dataclass, field
from datetime import datetime
import hashlib
import re
import random
@dataclass
class PromptVersion:
version_id: str
prompt_template: str
variables: list[str]
created_by: str
created_at: datetime
commit_msg: str
parent_version: str | None = None
tags: list[str] = field(default_factory=list)
@dataclass
class EvalMetric:
    """Aggregated evaluation metrics for one prompt version in an experiment."""

    version_id: str
    accuracy: float  # as reported by the metrics store
    avg_latency_ms: float
    avg_tokens: int
    sample_count: int  # number of recorded requests behind these aggregates
    p50_latency: float
    p99_latency: float
class PromptManager:
"""Prompt 版本管理核心"""
def __init__(self, storage, template_engine):
self.storage = storage
self.template_engine = template_engine
async def create_version(self, prompt_template: str,
commit_msg: str, author: str,
parent_version: str | None = None) -> PromptVersion:
"""创建新版本"""
variables = self._extract_variables(prompt_template)
version_id = hashlib.sha256(
f"{prompt_template}{datetime.now().isoformat()}".encode()
).hexdigest()[:12]
version = PromptVersion(
version_id=version_id,
prompt_template=prompt_template,
variables=variables,
created_by=author,
created_at=datetime.now(),
commit_msg=commit_msg,
parent_version=parent_version,
)
await self.storage.save(version)
return version
def _extract_variables(self, template: str) -> list[str]:
"""提取模板中的变量名"""
return list(set(re.findall(r"\{\{(\w+)\}\}", template)))
async def render(self, version_id: str, **kwargs) -> str:
"""渲染模板"""
version = await self.storage.get(version_id)
return self.template_engine.render(version.prompt_template, **kwargs)
async def diff(self, v1_id: str, v2_id: str) -> dict:
"""对比两个版本的差异"""
v1 = await self.storage.get(v1_id)
v2 = await self.storage.get(v2_id)
return {
"template_diff": self._text_diff(v1.prompt_template, v2.prompt_template),
"variables_added": set(v2.variables) - set(v1.variables),
"variables_removed": set(v1.variables) - set(v2.variables),
}
async def rollback(self, version_id: str, author: str) -> PromptVersion:
"""回滚到指定版本(创建新版本)"""
old = await self.storage.get(version_id)
return await self.create_version(
prompt_template=old.prompt_template,
commit_msg=f"Rollback to {version_id}",
author=author,
parent_version=version_id,
)
class ABTestEngine:
    """A/B testing engine: split live traffic across prompt versions and record metrics.

    Args:
        llm_client: async client whose ``generate(prompt)`` returns an object
            with ``.text`` and ``.usage.total_tokens``.
        metrics_store: async store with ``record(**fields)`` and
            ``query(exp_id, variant_id)``.
    """

    # ``{{name}}`` / ``{{ name }}`` placeholders, the syntax used by prompt templates.
    _PLACEHOLDER_PATTERN = r"\{\{\s*(\w+)\s*\}\}"

    def __init__(self, llm_client, metrics_store):
        self.llm = llm_client
        self.metrics = metrics_store
        self.experiments: dict[str, dict] = {}

    async def create_experiment(self, name: str,
                                variants: list[dict],
                                traffic_split: list[float]) -> str:
        """Register a running experiment and return its id.

        Args:
            variants: e.g. ``[{"version_id": "v1", "prompt": "..."}, ...]``.
            traffic_split: e.g. ``[0.5, 0.5]`` for 50% each, in *variants* order.

        The id is derived from *name*, so re-creating an experiment with the
        same name replaces the earlier one.
        """
        exp_id = hashlib.sha256(name.encode()).hexdigest()[:8]
        self.experiments[exp_id] = {
            "name": name,
            "variants": variants,
            "traffic_split": traffic_split,
            "status": "running",
        }
        return exp_id

    @classmethod
    def _render(cls, template: str, data: dict) -> str:
        """Fill ``{{ name }}`` placeholders from *data*; fall back to ``str.format``.

        ``str.format`` treats ``{{``/``}}`` as escaped braces. With it, the
        original sent the literal text "{name}" to the LLM and never
        substituted the value. Templates without double-brace placeholders keep
        the old ``str.format`` behaviour.

        Raises:
            KeyError: for a placeholder missing from *data*, like ``format``.
        """
        if re.search(cls._PLACEHOLDER_PATTERN, template) is None:
            return template.format(**data)
        return re.sub(cls._PLACEHOLDER_PATTERN,
                      lambda m: str(data[m.group(1)]), template)

    async def route_request(self, exp_id: str, input_data: dict) -> dict:
        """Send one request through a traffic-weighted variant and record its metrics."""
        import time

        exp = self.experiments[exp_id]
        variant = self._select_variant(exp["traffic_split"], exp["variants"])
        # Variants are documented as {"version_id", "prompt"}. The original
        # only read "rendered_prompt", which raised KeyError for that
        # documented shape, so both keys are accepted.
        template = (variant["rendered_prompt"] if "rendered_prompt" in variant
                    else variant["prompt"])
        prompt = self._render(template, input_data)
        # Monotonic clock for latency (wall-clock time can jump).
        start = time.perf_counter()
        response = await self.llm.generate(prompt)
        latency = (time.perf_counter() - start) * 1000
        await self.metrics.record(
            exp_id=exp_id,
            variant_id=variant["version_id"],
            latency_ms=latency,
            tokens=response.usage.total_tokens,
            input=input_data,
            output=response.text,
        )
        return {"variant": variant["version_id"], "response": response.text}

    def _select_variant(self, splits: list[float], variants: list[dict]) -> dict:
        """Pick a variant with probability proportional to its split.

        Any shortfall from splits that sum to less than 1 goes to the last
        variant.
        """
        r = random.random()
        cumulative = 0.0
        for split, variant in zip(splits, variants):
            cumulative += split
            if r <= cumulative:
                return variant
        return variants[-1]

    async def analyze(self, exp_id: str) -> dict:
        """Return ``{version_id: EvalMetric}`` aggregated from the metrics store."""
        exp = self.experiments[exp_id]
        results = {}
        for variant in exp["variants"]:
            vid = variant["version_id"]
            metrics = await self.metrics.query(exp_id, vid)
            results[vid] = EvalMetric(
                version_id=vid,
                accuracy=metrics.get("accuracy", 0),
                avg_latency_ms=metrics.get("avg_latency", 0),
                avg_tokens=metrics.get("avg_tokens", 0),
                sample_count=metrics.get("count", 0),
                p50_latency=metrics.get("p50", 0),
                p99_latency=metrics.get("p99", 0),
            )
        return results
# 技术选型:
| 模块 | 推荐方案 | 备选 |
|---|---|---|
| Prompt 存储 | PostgreSQL + 版本化 | Git-based (DVC) |
| 模板引擎 | Jinja2 / 自研 | Mustache |
| 指标存储 | ClickHouse / Prometheus | InfluxDB |
| 前端 | React + Monaco Editor | - |
| A/B 测试统计 | scipy.stats + 贝叶斯检验 | - |
扩展性考虑:
- Prompt 市场:团队间共享优质 Prompt 模板
- 自动优化:基于评估结果自动调优 Prompt(DSPy 风格)
- 安全审计:Prompt 注入检测、敏感词过滤
- 多环境管理:dev/staging/prod 环境独立管理
八、多模态 RAG 系统设计
Q10: 设计一个多模态 RAG 系统 ⭐⭐⭐
需求分析:
- 同时处理图片、表格、文本三种模态
- 跨模态对齐:文字查图片、图片查文字
- 多模态 LLM 生成:结合图片和文本上下文生成回答
- 支持 PDF/Word 等复杂文档的解析
架构设计:
文档上传 → 多模态解析层
├→ 文本提取 → 文本 Embedding → 向量库
├→ 图片提取 → 视觉 Embedding (CLIP) → 向量库
└→ 表格提取 → 表格 Embedding → 向量库
↓
用户查询 → 多模态检索层
├→ 文本查询 → 文本 Embedding → 向量检索
└→ 图片查询 → 视觉 Embedding → 跨模态检索
↓
结果融合 + 重排
↓
多模态 LLM 生成(GPT-4o / Qwen-VL)
↓
回答 + 引用(含图片引用)
核心模块:
import numpy as np
from dataclasses import dataclass
from enum import Enum
class Modality(Enum):
    """Modality of an indexed chunk; the value is the string stored alongside it."""

    TEXT = "text"    # plain text passages
    IMAGE = "image"  # pictures/figures
    TABLE = "table"  # tabular content
@dataclass
class MultimodalChunk:
    """One indexable unit of a parsed document (text, image or table)."""

    chunk_id: str
    modality: Modality
    content: str  # text content, or the textual description of an image
    image_data: bytes | None = None  # raw image bytes (image chunks)
    embedding: np.ndarray | None = None  # set when the chunk is indexed
    # Accepts None (the old default) but is normalized in __post_init__.
    metadata: dict | None = None

    def __post_init__(self):
        # The field was annotated ``dict`` yet defaulted to None, so stores got
        # metadata=None. Give every chunk its own dict instead.
        if self.metadata is None:
            self.metadata = {}
class MultimodalEmbedder:
    """Cross-modal embedding facade over one text model and one vision model.

    Text and tables are encoded with ``text_model.encode``; images with
    ``vision_model.encode_image``.
    """

    def __init__(self, text_model, vision_model):
        self.text_model = text_model      # e.g. BGE-large
        self.vision_model = vision_model  # e.g. CLIP

    def embed_text(self, text: str) -> np.ndarray:
        """Encode a text passage (or a query) with the text model."""
        encoder = self.text_model
        return encoder.encode(text)

    def embed_image(self, image: bytes) -> np.ndarray:
        """Encode raw image bytes with the vision model."""
        encoder = self.vision_model
        return encoder.encode_image(image)

    def embed_table(self, table_markdown: str) -> np.ndarray:
        """Encode a table's textual (markdown) form with the text model."""
        encoder = self.text_model
        return encoder.encode(table_markdown)
class MultimodalRAGSystem:
"""多模态 RAG 系统"""
def __init__(self, embedder, vector_store, doc_parser, llm):
self.embedder = embedder
self.vector_store = vector_store
self.doc_parser = doc_parser
self.llm = llm
async def index_document(self, doc_path: str):
"""索引文档:解析多模态内容并存入向量库"""
parsed = self.doc_parser.parse(doc_path)
for item in parsed.chunks:
if item.modality == Modality.TEXT:
emb = self.embedder.embed_text(item.content)
elif item.modality == Modality.IMAGE:
# 同时存储视觉 Embedding 和文本描述 Embedding
img_emb = self.embedder.embed_image(item.image_data)
desc_emb = self.embedder.embed_text(item.content)
emb = np.concatenate([img_emb, desc_emb])
elif item.modality == Modality.TABLE:
emb = self.embedder.embed_table(item.content)
item.embedding = emb
await self.vector_store.upsert(
id=item.chunk_id,
vector=emb,
content=item.content,
modality=item.modality.value,
image_data=item.image_data,
metadata=item.metadata,
)
async def query(self, question: str, query_image: bytes | None = None,
top_k: int = 5) -> dict:
"""多模态检索 + 生成"""
# 1. 多路召回
text_emb = self.embedder.embed_text(question)
text_results = await self.vector_store.search(text_emb, top_k=top_k)
results = list(text_results)
if query_image:
img_emb = self.embedder.embed_image(query_image)
img_results = await self.vector_store.search(img_emb, top_k=top_k)
results = self._merge_results(text_results, img_results)
# 2. 构建多模态上下文
context_parts = []
images = []
for r in results[:top_k]:
if r["modality"] == "text":
context_parts.append(f"[文本] {r['content']}")
elif r["modality"] == "table":
context_parts.append(f"[表格]\n{r['content']}")
elif r["modality"] == "image":
context_parts.append(f"[图片] {r['content']}")
images.append(r["image_data"])
context = "\n\n".join(context_parts)
# 3. 多模态 LLM 生成
answer = await self.llm.generate(
prompt=f"基于以下参考资料回答问题:\n{context}\n\n问题:{question}",
images=images,
)
return {"answer": answer, "sources": results[:top_k]}
def _merge_results(self, text_results, img_results) -> list:
"""融合多路检索结果(RRF)"""
scores = {}
for rank, r in enumerate(text_results):
scores[r["id"]] = scores.get(r["id"], 0) + 1.0 / (60 + rank)
for rank, r in enumerate(img_results):
scores[r["id"]] = scores.get(r["id"], 0) + 1.0 / (60 + rank)
all_results = {r["id"]: r for r in text_results + img_results}
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
return [all_results[rid] for rid in sorted_ids]技术选型:
| 模块 | 推荐方案 | 备选 |
|---|---|---|
| 文本 Embedding | BGE-large-zh-v1.5 | M3E, E5 |
| 视觉 Embedding | CLIP / SigLIP | E5-V |
| 文档解析 | LlamaParse / Marker | Unstructured |
| 多模态 LLM | GPT-4o / Qwen2.5-VL | Claude 3.5 |
| 向量数据库 | Milvus (多向量字段) | Qdrant, Weaviate |
扩展性考虑:
- 异步预处理:文档上传后异步解析和索引
- 图片去重:感知哈希去重,减少存储
- 表格结构化:提取为结构化数据,支持 SQL 查询
- 视频/音频支持:ASR 转文字 + 关键帧提取
九、LLM 网关设计
Q11: 设计一个 LLM 网关/API Gateway ⭐⭐
需求分析:
- 统一入口:代理多个 LLM 供应商(OpenAI/Anthropic/国产模型)
- 负载均衡:多实例间请求分发
- 限流与鉴权:按用户/API Key 限流,OAuth2 鉴权
- 模型路由:根据请求类型自动选择最优模型
- 成本控制:token 计费、预算告警、用量统计
架构设计:
客户端请求
↓
LLM Gateway
├→ 鉴权模块(API Key / OAuth2 验证)
├→ 限流模块(令牌桶 / 滑动窗口)
├→ 路由模块(模型选择策略)
│ ├→ 规则路由(按 task_type)
│ ├→ 质量路由(按 benchmark 排名)
│ └→ 成本路由(按预算优先)
├→ 负载均衡(Round-Robin / 加权 / 最小连接)
├→ 成本追踪(token 计数 + 计费)
└→ 响应缓存(语义缓存)
↓
上游 LLM 服务(OpenAI / Anthropic / 自部署)
核心模块:
import time
import hashlib
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class ProviderConfig:
    """Static configuration of one upstream LLM provider."""

    name: str
    base_url: str
    api_key: str
    models: list[str]  # model names this provider can serve
    weight: int = 1  # relative share for the "weighted" routing strategy
    cost_per_1k_tokens: float = 0.0  # price per 1k tokens (prompt + completion)
    max_rpm: int = 1000  # per-provider requests/minute limit (not enforced in the code shown)
class RateLimiter:
    """In-memory sliding-window rate limiter keyed by an arbitrary string.

    State lives in this process only; several gateway replicas need a shared
    store (e.g. Redis) instead.
    """

    def __init__(self):
        from collections import deque

        # key -> monotonic timestamps of admitted requests, oldest on the left.
        # NOTE: idle keys are only pruned when accessed again.
        self.windows: dict[str, deque] = defaultdict(deque)

    def allow(self, key: str, max_requests: int, window_seconds: int = 60) -> bool:
        """Admit and record a request for ``key`` if the window has room.

        Returns True when fewer than ``max_requests`` requests were admitted in
        the last ``window_seconds`` seconds (the request is then recorded),
        otherwise False.
        """
        # Monotonic clock: immune to wall-clock jumps (NTP, manual changes).
        now = time.monotonic()
        cutoff = now - window_seconds
        window = self.windows[key]
        # Timestamps are appended in order, so expired ones sit at the left end;
        # popping them is O(expired) instead of rebuilding the whole list.
        while window and window[0] <= cutoff:
            window.popleft()
        if len(window) >= max_requests:
            return False
        window.append(now)
        return True
class ModelRouter:
    """Choose an upstream provider for a requested model name."""

    def __init__(self, providers: list[ProviderConfig]):
        self.providers = providers
        # model name -> providers serving it, in registration order.
        self._provider_map: dict[str, list[ProviderConfig]] = defaultdict(list)
        for p in providers:
            for m in p.models:
                self._provider_map[m].append(p)
        # model name -> index of the provider to hand out next (round-robin).
        self._rr_next: dict[str, int] = defaultdict(int)

    def route(self, model: str, strategy: str = "round_robin") -> ProviderConfig:
        """Return a provider that serves ``model``.

        strategy: "round_robin" (rotate per model), "lowest_cost" (smallest
            cost_per_1k_tokens), "weighted" (random, proportional to weight);
            any other value falls back to the first registered provider.

        Raises:
            ValueError: if no provider serves ``model``.
        """
        candidates = self._provider_map.get(model, [])
        if not candidates:
            raise ValueError(f"No provider for model: {model}")
        if strategy == "round_robin":
            return self._round_robin(model, candidates)
        elif strategy == "lowest_cost":
            return min(candidates, key=lambda p: p.cost_per_1k_tokens)
        elif strategy == "weighted":
            return self._weighted_select(candidates)
        return candidates[0]

    def _round_robin(self, model: str, candidates: list) -> ProviderConfig:
        """Hand out ``candidates`` in turn, keeping one cursor per model.

        Replaces ``hash(time.time()) % n``, which returned the same provider for
        every call sharing one clock reading and was not an even rotation.
        """
        idx = self._rr_next[model] % len(candidates)
        self._rr_next[model] = idx + 1
        return candidates[idx]

    def _weighted_select(self, candidates: list) -> ProviderConfig:
        """Pick a provider at random with probability proportional to ``weight``."""
        import random

        total = sum(p.weight for p in candidates)
        r = random.uniform(0, total)
        cum = 0
        for p in candidates:
            cum += p.weight
            if r <= cum:
                return p
        # Float rounding can leave r marginally above the final cumulative sum.
        return candidates[-1]
class CostTracker:
    """Per-user budget checks and usage/cost accounting via ``budget_store``."""

    def __init__(self, budget_store):
        self.budget_store = budget_store

    async def check_budget(self, user_id: str) -> bool:
        """Return True while the user's recorded usage is still below budget."""
        limit = await self.budget_store.get_budget(user_id)
        spent = await self.budget_store.get_usage(user_id)
        return spent < limit

    async def record_usage(self, user_id: str, model: str,
                           prompt_tokens: int, completion_tokens: int,
                           provider: ProviderConfig):
        """Charge one call to ``user_id`` at the provider's per-1k-token price."""
        total_tokens = prompt_tokens + completion_tokens
        cost = total_tokens / 1000 * provider.cost_per_1k_tokens
        details = {
            "model": model,
            "provider": provider.name,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
        }
        await self.budget_store.add_usage(user_id, cost, details)
class LLMGateway:
"""LLM 网关主类"""
def __init__(self, router, rate_limiter, cost_tracker, cache, auth):
self.router = router
self.rate_limiter = rate_limiter
self.cost_tracker = cost_tracker
self.cache = cache
self.auth = auth
async def handle_request(self, request: dict) -> dict:
# 1. 鉴权
user = await self.auth.verify(request["api_key"])
if not user:
return {"error": "Unauthorized"}
# 2. 限流
if not self.rate_limiter.allow(user.id, max_requests=user.rpm_limit):
return {"error": "Rate limit exceeded"}
# 3. 预算检查
if not await self.cost_tracker.check_budget(user.id):
return {"error": "Budget exceeded"}
# 4. 语义缓存
cache_key = self._semantic_cache_key(request)
if cached := await self.cache.get(cache_key):
return cached
# 5. 路由到上游
provider = self.router.route(request["model"])
response = await self._call_upstream(provider, request)
# 6. 记录成本
await self.cost_tracker.record_usage(
user.id, request["model"],
response["usage"]["prompt_tokens"],
response["usage"]["completion_tokens"],
provider,
)
# 7. 缓存响应
await self.cache.set(cache_key, response, ttl=300)
return response
def _semantic_cache_key(self, request: dict) -> str:
content = f"{request['model']}:{request['messages']}"
return hashlib.sha256(content.encode()).hexdigest()技术选型:
| 模块 | 推荐方案 | 备选 |
|---|---|---|
| API 框架 | FastAPI / Kong | Traefik |
| 限流 | Redis + 滑动窗口 | 令牌桶 |
| 鉴权 | JWT + OAuth2 | API Key |
| 缓存 | Redis (语义缓存) | Memcached |
| 监控 | Prometheus + Grafana | Datadog |
| 成本存储 | PostgreSQL | ClickHouse |
扩展性考虑:
- 语义缓存:Embedding 相似度匹配,避免重复调用
- 熔断降级:上游不可用时自动切换备用供应商
- 请求重试:指数退避 + 自动重试
- 审计日志:全量请求/响应记录,支持回溯
十、自动化测试平台设计
Q12: 设计一个 LLM 应用自动化测试平台 ⭐⭐⭐
需求分析:
- Prompt 回归测试:Prompt 修改后自动检测效果是否退化
- 模型版本对比:对比不同模型版本的输出质量
- 自动化评估:LLM-as-Judge + 人工标注混合评估
- CI/CD 集成:PR 合并前自动运行测试,阻断质量下降
架构设计:
CI/CD 触发(PR / 定时 / 手动)
↓
测试调度器 → 测试用例管理
↓
执行引擎(并发运行测试用例)
├→ Prompt 回归测试
├→ 模型对比测试
└→ 集成测试(端到端)
↓
评估引擎
├→ LLM-as-Judge(GPT-4o 评分)
├→ 规则评估(正则/关键词/格式)
└→ 人工标注队列
↓
报告生成 → CI 状态更新(Pass/Fail)
↓
Dashboard(趋势、对比、告警)
核心模块:
import asyncio
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class TestStatus(Enum):
    """Verdict of a single test case."""

    PASS = "pass"    # score reached the case threshold
    FAIL = "fail"    # score below the case threshold
    ERROR = "error"  # intended for cases that could not be run or graded
@dataclass
class TestCase:
id: str
name: str
input: str
expected_output: str | None = None
criteria: str | None = None # 自然语言评估标准
tags: list[str] = None
threshold: float = 0.8 # 通过阈值
@dataclass
class TestResult:
    """Outcome of running and grading one test case."""

    test_id: str  # id of the TestCase this result belongs to
    status: TestStatus
    score: float  # judge score
    actual_output: str  # what the application under test returned
    latency_ms: float
    tokens_used: int
    judge_reason: str = ""  # judge's explanation, empty if none was given
class LLMEvaluator:
"""LLM-as-Judge 评估器"""
def __init__(self, judge_llm):
self.judge = judge_llm
async def evaluate(self, input_text: str, output: str,
expected: str | None = None,
criteria: str | None = None) -> tuple[float, str]:
"""评估输出质量,返回 (score, reason)"""
prompt = f"""请评估以下 AI 回答的质量(0-1分)。
用户输入:{input_text}
AI 回答:{output}
"""
if expected:
prompt += f"期望回答:{expected}\n"
if criteria:
prompt += f"评估标准:{criteria}\n"
prompt += """
请返回 JSON 格式:
{"score": 0.0-1.0, "reason": "评估理由"}
"""
resp = await self.judge.generate(prompt)
# 解析 JSON 响应
import json
result = json.loads(resp)
return result["score"], result["reason"]
class TestRunner:
"""测试执行引擎"""
def __init__(self, app_under_test, evaluator, max_concurrency: int = 10):
self.app = app_under_test
self.evaluator = evaluator
self.semaphore = asyncio.Semaphore(max_concurrency)
async def run_single(self, case: TestCase) -> TestResult:
async with self.semaphore:
# 1. 执行被测应用
start = datetime.now()
output = await self.app.run(case.input)
latency = (datetime.now() - start).total_seconds() * 1000
# 2. 评估
score, reason = await self.evaluator.evaluate(
input_text=case.input,
output=output,
expected=case.expected_output,
criteria=case.criteria,
)
# 3. 判定
status = TestStatus.PASS if score >= case.threshold else TestStatus.FAIL
return TestResult(
test_id=case.id,
status=status,
score=score,
actual_output=output,
latency_ms=latency,
tokens_used=0, # 从 app 返回
judge_reason=reason,
)
async def run_suite(self, cases: list[TestCase]) -> dict:
"""运行测试套件"""
results = await asyncio.gather(*[self.run_single(c) for c in cases])
passed = sum(1 for r in results if r.status == TestStatus.PASS)
failed = sum(1 for r in results if r.status == TestStatus.FAIL)
avg_score = sum(r.score for r in results) / len(results)
return {
"total": len(results),
"passed": passed,
"failed": failed,
"pass_rate": passed / len(results),
"avg_score": avg_score,
"avg_latency_ms": sum(r.latency_ms for r in results) / len(results),
"results": results,
}
def compare_runs(self, baseline: dict, current: dict) -> dict:
"""对比两次运行结果"""
score_diff = current["avg_score"] - baseline["avg_score"]
latency_diff = current["avg_latency_ms"] - baseline["avg_latency_ms"]
return {
"score_change": score_diff,
"latency_change_ms": latency_diff,
"regression": score_diff < -0.05, # 分数下降超 5% 视为回归
"pass_rate_change": current["pass_rate"] - baseline["pass_rate"],
}技术选型:
| 模块 | 推荐方案 | 备选 |
|---|---|---|
| 测试框架 | pytest + asyncio | unittest |
| LLM-as-Judge | GPT-4o / Claude | DeepSeek |
| 测试用例管理 | PostgreSQL + YAML | - |
| CI/CD 集成 | GitHub Actions / GitLab CI | Jenkins |
| 报告 | Allure / 自研 Dashboard | - |
扩展性考虑:
- 人工标注:争议样本进入人工标注队列
- 测试用例自动生成:用 LLM 生成边界用例
- 性能基准:记录历史数据,检测性能退化
- 金丝雀测试:生产流量影子测试
十一、对话系统设计
Q13: 设计一个智能对话系统 ⭐⭐
需求分析:
- 多轮对话管理:维护对话状态和上下文
- 上下文压缩:长对话场景下压缩历史信息
- 意图识别:理解用户真实意图
- 槽位填充:提取关键参数(如时间、地点、数量)
- 支持多种对话场景(客服、预约、查询等)
架构设计:
用户消息
↓
NLU 层(意图识别 + 槽位填充)
↓
对话管理器(Dialogue State Tracking)
├→ 上下文管理(滑动窗口 / 摘要压缩)
├→ 状态追踪(当前意图 + 已填槽位)
└→ 策略选择(询问 / 执行 / 转人工)
↓
执行层(调用后端 API / 查询数据库)
↓
NLG 层(LLM 自然语言生成)
↓
回复用户
核心模块:
import json
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Slot:
    """A parameter an intent needs (e.g. time, place, quantity)."""

    name: str
    required: bool = True  # required slots are asked for before execution
    prompt: str = ""  # question to ask when this slot is missing
    validator: str = ""  # validation rule (not applied by the code shown)
@dataclass
class Intent:
    """A user intent the NLU can recognize, with the slots it requires."""

    name: str
    slots: list[Slot] = field(default_factory=list)  # fresh list per intent
    handler: str = ""  # handler function name (not dispatched in the code shown)
@dataclass
class DialogueState:
session_id: str
current_intent: str | None = None
filled_slots: dict = field(default_factory=dict)
history: list[dict] = field(default_factory=list)
context_summary: str = ""
class NLUEngine:
    """LLM-based NLU: intent classification plus slot filling."""

    def __init__(self, llm, intent_definitions: list[Intent]):
        self.llm = llm
        # intent name -> Intent definition.
        self.intents = {i.name: i for i in intent_definitions}

    async def parse(self, text: str, history: list[dict]) -> dict:
        """Classify ``text`` and extract slots; the last 6 history messages give context.

        Returns the model's JSON object with "intent" normalized to a defined
        intent name or None, and "slots" normalized to a dict, so callers never
        activate an unknown intent or update slots with a non-dict.

        Raises:
            ValueError: if the reply contains no JSON object.
        """
        intent_list = "\n".join(f"- {name}: {[s.name for s in intent.slots]}"
                                for name, intent in self.intents.items())
        prompt = f"""分析用户意图并提取关键参数。
可选意图:
{intent_list}
对话历史:
{json.dumps(history[-6:], ensure_ascii=False)}
用户输入:{text}
返回 JSON:
{{"intent": "意图名", "slots": {{"参数名": "参数值"}}, "confidence": 0.9}}
"""
        result = await self.llm.generate(prompt)
        parsed = self._load_json_object(result)
        intent = parsed.get("intent")
        if intent not in self.intents:
            intent = None
        slots = parsed.get("slots")
        if not isinstance(slots, dict):
            slots = {}
        return {**parsed, "intent": intent, "slots": slots}

    @staticmethod
    def _load_json_object(raw: str) -> dict:
        """Parse the outermost {...} span of ``raw`` (tolerates code fences/prose)."""
        start, end = raw.find("{"), raw.rfind("}")
        if start == -1 or end < start:
            raise ValueError(f"NLU reply contains no JSON object: {raw!r}")
        data = json.loads(raw[start:end + 1])
        return data if isinstance(data, dict) else {}
class DialogueManager:
"""对话管理器"""
def __init__(self, nlu, llm, state_store, max_history: int = 10):
self.nlu = nlu
self.llm = llm
self.state_store = state_store
self.max_history = max_history
async def handle_message(self, session_id: str, user_msg: str) -> str:
# 1. 加载对话状态
state = await self.state_store.get(session_id)
if not state:
state = DialogueState(session_id=session_id)
# 2. NLU 解析
parsed = await self.nlu.parse(user_msg, state.history)
# 3. 更新状态
if parsed.get("intent"):
state.current_intent = parsed["intent"]
if parsed.get("slots"):
state.filled_slots.update(parsed["slots"])
# 4. 检查是否需要追问(槽位未填满)
intent_def = self.nlu.intents.get(state.current_intent)
if intent_def:
missing = self._find_missing_slots(intent_def, state.filled_slots)
if missing:
response = missing[0].prompt # 询问缺失槽位
state.history.append({"role": "user", "content": user_msg})
state.history.append({"role": "assistant", "content": response})
await self.state_store.save(session_id, state)
return response
# 5. 所有槽位已填充,执行任务
response = await self._execute_intent(state)
# 6. 更新历史(带压缩)
state.history.append({"role": "user", "content": user_msg})
state.history.append({"role": "assistant", "content": response})
if len(state.history) > self.max_history * 2:
await self._compress_history(state)
await self.state_store.save(session_id, state)
return response
def _find_missing_slots(self, intent: Intent, filled: dict) -> list[Slot]:
return [s for s in intent.slots if s.required and s.name not in filled]
async def _execute_intent(self, state: DialogueState) -> str:
"""执行意图处理"""
prompt = f"""根据用户意图和参数生成回复。
意图:{state.current_intent}
参数:{json.dumps(state.filled_slots, ensure_ascii=False)}
上下文摘要:{state.context_summary}
请生成自然、友好的回复。"""
return await self.llm.generate(prompt)
async def _compress_history(self, state: DialogueState):
"""上下文压缩:将旧历史生成摘要"""
old_history = state.history[:-self.max_history]
recent = state.history[-self.max_history:]
summary_prompt = f"请用 3 句话概括以下对话的要点:\n{json.dumps(old_history, ensure_ascii=False)}"
state.context_summary = await self.llm.generate(summary_prompt)
state.history = recent技术选型:
| 模块 | 推荐方案 | 备选 |
|---|---|---|
| NLU | LLM (GPT-4o) | Rasa NLU |
| 对话管理 | 自研 + LangGraph | Rasa Core |
| 状态存储 | Redis | PostgreSQL |
| NLG | LLM 生成 | 模板 + LLM 混合 |
扩展性考虑:
- 多语言:检测语言后路由到对应 LLM
- 情感分析:检测用户情绪,调整回复风格
- 人机协作:复杂问题无缝转接人工
- 多模态:支持语音输入(ASR)和图片理解
十二、知识图谱增强 RAG 系统设计
Q14: 设计一个知识图谱增强的 RAG 系统 ⭐⭐⭐
需求分析:
- 自动从文档中构建知识图谱(实体、关系抽取)
- 图查询 + 向量检索融合,提供多跳推理能力
- 推理链展示:展示从问题到答案的推理路径
- 支持增量更新和图谱质量校验
架构设计:
文档 → 知识图谱构建层
├→ 实体抽取(NER)
├→ 关系抽取(RE)
└→ 实体消歧 & 融合
↓
知识图谱(Neo4j) + 向量库(Milvus)
↓
用户查询 → 混合检索层
├→ 向量检索(语义相似文档)
├→ 图查询(实体关系遍历)
└→ 子图检索(相关子图提取)
↓
融合排序 + 推理链构建
↓
LLM 生成(带推理链的回答)
核心模块:
from dataclasses import dataclass
@dataclass
class Entity:
id: str
name: str
entity_type: str
properties: dict
embedding: list[float] | None = None
@dataclass
class Relation:
    """A directed, typed knowledge-graph edge between two entity ids."""

    source_id: str  # Entity.id at the tail of the edge
    target_id: str  # Entity.id at the head of the edge
    relation_type: str
    properties: dict
class KnowledgeGraphBuilder:
    """Build a knowledge graph from documents via LLM entity/relation extraction."""

    # Maximum characters sent to the LLM per extraction call.
    CHUNK_CHARS = 2000

    def __init__(self, llm, graph_store, ner_model):
        self.llm = llm
        self.graph = graph_store
        self.ner = ner_model  # NOTE(review): stored but not used by the code shown

    async def build_from_document(self, doc_text: str):
        """Extract entities and relations from the whole document and upsert them.

        The text is processed in CHUNK_CHARS-sized windows; previously only the
        first 2000 characters were ever read. Relations that span two windows
        are not captured.
        """
        entities: list[Entity] = []
        relations: list[Relation] = []
        for start in range(0, len(doc_text), self.CHUNK_CHARS):
            chunk = doc_text[start:start + self.CHUNK_CHARS]
            # 1. Entity extraction; 2. relation extraction (per window).
            chunk_entities = await self._extract_entities(chunk)
            relations.extend(await self._extract_relations(chunk, chunk_entities))
            entities.extend(chunk_entities)
        # 3. Disambiguation.
        entities = await self._disambiguate(entities)
        # Drop edges whose endpoints were not extracted as entities so the graph
        # never gets relations pointing at missing nodes.
        known_ids = {e.id for e in entities}
        relations = [r for r in relations
                     if r.source_id in known_ids and r.target_id in known_ids]
        # 4. Write to the graph store.
        for e in entities:
            await self.graph.upsert_entity(e)
        for r in relations:
            await self.graph.upsert_relation(r)

    async def _extract_entities(self, text: str) -> list[Entity]:
        """Ask the LLM for entities in ``text``; malformed items are skipped."""
        prompt = f"""从以下文本中抽取实体,返回 JSON 列表。
每个实体包含:name(实体名)、type(类型:人名/地名/组织/概念/技术/产品)。
文本:{text[:self.CHUNK_CHARS]}
返回:[{{"name": "...", "type": "..."}}]"""
        raw = self._parse_json_list(await self.llm.generate(prompt))
        return [Entity(
            id=self._make_id(e["name"]),
            name=e["name"],
            entity_type=e.get("type", ""),
            properties={},
        ) for e in raw if isinstance(e, dict) and e.get("name")]

    async def _extract_relations(self, text: str, entities: list[Entity]) -> list[Relation]:
        """Ask the LLM for relations among ``entities``; malformed items are skipped."""
        entity_names = [e.name for e in entities]
        prompt = f"""基于以下文本和实体列表,抽取实体间的关系。
实体:{entity_names}
文本:{text[:self.CHUNK_CHARS]}
返回 JSON 列表:
[{{"source": "实体A", "target": "实体B", "relation": "关系类型"}}]"""
        raw = self._parse_json_list(await self.llm.generate(prompt))
        return [Relation(
            source_id=self._make_id(r["source"]),
            target_id=self._make_id(r["target"]),
            relation_type=r["relation"],
            properties={},
        ) for r in raw
            if isinstance(r, dict) and {"source", "target", "relation"} <= r.keys()]

    @staticmethod
    def _parse_json_list(raw: str) -> list:
        """Parse the outermost [...] span of an LLM reply (tolerates fences/prose).

        Raises:
            ValueError: if the reply contains no JSON array.
        """
        import json

        start, end = raw.find("["), raw.rfind("]")
        if start == -1 or end < start:
            raise ValueError(f"LLM reply contains no JSON list: {raw!r}")
        data = json.loads(raw[start:end + 1])
        return data if isinstance(data, list) else []

    def _make_id(self, name: str) -> str:
        """Derive a stable 10-hex-char node id from the entity name."""
        import hashlib

        return hashlib.md5(name.encode()).hexdigest()[:10]

    async def _disambiguate(self, entities: list[Entity]) -> list[Entity]:
        """Simple disambiguation: keep the first entity for each name."""
        seen = {}
        for e in entities:
            if e.name in seen:
                continue
            seen[e.name] = e
        return list(seen.values())
class KGRetriever:
    """Knowledge-graph-augmented retrieval: vector search plus graph neighborhoods."""

    def __init__(self, graph_store, vector_store, embedder):
        self.graph = graph_store
        self.vector = vector_store
        self.embedder = embedder

    async def retrieve(self, query: str, top_k: int = 5) -> dict:
        """Return vector passages, graph node lines and reasoning paths.

        Keys: "vector_context" (passage texts), "graph_context" (one
        "name: description" line per distinct node name) and "reasoning_paths"
        (one path per query entity). A missing or None description renders as
        an empty string instead of raising TypeError.
        """
        # 1. Vector retrieval.
        query_emb = self.embedder.encode(query)
        vec_results = await self.vector.search(query_emb, top_k=top_k)
        # 2. Graph retrieval: expand each query entity up to 2 hops.
        entities = await self._extract_query_entities(query)
        graph_nodes = []
        seen_names = set()
        reasoning_paths = []
        for entity_name in entities:
            subgraph = await self.graph.get_neighbors(entity_name, max_hops=2)
            for node in subgraph["nodes"]:
                # Overlapping neighborhoods return the same node repeatedly.
                if node["name"] not in seen_names:
                    seen_names.add(node["name"])
                    graph_nodes.append(node)
            reasoning_paths.append(subgraph["path"])
        # 3. Fuse.
        return {
            "vector_context": [r["text"] for r in vec_results],
            "graph_context": [f"{n['name']}: {n.get('description') or ''}"
                              for n in graph_nodes],
            "reasoning_paths": reasoning_paths,
        }

    async def _extract_query_entities(self, query: str) -> list[str]:
        """Return candidate entity names for ``query``.

        Placeholder: uses the first 10 characters of the stripped query; a real
        implementation would use an NER model. A blank query yields no entities.
        """
        head = query.strip()[:10]
        return [head] if head else []
class KGRagSystem:
"""知识图谱增强 RAG 系统"""
def __init__(self, retriever, llm):
self.retriever = retriever
self.llm = llm
async def query(self, question: str) -> dict:
# 1. 混合检索
context = await self.retriever.retrieve(question)
# 2. 构建带推理链的 Prompt
reasoning = ""
if context["reasoning_paths"]:
reasoning = "推理路径:\n"
for path in context["reasoning_paths"]:
reasoning += " → ".join(path) + "\n"
prompt = f"""基于以下信息回答问题,并展示推理过程。
文档信息:
{chr(10).join(context['vector_context'])}
知识图谱信息:
{chr(10).join(context['graph_context'])}
{reasoning}
问题:{question}
请给出带推理链的回答。"""
answer = await self.llm.generate(prompt)
return {"answer": answer, "context": context}技术选型:
| 模块 | 推荐方案 | 备选 |
|---|---|---|
| 图数据库 | Neo4j / NebulaGraph | ArangoDB |
| NER/RE | LLM (GPT-4o) | spaCy, UIE |
| 向量库 | Milvus | Qdrant |
| Embedding | BGE-large-zh | E5 |
| LLM | GPT-4o / Qwen-Max | DeepSeek-V3 |
扩展性考虑:
- 图谱质量校验:自动检测孤立节点、矛盾关系
- 增量更新:文档变更时增量更新图谱
- 社区检测:识别图谱中的知识聚类
- 图谱可视化:前端展示知识图谱和推理路径
十三、AI 代码审查系统设计
Q15: 设计一个 AI 代码审查系统 ⭐⭐
需求分析:
- 代码分析:理解代码逻辑,发现潜在 Bug
- 安全检测:SQL 注入、XSS、敏感信息泄露等
- 风格检查:命名规范、代码复杂度、最佳实践
- 自动建议:生成修复建议和代码改进方案
- 集成 GitHub/GitLab PR 流程
架构设计:
PR Webhook 触发
↓
代码审查服务
├→ 代码预处理(Diff 解析、AST 分析)
├→ 规则引擎(静态分析 + 安全扫描)
│ ├→ Semgrep / Bandit(安全规则)
│ ├→ Ruff / Pylint(风格规则)
│ └→ 自定义规则
├→ LLM 分析层
│ ├→ 代码理解(上下文收集)
│ ├→ Bug 检测(逻辑错误)
│ └→ 改进建议生成
└→ 结果聚合
↓
PR Comment(行级评论 + 总结)
核心模块:
import json
from dataclasses import dataclass
@dataclass
class CodeIssue:
    """A single review finding attached to one line of one file."""

    file_path: str
    line_number: int
    severity: str  # "error" / "warning" / "info"
    category: str  # "bug" / "security" / "style" / "performance"
    message: str
    suggestion: str = ""  # proposed fix; may be empty
    rule_id: str = ""  # e.g. "SEC001", or "LLM" for model findings
class DiffParser:
    """Parse unified (git) diff text into per-file line changes."""

    def parse(self, diff_text: str) -> list[dict]:
        """Parse ``diff_text`` and return one dict per changed file.

        Each dict has "path" (new-side path with any "b/" prefix removed),
        "changes" ({"type": "add", "line": new_line_no, "content": ...} or
        {"type": "delete", "content": ...}) and, once a hunk was seen,
        "current_line". Deleted files (``+++ /dev/null``) produce no entry.

        Hunk bodies are consumed using the line counts from each ``@@`` header,
        so an added line whose text starts with "++" is not taken for a file
        header, and "\\ No newline at end of file" markers do not shift line
        numbers. Previously, deletions in a removed file were attributed to the
        preceding file.
        """
        import re

        hunk_header = re.compile(r"^@@ -\d+(?:,(\d+))? \+(\d+)(?:,(\d+))? @@")
        files: list[dict] = []
        current: dict | None = None
        old_left = new_left = 0  # lines still expected in the current hunk
        for line in diff_text.split("\n"):
            if old_left > 0 or new_left > 0:
                tag = line[:1]
                if tag == "+":
                    new_left -= 1
                    if current is not None:
                        current["changes"].append({
                            "type": "add",
                            "line": current["current_line"],
                            "content": line[1:],
                        })
                        current["current_line"] += 1
                elif tag == "-":
                    old_left -= 1
                    if current is not None:
                        current["changes"].append({
                            "type": "delete",
                            "content": line[1:],
                        })
                elif tag == "\\":
                    pass  # "\ No newline at end of file" is not a content line
                else:
                    # Context line: " "-prefixed, or blank if a tool stripped it.
                    old_left -= 1
                    new_left -= 1
                    if current is not None:
                        current["current_line"] += 1
                continue
            if line.startswith("+++ "):
                # Drop an optional "\t<timestamp>" suffix (non-git diffs).
                path = line[4:].split("\t", 1)[0]
                if path == "/dev/null":
                    current = None  # file deleted: nothing exists on the new side
                else:
                    if path.startswith("b/"):
                        path = path[2:]
                    current = {"path": path, "changes": []}
                    files.append(current)
            elif line.startswith("@@"):
                match = hunk_header.match(line)
                if match is None:
                    continue
                # An omitted count means exactly one line.
                old_left = int(match.group(1)) if match.group(1) is not None else 1
                new_left = int(match.group(3)) if match.group(3) is not None else 1
                if current is not None:
                    current["current_line"] = int(match.group(2))
        return files
class RuleEngine:
    """Line-based static rules: security and style heuristics over source text.

    Each rule takes (path, content) and returns CodeIssue objects whose
    ``line_number`` is the 1-based line within ``content``.
    """

    def __init__(self, max_indent: int = 20):
        # Leading-whitespace width (tabs expanded to 4 columns) above which a
        # line is reported as too deeply nested; 20 was the old hard-coded value.
        self.max_indent = max_indent
        self.rules = [
            self._check_sql_injection,
            self._check_hardcoded_secrets,
            self._check_eval_usage,
            self._check_complexity,
        ]

    def analyze(self, file_path: str, content: str) -> list[CodeIssue]:
        """Run every rule over ``content`` and concatenate the findings."""
        issues = []
        for rule in self.rules:
            issues.extend(rule(file_path, content))
        return issues

    def _check_sql_injection(self, path: str, content: str) -> list[CodeIssue]:
        """SEC001: SQL built by string interpolation.

        Flags a line containing a whole-word SELECT/INSERT/UPDATE/DELETE that
        also interpolates: an f-string in either quote style, ``.format(``,
        ``"..." % x`` or ``"..." + x``. The old check only saw double-quoted
        f-strings with SELECT/INSERT, and its substring test also fired on
        words such as "selected".
        """
        import re

        sql_keyword = re.compile(r"\b(?:SELECT|INSERT|UPDATE|DELETE)\b", re.IGNORECASE)
        interpolation = re.compile(
            r"""(?<!\w)(?:[rR][fF]|[fF][rR]?)["']"""  # f"..." / rf'...'
            r"""|\.format\("""                          # "...".format(x)
            r"""|["']\s*[%+]\s*[\w(]"""                 # "..." % x / "..." + x
        )
        issues = []
        for i, line in enumerate(content.split("\n"), 1):
            if sql_keyword.search(line) and interpolation.search(line):
                issues.append(CodeIssue(
                    file_path=path, line_number=i,
                    severity="error", category="security",
                    message="可能存在 SQL 注入风险:使用字符串插值(f-string/format/%/+)拼接 SQL",
                    suggestion="使用参数化查询代替字符串拼接",
                    rule_id="SEC001",
                ))
        return issues

    def _check_hardcoded_secrets(self, path: str, content: str) -> list[CodeIssue]:
        """SEC002: hard-coded credentials; at most one finding per line."""
        import re

        # Compiled once per call instead of being re-parsed for every line.
        patterns = [
            (re.compile(r'(?:password|secret|api_key)\s*=\s*["\'][^"\']{8,}', re.IGNORECASE), "硬编码密钥"),
            (re.compile(r'sk-[a-zA-Z0-9]{20,}', re.IGNORECASE), "OpenAI API Key"),
        ]
        issues = []
        for i, line in enumerate(content.split("\n"), 1):
            for pattern, desc in patterns:
                if pattern.search(line):
                    issues.append(CodeIssue(
                        file_path=path, line_number=i,
                        severity="error", category="security",
                        message=f"检测到硬编码敏感信息:{desc}",
                        suggestion="使用环境变量或密钥管理服务",
                        rule_id="SEC002",
                    ))
                    break  # same rule_id; a second hit on this line adds nothing
        return issues

    def _check_eval_usage(self, path: str, content: str) -> list[CodeIssue]:
        """SEC003: calls to the builtin ``eval``.

        Attribute calls such as ``model.eval()`` (PyTorch) or ``df.eval()`` and
        ``ast.literal_eval()`` are not flagged; the old substring test reported
        all of them. Lines mentioning "safe" remain exempt.
        """
        import re

        builtin_eval = re.compile(r"(?<![\w.])eval\s*\(")
        issues = []
        for i, line in enumerate(content.split("\n"), 1):
            if builtin_eval.search(line) and "safe" not in line.lower():
                issues.append(CodeIssue(
                    file_path=path, line_number=i,
                    severity="warning", category="security",
                    message="使用 eval() 可能存在代码注入风险",
                    suggestion="使用 ast.literal_eval() 或其他安全替代方案",
                    rule_id="SEC003",
                ))
        return issues

    def _check_complexity(self, path: str, content: str) -> list[CodeIssue]:
        """STYLE001: lines indented deeper than ``max_indent`` columns."""
        issues = []
        for i, line in enumerate(content.split("\n"), 1):
            if not line.strip():
                continue  # whitespace-only lines say nothing about nesting
            expanded = line.expandtabs(4)
            indent = len(expanded) - len(expanded.lstrip())
            if indent > self.max_indent:
                issues.append(CodeIssue(
                    file_path=path, line_number=i,
                    severity="info", category="style",
                    message="嵌套层级过深,建议重构",
                    suggestion="考虑使用 early return 或提取子函数",
                    rule_id="STYLE001",
                ))
        return issues
class LLMCodeAnalyzer:
    """Ask an LLM to review one file's changes and return CodeIssue findings."""

    _SEVERITIES = ("error", "warning", "info")

    def __init__(self, llm):
        self.llm = llm

    async def analyze(self, file_path: str, diff_content: str,
                      full_content: str) -> list[CodeIssue]:
        """Review ``diff_content`` of ``file_path`` with ``full_content`` as context.

        The prompt used to announce both sections without interpolating them,
        so the model never saw any code. The full file is now sent with line
        numbers so the "line" values returned by the model are meaningful.
        Malformed findings are skipped; unknown severities become "info".

        Raises:
            ValueError: if the reply contains no JSON list.
        """
        numbered = "\n".join(
            f"{n:>5} | {text}"
            for n, text in enumerate((full_content or "").split("\n"), 1)
        )
        prompt = f"""你是一位资深代码审查专家。请审查以下代码变更。
文件:{file_path}
变更内容:
{diff_content}
完整文件上下文:
{numbered}
请从以下角度审查:
1. 潜在 Bug 和逻辑错误
2. 性能问题
3. 代码可读性和维护性
返回 JSON 列表:
[{{"line": 行号, "severity": "error/warning/info", "category": "类别",
"message": "问题描述", "suggestion": "修复建议"}}]
"""
        result = await self.llm.generate(prompt)
        issues = []
        for item in self._parse_issue_list(result):
            if not isinstance(item, dict) or "message" not in item:
                continue
            try:
                line_no = int(item.get("line", 0))
            except (TypeError, ValueError):
                continue
            severity = item.get("severity", "info")
            if severity not in self._SEVERITIES:
                severity = "info"
            issues.append(CodeIssue(
                file_path=file_path,
                line_number=line_no,
                severity=severity,
                category=item.get("category", ""),
                message=item["message"],
                suggestion=item.get("suggestion", ""),
                rule_id="LLM",
            ))
        return issues

    @staticmethod
    def _parse_issue_list(raw: str) -> list:
        """Parse the outermost [...] span of the reply (tolerates fences/prose)."""
        start, end = raw.find("["), raw.rfind("]")
        if start == -1 or end < start:
            raise ValueError(f"LLM reply contains no JSON list: {raw!r}")
        data = json.loads(raw[start:end + 1])
        return data if isinstance(data, list) else []
class CodeReviewService:
"""代码审查主服务"""
def __init__(self, rule_engine, llm_analyzer, diff_parser, pr_client):
self.rule_engine = rule_engine
self.llm_analyzer = llm_analyzer
self.diff_parser = diff_parser
self.pr_client = pr_client
async def review_pr(self, repo: str, pr_number: int):
"""审查 PR"""
# 1. 获取 PR diff
diff = await self.pr_client.get_diff(repo, pr_number)
files = self.diff_parser.parse(diff)
all_issues = []
for f in files:
path = f["path"]
content = "\n".join(c["content"] for c in f["changes"])
# 2. 规则引擎检查
rule_issues = self.rule_engine.analyze(path, content)
all_issues.extend(rule_issues)
# 3. LLM 深度分析
full_content = await self.pr_client.get_file_content(repo, path)
llm_issues = await self.llm_analyzer.analyze(path, content, full_content)
all_issues.extend(llm_issues)
# 4. 去重和排序
all_issues = self._deduplicate(all_issues)
all_issues.sort(key=lambda x: {"error": 0, "warning": 1, "info": 2}[x.severity])
# 5. 发布评论
await self._post_comments(repo, pr_number, all_issues)
return all_issues
def _deduplicate(self, issues: list[CodeIssue]) -> list[CodeIssue]:
seen = set()
unique = []
for issue in issues:
key = (issue.file_path, issue.line_number, issue.rule_id)
if key not in seen:
seen.add(key)
unique.append(issue)
return unique
async def _post_comments(self, repo: str, pr_number: int,
issues: list[CodeIssue]):
"""发布 PR 行级评论"""
summary = f"## 🤖 AI 代码审查报告\n\n"
summary += f"发现 **{len(issues)}** 个问题:\n"
summary += f"- 🔴 错误: {sum(1 for i in issues if i.severity == 'error')}\n"
summary += f"- 🟡 警告: {sum(1 for i in issues if i.severity == 'warning')}\n"
summary += f"- 🔵 建议: {sum(1 for i in issues if i.severity == 'info')}\n"
await self.pr_client.post_comment(repo, pr_number, summary)
for issue in issues:
await self.pr_client.post_review_comment(
repo, pr_number, issue.file_path, issue.line_number,
f"**[{issue.severity.upper()}]** {issue.message}\n\n"
f"💡 {issue.suggestion}"
)技术选型:
| 模块 | 推荐方案 | 备选 |
|---|---|---|
| 静态分析 | Semgrep + Bandit | SonarQube |
| 风格检查 | Ruff | Pylint, Flake8 |
| LLM | GPT-4o / Claude 3.5 | DeepSeek-Coder |
| 代码索引 | Tree-sitter | AST |
| PR 集成 | GitHub API / GitLab API | - |
扩展性考虑:
- 自定义规则:团队可定义自己的审查规则
- 学习模式:从人工 Review 中学习团队偏好
- 增量审查:只审查变更文件,优化性能
- 多语言支持:通过 Tree-sitter 支持多种编程语言