如何實現 1 小時內完成千萬級數據運算

作者:ninetyhe,騰訊 CDG 後臺開發工程師

本文詳細描述如何實現:目前手上可用的資源僅剩一個 16 核剩餘 4-8G 內存的機器,單點完成在 1 個小時內千萬級別 feed 流數據 flush 操作(主要包括:讀數據,計算綜合得分,淘汰低分數據,並更新最新得分,回寫緩存和數據庫)。

背景

目前工作負責的一款產品增加了綜合得分序的 Feed 流排序方式:需要每天把(將近 1000W 數據量)的 feed 流信息進行算分計算更新後回寫到數據層。手上的批跑物理機器是 16 核(因爲混部,無法獨享 CPU),同時剩下可用內存僅 4-8G。顯而易見的是:我們可以申請機器,多機部署,分片計算或者通過現有的大數據平臺 Hadoop 進行運算都看似可以解決問題。但是由於更新 feed 流的操作需要依賴下游服務(這裏暫且叫 A,後續文中提到下游服務均可稱 A 服務),而下游的服務 A-Server 本身是個 DB 強綁定的關係,也就說明了下游的服務瓶頸在於 DB 的 QPS,這也導致了即便我本身的服務多機部署,分片處理,下游服務的短板導致不可行。而針對方案二通過大數據平臺完成的話,也就是需要推薦大數據的部門協助處理,顯然這個是需要排期處理,而時間上也是不可預估。

既然如此,那就借用,朱光潛老先生的一篇文章《朝抵抗力最大的路徑走》。我本人相信通過合理的資源調度以及更低的成本可以克服眼前的困難,實現最終的需求效果。當然優化過程中並不是一帆風順,當然經過兩週左右的優化迭代,也終於實現了。

業務主要流程流程

整個 flush 的業務流程大致如下:

主要業務流程圖具體如下:

針對上述的業務邏輯,設計出了最初方案

方案圖如下:

最初方案缺陷

將近 1000W 的數據雖然在處理過程中,在使用後的集合或者 Map 都會及時清空:

Map=nil []string=nil  // 清空已使用的內容
runtime.GC()  // 發出GC的請求,希望發起GC

但是問題還是出現了:

內存跑滿(由於機器總內存 18G,所以基本是內存直接跑滿了)

Cpu 也基本瞬間跑滿

堆棧中的異常

compress@v1.12.2/zstd/blockdec.go:215 +0x149
created by github.com/klauspost/compress/zstd.newBlockDec
 /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:118 +0x166

goroutine 61 [chan receive, 438 minutes]:
github.com/klauspost/compress/zstd.(*blockDec).startDecoder(0xc006c1d6c0)
 /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:215 +0x149
created by github.com/klauspost/compress/zstd.newBlockDec
 /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:118 +0x166

goroutine 62 [chan receive, 438 minutes]:
github.com/klauspost/compress/zstd.(*blockDec).startDecoder(0xc006c1d790)
 /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:215 +0x149
created by github.com/klauspost/compress/zstd.newBlockDec
 /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:118 +0x166

goroutine 63 [chan receive, 438 minutes]:
github.com/klauspost/compress/zstd.(*blockDec).startDecoder(0xc006c1d860)
 /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:215 +0x149
created by github.com/klauspost/compress/zstd.newBlockDec
 /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:118 +0x166

因爲堆棧給的信息不多,但是從機器上看基本是 goruntine 開啓的太多,併發量太大,同時大量的數據同時加載到內存,導致了機器的內存和 Cpu 的負載過高。

針對上述的問題,設計出了第二套方案:

對象池具體改進點如下:

協程池

實現比較簡單,這裏就直接上代碼:

// 協程池對象
type PoolBuilder struct {
 workerNum  int                  //  Worker 線程數量
 DelJobChan *chan string // 緩衝隊列

}


// 創建一個協程池
func (pool *PoolBuilder) listenAdd(num int) {
 for i := 0; i < num; i++ {
  go func(i int) { //
   addWorker(pool.AddJobChan)
  }(i)
 }
}
// 任務寫入隊列
func (pool *PoolBuilder) InsertAddChannel(id string, score int64) {
 log.Infof("send value to add channel,%s", id)
 pool.AddJobChan.In <- &AddChannel{
  id:    id,
  score: score,
 }
}
優化後的方案缺陷

內存和 Cpu 的負載相對降下來了,但是由於下游服務 A-Server 是對 DB 的強依賴的類型,所以突然的高併發,DB 的瓶頸成了 A-Server 的服務瓶頸。如果併發量降下來,但是 6 個小時內完成 1000w 的數據讀庫,業務計算,算法排序以及刪除和更新每一條數據的得分,顯然不夠。

陷入僵局

全量的數據計算,併發高,下游服務,下游存儲資源扛不;相對併發不高的情況,數據計算不完。與組內小夥伴商量,可以採用大數據平臺計算不失一種好的辦法。看似最優解,但是大數據平臺接入,以及推動大數據平臺的開發也是需要走排期等流程。

參考開源,集思廣益

經過了兩週的專研和思考,我最終從:hadoop 的 mapreduce 分而治的思想、vert.x 的全異步鏈路(本人超級喜歡的一個框架,使用後,根本不想寫同步代碼了)以及 Linux 的內核調度機制的三種優秀的設計中借鑑了一些思路,最終完成了 2 小時 40 分鐘跑千萬級別的數據優化。

1、Hadoop 的 mapreduce 分而治的思想

把任務拆分成若干分,然後分配給一個 woker 讓每個 worker 處理手中的任務,並把處理後的子任務彙集到一個 woker-A。woker-A 負責把所以的子任務結果,彙總處理,並返回。

啓發

我可以把每一個類別分配給一個協程處理,而每一個協程只負責每一個類別下的所以數據,這樣協程的數量也就是類別的數據,這樣進一步節省了協程數量,但是由於 merge 的結果在最終一步,這樣的話內存就需要存儲處理後全量數據,這一點與目前的內存有限不符合,所以這裏借鑑了把任務分發的思想。

2、Linux 的內核調度機制(非 epoll)

在 Linux 的中內核調度中,我們知道非 epoll 的模式中,無論是 poll 和 select 的時候,都會有一個 select 來負責後續的任務調用和分配,用官方的描述就是:select 輪訓設置或檢查存放 fd 標誌位的數據結構進行下一步處理。如果滿足狀態,就會扭轉到下一個步,喚起相應的進程函數調用。

啓發

這裏可以參考 select 這個負責任的角色,當然改進的地方是我可以增加多個協程來併發查詢所以類別,並進行分發類別處理,這樣話,下游的協程池就可以儘可能的在完成一次調度後,馬上進行下一次調度(因爲分配任務的協程多了),而不會進入調度空閒的狀態。

這裏就直接使用網上的一張圖:

3、vert.x 全異步鏈路

我將這個 vert.x 標紅了,可以看到這裏 vert.x 給我的啓發是最關鍵也是最大的。上述問題,我反覆思考,我發現,其實我如果突然的高併發,必然導致了下游的服務負載過高從而導致 DB 和下游服務扛不住。如果我能平滑的併發,而不是從某個時間點起,併發操作,也許就能解決這個問題。

併發代碼我們寫的多,但也許我們大家寫的只是併發而不是真正的異步,因爲我們在開始或者函數彙總的結果初我們都會使用阻塞,當然我也是有短時間沒有寫全異步的代碼了,所以思想固化了,具體案例如下分析:

這種在主線中啓動併發或者異步的處理,最終還是需要在主線程中使用 wait 來阻塞等等所以線程的結果處理完畢,這樣看似提高的吞吐量,但是由於需要對併發線程或協程的結果進行彙總計算,這樣就註定要把大量的結果集合存儲到內存,然後進行後續的操作。這樣的異步更像一種僞裝異步。

而在 vert.x 中是將上下游的數據通信都是用了 callback 的方式處理,而正是這樣,這個框架的做到了全鏈路的異步邏輯。這裏我們看看這個框架的核心思想:

Vertx 完成採用另一個機制,用一個線程來接受請求(也可以是幾個,注意是幾個,不是幾百個),而把這個真正要執行的任務委託給另外一個線程來執行,從而不會堵塞當前線程。

另外在 Vert.x 中的調度模型也正是使用了 Linux 的 epoll 的事件驅動的機制,大致如下:

整體來看 vert.x 的做到了:

  1. 非阻塞處理請求,異步執行阻塞程序,保證了請求處理的高效性;

  2. 使用 Event Bus 事件總線來進行通訊,可以輕鬆編寫出分佈式、松耦合、高擴展性的程序。

這裏可以展示一下 Vert.x 的異步代碼:

public class Server extends AbstractVerticle {
  public void start() {
    vertx.createHttpServer().requestHandler(req -> {
      req.response()
        .putHeader("content-type""text/plain")
        .end("Hello from Vert.x!");
    }).listen(8080);
  }
}

對異步代碼有興趣的小夥伴一定要看看:https://vertx.io/

優化改造開始

借鑑了上述優秀的思想,我對自己的服務做了以下改進:

1、我構造了 4 個協程池,分別是查詢類別 category、查詢 DB 基本信息、根據算法計算綜合得分、和數據更新回寫;

2、從主協程開始,不做任何阻塞,查詢類別的協程協程池,每查詢一個類別,結果直接丟到 channelA(不阻塞然後繼續擦下類別);

3、查詢 DB 的協程,監聽 channelA,當發現有數據的時候,查詢 DB 信息,並將結果丟到 channelB(同上不做任何阻塞,繼續查詢下一條數據的結果集合);

4、帖子得分協程池讀取 channelB 的數據,然後根據算法計算處理帖子的得分,並將結果集合丟到 channelC(同樣不做任何阻塞,繼續計算下一次的得分數據);

5、而數據回寫的協程負責調用下游服務 A-Server,處理後完,打 log,標記處理的偏移量(由於沒有阻塞,需要跟着最終所以數據是否處理完成)。

業務架構設計如下:

優化效果

1、協程數 6w->100!,這裏協程數從 6w 降到了 100 個協程就 Cover 住了整個項目;

2、內存使用情況,從基本跑滿到僅僅使用 1-2G 的正常內存。

3、CPU 的使用 460% 的使用率直接降到 65%:

4、計算數據量 1000w 的時間 6 個小時併發算不完到 1 小時 46 分鐘計算完成。

總結:沒想到自己的堅持看到了效果,自選股的業務中也因此可以接入綜合得分序列的 feed 流,我相信這個是一個好的開始,在這個基礎上,我們可以根據個人畫像做更多的智能推薦,期間大夥的建議更多是借用大數據平臺計算,而實際的推進和排期讓我更願意用自己的方式以最低的成本最優的結構去優化完成,當然這次很幸運,自己的努力實現了。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/9QD8kzWHmuN4m2MmEg92NQ