Postgres 的並行掃描源碼分析

大家好,我是 「微擾理論」,目前在 Hashdata 擔任數據庫內核研發工程師。

前言

在計算機和半導體產業發展了許多年之後,摩爾定律已經越來越接近於理論上的物理極限;免費的午餐結束了。而爲了繼續提高計算機的性能,處理器廠商紛紛引入了多核架構。

爲了更好地利用現代多核架構下的計算資源;PG 9.6 也終於正式引入並行查詢的能力;其中並行順序掃描 (parallel sequential scan) 則是 PG 引入的第一個並行化的算子。在 PG 9.6 之前,PG 開發者們也花了幾年的時間去討論和構建支持並行化的一些基礎設施;包括動態共享內存、共享內存隊列、後臺工作進程等;我們在之前的文章中已經進行了相關內容的介紹。在最近發佈的 PG 15 中,大部分只讀的算子都已經部分或全部支持了並行化。微擾醬最近的主要工作都集中在 PG 並行化算子實現的調研上,今天就來分享一下 PG 中最簡單也是最早被支持的並行順序掃描算子的實現細節。

一個簡單的例子

首先我們來看一個簡單的例子;這個例子也是微擾醬這幾天調試代碼反覆使用的查詢語句。

-- encourage use of parallel plans
postgres=# set parallel_setup_cost=0;
postgres=# set parallel_tuple_cost=0;
postgres=# set min_parallel_table_scan_size=0;
postgres=# set max_parallel_workers_per_gather=4;

-- prepare test table
postgres=# create table test(t int);
postgres=# insert into test select * from generate_series(1, 1000);

-- explain the query
postgres=# explain select * from test;
                             QUERY PLAN
---------------------------------------------------------------------
 Gather  (cost=0.00..9.17 rows=1000 width=4)
   Workers Planned: 2
   ->  Parallel Seq Scan on test  (cost=0.00..9.17 rows=417 width=4)
(3 rows)

正常來說,在比較小的表中,PG 是不會生成並行的查詢計劃的;所以爲了使得 PG 生成並行查詢計劃,我們設置了幾個相關的 GUC 值;包括:

這樣,在生成的查詢計劃中,我們就可以看到 Workers Planned: 2 了;這意味着整個查詢計劃有兩個 worker 參與掃描。由於查詢計劃最上層是 Gather 算子,根據火山模型,整個查詢便由 Gather 驅動。Gather 算子於 leader 進程上執行,從共享內存中的隊列獲取由另外兩個工作進程放入的消息並從中解析出元組信息,通過 shm_mq_receive 實現;兩個工作進程則依次啓動並對數據表競爭讀,將元組打包成消息放入隊列中,通過 shm_mq_send 實現 。

下面,我們讀具體過程進行詳述。先貼上在 leader 進程、 worker 進程和 master 進程中執行的一些關鍵函數和調用關係並提供了詳細的註釋;供大家對照後文閱讀

執行器調用過程

ExecGather()
 // 這裏生成工作進程的計劃;在本例中 worker 進程最頂層的算子爲 SeqScan
 ExecInitParallelPlan()
  ExecParallelInitializeDSM()
   ExecSeqScanInitializeDSM()
 LaunchParallelWorkers() // 這一步將創建工作進程
  foreach work loop:
   RegisterDynamicBackgroundWorker()
    SendPostmasterSignal // 通知 postmaster;由 postmaster 創建工作進程
 // setup tuple queue readers to read results
 ExecParallelCreateReaders()
 // 獲取下一條數據
 gather_getnext()
  while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
   gather_readnext() // 從某個 reader 讀取一個元組
    for(;;)
     // 輪詢 reader,讀完一個再讀下一個; 如果某個 reader 未能成功初始化,則不返回任何元組
     reader = gatherstate->reader[gatherstate->nextreader] 
     TupleQueueReaderNext() // 從 queue reader 裏獲取元組
      shm_mq_receive() // 從 queue 中獲取一條消息
       // try read
       while (!mqh->mqh_length_word_complete)
        res = shm_mq_receive_bytes()
     // 如果當前 reader 被 detach 了,移除當前reader;如果所有全部完成,需要關閉 workers
     if (readerdone) 
      if (gatherstate->nreaders == 0)
       ExecShutdownGatherWorkers()
        ExecParallelFinish()
         // 需要等待工作進程結束;似乎是通過工作進程調用 pq_putmessage('X', NULL, 0) 完成的
         WaitForParallelWorkersToFinish()
     if (tup) return tup
     // tuple 爲空不阻塞;嘗試讀下一個 reader
     gatherstate->nextreader++ 
 // 返回投影
 return ExecProject()
ParallelWorkerMain()
 dsm_attach() // 綁定到 dsm segment
  dsm_create_descriptor()
 shm_toc_attach()
 // set parallel state
 shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false)
 before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg)) // 註冊退出回調函數
 // attach error queue
 shm_mq_set_sender()
 shm_mq_attach(mq, seg, NULL)
 // send BackendKeyData message
 pq_beginmessage(&msgbuf, 'K');

 // restore database connection
 BackgroundWorkerInitializeConnectionByOid()
 // restore state; protected by transaction
 RestoreLibraryState()
 RestoreGUCState()
 RestoreComboCIDState()

 // attach per-session segment
 AttachSession()
 RestoreTransactionSnapshot()
 
 ...

 EnterParallelMode()

 entrypt(seg, toc); // actuall is ParallelQueryMain
  // setup DestReceiver, SharedExecutorInstrumentation and QueryDesc
  ExecParallelGetReceiver() // 這一步很重要,後面 tqueueReceiveSlot 要用
  shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true)
  ExecParallelGetQueryDesc()

  // 執行器初始化
  ExecParallelInitializeWorker()
    ExecSeqScanInitializeWorker()
     table_beginscan_parallel()
      RestoreSnapshot() // 從序列化的快照中恢復快照
      relation->rd_tableam->scan_begin()
  // prepare to track buffer/WAL usage
  InstrStartParallelQuery()
  // 正式執行
  ExecutorRun()
   standard_ExecutorRun()
    ExecutePlan()
     for(;;)
      ExecProcNode() // obtain a tuple
       ExecProcNodeFirst()
        ExecSeqScan()
         ExecScan()
          ExecScanFetch()
           SeqNext()
            table_scan_getnextslot()
             heap_getnextslot()
              heapgettup_pagemode()
               page = table_block_parallelscan_nextpage()
                // 當前 page 沒有被讀完
                if (pbscanwork->phsw_chunk_remaining > 0)
                 nallocated = ++pbscanwork->phsw_nallocated;
                // 當前 page 被讀完
                else
                 nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, pbscanwork->phsw_chunk_size)
               heapgetpage()
              pgstat_count_heap_getnext()
      if (!dest->receiveSlot(slot, dest)) // actually tqueueReceiveSlot
       shm_mq_send()
    dest->rShutdown(dest)
  // 關閉執行器
  ExecutorFinish()
  // 獲取 instrumentation 後必須執行
  ExecutorEnd()
  // clean
  dsa_detach()
  
 ExitParallelMode()
 DetachSession()
 // report success
 pq_putmessage()
// 循環等待事件即可;在合適的時機 fork 出 worker 進程。
PostmasterMain()
 ServerLoop()
  sigusr1_handler()
   maybe_start_bgworkers()
    do_start_bgworker()
     fork_process()

Leader 進程和 Worker 進程

對於例子中簡單的查詢語句,整個查詢樹只有兩層結構,在整個查詢的執行過程中我們一共需要關注 3 個進程;分別是 master 進程、 leader 進程和 worker 進程。在執行過程中執行 ps 命令即可查詢到這三種進程,我們分別用紅色、藍色和綠色標出。其中,leader 進程和非並行化的算子一樣,本質上是用戶連接 PG 後 fork 出來用於處理查詢的進程;只不過在並行化算子中,爲了和 worker 進程區分,我們這裏就稱爲 leader 進程。Worker 進程的發起,也是由 leader 進程控制的。

但是 leader 進程本身並不能直接 fork 出 worker 進程,而是需要通過給 master 進程發信號再由 master 進程 fork 的方式進行。

在 master 進程中,由 ServerLoop() 負責等待信號,如果發現是要求 fork worker 進程的信號,就會進行相應的後臺工作進程的啓動工作;再 fork_process 調用之後,worker 進程就產生並立刻進入初始化和後續的掃描工作。

而他們之間的通信主要通過動態共享內存和在此之上建立的消息隊列實現;錯誤上報、掃描元組的傳遞都是通過該機制實現的;底層是一個無鎖環形隊列,實現非常巧妙,我們之後有機會展開討論。

Worker 進程創建過程

那 worker 進程具體是如何被創建的呢?我們先來看看 PG 執行器收到的查詢計劃的結構。

Gather [startup_cost=total_cost=9.1666666666666679 plan_rows=1000 plan_width=parallel_aware=false parallel_safe=false async_capable=false plan_node_id=0]
       [num_workers=rescan_param=single_copy=false invisible=false initParam=0x0]
 [targetlist]
  TargetEntry [resno=1 res resorigtbl=24576 resorigcol=1]
   Var [varno=-2 varattno=vartype=23 varnosyn=varattnosyn=1]
 [lefttree]
  -> SeqScan [startup_cost=total_cost=9.1666666666666679 plan_rows=417 plan_width=parallel_aware=true parallel_safe=true async_capable=false plan_node_id=1
              extParam=0x00000001 allParam=0x00000001]
             [scanrelid=1]
             []
   [targetlist]
    TargetEntry [resno=1]
     Var [varno=varattno=vartype=23 varnosyn=varattnosyn=1]

計劃的最上層是 Gather ;所以在執行器開始執行的時候,leader 進程首先就會從 ExecGather 開始進行執行操作,而這個時候,worker 進程其實還沒有被創建。真正地創建是在執行 ExecGather 的過程中發生的;如果 leader 進程執行過程中發現並行上下文尚未完成初始化,且計劃器計劃的並行工作進程數量大於 0;則需要進行相應的並行進程初始化工作。

ExecInitParallelPlan() 調用的過程中,我們也會初始化 worker 進程所需要的查詢計劃,放入 node->pei 中,並通過共享內存同步給 worker 進程。隨後,通過調用 LaunchParallelWorkers()  準備 worker 進程啓動所需要的一些信息併發送信號給 master 要求 fork worker 進程。

在這個過程裏, leader 也會負責將隊列信息和 worker 進程的信息綁定在一起。

for (i = 0; i < pcxt->nworkers_to_launch; ++i)
 {
  memcpy(worker.bgw_extra, &i, sizeof(int));
  if (!any_registrations_failed &&
   RegisterDynamicBackgroundWorker(&worker,
           &pcxt->worker[i].bgwhandle))
  {
   shm_mq_set_handle(pcxt->worker[i].error_mqh,
         pcxt->worker[i].bgwhandle);
   pcxt->nworkers_launched++;
  }
}

而 worker 進程啓動的時候就可以從共享內存中獲取 bgw_extra。

void
ParallelWorkerMain(Datum main_arg)
{
 memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
}

從而獲取當前的消息隊列應該是哪一個;通過調用 ExecParallelGetReceiver 可關聯 shm_mq。

static DestReceiver *
ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
{
 char    *mqspace;
 shm_mq    *mq;

 mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
 mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
 mq = (shm_mq *) mqspace;
 shm_mq_set_sender(mq, MyProc);
 return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
}

最後,整個喚起進程的過程中可能會失敗,所以實際喚起的工作進程數量可能是少於預期的;不過這並不會構成問題。

多個進程是如何協同掃描的?

現在我們來介紹多個進程是如何一起掃描的。在微擾醬一開始的猜測裏,掃描可能是由 Planner 事先分配好每個進程的掃描的範圍;然後併發協同掃描的。而這樣,顯然會導致多個進程掃描時產生一定的亂序,從而導致讀性能下降,看起來不太可取。

那 PG 具體是如何做的呢?

事實上,PG 的做法是讓多個進程競爭地順序讀數據表。其實現則依賴於在共享內存中存儲的變量 ParallelBlockTableScanDesc 。它是被多個 worker 進程共享並用於控制目前掃描進行到哪一頁的描述符。

多個進程掃描獲取下一個元組的時候首先會去獲取一批頁,通過原子操作更新共享內存中的描述符實現互斥;如果進程獲取的頁沒讀完,則會先將獲取的頁中的元組全部讀出,後再次申請新的頁;核心代碼如下。

BlockNumber
table_block_parallelscan_nextpage(Relation rel,
          ParallelBlockTableScanWorker pbscanwork,
          ParallelBlockTableScanDesc pbscan)
{
   nallocated = pbscanwork->phsw_nallocated =
   pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
         pbscanwork->phsw_chunk_size);
}

因此,併發掃描的粒度是以頁爲單位的。在一部分進程因爲一些原因被阻塞的情況下,其他進程,則可以繼續後續頁中的數據。所有 worker 進程讀出的數據都會通過共享內存隊列發送給 leader 進程。Leader 進程也會循環讀取每個隊列中的數據,如果其中有隊列中沒有新的數據並不會阻塞,而是直接跳過當前進程繼續讀後續隊列的數據。

這樣,在併發讀的過程中,由於有文件系統基於窗口的預讀機制,在大部分情況下和嚴格順序讀的磁盤性能差距並不明顯

併發順序掃描的意義?

寫到這裏,不知道讀者會不會產生一個疑問;既然併發順序讀本身並不能提高 IO 本身的效率,甚至引入了一些額外的初始化和競爭的開銷;那併發順序掃描的算子到底有什麼意義呢?

我們知道,併發主要釋放的還是 CPU 的計算能力,尤其是在多核的場景下;而順序掃描本身其實是一個純 IO 的場景,在例子中非常簡單的查詢語句下其實確實是不能發揮什麼優勢的。但是如果你的查詢語句比較複雜,涉及過濾、聚合等運算,那每個進程在獲取數據之後就要進行許多諸如哈希計算這樣耗費 CPU 計算資源的操作;併發的意義就得以體現了。看網上流傳的一些評測數據中,在啓用併發度爲 4 的連接查詢時,通常也能達到接近於 4 倍的效率提升。而併發順序掃描的意義就是爲後續的其他並行算子提供基礎。正如微擾醬的這篇文章也是後續研究其他算子的基礎;大家的點贊、關注、轉發也能讓微擾醬有更可持續的寫作基礎一樣。

參考鏈接

[1]. PostgreSQL 並行表掃描分析 Gather Parallel Seq Scan https://www.mytecdb.com/blogDetail.php?id=22

[2]. Parallel Hash for PostgreSQL https://www.enterprisedb.com/blog/parallel-hash-postgresql

[3]. Parallelism in PostgreSQL 11 https://2019.pgdu.org/static/presentations/Thomas-parallelism-postgresql-11.pdf

[4]. Using gdb to trace into a parallel worker spawned by postmaster during a large query https://www.highgo.ca/2021/07/09/using-gdb-to-trace-into-a-parallel-worker-spawned-by-postmaster-during-a-large-query/

結語

計算機是一門需要動手實踐才能掌握的學科,想了解數據庫最好的方式莫過於加入一家不錯的公司啦;歡迎大家找我內推 hashdata;入職即享 20 天年假,優秀更可遠程辦公,快私信微擾醬一鍵內推吧~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/bYZPrKuu6q_UE8N20Vc5TQ