分佈式 PostgreSQL 集羣 -Citus- 官方示例 - 實時儀表盤

目錄

Citus 提供對大型數據集的實時查詢。我們在 Citus 常見的一項工作負載涉及爲事件數據的實時儀表板提供支持。

例如,您可以是幫助其他企業監控其 HTTP 流量的雲服務提供商。每次您的一個客戶端收到 HTTP 請求時,您的服務都會收到一條日誌記錄。您想要攝取所有這些記錄並創建一個 HTTP 分析儀表板,爲您的客戶提供洞察力,例如他們的網站服務的 HTTP 錯誤數量。重要的是,這些數據以儘可能少的延遲顯示出來,這樣您的客戶就可以解決他們網站的問題。儀表板顯示歷史趨勢圖也很重要。

或者,也許您正在建立一個廣告網絡,並希望向客戶展示其廣告系列的點擊率。在此示例中,延遲也很關鍵,原始數據量也很高,歷史數據和實時數據都很重要。

在本節中,我們將演示如何構建第一個示例的一部分,但該架構同樣適用於第二個和許多其他用例。

數據模型

我們正在處理的數據是不可變的日誌數據流。我們將直接插入 Citus,但這些數據首先通過 Kafka 之類的東西進行路由也很常見。這樣做具有通常的優勢,並且一旦數據量變得難以管理,就可以更容易地預先聚合數據。

我們將使用一個簡單的 schema 來攝取 HTTP 事件數據。這個 schema 作爲一個例子來展示整體架構;一個真實的系統可能會使用額外的列。

-- this is run on the coordinator

CREATE TABLE http_request (
  site_id INT,
  ingest_time TIMESTAMPTZ DEFAULT now(),

  url TEXT,
  request_country TEXT,
  ip_address TEXT,

  status_code INT,
  response_time_msec INT
);

SELECT create_distributed_table('http_request', 'site_id');

當我們調用 create_distributed_table 時,我們要求 Citus 使用 site_id 列對 http_request 進行 hash 分配。這意味着特定站點的所有數據都將存在於同一個分片中。

UDF 使用分片計數的默認配置值。我們建議在集羣中使用 2-4 倍於 CPU 核的分片。使用這麼多分片可以讓您在添加新的工作節點後重新平衡集羣中的數據。

Azure Database for PostgreSQL — 超大規模 (Citus) 使用流式複製來實現高可用性,因此維護分片副本將是多餘的。在任何流複製不可用的生產環境中,您應該將 citus.shard_replication_factor 設置爲 2 或更高以實現容錯。

有了這個,系統就可以接受數據並提供查詢了!在繼續執行本文中的其他命令時,讓以下循環在後臺的 psql 控制檯中運行。它每隔一兩秒就會生成假數據。

DO $$
  BEGIN LOOP
    INSERT INTO http_request (
      site_id, ingest_time, url, request_country,
      ip_address, status_code, response_time_msec
    ) VALUES (
      trunc(random()*32), clock_timestamp(),
      concat('http://example.com/', md5(random()::text)),
      ('{China,India,USA,Indonesia}'::text[])[ceil(random()*4)],
      concat(
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2)
      )::inet,
      ('{200,404}'::int[])[ceil(random()*2)],
      5+trunc(random()*150)
    );
    COMMIT;
    PERFORM pg_sleep(random() * 0.25);
  END LOOP;
END $$;

攝取數據後,您可以運行儀表板查詢,例如:

SELECT
  site_id,
  date_trunc('minute', ingest_time) as minute,
  COUNT(1) AS request_count,
  SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
  SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
  SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM http_request
WHERE date_trunc('minute', ingest_time) > now() - '5 minutes'::interval
GROUP BY site_id, minute
ORDER BY minute ASC;

上述設置有效,但有兩個缺點:

彙總

您可以通過將原始數據彙總爲預聚合形式來克服這兩個缺點。在這裏,我們將原始數據彙總到一個表中,該表存儲 1 分鐘間隔的摘要。在生產系統中,您可能還需要類似 1 小時和 1 天的間隔,這些都對應於儀表板中的縮放級別。當用戶想要上個月的請求時間時,儀表板可以簡單地讀取並繪製過去 30 天每一天的值。

CREATE TABLE http_request_1min (
  site_id INT,
  ingest_time TIMESTAMPTZ, -- which minute this row represents

  error_count INT,
  success_count INT,
  request_count INT,
  average_response_time_msec INT,
  CHECK (request_count = error_count + success_count),
  CHECK (ingest_time = date_trunc('minute', ingest_time))
);

SELECT create_distributed_table('http_request_1min', 'site_id');

CREATE INDEX http_request_1min_idx ON http_request_1min (site_id, ingest_time);

這看起來很像前面的代碼塊。最重要的是:它還在 site_id 上進行分片,並對分片計數和複製因子使用相同的默認配置。因爲這三個都匹配,所以 http_request 分片和 http_request_1min 分片之間存在一對一的對應關係,Citus 會將匹配的分片放在同一個 worker 上。這稱爲協同定位(co-location);它使諸如聯接(join)之類的查詢更快,並使我們的彙總成爲可能。

爲了填充 http_request_1min,我們將定期運行 INSERT INTO SELECT。這是可能的,因爲這些表位於同一位置。爲方便起見,以下函數將彙總查詢包裝起來。

-- single-row table to store when we rolled up last
CREATE TABLE latest_rollup (
  minute timestamptz PRIMARY KEY,

  -- "minute" should be no more precise than a minute
  CHECK (minute = date_trunc('minute', minute))
);

-- initialize to a time long ago
INSERT INTO latest_rollup VALUES ('10-10-1901');

-- function to do the rollup
CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void AS $$
DECLARE
  curr_rollup_time timestamptz := date_trunc('minute', now());
  last_rollup_time timestamptz := minute from latest_rollup;
BEGIN
  INSERT INTO http_request_1min (
    site_id, ingest_time, request_count,
    success_count, error_count, average_response_time_msec
  ) SELECT
    site_id,
    date_trunc('minute', ingest_time),
    COUNT(1) as request_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
    SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
  FROM http_request
  -- roll up only data new since last_rollup_time
  WHERE date_trunc('minute', ingest_time) <@
          tstzrange(last_rollup_time, curr_rollup_time, '(]')
  GROUP BY 1, 2;

  -- update the value in latest_rollup so that next time we run the
  -- rollup it will operate on data newer than curr_rollup_time
  UPDATE latest_rollup SET minute = curr_rollup_time;
END;
$$ LANGUAGE plpgsql;

上述函數應該每分鐘調用一次。您可以通過在 coordinator 節點上添加一個 crontab 條目來做到這一點:

* * * * * psql -c 'SELECT rollup_http_request();'

或者,諸如 pg_cron 之類的擴展允許您直接從數據庫安排週期性查詢。

之前的儀表板查詢現在好多了:

SELECT site_id, ingest_time as minute, request_count,
       success_count, error_count, average_response_time_msec
  FROM http_request_1min
 WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;

過期的舊數據

彙總使查詢更快,但我們仍然需要使舊數據過期以避免無限的存儲成本。只需決定您希望爲每個粒度保留數據多長時間,然後使用標準查詢刪除過期數據。在以下示例中,我們決定將原始數據保留一天,將每分鐘的聚合保留一個月:

DELETE FROM http_request WHERE ingest_time < now() - interval '1 day';
DELETE FROM http_request_1min WHERE ingest_time < now() - interval '1 month';

在生產中,您可以將這些查詢包裝在一個函數中,並在 cron job 中每分鐘調用一次。

通過在 Citrus 哈希分佈之上使用表範圍分區,數據過期可以更快。有關詳細示例,請參閱時間序列數據部分。

這些是基礎!我們提供了一種架構,可以攝取 HTTP 事件,然後將這些事件彙總到它們的預聚合形式中。這樣,您既可以存儲原始事件,也可以通過亞秒級查詢爲您的分析儀表板提供動力。

接下來的部分將擴展基本架構,並向您展示如何解決經常出現的問題。

近似不同計數

HTTP 分析中的一個常見問題涉及近似的不同計數:上個月有多少獨立訪問者訪問了您的網站?準確地回答這個問題需要將所有以前見過的訪問者的列表存儲在彙總表中,這是一個令人望而卻步的數據量。然而,一個近似的答案更易於管理。

一種稱爲 hyperloglog 或 HLL 的數據類型可以近似地回答查詢;要告訴您一個集合中大約有多少個獨特元素,需要的空間非常小。其精度可以調整。我們將使用僅使用 1280 字節的那些,將能夠以最多 2.2% 的錯誤計算多達數百億的唯一訪問者。

如果您要運行全局查詢,則會出現類似的問題,例如在上個月訪問您客戶的任何站點的唯一 IP 地址的數量。在沒有 HLL 的情況下,此查詢涉及將 IP 地址列表從 worker 傳送到 coordinator 以進行重複數據刪除。這既是大量的網絡流量,也是大量的計算。通過使用 HLL,您可以大大提高查詢速度。

首先你必須安裝 HLL 擴展;github repo 有說明。接下來,您必須啓用它:

CREATE EXTENSION hll;

這在 Hyperscale 上是不必要的,它已經安裝了 HLL 以及其他有用的擴展。

現在我們準備好在 HLL 彙總中跟蹤 IP 地址。首先向彙總表添加一列。

ALTER TABLE http_request_1min ADD COLUMN distinct_ip_addresses hll;

接下來使用我們的自定義聚合來填充列。只需將它添加到我們彙總函數中的查詢中:

@@ -1,10 +1,12 @@
  INSERT INTO http_request_1min (
    site_id, ingest_time, request_count,
    success_count, error_count, average_response_time_msec
+   , distinct_ip_addresses
  ) SELECT
    site_id,
    minute,
    COUNT(1) as request_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
    SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
+   , hll_add_agg(hll_hash_text(ip_address)) AS distinct_ip_addresses
  FROM http_request

儀表板查詢稍微複雜一些,您必須通過調用 hll_cardinality 函數讀出不同數量的 IP 地址:

SELECT site_id, ingest_time as minute, request_count,
       success_count, error_count, average_response_time_msec,
       hll_cardinality(distinct_ip_addresses) AS distinct_ip_address_count
  FROM http_request_1min
 WHERE ingest_time > date_trunc('minute', now()) - interval '5 minutes';

HLL 不僅速度更快,還可以讓你做以前做不到的事情。假設我們進行了彙總,但我們沒有使用 HLL,而是保存了確切的唯一計數。這很好用,但您無法回答諸如在過去的一週內,我們丟棄了原始數據有多少不同的會話?之類的問題。

使用 HLL,這很容易。您可以使用以下查詢計算一段時間內的不同 IP 計數:

SELECT hll_cardinality(hll_union_agg(distinct_ip_addresses))
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;

您可以在項目的 GitHub 存儲庫中找到有關 HLL 的更多信息。

使用 JSONB 的非結構化數據

Citus 與 Postgres 對非結構化數據類型的內置支持配合得很好。爲了證明這一點,讓我們跟蹤來自每個國家/地區的訪客數量。使用半結構數據類型可以讓您不必爲每個國家添加一列,並最終得到具有數百個稀疏填充列的行。我們有一篇博文解釋了半結構化數據使用哪種格式。這篇文章推薦使用 JSONB,在這裏我們將演示如何將 JSONB 列合併到您的數據模型中。

首先,將新列添加到我們的彙總表中:

ALTER TABLE http_request_1min ADD COLUMN country_counters JSONB;

接下來,通過修改彙總函數將其包含在彙總中:

@@ -1,14 +1,19 @@
  INSERT INTO http_request_1min (
    site_id, ingest_time, request_count,
    success_count, error_count, average_response_time_msec
+   , country_counters
  ) SELECT
    site_id,
    minute,
    COUNT(1) as request_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count
    SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count
    SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
- FROM http_request
+   , jsonb_object_agg(request_country, country_count) AS country_counters
+ FROM (
+   SELECT *,
+     count(1) OVER (
+       PARTITION BY site_id, date_trunc('minute', ingest_time), request_country
+     ) AS country_count
+   FROM http_request
+ ) h

現在,如果您想在儀表板中獲取來自美國的請求數量,您可以將儀表板查詢修改爲如下所示:

SELECT
  request_count, success_count, error_count, average_response_time_msec,
  COALESCE(country_counters->>'USA', '0')::int AS american_visitors
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/VODPCLrRTCnI1vqn8NIBHA