Prometheus 時序數據庫 - 數據的查詢
前言
在之前的博客裏,筆者詳細闡述了 Prometheus 數據的插入過程。但我們最常見的打交道的是數據的查詢。Prometheus 提供了強大的 Promql 來滿足我們千變萬化的查詢需求。在這篇文章裏面,筆者就以一個簡單的 Promql 爲例,講述下 Prometheus 查詢的過程。
Promql
一個 Promql 表達式可以計算爲下面四種類型:
瞬時向量(Instant Vector) - 一組同樣時間戳的時間序列(取自不同的時間序列,例如不同機器同一時間的CPU idle)
區間向量(Range vector) - 一組在一段時間範圍內的時間序列
標量(Scalar) - 一個浮點型的數據值
字符串(String) - 一個簡單的字符串
我們還可以在 Promql 中使用 svm/avg 等集合表達式,不過只能用在瞬時向量 (Instant Vector) 上面。爲了闡述 Prometheus 的聚合計算以及篇幅原因,筆者在本篇文章只詳細分析瞬時向量 (Instant Vector) 的執行過程。
瞬時向量 (Instant Vector)
前面說到,瞬時向量是一組擁有同樣時間戳的時間序列。但是實際過程中,我們對不同 Endpoint 採樣的時間是不可能精確一致的。所以,Prometheus 採取了距離指定時間戳之前最近的數據 (Sample)。如下圖所示:
當然,如果是距離當前時間戳 1 個小時的數據直觀看來肯定不能納入到我們的返回結果裏面。
所以 Prometheus 通過一個指定的時間窗口來過濾數據 (通過啓動參數—query.lookback-delta 指定,默認 5min)。
對一條簡單的 Promql 進行分析
好了,解釋完 Instant Vector 概念之後,我們可以着手進行分析了。直接上一條帶有聚合函數的 Promql 吧。
SUM BY (group) (http_requests{job="api-server",group="production"})
首先, 對於這種有語法結構的語句肯定是將其 Parse 一把,構造成 AST 樹了。調用
promql.ParseExpr
由於 Promql 較爲簡單,所以 Prometheus 直接採用了 LL 語法分析。在這裏直接給出上述 Promql 的 AST 樹結構。
Prometheus 對於語法樹的遍歷過程都是通過 vistor 模式, 具體到代碼爲:
ast.go vistor設計模式
func Walk(v Visitor, node Node, path []Node) error {
var err error
if v, err = v.Visit(node, path); v == nil || err != nil {
return err
}
path = append(path, node)
for _, e := range Children(node) {
if err := Walk(v, e, path); err != nil {
return err
}
}
_, err = v.Visit(nil, nil)
return err
}
func (f inspector) Visit(node Node, path []Node) (Visitor, error) {
if err := f(node, path); err != nil {
return nil, err
}
return f, nil
}
通過 golang 裏非常方便的函數式功能,直接傳遞求值函數 inspector 進行不同情況下的求值。
type inspector func(Node, []Node) error
求值過程
具體的求值過程核心函數爲:
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, storage.Warnings, error) {
......
querier, warnings, err := ng.populateSeries(ctxPrepare, query.queryable, s) // 這邊拿到對應序列的數據
......
val, err := evaluator.Eval(s.Expr) // here 聚合計算
......
}
populateSeries
首先通過 populateSeries 的計算出 VectorSelector Node 所對應的 series(時間序列)。這裏直接給出求值函數
func(node Node, path []Node) error {
......
querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
......
case *VectorSelector:
.......
set, wrn, err = querier.Select(params, n.LabelMatchers...)
......
n.unexpandedSeriesSet = set
......
case *MatrixSelector:
......
}
return nil
可以看到這個求值函數,只對 VectorSelector/MatrixSelector 進行操作,針對我們的 Promql 也就是隻對葉子節點 VectorSelector 有效。
select
獲取對應數據的核心函數就在 querier.Select。我們先來看下 qurier 是如何得到的.
querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
根據時間戳範圍去生成 querier, 裏面最重要的就是計算出哪些 block 在這個時間範圍內,並將他們附着到 querier 裏面。具體見函數
func (db *DB) Querier(mint, maxt int64) (Querier, error) {
for _, b := range db.blocks {
......
// 遍歷blocks挑選block
}
// 如果maxt>head.mint(即內存中的block),那麼也加入到裏面querier裏面。
if maxt >= db.head.MinTime() {
blocks = append(blocks, &rangeHead{
head: db.head,
mint: mint,
maxt: maxt,
})
}
......
}
知道數據在哪些 block 裏面,我們就可以着手進行計算 VectorSelector 的數據了。
// labelMatchers {job:api-server} {__name__:http_requests} {group:production}
querier.Select(params, n.LabelMatchers...)
有了 matchers 我們很容易的就能夠通過倒排索引取到對應的 series。爲了篇幅起見,我們假設數據都在 headBlock(也就是內存裏面)。那麼我們對於倒排的計算就如下圖所示:
這樣,我們的 VectorSelector 節點就已經有了最終的數據存儲地址信息了,例如圖中的 memSeries refId=3 和 4。
如果想了解在磁盤中的數據尋址,可以詳見筆者之前的博客
<<Prometheus時序數據庫-磁盤中的存儲結構>>
evaluator.Eval
通過 populateSeries 找到對應的數據,那麼我們就可以通過 evaluator.Eval 獲取最終的結果了。計算採用後序遍歷,等下層節點返回數據後纔開始上層節點的計算。那麼很自然的,我們先計算 VectorSelector。
func (ev *evaluator) eval(expr Expr) Value {
......
case *VectorSelector:
// 通過refId拿到對應的Series
checkForSeriesSetExpansion(ev.ctx, e)
// 遍歷所有的series
for i, s := range e.series {
// 由於我們這邊考慮的是instant query,所以只循環一次
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
// 獲取距離ts最近且小於ts的最近的sample
_, v, ok := ev.vectorSelectorSingle(it, e, ts)
if ok {
if ev.currentSamples < ev.maxSamples {
// 注意,這邊的v對應的原始t被替換成了ts,也就是instant query timeStamp
ss.Points = append(ss.Points, Point{V: v, T: ts})
ev.currentSamples++
} else {
ev.error(ErrTooManySamples(env))
}
}
......
}
}
}
如代碼註釋中看到,當我們找到一個距離 ts 最近切小於 ts 的 sample 時候,只用這個 sample 的 value, 其時間戳則用 ts(Instant Query 指定的時間戳) 代替。
其中 vectorSelectorSingle 值得我們觀察一下:
func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, node *VectorSelector, ts int64) (int64, float64, bool){
......
// 這一步是獲取>=refTime的數據,也就是我們instant query傳入的
ok := it.Seek(refTime)
......
if !ok || t > refTime {
// 由於我們需要的是<=refTime的數據,所以這邊回退一格,由於同一memSeries同一時間的數據只有一條,所以回退的數據肯定是<=refTime的
t, v, ok = it.PeekBack(1)
if !ok || t < refTime-durationMilliseconds(LookbackDelta) {
return 0, 0, false
}
}
}
就這樣,我們找到了 series 3 和 4 距離 Instant Query 時間最近且小於這個時間的兩條記錄,並保留了記錄的標籤。這樣,我們就可以在上層進行聚合。
SUM by 聚合
葉子節點 VectorSelector 得到了對應的數據後,我們就可以對上層節點 AggregateExpr 進行聚合計算了。代碼棧爲:
evaluator.rangeEval
|->evaluate.eval.func2
|->evelator.aggregation grouping key爲group
具體的函數如下圖所示:
func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, param interface{}, vec Vector, enh *EvalNodeHelper) Vector {
......
// 對所有的sample
for _, s := range vec {
metric := s.Metric
......
group, ok := result[groupingKey]
// 如果此group不存在,則新加一個group
if !ok {
......
result[groupingKey] = &groupedAggregation{
labels: m, // 在這裏我們的m=[group:production]
value: s.V,
mean: s.V,
groupCount: 1,
}
......
}
switch op {
// 這邊就是對SUM的最終處理
case SUM:
group.value += s.V
.....
}
}
.....
for _, aggr := range result {
enh.out = append(enh.out, Sample{
Metric: aggr.labels,
Point: Point{V: aggr.value},
})
}
......
return enh.out
}
好了,有了上面的處理,我們聚合的結果就變爲:
這個和我們的預期結果一致, 一次查詢的過程就到此結束了。
總結
Promql 是非常強大的,可以滿足我們的各種需求。其運行原理自然也激起了筆者的好奇心,本篇文章雖然只分析了一條簡單的 Promql, 但萬變不離其宗, 任何 Promql 都是類似的運行邏輯。希望本文對讀者能有所幫助。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/kKo6XpfEytZIVLITH3gk0Q