Go 語言操作 NSQ 消息隊列
【導讀】本文介紹了 NSQ 消息隊列和 go 語言 NSQ 交互。
網上看了好多,都是抄個官網 README,很多重要的東西不說清楚。只好自己研究了一下。
NSQ 的全家桶介紹
-
nsqd:守護進程,客戶端通信。默認端口
4150
(TCP)4151
(HTTP) -
nsqlookupd:相當於一個路由器。客戶端可以經由它發現生產者、nsqd 廣播的話題。一個 nsqlookupd 能夠管理一羣 nsqd。 默認端口:
:4160
(TCP),:4161
(HTTP) -
nsqadmin:在線面板,能夠通過瀏覽器直接訪問。默認端口
:4171
從命令行啓動
可以直接下載二進制文件。開三個終端,分別執行:
nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
nsqadmin --lookupd-http-address=127.0.0.1:4161
go-nsq 的使用
我封裝了一個包:
package mq
import (
"encoding/json"
"fmt"
"time"
"github.com/nsqio/go-nsq"
"go.uber.org/zap"
)
type MessageQueueConfig struct {
NsqAddr string
NsqLookupdAddr string
SupportedTopics []string
}
type MessageQueue struct {
config MessageQueueConfig
producer *nsq.Producer
consumers map[string]*nsq.Consumer
}
func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {
zap.L().Debug("New message queue")
producer, err := initProducer(config.NsqAddr)
if err != nil {
return nil, err
}
consumers := make(map[string]*nsq.Consumer)
for _, topic := range config.SupportedTopics {
nsq.Register(topic,"default")
consumers[topic], err = initConsumer(topic, "default", config.NsqAddr)
if err != nil {
return
}
}
return &MessageQueue{
config: config,
producer: producer,
consumers: consumers,
}, nil
}
func (mq *MessageQueue) Run() {
for name, c := range mq.consumers {
zap.L().Info("Run consumer for " + name)
// c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
c.ConnectToNSQD(mq.config.NsqAddr)
}
}
func initProducer(addr string) (producer *nsq.Producer, err error) {
zap.L().Debug("initProducer to " + addr)
config := nsq.NewConfig()
producer, err = nsq.NewProducer(addr, config)
return
}
func initConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
zap.L().Debug("initConsumer to " + topic + "/" + channel)
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err = nsq.NewConsumer(topic, channel, config)
return
}
func (mq *MessageQueue) Pub(name string, data interface{}) (err error) {
body, err := json.Marshal(data)
if err != nil {
return
}
zap.L().Info("Pub " + name + " to mq. data = " + string(body))
return mq.producer.Publish(name, body)
}
type Messagehandler func(v []byte)
func (mq *MessageQueue) Sub(name string, handler Messagehandler) (err error) {
zap.L().Info("Subscribe " + name)
v, ok := mq.consumers[name]
if !ok {
err = fmt.Errorf("No such topic: " + name)
return
}
v.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
handler(message.Body)
return nil
}))
return
}
使用示例:
m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
NsqAddr: "127.0.0.1:4150",
NsqLookupdAddr: "127.0.0.1:4161",
SupportedTopics: []string{"hello"},
})
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
m.Sub("hello", func(resp []byte) {
zap.L().Info("S1 Got: " + string(resp))
})
m.Sub("hello", func(resp []byte) {
zap.L().Info("S2 Got: " + string(resp))
})
m.Run()
err = m.Pub("hello", "world")
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
err = m.Pub("hello", "tom")
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
os.Exit(0);
主要是進行解耦合,這樣萬一我們換成 Kalfa 之類的隊列,就可以不用動業務代碼。
輸出結果:
2021-11-07T19:13:41.886+0800 DEBUG mq/mq.go:29 New message queue
2021-11-07T19:13:41.886+0800 DEBUG mq/mq.go:58 initProducer to 127.0.0.1:4150
2021-11-07T19:13:41.887+0800 DEBUG mq/mq.go:65 initConsumer to hello/default
2021-11-07T19:13:41.887+0800 INFO mq/mq.go:84 Subscribe hello
2021-11-07T19:13:41.887+0800 INFO mq/mq.go:84 Subscribe hello
2021-11-07T19:13:41.887+0800 INFO mq/mq.go:51 Run consumer for hello
2021/11/07 19:13:41 INF 2 [hello/default] (127.0.0.1:4150) connecting to nsqd
2021-11-07T19:13:41.887+0800 INFO mq/mq.go:77 Pub hello to mq. data = "world"
2021/11/07 19:13:41 INF 1 (127.0.0.1:4150) connecting to nsqd
2021-11-07T19:13:41.888+0800 INFO mq/mq.go:77 Pub hello to mq. data = "tom"
2021-11-07T19:13:41.888+0800 INFO buqi-admin-backend/main.go:60 S1 Got: "world"
2021-11-07T19:13:41.888+0800 INFO buqi-admin-backend/main.go:63 S2 Got: "tom"
從輸出結果我們可以確認一個事實,就是對於訂閱了同一個 topic,同一個 channel 的不同消費者,當消息湧入時,將會負載均衡——每個 Handler 只會收到一個消息。
遇到的問題
TOPIC_NOT_FOUND
遇到兩個原因。
其一是大小寫,Topic 名是大小寫敏感的,因此 Hello
和 hello
是兩個不同的 topic,寫代碼時應該規範操作:抽取常量,並維護一個所有 Topic 的列表。
其二是 Topic 未創建。第一次 pub 之後,對應的 topic/channel 才能創建。建議寫個腳本調用 /topic/create
接口一次性創建好,不然後面第二次重試訂閱的時候才能收到消息,造成不可預料的延遲。
發現客戶端輪詢 HTTP
這是因爲 NsqLookupd 本身是一箇中介,可以管理一堆不同 IP 的 nsqd,那麼我們就不可能永遠只連接一個 nsq,所以就要輪詢來確認有哪些客戶端。
對於小項目,可以繞過 NsqLookupd:
// c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
c.ConnectToNSQD(mq.config.NsqAddr)
如何讓多個消費者消費同一個 topic?
顯然,根據 nsq 的機制,我們需要讓同一個 topic 的消費者使用不同的通道。一種方法是隨機化 channel,比如使用一個遞增量作爲 channel 名。
第二種方法是根據用途定義 channel 名。
第三種方法:據說可以使用 AddConcurrentHandlers,尚未研究。
第四種方法:我們把 Handler 中介化,使用一個消費者去消費,但是手動將消息送入應用層的一個自定義的流水線,讓流水線的 filter 去處理消息。我猜這樣還能避免一些臨界區問題。
我們試一下第四種方法。(代碼已發佈到 GIST,Github 用戶名 Pluveto)
https://gist.github.com/pluveto/f380362ec04cf4849e5bc9a79c751204
實現流水線 Handler
package mq
import (
"encoding/json"
"fmt"
"time"
"github.com/nsqio/go-nsq"
"go.uber.org/zap"
)
type MessageQueueConfig struct {
NsqAddr string
NsqLookupdAddr string
EnableLookupd bool
SupportedTopics []string
}
type MessageQueue struct {
subscribers map[string]Subscriber
config MessageQueueConfig
producer *nsq.Producer
}
type Messagehandler func(v []byte) bool
// LinkedHandlerNode 第一個節點爲頭節點,Handler 必須爲 nil
type LinkedHandlerNode struct {
Handler *Messagehandler
Index int
NextNode *LinkedHandlerNode
}
type Subscriber struct {
HandlerHeadNode *LinkedHandlerNode
Consumer *nsq.Consumer
Handler nsq.HandlerFunc
}
func createProducer(addr string) (producer *nsq.Producer, err error) {
zap.L().Debug("initProducer to " + addr)
config := nsq.NewConfig()
producer, err = nsq.NewProducer(addr, config)
return
}
func createConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
zap.L().Debug("initConsumer to " + topic + "/" + channel)
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err = nsq.NewConsumer(topic, channel, config)
return
}
func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {
zap.L().Debug("New message queue")
producer, err := createProducer(config.NsqAddr)
if err != nil {
return nil, err
}
subscribers := make(map[string]Subscriber)
for _, topic := range config.SupportedTopics {
nsq.Register(topic, "default")
consumer, err := createConsumer(topic, "default", config.NsqAddr)
if err != nil {
return nil, err
}
// 頭節點不參與實際使用,所以 Index = -1
headNode := &LinkedHandlerNode{Index: -1}
hubHandler := nsq.HandlerFunc(func(message *nsq.Message) error {
// 循環鏈式調用各個 Handler
curNode := headNode.NextNode
// 當不存在任何用戶定義的 Handler 時拋出警告
if(nil == curNode){
return fmt.Errorf("No handler provided!")
}
for nil != curNode {
msg := message.Body
zap.S().Debugf("handler[%v] for %v is invoked", curNode.Index, topic)
stop := (*curNode.Handler)(msg)
if stop {
zap.S().Debugf("the message has stopped spreading ")
break
}
curNode = curNode.NextNode
}
return nil
})
consumer.AddHandler(hubHandler)
subscribers[topic] = Subscriber{
Consumer: consumer,
HandlerHeadNode: headNode,
}
}
return &MessageQueue{
config: config,
producer: producer,
subscribers: subscribers,
}, nil
}
func (mq *MessageQueue) Run() {
for name, s := range mq.subscribers {
zap.L().Info("Run consumer for " + name)
if mq.config.EnableLookupd {
s.Consumer.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
} else {
s.Consumer.ConnectToNSQD(mq.config.NsqAddr)
}
}
}
func (mq *MessageQueue) IsTopicSupported(topic string) bool {
for _, v := range mq.config.SupportedTopics {
if v == topic {
return true
}
}
return false
}
// Pub 向消息隊列發送一個消息
func (mq *MessageQueue) Pub(topic string, data interface{}) (err error) {
if !mq.IsTopicSupported(topic) {
err = fmt.Errorf("unsupported topic name: " + topic)
return
}
body, err := json.Marshal(data)
if err != nil {
return
}
zap.L().Info("Pub " + topic + " to mq. data = " + string(body))
return mq.producer.Publish(topic, body)
}
// Sub 從消息隊列訂閱一個消息
func (mq *MessageQueue) Sub(topic string, handler Messagehandler) (err error) {
if !mq.IsTopicSupported(topic) {
err = fmt.Errorf("unsupported topic name: " + topic)
return
}
zap.L().Info("Subscribe " + topic)
subscriber, ok := mq.subscribers[topic]
if !ok {
err = fmt.Errorf("No such topic: " + topic)
return
}
// 抵達最後一個有效鏈表節點
curNode := subscriber.HandlerHeadNode
for nil != curNode.NextNode {
curNode = curNode.NextNode
}
// 創建節點
curNode.NextNode = &LinkedHandlerNode{
Handler: &handler,
Index: 1 + curNode.Index,
NextNode: nil,
}
return
}
這裏的思想是給每個消費者預先創建唯一的 Handler,這個 Handler 會依次調用鏈表中的各個具體的 Handler。當用戶訂閱 Topic 時,將用戶提供的 Handler 添加到鏈表末尾。
使用示例:
m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
NsqAddr: "127.0.0.1:4150",
NsqLookupdAddr: "127.0.0.1:4161",
SupportedTopics: []string{"hello"},
EnableLookupd: false,
})
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
m.Sub("hello", func(resp []byte) bool {
zap.L().Info("S1 Got: " + string(resp))
return false
})
m.Sub("hello", func(resp []byte) bool {
zap.L().Info("S2 Got: " + string(resp))
return true
})
m.Sub("hello", func(resp []byte) bool {
zap.L().Info("S3 Got: " + string(resp))
return false
})
m.Run()
err = m.Pub("hello", "world")
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
err = m.Pub("hello", "tom")
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
os.Exit(0)
輸出:
2021-11-07T20:30:38.448+0800 DEBUG mq/mq.go:40 New message queue
2021-11-07T20:30:38.448+0800 DEBUG mq/mq.go:89 initProducer to 127.0.0.1:4150
2021-11-07T20:30:38.448+0800 DEBUG mq/mq.go:96 initConsumer to hello/default
2021-11-07T20:30:38.448+0800 INFO mq/mq.go:113 Subscribe hello
2021-11-07T20:30:38.448+0800 INFO mq/mq.go:113 Subscribe hello
2021-11-07T20:30:38.448+0800 INFO mq/mq.go:113 Subscribe hello
2021-11-07T20:30:38.448+0800 INFO mq/mq.go:82 Run consumer for hello
2021/11/07 20:30:38 INF 2 [hello/default] (127.0.0.1:4150) connecting to nsqd
2021-11-07T20:30:38.454+0800 INFO mq/mq.go:108 Pub hello to mq. data = "world"
2021/11/07 20:30:38 INF 1 (127.0.0.1:4150) connecting to nsqd
2021-11-07T20:30:38.455+0800 INFO mq/mq.go:108 Pub hello to mq. data = "tom"
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[0] for hello is invoked
2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:60 S1 Got: "world"
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[1] for hello is invoked
2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:64 S2 Got: "world"
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:60 the message has stopped spreading
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[0] for hello is invoked
2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:60 S1 Got: "tom"
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[1] for hello is invoked
2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:64 S2 Got: "tom"
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:60 the message has stopped spreading
^C
可以看到,Handler 返回 true 時,就可以阻斷消息的傳播。
轉自:segmentfault.com/a/1190000040923001
Go 開發大全
參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/czxAuY-CW9v1nBOocYZN5g