VictorialMetrics 源碼分析之插入指標數據
爲了調試方便,這裏我們將 VictorialMetrics 代碼使用 Goland 打開。每個組件的入口位於 app/<module>/main.go,比如 vmstorage 組件的入口位於 app/vmstorage/main.go:
爲了對 VM 整個流暢分析,我們可以直接在 IDE 中來啓動這些組件。
直接在 vmstorage 入口的 main 函數上點擊 Run 'go build main.go' 即可啓動該組件:
通過日誌記錄可以看出 vmstorage 會在 8401 端口監聽 vmselect 的連接請求,在 8400 端口監聽 vminsert 的連接請求,其本身的服務會通過 8482 端口進行暴露。啓動後會在根目錄下面創建一個名爲 vmstorage-data 的數據目錄,該目錄就是用來保存 VM 的數據的,其中 data 目錄是監控指標數據目錄,indexdb 目錄是索引數據目錄,snapshots 是快照目錄,flock.lock 爲文件鎖文件,用於 VM 進程鎖住文件,不允許別的進程進行修改目錄或文件,如下所示:
數據目錄 data 下面包含兩個最主要的目錄 big 目錄和 small 目錄,這兩個目錄的結構是一樣的。
-
small 目錄:內存中的數據先持久化到目錄,壓縮比例高,會定期檢測判斷是否滿足 merge 條件,合併多個小文件。
-
big 目錄:small 過大後會合併到 big 目錄,壓縮比例極高。
索引目錄 indexdb 下面包含兩個目錄 16F29B51EDD96911、16F29B51EDD96912,這兩個目錄分別表示當前正在使用的索引目錄,和前面一次使用的索引目錄,爲什麼需要保留前面一次使用的呢?
這是因爲 VM 中會配置自動輪換的週期,比如可以配置 1 天、1 周、1 月等等,那麼這個週期到了後索引數據就要輪換,就相當於會創建一個新的目錄作爲最新的索引數據目錄,但是如果你直接將前面一個到期的索引刪除,那麼現在就沒有任何索引了,此時如果有大量的插入或者查詢操作的話比如就需要去生成大量的索引,而生成索引的是非常消耗資源的,索引會造成系統性能急劇下降,保留前面一個索引可以來判斷新的數據是否能命中前面的緩存,如果命中了則直接將之前的索引拷貝到最新的索引中來,這樣就大大提高了索引的效率,索引我們需要保留兩個索引,之前的索引則會刪除掉。
索引的名稱是根據系統的納秒時間戳原子 + 1 後生成的 16 進制數據:
// lib/storage/storage.go
func nextIndexDBTableName() string {
n := atomic.AddUint64(&indexDBTableIdx, 1)
return fmt.Sprintf("%016X", n)
}
var indexDBTableIdx = uint64(time.Now().UnixNano())
啓動 vmstorage 的時候就會去打開索引,默認路徑爲 <vmstorage-data>/indexdb:
// lib/storage/storage.go
// 打開索引數據表 path=vmstorage-data/indexdb
func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error) {
//索引目錄不存在則創建
if err := fs.MkdirAllIfNotExist(path); err != nil {
return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err)
}
d, err := os.Open(path)
if err != nil {
return nil, nil, fmt.Errorf("cannot open directory: %w", err)
}
defer fs.MustClose(d)
// 搜索最近的兩個表,最後一個表示活躍狀態的,前面一個包含備份數據
fis, err := d.Readdir(-1)
if err != nil {
return nil, nil, fmt.Errorf("cannot read directory: %w", err)
}
var tableNames []string
for _, fi := range fis {
if !fs.IsDirOrSymlink(fi) {
// 不是目錄則跳過
continue
}
tableName := fi.Name()
if !indexDBTableNameRegexp.MatchString(tableName) {
// 名稱不符合規範也有跳過
continue
}
// 剩下的就是所有的表名稱了
tableNames = append(tableNames, tableName)
}
// 對錶名進行排序
sort.Slice(tableNames, func(i, j int) bool {
return tableNames[i] < tableNames[j]
})
// 如果表名個數小於2,則創建
if len(tableNames) < 2 {
// 如果沒有表名,則先創建前面一個表名
if len(tableNames) == 0 {
// 生成前面一個表名
prevName := nextIndexDBTableName()
tableNames = append(tableNames, prevName)
}
//生成後面的一個表名(在前面表名的基礎上做原子+1操作的16進制數據)
currName := nextIndexDBTableName()
tableNames = append(tableNames, currName)
}
// Invariant: len(tableNames) >= 2
// 如果操過2個表,則只保留最後兩個表,其他不需要了,沒意義,因爲過期了
for _, tn := range tableNames[:len(tableNames)-2] {
pathToRemove := path + "/" + tn
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
fs.MustRemoveAll(pathToRemove)
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
}
// 持久化變更
fs.MustSyncPath(path)
// 打開最後兩個表
currPath := path + "/" + tableNames[len(tableNames)-1]
logger.Infof("1.prepare open index db currPath %s", currPath)
curr, err = openIndexDB(currPath, s, 0)
if err != nil {
return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
}
prevPath := path + "/" + tableNames[len(tableNames)-2]
logger.Infof("2.prepare open index db prevPath %s", prevPath)
prev, err = openIndexDB(prevPath, s, 0)
if err != nil {
curr.MustClose()
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)
}
return curr, prev, nil
}
當索引目錄不存在的時候會創建該目錄,然後去該目錄中查找最近的兩個索引,如果沒有兩個索引,則去生成對應的索引目錄,索引的名稱就是上面的納秒時間戳原子 + 1 後的 16 進制數據,然後通過 openIndexDB 函數分別打開這兩個索引。
openIndexDB 函數用於打開指定路徑的索引,其實就是生成一個 indexDB 對象,indexDB 結構體定義如下所示:
// lib/storage/index_db.go
// indexDB 代表一個 index db.
type indexDB struct {
// 原子計數器必須位於結構體的頂部,以便在32位架構上正確對齊8個字節
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
refCount uint64
// 新創建的時間序列的計數器,可用於確定時間序列的 churn rate。
newTimeseriesCreated uint64
// 在輪換後從以前的 indexDB 重新填充的時間序列的計數器。
timeseriesRepopulated uint64
// MetricID -> TSID 條目 miss 的數量
// 該值比率如果較高則證明 indexDB 損壞了
missingTSIDsForMetricID uint64
// date range 搜索的調用數
dateRangeSearchCalls uint64
// date range 搜索的命中數
dateRangeSearchHits uint64
// 全局搜索調用次數
globalSearchCalls uint64
// MetricID -> MetricName 條目 miss 的數量
// 高比率可能意味着由於不乾淨的關機導致索引數據庫損壞。
// 之後必須自動恢復db
missingMetricNamesForMetricID uint64
// 標記爲刪除
mustDrop uint64
// 標識索引的 生成 ID(可以看成是第幾代索引),並用於同步來自不同 indexDB 的數據
generation uint64
// indexDB 輪換的unix時間戳(以秒爲單位)。
rotationTimestamp uint64
// 索引名稱
name string
// Table 表結構
tb *mergeset.Table
// 相當於之前的一個 indexDB
extDB *indexDB
extDBLock sync.Mutex
// 用於快速查找 TagFilters -> TSIDs 的緩存
tagFiltersCache *workingsetcache.Cache
// 父級存儲引用
s *Storage
// (date, tagFilter) -> loopsCount 的緩存
// 用於減少匹配一組過濾器時的工作量。
loopsPerDateTagFilterCache *workingsetcache.Cache
// 索引搜索的對象池
indexSearchPool sync.Pool
}
openIndexDB 函數實現代碼如下所示,整體比較簡單,就是去構造一個 indexDB 對象,索引路徑的最後一段(也就是文件夾的名稱)轉換成 10 進制的數據就會用來表示 indexDB 的 generation:
// lib/storage/index_db.go
// openIndexDB 從指定路徑打開索引 db 文件
//
// path 路徑的最後一段應該是一個唯一的16進制數據,會被用作 indexDB.generation
//
// 當在 indexdb 輪換期間創建新的 indexdb 時,ipenIndexDB 被調用時
// rotationTimestamp 必須設置爲當前的 unix 時間戳。
func openIndexDB(path string, s *Storage, rotationTimestamp uint64) (*indexDB, error) {
if s == nil {
logger.Panicf("BUG: Storage must be nin-nil")
}
// 獲取路徑的最後一段,也就是索引表(文件夾)的名稱
name := filepath.Base(path)
// 將16進制數據轉換成10進制的數據,用來表示 indexDB.generation
gen, err := strconv.ParseUint(name, 16, 64)
logger.Infof("Open Index DB path %s, and gen %d", name, gen)
if err != nil {
return nil, fmt.Errorf("failed to parse indexdb path %q: %w", path, err)
}
tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows)
if err != nil {
return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)
}
// 不要將 tagFiltersCache 保存在文件中,因爲它非常不穩定。
mem := memory.Allowed()
db := &indexDB{
refCount: 1,
generation: gen,
rotationTimestamp: rotationTimestamp,
tb: tb,
name: name,
tagFiltersCache: workingsetcache.New(mem / 32),
s: s,
loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
}
return db, nil
}
構造 indexDB 對象中最核心部分就是獲取 Table 表對象了,通過 mergeset.OpenTable 函數來實現。要搞清楚這個 Table 表是什麼,首先我們需要去看下其結構定義:
// lib/mergeset/table.go
// Table 代表 mergeset 表.
type Table struct {
// 原子更新的計數器必須在結構體最前面,這樣在32位架構上可以正確地對齊到8字節。
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
activeMerges uint64
mergesCount uint64
itemsMerged uint64
assistedMerges uint64
mergeIdx uint64
path string
// 將數據刷新到存儲的回調
flushCallback func()
flushCallbackWorkerWG sync.WaitGroup
needFlushCallbackCall uint32
// block 準備好的回調
prepareBlock PrepareBlockCallback
partsLock sync.Mutex
// 包含的 part 列表
parts []*partWrapper
// rawItems 包含最近添加的尚未轉換爲 parts 的數據。
// 出於性能原因,未在搜索中使用 rawItems
rawItems rawItemsShards
snapshotLock sync.RWMutex
flockF *os.File
stopCh chan struct{}
// 使用 syncwg 而不是sync,因爲可以從併發 goroutine 調用 Add/Wait。
partMergersWG syncwg.WaitGroup
rawItemsFlusherWG sync.WaitGroup
convertersWG sync.WaitGroup
// 使用 syncwg 而不是sync,因爲可以從併發 goroutine 調用 Add/Wait。
rawItemsPendingFlushesWG syncwg.WaitGroup
}
OpenTable 函數實現如下所示,首先會判斷表目錄是否存在,不存在就創建這個目錄,然後創建 flock.lock 文件防止併發打開,然後就是核心的 openParts 函數打開表的 part 列表:
// lib/mergeset/table.go
// OpenTable 在指定路徑上打開一個 table
//
// 每次將新數據批次刷新到底層存儲並對搜索可見時,都會調用可選的 flushCallback 回調。
//
// 在將準備好的 block 塊刷新到持久存儲之前,在合併期間調用可選的 prepareBlock 回調。
//
// 如果該表還不存在,則創建該表。
func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback) (*Table, error) {
path = filepath.Clean(path)
logger.Infof("opening table %q...", path)
startTime := time.Now()
// 如果表還不存在,那麼爲它創建一個目錄
if err := fs.MkdirAllIfNotExist(path); err != nil {
return nil, fmt.Errorf("cannot create directory %q: %w", path, err)
}
// 創建 flock.lock 文件,防止併發打開
flockF, err := fs.CreateFlockFile(path)
if err != nil {
return nil, err
}
// 打開表 parts
pws, err := openParts(path)
if err != nil {
return nil, fmt.Errorf("cannot open table parts at %q: %w", path, err)
}
tb := &Table{
path: path,
flushCallback: flushCallback,
prepareBlock: prepareBlock,
parts: pws,
mergeIdx: uint64(time.Now().UnixNano()),
flockF: flockF,
stopCh: make(chan struct{}),
}
// 初始化 rawItems
tb.rawItems.init()
// 開始執行 partMerges 的工作
tb.startPartMergers()
// 開始執行 rawItems 刷新的工作
tb.startRawItemsFlusher()
// 更新表相關的指標數據
var m TableMetrics
tb.UpdateMetrics(&m)
logger.Infof("table %q has been opened in %.3f seconds; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d",
path, time.Since(startTime).Seconds(), m.PartsCount, m.BlocksCount, m.ItemsCount, m.SizeBytes)
tb.convertersWG.Add(1)
go func() {
tb.convertToV1280()
tb.convertersWG.Done()
}()
// 如果有刷新回調則執行回調
if flushCallback != nil {
tb.flushCallbackWorkerWG.Add(1)
go func() {
// 每10秒調用一次 flushCallback,以提高緩存的效率
// 緩存由 flushCallback 重置
tc := time.NewTicker(10 * time.Second)
for {
select {
case <-tb.stopCh: // 停止
tb.flushCallback()
tb.flushCallbackWorkerWG.Done()
return
case <-tc.C:
// 如果需要刷新,則調用刷新回調
if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) {
tb.flushCallback()
}
}
}
}()
}
return tb, nil
}
openParts 返回的就是一個包裝的 part 列表 partWrapper,裏面除了 part 的引用之外,還包括在內存中的 inmemoryPart 的引用。
// lib/mergeset/table.go
type partWrapper struct {
p *part
mp *inmemoryPart
refCount uint64
isInMerge bool
}
func openParts(path string) ([]*partWrapper, error) {
// 從備份還原後,可能會丟失路徑,所以需要的時候就創建它
if err := fs.MkdirAllIfNotExist(path); err != nil {
return nil, err
}
d, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("cannot open difrectory: %w", err)
}
defer fs.MustClose(d)
// 執行剩餘的事務和清理 /txn 和 /tmp 目錄。
// 尚未創建快照,使用 fakeSnapshotLock
var fakeSnapshotLock sync.RWMutex
if err := runTransactions(&fakeSnapshotLock, path); err != nil {
return nil, fmt.Errorf("cannot run transactions: %w", err)
}
// 清理事務目錄 txn,然後重新創建
txnDir := path + "/txn"
fs.MustRemoveAll(txnDir)
if err := fs.MkdirAllFailIfExist(txnDir); err != nil {
return nil, fmt.Errorf("cannot create %q: %w", txnDir, err)
}
// 清理臨時數據目錄 tmp,然後重新創建
tmpDir := path + "/tmp"
fs.MustRemoveAll(tmpDir)
if err := fs.MkdirAllFailIfExist(tmpDir); err != nil {
return nil, fmt.Errorf("cannot create %q: %w", tmpDir, err)
}
fs.MustSyncPath(path)
// 獲取所有的 parts
fis, err := d.Readdir(-1)
if err != nil {
return nil, fmt.Errorf("cannot read directory: %w", err)
}
var pws []*partWrapper
for _, fi := range fis {
if !fs.IsDirOrSymlink(fi) {
// 跳過非目錄的
continue
}
fn := fi.Name()
if isSpecialDir(fn) {
// 跳過一些特殊的目錄
continue
}
partPath := path + "/" + fn
if fs.IsEmptyDir(partPath) { // 如果爲空目錄
// 刪除空目錄,該目錄可以在NFS上不乾淨關閉後保留下來。
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
fs.MustRemoveAll(partPath)
continue
}
// 打開 Part
p, err := openFilePart(partPath)
if err != nil {
mustCloseParts(pws)
return nil, fmt.Errorf("cannot open part %q: %w", partPath, err)
}
// 將 Part 放進包裝的 partWrapper 中去
pw := &partWrapper{
p: p,
refCount: 1,
}
pws = append(pws, pw)
}
return pws, nil
}
openParts 的過程其實就是去構造表的過程,比如重置事務目錄 txn、臨時數據目錄 tmp,當第一次啓動的時候可以看出 parts 是爲空的,索引 openParts 會返回一個空的切片。那麼什麼時候纔會有 part 數據產生呢?自然要等到有數據寫入的時候,所以接下來我們要去啓動 vminsert 這個組件。
首先同樣需要在 IDE 中來啓動 vminsert,但是在啓動之前需要配置下啓動參數,因爲 vminsert 需要將數據傳輸到 vmstorage 中去的,在 app/vminsert/main.go 文件上右鍵選擇 Modify Run Configuration...:
在配置對話框中的 Program arguments 行添加需要配置的參數,比如我們這裏添加 -storageNode=127.0.0.1:8401,意思就是 vminert 接收到數據後會發送到後面的 storageNode 節點去:
配置好後和前面一樣再次去啓動 app/vminsert/main.go 即可,如下所示。可以看到 vminsert 成功和 127.0.0.1:8400 建立了連接,也就是上面的 vmstorage 節點:
同樣當連接建立後在 vmstorage 節點這邊也有相應的日誌體現,如下所示:
vmstorage 在 8400 端口上接收 vminsert 的請求,8401 端口上接收 vmselect 的請求,通過 transport.NewServer 去初始化 Server,然後分別在一個 goroutine 中去啓動監聽 vminsert、vmselect 的請求:
// app/vmstorage/main.go
srv, err := transport.NewServer(*vminsertAddr, *vmselectAddr, strg)
if err != nil {
logger.Fatalf("cannot create a server with vminsertAddr=%s, vmselectAddr=%s: %s", *vminsertAddr, *vmselectAddr, err)
}
go srv.RunVMInsert()
go srv.RunVMSelect()
我們可以先看看這裏的 Server 是如何定義的:
// app/vmstorage/transport/server.go
// Server 用於處理來自 vminsert 和 vmselect 的連接
type Server struct {
// 將 stopFlag 移動到結構體頂部,以便在32位架構上修復對它的原子訪問(內存對齊)。
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
stopFlag uint64
// 存儲引用
storage *storage.Storage
// vminsert和vmselect的網絡監聽器
vminsertLN net.Listener
vmselectLN net.Listener
vminsertWG sync.WaitGroup
vmselectWG sync.WaitGroup
// 用於跟蹤vminsert與vmselect的活躍連接
vminsertConnsMap ingestserver.ConnsMap
vmselectConnsMap ingestserver.ConnsMap
}
// NewServer 實例化 Server.
func NewServer(vminsertAddr, vmselectAddr string, storage *storage.Storage) (*Server, error) {
// 初始化網絡監聽器
vminsertLN, err := netutil.NewTCPListener("vminsert", vminsertAddr, nil)
if err != nil {
return nil, fmt.Errorf("unable to listen vminsertAddr %s: %w", vminsertAddr, err)
}
vmselectLN, err := netutil.NewTCPListener("vmselect", vmselectAddr, nil)
if err != nil {
return nil, fmt.Errorf("unable to listen vmselectAddr %s: %w", vmselectAddr, err)
}
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
return nil, fmt.Errorf("invalid -precisionBits: %w", err)
}
s := &Server{
storage: storage,
vminsertLN: vminsertLN,
vmselectLN: vmselectLN,
}
// 初始化活躍連接Map
s.vminsertConnsMap.Init()
s.vmselectConnsMap.Init()
return s, nil
}
// lib/ingestserver/conns_map.go
// ConnsMap 用於跟蹤活躍的連接
type ConnsMap struct {
mu sync.Mutex
m map[net.Conn]struct{}
isClosed bool
}
Server 裏面主要了包含 vminsert 和 vmselect 的監聽器,還有專門用來跟蹤活躍連接的 ConnsMap,其實就是一個 Map,Server 初始化後會通過一個 goroutine 執行 RunVMInsert:
// app/vmstorage/transport/server.go
// RunVMInsert 運行接受 vminsert 連接的服務器
func (s *Server) RunVMInsert() {
logger.Infof("accepting vminsert conns at %s", s.vminsertLN.Addr())
for {
// 等待並返回到監聽器的下一個連接
c, err := s.vminsertLN.Accept()
if err != nil {
if pe, ok := err.(net.Error); ok && pe.Temporary() {
continue
}
if s.isStopping() {
return
}
logger.Panicf("FATAL: cannot process vminsert conns at %s: %s", s.vminsertLN.Addr(), err)
}
logger.Infof("accepted vminsert conn from %s", c.RemoteAddr())
// 將該連接c添加到ConnsMap中
if !s.vminsertConnsMap.Add(c) {
// 關閉連接
_ = c.Close()
return
}
// vminsert連接數+1
vminsertConns.Inc()
s.vminsertWG.Add(1)
go func() {
defer func() {
// 處理完過後清理連接
s.vminsertConnsMap.Delete(c)
vminsertConns.Dec()
s.vminsertWG.Done()
}()
// 不需要響應壓縮
// vmstorage 只會發送小的 packets 給 vminsert
compressionLevel := 0
// VMInsertServer 爲 vminsert 執行服務器端握手的協議
// 得到的是一個帶 buffer 的 net.Conn(BufferedConn)
bc, err := handshake.VMInsertServer(c, compressionLevel)
if err != nil {
if s.isStopping() {
// c 在服務器內停止,必須關閉
return
}
logger.Errorf("cannot perform vminsert handshake with client %q: %s", c.RemoteAddr(), err)
_ = c.Close()
return
}
defer func() {
if !s.isStopping() {
logger.Infof("closing vminsert conn from %s", c.RemoteAddr())
}
_ = bc.Close()
}()
// 真正處理 vminsert 連接的邏輯
logger.Infof("processing vminsert conn from %s", c.RemoteAddr())
if err := s.processVMInsertConn(bc); err != nil {
if s.isStopping() {
return
}
vminsertConnErrors.Inc()
logger.Errorf("cannot process vminsert conn from %s: %s", c.RemoteAddr(), err)
}
}()
}
}
RunVMInsert 用來不斷接收監聽器的連接,獲取到連接 c 過後記得添加到 ConnsMap 中去,表示當前連接是活躍連接,然後要開另外一個 goroutine 去處理連接,在連接處理完成後要在 goroutine 退出之前要記得清理連接,從 ConnsMap 移出掉,真正處理連接的過程是先通過 handshake.VMInsertServer 創建一個帶有 buffer 的 net.Conn 連接,真正處理連接的邏輯是通過 processVMInsertConn 來完成的。
// app/vmstorage/transport/server.go
func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
return clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {
vminsertMetricsRead.Add(len(rows))
return s.storage.AddRows(rows, uint8(*precisionBits))
}, s.storage.IsReadOnly)
}
可以看到上面的函數是通過 clusternative.ParseStream 來進行處理的,該函數解析從 vminsert 發送到 bc 的數據,並對解析的行數據執行回調。我們可以先來看下這個函數的具體實現:
// lib/protoparser/clusternative/streamparser.go
// ParseStream 解析從 vminsert 發送到 bc 的數據,並對解析的行數據執行回調。
// 如果存儲無法接受新數據,則可選函數 isReadOnly 必須返回 true。在這種情況下,從 bc 讀取的數據不被接受,只讀狀態被髮回 bc。
//
// 對於來自 req 的流數據,可以多次併發調用回調。
//
// 回調在返回後不應阻塞。
func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.MetricRow) error, isReadOnly func() bool) error {
var wg sync.WaitGroup
var (
callbackErrLock sync.Mutex
callbackErr error
)
for {
// 不要使用 unmarshalWork pool,因爲每個 unmarshalWork 結構通常佔用大量內存(超過 consts.MaxInsertPacketSize 字節)。該 pool 將導致內存使用量增加。
uw := &unmarshalWork{}
// 設置回調 callback
uw.callback = func(rows []storage.MetricRow) {
// 執行回調
if err := callback(rows); err != nil {
processErrors.Inc()
callbackErrLock.Lock()
if callbackErr == nil {
callbackErr = fmt.Errorf("error when processing native block: %w", err)
}
callbackErrLock.Unlock()
}
}
uw.wg = &wg
var err error
// readBlock 從 vminsert 的 bc 連接中讀取下一個數據塊。
uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc, isReadOnly)
if err != nil {
wg.Wait()
if err == io.EOF {
// Remote end gracefully closed the connection.
return nil
}
return err
}
blocksRead.Inc()
wg.Add(1)
// 獲取數據後將數據傳遞到 unmarshalWorkCh 通道中,unmarshal workers 會在其他 goroutine 中進行處理
common.ScheduleUnmarshalWork(uw)
}
}
在上面的 ParseStream 函數中會通過 readBlock 函數不斷從 bc 連接中讀取數據塊,readBlock 中獲取到數據後會發送 ack 給到客戶端的 vminsert,表示傳遞的網絡數據已經正確獲取到。當獲取到數據後會傳遞到 unmarshalWorkCh 通道中,unmarshal workers 會在其他 goroutine 中去進行處理。
// lib/protoparser/common/unmarshal_work.go
// StartUnmarshalWorkers 啓動 unmarshal workers.
func StartUnmarshalWorkers() {
if unmarshalWorkCh != nil {
logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()")
}
gomaxprocs := cgroup.AvailableCPUs() //獲取 CUP 核數
unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs) // 初始化channel通道,長度與核數相等
unmarshalWorkersWG.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func() { // 啓動N個 goroutine,數量與 CPU 核數一樣
defer unmarshalWorkersWG.Done() // waitgroup 完成
for uw := range unmarshalWorkCh {
uw.Unmarshal() // 執行具體的業務邏輯
}
}()
}
}
而上面的 StartUnmarshalWorkers() 函數在 vmstorage 的 main 函數中就調用了,所以我們只需要做的就是往 unmarshalWorkCh 通道傳數據過去即可。
// app/vmstorage/main.go
func main() {
......
common.StartUnmarshalWorkers()
srv, err := transport.NewServer(*vminsertAddr, *vmselectAddr, strg)
......
}
真正執行具體的業務邏輯是 Unmarshal() 函數:
// lib/protoparser/clusternative/streamparser.go
// 真正處理 vminsert 傳過來的數據的業務邏輯
func (uw *unmarshalWork) Unmarshal() {
reqBuf := uw.reqBuf // vminsert 傳過來的數據
for len(reqBuf) > 0 {
// 限制傳遞給回調的行數,以減少處理大行數據包時的內存使用。
// 將 reqBuf 轉換成插入存儲中的指標數據列表 []MetricRow
mrs, tail, err := storage.UnmarshalMetricRows(uw.mrs[:0], reqBuf, maxRowsPerCallback)
uw.mrs = mrs
if err != nil {
parseErrors.Inc()
logger.Errorf("cannot unmarshal MetricRow from clusternative block with size %d (remaining %d bytes): %s", len(reqBuf), len(tail), err)
break
}
rowsRead.Add(len(mrs))
// 調用回調
uw.callback(mrs)
reqBuf = tail
}
wg := uw.wg
wg.Done()
}
const maxRowsPerCallback = 10000
上面的函數中先將從 vminsert 傳過來的數據通過 storage.UnmarshalMetricRows 函數轉換成可以直接存入到 vmstorage 存儲中的 MetricRow 列表,轉換完成後調用 callback 去進行處理,這樣就可以回到前面的 processVMInsertConn 函數中了,clusternative.ParseStream 的第二個參數就是回調函數。
// app/vmstorage/transport/server.go
func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
return clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {
vminsertMetricsRead.Add(len(rows))
return s.storage.AddRows(rows, uint8(*precisionBits))
}, s.storage.IsReadOnly)
}
最後就是通過 s.storage.AddRows 函數去處理添加轉換過後的 MetricRow 列表,這也是真正的將數據存入到本地存儲的入口函數了。
現在我們知道了服務的 vmstorage 如何去接收客戶端 vminsert 傳過來的數據了,那麼 vminsert 中是如何來發送網絡請求的呢?未完待續.....
k8s 技術圈 專注容器、專注 kubernetes 技術......
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Wx_EGiILp6fWFHr5EBLZIg