千萬級支付對賬系統是怎麼設計的?

今天給大家分享一篇關於對賬系統設計的文章,出自在支付行業摸爬滾打好幾年的小黑哥之手。

如果你之前做過支付相關的業務一定多多少少都接觸過 “支付數據對賬” 的問題。

這個問題其實有非常多的解法,而不同的解法可以應對不同的交易規模。隨着交易規模的增長,對賬系統的設計也一定是在不斷的進行迭代。

而本文就是探討對於每日千萬級數據量的時候,對應的對賬系統大致應該是長什麼樣的。

如果你像我一樣,之前做過支付、對賬相關係統,在觀看文章的過程中,一定能或多或少的看到自己做過的影子在裏面。

因爲不論是什麼級別的數據對賬,它們的 “對賬流程 “這個底層邏輯都是不會發生太大變化的。

所以我希望你在學習解題之道的時候,也能一眼看到它的底層邏輯,以不變應萬變。

1 支付對賬

很早之前寫過一篇支付對賬相關文章,那時候負責對賬系統日均處理數量比較小。

那最近正在接手現在的對賬系統,由於當前系統日均數量都在千萬級,所以對賬系統架構與之前架構完全不一樣。

那就這個話題,聊聊如何實現千萬級數據支付的對賬系統。

2 什麼是對賬?

我們先來回顧下什麼是對賬?

也許你對對賬這個概念比較模糊,但是這個場景你肯定碰到過。

上班路上買了一個煎餅,加了根裏脊與王中王,然後你掃了老闆的二維碼付了 10 元錢。

你跟老闆說你已經付了 10 元錢,老闆看了下手機,果然有一條 10 元支付記錄,老闆確認收到錢,然後就把煎餅給你。

這個過程,你說你付了 10 元,老闆確認收到 10 元,這就是一隻簡單的對賬過程。

回到我們支付場景,用戶下單使用微信支付 100 元購買了一個狗頭抱枕,這時我們這邊會生成一條支付記錄,同時微信支付也會生成記錄。

那微信第二天就會生成一個賬單記錄,我們拿到之後把我們的交易記錄跟微信記錄逐筆覈對,這就是支付對賬。

3 爲什麼需要對賬?

正常支付的情況下,兩邊(我們 / 第三方支付渠道)都會產生交易數據,那支付對賬過程,兩邊數據一致,大家各自安好,不用處理什麼。

但是有些異常情況下,可能由於網絡問題,導致兩邊數據存在不一致的情況,支付對賬就可以主動發現這些交易。

對賬可以說支付系統最後一道安全防線,通過對賬我們可及時的對之前支付進行糾錯,避免訂單差錯越積越多,最後財務盤點變成一筆糊塗賬。

4 支付對賬系統

開篇先來一張圖,先來看下整體對賬系統架構圖:

整個對賬系統分爲兩個模塊:

對賬模塊,主要負責對賬文件拉取,數據解析,數據覈對,數據彙總等任務。

差錯模塊是對賬模塊後置任務,對賬模塊覈對過程產生無法覈對成功的數據,這類數據件將會推送給差錯系統。

差錯系統將會根據規則生成差錯訂單,運營人員可以在後臺處理這列數據。

先簡單的看一下之前的對賬系統設計,瞭解下對賬的整體流程。

5 對賬系統設計

對賬系統如果從流程上來講,其實非常簡單,引用一下之前文章流程圖:

https://studyidea.cn/articles/2019/08/26/1566790305561.html

整體流程可以簡單分爲三個模塊:

本端數據指的是我們應用產生的支付記錄,這裏根據賬期(交易日期)與渠道編號獲取單一渠道的所有支付記錄。

對端數據指的是第三方支付渠道支付記錄,一般通過下載對賬文件獲取。

由於每個渠道下載方式,文件格式都不太一樣,對端數據處理的時候需要將其轉化統一數據格式,標準化在入庫存儲。

網上找了一份通用賬單,可以參考:

對端數據轉化存儲之後,對賬流程中,對端數據也需要跟本端數據一樣,獲取當前賬期下所有記錄。

兩端數據都獲取成功之後,接下來就是本地數據逐筆覈對。

覈對流程可以參考之前寫的流程:

w1GkiK

上面流程其實也比較簡單,翻譯一下:

查找本端數據 / 對端數據,然後轉化存儲到 Map 中,其中 key 爲訂單號,value 爲本端 / 對端訂單對象。

然後遍歷本端數據 Map 對象,依次去對端數據 Map 查找。如果能查找到,說明對端數據也有這筆。這筆覈對成功,對端數據集中移除這筆。

如果查找不到,說明這筆數據爲差異數據,它在本端存在,對端不存在,將其移動到差異數據集中。

最後,本端數據遍歷結束,如果對端數據集還存在數據,那就證明這些數據也是差異數據,他們在對端存在,本端不存在,將其也移動到差異數據集中。

PS:上述流程存在瑕疵,只能覈對出兩邊訂單互有缺失的流程,但是實際情況下還會碰到兩邊訂單都存在,但是訂單金額卻不一樣的差異數據。這種情況有可能發現在系統 Bug,比如渠道端上送金額單位爲元,但是實際上送金額單位爲分,這就導致對賬兩端金額不一致。

之前對賬系統日均處理的支付數據峯值在幾十萬,所以上面的流程沒什麼問題,還可以抗住,正常處理。

但是目前的支付數據日均在千萬級,如果還是用這種方式對賬,當前系統可能會直接崩了。

6 千萬數據級帶來的挑戰

第一個,查詢效率。

本端 / 對端數據通過分頁查詢業務數據表獲取當天所有的數據。隨着每天支付數據累計,業務表中數據將會越來越多,這就會導致數據查詢變慢。

實際過程我們發現,單個渠道數據量很大的情況下,對賬完成需要一兩個小時。

雖然說對賬是一個離線流程,允許對賬完成時間可以久一點。但是對賬流程是後續其他任務的前置流程,整個對賬流程還是需要在中午之前完成,這樣運營同學就可以在下午處理。

第二個問題,OOM。

上面流程中,我們把把全部數據加載到內存中,小數據量下沒什麼問題。

但是在千萬級數據情況下,數據都加載到內存中,並且還是加載了兩份數據(本端、對端),這就很容易喫完整個應用內存,從而導致 Full GC,甚至還有可能導致應用 OOM。

而且這還會導致級聯反應,一個任務引發 Full GC,導致其他渠道對賬收到影響。

第三個問題,性能問題。

原先系統設計上,單一渠道對賬處理流程只能在單個機器上處理,無法並行處理。

這就導致系統設計伸縮性很差,服務器資源也被大量的浪費。

7 千萬數據級對賬解決辦法

上面系統代碼,實際上還是存在優化空間,可以利用單機多線程並行處理,但是大數據下其實帶來效果不是很好。

那主要原因是因爲發生在系統架構上,當前系統使用底層使用 MySQL 處理的。

傳統的 MySQL 是 OLTP(on-line transaction processing),這個結構決定它適合用於高併發,小事務業務數據處理。

但是對賬業務特性動輒就是百萬級,千萬級數據,數據量處理非常大。但是對賬數據處理大多是一次性,不會頻繁更新。

上面業務特性決定了,MySQL 這種 OLTP 系統不太適合大數據級對賬業務。

那專業的事應該交給專業的人去做,對賬業務也一樣,這種大數據級業務比較適合由 Hive、Spark SQL 等 OLAP 去做。

8 千萬數據級對賬系統實現方案

首先我們先來看下對賬整體時序圖,先有個印象:

下面整篇文章將會圍繞上面時序圖開始講解,由於文章篇幅過長,請各位扶好坐穩,準備發車。

9 數據平臺

前面提到,千萬級數據需要使用 Hive,Spark 等相關大數據技術,這就離不開大數據平臺的技術支持。

簡單聊下我們這邊大數據平臺 DP (Data Platform),它提供用戶大數據離線任務開發所需要的環境、工具以及數據,具有入口統一性、一站式、簡化 Hadoop 本身的複雜性、數據安全等特點。

DP 平臺提供功能如下:

那本篇文章不會涉及具體的大數據技術相關的實現細節,相關原理(主要是咱也不會~),主要聊下對賬系統如何聯合 DP 平臺實現完整數據對賬方案。

10 對賬系統概覽

開頭的時序圖,我們可以看到整個對賬過程設計好幾個業務流程,那在這裏對賬系統內部將會維護一個流程狀態機,當前一個流程處理結束之後,下一個流程才能被觸發。

由於當前對賬系統實現方案,涉及對賬系統與 DP 平臺,對賬系統目前沒辦法調用 DP 平臺觸發任務,但是 DP 平臺可以通過通過 HTTP 接口調用對賬系統。

所以當前流程觸發的方式使用的是定時任務的方案,每個流程有一個單獨的定時任務。

對賬系統內的定時任務觸發的時候,將會判斷當前流程是否已經到達執行條件,即判斷一下當前任務的狀態。

每個定時任務觸發時間人爲設置的時候,岔開一兩分鐘,防止同時運行。

DP 平臺使用自帶調度任務,對賬系統無法控制 DP 任務的運行。

DP 平臺定時任務可以通過運行 Scala 腳本代碼,調用對賬系統提供 HTTP  查詢接口,通過這種方式判斷當前流程是否已經到達執行條件。

下面詳細解釋一下每個流程。

11 初始化對賬任務

對賬系統依靠對賬任務記錄推動流轉,目前每天凌晨將會初始化生成對賬任務記錄,後續任務流轉就可以從這裏開始。

對賬系統維護一張對賬覈對規則表:

對賬覈對規則表關鍵字段含義如下:

每次對接新的支付渠道,對賬配置規則需要新增覈對規則。

初始化對賬定時任務將會查找覈對規則表中所有的生效的配置規則,依次生成當天的對賬任務記錄:

對賬任務記錄部分字段與覈對規則表含義一樣,不再贅述,其他字段含義如下:

初始化對賬任務結束之後,對賬任務流程推動到第二階段,數據收集。

12 數據收集

數據收集階段,收集兩端待覈對的數據,爲後面的數據覈對任務提供覈對數據。

數據收集階段分爲兩部分:

本端數據收集

本端數據,是自己業務產生的支付數據,這些數據原本存在各個業務的數據庫中。

對賬系統獲取這些支付數據,一般有兩種方式:

查詢數據方式前面也聊到過,數據量小的情況下,沒什麼問題。一旦數據量變大,查詢效率就會變低。

所以這裏我們採用推送的方式,對賬系統監聽各個業務數據表 binlog,每當業務數據發生變動,對賬系統就可以接受到 binlog 消息。

對賬系統接受到 binlog 消息,將會判斷當前消息是否需要過濾,是否已經支付成功等等,滿足條件之後,binlog 消息將會插入本端數據表中,表結構如下:

本端記錄表關鍵字段含義如下:

上面展示的支付記錄表結構,根據業務類型不同,本端其實還有退款記錄表,提現記錄表等。

這裏設計的時候,實際上也可以將所有業務數據放在一張表中,然後根據業務類型字段區分。

對端數據收集

對端數據,就是第三方支付渠道產生支付數據,一般 D 日產生交易之後,D+1 日第三方渠道將會生成一個對賬文件。

對賬系統需要從對端提供的對賬文件獲取對端數據。

渠道的對賬文件,下載方式,文件類型存在很大的差異,每次接入新的支付渠道,這裏需要經過新的開發。

對端數據這裏維護了一張渠道下載配置表,對端數據收集的時候將會獲取所有可用配置:

渠道下載配置表關鍵字段含義如下:

對賬文件下載成功之後,需要根據文件類型進行解析,最後轉化自己的需要的對賬數據入庫。

對端數據表結構如下:

上面關鍵字段與本端記錄表類似,額外新增字段:

channel_fee 渠道手續費,用於統計渠道收的手續費。

同樣渠道記錄表根據根據業務類型也分爲退款渠道記錄表,提現渠道記錄表等,同樣也可以合併成一張表,根據業務類型區分。

對端數據收集階段,由於拉取三方渠道的對賬文件,那有時候渠道端存在異常,將會導致對賬文件下載延遲,從而導致其他任務也出現的相應的延遲。

這一點是整個對賬流程中,相對不可控的問題。我們需要在對賬流程設計中考慮這一點。

對賬文件下載解析成功入庫之後,對賬流程將會流轉到下一個流程存疑數據處理。

13 存疑數據處理

講解這個流程之前,先給大家解釋一下什麼是存疑數據?

正常支付過程中,會存在一個兩邊賬期不一致的問題,比如說本端數據支付時間是 2021 年 12 月 28 日 23 點 59 分 59 秒,那麼本端認爲這筆支付交易賬期是 2021 年 12 月 28 日。

然而這筆支付發送給三方渠道之後,三方渠道支付成功的時間已經是 2021 年 12 月 29 日 0 點 0 分 2 秒,三方渠道支付賬期記爲 2021 年 12 月 29 日。

這種情況下我們這邊記錄賬期是 2021 年 12 月 28 日,但是第三方渠道這筆記錄是 2021 年 12 月 29 日,所以 2021 年 12 月 28 日對賬單上沒有這筆支付記錄,這就導致一筆差異數據(一端有 / 一端無)的情況。

上面這種情況就是典型因爲日切問題導致差異。

但是我們知道 2021 年 12 月 29 日對賬單上肯定會包含這筆,所以我們可以先把這筆差異數據掛起,當做存疑數據,等到 2021 年 12 月 29 日賬期對賬的時候,對方賬單包含這筆,當天就能覈對成功,這就解決這筆差異數據。

所以說存疑數據,就跟其字面意思一樣,當這筆數據當前處理不了的時候,那就現放着,不做定論,過一天我再嘗試處理一下。

除了上面日切問題導致的差異數據以外,還有一些情況:

存疑數據分爲三種類型:

瞭解完存疑數據的定義,我們再來看下存疑數據處理的流程。

存疑數據將會由下面的流程中產生,這裏先來看下存疑表結構:

關鍵字段如下:

存疑處理過程將會撈起所有存疑表中還未處理的存疑數據,根據存疑類型反向查找對賬數據表。例如:

查找對端 / 本端數據,都是根據支付流水號加業務類型查找定位。

如果在本端 / 對端數據中找到,這裏還需要再對比一下金額:

上面這一步比較重要,因爲下面對賬覈對過程主要覈對要素是支付流水號 + 支付金額,通過這種方式收集單片賬是無法知道是因爲單號不存在,還是因爲金額不存在原因,具體流程可以看下下面覈對流程。

如果在本端 / 對端數據還是找不到,那就根據渠道配置的存疑規則,如果當前已經存疑的天數大於配置渠道存疑天數,則將數據直接移動到差錯表。

如果存疑天數小於當前渠道配置天數,那就不要管,繼續保存在存疑表,等待下一天存疑數據處理。

一般來說,日切導致的數據,存疑一天,就可以解決。但是有些渠道可能是 T+1 在對賬,這種情況需要配置的存疑天數就要長一點了。

本地存疑數據處理結束之後,下面就要開始 DP 數據處理。

14 數據導入 DP

在 DP 覈對之前,我們需要將對賬系統收集的數據,從 MySQL 導入 DP Hive 表中。

DP 任務調度開始,DP 平臺定時檢測對賬系統提供 HTTP 接口,判斷本次存疑流程是否處理完成。

如果完成,自動觸發將數據從 MySQL 導入 DP Hive 表中。

數據導入之後,將會開始 DP 覈對規程。這個過程就是整個對賬流程最關鍵的部分,這個流程覈對兩端數據,檢查兩端是否存在差異數據。

15 DP 覈對

數據導入結束,DP 平臺開始覈對數據,這個過程分爲兩個覈對任務:

成功數據覈對

成功數據覈對任務,覈對的目的是爲了覈對出本端與對端支付單號與金額一致的數據。

這裏的核對任務使用了 Hive SQL,整個 SQL 如下所示:

---- A
CREATE TABLE IF NOT EXISTS dp.pay_check_success (
    `batch_no` bigint comment '批次號',
    `merchant_no` string comment '三方商戶號',
    `sub_merchant_no` string comment '三方子商戶號',
    `biz_id` string comment '對賬業務關聯字段',
    `biz_amount` bigint comment '金額',
    `biz_date` string comment '業務日期',
    `biz_type` int comment '業務類型',
    `status` int comment '狀態標識',
    `remark` string comment '備註',
    `create_time` string comment '創建時間',
    `update_time` string comment '修改時間',
    `trade_date` int comment '訂單交易日期',
    `channel_code` int comment '渠道類型'
);

----B
insert
    overwrite table dp.pay_check_success
select
    tb1.batch_no as batch_no,
    tb1.merchant_no as merchant_no,
    tb1.sub_merchant_no as sub_merchant_no,
    tb1.biz_id as biz_id,
    tb1.biz_amount as biz_amount,
    tb1.biz_date as biz_date,
    tb1.biz_type as biz_type,
    tb1.status as status,
    tb1.remark as remark,
    tb1.trade_date as trade_date,
    tb1.channel_code as channel_code
from
    (
        select
            tb2.batch_no as batch_no,
            tb1.merchant_no as merchant_no,
            tb1.sub_merchant_no as sub_merchant_no,
            tb1.biz_order_no as biz_id,
            tb1.trader_amount as biz_amount,
            '${DP_1_DAYS_AGO_Ymd}' as biz_date,
            '0' as status,
            '' as remark,
            '${DP_1_DAYS_AGO_Ymd}' as trade_date,
            tb1.channel_code as channel_code
        from
            dp.pay_check_record tb1
            inner join dp.pay_check_channel_record tb2 on tb1.biz_order_no = tb2.biz_order_no
            and tb1.trader_amount = tb2.trader_amount
            and tb1.channel_code = tb2.channel_code
        where
            tb1.is_check = 0
            and tb2.is_check = 0
            and tb1.bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and tb2.bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and tb1.is_filter = 0
    ) tb1

整個 SQL 分爲兩部分,第一部分將會在 DP 中創建一張 pay_check_success,記錄覈對成功的數據。

第二部分,將覈對成功的數據插入上面創建的 pay_check_success 表中。

查找覈對成功的數據 SQL 如下:

        select
            tb2.batch_no as batch_no,
            tb1.merchant_no as merchant_no,
            tb1.sub_merchant_no as sub_merchant_no,
            tb1.biz_order_no as biz_id,
            tb1.trader_amount as biz_amount,
            '${DP_1_DAYS_AGO_Ymd}' as biz_date,
            '0' as status,
            '' as remark,
            '${DP_1_DAYS_AGO_Ymd}' as trade_date,
            tb1.channel_code as channel_code
        from
            dp.pay_check_record tb1
            inner join dp.pay_check_channel_record tb2 on tb1.biz_order_no = tb2.biz_order_no
            and tb1.trader_amount = tb2.trader_amount
            and tb1.channel_code = tb2.channel_code
        where
            tb1.is_check = 0
            and tb2.is_check = 0
            and tb1.bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and tb2.bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and tb1.is_filter = 0

上述 SQL 存在一些 DP 平臺系統變量。DP_1_DAYS_AGO_Ymd 代表當前日期的前一天

主要邏輯非常簡單,利用 SQL 內連接查詢的功能,可以查找單號,金額,渠道編碼一致的數據。

成功數據覈對任務結束,將會把剛纔在 DP 中創建的 pay_check_success 同步回對賬系統的 MYSQL 數據庫中。

存疑數據覈對

存疑數據覈對任務,覈對的目的是爲了覈對出本端與對端支付單號或金額不一致的數據。

這些數據將會當做存疑數據,這些數據將會在第二階段存疑數據處理。

這裏的核對任務也是使用了 Hive SQL ,整個 SQL 跟上面比較類似,SQL 如下所示:

CREATE TABLE IF NOT EXISTS dp.check_dp_buffer_record (
    `biz_id` string comment '訂單號',
    `order_type` string comment '訂單類型 0本端訂單 1渠道訂單',
    `bill_date` int comment '對賬日期',
    `biz_type` int comment '業務類型',
    `channel_code` int comment '渠道類型',
    `amount` string comment '金額',
    `merchant_no` string comment '商戶號',
    `sub_merchant_no` string comment '三方子商戶號',
    `trade_date` int comment '交易日期',
    `create_time` string comment '創建時間',
    `update_time` string comment '修改時間'
);

insert
    overwrite table dp.check_dp_buffer_record
select
    tb1.biz_id as biz_id,
    tb1.order_type as order_type,
    tb1.bill_date as bill_date,
    tb1.biz_type as biz_type,
    tb1.channel_code as channel_code,
    tb1.amount as amount,
    tb1.merchant_no as merchant_no,
    tb1.sub_merchant_no as sub_merchant_no,
    tb1.trade_date as trade_date,
    '${DP_0_DAYS_AGO_Y_m_d_HMS}',
    '${DP_0_DAYS_AGO_Y_m_d_HMS}'
FROM
    (
        select
            tb1.biz_order_no as biz_id,
            0 as order_type,
            tb1.bill_date as bill_date,
            10 as biz_type,
            tb1.channel_code as channel_code,
            tb1.trade_amount as amount,
            tb1.merchant_no as merchant_no,
            tb1.sub_merchant_no as sub_merchant_no,
            '${DP_1_DAYS_AGO_Ymd}' as trade_date
        FROM
            (
                select
                    biz_order_no,
                    bill_date,
                    channel_code,
                    trade_amount,
                    merchant_no,
                    sub_merchant_no
                from
                    ods.pay_check_record
                where
                    and bill_date = '${DP_1_DAYS_AGO_Ymd}'
                    and is_filter = 0
                    and is_check = 0
            ) tb1
            LEFT JOIN (
                select
                    biz_order_no,
                    trade_amount,
                    channel_code
                from
                    ods.pay_check_channel_record
                where
                    and bill_date = '${DP_1_DAYS_AGO_Ymd}'
                    and is_check = 0
            ) tb2 ON tb1.biz_order_no = tb2.biz_order_no
            and tb1.trade_amount = tb2.trade_amount
            and tb1.channel_code = tb2.channel_code
        where
            tb2.biz_order_no IS NULL
        union
        select
            tb1.biz_order_no as biz_id,
            1 as order_type,
            tb1.bill_date as bill_date,
            10 as biz_type,
            tb1.channel_code as channel_code,
            tb1.trade_amount as amount,
            tb1.merchant_no as merchant_no,
            tb1.sub_merchant_no as sub_merchant_no,
            '${DP_1_DAYS_AGO_Ymd}' as trade_date
        FROM
            (
                select
                    biz_order_no,
                    bill_date,
                    channel_code,
                    trade_amount,
                    merchant_no,
                    sub_merchant_no
                from
                    ods.pay_check_chnnel_bill
                where
                    and bill_date = '${DP_1_DAYS_AGO_Ymd}'
                    and is_check = 0
            ) tb1
            LEFT JOIN (
                select
                    biz_order_no,
                    channel_code,
                    trade_amount
                from
                    ods.pay_check_record
                where
                    and bill_date = '${DP_1_DAYS_AGO_Ymd}'
                    and is_filter = 0
                    and is_check = 0
            ) tb2 ON tb1.biz_order_no = tb2.biz_order_no
            and tb1.trade_amount = tb2.trade_amount
            and tb1.channel_code = tb2.channel_code
        where
            tb2.biz_order_no IS NULL
    ) tb1;

整個 SQL 分爲兩部分,第一部分將會在 DP 中創建一張 check_dp_buffer_record,記錄覈對差異的的數據。

第二部分,將覈對差異的數據插入上面創建的 check_dp_buffer_record 表中。

查找差異數據較爲麻煩,需要分成兩部分收集:

兩邊數據查找到之後,使用 SQL union 功能,將兩端數據聯合。

我們先來看下本端單邊張的邏輯的:

select
    tb1.biz_order_no as biz_id,
    0 as order_type,
    tb1.bill_date as bill_date,
    10 as biz_type,
    tb1.channel_code as channel_code,
    tb1.trade_amount as amount,
    tb1.merchant_no as merchant_no,
    tb1.sub_merchant_no as sub_merchant_no,
    '${DP_1_DAYS_AGO_Ymd}' as trade_date
FROM
    (
        select
            biz_order_no,
            bill_date,
            channel_code,
            trade_amount,
            merchant_no,
            sub_merchant_no
        from
            ods.pay_check_record
        where
            and bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and is_filter = 0
            and is_check = 0
    ) tb1
    LEFT JOIN (
        select
            biz_order_no,
            trade_amount,
            channel_code
        from
            ods.pay_check_channel_record
        where
            and bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and is_check = 0
    ) tb2 ON tb1.biz_order_no = tb2.biz_order_no
    and tb1.trade_amount = tb2.trade_amount
    and tb1.channel_code = tb2.channel_code
where
    tb2.biz_order_no IS NULL

SQL 看起來比較複雜,實際邏輯可以簡化爲下面 SQL:

select
    *
from
    innerTab t1
    LEFT JOIN channelTab t2 ON t1.biz_order_no = t2.biz_order_no
    and t1.trade_amount = t2.trade_amount
    and t1.channel_code = t2.channel_code
where
    t2.biz_order_no is null;

這裏主要利用 SQL 左連接的功能,本端數據 left join 渠道數據,如果渠道單號不存在,則認爲本端數據存在,渠道數據不存在,當然也有可能是兩端數據都存在,但是金額不相等。

這種情況記爲本端數據存疑,orderType 爲 0。

渠道端單邊賬收集邏輯:

select
    tb1.biz_order_no as biz_id,
    1 as order_type,
    tb1.bill_date as bill_date,
    10 as biz_type,
    tb1.channel_code as channel_code,
    tb1.trade_amount as amount,
    tb1.merchant_no as merchant_no,
    tb1.sub_merchant_no as sub_merchant_no,
    '${DP_1_DAYS_AGO_Ymd}' as trade_date
FROM
    (
        select
            biz_order_no,
            bill_date,
            channel_code,
            trade_amount,
            merchant_no,
            sub_merchant_no
        from
            ods.pay_check_chnnel_bill
        where
            and bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and is_check = 0
    ) tb1
    LEFT JOIN (
        select
            biz_order_no,
            channel_code,
            trade_amount
        from
            ods.pay_check_record
        where
            and bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and is_filter = 0
            and is_check = 0
    ) tb2 ON tb1.biz_order_no = tb2.biz_order_no
    and tb1.trade_amount = tb2.trade_amount
    and tb1.channel_code = tb2.channel_code
where
    tb2.biz_order_no IS NULL

邏輯與本端單邊賬收集類似,渠道數據 left join 本端數據,如果本端單號不存在,則爲渠道數據存在,本端數據不存在。當然也有可能是兩端數據都存在,但是金額不相等。

這裏記爲渠道存疑數據,orderType 爲 1。

成功數據覈對以及存疑數據覈對結束,DP 平臺將會自動把數據從 Hive 表中導入到 MySQL。

數據導出結束,DP 平臺將會調用對賬系統的相關接口,通知對賬系統 DP 覈對流程結束。

DP 覈對流程是整個對賬流程核心流程,目前千萬級數據的情況下,大概能在一個小時之內搞定。

DP 覈對流程結束之後,對賬系統開始下個流程 - 二次存疑數據處理。

16 二次存疑數據處理

前面流程我們講到存疑處理,爲什麼這裏還需要二次存疑數據處理呢?

這因爲 DP 覈對存疑數據收集的過程,我們使用業務單號與金額去互相匹配,那如果不存在,有可能是因爲兩端數據有一端不存在,還有可能是因爲兩端數據數據都存在,但是金額卻不相等。

DP 覈對過程是無法區分出這兩種情況,所以增加一個二次存疑數據處理流程,單獨區分出這兩類數據。

回到二次存疑數據處理流程,當天產生的所有存疑數據都從 DP 中導入到 check_dp_buffer_record 表。

二次存疑數據處理流程將會查找 check_dp_buffer_record 表所有未覈對的記錄,然後依次遍歷。

遍歷過程中將會嘗試在 check_dp_buffer_record 表中查找相反方向的存疑數據。

這個可能不好理解,舉個例子:

假如有一筆訂單,本端是 100 元,渠道端是 10 元。這種情況兩筆記錄都會出現在  check_dp_buffer_record 表。

遍歷到本端這筆的時候,這筆類型是本端存疑,type 爲 0。使用者本端單號從 check_dp_buffer_record 查找渠道端存疑(type 爲 1)的數據。

上面的情況可以找到,證明這筆存疑數據其實是金額不相等,這裏需要將數據移動到差錯表。

那如果是正常一端缺失的數據,那自然去相反方向查找是找不到的,這種數據是正常存疑數據,移動內部存疑表。

對賬系統二次存疑數據處理結束之後,開始下一個階段數據彙總。

17 數據彙總

數據彙總階段就是爲了統計當天每個有多少成功功對賬數據,多少存疑數據,統計結束通過看板給相關運營人員展示統計數據。

由於數據量大的問題,這裏使用的是 DP 平臺 Sprak 任務進行任務統計。

這裏邏輯簡單解釋爲,就是利用 Scala 腳本代碼對數據進行相關求和,這裏代碼沒有普遍性,就不展示具體的邏輯了。

18 差錯數據推送

數據彙總結束之後,開始下一個階段,差錯數據推送給差錯系統。

上面存疑數據處理的流程中轉化的差錯數據,當前存在對賬系統內部差錯數據表中。

目前我們差錯數據是是另外一個差錯系統單獨處理,所以對賬系統需要把差錯數據表數據推送給差錯系統。

這裏的邏輯比較簡單,查找所有待處理的差錯數據,遍歷發送 NSQ 消息給差錯系統。

19 總結

千萬級數據對賬整個流程看起,其實相關操作流程都不是很難。

那我個人認爲這裏難點在於第一需要一套完整大數據平臺體系,第二改變原有對賬方式,思考如何將對賬系統與大數據平臺一起串起來。

希望這篇文章對正好碰到該類問題同仁起到相關幫助。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/iKtmU2GnajosandeeP0H5w