面試官:10 億數據如何快速插入 MySQL?

最快的速度把 10 億條數據導入到數據庫,首先需要和麪試官明確一下,10 億條數據什麼形式存在哪裏,每條數據多大,是否有序導入,是否不能重複,數據庫是否是 MySQL?

假設和麪試官明確後,有如下約束

  1. 10 億條數據,每條數據 1 Kb

  2. 數據內容是非結構化的用戶訪問日誌,需要解析後寫入到數據庫

  3. 數據存放在Hdfs 或 S3 分佈式文件存儲裏

  4. 10 億條數據並不是 1 個大文件,而是被近似切分爲 100 個文件,後綴標記順序

  5. 要求有序導入,儘量不重複

  6. 數據庫是 MySQL

首先考慮 10 億數據寫到 MySQL 單表可行嗎?

數據庫單表能支持 10 億嗎?

答案是不能,單表推薦的值是 2000W 以下。這個值怎麼計算出來的呢?

MySQL 索引數據結構是 B + 樹,全量數據存儲在主鍵索引,也就是聚簇索引的葉子結點上。B + 樹插入和查詢的性能和 B + 樹層數直接相關,2000W 以下是 3 層索引,而 2000w 以上則可能爲四層索引。

Mysql b+索引的葉子節點每頁大小 16K。當前每條數據正好 1K,所以簡單理解爲每個葉子節點存儲 16 條數據。b + 索引每個非葉子節點大小也是 16K,但是其只需要存儲主鍵和指向葉子節點的指針,我們假設主鍵的類型是 BigInt,長度爲 8 字節,而指針大小在 InnoDB 中設置爲 6 字節,這樣一共 14 字節,這樣一個非葉子節點可以存儲 16 * 1024/14=1170

也就是每個非葉子節點可關聯 1170 個葉子節點,每個葉子節點存儲 16 條數據。由此可得到 B + 樹索引層數和存儲數量的表格。2KW 以上 索引層數爲 4 層,性能更差。

ZmBZiE

爲了便於計算,我們可以設計單表容量在 1KW,10 億條數據共 100 個表。

如何高效的寫入數據庫

單條寫入數據庫性能比較差,可以考慮批量寫入數據庫,批量數值動態可調整。每條 1K,默認可先調整爲 100 條批量寫入。

批量數據如何保證數據同時寫成功?MySQL Innodb 存儲引擎保證批量寫入事務同時成功或失敗。

寫庫時要支持重試,寫庫失敗重試寫入,如果重試 N 次後依然失敗,可考慮單條寫入 100 條到數據庫,失敗數據打印記錄,丟棄即可。

此外寫入時按照主鍵 id 順序順序寫入可以達到最快的性能,而非主鍵索引的插入則不一定是順序的,頻繁地索引結構調整會導致插入性能下降。最好不創建非主鍵索引,或者在表創建完成後再創建索引,以保證最快的插入性能。

是否需要併發寫同一個表

不能

  1. 併發寫同一個表無法保證數據寫入時是有序的。

  2. 提高批量插入的閾值,在一定程度上增加了插入併發度。無需再併發寫入單表

MySQL 存儲引擎的選擇

Myisam innodb有更好的插入性能,但失去了事務支持,批量插入時無法保證同時成功或失敗,所以當批量插入超時或失敗時,如果重試,勢必對導致一些重複數據的發生。但是爲了保證更快的導入速度,可以把 myisam 存儲引擎列爲計劃之一。

現階段我引用一下別人的性能測試結果:MyISAM 與 InnoDB 對比分析

圖片

從數據可以看到批量寫入明顯優於單條寫入。並且在 innodb 關閉即時刷新磁盤策略後,innodb 插入性能沒有比 myisam 差太多。

innodb_flush_log_at_trx_commit: 控制 MySQL 刷新數據到磁盤的策略。

  1. 默認 = 1,即每次事務提交都會刷新數據到磁盤,安全性最高不會丟失數據。

  2. 當配置爲 0、2 會每隔 1s 刷新數據到磁盤, 在系統宕機、mysql crash時可能丟失 1s 的數據。

考慮到 Innodb 在關閉即時刷新磁盤策略時,批量性能也不錯,所以暫定先使用innodb(如果公司 MySQL 集羣不允許改變這個策略值,可能要使用 MyIsam 了。)。線上環境測試時可以重點對比兩者的插入性能。

要不要進行分庫

mysql 單庫的併發寫入是有性能瓶頸的,一般情況 5K TPS 寫入就很高了。

當前數據都採用 SSD 存儲,性能應該更好一些。但如果是 HDD 的話,雖然順序讀寫會有非常高的表現,但 HDD 無法應對併發寫入,例如每個庫 10 張表,假設 10 張表在併發寫入,每張表雖然是順序寫入,由於多個表的存儲位置不同,HDD 只有 1 個磁頭,不支持併發寫,只能重新尋道,耗時將大大增加,失去順序讀寫的高性能。所以對於 HDD 而言,單庫併發寫多個表並不是好的方案。回到 SSD 的場景,不同 SSD 廠商的寫入能力不同,對於併發寫入的能力也不同,有的支持 500M/s,有的支持 1G/s 讀寫,有的支持 8 個併發,有的支持 4 個併發。在線上實驗之前,我們並不知道實際的性能表現如何。

所以在設計上要更加靈活,需要支持以下能力

  1. 支持配置數據庫的數量

  2. 支持配置併發寫表的數量,(如果 MySQL 是 HDD 磁盤,只讓一張表順序寫入,其他任務等待)

通過以上配置,靈活調整線上數據庫的數量,以及寫表併發度,無論是 HDD 還是 SSD,我們系統都能支持。不論是什麼廠商型號的 SSD,性能表現如何,都可調整配置,不斷獲得更高的性能。這也是後面設計的思路,不固定某一個閾值數量,都要動態可調整。

接下來聊一下文件讀取,10 億條數據,每條 1K,一共是 931G。近 1T 大文件,一般不會生成如此大的文件。所以我們默認文件已經被大致切分爲 100 個文件。每個文件數量大致相同即可。爲什麼切割爲 100 個呢?切分爲 1000 個,增大讀取併發,不是可以更快導入數據庫嗎?剛纔提到數據庫的讀寫性能受限於磁盤,但任何磁盤相比寫操作,讀操作都要更快。尤其是讀取時只需要從文件讀取,但寫入時 MySQL 要執行建立索引,解析 SQL、事務等等複雜的流程。所以寫的併發度最大是 100,讀文件的併發度無需超過 100。

更重要的是讀文件併發度等於分表數量,有利於簡化模型設計。即 100 個讀取任務,100 個寫入任務,對應 100 張表。

如何保證寫入數據庫有序

既然文件被切分爲 100 個 10G 的小文件,可以按照文件後綴 + 在文件行號 作爲記錄的唯一鍵,同時保證同一個文件的內容被寫入同一個表。例如

  1. index_90.txt 被寫入 數據庫 database_9,table_0 ,

  2. index_67.txt 被寫入數據庫 database_6,table_7。

這樣每個表都是有序的。整體有序通過數據庫後綴 + 表名後綴實現。

如何更快地讀取文件

10G 的文件顯然不能一次性讀取到內存中,場景的文件讀取包括

  1. Files.readAllBytes一次性加載內內存

  2. FileReader+ BufferedReader 逐行讀取

  3. File+ BufferedReader

  4. Scanner 逐行讀取

  5. Java NIO FileChannel 緩衝區方式讀取

在 MAC 上,使用這幾種方式的讀取 3.4G 大小文件的性能對比

WaqnoE

詳細的評測內容請參考:讀取文件性能比較 :https://zhuanlan.zhihu.com/p/142029812

由此可見 使用JavaNIO FileChannnel明顯更優,但是FileChannel的方式是先讀取固定大小緩衝區,不支持按行讀取。也無法保證緩衝區正好包括整數行數據。如果緩衝區最後一個字節正好卡在一行數據中間,還需要額外配合讀取下一批數據。如何把緩衝區變爲一行行數據,比較困難。

File file = new File("/xxx.zip");
FileInputStream fileInputStream = null;
long now = System.currentTimeMillis();
try {
       fileInputStream = new FileInputStream(file);
       FileChannel fileChannel = fileInputStream.getChannel();

       int capacity = 1 * 1024 * 1024;//1M
       ByteBuffer byteBuffer = ByteBuffer.allocate(capacity);
       StringBuffer buffer = new StringBuffer();
       int size = 0;
       while (fileChannel.read(byteBuffer) != -1) {
          //讀取後,將位置置爲0,將limit置爲容量, 以備下次讀入到字節緩衝中,從0開始存儲
          byteBuffer.clear();
          byte[] bytes = byteBuffer.array();
          size += bytes.length;
       }
       System.out.println("file size:" + size);
} catch (FileNotFoundException e) {
   e.printStackTrace();
} catch (IOException e) {
   e.printStackTrace();
} finally {
   //TODO close資源.
}
System.out.println("Time:" + (System.currentTimeMillis() - now));

JavaNIO 是基於緩衝區的,ByteBuffer可轉爲 byte 數組,需要轉爲字符串,並且要處理按行截斷。

但是 BufferedReader JavaIO 方式讀取可以天然支持按行截斷,況且性能還不錯,10G 文件,大致只需要讀取 30s,由於導入的整體瓶頸在寫入部分,即便 30s 讀取完,也不會影響整體性能。所以文件讀取使用BufferedReader 逐行讀取。即方案 3

如果協調讀文件任務和寫數據庫任務

這塊比較混亂,請耐心看完。

100 個讀取任務,每個任務讀取一批數據,立即寫入數據庫是否可以呢?前面提到了由於數據庫併發寫入的瓶頸,無法滿足 1 個庫同時併發大批量寫入 10 個表,所以 100 個任務同時寫入數據庫,勢必導致每個庫同時有 10 個表同時在順序寫,這加劇了磁盤的併發寫壓力。爲儘可能提高速度,減少磁盤併發寫入帶來的性能下降, 需要一部分寫入任務被暫停的。那麼讀取任務需要限制併發度嗎?不需要。

假設寫入任務和讀取任務合併,會影響讀取任務併發度。初步計劃讀取任務和寫入任務各自處理,誰也不耽誤誰。但實際設計時發現這個方案較爲困難。

最初的設想是引入 Kafka,即 100 個讀取任務把數據投遞到 Kafka,由寫入任務消費 kafka 寫入 DB。100 個讀取任務把消息投遞到 Kafka,此時順序就被打亂了,如何保證有序寫入數據庫呢?我想到可以使用 Kafka partition 路由,即讀取任務 id 把同一任務的消息都路由到同一個 partition,保證每個 partition 內有序消費。

要準備多少個分片呢?100 個很明顯太多,如果 partition 小於 100 個,例如 10 個。那麼勢必存在多個任務的消息混合在一起。如果同一個庫的多個表在一個 Kafka partition,且這個數據庫只支持單表批量寫入,不支持併發寫多個表。這個庫多個表的消息混在一個分片中,由於併發度的限制,不支持寫入的表對應的消息只能被丟棄。所以這個方案既複雜,又難以實現。

所以最終放棄了 Kafka 方案,也暫時放棄了將讀取和寫入任務分離的方案。

最終方案簡化爲 讀取任務讀一批數據,寫入一批。即任務既負責讀文件、又負責插入數據庫。

如何保證任務的可靠性

如果讀取任務進行到一半,宕機或者服務發佈如何處理呢?或者數據庫故障,一直寫入失敗,任務被暫時終止,如何保證任務再次拉起時,再斷點處繼續處理,不會存在重複寫入呢?

剛纔我們提到可以 爲每一個記錄設置一個主鍵 Id,即 文件後綴 index + 文件所在行號。可以通過主鍵 id 的方式保證寫入的冪等。

文件所在的行號,最大值 大致爲 10G/1k = 10M,即 10000000。拼接最大的後綴 99。最大的 id 爲 990000000。

所以也無需數據庫自增主鍵 ID,可以在批量插入時指定主鍵 ID。

如果另一個任務也需要導入數據庫呢?如何實現主鍵 ID 隔離,所以主鍵 ID 還是需要拼接taskId。例如{taskId}{fileIndex}{fileRowNumber} 轉化爲 Long 類型。如果 taskId 較大,拼接後的數值過大,轉化爲 Long 類型可能出錯。

最重要的是,如果有的任務寫入 1kw,有的其他任務寫入 100W,使用 Long 類型無法獲知每個佔位符的長度,存在衝突的可能性。而如果拼接字符串{taskId}_{fileIndex}_{fileRowNumber} ,新增唯一索引,會導致插入性能更差, 無法滿足最快導入數據的訴求。所以需要想另一個方案。

可以考慮使用 Redis 記錄當前任務的進度。例如 Redis 記錄 task 的進度,批量寫入數據庫成功後,更新 task 進度。

INCRBY KEY_NAME INCR_AMOUNT

指定當前進度增加 100,例如 incrby task_offset_{taskId} 100。如果出現批量插入失敗的,則重試插入。多次失敗,則單個插入,單個更新 redis。要確保 Redis 更新成功,可以在 Redis 更新時 也加上重試。

如果還不放心 Redis 進度和數據庫更新的一致性,可以考慮 消費 數據庫 binlog,每一條記錄新增則 redis +1 。

如果任務出現中斷,則首先查詢任務的 offset。然後讀取文件到指定的 offset 繼續 處理。

如何協調讀取任務的併發度

前面提到了爲了避免單個庫插入表的併發度過高,影響數據庫性能。可以考慮限制併發度。如何做到呢?

既然讀取任務和寫入任務合併一起。那麼就需要同時限制讀取任務。即每次只挑選一批讀取寫入任務執行。

在此之前需要設計一下任務表的存儲模型。

  1. bizId 爲了以後支持別的產品線,預設字段。默認爲 1,代表當前業務線。

  2. datbaseIndex 代表被分配的數據庫後綴

  3. tableIndex 代表被分配的表名後綴

  4. parentTaskId,即總的任務 id

  5. offset 可以用來記錄當前任務的進度

  6. 10 億條數據導入數據庫,切分爲 100 個任務後,會新增 100 個 taskId,分別處理一部分數據,即一個 10G 文件。

  7. status 狀態用來區分當前任務是否在執行,執行完成。

如何把任務分配給每一個節點,可以考慮搶佔方式。每個任務節點都需要搶佔任務,每個節點同時只能搶佔 1 個任務。具體如何實現呢?可以考慮 每個節點都啓動一個定時任務,定期掃表,掃到待執行子任務,嘗試執行該任務。

如何控制併發呢?可以使用 redission 的信號量。key 爲數據庫 id、

RedissonClient redissonClient = Redisson.create(config);
  RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
    // 設置1個併發度
  rSemaphore.trySetPermits(1);
  rSemaphore.tryAcquire();//申請加鎖,非阻塞

由任務負責定期輪訓,搶到名額後,就開始執行任務。將該任務狀態置爲 Process,任務完成後或失敗後,釋放信號量。

TaskTassk 任務表 Redisalt 爭搶信號量成功定時輪訓任務開始查詢待執行的任務循環爭搶信號量修改任務狀態執行中,設置開始時間時間查詢當前進度讀取文件到從當前進度讀取文件,批量導入數據庫更新進度執行完成,釋放信號量申請下一個任務的信號量 TaskTassk 任務表 Redis

但是使用信號量限流有個問題,如果任務忘記釋放信號量,或者進程 Crash 無法釋放信號量,如何處理呢?可以考慮給信號量增加一個超時時間。那麼如果任務執行過長,導致提前釋放信號量,另一個客戶單爭搶到信號量,導致 兩個客戶端同時寫一個任務如何處理呢?

what,明明是將 10 億數據導入數據庫,怎麼變成分佈式鎖超時的類似問題?

實際上 Redisson的信號量並沒有很好的辦法解決信號量超時問題,正常思維:如果任務執行過長,導致信號量被釋放,解決這個問題只需要續約就可以了,任務在執行中,只要發現快信號量過期了,就續約一段時間,始終保持信號量不過期。但是 Redission 並沒有提供信號量續約的能力,怎麼辦?

不妨換個思路,我們一直在嘗試讓多個節點爭搶信號量,進而限制併發度。可以試試選取一個主節點,通過主節點輪訓任務表。分三種情況,

情況 1 當前執行中數量小於併發度。

  1. 則選取 id 最小的待執行任務,狀態置爲進行中,通知發佈消息。

  2. 消費到消息的進程,申請分佈式鎖,開始處理任務。處理完成釋放鎖。藉助於 Redission 分佈式鎖續約,保證任務完成前,鎖不會超時。

情況 2 當前執行中數量等於併發度。

  1. 主節點嘗試 get 進行中任務是否有鎖。

  2. 如果沒有鎖,說明有任務執行失敗,此時應該重新發布任務。如果有鎖,說明有任務正在執行中。

情況 3 當前執行中數量大於併發度

  1. 上報異常情況,報警,人工介入

使用主節點輪訓任務,可以減少任務的爭搶,通過 kafka 發佈消息,接收到消息的進程處理任務。爲了保證更多的節點參與消費,可以考慮增加 Kafka 分片數。雖然每個節點可能同時處理多個任務,但是不會影響性能,因爲性能瓶頸在數據庫。

那麼主節點應該如何選取呢?可以通過Zookeeper+curator 選取主節點。可靠性比較高。

10 億條數據插入數據庫的時間影響因素非常多。包括數據庫磁盤類型、性能。數據庫分庫數量如果能切分 1000 個庫當然性能更快,要根據線上實際情況決策分庫和分表數量,這極大程度決定了寫入的速率。最後數據庫批量插入的閾值也不是一成不變的,需要不斷測試調整,以求得最佳的性能。可以按照 100,1000,10000 等不斷嘗試批量插入的最佳閾值。

最後總結一下幾點重要的

總結

  1. 要首先確認約束條件,才能設計方案。確定面試官主要想問的方向,例如 1T 文件如何切割爲小文件,雖是難點,然而可能不是面試官想考察的問題。

  2. 從數據規模看,需要分庫分表,大致確定分表的規模。

  3. 從單庫的寫入瓶頸分析,判斷需要進行分庫。

  4. 考慮到磁盤對併發寫的支持力度不同,同一個庫多個表寫入的併發需要限制。並且支持動態調整,方便在線上環境調試出最優值。

  5. MySQL innodb、myisam 存儲引擎對寫入性能支持不同,也要在線上對比驗證

  6. 數據庫批量插入的最佳閾值需要反覆測試得出。

  7. 由於存在併發度限制,所以基於 Kafka 分離讀取任務和寫入任務比較困難。所以合併讀取任務和寫入任務。

  8. 需要 Redis 記錄任務執行的進度。任務失敗後,重新導入時,記錄進度,可避免數據重複問題。

  9. 分佈式任務的協調工作是難點,使用 Redission 信號量無法解決超時續約問題。可以由主節點分配任務 + 分佈式鎖保證任務排他寫入。主節點使用Zookeeper+Curator選取。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/zbcwHTTSEbbclGsuuUDHEw