一張圖理解線程池

作者:pq217
鏈接:https://www.jianshu.com/p/214d0f1e3a0f

前言

多線程 Runnable 任務的執行器 Executor 有很多,今天來看一下最常用的 Executor:ThreadPoolExecutor,也就是線程池

ThreadPoolExecutor

ThreadPoolExecutor 想必都用過,有的是直接new ThreadPoolExecutor來創建線程池,有的是通過Executors.newFixedThreadPool等工廠方法去創建線程池

阿里 BB 的規範中不允許使用 Executors 創建線程池,主要是怕忽略線程池創建的參數而造成資源耗盡的風險,那麼我們先看看 ThreadPoolExecutor 可配參數有哪些

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    ...                       
}

圖文理解

我們把線程池比作公司,線程比作員工,那麼corePoolSize 核心線程數相當於一個公司的正式編制員工數,他們接受任務並執行,沒任務時候等待新任務 (公司聘請正式員工的方式是來一個任務聘用一個新員工)

當公司正式編制員工聘用滿後,新來的任務放到workQueue 代辦任務隊列中,等待被員工處理

當任務變多了,正式員工都在忙,任務隊列放不下新任務了,就要考慮聘請臨時工幫忙,maximumPoolSize 代表公司可容納的最多員工數,相當於辦公位數,那麼最大能聘請的臨時工的數量就是maximumPoolSize-corePoolSize ,請多了也沒有工位可幹活,臨時工在公司不忙時候就離開公司 (線程銷燬),給他一個緩衝時間避免剛走又忙起來,這個保持時間就是keepAliveTime,時間單位是unit

當任務隊列都滿了,所有員工 (包括臨時工) 都在忙,而員工總數已經到達maximumPoolSize ,再不能新增員工了,這時如果還有任務那隻能拒絕了,拒絕工作的處理者就是handler ,可以自行定義

threadFactory相當於員工的來源,比如來自某個大學,那麼這個大學對於公司來說就是員工的生產工廠,可以通過配置自定義threadFactory來控制員工的實際生成工作,比如可以統一名稱前綴、統一編號規則等

工作機制

回到程序本身,首先線程池初始化是沒有任何線程的,執行邏輯如下

step1

當任務到來時,真實線程數少於corePoolSize ,就會創建線程,並執行這個任務,執行結束後線程並不關閉,因爲是corePool,繼續獲取workQueue 裏的任務,如果沒有就阻塞等待

step2

如果上一步條件不滿足 (corePool已全部創建完成),會嘗試把任務加入隊列workQueue 中,那麼空閒下來的線程就可以從隊列獲取並執行任務

step3

如果上一步失敗 - 隊列已滿,線程數小於maximumPoolSize ,則會嘗試新增一個臨時線程去執行任務,這些臨時線程工作完成後會存活一段時間,直到空閒了keepAliveTime設置的時間後就銷燬 (實際上可能銷燬原線程而保留新增線程,屬於一種淘汰機制)

代碼

以上的線程創建並不是直接 new Thread,而是通過ThreadFactory創建

public interface ThreadFactory {
    // 生成新線程
    Thread newThread(Runnable r);
}

我們可以自定義實現 ThreadFactory 讓我們線程池的擁有一樣的名稱前綴或編號規則,方便 jvm 調試時識別,比如

private static ThreadFactory threadFactory = new ThreadFactory() {
    private AtomicInteger no = new AtomicInteger(0);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "my-thread-"+(no.incrementAndGet()));
    }
};
// 生成的線程名字就是my-thread-1,my-thread-2,my-thread-3...

以上所說三步驗證,可以看一下ThreadPoolExecutor.execute源碼

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // step1:小於corePoolSize,新增線程:addWorker
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // step2:嘗試添加到隊列:workQueue.offer
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // step3:嘗試新增線程:addWorker
    else if (!addWorker(command, false))
        reject(command);
}

其中addWorker代表是新增線程的方法,其定義如下

private void addWorker(Runnable firstTask, boolean core)

第二參數 boolean 型的 core,代表是否核心線程,可以看到 execute 的step1傳入的是 true,此時添加的是核心線程,step3傳的是 false,此時添加的是臨時線程

總結

ThreadPoolExecutor 的核心代碼看起來還是挺費勁,主要考慮的線程安全的事太多,後續計劃模仿手寫一個線程池來更深一層看一下具體實現~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/V8OwUSJWsAXj61lcj6-Hdw