gRPC 入門指南 — 雙向流式 RPC(四)

你好,我是 Seekload!

前言

前一篇文章我們學習了客戶端流式 RPC,客戶端多次向服務端發送數據,發送結束之後,由服務端返回一個響應。與服務端流式 RPC 類似,都只支持單項連續發送數據,今天我們要來學習雙向流式 RPC 支持通信雙方同時多次發送或接收數據。如下如所示:

新建並編譯 proto 文件

新建 bidirectional_stream.proto 文件:

syntax = "proto3";

package proto;

// 定義流式請求信息
message StreamRequest{
  // 參數類型 參數名稱 標識號
  string data = 1;
}

// 定義流響應信息
message StreamResponse{
  int32 code = 1;
  string value = 2;
}

// 定義我們的服務(可以定義多個服務,每個服務可以定義多個接口)
service StreamService{
  // 雙向流RPC,需要在請求、響應數據前加stream
  rpc Record(stream StreamRequest) returns (stream StreamResponse){};
}

雙向流式 RPC,定義方法時需要在請求值和返回值之前加上 stream。

進入 bidirectional_stream.proto 所在的目錄,使用如下命令編譯文件

protoc --go_out=plugins=grpc:. bidirectional_stream.proto

執行完成之後會生成 bidirectional_stream.pb.go 文件。

創建 server 端

package main

import (
 pb "go-grpc-example/4-bidirectional_stream_rpc/proto"
 "google.golang.org/grpc"
 "io"
 "log"
 "net"
 "strconv"
 "time"
)

const (
 Address string = ":8000"
 Network string = "tcp"
)

// 定義我們的服務
type StreamService struct{}

// 實現 Record() 方法
func (s *StreamService) Record(srv pb.StreamService_RecordServer) error {
 n := 1
 for {
  // 接收數據
  req, err := srv.Recv()
  if err == io.EOF {
   return nil
  }
  if err != nil {
   log.Fatalf("stream get from client err: %v", err)
   return err
  }
  // 發送數據
  err = srv.Send(&pb.StreamResponse{
   Code:  int32(n),
   Value: "This is the " + strconv.Itoa(n) + " message",
  })
  if err != nil {
   log.Fatalf("stream send to client err: %v", err)
   return err
  }
  n++
  log.Println("stream get from client: ", req.Data)
  time.Sleep(1 * time.Second)
 }
 return nil
}

func main() {
 // 1.監聽端口
 listener, err := net.Listen(Network, Address)
 if err != nil {
  log.Fatalf("listener err: %v", err)
 }
 log.Println(Address + " net.Listing...")

 // 2.實例化gRPC實例
 grpcServer := grpc.NewServer()

 // 3.註冊我們的服務
 pb.RegisterStreamServiceServer(grpcServer, &StreamService{})

 // 4.啓動gRPC服務端
 err = grpcServer.Serve(listener)
 if err != nil {
  log.Fatalf("grpc server err: %v", err)
 }
}

在實現的 Record() 方法中,for() 循環裏面讀取客戶端發送的消息並返回一個響應數據。

運行服務端:

go run server.go

輸出:
:8000  net listening...

創建 client 端

package main

import (
 "context"
 pb "go-grpc-example/4-bidirectional_stream_rpc/proto"
 "google.golang.org/grpc"
 "io"
 "log"
 "strconv"
 "time"
)

const Address = ":8000"

func main() {
 // 1.連接服務端
 conn, err := grpc.Dial(Address, grpc.WithInsecure())
 if err != nil {
  log.Fatalf("grpc conn err: %v", err)
 }
 defer conn.Close()

 // 2.創建gRPC客戶端
 grpcClient := pb.NewStreamServiceClient(conn)

 // 3.調用 Record() 方法獲取流
 stream, err := grpcClient.Record(context.Background())
 if err != nil {
  log.Fatalf("call record err: %v", err)
 }

 for i := 0; i < 5; i++ {
  // 4.發送數據
  err := stream.Send(&pb.StreamRequest{
   Data: strconv.Itoa(i),
  })
  if err != nil {
   log.Fatalf("stream send to server err: %v", err)
  }
  // 5.接收服務端發送過來的數據
  resp, err := stream.Recv()
  if err == io.EOF {
   break
  }
  if err != nil {
   log.Fatalf("stream get from server err: %v", err)
  }
  log.Printf("stream get from server,code:%v,value:%v", resp.GetCode(), resp.Value)
  time.Sleep(1 * time.Second)
 }
 // 6.關閉流
 err = stream.CloseSend()
 if err != nil {
  log.Fatalf("close stream err:%v", err)
 }
}

客戶端代碼,在 for() 循環裏面向服務端發送了 5 次消息,並接收服務端返回的數據,5 次數據交互之後調用 CloseSend() 關閉流。

運行客戶端:

go run client.go

客戶端輸出:

stream get from server,code:1,value:This is the 1 message
stream get from server,code:2,value:This is the 2 message
stream get from server,code:3,value:This is the 3 message
stream get from server,code:4,value:This is the 4 message
stream get from server,code:5,value:This is the 5 message

服務端輸出:

stream get from client:  0
stream get from client:  1
stream get from client:  2
stream get from client:  3
stream get from client:  4

觀察仔細的同學會注意到,客戶端和服務端是交替輸出的。

總結

這篇文章我們簡單介紹了 gRPC 的雙向流式 RPC,支持通信雙方同時多次發送或接收數據。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/a5QxxEoyVwq-lnJR8dLlAA