高性能消息中間件 NSQ 解析 - 應用實踐

Nsq 是用 Go 語言開發的輕量級的分佈式消息隊列,適合小型項目使用、用來學習消息隊列實現原理,對於學習 Go channel 的原理和用法,以及如何用 Go 語言來寫分佈式是一個很不錯的入門項目。

安裝使用

在官網(https://nsq.io/overview/quick_start.html) 下載對應的二進制可執行文件。

 1# 啓動nsqlookupd
 2$ nsqlookupd
 3# 啓動 nsqd
 4$ nsqd --lookupd-tcp-address=127.0.0.1:4160
 5# 啓動 nsqadmin
 6$ nsqadmin --lookupd-http-address=127.0.0.1:4161
 7
 8# 創建topic,發送消息
 9$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
10# 啓動nsq_to_file
11$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
12# 發佈消息到 nsqd
13$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
14$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
15

在本地按照上述步驟就可以跑起來了。

創建生產者

安裝好 nsq 的幾個服務之後,我們來實現基於 nsq 的生產和消費示例。首先是創建生產者:

 1package main
 2
 3import (
 4  "fmt"
 5  "log"
 6  "time"
 7
 8  "github.com/nsqio/go-nsq"
 9)
10
11func main() {
12  config := nsq.NewConfig()
13  p, err := nsq.NewProducer("127.0.0.1:4150", config)
14
15  if err != nil {
16    log.Panic(err)
17  }
18
19  for i := 0; i < 1000; i++ {
20    msg := fmt.Sprintf("num-%d", i)
21    log.Println("Pub:" + msg)
22    err = p.Publish("testTopic", []byte(msg))
23    if err != nil {
24      log.Panic(err)
25    }
26    time.Sleep(time.Second * 1)
27  }
28
29  p.Stop()
30}
31

生產者的邏輯比較簡單,基於 nsq 官方提供的 github.com/nsqio/go-nsq包,通過調用,循環寫 1000 個字符 + 數字,即 num-n 的形式,通過 p.Publish 發送到消息隊列中,等待消費。

消費者

接着,我們創建消費者:consumer.go 來消費剛剛生產的消息。

 1package main
 2
 3import (
 4  "log"
 5  "sync"
 6
 7  "github.com/nsqio/go-nsq"
 8)
 9
10func main() {
11  wg := &sync.WaitGroup{}
12  wg.Add(1000)
13
14  config := nsq.NewConfig()
15  c, _ := nsq.NewConsumer("testTopic", "ch", config)
16  c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
17    log.Printf("Got a message: %s", message.Body)
18    wg.Done()
19    return nil
20  }))
21
22  // 1.直連nsqd
23  // err := c.ConnectToNSQD("127.0.0.1:4150")
24
25  // 2.通過 nsqlookupd 服務發現
26  err := c.ConnectToNSQLookupd("127.0.0.1:4161")
27  if err != nil {
28    log.Panic(err)
29  }
30  wg.Wait()
31}
32

可通過兩種方式與 nsqd 連接:

消費消息的動作,主要邏輯就是打印出來,實際業務中需要進行其他處理。

運行結果

依次啓動生產者和消費者的服務,可以分別看到如下的輸出結果:

 1$go run producer.go
 2
 32020/12/28 20:29:51 Pub:num-0
 42020/12/28 20:29:51 INF    1 (127.0.0.1:4150) connecting to nsqd
 52020/12/28 20:29:52 Pub:num-1
 62020/12/28 20:29:53 Pub:num-2
 72020/12/28 20:29:54 Pub:num-3
 82020/12/28 20:29:55 Pub:num-4
 92020/12/28 20:29:56 Pub:num-5
102020/12/28 20:29:57 Pub:num-6
112020/12/28 20:29:58 Pub:num-7
122020/12/28 20:29:59 Pub:num-8
132020/12/28 20:30:00 Pub:num-9
142020/12/28 20:30:01 Pub:num-10
15
16$ go run consumer.go
17
182020/12/28 20:30:08 INF    1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
192020/12/28 20:30:08 INF    1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
202020/12/28 20:30:08 Got a message: num-0
212020/12/28 20:30:08 Got a message: num-1
222020/12/28 20:30:08 Got a message: num-2
232020/12/28 20:30:08 Got a message: num-3
242020/12/28 20:30:08 Got a message: num-4
252020/12/28 20:30:08 Got a message: num-5
262020/12/28 20:30:08 Got a message: num-6
272020/12/28 20:30:08 Got a message: num-7
282020/12/28 20:30:08 Got a message: num-8
292020/12/28 20:30:08 Got a message: num-9
302020/12/28 20:30:08 Got a message: num-10
31

通過如上的示例,我們已經成功地實現 NSQ 的應用。下面我們將解析 NSQ 的幾個核心部分。

小結

本文主要介紹 nsq 的安裝使用,下載好可執行文件之後,依次啓動 nsqlookupd、nsqd、nsqadmin 幾個服務。接着我們基於官方提供的客戶端 API 包實現了生產消費模型的案例。通過簡單的案例,我們能夠對 nsq 的安裝和基本使用有一個瞭解。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/rve_Fd2IxCuU_ZE5p5XszA