一文看懂 JUC 之 AQS 機制
本文來源:http://m9q.net/mw03n
爲了解決原子性的問題,Java 加入了鎖機制,同時保證了可見性和順序性。JDK1.5 的併發包中新增了 Lock 接口以及相關實現類來實現鎖功能,比 synchronized 更加靈活,開發者可根據實際的場景選擇相應的實現類。本文注重講解其不同衍生類的使用場景以及其內部 AQS 的原理。併發問題引入以及 synchronized 相關的知識請看上一篇文章一文 一文看懂 Java 鎖機制。
Lock 特性
可重入
像 synchronized 和 ReentrantLock 都是可重入鎖,可重入性表明了鎖的分配機制是基於線程的分配,而不是基於方法調用的分配。
舉個簡單的例子,當一個線程已經獲取到鎖,當後續再獲取同一個鎖,直接獲取成功。但獲取鎖和釋放鎖必須要成對出現。
可響應中斷
當線程因爲獲取鎖而進入阻塞狀態,外部是可以中斷該線程的,調用方通過捕獲 InterruptedException 可以捕獲中斷
可設置超時時間
獲取鎖時,可以指定超時時間,可以通過返回值來判斷是否成功獲取鎖
公平性
提供公平性鎖和非公平鎖(默認)兩種選擇。
-
公平鎖,線程將按照他們發出請求的順序來獲取鎖,不允許插隊;
-
非公平鎖,則允許插隊:當一個線程發生獲取鎖的請求的時刻,如果這個鎖是可用的,那這個線程將跳過所在隊列裏等待線程並獲得鎖。
考慮這麼一種情況:A 線程持有鎖,B 線程請求這個鎖,因此 B 線程被掛起;A 線程釋放這個鎖時,B 線程將被喚醒,因此再次嘗試獲取鎖;與此同時,C 線程也請求獲取這個鎖,那麼 C 線程很可能在 B 線程被完全喚醒之前獲得、使用以及釋放這個鎖。這是種雙贏的局面,B 獲取鎖的時刻(B 被喚醒後才能獲取鎖)並沒有推遲,C 更早地獲取了鎖,並且吞吐量也獲得了提高。在大多數情況下,非公平鎖的性能要高於公平鎖的性能。
另外,這個公平性是針對線程而言的,不能依賴此來實現業務上的公平性,應該由開發者自己控制,比如通過 FIFO 隊列來保證公佈。
讀寫鎖
允許讀鎖和寫鎖分離,讀鎖與寫鎖互斥,但是多個讀鎖可以共存,適用於讀頻次遠大於寫頻次的場景
豐富的 API
提供了多個方法來獲取鎖相關的信息,可以幫助開發者監控和排查問題
isFair() // 判斷鎖是否是公平鎖 isLocked() // 判斷鎖是否被任何線程獲取了 isHeldByCurrentThread() // 判斷鎖是否被當前線程獲取了 hasQueuedThreads() // 判斷是否有線程在等待該鎖 getHoldCount() // 查詢當前線程佔有 lock 鎖的次數 getQueueLength() // 獲取正在等待此鎖的線程數
鎖的使用
ReentrantLock
獨佔鎖的實現,擁有上面列舉的除讀寫鎖之外的所有特性,使用比較簡單
class X {
// 創建獨佔鎖實例
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
// 必須要釋放鎖,unlock與lock成對出現
lock.unlock()
}
}
}
ReentrantReadWriteLock
讀寫鎖的實現,擁有上面列舉的所有特性。並且寫鎖可降級爲讀鎖,反之不行。
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
StampedLock
StampedLock 也是一種讀寫鎖,提供兩種讀模式:樂觀讀和悲觀讀。樂觀讀允許讀的過程中也可以獲取寫鎖後寫入!這樣一來,我們讀的數據就可能不一致,所以,需要一點額外的代碼來判斷讀的過程中是否有寫入。
樂觀鎖的意思就是樂觀地估計讀的過程中大概率不會有寫入,因此被稱爲樂觀鎖。反過來,悲觀鎖則是讀的過程中拒絕有寫入,也就是寫入必須等待。顯然樂觀鎖的併發效率更高,但一旦有小概率的寫入導致讀取的數據不一致,需要能檢測出來,再讀一遍就行。
public class Point {
private final StampedLock stampedLock = new StampedLock();
private double x;
private double y;
public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 獲取寫鎖
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 釋放寫鎖
}
}
public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 獲得一個樂觀讀鎖
// 注意下面兩行代碼不是原子操作
// 假設x,y = (100,200)
double currentX = x;
// 此處已讀取到x=100,但x,y可能被寫線程修改爲(300,400)
double currentY = y;
// 此處已讀取到y,如果沒有寫入,讀取是正確的(100,200)
// 如果有寫入,讀取是錯誤的(100,400)
if (!stampedLock.validate(stamp)) { // 檢查樂觀讀鎖後是否有其他寫鎖發生
stamp = stampedLock.readLock(); // 獲取一個悲觀讀鎖
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 釋放悲觀讀鎖
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
Condition
Condition 成爲條件隊列或條件變量,爲一個線程掛起執行(等待)提供了一種方法,直到另一線程通知某些狀態條件現在可能爲真爲止。由於對該共享狀態信息的訪問發生在不同的線程中,因此必須由互斥鎖對其其進行保護。
await 方法:必須在獲取鎖之後的調用,表示釋放當前鎖,阻塞當前線程;等待其他線程調用鎖的 signal 或 signalAll 方法,線程喚醒重新獲取鎖。
Lock 配合 Condition,可以實現 synchronized 與 對象(wait,notify)同樣的效果,來進行線程間基於共享變量的通信。但優勢在於同一個鎖可以由多個條件隊列,當某個條件滿足時,只需要喚醒對應的條件隊列即可,避免無效的競爭。
// 此類實現類似阻塞隊列(ArrayBlockingQueue)
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
BlockingQueue
BlockingQueue 阻塞隊列實際上是一個生產者 / 消費者模型,當隊列長度大於指定的最大值,生產線程就會被阻塞;反之當隊列元素爲空時,消費線程就會被阻塞;同時當消費成功時,就會喚醒阻塞的生產者線程;生產成功就會喚醒消費者線程;
內部使用就是 ReentrantLock + Condition 來實現的,可以參照上面的示例。
CountDownLatch
稱之爲倒計時器鎖,初始化指定數值,調用 countDown 可以對數值減一,當數值減爲 0 時,就會喚醒所有因爲調用 await 方法而阻塞的線程。
可以達到一組線程等待另外一組線程都完成任務的效果。
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
CyclicBarrier
稱之爲同步屏障,它使得一組線程互相等待,直到到達某個公共屏障點。
初始化指定數值,調用 await 方法會使得線程阻塞,直到指定數量的線程都調用 await 方法時,所有被阻塞的線程會被喚醒,繼續執行。
與 CountDownLatch 的區別是,CountDownLatch 是一組線程等待另外一組線程,而 CyclicBarrier 是一組線程之間相互等待。
Semaphore
稱之爲信號量,與互斥鎖 ReentrantLock 用法類似,區別就是 Semaphore 共享的資源是多個,允許多個線程同時競爭成功。
AQS 原理
AQS 是 AbstractQueuedSynchronizer 的縮寫,中文 抽象隊列同步器,是構建各類鎖和同步器的基礎實現。內部維護了共享變量 state (int 類型) 和 雙向隊列 (包含頭指針和尾指針)
併發問題解決
原子性
Unsafe.compareAndSwapXXX 實現 CAS 更改 state 和 隊列指針 內部依賴 CPU 提供的原子指令
可見性與有序性
volatile 修飾 state 與 隊列指針 (prev/next/head/tail)
線程阻塞與喚醒
Unsafe.park Unsafe.parkNanos Unsafe.unpark
“
Unsafe 類是在 sun.misc 包下,不屬於 Java 標準。提供了內存管理、對象實例化、數組操作、CAS 操作、線程掛起與恢復等功能,Unsafe 類提升了 Java 運行效率,增強了 Java 語言底層的操作能力。很多 Java 的基礎類庫,包括一些被廣泛使用的高性能開發庫都是基於 Unsafe 類開發的,比如 Netty、Cassandra、Hadoop、Kafka 等
”
AQS 內部有兩種模式:獨佔模式和共享模式
AQS 的設計是基於模板方法的,使用者需要繼承 AQS 並重寫指定的方法。不同的自定義同步器爭用共享資源的方式不同,比如可重入、公平性等都是子類來實現。
自定義同步器在實現時只需要實現共享資源 state 的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊 / 喚醒出隊等),由 AQS 內部處理。
獨佔模式
-
只有一個線程都能夠獲取到鎖
-
鎖釋放後需要喚醒後繼節點
AQS 提供的獨佔模式相關的方法
// 獲取獨佔鎖(線程阻塞直至獲取成功)
public final void acquire(int)
// 獲取獨佔鎖,可被中斷
public final void acquireInterruptibly(int)
// 獲取獨佔鎖,可被中斷 和 指定超時時間
public final boolean tryAcquireNanos(int, long)
// 釋放獨佔鎖(釋放鎖後,將等待隊列中第一個等待節點喚醒 )
public final boolean release(int)
AQS 子類需要實現的獨佔模式相關的方法
// 嘗試獲取獨佔鎖
protected boolean tryAcquire(int)
// 嘗試釋放獨佔鎖
protected boolean tryRelease(int)
獲取獨佔鎖的流程
-
調用子類 tryAcquire 嘗試獲取鎖,獲取成功,直接返回
-
通過自旋 CAS 將當前線程封裝成節點加入隊列末尾
-
循環等待或嘗試 tryAcquire 獲取鎖
-
判斷前置節點如果爲 head,則嘗試獲取鎖
-
根據隊列中節點狀態,決定是否需要阻塞當前線程
-
tryAcquire 獲取鎖成功後,將當前節點設置爲 head 並 返回
-
如果當前線程中斷或超時,則執行
cancelAcquire
-
將當前節點狀態置爲 CANCELED,並從隊列刪除
-
如果前置節點爲 Head,則將後置節點喚醒
釋放獨佔鎖的流程
共享模式
-
多個線程都能夠獲取到鎖
-
鎖釋放後需要喚醒後繼節點
-
鎖獲取後如果還有資源需要喚醒後繼共享節點
AQS 提供的共享模式相關的方法
// 獲取共享鎖(線程阻塞直至獲取成功)
public final void acquireShared(int)
// 獲取共享鎖,可被中斷
public final acquireSharedInterruptibly(int)
// 獲取共享鎖,可被中斷 和 指定超時時間
public final tryAcquireSharedNanos(int, long)
// 獲取共享鎖
public final boolean releaseShared(int)
AQS 子類需要實現的共享模式相關的方法
// 嘗試獲取共享鎖
protected int tryAcquireShared(int)
// 嘗試釋放共享鎖
protected boolean tryReleaseShared(int)
獲取共享鎖的流程
-
調用子類 tryAcquireShared 嘗試獲取鎖,獲取成功,直接返回
-
通過自旋 CAS 將當前線程封裝成節點加入隊列末尾
-
循環等待或嘗試 tryAcquireShared 獲取鎖
-
如果資源有剩餘或者原先的 head 節點狀態爲 SIGNAL/PROPAGATE,則調用 doReleaseShared
-
如果當前 head 節點狀態爲 SIGNAL,喚醒後繼節點
-
如果當前 head 節點狀態爲 ZERO,將 head 節點狀態置爲 PROPAGATE
-
判斷前置節點如果爲 head,則嘗試獲取鎖
-
根據隊列中節點狀態,決定是否需要阻塞當前線程
-
tryAcquireShared 獲取鎖成功後,將當前節點設置爲 head
-
如果當前線程中斷或超時,則執行
cancelAcquire
-
將當前節點狀態置爲 CANCELED,並從隊列刪除
-
如果前置節點爲 Head,則將後置節點喚醒
釋放共享鎖的流程
等待隊列中節點的狀態變化
ReentrantLock 示例
tryAcquire 邏輯
tryRelease 邏輯
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/HEylBNG8-uIHrUwDFE8GYA