08. 多智能体系统
面向大模型应用工程师 / Agent 开发工程师的高频面试题
一、多 Agent 基础
Q: 什么是多智能体系统?为什么需要多个 Agent?
难度:⭐ 基础
答案:
多智能体系统(Multi-Agent System, MAS)是由多个自主的 Agent 组成的系统,它们通过协作、竞争或分工来完成单个 Agent 难以独立完成的复杂任务。
类比理解: 想象一家公司——CEO 不可能一个人做所有事情。需要产品经理写需求、设计师画图、程序员写代码、测试找 Bug。每个人专注自己的领域,通过协作完成一个大项目。多智能体系统就是把这种"团队协作"模式搬到 AI 系统中。
为什么需要多个 Agent?
- 能力互补:不同 Agent 可以使用不同的工具、不同的模型、不同的提示词。一个 Agent 擅长写代码,另一个擅长做数据分析,第三个擅长生成报告。
- 复杂度分解:一个超长的 System Prompt 可能超出上下文窗口,也容易让模型"走神"。拆成多个 Agent 各司其职,每个 Agent 的提示词更聚焦。
- 质量提升:通过"自我检查"或"相互审核"机制,一个 Agent 生成内容,另一个 Agent 检查质量,类似代码审查(Code Review)。
- 并行加速:独立的子任务可以并行执行,大幅缩短总耗时。
核心思想: 分而治之(Divide and Conquer),专业化分工(Specialization)。
# 单 Agent vs 多 Agent 的直观对比
# 单 Agent:一个提示词做所有事情
single_agent_prompt = """
你是一个全能助手。请分析数据、写代码、生成报告、
检查错误、翻译成英文...
# 结果:每个任务都做得"还行",但没有一个做到极致
"""
# 多 Agent:各司其职
agents = {
"data_analyst": "你是数据分析专家,专注于数据清洗和统计分析。",
"coder": "你是 Python 专家,专注于编写高效、可维护的代码。",
"reviewer": "你是代码审查专家,专注于发现 Bug 和安全隐患。",
"translator": "你是专业翻译,专注于技术文档的中英互译。",
}
# 结果:每个任务都由最合适的"专家"完成,整体质量更高追问:
Q: 多 Agent 是不是越多越好? 不是。Agent 数量增加会带来通信开销、协调复杂度和成本的指数级增长。一般建议 3-5 个 Agent 组成一个团队,超过 10 个就需要非常精心的架构设计。遵循"奥卡姆剃刀"原则——能用单 Agent 解决的,就不要用多 Agent。
Q: 多 Agent 系统和微服务架构有什么相似之处? 非常相似。微服务是把单体应用拆成多个独立服务,通过 API 通信;多 Agent 是把一个复杂任务拆成多个 Agent,通过消息传递协作。两者都面临服务发现、负载均衡、错误处理、分布式状态管理等挑战。
Q: 多 Agent 和单 Agent 的区别?什么时候用多 Agent?
难度:⭐ 基础
答案:
| 维度 | 单 Agent | 多 Agent |
|---|---|---|
| 架构复杂度 | 低 | 高 |
| 调试难度 | 低 | 高 |
| 任务范围 | 适合明确、单一的任务 | 适合复杂、多步骤的任务 |
| Token 消耗 | 较少 | 较多(Agent 间通信消耗) |
| 容错性 | 单点失败 | 可设计冗余 |
| 可扩展性 | 差(提示词膨胀) | 好(增加新 Agent) |
什么时候用多 Agent?
- 任务需要多种专业知识:比如"帮我分析竞品并生成报告"——需要搜索、分析、写作多个环节。
- 任务需要质量保证:比如"写代码并测试"——一个 Agent 写,另一个 Agent 测试。
- 任务可以并行化:比如"同时分析 5 个数据源"——5 个 Agent 并行工作。
- 任务需要迭代改进:比如"写文章并反复修改"——写作 Agent 和审稿 Agent 多轮交互。
什么时候不需要多 Agent?
- 简单的问答、翻译、摘要等单一任务
- 对延迟极度敏感的场景(多 Agent 通信会增加延迟)
- 预算有限的场景(多 Agent = 多次 LLM 调用 = 更高成本)
# 判断是否需要多 Agent 的决策树
def should_use_multi_agent(task):
"""简单决策逻辑"""
# 问题1:任务是否可以分解为独立子任务?
subtasks = decompose(task)
if len(subtasks) <= 1:
return False # 单 Agent 就够了
# 问题2:子任务是否需要不同的专业知识?
required_skills = set(get_skills(st) for st in subtasks)
if len(required_skills) == 1:
return False # 一个 Agent 就能处理所有子任务
# 问题3:对延迟是否敏感?
if task.latency_critical:
return False # 多 Agent 通信会增加延迟
# 问题4:预算是否充足?
if task.budget < estimate_multi_agent_cost(subtasks):
return False # 成本太高
return True # 适合使用多 Agent追问:
Q: 一个 Agent 配合多个工具,和多个 Agent 各配一个工具,效果一样吗? 不完全一样。单 Agent + 多工具的优势是上下文连贯、无通信开销;但当工具数量超过 10-15 个时,模型选择工具的准确率会显著下降。多 Agent 各配少量工具,每个 Agent 的工具选择更精准,但需要协调开销。
Q: 如何在性能和成本之间权衡? 可以先用单 Agent 做 MVP,当发现质量不达标或提示词过长时再拆分成多 Agent。这是一种"渐进式复杂化"的策略。
Q: 多 Agent 系统的经典架构有哪些?
难度:⭐⭐ 进阶
答案:
多 Agent 系统有三种经典架构:
1. 层级架构(Hierarchical)
像公司的组织架构——有上级和下级。顶层是"管理者"Agent,负责分解任务和分配工作;底层是"执行者"Agent,负责具体执行。
[Supervisor Agent]
/ | \
[Agent A] [Agent B] [Agent C]
(搜索) (分析) (写作)2. 对等架构(Peer-to-Peer)
没有上下级关系,所有 Agent 地位平等,通过协商和对话完成任务。类似一群专家圆桌讨论。
[Agent A] ←→ [Agent B]
↕ ↕
[Agent C] ←→ [Agent D]3. 混合架构(Hybrid)
结合层级和对等的特点。比如在子团队内部用对等协作,团队之间用层级管理。
# 三种架构的代码示意
# 1. 层级架构
class HierarchicalSystem:
def __init__(self):
self.supervisor = SupervisorAgent()
self.workers = [SearchAgent(), AnalysisAgent(), WriterAgent()]
def run(self, task):
plan = self.supervisor.plan(task) # 管理者分解任务
results = []
for subtask, worker in zip(plan, self.workers):
results.append(worker.execute(subtask)) # 工人执行
return self.supervisor.synthesize(results) # 管理者汇总
# 2. 对等架构
class PeerSystem:
def __init__(self):
self.agents = [AgentA(), AgentB(), AgentC()]
def run(self, task):
context = {"task": task}
for round in range(max_rounds):
for agent in self.agents:
response = agent.respond(context)
context["messages"].append(response) # 所有人共享对话
if is_consensus(context):
break
return context["messages"][-1]
# 3. 混合架构
class HybridSystem:
def __init__(self):
# 研究团队:对等协作
self.research_team = PeerSubTeam([SearchAgent(), ScholarAgent()])
# 开发团队:对等协作
self.dev_team = PeerSubTeam([CoderAgent(), TestAgent()])
# 总监:层级管理
self.director = DirectorAgent([self.research_team, self.dev_team])
def run(self, task):
return self.director.orchestrate(task)追问:
Q: 实际项目中哪种架构最常用? 层级架构最常用,因为它结构清晰、易于调试和控制。对等架构更适合创意类任务(如辩论、头脑风暴),但调试困难。建议从层级架构开始,根据需要引入对等元素。
Q: 如何选择架构? 看任务特性:如果任务有明确的先后顺序和上下级关系,用层级;如果需要多方观点碰撞,用对等;如果既有分工又有协作需求,用混合。
二、协作模式
Q: 什么是 Sequential 模式?Agent 串行执行?
难度:⭐ 基础
答案:
Sequential(顺序执行)模式是最简单的多 Agent 协作方式——Agent A 的输出作为 Agent B 的输入,Agent B 的输出作为 Agent C 的输入,像流水线(Pipeline)一样依次执行。
类比理解: 工厂流水线——第一个工人加工零件,传给第二个工人打磨,再传给第三个工人上漆。每个工人的输出就是下一个工人的输入。
[Agent A: 搜索] → [Agent B: 分析] → [Agent C: 写报告]# Sequential 模式实现
class SequentialPipeline:
def __init__(self, agents: list):
self.agents = agents
def run(self, input_data: str) -> str:
current_input = input_data
for agent in self.agents:
current_input = agent.run(current_input)
print(f"[{agent.name}] 完成,输出: {current_input[:100]}...")
return current_input
# 使用示例
pipeline = SequentialPipeline([
SearchAgent(), # 第一步:搜索相关信息
AnalysisAgent(), # 第二步:分析搜索结果
WriterAgent(), # 第三步:生成报告
])
result = pipeline.run("分析2024年AI行业趋势")优点:
- 简单直观,易于理解和调试
- 上下文清晰传递,每个 Agent 看到前一个 Agent 的完整输出
- 适合有明确先后依赖的任务
缺点:
- 不支持并行,总耗时 = 所有 Agent 耗时之和
- 任一环节失败,整个流程中断
- 信息在传递过程中可能丢失(上下文窗口限制)
追问:
Q: Sequential 模式中如何控制传递的信息量? 可以在每个 Agent 执行后做一个"摘要压缩"步骤,把关键信息提取出来传递给下一个 Agent,避免上下文膨胀。也可以设计结构化的中间输出格式(如 JSON),只传递必要字段。
Q: 如果某个 Agent 执行时间很长怎么办? 可以设置超时机制和降级策略。比如搜索 Agent 超时后,使用缓存的历史结果或直接跳过,把"搜索未完成"的状态传给下游 Agent,让它们基于已有信息尽力生成结果。
Q: 什么是 Parallel 模式?Agent 并行执行?
难度:⭐⭐ 进阶
答案:
Parallel(并行执行)模式让多个 Agent 同时处理同一任务的不同方面,最后汇总结果。就像多个人同时从不同角度分析同一个问题,然后开会汇总。
类比理解: 医院会诊——同一个病人的血液检验、影像检查、心电图检查同时进行,最后由主治医生综合所有检查结果做出诊断。
import asyncio
# Parallel 模式实现
class ParallelSystem:
def __init__(self, agents: list):
self.agents = agents
async def run(self, input_data: str) -> str:
# 所有 Agent 并行执行
tasks = [agent.arun(input_data) for agent in self.agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 汇总结果
valid_results = [r for r in results if not isinstance(r, Exception)]
return self.merge(valid_results)
def merge(self, results: list) -> str:
# 可以用 LLM 来汇总,也可以简单拼接
merged = "\n\n---\n\n".join(results)
return merged
# 使用示例:同时从多个角度分析
system = ParallelSystem([
TechAnalystAgent(), # 技术角度分析
MarketAnalystAgent(), # 市场角度分析
RiskAnalystAgent(), # 风险角度分析
])
result = asyncio.run(system.run("评估GPT-5的技术影响"))更实用的实现——用 asyncio + 超时控制:
import asyncio
from typing import Optional
class RobustParallelSystem:
def __init__(self, agents: list, timeout: float = 30.0):
self.agents = agents
self.timeout = timeout
async def run(self, input_data: str) -> dict:
tasks = {}
for agent in self.agents:
task = asyncio.create_task(agent.arun(input_data))
tasks[agent.name] = task
results = {}
done, pending = await asyncio.wait(
tasks.values(),
timeout=self.timeout,
return_when=asyncio.ALL_COMPLETED
)
for name, task in tasks.items():
if task in done:
try:
results[name] = task.result()
except Exception as e:
results[name] = f"Error: {e}"
else:
task.cancel()
results[name] = "Timeout: 未在规定时间内完成"
return results追问:
Q: 并行 Agent 处理同一个输入,结果冲突怎么办? 常见策略有三种:① 投票表决(多数决);② 加权平均(可信度高的 Agent 权重大);③ 交给仲裁 Agent 做最终决定。辩论模式(Debate)就是解决冲突的一种方式。
Q: 并行模式的 Agent 之间需要通信吗? 基本的 Parallel 模式不需要——每个 Agent 独立工作,最后汇总。但有些高级变体允许 Agent 在执行过程中交换中间结果(类似分布式计算中的 All-Reduce),称为"协作并行"。
Q: 什么是 Delegation 模式?任务分解与分配?
难度:⭐⭐ 进阶
答案:
Delegation(委托)模式是指一个"规划者"Agent 把复杂任务分解成多个子任务,然后分配给不同的"执行者"Agent 去完成。核心是"谁能做什么"的匹配。
类比理解: 项目经理接到客户需求后,把需求拆成具体的工作项(User Story),然后根据团队成员的技能分配给不同的人去做。
class DelegationSystem:
def __init__(self):
self.planner = PlannerAgent() # 规划者
self.agents = {
"search": SearchAgent(),
"code": CoderAgent(),
"analysis": AnalysisAgent(),
"writing": WriterAgent(),
}
def run(self, task: str) -> str:
# 第一步:规划者分解任务
subtasks = self.planner.decompose(task)
# 输出示例:
# [
# {"id": 1, "desc": "搜索最新AI论文", "assignee": "search", "deps": []},
# {"id": 2, "desc": "分析论文技术细节", "assignee": "analysis", "deps": [1]},
# {"id": 3, "desc": "写总结报告", "assignee": "writing", "deps": [2]},
# ]
# 第二步:按依赖关系执行
results = {}
for subtask in subtasks:
# 等待依赖任务完成
deps_result = [results[dep] for dep in subtask["deps"]]
context = self.build_context(subtask, deps_result)
# 分配给对应 Agent
agent = self.agents[subtask["assignee"]]
results[subtask["id"]] = agent.run(context)
# 第三步:汇总最终结果
return self.planner.synthesize(results)
def build_context(self, subtask, deps_result):
context = f"任务: {subtask['desc']}\n"
if deps_result:
context += "前序结果:\n" + "\n".join(deps_result)
return context关键设计点:
- 任务分解提示词设计:要明确告诉规划者有哪些可用的 Agent 及其能力。
- 任务分配策略:基于 Agent 能力匹配,或让规划者自己决定。
- 结果汇总:规划者或专门的汇总 Agent 把所有子结果整合。
# Planner 的提示词设计
PLANNER_PROMPT = """
你是一个任务规划专家。你有以下可用的 Agent:
- search: 擅长搜索互联网信息、查找资料
- code: 擅长编写和调试 Python 代码
- analysis: 擅长数据分析、统计和可视化
- writing: 擅长撰写报告、文章
请把用户的任务分解为子任务,以 JSON 格式输出:
[
{"id": 1, "desc": "子任务描述", "assignee": "agent名", "deps": [依赖的任务id]}
]
注意:
- 每个子任务应该足够小,单个 Agent 可以独立完成
- 明确标注任务之间的依赖关系
- 尽量让没有依赖的任务可以并行执行
"""追问:
Q: 如果规划者分解的任务不合理怎么办? 可以加入"反思"机制:执行者完成后,让规划者检查结果是否符合预期。如果不满意,可以重新分解或调整。也可以加入"人工审核"步骤,让用户确认计划后再执行。
Q: Delegation 和 Sequential + Parallel 的区别? Delegation 是一种更高级的模式,它动态地决定任务分解和执行顺序,可以自动实现 Sequential 和 Parallel 的混合。而 Sequential 和 Parallel 是静态定义的执行模式。
Q: 什么是 Debate 模式?Agent 之间辩论决策?
难度:⭐⭐ 进阶
答案:
Debate(辩论)模式让多个 Agent 就同一个问题发表不同观点,通过多轮辩论达成共识或由裁判做出最终裁决。这种方法特别适合需要多角度思考的决策问题。
类比理解: 法庭审判——控方律师和辩方律师各自陈述观点、反驳对方,最后由法官(裁判)做出判决。通过"对抗"来逼近真相。
Round 1:
[Agent A]: 我认为应该用微服务架构,因为...
[Agent B]: 我反对,微服务会增加复杂度...
Round 2:
[Agent A]: 你说的复杂度可以通过容器化解决...
[Agent B]: 但团队规模小,维护成本太高...
Round 3:
[Judge Agent]: 综合双方观点,考虑到团队规模...class DebateSystem:
def __init__(self, debaters: list, judge: "JudgeAgent", max_rounds: int = 3):
self.debaters = debaters # 辩论者列表
self.judge = judge # 裁判
self.max_rounds = max_rounds
def run(self, question: str) -> str:
debate_history = []
for round_num in range(self.max_rounds):
round_responses = []
for debater in self.debaters:
context = self._build_context(question, debate_history, round_num)
response = debater.debate(context)
round_responses.append({
"agent": debater.name,
"round": round_num + 1,
"argument": response
})
debate_history.append(round_responses)
# 裁判做出最终裁决
verdict = self.judge.judge(question, debate_history)
return verdict
def _build_context(self, question, history, current_round):
context = f"问题: {question}\n\n"
for round_num, round_data in enumerate(history):
context += f"--- 第{round_num + 1}轮 ---\n"
for item in round_data:
context += f"[{item['agent']}]: {item['argument']}\n"
if current_round > 0:
context += "\n请针对前面的观点进行反驳或补充,提出你的论点。"
else:
context += "\n请提出你的初始论点。"
return contextDebate 模式的变体:
- 双人辩论:两个 Agent 正反方对抗
- 多人圆桌:多个 Agent 各抒己见
- 自我辩论:同一个 Agent 扮演正反两个角色(Self-Play)
- 陪审团模式:多个"陪审员" Agent 投票表决
# Self-Debate:一个 Agent 自己跟自己辩论
class SelfDebateAgent:
def __init__(self, model):
self.model = model
def run(self, question: str) -> str:
# 正方
pro = self.model.chat(f"请支持以下观点:{question}")
# 反方
con = self.model.chat(f"请反对以下观点:{question}\n对方的论点:{pro}")
# 最终判断
verdict = self.model.chat(f"""
问题:{question}
正方论点:{pro}
反方论点:{con}
请综合双方观点,给出你的最终判断和理由。
""")
return verdict追问:
Q: Debate 模式会不会导致"废话连篇"? 确实有这个风险。解决方案:① 限制轮数(通常 2-3 轮就够了);② 要求每轮必须提出新论点,不能重复;③ 裁判设置质量标准,不达标的论点直接忽略。
Q: Debate 模式的成本如何? 成本较高——每个 Agent 每轮都是一次 LLM 背调。3 个 Agent 辩论 3 轮 = 9 次 LLM 调用 + 1 次裁判调用 = 10 次调用。适合高价值决策(如架构选型、投资建议),不适合日常任务。
Q: 什么是 Supervisor 模式?一个 Agent 管理其他 Agent?
难度:⭐⭐ 进阶
答案:
Supervisor(管理者)模式是最常用的多 Agent 架构。一个"管理者" Agent 负责理解用户意图、选择合适的"工作" Agent、传递任务、汇总结果。它本质上是一个"智能路由器"。
类比理解: 餐厅经理——顾客点菜后,经理根据菜品类型分配给不同的厨师(中餐厨师、西餐厨师、甜点师),厨师做好后由经理汇总上菜。
class SupervisorSystem:
def __init__(self, workers: dict, model):
self.workers = workers # {"name": Agent}
self.model = model
self.tools = self._build_routing_tools()
def _build_routing_tools(self):
"""把每个 worker 封装成 supervisor 可调用的工具"""
tools = []
for name, worker in self.workers.items():
tools.append({
"name": f"delegate_to_{name}",
"description": worker.description,
"parameters": {"task": "string"}
})
return tools
def run(self, user_input: str) -> str:
messages = [
{"role": "system", "content": self._supervisor_prompt()},
{"role": "user", "content": user_input}
]
while True:
# Supervisor 决定下一步动作
response = self.model.chat(messages, tools=self.tools)
if response.tool_calls:
# 调用对应的 worker
for call in response.tool_calls:
worker = self.workers[call.function.name.replace("delegate_to_", "")]
result = worker.run(call.function.arguments["task"])
messages.append({"role": "tool", "content": result})
else:
# Supervisor 给出最终回答
return response.content
def _supervisor_prompt(self):
worker_desc = "\n".join(
f"- {name}: {w.description}" for name, w in self.workers.items()
)
return f"""
你是一个任务管理者。你可以将任务委派给以下专家:
{worker_desc}
规则:
1. 分析用户需求,选择最合适的专家来处理
2. 如果需要多个专家协作,依次调用
3. 汇总所有专家的结果,给出最终回答
4. 如果任务很简单,你可以直接回答,不需要委派
"""Supervisor 模式在 LangGraph 中的实现:
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import create_react_agent
# 定义各个 worker agent
research_agent = create_react_agent(model, tools=[search_tool, wiki_tool])
code_agent = create_react_agent(model, tools=[python_tool])
# Supervisor 的路由逻辑
def supervisor_node(state: MessagesState):
"""根据对话内容决定路由到哪个 agent"""
messages = state["messages"]
last_message = messages[-1].content
if "代码" in last_message or "程序" in last_message:
return "code_agent"
elif "搜索" in last_message or "查找" in last_message:
return "research_agent"
else:
return END
# 构建图
graph = StateGraph(MessagesState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("research_agent", research_agent)
graph.add_node("code_agent", code_agent)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", supervisor_node)
graph.add_edge("research_agent", "supervisor")
graph.add_edge("code_agent", "supervisor")追问:
Q: Supervisor 怎么知道该调用哪个 Agent? 两种方式:① 基于规则的路由(关键词匹配,快但不灵活);② 基于 LLM 的路由(让 Supervisor 模型自己判断,灵活但慢且有成本)。实际项目中常把两者结合——简单情况用规则,复杂情况用 LLM。
Q: Supervisor 模式的瓶颈在哪里? Supervisor Agent 本身。如果 Supervisor 的能力不够(模型不够强或提示词不够好),它可能会错误地分配任务或遗漏必要的步骤。建议 Supervisor 使用能力最强的模型。
三、通信与协调
Q: Agent 之间如何通信?有哪些消息传递机制?
难度:⭐⭐ 进阶
答案:
Agent 之间的通信方式决定了系统的灵活性和效率。主要有以下几种机制:
1. 直接消息传递(Direct Messaging)
Agent A 直接调用 Agent B,类似函数调用。最简单但耦合度高。
# 直接调用
class AgentA:
def __init__(self, agent_b):
self.agent_b = agent_b # 直接持有引用
def run(self, task):
sub_result = self.agent_b.run("请帮我查一下...") # 直接调用
return f"基于B的结果: {sub_result}"2. 消息队列(Message Queue)
Agent 通过消息队列异步通信,解耦发送方和接收方。
import asyncio
from collections import defaultdict
class MessageBus:
"""简单的消息总线"""
def __init__(self):
self.queues = defaultdict(asyncio.Queue)
async def send(self, to: str, message: dict):
await self.queues[to].put(message)
async def receive(self, agent_name: str) -> dict:
return await self.queues[agent_name].get()
# Agent 通过消息总线通信
class AsyncAgent:
def __init__(self, name: str, bus: MessageBus):
self.name = name
self.bus = bus
async def send_to(self, target: str, content: str):
await self.bus.send(target, {
"from": self.name,
"content": content
})
async def receive(self) -> dict:
return await self.bus.receive(self.name)3. 共享状态(Shared State)
所有 Agent 读写同一个共享状态对象,类似全局变量。
class SharedState:
def __init__(self):
self.data = {}
self.lock = asyncio.Lock()
async def update(self, key: str, value, agent_name: str):
async with self.lock:
self.data[key] = {
"value": value,
"updated_by": agent_name,
"timestamp": time.time()
}
def get(self, key: str):
return self.data.get(key)4. 事件驱动(Event-Driven)
Agent 发布事件,其他 Agent 订阅感兴趣的事件。
class EventBus:
def __init__(self):
self.subscribers = defaultdict(list)
def subscribe(self, event_type: str, handler):
self.subscribers[event_type].append(handler)
async def publish(self, event_type: str, data: dict):
for handler in self.subscribers[event_type]:
await handler(data)
# 使用
bus = EventBus()
bus.subscribe("search_complete", analysis_agent.on_search_complete)
bus.subscribe("analysis_done", writer_agent.on_analysis_done)
await bus.publish("search_complete", {"results": [...]})| 通信方式 | 耦合度 | 复杂度 | 适用场景 |
|---|---|---|---|
| 直接调用 | 高 | 低 | 简单的两 Agent 协作 |
| 消息队列 | 低 | 中 | 异步、解耦的多 Agent |
| 共享状态 | 中 | 中 | 需要共享上下文的场景 |
| 事件驱动 | 低 | 高 | 复杂的动态协作 |
追问:
Q: 实际开发中用哪种方式最多? 大多数框架(如 CrewAI、AutoGen)默认使用"共享对话历史"——所有 Agent 共享同一个消息列表,这本质上是一种简化的共享状态。对于简单场景足够用,但在 Agent 数量增多时会遇到上下文膨胀的问题。
Q: 如何保证消息的可靠性? 生产环境中需要考虑:消息重试(失败后重发)、消息去重(防止重复处理)、死信队列(无法处理的消息暂存)、超时机制(防止无限等待)。
Q: 如何实现 Agent 之间的信息共享?
难度:⭐⭐ 进阶
答案:
信息共享是多 Agent 系统的核心挑战。共享太多信息会造成上下文膨胀和隐私泄露;共享太少又会导致 Agent 缺乏必要的背景信息。
核心原则:最小必要信息原则(Need-to-Know Basis)
class InformationBroker:
"""信息经纪人——控制 Agent 间的信息流动"""
def __init__(self):
self.global_context = {} # 全局共享信息
self.agent_contexts = {} # 各 Agent 的私有信息
self.access_control = {} # 访问控制
def set_global(self, key: str, value):
"""设置全局可见信息"""
self.global_context[key] = value
def set_agent_context(self, agent_name: str, key: str, value):
"""设置 Agent 私有信息"""
if agent_name not in self.agent_contexts:
self.agent_contexts[agent_name] = {}
self.agent_contexts[agent_name][key] = value
def get_context_for(self, agent_name: str) -> dict:
"""获取某个 Agent 应该看到的所有信息"""
context = dict(self.global_context)
# 添加该 Agent 的私有信息
if agent_name in self.agent_contexts:
context.update(self.agent_contexts[agent_name])
# 添加其他 Agent 的摘要信息(非完整信息)
for other_name, other_ctx in self.agent_contexts.items():
if other_name != agent_name:
context[f"{other_name}_summary"] = self._summarize(other_ctx)
return context
def _summarize(self, context: dict) -> str:
"""将上下文压缩为摘要,减少信息量"""
return " | ".join(f"{k}: {str(v)[:50]}" for k, v in context.items())信息共享的三种粒度:
# 1. 完整共享(简单但浪费)
def full_sharing(agents):
shared_history = []
for agent in agents:
result = agent.run(shared_history)
shared_history.append({"agent": agent.name, "result": result})
# 所有 Agent 都能看到所有历史
# 2. 摘要共享(压缩信息)
def summary_sharing(agents):
summaries = {}
for agent in agents:
result = agent.run(summaries)
summaries[agent.name] = llm_summarize(result) # 用 LLM 压缩
# Agent 只看到其他 Agent 的摘要
# 3. 结构化共享(精确控制)
def structured_sharing(agents):
shared_data = {"facts": [], "decisions": [], "open_questions": []}
for agent in agents:
result = agent.run(shared_data)
# Agent 输出结构化信息
parsed = parse_structured_output(result)
shared_data["facts"].extend(parsed.get("new_facts", []))
shared_data["decisions"].extend(parsed.get("decisions", []))追问:
- Q: 如何处理信息冲突?比如 Agent A 说"股价涨了",Agent B 说"股价跌了"。 需要引入"信息可信度"机制:① 标注信息来源和时间戳;② 对冲突信息进行验证(调用权威数据源);③ 让 Supervisor 裁决冲突。
Q: 如何处理 Agent 之间的冲突?
难度:⭐⭐ 进阶
答案:
多 Agent 系统中冲突不可避免——两个 Agent 可能对同一个问题给出矛盾的结论,或者争抢同一个资源。处理冲突的策略:
1. 优先级机制(Priority)
给 Agent 设置优先级,冲突时高优先级 Agent 的意见胜出。
class PriorityResolver:
def resolve(self, conflicting_results: list) -> dict:
# 按优先级排序
sorted_results = sorted(
conflicting_results,
key=lambda x: x["agent_priority"],
reverse=True
)
return sorted_results[0] # 返回最高优先级的结果2. 投票机制(Voting)
多个 Agent 投票,少数服从多数。
class VotingResolver:
def resolve(self, votes: list) -> str:
from collections import Counter
vote_counts = Counter(v["decision"] for v in votes)
return vote_counts.most_common(1)[0][0] # 返回票数最多的选项3. 裁判机制(Arbiter)
引入一个中立的裁判 Agent 做最终决定。
class ArbiterResolver:
def __init__(self, judge_model):
self.judge = judge_model
def resolve(self, question: str, conflicting_views: list) -> str:
views_text = "\n".join(
f"[{v['agent']}]: {v['argument']}" for v in conflicting_views
)
prompt = f"""
问题:{question}
不同观点:
{views_text}
请综合分析各观点的优劣,给出你的最终判断。必须选择一个明确的结论。
"""
return self.judge.chat(prompt)4. 重试与协商(Negotiation)
让冲突的 Agent 互相讨论,尝试达成共识。
class NegotiationResolver:
def resolve(self, agent_a, agent_b, issue: str, max_rounds=3):
history = []
for _ in range(max_rounds):
a_response = agent_a.negotiate(issue, history)
history.append({"agent": "A", "msg": a_response})
if self._is_concession(a_response):
return a_response # A 让步了
b_response = agent_b.negotiate(issue, history)
history.append({"agent": "B", "msg": b_response})
if self._is_concession(b_response):
return b_response # B 让步了
# 协商失败,交给裁判
return ArbiterResolver().resolve(issue, history)追问:
- Q: 如何避免冲突发生? 预防优于治疗:① 清晰定义每个 Agent 的职责边界,减少"越界";② 使用共享状态的读写锁,避免资源竞争;③ 在任务分解阶段就消除歧义。
Q: 什么是共享黑板(Shared Blackboard)模式?
难度:⭐⭐ 进阶
答案:
共享黑板模式源自 1970 年代的 HEARSAY-II 语音识别系统。所有 Agent 通过一块"黑板"共享信息——任何 Agent 都可以在黑板上读写信息,黑板是唯一的全局数据存储。
类比理解: 办公室里的白板——每个人都可以在上面写东西、看别人写了什么、擦掉过时的信息。白板是团队共享的"大脑"。
class Blackboard:
"""共享黑板"""
def __init__(self):
self.board = {} # 黑板内容
self.history = [] # 修改历史
self.watchers = [] # 监听变化的回调
def write(self, key: str, value, agent_name: str):
"""Agent 在黑板上写信息"""
old_value = self.board.get(key)
self.board[key] = {
"value": value,
"author": agent_name,
"timestamp": time.time()
}
self.history.append({
"action": "write",
"key": key,
"agent": agent_name,
"old": old_value,
"new": value
})
# 通知监听者
for watcher in self.watchers:
watcher(key, value, agent_name)
def read(self, key: str):
"""Agent 从黑板读信息"""
entry = self.board.get(key)
return entry["value"] if entry else None
def read_all(self) -> dict:
"""读取黑板上所有信息"""
return {k: v["value"] for k, v in self.board.items()}
def watch(self, callback):
"""注册监听器,当黑板内容变化时触发"""
self.watchers.append(callback)
class BlackboardAgent:
"""基于黑板的 Agent"""
def __init__(self, name: str, blackboard: Blackboard, expertise: str):
self.name = name
self.bb = blackboard
self.expertise = expertise
def on_board_change(self, key, value, author):
"""当黑板变化时触发"""
if author == self.name:
return # 忽略自己写的内容
# 检查是否需要响应
if self.should_respond(key, value):
self.contribute()
def contribute(self):
"""根据黑板当前状态做出贡献"""
current_state = self.bb.read_all()
result = self.think(current_state)
self.bb.write(f"contribution_{self.name}", result, self.name)黑板模式的典型应用场景:
# 多 Agent 协作解谜
bb = Blackboard()
# 线索收集 Agent 写入发现的线索
bb.write("clue_1", "嫌疑人身高约180cm", "detective_agent")
bb.write("clue_2", "案发时间是凌晨2点", "time_agent")
# 分析 Agent 读取线索并推理
clues = bb.read_all()
bb.write("suspect_list", ["张三", "李四"], "analysis_agent")
# 验证 Agent 读取嫌疑人列表并验证
suspects = bb.read("suspect_list")
bb.write("alibi_check", {"张三": "有不在场证明", "李四": "无不在场证明"}, "verify_agent")追问:
- Q: 黑板模式和共享状态有什么区别? 概念上很相似,但黑板模式强调"主动通知"机制——Agent 可以监听感兴趣的键,当值变化时自动触发响应。共享状态通常是被动读取。黑板模式还通常包含更丰富的元数据(作者、时间戳、历史版本)。
四、任务分解
Q: 如何把复杂任务分解为子任务?
难度:⭐⭐ 进阶
答案:
任务分解是多 Agent 系统的核心能力。分解方式直接影响系统的效率和质量。
常用的任务分解策略:
1. 递归分解(Recursive Decomposition)
把大任务拆成子任务,子任务还可以继续拆,直到每个子任务足够小。
class RecursiveDecomposer:
def __init__(self, model, min_granularity=100):
self.model = model
self.min_granularity = min_granularity # 子任务的最小复杂度
def decompose(self, task: str, depth: int = 0) -> dict:
# 评估任务复杂度
complexity = self.estimate_complexity(task)
if complexity <= self.min_granularity or depth >= 3:
# 任务足够简单,不需要再分解
return {"task": task, "type": "leaf", "children": []}
# 用 LLM 分解任务
subtasks = self.model.chat(f"""
请把以下任务分解为 2-5 个子任务:
任务:{task}
输出格式(JSON):
[{{"id": 1, "desc": "子任务描述", "depends_on": []}}]
""")
# 递归分解每个子任务
children = []
for st in subtasks:
child = self.decompose(st["desc"], depth + 1)
child["id"] = st["id"]
child["depends_on"] = st.get("depends_on", [])
children.append(child)
return {"task": task, "type": "composite", "children": children}2. 基于模板的分解(Template-Based)
为常见任务类型预定义分解模板。
TASK_TEMPLATES = {
"research_report": [
{"desc": "搜集相关资料", "agent": "search"},
{"desc": "筛选和整理信息", "agent": "analyst"},
{"desc": "撰写报告大纲", "agent": "writer"},
{"desc": "填充详细内容", "agent": "writer"},
{"desc": "审校和修改", "agent": "reviewer"},
],
"code_development": [
{"desc": "需求分析", "agent": "analyst"},
{"desc": "架构设计", "agent": "architect"},
{"desc": "编写代码", "agent": "coder"},
{"desc": "编写测试", "agent": "tester"},
{"desc": "代码审查", "agent": "reviewer"},
],
}
def decompose_by_template(task_type: str, task_detail: str):
template = TASK_TEMPLATES.get(task_type)
if not template:
return None
# 用任务细节填充模板
return [{**t, "detail": task_detail} for t in template]3. 目标分解(Goal Decomposition)
从最终目标反向推导出需要的步骤。
class GoalDecomposer:
def decompose(self, goal: str) -> list:
prompt = f"""
最终目标:{goal}
请从最终目标倒推,列出达成目标需要的所有步骤。
每个步骤应该是一个可以独立验证的里程碑。
输出格式:
- 步骤1: xxx(验证标准:xxx)
- 步骤2: xxx(验证标准:xxx)
"""
return self.model.chat(prompt)追问:
Q: 如何判断分解粒度是否合适? 经验法则:每个子任务应该能由一个 Agent 在一次 LLM 调用中完成。如果子任务还需要多轮对话才能完成,说明粒度还不够细;如果子任务只是一句话就能完成的微操作,说明过度分解了。
Q: 分解出错怎么办? 加入"验证-调整"循环:分解后先快速检查(让 LLM 评估分解是否合理),发现明显问题就重新分解。执行中发现问题也可以回到规划阶段重新分解。
Q: 什么是 DAG 依赖管理?子任务之间有依赖怎么办?
难度:⭐⭐⭐ 高级
答案:
DAG(Directed Acyclic Graph,有向无环图)是描述子任务依赖关系的标准方式。节点代表子任务,有向边代表依赖关系——"A → B"表示 B 依赖 A,必须等 A 完成才能开始 B。
类比理解: 做饭——你必须先洗菜、切菜,然后才能炒菜。"炒菜"依赖"洗菜"和"切菜"。但"洗菜"和"切菜"可以同时进行(并行)。
from collections import defaultdict, deque
import asyncio
class DAGScheduler:
"""DAG 任务调度器"""
def __init__(self):
self.tasks = {} # task_id -> task_info
self.dependencies = defaultdict(set) # task_id -> set of dependency task_ids
self.dependents = defaultdict(set) # task_id -> set of tasks that depend on this
def add_task(self, task_id: str, handler, **kwargs):
self.tasks[task_id] = {"handler": handler, "kwargs": kwargs}
def add_dependency(self, task_id: str, depends_on: str):
"""task_id 依赖 depends_on"""
self.dependencies[task_id].add(depends_on)
self.dependents[depends_on].add(task_id)
def get_ready_tasks(self, completed: set) -> list:
"""获取所有依赖已满足、可以立即执行的任务"""
ready = []
for task_id, deps in self.dependencies.items():
if task_id not in completed and deps.issubset(completed):
ready.append(task_id)
return ready
async def execute(self) -> dict:
"""执行所有任务,自动处理并行"""
completed = set()
results = {}
running = {}
while len(completed) < len(self.tasks):
# 找出所有可以执行的任务
ready = self.get_ready_tasks(completed)
if not ready and not running:
raise RuntimeError("死锁!可能存在循环依赖")
# 并行执行所有就绪任务
for task_id in ready:
if task_id not in running:
task = self.tasks[task_id]
# 获取依赖任务的结果作为输入
deps_results = {
dep: results[dep] for dep in self.dependencies[task_id]
}
running[task_id] = asyncio.create_task(
task["handler"](deps_results, **task["kwargs"])
)
# 等待至少一个任务完成
if running:
done, _ = await asyncio.wait(
running.values(),
return_when=asyncio.FIRST_COMPLETED
)
for task_id, task in list(running.items()):
if task in done:
results[task_id] = await task
completed.add(task_id)
del running[task_id]
return results
# 使用示例
scheduler = DAGScheduler()
async def search_task(deps):
return "搜索结果..."
async def analysis_task(deps):
search_result = deps["search"]
return f"分析: {search_result}"
async def writing_task(deps):
return f"报告: {deps['analysis']}"
async def review_task(deps):
return f"审校: {deps['writing']}"
scheduler.add_task("search", search_task)
scheduler.add_task("analysis", analysis_task)
scheduler.add_task("writing", writing_task)
scheduler.add_task("review", review_task)
# 依赖关系:search → analysis → writing → review
# search 可以立即执行,analysis 等 search,writing 等 analysis...
scheduler.add_dependency("analysis", "search")
scheduler.add_dependency("writing", "analysis")
scheduler.add_dependency("review", "writing")
results = asyncio.run(scheduler.execute())DAG 可视化:
[搜索] ───→ [分析] ───→ [写报告] ───→ [审校]
↑
[爬数据] ───┘ (分析同时依赖搜索和爬数据)追问:
Q: 如何检测循环依赖? 使用拓扑排序(Topological Sort)。如果排序后的节点数不等于总节点数,说明存在循环依赖。也可以在添加依赖时实时检测:尝试从新依赖的目标节点做 DFS,如果能回到当前节点则说明形成了环。
Q: 动态生成的 DAG 怎么处理? 有些任务的子任务是在运行时才知道的(比如搜索结果决定后续分析方向)。这时需要"增量式 DAG"——先执行已知任务,根据结果动态添加新节点和依赖。
Q: 如何处理子任务失败?
难度:⭐⭐ 进阶
答案:
子任务失败是多 Agent 系统中最常见的问题。必须设计完善的容错机制。
1. 重试策略(Retry)
import asyncio
from functools import wraps
def retry(max_attempts=3, backoff=2.0, exceptions=(Exception,)):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except exceptions as e:
last_exception = e
wait_time = backoff ** attempt
print(f"[重试] {func.__name__} 第{attempt+1}次失败: {e},"
f"{wait_time}秒后重试...")
await asyncio.sleep(wait_time)
raise last_exception
return wrapper
return decorator
class FaultTolerantAgent:
@retry(max_attempts=3, backoff=2.0)
async def run(self, task: str) -> str:
result = await self._execute(task)
if not result or len(result) < 10:
raise ValueError("结果质量不合格,需要重试")
return result2. 降级策略(Fallback)
class FallbackAgent:
def __init__(self, primary, fallbacks: list):
self.primary = primary
self.fallbacks = fallbacks
async def run(self, task: str) -> str:
# 尝试主 Agent
try:
return await self.primary.run(task)
except Exception as e:
print(f"主 Agent 失败: {e}")
# 依次尝试备选 Agent
for fallback in self.fallbacks:
try:
return await fallback.run(task)
except Exception as e:
print(f"备选 Agent 失败: {e}")
# 所有都失败,返回默认结果
return f"所有 Agent 都未能完成任务: {task}"3. 补偿策略(Compensation / Saga Pattern)
当子任务失败时,撤销已完成子任务的副作用。
class SagaOrchestrator:
"""Saga 模式——失败时回滚已完成的步骤"""
def __init__(self):
self.steps = [] # [(action, compensation)]
def add_step(self, action, compensation):
self.steps.append((action, compensation))
async def execute(self, context: dict):
completed = []
try:
for action, compensation in self.steps:
result = await action(context)
context.update(result)
completed.append((action, compensation, result))
except Exception as e:
print(f"步骤失败: {e},开始回滚...")
# 逆序回滚已完成的步骤
for action, compensation, result in reversed(completed):
try:
await compensation(context, result)
print(f"回滚成功: {compensation.__name__}")
except Exception as rollback_error:
print(f"回滚失败: {rollback_error},需要人工介入")
raise追问:
- Q: 子任务失败后,应该重试还是跳过? 取决于任务的关键程度。核心任务(如"数据获取")必须重试或降级;非核心任务(如"添加装饰性图表")可以跳过。建议为每个子任务标注"关键程度"。
五、实际框架
Q: CrewAI 的核心概念?
难度:⭐⭐ 进阶
答案:
CrewAI 是一个流行的多 Agent 框架,其核心概念可以用"拍电影"来类比:
- Agent = 演员(有角色、目标、背景故事)
- Task = 剧本中的场景(有描述、预期输出、所属 Agent)
- Tool = 道具(Agent 可以使用的工具)
- Crew = 剧组(把所有元素组合在一起)
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, FileWriterTool
# 定义工具
search_tool = SerperDevTool()
file_tool = FileWriterTool()
# 定义 Agent
researcher = Agent(
role="高级研究分析师",
goal="搜集并分析关于{topic}的最新信息",
backstory="你是一位有10年经验的研究分析师,擅长从海量信息中提取关键洞察。",
tools=[search_tool],
verbose=True,
llm="gpt-4o" # 可以为不同 Agent 指定不同模型
)
writer = Agent(
role="技术内容专家",
goal="将研究结果转化为清晰、易懂的技术报告",
backstory="你是一位资深技术写手,擅长将复杂概念用通俗语言解释。",
tools=[file_tool],
verbose=True,
llm="gpt-4o"
)
reviewer = Agent(
role="质量审核专家",
goal="确保报告的准确性、完整性和可读性",
backstory="你是一位严谨的编辑,对细节有极高的要求。",
verbose=True,
llm="gpt-4o"
)
# 定义 Task
research_task = Task(
description="搜集关于{topic}的最新研究论文、行业报告和新闻。",
expected_output="包含关键发现、数据来源的结构化研究报告。",
agent=researcher
)
writing_task = Task(
description="基于研究结果,撰写一份5000字的技术报告。",
expected_output="格式规范、逻辑清晰的技术报告。",
agent=writer,
context=[research_task] # 依赖研究任务的结果
)
review_task = Task(
description="审核报告的准确性,修正错误,优化表述。",
expected_output="经过审核和修正的最终报告。",
agent=reviewer,
context=[writing_task]
)
# 组建 Crew
crew = Crew(
agents=[researcher, writer, reviewer],
tasks=[research_task, writing_task, review_task],
process=Process.sequential, # 顺序执行
verbose=True
)
# 执行
result = crew.kickoff(inputs={"topic": "大语言模型在医疗领域的应用"})CrewAI 的执行模式:
Process.sequential:任务按顺序依次执行Process.hierarchical:管理者 Agent 自动分配任务
追问:
Q: CrewAI 的 Task 之间的 context 参数有什么作用?
context指定了当前 Task 依赖哪些前置 Task 的结果。被依赖 Task 的输出会自动注入到当前 Task 的输入中。CrewAI 会在后台做信息传递,不需要手动管理。Q: CrewAI 的局限性是什么? ① 主要支持 Sequential 和 Hierarchical 模式,对复杂的 DAG 依赖支持有限;② Agent 间通信主要通过 Task 结果传递,灵活性不足;③ 调试困难,尤其是 Hierarchical 模式下;④ Token 消耗较高,因为每个 Task 都会带上完整上下文。
Q: AutoGen 的对话模式?
难度:⭐⭐ 进阶
答案:
AutoGen(微软开源)的核心理念是"Agent 之间的对话"。它把多 Agent 协作建模为对话——就像一群人在群聊中讨论问题。
核心组件:
- ConversableAgent:可以对话的 Agent 基类
- AssistantAgent:使用 LLM 的 Agent
- UserProxyAgent:代表用户的 Agent(可以执行代码、请求人工输入)
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
# 创建 Agent
analyst = AssistantAgent(
name="数据分析师",
system_message="你是一位专业的数据分析师。你擅长数据分析和可视化。",
llm_config={"model": "gpt-4o"}
)
coder = AssistantAgent(
name="Python专家",
system_message="你是Python编程专家。你负责编写数据分析代码。只写代码,用 ```python 包裹。",
llm_config={"model": "gpt-4o"}
)
reviewer = AssistantAgent(
name="代码审查员",
system_message="你是代码审查专家。审查代码的正确性和效率,提出改进建议。",
llm_config={"model": "gpt-4o"}
)
# 用户代理(可以执行代码)
user_proxy = UserProxyAgent(
name="用户",
human_input_mode="NEVER", # 不需要人工输入
code_execution_config={
"work_dir": "coding",
"use_docker": True # 在 Docker 中安全执行代码
}
)
# 双人对话模式
user_proxy.initiate_chat(
analyst,
message="分析 sales.csv 的销售趋势,生成可视化图表。"
)
# 群聊模式(多 Agent 对话)
group_chat = GroupChat(
agents=[user_proxy, analyst, coder, reviewer],
messages=[],
max_round=10, # 最多10轮对话
speaker_selection_method="auto" # 自动选择下一个发言者
)
manager = GroupChatManager(groupchat=group_chat)
user_proxy.initiate_chat(
manager,
message="分析 sales.csv 的销售数据,用Python实现,生成图表和分析报告。"
)AutoGen 的对话模式:
模式1:双人对话(Two-Agent Chat)
User ←→ Agent
模式2:顺序对话(Sequential Chat)
User → Agent A → Agent B → Agent C
模式3:群聊(Group Chat)
User
↕
Agent A ←→ Agent B
↕ ↕
Agent C ←→ Agent D
(由 GroupChatManager 管理发言顺序)追问:
Q: AutoGen 的 code_execution_config 有什么安全风险? AutoGen 可以让 Agent 执行生成的代码,这是双刃剑——强大但危险。建议:① 一定要用 Docker 隔离;② 限制可访问的文件和网络;③ 设置超时和资源限制。在生产环境中,建议禁用代码执行或使用沙箱服务。
Q: GroupChat 的 speaker_selection_method 有哪些选项?
auto(LLM 自动选择)、round_robin(轮流发言)、random(随机选择)、自定义函数。auto最智能但最贵,round_robin最可预测但最死板。
Q: LangGraph 的状态机设计?
难度:⭐⭐⭐ 高级
答案:
LangGraph 是 LangChain 团队推出的框架,它把多 Agent 系统建模为"状态图"(State Graph)——节点是处理步骤,边是状态转换条件。这是最灵活的框架,但学习曲线也最陡。
类比理解: 流程图/状态机——每个方框是一个处理步骤,箭头上的条件决定下一步去哪里。可以循环、分支、并行。
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from typing import Literal, TypedDict
import operator
# 1. 定义状态
class AgentState(TypedDict):
messages: list # 对话历史
current_step: str # 当前步骤
research_results: str # 研究结果
code_output: str # 代码输出
review_feedback: str # 审查反馈
iteration: int # 迭代次数
# 2. 定义节点(处理函数)
def research_node(state: AgentState) -> AgentState:
"""研究节点"""
messages = state["messages"]
# 调用研究 Agent
response = research_llm.invoke(messages)
return {
"research_results": response.content,
"messages": [("assistant", f"[研究完成] {response.content}")]
}
def code_node(state: AgentState) -> AgentState:
"""编码节点"""
prompt = f"基于以下研究结果写代码:\n{state['research_results']}"
response = code_llm.invoke([("user", prompt)])
return {
"code_output": response.content,
"messages": [("assistant", f"[代码完成] {response.content}")]
}
def review_node(state: AgentState) -> AgentState:
"""审查节点"""
prompt = f"审查以下代码:\n{state['code_output']}"
response = review_llm.invoke([("user", prompt)])
return {
"review_feedback": response.content,
"iteration": state.get("iteration", 0) + 1,
"messages": [("assistant", f"[审查意见] {response.content}")]
}
# 3. 定义路由条件
def should_continue(state: AgentState) -> Literal["code", "end"]:
"""根据审查结果决定是否需要重写代码"""
feedback = state.get("review_feedback", "")
if "通过" in feedback or state.get("iteration", 0) >= 3:
return "end"
return "code" # 不通过,回到编码节点
# 4. 构建状态图
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("research", research_node)
graph.add_node("code", code_node)
graph.add_node("review", review_node)
# 添加边
graph.add_edge(START, "research") # 开始 → 研究
graph.add_edge("research", "code") # 研究 → 编码
graph.add_edge("code", "review") # 编码 → 审查
# 条件边:审查后根据结果决定下一步
graph.add_conditional_edges(
"review",
should_continue,
{"code": "code", "end": END} # 不通过回编码,通过则结束
)
# 5. 编译并运行
app = graph.compile(checkpointer=MemorySaver())
result = app.invoke(
{"messages": [("user", "分析sales.csv数据并生成可视化")]}
)LangGraph 的核心优势——循环和条件分支:
# 循环模式:Agent 反复改进直到满意
# research → code → review → (不通过) → code → review → (通过) → END
# 分支模式:根据情况走不同路径
def route_by_complexity(state) -> str:
if state["complexity"] == "simple":
return "simple_agent"
elif state["complexity"] == "medium":
return "medium_agent"
else:
return "expert_agent"
graph.add_conditional_edges("classifier", route_by_complexity, {
"simple_agent": simple_node,
"medium_agent": medium_node,
"expert_agent": expert_node,
})追问:
Q: LangGraph 和 CrewAI/AutoGen 的核心区别是什么? LangGraph 底层是一个有限状态机,给了你最大的控制力——你可以精确定义每个状态转换、循环、分支。CrewAI/AutoGen 更高层抽象,开发速度快但灵活性低。如果你的任务流程简单,用 CrewAI;如果需要复杂的流程控制(如重试、分支、人工审核节点),用 LangGraph。
Q: LangGraph 的 Checkpointer 有什么用? Checkpointer 可以在每个节点执行后保存状态快照,实现"断点续传"。当系统崩溃或需要人工审核时,可以从最近的检查点恢复,而不是从头开始。这在长时间运行的多 Agent 任务中非常重要。
Q: 这些框架的优缺点对比?
难度:⭐⭐ 进阶
答案:
| 维度 | CrewAI | AutoGen | LangGraph |
|---|---|---|---|
| 学习曲线 | ⭐ 低 | ⭐⭐ 中 | ⭐⭐⭐ 高 |
| 灵活性 | ⭐⭐ 中 | ⭐⭐ 中 | ⭐⭐⭐ 高 |
| 生产就绪度 | ⭐⭐ 中 | ⭐⭐ 中 | ⭐⭐⭐ 高 |
| 调试工具 | ⭐ 弱 | ⭐⭐ 中 | ⭐⭐⭐ 强 |
| 社区生态 | ⭐⭐ 活跃 | ⭐⭐⭐ 很活跃 | ⭐⭐⭐ 很活跃 |
| 类型安全 | ⭐ 弱 | ⭐ 弱 | ⭐⭐⭐ 强(TypedDict) |
| 流式输出 | ⭐⭐ 中 | ⭐⭐ 中 | ⭐⭐⭐ 强 |
| 人工介入 | ⭐ 弱 | ⭐⭐⭐ 强 | ⭐⭐⭐ 强 |
| 代码执行 | ❌ 不内置 | ✅ 内置 | ❌ 需集成 |
| 状态管理 | ⭐ 简单 | ⭐⭐ 中 | ⭐⭐⭐ 强(持久化) |
| 最佳场景 | 快速原型 | 对话式协作 | 复杂生产系统 |
如何选择框架?
def choose_framework(requirements):
"""框架选型决策"""
if requirements.get("prototype_speed") == "high":
return "CrewAI" # 快速原型,API 最简洁
if requirements.get("need_code_execution"):
return "AutoGen" # 需要代码执行能力
if requirements.get("complex_flow") or requirements.get("production"):
return "LangGraph" # 复杂流程控制,生产环境
if requirements.get("conversational"):
return "AutoGen" # 对话式多 Agent
return "LangGraph" # 默认推荐追问:
Q: 不用框架,自己手写多 Agent 系统可以吗? 当然可以。对于简单场景(2-3 个 Agent、流程固定),手写反而更轻量、更可控。只有在系统复杂到一定程度(5+ Agent、动态流程、需要持久化)时,框架才真正有价值。建议先手写理解原理,再用框架提升效率。
Q: 这些框架支持异步吗? LangGraph 原生支持 async;CrewAI 支持异步执行;AutoGen 的异步支持相对较弱。如果你的场景需要高并发(如多个独立的多 Agent 系统同时运行),优先选择 LangGraph。
六、实际开发
Q: 多 Agent 系统的调试难点?怎么解决?
难度:⭐⭐ 进阶
答案:
多 Agent 系统的调试是公认的痛点,主要难点:
难点 1:执行路径不确定
同一个输入,不同次运行可能走不同的 Agent 路径。这让 Bug 难以复现。
# 解决方案:固定随机种子 + 确定性路由
class DebuggableSystem:
def __init__(self):
self.trace_log = [] # 执行追踪日志
def log(self, agent_name, action, input_data, output_data):
self.trace_log.append({
"timestamp": time.time(),
"agent": agent_name,
"action": action,
"input": input_data,
"output": output_data,
"trace_id": self.current_trace_id
})
def run(self, task):
self.current_trace_id = str(uuid.uuid4())
self.trace_log = []
result = self._execute(task)
# 保存完整的执行追踪
self.save_trace(self.trace_log)
return result难点 2:错误传播
一个 Agent 的小错误可能在后续 Agent 中被放大,但你很难找到根因。
# 解决方案:在每个 Agent 的输入输出处设置检查点
class AgentCheckpoint:
def __init__(self, agent):
self.agent = agent
def run(self, input_data):
# 执行前检查
self.validate_input(input_data)
# 执行
output = self.agent.run(input_data)
# 执行后检查
self.validate_output(output)
return output
def validate_input(self, data):
assert data is not None, f"[{self.agent.name}] 输入为空"
assert len(str(data)) > 0, f"[{self.agent.name}] 输入为空字符串"
def validate_output(self, data):
assert data is not None, f"[{self.agent.name}] 输出为空"
# 可以添加更多业务逻辑检查难点 3:Token 消耗追踪
不清楚哪个 Agent 消耗了多少 Token,成本难以控制。
# 解决方案:Token 使用量追踪
class TokenTracker:
def __init__(self):
self.usage = defaultdict(lambda: {"input": 0, "output": 0, "cost": 0})
def record(self, agent_name: str, input_tokens: int, output_tokens: int):
self.usage[agent_name]["input"] += input_tokens
self.usage[agent_name]["output"] += output_tokens
cost = (input_tokens * 0.005 + output_tokens * 0.015) / 1000
self.usage[agent_name]["cost"] += cost
def report(self):
total = sum(u["cost"] for u in self.usage.values())
print(f"\n{'='*50}")
print(f"Token 使用报告 (总成本: ${total:.4f})")
print(f"{'='*50}")
for name, usage in sorted(self.usage.items(), key=lambda x: -x[1]["cost"]):
print(f" {name}: 输入={usage['input']}, "
f"输出={usage['output']}, 成本=${usage['cost']:.4f}")实用调试工具推荐:
- LangSmith:LangChain 的追踪平台,可以看到每个 LLM 调用的详细输入输出
- Phoenix (Arize):开源的 LLM 可观测性工具
- 自建日志系统:结构化日志 + 可视化
# 使用 LangSmith 追踪
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_key"
# 之后所有 LangChain/LangGraph 的调用都会自动记录追问:
- Q: 如何在生产环境中监控多 Agent 系统? 关键指标:① 每个 Agent 的成功率和平均耗时;② 端到端的任务完成率;③ Token 使用量和成本;④ Agent 间通信的延迟。建议使用 Prometheus + Grafana 搭建监控看板,设置告警规则。
Q: 如何控制多 Agent 的成本?
难度:⭐⭐ 进阶
答案:
多 Agent 系统的成本是单 Agent 的 N 倍(N = Agent 数量 × 平均调用次数)。控制成本至关重要。
策略 1:分层模型策略
不是所有 Agent 都需要最强的模型。简单任务用小模型,复杂任务用大模型。
MODEL_TIERS = {
"premium": "gpt-4o", # 最强,用于 Supervisor 和关键决策
"standard": "gpt-4o-mini", # 中等,用于常规分析和写作
"budget": "gpt-3.5-turbo", # 经济,用于简单分类和格式化
}
def assign_model(agent_role: str) -> str:
"""根据 Agent 角色分配模型"""
premium_roles = {"supervisor", "judge", "architect"}
budget_roles = {"formatter", "classifier", "router"}
if agent_role in premium_roles:
return MODEL_TIERS["premium"]
elif agent_role in budget_roles:
return MODEL_TIERS["budget"]
else:
return MODEL_TIERS["standard"]策略 2:智能路由——能不用 Agent 就不用
class SmartRouter:
"""简单任务直接处理,复杂任务才分发给 Agent"""
def __init__(self, simple_model, complex_system):
self.simple_model = simple_model
self.complex_system = complex_system
def run(self, task: str) -> str:
# 先用小模型评估复杂度
complexity = self.assess_complexity(task)
if complexity == "simple":
# 直接用小模型回答,省去多 Agent 开销
return self.simple_model.chat(task)
else:
# 交给多 Agent 系统处理
return self.complex_system.run(task)
def assess_complexity(self, task: str) -> str:
prompt = f"判断以下任务的复杂度(simple/complex):{task}"
return self.simple_model.chat(prompt).strip()策略 3:缓存机制
相似的子任务结果可以缓存复用。
import hashlib
class AgentCache:
def __init__(self, ttl=3600):
self.cache = {}
self.ttl = ttl
def get(self, agent_name: str, input_data: str):
key = self._make_key(agent_name, input_data)
if key in self.cache:
entry = self.cache[key]
if time.time() - entry["time"] < self.ttl:
return entry["result"]
del self.cache[key]
return None
def set(self, agent_name: str, input_data: str, result: str):
key = self._make_key(agent_name, input_data)
self.cache[key] = {"result": result, "time": time.time()}
def _make_key(self, agent_name, input_data):
return hashlib.md5(f"{agent_name}:{input_data}".encode()).hexdigest()策略 4:提前终止
如果中间结果已经足够好,不需要继续执行后续 Agent。
class EarlyStoppingSystem:
def run(self, pipeline, input_data):
result = input_data
for agent in pipeline:
result = agent.run(result)
# 检查是否已经足够好
quality = self.assess_quality(result)
if quality > 0.9: # 质量已经很高
print(f"提前终止于 {agent.name},质量已达 {quality:.2f}")
break
return result追问:
- Q: 实际项目中,多 Agent 的成本大概是单 Agent 的多少倍? 经验数据:简单的 3-Agent 系统大约是单 Agent 的 5-10 倍(因为有通信和协调的额外开销)。通过上述优化手段,可以降到 2-4 倍。关键是要做好成本监控,知道钱花在哪里。
Q: 多 Agent 的延迟优化?
难度:⭐⭐ 进阶
答案:
多 Agent 系统的延迟 = 各 Agent 处理时间 + Agent 间通信时间 + 协调开销。优化延迟的核心思路:减少串行等待,增加并行执行。
策略 1:最大化并行
import asyncio
class OptimizedPipeline:
"""优化后的执行流程——无依赖的任务并行执行"""
async def run(self, dag: DAGScheduler):
completed = set()
results = {}
while len(completed) < len(dag.tasks):
ready = dag.get_ready_tasks(completed)
if not ready:
break
# 并行执行所有就绪任务
tasks = []
for task_id in ready:
task = dag.tasks[task_id]
deps_results = {d: results[d] for d in dag.dependencies[task_id]}
tasks.append(self.execute_task(task_id, task, deps_results))
task_results = await asyncio.gather(*tasks)
for task_id, result in zip(ready, task_results):
results[task_id] = result
completed.add(task_id)
return results策略 2:流式处理——边生成边传递
class StreamingAgent:
"""流式输出 Agent——不等完全生成就传递给下游"""
async def run_streaming(self, input_data: str):
"""流式生成,每生成一段就 yield"""
response = await self.model.chat_stream(input_data)
buffer = ""
async for chunk in response:
buffer += chunk
# 每积累一定量就传递给下游
if len(buffer) > 200:
yield buffer
buffer = ""
if buffer:
yield buffer策略 3:预测性执行(Speculative Execution)
预测可能需要的下一步,提前开始执行。
class SpeculativeSystem:
async def run(self, task):
# 在等待当前任务结果的同时,预测并预执行下一步
current_task = asyncio.create_task(self.execute_current(task))
# 预测可能的下一步
predicted_next = self.predict_next_step(task)
speculative_task = asyncio.create_task(
self.warm_up(predicted_next) # 只做预热,不做完整执行
)
current_result = await current_task
# 如果预测正确,预热的结果可以直接用
if self.prediction_correct(current_result, predicted_next):
return await speculative_task
else:
speculative_task.cancel()
return await self.execute_correct_step(current_result)策略 4:减少 LLM 调用次数
# 不好的做法:每个 Agent 单独调用 LLM
# Agent A 调用 LLM → Agent B 调用 LLM → Agent C 调用 LLM
# 总共 3 次 LLM 调用
# 好的做法:合并可以合并的调用
class BatchedAgent:
"""一次 LLM 调用完成多个子任务"""
def run(self, tasks: list) -> list:
prompt = "请依次完成以下任务,用 === 分隔每个任务的结果:\n\n"
for i, task in enumerate(tasks):
prompt += f"任务{i+1}: {task}\n"
prompt += "\n请开始:"
response = self.model.chat(prompt)
return response.split("===")追问:
- Q: 如何测量和定位延迟瓶颈? 在每个 Agent 的入口和出口记录时间戳,计算每个 Agent 的耗时占比。通常 80% 的延迟集中在 20% 的 Agent 上(帕累托法则)。优先优化耗时最长的那个 Agent——可能是它使用的工具太慢(如搜索引擎),需要换更快的工具或添加缓存。
七、实战难题
难题 1:Agent 死循环——两个 Agent 互相调用导致无限递归
难度:⭐⭐⭐ 高级
问题描述:
Supervisor 把任务分配给 Agent A,Agent A 觉得需要 Agent B 的帮助,把任务转给 Agent B,Agent B 又觉得应该让 Agent A 处理,结果陷入无限循环。
解决方案:
class CallDepthLimiter:
"""调用深度限制器——防止 Agent 间的无限递归"""
def __init__(self, max_depth=5):
self.max_depth = max_depth
self.call_stack = []
def enter(self, agent_name: str):
self.call_stack.append(agent_name)
if len(self.call_stack) > self.max_depth:
raise RecursionError(
f"Agent 调用深度超过限制({self.max_depth})!"
f"调用栈: {' → '.join(self.call_stack)}"
)
# 检测直接循环
if len(self.call_stack) >= 2 and self.call_stack[-1] == self.call_stack[-2]:
raise RecursionError(
f"检测到 Agent 自调用: {agent_name}"
)
# 检测间接循环 (A→B→A)
if self.call_stack.count(agent_name) > 1:
raise RecursionError(
f"检测到循环调用: {' → '.join(self.call_stack)}"
)
def exit(self):
self.call_stack.pop()
# 在 Supervisor 提示词中加入防循环指令
ANTI_LOOP_PROMPT = """
重要规则:
1. 不要把任务分配回给你收到任务的那个 Agent
2. 如果你已经尝试了某个 Agent 但没有得到满意结果,不要再次调用它
3. 如果连续3次尝试都失败,直接给出当前最好的答案
4. 记录你已经调用过的 Agent,避免重复调用
"""难题 2:上下文窗口溢出——多轮对话后信息超出 Token 限制
难度:⭐⭐⭐ 高级
问题描述:
多个 Agent 共享对话历史,几轮交互后上下文超出模型的 Token 限制,导致截断或报错。
解决方案:
class ContextManager:
"""智能上下文管理器"""
def __init__(self, max_tokens=8000, model="gpt-4o-mini"):
self.max_tokens = max_tokens
self.model = model # 用于摘要的轻量模型
def manage_context(self, messages: list, system_prompt: str) -> list:
"""管理上下文,确保不超出 Token 限制"""
system_tokens = self.count_tokens(system_prompt)
# 计算当前消息的 Token 数
total_tokens = system_tokens + sum(
self.count_tokens(m["content"]) for m in messages
)
if total_tokens <= self.max_tokens:
return messages # 没超限,直接返回
# 超限了!分层压缩
# 保留:系统提示 + 最近 N 条消息
recent_count = 5
recent_messages = messages[-recent_count:]
old_messages = messages[:-recent_count]
# 把旧消息压缩成摘要
if old_messages:
summary = self.summarize(old_messages)
compressed = [
{"role": "system", "content": f"[历史摘要] {summary}"}
] + recent_messages
else:
compressed = recent_messages
return compressed
def summarize(self, messages: list) -> str:
"""用轻量模型生成对话摘要"""
text = "\n".join(f"[{m['role']}]: {m['content']}" for m in messages)
prompt = f"请用100字概括以下对话的关键信息:\n{text}"
return self.model.chat(prompt)
def count_tokens(self, text: str) -> int:
"""估算 Token 数"""
return len(text) // 2 # 粗略估算难题 3:Agent 输出格式不稳定——有时返回 JSON,有时返回纯文本
难度:⭐⭐ 进阶
问题描述:
设计了一个 Supervisor-Worker 架构,要求 Worker Agent 返回 JSON 格式的结果以便 Supervisor 解析。但 LLM 输出格式不稳定,有时返回 JSON,有时返回 markdown 包裹的 JSON,有时直接返回自然语言。
解决方案:
import json
import re
class OutputParser:
"""健壮的输出解析器"""
@staticmethod
def parse_json(text: str) -> dict:
"""尝试多种方式解析 JSON"""
# 方法1:直接解析
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# 方法2:提取 ```json ... ``` 中的内容
json_match = re.search(r'```json\s*(.*?)\s*```', text, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
# 方法3:提取 { ... } 或 [ ... ]
brace_match = re.search(r'(\{.*\}|\[.*\])', text, re.DOTALL)
if brace_match:
try:
return json.loads(brace_match.group(1))
except json.JSONDecodeError:
pass
# 方法4:让 LLM 帮忙提取
return OutputParser.force_parse_with_llm(text)
@staticmethod
def force_parse_with_llm(text: str) -> dict:
"""最后手段:用 LLM 从自然语言中提取结构化信息"""
prompt = f"""
请从以下文本中提取信息,以JSON格式返回:
{text}
必须返回有效的JSON,不要包含其他内容。
"""
response = llm.invoke(prompt)
return json.loads(response.content)难题 4:多 Agent 系统的一致性问题——并发写入导致数据冲突
难度:⭐⭐⭐ 高级
问题描述:
多个 Agent 并行执行时,同时修改共享状态(如共享的数据库或文件),导致数据覆盖或不一致。
解决方案:
import asyncio
from enum import Enum
class LockType(Enum):
READ = "read"
WRITE = "write"
class SharedResourceManager:
"""带读写锁的共享资源管理器"""
def __init__(self):
self.data = {}
self._readers = 0
self._write_lock = asyncio.Lock()
self._read_lock = asyncio.Lock()
async def read(self, key: str, agent_name: str) -> any:
"""读操作——允许多个 Agent 同时读"""
async with self._read_lock:
self._readers += 1
if self._readers == 1:
await self._write_lock.acquire()
try:
return self.data.get(key)
finally:
async with self._read_lock:
self._readers -= 1
if self._readers == 0:
self._write_lock.release()
async def write(self, key: str, value: any, agent_name: str) -> bool:
"""写操作——独占访问"""
async with self._write_lock:
old_value = self.data.get(key)
# 使用乐观锁:检查值是否被其他 Agent 修改过
self.data[key] = {
"value": value,
"updated_by": agent_name,
"version": self.data.get(key, {}).get("version", 0) + 1
}
return True难题 5:多 Agent 系统的可观测性——出了问题不知道是谁的责任
难度:⭐⭐ 进阶
问题描述:
系统返回了一个错误结果,但不知道是哪个 Agent 出了问题,每个 Agent 都说"我做的是对的"。
解决方案:
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional
@dataclass
class AgentTrace:
"""Agent 执行追踪记录"""
trace_id: str
agent_name: str
step: int
input_summary: str
output_summary: str
model: str
tokens_used: int
latency_ms: float
success: bool
error: Optional[str] = None
parent_trace_id: Optional[str] = None # 上游 Agent 的 trace_id
class ObservabilitySystem:
"""多 Agent 可观测性系统"""
def __init__(self):
self.traces = []
def wrap_agent(self, agent):
"""包装 Agent,自动记录执行追踪"""
original_run = agent.run
def traced_run(input_data):
start = time.time()
trace_id = f"{agent.name}_{int(start * 1000)}"
try:
result = original_run(input_data)
latency = (time.time() - start) * 1000
self.traces.append(AgentTrace(
trace_id=trace_id,
agent_name=agent.name,
step=len(self.traces),
input_summary=str(input_data)[:200],
output_summary=str(result)[:200],
model=agent.model_name,
tokens_used=agent.last_token_count,
latency_ms=latency,
success=True
))
return result
except Exception as e:
latency = (time.time() - start) * 1000
self.traces.append(AgentTrace(
trace_id=trace_id,
agent_name=agent.name,
step=len(self.traces),
input_summary=str(input_data)[:200],
output_summary="",
model=agent.model_name,
tokens_used=0,
latency_ms=latency,
success=False,
error=str(e)
))
raise
agent.run = traced_run
return agent
def get_failure_report(self) -> str:
"""生成失败分析报告"""
failed = [t for t in self.traces if not t.success]
if not failed:
return "所有 Agent 执行成功 ✅"
report = f"❌ {len(failed)} 个 Agent 执行失败:\n"
for trace in failed:
report += f"""
Agent: {trace.agent_name}
Trace ID: {trace.trace_id}
错误: {trace.error}
输入: {trace.input_summary}
耗时: {trace.latency_ms:.0f}ms
---
"""
return report总结
| 场景 | 推荐架构 | 推荐框架 |
|---|---|---|
| 快速原型 | Sequential / Parallel | CrewAI |
| 对话式协作 | 对等 + GroupChat | AutoGen |
| 复杂生产系统 | 层级 + DAG | LangGraph |
| 高价值决策 | Debate | 手写 / LangGraph |
| 大规模任务 | Supervisor + Delegation | LangGraph |
核心设计原则:
- 从简单开始:先用单 Agent,不行再拆多 Agent
- 最小化通信:Agent 之间只传必要信息
- 分层模型:简单任务用小模型,关键决策用大模型
- 完善的可观测性:追踪每一次 LLM 调用
- 设计容错机制:重试、降级、超时,缺一不可
- 控制成本:缓存、提前终止、智能路由
最后更新:2025年6月