深入源碼,深度解析 Java 線程池的實現原理
作者 l zyz1992
來源 l Hollis(ID:hollischuang)
Java 系統的運行歸根到底是程序的運行,程序的運行歸根到底是代碼的執行,代碼的執行歸根到底是虛擬機的執行,虛擬機的執行其實就是操作系統的線程在執行,並且會佔用一定的系統資源,如 CPU、內存、磁盤、網絡等等。所以,如何高效的使用這些資源就是程序員在平時寫代碼時候的一個努力的方向。本文要說的線程池就是一種對 CPU 利用的優化手段。
線程池,百度百科是這麼解釋的:
線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然後在創建線程後自動啓動這些任務。線程池線程都是後臺線程。每個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。如果某個線程在託管代碼中空閒(如正在等待某個事件), 則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間後創建另一個輔助線程但線程的數目永遠不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成後才啓動。
線程池的優點
在 Java 併發編程框架中的線程池是運用場景最多的技術,幾乎所有需要異步或併發執行任務的程序都可以使用線程池。在開發過程中,合理地使用線程池能夠帶來至少以下 4 個好處。
第一:降低資源消耗。通過重複利用已創建的線程降低線程創建和銷燬造成的消耗;
第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行;
第三:提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。
第四:**提供更強大的功能。**比如延時定時線程池;
線程池的實現原理
當向線程池提交一個任務之後,線程池是如何處理這個任務的呢?下面就先來看一下它的主要處理流程。先來看下面的這張圖,然後我們一步一步的來解釋。
當使用者將一個任務提交到線程池以後,線程池是這麼執行的:
①首先判斷核心的線程數是否已滿,如果沒有滿,那麼就去創建一個線程去執行該任務;否則請看下一步
②如果線程池的核心線程數已滿,那麼就繼續判斷任務隊列是否已滿,如果沒滿,那麼就將任務放到任務隊列中;否則請看下一步
③如果任務隊列已滿,那麼就判斷線程池是否已滿,如果沒滿,那麼就創建線程去執行該任務;否則請看下一步;
④如果線程池已滿,那麼就根據拒絕策略來做出相應的處理;
上面的四步其實就已經將線程池的執行原理描述結束了。如果不明白沒有關係,先一步一步往下看,上面涉及到的線程池的專有名詞都會詳細的介紹到。
我們在平時的開發中,線程池的使用基本都是基於 ThreadPoolExexutor 類,他的繼承體系是這樣子的:
image-20210322133058425
那既然說在使用中都是基於 ThreadPoolExexutor 的那麼我們就重點分析這個類。
至於他構造體系中的其他的類或者是接口中的屬性,這裏就不去截圖了,完全沒有必要。小夥伴如果實在想看就自己去打開代碼看一下就行了。
ThreadPoolExecutor
在《阿里巴巴 java 開發手冊》中指出了線程資源必須通過線程池提供,不允許在應用中自行顯示的創建線程,這樣一方面是線程的創建更加規範,可以合理控制開闢線程的數量;另一方面線程的細節管理交給線程池處理,優化了資源的開銷。
其原文描述如下:
在 ThreadPoolExecutor 類中提供了四個構造方法,但是他的四個構造器中,實際上最終都會調用同一個構造器,只不過是在另外三個構造器中,如果有些參數不傳 ThreadPoolExecutor 會幫你使用默認的參數。所以,我們直接來看這個完整參數的構造器,來徹底剖析裏面的參數。
public class ThreadPoolExecutor extends AbstractExecutorService {
......
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0){
throw new IllegalArgumentException();
}
if (workQueue == null || threadFactory == null || handler == null){
throw new NullPointerException();
}
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
主要參數就是下面這幾個:
-
corePoolSize:線程池中的核心線程數,包括空閒線程,也就是核心線程數的大小;
-
maximumPoolSize:線程池中允許的最多的線程數,也就是說線程池中的線程數是不可能超過該值的;
-
keepAliveTime:當線程池中的線程數大於 corePoolSize 的時候,在超過指定的時間之後就會將多出 corePoolSize 的的空閒的線程從線程池中刪除;
-
unit:keepAliveTime 參數的單位(常用的秒爲單位);
-
workQueue:用於保存任務的隊列,此隊列僅保持由 executor 方法提交的任務 Runnable 任務;
-
threadFactory:線程池工廠,他主要是爲了給線程起一個標識。也就是爲線程起一個具有意義的名稱;
-
handler:拒絕策略
阻塞隊列
workQueue 有多種選擇,在 JDK 中一共提供了 7 中阻塞對列,分別爲:
-
ArrayBlockingQueue :一個由數組結構組成的****有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下不保證訪問者公平地訪問隊列 ,所謂公平訪問隊列是指阻塞的線程,可按照阻塞的先後順序訪問隊列。非公平性是對先等待的線程是不公平的,當隊列可用時,阻塞的線程都可以競爭訪問隊列的資格。
-
LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。此隊列的默認和最大長度爲 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
-
PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。(雖然此隊列邏輯上是無界的,但是資源被耗盡時試圖執行 add 操作也將失敗,導致 OutOfMemoryError)
-
DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素
-
SynchronousQueue:一個不存儲元素的阻塞隊列。一種阻塞隊列,其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。(SynchronousQueue 該隊列不保存元素)
-
LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。相對於其他阻塞隊列 LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
-
LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。是一個由鏈表結構組成的雙向阻塞隊列
在以上的 7 個隊列中,線程池中常用的是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue,隊列中的常用的方法如下:
關於阻塞隊列,介紹到這裏也就基本差不多了。
線程池工廠
線程池工廠,就像上面已經介紹的,目的是爲了給線程起一個有意義的名字。用起來也非常的簡單,只需要實現 ThreadFactory 接口即可
public class CustomThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("我是你們自己定義的線程名稱");
return thread;
}
}
具體的使用就不去廢話了。
拒絕策略
-
AbortPolicy:這是線程池默認的拒絕策略,在任務不能再提交的時候,拋出異常,及時反饋程序運行狀態。如果是比較關鍵的業務,推薦使用此拒絕策略,這樣子在系統不能承載更大的併發量的時候,能夠及時的通過異常發現;
-
DiscardPolicy:丟棄任務,但是不拋出異常。如果線程隊列已滿,則後續提交的任務都會被丟棄,且是靜默丟棄。這玩意不建議使用;
-
DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新提交被拒絕的任務。這玩意不建議使用;
-
CallerRunsPolicy:如果任務添加失敗,那麼主線程就會自己調用執行器中的 executor 方法來執行該任務。這玩意不建議使用;
public class CustomRejection implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("你自己想怎麼處理就怎麼處理");
}
}
看到這裏,我們再來畫一張圖來總結和概括下線程池的執行示意圖:
詳細的執行過程全部在圖中說明了。
提交任務到線程池
在 java 中,有兩個方法可以將任務提交到線程池,分別是 submit 和 execute。
execute 方法
execute() 方法用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。
void execute(Runnable command);
通過以下代碼可知 execute() 方法輸入的任務是一個 Runnable 類的實例。
executorService.execute(()->{
System.out.println("ThreadPoolDemo.execute");
});
submit 方法
submit() 方法用於提交需要返回值的任務。
Future<?> submit(Runnable task);
線程池會返回一個 future 類型的對象,通過這個 future 對象可以判斷任務是否執行成功,並且可以通過 future 的 get() 方法來獲取返回值,get() 方法會阻塞當前線程直到任務完成,而使用 get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後立即返回,這時候有可能任務沒有執行完。
Future<?> submit = executorService.submit(() -> {
System.out.println("ThreadPoolDemo.submit");
});
關閉線程池
其實,如果優雅的關閉線程池是一個令人頭疼的問題,線程開啓是簡單的,但是想要停止卻不是那麼容易的。通常而言, 大部分程序員都是使用 jdk 提供的兩個方法來關閉線程池,他們分別是:shutdown 或 shutdownNow;
**通過調用線程池的 shutdown 或 shutdownNow 方法來關閉線程池。**它們的原理是遍歷線程池中的工作線程,然後逐個調用線程的 interrupt 方法來中斷線程(PS:中斷,僅僅是給線程打上一個標記,並不是代表這個線程停止了,如果線程不響應中斷,那麼這個標記將毫無作用),所以無法響應中斷的任務可能永遠無法終止。
但是它們存在一定的區別,shutdownNow 首先將線程池的狀態設置成 STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而 shutdown 只是將線程池的狀態設置成 SHUTDOWN 狀態,然後中斷所有沒有正在執行任務的線程。
只要調用了這兩個關閉方法中的任意一個,isShutdown 方法就會返回 true。當所有的任務都已關閉後,才表示線程池關閉成功,這時調用 isTerminaed 方法會返回 true。至於應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用 shutdown 方法來關閉線程池,如果任務不一定要執行完,則可以調用 shutdownNow 方法。
這裏推薦使用穩妥的 shutdownNow 來關閉線程池,至於更優雅的方式我會在以後的併發編程設計模式中的兩階段終止模式中會再次詳細介紹。
合理的參數
爲什麼叫合理的參數,那不合理的參數是什麼樣子的?在我們創建線程池的時候,裏面的參數該如何設置才能稱之爲合理呢?其實這是有一定的依據的,我們先來看一下以下的創建的方式:
ExecutorService executorService = new ThreadPoolExecutor(5,
5,
5,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
r -> {
Thread thread = new Thread(r);
thread.setName("線程池原理講解");
return thread;
});
你說他合理不合理?我也不知道,因爲我們沒有參考的依據,在實際的開發中,我們需要根據任務的性質(IO 是否頻繁?)來決定我們創建的核心的線程數的大小,實際上可以從以下的一個角度來分析:
-
任務的性質:CPU 密集型任務、IO 密集型任務和混合型任務;
-
任務的優先級:高、中和低;
-
任務的執行時間:長、中和短;
-
任務的依賴性:是否依賴其他系統資源,如數據庫連接;
性質不同的任務可以用不同規模的線程池分開處理。分爲 CPU 密集型和 IO 密集型。
CPU 密集型任務應配置儘可能小的線程,如配置 Ncpu+1 個線程的線程池。(可以通過 Runtime.getRuntime().availableProcessors() 來獲取 CPU 物理核數)
IO 密集型任務線程並不是一直在執行任務,則應配置儘可能多的線程,如 2*Ncpu。
混合型的任務,如果可以拆分,將其拆分成一個 CPU 密集型任務一個 IO 密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於串行執行的吞吐量。
如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過 Runtime.getRuntime().availableProcessors() 方法獲得當前設備的 CPU 個數。
優先級不同的任務可以使用優先級隊列 PriorityBlockingQueue 來處理。它可以讓優先級高的任務先執行(注意:如果一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行)
執行時間不同的任務可以交給不同規模的線程池來處理,或者可以使用優先級隊列,讓執行時間短的任務先執行。依賴數據庫連接池的任務,因爲線程提交 SQL 後需要等待數據庫返回結果,等待的時間越長,則 CPU 空閒時間就越長,那麼線程數應該設置得越大,這樣才能更好地利用 CPU。
建議使用有界隊列。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點。方式因爲提交的任務過多而導致 OOM
小結
本文主要介紹的是線程池的實現原理以及一些使用技巧,在實際開發中,線程池可以說是稍微高級一點的程序員的必備技能。所以掌握好線程池這門技術也是重中之重!
另外,在瞭解了原理之後,使用線程池時候也要格外小心,在阿里巴巴 Java 開發手冊中提到,禁止使用 Executors 直接創建線程池,因爲可能會導致 OOM(OutOfMemory , 內存溢出),具體原因和替代方案參考:Java 中線程池,你真的會用嗎?
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/TlDtXA8QIqtcl34IdJIJQQ