ClickHouse 在唯品會 OLAP 系統的實踐

供稿:王新春、王玉、王康、徐其民

01

OLAP 在唯品會演進迭代

1.1 Presto/Kylin 在唯品會的使用

Presto 作爲當前唯品會 OLAP 主力軍,經歷了數次架構和使用方式演進。當前階段,我們 Presto 共有物理機 500 多臺,服務於 20 多個線上業務,日均查詢高峯可達 500 萬次,每天讀取和處理接近 3PB 數據。

在業務應用上 Presto 從最初只有魔方、自助分析兩個業務使用,發展到現在接入 20 個業務,基於業務使用實踐,每個階段我們對 Presto 都有相應改進。

1.1.1 集羣統一接入管控

定製化 Presto 管理工具 Spider/Nebula(新版),做到多集羣路由,集羣 HA,負載均衡,查詢回溯,全鏈路監控等。

我們定製了 Presto Server 和 Client。用自研管理工具 Spider/Nebula 從 Presto 暴露的 API 和系統表裏獲取到節點和查詢信息,一方面將查詢落入 mysql,通過 etl-job 落入 hive 便於存儲和分析; 一方面根據集羣查詢數和節點信息來給該集羣打分。用戶可以事先在 Spider 裏給 Presto 集羣劃分 group,同一個 group 的集羣 coordinator 信息被我們統一保存到 zk 中。這樣用戶查詢 presto 時,在本地調用定製 presto-client 或者 presto-jdbc 來連接 presto 集羣之際,就會通過 zk 獲取當前業務所屬 group 中打分最低的集羣 (負載最低) 進行連接查詢。當有集羣處於維護狀態或者連不通時,路由會自動感知摘除這個節點的 coordinator 信息,這樣查詢將不會打到這個集羣上,同步保障了 Presto 的 HA。

1.1.2 Presto 容器化

Presto 上雲接入 K8S,可以智能擴縮容 Presto 集羣,做到資源合理調度、智能部署等功能。標準配置每個 Presto 的 Worker 40vCPU/110G 內存,每個集羣 100 個 Worker 節點。

500 多臺物理機分批改造,讓用戶業務無感,隨之帶來的提升也很明顯。

★後臺 presto-k8s 集羣是配置完全相同、算力相同的集羣。用戶的業務只需要在 client 裏配置一個虛擬 IP,我們就會使用路由功能爲他分配一到 N 個集羣。不同業務允許交叉和隔離。這種操作完全是動態的,不需要重啓集羣,算力也是均衡的。

★在查詢比較集中,大促、流量比較大時,我們可以快速合併集羣,動態刪除部分集羣,讓其他集羣快速擴容 worker。用戶在使用過程中是無感知的。

★部署 Presto on k8s 變得十分便捷。我們只需要在 k8s 管理平臺上點擊頁面,填入集羣名稱,幾分鐘內就可以拉起一個標準化的 Presto 集羣,域名跟集羣名有規則對應。這樣刪除集羣、新增集羣代價非常小。

★由於網絡、內存、反親和性部署使得整個 Presto-k8s 集羣處於相對均衡穩定狀態,集羣穩定性得到了大大的提升。我們觀察了物理機的 CPU、內存等指標,機器變得飽和且穩定。

★安裝包和配置分離、k8s 自動部署模式,使得所有集羣的升級變得簡單、快速、易操作。

自此,唯品會 Presto 走上了集羣全面容器化的階段。

1.2 Clickhouse 的引入

隨着業務對於 OLAP 要求越來越高,部分業務場景 Presto 和 Kylin 無法滿足其需求。比如百億 JOIN 百億(Local Join)的低延遲實時數據場景和對中等 qps 的查詢平均響應時間要求低於 1 秒的 OLAP 使用場景等,我們把目光轉向” 大家都說快 “的 Clickhouse。

1.3 ClickHouse 在業務的部署架構和模式

我們在使用中發現 Clickhouse 有如下優勢:

★大寬表查詢性能優異,其主要分析都是大寬表的 sql 聚合。ClickHouse 整個聚合耗時都非常小,性能好,並且具有量級提升。

★單表性能分析以及分區對其的 join 計算都能取得很好的性能優勢。比如百億數量級 join 幾十億數量級的大表關聯大表的場景,在 24C 128G * 10 shard (2 副本) 通過優化取得了 10s 左右的查詢性能。

目前我們支持的業務主要是實驗平臺、agamotto 監控、OLAP 查詢日誌。

02

實時數據入 ClickHouse

2.1 Flink 寫 ClickHouse

2.1.1 Flink 寫入分佈式表

最初我們使用分佈式表寫入是爲了快速驗證一些功能和性能,比如分佈式表在建表時支持 sharding_key 和數據寫入支持各種策略,分佈式表的建表示例:

Distributed(logs, default, hits[, sharding_key[, policy_name]])

爲了實現 join 時完全 local join,我們在寫入分佈式表時,sharding_key 就是 join 的 column,policy_name 設定爲 murmurHash3_64(sharding_key),實現起來相對比較簡單。爲了支持寫入 HA ,我們配置連接的 URL 有多個,如果某個 host 出現連接,會重連另外一個 host。下面附上一些實踐小經驗:

★ck 適合小併發 大批次寫入,否則會報錯:典型的 merge 速度跟不上寫入;

★本地表 url 填寫的地址只需要一個(實際會根據這個地址查詢集羣分片信息,根據 hash 策略做哈希),分佈式表可以一個或多個;

★如果是本地表寫入推薦基於字段的一致性哈希, 可以相對做到數據均衡,如果是分佈式表寫入推薦至少 2 個節點的分佈式表寫入 。

整體寫入架構如下圖所示:

2.1.2  Flink 寫入本地表

Flink 寫入分佈式表能完成功能邏輯,但在性能和可靠性上還是略微有差異:

★由於數據是由 ClickHouse 節點做二次分發,會佔用寫入分佈式表節點比較多的內存;

★集羣節點異常後,導致分佈式表無法準確把數據分發到新節點。

基於以上問題,我們在 Flink 的 ClickHouse Connector 端做了相應改造,支持寫入本地表的相關功能。主要流程如下:

★根據庫名和表名查 system.tables 獲取表的 engine 信息 (SELECT engine_full FROM system.tables WHERE database = ? AND name = ?)

★解析 engine 信息,獲取集羣名、本地表名;

★根據集羣名,查 system.clusters 獲取集羣分片節點信息 (SELECT shard_num, host_address FROM system.clusters WHERE cluster = ?),得到 TreeMap<shard_num, shard host list > 這樣的配置信息

★根據 shard 配置信息,初始化 List, ClickHouseShardExecutor 內部初始化時會隨機 shard group 裏的 url 來連接;負責將 flink 內部的數據結構 RowData 添加到 batch buffer 裏,並根據 sink.buffer-flush.max-rows buffer 大小和 sink.buffer-flush.interval flush 間隔來觸發 flush,將一批數據真正的 sink 到 ClickHouse Server 端。

★當 shard 內,個別節點負載比較高或查詢有熱點時,會導致 batch flush 失敗,這個時候需要做異常時重連操作。

究竟某條數據過來 sink 到哪個 shard,我們定義了 RowData 到 ClickHouseShardExecutor 的分區接口,並實現了 3 種分區策略 round-robin 輪訓 、random 隨機分區、field-hash 基於字段的一致性哈希等策略,通過 sink.partition-column 參數指定分區字段,保證相同分區字段哈希到同 shard 內。整體架構如下圖所示:

Flink 數據寫入的時序圖可以參考如下所示:

03

實驗平臺數據自助分析

3.1 實驗平臺簡要介紹

唯品會實驗平臺是通過配置多維度分析和下鑽分析,提供海量數據的 A/B-test 實驗效果分析的一體化平臺。一個實驗是由一股流量(比如用戶請求)和在這股流量上進行的相對對比實驗的修改組成。實驗平臺對於海量數據查詢有着低延遲、低響應、超大規模數據 (百億級) 的需求。

3.2 Flink+ClickHouse 整體架構

3.2.1 FLINK SQL + CK 在實驗平臺業務場景

我們實現了 flink sql redis connector,支持 redis 的 sink 、source 維表關聯等操作,可以很方便的讀寫 redis,實現維表關聯,維表關聯內可配置 cache ,極大提高應用的 TPS。通過 FLINK SQL 實現實時數據流的 pipeline, 最終將大寬表 sink 到 CK 裏,並按照某個字段粒度做 murmurHash3_64 存儲,保證相同用戶的數據都存在同一 shard 節點組內。

3.2.2 ClickHouse 百億級數據 join 的解決方案

在實際應用場景中,我們發現一些流量的特定場景。我們需要拿一天的用戶流量點擊情況,來 join A/B TEST 的日誌,用以匹配實驗和人羣的關係。這就給我們帶來了很大挑戰,兩張大分佈式表 join 出來的性能也非常不理想。

分桶 join 字段

在這種情況下,我們用了類似於分桶概念。首先把左表和右表 join 的字段,建表時用 hash 來落到不同的機器節點,murmurHash3_64(mid)。 

如果寫入分佈式表,在建表時指定 murmurHash3_64 字段,如果是寫本地表,在 flink 寫入段路由策略里加入 murmurHash3_64 策略即可。

在查詢時,使用分佈式表 join 本地表,來達到想要的效果。

這樣分桶後 JOIN 的結果,是等於分佈式表 JOIN 分佈式表,且處理的數據量,只有總數據量 /(集羣機器數 / 副本數)。

在寫 SQL 的時候,我們還遇到一個坑,即在左表 JOIN 右表的過程中,如果左表是子查詢,則分佈式規則不生效,查詢出的結果也遠遠小於預期值,等於本地表 JOIN 本地表。右表是子查詢則沒有關係,因爲右表本來就是本地表,對分佈式沒有要求。

如圖所示:

3.3. 增量數據更新場景

數據去重方案比較

訂單類數據需要像寫入 kudu 一樣,做去重,由於流量數據都實時寫入數據,爲了訂單數據和流量數據做 join, 就需要對訂單數據做去重,由於訂單數據是有生命週期的,從產生之後,會不停的 update ,下面討論基於 CK 各種 MergeTree engine 的去重方案優缺點。對於實驗平臺的場景,需要選用一種方案,既能夠實時去重,又要保證查詢歷史數據的結果要固定下來,又不能影響歸因準確率,不能忽大忽小,避免對用戶產生困惑。

去重方案總結:a.ReplacingMergeTree 數據無法 merge, 忽大忽小,不能用。b.ReplicatedReplacingMergeTree 可以做去重,對 hash 字段不變化的情況下適合。c. remote 表 查詢複雜,對性能有影響,存在副本的可靠性問題。d.flink 方案規避去重 和 hash 字段變化的問題。

3.4 . Flink 寫入端遇到的問題及優化

問題 1:Too many parts (328). Merges are processing significantly slower than inserts.

原因:剛開始使用 clickhouse 的時候都有遇到過該異常,出現異常的原因是因爲 MergeTree 的 merge 的速度跟不上目錄生成的速度, 數據目錄越來越多就會拋出這個異常, 所以一般情況下遇到這個異常,降低一下插入頻次就可以。

解決: 1. 服務端參數調整,將 parts_to_throw_insert 參數調大值 10000,默認值 300 對我們大流量的應用場景來說較小。

  1. 客戶端參數調整,調小 ck sink 並行度  小併發 大批次寫入。小併發,我們通過修改執行計劃調整 flink 算子並行度,如下圖 kafka source 的並行度 = 72 ck sink 並行度 = 24  大批次,例如調整 connector 參數'sink.buffer-flush.max-rows' = '200000','sink.buffer-flush.interval' = '60s'    20W 條記錄或 60s 產生一個 batch 批量寫入 CK

  1. 分區時間字段 event_time 選擇單調遞增的時間,流量數據選擇 nginx 日誌時間 time_local,曝光數據不適合選擇忽大忽小的 activity_starttime , 訂單數據選擇 add_time 創建時間。

問題 2:Unexpected NULL value of not Nullable type Int64

原因:ck 建表時如果去掉 nullable 限制,插入時,就必須給一個確定的值,否則會 flush 時執行失敗,影響 flink sql  job 穩定性。

解決:1. ck 建表時每個字段加上默認值,或建表時加上 Nullable 約束(不建議,但是主要這種約束不要太多,主要會佔用過多的存儲空間,也會降低了查詢效率)。

2.flink sql 在處理數據時,加上 coalesce 空值處理函數。

04

ClickHouse 查詢優化

4.1.schema 定義優化

 1CREATE TABLE goods_click_app_h5_ck_hm on cluster ck_cluster (
 2`goods_id` Int64 default -9999,
 3`app_version` String default '-9999',
 4....
 5`dt` Date,
 6`exp_page_id` Int32 default -9999
 7)
 8ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/goods_click_app_h5_ck_hm', '{replica}')
 9PARTITION BY (dt, exp_page_id)
10ORDER BY (activity_type)
11TTL dt + INTERVAL 32 DAY
12SETTINGS index_granularity = 8192
13

① 選擇有副本的 merge 引擎

② 按 dt 作爲分區,分區內的 part 文件進行異步合併

③ 按照字段 order by 排序,提升查詢性能

④ 設置 TTL 過期時間

⑤ index_granularity 設置索引粒度爲 8192 行一個查找單元

4.2. 常用參數調整

05

物化視圖

5.1. 物化視圖對常用維度組合進行加速

使用 ReplicatedSummingMergeTree 引擎, 相同的數據長度集合,物化視圖和明細表查詢效率對比(相差將近 10-100 倍)。

物化視圖的創建

 1CREATE MATERIALIZED VIEW vip_sirius.multi_data_1000445_491_local ON cluster ck_cluster
 2( date Date ,
 3  `timestamp` UInt32 ,
 4  `network_fail_total` Nullable(Int64) ,
 5  `isp` String default '-null' ,
 6  `pv_total` Nullable(Int64),
 7  `service_success_total` Nullable(Int64),
 8  `response_t_total` Nullable(Float64),
 9  `response_t_count_total` Nullable(Int64),
10  `business_success_total` Nullable(Int64),
11  `time_cnt_100` Nullable(Int64),
12  `time_cnt_200` Nullable(Int64),
13  `time_cnt_500` Nullable(Int64),
14  `time_cnt_5000` Nullable(Int64),
15  `time_cnt_more_5000` Nullable(Int64)
16)
17ENGINE =ReplicatedSummingMergeTree('/clickhouse/tables/{layer}-{shard}/multi_data_1000445_491_local', '{replica}')
18PARTITION BY date
19ORDER BY (timestamp,isp)
20TTL date + toIntervalDay(14)
21SETTINGS index_granularity = 8192,storage_policy = 'hotdata'
22AS select
23    CAST(timestamp AS Date) AS date,timestamp ,
24    sum(network_fail_total) AS network_fail_total,
25    `isp` ,
26    sum(pv_total) AS pv_total,
27    sum(service_success_total) AS service_success_total,
28    sum(response_t_total) AS response_t_total,
29    sum(response_t_count_total) AS response_t_count_total,
30    sum(business_success_total) AS business_success_total,
31    sum(time_cnt_100) AS time_cnt_100,
32    sum(time_cnt_200) AS time_cnt_200,
33    sum(time_cnt_500) AS time_cnt_500,
34    sum(time_cnt_5000) AS time_cnt_5000,
35    sum(time_cnt_more_5000) AS time_cnt_more_5000
36FROM vip_sirius.multi_data_1000445_local
37GROUP BY timestamp ,isp
38Distributed table:
39CREATE TABLE vip_sirius.multi_data_1000445_491
40ON cluster ck_cluster AS vip_sirius.multi_data_1000445_491_local
41ENGINE = Distributed('ck_cluster','vip_sirius','multi_data_1000445_491_local',rand())
42

查詢明細表

語句:

select sum(pv_total),toDateTime(timestamp) from multi_data_1000445 where date>='2021-02-10' and date<='2021-03-03' group by timestamp order by timestamp desc ;

查詢物化視圖

語句:

select sum(pv_total),toDateTime(timestamp) from multi_data_1000445_491 where date>='2021-02-10' and date<='2021-03-03' group by timestamp order by timestamp desc ;

5.2 物化視圖的問題

我們在使用物化視圖的過程中,也遇到一些問題。比如:

1、物化視圖維度比較多的時候,生成的結果表也會指數級增加。我們這些表最多的一個庫,有着 1500 多張計算各種維度的物化視圖,且無法進行表級合併。這樣在管理、監控表的時候,帶來一些麻煩。

2、物化視圖維度增多的時候,寫入數據將會帶來不小的消耗,在 CPU,內存等層面都會有更多的消耗,這樣在分配集羣和角色資源的時候,會擾亂原有的分配計劃。

總而言之,物化視圖是一把雙刃劍。在帶來速度加速效果明顯的同時,也會帶來資源、管理上的一些弊端,用戶使用的時候要把握好這些優缺點。

06

展望

6.1 ClickHouse 和 Spark/Presto 融合

HyperLogLog 是大數據分析常用的去重計算分析方法,在我們之前的應用中已經打通了 Spark,Presto 的 HyperLogLog 對象,即在一種引擎裏生產的 HyperLogLog 對象,在其他引擎均可以解析、計算與分析。

未來我們會打通 Clickhouse 的 HyperLogLog 的數據對象,將 C++ 和 JAVA 做統一序列化和反序列化。

最終達到在 hadoop 中通過 Spark 和 Presto 等引擎 ETL 出的 HyperLogLog,導入 Clickhouse 也可以直接用 Clickhouse 的語法查出。DWS/ADS 層可以共享數據,使得 ClickHouse 在 ADS 層數據可以加速。

6.2 業務使用

後續我們有 push 效果分析,廣告投入效果等應用場景,陸陸續續接入的 Clickhouse。

我們也在探索使用 RoaringBitmap 來進行字段長度不一的  user_id,push_id 等各種 ID 的精確去重、留存分析等。

RoaringBitmap:http://roaringbitmap.org/

6.3 ClickHouse 底層架構迭代演進

隨着業務的推進和發展,我們之後會通過以下幾個方向,繼續優化 Clickhouse 在唯品會的推進和使用。

6.3.1 存算分離

我們都知道 Clickhouse 是自帶本地化存儲的 OLAP 引擎,本地化存儲在海量數據請求的情況下,會有 I/O 速度受限,擴容複雜 (需修改 clickhouse 的存儲策略),不能按需自動擴縮容,Clickhouse 不好上 AI 雲平臺等諸多限制。

所以我們有計劃將這部分存儲打到雲上,實現存算分離,可以做到用網絡的傳輸的高速率打破本地 I/O 的讀寫瓶頸,按照需求自動擴縮容雲端存儲,將 Clickhouse 上到我們自己的 AI 雲平臺便於管理。

我們將會從修改存儲策略接口代碼,多種雲存儲或者分佈式存儲來對數據進行分類,不同熱度、容量的數據對應不同的存儲策略。

6.3.2 寫入優化

目前我們寫入主要是寫入分佈式表,將來會考慮測試優化寫入分佈式表的性能和 hash 功能, 來支持更高的寫入 tps。

6.3.3 接入管控

目前我們屬於 Clickhouse 業務推廣階段,對 Clickhouse 使用方管控較少,也沒做過多的存儲、計算、查詢角色等方面的管控。數據安全乃大數據重中之重,我們將在接下來的工作中逐步完善這一塊。

6.3.4 SQL 管控

在 Clickhouse 的新版中,已經加入了 RBAC 的訪問控制管理,官方也推薦使用這種方式。

參考:https://clickhouse.tech/docs/en/operations/access-rights(點擊閱讀原文)

我們將會:

1、用 default 創建一個 root 賬戶,作爲管理者賬戶。

2、所有授權的操作通過 root 賬戶 GRANT 完成。

3、禁用 default 用戶的管理功能。

6.3.5 資源管控

在資源層面,我們會結合存算分離,給不同的業務分配不同的用戶,不同的用戶在雲平臺上申請的存儲資源有限。且會對每個用戶的存儲進行價值計算。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Y5uFqjOUptG6D8AloXRtAQ