NSQ 原理解析

不知道大家有沒有這種感覺,在做 ppt 彙報,或者一些分享時候,準備的如果用 100 來打比方,最終演講出來的效果可能不到 80,導致的因素一方面是緊張導致沒說清楚,一方面是講着講着忘記了。

當然,作爲聽衆,可能聽到的效果不足 30,因爲如果內容比較枯燥,到後半程真正會去聽的也很少;或者中途走神了,再去聽後面的已經銜接不上了,只能繼續擺爛。所以,提前拋出一些有意思的話題,可以儘可能讓聽者多聽一會。

後面的資料補充也很重要,作爲分享知識的補充,便於聽者在沒有聽清楚時候後面翻看,雖然基本也很少有人會去翻看。

不過做一場技術分享,是一件使技術人的技術情懷得到滿足的事情。

作爲一個技術人,多少還是有一些技術情懷的,主要體現在這些方面:

  1. 完成一個模塊,對自己的代碼沾沾自喜一陣,雖然過幾天就看不懂了

  2. 看一些開源項目,提個 issue,pr,利用開源項目裏面的一些做法解決了一些業務上的難題,沒白看

  3. 幫助別人解決技術問題

  4. 寫一些技術文章總結,做技術分享

說是技術情懷,也是技術人的成就感。

本文只粘貼了部分我覺得最重要的內容的 ppt 頁,偏向工作原理性質的東西,通過該文章,可以瞭解到一款最簡單的消息隊列是怎麼實現的,該文章不像以往文章大段源碼,讀起來比較輕鬆,希望爲讀者工作中的某些問題提供一些思路。

message in producer

  1. 客戶端的 sdk 提供了 publish 等方法,當客戶端調用 Publish 等發送消息時候。

  2. 客戶端與 nsqd 會建立 tcp 連接,這裏是懶加載的模式,建立連接的同時,客戶端會啓動 routerloop,readloop 等任務

  3. nsqd 裏面的 tcpserver 會啓動一個 IOLoop 來服務這條 tcp 連接,

  4. 一切就緒了,消息會被封裝成一個 transaction,這個 transaction 包含消息體和一個 chan,並等待這個 chan,chan 返回則消息發送成功,這個 transaction 被放到 transactionChan 管道里,

  5. 管道會被 routerloop 消費,

  6. 進而將這個 transaction append 到 transactions 數組裏面,

  7. 之後會將消息體以及指令發送到 nsqd,nsqd 做處理。

  8. nsqd 處理後返回 ok,

  9. 被 readloop 讀取後發往 router,中間有一個 chan,圖裏沒體現,

  10. router 會將消息對應的 transaction 從切片 transactions 裏面 pop 出來,並把 transaction 中的 chan 釋放。客戶端那邊的方法就會返回。

這裏對 transactions 切片的操作是沒有加鎖的,因爲消息的發送與結束是在一個協程中處理的,是順序的發送。

message in nsqd

先看下 Topic 的數據結構

在每一個 topic 被創建的時候,都會啓動一個對應的 messagePump 協程,負責消費 topic 下的 memoryMsgChan 和 backend 中的 message,並把 message 分發給該 topic 每個 Channel,調用 channel 的 PutMessage 方法。

看一下 channel 的數據結構,主要比 topic 多了四個結構

channel 的 PutMessage 方法實現與 Topic 的一致,也是先寫 chan,滿了就寫 backend;區別是後面消費管道的消費者不同,channel 間接的對接了消費者。

message into consumer

  1. consumer 與 nsqd 建立 tcp 連接

  2. Nsqd 會爲 consumer 開啓一個 IOLoop,啓動一個 messagePump 協程服務;consumer 這邊,會啓動 readloop,writeloop 也來服務這條連接

  3. consumer 發送一個 SUB 指令,指定自己要消費的 topic,channel

  4. 該 topic 與 channel 會被 nsqd 找到

  5. Consumer 向 nsqd 發送 rdy 指令,代表自己已經就緒,nsqd 可以推數據了

  6. 此時該 messagePump 會消費 channel 對應的 memoryMsgChan 和 backend 中的數據

  7. 將消息發送到 consumer,被 consumer 的 readloop 獲取到

  8. Readloop 把消息放到 incomingMessages 信道里面

  9. 會被業務協程獲取到,並對消息進行處理

  10. 業務協程返回後,會對該消息封裝成 Finish 等方法

  11. 發送回到 nsqd,代表該消息已經消費

可以同時啓動多個消費者對同一個 channel 做消費,增強消費能力,nsqd 端對消費者的負載均衡處理方式也很簡單。多個 messagePump 同時去監聽 channel 下的兩種消息管道。

at least once

NSQ 在消息的傳遞過程中遵循的是 at least once,所以在某些時候會出現消息重複的情況;下面是消息在傳遞給消費者過程中的流程。

消息正常處理情況:

  1. Consumer 對應的 messagePump 獲取到一條消息

  2. 計算好消息的超時時間,將消息標記到 channel 的 inflightMessages 中,這是一個 map 結構,用於通過 ID 做標記;將消息放到 InflightPQ 中,這是一個最小堆,最靠近當前時間的消息會放在堆頂

  3. 將消息發送到 consumer,進行處理

  4. 在規定時間內處理完成,發送 Fin 信號,被 consumer 對應的 IOLoop 接收到

  5. 執行 FIN 指令,分別從 map 和最小堆中去掉該消息。

消息超時重傳情況,步驟 1-3 同上:

  1. 消息消費超時,可能是系統卡住等導致的一些情況

  2. 在 nsqd 啓動時候,會啓動一個 queueScanLoop 協程池,會定期去每個 channel 下面的 inflightPQ 最小堆檢查有沒有消息超時,方法就是比較堆頂的元素

  3. 如果超時,就會把消息 pop 出來,並從 map 中刪掉

  4. 將消息重新投遞到 channel 裏面,重新消費

有時候想要對一條消息進行重新消費,就可以通過 REQ 機制完成:

  1. 當客戶端想要將一些消息做重新消費的時候,可以通過 sdk 發送一條 REQ 的指令

  2. 會將消息從 inflight 的最小堆和 map 中雙雙去掉

  3. 將消息放到 deferedmessage map 中,並計算好下次發送的時間放到 defererPQ 最小堆中

  4. 啓動的 queueScanLoop 協程池中的 ququeScanWorker 會對延遲的消息進行檢查

  5. 一旦時間到了,就會取出消息,放到 channel 中去重新消費

磁盤隊列的數據格式

使用注意事項

MaxInFlight

大致講講 MaxInFlight 的原理,其中有很多細節沒有涉及:

併發處理消息的能力指的就是消費者一個時間點,能同時接收到多少條消息,通過這個參數可以控制 nsq 推送消息的速率。

  1. 假設設置 MaxInFlight 設置爲 M,consumer 能連接 nsqd 節點數量爲 N,分配給每一個 nsqd 節點的數值就是 M/N,如果值小於 1,則置爲 1

  2. 會給每個 nsqd 節點發送 RDY 指令時候,攜帶上這個 M/N,此時,nsqd 節點就知道該 consumer 最多一次接收多少了,設置成 rdyCount

  3. 當發送消息之前,會對比一下自己的 inflight 參數,如果小於 rdyCount(M/N),就發送,並自增這個 inflight,大於就不發送,等待通知(select default 嘗試通知)

  4. 當消息結束時候,會自減 inflifht,並通過 channel 嘗試通知到 3 步驟。實現一個類似滑動窗口的作用

NSQ 犧牲了一些副本機制,實現了高效與簡單。代碼還是很通俗易懂,感興趣的同學可以抽個時間研究一下。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/pve_gzKI_f0Mnl8ZYCitZQ