Go 語言,gRPC 的使用瞭解 -- 下

書接上文,我們繼續實現剩餘的兩種方式 -- 客戶端流式 RPC、雙向流式 RPC。

Client-side streaming RPC:客戶端流式 RPC、

客戶端流式 RPC,單向流,客戶端通過流式發起多次 RPC 請求到服務端,服務端發起一次響應給客戶端

Proto :

syntax = "proto3";

package proto;

message String {
    string value = 1;
}

service HelloService {
    rpc Hello (stream String) returns (String){};
}

server:

package main

import (
 "google.golang.org/grpc"
 "io"
 "log"
 "net"
 pb "rpc/proto" // 設置引用別名
)

// HelloServiceImpl 定義我們的服務
type HelloServiceImpl struct{}

//實現Hello方法
func (p *HelloServiceImpl) Hello(stream pb.HelloService_HelloServer) error {
 for {
  resp, err := stream.Recv()
  if err == io.EOF {
   return stream.SendAndClose(&pb.String{Value:"say.hello"})
  }
  if err != nil {
   return err
  }

  log.Printf("resp: %v", resp)
 }

 return nil
}

func main() {
 // 新建gRPC服務器實例
 grpcServer := grpc.NewServer()
 // 在gRPC服務器註冊我們的服務
 pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))

 lis, err := net.Listen("tcp"":1234")
 if err != nil {
  log.Fatal(err)
 }
 log.Println(" net.Listing...")
 //用服務器 Serve() 方法以及我們的端口信息區實現阻塞等待,直到進程被殺死或者 Stop() 被調用
 err = grpcServer.Serve(lis)
 if err != nil {
  log.Fatalf("grpcServer.Serve err: %v", err)
 }
}

如上,我們對每一個 Recv 都進行了處理,當發現 io.EOF (流關閉) 後,需要通過 stream.SendAndClose 方法將最終的響應結果發送給客戶端,同時關閉正在另外一側等待的 Recv。

client:

package main

import (
 "context"
 "google.golang.org/grpc"
 "log"
 pb "rpc/proto" // 設置引用別名
)

// SayHello 調用服務端的 Hello 方法
func SayHello(client pb.HelloServiceClient, r *pb.String) error {
 stream, _ := client.Hello(context.Background())
 for n := 0; n < 6; n++ {
  _ = stream.Send(r)
 }
 resp, _ := stream.CloseAndRecv()

 log.Printf("resp err: %v", resp)
 return nil
}

func main() {
 conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
 if err != nil {
  log.Fatal("dialing err:", err)
 }
 defer conn.Close()

 // 建立gRPC連接
 client := pb.NewHelloServiceClient(conn)

 // 創建發送結構體
 req := pb.String{
  Value: "stream server grpc ",
 }
 SayHello(client, &req)
}

在 Server 端的 stream.CloseAndRecv,與 Client 端 stream.SendAndClose 是配套使用的方法。

開啓服務器端,開啓客戶端。執行結果如下:

$ go run server.go
2021/11/17 13:26:34  net.Listing...
2021/11/17 13:26:44 resp: value:"stream server grpc "
2021/11/17 13:26:44 resp: value:"stream server grpc "
2021/11/17 13:26:44 resp: value:"stream server grpc "
2021/11/17 13:26:44 resp: value:"stream server grpc "
2021/11/17 13:26:44 resp: value:"stream server grpc "
2021/11/17 13:26:44 resp: value:"stream server grpc "
$ go run client.go
2021/11/17 13:26:44 resp err: value:"say.hello"

Bidirectional streaming RPC:雙向流式 RPC

雙向流式 RPC,由客戶端以流式的方式發起請求,服務端也以流式的方式響應請求。

首個請求一定是 Client 發起,但具體交互方式(誰先誰後、一次發多少、響應多少、什麼時候關閉)根據程序編寫的方式來確定(可以結合協程)。

Proto :

syntax = "proto3";

package proto;

message String {
    string value = 1;
}

service HelloService {
    rpc Hello (stream String) returns (stream String){};
}

server:

package main

import (
 "google.golang.org/grpc"
 "io"
 "log"
 "net"
 pb "rpc/proto" // 設置引用別名
)

// HelloServiceImpl 定義我們的服務
type HelloServiceImpl struct{}

//實現Hello方法
func (p *HelloServiceImpl) Hello(stream pb.HelloService_HelloServer) error {
 for {
  _ = stream.Send(&pb.String{Value: "say.hello"})

  resp, err := stream.Recv()
  //接收完了返回
  if err == io.EOF {
   return nil
  }
  if err != nil {
   return err
  }
  log.Printf("resp: %v", resp)
 }
}

func main() {
 // 新建gRPC服務器實例
 grpcServer := grpc.NewServer()
 // 在gRPC服務器註冊我們的服務
 pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))

 lis, err := net.Listen("tcp"":1234")
 if err != nil {
  log.Fatal(err)
 }
 log.Println(" net.Listing...")
 err = grpcServer.Serve(lis)
 if err != nil {
  log.Fatalf("grpcServer.Serve err: %v", err)
 }
}

client:

package main

import (
 "context"
 "google.golang.org/grpc"
 "io"
 "log"
 pb "rpc/proto" // 設置引用別名
)

// SayHello 調用服務端的 Hello 方法
func SayHello(client pb.HelloServiceClient, r *pb.String) error {
 stream, _ := client.Hello(context.Background())
 for n := 0; n <= 3; n++ {
  _ = stream.Send(r)
  resp, err := stream.Recv()
  if err == io.EOF {
   break
  }
  if err != nil {
   return err
  }

  log.Printf("resp err: %v", resp)
 }

 _ = stream.CloseSend()

 return nil
}


func main() {
 conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
 if err != nil {
  log.Fatal("dialing err:", err)
 }
 defer conn.Close()

 // 建立gRPC連接
 client := pb.NewHelloServiceClient(conn)

 // 創建發送結構體
 req := pb.String{
  Value: "stream server grpc ",
 }
 SayHello(client, &req)
}

服務端在循環中接收客戶端發來的數據,如果遇到 io.EOF 表示客戶端流被關閉,如果函數退出表示服 務端流關閉。生成返回的數據通過流發送給客戶端,雙向流數據的發送和接收都是完全獨立的行爲。需 要注意的是,發送和接收的操作並不需要一一對應,用戶可以根據真實場景進行組織代碼。

開啓服務器端,開啓客戶端。執行結果如下:

$ go run server.go
2021/11/17 15:46:10  net.Listing...
2021/11/17 15:46:19 resp: value:"stream server grpc "
2021/11/17 15:46:19 resp: value:"stream server grpc "
2021/11/17 15:46:19 resp: value:"stream server grpc "
2021/11/17 15:46:19 resp: value:"stream server grpc "
$ go run client.go
2021/11/17 15:46:19 resp err: value:"say.hello"
2021/11/17 15:46:19 resp err: value:"say.hello"
2021/11/17 15:46:19 resp err: value:"say.hello"
2021/11/17 15:46:19 resp err: value:"say.hello"

參考資料:

https://golang2.eddycjy.com/posts/ch3/03-simple-grpc/

https://www.cnblogs.com/FireworksEasyCool/p/12693749.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Z0NJJzMxcRZ1zKurAMyEBg