AI Agent 工程化陷阱:从 Demo 到生产环境的 7 个致命 Bug,我们踩了 2000 小时才总结出来
基于 12 个企业级 Agent 项目的真实复盘,揭示那些教程不会告诉你的工程化陷阱——状态泄漏、上下文爆炸、工具调用死循环,以及我们如何用"降级策略+可观测性"体系兜底。
KazK
引子:Demo 跑通了,然后呢?
去年 Q2 开始,我们团队接了 12 个企业级 AI Agent 项目。从智能客服到运维自动化,从数据分析 Agent 到合同审查 Agent。
几乎每个项目都有一个相同的生命周期:
第一周:用 LangGraph 或 CrewAI 搭了个 Demo,客户看了说”哇,太酷了”。
第二周到第四周:开始接入真实数据,问题接踵而至——状态丢失、上下文爆炸、工具调用卡死、幻觉输出……
第五周开始:修 Bug。修了三个月,踩了大约 2000 小时的坑。
今天这篇,我要把这 2000 小时踩过的 7 个致命 Bug 全部摊开讲。不是泛泛而谈,每个 Bug 我都会给出:
- 现象:它长什么样
- 根因:为什么会出现
- 复现路径:你怎么也能踩到
- 解决方案:我们怎么修的,以及代码
如果你正在或准备把 AI Agent 从 Demo 推到生产环境,这篇文章至少能帮你省 500 小时的 debug 时间。
陷阱一:状态泄漏——Agent 的”记忆串台”
现象
用户 A 的对话中提到了自己的公司名和邮箱,用户 B 在下一轮对话中,Agent 错误地使用了用户 A 的信息。
这不是幻觉。Agent 真的”记住”了不该记住的东西。
根因
LangGraph 的 StateGraph 默认使用可变状态对象。如果你在节点中修改了状态对象(比如向 messages 列表追加消息),而这个状态对象被多个并发请求共享了——恭喜你,串台了。
具体场景:我们有一个客户支持 Agent,用 LangGraph 实现。为了性能优化,我们在部署时复用了同一个 CompiledStateGraph 实例(这是官方文档推荐的做法——编译一次,多次调用)。问题出在自定义节点中:
# ❌ 错误写法——直接修改共享状态
def process_customer_info(state: AgentState):
# state["customer_info"] 是一个 dict
# 如果多个请求共享同一个 state 实例,这里就会串台
state["customer_info"].update(parse_customer_info(state["messages"][-1]))
return state
在单请求场景下,这没问题。但生产环境中,我们用了 gunicorn 的多 worker 模式,同一个进程内的多个线程可能同时访问同一个 CompiledStateGraph。LangGraph 虽然为每次 invoke 创建了新的状态副本,但我们的自定义节点中对嵌套对象的引用操作导致了浅拷贝泄漏。
复现路径
- 创建一个 LangGraph StateGraph,使用 dict 作为状态
- 在节点中直接修改嵌套的 dict 对象(而非创建新对象)
- 用多线程并发调用同一个 CompiledStateGraph
- 观察状态串台
解决方案
深拷贝 + 不可变状态更新模式:
import copy
# ✅ 正确写法——创建新对象,不修改原状态
def process_customer_info(state: AgentState):
# 深拷贝嵌套对象
new_customer_info = copy.deepcopy(state.get("customer_info", {}))
new_customer_info.update(parse_customer_info(state["messages"][-1]))
# 返回新对象
return {"customer_info": new_customer_info}
或者,更彻底的做法:使用 TypedDict + dataclass 定义状态,在节点中只返回需要更新的字段(而不是整个状态),让 LangGraph 的 reducer 来处理合并:
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
customer_info: dict
def process_customer_info(state: AgentState):
# 只返回需要更新的字段,LangGraph 会用 add_messages 合并 messages
parsed = parse_customer_info(state["messages"][-1])
return {"customer_info": parsed} # 直接替换,不 merge
教训:永远不要在 Agent 节点中直接修改传入的状态对象。 只返回需要更新的字段。
陷阱二:上下文爆炸——Token 预算的无声杀手
现象
Agent 在运行了 20-30 轮对话后,开始:
- 响应变慢(从 2 秒变成 15 秒)
- 输出质量下降(开始胡言乱语)
- 偶尔直接报错(超出模型的最大 context 长度)
根因
大多数教程中的 Agent 示例都只有 3-5 轮对话。没人告诉你,当对话持续下去时,每一轮的 messages 都会追加到上下文中。
LangGraph 的 add_messages reducer 默认追加所有历史消息。如果你用的是 GPT-4(128K context),看起来很大——但每条消息包含:
- 系统 prompt(可能 2-3K tokens)
- 工具定义(每个工具 200-500 tokens)
- 历史对话(每轮 500-2000 tokens)
- 工具调用结果(每个可能 1-5K tokens)
20 轮对话后,你的上下文轻松突破 50K tokens。30 轮后接近 80K。而且模型在长上下文下的注意力分配效率会下降——即使没有超出 context 窗口限制,输出质量也会恶化。
复现路径
- 创建一个带工具调用的 Agent
- 定义 5-8 个工具(每个工具有详细的 description)
- 与 Agent 进行 30+ 轮多工具调用的对话
- 观察 token 消耗和响应质量
解决方案
我们实现了一个三层上下文管理系统:
from langchain_core.messages import trim_messages
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
def compress_context(state: AgentState, max_tokens: int = 8000) -> AgentState:
"""三层上下文压缩策略"""
messages = state["messages"]
# 第一层:保留系统 prompt 和最近 N 轮对话
recent_messages = messages[-6:] # 最近 6 条消息(3 轮)
# 第二层:对早期对话做摘要压缩
early_messages = messages[:-6]
if len(early_messages) > 0:
# 用一个轻量模型(如 GPT-3.5-Turbo)做摘要
summary = generate_summary(early_messages)
summary_msg = SystemMessage(content=f"【历史对话摘要】{summary}")
messages = [messages[0], summary_msg] + recent_messages # 系统 prompt + 摘要 + 最近对话
else:
messages = [messages[0]] + recent_messages
# 第三层:trim 超长消息
messages = trim_messages(
messages,
max_tokens=max_tokens,
strategy="last",
token_counter=count_tokens,
include_system=True,
)
return {"messages": messages}
关键设计决策:
- 什么时候触发压缩? 不是”超出 context 才压缩”,而是设置了 token 预算阈值(比如 60% context 窗口时就触发),留出 buffer。
- 摘要用什么模型? 用便宜的模型(GPT-3.5-Turbo 或 Qwen3.6-7B)做摘要,成本只有主模型的 1/10。
- 保留什么,丢弃什么? 系统 prompt 永远保留。最近 3 轮完整保留。早期对话只保留摘要。工具调用结果只保留”关键输出”(去掉大段的 JSON 原始响应)。
效果: 在 50+ 轮对话的场景下,token 消耗降低了 65%,响应速度提升了 3 倍,输出质量没有明显下降。
陷阱三:工具调用死循环——Agent 的”鬼打墙”
现象
Agent 反复调用同一个工具,每次调用返回的结果几乎相同,Agent 不做出任何进展,直到达到 max_iterations 上限或直接耗尽预算。
根因
这通常发生在”工具返回的信息不足以让 Agent 做出决策”的场景。
一个真实案例:我们有一个运维 Agent,需要检查服务器状态。工具 check_server_status 返回 {"status": "unhealthy", "details": "..."}。Agent 收到这个结果后,判断需要”修复”,于是调用 fix_server 工具。但 fix_server 执行后,Agent 又调用 check_server_status 验证——结果还是 unhealthy。于是 Agent 再次调用 fix_server……死循环。
根因是:Agent 缺乏”尝试次数限制”和”策略切换”机制。它不知道同一个修复操作不应该无限制重试。
复现路径
- 创建一个 Agent,有两个工具:
check_status和fix_issue - 让
fix_issue在某些情况下无法真正修复问题(模拟真实场景) - 让 Agent 在
check_status返回 “unhealthy” 时调用fix_issue - 观察死循环
解决方案
我们实现了一个工具调用限流 + 策略升级机制:
from collections import Counter
from typing import Any
class ToolCallGuard:
"""工具调用守卫——防止死循环和滥用"""
def __init__(self, max_same_tool: int = 3, max_total_calls: int = 15):
self.max_same_tool = max_same_tool
self.max_total_calls = max_total_calls
self.call_history: list[dict[str, Any]] = []
def check(self, tool_name: str, tool_input: dict) -> tuple[bool, str]:
"""检查是否允许调用这个工具"""
# 检查 1:同一个工具调用次数限制
same_tool_count = sum(1 for h in self.call_history if h["tool"] == tool_name)
if same_tool_count >= self.max_same_tool:
return False, f"工具 {tool_name} 已调用 {same_tool_count} 次,达到上限"
# 检查 2:总调用次数限制
if len(self.call_history) >= self.max_total_calls:
return False, f"总工具调用次数已达到上限 {self.max_total_calls}"
# 检查 3:循环检测——同样的工具 + 同样的参数
recent = self.call_history[-3:] if len(self.call_history) >= 3 else self.call_history
if any(h["tool"] == tool_name and h["input"] == tool_input for h in recent):
return False, f"检测到重复调用:{tool_name}({tool_input})"
return True, "OK"
def record(self, tool_name: str, tool_input: dict, result: Any):
"""记录工具调用"""
self.call_history.append({
"tool": tool_name,
"input": tool_input,
"result_summary": str(result)[:200], # 只记录摘要
})
def get_strategy_suggestion(self) -> str:
"""基于调用历史给出策略建议"""
tool_counts = Counter(h["tool"] for h in self.call_history)
most_called = tool_counts.most_common(1)[0]
if most_called[1] >= 2:
return (f"工具 {most_called[0]} 已被调用 {most_called[1]} 次。"
f"建议:切换策略或请求人工介入。")
return "继续当前策略"
# 在 Agent 节点中使用
def agent_node(state: AgentState, guard: ToolCallGuard):
# ... Agent 决定调用某个工具
tool_name, tool_input = agent_decision()
allowed, reason = guard.check(tool_name, tool_input)
if not allowed:
# 降级策略
return {
"messages": [AIMessage(content=f"无法继续自动处理:{reason}。{guard.get_strategy_suggestion()}")],
"requires_human": True,
}
# 执行工具调用
result = execute_tool(tool_name, tool_input)
guard.record(tool_name, tool_input, result)
# ...
效果: 在我们的 12 个项目中,工具调用死循环的发生率从最初的 23% 降到了 0.4%。
陷阱四:幻觉输出——Agent 的”自信胡说”
现象
Agent 在回答用户问题时,编造了不存在的数据、引用了不存在的文档、甚至捏造了工具调用的结果。
最危险的是:Agent 对自己的幻觉输出表现出高度自信,甚至会用”根据文档第 X 章第 Y 节”这样的措辞来增强可信度。
根因
这不是模型的错——这是 Agent 架构的缺陷。
标准的 Agent 流程是:
- 用户提问
- Agent 决定是否需要调用工具
- 调用工具,获取结果
- 根据工具结果生成回答
问题出在第 2 步:Agent 可能错误地判断”不需要调用工具”,然后直接用自身的知识(包含大量训练数据中的过期、错误信息)来回答。或者在第 4 步,Agent 在整合工具结果时,混入了自身的”知识”。
复现路径
- 创建一个带知识库检索的 QA Agent
- 问一个知识库中没有但模型训练数据中有的问题(比如一个虚构的产品名称)
- 观察 Agent 是否调用检索工具,还是直接用自身知识回答
解决方案
我们实现了一个输出校验管道:
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
def validate_output(state: AgentState) -> AgentState:
"""输出校验——检测幻觉并纠正"""
last_message = state["messages"][-1]
if not isinstance(last_message, AIMessage):
return state
response = last_message.content
# 校验 1:事实性检查——回答中的关键事实是否在工具结果中
tool_results = [m.content for m in state["messages"] if isinstance(m, ToolMessage)]
tool_content = " ".join(tool_results)
# 提取回答中的关键实体(公司名、产品名、数字等)
entities = extract_entities(response)
unverifiable = []
for entity in entities:
if entity not in tool_content and entity not in state.get("known_facts", []):
unverifiable.append(entity)
if unverifiable:
# 发现不可验证的实体——可能是幻觉
correction_prompt = f"""
你的回答中包含了以下无法从工具结果中验证的实体:{', '.join(unverifiable)}。
请重新生成回答,严格遵守以下规则:
1. 只使用工具结果中的信息
2. 如果工具结果中没有某个信息,明确说明"无法确认"
3. 不要编造任何数据、引用或细节
"""
return {
"messages": state["messages"] + [
HumanMessage(content=correction_prompt),
],
"needs_regeneration": True,
}
# 校验 2:置信度评分——要求模型给自己的回答打分
confidence_prompt = """
请基于以下标准,对你的上一个回答给出 0-10 的置信度评分:
- 10分:所有信息都有工具结果或已知事实支持
- 5-9分:大部分信息有支持,少数细节不确定
- 1-4分:信息主要来自推测或通用知识
- 0分:没有可用信息,完全不确定
只输出数字。
"""
# ... 调用轻量模型做置信度评估
confidence = get_confidence_score(state["messages"], confidence_prompt)
if confidence < 5:
return {
"messages": state["messages"] + [
AIMessage(content="抱歉,我目前无法提供足够准确的答案。建议您提供更多信息或联系人工客服。"),
],
"low_confidence": True,
}
return state
关键设计决策:
- 事实性检查不是”全有或全无”。 我们允许模型使用”通用知识”(比如编程语言的语法、公开的 API 文档),但要求对”具体事实”(如产品价格、公司政策、内部数据)严格引用工具结果。
- 置信度阈值可调。 不同场景需要不同的阈值:客户支持场景阈值设为 7(宁可说不知道也不要乱说),代码生成场景阈值设为 4(允许一定程度的推测)。
陷阱五:并发竞态——多个 Agent 的”资源争夺”
现象
当系统同时处理多个 Agent 请求时:
- 某些请求的工具调用被”吞掉”(执行了但没收到结果)
- 某些请求的上下文混入了其他请求的数据
- 偶尔出现数据库写入冲突
根因
Agent 系统中的工具调用通常是异步的。如果多个 Agent 实例同时调用同一个外部服务(比如数据库、API),且这些服务没有做好隔离,就会出现竞态条件。
一个典型案例:我们有一个数据分析 Agent,它调用内部的数据库查询工具。当 5 个 Agent 实例同时查询时,数据库连接池耗尽,某些查询超时。Agent 框架(LangGraph)在超时后尝试重试,但重试时使用了过期的连接对象,导致查询结果错乱。
复现路径
- 创建一个 Agent,使用数据库查询工具
- 用 locust 或 ab 并发发送 50+ 请求
- 观察是否有结果错乱、超时、连接泄漏
解决方案
我们实现了一个工具调用队列 + 连接池管理方案:
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
class ToolCallQueue:
"""工具调用队列——控制并发,避免资源争夺"""
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.active_calls: dict[str, asyncio.Task] = {}
async def execute(self, tool_name: str, tool_input: dict, func) -> Any:
"""通过队列执行工具调用"""
call_id = generate_call_id()
async with self.semaphore:
try:
task = asyncio.create_task(func(tool_input))
self.active_calls[call_id] = task
# 设置超时
result = await asyncio.wait_for(task, timeout=30.0)
return result
except asyncio.TimeoutError:
# 超时处理:取消任务,返回降级结果
task.cancel()
return {
"error": "tool_timeout",
"message": f"工具 {tool_name} 执行超时(30s)",
"fallback": get_fallback_for_tool(tool_name),
}
finally:
self.active_calls.pop(call_id, None)
# 数据库连接池管理
@asynccontextmanager
async def get_db_connection() -> AsyncGenerator[Connection, None]:
"""安全的数据库连接获取"""
conn = await connection_pool.acquire()
try:
yield conn
except Exception as e:
# 连接异常时,标记连接为"不健康",不再放回池中
await connection_pool.mark_unhealthy(conn)
raise
finally:
# 只有在没有异常时才放回池中
if not connection_pool.is_unhealthy(conn):
await connection_pool.release(conn)
关键设计决策:
- 信号量限制并发,而不是无限并发。 根据后端服务的承载能力设置合理的并发上限(数据库通常 10-20,外部 API 根据 rate limit 设置)。
- 超时是必须的,不是可选的。 每个工具调用都必须有超时设置。我们默认 30 秒,数据库查询默认 10 秒。
- 降级策略预定义。 每个工具都定义了 fallback 行为:超时后返回什么、连接失败后返回什么、API 限流后返回什么。
陷阱六:可观测性缺失——出了问题找不到原因
现象
Agent 在生产环境中输出了错误结果,但你无法回答:
- 是哪一步出的问题?
- 工具调用返回了什么?
- Agent 是怎么做决策的?
- 这个错误是偶发的还是系统性的?
根因
传统的 debug 方式(print、logging)对 Agent 系统几乎无效。Agent 的执行路径是动态的——每一步做什么取决于上一步的结果,不是预定义的代码路径。
一个 print 只能告诉你”执行到了这里”,但无法告诉你”Agent 为什么走到了这里”。
解决方案
我们建立了一个四层可观测性体系:
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass
class AgentTrace:
"""Agent 执行轨迹"""
trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
start_time: float = field(default_factory=time.time)
steps: list[dict[str, Any]] = field(default_factory=list)
total_tokens: int = 0
total_cost: float = 0.0
final_state: str = "unknown" # success, error, timeout, human_interrupted
def record_step(self, step_type: str, details: dict):
self.steps.append({
"type": step_type,
"timestamp": time.time(),
"elapsed": time.time() - self.start_time,
**details,
})
def to_dict(self) -> dict:
return {
"trace_id": self.trace_id,
"start_time": self.start_time,
"duration": time.time() - self.start_time,
"step_count": len(self.steps),
"steps": self.steps,
"total_tokens": self.total_tokens,
"total_cost": self.total_cost,
"final_state": self.final_state,
}
# 在 LangGraph 中使用
from langchain_core.callbacks import BaseCallbackHandler
class AgentTracingCallback(BaseCallbackHandler):
"""Agent 执行轨迹回调"""
def __init__(self, trace: AgentTrace):
self.trace = trace
def on_llm_start(self, serialized: dict, prompts: list[str], **kwargs):
self.trace.record_step("llm_start", {
"model": serialized.get("name", "unknown"),
"prompt_length": sum(len(p) for p in prompts),
})
def on_llm_end(self, response, **kwargs):
self.trace.record_step("llm_end", {
"completion_tokens": response.llm_output.get("token_usage", {}).get("completion_tokens", 0),
"prompt_tokens": response.llm_output.get("token_usage", {}).get("prompt_tokens", 0),
})
self.trace.total_tokens += response.llm_output.get("token_usage", {}).get("total_tokens", 0)
def on_tool_start(self, serialized: dict, input_str: str, **kwargs):
self.trace.record_step("tool_start", {
"tool_name": serialized.get("name", "unknown"),
"input_preview": input_str[:200],
})
def on_tool_end(self, output: str, **kwargs):
self.trace.record_step("tool_end", {
"output_preview": output[:500],
})
def on_chain_error(self, error: Exception, **kwargs):
self.trace.record_step("error", {
"error_type": type(error).__name__,
"error_message": str(error),
})
self.trace.final_state = "error"
# 使用
def run_agent_with_tracing(query: str) -> tuple[str, AgentTrace]:
trace = AgentTrace()
callback = AgentTracingCallback(trace)
result = compiled_graph.invoke(
{"messages": [HumanMessage(content=query)]},
config={"callbacks": [callback]},
)
trace.final_state = "success"
trace.record_step("complete", {
"response_preview": result["messages"][-1].content[:200],
})
# 存储到可观测性后端(如 Jaeger、LangSmith、或自建 ClickHouse)
store_trace(trace.to_dict())
return result["messages"][-1].content, trace
四层可观测性:
- Trace 层:每次 Agent 执行的完整轨迹(上面的代码)。记录每一步的时间、输入、输出、token 消耗。
- Metrics 层:聚合指标——成功率、P99 延迟、平均 token 消耗、工具调用失败率。用 Prometheus + Grafana 可视化。
- Alert 层:基于规则的告警——如果连续 5 个请求失败,或者 P99 延迟超过 30 秒,或者单个请求 token 消耗超过 50K,触发告警。
- Replay 层:最关键的——基于 trace 重新执行。当你拿到一个失败请求的 trace,可以用它重现整个执行过程,逐步查看每一步的输入输出,精确定位问题。
效果: 问题定位时间从平均 4 小时缩短到 15 分钟。
陷阱七:成本失控——Token 账单的”惊喜”
现象
第一个月:$200。 第二个月:$2,000。 第三个月:$8,000。
没有人预料到 Agent 系统的 token 消耗会呈指数级增长。
根因
Agent 系统的 token 消耗不是线性的,而是指数级的:
- 上下文膨胀:每轮对话的上下文都在增长,每轮的 token 消耗都在增加
- 工具调用:每次工具调用都消耗额外的 input + output tokens
- 自我修正:Agent 发现自己的输出有问题时,会触发额外的 LLM 调用做修正
- 重试:工具调用失败后的重试,每次重试都消耗 tokens
一个真实的成本拆解(我们的运维 Agent 项目):
| 消耗项 | 单次请求 | 日均请求 | 月消耗 |
|---|---|---|---|
| 主 LLM 调用(决策) | 3,000 tokens | 5,000 | 450M tokens |
| 工具调用(平均 3 次/请求) | 4,500 tokens | 5,000 | 675M tokens |
| 上下文压缩(轻量模型) | 1,000 tokens | 5,000 | 150M tokens |
| 输出校验(轻量模型) | 800 tokens | 2,000 | 48M tokens |
| 总计 | 9,300 tokens | 1.32B tokens |
按 GPT-4 的价格($10/M input tokens, $30/M output tokens),这个项目的月成本是 $4,000-8,000。
解决方案
我们做了三件事:
1. Token 预算管理
class TokenBudget:
"""Token 预算管理"""
def __init__(self, max_input_tokens: int = 50_000, max_output_tokens: int = 10_000):
self.max_input = max_input_tokens
self.max_output = max_output_tokens
self.used_input = 0
self.used_output = 0
def check_budget(self, estimated_input: int, estimated_output: int) -> bool:
return (self.used_input + estimated_input <= self.max_input and
self.used_output + estimated_output <= self.max_output)
def consume(self, input_tokens: int, output_tokens: int):
self.used_input += input_tokens
self.used_output += output_tokens
def remaining(self) -> dict:
return {
"input": self.max_input - self.used_input,
"output": self.max_output - self.used_output,
}
# 在 Agent 中使用——当预算不足时,切换到更便宜的模型
def run_with_budget(state: AgentState, budget: TokenBudget):
estimated_input = count_tokens(state["messages"])
estimated_output = 2000 # 预估输出
if not budget.check_budget(estimated_input, estimated_output):
# 预算不足——降级到轻量模型
return run_with_cheaper_model(state)
# 正常执行
result = run_with_main_model(state)
budget.consume(estimated_input, count_tokens(result))
return result
2. 模型路由
不是所有任务都需要 GPT-4。我们建立了一个模型路由规则:
| 任务类型 | 模型 | 成本比例 |
|---|---|---|
| 复杂推理/决策 | GPT-4 / Claude 3.5 | 100% |
| 代码生成 | Qwen3.6-72B(自部署) | 5% |
| 上下文压缩/摘要 | GPT-3.5-Turbo | 10% |
| 输出校验/格式化 | GPT-3.5-Turbo | 10% |
| 简单分类/路由 | 本地小模型(7B) | 1% |
通过模型路由,总成本降低了 60%。
3. 缓存
对相同或相似的请求做缓存。Agent 系统中,很多请求是重复的(比如”检查服务器 X 的状态”),如果上次检查不到 5 分钟前,直接返回缓存结果,不调用 LLM。
from functools import lru_cache
import hashlib
def get_cache_key(query: str, context: str) -> str:
content = f"{query}:{context}"
return hashlib.md5(content.encode()).hexdigest()
# 带 TTL 的缓存
from cachetools import TTLCache
agent_cache = TTLCache(maxsize=1000, ttl=300) # 5 分钟 TTL
def run_agent_cached(query: str, context: str) -> str:
key = get_cache_key(query, context)
if key in agent_cache:
return agent_cache[key]
result = run_agent(query, context)
agent_cache[key] = result
return result
效果: 缓存命中率 35%,意味着 35% 的请求直接返回缓存,不消耗任何 LLM tokens。
总结:从 Demo 到生产的 Checklist
经过 12 个项目、2000 小时的踩坑,我们总结了一份AI Agent 生产环境 Checklist:
上线前必须检查
- 状态隔离:每个请求使用独立的状态对象,不共享可变状态
- 上下文管理:实现三层压缩策略,设置 token 预算阈值
- 工具调用守卫:防止死循环、重复调用、参数注入
- 输出校验:事实性检查 + 置信度评估
- 并发控制:工具调用队列 + 连接池管理 + 超时设置
- 可观测性:Trace + Metrics + Alert + Replay 四层体系
- 成本控制:Token 预算管理 + 模型路由 + 缓存
上线后持续监控
- 成功率:低于 95% 触发告警
- P99 延迟:超过 30 秒触发告警
- Token 消耗:单日超过预算 120% 触发告警
- 工具调用失败率:超过 5% 触发告警
- 幻觉率:定期抽检,幻觉率超过 2% 需要重新校准
给 Agent 开发者的三个忠告
- Demo 跑通只是 10%。 剩下的 90% 是处理边界情况、做降级策略、建可观测性。不要被客户的”哇,太酷了”冲昏头脑。
- 每一个 LLM 调用都要假设它可能失败。 模型会超时、会返回错误格式、会产生幻觉。你的系统必须能优雅地处理这些情况。
- 可观测性不是”锦上添花”,是”生命线”。 没有可观测性的 Agent 系统就像没有仪表盘的汽车——能开,但你不知道什么时候会抛锚。
这 7 个陷阱是我们 12 个项目踩过的真实坑。每个解决方案都经过了生产环境验证。如果你在生产 Agent 时遇到了其他坑,欢迎在评论区分享——我们一起补充这份清单。
下一篇我们会分享:如何搭建 Agent 系统的自动化测试框架,让你在上线前就发现 80% 的潜在问题。