谷歌 Agent2Agent 協議原理解讀
谷歌在 25 年 4 月初發布了 A2A 協議,作爲 MCP 協議的補充。Agent2Agent 協議致力於促進獨立 agent 間的通信,幫助不同生態系統的 agent 溝通和協作。
核心概念
- Agent Card:一個公共元數據文件(通常位於
/.well-known/agent.json
),用於描述 Agent 的能力、技能、端點 URL 以及認證要求。客戶端通過它來發現 Agent。 Agent Card 通常包括 agent 的技能說明、供應商、身份驗證、鏈接、名稱等:
// An AgentCard conveys key information:
// - Overall details (version, name, description, uses)
// - Skills: A set of capabilities the agent can perform
// - Default modalities/content types supported by the agent.
// - Authentication requirements
interface AgentCard {
// Human readable name of the agent.
// (e.g. "Recipe Agent")
name: string;
// A human-readable description of the agent. Used to assist users and
// other agents in understanding what the agent can do.
// (e.g. "Agent that helps users with recipes and cooking.")
description: string;
// A URL to the address the agent is hosted at.
url: string;
// The service provider of the agent
provider?: {
organization: string;
url: string;
};
// The version of the agent - format is up to the provider. (e.g. "1.0.0")
version: string;
// A URL to documentation for the agent.
documentationUrl?: string;
// Optional capabilities supported by the agent.
capabilities: {
streaming?: boolean; // true if the agent supports SSE
pushNotifications?: boolean; // true if the agent can notify updates to client
stateTransitionHistory?: boolean; //true if the agent exposes status change history for tasks
};
// Authentication requirements for the agent.
// Intended to match OpenAPI authentication structure.
authentication: {
schemes: string[]; // e.g. Basic, Bearer
credentials?: string; //credentials a client should use for private cards
};
// The set of interaction modes that the agent
// supports across all skills. This can be overridden per-skill.
defaultInputModes: string[]; // supported mime types for input
defaultOutputModes: string[]; // supported mime types for output
// Skills are a unit of capability that an agent can perform.
skills: {
id: string; // unique identifier for the agent's skill
name: string; //human readable name of the skill
// description of the skill - will be used by the client or a human
// as a hint to understand what the skill does.
description: string;
// Set of tag words describing classes of capabilities for this specific
// skill (e.g. "cooking", "customer support", "billing")
tags: string[];
// The set of example scenarios that the skill can perform.
// Will be used by the client as a hint to understand how the skill can be
// used. (e.g. "I need a recipe for bread")
examples?: string[]; // example prompts for tasks
// The set of interaction modes that the skill supports
// (if different than the default)
inputModes?: string[]; // supported mime types for input
outputModes?: string[]; // supported mime types for output
}[];
}
-
A2A Server:一個實現了 A2A 協議方法(在 JSON 規範中定義)的 HTTP 端點,由 Agent 公開,用於接收請求並管理任務執行。
-
A2A Client:消費 A2A 服務的應用程序或其他代理。它向 A2A Server 的 URL 發送請求(如
tasks/send
)。 -
Task(任務):工作的核心單元。客戶端通過發送消息(
tasks/send
或tasks/sendSubscribe
)來發起任務。任務擁有唯一 ID,並依次經歷不同狀態(submitted
、working
、input-required
、completed
、failed
、canceled
)。 -
Message(消息):表示客戶端(
role: "user"
)與 Agent(role: "agent"
)之間的交互輪次。消息由多個 Part 組成。 -
Part(消息部分):消息或工件中的基本內容單元,可爲
TextPart
、FilePart
(內聯字節或 URI)或DataPart
(結構化 JSON,例如表單)。 -
Artifact(工件):代理在任務執行過程中生成的輸出,例如生成的文件或最終結構化數據。工件同樣由多個 Part 組成。
-
Streaming(流式傳輸):對於長時間運行的任務,支持流式功能的服務器可使用
tasks/sendSubscribe
。客戶端會接收包含TaskStatusUpdateEvent
或TaskArtifactUpdateEvent
的 Server-Sent Events(SSE),以獲取實時進度。 -
Push Notifications(推送通知):支持推送通知的服務器可以主動將任務更新發送到客戶端提供的 webhook URL,該 URL 可通過
tasks/pushNotification/set
進行配置。
典型工作流程
-
發現:客戶端從服務器的 well‑known URL(通常是
https://DOMAIN/.well-known/agent.json
)獲取 Agent Card。 -
發起:客戶端發送包含初始用戶消息和唯一任務 ID 的
tasks/send
或tasks/sendSubscribe
請求。 -
處理:
-
流式:服務器在任務進展過程中,通過 Server‑Sent Events(SSE)發送狀態更新或 Artifact 更新事件。
-
非流式:服務器同步處理任務,並在 HTTP 響應中返回最終的 Task 對象。
-
交互(可選):如果任務進入
input-required
狀態,客戶端可使用相同的任務 ID,通過tasks/send
或tasks/sendSubscribe
發送後續消息以提供所需輸入。 -
完成:任務最終達到終止狀態之一:
completed
、failed
或canceled
。
Agent 發現
發現 Agent Card
🌐 開放發現(Open Discovery)
A2A 協議建議企業將 Agent Card 託管在一個約定路徑上,例如:**https://DOMAIN/.well-known/agent.json
**客戶端通過 DNS 解析域名,向該路徑發送 GET 請求,即可獲取 Agent Card。 這允許網頁爬蟲和應用程序輕鬆發現已知或配置的代理,實質上將 “發現 Agent” 簡化爲“找到域名”。
📚 篩選式發現(Curated Discovery,基於註冊表)
企業可能通過目錄界面提供代理註冊表,供公司或團隊內部使用。這種方式便於由管理員進行管理與篩選。 谷歌官方正在考慮將註冊表支持加入協議標準。
🔐 私有發現(Private Discovery,基於 API)
某些 Agent 可能託管於私有 “Agent 商店” 或通過自定義 API 交換 Agent Card。 A2A 協議目前不打算將私有發現 API 納入標準。
保護 Agent Card(Securing Agent Cards)
Agent Card 可能包含敏感信息,實現方可根據需要爲其設置身份驗證和訪問控制。 例如,即使是放在 .well-known
路徑上的 Agent Card,也可以通過 mTLS(雙向 TLS) 進行限制,僅允許特定客戶端訪問。註冊表和私有 API 更應強制認證,並根據不同身份返回不同內容。
⚠️ 注意:Agent Card 中可能會包含認證信息(如 API Key),強烈建議此類內容不得在未驗證身份的情況下公開訪問。
企業級就緒性 (Enterprise Readiness)
A2A 協議旨在支持企業級需求,與現有企業基礎設施無縫集成。它將 Agent 看作標準的基於 HTTP 的企業應用,因此依賴現有的認證、安全、隱私、追蹤與監控機制。
傳輸層安全(Transport Level Security)
A2A 基於 HTTP,生產環境必須使用 HTTPS,並啓用現代 TLS 加密套件。
服務器身份驗證(Server Identity)
服務器應使用由受信任證書頒發機構簽發的數字證書,客戶端需驗證證書以確認身份。
客戶端和用戶身份(Client and User Identity)
A2A 協議本身不定義客戶端或用戶標識,而是通過 Agent Card 指明所需的認證方式。客戶端需通過外部認證機制(如 OAuth)獲取憑證,並通過 HTTP 頭傳輸。
多身份聯合登錄仍是開放議題,如某任務需同時訪問 A 系統和 B 系統,用戶需同時提供兩個系統的認證憑據。
客戶端認證(Authenticating Clients)
A2A 服務器應在 Agent Card 中公佈其支持的認證協議,如 API Key、OAuth、OIDC 等,並要求客戶端每次請求都進行認證。認證失敗將返回標準 HTTP 401 或 403 錯誤碼。
授權與數據隱私(Authorization and Data Privacy)
A2A 建議基於 技能(skills) 和 工具(tools) 兩個維度進行授權管理:
-
技能授權:如通過 OAuth 範圍控制訪問特定技能;
-
工具授權:如通過 API 管理工具限制訪問敏感操作或數據。
可觀測性與追蹤(Tracing and Observability)
A2A 基於 HTTP,因此可直接使用企業現有的日誌、追蹤與監控工具(如 OpenTracing)。客戶端和服務器應插入必要的追蹤頭,並記錄日誌與事件。
與 MCP 協議的互補關係
構建 agent 應用需要同時使用 A2A 和 MCP 協議。agent 通過 MCP 連接工具,通過 A2A 協議連接其他 agent。
-
MCP(Model Context Protocol) 是正在興起的標準,用於連接大模型與工具、數據等資源,正在統一 “函數調用” 接口,降低了工具接入的複雜度,已被多個框架和平臺採納。
-
A2A 則聚焦另一個層面 —— 它是一個應用層協議,允許 agent 之間以 “agent” 的方式進行自然交流(而非簡單工具調用)。谷歌希望 A2A 能與 MCP 形成互補,共同推動智能代理生態的發展。
MCP 協議用來連接 agent 與結構化工具(例如:“升高平臺 2 米”、“扳手右轉 4 毫米”)。
A2A 協議用來讓最終用戶或其他代理與員工交流(例如:“我的車有異響”)。A2A 支持多輪對話和任務計劃的演進(如:“拍張左車輪的照片”、“我看到有液體泄漏,這種情況多久了?”),也讓修理工可以與其他代理(如零件供應商)協作。
案例實戰:基於 A2A 協議實現 MindsDB 企業數據 agent
通過 A2A 協議實現自然語言查詢和跨多個企業數據源的數據分析,基座模型使用 Gemini 2.5 Flash 模型,MindsDB 提供文本到 SQL 的能力,A2A 協議作爲通信協議,實現智能體之間的協作和任務分發。
具體流程爲:
自然語言輸入:用戶通過智能體輸入自然語言查詢,例如:“查詢過去三個月的銷售數據”。
任務解析與分發:智能體將用戶的查詢解析爲任務,並通過 A2A 協議將任務分發給適當的執行代理。
數據查詢與分析:執行代理接收到任務後,使用 MindsDB 的文本到 SQL(Text-to-SQL)技能,將自然語言查詢轉換爲 SQL 查詢,並在多個數據源上執行這些查詢。
結果整合與反饋:執行代理將查詢結果整合,並通過 A2A 協議將結果反饋給用戶。
import json
import aiohttp
import os
from typing import Any, AsyncIterable, Dict, Optional
from dotenv import load_dotenv
# 加載 .env 文件中的環境變量
load_dotenv()
class MindsDBAgent:
"""An agent that data requests from any database, datawarehouse, app."""
# 支持的內容類型列表,僅文本
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
# MindsDB API 的基礎 URL
API_URL = "https://ai.staging.mindsdb.com/chat/completions"
def __init__(self):
# 從環境變量獲取 MindsDB API Key
self.api_key = os.getenv('MINDS_API_KEY')
ifnot self.api_key:
raise ValueError("MINDS_API_KEY environment variable is not set")
# 從環境變量獲取默認的 Mind 名稱
self.model = os.getenv('MIND_NAME')
ifnot self.model:
# 如果未設置,則使用示例中默認的 Mind
self.model = "Sales_Data_Expert_Demo_Mind"
# HTTP 請求頭,包含認證信息
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
def invoke(self, query, session_id) -> str:
# 同步調用接口占位,提示使用流式獲取真實結果
return {"content": "Use stream method to get the results!"}
asyncdef stream(self, query, session_id) -> AsyncIterable[Dict[str, Any]]:
"""
流式方法:針對長時任務返回 SSE 流,每次 yield 一個消息段或完成事件
"""
# 構造請求的 JSON 負載,包含 system 和 user 消息,以及 stream 標誌
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": query}
],
"stream": True
}
# 使用 aiohttp 發送異步 HTTP POST 請求
asyncwith aiohttp.ClientSession() as session:
asyncwith session.post(self.API_URL, headers=self.headers, json=payload) as response:
# 解析響應內容爲字節流,按行處理 SSE
asyncfor line in response.content:
ifnot line:
continue# 跳過空行
line = line.decode('utf-8').strip()
ifnot line.startswith("data: "):
continue# 只處理帶 "data: " 前綴的行
# 去除前綴並嘗試解析 JSON
json_str = line[6:]
try:
data = json.loads(json_str)
except json.JSONDecodeError:
continue# 跳過無法解析的行
# 處理 choices 數組,通常只有一個元素
if"choices"in data:
choice = data["choices"][0]
delta = choice.get("delta", {})
content = delta.get("content")
role = delta.get("role", "")
# 構造基本的消息部分列表
parts = [{"type": "text", "text": content}]
# 判斷是否已完成
if choice.get("finish_reason") == "stop":
yield {
"is_task_complete": True,
"parts": parts
}
continue
# 默認子類型爲推理
subtype = "analysis"
tool_calls = delta.get("tool_calls", [])
# 如果是助手角色發言,轉換爲 ack 類型
if role == "assistant":
subtype = "acknowledge"
# 處理工具調用示例(例如 SQL 查詢)
if tool_calls:
tool_call = tool_calls[0]
function = tool_call.get("function", {})
function_name = function.get("name")
arguments = function.get("arguments", {})
if function_name == "sql_db_query":
subtype = "execute_query"
# 將查詢參數也作爲一個 message part
parts.append({
"type": "text",
"text": str(arguments)
})
# 返回中間事件,is_task_complete 爲 False
yield {
"is_task_complete": False,
"parts": parts,
"metadata": {"type": "reasoning", "subtype": subtype}
}
continue
task manager 的實現:
import json
from typing import AsyncIterable, Union
import logging
# 引入 A2A 協議相關類型
from common.types import (
SendTaskRequest,
TaskSendParams,
Message,
TaskStatus,
Artifact,
TaskStatusUpdateEvent,
TaskArtifactUpdateEvent,
TextPart,
TaskState,
Task,
SendTaskResponse,
InternalError,
JSONRPCResponse,
SendTaskStreamingRequest,
SendTaskStreamingResponse,
)
# 引入任務管理基類(內存實現)
from common.server.task_manager import InMemoryTaskManager
# 引入具體的 MindsDBAgent 實現
from agent import MindsDBAgent
# 通用工具函數,用於檢查內容類型兼容性等
import common.server.utils as utils
logger = logging.getLogger(__name__)
class AgentTaskManager(InMemoryTaskManager):
"""
A2A 協議任務管理器:
繼承自 InMemoryTaskManager,處理 tasks/send 與 tasks/sendSubscribe。
調用 MindsDBAgent 執行實際任務,並將結果按 A2A 事件流式推送。
"""
def __init__(self, agent: MindsDBAgent):
super().__init__()
# 注入具體的代理實例
self.agent = agent
asyncdef _stream_generator(
self, request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
"""
實現流式訂閱:
接收 A2A 客戶端的 tasks/sendSubscribe 請求
並將代理 stream() 方法的輸出轉換爲 A2A 事件流。
"""
# 從請求中獲取 Task 參數
task_send_params: TaskSendParams = request.params
query = self._get_user_query(task_send_params)
try:
# 調用代理的 SSE 流方法
asyncfor item in self.agent.stream(query, task_send_params.sessionId):
is_complete = item["is_task_complete"]
parts = item["parts"]
ifnot is_complete:
# 未完成:狀態爲 WORKING
task_state = TaskState.WORKING
metadata = item.get("metadata")
# 構造 A2A 消息
message = Message(role="agent", parts=parts, metadata=metadata)
task_status = TaskStatus(state=task_state, message=message)
# 更新內存任務狀態
await self._update_store(task_send_params.id, task_status, [])
# 構造 TaskStatusUpdateEvent
update_event = TaskStatusUpdateEvent(
id=task_send_params.id,
status=task_status,
final=False
)
# 推送給客戶端
yield SendTaskStreamingResponse(id=request.id, result=update_event)
else:
# 完成:狀態爲 COMPLETED
task_state = TaskState.COMPLETED
# 構造 Artifact(僅一段 parts)
artifact = Artifact(parts=parts, index=0, append=False)
# 首先發送工件更新事件
yield SendTaskStreamingResponse(
id=request.id,
result=TaskArtifactUpdateEvent(
id=task_send_params.id,
artifact=artifact
)
)
# 更新內存中的任務狀態與工件列表
task_status = TaskStatus(state=task_state)
await self._update_store(task_send_params.id, task_status, [artifact])
# 最終發送狀態完成事件
yield SendTaskStreamingResponse(
id=request.id,
result=TaskStatusUpdateEvent(
id=task_send_params.id,
status=TaskStatus(state=task_state),
final=True
)
)
except Exception as e:
# 異常處理:返回 JSON-RPC 錯誤
logger.error(f"Streaming error: {e}")
yield JSONRPCResponse(
id=request.id,
error=InternalError(message="Error streaming response")
)
def _validate_request(
self, request: Union[SendTaskRequest, SendTaskStreamingRequest]
) -> Optional[JSONRPCResponse]:
"""
驗證客戶端請求的輸出模式與代理支持的內容類型是否兼容。
不兼容時返回 JSON-RPC 錯誤響應。
"""
params: TaskSendParams = request.params
ifnot utils.are_modalities_compatible(
params.acceptedOutputModes,
MindsDBAgent.SUPPORTED_CONTENT_TYPES
):
logger.warning("Incompatible modes: %s", params.acceptedOutputModes)
return utils.new_incompatible_types_error(request.id)
returnNone
asyncdef on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
"""
處理 tasks/send 同步請求,返回最終 Task 或 INPUT_REQUIRED。
"""
error = self._validate_request(request)
if error:
return error
# 插入或更新任務初始記錄
await self.upsert_task(request.params)
# 調用同步 invoke 實現
returnawait self._invoke(request)
asyncdef on_send_task_subscribe(
self, request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
"""
處理 tasks/sendSubscribe 流式請求,返回一個事件流。
"""
error = self._validate_request(request)
if error:
return error
await self.upsert_task(request.params)
return self._stream_generator(request)
asyncdef _update_store(
self, task_id: str, status: TaskStatus, artifacts: list[Artifact]
) -> Task:
"""
更新內存中任務的狀態和工件列表,用於後續查詢與狀態回放。
"""
asyncwith self.lock:
task = self.tasks.get(task_id)
ifnot task:
logger.error(f"Task {task_id} not found")
raise ValueError(f"Task {task_id} not found")
task.status = status
if artifacts:
task.artifacts = (task.artifacts or []) + artifacts
return task
asyncdef _invoke(self, request: SendTaskRequest) -> SendTaskResponse:
"""
同步調用代理的 invoke,處理簡單或需要額外輸入的場景。
"""
params: TaskSendParams = request.params
query = self._get_user_query(params)
try:
result = self.agent.invoke(query, params.sessionId)
except Exception as e:
logger.error(f"Invoke error: {e}")
raise ValueError(f"Error invoking agent: {e}")
# 將返回內容封裝爲 TextPart
parts = [{"type": "text", "text": result}]
# 根據返回結果判斷是否需要更多輸入
state = TaskState.INPUT_REQUIRED if"MISSING_INFO:"in result else TaskState.COMPLETED
# 更新存儲並返回完整 Task
task = await self._update_store(
params.id,
TaskStatus(state=state, message=Message(role="agent", parts=parts)),
[Artifact(parts=parts)]
)
return SendTaskResponse(id=request.id, result=task)
def _get_user_query(self, params: TaskSendParams) -> str:
"""
從 TaskSendParams.message.parts 中提取用戶文本查詢,目前僅支持 TextPart。
"""
part = params.message.parts[0]
ifnot isinstance(part, TextPart):
raise ValueError("Only text parts are supported")
return part.text
REF
https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/ https://google.github.io/A2A/#a2a-unlock-collaborative-agent-to-agent-scenarios-with-a-new-open-protocol https://github.com/google/A2A https://github.com/google/A2A/blob/main/specification/json/a2a.json https://github.com/google/A2A/blob/main/samples/python/agents/mindsdb/README.md
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Gnye7-0fTX56Bf6fqbAc0Q