NSQ 中的消息流轉

因爲工作上面需要用到 NSQ 消息隊列對業務做解耦,所以對 NSQ 做了一些簡單的研究。這篇文章主要結合源碼做一些原理性的分析。不瞭解的 NSQ 的同學可以先讀官方文檔。

我們從使用方式開始入手,並逐步深挖。

消費者簡單使用

//首先定義一個實現HandleMessage方法的結構體
type myMessageHandler struct {}
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
    //處理消息的業務邏輯
    err := processMessage(m.Body)
    //返回結果,這裏如果err不爲空,該消息會被重發
    return err
}
func main() {
  config := nsq.NewConfig()
  consumer, err := nsq.NewConsumer("topic", "channel", config)
  if err != nil {
    log.Fatal(err)
  }
        
  consumer.AddHandler(&myMessageHandler{})

  // 連接nsqloopup
  err = consumer.ConnectToNSQLookupd("localhost:4161")
  if err != nil {
    log.Fatal(err)
  }
  ......
}

客戶端原理

客戶端 Consumer 首先去連接 nsqlookup,通過 http 請求得到 nsqd 的地址,並最終對得到的 nsqd 地址進行 tcp 連接。並將該連接信息放到自身的 connections 中。

type Consumer struct {
    ......
    topic string
    channel string
    .....

    //用於消息傳遞的管道
    incomingMessages chan *Message

    //保存與nsqd的連接信息
    pendingConnections map[string]*Conn
    connections map[string]*Conn

    ......
}

在請求 NSQD 時,會先與 NSQD 建立一條 TCP 連接,並啓動兩個 goroutine 來服務這條連接,分別是 readLoop 和 writeLoop。

readLoop

readLoop 用於讀取 NSQD 發來的消息,包括消息 Message,心跳等,並調用 Consumer 的 OnMessage 方法去處理消息。

func (c *Conn) readLoop() {
    delegate := &connMessageDelegate{c}
    for {
        //從connection中讀取消息
        frameType, data, err := ReadUnpackedResponse(c)
        if err != nil {
            ......
        }

        //判斷是否是心跳消息
        if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
            ......
            continue
        }

        //正常的消息是FrameTypeMessage是類型
        switch frameType {
        case FrameTypeResponse:
            c.delegate.OnResponse(c, data)
        case FrameTypeMessage:
            // 解碼數據得到消息對象,因爲消息是按照NSQ自定義的一套序列化規則來的,
            //所以這裏進行解碼,並賦值到消息體對象
            msg, err := DecodeMessage(data)
            ......
            //一些賦值操作過後,增加處理的消息數量
            atomic.AddInt64(&c.messagesInFlight, 1)
            //最終調用Consumer的OnMessage方法處理消息
            c.delegate.OnMessage(c, msg)
        case FrameTypeError:
            ......
        default:
            c.log(LogLevelError, "IO error - %s", err)
            c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
        }
    }
    ......
}

consumer 的 OnMessage 方法,就是將 message 塞到了 incomingMessages 管道中。

func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message){
    d.r.onConnMessage(c, m)
    }

func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
    atomic.AddUint64(&r.messagesReceived, 1)
    r.incomingMessages <- msg
}

那這個 incomingMessages 管道是誰來消費呢

在我們使用的時候會調用 AddHandler 方法,就會開啓一個 handlerLoop goruntine 去對 incomingMessages 管道進行消費,拿到消息,並調用我們自己的 handler.HandleMessage 方法處理業務。

func (r *Consumer) handlerLoop(handler Handler) {
    r.log(LogLevelDebug, "starting Handler")

    for {
        message, ok := <-r.incomingMessages
        if !ok {
            goto exit
        }
        //是否應該放棄這條消息(因爲有的消息可能消費失敗重試了很多次)
        if r.shouldFailMessage(message, handler) {
            //如果放棄,就發送finish信號FIN
            message.Finish()
            continue
        }
        //調用HandleMessage方法處理業務,如果返回的錯誤不爲空,
        //說明這條消息消費失敗了,根據業務需求決定是否要重新消費一次
        err := handler.HandleMessage(message)
        if err != nil {
            //自動回覆是否被禁用,默認情況下爲false
            if !message.IsAutoResponseDisabled() {
                //會調用Requeue方法實現重新入隊的操作
                message.Requeue(-1)
            }
            continue
        }
        //沒有錯誤產生情況下,再去調用Finish方法發送FIN信號給到nsqd
        if !message.IsAutoResponseDisabled() {
            message.Finish()
        }
    }
}

Requeue,Finish 這類方法會包裝一個 Command 類型的結構體指令,會將該指令返回 nsqd,代表這條消息的處理狀況,nsqd 端會對該指令做區分並調用不同方式處理。

type Command struct {
    Name []byte
    Params [][]byte
    Body []byte
}

func Finish(id MessageID) *Command {
    var params = [][]byte{id[:]}
    return &Command{[]byte("FIN"), params, nil}
}

func Requeue(id MessageID, delay time.Duration) *Command {
    var params = [][]byte{id[:], []byte(strconv.Itoa(int(delay / time.Millisecond)))}
    return &Command{[]byte("REQ"), params, nil}
}

writeLoop

上面說到的 message 對象對應的一系列回覆 nsqd 的方法(Requeue,Finish),都是將信息發送到了 Consumer 的 connection 對象中的 msgResponseChan 管道。而 writeLoop goroutine 會讀取該管道,並將信息發回到 nsqd。

func (c *Conn) writeLoop() {
    for {
        select {
            ......
        case resp := <-c.msgResponseChan:
            //consumer端信息處理結束,正在處理的信息數減一
            msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)

            //一些標記的操作
            if resp.success {
                ......
            } else {
                ......
            }
            //將指令寫回nsqd
            err := c.WriteCommand(resp.cmd)
            if err != nil {
                c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
                c.close()
                continue
            }

            ......
        }
    }

}

nsqd 端原理

上面的流程是 consumer 客戶端的消費邏輯,接下來介紹下 nsqd 服務端的消息處理邏輯。

nsq 中的'channel'類似於消費者組,但是 channel 是 topic 下一級的概念,channel 不能訂閱多個 topic。每個 topic 可能被多個 channel 訂閱,每個 channel 可能被多個 consumer 消費,類似下圖。

Topic

當 topic 創建時候,會初始化一個 Topic 對象。裏面包含一個 chan 類型的 memoryMsgChan,用於臨時存放 producer 發來的消息,由於 chan 是有限度的,所以當發生消息積壓打到 chan 的容量時候,後續的發來的消息會被持久化,BackendQueue 提供了持久化的接口。

在初始化 topic 的時候,也會創建一個異步 messagePump goroutine 去服務這個 topic,比如會從 memoryMsgChan 管道中拿消息分發到下游 channel 中。

type Topic struct {
    ......
    name string
    //存放多個channel
    channelMap map[string]*Channel
    //用於做磁盤持久化的接口
    backend BackendQueue
    //用於存放消息的管道
    memoryMsgChan chan *Message
    //channel的狀態發生改變的通知從這裏獲取
    channelUpdateChan chan int
    waitGroup util.WaitGroupWrapper
    exitFlag int32
    idFactory *guidFactory

    ephemeral bool
    deleteCallback func(*Topic)
    deleter        sync.Once

    paused    int32
    pauseChan chan int
    //nsqd全局對象
    nsqd *NSQD
}

接收到消息時,會調用 Topic 的 PutMessage->put 方法,該方法將 message 發送到 memoryMsgChan 管道,如果阻塞,調用 writeMessageToBackend 方法持久化(這裏的 backend 對象是使用的一個開源三方庫 go-diskqueue)。

func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        err := writeMessageToBackend(m, t.backend)
        t.nsqd.SetHealth(err)
        if err != nil {
            return err
        }
    }
    return nil
}

messagePump 會從 memoryMsgChan 和 backend 中去讀取消息(backend 實現了 ReadChan 方法,可以讀到存取的數據)。

讀取到的數據會分發到該 topic 下面的 channel 中。

func (t *Topic) messagePump() {
    var msg *Message
    var buf []byte
    var err error
    //topic旗下的channel
    var chans []*Channel
    //兩個讀取消息的管道
    var memoryMsgChan chan *Message
    var backendChan <-chan []byte
    ......

    //有可能出現topic創建了,但是channel還沒創建的情況(也就是消費者
    //還沒有建立的情況,當channel創建好,會往channelUpdateChan發信號),

    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()
    if len(chans) > 0 && !t.IsPaused() {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }

    for {
        select {
        //從memoryMsgChan中收到消息
        case msg = <-memoryMsgChan:
        //從backend中收到消息
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            if err != nil {
                t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                continue
            }
        //當收到信號,代表消費者已經建立好了,會對memoryMsgChan和backendChan
        //進行重新賦值,會將Topic裏面的對象賦值過來。
        case <-t.channelUpdateChan:
            chans = chans[:0]
            t.RLock()
            //將channel對象追加到chans中
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) ==|| t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                //對兩個管道進行重新賦值
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        ......
        }

        //分別對每個channel去發送消息
        for i, channel := range chans {
            chanMsg := msg
            //確保每條消息在每個管道唯一性的操作
            if i > 0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            //查看該消息是否要延遲發送
            if chanMsg.deferred !={
                //如果要延遲發送,放到channel的延遲隊列中
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            //調用PutMessage發送
            err := channel.PutMessage(chanMsg)
            ......
        }
    }
}

channel

channel 是跟消費者 Consumer 去對接的,本身也需要一些能保存消息的數據結構,以確保消息的不丟失。在初始化時候會初始化一些存放消息的結構,比如帶緩衝區的 memoryMsgChan chan,也有一個 backend 用於持久化消息的組件,因爲需要預防自身的 memoryMsgChan 滿了的情況。

由於和消費者 Consumer 聯繫較爲緊密,所以需要確保消息提供的可靠性,也就是消息從發送到 ConsumerConsumer 返回消息響應這個時間段消息不丟失,需要有一個數據結構來對消息做標記,這就是 InFlight 機制。

type Channel struct {
    //自身的名稱等參數
    //.....
    nsqd *NSQD

    //channel用於持久化消息的管道
    backend BackendQueue

    //Channel用於存放消息的管道
    memoryMsgChan chan *Message
    exitFlag int32
    exitMutex sync.RWMutex
    ......

    // 延遲消息的存放位置
    deferredMessages map[MessageID]*pqueue.Item
    deferredPQ pqueue.PriorityQueue
    deferredMutex sync.Mutex

    //用於標記消息的數據結構
    inFlightMessages map[MessageID]*Message
    inFlightPQ inFlightPqueue
    inFlightMutex sync.Mutex
}

在 channel 發送消息調用 PutMessage 方法時,會調用自身的 put 方法,這裏和 Topic 的方法一樣.

func (c *Channel) put(m *Message) error {
    select {
    //發送到自身的memoryMsgChan,否則進行持久化
    case c.memoryMsgChan <- m:
    default:
        err := writeMessageToBackend(m, c.backend)
        c.nsqd.SetHealth(err)
        ......
    }
    return nil
}

這個時候的 memoryMsgChan 的消費端就是消費者 Consumer 創建的連接了。

對於每一個 Consumer 消費者,nsqd 會創建一個 Handle goroutine 去服務它。該 goroutine 會執行 IOLoop 方法,會異步啓動一個 messagePump goroutine,該協程從 Channel 中接收消息,並轉發到 Consumer。IOLoop 也會讀取 Consumer 發來的消息,並作出對應的處理。

當和 Consumer 建立連接時候,首先會收到一條 SUB 指令,來確定 Consumer 指定的 topic 和 channel。並將該的 Channel 對象傳到 messagePump goroutine 中,用於消費 channel 中消息。

func (p *protocolV2) IOLoop(c protocol.Client) error {
    var err error
    var line []byte
    var zeroTime time.Time

    client := c.(*clientV2)

    messagePumpStartedChan := make(chan bool)
    //異步啓動messagePump處理channel中的消息
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan

    //接收Consumer發來的指令
    for {
        ......
        line, err = client.Reader.ReadSlice('\n')
        if err != nil {
            ......
            break
        }

        //解析指令(FIN/REQ/SUB....)
        line = line[:len(line)-1]
        ......
        params := bytes.Split(line, separatorBytes)

        //應用指令
        response, err = p.Exec(client, params)
        if err != nil {
            ......
            continue
        }

        if response != nil {
            err = p.Send(client, frameTypeResponse, response)
            if err != nil {
                err = fmt.Errorf("failed to send response - %s", err)
                break
            }
        }
    }

    return err
}
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    ......
    for {
        ......

        select {
        ......
        //發送心跳信息給Consumer
        case <-heartbeatChan:
            err = p.Send(client, frameTypeResponse, heartbeatBytes)
            ......
        //發送磁盤持久化的信息給Consumer
        case b := <-backendMsgChan:
            ...解析一下信息
            msg, err := decodeMessage(b)
            if err != nil {
                p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                continue
            }
            //該信息即將發送,需要在channel的InFlight中保存一份。
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            //發送消息給Consumer
            err = p.SendMessage(client, msg)
            if err != nil {
                goto exit
            }
            flushed = false
        //發送管道中的信息給Consumer
        case msg := <-memoryMsgChan:
            ......
            //該信息即將發送,也需要在channel的InFlight中保存一份。
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            err = p.SendMessage(client, msg)
            if err != nil {
                goto exit
            }
            flushed = false
        ......
        }
    }
}

處理 Consumer 客戶端發來的請求

FIN

當 nsqd 收到 Consumer 傳來的 FIN 指令時,代表該消息已經處理結束,不需要繼續保存,此時可以從 Inflight 中移除該消息。

REQ

對於重新入隊的消息,會首先從 InFlight 中刪除該消息,並將消息以及下一次消息發送的時間存到 channel 的 deferredMessages 延時發送結構中,等待到時間將消息重新發送到 memoryMsgChan 管道或者 backend 中,實現重新發送的目的。

當然還有很多指令,這裏不一一列舉。

延遲隊列

上文講到 Channel 中有一個延遲發送的數據結構 deferredMessages。它是怎樣延遲發送的呢?

在 nsqd 啓動時候,後臺會異步啓動一個 queueScanLoop 協程。它通過定時器定期去檢查進程中關於 channel 的變化,並調用 resizePool 對所有的 channel 作出對應的調整(清理各個 channel 中超時的消息和檢查延遲隊列中的消息)。

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    //workCh中放着所有的channel,遍歷所有的channel並作處理
    for {
        select {
        //從workCh中取出channel
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            //檢查inflight中超時的消息,在nsqd發送消息出去時候,
            //會設置超時時間,如果沒返回FIN等信號,認爲超時,重新發送該消息
            if c.processInFlightQueue(now) {
                dirty = true
            }
            //檢查延遲隊列中的消息是否到了要發送的時間並進行發送
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        ......
        }
    }
}

上面提到過,在 Channel 中關於延遲隊列和 inflight 隊列是分別用兩個結構去存儲的,一個是 map 結構,用於標記該消息,確保唯一性;另一個是小頂堆實現的,通過時間對消息進行排序組織,來達到取出超時或者到時間的消息的目的。

type Channel struct {
    //延遲消息的存儲結構,用於標記消息
    deferredMessages map[MessageID]*pqueue.Item
    //最小堆實現的隊列,用於對消息通過時間進行排序
    deferredPQ pqueue.PriorityQueue

    //發送中的消息的存儲結構
    inFlightMessages map[MessageID]*Message
    //最小堆實現的隊列,用於對消息通過時間進行排序
    inFlightPQ inFlightPqueue
}
func (c *Channel) processDeferredQueue(t int64) bool {
    ......
    dirty := false
    for {
        c.deferredMutex.Lock()
        //檢查堆頂是否有超時的消息,有就去掉
        item, _ := c.deferredPQ.PeekAndShift(t)
        c.deferredMutex.Unlock()

        //當取出的元素爲空即沒有超時消息,跳出循環。
        if item == nil {
            goto exit
        }
        dirty = true

        msg := item.Value.(*Message)
        //刪掉deferredMessages中的消息
        _, err := c.popDeferredMessage(msg.ID)
        if err != nil {
            goto exit
        }
        //調用put方法將消息放到
        c.put(msg)
    }

exit:
    return dirty
}

小結

Reference

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/FaIe30sU-mNhpbaw-nKouw