Flink 和 Pulsar 的批流融合

**摘要:**StreamNative 聯合創始人翟佳在本次演講中介紹了下一代雲原生消息流平臺 Apache Pulsar,並講解如何通過 Apache Pulsar 原生的存儲計算分離的架構提供批流融合的基礎,以及 Apache Pulsar 如何與 Flink 結合,實現批流一體的計算。內容包括:

  1. Apache Pulsar 是什麼

  2. Pulsar 的數據視圖

  3. Pulsar 與 Flink 的批流融合

  4. Pulsar 現有能力和進展

 GitHub 地址 

https://github.com/apache/flink

歡迎大家給 Flink 點贊送 star~

Apache Pulsar 相對比較新,它於 2017 年加入 Apache 軟件基金會,2018 年才從 Apache 軟件基金會畢業併成爲一個頂級項目。Pulsar 由於原生採用了存儲計算分離的架構,並且有專門爲消息和流設計的存儲引擎 BookKeeper,結合 Pulsar 本身的企業級特性,得到了越來越多開發者的關注。

一、Apache Pulsar 是什麼

下圖是屬於消息領域的開源工具,從事消息或者基礎設施的開發者對這些一定不會陌生。雖然 Pulsar 在 2012 年開始開發,直到 2016 年纔開源,但它在跟大家見面之前已經在雅虎的線上運行了很長時間。這也是爲什麼它一開源就得到了很多開發者關注的原因,它已經是一個經過線上檢驗的系統。


Pulsar 跟其他消息系統最根本的不同在於兩個方面:

架構

下圖展示了 Pulsar 存儲計算分離的架構:

這個分層的架構對用戶的集羣擴展十分方便:

這個雲原生的架構有兩個主要特點:

從節點對等來說,Broker 層不存儲數據,所以很容易實現節點對等。但是 Pulsar 在底層的存儲也是節點對等狀態:在存儲層,BookKeeper 沒有采用 master/slave 這種主從同步的方式,而是通過 Quorum 的方式。

如果是要保持多個數據備份,用戶通過一個 broker 併發地寫三個存儲節點,每一份數據都是一個對等狀態,這樣在底層的節點也是一個對等的狀態,用戶要做底層節點的擴容和管理就會很容易。有這樣節點對等的基礎,會給用戶帶來很大的雲原生的便捷,方便用戶在每一層單獨擴容,也會提高用戶的線上系統的可用性和維護性。

同時,這種分層的架構爲我們在 Flink 做批流融合打好了基礎。因爲它原生分成了兩層,可以根據用戶的使用場景和批流的不同訪問模式,來提供兩套不同的 API。

存儲 BookKeeper

Pulsar 另一個優勢是有專門爲流和消息設計的存儲引擎 Apache BookKeeper。它是一個簡單的 write-ahead-log 抽象。Log 抽象和流的抽象類似,所有的數據都是源源不斷地從尾部直接追加。

它給用戶帶來的好處就是寫入模式比較簡單,可以帶來比較高的吞吐。在一致性方面,BookKeeper 結合了 PAXOS 和 ZooKeeper ZAB 這兩種協議。BookKeeper 暴露給大家的就是一個 log 抽象。你可以簡單認爲它的一致性很高,可以實現類似 Raft 的 log 層存儲。BookKeeper 的誕生是爲了服務我們在 HDFS naming node 的 HA,這種場景對一致性要求特別高。這也是爲什麼在很多關鍵性的場景裏,大家會選擇 Pulsar 和 BookKeeper 做存儲的原因。

BookKeeper 的設計中,有專門的讀寫隔離,簡單理解就是,讀和寫是發生在不同的磁盤。這樣的好處是在批流融合的場景可以減少與歷史數據讀取的相互干擾,很多時候用戶讀最新的實時數據時,不可避免會讀到歷史數據,如果有一個專門爲歷史數據而準備的單獨的磁盤,歷史數據和實時數據的讀寫不會有 IO 的爭搶,會對批流融合的 IO 服務帶來更好的體驗。

應用場景

Pulsar 場景應用廣泛。下面是 Pulsar 常見的幾種應用場景:

我們在 2020 年 11 月底的 Pulsar Summit 亞洲峯會,邀請 40 多位講師來分享他們的 Pulsar 落地案例。如果大家對 Pulsar 應用場景比較感興趣,可以關注 B 站上 StreamNative 的賬號,觀看相關視頻。

二、Pulsar 的數據視圖

在這些應用場景中,Unified Data Processing 尤爲重要。關於批流融合,很多國內用戶的第一反應是選擇 Flink。我們來看 Pulsar 和 Flink 結合有什麼樣的優勢?爲什麼用戶會選擇 Pulsar 和 Flink 做批流融合。


首先,我們先從 Pulsar 的數據視圖來展開。跟其他的消息系統一樣,Pulsar 也是以消息爲主體,以 Topic 爲中心。所有的數據都是 producer 交給 topic,然後 consumer 從 topic 訂閱消費消息。

Partition 分區

爲了方便擴展,Pulsar 在 topic 內部也有分區的概念,這跟很多消息系統都類似。上面提到 Pulsar 是一個分層的架構,它採用分區把 topic 暴露給用戶,但是在內部,實際上每一個分區又可以按照用戶指定的時間或者大小切成一個分片。一個 Topic 最開始創建的時候只有一個 active 分片,隨着用戶指定的時間到達以後,會再切一個新的分片。在新開一個分片的過程中,存儲層可以根據各個節點的容量,選擇容量最多的節點來存儲這個新的分片。

這樣的好處是,topic 的每一個分片都會均勻地散佈在存儲層的各個節點上,實現數據存儲的均衡。如果用戶願意,就可以用整個存儲集羣來存儲分區,不再被單個節點容量所限制。如下圖所示,該 Topic 有 4 個分區,每一個分區被拆成多個分片,用戶可以按照時間(比如 10 分鐘或者一個小時),也可以按照大小(比如 1G 或者 2G)切一個分片。分片本身有順序性,按照 ID 逐漸遞增,分片內部所有消息按照 ID 單調遞增,這樣很容易保證順序性。

Stream 流存儲

我們再從單個分片來看一下,在常見流(stream)數據處理的概念。用戶所有的數據都是從流的尾部不斷追加,跟流的概念相似,Pulsar 中 Topic 的新數據不斷的添加在 Topic 的最尾部。不同的是,Pulsar 的 Topic 抽象提供了一些優勢:

做基礎設施的同學,如果看到按照時間分片的架構,很容易想到把老的分片搬到二級存儲裏面去,在 Pulsar 裏也是這樣做的。用戶可以根據 topic 的消費熱度,設置把老的,或者超過時限或大小的數據自動搬到二級存儲中。用戶可以選擇使用 Google,微軟的 Azure 或者 AWS 來存儲老的分片,同時也支持 HDFS 存儲。

這樣的好處是:對最新的數據可以通過 BookKeeper 做快速返回,對於老的冷數據可以利用網絡存儲雲資源做一個無限的流存儲。這就是 Pulsar 可以支持無限流存儲的原因,也是批流融合的一個基礎。

總體來說,Pulsar 通過存儲計算分離,爲大家提供了實時數據和歷史數據兩套不同的訪問接口。用戶可以依據內部不同的分片位置,根據 metadata 來選擇使用哪種接口來訪問數據。同時根據分片機制可以把老的分片放到二級存儲中,這樣可以支撐無限的流存儲。

Pulsar 的統一體現在對分片元數據管理的方面。每個分片可以按照時間存放成不同的存儲介質或格式,但 Pulsar 通過對每個分片的 metadata 管理,來對外提供一個分區的邏輯概念。在訪問分區中的一個分片的時候我可以拿到它的元數據,知道它的在分區中的順序,數據的存放位置和保存類型 Pulsar 對每一個分片的 metadata 的管理,提供了統一的 topic 的抽象。

三、Pulsar 和 Flink 的批流融合

在 Flink 中,流是一個基礎的概念,Pulsar 可以作爲流的載體來存儲數據。如果用戶做一個批的計算,可以認爲它是一個有界的流。對 Pulsar 來說,這就是一個 Topic 有界範圍內的分片。


在圖中我們可以看到,topic 有很多的分片,如果確定了起止的時間,用戶就可以根據這個時間來確定要讀取的分片範圍。對實時的數據,對應的是一個連續的查詢或訪問。對 Pulsar 的場景來說就是不停的去消費 Topic 的尾部數據。這樣,Pulsar 的 Topic 的模型就可以和 Flink 流的概念很好的結合,Pulsar 可以作爲 Flink 流計算的載體。

對有界的流和無界的流,Pulsar 採取不同的響應模式:

簡單來說,Flink 提供了統一的視圖讓用戶可以用統一的 API 來處理 streaming 和歷史數據。以前,數據科學家可能需要編寫兩套應用分別用來處理實時數據和歷史數據,現在只需要一套模型就能夠解決這種問題。

Pulsar 主要提供一個數據的載體,通過基於分區分片的架構爲上面的計算層提供流的存儲載體。因爲 Pulsar 採用了分層分片的架構,它有針對流的最新數據訪問接口,也有針對批的對併發有更高要求的存儲層訪問接口。同時它提供無限的流存儲和統一的消費模型。

四、Pulsar 現有能力和進展

最後我們額外說一下 Pulsar 現在有怎樣的能力和最近的一些進展。

現有能力

■ schema

在大數據中,schema 是一個特別重要的抽象。在消息領域裏面也是一樣,在 Pulsar 中,如果 producer 和 consumer 可以通過 schema 來簽訂一套協議,那就不需要生產端和消費端的用戶再線下溝通數據的發送和接收的格式。在計算引擎中我們也需要同樣的支持。

在 Pulsar-Flink connector 中,我們借用 Flink schema 的 interface,對接 Pulsar 自帶的 Schema,Flink 能夠直接解析存儲在 Pulsar 數據的 schema。這個 schema 包括兩種:

同時我們結合 Flip-107,整合 Flink metadata schema 和 Avro 的 metadata,可以將兩種 Schema 結合在一起做更復雜的查詢。

■ source

有了這個 schema,用戶可以很容易地把它作爲一個 source,因爲它可以從 schema 的信息理解每個消息。

■ Pulsar Sink

我們也可以把在 Flink 中的計算結果返回給 Pulsar 把它做爲 Sink。

■ Streaming Tables

有了 Sink 和 Source 的支持,我們就可以把 Flink table 直接暴露給用戶。用戶可以很簡單的把 Pulsar 作爲 Flink 的一個 table,查找數據。

■write to straming tables

下圖展示如何把計算結果或數據寫到 Pulsar 的 Topic 中去。

■ Pulsar Catalog

Pulsar 自帶了很多企業流的特性。Pulsar 的 topic(e.g. persistent://tenant_name/namespace_name/topic_name)不是一個平鋪的概念,而是分很多級別。有 tenant 級別,還有 namespace 級別。這樣可以很容易得與 Flink 常用的 Catalog 概念結合。

如下圖所示,定義了一個 Pulsar Catalog,database 是 tn/ns,這是一個路徑表達,先是 tenant,然後是 namespace,最後再掛一個 topic。這樣就可以把 Pulsar 的 namespace 當作 Flink 的 Catalog,namespace 下面會有很多 topic,每個 topic 都可以是 Catalog 的 table。這就可以很容易地跟 Flink Cataglog 做很好的對應。在下圖中,上方的是 Catalog 的定義,下方則演示如何使用這個 Catalog。不過,這裏還需要進一步完善,後邊也有計劃做 partition 的支持。

■ FLIP-27

FLIP-27 是 Pulsar - Flink 批流融合的一個代表。前面介紹了 Pulsar 提供統一的視圖,管理所有 topic 的 metadata。在這個視圖中,根據 metadata 標記每個分片的信息,再依靠 FLIP-27 的 framework 達到批流融合的目的。FLIP-27 中有兩個概念:Splitter 和 reader。

它的工作原理是這樣的,首先會有一個 splitter 把數據源做切割,之後交給 reader 讀取數據。對 Pulsar 來說,splitter 處理的還是 Pulsar 的一個 topic。抓到 Pulsar topic 的 metadata 之後,根據每個分片的元數據來判斷這個分片存儲在什麼位置,再選最合適的 reader 進行訪問。Pulsar 提供統一的存儲層,Flink 根據 splitter 對每個分區的不同位置和格式的信息,選擇不同的 reader 讀取 Pulsar 中的數據。

■ Source 高併發

另一個和 Pulsar 消費模式緊密相關的是。很多 Flink 用戶面臨的問題是如何讓 Flink 更快地執行任務。例如,用戶給了 10 個併發度,它會有 10 個 job 併發,但假如一個 Kafka 的 topic 只有 5 個分區,由於每個分區只能被一個 job 消費,就會有 5 個 Flink job 是空閒的。如果想要加快消費的併發度,只能跟業務方協調多開幾個分區。這樣的話,從消費端到生產端和後邊的運維方都會覺得特別複雜。並且它很難做到實時的按需更新。

而 Pulsar 不僅支持 Kafka 這種每個分區只能被一個 active 的 consumer 消費的情況,也支持 Key-Shared 的模式,多個 consumer 可以共同對一個分區進行消費,同時保證每個 key 的消息只發給一個 consumer,這樣就保證了 consumer 的併發,又同時保證了消息的有序。

對前面的場景,我們在 Pulsar Flink 裏做了 Key-shared 消費模式的支持。同樣是 5 個分區,10 個併發 Flink job。但是我可以把 key 的範圍拆成 10 個。每一個 Flink 的子任務消費在 10 個 key 範圍中的一個。這樣從用戶消費端來說,就可以很好解耦分區的數量和 Flink 併發度之間的關係,也可以更好提供數據的併發。

■ 自動 Reader 選擇

另外一個方向是上文提到的 Pulsar 已經有統一的存儲基礎。我們可以在這個基礎上根據用戶不同的 segment metadata 選擇不同的 reader。目前,我們已經實現該功能。

近期工作

最近,我們也在做和 Flink 1.12 整合相關的工作。Pulsar-Flink 項目也在不停地做迭代,比如我們增加了對 Pulsar 2.7 中事務的支持,並且把端到端的 Exactly-Once 整合到 Pulsar Flink repo 中;另外的工作是如何讀取 Parquet 格式的二級存儲的列數據;以及使用 Pulsar 存儲層做 Flink 的 state 存儲等。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/reFTJ-eqNptkSILzck94uA