MCP 超強源碼解讀!Streamable HTTP 如何實現服務端向客戶端通信
在最新的 Model Context Protocol(MCP,模型上下文協議)版本(2025-03-26)[1] 中引入了 Streamable HTTP 的通信方式,取代了舊版本中的 SSE 通信方式,成爲了新的遠程 MCP 調用標準。
Streamable HTTP 通信下的 client 向 server 的請求不需要像之前必須保持 SSE 的長連接,而是通過 client 發起 HTTP 請求,server 通過 SSE 或者 JSON 進行響應。Streamable HTTP 優化了 MCP server 的橫向擴展能力,更符合當今雲原生的服務端環境。
然而在 MCP 規範中,聲明瞭 server 向 client 發送通知(notification)和請求(request)的能力。想要實現 server 向 client 的通信,就依然需要單獨維護一個 SSE 連接。本文旨在通過 MCP 官方 TypeScript SDK[2] 源碼解讀,說明 Streamable HTTP 通信方式下,MCP server 向 client 的通信具體實現。
Streamable HTTP 簡介
Streamable HTTP 通信方式下的 MCP server 只需要暴露一個同時支持 POST/GET 的端點(之前 SSE 需要兩個端點):
- MCP client 向 server 發送消息時,必須使用 POST 方法,server 根據消息類型進行 SSE 或 JSON 響應;
- MCP client 向 server 發送 GET 請求時,server 會初始化一個單獨的 SSE 連接用於 server 向 client 發送請求消息與通知消息。
Server 向 Client 通信
初始化 SSE
根據 MCP 規範中對 Streamable HTTP 的描述可知,server 向 client 通信的 SSE 連接需要 client 向 server 的端點發送 GET 請求。在實現中,什麼時候去做這個 GET 請求進行 SSE 初始化呢?
根據 MCP 規範可知,MCP 的生命週期 [3](lifecycle)包括初始化(initialization)、常規交互(operation)和終止(Shutdown)三階段。其中初始化階段有 “三次握手”:
-
第一次握手:Client 向 server 發送 initialize request;
-
第二次握手:Server 向 client 響應 initialize response;
-
第三次握手:Client 向 server 發送 initialized notification,通知 server 初始化完成。
SSE 的初始化就是發生在第三次握手之後。
客戶端
TypeScript SDK 源碼 src/client/streamableHttp.ts
中的 send
方法是 MCP client 所有消息的出口方法,在這個方法中可以看到 SSE 的初始化實現:
// 向 server 發送 HTTP POST 請求
const init = {
...this._requestInit,
method: "POST",
headers,
body: JSON.stringify(message),
signal: this._abortController?.signal,
};
const response = await fetch(this._url, init);
// ...
if (response.status === 202) {
// message 只有是 response 或 notification 的時候 server 纔會響應 202 Accepted
if (isInitializedNotification(message)) {
// 如果 message 是 initialized notification,則進行 SSE 初始化
this._startOrAuthSse({ resumptionToken: undefined }).catch(err =>this.onerror?.(err));
}
return;
}
MCP client 這段代碼發生在 HTTP 消息發送後,client 判斷 server 是否成功響應了初始化完成通知(第三次握手),如果符合這個條件則進行 SSE 的初始化,實現了在初始化階段結束後就立刻進行用於 server 向 client 通信的 SSE 的初始化。
在 _startOrAuthSse
方法中,我們可以看到 client 會向 server 發送 GET 請求以初始化 SSE 連接,並對後續的 SSE stream 進行監聽處理:
private async _startOrAuthSse(options: StartSSEOptions): Promise<void> {
try {
// 向 server 發送 GET 請求以初始化 SSE 連接,監聽 server 向 client 發送的消息
const headers = awaitthis._commonHeaders();
headers.set("Accept", "text/event-stream");
const response = await fetch(this._url, {
method: "GET",
headers,
signal: this._abortController?.signal,
});
if (!response.ok) {
// ... 對 HTTP status 爲 401 Unauthorized 和 405 Method Not Allowed 分別進行處理(略)
// 對其它 HTTP status (例如 500 Internal Error)則拋出異常(記住這一點有助於下文重試邏輯的理解)
throw new StreamableHTTPError(
response.status,
`Failed to open SSE stream: ${response.statusText}`,
);
}
// 處理後續 SSE stream
this._handleSseStream(response.body, options);
} catch (error) {
this.onerror?.(error as Error);
throw error;
}
}
服務端
那麼 MCP server 是如何處理 client SSE 初始化請求的?MCP server 需要對 GET 請求進行處理(假設 server 端點是 /mcp
):
const app = express();
app.use(express.json());
app.get('/mcp', async (req: Request, res: Response) => {
const server = const server = new McpServer({
name: 'minimal-mcp-server',
version: '1.0.0',
});
const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({});
await transport.connect(server);
await tansport.handleRequest(req, res, req.body);
});
MCP server 通過 tansport.handleRequest
處理 client GET 請求,該方法位於 SDK 源碼的 src/server/streamableHttp.ts
路徑下。該方法會根據請求方法進行對應的處理,對於 GET 請求,實際由 handleGetRequest
方法進行具體處理:
async handleRequest(req: IncomingMessage, res: ServerResponse, parsedBody?: unknown): Promise<void> {
if (req.method === "POST") {
await this.handlePostRequest(req, res, parsedBody);
} elseif (req.method === "GET") {
await this.handleGetRequest(req, res);
} elseif (req.method === "DELETE") {
await this.handleDeleteRequest(req, res);
} else {
await this.handleUnsupportedRequest(res);
}
}
在 handleGetRequest
方法中,主要做的事情就是檢查 client 的 Accept header,並向 client 返回 SSE headers:
private async handleGetRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
// Client 請求中的 Accept header 必須包含 text/event-stream
const acceptHeader = req.headers.accept;
if (!acceptHeader?.includes("text/event-stream")) {
res.writeHead(406).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Not Acceptable: Client must accept text/event-stream"
},
id: null
}));
return;
}
// 實現可恢復性(resumability),重放 last-event-id 之後的消息
if (this._eventStore) {
const lastEventId = req.headers['last-event-id'] asstring | undefined;
if (lastEventId) {
await this.replayEvents(lastEventId, res);
return;
}
}
// 向 client 返回 headers,調用 flushHeaders 讓 client 不阻塞等待 HTTP 請求
const headers: Record<string, string> = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
};
res.writeHead(200, headers).flushHeaders();
// 內部 SSE map 中存儲當前 SSE response,key 爲 _GET_stream
this._streamMapping.set(this._standaloneSseStreamId, res);
res.on("close", () => {
this._streamMapping.delete(this._standaloneSseStreamId);
});
}
SSE Stream 處理
客戶端
MCP client 的 _startOrAuthSse
方法中,初始化了 SSE 連接後,就通過 _handleSseStream
方法去處理後續的 SSE stream。
_handleSseStream
方法核心邏輯都在 processStream
函數中,processStream
異步執行,代碼寫得還是比較清晰的:
private _handleSseStream(stream: ReadableStream<Uint8Array> | null, options: StartSSEOptions): void {
const { onresumptiontoken } = options;
let lastEventId: string | undefined;
const processStream = async () => {
try {
// 構建 Pipeline: binary stream -> text decoder -> SSE parser,並拿到 reader
const reader = stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.getReader();
// 循環讀取並處理接收到的 SSE 消息
while (true) {
const { value: event, done } = await reader.read();
if (done) {
break;
}
// 如果 server 帶了 event id,則說明需要實現可恢復性,則 client 也需更新 lastEventId
if (event.id) {
lastEventId = event.id;
onresumptiontoken?.(event.id);
}
// server 推送的 SSE 消息可以不帶 event 類型,但如果帶了,類型必須是 message,否則忽略
if (!event.event || event.event === "message") {
try {
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
this.onmessage?.(message);
} catch (error) {
this.onerror?.(error as Error);
}
}
}
} catch (error) {
// 處理 SSE streaming 過程中遇到的錯誤,一般是網絡原因,或者服務端重新部署等
this.onerror?.(new Error(`SSE stream disconnected: ${error}`));
// 如果 client 沒有 abort,嘗試進行重連
if (this._abortController && !this._abortController.signal.aborted) {
if (lastEventId !== undefined) {
try {
this._scheduleReconnection({
resumptionToken: lastEventId,
onresumptiontoken
}, 0);
}
catch (error) {
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
}
}
}
}
};
// 不加 await 調用 processStream 這個 async 函數,使得它異步執行,不阻塞
// _handleSseStream 方法的執行
processStream();
}
這裏對 client 監聽 server SSE 時的重試策略進行展開說明。重試策略的核心邏輯在 _scheduleReconnection
方法當中,client 會重試執行 _startOrAuthSse
方法進行 SSE 重新初始化與處理,採用了指數退避(exponential backoff)的重試算法:
// 入參 options 爲再次執行 _startOrAuthSse 方法所需的參數,需要帶上 lastEventId 以實現 resumability,重放 lastEventId 之後的 SSE event
// 入參 attempCount 爲當前已經重試的次數,默認 0
private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0): void {
// 最大重試次數,默認爲 2,超過後就拋錯,走 onerror 回調
const maxRetries = this._reconnectionOptions.maxRetries;
if (maxRetries > 0 && attemptCount >= maxRetries) {
this.onerror?.(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`));
return;
}
// 指數退避算法,基於當前已重試的次數計算下次重連的間隔時間(毫秒)
const delay = this._getNextReconnectionDelay(attemptCount);
// 經過 delay 時長後,進行重連,依然是 _startOrAuthSse 方法,透傳 options
setTimeout(() => {
this._startOrAuthSse(options).catch(error => {
// 如果重試失敗,捕獲 error,並遞歸調用本方法,將已重試次數 +1
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
this._scheduleReconnection(options, attemptCount + 1);
});
}, delay);
}
這裏值得說明的一點是,只有 _startOrAuthSse
初始化 SSE 的 GET 請求失敗,即得到 server 非 401 和 405 的響應纔會拋出異常並命中這裏的 catch
,進而導致重試次數加一;而一旦當 SSE 初始化成功,進入 _handleSseStream
的流程後,如果再次發生了重連,則重試次數從 0 開始計數。這裏大家可以再結合前面 “初始化 SSE” 章節的內容和源碼再仔細梳理下,涉及到遞歸調用的代碼總是會有一些理解難度的。
補習下指數退避重試算法:
計算重試間隔
服務端
當 MCP server 需要向 client 發送 notification 或 request 類消息的時候,就會向這個 SSE 連接中推送 event 類型爲 message 的消息。
不管是 notification 還是 request,最終都是通過src/server/streamableHttp.ts
路徑下的 send
方法作爲出口。
MCP server 會判斷消息類型是否爲 notification 和 reqeust(判斷方法見代碼註釋),如果是的話就往直前建立好的 SSE 連接中推送消息:
async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
let requestId = options?.relatedRequestId;
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
// 假如消息 message 類型是 response,則給 requestId 賦值,而 notification 或 request 不用
requestId = message.id;
}
// 如果是 notification 或者 request 則 requestId 等於 undefined,那麼就通過之前建立好的 SSE 連接推送消息。
if (requestId === undefined) {
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
throw newError("Cannot send a response on a standalone SSE stream unless resuming a previous client request");
}
const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId)
if (standaloneSse === undefined) {
return;
}
// 若配置了 eventStore 則需要實現可恢復性(resumability),記錄當前 message 內容用於後續重放恢復
let eventId: string | undefined;
if (this._eventStore) {
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
}
// 推送 SSE 消息,如果 eventId 不爲空則帶上。
this.writeSSEEvent(standaloneSse, message, eventId);
return;
}
// ... 對 response 的處理邏輯,不在本文討論範圍內(略)
}
推送消息的具體實現在 writeSSEEvent
方法中:
private writeSSEEvent(res: ServerResponse, message: JSONRPCMessage, eventId?: string): boolean {
let eventData = `event: message\n`;
// Include event ID if provided - this is important for resumability
if (eventId) {
eventData += `id: ${eventId}\n`;
}
eventData += `data: ${JSON.stringify(message)}\n\n`;
return res.write(eventData);
}
MCP server 向 SSE 連接推送 event 類型爲 message 的消息,data 部分就是具體的 notification 或 request 內容。若配置了 eventStore 以實現可恢復性,則還會在 SSE 消息中加入 id 字段。
總結
本文通過對 MCP 官方 TypeScript SDK 源碼解讀,說明了 Streamable HTTP 通信方式下,MCP server 向 client 發送通知(notification)和響應(response)類型消息的具體實現,分別從 MCP 客戶端和服務端的角度介紹了 SSE 的初始化和 SSE stream 處理的具體實現。
下面這張圖總結了本文講到的源碼函數調用關係,讀者們可以根據文章內容,結合這張圖進行更深刻的理解:
本文的源碼分析中頻繁提到了可恢復性(resumability)這一概念。在 MCP 的規範中,關於這一特性的章節叫做 Resumability and Redelivery[4],規範了 MCP client 和 server 如何交互,以實現重新建立連接時,過往消息的重放,提高了 MCP 的可靠性。如果大家對 MCP resumability 的概念和具體源碼實現感興趣,請點贊、關注、分享,儘快安排一篇詳細的源碼分析文章!
參考資料
[1]
Model Context Protocol(MCP,模型上下文協議)版本(2025-03-26): https://modelcontextprotocol.io/specification/2025-03-26
[2]
MCP 官方 TypeScript SDK: https://github.com/modelcontextprotocol/typescript-sdk
[3]
MCP 的生命週期: https://modelcontextprotocol.io/specification/2025-03-26/basic/lifecycle
[4]
Resumability and Redelivery: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#resumability-and-redelivery
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/SYiNKstQkGaHqcobG98O2w