Prometheus 時序數據庫 - 數據的查詢

前言

在之前的博客裏,筆者詳細闡述了 Prometheus 數據的插入過程。但我們最常見的打交道的是數據的查詢。Prometheus 提供了強大的 Promql 來滿足我們千變萬化的查詢需求。在這篇文章裏面,筆者就以一個簡單的 Promql 爲例,講述下 Prometheus 查詢的過程。

Promql

一個 Promql 表達式可以計算爲下面四種類型:

瞬時向量(Instant Vector) - 一組同樣時間戳的時間序列(取自不同的時間序列,例如不同機器同一時間的CPU idle)
區間向量(Range vector) - 一組在一段時間範圍內的時間序列
標量(Scalar) - 一個浮點型的數據值
字符串(String) - 一個簡單的字符串

我們還可以在 Promql 中使用 svm/avg 等集合表達式,不過只能用在瞬時向量 (Instant Vector) 上面。爲了闡述 Prometheus 的聚合計算以及篇幅原因,筆者在本篇文章只詳細分析瞬時向量 (Instant Vector) 的執行過程。

瞬時向量 (Instant Vector)

前面說到,瞬時向量是一組擁有同樣時間戳的時間序列。但是實際過程中,我們對不同 Endpoint 採樣的時間是不可能精確一致的。所以,Prometheus 採取了距離指定時間戳之前最近的數據 (Sample)。如下圖所示:

當然,如果是距離當前時間戳 1 個小時的數據直觀看來肯定不能納入到我們的返回結果裏面。
所以 Prometheus 通過一個指定的時間窗口來過濾數據 (通過啓動參數—query.lookback-delta 指定,默認 5min)。

對一條簡單的 Promql 進行分析

好了,解釋完 Instant Vector 概念之後,我們可以着手進行分析了。直接上一條帶有聚合函數的 Promql 吧。

SUM BY (group) (http_requests{job="api-server",group="production"})

首先, 對於這種有語法結構的語句肯定是將其 Parse 一把,構造成 AST 樹了。調用

promql.ParseExpr

由於 Promql 較爲簡單,所以 Prometheus 直接採用了 LL 語法分析。在這裏直接給出上述 Promql 的 AST 樹結構。

Prometheus 對於語法樹的遍歷過程都是通過 vistor 模式, 具體到代碼爲:

ast.go vistor設計模式
func Walk(v Visitor, node Node, path []Node) error {
    var err error
    if v, err = v.Visit(node, path); v == nil || err != nil {
        return err
    }
    path = append(path, node)

    for _, e := range Children(node) {
        if err := Walk(v, e, path); err != nil {
            return err
        }
    }

    _, err = v.Visit(nil, nil)
    return err
}
func (f inspector) Visit(node Node, path []Node) (Visitor, error) {
    if err := f(node, path); err != nil {
        return nil, err
    }

    return f, nil
}

通過 golang 裏非常方便的函數式功能,直接傳遞求值函數 inspector 進行不同情況下的求值。

type inspector func(Node, []Node) error

求值過程

具體的求值過程核心函數爲:

func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, storage.Warnings, error) {
    ......
    querier, warnings, err := ng.populateSeries(ctxPrepare, query.queryable, s)     // 這邊拿到對應序列的數據
    ......
    val, err := evaluator.Eval(s.Expr) // here 聚合計算
    ......

}

populateSeries

首先通過 populateSeries 的計算出 VectorSelector Node 所對應的 series(時間序列)。這裏直接給出求值函數

 func(node Node, path []Node) error {
     ......
     querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
     ......
     case *VectorSelector:
         .......
         set, wrn, err = querier.Select(params, n.LabelMatchers...)
         ......
         n.unexpandedSeriesSet = set
     ......
     case *MatrixSelector:
         ......
 }
 return nil

可以看到這個求值函數,只對 VectorSelector/MatrixSelector 進行操作,針對我們的 Promql 也就是隻對葉子節點 VectorSelector 有效。

select

獲取對應數據的核心函數就在 querier.Select。我們先來看下 qurier 是如何得到的.

querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))

根據時間戳範圍去生成 querier, 裏面最重要的就是計算出哪些 block 在這個時間範圍內,並將他們附着到 querier 裏面。具體見函數

func (db *DB) Querier(mint, maxt int64) (Querier, error) {
    for _, b := range db.blocks {
        ......
        // 遍歷blocks挑選block
    }
    // 如果maxt>head.mint(即內存中的block),那麼也加入到裏面querier裏面。
    if maxt >= db.head.MinTime() {
        blocks = append(blocks, &rangeHead{
            head: db.head,
            mint: mint,
            maxt: maxt,
        })
    }
    ......
}


知道數據在哪些 block 裏面,我們就可以着手進行計算 VectorSelector 的數據了。

 // labelMatchers {job:api-server} {__name__:http_requests} {group:production}
 querier.Select(params, n.LabelMatchers...)

有了 matchers 我們很容易的就能夠通過倒排索引取到對應的 series。爲了篇幅起見,我們假設數據都在 headBlock(也就是內存裏面)。那麼我們對於倒排的計算就如下圖所示:

這樣,我們的 VectorSelector 節點就已經有了最終的數據存儲地址信息了,例如圖中的 memSeries refId=3 和 4。

如果想了解在磁盤中的數據尋址,可以詳見筆者之前的博客

<<Prometheus時序數據庫-磁盤中的存儲結構>>

evaluator.Eval

通過 populateSeries 找到對應的數據,那麼我們就可以通過 evaluator.Eval 獲取最終的結果了。計算採用後序遍歷,等下層節點返回數據後纔開始上層節點的計算。那麼很自然的,我們先計算 VectorSelector。

func (ev *evaluator) eval(expr Expr) Value {
    ......
    case *VectorSelector:
    // 通過refId拿到對應的Series
    checkForSeriesSetExpansion(ev.ctx, e)
    // 遍歷所有的series
    for i, s := range e.series {
        // 由於我們這邊考慮的是instant query,所以只循環一次
        for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
            // 獲取距離ts最近且小於ts的最近的sample
            _, v, ok := ev.vectorSelectorSingle(it, e, ts)
            if ok {
                    if ev.currentSamples < ev.maxSamples {
                        // 注意,這邊的v對應的原始t被替換成了ts,也就是instant query timeStamp
                        ss.Points = append(ss.Points, Point{V: v, T: ts})
                        ev.currentSamples++
                    } else {
                        ev.error(ErrTooManySamples(env))
                    }
                }
            ......
        }
    }
}

如代碼註釋中看到,當我們找到一個距離 ts 最近切小於 ts 的 sample 時候,只用這個 sample 的 value, 其時間戳則用 ts(Instant Query 指定的時間戳) 代替。

其中 vectorSelectorSingle 值得我們觀察一下:

func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, node *VectorSelector, ts int64) (int64, float64, bool){
    ......
    // 這一步是獲取>=refTime的數據,也就是我們instant query傳入的
    ok := it.Seek(refTime)
    ......
        if !ok || t > refTime { 
        // 由於我們需要的是<=refTime的數據,所以這邊回退一格,由於同一memSeries同一時間的數據只有一條,所以回退的數據肯定是<=refTime的
        t, v, ok = it.PeekBack(1)
        if !ok || t < refTime-durationMilliseconds(LookbackDelta) {
            return 0, 0, false
        }
    }
}

就這樣,我們找到了 series 3 和 4 距離 Instant Query 時間最近且小於這個時間的兩條記錄,並保留了記錄的標籤。這樣,我們就可以在上層進行聚合。

SUM by 聚合

葉子節點 VectorSelector 得到了對應的數據後,我們就可以對上層節點 AggregateExpr 進行聚合計算了。代碼棧爲:

evaluator.rangeEval
    |->evaluate.eval.func2
        |->evelator.aggregation grouping key爲group

具體的函數如下圖所示:

func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, param interface{}, vec Vector, enh *EvalNodeHelper) Vector {
    ......
    // 對所有的sample
    for _, s := range vec {
        metric := s.Metric
        ......
        group, ok := result[groupingKey] 
        // 如果此group不存在,則新加一個group
        if !ok {
            ......
            result[groupingKey] = &groupedAggregation{
                labels:     m, // 在這裏我們的m=[group:production]
                value:      s.V,
                mean:       s.V,
                groupCount: 1,
            }
            ......
        }
        switch op {
        // 這邊就是對SUM的最終處理
        case SUM:
            group.value += s.V
        .....
        }
    }
    .....
    for _, aggr := range result {
        enh.out = append(enh.out, Sample{
        Metric: aggr.labels,
        Point:  Point{V: aggr.value},
        })
    }
    ......
    return enh.out
}

好了,有了上面的處理,我們聚合的結果就變爲:

這個和我們的預期結果一致, 一次查詢的過程就到此結束了。

總結

Promql 是非常強大的,可以滿足我們的各種需求。其運行原理自然也激起了筆者的好奇心,本篇文章雖然只分析了一條簡單的 Promql, 但萬變不離其宗, 任何 Promql 都是類似的運行邏輯。希望本文對讀者能有所幫助。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/kKo6XpfEytZIVLITH3gk0Q