Go 併發最佳實踐

1、使用 goroutines 管理服務狀態

通常啓動一個 http 服務器或其他需要長期在後臺運行的任務時,可以使用 chan 或帶 chan 字段的結構體,實現 goroutines 的同步:

type Server strung { quit chan bool }
func NewServer() *Server{
    s := &Server{make(chan bool)}
    go s.run()
    return s
func (s *Server) run() {
    for {
        select {
            case: <- s.quit:
                fmt.Println("finishing task")
                time.Sleep(time.Secod)
                fmt.Println("task done")
                s.quit <- true
                return
             case: <- time.After(time.Secod)
                 fmt.Println("running task")
        }
    }
}

上面的代碼在創建 Server 對象後,調用 run 方法來不停地執行任務。如果 quit 通道收到消息,就會結束未完成的任務並向 quit 中發送信號,通知服務器停止。

func (s *Server) Stop() {
    fmt.Println("server stoppint")
    <- s.quit
    fmt.Println("server stopped")
}

Server 的 Stop 方法在接收到 quit 通道的信號後,會做一些停止服務的操作,例如關閉數據庫連接,停止 API 請求等操作。

func main() {
    s := NewServer()
    time.Sleep(2 * time.Second)

    s.Stop  //會阻塞等待quit通知
}

2、使用 buffer channel 避免 goroutine 泄漏

首先我們來開發一個應用程序實現廣播功能:

func sendMsg(msg, addr string) error {

    conn, err := net.Dial("tcp", addr)   
    if err != nil {
        return err
    }
    defer conn.Close()
    _, err = fmt.Fprint(conn, msg)
   return err
}

上面的函數很簡單,先建立 tcp 連接,然後將要發送的消息通過 fmt.Fprint 函數發送。

func broadcastMsg(msg string, addrs []string) error {

    err := make(chan error)
    for _, add := range addrs {
        go func (adds string) {
            err <- sendMsg(msg, addr)
            fmt.Println("done")
        }(adds)

    for _ = range addrs {
        if err := <- err; err != nil {
            return err
        }

    }
    return nil
}

上面的函數通過兩個循環實現,第一個 for 循環向每個服務發送消息;第二個循環檢查消息發送是否有失敗的。

func main() {
    addr := []string{"localhost:8080, "http://google.com"}
    err := broadcastMsg("hi", addr)

    time.Sleep(time.Second)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("消息發送成功")
}

上面的 broadcasgMsg 函數代碼存在幾個問題:

使用帶緩衝帶 channel

func broadcasMsg(msg string, adds []string) error {

    errc := make(chan error, len(addrs))
    for _, addr := range addrs {
        go func(addr string) {
            errc <- sendMsg(msg, addr)
            fmt.Println("done")
        }(addr)

    }
    for _ = range addrs {
        if err := <-errc; err != nil {
            return err
        }

    }
    return nil
}

這裏使用帶緩衝帶的通道可以保證每個 goroutine 都能向通道里面寫入數據並結束 goroutine,但是還存在一個問題就是如果提前不知道 channel 的容量該怎麼處理?

使用 quit 通道

func broadcastMsg(msg string, addrs []string) error {
    errc := make(chan error)
    quit := make(chan struct)    
    defer close(quit)    

    for _, addr := range addrs {
        go func(addr string) {
            select {
                case errc <- sendMsg(msg, addr):
                    fmt.Println("done")
                case <-  quit:
                    fmt.Println("quit")
            }
        }(addr)

    }
    for _ = range addrs {
        if err := <- errc; err != nil {
            return err
        }

    }
    return nil
}

這裏使用 quit 通道來保證所有的 goroutine 能正常退出,這樣就不會存在 goroutine 泄漏。不管發送消息是否成功還是失敗,當 broadcastMsg 函數結束時會調用 defer close(quit),保證所有 goroutine 的退出。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/66vI_Zq9Oeb_2IJOga7_qg