用 Go 語言併發處理 CSV 文件到數據庫

問題背景

假設你擁有一個包含大量聯繫人信息的 CSV 文件,需要將這些信息遷移到數據庫中。這些聯繫人信息可能包含姓名、電話號碼、郵箱地址等。如果使用傳統的單線程方式,逐條處理數據,遷移過程可能會非常緩慢,尤其是在數據量很大時。

在處理大量的 CSV 文件數據並遷移到數據庫時,使用併發可以顯著提升處理效率。Go 語言的 goroutine 和通道(channel)非常適合用來併發地處理數據。

下面我將給出一個示例,展示如何使用 Go 語言併發地處理 CSV 文件,並將數據插入到數據庫中。

主要思路:

  1. 讀取 CSV 文件:使用 encoding/csv 包來解析 CSV 文件。

  2. 併發處理數據:將 CSV 文件的數據分批次發送到多個 goroutine 中進行併發處理。

  3. 數據庫插入:每個 goroutine 從通道中接收數據並將其插入到數據庫中。

  4. 同步控制:使用 sync.WaitGroup 來等待所有 goroutine 完成任務。

假設我們的數據庫是 MySQL,使用 github.com/jinzhu/gorm 作爲 ORM 庫來處理數據庫插入。我們會定義一個 Contact 結構體來映射數據庫中的表,並用併發的方式將每一行 CSV 數據插入到數據庫。

示例代碼

1. 安裝必要的依賴

首先,你需要安裝 gormcsv 相關的包:

go get github.com/jinzhu/gorm

go get github.com/jinzhu/gorm/dialects/mysql

go get encoding/csv

2. 數據庫模型定義

我們先定義一個 Contact 結構體,它會對應數據庫中的聯繫人表。

package main



import (

	"github.com/jinzhu/gorm"

	_ "github.com/jinzhu/gorm/dialects/mysql"

	"fmt"

)



// Contact 是數據庫中表的模型

type Contact struct {

	ID        uint   `gorm:"primary_key"`

	Name      string `gorm:"size:255"`

	Phone     string `gorm:"size:255"`

	Email     string `gorm:"size:255"`

}



func initDB() (*gorm.DB, error) {

	// 使用 MySQL 數據庫

	db, err := gorm.Open("mysql", "user:password@/dbname?charset=utf8&parseTime=True&loc=Local")

	if err != nil {

		return nil, err

	}



	// 自動遷移表結構

	db.AutoMigrate(&Contact{})

	return db, nil

}

3. 讀取 CSV 文件並處理

接下來,我們需要讀取 CSV 文件並將每一行數據併發地插入到數據庫中。

package main



import (

	"encoding/csv"

	"fmt"

	"os"

	"strings"

	"sync"

)



// 處理 CSV 文件並將數據插入數據庫

func processCSV(filePath string, db *gorm.DB) error {

	// 打開 CSV 文件

	file, err := os.Open(filePath)

	if err != nil {

		return err

	}

	defer file.Close()



	// 創建 CSV 閱讀器

	reader := csv.NewReader(file)



	// 讀取所有行

	records, err := reader.ReadAll()

	if err != nil {

		return err

	}



	// 使用 WaitGroup 來同步所有的 goroutine

	var wg sync.WaitGroup



	// 通道用於發送每行數據

	ch := make(chan Contact, len(records))



	// 啓動多個 goroutine 來併發處理 CSV 數據

	for i := 1; i < len(records); i++ { // 從 1 開始,跳過標題行

		wg.Add(1)

		go func(record []string) {

			defer wg.Done()

			// 將 CSV 行轉換爲 Contact 實例

			contact := Contact{

				Name:  record[0],

				Phone: record[1],

				Email: record[2],

			}

			ch <- contact // 發送數據到通道

		}(records[i])

	}



	// 啓動一個 goroutine 來將通道中的數據插入到數據庫

	go func() {

		for contact := range ch {

			if err := db.Create(&contact).Error; err != nil {

				fmt.Println("Error inserting record:", err)

			}

		}

	}()



	// 等待所有 goroutine 完成

	wg.Wait()



	// 關閉通道

	close(ch)



	return nil

}



func main() {

	// 初始化數據庫

	db, err := initDB()

	if err != nil {

		fmt.Println("Failed to connect to database:", err)

		return

	}

	defer db.Close()



	// 處理 CSV 文件並將數據遷移到數據庫

	err = processCSV("contacts.csv", db)

	if err != nil {

		fmt.Println("Error processing CSV file:", err)

		return

	}



	fmt.Println("CSV data successfully migrated to the database.")

}

4. 代碼說明

  1. 初始化數據庫
  1. 讀取 CSV 文件
  1. 併發處理數據
  1. 併發插入數據庫
  1. 關閉通道與等待

5. 性能優化

在這個例子中,我們併發地讀取 CSV 文件並將數據插入數據庫,顯著提高了處理速度。但是,對於大型數據集,還可以做更多的性能優化:

6. 總結

通過併發處理,我們能夠大大提升 CSV 文件遷移到數據庫的速度。Go 的 goroutines 和通道非常適合這種類型的任務,可以高效地處理 I/O 密集型的操作。

在處理大型 CSV 文件時,使用併發處理可以顯著提升性能,減少總體處理時間。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/DoK_mtlq1rDr2lAvOGIO1Q