Go 實現雙 buffer id 生成器

本文通過 Go 實現一個 Leaf——美團點評分佈式 ID 生成系統 雙 buffer 的 id 獲取器。

一. 主流程

主流程看圖似乎步驟非常多,但實際上總結起來就以下三個步驟:

  1. 客戶端發起請求

  2. id 獲取器從緩存中獲取

  3. 如果 id 已經被消耗超過閾值,則重新加載新的號段到內存中

接下來我們來具體看一下 Go 代碼是如何實現的

二. 代碼實現

2.1 Segment

存儲了發號的具體遊標信息

type Segment struct {
 Cursor   uint64
 MaxId    uint64
 MinId    uint64
 IsInitOk bool 
}

1.Segment.Cursor 是當前發放的遊標

2.Segment.MaxId 和 Segment.MinId 是 id 號段的範圍

3.Segment.IsInitOk 標識了這個號段是否初始化完成

這裏號段的實現可以使用 DB 的形式來實現

BeginUPDATEtableSET max_id=max_id+step WHERE segment_type=xxxSELECT tag, max_id, step FROMtableWHERE segment_type=xxxCommit

2.2 SegmentAlloc

結構體定義

type SegmentAlloc struct {
 SegmentType               int             // 號段業務類型
 CurrentSegmentBufferIndex int             // 當前使用的buffer 下標,會一直使用0的下標
 SegmentBuffers            []*Segment      // 號段的雙buffer
 Step                      uint64          // 號段加載的步長
 IsPreloading              bool            // 是否處於預加載號段的狀態
 UpdateTime                time.Time       // 更新號段的時間
 mutex                     sync.Mutex      // 互斥鎖
 Waiting                   []chan struct{} //等待的客戶端,當號段用戶正處於初始化時,其他協程處於等待狀態,到一定超時時間仍然未完成再返回失敗
}

SegmentAlloc.SegmentBuffer 圖示:

  1. 當 SegmentAlloc.IsPreloading 爲 true 時,其他進程不能進行初始化,避免了同個業務類型重複去初始化 buffer

SegmentAlloc 判斷是否需要對 buffer 進行預加載

func (s *SegmentAlloc) IsNeedPreload() bool {
 // 已經在預加載了
 if s.IsPreloading {
  return false
 }
  // 第二個緩衝區已經準備好 ,這裏之前遺漏了該判斷,會導致只要超過一半就開始去預加載
 if len(s.SegmentBuffers) > 1 {
  return false
 }
 segmentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
 // 當前剩餘的號已經小於步長的一半,則進行加載
 restId := segmentBuffer.MaxId - segmentBuffer.Cursor
 if restId <= s.Step/2 {
  return true
 }
 return false
}

2.SegmentAlloc.Waiting 保存了所有在等待的客戶端協程,等到初始化 buffer 完成後再喚起所有等待的協程

func (s *SegmentAlloc) WakeUpAllWaitingClient() error {
 s.mutex.Lock()
 defer s.mutex.Unlock()
 for _, waiting := range s.Waiting {
  goasync.SafeClose(waiting)
 }
 return nil
}

這裏如果被其他攜程關閉的話則不能再進行二次關閉,所以需要通過下面方法來保證發送信號量不出異常,這裏其實偷懶了實現,不應該使用 panic 和 recover 來處理正常的關閉。

func SafeClose(ch chan struct{}) (ok bool) {
 defer func() {
  if recover() != nil {
   // 已經被其他協程關閉
   ok = false
  }
 }()
 close(ch)
 return true
}

3.SegmentAlloc.UpdateTime 存儲了內存數據更新的時間,如果長時間沒有更新的數據可以進行監控上報,看是否有業務異常

SegmentAlloc 加載完成後如果上一個號段已經用完則刷新 buffer

func (s *SegmentAlloc) IsHasSegment() bool {
 currentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
 // 這裏可能還沒有初始化好,
 if currentBuffer.IsInitOk && currentBuffer.Cursor < currentBuffer.MaxId {
  return true
 }
 return false
}

func (s *SegmentAlloc) IsNewBufferReady() bool {
 if len(s.SegmentBuffers) <= 1 {
  return false
 }
 return true
}

func (s *SegmentAlloc) RefreshBuffer() {
 // 當前buffer 仍然有號則不需要刷新,因爲可能其他協程已經刷新了buffer區
 if s.IsHasSegment() {
  return
 }
 if !s.IsNewBufferReady() {
  return
 }
 s.SegmentBuffers = append(s.SegmentBuffers[:0], s.SegmentBuffers[1:]...)
}

Segment 獲取新的 id 號

func (s *SegmentAlloc) GetId() uint64 {
 if s.IsHasSegment() {
  currentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
  id := atomic.AddUint64(¤tBuffer.Cursor, 1)
  s.UpdateTime = time.Now()
  return id
 }
 return 0
}

func (s *SegmentAlloc) IsRightId(id uint64) bool {
 return id > 0
}

2.3 SegmentCache

存儲了多個業務的 SegmentCache,並且通過管理信號量來看是否需要進行預加載

type SegmentCache struct {
 cache            sync.Map
 loadSegmentEvent chan int // 加載號段的信號量
}

1.**SegmentCache.loadSegmentEvent :**通過信號量的方式來看是否需要觸發預加載

2.**SegmentCache.cache :**存儲了已經分配了 SegmentAlloc 數據

存儲和獲取 SegmentCache 中的 SegmentAlloc

func (s *SegmentCache) Add(alloc *SegmentAlloc) int {
 s.cache.Store(alloc.SegmentType, alloc)
 return alloc.SegmentType
}

func (s *SegmentCache) Get(segmentType int) *SegmentAlloc {
 v, ok := s.cache.Load(segmentType)
 if ok {
  return v.(*SegmentAlloc)
 }
 return nil
}

讀取和寫入 load alloc 的信號量

func (s *SegmentCache) LoadEvent() <-chan int {
 return s.loadSegmentEvent
}

func (s *SegmentCache) WriteLoadEvent(segmentAlloc *SegmentAlloc) {
 segmentAlloc.IsPreloading = trues.loadSegmentEvent <- segmentAlloc.SegmentType
}

2.4 Service 實現

客戶端獲取新的業務 id 流程圖

通過上圖可以看到客戶端獲取 id 的主要流程。

  1. 先嚐試去獲取 Id,如果成功則直接返回

  2. 如果失敗則等待號段加載

  3. 等待一定時間,如果超過等待時長則直接返回錯誤。

具體代碼如下:

func (service *IdGenerateService) GetSegmentId(ctx context.Context, segmentType int) (id uint64, err error) {
 segmentAlloc := service.segmentCache.Get(segmentType)
 if segmentAlloc == nil {
  return 0, errors.WithStack(err)
 }

 id, err = service.nextId(ctx, segmentAlloc)
 if err != nil {
  return 0, errors.WithStack(err)
 }
 return id, nil}

func (service *IdGenerateService) nextId(ctx context.Context, segmentAlloc *internalDefine.SegmentAlloc) (id uint64, err error) {
 if segmentAlloc == nil {
  return 0, define.RespGenerateIdErr
 }

 segmentAlloc.Lock()
 defer segmentAlloc.Unlock()

 id = segmentAlloc.GetId()

 if segmentAlloc.IsNeedPreload() {
  service.segmentCache.WriteLoadEvent(segmentAlloc)
 }

 // 1.如果已經獲取到正確的id則直接返回
 if segmentAlloc.IsRightId(id) {
  return id, nil
 }

 // 2.如果沒拿到號段 ,在這裏加入等待隊列,前面已經發出事件開始加載,避免多個協程同時進行加載
 waitChan := make(chan struct{}, 1)
 segmentAlloc.Waiting = append(segmentAlloc.Waiting, waitChan)
 // 3.讓其他客戶端可以走前面的步驟,進入到等待狀態
 segmentAlloc.Unlock()

 // 3.最多等待500ms,超過等待時間則直接返回錯誤
 timer := time.NewTimer(500 * time.Millisecond)
 select {
 case <-waitChan:
 case <-timer.C:
 }

 segmentAlloc.Lock()
 segmentAlloc.RefreshBuffer()
 id = segmentAlloc.GetId()
 if segmentAlloc.IsRightId(id) {
  return id, nil
 }
 return 0, define.RespGenerateIdErr
}

監聽信號量加載新的號段,這裏會監聽寫入的信號量異步去加載號段數據。

func (service *IdGenerateService) watchSegmentLoadEvent(ctx context.Context) {
 for {
  select {
  case <-ctx.Done():
   returncase segmentType, ok := <-service.segmentCache.LoadEvent():
   if !ok {
    continue
   }
   err := service.loadSegmentAllocBuffer(ctx, segmentType)
   if err != nil {
    logger.CtxLogErrorf(ctx, "loadSegmentAllocBufferErr :%+v", err)
    continue
   }
  default:
  }
 }
}

// loadSegmentAllocBuffer
// @Description: 預先加載id號,使用時直接從內存中獲取,避免每次去請求IDCS
func (service *IdGenerateService) loadSegmentAllocBuffer(ctx context.Context, segmentType int) (err error) {
 segmentAlloc := service.segmentCache.Get(segmentType)
 defer func() {
  segmentAlloc.IsPreloading = falseif err == nil {
   wakeupErr := segmentAlloc.WakeUpAllWaitingClient()
   if wakeupErr != nil {
    logger.CtxLogErrorf(ctx, "wake up all client err : %+v", wakeupErr)
   }
  }
 }()
 var (
  id uint64
 )
 for i := 0; i < 3; i++ {
  id, err = service.dao.GetSeqIdBySegmentType(ctx, segmentType)
  if err != nil {
   logger.CtxLogErrorf(ctx, "idCenterApi.GetSeqIdBySegmentTypeErr :%+v", err)
   continue}
  var (
   minId = id - segmentAlloc.Step
   maxId = id
  )
  segment := internalDefine.NewSegment(minId, maxId)
  segmentAlloc.SegmentBuffers = append(segmentAlloc.SegmentBuffers, segment)
  return nil}
 if err != nil {
  return errors.WithStack(err)
 }
 return nil
}

三. 關於高可用

3.1. 爲什麼不使用 chan 進行實現 buffer

因爲這樣的話無法實現在客戶端請求 QPS 暴增的情況下對 id 的號進行一個預先的加載,並且內存裏面會一直存在 chan 內部的數據。

通過雙 Buffer 的模式可以在 QPS 暴漲的情況下預留一批號在內存中,並且可以通過動態增加步長的方式來增加緩存號。

3.2. 動態步長調整

通過增加號段消耗時間來對步長進行動態調整,如果在短時間內號段很快被消耗了,那麼可以將步長翻倍進行獲取,緩存更多的號段在內存內。

這樣雙 Buffer 在數據庫獲取號段宕機的一段時間內仍然能夠獲取 id 號。

3.3. 爲什麼用內存緩存而不是 Redis?

該功能本身的目的用於隔離第三方的不穩定,Redis 本身也非可靠的服務,所以雖然本地內存無法實現分佈式的,並且有可能會浪費一部分號段,但是對於可用性的提升更爲顯著

四. 生產場景優化

4.1. 初始步長設置問題

如果業務場景中每一次操作就會剛好把 id 耗盡,那麼初始化的時候需要將對應類型的緩存增大,但是也不能一次性設置的過大,這樣會導致重新去準備號段的時候時間過長,雖然使用超過一半就會重新加載,但是如果峯值流量過大的情況下,消耗完了還沒加載完的話就會導致客戶端陷入等待,走兜底去獲取,也會造成接口變慢。

並且在服務第一次啓動時,如果強依賴了該組件,則會影響服務重啓的速度,所以目前選擇的弱依賴的形式,未全部 Loading 完成則不能進行獲取,直接兜底返回,這樣會存在的一個問題是服務重啓並且沒有全部完成加載時,接口 RT 會短時間的抖動。所以如果不擔心服務重啓速度的話可以進行強依賴。

4.2. 號段加載速度

加載 buffer 的時候可以通過分段優化,對於初始值較大的號段,可以通過多協程來對不同段進行加載,從而提升加載速度,這裏沒做是擔心對下游服務造成過大壓力。

4.3. 鎖粒度過大導致客戶端等待

加載的過程中實際上是不阻塞客戶端的獲取,因爲加載過程中是使用第一個 buffer, 操作時第二個 buffer 不受影響,所以無需鎖定整個 segment 導致客戶端陷入等待,加載和獲取互不影響,所以只在最後需要變更數據的時候進行上鎖即可。

4.4. 客戶端等待時長的設置

此處最開始設置等待 500ms,但是存在的一個問題是如果一次性獲取 100 個號並且內存中都未準備好時,就會等待 100*500ms , 這對於一個業務接口來說是不可接受的,所以將單個等待的超時時間修改爲 5ms

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/cPLozuxxGOZbwsGUxFdoFw