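This article shares architecture patterns, design principles, and implementation strategies for A2A (Agent-to-Agent) agent systems that have been repeatedly validated in production. They apply when building a stable, reliable A2A agent service from scratch, and they help developers avoid common pitfalls.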
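Agent Executor Pattern
An A2A server has to handle both synchronous and asynchronous (streaming) operations while keeping protocol handling cleanly separated from business logic. A dedicated agent executor that bridges the A2A protocol and the agent implementation decouples these layers.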
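```python
from __future__ import annotations  # the SDK types below are used only as annotations


class AgentExecutor:
    """Bridge between A2A protocol and agent implementation"""

    def __init__(self, agent_class):
        self.agent = agent_class()
        self.supported_content_types = ['text', 'text/plain']

    # ExecutionContext and EventQueue are assumed to be the A2A server SDK's context and
    # event-queue types; _handle_streaming, _handle_non_streaming and _handle_error are defined elsewhere.
    async def execute(self, context: ExecutionContext, event_queue: EventQueue):
        try:
            user_input = context.get_user_input()
            if context.is_streaming():
                await self._handle_streaming(user_input, context, event_queue)
            else:
                await self._handle_non_streaming(user_input, context, event_queue)
        except Exception as e:
            # Failures are reported through the event queue instead of crashing the server
            await self._handle_error(e, context, event_queue)
```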
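This pattern provides separation of concerns, error isolation, high testability, and implementation flexibility, which improves the overall stability and maintainability of the system.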
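The sketch below makes the error-isolation and testability claims concrete without the A2A SDK: `FakeContext`, `EchoAgent`, and the event dictionaries are hypothetical stand-ins, and a plain `asyncio.Queue` plays the role of the event queue. It illustrates the pattern only and is not the SDK's API.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeContext:
    """Hypothetical stand-in for the SDK's execution context."""
    user_input: str
    streaming: bool = False

    def get_user_input(self) -> str:
        return self.user_input

    def is_streaming(self) -> bool:
        return self.streaming


class EchoAgent:
    """Hypothetical business logic: it knows nothing about A2A or event queues."""

    async def answer(self, text: str) -> str:
        if not text:
            raise ValueError("empty input")
        return text.upper()

    async def stream(self, text: str):
        for word in text.split():
            yield word.upper()


class AgentExecutor:
    def __init__(self, agent_class):
        self.agent = agent_class()

    async def execute(self, context: FakeContext, event_queue: asyncio.Queue) -> None:
        try:
            user_input = context.get_user_input()
            if context.is_streaming():
                async for chunk in self.agent.stream(user_input):
                    await event_queue.put({"type": "chunk", "text": chunk})
                await event_queue.put({"type": "done"})
            else:
                result = await self.agent.answer(user_input)
                await event_queue.put({"type": "result", "text": result})
        except Exception as e:
            # The failure becomes an event for the client; the server keeps running
            await event_queue.put({"type": "error", "message": str(e)})


async def run(context: FakeContext) -> list:
    """Execute one request and drain every event it produced."""
    queue: asyncio.Queue = asyncio.Queue()
    await AgentExecutor(EchoAgent).execute(context, queue)
    return [queue.get_nowait() for _ in range(queue.qsize())]
```

Because `EchoAgent` never touches the queue, the business logic can be unit-tested on its own, while the executor is tested with a fake context as in `run`.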
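Structured Response Pattern
When different agents return different formats, client integration becomes difficult. A Pydantic model that enforces one response structure guarantees consistent, type-safe output.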
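```python
from typing import Any, Dict, Literal, Optional

from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel


class ResponseFormat(BaseModel):
    """Standardized response format for all agents"""
    status: Literal['input_required', 'completed', 'error']
    message: str
    metadata: Optional[Dict[str, Any]] = None


class CurrencyAgent:
    def __init__(self):
        # self.model (the LLM) and self.tools are assumed to be defined on the class
        self.graph = create_react_agent(
            self.model,
            tools=self.tools,
            # Ask the ReAct agent to produce a ResponseFormat instance as structured output
            response_format=ResponseFormat,
        )
```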
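Benefits: consistent responses, runtime type validation (plus type hints that static checkers can use), simpler client-side handling, and a self-documenting API, which together reduce integration complexity.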
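Once every agent returns the same three-state schema, the client needs only one dispatcher. This standard-library sketch mirrors the `status` values of `ResponseFormat`; the function name `handle_agent_response` and the returned action strings are illustrative assumptions.

```python
import json
from typing import Literal, get_args

Status = Literal['input_required', 'completed', 'error']
VALID_STATUSES = set(get_args(Status))


def handle_agent_response(raw: str) -> str:
    """Parse a JSON response shaped like ResponseFormat and decide the client's next step."""
    data = json.loads(raw)
    status = data.get("status")
    # Reject anything that does not match the agreed schema
    if status not in VALID_STATUSES or not isinstance(data.get("message"), str):
        raise ValueError(f"malformed agent response: {raw!r}")
    if status == "input_required":
        return f"ASK_USER: {data['message']}"
    if status == "error":
        return f"SHOW_ERROR: {data['message']}"
    return f"DONE: {data['message']}"
```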
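Multi-Protocol Integration Pattern
Modern agents often need to work with several protocols (for example MCP and direct API calls) while exposing a single calling interface. Protocol adapters arranged as primary and fallback keep the system available when one path fails.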
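```python
import logging
from contextlib import AsyncExitStack

from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

logger = logging.getLogger(__name__)


class MathAgent:
    async def _init_mcp_tools(self):
        # self.math_server_path and self._create_direct_tools() are defined elsewhere.
        # MCP tools stay bound to their session, so the transport and session are kept
        # open on self (close them with `await self._mcp_stack.aclose()` on shutdown).
        self._mcp_stack = AsyncExitStack()
        try:
            server_params = StdioServerParameters(
                command="python",
                args=[str(self.math_server_path)],
            )
            read, write = await self._mcp_stack.enter_async_context(stdio_client(server_params))
            session = await self._mcp_stack.enter_async_context(ClientSession(read, write))
            await session.initialize()
            return await load_mcp_tools(session)
        except Exception as e:
            await self._mcp_stack.aclose()  # release anything opened before the failure
            logger.warning("MCP failed: %s, falling back to direct API", e)
            return self._create_direct_tools()
```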
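Key principles: degrade gracefully, abstract away protocol details, log errors transparently, and monitor which protocol actually serves requests; together these make the system more resilient.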
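The primary/fallback principle can be expressed independently of MCP. In this sketch each protocol is a named async loader tried in order; `load_with_fallback`, the `protocol_usage` counter, and the sample loaders are hypothetical, with the counter standing in for a real monitoring metric.

```python
import asyncio
import logging
from collections import Counter
from typing import Awaitable, Callable, List, Sequence, Tuple

logger = logging.getLogger(__name__)
# Simple in-process metric: how many times each protocol actually served a request
protocol_usage: Counter = Counter()

Loader = Callable[[], Awaitable[List[str]]]


async def load_with_fallback(loaders: Sequence[Tuple[str, Loader]]) -> Tuple[str, List[str]]:
    """Try each (protocol_name, loader) in order and return the first that succeeds."""
    errors = []
    for name, loader in loaders:
        try:
            tools = await loader()
        except Exception as e:
            # Record the failure transparently, then degrade to the next protocol
            logger.warning("protocol %s failed: %s", name, e)
            errors.append(f"{name}: {e}")
            continue
        protocol_usage[name] += 1
        return name, tools
    raise RuntimeError("all protocols failed: " + "; ".join(errors))


if __name__ == "__main__":
    async def broken_mcp() -> List[str]:
        raise ConnectionError("MCP server not reachable")

    async def direct_api() -> List[str]:
        return ["add", "multiply"]

    print(asyncio.run(load_with_fallback([("mcp", broken_mcp), ("direct", direct_api)])))
```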
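Fresh Session Pattern
Long-lived connections tend to accumulate stale state. Creating an independent connection for every request improves reliability and isolation.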
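```python
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


class MathAgent:
    async def process_request(self, request: str) -> str:
        server_params = StdioServerParameters(
            command="python",
            args=[str(self.math_server_path)],
        )
        # A new MCP server process and session per request; both are torn down on exit
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                tools = await load_mcp_tools(session)
                agent = create_react_agent(self.model, tools)
                result = await agent.ainvoke({"messages": request})
                # ainvoke returns the final graph state; the answer is the last message
                return result["messages"][-1].content
```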
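Benefits: no aging connections and isolated failures. Costs: some per-request overhead (here, starting a new server process each time) and higher resource usage. The pattern suits cases where connections to external protocol servers are unreliable.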
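The essence of the fresh-session pattern is that no connection object outlives a request. The sketch below uses a hypothetical `connect()` context manager in place of `stdio_client`/`ClientSession` to show that a failing request still releases its connection and does not affect the requests around it.

```python
import asyncio
from contextlib import asynccontextmanager

stats = {"opened": 0, "closed": 0}


@asynccontextmanager
async def connect(fail: bool = False):
    """Hypothetical per-request connection; always closed, even when it fails."""
    stats["opened"] += 1
    conn_id = stats["opened"]
    try:
        if fail:
            raise ConnectionError("stale or broken connection")
        yield conn_id
    finally:
        stats["closed"] += 1


async def process_request(request: str, fail: bool = False) -> str:
    # A new connection per request: nothing is shared between requests
    async with connect(fail) as conn_id:
        return f"{request} handled on connection {conn_id}"


async def main() -> list:
    # return_exceptions=True collects the failure as a result instead of raising it
    return await asyncio.gather(
        process_request("a"),
        process_request("b", fail=True),
        process_request("c"),
        return_exceptions=True,
    )
```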
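Intelligent Orchestration Pattern
When several specialized agents must cooperate, the system needs intelligent routing and flow control. The approach here is a score-driven orchestrator built on LangGraph.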
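```python
from typing import Any, Dict, List, Optional, Tuple, TypedDict

# AgentCard is the A2A SDK's agent card type; self.agents maps agent IDs to cards and
# _skill_matches_request is a helper defined elsewhere.


class RouterState(TypedDict, total=False):
    """Assumed shape of the LangGraph router state."""
    request: str
    selected_agent: Optional[str]
    confidence: float
    metadata: Dict[str, Any]


class SmartOrchestrator:
    def _calculate_agent_score(self, request: str, agent_card: "AgentCard") -> Tuple[float, List[str]]:
        score = 0.0
        matched_skills = []
        request_lower = request.lower()
        # Keyword matching (weight: 1.0 per tag found in the request)
        keywords = [tag for skill in agent_card.skills for tag in (skill.tags or [])]
        for keyword in keywords:
            if keyword.lower() in request_lower:
                score += 1.0
        # Skill matching (weight: 1.5 per matching skill)
        for skill in agent_card.skills:
            if self._skill_matches_request(skill.name, request):
                score += 1.5
                matched_skills.append(skill.name)
        return score, matched_skills

    async def _analyze_request(self, state: RouterState) -> RouterState:
        request = state["request"]
        best_agent = None
        best_score = 0.0
        agent_scores = {}
        for agent_id, agent_card in self.agents.items():
            score, _matched_skills = self._calculate_agent_score(request, agent_card)
            agent_scores[agent_id] = score
            if score > best_score:
                best_score = score
                best_agent = agent_id
        # Normalize to [0, 1]: a score of 5.0 or more counts as full confidence
        confidence = min(best_score / 5.0, 1.0)
        state.update({
            "selected_agent": best_agent,  # None if no agent scored above 0
            "confidence": confidence,
            "metadata": {"agent_scores": agent_scores},
        })
        return state
```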
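Advantages: extensible scoring rules, transparent decisions (every agent's score is recorded in the state metadata), support for managing complex workflows, and compatibility with discovery based on A2A agent cards.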
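The scoring logic can be exercised without LangGraph or the A2A SDK. In this sketch `Skill` and `Card` are trimmed-down, hypothetical stand-ins for agent-card types, and the skill-name check (underscores read as spaces) is an assumed replacement for the undefined `_skill_matches_request` helper; the weights 1.0 and 1.5 and the divisor 5.0 come from the code above.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Skill:
    name: str
    tags: List[str] = field(default_factory=list)


@dataclass
class Card:
    """Hypothetical, trimmed-down stand-in for an A2A agent card."""
    name: str
    skills: List[Skill]


def score_agent(request: str, card: Card) -> float:
    text = request.lower()
    score = 0.0
    for skill in card.skills:
        score += sum(1.0 for tag in skill.tags if tag.lower() in text)  # keyword hits, 1.0 each
        if skill.name.lower().replace("_", " ") in text:  # naive skill-name match (assumption)
            score += 1.5
    return score


def route(request: str, cards: Dict[str, Card]) -> Tuple[Optional[str], float, Dict[str, float]]:
    """Return (best agent or None, confidence in [0, 1], all scores)."""
    scores = {agent_id: score_agent(request, card) for agent_id, card in cards.items()}
    best = max(scores, key=scores.get, default=None)
    if best is None or scores[best] == 0:
        return None, 0.0, scores
    return best, min(scores[best] / 5.0, 1.0), scores
```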
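Agent Discovery Pattern
Clients need to find out at runtime which agent capabilities are available. Probing each agent's `.well-known/agent.json` endpoint enables automatic discovery (this is the path from earlier A2A specification versions; newer versions serve `/.well-known/agent-card.json`, so check which one your SDK exposes).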
```python
import httpx


async def display_available_agents(httpx_client: httpx.AsyncClient, agent_url: str, card):
    # Only the orchestrator fronts other agents, so discovery runs only in that context
    if "orchestrator" in card.name.lower():
        agent_endpoints = [
            ("ArgoCD Agent", "http://localhost:8001"),
            ("Currency Agent", "http://localhost:8002"),
            ("Math Agent", "http://localhost:8003"),
        ]
        available_agents = []
        for name, endpoint in agent_endpoints:
            try:
                response = await httpx_client.get(
                    f"{endpoint}/.well-known/agent.json", timeout=2.0
                )
                if response.status_code == 200:
                    agent_data = response.json()
                    available_agents.append({
                        "name": agent_data.get("name", name),
                        "description": agent_data.get("description", ""),
                        "url": endpoint,
                        "skills": agent_data.get("skills", []),
                    })
            except (httpx.HTTPError, ValueError):
                # Unreachable agent, timeout, or invalid JSON: skip it instead of failing discovery
                continue
        if available_agents:
            print(f"Found {len(available_agents)} available agents:")
            for i, agent in enumerate(available_agents, 1):
                print(f"{i}. {agent['name']} ({agent['url']})")
                print(f"   Description: {agent['description']}")
                if agent['skills']:
                    # Show at most the first three skills
                    skills = [skill.get('name', 'Unknown') for skill in agent['skills'][:3]]
                    print(f"   Skills: {', '.join(skills)}")
```
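Implementation notes: run health checks, set request timeouts, display capability information, and trigger discovery only when the context calls for it.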
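The discovery loop above probes agents one after another, so a few unreachable agents add up to several timeouts. A hedged alternative is to probe concurrently; in this sketch the `fetch` callable is a hypothetical injection point (in practice it would wrap `httpx.AsyncClient.get` and `response.json()`), which also makes the health-check logic testable offline.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple

# Hypothetical injection point: fetch(url) returns the parsed agent card as a dict
# and raises on network or HTTP errors.
Fetch = Callable[[str], Awaitable[dict]]


async def probe(name: str, endpoint: str, fetch: Fetch, timeout: float) -> Optional[dict]:
    try:
        card = await asyncio.wait_for(fetch(f"{endpoint}/.well-known/agent.json"), timeout)
    except Exception:
        return None  # unreachable, too slow, or malformed: treat the agent as unavailable
    return {
        "name": card.get("name", name),
        "description": card.get("description", ""),
        "url": endpoint,
        "skills": card.get("skills", []),
    }


async def discover(endpoints: List[Tuple[str, str]], fetch: Fetch, timeout: float = 2.0) -> List[dict]:
    # All probes run concurrently, so total latency is roughly one timeout, not one per agent
    results = await asyncio.gather(*(probe(name, url, fetch, timeout) for name, url in endpoints))
    return [agent for agent in results if agent is not None]
```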
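Input Validation Pattern
Malicious or malformed input must not be allowed to create security risks. Apply several layers of validation: length limits, allowed character sets, keyword filtering, and content sanitization.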
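```python
import re

# Whitelist for math expressions: letters, digits, operators, brackets, whitespace, comparisons
ALLOWED_MATH_CHARS = re.compile(r'^[a-zA-Z0-9+\-*/()^.,\s=<>!]+$')
DANGEROUS_PATTERNS = ['import', 'exec', 'eval', '__', 'subprocess']


def validate_mathematical_expression(expression: str) -> bool:
    # Layer 1: length limit
    if len(expression) > 1000:
        return False
    # Layer 2: reject known dangerous keywords (a blocklist, so not exhaustive)
    if any(pattern in expression.lower() for pattern in DANGEROUS_PATTERNS):
        return False
    # Layer 3: character whitelist
    return bool(ALLOWED_MATH_CHARS.match(expression))


def validate_currency_code(currency: str) -> bool:
    # Three uppercase ASCII letters such as "USD"; isascii() rejects non-ASCII letters like "ÄÖÜ"
    return len(currency) == 3 and currency.isascii() and currency.isalpha() and currency.isupper()


def sanitize_user_input(user_input: str) -> str:
    # Strip characters often used for HTML/quote injection, trim whitespace, cap the length
    sanitized = re.sub(r'[<>"\']', '', user_input)
    return sanitized.strip()[:10000]
```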
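Length limits, dangerous-pattern detection, regex whitelisting, and input sanitization together reduce the risk of injection attacks and crashes. Keyword blocklists can be bypassed, so they work best as one layer alongside whitelisting rather than as the only defense.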
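A stricter option for the math agent is to parse the expression and allow only whitelisted syntax nodes. This is a swapped-in technique (AST whitelisting with Python's `ast` module), not the validation used above, and `safe_eval` is a hypothetical name.

```python
import ast
import operator
from typing import Union

Number = Union[int, float]

# Whitelisted operators; every other syntax node (names, calls, attributes, subscripts) is rejected
_BIN_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
    ast.Div: operator.truediv, ast.Mod: operator.mod,
    # Power goes through floats so huge results raise OverflowError instead of exhausting memory
    ast.Pow: lambda a, b: float(a) ** b,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression: str, max_len: int = 1000) -> Number:
    """Evaluate simple arithmetic; raises ValueError for anything outside the whitelist."""
    if len(expression) > max_len:
        raise ValueError("expression too long")
    tree = ast.parse(expression, mode="eval")  # raises SyntaxError for malformed input

    def ev(node: ast.AST) -> Number:
        if isinstance(node, ast.Expression):
            return ev(node.body)
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](ev(node.left), ev(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](ev(node.operand))
        raise ValueError(f"disallowed syntax: {type(node).__name__}")

    return ev(tree)
```

Because nothing is ever passed to `eval`, names, attribute access, and function calls are rejected by structure rather than by spelling.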
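Resource Management Pattern
Agents must not be allowed to consume excessive CPU or memory. An async context manager can apply resource limits around request processing.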
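```python
import resource
from contextlib import asynccontextmanager


def _soft_limit(wanted: int, hard: int) -> int:
    # A soft limit may never exceed the hard limit
    return wanted if hard == resource.RLIM_INFINITY else min(wanted, hard)


@asynccontextmanager
async def resource_limits(cpu_seconds=30, memory_mb=512):
    # Unix only. rlimits apply to the WHOLE process, not to one coroutine: concurrent requests
    # share them, and exceeding the CPU soft limit sends SIGXCPU, which by default terminates
    # the process. RLIMIT_AS caps the process's total virtual memory, not just this request's.
    old_cpu = resource.getrlimit(resource.RLIMIT_CPU)
    old_as = resource.getrlimit(resource.RLIMIT_AS)
    # RLIMIT_CPU counts CPU time the process has already used, so budget relative to it
    usage = resource.getrusage(resource.RUSAGE_SELF)
    cpu_budget = int(usage.ru_utime + usage.ru_stime) + cpu_seconds
    try:
        # Lower only the soft limits: an unprivileged process cannot raise a hard limit again
        resource.setrlimit(resource.RLIMIT_CPU, (_soft_limit(cpu_budget, old_cpu[1]), old_cpu[1]))
        memory_bytes = memory_mb * 1024 * 1024
        resource.setrlimit(resource.RLIMIT_AS, (_soft_limit(memory_bytes, old_as[1]), old_as[1]))
        yield
    finally:
        resource.setrlimit(resource.RLIMIT_CPU, old_cpu)
        resource.setrlimit(resource.RLIMIT_AS, old_as)


class MathAgent:
    async def process_request(self, request: str) -> str:
        # _unsafe_process_request is defined elsewhere
        async with resource_limits(cpu_seconds=30, memory_mb=512):
            return await self._unsafe_process_request(request)
```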
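Capping CPU time and memory keeps a runaway request from exhausting the host. Because these limits apply per process, isolating individual agents from one another under high concurrency requires running each agent (or each risky request) in its own process.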
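Since rlimits are per process, real per-request isolation means running the risky work in a child process. This sketch, assuming a Unix host and that the untrusted work can be expressed as a Python snippet, applies CPU and memory limits inside the child and enforces a wall-clock timeout from the parent; `run_isolated` and `_PRELUDE` are hypothetical.

```python
import asyncio
import sys

# Executed inside the child before the untrusted code: lowers the child's own soft limits
_PRELUDE = """
import resource
def _cap(kind, wanted):
    soft, hard = resource.getrlimit(kind)
    resource.setrlimit(kind, (wanted if hard == resource.RLIM_INFINITY else min(wanted, hard), hard))
_cap(resource.RLIMIT_CPU, {cpu})
_cap(resource.RLIMIT_AS, {mem})
"""


async def run_isolated(code: str, timeout: float = 5.0, cpu_seconds: int = 5, memory_mb: int = 512) -> str:
    """Run Python code in a child process with its own rlimits and a wall-clock timeout."""
    script = _PRELUDE.format(cpu=cpu_seconds, mem=memory_mb * 1024 * 1024) + code
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", script,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout)
    except asyncio.TimeoutError:
        proc.kill()  # only the child dies; the server process is unaffected
        await proc.wait()
        raise
    if proc.returncode != 0:
        raise RuntimeError(f"child failed ({proc.returncode}): {stderr.decode().strip()}")
    return stdout.decode().strip()
```

If the child exceeds its limits it is stopped by the kernel or by `proc.kill()`, while the server process keeps its own limits untouched.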
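Error Recovery Pattern
A failing external service should not make the agent unavailable. A circuit breaker trips automatically on repeated failures and recovers automatically afterwards.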
```python
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            # time.monotonic() is immune to wall-clock adjustments
            if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN  # let a trial call through
            else:
                raise CircuitOpenError("Circuit breaker is OPEN")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # In HALF_OPEN the count is still at the threshold, so one more failure re-opens the circuit
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


class CurrencyAgent:
    def __init__(self):
        self.api_circuit_breaker = CircuitBreaker()

    async def get_exchange_rate(self, from_curr, to_curr):
        # _call_frankfurter_api and _get_cached_rate are helpers defined elsewhere
        try:
            return await self.api_circuit_breaker.call(self._call_frankfurter_api, from_curr, to_curr)
        except Exception:
            # Covers both API errors and CircuitOpenError: serve the cached rate instead
            return self._get_cached_rate(from_curr, to_curr)
```
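After a run of consecutive failures the breaker opens and rejects further calls immediately; once the recovery timeout expires it lets a trial call through. Combined with fallback logic, this keeps the service available.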
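Injecting the clock makes the CLOSED → OPEN → HALF_OPEN → CLOSED cycle testable without sleeping. The compact `Breaker` below is a hypothetical, stripped-down variant of the class above with string states; in production the clock would be `time.monotonic`, which is the default here.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable


class Breaker:
    """Hypothetical compact circuit breaker with an injectable clock."""

    def __init__(self, threshold: int, recovery_timeout: float,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = 0.0
        self.state = "closed"

    async def call(self, func: Callable[[], Awaitable[Any]]) -> Any:
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open")  # fail fast: the API is not called
            self.state = "half_open"  # timeout elapsed: allow a trial call
        try:
            result = await func()
        except Exception:
            self.failures += 1
            # A failed trial call, or too many consecutive failures, (re)opens the circuit
            if self.state == "half_open" or self.failures >= self.threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        self.failures = 0
        self.state = "closed"
        return result


if __name__ == "__main__":
    async def down() -> str:
        raise ConnectionError("api down")

    breaker = Breaker(threshold=1, recovery_timeout=30)
    for _ in range(2):
        try:
            asyncio.run(breaker.call(down))
        except Exception as e:
            print(type(e).__name__, breaker.state)  # "ConnectionError open", then "RuntimeError open"
```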
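Connection Pool Pattern
Opening a new HTTP connection for every request hurts performance. A connection pool reuses connections and reduces latency.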
```python
from typing import Optional

import httpx


class HTTPClientManager:
    """Singleton HTTP client manager with connection pooling"""

    _instance: Optional['HTTPClientManager'] = None
    _client: Optional[httpx.AsyncClient] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def get_client(self) -> httpx.AsyncClient:
        # Created lazily once and shared; the pool keeps up to 20 idle keep-alive connections
        if self._client is None:
            self._client = httpx.AsyncClient(
                timeout=30.0,
                limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
                headers={"User-Agent": "A2A-Agent/1.0", "Accept": "application/json"},
            )
        return self._client

    async def close(self):
        # Call once on application shutdown to release pooled connections
        if self._client:
            await self._client.aclose()
            self._client = None


class CurrencyAgent:
    async def get_exchange_rate(self, currency_from: str, currency_to: str):
        client = await HTTPClientManager().get_client()
        response = await client.get(
            'https://api.frankfurter.app/latest',
            params={'from': currency_from, 'to': currency_to},
        )
        response.raise_for_status()  # surface HTTP errors instead of parsing an error body
        return response.json()
```
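A singleton manager with a tuned connection pool lowers request latency and improves performance and resource utilization under high concurrency.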
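The singleton above ties the client's lifetime to the class. A hedged alternative is an explicitly owned holder that creates the pooled client lazily and closes it during application shutdown; `SharedClient` and its `factory` argument are hypothetical, and with httpx the factory would be something like `lambda: httpx.AsyncClient(limits=...)`.

```python
import asyncio
from typing import Any, Callable, Optional


class SharedClient:
    """Lazily creates one pooled client and closes it on shutdown.

    `factory` is any zero-argument callable returning an object with an async
    `aclose()` method (assumption: e.g. an httpx.AsyncClient constructor).
    """

    def __init__(self, factory: Callable[[], Any]):
        self._factory = factory
        self._client: Optional[Any] = None
        self._lock = asyncio.Lock()

    async def get(self) -> Any:
        if self._client is None:
            async with self._lock:
                if self._client is None:  # re-check after acquiring the lock
                    self._client = self._factory()
        return self._client

    async def aclose(self) -> None:
        async with self._lock:
            # Detach first so no caller receives a client that is being closed
            client, self._client = self._client, None
        if client is not None:
            await client.aclose()
```

Owning the holder explicitly (for example, creating it in the application's startup hook and calling `aclose()` in the shutdown hook) makes the pool's lifetime visible instead of hiding it in `__new__`.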
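Caching Pattern
Avoid calling external APIs repeatedly for the same data. An in-memory cache with a TTL (time to live) balances performance against data freshness.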
```python
import asyncio
import time
from typing import Any, Dict, Optional, Tuple


class TTLCache:
    """Time-based cache with TTL support"""

    def __init__(self, ttl_seconds: int = 300):
        self.ttl_seconds = ttl_seconds
        self.cache: Dict[str, Tuple[Any, float]] = {}
        self.lock = asyncio.Lock()

    async def get(self, key: str) -> Optional[Any]:
        async with self.lock:
            if key in self.cache:
                value, timestamp = self.cache[key]
                if time.monotonic() - timestamp < self.ttl_seconds:
                    return value
                del self.cache[key]  # expired: evict lazily on read
            return None

    async def set(self, key: str, value: Any):
        async with self.lock:
            self.cache[key] = (value, time.monotonic())


class CurrencyAgent:
    def __init__(self):
        self.rate_cache = TTLCache(ttl_seconds=300)

    async def get_exchange_rate(self, currency_from: str, currency_to: str):
        cache_key = f"{currency_from}_{currency_to}"
        # Look up once and compare with None, so a falsy cached value still counts as a hit
        cached = await self.rate_cache.get(cache_key)
        if cached is not None:
            return cached
        result = await self._fetch_from_api(currency_from, currency_to)  # defined elsewhere
        await self.rate_cache.set(cache_key, result)
        return result
```
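A sensible TTL and lock-protected access (an `asyncio.Lock`, which serializes coroutines within one event loop but is not a thread lock) cut redundant calls, speed up responses, and keep the system running efficiently.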
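Building a production-grade A2A server means weighing architecture, performance, security, and operability together. The core principles are: separate protocol handling from business logic; degrade gracefully when external dependencies fail; enforce resource limits and monitoring; use Pydantic for type safety; use asynchronous processing for concurrency; optimize performance with connection pooling and caching; and secure inputs with layered validation. Future directions include multi-protocol support, dynamic capability discovery, federated agent networks, AI-driven routing, and event-driven architectures. These patterns have been validated in real-world scenarios; choose the ones that fit your requirements and evolve them incrementally to build an efficient, stable A2A agent system.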

