無限緩衝的 channel-2-

圖片拍攝於 2020 年 4 月 20 日。

chanx

上篇文章無限緩衝的 channel(1) 我們提到,當我們創建一個有緩衝的通道並指定了容量,那麼在這個通道的生命週期內,我們將再也無法改變它的容量。

由此引發了關於無限緩存的 channel 話題討論。 

我們還分析了一個實現無限緩衝的代碼。 最後,我們也提到了它還可以繼續優化的點。

鳥窩的 chanx 正是基於此方案改造而成的,我們來看看他倆的不同之處。

上篇文章說過,所謂的無限緩衝,無非是藉助一箇中間層的數據結構,暫存臨時數據。

在 chanx 中,結構是這樣的:

type UnboundedChan struct {
  In chan<- T // channel for write
  Out <-chan T // channel for read
  buffer *RingBuffer // buffer
}

in 和 out 的職責在上篇文章已經說明,這裏的 buffer 就是我們所謂的中間臨時存儲層。其中的 RingBuffer 結構我們後面再說。

func NewUnboundedChan(initCapacity int) UnboundedChan {
  return NewUnboundedChanSize(initCapacity, initCapacity, initCapacity)
}

func NewUnboundedChanSize(initInCapacity, initOutCapacity, initBufCapacity int) UnboundedChan {
  in := make(chan T, initInCapacity)
  out := make(chan T, initOutCapacity)
  ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initBufCapacity)}

  go process(in, out, ch)

  return ch
}

它提供了兩個初始化 UnboundedChan 的方法,從代碼中我們可以明顯的看出,NewUnboundedChanSize 可以給每個屬性自定義自己的容量大小。僅此而已。

chanx 中 關於 in 和 out 都是帶緩衝的通道,而上篇文章中的 in 和 out 都是無緩衝的通道。 這和他們對數據的流轉處理有很大關係。

我們接下去看 process(in,out,ch) 最核心的方法。 

這時候,我們再放上一篇核心代碼。

可以很明顯他們看出它倆的區別。

上篇從 in 通道讀數據會先 append 到 buffer,然後從 buffer 中取數據寫入 out 通道。 而 chanx 從 in 通道取出數據先嚐試寫入 out(沒有中間商賺差價?),只有在 out 已經滿的情況下,才塞入到 buffer

chanx 還有一段小細節代碼。 

能走到這裏,一定是因爲 out 通道滿了。我們把值追加到 buffer 的同時,需要嘗試把 buffer 中的數據寫入 out 。 此時 in 通道也許還在持續的寫入數據, 爲了避免 in 通道塞滿,阻塞業務寫入,我們同時需要嘗試從 in 通道中讀數據追加到 buffer

buffer

上篇文章我提到了關於 buffer 優化的點。

chanx 是如何優化的?

// type T interface{}
type RingBuffer struct {
  buf []T
  initialSize int
  size int
  r int // read pointer
  w int // write pointer
}

這是 buffer 的結構,其中

buffer 本質上就是一個環形的隊列,目的是達到資源的複用。 並且當 buffer 滿時,提供自動擴容的功能。

我們來看具體把數據寫入 buffer 的源碼。 

接着看擴容。 

這段代碼唯一難理解的就是數據遷移了。這裏的數據遷移目的是爲了保證先入先出的原則。

可能加了註釋有些人也無法理解,那麼就再加一個草率圖。

假設我們 buffer 的長度是 8。 當前讀和寫的 index 都是 5。說明 buffer 滿了,觸發自動擴容規則,進行數據遷移。

那麼遷移的過程就是下圖這樣的。

當 buffer 爲空並且當前的 size 比初始化 size 還大,那麼可以考慮重置 buffer 了。

//if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize {
// ch.buffer.Reset()
// }
func (r *RingBuffer) Reset() {
r.r = 0
r.w = 0
r.size = r.initialSize
r.buf = make([]T, r.initialSize)
}

剩下的代碼, 就沒什麼好說的了。

總結

繼上篇文章後,這篇文章我們主要講解了 chanx 是如何實現無限緩衝的 channel。 其中最重要的一個點在於 chanx 中 buffer 實現採用的是 ringbuffer,達到資源複用的同時還能自動擴容。

交流

之前就創建了一個自習羣,不過當初那個羣定下規矩,除了學習的東西,不能閒聊,導致這個羣和殭屍羣沒什麼區別。這個故事告訴我們,真的呆在一個完全不閒聊的羣也是一件很扯🥚的事。

因爲目前主力是 Go,所以專門建了一個 Go 交流技術少摸魚羣,取名: 我的眼裏只有 Go。歡迎各位加入。

另外也歡迎加我個人微信交流:remember_wuqinqiang

如果文章對你有所幫助,點贊、轉發留言都是一種支持!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/wDrOBRaPqHx6g-Ir-Yx8kA