自己寫一個分佈式定時任務框架 - 負載均衡 - OpenAPI 異步調用!
項目背景
目前的定時任務框架已經很成熟,從 QuartZ 到 xxl-job,再到近幾年出現的 PowerJob,既然有這麼多的好的實現,爲什麼還是選擇重寫一個定時任務框架呢?
開發中遇到這樣的場景,業務層面需要頻繁的創建修改定時任務,在考慮分佈式的架構下,對於目前可以實現該功能的框架中:
-
MQ 的延時隊列無法動態調整任務參數;
-
redis 的過期策略需要保存太久的 key 且可能會有 BigKey
-
xxljob 沒有原生的 openAPI,其基於數據庫鎖的調度只是實現 server 的高可用而不是高性能;
-
powerjob 的 openAPI 又是基於 http 的同步阻塞調度,並且對於 server 的負載均衡,由於其分組隔離設計,需要開發者手動配置,在高併發下的定時任務操作下,並不能很好的調度 server 集羣。
主流框架往往爲了適配更多的場景,支持足夠多的功能,往往體積大,且不易動態擴展,爲了對項目有最大的控制,在解決以上業務場景的前提下,進行部分功能的修剪,也希望能更好的從中學習主流框架的設計思想,於是決定重寫一個定時任務框架。
本文章主要介紹該項目相對於目前主流定時任務框架的特性,對於定時任務調度和發現的詳細可以見源碼,文章末尾也給出了流程圖方便理解(關於這部分作者對於 PowerJob 的原先設計也做了部分剪枝,相對於原來的框架更易理解和學習,後面可能會推出相關講解)
項目地址:
https://github.com/karatttt/k-job
定位
這是一個基於 PowerJob 的重寫和重構版本,修改和擴展了原始項目的功能,以更好地適配業務需求。
-
支持定時任務頻繁創建和任務參數頻繁動態變動的場景(提供輕量 API,並使用內置消息隊列異步處理)
-
支持大量定時任務併發執行的場景,實現負載均衡(分組隔離 + 應用級別的鎖實現)
-
主要針對小型任務 ,無需過多配置,不對任務實例進行操作
技術選型
通信 : gRPC(基於netty的nio)
序列化 : Protobuf編碼格式編解碼
負載均衡 :自己實現的註冊中心NameServer
|___ 策略 : 服務端最小調度次數策略
|___ 交互 :pull+push
消息隊列 : 自己實現的簡易消息隊列
|___ 消息發送 : 異步+超時重試
|___ 持久化 :mmap+同步刷盤策略
|___ 消息重試 :多級延時隊列+死信隊列
定時調度 : 時間輪算法
項目結構
├── LICENSE
├── k-job-common // 各組件的公共依賴,開發者無需感知
├── k-job-nameServer // server和worker的註冊中心,提供負載均衡
├── k-job-producer //普通Jar包,提供 OpenAPI,內置消息隊列的異步發送
├── k-job-server // 基於SpringBoot實現的調度服務器
├── k-job-worker-boot-starter // kjob-worker 的 spring-boot-starter ,spring boot 應用可以通用引入該依賴一鍵接入 kjob-server
├── k-job-worker // 普通Jar包,接入kjob-server的應用需要依賴該Jar包
└── pom.xml
特性
負載均衡(解決大量定時任務併發執行場景)
對於 worker 的負載均衡策略有許多且已經由較好的解決(輪詢,健康值等),但是,我們目前的系統存在大量的定時任務,考慮 server 層面,可能會存在以下情況:
-
server 一次調度從 DB 中獲取太多任務,可能會 OOM
-
發起調度請求是由線程池負責,可能會有性能瓶頸,我們的系統對時間是敏感的,對時間精度高要求
-
我們的 OpenAPI 同樣也不希望大量請求落在同一個 server 上
在分佈式系統下,解決定時任務併發執行往往考慮 server 集羣的負載均衡(這裏的負載均衡特指 server 集羣能夠根據自身負載,動態調度 worker 集羣),但是對於定時任務框架,需要關注集羣下的任務重複調度問題,目前的定時任務框架大都爲了解決該問題而不能較好實現負載均衡。
通過查看源碼,xxljob 的調度,在每次查詢數據庫獲取任務前,通過數據庫行鎖進行了全局加鎖,保證同一時刻只有一個 server 在進行調度來避免重複調度,但是無法發揮集羣 server 的調度能力
對於 powerjob 的調度,通過分組隔離機制(詳細可以看官方文檔)避免了重複調度,但是同樣帶來了問題:同一 app 下的 worker 集羣只能被一臺 server 調度,如果該 server 的任務太多了呢?如果只有一個業務對應的 app,如何用 server 集羣來負載均衡呢?
基於以上問題,增加了一個註冊中心nameServer模塊來負責負載均衡:
最小調度次數策略:NameServer記錄 server 集羣狀態並維護各個 server 的分配任務次數,由於 server 是否調度某個 worker 由表中數據決定,worker 會在每次 pull 判斷是否發起請求更新 server 中的調度關係表,並將目前分組交由最小調度次數的 server 來調度,當且僅當以下發生:
-
同一 app 分組下的
workerNum > threshold -
該分組對應的 server 的
scheduleTimes > minServerScheduleTime x 2
考慮到 server 的地理位置,通信效率等因素,後續可以考慮增加每個 server 的權重來更優分配
關鍵代碼如下:
public ReBalanceInfo getServerAddressReBalanceList(String serverAddress, String appName) {
// first req, serverAddress is empty
if(serverAddress.isEmpty()){
ReBalanceInfo reBalanceInfo = new ReBalanceInfo();
reBalanceInfo.setSplit(false);
reBalanceInfo.setServerIpList(new ArrayList<String>(serverAddressSet));
reBalanceInfo.setSubAppName("");
return reBalanceInfo;
}
ReBalanceInfo reBalanceInfo = new ReBalanceInfo();
// get sorted scheduleTimes serverList
List<String> newServerIpList = serverAddress2ScheduleTimesMap.keySet().stream().sorted(new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return (int) (serverAddress2ScheduleTimesMap.get(o1) - serverAddress2ScheduleTimesMap.get(o2));
}
}).collect(Collectors.toList());
// see if split
if(!appName2WorkerNumMap.isEmpty() && appName2WorkerNumMap.get(appName) > maxWorkerNum && appName2WorkerNumMap.get(appName) % maxWorkerNum == 1){
// return new serverIpList
reBalanceInfo.setSplit(true);
reBalanceInfo.setChangeServer(false);
reBalanceInfo.setServerIpList(newServerIpList);
reBalanceInfo.setSubAppName(appName + ":" + appName2WorkerNumMap.size());
return reBalanceInfo;
}
// see if need change server
Long lestScheduleTimes = serverAddress2ScheduleTimesMap.get(newServerIpList.get(newServerIpList.size() - 1));
Long comparedScheduleTimes = lestScheduleTimes == 0 ? 1 : lestScheduleTimes;
if(serverAddress2ScheduleTimesMap.get(serverAddress) / comparedScheduleTimes > 2){
reBalanceInfo.setSplit(false);
reBalanceInfo.setChangeServer(true);
// first server is target lest scheduleTimes server
reBalanceInfo.setServerIpList(newServerIpList);
reBalanceInfo.setSubAppName("");
return reBalanceInfo;
}
// return default list
reBalanceInfo.setSplit(false);
reBalanceInfo.setServerIpList(new ArrayList<String>(serverAddressSet));
reBalanceInfo.setSubAppName("");
return reBalanceInfo;
}
實現功能:
-
app 組自動拆分: 可以爲 app 設置組內 worker 數量閾值,超過閾值自動拆分 subApp 並分配負載均衡後的 server
-
worker 動態分配: 對於每一個 subApp,當觸發 pull 時,根據最小調度次數策略,可以分配至負載均衡後的 server,開發者無需感知 subApp
以上,解決 PowerJob 中同一 worker 分組只能被一個 server 調度問題,且 subApp 分組可以根據 server 的負載,實現動態依附至不同 server,對於可能的重複調度問題,我們只需加上 App 級別的鎖,相對於 xxl-job 的全局加鎖性能更好。
消息隊列(解決任務大量創建和頻繁更改場景)
其實一開始用 powerjob 作爲項目中的中間件,業務中的任務操作使用其 openAPI。過程中感受最大的就是,我的業務只是根據任務 id 修改了任務參數,並不需要 server 的響應,爲什麼要同步阻塞?可靠性應由 server 保證而不是客戶端的大量重試及等待。對於業務中頻繁創建定時任務和改動,更應是異步操作。
一開始的想法是,使用 grpc 的futureStub進行異步發送,請求由 Reactor 線程監聽事件,當事件可讀時分配給業務線程池進行處理(gRPC 內部已經實現)。所以需要做的似乎只是做一個 Producer 服務,並把 stub 全換成 Future 類型,對於 jobId,我們用雪花算法拿到一個全局 id 就可以,無需 server 分配。
但是以上設計有一個致命的問題 ------ **阻塞在 BlockingQueue 的請求無法 ack,且 server 宕機存在消息丟失的可能!**這違背了消息隊列的設計(入隊 --ack-- 持久化 -- 消費),意味着只有被分配到線程(消費者)消費時,才能被 ack,而活躍的線程數並不多。故 不能僅僅依賴 gRPC 的內部實現,需要自己實現消息隊列
可靠消息
以 rocketMQ 爲例,producer 的消息會先到達 broker 中的隊列後返回 ack,consumer 再輪詢從 broker 中 pull 重平衡處理後的消息消費。
考慮到本項目的設計 無需路由,所有的 server 都可以接受消息,於是不再設計 broker,將 server 和 broker 結合,每個 server 維護自己的隊列,且消費自己隊列的消息,這樣還能減少一次通信。
這樣可靠消息的解決就變成了:
-
producer 到 server 的消息丟失 ------ 失敗或者超時則依次遍歷所有的 server,一定能保證消息抵達,不再闡述
-
server 的隊列消息丟失(機器宕機)------ 持久化,採用同步刷盤策略,百分之百的可靠
持久化: 同步刷盤機制借鑑了 rocketMQ 的 mmap 和commitLog/consumerQueue設計,將磁盤的文件映射到內存進行讀寫,每次消息進來先存到 buffer 後觸發刷盤,成功後執行寫響應的回調;用consumerQueue文件作爲隊列,server 定時 pull 消費消息,詳細見k-job-server.consumer.DefaultMessageStore,有詳細註釋
// 和rocketMQ一樣,讀寫都是用mmap,因爲內存buffer就是文件的映射,只是有刷盤機制
private MappedByteBuffer commitLogBuffer; // 映射到內存的commitlog文件
private MappedByteBuffer consumerQueueBuffer; // 映射到內存的consumerQueue文件
private final AtomicLong commitLogBufferPosition = new AtomicLong(0);// consumerLog的buffer的位置,同步刷盤的情況下與consumerLog文件的位置保持一致
private final AtomicLong commitLogCurPosition = new AtomicLong(0);// consumerLog文件的目前位置,每次刷盤後就等於buffer位置
private final AtomicLong lastProcessedOffset = new AtomicLong(0);// consumerQueue的buffer拉取commitLog的位置,與commitLog相比,重啓時就是consumerQueue文件最後一條消息的索引位置
private final AtomicLong currentConsumerQueuePosition = new AtomicLong(0); // consumerQueue文件的目前位置
private final AtomicLong consumerPosition = new AtomicLong(0); // 記錄消費者在consumerQueue中的消費位置,這個只在目前的系統中有,類似於rocketMQ通過pull遠程拉取
消息重試
對於 producer,前面提到,爲了應對大量定時任務的場景,對於任務的操作,應全部是異步的,我們引入超時機制即可,當超過一定的時間未收到 ack,或者返回錯誤響應,選擇下一個 server 發起重試
對於 consumer(server),使用多級延時隊列,當某個消息消費失敗後,投遞至下一級延遲更久的延時隊列,若全都消費失敗則進入死信隊列,需要人工干預
private staticfinal Deque<MqCausa.Message> deadMessageQueue = new ArrayDeque<>();
privatestaticfinal List<DelayQueue<DelayedMessage>> delayQueueList = new ArrayList<>(2);
/**
* 逆序排序,因爲重試次數到0則不再重試
*/
privatestatic List<Long> delayTimes = Lists.newArrayList(10000L, 5000L);
public static void init(Consumer consumer) {
delayQueueList.add(new DelayQueue<>());
delayQueueList.add(new DelayQueue<>());
Thread consumerThread1 = new Thread(() -> {
try {
while (true) {
// 從延時隊列中取出消息(會等待直到消息到期)
DelayQueue<DelayedMessage> delayQueue = delayQueueList.get(0);
if(!delayQueue.isEmpty()) {
DelayedMessage message = delayQueue.take();
consumer.consume(message.message);
delayQueue.remove(message);
System.out.println("Consumed: " + message.getMessage() + " at " + System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Consumer thread interrupted");
}
});
// 其他等級的延時隊列
consumerThread1.start();
}
public static void reConsume(MqCausa.Message msg) {
if (msg.getRetryTime() == 0) {
log.error("msg : {} is dead", msg);
deadMessageQueue.add(msg);
return;
}
MqCausa.Message build = msg.toBuilder().setRetryTime(msg.getRetryTime() - 1).build();
DelayedMessage delayedMessage = new DelayedMessage(build, delayTimes.get(build.getRetryTime()));
delayQueueList.get(msg.getRetryTime() - 1).add(delayedMessage);
}
// 定義一個延時消息類,實現 Delayed 接口
staticclass DelayedMessage implements Delayed {
privatefinal MqCausa.Message message;
privatefinallong triggerTime; // 到期時間
public DelayedMessage(MqCausa.Message message, long delayTime) {
this.message = message;
// 當前時間加上延時時間,設置消息的觸發時間
this.triggerTime = System.currentTimeMillis() + delayTime;
}
// 獲取剩餘的延時時間
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 比較方法,用於確定消息的順序
@Override
public int compareTo(Delayed other) {
if (this.triggerTime < ((DelayedMessage) other).triggerTime) {
return -1;
} elseif (this.triggerTime > ((DelayedMessage) other).triggerTime) {
return1;
}
return0;
}
public MqCausa.Message getMessage() {
return message;
}
}
最終實現如圖所示:
實現功能:
-
對於任務操作請求的異步發送
-
輪詢策略實現消費的負載均衡
其他
附上個人總結的對於 worker 和 server 之間服務發現以及調度的流程圖
服務發現
調度
作者:用戶 69860839557
來源:juejin.cn/post/7436925608943419411
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/1JslB-us0CxHv0iab_RRZA