用 Go 語言併發處理 CSV 文件到數據庫
問題背景
假設你擁有一個包含大量聯繫人信息的 CSV 文件,需要將這些信息遷移到數據庫中。這些聯繫人信息可能包含姓名、電話號碼、郵箱地址等。如果使用傳統的單線程方式,逐條處理數據,遷移過程可能會非常緩慢,尤其是在數據量很大時。
在處理大量的 CSV 文件數據並遷移到數據庫時,使用併發可以顯著提升處理效率。Go 語言的 goroutine 和通道(channel)非常適合用來併發地處理數據。
下面我將給出一個示例,展示如何使用 Go 語言併發地處理 CSV 文件,並將數據插入到數據庫中。
主要思路:
-
讀取 CSV 文件:使用
encoding/csv包來解析 CSV 文件。 -
併發處理數據:將 CSV 文件的數據分批次發送到多個 goroutine 中進行併發處理。
-
數據庫插入:每個 goroutine 從通道中接收數據並將其插入到數據庫中。
-
同步控制:使用
sync.WaitGroup來等待所有 goroutine 完成任務。
假設我們的數據庫是 MySQL,使用 github.com/jinzhu/gorm 作爲 ORM 庫來處理數據庫插入。我們會定義一個 Contact 結構體來映射數據庫中的表,並用併發的方式將每一行 CSV 數據插入到數據庫。
示例代碼
1. 安裝必要的依賴
首先,你需要安裝 gorm 和 csv 相關的包:
go get github.com/jinzhu/gorm
go get github.com/jinzhu/gorm/dialects/mysql
go get encoding/csv
2. 數據庫模型定義
我們先定義一個 Contact 結構體,它會對應數據庫中的聯繫人表。
package main
import (
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/mysql"
"fmt"
)
// Contact 是數據庫中表的模型
type Contact struct {
ID uint `gorm:"primary_key"`
Name string `gorm:"size:255"`
Phone string `gorm:"size:255"`
Email string `gorm:"size:255"`
}
func initDB() (*gorm.DB, error) {
// 使用 MySQL 數據庫
db, err := gorm.Open("mysql", "user:password@/dbname?charset=utf8&parseTime=True&loc=Local")
if err != nil {
return nil, err
}
// 自動遷移表結構
db.AutoMigrate(&Contact{})
return db, nil
}
3. 讀取 CSV 文件並處理
接下來,我們需要讀取 CSV 文件並將每一行數據併發地插入到數據庫中。
package main
import (
"encoding/csv"
"fmt"
"os"
"strings"
"sync"
)
// 處理 CSV 文件並將數據插入數據庫
func processCSV(filePath string, db *gorm.DB) error {
// 打開 CSV 文件
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
// 創建 CSV 閱讀器
reader := csv.NewReader(file)
// 讀取所有行
records, err := reader.ReadAll()
if err != nil {
return err
}
// 使用 WaitGroup 來同步所有的 goroutine
var wg sync.WaitGroup
// 通道用於發送每行數據
ch := make(chan Contact, len(records))
// 啓動多個 goroutine 來併發處理 CSV 數據
for i := 1; i < len(records); i++ { // 從 1 開始,跳過標題行
wg.Add(1)
go func(record []string) {
defer wg.Done()
// 將 CSV 行轉換爲 Contact 實例
contact := Contact{
Name: record[0],
Phone: record[1],
Email: record[2],
}
ch <- contact // 發送數據到通道
}(records[i])
}
// 啓動一個 goroutine 來將通道中的數據插入到數據庫
go func() {
for contact := range ch {
if err := db.Create(&contact).Error; err != nil {
fmt.Println("Error inserting record:", err)
}
}
}()
// 等待所有 goroutine 完成
wg.Wait()
// 關閉通道
close(ch)
return nil
}
func main() {
// 初始化數據庫
db, err := initDB()
if err != nil {
fmt.Println("Failed to connect to database:", err)
return
}
defer db.Close()
// 處理 CSV 文件並將數據遷移到數據庫
err = processCSV("contacts.csv", db)
if err != nil {
fmt.Println("Error processing CSV file:", err)
return
}
fmt.Println("CSV data successfully migrated to the database.")
}
4. 代碼說明
- 初始化數據庫:
-
initDB函數用於初始化 MySQL 數據庫連接並進行自動遷移。 -
我們使用
gorm來處理數據庫操作,模型Contact映射到數據庫中的contacts表。
- 讀取 CSV 文件:
-
processCSV函數打開並讀取 CSV 文件。然後,它讀取所有的記錄,並將每條記錄通過 goroutine 異步發送到通道中。 -
每個 goroutine 都會將一條記錄從 CSV 轉換爲
Contact對象,並將其發送到通道。
- 併發處理數據:
-
sync.WaitGroup被用來確保所有的 goroutine 完成任務。wg.Add(1)在啓動每個 goroutine 時調用,wg.Done()在每個 goroutine 完成時調用。 -
使用
chan Contact通道來將數據從多個 goroutine 傳遞到數據庫插入部分。一個單獨的 goroutine 從通道中接收數據並將其插入到數據庫。
- 併發插入數據庫:
- 每個 goroutine 向通道發送數據,然後另一個 goroutine 從通道中讀取數據並將其插入數據庫。通過這種方式,多個數據庫插入操作是併發進行的。
- 關閉通道與等待:
-
在所有數據都發送到通道後,使用
wg.Wait()等待所有 goroutine 完成處理。 -
關閉通道以確保數據庫插入操作可以順利結束。
5. 性能優化
在這個例子中,我們併發地讀取 CSV 文件並將數據插入數據庫,顯著提高了處理速度。但是,對於大型數據集,還可以做更多的性能優化:
-
批量插入:可以將多個數據條目批量插入數據庫,而不是每次插入一條記錄。批量插入可以顯著減少數據庫的 I/O 操作,提升性能。
-
控制併發數:通過
semacphore或者限制通道緩衝區大小,可以控制併發數,避免數據庫被過多併發請求壓垮。 -
數據庫連接池:確保數據庫連接池的配置合理,避免過多的併發連接造成數據庫連接耗盡。
6. 總結
通過併發處理,我們能夠大大提升 CSV 文件遷移到數據庫的速度。Go 的 goroutines 和通道非常適合這種類型的任務,可以高效地處理 I/O 密集型的操作。
在處理大型 CSV 文件時,使用併發處理可以顯著提升性能,減少總體處理時間。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/DoK_mtlq1rDr2lAvOGIO1Q