線程數突增!領導:誰再這麼寫就滾蛋!

作者:魔性的茶葉

來源:juejin.cn/post/7197424371991855159

下面是正文。

今天上班把需求寫完,出於學習(摸魚)的心理上 skywalking 看看,突然發現我們的一個應用,應用內線程數超過 900 條,接近 1000 條,但是 cpu 並沒有高漲,內存也不算高峯。

但是敏銳的我還是立刻意識到這個應用有不妥,因爲線程數太多了,不符合我們一個正常健康的應用數量。熟練的打出 cpu dump 觀察,首先看線程組名的概覽。

從線程分組看,pool 名開頭線程佔 616 條,而且 waiting 狀態也是 616 條,這個點就非常可疑了,我斷定就是這個 pool 開頭線程池導致的問題。我們先排查爲何這個線程池中會有 600 + 的線程處於 waiting 狀態並且無法釋放,記接下來我們找幾條線程的堆棧觀察具體堆棧:

這個堆棧看上去很合理,線程在線程池中不斷的循環獲取任務,因爲獲取不到任務所以進入了 waiting 狀態,等待着有任務後被喚醒。

看上去不只一個線程池,並且這些線程池的名字居然是一樣的,我大膽的猜測一下,是不斷的創建同樣的線程池,但是線程池無法被回收導致的線程數,所以接下來我們要分析兩個問題,首先這個線程池在代碼裏是哪個線程池,第二這個線程池是怎麼被創建的?爲啥釋放不了?

我在 idea 搜索new ThreadPoolExecutor()得到的結果是這樣的:

於是我陷入懵逼的狀態,難道還有其他騷操作?

正在這時,一位不知名的鄭網友發來一張截圖:

好傢伙!竟然是用new FixedTreadPool()整出來的。難怪我完全搜不到,因爲用的new FixedTreadPool(),所以線程池中的線程名是默認的 pool(又多了一個不使用 Executors 來創建線程池的理由)。

然後我迫不及 die 的打開代碼,試圖找到罪魁禍首,結果發現作者居然是我自己。這是另一個驚喜,驚嚇的驚。

冷靜下來後我梳理一遍代碼,這個接口是我兩年前寫的,主要是功能是統計用戶的錢包每個月的流水,因爲擔心統計比較慢,所以使用了線程池,做了批量的處理,沒想到居然導致了線程數過高,雖然沒有導致事故,但是確實是潛在的隱患,現在沒出事不代表以後不會出事。

去掉多餘業務邏輯,我簡單的還原一個代碼給大家看,還原現場:

private static void threadDontGcDemo(){
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    executorService.submit(() -> {
        System.out.println("111");
    });
}

那麼爲啥線程池裏面的線程和線程池都沒釋放呢

難道是因爲沒有調用 shutdown?我大概能理解我兩年前當時爲啥不調用 shutdown,是因爲當初我覺得接口跑完,方法走到結束,理論上棧幀出棧,局部變量應該都銷燬了,按理說executorService這個變量應該直接 GG 了,那麼按理說我是不用調用 shutdown 方法的。

我簡單的跑了個 demo,循環的去 new 線程池,不調用 shutdown 方法,看看線程池能不能被回收

打開java visual vm查看實時線程:

可以看到線程數和線程池都一直在增加,但是一直沒有被回收,確實符合發生的問題狀況,那麼假如我在方法結束前調用 shutdown 方法呢,會不會回收線程池和線程呢?

簡單寫個 demo 結合 jvisualvm 驗證下:

結果是線程和線程池都被回收了。也就是說,執行了 shutdown 的線程池最後會回收線程池和線程對象。

我們知道,一個對象能不能回收,是看它到 gc root 之間有沒有可達路徑,線程池不能回收說明到達線程池的 gc root 還是有可達路徑的。這裏講個冷知識,這裏的線程池的 gc root 是線程,具體的 gc 路徑是thread->workers->線程池

線程對象是線程池的 gc root,假如線程對象能被 gc,那麼線程池對象肯定也能被 gc 掉(因爲線程池對象已經沒有到 gc root 的可達路徑了)。

那麼現在問題就轉爲線程對象是在什麼時候 gc

鄭網友給了一個粗淺但是合理的解釋,線程對象肯定不是在運行中的時候被回收的,因爲 jvm 肯定不可能去回收一條在運行中的線程,至少 runnalbe 狀態的線程 jvm 不可能去回收。

在 stackoverflow 上我找到了更準確的答案:

A running thread is considered a so called garbage collection root and is one of those things keeping stuff from being garbage collected

這句話的意思是,一條正在運行的線程是 gc root,注意,是正在運行,這個正在運行我先透露下,即使是 waiting 狀態,也算正在運行。這個回答的整體的意思是,運行的線程是 gc root,但是非運行的線程不是 gc root(可以被回收)。

現在比較清楚了,線程池和線程被回收的關鍵就在於線程能不能被回收,那麼回到原來的起點,爲何調用線程池的 shutdown 方法能夠導致線程和線程池被回收呢?難道是 shutdown 方法把線程變成了非運行狀態嗎?

talk is cheap,show me the code

我們直接看看線程池的 shutdown 方法的源碼

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

我們從interruptIdleWorkers方法入手,這方法看上去最可疑,看到interruptIdleWorkers方法,這個方法裏面主要就做了一件事,遍歷當前線程池中的線程,並且調用線程的interrupt()方法,通知線程中斷,也就是說 shutdown 方法只是去遍歷所有線程池中的線程,然後通知線程中斷。所以我們需要了解線程池裏的線程是怎麼處理中斷的通知的。

我們點開 worker 對象,這個 worker 對象是線程池中實際運行的線程,所以我們直接看 worker 的 run 方法,中斷通知肯定是在裏面被處理了

//WOrker的run方法裏面直接調用的是這個方法
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

這個 runwoker 屬於是線程池的核心方法了,相當的有意思,線程池能不斷運作的原理就是這裏,我們一點點看。

首先最外層用一個 while 循環套住,然後不斷的調用gettask()方法不斷從隊列中取任務,假如拿不到任務或者任務執行發生異常(拋出異常了)那就屬於異常情況,直接將completedAbruptly 設置爲 true,並且進入異常的processWorkerExit流程。

我們看看gettask()方法,瞭解下啥時候可能會拋出異常:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

這樣很清楚了,拋去前面的大部分代碼不看,這句代碼解釋了 gettask 的作用:

Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take()

gettask 就是從工作隊列中取任務,但是前面還有個 timed,這個 timed 的語義是這樣的:如果allowCoreThreadTimeOut參數爲 true(一般爲 false)或者當前工作線程數超過核心線程數,那麼使用隊列的 poll 方法取任務,反之使用 take 方法。

也就是說線程池的shutdownnow方法調用interruptIdleWorkers去對線程對象 interrupt 是爲了讓處於 waiting 或者是time_waiting的線程拋出異常。

那麼線程池是在哪裏處理這個異常的呢?我們看runwoker中的調用的processWorkerExit方法,說實話這個方法看着就像處理拋出異常的方法:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

我們可以看到,在這個方法裏有一個很明顯的workers.remove(w)方法,也就是在這裏,這個 w 的變量,被移出了 workers 這個集合,導致 worker 對象不能到達 gc root,於是 workder 對象順理成章的變成了一個垃圾對象,被回收掉了。

然後等到 worker 中所有的 worker 都被移出 works 後,並且當前請求線程也完成後,線程池對象也成爲了一個孤兒對象,沒辦法到達gc root,於是線程池對象也被 gc 掉了。寫了挺長的篇幅,我小結一下:

總結

如果只是在局部方法中使用線程池,線程池對象不是 bean 的情況時,記得要合理的使用shutdown或者shutdownnow方法來釋放線程和線程池對象,如果不使用,會造成線程池和線程對象的堆積。


本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/KhGuxqE3-nxjH5PRZ4QDaQ