寫給 go 開發者的 gRPC 教程 - 攔截器
gRPC
的攔截器和其他框架的攔截器(也稱 middleware)作用是一樣的。利用攔截器我們可以在不侵入業務邏輯的前提下修改或者記錄服務端或客戶端的請求與響應,利用攔截器我們可以實現諸如日誌記錄、權限認證、限流等諸多功能
上一篇提到gRPC
的通信模式分爲unary
和streaming
幾種模式,攔截器也分爲兩種:unary interceptors
和streaming interceptors
,兩種攔截器可以分別應用在服務端和客戶端,所以 gRPC 總共爲我們提供了四種攔截器。它們已經被定義成了 go 中的接口,我們創建的攔截器只要實現這些接口即可
服務端攔截器
服務端的攔截器從請求開始按順序執行攔截器,在執行完對應 RPC 的邏輯之後,再按反向的順序執行攔截器中對響應的處理邏輯
unary interceptors
對於unary
服務的攔截器只需實現UnaryServerInterceptor
接口即可
func(ctx context.Context, req interface{},
info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
-
ctx context.Context
:單個請求的上下文 -
req interface{}
:RPC 服務的請求結構體 -
info *UnaryServerInfo
:RPC 的服務信息 -
handler UnaryHandler
:它包裝了服務實現,通過調用它我們可以完成 RPC 並獲取到響應
參數看不懂沒關係,我們來看一個例子
示例
// 實現 unary interceptors
func orderUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// Pre-processing logic
s := time.Now()
// Invoking the handler to complete the normal execution of a unary RPC.
m, err := handler(ctx, req)
// Post processing logic
log.Printf("Method: %s, req: %s, resp: %s, latency: %s\n",
info.FullMethod, req, m, time.Now().Sub(s))
return m, err
}
func main() {
s := grpc.NewServer(
// 使用 unary interceptors
grpc.UnaryInterceptor(orderUnaryServerInterceptor),
)
pb.RegisterOrderManagementServer(s, &OrderManagementImpl{})
// ...
}
完整代碼參考:https://github.com/liangwt/grpc-example/tree/main/06-interceptors/server。下同
假設我們的客戶端請求了GetOrder
,根據示例再重新看下攔截器接口的每一個參數
🌲 req interface{}
RPC 服務的請求結構體,對於GetOrder
來說就是orderId *wrapperspb.StringValue
🌲 info *UnaryServerInfo
包含兩個字段:
FullMethod
是請求的 method 名字(例如/ecommerce.OrderManagement/getOrder
);
Server
就是服務實現(就是示例RegisterOrderManagementServer
中的&OrderManagementImpl{}
)
🌲 handler
包裝了服務實現
所以在調用它之前我們可以進行改寫req
或ctx
、記錄邏輯開始時間等操作
調用完handler
即完成了 RPC 並獲取到響應,我們不僅可以記錄響應還可以改寫響應
總結
這張圖大致展示了UnaryServerInterceptor
接口的每個參數的含義
streaming interceptors
對於stream
服務的攔截器只要實現StreamServerInterceptor
接口即可。它適用於我們上一篇介紹的
-
服務器端流式 RPC
-
客戶端流式 RPC
-
雙向流式 RPC
func(srv interface{}, ss ServerStream,
info *StreamServerInfo, handler StreamHandler) error
-
srv interface{}
:服務實現 -
ss ServerStream
:服務端視角的流。怎麼理解呢?無論是哪一種流式 RPC 對於服務端來說發送(SendMsg
)就代表着響應數據,接收(RecvMsg
)就代表着請求數據,不同的流式 RPC 的區別就在於是多次發送數據(服務器端流式 RPC)還是多次接收數據(客戶端流式 RPC)或者兩者均有(雙向流式 RPC)。因此僅使用這一個抽象就代表了所有的流式 RPC 場景 -
info *StreamServerInfo
:RPC 的服務信息 -
handler StreamHandler
:它包裝了服務實現,通過調用它我們可以完成 RPC
示例
我們來看一個例子
func orderStreamServerInterceptor(srv interface{},
ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Pre-processing logic
s := time.Now()
// Invoking the StreamHandler to complete the execution of RPC invocation
err := handler(srv, ss)
// Post processing logic
log.Printf("Method: %s, latency: %s\n", info.FullMethod, time.Now().Sub(s))
return err
}
func main() {
s := grpc.NewServer(
grpc.StreamInterceptor(orderStreamServerInterceptor),
)
pb.RegisterOrderManagementServer(s, &OrderManagementImpl{})
//...
}
根據示例再重新看下攔截器接口的參數
🌲 srv interface{}
服務實現(就是示例RegisterOrderManagementServer
中的&OrderManagementImpl{}
)
🌲 ss grpc.ServerStream
服務端發送和接收數據的接口,注意它是一個接口
🌲 info *grpc.StreamServerInfo
包含三個字段:
FullMethod
是請求的 method 名字(例如/ecommerce.OrderManagement/updateOrders
);
IsClientStream
是否是客戶端流
IsServerStream
是否是服務端流
🌲 handler
包裝了服務實現
所以在調用它之前我們可以進行改寫數據流、記錄邏輯開始時間等操作
調用完handler
即完成了 RPC,因爲是流式調用所以不會返回響應數據,只有error
流式攔截器既沒有請求字段,handler
也不會返回響應,該如何記錄、修改請求響應呢?
如果想劫持流數據,答案就在ss ServerStream
。再重複一遍它的含義:服務端視角的流,它是一個接口。無論是哪一種流式 RPC 對於服務端來說發送(SendMsg
)就代表着響應數據,接收(RecvMsg
)就代表着請求數據,不同的流式 RPC 的區別就在於是多次發送數據(服務器端流式 RPC)還是多次接收數據(客戶端流式 RPC)或者兩者均有(雙向流式 RPC)。因此可以對ss
進行包裝,只要傳入handler
的類型實現ServerStream
即可
// SendMsg method call.
type wrappedStream struct {
Recv []interface{}
Send []interface{}
grpc.ServerStream
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m)
w.Recv = append(w.Recv, m)
return err
}
func (w *wrappedStream) SendMsg(m interface{}) error {
err := w.ServerStream.SendMsg(m)
w.Send = append(w.Send, m)
return err
}
func newWrappedStream(s grpc.ServerStream) *wrappedStream {
return &wrappedStream{
make([]interface{}, 0),
make([]interface{}, 0),
s,
}
}
func orderStreamServerInterceptor(srv interface{},
ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Pre-processing logic
s := time.Now()
// Invoking the StreamHandler to complete the execution of RPC invocation
nss := newWrappedStream(ss)
err := handler(srv, nss)
// Post processing logic
log.Printf("Method: %s, req: %+v, resp: %+v, latency: %s\n",
info.FullMethod, nss.Recv, nss.Send, time.Now().Sub(s))
return err
}
客戶端攔截器
客戶端攔截器和服務端攔截器類似,從請求開始按順序執行攔截器,在獲取到服務端響應之後,再按反向的順序執行攔截器中對響應的處理邏輯
unary interceptors
client 端要實現UnaryClientInterceptor
接口實現的接口如下
func(ctx context.Context, method string, req, reply interface{},
cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
你可以在調用遠程函數前攔截 RPC,通過獲取 RPC 相關信息,如參數,上下文,函數名,請求等,你甚至可以修改原始的遠程調用
-
ctx context.Context
:單個請求的上下文 -
method string
:請求的 method 名字(例如/ecommerce.OrderManagement/getOrder
) -
req, reply interface{}
:請求和響應數據 -
cc *ClientConn
:客戶端與服務端的鏈接 -
invoker UnaryInvoker
:通過調用它我們可以完成 RPC 並獲取到響應 -
opts ...CallOption
:RPC 調用的所有配置項,包含設置到 conn 上的,也包含配置在每一個調用上的
示例
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// Pre-processor phase
s := time.Now()
// Invoking the remote method
err := invoker(ctx, method, req, reply, cc, opts...)
// Post-processor phase
log.Printf("method: %s, req: %s, resp: %s, latency: %s\n",
method, req, reply, time.Now().Sub(s))
return err
}
func main() {
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
)
if err != nil {
panic(err)
}
c := pb.NewOrderManagementClient(conn)
// ...
}
根據示例再重新看下攔截器接口的參數
🌲 cc *grpc.ClientConn
客戶端與服務端的鏈接
這裏的cc
就是示例代碼中c := pb.NewOrderManagementClient(conn)
的conn
🌲 invoker grpc.UnaryInvoker
包裝了服務實現
調用完invoker
即完成了 RPC,所以我們可以改寫req
或者在獲取到reply
之後修改響應
streaming interceptors
要實現的接口StreamClientInterceptor
func(ctx context.Context, desc *StreamDesc, cc *ClientConn,
method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
和 serve 端類似的參數類似,重點關注下面幾個參數
-
cs ClientStream
:客戶端視角的流。類比服務端的ss ServerStream
,無論是哪一種流式 RPC 對於客戶端來說發送(SendMsg
)就代表着請求數據,接收(RecvMsg
)就代表着響應數據(正好和服務端是反過來的) -
streamer Streamer
:完成 RPC 請求的調用
示例
這裏不再贅述,可以參考服務端攔截器
func orderStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc,
cc *grpc.ClientConn, method string, streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
// Pre-processing logic
s := time.Now()
cs, err := streamer(ctx, desc, cc, method, opts...)
// Post processing logic
log.Printf("method: %s, latency: %s\n", method, time.Now().Sub(s))
return cs, err
}
func main() {
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithStreamInterceptor(orderStreamClientInterceptor),
)
if err != nil {
panic(err)
}
c := pb.NewOrderManagementClient(conn)
// ...
}
如何記錄或者修改流攔截器的請求響應數據?
和服務端stream interceptor
同樣的道理,通過包裝ClientStream
即可做到
// SendMsg method call.
type wrappedStream struct {
method string
grpc.ClientStream
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
err := w.ClientStream.RecvMsg(m)
log.Printf("method: %s, res: %s\n", w.method, m)
return err
}
func (w *wrappedStream) SendMsg(m interface{}) error {
err := w.ClientStream.SendMsg(m)
log.Printf("method: %s, req: %s\n", w.method, m)
return err
}
func newWrappedStream(method string, s grpc.ClientStream) *wrappedStream {
return &wrappedStream{
method,
s,
}
}
func orderStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc,
cc *grpc.ClientConn, method string, streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
// Pre-processing logic
s := time.Now()
cs, err := streamer(ctx, desc, cc, method, opts...)
// Post processing logic
log.Printf("method: %s, latency: %s\n", method, time.Now().Sub(s))
return newWrappedStream(method, cs), err
}
攔截器鏈
服務器只能配置一個 unary interceptor
和 stream interceptor
,否則會報錯,客戶端也是,雖然不會報錯,但是隻有最後一個才起作用。
// 服務端攔截器
s := grpc.NewServer(
grpc.UnaryInterceptor(orderUnaryServerInterceptor),
grpc.StreamInterceptor(orderStreamServerInterceptor),
)
// 客戶端攔截器
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
grpc.WithStreamInterceptor(orderStreamClientInterceptor),
)
如果你想配置多個,可以使用攔截器鏈或者自己實現一個。
// 服務端攔截器
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(
orderUnaryServerInterceptor1,
orderUnaryServerInterceptor2,
),
grpc.ChainStreamInterceptor(
orderServerStreamInterceptor1,
orderServerStreamInterceptor2,
),
)
// 客戶端攔截器
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithChainUnaryInterceptor(
orderUnaryClientInterceptor1,
orderUnaryClientInterceptor2,
),
grpc.WithChainStreamInterceptor(
orderStreamClientInterceptor1,
orderStreamClientInterceptor2,
),
)
生態
除了可以自己實現攔截器外,gRPC 生態也提供了一系列的開源的攔截器可供使用,覆蓋權限、日誌、監控等諸多方面
https://github.com/grpc-ecosystem/go-grpc-middleware
Auth
grpc_auth
- a customizable (viaAuthFunc
) piece of auth middleware
Logging
-
grpc_ctxtags
- a library that adds aTag
map to context, with data populated from request body -
grpc_zap
- integration of zap logging library into gRPC handlers. -
grpc_logrus
- integration of logrus logging library into gRPC handlers. -
grpc_kit
- integration of go-kit/log logging library into gRPC handlers. -
grpc_grpc_logsettable
- a wrapper aroundgrpclog.LoggerV2
that allows to replace loggers in runtime (thread-safe).
Monitoring
-
grpc_prometheus
⚡ - Prometheus client-side and server-side monitoring middleware -
otgrpc
⚡ - OpenTracing client-side and server-side interceptors -
grpc_opentracing
- OpenTracing client-side and server-side interceptors with support for streaming and handler-returned tags
Client
grpc_retry
- a generic gRPC response code retry mechanism, client-side middleware
Server
-
grpc_validator
- codegen inbound message validation from.proto
options -
grpc_recovery
- turn panics into gRPC errors -
ratelimit
- grpc rate limiting by your own limiter
示例代碼
https://github.com/liangwt/grpc-example/tree/main/06-interceptors
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/nsMcLpCe5WdJ5CCSlIh1uQ