谷歌 Agent2Agent 協議原理解讀

谷歌在 25 年 4 月初發布了 A2A 協議,作爲 MCP 協議的補充。Agent2Agent 協議致力於促進獨立 agent 間的通信,幫助不同生態系統的 agent 溝通和協作。

核心概念

// An AgentCard conveys key information:
// - Overall details (version, name, description, uses)
// - Skills: A set of capabilities the agent can perform
// - Default modalities/content types supported by the agent.
// - Authentication requirements
interface AgentCard {
  // Human readable name of the agent.
  // (e.g. "Recipe Agent")
  name: string;
  // A human-readable description of the agent. Used to assist users and
  // other agents in understanding what the agent can do.
  // (e.g. "Agent that helps users with recipes and cooking.")
  description: string;
  // A URL to the address the agent is hosted at.
  url: string;
  // The service provider of the agent
  provider?: {
    organization: string;
    url: string;
  };
  // The version of the agent - format is up to the provider. (e.g. "1.0.0")
  version: string;
  // A URL to documentation for the agent.
  documentationUrl?: string;
  // Optional capabilities supported by the agent.
  capabilities: {
    streaming?: boolean; // true if the agent supports SSE
    pushNotifications?: boolean; // true if the agent can notify updates to client
    stateTransitionHistory?: boolean; //true if the agent exposes status change history for tasks
  };
  // Authentication requirements for the agent.
  // Intended to match OpenAPI authentication structure.
  authentication: {
    schemes: string[]; // e.g. Basic, Bearer
    credentials?: string; //credentials a client should use for private cards
  };
  // The set of interaction modes that the agent
  // supports across all skills. This can be overridden per-skill.
  defaultInputModes: string[]; // supported mime types for input
  defaultOutputModes: string[]; // supported mime types for output
  // Skills are a unit of capability that an agent can perform.
  skills: {
    id: string; // unique identifier for the agent's skill
    name: string; //human readable name of the skill
    // description of the skill - will be used by the client or a human
    // as a hint to understand what the skill does.
    description: string;
    // Set of tag words describing classes of capabilities for this specific
    // skill (e.g. "cooking", "customer support", "billing")
    tags: string[];
    // The set of example scenarios that the skill can perform.
    // Will be used by the client as a hint to understand how the skill can be
    // used. (e.g. "I need a recipe for bread")
    examples?: string[]; // example prompts for tasks
    // The set of interaction modes that the skill supports
    // (if different than the default)
    inputModes?: string[]; // supported mime types for input
    outputModes?: string[]; // supported mime types for output
  }[];
}

典型工作流程

  1. 發現:客戶端從服務器的 well‑known URL(通常是 https://DOMAIN/.well-known/agent.json)獲取 Agent Card。

  2. 發起:客戶端發送包含初始用戶消息和唯一任務 ID 的 tasks/send 或 tasks/sendSubscribe 請求。

  3. 處理

  1. 交互(可選):如果任務進入 input-required 狀態,客戶端可使用相同的任務 ID,通過 tasks/send 或 tasks/sendSubscribe 發送後續消息以提供所需輸入。

  2. 完成:任務最終達到終止狀態之一:completedfailed 或 canceled


Agent 發現

發現 Agent Card

🌐 開放發現(Open Discovery)

A2A 協議建議企業將 Agent Card 託管在一個約定路徑上,例如:**https://DOMAIN/.well-known/agent.json**客戶端通過 DNS 解析域名,向該路徑發送 GET 請求,即可獲取 Agent Card。 這允許網頁爬蟲和應用程序輕鬆發現已知或配置的代理,實質上將 “發現 Agent” 簡化爲“找到域名”。

📚 篩選式發現(Curated Discovery,基於註冊表)

企業可能通過目錄界面提供代理註冊表,供公司或團隊內部使用。這種方式便於由管理員進行管理與篩選。 谷歌官方正在考慮將註冊表支持加入協議標準。

🔐 私有發現(Private Discovery,基於 API)

某些 Agent 可能託管於私有 “Agent 商店” 或通過自定義 API 交換 Agent Card。 A2A 協議目前不打算將私有發現 API 納入標準。


保護 Agent Card(Securing Agent Cards)

Agent Card 可能包含敏感信息,實現方可根據需要爲其設置身份驗證和訪問控制。 例如,即使是放在 .well-known 路徑上的 Agent Card,也可以通過 mTLS(雙向 TLS) 進行限制,僅允許特定客戶端訪問。註冊表和私有 API 更應強制認證,並根據不同身份返回不同內容。

⚠️ 注意:Agent Card 中可能會包含認證信息(如 API Key),強烈建議此類內容不得在未驗證身份的情況下公開訪問。


企業級就緒性 (Enterprise Readiness)

A2A 協議旨在支持企業級需求,與現有企業基礎設施無縫集成。它將 Agent 看作標準的基於 HTTP 的企業應用,因此依賴現有的認證、安全、隱私、追蹤與監控機制。

傳輸層安全(Transport Level Security)

A2A 基於 HTTP,生產環境必須使用 HTTPS,並啓用現代 TLS 加密套件。

服務器身份驗證(Server Identity)

服務器應使用由受信任證書頒發機構簽發的數字證書,客戶端需驗證證書以確認身份。

客戶端和用戶身份(Client and User Identity)

A2A 協議本身不定義客戶端或用戶標識,而是通過 Agent Card 指明所需的認證方式。客戶端需通過外部認證機制(如 OAuth)獲取憑證,並通過 HTTP 頭傳輸。

多身份聯合登錄仍是開放議題,如某任務需同時訪問 A 系統和 B 系統,用戶需同時提供兩個系統的認證憑據。

客戶端認證(Authenticating Clients)

A2A 服務器應在 Agent Card 中公佈其支持的認證協議,如 API Key、OAuth、OIDC 等,並要求客戶端每次請求都進行認證。認證失敗將返回標準 HTTP 401 或 403 錯誤碼。

授權與數據隱私(Authorization and Data Privacy)

A2A 建議基於 技能(skills) 和 工具(tools) 兩個維度進行授權管理:

可觀測性與追蹤(Tracing and Observability)

A2A 基於 HTTP,因此可直接使用企業現有的日誌、追蹤與監控工具(如 OpenTracing)。客戶端和服務器應插入必要的追蹤頭,並記錄日誌與事件。


與 MCP 協議的互補關係

構建 agent 應用需要同時使用 A2A 和 MCP 協議。agent 通過 MCP 連接工具,通過 A2A 協議連接其他 agent。

例如一個汽車修理店,員工使用各種專用工具(如升降機、電壓表、扳手)來診斷和修理汽車問題。他們經常處理以前沒見過的故障,過程中可能還需與客戶交談、查資料、與零件供應商合作。 現在把這些員工抽象爲 Agent:

MCP 協議用來連接 agent 與結構化工具(例如:“升高平臺 2 米”、“扳手右轉 4 毫米”)。 

A2A 協議用來讓最終用戶或其他代理與員工交流(例如:“我的車有異響”)。A2A 支持多輪對話和任務計劃的演進(如:“拍張左車輪的照片”、“我看到有液體泄漏,這種情況多久了?”),也讓修理工可以與其他代理(如零件供應商)協作。

案例實戰:基於 A2A 協議實現 MindsDB 企業數據 agent

通過 A2A 協議實現自然語言查詢和跨多個企業數據源的數據分析,基座模型使用 Gemini 2.5 Flash 模型,MindsDB 提供文本到 SQL 的能力,A2A 協議作爲通信協議,實現智能體之間的協作和任務分發。 

具體流程爲:

 自然語言輸入:用戶通過智能體輸入自然語言查詢,例如:“查詢過去三個月的銷售數據”。

 任務解析與分發:智能體將用戶的查詢解析爲任務,並通過 A2A 協議將任務分發給適當的執行代理。

 數據查詢與分析:執行代理接收到任務後,使用 MindsDB 的文本到 SQL(Text-to-SQL)技能,將自然語言查詢轉換爲 SQL 查詢,並在多個數據源上執行這些查詢。 

結果整合與反饋:執行代理將查詢結果整合,並通過 A2A 協議將結果反饋給用戶。Agent 的實現:

import json
import aiohttp
import os
from typing import Any, AsyncIterable, Dict, Optional
from dotenv import load_dotenv

# 加載 .env 文件中的環境變量
load_dotenv()

class MindsDBAgent:
    """An agent that data requests from any database, datawarehouse, app."""

    # 支持的內容類型列表,僅文本
    SUPPORTED_CONTENT_TYPES = ["text""text/plain"]
    # MindsDB API 的基礎 URL
    API_URL = "https://ai.staging.mindsdb.com/chat/completions"

    def __init__(self):
        # 從環境變量獲取 MindsDB API Key
        self.api_key = os.getenv('MINDS_API_KEY')
        ifnot self.api_key:
            raise ValueError("MINDS_API_KEY environment variable is not set")

        # 從環境變量獲取默認的 Mind 名稱
        self.model = os.getenv('MIND_NAME')
        ifnot self.model:
            # 如果未設置,則使用示例中默認的 Mind
            self.model = "Sales_Data_Expert_Demo_Mind"

        # HTTP 請求頭,包含認證信息
        self.headers = {
            "Content-Type""application/json",
            "Authorization": f"Bearer {self.api_key}"
        }

    def invoke(self, query, session_id) -> str:
        # 同步調用接口占位,提示使用流式獲取真實結果
        return {"content""Use stream method to get the results!"}

    asyncdef stream(self, query, session_id) -> AsyncIterable[Dict[str, Any]]:
        """
        流式方法:針對長時任務返回 SSE 流,每次 yield 一個消息段或完成事件
        """
        # 構造請求的 JSON 負載,包含 system 和 user 消息,以及 stream 標誌
        payload = {
            "model": self.model,
            "messages": [
                {"role""system""content""You are a helpful assistant."},
                {"role""user""content": query}
            ],
            "stream": True
        }

        # 使用 aiohttp 發送異步 HTTP POST 請求
        asyncwith aiohttp.ClientSession() as session:
            asyncwith session.post(self.API_URL, headers=self.headers, json=payload) as response:
                # 解析響應內容爲字節流,按行處理 SSE
                asyncfor line in response.content:
                    ifnot line:
                        continue# 跳過空行
                    line = line.decode('utf-8').strip()
                    ifnot line.startswith("data: "):
                        continue# 只處理帶 "data: " 前綴的行

                    # 去除前綴並嘗試解析 JSON
                    json_str = line[6:]
                    try:
                        data = json.loads(json_str)
                    except json.JSONDecodeError:
                        continue# 跳過無法解析的行

                    # 處理 choices 數組,通常只有一個元素
                    if"choices"in data:
                        choice = data["choices"][0]
                        delta = choice.get("delta", {})
                        content = delta.get("content")
                        role = delta.get("role""")

                        # 構造基本的消息部分列表
                        parts = [{"type""text""text": content}]

                        # 判斷是否已完成
                        if choice.get("finish_reason") == "stop":
                            yield {
                                "is_task_complete": True,
                                "parts": parts
                            }
                            continue

                        # 默認子類型爲推理
                        subtype = "analysis"
                        tool_calls = delta.get("tool_calls", [])

                        # 如果是助手角色發言,轉換爲 ack 類型
                        if role == "assistant":
                            subtype = "acknowledge"

                        # 處理工具調用示例(例如 SQL 查詢)
                        if tool_calls:
                            tool_call = tool_calls[0]
                            function = tool_call.get("function", {})
                            function_name = function.get("name")
                            arguments = function.get("arguments", {})
                            if function_name == "sql_db_query":
                                subtype = "execute_query"
                                # 將查詢參數也作爲一個 message part
                                parts.append({
                                    "type""text",
                                    "text": str(arguments)
                                })

                        # 返回中間事件,is_task_complete 爲 False
                        yield {
                            "is_task_complete": False,
                            "parts": parts,
                            "metadata": {"type""reasoning""subtype": subtype}
                        }
                        continue

task manager 的實現:

import json
from typing import AsyncIterable, Union
import logging

# 引入 A2A 協議相關類型
from common.types import (
    SendTaskRequest,
    TaskSendParams,
    Message,
    TaskStatus,
    Artifact,
    TaskStatusUpdateEvent,
    TaskArtifactUpdateEvent,
    TextPart,
    TaskState,
    Task,
    SendTaskResponse,
    InternalError,
    JSONRPCResponse,
    SendTaskStreamingRequest,
    SendTaskStreamingResponse,
)

# 引入任務管理基類(內存實現)
from common.server.task_manager import InMemoryTaskManager
# 引入具體的 MindsDBAgent 實現
from agent import MindsDBAgent
# 通用工具函數,用於檢查內容類型兼容性等
import common.server.utils as utils

logger = logging.getLogger(__name__)

class AgentTaskManager(InMemoryTaskManager):
    """
    A2A 協議任務管理器:
    繼承自 InMemoryTaskManager,處理 tasks/send 與 tasks/sendSubscribe。
    調用 MindsDBAgent 執行實際任務,並將結果按 A2A 事件流式推送。
    """
    def __init__(self, agent: MindsDBAgent):
        super().__init__()
        # 注入具體的代理實例
        self.agent = agent

    asyncdef _stream_generator(
        self, request: SendTaskStreamingRequest
    ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
        """
        實現流式訂閱:
        接收 A2A 客戶端的 tasks/sendSubscribe 請求
        並將代理 stream() 方法的輸出轉換爲 A2A 事件流。
        """
        # 從請求中獲取 Task 參數
        task_send_params: TaskSendParams = request.params
        query = self._get_user_query(task_send_params)
        try:
            # 調用代理的 SSE 流方法
            asyncfor item in self.agent.stream(query, task_send_params.sessionId):
                is_complete = item["is_task_complete"]
                parts = item["parts"]

                ifnot is_complete:
                    # 未完成:狀態爲 WORKING
                    task_state = TaskState.WORKING
                    metadata = item.get("metadata")
                    # 構造 A2A 消息
                    message = Message(role="agent", parts=parts, metadata=metadata)
                    task_status = TaskStatus(state=task_state, message=message)
                    # 更新內存任務狀態
                    await self._update_store(task_send_params.id, task_status, [])
                    # 構造 TaskStatusUpdateEvent
                    update_event = TaskStatusUpdateEvent(
                        id=task_send_params.id,
                        status=task_status,
                        final=False
                    )
                    # 推送給客戶端
                    yield SendTaskStreamingResponse(id=request.id, result=update_event)
                else:
                    # 完成:狀態爲 COMPLETED
                    task_state = TaskState.COMPLETED
                    # 構造 Artifact(僅一段 parts)
                    artifact = Artifact(parts=parts, index=0, append=False)
                    # 首先發送工件更新事件
                    yield SendTaskStreamingResponse(
                        id=request.id,
                        result=TaskArtifactUpdateEvent(
                            id=task_send_params.id,
                            artifact=artifact
                        )
                    )
                    # 更新內存中的任務狀態與工件列表
                    task_status = TaskStatus(state=task_state)
                    await self._update_store(task_send_params.id, task_status, [artifact])
                    # 最終發送狀態完成事件
                    yield SendTaskStreamingResponse(
                        id=request.id,
                        result=TaskStatusUpdateEvent(
                            id=task_send_params.id,
                            status=TaskStatus(state=task_state),
                            final=True
                        )
                    )
        except Exception as e:
            # 異常處理:返回 JSON-RPC 錯誤
            logger.error(f"Streaming error: {e}")
            yield JSONRPCResponse(
                id=request.id,
                error=InternalError(message="Error streaming response")
            )

    def _validate_request(
        self, request: Union[SendTaskRequest, SendTaskStreamingRequest]
    ) -> Optional[JSONRPCResponse]:
        """
        驗證客戶端請求的輸出模式與代理支持的內容類型是否兼容。
        不兼容時返回 JSON-RPC 錯誤響應。
        """
        params: TaskSendParams = request.params
        ifnot utils.are_modalities_compatible(
            params.acceptedOutputModes,
            MindsDBAgent.SUPPORTED_CONTENT_TYPES
        ):
            logger.warning("Incompatible modes: %s", params.acceptedOutputModes)
            return utils.new_incompatible_types_error(request.id)
        returnNone

    asyncdef on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        """
        處理 tasks/send 同步請求,返回最終 Task 或 INPUT_REQUIRED。
        """
        error = self._validate_request(request)
        if error:
            return error
        # 插入或更新任務初始記錄
        await self.upsert_task(request.params)
        # 調用同步 invoke 實現
        returnawait self._invoke(request)

    asyncdef on_send_task_subscribe(
        self, request: SendTaskStreamingRequest
    ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
        """
        處理 tasks/sendSubscribe 流式請求,返回一個事件流。
        """
        error = self._validate_request(request)
        if error:
            return error
        await self.upsert_task(request.params)
        return self._stream_generator(request)

    asyncdef _update_store(
        self, task_id: str, status: TaskStatus, artifacts: list[Artifact]
    ) -> Task:
        """
        更新內存中任務的狀態和工件列表,用於後續查詢與狀態回放。
        """
        asyncwith self.lock:
            task = self.tasks.get(task_id)
            ifnot task:
                logger.error(f"Task {task_id} not found")
                raise ValueError(f"Task {task_id} not found")
            task.status = status
            if artifacts:
                task.artifacts = (task.artifacts or []) + artifacts
            return task

    asyncdef _invoke(self, request: SendTaskRequest) -> SendTaskResponse:
        """
        同步調用代理的 invoke,處理簡單或需要額外輸入的場景。
        """
        params: TaskSendParams = request.params
        query = self._get_user_query(params)
        try:
            result = self.agent.invoke(query, params.sessionId)
        except Exception as e:
            logger.error(f"Invoke error: {e}")
            raise ValueError(f"Error invoking agent: {e}")
        # 將返回內容封裝爲 TextPart
        parts = [{"type""text""text": result}]
        # 根據返回結果判斷是否需要更多輸入
        state = TaskState.INPUT_REQUIRED if"MISSING_INFO:"in result else TaskState.COMPLETED
        # 更新存儲並返回完整 Task
        task = await self._update_store(
            params.id,
            TaskStatus(state=state, message=Message(role="agent", parts=parts)),
            [Artifact(parts=parts)]
        )
        return SendTaskResponse(id=request.id, result=task)

    def _get_user_query(self, params: TaskSendParams) -> str:
        """
        從 TaskSendParams.message.parts 中提取用戶文本查詢,目前僅支持 TextPart。
        """
        part = params.message.parts[0]
        ifnot isinstance(part, TextPart):
            raise ValueError("Only text parts are supported")
        return part.text

REF

https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/ https://google.github.io/A2A/#a2a-unlock-collaborative-agent-to-agent-scenarios-with-a-new-open-protocol https://github.com/google/A2A https://github.com/google/A2A/blob/main/specification/json/a2a.json https://github.com/google/A2A/blob/main/samples/python/agents/mindsdb/README.md

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Gnye7-0fTX56Bf6fqbAc0Q