自己寫一個分佈式定時任務框架 - 負載均衡 - OpenAPI 異步調用!

項目背景

目前的定時任務框架已經很成熟,從 QuartZ 到 xxl-job,再到近幾年出現的 PowerJob,既然有這麼多的好的實現,爲什麼還是選擇重寫一個定時任務框架呢?

開發中遇到這樣的場景,業務層面需要頻繁的創建修改定時任務,在考慮分佈式的架構下,對於目前可以實現該功能的框架中:

主流框架往往爲了適配更多的場景,支持足夠多的功能,往往體積大,且不易動態擴展,爲了對項目有最大的控制,在解決以上業務場景的前提下,進行部分功能的修剪,也希望能更好的從中學習主流框架的設計思想,於是決定重寫一個定時任務框架。

本文章主要介紹該項目相對於目前主流定時任務框架的特性,對於定時任務調度和發現的詳細可以見源碼,文章末尾也給出了流程圖方便理解(關於這部分作者對於 PowerJob 的原先設計也做了部分剪枝,相對於原來的框架更易理解和學習,後面可能會推出相關講解)

項目地址:

https://github.com/karatttt/k-job

定位

這是一個基於 PowerJob 的重寫和重構版本,修改和擴展了原始項目的功能,以更好地適配業務需求。

技術選型

通信 : gRPC(基於netty的nio)
序列化 : Protobuf編碼格式編解碼
負載均衡 :自己實現的註冊中心NameServer
    |___ 策略 : 服務端最小調度次數策略
    |___ 交互 :pull+push
消息隊列 : 自己實現的簡易消息隊列
    |___ 消息發送 : 異步+超時重試
    |___ 持久化 :mmap+同步刷盤策略
    |___ 消息重試 :多級延時隊列+死信隊列
定時調度 : 時間輪算法

項目結構

├── LICENSE
├── k-job-common // 各組件的公共依賴,開發者無需感知
├── k-job-nameServer // server和worker的註冊中心,提供負載均衡
├── k-job-producer //普通Jar包,提供 OpenAPI,內置消息隊列的異步發送
├── k-job-server // 基於SpringBoot實現的調度服務器
├── k-job-worker-boot-starter // kjob-worker 的 spring-boot-starter ,spring boot 應用可以通用引入該依賴一鍵接入 kjob-server 
├── k-job-worker // 普通Jar包,接入kjob-server的應用需要依賴該Jar包
└── pom.xml

特性

負載均衡(解決大量定時任務併發執行場景)

對於 worker 的負載均衡策略有許多且已經由較好的解決(輪詢,健康值等),但是,我們目前的系統存在大量的定時任務,考慮 server 層面,可能會存在以下情況:

在分佈式系統下,解決定時任務併發執行往往考慮 server 集羣的負載均衡(這裏的負載均衡特指 server 集羣能夠根據自身負載,動態調度 worker 集羣),但是對於定時任務框架,需要關注集羣下的任務重複調度問題,目前的定時任務框架大都爲了解決該問題而不能較好實現負載均衡

通過查看源碼,xxljob 的調度,在每次查詢數據庫獲取任務前,通過數據庫行鎖進行了全局加鎖,保證同一時刻只有一個 server 在進行調度來避免重複調度,但是無法發揮集羣 server 的調度能力

對於 powerjob 的調度,通過分組隔離機制(詳細可以看官方文檔)避免了重複調度,但是同樣帶來了問題:同一 app 下的 worker 集羣只能被一臺 server 調度,如果該 server 的任務太多了呢?如果只有一個業務對應的 app,如何用 server 集羣來負載均衡呢?

基於以上問題,增加了一個註冊中心nameServer模塊來負責負載均衡:

最小調度次數策略:NameServer記錄 server 集羣狀態並維護各個 server 的分配任務次數,由於 server 是否調度某個 worker 由表中數據決定,worker 會在每次 pull 判斷是否發起請求更新 server 中的調度關係表,並將目前分組交由最小調度次數的 server 來調度,當且僅當以下發生:

考慮到 server 的地理位置,通信效率等因素,後續可以考慮增加每個 server 的權重來更優分配

關鍵代碼如下:

 public ReBalanceInfo getServerAddressReBalanceList(String serverAddress, String appName) {
        // first req, serverAddress is empty
        if(serverAddress.isEmpty()){
            ReBalanceInfo reBalanceInfo = new ReBalanceInfo();
            reBalanceInfo.setSplit(false);
            reBalanceInfo.setServerIpList(new ArrayList<String>(serverAddressSet));
            reBalanceInfo.setSubAppName("");
            return reBalanceInfo;
        }
        ReBalanceInfo reBalanceInfo = new ReBalanceInfo();
        // get sorted scheduleTimes serverList
        List<String> newServerIpList = serverAddress2ScheduleTimesMap.keySet().stream().sorted(new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                return (int) (serverAddress2ScheduleTimesMap.get(o1) - serverAddress2ScheduleTimesMap.get(o2));
            }
        }).collect(Collectors.toList());

        // see if split
        if(!appName2WorkerNumMap.isEmpty() && appName2WorkerNumMap.get(appName) > maxWorkerNum && appName2WorkerNumMap.get(appName) % maxWorkerNum == 1){
            // return new serverIpList
            reBalanceInfo.setSplit(true);
            reBalanceInfo.setChangeServer(false);
            reBalanceInfo.setServerIpList(newServerIpList);
            reBalanceInfo.setSubAppName(appName + ":" + appName2WorkerNumMap.size());
            return reBalanceInfo;
        }
        // see if need change server
        Long lestScheduleTimes = serverAddress2ScheduleTimesMap.get(newServerIpList.get(newServerIpList.size() - 1));
        Long comparedScheduleTimes = lestScheduleTimes == 0 ? 1 : lestScheduleTimes;
        if(serverAddress2ScheduleTimesMap.get(serverAddress) / comparedScheduleTimes > 2){
            reBalanceInfo.setSplit(false);
            reBalanceInfo.setChangeServer(true);
            // first server is target lest scheduleTimes server
            reBalanceInfo.setServerIpList(newServerIpList);
            reBalanceInfo.setSubAppName("");
            return reBalanceInfo;
        }
        // return default list
        reBalanceInfo.setSplit(false);
        reBalanceInfo.setServerIpList(new ArrayList<String>(serverAddressSet));
        reBalanceInfo.setSubAppName("");
        return reBalanceInfo;

    }
實現功能:

以上,解決 PowerJob 中同一 worker 分組只能被一個 server 調度問題,且 subApp 分組可以根據 server 的負載,實現動態依附至不同 server,對於可能的重複調度問題,我們只需加上 App 級別的鎖,相對於 xxl-job 的全局加鎖性能更好。

消息隊列(解決任務大量創建和頻繁更改場景)

其實一開始用 powerjob 作爲項目中的中間件,業務中的任務操作使用其 openAPI。過程中感受最大的就是,我的業務只是根據任務 id 修改了任務參數,並不需要 server 的響應,爲什麼要同步阻塞?可靠性應由 server 保證而不是客戶端的大量重試及等待。對於業務中頻繁創建定時任務和改動,更應是異步操作。

一開始的想法是,使用 grpc 的futureStub進行異步發送,請求由 Reactor 線程監聽事件,當事件可讀時分配給業務線程池進行處理(gRPC 內部已經實現)。所以需要做的似乎只是做一個 Producer 服務,並把 stub 全換成 Future 類型,對於 jobId,我們用雪花算法拿到一個全局 id 就可以,無需 server 分配。

但是以上設計有一個致命的問題 ------ **阻塞在 BlockingQueue 的請求無法 ack,且 server 宕機存在消息丟失的可能!**這違背了消息隊列的設計(入隊 --ack-- 持久化 -- 消費),意味着只有被分配到線程(消費者)消費時,才能被 ack,而活躍的線程數並不多。故 不能僅僅依賴 gRPC 的內部實現,需要自己實現消息隊列

可靠消息

以 rocketMQ 爲例,producer 的消息會先到達 broker 中的隊列後返回 ack,consumer 再輪詢從 broker 中 pull 重平衡處理後的消息消費。

考慮到本項目的設計 無需路由,所有的 server 都可以接受消息,於是不再設計 broker,將 server 和 broker 結合,每個 server 維護自己的隊列,且消費自己隊列的消息,這樣還能減少一次通信。

這樣可靠消息的解決就變成了:

持久化: 同步刷盤機制借鑑了 rocketMQ 的 mmap 和commitLog/consumerQueue設計,將磁盤的文件映射到內存進行讀寫,每次消息進來先存到 buffer 後觸發刷盤,成功後執行寫響應的回調;用consumerQueue文件作爲隊列,server 定時 pull 消費消息,詳細見k-job-server.consumer.DefaultMessageStore,有詳細註釋

// 和rocketMQ一樣,讀寫都是用mmap,因爲內存buffer就是文件的映射,只是有刷盤機制
    private MappedByteBuffer commitLogBuffer;  // 映射到內存的commitlog文件
    private MappedByteBuffer consumerQueueBuffer; // 映射到內存的consumerQueue文件
    private final AtomicLong commitLogBufferPosition = new AtomicLong(0);// consumerLog的buffer的位置,同步刷盤的情況下與consumerLog文件的位置保持一致
    private final AtomicLong commitLogCurPosition = new AtomicLong(0);// consumerLog文件的目前位置,每次刷盤後就等於buffer位置
    private final AtomicLong lastProcessedOffset = new AtomicLong(0);// consumerQueue的buffer拉取commitLog的位置,與commitLog相比,重啓時就是consumerQueue文件最後一條消息的索引位置
    private final AtomicLong currentConsumerQueuePosition = new AtomicLong(0); // consumerQueue文件的目前位置
    private final AtomicLong consumerPosition = new AtomicLong(0); // 記錄消費者在consumerQueue中的消費位置,這個只在目前的系統中有,類似於rocketMQ通過pull遠程拉取
消息重試

對於 producer,前面提到,爲了應對大量定時任務的場景,對於任務的操作,應全部是異步的,我們引入超時機制即可,當超過一定的時間未收到 ack,或者返回錯誤響應,選擇下一個 server 發起重試

對於 consumer(server),使用多級延時隊列,當某個消息消費失敗後,投遞至下一級延遲更久的延時隊列,若全都消費失敗則進入死信隊列,需要人工干預

 private staticfinal Deque<MqCausa.Message> deadMessageQueue = new ArrayDeque<>();

    privatestaticfinal List<DelayQueue<DelayedMessage>> delayQueueList = new ArrayList<>(2);
    /**
     * 逆序排序,因爲重試次數到0則不再重試
     */
    privatestatic List<Long> delayTimes = Lists.newArrayList(10000L, 5000L);
    public static void init(Consumer consumer) {
        delayQueueList.add(new DelayQueue<>());
        delayQueueList.add(new DelayQueue<>());
        Thread consumerThread1 = new Thread(() -> {
            try {
                while (true) {
                    // 從延時隊列中取出消息(會等待直到消息到期)
                    DelayQueue<DelayedMessage> delayQueue = delayQueueList.get(0);
                    if(!delayQueue.isEmpty()) {
                        DelayedMessage message = delayQueue.take();
                        consumer.consume(message.message);
                        delayQueue.remove(message);
                        System.out.println("Consumed: " + message.getMessage()" at " + System.currentTimeMillis());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Consumer thread interrupted");
            }
        });
     //  其他等級的延時隊列

        consumerThread1.start();
    }
    public static void reConsume(MqCausa.Message msg) {
        if (msg.getRetryTime() == 0) {
            log.error("msg : {} is dead", msg);
            deadMessageQueue.add(msg);
            return;
        }
        MqCausa.Message build = msg.toBuilder().setRetryTime(msg.getRetryTime() - 1).build();
        DelayedMessage delayedMessage = new DelayedMessage(build, delayTimes.get(build.getRetryTime()));
        delayQueueList.get(msg.getRetryTime() - 1).add(delayedMessage);
    }


// 定義一個延時消息類,實現 Delayed 接口
staticclass DelayedMessage implements Delayed {
    privatefinal MqCausa.Message message;
    privatefinallong triggerTime; // 到期時間

    public DelayedMessage(MqCausa.Message message, long delayTime) {
        this.message = message;
        // 當前時間加上延時時間,設置消息的觸發時間
        this.triggerTime = System.currentTimeMillis() + delayTime;
    }
    // 獲取剩餘的延時時間
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    // 比較方法,用於確定消息的順序
    @Override
    public int compareTo(Delayed other) {
        if (this.triggerTime < ((DelayedMessage) other).triggerTime) {
            return -1;
        } elseif (this.triggerTime > ((DelayedMessage) other).triggerTime) {
            return1;
        }
        return0;
    }
    public MqCausa.Message getMessage() {
        return message;
    }
}

最終實現如圖所示:

實現功能:

其他

附上個人總結的對於 worker 和 server 之間服務發現以及調度的流程圖

服務發現

調度

作者:用戶 69860839557

來源:juejin.cn/post/7436925608943419411

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/1JslB-us0CxHv0iab_RRZA