Apache Pulsar 在拉卡拉的技術實踐

關於 Apache Pulsar

Apache Pulsar 是 Apache 軟件基金會頂級項目,是下一代雲原生分佈式消息流平臺,集消息、存儲、輕量化函數式計算爲一體,採用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據複製,具有強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性。 
GitHub 地址:http://github.com/apache/pulsar/

項目背景介紹

拉卡拉支付成立於 2005 年,是國內領先的第三方支付企業,致力於整合信息科技,服務線下實體,從支付切入,全維度爲中小微商戶的經營賦能。2011 年成爲首批獲得《支付業務許可證》企業的一員,2019 年上半年服務商戶超過 2100 萬家。2019 年 4 月 25 日,登陸創業板。

功能需求

由於拉卡拉的項目組數量較多,各個項目在建設時,分別根據需要選擇了自己的消息系統。這就導致一方面很多系統的業務邏輯和具體的消息系統之間存在耦合,爲後續系統維護和升級帶來麻煩;另一方面業務團隊成員對消息系統的管理和使用水平存在差異,從而使得整體系統服務質量和性能不穩定;此外,同時維護多套系統,物理資源利用率和管理成本都比較高。

因此,我們計劃建設一套分佈式基礎消息平臺,同時爲各個團隊提供服務。該平臺需要具備以下特性:高可靠、低耦合、租戶隔離、易於水平擴展、易於運營維護、統一管理、按需申請使用,同時支持傳統的消息隊列和流式隊列。表 1 展示了這兩類服務應該具備的特性。

表 1. 對消息隊列和流式隊列的要求

爲什麼選擇 Apache Pulsar

大廠開源背書

現在可供用戶選擇的大廠開源消息平臺有很多,架構設計大多類似,比如 Kafka 和 RocketMQ 都採用存儲與計算一體的架構,只有 Pulsar 採用存儲與計算分離的多層架構。我們比較選型的消息系統有三個:Kafka、RocketMQ 和 Pulsar。測試之前,我們通過網上的公開數據,對三者的性能和功能進行了簡單的對比,表 2 爲對比結果。從中可以看出 Pulsar 更符合我們的需求。

表 2. Kafka、RocketMQ 和 Pulsar 性能、功能對比

Pulsar 的架構優勢

Pulsar 是雲原生的分佈式消息流平臺,源於 Yahoo!,支持 Yahoo! 應用,服務 140 萬個 topic,日處理超過 1000 億條消息。2016 年 Yahoo! 開源 Pulsar 並將其捐贈給 Apache 軟件基金會,2018 年 Pulsar 成爲 Apache 軟件基金會的頂級項目。

作爲一種高性能解決方案,Pulsar 具有以下特性:支持多租戶,通過多租戶可爲每個租戶單獨設置認證機制、存儲配額、隔離策略等;高吞吐、低延遲、高容錯;原生支持多集羣部署,集羣間支持無縫數據複製;高可擴展,能夠支撐上百萬個 topic;支持多語言客戶端,如 Java、Go、Python、C++ 等;支持多種消息訂閱模式(獨佔、共享、災備、Key_Shared)。

架構合理

Kafka 採用計算與存儲一體的架構,當 topic 數量較多時,Kafka 的存儲機制會導致緩存污染,降低性能。Pulsar 採用計算與存儲分離的架構(如圖 1)。無狀態計算層由一組接收和投遞消息的 broker 組成,broker 負責與業務系統進行通信,承擔協議轉換,序列化和反序列化、選主等功能。有狀態存儲層由一組 bookie 存儲節點組成,可以持久存儲消息。

圖 1. Pulsar 架構圖

Broker 架構

Broker 主要由四個模塊組成。我們可以根據實際需求對相應的功能進行二次開發。

•Dispatcher:調度分發模塊,承擔協議轉換、序列化反序列化等。•Load balancer:負載均衡模塊,對訪問流量進行控制管理。•Global replicator:跨集羣複製模塊,承擔異步的跨集羣消息同步功能。•Service discovery:服務發現模塊,爲每個 topic 選擇無狀態的主節點。

圖 2. Broker 架構圖

持久層(BookKeeper)架構

圖 3 爲 Pulsar 中持久層的架構圖。Bookie 是 BookKeeper 的存儲節點,提供獨立的存儲服務。ZooKeeper 爲元數據存儲系統,提供服務發現以及元數據管理服務。BookKeeper 架構屬於典型的 slave-slave 架構,所有 bookie 節點的角色都是 slave,負責持久化數據,每個節點的處理邏輯都相同;BookKeeper 客戶端爲 leader 角色,承擔協調工作,由於其本身無狀態,所以可以快速實現故障轉移。

圖 3. Pulsar 持久層架構圖隔離架構

隔離架構

保證了 Pulsar 的優良性能,主要體現在以下幾個方面:

•IO 隔離:寫入、追尾讀和追趕讀隔離。• 利用網絡流入帶寬和磁盤順序寫入的特性實現高吞吐寫:傳統磁盤在順序寫入時,帶寬很高,零散讀寫導致磁盤帶寬降低,採取順序寫入方式可以提升性能。• 利用網絡流出帶寬和多個磁盤共同提供的 IOPS 處理能力實現高吞吐讀:收到數據後,寫到性能較好的 SSD 盤裏,進行一級緩存,然後再使用異步線程,將數據寫入到傳統的 HDD 硬盤中,降低存儲成本。• 利用各級緩存機制實現低延遲投遞:生產者發送消息時,將消息寫入 broker 緩存中;實時消費時(追尾讀),首先從 broker 緩存中讀取數據,避免從持久層 bookie 中讀取,從而降低投遞延遲。讀取歷史消息(追趕讀)場景中,bookie 會將磁盤消息讀入 bookie 讀緩存中,從而避免每次都讀取磁盤數據,降低讀取延時。

圖 4. Pulsar 隔離架構圖

對比總結

左側爲 Kafka、RabbitMQ 等消息系統採用的架構設計,broker 節點同時負責計算與存儲,在某些場景中使用這種架構,可以實現高吞吐;但當 topic 數量增加時,緩存會受到污染,影響性能。

右側爲 Pulsar 的架構,Pulsar 對 broker 進行了拆分,增加了 BookKeeper 持久層,雖然這樣會增加系統的設計複雜性,但可以降低系統的耦合性,更易實現擴縮容、故障轉移等功能。表 3 總結了分區架構和分片架構的主要特性。

圖 5. 分區架構與分片架構對比圖

表 3. 分區架構與分片架構特性

基於對 Pulsar 的架構和功能特點,我們對 Pulsar 進行了測試。在操作系統層面使用 NetData 工具進行監控,使用不同大小的數據包和頻率進行壓測,測試的幾個重要指標是磁盤、網絡帶寬等的波動情況。

圖 6. Pulsar 測試過程

測試結論如下:

• 部署方式:混合部署優於分開部署。broker 和 bookie 可以部署在同一個節點上,也可以分開部署。節點數量較多時,分開部署較好;節點數量較少或對性能要求較高時,將二者部署在同一個節點上較好,可以節省網絡帶寬,降低延遲。• 負載大小:隨着測試負載的增大,tps 降低,吞吐量穩定。• 刷盤方式:異步刷盤優於同步刷盤。• 壓縮算法:壓縮算法推薦使用 LZ4 方式。我們分別測試了 Pulsar 自帶的幾種壓縮方式,使用 LZ4 壓縮算法時,CPU 使用率最低。使用壓縮算法可以降低網絡帶寬使用率,壓縮比率爲 82%。• 分區數量:如果單 topic 未達到單節點物理資源上限,建議使用單分區;由於 Pulsar 存儲未與分區耦合,可以根據業務發展情況,隨時調整分區數量。• 主題數量:壓測過程中,增加 topic 數量,性能不受影響。• 資源約束:如果網絡帶寬爲千兆,網絡會成爲性能瓶頸,網絡 IO 可以達到 880 MB/s;在網絡帶寬爲萬兆時,磁盤會成爲瓶頸,磁盤 IO 使用率爲 85% 左右。• 內存與線程:如果使用物理主機,需注意內存與線程數目的比例。默認配置參數爲 IO 線程數等於 CPU 核數的 2 倍。這種情況下,實體機核數爲 48 核,如果內存設置得較小,比較容易出現 OOM 的問題。

除了上述測試以外,我們還複測了 Jack Vanlightly(RabbitMQ 的測試工程師)的破壞性測試用例,得到如下結論:

  1. 所有測試場景中,沒有出現消息丟失與消息亂序;2. 開啓消息去重的場景中,沒有出現消息重複。

支持團隊專業

另外,我們與 Apache Pulsar 項目的核心開發人員交流溝通時間較早,他們在 Yahoo! 和推特有過豐富的實踐經驗,預備成立公司在全世界範圍內推廣使用 Pulsar,並且會將中國作爲最重要的基地,這爲我們的使用提供了強有力的保障。現在大家也都知道,他們成立了 StreamNative 公司,並且已獲得多輪融資,隊伍也在不斷壯大。

Pulsar 在基礎消息平臺的實踐

我們基於 Pulsar 構建的基礎消息平臺架構如下圖,圖中綠色部分爲基於 Pulsar 實現的功能或開發的組件。本節將結合實際使用場景,詳細介紹我們如何在實際使用場景中應用 Pulsar 及基於 Pulsar 開發的組件。

場景 1:流式隊列

1. OGG For Pulsar 適配器

源數據存儲在 Oracle 中,我們希望實時抓取 Oracle 的變更數據,進行實時計算、數據分析、提供給下游業務系統查詢等場景。

我們使用 Oracle 的 OGG(Oracle Golden Gate) 工具進行實時抓取,它包含兩個模塊:源端 OGG 和目標 OGG。由於 OGG 官方沒有提供 Sink 到 Pulsar 的組件,我們根據需要開發了 OGG For Pulsar 組件。下圖爲數據處理過程圖,OGG 會抓取到表中每條記錄的增刪改操作,並且把每次操作作爲一條消息推送給 OGG For Pulsar 組件。OGG For Pulsar 組件會調用 Pulsar 客戶端的 producer 接口,進行消息投遞。投遞過程中,需要嚴格保證消息順序。我們使用數據庫表的主鍵作爲消息的 key,數據量大時,可以根據 key 對 topic 進行分區,將相同的 key 投遞到同一分區,從而保證對數據庫表中主鍵相同的記錄所進行的增刪改操作有序。

圖 8. OGG For Pulsar 組件示意圖

2. Pulsar To TiDB 組件

我們通過 Pulsar To TiDB 組件將抓取到的變更消息存儲到 TiDB 中,對下游系統提供查詢服務。這一組件的處理邏輯爲:

  1. 使用災備訂閱方式,消費 Pulsar 消息。2. 根據消息的 key 進行哈希運算,將相同的 key 散列到同一持久化線程中。3. 啓用 Pulsar 的消息去重功能,避免消息重複投遞。假設 MessageID2 重複投遞,那麼數據一致性將被破壞。

圖 9. Pulsar To TiDB 組件使用流程圖

3. Pulsar 的消息持久化過程分析

Pulsar 的消息持久化過程包括以下四步:

1.OGG For Pulsar 組件調用 Pulsar 客戶端的 producer 接口,投遞消息。2.Pulsar 客戶端根據配置文件中的 broker 地址列表,獲取其中一個 broker 的地址,然後發送 topic 歸屬查詢服務,獲取服務該 topic 的 broker 地址(下圖示例中爲 broker2)。3.Pulsar 客戶端將消息投遞給 Broker2。4.Broker2 調用 BookKeeper 的客戶端做持久化存儲,存儲策略包括本次存儲可選擇的 bookie 總數、副本數、成功存儲確認回覆數。

圖 10. Pulsar 的消息持久化示意圖

4. 數據庫表結構動態傳遞

OGG 使用 AVRO 方式進行序列化操作時,如果將多個表投遞到同一個 topic 中,AVRO Schema 爲二級結構:wrapper schema 和 table schema。wrapper schema 結構始終不變,包含 table_name、schema_fingerprint、payload 三部分信息;OGG 在抓取數據時,會感知數據庫表結構的變化並通知給 OGG For Pulsar,即表結構決定其 table schema,再由 table schema 生成對應的 schema_fingerprint。

我們將獲取到的 table schema 發送並存儲在指定的 Schema topic 中。Data topic 中的消息只包含 schema_fingerprint 信息,這樣可以降低序列化後消息包的大小。Pulsar To TiDB 啓動時,從 Schema topic 消費數據,使用 schema_fingerprint 爲 Key 將 table schema 緩存在內存中。反序列化 Data Topic 中的消息時,從緩存中根據 schema_fingerprint 提取 table schema,對 payload 進行反序列化操作。

圖 11. 表結構管理流程圖

5. 一致性保障

要保證消息有序和去重,需要從 broker、producer、consumer 三方面進行設置。

Broker

• 在 namespace 級別開啓去重功能:bin/pulsar-admin namespaces set-deduplication namespace --enable• 修復 / 優化 Pulsar 客戶端死鎖問題。2.7.1 版本已修復,詳細信息可參考 PR 9552[1]。

Producer

•pulsar.producer.batchingEnabled=false

在 producer 設置中,關閉批量發送。如果開啓批量發送消息,則消息可能會亂序。

•pulsar.producer.blocklfQueueFull=true

爲了提高效率,我們採用異步發送消息,需要開啓阻塞隊列處理,否則可能會出現消息丟失。

調用異步發送超時,發送至異常 topic。如果在異步超時重發消息時,出現消息重複,可以通過開啓自動去重功能進行處理;其它情況下出現的消息發送超時,需要單獨處理,我們將這些消息存儲在異常 topic 中,後續通過對賬程序從源庫直接獲取終態數據。

Consumer

實現攔截器:ConsumerInterceptorlmpl implements ConsumerInterceptor 配置確認超時:pulsarClient.ackTimeout(3000, TimeUnit.MILLISECONDS).ackTimeoutTickTime(500, TimeUnit.MILLISECONDS) 使用累積確認:consumer.acknowledgeCumulative(sendMessageID)

備註:配置確認超時參數,如果沒有在 ackTimeout 時間內進行消費確認的話,消息將重新投遞。爲了嚴格保證一致性,我們需要使用累計確認方式進行確認。

6. 消息消費的確認方式

假如在 MessageID 爲 1 的消息已確認消費成功,開始採用累積確認方式,此時正在確認 MessageID 爲 3 的消息,則已消費但未確認的 MessageID 爲 2 的消息也會被確認成功。假如在 “確認超時” 時間內一直未收到確認,則會按照原順序重新投遞 MessageID 爲 2、3、4、5 的消息。

圖 12. 消息確認流程圖(1)

假如採用單條確認方式,圖中 MessageID 爲 1、3、4 的消息確認消費成功,而 MessageID 爲 2 的消息 “確認超時”。在這種情況下,如果應用程序處理不當,未按照消費順序逐條確認,則出現消息“確認超時” 時,只有發生超時的消息(即 MessageID 爲 2 的消息)會被重新投遞,導致消費順序發生錯亂。

圖 12. 消息確認流程圖(2)

總結:隊列消費模式建議使用單條確認方式,流式消費模式建議使用累積確認方式。

7. 消息確認超時(客戶端)檢測機制

確認超時機制中有兩個參數,超時時間和輪詢間隔。超時檢測機制通過一個雙向隊列 + 多個 HashSet 實現。HashSet 的個數爲(超時時間)除以(輪詢間隔)後取整,因此每次輪詢處理一個 HashSet,從而有效規避全局鎖帶來的性能損耗。

圖 13. 消息確認超時(客戶端)檢測機制示意圖

場景 2:消息隊列:OpenMessaging 協議實現(透明層協議)

我們過去使用的很多業務系統都和消息系統強耦合,導致後續升級和維護很麻煩,因此我們決定使用 OpenMessaging 協議作爲中間層進行解耦。

  1. 通過 Pulsar 實現 OpenMessaging 協議。2. 開發框架(基於 spring boot)調用 OpenMessaging 協議接口,發送和接收消息。

圖 14. 透明層協議流程圖

場景 3:流式隊列:自定義 Kafka 0.8-Source(Source 開發)

Pulsar IO 可以輕鬆對接到各種數據平臺。我們的部分業務系統使用的是 Kafka 0.8,官方沒有提供對應的 Source,因此我們根據 Pulsar IO 的接口定義,開發了 Kafka 0.8 Source 組件。

圖 15. Kafka 0.8 Source 組件示意圖

場景 4:流式隊列:Function 消息過濾(消息過濾)

我們通過 Pulsar Functions 把 Pulsar IDC 集羣消息中的敏感字段(比如身份證號,手機號)脫敏後實時同步到雲集羣中,供雲上應用消費。

圖 16. Pulsar Functions 消息過濾示意圖

商戶經營分析場景中,Flink 通過 Pulsar Flink Connector 連接到 Pulsar,對流水數據根據不同維度,進行實時計算,並且將計算結果再通過 Pulsar 持久化到 TiDB 中。從目前的使用情況來看,Pulsar Flink Connector 的性能和穩定性均表現良好。

圖 17. Pulsar Flink Connector 流式計算示意圖

場景 6:流式隊列:TiDB CDC 適配(TiDB 適配)

我們需要基於 TiDB 數據變更進行實時抓取,但 TiDB CDC For Pulsar 序列化方式不支持 AVRO 方式,因此我們針對這一使用場景進行了定製化開發,即先封裝從 TiDB 發出的數據,再投遞到 Pulsar 中。TiDB CDC For Pulsar 組件的開發語言爲 Go 語言。

圖 18. TiDB CDC For Pulsar 組件示意圖

未來規劃

我們基於 Pulsar 構建的基礎消息平臺有效提高了物理資源的使用效率;使用一套消息平臺簡化了系統維護和升級等操作,整體服務質量也得以提升。我們對 Pulsar 的未來使用規劃主要包括以下兩點:

  1. 陸續下線其它消息系統,最終全部接入到 Pulsar 基礎消息平臺;2. 深度使用 Pulsar 的資源隔離和流控機制。

在實踐過程中,藉助 Pulsar 諸多原生特性和基於 Pulsar 開發的組件,新消息平臺完美實現了我們預期的功能需求。

作者簡介 
姜殿璟,基礎架構部架構師,負責基礎消息平臺及其生態的建設與運營,團隊合作將 Apache Pulsar 引入到拉卡拉核心架構中,並且在強一致性的流式消費場景和隊列消費場景中取得了成功實踐,目前主要負責 Pulsar 性能調優、新功能開發及 Pulsar 生態集成。

引用鏈接

[1] PR 9552: https://github.com/apache/pulsar/pull/9552

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/aTWfEPGkhX4_bhlQQuzf-g