gRPC 入門指南 — 客戶端流式 RPC(三)

你好,我是 Seekload!

前言

前一篇文章我們學習了服務端流式 RPC,客戶端發送一次請求,通過流的方式多次從服務端收到信息。這一節我們來學習下客戶端流式 RPC,該模式與服務端流式 RPC 正好相反,客戶端不斷向服務端發送數據,結束之後,服務端返回一個響應,如下:

新建並編譯 proto 文件

新建 client_stream.proto 文件:

syntax = "proto3";

package proto;

// 定義流式請求信息
message StreamRequest{
  // 參數類型 參數名稱 標識號
  string data = 1;
}

// 定義響應信息
message SimpleResponse{
  int32 code = 1;
  string value = 2;
}

// 定義我們的服務(可以定義多個服務,每個服務可以定義多個接口)
service StreamService{
  // 客戶端流式RPC,需要在請求數據前加stream
  rpc Record(stream StreamRequest) returns (SimpleResponse){};
}

客戶端流式 RPC,定義方法時需要在請求值之前加上 stream。

進入 client_stream.proto 所在的目錄,使用如下命令編譯文件

protoc --go_out=plugins=grpc:. client_stream.proto

執行完成之後會生成 client_stream.pb.go 文件。

創建 server 端

package main

import (
 pb "go-grpc-example/3-client_stream_rpc/proto"
 "google.golang.org/grpc"
 "io"
 "log"
 "net"
)

const (
 Address string = ":8000"
 Network string = "tcp"
)

// 定義我們的服務
type StreamService struct{}

// 實現 Record 方法
func (s *StreamService) Record(srv pb.StreamService_RecordServer) error {
 for {
  // 從流中獲取消息
  req, err := srv.Recv()
  if err == io.EOF {
   // 發送數據並關閉
   return srv.SendAndClose(&pb.SimpleResponse{
    Code:  1,
    Value: "ok",
   })
  }
  if err != nil {
   return err
  }
  log.Printf("get from client:%v", req.Data)
 }
}

func main() {
 // 1.監聽端口
 listener, err := net.Listen(Network, Address)
 if err != nil {
  log.Fatalf("listener err: %v", err)
 }
 log.Println(Address + " net.Listing...")

 // 2.創建gRPC服務端實例
 grpcServer := grpc.NewServer()

 // 3.註冊我們實現的服務 StreamService
 pb.RegisterStreamServiceServer(grpcServer, &StreamService{})

 // 4.啓動gRPC服務端
 err = grpcServer.Serve(listener)
 if err != nil {
  log.Fatalf("grpc server err: %v", err)
 }
}

在實現的 Record() 方法中,可以看到 server 端在 for 循環中不斷從客戶端接收消息,知道接收完畢,服務端返回一個響應。

運行服務端:

go run server.go

輸出:
:8000  net listening...

創建 client 端

package main

import (
 "context"
 pb "go-grpc-example/3-client_stream_rpc/proto"
 "google.golang.org/grpc"
 "log"
 "strconv"
 "time"
)

const Address string = ":8000"

func main() {
 // 1.連接服務端
 conn, err := grpc.Dial(Address, grpc.WithInsecure())
 if err != nil {
  log.Fatalf("grpc conn err: %v", err)
 }
 defer conn.Close()

 // 2.建立gRPC連接
 streamClient := pb.NewStreamServiceClient(conn)

 // 3.調用record,獲取流
 stream, err := streamClient.Record(context.Background())
 if err != nil {
  log.Fatalf("call record err: %v", err)
 }

 for i := 0; i < 5; i++ {
  // 4.向流中發送數據
  err := stream.Send(&pb.StreamRequest{Data: strconv.Itoa(i)})
  if err != nil {
   log.Fatalf("stream request err: %v", err)
  }
  time.Sleep(1 * time.Second)
 }
 // 5.關閉流並獲取返回的消息
 resp, err := stream.CloseAndRecv()
 if err != nil {
  log.Fatalf("client stream close err: %v", err)
 }
 log.Printf("get from server,code:%v,value:%v", resp.GetCode(), resp.GetValue())
}

客戶端代碼,在 for 循環裏面向服務端發送了 5 次消息,接着調用 CloseAndRecv() 關閉流並接收服務端返回的數據。

運行客戶端:

go run client.go

服務端輸出:

get from client:0
get from client:1
get from client:2
get from client:3
get from client:4

服務端輸出之後,客戶端輸出:

get from server,code:1,value:ok

總結

這篇文章主要介紹了客戶端流式 RPC 的簡單使用,該模式下客戶端可以多次向服務端發送數據,數據發送完畢之後,服務端會返回一次響應。下篇文章我們會介紹雙向流式 RPC。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/06IQa335XPXDRa2qoz9mRg