深入 TiDB:執行計劃執行過程詳解

本文基於 TiDB release-5.1 進行分析,需要用到 Go 1.16 以後的版本

上一篇講解了 TiDB 的執行優化相關的內容,這篇我們繼續往下看,在獲取到執行優化結果之後如何執行整個計劃。

我們這裏還是使用一個簡單的例子:

CREATE TABLE student
(
    id   VARCHAR(31),
    name VARCHAR(50),
    age  int,
    key id_idx (id)
);
INSERT INTO student VALUES ('pingcap001''pingcap', 13);

select name from student where age>10;

我們直接看到  session/session.go 下的 ExecuteStmt() 方法 :

func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlexec.RecordSet, error) {
 ...
 compiler := executor.Compiler{Ctx: s}
 // 制定查詢計劃以及優化
 stmt, err := compiler.Compile(ctx, stmtNode)
 ...

 // Execute the physical plan.
 logStmt(stmt, s)
 recordSet, err := runStmt(ctx, s, stmt)
 ...
 return recordSet, nil
}

在上一篇講解了 compiler.Compile 制定會調用到 Optimize 制定邏輯計劃和物理計劃相關的代碼,下面主要是講解 runStmt 這部分,它主要作用是根據制定好的執行計劃去 TiKV 中獲取相關的數據。

func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.RecordSet, err error) {
 ...
 // 校驗用戶使用 rollback、commit 這種顯示關閉事務的 SQL 中斷執行
 err = se.checkTxnAborted(s)
 if err != nil {
  return nil, err
 }
 //執行 SQL,並返回 rs  結果集
 rs, err = s.Exec(ctx)
 se.updateTelemetryMetric(s.(*executor.ExecStmt))
 sessVars.TxnCtx.StatementCount++
 if rs != nil {
  return &execStmtResult{
   RecordSet: rs,
   sql:       s,
   se:        se,
  }, err
 }
 //在執行完語句後,檢查是否該提交了
 err = finishStmt(ctx, se, err, s)
 if se.hasQuerySpecial() { 
  se.SetValue(ExecStmtVarKey, s.(*executor.ExecStmt))
 } else { 
  s.(*executor.ExecStmt).FinishExecuteStmt(origTxnCtx.StartTS, err, false)
 }
 return nil, err
}

runStmt 這段代碼中,我們直接進入到 Exec 繼續跟蹤執行相關代碼。

func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
 ...
 // 生成執行器
 e, err := a.buildExecutor()
 if err != nil {
  return nil, err
 } 
 ctx = a.setPlanLabelForTopSQL(ctx)
 // 根據不同執行者進行不同的處理
 if err = e.Open(ctx); err != nil {
  terror.Call(e.Close)
  return nil, err
 } 
 ... 
 return &recordSet{
  executor:   e,
  stmt:       a,
  txnStartTS: txnStartTS,
 }, nil
}

構建 Executor

我們在構建執行計劃的時候,會根據 SQL 語句生成各種各樣的算子,所以這裏會根據算子構建不同的 Executor ,然後再執行 Open 進行數據處理。

我們先看看生成執行器 buildExecutor :

func (a *ExecStmt) buildExecutor() (Executor, error) {
 ctx := a.Ctx 
 ...
 // 新建一個構造者
 b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.ExplicitStaleness, a.TxnScope)
 text := a.Text
 if strings.Contains(text, "student") {
  fmt.Println(text)
 }
 //根據執行計劃構建 Executor
 e := b.build(a.Plan)
 if b.err != nil {
  return nil, errors.Trace(b.err)
 }
 ...
 return e, nil
}

這裏構建好的 ExecutorBuilder 會根據執行計劃構建 Executor。對於我們上面的查詢例子:

select name from student where age>10;

對於這個查詢條件來說生成的物理執行計劃大概是這樣:

TableReader(Table(student)->Sel([gt(test.student.age, 1)])->Limit)->Limit

最外層是一個 PhysicalLimit,內部是 PhysicalTableReader。所以在執行 executorBuilder 的 build 方法的時候會根據類型進行判斷進入到 buildLimit 中:

func (b *executorBuilder) build(p plannercore.Plan) Executor {
 switch v := p.(type) {
 case nil:
  return nil
 // 根據執行計劃類型進入不同的build方法中
 case *plannercore.PhysicalTableReader:
  return b.buildTableReader(v)
 case *plannercore.PhysicalLimit:
  return b.buildLimit(v)
 ...
 default:
  if mp, ok := p.(MockPhysicalPlan); ok {
   return mp.GetExecutor()
  }

  b.err = ErrUnknownPlan.GenWithStack("Unknown Plan %T", p)
  return nil
 }
}

這裏的執行計劃的類型有好幾十種,我這裏先看看 buildLimit,其他方法感興趣的可以自己去看看。

func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
 // 獲取子計劃的Executor
 childExec := b.build(v.Children()[0])
 if b.err != nil {
  return nil
 }
 n := int(mathutil.MinUint64(v.Count, uint64(b.ctx.GetSessionVars().MaxChunkSize)))
 base := newBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec)
 base.initCap = n
 // 構建 limit executor
 e := &LimitExec{
  baseExecutor: base,
  begin:        v.Offset,
  end:          v.Offset + v.Count,
 }
 ...
 return e
}

buildLimit 會獲取子計劃的 Executor,然後構建 limit executor。這裏子計劃就是 PhysicalTableReader,所以再次進入到 build 方法中會調用 buildTableReader 進行構建:

func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) Executor {
 ... 
 // 先構建一個無範圍的 TableReaderExecutor
 ret, err := buildNoRangeTableReader(b, v)
 if err != nil {
  b.err = err
  return nil
 }
 // 通過遞歸執行計劃來更新TableReaderExecutor範圍
 ts := v.GetTableScan()
 ret.ranges = ts.Ranges
 sctx := b.ctx.GetSessionVars().StmtCtx
 sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
 // 如果不使用動態分區進行修建則直接返回
 if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() {
  return ret
 }
 ... 
 return ret
}

這裏先是調用 buildNoRangeTableReader 函數構建一個無範圍的 TableReaderExecutor,然後調用 GetTableScan 遞歸執行計劃獲取 table plan 的 PhysicalTableScan,然後從中獲取 Ranges 填充 Executor 的範圍。

發送請求給 TiKV

這裏獲取到 Executor 之後繼續回到 ExecStmt 的 Exec 中 執行 Executor 的 Open 方法:

func (e *LimitExec) Open(ctx context.Context) error {
    // 遍歷子 Executor 執行其 Open 方法
 if err := e.baseExecutor.Open(ctx); err != nil {
  return err
 }
 e.childResult = newFirstChunk(e.children[0])
 e.cursor = 0
 e.meetFirstBatch = e.begin == 0
 return nil
}

需要注意的是,我們上面的查詢中,先是構建的 LimitExec ,它裏面封裝的纔是 TableReaderExecutor ,所以它繼續會調用到 TableReaderExecutor 的 Open 方法中。

func (e *TableReaderExecutor) Open(ctx context.Context) error {
 ...  
 firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges, e.keepOrder, e.desc, e.table.Meta() != nil && e.table.Meta().IsCommonHandle)
 ...
 // 將 firstPartRanges 進行執行,請求TiKV並獲取返回的結果
 firstResult, err := e.buildResp(ctx, firstPartRanges)
 if err != nil {
  e.feedback.Invalidate()
  return err
 }
 // 當 secondPartRanges 沒有時,直接將第一部分結果進行整合
 if len(secondPartRanges) == 0 {
  e.resultHandler.open(nil, firstResult)
  return nil
 }
 // 當 secondPartRanges 存在值時,請求TiKV並獲取返回的結果
 var secondResult distsql.SelectResult
 //發送請求
 secondResult, err = e.buildResp(ctx, secondPartRanges)
 if err != nil {
  e.feedback.Invalidate()
  return err
 }
 // 將兩部分的結果進行整合
 e.resultHandler.open(firstResult, secondResult)
 return nil
}

SplitRangesAcrossInt64Boundary 其實就是將 ranges 列表進行拆分,通過看註釋:

// SplitRangesAcrossInt64Boundary split the ranges into two groups:
// 1. signedRanges is less or equal than MaxInt64
// 2. unsignedRanges is greater than MaxInt64
//
// We do this because every key of tikv is encoded as an int64. As a result, MaxUInt64 is small than zero when
// interpreted as an int64 variable.
//
// This function does the following:
// 1. split ranges into two groups as described above.
// 2. if there's a range that straddles the int64 boundary, split it into two ranges, which results in one smaller and
//    one greater than MaxInt64.

我們可以知道,因爲 tikv 的每個 key 都是 int64,所以像 UInt64 這個無符號類型的最大值其實是大於 Int64 的,所以需要進行拆分。拆分的結果分爲兩部分,signedRanges 表示的是小於等於 MaxInt64 的集合,unsignedRanges 表示的是大於 MaxInt64 集合。

接下來會調用 buildResp 構建 kv.Request,然後調用 SelectResult 向 kv client 發送請求返回 SelectResult 結構體:

func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
 ...
 // build Request
 kvReq, err := e.buildKVReq(ctx, ranges)
 if err != nil {
  return nil, err
 }
 e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
 // sends a DAG request, returns SelectResult
 result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
 if err != nil {
  return nil, err
 }
 return result, nil
}

返回的 SelectResult 可以認爲它是一個迭代器,因爲下層是有很多 TiKV ,然後每個結果是一個 PartialResult,所以也可以說它是 PartialResult 的迭代器。

type SelectResult interface {
 // NextRaw gets the next raw result.
 NextRaw(context.Context) ([]byte, error)
 // Next reads the data into chunk.
 Next(context.Context, *chunk.Chunk) error
 // Close closes the iterator.
 Close() error
}

SelectResult 這個接口,代表了一次查詢的所有結果的抽象,計算是以 Region 爲單位進行,所以這裏全部結果會包含所有涉及到的 Region 的結果。通過 SelectResult 的 next 方法可以拿到下一個 PartialResult 。

在 buildResp 方法中調用 SelectResult 方法裏面最後會調用到 DistSQL 包提供的 Select API:

func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (SelectResult, error) {
 ...
 resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction)
 if resp == nil {
  err := errors.New("client returns nil response")
  return nil, err
 } 
 ...
 return &selectResult{
  label:      "dag",
  resp:       resp,
  rowLen:     len(fieldTypes),
  fieldTypes: fieldTypes,
  ctx:        sctx,
  feedback:   fb,
  sqlType:    label,
  memTracker: kvReq.MemTracker,
  encodeType: encodetype,
  storeType:  kvReq.StoreType,
 }, nil
}

它提供了向 TiKV Client 發送請求並構建 selectResult 能力。

用一張官方的圖來說明一下整個查詢過程:

獲取 TiKV 數據

我們繼續順着 Select 方法裏面 Send 方法往下看。

func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) kv.Response {
 ... 
 ranges := NewKeyRanges(req.KeyRanges)
 // 根據ranges構建task
 tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req)
 if err != nil {
  return copErrorResponse{err}
 }
 // 構建 copIterator
 it := &copIterator{
  store:           c.store,
  req:             req,
  concurrency:     req.Concurrency,
  finishCh:        make(chan struct{}),
  vars:            vars,
  memTracker:      req.MemTracker,
  replicaReadSeed: c.replicaReadSeed,
  rpcCancel:       tikv.NewRPCanceller(),
  resolvedLocks:   util.NewTSSet(5),
 }
 it.tasks = tasks
 // 設置並行度
 if it.concurrency > len(tasks) {
  it.concurrency = len(tasks)
 }
 if it.concurrency < 1 {
  it.concurrency = 1
 }
 // 設置限流器和傳輸數據的 channel
 if it.req.KeepOrder {
  it.sendRate = util.NewRateLimit(2 * it.concurrency)
        // 如果要求有序,那麼就不用全局的 chanel 
  it.respChan = nil
 } else {
  capacity := it.concurrency
  if enabledRateLimitAction { 
   capacity = it.concurrency * 2
  }
        // 如果無序,那麼會將response數據放入到全局的 channel 中
  it.respChan = make(chan *copResponse, capacity)
  it.sendRate = util.NewRateLimit(it.concurrency)
 }
 it.actionOnExceed = newRateLimitAction(uint(it.sendRate.GetCapacity()))
 if sessionMemTracker != nil {
  sessionMemTracker.FallbackOldAndSetNewAction(it.actionOnExceed)
 }

 if !it.req.Streaming {
  ctx = context.WithValue(ctx, tikv.RPCCancellerCtxKey{}, it.rpcCancel)
 }
 // 啓動多個 goroutine 獲取 response
 it.open(ctx, enabledRateLimitAction)
 return it
}

首先是調用 buildCopTasks 構建 coprocessor task。在調用 buildCopTasks 的時候會傳入 RegionCache,因爲我們的數據可能會分佈在多個 region 中,所以我們可以根據它找到有哪些 region 包含了一個 key range 範圍內的數據。然後按照 region 的 range 把 key range list 進行切分構建好 coprocessor task 返回。

獲取到 task 列表之後會創建 copIterator, 是 kv.Response接口的實現,需要實現對應 Next 方法,在上層調用 Next  的時候,返回一個 coprocessor response ,上層通過多次調用 Next 方法,獲取多個 coprocessor response,直到所有結果獲取完。

type Response interface {
 // Next returns a resultSubset from a single storage unit.
 // When full result set is returned, nil is returned.
 Next(ctx context.Context) (resultSubset ResultSubset, err error)
 // Close response.
 Close() error
}

爲了增大並行度,在調用 open 的時候構造多個 goroutine 充當 worker 來執行 task,多個 worker 從這一個 channel 讀取 task,執行完成後,把結果發到 response channel,通過設置 worker 的數量控制併發度 。

需要注意的是在調用 open 執行 task 之前會校驗 task 是不是有序的,如果是有序的,那麼 worker 執行完 task 之後就不能直接放入到  response channel 中了,因爲併發結果是無序的。所以通過給每一個 task 創建一個 channel,把 response 發送到這個 task 自己的 response channel 裏,Next 的時候,就可以按照 task 的順序獲取 response,保證結果的有序。

下面我們來看看實現細節。先來看看 buildCopTasks:

func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
 ...
 rangesLen := ranges.Len()
 //找到有哪些 region 包含了一個 key range 範圍內的數據
 locs, err := cache.SplitKeyRangesByLocations(bo, ranges)
 if err != nil {
  return nil, errors.Trace(err)
 }

 var tasks []*copTask
 //根據返回的 LocationKeyRanges 來構建 task 
 for _, loc := range locs { 
  // 這裏是因爲一個 region 裏面可能也包含多個 Range
  rLen := loc.Ranges.Len()
  for i := 0; i < rLen; {
   nextI := mathutil.Min(i+rangesPerTask, rLen)
   tasks = append(tasks, &copTask{
    region: loc.Location.Region,
    ranges: loc.Ranges.Slice(i, nextI), 
    respChan:  make(chan *copResponse, 2),
    cmdType:   cmdType,
    storeType: req.StoreType,
   })
   i = nextI
  }
 }
 ...
 return tasks, nil
}

這裏我們看到 buildCopTasks 裏面會根據傳入的 RegionCache 來對 ranges 進行拆分,返回的  LocationKeyRanges 對象裏面包含了 KeyRanges ,因爲一個 region 裏面可能也包含多個 Range,所以這裏用了兩層 for 循環進行遍歷,創建好 task 之後返回。

我們再回到 Send 方法中,繼續往下看 open 方法:

func (it *copIterator) open(ctx context.Context, enabledRateLimitAction bool) {
 taskCh := make(chan *copTask, 1)
 it.wg.Add(it.concurrency) 
 // 根據併發數創建 worker
 for i := 0; i < it.concurrency; i++ {
  worker := &copIteratorWorker{
   taskCh:          taskCh,
   wg:              &it.wg,
   store:           it.store,
   req:             it.req,
   respChan:        it.respChan,
   finishCh:        it.finishCh,
   vars:            it.vars,
   kvclient:        tikv.NewClientHelper(it.store.store, it.resolvedLocks),
   memTracker:      it.memTracker,
   replicaReadSeed: it.replicaReadSeed,
   actionOnExceed:  it.actionOnExceed,
  }
  go worker.run(ctx)
 }
 taskSender := &copIteratorTaskSender{
  taskCh:   taskCh,
  wg:       &it.wg,
  tasks:    it.tasks,
  finishCh: it.finishCh,
  sendRate: it.sendRate,
 }
 taskSender.respChan = it.respChan
 it.actionOnExceed.setEnabled(enabledRateLimitAction)
 failpoint.Inject("ticase-4171", func(val failpoint.Value) {
  if val.(bool) {
   it.memTracker.Consume(10 * MockResponseSizeForTest)
   it.memTracker.Consume(10 * MockResponseSizeForTest)
  }
 })
 // 創建 sender
 go taskSender.run()
}

這裏我們看到了分別會創建兩類 goroutine,一種是 worker 一種是 sender。

我們先來看看 sender:

func (sender *copIteratorTaskSender) run() { 
 for _, t := range sender.tasks { 
  // 使用限流器控制頻率
  exit := sender.sendRate.GetToken(sender.finishCh)
  if exit {
   break
  }
  // 發送task到taskCh中
  exit = sender.sendToTaskCh(t)
  if exit {
   break
  }
 }
 //發送完畢之後關閉 channel
 close(sender.taskCh)

 // Wait for worker goroutines to exit.
 sender.wg.Wait()
 if sender.respChan != nil {
  close(sender.respChan)
 }
}

sender 會將所有的 task 放入到 taskCh 中,發送完畢之後關閉 channel。下面再來看看 worker:

func (worker *copIteratorWorker) run(ctx context.Context) {
 defer func() {
  failpoint.Inject("ticase-4169", func(val failpoint.Value) {
   if val.(bool) {
    worker.memTracker.Consume(10 * MockResponseSizeForTest)
    worker.memTracker.Consume(10 * MockResponseSizeForTest)
   }
  })
  worker.wg.Done()
 }()
 for task := range worker.taskCh {
  respCh := worker.respChan
  // 這裏是需要排序的時候爲空,那麼爲每個 task 都創建一個 respChan
  if respCh == nil {
   respCh = task.respChan
  }
  // 發送rpc請求
  worker.handleTask(ctx, task, respCh)
  if worker.respChan != nil { 
   // 發送 finCopResp 到 respCh 中,告訴copIterator有一個task已經運行完畢了
   worker.sendToRespCh(finCopResp, worker.respChan, false)
  }
  close(task.respChan)
  if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 {
   return
  }
  select {
  case <-worker.finishCh:
   return
  default:
  }
 }
}

worker 主要是處理 sender 發送過來的 taskCh 數據,通過遍歷 taskCh 獲取 task 之後調用 handleTask 發送 rpc 請求,返回的數據會放入到 respCh 中。需要注意這裏如果是有序的 task ,那麼 worker.respChan 爲空,然後會爲每個 task 創建一個 respChan,在獲取數據的時候會根據每個 task 的 respChan 數據來做排序。

下面我們再來看看怎麼獲取數據:

上面我們也提到了,copIterator 其實就是一個迭代器,獲取數據是通過調用 copIterator 的 Next 方法獲取:

func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
 var (
  resp   *copResponse
  ok     bool
  closed bool
 )
 ...
 // 如果數據不需要排序,那麼直接從 respChan 中獲取數據
 if it.respChan != nil {
  // Get next fetched resp from chan
  resp, ok, closed = it.recvFromRespCh(ctx, it.respChan)
  if !ok || closed {
   it.actionOnExceed.close()
   return nil, nil
  }
  // 表示讀到 respChan 最後一個數據
  if resp == finCopResp {
   it.actionOnExceed.destroyTokenIfNeeded(func() {
    it.sendRate.PutToken()
   })
   return it.Next(ctx)
  }
 } else {
  for {
   if it.curr >= len(it.tasks) {
    // Resp will be nil if iterator is finishCh.
    it.actionOnExceed.close()
    return nil, nil
   }
   // 如果數據是有序的,那麼從 task 的 respChan 獲取數據
   task := it.tasks[it.curr]
   resp, ok, closed = it.recvFromRespCh(ctx, task.respChan)
   if closed { 
    return nil, nil
   }
   if ok {
    break
   }
   it.actionOnExceed.destroyTokenIfNeeded(func() {
    it.sendRate.PutToken()
   }) 
   it.tasks[it.curr] = nil
   it.curr++
  }
 }

 if resp.err != nil {
  return nil, errors.Trace(resp.err)
 }

 err := it.store.CheckVisibility(it.req.StartTs)
 if err != nil {
  return nil, errors.Trace(err)
 }
 return resp, nil
}

獲取數據根據是否有序也是分爲兩種,無序的數據直接從 copIterator 的 respChan 中獲取數據,如果是有序的,那麼需要獲取到 task 裏面的 respChan 來獲取數據。

Reference

https://blog.minifish.org/posts/tidb4/

https://zhuanlan.zhihu.com/p/337939383

https://pingcap.com/zh/blog/mpp-smp-tidb

https://pingcap.com/zh/blog/tikv-source-code-reading-14

https://pingcap.com/zh/blog/tidb-source-code-reading-19

關注 luozhiyun 很酷,和他一起學習👆

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/MeLWHewHgBdyse_xkFRoqw