RabbitMQ Go 客戶端教程 6——RPC
本文翻譯自 RabbitMQ 官網的 Go 語言客戶端系列教程 [1],共分爲六篇,本文是第六篇——RPC。
這些教程涵蓋了使用 RabbitMQ 創建消息傳遞應用程序的基礎知識。你需要安裝 RabbitMQ 服務器才能完成這些教程,請參閱安裝指南 [2] 或使用 Docker 鏡像 [3]。這些教程的代碼是開源 [4] 的,官方網站 [5] 也是如此。
先決條件
本教程假設 RabbitMQ 已安裝並運行在本機上的標準端口(5672)。如果你使用不同的主機、端口或憑據,則需要調整連接設置。
遠程過程調用(RPC)
(使用 Go RabbitMQ 客戶端)
在第二個教程中,我們學習瞭如何使用工作隊列在多個 worker 之間分配耗時的任務。
但是,如果我們需要在遠程計算機上運行函數並等待結果怎麼辦?好吧,那是一個不同的故事。這種模式通常稱爲_遠程過程調用_或 RPC。
在本教程中,我們將使用 RabbitMQ 構建一個 RPC 系統:客戶端和可伸縮 RPC 服務器。由於我們沒有值得分配的耗時任務,因此我們將創建一個虛擬 RPC 服務,該服務返回斐波那契數。
有關 RPC 的說明
儘管 RPC 是計算中非常常見的模式,但它經常受到批評。
當程序員不知道函數調用是本地的還是緩慢的 RPC 時,就會出現問題。這樣的混亂會導致系統變幻莫測,並給調試增加了不必要的複雜性。濫用 RPC 可能會導致無法維護的意大利麪條式代碼而不是簡化軟件,
牢記這一點,請考慮以下建議:
確定哪個函數調用是本地的,哪個是遠程的。
爲你的系統編寫文檔。明確組件之間的依賴關係。
處理錯誤情況。當 RPC 服務器長時間關閉時,客戶端應如何處理?
回調隊列
通常,通過 RabbitMQ 進行 RPC 很容易。客戶端發送請求消息,服務器發送響應消息。爲了接收響應,我們需要發送帶有 “回調” 隊列地址的請求。我們可以使用默認隊列。讓我們嘗試一下:
q, err := ch.QueueDeclare(
"", // 不指定隊列名,默認使用隨機生成的隊列名
false, // durable
false, // delete when unused
true, // exclusive
false, // noWait
nil, // arguments
)
err = ch.Publish(
"", // exchange
"rpc_queue", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name, // 在這裏指定callback隊列名,也是在這個隊列等回覆
Body: []byte(strconv.Itoa(n)),
})
消息屬性
AMQP 0-9-1 協議預定義了消息附帶的 14 個屬性集。除以下屬性外,大多數屬性很少使用:
persistent
:將消息標記爲持久性(值爲true
)或瞬態(false
)。你可能還記得第二個教程中的此屬性。
content_type
:用於描述編碼的 mime 類型。例如,對於經常使用的 JSON 編碼,將此屬性設置爲application/ json
是一個好習慣。
reply_to
:常用於命名回調隊列
correlation_id
:有助於將 RPC 響應與請求相關聯
關聯 ID(Correlation Id)
在上面介紹的方法中,我們建議爲每個 RPC 請求創建一個回調隊列。這是相當低效的,但是幸運的是,有一種更好的方法——讓我們爲每個客戶端創建一個回調隊列。
這就引發了一個新問題,在該隊列中收到響應後,尚不清楚響應屬於哪個請求。這個時候就該使用correlation_id
這個屬性了。針對每個請求我們將爲其設置一個唯一值。隨後,當我們在回調隊列中收到消息時,我們將查看該屬性,並基於這個屬性將響應與請求進行匹配。如果我們看到未知的correlation_id
值,則可以放心地丟棄該消息——它不屬於我們的請求。
你可能會問,爲什麼我們應該忽略回調隊列中的未知消息,而不是報錯而失敗?這是由於服務器端可能出現競爭狀況。儘管可能性不大,但 RPC 服務器可能會在向我們發送答案之後但在發送請求的確認消息之前死亡。如果發生這種情況,重新啓動的 RPC 服務器將再次處理該請求。這就是爲什麼在客戶端上我們必須妥善處理重複的響應,並且理想情況下 RPC 應該是冪等的。
總結
我們的 RPC 工作流程如下:
-
客戶端啓動時,它將創建一個匿名排他回調隊列。
-
對於 RPC 請求,客戶端發送一條消息,該消息具有兩個屬性:
reply_to
(設置爲回調隊列)和correlation_id
(設置爲每個請求的唯一值)。 -
該請求被髮送到
rpc_queue
隊列。 -
RPC 工作程序(又名:服務器)正在等待該隊列上的請求。當出現請求時,它會完成計算工作並把結果作爲消息使用
replay_to
字段中的隊列發回給客戶端。 -
客戶端等待回調隊列上的數據。出現消息時,它將檢查
correlation_id
屬性。如果它與請求中的值匹配,則將響應返回給應用程序。
完整示例
斐波那契函數:
func fib(n int) int {
if n == 0 {
return 0
} else if n == 1 {
return 1
} else {
return fib(n-1) + fib(n-2)
}
}
聲明我們的斐波那契函數。它僅假設有效的正整數輸入。(不要指望這種方法適用於大量用戶,它可能是最慢的遞歸實現)。
我們的 RPC 服務器 rpc_server.go[6] 的代碼如下所示:
package main
import (
"log"
"strconv"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func fib(n int) int {
if n == 0 {
return 0
} else if n == 1 {
return 1
} else {
return fib(n-1) + fib(n-2)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"rpc_queue", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
n, err := strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to integer")
log.Printf(" [.] fib(%d)", n)
response := fib(n)
err = ch.Publish(
"", // exchange
d.ReplyTo, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: d.CorrelationId,
Body: []byte(strconv.Itoa(response)),
})
failOnError(err, "Failed to publish a message")
d.Ack(false)
}
}()
log.Printf(" [*] Awaiting RPC requests")
<-forever
}
服務器代碼非常簡單:
-
與往常一樣,我們首先建立連接,通道並聲明隊列。
-
我們可能要運行多個服務器進程。爲了將負載平均分配給多個服務器,我們需要在通道上設置
prefetch
設置。 -
我們使用
Channel.Consume
獲取去隊列,我們從隊列中接收消息。然後,我們進入 goroutine 進行工作,並將響應發送回去。
我們的 RPC 客戶端 rpc_client.go[7] 的代碼:
package main
import (
"log"
"math/rand"
"os"
"strconv"
"strings"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func randomString(l int) string {
bytes := make([]byte, l)
for i := 0; i < l; i++ {
bytes[i] = byte(randInt(65, 90))
}
return string(bytes)
}
func randInt(min int, max int) int {
return min + rand.Intn(max-min)
}
func fibonacciRPC(n int) (res int, err error) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // noWait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
corrId := randomString(32)
err = ch.Publish(
"", // exchange
"rpc_queue", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name,
Body: []byte(strconv.Itoa(n)),
})
failOnError(err, "Failed to publish a message")
for d := range msgs {
if corrId == d.CorrelationId {
res, err = strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to integer")
break
}
}
return
}
func main() {
rand.Seed(time.Now().UTC().UnixNano())
n := bodyFrom(os.Args)
log.Printf(" [x] Requesting fib(%d)", n)
res, err := fibonacciRPC(n)
failOnError(err, "Failed to handle RPC request")
log.Printf(" [.] Got %d", res)
}
func bodyFrom(args []string) int {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "30"
} else {
s = strings.Join(args[1:], " ")
}
n, err := strconv.Atoi(s)
failOnError(err, "Failed to convert arg to integer")
return n
}
現在是時候看看 rpc_client.go[8] 和 rpc_server.go[9] 的完整示例源代碼了。
我們的 RPC 服務現已準備就緒。我們可以啓動服務器:
go run rpc_server.go
# => [x] Awaiting RPC requests
要請求斐波那契數,請運行客戶端:
go run rpc_client.go 30
# => [x] Requesting fib(30)
這裏介紹的設計不是 RPC 服務的唯一可能的實現,但是它具有一些重要的優點:
-
如果 RPC 服務器太慢,則可以通過運行另一臺 RPC 服務器來進行擴展。嘗試在新控制檯中運行另一個
rpc_server.go
。 -
在客戶端,RPC 只需要發送和接收一條消息。結果,RPC 客戶端只需要一個網絡往返就可以處理單個 RPC 請求。
我們的代碼仍然非常簡單,並且不會嘗試解決更復雜(但很重要)的問題,例如:
-
如果沒有服務器在運行,客戶端應如何反應?
-
客戶端是否應該爲 RPC 設置某種超時時間?
-
如果服務器發生故障並引發異常,是否應該將其轉發給客戶端?
-
在處理之前防止無效的傳入消息(例如檢查邊界,類型)。
如果要進行實驗,可能會發現管理後臺界面 [10] 對於查看隊列很有用。
參考資料
[1] RabbitMQ 官網的 Go 語言客戶端系列教程: https://www.rabbitmq.com/getstarted.html
[2] 安裝指南: https://www.rabbitmq.com/download.html
[3] Docker 鏡像: https://registry.hub.docker.com//rabbitmq/_
[4] 開源: https://github.com/rabbitmq/rabbitmq-tutorials
[5] 官方網站: https://github.com/rabbitmq/rabbitmq-website
[6] rpc_server.go: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/rpc_server.go
[7] rpc_client.go: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/rpc_client.go
[8] rpc_client.go: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/rpc_client.go
[9] rpc_server.go: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/rpc_server.go
[10] 管理後臺界面: https://www.rabbitmq.com/management.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/4cSfAFo11mWJhZ9txW3B1A