高性能消息中間件 NSQ 解析 - 應用實踐
Nsq 是用 Go 語言開發的輕量級的分佈式消息隊列,適合小型項目使用、用來學習消息隊列實現原理,對於學習 Go channel 的原理和用法,以及如何用 Go 語言來寫分佈式是一個很不錯的入門項目。
安裝使用
在官網(https://nsq.io/overview/quick_start.html) 下載對應的二進制可執行文件。
1# 啓動nsqlookupd
2$ nsqlookupd
3# 啓動 nsqd
4$ nsqd --lookupd-tcp-address=127.0.0.1:4160
5# 啓動 nsqadmin
6$ nsqadmin --lookupd-http-address=127.0.0.1:4161
7
8# 創建topic,發送消息
9$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
10# 啓動nsq_to_file
11$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
12# 發佈消息到 nsqd
13$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
14$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
15
在本地按照上述步驟就可以跑起來了。
創建生產者
安裝好 nsq 的幾個服務之後,我們來實現基於 nsq 的生產和消費示例。首先是創建生產者:
1package main
2
3import (
4 "fmt"
5 "log"
6 "time"
7
8 "github.com/nsqio/go-nsq"
9)
10
11func main() {
12 config := nsq.NewConfig()
13 p, err := nsq.NewProducer("127.0.0.1:4150", config)
14
15 if err != nil {
16 log.Panic(err)
17 }
18
19 for i := 0; i < 1000; i++ {
20 msg := fmt.Sprintf("num-%d", i)
21 log.Println("Pub:" + msg)
22 err = p.Publish("testTopic", []byte(msg))
23 if err != nil {
24 log.Panic(err)
25 }
26 time.Sleep(time.Second * 1)
27 }
28
29 p.Stop()
30}
31
生產者的邏輯比較簡單,基於 nsq 官方提供的 github.com/nsqio/go-nsq
包,通過調用,循環寫 1000 個字符 + 數字,即 num-n 的形式,通過 p.Publish 發送到消息隊列中,等待消費。
消費者
接着,我們創建消費者:consumer.go 來消費剛剛生產的消息。
1package main
2
3import (
4 "log"
5 "sync"
6
7 "github.com/nsqio/go-nsq"
8)
9
10func main() {
11 wg := &sync.WaitGroup{}
12 wg.Add(1000)
13
14 config := nsq.NewConfig()
15 c, _ := nsq.NewConsumer("testTopic", "ch", config)
16 c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
17 log.Printf("Got a message: %s", message.Body)
18 wg.Done()
19 return nil
20 }))
21
22 // 1.直連nsqd
23 // err := c.ConnectToNSQD("127.0.0.1:4150")
24
25 // 2.通過 nsqlookupd 服務發現
26 err := c.ConnectToNSQLookupd("127.0.0.1:4161")
27 if err != nil {
28 log.Panic(err)
29 }
30 wg.Wait()
31}
32
可通過兩種方式與 nsqd 連接:
-
直連 nsqd,適用於單機 (standalone) 版;
-
通過 nsqlookupd 服務發現,適用於集羣 (cluster) 版;
消費消息的動作,主要邏輯就是打印出來,實際業務中需要進行其他處理。
運行結果
依次啓動生產者和消費者的服務,可以分別看到如下的輸出結果:
1$go run producer.go
2
32020/12/28 20:29:51 Pub:num-0
42020/12/28 20:29:51 INF 1 (127.0.0.1:4150) connecting to nsqd
52020/12/28 20:29:52 Pub:num-1
62020/12/28 20:29:53 Pub:num-2
72020/12/28 20:29:54 Pub:num-3
82020/12/28 20:29:55 Pub:num-4
92020/12/28 20:29:56 Pub:num-5
102020/12/28 20:29:57 Pub:num-6
112020/12/28 20:29:58 Pub:num-7
122020/12/28 20:29:59 Pub:num-8
132020/12/28 20:30:00 Pub:num-9
142020/12/28 20:30:01 Pub:num-10
15
16$ go run consumer.go
17
182020/12/28 20:30:08 INF 1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
192020/12/28 20:30:08 INF 1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
202020/12/28 20:30:08 Got a message: num-0
212020/12/28 20:30:08 Got a message: num-1
222020/12/28 20:30:08 Got a message: num-2
232020/12/28 20:30:08 Got a message: num-3
242020/12/28 20:30:08 Got a message: num-4
252020/12/28 20:30:08 Got a message: num-5
262020/12/28 20:30:08 Got a message: num-6
272020/12/28 20:30:08 Got a message: num-7
282020/12/28 20:30:08 Got a message: num-8
292020/12/28 20:30:08 Got a message: num-9
302020/12/28 20:30:08 Got a message: num-10
31
通過如上的示例,我們已經成功地實現 NSQ 的應用。下面我們將解析 NSQ 的幾個核心部分。
小結
本文主要介紹 nsq 的安裝使用,下載好可執行文件之後,依次啓動 nsqlookupd、nsqd、nsqadmin 幾個服務。接着我們基於官方提供的客戶端 API 包實現了生產消費模型的案例。通過簡單的案例,我們能夠對 nsq 的安裝和基本使用有一個瞭解。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/rve_Fd2IxCuU_ZE5p5XszA