Go 語言實現安全計數的若干種方法
四哥水平有限,如有翻譯或理解錯誤,煩請幫忙指出,感謝!
原文如下:
有一天,我正研究共享計數器的簡單經典實現,實現方式使用的是 C++ 中的互斥鎖,這時,我非常想知道還有哪些線程安全的實現方式。我通常使用 Go 來滿足自己的好奇心,本文就是一篇如何用 goroutine-safe 的方式實現計數器的方法彙總。
不要這樣做
我們先從非安全的實現方式開始:
type NotSafeCounter struct {
number uint64
}
func NewNotSafeCounter() Counter {
return &NotSafeCounter{0}
}
func (c *NotSafeCounter) Add(num uint64) {
c.number = c.number + num
}
func (c *NotSafeCounter) Read() uint64 {
return c.number
}
代碼上沒什麼特別的地方。我們來測試下結果正確與否:創建 100 個 goroutine,其中三分之二的 goroutine 對共享計數器加一。
func testCorrectness(t *testing.T, counter Counter) {
wg := &sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg.Add(1)
if i%3 == 0 {
go func(counter Counter) {
counter.Read()
wg.Done()
}(counter)
} else if i%3 == 1 {
go func(counter Counter) {
counter.Add(1)
counter.Read()
wg.Done()
}(counter)
} else {
go func(counter Counter) {
counter.Add(1)
wg.Done()
}(counter)
}
}
wg.Wait()
if counter.Read() != 66 {
t.Errorf("counter should be %d and was %d", 66, counter.Read())
}
}
測試的結果是不確定的,有時候能正確運行,有時候會出現類似這樣的錯誤:
counter_test.go:34: counter should be 66 and was 65
經典實現方式
實現一個正確計數器的傳統方式是使用互斥鎖,保證任意時間只有一個協程操作計數器。Go 語言的話,我們可以使用 sync 包。
type MutexCounter struct {
mu *sync.RWMutex
number uint64
}
func NewMutexCounter() Counter {
return &MutexCounter{&sync.RWMutex{}, 0}
}
func (c *MutexCounter) Add(num uint64) {
c.mu.Lock()
defer c.mu.Unlock()
c.number = c.number + num
}
func (c *MutexCounter) Read() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.number
}
現在測試結果每次都能通過且都是正確的。
使用 channel
鎖是一種保證同步的低級原語。Go 也提供了更高級實現方式 - channel。
關於 mutexe 和 channel,現在有太多類似這樣的討論:“mutexe vs channel”、“哪個更好”、“我應當使用哪一個” 等。其中一些討論非常有趣且有益,但這並不是本文討論的重點。
我們使用 channel 來實現協程安全的計數器,使用 channel 充當隊列,對計數器的操作 (讀、寫) 都緩存在隊列中,按順序操作。具體的操作通過傳遞 func() 實現。創建時,計數器會衍生出一個 goroutine 並且按順序執行隊列裏的操作。
下面是計數器的定義:
type ChannelCounter struct {
ch chan func()
number uint64
}
func NewChannelCounter() Counter {
counter := &ChannelCounter{make(chan func(), 100), 0}
go func(counter *ChannelCounter) {
for f := range counter.ch {
f()
}
}(counter)
return counter
}
當一個協程調用 Add(),就往隊列裏面添加一個寫操作:
func (c *ChannelCounter) Add(num uint64) {
c.ch <- func() {
c.number = c.number + num
}
}
當一個協程調用 Read(),就往隊列裏面添加一個讀操作:
func (c *ChannelCounter) Read() uint64 {
ret := make(chan uint64)
c.ch <- func() {
ret <- c.number
close(ret)
}
return <-ret
}
我真正喜歡這個實現的地方在於,這種按順序執行的方式非常的清晰。
原子方式
我們甚至可以用更低級別的原語,利用 sync/atomic 包執行原子操作。
type AtomicCounter struct {
number uint64
}
func NewAtomicCounter() Counter {
return &AtomicCounter{0}
}
func (c *AtomicCounter) Add(num uint64) {
atomic.AddUint64(&c.number, num)
}
func (c *AtomicCounter) Read() uint64 {
return atomic.LoadUint64(&c.number)
}
比較和交換
或者,我們可以使用非常經典的原語:CAS,對計時器進行計數。
func (c *CASCounter) Add(num uint64) {
for {
v := atomic.LoadUint64(&c.number)
if atomic.CompareAndSwapUint64(&c.number, v, v+num) {
return
}
}
}
func (c *CASCounter) Read() uint64 {
return atomic.LoadUint64(&c.number)
}
float 類型該如何實現
在我探索學習過程中,看到一個非常棒的視頻 - 《Prometheus: Designing and Implementing a Modern Monitoring Solution in Go[1]》。在視頻的最後,討論瞭如何實現浮點數計數器。到目前爲止,所有的技術都適用於浮點數,除了 sync/atomic 包,還沒提供浮點數的原子操作。
在視頻裏,Björn Rabenstein 介紹瞭如何通過將浮點數存儲爲 uint64 並使用 math.Float64bits 和 math.Float64frombits 在 float64 和 uint64 之間進行轉換來解決此問題。
type CASFloatCounter struct {
number uint64
}
func NewCASFloatCounter() *CASFloatCounter {
return &CASFloatCounter{0}
}
func (c *CASFloatCounter) Add(num float64) {
for {
v := atomic.LoadUint64(&c.number)
newValue := math.Float64bits(math.Float64frombits(v) + num)
if atomic.CompareAndSwapUint64(&c.number, v, newValue) {
return
}
}
}
func (c *CASFloatCounter) Read() float64 {
return math.Float64frombits(atomic.LoadUint64(&c.number))
}
最後
這篇文章是共享計數器的實現彙總。這是我好奇心驅使的結果,此外對併發也有一個基本的瞭解。如果你有其他實現共享計數的方式,請告訴我。
本文提到的實現方式對應的代碼可以看這裏 [2],此外還包括運行用例和基準測試。
參考資料
[1]
Prometheus: Designing and Implementing a Modern Monitoring Solution in Go: https://www.youtube.com/watch?v=1V7eJ0jN8-E
[2]
看這裏: https://github.com/brunocalza/sharedcounter
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/9fZUEJSLLnq0sn_6Y9YZ-g