Go:實現心跳(Heartbeat)

對於長時間運行的網絡連接,在應用程序級別上可能會經歷較長的空閒時間,明智的做法是在節點之間實現心跳,以提前截止日期。這允許您快速識別網絡問題並迅速重新建立連接,而不是在應用程序要傳輸數據時纔等待檢測網絡錯誤。通過這種方式,您可以確保應用程序在需要時始終有良好的網絡連接。

爲了達到這個目的,一個心跳消息需要被髮送到遠端服務並等待回覆,我們可以根據回覆情況提前終止連接。節點將以一定間隔時間來發送消息,類似心跳。這種方法不僅可以在各種操作系統上移植,而且還可以確保使用網絡連接的應用程序立刻響應,因爲應用程序實現了心跳。

要實現心跳功能,需要一個 goroutine 定期到發送 ping 消息。如果最近收到遠程服務的回覆,就不需要發送不必要的 ping 消息,因此需要可以重置 ping 計時器功能。如下代碼所示:

func Pinger(ctx context.Context, w io.Writer, reset <-chan time.Duration) {
    var interval time.Duration
    select {
    case <-ctx.Done():
        return
    case interval = <-reset: //讀取更新的心跳間隔時間
    default:
    }
    if interval < 0 {
        interval = defaultPingInterval
    }
    timer := time.NewTimer(interval)
    defer func() {
        if !timer.Stop() {
            <-timer.C
        }
    }()
    for {
        select {
        case <-ctx.Done():
            return
        case newInterval := <-reset:
            if !timer.Stop() {
                <-timer.C
            }
            if newInterval > 0 {
                interval = newInterval
            }
        case <-timer.C:
            if _, err := w.Write([]byte("ping")); err != nil {
                //在此跟蹤並執行連續超時
                return
            }
        }
        _ = timer.Reset(interval) //重製心跳上報時間間隔
    }
}

在心跳例子中使用 ping 和 pong 消息格式,即當客戶端定期向服務端發送一個 ping 消息,服務端給客戶端發送一個 pong 消息。這個消息內容可以自定義沒有規定的,這裏也是使用慣例。

下面對前面代碼進行解釋:
在上面的 Pinger 函數中,定期向 io.writer 對象寫入 ping 消息。因爲 Pinger 函數需要運行在一個單獨的 goroutine 中,所以需要接收一個 context 作爲第一個參數,這樣就可以通過 context 終止 goroutine 防止泄漏。剩餘的參數包括一個 io.writer 接口和一個 channel 用於動態接收間隔時間以重置計時器。需要創建一個帶 buffer 的 channel 將一個間隔時間傳入作爲計時器初始值。如果 interval 比 0 小,就使用默認間隔時間。

然後根據 interval 初始化計時器,並使用 defer 來清空計時器 channel 避免泄露。for 循環包含一個 select 聲明,將阻塞直到三個 case 中的一個匹配:context 被取消,reset 通道收到重置計時器消息或計時器過期。如果 context 被取消,函數會退出,不會再發送 ping 消息。如果 reset 通道有數據,也不需要發送 ping 並重置計時器。

如果計時器過期,會寫入 ping 消息到 writer,並在下一個 for 循環之前重置計時器。如果需要,你也可以在這個 case 裏面跟蹤寫入超時的發生。要實現這個功能,你可以將上下文的 cancel 函數傳入,並在這裏調用如果發送超時。

下面代碼說明了如何使用 Pinger 函數,可以按預期的間隔從 reader 讀取 ping 消息,並以不同的間隔重置 ping 計時器。

func ExamplePinger() {
    ctx, cancelFunc := context.WithCancel(context.Background())
    r, w := io.Pipe() //代替網絡連接net.Conn
    done := make(chan struct{})
    resetTimer := make(chan time.Duration, 1)
    resetTimer <- time.Second //ping間隔初始值

    go func() {
        Pinger(ctx, w, resetTimer)
        close(done)
    }()
    receivePing := func(d time.Duration, r io.Reader) {
        if d >= 0 {
            fmt.Printf("resetting time (%s)\n", d)
            resetTimer <- d
        }

        now := time.Now()
        buf := make([]byte, 1024)
        n, err := r.Read(buf)
        if err != nil {
            fmt.Println(err)
        }
        fmt.Printf("received %q (%s)\n", buf[:n], time.Since(now).Round(100*time.Millisecond))
    }
    for i, v := range []int64{0, 200, 300, 0, -1, -1, -1} {
        fmt.Printf("Run %d\n", i+1)
        receivePing(time.Duration(v)*time.Millisecond, r)
    }
    cancelFunc() //取消context使pinger退出
    <-done
}

輸出結果:

Run 1
resetting time (0s)
received "ping" (1s)
Run 2
resetting time (200ms)
received "ping" (200ms)
Run 3
resetting time (300ms)
received "ping" (300ms)
Run 4
resetting time (0s)
received "ping" (300ms)
Run 5
received "ping" (300ms)
Run 6
received "ping" (300ms)
Run 7
received "ping" (300ms)

在這個例子中,創建一個帶緩存的 channel 用於 Pinger 函數中計時器的初始化。在將該通道傳遞給 Pinger 函數之前,在 resetTimer 通道上設置一個 1 秒的初始 ping 間隔。您將使用這個時間初始化 Pinger 的計時器,並指示何時將 ping 消息寫入 writer 接口。

在循環 2 中運行一系列毫秒的間隔時間,將每個間隔時間傳遞給 receivePing 函數。這個函數根據給定的間隔時間重置 ping 計時器,並通過 reader 等待接收 ping 消息。最後將接收到的 ping 消息打印到標準輸出。

在 for 循環第一個迭代時,傳入的參數 0,這表示告訴 Pinger 使用之前的間隔時間也就是 1s 來重置計時器。和預期的一樣,在 1s 後 reader 接收到 ping 消息。第二次迭代將 ping 的計時器重置爲 200ms。一旦過期,reader 就收到 ping 消息了。第三次重置 ping 計時器爲 300ms,並且 ping 消息在 300ms 接收到。

在迭代 4 時傳入的是 0,將使用之前的間隔時間 300ms。可以發現使用間隔時間 0,即意味着使用之前的計時器間隔時間,這非常實用,因爲我們不需要跟蹤初始的定時器間隔時了。迭代 5 到 7 僅僅等待接收 ping 消息,不重置 ping 計時器。如預期一樣,reader 每隔 300ms 接收到 ping 消息。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/3Q6P8ikf5gePbeUGwH4PBQ