以 StreamableHTTP 爲例,對 MCP 進行總結與實踐
引言
近日,MCP 迎來更新,StreamableHTTP 從 3 月 26 日協議發佈,到這禮拜進行落地,從 python 的 sdk 上來看,更新的代碼量並不大,但畢竟是一個協議,從 github 上的幾百條 discussion 中,可以看到還是有非常多褒貶不一的聲音:
本篇就以更新後的 StreamableHTTP,做了一個實踐測試。
從 stdio 到 StreamableHTTP
最初,標準輸入輸出 (stdio
) 爲本地進程間通信(Inter-Process Communication, IPC)提供了基礎,以其簡單性和直接性服務於早期計算和命令行工具。在模型上下文協議(MCP)的背景下,stdio 是支持本地集成的傳輸機制之一。然而,stdio
固有的侷限性——僅限本地、單客戶端、缺乏網絡能力,使其無法滿足現代 Web 應用和分佈式企業系統的需求。
爲了突破本地限制,實現 MCP 網絡化的實時更新,服務器發送事件 (SSE
) 應運而生。它作爲一種基於 HTTP 的服務器推送技術,允許服務器向客戶端單向推送實時數據,顯著改善了需要即時信息的應用體驗。儘管 SSE
解決了實時性的問題,但其單向通信、瀏覽器連接數限制以及在複雜交互和企業級 AI 集成場景下的不足(如雙向通信、連接管理和可擴展性挑戰)逐漸顯現,這些限制促使業界尋求更爲強大和靈活的解決方案。
爲應對 SSE
在模型上下文協議(MCP)等複雜 AI 集成場景中的侷限性,並滿足企業對更高效、靈活和可擴展傳輸機制的需求,StreamableHTTP
登上了舞臺。它旨在通過統一端點、靈活傳輸模式和增強的會話管理,克服先前協議的不足,爲 AI 應用與外部系統的高效、可靠通信提供更完善的解決方案。
它的具體協議標準爲:
下面按 客戶端首次啓動 ➜ 成功連到服務器 ➜ 等待用戶提問 這一完整過程,把 MCP Streamable HTTP 的請求–響應順序用時間線列出來:
- 啓動時: 3 步握手(無用戶輸入)
- 當用戶第一次提問時(模型判斷要用工具,比如說天氣)
從個人開發到企業應用
我們來想一個用戶場景,當使用 MCP 協議時,我們希望 Server 端能記住我們的身份,以便於做一些個性化服務,那最簡單的方式是直接將身份寫入 prompt 中,比如幫我查下 submarineas 的分數是多少,那 LLM 就能調 tool 從數據庫等地方拿到數據,但這會導致我能拿到任何人的分數,結果就是身份被公開化,很顯然是有問題的。所以在早期設計中,標準輸入輸出(stdio)利用環境變量構建了客戶端與服務端的通信模型,定義如下 JSON Schema 規範請求結構:
{
"mcpServers": {
"weather": {
"command": "uv",
"args": [
"--directory",
"/ABSOLUTE/PATH/TO/PARENT/FOLDER/weather",
"run",
"weather.py"
],
"env":{
"username":"submarineas"
}
}
}
}
該方案解決了單客戶端請求訪問隔離數據的 bug,所以這也是目前 cline、claude desktop 等工具使用的方式,通過單進程的 local MCP server 來完成軟件調用:
但對於存在併發的多客戶端,這時候通過進程級別的環境變量就無法做到安全隔離機制(PS:如在異步操作延遲迴調),當然,可能有人會說,既然單進程無法及時將第一次請求的數據清理再做第二次,那我來一個 client,開一個 stdio 通信進程不就好了,我覺得問題不大,我甚至還能開個進程池動態 fork,只要資源是無限的。
那資源有限的情況下,很自然的我們轉向服務器發送事件(SSE)技術,通過 "請求頭承載用戶信息 + 線程級隔離" 重構架構,即 HTTP + SSE 方案,將環境變量轉化爲線程局部存儲(隔離),從本地 io 擴展到網絡 io,客戶端使用 EventSource API 發起與服務器 SSE 端點的 HTTP 連接,服務器保持此連接打開,並以事件的形式將數據流式傳輸到客戶端:
但因爲 SSE 協議比較簡單,依照 http 協議的發展歷程上,它受到了最大打開連接數的限制,並且因爲是單向連接,如何優雅地處理斷開連接並確保不丟失數據仍然可能具有挑戰性,所以對於更具交互性和複雜性的應用程序來說,就提出了 StreamableHTTP。
根據官方已經發布的博客與各種內容總結,可以提煉出 StreamableHTTP 的三種關鍵特性:
-
統一端點: StreamableHTTP 移除了專用的
/sse
端點,將所有通信整合到一個端點(通常爲/mcp
) -
靈活的傳輸模式: 服務器可以根據請求類型和內容動態地選擇返回標準的 HTTP 響應或升級到 SSE 流。對於簡單請求,可以直接返回 HTTP 響應,而無需建立長連接。
-
增強的會話管理: StreamableHTTP 引入了會話 ID 機制(
Mcp-Session-Id
標頭)以支持有狀態交互和連接恢復。會話 ID 必須是全局唯一的且加密安全。
更多內容可以去最後的文獻內容參考查看,我這裏舉個例子,以響應會話 id 名 subm,理論上 StreamableHTTP 想達到的交互效果爲:
那麼下面,就開始實踐。
streamablehttp 實踐
本節將以原始 python-sdk 爲例,編寫 streamablehttp 的一個 demo,然後簡化使用 FastMCP 包,調用其集成好的 streamablehttp 接口服務,最後擴展到 fastapi,接入 tokens 驗證,並說明目前的一個情況。
用 python-sdk 方式編寫天氣查詢
以目前 mcp 官方的 python 版本 sdk 爲例,我們可以構建一個簡單的天氣查詢 streamablehttp 服務:
import contextlib
import logging
import os
from collections.abc import AsyncIterator
import anyio
import click
import httpx
import mcp.types as types
from mcp.server.lowlevel import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.types import Receive, Scope, Send
OPENWEATHER_URL = "https://api.openweathermap.org/data/2.5/weather"
DEFAULT_UNITS = "metric"# use Celsius by default
DEFAULT_LANG = "zh_cn"# Chinese descriptions
asyncdef fetch_weather(city: str, api_key: str) -> dict[str, str]:
"""Call OpenWeather API and return a simplified weather dict.
Raises:
httpx.HTTPStatusError: if the response has a non-2xx status.
"""
params = {
"q": city,
"appid": api_key,
"units": DEFAULT_UNITS,
"lang": DEFAULT_LANG,
}
asyncwith httpx.AsyncClient(timeout=10) as client:
r = await client.get(OPENWEATHER_URL, params=params)
r.raise_for_status()
data = r.json()
weather_main = data["weather"][0]["main"]
description = data["weather"][0]["description"]
temp = data["main"]["temp"]
feels_like = data["main"]["feels_like"]
humidity = data["main"]["humidity"]
return {
"city": city,
"weather": weather_main,
"description": description,
"temp": f"{temp}°C",
"feels_like": f"{feels_like}°C",
"humidity": f"{humidity}%",
}
@click.command()
@click.option("--port", default=9000, help="Port to listen on for HTTP")
@click.option(
"--api-key",
envvar="OPENWEATHER_API_KEY",
required=True,
help="OpenWeather API key (or set OPENWEATHER_API_KEY env var)",
)
@click.option(
"--log-level",
default="INFO",
help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
)
@click.option(
"--json-response",
is_flag=True,
default=False,
help="Enable JSON responses instead of SSE streams",
)
def main(port: int, api_key: str, log_level: str, json_response: bool) -> int:
"""Run an MCP weather server using Streamable HTTP transport."""
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("weather-server")
app = Server("mcp-streamable-http-weather")
@app.call_tool()
asyncdef call_tool(name: str, arguments: dict) -> list[types.TextContent]:
"""Handle the 'get-weather' tool call."""
ctx = app.request_context
city = arguments.get("location")
ifnot city:
raise ValueError("'location' is required in arguments")
await ctx.session.send_log_message(
level="info",
data=f"Fetching weather for {city}…",
logger="weather",
related_request_id=ctx.request_id,
)
try:
weather = await fetch_weather(city, api_key)
except Exception as err:
await ctx.session.send_log_message(
level="error",
data=str(err),
logger="weather",
related_request_id=ctx.request_id,
)
raise
await ctx.session.send_log_message(
level="info",
data="Weather data fetched successfully!",
logger="weather",
related_request_id=ctx.request_id,
)
summary = (
f"{weather['city']}:{weather['description']},溫度 {weather['temp']},"
f"體感 {weather['feels_like']},溼度 {weather['humidity']}。"
)
return [
types.TextContent(type="text", text=summary),
]
@app.list_tools()
asyncdef list_tools() -> list[types.Tool]:
"""Expose available tools to the LLM."""
return [
types.Tool(
,
description="查詢指定城市的實時天氣(OpenWeather 數據)",
inputSchema={
"type": "object",
"required": ["location"],
"properties": {
"location": {
"type": "string",
"description": "城市的英文名稱,如 'Beijing'",
}
},
},
)
]
session_manager = StreamableHTTPSessionManager(
app=app,
event_store=None,
json_response=json_response,
stateless=True,
)
asyncdef handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:# noqa: D401,E501
await session_manager.handle_request(scope, receive, send)
@contextlib.asynccontextmanager
asyncdef lifespan(app: Starlette) -> AsyncIterator[None]:
asyncwith session_manager.run():
logger.info("Weather MCP server started! 🚀")
try:
yield
finally:
logger.info("Weather MCP server shutting down…")
starlette_app = Starlette(
debug=False,
routes=[Mount("/mcp", app=handle_streamable_http)],
lifespan=lifespan,
)
import uvicorn
uvicorn.run(starlette_app, host="0.0.0.0", port=port)
return0
if __name__ == "__main__":
main()
這裏略微麻煩的是,因爲沒有用 fastmcp 等集成好自動生成 jsonschema 的模塊,我們還需要自己寫types.Tool
,如果後續有多個,那就直接往後面加,最好還是保持list_tools
唯一,其餘的就是數據處理與組合,我們可以使用 MCP inspector 進行測試,爲:
同樣,還可以使用 cherry studio 在設置裏進行配置:
然後選用 deepseek v3 模型,接入此 mcp 服務進行測試:
用 fastmcp 方式來構建服務
這裏還是以天氣查詢爲例,代碼相比於上面的,少了非常多,而這段代碼,我其實是從 sse 那邊拿過來的,改動核心代碼爲mcp.run(transport="streamable-http")
,即從 sse 改成了 streamable-http,這也是 fastmcp 作者在文檔中寫到的封裝方式:
import json
import httpx
import argparse
from typing import Any
from mcp.server.fastmcp import FastMCP
# 初始化 MCP 服務器
mcp = FastMCP("WeatherServer")
# OpenWeather API 配置
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
API_KEY = None
USER_AGENT = "weather-app/1.0"
asyncdef fetch_weather(city: str) -> dict[str, Any] | None:
"""
從 OpenWeather API 獲取天氣信息。
"""
if API_KEY isNone:
return {"error": "API_KEY 未設置,請提供有效的 OpenWeather API Key。"}
params = {
"q": city,
"appid": API_KEY,
"units": "metric",
"lang": "zh_cn"
}
headers = {"User-Agent": USER_AGENT}
asyncwith httpx.AsyncClient() as client:
try:
response = await client.get(OPENWEATHER_API_BASE, params=params, headers=headers, timeout=30.0)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
return {"error": f"HTTP 錯誤: {e.response.status_code}"}
except Exception as e:
return {"error": f"請求失敗: {str(e)}"}
def format_weather(data: dict[str, Any] | str) -> str:
"""
將天氣數據格式化爲易讀文本。
"""
if isinstance(data, str):
try:
data = json.loads(data)
except Exception as e:
returnf"無法解析天氣數據: {e}"
if"error"in data:
returnf"⚠️ {data['error']}"
city = data.get("name", "未知")
country = data.get("sys", {}).get("country", "未知")
temp = data.get("main", {}).get("temp", "N/A")
humidity = data.get("main", {}).get("humidity", "N/A")
wind_speed = data.get("wind", {}).get("speed", "N/A")
weather_list = data.get("weather", [{}])
description = weather_list[0].get("description", "未知")
return (
f"🌍 {city}, {country}\n"
f"🌡 溫度: {temp}°C\n"
f"💧 溼度: {humidity}%\n"
f"🌬 風速: {wind_speed} m/s\n"
f"🌤 天氣: {description}\n"
)
@mcp.tool()
asyncdef query_weather(city: str) -> str:
"""
輸入指定城市的英文名稱,返回今日天氣查詢結果。
"""
data = await fetch_weather(city)
return format_weather(data)
def main():
parser = argparse.ArgumentParser(description="Weather Server")
parser.add_argument("--api_key", type=str, required=True, help="你的 OpenWeather API Key")
args = parser.parse_args()
global API_KEY
API_KEY = args.api_key
mcp.run(transport="streamable-http")
if __name__ == "__main__":
main()
同樣,以 cherry studio 進行測試,結果爲:
將 fastapi 轉化爲 mcp 服務
目前比較好的兩種方式,是 fastmcp 和 fastapi-mcp 框架,這兩者最大的區別是,前者目前還需要考慮如果處理原來的 fastapi 接口,後者是將啓動器做了替換,直接將接口映射爲 mcp 服務,看起來是讓用戶不需要考慮適配性的問題了。但就我個人而言,我對後者持懷疑態度,因爲目前官方協議推出後,看起來依然存在很多問題,python 項目下存在較多 argue,另外維護 fastapi-mcp 項目的機構不太活躍,streamablehttp 還沒有納入更新範疇,即使很多用戶在提 issue 了,截止目前還沒看見回覆。所以本節以前者爲主。
fastmcp 對於 fastapi 的接入採用的是策略,基本涵蓋了 restful 規範:
我們可以對該形式做驗證,針對一個天氣查詢的簡單項目,接入 mcp 後做 check_mcp 函數調用爲:
# Test your MCP server with a client
async def check_mcp(mcp: FastMCP):
# List the components that were created
tools = await mcp.get_tools()
resources = await mcp.get_resources()
templates = await mcp.get_resource_templates()
print(
f"{len(tools)} Tool(s): {', '.join([t.name for t in tools.values()])}"
)
print(
f"{len(resources)} Resource(s): {', '.join([r.name for r in resources.values()])}"
)
print(
f"{len(templates)} Resource Template(s): {', '.join([t.name for t in templates.values()])}"
)
return mcp
"""
2 Tool(s): generate_jwt, get_weather_by_city
3 Resource(s): lookup_city, get_weather_now, root__get
0 Resource Template(s):
"""
而我在該項目的 swagger UI 爲:
可以看到是對應上了。
fastmcp 鑑權
這裏首先定義一箇中間件:
class APIKeyMiddleware(BaseHTTPMiddleware):
"""
API 密鑰認證中間件。
支持通過 x-api-key 請求頭、Bearer Token 或查詢參數傳遞 API 密鑰。
"""
asyncdef dispatch(self, request: Request, call_next):
# 將請求存儲在上下文中
request_ctx.set(request)
# 跳過文檔和架構端點的認證
if request.url.path in ["/docs", "/redoc", "/openapi.json"]:
returnawait call_next(request)
if request.url.path.startswith('/messages/') or request.url.path == '/messages':
returnawait call_next(request)
# 嘗試從多個來源獲取 API 密鑰
# 1. 請求頭認證
api_key = request.headers.get("x-api-key")
# 2. Bearer Token 認證
ifnot api_key:
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
api_key = auth_header[7:] # 移除 "Bearer " 前綴
# 3. 查詢參數認證
ifnot api_key:
api_key = request.query_params.get("api_key")
# 驗證 API 密鑰
ifnot api_key ornot validate_api_key(api_key):
return JSONResponse(
status_code=401,
content={"detail": "Invalid or missing API key"}
)
# 將用戶信息添加到請求狀態
request.state.user = get_user_by_api_key(api_key)
# 繼續處理請求
returnawait call_next(request)
在後續我們程序啓動時,將中間件加入:
# 創建 FastAPI 應用
app = FastAPI(
title="MCP API 服務器",
description="帶有 API 密鑰認證的 MCP 服務器",
version="1.0.0"
)
# 添加認證中間件
app.add_middleware(APIKeyMiddleware)
# 將 MCP 應用掛載到根路徑
app.router.routes.append(Mount('/mcp', app=mcp.streamable_http_app()))
uvicorn.run(app, host=mcp.settings.host, port=mcp.settings.port)
然後就可以啓動進行驗證了,這裏以 mcp inspector 爲例:
我們可以看到,因爲沒有選擇 Authentication,所以收到消息:{"detail": "Invalid or missing API key"},這是上面定義的 401 狀態碼,但後續我在加上驗證 token 後發現streamable_http_app
方式下的好像有問題,我將其與 sse 的方式進行了對比,然後同樣用 inspector 去連接,並在中間件中打印了到底有沒有取到請求頭,結果如下:
因爲程序同樣是之前 sse 通信沒問題修改而來,我又重新對 sse 通信協議下的 demo 進行測試,即更改代碼行爲:
app.router.routes.append(Mount('/mcp', app=mcp.sse_app()))
測試結果如下:
可以看到最開始沒寫 authentication 時,直接收到了 401 的錯誤,但當第二次請求填上 Bearer Token 後,請求通過,出現了 sessionId,我表示很疑惑,於是去摟了一眼,看到 issue 上有個相似問題:
然後看到作者已經在 3 天前修改了 Middleware 的 bug,發佈了 2.3.2 的 fastmcp 版本,但我目前是 2.3.3 的版本,按理來說應該已經修復了,但爲啥沒生效,那可能還是我代碼的寫法問題?emmm,正好作者也更新了最新的 pytest 測試用例,之後有時間來排一下到底問題在哪,想睡覺了。
streamablehttp 網關
這裏推薦兩個我認爲比較不錯的項目框架,一個是 mcp-bridge,它基本上是屬於 100% 純 python 項目,提供了鑑權、日誌管理等功能,目的是充當 OpenAI API 和 MCP(MCP)工具之間的橋樑,允許開發人員通過 OpenAI API 接口利用 MCP 工具。它的流程圖爲:
另一個就是阿里開源的看起來就很強的Higress AI
:
在 MCP 生態系統中,Higress 扮演着至關重要的角色,作爲一個基礎設施組件,它通過強大的協議轉換功能,使得現有微服務能夠無需修改代碼即可融入 MCP 生態。Higress 的核心能力在於它能夠接收 MCP 請求並執行協議轉換,同時提供一系列關鍵的服務,包括統一的身份驗證、流量管理和參數映射,以及安全審計等。
阿里 Nacos(Naming and Configuration Service)作爲雲原生註冊配置中心,最近發佈了 MCP Registry,讓存量業務 API “0 改動” 就可以適配 MCP Server。
最後
本篇是針對近期 StreamableHTTP 協議官方 sdk 落地後的應用場景做了一些探索實踐。在最近半年內,各類智能體協議開始湧現,除了位居前排的 MCP,其餘的如 google 的 A2A、國內的 ANP 等等,都在讓智能體從安全性、可擴展性和實時性幾個角度逐漸演化爲標準化通信協議,也預示着智能體生態系統正在走向成熟和統一。我們有理由相信,在接下來的時間裏,這個領域還會湧現出更多令人矚目的技術和應用。
引用鏈接
[1]
[RFC] Replace HTTP+SSE with new "Streamable HTTP" transport:https://github.com/modelcontextprotocol/modelcontextprotocol/pull/206
[2]
MCP 基礎協議:2025-03-26 修訂版本:https://zhuanlan.zhihu.com/p/1889231328832108293
[3]
An Introduction to MCP and Authorization:https://auth0.com/blog/an-introduction-to-mcp-and-authorization/
[4]
higress github:https://github.com/alibaba/higress
[5]
mcp-bridge github:https://github.com/SecretiveShell/MCP-Bridge
[6]
Build and deploy Remote Model Context Protocol (MCP) servers to Cloudflare:https://blog.cloudflare.com/remote-model-context-protocol-servers-mcp/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/7kzzpz1wP2itrRZlsKQBOA