如何實現異步通知的重試機制
工作中經常要和第三方做對接,比如支付、電子合同等系統。操作成功之後,第三方會發送異步的通知,返回最終的處理結果,使用異步而不是使用同步通知,是爲了加快系統響應速度,防止線程阻塞。任務處理完成後通過異步的通知,發送給對應的服務端。之前對接微信支付,完成支付後,微信發送一個異步通知給服務端,服務端根據支付通知修改狀態,通知規則看到以下的一段話。
其中有段話:
重新發送通知,直到成功爲止(在通知一直不成功的情況下,微信總共會發起多次通知,通知頻率爲 15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h - 總計 24h4m)
微信爲何要這麼設計
微信結果通知本質就是發送一個網絡請求到不同的服務器上,既然是一個網絡請求,就可能因爲各種原因導致請求超時或者失敗,比如:
-
請求的服務器掛了
-
網絡發生了波動
-
服務器響應異常
以上原因都會導致支付結果通知接收失敗,也就無法通知給用戶。爲了解決上述的問題,就需要引入**「重試機制」**,當請求無法應答時,就需要重試幾次,保證請求能確認發送。
異步通知的重試機制
從微信支付通知可以引申到所有的異步通知,或者和第三方對接時。如果要確保通知能被成功的接收,就需要考慮請求失敗的情況,大部分都是需要使用重試機制。而重試機制是隔段時間不是固定的,是越來越大的,這是考慮到重試時,由於網絡故障或者服務器故障重啓設備需要花一段時間,而**「間隔時間越來越長就可以更大的保證請求可以被成功接收」**。
重複請求,接口需要考慮重複請求的情況,要設計成一個冪等性接口,多次請求和請求一次的效果是一致的。
重試機制的實現
重試機制就是一個定時器,隔一段時間執行一次,沒有預期的效果就再重複執行一次。
實現的難點就在於,間隔的時間是不一致的,如果時間的間隔是固定的話,就可以使用定時任務。
方案一:定時任務(不可行)
使用定時器,每隔一段時間執行一次任務。在 SpringBoot 啓動類添加 @EnableScheduling 註解,然後在執行的方法添加 @Scheduled 註解。
@Scheduled(fixedDelay = 1000*2)
public void test2() {
Date date = new Date();
System.out.println("tesk2 " + date);
}
以上表示每隔 2 秒執行一次。間隔時間都是固定的,這個不符合預期,因爲要求的時間間隔是依次增加的。
如果是間隔時間是固定的,那定時任務就符合條件嗎?
如果是隻有一條任務在執行,執行不成功,存放在 Redis 中,然後定時執行任務,如果任務執行成功,就去掉任務。但是定時器還是會定時執行。
如果執行的任務很多的話,前面的任務要等待後續的任務執行,那延遲就很嚴重了,就需要使用到多線程,開啓多個線程,在《阿里 Java 開發手冊》有一條:
線程資源必須通過線程池提供,不允許在應用中自行顯式創建線程。
定時任務有以下幾個缺點不滿足:
-
時間間隔固定。
-
只能單線程處理任務,任務越多,延遲性越久。
方案二:線程池 + 定時任務 (不可行)
既然使用單線程會產生延遲,就使用線程池來降低延遲,因爲發起請求屬於 IO 密集型,所以線程數設置成 CPU 個數的兩倍,在 SpringBoot 自定義一個線程池:
@Configuration
public class ThreadPoolConfig {
// 線程存活時間
private static int keepAliveTime = 10;
// 調用線程運行多餘任務
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
@Bean("customerTaskExecutor")
public TaskExecutor taskExecutor() {
// 核心線程數
int cores = Runtime.getRuntime().availableProcessors()*2;
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(cores);
executor.setMaxPoolSize(cores);
executor.setKeepAliveSeconds(keepAliveTime);
executor.setRejectedExecutionHandler(handler);
executor.setThreadNamePrefix("Custom-"); // 線程名前綴
executor.initialize();
return executor;
}
}
其中核心線程數和最大線程數設置成一致,拒絕策略使用調用線程運行多餘的任務,確保每個任務都能執行。然後添加一個異步方法.
public interface AsyncService {
void executeAsync();
}
@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
@Override
@Async("customerTaskExecutor")
public void executeAsync() {
log.info("【開始執行任務】");
// 延遲幾秒
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("【結束執行任務】");
}
}
使用 sleep 方法延遲,模擬請求,使用壓測工具,發起 100 次請求,控制檯輸出如下:
2023-10-31 18:00:32.792 INFO 53009 --- [ Custom-1] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.811 INFO 53009 --- [ Custom-2] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.813 INFO 53009 --- [ Custom-3] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.814 INFO 53009 --- [ Custom-4] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.816 INFO 53009 --- [ Custom-5] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.817 INFO 53009 --- [ Custom-6] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.819 INFO 53009 --- [ Custom-7] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.820 INFO 53009 --- [ Custom-8] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.821 INFO 53009 --- [ Custom-9] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.823 INFO 53009 --- [ Custom-10] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.824 INFO 53009 --- [ Custom-11] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.825 INFO 53009 --- [ Custom-12] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:33.296 INFO 53009 --- [ Custom-1] com.jeremy.threadpool.AsyncServiceImpl : 【結束執行任務】
2023-10-31 18:00:33.296 INFO 53009 --- [ Custom-1] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
採用線程池執行的任務,多個線程同時執行任務,能有效的降低了任務的延遲性。定時任務間隔固定時間從數據庫 Mysql 或者 Redis 獲取需要請求的數據,同時執行請求。
這樣就有幾個問題:
-
間隔是固定的。
-
空閒的時候沒有請求執行,到了執行時間,大量的請求在執行,導致**「閒的時候閒死,忙的時候忙死。」**資源得不到很好的利用。
除了定時器,還有什麼組件可以解決上面問題,那就是使用消息中間件了。
方案三:消息中間件 + 線程池(可行)
使用線程池的方式開啓多個線程運行。那針對固定時間間隔和只能同時執行的問題使用消息中間件就能很好的解決問題,消息中間件採用**「生產 + 消費」**模型實現消息的生產和消費,
延遲隊列
本文使用消息中間件 RabbitMQ 實現延遲隊列,具體實現可以看我的另外一篇文章延遲隊列實現訂單超時自動取消, 具體實現流程圖試下如下。
請求發送失敗之後,調用生產者發送消息,經過設定的時間間隔之後,發送給消費者,消費端再次發起請求,如果請求失敗,再調用生產者發送消息,並設置好下一次的時間間隔,其中消費端發起任務使用線程池發起請求。
下載 RabbitMQ 延遲消息的插件 delayed_message_exchange,
在 Github 官網找到對應的版本,我選擇的是 3.8.17:
配置延遲隊列:
@Configuration
public class XDelayedMessageConfig {
/**
* 延遲交換機
*/
public static final String DELAYED_EXCHANGE = "exchange.delayed";
/**
* 重試隊列
*/
public static final String RETRY_QUEUE = "queue.retry";
/**
* 重試routing key
*/
public static final String RETRY_ROUTING_KEY = "routingKey.bind.retry";
@Bean
public Queue retryQueue() {
return new Queue(RETRY_QUEUE,true);
}
/**
* 定義延遲交換機
* 交換機的類型爲 x-delayed-message
* @return
*/
@Bean
public CustomExchange delayedExchange() {
Map<String,Object> map = new HashMap<>();
map.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
}
@Bean
public Binding retryQueueBinding() {
return BindingBuilder.bind(retryQueue()).to(delayedExchange()).with(RETRY_ROUTING_KEY).noargs();
}
}
在發送端模擬重試機制,設置時間間隔 5、10、30 秒。
@Autowired
private RabbitTemplate rabbitTemplate;
private final int[] INTERVAL_ARRAY= {5,10,30};
@GetMapping("/retry")
public String retry(int index) {
if (index >= 0 && index <= 2) {
send(index +",延遲" + INTERVAL_ARRAY[index] + "s",INTERVAL_ARRAY[index]);
}
return "ok";
}
private void send(String message,Integer delayTime) {
message = message + " " + DateUtil.dateFormat(new Date());
System.out.println("【發送消息】" + message);
rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.RETRY_ROUTING_KEY,
message, message1 -> {
message1.getMessageProperties().setDelay(delayTime*1000);
return message1;
});
}
接收端:
@RabbitListener(queues = XDelayedMessageConfig.RETRY_QUEUE)
public void delayProcess(String msg, Channel channel, Message message) {
System.out.println("【接收消息】" + msg + " 當前時間" + DateUtil.dateFormat(new Date()));
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
int index = Integer.parseInt(msg.split(",")[0]);
retry(++index);
}
控制檯輸出:
【發送消息】0,延遲5s 10:59:29
【接收消息】0,延遲5s 10:59:29 當前時間10:59:33
【發送消息】1,延遲10s 10:59:33
【接收消息】1,延遲10s 10:59:33 當前時間10:59:43
【發送消息】2,延遲30s 10:59:43
【接收消息】2,延遲30s 10:59:43 當前時間11:00:10
其中 0、1、2 表示重試的次數。通過延遲消息的方式,重試發送信息。每個任務作爲一個消息進行消費。和定時服務相比,有以下幾個優點:
-
支持動態間隔
-
任務不是同時執行,降低服務器的壓力。
總結
在發送一些異步通知時候,需要考慮到通知可能接收失敗的情況,比如:
-
請求的服務器掛了。
-
網絡發生了波動。
-
服務器響應異常,服務重啓。
此時無法正確的及時推送通知,無法保證通知的可靠性。這個時候就需要重試多次,而且間隔要依次增加,因爲服務啓動或者網絡的卡頓在經過一段時間就恢復了。後續重試成功的概率就更高了。
-
定時重試
-
定時重試首先不符合變化的間隔時間,間隔的時間是固定的,重試的任務都堆積在一起請求,這樣也會給服務器造成很大的壓力。而空閒的時候,服務器的利用率有比較低。
-
同時請求,只能一個一個同步執行任務,同時執行的任務越多,延遲就越嚴重。
-
定時任務 + 線程池
-
爲了解決同時處理任務,添加了自定義的線程池,因爲請求屬於 IO 密集型,所以設置線程數爲 CPU 核數的兩倍。
-
多個任務執行,降低了延遲性。
-
無法滿足動態間隔時間的問題,而且同時請求服務器壓力大。
-
延遲隊列 + 線程池
-
延遲時間請求可以使用到延遲隊列,每個任務都作爲一個消息。每次處理不成功,就發送消息到延遲隊列中,到達時間間隔之後,再消費消息。如果請求失敗再重複以上操作。
-
消費者處理消息,使用線程池處理,加快處理速度。也可以開啓多臺服務器分發處理任務,加快處理速度,降低任務的延遲性。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/803kbx-QqKj_H8eWd2GFig