SeaTunnel WAL 機制實現內存數據持久化存儲
作者 | 郭強,Apache SeaTunnel PPMC
前言
Hazelcast IMDG 是一種開源分佈式內存對象存儲,支持多種數據結構。
SeaTunnel Engine 使用 Hazelcast IMDG 將數據存儲在 RAM 中,從而在集羣中傳播和複製數據。
但由於它是分佈式內存數據庫,因此一旦宕機,內存中的數據就會丟失。爲了解決這個問題,我們開發實現了一個 WAL (Write-Ahead Logging) 機制, 從而可以將數據持久化寫入第三方文件存儲,以便在服務器宕機時可以恢復數據。
WAL 機制
WAL 機制是將數據寫入磁盤的機制。它的工作原理是在寫入數據之前,先將數據寫入日誌文件中。當數據寫入日誌文件後,再將數據寫入內存。這樣,即使在寫入數據時發生了宕機,也可以通過日誌文件中的數據恢復數據。 它最重要的作用是數據恢復,一旦服務器崩潰,通過 WAL 日誌,我們可以恢復崩潰之前的數據。這也意味如果寫入 WAL 失敗,整個操作將認爲失敗。 因此可以充分保證得數據的完整性和一致性,而它基本是數據庫軟件崩潰恢復的標準做法。
對於我們來講,大體流程則是,先寫入磁盤,而後寫入成功,再寫入內存。如果宕機或者節點重啓,則全量讀取 WAL 文件進行數據恢復。
整體流程
由於 Hazelcast IMDG 是分佈式內存數據庫,因此每個節點都可以去進行數據操作,因此,這種情況下我們每個節點都有自己的 WAL 文件,而不是像主從架構那樣,只有主節點有 WAL 文件。
如下:
-
每個節點都有自己的 WAL 文件,每個節點都會將數據寫入自己的 WAL 文件中。
-
當節點宕機時,會將全量讀取所有的 WAL 文件,然後將數據恢復到內存中。
1 存儲目錄
我們根據 namespace 來區分不同的 WAL 文件,每個 namespace 又會根據自己所在的節點創建對應的 WAL 文件。因此,它的文件目錄可以抽象爲一個樹形結構,如下圖所示:
├── namespace
│ ├── cluster name 1
│ │ ├── business name 1
│ │ │ ├── node1
│ │ │ │ ├── wal file
│ │ │ ├── node 2
│ │ │ │ ├── wal file
│ ├── cluster name 2
│ │ ├── business name 2
│ │ │ ├── node1
│ │ │ │ ├── wal file
│ │ │ ├── node 2
│ │ │ │ ├── wal file
-
namespace:區分不同的 SeaTunnel 存儲業務
-
cluster name:集羣名稱,用於區分不同的集羣,每個集羣都有自己的集羣名稱。
-
business name:業務名稱,用於區分不同的業務,每個業務都有自己的業務名稱。
-
node:節點,用於區分不同的節點,每個節點都有自己的節點名稱, 且不會產生衝突。
-
wal file:WAL 文件,用於存儲數據。
2 寫入流程
我們使用 Disruptor 框架來實現 WAL 的寫入,它的寫入流程如下:
1、將數據寫入到 Disruptor 中。
2、Disruptor 將數據寫入到 WAL 文件中。
3、將數據寫入到磁盤中。
4、返回寫入結果。
當用戶去進行 DDL 操作的時候(需要注意,所有的 DDL 操作在我們的實際存儲中都是追加,沒有真正的物理刪除),用戶會去構建對應的 WAL Data(包含操作指令,版本,序列化,namespce 等一系列信息,以及對應的序列化信息等等), 然後提交消息到 disruotor,這個過程是同步過程,當本次操作的數據進行消費,真正執行 append 命令後則返回,這個過程完成後會返回給用戶一個成功的消息,即意味着本次操作執行成功。
我們計劃會有一個異步定時線程,去執行 flush 以及歸檔操作,當我們的數據達到一定的閾值後,我們會將數據進行歸檔,歸檔的過程是將數據進行壓縮,然後寫入到 Storage-Data 中,然後將 WAL 中的數據刪除,同時創建新的 WAL 文件。 我們進行過基準測試,雙線程生產,單線程消費,當數據在 byte 級別的時候,單機可以達到上萬次操作,在 MB 級別的時候,大概是不到一百的操作,其中 byte 級別的數據內存設置爲 512M,MB 級別的數據內存設置爲 5G。但有待討論,目前社區認爲每次都是需要去進行 fsync 的,這個過程是需要消耗一定的時間的,所以我們需要在這個過程中進行優化。
3 WAL 文件數據結構
我們的存儲只有 K - V ,而 K 和 V 具體的類型並沒有限制,他們可能是 Long 類型,Map 類型,亦或者其它類型,但對於底層存儲來講,我們只需要將 K 和 V 序列化成字節數組,然後將字節數組寫入磁盤即可。
因此在最終存儲前數據都會被轉化成如下格式:
DataEntry{
Object key;
String keyClass; // 序列化的 class 名稱
Object value;
String valueClass; // 序列化的 class 名稱
Boolean deleted; // 所有的DDL操作最終都可以歸結爲刪除或者新增,因此我們只需要一個標識位即可。
Long timestamp; // 時間戳,用於數據的版本控制, 這要求分佈式系統的時間必須同步.或在可以接受的誤差範圍內。
}
而最終實際存儲後的數據是 DataEntry 的序列化後的字節數組。
因此實際存儲到磁盤的數據協議如下:
+--------------------------+---------------+
| Data Size (12) | data |
+--------------- ----------+---------------+
| record data length | Data (byte[]) |
+--------------------------+---------------+
當需要從存儲中恢復的時候,我們會將存儲中的數據讀取出來,然後將其反序列化成 DataEntry 對象,然後將其放入內存中。進行數據的恢復。這個過程中會進行排序去重。
當進行全量查詢的時候,我們需要查詢所有 namespace 的數據,這個時候,我們需要將所有的數據進行歸檔,但如果採用上面的做法,我們只查詢各個節點所有的歸檔的 WAL 信息,然後進行查詢即可。這個過程中會進行計算,歸併操作,所有的數據都是有序的,我們需要根據數據順序決定所有數據的最終值,舉個例子:
insert into data(K,V) values(1,1)
insert into data(K,V) values(1,2)
delete from data where K=1
insert into data(K,V) values(1,3)
update data set V=4 where K=1
上面這些記錄一共會在存儲中有五條記錄,因此我們會根據版本來進行合併,在上面的語句中,如果我們查詢,則實際返回的記錄的是 K=1 V=4。
而對於一個文件的數據歸併來講,這種過程其實更爲簡單,因爲文件中的數據是有序的,因此我們只需要將文件中的數據讀取出來,倒序排序後進行以下判斷:
delete set = new HashSet<>();// 存儲當前已經刪除的Key
data set = new HashSet<>();// 存儲當前已經存在的Key
is in delete set? // 如果在刪除集合中,那麼直接跳過 由於倒序,因此最先put的數據一定是最新的
yes: end
no:
is deleted? // 如果是刪除操作
yes:
add to delete set if not exist
end
no:
is in data set? // 如果在數據集合中,那麼直接跳過 由於倒序,因此最先put的數據一定是最新的
yes: end
no:
add to data set
end
優化和思考
q WAL 重寫
隨着寫入操作的執行,WAL 文件會變得越來越大。例如,如果將 key 爲 1 類型爲 Integer 的數據寫入 100 次,那麼 WAL 文件將包含 100 個相同的 key(類型爲 Integer) 爲 1 的數據。但實際有效數據只有一條。 這也對我們每次恢復數據時的性能有影響。
因此我們需要對 WAL 文件進行重寫,將重複的數據進行合併。這個過程中,我們會將 WAL 文件中的數據讀取出來,然後進行排序,然後進行去重,最後將去重後的數據寫入新的 WAL 文件中。
但這可能會有大量的內存佔用(取決於具體的數據集)。
一些建議的做法是,我們提供對應的 CLI 工具,用戶可以通過 CLI 工具來進行 WAL 文件的重寫。
這個過程不會影響到正在運行的服務,因此可以在業務低峯期進行。我們可以對已經歸檔的數據進行排序去重,最後生成臨時文件, 然後將臨時文件重命名爲原來的文件名,這樣就完成了重寫。
但這其中也有一些思考點:
分佈式重寫 vs 單機重寫
分佈式重寫的效率更高,更快。但我們需要考慮的是,如果有多個節點同時進行重寫,那麼會有多個節點同時寫入同一個文件,這樣會導致文件的內容不一致,因此我們需要對重寫的過程進行加鎖,這樣會導致重寫的效率降低。
單機重寫速率略慢,但卻可以保證在無競爭的狀態下去進行,因此可以保證重寫的正確性。
2 WAL 刷盤機制
WAL 文件的刷盤機制是一個比較重要的問題,如果 WAL 文件沒有及時刷盤,那麼在機器宕機的情況下,我們的數據就會丟失。但頻繁的刷盤會導致性能的下降。 因此我們需要對 WAL 文件的刷盤機制進行優化。
一般來說,我們會將 WAL 文件的刷盤機制分爲兩種:
-
同步刷盤:每次寫入數據後,都會調用 fsync() 方法,將數據刷盤。這樣可以保證數據的安全性,但是會導致性能的下降。
-
異步刷盤:
-
週期刷盤,每隔一段時間,將數據刷盤一次。這樣可以可以減少 fsync() 的調用次數,從而提高性能,但是也會存在窗口數據丟失的風險以及全量恢復時窗口數據可見性的問題。
-
基於數據量的刷盤:按照數據量來刷盤,比如每寫入 1MB 的數據,就調用一次 fsync() 方法,但是也會存在窗口數據丟失的風險。
採用何種方式取決於具體的業務場景,我們是否需要還有待於討論,但對我而言,針對不同的業務場景採用不同的方式纔是比較合理的。
Apache SeaTunnel(Incubating) 是一個分佈式、高性能、易擴展、用於海量數據(離線 & 實時)同步和轉化的數據集成平臺
倉庫地址:
https://github.com/apache/incubator-seatunnel
網址:
https://seatunnel.apache.org/
Proposal:
https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelPro
Apache SeaTunnel(Incubating) 下載地址:
https://seatunnel.apache.org/download
提交問題和建議:
https://github.com/apache/incubator-seatunnel/issues
貢獻代碼:
https://github.com/apache/incubator-seatunnel/pulls
訂閱社區開發郵件列表 :
dev-subscribe@seatunnel.apache.org
開發郵件列表:
dev@seatunnel.apache.org
加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ
關注 Twitter:
https://twitter.com/ASFSeaTunnel
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/eyW57iQOl42WFjfcWuj4YQ