源碼級深挖 AQS 隊列同步器

我們知道,在 java 中提供了兩類鎖的實現,一種是在 jvm 層級上實現的synchrinized隱式鎖,另一類是 jdk 在代碼層級實現的,juc 包下的Lock顯示鎖,而提到Lock就不得不提一下它的核心隊列同步器AQS)了,它的全稱是AbstractQueuedSynchronizer,是用來構建鎖或者其他一些同步組件的基礎,除了ReentrantLockReentrantReadWriteLock外,它還在CountDownLatchSemaphore以及ThreadPoolExecutor中被使用,通過理解隊列同步器的工作原理,對我們瞭解和使用這些工具類會有很大的幫助。

1、AQS 基礎

爲了便於理解 AQS 的概念,這裏首先摘錄部分AbstractQueuedSynchronizer的註釋進行了簡要翻譯:

它提供了一個框架,對於依賴先進先出等待隊列的阻塞鎖和同步器(例如信號量和事件),可以用它來實現。這個類的設計,對於大多數依賴於單個原子值來表示狀態(state)的同步器,可以提供有力的基礎。子類需要重寫被 protected 修飾的方法,例如更改狀態(state),定義在獲取或釋放對象時這些狀態表示的含義。基於這些,類中的其他方法實現了隊列和阻塞機制。在子類中可以維護其他的狀態字段,但是隻有使用 getState,setState,compareAndSetState 方法原子更新的狀態值變量,才與同步有關。

子類被推薦定義爲非 public 的內部類,用來實現封閉類的屬性同步。同步器本身沒有實現任何同步接口,它僅僅定義了一些方法,供具體的鎖和同步組件中的 public 方法調用。

隊列同步器支持獨佔模式和共享模式,當一個線程在獨佔模式下獲取時,其他線程不能獲取成功,在共享模式下多線程的獲取可能成功。在不同模式下,等待的線程使用的是相同的先進先出隊列。通常,實現子類只支持其中的一種模式,但是在 ReadWriteLock 中兩者都可以發揮作用。只支持一種模式的子類在實現時不需要重寫另一種模式中的方法。

閱讀這些註釋,可以知道AbstractQueuedSynchronizer是一個抽象類,它基於內部先進先出(FIFO)的雙向隊列、以及內置的一些protected方法來實現同步器,完成同步狀態的管理,並且我們可以通過子類繼承 AQS 抽象類的方式,在共享模式或獨佔模式下,實現自定義的同步組件。

通過上面的描述,可以看出 AQS 中的兩大核心是同步狀態和雙向的同步隊列,來看一下源碼中是如何對它們進行定義的:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
 static final class Node {
  volatile int waitStatus;
  volatile Node prev;
  volatile Node next;
  volatile Thread thread;
  //...
 }
 private transient volatile Node head;
 private transient volatile Node tail;
 private volatile int state;
 //...
}

下面針對這兩個核心內容分別進行研究。

同步隊列

AQS 內部靜態類Node用於表示同步隊列中的節點,變量表示的意義如下:

每個節點的prevnext指針在加入隊列的時候進行賦值,通過這些指針就形成了一個雙向列表,另外 AQS 還保存了同步隊列的頭節點head和尾節點tail,通過這樣的結構,就能夠通過頭節點或尾節點,找到隊列中的任何一個節點。使用圖來表示同步隊列的結構如下:

另外可以看到,在源碼中爲了保證可見性,同步器中的headtailstate,以及節點中的prevnext屬性都加了關鍵字volatile修飾。

同步狀態

AQS 的另一核心同步狀態,在代碼中是使用int類型的變量state來表示的,通過原子操作修改同步狀態的值,來實現對同步組件的狀態進行修改。在子類中,主要通過 AQS 提供的下面 3 個方法對同步狀態的訪問和轉換進行操作:

線程會試圖修改state的值,如果修改成功那麼表示線程得到或釋放了同步狀態,如果失敗就會將當前線程封裝成一個Node節點,然後將其加入到同步隊列中,並阻塞當前線程。

設計思想

AQS 的設計使用了模板方法的設計模式,模板方法一般在父類中封裝不變的部分(如算法骨架),把擴展的可變部分交給子類進行擴展,子類的執行結果會影響父類的結果,是一種反向的控制結構。AQS 中應用了這種設計模式,將一部分方法交給子類進行重寫,而自定義的同步組件在調用同步器提供的模板方法(父類中的方法)時,又會調用子類重寫的方法。

以 AQS 類中常用於獲取鎖的acquire方法爲例,它的代碼如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire方法被final修飾,不可以在子類中重寫,因爲它是對外提供的模板方法,有相對具體和固定的執行邏輯。在acquire方法中調用了tryAcquire方法:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

可以看到帶有protected修飾的tryAcquire方法是一個空殼方法,並沒有定義實際獲取同步狀態的邏輯,這就需要我們在繼承 AQS 的子類中對齊進行重寫,從而達到擴展目的。在重寫過程中,就會用到上面提到的獲取和修改同步狀態的 3 個方法getStatesetStatecompareAndSetState

ReentrantLock中的方法調用爲例,當調用ReentrantLock中的lock方法時,會調用繼承了 AQS 的內部類Sync的父類中的acquire方法,acquire方法再調用子類SynctryAcquire方法並返回boolean類型結果。

除了tryAcquire方法外,子類中還提供了其他可以重寫的方法,列出如下:

而我們在實現自定義的同步組件時,可以直接調用 AQS 提供的下面這些模板方法:

從模板方法中可以看出,大多方法都是獨佔模式和共享模式對稱出現的,除去查詢等待線程方法外,可以將他們分爲兩類:獨佔式獲取或釋放同步狀態、共享式獲取或釋放同步狀態,並且它們的核心都是acquirerelease方法,其他方法只是在它們實現的基礎上做了部分的邏輯改動,增加了中斷和超時功能的支持。下面對主要的 4 個方法進行分析。

2、源碼分析

acquire

分析上面acquire方法中源碼的執行流程:

  1. 首先調用tryAcquire嘗試獲取同步狀態,如果獲取成功,那麼直接返回

  2. 如果獲取同步狀態失敗,調用addWaiter方法生成新Node節點並加入同步隊列:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

方法中使用當前線程和等待狀態構造了一個新的Node節點,在同步隊列的隊尾節點不爲空的情況下(說明同步隊列非空),調用compareAndSetTail方法以 CAS 的方式把新節點設置爲同步隊列的隊尾節點。如果隊尾節點爲空或添加新節點失敗,則調用enq方法:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在同步隊列爲空的情況下,會先創建一個新的空節點作爲頭節點,然後通過 CAS 的方式將當前線程創建的Node設爲尾節點。在 for 循環中,只有通過 CAS 將節點插入到隊尾後纔會返回,否則就會重複循環,通過這樣的方式,能夠將併發添加節點的操作變爲串行添加,保證了線程的安全性。這一過程可以使用下圖表示:

  1. 添加新節點完成後,調用acquireQueued方法,嘗試以自旋的方式獲取同步狀態:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

當新添加的Node的前驅節點是同步隊列的頭節點並且嘗試獲取同步狀態成功時,線程將Node設爲頭節點並從自旋中退出,否則調用shouldParkAfterFailedAcquire方法判斷是否需要掛起:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

在該方法中,傳入的第一個Node類型的參數是當前節點的前驅節點,對其等待狀態進行判斷:

當返回爲true時,調用parkAndCheckInterrupt方法:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

在方法內部調用了LockSupportpark方法,阻塞當前線程,並返回當前線程是否被中斷的狀態。

在上面的代碼中,各節點通過自旋的方式檢測自己的前驅節點是否頭節點的過程,可用下圖表示:

  1. 當滿足條件,返回acquire方法後,調用selfInterrupt方法。方法內部使用interrupt方法,喚醒被阻塞的線程,繼續向下執行:
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

最後,使用流程圖的方式總結acquire方法獨佔式獲取鎖的整體流程:

release

acquire方法對應,release方法負責獨佔式釋放同步狀態,流程也相對簡單。在ReentrantLock中,unlock方法就是直接調用的 AQS 的release方法。先來直接看一下它的源碼:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  1. 方法中首先調用子類重寫的tryRelease方法,嘗試釋放當前線程持有的同步狀態,如果成功則向下執行,失敗返回false

  2. 如果同步隊列的頭節點不爲空且等待狀態不爲初始狀態,那麼將調用unparkSuccessor方法喚醒它的後繼節點:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

方法主要實現的功能有:

同步隊列新頭節點的設置過程如下圖所示:

在上面的過程中,採用的是從後向前遍歷尋找未取消節點的方式,這是因爲 AQS 的同步隊列是一個弱一致性的雙向列表,在下面的情況中,存在next指針爲null的情況:

acquireShared

在瞭解了獨佔式獲取同步狀態後,再來看一下共享式獲取同步狀態。在共享模式下,允許多個線程同時獲取到同步狀態,來看一下它的源碼:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

首先調用子類重寫的tryAcquireShared方法,返回值爲int類型,如果值大於等於 0 表示獲取同步狀態成功,那麼直接返回。如果小於 0 表示獲取失敗,執行下面的doAcquireShared方法,將線程放入等待隊列使用自旋嘗試獲取,直到獲取同步狀態成功:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

對上面的代碼進行簡要的解釋:

  1. 調用addWaiter方法,封裝新節點,並以共享模式(Node.SHARED)將節點放入同步隊列中隊尾

  2. for循環中,獲取當前節點的前驅節點,如果前驅節點是同步隊列的頭節點,那麼就以共享模式去嘗試獲取同步狀態,判斷tryAcquireShared的返回值,如果返回值大於等於 0,表示獲取同步狀態成功,修改新的頭節點,並將信息傳播給同步隊列中的後繼節點,然後檢查中斷標誌位,如果線程被阻塞,那麼進行喚醒

  3. 如果前驅節點不是頭節點、或獲取同步狀態失敗時,調用shouldParkAfterFailedAcquire判斷是否需要阻塞,如果需要則調用parkAndCheckInterrupt,在前驅節點的等待狀態爲SIGNAL時,將節點對應的線程阻塞

可以看到,共享式的獲取同步狀態的調用過程和acquire方法非常相似,但不同的是在獲取同步狀態成功後,會調用setHeadAndPropagate方法進行共享式同步狀態的傳播:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

因爲共享式同步狀態是允許多個線程共享的,所以在一個線程獲取到同步狀態後,需要在第一時間通知後繼節點的線程可以嘗試獲取同步資源,這樣就可以避免其他線程阻塞時間過長。在方法中,把當前節點設置爲頭節點後,需要根據情況判斷後繼節點是否需要釋放:

如果滿足上面的任何一種狀態,並且它的後繼節點是SHARED狀態的,則執行doReleaseShared方法釋放後繼節點:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}

doReleaseShared方法不僅在這裏的共享狀態傳播的情況下被調用,還會在後面介紹的共享式釋放同步狀態中被調用。在方法中,當頭節點不爲空且不等於尾節點(意味着沒有後繼節點需要等待喚醒)時:

通過上面的流程,就實現了從頭節點嘗試向後喚醒節點,實現了共享狀態的向後傳播。

releaseShared

最後,再來看一下對應的共享式釋放同步狀態方法:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

releaseShared方法會釋放指定量的資源,如果調用子類重寫的tryReleaseShared方法返回值爲true,表示釋放成功,那麼還是執行上面介紹過的doReleaseShared方法喚醒同步隊列中的等待線程。

3、自定義同步組件

在前面的介紹中說過,在使用 AQS 時,需要定義一個子類繼承AbstractQueuedSynchronizer抽象類,並實現它的抽象方法來管理同步狀態。接下來我們就來手寫一個獨佔式的鎖,按照文檔中的推薦,我們將子類定義爲自定義同步工具類的靜態內部類:

public class MyLock {
    private static class AqsHelper extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            int state = getState();
            if (state == 0) {
                if (compareAndSetState(0, arg)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            } else if (getExclusiveOwnerThread() == Thread.currentThread()) {
                setState(getState() + arg);
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int arg) {
            int state = getState() - arg;
            if (state == 0) {
                setExclusiveOwnerThread(null);
                setState(state);
                return true;
            }
            setState(state);
            return false;
        }
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }
    
    private final AqsHelper aqsHelper = new AqsHelper();
    public void lock() {
        aqsHelper.acquire(1);
    }
    public boolean tryLock() {
        return aqsHelper.tryAcquire(1);
    }
    public void unlock() {
        aqsHelper.release(1);
    }
    public boolean isLocked() {
        return aqsHelper.isHeldExclusively();
    }
}

在 AQS 的子類中,首先重寫了tryAcquire方法,在方法中利用 CAS 來修改state的狀態值,並在修改成功時設置當前線程獨佔資源。並且通過比較嘗試獲取鎖的線程與持有鎖的線程是否相同的方式,來實現了鎖的可重入性。在重寫的tryRelease方法中,進行資源的釋放,如果存在重入的情況,會一直到所有重入鎖釋放完纔會真正的釋放鎖,並放棄佔有狀態。

可以注意到在自定義的鎖工具類中,我們定義了locktryLock兩個方法,分別調用了acquiretryAcquire方法,它們的區別是lock會等待鎖資源,直到成功時纔會返回,而tryLock嘗試獲取鎖時,會立即返回成功或失敗的狀態。

接下來,我們通過下面的測試代碼,驗證自定義的鎖的有效性:

public class Test {
    private MyLock lock=new MyLock();
    private int i=0;
    public void sayHi(){
        try {
            lock.lock();
            System.out.println("i am "+i++);
        }finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        Test test=new Test();
        Thread[] th=new Thread[20];
        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                test.sayHi();
            }).start();
        }
    }
}

運行上面的測試代碼,結果如下,可以看見通過加鎖保證了對變量i的同步訪問控制:

接下來通過下面的例子測試鎖的可重入性:

public class Test2 {
    private MyLock lock=new MyLock();
    public void function1(){
        lock.lock();
        System.out.println("execute function1");
        function2();
        lock.unlock();
    }
    public void function2(){
        lock.lock();
        System.out.println("execute function2");
        lock.unlock();
    }
    public static void main(String[] args) {
        Test2 test2=new Test2();
        new Thread(()->{
            test2.function1();
        }).start();
    }
}

執行上面的代碼,可以看到在function1未釋放鎖的情況下,function2對鎖進行了重入並執行了後續的代碼:

總結

通過上面的學習,我們瞭解了 AQS 的兩大核心同步隊列和同步狀態,並對 AQS 對資源的管理以及隊列狀態的變化有了一定的研究。其實歸根結底,AQS 只是提供給我們來開發同步組件的一個底層框架,在它的層面上,並不關心子類在繼承它時要實現什麼功能,AQS 只是提供了一套維護同步狀態的功能,至於要完成什麼樣的一個工具類,這完全是由我們自己去定義的。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/T9iGjgxxuRujOQt3YSVUHA