從 0 開始實現 MCP-Client

什麼是 MCP-Client?

MCP-ClientModel Context Protocol(模型上下文協議)架構中的一個重要組件,用於連接AI模型(如ClaudeGPT等大型語言模型)與外部數據源、工具和服務的橋樑。

MCP(Model Context Protocol)是由Anthropic公司在 2024 年底首次提出並開源的一種開放標準協議,旨在解決大語言模型(LLM)與外部世界的連接問題。這一協議的核心價值在於打破了AI模型的 "信息孤島" 限制,使模型能夠以標準化的方式訪問和處理實時數據,顯著擴展了大模型的應用場景。

MCP架構中,有三個關鍵組件:

  1. MCP 服務器(Server):輕量級服務程序,負責對接具體數據源或工具(如數據庫、API等),並按照 MCP 規範提供標準化的功能接口。每個MCP服務器封裝了特定的能力,如文件檢索、數據庫查詢等。

  2. MCP 客戶端(Client):嵌入在AI應用中的連接器,與MCP服務器建立一對一連接,充當模型與服務器之間的橋樑。它負責發現可用服務、發送調用請求、獲取結果,並將這些信息傳遞給AI模型。

  3. 宿主應用(Host):運行LLM的應用程序或環境,如Claude桌面應用、Cursor IDE等。宿主通過集成MCP客戶端,使其內部的模型能夠調用外部MCP服務器的能力。

MCP-Client的工作原理是基於JSON-RPC 2.0協議,通過標準化的接口與MCP服務器進行通信。它能夠自動發現可用的MCP服務器及其提供的工具,並將這些信息以結構化的方式提供給大語言模型,使模型能夠理解可用的工具及其功能,從而根據用戶需求決定何時何地調用這些工具。

爲什麼要自寫 MCP-Client?

自主開發MCP-Client有幾個重要的原因和優勢:

  1. 定製化需求:市場上的通用MCP客戶端(例如:ClaudeDesktopCursorCline等等)可能無法滿足特定業務場景的需求。通過自主開發,可以根據企業或個人的具體需求進行定製,比如添加特定的安全驗證、數據過濾、或針對特定領域的優化。

  2. 系統集成:將MCP-Client與現有系統無縫集成。自主開發的MCP-Client可以更好地適配已有的技術棧和架構,減少兼容性問題,提高開發效率。

  3. 數據隱私與安全:對於敏感數據或內部系統,自主開發的MCP-Client可以實現更嚴格的權限控制和數據保護措施,確保敏感信息不會被未授權訪問或泄露。

  4. 性能優化:針對特定用例優化性能。例如,對於需要高頻率、低延遲訪問的場景,可以通過定製MCP-Client來減少通信開銷,提高響應速度。

  5. 擴展功能:實現標準MCP協議之外的增強功能。比如添加高級緩存機制、請求隊列管理、負載均衡,或針對特定AI模型優化的上下文處理邏輯。

  6. 控制和可維護性:對於依賴AI能力的核心業務,自主開發的MCP-Client意味着更好的控制能力和可維護性。當需求變化或出現問題時,可以快速進行調整和修復,而不必依賴第三方供應商。

  7. 適配多種 AI 模型:自主開發的MCP-Client可以設計爲同時支持多種不同的大語言模型(如ClaudeGPT等),根據任務需求動態選擇最適合的模型,提高系統靈活性。

  8. 特殊協議支持:對於需要使用特殊通信協議或數據格式的場景,自主開發可以實現這些非標準需求。

  9. 降低依賴風險:減少對第三方服務的依賴,增強系統的獨立性和韌性。如果第三方服務發生變更或中斷,自主開發的系統可以更快地適應和調整。

  10. 專業知識沉澱:通過自主開發MCP-Client,團隊可以積累 AI 與外部系統集成的專業知識和經驗,這對於長期的 AI 戰略和能力建設非常有價值。

實際上,隨着 AI 應用的深入和普及,越來越多的組織開始認識到,AI 基礎設施(包括MCP-Client)是一種戰略性資產。自主開發這些組件不僅可以獲得更好的技術匹配度,還能在競爭中獲得差異化優勢,尤其是在 AI 技術對業務至關重要的領域。

MCP協議的開放性恰恰爲自主開發MCP-Client提供了可能,使得組織和開發者能夠在標準化框架下創建適合自己需求的定製解決方案,同時仍然保持與整個生態系統的互操作性。通過自寫MCP-Client,開發者可以充分利用AI大模型的能力,同時保持對系統架構和數據流的完全控制。

MCP-Client 編寫

工程搭建

在本節實驗中,需要大家自己準備一個適配openai協議的大模型API,例如:deepseek V3Qwen系列,Moonshot月之暗面等等。

爲了編寫MCP Client,在這裏我們直接使用上一節(從 0 開始實現 MCP-Server)中,創建好的工程。

首先創建環境變量.env文件,在該文件中我們放入自己的大模型相關信息:

主要包含 3 個字段:

  1. OPENAI_API_KEY:大模型的API KEY

  2. BASE_URL:大模型請求地址

  3. MODEL:模型名稱

這裏對模型的種類不限,我使用的是moonshot,大家使用其他大模型均可。

MCP-Prompt

目前支持或深度集成MCP協議的大模型,主要包括:Claude 系列GPT 系列等。

國內的大模型供應商對於MCP協議基本上沒有做針對性的集成訓練,所以在國內使用MCP協議,必須編寫結構化的MCP-Prompt,通過system prompt的方式讓國內的大模型具備適配MCP協議。

爲了編寫這個提示詞,我使用cloudflare對大模型進行代理,然後對CursorMCP請求進行截獲並將MCP提示詞相關內容保留,其他無關內容刪除,得到以下提示詞,當然大家也可以自行對這個提示詞進行修改,實現自己的定製化。

在工程中創建文件MCP_Prompt.txt,將以下內容放入文件中。

You are an AI assistant, you can help users solve problems, including but not limited to programming, editing files, browsing websites, etc.

====

TOOL USE

You have access to a set of tools that are executed upon the user's approval. You can use one tool per message, and will receive the result of that tool use in the user's response. You use tools step-by-step to accomplish a given task, with each tool use informed by the result of the previous tool use.

# Tool Use Formatting

Tool use is formatted using XML-style tags. The tool name is enclosed in opening and closing tags, and each parameter is similarly enclosed within its own set of tags. Here's the structure:

<tool_name>
<parameter1_name>value1</parameter1_name>
<parameter2_name>value2</parameter2_name>
...
</tool_name>

For example:

<read_file>
<path>src/main.js</path>
</read_file>

Always adhere to this format for the tool use to ensure proper parsing and execution.

# Tools
## use_mcp_tool
Description: Request to use a tool provided by a connected MCP server. Each MCP server can provide multiple tools with different capabilities. Tools have defined input schemas that specify required and optional parameters.
Parameters:
- server_name: (required) The name of the MCP server providing the tool
- tool_name: (required) The name of the tool to execute
- arguments: (required) A JSON object containing the tool's input parameters, following the tool's input schema
Usage:
<use_mcp_tool>
<server_name>server name here</server_name>
<tool_name>tool name here</tool_name>
<arguments>
{
  "param1": "value1",
  "param2": "value2"
}
</arguments>
</use_mcp_tool>

# Tool Use Examples
## Example 1: Requesting to use an MCP tool

<use_mcp_tool>
<server_name>weather-server</server_name>
<tool_name>get_forecast</tool_name>
<arguments>
{
  "city": "San Francisco",
  "days": 5
}
</arguments>
</use_mcp_tool>

## Example 2: Another example of using an MCP tool (where the server name is a unique identifier such as a URL)

<use_mcp_tool>
<server_name>github.com/modelcontextprotocol/servers/tree/main/src/github</server_name>
<tool_name>create_issue</tool_name>
<arguments>
{
  "owner": "octocat",
  "repo": "hello-world",
  "title": "Found a bug",
  "body": "I'm having a problem with this.",
  "labels": ["bug", "help wanted"],
  "assignees": ["octocat"]
}
</arguments>
</use_mcp_tool>

===

MCP SERVERS

The Model Context Protocol (MCP) enables communication between the system and locally running MCP servers that provide additional tools and resources to extend your capabilities.

# Connected MCP Servers

When a server is connected, you can use the server's tools via the `use_mcp_tool` tool, and access the server's resources via the `access_mcp_resource` tool.
<$MCP_INFO$>

===

CAPABILITIES
- You have access to MCP servers that may provide additional tools and resources. Each server may provide different capabilities that you can use to accomplish tasks more effectively.

====

RULES
- MCP operations should be used one at a time, similar to other tool usage. Wait for confirmation of success before proceeding with additional operations.

====

OBJECTIVE

You accomplish a given task iteratively, breaking it down into clear steps and working through them methodically.

1. Analyze the user's task and set clear, achievable goals to accomplish it. Prioritize these goals in a logical order.
2. Work through these goals sequentially, utilizing available tools one at a time as necessary. Each goal should correspond to a distinct step in your problem-solving process. You will be informed on the work completed and what's remaining as you go.
3. Once you've completed the user's task, you must use the attempt_completion tool to present the result of the task to the user. 
4. The user may provide feedback, which you can use to make improvements and try again. But DO NOT continue in pointless back and forth conversations, i.e. don't end your responses with questions or offers for further assistance."

在這個提示詞中,我設置了一個特殊的標記符<$MCP_INFO$>,該標記符用於後期載入MCP ServerMCP Tool相關工具的描述信息。

Stdio 通信協議

stdio 傳輸方式是最簡單的通信方式,通常在本地工具之間進行消息傳遞時使用。它利用標準輸入輸出(stdin/stdout)作爲數據傳輸通道,適用於本地進程間的交互。

在這裏我們首先以上一節(從 0 開始實現 MCP-Server)中構建的weather MCP Server爲例,編寫MCP ClientMCP Server進行請求。

編寫mcp_client_stdio.py代碼如下所示:

import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client

from dotenv import load_dotenv
import os, re
from openai import OpenAI
from lxml import etree

load_dotenv()  # 加載.env文件內容到環境變量中

class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # 需要提前在.env文件中設置相關環境變量
        self.API_KEY = os.getenv("API_KEY")
        self.BASE_URL = os.getenv("BASE_URL")
        self.MODEL = os.getenv("MODEL")
        # 創建LLM client
        self.client = OpenAI(api_key=self.API_KEY, base_url=self.BASE_URL)
        # 存儲歷史消息
        self.messages = []
        # 讀取提示詞模板
        with open("./MCP_Prompt.txt""r", encoding="utf-8") as file:
            self.system_prompt = file.read()

    async def connect_to_stdio_server(self, mcp_name, command: str, args: list[str], env: dict[str, str]={}):
        """Connect to an MCP server

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=env
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()
        # 將MCP信息添加到system_prompt
        response = await self.session.list_tools()
        available_tools = ['##' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n" + tool.description + "\n" + json.dumps(tool.inputSchema) for tool in response.tools]
        self.system_prompt = self.system_prompt.replace("<$MCP_INFO$>""\n".join(available_tools)+"\n<$MCP_INFO$>")
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Process a query using Claude and available tools"""
        self.messages.append(
            {
                "role""system",
                "content": self.system_prompt
            }
        )
        self.messages.append(
            {
                "role""user",
                "content": query
            }
        )

        # Initial Claude API call
        response = self.client.chat.completions.create(
            model=self.MODEL,
            max_tokens=1024,
            messages=self.messages,
        )

        # Process response and handle tool calls
        final_text = []
        content = response.choices[0].message.content
        if'<use_mcp_tool>'notin content:
            final_text.append(content)
        else:
            # 解析tool_string
            server_name, tool_name, tool_args = self.parse_tool_string(content)

            # 執行工具調用
            result = await self.session.call_tool(tool_name, tool_args)
            print(f"[Calling tool {tool_name} with args {tool_args}]")
            print("-"*40)
            print("Server:", server_name)
            print("Tool:", tool_name)
            print("Args:", tool_args)
            print("-"*40)
            print("Result:", result.content[0].text)
            print("-"*40)
            self.messages.append({
                "role""assistant",
                "content": content
            })
            self.messages.append({
                "role""user",
                "content": f"[Tool {tool_name} \n returned: {result}]"
            })

            response = self.client.chat.completions.create(
                model=self.MODEL,
                max_tokens=1024,
                messages=self.messages
            )
            final_text.append(response.choices[0].message.content)
        return"\n".join(final_text)
    
    def parse_tool_string(self, tool_string: str) -> tuple[str, str, dict]:
        """
        解析大模型工具調用返回的字符串
        """
        tool_string = re.findall("(<use_mcp_tool>.*?</use_mcp_tool>)", tool_string, re.S)[0]
        root = etree.fromstring(tool_string)
        server_name = root.find('server_name').text
        tool_name = root.find('tool_name').text
        try:
            tool_args = json.loads(root.find('arguments').text)
        except json.JSONDecodeError:
            raise ValueError("Invalid tool arguments")
        return server_name, tool_name, tool_args

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        self.messages = []
        whileTrue:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break
                if query.strip() == '':
                    print("Please enter a query.")
                    continue
                response = await self.process_query(query)
                print(response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()

async def main():
    client = MCPClient()
    try:
        await client.connect_to_stdio_server('weather''python', ['weather.py'])
        await client.chat_loop()
    finally:
        await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

注意:這裏調用的是上一節中編寫的weather MCP Server,所以weather.py文件應該和mcp_client_stdio.py文件在同一目錄下。

代碼執行效果演示:

MCP Server調用成功!

SSE 通信協議

SSE 是基於 HTTP 協議的流式傳輸機制,它允許服務器通過 HTTP 單向推送事件到客戶端。SSE 適用於客戶端需要接收服務器推送的場景,通常用於實時數據更新。

在這裏同樣可以以上一節中,我們在公網中搭建的weather MCP SSE Server

編寫代碼mcp_client_sse.py如下所示:

import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client

from dotenv import load_dotenv
import os, re
from openai import OpenAI
from lxml import etree

load_dotenv()  # 加載.env文件內容到環境變量中

class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # 需要提前在.env文件中設置相關環境變量
        self.API_KEY = os.getenv("API_KEY")
        self.BASE_URL = os.getenv("BASE_URL")
        self.MODEL = os.getenv("MODEL")
        # 創建LLM client
        self.client = OpenAI(api_key=self.API_KEY, base_url=self.BASE_URL)
        # 存儲歷史消息
        self.messages = []
        # 讀取提示詞模板
        with open("./MCP_Prompt.txt""r", encoding="utf-8") as file:
            self.system_prompt = file.read()

    async def connect_to_sse_server(self, mcp_name, server_url: str):
        stdio_transport = await self.exit_stack.enter_async_context(sse_client(server_url))
        self.sse, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write))

        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        available_tools = ['##' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n" + tool.description + "\n" + json.dumps(tool.inputSchema) for tool in response.tools]
        self.system_prompt = self.system_prompt.replace("<$MCP_INFO$>""\n".join(available_tools)+"\n<$MCP_INFO$>\n")
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Process a query using Claude and available tools"""
        self.messages.append(
            {
                "role""system",
                "content": self.system_prompt
            }
        )
        self.messages.append(
            {
                "role""user",
                "content": query
            }
        )

        # Initial Claude API call
        response = self.client.chat.completions.create(
            model=self.MODEL,
            max_tokens=1024,
            messages=self.messages,
        )

        # Process response and handle tool calls
        final_text = []
        content = response.choices[0].message.content
        if'<use_mcp_tool>'notin content:
            final_text.append(content)
        else:
            # 解析tool_string
            server_name, tool_name, tool_args = self.parse_tool_string(content)

            # 執行工具調用
            result = await self.session.call_tool(tool_name, tool_args)
            print(f"[Calling tool {tool_name} with args {tool_args}]")
            print("-"*40)
            print("Server:", server_name)
            print("Tool:", tool_name)
            print("Args:", tool_args)
            print("-"*40)
            print("Result:", result.content[0].text)
            print("-"*40)
            self.messages.append({
                "role""assistant",
                "content": content
            })
            self.messages.append({
                "role""user",
                "content": f"[Tool {tool_name} \n returned: {result}]"
            })

            response = self.client.chat.completions.create(
                model=self.MODEL,
                max_tokens=1024,
                messages=self.messages
            )
            final_text.append(response.choices[0].message.content)
        return"\n".join(final_text)
    
    def parse_tool_string(self, tool_string: str) -> tuple[str, str, dict]:
        """
        解析大模型工具調用返回的字符串
        """
        tool_string = re.findall("(<use_mcp_tool>.*?</use_mcp_tool>)", tool_string, re.S)[0]
        root = etree.fromstring(tool_string)
        server_name = root.find('server_name').text
        tool_name = root.find('tool_name').text
        try:
            tool_args = json.loads(root.find('arguments').text)
        except json.JSONDecodeError:
            raise ValueError("Invalid tool arguments")
        return server_name, tool_name, tool_args

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        self.messages = []
        whileTrue:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break
                if query.strip() == '':
                    print("Please enter a query.")
                    continue
                response = await self.process_query(query)
                print(response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()

async def main():
    client = MCPClient()
    try:
         await client.connect_to_sse_server('weather_sse''http://47.113.225.16:8000/sse')
        await client.chat_loop()
    finally:
        await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

執行效果演示:

同樣可以成功調用MCP SSE Server

大家可以直接調用我部署好的MCP SSE Server

爲了方便大家學習,這裏可以直接使用的部署好的服務http://47.113.225.16:8000/sse來進行測試。當然也鼓勵大家換成其他的服務嘗試。

採用配置文件進行加載

經過前面的實驗,我們現在已經可以通過自己編寫的MCP Client連接任意MCP Server,包括stdiosse通信協議,但是都只連接了一個MCP Server。用過Cursor或其他MCP Client應用的同學應該很清楚,他們是通過一個JSON配置文件去加載多個MCP Server,那麼我們自己編寫的MCP Client能否達到這個效果呢?

當然可以,接下來的實驗我們就一起來編寫相關代碼,實現這個需求。

首先,我們定義自己的JSON文件協議,例如:

{
    "mcpServers": {
      "weather-sse": {
        "isActive": true,
        "type""stdio",
        "command""python",
        "args": [
          "weather.py"
        ],
        "name""weather-sse",
        "env": {}
      },
      "amap-amap-sse": {
        "isActive": true,
        "type""sse",
        "url""https://mcp.amap.com/sse?key={高德API KEY}",
        "name""amap-amap-sse"
      }
    }
  }

注意:{高德API KEY}大家可以去高德官網註冊賬號,可以免費獲取。

字段意義如下所示:

  1. 公共字段:
  1. stdio相關字段:
  1. sse相關參數:

編寫mcp_client_mix.py文件,內容如下所示:

import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client

from dotenv import load_dotenv
import os, re
from openai import OpenAI
from lxml import etree

load_dotenv()  # load environment variables from .env

class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # 需要提前在.env文件中設置相關環境變量
        self.API_KEY = os.getenv("API_KEY")
        self.BASE_URL = os.getenv("BASE_URL")
        self.MODEL = os.getenv("MODEL")
        self.client = OpenAI(api_key=self.API_KEY, base_url=self.BASE_URL)
        self.sessions = {}
        self.messages = []
        with open("./MCP_Prompt.txt""r", encoding="utf-8") as file:
            self.system_prompt = file.read()

    async def mcp_json_config(self, mcp_json_file):
        try:
            with open(mcp_json_file, 'r') as f:
                mcp_config: dict = json.load(f)
        except json.JSONDecodeError:
            raise ValueError("Invalid MCP config")
        servers_config: dict = mcp_config.get('mcpServers', {})
        for k, v in servers_config.items():
            try:
                print('-'*50)
                if v.get('isActive', False) == False:
                    continue
                mcp_name = v.get('name', k)
                mcp_type: str = v.get('type''stdio')
                if mcp_type.lower() == 'stdio':
                    command = v.get('command', None)
                    args = v.get('args', [])
                    env = v.get('env', {})
                    if command isNone:
                        raise ValueError(f'{mcp_name} command is empty.')
                    if args == []:
                        raise ValueError(f'{mcp_name} args is empty.')
                    await self.connect_to_stdio_server(mcp_name, command, args, env)
                elif mcp_type.lower() == 'sse':
                    server_url = v.get('url', None)
                    if server_url isNone:
                        raise ValueError(f'{mcp_name} server_url is empty.')
                    await self.connect_to_sse_server(mcp_name, server_url)
                else:
                    raise ValueError(f'{mcp_name} mcp type must in [stdio, sse].')
            except Exception as e:
                print(f"Error connecting to {mcp_name}: {e}")

    async def connect_to_stdio_server(self, mcp_name, command: str, args: list[str], env: dict[str, str]={}):
        """Connect to an MCP server

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=env
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        self.sessions[mcp_name] = self.session

        await self.session.initialize()
        # 將MCP信息添加到system_prompt
        response = await self.session.list_tools()
        available_tools = ['##' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n" + tool.description + "\n" + json.dumps(tool.inputSchema) for tool in response.tools]
        self.system_prompt = self.system_prompt.replace("<$MCP_INFO$>""\n".join(available_tools)+"\n<$MCP_INFO$>")
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def connect_to_sse_server(self, mcp_name, server_url: str):
        """Connect to an MCP server

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        stdio_transport = await self.exit_stack.enter_async_context(sse_client(server_url))
        self.sse, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write))
        self.sessions[mcp_name] = self.session

        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        available_tools = ['##' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n" + tool.description + "\n" + json.dumps(tool.inputSchema) for tool in response.tools]
        self.system_prompt = self.system_prompt.replace("<$MCP_INFO$>""\n".join(available_tools)+"\n<$MCP_INFO$>\n")
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Process a query using Claude and available tools"""
        self.messages.append(
            {
                "role""system",
                "content": self.system_prompt
            }
        )
        self.messages.append(
            {
                "role""user",
                "content": query
            }
        )

        # Initial Claude API call
        response = self.client.chat.completions.create(
            model=self.MODEL,
            max_tokens=1024,
            messages=self.messages,
        )

        # Process response and handle tool calls
        final_text = []
        content = response.choices[0].message.content
        if'<use_mcp_tool>'notin content:
            final_text.append(content)
        else:
            # 解析tool_string
            server_name, tool_name, tool_args = self.parse_tool_string(content)

            # 執行工具調用
            result = await self.sessions[server_name].call_tool(tool_name, tool_args)
            print(f"[Calling tool {tool_name} with args {tool_args}]")
            print("-"*40)
            print("Server:", server_name)
            print("Tool:", tool_name)
            print("Args:", tool_args)
            print("-"*40)
            print("Result:", result.content[0].text)
            print("-"*40)
            self.messages.append({
                "role""assistant",
                "content": content
            })
            self.messages.append({
                "role""user",
                "content": f"[Tool {tool_name} \n returned: {result}]"
            })

            response = self.client.chat.completions.create(
                model=self.MODEL,
                max_tokens=1024,
                messages=self.messages
            )
            final_text.append(response.choices[0].message.content)
        return"\n".join(final_text)
    
    def parse_tool_string(self, tool_string: str) -> tuple[str, str, dict]:
        tool_string = re.findall("(<use_mcp_tool>.*?</use_mcp_tool>)", tool_string, re.S)[0]
        root = etree.fromstring(tool_string)
        server_name = root.find('server_name').text
        tool_name = root.find('tool_name').text
        try:
            tool_args = json.loads(root.find('arguments').text)
        except json.JSONDecodeError:
            raise ValueError("Invalid tool arguments")
        return server_name, tool_name, tool_args

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        self.messages = []
        whileTrue:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break
                if query.strip() == '':
                    print("Please enter a query.")
                    continue
                response = await self.process_query(query)
                print(response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()

async def main():
    client = MCPClient()
    try:
        mcp_config_file = './mcp.json'
        await client.mcp_json_config(mcp_config_file)
        await client.chat_loop()
    finally:
        await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

演示效果如下所示:

可以看到,兩個工具都可以成功調用。

值得進一步優化的小建議

在本節中,我們已經實現了通過MCP配置文件,加載所有的MCP Server,並且經過驗證所有的MCP Server工具都可以成功調用。

但是,現在的版本無法完成工具之前進行相互調用,無法通過用戶的需求調用多個工具配置完成用戶問題的解答,基於此大家可以自行修改現有代碼,實現的方式不難,大家可以自己動手實操一下。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/PQFYQ-Z9Pam9KZs80-yILg