Golang 從零到一開發實現 RPC 框架
內容提要
RPC 框架是分佈式領域核心組件,也是微服務的基礎。今天嘗試從零擼一個 RPC 框架,剖析其核心原理及代碼實現,後續還會逐步迭代追加微服務治理等功能,將之前文章覆蓋的熔斷、限流、負載均衡、註冊發現等功能融合進來,打造一個五臟俱全的 RPC 框架。本文主要內容包括:
-
RPC 實現原理
-
RPC 協議設計
-
RPC 服務端實現
-
RPC 客戶端實現
實現原理
RPC (Remote Procedure Call)全稱是遠程過程調用,相對於本地方法調用,在同一內存空間可以直接通過方法棧實現調用,遠程調用則跨了不同的服務終端,並不能直接調用。
RPC 框架 要解決的就是遠程方法調用的問題,並且實現調用遠程服務像調用本地服務一樣簡單,框架內部封裝實現了網絡調用的細節(透明化遠程調用),其核心過程原理如下圖所示。
這個版本可以稱爲 “P2P RPC” ,而生產環境部署往往會將服務提供者(Server)部署多個實例(集羣部署),那麼客戶端就需要具備發現服務端的能力和負載均衡的支持,所以有了服務註冊發現和負載均衡。
再然後,爲了保障 RPC 調用的可靠性和穩定性,增加了服務監控和服務容錯治理的能力,考慮性能提升的異步化能力以及考慮可擴展性的插件化管理,這些完善構成了更完整的微服務 RPC 框架。
RPC 協議實現
協議設計
協議設計算是 RPC 最重要的一部分了,它主要解決服務端與客戶端通信的問題。一般來說通訊要解決如下問題:
**
- 網絡傳輸協議**
基於 TCP、UDP 還是 HTTP,UDP 要自己解決可靠性傳輸問題,而 HTTP 又太重,包含很多沒必要的頭信息,所以一般 RPC 框架會優先選擇 TCP 協議。
(當然也有大名鼎鼎的 gRPC 基於 HTTP2)
2. 序列化協議
網絡傳輸數據必須是二進制數據,而執行過程是編程語言的對象方法,那麼就涉及到如何將對象序列化成可傳輸消息(二進制),並可反序列化還原。常見的通用型協議如 XML、 JSON、Protobuf、Thrift 等,也有語言綁定的如 Python 原生支持的 pickle 協議, Java 實現的 Serializbale 接口及 Hessian 協議,Golang 原生支持的 Gob 協議等。
3. 消息編碼協議
它是一種客戶端和服務端的調用約定,比如請求和參數如何組織,Header 放置什麼內容。這部分每個框架設計均不同,有時也稱這一層爲狹義的 RPC 協議層。
另外客戶端發起調用一般來說要知道調用的具體類方法(請求標識符)以及入參(Payload),而網絡傳輸的是二進制字節流,如何能從這些字節中找出哪些是方法名,哪些是參數?進一步如果客戶端不斷的發送消息,如何將每一條消息分割?(解決 TCP 粘包問題)
採用定長消息很容易解決,但事先並不能確定要固定多長,所以這種方式並不可行。消息加分隔符可以實現,但要確保分隔符不會與正文衝突。而最常用的實現方案就是用定長的頭標識出不定長的體,比如用 int32 (定長 4 字節)標識後面的內容長度,這樣就能較優雅實現消息分割了。
(注:這個方案中如果消息體的長度大於 2^32 會發生溢出而導致解析失敗,可以換更長類型,但理論上總會有溢出風險,設計使用時應該限制避免傳輸過大數據體)
協議實現
網絡傳輸協議,這裏使用 TCP 協議即可,沒有太多爭議,可預留接口支持。
序列化協議,這裏使用 Golang 專有的 Gob 協議,保留接口後期可以擴展支持 JSON、Protobuf 等協議。
type Codec interface {
Encode(i interface{}) ([]byte, error)
Decode(data []byte, i interface{}) error
}
type GobCodec struct{}
func (c GobCodec) Encode(i interface{}) ([]byte, error) {
var buffer bytes.Buffer
encoder := gob.NewEncoder(&buffer)
if err := encoder.Encode(i); err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
func (c GobCodec) Decode(data []byte, i interface{}) error {
buffer := bytes.NewBuffer(data)
decoder := gob.NewDecoder(buffer)
return decoder.Decode(i)
}
codec/codec.go
RPC 消息格式編碼設計如下,協議消息頭定義定長 5 字節(byte),依次放置魔術數(用於校驗),協議版本,消息類型(區分請求 / 響應),壓縮類型,序列化協議類型,每個佔 1 個字節(8 個 bit)。可擴展追加 消息 ID 以及 元數據 等信息用於做服務治理。
const (
HEADER_LEN = 5
)
const (
magicNumber byte = 0x06
)
type MsgType byte
const (
Request MsgType = iota
Response
)
type CompressType byte
const (
None CompressType = iota
Gzip
)
type SerializeType byte
const (
Gob SerializeType = iota
JSON
)
type Header [HEADER_LEN]byte
func (h *Header) CheckMagicNumber() bool {
return h[0] == magicNumber
}
func (h *Header) Version() byte {
return h[1]
}
func (h *Header) SetVersion(version byte) {
h[1] = version
}
//省略 MsgType,CompressType,SerializeType
protocol/header.go
定義協議消息格式,除了協議頭,還包括調用的服務類名、方法名以及參數(Payload)。
type RPCMsg struct {
*Header
ServiceClass string
ServiceMethod string
Payload []byte
}
func NewRPCMsg() *RPCMsg {
header := Header([HEADER_LEN]byte{})
header[0] = magicNumber
return &RPCMsg{
Header: &header,
}
}
protocol/msg.go
實現傳輸
定義好協議後,要解決的問題就是如何通過網絡(IO)發送和接收,實現通信的目的。
func (msg *RPCMsg) Send(writer io.Writer) error {
//send header
_, err := writer.Write(msg.Header[:])
if err != nil {
return err
}
//寫入消息體總長度,方便一次性解析
dataLen := SPLIT_LEN + len(msg.ServiceClass) + SPLIT_LEN + len(msg.ServiceMethod) + SPLIT_L
EN + len(msg.Payload)
err = binary.Write(writer, binary.BigEndian, uint32(dataLen)) //4
if err != nil {
return err
}
//write service.class len
err = binary.Write(writer, binary.BigEndian, uint32(len(msg.ServiceClass)))
if err != nil {
return err
}
//write service.class content
err = binary.Write(writer, binary.BigEndian, util.StringToByte(msg.ServiceClass))
if err != nil {
return err
}
//省略 service.method,payload
}
protocol/msg.go
其中類名、方法名、payload 均爲不定長部分,要想順利解析就需要一一對應的長度字段標識不定長的長度,也就是 SPLIT_LEN 代表各部分長度,是 int32 類型(32 bit),正好相當於 4 個 byte,所以 SPLIT_LEN 爲 4。
另外要注意網絡傳輸一般使用大端字節序。先理解字節序即爲字節(byte)的組成順序,分爲大端序(最高有效位放低地址)和小端序(最低有效位放低地址)。CPU 一般採用小端序讀寫,而 TCP 網絡傳輸採用大端序則更方便。對應這裏的 binary.BigEndian 代碼實現大端序。
消息讀取後反解析,按發送順序依次還原 Header、類名、方法名、Payload,不定長部分都有對應的長度保存,因此可以順利解析到所有數據。
func Read(r io.Reader) (*RPCMsg, error) {
msg := NewRPCMsg()
err := msg.Decode(r)
if err != nil {
return nil, err
}
return msg, nil
}
func (msg *RPCMsg) Decode(r io.Reader) error {
//read header
_, err := io.ReadFull(r, msg.Header[:])
if !msg.Header.CheckMagicNumber() { //magicNumber
return fmt.Errorf("magic number error: %v", msg.Header[0])
}
//total body len
headerByte := make([]byte, 4)
_, err = io.ReadFull(r, headerByte)
if err != nil {
return err
}
bodyLen := binary.BigEndian.Uint32(headerByte)
//一次將整個body讀取,再依次拆解
data := make([]byte, bodyLen)
_, err = io.ReadFull(r, data)
//service.class len
start := 0
end := start + SPLIT_LEN
classLen := binary.BigEndian.Uint32(data[start:end]) //0,4
//service.class
start = end
end = start + int(classLen)
msg.ServiceClass = util.ByteToString(data[start:end]) //4,x
//省略 method,payload}
protocol/msg.go
解決了最複雜的協議部分,下面依次來看服務端和客戶端的實現。
服務端實現
服務端實現主要包括服務啓停(端口監聽)、服務註冊、響應連接和處理請求幾部分。
定義服務接口
服務接口提供服務啓停和處理方法註冊的能力。
type Server interface {
Register(string, interface{})
Run()
Close()
}
provider/server.go
服務啓停
實現服務啓停,關鍵在於通過 ip 和端口開啓監聽,這裏通過 Listener 封裝 net 包開啓 tcp Listen。
type RPCServer struct {
listener Listener
}
func NewRPCServer(ip string, port int) *RPCServer {
return &RPCServer{
listener: NewRPCListener(ip, port),
}
}
func (svr *RPCServer) Run() {
go svr.listener.Run()
}
func (svr *RPCServer) Close() {
if svr.listener != nil {
svr.listener.Close()
}
}
provider/server.go
type Listener interface {
Run()
SetHandler(string, Handler)
Close()
}
type RPCListener struct {
ServiceIp string
ServicePort int
Handlers map[string]Handler
nl net.Listener
}
func NewRPCListener(serviceIp string, servicePort int) *RPCListener {
return &RPCListener{ServiceIp: serviceIp,
ServicePort: servicePort,
Handlers: make(map[string]Handler)}
}
func (l *RPCListener) Run() {
addr := fmt.Sprintf("%s:%d", l.ServiceIp, l.ServicePort)
nl, err := net.Listen(config.NET_TRANS_PROTOCOL, addr) //tcp
if err != nil {
panic(err)
}
l.nl = nl
for {
conn, err := l.nl.Accept()
if err != nil {
continue
}
go l.handleConn(conn)
}
}
func (l *RPCListener) Close() { if l.nl != nil {
l.nl.Close()
}
}
provider/listener.go
這裏通過爲每個連接創建一個協程處理請求,得益於 Golang 的協程優勢,Thread-Per-Message 模式來滿足併發請求更容易實現(Java 線程成本太大,一般採用線程池實現 Worker Thread 模式)。
服務註冊
服務註冊就是在內存中維護一個映射關係,map (key = 服務名,value = 對象實例),通過 interface{} 泛化,可以反射還原。
func (svr *RPCServer) Register(class interface{}) {
name := reflect.Indirect(reflect.ValueOf(class)).Type().Name()
svr.RegisterName(name, class)
}
func (svr *RPCServer) RegisterName(name string, class interface{}) {
handler := &RPCServerHandler{class: reflect.ValueOf(class)}
svr.listener.SetHandler(name, handler)
log.Printf("%s registered success!\n", name)
}
func (l *RPCListener) SetHandler(name string, handler Handler) {
if _, ok := l.Handlers[name]; ok {
log.Printf("%s is registered!\n", name)
return
}
l.Handlers[name] = handler
}
provider/server.go
(1)由於服務啓動初始化時進行所有服務註冊,所以用 map 沒考慮併發,否則如果有動態註冊就需要考慮併發問題了。
(2)這裏沒有註冊到服務註冊中心,設計考慮將以應用服務(系統)爲單位進行註冊,而具體的服務接口通過應用內存映射。這種註冊粒度大,優點就是減少對註冊中心的依賴和註冊實例數量,提高服務發現資源利用率。
(dubbo 3 重要改進就是將接口級服務發現切換爲應用級服務發現)
響應連接請求
整個過程依次涉及從網絡連接讀取數據,反序列化獲得請求結構體 (RPCMsg),根據註冊類和方法找到目標函數並執行,將執行結果序列化後封裝成 RPCMsg 通過網絡發送,整個過程是同步 io 模型。
func (l *RPCListener) handleConn(conn net.Conn) {
defer catchPanic()
for {
msg, err := l.receiveData(conn)
if err != nil || msg == nil {
return
}
coder := global.Codecs[msg.Header.SerializeType()]
if coder == nil {
return
}
inArgs := make([]interface{}, 0)
err = coder.Decode(msg.Payload, &inArgs)
if err != nil {
return
}
handler, ok := l.Handlers[msg.ServiceClass]
if !ok {
return
}
result, err := handler.Handle(msg.ServiceMethod, inArgs)
encodeRes, err := coder.Encode(result)
if err != nil {
return
}
err = l.sendData(conn, encodeRes)
if err != nil {
return
}
}
}
provider/listener.go
其中實際執行本地方法過程如下:
func (handler *RPCServerHandler) Handle(method string, params []interface{}) ([]interface{}, error) {
args := make([]reflect.Value, len(params))
for i := range params {
args[i] = reflect.ValueOf(params[i])
}
reflectMethod := handler.class.MethodByName(method)
result := reflectMethod.Call(args)
resArgs := make([]interface{}, len(result))
for i := 0; i < len(result); i++ {
resArgs[i] = result[i].Interface()
}
var err error
if _, ok := result[len(result)-1].Interface().(error); ok {
err = result[len(result)-1].Interface().(error)
}
return resArgs, err
}
provider/handler.go
收發網絡請求通過調用之前封裝的 Protocol 來完成。
func (l *RPCListener) receiveData(conn net.Conn) (*protocol.RPCMsg, error) {
msg, err := protocol.Read(conn)
if err != nil {
if err != io.EOF { //close
return nil, err
}
}
return msg, nil
}
func (l *RPCListener) sendData(conn net.Conn, payload []byte) error {
resMsg := protocol.NewRPCMsg()
resMsg.SetVersion(config.Protocol_MsgVersion)
resMsg.SetMsgType(protocol.Response)
resMsg.SetCompressType(protocol.None)
resMsg.SetSerializeType(protocol.Gob)
resMsg.Payload = payload
return resMsg.Send(conn)
}
provider/listener.go
測試服務端
通過環境變量注入 ip 和 port,開啓服務監聽,依次註冊幾個服務。
func main() {
flag.Parse()
if ip == "" || port == 0 {
panic("init ip and port error")
}
srv := provider.NewRPCServer(ip, port)
srv.RegisterName("User", &UserHandler{})
srv.RegisterName("Test", &TestHandler{})
gob.Register(User{})
go srv.Run()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
<-quit
srv.Close()
}
server.go
這裏註冊兩個結構體 User 和 Test,特別注意:只有可導出的類方法(首字母大寫)才能被客戶端調用執行,否則會找不到對應類方法而失敗。此外 User 作爲接口值實現傳輸必須註冊纔行(gob.Register(User{}))。
type TestHandler struct{}
func (t *TestHandler) Hello() string {
return "hello world"
}
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Age int `json:"age"`
}
var userList = map[int]User{
1: User{1, "hero", 11},
2: User{2, "kavin", 12},
}
type UserHandler struct{}
func (u *UserHandler) GetUserById(id int) (User, error) {
if u, ok := userList[id]; ok {
return u, nil
}
return User{}, fmt.Errorf("id %d not found", id)
}
server.go
客戶端實現
客戶端發起 RPC 調用,就像調本地服務一樣,所以需要定義一個 stub,該 stub 同請求服務端方法簽名一致,然後通過代理實現網絡請求和解析。
var Hello func() string
r, err := cli.Call(ctx, "UserService.Test.Hello", &Hello)
var GetUserById func(id int) (User, error)
_, err := cli.Call(ctx, "UserService.User.GetUserById", &GetUserById)
u, err := GetUserById(2)
定義客戶端
定義客戶端接口,其中 Invoke 代理執行 RPC 請求。
type Client interface {
Connect(string) error
Invoke(context.Context, *Service, interface{}, ...interface{}) (interface{}, error)
Close()
}
consumer/client.go
定義連接參數,設置重試次數、超時時間、序列化協議、壓縮類型等。
type Option struct {
Retries int
ConnectionTimeout time.Duration
SerializeType protocol.SerializeType
CompressType protocol.CompressType
}
var DefaultOption = Option{
Retries: 3,
ConnectionTimeout: 5 * time.Second,
SerializeType: protocol.Gob,
CompressType: protocol.None,
}
type RPCClient struct {
conn net.Conn
option Option
}
func NewClient(option Option) Client {
return &RPCClient{option: option}
}
consumer/client.go
執行請求
實現網絡連接、關閉以及執行部分。
func (cli *RPCClient) Connect(addr string) error {
conn, err := net.DialTimeout(config.NET_TRANS_PROTOCOL, addr, cli.option.ConnectionTimeout)
if err != nil {
return err
}
cli.conn = conn
return nil
}
func (cli *RPCClient) Invoke(ctx context.Context, service *Service, stub interface{}, params ...interface{}) (interface{}, error) {
cli.makeCall(service, stub)
return cli.wrapCall(ctx, stub, params...)
}
func (cli *RPCClient) Close() {
if cli.conn != nil {
cli.conn.Close()
}
}
consumer/client.go
執行代理過程,主要依賴反射實現。這裏 cli.makeCall() 主要是通過反射來生成代理函數,在代理函數中完成網絡連接、請求數據序列化、網絡傳輸、響應返回數據解析的工作,然後通過 cli.wrapCall() 發起實際調用。
func (cli *RPCClient) makeCall(service *Service, methodPtr interface{}) {
container := reflect.ValueOf(methodPtr).Elem()
coder := global.Codecs[cli.option.SerializeType]
handler := func(req []reflect.Value) []reflect.Value {
numOut := container.Type().NumOut()
errorHandler := func(err error) []reflect.Value {
outArgs := make([]reflect.Value, numOut)
for i := 0; i < len(outArgs)-1; i++ {
outArgs[i] = reflect.Zero(container.Type().Out(i))
}
outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()
return outArgs
}
inArgs := make([]interface{}, 0, len(req))
for _, arg := range req {
inArgs = append(inArgs, arg.Interface())
}
payload, err := coder.Encode(inArgs) //[]byte
if err != nil {
log.Printf("encode err:%v\n", err)
return errorHandler(err)
}
msg := protocol.NewRPCMsg()
msg.SetVersion(config.Protocol_MsgVersion)
msg.SetMsgType(protocol.Request)
msg.SetCompressType(cli.option.CompressType)
msg.SetSerializeType(cli.option.SerializeType)
msg.ServiceClass = service.Class
msg.ServiceMethod = service.Method
msg.Payload = payload
err = msg.Send(cli.conn)
if err != nil {
log.Printf("send err:%v\n", err)
return errorHandler(err)
}
respMsg, err := protocol.Read(cli.conn)
if err != nil {
return errorHandler(err)
}
respDecode := make([]interface{}, 0)
err = coder.Decode(respMsg.Payload, &respDecode)
if err != nil {
log.Printf("decode err:%v\n", err)
return errorHandler(err)
}
if len(respDecode) == 0 {
respDecode = make([]interface{}, numOut)
}
outArgs := make([]reflect.Value, numOut)
for i := 0; i < numOut; i++ {
if i != numOut {
if respDecode[i] == nil {
outArgs[i] = reflect.Zero(container.Type().Out(i))
} else {
outArgs[i] = reflect.ValueOf(respDecode[i])
}
} else {
outArgs[i] = reflect.Zero(container.Type().Out(i))
}
}
return outArgs
}
container.Set(reflect.MakeFunc(container.Type(), handler))
}
consumer/client.go
wrapCall 執行實際函數調用。
func (cli *RPCClient) wrapCall(ctx context.Context, stub interface{}, params ...interface{}) (interface{}, error) {
f := reflect.ValueOf(stub).Elem()
if len(params) != f.Type().NumIn() {
return nil, errors.New(fmt.Sprintf("params not adapted: %d-%d", len(params), f.Type().NumIn()))
}
in := make([]reflect.Value, len(params))
for idx, param := range params {
in[idx] = reflect.ValueOf(param)
}
result := f.Call(in)
return result, nil
}
consumer/client.go
代理實現
到目前爲止,客戶端主要實現邏輯有了,但是客戶端在發起調用前是需要先連接到服務端,然後執行調用,還有長連接管理、超時、重試甚至鑑權等功能沒有實現,因此需要有一個代理類完成以上動作。
type RPCClientProxy struct {
option Option
}
func (cp *RPCClientProxy) Call(ctx context.Context, servicePath string, stub interface{}, params ...interface{}) (interface{}, error) {
service, err := NewService(servicePath)
if err != nil {
return nil, err
}
client := NewClient(cp.option)
addr := service.SelectAddr()
err = client.Connect(addr) //TODO 長連接管理
if err != nil {
return nil, err
}
retries := cp.option.Retries
for retries > 0 {
retries--
return client.Invoke(ctx, service, stub, params...)
}
return nil, errors.New("error")
}
consumer/client_proxy.go
這裏通過服務路徑拆分依次獲取類名、方法名、服務 AppId,然後根據 AppId 查找服務註冊中心獲取到服務端(服務提供者)地址。由於篇幅限制,這部分將在下一篇實現(包括註冊發現、負載均衡、長連接管理等),測試方便這裏直接寫死服務端地址。
type Service struct {
AppId string
Class string
Method string
Addrs []string
}
//demo: UserService.user.GetUser
func NewService(servicePath string) (*Service, error) {
arr := strings.Split(servicePath, ".")
service := &Service{}
if len(arr) != 3 {
return service, errors.New("service path inlegal")
}
service.AppId = arr[0]
service.Class = arr[1]
service.Method = arr[2]
return service, nil
}
func (service *Service) SelectAddr() string {
return "ip:8811"
}
consumer/service.go
測試客戶端
客戶端通過 stub 發起調用,執行過程看到發起了遠程執行並從服務端獲取到了結果。
func main() {
gob.Register(User{})
cli := consumer.NewClientProxy(consumer.DefaultOption)
ctx := context.Background()
var GetUserById func(id int) (User, error)
cli.Call(ctx, "UserService.User.GetUserById", &GetUserById)
u, err := GetUserById(2)
log.Println("result:", u, err)
var Hello func() string
r, err := cli.Call(ctx, "UserService.Test.Hello", &Hello)
log.Println("result:", r, err)
}
client.go
總結與補充
至此實現了簡單的 “P2P RPC”,後續可以迭代加入註冊發現能力、長連接管理、異步調用、插件化擴展、負載均衡、認證授權、容錯治理等能力,希望大家多多支持。
文章完整代碼請關注公衆號 技術歲月,發送關鍵字 RPC獲取。
技術歲月 技術歲月,熱愛技術,分享點滴
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/3u-ZsAAixjddNFy_wbSaUw