Skip to content

08. 多智能体系统

面向大模型应用工程师 / Agent 开发工程师的高频面试题


一、多 Agent 基础

Q: 什么是多智能体系统?为什么需要多个 Agent?

难度:⭐ 基础

答案:

多智能体系统(Multi-Agent System, MAS)是由多个自主的 Agent 组成的系统,它们通过协作、竞争或分工来完成单个 Agent 难以独立完成的复杂任务。

类比理解: 想象一家公司——CEO 不可能一个人做所有事情。需要产品经理写需求、设计师画图、程序员写代码、测试找 Bug。每个人专注自己的领域,通过协作完成一个大项目。多智能体系统就是把这种"团队协作"模式搬到 AI 系统中。

为什么需要多个 Agent?

  1. 能力互补:不同 Agent 可以使用不同的工具、不同的模型、不同的提示词。一个 Agent 擅长写代码,另一个擅长做数据分析,第三个擅长生成报告。
  2. 复杂度分解:一个超长的 System Prompt 可能超出上下文窗口,也容易让模型"走神"。拆成多个 Agent 各司其职,每个 Agent 的提示词更聚焦。
  3. 质量提升:通过"自我检查"或"相互审核"机制,一个 Agent 生成内容,另一个 Agent 检查质量,类似代码审查(Code Review)。
  4. 并行加速:独立的子任务可以并行执行,大幅缩短总耗时。

核心思想: 分而治之(Divide and Conquer),专业化分工(Specialization)。

python
# 单 Agent vs 多 Agent 的直观对比

# 单 Agent:一个提示词做所有事情
single_agent_prompt = """
你是一个全能助手。请分析数据、写代码、生成报告、
检查错误、翻译成英文...
# 结果:每个任务都做得"还行",但没有一个做到极致
"""

# 多 Agent:各司其职
agents = {
    "data_analyst": "你是数据分析专家,专注于数据清洗和统计分析。",
    "coder": "你是 Python 专家,专注于编写高效、可维护的代码。",
    "reviewer": "你是代码审查专家,专注于发现 Bug 和安全隐患。",
    "translator": "你是专业翻译,专注于技术文档的中英互译。",
}
# 结果:每个任务都由最合适的"专家"完成,整体质量更高

追问:

  • Q: 多 Agent 是不是越多越好? 不是。Agent 数量增加会带来通信开销、协调复杂度和成本的指数级增长。一般建议 3-5 个 Agent 组成一个团队,超过 10 个就需要非常精心的架构设计。遵循"奥卡姆剃刀"原则——能用单 Agent 解决的,就不要用多 Agent。

  • Q: 多 Agent 系统和微服务架构有什么相似之处? 非常相似。微服务是把单体应用拆成多个独立服务,通过 API 通信;多 Agent 是把一个复杂任务拆成多个 Agent,通过消息传递协作。两者都面临服务发现、负载均衡、错误处理、分布式状态管理等挑战。


Q: 多 Agent 和单 Agent 的区别?什么时候用多 Agent?

难度:⭐ 基础

答案:

维度单 Agent多 Agent
架构复杂度
调试难度
任务范围适合明确、单一的任务适合复杂、多步骤的任务
Token 消耗较少较多(Agent 间通信消耗)
容错性单点失败可设计冗余
可扩展性差(提示词膨胀)好(增加新 Agent)

什么时候用多 Agent?

  1. 任务需要多种专业知识:比如"帮我分析竞品并生成报告"——需要搜索、分析、写作多个环节。
  2. 任务需要质量保证:比如"写代码并测试"——一个 Agent 写,另一个 Agent 测试。
  3. 任务可以并行化:比如"同时分析 5 个数据源"——5 个 Agent 并行工作。
  4. 任务需要迭代改进:比如"写文章并反复修改"——写作 Agent 和审稿 Agent 多轮交互。

什么时候不需要多 Agent?

  • 简单的问答、翻译、摘要等单一任务
  • 对延迟极度敏感的场景(多 Agent 通信会增加延迟)
  • 预算有限的场景(多 Agent = 多次 LLM 调用 = 更高成本)
python
# 判断是否需要多 Agent 的决策树
def should_use_multi_agent(task):
    """简单决策逻辑"""
    # 问题1:任务是否可以分解为独立子任务?
    subtasks = decompose(task)
    if len(subtasks) <= 1:
        return False  # 单 Agent 就够了

    # 问题2:子任务是否需要不同的专业知识?
    required_skills = set(get_skills(st) for st in subtasks)
    if len(required_skills) == 1:
        return False  # 一个 Agent 就能处理所有子任务

    # 问题3:对延迟是否敏感?
    if task.latency_critical:
        return False  # 多 Agent 通信会增加延迟

    # 问题4:预算是否充足?
    if task.budget < estimate_multi_agent_cost(subtasks):
        return False  # 成本太高

    return True  # 适合使用多 Agent

追问:

  • Q: 一个 Agent 配合多个工具,和多个 Agent 各配一个工具,效果一样吗? 不完全一样。单 Agent + 多工具的优势是上下文连贯、无通信开销;但当工具数量超过 10-15 个时,模型选择工具的准确率会显著下降。多 Agent 各配少量工具,每个 Agent 的工具选择更精准,但需要协调开销。

  • Q: 如何在性能和成本之间权衡? 可以先用单 Agent 做 MVP,当发现质量不达标或提示词过长时再拆分成多 Agent。这是一种"渐进式复杂化"的策略。


Q: 多 Agent 系统的经典架构有哪些?

难度:⭐⭐ 进阶

答案:

多 Agent 系统有三种经典架构:

1. 层级架构(Hierarchical)

像公司的组织架构——有上级和下级。顶层是"管理者"Agent,负责分解任务和分配工作;底层是"执行者"Agent,负责具体执行。

           [Supervisor Agent]
           /       |       \
    [Agent A]  [Agent B]  [Agent C]
    (搜索)     (分析)     (写作)

2. 对等架构(Peer-to-Peer)

没有上下级关系,所有 Agent 地位平等,通过协商和对话完成任务。类似一群专家圆桌讨论。

    [Agent A] ←→ [Agent B]
        ↕            ↕
    [Agent C] ←→ [Agent D]

3. 混合架构(Hybrid)

结合层级和对等的特点。比如在子团队内部用对等协作,团队之间用层级管理。

python
# 三种架构的代码示意

# 1. 层级架构
class HierarchicalSystem:
    def __init__(self):
        self.supervisor = SupervisorAgent()
        self.workers = [SearchAgent(), AnalysisAgent(), WriterAgent()]

    def run(self, task):
        plan = self.supervisor.plan(task)  # 管理者分解任务
        results = []
        for subtask, worker in zip(plan, self.workers):
            results.append(worker.execute(subtask))  # 工人执行
        return self.supervisor.synthesize(results)  # 管理者汇总

# 2. 对等架构
class PeerSystem:
    def __init__(self):
        self.agents = [AgentA(), AgentB(), AgentC()]

    def run(self, task):
        context = {"task": task}
        for round in range(max_rounds):
            for agent in self.agents:
                response = agent.respond(context)
                context["messages"].append(response)  # 所有人共享对话
            if is_consensus(context):
                break
        return context["messages"][-1]

# 3. 混合架构
class HybridSystem:
    def __init__(self):
        # 研究团队:对等协作
        self.research_team = PeerSubTeam([SearchAgent(), ScholarAgent()])
        # 开发团队:对等协作
        self.dev_team = PeerSubTeam([CoderAgent(), TestAgent()])
        # 总监:层级管理
        self.director = DirectorAgent([self.research_team, self.dev_team])

    def run(self, task):
        return self.director.orchestrate(task)

追问:

  • Q: 实际项目中哪种架构最常用? 层级架构最常用,因为它结构清晰、易于调试和控制。对等架构更适合创意类任务(如辩论、头脑风暴),但调试困难。建议从层级架构开始,根据需要引入对等元素。

  • Q: 如何选择架构? 看任务特性:如果任务有明确的先后顺序和上下级关系,用层级;如果需要多方观点碰撞,用对等;如果既有分工又有协作需求,用混合。


二、协作模式

Q: 什么是 Sequential 模式?Agent 串行执行?

难度:⭐ 基础

答案:

Sequential(顺序执行)模式是最简单的多 Agent 协作方式——Agent A 的输出作为 Agent B 的输入,Agent B 的输出作为 Agent C 的输入,像流水线(Pipeline)一样依次执行。

类比理解: 工厂流水线——第一个工人加工零件,传给第二个工人打磨,再传给第三个工人上漆。每个工人的输出就是下一个工人的输入。

[Agent A: 搜索] → [Agent B: 分析] → [Agent C: 写报告]
python
# Sequential 模式实现
class SequentialPipeline:
    def __init__(self, agents: list):
        self.agents = agents

    def run(self, input_data: str) -> str:
        current_input = input_data
        for agent in self.agents:
            current_input = agent.run(current_input)
            print(f"[{agent.name}] 完成,输出: {current_input[:100]}...")
        return current_input

# 使用示例
pipeline = SequentialPipeline([
    SearchAgent(),    # 第一步:搜索相关信息
    AnalysisAgent(),  # 第二步:分析搜索结果
    WriterAgent(),    # 第三步:生成报告
])
result = pipeline.run("分析2024年AI行业趋势")

优点:

  • 简单直观,易于理解和调试
  • 上下文清晰传递,每个 Agent 看到前一个 Agent 的完整输出
  • 适合有明确先后依赖的任务

缺点:

  • 不支持并行,总耗时 = 所有 Agent 耗时之和
  • 任一环节失败,整个流程中断
  • 信息在传递过程中可能丢失(上下文窗口限制)

追问:

  • Q: Sequential 模式中如何控制传递的信息量? 可以在每个 Agent 执行后做一个"摘要压缩"步骤,把关键信息提取出来传递给下一个 Agent,避免上下文膨胀。也可以设计结构化的中间输出格式(如 JSON),只传递必要字段。

  • Q: 如果某个 Agent 执行时间很长怎么办? 可以设置超时机制和降级策略。比如搜索 Agent 超时后,使用缓存的历史结果或直接跳过,把"搜索未完成"的状态传给下游 Agent,让它们基于已有信息尽力生成结果。


Q: 什么是 Parallel 模式?Agent 并行执行?

难度:⭐⭐ 进阶

答案:

Parallel(并行执行)模式让多个 Agent 同时处理同一任务的不同方面,最后汇总结果。就像多个人同时从不同角度分析同一个问题,然后开会汇总。

类比理解: 医院会诊——同一个病人的血液检验、影像检查、心电图检查同时进行,最后由主治医生综合所有检查结果做出诊断。

python
import asyncio

# Parallel 模式实现
class ParallelSystem:
    def __init__(self, agents: list):
        self.agents = agents

    async def run(self, input_data: str) -> str:
        # 所有 Agent 并行执行
        tasks = [agent.arun(input_data) for agent in self.agents]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 汇总结果
        valid_results = [r for r in results if not isinstance(r, Exception)]
        return self.merge(valid_results)

    def merge(self, results: list) -> str:
        # 可以用 LLM 来汇总,也可以简单拼接
        merged = "\n\n---\n\n".join(results)
        return merged

# 使用示例:同时从多个角度分析
system = ParallelSystem([
    TechAnalystAgent(),      # 技术角度分析
    MarketAnalystAgent(),    # 市场角度分析
    RiskAnalystAgent(),      # 风险角度分析
])
result = asyncio.run(system.run("评估GPT-5的技术影响"))

更实用的实现——用 asyncio + 超时控制:

python
import asyncio
from typing import Optional

class RobustParallelSystem:
    def __init__(self, agents: list, timeout: float = 30.0):
        self.agents = agents
        self.timeout = timeout

    async def run(self, input_data: str) -> dict:
        tasks = {}
        for agent in self.agents:
            task = asyncio.create_task(agent.arun(input_data))
            tasks[agent.name] = task

        results = {}
        done, pending = await asyncio.wait(
            tasks.values(),
            timeout=self.timeout,
            return_when=asyncio.ALL_COMPLETED
        )

        for name, task in tasks.items():
            if task in done:
                try:
                    results[name] = task.result()
                except Exception as e:
                    results[name] = f"Error: {e}"
            else:
                task.cancel()
                results[name] = "Timeout: 未在规定时间内完成"

        return results

追问:

  • Q: 并行 Agent 处理同一个输入,结果冲突怎么办? 常见策略有三种:① 投票表决(多数决);② 加权平均(可信度高的 Agent 权重大);③ 交给仲裁 Agent 做最终决定。辩论模式(Debate)就是解决冲突的一种方式。

  • Q: 并行模式的 Agent 之间需要通信吗? 基本的 Parallel 模式不需要——每个 Agent 独立工作,最后汇总。但有些高级变体允许 Agent 在执行过程中交换中间结果(类似分布式计算中的 All-Reduce),称为"协作并行"。


Q: 什么是 Delegation 模式?任务分解与分配?

难度:⭐⭐ 进阶

答案:

Delegation(委托)模式是指一个"规划者"Agent 把复杂任务分解成多个子任务,然后分配给不同的"执行者"Agent 去完成。核心是"谁能做什么"的匹配。

类比理解: 项目经理接到客户需求后,把需求拆成具体的工作项(User Story),然后根据团队成员的技能分配给不同的人去做。

python
class DelegationSystem:
    def __init__(self):
        self.planner = PlannerAgent()  # 规划者
        self.agents = {
            "search": SearchAgent(),
            "code": CoderAgent(),
            "analysis": AnalysisAgent(),
            "writing": WriterAgent(),
        }

    def run(self, task: str) -> str:
        # 第一步:规划者分解任务
        subtasks = self.planner.decompose(task)
        # 输出示例:
        # [
        #   {"id": 1, "desc": "搜索最新AI论文", "assignee": "search", "deps": []},
        #   {"id": 2, "desc": "分析论文技术细节", "assignee": "analysis", "deps": [1]},
        #   {"id": 3, "desc": "写总结报告", "assignee": "writing", "deps": [2]},
        # ]

        # 第二步:按依赖关系执行
        results = {}
        for subtask in subtasks:
            # 等待依赖任务完成
            deps_result = [results[dep] for dep in subtask["deps"]]
            context = self.build_context(subtask, deps_result)

            # 分配给对应 Agent
            agent = self.agents[subtask["assignee"]]
            results[subtask["id"]] = agent.run(context)

        # 第三步:汇总最终结果
        return self.planner.synthesize(results)

    def build_context(self, subtask, deps_result):
        context = f"任务: {subtask['desc']}\n"
        if deps_result:
            context += "前序结果:\n" + "\n".join(deps_result)
        return context

关键设计点:

  1. 任务分解提示词设计:要明确告诉规划者有哪些可用的 Agent 及其能力。
  2. 任务分配策略:基于 Agent 能力匹配,或让规划者自己决定。
  3. 结果汇总:规划者或专门的汇总 Agent 把所有子结果整合。
python
# Planner 的提示词设计
PLANNER_PROMPT = """
你是一个任务规划专家。你有以下可用的 Agent:
- search: 擅长搜索互联网信息、查找资料
- code: 擅长编写和调试 Python 代码
- analysis: 擅长数据分析、统计和可视化
- writing: 擅长撰写报告、文章

请把用户的任务分解为子任务,以 JSON 格式输出:
[
  {"id": 1, "desc": "子任务描述", "assignee": "agent名", "deps": [依赖的任务id]}
]

注意:
- 每个子任务应该足够小,单个 Agent 可以独立完成
- 明确标注任务之间的依赖关系
- 尽量让没有依赖的任务可以并行执行
"""

追问:

  • Q: 如果规划者分解的任务不合理怎么办? 可以加入"反思"机制:执行者完成后,让规划者检查结果是否符合预期。如果不满意,可以重新分解或调整。也可以加入"人工审核"步骤,让用户确认计划后再执行。

  • Q: Delegation 和 Sequential + Parallel 的区别? Delegation 是一种更高级的模式,它动态地决定任务分解和执行顺序,可以自动实现 Sequential 和 Parallel 的混合。而 Sequential 和 Parallel 是静态定义的执行模式。


Q: 什么是 Debate 模式?Agent 之间辩论决策?

难度:⭐⭐ 进阶

答案:

Debate(辩论)模式让多个 Agent 就同一个问题发表不同观点,通过多轮辩论达成共识或由裁判做出最终裁决。这种方法特别适合需要多角度思考的决策问题。

类比理解: 法庭审判——控方律师和辩方律师各自陈述观点、反驳对方,最后由法官(裁判)做出判决。通过"对抗"来逼近真相。

Round 1:
  [Agent A]: 我认为应该用微服务架构,因为...
  [Agent B]: 我反对,微服务会增加复杂度...

Round 2:
  [Agent A]: 你说的复杂度可以通过容器化解决...
  [Agent B]: 但团队规模小,维护成本太高...

Round 3:
  [Judge Agent]: 综合双方观点,考虑到团队规模...
python
class DebateSystem:
    def __init__(self, debaters: list, judge: "JudgeAgent", max_rounds: int = 3):
        self.debaters = debaters  # 辩论者列表
        self.judge = judge        # 裁判
        self.max_rounds = max_rounds

    def run(self, question: str) -> str:
        debate_history = []

        for round_num in range(self.max_rounds):
            round_responses = []
            for debater in self.debaters:
                context = self._build_context(question, debate_history, round_num)
                response = debater.debate(context)
                round_responses.append({
                    "agent": debater.name,
                    "round": round_num + 1,
                    "argument": response
                })
            debate_history.append(round_responses)

        # 裁判做出最终裁决
        verdict = self.judge.judge(question, debate_history)
        return verdict

    def _build_context(self, question, history, current_round):
        context = f"问题: {question}\n\n"
        for round_num, round_data in enumerate(history):
            context += f"--- 第{round_num + 1}轮 ---\n"
            for item in round_data:
                context += f"[{item['agent']}]: {item['argument']}\n"
        if current_round > 0:
            context += "\n请针对前面的观点进行反驳或补充,提出你的论点。"
        else:
            context += "\n请提出你的初始论点。"
        return context

Debate 模式的变体:

  1. 双人辩论:两个 Agent 正反方对抗
  2. 多人圆桌:多个 Agent 各抒己见
  3. 自我辩论:同一个 Agent 扮演正反两个角色(Self-Play)
  4. 陪审团模式:多个"陪审员" Agent 投票表决
python
# Self-Debate:一个 Agent 自己跟自己辩论
class SelfDebateAgent:
    def __init__(self, model):
        self.model = model

    def run(self, question: str) -> str:
        # 正方
        pro = self.model.chat(f"请支持以下观点:{question}")
        # 反方
        con = self.model.chat(f"请反对以下观点:{question}\n对方的论点:{pro}")
        # 最终判断
        verdict = self.model.chat(f"""
            问题:{question}
            正方论点:{pro}
            反方论点:{con}
            请综合双方观点,给出你的最终判断和理由。
        """)
        return verdict

追问:

  • Q: Debate 模式会不会导致"废话连篇"? 确实有这个风险。解决方案:① 限制轮数(通常 2-3 轮就够了);② 要求每轮必须提出新论点,不能重复;③ 裁判设置质量标准,不达标的论点直接忽略。

  • Q: Debate 模式的成本如何? 成本较高——每个 Agent 每轮都是一次 LLM 背调。3 个 Agent 辩论 3 轮 = 9 次 LLM 调用 + 1 次裁判调用 = 10 次调用。适合高价值决策(如架构选型、投资建议),不适合日常任务。


Q: 什么是 Supervisor 模式?一个 Agent 管理其他 Agent?

难度:⭐⭐ 进阶

答案:

Supervisor(管理者)模式是最常用的多 Agent 架构。一个"管理者" Agent 负责理解用户意图、选择合适的"工作" Agent、传递任务、汇总结果。它本质上是一个"智能路由器"。

类比理解: 餐厅经理——顾客点菜后,经理根据菜品类型分配给不同的厨师(中餐厨师、西餐厨师、甜点师),厨师做好后由经理汇总上菜。

python
class SupervisorSystem:
    def __init__(self, workers: dict, model):
        self.workers = workers  # {"name": Agent}
        self.model = model
        self.tools = self._build_routing_tools()

    def _build_routing_tools(self):
        """把每个 worker 封装成 supervisor 可调用的工具"""
        tools = []
        for name, worker in self.workers.items():
            tools.append({
                "name": f"delegate_to_{name}",
                "description": worker.description,
                "parameters": {"task": "string"}
            })
        return tools

    def run(self, user_input: str) -> str:
        messages = [
            {"role": "system", "content": self._supervisor_prompt()},
            {"role": "user", "content": user_input}
        ]

        while True:
            # Supervisor 决定下一步动作
            response = self.model.chat(messages, tools=self.tools)

            if response.tool_calls:
                # 调用对应的 worker
                for call in response.tool_calls:
                    worker = self.workers[call.function.name.replace("delegate_to_", "")]
                    result = worker.run(call.function.arguments["task"])
                    messages.append({"role": "tool", "content": result})
            else:
                # Supervisor 给出最终回答
                return response.content

    def _supervisor_prompt(self):
        worker_desc = "\n".join(
            f"- {name}: {w.description}" for name, w in self.workers.items()
        )
        return f"""
你是一个任务管理者。你可以将任务委派给以下专家:
{worker_desc}

规则:
1. 分析用户需求,选择最合适的专家来处理
2. 如果需要多个专家协作,依次调用
3. 汇总所有专家的结果,给出最终回答
4. 如果任务很简单,你可以直接回答,不需要委派
"""

Supervisor 模式在 LangGraph 中的实现:

python
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import create_react_agent

# 定义各个 worker agent
research_agent = create_react_agent(model, tools=[search_tool, wiki_tool])
code_agent = create_react_agent(model, tools=[python_tool])

# Supervisor 的路由逻辑
def supervisor_node(state: MessagesState):
    """根据对话内容决定路由到哪个 agent"""
    messages = state["messages"]
    last_message = messages[-1].content

    if "代码" in last_message or "程序" in last_message:
        return "code_agent"
    elif "搜索" in last_message or "查找" in last_message:
        return "research_agent"
    else:
        return END

# 构建图
graph = StateGraph(MessagesState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("research_agent", research_agent)
graph.add_node("code_agent", code_agent)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", supervisor_node)
graph.add_edge("research_agent", "supervisor")
graph.add_edge("code_agent", "supervisor")

追问:

  • Q: Supervisor 怎么知道该调用哪个 Agent? 两种方式:① 基于规则的路由(关键词匹配,快但不灵活);② 基于 LLM 的路由(让 Supervisor 模型自己判断,灵活但慢且有成本)。实际项目中常把两者结合——简单情况用规则,复杂情况用 LLM。

  • Q: Supervisor 模式的瓶颈在哪里? Supervisor Agent 本身。如果 Supervisor 的能力不够(模型不够强或提示词不够好),它可能会错误地分配任务或遗漏必要的步骤。建议 Supervisor 使用能力最强的模型。


三、通信与协调

Q: Agent 之间如何通信?有哪些消息传递机制?

难度:⭐⭐ 进阶

答案:

Agent 之间的通信方式决定了系统的灵活性和效率。主要有以下几种机制:

1. 直接消息传递(Direct Messaging)

Agent A 直接调用 Agent B,类似函数调用。最简单但耦合度高。

python
# 直接调用
class AgentA:
    def __init__(self, agent_b):
        self.agent_b = agent_b  # 直接持有引用

    def run(self, task):
        sub_result = self.agent_b.run("请帮我查一下...")  # 直接调用
        return f"基于B的结果: {sub_result}"

2. 消息队列(Message Queue)

Agent 通过消息队列异步通信,解耦发送方和接收方。

python
import asyncio
from collections import defaultdict

class MessageBus:
    """简单的消息总线"""
    def __init__(self):
        self.queues = defaultdict(asyncio.Queue)

    async def send(self, to: str, message: dict):
        await self.queues[to].put(message)

    async def receive(self, agent_name: str) -> dict:
        return await self.queues[agent_name].get()

# Agent 通过消息总线通信
class AsyncAgent:
    def __init__(self, name: str, bus: MessageBus):
        self.name = name
        self.bus = bus

    async def send_to(self, target: str, content: str):
        await self.bus.send(target, {
            "from": self.name,
            "content": content
        })

    async def receive(self) -> dict:
        return await self.bus.receive(self.name)

3. 共享状态(Shared State)

所有 Agent 读写同一个共享状态对象,类似全局变量。

python
class SharedState:
    def __init__(self):
        self.data = {}
        self.lock = asyncio.Lock()

    async def update(self, key: str, value, agent_name: str):
        async with self.lock:
            self.data[key] = {
                "value": value,
                "updated_by": agent_name,
                "timestamp": time.time()
            }

    def get(self, key: str):
        return self.data.get(key)

4. 事件驱动(Event-Driven)

Agent 发布事件,其他 Agent 订阅感兴趣的事件。

python
class EventBus:
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, event_type: str, handler):
        self.subscribers[event_type].append(handler)

    async def publish(self, event_type: str, data: dict):
        for handler in self.subscribers[event_type]:
            await handler(data)

# 使用
bus = EventBus()
bus.subscribe("search_complete", analysis_agent.on_search_complete)
bus.subscribe("analysis_done", writer_agent.on_analysis_done)
await bus.publish("search_complete", {"results": [...]})
通信方式耦合度复杂度适用场景
直接调用简单的两 Agent 协作
消息队列异步、解耦的多 Agent
共享状态需要共享上下文的场景
事件驱动复杂的动态协作

追问:

  • Q: 实际开发中用哪种方式最多? 大多数框架(如 CrewAI、AutoGen)默认使用"共享对话历史"——所有 Agent 共享同一个消息列表,这本质上是一种简化的共享状态。对于简单场景足够用,但在 Agent 数量增多时会遇到上下文膨胀的问题。

  • Q: 如何保证消息的可靠性? 生产环境中需要考虑:消息重试(失败后重发)、消息去重(防止重复处理)、死信队列(无法处理的消息暂存)、超时机制(防止无限等待)。


Q: 如何实现 Agent 之间的信息共享?

难度:⭐⭐ 进阶

答案:

信息共享是多 Agent 系统的核心挑战。共享太多信息会造成上下文膨胀和隐私泄露;共享太少又会导致 Agent 缺乏必要的背景信息。

核心原则:最小必要信息原则(Need-to-Know Basis)

python
class InformationBroker:
    """信息经纪人——控制 Agent 间的信息流动"""

    def __init__(self):
        self.global_context = {}      # 全局共享信息
        self.agent_contexts = {}      # 各 Agent 的私有信息
        self.access_control = {}      # 访问控制

    def set_global(self, key: str, value):
        """设置全局可见信息"""
        self.global_context[key] = value

    def set_agent_context(self, agent_name: str, key: str, value):
        """设置 Agent 私有信息"""
        if agent_name not in self.agent_contexts:
            self.agent_contexts[agent_name] = {}
        self.agent_contexts[agent_name][key] = value

    def get_context_for(self, agent_name: str) -> dict:
        """获取某个 Agent 应该看到的所有信息"""
        context = dict(self.global_context)

        # 添加该 Agent 的私有信息
        if agent_name in self.agent_contexts:
            context.update(self.agent_contexts[agent_name])

        # 添加其他 Agent 的摘要信息(非完整信息)
        for other_name, other_ctx in self.agent_contexts.items():
            if other_name != agent_name:
                context[f"{other_name}_summary"] = self._summarize(other_ctx)

        return context

    def _summarize(self, context: dict) -> str:
        """将上下文压缩为摘要,减少信息量"""
        return " | ".join(f"{k}: {str(v)[:50]}" for k, v in context.items())

信息共享的三种粒度:

python
# 1. 完整共享(简单但浪费)
def full_sharing(agents):
    shared_history = []
    for agent in agents:
        result = agent.run(shared_history)
        shared_history.append({"agent": agent.name, "result": result})
    # 所有 Agent 都能看到所有历史

# 2. 摘要共享(压缩信息)
def summary_sharing(agents):
    summaries = {}
    for agent in agents:
        result = agent.run(summaries)
        summaries[agent.name] = llm_summarize(result)  # 用 LLM 压缩
    # Agent 只看到其他 Agent 的摘要

# 3. 结构化共享(精确控制)
def structured_sharing(agents):
    shared_data = {"facts": [], "decisions": [], "open_questions": []}
    for agent in agents:
        result = agent.run(shared_data)
        # Agent 输出结构化信息
        parsed = parse_structured_output(result)
        shared_data["facts"].extend(parsed.get("new_facts", []))
        shared_data["decisions"].extend(parsed.get("decisions", []))

追问:

  • Q: 如何处理信息冲突?比如 Agent A 说"股价涨了",Agent B 说"股价跌了"。 需要引入"信息可信度"机制:① 标注信息来源和时间戳;② 对冲突信息进行验证(调用权威数据源);③ 让 Supervisor 裁决冲突。

Q: 如何处理 Agent 之间的冲突?

难度:⭐⭐ 进阶

答案:

多 Agent 系统中冲突不可避免——两个 Agent 可能对同一个问题给出矛盾的结论,或者争抢同一个资源。处理冲突的策略:

1. 优先级机制(Priority)

给 Agent 设置优先级,冲突时高优先级 Agent 的意见胜出。

python
class PriorityResolver:
    def resolve(self, conflicting_results: list) -> dict:
        # 按优先级排序
        sorted_results = sorted(
            conflicting_results,
            key=lambda x: x["agent_priority"],
            reverse=True
        )
        return sorted_results[0]  # 返回最高优先级的结果

2. 投票机制(Voting)

多个 Agent 投票,少数服从多数。

python
class VotingResolver:
    def resolve(self, votes: list) -> str:
        from collections import Counter
        vote_counts = Counter(v["decision"] for v in votes)
        return vote_counts.most_common(1)[0][0]  # 返回票数最多的选项

3. 裁判机制(Arbiter)

引入一个中立的裁判 Agent 做最终决定。

python
class ArbiterResolver:
    def __init__(self, judge_model):
        self.judge = judge_model

    def resolve(self, question: str, conflicting_views: list) -> str:
        views_text = "\n".join(
            f"[{v['agent']}]: {v['argument']}" for v in conflicting_views
        )
        prompt = f"""
        问题:{question}
        不同观点:
        {views_text}

        请综合分析各观点的优劣,给出你的最终判断。必须选择一个明确的结论。
        """
        return self.judge.chat(prompt)

4. 重试与协商(Negotiation)

让冲突的 Agent 互相讨论,尝试达成共识。

python
class NegotiationResolver:
    def resolve(self, agent_a, agent_b, issue: str, max_rounds=3):
        history = []
        for _ in range(max_rounds):
            a_response = agent_a.negotiate(issue, history)
            history.append({"agent": "A", "msg": a_response})

            if self._is_concession(a_response):
                return a_response  # A 让步了

            b_response = agent_b.negotiate(issue, history)
            history.append({"agent": "B", "msg": b_response})

            if self._is_concession(b_response):
                return b_response  # B 让步了

        # 协商失败,交给裁判
        return ArbiterResolver().resolve(issue, history)

追问:

  • Q: 如何避免冲突发生? 预防优于治疗:① 清晰定义每个 Agent 的职责边界,减少"越界";② 使用共享状态的读写锁,避免资源竞争;③ 在任务分解阶段就消除歧义。

Q: 什么是共享黑板(Shared Blackboard)模式?

难度:⭐⭐ 进阶

答案:

共享黑板模式源自 1970 年代的 HEARSAY-II 语音识别系统。所有 Agent 通过一块"黑板"共享信息——任何 Agent 都可以在黑板上读写信息,黑板是唯一的全局数据存储。

类比理解: 办公室里的白板——每个人都可以在上面写东西、看别人写了什么、擦掉过时的信息。白板是团队共享的"大脑"。

python
class Blackboard:
    """共享黑板"""

    def __init__(self):
        self.board = {}           # 黑板内容
        self.history = []         # 修改历史
        self.watchers = []        # 监听变化的回调

    def write(self, key: str, value, agent_name: str):
        """Agent 在黑板上写信息"""
        old_value = self.board.get(key)
        self.board[key] = {
            "value": value,
            "author": agent_name,
            "timestamp": time.time()
        }
        self.history.append({
            "action": "write",
            "key": key,
            "agent": agent_name,
            "old": old_value,
            "new": value
        })
        # 通知监听者
        for watcher in self.watchers:
            watcher(key, value, agent_name)

    def read(self, key: str):
        """Agent 从黑板读信息"""
        entry = self.board.get(key)
        return entry["value"] if entry else None

    def read_all(self) -> dict:
        """读取黑板上所有信息"""
        return {k: v["value"] for k, v in self.board.items()}

    def watch(self, callback):
        """注册监听器,当黑板内容变化时触发"""
        self.watchers.append(callback)


class BlackboardAgent:
    """基于黑板的 Agent"""

    def __init__(self, name: str, blackboard: Blackboard, expertise: str):
        self.name = name
        self.bb = blackboard
        self.expertise = expertise

    def on_board_change(self, key, value, author):
        """当黑板变化时触发"""
        if author == self.name:
            return  # 忽略自己写的内容
        # 检查是否需要响应
        if self.should_respond(key, value):
            self.contribute()

    def contribute(self):
        """根据黑板当前状态做出贡献"""
        current_state = self.bb.read_all()
        result = self.think(current_state)
        self.bb.write(f"contribution_{self.name}", result, self.name)

黑板模式的典型应用场景:

python
# 多 Agent 协作解谜
bb = Blackboard()

# 线索收集 Agent 写入发现的线索
bb.write("clue_1", "嫌疑人身高约180cm", "detective_agent")
bb.write("clue_2", "案发时间是凌晨2点", "time_agent")

# 分析 Agent 读取线索并推理
clues = bb.read_all()
bb.write("suspect_list", ["张三", "李四"], "analysis_agent")

# 验证 Agent 读取嫌疑人列表并验证
suspects = bb.read("suspect_list")
bb.write("alibi_check", {"张三": "有不在场证明", "李四": "无不在场证明"}, "verify_agent")

追问:

  • Q: 黑板模式和共享状态有什么区别? 概念上很相似,但黑板模式强调"主动通知"机制——Agent 可以监听感兴趣的键,当值变化时自动触发响应。共享状态通常是被动读取。黑板模式还通常包含更丰富的元数据(作者、时间戳、历史版本)。

四、任务分解

Q: 如何把复杂任务分解为子任务?

难度:⭐⭐ 进阶

答案:

任务分解是多 Agent 系统的核心能力。分解方式直接影响系统的效率和质量。

常用的任务分解策略:

1. 递归分解(Recursive Decomposition)

把大任务拆成子任务,子任务还可以继续拆,直到每个子任务足够小。

python
class RecursiveDecomposer:
    def __init__(self, model, min_granularity=100):
        self.model = model
        self.min_granularity = min_granularity  # 子任务的最小复杂度

    def decompose(self, task: str, depth: int = 0) -> dict:
        # 评估任务复杂度
        complexity = self.estimate_complexity(task)

        if complexity <= self.min_granularity or depth >= 3:
            # 任务足够简单,不需要再分解
            return {"task": task, "type": "leaf", "children": []}

        # 用 LLM 分解任务
        subtasks = self.model.chat(f"""
            请把以下任务分解为 2-5 个子任务:
            任务:{task}

            输出格式(JSON):
            [{{"id": 1, "desc": "子任务描述", "depends_on": []}}]
        """)

        # 递归分解每个子任务
        children = []
        for st in subtasks:
            child = self.decompose(st["desc"], depth + 1)
            child["id"] = st["id"]
            child["depends_on"] = st.get("depends_on", [])
            children.append(child)

        return {"task": task, "type": "composite", "children": children}

2. 基于模板的分解(Template-Based)

为常见任务类型预定义分解模板。

python
TASK_TEMPLATES = {
    "research_report": [
        {"desc": "搜集相关资料", "agent": "search"},
        {"desc": "筛选和整理信息", "agent": "analyst"},
        {"desc": "撰写报告大纲", "agent": "writer"},
        {"desc": "填充详细内容", "agent": "writer"},
        {"desc": "审校和修改", "agent": "reviewer"},
    ],
    "code_development": [
        {"desc": "需求分析", "agent": "analyst"},
        {"desc": "架构设计", "agent": "architect"},
        {"desc": "编写代码", "agent": "coder"},
        {"desc": "编写测试", "agent": "tester"},
        {"desc": "代码审查", "agent": "reviewer"},
    ],
}

def decompose_by_template(task_type: str, task_detail: str):
    template = TASK_TEMPLATES.get(task_type)
    if not template:
        return None
    # 用任务细节填充模板
    return [{**t, "detail": task_detail} for t in template]

3. 目标分解(Goal Decomposition)

从最终目标反向推导出需要的步骤。

python
class GoalDecomposer:
    def decompose(self, goal: str) -> list:
        prompt = f"""
        最终目标:{goal}

        请从最终目标倒推,列出达成目标需要的所有步骤。
        每个步骤应该是一个可以独立验证的里程碑。

        输出格式:
        - 步骤1: xxx(验证标准:xxx)
        - 步骤2: xxx(验证标准:xxx)
        """
        return self.model.chat(prompt)

追问:

  • Q: 如何判断分解粒度是否合适? 经验法则:每个子任务应该能由一个 Agent 在一次 LLM 调用中完成。如果子任务还需要多轮对话才能完成,说明粒度还不够细;如果子任务只是一句话就能完成的微操作,说明过度分解了。

  • Q: 分解出错怎么办? 加入"验证-调整"循环:分解后先快速检查(让 LLM 评估分解是否合理),发现明显问题就重新分解。执行中发现问题也可以回到规划阶段重新分解。


Q: 什么是 DAG 依赖管理?子任务之间有依赖怎么办?

难度:⭐⭐⭐ 高级

答案:

DAG(Directed Acyclic Graph,有向无环图)是描述子任务依赖关系的标准方式。节点代表子任务,有向边代表依赖关系——"A → B"表示 B 依赖 A,必须等 A 完成才能开始 B。

类比理解: 做饭——你必须先洗菜、切菜,然后才能炒菜。"炒菜"依赖"洗菜"和"切菜"。但"洗菜"和"切菜"可以同时进行(并行)。

python
from collections import defaultdict, deque
import asyncio

class DAGScheduler:
    """DAG 任务调度器"""

    def __init__(self):
        self.tasks = {}           # task_id -> task_info
        self.dependencies = defaultdict(set)  # task_id -> set of dependency task_ids
        self.dependents = defaultdict(set)    # task_id -> set of tasks that depend on this

    def add_task(self, task_id: str, handler, **kwargs):
        self.tasks[task_id] = {"handler": handler, "kwargs": kwargs}

    def add_dependency(self, task_id: str, depends_on: str):
        """task_id 依赖 depends_on"""
        self.dependencies[task_id].add(depends_on)
        self.dependents[depends_on].add(task_id)

    def get_ready_tasks(self, completed: set) -> list:
        """获取所有依赖已满足、可以立即执行的任务"""
        ready = []
        for task_id, deps in self.dependencies.items():
            if task_id not in completed and deps.issubset(completed):
                ready.append(task_id)
        return ready

    async def execute(self) -> dict:
        """执行所有任务,自动处理并行"""
        completed = set()
        results = {}
        running = {}

        while len(completed) < len(self.tasks):
            # 找出所有可以执行的任务
            ready = self.get_ready_tasks(completed)

            if not ready and not running:
                raise RuntimeError("死锁!可能存在循环依赖")

            # 并行执行所有就绪任务
            for task_id in ready:
                if task_id not in running:
                    task = self.tasks[task_id]
                    # 获取依赖任务的结果作为输入
                    deps_results = {
                        dep: results[dep] for dep in self.dependencies[task_id]
                    }
                    running[task_id] = asyncio.create_task(
                        task["handler"](deps_results, **task["kwargs"])
                    )

            # 等待至少一个任务完成
            if running:
                done, _ = await asyncio.wait(
                    running.values(),
                    return_when=asyncio.FIRST_COMPLETED
                )
                for task_id, task in list(running.items()):
                    if task in done:
                        results[task_id] = await task
                        completed.add(task_id)
                        del running[task_id]

        return results


# 使用示例
scheduler = DAGScheduler()

async def search_task(deps):
    return "搜索结果..."

async def analysis_task(deps):
    search_result = deps["search"]
    return f"分析: {search_result}"

async def writing_task(deps):
    return f"报告: {deps['analysis']}"

async def review_task(deps):
    return f"审校: {deps['writing']}"

scheduler.add_task("search", search_task)
scheduler.add_task("analysis", analysis_task)
scheduler.add_task("writing", writing_task)
scheduler.add_task("review", review_task)

# 依赖关系:search → analysis → writing → review
# search 可以立即执行,analysis 等 search,writing 等 analysis...
scheduler.add_dependency("analysis", "search")
scheduler.add_dependency("writing", "analysis")
scheduler.add_dependency("review", "writing")

results = asyncio.run(scheduler.execute())

DAG 可视化:

    [搜索] ───→ [分析] ───→ [写报告] ───→ [审校]

    [爬数据] ───┘   (分析同时依赖搜索和爬数据)

追问:

  • Q: 如何检测循环依赖? 使用拓扑排序(Topological Sort)。如果排序后的节点数不等于总节点数,说明存在循环依赖。也可以在添加依赖时实时检测:尝试从新依赖的目标节点做 DFS,如果能回到当前节点则说明形成了环。

  • Q: 动态生成的 DAG 怎么处理? 有些任务的子任务是在运行时才知道的(比如搜索结果决定后续分析方向)。这时需要"增量式 DAG"——先执行已知任务,根据结果动态添加新节点和依赖。


Q: 如何处理子任务失败?

难度:⭐⭐ 进阶

答案:

子任务失败是多 Agent 系统中最常见的问题。必须设计完善的容错机制。

1. 重试策略(Retry)

python
import asyncio
from functools import wraps

def retry(max_attempts=3, backoff=2.0, exceptions=(Exception,)):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    wait_time = backoff ** attempt
                    print(f"[重试] {func.__name__}{attempt+1}次失败: {e},"
                          f"{wait_time}秒后重试...")
                    await asyncio.sleep(wait_time)
            raise last_exception
        return wrapper
    return decorator

class FaultTolerantAgent:
    @retry(max_attempts=3, backoff=2.0)
    async def run(self, task: str) -> str:
        result = await self._execute(task)
        if not result or len(result) < 10:
            raise ValueError("结果质量不合格,需要重试")
        return result

2. 降级策略(Fallback)

python
class FallbackAgent:
    def __init__(self, primary, fallbacks: list):
        self.primary = primary
        self.fallbacks = fallbacks

    async def run(self, task: str) -> str:
        # 尝试主 Agent
        try:
            return await self.primary.run(task)
        except Exception as e:
            print(f"主 Agent 失败: {e}")

        # 依次尝试备选 Agent
        for fallback in self.fallbacks:
            try:
                return await fallback.run(task)
            except Exception as e:
                print(f"备选 Agent 失败: {e}")

        # 所有都失败,返回默认结果
        return f"所有 Agent 都未能完成任务: {task}"

3. 补偿策略(Compensation / Saga Pattern)

当子任务失败时,撤销已完成子任务的副作用。

python
class SagaOrchestrator:
    """Saga 模式——失败时回滚已完成的步骤"""

    def __init__(self):
        self.steps = []  # [(action, compensation)]

    def add_step(self, action, compensation):
        self.steps.append((action, compensation))

    async def execute(self, context: dict):
        completed = []
        try:
            for action, compensation in self.steps:
                result = await action(context)
                context.update(result)
                completed.append((action, compensation, result))
        except Exception as e:
            print(f"步骤失败: {e},开始回滚...")
            # 逆序回滚已完成的步骤
            for action, compensation, result in reversed(completed):
                try:
                    await compensation(context, result)
                    print(f"回滚成功: {compensation.__name__}")
                except Exception as rollback_error:
                    print(f"回滚失败: {rollback_error},需要人工介入")
            raise

追问:

  • Q: 子任务失败后,应该重试还是跳过? 取决于任务的关键程度。核心任务(如"数据获取")必须重试或降级;非核心任务(如"添加装饰性图表")可以跳过。建议为每个子任务标注"关键程度"。

五、实际框架

Q: CrewAI 的核心概念?

难度:⭐⭐ 进阶

答案:

CrewAI 是一个流行的多 Agent 框架,其核心概念可以用"拍电影"来类比:

  • Agent = 演员(有角色、目标、背景故事)
  • Task = 剧本中的场景(有描述、预期输出、所属 Agent)
  • Tool = 道具(Agent 可以使用的工具)
  • Crew = 剧组(把所有元素组合在一起)
python
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, FileWriterTool

# 定义工具
search_tool = SerperDevTool()
file_tool = FileWriterTool()

# 定义 Agent
researcher = Agent(
    role="高级研究分析师",
    goal="搜集并分析关于{topic}的最新信息",
    backstory="你是一位有10年经验的研究分析师,擅长从海量信息中提取关键洞察。",
    tools=[search_tool],
    verbose=True,
    llm="gpt-4o"  # 可以为不同 Agent 指定不同模型
)

writer = Agent(
    role="技术内容专家",
    goal="将研究结果转化为清晰、易懂的技术报告",
    backstory="你是一位资深技术写手,擅长将复杂概念用通俗语言解释。",
    tools=[file_tool],
    verbose=True,
    llm="gpt-4o"
)

reviewer = Agent(
    role="质量审核专家",
    goal="确保报告的准确性、完整性和可读性",
    backstory="你是一位严谨的编辑,对细节有极高的要求。",
    verbose=True,
    llm="gpt-4o"
)

# 定义 Task
research_task = Task(
    description="搜集关于{topic}的最新研究论文、行业报告和新闻。",
    expected_output="包含关键发现、数据来源的结构化研究报告。",
    agent=researcher
)

writing_task = Task(
    description="基于研究结果,撰写一份5000字的技术报告。",
    expected_output="格式规范、逻辑清晰的技术报告。",
    agent=writer,
    context=[research_task]  # 依赖研究任务的结果
)

review_task = Task(
    description="审核报告的准确性,修正错误,优化表述。",
    expected_output="经过审核和修正的最终报告。",
    agent=reviewer,
    context=[writing_task]
)

# 组建 Crew
crew = Crew(
    agents=[researcher, writer, reviewer],
    tasks=[research_task, writing_task, review_task],
    process=Process.sequential,  # 顺序执行
    verbose=True
)

# 执行
result = crew.kickoff(inputs={"topic": "大语言模型在医疗领域的应用"})

CrewAI 的执行模式:

  • Process.sequential:任务按顺序依次执行
  • Process.hierarchical:管理者 Agent 自动分配任务

追问:

  • Q: CrewAI 的 Task 之间的 context 参数有什么作用?context 指定了当前 Task 依赖哪些前置 Task 的结果。被依赖 Task 的输出会自动注入到当前 Task 的输入中。CrewAI 会在后台做信息传递,不需要手动管理。

  • Q: CrewAI 的局限性是什么? ① 主要支持 Sequential 和 Hierarchical 模式,对复杂的 DAG 依赖支持有限;② Agent 间通信主要通过 Task 结果传递,灵活性不足;③ 调试困难,尤其是 Hierarchical 模式下;④ Token 消耗较高,因为每个 Task 都会带上完整上下文。


Q: AutoGen 的对话模式?

难度:⭐⭐ 进阶

答案:

AutoGen(微软开源)的核心理念是"Agent 之间的对话"。它把多 Agent 协作建模为对话——就像一群人在群聊中讨论问题。

核心组件:

  • ConversableAgent:可以对话的 Agent 基类
  • AssistantAgent:使用 LLM 的 Agent
  • UserProxyAgent:代表用户的 Agent(可以执行代码、请求人工输入)
python
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager

# 创建 Agent
analyst = AssistantAgent(
    name="数据分析师",
    system_message="你是一位专业的数据分析师。你擅长数据分析和可视化。",
    llm_config={"model": "gpt-4o"}
)

coder = AssistantAgent(
    name="Python专家",
    system_message="你是Python编程专家。你负责编写数据分析代码。只写代码,用 ```python 包裹。",
    llm_config={"model": "gpt-4o"}
)

reviewer = AssistantAgent(
    name="代码审查员",
    system_message="你是代码审查专家。审查代码的正确性和效率,提出改进建议。",
    llm_config={"model": "gpt-4o"}
)

# 用户代理(可以执行代码)
user_proxy = UserProxyAgent(
    name="用户",
    human_input_mode="NEVER",  # 不需要人工输入
    code_execution_config={
        "work_dir": "coding",
        "use_docker": True  # 在 Docker 中安全执行代码
    }
)

# 双人对话模式
user_proxy.initiate_chat(
    analyst,
    message="分析 sales.csv 的销售趋势,生成可视化图表。"
)

# 群聊模式(多 Agent 对话)
group_chat = GroupChat(
    agents=[user_proxy, analyst, coder, reviewer],
    messages=[],
    max_round=10,  # 最多10轮对话
    speaker_selection_method="auto"  # 自动选择下一个发言者
)

manager = GroupChatManager(groupchat=group_chat)
user_proxy.initiate_chat(
    manager,
    message="分析 sales.csv 的销售数据,用Python实现,生成图表和分析报告。"
)

AutoGen 的对话模式:

模式1:双人对话(Two-Agent Chat)
User ←→ Agent

模式2:顺序对话(Sequential Chat)
User → Agent A → Agent B → Agent C

模式3:群聊(Group Chat)
User

Agent A ←→ Agent B
 ↕           ↕
Agent C ←→ Agent D
(由 GroupChatManager 管理发言顺序)

追问:

  • Q: AutoGen 的 code_execution_config 有什么安全风险? AutoGen 可以让 Agent 执行生成的代码,这是双刃剑——强大但危险。建议:① 一定要用 Docker 隔离;② 限制可访问的文件和网络;③ 设置超时和资源限制。在生产环境中,建议禁用代码执行或使用沙箱服务。

  • Q: GroupChat 的 speaker_selection_method 有哪些选项?auto(LLM 自动选择)、round_robin(轮流发言)、random(随机选择)、自定义函数。auto 最智能但最贵,round_robin 最可预测但最死板。


Q: LangGraph 的状态机设计?

难度:⭐⭐⭐ 高级

答案:

LangGraph 是 LangChain 团队推出的框架,它把多 Agent 系统建模为"状态图"(State Graph)——节点是处理步骤,边是状态转换条件。这是最灵活的框架,但学习曲线也最陡。

类比理解: 流程图/状态机——每个方框是一个处理步骤,箭头上的条件决定下一步去哪里。可以循环、分支、并行。

python
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from typing import Literal, TypedDict
import operator

# 1. 定义状态
class AgentState(TypedDict):
    messages: list                    # 对话历史
    current_step: str                 # 当前步骤
    research_results: str             # 研究结果
    code_output: str                  # 代码输出
    review_feedback: str              # 审查反馈
    iteration: int                    # 迭代次数

# 2. 定义节点(处理函数)
def research_node(state: AgentState) -> AgentState:
    """研究节点"""
    messages = state["messages"]
    # 调用研究 Agent
    response = research_llm.invoke(messages)
    return {
        "research_results": response.content,
        "messages": [("assistant", f"[研究完成] {response.content}")]
    }

def code_node(state: AgentState) -> AgentState:
    """编码节点"""
    prompt = f"基于以下研究结果写代码:\n{state['research_results']}"
    response = code_llm.invoke([("user", prompt)])
    return {
        "code_output": response.content,
        "messages": [("assistant", f"[代码完成] {response.content}")]
    }

def review_node(state: AgentState) -> AgentState:
    """审查节点"""
    prompt = f"审查以下代码:\n{state['code_output']}"
    response = review_llm.invoke([("user", prompt)])
    return {
        "review_feedback": response.content,
        "iteration": state.get("iteration", 0) + 1,
        "messages": [("assistant", f"[审查意见] {response.content}")]
    }

# 3. 定义路由条件
def should_continue(state: AgentState) -> Literal["code", "end"]:
    """根据审查结果决定是否需要重写代码"""
    feedback = state.get("review_feedback", "")
    if "通过" in feedback or state.get("iteration", 0) >= 3:
        return "end"
    return "code"  # 不通过,回到编码节点

# 4. 构建状态图
graph = StateGraph(AgentState)

# 添加节点
graph.add_node("research", research_node)
graph.add_node("code", code_node)
graph.add_node("review", review_node)

# 添加边
graph.add_edge(START, "research")          # 开始 → 研究
graph.add_edge("research", "code")         # 研究 → 编码
graph.add_edge("code", "review")           # 编码 → 审查

# 条件边:审查后根据结果决定下一步
graph.add_conditional_edges(
    "review",
    should_continue,
    {"code": "code", "end": END}  # 不通过回编码,通过则结束
)

# 5. 编译并运行
app = graph.compile(checkpointer=MemorySaver())
result = app.invoke(
    {"messages": [("user", "分析sales.csv数据并生成可视化")]}
)

LangGraph 的核心优势——循环和条件分支:

python
# 循环模式:Agent 反复改进直到满意
# research → code → review → (不通过) → code → review → (通过) → END

# 分支模式:根据情况走不同路径
def route_by_complexity(state) -> str:
    if state["complexity"] == "simple":
        return "simple_agent"
    elif state["complexity"] == "medium":
        return "medium_agent"
    else:
        return "expert_agent"

graph.add_conditional_edges("classifier", route_by_complexity, {
    "simple_agent": simple_node,
    "medium_agent": medium_node,
    "expert_agent": expert_node,
})

追问:

  • Q: LangGraph 和 CrewAI/AutoGen 的核心区别是什么? LangGraph 底层是一个有限状态机,给了你最大的控制力——你可以精确定义每个状态转换、循环、分支。CrewAI/AutoGen 更高层抽象,开发速度快但灵活性低。如果你的任务流程简单,用 CrewAI;如果需要复杂的流程控制(如重试、分支、人工审核节点),用 LangGraph。

  • Q: LangGraph 的 Checkpointer 有什么用? Checkpointer 可以在每个节点执行后保存状态快照,实现"断点续传"。当系统崩溃或需要人工审核时,可以从最近的检查点恢复,而不是从头开始。这在长时间运行的多 Agent 任务中非常重要。


Q: 这些框架的优缺点对比?

难度:⭐⭐ 进阶

答案:

维度CrewAIAutoGenLangGraph
学习曲线⭐ 低⭐⭐ 中⭐⭐⭐ 高
灵活性⭐⭐ 中⭐⭐ 中⭐⭐⭐ 高
生产就绪度⭐⭐ 中⭐⭐ 中⭐⭐⭐ 高
调试工具⭐ 弱⭐⭐ 中⭐⭐⭐ 强
社区生态⭐⭐ 活跃⭐⭐⭐ 很活跃⭐⭐⭐ 很活跃
类型安全⭐ 弱⭐ 弱⭐⭐⭐ 强(TypedDict)
流式输出⭐⭐ 中⭐⭐ 中⭐⭐⭐ 强
人工介入⭐ 弱⭐⭐⭐ 强⭐⭐⭐ 强
代码执行❌ 不内置✅ 内置❌ 需集成
状态管理⭐ 简单⭐⭐ 中⭐⭐⭐ 强(持久化)
最佳场景快速原型对话式协作复杂生产系统

如何选择框架?

python
def choose_framework(requirements):
    """框架选型决策"""
    if requirements.get("prototype_speed") == "high":
        return "CrewAI"  # 快速原型,API 最简洁

    if requirements.get("need_code_execution"):
        return "AutoGen"  # 需要代码执行能力

    if requirements.get("complex_flow") or requirements.get("production"):
        return "LangGraph"  # 复杂流程控制,生产环境

    if requirements.get("conversational"):
        return "AutoGen"  # 对话式多 Agent

    return "LangGraph"  # 默认推荐

追问:

  • Q: 不用框架,自己手写多 Agent 系统可以吗? 当然可以。对于简单场景(2-3 个 Agent、流程固定),手写反而更轻量、更可控。只有在系统复杂到一定程度(5+ Agent、动态流程、需要持久化)时,框架才真正有价值。建议先手写理解原理,再用框架提升效率。

  • Q: 这些框架支持异步吗? LangGraph 原生支持 async;CrewAI 支持异步执行;AutoGen 的异步支持相对较弱。如果你的场景需要高并发(如多个独立的多 Agent 系统同时运行),优先选择 LangGraph。


六、实际开发

Q: 多 Agent 系统的调试难点?怎么解决?

难度:⭐⭐ 进阶

答案:

多 Agent 系统的调试是公认的痛点,主要难点:

难点 1:执行路径不确定

同一个输入,不同次运行可能走不同的 Agent 路径。这让 Bug 难以复现。

python
# 解决方案:固定随机种子 + 确定性路由
class DebuggableSystem:
    def __init__(self):
        self.trace_log = []  # 执行追踪日志

    def log(self, agent_name, action, input_data, output_data):
        self.trace_log.append({
            "timestamp": time.time(),
            "agent": agent_name,
            "action": action,
            "input": input_data,
            "output": output_data,
            "trace_id": self.current_trace_id
        })

    def run(self, task):
        self.current_trace_id = str(uuid.uuid4())
        self.trace_log = []
        result = self._execute(task)

        # 保存完整的执行追踪
        self.save_trace(self.trace_log)
        return result

难点 2:错误传播

一个 Agent 的小错误可能在后续 Agent 中被放大,但你很难找到根因。

python
# 解决方案:在每个 Agent 的输入输出处设置检查点
class AgentCheckpoint:
    def __init__(self, agent):
        self.agent = agent

    def run(self, input_data):
        # 执行前检查
        self.validate_input(input_data)

        # 执行
        output = self.agent.run(input_data)

        # 执行后检查
        self.validate_output(output)

        return output

    def validate_input(self, data):
        assert data is not None, f"[{self.agent.name}] 输入为空"
        assert len(str(data)) > 0, f"[{self.agent.name}] 输入为空字符串"

    def validate_output(self, data):
        assert data is not None, f"[{self.agent.name}] 输出为空"
        # 可以添加更多业务逻辑检查

难点 3:Token 消耗追踪

不清楚哪个 Agent 消耗了多少 Token,成本难以控制。

python
# 解决方案:Token 使用量追踪
class TokenTracker:
    def __init__(self):
        self.usage = defaultdict(lambda: {"input": 0, "output": 0, "cost": 0})

    def record(self, agent_name: str, input_tokens: int, output_tokens: int):
        self.usage[agent_name]["input"] += input_tokens
        self.usage[agent_name]["output"] += output_tokens
        cost = (input_tokens * 0.005 + output_tokens * 0.015) / 1000
        self.usage[agent_name]["cost"] += cost

    def report(self):
        total = sum(u["cost"] for u in self.usage.values())
        print(f"\n{'='*50}")
        print(f"Token 使用报告 (总成本: ${total:.4f})")
        print(f"{'='*50}")
        for name, usage in sorted(self.usage.items(), key=lambda x: -x[1]["cost"]):
            print(f"  {name}: 输入={usage['input']}, "
                  f"输出={usage['output']}, 成本=${usage['cost']:.4f}")

实用调试工具推荐:

  1. LangSmith:LangChain 的追踪平台,可以看到每个 LLM 调用的详细输入输出
  2. Phoenix (Arize):开源的 LLM 可观测性工具
  3. 自建日志系统:结构化日志 + 可视化
python
# 使用 LangSmith 追踪
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_key"
# 之后所有 LangChain/LangGraph 的调用都会自动记录

追问:

  • Q: 如何在生产环境中监控多 Agent 系统? 关键指标:① 每个 Agent 的成功率和平均耗时;② 端到端的任务完成率;③ Token 使用量和成本;④ Agent 间通信的延迟。建议使用 Prometheus + Grafana 搭建监控看板,设置告警规则。

Q: 如何控制多 Agent 的成本?

难度:⭐⭐ 进阶

答案:

多 Agent 系统的成本是单 Agent 的 N 倍(N = Agent 数量 × 平均调用次数)。控制成本至关重要。

策略 1:分层模型策略

不是所有 Agent 都需要最强的模型。简单任务用小模型,复杂任务用大模型。

python
MODEL_TIERS = {
    "premium": "gpt-4o",           # 最强,用于 Supervisor 和关键决策
    "standard": "gpt-4o-mini",     # 中等,用于常规分析和写作
    "budget": "gpt-3.5-turbo",     # 经济,用于简单分类和格式化
}

def assign_model(agent_role: str) -> str:
    """根据 Agent 角色分配模型"""
    premium_roles = {"supervisor", "judge", "architect"}
    budget_roles = {"formatter", "classifier", "router"}

    if agent_role in premium_roles:
        return MODEL_TIERS["premium"]
    elif agent_role in budget_roles:
        return MODEL_TIERS["budget"]
    else:
        return MODEL_TIERS["standard"]

策略 2:智能路由——能不用 Agent 就不用

python
class SmartRouter:
    """简单任务直接处理,复杂任务才分发给 Agent"""

    def __init__(self, simple_model, complex_system):
        self.simple_model = simple_model
        self.complex_system = complex_system

    def run(self, task: str) -> str:
        # 先用小模型评估复杂度
        complexity = self.assess_complexity(task)

        if complexity == "simple":
            # 直接用小模型回答,省去多 Agent 开销
            return self.simple_model.chat(task)
        else:
            # 交给多 Agent 系统处理
            return self.complex_system.run(task)

    def assess_complexity(self, task: str) -> str:
        prompt = f"判断以下任务的复杂度(simple/complex):{task}"
        return self.simple_model.chat(prompt).strip()

策略 3:缓存机制

相似的子任务结果可以缓存复用。

python
import hashlib

class AgentCache:
    def __init__(self, ttl=3600):
        self.cache = {}
        self.ttl = ttl

    def get(self, agent_name: str, input_data: str):
        key = self._make_key(agent_name, input_data)
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry["time"] < self.ttl:
                return entry["result"]
            del self.cache[key]
        return None

    def set(self, agent_name: str, input_data: str, result: str):
        key = self._make_key(agent_name, input_data)
        self.cache[key] = {"result": result, "time": time.time()}

    def _make_key(self, agent_name, input_data):
        return hashlib.md5(f"{agent_name}:{input_data}".encode()).hexdigest()

策略 4:提前终止

如果中间结果已经足够好,不需要继续执行后续 Agent。

python
class EarlyStoppingSystem:
    def run(self, pipeline, input_data):
        result = input_data
        for agent in pipeline:
            result = agent.run(result)

            # 检查是否已经足够好
            quality = self.assess_quality(result)
            if quality > 0.9:  # 质量已经很高
                print(f"提前终止于 {agent.name},质量已达 {quality:.2f}")
                break

        return result

追问:

  • Q: 实际项目中,多 Agent 的成本大概是单 Agent 的多少倍? 经验数据:简单的 3-Agent 系统大约是单 Agent 的 5-10 倍(因为有通信和协调的额外开销)。通过上述优化手段,可以降到 2-4 倍。关键是要做好成本监控,知道钱花在哪里。

Q: 多 Agent 的延迟优化?

难度:⭐⭐ 进阶

答案:

多 Agent 系统的延迟 = 各 Agent 处理时间 + Agent 间通信时间 + 协调开销。优化延迟的核心思路:减少串行等待,增加并行执行。

策略 1:最大化并行

python
import asyncio

class OptimizedPipeline:
    """优化后的执行流程——无依赖的任务并行执行"""

    async def run(self, dag: DAGScheduler):
        completed = set()
        results = {}

        while len(completed) < len(dag.tasks):
            ready = dag.get_ready_tasks(completed)
            if not ready:
                break

            # 并行执行所有就绪任务
            tasks = []
            for task_id in ready:
                task = dag.tasks[task_id]
                deps_results = {d: results[d] for d in dag.dependencies[task_id]}
                tasks.append(self.execute_task(task_id, task, deps_results))

            task_results = await asyncio.gather(*tasks)
            for task_id, result in zip(ready, task_results):
                results[task_id] = result
                completed.add(task_id)

        return results

策略 2:流式处理——边生成边传递

python
class StreamingAgent:
    """流式输出 Agent——不等完全生成就传递给下游"""

    async def run_streaming(self, input_data: str):
        """流式生成,每生成一段就 yield"""
        response = await self.model.chat_stream(input_data)
        buffer = ""
        async for chunk in response:
            buffer += chunk
            # 每积累一定量就传递给下游
            if len(buffer) > 200:
                yield buffer
                buffer = ""
        if buffer:
            yield buffer

策略 3:预测性执行(Speculative Execution)

预测可能需要的下一步,提前开始执行。

python
class SpeculativeSystem:
    async def run(self, task):
        # 在等待当前任务结果的同时,预测并预执行下一步
        current_task = asyncio.create_task(self.execute_current(task))

        # 预测可能的下一步
        predicted_next = self.predict_next_step(task)
        speculative_task = asyncio.create_task(
            self.warm_up(predicted_next)  # 只做预热,不做完整执行
        )

        current_result = await current_task
        # 如果预测正确,预热的结果可以直接用
        if self.prediction_correct(current_result, predicted_next):
            return await speculative_task
        else:
            speculative_task.cancel()
            return await self.execute_correct_step(current_result)

策略 4:减少 LLM 调用次数

python
# 不好的做法:每个 Agent 单独调用 LLM
# Agent A 调用 LLM → Agent B 调用 LLM → Agent C 调用 LLM
# 总共 3 次 LLM 调用

# 好的做法:合并可以合并的调用
class BatchedAgent:
    """一次 LLM 调用完成多个子任务"""
    def run(self, tasks: list) -> list:
        prompt = "请依次完成以下任务,用 === 分隔每个任务的结果:\n\n"
        for i, task in enumerate(tasks):
            prompt += f"任务{i+1}: {task}\n"
        prompt += "\n请开始:"

        response = self.model.chat(prompt)
        return response.split("===")

追问:

  • Q: 如何测量和定位延迟瓶颈? 在每个 Agent 的入口和出口记录时间戳,计算每个 Agent 的耗时占比。通常 80% 的延迟集中在 20% 的 Agent 上(帕累托法则)。优先优化耗时最长的那个 Agent——可能是它使用的工具太慢(如搜索引擎),需要换更快的工具或添加缓存。

七、实战难题

难题 1:Agent 死循环——两个 Agent 互相调用导致无限递归

难度:⭐⭐⭐ 高级

问题描述:

Supervisor 把任务分配给 Agent A,Agent A 觉得需要 Agent B 的帮助,把任务转给 Agent B,Agent B 又觉得应该让 Agent A 处理,结果陷入无限循环。

解决方案:

python
class CallDepthLimiter:
    """调用深度限制器——防止 Agent 间的无限递归"""

    def __init__(self, max_depth=5):
        self.max_depth = max_depth
        self.call_stack = []

    def enter(self, agent_name: str):
        self.call_stack.append(agent_name)
        if len(self.call_stack) > self.max_depth:
            raise RecursionError(
                f"Agent 调用深度超过限制({self.max_depth})!"
                f"调用栈: {' → '.join(self.call_stack)}"
            )
        # 检测直接循环
        if len(self.call_stack) >= 2 and self.call_stack[-1] == self.call_stack[-2]:
            raise RecursionError(
                f"检测到 Agent 自调用: {agent_name}"
            )
        # 检测间接循环 (A→B→A)
        if self.call_stack.count(agent_name) > 1:
            raise RecursionError(
                f"检测到循环调用: {' → '.join(self.call_stack)}"
            )

    def exit(self):
        self.call_stack.pop()


# 在 Supervisor 提示词中加入防循环指令
ANTI_LOOP_PROMPT = """
重要规则:
1. 不要把任务分配回给你收到任务的那个 Agent
2. 如果你已经尝试了某个 Agent 但没有得到满意结果,不要再次调用它
3. 如果连续3次尝试都失败,直接给出当前最好的答案
4. 记录你已经调用过的 Agent,避免重复调用
"""

难题 2:上下文窗口溢出——多轮对话后信息超出 Token 限制

难度:⭐⭐⭐ 高级

问题描述:

多个 Agent 共享对话历史,几轮交互后上下文超出模型的 Token 限制,导致截断或报错。

解决方案:

python
class ContextManager:
    """智能上下文管理器"""

    def __init__(self, max_tokens=8000, model="gpt-4o-mini"):
        self.max_tokens = max_tokens
        self.model = model  # 用于摘要的轻量模型

    def manage_context(self, messages: list, system_prompt: str) -> list:
        """管理上下文,确保不超出 Token 限制"""
        system_tokens = self.count_tokens(system_prompt)

        # 计算当前消息的 Token 数
        total_tokens = system_tokens + sum(
            self.count_tokens(m["content"]) for m in messages
        )

        if total_tokens <= self.max_tokens:
            return messages  # 没超限,直接返回

        # 超限了!分层压缩
        # 保留:系统提示 + 最近 N 条消息
        recent_count = 5
        recent_messages = messages[-recent_count:]
        old_messages = messages[:-recent_count]

        # 把旧消息压缩成摘要
        if old_messages:
            summary = self.summarize(old_messages)
            compressed = [
                {"role": "system", "content": f"[历史摘要] {summary}"}
            ] + recent_messages
        else:
            compressed = recent_messages

        return compressed

    def summarize(self, messages: list) -> str:
        """用轻量模型生成对话摘要"""
        text = "\n".join(f"[{m['role']}]: {m['content']}" for m in messages)
        prompt = f"请用100字概括以下对话的关键信息:\n{text}"
        return self.model.chat(prompt)

    def count_tokens(self, text: str) -> int:
        """估算 Token 数"""
        return len(text) // 2  # 粗略估算

难题 3:Agent 输出格式不稳定——有时返回 JSON,有时返回纯文本

难度:⭐⭐ 进阶

问题描述:

设计了一个 Supervisor-Worker 架构,要求 Worker Agent 返回 JSON 格式的结果以便 Supervisor 解析。但 LLM 输出格式不稳定,有时返回 JSON,有时返回 markdown 包裹的 JSON,有时直接返回自然语言。

解决方案:

python
import json
import re

class OutputParser:
    """健壮的输出解析器"""

    @staticmethod
    def parse_json(text: str) -> dict:
        """尝试多种方式解析 JSON"""
        # 方法1:直接解析
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass

        # 方法2:提取 ```json ... ``` 中的内容
        json_match = re.search(r'```json\s*(.*?)\s*```', text, re.DOTALL)
        if json_match:
            try:
                return json.loads(json_match.group(1))
            except json.JSONDecodeError:
                pass

        # 方法3:提取 { ... } 或 [ ... ]
        brace_match = re.search(r'(\{.*\}|\[.*\])', text, re.DOTALL)
        if brace_match:
            try:
                return json.loads(brace_match.group(1))
            except json.JSONDecodeError:
                pass

        # 方法4:让 LLM 帮忙提取
        return OutputParser.force_parse_with_llm(text)

    @staticmethod
    def force_parse_with_llm(text: str) -> dict:
        """最后手段:用 LLM 从自然语言中提取结构化信息"""
        prompt = f"""
        请从以下文本中提取信息,以JSON格式返回:
        {text}

        必须返回有效的JSON,不要包含其他内容。
        """
        response = llm.invoke(prompt)
        return json.loads(response.content)

难题 4:多 Agent 系统的一致性问题——并发写入导致数据冲突

难度:⭐⭐⭐ 高级

问题描述:

多个 Agent 并行执行时,同时修改共享状态(如共享的数据库或文件),导致数据覆盖或不一致。

解决方案:

python
import asyncio
from enum import Enum

class LockType(Enum):
    READ = "read"
    WRITE = "write"

class SharedResourceManager:
    """带读写锁的共享资源管理器"""

    def __init__(self):
        self.data = {}
        self._readers = 0
        self._write_lock = asyncio.Lock()
        self._read_lock = asyncio.Lock()

    async def read(self, key: str, agent_name: str) -> any:
        """读操作——允许多个 Agent 同时读"""
        async with self._read_lock:
            self._readers += 1
            if self._readers == 1:
                await self._write_lock.acquire()

        try:
            return self.data.get(key)
        finally:
            async with self._read_lock:
                self._readers -= 1
                if self._readers == 0:
                    self._write_lock.release()

    async def write(self, key: str, value: any, agent_name: str) -> bool:
        """写操作——独占访问"""
        async with self._write_lock:
            old_value = self.data.get(key)
            # 使用乐观锁:检查值是否被其他 Agent 修改过
            self.data[key] = {
                "value": value,
                "updated_by": agent_name,
                "version": self.data.get(key, {}).get("version", 0) + 1
            }
            return True

难题 5:多 Agent 系统的可观测性——出了问题不知道是谁的责任

难度:⭐⭐ 进阶

问题描述:

系统返回了一个错误结果,但不知道是哪个 Agent 出了问题,每个 Agent 都说"我做的是对的"。

解决方案:

python
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class AgentTrace:
    """Agent 执行追踪记录"""
    trace_id: str
    agent_name: str
    step: int
    input_summary: str
    output_summary: str
    model: str
    tokens_used: int
    latency_ms: float
    success: bool
    error: Optional[str] = None
    parent_trace_id: Optional[str] = None  # 上游 Agent 的 trace_id

class ObservabilitySystem:
    """多 Agent 可观测性系统"""

    def __init__(self):
        self.traces = []

    def wrap_agent(self, agent):
        """包装 Agent,自动记录执行追踪"""
        original_run = agent.run

        def traced_run(input_data):
            start = time.time()
            trace_id = f"{agent.name}_{int(start * 1000)}"

            try:
                result = original_run(input_data)
                latency = (time.time() - start) * 1000

                self.traces.append(AgentTrace(
                    trace_id=trace_id,
                    agent_name=agent.name,
                    step=len(self.traces),
                    input_summary=str(input_data)[:200],
                    output_summary=str(result)[:200],
                    model=agent.model_name,
                    tokens_used=agent.last_token_count,
                    latency_ms=latency,
                    success=True
                ))
                return result

            except Exception as e:
                latency = (time.time() - start) * 1000
                self.traces.append(AgentTrace(
                    trace_id=trace_id,
                    agent_name=agent.name,
                    step=len(self.traces),
                    input_summary=str(input_data)[:200],
                    output_summary="",
                    model=agent.model_name,
                    tokens_used=0,
                    latency_ms=latency,
                    success=False,
                    error=str(e)
                ))
                raise

        agent.run = traced_run
        return agent

    def get_failure_report(self) -> str:
        """生成失败分析报告"""
        failed = [t for t in self.traces if not t.success]
        if not failed:
            return "所有 Agent 执行成功 ✅"

        report = f"❌ {len(failed)} 个 Agent 执行失败:\n"
        for trace in failed:
            report += f"""
  Agent: {trace.agent_name}
  Trace ID: {trace.trace_id}
  错误: {trace.error}
  输入: {trace.input_summary}
  耗时: {trace.latency_ms:.0f}ms
  ---
"""
        return report

总结

场景推荐架构推荐框架
快速原型Sequential / ParallelCrewAI
对话式协作对等 + GroupChatAutoGen
复杂生产系统层级 + DAGLangGraph
高价值决策Debate手写 / LangGraph
大规模任务Supervisor + DelegationLangGraph

核心设计原则:

  1. 从简单开始:先用单 Agent,不行再拆多 Agent
  2. 最小化通信:Agent 之间只传必要信息
  3. 分层模型:简单任务用小模型,关键决策用大模型
  4. 完善的可观测性:追踪每一次 LLM 调用
  5. 设计容错机制:重试、降级、超时,缺一不可
  6. 控制成本:缓存、提前终止、智能路由

最后更新:2025年6月

LLM 应用 & Agent 开发面试准备