AI AinoCode AI 工具与基础设施
AI Agent 12 分钟

AI Agent 工程化陷阱:从 Demo 到生产环境的 7 个致命 Bug,我们踩了 2000 小时才总结出来

基于 12 个企业级 Agent 项目的真实复盘,揭示那些教程不会告诉你的工程化陷阱——状态泄漏、上下文爆炸、工具调用死循环,以及我们如何用"降级策略+可观测性"体系兜底。

KazK

AI Agent 工程化陷阱实战复盘

引子:Demo 跑通了,然后呢?

去年 Q2 开始,我们团队接了 12 个企业级 AI Agent 项目。从智能客服到运维自动化,从数据分析 Agent 到合同审查 Agent。

几乎每个项目都有一个相同的生命周期:

第一周:用 LangGraph 或 CrewAI 搭了个 Demo,客户看了说”哇,太酷了”。

第二周到第四周:开始接入真实数据,问题接踵而至——状态丢失、上下文爆炸、工具调用卡死、幻觉输出……

第五周开始:修 Bug。修了三个月,踩了大约 2000 小时的坑。

今天这篇,我要把这 2000 小时踩过的 7 个致命 Bug 全部摊开讲。不是泛泛而谈,每个 Bug 我都会给出:

  1. 现象:它长什么样
  2. 根因:为什么会出现
  3. 复现路径:你怎么也能踩到
  4. 解决方案:我们怎么修的,以及代码

如果你正在或准备把 AI Agent 从 Demo 推到生产环境,这篇文章至少能帮你省 500 小时的 debug 时间。


陷阱一:状态泄漏——Agent 的”记忆串台”

现象

用户 A 的对话中提到了自己的公司名和邮箱,用户 B 在下一轮对话中,Agent 错误地使用了用户 A 的信息。

这不是幻觉。Agent 真的”记住”了不该记住的东西。

根因

LangGraph 的 StateGraph 默认使用可变状态对象。如果你在节点中修改了状态对象(比如向 messages 列表追加消息),而这个状态对象被多个并发请求共享了——恭喜你,串台了。

具体场景:我们有一个客户支持 Agent,用 LangGraph 实现。为了性能优化,我们在部署时复用了同一个 CompiledStateGraph 实例(这是官方文档推荐的做法——编译一次,多次调用)。问题出在自定义节点中:

# ❌ 错误写法——直接修改共享状态
def process_customer_info(state: AgentState):
    # state["customer_info"] 是一个 dict
    # 如果多个请求共享同一个 state 实例,这里就会串台
    state["customer_info"].update(parse_customer_info(state["messages"][-1]))
    return state

在单请求场景下,这没问题。但生产环境中,我们用了 gunicorn 的多 worker 模式,同一个进程内的多个线程可能同时访问同一个 CompiledStateGraph。LangGraph 虽然为每次 invoke 创建了新的状态副本,但我们的自定义节点中对嵌套对象的引用操作导致了浅拷贝泄漏。

复现路径

  1. 创建一个 LangGraph StateGraph,使用 dict 作为状态
  2. 在节点中直接修改嵌套的 dict 对象(而非创建新对象)
  3. 用多线程并发调用同一个 CompiledStateGraph
  4. 观察状态串台

解决方案

深拷贝 + 不可变状态更新模式:

import copy

# ✅ 正确写法——创建新对象,不修改原状态
def process_customer_info(state: AgentState):
    # 深拷贝嵌套对象
    new_customer_info = copy.deepcopy(state.get("customer_info", {}))
    new_customer_info.update(parse_customer_info(state["messages"][-1]))
    # 返回新对象
    return {"customer_info": new_customer_info}

或者,更彻底的做法:使用 TypedDict + dataclass 定义状态,在节点中只返回需要更新的字段(而不是整个状态),让 LangGraph 的 reducer 来处理合并:

from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    customer_info: dict

def process_customer_info(state: AgentState):
    # 只返回需要更新的字段,LangGraph 会用 add_messages 合并 messages
    parsed = parse_customer_info(state["messages"][-1])
    return {"customer_info": parsed}  # 直接替换,不 merge

教训:永远不要在 Agent 节点中直接修改传入的状态对象。 只返回需要更新的字段。


陷阱二:上下文爆炸——Token 预算的无声杀手

现象

Agent 在运行了 20-30 轮对话后,开始:

  • 响应变慢(从 2 秒变成 15 秒)
  • 输出质量下降(开始胡言乱语)
  • 偶尔直接报错(超出模型的最大 context 长度)

根因

大多数教程中的 Agent 示例都只有 3-5 轮对话。没人告诉你,当对话持续下去时,每一轮的 messages 都会追加到上下文中

LangGraph 的 add_messages reducer 默认追加所有历史消息。如果你用的是 GPT-4(128K context),看起来很大——但每条消息包含:

  • 系统 prompt(可能 2-3K tokens)
  • 工具定义(每个工具 200-500 tokens)
  • 历史对话(每轮 500-2000 tokens)
  • 工具调用结果(每个可能 1-5K tokens)

20 轮对话后,你的上下文轻松突破 50K tokens。30 轮后接近 80K。而且模型在长上下文下的注意力分配效率会下降——即使没有超出 context 窗口限制,输出质量也会恶化。

复现路径

  1. 创建一个带工具调用的 Agent
  2. 定义 5-8 个工具(每个工具有详细的 description)
  3. 与 Agent 进行 30+ 轮多工具调用的对话
  4. 观察 token 消耗和响应质量

解决方案

我们实现了一个三层上下文管理系统

from langchain_core.messages import trim_messages
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage

def compress_context(state: AgentState, max_tokens: int = 8000) -> AgentState:
    """三层上下文压缩策略"""
    messages = state["messages"]
    
    # 第一层:保留系统 prompt 和最近 N 轮对话
    recent_messages = messages[-6:]  # 最近 6 条消息(3 轮)
    
    # 第二层:对早期对话做摘要压缩
    early_messages = messages[:-6]
    if len(early_messages) > 0:
        # 用一个轻量模型(如 GPT-3.5-Turbo)做摘要
        summary = generate_summary(early_messages)
        summary_msg = SystemMessage(content=f"【历史对话摘要】{summary}")
        messages = [messages[0], summary_msg] + recent_messages  # 系统 prompt + 摘要 + 最近对话
    else:
        messages = [messages[0]] + recent_messages
    
    # 第三层:trim 超长消息
    messages = trim_messages(
        messages,
        max_tokens=max_tokens,
        strategy="last",
        token_counter=count_tokens,
        include_system=True,
    )
    
    return {"messages": messages}

关键设计决策:

  1. 什么时候触发压缩? 不是”超出 context 才压缩”,而是设置了 token 预算阈值(比如 60% context 窗口时就触发),留出 buffer。
  2. 摘要用什么模型? 用便宜的模型(GPT-3.5-Turbo 或 Qwen3.6-7B)做摘要,成本只有主模型的 1/10。
  3. 保留什么,丢弃什么? 系统 prompt 永远保留。最近 3 轮完整保留。早期对话只保留摘要。工具调用结果只保留”关键输出”(去掉大段的 JSON 原始响应)。

效果: 在 50+ 轮对话的场景下,token 消耗降低了 65%,响应速度提升了 3 倍,输出质量没有明显下降。


陷阱三:工具调用死循环——Agent 的”鬼打墙”

现象

Agent 反复调用同一个工具,每次调用返回的结果几乎相同,Agent 不做出任何进展,直到达到 max_iterations 上限或直接耗尽预算。

根因

这通常发生在”工具返回的信息不足以让 Agent 做出决策”的场景。

一个真实案例:我们有一个运维 Agent,需要检查服务器状态。工具 check_server_status 返回 {"status": "unhealthy", "details": "..."}。Agent 收到这个结果后,判断需要”修复”,于是调用 fix_server 工具。但 fix_server 执行后,Agent 又调用 check_server_status 验证——结果还是 unhealthy。于是 Agent 再次调用 fix_server……死循环。

根因是:Agent 缺乏”尝试次数限制”和”策略切换”机制。它不知道同一个修复操作不应该无限制重试。

复现路径

  1. 创建一个 Agent,有两个工具:check_statusfix_issue
  2. fix_issue 在某些情况下无法真正修复问题(模拟真实场景)
  3. 让 Agent 在 check_status 返回 “unhealthy” 时调用 fix_issue
  4. 观察死循环

解决方案

我们实现了一个工具调用限流 + 策略升级机制:

from collections import Counter
from typing import Any

class ToolCallGuard:
    """工具调用守卫——防止死循环和滥用"""
    
    def __init__(self, max_same_tool: int = 3, max_total_calls: int = 15):
        self.max_same_tool = max_same_tool
        self.max_total_calls = max_total_calls
        self.call_history: list[dict[str, Any]] = []
    
    def check(self, tool_name: str, tool_input: dict) -> tuple[bool, str]:
        """检查是否允许调用这个工具"""
        # 检查 1:同一个工具调用次数限制
        same_tool_count = sum(1 for h in self.call_history if h["tool"] == tool_name)
        if same_tool_count >= self.max_same_tool:
            return False, f"工具 {tool_name} 已调用 {same_tool_count} 次,达到上限"
        
        # 检查 2:总调用次数限制
        if len(self.call_history) >= self.max_total_calls:
            return False, f"总工具调用次数已达到上限 {self.max_total_calls}"
        
        # 检查 3:循环检测——同样的工具 + 同样的参数
        recent = self.call_history[-3:] if len(self.call_history) >= 3 else self.call_history
        if any(h["tool"] == tool_name and h["input"] == tool_input for h in recent):
            return False, f"检测到重复调用:{tool_name}({tool_input})"
        
        return True, "OK"
    
    def record(self, tool_name: str, tool_input: dict, result: Any):
        """记录工具调用"""
        self.call_history.append({
            "tool": tool_name,
            "input": tool_input,
            "result_summary": str(result)[:200],  # 只记录摘要
        })
    
    def get_strategy_suggestion(self) -> str:
        """基于调用历史给出策略建议"""
        tool_counts = Counter(h["tool"] for h in self.call_history)
        most_called = tool_counts.most_common(1)[0]
        
        if most_called[1] >= 2:
            return (f"工具 {most_called[0]} 已被调用 {most_called[1]} 次。"
                    f"建议:切换策略或请求人工介入。")
        return "继续当前策略"

# 在 Agent 节点中使用
def agent_node(state: AgentState, guard: ToolCallGuard):
    # ... Agent 决定调用某个工具
    tool_name, tool_input = agent_decision()
    
    allowed, reason = guard.check(tool_name, tool_input)
    if not allowed:
        # 降级策略
        return {
            "messages": [AIMessage(content=f"无法继续自动处理:{reason}{guard.get_strategy_suggestion()}")],
            "requires_human": True,
        }
    
    # 执行工具调用
    result = execute_tool(tool_name, tool_input)
    guard.record(tool_name, tool_input, result)
    # ...

效果: 在我们的 12 个项目中,工具调用死循环的发生率从最初的 23% 降到了 0.4%。


陷阱四:幻觉输出——Agent 的”自信胡说”

现象

Agent 在回答用户问题时,编造了不存在的数据、引用了不存在的文档、甚至捏造了工具调用的结果。

最危险的是:Agent 对自己的幻觉输出表现出高度自信,甚至会用”根据文档第 X 章第 Y 节”这样的措辞来增强可信度。

根因

这不是模型的错——这是 Agent 架构的缺陷。

标准的 Agent 流程是:

  1. 用户提问
  2. Agent 决定是否需要调用工具
  3. 调用工具,获取结果
  4. 根据工具结果生成回答

问题出在第 2 步:Agent 可能错误地判断”不需要调用工具”,然后直接用自身的知识(包含大量训练数据中的过期、错误信息)来回答。或者在第 4 步,Agent 在整合工具结果时,混入了自身的”知识”。

复现路径

  1. 创建一个带知识库检索的 QA Agent
  2. 问一个知识库中没有但模型训练数据中有的问题(比如一个虚构的产品名称)
  3. 观察 Agent 是否调用检索工具,还是直接用自身知识回答

解决方案

我们实现了一个输出校验管道

from langchain_core.messages import HumanMessage, AIMessage, ToolMessage

def validate_output(state: AgentState) -> AgentState:
    """输出校验——检测幻觉并纠正"""
    last_message = state["messages"][-1]
    if not isinstance(last_message, AIMessage):
        return state
    
    response = last_message.content
    
    # 校验 1:事实性检查——回答中的关键事实是否在工具结果中
    tool_results = [m.content for m in state["messages"] if isinstance(m, ToolMessage)]
    tool_content = " ".join(tool_results)
    
    # 提取回答中的关键实体(公司名、产品名、数字等)
    entities = extract_entities(response)
    
    unverifiable = []
    for entity in entities:
        if entity not in tool_content and entity not in state.get("known_facts", []):
            unverifiable.append(entity)
    
    if unverifiable:
        # 发现不可验证的实体——可能是幻觉
        correction_prompt = f"""
        你的回答中包含了以下无法从工具结果中验证的实体:{', '.join(unverifiable)}
        
        请重新生成回答,严格遵守以下规则:
        1. 只使用工具结果中的信息
        2. 如果工具结果中没有某个信息,明确说明"无法确认"
        3. 不要编造任何数据、引用或细节
        """
        return {
            "messages": state["messages"] + [
                HumanMessage(content=correction_prompt),
            ],
            "needs_regeneration": True,
        }
    
    # 校验 2:置信度评分——要求模型给自己的回答打分
    confidence_prompt = """
    请基于以下标准,对你的上一个回答给出 0-10 的置信度评分:
    - 10分:所有信息都有工具结果或已知事实支持
    - 5-9分:大部分信息有支持,少数细节不确定
    - 1-4分:信息主要来自推测或通用知识
    - 0分:没有可用信息,完全不确定
    
    只输出数字。
    """
    # ... 调用轻量模型做置信度评估
    confidence = get_confidence_score(state["messages"], confidence_prompt)
    
    if confidence < 5:
        return {
            "messages": state["messages"] + [
                AIMessage(content="抱歉,我目前无法提供足够准确的答案。建议您提供更多信息或联系人工客服。"),
            ],
            "low_confidence": True,
        }
    
    return state

关键设计决策:

  1. 事实性检查不是”全有或全无”。 我们允许模型使用”通用知识”(比如编程语言的语法、公开的 API 文档),但要求对”具体事实”(如产品价格、公司政策、内部数据)严格引用工具结果。
  2. 置信度阈值可调。 不同场景需要不同的阈值:客户支持场景阈值设为 7(宁可说不知道也不要乱说),代码生成场景阈值设为 4(允许一定程度的推测)。

陷阱五:并发竞态——多个 Agent 的”资源争夺”

现象

当系统同时处理多个 Agent 请求时:

  • 某些请求的工具调用被”吞掉”(执行了但没收到结果)
  • 某些请求的上下文混入了其他请求的数据
  • 偶尔出现数据库写入冲突

根因

Agent 系统中的工具调用通常是异步的。如果多个 Agent 实例同时调用同一个外部服务(比如数据库、API),且这些服务没有做好隔离,就会出现竞态条件。

一个典型案例:我们有一个数据分析 Agent,它调用内部的数据库查询工具。当 5 个 Agent 实例同时查询时,数据库连接池耗尽,某些查询超时。Agent 框架(LangGraph)在超时后尝试重试,但重试时使用了过期的连接对象,导致查询结果错乱。

复现路径

  1. 创建一个 Agent,使用数据库查询工具
  2. 用 locust 或 ab 并发发送 50+ 请求
  3. 观察是否有结果错乱、超时、连接泄漏

解决方案

我们实现了一个工具调用队列 + 连接池管理方案:

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator

class ToolCallQueue:
    """工具调用队列——控制并发,避免资源争夺"""
    
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.active_calls: dict[str, asyncio.Task] = {}
    
    async def execute(self, tool_name: str, tool_input: dict, func) -> Any:
        """通过队列执行工具调用"""
        call_id = generate_call_id()
        
        async with self.semaphore:
            try:
                task = asyncio.create_task(func(tool_input))
                self.active_calls[call_id] = task
                
                # 设置超时
                result = await asyncio.wait_for(task, timeout=30.0)
                return result
            except asyncio.TimeoutError:
                # 超时处理:取消任务,返回降级结果
                task.cancel()
                return {
                    "error": "tool_timeout",
                    "message": f"工具 {tool_name} 执行超时(30s)",
                    "fallback": get_fallback_for_tool(tool_name),
                }
            finally:
                self.active_calls.pop(call_id, None)

# 数据库连接池管理
@asynccontextmanager
async def get_db_connection() -> AsyncGenerator[Connection, None]:
    """安全的数据库连接获取"""
    conn = await connection_pool.acquire()
    try:
        yield conn
    except Exception as e:
        # 连接异常时,标记连接为"不健康",不再放回池中
        await connection_pool.mark_unhealthy(conn)
        raise
    finally:
        # 只有在没有异常时才放回池中
        if not connection_pool.is_unhealthy(conn):
            await connection_pool.release(conn)

关键设计决策:

  1. 信号量限制并发,而不是无限并发。 根据后端服务的承载能力设置合理的并发上限(数据库通常 10-20,外部 API 根据 rate limit 设置)。
  2. 超时是必须的,不是可选的。 每个工具调用都必须有超时设置。我们默认 30 秒,数据库查询默认 10 秒。
  3. 降级策略预定义。 每个工具都定义了 fallback 行为:超时后返回什么、连接失败后返回什么、API 限流后返回什么。

陷阱六:可观测性缺失——出了问题找不到原因

现象

Agent 在生产环境中输出了错误结果,但你无法回答:

  • 是哪一步出的问题?
  • 工具调用返回了什么?
  • Agent 是怎么做决策的?
  • 这个错误是偶发的还是系统性的?

根因

传统的 debug 方式(print、logging)对 Agent 系统几乎无效。Agent 的执行路径是动态的——每一步做什么取决于上一步的结果,不是预定义的代码路径。

一个 print 只能告诉你”执行到了这里”,但无法告诉你”Agent 为什么走到了这里”。

解决方案

我们建立了一个四层可观测性体系

import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass
class AgentTrace:
    """Agent 执行轨迹"""
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    start_time: float = field(default_factory=time.time)
    steps: list[dict[str, Any]] = field(default_factory=list)
    total_tokens: int = 0
    total_cost: float = 0.0
    final_state: str = "unknown"  # success, error, timeout, human_interrupted
    
    def record_step(self, step_type: str, details: dict):
        self.steps.append({
            "type": step_type,
            "timestamp": time.time(),
            "elapsed": time.time() - self.start_time,
            **details,
        })
    
    def to_dict(self) -> dict:
        return {
            "trace_id": self.trace_id,
            "start_time": self.start_time,
            "duration": time.time() - self.start_time,
            "step_count": len(self.steps),
            "steps": self.steps,
            "total_tokens": self.total_tokens,
            "total_cost": self.total_cost,
            "final_state": self.final_state,
        }

# 在 LangGraph 中使用
from langchain_core.callbacks import BaseCallbackHandler

class AgentTracingCallback(BaseCallbackHandler):
    """Agent 执行轨迹回调"""
    
    def __init__(self, trace: AgentTrace):
        self.trace = trace
    
    def on_llm_start(self, serialized: dict, prompts: list[str], **kwargs):
        self.trace.record_step("llm_start", {
            "model": serialized.get("name", "unknown"),
            "prompt_length": sum(len(p) for p in prompts),
        })
    
    def on_llm_end(self, response, **kwargs):
        self.trace.record_step("llm_end", {
            "completion_tokens": response.llm_output.get("token_usage", {}).get("completion_tokens", 0),
            "prompt_tokens": response.llm_output.get("token_usage", {}).get("prompt_tokens", 0),
        })
        self.trace.total_tokens += response.llm_output.get("token_usage", {}).get("total_tokens", 0)
    
    def on_tool_start(self, serialized: dict, input_str: str, **kwargs):
        self.trace.record_step("tool_start", {
            "tool_name": serialized.get("name", "unknown"),
            "input_preview": input_str[:200],
        })
    
    def on_tool_end(self, output: str, **kwargs):
        self.trace.record_step("tool_end", {
            "output_preview": output[:500],
        })
    
    def on_chain_error(self, error: Exception, **kwargs):
        self.trace.record_step("error", {
            "error_type": type(error).__name__,
            "error_message": str(error),
        })
        self.trace.final_state = "error"

# 使用
def run_agent_with_tracing(query: str) -> tuple[str, AgentTrace]:
    trace = AgentTrace()
    callback = AgentTracingCallback(trace)
    
    result = compiled_graph.invoke(
        {"messages": [HumanMessage(content=query)]},
        config={"callbacks": [callback]},
    )
    
    trace.final_state = "success"
    trace.record_step("complete", {
        "response_preview": result["messages"][-1].content[:200],
    })
    
    # 存储到可观测性后端(如 Jaeger、LangSmith、或自建 ClickHouse)
    store_trace(trace.to_dict())
    
    return result["messages"][-1].content, trace

四层可观测性:

  1. Trace 层:每次 Agent 执行的完整轨迹(上面的代码)。记录每一步的时间、输入、输出、token 消耗。
  2. Metrics 层:聚合指标——成功率、P99 延迟、平均 token 消耗、工具调用失败率。用 Prometheus + Grafana 可视化。
  3. Alert 层:基于规则的告警——如果连续 5 个请求失败,或者 P99 延迟超过 30 秒,或者单个请求 token 消耗超过 50K,触发告警。
  4. Replay 层:最关键的——基于 trace 重新执行。当你拿到一个失败请求的 trace,可以用它重现整个执行过程,逐步查看每一步的输入输出,精确定位问题。

效果: 问题定位时间从平均 4 小时缩短到 15 分钟。


陷阱七:成本失控——Token 账单的”惊喜”

现象

第一个月:$200。 第二个月:$2,000。 第三个月:$8,000。

没有人预料到 Agent 系统的 token 消耗会呈指数级增长。

根因

Agent 系统的 token 消耗不是线性的,而是指数级的:

  1. 上下文膨胀:每轮对话的上下文都在增长,每轮的 token 消耗都在增加
  2. 工具调用:每次工具调用都消耗额外的 input + output tokens
  3. 自我修正:Agent 发现自己的输出有问题时,会触发额外的 LLM 调用做修正
  4. 重试:工具调用失败后的重试,每次重试都消耗 tokens

一个真实的成本拆解(我们的运维 Agent 项目):

消耗项单次请求日均请求月消耗
主 LLM 调用(决策)3,000 tokens5,000450M tokens
工具调用(平均 3 次/请求)4,500 tokens5,000675M tokens
上下文压缩(轻量模型)1,000 tokens5,000150M tokens
输出校验(轻量模型)800 tokens2,00048M tokens
总计9,300 tokens1.32B tokens

按 GPT-4 的价格($10/M input tokens, $30/M output tokens),这个项目的月成本是 $4,000-8,000

解决方案

我们做了三件事:

1. Token 预算管理

class TokenBudget:
    """Token 预算管理"""
    
    def __init__(self, max_input_tokens: int = 50_000, max_output_tokens: int = 10_000):
        self.max_input = max_input_tokens
        self.max_output = max_output_tokens
        self.used_input = 0
        self.used_output = 0
    
    def check_budget(self, estimated_input: int, estimated_output: int) -> bool:
        return (self.used_input + estimated_input <= self.max_input and
                self.used_output + estimated_output <= self.max_output)
    
    def consume(self, input_tokens: int, output_tokens: int):
        self.used_input += input_tokens
        self.used_output += output_tokens
    
    def remaining(self) -> dict:
        return {
            "input": self.max_input - self.used_input,
            "output": self.max_output - self.used_output,
        }

# 在 Agent 中使用——当预算不足时,切换到更便宜的模型
def run_with_budget(state: AgentState, budget: TokenBudget):
    estimated_input = count_tokens(state["messages"])
    estimated_output = 2000  # 预估输出
    
    if not budget.check_budget(estimated_input, estimated_output):
        # 预算不足——降级到轻量模型
        return run_with_cheaper_model(state)
    
    # 正常执行
    result = run_with_main_model(state)
    budget.consume(estimated_input, count_tokens(result))
    return result

2. 模型路由

不是所有任务都需要 GPT-4。我们建立了一个模型路由规则:

任务类型模型成本比例
复杂推理/决策GPT-4 / Claude 3.5100%
代码生成Qwen3.6-72B(自部署)5%
上下文压缩/摘要GPT-3.5-Turbo10%
输出校验/格式化GPT-3.5-Turbo10%
简单分类/路由本地小模型(7B)1%

通过模型路由,总成本降低了 60%

3. 缓存

对相同或相似的请求做缓存。Agent 系统中,很多请求是重复的(比如”检查服务器 X 的状态”),如果上次检查不到 5 分钟前,直接返回缓存结果,不调用 LLM。

from functools import lru_cache
import hashlib

def get_cache_key(query: str, context: str) -> str:
    content = f"{query}:{context}"
    return hashlib.md5(content.encode()).hexdigest()

# 带 TTL 的缓存
from cachetools import TTLCache

agent_cache = TTLCache(maxsize=1000, ttl=300)  # 5 分钟 TTL

def run_agent_cached(query: str, context: str) -> str:
    key = get_cache_key(query, context)
    if key in agent_cache:
        return agent_cache[key]
    
    result = run_agent(query, context)
    agent_cache[key] = result
    return result

效果: 缓存命中率 35%,意味着 35% 的请求直接返回缓存,不消耗任何 LLM tokens。


总结:从 Demo 到生产的 Checklist

经过 12 个项目、2000 小时的踩坑,我们总结了一份AI Agent 生产环境 Checklist

上线前必须检查

  • 状态隔离:每个请求使用独立的状态对象,不共享可变状态
  • 上下文管理:实现三层压缩策略,设置 token 预算阈值
  • 工具调用守卫:防止死循环、重复调用、参数注入
  • 输出校验:事实性检查 + 置信度评估
  • 并发控制:工具调用队列 + 连接池管理 + 超时设置
  • 可观测性:Trace + Metrics + Alert + Replay 四层体系
  • 成本控制:Token 预算管理 + 模型路由 + 缓存

上线后持续监控

  • 成功率:低于 95% 触发告警
  • P99 延迟:超过 30 秒触发告警
  • Token 消耗:单日超过预算 120% 触发告警
  • 工具调用失败率:超过 5% 触发告警
  • 幻觉率:定期抽检,幻觉率超过 2% 需要重新校准

给 Agent 开发者的三个忠告

  1. Demo 跑通只是 10%。 剩下的 90% 是处理边界情况、做降级策略、建可观测性。不要被客户的”哇,太酷了”冲昏头脑。
  2. 每一个 LLM 调用都要假设它可能失败。 模型会超时、会返回错误格式、会产生幻觉。你的系统必须能优雅地处理这些情况。
  3. 可观测性不是”锦上添花”,是”生命线”。 没有可观测性的 Agent 系统就像没有仪表盘的汽车——能开,但你不知道什么时候会抛锚。

这 7 个陷阱是我们 12 个项目踩过的真实坑。每个解决方案都经过了生产环境验证。如果你在生产 Agent 时遇到了其他坑,欢迎在评论区分享——我们一起补充这份清单。

下一篇我们会分享:如何搭建 Agent 系统的自动化测试框架,让你在上线前就发现 80% 的潜在问题。