筆耕不輟,聊聊 7 種實現異步編程的方式
大家好,我是小富~
最近有很多小夥伴給我留言,能不能總結下異步編程,今天就和大家簡單聊聊這個話題。
早期的系統是同步的,容易理解,我們來看個例子
同步編程
當用戶創建一筆電商交易訂單時,要經歷的業務邏輯流程還是很長的,每一步都要耗費一定的時間,那麼整體的 RT 就會比較長。
於是,聰明的人們開始思考能不能將一些非核心業務從主流程中剝離出來,於是有了異步編程
雛形。
異步編程是讓程序併發運行的一種手段。它允許多個事件同時發生,當程序調用需要長時間運行的方法時,它不會阻塞當前的執行流程,程序可以繼續運行。
核心思路:採用多線程優化性能,將串行操作變成並行操作。異步模式設計的程序可以顯著減少線程等待,從而在高吞吐量場景中,極大提升系統的整體性能,顯著降低時延。
接下來,我們來講下異步有哪些編程實現方式
一、線程 Thread
直接繼承 Thread類
是創建異步線程最簡單的方式。
首先,創建 Thread 子類,普通類或匿名內部類方式;然後創建子類實例;最後通過 start() 方法啓動線程。
public class AsyncThread extends Thread{
@Override
public void run() {
System.out.println("當前線程名稱:" + this.getName() + ", 執行線程名稱:" + Thread.currentThread().getName() + "-hello");
}
}
public static void main(String[] args) {
// 模擬業務流程
// .......
// 創建異步線程
AsyncThread asyncThread = new AsyncThread();
// 啓動異步線程
asyncThread.start();
}
當然如果每次都創建一個 Thread線程
,頻繁的創建、銷燬,浪費系統資源。我們可以採用線程池
@Bean(name = "executorService")
public ExecutorService downloadExecutorService() {
return new ThreadPoolExecutor(20, 40, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
new ThreadFactoryBuilder().setNameFormat("defaultExecutorService-%d").build(),
(r, executor) -> log.error("defaultExecutor pool is full! "));
}
將業務邏輯封裝到 Runnable
或 Callable
中,交由 線程池
來執行
二、Future
上述方式雖然達到了多線程並行處理,但有些業務不僅僅要執行過程,還要獲取執行結果。
Java 從 1.5 版本開始,提供了 Callable
和 Future
,可以在任務執行完畢之後得到任務執行結果。
當然也提供了其他功能,如:取消任務、查詢任務是否完成等
Future 類位於 java.util.concurrent 包下,接口定義:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
方法描述:
-
cancel():取消任務,如果取消任務成功返回 true,如果取消任務失敗則返回 false
-
isCancelled():表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true
-
isDone():表示任務是否已經完成,如果完成,返回 true
-
get():獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回
-
get(long timeout, TimeUnit unit):用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回 null
代碼示例:
public class CallableAndFuture {
public static ExecutorService executorService = new ThreadPoolExecutor(4, 40,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "異步處理,Callable 返回結果";
}
}
public static void main(String[] args) {
Future<String> future = executorService.submit(new MyCallable());
try {
System.out.println(future.get());
} catch (Exception e) {
// nodo
} finally {
executorService.shutdown();
}
}
}
Future 表示一個可能還沒有完成的異步任務的結果,通過 get
方法獲取執行結果,該方法會阻塞直到任務返回結果。
三、FutureTask
FutureTask
實現了 RunnableFuture
接口,則 RunnableFuture
接口繼承了 Runnable
接口和 Future
接口,所以可以將 FutureTask
對象作爲任務提交給 ThreadPoolExecutor
去執行,也可以直接被 Thread
執行;又因爲實現了 Future
接口,所以也能用來獲得任務的執行結果。
FutureTask 構造函數:
public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)
FutureTask 常用來封裝 Callable
和 Runnable
,可以作爲一個任務提交到線程池中執行。除了作爲一個獨立的類之外,也提供了一些功能性函數供我們創建自定義 task 類使用。
FutureTask 線程安全由 CAS 來保證。
ExecutorService executor = Executors.newCachedThreadPool();
// FutureTask包裝callbale任務,再交給線程池執行
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println("子線程開始計算:");
Integer sum = 0;
for (int i = 1; i <= 100; i++)
sum += i;
return sum;
});
// 線程池執行任務, 運行結果在 futureTask 對象裏面
executor.submit(futureTask);
try {
System.out.println("task運行結果計算的總和爲:" + futureTask.get());
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
Callable 和 Future 的區別:Callable 用於產生結果,Future 用於獲取結果
如果是對多個任務多次自由串行、或並行組合,涉及多個線程之間同步阻塞獲取結果,Future 代碼實現會比較繁瑣,需要我們手動處理各個交叉點,很容易出錯。
四、異步框架 CompletableFuture
Future 類通過 get()
方法阻塞等待獲取異步執行的運行結果,性能比較差。
JDK1.8 中,Java 提供了 CompletableFuture
類,它是基於異步函數式編程。相對阻塞式等待返回結果,CompletableFuture
可以通過回調的方式來處理計算結果,實現了異步非阻塞,性能更優。
優點:
-
異步任務結束時,會自動回調某個對象的方法
-
異步任務出錯時,會自動回調某個對象的方法
-
主線程設置好回調後,不再關心異步任務的執行
泡茶示例:
(內容摘自:極客時間的《Java 併發編程實戰》)
//任務1:洗水壺->燒開水
CompletableFuture<Void> f1 =
CompletableFuture.runAsync(() -> {
System.out.println("T1:洗水壺...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1:燒開水...");
sleep(15, TimeUnit.SECONDS);
});
//任務2:洗茶壺->洗茶杯->拿茶葉
CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(() -> {
System.out.println("T2:洗茶壺...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶葉...");
sleep(1, TimeUnit.SECONDS);
return "龍井";
});
//任務3:任務1和任務2完成後執行:泡茶
CompletableFuture<String> f3 =
f1.thenCombine(f2, (__, tf) -> {
System.out.println("T1:拿到茶葉:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});
//等待任務3執行結果
System.out.println(f3.join());
}
CompletableFuture 提供了非常豐富的 API,大約有 50 種處理串行,並行,組合以及處理錯誤的方法。
更多內容移步之前寫的一篇文章,搞定 CompletableFuture,併發異步編程和編寫串行程序還有什麼區別?
五、 SpringBoot 註解 @Async
除了硬編碼的異步編程處理方式,SpringBoot 框架還提供了 註解式
解決方案,以 方法體
爲邊界,方法體內部的代碼邏輯全部按異步方式執行。
首先,使用 @EnableAsync
啓用異步註解
@SpringBootApplication
@EnableAsync
public class StartApplication {
public static void main(String[] args) {
SpringApplication.run(StartApplication.class, args);
}
}
自定義線程池:
@Configuration
@Slf4j
public class ThreadPoolConfiguration {
@Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
public ThreadPoolExecutor systemCheckPoolExecutorService() {
return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10000),
new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
(r, executor) -> log.error("system pool is full! "));
}
}
在異步處理的方法上添加註解 @Async
,當對 execute 方法
調用時,通過自定義的線程池 defaultThreadPoolExecutor
異步化執行 execute 方法
@Service
public class AsyncServiceImpl implements AsyncService {
@Async("defaultThreadPoolExecutor")
public Boolean execute(Integer num) {
System.out.println("線程:" + Thread.currentThread().getName() + " , 任務:" + num);
return true;
}
}
用 @Async 註解標記的方法,稱爲異步方法。在 spring boot 應用中使用 @Async 很簡單:
-
調用異步方法類上或者啓動類加上註解 @EnableAsync
-
在需要被異步調用的方法外加上 @Async
-
所使用的 @Async 註解方法的類對象應該是 Spring 容器管理的 bean 對象;
六、Spring ApplicationEvent 事件
事件機制在一些大型項目中被經常使用,Spring 專門提供了一套事件機制的接口,滿足了架構原則上的解耦。
ApplicationContext
通過 ApplicationEvent
類和 ApplicationListener
接口進行事件處理。如果將實現 ApplicationListener
接口的 bean 注入到上下文中,則每次使用 ApplicationContext
發佈 ApplicationEvent
時,都會通知該 bean。本質上,這是標準的觀察者設計模式
。
ApplicationEvent 是由 Spring 提供的所有 Event 類的基類
首先,自定義業務事件子類,繼承自 ApplicationEvent
,通過泛型注入業務模型參數類。相當於 MQ 的消息體。
public class OrderEvent extends AbstractGenericEvent<OrderModel> {
public OrderEvent(OrderModel source) {
super(source);
}
}
然後,編寫事件監聽器。ApplicationListener
接口是由 Spring 提供的事件訂閱者必須實現的接口,我們需要定義一個子類,繼承 ApplicationListener
。相當於 MQ 的消費端
@Component
public class OrderEventListener implements ApplicationListener<OrderEvent> {
@Override
public void onApplicationEvent(OrderEvent event) {
System.out.println("【OrderEventListener】監聽器處理!" + JSON.toJSONString(event.getSource()));
}
}
最後,發佈事件,把某個事件告訴所有與這個事件相關的監聽器。相當於 MQ 的生產端。
OrderModel orderModel = new OrderModel();
orderModel.setOrderId((long) i);
orderModel.setBuyerName("Tom-" + i);
orderModel.setSellerName("judy-" + i);
orderModel.setAmount(100L);
// 發佈Spring事件通知
SpringUtils.getApplicationContext().publishEvent(new OrderEvent(orderModel));
加個餐:
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 1
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 2
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 3
上面是跑了個 demo 的運行結果,我們發現無論生產端還是消費端,使用了同一個線程 http-nio-8090-exec-1
,Spring 框架的事件機制默認是同步阻塞的。只是在代碼規範方面做了解耦,有較好的擴展性,但底層還是採用同步調用方式。
那麼問題來了,如果想實現異步調用,如何處理?
我們需要手動創建一個 SimpleApplicationEventMulticaster
,並設置 TaskExecutor
,此時所有的消費事件採用異步線程執行。
@Component
public class SpringConfiguration {
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor);
return simpleApplicationEventMulticaster;
}
}
我們看下改造後的運行結果:
[生產端]線程:http-nio-8090-exec-1,發佈事件 1
[生產端]線程:http-nio-8090-exec-1,發佈事件 2
[生產端]線程:http-nio-8090-exec-1,發佈事件 3
[消費端]線程:default-executor-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[消費端]線程:default-executor-2,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[消費端]線程:default-executor-0,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
SimpleApplicationEventMulticaster
這個我們自己實例化的 Bean 與系統默認的加載順序如何?會不會有衝突?
查了下 Spring 源碼,處理邏輯在 AbstractApplicationContext#initApplicationEventMulticaster
方法中,通過 beanFactory 查找是否有自定義的 Bean,如果沒有,容器會自己 new 一個 SimpleApplicationEventMulticaster
對象注入到容器中。
代碼地址:https://github.com/aalansehaiyang/wx-project
七、消息隊列
異步架構是互聯網系統中一種典型架構模式,與同步架構相對應。而消息隊列天生就是這種異步架構,具有超高吞吐量和超低時延。
消息隊列異步架構的主要角色包括消息生產者、消息隊列和消息消費者。
消息生產者就是主應用程序,生產者將調用請求封裝成消息發送給消息隊列。
消息隊列的職責就是緩衝消息,等待消費者消費。根據消費方式又分爲點對點模式
和發佈訂閱模式
兩種。
消息消費者,用來從消息隊列中拉取、消費消息,完成業務邏輯處理。
當然市面上消息隊列框架非常多,常見的有 RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等
不同的消息隊列的功能特性會略有不同,但整體架構類似,這裏就不展開了。
我們只需要記住一個關鍵點,藉助消息隊列這個中間件可以高效的實現異步編程。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/MnY1rWBg9orS66WbQOfyWA