分佈式架構之 Gobrs-Async

一、Gobrs-Async 是什麼?

Gobrs-Async 是一款功能強大、配置靈活、帶有全鏈路異常回調、內存優化、異常狀態管理於一身的高性能異步編排框架。爲企業提供在複雜應用場景下動態任務編排的能力。針對於複雜場景下,異步線程複雜性、任務依賴性、異常狀態難控制性;Gobrs-Async 爲此而生。

二、Gobrs-Async 能解決什麼問題?

能解決 CompletableFuture 所不能解決的問題。怎麼理解呢?

傳統的 Future、CompleteableFuture 一定程度上可以完成任務編排,並可以把結果傳遞到下一個任務。如 CompletableFuture 有 then 方法,但是卻無法做到對每一個執行單元的回調。譬如 A 執行完畢成功了,後面是 B,我希望 A 在執行完後就有個回調結果,方便我監控當前的執行狀況,或者打個日誌什麼的。失敗了,我也可以記錄個異常信息什麼的。

此時,CompleteableFuture 就無能爲力了。

Gobrs-Async 框架提供了這樣的回調功能。並且,如果執行成功、失敗、異常、超時等場景下都提供了管理線程任務的能力!

三、Gobrs-Async 的場景概述是什麼?

1. 場景一

說明:
任務 A 執行完了之後,繼續執行 B、C、D。

2. 場景二

說明:
任務 A 執行完了之後執行 B 然後再執行 C、D。

3. 場景三

說明:
任務 A 執行完了之後執行 B、E 然後按照順序 B 的流程走 C、D、G。E 的流程走 F、G

四、Gobrs-Async 的核心能力包含哪些?

五、Gobrs-Async 與其它多任務異步編排框架對比的結果是怎樣的?

六、Gobrs-Async 能解決哪些問題?

七、Gobrs-Async 具有怎樣的特性?

Gobrs-Async 在開發時考慮了衆多使用者的開發喜歡,對異常處理的使用場景。並被運用到電商生產環境中,在京東經歷這嚴酷的高併發考驗。同時框架中 極簡靈活的配置、全局自定義可中斷全流程異常、內存優化、靈活的接入方式、提供 SpringBoot Start 接入方式。更加考慮使用者的開發習慣。僅需要注入 GobrsTask 的 Spring Bean 即可實現全流程接入。

Gobrs-Async 項目目錄及其精簡:

Gobrs-Async 在設計時,就充分考慮了開發者的使用習慣,沒有依賴任何中間件。對併發框架做了良好的封裝。主要使用 CountDownLatch 、ReentrantLock 、volatile 等一系列併發技術開發設計。

八、Gobrs-Async 的整體架構是怎樣的?

1. 任務觸發器

任務流的啓動者,負責啓動任務執行流。

2. 規則解析引擎

負責解析使用者配置的規則,同時於 Spring 結合,將配置的 Spring Bean 解析成 TaskBean,進而通過解析引擎加載成 任務裝飾器。進而組裝成任務樹。

3. 任務啓動器

負責通過使用解析引擎解析的任務樹。結合 JUC 併發框架調度實現對任務的統一管理,核心方法有

4. 任務加載器

負責加載任務流程,開始調用任務執行器執行核心流程:

5. 任務執行器

最終的任務執行,每一個任務對應一個 TaskActuator 任務的 攔截、異常、執行、線程複用 等必要條件判斷都在這裏處理

6. 任務總線

任務流程傳遞總線,包括 請求參數、任務加載器、 響應結果, 該對象暴露給使用者,拿到匹配業務的數據信息,例如:返回結果、主動中斷任務流程等功能 需要任務總線 (TaskSupport) 支持。

九、Gobrs-Async 的核心類圖是怎樣的?

十、在 YC-Framework 中如何使用 Gobrs-Async?

1. 引入依賴

<dependency>
    <groupId>com.yc.framework</groupId>
    <artifactId>yc-common-gobrs-async</artifactId>
</dependency>

2. 編寫配置類

@Configuration
public class ThreadPoolConfig {
    @Autowired
    private GobrsAsyncThreadPoolFactory factory;
    /**
     * Gobrs thread pool executor.
     */
    @PostConstruct
    public void gobrsThreadPoolExecutor(){
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(300, 500, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue());
        factory.setThreadPoolExecutor(threadPoolExecutor);
    }
}

3. 編寫多個 Task(具體參照 YC-Framework 對應的 Example)

4. 編寫 Service

@Service
public class GobrsService {
    @Autowired
    private AService aService;
    @Autowired
    private BService bService;
    @Autowired
    private CService cService;
    @Autowired
    private DService dService;
    @Autowired
    private EService eService;
    @Autowired
    private FService fService;
    @Autowired
    private GService gService;
    @Resource
    private GobrsAsync gobrsAsync;
    @Resource
    private RuleThermalLoad ruleThermalLoad;
    /**
     * The Executor service.
     */
    ExecutorService executorService = Executors.newCachedThreadPool();
    /**
     * Gobrs async.
     */
    public void gobrsAsync() {
        gobrsAsync.go("test", () -> new Object());
    }
    /**
     * Future.
     */
    public void future() {
        List<AsyncTask> abList = new ArrayList<>();
        abList.add(aService);
        abList.add(bService);
        List<Future> futures = new ArrayList<>();
        for (AsyncTask task : abList) {
            Future<Object> submit = executorService.submit(() -> task.task(new Object(), null));
            futures.add(submit);
        }
        for (Future future : futures) {
            try {
                Object o = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        List<AsyncTask> cdList = new ArrayList<>();
        cdList.add(cService);
        cdList.add(dService);
        List<Future> futurescd = new ArrayList<>();
        for (AsyncTask task : cdList) {
            Future<Object> submit = executorService.submit(() -> task.task(new Object(), null));
            futurescd.add(submit);
        }
        for (Future future : futurescd) {
            try {
                Object o = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        List<AsyncTask> efList = new ArrayList<>();
        efList.add(eService);
        efList.add(fService);
        List<Future> futuresef = new ArrayList<>();
        for (AsyncTask task : efList) {
            Future<Object> submit = executorService.submit(() -> task.task(new Object(), null));
            futuresef.add(submit);
        }
        for (Future future : futuresef) {
            try {
                Object o = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        Future<Object> submit = executorService.submit(() -> gService.task(new Object(), null));
        try {
            submit.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    /**
     * Update rule.
     *
     * @param rule the rule
     */
    public void updateRule(Rule rule) {
        Rule r = new Rule();
        r.setName("ruleName");
        r.setContent("AService->CService->EService->GService; BService->DService->FService->HService;");
        ruleThermalLoad.load(rule);
    }
}

5. 編寫攔截器

@Slf4j
@Component
public class GobrsInterceptor implements AsyncTaskExceptionInterceptor {
    @SneakyThrows
    @Override
    public void exception(ErrorCallback errorCallback) {
        log.error("Execute global interceptor  error{}", JsonUtil.obj2String(errorCallback.getThrowable()));
    }
}

6. 編寫 Controller

@RestController
@RequestMapping("gobrs")
public class GobrsController {
    @Resource
    private GobrsAsync gobrsAsync;
    @Autowired
    private GobrsService gobrsService;
    /**
     * Gobrs test string.
     *
     * @return the string
     */
    @RequestMapping("testGobrs")
    public String gobrsTest() {
        Map<Class, Object> params = new HashMap<>();
        params.put(AService.class, "A的參數");
        AsyncResult test = gobrsAsync.go("test", () -> params);
        return "success";
    }
    /**
     * Future.
     */
    @RequestMapping("future")
    public void future() {
        long start = System.currentTimeMillis();
        gobrsService.future();
        long coust = System.currentTimeMillis() - start;
        System.out.println("future " + coust);
    }
    /**
     * Sets gobrs async.
     */
    @RequestMapping("gobrsAsync")
    public void setGobrsAsync() {
        //開始時間: 獲取當前時間毫秒數
        long start = System.currentTimeMillis();
        gobrsService.gobrsAsync();
        //結束時間: 當前時間 - 開始時間
        long coust = System.currentTimeMillis() - start;
        System.out.println("gobrs-Async " + coust);
    }
    /**
     * Update rule.
     *
     * @param rule the rule
     */
    @RequestMapping("updateRule")
    public void updateRule(@RequestBody Rule rule) {
        gobrsService.updateRule(rule);
    }
}

相關示例代碼,已上傳至 Github:
https://github.com/developers-youcong/yc-framework/tree/main/yc-example/yc-example-gobrs-async

YC-Framework 官網:
https://framework.youcongtech.com/

YC-Framework Github 源代碼:
https://github.com/developers-youcong/yc-framework

YC-Framework Gitee 源代碼:
https://gitee.com/developers-youcong/yc-framework

以上源代碼均已開源,開源不易,如果對你有幫助,不妨給個 star!!!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/1JpRN2QUp_IkaGPz_v9hvw