定時任務之時間輪

定時任務的基礎知識

首先,我們先了解下什麼是定時任務?定時器有非常多的使用場景,大家在平時工作中應該經常遇到,例如生成月統計報表、財務對賬、會員積分結算、郵件推送等,都是定時器的使用場景。定時器一般有三種表現形式:按固定週期定時執行、延遲一定時間後執行、指定某個時刻執行。

定時器的本質是設計一種數據結構,能夠存儲和調度任務集合,而且 deadline 越近的任務擁有更高的優先級。那麼定時器如何知道一個任務是否到期了呢?定時器需要通過輪詢的方式來實現,每隔一個時間片去檢查任務是否到期。

所以定時器的內部結構一般需要一個任務隊列和一個異步輪詢線程,並且能夠提供三種基本操作:

JDK 原生提供了三種常用的定時器實現方式,分別爲 Timer、DelayedQueue 和 ScheduledThreadPoolExecutor。下面我們逐一對它們進行介紹。

Timer

Timer 屬於 JDK 比較早期版本的實現,它可以實現固定週期的任務,以及延遲任務。Timer 會起動一個異步線程去執行到期的任務,任務可以只被調度執行一次,也可以週期性反覆執行多次。我們先來看下 Timer 是如何使用的,示例代碼如下:

Timer timer = newTimer();
timer.scheduleAtFixedRate(newTimerTask(){
    @Override
    public voidrun(){
        // do something
    }
}, 10000, 1000);  // 10s 後調度一個週期爲 1s 的定時任務

可以看出,任務是由 TimerTask 類實現,TimerTask 是實現了 Runnable 接口的抽象類,Timer 負責調度和執行 TimerTask。接下來我們看下 Timer 的內部構造。

public class Timer {
    private final TaskQueue queue = newTaskQueue();
    private final TimerThread thread = newTimerThread(queue);
    public Timer(String name){
        thread.setName(name);
        thread.start();
    }
}

TaskQueue 是由數組結構實現的小根堆,deadline 最近的任務位於堆頂端,queue[1] 始終是最優先被執行的任務。所以使用小根堆的數據結構,Run 操作時間複雜度 O(1),新增 Schedule 和取消 Cancel 操作的時間複雜度都是 O(logn)。

Timer 內部啓動了一個 TimerThread 異步線程,不論有多少任務被加入數組,始終都是由 TimerThread 負責處理。TimerThread 會定時輪詢 TaskQueue 中的任務,如果堆頂的任務的 deadline 已到,那麼執行任務;如果是週期性任務,執行完成後重新計算下一次任務的 deadline,並再次放入小根堆;如果是單次執行的任務,執行結束後會從 TaskQueue 中刪除。

DelayedQueue

DelayedQueue 是 JDK 中一種可以延遲獲取對象的阻塞隊列,其內部是採用優先級隊列 PriorityQueue 存儲對象。DelayQueue 中的每個對象都必須實現 Delayed 接口,並重寫 compareTo 和 getDelay 方法。DelayedQueue 的使用方法如下:

public class DelayQueueTest {
    public static voidmain(String[] args) throws Exception {
        BlockingQueue<SampleTask> delayQueue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        delayQueue.put(newSampleTask(now + 1000));
        delayQueue.put(newSampleTask(now + 2000));
        delayQueue.put(newSampleTask(now + 3000));
        for(int i = 0; i <3; i++){
            System.out.println(newDate(delayQueue.take().getTime()));
        }
    }
    static class SampleTask implements Delayed {
        long time;
        public SampleTask(long time){
            this.time = time;
        }
        public long getTime(){
            return time;
        }
        @Override
        public int compareTo(Delayed o){
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
        @Override
        public long getDelay(TimeUnit unit){
            return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    }
}

DelayQueue 提供了 put() 和 take() 的阻塞方法,可以向隊列中添加對象和取出對象。對象被添加到 DelayQueue 後,會根據 compareTo() 方法進行優先級排序。getDelay() 方法用於計算消息延遲的剩餘時間,只有 getDelay <=0 時,該對象才能從 DelayQueue 中取出。

DelayQueue 在日常開發中最常用的場景就是實現重試機制。例如,接口調用失敗或者請求超時後,可以將當前請求對象放入 DelayQueue,通過一個異步線程 take() 取出對象然後繼續進行重試。如果還是請求失敗,繼續放回 DelayQueue。爲了限制重試的頻率,可以設置重試的最大次數以及採用指數退避算法設置對象的 deadline,如 2s、4s、8s、16s …… 以此類推。

相比於 Timer,DelayQueue 只實現了任務管理的功能,需要與異步線程配合使用。DelayQueue 使用優先級隊列實現任務的優先級排序,新增 Schedule 和取消 Cancel 操作的時間複雜度也是 O(logn)。

ScheduledThreadPoolExecutor

上文中介紹的 Timer 其實目前並不推薦用戶使用,它是存在不少設計缺陷的。

爲了解決 Timer 的設計缺陷,JDK 提供了功能更加豐富的 ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor 提供了週期執行任務和延遲執行任務的特性,下面通過一個例子先看下 ScheduledThreadPoolExecutor 如何使用。

public class ScheduledExecutorServiceTest {
    public static voidmain(String[] args){
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
        executor.scheduleAtFixedRate(() -> System.out.println("Hello World"), 1000, 2000, TimeUnit.MILLISECONDS); // 1s 延遲後開始執行任務,每 2s 重複執行一次
    }
}

ScheduledThreadPoolExecutor 繼承於 ThreadPoolExecutor,因此它具備線程池異步處理任務的能力。線程池主要負責管理創建和管理線程,並從自身的阻塞隊列中不斷獲取任務執行。線程池有兩個重要的角色,分別是任務和阻塞隊列。ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基礎上,重新設計了任務 ScheduledFutureTask 和阻塞隊列 DelayedWorkQueue。ScheduledFutureTask 繼承於 FutureTask,並重寫了 run() 方法,使其具備週期執行任務的能力。DelayedWorkQueue 內部是優先級隊列,deadline 最近的任務在隊列頭部。對於週期執行的任務,在執行完會重新設置時間,並再次放入隊列中。ScheduledThreadPoolExecutor 的實現原理可以用下圖表示:

以上我們簡單介紹了 JDK 三種實現定時器的方式。可以說它們的實現思路非常類似,都離不開任務、任務管理、任務調度三個角色。三種定時器新增和取消任務的時間複雜度都是 O(nlog(n)),面對海量任務插入和刪除的場景,這三種定時器都會遇到比較嚴重的性能瓶頸。因此,對於性能要求較高的場景,我們一般都會採用時間輪算法。

時間輪原理

如果一個系統中存在着大量的調度任務,而大量的調度任務如果每一個都使用自己的調度器來管理任務的生命週期的話,浪費 cpu 的資源並且很低效。時間輪是一種高效來利用線程資源來進行批量化調度的一種調度模型。把大批量的調度任務全部都綁定到同一個的調度器上面,使用這一個調度器來進行所有任務的管理(manager),觸發(trigger)以及運行(runnable)。能夠高效的管理各種延時任務,週期任務,通知任務等等。

時間輪算法的核心是:輪詢線程不再負責遍歷所有任務,而是僅僅遍歷時間刻度。時間輪算法好比指針不斷在時鐘上旋轉、遍歷,如果一個發現某一時刻上有任務(任務隊列),那麼就會將任務隊列上的所有任務都執行一遍。

時間輪算法不再將任務隊列作爲數據結構,其數據結構如下圖所示(我們以小時爲單位):

顯而易見,時間輪算法解決了遍歷效率低的問題。時間輪算法中,輪詢線程遍歷到某一個時間刻度後,總是執行對應刻度上任務隊列中的所有任務(通常是將任務扔給異步線程池來處理),而不再需要遍歷檢查所有任務的時間戳是否達到要求。

現在,即使有 10k 個任務,輪詢線程也不必每輪遍歷 10 k 個任務,而僅僅需要遍歷 24 個時間刻度。

一個以小時爲單位的時間輪算法就這麼簡單地實現了。不過,小時作爲時間單位粒度太大,我們有時候會希望基於分鐘作爲時間刻度。最直接的方式是增加時間刻度,每一天有 24 * 60 = 1440。此時時間輪的數據結構如下:

通過增加時間刻度,我們可以基於更精細的時間單位(分鐘)來進行定時任務的執行。但是,這種實現方式有如下的缺陷:

如果要將時間精度設爲秒,那麼整個時間輪將需要 86400 個單位的時間刻度,此時時間輪算法的遍歷線程將遇到更大的運行效率低的問題。

分層時間輪算法

分層的時間輪算法在生活中有對應的模型,那就是水錶:

此時,我們有秒、分鐘、小時級別的三個時間輪,每一個時間輪分別有 60、60、24 個刻度。

分層時間輪如下圖所示:

假設我們的任務需要在每天的 7:30:20 秒執行一次。任務首先添加於秒級別時鐘輪的第 20 號刻度上,當其輪詢線程訪問到第 20 號刻度時,就將此任務轉移到分鐘級別時鐘輪的第 30 號刻度上。當分鐘級別的時鐘輪線程訪問到第 30 號刻度,就將此任務轉移到小時級別時鐘輪的第 7 號刻度上。當小時級別時鐘輪線程訪問到第 7 號刻度時,最終會將任務交給異步線程負責執行,然後將任務再次註冊到秒級別的時間輪中。分層時間輪中的任務從一個時間輪轉移到另一個時間輪,這類似於水錶中小單位的錶轉彎一圈會導致高單位的表前進一個單位一樣。

由於時間輪在 Netty、Akka、Quartz、ZooKeeper 、Kafka 等組件中都存在,所以這裏不對具體實現和用法做詳解的講解。

作者:錢魏 Way

來源:www.biaodianfu.com/timingwheel.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/OSEIkgE7XDr4zoiCKFmdSw