​深入淺出 ClickHouse 物化視圖

作者:oliver

雖然官方文檔記錄了 ClickHouse 物化視圖很多詳細信息,但是使用物化視圖還是有很多小細節需要注意,更別說一些最佳實踐。本文總結了 ClickHouse 物化視圖使用上的各種問題,並展示三個實際案例。

存儲過程與觸發器

  • 存儲過程:預編譯好的一組 SQL 程序,類似 無返回結果 的函數。

  • 強調無返回是爲了和真正的 FUNCTION 區分開,這個有返回結果。

  • 觸發器:特殊存儲過程,監聽特定事件自動調用。

數據庫查詢語言(query language)是數據庫管理系統(DBMS)提供給用戶和數據庫交互的工具,查詢語言分爲三類 [^1]:

[!TIP] 三類查詢語言並不是邊界分明 工程中的查詢語言,會同時包含多種查詢語言的特性。[^2]

人們往往認爲 SQL 是用於關係模型(Relational Model)數據庫的聲明式查詢語言,但這個世界並不是非黑即白,聲明式語言雖然降低用戶學習成本,但數據庫承擔了檢查(詞法分析)、翻譯(編譯)、優化最後執行的過程。但如果業務需要一遍又一遍執行某一段相同的邏輯,每次都要重新走一遍流程,顯然不可接受。於是各大關係數據庫系統幾乎都引入了過程擴展,比如 PG 使用的 PL/pgSQL[^3],它包含變量定義、條件控制和循環等等過程式語言的元素。

那麼引入本節的主角:存儲過程(Stored Procedure),預先編譯好的一段邏輯(用過程語言),可以大大加快執行速度。

而觸發器(Trigger)則是一種特殊的存儲過程,它監聽某些數據庫事件,可以在事件發生前 / 中 / 後調用。[^4]

從事件類型上看,觸發器分爲:

從觸發動作上看 [^5],觸發器分爲:

那麼觸發器有什麼業務場景呢?舉個最簡單的例子,記錄某張表的審計日誌(Audit Log),把所有 DML 操作都通過觸發器記錄下來。

ClickHouse 物化視圖

ClickHouse 作爲關係型 OLAP(OnLine Analytical Processing)數據庫,很遺憾不支持存儲過程。[^6]

[!TIP] ClickHouse 存儲過程的實現狀況 在 2023 年 Roadmap 中 Experimental features and research 部分可以看到 refreshable materialized views,有生之年

但非常有意思的是,ClickHouse 提供了物化視圖(Materialized View)的特殊功能,在功能上相當於 AFTER INSERT 觸發器,物化視圖仍然使用 聲明式 SQL 定義計算邏輯

源碼閱讀

[!TIP] 提示 可以直接跳到 總結 部分。

[!TIP] ClickHouse 版本 本文源碼閱讀基於 ClickHouse 22.3 版本

StorageMaterializedView

首先看到物化視圖的類聲明 src/Storages/StorageMaterializedView.h

class StorageMaterializedView final : public IStorage, WithMutableContext
{
public:
    ...
private:
    /// Will be initialized in constructor
    StorageID target_table_id = StorageID::createEmpty();

    bool has_inner_table = false;
    ...
}

可以看到物化視圖繼承自 IStorage 類,從它的類註釋中可以看到它管理的功能。物化視圖和 StorageMerge 一樣都繼承自這個管理數據存儲的類,作爲一個視圖,莫非也有實際存儲?此外,物化視圖用 target_table_id 存儲了別的表的 id。接下來看看 IStorage 的類註解:

/** Storage. Describes the table. Responsible for
  * - storage of the table data;
  * - the definition in which files (or not in files) the data is stored;
  * - data lookups and appends;
  * - data storage structure (compression, etc.)
  * - concurrent access to data (locks, etc.)
  */

如果讀物化視圖會發生什麼?跳轉到重載 IStorageStorageMaterializedView::read 方法定義:

void StorageMaterializedView::read(
    ...)
{
    auto storage = getTargetTable();
    ...
    storage->read(query_plan, column_names, target_storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
    ...
}

以及 StorageMaterializedView::getTargetTable 方法定義:

StoragePtr StorageMaterializedView::getTargetTable() const
{
    checkStackSize();
    return DatabaseCatalog::instance().getTable(target_table_id, getContext());
}

讀操作是對 target_table_id 對應的表進行的,那麼就清晰了,物化視圖並不會存儲數據,會將查詢重定向到目標表。

做個實驗簡單驗證:

create table test( time DateTime) Engine=Memory();
create table source(time DateTime) Engine=Memory();
create materialized view mv_test to test as select time from source;
insert into table source values(now());

select * from test;

┌────────────────time─┐
│ 2023-02-25 18:51:41 │
└─────────────────────┘

select * from mv_test;

┌────────────────time─┐
│ 2023-02-25 18:51:41 │
└─────────────────────┘

explain  select * from test;

┌─explain───────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY))                               │
│   SettingQuotaAndLimits (Set limits and quota after reading from storage) │
│     ReadFromStorage (Memory)                                              │
└───────────────────────────────────────────────────────────────────────────┘

explain  select * from mv_test;

┌─explain───────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY))                               │
│   SettingQuotaAndLimits (Set limits and quota after reading from storage) │
│     SettingQuotaAndLimits (Lock destination table for MaterializedView)   │
│       ReadFromStorage (Memory)                                            │
└───────────────────────────────────────────────────────────────────────────┘

注意 Lock destination table for MaterializedView,符合猜測。

接着寫操作會發生什麼?猜測也會重定向到目標表,看看 StorageMaterializedView::write 方法的定義:

SinkToStoragePtr StorageMaterializedView::write(...)
{
    auto storage = getTargetTable();
    ...
    auto sink = storage->write(query, metadata_snapshot, local_context);
    ...
}

再做個小實驗驗證:

insert into table mv_test values(now());

select * from test;

┌────────────────time─┐
│ 2023-02-25 18:51:41 │
└─────────────────────┘
┌────────────────time─┐
│ 2023-02-25 19:11:20 │
└─────────────────────┘

符合猜測。

解答完疑惑,回到正常閱讀順序來,接下來閱讀構造器的代碼 src/Storages/StorageMaterializedView.cpp

StorageMaterializedView::StorageMaterializedView(
    const StorageID & table_id_,
    ContextPtr local_context,
    const ASTCreateQuery & query,
    const ColumnsDescription & columns_,
    bool attach_,
    const String & comment)
    : IStorage(table_id_), WithMutableContext(local_context->getGlobalContext())
{
    ...
    /// If the destination table is not set, use inner table
    has_inner_table = query.to_table_id.empty();  // 出現了新概念 inner table
    if (has_inner_table && !query.storage)  // 創建物化視圖時,要麼有 ENGINE 使用 inner table,要麼用 TO 使用外部表
        throw Exception(
            "You must specify where to save results of a MaterializedView query: either ENGINE or an existing table in a TO clause",
            ErrorCodes::INCORRECT_QUERY);

    if (query.select->list_of_selects->children.size() != 1)
        throw Exception("UNION is not supported for MATERIALIZED VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW);

    ...

    // 設置 to_table_id
    if (!has_inner_table)
    {
        target_table_id = query.to_table_id;
    }
    else if (attach_)
    {
        /// If there is an ATTACH request, then the internal table must already be created.
        target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()), query.to_inner_uuid);
    }
    else  // 創建inner table
    {
        /// We will create a query to create an internal table.
        ...

        target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->getDatabase(), manual_create_query->getTable()}, getContext())->getStorageID();  // 看來 ClickHouse 有個全局表註冊表
    }
}

可以看到:

  1. 物化視圖創建時需要指定目標表,不然會自己創建 inner 表

  2. 物化視圖不能使用 UNION

  3. ClickHouse 系統有個表的 “註冊表”,維護系統所有表的 id- 實例映射

IInterpreter、InterpreterInsertQuery

那麼下一個問題,對原始表插入數據,數據又怎麼經過物化視圖跑到目標表的?

首先關注查詢類 src/Interpreters/IInterpreter.h

/** Interpreters interface for different queries.
  */
class IInterpreter
{
public:
    /** For queries that return a result (SELECT and similar), sets in BlockIO a stream from which you can read this result.
      * For queries that receive data (INSERT), sets a thread in BlockIO where you can write data.
      * For queries that do not require data and return nothing, BlockIO will be empty.
      */
    virtual BlockIO execute() = 0;
    ...
}

當插入(INSERT)數據時,系統會調用 IInterpreter 的子類 src/Interpreters/InterpreterInsertQuery.cpp 處理查詢,先看到它的聲明:

/** Interprets the INSERT query.
  */
class InterpreterInsertQuery : public IInterpreter, WithContext
{
public:
    ...
    /** Prepare a request for execution. Return block streams
      * - the stream into which you can write data to execute the query, if INSERT;
      * - the stream from which you can read the result of the query, if SELECT and similar;
      * Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result).
      */
    BlockIO execute() override;
    ...
private
    ...
    Chain buildChainImpl(
        const StoragePtr & table,
        const StorageMetadataPtr & metadata_snapshot,
        const Block & query_sample_block,
        ThreadStatusesHolderPtr thread_status_holder,
        std::atomic_uint64_t * elapsed_counter_ms);
};

繼續看它的定義(只看 INSERT 分支):

BlockIO InterpreterInsertQuery::execute()
{
    ...
    StoragePtr table = getTable(query);
    ...

    StoragePtr inner_table;
    if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))  // 如果 insert query 指向的表是 StorageMaterializedView,目標表取出放到 inner_table 變量中
        inner_table = mv->getTargetTable();

    ...

    std::vector<Chain> out_chains;
    if (!distributed_pipeline || query.watch)
    {
        size_t out_streams_size = 1;

        if (query.select)  // 處理 INSERT SELECT,忽略
        {
            ...
        }
        else if (query.watch)  // 處理 LIVE VIEW 的 WATCH 語句,直接忽略即可
        {
            ...
        }

        for (size_t i = 0; i < out_streams_size; ++i)
        {
            auto out = buildChainImpl(table, metadata_snapshot, query_sample_block, nullptr, nullptr);  // 構建 chain,重要!!!
            out_chains.emplace_back(std::move(out));
        }
    }

    BlockIO res;

    /// What type of query: INSERT or INSERT SELECT or INSERT WATCH?
    if (distributed_pipeline)
    {
        res.pipeline = std::move(*distributed_pipeline);
    }
    else if (query.select || query.watch)
    {
        ...  // 直接忽略
    }
    else  // 關注這個分支,query 是 INSERT 時
    {
        res.pipeline = QueryPipeline(std::move(out_chains.at(0)));  // 將 chain 第一個元素構造返回 BlockIO 的 pushing pipeline
        res.pipeline.setNumThreads(std::min<size_t>(res.pipeline.getNumThreads(), settings.max_threads));  // 設置 query 的配置

        if (query.hasInlinedData() && !async_insert)
        {  // 也就是 INSERT 語句帶了 VALUES (...),可以直接從語句中拿到要插入的數據
            /// can execute without additional data
            auto pipe = getSourceFromASTInsertQuery(query_ptr, true, query_sample_block, getContext(), nullptr);
            res.pipeline.complete(std::move(pipe));
        }
    }

    res.pipeline.addResources(std::move(resources));

    res.pipeline.addStorageHolder(table);  // 將 query 的目標表放入 pipeline 資源列表
    if (inner_table)  // 如果有物化視圖
        res.pipeline.addStorageHolder(inner_table);  // 把物化視圖的目標表也放到 pipeline 的資源列表

    return res;
}

可以看到方法內調用了 InterpreterInsertQuery::buildChainImpl,接着看這個方法的定義:

Chain InterpreterInsertQuery::buildChainImpl(
    const StoragePtr & table,
    const StorageMetadataPtr & metadata_snapshot,
    const Block & query_sample_block,
    ThreadStatusesHolderPtr thread_status_holder,
    std::atomic_uint64_t * elapsed_counter_ms)
{
    ...

    /// We create a pipeline of several streams, into which we will write data.
    Chain out;

    /// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyed
    out.addInterpreterContext(context_ptr);

    /// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
    ///       Otherwise we'll get duplicates when MV reads same rows again from Kafka.
    if (table->noPushingToViews() && !no_destination)  // table->noPushingToViews() 用於禁止物化視圖插入數據到 KafkaEngine
    {
        auto sink = table->write(query_ptr, metadata_snapshot, context_ptr);
        sink->setRuntimeData(thread_status, elapsed_counter_ms);
        out.addSource(std::move(sink));
    }
    else  // 構建物化視圖插入 pushingToViewChain,重點!!!
    {
        out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, query_ptr, no_destination, thread_status_holder, elapsed_counter_ms);
    }

    ...

    return out;
}
Chain 相關

接着來到文件 src/Processors/Transforms/buildPushingToViewsChain.cpp

Chain buildPushingToViewsChain(
    const StoragePtr & storage,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context,
    const ASTPtr & query_ptr,
    bool no_destination,
    ThreadStatusesHolderPtr thread_status_holder,
    std::atomic_uint64_t * elapsed_counter_ms,
    const Block & live_view_header)
{
    ...

    auto table_id = storage->getStorageID();
    Dependencies dependencies = DatabaseCatalog::instance().getDependencies(table_id);  // 重點,通過 table_id,拿到“依賴“這個表的 dependencies

    /// We need special context for materialized views insertions
    ContextMutablePtr select_context;
    ContextMutablePtr insert_context;
    ViewsDataPtr views_data;
    if (!dependencies.empty())
    {
        ...  // 把query的各種上下文拆出
    }

    std::vector<Chain> chains;

    for (const auto & database_table : dependencies)
    {
        auto dependent_table = DatabaseCatalog::instance().getTable(database_table, context);
        auto dependent_metadata_snapshot = dependent_table->getInMemoryMetadataPtr();

        ASTPtr query;
        Chain out;

        ...

        if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))  // 依賴關係是 MATERIALIZED VIEW
        {
            type = QueryViewsLogElement::ViewType::MATERIALIZED;
            result_chain.addTableLock(materialized_view->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout));

            StoragePtr inner_table = materialized_view->getTargetTable();  // 拿到物化視圖的目標表
            auto inner_table_id = inner_table->getStorageID();
            auto inner_metadata_snapshot = inner_table->getInMemoryMetadataPtr();
            query = dependent_metadata_snapshot->getSelectQuery().inner_query;
            target_name = inner_table_id.getFullTableName();

            /// Get list of columns we get from select query.
            auto header = InterpreterSelectQuery(query, select_context, SelectQueryOptions().analyze())
                .getSampleBlock();

            /// Insert only columns returned by select.
            Names insert_columns;
            const auto & inner_table_columns = inner_metadata_snapshot->getColumns();
            for (const auto & column : header)
            {
                /// But skip columns which storage doesn't have.
                if (inner_table_columns.hasPhysical(column.name))  // 注意,是通過列名匹配的,而不是位置,這在使用物化視圖時很容易犯錯
                    insert_columns.emplace_back(column.name);
            }

            InterpreterInsertQuery interpreter(nullptr, insert_context, false, false, false);  // 將物化視圖的插入邏輯也作爲 InterpreterInsertQuery 處理
            out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms);
            out.addStorageHolder(dependent_table);
            out.addStorageHolder(inner_table);
        }
        else if (auto * live_view = dynamic_cast<StorageLiveView *>(dependent_table.get()))  // 依賴關係是 LIVE VIEW,忽略
        {
            ...
        }
        else if (auto * window_view = dynamic_cast<StorageWindowView *>(dependent_table.get()))  // 依賴關係是 WINDOW VIEW,忽略
        {
            ...
        }
        else
            out = buildPushingToViewsChain(
                dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), false, thread_status_holder, view_counter_ms);  // 我理解這裏是級聯物化視圖分支

        views_data->views.emplace_back(ViewRuntimeData{ //-V614
            std::move(query),
            out.getInputHeader(),
            database_table,
            nullptr,
            std::move(runtime_stats)});

        if (type == QueryViewsLogElement::ViewType::MATERIALIZED)
        {
            auto executing_inner_query = std::make_shared<ExecutingInnerQueryFromViewTransform>(
                storage_header, views_data->views.back(), views_data);
            executing_inner_query->setRuntimeData(view_thread_status, view_counter_ms);

            out.addSource(std::move(executing_inner_query));
        }

        chains.emplace_back(std::move(out));

        /// Add the view to the query access info so it can appear in system.query_log
        if (!no_destination)
        {
            context->getQueryContext()->addQueryAccessInfo(
                backQuoteIfNeed(database_table.getDatabaseName()), views_data->views.back().runtime_stats->target_name, {}"", database_table.getFullTableName());
        }
    }

    ...

    if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get()))
    {
        ...
    }
    else if (auto * window_view = dynamic_cast<StorageWindowView *>(storage.get()))
    {
        ...
    }
    /// Do not push to destination table if the flag is set
    else if (!no_destination)  // 物化視圖寫入邏輯
    {
        auto sink = storage->write(query_ptr, metadata_snapshot, context);  // 注意,第一個參數是傳入的 query_ptr,也就是說物化視圖的數據同樣直接來自於查詢,而不是依賴表
        metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName());
        sink->setRuntimeData(thread_status, elapsed_counter_ms);
        result_chain.addSource(std::move(sink));
    }

    ...

    return result_chain;
}

注意到 DatabaseCatalog::instance().getDependencies(table_id)(在文件 src/Interpreters/DatabaseCatalog.cpp)獲取 “依賴” 在這個表的關係 dependencies,查看源碼:

Dependencies DatabaseCatalog::getDependencies(const StorageID & from) const
{
    std::lock_guard lock{databases_mutex};
    auto iter = view_dependencies.find({from.getDatabaseName(), from.getTableName()});
    if (iter == view_dependencies.end())
        return {};
    return Dependencies(iter->second.begin(), iter->second.end());  // 查找到的 dependencies set 按照順序塞入 vector 返回
}

在頭文件聲明瞭 view_dependencies 和它的類型:

/// Table -> set of table-views that make SELECT from it.
using ViewDependencies = std::map<StorageID, std::set<StorageID>>;
using Dependencies = std::vector<StorageID>;
...
/// For some reason Context is required to get Storage from Database object
class DatabaseCatalog : boost::noncopyable, WithMutableContext
{
public:
    ...
private:
    ViewDependencies view_dependencies TSA_GUARDED_BY(databases_mutex);
}

這幾個函數的參數是 StorageID(在文件 src/Interpreters/StorageID.h),可以看到它的聲明:

struct StorageID
{
    String database_name;
    String table_name;
    UUID uuid = UUIDHelpers::Nil;
    ...
private:
    ...
};

由於 ViewDependencies 這個 map 的 value 是 std::set,在 cpp 中 std::set 的元素會用 std::set::key_comp 方法來排序 [^7],因此物化視圖的處理將按照字母順序。

總結

可以看到:

  1. 數據插入時,先處理原始表插入,再處理物化視圖的插入。

    ![[mv-insert-internally.excalidraw]]

  2. 有多個物化視圖時,按照字母順序依次處理。

  1. 當設置 parallel_view_processing=1 時,物化視圖並行處理

  2. 物化視圖不會讀取源表數據,而是插入時同一份數據依次插入源表、目標表。

  3. 物化視圖相當於 AFTER INSERT TRIGGER,對於目標表而言,不存在任何視圖概念,它只看到一個個 INSERT 查詢。

  4. 物化視圖可以級聯。

FAQ

前文通過源碼閱讀了解了物化視圖的底層邏輯,接下來從使用者的角度繼續分析物化視圖。

物化視圖使用場景

[!TIP] 真正的方向索引 ClickHouse 將在 23.1 引入真正的反向索引能力。[^8]

創建物化視圖

先看到官方文檔的 SQL:

CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE] AS SELECT ...

有兩種方式創建物化視圖:

  1. ENGINE 關鍵詞,ClickHouse 將創建隱式表(Implicit Table)作爲目標表

  2. TO 關鍵詞,需要用戶預先創建目標表

使用 ENGINE 時,ClickHouse 除了創建物化視圖,還會創建一個名爲 .inner.物化視圖名 的隱式表,隱式表其實就是正常的表只不過它以 . 開頭,直接使用它需要反引號 / 雙引號括起來。

POPULATE 只有使用隱式表時生效,它會在 ClickHouse 創建物化視圖後,將原始表 所有 的歷史數據全部處理寫入隱式表。如果原始表有海量數據,將使用大量資源、持續較長時間。

[!TIP] TO 如何插入歷史數據 手動執行 INSERT ... SELECT,最好按照 _partition_id_part 虛擬列分片插入。[^9]

這兩種方式有使用上的優劣區別:

itLEdD

因此建議使用 TO 創建物化視圖。

[!ERROR] 物化視圖不會讀源表 物化視圖和原始表磁盤上的數據沒有半點關係,換句話說:

  • 原始表是 SummingMergeTreeReplacingMergeTree 等等時,物化視圖不會 “看” 到處理後的數據

  • 在原始表上的 DML 不會影響到物化視圖和目標表

[!ERROR] 物化視圖使用列名插入數據 物化視圖通過列名插入數據而不是位置

CREATE MATERIALIZED VIEW mv (
 a Int64,
 d Date,
 cnt Int64
) ENGINE = SummingMergeTree
PARTITION BY toYYYYMM(d)
ORDER BY (a,d)
POPULATE
AS
SELECT
 a,
 d,
 count() AS cnt  -- 一定要注意 AS cnt
FROM source GROUP BY a, d;
數據副本碰上物化視圖

使用 ReplicatedMergeTree 家族的 Engine 和物化視圖時,物化視圖還能正常工作嗎?

不同 shard 之間不用考慮,因爲數據不相同,這裏只考慮同一個 shard 不同 replica 的情況:

需要注意,插入只會發生在一個節點,所以作爲插入觸發器的物化視圖也只會在插入發生的節點被觸發,接着由 Replicated 的同步機制把物化視圖目標表的數據同步到另一個 Replica。

所以沒問題~

分佈式表碰上物化視圖

現在假設一個場景,有 4 個 node,2 個 shard、2 個 replica,每個節點有個 source 本地表和 dest 本地表,並註冊了 source_dist 和 dest_dist 兩個分佈式表。我想要實現插入 source 的數據都進入到 dest,應該如何設計物化視圖?

排列組合一下,有下面四種方式:

答案是前三種可以滿足要求。但首推第一種,沒有網絡開銷,數據在節點內部處理、存儲。第二種、第三種只有在需要數據被打散分佈時使用,比如所 source 表根據用戶 id(user_id)分 shard,結果表想通過設備 id(device_id)分 shard。

第四種會導致所有 source 的數據都出現在每個節點,一般而言是錯誤使用。

Join 碰上物化視圖

絕對避免在物化視圖中使用 join,ClickHouse 使用 HashJoin,插入的每個 Block 都會導致物化視圖創建一個 hash 表,最終導致插入又重又慢。

可以通過可複用的數據結構實現 join 的能力 [^11]:

物化視圖級聯

物化視圖可以通過級聯(Cascade)串起來:

需要注意的是,級聯只能是不同物化視圖的 計算邏輯,比如第一個物化視圖 GROUP BY,第二個物化視圖 FILTER,與目標表沒有任何關係。設計物化視圖級聯時,大可以把前面物化視圖的目標表當作 Null 表,避免干擾。

PG 物化視圖對比

介紹完 ClickHouse 物化視圖,當然要對比下傳統 OLTP 關係型數據庫的物化視圖功能。

TWN9SE

物化視圖案例

下文給出幾個物化視圖的真實案例。

KafkaEngine

KakfaEngine 因爲很難錯誤調試被人詬病,比如在 21.6 版本之前,KafkaEngine 解析數據出錯只能通過 input_format_skip_unknown_fields 設置跳過 N 條錯誤消息,然後在系統日誌中查詢記錄:

select * from system.text_log where logger_name like '%Kafka%'

但這個 PR 被合入後有了新的錯誤檢查方法,給 KafkaEngine 新增一個配置 kafka_handle_error_mode='stream',每條消息將帶上 _error_raw_message 兩個虛擬列。

SQL 代碼如下 [^14]:

CREATE TABLE default.kafka_engine
(
    `i` Int64,
    `s` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092'
kafka_topic_list = 'topic',
kafka_group_name = 'clickhouse',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode='stream';

CREATE MATERIALIZED VIEW default.kafka_data
(
    `i` Int64,
    `s` String
)
ENGINE = MergeTree
ORDER BY (`i`)
AS
SELECT
    `i`,
    `s`
FROM default.kafka_engine
WHERE length(_error) = 0

CREATE MATERIALIZED VIEW default.kafka_errors
(
    `topic` String,
    `partition` Int64,
    `offset` Int64,
    `raw` String,
    `error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
    _topic AS topic,
    _partition AS partition,
    _offset AS offset,
    _raw_message AS raw,
    _error AS error
FROM default.kafka_engine
WHERE length(_error) > 0

讓 JDBC 支持插入二維數組

JDBC 無法支持二維數組,但是許多業務的的確確需要用到二維數組,除了換語言還可以使用物化視圖。

創建一個 Null 表使用 JDBC 支持的數據格式 String 傳輸嵌套結構的字符串,然後通過物化視圖解析插入到最終表:

CREATE TABLE IF NOT EXISTS entry (
    json_str String
)ENGINE = Null;

CREATE TABLE IF NOT EXISTS dest (
    two_diemnsional_array Array(Array(String))
)ENGINE = MergeTree()
ORDER BY tuple();

CREATE MATERIALIZED VIEW mv_dest TO dest
AS
SELECT
    JSONExtract(json_str, 'Array(Array(String))') as two_diemnsional_array
FROM entry;

多維表增量預聚合

ClickHouse 作爲 OLAP 數據庫經常使用多維表、大寬表的 schema,可是原始表直接用於多維分析,需要存儲的數據量過大,自然就想到用預聚合減少數據量。

物化視圖中的 GROUP BY 是針對每一個 Batch 而言的(流處理),當時間緯度橫跨很大,單單一個物化視圖恐怕不能很好地將數據聚合。於是可以考慮使用 SummingMergeTree/AggregatingMergeTree 實現先插入後增量聚合。

除此之外,對於高基數字段,比如用戶 id(user_id)、設備 id(device_id)這一類列,需要聚合時有不同場景的考量:

資源使用當然就是:概率統計基數 < 精確統計基數 << 保留每個元素

下面給出一個例子,高基數字段只用於統計基數,可以接受誤差:

CREATE TABLE IF NOT EXISTS event  -- 原始表
(
    `app_id` LowCardinality(String) CODEC(ZSTD(9)),
    `time` Int64 CODEC(Delta, ZSTD(9)),
    `user_id` String CODEC(Delta, ZSTD(9)),
    `device_id` String CODEC(Delta, ZSTD(9)),
    `d1` String CODEC(ZSTD(9)),
    `d2` String CODEC(ZSTD(9)),
    `d3` String CODEC(ZSTD(9)),
    `d4` String CODEC(ZSTD(9)),
    `d5` String CODEC(ZSTD(9)),
    `d6` String CODEC(ZSTD(9)),
    `v1` Int64 CODEC(T64, ZSTD(9)),
    `v2` Int64 CODEC(T64, ZSTD(9)),
    `v3` Int64 CODEC(T64, ZSTD(9)),
    `v4` Int64 CODEC(T64, ZSTD(9)),
    `v5` Int64 CODEC(T64, ZSTD(9)),
    `v6` Int64 CODEC(T64, ZSTD(9))
)ENGINE = MergeTree()
PARTITION BY intDiv(time, 2592000000)
ORDER BY (app_id, time)
TTL toDate(intDiv(time, 1000)) + toIntervalMonth(1)
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1, use_minimalistic_part_header_in_zookeeper = 1;

CREATE TABLE IF NOT EXISTS event_agg_5min
(
    `app_id` LowCardinality(String) CODEC(ZSTD(9)),
    `time` Int64 CODEC(Delta, ZSTD(9)),
    `user_cnt` AggregateFunction(uniq, String) CODEC(ZSTD(9)),
    `device_cnt` AggregateFunction(uniq, String) CODEC(ZSTD(9)),
    `cnt` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
    `d1` String CODEC(ZSTD(9)),
    `d2` String CODEC(ZSTD(9)),
    `d3` String CODEC(ZSTD(9)),
    `d4` String CODEC(ZSTD(9)),
    `d5` String CODEC(ZSTD(9)),
    `d6` String CODEC(ZSTD(9)),
    `v1` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
    `v2` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
    `v3` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
    `v4` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
    `v5` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
    `v6` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9))
)
ENGINE = AggregatingMergeTree()  -- 落盤後增量預聚合
PARTITION BY intDiv(time, 2592000000)
PRIMARY KEY (app_id, time)
ORDER BY (app_id, time, d1, d2, d3, d4, d5, d6)  -- ORDER BY 需要包含所有“標籤”列
TTL toDate(intDiv(time, 1000)) + toIntervalMonth(1)
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1, use_minimalistic_part_header_in_zookeeper = 1;

CREATE MATERIALIZED VIEW IF NOT EXISTS mv_event_agg_5min TO event_agg_5min AS
SELECT
    app_id,
    intDiv(entrance_time, 300000) * 300000 AS entrance_time,  -- 5 分鐘聚合
    uniqState(user_id) AS user_cnt,  -- AggregateFunction使用 -State 聚合
    uniqState(device_id) AS device_cnt,  -- 注意 AS 重命名
    count(*) AS cnt,
    d1,
    d2,
    d3,
    d4,
    d5,
    d6,
    sum(v1) as v1,  -- SimpleAggregateFunction 直接使用聚合函數
    sum(v2) as v2,
    sum(v3) as v3,
    sum(v4) as v4,
    sum(v5) as v5,
    sum(v6) as v6
FROM event
GROUP BY  -- 單個 Block 內預聚合
    app_id,
    category,
    d1,
    d2,
    d3,
    d4,
    d5,
    d6;

參考:

[1]: Abraham Silberschatz, Henry F. Korth, and S. Sudarshan, Database System Concepts, Seventh edition (New York, NY: McGraw-Hill, 2020). P47 

[11]: Denis Zhuravlev and Denny Crane, “Everything You Should Know about Materialized Views.,” n.d. 

[13]:https://www.postgresql.org/docs/9.4/sql-refreshmaterializedview.html [^14]:https://kb.altinity.com/altinity-kb-integrations/altinity-kb-kafka/error-handling/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/7RxBg1mxdpffcpWp02TJ7w