Go 的 IO 流怎麼併發?小技巧分享
今天聊一個存儲的實現細節,數據副本的併發寫入。
存儲的高可靠性和高可用,必須依賴於數據的冗餘機制。比如 3 副本就是把用戶數據複製成 3 份。然後把 3 份數據分發到不同的地方。這個寫下去的動作是有講究的,因爲肯定不希望時延線性增加,你肯定希望的是雖然多寫 2 份數據,但還只耗費 1 份時間。
換句話說,原則上數據雖然變多了,但是時間開銷不能增加。那就只能併發寫入嘍!那這個動作怎麼實現呢?帶大家思考幾個小問題:
-
副本冗餘和併發寫入的動作發生在哪裏?姿勢如何?
-
單次 IO 級別的併發和 IO 流的併發區別在哪裏?
星型寫入和鏈式寫入
有兩種最典型的姿勢:星型寫入和鏈式寫入。
1 星型寫入
副本複製的動作發生在客戶端。這種方式實現簡單,異常處理好控制。缺點主要是節點的扇出大,可能你的客戶端網卡會是個瓶頸。這種方式數據分流分叉的決策點就是在客戶端。
本篇就以這種方式舉例。
2 鏈式寫入
不得不提當然還有另外一種典型的寫入方式:鏈式寫入。客戶端把數據交個副本的第一個節點,然後由第一個節點交給第二個節點,再由第二個節點交給第三個節點。這種寫入方式對比星型寫入,每個節點的扇入扇出都是一份數據,沒有明顯的瓶頸點。網絡的傳輸上更加均衡。
IO 級別的併發
什麼是 IO 級別?
就是看到的操作主體是一次 IO ,也就是單次 IO 。最常見的就是塊存儲下來的 IO ,塊存儲的使用姿勢一般是 open 出一個句柄之後,通過這個句柄下發 IO 。我們處理的是每一次下發的 IO ,把每一次下發 IO 的數據做冗餘,寫入做併發。如下僞代碼,比如一份數據拷貝多份,寫 2 次:
// 步驟一:獲取到用戶數據: buffer
// 步驟二:發往各個服務端節點
for i=0; i<2; i++ {
wn = write(/*網絡句柄*/, /*buffer*/, /*buffer len*/)
}
// 等待響應,並且異常處理
循環調用 2 次發送即可。這種模式是 io 級別的,它處理的是這一筆 IO 。它的主要時延組成是兩部分:網絡 IO 的時延 + 磁盤 IO 的時延。
由於這個是單次 IO 級別的,buffer 可控的、較小的,網絡傳輸的時延相比磁盤 IO ( 機械盤 )幾乎可以忽略。
這裏直接用循環來串行動作來進行網絡發送主要還是因爲整體時延都在磁盤 IO,而磁盤 IO 在不同的節點是併發的。
但是,存儲介質現在越來越快(比如 nvme 盤,傲騰盤等),磁盤 IO 和網絡 IO 的差距越來越小。這時候串行發送網絡數據就不可取了。所以,網絡傳輸的時延最好也是重疊的,把網絡發送這部分也做成併發的。
網絡 IO 做並行化和異步化處理之後,串行的時延只有客戶端循環拷貝多份內存了,內存拷貝這部分佔比還是極低的,對比網絡 IO 和磁盤 IO 可以忽略。當然,如果還要更極致一點,這部分時延也可以重疊起來。此處不表。
IO 流的併發
單次 IO 的冗餘和併發都是很容易理解。冗餘嘛,就是把一份 buffer 拷貝出多份,併發嘛,就是把這多份數據併發的發送出去。這個都是很簡單的 io 的操作調用。
Go 的 IO 不一樣!Go 的 IO 抽象了所謂的 io.Reader , io.Writer 出來。如果童鞋寫過 Go 的 IO 相關的程序就很容易理解。這是一個典型的 IO 流的操作。IO 流的操作包含了成千上萬次的 IO 調用。一般使用 io.Copy 這種函數來操作。io.Copy 的定義,接受一個讀流、一個寫流 :
func Copy(dst Writer, src Reader) (written int64, err error) {
return copyBuffer(dst, src, nil)
}
io.Copy 的結束是要麼讀到 EOF 或者錯誤纔算結束。所以這種情況如果對兩個流用 io.Copy 操作,這個函數調用完,流也完成了。一次 io.Copy 並不是一次簡單的 io 調用,一次 io.Copy 的函數調用裏包含了成千上萬次的 單次的 IO 操作 。
這種就不能簡單的用 for 循環來操作多次 io.Copy 了。用 for 循環那麼 IO 的寫入時延就無法疊加了,就是一個串行的時延。
for i=0; i<2; i++ {
// 時延純線性增長,涼涼。。。。
io.Copy(//)
}
那該怎麼辦呢?把 IO 流一份爲二或者一分爲多?那麼怎麼才能把這個寫入變成多份,並且寫入的時間最好是重疊起來,只消耗 1 份時間呢?
在 Go 裏,怎麼做呢?奇伢先說步驟:
-
需要一個 teeReader 來分流
-
需要一個 Pipe 寫轉讀
-
需要兩個 goroutine 做併發
1 IO 流併發實戰
首先,需要一個 TeeReader ,這個組件主要是用來分流的,把一個讀流分叉出一股數據流出去:
func TeeReader(r Reader, w Writer) Reader {
return &teeReader{r, w}
}
func (t *teeReader) Read(p []byte) (n int, err error) {
n, err = t.r.Read(p)
if n > 0 {
// 把讀到的每一次數據都輸入到 Writer 裏去.
// 分一股數據流出去
if n, err := t.w.Write(p[:n]); err != nil {
return n, err
}
}
return
}
如上,TeeReader 實現分流的原理也很簡單,就是在每一次 Read 的調用中,都把數據寫一份出去。好,現在我們流分叉有了,但是分出來的是一個寫流。
這個好像不大對呢?能否有兩個 Reader ,這兩個讀流裏面流淌的是相同的數據。咋辦?
這時候就需要另外一個組件:Pipe 。調用 io.Pipe 會產生一個 Reader 和 Writer ,把數據寫到 Writer 裏,就能從 Reader 裏原封不動的讀出來。這可太適合寫轉讀了。
剛好,就可以把 TeeReader 接着分出來的數據流用 Pipe 接着,於是乎你就有了兩個相同的 Reader 數據流,接下來只需要把它們放在不同的 goroutine 去操作,那麼這個 IO 流就是併發的。
**Go 實戰慄子 : **
注意,爲了簡單,省略一些異常處理:
func ConcurrencyWrtie(src io.Reader, dest [2]io.Writer) (err error) {
errCh := make(chan error, 1)
// 管道,主要是用來寫、讀流轉化
pr, pw := io.Pipe()
// teeReader ,主要是用來 IO 流分叉
wr := io.TeeReader(src, pw)
// 併發寫入
go func() {
var _err error
defer func() {
pr.CloseWithError(_err)
errCh <- _err
}()
_, _err = io.Copy(dest[1], pr)
}()
defer func() {
// TODO:異常處理
pw.Close()
_err := <-errCh
_ = _err
}()
// 數據寫入
_, err = io.Copy(dest[0], wr)
return err
}
其實,奇伢個人覺得:IO 流的併發其實更適合用鏈式的寫入方式。這個觀點以後有機會分享。
總結
-
IO 級別的併發很簡單,客戶端用 for 循環發就行了,大部分時間是重疊的( 網絡 IO & 磁盤 IO ),那麼就是併發的;
-
流式 IO 的併發也有套路,用 teeReader 分流,用 Pipe 把分出來的寫流轉成讀流,然後用不同的 goroutine 操作即可實現 IO 流的併發;
-
無論是什麼樣的併發,或多或少都會有串行的部分。只要你把這部分的時間比例調整到極小,那整體就還是併發的效果;
後記
一個很簡單的代碼技巧分享給大家。點贊、在看 是對奇伢最大的支持。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/wNBkC-X1FMPuHBX1P_DXbQ