微服務基本功:啥是 Oneshot?
Oneshot 在英文中是 “一擊” 或“一槍”的意思, 形容詞的意思是“一次性的”,名詞的意思是“一次性用品”,在 Rust 生態圈中嚐嚐用作一次性使用的併發原語,在 Go 生態圈中很少使用。
這次我把Oneshot
的概念引入到微服務框架裏面,用來指代客戶端對服務器一次性的調用,不需要等待服務器的返回。
爲啥要引入 Oneshot
?
在微服務框架中,客戶端調用服務端的時候,一般都是需要等待服務端的返回的,這樣才能保證調用的結果是正確的,但是有時候,我們並不關心服務端的返回,比如,我們只需要把客戶端的當前日誌上傳都服務器,傳失敗也沒關係,服務器沒有正確保存也無所謂,因爲下一秒可能新的日誌就會上傳了,客戶端丟一次兩次數據不會影響業務,在這種情況下,我們就可以使用 Oneshot,這樣可以減少客戶端的等待時間,提高客戶端的吞吐量。
比如在我們的監控系統中,容許客戶端丟失一些數據,因爲我們的監控系統是基於時間序列的,如果客戶端丟失了一個點數據,我們也不太關心,因爲我們關注的是監控的趨勢,這要保證一段時間內監控曲線能夠畫出來,看到變化趨勢就好。
在 rpcx 的實現中,它的協議已經支持Oneway
的方式了,它的實際功能就是Oneshot
的功能,只不過當時我還接觸到Oneshot
的概念,借鑑的是微博 Motan 架構的Oneway
的概念。但是在 rpcx 的客戶端中,沒有很好的專門爲Oneshot
設計的接口,你可能需要傳入給XClient.Go
一個空的 reply, 隱式的表明這次調用Oneway
方式。這一次我需要在工作中明確要使用Oneshot
的功能,所以我在 rpcx 的客戶端中加入了Oneshot
的支持。
其實修改起來也特別的簡單,藉助於 rpcx 靈活的架構和Oneway
方式的已經支持,我們可以很容易的實現Oneshot
的功能。
首先我們爲XClient
接口增加一個Oneshot
方法的定義:
type XClient interface {
// 新增加的Oneshot方法
Oneshot(ctx context.Context, serviceMethod string, args interface{}) error
Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Inform(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) ([]Receipt, error)
SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
SendFile(ctx context.Context, fileName string, rateInBytesPerSecond int64, meta map[string]string) error
DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer, meta map[string]string) error
Stream(ctx context.Context, meta map[string]string) (net.Conn, error)
Close() error
}
Oneshot
方法類似Call
方法,你需要傳入上下文ctx
, 要調用的方法名serviceMethod
和參數args
,但是不需要傳入reply
參數,因爲Oneshot
不需要等待服務器的返回。如果發送失敗,會返回一個 error。
然後我們在XClient
的實現類xClient
中實現Oneshot
方法:
// Oneshot invokes the named function, ** DOEST NOT ** wait for it to complete, and returns immediately.
func (c *xClient) Oneshot(ctx context.Context, serviceMethod string, args interface{}) error {
_, err := c.Go(ctx, serviceMethod, args, nil, nil)
return err
}
其實就是一行,利用Go
方法就實現了Oneshot
。Oneshot
調用Go
方法是,傳入的reply
的值爲 nil, rpcx 就聰明的知道客戶端不需要返回值,然後把消息標記成Oneway
模式,不需要等待服務器的返回,所以這個方法也不需要返回的*Call
, 一切都不需要。
一個例子
既然實現了一個新的功能, 那麼我們一般的操作就是會在項目 rpcx-examples[1] 中增加這個功能的例子。
服務
服務器你可以實現一個Onershot
的服務器,也就是不需要設置Reply
, 你也可以設置Reply
,但是服務器端不會把這個Reply
返回給客戶端,而是直接丟棄了。
這裏我們實現一個既支持Oneshot
又支持普通調用的服務器,它的功能是計算兩個數的乘積,給客戶端返回結果還是不返回結果,依賴客戶端的調用方式。
這個例子也是我們衆多的例子中同樣的一個服務端:
package main
import (
"context"
"flag"
"fmt"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
// 乘法服務
type Arith struct{}
func (t *Arith) Mul(ctx context.Context, args example.Args, reply *example.Reply) error {
reply.C = args.A * args.B
fmt.Println("C=", reply.C)
return nil
}
func main() {
flag.Parse()
// 創建一個新的rpcx服務
s := server.NewServer()
// 註冊乘法服務
s.RegisterName("Arith", new(Arith), "")
// 開始服務
err := s.Serve("tcp", *addr)
if err != nil {
panic(err)
}
}
客戶端
客戶端的代碼也很簡單,我們可以實現一個Oneshot
的客戶端, 它把兩個乘數扔給服務器,不管服務器的計算和計算結果。
package main
import (
"context"
"flag"
"log"
"time"
"github.com/smallnest/rpcx/protocol"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
// 服務地址
addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
flag.Parse()
// 不需要註冊中心,直連模式
d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
opt := client.DefaultOption
opt.SerializeType = protocol.JSON
// 創建一個客戶端
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt)
defer xclient.Close()
args := example.Args{
A: 10,
B: 20,
}
// 一直髮送乘法問題給服務器
for {
err := xclient.Oneshot(context.Background(), "Mul", args)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("send the problem %d * %d to server", args.A, args.B)
time.Sleep(time.Second)
}
}
參考資料
[1]
rpcx-examples: https://github.com/rpcxio/rpcx-examples
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/1KPQuHMJ-IiI7Vv_xfWh3w