聊聊併發編程的 12 種業務場景
大家好,我是蘇三,又跟大家見面了。
前言
併發編程是一項非常重要的技術,無論在面試,還是工作中出現的頻率非常高。
之前我發表的一篇《聊聊併發編程的 10 個坑》,在全網廣受好評。說明了這類文章還是比較有價值的,接下來,打算繼續聊聊併發編程這個話題。
併發編程說白了就是多線程編程,但多線程一定比單線程效率更高?
答:不一定,要看具體業務場景。
畢竟如果使用了多線程,那麼線程之間的競爭和搶佔 cpu 資源,線程的上下文切換,也是相對來說比較耗時的操作。
下面這幾個問題在面試中,你必定遇到過:
-
你在哪來業務場景中使用過多線程?
-
怎麼用的?
-
踩過哪些坑?
今天聊聊我之前在項目中用併發編程的 12 種業務場景,給有需要的朋友一個參考。
- 簡單定時任務
各位親愛的朋友,你沒看錯,Thread
類真的能做定時任務。如果你看過一些定時任務框架
的源碼,你最後會發現,它們的底層也會使用 Thread 類。
實現這種定時任務的具體代碼如下:
public static void init() {
new Thread(() -> {
while (true) {
try {
System.out.println("下載文件");
Thread.sleep(1000 * 60 * 5);
} catch (Exception e) {
log.error(e);
}
}
}).start();
}
使用 Thread 類可以做最簡單的定時任務,在 run 方法中有個 while 的死循環(當然還有其他方式),執行我們自己的任務。有個需要特別注意的地方是,需要用try...catch
捕獲異常,否則如果出現異常,就直接退出循環,下次將無法繼續執行了。
但這種方式做的定時任務,只能週期性執行,不能支持定時在某個時間點執行。
特別提醒一下,該線程建議定義成守護線程
,可以通過setDaemon
方法設置,讓它在後臺默默執行就好。
使用場景:比如項目中有時需要每隔 5 分鐘去下載某個文件
,或者每隔 10 分鐘去讀取模板文件生成靜態html頁面
等等,一些簡單的週期性任務場景。
使用Thread
類做定時任務的優缺點:
-
優點:這種定時任務非常簡單,學習成本低,容易入手,對於那些簡單的週期性任務,是個不錯的選擇。
-
缺點:不支持指定某個時間點執行任務,不支持延遲執行等操作,功能過於單一,無法應對一些較爲複雜的場景。
- 監聽器
有時候,我們需要寫個監聽器,去監聽某些數據的變化。
比如:我們在使用canal
的時候,需要監聽binlog
的變化,能夠及時把數據庫中的數據,同步到另外一個業務數據庫中。
主要代碼如下:
@Service
public CanalService {
private volatile boolean running = false;
private Thread thread;
@Autowired
private CanalConnector canalConnector;
public void handle() {
//連接canal
while(running) {
//業務處理
}
}
public void start() {
thread = new Thread(this::handle, "name");
running = true;
thread.start();
}
public void stop() {
if(!running) {
return;
}
running = false;
}
}
在 start 方法中開啓了一個線程,在該線程中異步執行 handle 方法的具體任務。然後通過調用 stop 方法,可以停止該線程。
其中,使用volatile
關鍵字控制的 running 變量作爲開關,它可以控制線程中的狀態。
接下來,有個比較關鍵的點是:如何通過配置中心的配置,控制這個開關呢?
以apollo
配置爲例,我們在配置中心的後臺,修改配置之後,自動獲取最新配置的核心代碼如下:
public class CanalConfig {
@Autowired
private CanalService canalService;
@ApolloConfigChangeListener
public void change(ConfigChangeEvent event) {
String value = event.getChange("test.canal.enable").getNewValue();
if(BooleanUtils.toBoolean(value)) {
canalService.start();
} else {
canalService.stop();
}
}
}
通過apollo
的ApolloConfigChangeListener
註解,可以監聽配置參數的變化。
如果test.canal.enable
開關配置的 true,則調用 canalService 類的 start 方法開啓 canal 數據同步功能。如果開關配置的 false,則調用 canalService 類的 stop 方法,自動停止 canal 數據同步功能。
- 收集日誌
在某些高併發的場景中,我們需要收集部分用戶的日誌(比如:用戶登錄的日誌),寫到數據庫中,以便於做分析。
但由於項目中,還沒有引入消息中間件,比如:kafka
、rocketmq
等。
如果直接將日誌同步寫入數據庫,可能會影響接口性能。
所以,大家很自然想到了異步處理。
實現這個需求最簡單的做法是,開啓一個線程,異步寫入數據到數據庫即可。
這樣做,可以是可以。
但如果用戶登錄操作的耗時,比異步寫入數據庫的時間要少得多。這樣導致的結果是:生產日誌的速度,比消費日誌的速度要快得多,最終的性能瓶頸在消費端。
其實,還有更優雅的處理方式,雖說沒有使用消息中間件,但借用了它的思想。
這套記錄登錄日誌的功能,分爲:日誌生產端、日誌存儲端和日誌消費端。
如下圖所示:
先定義了一個阻塞隊列。
@Component
public class LoginLogQueue {
private static final int QUEUE_MAX_SIZE = 1000;
private BlockingQueueblockingQueue queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
//生成消息
public boolean push(LoginLog loginLog) {
return this.queue.add(loginLog);
}
//消費消息
public LoginLog poll() {
LoginLog loginLog = null;
try {
loginLog = this.queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
}
然後定義了一個日誌的生產者。
@Service
public class LoginSerivce {
@Autowired
private LoginLogQueue loginLogQueue;
public int login(UserInfo userInfo) {
//業務處理
LoginLog loginLog = convert(userInfo);
loginLogQueue.push(loginLog);
}
}
接下來,定義了日誌的消費者。
@Service
public class LoginInfoConsumer {
@Autowired
private LoginLogQueue queue;
@PostConstruct
public voit init {
new Thread(() -> {
while (true) {
LoginLog loginLog = queue.take();
//寫入數據庫
}
}).start();
}
}
當然,這個例子中使用單線程接收登錄日誌,爲了提升性能,也可以使用線程池來處理業務邏輯(比如:寫入數據庫)等。
4.excel 導入
我們可能會經常收到運營同學提過來的 excel 數據導入需求,比如:將某一大類下的所有子類一次性導入系統,或者導入一批新的供應商數據等等。
我們以導入供應商數據爲例,它所涉及的業務流程很長,比如:
-
調用天眼查接口校驗企業名稱和統一社會信用代碼。
-
寫入供應商基本表
-
寫入組織表
-
給供應商自動創建一個用戶
-
給該用戶分配權限
-
自定義域名
-
發站內通知
等等。
如果在程序中,解析完 excel,讀取了所有數據之後。用單線程一條條處理業務邏輯,可能耗時會非常長。
爲了提升 excel 數據導入效率,非常有必要使用多線程來處理。
當然在 java 中實現多線程的手段有很多種,下面重點聊聊 java8 中最簡單的實現方式:parallelStream
。
僞代碼如下:
supplierList.parallelStream().forEach(x -> importSupplier(x));
parallelStream
是一個並行執行的流,它默認通過ForkJoinPool
實現的,能提高你的多線程任務的速度。
ForkJoinPool
處理的過程會分而治之,它的核心思想是:將一個大任務切分成多個小任務
。每個小任務都能單獨執行,最後它會把所用任務的執行結果進行彙總。
下面用一張圖簡單介紹一下 ForkJoinPool 的原理:
當然除了 excel 導入之外,還有類似的讀取文本文件,也可以用類似的方法處理。
溫馨的提醒一下,如果一次性導入的數據非常多,用多線程處理,可能會使系統的 cpu 使用率飆升,需要特別關注。
- 查詢接口
很多時候,我們需要在某個查詢接口中,調用其他服務的接口,組合數據之後,一起返回。
比如有這樣的業務場景:
在用戶信息查詢接口中需要返回:用戶名稱、性別、等級、頭像、積分、成長值等信息。
而用戶名稱、性別、等級、頭像在用戶服務中,積分在積分服務中,成長值在成長值服務中。爲了彙總這些數據統一返回,需要另外提供一個對外接口服務。
於是,用戶信息查詢接口需要調用用戶查詢接口、積分查詢接口 和 成長值查詢接口,然後彙總數據統一返回。
調用過程如下圖所示:
調用遠程接口總耗時 530ms = 200ms + 150ms + 180ms
顯然這種串行調用遠程接口性能是非常不好的,調用遠程接口總的耗時爲所有的遠程接口耗時之和。
那麼如何優化遠程接口性能呢?
既然串行調用多個遠程接口性能很差,爲什麼不改成並行呢?
如下圖所示:
調用遠程接口總耗時 200ms = 200ms(即耗時最長的那次遠程接口調用)
在 java8 之前可以通過實現Callable
接口,獲取線程返回結果。
java8 以後通過CompleteFuture
類實現該功能。我們這裏以 CompleteFuture 爲例:
public UserInfo getUserInfo(Long id) throws InterruptedException, ExecutionException {
final UserInfo userInfo = new UserInfo();
CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {
getRemoteUserAndFill(id, userInfo);
return Boolean.TRUE;
}, executor);
CompletableFuture bonusFuture = CompletableFuture.supplyAsync(() -> {
getRemoteBonusAndFill(id, userInfo);
return Boolean.TRUE;
}, executor);
CompletableFuture growthFuture = CompletableFuture.supplyAsync(() -> {
getRemoteGrowthAndFill(id, userInfo);
return Boolean.TRUE;
}, executor);
CompletableFuture.allOf(userFuture, bonusFuture, growthFuture).join();
userFuture.get();
bonusFuture.get();
growthFuture.get();
return userInfo;
}
溫馨提醒一下,這兩種方式別忘了使用線程池
。示例中我用到了executor
,表示自定義的線程池,爲了防止高併發場景下,出現線程過多的問題。
- 獲取用戶上下文
不知道你在項目開發時,有沒有遇到過這樣的需求:用戶登錄之後,在所有的請求接口中,通過某個公共方法,就能獲取到當前登錄用戶的信息?
獲取的用戶上下文,我們以CurrentUser
爲例。
CurrentUser
內部包含了一個ThreadLocal
對象,它負責保存當前線程的用戶上下文信息。當然爲了保證在線程池中,也能從用戶上下文中獲取到正確的用戶信息,這裏用了阿里的TransmittableThreadLocal
。僞代碼如下:
@Data
public class CurrentUser {
private static final TransmittableThreadLocal<CurrentUser> THREA_LOCAL = new TransmittableThreadLocal<>();
private String id;
private String userName;
private String password;
private String phone;
...
public statis void set(CurrentUser user) {
THREA_LOCAL.set(user);
}
public static void getCurrent() {
return THREA_LOCAL.get();
}
}
這裏爲什麼用了阿里的 TransmittableThreadLocal,而不是普通的 ThreadLocal 呢?在線程池中,由於線程會被多次複用,導致從普通的 ThreadLocal 中無法獲取正確的用戶信息。父線程中的參數,沒法傳遞給子線程,而 TransmittableThreadLocal 很好解決了這個問題。
然後在項目中定義一個全局的 spring mvc 攔截器,專門設置用戶上下文到 ThreadLocal 中。僞代碼如下:
public class UserInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
CurrentUser user = getUser(request);
if(Objects.nonNull(user)) {
CurrentUser.set(user);
}
}
}
用戶在請求我們接口時,會先觸發該攔截器,它會根據用戶 cookie 中的 token,調用調用接口獲取 redis 中的用戶信息。如果能獲取到,說明用戶已經登錄,則把用戶信息設置到 CurrentUser 類的 ThreadLocal 中。
接下來,在 api 服務的下層,即 business 層的方法中,就能輕鬆通過 CurrentUser.getCurrent(); 方法獲取到想要的用戶上下文信息了。
這套用戶體系的想法是很 good 的,但深入使用後,發現了一個小插曲:
api 服務和 mq 消費者服務都引用了 business 層,business 層中的方法兩個服務都能直接調用。
我們都知道在 api 服務中用戶是需要登錄的,而 mq 消費者服務則不需要登錄。
如果 business 中的某個方法剛開始是給 api 開發的,在方法深處使用了 CurrentUser.getCurrent(); 獲取用戶上下文。但後來,某位新來的帥哥在 mq 消費者中也調用了那個方法,並未發覺這個小機關,就會中招,出現找不到用戶上下文的問題。
所以我當時的第一個想法是:代碼沒做兼容處理,因爲之前這類問題偶爾會發生一次。
想要解決這個問題,其實也很簡單。只需先判斷一下能否從 CurrentUser 中獲取用戶信息,如果不能,則取配置的系統用戶信息。僞代碼如下:
@Autowired
private BusinessConfig businessConfig;
CurrentUser user = CurrentUser.getCurrent();
if(Objects.nonNull(user)) {
entity.setUserId(user.getUserId());
entity.setUserName(user.getUserName());
} else {
entity.setUserId(businessConfig.getDefaultUserId());
entity.setUserName(businessConfig.getDefaultUserName());
}
這種簡單無公害的代碼,如果只是在一兩個地方加還 OK。
此外,衆所周知,SimpleDateFormat
在 java8 以前,是用來處理時間的工具類,它是非線程安全的。也就是說,用該方法解析日期會有線程安全問題。
爲了避免線程安全問題的出現,我們可以把 SimpleDateFormat 對象定義成局部變量
。但如果你一定要把它定義成靜態變量,可以使用 ThreadLocal 保存日期,也能解決線程安全問題。
- 傳遞參數
之前見過有些同事寫代碼時,一個非常有趣的用法,即:使用MDC
傳遞參數。
MDC 是什麼?
MDC
是org.slf4j
包下的一個類,它的全稱是Mapped Diagnostic Context
,我們可以認爲它是一個線程安全的存放診斷日誌的容器。
MDC
的底層是用了ThreadLocal
來保存數據的。
例如現在有這樣一種場景:我們使用RestTemplate
調用遠程接口時,有時需要在header
中傳遞信息,比如:traceId,source 等,便於在查詢日誌時能夠串聯一次完整的請求鏈路,快速定位問題。
這種業務場景就能通過ClientHttpRequestInterceptor
接口實現,具體做法如下:
第一步,定義一個 LogFilter 攔截所有接口請求,在 MDC 中設置 traceId:
public class LogFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
MdcUtil.add(UUID.randomUUID().toString());
System.out.println("記錄請求日誌");
chain.doFilter(request, response);
System.out.println("記錄響應日誌");
}
@Override
public void destroy() {
}
}
第二步,實現ClientHttpRequestInterceptor
接口,MDC 中獲取當前請求的 traceId,然後設置到 header 中:
public class RestTemplateInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
request.getHeaders().set("traceId", MdcUtil.get());
return execution.execute(request, body);
}
}
第三步,定義配置類,配置上面定義的 RestTemplateInterceptor 類:
@Configuration
public class RestTemplateConfiguration {
@Bean
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
restTemplate.setInterceptors(Collections.singletonList(restTemplateInterceptor()));
return restTemplate;
}
@Bean
public RestTemplateInterceptor restTemplateInterceptor() {
return new RestTemplateInterceptor();
}
}
其中 MdcUtil 其實是利用 MDC 工具在ThreadLocal
中存儲和獲取 traceId
public class MdcUtil {
private static final String TRACE_ID = "TRACE_ID";
public static String get() {
return MDC.get(TRACE_ID);
}
public static void add(String value) {
MDC.put(TRACE_ID, value);
}
}
當然,這個例子中沒有演示 MdcUtil 類的 add 方法具體調的地方,我們可以在filter
中執行接口方法之前,生成 traceId,調用 MdcUtil 類的 add 方法添加到 MDC 中,然後在同一個請求的其他地方就能通過 MdcUtil 類的 get 方法獲取到該 traceId。
能使用 MDC 保存 traceId 等參數的根本原因是,用戶請求到應用服務器,Tomcat
會從線程池中分配一個線程去處理該請求。
那麼該請求的整個過程中,保存到MDC
的ThreadLocal
中的參數,也是該線程獨享的,所以不會有線程安全問題。
- 模擬高併發
有時候我們寫的接口,在低併發的場景下,一點問題都沒有。
但如果一旦出現高併發調用,該接口可能會出現一些意想不到的問題。
爲了防止類似的事情發生,一般在項目上線前,我們非常有必要對接口做一下壓力測試
。
當然,現在已經有比較成熟的壓力測試工具,比如:Jmeter
、LoadRunner
等。
如果你覺得下載壓測工具比較麻煩,也可以手寫一個簡單的模擬併發操作的工具,用CountDownLatch
就能實現,例如:
public static void concurrenceTest() {
/**
* 模擬高併發情況代碼
*/
final AtomicInteger atomicInteger = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(1000); // 相當於計數器,當所有都準備好了,再一起執行,模仿多併發,保證併發量
final CountDownLatch countDownLatch2 = new CountDownLatch(1000); // 保證所有線程執行完了再打印atomicInteger的值
ExecutorService executorService = Executors.newFixedThreadPool(10);
try {
for (int i = 0; i < 1000; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
countDownLatch.await(); //一直阻塞當前線程,直到計時器的值爲0,保證同時併發
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
//每個線程增加1000次,每次加1
for (int j = 0; j < 1000; j++) {
atomicInteger.incrementAndGet();
}
countDownLatch2.countDown();
}
});
countDownLatch.countDown();
}
countDownLatch2.await();// 保證所有線程執行完
executorService.shutdown();
} catch (Exception e){
log.error(e.getMessage(),e);
}
}
- 處理 mq 消息
在高併發的場景中,消息積壓問題,可以說如影隨形,真的沒辦法從根本上解決。表面上看,已經解決了,但後面不知道什麼時候,就會冒出一次,比如這次:
有天下午,產品過來說:有幾個商戶投訴過來了,他們說菜品有延遲,快查一下原因。
這次問題出現得有點奇怪。
爲什麼這麼說?
首先這個時間點就有點奇怪,平常出問題,不都是中午或者晚上用餐高峯期嗎?怎麼這次問題出現在下午?
根據以往積累的經驗,我直接看了kafka
的topic
的數據,果然上面消息有積壓,但這次每個partition
都積壓了十幾萬的消息沒有消費,比以往加壓的消息數量增加了幾百倍。這次消息積壓得極不尋常。
我趕緊查服務監控看看消費者掛了沒,還好沒掛。又查服務日誌沒有發現異常。這時我有點迷茫,碰運氣問了問訂單組下午發生了什麼事情沒?他們說下午有個促銷活動,跑了一個JOB
批量更新過有些商戶的訂單信息。
這時,我一下子如夢初醒,是他們在 JOB 中批量發消息導致的問題。怎麼沒有通知我們呢?實在太坑了。
雖說知道問題的原因了,倒是眼前積壓的這十幾萬的消息該如何處理呢?
此時,如果直接調大partition
數量是不行的,歷史消息已經存儲到4
個固定的 partition,只有新增的消息纔會到新的 partition。我們重點需要處理的是已有的 partition。
直接加服務節點也不行,因爲kafka
允許同組的多個partition
被一個consumer
消費,但不允許一個 partition 被同組的多個 consumer 消費,可能會造成資源浪費。
看來只有用多線程
處理了。
爲了緊急解決問題,我改成了用線程池處理消息,核心線程和最大線程數都配置成了50
。
大致用法如下:
- 先定義一個線程池:
@Configuration
public class ThreadPoolConfig {
@Value("${thread.pool.corePoolSize:5}")
private int corePoolSize;
@Value("${thread.pool.maxPoolSize:10}")
private int maxPoolSize;
@Value("${thread.pool.queueCapacity:200}")
private int queueCapacity;
@Value("${thread.pool.keepAliveSeconds:30}")
private int keepAliveSeconds;
@Value("${thread.pool.threadNamePrefix:ASYNC_}")
private String threadNamePrefix;
@Bean("messageExecutor")
public Executor messageExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setThreadNamePrefix(threadNamePrefix);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
- 再定義一個消息的 consumer:
@Service
public class MyConsumerService {
@Autowired
private Executor messageExecutor;
@KafkaListener(id="test",topics={"topic-test"})
public void listen(String message){
System.out.println("收到消息:" + message);
messageExecutor.submit(new MyWork(message);
}
}
- 在定義的 Runable 實現類中處理業務邏輯:
public class MyWork implements Runnable {
private String message;
public MyWork(String message) {
this.message = message;
}
@Override
public void run() {
System.out.println(message);
}
}
果然,調整之後消息積壓數量確實下降的非常快,大約半小時後,積壓的消息就非常順利的處理完了。
但此時有個更嚴重的問題出現:我收到了報警郵件,有兩個訂單系統的節點 down 機了。。。
更詳細內容,請看看我的另一篇文章《我用 kafka 兩年踩過的一些非比尋常的坑》
- 統計數量
在多線程的場景中,有時候需要統計數量,比如:用多線程導入供應商數據時,統計導入成功的供應商數有多少。
如果這時候用 count++ 統計次數,最終的結果可能會不準。因爲 count++ 並非原子操作,如果多個線程同時執行該操作,則統計的次數,可能會出現異常。
爲了解決這個問題,就需要使用concurent
的atomic
包下面的類,比如:AtomicInteger
、AtomicLong
等。
@Servcie
public class ImportSupplierService {
private static AtomicInteger count = new AtomicInteger(0);
public int importSupplier(List<SupplierInfo> supplierList) {
if(CollectionUtils.isEmpty(supplierList)) {
return 0;
}
supplierList.parallelStream().forEach(x -> {
try {
importSupplier(x);
count.addAndGet(1);
} catch(Exception e) {
log.error(e.getMessage(),e);
}
);
return count.get();
}
}
AtomicInteger
的底層說白了使用自旋鎖
+CAS
。
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
自旋鎖
說白了就是一個死循環
。
而CAS
是比較
和交換
的意思。
它的實現邏輯是:將內存位置處的舊值
與預期值
進行比較,若相等,則將內存位置處的值替換爲新值
。若不相等,則不做任何操作。
- 延遲定時任務
我們經常有延遲處理數據的需求,比如:如果用戶下單後,超過 30 分鐘還未完成支付,則系統自動將該訂單取消。
這裏需求就可以使用延遲定時任務
實現。
ScheduledExecutorService
是JDK1.5+
版本引進的定時任務,該類位於java.util.concurrent
併發包下。
ScheduledExecutorService 是基於多線程的,設計的初衷是爲了解決Timer
單線程執行,多個任務之間會互相影響的問題。
它主要包含 4 個方法:
-
schedule(Runnable command,long delay,TimeUnit unit),帶延遲時間的調度,只執行一次,調度之後可通過 Future.get() 阻塞直至任務執行完畢。
-
schedule(Callablecallable,long delay,TimeUnit unit),帶延遲時間的調度,只執行一次,調度之後可通過 Future.get() 阻塞直至任務執行完畢,並且可以獲取執行結果。
-
scheduleAtFixedRate,表示以固定頻率執行的任務,如果當前任務耗時較多,超過定時週期 period,則當前任務結束後會立即執行。
-
scheduleWithFixedDelay,表示以固定延時執行任務,延時是相對當前任務結束爲起點計算開始時間。
實現這種定時任務的具體代碼如下:
public class ScheduleExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println("doSomething");
},1000,1000, TimeUnit.MILLISECONDS);
}
}
調用ScheduledExecutorService
類的scheduleAtFixedRate
方法實現週期性任務,每隔 1 秒鐘執行一次,每次延遲 1 秒再執行。
這種定時任務是阿里巴巴開發者規範中用來替代Timer
類的方案,對於多線程執行週期性任務,是個不錯的選擇。
使用ScheduledExecutorService
類做延遲定時任務的優缺點:
-
優點:基於多線程的定時任務,多個任務之間不會相關影響,支持週期性的執行任務,並且帶延遲功能。
-
缺點:不支持一些較複雜的定時規則。
當然,你也可以使用分佈式定時任務,比如:xxl-job 或者 elastic-job 等等。
其實,在實際工作中我使用多線程的場景遠遠不只這 12 種,在這裏只是拋磚引玉,介紹了一些我認爲比較常見的業務場景。
此外,如果你對併發編程中的一些坑,比較感興趣的話,可以看看我的另一個文章《聊聊併發編程的 10 個坑》,裏面寫的非常詳細。
蘇三說技術 作者就職於知名互聯網公司,掘金月度優秀作者,從事開發、架構和部分管理工作。實戰經驗豐富,對 jdk、spring、springboot、springcloud、mybatis 等開源框架源碼有一定研究,歡迎關注,和我一起交流。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/BE9lti3pJ1H8HveUUXCFgQ