以 StreamableHTTP 爲例,對 MCP 進行總結與實踐

引言

近日,MCP 迎來更新,StreamableHTTP 從 3 月 26 日協議發佈,到這禮拜進行落地,從 python 的 sdk 上來看,更新的代碼量並不大,但畢竟是一個協議,從 github 上的幾百條 discussion 中,可以看到還是有非常多褒貶不一的聲音:

本篇就以更新後的 StreamableHTTP,做了一個實踐測試。

從 stdio 到 StreamableHTTP

最初,標準輸入輸出 (stdio) 爲本地進程間通信(Inter-Process Communication, IPC)提供了基礎,以其簡單性和直接性服務於早期計算和命令行工具。在模型上下文協議(MCP)的背景下,stdio 是支持本地集成的傳輸機制之一。然而,stdio 固有的侷限性——僅限本地、單客戶端、缺乏網絡能力,使其無法滿足現代 Web 應用和分佈式企業系統的需求。

爲了突破本地限制,實現 MCP 網絡化的實時更新,服務器發送事件 (SSE) 應運而生。它作爲一種基於 HTTP 的服務器推送技術,允許服務器向客戶端單向推送實時數據,顯著改善了需要即時信息的應用體驗。儘管 SSE 解決了實時性的問題,但其單向通信、瀏覽器連接數限制以及在複雜交互和企業級 AI 集成場景下的不足(如雙向通信、連接管理和可擴展性挑戰)逐漸顯現,這些限制促使業界尋求更爲強大和靈活的解決方案。

爲應對 SSE 在模型上下文協議(MCP)等複雜 AI 集成場景中的侷限性,並滿足企業對更高效、靈活和可擴展傳輸機制的需求,StreamableHTTP 登上了舞臺。它旨在通過統一端點、靈活傳輸模式和增強的會話管理,克服先前協議的不足,爲 AI 應用與外部系統的高效、可靠通信提供更完善的解決方案。

它的具體協議標準爲:

下面按 客戶端首次啓動 ➜ 成功連到服務器 ➜ 等待用戶提問 這一完整過程,把 MCP Streamable HTTP 的請求–響應順序用時間線列出來:

Nmem9H

WpdKz8

從個人開發到企業應用

我們來想一個用戶場景,當使用 MCP 協議時,我們希望 Server 端能記住我們的身份,以便於做一些個性化服務,那最簡單的方式是直接將身份寫入 prompt 中,比如幫我查下 submarineas 的分數是多少,那 LLM 就能調 tool 從數據庫等地方拿到數據,但這會導致我能拿到任何人的分數,結果就是身份被公開化,很顯然是有問題的。所以在早期設計中,標準輸入輸出(stdio)利用環境變量構建了客戶端與服務端的通信模型,定義如下 JSON Schema 規範請求結構:

{
  "mcpServers": {
    "weather": {
      "command": "uv",
      "args": [
        "--directory",
        "/ABSOLUTE/PATH/TO/PARENT/FOLDER/weather",
        "run",
        "weather.py"
      ],
      "env":{
        "username":"submarineas"
      }
    }
  }
}

該方案解決了單客戶端請求訪問隔離數據的 bug,所以這也是目前 cline、claude desktop 等工具使用的方式,通過單進程的 local MCP server 來完成軟件調用:

但對於存在併發的多客戶端,這時候通過進程級別的環境變量就無法做到安全隔離機制(PS:如在異步操作延遲迴調),當然,可能有人會說,既然單進程無法及時將第一次請求的數據清理再做第二次,那我來一個 client,開一個 stdio 通信進程不就好了,我覺得問題不大,我甚至還能開個進程池動態 fork,只要資源是無限的。

那資源有限的情況下,很自然的我們轉向服務器發送事件(SSE)技術,通過 "請求頭承載用戶信息 + 線程級隔離" 重構架構,即 HTTP + SSE 方案,將環境變量轉化爲線程局部存儲(隔離),從本地 io 擴展到網絡 io,客戶端使用 EventSource API 發起與服務器 SSE 端點的 HTTP 連接,服務器保持此連接打開,並以事件的形式將數據流式傳輸到客戶端:

但因爲 SSE 協議比較簡單,依照 http 協議的發展歷程上,它受到了最大打開連接數的限制,並且因爲是單向連接,如何優雅地處理斷開連接並確保不丟失數據仍然可能具有挑戰性,所以對於更具交互性和複雜性的應用程序來說,就提出了 StreamableHTTP。

根據官方已經發布的博客與各種內容總結,可以提煉出 StreamableHTTP 的三種關鍵特性:

  1. 統一端點: StreamableHTTP 移除了專用的/sse端點,將所有通信整合到一個端點(通常爲/mcp

  2. 靈活的傳輸模式: 服務器可以根據請求類型和內容動態地選擇返回標準的 HTTP 響應或升級到 SSE 流。對於簡單請求,可以直接返回 HTTP 響應,而無需建立長連接。

  3. 增強的會話管理: StreamableHTTP 引入了會話 ID 機制(Mcp-Session-Id標頭)以支持有狀態交互和連接恢復。會話 ID 必須是全局唯一的且加密安全。

更多內容可以去最後的文獻內容參考查看,我這裏舉個例子,以響應會話 id 名 subm,理論上 StreamableHTTP 想達到的交互效果爲:

那麼下面,就開始實踐。

streamablehttp 實踐

本節將以原始 python-sdk 爲例,編寫 streamablehttp 的一個 demo,然後簡化使用 FastMCP 包,調用其集成好的 streamablehttp 接口服務,最後擴展到 fastapi,接入 tokens 驗證,並說明目前的一個情況。

用 python-sdk 方式編寫天氣查詢

以目前 mcp 官方的 python 版本 sdk 爲例,我們可以構建一個簡單的天氣查詢 streamablehttp 服務:

import contextlib
import logging
import os
from collections.abc import AsyncIterator

import anyio
import click
import httpx
import mcp.types as types
from mcp.server.lowlevel import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.types import Receive, Scope, Send

OPENWEATHER_URL = "https://api.openweathermap.org/data/2.5/weather"
DEFAULT_UNITS = "metric"# use Celsius by default
DEFAULT_LANG = "zh_cn"# Chinese descriptions

asyncdef fetch_weather(city: str, api_key: str) -> dict[str, str]:
    """Call OpenWeather API and return a simplified weather dict.

    Raises:
        httpx.HTTPStatusError: if the response has a non-2xx status.
    """
    params = {
        "q": city,
        "appid": api_key,
        "units": DEFAULT_UNITS,
        "lang": DEFAULT_LANG,
    }
    asyncwith httpx.AsyncClient(timeout=10) as client:
        r = await client.get(OPENWEATHER_URL, params=params)
        r.raise_for_status()
        data = r.json()
    weather_main = data["weather"][0]["main"]
    description = data["weather"][0]["description"]
    temp = data["main"]["temp"]
    feels_like = data["main"]["feels_like"]
    humidity = data["main"]["humidity"]
    return {
        "city": city,
        "weather": weather_main,
        "description": description,
        "temp": f"{temp}°C",
        "feels_like": f"{feels_like}°C",
        "humidity": f"{humidity}%",
    }


@click.command()
@click.option("--port", default=9000, help="Port to listen on for HTTP")
@click.option(
    "--api-key",
    envvar="OPENWEATHER_API_KEY",
    required=True,
    help="OpenWeather API key (or set OPENWEATHER_API_KEY env var)",
)
@click.option(
    "--log-level",
    default="INFO",
    help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
)
@click.option(
    "--json-response",
    is_flag=True,
    default=False,
    help="Enable JSON responses instead of SSE streams",
)
def main(port: int, api_key: str, log_level: str, json_response: bool) -> int:
    """Run an MCP weather server using Streamable HTTP transport."""

    logging.basicConfig(
        level=getattr(logging, log_level.upper()),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    logger = logging.getLogger("weather-server")

    app = Server("mcp-streamable-http-weather")

    @app.call_tool()
    asyncdef call_tool(name: str, arguments: dict) -> list[types.TextContent]:
        """Handle the 'get-weather' tool call."""
        ctx = app.request_context
        city = arguments.get("location")
        ifnot city:
            raise ValueError("'location' is required in arguments")

        await ctx.session.send_log_message(
            level="info",
            data=f"Fetching weather for {city}…",
            logger="weather",
            related_request_id=ctx.request_id,
        )

        try:
            weather = await fetch_weather(city, api_key)
        except Exception as err:
            await ctx.session.send_log_message(
                level="error",
                data=str(err),
                logger="weather",
                related_request_id=ctx.request_id,
            )
            raise

        await ctx.session.send_log_message(
            level="info",
            data="Weather data fetched successfully!",
            logger="weather",
            related_request_id=ctx.request_id,
        )

        summary = (
            f"{weather['city']}:{weather['description']},溫度 {weather['temp']},"
            f"體感 {weather['feels_like']},溼度 {weather['humidity']}。"
        )

        return [
            types.TextContent(type="text", text=summary),
        ]

    @app.list_tools()
    asyncdef list_tools() -> list[types.Tool]:
        """Expose available tools to the LLM."""
        return [
            types.Tool(
                ,
                description="查詢指定城市的實時天氣(OpenWeather 數據)",
                inputSchema={
                    "type": "object",
                    "required": ["location"],
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "城市的英文名稱,如 'Beijing'",
                        }
                    },
                },
            )
        ]

    session_manager = StreamableHTTPSessionManager(
        app=app,
        event_store=None, 
        json_response=json_response,
        stateless=True,
    )

    asyncdef handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:# noqa: D401,E501
        await session_manager.handle_request(scope, receive, send)

    @contextlib.asynccontextmanager
    asyncdef lifespan(app: Starlette) -> AsyncIterator[None]:
        asyncwith session_manager.run():
            logger.info("Weather MCP server started! 🚀")
            try:
                yield
            finally:
                logger.info("Weather MCP server shutting down…")

    starlette_app = Starlette(
        debug=False,
        routes=[Mount("/mcp", app=handle_streamable_http)],
        lifespan=lifespan,
    )

    import uvicorn

    uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    return0


if __name__ == "__main__":
    main()

這裏略微麻煩的是,因爲沒有用 fastmcp 等集成好自動生成 jsonschema 的模塊,我們還需要自己寫types.Tool,如果後續有多個,那就直接往後面加,最好還是保持list_tools唯一,其餘的就是數據處理與組合,我們可以使用 MCP inspector 進行測試,爲:

同樣,還可以使用 cherry studio 在設置裏進行配置:

然後選用 deepseek v3 模型,接入此 mcp 服務進行測試:

用 fastmcp 方式來構建服務

這裏還是以天氣查詢爲例,代碼相比於上面的,少了非常多,而這段代碼,我其實是從 sse 那邊拿過來的,改動核心代碼爲mcp.run(transport="streamable-http"),即從 sse 改成了 streamable-http,這也是 fastmcp 作者在文檔中寫到的封裝方式:

import json
import httpx
import argparse  
from typing import Any
from mcp.server.fastmcp import FastMCP

# 初始化 MCP 服務器
mcp = FastMCP("WeatherServer")

# OpenWeather API 配置
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
API_KEY = None
USER_AGENT = "weather-app/1.0"

asyncdef fetch_weather(city: str) -> dict[str, Any] | None:
    """
    從 OpenWeather API 獲取天氣信息。
    """
    if API_KEY isNone:
        return {"error": "API_KEY 未設置,請提供有效的 OpenWeather API Key。"}

    params = {
        "q": city,
        "appid": API_KEY,
        "units": "metric",
        "lang": "zh_cn"
    }
    headers = {"User-Agent": USER_AGENT}

    asyncwith httpx.AsyncClient() as client:
        try:
            response = await client.get(OPENWEATHER_API_BASE, params=params, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            return {"error": f"HTTP 錯誤: {e.response.status_code}"}
        except Exception as e:
            return {"error": f"請求失敗: {str(e)}"}

def format_weather(data: dict[str, Any] | str) -> str:
    """
    將天氣數據格式化爲易讀文本。
    """
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except Exception as e:
            returnf"無法解析天氣數據: {e}"

    if"error"in data:
        returnf"⚠️ {data['error']}"

    city = data.get("name", "未知")
    country = data.get("sys", {}).get("country", "未知")
    temp = data.get("main", {}).get("temp", "N/A")
    humidity = data.get("main", {}).get("humidity", "N/A")
    wind_speed = data.get("wind", {}).get("speed", "N/A")
    weather_list = data.get("weather", [{}])
    description = weather_list[0].get("description", "未知")

    return (
        f"🌍 {city}, {country}\n"
        f"🌡 溫度: {temp}°C\n"
        f"💧 溼度: {humidity}%\n"
        f"🌬 風速: {wind_speed} m/s\n"
        f"🌤 天氣: {description}\n"
    )

@mcp.tool()
asyncdef query_weather(city: str) -> str:
    """
    輸入指定城市的英文名稱,返回今日天氣查詢結果。
    """
    data = await fetch_weather(city)
    return format_weather(data)

def main():
    parser = argparse.ArgumentParser(description="Weather Server")
    parser.add_argument("--api_key", type=str, required=True, help="你的 OpenWeather API Key")
    args = parser.parse_args()
    global API_KEY
    API_KEY = args.api_key
    mcp.run(transport="streamable-http")

if __name__ == "__main__":
    main()

同樣,以 cherry studio 進行測試,結果爲:

將 fastapi 轉化爲 mcp 服務

目前比較好的兩種方式,是 fastmcp 和 fastapi-mcp 框架,這兩者最大的區別是,前者目前還需要考慮如果處理原來的 fastapi 接口,後者是將啓動器做了替換,直接將接口映射爲 mcp 服務,看起來是讓用戶不需要考慮適配性的問題了。但就我個人而言,我對後者持懷疑態度,因爲目前官方協議推出後,看起來依然存在很多問題,python 項目下存在較多 argue,另外維護 fastapi-mcp 項目的機構不太活躍,streamablehttp 還沒有納入更新範疇,即使很多用戶在提 issue 了,截止目前還沒看見回覆。所以本節以前者爲主。

fastmcp 對於 fastapi 的接入採用的是策略,基本涵蓋了 restful 規範:

NUnjDP

我們可以對該形式做驗證,針對一個天氣查詢的簡單項目,接入 mcp 後做 check_mcp 函數調用爲:

# Test your MCP server with a client
async def check_mcp(mcp: FastMCP):
    # List the components that were created
    tools = await mcp.get_tools()
    resources = await mcp.get_resources()
    templates = await mcp.get_resource_templates()

    print(
        f"{len(tools)} Tool(s): {', '.join([t.name for t in tools.values()])}"
    )
    print(
        f"{len(resources)} Resource(s): {', '.join([r.name for r in resources.values()])}"
    )
    print(
        f"{len(templates)} Resource Template(s): {', '.join([t.name for t in templates.values()])}"
    )

    return mcp


"""
2 Tool(s): generate_jwt, get_weather_by_city
3 Resource(s): lookup_city, get_weather_now, root__get
0 Resource Template(s): 
"""

而我在該項目的 swagger UI 爲:

可以看到是對應上了。

fastmcp 鑑權

這裏首先定義一箇中間件:

class APIKeyMiddleware(BaseHTTPMiddleware):
    """
    API 密鑰認證中間件。
    支持通過 x-api-key 請求頭、Bearer Token 或查詢參數傳遞 API 密鑰。
    """
    asyncdef dispatch(self, request: Request, call_next):
        # 將請求存儲在上下文中
        request_ctx.set(request)

        # 跳過文檔和架構端點的認證
        if request.url.path in ["/docs", "/redoc", "/openapi.json"]:
            returnawait call_next(request)
        if request.url.path.startswith('/messages/') or request.url.path == '/messages':
            returnawait call_next(request)

        # 嘗試從多個來源獲取 API 密鑰
        # 1. 請求頭認證
        api_key = request.headers.get("x-api-key")

        # 2. Bearer Token 認證
        ifnot api_key:
            auth_header = request.headers.get("Authorization")
            if auth_header and auth_header.startswith("Bearer "):
                api_key = auth_header[7:]  # 移除 "Bearer " 前綴

        # 3. 查詢參數認證
        ifnot api_key:
            api_key = request.query_params.get("api_key")

        # 驗證 API 密鑰
        ifnot api_key ornot validate_api_key(api_key):
            return JSONResponse(
                status_code=401,
                content={"detail": "Invalid or missing API key"}
            )

        # 將用戶信息添加到請求狀態
        request.state.user = get_user_by_api_key(api_key)

        # 繼續處理請求
        returnawait call_next(request)

在後續我們程序啓動時,將中間件加入:

# 創建 FastAPI 應用
app = FastAPI(
    title="MCP API 服務器",
    description="帶有 API 密鑰認證的 MCP 服務器",
    version="1.0.0"
)

# 添加認證中間件
app.add_middleware(APIKeyMiddleware)
# 將 MCP 應用掛載到根路徑
app.router.routes.append(Mount('/mcp', app=mcp.streamable_http_app()))

uvicorn.run(app, host=mcp.settings.host, port=mcp.settings.port)

然後就可以啓動進行驗證了,這裏以 mcp inspector 爲例:

我們可以看到,因爲沒有選擇 Authentication,所以收到消息:{"detail": "Invalid or missing API key"},這是上面定義的 401 狀態碼,但後續我在加上驗證 token 後發現streamable_http_app方式下的好像有問題,我將其與 sse 的方式進行了對比,然後同樣用 inspector 去連接,並在中間件中打印了到底有沒有取到請求頭,結果如下:

因爲程序同樣是之前 sse 通信沒問題修改而來,我又重新對 sse 通信協議下的 demo 進行測試,即更改代碼行爲:

app.router.routes.append(Mount('/mcp', app=mcp.sse_app()))

測試結果如下:

可以看到最開始沒寫 authentication 時,直接收到了 401 的錯誤,但當第二次請求填上 Bearer Token 後,請求通過,出現了 sessionId,我表示很疑惑,於是去摟了一眼,看到 issue 上有個相似問題:

然後看到作者已經在 3 天前修改了 Middleware 的 bug,發佈了 2.3.2 的 fastmcp 版本,但我目前是 2.3.3 的版本,按理來說應該已經修復了,但爲啥沒生效,那可能還是我代碼的寫法問題?emmm,正好作者也更新了最新的 pytest 測試用例,之後有時間來排一下到底問題在哪,想睡覺了。

streamablehttp 網關

這裏推薦兩個我認爲比較不錯的項目框架,一個是 mcp-bridge,它基本上是屬於 100% 純 python 項目,提供了鑑權、日誌管理等功能,目的是充當 OpenAI API 和 MCP(MCP)工具之間的橋樑,允許開發人員通過 OpenAI API 接口利用 MCP 工具。它的流程圖爲:

另一個就是阿里開源的看起來就很強的Higress AI

在 MCP 生態系統中,Higress 扮演着至關重要的角色,作爲一個基礎設施組件,它通過強大的協議轉換功能,使得現有微服務能夠無需修改代碼即可融入 MCP 生態。Higress 的核心能力在於它能夠接收 MCP 請求並執行協議轉換,同時提供一系列關鍵的服務,包括統一的身份驗證、流量管理和參數映射,以及安全審計等。

阿里 Nacos(Naming and Configuration Service)作爲雲原生註冊配置中心,最近發佈了 MCP Registry,讓存量業務 API “0 改動” 就可以適配 MCP Server。

最後

本篇是針對近期 StreamableHTTP 協議官方 sdk 落地後的應用場景做了一些探索實踐。在最近半年內,各類智能體協議開始湧現,除了位居前排的 MCP,其餘的如 google 的 A2A、國內的 ANP 等等,都在讓智能體從安全性、可擴展性和實時性幾個角度逐漸演化爲標準化通信協議,也預示着智能體生態系統正在走向成熟和統一。我們有理由相信,在接下來的時間裏,這個領域還會湧現出更多令人矚目的技術和應用。

引用鏈接

[1] [RFC] Replace HTTP+SSE with new "Streamable HTTP" transport:https://github.com/modelcontextprotocol/modelcontextprotocol/pull/206
[2]MCP 基礎協議:2025-03-26 修訂版本:https://zhuanlan.zhihu.com/p/1889231328832108293
[3]An Introduction to MCP and Authorization:https://auth0.com/blog/an-introduction-to-mcp-and-authorization/
[4]higress github:https://github.com/alibaba/higress
[5]mcp-bridge github:https://github.com/SecretiveShell/MCP-Bridge
[6]Build and deploy Remote Model Context Protocol (MCP) servers to Cloudflare:https://blog.cloudflare.com/remote-model-context-protocol-servers-mcp/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/7kzzpz1wP2itrRZlsKQBOA