用 MCP 和 A2A 構建 3 個 Agent 的多智能體系統 -MAS-

圖題:U。U|來源:Hailey

接上篇技術分析文章,目前我對 Multi-agent based 產品是無法信任的,市面上的無論已公測還是 waiting list 狀態的 Agentic AI 產品,我只願意試用失敗情況下對個人隱私和利益影響最小的功能。原因也很直觀,單 agent 無法實現 showcase 裏 fancy 的產品效果,多 agent 存在數據 / 工具錯誤和幻覺等問題導致 意圖理解、目標規劃、任務執行、結果優化 失敗。但我非常好奇 MAS 在 MCP 和 A2A 的加持下能實現多複雜的任務,本文的 MAS 實驗證明,這兩個技術很大程度上影響甚至決定 Agentic AI 可實現的複雜效果。

本文大綱如下:

1 爲什麼 MAS, MCP, A2A 重要?

我們在創造 真 ·AI 產品 時會把它當人看,可以理解爲 LLM 是大腦,Multi-agent 裏的每個 agent 是身體的各個器官, 爲了完成不同任務需要使用不同功能的工具。

Anthropic 推出的 MCP 爲 Agent 提供了對話過程中可靠的訪問 數據和工具 的方式。MCP 作爲 LLM 的 USB-C 接口,可以將同一模型插入 GitHub、Postgresql 數據庫、自定義知識庫中,而無需每次都重新寫代碼集成 模型 和 數據或工具。

通過對 主機 hosts, 客戶端 clients, 服務器 servers 交換 工具 Tools, 資源 resources, 提示詞 prompts 進行標準化,MCP 將上下文變成模型可依賴的內容,而非研發人員臨時拼湊。

Google 的 A2A 協議解決了另一個障礙:讓不同智能體(通常基於不同框架構建並部署)互相理解。在 A2A 協議中,每個智能體都有專屬的 “Agent card” 用於展示各自的技能,以及 HTTP 接口實現智能體間協商任務、流式傳輸中間結果。

通過 MCP 處理智能體融入外部世界,A2A 處理智能體間對話通信,智能體系統將變成協作流暢的勞動力羣,就像人腦有了左膀右臂。

2 手搓 MAS ft. MCP&A2A (tutorials from scratch)

本文將用 A2A 和 MCP 搭建一個完整的 Agent pipeline。首先會測試兩個 MCP server,其次嘗試創建一個使用 MCP 獲取信息的單 Agent,最後實現 Multi-agent 系統並用 A2A 相互協調和 MCP 獲取信息。預期單 Agent 能從 API 獲取數據或搜索網絡,多 Agent 系統能撰寫美國公司的簡單報告,例如 “S&P500 指數中前 10 家公司的股票價格分別是多少”“美國前 5 大木材生產商是誰?”。

2.1 兩個 MCP server

MAS 需要兩個 MCP server:

  1. 1. 一個能在 Google 上搜索和閱讀網頁的爬蟲

  2. 2. 一個股票檢索器,能夠獲取特定時間的股票價格和其他有用的信息

2.1.1 股票檢索與網頁爬蟲服務

  1. 1. 股票檢索

注意先註冊獲取 API Key

  1. 2. 爬蟲

注意先註冊獲取 API Key

2.1.2 服務轉服務器

MCP 服務器根據傳輸層不同可以分爲兩種類型:

STDIO MCP 更簡單易用,但當存在多個 MCP server 時無法穩定工作,通過 SSE MCP 可以使用不同端口穩定運行不同 server。

使用 FastMCP 框架構建兩個 MCP server,使用 Google 的 ADK(Agent Development Kit)構建 MAS 即多智能體的分層團隊,MAS 包含一個主代理和兩個專門的子代理。

2.1.3 使用 MCP 實現外部信息獲取

# 1. MCP工具連接
## 獲取搜索
asyncdefreturn_sse_mcp_tools_search():
    print("Attempting to connect to MCP server for search and page read...")
    server_params = SseServerParams(
        url="http://localhost:8080/sse",
    )
    tools, exit_stack = await MCPToolset.from_server(connection_params=server_params)
    print("MCP Toolset created successfully.")
    return tools, exit_stack
    
## 股票數據
asyncdefreturn_sse_mcp_tools_stocks():
    print("Attempting to connect to MCP server for stock info...")
    server_params = SseServerParams(
        url="http://localhost:8181/sse",
    )
    tools, exit_stack = await MCPToolset.from_server(connection_params=server_params)
    print("MCP Toolset created successfully.")
    return tools, exit_stack

# 2. MCP工具分配
stock_analysis_agent = Agent( # Agent能獲取實時股票價格
    // ... existing code ...
    tools=stocks_tools,
)
search_agent = Agent( # Agent能使用Google搜索和閱讀網頁
    // ... existing code ...
    tools=search_tools,
)

2.2 MAS 中的三個 Agent

2.2.1 層次化結構

MAS層次化結構:
root_agent (company_analysis_assistant)
├── search_agent
└── stock_analysis_agent

主要特點:
- root_agent : 主協調代理,負責公司分析和任務委派
- search_agent : 搜索代理,處理 Google 搜索和網頁閱讀
- stock_analysis_agent : 股票分析代理,提供股票數據和分析
- 使用 SSE 連接到 MCP 服務器
- 允許用戶輸入自定義查詢,沒有設置硬查詢(測試時可以設置)
- 主代理根據查詢類型將任務委派給適當的子代理

層次化結構通過sub_agents參數實現:
root_agent = Agent(
    // ... existing code ...
    sub_agents=[search_agent, stock_analysis_agent],
    output_key="last_assistant_response",
)

2.2.2 委派機制

根 Agent (root_agent) :作爲主協調者,負責分析用戶查詢並決定是自己處理還是委派給子 Agent。將該機制在 root_agent 指令中明確定義:

"1. If the user asks about a company, provide a detailed report.\n"
"2. If you need any information about the current stock price, delegate to the stock_analysis_agent.\n"
"3. If you need to search for information, delegate to the search_agent.\n"

2.2.3 Agent 關鍵參數

# 基本參數
agent = Agent(
    model=MODEL,       # 使用的LLM模型
    , # Agent的名稱,用於標識和引用
    instruction="..."# Agent的詳細指令
    description="..."# Agent的簡短描述
    tools=tools,       # Agent可以使用的工具
)

# 特殊參數
root_agent = Agent(
    // ... existing code ...
    sub_agents=[search_agent, stock_analysis_agent],  # 子Agent列表
    output_key="last_assistant_response",             # 輸出鍵,用於獲取最終響應
)

2.3 A2A 實現 Agent 間通信

2.3.1 三個 Agent 服務器

每個 Agent 創建一個服務器實現一個 A2A 服務,使 Agent 可以通過 HTTP 接口被其他 Agent 調用。

# 主機Agent服務器
server = A2AServer(
        host=HOST,
        port=PORT,
        endpoint="/host_agent",
        agent_card=AGENT_CARD,
        task_manager=task_manager
)
# 股票報告Agent服務器
server = A2AServer(
        host=HOST,
        port=PORT,
        endpoint="/stock_agent",
        agent_card=AGENT_CARD,
        task_manager=task_manager
)
# Google搜索Agent服務器
server = A2AServer(
        host=HOST,
        port=PORT,
        endpoint="/host_agent",
        agent_card=AGENT_CARD,
        task_manager=task_manager
)

# host_agent通過send_task方法與其他agent通信
asyncdefsend_task(
        self,
        agent_name: str,
        message: str,
        tool_context: ToolContext):
    """Sends a task either streaming (if supported) or non-streaming.
    // ... existing code ...
    """

# host_agent需要知道其他agent的地址
list_urls = [
    "http://localhost:11000/google_search_agent",
    "http://localhost:10000/stock_agent",
]

host_agent = ADKAgent(
    // ... existing code ...
    is_host_agent=True,
    remote_agent_addresses=list_urls,
)

2.3.4 創建專業化 Agent

# 股票分析代理
stock_analysis_agent = Agent(
    model=MODEL,
    ,
    instruction="Analyze stock data and provide insights.",
    description="Handles stock analysis and provides insights, in particular, can get the latest stock price.",
    tools=stocks_tools,
)

# Google搜索代理
search_agent = Agent(
    model=MODEL,
    ,
    instruction="Expert googler. Can search anything on google and read pages online.",
    description="Handles search queries and can read pages online.",
    tools=search_tools,
)

# 根代理
root_agent = Agent(
    ,
    model=MODEL,
    description="Main assistant: Handles requests about stocks and information of companies.",
    instruction=(
        "You are the main Assistant coordinating a team. Your primary responsibilities are providing company and stocks reports and delegating other tasks.\n"
        "1. If the user asks about a company, provide a detailed report.\n"
        "2. If you need any information about the current stock price, delegate to the stock_analysis_agent.\n"
        "3. If you need to search for information, delegate to the search_agent.\n"
        "Analyze the user's query and delegate or handle it appropriately. If unsure, ask for clarification. Only use tools or delegate as described."
    ),
    sub_agents=[search_agent, stock_analysis_agent],
    output_key="last_assistant_response",
)

2.3.3 MAS 執行流程

整個系統的執行流程如下:

  1. 1. 創建會話和服務

  2. 2. 創建子 Agent 和根 Agent

  3. 3. 創建 Runner 並運行 Agent

  4. 4. root_agent 獲取用戶查詢

  5. 5. 分析查詢並決定是否:

  6. 直接處理

  7. 分配給股票分析 agent 處理

  8. 分配給 Google 搜索 agent 處理

  9. 3. agent 連接 MCP 服務器獲取工具來收集信息

  10. 6. root_agent 處理事件並獲取最終響應提供給用戶答案

  11. 7. 關閉 MCP 連接

async defasync_main():
    # Initialize services
    session_service = InMemorySessionService()
    artifacts_service = InMemoryArtifactService()
    
    # Create session
    session = session_service.create_session(
        state={},
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=SESSION_ID,
    )

    # Get user query
    query = input("Enter your query:\n")
    content = types.Content(role='user', parts=[types.Part(text=query)])
    
    # 創建子Agent
    stock_analysis_agent = Agent(...)
    search_agent = Agent(...)
    # 創建根Agent
    root_agent = Agent(...)
    
    # Initialize tools from MCP servers
    search_tools, search_exit_stack = await return_sse_mcp_tools_search()
    stocks_tools, stocks_exit_stack = await return_sse_mcp_tools_stocks()

    # Create and run the agent pipeline 創建Runner
    runner = Runner(
        app_name=APP_NAME,
        agent=root_agent,
        artifact_service=artifacts_service,
        session_service=session_service,
    )

    # Process events and get final response 運行Agent
    events_async = runner.run_async(
        session_id=session.id, 
        user_id=session.user_id, 
        new_message=content
    )
    # 處理事件
    asyncfor event in events_async:
        if event.is_final_response():
            if event.content and event.content.parts:
                final_response_text = event.content.parts[0].text
            elif event.actions and event.actions.escalate:
                final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
            print(colored(text=f"############# Final Response #############\n\n{final_response_text}", color='green'))
            break
        else:
            print(event)

    # Cleanup 關閉連接
    await stocks_exit_stack.aclose()
    await search_exit_stack.aclose()

2.4 A2A 協議框架

2.4.1 核心概念組件

2.4.1.1 核心模塊

由於中文翻譯可能詞不達意,保留概念的英文名稱。

Agent card(代理卡):作爲 Agent 的數字身份卡,詳細說明了代理的功能、可用技能、服務端點和安全要求,使其他 agent 能夠發現並瞭解如何與其交互。

A2A server(A2A 服務器):一個通過公開 HTTP 端點來實現 A2A 協議的 agent,能夠處理傳入的請求、管理任務執行,並根據協議規範與客戶端保持通信。

A2A client(A2A 客戶端):任何使用 A2A 協議的應用程序或 Agent,通過向 A2A 服務器的端點發送請求來啓動交互。

Task(任務):A2A 中的主要工作單元,當 Client 啓動一個任務時,client 會創建一個獨特的對話線程,該線程可以經歷各種狀態:已提交、正在進行、需要輸入、已完成、失敗或已取消。每個任務都維護自己的上下文和歷史記錄。

Message(消息):客戶端和 Agent 之間通信的基本單元,消息包含一個角色(user 或 agent),並由一個或多個 Parts 組成,形成對話線程。

Artifact(工件):代表 Agent 在任務執行期間生成的任何輸出,包括生成的文件、結構化數據或其他資源,與 Message 一樣,Artifacts 也由 Parts 組成。

Part(部分):Message 或 Artifacts 中的原子內容單元,存在以下不同類型:

Streaming(流式傳輸):對於需要較長處理時間的任務,服務器通過流式傳輸功能,允許客戶端通過服務器發送事件(SSE)接收實時更新,包括任務狀態更改和新工件。

Push notifications(推送通知):支持此功能的服務器可以通過 Webhook URL 主動通知客戶端的端點接受並啓動任務更新。

2.4.1.2 典型流程

Discovery:客戶端從服務器的已知 URL 檢索 Agent card,以瞭解 Agent 的功能和要求。

Initiation:客戶端通過發送請求來啓動新任務,請求代碼包括初始消息和唯一任務 ID。

Processing:

Interaction:如果任務需要額外輸入,客戶端可以使用相同的任務 ID 發送後續消息。

Completion:任務達到最終狀態(已完成、失敗、已取消),從而結束 Interaction。

2.4.2 ADKAgent 類

用 ADKAgent 類實現上述 A2A 的概念組件和典型流程,ADKAgent 類是 A2A 實現的基礎,將 Google ADK 框架與 A2A 協議連接起來,允許 Agent 進行通信和協調任務。

ADKAgent 類有兩種模式:

主要特點:

遠程管理 Agent:允許 Agent 發現並連接到網絡中的其他 Agent,每個 Agent 的能力都在 AgentCard 中描述。

任務委派:通過該方法處理核心的 A2A 協議交互流程:

狀態管理:Agent 需要維護的內容的狀態信息包括:活動會話、當前任務、已連接的 Agent、對話上下文

響應處理:具備一系列方法處理不同響應格式之間的轉換:文本響應、結構化數據、文件附件、artifacts

2.4.3 Agent Card 類

AgentCard 類是 A2A 協議的關鍵組成部分,作爲一種標準化方式來描述一個 Agent 的能力和需求,本質上是 Agent 的數字身份卡,其他 Agent 可以用 AgentCard 類來發現和理解如何與該 Agent 交互,且無需實現瞭解該 Agent 的實現細節。AgentCard 類包含:

以股票分析 Agent card 爲例:

stock_agent_card = generate_agent_card(
    agent_,
    agent_description="Analyzes stock market data and provides insights",
    agent_url="http://localhost:8080",
    agent_version="1.0.0",
    can_stream=True,
    can_push_notifications=True,
    skills=[
        AgentSkill(
            ,
            description="Analyzes stock prices and market trends",
            input_schema={
                "type""object",
                "properties": {
                    "symbol": {"type""string"},
                    "timeframe": {"type""string"}
                }
            },
            output_schema={
                "type""object",
                "properties": {
                    "price": {"type""number"},
                    "trend": {"type""string"}
                }
            }
        )
    ]
)
當另一個Agent要與該Agent交互時,需要:

從/agent.json中獲取Agent card
驗證agent是否支持所需功能
檢查agent是否具備必要技能
使用提供的URL和身份驗證方法建立通信

2.4.4 A2A 服務器

利用 A2A 服務器處理來自其他 Agent 的傳入請求並管理通信協議。關鍵組件包括:

2.4.5 A2A Card 解析器

解析器是一個用於解析 Agent Card 的類,當不知道 Agent 的 Card 時可以用於查找 Agent 的 Agent Card。

出於某種原因,來自 Google 的原始示例不起作用。因爲它在附加卡片路徑時 “銷燬” 了 URL。公共資源、utils 和其他在這些主要組件之上,在本教程中,我們使用了其他資源列表,其中許多資源與原始教程相同:絕大多數都可以在這裏找到。把所有東西放在一起現在,我們已經有了把所有東西放在一起所需的所有組件。我們創建 Host Agent:

2.5 結果

2.5.1 Host Agent

async defrun_agent():
    AGENT_NAME = "host_agent"
    AGENT_DESCRIPTION = "An agent orchestrates the decomposition of the user request into tasks that can be performed by the child agents."
    PORT = 12000
    HOST = "0.0.0.0"
    AGENT_URL = f"http://{HOST}:{PORT}"
    AGENT_VERSION = "1.0.0"
    MODEL = 'gemini-2.5-pro-preview-03-25'
    AGENT_SKILLS = [
        AgentSkill(
            id="COORDINATE_AGENT_TASKS",
            ,
            description="coordinate tasks between agents.",
        ),
    ]

    list_urls = [
        "http://localhost:11000/google_search_agent",
        "http://localhost:10000/stock_agent",
    ]

    AGENT_CARD = generate_agent_card(
        agent_name=AGENT_NAME,
        agent_description=AGENT_DESCRIPTION,
        agent_url=AGENT_URL,
        agent_version=AGENT_VERSION,
        can_stream=False,
        can_push_notifications=False,
        can_state_transition_history=True,
        default_input_modes=["text"],
        default_output_modes=["text"],
        skills=AGENT_SKILLS,
    )

    host_agent = ADKAgent(
        model=MODEL,
        ,
        description="",
        tools=[],
        instructions="",
        is_host_agent=True,
        remote_agent_addresses=list_urls,
    )

    task_manager = generate_agent_task_manager(
        agent=host_agent,
    )
    server = A2AServer(
        host=HOST,
        port=PORT,
        endpoint="/host_agent",
        agent_card=AGENT_CARD,
        task_manager=task_manager
    )
    print(f"Starting {AGENT_NAME} A2A Server on {AGENT_URL}")
    await server.astart()

2.5.2 Search Agent

import asyncio
from typing importList, Any

from dotenv import load_dotenv, find_dotenv
from google.adk import Agent
from google.adk.tools import google_search

from a2a_servers.agent_servers.utils import generate_agent_task_manager, generate_agent_card
from a2a_servers.agents.adk_agent import ADKAgent
from a2a_servers.common.agent_task_manager import AgentTaskManager
from a2a_servers.common.server.server import A2AServer
from a2a_servers.common.types import (
    AgentCard,
    AgentCapabilities,
    AgentSkill,
)
from adk_agents_testing.mcp_tools.mcp_tool_search import return_sse_mcp_tools_search

load_dotenv(find_dotenv())

asyncdefrun_agent():
    AGENT_NAME = "google_search_agent"
    AGENT_DESCRIPTION = "An agent that handles search queries and can read pages online."
    HOST = "0.0.0.0"
    PORT = 11000
    AGENT_URL = f"http://{HOST}:{PORT}"
    AGENT_VERSION = "1.0.0"
    MODEL = 'gemini-2.5-pro-preview-03-25'
    AGENT_SKILLS = [
        AgentSkill(
            id="GOOGLE_SEARCH",
            ,
            description="Handles search queries and can read pages online.",
        ),
    ]

    AGENT_CARD = generate_agent_card(
        agent_name=AGENT_NAME,
        agent_description=AGENT_DESCRIPTION,
        agent_url=AGENT_URL,
        agent_version=AGENT_VERSION,
        can_stream=False,
        can_push_notifications=False,
        can_state_transition_history=True,
        default_input_modes=["text"],
        default_output_modes=["text"],
        skills=AGENT_SKILLS,
    )

    gsearch_tools, g_search_exit_stack = await return_sse_mcp_tools_search()

    google_search_agent = ADKAgent(
        model=MODEL,
        ,
        description="Handles search queries and can read pages online.",
        tools=gsearch_tools,
        instructions=(
            "You are an expert googler. Can search anything on google and read pages online."
        ),
    )

    task_manager = generate_agent_task_manager(
        agent=google_search_agent,
    )
    server = A2AServer(
        host=HOST,
        port=PORT,
        endpoint="/google_search_agent",
        agent_card=AGENT_CARD,
        task_manager=task_manager
    )
    print(f"Starting {AGENT_NAME} A2A Server on {AGENT_URL}")
    await server.astart()


if __name__ == "__main__":
    asyncio.run(
        run_agent()
    )

2.5.3 Stock Agent

async defrun_agent():
    AGENT_NAME = "stock_report_agent"
    AGENT_DESCRIPTION = "An agent that provides US stock prices and info."
    PORT = 10000
    HOST = "0.0.0.0"
    AGENT_URL = f"http://{HOST}:{PORT}"
    AGENT_VERSION = "1.0.0"
    MODEL = 'gemini-2.5-pro-preview-03-25'
    AGENT_SKILLS = [
        AgentSkill(
            id="SKILL_STOCK_REPORT",
            ,
            description="Provides stock prices and info.",
        ),
    ]

    AGENT_CARD = generate_agent_card(
        agent_name=AGENT_NAME,
        agent_description=AGENT_DESCRIPTION,
        agent_url=AGENT_URL,
        agent_version=AGENT_VERSION,
        can_stream=False,
        can_push_notifications=False,
        can_state_transition_history=True,
        default_input_modes=["text"],
        default_output_modes=["text"],
        skills=AGENT_SKILLS,
    )

    stocks_tools, stocks_exit_stack = await return_sse_mcp_tools_stocks()

    stock_analysis_agent = ADKAgent(
        model=MODEL,
        ,
        description="Handles stock analysis and provides insights, in particular, can get the latest stock price.",
        tools=stocks_tools,
        instructions=(
            "Analyze stock data and provide insights. You can also get the latest stock price."
            "If the user asks about a company, the stock prices for the said company."
            "If the user asks about a stock, provide the latest stock price and any other relevant information."
            "You can get only the latest stock price for US companies."
        ),
    )

    task_manager = generate_agent_task_manager(
        agent=stock_analysis_agent,
    )
    server = A2AServer(
        host=HOST,
        port=PORT,
        endpoint="/stock_agent",
        agent_card=AGENT_CARD,
        task_manager=task_manager
    )
    print(f"Starting {AGENT_NAME} A2A Server on {AGENT_URL}")
    await server.astart()

全部代碼詳見:https://github.com/Tsadoq/a2a-mcp-tutorial

3 資料推薦

3.1 代碼庫

3.2 教程

3.3 其他

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/40kPGdsw7LxTCeDxSsoYUQ