不可思議!億級數據竟然如此輕鬆同步至 ES!

1 這是一個背景

最近接了一個需求,要提供一個隨意組合多個條件來查詢訂單數據的功能,看着數據庫裏過億的訂單量,頭髮不爭氣的又脫落了兩根代表這個需求不簡單

脫落的兩根頭髮,不是技術實現上很難,其實技術實現上清晰明瞭,就是通過數據異構,將數據同步到 ES,利用 ES 的倒排索引、緩存等能力,提供多條件複雜查詢的能力,而 ES 集羣我們已經有了

但有些數據,在目前的 ES 索引中是不存在的,也就是說,我需要將過億的訂單數據從訂單數據庫重新刷一遍到 ES 中,而這一頓操作下來得需要一週的時間!

什麼?你不信,那咱們來捋一捋

2 捋一捋訂單數據同步到 ES 中的複雜度

2.1 數據同步 ES 索引流程

如上圖所示,就是將數據同步到 ES 索引的過程。

首先需要從訂單數據庫查詢所有的訂單數據,然後根據訂單數據上保存的用戶 ID,商品 ID 等信息從用戶服務,商品服務查詢相關信息,經過處理與組裝後落到 ES 集羣中。

之所以要查詢用戶信息和商品信息,是因爲異構在 ES 索引中的訂單數據,並不會與 mysql 中的數據一一對應,有很多根據商品類目,用戶信息等查詢訂單信息的訴求存在,因此在這裏就需要查詢很多的上游服務來組裝信息

2.2 來梳理下是否有難點?

  1. 從數據庫把上億的訂單數據出來。這個操作不能影響到線上業務,因此查詢的訂單數據庫一般是從庫,OK,配置多數據源來讀取數據吧,而且上億的訂單一般採用的都是分庫分表來存儲的,我們是分了 16 個庫,每個庫 16 個表,總共 256 張表,嘿嘿

  2. 上億的訂單數據不能一次性全部讀取到內存吧,不然內存冒煙都存不下啊。所以得考慮分頁,分頁直接 limit 也不好,隨着數據量越大,速度越慢,所以得考慮一個遊標,嗯,選一個字段當遊標吧,遊標最好唯一且遞增

  3. 從多個服務獲取數據,這些數據所在的服務一般都屬於公司的其它部門,讀取數據的時候也不能影響到人家的服務吧,你這裏查詢的是嘎嘎猛,一看人家的服務都崩了,這個黑鍋就飛來了。所以這裏得考慮限流吧,得考慮隔離吧?不說全鏈路隔離,成本太高,起碼關鍵服務得隔離一下

  4. 數據同步一段時間,產品來問,同步多久了啊,大概還有多久能完成啊,數據量大概是多少啊,一臉懵,不知道啊。

  5. 如果中途同步失敗了,咋處理啊,是不是得重試,咋重試,重試策略是啥?失敗有沒有報警,能不能及時感知並處理啊?如果同步一段時間中斷了咋整啊?有沒有記錄從哪中斷的?能否從中斷處繼續同步啊,不然從頭開始又得 N 天,哭了

  6. 同步了一部分,發現有問題需要暫停一會,咋整?

  7. 如果只想同步部分數據不一致的訂單數據,可能就 2,3 個訂單,咋整,是不是還得提供按照手動輸入訂單 ID 同步 ES 數據的能力?

  8. 同步過程是咋樣的?開始時間?結束時間?共耗時多久?操作人是誰?這些統計數據從哪來?

  9. 想夜深人靜的時候同步數據,這有時候對業務的影響小,定個鬧鐘晚上起?

  10. 現在不單需要同步訂單的數據了,還需要同步商品 ES 集羣的數據,這些邏輯還得重新寫一遍?

啊啊啊啊,想想都頭疼啊

所以,一些事情看着簡單,其實並沒有那麼簡單

3 神奇的服務

爲了讓頭髮更有歸屬感,針對上述的難點開發了一款神奇的服務,那就是 ECP。它可以將整個流程自動化、可視化的處理,降低數據異構到 ES 的成本 任務界面如下所示:

3.1 ECP 的簡單運行流程

簡單來說,ECP 的作用就是將數據從數據源讀取出來,然後推送給 ES 寫服務。因爲數據處理的邏輯因不同的業務而異,ES 寫服務由各個對接方來實現,因此一個簡單的流程如下圖:

這裏面涉及到一些技術細節,比如如何進行多數據源數據讀取,數據源配置,sql 校驗,動態限流、SPI 機制、重試策略與故障感知、探活與故障恢復,環境隔離等等。

下面一一介紹下

3.2 多數據源數據讀取

ECP 支持目前支持三個數據源數據的讀取,分別爲 ID 源,文本源、以及腳本源

3.2.1 ID 源

有個文本框用來輸入 ID。這種場景適用於小數據的數據同步,比如發現一些數據庫和 ES 的數據不一致了,就簡單的刷一下數據

3.2.2 文件源

文件源指的是數據源來源於文本文件,適合中等數據的同步。ECP 和對象存儲進行了對接,用戶可以上傳文件至對象存儲,在任務執行時,ECP 會讀取對象存儲中的文本數據。

這種情況需要注意的是,用戶上傳的文件有可能會比較大,直接都讀取到內存再處理不現實,因此這裏採用的是流的方式進行讀取,讀取一批處理一批,再釋放一批,不會造成 OOM

簡化的處理方式如下:

try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                throw new IOException("Unexpected code " + response);
            }

  // 以流的方式讀取文件數據
  InputStream inputStream = response.body().byteStream();
  BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

}

3.2.3 腳本源

腳本源適用於大數據量的數據同步。

腳本本質上就是 SQL 和數據源的結合。

用戶在 ECP 中配置數據庫的連接信息,然後配置 SQL。ECP 會執行該 SQL,將數據從配置的數據庫中讀取出來,推送到 ES 寫服務中。

腳本源可以支持上億數據的讀取與推送,如下圖爲訂單庫(分庫分表)配置的腳本信息:

3.2.4 腳本源大數據讀取的實現

將幾億數據讀取到內存中來處理顯然不可能,因此採用局部數據的讀取與處理纔是正道。

在業務中,經常使用的是分頁,但分頁如果僅是使用 limit offset,size,待 offset 的值比較大時,性能會急劇下降,形成慢 SQL,甚至拖累整個數據庫的性能。

因此在分頁數量比較大時,需要指定一個有索引的字段作爲遊標,該遊標可以提高分頁的性能,如在訂單表中,若在訂單 ID 是遞增的且有設置了索引,SQL 就可以這麼寫:select * from t_order where order_id > xxx order by order_id desc limit 10; 利用 order_id 值的變化就可以起到分頁的效果

這種方式雖好,但讓用戶選定遊標索引無疑增加了使用的門檻,因此 ECP 沒有采用上述分頁的形式來讀取大數據,而是採用 JDBC 遊標查詢的方式,如下所示:

    // 建立連接
       conn = DriverManager.getConnection(url, param.getDsUsername(), param.getDsPassword());
       // 創建查詢
       stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
       stmt.setFetchSize(param.getFetchSize());

遊標查詢每次讀取 fetchSize 大小的數據量,可以很好的避免讀取大數據量導致的 OOM 問題

3.3 SQL 的解析與校驗

用戶配置 SQL 腳本,ECP 需要對該 SQL 腳本進行校驗與修改,傳統的字符串處理(比如正則)雖然在一定情況下可以滿足需求,但是容易出錯。因此 ECP 採用的是 Druid 的 SQL 解析工具包,可以將 SQL 解析成 AST 語法樹,以便對 SQL 進行各種處理。如下圖所示:

ECP 提供的數據樣例查詢,會對 SQL 自動拼接上 limit 1

3.4 動態限流的實現

限流分集羣限流和單機限流,經過評估,在能簡單就簡單的原則下,我們採用的是單機限流,限流組件使用的是 guava 的 RateLimiter

當在頁面上修改 QPS 的值時,會將該值同步到數據庫中,有個調度任務會不斷地掃描該值的變動,將變動的值同步到 RateLimiter 組件中

當然,也可以採用數據監聽的策略 (比如廣播 MQ),讓變動值同步到 RateLimiter 更及時,但這種方式還需引入其它組件,複雜度嗷嗷上升,不符合我們簡單實現的策略

動態限流的實現流程如下;

如下圖是在不同的時間點修改了限流值後的 QPS 變化圖:

3.5 重試策略與故障感知

ES 中和 DB 中的數據要儘可能的保證實時一致性,但最終一致性是必須要保證的,所以數據推送、處理失敗的時候要進行重試,如何重試?

首先需要了解下失敗的類型,制定合適的重試策略,知彼知己,百戰不殆嘛

一、網絡抖動導致的接口調用超時。在調用微服務 RPC 接口的時候,由於網絡抖動等情況,會導致接口調用超時,但很快就會恢復,通常情況下也就偶爾一次,下一次調用就會正常

二、數據處理邏輯異常。這種情況下,異常沒辦法自恢復,只能人工介入

三、上游服務異常。如上游服務壓力過大導致接口調用失敗,這時候就需要我們緩一緩再繼續處理,不能一個勁的調用導致上游服務崩潰掉

結合上面的失敗類型的特點,斐波那契數列的重試策略就非常適合 斐波那契數列的特點是:1,1,2,3,5,8,13,21,34,55,89…

當第一次失敗的時候,延時 1 秒後就重試,如果此時是網絡抖動導致的超時,重試就成功了,不影響數據處理的速度 若失敗的次數越多,重試的間隔時間就會越長,這也會兼顧到上述二、三的失敗類型

重試組件使用的是 Guava Retry,簡單的僞代碼如下:

// 重試組件配置
private final Retryer<Boolean> RETRYER = RetryerBuilder.<Boolean>newBuilder()
            // 對中斷類的異常不重試
            .retryIfException(input -> !isPauseException(input))
            // 1,1,2,3,5,8,13,21,33...
            .withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
           // 重試次數達到一定的次數後,不再重試
            .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
            .withRetryListener(new RetryListener() {
                @Override
                public <V> void onRetry(Attempt<V> attempt) {
                    if (attempt.hasException()) {
                        log.error("act=【DataFlushRpcCallRetry】desc=【重試】重試次數=【{}】重試異常=【{}】", attempt.getAttemptNumber(), attempt.getExceptionCause());
                        // 重試超過閾值進行報警提醒
                        alarmIfExceedThreshold(attempt);
                    }
                }
            })
            .build();

// 將執行邏輯抽象爲Runnable,對外暴露該方法
public void execute(Runnable runnable) {
    innerExecute(runnable,RETRYER);
}


private void innerExecute(Runnable runnable, Retryer<Boolean> retryer) {
    try {
    retryer.call(() -> {
        runnable.run();
        return true;
       });
    } catch (Exception e) {
       log.error("act=【DataFlushRpcCallRetry】desc=【重試異常】error=【{}】", e);
       throw new IllegalStateException(e);
    }
}

若重試到一定次數之後依然是失敗的話,則會將錯誤信息發送到報警羣。根據推送的信息,可以明確知道錯誤的類型,重試的次數,以及任務的創建人等等信息,無需查看日誌,即可定位大部分的問題。如下圖:

3.6 將數據推送給哪個服務來處理?-SPI 機制

ECP 是個通用的服務,因此需要將共性功能收攏在一起做成成品,將非共性的功能抽象一下,交給各個對接方去實現。

從簡單實現的角度來看,若有某個服務想要對接 ECP,我們在 ECP 上開發一下,調用該服務的接口,將數據推送給該服務,思路雖清晰明瞭,但對接及維護成本極高,且沒有一個統一的規範,因此不可取,其流程如下圖:

Java 上有個很好的思想可以解決這個問題,那就是 SPI。因此由 ECP 提供一個接口,制定一個規範,具體的 ES 索引數據的組裝邏輯由各個對接方去實現

這樣,若有一個新的對接方接入,只要實現接口即可,ECP 無需做任何改動

至於服務發現,ECP 採用的配置的方式,也就是在新建任務的時候,選擇數據推送的消費方服務,如下圖:

對於實現方式,得益於公司內部自研的 RPC 框架,提供了動態指定調用服務的方式,僞代碼如下:

Reference<IEsIndexFlushAPI> reference = new Reference<>();
// 設置調用的服務名
reference.setServiceName(serviceName);
// 設置接口名
reference.setInterfaceClass(IEsIndexFlushAPI.class);
// 設置上下文
reference.setApplicationConfig(applicationConfig);
// 獲取接口實例
IEsIndexFlushAPI iEsIndexFlushAPI = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName, s -> reference.refer());
// 接口調用
log.info("act=【EsIndexFlushApiInvoker】desc=【請求值】serviceName=【{}】dataListSize=【{}】indexNameList=【{}】tag=【{}】", serviceName,request.getDataList().size(),request.getIndexNameList(),request.getTag() );
EMApiResult<FlushResponse> result = iEsIndexFlushAPI.flush(request);

3.7 環境隔離

同步數據是個比較重的操作,這個操作不應該影響到線上業務 因此,同步數據的服務應當與線上服務隔離開 ECP 整合了架構組提供的標籤路由功能,可以在整個請求鏈路中調用指定標籤的服務,實現環境隔離

ECP 標籤路由配置圖:

如下圖,若在 ECP 上配置任務的標籤路由爲 FLUSH,則在同步任務執行過程中,會自動調用鏈路中綁定了 FLUSH 標籤的服務分組。

若某些服務沒有配置爲 FLUSH 標籤的分組,這時就會自動請求該服務的線上正常環境。這樣,就可以做到一定程度上的環境隔離

3.8 探活與任務故障恢復機制

在推送數據的過程中,若發生了不可描述的事情導致任務中斷,咋整?

到了需求 DeadLine,發現任務在某年某月某日進度爲 1% 的時候停了,哭了。

而且工作時間緊,任務重,總不能一定盯着任務,看有沒有中斷吧?這不適合,也不禮貌。

當然,這種情況在 ECP 是不會發生的,因爲 ECP 是有 “自救包” 的。下面聊下 ECP 的任務探活和中斷恢復機制

如下圖,在 ECP 中有探活和任務故障恢復兩大組件 探活組件負責監控當前任務線程的執行狀態,若任務線程正在執行,則對該任務的存活時間進行續期 任務故障恢復組件負責掃描當前未完成的任務,若任務上次存活時間大於指定的閾值時,則拉取該任務恢復執行

續期的僞代碼如下:

    @Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
    public void renewal(){
        futureMap.forEach((taskId,future)->{
            if (!future.isDone()){
                log.info("act=【renewal】desc=【任務續期】taskId=【{}】續期時間=【{}】",taskId, DateUtils.dateToString(new Date(),DateUtils.PATTERN));
                contextService.renewal(taskId);
            }else {
                log.info("act=【renewal】desc=【任務結束】taskId=【{}】",taskId);
                futureMap.remove(taskId);
            }
        });
    }

任務故障恢復的僞代碼如下:

    @Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
    public void restartTask(){

     // 1.查詢當前未完成的任務
        List<TaskFlushExecuteContextPO> contextPOS = contextService.queryRunningTask();

        for (TaskFlushExecuteContextPO contextPO : contextPOS) {
            // 2.計算上次存活到當前的時間
            Integer durationMin = calculateTimeSinceLastAlive();

      // 3.若時間大於指定閾值 則對任務重新拉起
            if (durationMin >= MAX_DURATION_MIN){
                log.info("act=【restartTask】desc=【任務重新拉起】taskId=【{}】",contextPO.getTaskId());
                // 4.更新alive_time進行鎖定 防止併發執行
                int i = contextExtMapper.casUpdateAliveTime();
                if (i >0){
                    // 5.重新拉起任務
                    restart0(contextPO, aliveTime);
                }
            }
        }
    }

3.9 平滑遷移的實現

將數據同步到 ES,通常有兩種方式:

  1. 直接把數據同步到原索引上

  2. 新建一個索引,利用雙寫以及切換別名的方式實現流量的平滑遷移。

對於新建一個索引的場景,往往是索引 Mapping 的改變,或者是爲了不影響原索引,保證操作可回滾

針對這種場景,ECP 分析了歷來大家手動操作刷 ES 索引的步驟,將流程進行抽象,歸納了以下幾個步驟,如下圖:

ECP 提供了平滑遷移組件,其內部整合了 Apollo 配置中心實現推送能力,其簡要的實現流程如下圖:

3.10 優雅的日誌記錄

如下圖所示展示了該任務操作的日誌,原則上日誌記錄爲非核心業務,需要與核心業務代碼進行剝離,因此使用註解式流水記錄是個很好的選擇

但註解式流水記錄有個問題,就是在很多的場景下,流水裏面的值需要動態獲取,利用註解可以實現嗎? 答案是可以的,在上圖所示中,任務 ID、數據來源都是動態數據,那如何實現的呢?看下面代碼:

@Flow(subjectIdEp = "#taskPO.id",subjectType = SubjectTypeEnum.TASK,operateFlowType = OperateFlowTypeEnum.CREATE_TASK,content = "'創建任務,任務ID:' + #taskPO.id ")
    public void saveTaskWithUser(TaskPO taskPO) {
        String name = LoginUserContext.get().getName();
        taskPO.setCreator(name);
        taskPO.setModifier(name);
        taskMapper.insertSelective(taskPO);
    }

subjectIdEp 爲流水主題 ID,#taskPo.id 爲一個表達式,可用動態獲取參數 taskPo 中的 id 值,這裏利用了 springEl 表達式的能力

content = "'創建任務,任務 ID:' + #taskPO.id " 爲流水信息,同樣利用了 springEL 表達式,動態獲取請求參數 taskPo 中的 id 信息

但有些信息需要一系列的計算纔可以獲取到,而不是單純的從對象中取值,這也是可以實現的。如下:

@Flow(subjectIdEp = "#contextPO.taskId",
            subjectType = SubjectTypeEnum.TASK,
            operateFlowType = OperateFlowTypeEnum.DATA_FLUSH,
            content = "'【數據同步】異常中斷任務恢復執行,中斷時間:' + T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime)")
    @Transactional(rollbackFor = Exception.class,isolation = Isolation.REPEATABLE_READ)
    public void restart0(TaskFlushExecuteContextPO contextPO, Date aliveTime) {
        log.info("act=【restartTask】desc=【任務重新拉起】taskId=【{}】原aliveTime=【{}】", contextPO.getTaskId(), aliveTime);
        dsProcessorExecutor.executeAndKeepAliveMonitor(contextPO.getTaskId());
    }

其中T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime) 代表執行的是DateUtils.dateToStringSimple 方法,也就是說表達式是可以調用方法的,包括從 spring 容器中獲取對象,調用對象的方法均可。

這種註解式流水的實現原理,就是利用 SPEL 表達式和 Spring Aop 的特性,寫一個切面,攔截自定義的 flow 註解即可,僞代碼如下:

// 定義切面,攔截FLOW註解
@Around("@annotation(com.zhuanzhuan.esmanage.entity.annotation.Flow)")
public Object around(ProceedingJoinPoint point) throws Throwable {

    // 調用目標方法
    Object result = null;
    try {
        result = point.proceed();
        recordFlow(point,result);
        return result;
    } catch (Throwable e) {
        recordException(point,e);
        throw e;
    }
}


// 流水記錄的實現
private void recordFlow(ProceedingJoinPoint point, Object result) {
    // try catch 防止影響主邏輯
    //TODO 看是否需要寫在一個事務中,主要評估流水的重要性
    try {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Flow flowAnnotation = getFlowAnnotation(signature);

        // 組裝參數上下文
        EvaluationContext evaluationContext = buildContext(point, signature);

        evaluationContext.setVariable("result",result);

        // ID表達式
        String subjectIdEp = flowAnnotation.subjectIdEp();

        // content表達式
        String content = getContent(flowAnnotation, evaluationContext);

    // SPEL解析表達式
        Expression expression = PARSER.parseExpression(subjectIdEp);
        Integer subjectId = (Integer)expression.getValue(evaluationContext);
        record(flowAnnotation, subjectId, content);
    } catch (Exception e) {
        log.error("記錄操作流水失敗", e);
    }
}

4 總結

總得來說,ECP 的實現中有很多的技術細節需要考慮,技術難度一般。 

實際上,在我們大部分的項目中,考驗的就是對細節的把控~

ps:感謝 ChatGPT 對本文名稱的大力支持


關於作者

閆展,轉轉交易中臺研發工程師

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/JyH0AHy0VmOe_qiBKSU_RQ