MCP 如何進行權限管理?完全版示例解析
想必 MCP 大家應該都不陌生
MCP 就是以標準方式讓 LLM 使用不同工具,獲取外界信息的一個 "中間層協議",想要入門瞭解具體 MCP 如何工作的可以參考我之前寫的一篇文章:
MCP 的架構示意圖
MCP 有 host、client、server 三個部分組成,host 就是咱們所說的 claude desktop 或 curser 等,client 就是 host 內置的和 server 進行數據傳輸的客戶端程序,一個 server 對應一個 client,架構示意圖如下:
其中 MCP host 會先根據用戶的輸入進行判斷使用哪個 MCP server 然後進行路由,並使用對應的 client 進行 mcp server 請求,然後拿到對應的調用結果返回給大模型,最終呈現在 host 中。
在企業落地或需要進行權限管控或數據隔離的場景中,我們是不希望所有人都可以對某一個 mcp server 進行訪問,或不希望一個人可以訪問別人的數據,或者根據用戶來對數據進行區分,那麼這個時候怎麼做呢?
第一種方式
定義 mcp server 工具,將用戶的用戶名作爲工具入參,或傳入其他可以區分用戶的標識符
@mcp.tool()
async def get_user_info(user_name: str) -> str:
"""獲取用戶信息和分數
Args:
user_name: 用戶名
"""
score = user_scores.get(user_name, -1)
if score == -1:
return f"用戶 {user_name} 不存在"
return f"用戶 {user_name} 的分數是 {score}"
通過這種方式調用 mcp 服務,可以根據不同的用戶進行區分數據,並可以根據用戶進行權限的管控,但是這是不安全的,仍然可以在 prompt 中寫入其他用戶的用戶名進行信息獲取,這是一種僞權限隔離。
我們都知道,mcp client 和 mcp server 之間有兩種通信方式,一種是 stdio,另一種是基於 http 的 sse:
stdio 標準輸入輸入客戶端通過啓動服務器子進程,並利用標準輸入(stdin)和標準輸出(stdout)建立雙向通信
http 的 sse 基於 HTTP 協議的技術,允許服務器向客戶端單向、實時地推送數據
從這兩方面出發,我們可以引出第二種方式和第三種方式
第二種方式
前面的教程講過可以通過配置 cursor 的 mcp json 文件來掛在 mcp server
{
"mcpServers":{
"airpline":{
"command": "uv",
"args":[
"--directory",
"E:\\AutomationDev\\automation_project\\mcp_demo\\first_mcp",
"run",
"get_airpline_info.py"
]
}
}
}
以上是針對 stdio 的通信方式配置的,其中有命令行和 server 的腳本路徑,我們可以增加一個 env 字段來定義這個工具的密鑰
{
"mcpServers":{
"airpline":{
"command": "uv",
"args":[
"--directory",
"E:\\AutomationDev\\automation_project\\mcp_demo\\first_mcp",
"run",
"get_airpline_info.py"
],
"env":{
"USER_KEY":"XXXXXXX"
}
}
}
}
@mcp.tool()
async def get_user_info() -> str:
user_key = os.getenv("USER_KEY")
#校驗用戶user_key
...
通過 USER_KEY 來進行用戶識別,在 server 定義的工具中,就不需要通過參數的形式接受了,只需要在工具中獲取環境變量中的 USER_KEY 字段然後進行權限校驗和數據獲取。
當然,這種方式會有一個問題,就是這個 KEY 是寫死的,我們要實現動態校驗,可以在調用前進行一個鑑權操作,改寫這個 USER_KEY,remove 掉原有的會話,然後重新實例化一個新的 stdio 的會話,這樣可能需要自己編寫 client,比較麻煩,除非將客戶端和 mcp server 作爲一個整體的服務部署爲 BS 架構,服務器本地調用 mcp server。
所以最適合做鑑權的還是 http 方式,引出我們的第三種方式
第三種方式
使用基於 http 的 sse 來實現 mcp client 和 mcp server 之間通信,現在比較成熟的開源 mcp 服務框架有 fastmcp、fastapi-mcp 等,這類框架直接將 mcp 服務和 fastapi 框架結合,可以直接進行 mcp 服務的開發和部署。 如果要實現 web 服務的權限校驗可以加入 jwt+oauth2.0 的權限校驗機制,這樣就可以實現 mcp server 中 tool 的權限管控和不同用戶的數據隔離。
from fastapi import FastAPI
from fastapi_mcp import FastApiMCP
app = FastAPI()
mcp = FastApiMCP(
app,
# 可選參數
,
description="MCP API Server",
base_url="http://0.0.0.0:8000",
)
# 將MCP 服務器直接掛載到 FastAPI 應用
mcp.mount()
# 工具函數
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user or not verify_password(password, user.hashed_password):
return None
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 獲取當前登錄用戶
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
# 獲取當前活躍用戶
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# 登錄接口:獲取 token
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# 受保護的接口:需要 token 訪問
@app.get("/users/{user_id}", operation_id="get_user_info")
async def read_user(user_id: int, current_user: User = Security(get_current_active_user)):
return {
"user_id": user_id,
"current_user": current_user.username,
"message": f"Hello {current_user.full_name}, you accessed user ID {user_id}"
}
客戶端訪問
GET /users/123
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.xxxxxxx
當然也可以在 cursor 中配置 json 配置文件,具體可以參考 cursor 的官方文檔:https://docs.cursor.com/context/model-context-protocol#manual-configuration
{
"mcpServers": {
"server-name": {
"url": "http://localhost:3000/sse",
"env": {
"API_KEY": "value"
}
}
}
}
總結
實現 MCP 服務權限管控的三種方式:
-
通過工具參數區分用戶:將用戶名或其他標識符作爲工具參數傳入,實現數據區分和權限管控。但這種方式存在安全性問題,用戶可能通過 Prompt 繞過限制。
-
通過環境變量傳遞密鑰:在 MCP 配置中添加
env
字段定義工具密鑰,工具通過環境變量獲取密鑰進行權限校驗。這種方式適合 stdio 通信,但密鑰固定,動態校驗較複雜。 -
基於 HTTP 的 SSE 實現鑑權:使用成熟的框架如
fastmcp
或fastapi-mcp
,結合 JWT 和 OAuth2.0 實現權限校驗。這種方式適合 HTTP 通信,安全性高,適合多用戶場景。
落地實踐中,可以根據自己場景,和自定義開發的程度進行相應的選擇。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/kR4Jxua_ySwlbb_UjF05NQ