千萬級支付對賬系統是怎麼設計的?
今天給大家分享一篇關於對賬系統設計的文章,出自在支付行業摸爬滾打好幾年的小黑哥之手。
如果你之前做過支付相關的業務一定多多少少都接觸過 “支付數據對賬” 的問題。
這個問題其實有非常多的解法,而不同的解法可以應對不同的交易規模。隨着交易規模的增長,對賬系統的設計也一定是在不斷的進行迭代。
而本文就是探討對於每日千萬級數據量的時候,對應的對賬系統大致應該是長什麼樣的。
如果你像我一樣,之前做過支付、對賬相關係統,在觀看文章的過程中,一定能或多或少的看到自己做過的影子在裏面。
因爲不論是什麼級別的數據對賬,它們的 “對賬流程 “這個底層邏輯都是不會發生太大變化的。
所以我希望你在學習解題之道的時候,也能一眼看到它的底層邏輯,以不變應萬變。
1 支付對賬
很早之前寫過一篇支付對賬相關文章,那時候負責對賬系統日均處理數量比較小。
那最近正在接手現在的對賬系統,由於當前系統日均數量都在千萬級,所以對賬系統架構與之前架構完全不一樣。
那就這個話題,聊聊如何實現千萬級數據支付的對賬系統。
2 什麼是對賬?
我們先來回顧下什麼是對賬?
也許你對對賬這個概念比較模糊,但是這個場景你肯定碰到過。
上班路上買了一個煎餅,加了根裏脊與王中王,然後你掃了老闆的二維碼付了 10 元錢。
你跟老闆說你已經付了 10 元錢,老闆看了下手機,果然有一條 10 元支付記錄,老闆確認收到錢,然後就把煎餅給你。
這個過程,你說你付了 10 元,老闆確認收到 10 元,這就是一隻簡單的對賬過程。
回到我們支付場景,用戶下單使用微信支付 100 元購買了一個狗頭抱枕,這時我們這邊會生成一條支付記錄,同時微信支付也會生成記錄。
那微信第二天就會生成一個賬單記錄,我們拿到之後把我們的交易記錄跟微信記錄逐筆覈對,這就是支付對賬。
3 爲什麼需要對賬?
正常支付的情況下,兩邊(我們 / 第三方支付渠道)都會產生交易數據,那支付對賬過程,兩邊數據一致,大家各自安好,不用處理什麼。
但是有些異常情況下,可能由於網絡問題,導致兩邊數據存在不一致的情況,支付對賬就可以主動發現這些交易。
對賬可以說支付系統最後一道安全防線,通過對賬我們可及時的對之前支付進行糾錯,避免訂單差錯越積越多,最後財務盤點變成一筆糊塗賬。
4 支付對賬系統
開篇先來一張圖,先來看下整體對賬系統架構圖:
整個對賬系統分爲兩個模塊:
-
對賬模塊
-
差錯模塊
對賬模塊,主要負責對賬文件拉取,數據解析,數據覈對,數據彙總等任務。
差錯模塊是對賬模塊後置任務,對賬模塊覈對過程產生無法覈對成功的數據,這類數據件將會推送給差錯系統。
差錯系統將會根據規則生成差錯訂單,運營人員可以在後臺處理這列數據。
先簡單的看一下之前的對賬系統設計,瞭解下對賬的整體流程。
5 對賬系統設計
對賬系統如果從流程上來講,其實非常簡單,引用一下之前文章流程圖:
https://studyidea.cn/articles/2019/08/26/1566790305561.html
整體流程可以簡單分爲三個模塊:
-
本端數據處理
-
對端數據處理
-
本端數據與渠道端數據覈對
本端數據指的是我們應用產生的支付記錄,這裏根據賬期(交易日期)與渠道編號獲取單一渠道的所有支付記錄。
對端數據指的是第三方支付渠道支付記錄,一般通過下載對賬文件獲取。
由於每個渠道下載方式,文件格式都不太一樣,對端數據處理的時候需要將其轉化統一數據格式,標準化在入庫存儲。
網上找了一份通用賬單,可以參考:
對端數據轉化存儲之後,對賬流程中,對端數據也需要跟本端數據一樣,獲取當前賬期下所有記錄。
兩端數據都獲取成功之後,接下來就是本地數據逐筆覈對。
覈對流程可以參考之前寫的流程:
上面流程其實也比較簡單,翻譯一下:
查找本端數據 / 對端數據,然後轉化存儲到 Map 中,其中 key 爲訂單號,value 爲本端 / 對端訂單對象。
然後遍歷本端數據 Map 對象,依次去對端數據 Map 查找。如果能查找到,說明對端數據也有這筆。這筆覈對成功,對端數據集中移除這筆。
如果查找不到,說明這筆數據爲差異數據,它在本端存在,對端不存在,將其移動到差異數據集中。
最後,本端數據遍歷結束,如果對端數據集還存在數據,那就證明這些數據也是差異數據,他們在對端存在,本端不存在,將其也移動到差異數據集中。
PS:上述流程存在瑕疵,只能覈對出兩邊訂單互有缺失的流程,但是實際情況下還會碰到兩邊訂單都存在,但是訂單金額卻不一樣的差異數據。這種情況有可能發現在系統 Bug,比如渠道端上送金額單位爲元,但是實際上送金額單位爲分,這就導致對賬兩端金額不一致。
之前對賬系統日均處理的支付數據峯值在幾十萬,所以上面的流程沒什麼問題,還可以抗住,正常處理。
但是目前的支付數據日均在千萬級,如果還是用這種方式對賬,當前系統可能會直接崩了。
6 千萬數據級帶來的挑戰
第一個,查詢效率。
本端 / 對端數據通過分頁查詢業務數據表獲取當天所有的數據。隨着每天支付數據累計,業務表中數據將會越來越多,這就會導致數據查詢變慢。
實際過程我們發現,單個渠道數據量很大的情況下,對賬完成需要一兩個小時。
雖然說對賬是一個離線流程,允許對賬完成時間可以久一點。但是對賬流程是後續其他任務的前置流程,整個對賬流程還是需要在中午之前完成,這樣運營同學就可以在下午處理。
第二個問題,OOM。
上面流程中,我們把把全部數據加載到內存中,小數據量下沒什麼問題。
但是在千萬級數據情況下,數據都加載到內存中,並且還是加載了兩份數據(本端、對端),這就很容易喫完整個應用內存,從而導致 Full GC,甚至還有可能導致應用 OOM。
而且這還會導致級聯反應,一個任務引發 Full GC,導致其他渠道對賬收到影響。
第三個問題,性能問題。
原先系統設計上,單一渠道對賬處理流程只能在單個機器上處理,無法並行處理。
這就導致系統設計伸縮性很差,服務器資源也被大量的浪費。
7 千萬數據級對賬解決辦法
上面系統代碼,實際上還是存在優化空間,可以利用單機多線程並行處理,但是大數據下其實帶來效果不是很好。
那主要原因是因爲發生在系統架構上,當前系統使用底層使用 MySQL 處理的。
傳統的 MySQL 是 OLTP(on-line transaction processing),這個結構決定它適合用於高併發,小事務業務數據處理。
但是對賬業務特性動輒就是百萬級,千萬級數據,數據量處理非常大。但是對賬數據處理大多是一次性,不會頻繁更新。
上面業務特性決定了,MySQL 這種 OLTP 系統不太適合大數據級對賬業務。
那專業的事應該交給專業的人去做,對賬業務也一樣,這種大數據級業務比較適合由 Hive、Spark SQL 等 OLAP 去做。
8 千萬數據級對賬系統實現方案
首先我們先來看下對賬整體時序圖,先有個印象:
下面整篇文章將會圍繞上面時序圖開始講解,由於文章篇幅過長,請各位扶好坐穩,準備發車。
9 數據平臺
前面提到,千萬級數據需要使用 Hive,Spark 等相關大數據技術,這就離不開大數據平臺的技術支持。
簡單聊下我們這邊大數據平臺 DP (Data Platform),它提供用戶大數據離線任務開發所需要的環境、工具以及數據,具有入口統一性、一站式、簡化 Hadoop 本身的複雜性、數據安全等特點。
DP 平臺提供功能如下:
-
數據雙向離線同步,MySQL 與 Hive 互相同步
-
大數據離線計算,支持 SQL(SparkSQL/HiveSQL/Presto) 形式處理各類的數據清洗、轉化、聚合操作,也支持使用 MapReduce、Spark 等形式,處理比較複雜的計算場景
-
即時的 SQL 查詢,允許用戶即時的執行 SQL、查看執行的日誌和結果數以及進行結果數據的可視化分析
-
數據報表
那本篇文章不會涉及具體的大數據技術相關的實現細節,相關原理(主要是咱也不會~),主要聊下對賬系統如何聯合 DP 平臺實現完整數據對賬方案。
10 對賬系統概覽
開頭的時序圖,我們可以看到整個對賬過程設計好幾個業務流程,那在這裏對賬系統內部將會維護一個流程狀態機,當前一個流程處理結束之後,下一個流程才能被觸發。
由於當前對賬系統實現方案,涉及對賬系統與 DP 平臺,對賬系統目前沒辦法調用 DP 平臺觸發任務,但是 DP 平臺可以通過通過 HTTP 接口調用對賬系統。
所以當前流程觸發的方式使用的是定時任務的方案,每個流程有一個單獨的定時任務。
對賬系統內的定時任務觸發的時候,將會判斷當前流程是否已經到達執行條件,即判斷一下當前任務的狀態。
每個定時任務觸發時間人爲設置的時候,岔開一兩分鐘,防止同時運行。
DP 平臺使用自帶調度任務,對賬系統無法控制 DP 任務的運行。
DP 平臺定時任務可以通過運行 Scala 腳本代碼,調用對賬系統提供 HTTP 查詢接口,通過這種方式判斷當前流程是否已經到達執行條件。
下面詳細解釋一下每個流程。
11 初始化對賬任務
對賬系統依靠對賬任務記錄推動流轉,目前每天凌晨將會初始化生成對賬任務記錄,後續任務流轉就可以從這裏開始。
對賬系統維護一張對賬覈對規則表:
對賬覈對規則表關鍵字段含義如下:
-
channel_code 渠道編碼,每個支付渠道將會分配一個唯一渠道編碼,例如微信,支付寶
-
biz_type 業務類型,例如支付,退款,提現等
-
status 是否生效
每次對接新的支付渠道,對賬配置規則需要新增覈對規則。
初始化對賬定時任務將會查找覈對規則表中所有的生效的配置規則,依次生成當天的對賬任務記錄:
對賬任務記錄部分字段與覈對規則表含義一樣,不再贅述,其他字段含義如下:
-
bill_date 賬期,一般 D 日對賬任務覈對 D-1 數據,所以賬期爲 D-1 日
-
batch_no 對賬批次,生成規則如下:賬期 + 渠道編碼 + 001
-
phase,當前對賬任務處於階段,根據上面對賬流程可以分爲:
-
初始化
-
數據收集
-
存疑處理
-
數據覈對
-
二次存疑處理
-
數據彙總
-
差錯數據推送
-
error_reason 錯誤原因
初始化對賬任務結束之後,對賬任務流程推動到第二階段,數據收集。
12 數據收集
數據收集階段,收集兩端待覈對的數據,爲後面的數據覈對任務提供覈對數據。
數據收集階段分爲兩部分:
-
本端數據收集,即自己方產生的支付數據
-
對端數據收集,即三方渠道產生支付數據
本端數據收集
本端數據,是自己業務產生的支付數據,這些數據原本存在各個業務的數據庫中。
對賬系統獲取這些支付數據,一般有兩種方式:
-
查詢,對賬系統主動拉取
-
推送,對賬系統監聽獲取數據
查詢數據方式前面也聊到過,數據量小的情況下,沒什麼問題。一旦數據量變大,查詢效率就會變低。
所以這裏我們採用推送的方式,對賬系統監聽各個業務數據表 binlog,每當業務數據發生變動,對賬系統就可以接受到 binlog 消息。
對賬系統接受到 binlog 消息,將會判斷當前消息是否需要過濾,是否已經支付成功等等,滿足條件之後,binlog 消息將會插入本端數據表中,表結構如下:
本端記錄表關鍵字段含義如下:
-
channel_code 渠道編碼,每個支付渠道將會分配一個唯一渠道編碼,例如微信,支付寶
-
biz_order_no 本端支付流水號
-
bill_date 賬期
-
status 狀態
-
is_check 對賬狀態,0 - 未覈對,1 - 已覈對
-
trade_amount 支付金額
-
channel_order_no 三方渠道支付單號
-
merchant_no 商戶號
-
sub_merchant_no 子商戶號
上面展示的支付記錄表結構,根據業務類型不同,本端其實還有退款記錄表,提現記錄表等。
這裏設計的時候,實際上也可以將所有業務數據放在一張表中,然後根據業務類型字段區分。
對端數據收集
對端數據,就是第三方支付渠道產生支付數據,一般 D 日產生交易之後,D+1 日第三方渠道將會生成一個對賬文件。
對賬系統需要從對端提供的對賬文件獲取對端數據。
渠道的對賬文件,下載方式,文件類型存在很大的差異,每次接入新的支付渠道,這裏需要經過新的開發。
對端數據這裏維護了一張渠道下載配置表,對端數據收集的時候將會獲取所有可用配置:
渠道下載配置表關鍵字段含義如下:
-
mch_id 三方渠道分配的商戶號
-
type 下載類型:
-
FTP
-
SFTP
-
HTTP
-
download_param 下載的配置參數,比如 FTP 的地址,登錄密碼,下載地址等。
對賬文件下載成功之後,需要根據文件類型進行解析,最後轉化自己的需要的對賬數據入庫。
對端數據表結構如下:
上面關鍵字段與本端記錄表類似,額外新增字段:
channel_fee 渠道手續費,用於統計渠道收的手續費。
同樣渠道記錄表根據根據業務類型也分爲退款渠道記錄表,提現渠道記錄表等,同樣也可以合併成一張表,根據業務類型區分。
對端數據收集階段,由於拉取三方渠道的對賬文件,那有時候渠道端存在異常,將會導致對賬文件下載延遲,從而導致其他任務也出現的相應的延遲。
這一點是整個對賬流程中,相對不可控的問題。我們需要在對賬流程設計中考慮這一點。
對賬文件下載解析成功入庫之後,對賬流程將會流轉到下一個流程存疑數據處理。
13 存疑數據處理
講解這個流程之前,先給大家解釋一下什麼是存疑數據?
正常支付過程中,會存在一個兩邊賬期不一致的問題,比如說本端數據支付時間是 2021 年 12 月 28 日 23 點 59 分 59 秒,那麼本端認爲這筆支付交易賬期是 2021 年 12 月 28 日。
然而這筆支付發送給三方渠道之後,三方渠道支付成功的時間已經是 2021 年 12 月 29 日 0 點 0 分 2 秒,三方渠道支付賬期記爲 2021 年 12 月 29 日。
這種情況下我們這邊記錄賬期是 2021 年 12 月 28 日,但是第三方渠道這筆記錄是 2021 年 12 月 29 日,所以 2021 年 12 月 28 日對賬單上沒有這筆支付記錄,這就導致一筆差異數據(一端有 / 一端無)的情況。
上面這種情況就是典型因爲日切問題導致差異。
但是我們知道 2021 年 12 月 29 日對賬單上肯定會包含這筆,所以我們可以先把這筆差異數據掛起,當做存疑數據,等到 2021 年 12 月 29 日賬期對賬的時候,對方賬單包含這筆,當天就能覈對成功,這就解決這筆差異數據。
所以說存疑數據,就跟其字面意思一樣,當這筆數據當前處理不了的時候,那就現放着,不做定論,過一天我再嘗試處理一下。
除了上面日切問題導致的差異數據以外,還有一些情況:
-
網絡問題,導致兩邊訂單狀態不一致。
-
測試環境與生產環境共用一個三方渠道商戶號,測試環境產生的交易出現在對賬單裏
存疑數據分爲三種類型:
-
本端有,渠道無,即本端存在訂單信息,渠道賬單記錄沒有訂單信息,可能是日切導致的問題
-
渠道有,本端無,即本端不存在訂單信息,渠道端賬單記錄卻有訂單信息,可能是測試環境與生產環境共用渠道參數
-
金額不平,即雙方都存在訂單信息,但是雙方訂單金額不一致
瞭解完存疑數據的定義,我們再來看下存疑數據處理的流程。
存疑數據將會由下面的流程中產生,這裏先來看下存疑表結構:
關鍵字段如下:
-
batch_no 批次號
-
biz_id 業務單號
-
biz_amount 金額
-
status 0 - 未處理,1 - 已處理
-
biz_date 賬期
-
biz_type 業務類型
-
channel_code 渠道類型
-
delayed_times 延遲天數
-
merchant_no 商戶號
-
sub_merchant_no 子商戶號
-
buffer_type 存疑類型,0 - 本端存疑,1 - 渠道存疑
存疑處理過程將會撈起所有存疑表中還未處理的存疑數據,根據存疑類型反向查找對賬數據表。例如:
-
渠道存疑(第一天對賬,本端有,渠道無),查找對端數據
-
本端存疑(第一天對賬,本端無,渠道有),查找本端數據
查找對端 / 本端數據,都是根據支付流水號加業務類型查找定位。
如果在本端 / 對端數據中找到,這裏還需要再對比一下金額:
-
如果金額不相等,代表單號相同,但是金額不等,將這筆移動到支付差異表
-
如果金額相等,代表這兩筆核平,存疑表將這筆數據更新爲覈對成功,本端 / 對端數據更新爲對賬成功
上面這一步比較重要,因爲下面對賬覈對過程主要覈對要素是支付流水號 + 支付金額,通過這種方式收集單片賬是無法知道是因爲單號不存在,還是因爲金額不存在原因,具體流程可以看下下面覈對流程。
如果在本端 / 對端數據還是找不到,那就根據渠道配置的存疑規則,如果當前已經存疑的天數大於配置渠道存疑天數,則將數據直接移動到差錯表。
如果存疑天數小於當前渠道配置天數,那就不要管,繼續保存在存疑表,等待下一天存疑數據處理。
一般來說,日切導致的數據,存疑一天,就可以解決。但是有些渠道可能是 T+1 在對賬,這種情況需要配置的存疑天數就要長一點了。
本地存疑數據處理結束之後,下面就要開始 DP 數據處理。
14 數據導入 DP
在 DP 覈對之前,我們需要將對賬系統收集的數據,從 MySQL 導入 DP Hive 表中。
DP 任務調度開始,DP 平臺定時檢測對賬系統提供 HTTP 接口,判斷本次存疑流程是否處理完成。
如果完成,自動觸發將數據從 MySQL 導入 DP Hive 表中。
數據導入之後,將會開始 DP 覈對規程。這個過程就是整個對賬流程最關鍵的部分,這個流程覈對兩端數據,檢查兩端是否存在差異數據。
15 DP 覈對
數據導入結束,DP 平臺開始覈對數據,這個過程分爲兩個覈對任務:
成功數據覈對
成功數據覈對任務,覈對的目的是爲了覈對出本端與對端支付單號與金額一致的數據。
這裏的核對任務使用了 Hive SQL,整個 SQL 如下所示:
---- A
CREATE TABLE IF NOT EXISTS dp.pay_check_success (
`batch_no` bigint comment '批次號',
`merchant_no` string comment '三方商戶號',
`sub_merchant_no` string comment '三方子商戶號',
`biz_id` string comment '對賬業務關聯字段',
`biz_amount` bigint comment '金額',
`biz_date` string comment '業務日期',
`biz_type` int comment '業務類型',
`status` int comment '狀態標識',
`remark` string comment '備註',
`create_time` string comment '創建時間',
`update_time` string comment '修改時間',
`trade_date` int comment '訂單交易日期',
`channel_code` int comment '渠道類型'
);
----B
insert
overwrite table dp.pay_check_success
select
tb1.batch_no as batch_no,
tb1.merchant_no as merchant_no,
tb1.sub_merchant_no as sub_merchant_no,
tb1.biz_id as biz_id,
tb1.biz_amount as biz_amount,
tb1.biz_date as biz_date,
tb1.biz_type as biz_type,
tb1.status as status,
tb1.remark as remark,
tb1.trade_date as trade_date,
tb1.channel_code as channel_code
from
(
select
tb2.batch_no as batch_no,
tb1.merchant_no as merchant_no,
tb1.sub_merchant_no as sub_merchant_no,
tb1.biz_order_no as biz_id,
tb1.trader_amount as biz_amount,
'${DP_1_DAYS_AGO_Ymd}' as biz_date,
'0' as status,
'' as remark,
'${DP_1_DAYS_AGO_Ymd}' as trade_date,
tb1.channel_code as channel_code
from
dp.pay_check_record tb1
inner join dp.pay_check_channel_record tb2 on tb1.biz_order_no = tb2.biz_order_no
and tb1.trader_amount = tb2.trader_amount
and tb1.channel_code = tb2.channel_code
where
tb1.is_check = 0
and tb2.is_check = 0
and tb1.bill_date = '${DP_1_DAYS_AGO_Ymd}'
and tb2.bill_date = '${DP_1_DAYS_AGO_Ymd}'
and tb1.is_filter = 0
) tb1
整個 SQL 分爲兩部分,第一部分將會在 DP 中創建一張 pay_check_success,記錄覈對成功的數據。
第二部分,將覈對成功的數據插入上面創建的 pay_check_success 表中。
查找覈對成功的數據 SQL 如下:
select
tb2.batch_no as batch_no,
tb1.merchant_no as merchant_no,
tb1.sub_merchant_no as sub_merchant_no,
tb1.biz_order_no as biz_id,
tb1.trader_amount as biz_amount,
'${DP_1_DAYS_AGO_Ymd}' as biz_date,
'0' as status,
'' as remark,
'${DP_1_DAYS_AGO_Ymd}' as trade_date,
tb1.channel_code as channel_code
from
dp.pay_check_record tb1
inner join dp.pay_check_channel_record tb2 on tb1.biz_order_no = tb2.biz_order_no
and tb1.trader_amount = tb2.trader_amount
and tb1.channel_code = tb2.channel_code
where
tb1.is_check = 0
and tb2.is_check = 0
and tb1.bill_date = '${DP_1_DAYS_AGO_Ymd}'
and tb2.bill_date = '${DP_1_DAYS_AGO_Ymd}'
and tb1.is_filter = 0
上述 SQL 存在一些 DP 平臺系統變量。DP_1_DAYS_AGO_Ymd 代表當前日期的前一天
主要邏輯非常簡單,利用 SQL 內連接查詢的功能,可以查找單號,金額,渠道編碼一致的數據。
成功數據覈對任務結束,將會把剛纔在 DP 中創建的 pay_check_success 同步回對賬系統的 MYSQL 數據庫中。
存疑數據覈對
存疑數據覈對任務,覈對的目的是爲了覈對出本端與對端支付單號或金額不一致的數據。
這些數據將會當做存疑數據,這些數據將會在第二階段存疑數據處理。
這裏的核對任務也是使用了 Hive SQL ,整個 SQL 跟上面比較類似,SQL 如下所示:
CREATE TABLE IF NOT EXISTS dp.check_dp_buffer_record (
`biz_id` string comment '訂單號',
`order_type` string comment '訂單類型 0本端訂單 1渠道訂單',
`bill_date` int comment '對賬日期',
`biz_type` int comment '業務類型',
`channel_code` int comment '渠道類型',
`amount` string comment '金額',
`merchant_no` string comment '商戶號',
`sub_merchant_no` string comment '三方子商戶號',
`trade_date` int comment '交易日期',
`create_time` string comment '創建時間',
`update_time` string comment '修改時間'
);
insert
overwrite table dp.check_dp_buffer_record
select
tb1.biz_id as biz_id,
tb1.order_type as order_type,
tb1.bill_date as bill_date,
tb1.biz_type as biz_type,
tb1.channel_code as channel_code,
tb1.amount as amount,
tb1.merchant_no as merchant_no,
tb1.sub_merchant_no as sub_merchant_no,
tb1.trade_date as trade_date,
'${DP_0_DAYS_AGO_Y_m_d_HMS}',
'${DP_0_DAYS_AGO_Y_m_d_HMS}'
FROM
(
select
tb1.biz_order_no as biz_id,
0 as order_type,
tb1.bill_date as bill_date,
10 as biz_type,
tb1.channel_code as channel_code,
tb1.trade_amount as amount,
tb1.merchant_no as merchant_no,
tb1.sub_merchant_no as sub_merchant_no,
'${DP_1_DAYS_AGO_Ymd}' as trade_date
FROM
(
select
biz_order_no,
bill_date,
channel_code,
trade_amount,
merchant_no,
sub_merchant_no
from
ods.pay_check_record
where
and bill_date = '${DP_1_DAYS_AGO_Ymd}'
and is_filter = 0
and is_check = 0
) tb1
LEFT JOIN (
select
biz_order_no,
trade_amount,
channel_code
from
ods.pay_check_channel_record
where
and bill_date = '${DP_1_DAYS_AGO_Ymd}'
and is_check = 0
) tb2 ON tb1.biz_order_no = tb2.biz_order_no
and tb1.trade_amount = tb2.trade_amount
and tb1.channel_code = tb2.channel_code
where
tb2.biz_order_no IS NULL
union
select
tb1.biz_order_no as biz_id,
1 as order_type,
tb1.bill_date as bill_date,
10 as biz_type,
tb1.channel_code as channel_code,
tb1.trade_amount as amount,
tb1.merchant_no as merchant_no,
tb1.sub_merchant_no as sub_merchant_no,
'${DP_1_DAYS_AGO_Ymd}' as trade_date
FROM
(
select
biz_order_no,
bill_date,
channel_code,
trade_amount,
merchant_no,
sub_merchant_no
from
ods.pay_check_chnnel_bill
where
and bill_date = '${DP_1_DAYS_AGO_Ymd}'
and is_check = 0
) tb1
LEFT JOIN (
select
biz_order_no,
channel_code,
trade_amount
from
ods.pay_check_record
where
and bill_date = '${DP_1_DAYS_AGO_Ymd}'
and is_filter = 0
and is_check = 0
) tb2 ON tb1.biz_order_no = tb2.biz_order_no
and tb1.trade_amount = tb2.trade_amount
and tb1.channel_code = tb2.channel_code
where
tb2.biz_order_no IS NULL
) tb1;
整個 SQL 分爲兩部分,第一部分將會在 DP 中創建一張 check_dp_buffer_record,記錄覈對差異的的數據。
第二部分,將覈對差異的數據插入上面創建的 check_dp_buffer_record 表中。
查找差異數據較爲麻煩,需要分成兩部分收集:
-
本端單邊賬,即本端存在數據,但是對端不存在數據
-
渠道端單邊賬,即對端存在數據,本端不存在數據
兩邊數據查找到之後,使用 SQL union 功能,將兩端數據聯合。
我們先來看下本端單邊張的邏輯的:
select
tb1.biz_order_no as biz_id,
0 as order_type,
tb1.bill_date as bill_date,
10 as biz_type,
tb1.channel_code as channel_code,
tb1.trade_amount as amount,
tb1.merchant_no as merchant_no,
tb1.sub_merchant_no as sub_merchant_no,
'${DP_1_DAYS_AGO_Ymd}' as trade_date
FROM
(
select
biz_order_no,
bill_date,
channel_code,
trade_amount,
merchant_no,
sub_merchant_no
from
ods.pay_check_record
where
and bill_date = '${DP_1_DAYS_AGO_Ymd}'
and is_filter = 0
and is_check = 0
) tb1
LEFT JOIN (
select
biz_order_no,
trade_amount,
channel_code
from
ods.pay_check_channel_record
where
and bill_date = '${DP_1_DAYS_AGO_Ymd}'
and is_check = 0
) tb2 ON tb1.biz_order_no = tb2.biz_order_no
and tb1.trade_amount = tb2.trade_amount
and tb1.channel_code = tb2.channel_code
where
tb2.biz_order_no IS NULL
SQL 看起來比較複雜,實際邏輯可以簡化爲下面 SQL:
select
*
from
innerTab t1
LEFT JOIN channelTab t2 ON t1.biz_order_no = t2.biz_order_no
and t1.trade_amount = t2.trade_amount
and t1.channel_code = t2.channel_code
where
t2.biz_order_no is null;
這裏主要利用 SQL 左連接的功能,本端數據 left join 渠道數據,如果渠道單號不存在,則認爲本端數據存在,渠道數據不存在,當然也有可能是兩端數據都存在,但是金額不相等。
這種情況記爲本端數據存疑,orderType 爲 0。
渠道端單邊賬收集邏輯:
select
tb1.biz_order_no as biz_id,
1 as order_type,
tb1.bill_date as bill_date,
10 as biz_type,
tb1.channel_code as channel_code,
tb1.trade_amount as amount,
tb1.merchant_no as merchant_no,
tb1.sub_merchant_no as sub_merchant_no,
'${DP_1_DAYS_AGO_Ymd}' as trade_date
FROM
(
select
biz_order_no,
bill_date,
channel_code,
trade_amount,
merchant_no,
sub_merchant_no
from
ods.pay_check_chnnel_bill
where
and bill_date = '${DP_1_DAYS_AGO_Ymd}'
and is_check = 0
) tb1
LEFT JOIN (
select
biz_order_no,
channel_code,
trade_amount
from
ods.pay_check_record
where
and bill_date = '${DP_1_DAYS_AGO_Ymd}'
and is_filter = 0
and is_check = 0
) tb2 ON tb1.biz_order_no = tb2.biz_order_no
and tb1.trade_amount = tb2.trade_amount
and tb1.channel_code = tb2.channel_code
where
tb2.biz_order_no IS NULL
邏輯與本端單邊賬收集類似,渠道數據 left join 本端數據,如果本端單號不存在,則爲渠道數據存在,本端數據不存在。當然也有可能是兩端數據都存在,但是金額不相等。
這裏記爲渠道存疑數據,orderType 爲 1。
成功數據覈對以及存疑數據覈對結束,DP 平臺將會自動把數據從 Hive 表中導入到 MySQL。
數據導出結束,DP 平臺將會調用對賬系統的相關接口,通知對賬系統 DP 覈對流程結束。
DP 覈對流程是整個對賬流程核心流程,目前千萬級數據的情況下,大概能在一個小時之內搞定。
DP 覈對流程結束之後,對賬系統開始下個流程 - 二次存疑數據處理。
16 二次存疑數據處理
前面流程我們講到存疑處理,爲什麼這裏還需要二次存疑數據處理呢?
這因爲 DP 覈對存疑數據收集的過程,我們使用業務單號與金額去互相匹配,那如果不存在,有可能是因爲兩端數據有一端不存在,還有可能是因爲兩端數據數據都存在,但是金額卻不相等。
DP 覈對過程是無法區分出這兩種情況,所以增加一個二次存疑數據處理流程,單獨區分出這兩類數據。
回到二次存疑數據處理流程,當天產生的所有存疑數據都從 DP 中導入到 check_dp_buffer_record 表。
二次存疑數據處理流程將會查找 check_dp_buffer_record 表所有未覈對的記錄,然後依次遍歷。
遍歷過程中將會嘗試在 check_dp_buffer_record 表中查找相反方向的存疑數據。
這個可能不好理解,舉個例子:
假如有一筆訂單,本端是 100 元,渠道端是 10 元。這種情況兩筆記錄都會出現在 check_dp_buffer_record 表。
遍歷到本端這筆的時候,這筆類型是本端存疑,type 爲 0。使用者本端單號從 check_dp_buffer_record 查找渠道端存疑(type 爲 1)的數據。
上面的情況可以找到,證明這筆存疑數據其實是金額不相等,這裏需要將數據移動到差錯表。
那如果是正常一端缺失的數據,那自然去相反方向查找是找不到的,這種數據是正常存疑數據,移動內部存疑表。
對賬系統二次存疑數據處理結束之後,開始下一個階段數據彙總。
17 數據彙總
數據彙總階段就是爲了統計當天每個有多少成功功對賬數據,多少存疑數據,統計結束通過看板給相關運營人員展示統計數據。
由於數據量大的問題,這裏使用的是 DP 平臺 Sprak 任務進行任務統計。
這裏邏輯簡單解釋爲,就是利用 Scala 腳本代碼對數據進行相關求和,這裏代碼沒有普遍性,就不展示具體的邏輯了。
18 差錯數據推送
數據彙總結束之後,開始下一個階段,差錯數據推送給差錯系統。
上面存疑數據處理的流程中轉化的差錯數據,當前存在對賬系統內部差錯數據表中。
目前我們差錯數據是是另外一個差錯系統單獨處理,所以對賬系統需要把差錯數據表數據推送給差錯系統。
這裏的邏輯比較簡單,查找所有待處理的差錯數據,遍歷發送 NSQ 消息給差錯系統。
19 總結
千萬級數據對賬整個流程看起,其實相關操作流程都不是很難。
那我個人認爲這裏難點在於第一需要一套完整大數據平臺體系,第二改變原有對賬方式,思考如何將對賬系統與大數據平臺一起串起來。
希望這篇文章對正好碰到該類問題同仁起到相關幫助。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/iKtmU2GnajosandeeP0H5w