無限緩衝的 channel-2-
圖片拍攝於 2020 年 4 月 20 日。
chanx
上篇文章無限緩衝的 channel(1) 我們提到,當我們創建一個有緩衝的通道並指定了容量,那麼在這個通道的生命週期內,我們將再也無法改變它的容量。
由此引發了關於無限緩存的 channel 話題討論。
我們還分析了一個實現無限緩衝的代碼。 最後,我們也提到了它還可以繼續優化的點。
鳥窩的 chanx
正是基於此方案改造而成的,我們來看看他倆的不同之處。
上篇文章說過,所謂的無限緩衝,無非是藉助一箇中間層的數據結構,暫存臨時數據。
在 chanx
中,結構是這樣的:
type UnboundedChan struct {
In chan<- T // channel for write
Out <-chan T // channel for read
buffer *RingBuffer // buffer
}
in
和 out
的職責在上篇文章已經說明,這裏的 buffer
就是我們所謂的中間臨時存儲層。其中的 RingBuffer
結構我們後面再說。
func NewUnboundedChan(initCapacity int) UnboundedChan {
return NewUnboundedChanSize(initCapacity, initCapacity, initCapacity)
}
func NewUnboundedChanSize(initInCapacity, initOutCapacity, initBufCapacity int) UnboundedChan {
in := make(chan T, initInCapacity)
out := make(chan T, initOutCapacity)
ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initBufCapacity)}
go process(in, out, ch)
return ch
}
它提供了兩個初始化 UnboundedChan
的方法,從代碼中我們可以明顯的看出,NewUnboundedChanSize
可以給每個屬性自定義自己的容量大小。僅此而已。
chanx
中 關於 in
和 out
都是帶緩衝的通道,而上篇文章中的 in
和 out
都是無緩衝的通道。 這和他們對數據的流轉處理有很大關係。
我們接下去看 process(in,out,ch)
最核心的方法。
這時候,我們再放上一篇核心代碼。
可以很明顯他們看出它倆的區別。
上篇從 in
通道讀數據會先 append
到 buffer
,然後從 buffer
中取數據寫入 out
通道。 而 chanx
從 in
通道取出數據先嚐試寫入 out
(沒有中間商賺差價?),只有在 out
已經滿的情況下,才塞入到 buffer
。
chanx
還有一段小細節代碼。
能走到這裏,一定是因爲 out
通道滿了。我們把值追加到 buffer
的同時,需要嘗試把 buffer
中的數據寫入 out
。 此時 in
通道也許還在持續的寫入數據, 爲了避免 in
通道塞滿,阻塞業務寫入,我們同時需要嘗試從 in
通道中讀數據追加到 buffer
。
buffer
上篇文章我提到了關於 buffer
優化的點。
chanx
是如何優化的?
// type T interface{}
type RingBuffer struct {
buf []T
initialSize int
size int
r int // read pointer
w int // write pointer
}
這是 buffer
的結構,其中
-
buf
具體存儲數據的結構。 -
initialSize
初始化buf
的長度 -
size
當前buf
的長度 -
r
當前讀數據位置 -
w
當前寫入數據位置
buffer
本質上就是一個環形的隊列,目的是達到資源的複用。 並且當 buffer
滿時,提供自動擴容的功能。
我們來看具體把數據寫入 buffer
的源碼。
接着看擴容。
這段代碼唯一難理解的就是數據遷移了。這裏的數據遷移目的是爲了保證先入先出的原則。
可能加了註釋有些人也無法理解,那麼就再加一個草率圖。
假設我們 buffer
的長度是 8。 當前讀和寫的 index
都是 5。說明 buffer
滿了,觸發自動擴容規則,進行數據遷移。
那麼遷移的過程就是下圖這樣的。
當 buffer
爲空並且當前的 size
比初始化 size
還大,那麼可以考慮重置 buffer
了。
//if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize {
// ch.buffer.Reset()
// }
func (r *RingBuffer) Reset() {
r.r = 0
r.w = 0
r.size = r.initialSize
r.buf = make([]T, r.initialSize)
}
剩下的代碼, 就沒什麼好說的了。
總結
繼上篇文章後,這篇文章我們主要講解了 chanx
是如何實現無限緩衝的 channel
。 其中最重要的一個點在於 chanx
中 buffer
實現採用的是 ringbuffer
,達到資源複用的同時還能自動擴容。
交流
之前就創建了一個自習羣,不過當初那個羣定下規矩,除了學習的東西,不能閒聊,導致這個羣和殭屍羣沒什麼區別。這個故事告訴我們,真的呆在一個完全不閒聊的羣也是一件很扯🥚的事。
因爲目前主力是 Go,所以專門建了一個 Go 交流技術少摸魚羣,取名: 我的眼裏只有 Go。歡迎各位加入。
另外也歡迎加我個人微信交流:remember_wuqinqiang。
如果文章對你有所幫助,點贊、轉發、留言都是一種支持!
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/wDrOBRaPqHx6g-Ir-Yx8kA