SeaTunnel WAL 機制實現內存數據持久化存儲

作者 | 郭強,Apache SeaTunnel PPMC

前言

Hazelcast IMDG 是一種開源分佈式內存對象存儲,支持多種數據結構。

SeaTunnel Engine 使用 Hazelcast IMDG 將數據存儲在 RAM 中,從而在集羣中傳播和複製數據。

但由於它是分佈式內存數據庫,因此一旦宕機,內存中的數據就會丟失。爲了解決這個問題,我們開發實現了一個 WAL (Write-Ahead Logging) 機制, 從而可以將數據持久化寫入第三方文件存儲,以便在服務器宕機時可以恢復數據。

WAL 機制

WAL 機制是將數據寫入磁盤的機制。它的工作原理是在寫入數據之前,先將數據寫入日誌文件中。當數據寫入日誌文件後,再將數據寫入內存。這樣,即使在寫入數據時發生了宕機,也可以通過日誌文件中的數據恢復數據。 它最重要的作用是數據恢復,一旦服務器崩潰,通過 WAL 日誌,我們可以恢復崩潰之前的數據。這也意味如果寫入 WAL 失敗,整個操作將認爲失敗。 因此可以充分保證得數據的完整性和一致性,而它基本是數據庫軟件崩潰恢復的標準做法。

對於我們來講,大體流程則是,先寫入磁盤,而後寫入成功,再寫入內存。如果宕機或者節點重啓,則全量讀取 WAL 文件進行數據恢復。

整體流程

由於 Hazelcast IMDG 是分佈式內存數據庫,因此每個節點都可以去進行數據操作,因此,這種情況下我們每個節點都有自己的  WAL 文件,而不是像主從架構那樣,只有主節點有 WAL 文件。

如下:

1 存儲目錄

我們根據 namespace 來區分不同的 WAL 文件,每個 namespace 又會根據自己所在的節點創建對應的 WAL 文件。因此,它的文件目錄可以抽象爲一個樹形結構,如下圖所示:

├── namespace 
│   ├── cluster name 1
│   │   ├── business name 1
│   │   │   ├── node1
│   │   │   │   ├── wal file
│   │   │   ├── node 2
│   │   │   │   ├── wal file
│   ├──  cluster name  2
│   │   ├── business name 2
│   │   │   ├── node1
│   │   │   │   ├── wal file
│   │   │   ├── node 2
│   │   │   │   ├── wal file

2 寫入流程

我們使用 Disruptor 框架來實現 WAL 的寫入,它的寫入流程如下:

1、將數據寫入到 Disruptor 中。

2、Disruptor 將數據寫入到 WAL 文件中。

3、將數據寫入到磁盤中。

4、返回寫入結果。

當用戶去進行 DDL 操作的時候(需要注意,所有的 DDL 操作在我們的實際存儲中都是追加,沒有真正的物理刪除),用戶會去構建對應的 WAL Data(包含操作指令,版本,序列化,namespce 等一系列信息,以及對應的序列化信息等等), 然後提交消息到 disruotor,這個過程是同步過程,當本次操作的數據進行消費,真正執行 append 命令後則返回,這個過程完成後會返回給用戶一個成功的消息,即意味着本次操作執行成功。

我們計劃會有一個異步定時線程,去執行 flush 以及歸檔操作,當我們的數據達到一定的閾值後,我們會將數據進行歸檔,歸檔的過程是將數據進行壓縮,然後寫入到 Storage-Data 中,然後將 WAL 中的數據刪除,同時創建新的 WAL 文件。 我們進行過基準測試,雙線程生產,單線程消費,當數據在 byte 級別的時候,單機可以達到上萬次操作,在 MB 級別的時候,大概是不到一百的操作,其中 byte 級別的數據內存設置爲 512M,MB 級別的數據內存設置爲 5G。但有待討論,目前社區認爲每次都是需要去進行 fsync 的,這個過程是需要消耗一定的時間的,所以我們需要在這個過程中進行優化。

3 WAL 文件數據結構

我們的存儲只有 K - V ,而 K 和 V 具體的類型並沒有限制,他們可能是 Long 類型,Map 類型,亦或者其它類型,但對於底層存儲來講,我們只需要將 K 和 V 序列化成字節數組,然後將字節數組寫入磁盤即可。

因此在最終存儲前數據都會被轉化成如下格式:

DataEntry{ 
    Object key; 
    String keyClass;  // 序列化的 class 名稱
    Object value;     
    String valueClass; // 序列化的 class 名稱
    Boolean deleted; // 所有的DDL操作最終都可以歸結爲刪除或者新增,因此我們只需要一個標識位即可。
    Long timestamp;  // 時間戳,用於數據的版本控制, 這要求分佈式系統的時間必須同步.或在可以接受的誤差範圍內。
}

而最終實際存儲後的數據是 DataEntry 的序列化後的字節數組。

因此實際存儲到磁盤的數據協議如下:

+--------------------------+---------------+
| Data Size  (12)          |      data     |
+--------------- ----------+---------------+
| record data length   | Data (byte[])     |
+--------------------------+---------------+

當需要從存儲中恢復的時候,我們會將存儲中的數據讀取出來,然後將其反序列化成 DataEntry 對象,然後將其放入內存中。進行數據的恢復。這個過程中會進行排序去重。

當進行全量查詢的時候,我們需要查詢所有 namespace 的數據,這個時候,我們需要將所有的數據進行歸檔,但如果採用上面的做法,我們只查詢各個節點所有的歸檔的 WAL 信息,然後進行查詢即可。這個過程中會進行計算,歸併操作,所有的數據都是有序的,我們需要根據數據順序決定所有數據的最終值,舉個例子:

insert into data(K,V) values(1,1)
insert into data(K,V) values(1,2)
delete from data where K=1
insert into data(K,V) values(1,3)
update data set V=4 where K=1

上面這些記錄一共會在存儲中有五條記錄,因此我們會根據版本來進行合併,在上面的語句中,如果我們查詢,則實際返回的記錄的是 K=1 V=4。

而對於一個文件的數據歸併來講,這種過程其實更爲簡單,因爲文件中的數據是有序的,因此我們只需要將文件中的數據讀取出來,倒序排序後進行以下判斷:

delete set = new HashSet<>();// 存儲當前已經刪除的Key
data set = new HashSet<>();// 存儲當前已經存在的Key
is in delete set?  // 如果在刪除集合中,那麼直接跳過 由於倒序,因此最先put的數據一定是最新的
    yes: end
    no: 
        is deleted? // 如果是刪除操作
            yes: 
                add to delete set if not exist
                end
            no: 
                is in data set? // 如果在數據集合中,那麼直接跳過 由於倒序,因此最先put的數據一定是最新的
                    yes: end
                    no: 
                        add to data set
                        end

優化和思考

q WAL 重寫

隨着寫入操作的執行,WAL 文件會變得越來越大。例如,如果將 key 爲 1 類型爲 Integer 的數據寫入 100 次,那麼 WAL 文件將包含 100 個相同的 key(類型爲 Integer) 爲 1 的數據。但實際有效數據只有一條。 這也對我們每次恢復數據時的性能有影響。

因此我們需要對 WAL 文件進行重寫,將重複的數據進行合併。這個過程中,我們會將 WAL 文件中的數據讀取出來,然後進行排序,然後進行去重,最後將去重後的數據寫入新的 WAL 文件中。

但這可能會有大量的內存佔用(取決於具體的數據集)。

一些建議的做法是,我們提供對應的 CLI 工具,用戶可以通過 CLI 工具來進行 WAL 文件的重寫。

這個過程不會影響到正在運行的服務,因此可以在業務低峯期進行。我們可以對已經歸檔的數據進行排序去重,最後生成臨時文件, 然後將臨時文件重命名爲原來的文件名,這樣就完成了重寫。

但這其中也有一些思考點:

分佈式重寫 vs 單機重寫

分佈式重寫的效率更高,更快。但我們需要考慮的是,如果有多個節點同時進行重寫,那麼會有多個節點同時寫入同一個文件,這樣會導致文件的內容不一致,因此我們需要對重寫的過程進行加鎖,這樣會導致重寫的效率降低。

單機重寫速率略慢,但卻可以保證在無競爭的狀態下去進行,因此可以保證重寫的正確性。

2 WAL 刷盤機制

WAL 文件的刷盤機制是一個比較重要的問題,如果 WAL 文件沒有及時刷盤,那麼在機器宕機的情況下,我們的數據就會丟失。但頻繁的刷盤會導致性能的下降。 因此我們需要對 WAL 文件的刷盤機制進行優化。

一般來說,我們會將 WAL 文件的刷盤機制分爲兩種:

採用何種方式取決於具體的業務場景,我們是否需要還有待於討論,但對我而言,針對不同的業務場景採用不同的方式纔是比較合理的。

Apache SeaTunnel(Incubating) 是一個分佈式、高性能、易擴展、用於海量數據(離線 & 實時)同步和轉化的數據集成平臺

倉庫地址:

https://github.com/apache/incubator-seatunnel

網址:

https://seatunnel.apache.org/

Proposal:

https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelPro

Apache SeaTunnel(Incubating)  下載地址:

https://seatunnel.apache.org/download

提交問題和建議:

https://github.com/apache/incubator-seatunnel/issues

貢獻代碼:

https://github.com/apache/incubator-seatunnel/pulls

訂閱社區開發郵件列表 :

dev-subscribe@seatunnel.apache.org

開發郵件列表:

dev@seatunnel.apache.org

加入 Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

關注 Twitter:

https://twitter.com/ASFSeaTunnel

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/eyW57iQOl42WFjfcWuj4YQ