技術專家帶你徹底掌握線程池
1. 導讀
線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然後在創建線程後自動啓動這些任務。線程池線程都是後臺線程。說到線程池,幾乎是項目中必備的、面試中必問的,但是很多人實際並沒有徹底掌握這項技能。如生產如何設置核心線程與最大線程配比、線程池的拒絕策略取捨等等。
本文包含以下內容:
-
什麼是線程池?
-
線程池相關類講解
-
JDK 定義的四類線程池
-
線程池的 7 大參數詳解
-
Spring/Spring Boot 使用線程池
-
根據設備 CPU 動態配置線程池
-
常見面試題精講
2. 什麼是線程池?
2.1 基本概念
線程池,顧名思義,就是存放預先創建好的線程的池子,需要使用的時候直接從池子裏拿即可。池化技術,可以類比數據庫連接池,存放預先創建好的數據庫連接的池子。
2.2 線程池優點
我們主張項目中,用線程池代替自己創建的線程,那麼爲什麼這樣建議呢?下面就來說一說,線程池的優點,爲什麼選擇使用線程池。
合理分配
設想一下這樣的情景,項目中使用到線程的地方,都是 new Thread 的方式,也就是說每次執行方法時都創建線程。那麼當大量請求湧入,方法被瘋狂調用,那麼線程是不是也在瘋狂地遞增,這樣,用不了多久服務器 CPU 就會被擠爆,而從導致宕機、癱瘓等問題。而這,顯然不是我們願意看到的,線程池的出現,很好的解決了這個問題。
線程池可以指定核心線程數和最大線程數,以及任務隊列,限制了線程不能被無限創建,集中由線程池進行分配,避免了可能由線程引發的資源耗盡問題。
線程預熱
項目啓動,線程池就會預先創建一部分線程以供使用。需要使用時,直接使用即可,減少了創建線程所需要的時間。
資源複用
線程的創建到銷燬是比較消耗 CPU 資源的,使用線程池,線程可以重複使用,提高了資源利用率。
2.3 進程和線程
本來想省略此節,但是由於面試中經常會提問,我們還是拿出來說一說。
-
進程:一個正在執行的計算機程序就是一個進程
-
線程:CPU 調度的最小單位,一個進程由一個或多個線程組成
2.4 線程的狀態
此小節爲高頻面試點,最好做到倒背如流。線程擁有生命週期,生命週期的各個階段就是線程的狀態。
線程有以下狀態:
-
新建
-
就緒
-
阻塞
-
等待(等待 / 等待超時)
-
終止
線程狀態源碼
源碼位置:java.lang.Thread
public enum State {
/**
* 線程被創建但還未啓動
*/
NEW,
/**
* 線程爲就緒(可運行)狀態,在 jvm 中執行,但是可能需要等待其他操作系統資源執行
*/
RUNNABLE,
/**
* 線程被監控器鎖阻塞
*/
BLOCKED,
/**
* 線程處於等待狀態,需要被喚醒才能繼續執行
*/
WAITING,
/**
* 等待超時,正在等待的線程超過了指定的等待時間。
*/
TIMED_WAITING,
/**
* 線程終止,線程執行完成
*/
TERMINATED;
}
2.5 併發和並行
記得有一次面試問到過這個問題,在這裏也給大家分享一下,併發和並行。
-
併發:多個線程訪問同一個資源
-
並行:同一時間執行多個任務
2.6 創建線程的幾種方式
-
new Thread 類
-
實現 Runnable 接口(無返回值)
-
實現 Callable 接口(有返回值)
-
使用線程池
3. 線程池相關類講解
3.1 簡單但有設計的 Executor 接口
線程池頂層接口是 Executor,它提供了一個 execute 執行方法。Executor 頂層接口的設計,用戶只需要提供實現 Runnable 接口的實現類即可,不需要關心線程的創建和具體的執行。任務提交與創建和執行進行了解耦。
public interface Executor {
void execute(Runnable command);
}
3.2 進一步增強的 ExecutorService 接口
ExecutorService 在 Executor 的基礎上,增加了一些能力:
-
停止和關閉任務線程
-
批量執行或指定執行用戶提交的任務
-
提交一個用戶執行的 Runnable 的任務
常用方法
// 提交 Runnable 任務
submit(Callable<T> task);
submit(Runnable task);
submit(Runnable task, T result);
// 請求關閉、發生超時或者當前線程中斷,無論哪一個首先發生之後,都將導致阻塞,直到所有任務完成執行
awaitTermination(long timeout, TimeUnit unit);
// 啓動一次順序關閉,執行以前提交的任務,但不接受新任務
shutdown();
// 試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表
shutdownNow();
// 批量執行給定的任務
invokeAll(Collection<? extends Callable<T>> tasks);
// 執行單個指定的任務
invokeAny(Collection<? extends Callable<T>> tasks);
3.3 AbstractExecutorService 抽象類
AbstractExecutorService 抽象類比較簡單,其大部分方法都繼承於 ExecutorService,在此基礎上增加了兩個 protected 方法,供子類重寫。
// 爲給定可運行任務和默認值返回一個 RunnableFuture。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value);
// 爲給定可運行任務返回一個 RunnableFuture。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
從 3.1~3.3 都是一些接口和抽象類的設計,並沒有具體實現,可見設計在前、實現在後的重要性。
3.4 主角 ThreadPoolExecutor 類
從圖 1 中,我們看到了 ThreadPoolExecutor 的繼承關係圖,即 ThreadPoolExecutor 實現了以上所有的接口和抽象類所具備的能力。我們平時說的 Java 線程池的真身,其實就是 ThreadPoolExecutor。
3.4.1 ThreadPoolExecutor 的運行原理圖
線程池在內部實際上構建了一個生產者消費者模型,將線程和任務兩者解耦,並不直接關聯,從而良好的緩衝任務,複用線程。線程池的運行主要分成兩部分:任務管理、線程管理。
3.4.2 任務提交執行流程
用戶提交任務,ThreadPoolExecutor 進行任務分配,分以下四種種情況:
-
存在空閒核心線程,線程分配核心線程直接執行
-
無空閒核心線程,阻塞隊列未滿,則緩衝執行。此時如果核心線程有空閒了,線程分配核心線程從阻塞隊列中獲取任務執行
-
無空閒核心線程,阻塞隊列隊滿,則線程分配新的線程執行任務。新線程的數量上限即最大線程數
-
無空閒核心線程,阻塞隊列隊滿,已到達最大線程數,則會執行飽和策略(任務拒絕)
看圖更好理解:
3.4.3 線程池生命週期
線程的生命週期和線程池的生命週期是有區別的,ThreadPoolExecutor 的運行狀態有 5 種,分別爲:
狀態的轉換流程圖如下:
線程池的狀態,不是用戶顯示設置的,而是由線程池內部來維護。線程池內部使用一個變量維護兩個值:運行狀態(runState)和線程數量(workerCount)。在具體實現中,線程池將運行狀態(runState)、線程數量(workerCount)兩個關鍵參數的維護放在了一起。
代碼如下:
// 原子整形, 底層採用 CAS 原理控制併發
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 變量是用於控制線程池狀態和有效線程數量的一個字段,它包含兩部分信息:線程池的運行狀態(runState)和線程池內有效線程的數量(workerCount),高 3 位保存 runState,低 29 位保存 workerCount,兩個變量之間互不干擾。用一個變量存儲兩個值的設計,可以避免在做出相關決策時出現不一致的情況,不必爲了維護兩者的一致,而佔用鎖資源。
ctl 變量的相關計算是使用位運算來完成的,相比於基礎運算,位運算速度較快。
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 計算當前運行狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 計算當前線程數量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 通過狀態和線程數生成 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
3.4.4 添加線程源碼講解
private boolean addWorker(Runnable firstTask, boolean core) {
//相當於 goto
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池的狀態到了 SHUTDOWN 或者之上的狀態時候,只有一種情況還需要繼續添加線程,
// 那就是線程池已經 SHUTDOWN,但是隊列中還有任務在排隊,而且不接受新任務(firstTask 爲 null)
// 這裏還繼續添加線程的原因是加快執行等待隊列中的任務,儘快讓線程池關閉
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 傳入的 core 的參數,唯一用到的地方,如果線程數超過理論最大容量,如果 core 是 true 跟最大核心線程數比較,否則跟最大線程數比較
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 通過 CAS 自旋,增加線程數+1,增加成功跳出雙層循環,繼續往下執行
if (compareAndIncrementWorkerCount(c))
break retry;
// 檢測當前線程狀態如果發生了變化,則繼續回到 retry,重新開始循環
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 走到這裏,說明我們已經成功的將線程數+1 了,但是真正的線程還沒有被添加
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 添加線程,Worker 是繼承了 AQS,實現了 Runnable 接口的包裝類
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 檢查線程狀態, 邏輯和之前一樣
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 線程只能被 start 一次
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers 是一個 HashSet,添加我們新增的 Worker
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啓動 Worker
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
3.4.5 Worker 的工作流程
3.5 線程工具類 Executors
Executors 是線程的工具類,用於幫助用戶快速創建線程池。此類包含所定義的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 類的工廠和實用方法。此類支持以下各種方法:
-
創建並返回設置有常用配置字符串的 ExecutorService 的方法。
-
創建並返回設置有常用配置字符串的 ScheduledExecutorService 的方法。
-
創建並返回 “包裝的”ExecutorService 方法,它通過使特定於實現的方法不可訪問來禁用重新配置。
-
創建並返回 ThreadFactory 的方法,它可將新創建的線程設置爲已知的狀態。
-
創建並返回非閉包形式的 Callable 的方法,這樣可將其用於需要 Callable 的執行方法中。
具體的應用我們將在下一節詳細講解,剩下相關的線程池相關類,我們將在後續逐步講解。
4. JDK 定義的四類線程池
小建議:建議先閱讀第五節——線程池 7 大參數詳解,這樣有助於大家閱讀理解。
JDK 中定義了四類線程池:
-
固定數量線程池
-
單線程線程池
-
帶緩存的線程池
-
定時任務線程池
下面我們將來一步步解析這四類線程池,這四類線程池可直接使用 3.5 節中的 Executors 創建。
4.1 固定數量線程池
4.1.1 創建固定數量線程池
/** 使用 Executors 工具類創建固定數量線程池 */
private ExecutorService executorService = Executors.newFixedThreadPool(3);
public void start() {
// 提交一個 Runnable 任務
executorService.submit(() -> {
System.out.println("hello word");
});
}
4.1.2 固定數量線程池源碼解讀
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
固定數量線程池的底層就是我們第 3 節講解的 ThreadPoolExecutor 類,通過構造方法設置參數。特點:核心線程數和最大線程數相等,存活時間爲 0,即始終活躍,阻塞隊列使用的是 LinkedBlockingQueue。
4.1.3 阻塞隊列 LinkedBlockingQueue
上一小節提到,固定數量線程池使用的是 LinkedBlockingQueue 作爲阻塞隊列,那麼 LinkedBlockingQueue 隊列有什麼特點呢?爲什麼選擇它作爲阻塞隊列呢?
-
由鏈表結構組成的隊列,隊列中的元素按 FIFO(先進先出的原則對元素進行排序)
-
排在隊列頭部的元素是時間最長的元素,排在隊尾的元素是時間最短的元素
-
鏈接隊列的吞吐量通常要高於基於數組的隊列
缺點:
- 如果指定容量,則可以在一定程度上防止隊列過度拓展,隊滿時無法插入。如果不指定容量,則使用 Integer.MAX_VALUE 作爲默認容量。
由於設定了固定數量的線程,那麼用戶提交的任務很可能就超出了核心線程數,此時任務隊列對插入和取出的要求就比較高,鏈表結構在插入和刪除的效率較高,故選擇此隊列。
4.2 單線程線程池
4.2.1 創建單線程線程池
/** 使用 Executors 工具類創建 */
private ExecutorService executorService = Executors.newSingleThreadExecutor();
public void start() {
// 提交一個 Runnable 任務
executorService.submit(() -> {
System.out.println("hello word");
});
}
4.2.2 單線程線程池源碼
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
單線程線程池指定核心線程和最大線程均爲一,即從始至終線程池中只會存在一個線程,線程始終活躍,阻塞隊列爲 LinkedBlockingQueue
4.3 帶緩存的線程池
4.3.1 創建帶緩存的線程池
private ExecutorService executorService = Executors.newCachedThreadPool();
public void start() {
executorService.submit(() -> {
System.out.println("hello word");
});
}
4.3.2 帶緩存線程池源碼
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
帶緩存的線程池,這裏的 “緩存” 不是指的數據緩存,而是指按需創建線程,並設置了存活時間,在存活時間內線程還可以處理其他任務。我們可以看到,它設置的核心線程數是 0,最大線程數是 Integer.MAX_VALUE。也就是說,線程池創建時,不初始化存放線程,當用戶提交任務時,只要任務數小於 Integer.MAX_VALUE,則直接創建線程執行。線程執行完成後並不會立即銷燬,而會緩存存活 60 秒,在 60 秒內,如果還有用戶任務提交,且任務數小於等於存活的線程數,則由存活的線程執行。如果大於存活線程數,且小於 Integer.MAX_VALUE,則創建 任務數 - 存活線程數 的差值個線程,進行處理。
4.3.3 阻塞隊列 SynchronousQueue
我們發現,帶緩存的線程池沒有使用 LinkedBlockingQueue 阻塞隊列,而是使用的 SynchronousQueue 隊列。
特點:隊列中的元素插入和移出必須是同時操作的,也就是說一個任務被取出的同時,也要有一個任務被插入。二者同時進行,是一個同步隊列。
同步隊列類似於 CSP 和 Ada 中使用的 rendezvous 信道。它非常適合於傳遞性設計,在這種設計中,在一個線程中運行的對象要將某些信息、事件或任務傳遞給在另一個線程中運行的對象,它就必須與該對象同步。
支持公平和非公平,看源碼。
/**
* Creates a {@code SynchronousQueue} with the specified fairness policy.
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
當指定爲公平隊列時,會創建一個 FIFO 的有序隊列,否則順序是未指定的。
4.4 定時任務線程池
4.4.1 創建一個定時任務線程池
// 創建一個定時任務線程池, 並指定核心線程數
private ExecutorService executorService = Executors.newScheduledThreadPool(10);
public void start() {
// 提交一個 Runnable 任務
executorService.submit(() -> {
System.out.println("hello word");
});
}
4.4.2 定時任務線程池源碼
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
底層使用的時
ScheduledThreadPoolExecutor, 我們追蹤進去看一下,發現它是 super 調用父類的構造方法。
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
其父類就是 ThreadPoolExecutor。創建的是一個指定核心線程數,最大線程數爲 Integer.MAX_VALUE,阻塞隊列爲 DelayedWorkQueue 的線程池。
DelayedWorkQueue 基於堆的數據結構 類似於 DelayQueue 和 PriorityQueue,每個 ScheduledFutureTask 將其索引記錄到 堆數組。這彌補了查找任務的損失的效率 ,大大加快刪除速度(從 O(n) 到 O(log n))。
5. 線程池的 7 大參數詳解
從上一節我們知道,JDK 自帶的四類線程池都是根據配置 ThreadPoolExecutor 而得到的。不同的參數組合誕生不同線程池,這 7 大參數幾乎是面試中的必考題,也是實際生產中必須要使用到的。掌握它,讓你的線程池使用遊刃有餘。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
七大參數分別是:
-
核心線程數
-
最大線程數
-
線程存活時間
-
存活時間單位
-
阻塞隊列
-
線程工廠
-
拒絕策略
5.1 核心線程數
核心線程數指的是初始化時就需要創建的線程,核心線程始終活躍,不管有沒有需要執行的任務,核心線程都不會銷燬。可以理解爲,隨時待命!
5.2 最大線程數
顧名思義,線程池中最多允許存在多少個線程。當核心線程繁忙,隊列隊滿的情況下,如果 “最大線程數 - 核心線程數> 0”,線程池則會新建線程執行任務。
5.3 線程存活時間
當線程數大於核心數時,這是多餘的空閒線程(即存活於藍色區域的線程)在終止前等待新任務的最長時間。和時間單位參數連用。
5.4 存活時間單位
和線程存活時間一起使用,指定的是一段時間。常用單位有:
-
TimeUnit.NANOSECONDS 納秒
-
TimeUnit.MILLISECONDS 毫秒,1 秒 = 1000 毫秒
-
TimeUnit.SECONDS 秒
-
TimeUnit.MINUTES 分
舉例:30,TimeUnit.SECONDS ==> 存活時間:30 秒
5.5 阻塞隊列
線程池中的阻塞隊列類型也挺多的,特性也不盡相同,這也提升了線程池的靈活及多樣性。參數類型是 BlockingQueue,BlockingQueue 是一個接口,它的實現類都可以使用。
實現類有:ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue。
它們之間的特性:
BlockingQueue 的四類方法:
這四種形式的處理方式不同:第一種是拋出一個異常,第二種是返回一個特殊值(null 或 false,具體取決於操作),第三種是在操作可以成功前,無限期地阻塞當前線程,第四種是在放棄前只在給定的最大時間限制內阻塞。
5.6 線程工廠
線程工廠可以用戶自定義,也可以使用默認的線程工程。線程工廠就是用來創建線程的。
使用默認的線程工廠:
Executors.defaultThreadFactory()
源碼如下:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
// 從安全管理器中拿到線程組
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
// 指定線程的名字
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
// 設置用戶進程
if (t.isDaemon())
t.setDaemon(false);
// 設置優先級
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
默認的線程工廠主要是設置了一些線程名稱規則,用戶線程,以及線程默認的優先級。
當然,你也可以自定義線程工廠,參照默認的線程工廠的實現就可以,這樣你自己創建的線程的名稱,優先級等等都是可以按照你自己的規範來。
5.7 拒絕策略(飽和策略)
任務的拒絕策略,也可以叫飽和策略,就是當阻塞隊列隊滿時,剩下提交的任務的處理策略。
JDK 中提供了四種拒絕策略,默認使用的是飽和丟棄策略。
/**
* 源碼中默認使用的是 AbortPolicy 策略
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
飽和策略詳解:
具體選擇哪種策略,需要根據實際的業務場景來考量
6. Spring/Spring Boot 使用線程池
如果大家已經很熟悉能夠使用線程池,則可以直接跳過本節。
6.1 Spring 使用線程池
6.1.1 創建 maven 工程,導入相關依賴
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.12.RELEASE</version>
</dependency>
6.1.2 創建並配置線程池
Spring 有兩種方式,一種是使用配置類的形式,一種是在 bean.xml 中配置。我們演示使用配置類的。
創建一個線程池配置類,並配置好 7 大參數。
/**
* @author 九月長安
* @version $Id: MyThreadPoolConfig.java, v 0.1 2021-08-03 18:41 九月長安 Exp $$
*/
@Configuration
public class MyThreadPoolConfig {
// 指定注入的 bean 名稱
@Bean(name = "executorService")
public ExecutorService getThreadPool() {
return new ThreadPoolExecutor(2,
20,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue(),
Executors.defaultThreadFactory(),
new AbortPolicy());
}
}
6.1.3 使用線程池
/**
* @author 九月長安
* @version $Id: UserService.java, v 0.1 2021-08-03 11:16 九月長安 Exp $$
*/
@Service
public class UserService {
@Autowired
private UserDao userDao;
// 注入線程池
@Autowired
private ExecutorService executorService;
public void getUserInfo() throws ExecutionException, InterruptedException {
// 提交有返回值的任務
Future<Person> future = executorService.submit(new Callable<Person>() {
@Override
public Person call() throws Exception {
return userDao.getPerson();
}
});
// 獲取返回結果
Person p = future.get();
// 打印
System.out.println(p);
}
}
打印結果:
至此,您已掌握 Spring 線程池的基本使用。實際開發中,很多任務是可以異步執行的,這些任務使用線程池能夠大大地提升速度。例如向用戶推送消息,我們沒必要去等待全部推送完再返回,我們只需要將執行結果記錄一下,過段時間去查詢一下執行情況即可。
6.2 Spring Boot 使用線程池
6.2.1 創建一個 Spring Boot 項目
大家可以使用 IDE 創建,也可以使用 Spring 官網提供的初始化嚮導 地址:https://start.spring.io/。
6.2.2 創建並配置線程池
Spring Boot 線程池創建與配置和 Spring 幾乎一樣。
@Configuration
public class MyThreadPoolConfig {
// 指定注入的 bean 名稱
@Bean(name = "executorService")
public ExecutorService getThreadPool() {
return new ThreadPoolExecutor(2,
20,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue(),
Executors.defaultThreadFactory(),
new AbortPolicy());
}
}
6.2.3 線程池的使用
Spring Boot 線程池使用和 Spring 沒有太大的區別,一樣是注入然後使用。可參考 6.1 節。詳細操作方法可查閱 Java API,地址:
https://tool.oschina.net/apidocs/apidoc?api=jdk-zh
7. 根據設備 CPU 動態配置線程池
追求線程池配置的最佳合理參數,是大家共同的夙願,我們先來看一看由於配置不合理導致出現問題的實際案例。
7.1 實際案例
案例 1:頁面大量產生接口服務降級
原因:沒有預估好調用的流量,導致最大核心數設置偏小,大量拋出
RejectedExecutionException,導致隊滿而拋出異常,從而產生降級。
案例 2:自身作爲上游服務,執行時間過長,導致整體服務超時,影響下游服務大量調用失敗
原因:阻塞隊列設置過長,最大線程數設置太小,導致請求數量增加時,大量任務堆積在隊列中,任務執行時間過長,最終導致調用超時失敗。
7.2 追求最佳參數配置
那麼有沒有一個參數是最佳參數配置呢?這個還在不斷地討論和實踐中,因爲實際的服務器環境和業務要求複雜且多樣,IO 密集型和 CPU 密集型的任務運行起來的情況差異非常大,但是追求完美依然是我們要做的。
美團技術團隊針對以上方案,也沒有得出一個最佳通用的配置,沒有一個通用的公式可以解決這一問題。
7.3 較爲常用的配比
其實我們最難確定的就是核心線程和最大線程的配比,那麼有沒有一些配比是較爲常用的呢?其實是有的。
-
CPU 密集型:核心線程數 = CPU 核數 + 1
-
IO 密集型:核心線程數 = CPU 核數 * 2,最大線程數 = CPU 核數 /(1- 阻塞係數),阻塞係數:0.8~0.9
例如:8 核,則 8/(1-0.9) = 80,及最大線程數爲 80。
7.4 動態化線程池
線程池既然那麼重要,而且參數不能最佳適配業務場景,那麼能不能設計一個動態化的線程池?例如現在業務負載過大,動態的調整核心線程數,那麼是不是就能完美的解決這一問題呢?我們來看一下美團技術團隊的實踐架構:
個人覺得已經是相當的不錯,包含申請,動態調參,監控告警,讓線程池始終處於最佳狀態。想要設計自己的線程池架構的小夥伴,可以參考此架構設計。
8. 常見面試題精講
此節希望大家學習完成後時常來溫習,做到胸有成竹最好了。
創建線程有哪幾種方式?
答:new Thread 類,實現 Runnable 接口,實現 Callable 接口,使用線程池。
使用線程池有什麼好處?
答:資源合理分配,提高資源複用,提升執行效率,線程創建執行與任務提交解耦。
線程池 7 大參數有哪些?
答:核心線程數、最大線程數、存活時間、存活時間單位、阻塞隊列、線程工廠、拒絕策略。
如果核心線程數滿了,那麼此時提交的任務怎麼處理?
如果核心線程數滿了,則將任務提交至阻塞隊列等待執行,如果阻塞隊列也滿了,且最大線程數 - 核心線程數 > 0 則創建新的線程執行提交的任務。
線程池的拒絕策略有哪些?
-
AbortPolicy 丟棄任務並拋出異常。
-
DiscardPolicy 丟棄任務,不拋異常。
-
DiscardOldestPolicy 丟棄隊列最前面的任務,然後重新提交被拒絕的任務。
-
CallerRunsPolicy 由調用線程執行該任務。例如:如果是主線程調用線程池,提交任務,則拒絕的任務由主線程執行。
如果讓你來設計線程池你會怎樣設計?
首先是根據業務場景,判斷是 CPU 密集型還是 IO 密集型,不同的類型方案不一樣,通常 IO 密集型設置的 CPU 核數較多。其次根據實際訪問量,以及部署環境來設定參數。拒絕策略的話,需要看具體業務對任務不能執行的容忍程度。最好設置足夠適合的隊列長度、核心線程數、最大線程數,儘量避免觸發拒絕策略。
來源: https://www.toutiao.com/article/6995324544550437388/?log_from=b46a7156b27b3_1653959189348
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/EGv7Va-27393xO9VXJX7Xw