Go 實現斷路器

斷路器是一種模式,可以在進行遠程調用時保護接口不發生重複失敗、掛起或超時。

const (
    StateClosed = iota
    StateHalfOpen
    StateOpen
)

斷路器使用 fails 和 stop 通道來同步。Threshold 字段是每秒錯誤個數。timeout 字段單位是秒。

type CircuitBreaker struct {
    Threshold    int
    Timeout      int
    state        int
    fails        chan error
    failureCount int
    openDuration int
    ticker       *time.Ticker
    stop         chan struct{}
}

NewCircuitBreaker 創建並返回一個新的斷路器實例。需要設置閾值和超時參數。

func NewCircuitBreaker(threshold, timeout int) *CircuitBreaker {
    return &CircuitBreaker{
        Threshold: threshold,
        Timeout:   timeout,
    }
}

Start 方法啓動斷路器:

func (cb *CircuitBreaker) Start() {
    cb.fails = make(chan error)
    cb.ticker = time.NewTicker(time.Second)
    cb.stop = make(chan struct{})

    go func() {
        defer close(cb.stop)
        for {
            select {
            case err := <-cb.fails:
                //忽略open狀態下的錯誤
                if cb.state == StateOpen {
                    continue
                }

                // 在半開狀態下,如果沒有錯誤時斷路器轉到closed狀態
                if cb.state == StateHalfOpen {
                    if err == nil {
                        cb.state = StateClosed
                    }
                    continue
                }

                // 在關閉狀態下對零錯誤不做任何事情
                if err == nil {
                    continue
                }

                // 在關閉狀態下發送錯誤就增加失敗計數。
                cb.failureCount++
            case <-cb.ticker.C:
                // 半開狀態下計時器超時不做任何事情
                if cb.state == StateHalfOpen {
                    continue
                }

                // 在open狀態下增加開路持續時間,並在每次計時器超時將斷路器跳閘到半開路狀態,嘗試恢復
                if cb.state == StateOpen {
                    cb.openDuration++

                    if cb.openDuration == cb.Timeout {
                        cb.state = StateHalfOpen
                    }

                    continue
                }

                // 如果失敗計數達到閾值,斷路器跳閘進入open狀態,並在每次計時器超時在閉合狀態重置開路持續時間
                if cb.failureCount >= cb.Threshold {
                    cb.state = StateOpen
                    cb.openDuration = 0
                }

                // 在關閉狀態下計時器超時就重置統計失敗次數
                cb.failureCount = 0
            case <-cb.stop:
                return
            }
        }
    }()
}

Stop 方法:停止斷路器

func (cb *CircuitBreaker) Stop() {
    cb.ticker.Stop()

    cb.stop <- struct{}{}
    <-cb.stop

    cb.state = StateClosed
    cb.failureCount = 0

    close(cb.fails)
}

Fail 方法,通知斷路器調用接口失敗:

func (cb *CircuitBreaker) Fail(err error) {
    cb.fails <- err
}

State 方法: 返回斷路器狀態。

func (cb *CircuitBreaker) State() int {
    return cb.state
}

如果您運行以下代碼並在一秒鐘內執行三個或更多請求,斷路器將跳至打開狀態並返回 404。

package main

import (
    "errors"
    "net/http"

    circuitbreaker "github.com/ermanimer/design-patterns/circuit-breaker"
)

func main() {
    // 創建一個新的斷路器實例
    cb := circuitbreaker.NewCircuitBreaker(3, 2)

    //啓動斷路器
    cb.Start()
    defer cb.Stop()

    // 定義一個http處理程序
    http.HandleFunc("/sample-endpoint", func(rw http.ResponseWriter, r *http.Request) {
        // 如果斷路器不在閉合狀態則返回錯誤
        if cb.State() != circuitbreaker.StateClosed {
            rw.WriteHeader(http.StatusNotFound)
            return
        }

        // 模擬接口調用失敗,通知斷路器
        err := sampleRemoteCall()
        cb.Fail(err)

        // 返回錯誤響應
        rw.WriteHeader(http.StatusInternalServerError)
        rw.Write([]byte(err.Error()))
    })

    //  啓動http服務器
    http.ListenAndServe(":8000", nil)
}

func sampleRemoteCall() error {
    return errors.New("sample error")
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/v0bNQafvXyYu6qt_q3WNsg