Go 巧用 io-Pipe-- 優化內存佔用情況
當需要向服務端通過 http api 上傳大文件時候,通常會使用以下這種方式
func main(){
filename := "nohup.out"
body := new(bytes.Buffer)
writer := multipart.NewWriter(body)
part, err := writer.CreateFormFile("file", filename)
if err != nil {}
fr, _ := os.Open(filename)
defer fr.Close()
io.Copy(part, fr)
err = writer.Close()
if err != nil {}
req, err := http.NewRequest("POST", "http://127.0.0.1:9807/file", body)
if err != nil {}
resp, err := http.DefaultClient.Do(req)
if err != nil {}
defer resp.Body.Close()
}
但是當上傳的文件過大的時候,這種方式則會佔用過多內存,在內存資源緊張的時候容易導致 oom。
這樣的原因是在請求開始之前將文件的所有內容都讀到了 body 的這個 buffer 裏面,導致請求參數 req 中會攜帶者文件的全部內容。
優化方法可以使用 io.Pipe,通過一種類似於流式的處理方式,在請求發送時候,纔會從傳入的 reader 中讀取數據,而不會將文件一次性的讀取到內存,減少了一次拷貝的過程。
func main(){
filename := "nohup.out"
pr, pw := io.Pipe()
writer := multipart.NewWriter(pw)
go func() {
part, err := writer.CreateFormFile("file", filename)
if err != nil {}
fr, _ := os.Open(filename)
defer fr.Close()
io.Copy(part, fr)
pw.Close()
writer.Close()
}()
req, err := http.NewRequest("POST", "http://127.0.0.1:9807/file", body)
if err != nil {}
resp, err := http.DefaultClient.Do(req)
if err != nil {}
defer resp.Body.Close()
}
io.Pipe() 會返回一個 reader 和一個 writer 對象,writer 中寫入的數據,可以從 reader 中讀取出來。
但是需要注意的是,讀與寫,需要通過多協程併發操作;也就是寫的時候,需要有其他協程在讀取,否則會陷入阻塞。這種特性和無緩衝 chan 很類似,其實內部也就是靠它實現的,由於沒用緩存,所有也正是其節約內存的原因。下面瞭解一下它的具體實現原理。
io.Pipe
調用 Pipe 方法,返回了一個 PipeReader 和一個 PipeWriter,但是本質都是同一個 pipe 對象。
func Pipe() (*PipeReader, *PipeWriter) {
p := &pipe{
wrCh: make(chan []byte),
rdCh: make(chan int),
done: make(chan struct{}),
}
return &PipeReader{p}, &PipeWriter{p}
}
pipe 對象包含了兩個無緩衝管道,一個鎖,還有一些關閉管道需要的成員變量。
type pipe struct {
//用於保護寫入管道,防止多個協程併發寫,導致相互錯亂。
wrMu sync.Mutex
//寫入的channel,初始化時被定義成了無緩衝類型
wrCh chan []byte
//讀取channel,用於通知寫入方,寫到了哪裏
rdCh chan int
once sync.Once // Protects closing done
done chan struct{}
rerr onceError
werr onceError
}
pipe 實現了 Reader 和 Writer 接口,可以兼容標準庫 io 包中的各種操作。
type Writer interface {
Write(p []byte) (n int, err error)
}
type Reader interface {
Read(p []byte) (n int, err error)
}
Read
首先了解一下 Read 方法,read 方法傳入一個 byte 切片,將讀取到的數據寫入到切片上。
pipe 的結構中不存在緩存數據,這裏是直接讀取的無緩衝通道 wrCh,將讀取到的數據拷貝到 b 切片,由於在 copy(dst, src) 過程中拷貝數據的多少,依據了 dst 對象的容量,所以在容量不夠時候,可能會丟失一部分 src 尾部的數據。這時需要將拷貝的數據量通知到發送方,便於發送方再次傳輸尾部的數據,這裏利用了 rdCh 管道來發送。
func (p *pipe) Read(b []byte) (n int, err error) {
//檢查pipe是否已經關閉
select {
case <-p.done:
return 0, p.readCloseError()
default:
}
select {
case bw := <-p.wrCh:
nr := copy(b, bw)
p.rdCh <- nr
return nr, nil
case <-p.done:
return 0, p.readCloseError()
}
}
Write
write 方法,將傳入的 byte 切片的數據,寫入到自身的 wrCh 中,在寫入之前會通過 wrMu 加鎖,禁止其他協程寫入,寫入結束後再釋放 wrMu。
數據有可能一次性不會被接收端接受完,通過 for 循環,以及接收方發來的 “進度” 信息,確保將數據寫完。通過這種方式確保自己寫入的數據完全被接收方收到。
func (p *pipe) Write(b []byte) (n int, err error) {
//檢查pipe是否已經被關閉
select {
case <-p.done:
return 0, p.writeCloseError()
default:
//加鎖,防止其他goroutine同時寫
p.wrMu.Lock()
defer p.wrMu.Unlock()
}
//寫入過程,通過循環以及rdCh的返回,確保寫入完全被讀取
for once := true; once || len(b) > 0; once = false {
select {
case p.wrCh <- b:
nw := <-p.rdCh
b = b[nw:]
n += nw
case <-p.done:
return n, p.writeCloseError()
}
}
return n, nil
}
Close
當數據寫入結束後,調用 PipeWriter 的 Close 方法來關閉,PipeWriter 的 Close 會生成一個 EOF 存在 werr 中,並關閉名爲 done 的一個管道,此時對於 Reader 的讀請求將會返回 EOF 的錯誤,這也標誌着讀取的結束。
func (w *PipeWriter) Close() error {
return w.CloseWithError(nil)
}
func (w *PipeWriter) CloseWithError(err error) error {
return w.p.CloseWrite(err)
}
func (p *pipe) CloseWrite(err error) error {
if err == nil {
err = EOF
}
p.werr.Store(err)
p.once.Do(func() { close(p.done) })
return nil
}
測試
測試上傳的文件大小爲 2.1GB,優化前的內存佔用如下圖,峯值在 6.5 個 G 左右。
優化後效果顯著,內存峯值在 11M 左右。
總結
當上傳大文件時候,可以通過這種方式,跳過 buffer 緩存,將文件內容進行流式讀取,可以優化一部分性能,減少一次拷貝。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/FHI1Vqn5GMflwusCzSZ6UA