Spring Boot 定時任務動態管理通用解決方案
來源:blog.csdn.net/qq_34886352/article/details/106494637
一、功能說明
SpringBoot 的定時任務的加強工具,實現對 SpringBoot 原生的定時任務進行動態管理, 完全兼容原生 @Scheduled 註解, 無需對原本的定時任務進行修改
推薦下自己做的 Spring Boot 的實戰項目:
https://github.com/YunaiV/ruoyi-vue-pro
二、快速使用
具體的功能已經封裝成 SpringBoot-starter 即插即用
<dependency>
<groupId>com.github.guoyixing</groupId>
<artifactId>spring-boot-starter-super-scheduled</artifactId>
<version>0.3.1</version>
</dependency>
Spring 官方教程中文版重新上線了!
之前老版本的社區丟了一些教程,比如:官方 Spring Guides 的中文版,Spring Security 專題教程等。
最近也在抽空去填補這些內容,目前 Spring Guides 的中文版已經找回,可以直接在菜單中看到它們,您也可以通過下面的鏈接訪問到:
http://spring4all.com/spring-guides
三、實現原理
1、動態管理實現
(1) 配置管理介紹
@Component("superScheduledConfig")
public class SuperScheduledConfig {
/**
* 執行定時任務的線程池
*/
private ThreadPoolTaskScheduler taskScheduler;
/**
* 定時任務名稱與定時任務回調鉤子 的關聯關係容器
*/
private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>();
/**
* 定時任務名稱與定時任務需要執行的邏輯 的關聯關係容器
*/
private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();
/**
* 定時任務名稱與定時任務的源信息 的關聯關係容器
*/
private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>();
/* 普通的get/sets省略 */
}
(2) 使用後處理器攔截 SpringBoot 原本的定時任務
-
實現 ApplicationContextAware 接口拿到 SpringBoot 的上下文
-
實現 BeanPostProcessor 接口,將這個類標記爲後處理器,後處理器會在每個 bean 實例化之後執行
-
使用 @DependsOn 註解強制依賴 SuperScheduledConfig 類,讓 SpringBoot 實例化 SuperScheduledPostProcessor 類之前先實例化 SuperScheduledConfig 類
-
主要實現邏輯在 postProcessAfterInitialization() 方法中
@DependsOn({"superScheduledConfig"})
@Component
@Order
public class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware {
protected final Log logger = LogFactory.getLog(getClass());
private ApplicationContext applicationContext;
/**
* 實例化bean之前的操作
* @param bean bean實例
* @param beanName bean的Name
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
/**
* 實例化bean之後的操作
* @param bean bean實例
* @param beanName bean的Name
*/
@Override
public Object postProcessAfterInitialization(Object bean,
String beanName) throws BeansException {
//1.獲取配置管理器
SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);
//2.獲取當前實例化完成的bean的所有方法
Method[] methods = bean.getClass().getDeclaredMethods();
//循環處理對每個方法逐一處理
if (methods.length > 0) {
for (Method method : methods) {
//3.嘗試在該方法上獲取@Scheduled註解(SpringBoot的定時任務註解)
Scheduled annotation = method.getAnnotation(Scheduled.class);
//如果無法獲取到@Scheduled註解,就跳過這個方法
if (annotation == null) {
continue;
}
//4.創建定時任務的源屬性
//創建定時任務的源屬性(用來記錄定時任務的配置,初始化的時候記錄的是註解上原本的屬性)
ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);
//對註解上獲取到源屬性中的屬性進行檢測
if (!scheduledSource.check()) {
throw new SuperScheduledException("在" + beanName + "Bean中" + method.getName() + "方法的註解參數錯誤");
}
//生成定時任務的名稱(id),使用beanName+“.”+方法名
String name = beanName + "." + method.getName();
//將以key-value的形式,將源數據存入配置管理器中,key:定時任務的名稱 value:源數據
superScheduledConfig.addScheduledSource(name, scheduledSource);
try {
//5.將原本SpringBoot的定時任務取消掉
clearOriginalScheduled(annotation);
} catch (Exception e) {
throw new SuperScheduledException("在關閉原始方法" + beanName + method.getName() + "時出現錯誤");
}
}
}
//最後bean保持原有返回
return bean;
}
/**
* 修改註解原先的屬性
* @param annotation 註解實例對象
* @throws Exception
*/
private void clearOriginalScheduled(Scheduled annotation) throws Exception {
changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);
changeAnnotationValue(annotation, "fixedDelay", -1L);
changeAnnotationValue(annotation, "fixedDelayString", "");
changeAnnotationValue(annotation, "fixedRate", -1L);
changeAnnotationValue(annotation, "fixedRateString", "");
changeAnnotationValue(annotation, "initialDelay", -1L);
changeAnnotationValue(annotation, "initialDelayString", "");
}
/**
* 獲取SpringBoot的上下文
* @param applicationContext SpringBoot的上下文
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
(3) 使用 ApplicationRunner 初始化自定義的定時任務運行器
-
實現 ApplicationContextAware 接口拿到 SpringBoot 的上下文
-
使用 @DependsOn 註解強制依賴 threadPoolTaskScheduler 類
-
實現 ApplicationRunner 接口,在所有 bean 初始化結束之後,運行自定義邏輯
-
主要實現邏輯在 run() 方法中
@DependsOn("threadPoolTaskScheduler")
@Component
public class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware {
protected final Log logger = LogFactory.getLog(getClass());
private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private ApplicationContext applicationContext;
/**
* 定時任務配置管理器
*/
@Autowired
private SuperScheduledConfig superScheduledConfig;
/**
* 定時任務執行線程
*/
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
@Override
public void run(ApplicationArguments args) {
//1.定時任務配置管理器中緩存 定時任務執行線程
superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
//2.獲取所有定時任務源數據
Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();
//逐一處理定時任務
for (String name : nameToScheduledSource.keySet()) {
//3.獲取定時任務源數據
ScheduledSource scheduledSource = nameToScheduledSource.get(name);
//4.獲取所有增強類
String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);
//5.創建執行控制器
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
//配置執行控制器
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
//6.逐一處理增強類(增強器實現原理後面具體分析)
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
//7.將增強器代理成point
Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
//創建代理
Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
proxy.setSuperScheduledName(name);
//8.所有的points連成起來
points.add(proxy);
}
//將point形成調用鏈
runnable.setChain(new Chain(points));
//將執行邏輯封裝並緩存到定時任務配置管理器中
superScheduledConfig.addRunnable(name, runnable::invoke);
try {
//8.啓動定時任務
ScheduledFuture<?> schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler
, scheduledSource, runnable::invoke);
//將線程回調鉤子存到任務配置管理器中
superScheduledConfig.addScheduledFuture(name, schedule);
logger.info(df.format(LocalDateTime.now()) + "任務" + name + "已經啓動...");
} catch (Exception e) {
throw new SuperScheduledException("任務" + name + "啓動失敗,錯誤信息:" + e.getLocalizedMessage());
}
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
(4) 進行動態管理
@Component
public class SuperScheduledManager {
protected final Log logger = LogFactory.getLog(getClass());
private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Autowired
private SuperScheduledConfig superScheduledConfig;
/**
* 修改Scheduled的執行週期
*
* @param name scheduled的名稱
* @param cron cron表達式
*/
public void setScheduledCron(String name, String cron) {
//終止原先的任務
cancelScheduled(name);
//創建新的任務
ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setCron(cron);
addScheduled(name, scheduledSource);
}
/**
* 修改Scheduled的fixedDelay
*
* @param name scheduled的名稱
* @param fixedDelay 上一次執行完畢時間點之後多長時間再執行
*/
public void setScheduledFixedDelay(String name, Long fixedDelay) {
//終止原先的任務
cancelScheduled(name);
//創建新的任務
ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setFixedDelay(fixedDelay);
addScheduled(name, scheduledSource);
}
/**
* 修改Scheduled的fixedRate
*
* @param name scheduled的名稱
* @param fixedRate 上一次開始執行之後多長時間再執行
*/
public void setScheduledFixedRate(String name, Long fixedRate) {
//終止原先的任務
cancelScheduled(name);
//創建新的任務
ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setFixedRate(fixedRate);
addScheduled(name, scheduledSource);
}
/**
* 查詢所有啓動的Scheduled
*/
public List<String> getRunScheduledName() {
Set<String> names = superScheduledConfig.getNameToScheduledFuture().keySet();
return new ArrayList<>(names);
}
/**
* 查詢所有的Scheduled
*/
public List<String> getAllSuperScheduledName() {
Set<String> names = superScheduledConfig.getNameToRunnable().keySet();
return new ArrayList<>(names);
}
/**
* 終止Scheduled
*
* @param name scheduled的名稱
*/
public void cancelScheduled(String name) {
ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name);
scheduledFuture.cancel(true);
superScheduledConfig.removeScheduledFuture(name);
logger.info(df.format(LocalDateTime.now()) + "任務" + name + "已經終止...");
}
/**
* 啓動Scheduled
*
* @param name scheduled的名稱
* @param scheduledSource 定時任務的源信息
*/
public void addScheduled(String name, ScheduledSource scheduledSource) {
if (getRunScheduledName().contains(name)) {
throw new SuperScheduledException("定時任務" + name + "已經被啓動過了");
}
if (!scheduledSource.check()) {
throw new SuperScheduledException("定時任務" + name + "源數據內容錯誤");
}
scheduledSource.refreshType();
Runnable runnable = superScheduledConfig.getRunnable(name);
ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler();
ScheduledFuture<?> schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable);
logger.info(df.format(LocalDateTime.now()) + "任務" + name + "已經啓動...");
superScheduledConfig.addScheduledSource(name, scheduledSource);
superScheduledConfig.addScheduledFuture(name, schedule);
}
/**
* 以cron類型啓動Scheduled
*
* @param name scheduled的名稱
* @param cron cron表達式
*/
public void addCronScheduled(String name, String cron) {
ScheduledSource scheduledSource = new ScheduledSource();
scheduledSource.setCron(cron);
addScheduled(name, scheduledSource);
}
/**
* 以fixedDelay類型啓動Scheduled
*
* @param name scheduled的名稱
* @param fixedDelay 上一次執行完畢時間點之後多長時間再執行
* @param initialDelay 第一次執行的延遲時間
*/
public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) {
ScheduledSource scheduledSource = new ScheduledSource();
scheduledSource.setFixedDelay(fixedDelay);
if (initialDelay != null && initialDelay.length == 1) {
scheduledSource.setInitialDelay(initialDelay[0]);
} else if (initialDelay != null && initialDelay.length > 1) {
throw new SuperScheduledException("第一次執行的延遲時間只能傳入一個參數");
}
addScheduled(name, scheduledSource);
}
/**
* 以fixedRate類型啓動Scheduled
*
* @param name scheduled的名稱
* @param fixedRate 上一次開始執行之後多長時間再執行
* @param initialDelay 第一次執行的延遲時間
*/
public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) {
ScheduledSource scheduledSource = new ScheduledSource();
scheduledSource.setFixedRate(fixedRate);
if (initialDelay != null && initialDelay.length == 1) {
scheduledSource.setInitialDelay(initialDelay[0]);
} else if (initialDelay != null && initialDelay.length > 1) {
throw new SuperScheduledException("第一次執行的延遲時間只能傳入一個參數");
}
addScheduled(name, scheduledSource);
}
/**
* 手動執行一次任務
*
* @param name scheduled的名稱
*/
public void runScheduled(String name) {
Runnable runnable = superScheduledConfig.getRunnable(name);
runnable.run();
}
}
2、增強接口實現
增強器實現的整體思路與 SpringAop 的思路一致,實現沒有 Aop 複雜
(1) 增強接口
@Order(Ordered.HIGHEST_PRECEDENCE)
public interface BaseStrengthen {
/**
* 前置強化方法
*
* @param bean bean實例(或者是被代理的bean)
* @param method 執行的方法對象
* @param args 方法參數
*/
void before(Object bean, Method method, Object[] args);
/**
* 後置強化方法
* 出現異常不會執行
* 如果未出現異常,在afterFinally方法之後執行
*
* @param bean bean實例(或者是被代理的bean)
* @param method 執行的方法對象
* @param args 方法參數
*/
void after(Object bean, Method method, Object[] args);
/**
* 異常強化方法
*
* @param bean bean實例(或者是被代理的bean)
* @param method 執行的方法對象
* @param args 方法參數
*/
void exception(Object bean, Method method, Object[] args);
/**
* Finally強化方法,出現異常也會執行
*
* @param bean bean實例(或者是被代理的bean)
* @param method 執行的方法對象
* @param args 方法參數
*/
void afterFinally(Object bean, Method method, Object[] args);
}
(2) 代理抽象類
public abstract class Point {
/**
* 定時任務名
*/
private String superScheduledName;
/**
* 抽象的執行方法,使用代理實現
* @param runnable 定時任務執行器
*/
public abstract Object invoke(SuperScheduledRunnable runnable);
/* 普通的get/sets省略 */
}
(3) 調用鏈類
public class Chain {
private List<Point> list;
private int index = -1;
/**
* 索引自增1
*/
public int incIndex() {
return ++index;
}
/**
* 索引還原
*/
public void resetIndex() {
this.index = -1;
}
}
(4) cglib 動態代理實現
使用 cglib 代理增強器,將增強器全部代理成調用鏈節點 Point
public class RunnableBaseInterceptor implements MethodInterceptor {
/**
* 定時任務執行器
*/
private SuperScheduledRunnable runnable;
/**
* 定時任務增強類
*/
private BaseStrengthen strengthen;
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
Object result;
//如果執行的是invoke()方法
if ("invoke".equals(method.getName())) {
//前置強化方法
strengthen.before(obj, method, args);
try {
//調用執行器中的invoke()方法
result = runnable.invoke();
} catch (Exception e) {
//異常強化方法
strengthen.exception(obj, method, args);
throw new SuperScheduledException(strengthen.getClass() + "中強化執行時發生錯誤", e);
} finally {
//Finally強化方法,出現異常也會執行
strengthen.afterFinally(obj, method, args);
}
//後置強化方法
strengthen.after(obj, method, args);
} else {
//直接執行方法
result = methodProxy.invokeSuper(obj, args);
}
return result;
}
public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) {
this.runnable = runnable;
if (BaseStrengthen.class.isAssignableFrom(object.getClass())) {
this.strengthen = (BaseStrengthen) object;
} else {
throw new SuperScheduledException(object.getClass() + "對象不是BaseStrengthen類型");
}
}
public RunnableBaseInterceptor() {
}
}
(5) 定時任務執行器實現
public class SuperScheduledRunnable {
/**
* 原始的方法
*/
private Method method;
/**
* 方法所在的bean
*/
private Object bean;
/**
* 增強器的調用鏈
*/
private Chain chain;
public Object invoke() {
Object result;
//索引自增1
if (chain.incIndex() == chain.getList().size()) {
//調用鏈中的增強方法已經全部執行結束
try {
//調用鏈索引初始化
chain.resetIndex();
//增強器全部執行完畢,執行原本的方法
result = method.invoke(bean);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new SuperScheduledException(e.getLocalizedMessage());
}
} else {
//獲取被代理後的方法增強器
Point point = chain.getList().get(chain.getIndex());
//執行增強器代理
//增強器代理中,會回調方法執行器,形成調用鏈,逐一運行調用鏈中的增強器
result = point.invoke(this);
}
return result;
}
/* 普通的get/sets省略 */
}
(6) 增強器代理邏輯
com.gyx.superscheduled.core.SuperScheduledApplicationRunner
類中的代碼片段
//創建執行控制器
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
//用來存放 增強器的代理對象
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
//循環所有的增強器的beanName
for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
//獲取增強器的bean對象
Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
//將增強器代理成Point節點
Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
proxy.setSuperScheduledName(name);
//增強器的代理對象緩存到list中
points.add(proxy);
}
//將增強器代理實例的集合生成調用鏈
//執行控制器中設置調用鏈
runnable.setChain(new Chain(points));
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ZhtIhvDCankIMm1u-EPCmw