MCP 如何進行權限管理?完全版示例解析

想必 MCP 大家應該都不陌生

MCP 就是以標準方式讓 LLM 使用不同工具,獲取外界信息的一個 "中間層協議",想要入門瞭解具體 MCP 如何工作的可以參考我之前寫的一篇文章:

MCP 的架構示意圖

MCP 有 host、client、server 三個部分組成,host 就是咱們所說的 claude desktop 或 curser 等,client 就是 host 內置的和 server 進行數據傳輸的客戶端程序,一個 server 對應一個 client,架構示意圖如下:

其中 MCP host 會先根據用戶的輸入進行判斷使用哪個 MCP server 然後進行路由,並使用對應的 client 進行 mcp server 請求,然後拿到對應的調用結果返回給大模型,最終呈現在 host 中。

在企業落地或需要進行權限管控或數據隔離的場景中,我們是不希望所有人都可以對某一個 mcp server 進行訪問,或不希望一個人可以訪問別人的數據,或者根據用戶來對數據進行區分,那麼這個時候怎麼做呢?

第一種方式
定義 mcp server 工具,將用戶的用戶名作爲工具入參,或傳入其他可以區分用戶的標識符

@mcp.tool()
async def get_user_info(user_name: str) -> str:
    """獲取用戶信息和分數

    Args:
        user_name: 用戶名
    """
    score = user_scores.get(user_name, -1)
    if score == -1:
        return f"用戶 {user_name} 不存在"
    return f"用戶 {user_name} 的分數是 {score}"

通過這種方式調用 mcp 服務,可以根據不同的用戶進行區分數據,並可以根據用戶進行權限的管控,但是這是不安全的,仍然可以在 prompt 中寫入其他用戶的用戶名進行信息獲取,這是一種僞權限隔離。

我們都知道,mcp client 和 mcp server 之間有兩種通信方式,一種是 stdio,另一種是基於 http 的 sse: 

stdio 標準輸入輸入客戶端通過啓動服務器子進程,並利用標準輸入(stdin)和標準輸出(stdout)建立雙向通信

http 的 sse 基於 HTTP 協議的技術,允許服務器向客戶端單向、實時地推送數據

ZpF7Lb

從這兩方面出發,我們可以引出第二種方式和第三種方式

第二種方式
前面的教程講過可以通過配置 cursor 的 mcp json 文件來掛在 mcp server

{
    "mcpServers":{
        "airpline":{
            "command": "uv",
            "args":[
                "--directory",
                "E:\\AutomationDev\\automation_project\\mcp_demo\\first_mcp",
                "run",
                "get_airpline_info.py"
            ]
        }
    }
}

以上是針對 stdio 的通信方式配置的,其中有命令行和 server 的腳本路徑,我們可以增加一個 env 字段來定義這個工具的密鑰

{
    "mcpServers":{
        "airpline":{
            "command": "uv",
            "args":[
                "--directory",
                "E:\\AutomationDev\\automation_project\\mcp_demo\\first_mcp",
                "run",
                "get_airpline_info.py"
            ],
            "env":{
              "USER_KEY":"XXXXXXX"
            }
        }
    }
}
@mcp.tool()
async def get_user_info() -> str:
    user_key = os.getenv("USER_KEY")
    #校驗用戶user_key
    ...

通過 USER_KEY 來進行用戶識別,在 server 定義的工具中,就不需要通過參數的形式接受了,只需要在工具中獲取環境變量中的 USER_KEY 字段然後進行權限校驗和數據獲取。

當然,這種方式會有一個問題,就是這個 KEY 是寫死的,我們要實現動態校驗,可以在調用前進行一個鑑權操作,改寫這個 USER_KEY,remove 掉原有的會話,然後重新實例化一個新的 stdio 的會話,這樣可能需要自己編寫 client,比較麻煩,除非將客戶端和 mcp server 作爲一個整體的服務部署爲 BS 架構,服務器本地調用 mcp server

所以最適合做鑑權的還是 http 方式,引出我們的第三種方式

第三種方式

使用基於 http 的 sse 來實現 mcp client 和 mcp server 之間通信,現在比較成熟的開源 mcp 服務框架有 fastmcp、fastapi-mcp 等,這類框架直接將 mcp 服務和 fastapi 框架結合,可以直接進行 mcp 服務的開發和部署。 如果要實現 web 服務的權限校驗可以加入 jwt+oauth2.0 的權限校驗機制,這樣就可以實現 mcp server 中 tool 的權限管控和不同用戶的數據隔離。

from fastapi import FastAPI
from fastapi_mcp import FastApiMCP

app = FastAPI()

mcp = FastApiMCP(
    app,
    # 可選參數
    ,
    description="MCP API Server",
    base_url="http://0.0.0.0:8000",
)

# 將MCP 服務器直接掛載到 FastAPI 應用
mcp.mount()
# 工具函數
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user or not verify_password(password, user.hashed_password):
        return None
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# 獲取當前登錄用戶
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

# 獲取當前活躍用戶
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


# 登錄接口:獲取 token
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


# 受保護的接口:需要 token 訪問
@app.get("/users/{user_id}", operation_id="get_user_info")
async def read_user(user_id: int, current_user: User = Security(get_current_active_user)):
    return {
        "user_id": user_id,
        "current_user": current_user.username,
        "message": f"Hello {current_user.full_name}, you accessed user ID {user_id}"
    }

客戶端訪問

GET /users/123
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.xxxxxxx

當然也可以在 cursor 中配置 json 配置文件,具體可以參考 cursor 的官方文檔:https://docs.cursor.com/context/model-context-protocol#manual-configuration

{
  "mcpServers": {
    "server-name": {
      "url": "http://localhost:3000/sse",
      "env": {
        "API_KEY": "value"
      }
    }
  }
}

總結

實現 MCP 服務權限管控的三種方式:

  1. 通過工具參數區分用戶:將用戶名或其他標識符作爲工具參數傳入,實現數據區分和權限管控。但這種方式存在安全性問題,用戶可能通過 Prompt 繞過限制。

  2. 通過環境變量傳遞密鑰:在 MCP 配置中添加env字段定義工具密鑰,工具通過環境變量獲取密鑰進行權限校驗。這種方式適合 stdio 通信,但密鑰固定,動態校驗較複雜。

  3. 基於 HTTP 的 SSE 實現鑑權:使用成熟的框架如fastmcpfastapi-mcp,結合 JWT 和 OAuth2.0 實現權限校驗。這種方式適合 HTTP 通信,安全性高,適合多用戶場景。

落地實踐中,可以根據自己場景,和自定義開發的程度進行相應的選擇。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/kR4Jxua_ySwlbb_UjF05NQ