基於 MCP 的數據智能查詢應用實現(利用 Sampling 實現微博內容的情感分析)

本文以數據智能查詢應用場景爲例,該應用基於 LLM、MCP 實現以下功能:

  1. 用戶以自然語言的形式提問,LLM 識別用戶提問意圖,並通過 MCP 查詢對應的數據庫數據,結合數據查詢結果回答用戶問題。

  2. 通過 MCP 的 Sampling,賦予 MCP 服務端智能能力:服務端在查詢微博數據時,分析並返回每條微博內容的情感傾向。

實體對象關係

假設應用場景有三類數據:中國省份信息、電影數據、微博數據。

主要對象關係如上圖所示(與前文 MCP:編程實戰,手把手教你實現數學運算智能問答應用FastMCP,構建 MCP 的 python 框架,比官方 SDK 更好用!類似),區別在於此處 MCP 服務端暴露的是資源 resources 服務,提供數據庫數據查詢服務):

利用 Sampling 實現微博內容的情感分析

關於 MCP 的 Sampling ,可閱讀前文:一文讀懂 MCP 的 Sampling(採樣),賦予 MCP 服務端智能能力!。通過 Sampling,MCP 服務端在提供微博數據查詢服務時,可以藉助 LLM 能力,實現對微博內容的情感分析,並將分析結果返回給用戶。

通過 Sampling 實現微博內容情感分析流程如下:

基於 FastMCP 的 Sampling 實現:

Sampling 的交互流程:

編碼實現

服務端

中國地區數據、電影數據查詢服務:

@mcp.resource("db://province_info/data/{province_name}")
asyncdef get_province_data(province_name: str) -> str:
    """中國省份(含直轄市)信息

    參數:
    province_name: 中國省份名稱或直轄市,如廣東省,北京市

    返回:
    以 json 格式返回某省份信息
    """
    sql = '''
            select region_code,
                   province_name,
                   capital_city,
                   area,
                   population,
                   abbreviation 
            from public.chinese_provinces 
            where province_name = '{province_name}'
    '''.format(province_name=province_name)
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            return json.dumps(list(rows), default=str)


@mcp.resource("db://movie_info/data/{movie_name}")
asyncdef get_movie_data(movie_name: str) -> str:
    """電影信息

    參數:
    movie_name: 電影名稱

    返回:
    以 json 格式返回某電影信息
    """
    sql = '''
    select movie_type, 
           main_actors, 
           region, 
           director, 
           features, 
           rating, 
           movie_name
    from public.chinese_movie_ratings
    where movie_name = '{movie_name}'
    '''.format(movie_name=movie_name)
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            return json.dumps(list(rows), default=str)

微博數據查詢(調用上下文對象的 sample 方法,獲取每條微博的情感傾向):

@mcp.resource("db://weibo_data/data/?date={date}&limit={limit}")
asyncdef get_weibo_data(ctx: Context, date: str, limit: int = 10,) -> str:
    """微博數據

    參數:
    date: 日期,數據格式爲 yyyymmdd
    limit: 要求返回的數據量

    返回:
    以 json 格式返回微博數據
    """
    sql = f'''
            select weibo_id,
                   user_name,
                   publish_date,
                   publish_time,
                   weibo_content 
            from public.t_weibo_ncov
            where publish_date = '{date}'
            limit {limit}
    '''.format(date=date, limit=limit)
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()

            # 通過 Sampling,給每個微博內容增加情感傾向標籤
            for weibo_data in rows:
                system_prompt = '請給出這條微博內容的情感傾向,標註分爲三類的其中一個:積極,中性和消極'
                response = await ctx.sample(
                    messages=weibo_data['weibo_content'],
                    system_prompt=system_prompt
                )
                assert isinstance(response, TextContent)
                weibo_data['sentiment_tag'] = response.text
                print(weibo_data)
            return json.dumps(list(rows), default=str)


@mcp.resource("db://weibo_data/data/?weibo_user={weibo_user}&limit={limit}")
asyncdef get_weibo_user_data(ctx: Context, weibo_user: str, limit: int = 10,) -> str:
    """某具體微博用戶名的微博內容

    參數:
    weibo_user: 微博用戶名
    limit: 要求返回的數據量

    返回:
    以 json 格式返回微博數據
    """
    sql = '''
            select weibo_id,
                   user_name,
                   publish_date,
                   publish_time,
                   weibo_content 
            from public.t_weibo_ncov
            where user_name = '{weibo_user}'
            limit {limit}
    '''.format(weibo_user=weibo_user, limit=limit)
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()

            # 通過 Sampling,給每個微博內容增加情感傾向標籤
            for weibo_data in rows:
                system_prompt = '請給出這條微博內容的情感傾向,標註分爲三類的其中一個:積極,中性和消極'
                response = await ctx.sample(
                    messages=weibo_data['weibo_content'],
                    system_prompt=system_prompt
                )
                assert isinstance(response, TextContent)
                weibo_data['sentiment_tag'] = response.text
                print(weibo_data)
            return json.dumps(list(rows), default=str)

服務端設置與運行:

mcp = FastMCP("weibo data mcp server",
              debug=True,
              host="0.0.0.0",
              port=8002)

if __name__ == "__main__":
    mcp.run("sse")

客戶端

會話管理:

class LLMClient:
    """大語言模型客戶端,負責與LLM API進行通信"""

    def __init__(self, model_name: str, url: str, api_key: str) -> None:
        """
        初始化LLM客戶端

        Args:
            model_name: 模型名稱
            url: API基礎URL
            api_key: API密鑰
        """
        self.model_name: str = model_name
        self.url: str = url
        self.client = OpenAI(api_key=api_key, base_url=url)

    def get_response(self, messages: list[dict[str, str]]) -> str:
        """
        向LLM發送消息並獲取響應

        Args:
            messages: 消息列表,每條消息包含role和content

        Returns:
            LLM的響應文本
        """
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            stream=False
        )
        return response.choices[0].message.content


class ChatSession:
    """聊天會話管理器,處理用戶輸入、LLM響應和資源訪問"""

    def __init__(self, llm_client: LLMClient, mcp_client: Client) -> None:
        """
        初始化聊天會話

        Args:
            llm_client: LLM客戶端實例
            mcp_client: MCP客戶端實例,用於訪問資源
        """
        self.mcp_client: Client = mcp_client
        self.llm_client: LLMClient = llm_client

    asyncdef process_llm_response(self, llm_response: str) -> str:
        """
        處理LLM的響應,解析資源URI調用或工具調用並執行

        Args:
            llm_response: LLM返回的響應文本

        Returns:
            處理後的響應文本,包含資源數據或工具執行結果
        """
        try:
            # 檢查是否爲資源URI調用(以db://開頭)
            if llm_response.strip().startswith('db://'):
                uri = llm_response.strip()
                try:
                    # 執行資源讀取
                    resource_data = await self.mcp_client.read_resource(uri=uri)
                    returnf"Resource data: {resource_data}"
                except Exception as e:
                    error_msg = f"Error reading resource: {str(e)}"
                    logging.error(error_msg)
                    return error_msg
            else:
                # 嘗試解析爲JSON工具調用(保留兼容性)
                if llm_response.startswith('```json'):
                    llm_response = llm_response.strip('```json').strip('```').strip()
                try:
                    tool_call = json.loads(llm_response)
                    if"tool"in tool_call and"arguments"in tool_call:
                        # 檢查工具是否可用
                        available_tools = await self.mcp_client.list_tools()
                        if any(tool.name == tool_call["tool"] for tool in available_tools):
                            try:
                                # 執行工具調用
                                tool_result = await self.mcp_client.call_tool(
                                    tool_call["tool"], tool_call["arguments"]
                                )
                                returnf"Tool execution result: {tool_result}"
                            except Exception as e:
                                error_msg = f"Error executing tool: {str(e)}"
                                logging.error(error_msg)
                                return error_msg
                        returnf"Tool not found: {tool_call['tool']}"
                except json.JSONDecodeError:
                    pass
                # 如果不是JSON格式或工具調用,直接返回原始響應
                return llm_response
        except Exception as e:
            error_msg = f"Error processing LLM response: {str(e)}"
            logging.error(error_msg)
            return error_msg

    asyncdef start(self, system_message: str) -> None:
        """
        啓動聊天會話的主循環

        Args:
            system_message: 系統提示消息,指導LLM的行爲
        """
        messages = [{"role""system""content": system_message}]
        whileTrue:
            try:
                # 獲取用戶輸入
                user_input = input("用戶: ").strip().lower()
                if user_input in ["quit""exit""退出"]:
                    print('AI助手已退出')
                    break
                messages.append({"role""user""content": user_input})

                # 獲取LLM的初始響應
                llm_response = self.llm_client.get_response(messages)
                print("助手: ", llm_response)

                # 處理可能的資源調用或工具調用
                result = await self.process_llm_response(llm_response)

                # 如果處理結果與原始響應不同,說明執行了資源調用或工具調用,需要進一步處理
                while result != llm_response:
                    messages.append({"role""assistant""content": llm_response})
                    messages.append({"role""system""content": result})

                    # 將資源數據或工具執行結果發送回LLM獲取新響應
                    llm_response = self.llm_client.get_response(messages)
                    result = await self.process_llm_response(llm_response)
                    print("助手: ", llm_response)

                messages.append({"role""assistant""content": llm_response})

            except KeyboardInterrupt:
                print('AI助手已退出')
                break

Sampling:通過 marvin 框架,創建 Agent,負責處理向 LLM 的 Sampling 請求

# 獲取DeepSeek API密鑰
api_key = os.getenv('DEEPSEEK_API_KEY')

# 初始化DeepSeek模型
model = OpenAIModel(
    'deepseek-chat',
    provider=DeepSeekProvider(api_key=api_key),
)

# 創建Marvin智能助手
agent = marvin.Agent(
    model=model,
    
)


asyncdef sampling_func(
        messages: list[SamplingMessage],
        params: SamplingParams,
        ctx: RequestContext,
) -> str:
    """
    採樣函數,用於處理消息並獲取LLM響應

    Args:
        messages: 消息列表
        params: 採樣參數
        ctx: 請求上下文

    Returns:
        LLM的響應文本
    """
    returnawait marvin.say_async(
        message=[m.content.text for m in messages],
        instructions=params.systemPrompt,
        agent=agent
    )

main 函數:

async def main():
    """主函數,初始化客戶端並啓動聊天會話"""
    asyncwith Client("http://127.0.0.1:8002/sse", sampling_handler=sampling_func) as mcp_client:
        # 初始化LLM客戶端,使用通義千問模型
        llm_client = LLMClient(
            model_name='qwen-plus-latest',
            api_key=os.getenv('DASHSCOPE_API_KEY'),
            url='https://dashscope.aliyuncs.com/compatible-mode/v1'
        )

        # 獲取可用資源模板
        resource_templates = await mcp_client.list_resource_templates()
        template_dicts = [template.__dict__ for template in resource_templates]
        resources_description = json.dumps(template_dicts, ensure_ascii=False)

        # 系統提示,指導LLM如何使用資源模板和返回響應
        system_message = f'''
                你是一個智能助手,能夠訪問多種數據資源。嚴格遵循以下協議返回響應:

                可用資源模板:{resources_description}

                響應規則:
                1、當用戶詢問需要查詢數據時,判斷是否需要調用資源模板:
                 - 如果需要查詢數據,返回對應的資源URI
                 - 如果是普通對話,直接回答用戶問題

                2、資源調用格式:
                 - 返回純淨的URI字符串,不包含任何其他內容
                 - URI格式必須嚴格按照資源模板定義
                 - 根據用戶需求填入合適的參數值
                 
                3、URI參數說明:
                 - date: 日期參數,格式如0101表示1月1日
                 - limit: 限制返回數據條數
                 - 其他參數根據資源模板定義填入

                4、非數據查詢的響應:
                 - 對於普通對話,直接給出自然語言回答
                 - 不需要調用資源時,不要返回URI

                5、收到資源數據後:
                 - 將數據轉化爲用戶友好的格式
                 - 突出顯示關鍵信息
                 - 保持回覆簡潔清晰
                 - 根據用戶問題提供相關分析
                '''
        # 啓動聊天會話
        chat_session = ChatSession(llm_client=llm_client, mcp_client=mcp_client)
        await chat_session.start(system_message=system_message)

運行效果

1 運行服務端:python weibo_data_server.py

2 運行客戶端的 main 函數,進入會話,用戶提問:

下面的運行結果顯示,LLM 可根據用戶的提問,轉換爲相應的數據查詢(通過 MCP 查詢外部數據資源),並根據查詢結果,回答用戶的問題。微博數據的查詢返回,提供了每條微博的情感傾向(利用 MCP Sampling 實現微博內容的情感分析):

D:\python_project\mcp_learning\.venv\Scripts\python.exe D:\python_project\mcp_learning\sample\weibo_data_client.py 
用戶: 介紹一下北京市
助手:  db://province_info/data/北京市
助手:  北京市是中國的首都,位於華北地區。以下是一些關於北京市的關鍵信息:

- **區域代碼**:110000
- **省份名稱**:北京市
- **省會城市**:北京市
- **面積**:16,410平方公里
- **人口**:約2189.3萬人
- **簡稱**:京

北京不僅是政治和文化中心,也是世界著名古都之一,擁有衆多歷史遺蹟和現代化建築,如故宮、頤和園和鳥巢等。
用戶: 介紹一下電影我不是藥神
助手:  db://movie_info/data/我不是藥神
助手:  電影《我不是藥神》是一部感人至深的劇情片,以下是該電影的關鍵信息:

- **電影名稱**:我不是藥神  
- **電影類型**:劇情  
- **地區**:中國大陸  
- **導演**:文牧野  
- **主演**:徐崢、王傳君、週一圍、譚卓、章宇  
- **影片特點**:經典  
- **評分**:8.9分(豆瓣高分)  

這部電影改編自真實事件,講述了一位普通藥店老闆程勇爲白血病患者走私低價救命藥的故事。影片深刻反映了醫療問題與人性掙扎,獲得了廣泛讚譽和多項獎項。
用戶: 獲取1月1日前兩條微博
助手:  db://weibo_data/data/?date=0101&limit=2
助手:  以下是2020年1月1日的前兩條微博內容:

1. **用戶:** 小-fairy  
   **時間:** 00:00  
   **內容:** "2019年過去了,所有不好的都忘記吧……2020年已經來了,所有願望都許在新年這一天吧……2020中考順利,家人身體健康,國家越來越強大?"  
   **情緒標籤:** 積極  

2. **用戶:** 黃大喵的金油瓶  
   **時間:** 00:42  
   **內容:** "這新年的祝福,手長的我先給利利送到~@齊木心美衝鴨(因爲我預測零點的服務器會崩潰hhhhh~)本號的第一條原創!!獻給你!!!味~說正祝可愛的利利同學在新的一年裏也要好好加油哦~回想起與利利認識的第一天;和利利交談的第一句話;和利利坐在一節課堂上……我都記不得了~⊙_?展開全文c"  
   **情緒標籤:** 積極  

這兩條微博展現了人們在新年伊始時對未來的美好祝願以及分享生活點滴的喜悅心情。
用戶: 獲取微博用戶“是快樂小張張”的微博
助手:  db://weibo_data/data/?weibo_user=是快樂小張張&limit=10
助手:  以下是微博用戶“是快樂小張張”最近的一條微博內容:

- **發佈時間**:02月03日 17:52  
- **微博內容**:終於哭了不再爲自己的小小世界。我與疫情的關係是不想延遲開學的我,希望它快快過去,痛恨它阻礙了我的感情。在這段時間裏,外面的世界紛紛擾擾,我在小小世界伸手不見五指,我甚至羨慕那些爲了疫情新聞義憤填膺的普通人,至少還有心情關注外面的世界。孔維的視頻我看到親情:採訪時一個頂天立地醫生他想…展開全文c  
- **情緒標籤**:消極  

這條微博反映了作者在疫情期間的情緒困擾,既有對個人生活和感情受阻的無奈,也有對外界疫情動態的關注與感慨。
用戶: quit
AI助手已退出

Process finished with exit code 0

以上是基於 MCP 的數據智能查詢應用實現。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/s2jEcKt4YnS3exg4uE4IsA