16 圖,一個 state 竟然搞出了這麼多併發鎖
上篇文章扔掉源碼,15 張圖帶你徹底理解 java AQS 通過 15 張圖講解了 AQS 管程模型中入口等待隊列原理。AQS 使用 FIFO 隊列實現了一個鎖相關的併發器模板,可以基於這個模板來實現各種鎖。JDK 建議併發鎖工具類使用內部類實現 AQS 的同步屬性。
今天我們就來聊一聊基於 AQS 實現的各種鎖。
1 ReentrantLock
我們先來看一下 UML 類圖:
從圖中可以看到,ReentrantLock 使用抽象內部類 Sync 來實現了 AQS 的方法,然後基於 Sync 這個同步器實現了公平鎖和非公平鎖。主要實現了下面 3 個方法:
-
tryAcquire(int arg):獲取獨佔鎖
-
tryRelease(int arg):釋放獨佔鎖
-
isHeldExclusively:當前線程是否佔有獨佔鎖
ReentrantLock 默認實現的是非公平鎖,可以在構造函數指定。
從實現的方法可以看到,ReentrantLock 中獲取的鎖是獨佔鎖,我們再來看一下獲取和釋放獨佔鎖的代碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
獨佔鎖的特點是調用上面 acquire 方法,傳入的參數是 1。
1.1 獲取公平鎖
獲取鎖首先判斷同步狀態 (state) 的值。
1.1.1 state 等於 0
這說明沒有線程佔用鎖,當前線程如果符合下面兩個條件,就可以獲取到鎖:
- 沒有前任節點,如下圖:
- CAS 的方式更新 state 值 (把 0 更新成 1) 成功。
如果獲取獨佔鎖成功,會更新 AQS 中 exclusiveOwnerThread 爲當前線程,這個很容易理解。
1.1.2 state 不等於 0
這說明已經有線程佔有鎖,判斷佔有鎖的線程是不是當前線程,如下圖:
state += 1 值如果小於 0,會拋出異常。
如果獲取鎖失敗,則進入 AQS 隊列等待喚醒。
1.2 獲取非公平鎖
跟公平鎖相比,非公平鎖的唯一不同是如果判斷到 state 等於 0,不用判斷有沒有前任節點,只要 CAS 設置 state 值 (把 0 更新成 1) 成功,就獲取到了鎖。
1.3 釋放鎖
公平鎖和非公平鎖,釋放邏輯完全一樣,都是在內部類 Sync 中實現的。釋放鎖需要注意兩點,如下圖:
爲什麼 state 會大於 1,因爲是可以重入的,佔有鎖的線程可以多次獲取鎖。
1.4 總結
公平鎖的特點是每個線程都要進行排隊,不用擔心線程永遠獲取不到鎖,但有個缺點是每個線程入隊後都需要阻塞和被喚醒,這一定程度上影響了效率。非公平鎖的特點是每個線程入隊前都會先嚐試獲取鎖,如果獲取成功就不會入隊了,這比公平鎖效率高。但也有一個缺點,隊列中的線程有可能等待很長時間,高併發下甚至可能永遠獲取不到鎖。
2 ReentrantReadWriteLock
我們先來看一下 UML 類圖:
從圖中可以看到,ReentrantReadWriteLock 使用抽象內部類 Sync 來實現了 AQS 的方法,然後基於 Sync 這個同步器實現了公平鎖和非公平鎖。主要實現了下面 3 個方法:
-
tryAcquire(int arg):獲取獨佔鎖
-
tryRelease(int arg):釋放獨佔鎖
-
tryAcquireShared(int arg):獲取共享鎖
-
tryReleaseShared(int arg):釋放共享鎖
-
isHeldExclusively:當前線程是否佔有獨佔鎖
可見 ReentrantReadWriteLock 裏面同時用到了共享鎖和獨佔鎖。
下圖是定義的幾個常用變量:
下面這 2 個方法用戶獲取共享鎖和獨佔鎖的數量:
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
從 sharedCount 可以看到,共享鎖的數量要右移 16 位獲取,也就是說共享鎖佔了高 16 位。從上圖 EXCLUSIVE_MASK 的定義看到,跟 EXCLUSIVE_MASK 進行與運算,得到的是低 16 位的值,所以獨佔鎖佔了低 16 位。如下圖:
這樣上面獲取鎖數量的方法就很好理解了。參考 1[1]
2.1 讀鎖
讀鎖的實現對應內部類 ReadLock。
2.1.1 獲取讀鎖
獲取讀鎖實際上是 ReadLock 調用了 AQS 的下面方法, 傳入參數是 1:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
ReentrantReadWriteLock 內部類 Sync 實現了 tryAcquireShared 方法,主要包括如下三種情況:
-
使用 exclusiveCount 方法查看 state 中是否有獨佔鎖,如果有並且獨佔線程不是當前線程,返回 - 1,獲取失敗。
-
使用 sharedCount 查看 state 中共享鎖數量,如果讀鎖數量小於最大值 (MAX_COUNT=65535),則再滿足下面 3 個條件就可以獲取成功並返回 1:
- 當前線程不需要阻塞 (readerShouldBlock)。在公平鎖中,需要判斷是否有前置節點,如下圖就需要阻塞:
在非公平鎖中,則是判斷第一個節點是不是有獨佔鎖,如下圖就需要阻塞:
- 使用 CAS 把 state 的值加 SHARED_UNIT(65536)。
這裏是不是就更理解讀鎖佔高位的說法了,獲取一個讀鎖,state 的值就要加 SHARED_UNIT 這麼多個。
- 給當前線程的 holdCount 加 1。
- 如果 2 失敗,自旋,重複上面的步驟直到獲取到鎖。
tryAcquireShared(獲取共享鎖) 會返回一個整數,如下:
返回負數:獲取鎖失敗。
返回 0:獲取鎖成功但是之後再由線程來獲取共享鎖時就會失敗。
返回正數: 獲取鎖成功而且之後再有線程來獲取共享鎖時也可能會成功。
2.1.2 釋放讀鎖
ReentrantReadWriteLock 釋放讀鎖是在 ReadLock 中調用了 AQS 下面方法,傳入的參數是 1:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
ReentrantReadWriteLock 內部類 Sync 實現了 releaseShared 方法,具體邏輯分爲下面兩步:
-
當前線程 holdCounter 值減 1。
-
CAS 的方式將 state 的值減去 SHARED_UNIT。
2.2 寫鎖
寫鎖的實現對應內部類 WriteLock。
2.2.1 獲取寫鎖
ReentrantReadWriteLock 獲取寫鎖其實是在 WriteLock 中調用了 AQS 的下面方法,傳入參數 1:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
在 ReentrantReadWriteLock 內部類 Sync 實現了 tryAcquire 方法,首先獲取 state 值和獨佔鎖數量 (exclusiveCount), 之後分如下兩種情況,如下圖:
- state 不等於 0:
-
獨佔鎖數量等於 0,這時說明有線程佔用了共享鎖,如果當前線程不是獨佔線程,獲取鎖失敗。
-
獨佔鎖數量不等於 0,獨佔鎖數量加 1 後大於 MAX_COUNT,獲取鎖失敗。
-
上面 2 種情況不符合,獲取鎖成功,state 值加 1。
- state 等於 0,判斷當前線程是否需要阻塞 (writerShouldBlock)。
在公平鎖中,跟 readerShouldBlock 的邏輯完全一樣,就是判斷隊列中 head 節點的後繼節點是不是當前線程。在非公平鎖中,直接返回 false,即可以直接嘗試獲取鎖。
如果當前線程不需要阻塞,並且給 state 賦值成功,使用 CAS 方式把 state 值加 1,把獨佔線程置爲當前線程。
2.2.2 釋放寫鎖
ReentrantReadWriteLock 釋放寫鎖其實是在 WriteLock 中調用了 AQS 的下面方法,傳入參數 1:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantReadWriteLock 在 Sync 中實現了 tryRelease(arg) 方法,邏輯如下:
-
判斷當前線程是不是獨佔線程,如果不是,拋出異常。
-
state 值減 1 後,用新 state 值判斷獨佔鎖數量是否等於 0
-
如果等於 0,則把獨佔線程置爲空,返回 true,這樣上面的代碼就可以喚醒隊列中的後置節點了
-
如果不等於 0,返回 false,不喚醒後繼節點。
3 CountDownLatch
我們先來看一下 UML 類圖:
從上面的圖中看出,CountDownLatch 的內部類 Sync 實現了獲取共享鎖和釋放共享鎖的邏輯。
使用 CountDownLatch 時,構造函數會傳入一個 int 類型的參數 count,表示調動 count 次的 countDown 後主線程纔可以被喚醒。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
上面的 Sync(count) 就是將 AQS 中的 state 賦值爲 count。
3.1 await
CountDownLatch 的 await 方法調用了 AQS 中的 acquireSharedInterruptibly(int arg),傳入參數 1,不過這個參數並沒有用。代碼如下:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
Sync 中實現了 tryAcquireShared 方法,await 邏輯如下圖:
上面的自旋過程就是等待 state 的值不斷減小,只有 state 值成爲 0 的時候,主線程纔會跳出自旋執行之後的邏輯。
3.2 countDown
CountDownLatch 的 countDown 方法調用了 AQS 的 releaseShared(int arg),傳入參數 1,不過這個參數並沒有用。內部類 Sync 實現了 tryReleaseShared 方法,邏輯如下圖:
3.3 總結
CountDownLatch 的構造函數入參值會賦值給 state 變量,入隊操作是主線程入隊,每個子線程調用了 countDown 後 state 值減 1,當 state 值成爲 0 後喚醒主線程。
4 Semaphore
Semaphore 是一個信號量,用來保護共享資源。如果線程要訪問共享資源,首先從 Semaphore 獲取鎖 (信號量),如果信號量的計數器等於 0,則當前線程進入 AQS 隊列阻塞等待。否則,線程獲取鎖成功,信號量減 1。使用完共享資源後,釋放鎖 (信號量加 1)。
Semaphore 跟管程模型不一樣的是,允許多個 (構造函數的 permits) 線程進入管程內部,因此也常用它來做限流。
UML 類圖如下:
Semaphore 的構造函數會傳入一個 int 類型參數,用來初始化 state 的值。
4.1 acquire
獲取鎖的操作調用了 AQS 中的 acquireSharedInterruptibly 方法,傳入參數 1,代碼見 CountDownLatch 中 await 小節。Semaphore 在公平鎖和非公平鎖中分別實現了 tryAcquireShared 方法。
4.1.1 公平鎖
Semaphore 默認使用非公平鎖,如果使用公平鎖,需要在構造函數指定。獲取公平鎖邏輯比較簡單,如下圖:
4.1.2 非公平鎖
acquire 在非公平的鎖唯一的區別就是不會判斷 AQS 隊列是否有前置節點 (hasQueuedPredecessors),而是直接嘗試獲取鎖。
除了 acquire 方法外,還有其他幾個獲取鎖的方法,原理類似,只是調用了 AQS 中的不同方法。
4.2 release
釋放鎖的操作調用了 AQS 中的 releaseShared(int arg) 方法,傳入參數 1,在內部類 Sync 中實現了 tryReleaseShared 方法,邏輯很簡單:使用 CAS 的方式將 state 的值加 1,之後喚醒隊列中的後繼節點。
5 ThreadPoolExecutor
ThreadPoolExecutor 中也用到了 AQS,看下面的 UML 類圖:
Worker 主要在 ThreadPoolExecutor 中斷線程的時候使用。Worker 自己實現了獨佔鎖,在中斷線程時首先進行加鎖,中斷操作後釋放鎖。按照官方說法,這裏不直接使用 ReentrantLock 的原因是防止調用控制線程池的方法 (類似 setCorePoolSize) 時能夠重新獲取到鎖,
5.1 tryAcquire
使用 CAS 的方式把 AQS 中 state 從 0 改爲 1,把當前線程置爲獨佔線程。
5.2 tryRelease
把獨佔線程置爲空,把 AQS 中 state 改爲 0。
Worker 初始化的時候會把 state 置爲 - 1,這樣是不能獲取鎖成功的。只有調用了 runWorker 方法,纔會通過釋放鎖操作把 state 更爲 0。這樣保證了只中斷運行中的線程,而不會中斷等待中的線程。
6 總結
AQS 基於雙向隊列實現了入口等待隊列,基於 state 變量實現了各種併發鎖,上篇文章講了入口等待隊列,而這篇文章主要講了基於 AQS 的併發鎖原理。
在管程模型中,還有一塊兒沒有介紹,就是條件等待隊列,請看下篇。 ·············· END ··············
感謝閱讀,如果對你有幫助,請點個再看。歡迎大家加我微信,圍觀朋友圈,做點贊之交,一起進步。想要進技術交流羣的朋友,加我微信回覆進羣。
參考資料
參考 1: https://blog.csdn.net/fxkcsdn/article/details/82217760
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/RNSb6BdPsT38RBIrJucSaw