Hudi 集成 - Apache Hudi 集成 Spark SQL 搶先體驗

1. 摘要

社區小夥伴一直期待的 Hudi 整合 Spark SQL 的 HUDI-1659 正在積極 Review 中並已經快接近尾聲,Hudi 集成 Spark SQL 預計會在下個版本正式發佈,在集成 Spark SQL 後,會極大方便用戶對 Hudi 表的 DDL/DML 操作,下面來看看如何使用 Spark SQL 操作 Hudi 表。

2. 環境準備

首先需要將 HUDI-1659 拉取到本地打包,生成SPARK_BUNDLE_JAR(hudi-spark-bundle_2.11-0.9.0-SNAPSHOT.jar)

2.1 啓動 spark-sql

在配置完 spark 環境後可通過如下命令啓動 spark-sql

spark-sql --jars $PATH_TO_SPARK_BUNDLE_JAR  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

2.2 設置併發度

由於 Hudi 默認 upsert/insert/delete 的併發度是 1500,對於演示的小規模數據集可設置更小的併發度。

set hoodie.upsert.shuffle.parallelism = 1;
set hoodie.insert.shuffle.parallelism = 1;
set hoodie.delete.shuffle.parallelism = 1;

同時設置不同步 Hudi 表元數據

set hoodie.datasource.meta.sync.enable=false;

3. Create Table

使用如下 SQL 創建表

create table test_hudi_table (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
 partitioned by (dt)
 options (
  primaryKey = 'id',
  type = 'mor'
 )
 location 'file:///tmp/test_hudi_table'

說明:表類型爲 MOR,主鍵爲 id,分區字段爲 dt,合併字段默認爲 ts。

創建 Hudi 表後查看創建的 Hudi 表

show create table test_hudi_table

4. Insert Into

4.1 Insert

使用如下 SQL 插入一條記錄

 insert into test_hudi_table select 1 as id, 'hudi' as name, 10 as price, 1000 as ts, '2021-05-05' as dt

insert 完成後查看 Hudi 表本地目錄結構,生成的元數據、分區和數據與 Spark Datasource 寫入均相同。

4.2 Select

使用如下 SQL 查詢 Hudi 表數據

查詢結果如下

5. Update

5.1 Update

使用如下 SQL 將 id 爲 1 的 price 字段值變更爲 20

5.2 Select

再次查詢 Hudi 表數據

select * from test_hudi_table

查詢結果如下,可以看到 price 已經變成了 20.0

查看 Hudi 表的本地目錄結構如下,可以看到在 update 之後又生成了一個deltacommit,同時生成了一個增量 log 文件。

6. Delete

6.1 Delete

使用如下 SQL 將 id=1 的記錄刪除

delete from test_hudi_table where id = 1

查看 Hudi 表的本地目錄結構如下,可以看到 delete 之後又生成了一個deltacommit,同時生成了一個增量 log 文件。

6.2 Select

再次查詢 Hudi 表

查詢結果如下,可以看到已經查詢不到任何數據了,表明 Hudi 表中已經不存在任何記錄了。

7. Merge Into

7.1 Merge Into Insert

使用如下 SQL 向test_hudi_table插入數據

 merge into test_hudi_table as t0
 using (
  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
 ) as s0
 on t0.id = s0.id
 when not matched and s0.id % 2 = 1 then insert *

7.2 Select

查詢 Hudi 表數據

select * from test_hudi_table

查詢結果如下,可以看到 Hudi 表中存在一條記錄

7.4 Merge Into Update

使用如下 SQL 更新數據

 merge into test_hudi_table as t0
 using (
  select 1 as id, 'a1' as name, 12 as price, 1001 as ts, '2021-03-21' as dt
 ) as s0
 on t0.id = s0.id
 when matched and s0.id % 2 = 1 then update set *

7.5 Select

查詢 Hudi 表

select * from test_hudi_table

查詢結果如下,可以看到 Hudi 表中的分區已經更新了

7.6 Merge Into Delete

使用如下 SQL 刪除數據

merge into test_hudi_table t0
 using (
  select 1 as s_id, 'a2' as s_name, 15 as s_price, 1001 as s_ts, '2021-03-21' as dt
 ) s0
 on t0.id = s0.s_id
 when matched and s_ts = 1001 then delete

查詢結果如下,可以看到 Hudi 表中已經沒有數據了

8. 刪除表

使用如下命令刪除 Hudi 表

drop table test_hudi_table;

使用 show tables 查看錶是否存在

show tables;

可以看到已經沒有表了

9. 總結

通過上面示例簡單展示了通過 Spark SQL Insert/Update/Delete Hudi 表數據,通過 SQL 方式可以非常方便地操作 Hudi 表,降低了使用 Hudi 的門檻。另外 Hudi 集成 Spark SQL 工作將繼續完善語法,儘量對標 Snowflake 和 BigQuery 的語法,如插入多張表(INSERT ALL WHEN condition1 INTO t1 WHEN condition2 into t2),變更 Schema 以及 CALL Cleaner、CALL Clustering 等 Hudi 表服務。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/KoX5CBwx-pOPsbCBkMkMiQ