ClickHouse 雲原生使用手冊

背景

爲適應越來越大的實時 OLAP 需求,我們引入了 ClickHouse。在探索 ClickHouse 和雲原生結合的路上,我們逐漸總結出一套 ClickHouse 的使用方案,該方案有以下特點:

ClickHouse 使用 C++ 語言編寫,引入向量化執行引擎,在存儲和計算方面進行了大量的優化,是目前業內性能最高的 OLAP 引擎。查詢分析類計算比 MySQL 快 100 倍以上,更詳細的性能對比可查看 ClickHouse 官方的 benchmark

ClickHouse 支持冷熱分離的存儲策略,我們將熱數據存儲在 EBS 上,將大部分冷數據存儲在 S3 上,獲得了不限量的低成本存儲

ClickHouse 支持計算資源的水平擴展,我們將 ClickHouse 部署在可以伸縮的 Nomad Clients 上,並通過使用 S3 獲得了不限量的存儲空間

ClickHouse 的 ReplicatedMergeTree 引擎支持通過副本表的形式將數據冗餘地存儲在多個節點上,來提高可用性,同時我們引入了 Coodinator Node 來保證各個節點上計算任務均衡,保證我們的集羣面對併發的場景仍然有比較可靠的可用性

方案

本方案由兩組 ClickHouse 進程組成,分別爲負責協調和調度讀寫操作的 Coodinator Node,與負責數據讀寫的 Data Node 組成,命名的方式參考了 ElasticSearch

(ClickHouse 雲原生部署架構圖)

此架構有如下幾個特點:

1. 存在一組可水平擴展的 Data Node,用來執行數據的讀、寫操作

2. 存在一組可水平擴展的 Coodinator Node,扮演着代理的角色,所有通過 Data Node 進行的數據操作,包括建表(DDL)、對數據的讀寫都需要通過 Coodinator Node 進行

3. Coodinator Node 和 Data Node 都依賴同一組 ZooKeeper 服務進行協調

4. 因爲 ClickHouse 支持通過 TCP 進行連接的 Client,所以我們通過 NLB 對 Coodinator Node 的連接進行負載均衡

5. Data Node 既包含 EBS 的存儲也包含 S3 的存儲,通過冷熱分離的功能,把比較久遠的數據放在 S3 上以縮減成本

接下來我們會詳細介紹我們使用的集羣模式,各個節點的作用以及使用的配置項

Data Node

Data Node 是集羣中負責數據存儲的節點,會存儲數據表等元數據,也負責執行數據的讀、寫以及 JOIN 等操作。集羣中的所有 Data Node 可以按照 cluster 和 shard 分成若干組。

(cluster、shard 和 data node 的關係)

我們通過 ReplicatedMergeTree 這一 Engine 在同一個 shard 的不同 Data Node 之間同步數據(具體原理下文會詳細描述)以保證數據和元數據是冗餘存儲的,不會因爲單點問題而丟失。

Data Node 中包含集羣的配置,用於執行分佈式 SQL 語句,Data Node 間的數據複製不依賴此配置

remote_servers:
  clickhouse_dev_data:
    shard: 
      - replica:
          host: 127.0.0.1
          port: 9000
      - replica:
          host: 127.0.0.1
          port: 9000

Data Node 間的數據複製主要依賴各個 Data Node 在 Zookeeper 中註冊的 hostname 進行通信。hostname 可通過 interserver_http_host 配置。如果不配置使用和 hostname -f 一致的名稱。

Coordinator Node

爲了保證同一個 shard 中的多個 Data Node 上的計算任務是均衡的,我們引入了 Coordinator Node 作爲一層代理。在 Coodinator Node 上建立分佈式表,即可通過 Coodinator Node 去查詢 Data Node 所管理的數據,Coodinator Node 對於查詢或者寫入的 Client 來說是 Server,對 Data Node 來說是 Client。

長期來看,分佈式表的元數據會保存在 Coodinator Node 的磁盤上,因此這一層代理是有狀態的,只有正確的建立分佈式表才能滿足查詢的要求。同時,元數據所佔用的磁盤空間也比較小,所以 Coodinator Node 需要的 EBS Volumn 的 size 也會比較小。

Coodinator Node 和 Data Node 之間也需要鑑權,因此 Coodinator Node 的配置文件中需要填寫 Data Node 的鑑權信息。當發生讀 / 寫操作時,Coodinator Node 會直連 Data Node 進行操作,具體流程下文會詳細描述。

remote_servers:
  clickhouse_data:
    shard: 
      # 需要配置用戶名,密碼來連接 Data Node
      - replica:
          host: 127.0.0.1
          port: 9000
          user: default
          password: xxx
      - replica:
          host: 127.0.0.2
          port: 9000
          user: default
          password: xxx
  clickhouse_coordinator:
    shard: 
      - replica:
          host: 127.0.0.3
          port: 9000
       - replica:
          host: 127.0.0.4
          port: 9000

通用配置

集羣內的節點間同步 DDL 語句,依賴 ZooKeeper,相關配置如下:

# ZooKeeper 連接信息
zookeeper:
  - node:
      host: zookeeper.service.consul
      port: 2181

# 監聽 ZooKeeper 的此目錄獲取分佈式 DDL 的信息
distributed_ddl:
  path: /clickhouse/task_queue/ddl

存儲

在 ClickHouse 中,我們根據不同類型表引擎來選擇不同的存儲介質。日常中我們主要使用 MergeTree 系列引擎以及 S3 引擎,下面我們主要介紹這 2 種引擎。

MergeTree 引擎

MergeTree 系列引擎主要負責 ClickHouse 的數據存儲和查詢,是 ClickHouse 最核心的表引擎,該引擎支持我們使用 EBS 和 S3 共同作爲 ClickHouse 的存儲介質。在實際使用場景中我們通常按數據的存儲週期採用冷熱分離的策略,將熱數據存儲在 EBS 上,將冷數據存儲在 S3 上。這樣既能夠保證熱數據的查詢和寫入性能,又能降低冷數據的存儲成本。

當我們同時使用 EBS + S3 作爲存儲介質時的配置如下:

# EBS 存儲路徑
path: /data/clickhouse

storage_configuration:
  disks:
    # S3 訪問相關信息
    s3:
      type: s3
      endpoint: https://test-dev-clickhouse.s3.cn-northwest-1.amazonaws.com.cn/clickhouse/
      # 使用機器級別的 IAM Role
      use_environment_credentials: true
      region: cn-northwest-1
  policies:
    # 數據從 EBS 移動到 S3 的策略
    move_from_ebs_to_s3:
      volumes:
        main:
          disk: default
        external:
          disk: s3
      move_factor: 0.2

在上述配置文件中,我們定義了一個名爲 move_from_ebs_to_s3 的存儲策略,並在存儲策略中添加 2 種不同類型的 volume。其中名爲 main 的 volume 將數據存儲在 disk 爲 default 塊存儲上(即本地 EBS 中),名爲 external 的 volume 則將數據存儲在 disk 爲 s3 的對象存儲上。

當往 ClickHouse 寫入數據時,ClickHouse 會優先寫入 volumes 列表中的第 1 個 volume(本例子中是 main)當本 volume 的 可用存儲空間低於閾值時,則本 volume 中的數據以 part 爲單位移動到下一個 volume 中。

volume 的詳細配置如下:

S3 引擎

S3 引擎主要用於查詢 AWS S3 上已存在的數據,屬於 ClickHouse 擴展功能。當我們使用 S3 引擎時,我們需要在配置文件中做如下聲明:

s3:
  test-dev-analytics:
    endpoint: https://test-dev-analytics.s3.ap-east-1.amazonaws.com/
    use_environment_credentials: true

在上面配置中,我們聲明瞭一個名爲 shiyou-labs-aws-cur-test 的 S3 endpoint 點並配置 endpoint 訪問信息。我們聲明 use_environment_credentials 爲 true 來讓 ClickHouse 通過機器級別的 IAM Role 的方式來訪問 S3。該方式需要 S3 服務爲提供 ClickHouse 服務的 EC2 所關聯的 IAM Role(後面將該 IAM Role 稱爲 ClickHouse IAM Role) 授予訪問權限。

建表

分佈式 DDL

默認情況下,我們在節點上執行 CREATE、ALTER、DROP、RENAME 及 TRUNCATE 這些 DDL 語句時,該語句僅僅只能在本地節點生效。並不能在整個集羣所有節點上生效,如果要想在整個集羣的所有節點上生效,則默認需要分別登錄每個節點上面手動執行 CREATE 等這些 DDL 語句。這顯然是繁瑣也容易出錯的。

ClickHouse 爲我們提供了分佈式 DDL 功能。該功能可以允許我們在一臺節點上使用 on cluster {cluster} 語法執行 DDL 語句,從而 ClickHouse 幫我們自動分發到集羣中的所有節點上執行,省去了需要依次去單個節點執行 DDL 的煩惱。

分佈式 DDL 語句的執行依賴 Zookeeper 做協同,相關配置如下:

zookeeper:
  - node:
      host: zookeeper.service.consul
      port: 2181 

distributed_ddl:
  path: /clickhouse/task_queue/ddl

remote_servers:
  clickhouse_dev:
    shard: 
      - internal_replication: true
        replica:
          host: 10.0.0.0
          port: 9000
      - internal_replication: true
        replica:
          host: 10.0.0.1
          port: 9000

分佈式 DDL 語句如下:

CREATE TABLE default.test on cluster clickhouse_dev
(
    `id` Int64 COMMENT '主鍵ID',
    `name` String COMMENT '名稱',
    `create_time` DateTime COMMENT '創建時間',
    `dt` String DEFAULT toYYYYMMDD(create_time) COMMENT '分區字段',
)
ENGINE =  MergeTree()
PARTITION BY dt
ORDER BY (id, create_time)

ClickHouse 分佈式 DDL 用於在集羣內的多個 Data Node 上批量執行 DDL 語句,以簡化我們在多 Data Node 上逐個執行 DDL 的操作。整個分佈式 DDL 的執行流程如下:

  1. 客戶端通過 on cluster 關鍵字向 Coordinator Node 發起分佈式 DDL 請求,用於在 Coordinator Node 上向集羣中所有 Data Node 分發執行 DDL 語句

  2. 由於 Coordinator Node 並沒有使用 Zookeeper 對集羣中的 Data Node 做服務發現,故 Coordinator Node 需要在 system.clusters 表中查詢集羣中所有的 Data Node 節點

  3. Coordinator Node 將 DDL 操作日誌推送到 Zookeeper 上的 ddl 節點 /clickhouse/task_queue/ddl/query-000000207 並監控執行進度。推送到 Zookeeper 上操作日誌樣例數據如下:

[zk: localhost:2181(CONNECTED) 4] get  /clickhouse/task_queue/ddl/query-0000000207
version: 2
query: CREATE TABLE default.test_distributed_ddl UUID \'4355ae32-b5c9-4a78-a4c7-7e3b7b333beb\' ON CLUSTER clickhouse_dev (`id` Int64 COMMENT \'主鍵ID\', `text` String COMMENT \'文本字段\', `create_time` DateTime COMMENT \'創建時間\', `dt` String DEFAULT toYYYYMMDD(create_time) COMMENT \'分區字段\') ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/{database}/{table}\', \'{replica}\') PARTITION BY dt ORDER BY (id, create_time) SETTINGS storage_policy = \'move_from_ebs_to_s3\'
hosts: ['10%2E0%2E0%2E22:9000','10%2E0%2E0%2E23:9000']
initiator: ip%2D10%2D0%2D0%2D26%2Eap%2Deast%2D1%2Ecompute%2Einternal:9000
settings: load_balancing = 'random', max_memory_usage = 10000000000

可以看到操作日誌中主要包含:

load_balancing:爲多副本查詢時的負載均衡算法

max_memory_usage:單節點的內存使用限制

  1. Data Node 監聽 Zookeeper 上 ddl 節點下的變化並獲取操作日誌。通過 C++ 的 ifaddrs 包獲取本機 IP 地址信息並判斷本機 IP 是否在 hosts 列表中。如果在,則執行 query 中的 DDL 語句。如果不在,則直接跳過即可。

  2. Data Node 執行完 query 中的 DDL 語句後,會將本節點的狀態寫入到 finished 節點中,finished 節點中的樣例數據如下:

[zk: localhost:2181(CONNECTED) 4] ls /clickhouse/task_queue/ddl/query-0000000207/finished
[10%2E0%2E0%2E22:9000, 10%2E0%2E0%2E23:9000]

如上可以看到,執行完成的節點都會在 finished 節點下創建對應的子節點。

6. Coordinator Node 監聽 finished 節點下子節點的變化來判斷集羣中的所有節點是否全部執行完成。整個監聽過程是同步阻塞的。如果全部執行完成則返回給客戶端,如果沒有全部執行完成,則默認阻塞 180s 後超時失敗。阻塞超時時間可以通過 distributed_ddl_task_timeout 來調整。

7. 整個流程完畢後,Coordinator Node 將結果返回給客戶端

在執行分佈式 DDL 時,我們需要注意以下幾點:

在集羣模式下,ClickHouse 的建表方式和其他數據庫存在一些差異。我們需要在 Coordinator Node 和 Data Node 上面創建不同的表引擎,才能完成集羣模式下的數據寫入和查詢。整個建表流程如下:

在 Data Node 上創建數據表

ClickHouse MergeTree 系列的表引擎是 ClickHouse 數據存儲功能的核心,它們爲彈性和高性能數據檢索提供了大多數功能:列存儲、自定義分區、一級索引、二級索引等。爲了保證數據的高可用性,我們選擇在 Data Node 上使用 ReplicatedMergeTree 引擎創建表。整個建表流程如下:

1. 客戶端使用 on cluster 關鍵字向 Coordinator Node 發送分佈式 DDL 建表請求,建表語句如下:

CREATE TABLE default.test on cluster {cluster}
(
    `id` Int64 COMMENT '主鍵ID',
    `name` String COMMENT '名稱',
    `create_time` DateTime COMMENT '創建時間',
    `dt` String DEFAULT toYYYYMMDD(create_time) COMMENT '分區字段',
)
ENGINE =  ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}','{replica}')
PARTITION BY dt
ORDER BY (id, create_time)
TTL  create_time + INTERVAL 1 HOUR TO DISK 's3'
SETTINGS storage_policy = 'move_from_ebs_to_s3';

如上所示,MergeTree 中建表語句的參數如下:

1. cluster:ClickHouse 的集羣名稱。

2. shard: Data Node 所屬的 shard 編號。該參數自動從系統表中獲取

3. database:庫名。該參數自動從系統獲取

4. table:表名。該參數自動從系統獲取

2. Coordinator Node 節點通過集羣名從 system.clusters 表中獲取該集羣對應的所有 Data Node 節點信息

3. Coordinator Node 將 DDL 語句發送到 Zookeeper 節點中

4. 集羣中的各個 Data Node 從 Zookeeper 中獲取 DDL 語句執行

至此,所有 Data Node 上的本地表創建結束。由於集羣模式下,我們只將 Coordinator Node 暴露給客戶端訪問,此時我們在 Data Node 上創建的表還不能被客戶端訪問到。故我們接下來需要在 Coordinator Node 上建表。

在 Coordinator Node 上建表

ClickHouse 的 Distributed 表引擎支持數據寫入和查詢的轉發,本身不存儲任何數據。故我們在 Coordinator Node 上使用 Distributed 引擎創建表,用於對 Data Node 上表的訪問。

具體建表流程如下:

1. 客戶端向 Coordinator Node 發送建表請求

具體的建表語句如下:

CREATE TABLE default.test_distributed
(
    `id` Int64 COMMENT '主鍵ID',
    `name` String COMMENT '名稱',
    `create_time` DateTime COMMENT '創建時間',
    `dt` String DEFAULT toYYYYMMDD(create_time) COMMENT '分區字段',
)
ENGINE =  Distributed('{cluster}','{database}','{table}');

如上建表語句,Distributed 引擎中參數含義如下:

至此,我們整個建表流程介紹。

注意:

1. 當我們執行 drop table 刪除 ReplicatedMergeTree 表引擎的時候,會存在無法立即刪除 ReplicatedMergeTree 存在於 zookeeper 上的元數據信息。此時我們需要在 drop table 後添 加 no delay 參數去解決。官方 issue ReplicatedMergeTree in Atomic database。刪除語句如下:

DROP TABLE default.test_ttl ON cluster clickhouse_dev no delay;

分佈式寫入

接下來我們將描述集羣模式下,如何寫入數據,大致分爲以下幾步:

1. Client 向 Coodinator Node 發送要寫入的數據

2. Coodinator Node 本地緩存該數據並開始向 Data Node 推送

3. 完成寫入的 Data Node 在同一個 shard 內進行數據同步

(寫入時序圖)

如上所示通過 Distributed 表引擎進行分佈式寫入的步驟如下:

1. Coordinator Node 上的 Distributed 引擎接收到數據後會先寫入本地臨時目錄,例如:/{data}/{database}/{table}/{shard_number}/tmp

2. Coordinator Node 定時監聽臨時目錄中的數據變化,有變化時觸發向 Data Node 推送數據的功能,定時監聽的頻率涉及如下 2 個參數配置:

1distributed_directory_monitor_sleep_time_ms :基本間隔頻率,默認爲 100 毫秒

1distributed_directory_monitor_max_sleep_time_ms :最大間隔頻率,默認爲 30 秒

3. 默認情況下 Coordinator Node 會在每個 shard 中挑選一個 Data Node 逐個推送已寫入的數據文件。文件推送涉及如下 2 個參數配置:

3.1. distributed_directory_monitor_batch_inserts:開啓文件的批量發送,默認爲禁止

3.2. background_distributed_schedule_pool_size:發送文件的線程池大小,默認爲 16。只能在服務啓動時設置

4. Data Node 1 接收 Coordinator 發送的數據文件並寫入本地

5. 同一個 shard 的多個 Data Node 1 之間進行數據複製

整個數據寫入流程有同步和異步 2 種模式,可通過 insert_distributed_sync 進行配置:

      5.1 異步,默認方式,Coordinator Node 將數據寫入到本地臨時目錄後,就會向客戶端返回寫入成功信息。

      5.2 同步,Coordinator Node 必須保證數據寫入至少 1 個 Data Node 節點上時,整個 Insert 操作纔會成功。

由於異步模式無法在 Coodinator Node 宕機時保證數據一定能寫入 Data Node,所以我們此處會使用同步模式。

Data Node 之間的數據複製

當 shard 中的某一個 Data Node 成功完成了本地的寫入操作後,ClickHouse 會通過 ReplicatedMergeTree 來 shard 內部的數據同步。數據複製過程依賴 Zookeeper 做協同,官方建議使用的 Zookeeper 版本最好是 3.4.5 及以上。

包含副本狀態感知、操作日誌分發、任務隊列等。同一個 shard 的不同副本只能通過配置文件的方式指定 IP 和端口。

下面我們以 2 個 Data Node 爲例,來介紹 Data Node 間的數據複製流程

(數據複製時序圖)

1. Data Node 1 從 Coodinator Node 接收數據,並將數據按條數(max_insert_block_size = 1048576)切分成若干個 Block 塊並寫入本地分區。

2. Data Node 1 將操作日誌寫入到 Zookeeper 的 log 路徑下

   (/clickhouse/tables/{shard}/{database}/{table}/log/log-00000197)具體樣例數據如下:

[zk: localhost:2181(CONNECTED) 40] get  /clickhouse/tables/1/default/test_ttl/log/log-0000000197
format version: 4
create_time: 2022-09-27 08:00:29
source replica: replica_2
block_id: f880fa005b8940dcda64418c7eb81b52_1332684842122574982_11562910527007783636
get
f880fa005b8940dcda64418c7eb81b52_61_61_0

可以看到操作日誌包含如下內容:

         2.1 create_time:Log 數據寫入 Zookeeper 的時間

         2.2 source replica:操作日誌的推送副本

         2.3 block_id:當前 part 的 Block ID

         2.4 type:操作類型。

         2.5 part id:當前分區的目錄 ID

         2.6 Data Node 1 更新本節點在 Zookeeper 下的 log_pointer 下標。具體路徑爲:/clickhouse/tables/{shard}/{database}/{table}/replicas/{replica_name}/log_pointer。

3. Data Node 2 監聽 Zookeeper log 路徑下的節點變化,並從 log pointer 中獲取最新的 log 點。然後將 log pointer 點往後的操作日誌轉化爲 task 任務。並放到 Zookeeper 上的 queue 隊列中(/clickhouse/tables/{shard}/{database}/{table}/replicas/{replica_name}/queue)

4. Data Node 2 從 queue  隊列中獲取任務並選擇一個最佳的副本節點去拉取數據。副本選擇算法如下:

4.1 獲取 replicas/ 下的所有副本並通過 is_active 僅保留活躍的副本

4.2 獲取每個副本的 log_pointer,該值代表已同步的 part 下標。值越大則該節點的數據越齊全

4.3 獲取每個副本的 queue,該值代表爲同步的 part。值越大則該節點的數據越少

4.4 選擇 log_pointer 最大的副本,如果 log_pointer 一致,則選取 queue 長度越小的副本

5. Data Node 2 通過 Zookeeper 上的 replicas/{replica_name}/host 獲取副本的 hostname 信息。該 hostname 可通過 interserver_http_host 配置。如果不配置則和 hostname -f 一致。樣例數據如下:

[zk: localhost:2181(CONNECTED) 0] get /clickhouse/tables/1/default/test_ttl_2/replicas/replica_1/host
host: ip-10-0-0-12.ap-east-1.compute.internal
port: 9009
tcp_port: 9000
database: default
table: test_ttl_2
scheme: http

6. 向遠端副本發送下載請求,如果下載失敗,則嘗試 5 次。具體可通過  max_fetch_partition_retries_count 進行配置。

默認情況下,數據成功寫入 1 個副本後就會返回給客戶端,其他副本異步同步。當該副本所在的節點宕機時,則可能會導致數據丟失。我們可以通過 insert_quorum 參數來保證多副本同時寫入成功。

這樣會損失一些寫入性能,與非複製表相比,此操作會增加寫入的延遲。官方建議我們以低於 1 秒的頻率批量寫入數據,這部分損耗是可以忽略不計的。

分佈式查詢

單表查詢

1. 客戶端向 Coordinator Node 上的 Distributed 表發送查詢請求

2. Coordinator Node 通過負載均衡算法爲每個 Shard 選擇一個 Data Node 副本節點。負載均衡算法通過 load_balancing 配置指定:

3. Coordinator Node 將請求分發給被選中的 Data Node

4. Data Node 將查詢結果返回給 Coordinator Node

5. Coordinator Node 將結果返回客戶端

普通 JOIN

從上面的流程圖我們可以看出 ClickHouse 中的 join 流程如下:

1. 客戶端向 Coordinator Node 上的分佈式表發送查詢請求(select name from a join b where a.id = b.id)

2. Coordinator Node 上的分佈式表通過負載均衡算法,爲每個 shard 選擇 1 個副本節點。目前我們的集羣屬於 1 個 shard 2 個 副本的架構

3. Coordinator Node 將查詢發送到 a 表被選中的 Data Node

4. 由於 a 表需要和 b 表做 join,則 a 表所在的 Data Node 需要向 b 表所在的所有 Data Node 發送查詢請求並將 b 表的數據拷貝到本地做 join。如果 a 表的數據分佈在 M 個 Data Node 上,b 表的數據分佈到 N 個 Data Node 上。則在整個 join  過程中,a 表需要發送 M * N 個數據查詢請求。

5. 每個 a 表所在的 Data Node 將 join 後的數據返回給 Coordinator Node 上的分佈式表

6. Coordinator 上的分佈式表將各個 Data Node 返回的數據合併

7. Coordinator 上的分佈式表將數據返回給客戶端

從第 4 步可以看出當 2 個分佈式表進行 join 時,會產生大量的數據查詢請求(shard 數的平方),由於我們的集羣目前是 1 個 shard 2 個副本。所以暫時不會該問題。但我們下面介紹一下解決這個問題的 Global join 的方案。

Global JOIN

從上面的流程圖可以看出 ClickHouse 的 global join 如下:

1. 客戶端向 Coordinator Node 上的分佈式表發送查詢請求(select name from a global join b where a.id = b.id)

2. Coordinator Node 上的分佈式表通過負載均衡算法,爲每個 shard 選擇 1 個副本節點。目前我們的集羣屬於 1 個 shard 2 個 副本的架構

3. Coordinator Node 上的分佈式表先查詢 b 表的數據並存儲在本地臨時內存表中

4. Coordinator Node 將 b 表的數據發送到 a 表所在的 Data Node(如果 a 表是副本,則會在多個副本中選擇一個 Data Node)

5. Coordinator Node 將查詢語句發送至 a 表所在的 Data Node

6. Data Node 將本地的 a 表及內存中的 b 進行 join

7. Data Node 將本地計算完的數據返回給 Coordinator Node

8. Coordinator Node 合併所有 Data Node 返回的數據並最終返回給客戶端

在 global join 中,可以看到對 b 表的查詢請求只在 Coordinator Node 上發生了一次,即請求數爲 shard 數。但使用 global join 時也需要注意以下幾點:

     1. 一般建議對維表進行 join 的時候使用,數據量小。

     2. Global join 的 Coordinator Node 到 Data Node 的網絡帶寬無法限制。

認證

如上所示,我們只將 Coordinator Node 暴露給客戶端,由於客戶端到 Coordinator Node 認證信息無法直接穿透到 Coordinator Node 到 Data Node 中去,故我們整個 ClickHouse 的認證過程分爲如下 2 個階段:

1. 客戶端連接 Coordinator Node 時,通過指定不同的用戶名和密碼來訪問不同的表。創建用戶的樣例語句如下:

CREATE USER test_user  IDENTIFIED WITH sha256_password BY 'test_password' HOST LIKE '10.%.%.%';
grant  SELECT, INSERT, ALTER, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY, OPTIMIZE  ON default.* to test_user;

2. Coordinator Node 連接 Data Node 時,通過預先在 Coordinator 節點中配置的用戶名及密碼來訪問 Data Node 上面的庫和表。配置方式如下:

remote_servers:
  clickhouse_dev:
    shard: 
      - internal_replication: true
        replica:
          host: 127.0.0.1
          port: 9000
          user: test_user
          password: test_password
      - internal_replication: true
        replica:
          host: 127.0.0.1
          port: 9000 
          user: test_user
          password: test_password

最佳實踐

創建用戶並授權

-- 創建用戶的例子
CREATE USER test_user IDENTIFIED WITH sha256_password BY 'test_password' HOST LIKE '10.%.%.%';

-- 向用戶授權的例子
GRANT SELECT, INSERT, ALTER, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY, OPTIMIZE ON test_database.* to test_user;

建表

MergeTree

創建 MergeTree 引擎表需要分別在 Coordinator Node 上創建 Distributed 表以及在 Data Node 上創建 MergeTree 引擎表:

1 在 Coordinator Node 上執行分佈式 DDL 來在 Data Node 節點上創建 MergeTree 引擎表

CREATE TABLE IF NOT EXISTS omni_sdk.app_store_sale ON cluster clickhouse_dev(
    account String COMMENT '',
    vendor_number String COMMENT '',
    dt Date COMMENT '時間分區',
    provider String COMMENT '提供商',
    provider_country String COMMENT '提供商所在國家或地區',
    sku String COMMENT 'sku',
    create_time DateTime64(9, 'UTC') COMMENT ''
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/{database}/{table}','{replica}',create_time)
ORDER BY(account,
    vendor_number,
    dt,
    provider,
    provider_country,
    sku
)
TTL toDateTime(create_time) + INTERVAL 30 DAY TO DISK 's3'
SETTINGS storage_policy = 'move_from_ebs_to_s3'

關於建表語句的說明如下:

1. ON cluster clickhouse_dev 將建表語句分發到集羣名爲 clickhouse_dev 的節點上去。

2. ReplicatedReplacingMergeTree 引擎主要用於表的副本及去重功能

3. storage_policy = 'move_from_ebs_to_s3' 用於數據冷熱分離

4. TTL toDateTime(create_time) + INTERVAL 30 DAY TO DISK 's3'將 30 天以前的數據轉移到 S3。

1 在 Coordinator Node 上創建分佈式表

CREATE TABLE IF NOT EXISTS omni_sdk.app_store_sale(
    account String COMMENT '',
    vendor_number String COMMENT '',
    dt Date COMMENT '時間分區',
    provider String COMMENT '提供商',
    provider_country String COMMENT '提供商所在國家或地區',
    sku String COMMENT 'sku',
    create_time DateTime64(9, 'UTC') COMMENT ''
)
ENGINE = Distributed('clickhouse_dev','omni_sdk','app_store_sale');

S3 外表

針對 S3 外表,我們需要分別在 Data Node 和 Coordinator 節點上創建。其中 Coordinator Node 上的外表主要用於預覽 S3 上的數據,Data Node 上的外表主要用於做數據的 join。

1. 我們在 Data Node 集羣節點上的 S3 外表建表語句如下:

-- 創建 cur 外部表,並跳過第1列
CREATE TABLE default.test_s3_engine on cluster clickhouse_data_dev(
    identity_lineitemid String COMMENT '',
    identity_timeinterval String COMMENT '',
    bill_invoiceid String COMMENT ''
) engine = S3('https://shiyou-labs-aws-cur-test.s3.ap-southeast-1.amazonaws.com/*.csv.gz', 'CSV')
SETTINGS input_format_csv_skip_first_lines = 1;

2. 我們在 Coordinator Node 集羣節點上的 S3 外表建表語句如下:

-- 創建 cur 外部表,並跳過第1列
CREATE TABLE default.test_s3_engine on cluster clickhouse_coordinator_dev(
    identity_lineitemid String COMMENT '',
    identity_timeinterval String COMMENT '',
    bill_invoiceid String COMMENT ''
) engine = S3('https://shiyou-labs-aws-cur-test.s3.ap-southeast-1.amazonaws.com/*.csv.gz', 'CSV')
SETTINGS input_format_csv_skip_first_lines = 1;

架構演進

單點架構

使用單個 EC2 部署 ClickHouse 服務,元數據存儲在 EBS 上,數據存儲在 S3 上。定時對 EBS 上的數據做 snapshot。特點如下:

1. 部署簡單

2. 單個 EBS 具有 99.9% 的可用性,元數據的安全性可以得到較高的保證

3. 針對單個 EBS 做 snapshot,可恢復一部分人爲誤操作刪除的數據

4. 單個 EC2 的性能無法滿足業務需求時,我們可以先通過升級 EC2 配置解決

Replicated 副本表架構

Replicated 架構在單點架構的基礎上通過使用 ReplicatedMergeTree 引擎來將同一份數據實時備份到多個 EC2 部署的 ClickHouse 節點上。每個節點都存儲全量數據。每個服務的元數據存儲在各自的 EBS 上,數據在 S3 上存儲一份。相較於單節點的優勢如下:

1. 多個節點 EBS 的可用性要高於單個節點

2. ClickHouse 服務可以實現不停機的滾動更新

3. 能提高查詢的並行度,但無法提高單個查詢效率

分佈式架構

分佈式架構在單點架構的基礎上通過使用 Distributed 表引擎,來將寫入到集羣中的數據均勻的分配到多個 EC2 部署的 ClickHouse 節點上(將數據水平切分)。每個節點只存儲集羣中的部分數據。可以解決單節點遇到的如下問題:

1. 當單個節點的性能無法通過升級節點配置解決時,則我們可以使用該架構將數據存儲在多個節點上進行並行處理。

延伸閱讀

配額與熔斷

ClickHouse 支持通過配額和單個查詢的複雜度 2 種方式來進行熔斷,相關內容請進一步閱讀官方文檔

配置熱更

ClickHouse 配置文件中,有部分配置項可以熱更新,我們使用單獨的 Nomad  Job 將配置文件寫入 consul kv 中,並在 ClickHouse Nomad Job 中使用 SIGHUP 信號進行配置文件的更新,這樣可以保證 ClickHouse 在服務不重啓的情況下實現配置文件的動態更新。

已經明確測試過可以熱更新的配置包括:

ClickHouse 集羣相關配置。即當我們的集羣中動態增加或刪除節點時,我們的 ClickHouse 服務可以不用重啓而直接熱加載

remote_servers:
  clickhouse_dev:
    shard: 
      - internal_replication: true
        replica:
          host: 127.0.0.1
          port: 9000
      - internal_replication: true
        replica:
          host: 127.0.0.1
          port: 9000
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/irwzvdNJEzqcq6k2R5clsA