如何實現異步通知的重試機制

工作中經常要和第三方做對接,比如支付、電子合同等系統。操作成功之後,第三方會發送異步的通知,返回最終的處理結果,使用異步而不是使用同步通知,是爲了加快系統響應速度,防止線程阻塞。任務處理完成後通過異步的通知,發送給對應的服務端。之前對接微信支付,完成支付後,微信發送一個異步通知給服務端,服務端根據支付通知修改狀態,通知規則看到以下的一段話。

其中有段話:

重新發送通知,直到成功爲止(在通知一直不成功的情況下,微信總共會發起多次通知,通知頻率爲 15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h - 總計 24h4m)

微信爲何要這麼設計

微信結果通知本質就是發送一個網絡請求到不同的服務器上,既然是一個網絡請求,就可能因爲各種原因導致請求超時或者失敗,比如:

以上原因都會導致支付結果通知接收失敗,也就無法通知給用戶。爲了解決上述的問題,就需要引入**「重試機制」**,當請求無法應答時,就需要重試幾次,保證請求能確認發送。

異步通知的重試機制

從微信支付通知可以引申到所有的異步通知,或者和第三方對接時。如果要確保通知能被成功的接收,就需要考慮請求失敗的情況,大部分都是需要使用重試機制。而重試機制是隔段時間不是固定的,是越來越大的,這是考慮到重試時,由於網絡故障或者服務器故障重啓設備需要花一段時間,而**「間隔時間越來越長就可以更大的保證請求可以被成功接收」**。

重複請求,接口需要考慮重複請求的情況,要設計成一個冪等性接口,多次請求和請求一次的效果是一致的。

重試機制的實現

重試機制就是一個定時器,隔一段時間執行一次,沒有預期的效果就再重複執行一次。

實現的難點就在於,間隔的時間是不一致的,如果時間的間隔是固定的話,就可以使用定時任務。

方案一:定時任務(不可行)

使用定時器,每隔一段時間執行一次任務。在 SpringBoot 啓動類添加 @EnableScheduling 註解,然後在執行的方法添加 @Scheduled 註解。

@Scheduled(fixedDelay = 1000*2)
public void test2() {
    Date date = new Date();
    System.out.println("tesk2 " + date);
}

以上表示每隔 2 秒執行一次。間隔時間都是固定的,這個不符合預期,因爲要求的時間間隔是依次增加的。

如果是間隔時間是固定的,那定時任務就符合條件嗎?

如果是隻有一條任務在執行,執行不成功,存放在 Redis 中,然後定時執行任務,如果任務執行成功,就去掉任務。但是定時器還是會定時執行。

如果執行的任務很多的話,前面的任務要等待後續的任務執行,那延遲就很嚴重了,就需要使用到多線程,開啓多個線程,在《阿里 Java 開發手冊》有一條:

線程資源必須通過線程池提供,不允許在應用中自行顯式創建線程。

定時任務有以下幾個缺點不滿足:

方案二:線程池 + 定時任務 (不可行)

既然使用單線程會產生延遲,就使用線程池來降低延遲,因爲發起請求屬於 IO 密集型,所以線程數設置成 CPU 個數的兩倍,在 SpringBoot 自定義一個線程池:

@Configuration
public class ThreadPoolConfig {

    // 線程存活時間
    private static int keepAliveTime = 10;

    // 調用線程運行多餘任務
    RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

    @Bean("customerTaskExecutor")
    public TaskExecutor taskExecutor() {
        // 核心線程數
        int cores = Runtime.getRuntime().availableProcessors()*2;
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(cores);
        executor.setMaxPoolSize(cores);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setRejectedExecutionHandler(handler);
        executor.setThreadNamePrefix("Custom-");  // 線程名前綴
        executor.initialize();
        return executor;
    }
}

其中核心線程數和最大線程數設置成一致,拒絕策略使用調用線程運行多餘的任務,確保每個任務都能執行。然後添加一個異步方法.

public interface AsyncService {

    void executeAsync();
}

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {

    @Override
    @Async("customerTaskExecutor")
    public void executeAsync() {
        log.info("【開始執行任務】");
        // 延遲幾秒
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("【結束執行任務】");

    }
}

使用 sleep 方法延遲,模擬請求,使用壓測工具,發起 100 次請求,控制檯輸出如下:

2023-10-31 18:00:32.792  INFO 53009 --- [       Custom-1] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.811  INFO 53009 --- [       Custom-2] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.813  INFO 53009 --- [       Custom-3] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.814  INFO 53009 --- [       Custom-4] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.816  INFO 53009 --- [       Custom-5] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.817  INFO 53009 --- [       Custom-6] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.819  INFO 53009 --- [       Custom-7] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.820  INFO 53009 --- [       Custom-8] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.821  INFO 53009 --- [       Custom-9] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.823  INFO 53009 --- [      Custom-10] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.824  INFO 53009 --- [      Custom-11] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.825  INFO 53009 --- [      Custom-12] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:33.296  INFO 53009 --- [       Custom-1] com.jeremy.threadpool.AsyncServiceImpl   : 【結束執行任務】
2023-10-31 18:00:33.296  INFO 53009 --- [       Custom-1] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】

採用線程池執行的任務,多個線程同時執行任務,能有效的降低了任務的延遲性。定時任務間隔固定時間從數據庫 Mysql 或者 Redis 獲取需要請求的數據,同時執行請求。

這樣就有幾個問題:

除了定時器,還有什麼組件可以解決上面問題,那就是使用消息中間件了。

方案三:消息中間件 + 線程池(可行)

使用線程池的方式開啓多個線程運行。那針對固定時間間隔和只能同時執行的問題使用消息中間件就能很好的解決問題,消息中間件採用**「生產 + 消費」**模型實現消息的生產和消費,

延遲隊列

本文使用消息中間件 RabbitMQ 實現延遲隊列,具體實現可以看我的另外一篇文章延遲隊列實現訂單超時自動取消, 具體實現流程圖試下如下。

請求發送失敗之後,調用生產者發送消息,經過設定的時間間隔之後,發送給消費者,消費端再次發起請求,如果請求失敗,再調用生產者發送消息,並設置好下一次的時間間隔,其中消費端發起任務使用線程池發起請求。

下載 RabbitMQ 延遲消息的插件 delayed_message_exchange,

在 Github 官網找到對應的版本,我選擇的是 3.8.17:

配置延遲隊列:

@Configuration
public class XDelayedMessageConfig {

 /**
  * 延遲交換機
  */
 public static final String DELAYED_EXCHANGE = "exchange.delayed";

 /**
  * 重試隊列
  */
 public static final String RETRY_QUEUE = "queue.retry";


 /**
  * 重試routing key
  */
 public static final String RETRY_ROUTING_KEY = "routingKey.bind.retry";

    @Bean
 public Queue retryQueue() {
  return new Queue(RETRY_QUEUE,true);
 }

 
 /**
  * 定義延遲交換機
  * 交換機的類型爲 x-delayed-message
  * @return
  */
 @Bean
 public CustomExchange delayedExchange() {
  Map<String,Object> map = new HashMap<>();
  map.put("x-delayed-type","direct");
  return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
 }



 @Bean
 public Binding retryQueueBinding() {
  return BindingBuilder.bind(retryQueue()).to(delayedExchange()).with(RETRY_ROUTING_KEY).noargs();
 }



}

在發送端模擬重試機制,設置時間間隔 5、10、30 秒。

@Autowired
private RabbitTemplate rabbitTemplate;

private final int[] INTERVAL_ARRAY= {5,10,30};

@GetMapping("/retry")
public String retry(int index) {
    if (index >= 0 && index <= 2) {
        send(index +",延遲" + INTERVAL_ARRAY[index] + "s",INTERVAL_ARRAY[index]);
    }
    return "ok";
}


private void send(String message,Integer delayTime) {
    message = message + " " + DateUtil.dateFormat(new Date());
    System.out.println("【發送消息】" + message);
    rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.RETRY_ROUTING_KEY,
            message, message1 -> {
                message1.getMessageProperties().setDelay(delayTime*1000);
                return message1;
            });
}

接收端:

@RabbitListener(queues = XDelayedMessageConfig.RETRY_QUEUE)
public void delayProcess(String msg, Channel channel, Message message) {
    System.out.println("【接收消息】" + msg + " 當前時間" + DateUtil.dateFormat(new Date()));
    try {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (IOException e) {
        e.printStackTrace();
    }
    int index = Integer.parseInt(msg.split(",")[0]);
    retry(++index);
}

控制檯輸出:

【發送消息】0,延遲5s 10:59:29
【接收消息】0,延遲5s 10:59:29 當前時間10:59:33
【發送消息】1,延遲10s 10:59:33
【接收消息】1,延遲10s 10:59:33 當前時間10:59:43
【發送消息】2,延遲30s 10:59:43
【接收消息】2,延遲30s 10:59:43 當前時間11:00:10

其中 0、1、2 表示重試的次數。通過延遲消息的方式,重試發送信息。每個任務作爲一個消息進行消費。和定時服務相比,有以下幾個優點:

總結

在發送一些異步通知時候,需要考慮到通知可能接收失敗的情況,比如:

此時無法正確的及時推送通知,無法保證通知的可靠性。這個時候就需要重試多次,而且間隔要依次增加,因爲服務啓動或者網絡的卡頓在經過一段時間就恢復了。後續重試成功的概率就更高了。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/803kbx-QqKj_H8eWd2GFig