分佈式架構之 Gobrs-Async
一、Gobrs-Async 是什麼?
Gobrs-Async 是一款功能強大、配置靈活、帶有全鏈路異常回調、內存優化、異常狀態管理於一身的高性能異步編排框架。爲企業提供在複雜應用場景下動態任務編排的能力。針對於複雜場景下,異步線程複雜性、任務依賴性、異常狀態難控制性;Gobrs-Async 爲此而生。
二、Gobrs-Async 能解決什麼問題?
能解決 CompletableFuture 所不能解決的問題。怎麼理解呢?
傳統的 Future、CompleteableFuture 一定程度上可以完成任務編排,並可以把結果傳遞到下一個任務。如 CompletableFuture 有 then 方法,但是卻無法做到對每一個執行單元的回調。譬如 A 執行完畢成功了,後面是 B,我希望 A 在執行完後就有個回調結果,方便我監控當前的執行狀況,或者打個日誌什麼的。失敗了,我也可以記錄個異常信息什麼的。
此時,CompleteableFuture 就無能爲力了。
Gobrs-Async 框架提供了這樣的回調功能。並且,如果執行成功、失敗、異常、超時等場景下都提供了管理線程任務的能力!
三、Gobrs-Async 的場景概述是什麼?
1. 場景一
說明:
任務 A 執行完了之後,繼續執行 B、C、D。
2. 場景二
說明:
任務 A 執行完了之後執行 B 然後再執行 C、D。
3. 場景三
說明:
任務 A 執行完了之後執行 B、E 然後按照順序 B 的流程走 C、D、G。E 的流程走 F、G
四、Gobrs-Async 的核心能力包含哪些?
五、Gobrs-Async 與其它多任務異步編排框架對比的結果是怎樣的?
六、Gobrs-Async 能解決哪些問題?
-
- 客戶端請求服務端接口,該接口需要調用其他 N 個微服務的接口。
-
- 並行執行 N 個任務,後續根據這 1-N 個任務的執行結果來決定是否繼續執行下一個任務。
-
- 需要進行線程隔離的多批次任務。
-
- 單機工作流任務編排。
-
- 其他有順序編排的需求。
七、Gobrs-Async 具有怎樣的特性?
Gobrs-Async 在開發時考慮了衆多使用者的開發喜歡,對異常處理的使用場景。並被運用到電商生產環境中,在京東經歷這嚴酷的高併發考驗。同時框架中 極簡靈活的配置、全局自定義可中斷全流程異常、內存優化、靈活的接入方式、提供 SpringBoot Start 接入方式。更加考慮使用者的開發習慣。僅需要注入 GobrsTask 的 Spring Bean 即可實現全流程接入。
Gobrs-Async 項目目錄及其精簡:
-
1.gobrs-async-example:Gobrs-Async 接入實例,提供測試用例。
-
2.gobrs-async-starter:Gobrs-Async 框架核心組件。
Gobrs-Async 在設計時,就充分考慮了開發者的使用習慣,沒有依賴任何中間件。對併發框架做了良好的封裝。主要使用 CountDownLatch 、ReentrantLock 、volatile 等一系列併發技術開發設計。
八、Gobrs-Async 的整體架構是怎樣的?
1. 任務觸發器
任務流的啓動者,負責啓動任務執行流。
2. 規則解析引擎
負責解析使用者配置的規則,同時於 Spring 結合,將配置的 Spring Bean 解析成 TaskBean,進而通過解析引擎加載成 任務裝飾器。進而組裝成任務樹。
3. 任務啓動器
負責通過使用解析引擎解析的任務樹。結合 JUC 併發框架調度實現對任務的統一管理,核心方法有
- (1)trigger 觸發任務加載器,爲加載任務準備環境。
4. 任務加載器
負責加載任務流程,開始調用任務執行器執行核心流程:
-
(1)load 核心任務流程方法,在這裏阻塞等待整個任務流程。
-
(2)getBeginProcess 獲取子任務開始流程。
-
(3)completed 任務完成。
-
(4)errorInterrupted 任務失敗 中斷任務流程。
-
(5)error 任務失敗。
5. 任務執行器
最終的任務執行,每一個任務對應一個 TaskActuator 任務的 攔截、異常、執行、線程複用 等必要條件判斷都在這裏處理
-
(1)prepare 任務前置處理。
-
(2)preInterceptor 統一任務前置處理。
-
(3)task 核心任務方法,業務執行內容。
-
(4)postInterceptor 統一後置處理。
-
(5)onSuccess 任務執行成功回調。
-
(6)onFail 任務執行失敗回調。
6. 任務總線
任務流程傳遞總線,包括 請求參數、任務加載器、 響應結果, 該對象暴露給使用者,拿到匹配業務的數據信息,例如:返回結果、主動中斷任務流程等功能 需要任務總線 (TaskSupport) 支持。
九、Gobrs-Async 的核心類圖是怎樣的?
十、在 YC-Framework 中如何使用 Gobrs-Async?
1. 引入依賴
<dependency>
<groupId>com.yc.framework</groupId>
<artifactId>yc-common-gobrs-async</artifactId>
</dependency>
2. 編寫配置類
@Configuration
public class ThreadPoolConfig {
@Autowired
private GobrsAsyncThreadPoolFactory factory;
/**
* Gobrs thread pool executor.
*/
@PostConstruct
public void gobrsThreadPoolExecutor(){
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(300, 500, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue());
factory.setThreadPoolExecutor(threadPoolExecutor);
}
}
3. 編寫多個 Task(具體參照 YC-Framework 對應的 Example)
4. 編寫 Service
@Service
public class GobrsService {
@Autowired
private AService aService;
@Autowired
private BService bService;
@Autowired
private CService cService;
@Autowired
private DService dService;
@Autowired
private EService eService;
@Autowired
private FService fService;
@Autowired
private GService gService;
@Resource
private GobrsAsync gobrsAsync;
@Resource
private RuleThermalLoad ruleThermalLoad;
/**
* The Executor service.
*/
ExecutorService executorService = Executors.newCachedThreadPool();
/**
* Gobrs async.
*/
public void gobrsAsync() {
gobrsAsync.go("test", () -> new Object());
}
/**
* Future.
*/
public void future() {
List<AsyncTask> abList = new ArrayList<>();
abList.add(aService);
abList.add(bService);
List<Future> futures = new ArrayList<>();
for (AsyncTask task : abList) {
Future<Object> submit = executorService.submit(() -> task.task(new Object(), null));
futures.add(submit);
}
for (Future future : futures) {
try {
Object o = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
List<AsyncTask> cdList = new ArrayList<>();
cdList.add(cService);
cdList.add(dService);
List<Future> futurescd = new ArrayList<>();
for (AsyncTask task : cdList) {
Future<Object> submit = executorService.submit(() -> task.task(new Object(), null));
futurescd.add(submit);
}
for (Future future : futurescd) {
try {
Object o = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
List<AsyncTask> efList = new ArrayList<>();
efList.add(eService);
efList.add(fService);
List<Future> futuresef = new ArrayList<>();
for (AsyncTask task : efList) {
Future<Object> submit = executorService.submit(() -> task.task(new Object(), null));
futuresef.add(submit);
}
for (Future future : futuresef) {
try {
Object o = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
Future<Object> submit = executorService.submit(() -> gService.task(new Object(), null));
try {
submit.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
/**
* Update rule.
*
* @param rule the rule
*/
public void updateRule(Rule rule) {
Rule r = new Rule();
r.setName("ruleName");
r.setContent("AService->CService->EService->GService; BService->DService->FService->HService;");
ruleThermalLoad.load(rule);
}
}
5. 編寫攔截器
@Slf4j
@Component
public class GobrsInterceptor implements AsyncTaskExceptionInterceptor {
@SneakyThrows
@Override
public void exception(ErrorCallback errorCallback) {
log.error("Execute global interceptor error{}", JsonUtil.obj2String(errorCallback.getThrowable()));
}
}
6. 編寫 Controller
@RestController
@RequestMapping("gobrs")
public class GobrsController {
@Resource
private GobrsAsync gobrsAsync;
@Autowired
private GobrsService gobrsService;
/**
* Gobrs test string.
*
* @return the string
*/
@RequestMapping("testGobrs")
public String gobrsTest() {
Map<Class, Object> params = new HashMap<>();
params.put(AService.class, "A的參數");
AsyncResult test = gobrsAsync.go("test", () -> params);
return "success";
}
/**
* Future.
*/
@RequestMapping("future")
public void future() {
long start = System.currentTimeMillis();
gobrsService.future();
long coust = System.currentTimeMillis() - start;
System.out.println("future " + coust);
}
/**
* Sets gobrs async.
*/
@RequestMapping("gobrsAsync")
public void setGobrsAsync() {
//開始時間: 獲取當前時間毫秒數
long start = System.currentTimeMillis();
gobrsService.gobrsAsync();
//結束時間: 當前時間 - 開始時間
long coust = System.currentTimeMillis() - start;
System.out.println("gobrs-Async " + coust);
}
/**
* Update rule.
*
* @param rule the rule
*/
@RequestMapping("updateRule")
public void updateRule(@RequestBody Rule rule) {
gobrsService.updateRule(rule);
}
}
相關示例代碼,已上傳至 Github:
https://github.com/developers-youcong/yc-framework/tree/main/yc-example/yc-example-gobrs-async
YC-Framework 官網:
https://framework.youcongtech.com/
YC-Framework Github 源代碼:
https://github.com/developers-youcong/yc-framework
YC-Framework Gitee 源代碼:
https://gitee.com/developers-youcong/yc-framework
以上源代碼均已開源,開源不易,如果對你有幫助,不妨給個 star!!!
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/1JpRN2QUp_IkaGPz_v9hvw