企业级多智能体系统实战:从架构到生产部署
Yaqin Hei··20分钟阅读
项目背景
在Cognizant,我领导团队为大型企业客户开发IT运维自动化系统。需求很明确:
- 目标:自动化60-70%的重复性IT请求
- 场景:密码重置、权限申请、软件安装、故障排查等
- 约束:必须与现有的ServiceNow系统集成,确保数据安全
单个LLM无法胜任这个任务,因为:
- 任务多样性:不同类型请求需要不同处理流程
- 工具调用:需要操作多个后端API
- 决策复杂度:需要多步推理和规划
- 可靠性要求:不能有幻觉或误操作
所以我们设计了一个多智能体协作系统。
系统架构
整体设计
┌─────────────────┐
│ 用户请求 │
└────────┬────────┘
│
┌──────▼──────┐
│ Router Agent │ ← 路由分发
└──────┬──────┘
│
┌───────────────────┼───────────────────┐
│ │ │
┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│Password │ │Software │ │Ticket │
│ Agent │ │ Agent │ │ Agent │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└───────────────────┼───────────────────┘
│
┌──────▼──────┐
│ ServiceNow │ ← 后端系统
│ APIs │
└─────────────┘
核心组件
1. Router Agent - 任务路由器
负责理解用户意图,分发到对应的专门Agent。
class RouterAgent:
"""路由Agent:将用户请求分发到专门Agent"""
def __init__(self):
self.specialists = {
"password_reset": PasswordAgent(),
"software_install": SoftwareAgent(),
"ticket_query": TicketAgent(),
"general_qa": QAAgent(),
}
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
def route(self, user_request: str) -> AgentResponse:
"""路由用户请求"""
# 使用LLM进行意图识别
prompt = f"""
分析以下用户请求,判断应该使用哪个Agent处理。
可选Agents:
- password_reset: 密码重置、账号锁定
- software_install: 软件安装、更新
- ticket_query: 工单查询、状态跟踪
- general_qa: 一般性问答
用户请求: {user_request}
返回JSON格式:
{{"agent": "agent_name", "confidence": 0.95, "reasoning": "原因"}}
"""
response = self.llm.invoke(prompt)
routing_decision = json.loads(response.content)
# 置信度检查
if routing_decision["confidence"] < 0.8:
return self._ask_clarification(user_request)
# 调用对应Agent
agent_name = routing_decision["agent"]
specialist = self.specialists[agent_name]
return specialist.handle(user_request)
设计要点:
- 结构化输出:使用JSON保证解析稳定性
- 置信度阈值:低于0.8时要求用户澄清
- 日志记录:记录所有路由决策,用于优化
2. Specialist Agents - 专门Agent
每个专门Agent负责特定类型任务。以密码重置为例:
class PasswordAgent:
"""密码重置专门Agent"""
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.servicenow = ServiceNowClient()
def handle(self, request: str) -> AgentResponse:
"""处理密码重置请求"""
# Step 1: 提取关键信息
user_info = self._extract_user_info(request)
# Step 2: 验证用户身份
if not self._verify_user(user_info):
return AgentResponse(
success=False,
message="身份验证失败,请联系IT支持"
)
# Step 3: 安全检查
if not self._security_check(user_info):
return AgentResponse(
success=False,
message="检测到异常,已创建安全工单"
)
# Step 4: 调用ServiceNow API重置密码
try:
result = self.servicenow.reset_password(
user_id=user_info["user_id"],
email=user_info["email"]
)
return AgentResponse(
success=True,
message=f"密码已重置,临时密码已发送到 {user_info['email']}"
)
except Exception as e:
logging.error(f"Password reset failed: {e}")
return AgentResponse(
success=False,
message="重置失败,已创建人工处理工单"
)
def _extract_user_info(self, request: str) -> dict:
"""提取用户信息"""
prompt = f"""
从以下请求中提取用户信息:
请求: {request}
返回JSON:
{{
"user_id": "提取的用户ID",
"email": "邮箱",
"department": "部门"
}}
"""
response = self.llm.invoke(prompt)
return json.loads(response.content)
def _verify_user(self, user_info: dict) -> bool:
"""验证用户身份"""
# 调用AD或SSO系统验证
return self.servicenow.verify_user(user_info)
def _security_check(self, user_info: dict) -> bool:
"""安全检查"""
# 检查最近是否有异常登录
# 检查是否是钓鱼攻击
recent_failures = self.servicenow.get_login_failures(
user_id=user_info["user_id"],
hours=24
)
return recent_failures < 5 # 24小时内失败不超过5次
3. Tool Layer - 工具层
统一封装所有外部API调用:
class ServiceNowClient:
"""ServiceNow API客户端"""
def __init__(self):
self.base_url = os.getenv("SERVICENOW_URL")
self.auth = (
os.getenv("SERVICENOW_USER"),
os.getenv("SERVICENOW_PASSWORD")
)
def reset_password(self, user_id: str, email: str) -> dict:
"""重置密码API"""
endpoint = f"{self.base_url}/api/now/table/sys_user"
# 生成临时密码
temp_password = self._generate_temp_password()
response = requests.post(
endpoint,
auth=self.auth,
json={
"user_id": user_id,
"password": temp_password,
"force_reset": True
},
timeout=10
)
response.raise_for_status()
# 发送邮件
self._send_password_email(email, temp_password)
return response.json()
def create_ticket(self, title: str, description: str, priority: str) -> str:
"""创建工单"""
endpoint = f"{self.base_url}/api/now/table/incident"
response = requests.post(
endpoint,
auth=self.auth,
json={
"short_description": title,
"description": description,
"priority": priority,
"assigned_to": "auto_triage"
},
timeout=10
)
response.raise_for_status()
ticket_id = response.json()["result"]["number"]
return ticket_id
关键技术实现
1. Agent间通信协议
定义标准的消息格式:
from pydantic import BaseModel
from typing import Optional, Dict, Any
class AgentMessage(BaseModel):
"""Agent间通信消息"""
sender: str # 发送者Agent ID
receiver: str # 接收者Agent ID
message_type: str # task/query/response
payload: Dict[str, Any] # 消息内容
context: Optional[Dict] = {} # 上下文信息
timestamp: float # 时间戳
class AgentResponse(BaseModel):
"""Agent响应"""
success: bool
message: str
data: Optional[Dict] = {}
next_steps: Optional[list] = []
2. 状态管理
使用Redis存储会话状态:
class ConversationState:
"""会话状态管理"""
def __init__(self, redis_client):
self.redis = redis_client
def save_state(self, session_id: str, state: dict):
"""保存会话状态"""
key = f"session:{session_id}"
self.redis.setex(
key,
3600, # 1小时过期
json.dumps(state)
)
def get_state(self, session_id: str) -> dict:
"""获取会话状态"""
key = f"session:{session_id}"
data = self.redis.get(key)
return json.loads(data) if data else {}
def add_history(self, session_id: str, message: dict):
"""添加对话历史"""
state = self.get_state(session_id)
if "history" not in state:
state["history"] = []
state["history"].append(message)
self.save_state(session_id, state)
3. 错误处理与降级
生产环境的可靠性是核心:
class AgentExecutor:
"""Agent执行器,带重试和降级"""
def __init__(self, max_retries=3):
self.max_retries = max_retries
def execute_with_retry(self, agent_func, *args, **kwargs):
"""带重试的执行"""
for attempt in range(self.max_retries):
try:
return agent_func(*args, **kwargs)
except RateLimitError:
# API限流,指数退避
wait_time = 2 ** attempt
logging.warning(f"Rate limit, retry in {wait_time}s")
time.sleep(wait_time)
except APIError as e:
# API错误,创建人工工单
logging.error(f"API error: {e}")
return self._create_fallback_ticket(*args, **kwargs)
except Exception as e:
# 未知错误
logging.error(f"Unexpected error: {e}")
if attempt == self.max_retries - 1:
return self._create_fallback_ticket(*args, **kwargs)
# 所有重试失败
return AgentResponse(
success=False,
message="系统暂时不可用,已创建人工处理工单"
)
def _create_fallback_ticket(self, *args, **kwargs):
"""降级:创建人工处理工单"""
servicenow = ServiceNowClient()
ticket_id = servicenow.create_ticket(
title="AI Agent处理失败",
description=f"Request: {args}, Error: {kwargs}",
priority="high"
)
return AgentResponse(
success=False,
message=f"已创建工单 {ticket_id},人工将在30分钟内处理"
)
生产环境优化
1. 性能优化
并行处理:
import asyncio
async def parallel_agent_call(agents, request):
"""并行调用多个Agent"""
tasks = [agent.handle_async(request) for agent in agents]
results = await asyncio.gather(*tasks)
return results
# 示例:同时查询多个数据源
results = await parallel_agent_call(
agents=[TicketAgent(), KnowledgeBaseAgent(), HistoryAgent()],
request="查询我的所有工单"
)
缓存热点数据:
from functools import lru_cache
@lru_cache(maxsize=1000)
def get_user_permissions(user_id: str) -> list:
"""缓存用户权限(1小时)"""
return servicenow.get_permissions(user_id)
2. 监控与告警
关键指标:
import prometheus_client as prom
# 定义指标
agent_requests = prom.Counter(
'agent_requests_total',
'Total agent requests',
['agent_name', 'status']
)
agent_latency = prom.Histogram(
'agent_latency_seconds',
'Agent response latency',
['agent_name']
)
agent_success_rate = prom.Gauge(
'agent_success_rate',
'Agent success rate',
['agent_name']
)
# 使用
def handle_request_with_metrics(agent, request):
start_time = time.time()
try:
response = agent.handle(request)
status = "success" if response.success else "failure"
agent_requests.labels(
agent_name=agent.__class__.__name__,
status=status
).inc()
return response
finally:
latency = time.time() - start_time
agent_latency.labels(
agent_name=agent.__class__.__name__
).observe(latency)
3. 安全性保障
输入验证:
from pydantic import BaseModel, validator
class UserRequest(BaseModel):
"""用户请求模型"""
request_text: str
user_id: str
session_id: str
@validator('request_text')
def validate_request(cls, v):
# 检查恶意输入
dangerous_patterns = [
'DROP TABLE', 'DELETE FROM', '<script>',
'eval(', 'exec(', '__import__'
]
for pattern in dangerous_patterns:
if pattern.lower() in v.lower():
raise ValueError("检测到危险输入")
# 限制长度
if len(v) > 5000:
raise ValueError("请求过长")
return v
权限控制:
def require_permission(permission: str):
"""权限装饰器"""
def decorator(func):
def wrapper(self, request, *args, **kwargs):
user_id = request.user_id
if not self._check_permission(user_id, permission):
return AgentResponse(
success=False,
message=f"权限不足,需要: {permission}"
)
return func(self, request, *args, **kwargs)
return wrapper
return decorator
class PasswordAgent:
@require_permission("password_reset")
def handle(self, request):
# 处理逻辑
pass
实际效果
部署6个月后的数据:
| 指标 | 结果 |
|---|---|
| 自动化率 | 68% ✅ |
| 平均处理时间 | 从45分钟 → 2分钟 |
| 用户满意度 | 4.6/5.0 |
| 成本节省 | $2.3M/年 |
| 错误率 | 低于2% |
关键成功因素:
- ✅ 明确的Agent职责划分
- ✅ 完善的错误处理和降级机制
- ✅ 严格的安全控制
- ✅ 详细的监控和日志
经验教训
做得好的地方
- 模块化设计:新增Agent很容易
- 降级机制:从未出现完全不可用
- 监控体系:问题能快速定位
需要改进
- 幻觉问题:少数情况下LLM会生成错误信息
- 成本控制:GPT-4调用成本较高
- 冷启动优化:第一次请求较慢
未来计划
- 使用开源模型降低成本
- 引入检索增强(RAG)减少幻觉
- 实现更复杂的多Agent协作(如拍卖、投票机制)
总结
多智能体系统是构建复杂AI应用的有效范式。关键是:
- 清晰的职责划分:每个Agent专注特定任务
- 标准化通信:定义清晰的消息协议
- 鲁棒的错误处理:必须有降级方案
- 完善的监控:生产环境的生命线
- 持续优化:根据实际使用数据改进
从我的经验看,多智能体不是"银弹",但在合适的场景下(任务可分解、需要工具调用、要求高可靠性),它是目前最实用的方案。
代码示例 完整项目代码:GitHub Repo
想交流? 如果你也在构建多智能体系统,欢迎联系我分享经验!
Subscribe for updates
Get the latest AI engineering posts delivered to your inbox.