用 ClickHouse 近乎實時地進行欺詐檢測

以下是我們如何確保我們不斷髮展的 Gojek 生態系統對我們的客戶、司機夥伴和商戶夥伴是安全的。

在 Gojek,我們不斷尋求創新的解決方案,以解決我們不斷變化的挑戰,爲我們的客戶、司機夥伴、商戶夥伴和我們的整個生態系統保持平臺安全。

ClickHouse 正是用於這一目的。

它是我們最近部署的技術之一,以打擊我們平臺上的欺詐者。在這篇文章中,我們旨在描述我們採用 ClickHouse 的方法,涵蓋以下主題。

我們正在努力解決的問題

多年來,我們的欺詐檢測引擎專注於批量檢測。幾年來,它在幫助我們與欺詐者作鬥爭方面一直運作良好。然而,隨着我們業務複雜性的增加,我們越來越多地看到有必要在近乎實時的情況下抓住欺詐者的用例。實時規則有助於在訂單完成之前限制對我們的客戶造成的損害。它還有助於我們創造更好的客戶體驗,因爲我們將能夠在損害發生之前做出反應。

我們自然而然地查看了我們現有的基礎設施。我們有一個現有的數據倉庫,支持我們的批量檢測邏輯。然而,它不適合頻繁和重複的實時查詢,因爲它是一個相對冷的存儲,在數據更新發生之前需要幾分鐘。對於小型查詢,它也需要幾秒鐘的時間,因爲它需要提供資源和處理查詢。因此,我們開始尋找另一種近乎實時的解決方案來補充我們基於批處理的解決方案。

我們的目標是爲近乎實時的規則引擎找到一個新的數據存儲。我們的用戶,即數據分析師,已經表達了他們對新的規則引擎具有以下特性的願望。

用 ClickHouse 進行實驗

根據分析師給出的要求,我們轉向了流處理引擎和在線分析處理(OLAP)數據庫。我們以同樣的實驗場景測試了幾個數據庫。經過評估,我們選擇了 ClickHouse,它是一個開源的 OLAP 數據庫,注重性能。

我們選擇它作爲我們的解決方案是因爲以下原因。

實驗方案

我們將用實驗中使用的一個簡化樣本場景來詳細說明。在實驗中,我們想找到有併發預訂的新賬戶。併發預訂被定義爲與任何其他預訂重疊的預訂,預訂時間由預訂創建事件 + 預訂完成 / 取消事件定義。新賬戶被定義爲在過去 30 分鐘內創建的賬戶。因此,我們將查詢窗口限制爲 T-30 分鐘。

我們有兩個相應的 Kafka 流,用於客戶賬戶活動和食品預訂活動。然而,這些流是通用的流,反映了客戶賬戶活動和食品預訂的所有更新。我們也不應該針對這個例子進行任何優化,因爲在現實中它們可以被用於多個查詢,我們的數據分析師在爲新的用例編寫查詢時有絕對的自由。

當我們在決定實驗的模擬規模時,我們研究了數據分析師需要解決的最常見的問題的規模。對於 30 分鐘時間跨度內的大多數實時查詢,事件的平均數量約爲數萬到數千萬,這取決於流。因此,我們挑選了典型的流的負載,我們模擬了 5 萬個關於客戶賬戶活動的事件,以及 200 萬個關於預訂的事件。我們要運行的查詢包括對食品預訂事件的自我連接,以找出持續時間,以及通常的選擇、過濾和子查詢。我們希望查詢每 5 秒執行一次,並且查詢應該在 1 秒內完成。

定義表格和填充數據

爲了適應我們的用例,我們從谷歌雲上的 e2-standard-8 機器上的一個單節點 ClickHouse 實例開始。我們必須完成的第一個任務是使測試數據在 ClickHouse 中可用。表的創建與其他 RDBMS 中通常的數據描述語言(DDL)略有不同,但我們能夠通過遵循常見的例子爲食品預訂事件提出我們的第一個表定義,如下所示。

CREATE TABLE order_log (
 customer_id Int32,
 order_number String,
 status String,
 event_timestamp DateTime64(3, ‘UTC’)
) ENGINE = MergeTree()
ORDER BY (event_timestamp)
PARTITION BY toYYYYMMDD(event_timestamp)
SETTINGS index_granularity = 8192

我們還定義了我們的客戶賬戶活動表,如下所示。

CREATE TABLE customer_log (
 customer_id Int32,
 new_user UInt8,
 event_timestamp DateTime64(3, ‘UTC’)
) ENGINE = MergeTree()
ORDER BY (event_timestamp)
PARTITION BY toYYYYMMDD(event_timestamp)
SETTINGS index_granularity = 8192

爲了避免在測試環境中消耗生產數據的複雜性,我們在這個階段沒有從生產 Kafka 流中收集數據。相反,我們爲兩個流創建了一個幾小時的數據轉儲,並使用默認的客戶端將其插入到我們的測試 ClickHouse 實例。在我們的生產設置中,我們已經實現了一個專門的數據攝取器,以應對我們面臨的各種挑戰,更多的細節可以在本文的後面部分找到。

上面的代碼中最陌生的部分是 ENGINE= MergeTree() 這一行,以及它後面的內容。讓我們深入瞭解這一行的內部情況,並理解它對我們的用例有何幫助。

合併樹引擎

ENGINE 子句定義了該表所使用的引擎,它定義了數據的存儲和查詢的方式。MergeTree 是 ClickHouse 中最常見的引擎,它是 ClickHouse 提供良好性能的原因之一。MergeTree 系列中的引擎被設計爲以批處理的方式向表中插入非常大量的數據。

爲了理解 MergeTree 的工作原理,我們從最熟悉的分區概念開始,分區由 PARTITION BY 子句定義。這裏的分區概念類似於 RDBMS 中的傳統分區,但不同的是,沒有爲每個分區創建單獨的表。它是一種數據的邏輯分組,用於更有效的查詢:當一個 SELECT 查詢包含一個由 PARTITION BY 指定的列上的條件時,ClickHouse 會自動過濾掉那些不需要的分區,從而減少讀取的數據量。

在每個分區中,會有許多數據部分。因此,一個單一的 ClickHouse 表由許多數據部分組成。每次插入都會創建一個新的數據部分,如下圖所示,它由文件系統中的一個新目錄表示。你可以想象,這對點插入來說不是很有效,因爲每次插入只包含一條記錄。因此,ClickHouse 建議批量插入,同時在數據寫入後進行優化,在後臺應用規則來合併各部分,以提高存儲效率。在我們的理解中,這就是爲什麼引擎中有一個 Merge 字樣。

高效的查詢通常依賴於一些專門的數據結構,ClickHouse 也不例外。就像其他 RDBMS 一樣, primary key 起着重要的作用。主鍵通常與 ORDER BY 子句中定義的內容相同,但也可以是它的一個子集。數據按主鍵在數據部分之間和內部進行排序。然後每個數據部分被邏輯地劃分爲顆粒,這是最小的不可分割的數據集,ClickHouse 在選擇數據時讀取。ClickHouse 不分割行或值,所以每個顆粒總是包含一個整數的行,例如,在一個顆粒中 8192 行。一個顆粒的第一行被標記爲該行的主鍵值。顆粒在數據部分的位置是由標記表示的。對於每個數據部分,ClickHouse 創建了一個索引文件來存儲這些標記。想象一下,如果一個查詢到達,並且它包含一個關於主鍵的條件。ClickHouse 將能夠快速找到數據部分,以及使用標記的顆粒。如果我們將標記保存在內存中,那麼查詢將快如閃電,ClickHouse 可以直接定位磁盤上的數據。

最後,ClickHouse 是一個列式數據庫。這意味着 ClickHouse 中的數據是按列而不是按行存儲在磁盤上的。這就減少了在查詢時從磁盤上讀取的數據量,因爲不需要讀取或跳過不必要的數據。列數據需要與索引一起工作。因此,對於每一列,無論它是否在主鍵中,ClickHouse 也會存儲相同的標記。這些標記可以讓你直接在列文件中找到數據。

下面是對 MergeTree 結構的總結。

優化性能

一旦我們建立了表和基礎設施,我們就開始執行我們的數據分析師在這個例子上準備的查詢。經過一些小的調整,我們能夠執行該查詢(所以我們相信我們的數據分析師也能輕而易舉地做到這一點!)。

在我們的第一次嘗試中,我們發現性能是合理的。在我們描述的問題的規模下,查詢大約需要 1.5 秒的時間。我們採取了一些調整措施來提高性能。

經過優化,我們成功地將查詢時間減少到 800ms 以下。我們對其性能感到滿意,因爲它符合我們的性能目標。

ClickHouse 數據採集器

正如上一節提到的,我們直接從 CLI 轉儲數據進行實驗。當我們進行生產設置時,我們認爲在 ClickHouse 中複製 Kafka 流是很容易的,因爲在 ClickHouse 中有一個 Kafka 引擎可以使用。然而,在嘗試使用它之後,我們決定編寫我們自己的基於 Golang 的應用程序來處理來自 Kafka 的數據攝入。這個應用程序有簡單的職責。

這一決定背後有幾個原因。

支持 Protobuf

我們的 Kafka 流中的數據都是用 Protobuf 序列化的。如果我們使用內置的 Kafka 引擎,我們的表定義會像 ClickHouse 給出的例子一樣,如下所示。爲了解決模式問題,我們需要把我們的 Protobuf 定義放在 / var/lib/clickhouse/format_schemas 中,然後創建如下的表。

CREATE TABLE table (
 field String
 ) ENGINE = Kafka()
SETTINGS
 kafka_broker_list = ‘kafka:9092’,
 kafka_topic_list = ‘topic’,
 kafka_group_name = ‘group’,
 kafka_format = ‘Protobuf’,
 kafka_schema = ‘social:User’,
 kafka_row_delimiter = ‘’

它開箱即用,但它在我們的基礎設施中產生了一個問題。我們的 Protobuf 的 Schema 存儲在一個共同的存儲庫中,並且由於業務需求的不斷變化而不斷地更新。我們經常需要添加或更新我們的 protobuf 模式,從我們的存儲庫中獲取最新版本。我們有時也會更新我們的 Kafka 主題名稱。如果我們使用內置的方法,每次更新都需要在我們的服務器中使用 git pull 進行手動修改。我們希望能避免這種手動操作。

通過擁有一個單獨的數據攝取器,我們能夠導入我們編譯的 protobuf 作爲一個依賴。拉取最新版本的 protobuf 模式將由應用程序部署完成,在我們的 ClickHouse 基礎設施上將不需要手動改變。此外,由於 ClickHouse 不支持 Protobuf 中的所有數據類型(如地圖),擁有一個自定義的數據攝取器有助於我們保持兩種格式之間的數據兼容性。我們可以安全地忽略不支持的字段,甚至可以執行定製的映射,如果這些數據在我們的商業案例中是至關重要的。

DB 模式的版本控制和性能調控

每當一個新的 Kafka 流被複制到 Clickhouse,我們需要爲它創建一個相應的表。像往常一樣,我們希望對它進行版本控制,並且能夠在末日場景中完全恢復我們的數據庫模式,或者當我們需要通過添加新的 ClickHouse 節點來擴大我們的容量時。我們需要一個應用程序來處理遷移過程,而數據採集器原來是一個好地方。

當我們建立了從給定的 Protobuf 模式自動生成表定義 DDL 的功能時,我們看到了一個額外的優勢。由於我們的分析師不熟悉性能調整,我們能夠在我們的表定義上應用一些性能調整技術,例如將 Enums 定義爲 LowCardinality 列和編碼。這確保了表爲通用查詢進行了充分的優化。如果數據採集器不存在,這將更難實現。

可配置的批量插入

由於 ClickHouse 建議分批插入,我們需要爲我們想要插入的數據點創建一個緩衝區,並且只以每秒數次的頻率進行插入。挑戰在於,不同的 Kafka 流有非常不同的流量,從每秒超過數萬條記錄到幾秒鐘一條記錄不等。對於高流量的流,我們需要通過改變批次大小和沖刷間隔來控制攝取的頻率。另一方面,對於低流量的數據流,我們需要確保數據不會被緩衝太長時間,因爲查詢是近乎實時地運行。

因此,儘管 logic 每個流的配置總是相同的,但我們對不同的流應用不同的配置。在我們的數據攝取器中,我們爲每個流提取了以下配置。flush_max_messages 控制了緩衝區的最大尺寸,flush_timeout_ms 定義了一條記錄在緩衝區中可以保留的最大時間。通過改變這些值,我們可以確保每個流的攝取得到很好的處理。

[
 {
 “brokers”: [
 “The list of brokers, depending on the environment”
 ],
 “topic”: “topic to which the ingestor listens to”,
 “table_name”: “destination table in ClickHouse”,
 “proto_url”: “the Protobuf Schema URL, so that we can deserialize the message”,
 “flush_max_message”: Maximum number of messages per batch,
 “flush_timeout_ms”: the interval that the messages are inserted into Clickhouse, if flush_max_message is not fully utilized
 }
]

可測試性

獨立攝取器的最後但也是重要的好處是,我們能夠在投入生產之前充分測試我們的數據攝取。在我們的 CI/CD 管道中,我們進行了以下工作。

這確保了我們永遠不會把有問題的模式推到生產中,而且我們有足夠的信心通過這套測試使我們的部署管道完全自動化。此外,這些測試不需要太多的維護,因爲所有的表和流的邏輯是統一的。

生產部署

Clickhouse 成爲我們關於欺詐檢測的規則引擎的核心部分。因此,我們的系統需要高度可用,如果一個 ClickHouse 節點發生故障,不應該有數據損失。ClickHouse 允許我們通過配置來解決這個問題。它支持 cluster mode 並支持表的 ReplicatedMergeTree 引擎。ReplicatedMergeTree 引擎只是一個普通的 MergeTree 引擎,具有跨 ClickHouse 節點複製數據的能力。使用 Apache Zookeeper,ClickHouse 爲我們處理這個問題,我們不需要擔心這個問題。

部署還應該處理以下要求。

我們用讀寫分離的方式處理了上述要求,你可以在下面看一看。注意我們在圖中排除了 ZooKeeper。

讀 / 寫分離

我們的 ClickHouse 集羣目前共包含 5 個節點,其中兩個節點只用於數據攝取,三個節點只用於讀取。這些節點都是由同一個 ZooKeeper 集羣管理的。我們將它們的職責明確定義如下。

插入節點

插入節點只用於寫入數據,從不用於查詢執行。我們確保所有的數據只從我們的數據攝取器寫到攝取節點上。在數據被寫入插入節點後,它將使用 ReplicatedMergeTree 引擎自動複製到所有其他節點。插入節點被配置爲對所有其他節點具有可見性,因此它總是將數據複製到所有節點。

生產節點

生產節點負責執行我們的數據分析師在其規則中編寫的查詢。我們從不向生產節點寫入數據。我們還確保我們的規則引擎只與生產節點有連接。

遊樂場節點

Playground Node 擁有與 Production Node 相同的數據,並且是隻讀的,但它是用於數據分析師使用數據控制檯(如 Redash)來試驗他們的查詢。我們的規則引擎並不連接到 Playground Node。Playground Node 與 Production Node 的分離是爲了確保實驗性查詢不會影響生產中查詢的運行時間。

能力規劃

職責的不同爲能力規劃創造了靈活性。在處理能力和擴容要求上有很大的不同。下面是我們最終使用的每個節點的能力總結。

我們根據我們的監測統計數據和樣本查詢的觀察結果來規劃我們的容量。首先,插入節點的容量要比其他節點小很多。原因是插入節點執行的工作非常簡單,沒有查詢在那裏運行。我們最初對插入節點使用了與其他節點相同的容量,但我們發現它們的利用率非常低。於是我們決定大幅縮減插入節點的規模。此刻,我們的插入節點能夠處理每秒數萬次的寫入,沒有問題。

其次,我們爲我們的生產節點和遊樂場節點選擇了高 cpu 設置,而不是標準設置。我們的查詢是接近實時的,並且有複雜的邏輯。大多數時候,它不需要超過幾十 GB 的數據大小,但它需要大量的計算能力。我們的監測還顯示,我們的查詢對 CPU 的要求比較高,即使是有大量數據的查詢。因此,CPU 更有可能成爲查詢處理的瓶頸。使用高 cpu 設置的決定是根據我們查詢的特點和執行查詢時使用的資源來決定的。

結束語

我們的 ClickHouse 集羣已經生產了大約一年。到目前爲止,我們有幾十條近乎實時的規則在上面運行,而更多的規則正在開發或從基於批處理的規則中轉換。

我們已經看到,我們目前的 ClickHouse 設置能夠支持我們的業務需求的發展,以對抗不同的欺詐案件。基於 Clickhouse 的規則已經被整合到許多下游的應用程序中,使我們能夠實現過去不可能實現的目標。我們期待着在不久的將來看到更多的用例被納入我們的 ClickHouse 集羣。

來源:

https://www.toutiao.com/article/7081115136571851301/?log_from=7540ed87b3b77_1649302863048

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/I2TinC7yZTd_J3X659eICg