萬字解析 go 語言分佈式消息隊列 nsq

0 前言

本期和大家一起探討一款完全基於 go 語言實現的分佈式消息隊列——nsq.

有關於消息隊列 Message Queue 的概念及作用,大家可以參考我之前發表的文章——萬字長文解析如何基於 Redis 實現消息隊列,這些基礎知識本文不再贅述.

nsq 是一款基於 go 語言開發實現的分佈式消息隊列組件,在 golang 世界中擁有着很高的認可度和流行性,截止今日在 github 上的 stars 數已高達 23 k. 本期會涉及大量對 nsq 的源碼走讀環節,主要包含:

此外有一個拓展性內容,涉及到 nsq 服務端與磁盤的存儲交互操作,對應項目爲 https://github.com/nsqio/go-diskqueue . 大家感興趣可自行拓展學習,這部分內容本文不作展開.

本期分享內容的大綱羅列如下:

1 架構介紹

本章我們首先來理清楚 nsq 的整體架構和核心概念,在大的方向上建立一定的理解基礎,再在後續章節中基於各個主要流程展開技術細節的探討.

1.1 整體架構

nsq 可以拆分爲三個組件:

1.2 核心概念

nsq 整體架構如上圖所示,其中涉及到的 topic、channel、producer、consumer 等概念下面我們逐一拆解:

這裏我們着重解釋一下 channel 的概念. channel 可以與其他消費隊列組件中的 consumer group 消費者組的概念類比來看:

由此可見,所有 subcribe 了相同 channel 的 consumer 之間自動形成了一種類似消費者組的機制,大家各自消費 topic 下數據的一部分,形成數據分治與負載均衡.

倘若某個 consumer 需要獲取到 topic 中的全量數據,那也容易,只需要 subscribe 一個不與他人共享的 channel 即可.

nsq 與其他消息隊列組件的另一大差異是,其中的 topic 和 channel 採用的都是懶創建的機制,使用方無需顯式執行 topic 或者 channel 的創建操作,channel 由首次針對該頻道發起 subscribe 訂閱操作的 consumer 創建;而 topic 則由首次針對該主題發起 publish 操作的 producer 或者 subscribe 操作的 consumer 創建.

到這裏,有關 nsq 的核心概念就介紹得差不多了,建議大家略作消化後,可以回頭看一眼本章最開始展示的整體架構圖,鞏固一下對概念的理解.

2 使用教程

本章開始,我們介紹一下 nsq 的基本用法.

2.1 啓動服務端

首先我們介紹,如何在一臺 linux 服務器中啓動 nsq 服務端.

nsq 的官網下載安裝包地址爲 https://nsq.io/deployment/installing.html,可以從中下載到對應於 linux 系統的安裝包壓縮文件.

在 linux 服務器上解壓文件,然後進入 bin 目錄即可一鍵啓動 nsq 服務. 爲了避免因爲啓動服務而使得終端陷入阻塞,我們通過 nohup 指令以守護進程的方式分別啓動 nsqlookupd、nsqd 和 nsqadmin 進程:

腳本:start_lookup.sh

nohup ./nsqlookupd ./nsqlookupd &
exit

腳本:start_nsqd.sh

nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 &
exit

腳本:start_admin.sh

nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 &
exit

在 nsqadmin 啓動後,即可進入 http://localhost:4171 頁面瀏覽 topic、channel 相關信息:

上述各個進程對應的端口號如下:

2.2 運行客戶端

下面展示一下,nsq 客戶端的示例代碼,其中包括對 consumer 和 producer 的運行啓動實例. 具體代碼以及註釋展示如下:

package main


import (
    "fmt"
    "sync"
    "testing"
    "time"


    "github.com/nsqio/go-nsq"
)


const (
    // 用於測試的消息主題
    testTopic = "my_test_topic"
    // nsqd 服務端地址
    nsqdAddr  = "localhost:4150"
    // 訂閱 channelGroupA 的消費者 A1
    consumerA1 = "consumer_a1"
    // 訂閱 channelGroupA 的消費者 A2
    consumerA2 = "consumer_a2"
    // 單獨訂閱 channelGroupB 的消費者 B
    consumerB  = "consumer_b"
    // testTopic 下的 channelA
    channelGroupA = "channel_a"
    // testTopic 下的 channelB
    channelGroupB = "channel_b"
)


// 建立 consumer 與 channel 之間的映射關係
// consumerA1、consumerA2 -> channelA
// consumerB -> channelB
var consumerToChannelGroup = map[string]string{
    consumerA1: channelGroupA,
    consumerA2: channelGroupA,
    consumerB:  channelGroupB,
}


func Test_nsq(t *testing.T) {
    // 消費者消費到消息後的執行邏輯
    msgCallback := func(consumerName string, msg []byte) error {
        t.Logf("i am %s, receive msg: %s", consumerName, msg)
        return nil
    }


    // 運行 producer
    if err := runProducer(); err != nil {
        t.Error(err)
        return
    }


    // 併發運行三個 consumer
    var wg sync.WaitGroup
    for consumer := range consumerToChannelGroup {
        // shadow
        wg.Add(1)
        go func(consumer string) {
            defer wg.Done()
            if err := runConsumer(consumer, msgCallback); err != nil {
                t.Error(err)
            }
        }(consumer)




    }
    wg.Wait()
}


// 運行生產者
func runProducer() error {
    // 通過 addr 直連 nsqd 服務端
    producer, err := nsq.NewProducer(nsqdAddr, nsq.NewConfig())
    if err != nil {
        return err
    }
    defer producer.Stop()


    // 通過 producer.Publish 方法,往 testTopic 中發送三條消息
    for i := 0; i < 3; i++ {
        msg := fmt.Sprintf("hello xiaoxu %d", i)
        if err := producer.Publish(testTopic, []byte(msg)); err != nil {
            return err
        }
    }
    return nil 
}


// 用於處理消息的 processor,需要實現 go-nsq 中定義的 msgProcessor interface,核心是實現消息回調處理方法:func HandleMessage(msg *nsq.Message) error
type msgProcessor struct {
    // 消費者名稱
    consumerName string
    // 消息回調處理函數
    callback     func(consumerName string, msg []byte) error
}


func newMsgProcessor(consumerName string, callback func(consumerName string, msg []byte) error) *msgProcessor {
    return &msgProcessor{
        consumerName: consumerName,
        callback:     callback,
    }
}


// 消息回調處理
func (m *msgProcessor) HandleMessage(msg *nsq.Message) error {
    // 執行用戶定義的業務處理邏輯
    if err := m.callback(m.consumerName, msg.Body); err != nil {
        return err
    }
    // 倘若業務處理成功,則調用 Finish 方法,發送消息的 ack
    msg.Finish()
    return nil
}


// 運行消費者
func runConsumer(consumerName string, callback func(consumerName string, msg []byte) error) error {
    // 根據消費者名獲取到對應的 channel
    channel, ok := consumerToChannelGroup[consumerName]
    if !ok {
        return fmt.Errorf("bad name: %s", consumerName)
    }


    // 指定 topic 和 channel,創建 consumer 實例
    consumer, err := nsq.NewConsumer(testTopic, channel, nsq.NewConfig())
    if err != nil {
        return err
    }
    defer consumer.Stop()


    // 添加消息回調處理函數
    consumer.AddHandler(newMsgProcessor(consumerName, callback))
    
    // consumer 連接到 nsqd 服務端,開啓消費流程
    if err = consumer.ConnectToNSQD(nsqdAddr); err != nil {
        return err
    }


    <-time.After(5 * time.Second)
}

按照上述流程運行單測代碼後,對應的輸出結果如下:

    nsq_test.go:32: i am consumer_a1, receive msg: hello xiaoxu 0
    nsq_test.go:32: i am consumer_b, receive msg: hello xiaoxu 0
    nsq_test.go:32: i am consumer_a2, receive msg: hello xiaoxu 1
    nsq_test.go:32: i am consumer_b, receive msg: hello xiaoxu 1
    nsq_test.go:32: i am consumer_a1, receive msg: hello xiaoxu 2
    nsq_test.go:32: i am consumer_b, receive msg: hello xiaoxu 2

可以看到,共享了 channelA 的兩個 consumerA1、consumerA2 分攤了 topic 下的消息數據;而獨享 channelB 的 consumer 則消費到了 topic 下的全量消息.

3 客戶端

本章中,我們帶着大家深入到 nsq 客戶端項目中,梳理生產者與消費者的運行流程.

nsq 客戶端 lib 庫開源地址爲:https://github.com/nsqio/go-nsq,本文走讀源碼版本爲 v1.1.0

3.1 連接交互

在 nsq 客戶端部分,無論是生產者 producer 還是消費者 consumer,在與服務端交互時,都是通過客戶端定義類 Conn 來封裝表示兩端之間建立的連接.

3.1.1 類定義

客戶端定義的連接類 Conn 定義如下,其中涉及到的核心字段,我都補充了相應的註釋.

這裏稍微解釋下 inFlight 的概念,其指的是某條消息已經被客戶端接收到,但是還未給予服務端 ack 的狀態.

type Conn struct {
    // 記錄了有多少消息處於未 ack 狀態
    messagesInFlight int64
    // ...


    // 互斥鎖,保證臨界資源一致性
    mtx sync.Mutex
    // ...


    // 真正的 tcp 連接
    conn    *net.TCPConn
    // ...
    // 連接的服務端地址
    addr    string
    // ...
    
    // 讀入口
    r io.Reader
    // 寫出口
    w io.Writer
    // writeLoop goroutine 用於接收用戶指令的 channel
    cmdChan         chan *Command
    // write goroutine 通過此 channel 接收來自客戶端的響應,發往服務端
    msgResponseChan chan *msgResponse
    // 控制 write goroutine 退出的 channel
    exitChan        chan int
    // 併發等待組,保證所有 goroutine 及時回收,conn 才能關閉
    wg        sync.WaitGroup
    // ...
}

3.1.2 創建連接

由客戶端實際向服務端發起一筆連接,對應的方法是 Conn.Connect(),其核心步驟包括:

func (c *Conn) Connect() (*IdentifyResponse, error) {
    dialer := &net.Dialer{
        LocalAddr: c.config.LocalAddr,
        Timeout:   c.config.DialTimeout,
    }




    conn, err := dialer.Dial("tcp", c.addr)
    // ...
    c.conn = conn.(*net.TCPConn)
    c.r = conn
    c.w = conn
    // var MagicV2 = []byte("  V2")
    _, err = c.Write(MagicV2)
    // ...
    c.wg.Add(2)
    // ...
    // 接收來自服務端的響應
    go c.readLoop()
    // 發送發往服務端的請求
    go c.writeLoop()
    return resp, nil
}

有關 readLoop 和 writeLoop 的設計,其核心優勢包括:

這種讀寫 loop 模式在很多 go 語言底層通信框架中都有采用,比較經典的案例包括 go 語言中的 net/http 標準庫,大家如果感興趣可以閱讀我之前發表的這篇文章——Golang HTTP 標準庫實現原理.

有關 Conn 中的 readLoop、writeLoop 部分內容,在本文 3.2、3.3 小節介紹生產者 producer 和 消費者 consumer 模塊時,會略作展開.

3.2 生產者

下面我們來介紹一下,在客戶端視角下,生產者 producer 向服務端生產消息的 publish 流程.

3.2.1 類定義

生產者 producer 類定義如下,核心字段均已給出相應註釋.

值得一提的是,producer 在創建好與服務端交互的連接 Conn 後,也會啓動一個常駐的 router goroutine,負責持續接收來自客戶端的指令,並複用 Conn 將指令發送到服務端.

// 生產者
type Producer struct {
    // 生產者標識 id
    id     int64
    // 連接的 nsqd 服務器地址
    addr   string
    // 內置的客戶端連接,其實現類是 3.1 小節中的 Conn. 次數聲明爲 producerConn interface,是爲 producer 屏蔽了一些生產者無需感知的細節
    conn   producerConn
    // ...
    // 生產者 router goroutine 接收服務端響應的 channel 
    responseChan chan []byte
    // 生產者 router goroutine 接收錯誤的 channel
    errorChan    chan []byte
    // 生產者 router goroutine 接收生產者關閉信號的 channel
    closeChan    chan int
    // 生產者 router goroutine 接收 publish 指令的 channel
    transactionChan chan *ProducerTransaction
    // ...
    // 生產者的狀態
    state           int32
    // 當前有多少 publish 指令併發執行
    concurrentProducers int32
    // 生產者 router goroutine 接收退出信號的 channel
    exitChan            chan int
    // 併發等待組,保證 producer 關閉前及時回收所有 goroutine
    wg                  sync.WaitGroup
    // 互斥鎖
    guard               sync.Mutex
}

3.2.2 publish 指令

生產者 producer 生產消息的入口方法爲 Producer.Publish(),底層會組裝出一個 PUB 指令,並調用 Producer.sendCommand() 方法發送指令.

// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {
    return w.sendCommand(Publish(topic, body))
}

PUB 指令:

// Publish creates a new Command to write a message to a given topic
func Publish(topic string, body []byte) *Command {
    var params = [][]byte{[]byte(topic)}
    return &Command{[]byte("PUB"), params, body}
}

3.2.3 發送指令

producer 發送指令的主流程如上圖所示,核心步驟包括:

func (w *Producer) sendCommand(cmd *Command) error {
    doneChan := make(chan *ProducerTransaction)
    err := w.sendCommandAsync(cmd, doneChan, nil)
    // ...
    t := <-doneChan
    return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction, args []interface{}) error {
    // keep track of how many outstanding producers we're dealing with
    // in order to later ensure that we clean them all up...
    atomic.AddInt32(&w.concurrentProducers, 1)
    defer atomic.AddInt32(&w.concurrentProducers, -1)


    if atomic.LoadInt32(&w.state) != StateConnected {
        err := w.connect()
        // ...
    }


    t := &ProducerTransaction{
        cmd:      cmd,
        doneChan: doneChan,
        Args:     args,
    }


    select {
    case w.transactionChan <- t:
    case <-w.exitChan:
        return ErrStopped
    }


    return nil
}

可以看到,在 Producer.connect() 方法中:

func (w *Producer) connect() error {
    w.guard.Lock()
    defer w.guard.Unlock()


    // ...
    w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
    // ...
    // producer 創建和 nsqd 之間的連接
    _, err := w.conn.Connect()
    // ...
    atomic.StoreInt32(&w.state, StateConnected)
    w.closeChan = make(chan int)
    w.wg.Add(1)
    go w.router()


    return nil
}

router goroutine 的運行框架如下,主要通過 for + select 的形式,持續接收來自 Producer.transactionChan 通道的指令,將其發往服務端.

func (w *Producer) router() {
    for {
        select {
        case t := <-w.transactionChan:
            w.transactions = append(w.transactions, t)
            err := w.conn.WriteCommand(t.cmd)
            // ...
        // ...
        case <-w.closeChan:
            goto exit
        case <-w.exitChan:
            goto exit
        }
    }


// ...
}

3.2.4 接收響應

下面我們來理一下,producer 在發送完 PUB 指令後,又要如何接收到來自服務端的響應. 大家可以結合上面這張流程圖,和我們深入到下面的源碼走讀環節:

步驟 I:在 Conn.readLoop() 方法中,讀取到來自服務端的響應,通過調用 Producer.onConnResponse() 方法,將數據發送到 Producer.responseChan 當中

func (c *Conn) readLoop() {
    delegate := &connMessageDelegate{c}
    for {
        // ...


        frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
        // ...


        switch frameType {
        case FrameTypeResponse:
            c.delegate.OnResponse(c, data)
        // ...
        }
    }
    // ...
}
func (d *producerConnDelegate) OnResponse(c *Conn, data []byte)       { d.w.onConnResponse(c, data) }
func (w *Producer) onConnResponse(c *Conn, data []byte) { w.responseChan <- data }

步驟 II: Producer 的 router goroutine 通過 Producer.responseChan 接收到響應數據,通過調用 Producer.popTransaction(...) 方法,將響應推送到 doneChan 當中

func (w *Producer) router() {
    for {
        select {
        // ...
        case data := <-w.responseChan:
            w.popTransaction(FrameTypeResponse, data)
        case data := <-w.errorChan:
            w.popTransaction(FrameTypeError, data)
        // ...
    }


    // ...
}
func (w *Producer) popTransaction(frameType int32, data []byte) {
    // ...
    t := w.transactions[0]
    // ...
    t.finish()
}
func (t *ProducerTransaction) finish() {
    if t.doneChan != nil {
        t.doneChan <- t
    }
}

步驟 III: 客戶端通過在 Producer.sendCommand 方法中阻塞等待來自 doneChan 的數據,接收到後將其中的錯誤返回給上層.

func (w *Producer) sendCommand(cmd *Command) error {
    doneChan := make(chan *ProducerTransaction)
    err := w.sendCommandAsync(cmd, doneChan, nil)
    // ...
    t := <-doneChan
    return t.Error
}

3.3 消費者

接下來梳理一下,消費者 consumer 訂閱、消費以及 ack 的流程.

3.3.1 類定義

客戶端關於消費者的類定義如下. 每當用戶註冊消息回調處理函數 handler 時,consumer 就會啓動一個 handleLoop goroutine,負責接收消息並調用相應的處理函數.

需要注意,consumer 在被創建出來時,就需要明確指定其訂閱的 topic 以及 chanel.

type Consumer struct {
    // ...
    // 互斥鎖
    mtx sync.RWMutex


    // ...
    // 消費者標識 id
    id      int64
    // 消費者訂閱的 topic
    topic   string
    // 消費者訂閱的 channel
    channel string
    // ...
    // 用於接收消息的 channel
    incomingMessages chan *Message


    // ...
    pendingConnections map[string]*Conn
    connections        map[string]*Conn
  
    // 連接的 nsqd 地址
    nsqdTCPAddrs []string


    // ...
}

3.3.2 添加 handler

用戶可以通過調用 Consumer.AddHandler(...) 方法,針對每個 handler 會啓動一個 handleLoop goroutine 用於在消費到消息時執行預定義的回調函數.

需要注意的是,倘若用戶註冊了多個 handler,最終每條消息只會隨機被其中一個 handler 處理.

func (r *Consumer) AddHandler(handler Handler) {
    r.AddConcurrentHandlers(handler, 1)
}
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
    // ...
    atomic.AddInt32(&r.runningHandlers, int32(concurrency))
    for i := 0; i < concurrency; i++ {
        go r.handlerLoop(handler)
    }
}
func (r *Consumer) handlerLoop(handler Handler) {
    // ...
    for {
        message, ok := <-r.incomingMessages
        // ...
        err := handler.HandleMessage(message)
        // ...
    }


    // ...
}

3.3.3 連接服務端

消費者通過調用 Consumer.ConnectToNSQD(...) 方法,實現與 nsqd 服務端的連接交互,其中核心步驟包括:

func (r *Consumer) ConnectToNSQD(addr string) error {
    // 創建鏈接
    conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
    // ...
    r.mtx.Lock()
    _, pendingOk := r.pendingConnections[addr]
    _, ok := r.connections[addr]
    // ...
    r.pendingConnections[addr] = conn
    if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
        r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
    }
    r.mtx.Unlock()


    // ...
    resp, err := conn.Connect()
    // ...


    // ...
    cmd := Subscribe(r.topic, r.channel)
    err = conn.WriteCommand(cmd)
    // ...
    r.mtx.Lock()
    delete(r.pendingConnections, addr)
    r.connections[addr] = conn
    r.mtx.Unlock()
    // ...
    return nil
}

3.3.4 消費消息

當指定 topic channel 下有新消息產生時,consumer 持有的 conn 會通過 readLoop goroutine 接收到對應的消息,並調用 Consumer.onConnMessage(...) 方法,將消息推送到 Consumer.incomingMessages 通道中.

func (c *Conn) readLoop() {
    delegate := &connMessageDelegate{c}
    for {
        // ...


        frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
        // ...


        switch frameType {
        // ...
        case FrameTypeMessage:
            msg, err := DecodeMessage(data)
            // ...           
            c.delegate.OnMessage(c, msg)
        // ...
    }


    // ...
}
func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message)         { d.r.onConnMessage(c, m) }
func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
    // ...
    r.incomingMessages <- msg
}

此前在註冊 consumer handler 時啓動的 handlerLoop goroutine 便會通過 Consumer.incomingMessages 通道獲取到消息,並調用相應的 handler 執行回調處理邏輯.

func (r *Consumer) handlerLoop(handler Handler) {
    // ...
    for {
        message, ok := <-r.incomingMessages
        // ...
        err := handler.HandleMessage(message)
        // ...
    }




    // ...
}

3.3.5 消息 ack

爲防止消息丟失,nsq 中同樣支持了 consumer ack 機制.

consumer 在接收到消息併成功調用 handler 回調處理函數後,可以通過調用 Message.Finish() 方法,向服務端發送 ack 指令,確認消息已成功接收.

倘若服務端超時未收到 ack 響應,則會默認消息已丟失,會重新推送一輪消息.

consumer ack 流程的核心代碼展示如下,在 Conn.onMessageFinish(...) 方法中,會通過 Conn.msgResponseChan 通道將數據推送到 Conn writeLoop goroutine 當中,由其負責將請求發往 nsqd 服務端.

func (m *Message) Finish() {
    .// ..
    m.Delegate.OnFinish(m)
}
func (d *connMessageDelegate) OnFinish(m *Message) { d.c.onMessageFinish(m) }
func (c *Conn) onMessageFinish(m *Message) {
    c.msgResponseChan <- &msgResponse{msg: m, cmd: Finish(m.ID), success: true}
}

Conn.writeLoop 是 Conn 負責向服務端發送指令的常駐 goroutine,其在接收到來自 Consumer.msgResponseChan 通道的數據後,會根據其成功狀態,選擇調用 OnMessageFinished 或者 OnMessageRequeued 將其封裝成 ack 或者重試指令  最後調用 Conn.WriteCommand(...) 方法將指令發往服務端.

func (c *Conn) writeLoop() {
    for {
        select {
        // ...
        // ...  
        case resp := <-c.msgResponseChan:
            // ...
            msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)


            if resp.success {
                c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
                c.delegate.OnMessageFinished(c, resp.msg)
                c.delegate.OnResume(c)
            } else {
                c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
                c.delegate.OnMessageRequeued(c, resp.msg)
                if resp.backoff {
                    c.delegate.OnBackoff(c)
                } else {
                    c.delegate.OnContinue(c)
                }
            }


            err := c.WriteCommand(resp.cmd)
            // ...
        }
    }
    // ...
}

4 服務端

接下來進入本文最核心的章節,我們來揭示一下 nsq 服務端的底層實現原理.

nsq 服務端開源地址:https://github.com/nsqio/nsq 本文走讀源碼的版本爲: v1.2.1

4.1 核心類

首先統一梳理一下,nsq 服務端所涉及到的幾個核心類.

4.1.1 nsqd

NSQD 類對應於一個 nsqd 節點,其中封裝了各類重要信息:

type NSQD struct {
    // 節點內遞增的客戶端 id 序號,爲每個到來的客戶端請求分配唯一的標識 id
    clientIDSequence int64
    // 一把讀寫鎖,保證臨界資源併發安全
    sync.RWMutex
    
    // ...
    // 存在的 topic 集合
    topicMap map[string]*Topic


    // 集羣中的其他 nsqd 節點 
    lookupPeers atomic.Value
    // 運行的 tcp server
    tcpServer     *tcpServer
    // tcp server 使用的端口監聽器
    tcpListener   net.Listener
    // http server 使用的端口監聽器
    httpListener  net.Listener
    // https server 使用的端口監聽器
    httpsListener net.Listener
    // ...


    // 用於回收派生出去的 goroutine  
    exitChan             chan int
    // 併發等待組工具,保證派發出去的 goroutine 都能得到及時回收
    waitGroup            util.WaitGroupWrapper
    // 集羣信息
    ci *clusterinfo.ClusterInfo
}

4.1.2 topic

Topic 對應爲一個消息主題,其下包含核心字段:

type Topic struct {
    // 統計消息數量和大小
    messageCount uint64
    messageBytes uint64
    
    // 讀寫鎖,保證臨界資源併發安全
    sync.RWMutex


    // 當前 topic 名稱
    name              string
    // 當前 topic 下存在的 channel 集合
    channelMap        map[string]*Channel
    // 當 memoryMsgChan 滿了,消息通過該組件持久化落盤
    backend           BackendQueue
    // 用於在內存中傳遞當前 topic 下的消息
    memoryMsgChan     chan *Message
    // 通知 pump goroutine 啓動
    startChan         chan int
    // 通知 pump goroutine 退出
    exitChan          chan int
    // 通知 pump goroutine channels 有更新
    channelUpdateChan chan int
    // 等待組工具,保證當前 topic 下派生出的 goroutine 能夠被回收
    waitGroup         util.WaitGroupWrapper
    // 消息 id 生成器
    idFactory         *guidFactory


    // ...
    // 從屬的 nsqd 模塊
    nsqd *NSQD
}

在 topic 被構造出來時,會調用 Topic.messagePump(...) 方法,異步啓動一個 goroutine,負責持續接收分配給該 topic 的消息,並將消息逐一發送給該 topic 下的每個 channel

// Topic constructor
func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
    // 構造 topic 實例
    t := &Topic{
        // ...
    }
    // ...


    // 啓動 topic 下的 pump goroutine
    t.waitGroup.Wrap(t.messagePump)


    // ...
    // 返回 topic 實例
    return t
}

topic.messagePump 方法非常核心,這裏先展示一下其核心源碼,該方法在後續 4.3 小節的 publish 流程中還會有所涉及.

func (t *Topic) messagePump() {
    var msg *Message
    var buf []byte
    var err error
    var chans []*Channel
    var memoryMsgChan chan *Message
    var backendChan <-chan []byte


    // ...
    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()
    if len(chans) > 0 && !t.IsPaused() {
        // 獲取基於內存的 memoryChan 和基於磁盤的 backendChan
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }




    // main message loop
    for {
        select {
        // 通過內存通道接收消息
        case msg = <-memoryMsgChan:
        // 通過磁盤通道接收消息
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            // ...
        // 倘若和 lookupd 通信發送 channel 有變更,則需要對 channels list 進行更新
        case <-t.channelUpdateChan:
            chans = chans[:0]
            t.RLock()
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        // ...
        }


        // 遍歷 topic 下所有 channel,一一將消息投遞過去
        for i, channel := range chans {
            chanMsg := msg
            // ...
            // 倘若消息類型爲延時消息,則將其添加到延時隊列
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            // 將消息逐一投遞到 topic 下的每個 channel 中
            err := channel.PutMessage(chanMsg)
            // ...
        }
    }


    // ...
}

4.1.3 channel

Channel 對應爲一個消息頻道,

值得一提的是,channel 下的 memoryMsgChan 和 backend 會同時被所有訂閱了該 channel 的 consumer client goroutine 所接收,因此 channel 下的同一條消息只會隨機被某個 consumer 消費到.

type Channel struct {
    // 一些計數器
    requeueCount uint64
    messageCount uint64
    timeoutCount uint64


    // 讀寫鎖,保證臨界資源併發安全
    sync.RWMutex
    // 從屬的 topic 名稱
    topicName string
    // 當前 channel 名稱
    name      string
    // 從屬的 nsqd 模塊
    nsqd      *NSQD
    // 當 memoryMsgChan 滿了,則通過該組件將消息落盤傳遞
    backend BackendQueue
    // 用於在內存中傳遞當前 channel 下的消息
    memoryMsgChan chan *Message
    // ...
    // 記錄當前 channel 下的 consumer 集合
    clients        map[int64]Consumer
    // 延時消息集合
    deferredMessages map[MessageID]*pqueue.Item
    // 延時消息隊列,底層基於一個小頂堆實現,以執行時間戳作爲排序的鍵
    deferredPQ       pqueue.PriorityQueue
    // 保護延時消息隊列的互斥鎖
    deferredMutex    sync.Mutex
    // 重試(待確認)消息集合
    inFlightMessages map[MessageID]*Message
    // 重試(待確認)消息隊列,底層基於一個小頂堆實現,以執行時間戳作爲排序的鍵
    inFlightPQ       inFlightPqueue
    // 保護重試(待確認)消息隊列的互斥鎖
    inFlightMutex    sync.Mutex
}

4.1.4 deferredPQ

延時隊列 deferredQ 和待 ack 隊列 inFlightPQ 底層數據結構類似,都是基於時間戳進行排序的小頂堆. 這裏我們以 deferredPQ 爲例展開介紹.

在 deferredQ 中使用了 golang container/heap 標準庫中的堆結構,其中定義的堆 interface 如下:

type Interface interface {
    sort.Interface
    Push(x any) // add x as element Len()
    Pop() any   // remove and return element Len() - 1.
}
type Interface interface {
    // 返回堆的長度
    Len() int


    // 判斷堆中元素大小的規則
    Less(i, j int) bool


    // 交換 index = i 和 index = j 的兩個元素
    Swap(i, j int)
}

在 nsq 中定義了延時隊列類型 PriorityQueue,逐一實現了上述 interface 的各個方法. 其中,用於比較元素大小的 Item.Priority 對應就是一條消息的執行時間戳.

type Item struct {
    Value    interface{}
    Priority int64
    Index    int
}


// this is a priority queue as implemented by a min heap
// ie. the 0th element is the *lowest* value
type PriorityQueue []*Item




func New(capacity int) PriorityQueue {
    return make(PriorityQueue, 0, capacity)
}




func (pq PriorityQueue) Len() int {
    return len(pq)
}




func (pq PriorityQueue) Less(i, j int) bool {
    return pq[i].Priority < pq[j].Priority
}




func (pq PriorityQueue) Swap(i, j int) {
    pq[i], pq[j] = pq[j], pq[i]
    pq[i].Index = i
    pq[j].Index = j
}




func (pq *PriorityQueue) Push(x interface{}) {
    n := len(*pq)
    c := cap(*pq)
    if n+1 > c {
        npq := make(PriorityQueue, n, c*2)
        copy(npq, *pq)
        *pq = npq
    }
    *pq = (*pq)[0 : n+1]
    item := x.(*Item)
    item.Index = n
    (*pq)[n] = item
}




func (pq *PriorityQueue) Pop() interface{} {
    n := len(*pq)
    c := cap(*pq)
    if n < (c/2) && c > 25 {
        npq := make(PriorityQueue, n, c/2)
        copy(npq, *pq)
        *pq = npq
    }
    item := (*pq)[n-1]
    item.Index = -1
    *pq = (*pq)[0 : n-1]
    return item
}

在 PriorityQueue 中專門實現了一個 PriorityQueue.PeekAndShift(...) 方法,其作用是傳入一個指定時間戳 max,倘若堆頂消息時間戳小於等於 max,則 pop 彈出,否則不執行任何操作.

func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) {
    if pq.Len() == 0 {
        return nil, 0
    }


    item := (*pq)[0]
    if item.Priority > max {
        return nil, item.Priority - max
    }
    heap.Remove(pq, 0)




    return item, 0
}

往延時隊列中添加消息的方法入口爲 Channel.addToDeferredPQ(...) ,其中會把一條延時消息封裝成一個 pqueue.Item,然後調用 container/heap 包下的 Push 方法,裏面會實現新元素入堆的 heapInsert 操作.

func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
    c.deferredMutex.Lock()
    heap.Push(&c.deferredPQ, item)
    c.deferredMutex.Unlock()
}

介紹完幾個核心類之後,下面我們開始針對幾個核心流程進行梳理總結.

4.2 服務運行

首先我們介紹一下,nsqd 服務端是如何啓動運行的.

4.2.1 宏觀流程

首先,nsqd 服務端的啓動入口位於 apps/nsqd/main.go 文件的 main 方法中:

func main() {
    prg := &program{}
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        logFatal("%s", err)
    }
}

此處 nsq 使用了 go-svc 腳手架:https://github.com/judwhite/go-svc .

在 svc 框架中,會分別調用 program 的 Init() 和 Start() 方法,用於初始化和運行 nsqd 後臺程序.

I program.Init

在 program.Init() 方法中,會讀取用戶輸入的配置,然後構造出 nsqd 實例:

func (p *program) Init(env svc.Environment) error {
    // ...
    
    // 初始化 nsqd 實例
    nsqd, err := nsqd.New(opts)
    // ...
    p.nsqd = nsqd


    return nil
}

II program.Start

在 program.Start() 方法中,會異步調用 NSQD.Main() 方法,並分別在其中啓動如下幾個異步任務:

func (p *program) Start() error {
    // program 啓動前會加載之前存儲的歷史數據信息
    err := p.nsqd.LoadMetadata()
    // ...
    err = p.nsqd.PersistMetadata()
    // ...


    go func() {
        // 調用 nsqd.Main 方法,啓動 nsqd 常駐 goroutine
        err := p.nsqd.Main()
        // ...
    }()


    return nil
}
func (n *NSQD) Main() error {
    // ...
    // 啓動 tcp 服務,提供 publish 和 subscribe 
    n.waitGroup.Wrap(func() {
        exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
    })    
    // 啓動 http server,提供 publish 流程有關的 api
    if n.httpListener != nil {
        httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
        n.waitGroup.Wrap(func() {
            exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
        })
    }
    // 啓動 https server
    if n.httpsListener != nil {
        httpsServer := newHTTPServer(n, true, true)
        n.waitGroup.Wrap(func() {
            exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
        })
    }


    n.waitGroup.Wrap(n.queueScanLoop)
    // 啓動
    n.waitGroup.Wrap(n.lookupLoop)
    // ...


    err := <-exitCh
    return err
}

4.2.2 tcp server

nsqd 的 tcp server 部分採用的是服務端中經典的 for + listen 模式,每當通過 listener.Accept() 方法接收到一筆來自於客戶端的連接後,會爲這筆連接分配一個 goroutine 處理後續到來的請求.

這部分實現與 go 語言 net/http 標準庫類似,listener.Accept() 方法向下延伸會使用到 linux 的 epoll 多路複用指令,這部分內容大家如果感興趣的話,可以閱讀一下我之前發表的兩篇文章:

迴歸正題,tcp server 的運行方法展示如下:

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
    // ...


    var wg sync.WaitGroup


    for {
        // 接收新的 tcp 請求
        clientConn, err := listener.Accept()
        // ...
        wg.Add(1)
        go func() {
            // 處理請求
            handler.Handle(clientConn)
            wg.Done()
        }()
    }


    // ...
    wg.Wait()
    // ...


    return nil
}

針對於每筆到來的連接,nsqd 服務端會:

func (p *tcpServer) Handle(conn net.Conn) {
    // ...
    buf := make([]byte, 4)
    _, err := io.ReadFull(conn, buf)
    // ...
    protocolMagic := string(buf)
    // ...
    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{nsqd: p.nsqd}
    default:
        // ...
        return
    }


    // 基於當前請求的 conn,創建一個新的 client 實例
    client := prot.NewClient(conn)
    p.conns.Store(conn.RemoteAddr(), client)
 
    // 當前請求維度的常駐處理流程
    err = prot.IOLoop(client)
    // ...
    p.conns.Delete(conn.RemoteAddr())
    client.Close()
}

protocolV2.IOLoop() 方法是處理一筆客戶端連接的主方法,核心步驟包括:

func (p *protocolV2) IOLoop(c protocol.Client) error {
    var err error
    var line []byte
    var zeroTime time.Time


    client := c.(*clientV2)


    // synchronize the startup of messagePump in order
    // to guarantee that it gets a chance to initialize
    // goroutine local state derived from client attributes
    // and avoid a potential race with IDENTIFY (where a client
    // could have changed or disabled said attributes)
    messagePumpStartedChan := make(chan bool)
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan


    // 啓動
    for {
        // ...
        line, err = client.Reader.ReadSlice('\n')
        // ...
        var response []byte
        response, err = p.Exec(client, params)
        // ...


        if response != nil {
            err = p.Send(client, frameTypeResponse, response)
            // ...
        }
    }


    // ...      
    close(client.ExitChan)
    if client.Channel != nil {
        client.Channel.RemoveClient(client.ID)
    }
    return err
}

protocolV2.Exec(...) 方法中,會根據各類指令,dispatch 到各種處理方法當中. 本文重點介紹的指令包括:

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    // ...
    switch {
    // ...
    case bytes.Equal(params[0][]byte("PUB")):
        return p.PUB(client, params)
    // ...
    case bytes.Equal(params[0][]byte("SUB")):
        return p.SUB(client, params)
    // ...
    case bytes.Equal(params[0][]byte("FIN")):
        return p.FIN(client, params)    
    }
    return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

4.3 publish 流程

下面我們展開 nsqd 服務端針對 PUB 指令的處理流程. 這部分屬於本文中絕對核心的內容.

4.3.1 topic -> channel

在 protocolV2.PUB(...) 方法中:

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    var err error


    // ...
    topicName := string(params[1])
    // ...


    bodyLen, err := readLen(client.Reader, client.lenSlice)
    // ...
    messageBody := make([]byte, bodyLen)
    _, err = io.ReadFull(client.Reader, messageBody)
    // ...


    topic := p.nsqd.GetTopic(topicName)
    msg := NewMessage(topic.GenerateID(), messageBody)
    err = topic.PutMessage(msg)
    // ...


    client.PublishedMessage(topicName, 1)


    return okBytes, nil
}
// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
    t.RLock()
    defer t.RUnlock()
    // ...
    err := t.put(m)
    // ...
    return nil
}

nsqd 服務端處理 sub 指令流程:

func (t *Topic) put(m *Message) error {
    // ...
    if cap(t.memoryMsgChan) > 0 || t.ephemeral || m.deferred != 0 {
        select {
        // 寫到內存中
        case t.memoryMsgChan <- m:
            return nil
        // 內存 chan 已滿,則寫入到磁盤
        default:
            break // write to backend
        }
    }
    err := writeMessageToBackend(m, t.backend)
    // ...
    return nil
}

接下來我們再觀察一下 Topic.messagePump() 方法. 其中在讀取到新消息後,會遍歷當前 topic 下的 channel,逐一調用 Channel.PutMessage(...) 方法,將消息傳遞到每個 channel 中.

func (t *Topic) messagePump() {
    // ...
    for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            // ...
        // ...
        }


        for i, channel := range chans {
            chanMsg := msg
            // ...            
            // 延遲消息特殊處理
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            err := channel.PutMessage(chanMsg)
            // ...
        }
    }
// ...
}

在 Channel.PutMessage(...) 方法中,會將消息通過 Channel.memoryMsgChan 或者 Channel.backend,將消息隨機傳遞到某個訂閱了該 channel 的 consumer client 對應的 protocolV2.messagePump(...) goroutine 當中.

此處呼應了 4.2.2 小節,該方法會於 4.3.2 小節詳細展開.

func (c *Channel) PutMessage(m *Message) error {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()
    // ...
    err := c.put(m)
    // ...
    atomic.AddUint64(&c.messageCount, 1)
    return nil
}
func (c *Channel) put(m *Message) error {
    select {
    case c.memoryMsgChan <- m:
    default:
        err := writeMessageToBackend(m, c.backend)
        c.nsqd.SetHealth(err)
        // ...
    }
    return nil
}

4.3.2 channel -> consumer

當一條消息從 topic 被一一推送到每個 channel 的 memoryMsgChan 和 backendChan 中時,這筆數據會隨機被一個訂閱了該 channel 的 consumer 消費.

這種隨機與互斥的性質得以實現的原因是,是多個 consumer 競爭消費同一個 memoryMsgChan 和 backendChan 中的數據,因此同一條數據一定只能被一個 consumer client 所獲取到.

consumer 消費 channel 數據的內容可以參見方法 protocolV2.messagePump() ,該方法是與每一個連接 nsqd 服務端的客戶端連接一一對應的.

值得一提的是,只有某個 client 執行了訂閱 channel 的指令,此時才能從 client.SubEventChan 下接收到該 channel 對應的 subChannel,並進一步獲得該 channel 的 memoryMsgChan 和 backendChan. 這部分會與 3.4 小節的內容形成呼應.

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    var err error
    var memoryMsgChan chan *Message
    var backendMsgChan <-chan []byte
    var subChannel *Channel
    // ...
    // 當前 client 用於接收 subChannel 的通道
    subEventChan := client.SubEventChan
    // ...  


    for {
       
        // ...
        // 倘若當前 client 執行 sub 指令,次數獲取到訂閱 channel 下用於傳遞消息的 memoryMsgChan
        memoryMsgChan = subChannel.memoryMsgChan
        backendMsgChan = subChannel.backend.ReadChan()
        flusherChan = outputBufferTicker.C
        
        // ...
        select {
        // ...      
        // 首先,倘若當前 client 執行的是 sub 指令,則此處會獲取到該客戶端訂閱的 channel
        // 倘若 client 發送的指令是 SUB,則其訂閱的 channel 會通過 subEventChan 發送過來
        case subChannel = <-subEventChan:
            // you can't SUB anymore
            subEventChan = nil
        // ...
        // 嘗試接收訂閱 channel 下的 backendMsgChan
        case b := <-backendMsgChan:
            // ...
            msg, err := decodeMessage(b)
            // ...
            // 發送消息前,先將消息添加到待 ack 確認隊列中
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            // 將消息發送到 client
            err = p.SendMessage(client, msg)
            // ...
        // 嘗試接收訂閱 channel 下的 memoryMsgChan
        case msg := <-memoryMsgChan:
            // ...
            // 發送消息前,先將消息添加到待 ack 確認隊列中
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            // 將消息發送到 client
            err = p.SendMessage(client, msg)
            // ...
        }
    }


    // ...
}

向客戶端發送消息的方法鏈路展示如下,核心是獲取 client 中的 tcp 連接,然後向 io writer 中寫入數據發往客戶端

func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error {
    // ...
    buf := bufferPoolGet()
    defer bufferPoolPut(buf)


    _, err := msg.WriteTo(buf)
    // ...


    err = p.Send(client, frameTypeMessage, buf.Bytes())
    // ...
}
func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error {
    // ...
    _, err := protocol.SendFramedResponse(client.Writer, frameType, data)
    // ...
}
func SendFramedResponse(w io.Writer, frameType int32, data []byte) (int, error) {
    // ...
    n, err = w.Write(data)
    // ...
}

4.4 subscribe 流程

接下來我們一起梳理下 consumer 向 nsqd 服務端發起 subscribe 指令的流程.

4.4.1 sub 主流程

處理 SUB 指令的入口方法爲 protocolV2.SUB(...),其核心步驟是:

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
    // ...
    topicName := string(params[1])
    // ...


    channelName := string(params[2])
    // ...
    var channel *Channel
    for i := 1; ; i++ {
        topic := p.nsqd.GetTopic(topicName)
        channel = topic.GetChannel(channelName)
        if err := channel.AddClient(client.ID, client); err != nil {
            // ...
        }


        // ...
        break
    }
    atomic.StoreInt32(&client.State, stateSubscribed)
    client.Channel = channel
    // update message pump
    client.SubEventChan <- channel


    return okBytes, nil
}

在 1.2 小節中,我們有提到,channel 是在首次被 consumer 訂閱時自動創建的,該流程對應的方法鏈路如下:

func (t *Topic) GetChannel(channelName string) *Channel {
    t.Lock()
    channel, isNew := t.getOrCreateChannel(channelName)
    t.Unlock()
    // ...
    return channel
}
// this expects the caller to handle locking
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
    channel, ok := t.channelMap[channelName]
    if !ok {
        // ...
        channel = NewChannel(t.name, channelName, t.nsqd, deleteCallback)
        t.channelMap[channelName] = channel
        // ...
        return channel, true
    }
    return channel, false
}

4.4.2 pump 協程接收 chan

對應於每個 consumer client 會有一個運行 protocolV2.messagePump(...) 方法的 goroutine,其中會通過 client.SubEventChan 接收到訂閱的 channel,然後從對應的 memoryMsgChan 和 backendChan 中接收消息:

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    var err error
    var memoryMsgChan chan *Message
    var backendMsgChan <-chan []byte
    var subChannel *Channel
    // ...
    subEventChan := client.SubEventChan
    // ...  


    for {
       
        // ...
        // 倘若當前 client 執行 sub 指令,次數獲取到訂閱 channel 下用於傳遞消息的 memoryMsgChan
        memoryMsgChan = subChannel.memoryMsgChan
        backendMsgChan = subChannel.backend.ReadChan()
        flusherChan = outputBufferTicker.C
        
        // ...
        select {        
        case subChannel = <-subEventChan:
            // you can't SUB anymore
            subEventChan = nil
        // ...         
        }
    }
    // ...
}

4.5 inFlight&ack 機制

下面我們捋一下,nsq 爲保證消息不丟失而提供的 inFlight 和 ack 機制.

4.5.1 StartInFlight

在 protocolV2.messagePump(...) 方法中,每當從 channel 中接收到消息要發往客戶端之前,都會先調用 Channel.StartInFlightTimeout(...) 方法,將消息添加到待 ack 確認隊列 inFlightPQ 當中:

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    // ...
    for {
        // ...
        select {
        // ...
        case b := <-backendMsgChan:
            // ...
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            // ...
            err = p.SendMessage(client, msg)
            // ...
        case msg := <-memoryMsgChan:
            // ...
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            // ...
            err = p.SendMessage(client, msg)
            // ...
        }
    }
    // ...
}

添加消息進入 inFlightPQ 前,會根據用戶設置的超時重試時間,推算出消息重試的執行時間,以時間戳作爲排序的鍵,將消息添加進入小頂堆中.

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    now := time.Now()
    msg.clientID = clientID
    msg.deliveryTS = now
    msg.pri = now.Add(timeout).UnixNano()
    err := c.pushInFlightMessage(msg)
    if err != nil {
        return err
    }
    c.addToInFlightPQ(msg)
    return nil
}
func (c *Channel) addToInFlightPQ(msg *Message) {
    c.inFlightMutex.Lock()
    c.inFlightPQ.Push(msg)
    c.inFlightMutex.Unlock()
}

4.5.2 queueScan

在 nsqd 節點啓動時,會異步啓動一個 gorouting 執行 NSQD.queueScanLoop() 方法,該方法的用途就是定時輪詢 inFlightPQ 和 deferredPQ,取出其中達到時間條件的消息進行執行

func (n *NSQD) Main() error {
    // ...
    n.waitGroup.Wrap(n.queueScanLoop)
    // ...
}
func (n *NSQD) queueScanLoop() {
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    // ...
    // 輪詢時間間隔
    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    // ...
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)


    channels := n.channels()
    // 在方法中會啓動 scan worker goroutine
    n.resizePool(len(channels), workCh, responseCh, closeCh)
    // ...
    for {
        select {
        case <-workTicker.C:
            // ...
        // ...
        }


        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }


        // ...
        // 將需要掃描的 channel 通過 workCh 發給 scan worker goroutine
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i]
        }


         // ...
    }   
}
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    // ...
    for {
        // ...
        n.waitGroup.Wrap(func() {
            n.queueScanWorker(workCh, responseCh, closeCh)
        })
        n.poolSize++      
    }
}

queueScanWorker 是 nsqd 節點下異步運行的 goroutine,每當通過 workCh 接收到 channel 時,會調用 Channel.processInFlightQueue(...) 方法,完成掃描任務:

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            if c.processInFlightQueue(now) {
                dirty = true
            }
            // ...
            responseCh <- dirty
        // ...
        }
    }
}

在掃描任務中,每次會嘗試從 inFlightPQ 堆頂彈出達到執行時間的消息,然後調用 Channel.put(...) 方法將消息傳遞到某個訂閱了該 channel 的 client 手中:

func (c *Channel) processInFlightQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()


    if c.Exiting() {
        return false
    }


    dirty := false
    for {
        c.inFlightMutex.Lock()
        // 倘若堆頂的消息已經達到執行時間,則彈出對應的消息
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()


        // 倘若堆頂消息未達到執行時間,則結束本次任務
        if msg == nil {
            goto exit
        }
        dirty = true


        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
            goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
            client.TimedOutMessage()
        }
        // 發送重試消息
        c.put(msg)
    }


exit:
    return dirty
}

4.5.3 ack

倘若某條消息已經得到了 consumer 的 ack,則 nsqd 服務端會接收到來自 consumer 發送的 FIN 指令,然後步入到 protocolV2.FIN(...) 方法當中:

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    // ...
    switch {
    case bytes.Equal(params[0][]byte("FIN")):
        return p.FIN(client, params)
    // ...
    }
    // ...
}

在 protocolV2.FIN(...) 方法中,會獲取到消息的唯一 id,然後調用 Channel.FinishMessage(...) 方法,將消息從 channel 的 inFlightPQ 當中移除

func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
    // ...
    id, err := getMessageID(params[1])
    // ...
    err = client.Channel.FinishMessage(client.ID, *id)
    // ...
    client.FinishedMessage()


    return nil, nil
}
// FinishMessage successfully discards an in-flight message
func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
    msg, err := c.popInFlightMessage(clientID, id)
    // ...
    c.removeFromInFlightPQ(msg)
    // ...
}
func (c *Channel) removeFromInFlightPQ(msg *Message) {
    c.inFlightMutex.Lock()
    // ...
    c.inFlightPQ.Remove(msg.index)
    c.inFlightMutex.Unlock()
}

4.6 deferred 機制

下面我們梳理一下 nsq 實現延時消息的流程.

4.6.1 添加延時隊列

首先在 publish 流程中,消息由 topic 發放每個 channel 前,會先判斷消息是否爲延時消息類型. 倘若爲延時消息,則會調用 Channel.PutMessageDeferred(...) 方法追加到延時隊列當中.

func (t *Topic) messagePump() {
    // ...
    for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            // ...
        // ...
        }


        for i, channel := range chans {
            chanMsg := msg
            // ...          
            // 延遲消息特殊處理
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            // ...
        }
    }
    // ...
}

添加消息進入延時隊列的方法鏈路如下,其中主要是通過當前時刻結合延時設置,推算出消息的執行時刻,然後以該時間戳爲排序鍵,將消息添加到 deferredPQ 的小頂堆中.

func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
    atomic.AddUint64(&c.messageCount, 1)
    c.StartDeferredTimeout(msg, timeout)
}
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
    absTs := time.Now().Add(timeout).UnixNano()
    item := &pqueue.Item{Value: msg, Priority: absTs}
    err := c.pushDeferredMessage(item)
    // ...
    c.addToDeferredPQ(item)
    return nil
}
func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
    c.deferredMutex.Lock()
    // TODO: these map lookups are costly
    id := item.Value.(*Message).ID
    _, ok := c.deferredMessages[id]
    if ok {
        c.deferredMutex.Unlock()
        return errors.New("ID already deferred")
    }
    c.deferredMessages[id] = item
    c.deferredMutex.Unlock()
    return nil
}
func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
    c.deferredMutex.Lock()
    heap.Push(&c.deferredPQ, item)
    c.deferredMutex.Unlock()
}

4.6.2 處理延時任務

掃描執行延時隊列的流程與 4.5.2 小節介紹的處理待確認消息的流程類似,都是以 NSQD.queueScanWorker(...) 方法爲入口,定時輪詢延時隊列,找到滿足時間條件的消息,調用 Channel.put(...) 方法發送消息.

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            // ...
            if c.processDeferredQueue(now) {
                dirty = true
            }
        // ...
        }
    }
}

執行

func (c *Channel) processDeferredQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()


    // ...
    dirty := false
    for {
        c.deferredMutex.Lock()
        // 倘若堆頂的消息已經達到執行時間條件,則取出進行執行
        item, _ := c.deferredPQ.PeekAndShift(t)
        c.deferredMutex.Unlock()
        // 倘若堆頂的消息都尚未達到執行時間條件,則結束本次任務
        if item == nil {
            goto exit
        }
        dirty = true




        msg := item.Value.(*Message)
        _, err := c.popDeferredMessage(msg.ID)
        if err != nil {
            goto exit
        }
        // 發送延時消息,由  protocolV2.messagePump(..) 方法接收處理
        c.put(msg)
    }


exit:
    return dirty
}

5 總結

本期向大家詳細分享了一款完全基於 go 語言實現的分佈式消息隊列——nsq. 文中分別向大家詳細介紹了有關 nsq 的核心概念、使用教程,並且和大家一起深入到 nsq 項目源碼當中,逐一向大家揭示了 nsq 客戶端與服務端項目的底層技術細節.

這裏比較自信地說一句,只要大家認真閱讀完本文,對於 nsq 的理解程度一定能做到 “遙遙領先” !

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/a_o7cUxhb9XC_fNecL7WAA