時間輪原理及其在框架中的應用

作者:vivo 互聯網服務器團隊 - Li Wanghong

一、時間輪簡介

1.1 爲什麼要使用時間輪

在平時開發中,經常會與定時任務打交道。下面舉幾個定時任務處理的例子。

1)心跳檢測。在 Dubbo 中,需要有心跳機制來維持 Consumer 與 Provider 的長連接,默認的心跳間隔是 60s。當 Provider 在 3 次心跳時間內沒有收到心跳響應,會關閉連接通道。當 Consumer 在 3 次心跳時間內沒有收到心跳響應,會進行重連。Provider 側和 Consumer 側的心跳檢測機制都是通過定時任務實現的,而且是本篇文章要分析的時間輪 HashedWheelTimer 處理的。

2)超時處理。在 Dubbo 中發起 RPC 調用時,通常會配置超時時間,當消費者調用服務提供者出現超時進行一定的邏輯處理。那麼怎麼檢測任務調用超時了呢?我們可以利用定時任務,每次創建一個 Future,記錄這個 Future 的創建時間與超時時間,後臺有一個定時任務進行檢測,當 Future 到達超時時間並且沒有被處理時,就需要對這個 Future 執行超時邏輯處理。

3)Redisson 分佈式鎖續期。在分佈式鎖處理中,通常會指定分佈式鎖的超時時間,同樣會在 finally 塊裏釋放分佈式鎖。但是有一個問題時,通常分佈式鎖的超時時間不好判斷,如果設置短了業務卻沒執行完成就把鎖釋放掉了,或者超時時間設置很長,同樣也會存在一些問題。Redisson 提供了一種看門狗機制,通過時間輪定時給分佈式鎖續期,也就是延長分佈式鎖的超時時間。

可以看到,上述幾個例子都與定時任務有關,那麼傳統的定時任務有什麼缺點呢?爲什麼要使用時間輪來實現?

假如使用普通的定時任務處理機制來處理**例 2)**中的超時情況:

1)簡單地,可以針對每一次請求創建一個線程,然後 Sleep 到超時時間,之後若判斷超時則進行超時邏輯處理。存在的問題是如果面臨是高併發請求,針對每個請求都要去創建線程,這樣太耗費資源了。

2)針對方案 1 的不足,可以改成一個線程來處理所有的定時任務,比如這個線程可以每隔 50ms 掃描所有需要處理的超時任務,如果發現有超時任務,則進行處理。但是,這樣也存在一個問題,可能一段時間內都沒有任務達到超時時間,那麼就讓 CPU 多了很多無用的輪詢遍歷操作。

針對上述方案的不足,可以採用時間輪來進行處理。下面先來簡單介紹下時間輪的概念。

1.2 單層時間輪

我們先以單層時間輪爲例,假設時間輪的週期是 1 秒,時間輪中有 10 個槽位,則每個槽位代表 100ms。假設我們現在有 3 個任務,分別是任務 A(220ms 後執行)、B(410ms 之後運行)、C(1930ms 之後運行)。則這三個任務在時間輪所處的槽位如下圖,可以看到任務 A 被放到了槽位 2,任務 B 被放到了槽位 4,任務 C 被放到了槽位 9。

當時間輪轉動到對應的槽時,就會從槽中取出任務判斷是否需要執行。同時可以發現有一個剩餘週期的概念,這是因爲任務 C 的執行時間爲 1930ms,超過了時間輪的週期 1 秒,所以可以標記它的剩餘週期爲 1,當時間輪第一次轉動到它的位置時,發現它的剩餘週期爲 1,表示還沒有到要處理的時間,將剩餘週期減 1,時間輪繼續轉動,當下一次轉動到 C 任務位置時,發現剩餘週期爲 0,表示時間到了需要處理該定時任務了。Dubbo 中採用的就是這種單層時間輪機制。

圖片

1.3 多層時間輪

既然有單層時間輪,那麼自然而然可以想到利用多層時間輪來解決上述任務執行時間超出時間輪週期的情況。下面以兩層時間輪爲例,第一層時間輪週期爲 1 秒,第二層時間輪週期爲 10 秒。

還是以上述 3 個任務爲例,可以看到任務 A 和 B 分佈在第一層時間輪上,而任務 C 分佈在第二層時間輪的槽 1 處。當第一層時間輪轉動時,任務 A 和任務 B 會被先後執行。1 秒鐘之後,第一層時間輪完成了一個週期轉動。從新開始第 0 跳,這時第二層時間輪從槽 0 跳到了槽 1 處,將槽 1 處的任務,也就是任務 C 取出放入到第一層時間輪的槽位 9 處,當第一層時間輪轉動到槽位 9 處,任務 C 就會被執行。這種將第二層的任務取出放入第一層中稱爲降級,它是爲了保證任務被處理的時間精度。Kafka 內部就是採用的這種多層時間輪機制。

圖片

二、時間輪原理

下面先來看一下 Dubbo 中的時間輪的結構,可以看到,它和時鐘很像,它被劃分成了一個個 Bucket,每個 Bucket 有一個頭指針和尾指針,分別指向雙向鏈表的頭節點和尾節點,雙向鏈表中存儲的就是要處理的任務。時間輪不停轉動,當指向 Bucket0 所負責維護的雙向鏈表時,就將它所存儲的任務遍歷取出來處理。

圖片

下面我們先來介紹下 Dubbo 中時間輪 HashedWheelTimer 所涉及到的一些核心概念,在講解完這些核心概念之後,再來對時間輪的源碼進行分析。

2.1 TimerTask

在 Dubbo 中,TimerTask 封裝了要執行的任務,它就是上圖雙向鏈表中節點所封裝的任務。所有的定時任務都需要繼承 TimerTask 接口。如下圖,可以看到 Dubbo 中的心跳任務 HeartBeatTask、註冊失敗重試任務 FailRegisteredTask 等都實現了 TimerTask 接口。

public interface TimerTask {
    void run(Timeout timeout) throws Exception;
}

圖片

2.2 Timeout

TimerTask 中 run 方法的入參是 Timeout,Timeout 與 TimerTask 一一對應,Timeout 的唯一實現類 HashedWheelTimeout 中就封裝了 TimerTask 屬性,可以理解爲 HashedWheelTimeout 就是上述雙向鏈表的一個節點,因此它也包含兩個 HashedWheelTimeout 類型的指針,分別指向當前節點的上一個節點和下一個節點。

public interface Timeout {
    // Timer就是定時器, 也就是Dubbo中的時間輪
    Timer timer();
    // 獲取該節點要執行的任務
    TimerTask task();
    // 判斷該節點封裝的任務有沒有過期、被取消
    boolean isExpired();
    boolean isCancelled();
    // 取消該節點的任務
    boolean cancel();
}

HashedWheelTimeout 是 Timeout 的唯一實現,它的作用有兩個:

下面來看一下 Timeout 的實現類 HashedWheelTimeout 的核心字段與實現。

1) int ST_INIT = 0、int ST_CANCELLED = 1、int ST_EXPIRED = 2
   HashedWheelTimeout裏定義了三種狀態,分別表示任務的初始化狀態、被取消狀態、已過期狀態
2) STATE_UPDATER
   用於更新定時任務的狀態
3) HashedWheelTimer timer
   指向時間輪對象
4) TimerTask task
   實際要執行的任務
5) long deadline
   指定時任務執行的時間,這個時間是在創建 HashedWheelTimeout 時指定的
   計算公式是: currentTime(創建 HashedWheelTimeout 的時間) + delay(任務延遲時間)
             - startTime(HashedWheelTimer 的啓動時間),時間單位爲納秒
6) int state = ST_INIT
   任務初始狀態
7) long remainingRounds
   指當前任務剩餘的時鐘週期數. 時間輪所能表示的時間長度是有限的, 在任務到期時間與當前時刻
   的時間差超過時間輪單圈能表示的時長,就出現了套圈的情況,需要該字段值表示剩餘的時鐘週期
8) HashedWheelTimeout next、HashedWheelTimeout prev
   分別對應當前定時任務在鏈表中的前驅節點和後繼節點,這也驗證了時間輪中每個槽所對應的任務鏈表是
   一個雙鏈表
9) HashedWheelBucket bucket
   時間輪中的一個槽,對應時間輪圓圈的一個個小格子,每個槽維護一個雙向鏈表,當時間輪指針轉到當前
   槽時,就會從槽所負責的雙向鏈表中取出任務進行處理

HashedWheelTimeout 提供了 remove 操作,可以從雙向鏈表中移除當前自身節點,並將當前時間輪所維護的定時任務數量減一。

void remove() {
    // 獲取當前任務屬於哪個槽
    HashedWheelBucket bucket = this.bucket;
    if (bucket != null) {
        // 從槽中移除自己,也就是從雙向鏈表中移除節點,
        // 分析bucket的方法時會分析
        bucket.remove(this);
    } else {
        // pendingTimeouts表示當前時間輪所維護的定時任務的數量
        timer.pendingTimeouts.decrementAndGet();
    }
}

HashedWheelTimeout 提供了 cancel 操作,可以取消時間輪中的定時任務。當定時任務被取消時,它會首先被暫存到 canceledTimeouts 隊列中。在時間輪轉動到槽進行任務處理之前和時間輪退出運行時都會調用 cancel,而 cancel 會調用 remove,從而清理該隊列中被取消的定時任務。

@Override
public boolean cancel() {
    // 通過CAS進行狀態變更
    if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
        return false;
    }
    // 任務被取消時,時間輪會將它暫存到時間輪所維護的canceledTimeouts隊列中.
    // 在時間輪轉動到槽進行任務處理之前和時間輪退出運行時都會調用cancel,而
    // cancel會調用remove,從而清理該隊列中被取消的定時任務
    timer.cancelledTimeouts.add(this);
    return true;
}

HashedWheelTimeout 提供了 expire 操作,當時間輪指針轉動到某個槽時,會遍歷該槽所維護的雙向鏈表,判斷節點的狀態,如果發現任務已到期,會通過 remove 方法移除,然後調用 expire 方法執行該定時任務。

public void expire() {
    // 修改定時任務狀態爲已過期
    if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
        return;
    }
    try {
        // 真正的執行定時任務所要代表的邏輯
        task.run(this);
    } catch (Throwable t) {
        // 打印日誌,可以看到當時間輪中定時任務執行異常時,
        // 不會拋出異常,影響到時間輪中其他定時任務執行
    }
}

2.3 HashedWheelBucket

前面也介紹過了,它是時間輪中的槽,它內部維護了雙向鏈表的首尾指針。下面我們來看一下它內部的核心資源和實現。

1) HashedWheelTimeout head、HashedWheelTimeout tail
   指向該槽所維護的雙向鏈表的首節點和尾節點

HashedWheelBucket 提供了 addTimeout 方法,用於添加任務到雙向鏈表的尾節點。

void addTimeout(HashedWheelTimeout timeout) {
    // 添加之前判斷一下該任務當前沒有被被關聯到一個槽上
    assert timeout.bucket == null;
    timeout.bucket = this;
    if (head == null) {
        head = tail = timeout;
    } else {
        tail.next = timeout;
        timeout.prev = tail;
        tail = timeout;
    }
}

HashedWheelBucket 提供了 remove 方法,用於從雙向鏈表中刪除指定節點。核心邏輯如下圖所示,根據要刪除的節點找到其前置節點和後置節點,然後分別調整前置節點的 next 指針和後置節點的 prev 指針。刪除過程中需要考慮一些邊界情況。刪除之後將 pendingTimeouts,也就是當前時間輪的待處理任務數減一。remove 代碼邏輯較簡單,這邊就不貼代碼了。

圖片

HashedWheelBucket 提供了 expireTimeouts 方法,當時間輪指針轉動到某個槽時,通過該方法處理該槽上雙向鏈表的定時任務,分爲 3 種情況:

void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;
    // 時間輪指針轉到某個槽時從雙向鏈表頭節點開始遍歷
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        // remainingRounds <= 0表示到期了
        if (timeout.remainingRounds <= 0) {
            // 從鏈表中移除該節點
            next = remove(timeout);
            // 判斷該定時任務確實是到期了
            if (timeout.deadline <= deadline) {
                // 執行該任務
                timeout.expire();
            } else {
                // 拋異常
            }
        } else if (timeout.isCancelled()) {
            // 任務被取消,移除後直接丟棄
            next = remove(timeout);
        } else {
            // 剩餘時鐘週期減一
            timeout.remainingRounds--;
        }
        // 繼續判斷下一個任務節點
        timeout = next;
    }
}

HashedWheelBucket 也提供了 clearTimeouts 方法,該方法會在時間輪停止的時候被使用,它會遍歷並移除所有雙向鏈表中的節點,並返回所有未超時和未被取消的任務。

2.4 Worker

Worker 實現了 Runnable 接口,時間輪內部通過 Worker 線程來處理放入時間輪中的定時任務。下面先來看一下它的核心字段和 run 方法邏輯。

1) Set<Timeout> unprocessedTimeouts
   當時間輪停止時,用於存放時間輪中未過期的和未被取消的任務
2) long tick
   時間輪指針,指向時間輪中某個槽,當時間輪轉動時該tick會自增

圖片

public void run() {
    // 初始化startTime, 所有任務的的deadline都是相對於這個時間點
    startTime = System.nanoTime();
    // 喚醒阻塞在start()的線程
    startTimeInitialized.countDown();
    // 只要時間輪的狀態爲WORKER_STATE_STARTED, 就循環的轉動tick,
    // 處理槽中的定時任務
    do {
        // 判斷是否到了處理槽的時間了,還沒到則sleep一會
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            // 獲取tick對應的槽索引
            int idx = (int) (tick & mask);
            // 清理用戶主動取消的定時任務, 這些定時任務在用戶取消時,
            // 會記錄到 cancelledTimeouts 隊列中. 在每次指針轉動
            // 的時候,時間輪都會清理該隊列
            processCancelledTasks();
            // 根據當前指針定位對應槽
            HashedWheelBucket bucket = wheel[idx];
            // 將緩存在 timeouts 隊列中的定時任務轉移到時間輪中對應的槽中
            transferTimeoutsToBuckets();
            // 處理該槽位的雙向鏈表中的定時任務
            bucket.expireTimeouts(deadline);
            tick++;
        }
        // 檢測時間輪的狀態, 如果時間輪處於運行狀態, 則循環執行上述步驟,
        // 不斷執行定時任務
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this)
                                    == WORKER_STATE_STARTED);
    // 這裏應該是時間輪停止了, 清除所有槽中的任務, 並加入到未處理任務列表,
    // 以供stop()方法返回
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    // 將還沒有加入到槽中的待處理定時任務隊列中的任務取出, 如果是未取消
    // 的任務, 則加入到未處理任務隊列中, 以供stop()方法返回
    for (; ; ) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    // 最後再次清理 cancelledTimeouts 隊列中用戶主動取消的定時任務
    processCancelledTasks();
}

下面對 run 方法中涉及到的一些方法進行介紹:

1)waitForNextTick

邏輯比較簡單,它會判斷有沒有到達處理下一個槽任務的時間了,如果還沒有到達則 sleep 一會。

2)processCancelledTasks

遍歷 cancelledTimeouts,獲取被取消的任務並從雙向鏈表中移除。

private void processCancelledTasks() {
    for (; ; ) {
        HashedWheelTimeout timeout = cancelledTimeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        timeout.remove();
    }
}

3)transferTimeoutsToBuckets

當調用 newTimeout 方法時,會先將要處理的任務緩存到 timeouts 隊列中,等時間輪指針轉動時統一調用 transferTimeoutsToBuckets 方法處理,將任務轉移到指定的槽對應的雙向鏈表中,每次轉移 10 萬個,以免阻塞時間輪線程。

private void transferTimeoutsToBuckets() {
    // 每次tick只處理10w個任務, 以免阻塞worker線程
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        // 沒有任務了直接跳出循環
        if (timeout == null) {
            // all processed
            break;
        }
        // 還沒有放入到槽中就取消了, 直接略過
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            continue;
        }
        // 計算任務需要經過多少個tick
        long calculated = timeout.deadline / tickDuration;
        // 計算任務的輪數
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        // 如果任務在timeouts隊列裏面放久了, 以至於已經過了執行時間, 這個時候
        // 就使用當前tick, 也就是放到當前bucket, 此方法調用完後就會被執行.
        final long ticks = Math.max(calculated, tick);
        int stopIndex = (int) (ticks & mask);
        // 將任務加入到相應的槽中
        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}

2.5 HashedWheelTimer

最後,我們來分析時間輪 HashedWheelTimer,它實現了 Timer 接口,提供了 newTimeout 方法可以向時間輪中添加定時任務,該任務會先被暫存到 timeouts 隊列中,等時間輪轉動到某個槽時,會將該 timeouts 隊列中的任務轉移到某個槽所負責的雙向鏈表中。它還提供了 stop 方法用於終止時間輪,該方法會返回時間輪中未處理的任務。它也提供了 isStop 方法用於判斷時間輪是否終止了。

先來看一下 HashedWheelTimer 的核心字段。

1) HashedWheelBucket[] wheel
   該數組就是時間輪的環形隊列,數組每個元素都是一個槽,一個槽負責維護一個雙向鏈表,用於存儲定時
   任務。它會被在構造函數中初始化,當指定爲n時,它實際上會取最靠近n的且爲2的冪次方值。
2) Queue<HashedWheelTimeout> timeouts
   timeouts用於緩存外部向時間輪提交的定時任務
3) Queue<HashedWheelTimeout> cancelledTimeouts
   cancelledTimeouts用於暫存被取消的定時任務,時間輪會在處理槽負責的雙向鏈表之前,先處理這兩
   個隊列中的數據。
4) Worker worker
   時間輪處理定時任務的邏輯
5) Thread workerThread
   時間輪處理定時任務的線程
6) AtomicLong pendingTimeouts
   時間輪剩餘的待處理的定時任務數量
7) long tickDuration
   時間輪每個槽所代表的時間長度
8) int workerState
   時間輪狀態,可選值有init、started、shut down

下面來看一下時間輪的構造函數,用於初始化一個時間輪。首先它會對傳入參數 ticksPerWheel 進行轉換處理,返回大於該值的 2 的冪次方,它表示時間輪上有多少個槽,默認是 512 個。然後創建大小爲該值的 HashedWheelBucket[] 數組。接着通過傳入的 tickDuration 對時間輪的 tickDuration 賦值,默認是 100ms。節通過 threadFactory 創建 workerThread 工作線程,該線程就是負責處理時間輪中的定時任務的線程。

public HashedWheelTimer(ThreadFactory threadFactory,
                        long tickDuration, TimeUnit unit,
                        int ticksPerWheel,
                        long maxPendingTimeouts) {
    // 圓環上一共有多少個時間間隔, HashedWheelTimer對其正則化
    // 將其換算爲大於等於該值的2^n
    wheel = createWheel(ticksPerWheel);
    // 這用來快速計算任務應該呆的槽
    mask = wheel.length - 1;
    // 時間輪每個槽的時間間隔
    this.tickDuration = unit.toNanos(tickDuration);
    // threadFactory是創建線程的線程工廠對象
    workerThread = threadFactory.newThread(worker);
    // 最多允許多少個任務等待執行
    this.maxPendingTimeouts = maxPendingTimeouts;
}
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    // 計算真正應當創建多少個槽
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    // 初始化時間輪數組
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i++) {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}

初始化時間輪之後,就可以向其中提交定時任務了,可以通過時間輪提供的 newTimeout 方法來完成。首先將待處理的任務數量加 1,然後啓動時間輪線程,這時 worker 的 run 方法就會被系統調度運行。然後將該定時任務封裝成 HashedWheelTimeout 加入到 timeouts 隊列中。start 之後,時間輪就開始運行起來了,直到外界調用 stop 方法終止退出。

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    // 待處理的任務數量加1
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    // 啓動時間輪
    start();
    // 計算該定時任務的deadline
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    // 創建一個HashedWheelTimeout對象,它首先會被暫存到timeouts隊列中
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}
public void start() {
    /**
      * 判斷當前時間輪的狀態
      * 1) 如果是初始化, 則啓動worker線程, 啓動整個時間輪
      * 2) 如果已經啓動則略過
      * 3) 如果是已經停止,則報錯
      */
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            // 使用cas來判斷啓動時間輪
            if (WORKER_STATE_UPDATER.compareAndSet(this,
                     WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            // 拋異常
        default:
            throw new Error("Invalid WorkerState");
    }
    // 等待worker線程初始化時間輪的啓動時間
    while (startTime == 0) {
        try {
            // 這裏使用countDownLatch來確保調度的線程已經被啓動
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}

三、時間輪應用

到這裏,Dubbo 中的時間輪原理就分析完了。接下來呼應本文開頭的三個例子,結合它們來分析下時間輪在 Dubbo 或 Redisson 中是如何使用的。

1)HeartbeatTimerTask

在 Dubbo 的 HeaderExchangeClient 類中會向時間輪中提交該心跳任務。

private void startHeartBeatTask(URL url) {
    // Client的具體實現決定是否啓動該心跳任務
    if (!client.canHandleIdle()) {
        AbstractTimerTask.ChannelProvider cp =
          () -> Collections.singletonList(HeaderExchangeClient.this);
        // 計算心跳間隔, 最小間隔不能低於1s
        int heartbeat = getHeartbeat(url);
        long heartbeatTick = calculateLeastDuration(heartbeat);
        // 創建心跳任務
        this.heartBeatTimerTask =
               new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
        // 提交到IDLE_CHECK_TIMER這個時間輪中等待執行, 等時間到了時間輪就會去取出該任務進行調度執行
        IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
    }
}
// 上面用到的IDLE_CHECK_TIMER就是我們本文的分析的時間輪
private static final HashedWheelTimer IDLE_CHECK_TIMER =
                              new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
// 上述創建心跳任務時, 創建了一個HeartbeatTimerTask對象, 可以看下該任務具體要做什麼
@Override
protected void doTask(Channel channel) {
    try {
        // 獲取最後一次讀寫時間
        Long lastRead = lastRead(channel);
        Long lastWrite = lastWrite(channel);
        if ((lastRead != null && now() - lastRead > heartbeat)
            || (lastWrite != null && now() - lastWrite > heartbeat)) {
            // 最後一次讀寫時間超過心跳時間, 就會發送心跳請求
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            // 表明它是一個心跳請求
            req.setEvent(HEARTBEAT_EVENT);
            channel.send(req);
        }
    } catch (Throwable t) {
    }
}

2)Redisson 鎖續期機制

當獲取鎖成功後,Redisson 會封裝一個鎖續期任務放入時間輪中,默認 10s 檢查一下,用於對獲取到的鎖進行續期,延長持有鎖的時間。如果業務機器宕機了,那麼該續期的定時任務也就沒法跑了,就沒法續期了,那等加鎖時間到了鎖就自動釋放了。邏輯封裝在 RedissonLock 中的 renewExpiration() 方法中。

圖片

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 這邊newTimeout點進去發現就是往時間輪中提交了一個任務
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                if (res) {
                    // 續期成功後繼續調度, 又往時間輪中放一個續期任務
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    // 通過lua腳本對鎖進行續期
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getName()),
                          internalLockLeaseTime, getLockName(threadId));
}

3)超時重試

使用方式和 HeartbeatTimerTask 方式類似,讀者可以自己動手去分析下它是在哪裏被引入的。

四、總結

在本篇文章中,先是舉了 3 個例子來論述爲什麼需要使用時間輪,使用時間輪的優點,在文末處也分別對這 3 個例子在 Dubbo 或 Redisson 中的使用做了介紹。接着通過畫圖講解了單層時間輪與多層時間輪機制,讓讀者對時間輪算法有了一個簡單的認識。在第二部分,依次講解了 Dubbo 時間輪中涉及到的 TimerTask、Timeout、HashedWheelBucket、Worker、HashedWheelTimer,分析了它們的原理與源碼實現。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ZmfvujXMXUe9O0PjXuq-uA