自動增量計算:構建高性能數據分析系統的任務編排
在起始的那篇《金融 Python 即服務:業務自助的數據服務模式》,我們介紹了:使用 Python 如何使用作爲數據系統的 wrapper 層?在這一篇文章裏,我們將繼續之前的話題,介紹如何使用 Python 作爲計算引擎核心的膠水層,即:如何使用 Python 構建 DAG(有向無環圖,Directed Acyclic Graph) 任務?
除此,還可以瞭解一下,如何設計增量 DAG 計算?先看一下增量計算的概念:
增量計算(Incremental computing),是一種軟件功能,每當一條數據發生更改時,它都會嘗試通過僅重新計算依賴於更改數據的輸出來節省時間。
常見的領域有:
-
GUI 應用, 諸如於 React 的 Dom Diff
-
不斷變化的大型計算,諸如於金融計算、電子表格、大數據系統
-
構建系統,諸如於 Gradle、Bazel、Rustc 等
所以,在開始之前,讓我們先看一個簡單的例子,Excel 如何實現增量計算。
引子 1:Excel 的增量計算
衆所周知,Excel 是使用最廣泛的數據分析工具。當我們使用了 Excel 中的公式之後,當我們修改了 A 單元格的值,對應的結果會自動發生變化。而如果在這時,還有其它依賴於此單元格的值時,對應的結果也會發生變化。如下圖所示:
出自 《How to Recalculate a Spreadsheet》
在 Microsoft 官方的文檔裏(Excel 重新計算),可以看到對應的觸發重新計算場景:輸入新數據、刪除或插入行或列等等。在 Excel 中,工作表的計算可視爲包含三個階段的過程:
-
構造依賴關係樹
-
構造計算鏈
-
重新計算單元格
一旦觸發了重新計算,Excel 會重新構造依賴關係樹和計算鏈,並依賴於此的所有單元格標記爲 ” 髒單元格 “。隨後,根據計算鏈指定的順序重新計算。通常來說,在我們設計依賴分析時,假定的是函數是不可變的。但是呢,還存在一些特殊的函數類型,諸如於文檔中提到的:
-
異步函數 (UDF)。
-
可變函數。即哪怕參數沒有變化時,值也可能修改。諸如於 Now、Today 等。
這意味着,我們在設計增量計算時,需要考慮到這個場景的問題。從原理和實現來說,它一點並不算太複雜,有諸如於
從註解 DAG 到增量 DAG 設計
DAG (有向無環圖,Directed Acyclic Graph)是一種常用數據結構,僅就 DAG 而言,它已經在我們日常的各種工具中存在:
-
依賴系統。諸如如 NPM、Yarn、Gradle、Cargo 等
-
人工智能。如機器學習等
-
數據流系統。如編譯器、Apache Spark、Apache Airflow 等。
-
數據可視化。常用的 Graphviz,又或者是各個語言裏的 Network 相關的庫,諸如於 Python 的 NetworkX。
當我們從任務編排和數據等的角度來看,DAG 的面向普通人術語是叫工作流(Workflow)。
常規 DAG 到函數式 DAG
通常情況下,實現一個 DAG 非常的簡單 —— 只是數據結構。在使用時,也比較簡單,如下是 Cytoscape 的 API 示例:
cy.add([
{ group: 'nodes', data: { id: 'n0' }, position: { x: 100, y: 100 } },
{ group: 'nodes', data: { id: 'n1' }, position: { x: 200, y: 200 } },
{ group: 'edges', data: { id: 'e0', source: 'n0', target: 'n1' } }
]);
而這一類 DAG 是靜態的,當我們需要結合些任務時,就會需要添加函數。由此便會稍微複雜一些,再現看個示例:
comp = Computation()
comp.add_node('a')
comp.add_node('b', lambda a: a+1)
comp.add_node('c', lambda a, b: 2*a)
comp.add_node('d', lambda b, c: b + c)
comp.add_node('e', lambda c: c + 1)
comp.compute('d')
comp.get_value_dict()
上述的代碼中,是 Loman 框架的示例,其中的 lambda a: a+1
是 Python 的 Lambda 表達式。Loman 會在運行時,分析這個 Lambda,獲得 Lambda 中的參數,隨後添加對應的計算依賴。
Loman 示例
而在多數場景之下,往往是採用註解的形式,諸如於 Airflow、Gradle 等。
基於註解與條件的 DAG 函數
回到研究的開始,如美銀證券的 Quartz 的 DSL 擴展(Little languages),便是在 Loman 的形式上進行了一步擴展。使用註解代替了 Lambda:
class C:
@dag
def f1(self, x, y):
return self.f2(x) + y
@dag
def f2(self, x):
return x * x
圍繞於這個註解,Quartz 在這一層的實現上,包含了四個特性:DAG、記憶化(memoization)、持久化、時間旅行調試(time travel)。考慮到 Quartz 並不是一個開源的實現,社區上的材料不一定靠譜,所以我們還是再看看 Apache Ariflow 的實現。引用官網的示例:
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
# Tasks are represented as operators
hello = BashOperator(task_id="hello", bash_command="echo hello")
@task()
def airflow():
print("airflow")
# Set dependencies between tasks
hello >> airflow()
從實現上來說,Apache Airflow 的 DAG 實現本着 “工作流即代碼” 的思想設計的。上面代碼中,比較有意思的是 >>
語法,其是在任務之間定義了一個依賴關係並控制任務的執行順序。
增量 DAG 註解:Gradle —— 監聽輸入與輸出
在編譯上,Gradle 也是支持增量編譯(也是一種增量計算)的,我們可以先看個簡單的示例:
abstract class IncrementalReverseTask extends DefaultTask {
@Incremental
@InputDirectory
abstract DirectoryProperty getInputDir()
@OutputDirectory
abstract DirectoryProperty getOutputDir()
@TaskAction
void execute(InputChanges inputChanges) {
inputChanges.getFileChanges(inputDir).each { change ->
if (change.fileType == FileType.DIRECTORY) return
def targetFile = outputDir.file(change.normalizedPath).get().asFile
if (change.changeType == ChangeType.REMOVED) {
targetFile.delete()
} else {
targetFile.text = change.file.text.reverse()
}
}
}
}
對於 Gradle 的增量任務來說,通常只需要關注輸入和輸出,只要 InputDirectory
和 OutputDirectory
不變,那麼就認爲 Task 不需要再執行。因爲在實現處理邏輯時,只關注於這兩個值是否發生變化。
Rust 語言:Salsa 框架的增量 DAG 設計
Rust 編譯器的文檔上也包含了相關的介紹:Incremental compilation,而這裏我們是一個相關的實現 —— 增量編譯的設計者之一(Niko Matsakis)編寫的庫 Salsa。Salsa 是一個用於編寫增量 (incremental) 、按需 (on-demand) 程序的 Rust 框架,其採用的是 “紅 - 綠” 算法。與 Gradle 相似的,Salsa 結構體(Structs)是使用一種 Salsa 屬性宏進行了標註的結構體:
-
#[salsa::input]
:用於指定計算的 “基本輸入” -
#[salsa::tracked]
:用於指定在計算過程中創建的中間值 -
#[salsa::interned]
:用於指定易於進行相等比較的小型值
由於 Salsa 相比於 Gradle 是位於更底層的基礎設施,所以需要手動構建存儲層,即 Jar 和數據庫)。數據庫是一個結構體,它最終存儲 Salsa 的所有中間狀態,例如來自跟蹤函數的被記憶的 (memoized) 返回值。數據庫本身是以一些中間結構 (intermediate structure) 的形式定義的,這些中間結構被稱爲 jars,幷包含每個函數的數據。
緩存計算與存儲計算
既然,我們已經通過註解將輸入、輸出、函數等內容標註出來,下一步就是緩存結果。如此一來,我們就可以通過緩存來提升計算性能。對於計算的緩存來說,至少需要包含這三個部分:
-
函數表達式(Fn 類型)。
-
零個或多個參數。
-
一個可選名稱。
由此,我們才能獲得緩存後的結果。在一些框架的設計裏,諸如於 Python 語言
內存:Memoization —— 函數式編程的記憶
Memoization(記憶化)是函數式語言的一種特性,使用一組參數初次調用函數時,緩存參數和計算結果,當再次使用相同的參數調用該函數時,直接返回相應的緩存結果。在一些不支持 memoization 的語言裏,需要手動引入這種設計,如 Java:
Map<Integer, Integer> cache = new ConcurrentHashMap<>();
Integer addOne(Integer x) {
return cache.computeIfAbsent(x -> x + 1);
}
上述只是一個加法的示例,萬能的 StackOverflow 上有更多的示例:Java memoization method。
當然了,緩存是有負作用的 —— 第一次計算時存儲結果會花費一定的時間,不過大部分情況下可以忽略不計。
數據庫存儲
對於耗時更長的 AI 或者是金融計算場景時,需要採用分佈式的任務調度器,才能更快的得到計算結果。於是乎,採用分佈式鍵值存儲來對結果進行緩存就是更好的選擇。在 Salsa 框架裏,由於考慮到不同的類型(input、output、tracked 等),對於數據結構函數等來說,其對應的 Index 由三部分組成:
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct DatabaseKeyIndex {
group_index: u16,
query_index: u16,
key_index: u32,
}
大抵是
增量計算框架與算法
由於時間與精力限制(主要是我看不懂一些用英語寫的公式,還有暫時沒打算學 OCaml),這裏就沒有展開對於各類計算框架論文的討論。諸如於 Incremental 和 Adapton 就是相關的論文與實現,就包含了非常不錯的資料。
除此:https://lord.io/spreadsheets/ 一文也給了非常好的介紹。
這裏,我就不展開了。
有了增量計算,然後呢?
後續的計算部分,可以參考 Apache Airflow 來實現。它是一個支持開源分佈式任務調度框架,其架構
-
調度程序,它處理觸發計劃的工作流,並將任務提交給執行程序以運行。
-
執行器,它處理正在運行的任務。在默認的 Airflow 安裝中,這會在調度程序中運行所有內容,但大多數適合生產的執行程序實際上會將任務執行推送給工作人員。
-
Web 服務器,它提供了一個方便的用戶界面來檢查、觸發和調試 DAG 和任務的行爲。
-
DAG 文件的文件夾,由調度程序和執行程序(以及執行程序擁有的任何工作人員)讀取
-
元數據數據庫,由調度程序、執行程序和網絡服務器用來存儲狀態。
其架構圖如下:
不過、過了、還是不過,考慮到 Airflow 的 DAG 實現是 Python,在分佈式任務調度並不是那麼流行。但是,作爲一個參考還是非常不錯的。
其他
相關參考資料:
-
《How to Recalculate a Spreadsheet》一篇非常不錯的文章,介紹了不同的算法是如何重新計算電子表格的。當然了,也包含作者自己寫的新方案 Anchors。對於寫庫來說,是一個非常不錯的參考。
-
《Excel 重新計算》介紹了 Excel 重新計算的邏輯。
-
Salsa 文檔:https://salsa-rs.netlify.app/ (中文版翻譯:https://rust-chinese-translation.github.io/salsa-book/ )
-
Adapton 提供了一個增量計算的編程語言抽象,官網:http://adapton.org/ 提供了非常不錯的參考資料
除此,在構建工具方面,在這一方面微軟研究院的《Build Systems à la Carte》提供了一個非常不錯的介紹,如果你可以參考這一篇《【工業聚看論文】第一期:《Build Systems à la Carte: Theory and Practice》
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/YynLAkFflO9rZlW2jSAvfA