MCP Gateway 实战:把企业遗留系统(ERP/CRM/OA)安全暴露给 AI Agent
完整架构:鉴权层(API Key→OAuth 2.0)+ 限流层 + 审计层 + MCP Server,用 SAP/飞书审批/钉钉日程 3 个真实场景演示 Legacy→Agent 的桥接方案。
AinoCode 编辑部
MCP Gateway 实战:把企业遗留系统(ERP/CRM/OA)安全暴露给 AI Agent
企业里 90% 的核心业务系统——ERP、CRM、OA、HR、财务——都不是为 AI Agent 设计的。
它们有 SOAP 接口、有 XML 报文、有鉴权逻辑、有复杂的权限模型。直接把一个 MCP Server 对接到 SAP 上,等于给 AI Agent 开了一扇没有锁的后门。
MCP Gateway 要解决的问题就是:在遗留系统和 AI Agent 之间,加一层可控、可审计、可回滚的安全桥接层。
本文完整实现一个生产级 MCP Gateway,覆盖三个真实场景:
- SAP ERP 对接:库存查询 + 采购订单创建
- 飞书审批流对接:审批状态查询 + 催办
- 钉钉日程对接:日程创建 + 冲突检测
一、架构总览
┌─────────────────────────────────────────────────────┐
│ AI Agent (Client) │
│ (Claude / GPT / 自研 Agent Framework) │
└──────────────────────┬──────────────────────────────┘
│ MCP Protocol (stdio / SSE)
▼
┌─────────────────────────────────────────────────────┐
│ MCP Gateway │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 鉴权层 │→│ 限流层 │→│ 审计层 │ │
│ │ API Key │ │ Rate │ │ Audit │ │
│ │ OAuth 2.0 │ │ Limiter │ │ Log │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Tool Router │ │
│ │ sap_inventory │ feishu_approval │ │
│ │ sap_purchase │ feishu_remind │ │
│ │ dingtalk_create │ dingtalk_conflict │ │
│ └──────────────────┬───────────────────────────┘ │
│ │ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Adapter Layer(协议转换) │ │
│ │ SAP RFC → JSON │ 飞书 Open API → JSON │ │
│ │ SOAP → REST │ 钉钉 API → JSON │ │
│ └──────────────────┬───────────────────────────┘ │
└──────────────────────┬──────────────────────────────┘
│ HTTP / RFC / SDK
▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ SAP ERP │ │ 飞书开放平台 │ │ 钉钉开放平台 │
│ (RFC/BAPI)│ │ (Open API) │ │ (Open API) │
└─────────────┘ └─────────────┘ └─────────────┘
核心设计原则:
- 最小权限:每个 Agent 只拿到它需要的 Tool 权限
- 可审计:每次调用都记录谁、什么时候、调了什么、结果如何
- 可回滚:写操作(创建订单、修改审批)都有 dry-run 模式和撤销机制
- 协议无关:上游统一 MCP 协议,下游可以是 RFC、SOAP、REST、GraphQL
二、核心实现:MCP Gateway
2.1 基础框架
我们用 Python + mcp SDK(官方 MCP Server SDK)实现:
# gateway/server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import json
from datetime import datetime
app = Server("ainocode-mcp-gateway")
# ========== 鉴权层 ==========
class AuthManager:
def __init__(self):
self.api_keys: dict[str, dict] = {} # key → {agent_id, permissions, expires}
self.oauth_clients: dict[str, dict] = {} # client_id → config
def validate_api_key(self, key: str) -> dict | None:
"""验证 API Key 并返回 agent 上下文"""
record = self.api_keys.get(key)
if not record:
return None
if datetime.now() > record["expires"]:
return None
return record
def validate_oauth_token(self, token: str, scopes: list[str]) -> dict | None:
"""验证 OAuth 2.0 token 并检查 scope"""
# 实际实现:调用 OAuth Provider 的 introspection endpoint
pass
auth = AuthManager()
# ========== 审计层 ==========
class AuditLogger:
def __init__(self):
self.log_path = "/var/log/mcp-gateway/audit.log"
async def log(self, agent_id: str, tool_name: str,
params: dict, result: dict, status: str):
entry = {
"timestamp": datetime.utcnow().isoformat(),
"agent_id": agent_id,
"tool": tool_name,
"params": mask_sensitive(params),
"result_summary": summarize(result),
"status": status,
}
async with aiofiles.open(self.log_path, "a") as f:
await f.write(json.dumps(entry, ensure_ascii=False) + "\n")
auditor = AuditLogger()
# ========== 限流层 ==========
class RateLimiter:
def __init__(self):
self.requests: dict[str, list[float]] = {} # agent_id → [timestamps]
self.limits = {
"default": {"rpm": 60, "burst": 10},
"premium": {"rpm": 600, "burst": 50},
}
async def check(self, agent_id: str, tier: str = "default") -> bool:
now = asyncio.get_event_loop().time()
limit = self.limits[tier]
if agent_id not in self.requests:
self.requests[agent_id] = []
# 滑动窗口:只保留最近 60 秒的请求
self.requests[agent_id] = [
t for t in self.requests[agent_id] if now - t < 60
]
if len(self.requests[agent_id]) >= limit["rpm"]:
return False
self.requests[agent_id].append(now)
return True
limiter = RateLimiter()
2.2 Tool 注册与路由
# 注册所有可用的 Tools
@app.list_tools()
async def list_tools() -> list[Tool]:
"""返回当前 Agent 有权限访问的 Tools"""
return [
Tool(
name="sap_inventory_query",
description="查询 SAP ERP 中的物料库存(只读)",
inputSchema={
"type": "object",
"properties": {
"material_number": {
"type": "string",
"description": "物料编码,如 'MAT-001234'"
},
"plant": {
"type": "string",
"description": "工厂代码,如 '1000'"
}
},
"required": ["material_number"]
}
),
Tool(
name="sap_purchase_order_create",
description="在 SAP 中创建采购订单(写操作,需审批流确认)",
inputSchema={
"type": "object",
"properties": {
"vendor": {"type": "string"},
"material": {"type": "string"},
"quantity": {"type": "number"},
"dry_run": {
"type": "boolean",
"description": "True=只校验不提交,False=实际创建"
}
},
"required": ["vendor", "material", "quantity"]
}
),
Tool(
name="feishu_approval_status",
description="查询飞书审批单状态",
inputSchema={
"type": "object",
"properties": {
"approval_code": {"type": "string"},
},
"required": ["approval_code"]
}
),
Tool(
name="feishu_approval_remind",
description="向审批人发送催办通知(写操作)",
inputSchema={
"type": "object",
"properties": {
"approval_code": {"type": "string"},
"message": {"type": "string", "default": "请尽快处理"}
},
"required": ["approval_code"]
}
),
Tool(
name="dingtalk_schedule_create",
description="在钉钉日历创建日程",
inputSchema={
"type": "object",
"properties": {
"summary": {"type": "string"},
"start_time": {"type": "string", "format": "date-time"},
"end_time": {"type": "string", "format": "date-time"},
"attendees": {"type": "array", "items": {"type": "string"}},
"check_conflict": {"type": "boolean", "default": True}
},
"required": ["summary", "start_time", "end_time"]
}
),
Tool(
name="dingtalk_schedule_conflict",
description="检查指定时间段是否有日程冲突",
inputSchema={
"type": "object",
"properties": {
"user_id": {"type": "string"},
"start_time": {"type": "string", "format": "date-time"},
"end_time": {"type": "string", "format": "date-time"}
},
"required": ["user_id", "start_time", "end_time"]
}
),
]
2.3 Tool 调用处理
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""处理 Tool 调用请求"""
# 1. 从请求上下文获取 agent_id(通过 API Key 或 OAuth)
agent_id = get_current_agent_id()
# 2. 限流检查
if not await limiter.check(agent_id):
return [TextContent(
type="text",
text=json.dumps({"error": "Rate limit exceeded", "retry_after": 60})
)]
# 3. 审计日志(调用前)
await auditor.log(agent_id, name, arguments, {}, "start")
try:
# 4. 路由到对应 Handler
result = await ROUTERS[name](arguments)
# 5. 审计日志(成功)
await auditor.log(agent_id, name, arguments, result, "success")
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
except ToolError as e:
await auditor.log(agent_id, name, arguments, {"error": str(e)}, "error")
return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
# Tool 路由表
ROUTERS = {
"sap_inventory_query": sap_handlers.inventory_query,
"sap_purchase_order_create": sap_handlers.purchase_order_create,
"feishu_approval_status": feishu_handlers.approval_status,
"feishu_approval_remind": feishu_handlers.approval_remind,
"dingtalk_schedule_create": dingtalk_handlers.schedule_create,
"dingtalk_schedule_conflict": dingtalk_handlers.schedule_conflict,
}
三、场景一:SAP ERP 对接
3.1 架构
SAP ERP 通常通过 RFC/BAPI 接口对外提供服务。我们需要:
- 用
pyrfc连接 SAP 系统 - 把 BAPI 调用封装为 MCP Tool
- 对写操作强制 dry-run + 二次确认
# gateway/adapters/sap.py
from pyrfc import Connection
import os
class SAPAdapter:
def __init__(self):
self.conn = Connection(
ashost=os.getenv("SAP_HOST"),
sysnr=os.getenv("SAP_SYSNR", "00"),
client=os.getenv("SAP_CLIENT", "100"),
user=os.getenv("SAP_USER"),
passwd=os.getenv("SAP_PASSWORD"),
)
async def inventory_query(self, material_number: str, plant: str = "1000") -> dict:
"""调用 BAPI: BAPI_MATERIAL_STOCK_REQ_LIST"""
result = self.conn.call(
"BAPI_MATERIAL_STOCK_REQ_LIST",
MATERIAL=material_number,
PLANT=plant,
)
if result.get("RETURN", {}).get("TYPE") == "E":
raise ToolError(f"SAP Error: {result['RETURN']['MESSAGE']}")
# 解析库存数据
stock_data = []
for item in result.get("STOCK_LIST", []):
stock_data.append({
"material": item["MATERIAL"],
"plant": item["PLANT"],
"storage_location": item["STGE_LOC"],
"batch": item["BATCH"],
"available_qty": item["AVAILABLE_QTY"],
"unit": item["BASE_UOM"],
})
return {
"material_number": material_number,
"plant": plant,
"stock_items": stock_data,
"query_time": datetime.utcnow().isoformat(),
}
async def purchase_order_create(
self, vendor: str, material: str, quantity: float, dry_run: bool = True
) -> dict:
"""调用 BAPI: BAPI_PO_CREATE1"""
if dry_run:
# Dry run:只做参数校验,不提交
return {
"status": "dry_run",
"message": "参数校验通过,未实际创建订单",
"validation": {
"vendor_valid": self._validate_vendor(vendor),
"material_valid": self._validate_material(material),
"quantity_valid": quantity > 0 and quantity < 999999,
},
"preview": {
"vendor": vendor,
"material": material,
"quantity": quantity,
"estimated_total": self._estimate_cost(material, quantity),
},
}
# 实际创建(需要额外审批确认)
po_header = {
"COMP_CODE": "1000",
"DOC_TYPE": "NB",
"VENDOR": vendor,
"PURCH_ORG": "1000",
"PUR_GROUP": "001",
}
po_items = [{
"PO_ITEM": "00010",
"MATERIAL": material,
"QUANTITY": quantity,
"PLANT": "1000",
}]
result = self.conn.call(
"BAPI_PO_CREATE1",
POHEADER=po_header,
POITEM=po_items,
)
if result.get("RETURN", [{}])[0].get("TYPE") == "E":
raise ToolError(f"SAP Error: {result['RETURN'][0]['MESSAGE']}")
return {
"status": "created",
"po_number": result.get("EXPPURCHASEORDER"),
"vendor": vendor,
"material": material,
"quantity": quantity,
}
def _validate_vendor(self, vendor: str) -> bool:
"""校验供应商是否存在"""
result = self.conn.call("BAPI_VENDOR_GETDETAIL", VENDOR=vendor)
return result.get("RETURN", {}).get("TYPE") != "E"
def _validate_material(self, material: str) -> bool:
"""校验物料是否存在"""
result = self.conn.call("BAPI_MATERIAL_GET_DETAIL", MATERIAL=material)
return result.get("RETURN", {}).get("TYPE") != "E"
def _estimate_cost(self, material: str, quantity: float) -> float:
"""获取物料标准价格估算总价"""
result = self.conn.call(
"BAPI_MATERIAL_GET_PRICE", MATERIAL=material
)
price = result.get("PRICE", 0)
return round(price * quantity, 2)
sap_adapter = SAPAdapter()
3.2 Agent 调用示例
Agent 的完整对话流程:
User: "帮我查一下物料 MAT-001234 在 1000 工厂的库存"
Agent: 调用 sap_inventory_query(material_number="MAT-001234", plant="1000")
→ 返回: {"stock_items": [{"available_qty": 150, "unit": "PC"}]}
Agent: "物料 MAT-001234 在 1000 工厂当前可用库存为 150 PC。"
User: "库存不够,帮我创建一个采购订单,供应商 V-005678,采购 500 个"
Agent: 先调 dry_run 校验:
sap_purchase_order_create(
vendor="V-005678", material="MAT-001234",
quantity=500, dry_run=True
)
→ 返回: {"status": "dry_run", "preview": {"estimated_total": 25000.00}}
Agent: "采购订单预检通过:供应商 V-005678,物料 MAT-001234,500 个,预估总价 ¥25,000。确认创建吗?"
User: "确认"
Agent: 实际创建:
sap_purchase_order_create(
vendor="V-005678", material="MAT-001234",
quantity=500, dry_run=False
)
→ 返回: {"status": "created", "po_number": "4500012345"}
四、场景二:飞书审批流对接
4.1 实现
# gateway/adapters/feishu.py
import httpx
import os
import time
class FeishuAdapter:
def __init__(self):
self.app_id = os.getenv("FEISHU_APP_ID")
self.app_secret = os.getenv("FEISHU_APP_SECRET")
self.base_url = "https://open.feishu.cn/open-apis"
self._token = None
self._token_expires = 0
async def _get_token(self) -> str:
"""获取/刷新 tenant_access_token"""
if time.time() < self._token_expires - 60:
return self._token
resp = await httpx.AsyncClient().post(
f"{self.base_url}/auth/v3/tenant_access_token/internal",
json={
"app_id": self.app_id,
"app_secret": self.app_secret,
}
)
data = resp.json()
self._token = data["tenant_access_token"]
self._token_expires = time.time() + data["expire"]
return self._token
async def approval_status(self, approval_code: str) -> dict:
"""查询审批单状态"""
token = await self._get_token()
resp = await httpx.AsyncClient().post(
f"{self.base_url}/approval/v4/instances",
headers={"Authorization": f"Bearer {token}"},
json={"approval_code": approval_code}
)
data = resp.json()
instance = data.get("data", {}).get("instance", {})
return {
"approval_code": approval_code,
"status": instance.get("status"), # PENDING / APPROVED / REJECTED
"title": instance.get("task_title"),
"created_at": instance.get("created_time"),
"current_approver": instance.get("current_node", {}).get("approver"),
"history": [
{
"node": h["task_title"],
"action": h["status"],
"operator": h.get("operator_name"),
"time": h.get("operate_time"),
}
for h in instance.get("history", [])
]
}
async def approval_remind(self, approval_code: str, message: str = "请尽快处理") -> dict:
"""发送催办通知"""
token = await self._get_token()
resp = await httpx.AsyncClient().post(
f"{self.base_url}/approval/v4/instances/{approval_code}/remind",
headers={"Authorization": f"Bearer {token}"},
json={"message": message}
)
data = resp.json()
if data.get("code") != 0:
raise ToolError(f"飞书催办失败: {data.get('msg')}")
return {
"status": "reminded",
"approval_code": approval_code,
"message": message,
}
feishu_adapter = FeishuAdapter()
4.2 Agent 调用示例
User: "帮我看看审批单 SP-20260513-0042 到哪了"
Agent: feishu_approval_status(approval_code="SP-20260513-0042")
→ 返回: {"status": "PENDING", "current_approver": "张三"}
Agent: "审批单 SP-20260513-0042(Q2 采购预算申请)当前状态:待审批,当前审批人:张三。"
User: "催一下"
Agent: "确认要发送催办通知给张三吗?"
User: "确认"
Agent: feishu_approval_remind(approval_code="SP-20260513-0042", message="请尽快处理")
→ 返回: {"status": "reminded"}
五、场景三:钉钉日程对接
5.1 实现
# gateway/adapters/dingtalk.py
import httpx
import os
from datetime import datetime, timedelta
class DingtalkAdapter:
def __init__(self):
self.app_key = os.getenv("DINGTALK_APP_KEY")
self.app_secret = os.getenv("DINGTALK_APP_SECRET")
self.base_url = "https://api.dingtalk.com/v1.0"
self._token = None
async def _get_token(self) -> str:
if self._token:
return self._token
resp = await httpx.AsyncClient().post(
"https://api.dingtalk.com/v1.0/oauth2/accessToken",
json={"appKey": self.app_key, "appSecret": self.app_secret}
)
self._token = resp.json()["accessToken"]
return self._token
async def schedule_conflict(
self, user_id: str, start_time: str, end_time: str
) -> dict:
"""检查日程冲突"""
token = await self._get_token()
# 查询该时间段内的已有日程
resp = await httpx.AsyncClient().get(
f"{self.base_url}/calendar/v1/users/{user_id}/calendars/primary/schedules",
headers={"x-acs-dingtalk-access-token": token},
params={
"timeMin": start_time,
"timeMax": end_time,
}
)
schedules = resp.json().get("data", [])
if schedules:
conflicts = [
{
"summary": s["summary"],
"start": s["start"]["dateTime"],
"end": s["end"]["dateTime"],
"organizer": s.get("creator", {}).get("displayName"),
}
for s in schedules
]
return {
"has_conflict": True,
"conflicts": conflicts,
"conflict_count": len(conflicts),
}
return {"has_conflict": False, "conflict_count": 0}
async def schedule_create(
self, summary: str, start_time: str, end_time: str,
attendees: list[str] = None, check_conflict: bool = True
) -> dict:
"""创建日程"""
if check_conflict:
conflict_result = await self.schedule_conflict(
user_id="me", start_time=start_time, end_time=end_time
)
if conflict_result["has_conflict"]:
return {
"status": "conflict_detected",
"message": f"检测到 {conflict_result['conflict_count']} 个冲突日程",
"conflicts": conflict_result["conflicts"],
}
token = await self._get_token()
payload = {
"summary": summary,
"description": f"由 AI Agent 创建",
"start": {"dateTime": start_time, "timeZone": "Asia/Shanghai"},
"end": {"dateTime": end_time, "timeZone": "Asia/Shanghai"},
}
if attendees:
payload["attendees"] = [{"id": a} for a in attendees]
resp = await httpx.AsyncClient().post(
f"{self.base_url}/calendar/v1/users/me/calendars/primary/schedules",
headers={"x-acs-dingtalk-access-token": token},
json=payload,
)
data = resp.json()
return {
"status": "created",
"schedule_id": data.get("scheduleId"),
"summary": summary,
"start_time": start_time,
"end_time": end_time,
}
dingtalk_adapter = DingtalkAdapter()
六、安全加固清单
把遗留系统暴露给 AI Agent,安全是底线。以下是必须实现的加固措施:
6.1 鉴权分级
| 级别 | 操作类型 | 鉴权方式 | 示例 |
|---|---|---|---|
| L1 | 只读查询 | API Key | 库存查询、状态查询 |
| L2 | 低风险写操作 | API Key + Dry Run | 日程创建、审批查询 |
| L3 | 高风险写操作 | OAuth 2.0 + Dry Run + 二次确认 | 采购订单创建、数据修改 |
| L4 | 不可逆操作 | OAuth 2.0 + 人工审批 | 删除、财务过账 |
6.2 参数注入防护
AI Agent 的参数是 LLM 生成的,必须做严格的 schema 校验:
import jsonschema
def validate_tool_input(tool_name: str, params: dict) -> None:
"""用 JSON Schema 校验 Agent 传入的参数"""
schema = TOOL_SCHEMAS.get(tool_name)
if not schema:
raise ToolError(f"Unknown tool: {tool_name}")
try:
jsonschema.validate(params, schema)
except jsonschema.ValidationError as e:
raise ToolError(f"Invalid parameters: {e.message}")
# 额外防护:防止 SQL/命令注入
def sanitize_string(value: str, max_length: int = 256) -> str:
"""清理字符串,防止注入"""
value = value[:max_length]
# 禁止 SQL 关键字
sql_keywords = ["DROP", "DELETE", "INSERT", "UPDATE", "ALTER", "EXEC"]
for kw in sql_keywords:
if kw.lower() in value.lower():
raise ToolError(f"Potentially malicious input detected: {kw}")
return value
6.3 审计日志
每条审计记录包含:
{
"timestamp": "2026-05-13T06:00:00Z",
"agent_id": "agent-sales-001",
"tool": "sap_purchase_order_create",
"params": {
"vendor": "V-005678",
"material": "MAT-001234",
"quantity": 500,
"dry_run": true
},
"result_summary": "dry_run, estimated_total=25000.00",
"status": "success",
"latency_ms": 245,
"ip": "10.0.1.50"
}
审计日志写入后不可修改,建议用 append-only 存储或写入区块链/不可变存储。
七、启动与部署
7.1 本地开发
# 安装依赖
pip install mcp pyrfc httpx jsonschema aiofiles
# 环境变量
export SAP_HOST="sap.company.com"
export SAP_USER="mcp_gateway"
export SAP_PASSWORD="***"
export FEISHU_APP_ID="cli_xxx"
export FEISHU_APP_SECRET="***"
export DINGTALK_APP_KEY="ding_xxx"
export DINGTALK_APP_SECRET="***"
# 启动(stdio 模式,供本地 Agent 调试)
python -m gateway.server
# 启动(SSE 模式,供远程 Agent 调用)
python -m gateway.server --transport sse --port 8765
7.2 Docker 部署
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY gateway/ ./gateway/
EXPOSE 8765
CMD ["python", "-m", "gateway.server", "--transport", "sse", "--port", "8765"]
7.3 Agent 端配置
Agent 通过 MCP config 连接到 Gateway:
{
"mcpServers": {
"enterprise-gateway": {
"command": "python",
"args": ["-m", "gateway.server"],
"env": {
"MCP_API_KEY": "your-api-key-here"
}
}
}
}
远程 SSE 模式:
{
"mcpServers": {
"enterprise-gateway": {
"url": "https://gateway.ainocode.cn/sse",
"headers": {
"Authorization": "Bearer your-api-key-here"
}
}
}
}
八、踩坑记录
-
SAP RFC 连接池必须复用。每次调用新建连接在 SAP 端会产生大量对话进程,很快超过
rdisp/max_wprun_time限制。解决方案:全局单例 Connection + 连接健康检查。 -
飞书 token 过期时机比文档说的早。文档说 expire 秒后过期,实测提前 2-3 分钟就失效了。我们在
_get_token里预留了 60 秒的 buffer。 -
钉钉日程 API 的时区陷阱。
dateTime字段如果不含timeZone,钉钉默认用 UTC,导致日程时间偏移 8 小时。必须显式指定"timeZone": "Asia/Shanghai"。 -
LLM 生成的参数类型不匹配。LLM 有时把
"500"(字符串)当成 quantity 传入,而 JSON Schema 定义的是 number。必须在 validate 层做类型转换或拒绝。 -
dry_run 不是万能的。SAP 的 dry_run 只做语法校验,不做业务规则校验(比如信用额度、采购限额)。真正的高风险操作需要完整的业务审批流。
九、总结
MCP Gateway 的本质是一个协议翻译器 + 安全网关。它让 AI Agent 能安全、可控地调用企业遗留系统的能力,同时保留了完整的审计追踪。
关键要点:
- 只读操作优先:先暴露查询类 Tool,验证安全性后再开放写操作
- dry_run 强制化:所有写操作默认 dry_run,需显式确认才实际执行
- 审计不可少:每条调用记录不可修改,便于事后追溯
- 渐进式暴露:从低风险的查询开始,逐步扩展到写操作,不一步到位
本文的完整代码已开源,包含 Docker Compose 部署配置和 Agent 端集成示例。