寫給 go 開發者的 gRPC 教程 - 攔截器


gRPC攔截器和其他框架的攔截器(也稱 middleware)作用是一樣的。利用攔截器我們可以在不侵入業務邏輯的前提下修改或者記錄服務端或客戶端的請求與響應,利用攔截器我們可以實現諸如日誌記錄、權限認證、限流等諸多功能

上一篇提到gRPC的通信模式分爲unarystreaming幾種模式,攔截器也分爲兩種:unary interceptorsstreaming interceptors ,兩種攔截器可以分別應用在服務端和客戶端,所以 gRPC 總共爲我們提供了四種攔截器。它們已經被定義成了 go 中的接口,我們創建的攔截器只要實現這些接口即可

gRPC 四種攔截器一覽

服務端攔截器

服務端的攔截器從請求開始按順序執行攔截器,在執行完對應 RPC 的邏輯之後,再按反向的順序執行攔截器中對響應的處理邏輯

服務端攔截器

unary interceptors

對於unary服務的攔截器只需實現UnaryServerInterceptor接口即可

func(ctx context.Context, req interface{}, 
     info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

參數看不懂沒關係,我們來看一個例子

示例

// 實現 unary interceptors
func orderUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
 // Pre-processing logic
 s := time.Now()

 // Invoking the handler to complete the normal execution of a unary RPC.
 m, err := handler(ctx, req)

 // Post processing logic
 log.Printf("Method: %s, req: %s, resp: %s, latency: %s\n",
  info.FullMethod, req, m, time.Now().Sub(s))
  
 return m, err
}

func main() {
 s := grpc.NewServer(
    // 使用 unary interceptors
  grpc.UnaryInterceptor(orderUnaryServerInterceptor),
 )
 
  pb.RegisterOrderManagementServer(s, &OrderManagementImpl{})
  
 // ...
}

完整代碼參考:https://github.com/liangwt/grpc-example/tree/main/06-interceptors/server。下同

假設我們的客戶端請求了GetOrder,根據示例再重新看下攔截器接口的每一個參數

🌲 req interface{}

RPC 服務的請求結構體,對於GetOrder來說就是orderId *wrapperspb.StringValue

🌲 info *UnaryServerInfo包含兩個字段:

FullMethod是請求的 method 名字(例如/ecommerce.OrderManagement/getOrder);

Server就是服務實現(就是示例RegisterOrderManagementServer中的&OrderManagementImpl{})

🌲 handler包裝了服務實現

所以在調用它之前我們可以進行改寫reqctx、記錄邏輯開始時間等操作

調用完handler即完成了 RPC 並獲取到響應,我們不僅可以記錄響應還可以改寫響應

總結

這張圖大致展示了UnaryServerInterceptor接口的每個參數的含義

streaming interceptors

對於stream服務的攔截器只要實現StreamServerInterceptor接口即可。它適用於我們上一篇介紹的

func(srv interface{}, ss ServerStream, 
     info *StreamServerInfo, handler StreamHandler) error

示例

我們來看一個例子

func orderStreamServerInterceptor(srv interface{},
 ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {

 // Pre-processing logic
 s := time.Now()

 // Invoking the StreamHandler to complete the execution of RPC invocation
 err := handler(srv, ss)

 // Post processing logic
 log.Printf("Method: %s, latency: %s\n", info.FullMethod, time.Now().Sub(s))

 return err
}

func main() {
 s := grpc.NewServer(
  grpc.StreamInterceptor(orderStreamServerInterceptor),
 )

 pb.RegisterOrderManagementServer(s, &OrderManagementImpl{})

 //...
}

根據示例再重新看下攔截器接口的參數

🌲 srv interface{}

服務實現(就是示例RegisterOrderManagementServer中的&OrderManagementImpl{})

🌲 ss grpc.ServerStream

服務端發送和接收數據的接口,注意它是一個接口

🌲 info *grpc.StreamServerInfo包含三個字段:

FullMethod是請求的 method 名字(例如/ecommerce.OrderManagement/updateOrders);

IsClientStream 是否是客戶端流

IsServerStream 是否是服務端流

🌲 handler包裝了服務實現

所以在調用它之前我們可以進行改寫數據流、記錄邏輯開始時間等操作

調用完handler即完成了 RPC,因爲是流式調用所以不會返回響應數據,只有error

流式攔截器既沒有請求字段,handler也不會返回響應,該如何記錄、修改請求響應呢?

如果想劫持流數據,答案就在ss ServerStream。再重複一遍它的含義:服務端視角的流,它是一個接口。無論是哪一種流式 RPC 對於服務端來說發送(SendMsg)就代表着響應數據,接收(RecvMsg)就代表着請求數據,不同的流式 RPC 的區別就在於是多次發送數據(服務器端流式 RPC)還是多次接收數據(客戶端流式 RPC)或者兩者均有(雙向流式 RPC)。因此可以對ss進行包裝,只要傳入handler的類型實現ServerStream即可

// SendMsg method call.
type wrappedStream struct {
 Recv []interface{}
 Send []interface{}
 grpc.ServerStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
 err := w.ServerStream.RecvMsg(m)

 w.Recv = append(w.Recv, m)

 return err
}

func (w *wrappedStream) SendMsg(m interface{}) error {
 err := w.ServerStream.SendMsg(m)

 w.Send = append(w.Send, m)

 return err
}

func newWrappedStream(s grpc.ServerStream) *wrappedStream {
 return &wrappedStream{
  make([]interface{}, 0),
  make([]interface{}, 0),
  s,
 }
}

func orderStreamServerInterceptor(srv interface{},
 ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {

 // Pre-processing logic
 s := time.Now()

 // Invoking the StreamHandler to complete the execution of RPC invocation
 nss := newWrappedStream(ss)
 err := handler(srv, nss)

 // Post processing logic
 log.Printf("Method: %s, req: %+v, resp: %+v, latency: %s\n",
  info.FullMethod, nss.Recv, nss.Send, time.Now().Sub(s))

 return err
}

客戶端攔截器

客戶端攔截器和服務端攔截器類似,從請求開始按順序執行攔截器,在獲取到服務端響應之後,再按反向的順序執行攔截器中對響應的處理邏輯

客戶端攔截器

unary interceptors

client 端要實現UnaryClientInterceptor接口實現的接口如下

func(ctx context.Context, method string, req, reply interface{}, 
     cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

你可以在調用遠程函數前攔截 RPC,通過獲取 RPC 相關信息,如參數,上下文,函數名,請求等,你甚至可以修改原始的遠程調用

示例

func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
 cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
 // Pre-processor phase
 s := time.Now()

 // Invoking the remote method
 err := invoker(ctx, method, req, reply, cc, opts...)

 // Post-processor phase
 log.Printf("method: %s, req: %s, resp: %s, latency: %s\n",
  method, req, reply, time.Now().Sub(s))

 return err
}

func main() {
 conn, err := grpc.Dial("127.0.0.1:8009",
  grpc.WithInsecure(),
  grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
 )
 if err != nil {
  panic(err)
 }

 c := pb.NewOrderManagementClient(conn)
  
  // ...
}

根據示例再重新看下攔截器接口的參數

🌲 cc *grpc.ClientConn客戶端與服務端的鏈接

這裏的cc就是示例代碼中c := pb.NewOrderManagementClient(conn)conn

🌲 invoker grpc.UnaryInvoker包裝了服務實現

調用完invoker即完成了 RPC,所以我們可以改寫req或者在獲取到reply之後修改響應

streaming interceptors

要實現的接口StreamClientInterceptor

func(ctx context.Context, desc *StreamDesc, cc *ClientConn, 
     method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

和 serve 端類似的參數類似,重點關注下面幾個參數

示例

這裏不再贅述,可以參考服務端攔截器

func orderStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc,
 cc *grpc.ClientConn, method string, streamer grpc.Streamer,
 opts ...grpc.CallOption) (grpc.ClientStream, error) {

 // Pre-processing logic
 s := time.Now()

 cs, err := streamer(ctx, desc, cc, method, opts...)

 // Post processing logic
 log.Printf("method: %s, latency: %s\n", method, time.Now().Sub(s))

 return cs, err
}

func main() {
 conn, err := grpc.Dial("127.0.0.1:8009",
  grpc.WithInsecure(),
  grpc.WithStreamInterceptor(orderStreamClientInterceptor),
 )
 if err != nil {
  panic(err)
 }

 c := pb.NewOrderManagementClient(conn)
  
  // ...
}

如何記錄或者修改流攔截器的請求響應數據?

和服務端stream interceptor同樣的道理,通過包裝ClientStream即可做到

// SendMsg method call.
type wrappedStream struct {
 method string
 grpc.ClientStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
 err := w.ClientStream.RecvMsg(m)

 log.Printf("method: %s, res: %s\n", w.method, m)

 return err
}

func (w *wrappedStream) SendMsg(m interface{}) error {
 err := w.ClientStream.SendMsg(m)

 log.Printf("method: %s, req: %s\n", w.method, m)

 return err
}

func newWrappedStream(method string, s grpc.ClientStream) *wrappedStream {
 return &wrappedStream{
  method,
  s,
 }
}

func orderStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc,
 cc *grpc.ClientConn, method string, streamer grpc.Streamer,
 opts ...grpc.CallOption) (grpc.ClientStream, error) {

 // Pre-processing logic
 s := time.Now()

 cs, err := streamer(ctx, desc, cc, method, opts...)

 // Post processing logic
 log.Printf("method: %s, latency: %s\n", method, time.Now().Sub(s))

 return newWrappedStream(method, cs), err
}

攔截器鏈

服務器只能配置一個 unary interceptorstream interceptor,否則會報錯,客戶端也是,雖然不會報錯,但是隻有最後一個才起作用。

// 服務端攔截器
s := grpc.NewServer(
  grpc.UnaryInterceptor(orderUnaryServerInterceptor),
  grpc.StreamInterceptor(orderStreamServerInterceptor),
)
// 客戶端攔截器
conn, err := grpc.Dial("127.0.0.1:8009",
  grpc.WithInsecure(),
  grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
  grpc.WithStreamInterceptor(orderStreamClientInterceptor),
)

如果你想配置多個,可以使用攔截器鏈或者自己實現一個。

// 服務端攔截器
s := grpc.NewServer(
  grpc.ChainUnaryInterceptor(
    orderUnaryServerInterceptor1,
    orderUnaryServerInterceptor2,
  ),
  grpc.ChainStreamInterceptor(
    orderServerStreamInterceptor1,
    orderServerStreamInterceptor2,
  ),
)
// 客戶端攔截器
conn, err := grpc.Dial("127.0.0.1:8009",
  grpc.WithInsecure(),
  grpc.WithChainUnaryInterceptor(
   orderUnaryClientInterceptor1,
      orderUnaryClientInterceptor2,
  ),
  grpc.WithChainStreamInterceptor(
   orderStreamClientInterceptor1,
      orderStreamClientInterceptor2,
  ),
)

生態

除了可以自己實現攔截器外,gRPC 生態也提供了一系列的開源的攔截器可供使用,覆蓋權限、日誌、監控等諸多方面

https://github.com/grpc-ecosystem/go-grpc-middleware

Auth

Logging

Monitoring

Client

Server

示例代碼

https://github.com/liangwt/grpc-example/tree/main/06-interceptors

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/nsMcLpCe5WdJ5CCSlIh1uQ