使用 Go 實現可用 select 監聽的隊列

1. 背景與選型

《基於 Redis Cluster 的分佈式鎖實現以互斥方式操作共享資源》一文一樣,今天要說的 Go 隊列方案也是有一定項目背景的。

5G 消息方興未艾 [1]!前一段時間從事了一段時間 5G 消息網關的研發,但凡涉及類似消息業務的網關,我們一般都離不開隊列這種數據結構的支持。這個 5G 消息網關項目採用的是 Go 技術棧開發,那麼我們應該如何爲它選擇一個與業務模型匹配且性能不差的實現呢?

如今一提到消息隊列,大家第一個想到的一定是 kafka[2],kafka 的確是一款優秀的分佈式隊列中間件,但對於我們這個系統來說,它有些 “重”,部署和運維都有門檻,並且項目組裏也沒有能很好維護它的專家,畢竟 “可控” 是技術選擇的一個重要因素。除此之外,我們更想在 Go 技術棧的生態中挑選,但 kafka 是 Java 實現的。

Go 圈裏在性能上能與 kafka“掰掰手腕” 的成熟選手不多,nats[3] 以及其主持持久化的子項目 nats-streaming[4] 算是其中兩個。不過 nats 的消息送達模型是:At-least-once-delivery,即至少送一次(而沒有 kafka 的精確送一次的送達模型)。一旦消費者性能下降,給 nats server 返回的應答超時,nats 就會做消息的重發處理:即將消息重新加入到隊列中。這與我們的業務模型不符,即便 nats 提供了發送超時的設定,但我們還是無法給出適當的 timeout 時間。Go 圈裏的另一個高性能分佈式消息隊列 nsq[5] 採用的也是 “至少送一次” 的消息送達模型 [6],因此也無法滿足我們的業務需求。

我們的業務決定了我們需要的隊列要支持 “多生產者多消費者” 模型,Go 語言內置的 channel 也是一個不錯的候選。經過多個 Go 版本的打磨和優化,channel 的 send 和 recv 操作性能在一定數量 goroutine 的情況下已經可以滿足很多業務場景的需求了。但 channel 還是不完全滿足我們的業務需求。我們的系統要求儘可能將來自客戶端的消息接收下來並緩存在隊列中。即便下游發送性能變慢,也要將客戶消息先收下來,而不是拒收或延遲響應。而 channel 本質上是一個具有 “靜態大小” 的隊列並且 Go 的 channel 操作語義會在 channel buffer 滿的情況下阻塞對 channel 的繼續 send,這就與我們的場景要求有背離,即便我們使用 buffered channel,我們也很難選擇一個合適的 len 值,並且一旦 buffer 滿,它與 unbuffered channel 行爲無異。

這樣一來,我們便選擇自己實現一個簡單的、高性能的滿足業務要求的隊列,並且最好能像 channel 那樣可以被 select 監聽到數據 ready,而不是給消費者帶去 “心智負擔” :消費者採用輪詢的方式查看隊列中是否有數據。

2. 設計與實現方案

要設計和實現這樣一個隊列結構,我們需要解決三個問題:

我們逐一來看!

1) 基礎隊列結構實現來自一個未被 Go 項目採納的技術提案

隊列是最基礎的數據結構,實現一個 “先進先出(FIFO)” 的練手 queue 十分容易,但實現一份能加入標準庫、資源佔用小且性能良好的 queue 並不容易。Christian Petrin[7] 在 2018 年 10 月份曾發起一份關於 Go 標準庫加入 queue 實現的技術提案 [8],提案對基於 array 和鏈表的多種 queue 實現 [9] 進行詳細的比對,並最終給出結論:impl7[10] 是最爲適宜和有競爭力的標準庫 queue 的候選者。雖然該技術提案目前尚未得到 accept,但 impl7 足可以作爲我們的內存隊列的基礎實現。

2) 爲 impl7 添加併發支持

在性能敏感的領域,我們可以直接使用 sync 包提供的諸多同步原語來實現 goroutine 併發安全訪問,這裏也不例外,一個最簡單的讓 impl7 隊列實現支持併發的方法就是使用 sync.Mutex 實現對隊列的互斥訪問。由於 impl7 並未作爲一個獨立的 repo 存在,我們將其代碼 copy 到我們的實現中 (queueimpl7.go),並將其包名由 queueimpl7 改名爲 queue:

// github.com/bigwhite/experiments/blob/master/queue-with-select/safe-queue1/queueimpl7.go

// Package queueimpl7 implements an unbounded, dynamically growing FIFO queue.
// Internally, queue store the values in fixed sized slices that are linked using
// a singly linked list.
// This implementation tests the queue performance when performing lazy creation of
// the internal slice as well as starting with a 1 sized slice, allowing it to grow
// up to 16 by using the builtin append function. Subsequent slices are created with
// 128 fixed size.
package queue

// Keeping below as var so it is possible to run the slice size bench tests with no coding changes.
var (
        // firstSliceSize holds the size of the first slice.
        firstSliceSize = 1

        // maxFirstSliceSize holds the maximum size of the first slice.
        maxFirstSliceSize = 16

        // maxInternalSliceSize holds the maximum size of each internal slice.
        maxInternalSliceSize = 128
)
... ...

下面我們就來爲以 queueimpl7 爲底層實現的 queue 增加併發訪問支持:

// github.com/bigwhite/experiments/blob/master/queue-with-select/safe-queue1/safe-queue.go

package queue

import (
 "sync"
)

type SafeQueue struct {
 q *Queueimpl7
 sync.Mutex
}

func NewSafe() *SafeQueue {
 sq := &SafeQueue{
  q: New(),
 }

 return sq
}

func (s *SafeQueue) Len() int {
 s.Lock()
 n := s.q.Len()
 s.Unlock()
 return n
}

func (s *SafeQueue) Push(v interface{}) {
 s.Lock()
 defer s.Unlock()

 s.q.Push(v)
}

func (s *SafeQueue) Pop() (interface{}, bool) {
 s.Lock()
 defer s.Unlock()
 return s.q.Pop()
}

func (s *SafeQueue) Front() (interface{}, bool) {
 s.Lock()
 defer s.Unlock()
 return s.q.Front()
}

我們建立一個新結構體 SafeQueue,用於表示支持併發訪問的 Queue,該結構只是在 queueimpl7 的 Queue 的基礎上嵌入了 sync.Mutex。

3) 支持 select 監聽

到這裏支持併發的 queue 雖然實現了,但在使用上還存在一些問題,尤其是對消費者而言,它只能通過輪詢的方式來檢查隊列中是否有消息。而 Go 併發範式中,select 扮演着重要角色,如果能讓 SafeQueue 像普通 channel 那樣能支持 select 監聽,那麼消費者在使用時的心智負擔將大大降低。於是我們得到了下面第二版的 SafeQueue 實現:

// github.com/bigwhite/experiments/blob/master/queue-with-select/safe-queue2/safe-queue.go

package queue

import (
 "sync"
 "time"
)

const (
 signalInterval = 200
 signalChanSize = 10
)

type SafeQueue struct {
 q *Queueimpl7
 sync.Mutex
 C chan struct{}
}

func NewSafe() *SafeQueue {
 sq := &SafeQueue{
  q: New(),
  C: make(chan struct{}, signalChanSize),
 }

 go func() {
  ticker := time.NewTicker(time.Millisecond * signalInterval)
  defer ticker.Stop()
  for {
   select {
   case <-ticker.C:
    if sq.q.Len() > 0 {
     // send signal to indicate there are message waiting to be handled
     select {
     case sq.C <- struct{}{}:
      //signaled
     default:
      // not block this goroutine
     }
    }
   }
  }

 }()

 return sq
}

func (s *SafeQueue) Len() int {
 s.Lock()
 n := s.q.Len()
 s.Unlock()
 return n
}

func (s *SafeQueue) Push(v interface{}) {
 s.Lock()
 defer s.Unlock()

 s.q.Push(v)
}

func (s *SafeQueue) Pop() (interface{}, bool) {
 s.Lock()
 defer s.Unlock()
 return s.q.Pop()
}

func (s *SafeQueue) Front() (interface{}, bool) {
 s.Lock()
 defer s.Unlock()
 return s.q.Front()
}

從上面代碼看到,每個 SafeQueue 的實例會伴隨一個 goroutine,該 goroutine 會定期 (signalInterval) 掃描其所綁定的隊列實例中當前消息數,如果大於 0,則會向 SafeQueue 結構中新增的 channel 發送一條數據,作爲一個 “事件”。SafeQueue 的消費者則可以通過 select 來監聽該 channel,待收到“事件” 後調用 SafeQueue 的 Pop 方法獲取隊列數據。下面是一個 SafeQueue 的簡單使用示例:

// github.com/bigwhite/experiments/blob/master/queue-with-select/main.go
package main

import (
 "fmt"
 "sync"
 "time"

 queue "github.com/bigwhite/safe-queue/safe-queue2"
)

func main() {
 var q = queue.NewSafe()
 var wg sync.WaitGroup

 wg.Add(2)
 // 生產者
 go func() {
  for i := 0; i < 1000; i++ {
   time.Sleep(time.Second)
   q.Push(i + 1)

  }
  wg.Done()
 }()

 // 消費者
 go func() {
 LOOP:
  for {
   select {
   case <-q.C:
    for {
     i, ok := q.Pop()
     if !ok {
      // no msg available
      continue LOOP
     }

     fmt.Printf("%d\n", i.(int))
    }
   }

  }

 }()

 wg.Wait()
}

從支持 SafeQueue 的原理可以看到,當有多個消費者時,只有一個消費者能得到 “事件” 並開始消費。如果隊列消息較少,只有一個消費者可以啓動消費,這個機制也不會導致 “驚羣”;當隊列中有源源不斷的消費產生時,與 SafeQueue 綁定的 goroutine 可能會連續發送“事件”,多個消費者都會收到事件並啓動消費行爲。在這樣的實現下,建議消費者在收到“事件” 後持續消費,直到 Pop 的第二個返回值返回 false(代表隊列爲空),就像上面示例中的那樣。

這個 SafeQueue 的性能 “中規中矩”,比 buffered channel 略好 (Go 1.16 darwin 下跑的 benchmark):

$go test -bench .
goos: darwin
goarch: amd64
pkg: github.com/bigwhite/safe-queue/safe-queue2
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkParallelQueuePush-8              10687545        110.9 ns/op       32 B/op        1 allocs/op
BenchmarkParallelQueuePop-8               18185744         55.58 ns/op        0 B/op        0 allocs/op
BenchmarkParallelPushBufferredChan-8      10275184        127.1 ns/op       16 B/op        1 allocs/op
BenchmarkParallelPopBufferedChan-8        10168750        128.8 ns/op       16 B/op        1 allocs/op
BenchmarkParallelPushUnBufferredChan-8     3005150        414.9 ns/op       16 B/op        1 allocs/op
BenchmarkParallelPopUnBufferedChan-8       2987301        402.9 ns/op       16 B/op        1 allocs/op
PASS
ok   github.com/bigwhite/safe-queue/safe-queue2 11.209s

注:BenchmarkParallelQueuePop-8 因爲是讀取空隊列,所以沒有分配內存,實際情況是會有內存分配的。另外併發 goroutine 的模擬差異可能導致有結果差異。

3. 擴展與問題

上面實現的 SafeQueue 是一個純內存隊列,一旦程序停止 / 重啓,未處理的消息都將消失。一個傳統的解決方法是採用 wal(write ahead log) 在推隊列之前將消息持久化後寫入文件,在消息出隊列後將消息狀態也寫入 wal 文件中。這樣重啓程序時,從 wal 中恢復消息到各個隊列即可。我們也可以將 wal 封裝到 SafeQueue 的實現中,在 SafeQueue 的 Push 和 Pop 時自動操作 wal,並對 SafeQueue 的使用者透明,不過這裏有一個前提,那就是隊列消息的可序列化(比如使用 protobuf)。另外 SafeQueue 還需提供一個對外的 wal 消息恢復接口。大家可以考慮一下如何實現這些。

另外在上述的 SafeQueue 實現中,我們在給 SafeQueue 增加 select 監聽時引入兩個 const:

const (
 signalInterval = 200
 signalChanSize = 10
)

對於 SafeQueue 的使用者而言,這兩個默認值可能不滿足需求,那麼我們可以將 SafeQueue 的 New 方法做一些改造,採用 “功能選項 (functional option)” 的模式 [11] 爲用戶提供設置這兩個值的可選接口,這個 “作業” 也留給大家了 ^_^。

本文所有示例代碼可以在這裏 [12] 下載 - https://github.com/bigwhite/experiments/tree/master/queue-with-select。

參考資料

[1]  5G 消息方興未艾: https://51smspush.com

[2]  kafka: https://kafka.apache.org/

[3]  nats: https://github.com/nats-io/nats-server

[4]  nats-streaming: https://github.com/nats-io/nats-streaming-server

[5]  nsq: https://github.com/nsqio/nsq

[6]  “至少送一次” 的消息送達模型: https://nsq.io/overview/features_and_guarantees.html

[7]  Christian Petrin: https://github.com/christianrpetrin

[8]  關於 Go 標準庫加入 queue 實現的技術提案: https://github.com/golang/proposal/blob/master/design/27935-unbounded-queue-package.md

[9]  多種 queue 實現: https://github.com/christianrpetrin/queue-tests

[10]  impl7: https://github.com/christianrpetrin/queue-tests/tree/master/queueimpl7/queueimpl7.go

[11]  “功能選項 (functional option)” 的模式: https://www.imooc.com/read/87/article/2424

[12]  這裏: https://github.com/bigwhite/experiments/tree/master/queue-with-select

[13]  改善 Go 語⾔編程質量的 50 個有效實踐: https://www.imooc.com/read/87

[14]  Kubernetes 實戰:高可用集羣搭建、配置、運維與應用: https://coding.imooc.com/class/284.html

[15]  我愛發短信: https://51smspush.com/

[16]  鏈接地址: https://m.do.co/c/bff6eed92687

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/GeM_W3Ea7uHCCgbfrHGHsg