Golang 從零到一開發實現 RPC 框架

內容提要

RPC 框架是分佈式領域核心組件,也是微服務的基礎。今天嘗試從零擼一個 RPC 框架,剖析其核心原理及代碼實現,後續還會逐步迭代追加微服務治理等功能,將之前文章覆蓋的熔斷、限流、負載均衡、註冊發現等功能融合進來,打造一個五臟俱全的 RPC 框架。本文主要內容包括:

實現原理

RPC (Remote Procedure Call)全稱是遠程過程調用,相對於本地方法調用,在同一內存空間可以直接通過方法棧實現調用,遠程調用則跨了不同的服務終端,並不能直接調用。

RPC 框架 要解決的就是遠程方法調用的問題,並且實現調用遠程服務像調用本地服務一樣簡單,框架內部封裝實現了網絡調用的細節(透明化遠程調用),其核心過程原理如下圖所示。

這個版本可以稱爲 “P2P RPC” ,而生產環境部署往往會將服務提供者(Server)部署多個實例(集羣部署),那麼客戶端就需要具備發現服務端的能力和負載均衡的支持,所以有了服務註冊發現和負載均衡。

再然後,爲了保障 RPC 調用的可靠性和穩定性,增加了服務監控和服務容錯治理的能力,考慮性能提升的異步化能力以及考慮可擴展性的插件化管理,這些完善構成了更完整的微服務 RPC 框架。

RPC 協議實現

協議設計

協議設計算是 RPC 最重要的一部分了,它主要解決服務端與客戶端通信的問題。一般來說通訊要解決如下問題:

**

  1. 網絡傳輸協議**
    基於 TCP、UDP 還是 HTTP,UDP 要自己解決可靠性傳輸問題,而 HTTP 又太重,包含很多沒必要的頭信息,所以一般 RPC 框架會優先選擇 TCP 協議。

(當然也有大名鼎鼎的 gRPC 基於 HTTP2)

2. 序列化協議

網絡傳輸數據必須是二進制數據,而執行過程是編程語言的對象方法,那麼就涉及到如何將對象序列化成可傳輸消息(二進制),並可反序列化還原。常見的通用型協議如 XML、 JSON、Protobuf、Thrift 等,也有語言綁定的如 Python 原生支持的 pickle 協議, Java 實現的 Serializbale 接口及 Hessian 協議,Golang 原生支持的 Gob 協議等。

3. 消息編碼協議

它是一種客戶端和服務端的調用約定,比如請求和參數如何組織,Header 放置什麼內容。這部分每個框架設計均不同,有時也稱這一層爲狹義的 RPC 協議層。

另外客戶端發起調用一般來說要知道調用的具體類方法(請求標識符)以及入參(Payload),而網絡傳輸的是二進制字節流,如何能從這些字節中找出哪些是方法名,哪些是參數?進一步如果客戶端不斷的發送消息,如何將每一條消息分割?(解決 TCP 粘包問題)

採用定長消息很容易解決,但事先並不能確定要固定多長,所以這種方式並不可行。消息加分隔符可以實現,但要確保分隔符不會與正文衝突。而最常用的實現方案就是用定長的頭標識出不定長的體,比如用 int32 (定長 4 字節)標識後面的內容長度,這樣就能較優雅實現消息分割了。

(注:這個方案中如果消息體的長度大於 2^32 會發生溢出而導致解析失敗,可以換更長類型,但理論上總會有溢出風險,設計使用時應該限制避免傳輸過大數據體)

協議實現

網絡傳輸協議,這裏使用 TCP 協議即可,沒有太多爭議,可預留接口支持。

序列化協議,這裏使用 Golang 專有的 Gob 協議,保留接口後期可以擴展支持 JSON、Protobuf 等協議。

type Codec interface {
    Encode(i interface{}) ([]byte, error)
    Decode(data []byte, i interface{}) error
}
type GobCodec struct{}
func (c GobCodec) Encode(i interface{}) ([]byte, error) {
    var buffer bytes.Buffer
    encoder := gob.NewEncoder(&buffer)
    if err := encoder.Encode(i); err != nil {
        return nil, err 
    }   
    return buffer.Bytes(), nil 
}
func (c GobCodec) Decode(data []byte, i interface{}) error {
    buffer := bytes.NewBuffer(data)
    decoder := gob.NewDecoder(buffer)
    return decoder.Decode(i)
}

codec/codec.go

RPC 消息格式編碼設計如下,協議消息頭定義定長 5 字節(byte),依次放置魔術數(用於校驗),協議版本,消息類型(區分請求 / 響應),壓縮類型,序列化協議類型,每個佔 1 個字節(8 個 bit)。可擴展追加 消息 ID 以及 元數據 等信息用於做服務治理。

const (
    HEADER_LEN = 5
)
const (
    magicNumber byte = 0x06
)
type MsgType byte
const (
    Request MsgType = iota
    Response
)
type CompressType byte
const (
    None CompressType = iota
    Gzip
)
type SerializeType byte
const (
    Gob SerializeType = iota
    JSON
)
type Header [HEADER_LEN]byte
func (h *Header) CheckMagicNumber() bool {
    return h[0] == magicNumber
}
func (h *Header) Version() byte {
    return h[1]
}
func (h *Header) SetVersion(version byte) {
    h[1] = version
}
//省略 MsgType,CompressType,SerializeType

protocol/header.go

定義協議消息格式,除了協議頭,還包括調用的服務類名、方法名以及參數(Payload)。

type RPCMsg struct {
    *Header
    ServiceClass  string
    ServiceMethod string
    Payload       []byte
}
func NewRPCMsg() *RPCMsg {
    header := Header([HEADER_LEN]byte{})
    header[0] = magicNumber
    return &RPCMsg{
        Header: &header,
    }
}

protocol/msg.go

實現傳輸

定義好協議後,要解決的問題就是如何通過網絡(IO)發送和接收,實現通信的目的。

func (msg *RPCMsg) Send(writer io.Writer) error {
    //send header
    _, err := writer.Write(msg.Header[:])
    if err != nil {
        return err
    }
    //寫入消息體總長度,方便一次性解析
    dataLen := SPLIT_LEN + len(msg.ServiceClass) + SPLIT_LEN + len(msg.ServiceMethod) + SPLIT_L
EN + len(msg.Payload)
    err = binary.Write(writer, binary.BigEndian, uint32(dataLen)) //4
    if err != nil {
        return err
    }
    //write service.class len
    err = binary.Write(writer, binary.BigEndian, uint32(len(msg.ServiceClass)))
    if err != nil {
        return err
    }
    //write service.class content
    err = binary.Write(writer, binary.BigEndian, util.StringToByte(msg.ServiceClass))
    if err != nil {
        return err
    }
    //省略 service.method,payload 
}

protocol/msg.go

其中類名、方法名、payload 均爲不定長部分,要想順利解析就需要一一對應的長度字段標識不定長的長度,也就是 SPLIT_LEN 代表各部分長度,是 int32 類型(32 bit),正好相當於 4 個 byte,所以 SPLIT_LEN 爲 4。

另外要注意網絡傳輸一般使用大端字節序。先理解字節序即爲字節(byte)的組成順序,分爲大端序(最高有效位放低地址)和小端序(最低有效位放低地址)。CPU 一般採用小端序讀寫,而 TCP 網絡傳輸採用大端序則更方便。對應這裏的 binary.BigEndian 代碼實現大端序。

消息讀取後反解析,按發送順序依次還原 Header、類名、方法名、Payload,不定長部分都有對應的長度保存,因此可以順利解析到所有數據。

func Read(r io.Reader) (*RPCMsg, error) {
    msg := NewRPCMsg()
    err := msg.Decode(r)
    if err != nil {
        return nil, err
    }
    return msg, nil
}
func (msg *RPCMsg) Decode(r io.Reader) error {
    //read header
    _, err := io.ReadFull(r, msg.Header[:])
    if !msg.Header.CheckMagicNumber() { //magicNumber
        return fmt.Errorf("magic number error: %v", msg.Header[0])
    }
    //total body len    
    headerByte := make([]byte, 4)
    _, err = io.ReadFull(r, headerByte)
    if err != nil {
        return err
    }
    bodyLen := binary.BigEndian.Uint32(headerByte)
    //一次將整個body讀取,再依次拆解
    data := make([]byte, bodyLen)
    _, err = io.ReadFull(r, data)
    //service.class len
    start := 0
    end := start + SPLIT_LEN
    classLen := binary.BigEndian.Uint32(data[start:end]) //0,4
    //service.class
    start = end
    end = start + int(classLen)
    msg.ServiceClass = util.ByteToString(data[start:end]) //4,x
    //省略 method,payload}

protocol/msg.go

解決了最複雜的協議部分,下面依次來看服務端和客戶端的實現。

服務端實現

服務端實現主要包括服務啓停(端口監聽)、服務註冊、響應連接和處理請求幾部分。

定義服務接口

服務接口提供服務啓停和處理方法註冊的能力。

type Server interface {
    Register(string, interface{})
    Run()
    Close()
}

provider/server.go

服務啓停

實現服務啓停,關鍵在於通過 ip 和端口開啓監聽,這裏通過 Listener 封裝 net 包開啓 tcp Listen。

type RPCServer struct {
    listener Listener
}
func NewRPCServer(ip string, port int) *RPCServer {
    return &RPCServer{
        listener: NewRPCListener(ip, port),
    }   
}
func (svr *RPCServer) Run() {
    go svr.listener.Run()
}
func (svr *RPCServer) Close() {
    if svr.listener != nil {
        svr.listener.Close()
    }
}

provider/server.go

type Listener interface {
    Run()
    SetHandler(string, Handler)
    Close()
}
type RPCListener struct {
    ServiceIp   string
    ServicePort int
    Handlers    map[string]Handler
    nl          net.Listener
}
func NewRPCListener(serviceIp string, servicePort int) *RPCListener {
    return &RPCListener{ServiceIp: serviceIp,
        ServicePort: servicePort,
        Handlers:    make(map[string]Handler)}
}
func (l *RPCListener) Run() {
    addr := fmt.Sprintf("%s:%d", l.ServiceIp, l.ServicePort)
    nl, err := net.Listen(config.NET_TRANS_PROTOCOL, addr)  //tcp
    if err != nil {
        panic(err)
    }   
    l.nl = nl
    for {
        conn, err := l.nl.Accept()
        if err != nil {
            continue
        }  
        go l.handleConn(conn)
    }
}
func (l *RPCListener) Close() {    if l.nl != nil {    
       l.nl.Close()
    }
}

provider/listener.go

這裏通過爲每個連接創建一個協程處理請求,得益於 Golang 的協程優勢,Thread-Per-Message 模式來滿足併發請求更容易實現(Java 線程成本太大,一般採用線程池實現 Worker Thread 模式)。

服務註冊

服務註冊就是在內存中維護一個映射關係,map (key = 服務名,value = 對象實例),通過 interface{} 泛化,可以反射還原。

func (svr *RPCServer) Register(class interface{}) {
    name := reflect.Indirect(reflect.ValueOf(class)).Type().Name()
    svr.RegisterName(name, class)
}
func (svr *RPCServer) RegisterName(name string, class interface{}) {
    handler := &RPCServerHandler{class: reflect.ValueOf(class)}
    svr.listener.SetHandler(name, handler)
    log.Printf("%s registered success!\n", name)
}
func (l *RPCListener) SetHandler(name string, handler Handler) {
    if _, ok := l.Handlers[name]; ok {
        log.Printf("%s is registered!\n", name)
        return
    }
    l.Handlers[name] = handler
}

provider/server.go

(1)由於服務啓動初始化時進行所有服務註冊,所以用 map 沒考慮併發,否則如果有動態註冊就需要考慮併發問題了。

(2)這裏沒有註冊到服務註冊中心,設計考慮將以應用服務(系統)爲單位進行註冊,而具體的服務接口通過應用內存映射。這種註冊粒度大,優點就是減少對註冊中心的依賴和註冊實例數量,提高服務發現資源利用率。

(dubbo 3 重要改進就是將接口級服務發現切換爲應用級服務發現)

響應連接請求

整個過程依次涉及從網絡連接讀取數據,反序列化獲得請求結構體 (RPCMsg),根據註冊類和方法找到目標函數並執行,將執行結果序列化後封裝成 RPCMsg 通過網絡發送,整個過程是同步 io 模型。

func (l *RPCListener) handleConn(conn net.Conn) {
    defer catchPanic()
    for {
        msg, err := l.receiveData(conn)
        if err != nil || msg == nil {
            return
        }
        coder := global.Codecs[msg.Header.SerializeType()]
        if coder == nil {
            return
        }
        inArgs := make([]interface{}, 0)
        err = coder.Decode(msg.Payload, &inArgs)
        if err != nil {
            return
        }
        handler, ok := l.Handlers[msg.ServiceClass]
        if !ok {
            return
        }
        result, err := handler.Handle(msg.ServiceMethod, inArgs)
        encodeRes, err := coder.Encode(result) 
        if err != nil {
            return
        }
        err = l.sendData(conn, encodeRes)
        if err != nil {
            return
        }
    }
}

provider/listener.go

其中實際執行本地方法過程如下:

func (handler *RPCServerHandler) Handle(method string, params []interface{}) ([]interface{}, error) {
    args := make([]reflect.Value, len(params))
    for i := range params {
        args[i] = reflect.ValueOf(params[i])
    } 
    reflectMethod := handler.class.MethodByName(method)    
    result := reflectMethod.Call(args)
    resArgs := make([]interface{}, len(result))
    for i := 0; i < len(result); i++ {
        resArgs[i] = result[i].Interface()
    } 
    var err error
    if _, ok := result[len(result)-1].Interface().(error); ok {
        err = result[len(result)-1].Interface().(error)
    }
    return resArgs, err
}

provider/handler.go

收發網絡請求通過調用之前封裝的 Protocol 來完成。

func (l *RPCListener) receiveData(conn net.Conn) (*protocol.RPCMsg, error) {
    msg, err := protocol.Read(conn)
    if err != nil {
        if err != io.EOF { //close
            return nil, err
        }
    }
    return msg, nil
}
func (l *RPCListener) sendData(conn net.Conn, payload []byte) error {
    resMsg := protocol.NewRPCMsg()
    resMsg.SetVersion(config.Protocol_MsgVersion)
    resMsg.SetMsgType(protocol.Response)
    resMsg.SetCompressType(protocol.None)
    resMsg.SetSerializeType(protocol.Gob)
    resMsg.Payload = payload
    return  resMsg.Send(conn)
}

provider/listener.go

測試服務端

通過環境變量注入 ip 和 port,開啓服務監聽,依次註冊幾個服務。

func main() {
    flag.Parse()
    if ip == "" || port == 0 {
        panic("init ip and port error")
    }
    srv := provider.NewRPCServer(ip, port)
    srv.RegisterName("User"&UserHandler{})
    srv.RegisterName("Test"&TestHandler{})
    gob.Register(User{})
    go srv.Run()
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
    <-quit
    srv.Close()
}

server.go

這裏註冊兩個結構體 User 和 Test,特別注意:只有可導出的類方法(首字母大寫)才能被客戶端調用執行,否則會找不到對應類方法而失敗。此外 User 作爲接口值實現傳輸必須註冊纔行(gob.Register(User{}))。

type TestHandler struct{}
func (t *TestHandler) Hello() string {
    return "hello world"
}
type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Age  int    `json:"age"`
}
var userList = map[int]User{
    1: User{1, "hero", 11},
    2: User{2, "kavin", 12},
}
type UserHandler struct{}
func (u *UserHandler) GetUserById(id int) (User, error) {
    if u, ok := userList[id]; ok {
        return u, nil
    }
    return User{}, fmt.Errorf("id %d not found", id)
}

server.go

客戶端實現

客戶端發起 RPC 調用,就像調本地服務一樣,所以需要定義一個 stub,該 stub 同請求服務端方法簽名一致,然後通過代理實現網絡請求和解析。

var Hello func() string
r, err := cli.Call(ctx, "UserService.Test.Hello"&Hello)

var GetUserById func(id int) (User, error)
_, err := cli.Call(ctx, "UserService.User.GetUserById"&GetUserById)
u, err := GetUserById(2)

定義客戶端

定義客戶端接口,其中 Invoke 代理執行 RPC 請求。

type Client interface {
    Connect(string) error
    Invoke(context.Context, *Service, interface{}, ...interface{}) (interface{}, error)
    Close()
}

consumer/client.go

定義連接參數,設置重試次數、超時時間、序列化協議、壓縮類型等。

type Option struct {
    Retries           int
    ConnectionTimeout time.Duration
    SerializeType     protocol.SerializeType
    CompressType      protocol.CompressType
}
var DefaultOption = Option{
    Retries:           3,
    ConnectionTimeout: 5 * time.Second,
    SerializeType:     protocol.Gob,
    CompressType:      protocol.None,
}
type RPCClient struct {
    conn   net.Conn
    option Option
}
func NewClient(option Option) Client {
    return &RPCClient{option: option}
}

consumer/client.go

執行請求

實現網絡連接、關閉以及執行部分。

func (cli *RPCClient) Connect(addr string) error {
    conn, err := net.DialTimeout(config.NET_TRANS_PROTOCOL, addr, cli.option.ConnectionTimeout)
    if err != nil {
        return err
    }
    cli.conn = conn
    return nil
}
func (cli *RPCClient) Invoke(ctx context.Context, service *Service, stub interface{}, params ...interface{}) (interface{}, error) {
    cli.makeCall(service, stub)
    return cli.wrapCall(ctx, stub, params...)
}
func (cli *RPCClient) Close() {
    if cli.conn != nil {
        cli.conn.Close()
    }
}

consumer/client.go

執行代理過程,主要依賴反射實現。這裏 cli.makeCall() 主要是通過反射來生成代理函數,在代理函數中完成網絡連接、請求數據序列化、網絡傳輸、響應返回數據解析的工作,然後通過 cli.wrapCall() 發起實際調用。

func (cli *RPCClient) makeCall(service *Service, methodPtr interface{}) {
    container := reflect.ValueOf(methodPtr).Elem() 
    coder := global.Codecs[cli.option.SerializeType]

    handler := func(req []reflect.Value) []reflect.Value {  
        numOut := container.Type().NumOut()
        errorHandler := func(err error) []reflect.Value {
            outArgs := make([]reflect.Value, numOut)
            for i := 0; i < len(outArgs)-1; i++ {
                outArgs[i] = reflect.Zero(container.Type().Out(i))
            }
            outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()
            return outArgs
        }
        inArgs := make([]interface{}, 0, len(req))
        for _, arg := range req {
            inArgs = append(inArgs, arg.Interface())
        }
        payload, err := coder.Encode(inArgs) //[]byte
        if err != nil {
            log.Printf("encode err:%v\n", err)
            return errorHandler(err)
        }
        msg := protocol.NewRPCMsg()
        msg.SetVersion(config.Protocol_MsgVersion)
        msg.SetMsgType(protocol.Request)
        msg.SetCompressType(cli.option.CompressType)
        msg.SetSerializeType(cli.option.SerializeType)
        msg.ServiceClass = service.Class
        msg.ServiceMethod = service.Method
        msg.Payload = payload
        err = msg.Send(cli.conn)
        if err != nil {
            log.Printf("send err:%v\n", err)
            return errorHandler(err)
        }
        respMsg, err := protocol.Read(cli.conn)
        if err != nil {
            return errorHandler(err)
        }
        respDecode := make([]interface{}, 0)
        err = coder.Decode(respMsg.Payload, &respDecode)
        if err != nil {
            log.Printf("decode err:%v\n", err)
            return errorHandler(err)
        }
        if len(respDecode) == 0 {
            respDecode = make([]interface{}, numOut)
        }
        outArgs := make([]reflect.Value, numOut)
        for i := 0; i < numOut; i++ {
            if i != numOut {
                if respDecode[i] == nil {
                    outArgs[i] = reflect.Zero(container.Type().Out(i))
                } else {
                    outArgs[i] = reflect.ValueOf(respDecode[i])
                }
            } else {
                outArgs[i] = reflect.Zero(container.Type().Out(i))
            }
        }
        return outArgs
    }
    container.Set(reflect.MakeFunc(container.Type(), handler))
}

consumer/client.go

wrapCall 執行實際函數調用。

func (cli *RPCClient) wrapCall(ctx context.Context, stub interface{}, params ...interface{}) (interface{}, error) {
    f := reflect.ValueOf(stub).Elem()
    if len(params) != f.Type().NumIn() {
        return nil, errors.New(fmt.Sprintf("params not adapted: %d-%d", len(params), f.Type().NumIn()))
    }
    in := make([]reflect.Value, len(params))
    for idx, param := range params {
        in[idx] = reflect.ValueOf(param)
    }
    result := f.Call(in)
    return result, nil
}

consumer/client.go

代理實現

到目前爲止,客戶端主要實現邏輯有了,但是客戶端在發起調用前是需要先連接到服務端,然後執行調用,還有長連接管理、超時、重試甚至鑑權等功能沒有實現,因此需要有一個代理類完成以上動作。

type RPCClientProxy struct {
    option Option
}
func (cp *RPCClientProxy) Call(ctx context.Context, servicePath string, stub interface{}, params ...interface{}) (interface{}, error) {
    service, err := NewService(servicePath)
    if err != nil {
        return nil, err
    }       
    client := NewClient(cp.option)
    addr := service.SelectAddr()
    err = client.Connect(addr) //TODO 長連接管理
    if err != nil {
        return nil, err
    }
    retries := cp.option.Retries
    for retries > 0 {
        retries--
        return client.Invoke(ctx, service, stub, params...)
    }
    return nil, errors.New("error")
}

consumer/client_proxy.go

這裏通過服務路徑拆分依次獲取類名、方法名、服務 AppId,然後根據 AppId 查找服務註冊中心獲取到服務端(服務提供者)地址。由於篇幅限制,這部分將在下一篇實現(包括註冊發現、負載均衡、長連接管理等),測試方便這裏直接寫死服務端地址。

type Service struct {
    AppId  string
    Class  string
    Method string
    Addrs  []string
}
//demo: UserService.user.GetUser
func NewService(servicePath string) (*Service, error) {
    arr := strings.Split(servicePath, ".")
    service := &Service{}
    if len(arr) != 3 { 
        return service, errors.New("service path inlegal")
    }   
    service.AppId = arr[0]
    service.Class = arr[1]
    service.Method = arr[2]
    return service, nil 
}
func (service *Service) SelectAddr() string {
    return "ip:8811"
}

consumer/service.go

測試客戶端

客戶端通過 stub 發起調用,執行過程看到發起了遠程執行並從服務端獲取到了結果。

func main() {
    gob.Register(User{})
    cli := consumer.NewClientProxy(consumer.DefaultOption)
    ctx := context.Background()
    var GetUserById func(id int) (User, error)
    cli.Call(ctx, "UserService.User.GetUserById"&GetUserById)
    u, err := GetUserById(2)
    log.Println("result:", u, err)
    var Hello func() string
    r, err := cli.Call(ctx, "UserService.Test.Hello"&Hello)
    log.Println("result:", r, err)
}

client.go

總結與補充

至此實現了簡單的 “P2P RPC”,後續可以迭代加入註冊發現能力、長連接管理、異步調用、插件化擴展、負載均衡、認證授權、容錯治理等能力,希望大家多多支持。

文章完整代碼請關注公衆號  技術歲月,發送關鍵字 RPC獲取。

技術歲月 技術歲月,熱愛技術,分享點滴

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/3u-ZsAAixjddNFy_wbSaUw