微服務基本功:啥是 Oneshot?

Oneshot 在英文中是 “一擊” 或“一槍”的意思, 形容詞的意思是“一次性的”,名詞的意思是“一次性用品”,在 Rust 生態圈中嚐嚐用作一次性使用的併發原語,在 Go 生態圈中很少使用。

這次我把Oneshot的概念引入到微服務框架裏面,用來指代客戶端對服務器一次性的調用,不需要等待服務器的返回。

爲啥要引入 Oneshot ?

在微服務框架中,客戶端調用服務端的時候,一般都是需要等待服務端的返回的,這樣才能保證調用的結果是正確的,但是有時候,我們並不關心服務端的返回,比如,我們只需要把客戶端的當前日誌上傳都服務器,傳失敗也沒關係,服務器沒有正確保存也無所謂,因爲下一秒可能新的日誌就會上傳了,客戶端丟一次兩次數據不會影響業務,在這種情況下,我們就可以使用 Oneshot,這樣可以減少客戶端的等待時間,提高客戶端的吞吐量。

比如在我們的監控系統中,容許客戶端丟失一些數據,因爲我們的監控系統是基於時間序列的,如果客戶端丟失了一個點數據,我們也不太關心,因爲我們關注的是監控的趨勢,這要保證一段時間內監控曲線能夠畫出來,看到變化趨勢就好。

在 rpcx 的實現中,它的協議已經支持Oneway的方式了,它的實際功能就是Oneshot的功能,只不過當時我還接觸到Oneshot的概念,借鑑的是微博 Motan 架構的Oneway的概念。但是在 rpcx 的客戶端中,沒有很好的專門爲Oneshot設計的接口,你可能需要傳入給XClient.Go一個空的 reply, 隱式的表明這次調用Oneway方式。這一次我需要在工作中明確要使用Oneshot的功能,所以我在 rpcx 的客戶端中加入了Oneshot的支持。

其實修改起來也特別的簡單,藉助於 rpcx 靈活的架構和Oneway方式的已經支持,我們可以很容易的實現Oneshot的功能。

首先我們爲XClient接口增加一個Oneshot方法的定義:

type XClient interface {
 // 新增加的Oneshot方法
 Oneshot(ctx context.Context, serviceMethod string, args interface{}) error

 Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}done chan *Call) (*Call, error)
 Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
 Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
 Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
 Inform(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) ([]Receipt, error)
 SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
 SendFile(ctx context.Context, fileName string, rateInBytesPerSecond int64, meta map[string]string) error
 DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer, meta map[string]string) error
 Stream(ctx context.Context, meta map[string]string) (net.Conn, error)
 Close() error
}

Oneshot方法類似Call方法,你需要傳入上下文ctx, 要調用的方法名serviceMethod和參數args,但是不需要傳入reply參數,因爲Oneshot不需要等待服務器的返回。如果發送失敗,會返回一個 error。

然後我們在XClient的實現類xClient中實現Oneshot方法:

// Oneshot invokes the named function, ** DOEST NOT ** wait for it to complete, and returns immediately.
func (c *xClient) Oneshot(ctx context.Context, serviceMethod string, args interface{}) error {
 _, err := c.Go(ctx, serviceMethod, args, nil, nil)

 return err
}

其實就是一行,利用Go方法就實現了OneshotOneshot調用Go方法是,傳入的reply的值爲 nil, rpcx 就聰明的知道客戶端不需要返回值,然後把消息標記成Oneway模式,不需要等待服務器的返回,所以這個方法也不需要返回的*Call, 一切都不需要。

一個例子

既然實現了一個新的功能, 那麼我們一般的操作就是會在項目 rpcx-examples[1] 中增加這個功能的例子。

服務

服務器你可以實現一個Onershot的服務器,也就是不需要設置Reply, 你也可以設置Reply,但是服務器端不會把這個Reply返回給客戶端,而是直接丟棄了。

這裏我們實現一個既支持Oneshot又支持普通調用的服務器,它的功能是計算兩個數的乘積,給客戶端返回結果還是不返回結果,依賴客戶端的調用方式。

這個例子也是我們衆多的例子中同樣的一個服務端:

package main

import (
 "context"
 "flag"
 "fmt"

 example "github.com/rpcxio/rpcx-examples"
 "github.com/smallnest/rpcx/server"
)

var (
 addr = flag.String("addr""localhost:8972""server address")
)

// 乘法服務
type Arith struct{}

func (t *Arith) Mul(ctx context.Context, args example.Args, reply *example.Reply) error {
 reply.C = args.A * args.B
 fmt.Println("C=", reply.C)
 return nil
}

func main() {
 flag.Parse()

    // 創建一個新的rpcx服務
 s := server.NewServer()
    // 註冊乘法服務
 s.RegisterName("Arith", new(Arith)"")
    // 開始服務
 err := s.Serve("tcp", *addr)
 if err != nil {
  panic(err)
 }
}

客戶端

客戶端的代碼也很簡單,我們可以實現一個Oneshot的客戶端, 它把兩個乘數扔給服務器,不管服務器的計算和計算結果。

package main

import (
 "context"
 "flag"
 "log"
 "time"

 "github.com/smallnest/rpcx/protocol"

 example "github.com/rpcxio/rpcx-examples"
 "github.com/smallnest/rpcx/client"
)

var (
    // 服務地址
 addr = flag.String("addr""localhost:8972""server address")
)

func main() {
 flag.Parse()

    // 不需要註冊中心,直連模式
 d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
 opt := client.DefaultOption
 opt.SerializeType = protocol.JSON

    // 創建一個客戶端
 xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt)
 defer xclient.Close()

 args := example.Args{
  A: 10,
  B: 20,
 }

    // 一直髮送乘法問題給服務器
 for {
  err := xclient.Oneshot(context.Background()"Mul", args)
  if err != nil {
   log.Fatalf("failed to call: %v", err)
  }

  log.Printf("send the problem %d * %d to server", args.A, args.B)
  time.Sleep(time.Second)
 }

}

參考資料

[1]

rpcx-examples: https://github.com/rpcxio/rpcx-examples

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/1KPQuHMJ-IiI7Vv_xfWh3w