Go 實現 PostgreSQL 前後端協議

1 PG 前後端協議

本文基於 3.0 版本協議,主要以圖示方式更爲形象的描述其中比較重要的鏈接、查詢等協議,詳盡的文字說明可參考官方文檔《Chapter 53. Frontend/Backend Protocol》(https://www.postgresql.org/docs/current/protocol.html) 或中文文檔《第 53 章 前端 / 後端協議》(http://www.postgres.cn/docs/14/protocol.html)。

1.1 整體流程視圖

總體流程

協議大致可以分爲 startup、normal 兩個階段,startup 階段即建立鏈接階段,包含 ssl、認證、鏈接參數等;normal 階段即爲發起 SQL 查詢等的階段。

1.2 Startup 階段

startup 階段流程

(1)一般客戶端在建立鏈接時會先詢問服務端是否開啓 SSL 加密,若開啓則服務器回覆'S',客戶端則進行 SSL 握手等,若不開啓則回覆'N'。

(2)客戶端向服務器發送 StartupMessage 啓動包,服務端判斷是否需要認證,若需要則發送 AuthenticationRequest 信息,若不需要認證或已認證則可發送 AuthenticationOk,認證失敗響應 ErrorResponse。

PG 支持多種方式認證如:AuthenticationKerberosV5、AuthenticationCleartextPassword、AuthenticationMD5Password 等,後面示例將採用常見的 AuthenticationMD5Password 方式進行演示。

(3)認證通過後服務端一般會發送三條消息:ParameterStatus 如版本號、編碼格式等;BackendKeyData 包括當前鏈接進程 id、取消鏈接的密鑰;ReadForQuery 客戶端收到這條消息後表示啓動成功,前端現在可以發出命令到服務端。

(4)消息報文格式

啓動消息報文格式

如上圖,啓動報文格式一般包含消息長度 4 個字節(包含自身)、協議類型編號 4 個字節、參數鍵值對等。

1.3 normal 階段

此部分主要講解兩種查詢協議 Simple Query、 Extended Query,其他協議先不做講解,感興趣可查閱文檔學習。

此兩種協議消息報文格式一般爲:

Query 消息協議報文

1.3.1 Simple Query 協議

Simple Query 流程

如上圖:

(1)客戶端向服務端發起 Query 請求;

(2)服務端收到後立即處理,並將結果分三種類型消息返回:

RowDescription:表字段信息描述,包括字段名、字段類型等信息;

DataRow:數據信息,每條信息僅包含一行數據,如查詢數據有 10 條,則會發送 10 條 DataRow 類型消息;

CommandComplete:表示本次查詢完成,並返回處理了多少條數據,如 select 到 10 條數據,消息體爲 SELECT 10;

(3)服務端向客戶端發送 ReadyForQuery,表示可以接收下一條命令;該消息類型中包含三種消息類型標識:
'I':表示操作不在事物內;
'T':表示操作處在事物中;
'E':在失敗的事物中;

(4)消息報文示例

Simple Query 消息報文

1.3.2 Extended Query 協議

相比於 Simple Query,使用 Extended Query 時,發送請求的過程被分爲若干步驟,準備步驟的結果可以被多次複用以提高效率;另外,還可以獲得額外的特性, 比如可以把數據值作爲獨立的參數提供而不是必須把它們直接插入一個查詢字符串。通常包括 Parse、Describe、Bind 和 Execute 等。

交互流程如下圖,圖示已經比較清晰,不在文字贅述。

Extended Query 流程

消息報文格式

各階段消息報文格式示意

以上基本把鏈接、兩種類型查詢協議講解完畢,主要以流程圖示方式直觀展示,還是那句話詳盡的文字描述及理論性概念請閱讀官方文檔,不必在此贅述。圖示部分參考《Postgres for the wire》。

2 Go 實現示例

Talk is cheap. Show me the code.

相較於理論學習,實操能幫助大家更好的理解協議交互過程及方式,印象更加深刻。

pgproto3 是從開源 Go 版本 PG 驅動 pgx 中抽取的協議層庫,站在巨人的肩膀,我們可以很容易的實現整個通信過程。

說明:demo 實現服務端處理邏輯,demo 中許多異常處理、代碼規範、函數封裝等都不完善,僅作爲簡單的演示流程,實際工程代碼遠比此複雜,考慮的場景也更爲豐富。我們分別使用 psql、pgx 驅動作爲客戶端鏈接測試。

2.1 startup 階段演示

首先開啓一個服務端 tcp 監聽端口,並循環接收客戶端鏈接,代碼大致如下:

package main

import (
    "bufio"
    "fmt"
    "log"
    "net"
)
func main() {
    //pg 協議是在TCP/IP和Unix 域套接字上實現的,因此先開啓一個tcp端口 監聽客戶端請求
    ln, err := net.Listen("tcp""127.0.0.1:5432")
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Listening on", ln.Addr())

    for {
        c := new(ClientConn)
        conn, err := ln.Accept()
        if err != nil {
            log.Fatal(err)
        }
        log.Println("Accepted connection from", conn.RemoteAddr())

        go func() {
            tcpConn := conn.(*net.TCPConn)
            c.c = tcpConn
            //初始化reader
            c.rb = bufio.NewReaderSize(c.c, 8 * 1024)
            c.startup()
        }()
    }
}

ClientConn 結構體簡單的定義爲:

type ClientConn struct {
    rb *bufio.Reader
    c net.Conn
}

下面分析啓動 startup() 流程代碼,首先定義兩個接收消息的處理方法:一個判斷 startup message 消息類型的方法,返回消息類別;一個處理一般格式的消息,返回數組:

//讀取startup消息結構體
func (c *ClientConn) receiveStartupMessage() (pgproto3.FrontendMessage, error) {
    header := make([]byte, 4)
    if _, err := io.ReadFull(c.rb, header); err != nil {
        return nil, err
    }
    //根據消息報文格式可知 前4位爲消息體的長度
    msgLen := int(binary.BigEndian.Uint32(header) - 4)
    msg := make([]byte, msgLen)
    //獲取消息編碼
    if _, err := io.ReadFull(c.rb, msg); err != nil {
        return nil, err
    }
    code := binary.BigEndian.Uint32(msg)
    switch code {
    //ProtocolVersionNumber
    case 196608:
        startMessage := &pgproto3.StartupMessage{}
        if err := startMessage.Decode(msg); err != nil {
            return nil, err
        }
        return startMessage, nil
    //sslRequestNumber 詢問是否爲ssl協議類型
    case 80877103:
        sslRequest := &pgproto3.SSLRequest{}
        if err := sslRequest.Decode(msg); err != nil {
            return nil, err
        }
        return sslRequest, nil
    //cancelRequestCode 取消協議類型
    case 80877102:
        cancelRequest := &pgproto3.CancelRequest{}
        if err := cancelRequest.Decode(msg); err != nil {
            return nil, err
        }
        return cancelRequest, nil
    default:
        log.Fatal("unknown startup message")
        return nil, errors.New("unknown startup message")
    }
}

// 讀取一般格式協議報文
func (c *ClientConn) readNormalMsg() ([]byte, error) {
    msgType := make([]byte, 1)
    if _, err := io.ReadFull(c.rb, msgType); err != nil {
        return nil, err
    }
    // 後面四個字節爲長度,包括自己
    msgLength := make([]byte, 4)

    if _, err := io.ReadFull(c.rb, msgLength); err != nil {
        return nil, err
    }

    msgLen := binary.BigEndian.Uint32(msgLength)

    // 獲取請求的具體信息
    msg := make([]byte, msgLen-4)

    if _, err := io.ReadFull(c.rb, msg); err != nil {
        return nil, err
    }

    data := append(msgType, msg...)
    return data, nil
}

然後定義一個處理 startup message 的方法:

func (c *ClientConn) handleStartupMessage(startupMessage *pgproto3.StartupMessage) error {

    // 可以從客戶端獲取用戶名和數據庫名稱等信息
    username := startupMessage.Parameters["user"]
    log.Println("username: ",username)
    log.Println("database: ",startupMessage.Parameters["database"])

    // 進行用戶驗證
    auth := make([]byte, 0)
    // 發送一個 authRequest,
    // 如果是AuthenticationCleartextPassword{} 接收到的auth爲明文密碼
    // 這裏使用 MD5 加密要求; 前端必須返回一個 MD5 加密的密碼進行驗證
    // salt 爲隨機生成的 4 個字節,這裏我們寫死幾個數
    salt := [4]byte{109,65,109,65}
    authRequest := pgproto3.AuthenticationMD5Password{Salt: salt}
    n,err :=c.c.Write(authRequest.Encode(nil))
    fmt.Println(n, err)

    // 讀取客戶端發來的密碼
    // 格式: 'p' + len + 'password' + '0'
    // 長度 = len + password + 1
    auth, err = c.readNormalMsg()
    //psql中會先斷開鏈接,輸入密碼後再重新建立鏈接,因此會出現EOF讀取錯誤
        //正常退出即可,等待下一次鏈接進行處理即可
    if err !=nil {
        return err
    }

    if auth[0] != 'p' {
        return errors.New("received is not a password packet" + string(auth[0]))
    }

    // 去掉第一個 'p' 和最後一個結束符,中間的爲認證信息
    auth = auth[1 : len(auth)-1]
    log.Println("客戶端收到md5密碼: ", auth)

    //假定我們服務端密碼爲 password
    pwd := "password"

    //構造加密後的密碼
    //客戶端首先將用戶輸入的密碼進行一次MD5加密,其中用戶名作爲salt
    //然後將服務器發送過來的4位的隨機數md5Salt作爲salt再進行一次MD5加密,
    //並將結果作爲認證信息再次發送給服務器端。
    res := "md5" + fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%x", md5.Sum([]byte(pwd + username))) +
        string([]byte{109,65,109,65}))))

    if res!=string(auth){
        //返回給客戶端對應錯誤信息 這裏就不處理了
        log.Println("密碼認證失敗")
    }

    //認證成功 給客戶端寫回成功信息
    authOK := &pgproto3.AuthenticationOk{}
    c.c.Write(authOK.Encode(nil))

    //寫回服務端部分參數信息
    parameters := map[string]string{
        "client_encoding":   "UTF8",
        "DateStyle":         "ISO, YMD",
        "server_version":    "version: PostgreSQL 14.2",
    }

    // 發送 ParameterStatus
    for k, v := range parameters {
        parameterStatus := &pgproto3.ParameterStatus{Name: k, Value: v}
        c.c.Write(parameterStatus.Encode(nil))
    }

    // 發送 ReadyForQuery 表示一切準備就緒。"I"表示空閒(沒有在事務中)
    c.writeReadyForQuery('I')
    return nil
}

//發送ReadyForQuery消息
func (c *ClientConn) writeReadyForQuery(status byte) {

    readyForQuery := &pgproto3.ReadyForQuery{TxStatus: status}
    c.c.Write(readyForQuery.Encode(nil))
}

startup() 處理方法:

func  (c *ClientConn) startup() {

    m, err := c.receiveStartupMessage()
    if err != nil {
        log.Fatal(err)
        return
    }

    switch m.(type) {
    case *pgproto3.CancelRequest:
        c.c.Close()
        break
    case *pgproto3.SSLRequest:
        //直接模擬不支持ssl類型 寫N
        data := []byte{'N'}
        c.c.Write(data)
        // 完成 SSL 確認後需要正式接收 StartupMessage
        m, err := c.receiveStartupMessage()
        if err != nil {
            return
        }
        msg, ok := m.(*pgproto3.StartupMessage)
        // 如果接收到的包不爲啓動包則報錯
        if !ok {
            return
        }

        // 接收完 SSLRequest 包後接收 StartupMessage
        c.handleStartupMessage(msg)
    case *pgproto3.StartupMessage:
        //處理鏈接請求
        c.handleStartupMessage(m.(*pgproto3.StartupMessage))
    default:
        log.Fatal("received is not a expected packet")
        break
    }

}

以上即是啓動過程的整體代碼,下面我們啓動驗證整個啓動過程。

debug 調試

服務端啓動輸出如下日誌,等待客戶端鏈接:

2022/04/04 21:31:35 Listening on 127.0.0.1:5432

shell 中輸入鏈接命令:

 psql -h 127.0.0.1 -U root -p5432  -d testdb

首先看下客戶端發送的消息體:

SSLRequest 階段協議調試

收到的第一個消息總長度爲 8,消息編碼內容長度爲 4,並且確實爲 SSLRequest 類型消息。

StartupMessage 調試

當寫回'N'後收到客戶端發來的 StartupMessage。

認證階段調試

可以從啓動包獲取用戶名等信息,並且寫回需要 md5 方式認證,此時需要說明一點:psql 鏈接端會斷開此鏈接,等用戶輸入密碼後重新建立鏈接,因此會收到一條 EOF 消息,正常退出循環即可,等待下次鏈接接入。

此時客戶端需要用戶輸入密碼,輸入代碼內置的密碼 password:

客戶端輸入密碼

此時服務端收到開頭爲'p'的消息,表示密碼驗證,最後一位爲結束符:

收到 md5 加密密碼信息

服務端依據 md5 規則計算預置密碼的密文,並做比較,注意計算時涉及兩次 md5 加密,分別以用戶名和服務端加鹽字段拼接,詳見如下:

密碼計算驗證

之後服務端向客戶端發送 ParameterStatus、ReadyForQuery 消息,完成 psql 與服務端建立鏈接。(這裏沒發送 BackendKeyData 消息)

此時 psql 顯示可以正常執行命令:

鏈接建立完成

2.2 Simple Query 階段演示

首先實現一個循環處理消息的方法,目前僅處理 Simple Query 類型的'Q' 消息,然後直接定義返回數據消息,演示消息返回:

func (c *ClientConn) run() {
    for {
        data, err := c.readNormalMsg()
        if err != nil {
            return
        }
        //處理數據
        cmd := data[0]
        data = data[1:]
        switch cmd {
        //http://www.postgres.cn/docs/12/protocol-message-formats.html
        //在簡單查詢模式中,檢索出來的值的格式總是文本,除非給出的命令是在一個使用BINARY選項聲明的遊標上FETCH。
        case 'Q': /* simple query */
            if len(data) > 0 && data[len(data)-1] == 0 {
                data = data[:len(data)-1]
                dataStr := hack.String(data)
            }
            log.Println("獲取的客戶端查詢語句:", dataStr)
            //mock字段描述
            buf := (&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
                {
                    Name:                 []byte("name"),//字段名稱
                    TableOID:             0,
                    TableAttributeNumber: 0,
                    DataTypeOID:          25,//字段數據類型
                    DataTypeSize:         -1,
                    TypeModifier:         -1,
                    Format:               0,//0表示text 1表示二進制格式
                },
            }}).Encode(nil)
            //mock結果返回
            buf = (&pgproto3.DataRow{Values: [][]byte{[]byte("tom")}}).Encode(buf)
            buf = (&pgproto3.CommandComplete{CommandTag: []byte("SELECT 1")}).Encode(buf)
            buf = (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)
            c.c.Write(buf)
        default:
            log.Fatal("command %d not supported now", cmd)
        }
    }
}

並把方法加入 main 處理流程:

......
go func() {
            tcpConn := conn.(*net.TCPConn)
            c.c = tcpConn
            //初始化reader
            c.rb = bufio.NewReaderSize(c.c, 8 * 1024)
            c.startup()
                        //添加命令處理方法
            c.run()
        }()
......

debug 調試

客戶端發起模擬查詢命令:

模擬查詢

服務端接收消息並解析內容,處理過程類似密碼接收,不再贅述,可以從消息體總獲取客戶端的命令,下面模擬服務端數據返回:

接收消息

分別模擬返回 RowDescription、DataRow、CommandComplete 以及 ReadyForQuery 消息,爲了方便消息體內容均寫死返回;可以看到發送的字節均是按照固定消息協議格式發送:

數據消息發送

寫回客戶端後展示如下:

客戶端展示如下

2.2 Extended Query 階段演示

psql 默認採用 Simple Query 方式執行命令,而 pgx 驅動默認使用 Extended Query 方式執行,因此選用 pgx 作爲擴展查詢的測試客戶端。

首先增加處理 Parse、Describe、Bind、Execute、Sync 消息協議代碼,同樣爲便於演示將返回信息寫死:

        case 'P': /* parse */
            parse := &pgproto3.Parse{}
            parse.Decode(data)
            parseComplete := &pgproto3.ParseComplete{}
            c.c.Write(parseComplete.Encode(nil))
        case 'B': /* bind */
            bind := &pgproto3.Bind{}
            bind.Decode(data)
            bindComplete := &pgproto3.BindComplete{}
            c.c.Write(bindComplete.Encode(nil))
        case 'E': /* execute */
            execute := &pgproto3.Execute{}
            execute.Decode(data)
            buf := (&pgproto3.DataRow{Values: [][]byte{[]byte("tom")}}).Encode(nil)
            buf = (&pgproto3.CommandComplete{CommandTag: []byte("SELECT 1")}).Encode(buf)
            c.c.Write(buf)
        case 'C': /* close */
            close := &pgproto3.Close{}
            close.Decode(data)
            c.c.Close()
        case 'D': /* describe */
            desc := &pgproto3.Describe{}
            desc.Decode(data)
            parameterDes := &pgproto3.ParameterDescription{}
            c.c.Write(parameterDes.Encode(nil))
            buf := (&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
                {
                    Name:                 []byte("name"),//字段名稱
                    TableOID:             0,
                    TableAttributeNumber: 0,
                    DataTypeOID:          25,//字段數據類型
                    DataTypeSize:         -1,
                    TypeModifier:         -1,
                    Format:               0,//0表示text 1表示二進制格式
                },
            }}).Encode(nil)
            c.c.Write(buf)
        case 'S': /* sync */
            sync := &pgproto3.Sync{}
            sync.Decode(data)
            c.writeReadyForQuery('I')

pgx 鏈接查詢代碼:

package main

import (
    "context"
    "fmt"
    "github.com/jackc/pgx/v4"
    "os"
)


func main() {
    /** pg數據庫鏈接測試**/
    urlExample := "postgres://root:password@127.0.0.1:5432/testdb"
    conn, err := pgx.Connect(context.Background(), urlExample)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
        os.Exit(1)
    }
    defer conn.Close(context.Background())

    var name string
    err = conn.QueryRow(context.Background()"select * from test;").Scan(&name)
    if err != nil {
        fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)
        os.Exit(1)
    }
    fmt.Println(name)
}

debug 調試

startup 階段與 psql 大同小異,此處略過,直接進行擴展查詢協議調試。

打入斷點,運行 pgx 鏈接查詢代碼,首先進入 Parse 階段,因代碼查詢中沒有指定變量參數,所以收到的 parse 結構體信息爲空字符,同時寫回 ParseComplete 消息:

Parse 階段

接着第一次進入 Describe 階段,寫回 ParameterDescription、RowDescription,因沒有參數信息,這裏使用一個空的 ParameterDescription:

第一次 Describe 階段

然後進入第一次 Sync 階段,寫回 ReadyForQuery:

第一次 Sync 階段

然後進入 Bind 階段,按照名稱綁定查詢方法,寫回 BindComplete 消息:

Bind 階段

然後進入第二次 Describe 階段,此時寫回 RowDescription 即可,demo 中未做區分因此也寫回了兩種消息:

第二次 Describe 階段

然後進入 Execute 階段,因 demo 寫回的都是空數據,所以此處收到的 Portal 參數爲空,正常爲綁定的名稱;此階段直接寫回數據,DataRow 以及 CommandComplete;如下:

Execute 階段

最後進入第二次 Sync 階段,同樣寫回 ReadyForQuery,告訴客戶端可以執行其他命令:

第二次 Sync 階段

回到 pgx 鏈接代碼,此時獲取到服務端返回數據 name = "tom",至此整個擴展查詢流程結束:

pgx 客戶端收到查詢結果

3 總結

全文以圖示及 demo 調試的方式詳細闡述了 PG 前後端通信協議過程,主要包含兩個查詢子協議,希望對理解協議本身有所幫助;其餘協議過程後續有需要再補充~

轉自:

zhuanlan.zhihu.com/p/493045524

Go 開發大全

參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/YXV6ta2oZnYQUzSGtPCKBg