
Architecting Agentic MLOps: A Layered Protocol Strategy with A2A and MCP

Authors: Shashank Kapoor, Sanjay Surendranath Girija, Lakshit Arora
  • 2026-02-24
    Beijing

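Introduction

As the software industry enters the agentic era, developers and architects face a familiar challenge. Just as the rise of microservices required standardized communication patterns such as REST and gRPC, the proliferation of specialized AI agents requires a robust framework that lets them discover, communicate, and collaborate effectively.

This article proposes an architectural pattern that combines two emerging standards: the Agent-to-Agent (A2A) protocol and the Model Context Protocol (MCP). By layering these protocols, we can build robust, scalable, extensible, and interoperable multi-agent systems in which new capabilities can be added without changing the agents' core communication logic.

We first introduce the core concepts of each protocol, then apply the layered protocol strategy to an MLOps use case whose goal is to deploy a model once validation succeeds, and finally walk through the corresponding code to bring the pattern to life. The code demonstrates an architectural pattern for decoupling orchestration logic from execution logic, a principle that underpins extensibility.

In the agent-driven paradigm, the goal is to replace rigid pipelines with dynamic teams of specialized AI agents. In our MLOps use case, for example, an orchestrator agent responsible for deploying a model may need to collaborate with a validation agent and a deployment agent. This scenario raises two fundamental challenges: how do these agents discover and communicate with one another, and how do they access the specific tools and data their tasks require? The architecture proposed here addresses both by assigning each protocol a distinct role.

A2A provides the communication bus, allowing the orchestrator to find the appropriate specialist and delegate tasks to it without hardcoded connections. MCP acts as a universal capability language, ensuring that an agent, once delegated to, can discover and use the tools it needs regardless of their underlying implementation.

Figure 1: A2A and MCP stack for our MLOps use case

The MLOps use case was chosen deliberately as a conceptual bridge, illustrating the evolution from today's static pipelines to tomorrow's dynamic, agent-driven operations. Existing orchestrators are powerful, but their rigidity can become a bottleneck: when business logic changes, pipelines typically have to be rewritten and redeployed. A layered agentic architecture, by contrast, is built for that kind of change. The orchestrator we present, which coordinates validation and deployment agents, highlights this key advantage: adapting to new requirements by composing capabilities rather than rewriting large amounts of code. As AI agents mature, this shift from static execution to dynamic coordination is the core principle we want to demonstrate.

The principles shown here are not limited to MLOps; they apply to other domains where agents must collaborate dynamically and access capabilities on demand.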

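Layered Protocols in the Workflow

A2A: The Agent-to-Agent Communication Bus

A2A is designed to let AI agents communicate securely across different systems, regardless of vendor. It addresses the need for interoperability in multi-agent environments. By allowing agents from different vendors to interoperate, A2A helps unlock modular workflows, reduce vendor lock-in, and improve scalability. Think of it as a common language for your agents.

Key mechanisms

  • Key element of interoperability: In the A2A world, every agent is assigned an "Agent Card" that describes its capabilities, supported protocols, and accepted request types, so that other agents can discover and interact with it without exposing sensitive details. Think of it as the agent's profile. As your agent evolves, the card evolves with it, letting the outside world recognize the upgrade.

  • Communication: In A2A, messages are exchanged over standard web technologies, using formats such as JSON and JSON-RPC. This simplifies integration with existing web infrastructure, since the arrival of agents should not disrupt established communication technologies.

  • Security and governance: A2A has been brought under the Linux Foundation to promote neutral, collaborative governance and long-term sustainability.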
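
As a rough illustration of the first two mechanisms, the sketch below builds a minimal Agent Card as a plain dictionary and wraps a text message in a JSON-RPC 2.0 request. The card fields mirror the agent cards shown later in this article. The `message/send` method name and the message fields reflect our reading of the A2A specification and should be checked against the version you target; the helper names `build_agent_card` and `build_send_request` are hypothetical.

```python
import json
from uuid import uuid4


def build_agent_card(name: str, url: str, skill_id: str, description: str) -> dict:
    """Return a minimal agent card; real cards can carry more fields."""
    return {
        "name": name,
        "description": description,
        "url": url,
        "version": "1.0.0",
        "skills": [{"id": skill_id, "name": skill_id, "description": description}],
    }


def build_send_request(text: str) -> dict:
    """Wrap a text message in a JSON-RPC 2.0 envelope (assumed A2A shape)."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid4()),
        "method": "message/send",
        "params": {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": text}],
                "messageId": uuid4().hex,
            }
        },
    }


card = build_agent_card(
    "Validation Agent",
    "http://localhost:8004/",
    "validate_the_model",
    "Helps in validating MLOps models",
)
request = build_send_request("Validate the latest churn model.")

# Both structures are plain JSON, so they can travel over ordinary HTTP.
wire_payload = json.dumps(request)
print(json.dumps(card, indent=2))
```

Because the card and the request are both ordinary JSON documents, any HTTP client or server that already handles JSON can take part in the exchange.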

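Why A2A matters:

  • It turns isolated "single-shot LLM tools" into multi-agent systems that can cooperate, negotiate, and specialize.

  • It lets one agent in a workflow be invoked as a peer by another agent, rather than merely as an API client.

  • It supports scaling intelligence horizontally: instead of building one monolithic agent, you orchestrate an ecosystem of small, specialized ones.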

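MCP: The Domain-Specific Language

MCP is a protocol designed to standardize how AI systems connect to tools, services, and data sources. Often described as the "USB-C" of AI integration, MCP provides a universal interface that lets AI applications plug into external data sources and tools without custom glue code.

Key mechanisms

  • Key element of interoperability: An MCP server exposes three main types of entities. Tools provide operations an agent can invoke, such as executing code or calling an API. Resources contain structured data an agent can query or load. Prompts provide predefined templates that guide agent behavior. These primitives are defined by the standard, so any MCP-compatible client can discover and use them without custom integration.

  • Communication: Like A2A, MCP tries to reuse existing communication technologies such as HTTP and SSE. It also uses a simple client-server architecture.

  • Security and governance: MCP enables powerful integrations but also introduces risks such as prompt injection, tool poisoning, and unauthorized data access. Although relying on MCP alone may not be ideal, it can be bundled with other tools (such as MCPWatch) to strengthen system protection.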

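Why MCP matters:

MCP lets agents go beyond a fixed set of skills, allowing them to discover and use any tool or resource available on the network. This supports adding new capabilities without rebuilding the agent.

MCP allows seamless tool integration, letting agents treat tools as discoverable services. This eliminates custom integration logic and simplifies adding new capabilities, APIs, or datasets.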

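The MLOps Workflow

To demonstrate our layered architecture, we use a very common MLOps workflow: automating the validation and deployment of a machine learning model. The system consists of three specialized agents that collaborate to reach the goal:

  • Orchestrator agent: Acts as the coordinator. It translates a high-level goal (for example, "validate and deploy the latest model") into a series of tasks. Using the A2A protocol, it discovers the appropriate specialist agent for each task, passes along the required context, and makes decisions based on the results.

  • Validation agent: A specialist agent focused on model validation. Through its A2A Agent Card it exposes capabilities such as performance testing or bias analysis. To carry out a request, it discovers and uses the underlying MCP tools that implement those checks. This lets the orchestrator request validation without knowing the implementation details.

  • Deployment agent: A specialist agent responsible for deploying validated models. Like the validation agent, it advertises its capabilities through its A2A card and discovers the MCP tools needed to perform the deployment.

Sequence Diagram of the Workflow

Figure 2: Sequence diagram of the MLOps workflow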

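Execution and Flow

From Query to Orchestration

The flow begins when an MLOps engineer submits a high-level query. The OrchestratorAgent receives this query in its stream method. It immediately calls its internal _create_plan_from_query method, which uses LLM-driven reasoning to break the complex request into a task list of two distinct high-level subgoals: one for validation and one for deployment.

From Orchestration to Specialization

The orchestrator's stream method begins executing the plan. For the first task, it uses A2A to discover and invoke the ValidationAgent, passing it the specific validation instructions. The ValidationAgent receives this sub-query in its own stream method and then calls its _create_tool_use_plan method. There is an important distinction here: its plan is not about delegation but about tool use. It discovers the fetch_model and validate_churn_model tools through MCP and formulates a sequence of tool calls to satisfy the request. How it does so should be encoded in the prompt_personality string defined at initialization.

From Tools to Results

The ValidationAgent executes its tool-use plan, calls the MCP server to do the work, and streams the results back to the orchestrator. If validation succeeds, the orchestrator proceeds to the second task and invokes the DeploymentAgent. The DeploymentAgent follows the same pattern: it creates a tool-use plan (first fetch the current state, then deploy) and executes it. The final result is then streamed back to the user.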
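
The three stages can be sketched with plain asyncio generators. This is a toy stand-in under stated assumptions: the agents are local functions rather than A2A services, the MCP tool calls are replaced by hardcoded results, and the event dictionaries and function names are hypothetical.

```python
import asyncio
from collections.abc import AsyncIterator


async def validation_agent(query: str) -> AsyncIterator[dict]:
    """Stand-in for ValidationAgent.stream: fetch, then validate."""
    yield {"agent": "validation", "step": "fetch_model"}
    # A real agent would call the MCP tool; this outcome is hardcoded.
    yield {"agent": "validation", "step": "validate_churn_model", "passed": True}


async def deployment_agent(query: str) -> AsyncIterator[dict]:
    """Stand-in for DeploymentAgent.stream: read current state, then deploy."""
    yield {"agent": "deployment", "step": "fetch_model"}
    yield {"agent": "deployment", "step": "deploy_churn_model", "region": "us-west-2"}


async def orchestrator(query: str) -> AsyncIterator[dict]:
    """Run validation first and continue to deployment only if it passed."""
    passed = False
    async for event in validation_agent(query):
        passed = event.get("passed", passed)
        yield event
    if not passed:
        yield {"agent": "orchestrator", "step": "stopped"}
        return
    async for event in deployment_agent(query):
        yield event


async def run(query: str) -> list[dict]:
    """Collect the full event stream for inspection."""
    return [event async for event in orchestrator(query)]


events = asyncio.run(run("Validate and deploy the latest churn model."))
for event in events:
    print(event)
```

The orchestrator never inspects how validation is done; it only reacts to the streamed outcome, which is the separation the real A2A and MCP layers are meant to provide.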

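Code Pattern Overview

We now turn our architectural theory into practice. In this code walkthrough, we focus on one example query from an MLOps engineer:

"Retrieve the latest churn prediction model and run it through the validation module. If the model's absolute bias is less than or equal to 0.04, approve it for deployment. Deploy the new model to the alternate region: if the current production model is running in us-west-1, deploy this version to us-west-2; otherwise, deploy it to us-west-1."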
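
Stripped of the agent machinery, the query encodes two small decisions: a bias gate and a region switch. A minimal sketch, assuming the bias is available as a single number; the function and constant names are hypothetical and are not part of the article's code.

```python
BIAS_THRESHOLD = 0.04
REGIONS = ("us-west-1", "us-west-2")


def approve_for_deployment(bias: float, threshold: float = BIAS_THRESHOLD) -> bool:
    """Approve when the absolute bias is less than or equal to the threshold."""
    return abs(bias) <= threshold


def alternate_region(current_region: str) -> str:
    """Pick us-west-2 if production runs in us-west-1, otherwise us-west-1."""
    return REGIONS[1] if current_region == REGIONS[0] else REGIONS[0]


print(approve_for_deployment(-0.03))  # True
print(approve_for_deployment(0.05))   # False
print(alternate_region("us-west-1"))  # us-west-2
print(alternate_region("eu-west-1"))  # us-west-1
```

In the agentic design these rules are not hardcoded: each specialist's LLM derives them from the natural-language query at planning time and expresses them as tool-call parameters.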

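To build a system that can execute such a command, we first establish its foundational components. We start by setting up the MCP server, which acts as the bridge between the agents and the underlying tools they need to perform their tasks. We then introduce the A2A building blocks and the glue that connects the two protocols.

A note on the implementation: many functions in the code are intentionally left as placeholders, because their internal logic is implementation-specific (for example, the choice of validation library, cloud vendor, or deployment tool). The focus of this article is the architectural pattern that shows how these components interact.

MCP Server

The MCP server acts as the central hub for all capabilities in our system. It gives the specialist agents a standardized, discoverable interface to the tools and resources they will use, and it decouples the agents from the underlying application logic.

For our MLOps workflow, the MCP server exposes the following key endpoints:

Tools (actions an agent can invoke)

  • fetch_model: Retrieves the metadata of the latest trained model from the model registry.

  • validate_churn_model: Runs validation logic against the model according to the supplied requirements.

  • deploy_churn_model: Triggers deployment of the validated model to a specific environment.

Resources (structured data an agent can query)

  • list_agent_cards: Provides a list of all agents available in the system.

  • retrieve_agent_skills: Returns the detailed capabilities of a specific agent.

The Python code below shows how to define this server and its endpoints with the FastMCP library. Note that the implementation logic inside each function is intentionally omitted, because it varies with the surrounding data tooling. The focus here is the architectural pattern: how capabilities are defined, named, and exposed through a standardized protocol so that any authorized agent can use them.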

# mcp_server.py
from mcp.server.fastmcp import FastMCP


def serve(host, port, transport):
    """Initializes and runs the MCP server.

    Args:
        host: The hostname or IP address to bind the server to.
        port: The port number to bind the server to.
        transport: The transport mechanism for the MCP server (e.g., 'stdio', 'sse').
    """
    mcp = FastMCP("validation-deployment-mcp-server", host=host, port=port)

    @mcp.tool(
        name="fetch_model",
        description="MCP Tool that fetches the latest trained user churn model.",
    )
    def fetch_model(model_version_metadata: dict) -> dict:
        """MCP Tool that fetches the latest trained user churn model metadata.

        Args:
            model_version_metadata: Which model data is required.

        Returns:
            JSON object with metadata describing where the new model is stored,
            plus other metadata needed for validation (e.g., the test dataset).
        """
        pass  # Placeholder: depends on the model registry in use.

    @mcp.tool(
        name="validate_churn_model",
        description="MCP Tool that validates the churn model.",
    )
    def validate_churn_model(validation_config: dict) -> dict:
        """MCP Tool that validates the churn model based on validation_config.

        Args:
            validation_config: config containing validation requirements.

        Returns:
            JSON object returning the validation status.
        """
        pass  # Placeholder: depends on the validation library in use.

    @mcp.tool(
        name="deploy_churn_model",
        description="MCP Tool that deploys the churn model.",
    )
    def deploy_churn_model(deployment_config: dict) -> dict:
        """MCP Tool that deploys the churn model based on deployment_config.

        Args:
            deployment_config: config containing deployment requirements.

        Returns:
            JSON object returning the deployment status.
        """
        pass  # Placeholder: depends on the cloud vendor and deployment tool.

    @mcp.resource("resource://list_agent_cards/list", mime_type="application/json")
    def list_agent_cards() -> dict:
        """Retrieves all loaded agent cards as a JSON dictionary.

        This function serves as the handler for the MCP resource identified by
        the URI 'resource://list_agent_cards/list'.

        Returns:
            A JSON object containing a list of all available agents.
        """
        pass  # Placeholder: depends on where agent cards are stored.

    @mcp.resource(
        "resource://retrieve_agent_skills/{agent_name}", mime_type="application/json"
    )
    def retrieve_agent_skills(agent_name: str) -> dict:
        """Retrieves an agent card as JSON data.

        Returns:
            A JSON object of the Agent Card.
        """
        pass  # Placeholder: depends on where agent cards are stored.

    mcp.run(transport=transport)


def main(host, port, transport) -> None:
    serve(host, port, transport)

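MCP Client

For an agent to discover and use the capabilities exposed by the MCP server, it needs a client. This client module acts as a high-level API that abstracts away the raw details of the MCP protocol. Instead of forcing every agent to construct resource URIs and manage connection state, it offers a clean, reusable interface with methods such as list_agents() and list_tools().

The code below outlines a simple MCPClient class built around mcp.ClientSession. It uses an async context manager to handle the lifecycle of the connection to the server. Note that the connection details are simplified to highlight the API design rather than a complete implementation of any particular transport.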

# mcp_client.py
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from mcp import ClientSession
from mcp.types import ListResourcesResult, ListToolsResult, ReadResourceResult


class MCPClient:
    """A high-level client for interacting with the MLOps MCP server."""

    def __init__(self, host: str, port: int, transport: str):
        """
        Initializes the client with the server's connection details.

        Args:
            host: The hostname or IP of the MCP server.
            port: The port of the MCP server.
            transport: The transport mechanism (e.g., 'http', 'sse').
        """
        self._host = host
        self._port = port
        self._transport = transport

    @asynccontextmanager
    async def _get_session(self) -> AsyncGenerator[ClientSession, None]:
        """
        Provides a managed session to connect with the MCP server.
        The actual implementation of this would depend on the chosen transport.
        """
        # In a real implementation, you would initialize the session here
        # based on self._host, self._port, etc.
        session: ClientSession | None = None  # Placeholder for the actual session object
        try:
            # For example: connected through HTTP
            yield session
        finally:
            # For example: await session.close()
            pass

    async def list_agents(self) -> ReadResourceResult:
        """Retrieves the list of all available agent cards from the MCP server."""
        async with self._get_session() as session:
            return await session.read_resource("resource://list_agent_cards/list")

    async def get_agent_skills(self, agent_name: str) -> ReadResourceResult:
        """Retrieves the skills for a specific agent from the MCP server."""
        async with self._get_session() as session:
            uri = f"resource://retrieve_agent_skills/{agent_name}"
            return await session.read_resource(uri)

    async def list_resources(self) -> ListResourcesResult:
        """Lists all available resources on the MCP server."""
        async with self._get_session() as session:
            return await session.list_resources()

    async def list_tools(self) -> ListToolsResult:
        """Lists all available tools on the MCP server."""
        async with self._get_session() as session:
            return await session.list_tools()
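
The connection-lifecycle pattern can be exercised without a running server by substituting a fake session. The sketch below is only an illustration of how an async context manager guarantees cleanup: `FakeSession` and `get_session` are hypothetical stand-ins and are not part of the MCP SDK.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for a client session; tracks open/closed state."""

    def __init__(self) -> None:
        self.open = True

    async def list_tools(self) -> list[str]:
        return ["fetch_model", "validate_churn_model", "deploy_churn_model"]

    async def close(self) -> None:
        self.open = False


@asynccontextmanager
async def get_session():
    """Open a session, hand it to the caller, and always close it afterwards."""
    session = FakeSession()
    try:
        yield session
    finally:
        # Runs even if the caller's block raises an exception.
        await session.close()


async def demo():
    async with get_session() as session:
        tools = await session.list_tools()
    # The session has been closed by the time we get here.
    return tools, session.open


tools, still_open = asyncio.run(demo())
print(tools, still_open)
```

Each public method of a client built this way opens and closes its own session, which keeps agents free of connection bookkeeping at the cost of one connection per call.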

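Execution Helpers for Agents

To execute a multi-step plan, an agent needs a structured way to manage its tasks. The helper classes below provide a reusable pattern for this. The core idea is to represent a complex goal as a task list (TaskList), which is essentially a plan: a sequence of Task objects. Each task represents a single concrete step in the workflow, such as finding a suitable specialist agent or invoking a specific tool.

This approach decouples the agent's high-level reasoning from the low-level execution mechanics.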

# agent_helpers.py
import json
from collections.abc import AsyncIterable
from typing import Any
from uuid import uuid4

import httpx
from a2a.client import A2AClient
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import (
    AgentCard,
    MessageSendParams,
    SendStreamingMessageRequest,
    SendStreamingMessageSuccessResponse,
    TaskArtifactUpdateEvent,
)

from mcp_client import MCPClient


def select_agent(agents_result, query: str) -> str:
    """Placeholder: pick the agent card that best matches the query.

    Assumed to return the chosen agent card as a JSON string. The selection
    logic (LLM call, embedding search, ...) is implementation-specific.
    """
    raise NotImplementedError


class Task:
    """Represents a single task that needs to be executed in the task list."""

    def __init__(self, task_query: str, mcp_client: MCPClient):
        self.task_query = task_query
        self.mcp_client = mcp_client
        self.results = None

    async def find_agent_for_task(self, query: str) -> AgentCard:
        """Fetch an agent card suitable for the task from MCP."""
        result = await self.mcp_client.list_agents()
        agent_card_json = json.loads(select_agent(result, query))
        return AgentCard(**agent_card_json)

    async def execute_task(self) -> AsyncIterable[dict[str, Any]]:
        """Execute the task via A2A streaming messages using the assigned agent."""
        agent_card = await self.find_agent_for_task(query=self.task_query)
        async with httpx.AsyncClient() as httpx_client:
            client = A2AClient(httpx_client, agent_card)  # A2A client that queries the agent
            payload: dict[str, Any] = {
                "message": {
                    "role": "user",
                    "parts": [{"kind": "text", "text": self.task_query}],
                    "messageId": uuid4().hex,
                    # Can have other elements too based on Agent Card inputs.
                },
            }
            request = SendStreamingMessageRequest(
                id=str(uuid4()), params=MessageSendParams(**payload)
            )
            response_stream = client.send_message_streaming(request)
            async for chunk in response_stream:
                # Save the artifact as the result of this task
                if isinstance(
                    chunk.root, SendStreamingMessageSuccessResponse
                ) and isinstance(chunk.root.result, TaskArtifactUpdateEvent):
                    self.results = chunk.root.result.artifact
                yield chunk


class TaskList:
    """Represents a topologically ordered list of tasks that need to be executed."""

    task_list: list[Task]  # Tasks in the order they should be executed.

    def __init__(self, *args, **kwargs) -> None:
        """
        Breaks the query into a task list and the order in which it should be
        executed. The AI agent should break the query down and put the tasks
        in the task_list array.
        """
        pass

    async def execute_task_list(self) -> AsyncIterable[dict[str, Any]]:
        """Executes the tasks for the agent, streaming each task's chunks."""
        # ...
        for task in self.task_list:
            # ...
            async for chunk in task.execute_task():
                yield chunk


class GenericAgentExecutor(AgentExecutor):
    """AgentExecutor used by the agents."""

    def __init__(self, agent):
        self.agent = agent

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        pass  # Placeholder: forward the request to self.agent.stream(...).

    async def cancel(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        pass  # Placeholder: AgentExecutor also declares cancel as abstract.

Orchestrator Agent

Card

{
    "name": "Orchestrator Agent",
    "description": "Helps in invoking the MLOps workflow, which performs validation and deployment.",
    "url": "http://localhost:8003/",
    "version": "1.0.0",
    "skills": [
        {
            "id": "orchestrate_the_flow",
            "name": "orchestrate_the_flow",
            "description": "Helps in orchestrating MLOps Workflow",
            "tags": [
                "Validate the model and then deploy it."
            ],
            "examples": [
                "Retrieve the latest churn prediction model and run it through the validation module. If the model’s absolute bias is less than or equal to 0.04, approve it for deployment. Deploy the new model to the alternate region: if the current production model is running in us-west-1, deploy this version to us-west-2; otherwise, deploy it to us-west-1."
            ]
        }
    ]
}

Boilerplate code

# orchestrator_agent.py
from typing import Any, AsyncIterable

from agent_helpers import TaskList
from mcp_client import MCPClient


class OrchestratorAgent:
    """
    Orchestrates a multi-step workflow by breaking a high-level goal
    into a sequence of tasks for specialist agents.
    """

    def __init__(self, mcp_client: MCPClient, prompt_personality: str):
        """
        Initializes the Orchestrator Agent.

        Args:
            mcp_client: A client for interacting with the MCP server.
            prompt_personality: Instructions guiding the agent's planning process.
        """
        self._mcp_client = mcp_client
        self._prompt_personality = prompt_personality

    async def _create_plan_from_query(self, query: str) -> TaskList:
        """
        Translates a natural language query into a structured TaskList for delegation.
        """
        # This method simulates the agent's high-level reasoning process.
        # The agent's LLM, guided by its personality prompt, would parse the
        # user's query to identify distinct, sequential steps.
        # For our example query, it would identify two main sub-goals:
        # 1. A validation step with a specific condition.
        # 2. A deployment step that depends on the outcome of the first.
        # The agent then creates a TaskList where each Task encapsulates the
        # natural language instruction for that sub-goal. This is different
        # from a specialist agent, whose plan would involve specific tool calls.
        #
        # Task 1 Query: "Retrieve the latest churn prediction model... approve it for deployment."
        # Task 2 Query: "Deploy the new model to the alternate region..."
        #
        # The output of this method would be a TaskList object containing
        # these two Task objects, ready for execution.
        pass

    async def stream(self, query: str) -> AsyncIterable[dict[str, Any]]:
        """
        Processes a query by creating a plan and then executing it.
        """
        # 1. CREATE THE PLAN
        # The agent first calls its internal planning method to translate
        # the natural language query into a structured TaskList.
        plan = await self._create_plan_from_query(query)

        # 2. EXECUTE THE PLAN
        # The agent then executes the plan. plan.execute_task_list() iterates
        # through the Tasks. For each Task, it finds the appropriate specialist
        # agent (Validation, then Deployment) and streams the sub-query to it.
        # The results are then yielded back.
        async for chunk in plan.execute_task_list():
            yield chunk

Validation Agent

Card

{
    "name": "Validation Agent",
    "description": "Helps in validating the MLOps model.",
    "url": "http://localhost:8004/",
    "version": "1.0.0",
    "skills": [
        {
            "id": "validate_the_model",
            "name": "validate_the_model",
            "description": "Helps in validating MLOps models",
            "tags": [
                "Validate the model based on user requirements."
            ],
            "examples": [
                "Retrieve the latest churn prediction model and run it through the validation module. If the model’s absolute bias is less than or equal to 0.04, approve it for deployment."
            ]
        }
    ]
}

Boilerplate code

# validation_agent.py
from typing import Any, AsyncIterable

from mcp_client import MCPClient


class ValidationAgent:
    """
    A specialist agent that validates a machine learning model by discovering
    and using tools from the MCP server.
    """

    def __init__(self, mcp_client: MCPClient, prompt_personality: str):
        """
        Initializes the Validation Agent.

        Args:
            mcp_client: A client for interacting with the MCP server.
            prompt_personality: Instructions guiding the agent's tool-use logic.
        """
        self._mcp_client = mcp_client
        self._prompt_personality = prompt_personality

    async def _create_tool_use_plan(self, query: str):
        """
        Translates a natural language query into a structured plan of tool calls.
        """
        # This method simulates the agent's reasoning process.
        # 1. DISCOVER: The agent first needs to understand what it can do.
        # It would call self._mcp_client.list_tools() to get a real-time
        # list of all available capabilities on the MCP server. This allows
        # it to dynamically learn that tools like 'fetch_model' and
        # 'validate_churn_model' are available. This should be part of the
        # prompt_personality.
        # 2. PLAN: Based on the available tools and the specific user query,
        # the agent formulates a plan. For the query: "...absolute bias is
        # less than or equal to 0.04...", its LLM would determine that it
        # needs to:
        #   a. Fetch the model's metadata using the 'fetch_model' tool.
        #   b. Construct a 'validation_config' containing the bias check,
        #      extracting the '0.04' threshold from the query.
        #   c. Call the 'validate_churn_model' tool with that config.
        #
        # The output of this method would be a structured object, like a list
        # of pre-configured tool calls, ready for execution.
        pass

    async def stream(self, query: str) -> AsyncIterable[dict[str, Any]]:
        """
        Processes a validation query by creating a plan and then executing it.
        """
        # 1. CREATE THE PLAN
        # The agent first calls its internal planning method to translate
        # the natural language query into a structured sequence of tool calls.
        plan = await self._create_tool_use_plan(query)

        # 2. EXECUTE THE PLAN
        # The agent would then iterate through the steps in the generated plan.
        # It would invoke the necessary MCP tools (fetch_model,
        # validate_churn_model) in the correct order with the correct
        # parameters derived during the planning phase. The results of each
        # step would be yielded back to the Orchestrator.
        pass

Deployment Agent

Card

{
    "name": "Deployment Agent",
    "description": "Helps in deploying the validated MLOps model.",
    "url": "http://localhost:8005/",
    "version": "1.0.0",
    "skills": [
        {
            "id": "deploy_the_model",
            "name": "deploy_the_model",
            "description": "Helps in deploying MLOps models",
            "tags": [
                "Deploy the model based on user requirements."
            ],
            "examples": [
                "Deploy the new model to the alternate region: if the current production model is running in us-west-1, deploy this version to us-west-2; otherwise, deploy it to us-west-1."
            ]
        }
    ]
}

Boilerplate code

# deployment_agent.py
from typing import Any, AsyncIterable

from mcp_client import MCPClient


class DeploymentAgent:
    """
    A specialist agent that deploys a validated machine learning model by
    discovering and using tools from the MCP server.
    """

    def __init__(self, mcp_client: MCPClient, prompt_personality: str):
        """
        Initializes the Deployment Agent.

        Args:
            mcp_client: A client for interacting with the MCP server.
            prompt_personality: Instructions guiding the agent's tool-use logic.
        """
        self._mcp_client = mcp_client
        self._prompt_personality = prompt_personality

    async def _create_tool_use_plan(self, query: str):
        """
        Translates a natural language query into a structured plan of tool calls.
        """
        # This method simulates the agent's reasoning process.
        # 1. DISCOVER: The agent determines its available capabilities.
        # It would call self._mcp_client.list_tools() to learn that tools
        # like 'fetch_model' and 'deploy_churn_model' are available. Again,
        # this is driven by the prompt personality.
        # 2. PLAN: The agent formulates a plan based on the query: "Deploy
        # the new model to the alternate region...". Its LLM reasoning would be:
        #   a. To find the "alternate" region, I must first find the "current" one.
        #   b. The 'fetch_model' tool can get me the metadata of the current
        #      production model.
        #   c. From that metadata, I can extract the current deployment region.
        #   d. I can then write logic to determine the alternate region.
        #   e. The final plan is a sequence of two tool calls: first fetch_model
        #      to get the state, then deploy_churn_model to execute the change.
        # The output of this method would be a structured object, like a list
        # of pre-configured tool calls, ready for execution.
        pass

    async def stream(self, query: str) -> AsyncIterable[dict[str, Any]]:
        """
        Processes a deployment query by creating a plan and then executing it.
        """
        # 1. CREATE THE PLAN
        # The agent first calls its internal planning method to translate
        # the natural language query into a structured sequence of tool calls.
        plan = await self._create_tool_use_plan(query)

        # 2. EXECUTE THE PLAN
        # The agent would then iterate through the steps in the generated plan.
        # It would invoke the necessary MCP tools (fetch_model,
        # deploy_churn_model) in the correct order with the correct
        # parameters derived during the planning phase. The results of each
        # step would be yielded back to the caller.
        pass

Script to Start the Agents

import argparse
import json
from pathlib import Path

import httpx
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import (
    BasePushNotificationSender,
    InMemoryPushNotificationConfigStore,
    InMemoryTaskStore,
)
from a2a.types import AgentCard

from agent_helpers import GenericAgentExecutor
from basic_helper.promp_personalities import prompts
from deployment_agent import DeploymentAgent
from mcp_client import MCPClient
from orchestrator_agent import OrchestratorAgent
from validation_agent import ValidationAgent

mcp_client = MCPClient(host="localhost", port=8000, transport="http")  # Example client


def get_agent(agent_card: AgentCard):
    """Get the agent, given an agent card."""
    if agent_card.name == "Orchestrator Agent":
        return OrchestratorAgent(mcp_client, prompts.orchestrator_agent)
    if agent_card.name == "Validation Agent":
        return ValidationAgent(mcp_client, prompts.validation_agent)
    if agent_card.name == "Deployment Agent":
        return DeploymentAgent(mcp_client, prompts.deployment_agent)
    raise ValueError(f"Unknown agent: {agent_card.name}")


def main(host, port, agent_card_path):
    """Starts an Agent server."""
    with Path(agent_card_path).open() as file:
        data = json.load(file)
    agent_card = AgentCard(**data)

    client = httpx.AsyncClient()
    push_notification_config_store = InMemoryPushNotificationConfigStore()
    push_notification_sender = BasePushNotificationSender(
        client, config_store=push_notification_config_store
    )
    request_handler = DefaultRequestHandler(
        agent_executor=GenericAgentExecutor(agent=get_agent(agent_card)),
        task_store=InMemoryTaskStore(),
        push_config_store=push_notification_config_store,
        push_sender=push_notification_sender,
    )
    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=request_handler
    )
    uvicorn.run(server.build(), host=host, port=port)


if __name__ == "__main__":
    # The command-line flags below are illustrative.
    parser = argparse.ArgumentParser(description="Start one A2A agent server.")
    parser.add_argument("--host", default="localhost")
    parser.add_argument("--port", type=int, required=True)
    parser.add_argument("--agent-card", required=True, help="Path to the agent card JSON file")
    args = parser.parse_args()
    main(args.host, args.port, args.agent_card)

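Architectural Advantages of Layering the Two Protocols

This clean separation of orchestration from specialized execution brings significant architectural advantages:

  • Dynamic discovery and resilience: The orchestrator has no hardcoded knowledge of the specialists. New agents (for example, a ReportingAgent or a MonitoringAgent) can be added to the system, and the orchestrator can discover and use them without any change to its code.

  • Composable capabilities: The specialist agents are not monoliths themselves. They compose their behavior by discovering and using fine-grained tools on the MCP server. Adding a new validation check is as simple as deploying a new MCP tool, which the ValidationAgent can then discover and use dynamically.

  • Clear separation of intent and execution: The orchestrator expresses high-level business goals, while the specialists handle low-level implementation details. This decoupling makes the whole system easier to understand, maintain, and extend.

  • Adaptive and emergent systems: By combining a general-purpose orchestrator with a set of discoverable specialist tools and agents, we create a system that can adapt to new and complex commands it was never explicitly designed for.

By layering a communication and discovery protocol (A2A) on top of a capability protocol (MCP), we bridge the gap from rigid, procedural automation to truly goal-oriented, AI-driven operations.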

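Conclusion

With the arrival of the agentic era, the need for robust, scalable, and interoperable agent systems is becoming increasingly important. In this article, we proposed an architectural pattern that uses the Agent-to-Agent (A2A) protocol and the Model Context Protocol (MCP) to address this challenge.

Through a detailed exploration of an MLOps workflow, we showed how this layered approach decouples orchestration logic from execution logic, a fundamental principle of extensible systems. We showed that A2A provides the communication framework needed for dynamic agent collaboration, while MCP serves as a universal interface through which agents discover and use diverse tools and resources. This architecture allows new capabilities to be integrated without changing the core communication logic.

The strength of this layered agentic architecture lies in its ability to adapt and evolve. For organizations navigating the complexity of AI, it means moving from rigid, monolithic systems to agile, agent-driven operations. It offers a blueprint for AI ecosystems that can quickly incorporate new models, tools, and business requirements, and it gives developers a framework for building more resilient and maintainable pipelines. The pattern is not limited to MLOps; its principles apply to any domain where dynamic collaboration and adaptive access to capabilities are essential. By embracing A2A and MCP, we move AI agents from isolated tasks toward coordinated intelligence, with greater automation and adaptability.

The architectural pattern presented here offers one approach to multi-agent design. It provides a deliberate structure that lets us move beyond simple, single agents toward collaborative systems.

For readers interested in experimenting with these concepts and building tools around them, the official A2A samples repository on GitHub provides a runnable example that uses both protocols and is a good starting point.

Original article: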

https://www.infoq.com/articles/architecting-agentic-mlops-a2a-mcp/