gRPC 入門指南 — 雙向流式 RPC(四)
你好,我是 Seekload!
前言
前一篇文章我們學習了客戶端流式 RPC,客戶端多次向服務端發送數據,發送結束之後,由服務端返回一個響應。與服務端流式 RPC 類似,都只支持單項連續發送數據,今天我們要來學習雙向流式 RPC 支持通信雙方同時多次發送或接收數據。如下如所示:
新建並編譯 proto 文件
新建 bidirectional_stream.proto 文件:
syntax = "proto3";
package proto;
// 定義流式請求信息
message StreamRequest{
// 參數類型 參數名稱 標識號
string data = 1;
}
// 定義流響應信息
message StreamResponse{
int32 code = 1;
string value = 2;
}
// 定義我們的服務(可以定義多個服務,每個服務可以定義多個接口)
service StreamService{
// 雙向流RPC,需要在請求、響應數據前加stream
rpc Record(stream StreamRequest) returns (stream StreamResponse){};
}
雙向流式 RPC,定義方法時需要在請求值和返回值之前加上 stream。
進入 bidirectional_stream.proto 所在的目錄,使用如下命令編譯文件
protoc --go_out=plugins=grpc:. bidirectional_stream.proto
執行完成之後會生成 bidirectional_stream.pb.go 文件。
創建 server 端
package main
import (
pb "go-grpc-example/4-bidirectional_stream_rpc/proto"
"google.golang.org/grpc"
"io"
"log"
"net"
"strconv"
"time"
)
const (
Address string = ":8000"
Network string = "tcp"
)
// 定義我們的服務
type StreamService struct{}
// 實現 Record() 方法
func (s *StreamService) Record(srv pb.StreamService_RecordServer) error {
n := 1
for {
// 接收數據
req, err := srv.Recv()
if err == io.EOF {
return nil
}
if err != nil {
log.Fatalf("stream get from client err: %v", err)
return err
}
// 發送數據
err = srv.Send(&pb.StreamResponse{
Code: int32(n),
Value: "This is the " + strconv.Itoa(n) + " message",
})
if err != nil {
log.Fatalf("stream send to client err: %v", err)
return err
}
n++
log.Println("stream get from client: ", req.Data)
time.Sleep(1 * time.Second)
}
return nil
}
func main() {
// 1.監聽端口
listener, err := net.Listen(Network, Address)
if err != nil {
log.Fatalf("listener err: %v", err)
}
log.Println(Address + " net.Listing...")
// 2.實例化gRPC實例
grpcServer := grpc.NewServer()
// 3.註冊我們的服務
pb.RegisterStreamServiceServer(grpcServer, &StreamService{})
// 4.啓動gRPC服務端
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("grpc server err: %v", err)
}
}
在實現的 Record() 方法中,for() 循環裏面讀取客戶端發送的消息並返回一個響應數據。
運行服務端:
go run server.go
輸出:
:8000 net listening...
創建 client 端
package main
import (
"context"
pb "go-grpc-example/4-bidirectional_stream_rpc/proto"
"google.golang.org/grpc"
"io"
"log"
"strconv"
"time"
)
const Address = ":8000"
func main() {
// 1.連接服務端
conn, err := grpc.Dial(Address, grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc conn err: %v", err)
}
defer conn.Close()
// 2.創建gRPC客戶端
grpcClient := pb.NewStreamServiceClient(conn)
// 3.調用 Record() 方法獲取流
stream, err := grpcClient.Record(context.Background())
if err != nil {
log.Fatalf("call record err: %v", err)
}
for i := 0; i < 5; i++ {
// 4.發送數據
err := stream.Send(&pb.StreamRequest{
Data: strconv.Itoa(i),
})
if err != nil {
log.Fatalf("stream send to server err: %v", err)
}
// 5.接收服務端發送過來的數據
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("stream get from server err: %v", err)
}
log.Printf("stream get from server,code:%v,value:%v", resp.GetCode(), resp.Value)
time.Sleep(1 * time.Second)
}
// 6.關閉流
err = stream.CloseSend()
if err != nil {
log.Fatalf("close stream err:%v", err)
}
}
客戶端代碼,在 for() 循環裏面向服務端發送了 5 次消息,並接收服務端返回的數據,5 次數據交互之後調用 CloseSend() 關閉流。
運行客戶端:
go run client.go
客戶端輸出:
stream get from server,code:1,value:This is the 1 message
stream get from server,code:2,value:This is the 2 message
stream get from server,code:3,value:This is the 3 message
stream get from server,code:4,value:This is the 4 message
stream get from server,code:5,value:This is the 5 message
服務端輸出:
stream get from client: 0
stream get from client: 1
stream get from client: 2
stream get from client: 3
stream get from client: 4
觀察仔細的同學會注意到,客戶端和服務端是交替輸出的。
總結
這篇文章我們簡單介紹了 gRPC 的雙向流式 RPC,支持通信雙方同時多次發送或接收數據。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/a5QxxEoyVwq-lnJR8dLlAA