高併發寫文件的 Ring Buffer 優化思路
場景
一般的系統中都會有大量的寫日誌操作, 業務日誌,mysql 日誌,ngxin 日誌等等。假設有一個系統目前的訪問量很大,那麼就會有大量的寫日誌操作。
Linux 寫日誌方式
Linux 下讀寫文件是有 buffer io 的,寫數據的時候先寫到一個內存中的 page buffer,此時這個 page buffer 就是 dirty(髒的)。
-
當用戶調用 write 時,如果發現系統中髒數據大於閥值 dirty_background_bytes,會觸發 pdflush 進程把髒數據刷到磁盤持久化,用戶的 write 調用會立即返回,無需等待。
-
當用戶調用 write 時,如果發現系統髒數據大於閥值 dirty_bytes 時,需要此用戶進程自己把髒頁刷到磁盤,直到降低到這個閥值一下才返回,此時業務中的 write 操作會阻塞 (被動出發了刷髒操作)
解決思路
可以看出,隨着大量髒數據的產生,來不及刷盤是有可能阻塞 write 系統調用寫日誌的,會造成磁盤 iops 劇烈抖動,iowait 飆升。我們可以採用異步寫入方式,一般異步執行都是需要一個隊列的東西來保存內容,而且這個隊列必須是線程安全.
Ring Buffer
環形緩衝區,本質一個數組,也可以看成一個隊列,不過它是首尾相連成環的,可以存儲數據,用在不同線程間傳遞數據的緩衝區,一邊生產數據塞進去,另一半消費數據拿出來。懶得畫圖,詳細介紹可以參考這篇
Ring Buffer 是什麼
使用數組的最大優點是對 CPU 緩存友好, 數組內元素的內存地址的連續性存儲的,因爲只要一個元素被加載到 CPU 緩存行,其相鄰的其他元素也會被加載進同一個緩存行,減少了 CPU 訪問內存的次數。
多說無益,實現一下,主要思路就是一個圓環,兩個讀寫遊標,分別記錄讀到哪了,寫到哪了,當遊標跑一圈到起始位置時,重置遊標爲 0,當寫遊標等於讀遊標且數組不爲空時滿了。其實,就存在 2 種情況
-
讀遊標在前,寫遊標在後,讀遊標 > 寫遊標
-
讀遊標在後,寫遊標在前,寫遊標 > 讀遊標
實現
package ringbuffer
import (
"errors"
"sync"
)
type RingBuffer struct {
// 存數據
buf []byte
// buf大小
size int
// 讀遊標
r int
// 寫遊標
w int
// 是否滿了
isFull bool
// 線程安全
mu sync.Mutex
}
func NewRingBuffer(size int) *RingBuffer {
return &RingBuffer{
buf: make([]byte, size),
size: size,
}
}
// 讀取指定大小的數據出來
func (r *RingBuffer) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
r.mu.Lock()
defer r.mu.Unlock()
// 讀遊標==寫遊標==0 無數據
if r.w == r.r && !r.isFull {
return 0, errors.New("It is empty")
}
// 第一圈, 寫快讀慢
if r.w > r.r {
// 可讀出來的數據長度
n = r.w - r.r
if n > len(p) {
// 超過搬運緩衝區大小則截取
n = len(p)
}
copy(p, r.buf[r.r:r.r+n])
// 讀完數據了,維護讀遊標,取模技巧(始終會在小於等於size的範圍內)
r.r = (r.r + n) % r.size
return
}
// 第二圈,此時 讀在前,寫在後
// 可讀出來的數據長度計算
n = r.size - r.r + r.w
if n > len(p) {
n = len(p)
}
if r.r+n <= r.size {
// 第一圈可以填充完畢
copy(p, r.buf[r.r:r.r+n])
} else {
// 分兩步驟填充,第一圈填充一部分,第二圈填充一部分
c1 := r.size - r.r
copy(p, r.buf[r.r:r.size])
c2 := n - c1
copy(p[c1:], r.buf[0:c2])
}
// 維護讀遊標,取模技巧(始終會在小於等於size的範圍內)
r.r = (r.r + n) % r.size
// 讀出一部分數據除去,肯定是不滿的
r.isFull = false
return n, err
}
// 寫數據
func (r *RingBuffer) Write(p []byte) (n int, err error) {
// 需要寫入的數據爲空
if len(p) == 0 {
return 0, nil
}
r.mu.Lock()
defer r.mu.Unlock()
// 滿了
if r.isFull {
return 0, errors.New("It is full")
}
// 計算可寫容量
var avail int
if r.w >= r.r {
// 可用 = 總共-寫佔用+讀釋放
avail = r.size - r.w + r.r
} else {
// 寫遊標第二圈了,所以在讀後面,那麼剩餘額容量= (讀遊標-寫遊標)
avail = r.r - r.w
}
// 要寫的數據太多了,存不下
if len(p) > avail {
err = errors.New("Too many data to write")
// 截取
p = p[:avail]
}
n = len(p)
// 第一圈,寫快 讀慢
if r.w >= r.r {
// 第一圈剩餘多少
c1 := r.size - r.w
// 第一圈剩下的夠存下 需要寫入的字符
if c1 >= n {
copy(r.buf[r.w:], p)
r.w += n
} else {
// 第一圈剩下的不夠夠存下需要寫入的字符,分兩步寫
// 第一圈先寫一部分
copy(r.buf[r.w:], p[:c1])
c2 := n - c1
// 第二圈繼續寫剩餘的部分
copy(r.buf[0:], p[c1:])
// 此時寫遊標在第二圈,讀遊標後面
r.w = c2
}
} else {
// 寫遊標跑第二圈了
copy(r.buf[r.w:], p)
r.w += n
}
// 寫遊標寫了一圈到了起始點回歸0
if r.w == r.size {
r.w = 0
}
// 寫遊標跑一圈追上讀遊標了,此時滿了
if r.w == r.r {
r.isFull = true
}
return n, err
}
func (r *RingBuffer) Reset() {
r.mu.Lock()
defer r.mu.Unlock()
r.r = 0
r.w = 0
r.isFull = false
}
func main() {
buffer := ringbuffer.NewRingBuffer(1024)
_, _ = buffer.Write([]byte("hello"))
container := make([]byte, 5)
_, _ = buffer.Read(container)
fmt.Println(string(container))
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/y2i_-mzqJzW72pZjJsyllQ