RabbitMQ Go 教程 2——工作隊列

本文翻譯自 RabbitMQ 官網的 Go 語言客戶端系列教程 [1],共分爲六篇,本文是第二篇——任務隊列。

這些教程涵蓋了使用 RabbitMQ 創建消息傳遞應用程序的基礎知識。 你需要安裝 RabbitMQ 服務器才能完成這些教程,請參閱安裝指南 [2] 或使用 Docker 鏡像 [3]。 這些教程的代碼是開源 [4] 的,官方網站 [5] 也是如此。

先決條件

本教程假設 RabbitMQ 已安裝並運行在本機上的標準端口(5672)。如果你使用不同的主機、端口或憑據,則需要調整連接設置。

任務隊列 / 工作隊列

(使用 Go RabbitMQ 客戶端)

在第一個教程 [6] 中,我們編寫程序從命名的隊列發送和接收消息。在這一節中,我們將創建一個工作隊列,該隊列將用於在多個工人之間分配耗時的任務。

工作隊列(又稱任務隊列)的主要思想是避免立即執行某些資源密集型任務並且不得不等待這些任務完成。相反,我們安排任務異步地同時或在當前任務之後完成。我們將任務封裝爲消息並將其發送到隊列,在後臺運行的工作進程將取出消息並最終執行任務。當你運行多個工作進程時,任務將在他們之間共享。

這個概念在 Web 應用中特別有用,因爲在 Web 應用中不可能在較短的 HTTP 請求窗口內處理複雜的任務,(譯註:例如註冊時發送郵件或短信驗證碼等場景)。

準備工作

在本教程的上一部分,我們發送了一條包含 “Hello World!” 的消息。現在,我們將發送代表複雜任務的字符串。我們沒有實際的任務,例如調整圖像大小或渲染 pdf 文件,所以我們通過藉助time.Sleep函數模擬一些比較耗時的任務。我們會將一些包含.的字符串封裝爲消息發送到隊列中,其中每有一個.就表示需要耗費 1 秒鐘的工作,例如,hello...表示一個將花費三秒鐘的假任務。

我們將稍微修改上一個示例中的send.go代碼,以允許從命令行發送任意消息。該程序會將任務安排到我們的工作隊列中,因此我們將其命名爲new_task.go

body := bodyFrom(os.Args)  // 從參數中獲取要發送的消息正文
err = ch.Publish(
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
  })
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

下面是bodyFrom函數:

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

我們以前的receive.go程序也需要進行一些更改:它需要爲消息正文中出現的每個.僞造一秒鐘的工作。它將從隊列中彈出消息並執行任務,因此我們將其稱爲worker.go

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))  // 數一下有幾個.
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)  // 模擬耗時的任務
    log.Printf("Done")
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

請注意,我們的假任務模擬執行時間。

然後,我們就可以打開兩個終端,分別執行new_task.goworker.go了。

# shell 1
go run worker.go
# shell 2
go run new_task.go

循環調度

使用任務隊列的優點之一是能夠輕鬆並行化工作。如果我們的工作正在積壓,我們可以增加更多的工人,這樣就可以輕鬆擴展。

首先,讓我們嘗試同時運行兩個worker.go腳本。它們都將從隊列中獲取消息,但是究竟是怎樣呢?讓我們來看看。

你需要打開三個控制檯。其中兩個將運行worker.go腳本。這些控制檯將成爲我們的兩個消費者——C1 和 C2。

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C

在第三個控制檯中,我們將發佈新任務。啓動消費者之後,你可以發佈一些消息:

# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....

然後我們在shell1shell2 兩個窗口看到如下輸出結果了:

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg1.
# => [x] Received a message: msg3...
# => [x] Received a message: msg5.....
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg2..
# => [x] Received a message: msg4....

默認情況下,RabbitMQ 將按順序將每個消息發送給下一個消費者。平均而言,每個消費者都會收到相同數量的消息。這種分發消息的方式稱爲輪詢。使用三個或者更多worker試一下。

消息確認

work 完成任務可能需要耗費幾秒鐘,如果一個worker在任務執行過程中宕機了該怎麼辦呢?我們當前的代碼中,RabbitMQ 一旦向消費者傳遞了一條消息,便立即將其標記爲刪除。在這種情況下,如果你終止一個worker那麼你就可能會丟失這個任務,我們還將丟失所有已經交付給這個worker的尚未處理的消息。

我們不想丟失任何任務,如果一個worker意外宕機了,那麼我們希望將任務交付給其他worker來處理。

爲了確保消息永不丟失,RabbitMQ 支持 消息 * 確認 *[7]。消費者發送回一個確認(acknowledgement),以告知 RabbitMQ 已經接收,處理了特定的消息,並且 RabbitMQ 可以自由刪除它。

如果使用者在不發送確認的情況下死亡(其通道已關閉,連接已關閉或 TCP 連接丟失),RabbitMQ 將瞭解消息未完全處理,並將對其重新排隊。如果同時有其他消費者在線,它將很快將其重新分發給另一個消費者。這樣,您可以確保即使工人偶爾死亡也不會丟失任何消息。

沒有任何消息超時;RabbitMQ 將在消費者死亡時重新傳遞消息。即使處理一條消息需要很長時間也沒關係。

在本教程中,我們將使用手動消息確認,方法是爲 “auto-ack” 參數傳遞一個false,然後在完成任務後,使用d.Ack(false)worker發送一個正確的確認(這將確認一次傳遞)。

msgs, err := ch.Consume(
 q.Name, // queue
 "",     // consumer
 false,  // 注意這裏傳false,關閉自動消息確認
 false,  // exclusive
 false,  // no-local
 false,  // no-wait
 nil,    // args
)
if err != nil {
 fmt.Printf("ch.Consume failed, err:%v\n", err)
 return
}

// 開啓循環不斷地消費消息
forever := make(chan bool)
go func() {
 for d := range msgs {
  log.Printf("Received a message: %s", d.Body)
  dotCount := bytes.Count(d.Body, []byte("."))
  t := time.Duration(dotCount)
  time.Sleep(t * time.Second)
  log.Printf("Done")
  d.Ack(false) // 手動傳遞消息確認
 }
}()

使用這段代碼,我們可以確保即使你在處理消息時使用CTRL+C殺死一個worker,也不會丟失任何內容。在worker死後不久,所有未確認的消息都將被重新發送。

消息確認必須在接收消息的同一通道(Channel)上發送。嘗試使用不同的通道(Channel)進行消息確認將導致通道級協議異常。有關更多信息,請參閱確認的文檔指南 [8]。

忘記確認

忘記確認是一個常見的錯誤。這是一個簡單的錯誤,但後果是嚴重的。當你的客戶機退出時,消息將被重新傳遞(這看起來像隨機重新傳遞),但是 RabbitMQ 將消耗越來越多的內存,因爲它無法釋放任何未確認的消息。

爲了調試這種錯誤,可以使用 rabbitmqctl 打印 messages_unacknowledged 字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 平臺,去掉 sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化

我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是如果 RabbitMQ 服務器停止運行,我們的任務仍然會丟失。

當 RabbitMQ 退出或崩潰時,它將忘記隊列和消息,除非您告訴它不要這樣做。要確保消息不會丟失,需要做兩件事:我們需要將隊列和消息都標記爲持久的。

首先,我們需要確保隊列能夠在 RabbitMQ 節點重新啓動後繼續運行。爲此,我們需要聲明它是持久的:

q, err := ch.QueueDeclare(
 "hello", // name
 true,    // 聲明爲持久隊列
 false,   // delete when unused
 false,   // exclusive
 false,   // no-wait
 nil,     // arguments
)

雖然這個命令本身是正確的,但它在我們當前的設置中不起作用。這是因爲我們已經定義了一個名爲hello的隊列,它不是持久的。RabbitMQ 不允許你使用不同的參數重新定義現有隊列,並將向任何嘗試重新定義的程序返回錯誤。但是有一個快速的解決方法——讓我們聲明一個具有不同名稱的隊列,例如task_queue

q, err := ch.QueueDeclare(
 "task_queue", // name
 true,         // 聲明爲持久隊列
 false,        // delete when unused
 false,        // exclusive
 false,        // no-wait
 nil,          // arguments
)

這種持久的選項更改需要同時應用於生產者代碼和消費者代碼。

在這一點上,我們確信即使 RabbitMQ 重新啓動,任務隊列隊列也不會丟失。現在我們需要將消息標記爲持久的——通過使用amqp.Publishing中的持久性選項amqp.Persistent

err = ch.Publish(
 "",     // exchange
 q.Name, // routing key
 false,  // 立即
 false,  // 強制
 amqp.Publishing{
  DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬態/持久)
  ContentType:  "text/plain",
  Body:         []byte(body),
 })

有關消息持久性的說明

將消息標記爲持久性並不能完全保證消息不會丟失。儘管它告訴 RabbitMQ 將消息保存到磁盤上,但是 RabbitMQ 接受了一條消息並且還沒有保存它時,仍然有一個很短的時間窗口。而且,RabbitMQ 並不是對每個消息都執行fsync(2)——它可能只是保存到緩存中,而不是真正寫入磁盤。持久性保證不是很強,但是對於我們的簡單任務隊列來說已經足夠了。如果您需要更強有力的擔保,那麼您可以使用 publisher confirms[9]。

公平分發

你可能已經注意到調度仍然不能完全按照我們的要求工作。例如,在一個有兩個worker的情況下,當所有的奇數消息都是重消息而偶數消息都是輕消息時,一個worker將持續忙碌,而另一個worker幾乎不做任何工作。嗯,RabbitMQ 對此一無所知,仍然會均勻地發送消息。

這是因爲 RabbitMQ 只是在消息進入隊列時發送消息。它不考慮消費者未確認消息的數量。只是盲目地向消費者發送信息。

爲了避免這種情況,我們可以將預取計數設置爲1。這告訴 RabbitMQ 不要一次向一個worker發出多個消息。或者,換句話說,在處理並確認前一條消息之前,不要向worker發送新消息。相反,它將把它發送給下一個不忙的worker

err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)

關於隊列大小的說明

如果所有的worker都很忙,你的queue隨時可能會滿。你會想繼續關注這一點,也許需要增加更多的worker,或者有一些其他的策略。

完整的代碼示例

我們的new_task.go的最終代碼代入如下:

package main

import (
 "fmt"
 "log"
 "os"
 "strings"

 "github.com/streadway/amqp"
)

func main() {
 // 1. 嘗試連接RabbitMQ,建立連接
 // 該連接抽象了套接字連接,併爲我們處理協議版本協商和認證等。
 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
 if err != nil {
  fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
  return
 }
 defer conn.Close()

 // 2. 接下來,我們創建一個通道,大多數API都是用過該通道操作的。
 ch, err := conn.Channel()
 if err != nil {
  fmt.Printf("open a channel failed, err:%v\n", err)
  return
 }
 defer ch.Close()

 // 3. 要發送,我們必須聲明要發送到的隊列。
 q, err := ch.QueueDeclare(
  "task_queue", // name
  true,         // 持久的
  false,        // delete when unused
  false,        // 獨有的
  false,        // no-wait
  nil,          // arguments
 )
 if err != nil {
  fmt.Printf("declare a queue failed, err:%v\n", err)
  return
 }

 // 4. 然後我們可以將消息發佈到聲明的隊列
 body := bodyFrom(os.Args)
 err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // 立即
  false,  // 強制
  amqp.Publishing{
   DeliveryMode: amqp.Persistent, // 持久
   ContentType:  "text/plain",
   Body:         []byte(body),
  })
 if err != nil {
  fmt.Printf("publish a message failed, err:%v\n", err)
  return
 }
 log.Printf(" [x] Sent %s", body)
}

// bodyFrom 從命令行獲取將要發送的消息內容
func bodyFrom(args []string) string {
 var s string
 if (len(args) < 2) || os.Args[1] == "" {
  s = "hello"
 } else {
  s = strings.Join(args[1:], " ")
 }
 return s
}

work.go內容如下:

package main

import (
 "bytes"
 "fmt"
 "log"
 "time"

 "github.com/streadway/amqp"
)

func main() {
 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
 if err != nil {
  fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
  return
 }
 defer conn.Close()

 ch, err := conn.Channel()
 if err != nil {
  fmt.Printf("open a channel failed, err:%v\n", err)
  return
 }
 defer ch.Close()

 // 聲明一個queue
 q, err := ch.QueueDeclare(
  "task_queue", // name
  true,         // 聲明爲持久隊列
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
 )
 err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
 )
 if err != nil {
  fmt.Printf("ch.Qos() failed, err:%v\n", err)
  return
 }

 // 立即返回一個Delivery的通道
 msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  false,  // 注意這裏傳false,關閉自動消息確認
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
 )
 if err != nil {
  fmt.Printf("ch.Consume failed, err:%v\n", err)
  return
 }

 // 開啓循環不斷地消費消息
 forever := make(chan bool)
 go func() {
  for d := range msgs {
   log.Printf("Received a message: %s", d.Body)
   dotCount := bytes.Count(d.Body, []byte("."))
   t := time.Duration(dotCount)
   time.Sleep(t * time.Second)
   log.Printf("Done")
   d.Ack(false) // 手動傳遞消息確認
  }
 }()

 log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
 <-forever
}

使用消息確認和預取計數,可以設置工作隊列(work queue)。即使 RabbitMQ 重新啓動,持久性選項也可以讓任務繼續存在。

有關amqp.Channel方法和消息屬性的內容,可以瀏覽 amqp API 文檔 [10]。

參考資料

[1] RabbitMQ 官網的 Go 語言客戶端系列教程: https://www.rabbitmq.com/getstarted.html

[2] 安裝指南: https://www.rabbitmq.com/download.html

[3] Docker 鏡像: https://registry.hub.docker.com//rabbitmq/_

[4] 開源: https://github.com/rabbitmq/rabbitmq-tutorials

[5] 官方網站: https://github.com/rabbitmq/rabbitmq-website

[6] 第一個教程: https://www.readfog.com/a/1631184198527193088_

[7] 消息_確認_: _https://www.rabbitmq.com/confirms.html_

[8] 確認的文檔指南: https://www.rabbitmq.com/confirms.html

[9] publisher confirms: https://www.rabbitmq.com/confirms.html

[10] amqp API 文檔: http://godoc.org/github.com/streadway/amqp

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/4Jln0-2kMi7bsTdl0qoTRA