Spring Boot 定時任務動態管理通用解決方案

來源:blog.csdn.net/qq_34886352/article/details/106494637


一、功能說明

SpringBoot 的定時任務的加強工具,實現對 SpringBoot 原生的定時任務進行動態管理, 完全兼容原生 @Scheduled 註解, 無需對原本的定時任務進行修改

推薦下自己做的 Spring Boot 的實戰項目:

https://github.com/YunaiV/ruoyi-vue-pro

二、快速使用

具體的功能已經封裝成 SpringBoot-starter 即插即用

<dependency>
    <groupId>com.github.guoyixing</groupId>
    <artifactId>spring-boot-starter-super-scheduled</artifactId>
    <version>0.3.1</version>
</dependency>

Spring 官方教程中文版重新上線了!

之前老版本的社區丟了一些教程,比如:官方 Spring Guides 的中文版,Spring Security 專題教程等。

最近也在抽空去填補這些內容,目前 Spring Guides 的中文版已經找回,可以直接在菜單中看到它們,您也可以通過下面的鏈接訪問到:

http://spring4all.com/spring-guides

三、實現原理

1、動態管理實現

(1) 配置管理介紹

@Component("superScheduledConfig")
public class SuperScheduledConfig {
    /**
     * 執行定時任務的線程池
     */
    private ThreadPoolTaskScheduler taskScheduler;

    /**
     * 定時任務名稱與定時任務回調鉤子  的關聯關係容器
     */
    private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>();

    /**
     * 定時任務名稱與定時任務需要執行的邏輯  的關聯關係容器
     */
    private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();

    /**
     * 定時任務名稱與定時任務的源信息  的關聯關係容器
     */
    private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>();
 /* 普通的get/sets省略 */
}

(2) 使用後處理器攔截 SpringBoot 原本的定時任務

@DependsOn({"superScheduledConfig"})
@Component
@Order
public class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware {
    protected final Log logger = LogFactory.getLog(getClass());

    private ApplicationContext applicationContext;

    /**
     * 實例化bean之前的操作
     * @param bean bean實例
     * @param beanName bean的Name
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * 實例化bean之後的操作
     * @param bean bean實例
     * @param beanName bean的Name
     */
    @Override
    public Object postProcessAfterInitialization(Object bean,
                                                 String beanName) throws BeansException {
        //1.獲取配置管理器
        SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);

        //2.獲取當前實例化完成的bean的所有方法
        Method[] methods = bean.getClass().getDeclaredMethods();
        //循環處理對每個方法逐一處理
        if (methods.length > 0) {
            for (Method method : methods) {
             //3.嘗試在該方法上獲取@Scheduled註解(SpringBoot的定時任務註解)
                Scheduled annotation = method.getAnnotation(Scheduled.class);
                //如果無法獲取到@Scheduled註解,就跳過這個方法
                if (annotation == null) {
                    continue;
                }
                //4.創建定時任務的源屬性
                //創建定時任務的源屬性(用來記錄定時任務的配置,初始化的時候記錄的是註解上原本的屬性)
                ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);
                //對註解上獲取到源屬性中的屬性進行檢測
                if (!scheduledSource.check()) {
                    throw new SuperScheduledException("在" + beanName + "Bean中" + method.getName() + "方法的註解參數錯誤");
                }
                //生成定時任務的名稱(id),使用beanName+“.”+方法名
                String name = beanName + "." + method.getName();
                //將以key-value的形式,將源數據存入配置管理器中,key:定時任務的名稱 value:源數據
                superScheduledConfig.addScheduledSource(name, scheduledSource);
                try {
                 //5.將原本SpringBoot的定時任務取消掉
                    clearOriginalScheduled(annotation);
                } catch (Exception e) {
                    throw new SuperScheduledException("在關閉原始方法" + beanName + method.getName() + "時出現錯誤");
                }
            }
        }
        //最後bean保持原有返回
        return bean;
    }

    /**
     * 修改註解原先的屬性
     * @param annotation 註解實例對象
     * @throws Exception
     */
    private void clearOriginalScheduled(Scheduled annotation) throws Exception {
        changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);
        changeAnnotationValue(annotation, "fixedDelay", -1L);
        changeAnnotationValue(annotation, "fixedDelayString""");
        changeAnnotationValue(annotation, "fixedRate", -1L);
        changeAnnotationValue(annotation, "fixedRateString""");
        changeAnnotationValue(annotation, "initialDelay", -1L);
        changeAnnotationValue(annotation, "initialDelayString""");
    }


    /**
     * 獲取SpringBoot的上下文
     * @param applicationContext SpringBoot的上下文
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

(3) 使用 ApplicationRunner 初始化自定義的定時任務運行器

@DependsOn("threadPoolTaskScheduler")
@Component
public class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware {
    protected final Log logger = LogFactory.getLog(getClass());
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private ApplicationContext applicationContext;
 
 /**
     * 定時任務配置管理器
     */
    @Autowired
    private SuperScheduledConfig superScheduledConfig;
    /**
     * 定時任務執行線程
     */
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    @Override
    public void run(ApplicationArguments args) {
     //1.定時任務配置管理器中緩存  定時任務執行線程
        superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
        //2.獲取所有定時任務源數據
        Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();
        //逐一處理定時任務
        for (String name : nameToScheduledSource.keySet()) {
            //3.獲取定時任務源數據
            ScheduledSource scheduledSource = nameToScheduledSource.get(name);
            //4.獲取所有增強類
            String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);
            //5.創建執行控制器
            SuperScheduledRunnable runnable = new SuperScheduledRunnable();
            //配置執行控制器
            runnable.setMethod(scheduledSource.getMethod());
            runnable.setBean(scheduledSource.getBean());
            //6.逐一處理增強類(增強器實現原理後面具體分析)
            List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
            for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
             //7.將增強器代理成point
                Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
                //創建代理
                Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
                proxy.setSuperScheduledName(name);
                //8.所有的points連成起來
                points.add(proxy);
            }
   //將point形成調用鏈
            runnable.setChain(new Chain(points));
            //將執行邏輯封裝並緩存到定時任務配置管理器中
            superScheduledConfig.addRunnable(name, runnable::invoke);
            try {
             //8.啓動定時任務
                ScheduledFuture<?> schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler
                        , scheduledSource, runnable::invoke);
                //將線程回調鉤子存到任務配置管理器中
                superScheduledConfig.addScheduledFuture(name, schedule);
                logger.info(df.format(LocalDateTime.now()) + "任務" + name + "已經啓動...");

            } catch (Exception e) {
                throw new SuperScheduledException("任務" + name + "啓動失敗,錯誤信息:" + e.getLocalizedMessage());
            }
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

(4) 進行動態管理

@Component
public class SuperScheduledManager {
    protected final Log logger = LogFactory.getLog(getClass());
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private SuperScheduledConfig superScheduledConfig;

    /**
     * 修改Scheduled的執行週期
     *
     * @param name scheduled的名稱
     * @param cron cron表達式
     */
    public void setScheduledCron(String name, String cron) {
        //終止原先的任務
        cancelScheduled(name);
        //創建新的任務
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setCron(cron);
        addScheduled(name, scheduledSource);
    }

    /**
     * 修改Scheduled的fixedDelay
     *
     * @param name       scheduled的名稱
     * @param fixedDelay 上一次執行完畢時間點之後多長時間再執行
     */
    public void setScheduledFixedDelay(String name, Long fixedDelay) {
        //終止原先的任務
        cancelScheduled(name);
        //創建新的任務
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setFixedDelay(fixedDelay);
        addScheduled(name, scheduledSource);
    }

    /**
     * 修改Scheduled的fixedRate
     *
     * @param name      scheduled的名稱
     * @param fixedRate 上一次開始執行之後多長時間再執行
     */
    public void setScheduledFixedRate(String name, Long fixedRate) {
        //終止原先的任務
        cancelScheduled(name);
        //創建新的任務
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setFixedRate(fixedRate);
        addScheduled(name, scheduledSource);
    }

    /**
     * 查詢所有啓動的Scheduled
     */
    public List<String> getRunScheduledName() {
        Set<String> names = superScheduledConfig.getNameToScheduledFuture().keySet();
        return new ArrayList<>(names);
    }

    /**
     * 查詢所有的Scheduled
     */
    public List<String> getAllSuperScheduledName() {
        Set<String> names = superScheduledConfig.getNameToRunnable().keySet();
        return new ArrayList<>(names);
    }

    /**
     * 終止Scheduled
     *
     * @param name scheduled的名稱
     */
    public void cancelScheduled(String name) {
        ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name);
        scheduledFuture.cancel(true);
        superScheduledConfig.removeScheduledFuture(name);
        logger.info(df.format(LocalDateTime.now()) + "任務" + name + "已經終止...");
    }

    /**
     * 啓動Scheduled
     *
     * @param name            scheduled的名稱
     * @param scheduledSource 定時任務的源信息
     */
    public void addScheduled(String name, ScheduledSource scheduledSource) {
        if (getRunScheduledName().contains(name)) {
            throw new SuperScheduledException("定時任務" + name + "已經被啓動過了");
        }
        if (!scheduledSource.check()) {
            throw new SuperScheduledException("定時任務" + name + "源數據內容錯誤");
        }

        scheduledSource.refreshType();

        Runnable runnable = superScheduledConfig.getRunnable(name);
        ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler();


        ScheduledFuture<?> schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable);
        logger.info(df.format(LocalDateTime.now()) + "任務" + name + "已經啓動...");

        superScheduledConfig.addScheduledSource(name, scheduledSource);
        superScheduledConfig.addScheduledFuture(name, schedule);
    }

    /**
     * 以cron類型啓動Scheduled
     *
     * @param name scheduled的名稱
     * @param cron cron表達式
     */
    public void addCronScheduled(String name, String cron) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setCron(cron);

        addScheduled(name, scheduledSource);
    }

    /**
     * 以fixedDelay類型啓動Scheduled
     *
     * @param name         scheduled的名稱
     * @param fixedDelay   上一次執行完畢時間點之後多長時間再執行
     * @param initialDelay 第一次執行的延遲時間
     */
    public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setFixedDelay(fixedDelay);
        if (initialDelay != null && initialDelay.length == 1) {
            scheduledSource.setInitialDelay(initialDelay[0]);
        } else if (initialDelay != null && initialDelay.length > 1) {
            throw new SuperScheduledException("第一次執行的延遲時間只能傳入一個參數");
        }

        addScheduled(name, scheduledSource);
    }

    /**
     * 以fixedRate類型啓動Scheduled
     *
     * @param name         scheduled的名稱
     * @param fixedRate    上一次開始執行之後多長時間再執行
     * @param initialDelay 第一次執行的延遲時間
     */
    public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setFixedRate(fixedRate);
        if (initialDelay != null && initialDelay.length == 1) {
            scheduledSource.setInitialDelay(initialDelay[0]);
        } else if (initialDelay != null && initialDelay.length > 1) {
            throw new SuperScheduledException("第一次執行的延遲時間只能傳入一個參數");
        }

        addScheduled(name, scheduledSource);
    }

    /**
     * 手動執行一次任務
     *
     * @param name scheduled的名稱
     */
    public void runScheduled(String name) {
        Runnable runnable = superScheduledConfig.getRunnable(name);
        runnable.run();
    }
}

2、增強接口實現

增強器實現的整體思路與 SpringAop 的思路一致,實現沒有 Aop 複雜

(1) 增強接口

@Order(Ordered.HIGHEST_PRECEDENCE)
public interface BaseStrengthen {
    /**
     * 前置強化方法
     *
     * @param bean   bean實例(或者是被代理的bean)
     * @param method 執行的方法對象
     * @param args   方法參數
     */
    void before(Object bean, Method method, Object[] args);

    /**
     * 後置強化方法
     * 出現異常不會執行
     * 如果未出現異常,在afterFinally方法之後執行
     *
     * @param bean   bean實例(或者是被代理的bean)
     * @param method 執行的方法對象
     * @param args   方法參數
     */
    void after(Object bean, Method method, Object[] args);

    /**
     * 異常強化方法
     *
     * @param bean   bean實例(或者是被代理的bean)
     * @param method 執行的方法對象
     * @param args   方法參數
     */
    void exception(Object bean, Method method, Object[] args);

    /**
     * Finally強化方法,出現異常也會執行
     *
     * @param bean   bean實例(或者是被代理的bean)
     * @param method 執行的方法對象
     * @param args   方法參數
     */
    void afterFinally(Object bean, Method method, Object[] args);
}

(2) 代理抽象類

public abstract class Point {
    /**
     * 定時任務名
     */
    private String superScheduledName;

    /**
     * 抽象的執行方法,使用代理實現
     * @param runnable 定時任務執行器
     */
    public abstract Object invoke(SuperScheduledRunnable runnable);
    
    /* 普通的get/sets省略 */
}

(3) 調用鏈類

public class Chain {
    private List<Point> list;
    private int index = -1;
    /**
     * 索引自增1
     */
    public int incIndex() {
        return ++index;
    }

    /**
     * 索引還原
     */
    public void resetIndex() {
        this.index = -1;
    }
}

(4) cglib 動態代理實現

使用 cglib 代理增強器,將增強器全部代理成調用鏈節點 Point

public class RunnableBaseInterceptor implements MethodInterceptor {
    /**
     * 定時任務執行器
     */
    private SuperScheduledRunnable runnable;
    /**
     * 定時任務增強類
     */
    private BaseStrengthen strengthen;

    @Override
    public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
        Object result;
        //如果執行的是invoke()方法
        if ("invoke".equals(method.getName())) {
         //前置強化方法
            strengthen.before(obj, method, args);
            try {
             //調用執行器中的invoke()方法
                result = runnable.invoke();
            } catch (Exception e) {
             //異常強化方法
                strengthen.exception(obj, method, args);
                throw new SuperScheduledException(strengthen.getClass() + "中強化執行時發生錯誤", e);
            } finally {
             //Finally強化方法,出現異常也會執行
                strengthen.afterFinally(obj, method, args);
            }
            //後置強化方法
            strengthen.after(obj, method, args);

        } else {
         //直接執行方法
            result = methodProxy.invokeSuper(obj, args);
        }
        return result;
    }

    public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) {
        this.runnable = runnable;
        if (BaseStrengthen.class.isAssignableFrom(object.getClass())) {
            this.strengthen = (BaseStrengthen) object;
        } else {
            throw new SuperScheduledException(object.getClass() + "對象不是BaseStrengthen類型");
        }
    }

    public RunnableBaseInterceptor() {

    }
}

(5) 定時任務執行器實現

public class SuperScheduledRunnable {
    /**
     * 原始的方法
     */
    private Method method;
    /**
     * 方法所在的bean
     */
    private Object bean;
    /**
     * 增強器的調用鏈
     */
    private Chain chain;


    public Object invoke() {
        Object result;
        //索引自增1
        if (chain.incIndex() == chain.getList().size()) {
            //調用鏈中的增強方法已經全部執行結束
            try {
                //調用鏈索引初始化
                chain.resetIndex();
                //增強器全部執行完畢,執行原本的方法
                result = method.invoke(bean);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new SuperScheduledException(e.getLocalizedMessage());
            }
        } else {
            //獲取被代理後的方法增強器
            Point point = chain.getList().get(chain.getIndex());
            //執行增強器代理
            //增強器代理中,會回調方法執行器,形成調用鏈,逐一運行調用鏈中的增強器
            result = point.invoke(this);
        }
        return result;
    }
    
    /* 普通的get/sets省略 */
}

(6) 增強器代理邏輯

com.gyx.superscheduled.core.SuperScheduledApplicationRunner類中的代碼片段

//創建執行控制器
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
//用來存放 增強器的代理對象
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
//循環所有的增強器的beanName
for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
 //獲取增強器的bean對象
    Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
    //將增強器代理成Point節點
    Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
    proxy.setSuperScheduledName(name);
    //增強器的代理對象緩存到list中
    points.add(proxy);
}
//將增強器代理實例的集合生成調用鏈
//執行控制器中設置調用鏈
runnable.setChain(new Chain(points));
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ZhtIhvDCankIMm1u-EPCmw