筆耕不輟,聊聊 7 種實現異步編程的方式

大家好,我是小富~

最近有很多小夥伴給我留言,能不能總結下異步編程,今天就和大家簡單聊聊這個話題。

早期的系統是同步的,容易理解,我們來看個例子

同步編程

當用戶創建一筆電商交易訂單時,要經歷的業務邏輯流程還是很長的,每一步都要耗費一定的時間,那麼整體的 RT 就會比較長。

於是,聰明的人們開始思考能不能將一些非核心業務從主流程中剝離出來,於是有了異步編程雛形。

異步編程是讓程序併發運行的一種手段。它允許多個事件同時發生,當程序調用需要長時間運行的方法時,它不會阻塞當前的執行流程,程序可以繼續運行。

核心思路:採用多線程優化性能,將串行操作變成並行操作。異步模式設計的程序可以顯著減少線程等待,從而在高吞吐量場景中,極大提升系統的整體性能,顯著降低時延。

接下來,我們來講下異步有哪些編程實現方式

一、線程 Thread

直接繼承 Thread類 是創建異步線程最簡單的方式。

首先,創建 Thread 子類,普通類或匿名內部類方式;然後創建子類實例;最後通過 start() 方法啓動線程。

public class AsyncThread extends Thread{
    @Override
    public void run() {
        System.out.println("當前線程名稱:" + this.getName() + ", 執行線程名稱:" + Thread.currentThread().getName() + "-hello");
    }
}
public static void main(String[] args) {

  // 模擬業務流程
  // .......
  
    // 創建異步線程 
    AsyncThread asyncThread = new AsyncThread();

    // 啓動異步線程
    asyncThread.start();
}

當然如果每次都創建一個 Thread線程,頻繁的創建、銷燬,浪費系統資源。我們可以採用線程池

@Bean(name = "executorService")
public ExecutorService downloadExecutorService() {
    return new ThreadPoolExecutor(20, 40, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
            new ThreadFactoryBuilder().setNameFormat("defaultExecutorService-%d").build(),
            (r, executor) -> log.error("defaultExecutor pool is full! "));
}

將業務邏輯封裝到 RunnableCallable 中,交由 線程池 來執行

二、Future

上述方式雖然達到了多線程並行處理,但有些業務不僅僅要執行過程,還要獲取執行結果。

Java 從 1.5 版本開始,提供了 CallableFuture,可以在任務執行完畢之後得到任務執行結果。

當然也提供了其他功能,如:取消任務、查詢任務是否完成等

Future 類位於 java.util.concurrent 包下,接口定義:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

方法描述:

代碼示例:

public class CallableAndFuture {

    public static ExecutorService executorService = new ThreadPoolExecutor(4, 40,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024), new ThreadFactoryBuilder()
            .setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());


    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            return "異步處理,Callable 返回結果";
        }
    }

    public static void main(String[] args) {
        Future<String> future = executorService.submit(new MyCallable());
        try {
            System.out.println(future.get());
        } catch (Exception e) {
            // nodo
        } finally {
            executorService.shutdown();
        }
    }
}

Future 表示一個可能還沒有完成的異步任務的結果,通過 get 方法獲取執行結果,該方法會阻塞直到任務返回結果。

三、FutureTask

FutureTask 實現了 RunnableFuture 接口,則 RunnableFuture 接口繼承了 Runnable 接口和 Future 接口,所以可以將 FutureTask 對象作爲任務提交給 ThreadPoolExecutor 去執行,也可以直接被 Thread 執行;又因爲實現了 Future 接口,所以也能用來獲得任務的執行結果。

FutureTask 構造函數:

public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)

FutureTask 常用來封裝 CallableRunnable,可以作爲一個任務提交到線程池中執行。除了作爲一個獨立的類之外,也提供了一些功能性函數供我們創建自定義 task 類使用。

FutureTask 線程安全由 CAS 來保證。

ExecutorService executor = Executors.newCachedThreadPool();
// FutureTask包裝callbale任務,再交給線程池執行
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
    System.out.println("子線程開始計算:");
    Integer sum = 0;
    for (int i = 1; i <= 100; i++)
        sum += i;
    return sum;
});

// 線程池執行任務, 運行結果在 futureTask 對象裏面
executor.submit(futureTask);

try {
    System.out.println("task運行結果計算的總和爲:" + futureTask.get());
} catch (Exception e) {
    e.printStackTrace();
}
executor.shutdown();

Callable 和 Future 的區別:Callable 用於產生結果,Future 用於獲取結果

如果是對多個任務多次自由串行、或並行組合,涉及多個線程之間同步阻塞獲取結果,Future 代碼實現會比較繁瑣,需要我們手動處理各個交叉點,很容易出錯。

四、異步框架 CompletableFuture

Future 類通過 get() 方法阻塞等待獲取異步執行的運行結果,性能比較差。

JDK1.8 中,Java 提供了 CompletableFuture 類,它是基於異步函數式編程。相對阻塞式等待返回結果,CompletableFuture 可以通過回調的方式來處理計算結果,實現了異步非阻塞,性能更優。

優點

泡茶示例:

(內容摘自:極客時間的《Java 併發編程實戰》)

//任務1:洗水壺->燒開水
CompletableFuture<Void> f1 =
        CompletableFuture.runAsync(() -> {
            System.out.println("T1:洗水壺...");
            sleep(1, TimeUnit.SECONDS);

            System.out.println("T1:燒開水...");
            sleep(15, TimeUnit.SECONDS);
        });

//任務2:洗茶壺->洗茶杯->拿茶葉
CompletableFuture<String> f2 =
        CompletableFuture.supplyAsync(() -> {
            System.out.println("T2:洗茶壺...");
            sleep(1, TimeUnit.SECONDS);

            System.out.println("T2:洗茶杯...");
            sleep(2, TimeUnit.SECONDS);

            System.out.println("T2:拿茶葉...");
            sleep(1, TimeUnit.SECONDS);
            return "龍井";
        });

//任務3:任務1和任務2完成後執行:泡茶
CompletableFuture<String> f3 =
        f1.thenCombine(f2, (__, tf) -> {
            System.out.println("T1:拿到茶葉:" + tf);
            System.out.println("T1:泡茶...");
            return "上茶:" + tf;
        });

//等待任務3執行結果
System.out.println(f3.join());

}

CompletableFuture 提供了非常豐富的 API,大約有 50 種處理串行,並行,組合以及處理錯誤的方法。

更多內容移步之前寫的一篇文章,搞定 CompletableFuture,併發異步編程和編寫串行程序還有什麼區別?

五、 SpringBoot 註解 @Async

除了硬編碼的異步編程處理方式,SpringBoot 框架還提供了 註解式 解決方案,以 方法體 爲邊界,方法體內部的代碼邏輯全部按異步方式執行。

首先,使用 @EnableAsync 啓用異步註解

@SpringBootApplication
@EnableAsync
public class StartApplication {

    public static void main(String[] args) {
        SpringApplication.run(StartApplication.class, args);
    }
}

自定義線程池:

@Configuration
@Slf4j
public class ThreadPoolConfiguration {

    @Bean(name = "defaultThreadPoolExecutor"destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {

        return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
    }
}

在異步處理的方法上添加註解 @Async ,當對 execute 方法 調用時,通過自定義的線程池 defaultThreadPoolExecutor 異步化執行  execute 方法

@Service
public class AsyncServiceImpl implements AsyncService {

    @Async("defaultThreadPoolExecutor")
    public Boolean execute(Integer num) {
        System.out.println("線程:" + Thread.currentThread().getName() + " , 任務:" + num);
        return true;
    }

}

用 @Async 註解標記的方法,稱爲異步方法。在 spring boot 應用中使用 @Async 很簡單:

六、Spring ApplicationEvent 事件

事件機制在一些大型項目中被經常使用,Spring 專門提供了一套事件機制的接口,滿足了架構原則上的解耦。

ApplicationContext 通過 ApplicationEvent 類和 ApplicationListener 接口進行事件處理。如果將實現 ApplicationListener 接口的 bean 注入到上下文中,則每次使用 ApplicationContext 發佈 ApplicationEvent 時,都會通知該 bean。本質上,這是標準的觀察者設計模式

ApplicationEvent 是由 Spring 提供的所有 Event 類的基類

首先,自定義業務事件子類,繼承自 ApplicationEvent,通過泛型注入業務模型參數類。相當於 MQ 的消息體。

public class OrderEvent extends AbstractGenericEvent<OrderModel> {
    public OrderEvent(OrderModel source) {
        super(source);
    }
}

然後,編寫事件監聽器。ApplicationListener 接口是由 Spring 提供的事件訂閱者必須實現的接口,我們需要定義一個子類,繼承 ApplicationListener。相當於 MQ 的消費端

@Component
public class OrderEventListener implements ApplicationListener<OrderEvent> {
    @Override
    public void onApplicationEvent(OrderEvent event) {

        System.out.println("【OrderEventListener】監聽器處理!" + JSON.toJSONString(event.getSource()));

    }
}

最後,發佈事件,把某個事件告訴所有與這個事件相關的監聽器。相當於 MQ 的生產端。

OrderModel orderModel = new OrderModel();
orderModel.setOrderId((long) i);
orderModel.setBuyerName("Tom-" + i);
orderModel.setSellerName("judy-" + i);
orderModel.setAmount(100L);
// 發佈Spring事件通知
SpringUtils.getApplicationContext().publishEvent(new OrderEvent(orderModel));

加個餐:

[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 1
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 2
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 3

上面是跑了個 demo 的運行結果,我們發現無論生產端還是消費端,使用了同一個線程 http-nio-8090-exec-1,Spring 框架的事件機制默認是同步阻塞的。只是在代碼規範方面做了解耦,有較好的擴展性,但底層還是採用同步調用方式。

那麼問題來了,如果想實現異步調用,如何處理?

我們需要手動創建一個 SimpleApplicationEventMulticaster,並設置 TaskExecutor,此時所有的消費事件採用異步線程執行。

@Component
public class SpringConfiguration {

    @Bean
    public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) {
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor);
        return simpleApplicationEventMulticaster;
    }

}

我們看下改造後的運行結果:

[生產端]線程:http-nio-8090-exec-1,發佈事件 1
[生產端]線程:http-nio-8090-exec-1,發佈事件 2
[生產端]線程:http-nio-8090-exec-1,發佈事件 3
[消費端]線程:default-executor-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[消費端]線程:default-executor-2,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[消費端]線程:default-executor-0,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}

SimpleApplicationEventMulticaster 這個我們自己實例化的 Bean 與系統默認的加載順序如何?會不會有衝突?

查了下 Spring 源碼,處理邏輯在 AbstractApplicationContext#initApplicationEventMulticaster 方法中,通過 beanFactory 查找是否有自定義的 Bean,如果沒有,容器會自己 new 一個 SimpleApplicationEventMulticaster 對象注入到容器中。

代碼地址:https://github.com/aalansehaiyang/wx-project

七、消息隊列

異步架構是互聯網系統中一種典型架構模式,與同步架構相對應。而消息隊列天生就是這種異步架構,具有超高吞吐量和超低時延。

消息隊列異步架構的主要角色包括消息生產者、消息隊列和消息消費者。

消息生產者就是主應用程序,生產者將調用請求封裝成消息發送給消息隊列。

消息隊列的職責就是緩衝消息,等待消費者消費。根據消費方式又分爲點對點模式發佈訂閱模式兩種。

消息消費者,用來從消息隊列中拉取、消費消息,完成業務邏輯處理。

當然市面上消息隊列框架非常多,常見的有 RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等

不同的消息隊列的功能特性會略有不同,但整體架構類似,這裏就不展開了。

我們只需要記住一個關鍵點,藉助消息隊列這個中間件可以高效的實現異步編程。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/MnY1rWBg9orS66WbQOfyWA