萬字長文,帶你深入理解 AQS 隊列同步器

作者_:zhenchao_

AbstractQueuedSynchronizer 簡稱 AQS,可能我們幾乎不會直接去使用它,但它卻是 JUC 的核心基礎組件,支撐着 java 鎖和同步器的實現,例如 ReentrantLockReentrantReadWriteLockCountDownLatch,以及 Semaphore 等。大神 Doug Lea 在設計 JUC 包時希望能夠抽象一個基礎且通用的組件以支撐上層模塊的實現,AQS 應運而生。

AQS 本質上是一個 FIFO 的雙向隊列,線程被包裝成結點的形式,基於自旋機制在隊列中等待獲取資源(這裏的資源可以簡單理解爲對象鎖)。AQS 在設計上實現了兩類隊列,即 同步隊列 和 條件隊列 ,其中同步隊列服務於線程阻塞等待獲取資源,而條件隊列則服務於線程因某個條件不滿足而進入等待狀態。

條件隊列中的線程實際上已經獲取到了資源,但是沒有能夠繼續執行下去的條件,所以被打入條件隊列並釋放持有的資源,以讓渡其它線程執行,如果未來某個時刻條件得以滿足,則該線程會被從條件隊列轉移到同步隊列,繼續參與競爭資源,以繼續向下執行。

本文我們主要分析 AQS 的設計與實現,包括 LockSupport 工具類、同步隊列、條件隊列,以及 AQS 資源獲取和釋放的通用過程。AQS 採用模板方法設計模式,具體獲取資源和釋放資源的過程都交由子類實現,對於這些方法的分析將留到後面分析具體子類的文章中再展開。

LockSupport 工具類

LockSupport 工具類是 JUC 的基礎組件,主要作用是用來阻塞和喚醒線程,底層依賴於 Unsafe 類實現。LockSupport 主要定義類 2 類方法:park 和 unpark,其中 park 方法用於阻塞當前線程,而 unpark 方法用於喚醒處於阻塞狀態的指定線程。

下面的示例演示了 park 和 unpark 方法的基本使用:

 1Thread thread = new Thread(() -> {
 2    System.out.println("Thread start: " + Thread.currentThread().getName());
 3    LockSupport.park(); // 阻塞自己
 4    System.out.println("Thread end: " + Thread.currentThread().getName());
 5});
 6
 7thread.setName("A");
 8thread.start();
 9
10System.out.println("Main thread sleep 3 second: " + Thread.currentThread().getId());
11TimeUnit.SECONDS.sleep(3);
12LockSupport.unpark(thread); // 喚醒線程 A
13
14

線程 A 在啓動之後調用了 LockSupport#park 方法將自己阻塞,主線程在休息 3 秒之後調用 LockSupport#unpark 方法線程 A 喚醒。運行結果:

1Thread start: A
2Main thread sleep 3 second: 1
3Thread end: A
4
5

LockSupport 針對 park 方法提供了多種實現,如下:

1public static void park()
2public static void park(Object blocker)
3public static void parkNanos(long nanos)
4public static void parkNanos(Object blocker, long nanos)
5public static void parkUntil(long deadline)
6public static void parkUntil(Object blocker, long deadline)
7
8

由方法命名不難看出,parkNanos 和 parkUntil 都屬於 park 方法的超時版本,區別在於 parkNanos 方法接收一個納秒單位的時間值,用於指定阻塞的時間長度,例如當設置 nanos=3000000000 時,線程將阻塞 3 秒後甦醒,而 parkUntil 方法則接收一個時間戳,參數 deadline 用於指定阻塞的到期時間。

所有的 park 方法都提供了包含 Object blocker 參數的重載版本,參數 blocker 指代導致當前線程阻塞等待的鎖對象,方便問題排查和系統監控,而在 LockSupport 最開始被設計時卻忽視了這一點,導致在線程 dump 時無法提供阻塞對象的相關信息,這一點在 java 6 中得以改進。實際開發中如果使用到了 LockSupport 工具類,推薦使用帶 blocker 參數的版本。

下面以 LockSupport#park(java.lang.Object) 方法爲例來看一下具體的實現,如下:

 1public static void park(Object blocker) {
 2    // 獲取當前線程對象
 3    Thread t = Thread.currentThread();
 4    // 記錄當前線程阻塞等待的鎖對象(設置線程對象的 parkBlocker 爲參數指定的 blocker 對象)
 5    setBlocker(t, blocker);
 6    // 阻塞線程
 7    UNSAFE.park(false, 0L);
 8    // 線程恢復運行,清除 parkBlocker 參數記錄的鎖對象
 9    setBlocker(t, null);
10}
11
12

具體實現比較簡單,阻塞線程的操作依賴於 Unsafe 類實現。上述方法會調用 LockSupport#setBlocker 方法基於 Unsafe 類將參數指定的 blocker 對象記錄到當前線程對象的 Thread#parkBlocker 字段中,然後進入阻塞狀態,並在被喚醒之後清空對應的 Thread#parkBlocker 字段。推薦:Java 面試練題寶典

當一個線程調用 park 方法進入阻塞狀態之後,會在滿足以下 3 個條件之一時從阻塞狀態中甦醒:

  1. 其它線程調用 unpark 方法喚醒當前線程。

  2. 其它線程中斷了當前線程的阻塞狀態。

  3. 方法 park 因爲一些不合邏輯的原因退出。

線程在從 park 方法中返回時並不會攜帶具體的返回原因,調用者需要自行檢測,例如再次檢查之前調用 park 方法的條件是否仍然滿足以予以推測。

方法 LockSupport#unpark 的實現同樣基於 Unsafe 類實現,不同於 park 的多版本實現,LockSupport 針對 unpark 方法僅提供了單一實現,如下:

1public static void unpark(Thread thread) {
2    if (thread != null) {
3        UNSAFE.unpark(thread);
4    }
5}
6
7

需要注意的一點是,如果事先針對某個線程調用了 unpark 方法,則該線程繼續調用 park 方法並不會進入阻塞狀態,而是會立即返回,並且 park 方法是不可重入的。

同步隊列

同步隊列的作用在於管理競爭資源的線程,當一個線程競爭資源失敗會被記錄到同步隊列的末端,並以自旋的方式循環檢查能夠成功獲取到資源。AQS 的同步隊列基於 CLH(Craig, Landin, and Hagersten) 鎖思想進行設計和實現。CLH 鎖是一種基於鏈表的可擴展、高性能,且具備公平性的自旋鎖。線程以鏈表結點的形式進行組織,在等待期間相互獨立的執行自旋,並不斷輪詢前驅結點的狀態,如果發現前驅結點上的線程釋放了資源則嘗試獲取。

CLH 鎖是 AQS 隊列同步器實現的基礎,AQS 以內部類 Node 的形式定義了同步隊列結點,包括下一小節介紹的條件隊列,同樣以 Node 定義結點。Node 的字段定義如下:

 1static final class Node {
 2
 3    /** 模式定義 */
 4
 5    static final Node SHARED = new Node();
 6    static final Node EXCLUSIVE = null;
 7
 8    /** 線程狀態 */
 9
10    static final int CANCELLED = 1;
11    static final int SIGNAL = -1;
12    static final int CONDITION = -2;
13    static final int PROPAGATE = -3;
14
15    /** 線程等待狀態 */
16    volatile int waitStatus;
17
18    /** 前驅結點 */
19    volatile Node prev;
20    /** 後置結點 */
21    volatile Node next;
22
23    /** 持有的線程對象 */
24    volatile Thread thread;
25
26    /** 對於獨佔模式而言,指向下一個處於 CONDITION 等待狀態的結點;對於共享模式而言,則爲 SHARED 結點 */
27    Node nextWaiter;
28
29    // ... 省略方法定義
30}
31
32

由上述字段定義可以看出,位於 CLH 鏈表中的線程以 2 種模式在等待資源,即 SHARED 和 EXCLUSIVE,其中 SHARED 表示共享模式,而 EXCLUSIVE 表示獨佔模式。共享模式與獨佔模式的主要區別在於,同一時刻獨佔模式只能有一個線程獲取到資源,而共享模式在同一時刻可以有多個線程獲取到資源。典型的場景就是讀寫鎖,讀操作可以有多個線程同時獲取到讀鎖資源,而寫操作同一時刻只能有一個線程獲取到寫鎖資源,其它線程在嘗試獲取資源時都會被阻塞。

AQS 的 CLH 鎖爲處於 CLH 鏈表中的線程定義了 4 種狀態,包括 CANCELLED、SIGNAL、CONDITION,以及 PROPAGATE,並以 Node#waitStatus 字段進行記錄。這 4 種狀態的含義分別爲:

一個結點在被創建時,字段 Node#waitStatus 的初始值爲 0,表示結點上的線程不位於上述任何狀態。

Node 類在方法定義上除了基本的構造方法外,僅定義了 Node#isShared 和 Node#predecessor 兩個方法,其中前者用於返回當前結點是否以共享模式在等待,後者用於返回當前結點的前驅結點。

介紹完了隊列結點的定義,那麼同步隊列具體如何實現呢?這還需要依賴於 AbstractQueuedSynchronizer 類中的兩個字段定義,即:

1private transient volatile Node head;
2private transient volatile Node tail;
3
4

其中 head 表示同步隊列的頭結點,而 tail 則表示同步隊列的尾結點,具體組織形式如下圖:

image

當調用 AQS 的 acquire 方法獲取資源時,如果資源不足則當前線程會被封裝成 Node 結點添加到同步隊列的末端,頭結點 head 用於記錄當前正在持有資源的線程結點,而 head 的後繼結點就是下一個將要被調度的線程結點,當 release 方法被調用時,該結點上的線程將被喚醒,繼續獲取資源。

關於同步隊列結點入隊列、出隊列的實現先不展開,留到後面分析 AQS 資源獲取與釋放的過程時一併分析。

條件隊列

除了上面介紹的同步隊列,在 AQS 中還定義了一個條件隊列。內部類 ConditionObject 實現了條件隊列的組織形式,包含一個起始結點(firstWaiter)和一個末尾結點(lastWaiter),並同樣以上面介紹的 Node 類定義結點,如下:

 1public class ConditionObject implements Condition, Serializable {
 2
 3        /** 指向條件隊列中的起始結點 */
 4        private transient Node firstWaiter;
 5        /** 指向條件隊列的末尾結點 */
 6        private transient Node lastWaiter;
 7
 8        // ... 省略方法定義
 9
10}
11
12

前面在分析 Node 內部類的時候,可以看到 Node 類還定義了一個 Node#nextWaiter 字段,用於指向條件隊列中的下一個等待結點。由此我們可以描繪出條件隊列的組織形式如下:

image

ConditionObject 類實現了 Condition 接口,該接口定義了與 Lock 鎖相關的線程通信方法,主要分爲 await 和 signal 兩大類。

當線程調用 await 方法時,該線程會被包裝成結點添加到條件隊列的末端,並釋放持有的資源。當條件得以滿足時,方法 signal 可以將條件隊列中的一個或全部的線程結點從條件隊列轉移到同步隊列以參與競爭資源。應用可以創建多個 ConditionObject 對象,每個對象都對應一個條件隊列,對於同一個條件隊列而言,其中的線程所等待的條件是相同的。推薦:Java 面試練題寶典

Condition 接口的定義如下:

 1public interface Condition {
 2
 3    void await() throws InterruptedException;
 4    void awaitUninterruptibly();
 5    long awaitNanos(long nanosTimeout) throws InterruptedException;
 6    boolean await(long time, TimeUnit unit) throws InterruptedException;
 7    boolean awaitUntil(Date deadline) throws InterruptedException;
 8
 9    void signal();
10    void signalAll();
11}
12
13

等待:await

下面來分析一下 ConditionObject 類針對 Condition 接口方法的實現,首先來看一下 ConditionObject#await 方法,該方法用於將當前線程添加到條件隊列中進行等待,同時支持響應中斷。方法實現如下:

 1public final void await() throws InterruptedException {
 2    if (Thread.interrupted()) {
 3        // 立即響應中斷
 4        throw new InterruptedException();
 5    }
 6    // 將當前線程添加到等待隊列末尾,等待狀態爲 CONDITION
 7    Node node = this.addConditionWaiter();
 8    // 釋放當前線程持有的資源
 9    int savedState = fullyRelease(node);
10    int interruptMode = 0;
11    while (!isOnSyncQueue(node)) { // 如果當前結點位於條件隊列中,則循環
12        // 阻塞當前線程
13        LockSupport.park(this);
14        // 如果線程在阻塞期間被中斷,則退出循環
15        if ((interruptMode = this.checkInterruptWhileWaiting(node)) != 0) {
16            break;
17        }
18    }
19    // 如果在同步隊列中等待期間被中斷,且之前的中斷狀態不爲 THROW_IE
20    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
21        interruptMode = REINTERRUPT;
22    }
23    if (node.nextWaiter != null) {
24        // 清除條件隊列中所有狀態不爲 CONDITION 的結點
25        this.unlinkCancelledWaiters();
26    }
27    // 如果等待期間被中斷,則響應中斷
28    if (interruptMode != 0) {
29        this.reportInterruptAfterWait(interruptMode);
30    }
31}
32
33

因爲 ConditionObject#await 方法支持響應中斷,所以在方法一開始會先檢查一下當前線程是否被中斷,如果是則拋出 InterruptedException 異常,否則繼續將當前線程加入到條件隊列中進行等待。整體執行流程可以概括爲:

  1. 將當前線程加入到條件隊列末端,並設置等待狀態爲 CONDITION;

  2. 釋放當前線程所持有的資源,避免飢餓或死鎖;

  3. 基於自旋機制在條件隊列中等待,直到被其它線程轉移到同步隊列,或者等待期間被中斷;

  4. 如果等待期間被中斷,則響應中斷。

ConditionObject 定義了兩種中斷響應方式,即:REINTERRUPT 和 THROW_IE。如果是 REINTERRUPT,則線程會調用 Thread#interrupt 方法中斷自己;如果是 THROW_IE,則線程會直接拋出 InterruptedException 異常。

下面繼續分析一下支撐 ConditionObject#await 運行的其它幾個方法,包括 addConditionWaiter、fullyRelease、isOnSyncQueue,以及 unlinkCancelledWaiters。

方法 ConditionObject#addConditionWaiter 用於將當前線程包裝成 Node 結點對象添加到條件隊列的末端,期間會執行清除條件隊列中處於取消狀態(等待狀態不爲 CONDITION)的線程結點。方法實現如下:

 1private Node addConditionWaiter() {
 2    // 獲取條件隊列的末尾結點
 3    Node t = lastWaiter;
 4    // 如果末尾結點狀態不爲 CONDITION,表示對應的線程已經取消了等待,需要執行清理操作
 5    if (t != null && t.waitStatus != Node.CONDITION) {
 6        // 清除條件隊列中所有狀態不爲 CONDITION 的結點
 7        this.unlinkCancelledWaiters();
 8        t = lastWaiter;
 9    }
10    // 構建當前線程對應的 Node 結點,等待狀態爲 CONDITION,並添加到條件隊列末尾
11    Node node = new Node(Thread.currentThread(), Node.CONDITION);
12    if (t == null) {
13        firstWaiter = node;
14    } else {
15        t.nextWaiter = node;
16    }
17    lastWaiter = node;
18    return node;
19}
20
21

將當前線程對象添加到條件隊列中的過程本質上是一個簡單的鏈表插入操作,在執行插入操作之前,上述方法會先對條件隊列執行一遍清理操作,清除那些狀態不爲 CONDITION 的結點。具體實現位於 ConditionObject#unlinkCancelledWaiters 方法中:

 1private void unlinkCancelledWaiters() {
 2    Node t = firstWaiter;
 3    Node trail = null; // 記錄上一個不被刪除的結點
 4    while (t != null) {
 5        Node next = t.nextWaiter;
 6        // 如果結點上的線程等待狀態不爲 CONDITION,則刪除對應結點
 7        if (t.waitStatus != Node.CONDITION) {
 8            t.nextWaiter = null;
 9            if (trail == null) {
10                firstWaiter = next;
11            } else {
12                trail.nextWaiter = next;
13            }
14            if (next == null) {
15                lastWaiter = trail;
16            }
17        } else {
18            trail = t;
19        }
20        t = next;
21    }
22}
23
24

方法 AbstractQueuedSynchronizer#fullyRelease 用於釋放當前線程持有的資源,這也是非常容易理解的,畢竟當前線程即將進入等待狀態,如果持有的資源不被釋放,將可能導致程序最終被餓死,或者死鎖。方法的實現如下:

 1final int fullyRelease(Node node) {
 2    boolean failed = true;
 3    try {
 4        // 獲取當前線程的同步狀態,可以理解爲持有的資源數量
 5        int savedState = this.getState();
 6        // 嘗試釋放當前線程持有的資源
 7        if (this.release(savedState)) {
 8            failed = false;
 9            return savedState;
10        } else {
11            // 釋放資源失敗
12            throw new IllegalMonitorStateException();
13        }
14    } finally {
15        if (failed) {
16            // 如果釋放資源失敗,則取消當前線程
17            node.waitStatus = Node.CANCELLED;
18        }
19    }
20}
21
22

如果資源釋放失敗,則上述方法會將當前線程的狀態設置爲 CANCELLED,以退出等待狀態。

方法 AbstractQueuedSynchronizer#isOnSyncQueue 用於檢測當前結點是否位於同步隊列中,方法實現如下:

 1final boolean isOnSyncQueue(Node node) {
 2    // 如果結點位於等待隊列,或是頭結點則返回 false
 3    if (node.waitStatus == Node.CONDITION || node.prev == null) {
 4        return false;
 5    }
 6    // If has successor, it must be on queue
 7    if (node.next != null) {
 8        return true;
 9    }
10
11    /*
12     * node.prev can be non-null, but not yet on queue because the CAS to place it on queue can fail.
13     * So we have to traverse from tail to make sure it actually made it. It will always be near the tail in calls to this method,
14     * and unless the CAS failed (which is unlikely), it will be there, so we hardly ever traverse much.
15     */
16
17    // 從後往前檢測目標結點是否位於同步隊列中
18    return this.findNodeFromTail(node);
19}
20
21

如果一個線程所等待的條件被滿足,則觸發條件滿足的線程會將等待該條件的一個或全部線程結點從條件隊列轉移到同步隊列,此時,這些線程將從 ConditionObject#await 方法中退出,以參與競爭資源。

方法 ConditionObject#awaitNanosConditionObject#awaitUntil 和 ConditionObject#await(long, TimeUnit) 在上面介紹的 ConditionObject#await 方法的基礎上引入了超時機制,當一個線程在條件隊列中等待的時間超過設定值時,線程結點將被從條件隊列轉移到同步隊列,參與競爭資源。其它執行過程與 ConditionObject#await 方法相同,故不再展開。

下面來分析一下 ConditionObject#awaitUninterruptibly 方法,由方法命名可以看出該方法相對於 ConditionObject#await 方法的區別在於在等待期間不響應中斷。方法實現如下:

 1public final void awaitUninterruptibly() {
 2    // 將當前線程添加到等待隊列末尾,等待狀態爲 CONDITION
 3    Node node = this.addConditionWaiter();
 4    // 釋放當前線程持有的資源
 5    int savedState = fullyRelease(node);
 6    boolean interrupted = false;
 7    // 如果當前結點位於條件隊列中,則循環
 8    while (!isOnSyncQueue(node)) {
 9        // 阻塞當前線程
10        LockSupport.park(this);
11        if (Thread.interrupted()) {
12            // 標識線程等待期間被中斷,但不立即響應
13            interrupted = true;
14        }
15    }
16    // 自旋獲取資源,返回 true 則說明等待期間被中斷過
17    if (acquireQueued(node, savedState) || interrupted) {
18        // 響應中斷
19        selfInterrupt();
20    }
21}
22
23

如果線程在等待期間被中斷,則上述方法會用一個字段進行記錄,並在最後集中處理,而不會因爲中斷而退出等待狀態。

通知:signal

調用 await 方法會將線程對象自身加入到條件隊列中進行等待,而 signal 通知方法則用於將一個或全部的等待線程從條件隊列轉移到同步隊列,以參與競爭資源。ConditionObject 定義了兩個通知方法:signal 和 signalAll,前者用於將條件隊列的頭結點(也就是等待時間最長的結點)從條件隊列轉移到同步隊列,後者用於將條件隊列中所有處於等待狀態的結點從條件隊列轉移到同步隊列。下面分別來分析一下這兩個方法的實現。推薦:Java 面試練題寶典

方法 ConditionObject#signal 的實現如下:

 1public final void signal() {
 2    // 先檢測當前線程是否獲取到了鎖,否則不允許繼續執行
 3    if (!isHeldExclusively()) {
 4        throw new IllegalMonitorStateException();
 5    }
 6    // 獲取條件隊列頭結點,即等待時間最長的結點
 7    Node first = firstWaiter;
 8    if (first != null) {
 9        // 將頭結點從條件隊列轉移到同步隊列,參與競爭資源
10        this.doSignal(first);
11    }
12}
13
14

調用 ConditionObject#signal 方法的線程必須位於臨界區,也就是必須先持有獨佔鎖,所以上述方法一開始會對這一條件進行校驗,方法 AbstractQueuedSynchronizer#isHeldExclusively 是一個模板方法,交由子類來實現。

如果滿足執行條件,則上述方法會調用 ConditionObject#doSignal 方法將條件隊列的頭結點從條件隊列轉移到同步隊列。

 1private void doSignal(Node first) {
 2    // 從前往後遍歷,直到遇到第一個不爲 null 的結點,並將其從條件隊列轉移到同步隊列
 3    do {
 4        if ((firstWaiter = first.nextWaiter) == null) {
 5            lastWaiter = null;
 6        }
 7        first.nextWaiter = null;
 8    } while (!transferForSignal(first) && (first = firstWaiter) != null);
 9}
10
11// AbstractQueuedSynchronizer#transferForSignal
12final boolean transferForSignal(Node node) {
13    // 更新當前結點的等待狀態:CONDITION -> 0
14    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
15        // 更新失敗,說明對應的結點上的線程已經被取消
16        return false;
17    }
18
19    /*
20     * Splice onto queue and try to set waitStatus of predecessor to indicate that thread is (probably) waiting.
21     * If cancelled or attempt to set waitStatus fails, wake up to resync (in which case the waitStatus can be transiently and harmlessly wrong).
22     */
23
24    // 將結點添加到同步隊列末端,並返回該結點的前驅結點
25    Node p = this.enq(node);
26    int ws = p.waitStatus;
27    // 如果前驅結點被取消,或者設置前驅結點的狀態爲 SIGNAL 失敗,則喚醒當前結點上的線程
28    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
29        LockSupport.unpark(node.thread);
30    }
31    return true;
32}
33
34

方法 ConditionObject#doSignal 會從前往後遍歷條件隊列,尋找第一個不爲 null 的結點,並應用 AbstractQueuedSynchronizer#transferForSignal 方法嘗試將其從條件隊列轉移到同步隊列。

在入同步隊列之前,方法 AbstractQueuedSynchronizer#transferForSignal 會基於 CAS 機制清除結點的 CONDITION 狀態,如果清除失敗則說明該結點上的線程已被取消,此時 ConditionObject#doSignal 方法會繼續尋找下一個可以被喚醒的結點。如果清除結點狀態成功,則接下來會將該結點添加到同步隊列的末端,同時依據前驅結點的狀態決定是否喚醒當前結點上的線程。

繼續來看 ConditionObject#signalAll 方法的實現,相對於上面介紹的 ConditionObject#signal 方法,該方法的特點在於它會喚醒條件隊列中所有不爲 null 的等待結點。方法實現如下:

 1public final void signalAll() {
 2    if (!isHeldExclusively()) {
 3        // 先檢測當前線程是否獲取到了鎖,否則不允許繼續執行
 4        throw new IllegalMonitorStateException();
 5    }
 6    // 獲取條件隊列頭結點
 7    Node first = firstWaiter;
 8    if (first != null) {
 9        // 將所有結點從條件隊列轉移到同步隊列,參與競爭資源
10        this.doSignalAll(first);
11    }
12}
13
14private void doSignalAll(Node first) {
15    lastWaiter = firstWaiter = null;
16    do {
17        Node next = first.nextWaiter;
18        first.nextWaiter = null;
19        transferForSignal(first);
20        first = next;
21    } while (first != null);
22}
23
24

實際上理解了 ConditionObject#doSignal 的運行機制,再理解 ConditionObject#signalAll 的運行機制也是水到渠成的事情。

資源的獲取與釋放

前面的小節我們分析了 LockSupport 工具類,以及 AQS 同步隊列和條件隊列的設計與實現,這些都是支撐 AQS 運行的基礎組件,本小節我們將正式開始分析 AQS 的實現機制。

AQS 對應的 AbstractQueuedSynchronizer 實現類,在屬性定義上主要包含 4 個字段(如下),其中 exclusiveOwnerThread 由父類 AbstractOwnableSynchronizer 繼承而來,用於記錄當前持有獨佔鎖的線程對象,而 head 和 tail 字段分別指向同步隊列的頭結點和尾結點:

1private transient Thread exclusiveOwnerThread;
2
3private transient volatile Node head;
4private transient volatile Node tail;
5
6private volatile int state;
7
8

字段 state 用於描述同步狀態,對於不同的實現類來說具備不同的用途:

具體細節我們將在後面分析相應組件實現機制的文章中再展開說明。

AbstractQueuedSynchronizer 是一個抽象類,在方法設計上引入了模板方法設計模式,下面的代碼塊中列出了所有需要子類依據自身運行機制針對性實現的模板方法:

1protected boolean tryAcquire(int arg)
2protected boolean tryRelease(int arg)
3protected int tryAcquireShared(int arg)
4protected boolean tryReleaseShared(int arg)
5protected boolean isHeldExclusively()
6
7

這裏先簡單說明一下各個方法的作用,具體實現留到後面分析各個基於 AQS 實現組件的文章中再進一步分析:

AbstractQueuedSynchronizer 中的方法實現按照功能劃分可以分爲兩大類,即獲取資源(acquire)和釋放資源(release),同時區分獨佔模式和共享模式。下面的小節中主要對獲取和釋放資源的方法區分獨佔模式和共享模式進行分析。

獨佔獲取資源

針對獨佔模式獲取資源,AbstractQueuedSynchronizer 定義了多個版本的 acquire 方法實現,包括:acquire、acquireInterruptibly,以及 tryAcquireNanos,其中 acquireInterruptibly 是 acquire 的中斷版本,在等待獲取資源期間支持響應中斷請求,tryAcquireNanos 除了支持響應中斷以外,還引入了超時等待機制。

下面主要分析一下 AbstractQueuedSynchronizer#acquire 的實現,理解了該方法的實現機制,也就自然而然理解了另外兩個版本的實現機制。方法 AbstractQueuedSynchronizer#acquire 的實現如下:

 1public final void acquire(int arg) {
 2    if (!this.tryAcquire(arg) // 嘗試獲取資源
 3            // 如果獲取資源失敗,則將當前線程加入到同步隊列的末端(獨佔模式),並基於自旋機制等待獲取資源
 4            && this.acquireQueued(this.addWaiter(Node.EXCLUSIVE), arg)) {
 5        // 等待獲取資源期間曾被中斷過,在獲取資源成功之後再響應中斷
 6        selfInterrupt();
 7    }
 8}
 9
10

方法 AbstractQueuedSynchronizer#tryAcquire 的功能在前面已經簡單介紹過了,用於嘗試獲取資源,如果獲取資源失敗則會將當前線程添加到同步隊列中,基於自旋機制等待獲取資源。

方法 AbstractQueuedSynchronizer#addWaiter 用於將當前線程對象封裝成結點添加到同步隊列末端,並最終返回線程結點對象:

 1private Node addWaiter(Node mode) {
 2    // 爲當前線程創建結點對象
 3    Node node = new Node(Thread.currentThread(), mode);
 4    // 基於 CAS 機制嘗試快速添加結點到同步隊列末端
 5    Node pred = tail;
 6    if (pred != null) {
 7        node.prev = pred;
 8        if (this.compareAndSetTail(pred, node)) {
 9            pred.next = node;
10            return node;
11        }
12    }
13    // 快速添加失敗,繼續嘗試將該結點添加到同步隊列末端,如果同步隊列未被初始化則執行初始化
14    this.enq(node);
15    // 返回當前線程對應的結點對象
16    return node;
17}
18
19

上述方法在添加結點的時候,如果同步隊列已經存在,則嘗試基於 CAS 操作快速將當前結點添加到同步隊列末端。如果添加失敗,或者隊列不存在,則需要再次調用 AbstractQueuedSynchronizer#enq 方法執行添加操作,該方法在判斷隊列不存在時會初始化同步隊列,然後基於 CAS 機制嘗試往同步隊列末端插入線程結點。方法實現如下:

 1private Node enq(final Node node) {
 2    for (; ; ) {
 3        // 獲取同步隊列末尾結點
 4        Node t = tail;
 5        // 如果結點不存在,則初始化
 6        if (t == null) { // Must initialize
 7            if (this.compareAndSetHead(new Node())) {
 8                tail = head;
 9            }
10        } else {
11            // 往末尾追加
12            node.prev = t;
13            if (this.compareAndSetTail(t, node)) {
14                t.next = node;
15                return t;
16            }
17        }
18    }
19}
20
21

完成了結點的入同步隊列操作,接下來會調用 AbstractQueuedSynchronizer#acquireQueued 方法基於自旋機制等待獲取資源,在等待期間並不會響應中斷,而是記錄中斷標誌,等待獲取資源成功後延遲響應。方法實現如下:

 1final boolean acquireQueued(final Node node, int arg) {
 2    boolean failed = true;
 3    try {
 4        boolean interrupted = false; // 標記自旋過程中是否被中斷
 5        // 基於自旋機制等待獲取資源
 6        for (; ; ) {
 7            // 獲取前驅結點
 8            final Node p = node.predecessor();
 9            // 如果前驅結點爲頭結點,說明當前結點是排在同步隊列最前面,可以嘗試獲取資源
10            if (p == head && this.tryAcquire(arg)) {
11                // 獲取資源成功,更新頭結點
12                this.setHead(node); // 頭結點一般記錄持有資源的線程結點
13                p.next = null; // help GC
14                failed = false;
15                return interrupted; // 自旋過程中是否被中斷
16            }
17            // 如果還未輪到當前結點,或者獲取資源失敗
18            if (shouldParkAfterFailedAcquire(p, node) // 判斷是否需要阻塞當前線程
19                    && this.parkAndCheckInterrupt()) { // 如果需要,則進入阻塞狀態,並在甦醒時檢查中斷狀態
20                // 標識等待期間被中斷
21                interrupted = true;
22            }
23        }
24    } finally {
25        // 嘗試獲取資源失敗,說明執行異常,取消當前結點獲取資源的進程
26        if (failed) {
27            this.cancelAcquire(node);
28        }
29    }
30}
31
32

上述方法會循環檢測當前結點是否已經排在同步隊列的最前端,如果是則調用 AbstractQueuedSynchronizer#tryAcquire 方法嘗試獲取資源,具體獲取資源的過程由子類實現。自旋期間如果還未輪到調度當前線程結點,或者嘗試獲取資源失敗,則會調用 AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire 方法檢測是否需要阻塞當前線程,具體判定的過程依賴於前驅結點的等待狀態,實現如下:

 1private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2    // 獲取前驅結點狀態
 3    int ws = pred.waitStatus;
 4    if (ws == Node.SIGNAL) {
 5        // 前驅結點狀態爲 SIGNAL,說明當前結點需要被阻塞
 6        return true;
 7    }
 8    if (ws > 0) {
 9        // 前驅結點處於取消狀態,則一直往前尋找處於等待狀態的結點,並排在其後面
10        do {
11            node.prev = pred = pred.prev;
12        } while (pred.waitStatus > 0);
13        pred.next = node;
14    } else {
15        /*
16         * 前驅結點的狀態爲 0 或 PROPAGATE,但是當前結點需要一個被喚醒的信號,
17         * 所以基於 CAS 將前驅結點等待狀態設置爲 SIGNAL,在阻塞之前,調用者需要重試以再次確認不能獲取到資源。
18         */
19        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
20    }
21    return false;
22}
23
24

上述方法首先會獲取前驅結點的等待狀態,並依據具體的狀態值進行決策:

  1. 如果前驅結點等待狀態爲 SIGNAL,則說明當前結點需要被阻塞,所以直接返回 true;

  2. 否則,如果前驅結點的等待狀態大於 0(即處於取消狀態),則一直往前尋找未被取消的結點,並將當前結點排在其後,這種情況下直接返回 false,再次嘗試獲取一次資源;

  3. 否則,前驅結點的狀態爲 0 或 PROPAGATE(不可能爲 CONDITION 狀態,因爲當前處於同步隊列),因爲當前結點需要一個喚醒信號,所以修改前驅結點的狀態爲 SIGNAL,這種情況下同樣返回 false,以再次確認不能獲取到資源。

如果上述檢查返回 true,則接下來會調用 AbstractQueuedSynchronizer#parkAndCheckInterrupt 方法,基於 LockSupport 工具阻塞當前線程,並在線程甦醒時檢查中斷狀態。如果期間被中斷過則記錄中斷標記,而不立即響應,直到成功獲取到資源,或者期間發生異常退出自旋。方法 AbstractQueuedSynchronizer#acquireQueued 最終會返回這一中斷標記,並在外圍進行響應。

如果在自旋期間發生異常,則上述方法會執行 AbstractQueuedSynchronizer#cancelAcquire 以取消當前結點等待獲取資源的進程,包括設置結點的等待狀態爲 CANCELLED,喚醒後繼結點等。推薦:Java 面試練題寶典

獨佔釋放資源

針對獨佔模式釋放資源,AbstractQueuedSynchronizer 定義了單一實現,即 AbstractQueuedSynchronizer#release 方法,該方法本質上是一個調度的過程,具體釋放資源的操作交由 tryRelease 方法完成,由子類實現。方法 AbstractQueuedSynchronizer#release 實現如下:

 1public final boolean release(int arg) {
 2    // 嘗試釋放資源
 3    if (this.tryRelease(arg)) {
 4        Node h = head;
 5        // 如果釋放資源成功,則嘗試喚醒後繼結點
 6        if (h != null && h.waitStatus != 0) {
 7            this.unparkSuccessor(h);
 8        }
 9        return true;
10    }
11    return false;
12}
13
14

如果 tryRelease 釋放資源成功,則上述方法會嘗試喚醒同步隊列中由後往前距離頭結點最近的一個結點上的線程。方法 AbstractQueuedSynchronizer#unparkSuccessor 的實現如下:

 1private void unparkSuccessor(Node node) {
 2    // 獲取當前結點狀態
 3    int ws = node.waitStatus;
 4    if (ws < 0) {
 5        // 如果當前結點未被取消,則基於 CAS 更新結點等待狀態爲 0
 6        compareAndSetWaitStatus(node, ws, 0);
 7    }
 8
 9    /*
10     * Thread to unpark is held in successor, which is normally just the next node.
11     * But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.
12     */
13    Node s = node.next; // 獲取後繼結點
14    // 如果後繼結點爲 null,或者被取消
15    if (s == null || s.waitStatus > 0) {
16        s = null;
17        // 從後往前尋找距離當前結點最近的一個未被取消的線程結點
18        for (Node t = tail; t != null && t != node; t = t.prev) {
19            if (t.waitStatus <= 0) {
20                s = t;
21            }
22        }
23    }
24    // 喚醒結點上的線程
25    if (s != null) {
26        LockSupport.unpark(s.thread);
27    }
28}
29
30

選舉待喚醒線程結點的過程被設計成從後往前遍歷,尋找距離當前結點最近的未被取消的結點,並調用 LockSupport 工具類喚醒結點上的線程。

那 爲什麼要設計成從後往前遍歷同步隊列呢 ?在 Doug Lea 大神的論文 The java.util.concurrent Synchronizer Framework 中給出了答案,摘錄如下:

An AbstractQueuedSynchronizer queue node contains a next link to its successor. But because there are no applicable techniques for lock-free atomic insertion of double-linked listnodes using compareAndSet, this link is not atomically set as part of insertion; it is simply assigned: pred.next = node; after the insertion. This is reflected in all usages. The next link is treated only as an optimized path. If a node’s successor does not appear to exist (or appears to be cancelled) via its next field, it is always possible to start at the tail of the list and traverse backwards using the pred field to accurately check if therereally is one.

也就說對於雙向鏈表而言,沒有不加鎖的原子手段可以保證構造雙向指針的線程安全性。回到代碼中,我們回顧一下往同步隊列中添加結點的執行過程,如下(其中 pred 是末尾結點,而 node 是待插入的結點):

1node.prev = pred;
2if (this.compareAndSetTail(pred, node)) {
3    pred.next = node;
4    return node;
5}
6
7

上述方法會將 node 結點的 prev 指針指向 pred 結點,而將 pred 的 next 指針指向 node 結點的過程需要建立在基於 CAS 成功將 node 設置爲末端結點的基礎之上,如果這一過程失敗則 next 指針將會斷掉,而選擇從後往前遍歷則始終能夠保證遍歷到頭結點。

共享獲取資源

針對共享模式獲取資源,AbstractQueuedSynchronizer 同樣定義了多個版本的 acquire 方法實現,包括:acquireShared、acquireSharedInterruptibly,以及 tryAcquireSharedNanos,其中 acquireSharedInterruptibly 是 acquireShared 的中斷版本,在等待獲取資源期間支持響應中斷請求,tryAcquireSharedNanos 除了支持響應中斷以外,還引入了超時等待機制。下面同樣主要分析一下 AbstractQueuedSynchronizer#acquireShared 的實現,理解了該方法的實現機制,也就自然而然理解了另外兩個版本的實現機制。

方法 AbstractQueuedSynchronizer#acquireShared 的實現如下:

 1public final void acquireShared(int arg) {
 2    // 返回負數表示獲取資源失敗
 3    if (this.tryAcquireShared(arg) < 0) {
 4        // 將當前線程添加到條件隊列,基於自旋等待獲取資源
 5        this.doAcquireShared(arg);
 6    }
 7}
 8
 9private void doAcquireShared(int arg) {
10    // 將當前線程加入條件隊列末端,並標記爲共享模式
11    final Node node = this.addWaiter(Node.SHARED);
12    boolean failed = true;
13    try {
14        boolean interrupted = false; // 標記自旋過程中是否被中斷
15        for (; ; ) {
16            // 獲取前驅結點
17            final Node p = node.predecessor();
18            // 如果前驅結點爲頭結點,說明當前結點是排在同步隊列最前面,可以嘗試獲取資源
19            if (p == head) {
20                // 嘗試獲取資源
21                int r = this.tryAcquireShared(arg);
22                if (r >= 0) {
23                    // 獲取資源成功,設置自己爲頭結點,並嘗試喚醒後繼結點
24                    this.setHeadAndPropagate(node, r);
25                    p.next = null; // help GC
26                    if (interrupted) {
27                        selfInterrupt();
28                    }
29                    failed = false;
30                    return;
31                }
32            }
33            // 如果還未輪到當前結點,或者獲取資源失敗
34            if (shouldParkAfterFailedAcquire(p, node) // 判斷是否需要阻塞當前線程
35                    && this.parkAndCheckInterrupt()) { // 如果需要,則進入阻塞狀態,並在甦醒時檢查中斷狀態
36                // 標識等待期間被中斷
37                interrupted = true;
38            }
39        }
40    } finally {
41        // 嘗試獲取資源失敗,說明執行異常,取消當前結點獲取資源的進程
42        if (failed) {
43            this.cancelAcquire(node);
44        }
45    }
46}
47
48

上述方法與 AbstractQueuedSynchronizer#acquire 的實現邏輯大同小異,區別在於線程在被封裝成結點之後,是以共享(SHARED)模式在同步隊列中進行等待。這裏我們重點關注一下 AbstractQueuedSynchronizer#setHeadAndPropagate 方法的實現,當結點上的線程成功獲取到資源會觸發執行該方法,以嘗試喚醒後繼結點。實現如下:

 1private void setHeadAndPropagate(Node node, int propagate) {
 2    Node h = head; // 記錄之前的頭結點
 3    this.setHead(node); // 頭結點一般記錄持有資源的線程結點
 4    /*
 5     * 如果滿足以下條件,嘗試喚醒後繼結點:
 6     *
 7     * 1. 存在剩餘可用的資源;
 8     * 2. 後繼結點處於等待狀態,或後繼結點爲空
 9     *
10     * Try to signal next queued node if:
11     *   Propagation was indicated by caller,
12     *   or was recorded (as h.waitStatus either before or after setHead) by a previous operation
13     *   (note: this uses sign-check of waitStatus because PROPAGATE status may transition to SIGNAL.)
14     * and
15     *   The next node is waiting in shared mode,
16     *   or we don't know, because it appears null
17     *
18     * The conservatism in both of these checks may cause unnecessary wake-ups,
19     * but only when there are multiple racing acquires/releases, so most need signals now or soon anyway.
20     */
21    if (propagate > 0 // 存在剩餘可用的資源
22            || h == null || h.waitStatus < 0 // 此時 h 是之前的頭結點
23            || (h = head) == null || h.waitStatus < 0) { // 此時 h 已經更新爲當前頭結點
24        Node s = node.next;
25        // 如果後繼結點以共享模式在等待,或者後繼結點未知,則嘗試喚醒後繼結點
26        if (s == null || s.isShared()) {
27            this.doReleaseShared();
28        }
29    }
30}
31
32

因爲當前結點已經獲取到資源,所以需要將當前結點記錄到頭結點中。此外,如果滿足以下 2 種情況之一,還需要喚醒後繼結點:

  1. 參數 propagate > 0,即存在可用的剩餘資源;

  2. 前任頭結點或當前頭結點不存在,或指明後繼結點需要被喚醒。

如果滿足上述條件之一,且後繼結點狀態未知或以共享模式在等待,則調用 AbstractQueuedSynchronizer#doReleaseShared 方法喚醒後繼結點,關於該方法的實現留到下一小節進行分析。

共享釋放資源

針對共享模式釋放資源,AbstractQueuedSynchronizer 同樣定義了單一實現,即 AbstractQueuedSynchronizer#releaseShared 方法,該方法本質上也是一個調度的過程,具體釋放資源的操作交由 tryReleaseShared 方法完成,由子類實現。方法 AbstractQueuedSynchronizer#releaseShared 實現如下:

 1public final boolean releaseShared(int arg) {
 2    // 嘗試釋放資源
 3    if (this.tryReleaseShared(arg)) {
 4        // 釋放資源成功,喚醒後繼結點
 5        this.doReleaseShared();
 6        return true;
 7    }
 8    return false;
 9}
10
11private void doReleaseShared() {
12    /*
13     * Ensure that a release propagates, even if there are other in-progress acquires/releases.
14     * This proceeds in the usual way of trying to unparkSuccessor of head if it needs signal.
15     * But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues.
16     * Additionally, we must loop in case a new node is added while we are doing this.
17     * Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking.
18     */
19    for (; ; ) {
20        Node h = head;
21        if (h != null && h != tail) {
22            int ws = h.waitStatus;
23            // 如果頭結點狀態爲 SIGNAL,則在喚醒後繼結點之前嘗試清除當前結點的狀態
24            if (ws == Node.SIGNAL) {
25                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
26                    // loop to recheck cases
27                    continue;
28                }
29                // 喚醒後繼結點
30                this.unparkSuccessor(h);
31            }
32            /*
33             * 如果後繼結點暫時不需要被喚醒,則基於 CAS 嘗試將目標結點的 waitStatus 由 0 修改爲 PROPAGATE,
34             * 以保證後續由喚醒通知到來時,能夠將通知傳遞下去
35             */
36            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
37                // loop on failed CAS
38                continue;
39            }
40        }
41        // 如果頭結點未變更,則說明期間持有鎖的線程未發生變化,能夠走到這一步說明前面的操作已經成功完成
42        if (h == head) {
43            break;
44        }
45        // 如果頭結點發生變更,則說明期間持有鎖的線程發生了變化,需要重試以保證喚醒動作的成功執行
46    }
47}
48
49

如果釋放資源成功,需要依據頭結點當下等待狀態分別處理:

  1. 如果頭結點的等待狀態爲 SIGNAL,則表明後繼結點需要被喚醒,在執行喚醒操作之前需要清除等待狀態。

  2. 如果頭結點狀態爲 0,則表示後繼結點不需要被喚醒,此時需要將結點狀態修改爲 PROPAGATE,以保證後續接收到喚醒通知時能夠將通知傳遞下去。

總結

本文我們分析了 AQS 的設計與實現。理解了 AQS 的運行機制也就理解了 java 的 Lock 鎖是如何實現線程的阻塞、喚醒、等待和通知機制的,所以理解 AQS 也是我們後面分析 Lock 鎖和同步器實現的基礎。

從下一篇開始,我們將介紹 JUC 中基於 AQS 實現的組件,包括 ReentrantLock、ReentrantReadWriteLock、CountDownLatch,以及 Semaphore 等,去分析 AQS 中定義的模板方法是如何在這些組件中進行實現的。

參考

  1. JDK 1.8 源碼

  2. The java.util.concurrent Synchronizer Framework

  3. 知乎:Java AQS unparkSuccessor 方法中 for 循環從 tail 開始而不是 head 的疑問?

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/DM60_avZYjnTwrm71gJY1g