大数跨境
0
0

云舟观测智能体系统技术架构解析

云舟观测智能体系统技术架构解析 360智汇云开发者
2025-11-28
0
导读:本文聚焦云舟观测智能体系统架构设计、核心模块原理与工程实现,为大家呈现一个完整可靠、可扩展、企业级的监控观测智能体系统技术蓝图。

前言

随着云原生技术、微服务架构和容器编排体系的普及,IT系统的复杂度持续攀升。传统运维观测手段难以应对高维度数据、复杂链路与快速迭代带来的挑战,诊断效率与交互体验均面临瓶颈。云舟观测智能体系统( GC Agent System)由此推出。系统以自然语言交互为入口,通过统一编排层与标准化工具体系,实现对观测数据的智能分析、故障辅助诊断与知识检索,为企业构建新一代的自动化运维能力。

本文聚焦系统架构设计、核心模块原理与工程实现,为大家呈现一个完整可靠、可扩展、企业级的监控观测智能体系统技术蓝图。


一、总体架构设计

1.1 架构理念

系统采用 模块化、松耦合、可扩展 的架构体系。上层业务逻辑、中层编排调度与下层执行工具使用标准接口隔离,使系统既能保持稳定性,也能在未来轻松扩展至更多模型、更多场景和更多观测工具。

1.2 分层架构

系统整体分为 用户/网关层、编排层、Agent工具与能力层、协议层、持久层等五个层次,各层职责清晰、边界明确。

后端核心技术组件主要有以下:

选型以 性能优先、生态成熟、支持异步、运维友好 为原则,核心能力基于langgraph/langchain构建。

1.3 系统任务流程

示例:

一个典型的用户交互请求(如“帮我诊断下主机故障”)在系统中的流转过程如下:

  1. 请求接入与极速响应:用户发起请求后,API 层立即生成会话 ID,并在进入任何耗时操作之前,通过 SSE 推送首包事件。这种“首包零延迟”机制能够显著减少用户等待时间。

  2. 纵深防御鉴权:系统基于 Cookie 获取用户态信息,并将其编码到 x-ai Header 中。该凭证会伴随整个请求链路,最终传递到最底层的 MCP 工具,从而实现数据级的权限隔离。

  3. 智能决策编排:

    • 当请求来自明确的指令(例如前端按钮触发),系统会直接进入路由专家模式。

    • 当请求为自然语言输入时,系统进入自由探索模式。意图识别组件会对请求进行七类分类(如问候、知识查询、故障排查等),并据此选择最合适的执行策略。

  4. 动态代理构建:系统根据任务类型,通过 MCPLoader 动态构建执行代理,加载所需的 MCP 工具,并配置相应提示模板,使代理具备完成任务的能力。

  5. 推理与执行循环:代理按照“推理—调用工具—获取结果”的工作方式进行循环。在执行过程中,系统会实时记录关键中间状态并通过 SSE 反馈给前端,以便用户了解任务的执行进度。

  6. 结果生成与记忆:当任务执行完成后,系统会将最终结果写入 Redis 的会话历史。同时,后台会异步触发标题生成流程,为该轮会话生成简洁、可索引的主题描述,形成完整的交互闭环。

二、核心组件深度解析

2.1 GCExecutor - 核心引擎

GCExecutor是整个系统的核心编排器,负责协调Agent生态系统的运作。它采用了创新的双模式执行策略,能够根据任务特性自动选择最优的执行路径。

2.1.1 路由专家模式(Route Mode)

路由专家模式是为专业任务设计的快速执行通道。当用户明确知道需要哪个专业Agent时,系统会直接路由到对应的Agent,跳过意图识别环节,大幅提升响应速度。适用于需求明确、流程固定的场景。

核心特性:

  • 直接路由:通过特定参数指定目标Agent

  • 工具精简:只加载Agent配置中定义的5-10个必要工具

  • 响应可预测:执行路径固定,便于调试和优化

  • 性能优先:减少LLM调用次数,降低延迟

目前适用场景:

  • 主机诊断:直接调用主机诊断专家。

  • 知识查询:直接调用知识库专家。

  • 定向任务:用户明确需要特定功能(待开放)

2.1.2 AI自由探索模式(Explore Mode)

探索模式是系统的智能化体现,适用于复杂、开放性的问题。系统会加载所有可用工具,让AI自主分析问题并选择最优工具组合。

核心特性:

  • 全量工具:加载25+可用工具

  • 智能选择:AI根据上下文自主决策

  • 多轮迭代:支持复杂的多步骤任务

  • 动态调整:根据执行结果调整策略

适用场景:

  • 复杂故障诊断

  • 多维度数据分析

  • 探索性问题解决

  • 跨领域综合任务

为保障多租户隔离,每次执行均会重新构建执行环境,注入用户级鉴权信息。

2.1.3 意图识别系统

意图识别是探索模式的第一道关卡,通过多级过滤和智能分析,精准理解用户意图。

五级识别流程:

  1. 输入净化:移除特殊字符,防止注入攻击

  2. 可疑检测:识别恶意输入模式

  3. 相关性判断:检查是否属于平台业务范围

  4. 历史融合:结合对话历史理解上下文

  5. LLM分析:深度语义理解,返回7种意图类型

意图类型体系:

  • 无效/恶意输入

  • 知识查询

  • 通用任务

  • 问候类输入

  • 身份信息询问

  • 上下文相关短问

  • 可疑/异常输入(防注入)

2.1.4 编排核心逻辑

编排器的核心在于协调各个组件的协同工作,确保请求的高效处理。

执行流程:

  1. 请求初始化:生成session_id和response_id

  2. 模式判断:路由模式 vs 探索模式

  3. Agent创建:动态创建Agent实例(不使用缓存)

  4. 工具加载:根据模式加载相应工具集

  5. 执行监控:实时跟踪执行状态

  6. 结果聚合:整合多个工具的返回结果

关键实现代码:

class GCExecutor:    """GC Agent 执行器"""    def __init__(self, redis_url: str = None):        """初始化执行器"""        # 初始化LLM        self.llm = GCLLMManager.get_llm()
        # 获取MCP加载器        self.mcp_loader = get_mcp_loader()
        # Agent缓存(注意:实际不使用缓存)        self.agents = {}
        # 初始化Redis存储        self.redis_store = GCRedisStore(redis_url=redis_url)
        # 初始化反馈服务        self.feedback_service = GCFeedbackService(redis_url=redis_url)    async def execute(self, request: GCRequest, start_sequence: int = 0) -> AsyncIterator[str]:        """执行请求 - 生成SSE流"""
        # 保存headers到实例,供后续工具创建使用        self.current_request_headers = request.headers
        # 确定执行模式        if request.direct_agent:            # 直接路由模式 - 使用指定的Agent            agent_id = request.direct_agent            async for event in self._execute_with_agent(                    agent_id,                    request.message,                    session_id,                    response_id,                    sequence            ):                yield event        else:            # AI自由探索模式            async for event in self._execute_explore_mode(                    request.message,                    session_id,                    response_id,                    sequence            ):                yield event

意图识别的完整实现:

async def _analyze_intent(self, query: str, session_id: str) -> Dict[strAny]:    """增强的意图识别方法(带防护机制)"""
    # 步骤1:输入净化    cleaned_query = self._sanitize_input(query)
    # 步骤2:可疑输入检测    if self._is_suspicious_input(cleaned_query):        return {            "intent_type""invalid",            "intent_name""可疑输入",            "confidence"1.0,            "direct_response""抱歉,您的输入包含不支持的内容..."        }
    # 步骤3:主题相关性检测    if not self._is_observation_related(cleaned_query):        pass  # 继续LLM判断
    # 步骤4:获取历史对话    history_messages = await self.redis_store.get_conversation_history(        session_id=session_id,        limit=20    )
    # 步骤5:LLM意图分析    response = await self.llm.ainvoke([        SystemMessage(content="你是一个意图识别专家..."),        HumanMessage(content=prompt)    ])

小结:GCExecutor通过双模式执行策略,既保证了专业任务的高效执行,又提供了复杂问题的智能探索能力。意图识别系统的多级过滤机制确保了系统的安全性和准确性。

2.2 MCP工具加载器和Header透传

MCP(Model Context Protocol)是系统与外部工具交互的统一协议,Headers透传机制确保了认证信息在整个调用链路中的完整传递。

async def _create_tools_for_agent(    self,     agent_config: Dict[strAny],     explore_mode: bool = False,    request_headers: Optional[Dict[strstr]] = None) -> List[Tool]:    """为Agent创建工具"""
    # 合并headers,确保所有值都是字符串    final_headers = {}
    # 处理配置中的headers    if mcp_server.get("headers"):        for key, value in mcp_server["headers"].items():            final_headers[key] = str(value)
    # 处理请求中的headers(包含x-ai认证信息)    if request_headers:        for key, value in request_headers.items():            final_headers[key] = str(value)
    # 创建MCP客户端配置    server_config = {        "server": {            "url": url,            "transport""streamable_http",            "headers": final_headers  # Headers透传到MCP服务        }    }
    # 每次都创建新客户端,不缓存    client = MultiServerMCPClient(server_config)
    # 根据模式过滤工具    if not explore_mode and defined_tools:        # 路由模式:只返回定义的工具        return [t for t in all_tools if t.name in defined_tools]    else:        # 探索模式:返回所有工具        return all_tools

小结:MCP协议的引入统一了工具调用接口,Headers透传机制保证了认证链路的完整性,这是系统安全性的重要保障。

三、技术创新亮点

3.1 双模式执行策略

class ExecutionModeStrategy:    """    创新的双模式执行策略    """
    async def execute_explore_mode(self, query: str):        """        探索模式 - 适用于复杂、开放性问题
        特点:        1. 加载所有可用工具(20+)        2. AI自主选择最优工具组合        3. 支持多轮工具调用        4. 动态调整执行策略        """        # 加载完整工具集        all_tools = await self.load_all_tools()
        # 创建超级Agent        super_agent = create_react_agent(            llm=self.llm,            tools=all_tools,            prompt=self.exploration_prompt        )
        # 智能执行        return await super_agent.astream(query)
    async def execute_route_mode(self, query: str, agent_id: str):        """        路由模式 - 适用于明确、专业的任务
        特点:        1. 直接路由到专业Agent        2. 只加载必要工具(5-10个)        3. 执行速度快        4. 响应可预测        """        # 加载指定Agent        agent = await self.load_agent(agent_id)
        # 快速执行        return await agent.astream(query)

3.2 流式响应系统

class StreamingResponseSystem:    """    SSE流式响应系统    """
    async def generate_sse_stream(self, request: GCRequest):        """        生成SSE事件流
        技术特点:        1. 首包延迟 < 100ms        2. 思维链实时可视化        3. 内容渐进式输出        4. 错误优雅降级        """
        # 立即发送首个事件        yield self._format_sse({            "type""session_response_id",            "message": response_id,            "timestamp": datetime.now().isoformat()        })
        # 实时发送思考步骤        async for step in self.executor.think():            yield self._format_sse({                "type""thinking_step",                "message": step.description,                "step": step.type            })
        # 流式输出内容        async for token in self.executor.generate():            yield self._format_sse({                "type""token",                "message": token            })

3.3 智能工具调用格式化

def _get_tool_description(self, tool_name: str, tool_args: Dict) -> str:    """    智能工具描述生成
    创新点:    1. 根据工具类型生成友好描述    2. 参数智能提取和展示    3. 执行意图清晰表达    """
    # 工具语义映射    semantic_map = {        "getSystemSpace""获取系统空间信息",        "queryHostCoreMetrics""查询主机核心指标",        "queryHostProcData""分析进程运行状态",        "getHostErrorLogs""检索系统异常日志"    }
    # 智能描述生成    base_desc = semantic_map.get(tool_name, f"执行{tool_name}")
    # 参数增强    if "hosts" in tool_args:        base_desc += f" - 目标: {tool_args['hosts']}"    if "metrics" in tool_args:        base_desc += f" - 指标: {tool_args['metrics']}"
    return base_desc

3.4 企业级会话管理与用户隔离

class SessionManagement:    """    企业级会话管理系统    """
    async def manage_session(self, session_id: str, user_id: str):        """        会话管理核心
        技术特点:        1. Redis持久化 - 分布式会话存储        2. 用户隔离 - 基于Cookie的认证        3. 上下文保持 - 历史消息关联        4. 标题生成 - AI智能摘要        """
        # 会话创建/获取        session = await self.redis_store.get_or_create_session(            session_id=session_id,            user_id=user_id        )
        # 历史上下文构建        history = await self.redis_store.get_conversation_history(            session_id=session_id,            limit=20  # 保持最近20条消息        )
        # 智能标题生成        if not session.title:            title = await self._generate_session_title(                user_question=query,                ai_response=response            )            await self.redis_store.update_session_title(session_id, title)

小结:通过双模式执行、流式响应、智能工具格式化等创新技术,系统实现了效率与智能的完美平衡。

四、上下文/记忆管理

记忆是 Agent 能够进行连续对话的基础。本系统基于 Redis 构建了一套层次化的分布式记忆系统,实现了会话管理、历史记忆和用户隔离等核心功能。

class GCRedisStore:    """GC Agent的Redis存储实现"""
    def __init__(self, redis_url: str = None):        """初始化Redis连接"""        if redis_url:            self.redis = redis.from_url(redis_url)        else:            # 从配置文件读取            config = self._load_config()            redis_config = config.get('redis', {})            self.redis = redis.Redis(                host=redis_config.get('host''localhost'),                port=redis_config.get('port'6379),                db=redis_config.get('db'0),                decode_responses=True            )
    async def get_or_create_session(self, session_id: str, user_id: str = "default") -> SessionContext:        """获取或创建会话上下文"""        key = f"session:{session_id}"        context_data = await self.redis.hgetall(key)
        if not context_data:            # 创建新会话            context = SessionContext(                session_id=session_id,                user_id=user_id,                created_at=datetime.now(),                updated_at=datetime.now()            )            await self.redis.hset(key, mapping=context.dict())        else:            context = SessionContext(**context_data)
        return context
    async def add_user_message(self, session_id: str, content: str):        """添加用户消息到历史"""        message = {            "role""user",            "content": content,            "timestamp": datetime.now().isoformat()        }
        key = f"messages:{session_id}"        await self.redis.rpush(key, json.dumps(message))
        # 更新会话时间        await self._update_session_time(session_id)

小结:Redis存储层通过精心设计的Key结构和过期策略,实现了高效的会话管理和历史记忆功能。同时,为了提升历史会话的可读性,系统会在首轮对话结束后,异步触发一个轻量级 LLM 任务。该任务会根据用户的提问和 AI 的回答,提炼出精准的会话摘要(15 字以内),并更新到 Redis 中。这一过程对用户完全透明,不阻塞主交互流程。

五、认证机制

系统采用创新的三层鉴权架构,形成完整的安全闭环,保证数据安全可靠。

5.1 三层鉴权体系

第一层:网关层权限校验

拦截所有流量,通过与企业鉴权中心的交互,验证 Cookie 的合法性并解析出用户身份。这一层阻挡了所有未授权的外部访问。

  • Cookie交换:解析用户Cookie获取智汇云用户态信息

  • 用户识别:从Cookie中提取当前用户的各类信息

第二层:MCP Server鉴权

在 API 业务逻辑中,系统强制校验 Session ID 与 User ID 的绑定关系。即使用户猜到了其他人的 Session ID,也无法查看或操作对应的会话内容,确保了租户间的绝对隔离。

  • 服务认证:MCP Server自身的访问控制

  • 未来扩展:支持MCP Server市场的统一鉴权链接

  • 服务隔离:不同MCP Server间的权限隔离

第三层:数据权限校验

这是最具创新性的一层。系统将用户身份信息编码为 `x-ai` Header,并通过 MCP 协议透传至最底层的工具服务。这意味着,权限控制下沉到了数据源头——LLM 无法"看到"用户无权访问的主机或日志,从根本上解决了 AI 系统常见的数据泄露风险。

  • Headers透传:将用户信息透传到每个工具

  • 细粒度控制:每个MCP Server的Tool做各自的数据权限验证

  • 动态授权:基于用户角色的动态权限分配

5.2 认证服务实现

class AuthService:    """统一的认证服务"""
    async def get_user_info(self, request: Request) -> Tuple[strstr]:        """从请求中获取用户信息"""
        # 尝试从Cookie获取        cookie_value = request.cookies.get("sessionid"or request.cookies.get("JSESSIONID")
        if cookie_value:            try:                # 解析Cookie获取用户信息                user_info = self._parse_cookie(cookie_value)                user_name = user_info.get("userName""default")                return json.dumps(user_info), user_name            except Exception as e:                logger.warning(f"Cookie解析失败: {e}")
        # 降级到默认用户        default_user = {            "userName""default",            "userId""default",            "timestamp": datetime.now().isoformat()        }        return json.dumps(default_user), "default"
    def get_x_ai_header(self, user_info: str) -> str:        """生成X-AI header"""        # Base64编码用户信息        return base64.b64encode(user_info.encode()).decode()
    async def verify_session_ownership(        self,         session_id: str        user_name: str,        redis_store: GCRedisStore    ) -> bool:        """验证会话所有权"""        session_context = await redis_store.get_context(session_id)
        if not session_context:            return False
        return session_context.user_id == user_name

小结:三层鉴权机制从网关、服务、数据三个维度构建了完整的安全体系,确保了系统的安全性和可靠性。

六、系统可观测性与可进化能力

6.1 Trace数据上报

系统通过Traceloop统一上报采集数据到LLM应用监测模块,实现全链路追踪。

监控维度:

  • LLM调用监控:Token使用量、响应时间、错误率

  • 工具执行追踪:每个工具的执行时间和成功率

  • 会话分析:用户行为模式、热点问题识别

  • 性能指标:P99延迟、QPS、并发数

实现代码:

from traceloop.sdk import Traceloopfrom traceloop.sdk.tracing.tracing import Instruments# 先禁用 MCP 自动初始化Traceloop.init(    app_name="gc-agent",    resource_attributes={        "service.name""gc-agent",        "service.version""v1.0",        "service.xHawkUserSpaceId""xxx"    },    api_endpoint="http://traces.xxxx.lycc.qihoo.net:55683",    block_instruments={Instruments.MCP},    disable_batch=True)# 手动初始化 MCP instrumentormcp_instrumentor = Nonetry:    from opentelemetry.instrumentation.mcp import McpInstrumentor    mcp_instrumentor = McpInstrumentor()        # 验证初始化状态    logger.info(f"MCP instrumentor is instrumented: {mcp_instrumentor.is_instrumented_by_opentelemetry}")except Exception as e:    logger.error(f"Failed to initialize MCP instrumentor: {e}")    mcp_instrumentor = None# 添加src路径sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))

6.2 反馈服务系统

反馈服务主要用于收集用户对AI响应的评价(点赞/点踩),为系统后期优化提供数据支持。

  • 评价收集:支持用户对每个响应进行评价

  • 数据持久化:将反馈数据存储在Redis中

  • 统计分析:支持按会话、时间等维度统计

6.3 实现代码

class GCFeedbackService:    """反馈服务"""
    def __init__(self, redis_url: str = None):        """初始化反馈服务"""        if redis_url:            self.redis = redis.from_url(redis_url)        else:            # 从配置文件读取            config = self._load_config()            redis_config = config.get('redis', {})            self.redis = redis.Redis(                host=redis_config.get('host''localhost'),                port=redis_config.get('port'6379),                db=redis_config.get('feedback_db'1),  # 使用独立的DB                decode_responses=True            )
    async def save_response(        self,        response_id: str,        session_id: str,        user_question: str,        agent_response: str    ) -> Feedback:        """保存响应记录"""        feedback = Feedback(            response_id=response_id,            session_id=session_id,            user_question=user_question,            agent_response=agent_response,            created_at=datetime.now(),            updated_at=datetime.now()        )
        # 保存到Redis        key = f"feedback:{response_id}"        await self.redis.hset(key, mapping=feedback.dict())
        # 添加到会话索引        session_key = f"session_feedbacks:{session_id}"        await self.redis.sadd(session_key, response_id)
        return feedback
    async def update_rating(        self,        response_id: str,        rating: Optional[str],        session_id: Optional[str] = None    ) -> bool:        """更新反馈评分"""        key = f"feedback:{response_id}"
        # 检查反馈是否存在        exists = await self.redis.exists(key)        if not exists:            logger.warning(f"反馈记录不存在: {response_id}")            return False
        # 更新评分        updates = {            "rating": rating or "null",            "updated_at": datetime.now().isoformat()        }
        await self.redis.hset(key, mapping=updates)        logger.info(f"反馈评分已更新: {response_id} -> {rating}")
        return True

小结:反馈服务通过简单有效的设计,为系统的持续优化提供了重要的数据基础。


结语

云舟观测智能体系统通过分层架构设计、标准化工具体系和可扩展的智能编排能力,为企业提供了结构清晰、可审计、易集成的智能化监控观测能力。在当前架构基础上,系统将围绕以下方向持续演进:

技术演进方向:

  • Agent 市场化:支持第三方 Agent 接入,构建内部能力生态。

  • 主动式运维:从被动式操作协助迈向预测式、预防式运维。

  • 多模态支持:实现图表、日志、指标等多类型观测数据的统一分析。

产品化路线:

  • 私有化部署:支持与智汇云体系的集成与打包部署,满足企业级落地需求。

  • 插件化架构:允许用户按需扩展 MCP 工具与流程,实现业务级定制能力。

  • 生态化发展:与更多观测工具与运维平台深度集成,构建全链路协同生态。


更多技术干货,

请关注“360智汇云开发者”👇

360智汇云是以"汇聚数据价值,助力智能未来"为目标的企业应用开放服务平台,融合360丰富的产品、技术力量,为客户提供平台服务。

目前,智汇云提供数据库、中间件、存储、大数据、人工智能、计算、网络、视联物联与通信等多种产品服务以及一站式解决方案。

官网:https://zyun.360.cn(复制在浏览器中打开)

更多好用又便宜的云产品,欢迎试用体验~

添加工作人员企业微信👇,get更快审核通道+试用包哦~


【声明】内容源于网络
0
0
360智汇云开发者
360智汇云是以"汇聚数据价值,助力智能未来"为目标的企业应用开放服务平台,融合360丰富的产品、技术力量,为客户提供平台服务。
内容 585
粉丝 0
360智汇云开发者 360智汇云是以"汇聚数据价值,助力智能未来"为目标的企业应用开放服务平台,融合360丰富的产品、技术力量,为客户提供平台服务。
总阅读236
粉丝0
内容585