從零實現一個時序數據庫

時序數據庫(TSDB: Time Series Database)大多數時候都是爲了滿足監控場景的需求,這裏先介紹兩個概念:

series1: {"__name__""netspeed""host""localhost""iface""eth0"}
series2: {"__name__""netspeed""host""localhost""iface""eth1"}

Prometheus, InfluxDB, M3, TimescaleDB 都是時下流行的 TSDB。時序數據的壓縮算法很大程度上決定了 TSDB 的性能,以上幾個項目的實現都參考了 Fackbook 2015 年發表的論文《Gorilla: A fast, scalable, in-memory time series database》(http://www.vldb.org/pvldb/vol8/p1816-teller.pdf) 中提到的差值算法,該算法平均可以將 16 字節的數據點壓縮成 1.37 字節。

Who's mando?

Din Djarin, also known as "the Mandalorian" or simply "Mando," was a human male Mandalorian who worked as a famous bounty hunter during the New Republic Era.

What's mandodb?

mandodb(https://github.com/chenjiandongx/mandodb) 是我在學習過程中實現的一個最小化的 TSDB,從概念上來講它還算不上是一個完整的 TSDB,因爲它:

mandodb 主要受到了兩個項目的啓發。本項目僅限於學習用途,未經生產環境測試驗證!

prometheus 的核心開發者 Fabian Reinartz 寫了一篇文章 《Writing a Time Series Database from Scratch》(https://fabxc.org/tsdb/) 來介紹 prometheus TSDB 的演變過程,非常值得一讀,強烈推薦。

💡 數據模型 & API 文檔

數據模型定義

// Point 表示一個數據點 (ts, value) 二元組
type Point struct {
 Ts    int64 // in seconds
 Value float64
}

// Label 代表一個標籤組合
type Label struct {
 Name  string
 Value string
}

// Row 一行時序數據 包括數據點和標籤組合
type Row struct {
 Metric string
 Labels LabelSet
 Point  Point
}

// LabelSet 表示 Label 組合
type LabelSet []Label

// LabelMatcher Label 匹配器 支持正則
type LabelMatcher struct {
 Name   string
 Value  string
 IsRegx bool
}

// LabelMatcherSet 表示 LabelMatcher 組合
type LabelMatcherSet []LabelMatcher

API

// InsertRows 寫數據
InsertRows(rows []*Row) error 

// QueryRange 查詢時序數據點
QueryRange(metric string, lms LabelMatcherSet, start, end int64) ([]MetricRet, error)

// QuerySeries 查詢時序序列組合
QuerySeries(lms LabelMatcherSet, start, end int64) ([]map[string]string, error)

// QueryLabelValues 查詢標籤值
QueryLabelValues(label string, start, end int64) []string

🛠 配置選項

配置項在初始化 TSDB 的時候設置。

// WithMetaSerializerType 設置 Metadata 數據的序列化類型
// 目前只提供了 BinaryMetaSerializer
WithMetaSerializerType(t MetaSerializerType) Option 

// WithMetaBytesCompressorType 設置字節數據的壓縮算法
// 目前提供了
// * 不壓縮: NoopBytesCompressor(默認)
// * ZSTD: ZstdBytesCompressor
// * Snappy: SnappyBytesCompressor
WithMetaBytesCompressorType(t BytesCompressorType) Option

// WithOnlyMemoryMode 設置是否默認只存儲在內存中
// 默認爲 false
WithOnlyMemoryMode(memoryMode bool) Option

// WithEnabledOutdated 設置是否支持亂序寫入 此特性會增加資源開銷 但會提升數據完整性
// 默認爲 true
WithEnabledOutdated(outdated bool) Option

// WithMaxRowsPerSegment 設置單 Segment 最大允許存儲的點數
// 默認爲 19960412(夾雜私貨 🐶)
WithMaxRowsPerSegment(n int64) Option

// WithDataPath 設置 Segment 持久化存儲文件夾
// 默認爲 "."
WithDataPath(d string) Option

// WithRetention 設置 Segment 持久化數據保存時長
// 默認爲 7d
WithRetention(t time.Duration) Option

// WithWriteTimeout 設置寫入超時閾值
// 默認爲 30s
WithWriteTimeout(t time.Duration) Option

// WithLoggerConfig 設置日誌配置項
// logger: github.com/chenjiandongx/logger
WithLoggerConfig(opt *logger.Options) Option

🔖 用法示例

package main

import (
 "fmt"
 "time"

 "github.com/chenjiandongx/mandodb"
)

func main() {
 store := mandodb.OpenTSDB(
  mandodb.WithOnlyMemoryMode(true),
  mandodb.WithWriteTimeout(10*time.Second),
 )
 defer store.Close()

 // 插入數據
 _ = store.InsertRows([]*mandodb.Row{
  {
   Metric: "cpu.busy",
   Labels: []mandodb.Label{
    {Name: "node", Value: "vm1"},
    {Name: "dc", Value: "gz-idc"},
   },
   Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
  },
  {
   Metric: "cpu.busy",
   Labels: []mandodb.Label{
    {Name: "node", Value: "vm2"},
    {Name: "dc", Value: "sz-idc"},
   },
   Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
  },
 })

 time.Sleep(time.Millisecond)

 // 時序數據查詢
 data, _ := store.QueryRange("cpu.busy", nil, 1600000000, 1600000002)
 fmt.Printf("data: %+v\n", data)
 // output:
 // data: [{Labels:{__name__="cpu.busy"dc="gz-idc"node="vm1"} Points:[{Ts:1600000001 Value:0.1}]}]

 // 查詢 Series
 // __name__ 是 metric 名稱在 TSDB 中的 Label Key
 ser, _ := store.QuerySeries(
        mandodb.LabelMatcherSet{{Name: "__name__", Value: "cpu.busy"}}, 1600000000, 1600000002)
 for _, d := range ser {
  fmt.Printf("data: %+v\n", d)
 }
 // output:
 // data: map[__name__:cpu.busy dc:gz-idc node:vm1]
 // data: map[__name__:cpu.busy dc:sz-idc node:vm2]

 // 查詢標籤值
 lvs := store.QueryLabelValues("node", 1600000000, 1600000002)
 fmt.Printf("data: %+v\n", lvs)
 // output:
 // data: [vm1 vm2]
}

下面是我對這段時間學習內容的整理,嘗試完整介紹如何從零開始實現一個小型的 TSDB。

我本身並沒有數據庫開發的背景,某些描述可能並不那麼準確,所以歡迎 實名 diss 指正。

🧮 Gorilla 差值算法

Gorilla 論文 4.1 小節介紹了壓縮算法,先整體看一下壓縮方案,T/V 是緊挨存儲的,'0'/'10'/'11' 表示控制位。

Figure: Gorilla 壓縮算法

Timestamp DOD 壓縮:

在時序的場景中,每個時序點都有一個對應的 Timestamp,一條時序序列中相鄰數據點的間隔是有規律可循的。一般來講,監控數據的採集都是會以固定的時間間隔進行的,所以就可以用差值來記錄時間間隔,更進一步,我們可以用差值的差值來記錄以此來減少存儲空間。

t1: 1627401800; t2: 1627401810; t3: 1627401820; t4: 1627401830
--------------------------------------------------------------
// 差值:delta
t1: 1627401800; (t2-t1)d1: 10; (t3-t2)d2: 10; (t4-t3)d3: 10; 
--------------------------------------------------------------
// 差值的差值:delta of delta
t1: 1627401800; dod1: 0; dod2: 0; dod3: 0;

實際環境中當然不可能每個間隔都這麼均勻,由於網絡延遲等其他原因,差值會有波動。

Value XOR 壓縮:

Figure: IEEE 浮點數以及 XOR 計算結果

當兩個數據點數值值比較接近的話,通過異或操作計算出來的結果是比較相似的,利用這點就可以通過記錄前置零和後置零個數以及數值部分來達到壓縮空間的目的。

下面通過算法實現來介紹,代碼來自項目 dgryski/go-tsz。代碼完全按照論文中給出的步驟來實現。

// New 初始化 block 這裏會將第一個原始時間戳寫入到 block 中
func New(t0 uint32) *Series {
 s := Series{
  T0:      t0,
  leading: ^uint8(0),
 }

 s.bw.writeBits(uint64(t0), 32)
 return &s
}

// Push 負責寫入時序數據
func (s *Series) Push(t uint32, v float64) {
 // ....
 // 如果是第一個數據點的話寫入原始數據後直接返回
 if s.t == 0 {
  s.t = t
  s.val = v
  s.tDelta = t - s.T0 // 實際上這裏爲 0

  // The block header stores the starting time stamp, t-1(前一個時間戳),
  // which is aligned to a two hour window; the first time
  // stamp, t0, in the block is stored as a delta from t−1 in 14 bits.
        
  // 用 14 個 bit 寫入時間戳差值
  s.bw.writeBits(uint64(s.tDelta), 14)
  // 原始數據點完整寫入
  s.bw.writeBits(math.Float64bits(v), 64)
  return
 }

 tDelta := t - s.t
 dod := int32(tDelta - s.tDelta) // 計算差值的差值 Detla of Delta

 // 下面開始就處理非第一個數據點的情況了
 switch {
  // If D is zero, then store a single ‘0’ bit
  // 如果是零的話 那直接用 '0' 一個字節就可以直接表示
 case dod == 0:
  s.bw.writeBit(zero)

  //  If D is between [-63, 64], store ‘10’ followed by the value (7 bits)
 case -63 <= dod && dod <= 64:
  s.bw.writeBits(0x02, 2) // 控制位 '10'
  s.bw.writeBits(uint64(dod), 7) // 7bits 可以表示 [-63, 64] 的範圍

  // If D is between [-255, 256], store ‘110’ followed by the value (9 bits)
 case -255 <= dod && dod <= 256:
  s.bw.writeBits(0x06, 3) // 控制位 '110'
  s.bw.writeBits(uint64(dod), 9)

  // if D is between [-2047, 2048], store ‘1110’ followed by the value (12 bits)
 case -2047 <= dod && dod <= 2048:
  s.bw.writeBits(0x0e, 4) // 控制位 '1110'
  s.bw.writeBits(uint64(dod), 12)

  // Otherwise store ‘1111’ followed by D using 32 bits
 default:
  s.bw.writeBits(0x0f, 4) // 其餘情況控制位均用 '1111'
  s.bw.writeBits(uint64(dod), 32)
 }

 // 到這裏 (T, V) 中的時間戳已經寫入完畢了 接下來是寫 V 部分

 // 先計算兩個值的異或結果
 vDelta := math.Float64bits(v) ^ math.Float64bits(s.val)

 // If XOR with the previous is zero (same value), store single ‘0’ bit
 // 如果前後兩個值相等的話 直接用 '0' 1 個 bit 就可以表示
 // 所以如果上報的時序數據是 1 或者 0 這種的話 佔用的內存會非常少

 // zero = '0'; one = '1'
 if vDelta == 0 {
  s.bw.writeBit(zero)
 } else {    // 非 0 情況那就要把控制位置爲 1
  s.bw.writeBit(one)

  // 計算前置 0 和後置 0
  leading := uint8(bits.LeadingZeros64(vDelta))
  trailing := uint8(bits.TrailingZeros64(vDelta))

  // clamp number of leading zeros to avoid overflow when encoding
  if leading >= 32 {
   leading = 31
  }

  // (Control bit ‘0’) If the block of meaningful bits
  // falls within the block of previous meaningful bits,
  // i.e., there are at least as many leading zeros and
  // as many trailing zeros as with the previous value,
  // use that information for the block position and
  // just store the meaningful XORed value.

  // 如果前置 0 不小於上一個值計算的異或結果的前置 0 且後置 0 也不小於上一個值計算的異或結果的後置 0
  if s.leading != ^uint8(0) && leading >= s.leading && trailing >= s.trailing { // => 控制位 '10'
   s.bw.writeBit(zero)
   // 記錄異或值非零部分
   s.bw.writeBits(vDelta>>s.trailing, 64-int(s.leading)-int(s.trailing))
  } else { // => 控制位 '11'

   // (Control bit ‘1’) Store the length of the number
   // of leading zeros in the next 5 bits, then store the
   // length of the meaningful XORed value in the next
   // 6 bits. Finally store the meaningful bits of the XORed value.
   s.leading, s.trailing = leading, trailing

   // 其他情況控制位置爲 1 並用接下來的 5bits 記錄前置 0 個數
   s.bw.writeBit(one)
   s.bw.writeBits(uint64(leading), 5)

   // 然後用接下來的 6bits 記錄異或差值中的非零部分
   sigbits := 64 - leading - trailing
   s.bw.writeBits(uint64(sigbits), 6)
   s.bw.writeBits(vDelta>>trailing, int(sigbits))
  }
 }

 // 狀態更新 至此(T, V)均已被壓縮寫入到內存中
 s.tDelta = tDelta
 s.t = t
 s.val = v
}

// 每個 block 的結尾會使用特殊標記用於標識
func finish(w *bstream) {
 // write an end-of-stream record
 w.writeBits(0x0f, 4)
 w.writeBits(0xffffffff, 32)
 w.writeBit(zero)
}

論文給出了不同 case 的 buckets 佔比分佈。

Figure: Timestamp buckets distribution

Figure: Value buckets distribution

Timestamp buckets 中,前後兩個時間戳差值相同的比例高達 96.39%,而在 Value buckets 中只用一個控制位的佔比也達到了 59.06%,可見其壓縮比之高。

論文還給出了一個重要結論,數據壓縮比隨着時間的增長而增長,並在 120 個點的時候開始收斂到一個最佳值。

Figure: 壓縮率曲線

Gorilla 差值算法也應用於我的另外一個項目 chenjiandongx/tszlist,一種時序數據線程安全鏈表。

📝 數據寫入

時序數據具有「垂直寫,水平查」的特性,即同一時刻有多條時間線的數據不斷被追加。但查詢的時候往往是查某條時間線持續一段時間內的數據點。

series
  ^   
  │   . . . . . . . . . . . . . . . . .   . . . . .   {__name__="request_total"method="GET"}
  │     . . . . . . . . . . . . . . . . . . . . . .   {__name__="request_total"method="POST"}
  │         . . . . . . .
  │       . . .     . . . . . . . . . . . . . . . .                  ... 
  │     . . . . . . . . . . . . . . . . .   . . . .   
  │     . . . . . . . . . .   . . . . . . . . . . .   {__name__="errors_total"method="POST"}
  │           . . .   . . . . . . . . .   . . . . .   {__name__="errors_total"method="GET"}
  │         . . . . . . . . .       . . . . .
  │       . . .     . . . . . . . . . . . . . . . .                  ... 
  │     . . . . . . . . . . . . . . . .   . . . . 
  v
    <-------------------- time --------------------->

時序數據跟時間是強相關的(不然還叫時序數據?🧐),即大多數查詢其實只會查詢最近時刻的數據,這裏的「最近」是個相對概念。所以沒必要維護一條時間線的完整生命週期,特別是在 Kubernetes 這種雲原生場景,Pod 隨時有可能會被擴縮容,也就意味着一條時間線的生命週期可能會很短。如果我們一直記錄着所有的時間線的索引信息,那麼隨着時間的推移,數據庫裏的時間線的數量會呈現一個線性增長的趨勢 😱,會極大地影響查詢效率。

這裏引入一個概念「序列分流」,這個概念描述的是一組時間序列變得不活躍,即不再接收數據點,取而代之的是有一組新的活躍的序列出現的場景。

series
  ^
  │   . . . . . .
  │   . . . . . .
  │   . . . . . .
  │               . . . . . . .
  │               . . . . . . .
  │               . . . . . . .
  │                             . . . . . .
  │                             . . . . . .
  │                                         . . . . .
  │                                         . . . . .
  │                                         . . . . .
  v
    <-------------------- time --------------------->

我們將多條時間線的數據按一定的時間跨度切割成多個小塊,每個小塊本質就是一個獨立小型的數據庫,這種做法另外一個優勢是清除過期操作的時候非常方便,只要將整個塊給刪了就行 👻(梭哈是一種智慧)。內存中保留最近兩個小時的熱數據(Memory Segment),其餘數據持久化到磁盤 (Disk Segment)。

Figure: 序列分塊

DiskSegment 使用的是 AVL Tree 實現的列表,可在插入時排序。爲什麼不用更加高大上的紅黑樹?因爲不好實現...

當 Memory Segment 達到歸檔條件的時候,會創建一個新的內存塊並異步將剛歸檔的塊寫入到磁盤,同時會使用 mmap 將磁盤文件句柄映射到內存中。代碼實現如下。

func (tsdb *TSDB) getHeadPartition() (Segment, error) {
 tsdb.mut.Lock()
 defer tsdb.mut.Unlock()

 if tsdb.segs.head.Frozen() {
  head := tsdb.segs.head

  go func() {
   tsdb.wg.Add(1)
   defer tsdb.wg.Done()

   tsdb.segs.Add(head)

   t0 := time.Now()
   dn := dirname(head.MinTs(), head.MaxTs())

   if err := writeToDisk(head.(*memorySegment)); err != nil {
    logger.Errorf("failed to flush data to disk, %v", err)
    return
   }

   fname := path.Join(dn, "data")
   mf, err := mmap.OpenMmapFile(fname)
   if err != nil {
    logger.Errorf("failed to make a mmap file %s, %v", fname, err)
    return
   }

   tsdb.segs.Remove(head)
   tsdb.segs.Add(newDiskSegment(mf, dn, head.MinTs(), head.MaxTs()))
   logger.Infof("write file %s take: %v", fname, time.Since(t0))
  }()

  tsdb.segs.head = newMemorySegment()
 }

 return tsdb.segs.head, nil
}

Figure: Memory Segment 兩部分數據

寫入的時候支持數據時間回撥,也就是支持有限的亂序數據寫入,實現方案是在內存中對還沒歸檔的每條時間線維護一個鏈表(同樣使用 AVL Tree 實現),當數據點的時間戳不是遞增的時候存儲到鏈表中,查詢的時候會將兩部分數據合併查詢,持久化的時候也會將兩者合併寫入。

🖇 Mmap 內存映射

mmap 是一種將磁盤文件映射到進程的虛擬地址空間來實現對文件讀取和修改操作的技術。

從 Linux 角度來看,操作系統的內存空間被分爲「內核空間」和「用戶空間」兩大部分,其中內核空間和用戶空間的空間大小、操作權限以及核心功能都不相同。這裏的內核空間是指操作系統本身使用的內存空間,而用戶空間則是提供給各個進程使用的內存空間。由於用戶進程不具有訪問內核資源的權限,例如訪問硬件資源,因此當一個用戶進程需要使用內核資源的時候,就需要通過 系統調用 來完成。

虛擬內存細節可以閱讀 《虛擬內存精粹》 這篇文章。

Figure: 常規文件操作和 mmap 操作的區別

image

常規文件操作

讀文件: 用戶進程首先執行 read(2) 系統調用,會進行系統上下文環境切換,從用戶態切換到內核態,之後由 DMA 將文件數據從磁盤讀取到內核緩衝區,再將內核空間緩衝區的數據複製到用戶空間的緩衝區中,最後 read(2) 系統調用返回,進程從內核態切換到用戶態,整個過程結束。

寫文件: 用戶進程發起 write(2) 系統調用,從用戶態切換到內核態,將數據從用戶空間緩衝區複製到內核空間緩衝區,接着 write(2) 系統調用返回,同時進程從內核態切換到用戶態,數據從內核緩衝區寫入到磁盤,整個過程結束。

mmap 操作

mmap 內存映射的實現過程,總的來說可以分爲三個階段:

  1. 進程啓動映射過程,並在虛擬地址空間中爲映射創建虛擬映射區域。

  2. 執行內核空間的系統調用函數 mmap,建立文件物理地址和進程虛擬地址的一一映射關係。

  3. 進程發起對這片映射空間的訪問,引發缺頁異常,實現文件內容到物理內存的拷貝。

📣 小結

常規文件操作爲了提高讀寫效率和保護磁盤,使用了頁緩存機制。這樣造成讀文件時需要先將文件頁從磁盤拷貝到頁緩存中,由於頁緩存處在內核空間,不能被用戶進程直接尋址,所以還需要將頁緩存中數據頁再次拷貝到內存對應的用戶空間中。這樣,通過了兩次數據拷貝過程,才能完成進程對文件內容的獲取任務。寫操作也是一樣,待寫入的 buffer 在內核空間不能直接訪問,必須要先拷貝至內核空間對應的主存,再寫回磁盤中(延遲寫回),也是需要兩次數據拷貝。

而使用 mmap 操作文件,創建新的虛擬內存區域和建立文件磁盤地址和虛擬內存區域映射這兩步,沒有任何文件拷貝操作。而之後訪問數據時發現內存中並無數據而發起的缺頁異常過程,可以通過已經建立好的映射關係,只使用一次數據拷貝,就從磁盤中將數據傳入內存的用戶空間中,供進程使用。

😅 總而言之,常規文件操作需要從磁盤到頁緩存再到用戶主存的兩次數據拷貝。而 mmap 操控文件只需要從磁盤到用戶主存的一次數據拷貝過程。mmap 的關鍵點是實現了「用戶空間」和「內核空間」的數據直接交互而省去了不同空間數據複製的開銷

📍 索引設計

TSDB 的查詢,是通過 Label 組合來鎖定到具體的時間線進而確定分塊偏移檢索出數據。

在傳統的關係型數據庫,索引設計可能是這樣的。

LDQiHm

時序數據是 NoSchema 的,沒辦法提前建表和定義數據模型 🤔,因爲我們要支持用戶上報任意 Label 組合的數據,這樣的話就沒辦法進行動態的擴展了。或許你會靈光一現 ✨,既然這樣,那把 Labels 放一個字段拼接起來不就可以無限擴展啦,比如下面這個樣子。

o3xVJu

喲嚯,乍一看沒毛病,靚仔竊喜。

不對,有問題 😨,要定位到其中的某條時間線,那我是不是得全表掃描一趟。而且這種設計還有另外一個弊病,就是會導致內存激增,Label 的 Name 和 Value 都可能是特別長的字符串。

那怎麼辦呢(🤡 靚仔沉默...),剎那間我的腦中閃過一個帥氣的身影,沒錯,就是你,花澤類「只要倒立眼淚就不會流出來」。

我悟了!要學會逆向思維 🙃,把 Label 當做主鍵,Sid 當做其字段不就好了。這其實有點類似於 ElasticSearch 中的倒排索引,主鍵爲 Keyword,字段爲 DocumentID。索引設計如下。

F2QPJW

Label 作爲主鍵時會建立索引(Hashkey),查找的效率可視爲 O(1),再根據鎖定的 Label 來最終確定想要的 Sid。舉個例子,我們想要查找 {vm="node1", iface="eth0"} 的時間線的話就可以快速定位到 Sids(忽略其他 ... sid)。

sid1; sid2; sid3
sid2; sid3; sid5

兩者求一個交集,就可以得到最終要查詢的 Sid 爲 sid2sid3。🙂 Nice!

假設我們的查詢只支持相等匹配的話,格局明顯就小了 🤌。查詢條件是 {vm=~"node*", iface="eth0"} 腫麼辦?對 label1、label2、label3 和 label4 一起求一個並集嗎?顯然不是,因爲這樣算的話那結果就是 sid3

釐清關係就不難看出,只要對相同的 Label Name 做並集然後再對不同的 Label Name 求交集就可以了。這樣算的正確結果就是 sid3sid5。實現的時候用到了 Roaring Bitmap,一種優化的位圖算法。

Memory Segment 索引匹配

func (mim *memoryIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []string {
 // ...
 sids := newMemorySidSet()
 var got bool
 for i := len(lms) - 1; i >= 0; i-- {
  tmp := newMemorySidSet()
  vs := lvs.Match(lms[i])
  // 對相同的 Label Name 求並集
  for _, v := range vs {
   midx := mim.idx[joinSeparator(lms[i].Name, v)]
   if midx == nil || midx.Size() <= 0 {
    continue
   }

   tmp.Union(midx.Copy())
  }

  if tmp == nil || tmp.Size() <= 0 {
   return nil
  }

  if !got {
   sids = tmp
   got = true
   continue
  }

  // 對不同的 Label Name 求交集
  sids.Intersection(tmp.Copy())
 }

 return sids.List()
}

Disk Segment 索引匹配

func (dim *diskIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []uint32 {
 // ...

 lst := make([]*roaring.Bitmap, 0)
 for i := len(lms) - 1; i >= 0; i-- {
  tmp := make([]*roaring.Bitmap, 0)
  vs := lvs.Match(lms[i])

  // 對相同的 Label Name 求並集
  for _, v := range vs {
   didx := dim.label2sids[joinSeparator(lms[i].Name, v)]
   if didx == nil || didx.set.IsEmpty() {
    continue
   }

   tmp = append(tmp, didx.set)
  }

  union := roaring.ParOr(4, tmp...)
  if union.IsEmpty() {
   return nil
  }

  lst = append(lst, union)
 }

 // 對不同的 Label Name 求交集
 return roaring.ParAnd(4, lst...).ToArray()
}

然而,確定相同的 LabelName 也是一個問題,因爲 Label 本身就代表着 Name:Value,難不成我還要遍歷所有 label 才能確定嘛,這不就又成了全表掃描???

沒有什麼問題是一個索引解決不了的,如果有,那就再增加一個索引。--- 魯迅。

只要我們保存 Label 的 Name 對應的 Value 列表的映射關係即可高效解決這個問題。

hCjXwc

還是上面的 {vm=~"node1|node2", iface="eth0"} 查詢,第一步通過正則匹配確定匹配到 node1, node2,第二步匹配到 eth0,再將 LabelName 和 LabelValue 一拼裝,Label 就出來了,✌️ 完事!

橋豆麻袋!還有一個精彩的正則匹配優化算法沒介紹。

fastRegexMatcher 是一種優化的正則匹配器,算法來自 Prometheus。

// 思路就是儘量先執行前綴匹配和後綴匹配 能不用正則就不用正則
// 如 label 表達式爲 {vm="node*"}
// 而我們此時內存中有 vm=node1, vm=node2, vm=foo, vm=bar,那這個時候只需要前綴匹配就能直接把 vm=foo,vm=bar 給過濾了
// 畢竟前綴匹配和後綴匹配的執行效率還是比正則高不少的
type fastRegexMatcher struct {
 re       *regexp.Regexp
 prefix   string
 suffix   string
 contains string
}

func newFastRegexMatcher(v string) (*fastRegexMatcher, error) {
 re, err := regexp.Compile("^(?:" + v + ")$")
 if err != nil {
  return nil, err
 }

 parsed, err := syntax.Parse(v, syntax.Perl)
 if err != nil {
  return nil, err
 }

 m := &fastRegexMatcher{
  re: re,
 }

 if parsed.Op == syntax.OpConcat {
  m.prefix, m.suffix, m.contains = optimizeConcatRegex(parsed)
 }

 return m, nil
}

// optimizeConcatRegex returns literal prefix/suffix text that can be safely
// checked against the label value before running the regexp matcher.
func optimizeConcatRegex(r *syntax.Regexp) (prefix, suffix, contains string) {
 sub := r.Sub

 // We can safely remove begin and end text matchers respectively
 // at the beginning and end of the regexp.
 if len(sub) > 0 && sub[0].Op == syntax.OpBeginText {
  sub = sub[1:]
 }
 if len(sub) > 0 && sub[len(sub)-1].Op == syntax.OpEndText {
  sub = sub[:len(sub)-1]
 }

 if len(sub) == 0 {
  return
 }

 // Given Prometheus regex matchers are always anchored to the begin/end
 // of the text, if the first/last operations are literals, we can safely
 // treat them as prefix/suffix.
 if sub[0].Op == syntax.OpLiteral && (sub[0].Flags&syntax.FoldCase) == 0 {
  prefix = string(sub[0].Rune)
 }
 if last := len(sub) - 1; sub[last].Op == syntax.OpLiteral && (sub[last].Flags&syntax.FoldCase) == 0 {
  suffix = string(sub[last].Rune)
 }

 // If contains any literal which is not a prefix/suffix, we keep the
 // 1st one. We do not keep the whole list of literals to simplify the
 // fast path.
 for i := 1; i < len(sub)-1; i++ {
  if sub[i].Op == syntax.OpLiteral && (sub[i].Flags&syntax.FoldCase) == 0 {
   contains = string(sub[i].Rune)
   break
  }
 }

 return
}

func (m *fastRegexMatcher) MatchString(s string) bool {
 if m.prefix != "" && !strings.HasPrefix(s, m.prefix) {
  return false
 }

 if m.suffix != "" && !strings.HasSuffix(s, m.suffix) {
  return false
 }

 if m.contains != "" && !strings.Contains(s, m.contains) {
  return false
 }
 return m.re.MatchString(s)
}

🗂 存儲佈局

既然是數據庫,那麼自然少不了數據持久化的特性。瞭解完索引的設計,再看看落到磁盤的存儲佈局就很清晰了。先跑個示例程序寫入一些數據熱熱身。

package main

import (
 "fmt"
 "math/rand"
 "strconv"
 "time"

 "github.com/chenjiandongx/mandodb"
 "github.com/satori/go.uuid"
)

// 模擬一些監控指標
var metrics = []string{
 "cpu.busy""cpu.load1""cpu.load5""cpu.load15""cpu.iowait",
 "disk.write.ops""disk.read.ops""disk.used",
 "net.in.bytes""net.out.bytes""net.in.packages""net.out.packages",
 "mem.used""mem.idle""mem.used.bytes""mem.total.bytes",
}

// 增加 Label 數量
var uid1, uid2, uid3 []string

func init() {
 for i := 0; i < len(metrics); i++ {
  uid1 = append(uid1, uuid.NewV4().String())
  uid2 = append(uid2, uuid.NewV4().String())
  uid3 = append(uid3, uuid.NewV4().String())
 }
}

func genPoints(ts int64, node, dc int) []*mandodb.Row {
 points := make([]*mandodb.Row, 0)
 for idx, metric := range metrics {
  points = append(points, &mandodb.Row{
   Metric: metric,
   Labels: []mandodb.Label{
    {Name: "node", Value: "vm" + strconv.Itoa(node)},
    {Name: "dc", Value: strconv.Itoa(dc)},
    {Name: "foo", Value: uid1[idx]},
    {Name: "bar", Value: uid2[idx]},
    {Name: "zoo", Value: uid3[idx]},
   },
   Point: mandodb.Point{Ts: ts, Value: float64(rand.Int31n(60))},
  })
 }

 return points
}

func main() {
 store := mandodb.OpenTSDB()
 defer store.Close()

 now := time.Now().Unix() - 36000 // 10h ago

 for i := 0; i < 720; i++ {
  for n := 0; n < 5; n++ {
   for j := 0; j < 1024; j++ {
    _ = store.InsertRows(genPoints(now, n, j))
   }
  }

  now += 60 //1min
 }

 fmt.Println("finished")

 select {}
}

每個分塊保存在名字爲 seg-${mints}-${maxts} 文件夾裏,每個文件夾含有 datameta.json 兩個文件。

❯ 🐶 tree -h seg-*
seg-1627709713-1627716973
├── [ 28M]  data
└── [ 110]  meta.json
seg-1627716973-1627724233
├── [ 28M]  data
└── [ 110]  meta.json
seg-1627724233-1627731493
├── [ 28M]  data
└── [ 110]  meta.json
seg-1627731493-1627738753
├── [ 28M]  data
└── [ 110]  meta.json
seg-1627738753-1627746013
├── [ 28M]  data
└── [ 110]  meta.json

0 directories, 10 files

❯ 🐶 cat seg-1627709713-1627716973/meta.json -p
{
    "seriesCount": 81920,
    "dataPointsCount": 9912336,
    "maxTs": 1627716973,
    "minTs"1627709713
}

存儲 8 萬條時間線共接近 1 千萬的數據點的數據塊佔用磁盤 28M。實際上在寫入的時候,一條數據是這個樣子的。

{__name__="cpu.busy"node="vm0"dc="0"foo="bdac463d-8805-4cbe-bc9a-9bf495f87bab"bar="3689df1d-cbf3-4962-abea-6491861e62d2"zoo="9551010d-9726-4b3b-baf3-77e50655b950"} 1627710454 41

這樣一條數據按照 JSON 格式進行網絡通信的話,大概是 200Byte,初略計算一下。

200 * 9912336 = 1982467200Byte = 1890M

可以選擇 ZSTD 或者 Snappy 算法進行二次壓縮(默認不開啓)。還是上面的示例代碼,不過在 TSDB 啓動的時候指定了壓縮算法。

ZstdBytesCompressor

func main() {
 store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.ZstdBytesCompressor))
 defer store.Close()
 // ...
}

// 壓縮效果 28M -> 25M
❯ 🐶 ll seg-1627711905-1627719165
Permissions Size User          Date Modified Name
.rwxr-xr-x   25M chenjiandongx  1 Aug 00:13  data
.rwxr-xr-x   110 chenjiandongx  1 Aug 00:13  meta.json

SnappyBytesCompressor

func main() {
 store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.SnappyBytesCompressor))
 defer store.Close()
 // ...
}

// 壓縮效果 28M -> 26M
❯ 🐶 ll seg-1627763918-1627771178
Permissions Size User          Date Modified Name
.rwxr-xr-x   26M chenjiandongx  1 Aug 14:39  data
.rwxr-xr-x   110 chenjiandongx  1 Aug 14:39  meta.json

多多少少還是有點效果的 🤪...

壓縮是有成本的,壓縮體積的同時會增大 CPU 開銷(mbp 可以煎雞蛋了),減緩寫入速率。

敲黑板,接下來就要來好好講講 data 文件到底寫了什麼東西。 data 存儲佈局如下。

Figure: Segment Stroage

TOC 描述了 Data Block 和 Meta Block(Series Block + Labels Block)的體積,用於後面對 data 進行解析讀取。Data Block 存儲了每條時間線具體的數據點,時間線之間數據緊挨存儲。DataContent 就是使用 Gorilla 差值算法壓縮的 block。

Figure: Data Block

Labels Block 記錄了具體的 Label 值以及對應 Label 與哪些 Series 相關聯。

Figure: Labels Block

Series Block 記錄了每條時間線的元數據,字段解釋如下。

Figure: Series Block

瞭解完設計,再看看 Meta Block 編碼和解編碼的代碼實現,binaryMetaSerializer 實現了 MetaSerializer 接口。

type MetaSerializer interface {
 Marshal(Metadata) ([]byte, error)
 Unmarshal([]byte, *Metadata) error
}

編碼 Metadata

const (
 endOfBlock uint16 = 0xffff
 uint16Size        = 2
 uint32Size        = 4
 uint64Size        = 8

 magic = "https://github.com/chenjiandongx/mandodb"
)

func (s *binaryMetaSerializer) Marshal(meta Metadata) ([]byte, error) {
 encf := newEncbuf()

 // labels block
 labelOrdered := make(map[string]int)
 for idx, row := range meta.Labels {
  labelOrdered[row.Name] = idx
  encf.MarshalUint16(uint16(len(row.Name)))
  encf.MarshalString(row.Name)
  encf.MarshalUint32(uint32(len(row.Sids)))
  encf.MarshalUint32(row.Sids...)
 }
 encf.MarshalUint16(endOfBlock)

 // series block
 for idx, series := range meta.Series {
  encf.MarshalUint16(uint16(len(series.Sid)))
  encf.MarshalString(series.Sid)
  encf.MarshalUint64(series.StartOffset, series.EndOffset)

  rl := meta.sidRelatedLabels[idx]
  encf.MarshalUint32(uint32(rl.Len()))

  lids := make([]uint32, 0, rl.Len())
  for _, lb := range rl {
   lids = append(lids, uint32(labelOrdered[lb.MarshalName()]))
  }

  sort.Slice(lids, func(i, j int) bool {
   return lids[i] < lids[j]
  })
  encf.MarshalUint32(lids...)
 }
 encf.MarshalUint16(endOfBlock)

 encf.MarshalUint64(uint64(meta.MinTs))
 encf.MarshalUint64(uint64(meta.MaxTs))
 encf.MarshalString(magic)   // <-- magic here

 return ByteCompress(encf.Bytes()), nil
}

解碼 Metadata

func (s *binaryMetaSerializer) Unmarshal(data []byte, meta *Metadata) error {
 data, err := ByteDecompress(data)
 if err != nil {
  return ErrInvalidSize
 }

 if len(data) < len(magic) {
  return ErrInvalidSize
 }

 decf := newDecbuf()
 // 檢驗數據完整性
 if decf.UnmarshalString(data[len(data)-len(magic):]) != magic {
  return ErrInvalidSize
 }

 // labels block
 offset := 0
 labels := make([]seriesWithLabel, 0)
 for {
  var labelName string
  labelLen := decf.UnmarshalUint16(data[offset : offset+uint16Size])
  offset += uint16Size

  if labelLen == endOfBlock {
   break
  }

  labelName = decf.UnmarshalString(data[offset : offset+int(labelLen)])
  offset += int(labelLen)
  sidCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size])
  offset += uint32Size

  sidLst := make([]uint32, sidCnt)
  for i := 0; i < int(sidCnt); i++ {
   sidLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size])
   offset += uint32Size
  }
  labels = append(labels, seriesWithLabel{Name: labelName, Sids: sidLst})
 }
 meta.Labels = labels

 // series block
 rows := make([]metaSeries, 0)
 for {
  series := metaSeries{}
  sidLen := decf.UnmarshalUint16(data[offset : offset+uint16Size])
  offset += uint16Size

  if sidLen == endOfBlock {
   break
  }

  series.Sid = decf.UnmarshalString(data[offset : offset+int(sidLen)])
  offset += int(sidLen)
  series.StartOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size])
  offset += uint64Size
  series.EndOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size])
  offset += uint64Size
  labelCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size])
  offset += uint32Size

  labelLst := make([]uint32, labelCnt)
  for i := 0; i < int(labelCnt); i++ {
   labelLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size])
   offset += uint32Size
  }
  series.Labels = labelLst
  rows = append(rows, series)
 }
 meta.Series = rows

 meta.MinTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size]))
 offset += uint64Size
 meta.MaxTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size]))
 offset += uint64Size

 return decf.Err()
}

至此,對 mandodb 的索引和存儲整體設計是不是就瞭然於胸。🥺 文檔較長,建議 Star 收藏,畢竟來都來了...

項目地址:https://github.com/chenjiandongx/mandodb

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/PqVHjGLLu5dXxjHubPbXYA