VictorialMetrics 存儲原理之索引
前文我們介紹了 VictorialMetrics 中是如何接收和傳輸數據的,接下來我們來分析下當 vmstorage 接收到數據後是如何保存監控指標的。
現在我們使用 csv 來導入一行指標數據,直接使用下面的請求即可:
curl -d "GOOG,1.23,4.56,NYSE" 'http://127.0.0.1:8480/insert/0/prometheus/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
執行上面的請求後,在 vmstorage 組件下面會收到如下所示的一些日誌信息:
同時在數據目錄 vmstorage-data 下面也多了一個 cache 目錄,而且 data 下面的 small 目錄和 indexdb 目錄下面也生成了一些文件,這些文件就是用來存儲指標數據的。
接下來我們就來仔細分析下這些文件是幹什麼的,以及這些文件的存儲格式是怎樣的。
要想弄明白 vmstorage 是如何去存儲數據的,首先我們要先弄明白幾個概念。
存儲格式
下圖是 VictoriaMetrics 支持的 Prometheus 協議的一個寫入示例。
VM 在收到寫入請求時,會對請求中包含的時序數據做轉換處理。首先根據包含 metric 和 labels 的 MetricName 生成一個唯一標識 TSID,然後 metric(指標名稱__name__) + labels + TSID 作爲索引 index,TSID + timestamp + value 作爲數據 data,最後索引 index 和數據 data 分別進行存儲和檢索。
因此 VM 的數據整體上分成索引和數據兩個部分,因此文件格式整體上會有兩個部分,其中索引部分主要是用於支持按照 label 或者 tag 進行多維檢索,數據存儲時,先將數據按 TSID 進行分組,然後每個 TSID 包含的數據點各自使用列式壓縮存儲。
TSID
VictoriaMetrics 的 MetricName 的結構如下所示,包含 MetricGroup(指標名稱 __name__) 和 Tag 數組,其中,Tags 是可選的,每個 Tag 由 Key 和 Value 等字節數組構成。
爲了規範,Tags 必須按標籤 Key 排序,使用 sortTags 方法。
VictoriaMetrics 的 TSID 的結構如下所示,包含 MetricGroupID、JobID、InstanceID、MetricID 等幾個字段,其中除了 MetricID 外,其他字段都是可選的。這個幾個 ID 的生成方法如下:
-
MetricGroupID是根據MetricName中的MetricGroup使用xxhash的 sum64 算法生成。 -
JobID和InstanceID分別由MetricName中的第一個 tag 和第二個 tag 使用xxhash的 sum64 算法生成。爲什麼使用第一個 tag 和第二個 tag?這是因爲 VictoriaMetrics 在寫入時,將寫入請求中的 JobID 和 InstanceID 放在了 Tag 數組的第一個和第二個位置。 -
MetricID,使用 VictoriaMetrics 進程啓動時的系統納秒時間戳自增生成。
// lib/storage/tsid.go
// TSID 是一個時間序列的唯一 ID,實際上就是唯一標識一個時間序列的結構體。
//
// 時間序列會根據 TSID 進行排序。
//
// 除了 MetricID 之外其他屬性都是可選的。 它們的存在僅僅是爲了更好地對相關指標進行分組。
// 如果它們的含義與它們的命名不同,那也沒關係。
type TSID struct {
AccountID uint32
ProjectID uint32 // 下面分析的時候可以暫時忽略這兩個屬性,用於多租戶標識的屬性
// MetricGroupID(指標組ID)對於指定的(AccountID, ProjectID)必須是唯一的。
//
// Metric Group 包含具有相同名稱的指標,例如 “memory_usage”、“http_requests”,但具有不同的標籤。
// 例如,下面的這些指標屬於 memory_usage 這個指標組:
//
// memory_usage{datacenter="foo1", job="bar1", instance="baz1:1234"}
// memory_usage{datacenter="foo1", job="bar1", instance="baz2:1234"}
// memory_usage{datacenter="foo1", job="bar2", instance="baz1:1234"}
// memory_usage{datacenter="foo2", job="bar1", instance="baz2:1234"}
MetricGroupID uint64
// JobID 是給定項目的單個作業(又名服務)的 ID。
//
// JobID 對於指定的(AccountID, ProjectID)必須是唯一的。
//
// 一個 Job 任務可能由多個實例組成。
// See https://prometheus.io/docs/concepts/jobs_instances/ for details.
JobID uint32
// InstanceID 是實例(進程)ID,對於特定的(AccountID, ProjectID)必須是唯一的。
InstanceID uint32
// MetricID 是指標(時間序列)的唯一ID。
//
// 其他所有的 TSID 字段都可以通過 MetricID 獲取。
MetricID uint64
}
因爲 TSID 中除了 MetricID 外,其他字段都是可選的,因此 TSID 中可以始終作爲有效信息的只有 MetricID,因此 VictoriaMetrics 的在構建 tag 到 TSID 的字典過程中,是直接存儲的 tag 到 MetricID 的字典。
以寫入 http_requests_total{status="200", method="GET"} 爲例,則 MetricName 爲 http_requests_total{status="200", method="GET"},假設生成的 TSID 爲 {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286},則 VictoriaMetrics 在寫入時就構建瞭如下幾種類型的索引 item,其他類型的索引 item 是在後臺或者查詢時構建的。
-
metricName -> TSID, 即http_requests_total{status="200", method="GET"} -> {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286} -
metricID -> metricName,即51106185174286 -> http_requests_total{status="200", method="GET"} -
metricID -> TSID,即51106185174286 -> {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286} -
tag -> metricID,即status="200" -> 51106185174286、method="GET" -> 51106185174286、"__name__" = http_requests_total -> 51106185174286(其實還有一個聯合索引)
有了這些索引的 item 後,就可以支持基於 tag 的多維檢索了,在當給定查詢條件 http_requests_total{status="200"} 時,VictoriaMetrics 先根據給定的 tag 條件,找出每個 tag 的 metricID 列表,然後計算所有 tag 的 metricID 列表的交集,然後根據交集中的 metricID,再到索引文件中檢索出 TSID,根據 TSID 就可以到數據文件中查詢數據了,在返回結果之前,再根據 TSID 中的 metricID,到索引文件中檢索出對應的寫入時的原始 MetircName。
但是由於 VictoriaMetrics 的 tag 到 metricID 的字典,沒有將相同 tag 的所有 metricID 放在一起存儲,在檢索時,一個 tag 可能需要查詢多次才能得到完整的 metricID 列表。另外查詢出 metricID 後,還要再到索引文件中去檢索 TSID 才能去數據文件查詢數據,又增加了一次 IO 開銷。這樣來看的話,VictoriaMetrics 的索引文件在檢索時,如果命中的時間線比較多的情況下,其 IO 開銷會比較大,查詢延遲也會比較高。
這裏我們瞭解了 TSID 這個非常重要的概念,還有幾個結構體需要我們瞭解下,比如 rawRow 表示一個原始的時間序列行,MetricRow 表示插入到存儲中的指標數據:
// lib/storage/raw_row.go
// rawRow 表示一個原始的時間序列行
type rawRow struct {
TSID TSID // 時間序列ID
Timestamp int64 // 時間戳
Value float64 // 給定時間戳的時間序列值
// PrecisionBits是要存儲的值中的有效位數,可能值爲 [1..64]
// 1 表示最大. 50% error, 2 - 25%, 3 - 12.5%, 64 沒有錯誤, i.e.
// 存儲的值不會丟失精度
PrecisionBits uint8
}
// libe/storage/storage.go
// MetricRow 插入到存儲中的指標
type MetricRow struct {
// MetricNameRaw 包含原始的指標名稱,必須使用 metricne.UnmarshalRaw 對其進行解碼。
MetricNameRaw []byte
Timestamp int64
Value float64
}
插入指標
有了上面幾個概念的認識,現在我們回過頭再去看下 vmstorage 中對 vminsert 請求的處理:
// app/vmstorage/transport/server.go
func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
return clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {
vminsertMetricsRead.Add(len(rows))
return s.storage.AddRows(rows, uint8(*precisionBits))
}, s.storage.IsReadOnly)
}
當 vmstorage 節點接收到數據後,最後會通過回調執行 s.storage.AddRows(rows, uint8(*precisionBits)),該函數將數據添加到底層存儲去:
// lib/storage/storage.go
// AddRows 添加 mrs 集合到存儲 s
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
if len(mrs) == 0 {
return nil
}
// 限制可能向存儲添加行的併發 goroutine 數量
// 當太多的 goroutine 調用 AddRows 時,這應該可以防止內存不足錯誤和 CPU 抖動。
select {
// 如果寫入 channel 成功,說明併發小於 CPU 最大核數,然後就可以走插入邏輯
// 如果沒寫入成功(也就是滿了),則執行default case
case addRowsConcurrencyCh <- struct{}{}:
default: // 如果插入 channel 失敗,說明某個 insert 操作的協程被阻塞,這時需要通知 select 協程去讓出。
atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
t := timerpool.Get(addRowsTimeout) // 獲取一個30s超時的timer
// 數據攝取優先級高於併發搜索
// pacelimiter(步長限制器)中有個原子累加的變量,表示有多少個 insert 操作在等待
// 走到這裏證明有一個 insert 操作被阻塞了,調用 Inc,表示需要(Search操作)等待
storagepacelimiter.Search.Inc()
select { // 寫入不成功或者還未超時就會阻塞在這裏了
// 在超時的30s時間內,嘗試去寫入 channel 隊列
case addRowsConcurrencyCh <- struct{}{}:
timerpool.Put(t) // 把 timer 放回對象池,減少 GC
// 可以成功寫入 channel 了,那麼可以執行 insert 操作了,則執行限制器的 Dec 操作,減一
storagepacelimiter.Search.Dec()
// 當限制器的等待數量爲0的時候,會調用 cond.Broadcast() 去通知 select 協程開始工作。
case <-t.C: // 到30s超時時間了
// 把 timer 放回對象池,減少 GC timerpool.Put(t)
// 超時了那麼當前的 insert 就報錯了,等待的數量就可以減一了
storagepacelimiter.Search.Dec()
atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1) // 記錄下超時次數
atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs))) // 記錄沒有被插入成功的 mr 數量
// 等待了30秒仍然沒有CPU資源,只能報錯
return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load",
len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
}
}
// 下面是插入邏輯
// 一次插入不要太大
var firstErr error
ic := getMetricRowsInsertCtx()
maxBlockLen := len(ic.rrs)
for len(mrs) > 0 {
mrsBlock := mrs
// 如果要插入的 mrs 超過了最大長度
if len(mrs) > maxBlockLen {
// 則先插入最大長度的 mrs mrsBlock = mrs[:maxBlockLen]
// 剩下的 mrs 下次循環去處理
mrs = mrs[maxBlockLen:]
} else {
mrs = nil
}
// 執行真正的 add 操作
if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
// 記錄下插入成功的 mrs 數量
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
}
// 放回對象池
putMetricRowsInsertCtx(ic)
<-addRowsConcurrencyCh // insert 邏輯執行完成後,出隊
return firstErr
}
該函數的實現非常經典,會限制可能向存儲添加數據的併發 goroutine 數量,當太多的 goroutine 調用 AddRows 時,可以防止內存不足錯誤和 CPU 抖動。這裏實現了插入比查詢更高的優先級,當資源不足時,查詢操作會掛起讓出資源給到插入操作使用。
獲取 TSID
真正實現添加數據是下面的 add 函數,其中 rawRow 是原始的時序數據行,MetricRow 是要插入到存儲中的行數據,該函數的核心就是要生成指標序列的 TSID 數據,如下所示:
// lib/storage/storage.go
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
// 當前使用的索引
idb := s.idb()
j := 0
var (
// 這些變量用於加速同一 metricName 的多個相鄰行的批量導入。
prevTSID TSID
prevMetricNameRaw []byte
)
var pmrs *pendingMetricRows
// 獲取該數據塊的最小時間和最大時間
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
// 帶有第幾代索引信息的 TSID 對象
var genTSID generationTSID
// 只返回第一個錯誤,因爲它返回所有錯誤沒有意義
var firstWarn error
// 循環數據行,其實就是填充 rawRow 中的 TSID 數據
for i := range mrs {
mr := &mrs[i]
if math.IsNaN(mr.Value) { // 值爲 NaN
if !decimal.IsStaleNaN(mr.Value) {
// 跳過 Prometheus staleness 標記以外的 NaN
// 因爲底層編碼不知道如何使用它們。
continue
}
}
// 如果指標的時間戳小於最小的時間戳
// 則跳過保留期外時間戳過小的行
if mr.Timestamp < minTimestamp {
......
continue
}
// 同樣跳過超過最大時間戳的數據
if mr.Timestamp > maxTimestamp {
......
continue
}
dstMrs[j] = mr
r := &rows[j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
// 快速路徑 - 當前 mr 包含與前一 mr 相同的指標名稱,因此它包含相同的 TSID。
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
// 當許多行包含相同的 MetricNameRaw 時,應在批量導入時觸發此路徑。
r.TSID = prevTSID
continue
}
// 判斷 TSID 是否在緩存中(命中緩存)
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
r.TSID = genTSID.TSID
// 跳過該行,因爲已超出唯一序列數的限制。
if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
j--
continue
}
// 快速路徑 - 給定 MetricNameRaw 的 TSID 已在緩存中找到,並且未刪除。
// 不需要檢查 r.TSID.MetricID 是否已刪除,因爲 tsidCache 不包含已刪除時間序列的 MetricName -> TSID 條目,可以查看 Storage.DeleteMetrics 的代碼
prevTSID = r.TSID // 設置前一個 TSID 的值
prevMetricNameRaw = mr.MetricNameRaw // 設置前一個 MetricNameRaw 的值
// 找到的TSID不是當前代的索引(來自上一代緩存下來的索引)
if genTSID.generation != idb.generation {
// 索引需要嘗試使用該 TSID 重新填充當前代的索引數據
// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
created, err := idb.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw)
if err != nil {
return fmt.Errorf("cannot create indexes in the current indexdb: %w", err)
}
if created {
// 如果填充成功,則將當前的 TSID 設置爲當前代索引
genTSID.generation = idb.generation
// 重新將該 TSID -> MetricNameRaw 數據放回緩存,方便後面的序列處理
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
}
continue
}
// 慢速路徑 - 緩存中缺少TSID
// 在下面的循環中推遲搜索
j--
if pmrs == nil {
// 初始化 pendingMetricRows
pmrs = getPendingMetricRows()
}
// 將 mr 數據添加到 pendingMetricRows 中去待處理
if err := pmrs.addRow(mr); err != nil {
// 錯誤時不要停止添加數據 - 只需跳過無效行即可。
// 這保證了無效行不會阻止將有效行添加到存儲中去。
if firstWarn == nil {
firstWarn = err
}
continue
}
}
// 有指標的 TSID 沒有在緩存中(上面的慢速路徑)
if pmrs != nil {
// 按指標名稱對 pendingMetricRows 進行排序,以便通過下面循環中的 “is” 加快搜索速度。
pendingMetricRows := pmrs.pmrs
sort.Slice(pendingMetricRows, func(i, j int) bool {
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
})
//
is := idb.getIndexSearch(0, 0, noDeadline)
prevMetricNameRaw = nil // 接收前一個 MetricNameRaw
var slowInsertsCount uint64
for i := range pendingMetricRows {
pmr := &pendingMetricRows[i]
mr := pmr.mr // MetricRaw
dstMrs[j] = mr
r := &rows[j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
// 快速路徑 - 當前 mr 包含與前一個 mr 相同的指標名稱,因此它包含相同的 TSID。
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
// 當許多行包含相同的 MetricNameRaw 時,在批量導入時會觸發該路徑。
r.TSID = prevTSID
if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
// 跳過該行,因爲已超出唯一序列數的限制
j--
continue
}
continue
}
// 慢速路徑
slowInsertsCount++ // 記錄慢插入次數
// 通過 MetricName 去獲取(沒有就創建)TSID 數據
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot obtain or create TSID for MetricName %q: %w", pmr.MetricName, err)
}
j--
continue
}
// 設置 genTSID 爲當前生成的 TSID
genTSID.generation = idb.generation
genTSID.TSID = r.TSID
// 返回緩存
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
// 緩存當前的 TSID 和 MetricNameRaw,方便下一條序列快速處理
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
// 跳過該行,因爲已超出唯一序列數的限制
j--
continue
}
}
// 回收對象
idb.putIndexSearch(is)
putPendingMetricRows(pmrs)
atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)
}
// 提示錯誤信息
if firstWarn != nil {
logger.WithThrottler("storageAddRows", 5*time.Second).Warnf("warn occurred during rows addition: %s", firstWarn)
}
dstMrs = dstMrs[:j]
rows = rows[:j]
// TSID 填充完成,可以插入數據了
var firstError error
if err := s.tb.AddRows(rows); err != nil {
firstError = fmt.Errorf("cannot add rows to table: %w", err)
}
if err := s.updatePerDateData(rows, dstMrs); err != nil && firstError == nil {
firstError = fmt.Errorf("cannot update per-date data: %w", err)
}
if firstError != nil {
return fmt.Errorf("error occurred during rows addition: %w", firstError)
}
return nil
}
首先循環數據,把時間戳過小或過大的都過濾掉,然後就是想辦法儘可能快地獲取到指標的 TSID:
-
快速路徑 - 當前 MetricRow 包含與前一 MetricRow 相同的指標名稱,因此它們具有相同的 TSID,所以直接將當前對象的 TSID 設置成前一個 TSID,這是最快的方式。
-
如果和前一個指標名稱不一樣,則去查看 genTSID 是否在緩存中(命中緩存)
-
如果命中緩存則 genTSID 中的 TSID 就是我們需要的,同時也將其設置爲前一個 prevTSID。如果該 TSID 不是當代的索引(來自上一代緩存下來的索引),則需要嘗試使用該 TSID 重新填充當代的索引數據,這和索引輪換有關,後面會詳細說明。
-
如果沒有命中緩存,則屬於慢速路徑,將當前數據添加到
pendingMetricRows中去待處理 -
循環了所有指標數據後,接下來需要處理
pendingMetricRows中的數據,也就是緩存中沒有對應的 TSID,此時就需要我們去生成對應的 TSID 數據。 -
快速路徑 - 同樣是當前 MetricRow 與前一個 MetricRow 的指標名稱相同,因此它包含相同的 TSID,直接設置成前一個 TSID 即可。
-
慢速路徑 - 走到這個分支則只能去創建 TSID 了,通過 MetricName 去獲取(沒有就創建)TSID 數據,也就是上面的
GetOrCreateTSIDByName函數。獲取後記得放到緩存中去。
上面費了很大的功夫就是爲了獲取時間序列對應的 TSID 數據的,這也是插入數據過程中最可能出現慢插入的地方,因爲該過程涉及到索引,比較耗時間,如果你插入的數據出現大量的高基數序列(比如包含一些隨機生成的 ID 作爲標籤),則會大大降低 vmstorage 的插入性能。
我們可以去查看下 GetOrCreateTSIDByName 函數的實現。
// lib/storage/index_db.go
// GetOrCreateTSIDByName 使用指定 metricName 的 TSID 填充 dst。
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte) error {
// hack:在多次連續未命中後跳過 TSID 的搜索
// 這將提高大批量新時間序列的插入性能。
if is.tsidByNameMisses < 100 {
err := is.getTSIDByMetricName(dst, metricName)
if err == nil {
is.tsidByNameMisses = 0
return nil
}
if err != io.EOF {
return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err)
}
is.tsidByNameMisses++
} else {
is.tsidByNameSkips++
if is.tsidByNameSkips > 10000 {
is.tsidByNameSkips = 0
is.tsidByNameMisses = 0
}
}
// 找不到給定名稱的 TSID,創建它。
// 如果 mn 的重複 TSID 是由併發 goroutines 創建的,那麼這也是可以的。
// 指標結果將在表搜索 TableSearch 後由 mn 合併。
if err := is.db.createTSIDByName(dst, metricName); err != nil {
return fmt.Errorf("cannot create TSID by MetricName %q: %w", metricName, err)
}
return nil
}
// 根據 metricName 去搜索 TSID
func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
dmis := is.db.s.getDeletedMetricIDs()
ts := &is.ts // TableSearch
kb := &is.kb
kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID) // MetricName -> TSID 的前綴
kb.B = append(kb.B, metricName...)
kb.B = append(kb.B, kvSeparatorChar)
ts.Seek(kb.B) // Seek 查找 ts 中大於或等於 k 的第一項
for ts.NextItem() { // 循環查找
if !bytes.HasPrefix(ts.Item, kb.B) { // ts.Item 不是以 kb.B 爲前綴
// 沒找到
return io.EOF
}
v := ts.Item[len(kb.B):] // 獲得尾部的值
tail, err := dst.Unmarshal(v) // 填充dst
if err != nil {
return fmt.Errorf("cannot unmarshal TSID: %w", err)
}
if len(tail) > 0 { // 尾部還有值
return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail)
}
if dmis.Len() > 0 { // 有標記刪除的 MetricID 列表
// 驗證 dst 是否標記爲已刪除。
if dmis.Has(dst.MetricID) {
// dst 被刪除了,繼續搜索。
continue
}
}
// 找到了有效的 dst
return nil
}
if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching TSID by metricName; searchPrefix %q: %w", kb.B, err)
}
// 什麼都沒發現
return io.EOF
}
該函數會獲取 metricName 對應的 TSID,但是可能會出現多次連續未命中的情況,爲了提高性能,這裏做了一點 hack,如果連續未查詢到 TSID 100 次則跳過搜索,就只能去創建 TSID 了,如果跳過了 10000 次則又重置可以重新去搜索。
搜索 TSID 是通過下面的 getTSIDByMetricName 函數來實現的,創建 TSID 是通過 createTSIDByName 函數實現的。
TSID 的生成方法如下所示:
// lib/storage/index_db.go
// 根據指定的 metricName 創建 TSID
func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
mn := GetMetricName()
defer PutMetricName(mn)
if err := mn.Unmarshal(metricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
}
// 創建 TSID
created, err := db.getOrCreateTSID(dst, metricName, mn)
if err != nil {
return fmt.Errorf("cannot generate TSID: %w", err)
}
// TSID 創建後要創建索引,這一步是最耗時的
if err := db.createIndexes(dst, mn); err != nil {
return fmt.Errorf("cannot create indexes: %w", err)
}
// 不需要使 tag 緩存無效,因爲它在 db 上無效,tb 通過傳遞給 OpenTable 的invalidateTagFiltersCache flushCallback 刷新。
if created {
// 僅當 indexDB 中未找到 tsid 時,才增加 newTimeseriesCreated 計數器
atomic.AddUint64(&db.newTimeseriesCreated, 1)
if logNewSeries {
logger.Infof("new series created: %s", mn.String())
}
}
return nil
}
// getOrCreateTSID 在 db.extDB 中查找指定 metricName 的 TSID
// 如果找不到任何內容,則創建新的 TSID
//
// 如果 TSID 已創建,則返回 true;如果 TSID 在 extDB 中,則返回 false
func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName) (bool, error) {
// 在外部存儲中搜索 TSID
// 這個 db 通常來自上一個時期
var err error
// 相當於去上一個索引 db 中查找 TSID
if db.doExtDB(func(extDB *indexDB) {
err = extDB.getTSIDByNameNoCreate(dst, metricName)
}) {
if err == nil {
// 已在外部存儲中找到 TSID
return false, nil
}
if err != io.EOF {
return false, fmt.Errorf("external search failed: %w", err)
}
}
// 在外部存儲中找不到 TSID,在本地生成。
generateTSID(dst, mn)
return true, nil
}
// 生成 TSID 數據
func generateTSID(dst *TSID, mn *MetricName) {
dst.AccountID = mn.AccountID
dst.ProjectID = mn.ProjectID
// 根據 MetricName 中的 MetricGroup 使用 xxhash 的 sum64 算法生成。
dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
// 假設 job-like metric 放在 mn.Tags[0],而 instance-like metric 放在 mn.Tags[1]
// 這個假設是正確的,因爲 mn.Tags 必須在調用 generateTSID() 函數之前使用 mn.sortTags() 進行排序。
// 這允許對磁盤上彼此靠近的相同(job、instance)的數據塊進行分組。
// 當從磁盤讀取相同 job 和/或 instance 的數據塊時,這會減少磁盤尋道和磁盤讀取 IO。
// 例如,與 `process_resident_memory_bytes{job="vmstorage"}` 匹配的時間序列的數據塊在磁盤上是物理相鄰的。
if len(mn.Tags) > 0 {
dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value)) // 第一個Tag規定爲 JobID
}
if len(mn.Tags) > 1 {
dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value)) // 第二個Tag規定爲 InstanceID
}
dst.MetricID = generateUniqueMetricID() // 生成唯一的指標ID
}
MetricID 通過 generateUniqueMetricID() 生成, 在重啓時, nextUniqueMetricID 被賦值爲當時的時間戳, 隨後每次新的 TSID 的創建都會在此基礎之上 + 1。
// lib/storage/index_db.go
// 生成唯一的 MetricID
func generateUniqueMetricID() uint64 {
// 期望的是從此函數返回的 metricID 必須是密集的。
// 如果它們是稀疏的,那麼這可能會損害 metric_ids 與 uint64set.Set 的交集性能。
return atomic.AddUint64(&nextUniqueMetricID, 1)
}
// 該數在重新啓動時不能倒退,否則可能會發生 metricID 衝突。
// 所以不要在 VictoriaMetrics 重新啓動期間更改服務器上的時間。
var nextUniqueMetricID = uint64(time.Now().UnixNano())
但是我們可能在這裏看不懂 TSID 是如何去搜索或者創建的,這就需要我們去了解下 VM 中的倒排索引了。
倒排索引
當創建完 TSID 後, 需要建立一系列的索引供查找時使用。在 VM 中不同類型的索引都是通過 KV 關係來描述,在代碼中稱爲 Item , Item 的結構如下:
在 VM 中 Item 的整體上是一個 KV 結構的字節數組,共計有 7 種類型,每種類型的 Item 通過固定前綴來區分,前綴類型如下圖所示。
在 storage/index_db.go: createIndexes 函數中,去分別建立了各個索引,生成 Items,代碼如下所示:
// lib/storage/index_db.go
// 創建索引
func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error {
// 索引 items 的順序很重要,它保證了索引的一致性。
ii := getIndexItems()
defer putIndexItems(ii)
// 創建 MetricName -> TSID 的索引。
ii.B = append(ii.B, nsPrefixMetricNameToTSID) // 前綴
ii.B = mn.Marshal(ii.B)
ii.B = append(ii.B, kvSeparatorChar) // 分隔符
ii.B = tsid.Marshal(ii.B)
ii.Next()
// 創建 MetricID -> MetricName 索引。
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.B = mn.Marshal(ii.B)
ii.Next()
// 創建 MetricID -> TSID 索引
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToTSID, mn.AccountID, mn.ProjectID)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.B = tsid.Marshal(ii.B)
ii.Next()
// 創建 Tag -> MetricID 索引
prefix := kbPool.Get()
prefix.B = marshalCommonPrefix(prefix.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
ii.registerTagIndexes(prefix.B, mn, tsid.MetricID)
kbPool.Put(prefix)
// 將 Items 添加到 Table 中去
return db.tb.AddItems(ii.Items)
}
對於 ask{market="NYSE",ticker="GOOG"} 1.23 的時序指標,對應的 MetricName 爲 AccountID=0, ProjectID=0, ask{market="NYSE",ticker="GOOG"},假設生成的 TSID 爲:
{
AccountID: 0
ProjectID: 0
MetricGroupID: 6661248876682682060
JobID: 3817370224
InstanceID: 4166188337
MetricID: 1654132102944898001
}
則生成的索引 Item 邏輯結構如下圖所示:
上圖爲構建的 MetricName -> TSID 的索引,前綴爲 nsPrefixMetricNameToTSID=0,整個索引項就是一個 key: value 的形式,key 爲 MetricName 編碼後的值,value 爲 TSID 編碼後的值,中間通過一個 kvSeparator 的分隔符進行連接,當然這些值真正的存儲形式都是 []byte。除了上圖的這個索引之外還有幾個其他的索引:MetricID -> MetricName、MetricID -> TSID、Tag -> MetricID,方式都是一樣的,只是要注意每種索引的前綴是不一樣的。最後得到的索引就是上面構建的幾種索引的集合數組。
索引構建完成後又是如何去持久化數據的呢?保存的數據又是怎樣的格式呢?未完待續吧......
k8s 技術圈 專注容器、專注 kubernetes 技術......
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/5T3koechRO-ZtAeEttIIwA