Go 巧用 io-Pipe-- 優化內存佔用情況

當需要向服務端通過 http api 上傳大文件時候,通常會使用以下這種方式

func main(){
    filename := "nohup.out"
    body := new(bytes.Buffer)
    writer := multipart.NewWriter(body)
    part, err := writer.CreateFormFile("file", filename)
    if err != nil {}

    fr, _ := os.Open(filename)
    defer fr.Close()

    io.Copy(part, fr)

    err = writer.Close()
    if err != nil {}

    req, err := http.NewRequest("POST", "http://127.0.0.1:9807/file", body)
    if err != nil {}

    resp, err := http.DefaultClient.Do(req)
    if err != nil {}
    defer resp.Body.Close()
}

但是當上傳的文件過大的時候,這種方式則會佔用過多內存,在內存資源緊張的時候容易導致 oom。

這樣的原因是在請求開始之前將文件的所有內容都讀到了 body 的這個 buffer 裏面,導致請求參數 req 中會攜帶者文件的全部內容。

優化方法可以使用 io.Pipe,通過一種類似於流式的處理方式,在請求發送時候,纔會從傳入的 reader 中讀取數據,而不會將文件一次性的讀取到內存,減少了一次拷貝的過程。

func main(){
    filename := "nohup.out"

    pr, pw := io.Pipe()
    writer := multipart.NewWriter(pw)
    go func() {
            part, err := writer.CreateFormFile("file", filename)
            if err != nil {}

            fr, _ := os.Open(filename)
            defer fr.Close()

            io.Copy(part, fr)

            pw.Close()
            writer.Close()
    }()

    req, err := http.NewRequest("POST", "http://127.0.0.1:9807/file", body)
    if err != nil {}

    resp, err := http.DefaultClient.Do(req)
    if err != nil {}
    defer resp.Body.Close()
}

io.Pipe() 會返回一個 reader 和一個 writer 對象,writer 中寫入的數據,可以從 reader 中讀取出來。

但是需要注意的是,讀與寫,需要通過多協程併發操作;也就是寫的時候,需要有其他協程在讀取,否則會陷入阻塞。這種特性和無緩衝 chan 很類似,其實內部也就是靠它實現的,由於沒用緩存,所有也正是其節約內存的原因。下面瞭解一下它的具體實現原理。

io.Pipe

調用 Pipe 方法,返回了一個 PipeReader 和一個 PipeWriter,但是本質都是同一個 pipe 對象。

func Pipe() (*PipeReader, *PipeWriter) {
    p := &pipe{
        wrCh: make(chan []byte),
        rdCh: make(chan int),
        done: make(chan struct{}),
    }
    return &PipeReader{p}, &PipeWriter{p}
}

pipe 對象包含了兩個無緩衝管道,一個鎖,還有一些關閉管道需要的成員變量。

type pipe struct {
    //用於保護寫入管道,防止多個協程併發寫,導致相互錯亂。
    wrMu sync.Mutex
    //寫入的channel,初始化時被定義成了無緩衝類型
    wrCh chan []byte
    //讀取channel,用於通知寫入方,寫到了哪裏
    rdCh chan int

    once sync.Once // Protects closing done
    done chan struct{}
    rerr onceError
    werr onceError
}

pipe 實現了 Reader 和 Writer 接口,可以兼容標準庫 io 包中的各種操作。

type Writer interface {
    Write(p []byte) (n int, err error)
}
type Reader interface {
    Read(p []byte) (n int, err error)
}

Read

首先了解一下 Read 方法,read 方法傳入一個 byte 切片,將讀取到的數據寫入到切片上。

pipe 的結構中不存在緩存數據,這裏是直接讀取的無緩衝通道 wrCh,將讀取到的數據拷貝到 b 切片,由於在 copy(dst, src) 過程中拷貝數據的多少,依據了 dst 對象的容量,所以在容量不夠時候,可能會丟失一部分 src 尾部的數據。這時需要將拷貝的數據量通知到發送方,便於發送方再次傳輸尾部的數據,這裏利用了 rdCh 管道來發送。

func (p *pipe) Read(b []byte) (n int, err error) {
    //檢查pipe是否已經關閉
    select {
    case <-p.done:
        return 0, p.readCloseError()
    default:
    }

    select {
    case bw := <-p.wrCh:
        nr := copy(b, bw)
        p.rdCh <- nr
        return nr, nil
    case <-p.done:
        return 0, p.readCloseError()
    }
}

Write

write 方法,將傳入的 byte 切片的數據,寫入到自身的 wrCh 中,在寫入之前會通過 wrMu 加鎖,禁止其他協程寫入,寫入結束後再釋放 wrMu。

數據有可能一次性不會被接收端接受完,通過 for 循環,以及接收方發來的 “進度” 信息,確保將數據寫完。通過這種方式確保自己寫入的數據完全被接收方收到。

func (p *pipe) Write(b []byte) (n int, err error) {
    //檢查pipe是否已經被關閉
    select {
    case <-p.done:
        return 0, p.writeCloseError()
    default:
        //加鎖,防止其他goroutine同時寫
        p.wrMu.Lock()
        defer p.wrMu.Unlock()
    }
    //寫入過程,通過循環以及rdCh的返回,確保寫入完全被讀取
    for once := true; once || len(b) > 0; once = false {
        select {
        case p.wrCh <- b:
            nw := <-p.rdCh
            b = b[nw:]
            n += nw
        case <-p.done:
            return n, p.writeCloseError()
        }
    }
    return n, nil
}

Close

當數據寫入結束後,調用 PipeWriter 的 Close 方法來關閉,PipeWriter 的 Close 會生成一個 EOF 存在 werr 中,並關閉名爲 done 的一個管道,此時對於 Reader 的讀請求將會返回 EOF 的錯誤,這也標誌着讀取的結束。

func (w *PipeWriter) Close() error {
    return w.CloseWithError(nil)
}

func (w *PipeWriter) CloseWithError(err error) error {
    return w.p.CloseWrite(err)
}

func (p *pipe) CloseWrite(err error) error {
    if err == nil {
        err = EOF
    }
    p.werr.Store(err)
    p.once.Do(func() { close(p.done) })
    return nil
}

測試

測試上傳的文件大小爲 2.1GB,優化前的內存佔用如下圖,峯值在 6.5 個 G 左右。

優化後效果顯著,內存峯值在 11M 左右。

總結

當上傳大文件時候,可以通過這種方式,跳過 buffer 緩存,將文件內容進行流式讀取,可以優化一部分性能,減少一次拷貝。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/FHI1Vqn5GMflwusCzSZ6UA