Go 語言,gRPC 的使用瞭解 -- 下
書接上文,我們繼續實現剩餘的兩種方式 -- 客戶端流式 RPC、雙向流式 RPC。
Client-side streaming RPC:客戶端流式 RPC、
客戶端流式 RPC,單向流,客戶端通過流式發起多次 RPC 請求到服務端,服務端發起一次響應給客戶端
Proto :
syntax = "proto3";
package proto;
message String {
string value = 1;
}
service HelloService {
rpc Hello (stream String) returns (String){};
}
server:
package main
import (
"google.golang.org/grpc"
"io"
"log"
"net"
pb "rpc/proto" // 設置引用別名
)
// HelloServiceImpl 定義我們的服務
type HelloServiceImpl struct{}
//實現Hello方法
func (p *HelloServiceImpl) Hello(stream pb.HelloService_HelloServer) error {
for {
resp, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.String{Value:"say.hello"})
}
if err != nil {
return err
}
log.Printf("resp: %v", resp)
}
return nil
}
func main() {
// 新建gRPC服務器實例
grpcServer := grpc.NewServer()
// 在gRPC服務器註冊我們的服務
pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
log.Println(" net.Listing...")
//用服務器 Serve() 方法以及我們的端口信息區實現阻塞等待,直到進程被殺死或者 Stop() 被調用
err = grpcServer.Serve(lis)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}
如上,我們對每一個 Recv 都進行了處理,當發現 io.EOF (流關閉) 後,需要通過 stream.SendAndClose 方法將最終的響應結果發送給客戶端,同時關閉正在另外一側等待的 Recv。
client:
package main
import (
"context"
"google.golang.org/grpc"
"log"
pb "rpc/proto" // 設置引用別名
)
// SayHello 調用服務端的 Hello 方法
func SayHello(client pb.HelloServiceClient, r *pb.String) error {
stream, _ := client.Hello(context.Background())
for n := 0; n < 6; n++ {
_ = stream.Send(r)
}
resp, _ := stream.CloseAndRecv()
log.Printf("resp err: %v", resp)
return nil
}
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal("dialing err:", err)
}
defer conn.Close()
// 建立gRPC連接
client := pb.NewHelloServiceClient(conn)
// 創建發送結構體
req := pb.String{
Value: "stream server grpc ",
}
SayHello(client, &req)
}
在 Server 端的 stream.CloseAndRecv,與 Client 端 stream.SendAndClose 是配套使用的方法。
開啓服務器端,開啓客戶端。執行結果如下:
$ go run server.go
2021/11/17 13:26:34 net.Listing...
2021/11/17 13:26:44 resp: value:"stream server grpc "
2021/11/17 13:26:44 resp: value:"stream server grpc "
2021/11/17 13:26:44 resp: value:"stream server grpc "
2021/11/17 13:26:44 resp: value:"stream server grpc "
2021/11/17 13:26:44 resp: value:"stream server grpc "
2021/11/17 13:26:44 resp: value:"stream server grpc "
$ go run client.go
2021/11/17 13:26:44 resp err: value:"say.hello"
Bidirectional streaming RPC:雙向流式 RPC
雙向流式 RPC,由客戶端以流式的方式發起請求,服務端也以流式的方式響應請求。
首個請求一定是 Client 發起,但具體交互方式(誰先誰後、一次發多少、響應多少、什麼時候關閉)根據程序編寫的方式來確定(可以結合協程)。
Proto :
syntax = "proto3";
package proto;
message String {
string value = 1;
}
service HelloService {
rpc Hello (stream String) returns (stream String){};
}
server:
package main
import (
"google.golang.org/grpc"
"io"
"log"
"net"
pb "rpc/proto" // 設置引用別名
)
// HelloServiceImpl 定義我們的服務
type HelloServiceImpl struct{}
//實現Hello方法
func (p *HelloServiceImpl) Hello(stream pb.HelloService_HelloServer) error {
for {
_ = stream.Send(&pb.String{Value: "say.hello"})
resp, err := stream.Recv()
//接收完了返回
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("resp: %v", resp)
}
}
func main() {
// 新建gRPC服務器實例
grpcServer := grpc.NewServer()
// 在gRPC服務器註冊我們的服務
pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
log.Println(" net.Listing...")
err = grpcServer.Serve(lis)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}
client:
package main
import (
"context"
"google.golang.org/grpc"
"io"
"log"
pb "rpc/proto" // 設置引用別名
)
// SayHello 調用服務端的 Hello 方法
func SayHello(client pb.HelloServiceClient, r *pb.String) error {
stream, _ := client.Hello(context.Background())
for n := 0; n <= 3; n++ {
_ = stream.Send(r)
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp err: %v", resp)
}
_ = stream.CloseSend()
return nil
}
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal("dialing err:", err)
}
defer conn.Close()
// 建立gRPC連接
client := pb.NewHelloServiceClient(conn)
// 創建發送結構體
req := pb.String{
Value: "stream server grpc ",
}
SayHello(client, &req)
}
服務端在循環中接收客戶端發來的數據,如果遇到 io.EOF 表示客戶端流被關閉,如果函數退出表示服 務端流關閉。生成返回的數據通過流發送給客戶端,雙向流數據的發送和接收都是完全獨立的行爲。需 要注意的是,發送和接收的操作並不需要一一對應,用戶可以根據真實場景進行組織代碼。
開啓服務器端,開啓客戶端。執行結果如下:
$ go run server.go
2021/11/17 15:46:10 net.Listing...
2021/11/17 15:46:19 resp: value:"stream server grpc "
2021/11/17 15:46:19 resp: value:"stream server grpc "
2021/11/17 15:46:19 resp: value:"stream server grpc "
2021/11/17 15:46:19 resp: value:"stream server grpc "
$ go run client.go
2021/11/17 15:46:19 resp err: value:"say.hello"
2021/11/17 15:46:19 resp err: value:"say.hello"
2021/11/17 15:46:19 resp err: value:"say.hello"
2021/11/17 15:46:19 resp err: value:"say.hello"
參考資料:
https://golang2.eddycjy.com/posts/ch3/03-simple-grpc/
https://www.cnblogs.com/FireworksEasyCool/p/12693749.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Z0NJJzMxcRZ1zKurAMyEBg