Go 實現雙 buffer id 生成器
本文通過 Go
實現一個 Leaf——美團點評分佈式 ID 生成系統 雙 buffer 的 id 獲取器。
一. 主流程
主流程看圖似乎步驟非常多,但實際上總結起來就以下三個步驟:
-
客戶端發起請求
-
id 獲取器從緩存中獲取
-
如果 id 已經被消耗超過閾值,則重新加載新的號段到內存中
接下來我們來具體看一下 Go 代碼是如何實現的
二. 代碼實現
2.1 Segment
存儲了發號的具體遊標信息
type Segment struct {
Cursor uint64
MaxId uint64
MinId uint64
IsInitOk bool
}
1.Segment.Cursor 是當前發放的遊標
2.Segment.MaxId 和 Segment.MinId 是 id 號段的範圍
3.Segment.IsInitOk 標識了這個號段是否初始化完成
這裏號段的實現可以使用 DB 的形式來實現
BeginUPDATEtableSET max_id=max_id+step WHERE segment_type=xxxSELECT tag, max_id, step FROMtableWHERE segment_type=xxxCommit
2.2 SegmentAlloc
結構體定義
type SegmentAlloc struct {
SegmentType int // 號段業務類型
CurrentSegmentBufferIndex int // 當前使用的buffer 下標,會一直使用0的下標
SegmentBuffers []*Segment // 號段的雙buffer
Step uint64 // 號段加載的步長
IsPreloading bool // 是否處於預加載號段的狀態
UpdateTime time.Time // 更新號段的時間
mutex sync.Mutex // 互斥鎖
Waiting []chan struct{} //等待的客戶端,當號段用戶正處於初始化時,其他協程處於等待狀態,到一定超時時間仍然未完成再返回失敗
}
SegmentAlloc.SegmentBuffer 圖示:
- 當 SegmentAlloc.IsPreloading 爲 true 時,其他進程不能進行初始化,避免了同個業務類型重複去初始化 buffer
SegmentAlloc 判斷是否需要對 buffer 進行預加載
func (s *SegmentAlloc) IsNeedPreload() bool {
// 已經在預加載了
if s.IsPreloading {
return false
}
// 第二個緩衝區已經準備好 ,這裏之前遺漏了該判斷,會導致只要超過一半就開始去預加載
if len(s.SegmentBuffers) > 1 {
return false
}
segmentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
// 當前剩餘的號已經小於步長的一半,則進行加載
restId := segmentBuffer.MaxId - segmentBuffer.Cursor
if restId <= s.Step/2 {
return true
}
return false
}
2.SegmentAlloc.Waiting 保存了所有在等待的客戶端協程,等到初始化 buffer 完成後再喚起所有等待的協程
func (s *SegmentAlloc) WakeUpAllWaitingClient() error {
s.mutex.Lock()
defer s.mutex.Unlock()
for _, waiting := range s.Waiting {
goasync.SafeClose(waiting)
}
return nil
}
這裏如果被其他攜程關閉的話則不能再進行二次關閉,所以需要通過下面方法來保證發送信號量不出異常,這裏其實偷懶了實現,不應該使用 panic 和 recover 來處理正常的關閉。
func SafeClose(ch chan struct{}) (ok bool) {
defer func() {
if recover() != nil {
// 已經被其他協程關閉
ok = false
}
}()
close(ch)
return true
}
3.SegmentAlloc.UpdateTime 存儲了內存數據更新的時間,如果長時間沒有更新的數據可以進行監控上報,看是否有業務異常
SegmentAlloc 加載完成後如果上一個號段已經用完則刷新 buffer
func (s *SegmentAlloc) IsHasSegment() bool {
currentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
// 這裏可能還沒有初始化好,
if currentBuffer.IsInitOk && currentBuffer.Cursor < currentBuffer.MaxId {
return true
}
return false
}
func (s *SegmentAlloc) IsNewBufferReady() bool {
if len(s.SegmentBuffers) <= 1 {
return false
}
return true
}
func (s *SegmentAlloc) RefreshBuffer() {
// 當前buffer 仍然有號則不需要刷新,因爲可能其他協程已經刷新了buffer區
if s.IsHasSegment() {
return
}
if !s.IsNewBufferReady() {
return
}
s.SegmentBuffers = append(s.SegmentBuffers[:0], s.SegmentBuffers[1:]...)
}
Segment 獲取新的 id 號
func (s *SegmentAlloc) GetId() uint64 {
if s.IsHasSegment() {
currentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
id := atomic.AddUint64(¤tBuffer.Cursor, 1)
s.UpdateTime = time.Now()
return id
}
return 0
}
func (s *SegmentAlloc) IsRightId(id uint64) bool {
return id > 0
}
2.3 SegmentCache
存儲了多個業務的 SegmentCache,並且通過管理信號量來看是否需要進行預加載
type SegmentCache struct {
cache sync.Map
loadSegmentEvent chan int // 加載號段的信號量
}
1.**SegmentCache.loadSegmentEvent :**通過信號量的方式來看是否需要觸發預加載
2.**SegmentCache.cache :**存儲了已經分配了 SegmentAlloc 數據
存儲和獲取 SegmentCache 中的 SegmentAlloc
func (s *SegmentCache) Add(alloc *SegmentAlloc) int {
s.cache.Store(alloc.SegmentType, alloc)
return alloc.SegmentType
}
func (s *SegmentCache) Get(segmentType int) *SegmentAlloc {
v, ok := s.cache.Load(segmentType)
if ok {
return v.(*SegmentAlloc)
}
return nil
}
讀取和寫入 load alloc 的信號量
func (s *SegmentCache) LoadEvent() <-chan int {
return s.loadSegmentEvent
}
func (s *SegmentCache) WriteLoadEvent(segmentAlloc *SegmentAlloc) {
segmentAlloc.IsPreloading = trues.loadSegmentEvent <- segmentAlloc.SegmentType
}
2.4 Service 實現
客戶端獲取新的業務 id 流程圖
通過上圖可以看到客戶端獲取 id 的主要流程。
-
先嚐試去獲取 Id,如果成功則直接返回
-
如果失敗則等待號段加載
-
等待一定時間,如果超過等待時長則直接返回錯誤。
具體代碼如下:
func (service *IdGenerateService) GetSegmentId(ctx context.Context, segmentType int) (id uint64, err error) {
segmentAlloc := service.segmentCache.Get(segmentType)
if segmentAlloc == nil {
return 0, errors.WithStack(err)
}
id, err = service.nextId(ctx, segmentAlloc)
if err != nil {
return 0, errors.WithStack(err)
}
return id, nil}
func (service *IdGenerateService) nextId(ctx context.Context, segmentAlloc *internalDefine.SegmentAlloc) (id uint64, err error) {
if segmentAlloc == nil {
return 0, define.RespGenerateIdErr
}
segmentAlloc.Lock()
defer segmentAlloc.Unlock()
id = segmentAlloc.GetId()
if segmentAlloc.IsNeedPreload() {
service.segmentCache.WriteLoadEvent(segmentAlloc)
}
// 1.如果已經獲取到正確的id則直接返回
if segmentAlloc.IsRightId(id) {
return id, nil
}
// 2.如果沒拿到號段 ,在這裏加入等待隊列,前面已經發出事件開始加載,避免多個協程同時進行加載
waitChan := make(chan struct{}, 1)
segmentAlloc.Waiting = append(segmentAlloc.Waiting, waitChan)
// 3.讓其他客戶端可以走前面的步驟,進入到等待狀態
segmentAlloc.Unlock()
// 3.最多等待500ms,超過等待時間則直接返回錯誤
timer := time.NewTimer(500 * time.Millisecond)
select {
case <-waitChan:
case <-timer.C:
}
segmentAlloc.Lock()
segmentAlloc.RefreshBuffer()
id = segmentAlloc.GetId()
if segmentAlloc.IsRightId(id) {
return id, nil
}
return 0, define.RespGenerateIdErr
}
監聽信號量加載新的號段,這裏會監聽寫入的信號量異步去加載號段數據。
func (service *IdGenerateService) watchSegmentLoadEvent(ctx context.Context) {
for {
select {
case <-ctx.Done():
returncase segmentType, ok := <-service.segmentCache.LoadEvent():
if !ok {
continue
}
err := service.loadSegmentAllocBuffer(ctx, segmentType)
if err != nil {
logger.CtxLogErrorf(ctx, "loadSegmentAllocBufferErr :%+v", err)
continue
}
default:
}
}
}
// loadSegmentAllocBuffer
// @Description: 預先加載id號,使用時直接從內存中獲取,避免每次去請求IDCS
func (service *IdGenerateService) loadSegmentAllocBuffer(ctx context.Context, segmentType int) (err error) {
segmentAlloc := service.segmentCache.Get(segmentType)
defer func() {
segmentAlloc.IsPreloading = falseif err == nil {
wakeupErr := segmentAlloc.WakeUpAllWaitingClient()
if wakeupErr != nil {
logger.CtxLogErrorf(ctx, "wake up all client err : %+v", wakeupErr)
}
}
}()
var (
id uint64
)
for i := 0; i < 3; i++ {
id, err = service.dao.GetSeqIdBySegmentType(ctx, segmentType)
if err != nil {
logger.CtxLogErrorf(ctx, "idCenterApi.GetSeqIdBySegmentTypeErr :%+v", err)
continue}
var (
minId = id - segmentAlloc.Step
maxId = id
)
segment := internalDefine.NewSegment(minId, maxId)
segmentAlloc.SegmentBuffers = append(segmentAlloc.SegmentBuffers, segment)
return nil}
if err != nil {
return errors.WithStack(err)
}
return nil
}
三. 關於高可用
3.1. 爲什麼不使用 chan
進行實現 buffer
?
因爲這樣的話無法實現在客戶端請求 QPS 暴增的情況下對 id 的號進行一個預先的加載,並且內存裏面會一直存在 chan
內部的數據。
通過雙 Buffer
的模式可以在 QPS 暴漲的情況下預留一批號在內存中,並且可以通過動態增加步長的方式來增加緩存號。
3.2. 動態步長調整
通過增加號段消耗時間來對步長進行動態調整,如果在短時間內號段很快被消耗了,那麼可以將步長翻倍進行獲取,緩存更多的號段在內存內。
這樣雙 Buffer 在數據庫獲取號段宕機的一段時間內仍然能夠獲取 id 號。
3.3. 爲什麼用內存緩存而不是 Redis?
該功能本身的目的用於隔離第三方的不穩定,Redis 本身也非可靠的服務,所以雖然本地內存無法實現分佈式的,並且有可能會浪費一部分號段,但是對於可用性的提升更爲顯著
四. 生產場景優化
4.1. 初始步長設置問題
如果業務場景中每一次操作就會剛好把 id 耗盡,那麼初始化的時候需要將對應類型的緩存增大,但是也不能一次性設置的過大,這樣會導致重新去準備號段的時候時間過長,雖然使用超過一半就會重新加載,但是如果峯值流量過大的情況下,消耗完了還沒加載完的話就會導致客戶端陷入等待,走兜底去獲取,也會造成接口變慢。
並且在服務第一次啓動時,如果強依賴了該組件,則會影響服務重啓的速度,所以目前選擇的弱依賴的形式,未全部 Loading 完成則不能進行獲取,直接兜底返回,這樣會存在的一個問題是服務重啓並且沒有全部完成加載時,接口 RT 會短時間的抖動。所以如果不擔心服務重啓速度的話可以進行強依賴。
4.2. 號段加載速度
加載 buffer 的時候可以通過分段優化,對於初始值較大的號段,可以通過多協程來對不同段進行加載,從而提升加載速度,這裏沒做是擔心對下游服務造成過大壓力。
4.3. 鎖粒度過大導致客戶端等待
加載的過程中實際上是不阻塞客戶端的獲取,因爲加載過程中是使用第一個 buffer, 操作時第二個 buffer 不受影響,所以無需鎖定整個 segment
導致客戶端陷入等待,加載和獲取互不影響,所以只在最後需要變更數據的時候進行上鎖即可。
4.4. 客戶端等待時長的設置
此處最開始設置等待 500ms,但是存在的一個問題是如果一次性獲取 100 個號並且內存中都未準備好時,就會等待 100*500ms , 這對於一個業務接口來說是不可接受的,所以將單個等待的超時時間修改爲 5ms
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/cPLozuxxGOZbwsGUxFdoFw