Skip to content

21. 系统设计题

面向大模型应用工程师/Agent开发工程师的高频系统设计面试题 每道题包含:需求分析、架构设计、核心模块、技术选型、扩展性考虑


一、RAG 系统设计题

Q1: 设计一个企业知识库问答系统 ⭐⭐

需求分析:

  • 支持文档上传(PDF/Word/Markdown/HTML),增量更新
  • 多轮对话,带上下文理解
  • 准确率 > 90%,延迟 < 3s
  • 支持权限隔离(不同部门看到不同文档)
  • 日活 1000+ 用户

架构设计:

用户请求 → API Gateway → Query理解模块 → 检索模块 → 重排模块 → LLM生成 → 答案+引用
                                ↓                ↓           ↓
                          意图识别/改写     向量DB+ES混合检索  Cross-Encoder

                                          文档处理流水线
                                    解析→分块→Embedding→入库

核心模块与代码示例:

python
# 1. 文档处理流水线
class DocumentPipeline:
    def __init__(self):
        self.parsers = {
            '.pdf': PDFParser(),
            '.docx': DocxParser(),
            '.md': MarkdownParser(),
        }
        self.chunker = SemanticChunker(
            chunk_size=512,
            overlap=64,
            separators=["\n\n", "\n", "。", ";"]
        )
        self.embedder = EmbeddingModel("bge-large-zh-v1.5")

    async def process(self, file_path: str, metadata: dict):
        # 解析
        ext = os.path.splitext(file_path)[1]
        doc = self.parsers[ext].parse(file_path)
        # 分块
        chunks = self.chunker.chunk(doc)
        # Embedding并入库
        for chunk in chunks:
            embedding = self.embedder.encode(chunk.text)
            await self.vector_db.upsert(
                vector=embedding,
                text=chunk.text,
                metadata={**metadata, "chunk_id": chunk.id,
                          "page": chunk.page}
            )

# 2. 混合检索模块
class HybridRetriever:
    def __init__(self, vector_db, es_client, reranker):
        self.vector_db = vector_db
        self.es = es_client
        self.reranker = reranker

    async def retrieve(self, query: str, user_dept: str, top_k=10):
        filter_ = {"dept": {"$in": [user_dept, "all"]}}
        # 向量检索
        vec_results = await self.vector_db.search(
            vector=self.embedder.encode(query),
            filter=filter_, top_k=20
        )
        # 关键词检索
        es_results = await self.es.search(
            index="knowledge", query={"match": {"text": query}},
            filter={"term": {"dept": user_dept}}, size=20
        )
        # RRF融合
        merged = reciprocal_rank_fusion(vec_results, es_results)
        # 重排
        reranked = self.reranker.rerank(
            query=query,
            documents=[r.text for r in merged[:30]]
        )
        return reranked[:top_k]

技术选型:

模块推荐方案备选
向量数据库Milvus / QdrantWeaviate, Pinecone
全文检索ElasticsearchOpenSearch
EmbeddingBGE-large-zhM3E, text-embedding-3-large
重排模型bge-reranker-v2-m3Cohere Rerank
LLMGPT-4o / Qwen-MaxDeepSeek-V3, Claude
文档解析UnstructuredLlamaParse

扩展性考虑:

  • 增量索引:文档变更触发CDC,异步更新向量库
  • 缓存层:Redis缓存高频query的检索结果
  • 评估闭环:用户反馈→标注→微调Embedding/重排模型
  • 多语言:检测语言后路由到对应Embedding模型

Q2: 设计一个多模态RAG系统 ⭐⭐⭐

需求分析:

  • 同时处理文本、图片、表格、图表
  • 用户可以上传图片提问(如"这张图里的数据说明什么")
  • 需要理解文档中的图表与文字的关联关系
  • 支持跨模态检索(文字查图片,图片查文字)

架构设计:

输入(文本/图片/文档)

多模态解析层:文档解析→文本块 / 图片提取 / 表格提取 / OCR

多模态Embedding层:文本Embedding + 视觉Embedding (CLIP/SigLIP)

统一向量空间存储 (Milvus, 多向量字段)

检索层:跨模态检索 + 多路召回融合

多模态LLM生成 (GPT-4o/Qwen-VL) ← 上下文拼接(文本+图片base64)

回答 + 来源引用(含图片引用)

核心模块:

python
class MultiModalRAG:
    def __init__(self):
        self.text_embedder = SentenceTransformer("bge-large-zh-v1.5")
        self.visual_embedder = SentenceTransformer("clip-vit-large-patch14")
        self.vision_llm = VisionLLM("gpt-4o")
        self.doc_parser = MultiModalParser()  # 基于pdfplumber+PIL

    async def index_document(self, doc_path: str):
        parsed = self.doc_parser.parse(doc_path)
        for item in parsed:
            if item.type == "text":
                emb = self.text_embedder.encode(item.content)
                await self.vdb.upsert(
                    vector=emb, modality="text",
                    content=item.content, metadata=item.metadata
                )
            elif item.type == "image":
                img_emb = self.visual_embedder.encode(item.image)
                # 同时存储图片描述(由VLM生成)
                caption = await self.vision_llm.describe(item.image)
                txt_emb = self.text_embedder.encode(caption)
                await self.vdb.upsert(
                    vector=img_emb, secondary_vector=txt_emb,
                    modality="image", image_path=item.path,
                    caption=caption, metadata=item.metadata
                )
            elif item.type == "table":
                table_text = item.to_markdown()
                emb = self.text_embedder.encode(table_text)
                await self.vdb.upsert(
                    vector=emb, modality="table",
                    content=table_text, metadata=item.metadata
                )

    async def query(self, question: str, image=None):
        if image:
            # 图片查询:用视觉Embedding检索相似图片+相关文本
            q_vec = self.visual_embedder.encode(image)
            results = await self.vdb.search(q_vec, top_k=5)
            txt_vec = self.text_embedder.encode(question)
            txt_results = await self.vdb.search(txt_vec, top_k=5)
            results = self._merge(results, txt_results)
        else:
            q_vec = self.text_embedder.encode(question)
            results = await self.vdb.search(q_vec, top_k=10)

        # 构建多模态prompt
        context = self._build_context(results)  # 文本+图片base64
        answer = await self.vision_llm.generate(
            prompt=f"基于以下参考资料回答:\n{context}\n\n问题:{question}"
        )
        return answer, results

技术选型:

模块推荐方案
多模态EmbeddingCLIP / SigLIP / E5-V
文档解析LlamaParse / Unstructured / Marker
表格提取pdfplumber + Camelot
图片理解GPT-4o / Qwen2.5-VL
向量存储Milvus (支持多向量字段)

扩展性考虑:

  • 图片去重与相似度聚类,减少冗余存储
  • 表格结构化存储,支持SQL查询
  • 异步预处理:文档上传后异步解析、索引
  • 支持视频/音频:ASR转文字 + 关键帧提取

二、Agent 系统设计题

Q3: 设计一个自动化运维Agent ⭐⭐⭐

需求分析:

  • 根据告警自动诊断根因并执行修复操作
  • 支持自然语言指令(如"检查prod集群的CPU使用率")
  • 操作前需人工确认(高危操作)
  • 操作日志可审计,支持回滚
  • 安全性:最小权限原则,沙箱执行

架构设计:

告警/用户指令

意图解析 → 任务规划 (ReAct/Plan-and-Execute)

工具调用层:K8s API / Shell / 监控查询 / 日志查询 / 数据库
    ↓              ↓
安全沙箱     审批网关(高危操作需人工确认)
    ↓              ↓
执行引擎 ←←←←←←←←←┘

结果评估 → 是否需要后续操作 → 循环/结束

操作日志 + 通知

核心模块:

python
class OpsAgent:
    def __init__(self, llm, tools, safety_checker, approval_gateway):
        self.llm = llm
        self.tools = tools
        self.safety = safety_checker
        self.approval = approval_gateway
        self.memory = ConversationBufferMemory()

    TOOLS = [
        {"name": "kubectl", "desc": "执行K8s命令", "risk": "high"},
        {"name": "query_metrics", "desc": "查询Prometheus指标", "risk": "low"},
        {"name": "query_logs", "desc": "查询日志(ES/Loki)", "risk": "low"},
        {"name": "run_sql", "desc": "执行SQL查询", "risk": "medium"},
        {"name": "execute_shell", "desc": "执行Shell命令", "risk": "high"},
        {"name": "restart_service", "desc": "重启服务", "risk": "high"},
    ]

    async def handle(self, user_input: str):
        # 1. 意图解析与任务规划
        plan = await self.llm.plan(
            system_prompt=self._build_system_prompt(),
            user_input=user_input,
            tools=self.TOOLS,
            history=self.memory.get()
        )

        # 2. 逐步执行
        for step in plan.steps:
            # 安全检查
            if not self.safety.check(step):
                return f"安全检查未通过: {step.risk_reason}"

            # 高危操作需审批
            if step.risk_level == "high":
                approved = await self.approval.request(
                    action=step.description,
                    user=user_input,
                    dry_run=step.preview()
                )
                if not approved:
                    return "操作未获批准"

            # 在沙箱中执行
            result = await self.tools.execute(
                tool=step.tool,
                params=step.params,
                sandbox=True,
                timeout=60
            )
            self.memory.add(step, result)

            # 评估是否需要继续
            if result.status == "error":
                # 尝试自动修复或上报
                recovery = await self.llm.recover(step, result)
                if recovery:
                    continue
                return self._escalate(step, result)

        return self._summarize(plan, self.memory.get())

    def _build_system_prompt(self):
        return """你是一个资深运维工程师Agent。
        规则:
        1. 先诊断,后操作
        2. 高危操作必须先dry-run预览
        3. 每步操作必须有明确目的和预期结果
        4. 出错时尝试回滚
        5. 记录完整操作日志"""

技术选型:

模块推荐方案
Agent框架LangGraph / AutoGen
LLMGPT-4o / Claude 3.5 (强推理)
沙箱Docker容器 / nsjail
审批飞书/钉钉审批流 + 超时自动拒绝
监控Prometheus + Grafana
日志审计ELK / Loki

扩展性考虑:

  • Runbook知识库:历史故障修复方案作为few-shot
  • 灰度执行:先在staging验证,再prod执行
  • 自学习:成功案例自动沉淀为标准操作流程
  • 多集群支持:统一抽象不同K8s集群/云厂商API

Q4: 设计一个代码助手Agent ⭐⭐

需求分析:

  • IDE内嵌(VS Code插件),实时代码补全和对话
  • 支持多文件上下文理解(整个项目级别)
  • 代码生成、解释、重构、Bug修复、单测生成
  • 延迟要求:补全 < 500ms,对话 < 3s
  • 日活 10000+ 开发者

架构设计:

VS Code插件
    ├→ 补全请求 → 本地缓存/小模型快速补全 → 返回
    └→ 对话请求 → 上下文收集 → Agent后端

                    代码索引(AST+Embedding)

                    检索相关代码片段

                    LLM推理(GPT-4o/Claude)

                    代码后处理(格式化/lint)

核心模块:

python
class CodeAssistantAgent:
    def __init__(self):
        self.indexer = CodeIndexer()       # 代码索引
        self.retriever = CodeRetriever()   # 代码检索
        self.llm = LLMRouter()            # 路由到不同模型
        self.context_builder = ContextBuilder()

    async def complete(self, cursor_context: CursorContext):
        """代码补全 - 低延迟场景"""
        # 本地缓存命中
        cache_key = self._cache_key(cursor_context)
        if cached := await self.cache.get(cache_key):
            return cached

        # 使用小模型快速补全
        prompt = self.context_builder.build_completion_prompt(
            prefix=cursor_context.prefix,
            suffix=cursor_context.suffix,
            language=cursor_context.language
        )
        result = await self.llm.route(
            task_type="completion",  # 路由到codestral/本地小模型
            prompt=prompt, max_tokens=128
        )
        await self.cache.set(cache_key, result, ttl=300)
        return result

    async def chat(self, message: str, workspace: WorkspaceContext):
        """对话 - 质量优先场景"""
        # 1. 收集上下文
        active_file = workspace.active_file
        relevant_files = await self.retriever.find_relevant(
            query=message,
            file_tree=workspace.file_tree,
            recent_files=workspace.recent_files,
            top_k=5
        )

        # 2. 构建项目级上下文
        context = self.context_builder.build_chat_context(
            active_file=active_file,
            relevant_files=relevant_files,
            git_diff=workspace.git_diff,
            message=message
        )

        # 3. LLM生成
        response = await self.llm.route(
            task_type="chat",  # 路由到GPT-4o/Claude
            prompt=context
        )

        # 4. 后处理:提取代码块,格式化
        return self._postprocess(response, workspace)


class CodeIndexer:
    """基于AST+Embedding的代码索引"""
    def index(self, repo_path: str):
        for file in self._walk_source_files(repo_path):
            tree = ast.parse(file)  # 语言特定AST解析
            for symbol in self._extract_symbols(tree):
                # 存储函数/类的签名、docstring、body
                embedding = self.embedder.encode(
                    f"{symbol.name}\n{symbol.docstring}\n{symbol.signature}"
                )
                self.vector_db.upsert(
                    vector=embedding,
                    symbol=symbol.name,
                    file=file.path,
                    body=symbol.body,
                    type=symbol.type  # function/class/method
                )

技术选型:

模块推荐方案
补全模型Codestral / DeepSeek-Coder / StarCoder2
对话模型GPT-4o / Claude 3.5 Sonnet
代码索引Tree-sitter (AST) + Embedding
IDE集成VS Code Extension API / LSP
缓存Redis / 本地LRU

扩展性考虑:

  • 本地模型兜底:离线或API不可用时使用本地小模型
  • 增量索引:文件保存时增量更新索引
  • 个性化:根据用户编码风格微调补全
  • 隐私保护:敏感代码不发送到云端,本地模型处理

三、模型服务平台设计

Q5: 设计一个高可用模型推理服务 ⭐⭐⭐

需求分析:

  • 支持多个模型同时服务(LLM、Embedding、Reranker、VLM)
  • P99延迟 < 2s(流式首token < 500ms)
  • 可用性 99.95%,支持滚动升级
  • GPU利用率 > 70%
  • 支持自动扩缩容

架构设计:

客户端请求

API Gateway (限流/鉴权/路由)

请求调度器 (优先级队列/公平调度)

模型路由 (根据模型名/版本路由)

推理引擎集群
    ├→ vLLM (LLM, PagedAttention)
    ├→ TensorRT-LLM (高吞吐场景)
    ├→ Triton (Embedding/小模型)
    └→ ONNX Runtime (传统模型)

响应流式返回

核心模块:

python
class ModelServingPlatform:
    def __init__(self):
        self.router = ModelRouter()
        self.scheduler = RequestScheduler()
        self.health_checker = HealthChecker()
        self.metrics = MetricsCollector()

    async def serve(self, request: InferenceRequest):
        # 1. 限流检查
        if not await self.rate_limiter.allow(request.user, request.model):
            raise RateLimitError()

        # 2. 路由到可用实例
        instance = await self.router.route(
            model=request.model,
            version=request.version,
            strategy="least_connections"  # 加权轮询/最少连接/延迟优先
        )

        # 3. 调度(支持优先级和抢占)
        future = await self.scheduler.schedule(
            request=request,
            instance=instance,
            priority=request.priority,
            timeout=request.timeout
        )

        # 4. 流式返回
        async for chunk in future:
            yield chunk

        # 5. 记录指标
        self.metrics.record(
            model=request.model,
            latency=future.latency,
            tokens=future.output_tokens,
            instance=instance.id
        )


class ModelRouter:
    """智能路由,考虑负载、延迟、成本"""
    async def route(self, model: str, version: str, strategy: str):
        instances = await self.registry.get_healthy_instances(model, version)

        if strategy == "least_connections":
            return min(instances, key=lambda i: i.active_requests)
        elif strategy == "latency_based":
            return min(instances, key=lambda i: i.avg_latency)
        elif strategy == "cost_optimized":
            # 优先使用小模型/本地GPU,溢出到云端
            for inst in sorted(instances, key=lambda i: i.cost_per_token):
                if inst.available_capacity > 0:
                    return inst

        return random.choice(instances)


class AutoScaler:
    """基于队列深度和延迟的自动扩缩容"""
    async def evaluate(self, model: str):
        metrics = await self.metrics.get_recent(model, window="5m")
        instances = await self.registry.get_instances(model)

        # 扩容条件
        if (metrics.queue_depth > len(instances) * 10 or
            metrics.p99_latency > self.thresholds[model].max_latency):
            new_count = min(len(instances) * 2, self.max_instances[model])
            await self.k8s.scale(model, new_count)

        # 缩容条件
        if (metrics.avg_gpu_util < 0.3 and
            metrics.queue_depth < len(instances) * 2):
            new_count = max(len(instances) // 2, self.min_instances[model])
            await self.k8s.scale(model, new_count)

技术选型:

模块推荐方案
LLM推理引擎vLLM / TensorRT-LLM
小模型服务Triton Inference Server
API GatewayKong / APISIX
编排K8s + GPU Operator
监控Prometheus + Grafana
服务网格Istio (流量管理/熔断)

扩展性考虑:

  • 多级缓存:KV Cache复用、prompt前缀缓存(vLLM Automatic Prefix Caching)
  • 量化部署:INT4/INT8量化,同一GPU服务更多请求
  • 多区域部署:就近接入,跨区域容灾
  • 模型版本管理:A/B测试,灰度发布,快速回滚

Q6: 设计一个Prompt管理平台 ⭐⭐

需求分析:

  • 集中管理所有业务的Prompt模板
  • 支持版本管理、A/B测试、灰度发布
  • Prompt在线调试和效果评估
  • 变量模板、多语言支持
  • 权限控制和审计日志

架构设计:

Prompt管理界面 (Web UI)

Prompt CRUD API → Prompt存储 (MySQL + Git)

Prompt渲染引擎 (Jinja2变量替换)

A/B实验引擎 (分流/指标收集)

效果评估模块 (自动打标/人工评估)

SDK (Python/Java/Go) ← 业务系统集成

核心模块:

python
class PromptManager:
    def __init__(self, store, renderer, experiment):
        self.store = store       # Prompt存储
        self.renderer = renderer  # 模板渲染
        self.experiment = experiment  # A/B实验

    async def get_prompt(self, prompt_key: str, user_id: str,
                         variables: dict) -> str:
        # 1. 获取Prompt(带实验分流)
        prompt_config = await self.experiment.assign(
            prompt_key=prompt_key,
            user_id=user_id
        )

        # 2. 渲染模板
        rendered = self.renderer.render(
            template=prompt_config.template,
            variables=variables
        )
        return rendered

    async def create_version(self, prompt_key: str, template: str,
                             metadata: dict):
        """创建新版本,自动diff"""
        current = await self.store.get_latest(prompt_key)
        new_version = PromptVersion(
            key=prompt_key,
            template=template,
            version=current.version + 1,
            parent_version=current.version,
            diff=self._compute_diff(current.template, template),
            **metadata
        )
        await self.store.save(new_version)
        return new_version

    async def evaluate(self, prompt_key: str, version: int,
                       test_cases: list[EvalCase]) -> EvalReport:
        """自动评估Prompt效果"""
        results = []
        prompt = await self.store.get(prompt_key, version)

        for case in test_cases:
            rendered = self.renderer.render(prompt.template, case.inputs)
            output = await self.llm.generate(rendered)
            score = await self._auto_score(
                output=output,
                expected=case.expected,
                criteria=case.eval_criteria
            )
            results.append(EvalResult(
                input=case.inputs, output=output,
                expected=case.expected, score=score
            ))

        return EvalReport(
            prompt_key=prompt_key,
            version=version,
            avg_score=sum(r.score for r in results) / len(results),
            pass_rate=sum(1 for r in results if r.score > 0.7) / len(results),
            results=results
        )


class ABExperimentEngine:
    """Prompt A/B实验引擎"""
    async def assign(self, prompt_key: str, user_id: str) -> PromptConfig:
        experiment = await self.get_active_experiment(prompt_key)
        if not experiment:
            return await self.store.get_production(prompt_key)

        # 一致性哈希分流
        bucket = self._hash(user_id) % 100
        for variant in experiment.variants:
            if bucket < variant.traffic_percent:
                return variant.prompt_config
            bucket -= variant.traffic_percent

        return experiment.control.prompt_config

技术选型:

模块推荐方案
存储MySQL + Git版本控制
模板引擎Jinja2 / Mustache
A/B实验自研 / Statsig / GrowthBook
评估LLM-as-Judge + 人工标注
SDKPython / TypeScript
前端React + Monaco Editor

扩展性考虑:

  • Prompt推荐:基于历史效果数据推荐最优Prompt
  • 多模型适配:同一Prompt针对不同模型自动调整
  • 批量评估:CI/CD集成,Prompt变更自动回归测试
  • 成本估算:渲染后token数估算和费用预测

四、多Agent系统设计

Q7: 设计一个多Agent客服系统 ⭐⭐⭐

需求分析:

  • 意图识别后路由到专业Agent(售前/售后/技术支持/退款)
  • 复杂问题可跨Agent协作
  • 知识库实时更新,新产品上线后快速生效
  • 人机协同:超出能力时平滑转人工
  • 满意度 > 85%

架构设计:

用户消息

路由Agent (意图识别 + 情绪分析)

┌───────┼───────┬──────────┐
售前Agent 售后Agent 技术Agent  退款Agent
   ↕         ↕        ↕          ↕
   └─────────┴────────┴──────────┘
            共享上下文总线

          工具层:订单系统/知识库/Ticket系统

          人工兜底 (转接人工客服)

核心模块:

python
class CustomerServiceSystem:
    def __init__(self):
        self.router = RouterAgent()
        self.agents = {
            "presale": PresaleAgent(),
            "aftersale": AfterSaleAgent(),
            "tech_support": TechSupportAgent(),
            "refund": RefundAgent(),
        }
        self.context_bus = ContextBus()
        self.handoff = HumanHandoff()

    async def handle(self, message: Message, session: Session):
        # 1. 路由Agent分析意图
        intent = await self.router.classify(
            message=message,
            history=session.history,
            user_profile=session.user_profile
        )

        # 2. 情绪分析 - 检测是否需要转人工
        sentiment = await self.analyzer.analyze(message)
        if sentiment.anger_score > 0.8:
            return await self.handoff.transfer(session, reason="高情绪")

        # 3. 分发到专业Agent
        agent = self.agents[intent.domain]
        response = await agent.handle(
            message=message,
            context=self.context_bus.get(session.id),
            tools=self._get_tools(intent.domain)
        )

        # 4. 跨Agent协作
        if response.needs_collaboration:
            collab_agent = self.agents[response.collab_domain]
            collab_response = await collab_agent.assist(
                context=response.context,
                question=response.collab_question
            )
            response = agent.merge(response, collab_response)

        # 5. 置信度检查 - 低置信度转人工
        if response.confidence < 0.6:
            return await self.handoff.transfer(
                session,
                reason="低置信度",
                partial_answer=response.text
            )

        # 6. 更新上下文
        self.context_bus.update(session.id, message, response)
        return response


class RouterAgent:
    """路由Agent - 意图识别与分发"""
    SYSTEM_PROMPT = """你是客服路由专家。根据用户消息判断:
    1. 主要意图领域:presale/aftersale/tech_support/refund/general
    2. 紧急程度:low/medium/high
    3. 是否需要多领域协作

    返回JSON格式。"""

    async def classify(self, message, history, user_profile):
        prompt = f"""用户画像:{user_profile.summary}
最近对话:{history[-3:]}
当前消息:{message.content}"""

        result = await self.llm.generate(
            system=self.SYSTEM_PROMPT,
            prompt=prompt,
            response_format={"type": "json_object"}
        )
        return Intent.parse(result)


class PresaleAgent:
    """售前Agent - 产品咨询与推荐"""
    def __init__(self):
        self.product_kb = ProductKnowledgeBase()
        self.tools = [
            SearchProductsTool(),
            CompareProductsTool(),
            CheckStockTool(),
            GetPriceTool(),
        ]

    async def handle(self, message, context, tools):
        # 检索产品知识库
        kb_results = await self.product_kb.search(message, top_k=3)
        # 生成回答
        response = await self.llm.generate(
            system=self._build_system_prompt(kb_results),
            messages=context.messages + [message]
        )
        return response

技术选型:

模块推荐方案
Agent框架LangGraph / CrewAI
意图识别微调BERT + LLM兜底
知识库向量DB + 结构化产品库
人工转接飞书/钉钉 + 自建工单系统
对话存储Redis (会话) + MySQL (持久化)

扩展性考虑:

  • Agent热插拔:新业务Agent可动态注册
  • 对话质检:自动评估对话质量,发现问题Agent
  • 多语言:检测语言后路由到对应语言Agent
  • 数据飞轮:对话数据→标注→微调→提升准确率

Q8: 设计一个多Agent研究助手 ⭐⭐⭐

需求分析:

  • 用户输入研究主题,自动搜索、阅读、分析、生成研究报告
  • 支持学术论文、网页、数据集等多种信息源
  • Agent之间可讨论、质疑、迭代改进
  • 输出结构化研究报告,附引用来源
  • 单次研究耗时 < 10分钟

架构设计:

用户输入研究主题

规划Agent (分解研究问题, 制定计划)

┌───────┼───────┬──────────┐
搜索Agent 阅读Agent 分析Agent  写作Agent
(多源搜索) (文献精读) (数据分析) (报告撰写)
   ↕         ↕        ↕          ↕
   └─────────┴────────┴──────────┘
          共享研究笔记 (Blackboard)

审核Agent (质量检查/事实核查)

最终研究报告 + 参考文献

核心模块:

python
class ResearchAssistant:
    def __init__(self):
        self.agents = {
            "planner": PlannerAgent(),
            "searcher": SearchAgent(),
            "reader": ReaderAgent(),
            "analyst": AnalystAgent(),
            "writer": WriterAgent(),
            "reviewer": ReviewerAgent(),
        }
        self.blackboard = ResearchBlackboard()

    async def research(self, topic: str, depth: str = "standard"):
        # 1. 规划分解
        plan = await self.agents["planner"].create_plan(
            topic=topic,
            depth=depth,
            template=self._get_template(depth)
        )
        self.blackboard.set_plan(plan)

        # 2. 并行搜索
        search_tasks = []
        for question in plan.questions:
            search_tasks.append(
                self.agents["searcher"].search(question)
            )
        search_results = await asyncio.gather(*search_tasks)

        # 3. 阅读与提取
        for results in search_results:
            for source in results[:5]:  # 每个问题取top5
                extracted = await self.agents["reader"].read(
                    url=source.url,
                    query=source.question
                )
                self.blackboard.add_note(extracted)

        # 4. 分析与综合
        analysis = await self.agents["analyst"].analyze(
            notes=self.blackboard.get_all_notes(),
            plan=plan
        )
        self.blackboard.set_analysis(analysis)

        # 5. 撰写报告
        draft = await self.agents["writer"].write(
            plan=plan,
            analysis=analysis,
            notes=self.blackboard.get_all_notes()
        )

        # 6. 审核迭代 (最多3轮)
        for i in range(3):
            review = await self.agents["reviewer"].review(draft)
            if review.quality_score > 0.85:
                break
            draft = await self.agents["writer"].revise(
                draft=draft,
                feedback=review.feedback
            )

        return ResearchReport(
            title=topic,
            content=draft,
            references=self.blackboard.get_references(),
            metadata={"iterations": i + 1, "sources": len(self.blackboard)}
        )


class ResearchBlackboard:
    """共享研究笔记板 - 多Agent协作的核心"""
    def __init__(self):
        self.notes: list[ResearchNote] = []
        self.plan: ResearchPlan = None
        self.analysis: Analysis = None

    def add_note(self, note: ResearchNote):
        # 去重检查
        if not self._is_duplicate(note):
            self.notes.append(note)
            self._update_index(note)

    def get_relevant_notes(self, query: str, top_k=10) -> list:
        """语义检索相关笔记"""
        return self.vector_search(query, self.notes, top_k)

    def get_references(self) -> list:
        return [n.citation for n in self.notes if n.citation]

技术选型:

模块推荐方案
Agent编排LangGraph (DAG工作流)
搜索Tavily / Serper / ArXiv API
网页阅读Jina Reader / Firecrawl
论文阅读GPT-4o + PDF解析
LLMClaude 3.5 / GPT-4o
报告生成Markdown + 可选PDF导出

扩展性考虑:

  • 流式进度:实时展示研究进展给用户
  • 交互式研究:用户中途可调整方向/补充信息
  • 领域专业化:不同领域使用不同的分析策略和知识源
  • 缓存复用:相似主题复用已有研究成果

五、实战难题

难题1: RAG系统中如何解决长文档的"大海捞针"问题? ⭐⭐⭐

问题: 一篇100页的PDF中,某个关键信息只出现在一个段落中,常规分块检索很容易遗漏。

解决方案:

python
class HierarchicalIndexer:
    """层次化索引:文档级→章节级→段落级"""

    def index_document(self, doc):
        # 第一层:文档摘要索引
        doc_summary = self.llm.summarize(doc.full_text, max_len=500)
        self.index.upsert("doc", doc.id, doc_summary, {
            "type": "summary", "doc_id": doc.id
        })

        # 第二层:章节索引(每个章节生成摘要)
        for section in doc.sections:
            section_summary = self.llm.summarize(section.text, max_len=200)
            self.index.upsert("section", section.id, section_summary, {
                "type": "section", "doc_id": doc.id, "section_id": section.id
            })

        # 第三层:段落级细粒度索引
        for chunk in self.chunker.chunk(doc, strategy="semantic"):
            self.index.upsert("chunk", chunk.id, chunk.text, {
                "type": "chunk", "doc_id": doc.id,
                "section_id": chunk.section_id, "page": chunk.page
            })

    async def retrieve(self, query, top_k=10):
        # 三级检索:粗→细
        # 1) 先检索文档摘要,定位目标文档
        doc_hits = await self.index.search(query, level="doc", top_k=3)
        # 2) 在目标文档中检索章节
        section_hits = await self.index.search(
            query, level="section",
            filter={"doc_id": {"$in": [d.id for d in doc_hits]}}, top_k=5
        )
        # 3) 在目标章节中检索段落
        chunk_hits = await self.index.search(
            query, level="chunk",
            filter={"section_id": {"$in": [s.id for s in section_hits]}},
            top_k=top_k
        )
        return chunk_hits

补充手段:

  • 生成假设性问题(HyDE):为每个chunk预先生成可能的提问
  • 文档结构化提取:表格、列表等结构化信息单独索引
  • Parent-Child索引:检索child chunk时返回parent上下文

难题2: 如何设计一个支持10万QPS的Embedding服务? ⭐⭐⭐

问题: 大规模RAG系统需要对所有查询和文档做Embedding,吞吐量瓶颈明显。

解决方案:

python
class HighThroughputEmbeddingService:
    def __init__(self):
        # 多级缓存
        self.cache = LRUCache(maxsize=1_000_000)  # 本地缓存
        self.redis = RedisCache()  # 分布式缓存

        # 动态批处理
        self.batcher = DynamicBatcher(
            max_batch_size=256,
            max_latency_ms=50,  # 最多等50ms凑批
        )

        # 多模型副本
        self.model_pool = ModelPool(
            model="bge-large-zh-v1.5",
            replicas=8,  # 8个GPU副本
            backend="triton"  # TensorRT加速
        )

    async def embed(self, texts: list[str]) -> list[list[float]]:
        results = [None] * len(texts)
        uncached_indices = []
        uncached_texts = []

        # 1. 查缓存
        for i, text in enumerate(texts):
            cached = self.cache.get(self._hash(text))
            if cached is None:
                cached = await self.redis.get(self._hash(text))
            if cached:
                results[i] = cached
            else:
                uncached_indices.append(i)
                uncached_texts.append(text)

        if not uncached_texts:
            return results

        # 2. 动态批处理推理
        embeddings = await self.batcher.process(
            texts=uncached_texts,
            inference_fn=self.model_pool.infer
        )

        # 3. 回填缓存和结果
        for idx, emb in zip(uncached_indices, embeddings):
            results[idx] = emb
            self.cache.set(self._hash(texts[idx]), emb)
            await self.redis.set(self._hash(texts[idx]), emb, ttl=3600)

        return results


class DynamicBatcher:
    """动态批处理:在延迟和吞吐间平衡"""
    def __init__(self, max_batch_size, max_latency_ms):
        self.queue = asyncio.Queue()
        self.max_batch_size = max_batch_size
        self.max_latency_ms = max_latency_ms

    async def process(self, texts, inference_fn):
        # 将请求放入队列
        futures = []
        for text in texts:
            future = asyncio.Future()
            await self.queue.put((text, future))
            futures.append(future)

        # 后台批处理worker
        return await asyncio.gather(*futures)

    async def _batch_worker(self, inference_fn):
        while True:
            batch = []
            # 收集batch:达到max_batch_size或超时
            try:
                first = await self.queue.get()
                batch.append(first)
                deadline = time.time() + self.max_latency_ms / 1000

                while len(batch) < self.max_batch_size:
                    remaining = deadline - time.time()
                    if remaining <= 0:
                        break
                    try:
                        item = await asyncio.wait_for(
                            self.queue.get(), timeout=remaining
                        )
                        batch.append(item)
                    except asyncio.TimeoutError:
                        break
            except Exception:
                continue

            # 推理
            texts = [b[0] for b in batch]
            embeddings = await inference_fn(texts)

            # 回传结果
            for (_, future), emb in zip(batch, embeddings):
                future.set_result(emb)

关键优化点:

  • 模型量化:FP16→INT8,吞吐量提升2x
  • TensorRT优化:编译优化,推理速度提升3-5x
  • 缓存命中率:热点query缓存命中率可达60%+
  • 水平扩展:根据QPS自动扩缩GPU副本数

难题3: Agent执行过程中如何处理"幻觉"导致的错误操作? ⭐⭐⭐

问题: Agent调用工具时,LLM可能产生幻觉(如编造参数、误判结果),在自动化运维等场景可能造成严重后果。

解决方案:

python
class SafeAgentExecutor:
    """多层防护机制"""

    def __init__(self, llm, tools, validator, sandbox):
        self.llm = llm
        self.tools = tools
        self.validator = validator
        self.sandbox = sandbox

    async def execute(self, task: str):
        plan = await self.llm.plan(task)

        for step in plan.steps:
            # 防护层1: 参数验证
            validation = await self.validator.validate_params(
                tool=step.tool,
                params=step.params,
                schema=self.tools.get_schema(step.tool)
            )
            if not validation.valid:
                # 参数无效,让LLM重新生成
                step = await self.llm.replan_step(
                    step, validation.error, max_retries=2
                )

            # 防护层2: 沙箱预执行
            if step.risk_level >= RiskLevel.MEDIUM:
                preview = await self.sandbox.dry_run(
                    tool=step.tool, params=step.params
                )
                # 检查预执行结果是否合理
                sanity = await self.llm.evaluate(
                    f"操作预览:{preview},预期效果:{step.expected_result},"
                    f"是否合理?回答yes/no及原因。"
                )
                if "no" in sanity.lower():
                    return Error(f"预执行检查未通过: {sanity}")

            # 防护层3: 执行后验证
            result = await self.tools.execute(step.tool, step.params)

            # 检查结果是否与预期一致
            consistency = await self.llm.evaluate(
                f"操作结果:{result},预期:{step.expected_result},"
                f"是否一致?"
            )
            if "不一致" in consistency:
                # 回滚
                await self.tools.rollback(step)
                return Error(f"结果不符合预期,已回滚: {consistency}")

        return Success(plan.results)

核心策略:

  • Schema强制验证:工具参数必须通过JSON Schema校验
  • Dry-run机制:高危操作必须先预执行
  • 结果一致性检查:LLM评估执行结果是否符合预期
  • 自动回滚:检测异常后自动回滚
  • 操作确认:高危操作需人工二次确认

难题4: 如何解决多Agent系统中的通信死锁和资源竞争? ⭐⭐⭐

问题: 多Agent并行执行时,Agent A等待Agent B的结果,Agent B也在等待Agent A,造成死锁。

解决方案:

python
class AgentOrchestrator:
    """基于DAG的Agent编排引擎,避免循环依赖"""

    def __init__(self):
        self.graph = DAG()
        self.resource_locks = {}  # 资源锁管理
        self.timeout = 300  # 全局超时

    async def execute(self, workflow: WorkflowSpec):
        # 1. 静态检查:检测循环依赖
        dag = self._build_dag(workflow)
        if not dag.is_acyclic():
            raise CyclicDependencyError(dag.find_cycle())

        # 2. 拓扑排序执行
        execution_order = dag.topological_sort()
        results = {}

        for batch in execution_order:
            # 同一批次并行执行
            tasks = []
            for agent_node in batch:
                # 等待前置依赖完成
                deps = {dep: results[dep] for dep in agent_node.dependencies}
                tasks.append(
                    self._execute_with_timeout(agent_node, deps)
                )

            batch_results = await asyncio.gather(*tasks, return_exceptions=True)

            for node, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    # 单节点失败不影响其他独立节点
                    results[node.id] = ErrorResult(result)
                    # 触发降级策略
                    if node.critical:
                        return self._handle_critical_failure(node, result)
                else:
                    results[node.id] = result

        return results

    async def _execute_with_timeout(self, node, deps):
        """带超时和重试的执行"""
        for attempt in range(node.max_retries):
            try:
                return await asyncio.wait_for(
                    node.agent.execute(deps),
                    timeout=node.timeout
                )
            except asyncio.TimeoutError:
                if attempt == node.max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)


class ResourceManager:
    """资源锁管理,防止竞争"""

    def __init__(self):
        self.locks = {}

    async def acquire(self, resource_id: str, agent_id: str,
                      timeout: int = 30):
        lock = self.locks.setdefault(resource_id, asyncio.Lock())
        try:
            await asyncio.wait_for(lock.acquire(), timeout=timeout)
            return ResourceLock(resource_id, agent_id)
        except asyncio.TimeoutError:
            raise ResourceBusyError(resource_id, agent_id)

关键设计原则:

  • DAG编排:用有向无环图定义Agent依赖关系,静态检测循环
  • 超时机制:每个Agent执行有明确超时,避免无限等待
  • 资源锁:共享资源(如数据库、文件)需加锁
  • 降级策略:非关键Agent失败时可降级执行

难题5: 如何在生产环境中监控和调试Agent的执行过程? ⭐⭐

问题: Agent行为不确定性大,生产环境出问题后难以复现和排查。

解决方案:

python
class AgentTracer:
    """Agent执行追踪系统"""

    def __init__(self):
        self.trace_store = TraceStore()  # ClickHouse/ES
        self.metrics = PrometheusMetrics()

    def trace_agent_run(self, agent_id: str, run_id: str):
        """装饰器:追踪Agent完整执行过程"""
        def decorator(fn):
            @wraps(fn)
            async def wrapper(*args, **kwargs):
                span = TraceSpan(
                    run_id=run_id,
                    agent_id=agent_id,
                    start_time=time.time()
                )

                try:
                    result = await fn(*args, **kwargs)
                    span.status = "success"
                    span.output = self._sanitize(result)
                    return result
                except Exception as e:
                    span.status = "error"
                    span.error = str(e)
                    raise
                finally:
                    span.end_time = time.time()
                    span.latency = span.end_time - span.start_time
                    # 记录LLM调用详情
                    span.llm_calls = self._get_llm_calls(run_id)
                    span.token_usage = self._calc_tokens(span.llm_calls)

                    await self.trace_store.save(span)
                    self.metrics.observe(agent_id, span)

            return wrapper
        return decorator

    async def debug_run(self, run_id: str):
        """调试:回放Agent执行过程"""
        trace = await self.trace_store.get(run_id)

        report = []
        for span in trace.spans:
            report.append({
                "step": span.name,
                "input": span.input,
                "llm_prompt": span.llm_calls[-1].prompt if span.llm_calls else None,
                "llm_response": span.llm_calls[-1].response if span.llm_calls else None,
                "output": span.output,
                "latency": span.latency,
                "tokens": span.token_usage,
                "status": span.status,
            })

        return ExecutionReport(run_id=run_id, steps=report)


class AgentEvaluator:
    """Agent效果评估"""
    async def evaluate_batch(self, test_cases: list[TestCase]):
        results = []
        for case in test_cases:
            run_id = str(uuid.uuid4())
            output = await self.agent.run(case.input, run_id=run_id)
            trace = await self.tracer.debug_run(run_id)

            score = await self.judge.evaluate(
                input=case.input,
                output=output,
                expected=case.expected,
                criteria=case.criteria
            )
            results.append(EvalResult(
                case=case, output=output, score=score, trace=trace
            ))

        return EvalReport(results=results)

最佳实践:

  • 全链路追踪:每次Agent运行生成完整trace,记录所有LLM调用
  • 结构化日志:统一日志格式,便于查询和分析
  • 关键指标监控:成功率、延迟、token消耗、工具调用成功率
  • 回放调试:可回放任意一次执行的完整过程
  • 定期评估:用测试集定期评估Agent质量,检测回归

六、设计题通用答题框架

面试时拿到任何系统设计题,都可以套用以下框架:

1. 需求澄清 (2分钟)
   - 功能需求(做什么)
   - 非功能需求(性能/可用性/规模)
   - 约束条件(预算/时间/技术栈)

2. 高层架构 (5分钟)
   - 画出核心模块和数据流
   - 确定主要技术选型

3. 核心模块详细设计 (10分钟)
   - 每个模块的职责、接口、关键算法
   - 代码示例

4. 扩展性讨论 (3分钟)
   - 如何水平扩展
   - 故障处理和降级策略
   - 未来演进方向

七、Prompt 管理平台设计

Q9: 设计一个 Prompt 管理平台 ⭐⭐

需求分析:

  • 支持 Prompt 的版本控制(类似 Git 的版本管理)
  • A/B 测试:同时运行多个 Prompt 版本,对比效果
  • 效果评估:自动计算准确率、延迟、token 消耗等指标
  • 团队协作:多人协作编辑,权限管理,审核流程
  • 支持 Prompt 模板化(变量替换、条件分支)

架构设计:

前端(Prompt编辑器 + 评估Dashboard)

API Gateway → Prompt管理服务 → Prompt存储(版本化)
    ↓                              ↓
A/B测试引擎                    模板渲染引擎
    ↓                              ↓
流量分配 → 多版本LLM调用        变量注入 + 条件渲染

效果收集 → 评估服务 → 指标存储(ClickHouse)

Dashboard展示(版本对比、趋势图、统计检验)

核心模块:

python
from dataclasses import dataclass, field
from datetime import datetime
import hashlib
import re
import random


@dataclass
class PromptVersion:
    version_id: str
    prompt_template: str
    variables: list[str]
    created_by: str
    created_at: datetime
    commit_msg: str
    parent_version: str | None = None
    tags: list[str] = field(default_factory=list)


@dataclass
class EvalMetric:
    version_id: str
    accuracy: float
    avg_latency_ms: float
    avg_tokens: int
    sample_count: int
    p50_latency: float
    p99_latency: float


class PromptManager:
    """Prompt 版本管理核心"""

    def __init__(self, storage, template_engine):
        self.storage = storage
        self.template_engine = template_engine

    async def create_version(self, prompt_template: str,
                              commit_msg: str, author: str,
                              parent_version: str | None = None) -> PromptVersion:
        """创建新版本"""
        variables = self._extract_variables(prompt_template)
        version_id = hashlib.sha256(
            f"{prompt_template}{datetime.now().isoformat()}".encode()
        ).hexdigest()[:12]

        version = PromptVersion(
            version_id=version_id,
            prompt_template=prompt_template,
            variables=variables,
            created_by=author,
            created_at=datetime.now(),
            commit_msg=commit_msg,
            parent_version=parent_version,
        )
        await self.storage.save(version)
        return version

    def _extract_variables(self, template: str) -> list[str]:
        """提取模板中的变量名"""
        return list(set(re.findall(r"\{\{(\w+)\}\}", template)))

    async def render(self, version_id: str, **kwargs) -> str:
        """渲染模板"""
        version = await self.storage.get(version_id)
        return self.template_engine.render(version.prompt_template, **kwargs)

    async def diff(self, v1_id: str, v2_id: str) -> dict:
        """对比两个版本的差异"""
        v1 = await self.storage.get(v1_id)
        v2 = await self.storage.get(v2_id)
        return {
            "template_diff": self._text_diff(v1.prompt_template, v2.prompt_template),
            "variables_added": set(v2.variables) - set(v1.variables),
            "variables_removed": set(v1.variables) - set(v2.variables),
        }

    async def rollback(self, version_id: str, author: str) -> PromptVersion:
        """回滚到指定版本(创建新版本)"""
        old = await self.storage.get(version_id)
        return await self.create_version(
            prompt_template=old.prompt_template,
            commit_msg=f"Rollback to {version_id}",
            author=author,
            parent_version=version_id,
        )


class ABTestEngine:
    """A/B 测试引擎"""

    def __init__(self, llm_client, metrics_store):
        self.llm = llm_client
        self.metrics = metrics_store
        self.experiments: dict[str, dict] = {}

    async def create_experiment(self, name: str,
                                 variants: list[dict],
                                 traffic_split: list[float]) -> str:
        """
        创建实验
        variants: [{"version_id": "v1", "prompt": "..."}, ...]
        traffic_split: [0.5, 0.5] 表示各 50% 流量
        """
        exp_id = hashlib.sha256(name.encode()).hexdigest()[:8]
        self.experiments[exp_id] = {
            "name": name,
            "variants": variants,
            "traffic_split": traffic_split,
            "status": "running",
        }
        return exp_id

    async def route_request(self, exp_id: str, input_data: dict) -> dict:
        """路由请求到某个变体"""
        exp = self.experiments[exp_id]
        variant = self._select_variant(exp["traffic_split"], exp["variants"])

        # 渲染 Prompt 并调用 LLM
        prompt = variant["rendered_prompt"].format(**input_data)
        start = datetime.now()
        response = await self.llm.generate(prompt)
        latency = (datetime.now() - start).total_seconds() * 1000

        # 记录指标
        await self.metrics.record(
            exp_id=exp_id,
            variant_id=variant["version_id"],
            latency_ms=latency,
            tokens=response.usage.total_tokens,
            input=input_data,
            output=response.text,
        )
        return {"variant": variant["version_id"], "response": response.text}

    def _select_variant(self, splits: list[float], variants: list[dict]) -> dict:
        """按流量比例选择变体"""
        r = random.random()
        cumulative = 0.0
        for split, variant in zip(splits, variants):
            cumulative += split
            if r <= cumulative:
                return variant
        return variants[-1]

    async def analyze(self, exp_id: str) -> dict:
        """分析实验结果"""
        exp = self.experiments[exp_id]
        results = {}
        for variant in exp["variants"]:
            vid = variant["version_id"]
            metrics = await self.metrics.query(exp_id, vid)
            results[vid] = EvalMetric(
                version_id=vid,
                accuracy=metrics.get("accuracy", 0),
                avg_latency_ms=metrics.get("avg_latency", 0),
                avg_tokens=metrics.get("avg_tokens", 0),
                sample_count=metrics.get("count", 0),
                p50_latency=metrics.get("p50", 0),
                p99_latency=metrics.get("p99", 0),
            )
        return results

技术选型:

模块推荐方案备选
Prompt 存储PostgreSQL + 版本化Git-based (DVC)
模板引擎Jinja2 / 自研Mustache
指标存储ClickHouse / PrometheusInfluxDB
前端React + Monaco Editor-
A/B 测试统计scipy.stats + 贝叶斯检验-

扩展性考虑:

  • Prompt 市场:团队间共享优质 Prompt 模板
  • 自动优化:基于评估结果自动调优 Prompt(DSPy 风格)
  • 安全审计:Prompt 注入检测、敏感词过滤
  • 多环境管理:dev/staging/prod 环境独立管理

八、多模态 RAG 系统设计

Q10: 设计一个多模态 RAG 系统 ⭐⭐⭐

需求分析:

  • 同时处理图片、表格、文本三种模态
  • 跨模态对齐:文字查图片、图片查文字
  • 多模态 LLM 生成:结合图片和文本上下文生成回答
  • 支持 PDF/Word 等复杂文档的解析

架构设计:

文档上传 → 多模态解析层
    ├→ 文本提取 → 文本 Embedding → 向量库
    ├→ 图片提取 → 视觉 Embedding (CLIP) → 向量库
    └→ 表格提取 → 表格 Embedding → 向量库

用户查询 → 多模态检索层
    ├→ 文本查询 → 文本 Embedding → 向量检索
    └→ 图片查询 → 视觉 Embedding → 跨模态检索

                结果融合 + 重排

                多模态 LLM 生成(GPT-4o / Qwen-VL)

                回答 + 引用(含图片引用)

核心模块:

python
import numpy as np
from dataclasses import dataclass
from enum import Enum


class Modality(Enum):
    TEXT = "text"
    IMAGE = "image"
    TABLE = "table"


@dataclass
class MultimodalChunk:
    chunk_id: str
    modality: Modality
    content: str          # 文本内容或图片描述
    image_data: bytes | None = None  # 图片原始数据
    embedding: np.ndarray | None = None
    metadata: dict = None


class MultimodalEmbedder:
    """跨模态 Embedding"""

    def __init__(self, text_model, vision_model):
        self.text_model = text_model   # e.g., BGE-large
        self.vision_model = vision_model  # e.g., CLIP

    def embed_text(self, text: str) -> np.ndarray:
        return self.text_model.encode(text)

    def embed_image(self, image: bytes) -> np.ndarray:
        return self.vision_model.encode_image(image)

    def embed_table(self, table_markdown: str) -> np.ndarray:
        return self.text_model.encode(table_markdown)


class MultimodalRAGSystem:
    """多模态 RAG 系统"""

    def __init__(self, embedder, vector_store, doc_parser, llm):
        self.embedder = embedder
        self.vector_store = vector_store
        self.doc_parser = doc_parser
        self.llm = llm

    async def index_document(self, doc_path: str):
        """索引文档:解析多模态内容并存入向量库"""
        parsed = self.doc_parser.parse(doc_path)

        for item in parsed.chunks:
            if item.modality == Modality.TEXT:
                emb = self.embedder.embed_text(item.content)
            elif item.modality == Modality.IMAGE:
                # 同时存储视觉 Embedding 和文本描述 Embedding
                img_emb = self.embedder.embed_image(item.image_data)
                desc_emb = self.embedder.embed_text(item.content)
                emb = np.concatenate([img_emb, desc_emb])
            elif item.modality == Modality.TABLE:
                emb = self.embedder.embed_table(item.content)

            item.embedding = emb
            await self.vector_store.upsert(
                id=item.chunk_id,
                vector=emb,
                content=item.content,
                modality=item.modality.value,
                image_data=item.image_data,
                metadata=item.metadata,
            )

    async def query(self, question: str, query_image: bytes | None = None,
                    top_k: int = 5) -> dict:
        """多模态检索 + 生成"""
        # 1. 多路召回
        text_emb = self.embedder.embed_text(question)
        text_results = await self.vector_store.search(text_emb, top_k=top_k)

        results = list(text_results)
        if query_image:
            img_emb = self.embedder.embed_image(query_image)
            img_results = await self.vector_store.search(img_emb, top_k=top_k)
            results = self._merge_results(text_results, img_results)

        # 2. 构建多模态上下文
        context_parts = []
        images = []
        for r in results[:top_k]:
            if r["modality"] == "text":
                context_parts.append(f"[文本] {r['content']}")
            elif r["modality"] == "table":
                context_parts.append(f"[表格]\n{r['content']}")
            elif r["modality"] == "image":
                context_parts.append(f"[图片] {r['content']}")
                images.append(r["image_data"])

        context = "\n\n".join(context_parts)

        # 3. 多模态 LLM 生成
        answer = await self.llm.generate(
            prompt=f"基于以下参考资料回答问题:\n{context}\n\n问题:{question}",
            images=images,
        )

        return {"answer": answer, "sources": results[:top_k]}

    def _merge_results(self, text_results, img_results) -> list:
        """融合多路检索结果(RRF)"""
        scores = {}
        for rank, r in enumerate(text_results):
            scores[r["id"]] = scores.get(r["id"], 0) + 1.0 / (60 + rank)
        for rank, r in enumerate(img_results):
            scores[r["id"]] = scores.get(r["id"], 0) + 1.0 / (60 + rank)

        all_results = {r["id"]: r for r in text_results + img_results}
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        return [all_results[rid] for rid in sorted_ids]

技术选型:

模块推荐方案备选
文本 EmbeddingBGE-large-zh-v1.5M3E, E5
视觉 EmbeddingCLIP / SigLIPE5-V
文档解析LlamaParse / MarkerUnstructured
多模态 LLMGPT-4o / Qwen2.5-VLClaude 3.5
向量数据库Milvus (多向量字段)Qdrant, Weaviate

扩展性考虑:

  • 异步预处理:文档上传后异步解析和索引
  • 图片去重:感知哈希去重,减少存储
  • 表格结构化:提取为结构化数据,支持 SQL 查询
  • 视频/音频支持:ASR 转文字 + 关键帧提取

九、LLM 网关设计

Q11: 设计一个 LLM 网关/API Gateway ⭐⭐

需求分析:

  • 统一入口:代理多个 LLM 供应商(OpenAI/Anthropic/国产模型)
  • 负载均衡:多实例间请求分发
  • 限流与鉴权:按用户/API Key 限流,OAuth2 鉴权
  • 模型路由:根据请求类型自动选择最优模型
  • 成本控制:token 计费、预算告警、用量统计

架构设计:

客户端请求

LLM Gateway
    ├→ 鉴权模块(API Key / OAuth2 验证)
    ├→ 限流模块(令牌桶 / 滑动窗口)
    ├→ 路由模块(模型选择策略)
    │      ├→ 规则路由(按 task_type)
    │      ├→ 质量路由(按 benchmark 排名)
    │      └→ 成本路由(按预算优先)
    ├→ 负载均衡(Round-Robin / 加权 / 最小连接)
    ├→ 成本追踪(token 计数 + 计费)
    └→ 响应缓存(语义缓存)

上游 LLM 服务(OpenAI / Anthropic / 自部署)

核心模块:

python
import time
import hashlib
from dataclasses import dataclass
from collections import defaultdict


@dataclass
class ProviderConfig:
    name: str
    base_url: str
    api_key: str
    models: list[str]
    weight: int = 1
    cost_per_1k_tokens: float = 0.0
    max_rpm: int = 1000


class RateLimiter:
    """滑动窗口限流器"""

    def __init__(self):
        self.windows: dict[str, list[float]] = defaultdict(list)

    def allow(self, key: str, max_requests: int, window_seconds: int = 60) -> bool:
        now = time.time()
        cutoff = now - window_seconds
        # 清理过期记录
        self.windows[key] = [t for t in self.windows[key] if t > cutoff]
        if len(self.windows[key]) >= max_requests:
            return False
        self.windows[key].append(now)
        return True


class ModelRouter:
    """模型路由"""

    def __init__(self, providers: list[ProviderConfig]):
        self.providers = providers
        self._provider_map: dict[str, list[ProviderConfig]] = defaultdict(list)
        for p in providers:
            for m in p.models:
                self._provider_map[m].append(p)

    def route(self, model: str, strategy: str = "round_robin") -> ProviderConfig:
        candidates = self._provider_map.get(model, [])
        if not candidates:
            raise ValueError(f"No provider for model: {model}")

        if strategy == "round_robin":
            return self._round_robin(model, candidates)
        elif strategy == "lowest_cost":
            return min(candidates, key=lambda p: p.cost_per_1k_tokens)
        elif strategy == "weighted":
            return self._weighted_select(candidates)
        return candidates[0]

    def _round_robin(self, model: str, candidates: list) -> ProviderConfig:
        idx = hash(time.time()) % len(candidates)
        return candidates[idx]

    def _weighted_select(self, candidates: list) -> ProviderConfig:
        import random
        total = sum(p.weight for p in candidates)
        r = random.uniform(0, total)
        cum = 0
        for p in candidates:
            cum += p.weight
            if r <= cum:
                return p
        return candidates[-1]


class CostTracker:
    """成本追踪"""

    def __init__(self, budget_store):
        self.budget_store = budget_store

    async def check_budget(self, user_id: str) -> bool:
        budget = await self.budget_store.get_budget(user_id)
        usage = await self.budget_store.get_usage(user_id)
        return usage < budget

    async def record_usage(self, user_id: str, model: str,
                           prompt_tokens: int, completion_tokens: int,
                           provider: ProviderConfig):
        cost = (prompt_tokens + completion_tokens) / 1000 * provider.cost_per_1k_tokens
        await self.budget_store.add_usage(user_id, cost, {
            "model": model,
            "provider": provider.name,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
        })


class LLMGateway:
    """LLM 网关主类"""

    def __init__(self, router, rate_limiter, cost_tracker, cache, auth):
        self.router = router
        self.rate_limiter = rate_limiter
        self.cost_tracker = cost_tracker
        self.cache = cache
        self.auth = auth

    async def handle_request(self, request: dict) -> dict:
        # 1. 鉴权
        user = await self.auth.verify(request["api_key"])
        if not user:
            return {"error": "Unauthorized"}

        # 2. 限流
        if not self.rate_limiter.allow(user.id, max_requests=user.rpm_limit):
            return {"error": "Rate limit exceeded"}

        # 3. 预算检查
        if not await self.cost_tracker.check_budget(user.id):
            return {"error": "Budget exceeded"}

        # 4. 语义缓存
        cache_key = self._semantic_cache_key(request)
        if cached := await self.cache.get(cache_key):
            return cached

        # 5. 路由到上游
        provider = self.router.route(request["model"])
        response = await self._call_upstream(provider, request)

        # 6. 记录成本
        await self.cost_tracker.record_usage(
            user.id, request["model"],
            response["usage"]["prompt_tokens"],
            response["usage"]["completion_tokens"],
            provider,
        )

        # 7. 缓存响应
        await self.cache.set(cache_key, response, ttl=300)
        return response

    def _semantic_cache_key(self, request: dict) -> str:
        content = f"{request['model']}:{request['messages']}"
        return hashlib.sha256(content.encode()).hexdigest()

技术选型:

模块推荐方案备选
API 框架FastAPI / KongTraefik
限流Redis + 滑动窗口令牌桶
鉴权JWT + OAuth2API Key
缓存Redis (语义缓存)Memcached
监控Prometheus + GrafanaDatadog
成本存储PostgreSQLClickHouse

扩展性考虑:

  • 语义缓存:Embedding 相似度匹配,避免重复调用
  • 熔断降级:上游不可用时自动切换备用供应商
  • 请求重试:指数退避 + 自动重试
  • 审计日志:全量请求/响应记录,支持回溯

十、自动化测试平台设计

Q12: 设计一个 LLM 应用自动化测试平台 ⭐⭐⭐

需求分析:

  • Prompt 回归测试:Prompt 修改后自动检测效果是否退化
  • 模型版本对比:对比不同模型版本的输出质量
  • 自动化评估:LLM-as-Judge + 人工标注混合评估
  • CI/CD 集成:PR 合并前自动运行测试,阻断质量下降

架构设计:

CI/CD 触发(PR / 定时 / 手动)

测试调度器 → 测试用例管理

执行引擎(并发运行测试用例)
    ├→ Prompt 回归测试
    ├→ 模型对比测试
    └→ 集成测试(端到端)

评估引擎
    ├→ LLM-as-Judge(GPT-4o 评分)
    ├→ 规则评估(正则/关键词/格式)
    └→ 人工标注队列

报告生成 → CI 状态更新(Pass/Fail)

Dashboard(趋势、对比、告警)

核心模块:

python
import asyncio
from dataclasses import dataclass
from enum import Enum
from datetime import datetime


class TestStatus(Enum):
    PASS = "pass"
    FAIL = "fail"
    ERROR = "error"


@dataclass
class TestCase:
    id: str
    name: str
    input: str
    expected_output: str | None = None
    criteria: str | None = None       # 自然语言评估标准
    tags: list[str] = None
    threshold: float = 0.8            # 通过阈值


@dataclass
class TestResult:
    test_id: str
    status: TestStatus
    score: float
    actual_output: str
    latency_ms: float
    tokens_used: int
    judge_reason: str = ""


class LLMEvaluator:
    """LLM-as-Judge 评估器"""

    def __init__(self, judge_llm):
        self.judge = judge_llm

    async def evaluate(self, input_text: str, output: str,
                       expected: str | None = None,
                       criteria: str | None = None) -> tuple[float, str]:
        """评估输出质量,返回 (score, reason)"""
        prompt = f"""请评估以下 AI 回答的质量(0-1分)。

用户输入:{input_text}
AI 回答:{output}
"""
        if expected:
            prompt += f"期望回答:{expected}\n"
        if criteria:
            prompt += f"评估标准:{criteria}\n"

        prompt += """
请返回 JSON 格式:
{"score": 0.0-1.0, "reason": "评估理由"}
"""
        resp = await self.judge.generate(prompt)
        # 解析 JSON 响应
        import json
        result = json.loads(resp)
        return result["score"], result["reason"]


class TestRunner:
    """测试执行引擎"""

    def __init__(self, app_under_test, evaluator, max_concurrency: int = 10):
        self.app = app_under_test
        self.evaluator = evaluator
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def run_single(self, case: TestCase) -> TestResult:
        async with self.semaphore:
            # 1. 执行被测应用
            start = datetime.now()
            output = await self.app.run(case.input)
            latency = (datetime.now() - start).total_seconds() * 1000

            # 2. 评估
            score, reason = await self.evaluator.evaluate(
                input_text=case.input,
                output=output,
                expected=case.expected_output,
                criteria=case.criteria,
            )

            # 3. 判定
            status = TestStatus.PASS if score >= case.threshold else TestStatus.FAIL

            return TestResult(
                test_id=case.id,
                status=status,
                score=score,
                actual_output=output,
                latency_ms=latency,
                tokens_used=0,  # 从 app 返回
                judge_reason=reason,
            )

    async def run_suite(self, cases: list[TestCase]) -> dict:
        """运行测试套件"""
        results = await asyncio.gather(*[self.run_single(c) for c in cases])

        passed = sum(1 for r in results if r.status == TestStatus.PASS)
        failed = sum(1 for r in results if r.status == TestStatus.FAIL)
        avg_score = sum(r.score for r in results) / len(results)

        return {
            "total": len(results),
            "passed": passed,
            "failed": failed,
            "pass_rate": passed / len(results),
            "avg_score": avg_score,
            "avg_latency_ms": sum(r.latency_ms for r in results) / len(results),
            "results": results,
        }

    def compare_runs(self, baseline: dict, current: dict) -> dict:
        """对比两次运行结果"""
        score_diff = current["avg_score"] - baseline["avg_score"]
        latency_diff = current["avg_latency_ms"] - baseline["avg_latency_ms"]
        return {
            "score_change": score_diff,
            "latency_change_ms": latency_diff,
            "regression": score_diff < -0.05,  # 分数下降超 5% 视为回归
            "pass_rate_change": current["pass_rate"] - baseline["pass_rate"],
        }

技术选型:

模块推荐方案备选
测试框架pytest + asynciounittest
LLM-as-JudgeGPT-4o / ClaudeDeepSeek
测试用例管理PostgreSQL + YAML-
CI/CD 集成GitHub Actions / GitLab CIJenkins
报告Allure / 自研 Dashboard-

扩展性考虑:

  • 人工标注:争议样本进入人工标注队列
  • 测试用例自动生成:用 LLM 生成边界用例
  • 性能基准:记录历史数据,检测性能退化
  • 金丝雀测试:生产流量影子测试

十一、对话系统设计

Q13: 设计一个智能对话系统 ⭐⭐

需求分析:

  • 多轮对话管理:维护对话状态和上下文
  • 上下文压缩:长对话场景下压缩历史信息
  • 意图识别:理解用户真实意图
  • 槽位填充:提取关键参数(如时间、地点、数量)
  • 支持多种对话场景(客服、预约、查询等)

架构设计:

用户消息

NLU 层(意图识别 + 槽位填充)

对话管理器(Dialogue State Tracking)
    ├→ 上下文管理(滑动窗口 / 摘要压缩)
    ├→ 状态追踪(当前意图 + 已填槽位)
    └→ 策略选择(询问 / 执行 / 转人工)

执行层(调用后端 API / 查询数据库)

NLG 层(LLM 自然语言生成)

回复用户

核心模块:

python
import json
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Slot:
    name: str
    required: bool = True
    prompt: str = ""  # 询问该槽位的提示语
    validator: str = ""  # 验证规则


@dataclass
class Intent:
    name: str
    slots: list[Slot] = field(default_factory=list)
    handler: str = ""  # 处理函数名


@dataclass
class DialogueState:
    session_id: str
    current_intent: str | None = None
    filled_slots: dict = field(default_factory=dict)
    history: list[dict] = field(default_factory=list)
    context_summary: str = ""


class NLUEngine:
    """自然语言理解:意图识别 + 槽位填充"""

    def __init__(self, llm, intent_definitions: list[Intent]):
        self.llm = llm
        self.intents = {i.name: i for i in intent_definitions}

    async def parse(self, text: str, history: list[dict]) -> dict:
        intent_list = "\n".join(f"- {name}: {[s.name for s in intent.slots]}"
                                 for name, intent in self.intents.items())

        prompt = f"""分析用户意图并提取关键参数。

可选意图:
{intent_list}

对话历史:
{json.dumps(history[-6:], ensure_ascii=False)}

用户输入:{text}

返回 JSON:
{{"intent": "意图名", "slots": {{"参数名": "参数值"}}, "confidence": 0.9}}
"""
        result = await self.llm.generate(prompt)
        return json.loads(result)


class DialogueManager:
    """对话管理器"""

    def __init__(self, nlu, llm, state_store, max_history: int = 10):
        self.nlu = nlu
        self.llm = llm
        self.state_store = state_store
        self.max_history = max_history

    async def handle_message(self, session_id: str, user_msg: str) -> str:
        # 1. 加载对话状态
        state = await self.state_store.get(session_id)
        if not state:
            state = DialogueState(session_id=session_id)

        # 2. NLU 解析
        parsed = await self.nlu.parse(user_msg, state.history)

        # 3. 更新状态
        if parsed.get("intent"):
            state.current_intent = parsed["intent"]
        if parsed.get("slots"):
            state.filled_slots.update(parsed["slots"])

        # 4. 检查是否需要追问(槽位未填满)
        intent_def = self.nlu.intents.get(state.current_intent)
        if intent_def:
            missing = self._find_missing_slots(intent_def, state.filled_slots)
            if missing:
                response = missing[0].prompt  # 询问缺失槽位
                state.history.append({"role": "user", "content": user_msg})
                state.history.append({"role": "assistant", "content": response})
                await self.state_store.save(session_id, state)
                return response

        # 5. 所有槽位已填充,执行任务
        response = await self._execute_intent(state)

        # 6. 更新历史(带压缩)
        state.history.append({"role": "user", "content": user_msg})
        state.history.append({"role": "assistant", "content": response})
        if len(state.history) > self.max_history * 2:
            await self._compress_history(state)

        await self.state_store.save(session_id, state)
        return response

    def _find_missing_slots(self, intent: Intent, filled: dict) -> list[Slot]:
        return [s for s in intent.slots if s.required and s.name not in filled]

    async def _execute_intent(self, state: DialogueState) -> str:
        """执行意图处理"""
        prompt = f"""根据用户意图和参数生成回复。

意图:{state.current_intent}
参数:{json.dumps(state.filled_slots, ensure_ascii=False)}
上下文摘要:{state.context_summary}

请生成自然、友好的回复。"""
        return await self.llm.generate(prompt)

    async def _compress_history(self, state: DialogueState):
        """上下文压缩:将旧历史生成摘要"""
        old_history = state.history[:-self.max_history]
        recent = state.history[-self.max_history:]

        summary_prompt = f"请用 3 句话概括以下对话的要点:\n{json.dumps(old_history, ensure_ascii=False)}"
        state.context_summary = await self.llm.generate(summary_prompt)
        state.history = recent

技术选型:

模块推荐方案备选
NLULLM (GPT-4o)Rasa NLU
对话管理自研 + LangGraphRasa Core
状态存储RedisPostgreSQL
NLGLLM 生成模板 + LLM 混合

扩展性考虑:

  • 多语言:检测语言后路由到对应 LLM
  • 情感分析:检测用户情绪,调整回复风格
  • 人机协作:复杂问题无缝转接人工
  • 多模态:支持语音输入(ASR)和图片理解

十二、知识图谱增强 RAG 系统设计

Q14: 设计一个知识图谱增强的 RAG 系统 ⭐⭐⭐

需求分析:

  • 自动从文档中构建知识图谱(实体、关系抽取)
  • 图查询 + 向量检索融合,提供多跳推理能力
  • 推理链展示:展示从问题到答案的推理路径
  • 支持增量更新和图谱质量校验

架构设计:

文档 → 知识图谱构建层
    ├→ 实体抽取(NER)
    ├→ 关系抽取(RE)
    └→ 实体消歧 & 融合

    知识图谱(Neo4j) + 向量库(Milvus)

用户查询 → 混合检索层
    ├→ 向量检索(语义相似文档)
    ├→ 图查询(实体关系遍历)
    └→ 子图检索(相关子图提取)

    融合排序 + 推理链构建

    LLM 生成(带推理链的回答)

核心模块:

python
from dataclasses import dataclass


@dataclass
class Entity:
    id: str
    name: str
    entity_type: str
    properties: dict
    embedding: list[float] | None = None


@dataclass
class Relation:
    source_id: str
    target_id: str
    relation_type: str
    properties: dict


class KnowledgeGraphBuilder:
    """知识图谱构建"""

    def __init__(self, llm, graph_store, ner_model):
        self.llm = llm
        self.graph = graph_store
        self.ner = ner_model

    async def build_from_document(self, doc_text: str):
        """从文档构建知识图谱"""
        # 1. 实体抽取
        entities = await self._extract_entities(doc_text)

        # 2. 关系抽取
        relations = await self._extract_relations(doc_text, entities)

        # 3. 实体消歧
        entities = await self._disambiguate(entities)

        # 4. 写入图数据库
        for e in entities:
            await self.graph.upsert_entity(e)
        for r in relations:
            await self.graph.upsert_relation(r)

    async def _extract_entities(self, text: str) -> list[Entity]:
        prompt = f"""从以下文本中抽取实体,返回 JSON 列表。
每个实体包含:name(实体名)、type(类型:人名/地名/组织/概念/技术/产品)。

文本:{text[:2000]}

返回:[{{"name": "...", "type": "..."}}]"""

        result = await self.llm.generate(prompt)
        raw = __import__("json").loads(result)
        return [Entity(
            id=self._make_id(e["name"]),
            name=e["name"],
            entity_type=e["type"],
            properties={},
        ) for e in raw]

    async def _extract_relations(self, text: str, entities: list[Entity]) -> list[Relation]:
        entity_names = [e.name for e in entities]
        prompt = f"""基于以下文本和实体列表,抽取实体间的关系。

实体:{entity_names}
文本:{text[:2000]}

返回 JSON 列表:
[{{"source": "实体A", "target": "实体B", "relation": "关系类型"}}]"""

        result = await self.llm.generate(prompt)
        raw = __import__("json").loads(result)
        return [Relation(
            source_id=self._make_id(r["source"]),
            target_id=self._make_id(r["target"]),
            relation_type=r["relation"],
            properties={},
        ) for r in raw]

    def _make_id(self, name: str) -> str:
        import hashlib
        return hashlib.md5(name.encode()).hexdigest()[:10]

    async def _disambiguate(self, entities: list[Entity]) -> list[Entity]:
        """简单消歧:同名实体合并"""
        seen = {}
        for e in entities:
            if e.name in seen:
                continue
            seen[e.name] = e
        return list(seen.values())


class KGRetriever:
    """知识图谱增强检索"""

    def __init__(self, graph_store, vector_store, embedder):
        self.graph = graph_store
        self.vector = vector_store
        self.embedder = embedder

    async def retrieve(self, query: str, top_k: int = 5) -> dict:
        # 1. 向量检索
        query_emb = self.embedder.encode(query)
        vec_results = await self.vector.search(query_emb, top_k=top_k)

        # 2. 图查询:提取 query 中的实体,遍历关系
        entities = await self._extract_query_entities(query)
        graph_results = []
        reasoning_paths = []
        for entity_name in entities:
            subgraph = await self.graph.get_neighbors(entity_name, max_hops=2)
            graph_results.extend(subgraph["nodes"])
            reasoning_paths.append(subgraph["path"])

        # 3. 融合
        return {
            "vector_context": [r["text"] for r in vec_results],
            "graph_context": [n["name"] + ": " + n.get("description", "")
                              for n in graph_results],
            "reasoning_paths": reasoning_paths,
        }

    async def _extract_query_entities(self, query: str) -> list[str]:
        """从查询中提取实体"""
        # 简化实现:实际可用 NER 模型
        return [query[:10]]  # 占位


class KGRagSystem:
    """知识图谱增强 RAG 系统"""

    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm

    async def query(self, question: str) -> dict:
        # 1. 混合检索
        context = await self.retriever.retrieve(question)

        # 2. 构建带推理链的 Prompt
        reasoning = ""
        if context["reasoning_paths"]:
            reasoning = "推理路径:\n"
            for path in context["reasoning_paths"]:
                reasoning += " → ".join(path) + "\n"

        prompt = f"""基于以下信息回答问题,并展示推理过程。

文档信息:
{chr(10).join(context['vector_context'])}

知识图谱信息:
{chr(10).join(context['graph_context'])}

{reasoning}

问题:{question}
请给出带推理链的回答。"""

        answer = await self.llm.generate(prompt)
        return {"answer": answer, "context": context}

技术选型:

模块推荐方案备选
图数据库Neo4j / NebulaGraphArangoDB
NER/RELLM (GPT-4o)spaCy, UIE
向量库MilvusQdrant
EmbeddingBGE-large-zhE5
LLMGPT-4o / Qwen-MaxDeepSeek-V3

扩展性考虑:

  • 图谱质量校验:自动检测孤立节点、矛盾关系
  • 增量更新:文档变更时增量更新图谱
  • 社区检测:识别图谱中的知识聚类
  • 图谱可视化:前端展示知识图谱和推理路径

十三、AI 代码审查系统设计

Q15: 设计一个 AI 代码审查系统 ⭐⭐

需求分析:

  • 代码分析:理解代码逻辑,发现潜在 Bug
  • 安全检测:SQL 注入、XSS、敏感信息泄露等
  • 风格检查:命名规范、代码复杂度、最佳实践
  • 自动建议:生成修复建议和代码改进方案
  • 集成 GitHub/GitLab PR 流程

架构设计:

PR Webhook 触发

代码审查服务
    ├→ 代码预处理(Diff 解析、AST 分析)
    ├→ 规则引擎(静态分析 + 安全扫描)
    │      ├→ Semgrep / Bandit(安全规则)
    │      ├→ Ruff / Pylint(风格规则)
    │      └→ 自定义规则
    ├→ LLM 分析层
    │      ├→ 代码理解(上下文收集)
    │      ├→ Bug 检测(逻辑错误)
    │      └→ 改进建议生成
    └→ 结果聚合

PR Comment(行级评论 + 总结)

核心模块:

python
import json
from dataclasses import dataclass


@dataclass
class CodeIssue:
    file_path: str
    line_number: int
    severity: str      # "error" / "warning" / "info"
    category: str      # "bug" / "security" / "style" / "performance"
    message: str
    suggestion: str = ""
    rule_id: str = ""


class DiffParser:
    """解析 Git Diff"""

    def parse(self, diff_text: str) -> list[dict]:
        """解析 diff,返回变更文件和行信息"""
        files = []
        current_file = None

        for line in diff_text.split("\n"):
            if line.startswith("+++ b/"):
                current_file = {"path": line[6:], "changes": []}
                files.append(current_file)
            elif line.startswith("@@"):
                # 解析行号
                parts = line.split(" ")
                new_start = int(parts[2].split(",")[0].replace("+", ""))
                current_file["current_line"] = new_start
            elif current_file and line.startswith("+") and not line.startswith("+++"):
                current_file["changes"].append({
                    "type": "add",
                    "line": current_file["current_line"],
                    "content": line[1:],
                })
                current_file["current_line"] += 1
            elif current_file and line.startswith("-") and not line.startswith("---"):
                current_file["changes"].append({
                    "type": "delete",
                    "content": line[1:],
                })
            elif current_file:
                current_file["current_line"] = current_file.get("current_line", 0) + 1

        return files


class RuleEngine:
    """静态规则引擎"""

    def __init__(self):
        self.rules = [
            self._check_sql_injection,
            self._check_hardcoded_secrets,
            self._check_eval_usage,
            self._check_complexity,
        ]

    def analyze(self, file_path: str, content: str) -> list[CodeIssue]:
        issues = []
        for rule in self.rules:
            issues.extend(rule(file_path, content))
        return issues

    def _check_sql_injection(self, path: str, content: str) -> list[CodeIssue]:
        issues = []
        for i, line in enumerate(content.split("\n"), 1):
            if "f\"" in line and ("SELECT" in line.upper() or "INSERT" in line.upper()):
                issues.append(CodeIssue(
                    file_path=path, line_number=i,
                    severity="error", category="security",
                    message="可能存在 SQL 注入风险:使用 f-string 拼接 SQL",
                    suggestion="使用参数化查询代替字符串拼接",
                    rule_id="SEC001",
                ))
        return issues

    def _check_hardcoded_secrets(self, path: str, content: str) -> list[CodeIssue]:
        import re
        issues = []
        patterns = [
            (r'(?:password|secret|api_key)\s*=\s*["\'][^"\']{8,}', "硬编码密钥"),
            (r'sk-[a-zA-Z0-9]{20,}', "OpenAI API Key"),
        ]
        for i, line in enumerate(content.split("\n"), 1):
            for pattern, desc in patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    issues.append(CodeIssue(
                        file_path=path, line_number=i,
                        severity="error", category="security",
                        message=f"检测到硬编码敏感信息:{desc}",
                        suggestion="使用环境变量或密钥管理服务",
                        rule_id="SEC002",
                    ))
        return issues

    def _check_eval_usage(self, path: str, content: str) -> list[CodeIssue]:
        issues = []
        for i, line in enumerate(content.split("\n"), 1):
            if "eval(" in line and "safe" not in line.lower():
                issues.append(CodeIssue(
                    file_path=path, line_number=i,
                    severity="warning", category="security",
                    message="使用 eval() 可能存在代码注入风险",
                    suggestion="使用 ast.literal_eval() 或其他安全替代方案",
                    rule_id="SEC003",
                ))
        return issues

    def _check_complexity(self, path: str, content: str) -> list[CodeIssue]:
        issues = []
        for i, line in enumerate(content.split("\n"), 1):
            indent = len(line) - len(line.lstrip())
            if indent > 20:
                issues.append(CodeIssue(
                    file_path=path, line_number=i,
                    severity="info", category="style",
                    message="嵌套层级过深,建议重构",
                    suggestion="考虑使用 early return 或提取子函数",
                    rule_id="STYLE001",
                ))
        return issues


class LLMCodeAnalyzer:
    """LLM 代码分析"""

    def __init__(self, llm):
        self.llm = llm

    async def analyze(self, file_path: str, diff_content: str,
                      full_content: str) -> list[CodeIssue]:
        prompt = f"""你是一位资深代码审查专家。请审查以下代码变更。

文件:{file_path}

变更内容:


完整文件上下文:


请从以下角度审查:
1. 潜在 Bug 和逻辑错误
2. 性能问题
3. 代码可读性和维护性

返回 JSON 列表:
[{{"line": 行号, "severity": "error/warning/info", "category": "类别",
  "message": "问题描述", "suggestion": "修复建议"}}]
"""
        result = await self.llm.generate(prompt)
        raw_issues = json.loads(result)
        return [CodeIssue(
            file_path=file_path,
            line_number=issue["line"],
            severity=issue["severity"],
            category=issue["category"],
            message=issue["message"],
            suggestion=issue.get("suggestion", ""),
            rule_id="LLM",
        ) for issue in raw_issues]


class CodeReviewService:
    """代码审查主服务"""

    def __init__(self, rule_engine, llm_analyzer, diff_parser, pr_client):
        self.rule_engine = rule_engine
        self.llm_analyzer = llm_analyzer
        self.diff_parser = diff_parser
        self.pr_client = pr_client

    async def review_pr(self, repo: str, pr_number: int):
        """审查 PR"""
        # 1. 获取 PR diff
        diff = await self.pr_client.get_diff(repo, pr_number)
        files = self.diff_parser.parse(diff)

        all_issues = []
        for f in files:
            path = f["path"]
            content = "\n".join(c["content"] for c in f["changes"])

            # 2. 规则引擎检查
            rule_issues = self.rule_engine.analyze(path, content)
            all_issues.extend(rule_issues)

            # 3. LLM 深度分析
            full_content = await self.pr_client.get_file_content(repo, path)
            llm_issues = await self.llm_analyzer.analyze(path, content, full_content)
            all_issues.extend(llm_issues)

        # 4. 去重和排序
        all_issues = self._deduplicate(all_issues)
        all_issues.sort(key=lambda x: {"error": 0, "warning": 1, "info": 2}[x.severity])

        # 5. 发布评论
        await self._post_comments(repo, pr_number, all_issues)
        return all_issues

    def _deduplicate(self, issues: list[CodeIssue]) -> list[CodeIssue]:
        seen = set()
        unique = []
        for issue in issues:
            key = (issue.file_path, issue.line_number, issue.rule_id)
            if key not in seen:
                seen.add(key)
                unique.append(issue)
        return unique

    async def _post_comments(self, repo: str, pr_number: int,
                              issues: list[CodeIssue]):
        """发布 PR 行级评论"""
        summary = f"## 🤖 AI 代码审查报告\n\n"
        summary += f"发现 **{len(issues)}** 个问题:\n"
        summary += f"- 🔴 错误: {sum(1 for i in issues if i.severity == 'error')}\n"
        summary += f"- 🟡 警告: {sum(1 for i in issues if i.severity == 'warning')}\n"
        summary += f"- 🔵 建议: {sum(1 for i in issues if i.severity == 'info')}\n"

        await self.pr_client.post_comment(repo, pr_number, summary)
        for issue in issues:
            await self.pr_client.post_review_comment(
                repo, pr_number, issue.file_path, issue.line_number,
                f"**[{issue.severity.upper()}]** {issue.message}\n\n"
                f"💡 {issue.suggestion}"
            )

技术选型:

模块推荐方案备选
静态分析Semgrep + BanditSonarQube
风格检查RuffPylint, Flake8
LLMGPT-4o / Claude 3.5DeepSeek-Coder
代码索引Tree-sitterAST
PR 集成GitHub API / GitLab API-

扩展性考虑:

  • 自定义规则:团队可定义自己的审查规则
  • 学习模式:从人工 Review 中学习团队偏好
  • 增量审查:只审查变更文件,优化性能
  • 多语言支持:通过 Tree-sitter 支持多种编程语言

LLM 应用 & Agent 开发面试准备