VictoriaMetrics 源碼之布隆過濾器

VictoriaMetrics 的 vmstorage 組件會接收上游傳遞過來的指標,在現實場景中,指標或瞬時指標的數量級可能會非常恐怖,如果不限制緩存的大小,有可能會由於 cache miss 而導致出現過高的 slow insert(https://docs.victoriametrics.com/FAQ.html#what-is-a-slow-insert)。

爲此,vmstorage 提供了兩個參數:maxHourlySeriesmaxDailySeries,用於限制每小時 / 每天添加到緩存的唯一序列,唯一序列指表示唯一的時間序列,如 metrics{label1="value1",label2="value2"} 屬於一個時間序列,但多條不同值的 metrics{label1="value1",label2="value2"} 屬於同一條時間序列。VictoriaMetrics 使用如下方式來獲取時序的唯一標識:

func getLabelsHash(labels []prompbmarshal.Label) uint64 {
 bb := labelsHashBufPool.Get()
 b := bb.B[:0]
 for _, label := range labels {
  b = append(b, label.Name...)
  b = append(b, label.Value...)
 }
 h := xxhash.Sum64(b)
 bb.B = b
 labelsHashBufPool.Put(bb)
 return h
}

限速器的初始化

VictoriaMetrics 使用了一個類似限速器的概念,限制每小時 / 每天新增的唯一序列,但與普通的限速器不同的是,它需要在序列級別進行限制,即判斷某個序列是否是新的唯一序列,如果是,則需要進一步判斷一段時間內緩存中新的時序數目是否超過限制,而不是簡單地在請求層面進行限制。

hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour)
dailySeriesLimiter = bloomfilter.NewLimiter(*maxDailySeries, 24*time.Hour)

下面是新建限速器的函數,傳入一個最大 (序列) 值,以及一個刷新時間。該函數中會:

  1. 初始化一個限速器,限速器的最大元素個數爲 maxItems

  2. 則啓用了一個 goroutine,當時間達到 refreshInterval 時會重置限速器:

func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter {
 l := &Limiter{
  maxItems: maxItems,
  stopCh:   make(chan struct{}),
 }
 l.v.Store(newLimiter(maxItems)) //1
 l.wg.Add(1)
 go func() {
  defer l.wg.Done()
  t := time.NewTicker(refreshInterval)
  defer t.Stop()
  for {
   select {
   case <-t.C:
    l.v.Store(newLimiter(maxItems))//2
   case <-l.stopCh:
    return
   }
  }
 }()
 return l
}

限速器只有一個核心函數 Add,當 vmstorage 接收到一個指標之後,會通過 getLabelsHash 計算該指標的唯一標識 (h),然後調用下面的 Add 函數來判斷該唯一標識是否存在於緩存中。

如果當前存儲的元素個數大於等於允許的最大元素,則通過過濾器判斷緩存中是否已經存在該元素;否則將該元素直接加入過濾器中,後續允許將該元素加入到緩存中。

func (l *Limiter) Add(h uint64) bool {
 lm := l.v.Load().(*limiter)
 return lm.Add(h)
}

func (l *limiter) Add(h uint64) bool {
 currentItems := atomic.LoadUint64(&l.currentItems)
 if currentItems >= uint64(l.f.maxItems) {
  return l.f.Has(h)
 }
 if l.f.Add(h) {
  atomic.AddUint64(&l.currentItems, 1)
 }
 return true
}

上面的過濾器採用的是布隆過濾器,核心函數爲 HasAdd,分別用於判斷某個元素是否存在於過濾器中,以及將元素添加到布隆過濾器中。

過濾器的初始化函數如下,bitsPerItem 是個常量,值爲 16。bitsCount 統計了過濾器中的總 bit 數,每個 bit 表示某個值的存在性。bits 以 64bit 爲單位的 (後續稱之爲 slot,目的是爲了在 bitsCount 中快速檢索目標 bit)。計算 bits 時加上63的原因是爲了四捨五入向上取值,比如當 maxItems=1 時至少需要 1 個 unit64 的 slot。

func newFilter(maxItems int) *filter {
 bitsCount := maxItems * bitsPerItem
 bits := make([]uint64, (bitsCount+63)/64)
 return &filter{
  maxItems: maxItems,
  bits:     bits,
 }
}

爲什麼bitsPerItem爲 16?這篇文章 (https://zhuanlan.zhihu.com/p/282864286) 給出瞭如何計算布隆過濾器的大小。在本代碼中,k 爲 4(hashesCount),期望的漏失率爲 0.003(可以從官方的 filter_test.go 中看出),則要求總存儲和總元素的比例爲 15,爲了方便檢索 slot(64bit,爲 16 的倍數),將之設置爲 16。

if p > 0.003 {
  t.Fatalf("too big false hits share for maxItems=%d: %.5f, falseHits: %d", maxItems, p, falseHits)
}

下面是過濾器的 Add 操作,目的是在過濾器中添加某個元素。Add 函數中沒有使用多個哈希函數來計算元素的哈希值,轉而改變同一個元素的值,然後對相應的值應用相同的哈希函數,元素改變的次數受 hashesCount的限制。

  1. 獲取過濾器的完整存儲,並轉換爲以 bit 單位

  2. 將元素 h 轉換爲 byte 數組,便於 xxhash.Sum64 計算

  3. 後續將執行 hashesCount 次哈希,降低漏失率

  4. 計算元素 h 的哈希

  5. 遞增元素h,爲下一次哈希做準備

  6. 取餘法獲取元素的 bit 範圍

  7. 獲取元素所在的 slot(即 uint64 大小的 bit 範圍)

  8. 獲取元素所在的 slot 中的 bit 位,該位爲 1 表示該元素存在,爲 0 表示該元素不存在

  9. 獲取元素所在 bit 位的掩碼

  10. 加載元素所在的 slot 的數值

  11. 如果w & mask結果爲 0,說明該元素不存在,

  12. 將元素所在的 slot(w)中的元素所在的 bit 位 (mask) 置爲 1,表示添加了該元素

  13. 由於 Add 函數可以併發訪問,因此 bits[i] 有可能被其他操作修改,因此需要通過重新加載 (14) 並通過循環來在 bits[i] 中設置該元素的存在性

func (f *filter) Add(h uint64) bool {
 bits := f.bits
 maxBits := uint64(len(bits)) * 64 //1
 bp := (*[8]byte)(unsafe.Pointer(&h))//2
 b := bp[:]
 isNew := false
 for i := 0; i < hashesCount; i++ {//3
  hi := xxhash.Sum64(b)//4
  h++ //5
  idx := hi % maxBits //6
  i := idx / 64 //7
  j := idx % 64 //8
  mask := uint64(1) << j //9
  w := atomic.LoadUint64(&bits[i])//10
  for (& mask) == 0 {//11
   wNew := w | mask //12
   if atomic.CompareAndSwapUint64(&bits[i], w, wNew) {//13
    isNew = true//14
    break
   }
   w = atomic.LoadUint64(&bits[i])//14
  }
 }
 return isNew
}

看懂了 Add 函數,Has 就相當簡單了,它只是 Add 函數的縮減版,無需設置 bits[i]

func (f *filter) Has(h uint64) bool {
 bits := f.bits
 maxBits := uint64(len(bits)) * 64
 bp := (*[8]byte)(unsafe.Pointer(&h))
 b := bp[:]
 for i := 0; i < hashesCount; i++ {
  hi := xxhash.Sum64(b)
  h++
  idx := hi % maxBits
  i := idx / 64
  j := idx % 64
  mask := uint64(1) << j
  w := atomic.LoadUint64(&bits[i])
  if (& mask) == 0 {
   return false
  }
 }
 return true
}

總結

由於 VictoriaMetrics 的過濾器採用的是布隆過濾器,因此它的限速並不精準,在源碼條件下,大約有 3% 的偏差。但同樣地,由於採用了布隆過濾器,降低了所需的內存以及相關計算資源。此外 VictoriaMetrics 的過濾器實現了併發訪問。過濾器爲了支持併發訪問,使用 atomic 來實現數值存儲和加載以及數值變更等操作。

在大流量場景中,如果需要對請求進行相對精準的過濾,可以考慮使用布隆過濾器,降低所需要的資源,但前提是過濾的結果能夠忍受一定程度的漏失率。

原文鏈接:https://www.cnblogs.com/charlieroro/p/16101238.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/8JzzngjsHFva4kMdwQgQSA