Go 語言五種實現 broadcaster 的方法,你喜歡哪一種?
昨天看到 Jaana Dogan 創建了一個 broadcaster 的庫, 話說美女 Jaana Dogan 又回到了 Google 了麼。她的實現我們就當做 broadcaster 的第一個實現吧。
什麼是 broadcaster?就是村口的大喇叭,一播音,全村都知道了。
Jaana Dogan 實現的這個 broadcaster 只有通知的功能,沒有傳遞消息,也不能重用。我們就以這個庫爲基準,看看我們能夠實現幾種方式。
1、sync.Cond 實現
Jaana Dogan 使用sync.Cond實現。
package main
import (
"sync"
)
type Broadcaster struct {
mu *sync.Mutex
cond *sync.Cond
signaled bool
}
func NewBroadcaster() *Broadcaster {
var mu sync.Mutex
return &Broadcaster{
mu: &mu,
cond: sync.NewCond(&mu),
signaled: false,
}
}
func (b *Broadcaster) Go(fn func()) {
go func() {
b.cond.L.Lock()
defer b.cond.L.Unlock()
for !b.signaled {
b.cond.Wait()
}
fn()
}()
}
func (b *Broadcaster) Broadcast() {
b.cond.L.Lock()
b.signaled = true
b.cond.L.Unlock()
b.cond.Broadcast()
}
sync.Cond是一個條件變量,可以用來實現廣播通知。sync.Cond的Wait方法會阻塞當前所有調用的 goroutine,直到一個 goroutine 調用Broadcast方法。
我們可以寫爲它寫一個單元測試,其他四種也使用同樣的單元測試代碼,就不贅述了。
package main
import (
"sync"
"testing"
)
func TestNewBroadcaster(t *testing.T) {
b := NewBroadcaster()
var wg sync.WaitGroup
wg.Add(2)
b.Go(func() {
t.Log("function 1 finished")
wg.Done()
})
b.Go(func() {
t.Log("function 2 finished")
wg.Done()
})
b.Broadcast()
wg.Wait()
}
我覺得直接使用sync.Cond也可以,只不過 Jaana Dogan 將它包裝了一下,更方便使用。
-
需要阻塞等待通知的邏輯放在
Go方法中 -
Broadcast方法用來通知所有等待的 goroutine
2、channel 實現
使用channle可以更簡潔的實現。
package main
type Broadcaster struct {
signal chan struct{}
}
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
signal: make(chan struct{}),
}
}
func (b *Broadcaster) Go(fn func()) {
go func() {
<-b.signal
fn()
}()
}
func (b *Broadcaster) Broadcast() {
close(b.signal)
}
channel的特性是可以關閉,關閉後的channel會一直返回零值,所以我們可以使用close來通知所有等待的 goroutine。
這是常常使用 channel 實現的一種通知機制。
3、context 實現
context是 Go1.7 引入的一個標準庫,用來傳遞請求的上下文,可以用來實現廣播通知。
package broadcaster
import (
"context"
)
type Broadcaster struct {
ctx context.Context
cancel context.CancelFunc
}
func NewBroadcaster() *Broadcaster {
ctx, cancel := context.WithCancel(context.Background())
return &Broadcaster{
ctx: ctx,
cancel: cancel,
}
}
func (b *Broadcaster) Go(fn func()) {
go func() {
<-b.ctx.Done()
fn()
}()
}
func (b *Broadcaster) Broadcast() {
b.cancel()
}
平心而論,context也是一種不錯的選擇。
4、sync.WaitGroup 實現
sync.WaitGroup也可以實現廣播通知。
package main
import (
"sync"
"sync/atomic"
)
type Broadcaster struct {
done int32
trigger sync.WaitGroup
}
func NewBroadcaster() *Broadcaster {
b := &Broadcaster{}
b.trigger.Add(1)
return b
}
func (b *Broadcaster) Go(fn func()) {
go func() {
if atomic.LoadInt32(&b.done) == 1 {
return
}
b.trigger.Wait()
fn()
}()
}
func (b *Broadcaster) Broadcast() {
if atomic.CompareAndSwapInt32(&b.done, 0, 1) {
b.trigger.Done()
}
}
sync.WaitGroup的Wait方法會阻塞等待Done方法的調用,所以我們可以使用WaitGroup來實現廣播通知。一旦Wait被放行,所有阻塞在Wait的 goroutine 都會被放行。 所以一開始我們將 WaitGroup 的計數器設置爲 1,然後調用Wait方法,一旦Broadcast方法被調用,計數器減 1,Wait方法放行。
5、sync.RWMutex 實現
sync.RWMutex也可以實現廣播通知。
package main
import (
"sync"
)
type Broadcaster struct {
mu *sync.RWMutex
}
func NewBroadcaster() *Broadcaster {
var mu sync.RWMutex
mu.Lock()
return &Broadcaster{mu: &mu}
}
func (b *Broadcaster) Go(fn func()) {
go func() {
b.mu.RLock()
defer b.mu.RUnlock()
fn()
}()
}
func (b *Broadcaster) Broadcast() {
b.mu.Unlock()
}
讀寫鎖的特性是寫鎖會阻塞讀鎖,所以我們可以使用讀寫鎖來實現廣播通知。一開始我們將寫鎖鎖住,然後調用Go方法請求讀鎖,肯定獲取不到而阻塞,一旦Broadcast方法被調用,釋放寫鎖,所有阻塞在讀鎖的 goroutine 都會被放行。
以上羅列了五種實現 broadcaster 的方法,你喜歡哪一種?你還有什麼更好的實現方法嗎?歡迎留言討論。
要想全面瞭解 Go 併發的知識,可以閱讀下面這本書:
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/hHItHcyQKPE--B6QnC7Mcg