Golang 秒讀 16GB 大文件

【導讀】go 讀取大文件怎麼做?本文巧妙使用 bufio 達成了讀取大文件的目標,來看看如何實踐吧!

現代計算機系統每天都會產生大量的數據,隨着系統規模的增大,把產生的所有調試數據存儲到數據庫是不可行的,因爲它們產生以後就不會被改變,只是用來分析和解決故障。因此把它存儲在本地磁盤上是一個很好的辦法。

在這我們打算使用 GOLANG 讀取一個 16GB 大小,上百萬行內容的 txt 或者 log 文件。跟我一起來吧。

LET‘s CODE

首先我們使用 GO 標準庫中的 os.File 來打開文件

f, err := os.Open(fileName)
 if err != nil {
   fmt.Println("cannot able to read the file", err)
   return
 }
// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

當文件被打開以後,我們有兩個選擇

  1. 逐行讀取,這能幫助我們減少內存的使用,但會耗費大量的時間在 IO 上。

  2. 一次性讀取整個文件到內存中進行處理,這將顯著的增加內存使用,但是會節省大量的時間。

但是要注意,我們的文件大小是 16GB,因此我們不可能把它一次性的加載到內存中。但是第一個選擇也不適合,因爲我們想在幾秒內處理完成。

因此,仔細想想,我們也許還有第三個選擇,我們不去一次性讀取整個文件到內存中,而是分段讀取,想想看 GO 的標準庫中是不是有個 bufio.NewReader() 呢?

r := bufio.NewReader(f)
for {
  buf := make([]byte,4*1024) //the chunk size
  n, err := r.Read(buf) //loading chunk into buffer
  buf = buf[:n]
  if n == 0 {
       if err == io.EOF {
         break
       }
       if err != nil {
         fmt.Println(err)
         break
       }
       return err
    }
}

一旦我們有個一個分塊,我們就可以開啓一個 goroutine 去處理它。因此上面的代碼可以做如下修改。

//sync pools to reuse the memory and decrease the preassure on //Garbage Collector
linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte, 500*1024)
        return lines
}}
stringPool := sync.Pool{New: func() interface{} {
          lines := ""
          return lines
}}
slicePool := sync.Pool{New: func() interface{} {
           lines := make([]string, 100)
           return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {
 
     buf := linesPool.Get().([]byte)
     n, err := r.Read(buf)
     buf = buf[:n]
     if n == 0 {
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Println(err)
            break
        }
 
        return err
     }
     nextUntillNewline, err := r.ReadBytes('\n')//read entire line
 
     if err != io.EOF {
         buf = append(buf, nextUntillNewline...)
     }
 
     wg.Add(1)
     go func() { 
 
        //process each chunk concurrently
        //start -> log start time, end -> log end time
 
        ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)
        wg.Done()
 
     }()
    }
    wg.Wait()
}

上面的代碼做了兩個優化:

  1. sync.Pool 可以減輕 GC 的壓力,我們可以重複使用已分配的內存,減少內存消耗,加快處理速度。

  2. goroutine 幫助我們併發處理多個切塊,顯著加快處理速度。

接下來我們就來實現 ProcessChunk 函數,來處理如下格式的日誌文件

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

我們將根據命令行提供的時間戳來提取日誌

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further                             
      var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logs
      logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow 
         noOfThread++
      }
length := len(logsSlice)
//traverse the chunk
     for i := 0; i < length; i += chunkSize {
 
         wg2.Add(1)
//process each chunk in saperate chunk
         go func(s int, e int) {
            for i:= s; i<e;i++{
               text := logsSlice[i]
if len(text) == 0 {
                  continue
               }
 
            logParts := strings.SplitN(text, ",", 2)
            logCreationTimeString := logParts[0]
            logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
                 fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)
                 return
            }
// check if log's timestamp is inbetween our desired period
          if logCreationTime.After(start) && logCreationTime.Before(end) {
 
            fmt.Println(text)
           }
        }
        textSlice = nil
        wg2.Done()
 
     }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
   //passing the indexes for processing
}  
   wg2.Wait() //wait for a chunk to finish
   logsSlice = nil
}

使用上面的代碼來打開處理 16GB 的日誌文件,測試花費時間大概是 25s。

轉自:寒城

鏈接:zhuanlan.zhihu.com/p/184937550

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/EJa3yXdUEYv5nKDKLWO4qw