AI AinoCode AI 工具与基础设施
AI教程 4 分钟

MCP Gateway 实战:把企业遗留系统(ERP/CRM/OA)安全暴露给 AI Agent

完整架构:鉴权层(API Key→OAuth 2.0)+ 限流层 + 审计层 + MCP Server,用 SAP/飞书审批/钉钉日程 3 个真实场景演示 Legacy→Agent 的桥接方案。

AinoCode 编辑部

MCP Gateway 企业遗留系统集成架构

MCP Gateway 实战:把企业遗留系统(ERP/CRM/OA)安全暴露给 AI Agent

企业里 90% 的核心业务系统——ERP、CRM、OA、HR、财务——都不是为 AI Agent 设计的。

它们有 SOAP 接口、有 XML 报文、有鉴权逻辑、有复杂的权限模型。直接把一个 MCP Server 对接到 SAP 上,等于给 AI Agent 开了一扇没有锁的后门

MCP Gateway 要解决的问题就是:在遗留系统和 AI Agent 之间,加一层可控、可审计、可回滚的安全桥接层。

本文完整实现一个生产级 MCP Gateway,覆盖三个真实场景:

  1. SAP ERP 对接:库存查询 + 采购订单创建
  2. 飞书审批流对接:审批状态查询 + 催办
  3. 钉钉日程对接:日程创建 + 冲突检测

一、架构总览

┌─────────────────────────────────────────────────────┐
│                   AI Agent (Client)                  │
│         (Claude / GPT / 自研 Agent Framework)        │
└──────────────────────┬──────────────────────────────┘
                       │ MCP Protocol (stdio / SSE)

┌─────────────────────────────────────────────────────┐
│                  MCP Gateway                         │
│                                                      │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐          │
│  │ 鉴权层    │→│ 限流层    │→│ 审计层    │          │
│  │ API Key   │  │ Rate     │  │ Audit    │          │
│  │ OAuth 2.0 │  │ Limiter  │  │ Log      │          │
│  └──────────┘  └──────────┘  └──────────┘          │
│                       │                              │
│  ┌──────────────────────────────────────────────┐   │
│  │            Tool Router                        │   │
│  │  sap_inventory    │  feishu_approval          │   │
│  │  sap_purchase     │  feishu_remind            │   │
│  │  dingtalk_create  │  dingtalk_conflict         │   │
│  └──────────────────┬───────────────────────────┘   │
│                       │                              │
│  ┌──────────────────────────────────────────────┐   │
│  │        Adapter Layer(协议转换)               │   │
│  │  SAP RFC → JSON  │  飞书 Open API → JSON      │   │
│  │  SOAP → REST     │  钉钉 API → JSON           │   │
│  └──────────────────┬───────────────────────────┘   │
└──────────────────────┬──────────────────────────────┘
                       │ HTTP / RFC / SDK

┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│   SAP ERP   │  │  飞书开放平台 │  │  钉钉开放平台 │
│   (RFC/BAPI)│  │  (Open API) │  │  (Open API) │
└─────────────┘  └─────────────┘  └─────────────┘

核心设计原则:

  • 最小权限:每个 Agent 只拿到它需要的 Tool 权限
  • 可审计:每次调用都记录谁、什么时候、调了什么、结果如何
  • 可回滚:写操作(创建订单、修改审批)都有 dry-run 模式和撤销机制
  • 协议无关:上游统一 MCP 协议,下游可以是 RFC、SOAP、REST、GraphQL

二、核心实现:MCP Gateway

2.1 基础框架

我们用 Python + mcp SDK(官方 MCP Server SDK)实现:

# gateway/server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import json
from datetime import datetime

app = Server("ainocode-mcp-gateway")

# ========== 鉴权层 ==========

class AuthManager:
    def __init__(self):
        self.api_keys: dict[str, dict] = {}  # key → {agent_id, permissions, expires}
        self.oauth_clients: dict[str, dict] = {}  # client_id → config
    
    def validate_api_key(self, key: str) -> dict | None:
        """验证 API Key 并返回 agent 上下文"""
        record = self.api_keys.get(key)
        if not record:
            return None
        if datetime.now() > record["expires"]:
            return None
        return record
    
    def validate_oauth_token(self, token: str, scopes: list[str]) -> dict | None:
        """验证 OAuth 2.0 token 并检查 scope"""
        # 实际实现:调用 OAuth Provider 的 introspection endpoint
        pass

auth = AuthManager()

# ========== 审计层 ==========

class AuditLogger:
    def __init__(self):
        self.log_path = "/var/log/mcp-gateway/audit.log"
    
    async def log(self, agent_id: str, tool_name: str, 
                  params: dict, result: dict, status: str):
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "agent_id": agent_id,
            "tool": tool_name,
            "params": mask_sensitive(params),
            "result_summary": summarize(result),
            "status": status,
        }
        async with aiofiles.open(self.log_path, "a") as f:
            await f.write(json.dumps(entry, ensure_ascii=False) + "\n")

auditor = AuditLogger()

# ========== 限流层 ==========

class RateLimiter:
    def __init__(self):
        self.requests: dict[str, list[float]] = {}  # agent_id → [timestamps]
        self.limits = {
            "default": {"rpm": 60, "burst": 10},
            "premium": {"rpm": 600, "burst": 50},
        }
    
    async def check(self, agent_id: str, tier: str = "default") -> bool:
        now = asyncio.get_event_loop().time()
        limit = self.limits[tier]
        
        if agent_id not in self.requests:
            self.requests[agent_id] = []
        
        # 滑动窗口:只保留最近 60 秒的请求
        self.requests[agent_id] = [
            t for t in self.requests[agent_id] if now - t < 60
        ]
        
        if len(self.requests[agent_id]) >= limit["rpm"]:
            return False
        
        self.requests[agent_id].append(now)
        return True

limiter = RateLimiter()

2.2 Tool 注册与路由

# 注册所有可用的 Tools
@app.list_tools()
async def list_tools() -> list[Tool]:
    """返回当前 Agent 有权限访问的 Tools"""
    return [
        Tool(
            name="sap_inventory_query",
            description="查询 SAP ERP 中的物料库存(只读)",
            inputSchema={
                "type": "object",
                "properties": {
                    "material_number": {
                        "type": "string",
                        "description": "物料编码,如 'MAT-001234'"
                    },
                    "plant": {
                        "type": "string",
                        "description": "工厂代码,如 '1000'"
                    }
                },
                "required": ["material_number"]
            }
        ),
        Tool(
            name="sap_purchase_order_create",
            description="在 SAP 中创建采购订单(写操作,需审批流确认)",
            inputSchema={
                "type": "object",
                "properties": {
                    "vendor": {"type": "string"},
                    "material": {"type": "string"},
                    "quantity": {"type": "number"},
                    "dry_run": {
                        "type": "boolean",
                        "description": "True=只校验不提交,False=实际创建"
                    }
                },
                "required": ["vendor", "material", "quantity"]
            }
        ),
        Tool(
            name="feishu_approval_status",
            description="查询飞书审批单状态",
            inputSchema={
                "type": "object",
                "properties": {
                    "approval_code": {"type": "string"},
                },
                "required": ["approval_code"]
            }
        ),
        Tool(
            name="feishu_approval_remind",
            description="向审批人发送催办通知(写操作)",
            inputSchema={
                "type": "object",
                "properties": {
                    "approval_code": {"type": "string"},
                    "message": {"type": "string", "default": "请尽快处理"}
                },
                "required": ["approval_code"]
            }
        ),
        Tool(
            name="dingtalk_schedule_create",
            description="在钉钉日历创建日程",
            inputSchema={
                "type": "object",
                "properties": {
                    "summary": {"type": "string"},
                    "start_time": {"type": "string", "format": "date-time"},
                    "end_time": {"type": "string", "format": "date-time"},
                    "attendees": {"type": "array", "items": {"type": "string"}},
                    "check_conflict": {"type": "boolean", "default": True}
                },
                "required": ["summary", "start_time", "end_time"]
            }
        ),
        Tool(
            name="dingtalk_schedule_conflict",
            description="检查指定时间段是否有日程冲突",
            inputSchema={
                "type": "object",
                "properties": {
                    "user_id": {"type": "string"},
                    "start_time": {"type": "string", "format": "date-time"},
                    "end_time": {"type": "string", "format": "date-time"}
                },
                "required": ["user_id", "start_time", "end_time"]
            }
        ),
    ]

2.3 Tool 调用处理

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """处理 Tool 调用请求"""
    # 1. 从请求上下文获取 agent_id(通过 API Key 或 OAuth)
    agent_id = get_current_agent_id()
    
    # 2. 限流检查
    if not await limiter.check(agent_id):
        return [TextContent(
            type="text",
            text=json.dumps({"error": "Rate limit exceeded", "retry_after": 60})
        )]
    
    # 3. 审计日志(调用前)
    await auditor.log(agent_id, name, arguments, {}, "start")
    
    try:
        # 4. 路由到对应 Handler
        result = await ROUTERS[name](arguments)
        
        # 5. 审计日志(成功)
        await auditor.log(agent_id, name, arguments, result, "success")
        
        return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
    
    except ToolError as e:
        await auditor.log(agent_id, name, arguments, {"error": str(e)}, "error")
        return [TextContent(type="text", text=json.dumps({"error": str(e)}))]

# Tool 路由表
ROUTERS = {
    "sap_inventory_query": sap_handlers.inventory_query,
    "sap_purchase_order_create": sap_handlers.purchase_order_create,
    "feishu_approval_status": feishu_handlers.approval_status,
    "feishu_approval_remind": feishu_handlers.approval_remind,
    "dingtalk_schedule_create": dingtalk_handlers.schedule_create,
    "dingtalk_schedule_conflict": dingtalk_handlers.schedule_conflict,
}

三、场景一:SAP ERP 对接

3.1 架构

SAP ERP 通常通过 RFC/BAPI 接口对外提供服务。我们需要:

  1. pyrfc 连接 SAP 系统
  2. 把 BAPI 调用封装为 MCP Tool
  3. 对写操作强制 dry-run + 二次确认
# gateway/adapters/sap.py
from pyrfc import Connection
import os

class SAPAdapter:
    def __init__(self):
        self.conn = Connection(
            ashost=os.getenv("SAP_HOST"),
            sysnr=os.getenv("SAP_SYSNR", "00"),
            client=os.getenv("SAP_CLIENT", "100"),
            user=os.getenv("SAP_USER"),
            passwd=os.getenv("SAP_PASSWORD"),
        )
    
    async def inventory_query(self, material_number: str, plant: str = "1000") -> dict:
        """调用 BAPI: BAPI_MATERIAL_STOCK_REQ_LIST"""
        result = self.conn.call(
            "BAPI_MATERIAL_STOCK_REQ_LIST",
            MATERIAL=material_number,
            PLANT=plant,
        )
        
        if result.get("RETURN", {}).get("TYPE") == "E":
            raise ToolError(f"SAP Error: {result['RETURN']['MESSAGE']}")
        
        # 解析库存数据
        stock_data = []
        for item in result.get("STOCK_LIST", []):
            stock_data.append({
                "material": item["MATERIAL"],
                "plant": item["PLANT"],
                "storage_location": item["STGE_LOC"],
                "batch": item["BATCH"],
                "available_qty": item["AVAILABLE_QTY"],
                "unit": item["BASE_UOM"],
            })
        
        return {
            "material_number": material_number,
            "plant": plant,
            "stock_items": stock_data,
            "query_time": datetime.utcnow().isoformat(),
        }
    
    async def purchase_order_create(
        self, vendor: str, material: str, quantity: float, dry_run: bool = True
    ) -> dict:
        """调用 BAPI: BAPI_PO_CREATE1"""
        if dry_run:
            # Dry run:只做参数校验,不提交
            return {
                "status": "dry_run",
                "message": "参数校验通过,未实际创建订单",
                "validation": {
                    "vendor_valid": self._validate_vendor(vendor),
                    "material_valid": self._validate_material(material),
                    "quantity_valid": quantity > 0 and quantity < 999999,
                },
                "preview": {
                    "vendor": vendor,
                    "material": material,
                    "quantity": quantity,
                    "estimated_total": self._estimate_cost(material, quantity),
                },
            }
        
        # 实际创建(需要额外审批确认)
        po_header = {
            "COMP_CODE": "1000",
            "DOC_TYPE": "NB",
            "VENDOR": vendor,
            "PURCH_ORG": "1000",
            "PUR_GROUP": "001",
        }
        
        po_items = [{
            "PO_ITEM": "00010",
            "MATERIAL": material,
            "QUANTITY": quantity,
            "PLANT": "1000",
        }]
        
        result = self.conn.call(
            "BAPI_PO_CREATE1",
            POHEADER=po_header,
            POITEM=po_items,
        )
        
        if result.get("RETURN", [{}])[0].get("TYPE") == "E":
            raise ToolError(f"SAP Error: {result['RETURN'][0]['MESSAGE']}")
        
        return {
            "status": "created",
            "po_number": result.get("EXPPURCHASEORDER"),
            "vendor": vendor,
            "material": material,
            "quantity": quantity,
        }
    
    def _validate_vendor(self, vendor: str) -> bool:
        """校验供应商是否存在"""
        result = self.conn.call("BAPI_VENDOR_GETDETAIL", VENDOR=vendor)
        return result.get("RETURN", {}).get("TYPE") != "E"
    
    def _validate_material(self, material: str) -> bool:
        """校验物料是否存在"""
        result = self.conn.call("BAPI_MATERIAL_GET_DETAIL", MATERIAL=material)
        return result.get("RETURN", {}).get("TYPE") != "E"
    
    def _estimate_cost(self, material: str, quantity: float) -> float:
        """获取物料标准价格估算总价"""
        result = self.conn.call(
            "BAPI_MATERIAL_GET_PRICE", MATERIAL=material
        )
        price = result.get("PRICE", 0)
        return round(price * quantity, 2)

sap_adapter = SAPAdapter()

3.2 Agent 调用示例

Agent 的完整对话流程:

User: "帮我查一下物料 MAT-001234 在 1000 工厂的库存"

Agent: 调用 sap_inventory_query(material_number="MAT-001234", plant="1000")
      → 返回: {"stock_items": [{"available_qty": 150, "unit": "PC"}]}

Agent: "物料 MAT-001234 在 1000 工厂当前可用库存为 150 PC。"

User: "库存不够,帮我创建一个采购订单,供应商 V-005678,采购 500 个"

Agent: 先调 dry_run 校验:
      sap_purchase_order_create(
          vendor="V-005678", material="MAT-001234", 
          quantity=500, dry_run=True
      )
      → 返回: {"status": "dry_run", "preview": {"estimated_total": 25000.00}}

Agent: "采购订单预检通过:供应商 V-005678,物料 MAT-001234,500 个,预估总价 ¥25,000。确认创建吗?"

User: "确认"

Agent: 实际创建:
      sap_purchase_order_create(
          vendor="V-005678", material="MAT-001234",
          quantity=500, dry_run=False
      )
      → 返回: {"status": "created", "po_number": "4500012345"}

四、场景二:飞书审批流对接

4.1 实现

# gateway/adapters/feishu.py
import httpx
import os
import time

class FeishuAdapter:
    def __init__(self):
        self.app_id = os.getenv("FEISHU_APP_ID")
        self.app_secret = os.getenv("FEISHU_APP_SECRET")
        self.base_url = "https://open.feishu.cn/open-apis"
        self._token = None
        self._token_expires = 0
    
    async def _get_token(self) -> str:
        """获取/刷新 tenant_access_token"""
        if time.time() < self._token_expires - 60:
            return self._token
        
        resp = await httpx.AsyncClient().post(
            f"{self.base_url}/auth/v3/tenant_access_token/internal",
            json={
                "app_id": self.app_id,
                "app_secret": self.app_secret,
            }
        )
        data = resp.json()
        self._token = data["tenant_access_token"]
        self._token_expires = time.time() + data["expire"]
        return self._token
    
    async def approval_status(self, approval_code: str) -> dict:
        """查询审批单状态"""
        token = await self._get_token()
        resp = await httpx.AsyncClient().post(
            f"{self.base_url}/approval/v4/instances",
            headers={"Authorization": f"Bearer {token}"},
            json={"approval_code": approval_code}
        )
        data = resp.json()
        
        instance = data.get("data", {}).get("instance", {})
        return {
            "approval_code": approval_code,
            "status": instance.get("status"),  # PENDING / APPROVED / REJECTED
            "title": instance.get("task_title"),
            "created_at": instance.get("created_time"),
            "current_approver": instance.get("current_node", {}).get("approver"),
            "history": [
                {
                    "node": h["task_title"],
                    "action": h["status"],
                    "operator": h.get("operator_name"),
                    "time": h.get("operate_time"),
                }
                for h in instance.get("history", [])
            ]
        }
    
    async def approval_remind(self, approval_code: str, message: str = "请尽快处理") -> dict:
        """发送催办通知"""
        token = await self._get_token()
        resp = await httpx.AsyncClient().post(
            f"{self.base_url}/approval/v4/instances/{approval_code}/remind",
            headers={"Authorization": f"Bearer {token}"},
            json={"message": message}
        )
        data = resp.json()
        if data.get("code") != 0:
            raise ToolError(f"飞书催办失败: {data.get('msg')}")
        
        return {
            "status": "reminded",
            "approval_code": approval_code,
            "message": message,
        }

feishu_adapter = FeishuAdapter()

4.2 Agent 调用示例

User: "帮我看看审批单 SP-20260513-0042 到哪了"

Agent: feishu_approval_status(approval_code="SP-20260513-0042")
      → 返回: {"status": "PENDING", "current_approver": "张三"}

Agent: "审批单 SP-20260513-0042(Q2 采购预算申请)当前状态:待审批,当前审批人:张三。"

User: "催一下"

Agent: "确认要发送催办通知给张三吗?"
User: "确认"
Agent: feishu_approval_remind(approval_code="SP-20260513-0042", message="请尽快处理")
      → 返回: {"status": "reminded"}

五、场景三:钉钉日程对接

5.1 实现

# gateway/adapters/dingtalk.py
import httpx
import os
from datetime import datetime, timedelta

class DingtalkAdapter:
    def __init__(self):
        self.app_key = os.getenv("DINGTALK_APP_KEY")
        self.app_secret = os.getenv("DINGTALK_APP_SECRET")
        self.base_url = "https://api.dingtalk.com/v1.0"
        self._token = None
    
    async def _get_token(self) -> str:
        if self._token:
            return self._token
        resp = await httpx.AsyncClient().post(
            "https://api.dingtalk.com/v1.0/oauth2/accessToken",
            json={"appKey": self.app_key, "appSecret": self.app_secret}
        )
        self._token = resp.json()["accessToken"]
        return self._token
    
    async def schedule_conflict(
        self, user_id: str, start_time: str, end_time: str
    ) -> dict:
        """检查日程冲突"""
        token = await self._get_token()
        
        # 查询该时间段内的已有日程
        resp = await httpx.AsyncClient().get(
            f"{self.base_url}/calendar/v1/users/{user_id}/calendars/primary/schedules",
            headers={"x-acs-dingtalk-access-token": token},
            params={
                "timeMin": start_time,
                "timeMax": end_time,
            }
        )
        schedules = resp.json().get("data", [])
        
        if schedules:
            conflicts = [
                {
                    "summary": s["summary"],
                    "start": s["start"]["dateTime"],
                    "end": s["end"]["dateTime"],
                    "organizer": s.get("creator", {}).get("displayName"),
                }
                for s in schedules
            ]
            return {
                "has_conflict": True,
                "conflicts": conflicts,
                "conflict_count": len(conflicts),
            }
        
        return {"has_conflict": False, "conflict_count": 0}
    
    async def schedule_create(
        self, summary: str, start_time: str, end_time: str,
        attendees: list[str] = None, check_conflict: bool = True
    ) -> dict:
        """创建日程"""
        if check_conflict:
            conflict_result = await self.schedule_conflict(
                user_id="me", start_time=start_time, end_time=end_time
            )
            if conflict_result["has_conflict"]:
                return {
                    "status": "conflict_detected",
                    "message": f"检测到 {conflict_result['conflict_count']} 个冲突日程",
                    "conflicts": conflict_result["conflicts"],
                }
        
        token = await self._get_token()
        payload = {
            "summary": summary,
            "description": f"由 AI Agent 创建",
            "start": {"dateTime": start_time, "timeZone": "Asia/Shanghai"},
            "end": {"dateTime": end_time, "timeZone": "Asia/Shanghai"},
        }
        if attendees:
            payload["attendees"] = [{"id": a} for a in attendees]
        
        resp = await httpx.AsyncClient().post(
            f"{self.base_url}/calendar/v1/users/me/calendars/primary/schedules",
            headers={"x-acs-dingtalk-access-token": token},
            json=payload,
        )
        data = resp.json()
        
        return {
            "status": "created",
            "schedule_id": data.get("scheduleId"),
            "summary": summary,
            "start_time": start_time,
            "end_time": end_time,
        }

dingtalk_adapter = DingtalkAdapter()

六、安全加固清单

把遗留系统暴露给 AI Agent,安全是底线。以下是必须实现的加固措施:

6.1 鉴权分级

级别操作类型鉴权方式示例
L1只读查询API Key库存查询、状态查询
L2低风险写操作API Key + Dry Run日程创建、审批查询
L3高风险写操作OAuth 2.0 + Dry Run + 二次确认采购订单创建、数据修改
L4不可逆操作OAuth 2.0 + 人工审批删除、财务过账

6.2 参数注入防护

AI Agent 的参数是 LLM 生成的,必须做严格的 schema 校验:

import jsonschema

def validate_tool_input(tool_name: str, params: dict) -> None:
    """用 JSON Schema 校验 Agent 传入的参数"""
    schema = TOOL_SCHEMAS.get(tool_name)
    if not schema:
        raise ToolError(f"Unknown tool: {tool_name}")
    
    try:
        jsonschema.validate(params, schema)
    except jsonschema.ValidationError as e:
        raise ToolError(f"Invalid parameters: {e.message}")

# 额外防护:防止 SQL/命令注入
def sanitize_string(value: str, max_length: int = 256) -> str:
    """清理字符串,防止注入"""
    value = value[:max_length]
    # 禁止 SQL 关键字
    sql_keywords = ["DROP", "DELETE", "INSERT", "UPDATE", "ALTER", "EXEC"]
    for kw in sql_keywords:
        if kw.lower() in value.lower():
            raise ToolError(f"Potentially malicious input detected: {kw}")
    return value

6.3 审计日志

每条审计记录包含:

{
  "timestamp": "2026-05-13T06:00:00Z",
  "agent_id": "agent-sales-001",
  "tool": "sap_purchase_order_create",
  "params": {
    "vendor": "V-005678",
    "material": "MAT-001234",
    "quantity": 500,
    "dry_run": true
  },
  "result_summary": "dry_run, estimated_total=25000.00",
  "status": "success",
  "latency_ms": 245,
  "ip": "10.0.1.50"
}

审计日志写入后不可修改,建议用 append-only 存储或写入区块链/不可变存储。


七、启动与部署

7.1 本地开发

# 安装依赖
pip install mcp pyrfc httpx jsonschema aiofiles

# 环境变量
export SAP_HOST="sap.company.com"
export SAP_USER="mcp_gateway"
export SAP_PASSWORD="***"
export FEISHU_APP_ID="cli_xxx"
export FEISHU_APP_SECRET="***"
export DINGTALK_APP_KEY="ding_xxx"
export DINGTALK_APP_SECRET="***"

# 启动(stdio 模式,供本地 Agent 调试)
python -m gateway.server

# 启动(SSE 模式,供远程 Agent 调用)
python -m gateway.server --transport sse --port 8765

7.2 Docker 部署

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY gateway/ ./gateway/

EXPOSE 8765

CMD ["python", "-m", "gateway.server", "--transport", "sse", "--port", "8765"]

7.3 Agent 端配置

Agent 通过 MCP config 连接到 Gateway:

{
  "mcpServers": {
    "enterprise-gateway": {
      "command": "python",
      "args": ["-m", "gateway.server"],
      "env": {
        "MCP_API_KEY": "your-api-key-here"
      }
    }
  }
}

远程 SSE 模式:

{
  "mcpServers": {
    "enterprise-gateway": {
      "url": "https://gateway.ainocode.cn/sse",
      "headers": {
        "Authorization": "Bearer your-api-key-here"
      }
    }
  }
}

八、踩坑记录

  1. SAP RFC 连接池必须复用。每次调用新建连接在 SAP 端会产生大量对话进程,很快超过 rdisp/max_wprun_time 限制。解决方案:全局单例 Connection + 连接健康检查。

  2. 飞书 token 过期时机比文档说的早。文档说 expire 秒后过期,实测提前 2-3 分钟就失效了。我们在 _get_token 里预留了 60 秒的 buffer。

  3. 钉钉日程 API 的时区陷阱dateTime 字段如果不含 timeZone,钉钉默认用 UTC,导致日程时间偏移 8 小时。必须显式指定 "timeZone": "Asia/Shanghai"

  4. LLM 生成的参数类型不匹配。LLM 有时把 "500"(字符串)当成 quantity 传入,而 JSON Schema 定义的是 number。必须在 validate 层做类型转换或拒绝。

  5. dry_run 不是万能的。SAP 的 dry_run 只做语法校验,不做业务规则校验(比如信用额度、采购限额)。真正的高风险操作需要完整的业务审批流。


九、总结

MCP Gateway 的本质是一个协议翻译器 + 安全网关。它让 AI Agent 能安全、可控地调用企业遗留系统的能力,同时保留了完整的审计追踪。

关键要点:

  • 只读操作优先:先暴露查询类 Tool,验证安全性后再开放写操作
  • dry_run 强制化:所有写操作默认 dry_run,需显式确认才实际执行
  • 审计不可少:每条调用记录不可修改,便于事后追溯
  • 渐进式暴露:从低风险的查询开始,逐步扩展到写操作,不一步到位

本文的完整代码已开源,包含 Docker Compose 部署配置和 Agent 端集成示例。