RabbitMQ Go 客戶端教程 6——RPC

本文翻譯自 RabbitMQ 官網的 Go 語言客戶端系列教程 [1],共分爲六篇,本文是第六篇——RPC。

這些教程涵蓋了使用 RabbitMQ 創建消息傳遞應用程序的基礎知識。你需要安裝 RabbitMQ 服務器才能完成這些教程,請參閱安裝指南 [2] 或使用 Docker 鏡像 [3]。這些教程的代碼是開源 [4] 的,官方網站 [5] 也是如此。

先決條件

本教程假設 RabbitMQ 已安裝並運行在本機上的標準端口(5672)。如果你使用不同的主機、端口或憑據,則需要調整連接設置。

遠程過程調用(RPC)

(使用 Go RabbitMQ 客戶端)

在第二個教程中,我們學習瞭如何使用工作隊列在多個 worker 之間分配耗時的任務。

但是,如果我們需要在遠程計算機上運行函數並等待結果怎麼辦?好吧,那是一個不同的故事。這種模式通常稱爲_遠程過程調用_或 RPC

在本教程中,我們將使用 RabbitMQ 構建一個 RPC 系統:客戶端和可伸縮 RPC 服務器。由於我們沒有值得分配的耗時任務,因此我們將創建一個虛擬 RPC 服務,該服務返回斐波那契數。

有關 RPC 的說明

儘管 RPC 是計算中非常常見的模式,但它經常受到批評。

當程序員不知道函數調用是本地的還是緩慢的 RPC 時,就會出現問題。這樣的混亂會導致系統變幻莫測,並給調試增加了不必要的複雜性。濫用 RPC 可能會導致無法維護的意大利麪條式代碼而不是簡化軟件,

牢記這一點,請考慮以下建議:

  • 確定哪個函數調用是本地的,哪個是遠程的。

  • 爲你的系統編寫文檔。明確組件之間的依賴關係。

  • 處理錯誤情況。當 RPC 服務器長時間關閉時,客戶端應如何處理?

回調隊列

通常,通過 RabbitMQ 進行 RPC 很容易。客戶端發送請求消息,服務器發送響應消息。爲了接收響應,我們需要發送帶有 “回調” 隊列地址的請求。我們可以使用默認隊列。讓我們嘗試一下:

q, err := ch.QueueDeclare(
  "",    // 不指定隊列名,默認使用隨機生成的隊列名
  false, // durable
  false, // delete when unused
  true,  // exclusive
  false, // noWait
  nil,   // arguments
)

err = ch.Publish(
  "",          // exchange
  "rpc_queue", // routing key
  false,       // mandatory
  false,       // immediate
  amqp.Publishing{
    ContentType:   "text/plain",
    CorrelationId: corrId,
    ReplyTo:       q.Name,  // 在這裏指定callback隊列名,也是在這個隊列等回覆
    Body:          []byte(strconv.Itoa(n)),
})

消息屬性

AMQP 0-9-1 協議預定義了消息附帶的 14 個屬性集。除以下屬性外,大多數屬性很少使用:

  • persistent:將消息標記爲持久性(值爲true)或瞬態(false)。你可能還記得第二個教程中的此屬性。

  • content_type:用於描述編碼的 mime 類型。例如,對於經常使用的 JSON 編碼,將此屬性設置爲application/ json是一個好習慣。

  • reply_to:常用於命名回調隊列

  • correlation_id:有助於將 RPC 響應與請求相關聯

關聯 ID(Correlation Id)

在上面介紹的方法中,我們建議爲每個 RPC 請求創建一個回調隊列。這是相當低效的,但是幸運的是,有一種更好的方法——讓我們爲每個客戶端創建一個回調隊列。

這就引發了一個新問題,在該隊列中收到響應後,尚不清楚響應屬於哪個請求。這個時候就該使用correlation_id這個屬性了。針對每個請求我們將爲其設置一個唯一值。隨後,當我們在回調隊列中收到消息時,我們將查看該屬性,並基於這個屬性將響應與請求進行匹配。如果我們看到未知的correlation_id值,則可以放心地丟棄該消息——它不屬於我們的請求。

你可能會問,爲什麼我們應該忽略回調隊列中的未知消息,而不是報錯而失敗?這是由於服務器端可能出現競爭狀況。儘管可能性不大,但 RPC 服務器可能會在向我們發送答案之後但在發送請求的確認消息之前死亡。如果發生這種情況,重新啓動的 RPC 服務器將再次處理該請求。這就是爲什麼在客戶端上我們必須妥善處理重複的響應,並且理想情況下 RPC 應該是冪等的。

總結

我們的 RPC 工作流程如下:

完整示例

斐波那契函數:

func fib(n int) int {
        if n == 0 {
                return 0
        } else if n == 1 {
                return 1
        } else {
                return fib(n-1) + fib(n-2)
        }
}

聲明我們的斐波那契函數。它僅假設有效的正整數輸入。(不要指望這種方法適用於大量用戶,它可能是最慢的遞歸實現)。

我們的 RPC 服務器 rpc_server.go[6] 的代碼如下所示:

package main

import (
        "log"
        "strconv"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func fib(n int) int {
        if n == 0 {
                return 0
        } else if n == 1 {
                return 1
        } else {
                return fib(n-1) + fib(n-2)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "rpc_queue", // name
                false,       // durable
                false,       // delete when unused
                false,       // exclusive
                false,       // no-wait
                nil,         // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        n, err := strconv.Atoi(string(d.Body))
                        failOnError(err, "Failed to convert body to integer")

                        log.Printf(" [.] fib(%d)", n)
                        response := fib(n)

                        err = ch.Publish(
                                "",        // exchange
                                d.ReplyTo, // routing key
                                false,     // mandatory
                                false,     // immediate
                                amqp.Publishing{
                                        ContentType:   "text/plain",
                                        CorrelationId: d.CorrelationId,
                                        Body:          []byte(strconv.Itoa(response)),
                                })
                        failOnError(err, "Failed to publish a message")

                        d.Ack(false)
                }
        }()

        log.Printf(" [*] Awaiting RPC requests")
        <-forever
}

服務器代碼非常簡單:

我們的 RPC 客戶端 rpc_client.go[7] 的代碼:

package main

import (
        "log"
        "math/rand"
        "os"
        "strconv"
        "strings"
        "time"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func randomString(l int) string {
        bytes := make([]byte, l)
        for i := 0; i < l; i++ {
                bytes[i] = byte(randInt(65, 90))
        }
        return string(bytes)
}

func randInt(min int, max int) int {
        return min + rand.Intn(max-min)
}

func fibonacciRPC(n int) (res int, err error) {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when unused
                true,  // exclusive
                false, // noWait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        corrId := randomString(32)

        err = ch.Publish(
                "",          // exchange
                "rpc_queue", // routing key
                false,       // mandatory
                false,       // immediate
                amqp.Publishing{
                        ContentType:   "text/plain",
                        CorrelationId: corrId,
                        ReplyTo:       q.Name,
                        Body:          []byte(strconv.Itoa(n)),
                })
        failOnError(err, "Failed to publish a message")

        for d := range msgs {
                if corrId == d.CorrelationId {
                        res, err = strconv.Atoi(string(d.Body))
                        failOnError(err, "Failed to convert body to integer")
                        break
                }
        }

        return
}

func main() {
        rand.Seed(time.Now().UTC().UnixNano())

        n := bodyFrom(os.Args)

        log.Printf(" [x] Requesting fib(%d)", n)
        res, err := fibonacciRPC(n)
        failOnError(err, "Failed to handle RPC request")

        log.Printf(" [.] Got %d", res)
}

func bodyFrom(args []string) int {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "30"
        } else {
                s = strings.Join(args[1:], " ")
        }
        n, err := strconv.Atoi(s)
        failOnError(err, "Failed to convert arg to integer")
        return n
}

現在是時候看看 rpc_client.go[8] 和 rpc_server.go[9] 的完整示例源代碼了。

我們的 RPC 服務現已準備就緒。我們可以啓動服務器:

go run rpc_server.go
# => [x] Awaiting RPC requests

要請求斐波那契數,請運行客戶端:

go run rpc_client.go 30
# => [x] Requesting fib(30)

這裏介紹的設計不是 RPC 服務的唯一可能的實現,但是它具有一些重要的優點:

我們的代碼仍然非常簡單,並且不會嘗試解決更復雜(但很重要)的問題,例如:

如果要進行實驗,可能會發現管理後臺界面 [10] 對於查看隊列很有用。

參考資料

[1] RabbitMQ 官網的 Go 語言客戶端系列教程: https://www.rabbitmq.com/getstarted.html

[2] 安裝指南: https://www.rabbitmq.com/download.html

[3] Docker 鏡像: https://registry.hub.docker.com//rabbitmq/_

[4] 開源: https://github.com/rabbitmq/rabbitmq-tutorials

[5] 官方網站: https://github.com/rabbitmq/rabbitmq-website

[6] rpc_server.go: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/rpc_server.go

[7] rpc_client.go: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/rpc_client.go

[8] rpc_client.go: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/rpc_client.go

[9] rpc_server.go: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/rpc_server.go

[10] 管理後臺界面: https://www.rabbitmq.com/management.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/4cSfAFo11mWJhZ9txW3B1A