RabbitMQ Go 客戶端教程 1——HelloWorl
本文翻譯自 RabbitMQ 官網的 Go 語言客戶端系列教程 [1],共分爲六篇,本文是第一篇——HelloWorld。
這些教程涵蓋了使用 RabbitMQ 創建消息傳遞應用程序的基礎知識。你需要安裝 RabbitMQ 服務器才能完成這些教程,請參閱安裝指南 [2] 或使用 Docker 鏡像 [3]。這些教程的代碼是開源 [4] 的,官方網站 [5] 也是如此。
先決條件
本教程假設 RabbitMQ 已安裝並運行在本機上的標準端口(5672)。如果你使用不同的主機、端口或憑據,則需要調整連接設置。
RabbitMQ Go 語言客戶端教程(一)
介紹
RabbitMQ 是一個消息代理:它接受並轉發消息。你可以把它想象成一個郵局:當你把你想要郵寄的郵件放進一個郵箱時,你可以確定郵差先生或女士最終會把郵件送到你的收件人那裏。在這個比喻中,RabbitMQ 是一個郵箱、一個郵局和一個郵遞員。
RabbitMQ 和郵局的主要區別在於它不處理紙張,而是接受、存儲和轉發二進制數據塊——消息。
RabbitMQ 和一般的消息傳遞都使用一些術語。
-
生產僅意味着發送。發送消息的程序是生產者:
-
隊列是位於 RabbitMQ 內部的郵箱的名稱。儘管消息通過 RabbitMQ 和你的應用程序流動,但它們只能存儲在隊列中。隊列只受主機內存和磁盤限制的限制,實際上它是一個大的消息緩衝區。許多生產者可以向一個隊列發送消息,而許多消費者可以嘗試從一個隊列接收數據。以下是我們表示隊列的方式:
-
消費與接收具有相似的含義。消費者是一個主要等待接收消息的程序:
請注意,生產者,消費者和代理(broker)不必位於同一主機上。實際上,在大多數應用程序中它們不是。一個應用程序既可以是生產者,也可以是消費者。
"Hello World"
(使用 Go RabbitMQ 客戶端)
在本教程的這一部分中,我們將在 Go 中編寫兩個小程序:發送單個消息的生產者和接收消息並將其打印出來的消費者。我們將忽略 Go-RabbitMQ[6] API 中的一些細節,只關注非常簡單的事情,以便開始教程。這是一個消息傳遞版的 “Hello World”。
在下圖中,“P” 是我們的生產者,“ C” 是我們的消費者。中間的框是一個隊列——RabbitMQ 代表消費者保存的消息緩衝區。
(P) -> [|||] -> (C)
Go RabbitMQ 客戶端庫
RabbitMQ 講多種協議。本教程使用 amqp0-9-1,這是一個開放的、通用的消息傳遞協議。RabbitMQ 有許多不同語言的客戶端 [7]。在本教程中,我們將使用 Go amqp 客戶端。
首先,使用
go get
安裝 amqpgo get github.com/streadway/amqp
現在安裝好 amqp 之後,我們就可以編寫一些代碼。
發送
(P) -> [|||]
我們將消息發佈者(發送者)稱爲 send.go
,將消息消費者(接收者)稱爲receive.go
。發佈者將連接到 RabbitMQ,發送一條消息,然後退出。
在send.go
中,我們需要首先導入庫:
package main
import (
"fmt"
"github.com/streadway/amqp"
)
我們還需要一個輔助函數來檢查每個 amqp 調用的返回值:
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
然後連接到 RabbitMQ 服務器
// 1. 嘗試連接RabbitMQ,建立連接
// 該連接抽象了套接字連接,併爲我們處理協議版本協商和認證等。
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
連接抽象了 socket 連接,併爲我們處理協議版本協商和認證等。接下來,我們創建一個通道,這是大多數用於完成任務的 API 所在的位置:
// 2. 接下來,我們創建一個通道,大多數API都是用過該通道操作的。
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
要發送,我們必須聲明要發送到的隊列。然後我們可以將消息發佈到隊列:
// 3. 聲明消息要發送到的隊列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := "Hello World!"
// 4.將消息發佈到聲明的隊列
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
聲明隊列是冪等的——僅當隊列不存在時才創建。消息內容是一個字節數組,因此你可以在此處編碼任何內容。
點擊查看完整的 send.go 文件 [8]
接收
上面是我們的發佈者。我們的消費者監聽來自 RabbitMQ 的消息,因此與發佈單個消息的發佈者不同,我們將使消費者保持運行狀態以監聽消息並打印出來。
[|||] -> (C)
該代碼(在receive.go
中)具有與send
相同的導入和幫助功能:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
設置與發佈者相同;我們打開一個連接和一個通道,並聲明要消耗的隊列。請注意,這與send
發佈到的隊列匹配。
// 建立連接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 獲取channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 聲明隊列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
請注意,我們也在這裏聲明隊列。因爲我們可能在發佈者之前啓動使用者,所以我們希望在嘗試使用隊列中的消息之前確保隊列存在。
我們將告訴服務器將隊列中的消息傳遞給我們。由於它將異步地向我們發送消息,因此我們將在 goroutine 中從通道(由amqp::Consume
返回)中讀取消息。
// 獲取接收消息的Delivery通道
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
點擊完整的 receive.go 腳本 [9]
完整示例
現在我們可以運行兩個腳本。在一個終端窗口,運行發佈者:
go run send.go
然後,運行使用者:
go run receive.go
消費者將打印通過 RabbitMQ 從發佈者那裏得到的消息。使用者將持續運行,等待消息(使用 Ctrl-C 停止它),因此請嘗試從另一個終端運行發佈者。
如果要檢查隊列,請嘗試使用rabbitmqctl list_queues
命令。
接下來該繼續教程的第二部分 [10] 並建立一個簡單的任務隊列。
參考資料
[1] RabbitMQ 官網的 Go 語言客戶端系列教程: https://www.rabbitmq.com/getstarted.html
[2] 安裝指南: https://www.rabbitmq.com/download.html
[3] Docker 鏡像: https://registry.hub.docker.com//rabbitmq/_
[4] 開源: https://github.com/rabbitmq/rabbitmq-tutorials
[5] 官方網站: https://github.com/rabbitmq/rabbitmq-website
[6] Go-RabbitMQ: http://godoc.org/github.com/streadway/amqp
[7] 許多不同語言的客戶端: http://rabbitmq.com/devtools.html
[8] 點擊查看完整的 send.go 文件: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/send.go
[9] 點擊完整的 receive.go 腳本: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/receive.go
[10] 第二部分: https://www.liwenzhou.com/posts/Go/go_rabbitmq_tutorials_02/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/KSr-soJ9Ls-GWyMdtiPVVA