go 實現 ringbuffer 以及 ringbuffer 使用場景介紹

ringbuffer 因爲它能複用緩衝空間,通常用於網絡通信連接的讀寫,雖然市面上已經有了 go 寫的諸多版本的 ringbuffer 組件,雖然諸多版本,實現 ringbuffer 的核心邏輯卻是不變的。但發現其內部提供的方法並不能滿足我當下的需求,所以還是自己造一個吧。

源碼已經上傳到 github

https://github.com/HobbyBear/ringbuffer

需求分析

我在基於 「epoll」 實現一個網絡框架時,需要預先定義好的和客戶端的通信協議,當從連接讀取數據時需要判讀當前連接是否擁有完整的協議 (實際網絡環境中可能完整的協議字節只到達了部分),有才會將數據全部讀取出來,然後進行處理,否則就等待下次連接可讀時,再判斷連接是否具有完整的協議。

由於在讀取時需要先判斷當前連接是否有完整協議,所以讀取時不能移動讀指針的位置,因爲萬一協議不完整的話,下次讀取還要從當前的讀指針位置開始讀取。

所以對於 ringbuffer 組件我會實現一個 peek 方法

func (r *RingBuffer) Peek(readOffsetBack, n int) ([]byte, error)

peek 方法兩個參數,n 代表要讀取的字節數, 「readOffsetBack」 代表讀取是要在當前讀位置偏移的字節數,因爲在設計協議時,往往協議不是那麼簡單 (可能是由多個固定長度的數據構成)  ,比如下面這樣的協議格式。

完整的協議有三段構成,每段開頭都會有一個 4 字節的大小代表每段的長度,在判斷協議是否完整時,就必須看着 3 段的數據是否都全部到達。 所以在判斷第二段數據是否完整時,會跳過前面 3 個字節去判斷, 此時 「readOffsetBack」 將會是 3。

此外我還需要一個通過分割符獲取字節的方法,因爲有時候協議不是固定長度的數組了,而是通過某個分割符判斷某段協議是否結束,比如換行符。

func (r *RingBuffer) PeekBytes(readOffsetBack int, delim byte) ([]byte, error)

接着,還需要提供一個更新讀位置的方法,因爲一旦判斷是一個完整的協議後,我會將協議數據全部讀取出來,此時應該要更新讀指針的位置,以便下次讀取新的請求。

func (r *RingBuffer) AddReadPosition(n int)

n 便是代表需要將讀指針往後偏移的 n 個字節。

ringbuffer 原理解析

接着,我們再來看看實際上 ringbuffer 的實現原理是什麼。

首先來看下一個 ringbuffer 應該有的屬性

type RingBuffer struct {  
   buf             []byte  
   reader          io.Reader  
   r               int // 標記下次讀取開始的位置  
   unReadSize      int // 緩衝區中未讀數據大小  
}

buf 用作連接讀取的緩衝區,reader 代表了原鏈接,r 代表讀取 ringbuffer 時應該從字節數組的哪個位置開始讀取,unReadSize 代表緩衝區當中還有多少數據沒有讀取,因爲你可能一次性從 reader 裏讀取了很多數據到 buf 裏,但是上層應用只取 buf 裏的部分數據,剩餘的未讀數據就留在了 buf 裏,等待下次被應用層繼續讀取。

我們用一個 5 字節的字節數組當做緩衝區, 首先從 ringbuffer 讀取數據時,由於 ringbuffer 內部沒有數據,所以需要從連接中讀取數據然後寫到 ringbuffer 裏。

如下圖所示:

假設 ringBuffer 規定每次向原網絡連接讀取時 按 4 字節讀取到緩衝區中 (實際情況爲了減少系統調用開銷,這個值會更多,儘可能會一次性讀取更多數據到緩衝區)  write pos 指向的位置則代表從 reader 讀取的數據應該從哪個位置開始寫入到 buf 字節數組裏。

writePos = (r + unReadSize) % len(buf)

接着,上層應用只讀取了 3 個字節,緩衝區中的讀指針 r 和未讀空間就會變成下面這樣

如果此時上層應用還想再讀取 3 個字節,那麼 ringbuffer 就必須再向 reader 讀取字節填充到緩衝區上,我們假設這次向 reader 索取 3 個字節。緩衝區的空間就會變成下面這樣

此時已經複用了首次向 reader 讀取數據時佔據的緩衝空間了。

當填充上字節後,應用層繼續讀取 3 個字節,那麼 ringBuffer 會變成這樣

讀指針又指向了數組的開頭了,可以得出讀指針的計算公式

r = (r + n)% len(buf)

ringBuffer 代碼解析

有了前面的演示後,再來看代碼就比較容易了。用 peek 方法舉例進行分析,

func (r *RingBuffer) Peek(readOffsetBack, n int) ([]byte, error) { 
   // 由於目前實現的ringBuffer還不具備自動擴容,所以不支持讀取的字節數大於緩衝區的長度
   if n > len(r.buf) {  
      return nil, fmt.Errorf("the unReadSize is over range the buffer len")  
   }  
peek:  
   if n <= r.UnReadSize()-readOffsetBack {  
      // 說明緩衝區中的未讀字節數有足夠長的n個字節,從buf緩衝區直接讀取
      readPos := (r.r + readOffsetBack) % len(r.buf)  
      return r.dataByPos(readPos, (r.r+readOffsetBack+n-1)%len(r.buf)), nil  
   }  
   // 說明緩衝區中未讀字節數不夠n個字節那麼長,還需要從reader裏讀取數據到緩衝區中
   err := r.fill()  
   if err != nil {  
      return nil, err  
   }  
   goto peek  
}

peek 方法的大致邏輯是首先判斷要讀取的 n 個字節能不能從緩衝區 buf 裏直接讀取,如果能則直接返回,如果不能,則需要從 reader 裏繼續讀取數據,直到 buf 緩衝區數據夠 n 個字節那麼長。

「dataByPos」 方法是根據傳入的元素位置,從 buf 中讀取在這個位置區間內的數據。

// dataByPos 返回索引值在start和end之間的數據,閉區間  
func (r *RingBuffer) dataByPos(start int, end int) []byte {  
   // 因爲環形緩衝區原因,所以末位置索引值有可能小於開始位置索引  
   if end < start {  
      return append(r.buf[start:], r.buf[:end+1]...)  
   }  
   return r.buf[start : end+1]  
}

「fill()」 方法則是從 reader 中讀取數據到 buf 裏。

fill 情況分析

reader 填充新數據到 buf 後,未讀空間未跨越 buf 末尾

當從 reader 讀取完數據後,如果 end := r.r + r.unReadSize + readBytes   end 指向了未讀空間的末尾,如果沒有超過 buf 的長度,那麼將數據複製到 buf 裏的邏輯很簡單,直接在當前 write pos 的位置追加讀取到的字節就行。

// 此時writePos 沒有超過 len(buf)
writePos = (r + unReadSize)

未讀 空間 本來就 已經從頭覆蓋

當未讀空間本來就重新覆蓋了 buf 頭部,和上面類似,這種情況也是直接在 write pos 位置追加數據即可。

未讀空間未跨越 buf 末尾,當從 reader 追加數據到 buf 後發現需要覆蓋 buf 頭部

這種情況需要將讀取的數據一部分覆蓋到 buf 的末尾

 writePos := (r.r + r.unReadSize) % len(r.buf)  
 n := copy(r.buf[writePos:], buf[:readBytes])

一部分覆蓋到 buf 的頭部

end := r.r + r.unReadSize + readBytes  
copy(r.buf[:end%len(r.buf)], buf[len(r.buf)-writePos:])

現在再來看 fill 的源碼就比較容易理解了。

func (r *RingBuffer) fill() error {  
   if r.unReadSize == len(r.buf) {  
      // 當未讀數據填滿buf後 ,就應該等待上層應用把未讀數據讀取一部分再來填充緩衝區
      return fmt.Errorf("the unReadSize is over range the buffer len")  
   }  
   // batchFetchBytes 爲每次向reader裏讀取多少個字節,如果此時buf的剩餘空間比batchFetchBytes小,則應該只向reader讀取剩餘空間的字節數
   readLen := int(math.Min(float64(r.batchFetchBytes), float64(len(r.buf)-r.unReadSize)))  
   buf := make([]byte, readLen)  
   readBytes, err := r.reader.Read(buf)  
   if readBytes > 0 {  
     // 查看讀取readBytes個字節後,未讀空間有沒有超過buf末尾指針,如果超過了,在複製數據時需要特殊處理
      end := r.r + r.unReadSize + readBytes  
      if end < len(r.buf) {
        // 沒有超過末尾指針,直接將數據copy到writePos後面  
         copy(r.buf[r.r+r.unReadSize:], buf[:readBytes])  
      } else {  
        // 超過了末尾指針,有兩種情況,看下圖分析
         writePos := (r.r + r.unReadSize) % len(r.buf)  
         n := copy(r.buf[writePos:], buf[:readBytes])  
         if n < readBytes {  
            copy(r.buf[:end%len(r.buf)], buf[len(r.buf)-writePos:])  
         }  
      }  
      r.unReadSize += readBytes  
      return nil  
   }  
   if err != nil {  
      return err  
   }  
   return nil  
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/otIvwLrv0cVgu8nE99oLwQ