Sentinel 萬字教程

限流作爲現在微服務中常見的穩定性措施,在面試中肯定也是經常會被問到的,我在面試的時候也經常喜歡問一下你對限流算法知道哪一些?有看過源碼嗎?實現原理是什麼?

第一部分先講講限流算法,最後再講講源碼的實現原理。

限流算法

關於限流的算法大體上可以分爲四類:固定窗口計數器、滑動窗口計數器、漏桶 (也有稱漏斗,英文 Leaky bucket)、令牌桶 (英文 Token bucket)。

固定窗口

固定窗口,相比其他的限流算法,這應該是最簡單的一種。

它簡單地對一個固定的時間窗口內的請求數量進行計數,如果超過請求數量的閾值,將被直接丟棄。

這個簡單的限流算法優缺點都很明顯。優點的話就是簡單,缺點舉個例子來說。

比如我們下圖中的黃色區域就是固定時間窗口,默認時間範圍是 60s,限流數量是 100。

如圖中括號內所示,前面一段時間都沒有流量,剛好後面 30 秒內來了 100 個請求,此時因爲沒有超過限流閾值,所以請求全部通過,然後下一個窗口的 20 秒內同樣通過了 100 個請求。

所以變相的相當於在這個括號的 40 秒的時間內就通過了 200 個請求,超過了我們限流的閾值。

限流

滑動窗口

爲了優化這個問題,於是有了滑動窗口算法,顧名思義,滑動窗口就是時間窗口在隨着時間推移不停地移動。

滑動窗口把一個固定時間窗口再繼續拆分成 N 個小窗口,然後對每個小窗口分別進行計數,所有小窗口請求之和不能超過我們設定的限流閾值。

以下圖舉例子來說,假設我們的窗口拆分成了 3 個小窗口,小窗口都是 20s,同樣基於上面的例子,當在第三個 20s 的時候來了 100 個請求,可以通過。

然後時間窗口滑動,下一個 20s 請求又來了 100 個請求,此時我們滑動窗口的 60s 範圍內請求數量肯定就超過 100 了啊,所以請求被拒絕。

漏桶 Leaky bucket

漏桶算法,人如其名,他就是一個漏的桶,不管請求的數量有多少,最終都會以固定的出口流量大小勻速流出,如果請求的流量超過漏桶大小,那麼超出的流量將會被丟棄。

也就是說流量流入的速度是不定的,但是流出的速度是恆定的。

這個和 MQ 削峯填谷的思想比較類似,在面對突然激增的流量的時候,通過漏桶算法可以做到勻速排隊,固定速度限流。

漏桶算法的優勢是勻速,勻速是優點也是缺點,很多人說漏桶不能處理突增流量,這個說法並不準確。

漏桶本來就應該是爲了處理間歇性的突增流量,流量一下起來了,然後系統處理不過來,可以在空閒的時候去處理,防止了突增流量導致系統崩潰,保護了系統的穩定性。

但是,換一個思路來想,其實這些突增的流量對於系統來說完全沒有壓力,你還在慢慢地勻速排隊,其實是對系統性能的浪費。

所以,對於這種有場景來說,令牌桶算法比漏桶就更有優勢。

令牌桶 token bucket

令牌桶算法是指系統以一定地速度往令牌桶裏丟令牌,當一個請求過來的時候,會去令牌桶裏申請一個令牌,如果能夠獲取到令牌,那麼請求就可以正常進行,反之被丟棄。

現在的令牌桶算法,像 Guava 和 Sentinel 的實現都有冷啓動 / 預熱的方式,爲了避免在流量激增的同時把系統打掛,令牌桶算法會在最開始一段時間內冷啓動,隨着流量的增加,系統會根據流量大小動態地調整生成令牌的速度,最終直到請求達到系統的閾值。

源碼舉例

我們以 sentinel 舉例,sentinel 中統計用到了滑動窗口算法,然後也有用到漏桶、令牌桶算法。

滑動窗口

sentinel中就使用到了滑動窗口算法來進行統計,不過他的實現和我上面畫的圖有點不一樣,實際上 sentinel 中的滑動窗口用一個圓形來描述更合理一點。

前期就是創建節點,然後 slot 串起來就是一個責任鏈模式,StatisticSlot 通過滑動窗口來統計數據,FlowSlot 是真正限流的邏輯,還有一些降級、系統保護的措施,最終形成了整個 sentinel 的限流方式。

就看看官方圖吧,這圓形畫起來好惡心

滑動窗口的實現主要可以看LeapArray的代碼,默認的話定義了時間窗口的相關參數。

對於 sentinel 來說其實窗口分爲秒和分鐘兩個級別,秒的話窗口數量是 2,分鐘則是 60 個窗口,每個窗口的時間長度是 1s,總的時間週期就是 60s,分成 60 個窗口,這裏我們就以分鐘級別的統計來說。

public abstract class LeapArray<T> {
    //窗口時間長度,毫秒數,默認1000ms
    protected int windowLengthInMs;
    //窗口數量,默認60
    protected int sampleCount;
    //毫秒時間週期,默認60*1000
    protected int intervalInMs;
    //秒級時間週期,默認60
    private double intervalInSecond;
    //時間窗口數組
    protected final AtomicReferenceArray<WindowWrap<T>> array;

然後我們要看的就是它是怎麼計算出當前窗口的,其實源碼裏寫的聽清楚的,但是如果你按照之前想象把他當做一條直線延伸去想的話估計不太好理解。

首先計算數組索引下標和時間窗口時間這個都比較簡單,難點應該大部分在於第三點窗口大於 old 這個是什麼鬼,詳細說下這幾種情況。

  1. 數組中的時間窗口是是空的,這個說明時間走到了我們初始化的時間之後了,此時 new 一個新的窗口通過 CAS 的方式去更新,然後返回這個新的窗口就好了。

  2. 第二種情況是剛好時間窗口的時間相等,那麼直接返回,沒啥好說的

  3. 第三種情況就是比較難以理解的,可以參看兩條時間線的圖,就比較好理解了,第一次時間窗口走完了達到 1200,然後圓形時間窗口開始循環,新的時間起始位置還是 1200,然後時間窗口的時間來到 1676,B2 的位置如果還是老的窗口那麼就是 600,所以我們要重置之前的時間窗口的時間爲當前的時間。

  4. 最後一種一般情況不太可能發生,除非時鐘回撥這樣子

從這個我們可以發現就是針對每個WindowWrap時間窗口都進行了統計,最後實際上在後面的幾個地方都會用到時間窗口統計的 QPS 結果,這裏就不再贅述了,知道即可。

private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int) (timeId % array.length());
}

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}

public WindowWrap<T> currentWindow(long timeMillis) {
    //當前時間如果小於0,返回空
    if (timeMillis < 0) {
        return null;
    }
    //計算時間窗口的索引
    int idx = calculateTimeIdx(timeMillis);
    // 計算當前時間窗口的開始時間
    long windowStart = calculateWindowStart(timeMillis);

    while (true) {
        //在窗口數組中獲得窗口
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             * 比如當前時間是888,根據計算得到的數組窗口位置是個空,所以直接創建一個新窗口就好了
             */
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0       B1      B2     B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             * 這個更好了,剛好等於,直接返回就行
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             *     B0       B1      B2     B3      B4
             * |_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *             B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                              ^
             *                           time=1676
             * 這個要當成圓形理解就好了,之前如果是1200一個完整的圓形,然後繼續從1200開始,如果現在時間是1676,落在在B2的位置,
             * 窗口開始時間是1600,獲取到的old時間其實會是600,所以肯定是過期了,直接重置窗口就可以了
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // 這個不太可能出現,嗯。。時鐘回撥
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

漏桶

sentinel 主要根據FlowSlot中的流控進行流量控制,其中RateLimiterController就是漏桶算法的實現,這個實現相比其他幾個還是簡單多了,稍微看一下應該就明白了。

  1. 首先計算出當前請求平攤到 1s 內的時間花費,然後去計算這一次請求預計時間

  2. 如果小於當前時間的話,那麼以當前時間爲主,返回即可

  3. 反之如果超過當前時間的話,這時候就要進行排隊等待了,等待的時候要判斷是否超過當前最大的等待時間,超過就直接丟棄

  4. 沒有超過就更新上一次的通過時間,然後再比較一次是否超時,還超時就重置時間,反之在等待時間範圍之內的話就等待,如果都不是那就可以通過了

public class RateLimiterController implements TrafficShapingController {
  //最大等待超時時間,默認500ms
  private final int maxQueueingTimeMs;
  //限流數量
  private final double count;
  //上一次的通過時間
  private final AtomicLong latestPassedTime = new AtomicLong(-1);

  @Override
  public boolean canPass(Node node, int acquireCount, boolean prioritized) {
      // Pass when acquire count is less or equal than 0.
      if (acquireCount <= 0) {
          return true;
      }
      // Reject when count is less or equal than 0.
      // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
      if (count <= 0) {
          return false;
      }

      long currentTime = TimeUtil.currentTimeMillis();
      //時間平攤到1s內的花費
      long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // 1 / 100 * 1000 = 10ms

      //計算這一次請求預計的時間
      long expectedTime = costTime + latestPassedTime.get();

      //花費時間小於當前時間,pass,最後通過時間 = 當前時間
      if (expectedTime <= currentTime) {
          latestPassedTime.set(currentTime);
          return true;
      } else {
          //預計通過的時間超過當前時間,要進行排隊等待,重新獲取一下,避免出現問題,差額就是需要等待的時間
          long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
          //等待時間超過最大等待時間,丟棄
          if (waitTime > maxQueueingTimeMs) {
              return false;
          } else {
              //反之,可以更新最後一次通過時間了
              long oldTime = latestPassedTime.addAndGet(costTime);
              try {
                  waitTime = oldTime - TimeUtil.currentTimeMillis();
                  //更新後再判斷,還是超過最大超時時間,那麼就丟棄,時間重置
                  if (waitTime > maxQueueingTimeMs) {
                      latestPassedTime.addAndGet(-costTime);
                      return false;
                  }
                  //在時間範圍之內的話,就等待
                  if (waitTime > 0) {
                      Thread.sleep(waitTime);
                  }
                  return true;
              } catch (InterruptedException e) {
              }
          }
      }
      return false;
  }

}

令牌桶

最後是令牌桶,這個不在於實現的複製,而是你看源碼會發現都算的些啥玩意兒。。。sentinel 的令牌桶實現基於 Guava,代碼在WarmUpController中。

這個算法那些各種計算邏輯其實我們可以不管(因爲我也沒看懂。。),但是流程上我們是清晰的就可以了。

幾個核心的參數看註釋,構造方法裏那些計算邏輯暫時不管他是怎麼算的(我也沒整明白,但是不影響我們理解),關鍵看canPass是怎麼做的。

  1. 拿到當前窗口和上一個窗口的 QPS

  2. 填充令牌,也就是往桶裏丟令牌,然後我們先看填充令牌的邏輯

public class WarmUpController implements TrafficShapingController {
    //限流QPS
    protected double count;
    //冷啓動係數,默認=3
    private int coldFactor;
    //警戒的令牌數
    protected int warningToken = 0;
    //最大令牌數
    private int maxToken;
    //斜率,產生令牌的速度
    protected double slope;

    //存儲的令牌數量
    protected AtomicLong storedTokens = new AtomicLong(0);
    //最後一次填充令牌時間
    protected AtomicLong lastFilledTime = new AtomicLong(0);

    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }
        this.count = count;
        this.coldFactor = coldFactor;

        //stableInterval 穩定產生令牌的時間週期,1/QPS
        //warmUpPeriodInSec 預熱/冷啓動時間 ,默認 10s
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
    //斜率的計算參考Guava,當做一個固定改的公式
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        //當前時間窗口通過的QPS
        long passQps = (long) node.passQps();
        //上一個時間窗口QPS
        long previousQps = (long) node.previousPassQps();
        //填充令牌
        syncToken(previousQps);

        // 開始計算它的斜率
        // 如果進入了警戒線,開始調整他的qps
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            //當前的令牌超過警戒線,獲得超過警戒線的令牌數
            long aboveToken = restToken - warningToken;
            // 消耗的速度要比warning快,但是要比慢
            // current interval = restToken*slope+1/count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }
}

填充令牌的邏輯如下:

  1. 拿到當前的時間,然後去掉毫秒數,得到的就是秒級時間

  2. 判斷時間小於這裏就是爲了控制每秒丟一次令牌

  3. 然後就是coolDownTokens去計算我們的冷啓動 / 預熱是怎麼計算填充令牌的

  4. 後面計算當前剩下的令牌數這個就不說了,減去上一次消耗的就是桶裏剩下的令牌

protected void syncToken(long passQps) {
  long currentTime = TimeUtil.currentTimeMillis();
  //去掉當前時間的毫秒
  currentTime = currentTime - currentTime % 1000;
  long oldLastFillTime = lastFilledTime.get();
  //控制每秒填充一次令牌
  if (currentTime <= oldLastFillTime) {
    return;
  }
  //當前的令牌數量
  long oldValue = storedTokens.get();
  //獲取新的令牌數量,包含添加令牌的邏輯,這就是預熱的邏輯
  long newValue = coolDownTokens(currentTime, passQps);
  if (storedTokens.compareAndSet(oldValue, newValue)) {
    //存儲的令牌數量當然要減去上一次消耗的令牌
    long currentValue = storedTokens.addAndGet(0 - passQps);
    if (currentValue < 0) {
      storedTokens.set(0L);
    }
    lastFilledTime.set(currentTime);
  }

}
  1. 最開始的事實因爲lastFilledTimeoldValue都是 0,所以根據當前時間戳會得到一個非常大的數字,最後和maxToken取小的話就得到了最大的令牌數,所以第一次初始化的時候就會生成maxToken的令牌

  2. 之後我們假設系統的 QPS 一開始很低,然後突然飆高。所以開始的時候回一直走到高於警戒線的邏輯裏去,然後passQps又很低,所以會一直處於把令牌桶填滿的狀態(currentTime - lastFilledTime.get()會一直都是 1000,也就是 1 秒),所以每次都會填充最大 QPScount數量的令牌

  3. 然後突增流量來了,QPS 瞬間很高,慢慢地令牌數量就會消耗到警戒線之下,走到我們if的邏輯裏去,然後去按照count數量增加令牌

private long coolDownTokens(long currentTime, long passQps) {
  long oldValue = storedTokens.get();
  long newValue = oldValue;

  //水位低於警戒線,就生成令牌
  if (oldValue < warningToken) {
    //如果桶中令牌低於警戒線,根據上一次的時間差,得到新的令牌數,因爲去掉了毫秒,1秒生成的令牌就是閾值count
    //第一次都是0的話,會生成count數量的令牌
    newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
  } else if (oldValue > warningToken) {
    //反之,如果是高於警戒線,要判斷QPS。因爲QPS越高,生成令牌就要越慢,QPS低的話生成令牌要越快
    if (passQps < (int)count / coldFactor) {
      newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
    }
  }
  //不要超過最大令牌數
  return Math.min(newValue, maxToken);
}

上面的邏輯理順之後,我們就可以繼續看限流的部分邏輯:

  1. 令牌計算的邏輯完成,然後判斷是不是超過警戒線,按照上面的說法,低 QPS 的狀態肯定是一直超過的,所以會根據斜率來計算出一個warningQps,因爲我們處於冷啓動的狀態,所以這個階段就是要根據斜率來計算出一個 QPS 數量,讓流量慢慢地達到系統能承受的峯值。舉個例子,如果count是 100,那麼在 QPS 很低的情況下,令牌桶一直處於滿狀態,但是系統會控制 QPS,實際通過的 QPS 就是warningQps,根據算法可能只有 10 或者 20(怎麼算的不影響理解)。QPS 主鍵提高的時候,aboveToken再逐漸變小,整個warningQps就在逐漸變大,直到走到警戒線之下,到了else邏輯裏。

  2. 流量突增的情況,就是else邏輯裏低於警戒線的情況,我們令牌桶在不停地根據count去增加令牌,這時候消耗令牌的速度超過我們生成令牌的速度,可能就會導致一直處於警戒線之下,這時候判斷當然就需要根據最高 QPS 去判斷限流了。

 long restToken = storedTokens.get();
 if (restToken >= warningToken) {
  //當前的令牌超過警戒線,獲得超過警戒線的令牌數
  long aboveToken = restToken - warningToken;
  // 消耗的速度要比warning快,但是要比慢
  // current interval = restToken*slope+1/count
  double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
  if (passQps + acquireCount <= warningQps) {
   return true;
  }
 } else {
  if (passQps + acquireCount <= count) {
   return true;
  }
 }

所以,按照低 QPS 到突增高 QPS 的流程,來想象一下這個過程:

  1. 剛開始,系統的 QPS 非常低,初始化我們就直接把令牌桶塞滿了

  2. 然後這個低 QPS 的狀態持續了一段時間,因爲我們一直會填充最大 QPS 數量的令牌(因爲取最小值,所以其實桶裏令牌基本不會有變化),所以令牌桶一直處於滿的狀態,整個系統的限流也處於一個比較低的水平

這以上的部分一直處於警戒線之上,實際上就是叫做冷啓動 / 預熱的過程。

  1. 接着系統的 QPS 突然激增,令牌消耗速度太快,就算我們每次增加最大 QPS 數量的令牌任然無法維持消耗,所以桶裏的令牌在不斷低減少,這個時候,冷啓動階段的限制 QPS 也在不斷地提高,最後直到桶裏的令牌低於警戒線

  2. 低於警戒線之後,系統就會按照最高 QPS 去限流,這個過程就是系統在逐漸達到最高限流的過程

那這樣一來,實際就達到了我們處理突增流量的目的,整個系統在漫漫地適應突然飆高的 QPS,然後最終達到系統的 QPS 閾值。

  1. 最後,如果 QPS 回覆正常,那麼又會逐漸回到警戒線之上,就回到了最開始的過程。

總結

因爲算法如果單獨說的話都比較簡單,一說大家都可以聽明白,不需要幾個字就能說明白,所以還是得弄點源碼看看別人是怎麼玩的,所以儘管我很討厭放源碼,但是還是不得不幹。

光靠別人說一點其實有點看不明白,按照順序讀一遍的話心裏就有數了。

那源碼的話最難以理解的就是令牌桶的實現了,說實話那幾個計算的邏輯我看了好幾遍不知道他算的什麼鬼,但是思想我們理解就行了,其他的邏輯相對來說就比較容易理解。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/3cCP1JYBVPuAIORK0pBZZQ