gRPC 入門指南 — 服務端流式 RPC(二)

你好,我是 Seekload!

前言

前一篇文章,我們學習了簡單模式 RPC(Simple RPC),gRPC 服務端和客戶端在通信時始終只有一個請求和一個響應。實際的應用場景中,我們會遇到需要不斷傳輸數據的時候,比如當數據量很大時。這個時候我們就可以使用流式 RPC,可以實現邊傳輸、邊處理數據。這篇文章先介紹下服務端流式 RPC:客戶端一次請求,服務端通過 stream 的方式響應多條數據,如下圖所示:

創建並編譯 proto 文件

新建文件 server_stream.proto

syntax = "proto3";

package proto;

// 定義發送請求信息
message SimpleRequest{
  // 參數類型 參數名稱 標識號
  string data = 1;
}

// 定義流式響應信息
message StreamResponse{
  int32 code = 1;
  string value = 2;
}

// 定義我們的服務(可以定義多個服務,每個服務可以定義多個接口)
service StreamService{
  // 服務端流式RPC,需要在響應數據前加stream
  rpc List(SimpleRequest) returns (stream StreamResponse){};
}

服務端流式 RPC,定義方法時需要在返回值之前加上 stream。

進入 server_stream.proto 所在的目錄,使用如下命令編譯文件

protoc --go_out=plugins=grpc:. server_stream.proto

執行完成之後會生成 server_stream.pb.go 文件。

創建 server 端

package main

import (
 pb "go-grpc-example/2-server_stream_rpc/proto"
 "google.golang.org/grpc"
 "log"
 "net"
 "time"
)

const (
 Address string = ":8000"
 Network string = "tcp"
)

// 定義我們的服務
type StreamService struct{}

// 實現List方法
func (s *StreamService) List(req *pb.SimpleRequest, srv pb.StreamService_ListServer) error {
 for i := 0; i < 5; i++ {
  // 向流中發送消息,默認每次發送消息大小爲 math.MaxInt32 byte
  err := srv.Send(&pb.StreamResponse{
   Code:  int32(i),
   Value: req.Data,
  })
  if err != nil {
   return err
  }
  time.Sleep(1 * time.Second)
 }
 return nil
}

func main() {
 // 1.監聽端口
 listener, err := net.Listen(Network, Address)
 if err != nil {
  log.Fatalf("listener err: %v", err)
 }
 log.Println(Address + " net.Listing...")
 // 2.實例化gRPC服務端
 // 默認單次接受消息大小爲 1024*1024*4 字節(4M),發送大小爲 math.MaxInt32 字節
 grpcServer := grpc.NewServer()

 // 3.註冊我們實現的服務 StreamService
 pb.RegisterStreamServiceServer(grpcServer, &StreamService{})

 // 4.啓動gRPC服務端
 err = grpcServer.Serve(listener)
 if err != nil {
  log.Fatalf("grpc server err: %v", err)
 }
}

通過追源代碼可以看到,實例化 gRPC 服務端時,默認情況下默認單次接受消息大小爲 1024 * 1024 * 4 字節(4M),發送大小爲 math.MaxInt32 字節。

const (
 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
 defaultServerMaxSendMessageSize    = math.MaxInt32
)

運行服務端:

go run server.go

輸出:
:8000  net listening...

創建 client 端

package main

import (
 "context"
 pb "go-grpc-example/2-server_stream_rpc/proto"
 "google.golang.org/grpc"
 "io"
 "log"
)

const Address string = ":8000"

func main() {
 // 1.連接服務端
 conn, err := grpc.Dial(Address, grpc.WithInsecure())
 if err != nil {
  log.Fatalf("grpc conn err: %v", err)
 }
 defer conn.Close()

 // 2.創建grpc客戶端
 grpcClient := pb.NewStreamServiceClient(conn)

 // 3.調用服務端提供的服務
 req := &pb.SimpleRequest{
  Data: "Hello,Server",
 }
 stream, err := grpcClient.List(context.Background(), req)
 if err != nil {
  log.Fatalf("call server err,%v", err)
 }
 for {
  // 4.處理服務端發送過來的流信息
  resp, err := stream.Recv()
  if err == io.EOF { // 流是否結束
   break
  }
  if err != nil {
   log.Fatalf("client get stream err:%v", err)
  }
  log.Printf("get from stream server,code:%v,value:%v", resp.GetCode(), resp.GetValue())
 }
}

從代碼可以看出,客戶端發送一次請求,之後在 for 循環裏面不停地從服務端接受信息,只到服務端結束髮送。

運行客戶端:

go run client.go

get from stream server,code:0,value:Hello,Server
get from stream server,code:1,value:Hello,Server
get from stream server,code:2,value:Hello,Server
get from stream server,code:3,value:Hello,Server
get from stream server,code:4,value:Hello,Server

總結

這篇文章主要介紹了服務端流式 RPC 的簡單使用,該模式下客戶端發送一次請求,服務端會發送多次響應數據。

下篇文章我們介紹下客戶端流式 RPC。

對了,看完文章,記得點擊下方的卡片。關注我哦~ 👇👇👇

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/f2-yA_Q9wPOClndKEwakMw