一個頗爲巧妙的 go 併發模式: or pattern

golang 由於對多併發的支持,它被廣泛採用與後臺開發。它特色的 routine 是一種高效的輕量級線程,採用 go 語言,再加上它所提供的各種支持高併發的語言機制,使得用它開發可擴展性高的後臺服務器變得容易很多。

我使用 go 一段時間,在最近學習過程中發現一個頗爲巧妙的併發模式,因此想總結一下。go 的最大特色是 goroutine 和 channel,不同 routine 之間能通過 channel 進行數據發送和通訊,假設我們發動了很多個 routine 用於請求某項服務,只要其中一個完成了,其他 routine 就可以直接結束,面對這樣的需求情況該如何有效的處理呢。

如果 routine 的數量不多的情況下,例如我們發動了 3 個 routine 向遠程服務器發出請求,只要有一個迴應了,所有 routine 就可以結束,假設每個 routine 都返回一個 channel,一旦成功請求到遠程服務,它就把返回結果寫入到它對應的 channel 中,示例如下:

remoteServiceRequest := func(ip string, port int) <-chan string {
    result := make(chan string)
    //請求遠程服務,並把請求結果寫入到result
    ...
    return result
}

//啓動3個併發請求
var serviceRes = make([] <-chan string, 0, 3)
for i := 0; i < 3; i++ {
    serviceRes[i] = remoteServiceRequest(remote_ip, remote_port)
}

//等到3個請求,只要其中有一個返回即可
select {
case res0 := <-serviceRes[0]:
       do_something(res0)
 case res1 := <-serviceRes[1]:
     do_something(res1)
 case res2 := <-serviceRes[2]:
     do_something(res2)      
}

在 go 中,要等待若干個異步請求返回時,就可以使用 select 模式,問題在於如果請求的數量較少上面模式可行,當請求的數量非常多,例如有十幾二十個,或是有上百個時,我們不可能使用幾十個 case 分支,更麻煩在於很多時候我們並不能提前得知確切的異步請求數量,那麼當我們希望在實現的目的爲:在任意個異步請求發起的情況下,只要其中有一個完成,那麼就獲取結果,要靈活的實現這個目標該怎麼辦?這裏就可以使用一個設計頗爲巧妙的 or 模式。

它的基本思路爲,假設我們現在要發起一百個異步請求,顯然我們不能在 select 下面分別羅列 100 個 case, 萬一到時候需要的是 150 個怎辦?但我們可以通過遞歸的方式實現 case 的自動化羅列,於是我可以在函數中先考慮只有 3 個 case 的情況,假設現在來了 6 個異步請求,那麼我就可以通過函數先處理前 3 個請求,然後再遞歸的去處理接下來的 3 個請求,也就是通過遞歸的方式來實現任意個 case 的羅列,我們看看具體例子:

package main 

import (
    "fmt"
    "time"
)

func main() {
    var or func(channels ...<-chan interface{}) <-chan interface{}
    /*
    or 函數將多個channel合併在一起,只要其中有一個channel有數據那麼就可以返回
    */
    or = func(channels ...<-chan interface{}) <-chan interface{} {
        //如果只有0個或1個channel,那麼直接返回
        switch len(channels) {
        case 0:
            return nil 
        case 1:
            return channels[0]
        }

        /*
        如果有3個以上的channel,那麼下面先針對前3個channel進行select case,然後遞歸的對
        餘下的channel進行select case
        */
        orDone := make(chan interface{})
        go func() {
            defer close(orDone)
            switch len(channels) {
            case 2:
                select {
                case <-channels[0]:
                case <-channels[1]:
                }
            default:
                select {
                case <-channels[0]:
                case <-channels[1]:
                case <-channels[2]:
                //遞歸對3個之後的channel進行select case
                case <-or(append(channels[3:], orDone)...):
                }
            }
        }()
        return orDone 
    }

    sig := func(after time.Duration, done <-chan interface{}) <-chan interface{} {
        c := make(chan interface{})
        go func() {
            defer close(c)
            select {
            case <-done:
            case <-time.After(after):
            }
        }()
        return c 
    }

    start := time.Now()
    done := make(chan interface{})
    <-or(sig(2*time.Hour, done), sig(5*time.Minute, done), sig(1*time.Second, done), 
         sig(3 * time.Hour, done), sig(1 * time.Minute, done))
    close(done)
    fmt.Printf("done after %v", time.Since(start))
}

我們看看上面代碼的基本邏輯,假設我們有 6 個異步請求返回的 channel 需要等待,那麼把 6 個 channel 放入 or 函數後,前 3 個會在 default 下面的 select 被監控,然後代碼創建一個 orDone channel,然後將它和餘下的 3 個 channel 再遞歸的傳入 or 函數。假設第 4 個 channel 最先獲得數據,那麼遞歸調用的 or 函數就會在 default 部分的 select case 的阻塞中返回,於是它創建的 orDone 就會因爲”defer close(orDone)” 的執行而關閉。

於是在第一次遞歸時對應的語句”<-or(append(channel[3:], orDone)” 就會返回,於是就會觸發第一次調用 or 函數是所創建的 orDone 被關閉,於是最外層調用 or 函數時就能返回,如果不是後 3 個 channel 有數據,而是前 3 個 channel 有數據返回,那麼遞歸前的 orDone 就會被關閉,這就導致遞歸調用 or 函數結束阻塞,於是算法就不用再等待後 3 個 channel 有數據返回,由此看來這個模式的設計還是非常巧妙的。

在示例代碼中,我們啓動了多個時鐘,然後 sig 函數會等待時鐘結束,然後把這些時鐘對應的 channel 都放入 or 函數,裏面最短的時鐘是 1 秒,因此 or 函數會在等待 1 秒後直接返回,由此可見 or 模式在高併發場景下還是非常適用的。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/3l9vHTMKK387i7oHUYBT8A