Golang 池化技術實現

在 golang 中,經常會出現需要發送短信、通知或其它內容的需求,而這些事情不需要即時性且有點耗時。我們最常用的方法是使用一個 goroutine 來做這個事情。

例如:在訂單發貨後,我們需要給用戶發送一個短信通知。

func SendSms(tmpl string,msg string){
    // 發送短信代碼
}
func OrderShipment(){
    // 訂單發貨邏輯
    // 發貨成功後,發送短信
    go SendSms("發貨模板","消息內容")
}

上面的 goroutine 方法,可以滿足大部分需求。但是有個缺陷,當發貨量非常大時,需要開啓的 goroutine 特別多,當我們需要限制開啓的 goroutine 數量時,我們可以使用池化技術。

池化技術就是提前準備一些資源,在需要時可以重複使用這些預先準備的資源。池化技術的優點主要是提前準備和重複利用,控制和管理線程數和任務數的能力。

我們來做一個 goroutine 的例程池,這樣可以控制 goroutine 的數量。

// ErrScheduleTimeout 在週期內池中沒有空閒的資源返回超時錯誤
var ErrScheduleTimeout = fmt.Errorf("schedule error: timed out")
// Pool 包含兩種通道,sem控制worker的創建,work控制任務的執行
type Pool struct {
    sem  chan struct{}
    work chan func()
}
// NewPool 創建具有給定大小的池,
// size爲池最大創建的例程數量,queue爲worker的數量,spawn爲預先生成例程的數量
func NewPool(size, queue, spawn int) *Pool {
    if spawn <= 0 && queue > 0 {
        panic("dead queue configuration detected")
    }
    if spawn > size {
        panic("spawn > workers")
    }
    p := &Pool{
        sem:  make(chan struct{}, size),
        work: make(chan func(), queue),
    }
    for i := 0; i < spawn; i++ {
        p.sem <- struct{}{}
        go p.worker(func() {})
    }
    return p
}
// Schedule 調度任務
func (p *Pool) Schedule(task func()) {
    p.schedule(task, nil)
}
// ScheduleTimeout 調度任務
// 它將返回錯誤如果在給定週期內沒有資源可用
func (p *Pool) ScheduleTimeout(timeout time.Duration, task func()) error {
    return p.schedule(task, time.After(timeout))
}
func (p *Pool) schedule(task func(), timeout <-chan time.Time) error {
    select {
    case <-timeout:
        return ErrScheduleTimeout
    case p.work <- task:
        return nil
    case p.sem <- struct{}{}:
        go p.worker(task)
        return nil
    }
}
func (p *Pool) worker(task func()) {
    defer func() { <-p.sem }()
    task()
    for task := range p.work {
        task()
    }
}

使用池,下面是個例子。

// 先實例化
pool := gopool.NewPool(128, 1024, 1)
func OrderShipment(){
    // 訂單發貨邏輯
    // 發貨成功後,發送短信
    // 發送短信
    pool.Schedule(func() {
        SendSms("發貨模板","消息內容")
    })
}

掌握它的原理,可根據場景應用到其它地方。

這個池的調度實現的很妙,妙處就在於在調度實現利用了 golang 的 select 特性:如果任意某個通道可以進行,它就執行,其他被忽略;如果所有通道都不會執行,select 將阻塞,直到某個通道可以運行;在 worker 中,使用 for range 通道,直到通道被關閉,for 循環才退出。

func (p *Pool) schedule(task func(), timeout <-chan time.Time) error {
 select {
    case <-timeout:
        return ErrScheduleTimeout
    case p.work <- task: // 如果任務過多,被阻塞,往下執行
        return nil
    case p.sem <- struct{}{}: // 如果達到最大例程數,會被阻塞
        go p.worker(task)
        return nil
    }
}
func (p *Pool) worker(task func()) {
    // 返回前讀取信號
    defer func() { <-p.sem }()
    task()
    // 直到p.work通道被關閉
    for task := range p.work {
        task()
    }
}

如果你對任務執行有時間要求,請使用 ScheduleTimeout 方法,它會在給定的時間段後如果沒有資源可用就返回錯誤,你可以根據報錯做業務處理。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/OxMnb-qMw0_X1SMGGxeJyA