1-5w 字,30 圖帶你徹底掌握 AQS!

前言

AQS( AbstractQueuedSynchronizer )是一個用來構建鎖和同步器(所謂同步,是指線程之間的通信、協作)的框架,Lock 包中的各種鎖(如常見的 ReentrantLock, ReadWriteLock), concurrent 包中的各種同步器(如 CountDownLatch, Semaphore, CyclicBarrier)都是基於 AQS 來構建,所以理解 AQS 的實現原理至關重要,AQS 也是面試中區分侯選人的常見考點,我們務必要掌握,本文將用循序漸近地介紹 AQS,相信大家看完一定有收穫。文章目錄如下

  1. 鎖原理 - 信號量 vs 管程

  2. AQS 實現原理

  3. AQS 源碼剖析

  4. 如何利用 AQS 自定義一個互斥鎖

鎖原理 - 信號量 vs 管程

在併發編程領域,有兩大核心問題:互斥同步,互斥即同一時刻只允許一個線程訪問共享資源,同步,即線程之間如何通信、協作,一般這兩大問題可以通過信號量管程來解決。

信號量

信號量(Semaphore)是操作系統提供的一種進程間常見的通信方式,主要用來協調併發程序對共享資源的訪問,操作系統可以保證對信號量操作的原子性。它是怎麼實現的呢。

示意圖如下

信號量機制的引入解決了進程同步和互斥問題,但信號量的大量同步操作分散在各個進程中不便於管理,還有可能導致系統死鎖。如:生產者消費者問題中將 P、V 顛倒可能死鎖(見文末參考鏈接),另外條件越多,需要的信號量就越多,需要更加謹慎地處理信號量之間的處理順序,否則很容易造成死鎖現象。

基於信號量給編程帶來的隱患,於是有了提出了對開發者更加友好的併發編程模型 - 管程

管程

Dijkstra 於 1971 年提出:把所有進程對某一種臨界資源的同步操作都集中起來,構成一個所謂的祕書進程。凡要訪問該臨界資源的進程,都需先報告祕書,由祕書來實現諸進程對同一臨界資源的互斥使用,這種機制就是管程。

管程是一種在信號量機制上進行改進的併發編程模型,解決了信號量在臨界區的 PV 操作上配對的麻煩,把配對的 PV 操作集中在一起而形成的併發編程方法理論,極大降低了使用和理解成本。

管程由四部分組成:

  1. 管程內部的共享變量。

  2. 管程內部的條件變量。

  3. 管程內部並行執行的進程。

  4. 對於局部與管程內部的共享數據設置初始值的語句。

由此可見,管程就是一個對象監視器。任何線程想要訪問該資源(共享變量),就要排隊進入監控範圍。進入之後,接受檢查,不符合條件,則要繼續等待,直到被通知,然後繼續進入監視器。

需要注意的事,信號量和管程兩者是等價的,信號量可以實現管程,管程也可以實現信號量,只是兩者的表現形式不同而已,管程對開發者更加友好。

兩者的區別如下

管程爲了解決信號量在臨界區的 PV 操作上的配對的麻煩,把配對的 PV 操作集中在一起,並且加入了條件變量的概念,使得在多條件下線程間的同步實現變得更加簡單。

怎麼理解管程中的入口等待隊列,共享變量,條件變量等概念,有時候技術上的概念較難理解,我們可以藉助生活中的場景來幫助我們理解,就以我們的就醫場景爲例來簡單說明一下,正常的就醫流程如下:

  1. 病人去掛號後,去侯診室等待叫號

  2. 叫到自己時,就可以進入就診室就診了

  3. 就診時,有兩種情況,一種是醫生很快就確定病人的病,並作出診斷,診斷完成後,就通知下一位病人進來就診,一種是醫生無法確定病因,需要病人去做個驗血 / CT 檢查才能確定病情,於是病人就先去驗個血 /  CT

  4. 病人驗完血 / 做完 CT 後,重新取號,等待叫號(進入入口等待隊列)

  5. 病人等到自己的號, 病人又重新拿着驗血 / CT 報告去找醫生就診

整個流程如下

那麼管程是如何解決互斥同步的呢

首先來看互斥,上文中醫生即共享資源(也即共享變量),就診室即爲臨界區,病人即線程,任何病人如果想要訪問臨界區,必須首先獲取共享資源(即醫生),入口一次只允許一個線程經過,在共享資源被佔有的情況下,如果再有線程想佔有共享資源,就需要到等待隊列去等候,等到獲取共享資源的線程釋放資源後,等待隊列中的線程就可以去競爭共享資源了,這樣就解決了互斥問題,所以本質上管程是通過將共享資源及其對共享資源的操作(線程安全地獲取和釋放)封裝起來來保證互斥性的。

再來看同步,同步是通過文中的條件變量及其等待隊列實現的,同步的實現分兩種情況

  1. 病人進入就診室後,無需做驗血 / CT 等操作,於是醫生診斷完成後,就會釋放共享資源(解鎖)去通知(notify,notifyAll)入口等待隊列的下一個病人,下一個病人聽到叫號後就能看醫生了。

  2. 如果病人進入就診室後需要做驗血 / CT 等操作,會去驗血 / CT 隊列(條件隊列)排隊, 同時釋放共享變量(醫生),通知入口等待隊列的其他病人(線程)去獲取共享變量(醫生),獲得許可的線程執行完臨界區的邏輯後會喚醒條件變量等待隊列中的線程,將它放到入口等待隊列中 ,等到其獲取共享變量(醫生)時,即可進入入口(臨界區)處理。

在 Java 裏,鎖大多是依賴於管程來實現的,以大家熟悉的內置鎖 synchronized 爲例,它的實現原理如下。

可以看到 synchronized 鎖也是基於管程實現的,只不過它只有且只有一個條件變量(就是鎖對象本身)而已,這也是爲什麼 JDK 要實現 Lock 鎖的原因之一,Lock 支持多個條件變量。

通過這樣的類比,相信大家對管程的工作機制有了比較清晰的認識,爲啥要花這麼大的力氣介紹管程呢,一來管程是解決併發問題的萬能鑰匙,二來 AQS 是基於 Java 併發包中管程的一種實現,所以理解管程對我們理解 AQS 會大有幫助,接下來我們就來看看 AQS 是如何工作的。

AQS 實現原理

AQS 全稱是 AbstractQueuedSynchronizer,是一個用來構建同步器的框架,它維護了一個共享資源 state 和一個 FIFO 的等待隊列(即上文中管程的入口等待隊列),底層利用了 CAS 機制來保證操作的原子性。

AQS 實現鎖的主要原理如下:

以實現獨佔鎖爲例(即當前資源只能被一個線程佔有),其實現原理如下:state 初始化 0,在多線程條件下,線程要執行臨界區的代碼,必須首先獲取 state,某個線程獲取成功之後, state 加 1,其他線程再獲取的話由於共享資源已被佔用,所以會到 FIFO 等待隊列去等待,等佔有 state 的線程執行完臨界區的代碼釋放資源 (state 減 1) 後,會喚醒 FIFO 中的下一個等待線程(head 中的下一個結點)去獲取 state。

state 由於是多線程共享變量,所以必須定義成 volatile,以保證 state 的可見性, 同時雖然 volatile 能保證可見性,但不能保證原子性,所以 AQS 提供了對 state 的原子操作方法,保證了線程安全。

另外 AQS 中實現的 FIFO 隊列(CLH 隊列)其實是雙向鏈表實現的,由 head, tail 節點表示,head 結點代表當前佔用的線程,其他節點由於暫時獲取不到鎖所以依次排隊等待鎖釋放。

所以我們不難明白 AQS 的如下定義:

public abstract class AbstractQueuedSynchronizer
  extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // 以下爲雙向鏈表的首尾結點,代表入口等待隊列
    private transient volatile Node head;
    private transient volatile Node tail;
    // 共享變量 state
    private volatile int state;
    // cas 獲取 / 釋放 state,保證線程安全地獲取鎖
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    // ...
 }

AQS 源碼剖析

ReentrantLock 是我們比較常用的一種鎖,也是基於 AQS 實現的,所以接下來我們就來分析一下 ReentrantLock 鎖的實現來一探 AQS 究竟。本文將會採用圖文並茂的方式讓大家理解 AQS 的實現原理,大家在學習過程中,可以多類比一下上文中就診的例子,相信會有助於理解。

首先我們要知道 ReentrantLock 是獨佔鎖,也有公平和非公平兩種鎖模式,什麼是獨佔與有共享模式,什麼又是公平鎖與非公平鎖

與獨佔鎖對應的是共享鎖,這兩者有什麼區別呢

獨佔鎖:即其他線程只有在佔有鎖的線程釋放後才能競爭鎖,有且只有一個線程能競爭成功(醫生只有一個,一次只能看一個病人)

共享鎖:即共享資源可以被多個線程同時佔有,直到共享資源被佔用完畢(多個醫生,可以看多個病人),常見的有讀寫鎖 ReadWriteLock, CountdownLatch,兩者的區別如下

什麼是公平鎖與非公平鎖

還是以就醫爲例,所謂公平鎖即大家取號後老老實實按照先來後到的順序在侯診室依次等待叫號,如果是非公平鎖呢,新來的病人(線程)很霸道,不取號排隊 ,直接去搶先看病,佔有醫生(不一定成功)

公平鎖與非公平鎖

本文我們將會重點分析獨佔,非公平模式的源碼實現,不分析共享模式與 Condition 的實現,因爲剖析了獨佔鎖的實現,由於原理都是相似的,再分析共享與 Condition 就不難了。

首先我們先來看下 ReentrantLock 的使用方法

// 1. 初始化可重入鎖
private ReentrantLock lock = new ReentrantLock();
public void run() {
    // 加鎖
    lock.lock();
    try {
        // 2. 執行臨界區代碼
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 3. 解鎖
        lock.unlock();
    }
}

第一步是初始化可重入鎖,可以看到默認使用的是非公平鎖機制

public ReentrantLock() {
    sync = new NonfairSync();
}

當然你也可以用如下構造方法來指定使用公平鎖:

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

畫外音: FairSync 和 NonfairSync 是 ReentrantLock 實現的內部類,分別指公平和非公平模式,ReentrantLock ReentrantLock 的加鎖(lock),解鎖(unlock)在內部具體都是調用的 FairSync,NonfairSync 的加鎖和解鎖方法。

幾個類的關係如下:

我們先來剖析下非公平鎖(NonfairSync)的實現方式,來看上述示例代碼的第二步:加鎖,由於默認的是非公平鎖的加鎖,所以我們來分析下非公平鎖是如何加鎖的

可以看到 lock 方法主要有兩步

  1. 使用 CAS 來獲取 state 資源,如果成功設置 1,代表 state 資源獲取鎖成功,此時記錄下當前佔用 state 的線程 setExclusiveOwnerThread(Thread.currentThread());

  2. 如果 CAS 設置 state 爲 1 失敗(代表獲取鎖失敗),則執行 acquire(1) 方法,這個方法是 AQS 提供的方法,如下

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire 剖析

首先 調用 tryAcquire 嘗試着獲取 state,如果成功,則跳過後面的步驟。如果失敗,則執行 acquireQueued 將線程加入 CLH 等待隊列中。

先來看下 tryAcquire 方法,這個方法是 AQS 提供的一個模板方法,最終由其 AQS 具體的實現類(Sync)實現,由於執行的是非公平鎖邏輯,執行的代碼如下:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();

    if (c == 0) {
        // 如果 c 等於0,表示此時資源是空閒的(即鎖是釋放的),再用 CAS 獲取鎖
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 此條件表示之前已有線程獲得鎖,且此線程再一次獲得了鎖,獲取資源次數再加 1,這也映證了 ReentrantLock 爲可重入鎖
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

此段代碼可知鎖的獲取主要分兩種情況

  1. state 爲 0 時,代表鎖已經被釋放,可以去獲取,於是使用 CAS 去重新獲取鎖資源,如果獲取成功,則代表競爭鎖成功,使用 setExclusiveOwnerThread(current) 記錄下此時佔有鎖的線程,看到這裏的 CAS,大家應該不難理解爲啥當前實現是非公平鎖了,因爲隊列中的線程與新線程都可以 CAS 獲取鎖啊,新來的線程不需要排隊

  2. 如果 state 不爲 0,代表之前已有線程佔有了鎖,如果此時的線程依然是之前佔有鎖的線程(current == getExclusiveOwnerThread() 爲 true),代表此線程再一次佔有了鎖(可重入鎖),此時更新 state,記錄下鎖被佔有的次數(鎖的重入次數), 這裏的 setState 方法不需要使用 CAS 更新,因爲此時的鎖就是當前線程佔有的,其他線程沒有機會進入這段代碼執行。所以此時更新 state 是線程安全的。

假設當前 state = 0,即鎖不被佔用,現在有 T1, T2, T3 這三個線程要去競爭鎖

假設現在 T1 獲取鎖成功,則兩種情況分別爲 1、 T1 首次獲取鎖成功

2、 T1 再次獲取鎖成功,state 再加 1,表示鎖被重入了兩次,當前如果 T1 一直申請佔用鎖成功,state 會一直累加

acquireQueued 剖析

如果 tryAcquire(arg) 執行失敗,代表獲取鎖失敗,則執行 acquireQueued 方法,將線程加入 FIFO 等待隊列

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

所以接下來我們來看看 acquireQueued 的執行邏輯,首先會調用 addWaiter(Node.EXCLUSIVE)  將包含有當前線程的 Node 節點入隊, Node.EXCLUSIVE 代表此結點爲獨佔模式

再來看下 addWaiter 是如何實現的

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 如果尾結點不爲空,則用 CAS 將獲取鎖失敗的線程入隊
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果結點爲空,執行 enq 方法
    enq(node);
    return node;
}

這段邏輯比較清楚,首先是獲取 FIFO 隊列的尾結點,如果尾結點存在,則採用 CAS 的方式將等待線程入隊,如果尾結點爲空則執行 enq 方法

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // 尾結點爲空,說明 FIFO 隊列未初始化,所以先初始化其頭結點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 尾結點不爲空,則將等待線程入隊
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

首先判斷 tail 是否爲空,如果爲空說明 FIFO 隊列的 head,tail 還未構建,此時先構建頭結點,構建之後再用 CAS 的方式將此線程結點入隊

使用 CAS 創建 head 節點的時候只是簡單調用了 new Node() 方法,並不像其他節點那樣記錄 thread,這是爲啥

因爲 head 結點爲虛結點,它只代表當前有線程佔用了 state,至於佔用 state 的是哪個線程,其實是調用了上文的 setExclusiveOwnerThread(current) ,即記錄在 exclusiveOwnerThread 屬性裏。

執行完 addWaiter 後,線程入隊成功,現在就要看最後一個最關鍵的方法 acquireQueued 了,這個方法有點難以理解,先不急,我們先用三個線程來模擬一下之前的代碼對應的步驟

1、假設 T1 獲取鎖成功,由於此時 FIFO 未初始化,所以先創建 head 結點

2、此時 T2 或 T3 再去競爭 state 失敗,入隊,如下圖:

好了,現在問題來了, T2,T3 入隊後怎麼處理,是馬上阻塞嗎,馬上阻塞意味着線程從運行態轉爲阻塞態 ,涉及到用戶態向內核態的切換,而且喚醒後也要從內核態轉爲用戶態,開銷相對比較大,所以 AQS 對這種入隊的線程採用的方式是讓它們自旋來競爭鎖,如下圖示

不過聰明的你可能發現了一個問題,如果當前鎖是獨佔鎖,如果鎖一直被被 T1 佔有, T2,T3 一直自旋沒太大意義,反而會佔用 CPU,影響性能,所以更合適的方式是它們自旋一兩次競爭不到鎖後識趣地阻塞以等待前置節點釋放鎖後再來喚醒它。

另外如果鎖在自旋過程中被中斷了,或者自旋超時了,應該處於「取消」狀態。

基於每個 Node 可能所處的狀態,AQS 爲其定義了一個變量 waitStatus,根據這個變量值對相應節點進行相關的操作,我們一起來看這看這個變量都有哪些值,是時候看一個 Node 結點的屬性定義了

static final class Node {
    static final Node SHARED = new Node();//標識等待節點處於共享模式
    static final Node EXCLUSIVE = null;//標識等待節點處於獨佔模式

    static final int CANCELLED = 1; //由於超時或中斷,節點已被取消
    static final int SIGNAL = -1;  // 節點阻塞(park)必須在其前驅結點爲 SIGNAL 的狀態下才能進行,如果結點爲 SIGNAL,則其釋放鎖或取消後,可以通過 unpark 喚醒下一個節點,
    static final int CONDITION = -2;//表示線程在等待條件變量(先獲取鎖,加入到條件等待隊列,然後釋放鎖,等待條件變量滿足條件;只有重新獲取鎖之後才能返回)
    static final int PROPAGATE = -3;//表示後續結點會傳播喚醒的操作,共享模式下起作用

    //等待狀態:對於condition節點,初始化爲CONDITION;其它情況,默認爲0,通過CAS操作原子更新
    volatile int waitStatus;

通過狀態的定義,我們可以猜測一下 AQS 對線程自旋的處理:如果當前節點的上一個節點不爲 head,且它的狀態爲 SIGNAL,則結點進入阻塞狀態。來看下代碼以映證我們的猜測:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果前一個節點是 head,則嘗試自旋獲取鎖
            if (p == head && tryAcquire(arg)) {
                //  將 head 結點指向當前節點,原 head 結點出隊
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果前一個節點不是 head 或者競爭鎖失敗,則進入阻塞狀態
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 如果線程自旋中因爲異常等原因獲取鎖最終失敗,則調用此方法
            cancelAcquire(node);
    }
}

先來看第一種情況,如果當前結點的前一個節點是 head 結點,且獲取鎖(tryAcquire)成功的處理

可以看到主要的處理就是把 head 指向當前節點,並且讓原 head 結點出隊,這樣由於原 head 不可達, 會被垃圾回收。

注意其中 setHead 的處理

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

將 head 設置成當前結點後,要把節點的 thread, pre 設置成 null,因爲之前分析過了,head 是虛節點,不保存除 waitStatus(結點狀態)的其他信息,所以這裏把 thread ,pre 置爲空,因爲佔有鎖的線程由 exclusiveThread 記錄了,如果 head 再記錄 thread 不僅多此一舉,反而在釋放鎖的時候要多操作一遍 head 的 thread 釋放。

如果前一個節點不是 head 或者競爭鎖失敗,則首先調用  shouldParkAfterFailedAcquire 方法判斷鎖是否應該停止自旋進入阻塞狀態:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
        
    if (ws == Node.SIGNAL)
       // 1. 如果前置頂點的狀態爲 SIGNAL,表示當前節點可以阻塞了
        return true;
    if (ws > 0) {
        // 2. 移除取消狀態的結點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 3. 如果前置節點的 ws 不爲 0,則其設置爲 SIGNAL,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

這一段代碼有點繞,需要稍微動點腦子,按以上步驟一步步來看

1、 首先我們要明白,根據之前 Node 類的註釋,如果前驅節點爲 SIGNAL,則當前節點可以進入阻塞狀態。

如圖示:T2,T3 的前驅節點的 waitStatus 都爲 SIGNAL,所以 T2,T3 此時都可以阻塞。

2、如果前驅節點爲取消狀態,則前驅節點需要移除,這些採用了一個更巧妙的方法,把所有當前節點之前所有 waitStatus 爲取消狀態的節點全部移除,假設有四個線程如下,T2,T3 爲取消狀態,則執行邏輯後如下圖所示,T2, T3 節點會被 GC。

3、如果前驅節點小於等於 0,則需要首先將其前驅節點置爲 SIGNAL, 因爲前文我們分析過,當前節點進入阻塞的一個條件是前驅節點必須爲 SIGNAL,這樣下一次自旋後發現前驅節點爲 SIGNAL,就會返回 true(即步驟 1)

shouldParkAfterFailedAcquire 返回 true 代表線程可以進入阻塞中斷,那麼下一步 parkAndCheckInterrupt 就該讓線程阻塞了

private final boolean parkAndCheckInterrupt() {
    // 阻塞線程
    LockSupport.park(this);
    // 返回線程是否中斷過,並且清除中斷狀態(在獲得鎖後會補一次中斷)
    return Thread.interrupted();
}

這裏的阻塞線程很容易理解,但爲啥要判斷線程是否中斷過呢,因爲如果線程在阻塞期間收到了中斷,喚醒(轉爲運行態)獲取鎖後(acquireQueued 爲 true)需要補一箇中斷,如下所示:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果是因爲中斷喚醒的線程,獲取鎖後需要補一下中斷
        selfInterrupt();
}

至此,獲取鎖的流程已經分析完畢,不過還有一個疑惑我們還沒解開:前文不是說 Node 狀態爲取消狀態會被取消嗎,那 Node 什麼時候會被設置爲取消狀態呢。

回頭看 acquireQueued

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 省略自旋獲取鎖代碼        
    } finally {
        if (failed)
            // 如果線程自旋中因爲異常等原因獲取鎖最終失敗,則調用此方法
            cancelAcquire(node);
    }
}

看最後一個 cancelAcquire 方法,如果線程自旋中因爲異常等原因獲取鎖最終失敗,則會調用此方法

private void cancelAcquire(Node node) {
    // 如果節點爲空,直接返回
    if (node == null)
        return;
    // 由於線程要被取消了,所以將 thread 線程清掉
    node.thread = null;

    // 下面這步表示將 node 的 pre 指向之前第一個非取消狀態的結點(即跳過所有取消狀態的結點),waitStatus > 0 表示當前結點狀態爲取消狀態
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // 獲取經過過濾後的 pre 的 next 結點,這一步主要用在後面的 CAS 設置 pre 的 next 節點上
    Node predNext = pred.next;
    // 將當前結點設置爲取消狀態
    node.waitStatus = Node.CANCELLED;

    // 如果當前取消結點爲尾結點,使用 CAS 則將尾結點設置爲其前驅節點,如果設置成功,則尾結點的 next 指針設置爲空
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
    // 這一步看得有點繞,我們想想,如果當前節點取消了,那是不是要把當前節點的前驅節點指向當前節點的後繼節點,但是我們之前也說了,要喚醒或阻塞結點,須在其前驅節點的狀態爲 SIGNAL 的條件才能操作,所以在設置 pre 的 next 節點時要保證 pre 結點的狀態爲 SIGNAL,想通了這一點相信你不難理解以下代碼。
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
        // 如果 pre 爲 head,或者  pre 的狀態設置 SIGNAL 失敗,則直接喚醒後繼結點去競爭鎖,之前我們說過, SIGNAL 的結點取消(或釋放鎖)後可以喚醒後繼結點
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

這一段代碼有點繞,我們一個個來看,首先考慮以下情況

1、首先第一步當前節點之前有取消結點時,則邏輯如下

2、如果當前結點既非頭結點的後繼結點,也非尾結點,即步驟 1 所示,則最終結果如下

這裏肯定有人問,這種情況下當前節點與它的前驅結點無法被 GC 啊,還記得我們上文分析鎖自旋時的處理嗎, 再看下以下代碼

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 省略無關代碼
    if (ws > 0) {
        // 移除取消狀態的結點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } 
    return false;
}

這段代碼會將 node 的 pre 指向之前 waitStatus 爲非 CANCEL 的節點

所以當 T4 執行這段代碼時,會變成如下情況

可以看到此時中間的兩個 CANCEL 節點不可達了,會被 GC

3、如果當前結點爲 tail 結點,則結果如下,這種情況下當前結點不可達,會被 GC

4、如果當前結點爲 head 的後繼結點時,如下

結果中的 CANCEL 結點同樣會在 tail 結點自旋調用 shouldParkAfterFailedAcquire 後不可達,如下

自此我們終於分析完了鎖的獲取流程,接下來我們來看看鎖是如何釋放的。

鎖釋放

不管是公平鎖還是非公平鎖,最終都是調的 AQS 的如下模板方法來釋放鎖

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
    // 鎖釋放是否成功
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease 方法定義在了 AQS 的子類 Sync 方法裏

// java.util.concurrent.locks.ReentrantLock.Sync

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 只有持有鎖的線程才能釋放鎖,所以如果當前鎖不是持有鎖的線程,則拋異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 說明線程持有的鎖全部釋放了,需要釋放 exclusiveOwnerThread 的持有線程
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

鎖釋放成功後該幹嘛,顯然是喚醒之後 head 之後節點,讓它來競爭鎖

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
    // 鎖釋放是否成功
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 鎖釋放成功後,喚醒 head 之後的節點,讓它來競爭鎖
            unparkSuccessor(h);
        return true;
    }
    return false;
}

這裏釋放鎖的條件爲啥是 h != null && h.waitStatus != 0 呢。

  1. 如果 h == null, 這有兩種可能,一種是一個線程在競爭鎖,現在它釋放了,當然沒有所謂的喚醒後繼節點,一種是其他線程正在運行競爭鎖,只是還未初始化頭節點,既然其他線程正在運行,也就無需執行喚醒操作

  2. 如果 h != null 且 h.waitStatus == 0,說明 head 的後繼節點正在自旋競爭鎖,也就是說線程是運行狀態的,無需喚醒。

  3. 如果 h != null 且 h.waitStatus < 0, 此時 waitStatus 值可能爲 SIGNAL,或 PROPAGATE,這兩種情況說明後繼結點阻塞需要被喚醒

來看一下喚醒方法 unparkSuccessor:

private void unparkSuccessor(Node node) {
    // 獲取 head 的 waitStatus(假設其爲 SIGNAL),並用 CAS 將其置爲 0,爲啥要做這一步呢,之前我們分析過多次,其實 waitStatus = SIGNAL(< -1)或 PROPAGATE(-·3) 只是一個標誌,代表在此狀態下,後繼節點可以喚醒,既然正在喚醒後繼節點,自然可以將其重置爲 0,當然如果失敗了也不影響其喚醒後繼結點
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 以下操作爲獲取隊列第一個非取消狀態的結點,並將其喚醒
    Node s = node.next;
    // s 狀態爲非空,或者其爲取消狀態,說明 s 是無效節點,此時需要執行 if 裏的邏輯
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 以下操作爲從尾向前獲取最後一個非取消狀態的結點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

這裏的尋找隊列的第一個非取消狀態的節點爲啥要從後往前找呢,因爲節點入隊並不是原子操作,如下

線程自旋時時是先執行 node.pre = pred, 然後再執行 pred.next = node,如果 unparkSuccessor 剛好在這兩者之間執行,此時是找不到  head 的後繼節點的,如下

如何利用 AQS 自定義一個互斥鎖

AQS 通過提供 state 及 FIFO 隊列的管理,爲我們提供了一套通用的實現鎖的底層方法,基本上定義一個鎖,都是轉爲在其內部定義 AQS 的子類,調用 AQS 的底層方法來實現的,由於 AQS 在底層已經爲了定義好了這些獲取 state 及 FIFO 隊列的管理工作,我們要實現一個鎖就比較簡單了,我們可以基於 AQS 來實現一個非可重入的互斥鎖,如下所示

public class Mutex  {

    private Sync sync = new Sync();
    
    public void lock () {
        sync.acquire(1);
    }
    
    public void unlock () {
        sync.release(1);
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire (int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease (int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively () {
            return getState() == 1;
        }
    }
}

可以看到區區幾行代碼就實現了,確實很方便。

總結

本文通過圖文並茂的方式幫助大家梳理了一遍 AQS 的實現方式,相信大家看完對 AQS 應該有了比較深入的認識,首先要明白鎖的實現原理,信號量及管程,理解了管程的設計思路對 AQS 有了一個概念上的認識,再去讀源碼就會用管程的概念去套,也就更容易理解了,另外大家可以多類比一下生活中的場景,如就醫場景,通過類似的方式學習能讓我們更好地理解相關技術的設計思路。

最後歡迎大家加我好友,拉你進技術交流羣,一起抱團取暖

巨人的肩膀

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/bdLT9CHZI6Ng6m9FE75YDw