前言
随着云原生技术、微服务架构和容器编排体系的普及,IT系统的复杂度持续攀升。传统运维观测手段难以应对高维度数据、复杂链路与快速迭代带来的挑战,诊断效率与交互体验均面临瓶颈。云舟观测智能体系统( GC Agent System)由此推出。系统以自然语言交互为入口,通过统一编排层与标准化工具体系,实现对观测数据的智能分析、故障辅助诊断与知识检索,为企业构建新一代的自动化运维能力。
本文聚焦系统架构设计、核心模块原理与工程实现,为大家呈现一个完整可靠、可扩展、企业级的监控观测智能体系统技术蓝图。
一、总体架构设计
1.1 架构理念
系统采用 模块化、松耦合、可扩展 的架构体系。上层业务逻辑、中层编排调度与下层执行工具使用标准接口隔离,使系统既能保持稳定性,也能在未来轻松扩展至更多模型、更多场景和更多观测工具。
1.2 分层架构
系统整体分为 用户/网关层、编排层、Agent工具与能力层、协议层、持久层等五个层次,各层职责清晰、边界明确。
后端核心技术组件主要有以下:
选型以 性能优先、生态成熟、支持异步、运维友好 为原则,核心能力基于langgraph/langchain构建。
1.3 系统任务流程
示例:
一个典型的用户交互请求(如“帮我诊断下主机故障”)在系统中的流转过程如下:
请求接入与极速响应:用户发起请求后,API 层立即生成会话 ID,并在进入任何耗时操作之前,通过 SSE 推送首包事件。这种“首包零延迟”机制能够显著减少用户等待时间。
纵深防御鉴权:系统基于 Cookie 获取用户态信息,并将其编码到 x-ai Header 中。该凭证会伴随整个请求链路,最终传递到最底层的 MCP 工具,从而实现数据级的权限隔离。
智能决策编排:
当请求来自明确的指令(例如前端按钮触发),系统会直接进入路由专家模式。
当请求为自然语言输入时,系统进入自由探索模式。意图识别组件会对请求进行七类分类(如问候、知识查询、故障排查等),并据此选择最合适的执行策略。
动态代理构建:系统根据任务类型,通过 MCPLoader 动态构建执行代理,加载所需的 MCP 工具,并配置相应提示模板,使代理具备完成任务的能力。
推理与执行循环:代理按照“推理—调用工具—获取结果”的工作方式进行循环。在执行过程中,系统会实时记录关键中间状态并通过 SSE 反馈给前端,以便用户了解任务的执行进度。
结果生成与记忆:当任务执行完成后,系统会将最终结果写入 Redis 的会话历史。同时,后台会异步触发标题生成流程,为该轮会话生成简洁、可索引的主题描述,形成完整的交互闭环。
二、核心组件深度解析
2.1 GCExecutor - 核心引擎
GCExecutor是整个系统的核心编排器,负责协调Agent生态系统的运作。它采用了创新的双模式执行策略,能够根据任务特性自动选择最优的执行路径。
2.1.1 路由专家模式(Route Mode)
路由专家模式是为专业任务设计的快速执行通道。当用户明确知道需要哪个专业Agent时,系统会直接路由到对应的Agent,跳过意图识别环节,大幅提升响应速度。适用于需求明确、流程固定的场景。
核心特性:
直接路由:通过特定参数指定目标Agent
工具精简:只加载Agent配置中定义的5-10个必要工具
响应可预测:执行路径固定,便于调试和优化
性能优先:减少LLM调用次数,降低延迟
目前适用场景:
主机诊断:直接调用主机诊断专家。
知识查询:直接调用知识库专家。
定向任务:用户明确需要特定功能(待开放)
2.1.2 AI自由探索模式(Explore Mode)
探索模式是系统的智能化体现,适用于复杂、开放性的问题。系统会加载所有可用工具,让AI自主分析问题并选择最优工具组合。
核心特性:
全量工具:加载25+可用工具
智能选择:AI根据上下文自主决策
多轮迭代:支持复杂的多步骤任务
动态调整:根据执行结果调整策略
适用场景:
复杂故障诊断
多维度数据分析
探索性问题解决
跨领域综合任务
为保障多租户隔离,每次执行均会重新构建执行环境,注入用户级鉴权信息。
2.1.3 意图识别系统
意图识别是探索模式的第一道关卡,通过多级过滤和智能分析,精准理解用户意图。
五级识别流程:
输入净化:移除特殊字符,防止注入攻击
可疑检测:识别恶意输入模式
相关性判断:检查是否属于平台业务范围
历史融合:结合对话历史理解上下文
LLM分析:深度语义理解,返回7种意图类型
意图类型体系:
无效/恶意输入
知识查询
通用任务
问候类输入
身份信息询问
上下文相关短问
可疑/异常输入(防注入)
2.1.4 编排核心逻辑
编排器的核心在于协调各个组件的协同工作,确保请求的高效处理。
执行流程:
请求初始化:生成session_id和response_id
模式判断:路由模式 vs 探索模式
Agent创建:动态创建Agent实例(不使用缓存)
工具加载:根据模式加载相应工具集
执行监控:实时跟踪执行状态
结果聚合:整合多个工具的返回结果
关键实现代码:
class GCExecutor:"""GC Agent 执行器"""def __init__(self, redis_url: str = None):"""初始化执行器"""# 初始化LLMself.llm = GCLLMManager.get_llm()# 获取MCP加载器self.mcp_loader = get_mcp_loader()# Agent缓存(注意:实际不使用缓存)self.agents = {}# 初始化Redis存储self.redis_store = GCRedisStore(redis_url=redis_url)# 初始化反馈服务self.feedback_service = GCFeedbackService(redis_url=redis_url)async def execute(self, request: GCRequest, start_sequence: int = 0) -> AsyncIterator[str]:"""执行请求 - 生成SSE流"""# 保存headers到实例,供后续工具创建使用self.current_request_headers = request.headers# 确定执行模式if request.direct_agent:# 直接路由模式 - 使用指定的Agentagent_id = request.direct_agentasync for event in self._execute_with_agent(agent_id,request.message,session_id,response_id,sequence):yield eventelse:# AI自由探索模式async for event in self._execute_explore_mode(request.message,session_id,response_id,sequence):yield event
意图识别的完整实现:
async def _analyze_intent(self, query: str, session_id: str) -> Dict[str, Any]:"""增强的意图识别方法(带防护机制)"""# 步骤1:输入净化cleaned_query = self._sanitize_input(query)# 步骤2:可疑输入检测if self._is_suspicious_input(cleaned_query):return {"intent_type": "invalid","intent_name": "可疑输入","confidence": 1.0,"direct_response": "抱歉,您的输入包含不支持的内容..."}# 步骤3:主题相关性检测if not self._is_observation_related(cleaned_query):pass # 继续LLM判断# 步骤4:获取历史对话history_messages = await self.redis_store.get_conversation_history(session_id=session_id,limit=20)# 步骤5:LLM意图分析response = await self.llm.ainvoke([SystemMessage(content="你是一个意图识别专家..."),HumanMessage(content=prompt)])
小结:GCExecutor通过双模式执行策略,既保证了专业任务的高效执行,又提供了复杂问题的智能探索能力。意图识别系统的多级过滤机制确保了系统的安全性和准确性。
2.2 MCP工具加载器和Header透传
MCP(Model Context Protocol)是系统与外部工具交互的统一协议,Headers透传机制确保了认证信息在整个调用链路中的完整传递。
async def _create_tools_for_agent(self,agent_config: Dict[str, Any],explore_mode: bool = False,request_headers: Optional[Dict[str, str]] = None) -> List[Tool]:"""为Agent创建工具"""# 合并headers,确保所有值都是字符串final_headers = {}# 处理配置中的headersif mcp_server.get("headers"):for key, value in mcp_server["headers"].items():final_headers[key] = str(value)# 处理请求中的headers(包含x-ai认证信息)if request_headers:for key, value in request_headers.items():final_headers[key] = str(value)# 创建MCP客户端配置server_config = {"server": {"url": url,"transport": "streamable_http","headers": final_headers # Headers透传到MCP服务}}# 每次都创建新客户端,不缓存client = MultiServerMCPClient(server_config)# 根据模式过滤工具if not explore_mode and defined_tools:# 路由模式:只返回定义的工具return [t for t in all_tools if t.name in defined_tools]else:# 探索模式:返回所有工具return all_tools
小结:MCP协议的引入统一了工具调用接口,Headers透传机制保证了认证链路的完整性,这是系统安全性的重要保障。
三、技术创新亮点
3.1 双模式执行策略
class ExecutionModeStrategy:"""创新的双模式执行策略"""async def execute_explore_mode(self, query: str):"""探索模式 - 适用于复杂、开放性问题特点:1. 加载所有可用工具(20+)2. AI自主选择最优工具组合3. 支持多轮工具调用4. 动态调整执行策略"""# 加载完整工具集all_tools = await self.load_all_tools()# 创建超级Agentsuper_agent = create_react_agent(llm=self.llm,tools=all_tools,prompt=self.exploration_prompt)# 智能执行return await super_agent.astream(query)async def execute_route_mode(self, query: str, agent_id: str):"""路由模式 - 适用于明确、专业的任务特点:1. 直接路由到专业Agent2. 只加载必要工具(5-10个)3. 执行速度快4. 响应可预测"""# 加载指定Agentagent = await self.load_agent(agent_id)# 快速执行return await agent.astream(query)
3.2 流式响应系统
class StreamingResponseSystem:"""SSE流式响应系统"""async def generate_sse_stream(self, request: GCRequest):"""生成SSE事件流技术特点:1. 首包延迟 < 100ms2. 思维链实时可视化3. 内容渐进式输出4. 错误优雅降级"""# 立即发送首个事件yield self._format_sse({"type": "session_response_id","message": response_id,"timestamp": datetime.now().isoformat()})# 实时发送思考步骤async for step in self.executor.think():yield self._format_sse({"type": "thinking_step","message": step.description,"step": step.type})# 流式输出内容async for token in self.executor.generate():yield self._format_sse({"type": "token","message": token})
3.3 智能工具调用格式化
def _get_tool_description(self, tool_name: str, tool_args: Dict) -> str:"""智能工具描述生成创新点:1. 根据工具类型生成友好描述2. 参数智能提取和展示3. 执行意图清晰表达"""# 工具语义映射semantic_map = {"getSystemSpace": "获取系统空间信息","queryHostCoreMetrics": "查询主机核心指标","queryHostProcData": "分析进程运行状态","getHostErrorLogs": "检索系统异常日志"}# 智能描述生成base_desc = semantic_map.get(tool_name, f"执行{tool_name}")# 参数增强if "hosts" in tool_args:base_desc += f" - 目标: {tool_args['hosts']}"if "metrics" in tool_args:base_desc += f" - 指标: {tool_args['metrics']}"return base_desc
3.4 企业级会话管理与用户隔离
class SessionManagement:"""企业级会话管理系统"""async def manage_session(self, session_id: str, user_id: str):"""会话管理核心技术特点:1. Redis持久化 - 分布式会话存储2. 用户隔离 - 基于Cookie的认证3. 上下文保持 - 历史消息关联4. 标题生成 - AI智能摘要"""# 会话创建/获取session = await self.redis_store.get_or_create_session(session_id=session_id,user_id=user_id)# 历史上下文构建history = await self.redis_store.get_conversation_history(session_id=session_id,limit=20 # 保持最近20条消息)# 智能标题生成if not session.title:title = await self._generate_session_title(user_question=query,ai_response=response)await self.redis_store.update_session_title(session_id, title)
小结:通过双模式执行、流式响应、智能工具格式化等创新技术,系统实现了效率与智能的完美平衡。
四、上下文/记忆管理
记忆是 Agent 能够进行连续对话的基础。本系统基于 Redis 构建了一套层次化的分布式记忆系统,实现了会话管理、历史记忆和用户隔离等核心功能。
class GCRedisStore:"""GC Agent的Redis存储实现"""def __init__(self, redis_url: str = None):"""初始化Redis连接"""if redis_url:self.redis = redis.from_url(redis_url)else:# 从配置文件读取config = self._load_config()redis_config = config.get('redis', {})self.redis = redis.Redis(host=redis_config.get('host', 'localhost'),port=redis_config.get('port', 6379),db=redis_config.get('db', 0),decode_responses=True)async def get_or_create_session(self, session_id: str, user_id: str = "default") -> SessionContext:"""获取或创建会话上下文"""key = f"session:{session_id}"context_data = await self.redis.hgetall(key)if not context_data:# 创建新会话context = SessionContext(session_id=session_id,user_id=user_id,created_at=datetime.now(),updated_at=datetime.now())await self.redis.hset(key, mapping=context.dict())else:context = SessionContext(**context_data)return contextasync def add_user_message(self, session_id: str, content: str):"""添加用户消息到历史"""message = {"role": "user","content": content,"timestamp": datetime.now().isoformat()}key = f"messages:{session_id}"await self.redis.rpush(key, json.dumps(message))# 更新会话时间await self._update_session_time(session_id)
小结:Redis存储层通过精心设计的Key结构和过期策略,实现了高效的会话管理和历史记忆功能。同时,为了提升历史会话的可读性,系统会在首轮对话结束后,异步触发一个轻量级 LLM 任务。该任务会根据用户的提问和 AI 的回答,提炼出精准的会话摘要(15 字以内),并更新到 Redis 中。这一过程对用户完全透明,不阻塞主交互流程。
五、认证机制
系统采用创新的三层鉴权架构,形成完整的安全闭环,保证数据安全可靠。
5.1 三层鉴权体系
第一层:网关层权限校验
拦截所有流量,通过与企业鉴权中心的交互,验证 Cookie 的合法性并解析出用户身份。这一层阻挡了所有未授权的外部访问。
Cookie交换:解析用户Cookie获取智汇云用户态信息
用户识别:从Cookie中提取当前用户的各类信息
第二层:MCP Server鉴权
在 API 业务逻辑中,系统强制校验 Session ID 与 User ID 的绑定关系。即使用户猜到了其他人的 Session ID,也无法查看或操作对应的会话内容,确保了租户间的绝对隔离。
服务认证:MCP Server自身的访问控制
未来扩展:支持MCP Server市场的统一鉴权链接
服务隔离:不同MCP Server间的权限隔离
第三层:数据权限校验
这是最具创新性的一层。系统将用户身份信息编码为 `x-ai` Header,并通过 MCP 协议透传至最底层的工具服务。这意味着,权限控制下沉到了数据源头——LLM 无法"看到"用户无权访问的主机或日志,从根本上解决了 AI 系统常见的数据泄露风险。
Headers透传:将用户信息透传到每个工具
细粒度控制:每个MCP Server的Tool做各自的数据权限验证
动态授权:基于用户角色的动态权限分配
5.2 认证服务实现
class AuthService:"""统一的认证服务"""async def get_user_info(self, request: Request) -> Tuple[str, str]:"""从请求中获取用户信息"""# 尝试从Cookie获取cookie_value = request.cookies.get("sessionid") or request.cookies.get("JSESSIONID")if cookie_value:try:# 解析Cookie获取用户信息user_info = self._parse_cookie(cookie_value)user_name = user_info.get("userName", "default")return json.dumps(user_info), user_nameexcept Exception as e:logger.warning(f"Cookie解析失败: {e}")# 降级到默认用户default_user = {"userName": "default","userId": "default","timestamp": datetime.now().isoformat()}return json.dumps(default_user), "default"def get_x_ai_header(self, user_info: str) -> str:"""生成X-AI header"""# Base64编码用户信息return base64.b64encode(user_info.encode()).decode()async def verify_session_ownership(self,session_id: str,user_name: str,redis_store: GCRedisStore) -> bool:"""验证会话所有权"""session_context = await redis_store.get_context(session_id)if not session_context:return Falsereturn session_context.user_id == user_name
小结:三层鉴权机制从网关、服务、数据三个维度构建了完整的安全体系,确保了系统的安全性和可靠性。
六、系统可观测性与可进化能力
6.1 Trace数据上报
系统通过Traceloop统一上报采集数据到LLM应用监测模块,实现全链路追踪。
监控维度:
LLM调用监控:Token使用量、响应时间、错误率
工具执行追踪:每个工具的执行时间和成功率
会话分析:用户行为模式、热点问题识别
性能指标:P99延迟、QPS、并发数
实现代码:
from traceloop.sdk import Traceloopfrom traceloop.sdk.tracing.tracing import Instruments# 先禁用 MCP 自动初始化Traceloop.init(app_name="gc-agent",resource_attributes={"service.name": "gc-agent","service.version": "v1.0","service.xHawkUserSpaceId": "xxx"},api_endpoint="http://traces.xxxx.lycc.qihoo.net:55683",block_instruments={Instruments.MCP},disable_batch=True)# 手动初始化 MCP instrumentormcp_instrumentor = Nonetry:from opentelemetry.instrumentation.mcp import McpInstrumentormcp_instrumentor = McpInstrumentor()# 验证初始化状态logger.info(f"MCP instrumentor is instrumented: {mcp_instrumentor.is_instrumented_by_opentelemetry}")except Exception as e:logger.error(f"Failed to initialize MCP instrumentor: {e}")mcp_instrumentor = None# 添加src路径sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
6.2 反馈服务系统
反馈服务主要用于收集用户对AI响应的评价(点赞/点踩),为系统后期优化提供数据支持。
评价收集:支持用户对每个响应进行评价
数据持久化:将反馈数据存储在Redis中
统计分析:支持按会话、时间等维度统计
6.3 实现代码
class GCFeedbackService:"""反馈服务"""def __init__(self, redis_url: str = None):"""初始化反馈服务"""if redis_url:self.redis = redis.from_url(redis_url)else:# 从配置文件读取config = self._load_config()redis_config = config.get('redis', {})self.redis = redis.Redis(host=redis_config.get('host', 'localhost'),port=redis_config.get('port', 6379),db=redis_config.get('feedback_db', 1), # 使用独立的DBdecode_responses=True)async def save_response(self,response_id: str,session_id: str,user_question: str,agent_response: str) -> Feedback:"""保存响应记录"""feedback = Feedback(response_id=response_id,session_id=session_id,user_question=user_question,agent_response=agent_response,created_at=datetime.now(),updated_at=datetime.now())# 保存到Rediskey = f"feedback:{response_id}"await self.redis.hset(key, mapping=feedback.dict())# 添加到会话索引session_key = f"session_feedbacks:{session_id}"await self.redis.sadd(session_key, response_id)return feedbackasync def update_rating(self,response_id: str,rating: Optional[str],session_id: Optional[str] = None) -> bool:"""更新反馈评分"""key = f"feedback:{response_id}"# 检查反馈是否存在exists = await self.redis.exists(key)if not exists:logger.warning(f"反馈记录不存在: {response_id}")return False# 更新评分updates = {"rating": rating or "null","updated_at": datetime.now().isoformat()}await self.redis.hset(key, mapping=updates)logger.info(f"反馈评分已更新: {response_id} -> {rating}")return True
小结:反馈服务通过简单有效的设计,为系统的持续优化提供了重要的数据基础。
结语
云舟观测智能体系统通过分层架构设计、标准化工具体系和可扩展的智能编排能力,为企业提供了结构清晰、可审计、易集成的智能化监控观测能力。在当前架构基础上,系统将围绕以下方向持续演进:
技术演进方向:
Agent 市场化:支持第三方 Agent 接入,构建内部能力生态。
主动式运维:从被动式操作协助迈向预测式、预防式运维。
多模态支持:实现图表、日志、指标等多类型观测数据的统一分析。
产品化路线:
私有化部署:支持与智汇云体系的集成与打包部署,满足企业级落地需求。
插件化架构:允许用户按需扩展 MCP 工具与流程,实现业务级定制能力。
生态化发展:与更多观测工具与运维平台深度集成,构建全链路协同生态。
更多技术干货,
请关注“360智汇云开发者”👇
360智汇云是以"汇聚数据价值,助力智能未来"为目标的企业应用开放服务平台,融合360丰富的产品、技术力量,为客户提供平台服务。
目前,智汇云提供数据库、中间件、存储、大数据、人工智能、计算、网络、视联物联与通信等多种产品服务以及一站式解决方案。
官网:https://zyun.360.cn(复制在浏览器中打开)
更多好用又便宜的云产品,欢迎试用体验~

