go 實現 ringbuffer 以及 ringbuffer 使用場景介紹
ringbuffer 因爲它能複用緩衝空間,通常用於網絡通信連接的讀寫,雖然市面上已經有了 go 寫的諸多版本的 ringbuffer 組件,雖然諸多版本,實現 ringbuffer 的核心邏輯卻是不變的。但發現其內部提供的方法並不能滿足我當下的需求,所以還是自己造一個吧。
源碼已經上傳到 github
https://github.com/HobbyBear/ringbuffer
需求分析
我在基於 「epoll」 實現一個網絡框架時,需要預先定義好的和客戶端的通信協議,當從連接讀取數據時需要判讀當前連接是否擁有完整的協議 (實際網絡環境中可能完整的協議字節只到達了部分),有才會將數據全部讀取出來,然後進行處理,否則就等待下次連接可讀時,再判斷連接是否具有完整的協議。
由於在讀取時需要先判斷當前連接是否有完整協議,所以讀取時不能移動讀指針的位置,因爲萬一協議不完整的話,下次讀取還要從當前的讀指針位置開始讀取。
所以對於 ringbuffer 組件我會實現一個 peek 方法
func (r *RingBuffer) Peek(readOffsetBack, n int) ([]byte, error)
peek 方法兩個參數,n 代表要讀取的字節數, 「readOffsetBack」 代表讀取是要在當前讀位置偏移的字節數,因爲在設計協議時,往往協議不是那麼簡單 (可能是由多個固定長度的數據構成) ,比如下面這樣的協議格式。
完整的協議有三段構成,每段開頭都會有一個 4 字節的大小代表每段的長度,在判斷協議是否完整時,就必須看着 3 段的數據是否都全部到達。 所以在判斷第二段數據是否完整時,會跳過前面 3 個字節去判斷, 此時 「readOffsetBack」 將會是 3。
此外我還需要一個通過分割符獲取字節的方法,因爲有時候協議不是固定長度的數組了,而是通過某個分割符判斷某段協議是否結束,比如換行符。
func (r *RingBuffer) PeekBytes(readOffsetBack int, delim byte) ([]byte, error)
接着,還需要提供一個更新讀位置的方法,因爲一旦判斷是一個完整的協議後,我會將協議數據全部讀取出來,此時應該要更新讀指針的位置,以便下次讀取新的請求。
func (r *RingBuffer) AddReadPosition(n int)
n 便是代表需要將讀指針往後偏移的 n 個字節。
ringbuffer 原理解析
接着,我們再來看看實際上 ringbuffer 的實現原理是什麼。
首先來看下一個 ringbuffer 應該有的屬性
type RingBuffer struct {
buf []byte
reader io.Reader
r int // 標記下次讀取開始的位置
unReadSize int // 緩衝區中未讀數據大小
}
buf 用作連接讀取的緩衝區,reader 代表了原鏈接,r 代表讀取 ringbuffer 時應該從字節數組的哪個位置開始讀取,unReadSize 代表緩衝區當中還有多少數據沒有讀取,因爲你可能一次性從 reader 裏讀取了很多數據到 buf 裏,但是上層應用只取 buf 裏的部分數據,剩餘的未讀數據就留在了 buf 裏,等待下次被應用層繼續讀取。
我們用一個 5 字節的字節數組當做緩衝區, 首先從 ringbuffer 讀取數據時,由於 ringbuffer 內部沒有數據,所以需要從連接中讀取數據然後寫到 ringbuffer 裏。
如下圖所示:
假設 ringBuffer 規定每次向原網絡連接讀取時 按 4 字節讀取到緩衝區中 (實際情況爲了減少系統調用開銷,這個值會更多,儘可能會一次性讀取更多數據到緩衝區) write pos 指向的位置則代表從 reader 讀取的數據應該從哪個位置開始寫入到 buf 字節數組裏。
writePos = (r + unReadSize) % len(buf)
如果此時上層應用還想再讀取 3 個字節,那麼 ringbuffer 就必須再向 reader 讀取字節填充到緩衝區上,我們假設這次向 reader 索取 3 個字節。緩衝區的空間就會變成下面這樣
當填充上字節後,應用層繼續讀取 3 個字節,那麼 ringBuffer 會變成這樣
讀指針又指向了數組的開頭了,可以得出讀指針的計算公式
r = (r + n)% len(buf)
ringBuffer 代碼解析
有了前面的演示後,再來看代碼就比較容易了。用 peek 方法舉例進行分析,
func (r *RingBuffer) Peek(readOffsetBack, n int) ([]byte, error) {
// 由於目前實現的ringBuffer還不具備自動擴容,所以不支持讀取的字節數大於緩衝區的長度
if n > len(r.buf) {
return nil, fmt.Errorf("the unReadSize is over range the buffer len")
}
peek:
if n <= r.UnReadSize()-readOffsetBack {
// 說明緩衝區中的未讀字節數有足夠長的n個字節,從buf緩衝區直接讀取
readPos := (r.r + readOffsetBack) % len(r.buf)
return r.dataByPos(readPos, (r.r+readOffsetBack+n-1)%len(r.buf)), nil
}
// 說明緩衝區中未讀字節數不夠n個字節那麼長,還需要從reader裏讀取數據到緩衝區中
err := r.fill()
if err != nil {
return nil, err
}
goto peek
}
peek 方法的大致邏輯是首先判斷要讀取的 n 個字節能不能從緩衝區 buf 裏直接讀取,如果能則直接返回,如果不能,則需要從 reader 裏繼續讀取數據,直到 buf 緩衝區數據夠 n 個字節那麼長。
「dataByPos」 方法是根據傳入的元素位置,從 buf 中讀取在這個位置區間內的數據。
// dataByPos 返回索引值在start和end之間的數據,閉區間
func (r *RingBuffer) dataByPos(start int, end int) []byte {
// 因爲環形緩衝區原因,所以末位置索引值有可能小於開始位置索引
if end < start {
return append(r.buf[start:], r.buf[:end+1]...)
}
return r.buf[start : end+1]
}
「fill()」 方法則是從 reader 中讀取數據到 buf 裏。
fill 情況分析
reader 填充新數據到 buf 後,未讀空間未跨越 buf 末尾
// 此時writePos 沒有超過 len(buf)
writePos = (r + unReadSize)
未讀 空間 本來就 已經從頭覆蓋
當未讀空間本來就重新覆蓋了 buf 頭部,和上面類似,這種情況也是直接在 write pos 位置追加數據即可。
未讀空間未跨越 buf 末尾,當從 reader 追加數據到 buf 後發現需要覆蓋 buf 頭部
writePos := (r.r + r.unReadSize) % len(r.buf)
n := copy(r.buf[writePos:], buf[:readBytes])
一部分覆蓋到 buf 的頭部
end := r.r + r.unReadSize + readBytes
copy(r.buf[:end%len(r.buf)], buf[len(r.buf)-writePos:])
現在再來看 fill 的源碼就比較容易理解了。
func (r *RingBuffer) fill() error {
if r.unReadSize == len(r.buf) {
// 當未讀數據填滿buf後 ,就應該等待上層應用把未讀數據讀取一部分再來填充緩衝區
return fmt.Errorf("the unReadSize is over range the buffer len")
}
// batchFetchBytes 爲每次向reader裏讀取多少個字節,如果此時buf的剩餘空間比batchFetchBytes小,則應該只向reader讀取剩餘空間的字節數
readLen := int(math.Min(float64(r.batchFetchBytes), float64(len(r.buf)-r.unReadSize)))
buf := make([]byte, readLen)
readBytes, err := r.reader.Read(buf)
if readBytes > 0 {
// 查看讀取readBytes個字節後,未讀空間有沒有超過buf末尾指針,如果超過了,在複製數據時需要特殊處理
end := r.r + r.unReadSize + readBytes
if end < len(r.buf) {
// 沒有超過末尾指針,直接將數據copy到writePos後面
copy(r.buf[r.r+r.unReadSize:], buf[:readBytes])
} else {
// 超過了末尾指針,有兩種情況,看下圖分析
writePos := (r.r + r.unReadSize) % len(r.buf)
n := copy(r.buf[writePos:], buf[:readBytes])
if n < readBytes {
copy(r.buf[:end%len(r.buf)], buf[len(r.buf)-writePos:])
}
}
r.unReadSize += readBytes
return nil
}
if err != nil {
return err
}
return nil
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/otIvwLrv0cVgu8nE99oLwQ