B 站增量數據湖探索與實踐

本期作者

周暉棟

嗶哩嗶哩資深開發工程師

目前主要負責 B 站實時團隊增量數倉、Hudi 數據湖方向。

黃靖

嗶哩嗶哩資深開發工程師

專注於實時計算相關大數據技術,目前負責 Hudi 數據湖在 B 站的建設和應用。

陳世治

嗶哩嗶哩資深開發工程師

負責 B 站實時湖倉一體架構的建設,專注於 Flink 與 Hudi 生態結合的實時數據湖實踐與落地。

1. 背景

衆所周知,越實時的數據越有價值。直播、推薦、審覈等領域中有越來越多的場景需要近實時的數據來進行數據分析。我們在探索和實踐增量數據湖的過程中遇到許多痛點,如時效性、數據集成同步和批流一體的存儲介質不統一的問題。本文將介紹我們針對這些痛點所進行的思考與實踐方案。

1.1 時效性痛點

傳統數倉以小時 / 天級分區,數據完整纔可查。然而,一些用戶並不需要數據完整,只需要最近的數據做一些趨勢分析。因此,現狀無法滿足用戶越來越強的數據時效性需求。傳統數倉 ETL 上一個任務完成後,才能開始下一個任務。即使是小時分區,層級處理越多,數據最終產出時效性越低。

1.2 數據集成同步痛點

業界成熟的方案是通過阿里巴巴 datax 系統同步 mysql 從庫數據到 hive 表。定期全量或者增量進行同步。需要單獨設置從庫以應對數據同步時對 db 請求的壓力。此外,db 從庫成本高,不可忽視。

增量同步面臨如何解決歷史分區數據修改問題。如果一條數據被更新了,那麼僅通過增量同步,可能會在兩個分區裏分別存在更新前和更新後的數據。用戶需要自行合併更新數據後,才能使用。

1.3 批流一體的存儲介質不統一

業務下游即有時效性要求場景,也有離線 ETL 場景。Flink sql 可以統一流批計算過程,但無統一存儲,仍需要將實時、離線數據分開存儲在 kafka、hdfs。

2. 思考與方案

增量化數據湖建設選型採用 Flink + Hudi。我們需要數據湖的 ACID 事務保障、流批讀寫操作的支持。並且,相對於 Iceberg 的設計,Hudi 對 Upsert 的支持設計之初的主要支持方案在 upsert 能力、小文件合併能力上有明顯優勢。Append 性能在版本迭代中逐漸完善。活躍的社區在陸續迭代增量消費、流式消費能力。綜合對比,最終選擇基於 Hudi 搭建增量數據湖生態。

3. HUDI 內核優化

Hudi cow 模式每次寫入都需要進行合併,有 io 放大問題。Hudi 0.8 起支持了 mor 模式,僅更新部分需要更新的數據文件。

但這會帶來數據質量問題,主要是一些極端情況下的數據丟失、數據重複、數據延遲問題。我們在實際測試生產過程中發現了一些問題,嘗試解決並反饋給社區。

3.1 底層數據可靠性優化

Hudi compaction 大概代碼結構

圖片

StreamWriteOperatorCoordinator 算子中的 notifyCheckpointComplete 邏輯:

CompactionPlanOperator 算子中的 notifyCheckpointComplete 邏輯:

CompactFuncation

3.1.1 log 跳過壞塊 bug,造成數據丟失

圖片

hudi log 文件由一個個 block 塊組成,一個 log 中可能包含多個 deltacommit 寫入的塊信息,每個塊通過 MAGIC 進行分割,每個塊包含的內容如上圖所示。其中 Block Header 會記錄每個塊的 deltacommit  instantTime,在 compact 的過程中會掃描讀取需要合併的塊。HoodieLogFileReader 是用來讀取 log 文件,會將 log 文件轉化爲一個個塊對象。首先會讀取 MAGIC 分隔,如果存在會讀下一位置塊的總大小,判斷是否超過當前文件總大小來界定是否爲壞塊,檢測到壞塊會跳過壞塊內容並創建壞塊對象 HoodieCorruptBlock,跳過的壞塊是從當前 block total size 位置以後檢索 MAGIC(#HUDI#),每次讀取 1 兆大小,一直讀取到下一個 MAGIC 分割爲止,找到 MAGIC 後跳到當前 MAGIC 位置,後面可以接着在讀完整的塊信息。

上述提到壞塊的位置是從 block total size 位置以後檢索的,當文件在極端情況下只寫入 MAGIC 內容還沒有寫入內容任務會出現異常,此時會寫入連續 MAGIC,在讀取的過程中會把 MAGIC 當做 Block Total size 讀取,在檢索的過程中會將下一個正常的塊給跳過,當成壞塊返回。在合併的過程中這部分塊數據就丟失了。這裏跳過的壞塊位置應該從 MAGIC 之後開始檢索而不是 block total size 之後,讀取時出現連續 MAGIC 就不會跳過正常塊。

相關 pr:https://github.com/apache/hudi/pull/4015

3.1.2 compaction 和回滾的 log 合併,導致數據重複

在 HoodieMergedLogRecordScanner 會掃描有效的塊信息,極端情況下 log 文件寫入的塊寫入是完整的,但是 deltacommit 元數據提交前任務失敗,並且 log 中的塊信息也比 deltacommit 元數據的 instant Time 還小,此時會被認爲是有效的塊被合併。因爲 deltacommit 元數據沒有寫入成功,check point 重新啓動將之前的數據進行回放。回放時被分配的 flieId 可能不同,當這部分數據被合併會發現數據重複並且 fileId 不一樣現象。

在掃描 log 是否爲效塊時,如果當前時間線比現有沒歸檔的時間線的元數據小的時候,加入已經歸檔的時間線,進行協同判斷是否爲有效塊,不再只將未歸檔的時間線作爲篩選條件。

相關 pr:https://github.com/apache/hudi/pull/5052

3.1.3 數據非連續場景,最後一次數據不會觸發 compaction,導致 RO 表數據延遲

上述流程中調度 compaction 執行計劃必須在上一次事務提交成功後纔會觸發,如果一段數據都沒有數據寫入,compact 即使滿足時間條件或者 commit 個數條件都不會形成 compact 執行計劃。上游沒有執行計劃下游 Compaction 算子不會觸發合併,用戶在查詢 ro 表,部分數據會一直查詢不到。

去掉必須 commitInstant 提交成功才調度生成 compact 執行計劃的綁定,每次 checkpoint 後都檢查是否滿足條件觸發並生成 Compaction 執行計劃,避免最新數據無法被合併。當沒有新數據寫入的場景下元數據只有 deltacommit.requested 和 deltacommit.inflight 元數據不能直接用當前時間爲 compact instantTime。上游可能隨時寫入數據避免合併沒有寫完的數據,在生成 compact 執行計劃也會檢查元數據的 deltacommit 和 compact 元數據避免出現合併沒寫完的數據。此時 compact instantTime 可以爲最近沒完成的 deltacommit instantTime 和最新完成 compact 之間的時間。這樣生成 compaction 執行計劃元數據下游完成 compact 合併就不會延遲了。

相關 pr:https://github.com/apache/hudi/pull/3928

3.1.4 CompactionPlanOperator 每次只取最後一次 compaction,導致數據延遲

在 StreamWriteOperatorCoordinator 算子 notifyCheckpointComplete 方法中生產 compact 執行計劃,在 CompactionPlanOperator 算子 notifyCheckpointComplete 方法中消費執行計劃下發操作合併數據。在極端情況下如果設置 compact 策略爲一個 commit 就觸發 compact 合併操作,這樣在兩個算子 notifyCheckpointComplete 中會不斷的生產和消費 compact 執行計劃,一旦消費一端的 compact 出現異常任務失敗,這樣會堆積很多的 compact.requested 執行計劃,而每次 CompactionPlanOperator 只會獲取一個執行計劃元數據,這樣數據會產生堆積和延遲,總有一部分執行計劃無法執行。在 0.8 版本取最後一次的 compaction 執行計劃,這樣會新的 commit 一直在合併,老的數據一直無法合併造成丟失數據的假像。後續社區改爲獲取最新的一次,如果下游出現某種原因的失敗導致 compact 執行計劃擠壓,數據延遲會越來越大。

CompactionPlanOperator 獲取所有的 compact 執行計劃轉化爲 CompactionPlanEvent 下發下游,將 CompactFunction 方法改爲默認同步模式。異步模式中底層使用 newSingleThreadExecutor 線程池避免在同步的過程隊列持有大量對象。

相關 pr:https://issues.apache.org/jira/browse/HUDI-3618

3.1.5 log 中沒有符合時間線塊,parquet 文件重新生成,之前記錄丟失

圖片

compact 執行計劃包含多個 HoodieCompactionOperation,每個 HoodieCompactionOperation 包含 log 文件和 parquet 文件,但可能也只有 logFlies 說明只有新增數據。在做 compact 合併時會獲取 HoodieCompactionOperation 中的 log 文件和 parquet 文件信息,構建 HoodieMergedLogRecordScanner 將 log 文件中符合合併要求的塊數據刷入 ExternalSpillableMap 中,在 merge 階段根據 parquet 數據和 ExternalSpillableMap 的數據比較合併形成新的 parquet 文件,新文件的 instantTime 爲 compact 的 instantTime。

在 HoodieMergedLogRecordScanner 中掃描 log file 文件中,需要符合時間線要求的 log 塊信息,如果沒符合要求的塊被掃描到,後續的 merge 操作不會運行,新的 parquet 文件版本也沒有。下一次 compact 的執行計劃獲取的 FileSlice 只會有 log 文件而沒 parquet 文件,在執行 compact runMerge 會當新增操作寫入,沒有和之前合併 parquet 數據合併之前的數據全部丟失,新 parquet 文件中只有下一次 log 產生的數據,導致數據丟失。

不論掃描是否符合要求,塊信息都強制寫入新的 parquet 文件,這樣下一次 compact 合併執行計劃中獲取 FileSlice 都會有 log 文件和 parquet 文件,可以正常進行 handleUpdate 合併,保證數據不丟失。

3.2 Table Service 優化

3.2.1 獨立的 table service  -- compaction 外掛

背景:

目前社區提供了多種表服務方案,但實際生產應用中,尤其是平臺化過程中,會面臨多種問題。

**原方案分析: **

爲了對比各方案的特點,我們基於 hudi v0.9 提供的表服務進行分析。

首先,我們將表服務拆解爲調度 + 執行 2 階段的過程。

根據調度後是否立刻執行,可以將調度分爲 inline 調度 (即調度後立馬執行) 與 async 調度 (即調度後只生成 plan,不立馬執行) 兩種,

根據執行調用的方式,可分爲 sync 執行 (同步執行)、async 執行 (通過相應 service 異步執行,如 AsyncCleanService) 兩種,

此外,相對於寫入 job,表服務作爲一種數據編排 job,本質上是區別於寫入 job 的,根據這些服務是否內嵌在寫入 job 中,我們將其稱爲內嵌模式和 standalone 模式兩種模式。

以下,對社區提供的 Flink on Hudi 的多種表服務方案進行分析:

方案一 內嵌同步模式:在 ingestion 作業中,inline schedule + sync compact/cluster & inline schedule clean + async clean

圖片

該模式的問題很明顯:每次寫入後,立即通過內聯調度並執行 compact 作業,完成後纔開始新的 instant,在流程上即直接影響數據寫入的性能,在實際生產中不會採用。

方案二  內嵌異步模式:在 ingestion 作業中, async schedule + sync/async compact/cluster && inline schedule + sync/async clean

圖片

不同於方案一,該模式將資源消耗較大的 compact/cluster 等操作異步化至專門算子處理,ingestion 流程僅保留了輕量的調度操作,對 clean 操作增加了同步 / 異步選擇。

但依舊存在缺點,ingestion 作業的流式處理,疊加上表服務的間歇性批處理,對資源消耗曲線新增造成很多衝激毛刺,甚至是很多作業 oom 的元兇,使得作業配置時不得不預留足夠多資源,造成高優先資源閒時浪費。

方案三  standalone 模式:ingestion 作業 + compact/cluster 作業組合

目前社區該方案尚不完善,其中寫入作業流程參考方案一,對 compact/cluster/clean 等 action 提供單獨的編排作業,以 compact 爲例,HoodieFlinkCompactor 的流程如下:

圖片

該方案通過抽象出單獨的數據編排作業,從作業級別隔離使用,克服了方案二的弊端,從平臺化的角度看,符合我們的需求。

選定方向後,就需要面對該方案目前的諸多缺陷,包括如下幾點:

  1. 在單 writer 模型下,編排作業的 schedule 模式不可用,會有的 timeline 一致性問題,導致數據丟失。

相對於 ingestion 作業,compact/cluster job 本質上是另一個 writer,多 writer 處於併發下,在無鎖狀態下 timeline 一致性是無法保證的,極端情況下會出現丟數據的問題,如下圖所示:

圖片

  1. 寫入作業和編排作業沒有 standalone 模式下的協同能力。

首先,是 clean action 的問題,雖然 hudi 內核能力已經健全,但目前表服務層面僅暴露出 inline 調度 + 執行的方法,導致無論寫入還是編排作業都會包含 clean,架構上過於混亂與不可控;

其次,編排作業 CompactPlanSource 與內嵌模式寫入作業的 CompactPlan 是兩個不同算子,dag 未保持線性,不利於不同模式的切換;

此外,還存在作業編排調度不具備接收外部策略的能力,無法進行平臺化,集成公司智能調度、專家診斷等系統等問題。

優化方案:

首先,解決 timeline 一致性問題,目前 hudi 社區已經有 occ(樂觀併發控制) 運行模式的支持,引入了分佈式鎖 (hive metastore、zookeeper)。但 flink 模塊的相關支持尚在初始階段中,我們內部也在進行相應應用測試,但發現距生產應用尚有諸多問題需要解決。

由於調度操作本身較爲輕量,本期暫時把表服務 scheduler 保留在寫入作業中,仍舊保持以單 writer 模型運行,以規避多 writer 問題。

其次,針對性優化 hudi 底層的表服務調度機制,將 clean action 也拆解爲調度 + 執行的的使用範式,通過 inlineScheduleEnabled 配置,默認爲 true 進行後向兼容,在 standalone 模式下,inlineScheduleEnabled 爲 false。

然後,重構寫入作業與編排作業,完善對 3 種運行模式的支持。具體包括:

對寫入作業的表服務 scheduler 優化,提供 DynamicConfigProvider 支持外部策略集成;重寫 Clean 算子,支持多種運行模式的切換;

對編排作業,重構作業使作業 dag 與內嵌模式線性一致;支持單 instant,全批以及 service 常駐模式;優化寫入與編排作業間的配置傳遞,使其達到託管任務的要求任意啓停;

重構後的寫入與編排作業(以 compact 爲例)如下:

圖片

3.2.2 metaStore 解決分區 ready 問題

增量化數倉,需要支持近實時業務場景分鐘級數據可見性,需要在寫入數據時就創建 hiveMetaStore 分區信息。

而離線依賴建 hiveMetaStore 分區即數據完整的語義。如何解決是一個問題。

圖片

任務調度,依賴調度系統。在數據 ready 後,通知調度系統,可以進行下游任務調度。與建分區討論關係不大。

圖片

離線依賴 hiveMetaStore 問題,我們通過改造 hiveMetaStore ,賦予分區一個新的 commit 屬性,若數據未 ready 則 commit 爲 false,分區不可見。保持原有語義不變。

對於 adhoc 來說,帶 hint includeUnCommit=true 標識查詢,可在數據未完成時查詢到數據。

對於離線來說,當分區的 commit 屬性被置爲 true,才能查到分區。滿足分區可見即數據完整的語義。

圖片

對於 flink job 來說,在數據第一次寫入時,創建分區,並賦予分區 commit=false 標籤,使得 adhoc 可以查到最新寫入的數據。

在處理完分區數據,判斷分區數據 ready 後,更新分區 commit=true。此時,數據 ready,分區對離線可見,滿足 “分區可見即數據完整” 的語義。

4. 場景落地實踐

4.1 增量化數倉

傳統數倉 TL 上一個任務完成後,才能開始下一個任務。即使是小時分區,層級處理越多,數據最終產出時效性越低。

圖片

採用增量計算方式,每次計算讀取上一次增量。這樣當上遊數據完整後,只需要額外計算最後一次增量的時間即可完成,可以提升數據完成時效性。

同時在第一批數據寫入到 ods 層後就可增量計算至下一層直至產出,數據即可見,大大提升數據可見時效性。

具體實現方式是:通過 hudi source,flink 增量消費 hudi 數據。支持數倉跨層增量計算,如 ods → dwd → ads 都使用 hudi 串聯。支持同其他數據源做 join、groupby,最終產出繼續落 hudi。

圖片

對於審覈數據等有較高時效性訴求,可以採用此方案加速數據產出,提升數據可見時效性。

4.2 CDC 到 HUDI

4.2.1 面臨的問題

  1. 原有 datax 同步方式 簡單來說就是 select * ,對 mysql 來說是慢查詢,有阻塞業務庫風險,所以需要單獨開闢 mysql 從庫滿足入倉需求,有較高的 mysql 從庫機器成本,是降本增效的對象之一。

  2. 原有同步方式不能滿足日益增加的時效性需求,僅能支持天 / 小時同步,無法支持到分鐘級數據可見粒度。

  3. 原有同步方式落 hive 表,不具備 update 能力,如果一條記錄經過 update,則可能在兩個以 mtime 爲時間分區都存在此數據,業務使用還需要做去重,使用成本較高。

4.2.2 解決思路

通過 flink cdc 消費 mysql 增量 + 全量數據,分 chunk 進行 select,無需單獨爲入倉開闢 mysql 從庫。

落 hudi 支持 update、delete,相當於 hudi 表是 mysql 表的鏡像。

同時支持分鐘級可見,滿足業務時效性訴求。

4.2.3 整體架構

圖片

一個 db 庫用一個 flink cdc job 進行 mysql 數據同步,一張表的數據分流到一個 kafka topic 中,由一個 flink job 消費一個 kafka topic,落到 hudi 表中。

4.2.4 數據質量保障 - 不丟不重

flink cdc source

簡單來說就是:全量 + 增量 通過 changlog stream 方式將數據變更傳遞給下游。

全量階段:分 chunk 讀取, select  * + binlog 修正,無鎖的將全量數據讀出並傳遞給下游。

增量階段:僞裝成爲一個從庫,讀取 binlog 數據傳遞給下游。

圖片

flink

通過 flink checkpoint 機制,將處理完成的數據位點記錄到 checkpoint 中,如果後續發生異常則從 checkpoint 可保證數據不丟不重。

kafka

kafka client 開啓 ack = all ,當所有副本都接收到數據後,才 ack,保證數據不丟。

kafka server 保證 replicas 大於 1,避免髒選舉。

這裏不會開啓 kafka 事務(成本較高),保證 at least once 即可。由下游 hudi 做去重。

hudi sink

hudi sink 同樣基於 flink checkpoint 實現類似二階段提交方式,實現數據寫 hudi 表不丟失。

通過增加由 flink cdc 生成的單調遞增的 “版本字段” 進行比較,單條記錄版本高的寫入,低的捨棄。同時解決去重和亂序消費問題。

4.2.5 字段變更

mysql 業務庫進行了字段變更、新增字段怎麼辦?

面臨的問題

  1. 數據平臺新增字段有安全准入問題,需要用戶確認,是否需要加密入倉。

  2. 字段類型變更,需要用戶確認下游任務是否兼容。

  3. hudi 的 column evalution 能力有限,比如 int 轉 string 類型就無法支持。

解決思路

與 dba 約定,**部分變更支持自動審批(如新增字段、int 類型轉 long 等)**通過。並且異步通知 berserker(b 站大數據平臺系統),由 berserker 變更①hudi 表信息,以及②更新 flink job 信息。

超出約定變更部分(如 int 轉 varchar 等),走人工審批,需要 berserker 確認① hudi 表變更完成、②寫入 hudi 的 flink job 變更完成後,再放行 mysql ddl 變更工單。

我們改造了 Flink cdc job,可以感知 mysql 字段變更,向下遊 kafka 發送變更後數據,不受審批約束,將變更後的數據暫存在 kafka topic 中,此 kafka 對用戶不可見。下游寫入 hudi 任務不變更照常消費,不寫入新增字段,用戶確認數據可入倉後,再從 kafka 回放數據,補充寫入新增字段。

方案

圖片

mysql 字段類型和 hudi 類型存在不對應情況。flink job 消費 kafka 定義字段類型和 hudi 表定義字段不對應,需要 berserker 在拼 flink sql 時候,額外拼入轉換的邏輯。

Flink cdc sql 自動感知字段變更改造

flink cdc 原生 sql 是需要定義 mysql 表的字段信息的,那麼當 mysql 出現字段變更時,是必然無法做到自動感知,並傳遞變更後數據給下游的。

原生 flink cdc source 會對所有監聽到的數據在反序列化時根據 sql ddl 定義做 column 轉換和解析,以 row 的形式傳給下游。

我們在 cdc-source 中新增了一種的 format 方式:changelog bytes 序列化方式。該 format 在將數據反序列化時在不再進行 column 轉換和解析,而是將所有 column 直接轉換爲 changelog-json 二進制傳輸,外層將該二進制數據直接封裝成 row 再傳給下游。對下游透明,下游 hudi 在消費 kafka 數據的時候可以直接通過 changelog-json 反序列化進行數據解析。

並且由於該改動減少了一次 column 的轉換和解析工作,通過實際測試下來發現除自動感知 schema 變更外還能提升 1 倍的吞吐。

4.3 實時 DQC

dqc kafka 監控存在幾個痛點:

架構

圖片

將 kafka 數據 dump 到 hudi 表後,提供 dqc 數據校驗。不影響生產秒級 / 亞秒級數據時效,又可以解決以上痛點。

hudi 提供分鐘級的監控,可以滿足實時 dqc 監控訴求。時間過短,可能反而會因爲數據波動產生誤告警。

hudi 以 hive 表的形式呈現,使得實時 dqc 可以和離線 dqc 邏輯一致,可以很容易的進行同環比告警,易於開發維護。

實時 DQC on Hudi 使得實時鏈路數據變得更易觀測。

4.4 實時物化

背景

有些業務方需要對實時產出的數據進行一個秒級的聚合查詢。

如實時看板需求,需要一分鐘一個數據點來展示 DAU 曲線,等多指標聚合查詢場景。

同時結果數據要寫入 update 存儲,實時更新。

難點

在較大數據規模下,基於明細產出幾十上百個聚合計算結果,要求秒級返回,幾乎不可能。

目前公司內支持 update 類型的存儲主要是 redis/mysql,計算結果導入意味着數據出倉,脫離了 hdfs 存儲體系,同時也要使用對應的 client 進行查詢,開發成本較高。

現有的 hdfs 體系內計算加速方案如物化、預計算大都是基於離線場景,對實時數據提供物化查詢能力較弱。

目標

支持 hdfs 體系內的 update 存儲。讓數據無需出倉導入外部存儲,可以直接使用 olap 引擎高效查詢。

通過 sql 就可以簡單定義實時物化表。查詢時通過 sql 解析,命中物化表查詢則可秒級返回多個聚合查詢結果。

方案

基於 flink + hudi 提供實時物化的能力。

通過 sql 自定義物化邏輯到基於 hudi 的物化表。將明細數據寫入明細 hudi 表中,並拉起一個 flink job 進行實時聚合計算,將計算結果 upsert 到物化的 hudi 表中。

在查詢時通過 sql 解析,如果規則命中物化表,則查詢物化表中的數據,從而達到加速查詢的效果。

圖片

5. 未來展望

5.1 HUDI 內核能力增強及穩定性優化

Hudi timeline 支持 樂觀鎖解決併發衝突,支持多流同時寫一張表。從底層支持新增數據和回補數據同時寫入 hudi。

支持更豐富的 schema evalution。避免重新建表、重新導入數據的繁重操作。

Hudi meta server,統一實時表離線表,支持 instance 版本等信息。支持 flink sql 上使用 time travel,滿足取數據快照等訴求

Hudi manager 根據不同的表按需調配 compaction、clustering、clean。用於離線 ETL 的表,低峯期進行 compaction,資源上削峯填谷。對於近線分析的表,積極 compaction 以及 clustering,減少查詢攝取文件數,提升查詢速度。

5.2 切換弱實時場景從 Kafka 到 HUDI

在弱實時場景上實現流批統一存儲。Kafka 對於突發流量以及拉取歷史數據達到性能瓶頸時,難易緊急擴容分攤讀寫負載。可以將分鐘級的弱實時使用場景,從 Kafka 切換到 HUDI,利用 HUDI 可讀取增量數據的能力,滿足業務需求,並且 HUDI 基於分佈式文件系統可快速擴容副本的能力,滿足緊急擴容的需求。


嗶哩嗶哩技術 提供 B 站相關技術的介紹和講解

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/pjZAjgHF-HdZNjr7LfY9JA