RabbitMQ Go 客戶端教程 3—發佈 - 訂閱

本文翻譯自 RabbitMQ 官網的 Go 語言客戶端系列教程 [1],共分爲六篇,本文是第三篇——發佈 / 訂閱。

這些教程涵蓋了使用 RabbitMQ 創建消息傳遞應用程序的基礎知識。你需要安裝 RabbitMQ 服務器才能完成這些教程,請參閱安裝指南 [2] 或使用 Docker 鏡像 [3]。這些教程的代碼是開源 [4] 的,官方網站 [5] 也是如此。

先決條件

本教程假設 RabbitMQ 已安裝並運行在本機上的標準端口(5672)。如果你使用不同的主機、端口或憑據,則需要調整連接設置。

發佈 / 訂閱

在上一個教程 [6] 中,我們創建了一個工作隊列。工作隊列背後的假設是每個任務只傳遞給一個工人。在這一部分中,我們將做一些完全不同的事情——我們將向多個消費者傳遞一個消息。這就是所謂的 “訂閱 / 發佈模式”。

爲了說明這種模式,我們將構建一個簡單的日誌系統。它將由兩個程序組成——第一個程序將發出日誌消息,第二個程序將接收並打印它們。

在我們的日誌系統中,每一個運行的接收器程序副本都會收到消息。這樣,我們就可以運行一個接收器並將日誌定向到磁盤;同時,我們還可以運行另一個接收器並在屏幕上查看日誌。

本質上,已發佈的日誌消息將被廣播到所有接收者。

Exchanges(交換器)

在本教程的前面部分中,我們向隊列發送消息和從隊列接收消息。現在是時候在 Rabbit 中引入完整的消息傳遞模型了。

讓我們快速回顧一下先前教程中介紹的內容:

RabbitMQ 消息傳遞模型中的核心思想是生產者從不將任何消息直接發送到隊列。實際上,生產者經常甚至根本不知道是否將消息傳遞到任何隊列。

相反,生產者只能將消息發送到交換器。交換器是非常簡單的東西。一方面,它接收來自生產者的消息,另一方面,將它們推入隊列。交換器必須確切知道如何處理接收到的消息。它應該被附加到特定的隊列嗎?還是應該將其附加到許多隊列中?或者它應該被丟棄。這些規則由交換器的類型定義。

有幾種交換器類型可用:direct, topic, headersfanout。我們將集中討論最後一個——fanout。讓我們創建一個這種類型的交換器,並給它起個名字叫logs

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

fanout(扇出)交換器非常簡單。正如你可能從名稱中猜測的那樣,它只是將接收到的所有消息廣播到它知道的所有隊列中。而這正是我們記錄器所需要的。

交換器清單

要列出服務器上的交換器,你可以執行有用的 rabbitmqctl 命令:

sudo rabbitmqctl list_exchanges

在此列表中,將有一些amq.*交換器和一個默認的(未命名)交換器。這些是默認創建的,但是你現在不太可能需要使用它們。

默認交換器

在本教程的前面部分中,我們還不知道交換器的存在,但仍然能夠將消息發送到隊列。之所以能這樣做,是因爲我們使用的是默認交換器,該交換器由空字符串("")標識。

回想一下我們之前是怎麼發佈消息的:

err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
})

在這裏,我們使用默認或無名稱的交換器:消息將以route_key參數指定的名稱路由到隊列(如果存在)。

現在,我們可以改爲發佈到我們的命名交換器:

err = ch.ExchangeDeclare(
  "logs",   // 使用命名的交換器
  "fanout", // 交換器類型
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
  "logs", // exchange
  "",     // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
          ContentType: "text/plain",
          Body:        []byte(body),
  })

臨時隊列

你可能還記得,先前我們使用的是具有特定名稱的隊列(還記得hellotask_queue嗎?)能夠命名隊列對我們來說至關重要——我們需要將工作人員指向同一個隊列。當你想在生產者和消費者之間共享隊列時,給隊列一個名稱非常重要。

但對於我們的記錄器來說,情況並非如此。我們希望收到所有日誌消息,而不僅僅是它們的一部分。我們也只對當前正在發送的消息感興趣,而對舊消息不感興趣。爲了解決這個問題,我們需要兩件事。

首先,當我們連接到 Rabbit 時,我們需要一個新的、空的隊列。爲此,我們可以創建一個隨機名稱的隊列,或者更好的方法是讓服務器爲我們選擇一個隨機隊列名稱。

其次,一旦我們斷開消費者的連接,隊列就會自動刪除。

在 amqp[7] 客戶端中,當我們傳遞一個空字符串作爲隊列名稱時,我們將使用隨機生成的名稱創建一個非持久隊列:

q, err := ch.QueueDeclare(
  "",    // 空字符串作爲隊列名稱
  false, // 非持久隊列
  false, // delete when unused
  true,  // 獨佔隊列(當前聲明隊列的連接關閉後即被刪除)
  false, // no-wait
  nil,   // arguments
)

上述方法返回時,生成的隊列實例包含 RabbitMQ 生成的隨機隊列名稱。例如,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg

當聲明它的連接關閉時,該隊列將被刪除,因爲它被聲明爲獨佔。

綁定

err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "logs", // exchange
  false,
  nil,
)

列出綁定關係

你猜也猜到了,我們可以使用下面的命令列出綁定關係

rabbitmqctl list_bindings

完整示例

package main

import (
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs", // exchange
                "",     // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

如果沒有隊列綁定到交換器,那麼消息將丟失,但這對我們來說是 ok 的。如果沒有消費者在接收,我們可以安全地丟棄該消息。

package main

import (
        "log"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when unused
                true,  // exclusive
                false, // no-wait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.QueueBind(
                q.Name, // queue name
                "",     // routing key
                "logs", // exchange
                false,
                nil,
        )
        failOnError(err, "Failed to bind a queue")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf(" [x] %s", d.Body)
                }
        }()

        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
}

(receive_logs.go 源碼 [10])

如果要將日誌保存到文件,只需打開控制檯並輸入:

go run receive_logs.go > logs_from_rabbit.log

如果希望在屏幕上查看日誌,請切換到一個新的終端並運行:

go run receive_logs.go

當然,要發出日誌,請輸入:

go run emit_log.go

使用rabbitmqctl list_bindings命令,你可以驗證代碼是否確實根據需要創建了綁定關係和隊列。在運行兩個receive_logs.go程序後,你應該看到類似以下內容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

對結果的解釋很簡單:數據從logs交換器進入了兩個由服務器分配名稱的隊列。這正是我們想要的。

要了解如何偵聽消息的子集,讓我們繼續學習教程 4[11]。

參考資料

[1] RabbitMQ 官網的 Go 語言客戶端系列教程: https://www.rabbitmq.com/getstarted.html

[2] 安裝指南: https://www.rabbitmq.com/download.html

[3] Docker 鏡像: https://registry.hub.docker.com//rabbitmq/_

[4] 開源: https://github.com/rabbitmq/rabbitmq-tutorials

[5] 官方網站: https://github.com/rabbitmq/rabbitmq-website

[6] 上一個教程: _https://www.readfog.com/a/1631286731132211200 _

[7] amqp: http://godoc.org/github.com/streadway/amqp

[8] 隊列指南: https://www.rabbitmq.com/queues.html

[9] emit_logs.go 源碼: http://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/emit_log.go

[10] receive_logs.go 源碼: http://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/receive_logs.go

[11] 教程 4: https://www.readfog.com/a/1631908528273854464

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/nsouScE11fVqelxznEdGhA