Spark 離線開發框架設計與實現

導讀:本文介紹了開發框架的整體設計,隨後對各模塊進行了拆解,重點介紹瞭如何快速實現應用程序的開發,並從設計思路、實現方式、功能介紹及創建方式等角度對通用的數據回溯應用進行了全面介紹,實現了一次環境準備,多數據回溯任務的啓動方案。總之,框架對開發效率、回溯任務的效率與維護成本及代碼管理便捷性都會有顯著的效果。

一、背景

隨着 Spark 以及其社區的不斷髮展,Spark 本身技術也在不斷成熟,Spark 在技術架構和性能上的優勢越來越明顯,目前大多數公司在大數據處理中都傾向使用 Spark。Spark 支持多種語言的開發,如 Scala、Java、Sql、Python 等。

Spark SQL 使用標準的數據連接,與 Hive 兼容,易與其它語言 API 整合,表達清晰、簡單易上手、學習成本低,是開發者開發簡單數據處理的首選語言,但對於複雜的數據處理、數據分析的開發,使用 SQL 開發顯得力不從心,維護成本也非常高,使用高級語言處理會更高效。

在日常的數據倉庫開發工作中,我們除了開發工作外,也涉及大量的數據回溯任務。對於創新型業務來說,口徑變化頻繁、業務迅速迭代,數據倉庫的回溯非常常見,通過回溯幾個月甚至一年是非常普遍的,但傳統的回溯任務方式效率極低,而且需要人力密切關注各任務狀態。

針對目前現狀,我們開發了一套 Spark 離線開發框架,如下表所示,我們例舉了目前存在的問題及解決方案。框架的實現不僅讓開發變得簡單高效,而且對於數據的回溯工作在不需要任何開發的情況下,快速高效地完成大量的回溯工作。

二、框架設計

框架旨在封裝重複的工作,讓開發變得簡單。框架如圖 2-1 所示,主要分爲三個部分,基礎框架、可擴展工具及應用程序,開發者只需關注應用程序即可簡單快速實現代碼開發。

2.1 基礎框架

基礎框架中,我們對於所有類型的應用實現代碼與配置分離機制,資源配置統一以 XML 文件形式保存並由框架解析處理。框架會根據開發者配置的任務使用資源大小,完成了 SparkSession、SparkContext、SparkConf 的創建,同時加載了常用環境變量,開發了通用的 UDF 函數(如常用的 url 參數解析等)。其中 Application 爲所有應用的父類,處理流程如圖所示,開發者只需編寫關注綠色部分即可。

目前,離線框架所支持的常用環境變量如下表所示。

2.2 可擴展工具

可擴展工具中包含了大量的工具類,服務於應用程序及基礎框架,常用有,配置文件解析類,如解析任務資源參數等;數據庫工具類,用於讀寫數據庫;日期工具類,用於日期加減、轉換、識別並解析環境變量等。服務於應用程序的通用工具模塊可統稱爲可擴展工具,這裏不再贅述。

2.3 應用程序

2.3.1 SQL 應用

對於 SQL 應用,只需要創建 SQL 代碼及資源配置即可,應用類爲唯一類(已實現),有且只有一個,供所有 SQL 應用使用,開發者無需關心。如下配置所示,class 爲所有應用的唯一類名,開發者要關心的是 path 中的 sql 代碼及 conf 中該 sql 所使用的資源大小。

<?xml version="1.0" encoding="UTF-8"?>
<project >
    <class>com.way.app.instance.SqlExecutor</class>
    <path>sql文件路徑</path>
  <!--    sparksession conf   -->
    <conf>
        <spark.executor.memory>1G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.driver.memory>1G</spark.driver.memory>
        <spark.executor.instances>20</spark.executor.instances>
    </conf>
</project>

2.3.2 Java 應用

對於複雜的數據處理,SQL 代碼不能滿足需求時,我們也支持 Java 程序的編寫,與 SQL 不同的是,開發者需要創建新的應用類,繼承 Application 父類並實現 run 方法即可,run 方法中開發者只需要關注數據的處理邏輯,對於通用的 SparkSession、SparkContext 等創建及關閉無需關注,框架還幫助開發者封裝了代碼的輸入、輸出邏輯,對於輸入類型,框架支持 HDFS 文件輸入、SQL 輸入等多種輸入類型,開發者只需調用相關處理函數即可。

如下爲一個簡單的 Java 數據處理應用,從配置文件可以看出,仍需配置資源大小,但與 SQL 不同的是,開發者需要定製化編寫對應的 Java 類(class 參數),以及應用的輸入(input 參數)和輸出參數(output 參數),此應用中輸入爲 SQL 代碼,輸出爲 HDFS 文件。從 Test 類實現可以看出,開發者只需三步走:獲取輸入數據、邏輯處理、結果輸出,即可完成代碼編寫。

<?xml version="1.0" encoding="UTF-8"?>
<project >
    <class>com.way.app.instance.ecommerce.Test</class>
    <input>
        <type>table</type>
        <sql>select
            clk_url,
            clk_num
            from test_table
            where event_day='{DATE}'
            and click_pv > 0
            and is_ubs_spam=0
        </sql>
    </input>
    <output>
        <type>afs_kp</type>
        <path>test/event_day={DATE}</path>
    </output>
    <conf>
        <spark.executor.memory>2G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.driver.memory>2G</spark.driver.memory>
        <spark.executor.instances>10</spark.executor.instances>
    </conf>
</project>
package com.way.app.instance.ecommerce;

import com.way.app.Application;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import java.util.Map;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;

public class Test extends Application {
    @Override
    public void run() {
        // 輸入
        Map<String, String> input = (Map<String, String>) property.get("input");
        Dataset<Row> ds = sparkSession.sql(getInput(input)).toDF("url", "num");
        // 邏輯處理(簡單的篩選出url帶有部分站點的日誌)
        JavaRDD<String> outRdd = ds.filter((FilterFunction<Row>) row -> {
            String url = row.getAs("url").toString();
            return url.contains(".jd.com")
                    || url.contains(".suning.com")
                    || url.contains("pin.suning.com")  
                    || url.contains(".taobao.com")
                    || url.contains("detail.tmall.hk")
                    || url.contains(".amazon.cn")
                    || url.contains(".kongfz.com")
                    || url.contains(".gome.com.cn")
                    || url.contains(".kaola.com")
                    || url.contains(".dangdang.com")
                    || url.contains("aisite.wejianzhan.com")
                    || url.contains("w.weipaitang.com");
        })
                .toJavaRDD()
                .map(row -> row.mkString("\001"));
        // 輸出
        Map<String, String> output = (Map<String, String>) property.get("output");
        outRdd.saveAsTextFile(getOutPut(output));
    }
}

2.3.3 數據回溯應用

數據回溯應用是爲解決快速回溯、釋放人力而研發的,使用非常便捷,開發者無需重構任務代碼,與 SQL 應用相同,回溯應用類爲唯一類(已實現),有且只有一個,供所有回溯任務使用,且支持多種回溯方案。

2.3.3.1 方案設計

在日常回溯過程中發現,一次回溯任務存在嚴重的時間浪費,無論以何種方式提交任務,都需要經歷以下執行環境申請及準備的過程:

  1. 在 client 提交 application,首先 client 向 RS 申請啓動 ApplicationMaster

  2. RS 先隨機找到一臺 NodeManager 啓動 ApplicationMaster

  3. ApplicationMaster 向 RS 申請啓動 Executor 的資源

  4. RS 返回一批資源給 ApplicationMaster

  5. ApplicationMaster 連接 Executor

  6. 各個 Executor 反向註冊給 ApplicationMaster

  7. ApplicationMaster 發送 task、監控 task 執行,回收結果

這個過程佔用的時間我們統稱爲執行環境準備,我們提交任務後,經歷如下三個過程:

  1. 執行環境準備

  2. 開始執行代碼

  3. 釋放資源

執行環境準備通常會有 5-20 分鐘的等待時間,以隊列當時的資源情況上下波動,失敗率爲 10% 左右,失敗原因由於隊列、網絡、資源不足等造成的不可抗力因素;代碼執行過程通常失敗率在 5% 左右,通常由於節點不穩定、網絡等因素導致。離線開發框架回溯應用從節省時間和人力兩個方面考慮,設計方案圖 2-3 所示。

從回溯時間方面來看:將所有回溯子任務的第一、第三步的時間壓縮爲一次,即環境準備及釋放各一次,執行多次回溯代碼。若開發者回溯任務爲 30 個子任務,則節省的時間爲 5-20 分鐘乘 29,可見,回溯子任務越多,回溯提效越明顯。

從人工介入方面來看,第一,開發者無需額外開發、添加回溯配置即可。第二,離線框架回溯應用啓動的任務數量遠遠小於傳統回溯方案,以圖 2-3 爲例,該回溯任務爲串行回溯方式,使用框架後只需關注一個任務的執行狀態,而傳統方式則需人工維護 N 個任務的執行狀態。

最後,我們在使用離線開發框架回溯一個一年的串行任務中,代碼的執行只需要 5 分鐘左右,我們發現,不使用離線開發框架回溯的任務在最理想的情況下(即最短時間分配到資源、所有子任務均無失敗情況、一次可以串行啓動 365 天),需要的時間爲 2.5 天,但使用離線開發框架回溯的任務,在最壞的情況下(即最長時間分配到資源,任務失敗情況出現 10%),只需要 6 個小時就可完成,提效 90% 以上,且基本無需人力關注。

2.3.3.2 功能介紹

斷點續回

使用 Spark 計算,我們在享受其計算帶來的飛快速度時,難免會遭遇其中的不穩定性,節點宕機、網絡連接失敗、資源問題帶來的任務失敗屢見不鮮,回溯任務動輒幾個月、甚至一年,任務量巨大,失敗後可以繼續從斷點處回溯顯得尤爲重要。在離線框架設計中,記錄了任務回溯過程中已成功的部分,任務失敗重啓後會進行斷點續回。

回溯順序

在回溯任務中,通常我們會根據業務需要確定回溯順序,如對於有新老用戶的增量數據,由於當前的日期數據依賴歷史數據,所以我們通常會從歷史到現在開始回溯。但沒有這種需要時,一般來說,先回溯現在可以快速滿足業務方對現在數據指標的瞭解,我們通常會從現在到歷史回溯。在離線框架設計中,開發者可根據業務需要選擇回溯順序。

並行回溯

通常,回溯任務優先級低於例行任務,在資源有限的情況下,回溯過程中不能一次性全部開啓,以免佔用大量資源影響例行任務,所以離線框架默認爲串行回溯。當然在資源充分的時間段,我們可以選擇適當的並行回溯。離線開發框架支持一定的併發度,開發者在回溯任務時遊刃有餘。

2.3.3.3 創建一個回溯任務

回溯應用的使用非常方便,開發者無需新開發代碼,使用例行的代碼,配置回溯方案即可,如下代碼所示,

<?xml version="1.0" encoding="UTF-8"?>
<project name="ecommerce_ads_others_order_retain_incr_day">
    <class>com.way.app.instance.ecommerce.Huisu</class>
    <type>sql</type>
    <path>/sql/ecommerce/ecommerce_ads_others_order_retain_incr_day.sql</path>
    <limitdate>20220404</limitdate>
    <startdate>20210101</startdate>
    <order>1</order>
    <distance>-1</distance>
    <file>/user/ecommerce_ads_others_order_retain_incr_day/process</file>
    <conf>
        <spark.executor.memory>1G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.executor.instances>30</spark.executor.instances>
        <spark.yarn.maxAppAttempts>1</spark.yarn.maxAppAttempts>
    </conf>
</project>

三、使用方式

3.1 使用介紹

使用離線框架方式開發時,開發者只需重點關注數據邏輯處理部分,開發完成打包後,提交執行,對於每一個應用主類相同,如前文所述爲 Application 父類,不隨應用變化,唯一變化的是父類需要接收的參數,該參數爲應用的配置文件的相對路徑。

3.2 使用對比

使用離線框架前後對比圖如下所示。

四、展望

目前,離線開發框架僅支持 SQL、Java 語言代碼的開發,但 Spark 支持的語言遠不止這兩種,我們需要繼續對框架升級支持多語言開發等,讓開發者更方便、快速地進行大數據開發。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/zpoUp6VdomGRHACaPgVOpw