在 Pisa-Proxy 中,如何利用 Rust 實現 MySQL 代理

前言

背景

在 Database Mesh 中,Pisanix 是一套以數據庫爲中心的治理框架,爲用戶提供了諸多治理能力,例如:數據庫流量治理,SQL 防火牆,負載均衡和審計等。在 Pisanix 中,Pisa-Proxy 是作爲整個 Database Mesh 實現中數據平面的核心組件。Pisa-Proxy 服務本身需要具備 MySQL 協議感知,理解 SQL 語句,能對後端代理的數據庫做一些特定的策略,SQL 併發控制和斷路等功能。在這諸多特性當中,能夠理解 MySQL 協議就尤爲重要,本篇將主要介紹 MySQL 協議和在 Pisa-Proxy 中 MySQL 協議的 Rust 實現。

Why Rust

爲什麼要選用 Rust 語言呢?我們的考量有以下幾個必要條件。

整體架構,模塊設計

整體架構

如圖 1,代理服務包含服務端和客戶端的協議解析、SQL 解析、訪問控制、連接池等模塊。

圖 1  Pisa-Proxy 工作流程圖

工作流程

在圖 1 中我們可以看出整個 Proxy 服務可以概括爲以下幾個階段。

  1. 首先 Pisa-Proxy 支持 MySQL 協議,將自己僞裝爲數據庫服務端,應用連接配置只需修改訪問地址即可建連 Pisa-Proxy 通過讀取應用發來的握手請求和數據包;

  2. 得到應用發來的 SQL 語句後對該 SQL 進行語法解析,並得到該 SQL 的 AST;

  3. 得到對應 AST 後,基於 AST 實現高級訪問控制和 SQL 防火牆能力;

  4. 訪問控制和防火牆通過後 SQL 提交執行,SQL 執行階段的指標將採集爲 Prometheus Metrics,最後根據負載均衡策略獲取執行該語句的後端數據庫連接;

  5. 如果連接池爲空將根據配置建立和後端數據庫的連接,SQL 將從該連接發往後端數據庫;

  6. 最後讀取 SQL 執行結果,組裝後返回給客戶端。

如何用 Rust 快速實現 MySQL 代理服務

如圖 2,在整個代理服務中總體分爲兩部分:服務端和客戶端,即代理服務作爲服務端處理來自客戶端的請求。和服務端,需要對服務端發起認證,並將客戶端端命令發送給 MySQL 數據庫。在這兩部分中我們需要創建兩套 Socket 來完成網絡數據包的處理。

圖 2

技術選型

介紹

在網絡報處理和運行時處理上,我們選用了 Rust 實現的 Tokio(https://github.com/tokio-rs/tokio)框架。Tokio 框架是 Rust 編寫的可靠、異步和精簡應用程序的運行時。並且 Tokio 是一個事件驅動的非阻塞 I/O 平臺,用於使用 Rust 編程語言編寫異步應用程序。在高層次上,它提供了幾個主要組件:

同時,Tokio 還提供了豐富的工具鏈,例如編解碼工具包、分幀包等等,可以使我們更加方便快捷地處理 MySQL 中各種各樣的數據報文。

項目地址: https://github.com/tokio-rs/tokio

優勢

a. 快速: Tokio 的設計旨在使應用程序儘可能都快。

b. 零成本抽象: Tokio 以 Future 爲基礎。雖然 Future 並非 Rust 獨創,但與其他語言的 Future 不同的是,Tokio Future 編譯成了狀態機,用 Future 實現常見的同步,不會增加額外開銷成本。Tokio 的非阻塞 IO 可以充分發揮系統優勢,例如實現類似 Linux Epoll 這種多路複用技術,在單個線程上的多路複用允許套接字並批量接收操作系統消息,從而減少系統調用,所有這些都可以減少應用程序的開銷。

c. 可靠: Rust 的所有權模型和類型系統可以實現系統級應用程序,而不必擔心內存不安全。

d. 輕量級: 沒有垃圾收集器,因爲 Tokio 是基於 Rust 構建的,所以編譯後的可執行文件包含最少的語言運行時。這意味着,沒有垃圾收集器,沒有虛擬機,沒有 JIT 編譯,也沒有堆棧操作。這樣在編寫多線程併發的系統時,能夠有效避免阻塞。

e. 模塊化: 每個組件都位於一個單獨的庫中。如果需要,應用程序可以挑選所需的組件,避免依賴不需要的組件。

代碼實現

Rust 中數據包處理

#[derive(Debug)]
pub struct Packet {
    pub sequenceu8,
    pub connBufStream<LocalStream>,
    pub header[u8; 4],
}

以上爲 Proxy 數據包處理邏輯中核心的結構體,結構體中包含了三個字段分別爲:

在包處理邏輯中主要定義了以下函數,在整個代理服務中網絡數據交換都由以下方法來完成。

Pisa-Proxy 作爲服務端

pub struct Connection {
    saltVec<u8>,
    statusu16,
    collationCollationId,
    capabilityu32,
    connection_idu32,
    _charsetString,
    userString,
    passwordString,
    auth_dataBytesMut,
    pub auth_plugin_nameString,
    pub dbString,
    pub affected_rowsi64,
    pub pktPacket,
}

上面的結構體描述了 Pisa- Proxy 作爲服務端處理來自於客戶端請求時所包含的字段。例如,其中包含了和 MySQL 客戶端進行認證信息,和所包含數據包處理邏輯的 Packet。

Pisa-Proxy 作爲客戶端

#[derive(Debug, Default)]
pub struct ClientConn {
    pub framedOption<Box<ClientCodec>>,
    pub auth_infoOption<ClientAuth>,
    userString,
    passwordString,
    endpointString,
}

Tokio 提供的編解碼器

在 Pisa-Proxy 中,大量使用了 Tokio 工具包中的編解碼器,使用 codec Rust 會自動幫助開發者將原始字節轉化爲 Rust 的數據類型,方便開發者處理數據。使用編解碼器,只需要在代碼中爲定義好的類型實現 Decoder 和 Encoder 兩個 Trait,就可以通過 stream 和 sink 進行數據的讀寫。下面通過一個簡單的示例來看一下 Tokio 編解碼器的使用步驟。

使用 Tokio 編解碼器一共分爲三步:

  1. 首先要自定義一個錯誤類型,這裏定義了一個 ProtocolError,併爲它實現一個 from 方法,能夠讓它接收錯誤處理。
pub enum ProtocolError {
    Io(io::Error),
}

impl From<io::Error> for ProtocolError {
    fn from(errio::Error) -> Self {
        ProtocolError::Io(err)
    }
}
  1. 定義一個數據類型,這裏我們聲明一個 Message 爲 String,然後定義一個結構體,也就是我們要解析的是原始字節流要實際轉換成的結構體,也即 Tokio 中 framed(幀的概念),這裏定義一個空結構體 PisaProxy;
type Message = String;
struct PisaProxy;

接下來就是爲 PisaProxy 分別實現 Encoder 和 Decoder 這兩個 Trait。這裏的示例,實現的功能爲將數據轉爲 byte 數組,追加到 buf 中。在編碼器中,我們首先要指定 Item 類型爲 Message 和錯誤類型,編碼處理邏輯這裏將字符串進行拼接並返回給客戶端。

在這裏,Encoder 編碼是指將用戶自定義類型轉換成 BytesMut 類型,寫到 TcpStream 中,Decoder 解碼指將讀到的字節數據序列化爲 Rust 的結構體。

impl Encoder<Message> for PisaProxy {
    type Error = ProtocolError;

    fn encode(&mut self, itemMessage, dst&mut BytesMut) ->Result<(), Self::Error> {
        dst.extend(item.as_bytes());
        Ok(())
    }
}

impl Decoder for PisaProxy {
    type Item = Message;
    type Error = ProtocolError;

    fn decode(&mut self, src&mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }
        let data = src.split();
        let mut buf = BytesMut::from(&b"hello:"[..]);
        buf.extend_from_slice(&data[..]);
        let data = String::from_utf8_lossy(&buf[..]).to_string();

        Ok(Some(data))
    }
}
  1. 當實例化 PisaProxy 結構體後,就可以調用 framed 方法,codec 的 framed 方法(codec.framed(socket))將 TcpStream 轉換爲 Framed<TcpStream,PisaProxy>,這個 Framed 就是實現了 tokio 中的 Stream 和 Sink 這兩個 trait,實現的這兩個 Trait 的實例就具有了接收(通過 Stream)和發送(通過 Sink)數據的功能,這樣我們就可以調用 send 方法發送數據了。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "127.0.0.1:9088";
    let listener = TcpListener::bind(addr).await?;
    println!("listen on: {:?}", addr);

    loop {
        let (socket, addr) = listener.accept().await?;

        println!("accepted connect from: {}", addr);

        tokio::spawn(async move {
            let codec = PisaProxy {};
            let mut conn = codec.framed(socket);
            loop {
                match conn.next().await {
                    Some(Ok(None)) => println("waiting for data..."),
                    Some(Ok(data)) => {
                        println!("data {:?}", data);
                        conn.send(data).await;
                    },
                    Some(Err(e)) => {
                    },
                    None => {},
                }
            }
        });
    }
}

MySQL 協議在 Pisa-Proxy 中的實現

MySQL 協議簡介

MySQL 數據庫本身是一個很典型的 C/S 結構的服務,客戶端和服務端通信可以通過 Tcp 和 Unix Socket 的方式進行交互。在本篇中,我們主要說明通過網絡 Tcp 的方式來實現 MySQL 代理。

MySQL 客戶端和服務端的交互主要包含了兩個重要的過程:1. 握手認證,2. 發送命令。本篇會主要圍繞這兩個過程來介紹相關的實現。在客戶端和服務端交互的過程中主要包含了以下幾種類型報文:數據包、數據結束包、成功報告包以及錯誤消息包,在後面的章節中會爲大家詳細介紹這幾種報文。

交互過程

MySQL 客戶端和服務端在交互的過程中主要包含了兩個過程,即握手認證階段和執行命令階段,當然在這兩個過程之前首先要經歷 TCP 三次握手的過程。在三次握手結束後首先進入握手認證階段,在交換完信息並且客戶端正確登錄服務端後,進入執行命令階段,圖 3 完整描述了整個交互過程。

代碼鏈接:https://github.com/database-mesh/pisanix/blob/master/pisa-proxy/protocol/mysql/src/server/conn.rs

圖 3

握手認證

在握手認證階段是 MySQL 客戶端和服務端建聯非常重要的階段,該階段發生在 TCP 三次握手之後。首先服務端會給客戶端發送服務端信息,其中包括協議版本號、服務版本信息、挑戰隨機數、權能標誌位等等。當客戶端接收到服務端發來的響應之後,客戶端開始發起認證請求。認證請求中,會攜帶客戶端用戶名、數據庫名以及通過服務端響應中的挑戰隨機數將客戶端密碼加密後,一同發送給服務端進行校驗。校驗過程中,除了會對用戶名密碼進行校驗,還會匹配客戶端所使用的認證插件,如果不匹配則會發生插件的自動切換,以及判斷客戶端是否使用了加密鏈接。當以上階段全部正常完成後,客戶端則登錄成功,服務端返回客戶端 OK 數據報文。

上述過程分別在 runtime 和 protocol 的 server 中實現。在 runtime 中的 start 函數等待請求進入,Tcp 三次握手結束後,從 handshake 函數如圖 3,開始握手階段。在 handshake 中分別包含了三個過程,首先由 write_initial_handshake 給客戶端發送服務端信息,然後在 read_handshake_response 客戶端拿着服務端信息開始認證請求。

pub async fn handshake(&mut self) -> Result<(), ProtocolError> {
        match self.write_initial_handshake().await {
            Err(err) => return Err(err::ProtocolError::Io(err)),
            Ok(_) => debug!("it is ok"),
        }

        match self.read_handshake_response().await {
            Err(err) => {
                return Err(err);
            }
            Ok(_) => {
                self.pkt.write_ok().await?;
                debug!("handshake response ok")
            }
        }

        self.pkt.sequence = 0;

        Ok(())
    }

執行命令

當握手和認證階段完成後,此時客戶端纔算是真正意義上的與服務端完成建立鏈接。那麼此時則進入執行命令階段。在 MySQL 中,能夠發送命令的指令類型有很多種,我們會在下文中爲大家進行介紹。

代碼鏈接: https://github.com/database-mesh/pisanix/blob/master/pisa-proxy/runtime/mysql/src/server/server.rs.

在下面的代碼中可以看到,Pisa-Proxy 在這裏會對不同類型的指令進行不同的邏輯處理。例如,初始化 db 的處理邏輯爲 handle_init_db,處理查詢的邏輯爲 handle_query。

  match cmd {
            COM_INIT_DB => self.handle_init_db(&payload, true).await,
            COM_QUERY => self.handle_query(&payload).await,
            COM_FIELD_LIST => self.handle_field_list(&payload).await,
            COM_QUIT => self.handle_quit().await,
            COM_PING => self.handle_ok().await,
            COM_STMT_PREPARE => self.handle_prepare(&payload).await,
            COM_STMT_EXECUTE => self.handle_execute(&payload).await,
            COM_STMT_CLOSE => self.handle_stmt_close(&payload).await,
            COM_STMT_RESET => self.handle_ok().await,
            _ => self.handle_err(format!("command {} not support", cmd)).await,
}

MySQL 協議基本數據類型

在 MySQL 協議中主要有以下幾種數據類型:

MySQL 報文中整型值分別有 1、2、3、4、8 字節長度,使用小端傳輸。

字符串長度不固定,當遇到'NULL'(0x00)字符時結束。

參考鏈接:https://dev.mysql.com/doc/internals/en/basic-types.html

報文結構

在 MySQL 客戶端和服務端所交互的數據最大長度爲 16MByte,其基本數據報文結構類型如下:

圖 4

圖 5

在圖 4 和圖 5 中描述了 MySQL 報文的基本結構。報文包括了兩部分,消息頭和消息體。其中在消息頭中,3 字節表示數據報文長度,1 字節存儲序列號,消息體中存儲實際的報文數據。

消息頭

用於標記當前請求消息的實際數據長度值,以字節爲單位,佔用 3 個字節,最大值爲 0xFFFFFF,即接 2^24-1。

序列號

序列 ID 從 0 開始隨每個數據包遞增,並且當進入新的命令時重置爲 0。序列號 ID 有可能會發生迴繞,當發生迴繞時,需將序列號 ID 重置爲 0,並且重新開始計數遞增。

報文數據

消息體用於存放請求的內容及響應的數據,長度由消息頭中的長度值決定。

客戶端請求命令報文

該指令用於標識客戶所要執行命令的類型,以字節爲單位佔用 1 個字節。請求命令報文常用到的有 Text 文本協議和 Binary 二進制協議。這裏可以參考【執行命令】中的代碼,代碼中描述了對不同指令的處理邏輯。

在文本協議中常用到的有以下指令:

etBKcY

更多請參考:https://dev.mysql.com/doc/internals/en/text-protocol.html

在二進制協議,即 Prepare Statement 中常用到的有以下指令:

BiKy2y

更多請參考:https://dev.mysql.com/doc/internals/en/prepared-statements.html

響應報文

......

例如,下面代碼中展示瞭如何給客戶端寫 OK 和 EOF 報文。

  #[inline]
pub async fn write_ok(&mut self) -> Result<(), Error> {
    let mut data = [7, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0];
    self.set_seq_id(&mut data);
    self.write_buf(&data).await
}
    
#[inline]
pub async fn write_eof(&mut self) -> Result<(), Error> {
    let mut eof = [5, 0, 0, 0, 0xfe, 0, 0, 2, 0];
    self.set_seq_id(&mut eof);
    self.write_buf(&eof).await
}

com_query 請求流程

如圖 6 爲 com_query 的請求流程,com_query 指令可能會返回以下結果集:

在(https://github.com/database-mesh/pisanix/blob/master/pisa-proxy/protocol/mysql/src/client/resultset.rs)文件中主要爲對 ResultSet 結果集的處理,可以看到在這裏定義了 ResultSet 結構,同樣對 ResultSetCodec 分別實現了 Encoder 和 Decoder 兩個 Trait,這樣就可以通過編解碼器來處理 ResultSet 的報文。

  #[derive(Debug, Default)]
pub struct ResultsetCodec {
    pub next_stateDecodeResultsetState,
    pub colu64,
    pub is_binarybool,
    pub sequ8,
    pub auth_infoOption<ClientAuth>,
}

圖 6

總結

以上,就是本篇文章的全部內容。在本篇文章中我們介紹了使用 Rust 實現 MySQL 代理的動機,介紹了 MySQL 協議中一些常用到的概念和 MySQL 中數據報文在網絡中是如何進行交換數據的;並在最後介紹如何使用 Rust 去快速實現一個 MySQL 代理服務。更多實現細節請關注 Pisanix 代碼倉庫。

點擊閱讀原文鏈接可查看相關教學視頻。

相關鏈接

Pisanix

項目地址:https://github.com/database-mesh/pisanix

官網地址:https://www.pisanix.io/

Database Mesh:https://www.database-mesh.io/

Mini-Proxy:一個最小化的 MySQL Rust 代理實現

項目地址:https://github.com/wbtlb/mini-proxy

社區

目前 Pisanix 社區每兩週都會組織線上討論,詳細安排如下,歡迎各位小夥伴一起參與進來。

作者介紹

王波,SphereEx MeshLab 研發工程師,目前專注於 Database Mesh,Cloud Native 研發。Linux、llvm、yacc、ebpf user, Gopher & Rustacean and c bug hunter。

GitHub:https://github.com/wbtlb

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/0mnr17j2mt4bHVmL2uIAwQ