深入淺出 ClickHouse 物化視圖
作者:oliver
雖然官方文檔記錄了 ClickHouse 物化視圖很多詳細信息,但是使用物化視圖還是有很多小細節需要注意,更別說一些最佳實踐。本文總結了 ClickHouse 物化視圖使用上的各種問題,並展示三個實際案例。
存儲過程與觸發器
存儲過程:預編譯好的一組 SQL 程序,類似 無返回結果 的函數。
強調無返回是爲了和真正的 FUNCTION 區分開,這個有返回結果。
觸發器:特殊存儲過程,監聽特定事件自動調用。
數據庫查詢語言(query language)是數據庫管理系統(DBMS)提供給用戶和數據庫交互的工具,查詢語言分爲三類 [^1]:
-
命令式(Imperative):用戶控制系統一步步執行操作,計算、獲取數據。在計算過程中包含了可變的狀態變量。
-
函數式(Functional):用戶調用一系列函數鏈式執行計算、獲取數據。在計算過程中不包含狀態變量,無副作用。
-
聲明式(Non-Procedural/Declarative):用戶只聲明需要的數據,由數據庫管理系統實現計算過程並返回數據。
[!TIP] 三類查詢語言並不是邊界分明 工程中的查詢語言,會同時包含多種查詢語言的特性。[^2]
人們往往認爲 SQL 是用於關係模型(Relational Model)數據庫的聲明式查詢語言,但這個世界並不是非黑即白,聲明式語言雖然降低用戶學習成本,但數據庫承擔了檢查(詞法分析)、翻譯(編譯)、優化最後執行的過程。但如果業務需要一遍又一遍執行某一段相同的邏輯,每次都要重新走一遍流程,顯然不可接受。於是各大關係數據庫系統幾乎都引入了過程擴展,比如 PG 使用的 PL/pgSQL[^3],它包含變量定義、條件控制和循環等等過程式語言的元素。
那麼引入本節的主角:存儲過程(Stored Procedure),預先編譯好的一段邏輯(用過程語言),可以大大加快執行速度。
而觸發器(Trigger)則是一種特殊的存儲過程,它監聽某些數據庫事件,可以在事件發生前 / 中 / 後調用。[^4]
從事件類型上看,觸發器分爲:
-
DDL 觸發器
-
DML 觸發器
從觸發動作上看 [^5],觸發器分爲:
-
事前、事後觸發器(BEFORE、AFTER)
-
替換觸發器(INSTEAD OF)
那麼觸發器有什麼業務場景呢?舉個最簡單的例子,記錄某張表的審計日誌(Audit Log),把所有 DML 操作都通過觸發器記錄下來。
ClickHouse 物化視圖
ClickHouse 作爲關係型 OLAP(OnLine Analytical Processing)數據庫,很遺憾不支持存儲過程。[^6]
[!TIP] ClickHouse 存儲過程的實現狀況 在 2023 年 Roadmap 中 Experimental features and research 部分可以看到 refreshable materialized views,有生之年
但非常有意思的是,ClickHouse 提供了物化視圖(Materialized View)的特殊功能,在功能上相當於 AFTER INSERT 觸發器,物化視圖仍然使用 聲明式 SQL 定義計算邏輯。
源碼閱讀
[!TIP] 提示 可以直接跳到 總結 部分。
[!TIP] ClickHouse 版本 本文源碼閱讀基於 ClickHouse 22.3 版本
StorageMaterializedView
首先看到物化視圖的類聲明 src/Storages/StorageMaterializedView.h
:
class StorageMaterializedView final : public IStorage, WithMutableContext
{
public:
...
private:
/// Will be initialized in constructor
StorageID target_table_id = StorageID::createEmpty();
bool has_inner_table = false;
...
}
可以看到物化視圖繼承自 IStorage
類,從它的類註釋中可以看到它管理的功能。物化視圖和 StorageMerge
一樣都繼承自這個管理數據存儲的類,作爲一個視圖,莫非也有實際存儲?此外,物化視圖用 target_table_id
存儲了別的表的 id。接下來看看 IStorage
的類註解:
/** Storage. Describes the table. Responsible for
* - storage of the table data;
* - the definition in which files (or not in files) the data is stored;
* - data lookups and appends;
* - data storage structure (compression, etc.)
* - concurrent access to data (locks, etc.)
*/
如果讀物化視圖會發生什麼?跳轉到重載 IStorage
的 StorageMaterializedView::read
方法定義:
void StorageMaterializedView::read(
...)
{
auto storage = getTargetTable();
...
storage->read(query_plan, column_names, target_storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
...
}
以及 StorageMaterializedView::getTargetTable
方法定義:
StoragePtr StorageMaterializedView::getTargetTable() const
{
checkStackSize();
return DatabaseCatalog::instance().getTable(target_table_id, getContext());
}
讀操作是對 target_table_id
對應的表進行的,那麼就清晰了,物化視圖並不會存儲數據,會將查詢重定向到目標表。
做個實驗簡單驗證:
create table test( time DateTime) Engine=Memory();
create table source(time DateTime) Engine=Memory();
create materialized view mv_test to test as select time from source;
insert into table source values(now());
select * from test;
┌────────────────time─┐
│ 2023-02-25 18:51:41 │
└─────────────────────┘
select * from mv_test;
┌────────────────time─┐
│ 2023-02-25 18:51:41 │
└─────────────────────┘
explain select * from test;
┌─explain───────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY)) │
│ SettingQuotaAndLimits (Set limits and quota after reading from storage) │
│ ReadFromStorage (Memory) │
└───────────────────────────────────────────────────────────────────────────┘
explain select * from mv_test;
┌─explain───────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY)) │
│ SettingQuotaAndLimits (Set limits and quota after reading from storage) │
│ SettingQuotaAndLimits (Lock destination table for MaterializedView) │
│ ReadFromStorage (Memory) │
└───────────────────────────────────────────────────────────────────────────┘
注意 Lock destination table for MaterializedView
,符合猜測。
接着寫操作會發生什麼?猜測也會重定向到目標表,看看 StorageMaterializedView::write
方法的定義:
SinkToStoragePtr StorageMaterializedView::write(...)
{
auto storage = getTargetTable();
...
auto sink = storage->write(query, metadata_snapshot, local_context);
...
}
再做個小實驗驗證:
insert into table mv_test values(now());
select * from test;
┌────────────────time─┐
│ 2023-02-25 18:51:41 │
└─────────────────────┘
┌────────────────time─┐
│ 2023-02-25 19:11:20 │
└─────────────────────┘
符合猜測。
解答完疑惑,回到正常閱讀順序來,接下來閱讀構造器的代碼 src/Storages/StorageMaterializedView.cpp
:
StorageMaterializedView::StorageMaterializedView(
const StorageID & table_id_,
ContextPtr local_context,
const ASTCreateQuery & query,
const ColumnsDescription & columns_,
bool attach_,
const String & comment)
: IStorage(table_id_), WithMutableContext(local_context->getGlobalContext())
{
...
/// If the destination table is not set, use inner table
has_inner_table = query.to_table_id.empty(); // 出現了新概念 inner table
if (has_inner_table && !query.storage) // 創建物化視圖時,要麼有 ENGINE 使用 inner table,要麼用 TO 使用外部表
throw Exception(
"You must specify where to save results of a MaterializedView query: either ENGINE or an existing table in a TO clause",
ErrorCodes::INCORRECT_QUERY);
if (query.select->list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for MATERIALIZED VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW);
...
// 設置 to_table_id
if (!has_inner_table)
{
target_table_id = query.to_table_id;
}
else if (attach_)
{
/// If there is an ATTACH request, then the internal table must already be created.
target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()), query.to_inner_uuid);
}
else // 創建inner table
{
/// We will create a query to create an internal table.
...
target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->getDatabase(), manual_create_query->getTable()}, getContext())->getStorageID(); // 看來 ClickHouse 有個全局表註冊表
}
}
可以看到:
-
物化視圖創建時需要指定目標表,不然會自己創建 inner 表
-
物化視圖不能使用
UNION
-
ClickHouse 系統有個表的 “註冊表”,維護系統所有表的 id- 實例映射
IInterpreter、InterpreterInsertQuery
那麼下一個問題,對原始表插入數據,數據又怎麼經過物化視圖跑到目標表的?
首先關注查詢類 src/Interpreters/IInterpreter.h
:
/** Interpreters interface for different queries.
*/
class IInterpreter
{
public:
/** For queries that return a result (SELECT and similar), sets in BlockIO a stream from which you can read this result.
* For queries that receive data (INSERT), sets a thread in BlockIO where you can write data.
* For queries that do not require data and return nothing, BlockIO will be empty.
*/
virtual BlockIO execute() = 0;
...
}
當插入(INSERT)數據時,系統會調用 IInterpreter
的子類 src/Interpreters/InterpreterInsertQuery.cpp
處理查詢,先看到它的聲明:
/** Interprets the INSERT query.
*/
class InterpreterInsertQuery : public IInterpreter, WithContext
{
public:
...
/** Prepare a request for execution. Return block streams
* - the stream into which you can write data to execute the query, if INSERT;
* - the stream from which you can read the result of the query, if SELECT and similar;
* Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result).
*/
BlockIO execute() override;
...
private
...
Chain buildChainImpl(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder,
std::atomic_uint64_t * elapsed_counter_ms);
};
繼續看它的定義(只看 INSERT 分支):
BlockIO InterpreterInsertQuery::execute()
{
...
StoragePtr table = getTable(query);
...
StoragePtr inner_table;
if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get())) // 如果 insert query 指向的表是 StorageMaterializedView,目標表取出放到 inner_table 變量中
inner_table = mv->getTargetTable();
...
std::vector<Chain> out_chains;
if (!distributed_pipeline || query.watch)
{
size_t out_streams_size = 1;
if (query.select) // 處理 INSERT SELECT,忽略
{
...
}
else if (query.watch) // 處理 LIVE VIEW 的 WATCH 語句,直接忽略即可
{
...
}
for (size_t i = 0; i < out_streams_size; ++i)
{
auto out = buildChainImpl(table, metadata_snapshot, query_sample_block, nullptr, nullptr); // 構建 chain,重要!!!
out_chains.emplace_back(std::move(out));
}
}
BlockIO res;
/// What type of query: INSERT or INSERT SELECT or INSERT WATCH?
if (distributed_pipeline)
{
res.pipeline = std::move(*distributed_pipeline);
}
else if (query.select || query.watch)
{
... // 直接忽略
}
else // 關注這個分支,query 是 INSERT 時
{
res.pipeline = QueryPipeline(std::move(out_chains.at(0))); // 將 chain 第一個元素構造返回 BlockIO 的 pushing pipeline
res.pipeline.setNumThreads(std::min<size_t>(res.pipeline.getNumThreads(), settings.max_threads)); // 設置 query 的配置
if (query.hasInlinedData() && !async_insert)
{ // 也就是 INSERT 語句帶了 VALUES (...),可以直接從語句中拿到要插入的數據
/// can execute without additional data
auto pipe = getSourceFromASTInsertQuery(query_ptr, true, query_sample_block, getContext(), nullptr);
res.pipeline.complete(std::move(pipe));
}
}
res.pipeline.addResources(std::move(resources));
res.pipeline.addStorageHolder(table); // 將 query 的目標表放入 pipeline 資源列表
if (inner_table) // 如果有物化視圖
res.pipeline.addStorageHolder(inner_table); // 把物化視圖的目標表也放到 pipeline 的資源列表
return res;
}
可以看到方法內調用了 InterpreterInsertQuery::buildChainImpl
,接着看這個方法的定義:
Chain InterpreterInsertQuery::buildChainImpl(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder,
std::atomic_uint64_t * elapsed_counter_ms)
{
...
/// We create a pipeline of several streams, into which we will write data.
Chain out;
/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyed
out.addInterpreterContext(context_ptr);
/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
/// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
if (table->noPushingToViews() && !no_destination) // table->noPushingToViews() 用於禁止物化視圖插入數據到 KafkaEngine
{
auto sink = table->write(query_ptr, metadata_snapshot, context_ptr);
sink->setRuntimeData(thread_status, elapsed_counter_ms);
out.addSource(std::move(sink));
}
else // 構建物化視圖插入 pushingToViewChain,重點!!!
{
out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, query_ptr, no_destination, thread_status_holder, elapsed_counter_ms);
}
...
return out;
}
Chain 相關
接着來到文件 src/Processors/Transforms/buildPushingToViewsChain.cpp
:
Chain buildPushingToViewsChain(
const StoragePtr & storage,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const ASTPtr & query_ptr,
bool no_destination,
ThreadStatusesHolderPtr thread_status_holder,
std::atomic_uint64_t * elapsed_counter_ms,
const Block & live_view_header)
{
...
auto table_id = storage->getStorageID();
Dependencies dependencies = DatabaseCatalog::instance().getDependencies(table_id); // 重點,通過 table_id,拿到“依賴“這個表的 dependencies
/// We need special context for materialized views insertions
ContextMutablePtr select_context;
ContextMutablePtr insert_context;
ViewsDataPtr views_data;
if (!dependencies.empty())
{
... // 把query的各種上下文拆出
}
std::vector<Chain> chains;
for (const auto & database_table : dependencies)
{
auto dependent_table = DatabaseCatalog::instance().getTable(database_table, context);
auto dependent_metadata_snapshot = dependent_table->getInMemoryMetadataPtr();
ASTPtr query;
Chain out;
...
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get())) // 依賴關係是 MATERIALIZED VIEW
{
type = QueryViewsLogElement::ViewType::MATERIALIZED;
result_chain.addTableLock(materialized_view->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout));
StoragePtr inner_table = materialized_view->getTargetTable(); // 拿到物化視圖的目標表
auto inner_table_id = inner_table->getStorageID();
auto inner_metadata_snapshot = inner_table->getInMemoryMetadataPtr();
query = dependent_metadata_snapshot->getSelectQuery().inner_query;
target_name = inner_table_id.getFullTableName();
/// Get list of columns we get from select query.
auto header = InterpreterSelectQuery(query, select_context, SelectQueryOptions().analyze())
.getSampleBlock();
/// Insert only columns returned by select.
Names insert_columns;
const auto & inner_table_columns = inner_metadata_snapshot->getColumns();
for (const auto & column : header)
{
/// But skip columns which storage doesn't have.
if (inner_table_columns.hasPhysical(column.name)) // 注意,是通過列名匹配的,而不是位置,這在使用物化視圖時很容易犯錯
insert_columns.emplace_back(column.name);
}
InterpreterInsertQuery interpreter(nullptr, insert_context, false, false, false); // 將物化視圖的插入邏輯也作爲 InterpreterInsertQuery 處理
out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms);
out.addStorageHolder(dependent_table);
out.addStorageHolder(inner_table);
}
else if (auto * live_view = dynamic_cast<StorageLiveView *>(dependent_table.get())) // 依賴關係是 LIVE VIEW,忽略
{
...
}
else if (auto * window_view = dynamic_cast<StorageWindowView *>(dependent_table.get())) // 依賴關係是 WINDOW VIEW,忽略
{
...
}
else
out = buildPushingToViewsChain(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), false, thread_status_holder, view_counter_ms); // 我理解這裏是級聯物化視圖分支
views_data->views.emplace_back(ViewRuntimeData{ //-V614
std::move(query),
out.getInputHeader(),
database_table,
nullptr,
std::move(runtime_stats)});
if (type == QueryViewsLogElement::ViewType::MATERIALIZED)
{
auto executing_inner_query = std::make_shared<ExecutingInnerQueryFromViewTransform>(
storage_header, views_data->views.back(), views_data);
executing_inner_query->setRuntimeData(view_thread_status, view_counter_ms);
out.addSource(std::move(executing_inner_query));
}
chains.emplace_back(std::move(out));
/// Add the view to the query access info so it can appear in system.query_log
if (!no_destination)
{
context->getQueryContext()->addQueryAccessInfo(
backQuoteIfNeed(database_table.getDatabaseName()), views_data->views.back().runtime_stats->target_name, {}, "", database_table.getFullTableName());
}
}
...
if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get()))
{
...
}
else if (auto * window_view = dynamic_cast<StorageWindowView *>(storage.get()))
{
...
}
/// Do not push to destination table if the flag is set
else if (!no_destination) // 物化視圖寫入邏輯
{
auto sink = storage->write(query_ptr, metadata_snapshot, context); // 注意,第一個參數是傳入的 query_ptr,也就是說物化視圖的數據同樣直接來自於查詢,而不是依賴表
metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName());
sink->setRuntimeData(thread_status, elapsed_counter_ms);
result_chain.addSource(std::move(sink));
}
...
return result_chain;
}
注意到 DatabaseCatalog::instance().getDependencies(table_id)
(在文件 src/Interpreters/DatabaseCatalog.cpp
)獲取 “依賴” 在這個表的關係 dependencies,查看源碼:
Dependencies DatabaseCatalog::getDependencies(const StorageID & from) const
{
std::lock_guard lock{databases_mutex};
auto iter = view_dependencies.find({from.getDatabaseName(), from.getTableName()});
if (iter == view_dependencies.end())
return {};
return Dependencies(iter->second.begin(), iter->second.end()); // 查找到的 dependencies set 按照順序塞入 vector 返回
}
在頭文件聲明瞭 view_dependencies
和它的類型:
/// Table -> set of table-views that make SELECT from it.
using ViewDependencies = std::map<StorageID, std::set<StorageID>>;
using Dependencies = std::vector<StorageID>;
...
/// For some reason Context is required to get Storage from Database object
class DatabaseCatalog : boost::noncopyable, WithMutableContext
{
public:
...
private:
ViewDependencies view_dependencies TSA_GUARDED_BY(databases_mutex);
}
這幾個函數的參數是 StorageID
(在文件 src/Interpreters/StorageID.h
),可以看到它的聲明:
struct StorageID
{
String database_name;
String table_name;
UUID uuid = UUIDHelpers::Nil;
...
private:
...
};
由於 ViewDependencies
這個 map 的 value 是 std::set
,在 cpp 中 std::set
的元素會用 std::set::key_comp
方法來排序 [^7],因此物化視圖的處理將按照字母順序。
總結
可以看到:
-
數據插入時,先處理原始表插入,再處理物化視圖的插入。
![[mv-insert-internally.excalidraw]]
-
有多個物化視圖時,按照字母順序依次處理。
-
當設置
parallel_view_processing=1
時,物化視圖並行處理 -
物化視圖不會讀取源表數據,而是插入時同一份數據依次插入源表、目標表。
-
物化視圖相當於 AFTER INSERT TRIGGER,對於目標表而言,不存在任何視圖概念,它只看到一個個 INSERT 查詢。
-
物化視圖可以級聯。
FAQ
前文通過源碼閱讀了解了物化視圖的底層邏輯,接下來從使用者的角度繼續分析物化視圖。
物化視圖使用場景
-
數據預聚合 / 數據增量聚合
-
數據預處理 / ET(Extract-Transform)
-
以另一組 ORDER BY 存儲數據(模擬方向索引)
-
KafkaEngine
[!TIP] 真正的方向索引 ClickHouse 將在 23.1 引入真正的反向索引能力。[^8]
創建物化視圖
先看到官方文檔的 SQL:
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE] AS SELECT ...
有兩種方式創建物化視圖:
-
有
ENGINE
關鍵詞,ClickHouse 將創建隱式表(Implicit Table)作爲目標表 -
有
TO
關鍵詞,需要用戶預先創建目標表
使用 ENGINE
時,ClickHouse 除了創建物化視圖,還會創建一個名爲 .inner.物化視圖名
的隱式表,隱式表其實就是正常的表只不過它以 .
開頭,直接使用它需要反引號 / 雙引號括起來。
POPULATE
只有使用隱式表時生效,它會在 ClickHouse 創建物化視圖後,將原始表 所有 的歷史數據全部處理寫入隱式表。如果原始表有海量數據,將使用大量資源、持續較長時間。
[!TIP]
TO
如何插入歷史數據 手動執行INSERT ... SELECT
,最好按照_partition_id
、_part
虛擬列分片插入。[^9]
這兩種方式有使用上的優劣區別:
因此建議使用 TO
創建物化視圖。
[!ERROR] 物化視圖不會讀源表 物化視圖和原始表磁盤上的數據沒有半點關係,換句話說:
原始表是
SummingMergeTree
、ReplacingMergeTree
等等時,物化視圖不會 “看” 到處理後的數據在原始表上的 DML 不會影響到物化視圖和目標表
[!ERROR] 物化視圖使用列名插入數據 物化視圖通過列名插入數據而不是位置
CREATE MATERIALIZED VIEW mv ( a Int64, d Date, cnt Int64 ) ENGINE = SummingMergeTree PARTITION BY toYYYYMM(d) ORDER BY (a,d) POPULATE AS SELECT a, d, count() AS cnt -- 一定要注意 AS cnt FROM source GROUP BY a, d;
數據副本碰上物化視圖
使用 ReplicatedMergeTree
家族的 Engine 和物化視圖時,物化視圖還能正常工作嗎?
不同 shard 之間不用考慮,因爲數據不相同,這裏只考慮同一個 shard 不同 replica 的情況:
需要注意,插入只會發生在一個節點,所以作爲插入觸發器的物化視圖也只會在插入發生的節點被觸發,接着由 Replicated
的同步機制把物化視圖目標表的數據同步到另一個 Replica。
所以沒問題~
分佈式表碰上物化視圖
現在假設一個場景,有 4 個 node,2 個 shard、2 個 replica,每個節點有個 source 本地表和 dest 本地表,並註冊了 source_dist 和 dest_dist 兩個分佈式表。我想要實現插入 source 的數據都進入到 dest,應該如何設計物化視圖?
排列組合一下,有下面四種方式:
-
source -> dest
-
source_dist -> dest_dist
-
source -> dest_dist
-
source_dist -> dest
答案是前三種可以滿足要求。但首推第一種,沒有網絡開銷,數據在節點內部處理、存儲。第二種、第三種只有在需要數據被打散分佈時使用,比如所 source 表根據用戶 id(user_id)分 shard,結果表想通過設備 id(device_id)分 shard。
第四種會導致所有 source 的數據都出現在每個節點,一般而言是錯誤使用。
Join 碰上物化視圖
絕對避免在物化視圖中使用 join,ClickHouse 使用 HashJoin,插入的每個 Block 都會導致物化視圖創建一個 hash 表,最終導致插入又重又慢。
可以通過可複用的數據結構實現 join 的能力 [^11]:
物化視圖級聯
物化視圖可以通過級聯(Cascade)串起來:
需要注意的是,級聯只能是不同物化視圖的 計算邏輯,比如第一個物化視圖 GROUP BY,第二個物化視圖 FILTER,與目標表沒有任何關係。設計物化視圖級聯時,大可以把前面物化視圖的目標表當作 Null
表,避免干擾。
PG 物化視圖對比
介紹完 ClickHouse 物化視圖,當然要對比下傳統 OLTP 關係型數據庫的物化視圖功能。
物化視圖案例
下文給出幾個物化視圖的真實案例。
KafkaEngine
KakfaEngine
因爲很難錯誤調試被人詬病,比如在 21.6 版本之前,KafkaEngine
解析數據出錯只能通過 input_format_skip_unknown_fields
設置跳過 N 條錯誤消息,然後在系統日誌中查詢記錄:
select * from system.text_log where logger_name like '%Kafka%'
但這個 PR 被合入後有了新的錯誤檢查方法,給 KafkaEngine
新增一個配置 kafka_handle_error_mode='stream'
,每條消息將帶上 _error
和 _raw_message
兩個虛擬列。
SQL 代碼如下 [^14]:
CREATE TABLE default.kafka_engine
(
`i` Int64,
`s` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092'
kafka_topic_list = 'topic',
kafka_group_name = 'clickhouse',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode='stream';
CREATE MATERIALIZED VIEW default.kafka_data
(
`i` Int64,
`s` String
)
ENGINE = MergeTree
ORDER BY (`i`)
AS
SELECT
`i`,
`s`
FROM default.kafka_engine
WHERE length(_error) = 0
CREATE MATERIALIZED VIEW default.kafka_errors
(
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM default.kafka_engine
WHERE length(_error) > 0
讓 JDBC 支持插入二維數組
JDBC 無法支持二維數組,但是許多業務的的確確需要用到二維數組,除了換語言還可以使用物化視圖。
創建一個 Null 表使用 JDBC 支持的數據格式 String 傳輸嵌套結構的字符串,然後通過物化視圖解析插入到最終表:
CREATE TABLE IF NOT EXISTS entry (
json_str String
)ENGINE = Null;
CREATE TABLE IF NOT EXISTS dest (
two_diemnsional_array Array(Array(String))
)ENGINE = MergeTree()
ORDER BY tuple();
CREATE MATERIALIZED VIEW mv_dest TO dest
AS
SELECT
JSONExtract(json_str, 'Array(Array(String))') as two_diemnsional_array
FROM entry;
多維表增量預聚合
ClickHouse 作爲 OLAP 數據庫經常使用多維表、大寬表的 schema,可是原始表直接用於多維分析,需要存儲的數據量過大,自然就想到用預聚合減少數據量。
物化視圖中的 GROUP BY 是針對每一個 Batch 而言的(流處理),當時間緯度橫跨很大,單單一個物化視圖恐怕不能很好地將數據聚合。於是可以考慮使用 SummingMergeTree/AggregatingMergeTree 實現先插入後增量聚合。
除此之外,對於高基數字段,比如用戶 id(user_id)、設備 id(device_id)這一類列,需要聚合時有不同場景的考量:
-
若只用於統計基數
-
精確統計:bitmap
-
概率統計:
uniqState
+uniqMerge
-
需要保留每一個元素用於 filter:set/array
資源使用當然就是:概率統計基數 < 精確統計基數 << 保留每個元素
下面給出一個例子,高基數字段只用於統計基數,可以接受誤差:
CREATE TABLE IF NOT EXISTS event -- 原始表
(
`app_id` LowCardinality(String) CODEC(ZSTD(9)),
`time` Int64 CODEC(Delta, ZSTD(9)),
`user_id` String CODEC(Delta, ZSTD(9)),
`device_id` String CODEC(Delta, ZSTD(9)),
`d1` String CODEC(ZSTD(9)),
`d2` String CODEC(ZSTD(9)),
`d3` String CODEC(ZSTD(9)),
`d4` String CODEC(ZSTD(9)),
`d5` String CODEC(ZSTD(9)),
`d6` String CODEC(ZSTD(9)),
`v1` Int64 CODEC(T64, ZSTD(9)),
`v2` Int64 CODEC(T64, ZSTD(9)),
`v3` Int64 CODEC(T64, ZSTD(9)),
`v4` Int64 CODEC(T64, ZSTD(9)),
`v5` Int64 CODEC(T64, ZSTD(9)),
`v6` Int64 CODEC(T64, ZSTD(9))
)ENGINE = MergeTree()
PARTITION BY intDiv(time, 2592000000)
ORDER BY (app_id, time)
TTL toDate(intDiv(time, 1000)) + toIntervalMonth(1)
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1, use_minimalistic_part_header_in_zookeeper = 1;
CREATE TABLE IF NOT EXISTS event_agg_5min
(
`app_id` LowCardinality(String) CODEC(ZSTD(9)),
`time` Int64 CODEC(Delta, ZSTD(9)),
`user_cnt` AggregateFunction(uniq, String) CODEC(ZSTD(9)),
`device_cnt` AggregateFunction(uniq, String) CODEC(ZSTD(9)),
`cnt` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
`d1` String CODEC(ZSTD(9)),
`d2` String CODEC(ZSTD(9)),
`d3` String CODEC(ZSTD(9)),
`d4` String CODEC(ZSTD(9)),
`d5` String CODEC(ZSTD(9)),
`d6` String CODEC(ZSTD(9)),
`v1` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
`v2` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
`v3` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
`v4` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
`v5` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9)),
`v6` SimpleAggregateFunction(sum, Int64) CODEC(T64, ZSTD(9))
)
ENGINE = AggregatingMergeTree() -- 落盤後增量預聚合
PARTITION BY intDiv(time, 2592000000)
PRIMARY KEY (app_id, time)
ORDER BY (app_id, time, d1, d2, d3, d4, d5, d6) -- ORDER BY 需要包含所有“標籤”列
TTL toDate(intDiv(time, 1000)) + toIntervalMonth(1)
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1, use_minimalistic_part_header_in_zookeeper = 1;
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_event_agg_5min TO event_agg_5min AS
SELECT
app_id,
intDiv(entrance_time, 300000) * 300000 AS entrance_time, -- 5 分鐘聚合
uniqState(user_id) AS user_cnt, -- AggregateFunction使用 -State 聚合
uniqState(device_id) AS device_cnt, -- 注意 AS 重命名
count(*) AS cnt,
d1,
d2,
d3,
d4,
d5,
d6,
sum(v1) as v1, -- SimpleAggregateFunction 直接使用聚合函數
sum(v2) as v2,
sum(v3) as v3,
sum(v4) as v4,
sum(v5) as v5,
sum(v6) as v6
FROM event
GROUP BY -- 單個 Block 內預聚合
app_id,
category,
d1,
d2,
d3,
d4,
d5,
d6;
參考:
[1]: Abraham Silberschatz, Henry F. Korth, and S. Sudarshan, Database System Concepts, Seventh edition (New York, NY: McGraw-Hill, 2020). P47
[11]: Denis Zhuravlev and Denny Crane, “Everything You Should Know about Materialized Views.,” n.d.
[13]:https://www.postgresql.org/docs/9.4/sql-refreshmaterializedview.html [^14]:https://kb.altinity.com/altinity-kb-integrations/altinity-kb-kafka/error-handling/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/7RxBg1mxdpffcpWp02TJ7w