Go: time-After導致內存泄露

time.After 導致內存泄露

time.After(time.Duration) 的功能是當持續的時間結束後,會將當前的時間發送到返回的通道中。在某個時間到後執行某個動作可以用 time.After 來實現,它使用起來非常方便,在併發程序中用的比較多。如果我們只是想讓程序睡眠一段時間,可以使用 time.Sleep(time.Duration). time.After 主要用在 “如果在 5 秒內沒有從通道收到消息,那麼將做..." 這樣的場景中。然而經常會看到在循環中調用 time.After 的代碼,非常糟糕,這可能會導致內存泄露。

下面來看一個具體的例子,此函數完成的功能是不斷地從通道中讀取數據並進行處理,如果長達 1 個小時都沒有從通道中接收到任何消息,希望記錄一條警告日誌。實現代碼如下:

func consumer(ch <-chan Event) {
        for {
                select {
                case event := <-ch:
                        handle(event)
                case <-time.After(time.Hour):
                        log.Println("warning: no messages received")
                }
        }
}

上面循環體中 select 語句被執行有兩種情況:1. 從通道 ch 獲取到了消息 2. 已經有 1 個小時沒有從 ch 獲取到消息。因爲每次循環執行 select 時都會對 time.After 進行求值計算,也就是每次都會重置超時。這段代碼有什麼問題嗎?咋一看,沒有發現問題,實際上這段代碼可能存在內存泄露。

time.After 會返回一個通道,函數簽名如下,可以看到返回的是一個 Time 類型的通道。我們期望的效果是這個通道在每次循環後都被關閉,然而實際情況可能並不是這樣。例如這種情況,在通道 ch 每次都有消息的時候,在 1 個小時內會一直走case event := <-ch分支,但是每次運行 select 時也會對 time.After(time.Hour) 執行求值,每次申請的通道資源在超時(持續時間結束後)纔會關閉釋放,佔用的內存會在一小時內一直累積。在 Go1.15 中,每次調用 time.After 大約需要 200 字節的內存,假如每小時收到 500 萬條消息,那會消耗 200Byte*5000000=1G 的內存空間。

func After(d Duration) <-chan Time {
 return NewTimer(d).C
}

如何修復這個問題呢? 這還不簡單麼,我們在每次循環結束將通道關閉不就可以了嗎?這是不可能能的,因爲返回的是一個只能接收值的通道。函數簽名如上,這裏返回的是<-chan Time不是chan Time,只接收通道不能執行close(ch)操作, 編譯是通不過的,會報下面的錯誤。

invalid operation: close(time.After(time.Second)) (cannot close receive-only channel)

有多種方法修復上面代碼存在的問題。第一種不使用 time.After,採用上下文 Context 包中的 ctx.Done(),代碼如下. 這種方法的缺點是必須在每次循環迭代期間不斷重新創建上下文,Context.WithTimeout 放在 for 內。創建上下文在 Go 語言中不是一個輕量級操作。有其他更好的解決方法嗎?

func consumer(ch <-chan Event) {
        for {
                ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
                select {
                case event := <-ch:
                        cancel()
                        handle(event)
                case <-ctx.Done():
                        log.Println("warning: no messages received")
                }
        }
}

第二種方法是使用 time 包中的time.NewTimer, 該函數會返回一個time.Timer結構對象,該結構有下面的可導出方法和字段:

「NOTE: 雖然 time.After 實現也是依賴於 time.Timer,但是 time.After 可導出的只有字段 C, 所以不能調用 Reset 方法。」

package time

func After(d Duration) <-chan Time {
        return NewTimer(d).C
}

下面是使用time.NewTimer實現版本,代碼如下:

func consumer(ch <-chan Event) {
        timerDuration := 1 * time.Hour
        timer := time.NewTimer(timerDuration)

        for {
                timer.Reset(timerDuration)
                select {
                case event := <-ch:
                        handle(event)
                case <-timer.C:
                        log.Println("warning: no messages received")
                }
        }
}

在上面的程序中,每次循環剛開始時調用timer.Reset進行重置操作。調用 Reset 操作比每次都創建一個新的上下文更簡單,更快並且對 GC 產生的壓力更小,因爲它不需要任何新的堆分配。相比第一種方法,此方法更好。因此,使用 time.Timer 是解決本文開始提到問題的最佳解決方案。

「NOTE: 爲了簡單,前面的示例代碼中的 goroutine 沒有進行停止處理,在 Go 語言中常見 100 問題 -#62 Starting a goroutine without knowing when to .. 有提到,在不知道什麼時候停止的情況下啓動 goroutine 不是最佳實踐。在生產級別的代碼中,應該有退出條件,例如在上下文取消的時候。在 goroutine 退出的時候,記得通過使用 defer timer.Stop() 停止創建的 time.Timer.」

在循環中使用 time.After 並不是唯一可能導致內存泄露的原因,本質原因與重複調用的代碼有關。循環只是其中一種情況,在 HTTP 處理函數中使用 time.After 也會導致相同的問題,因爲該處理函數將被多次調用。

總結,在使用 time.After 時應該謹慎小心,記住創建的資源只有在定時器到期時纔會被釋放。當 time.After 被重複調用時,例如在循環中(本文中的例子)、Kafka 消費處理函數和 HTTP 處理程序中等,可能會導致內存在一段時間持續上漲,甚至會出現 OOM,這種情況下,我們應該使用 time.NewTimer 取代 time.After.

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/5U4dlv05dqpSEu9g1jvryA