高併發寫文件的 Ring Buffer 優化思路

場景

一般的系統中都會有大量的寫日誌操作, 業務日誌,mysql 日誌,ngxin 日誌等等。假設有一個系統目前的訪問量很大,那麼就會有大量的寫日誌操作。

Linux 寫日誌方式

Linux 下讀寫文件是有 buffer io 的,寫數據的時候先寫到一個內存中的 page buffer,此時這個 page buffer 就是 dirty(髒的)。

解決思路

可以看出,隨着大量髒數據的產生,來不及刷盤是有可能阻塞 write 系統調用寫日誌的,會造成磁盤 iops 劇烈抖動,iowait 飆升。我們可以採用異步寫入方式,一般異步執行都是需要一個隊列的東西來保存內容,而且這個隊列必須是線程安全.

Ring Buffer

環形緩衝區,本質一個數組,也可以看成一個隊列,不過它是首尾相連成環的,可以存儲數據,用在不同線程間傳遞數據的緩衝區,一邊生產數據塞進去,另一半消費數據拿出來。懶得畫圖,詳細介紹可以參考這篇

Ring Buffer 是什麼

使用數組的最大優點是對 CPU 緩存友好, 數組內元素的內存地址的連續性存儲的,因爲只要一個元素被加載到 CPU 緩存行,其相鄰的其他元素也會被加載進同一個緩存行,減少了 CPU 訪問內存的次數。

多說無益,實現一下,主要思路就是一個圓環,兩個讀寫遊標,分別記錄讀到哪了,寫到哪了,當遊標跑一圈到起始位置時,重置遊標爲 0,當寫遊標等於讀遊標且數組不爲空時滿了。其實,就存在 2 種情況

實現

package ringbuffer

import (
 "errors"
 "sync"
)

type RingBuffer struct {
 // 存數據
 buf []byte

 // buf大小
 size int

 // 讀遊標
 r int

 // 寫遊標
 w int

 // 是否滿了
 isFull bool

 // 線程安全
 mu sync.Mutex
}

func NewRingBuffer(size int) *RingBuffer {
 return &RingBuffer{
  buf:  make([]byte, size),
  size: size,
 }
}

// 讀取指定大小的數據出來
func (r *RingBuffer) Read([]byte) (n int, err error) {
 if len(p) == 0 {
  return 0, nil
 }

 r.mu.Lock()
 defer r.mu.Unlock()

 // 讀遊標==寫遊標==0 無數據
 if r.w == r.r && !r.isFull {
  return 0, errors.New("It is empty")
 }

 // 第一圈, 寫快讀慢
 if r.w > r.r {
  // 可讀出來的數據長度
  n = r.w - r.r
  if n > len(p) {
   // 超過搬運緩衝區大小則截取
   n = len(p)
  }

  copy(p, r.buf[r.r:r.r+n])

  // 讀完數據了,維護讀遊標,取模技巧(始終會在小於等於size的範圍內)
  r.r = (r.r + n) % r.size
  return
 }

 // 第二圈,此時 讀在前,寫在後
 // 可讀出來的數據長度計算
 n = r.size - r.r + r.w
 if n > len(p) {
  n = len(p)
 }

 if r.r+n <= r.size {
  // 第一圈可以填充完畢
  copy(p, r.buf[r.r:r.r+n])
 } else {
  // 分兩步驟填充,第一圈填充一部分,第二圈填充一部分
  c1 := r.size - r.r
  copy(p, r.buf[r.r:r.size])
  c2 := n - c1
  copy(p[c1:], r.buf[0:c2])
 }

 // 維護讀遊標,取模技巧(始終會在小於等於size的範圍內)
 r.r = (r.r + n) % r.size

 // 讀出一部分數據除去,肯定是不滿的
 r.isFull = false

 return n, err
}

// 寫數據
func (r *RingBuffer) Write([]byte) (n int, err error) {
 // 需要寫入的數據爲空
 if len(p) == 0 {
  return 0, nil
 }

 r.mu.Lock()
 defer r.mu.Unlock()

 // 滿了
 if r.isFull {
  return 0, errors.New("It is full")
 }

 // 計算可寫容量
 var avail int
 if r.w >= r.r {
  // 可用 = 總共-寫佔用+讀釋放
  avail = r.size - r.w + r.r
 } else {
  // 寫遊標第二圈了,所以在讀後面,那麼剩餘額容量= (讀遊標-寫遊標)
  avail = r.r - r.w
 }

 // 要寫的數據太多了,存不下
 if len(p) > avail {
  err = errors.New("Too many data to write")
  // 截取
  p = p[:avail]
 }
 n = len(p)

 // 第一圈,寫快 讀慢
 if r.w >= r.r {
  // 第一圈剩餘多少
  c1 := r.size - r.w
  // 第一圈剩下的夠存下 需要寫入的字符
  if c1 >= n {
   copy(r.buf[r.w:], p)
   r.w += n
  } else {
   // 第一圈剩下的不夠夠存下需要寫入的字符,分兩步寫
   // 第一圈先寫一部分
   copy(r.buf[r.w:], p[:c1])
   c2 := n - c1
   // 第二圈繼續寫剩餘的部分
   copy(r.buf[0:], p[c1:])

   // 此時寫遊標在第二圈,讀遊標後面
   r.w = c2
  }
 } else {
  // 寫遊標跑第二圈了
  copy(r.buf[r.w:], p)
  r.w += n
 }

 // 寫遊標寫了一圈到了起始點回歸0
 if r.w == r.size {
  r.w = 0
 }

 // 寫遊標跑一圈追上讀遊標了,此時滿了
 if r.w == r.r {
  r.isFull = true
 }

 return n, err
}

func (r *RingBuffer) Reset() {
 r.mu.Lock()
 defer r.mu.Unlock()

 r.r = 0
 r.w = 0
 r.isFull = false
}



func main() {
 buffer := ringbuffer.NewRingBuffer(1024)
 _, _ = buffer.Write([]byte("hello"))
 container := make([]byte, 5)
 _, _ = buffer.Read(container)
 fmt.Println(string(container))
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/y2i_-mzqJzW72pZjJsyllQ