重磅出擊,20 張圖帶你徹底瞭解 ReentrantLock 加鎖解鎖的原理
哈嘍大家好,我是阿 Q。
最近是上班忙項目,下班帶娃,忙的不可開交,連摸魚的時間都沒有了。今天趁假期用圖解的方式從源碼角度給大家說一下ReentrantLock加鎖解鎖的全過程。繫好安全帶,發車了。
簡單使用
在聊它的源碼之前,我們先來做個簡單的使用說明。當我在IDEA中創建了一個簡單的Demo之後,它會給出以下提示
提示文字
在使用阻塞等待獲取鎖的方式中,必須在try代碼塊之外,並且在加鎖方法與try代碼塊之間沒有任何可能拋出異常的方法調用,避免加鎖成功後,在finally中無法解鎖。
-
1、如果在
lock方法與try代碼塊之間的方法調用拋出異常,那麼無法解鎖,造成其它線程無法成功獲取鎖。 -
2、如果
lock方法在try代碼塊之內,可能由於其它方法拋出異常,導致在finally代碼塊中,unlock對未加鎖的對象解鎖,它會調用AQS的tryRelease方法(取決於具體實現類),拋出IllegalMonitorStateException異常。 -
3、在
Lock對象的lock方法實現中可能拋出unchecked異常,產生的後果與說明二相同。
java.concurrent.LockShouldWithTryFinallyRule.rule.desc
還舉了兩個例子,正確案例如下:
Lock lock = new XxxLock();
// ...
lock.lock();
try {
doSomething();
doOthers();
} finally {
lock.unlock();
}
錯誤案例如下:
Lock lock = new XxxLock();
// ...
try {
// 如果在此拋出異常,會直接執行 finally 塊的代碼
doSomething();
// 不管鎖是否成功,finally 塊都會執行
lock.lock();
doOthers();
} finally {
lock.unlock();
}
AQS
上邊的案例中加鎖調用的是lock()方法,解鎖用的是unlock()方法,而通過查看源碼發現它們都是調用的內部靜態抽象類Sync的相關方法。
abstract static class Sync extends AbstractQueuedSynchronizer
Sync 是通過繼承AbstractQueuedSynchronizer來實現的,沒錯,AbstractQueuedSynchronizer就是AQS的全稱。AQS內部維護着一個FIFO的雙向隊列(CLH),ReentrantLock也是基於它來實現的,先來張圖感受下。
Node 屬性
//此處是 Node 的部分屬性
static final class Node {
//排他鎖標識
static final Node EXCLUSIVE = null;
//如果帶有這個標識,證明是失效了
static final int CANCELLED = 1;
//具有這個標識,說明後繼節點需要被喚醒
static final int SIGNAL = -1;
//Node對象存儲標識的地方
volatile int waitStatus;
//指向上一個節點
volatile Node prev;
//指向下一個節點
volatile Node next;
//當前Node綁定的線程
volatile Thread thread;
//返回前驅節點即上一個節點,如果前驅節點爲空,拋出異常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}
對於裏邊的waitStatus屬性,我們需要做個解釋:(非常重要)
-
CANCELLED(1):當前節點取消獲取鎖。當等待超時或被中斷 (響應中斷),會觸發變更爲此狀態,進入該狀態後節點狀態不再變化;
-
SIGNAL(-1):後面節點等待當前節點喚醒;
-
CONDITION(-2):
Condition中使用,當前線程阻塞在Condition,如果其他線程調用了Condition的signal方法,這個結點將從等待隊列轉移到同步隊列隊尾,等待獲取同步鎖; -
PROPAGATE(-3):共享模式,前置節點喚醒後面節點後,喚醒操作無條件傳播下去;
-
0:中間狀態,當前節點後面的節點已經喚醒,但是當前節點線程還沒有執行完成;
AQS 屬性
// 頭結點
private transient volatile Node head;
// 尾結點
private transient volatile Node tail;
//0->1 拿到鎖,大於0 說明當前已經有線程佔用了鎖資源
private volatile int state;
今天我們先簡單瞭解下AQS的構造以幫助大家更好的理解ReentrantLock,至於深層次的東西先不做展開!
加鎖
對AQS的結構有了基本瞭解之後,我們正式進入主題——加鎖。從源碼中可以看出鎖被分爲公平鎖和非公平鎖。
/**
* 公平鎖代碼
*/
final void lock() {
acquire(1);
}
/**
* 非公平鎖代碼
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
初步查看代碼發現非公平鎖似乎包含公平鎖的邏輯,所以我們就從 “非公平鎖” 開始。
非公平鎖
final void lock() {
//通過 CAS 的方式嘗試將 state 從0改爲1,
//如果返回 true,代表修改成功,獲得鎖資源;
//如果返回false,代表修改失敗,未獲取鎖資源
if (compareAndSetState(0, 1))
// 將屬性exclusiveOwnerThread設置爲當前線程,該屬性是AQS的父類提供的
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
compareAndSetState():底層調用的是unsafe的compareAndSwapInt,該方法是原子操作;
假設有兩個線程(t1、t2)在競爭鎖資源,線程 1 獲取鎖資源之後,執行setExclusiveOwnerThread操作,設置屬性值爲當前線程t1
此時,當t2想要獲取鎖資源,調用lock()方法之後,執行compareAndSetState(0, 1)返回false,會走else執行acquire()方法。
方法查看
public final void accquire(int arg) {
// tryAcquire 再次嘗試獲取鎖資源,如果嘗試成功,返回true,嘗試失敗返回false
if (!tryAcquire(arg) &&
// 走到這,代表獲取鎖資源失敗,需要將當前線程封裝成一個Node,追加到AQS的隊列中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 線程中斷
selfInterrupt();
}
accquire()中涉及的方法比較多,我們將進行拆解,一個一個來分析,順序:tryAcquire() -> addWaiter() -> acquireQueued()
查看 tryAcquire() 方法
//AQS中
protected boolean tryAcquire(int arg) {
//AQS 是基類,具體實現在自己的類中實現,我們去查看“非公平鎖”中的實現
throw new UnsupportedOperationException();
}
//ReentrantLock 中
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 獲取當前線程
final Thread current = Thread.currentThread();
//獲取AQS 的 state
int c = getState();
// 如果 state 爲0,代表嘗試再次獲取鎖資源
if (c == 0) {
// 步驟同上:通過 CAS 的方式嘗試將 state 從0改爲1,
//如果返回 true,代表修改成功,獲得鎖資源;
//如果返回false,代表修改失敗,未獲取鎖資源
if (compareAndSetState(0, acquires)) {
//設置屬性爲當前線程
setExclusiveOwnerThread(current);
return true;
}
}
//當前佔有鎖資源的線程是否是當前線程,如果是則證明是可重入操作
else if (current == getExclusiveOwnerThread()) {
//將 state + 1
int nextc = c + acquires;
//爲什麼會小於 0 呢?因爲最大值 + 1 後會將符號位的0改爲1 會變成負數(可參考Integer.MAX_VALUE + 1)
if (nextc < 0) // overflow
//加1後小於0,超出鎖可重入的最大值,拋異常
throw new Error("Maximum lock count exceeded");
//設置 state 狀態
setState(nextc);
return true;
}
return false;
}
因爲線程 1 已經獲取到了鎖,此時state爲 1,所以不走nonfairTryAcquire()的if。又因爲當前是線程 2,不是佔有當前鎖的線程 1,所以也不會走else if,即tryAcquire()方法返回false。
查看 addWaiter() 方法
走到本方法中,代表獲取鎖資源失敗。addWaiter()將沒有獲取到鎖資源的線程甩到隊列的尾部。
private Node addWaiter(Node mode) {
//創建 Node 類,並且設置 thread 爲當前線程,設置爲排它鎖
Node node = new Node(Thread.currentThread(), mode);
// 獲取 AQS 中隊列的尾部節點
Node pred = tail;
// 如果 tail == null,說明是空隊列,
// 不爲 null,說明現在隊列中有數據,
if (pred != null) {
// 將當前節點的 prev 指向剛纔的尾部節點,那麼當前節點應該設置爲尾部節點
node.prev = pred;
// CAS 將 tail 節點設置爲當前節點
if (compareAndSetTail(pred, node)) {
// 將之前尾節點的 next 設置爲當前節點
pred.next = node;
// 返回當前節點
return node;
}
}
enq(node);
return node;
}
當tail不爲空,即隊列中有數據時,我們來圖解一下pred!=null代碼塊中的代碼。初始化狀態如下,pred指向尾節點,node指向新的節點。
node.prev = pred;將node的前驅節點設置爲pred指向的節點
compareAndSetTail(pred, node)通過CAS的方式嘗試將當前節點node設置爲尾結點,此處我們假設設置成功,則FIFO隊列的tail指向node節點。
pred.next = node;將pred節點的後繼節點設置爲node節點,此時node節點成功進入FIFO隊列尾部。
而當pred爲空,即隊列中沒有節點或將node節點設置爲尾結點失敗時,會走enq()方法。我們列舉的例子就符合pred爲空的情況,就讓我們以例子爲基礎繼續分析吧。
//現在沒人排隊,我是第一個 || 前邊CAS失敗也會進入這個位置重新往隊列尾巴去塞
private Node enq(final Node node) {
//死循環
for (;;) {
//重新獲取tail節點
Node t = tail;
// 沒人排隊,隊列爲空
if (t == null) {
// 初始化一個 Node 爲 head,而這個head 沒有意義
if (compareAndSetHead(new Node()))
// 將頭尾都指向了這個初始化的Node,第一次循環結束
tail = head;
} else {
// 有人排隊,往隊列尾巴塞
node.prev = t;
// CAS 將 tail 節點設置爲當前節點
if (compareAndSetTail(t, node)) {
//將之前尾節點的 next 設置爲當前節點
t.next = node;
return t;
}
}
}
}
進入死循環,首先會走if方法的邏輯,通過CAS的方式嘗試將一個新節點設置爲head節點,然後將tail也指向新節點。可以看出隊列中的頭節點只是個初始化的節點,沒有任何意義。
繼續走死循環中的代碼,此時t不爲null,所以會走else方法。將node的前驅節點指向t,通過CAS方式將當前節點node設置爲尾結點,然後將t的後繼節點指向node。此時線程 2 的節點就被成功塞入FIFO隊列尾部。
查看 acquireQueued() 方法
將已經在隊列中的node嘗試去獲取鎖否則掛起。
final boolean acquireQueued(final Node node, int arg) {
// 獲取鎖資源的標識,失敗爲 true,成功爲 false
boolean failed = true;
try {
// 線程中斷的標識,中斷爲 true,不中斷爲 false
boolean interrupted = false;
for (;;) {
// 獲取當前節點的上一個節點
final Node p = node.predecessor();
//p爲頭節點,嘗試獲取鎖操作
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
// 將獲取鎖失敗標識置爲false
failed = false;
// 獲取到鎖資源,不會被中斷
return interrupted;
}
// p 不是 head 或者 沒拿到鎖資源,
if (shouldParkAfterFailedAcquire(p, node) &&
// 基於 Unsafe 的 park方法,掛起線程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這裏又出現了一次死循環,首先獲取當前節點的前驅節點 p,如果 p 是頭節點 (頭節點沒有意義),說明node是head後的第一個節點,此時當前獲取鎖資源的線程 1 可能會釋放鎖,所以線程 2 可以再次嘗試獲取鎖。
假設獲取成功,證明拿到鎖資源了,將node節點設置爲head節點,並將node節點的pre和thread設置爲null。因爲拿到鎖資源了,node節點就不需要排隊了。
將頭節點 p 的next置爲null,此時 p 節點就不在隊列中存在了,可以幫助GC回收 (可達性分析)。failed設置爲false,表明獲取鎖成功;interrupted爲false,則線程不會中斷。
如果 p 不是head節點或者沒有拿到鎖資源,會執行下邊的代碼,因爲我們的線程 1 沒有釋放鎖資源,所以線程 2 獲取鎖失敗,會繼續往下執行。
//該方法的作用是保證上一個節點的waitStatus狀態爲-1(爲了喚醒後繼節點)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取上一個節點的狀態,該狀態爲-1,纔會喚醒下一個節點。
int ws = pred.waitStatus;
// 如果上一個節點的狀態是SIGNAL即-1,可以喚醒下一個節點,直接返回true
if (ws == Node.SIGNAL)
return true;
// 如果上一個節點的狀態大於0,說明已經失效了
if (ws > 0) {
do {
// 將node 的節點與 pred 的前一個節點相關聯,並將前一個節點賦值給 pred
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); // 一直找到小於等於0的
// 將重新標識好的最近的有效節點的 next 指向當前節點
pred.next = node;
} else {
// 小於等於0,但是不等於-1,將上一個有效節點狀態修改爲-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
只有節點的狀態爲 - 1,纔會喚醒後一個節點,如果節點狀態未設置,默認爲 0。
圖解一下ws>0的過程,因爲ws>0的節點爲失效節點,所以do...while中會重複向前查找前驅節點,直到找到第一個ws<=0的節點爲止,將node節點掛到該節點上。
我們的pred是頭結點且未設置狀態,所以狀態爲 0,會走else。通過CAS嘗試將pred節點的waitStatus設置爲 - 1,表明node節點需要被pred喚醒。
shouldParkAfterFailedAcquire()返回false,繼續執行acquireQueued()中的死循環。
步驟和上邊一樣,node的前驅節點還是head,繼續嘗試獲取鎖。如果線程 1 釋放了鎖,線程 2 就可以拿到,返回true;否則繼續調用shouldParkAfterFailedAcquire(),因爲上一步已經將前驅結點的ws設置爲 - 1 了,所以直接返回true。
執行parkAndCheckInterrupt()方法,通過UNSAFE.park();方法阻塞當前線程 2。等以後執行unpark方法的時候,如果node是頭節點後的第一個節點,會進入acquireQueued()方法中走if (p == head && tryAcquire(arg))的邏輯獲取鎖資源並結束死循環。
查看 cancelAcquire() 方法
該方法執行的機率約等於 0,爲什麼這麼說呢?因爲針對failed屬性,只有JVM內部出現問題時,纔可能出現異常,執行該方法。
// node 爲當前節點
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 上一個節點
Node pred = node.prev;
// 節點狀態大於0,說明節點失效
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 將第一個不是失效節點的後繼節點聲明出來
Node predNext = pred.next;
// 節點狀態變爲失效
node.waitStatus = Node.CANCELLED;
// node爲尾節點,cas設置pred爲尾節點
if (node == tail && compareAndSetTail(node, pred)) {
//cas將pred的next設置爲null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 中間節點
// 如果上一個節點不是head 節點
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
// 前邊已經判斷了大於0的操作,
// pred 是需要喚醒後繼節點的,所以當 waitStatus 不爲 -1 時,需要將 pred 節點的 waitStatus 設置爲 -1
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// CAS 嘗試將 pred 的 next 指向當前節點的 next
compareAndSetNext(pred, predNext, next);
} else {
// head 節點,喚醒後繼節點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
執行到while時找到前驅節點中最近的有效節點,把當前節點node掛到有效節點後邊,可以過濾掉當前節點前的失效節點。聲明出有效節點的第一個後繼無效節點predNext,並把當前的node節點狀態設置爲失效狀態。
if中的操作:如果當前節點是尾節點,CAS嘗試將最近的有效節點設置爲尾節點,並將尾節點的next設置爲null。
else中的操作:
如果pred節點不是頭結點即中間節點,並且pred的waitStatus爲 - 1 或者waitStatus<=0,爲了讓pred節點能喚醒後繼節點,需要設置爲 - 1,並且pred節點的線程不爲空。獲取node節點的後繼節點,如果後繼節點有效,CAS嘗試將pred的next指向node節點的next。
當其他節點來找有效節點的時候走當前node的prev這條線,而不是再一個一個往前找,可以提高效率。
如果是頭結點則喚醒後繼節點。
最後將node節點的next指向自己。
解鎖
釋放鎖是不區分公平鎖和非公平鎖的,釋放鎖的核心是將state由大於 0 的數置爲 0。廢話不多說,直接上代碼
//釋放鎖方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//嘗試釋放鎖資源,如果釋放成功,返回true
if (tryRelease(arg)) {
Node h = head;
// head 不爲空且 head 的 ws 不爲0(如果爲0,代表後邊沒有其他線程掛起)
if (h != null && h.waitStatus != 0)
// AQS的隊列中有 node 在排隊,並且線程已經掛起
// 需要喚醒被掛起的 Node
unparkSuccessor(h);
return true;
}
// 代表釋放一次沒有完全釋放
return false;
}
如果釋放鎖成功,需要獲取head節點。如果頭結點不爲空且waitStatus不爲 0,則證明有node在排隊,執行喚醒掛起其他node的操作。
查看 tryRelease() 方法
protected final boolean tryRelease(int releases) {
//獲取當前鎖的狀態,先進行減1操作,代表釋放一次鎖資源
int c = getState() - releases;
//如果釋放鎖的線程不是佔用鎖的線程,直接拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果 c 爲0 ,代表鎖完全釋放了,如果不爲0,代表鎖之前重入了,一次沒釋放掉,等待下次再次執行時,再次判斷
if (c == 0) {
// 釋放鎖標誌爲 true,代表完全釋放了
free = true;
// 將佔用互斥鎖的標識置爲 null
setExclusiveOwnerThread(null);
}
// 設置 state 狀態
setState(c);
return free;
}
我們的例子中線程 1 佔用鎖資源,線程 1 釋放鎖之後,state爲 0。進入if操作,將釋放標誌更新爲true,將FIFO隊列的exclusiveOwnerThread標誌置爲null。
查看 unparkSuccessor() 方法
用於喚醒AQS中被掛起的線程。
// 注意當前的 node 節點是 head 節點
private void unparkSuccessor(Node node) {
//獲取 head 的狀態
int ws = node.waitStatus;
if (ws < 0)
// CAS 將 node 的 ws 設置爲0,代表當前 node 接下來會捨棄
compareAndSetWaitStatus(node, ws, 0);
// 獲取頭節點的下一個節點
Node s = node.next;
// 如果下一個節點爲null 或者 下一個節點爲失效節點,需要找到離 head 最近的有效node
if (s == null || s.waitStatus > 0) {
s = null;
// 從尾節點開始往前找不等於null且不是node的節點
for (Node t = tail; t != null && t != node; t = t.prev)
// 如果該節點有效,則將s節點指向t節點
if (t.waitStatus <= 0)
s = t;
}
// 找到最近的node後,直接喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
問題解析:爲什麼要從尾結點往前查找呢?
因爲在addWaiter方法中是先給prev指針賦值,最後纔將上一個節點的next指針賦值,爲了避免防止丟失節點或者跳過節點,必須從後往前找。
我們舉例中head節點的狀態爲-1,通過CAS的方式將head節點的waitStatus設置爲 0。
我們的頭結點的後繼節點是線程 2 所在的節點,不爲null,所以這邊會執行unpark操作,從上邊的acquireQueued()內的parkAndCheckInterrupt()方法繼續執行。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//返回目標線程是否中斷的布爾值:中斷返回true,不中斷返回false,且返回後會重置中斷狀態爲未中斷
return Thread.interrupted();
}
因爲線程 2 未中斷,所以返回false。繼續執行acquireQueued()中的死循環
for (;;) {
// 獲取當前節點的上一個節點
final Node p = node.predecessor();
//p爲頭節點,嘗試獲取鎖操作
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
// 將獲取鎖失敗標識置爲false
failed = false;
// 獲取到鎖資源,不會被中斷
return interrupted;
}
// p 不是 head 或者 沒拿到鎖資源,
if (shouldParkAfterFailedAcquire(p, node) &&
// 基於 Unsafe 的 park方法,掛起線程
parkAndCheckInterrupt())
interrupted = true;
}
此時 p 是頭節點,且能獲取鎖成功,將exclusiveOwnerThread設置爲線程 2,即線程 2 獲取鎖資源。
將node節點設置爲head節點,並將node節點的pre和thread設置爲null。因爲拿到鎖資源了,node節點就不需要排隊了。
將頭節點 p 的next置爲null,此時 p 節點就不在隊列中存在了,可以幫助GC回收 (可達性分析)。failed設置爲false,表明獲取鎖成功;interrupted爲false,則線程不會中斷。
爲什麼被喚醒的線程要調用 Thread.interrupted() 清除中斷標記
從上邊的方法可以看出,當parkAndCheckInterrupt()方法返回true時,即Thread.interrupted()方法返回了true,也就是該線程被中斷了。爲了讓被喚醒的線程繼續執行後續獲取鎖的操作,就需要讓中斷的線程像沒有被中斷過一樣繼續往下執行,所以在返回中斷標記的同時要清除中斷標記,將其設置爲false。
清除中斷標記之後不代表該線程不需要中斷了,所以在parkAndCheckInterrupt()方法返回true時,要自己設置一箇中斷標誌interrupted = true,爲的就是當獲取到鎖資源執行完相關的操作之後進行中斷補償,故而需要執行selfInterrupt()方法中斷線程。
以上就是我們加鎖解鎖的圖解過程了。最後我們再來說一下公平鎖和非公平鎖的區別。
區別
前邊已經說過了,似乎非公平鎖包含了公平鎖的全部操作。打開公平鎖的代碼,我們發現accquire()方法中只有該方法的實現有點區別。
hasQueuedPredecessors()返回false時纔會嘗試獲取鎖資源。該方法代碼實現如下
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
-
h==t時,隊列爲空,表示沒人排隊,可以獲取鎖資源; -
隊列不爲空,頭結點有後繼節點不爲空且 s 節點獲取鎖的線程是當前線程也可以獲取鎖資源,代表鎖重入操作;
總結
以上就是我們的全部內容了,我們在最後再做個總結:
-
代碼使用要合乎規範,避免加鎖成功後,在
finally中無法解鎖; -
理解
AQS的FIFO隊列以及Node的相關屬性,尤其注意waitStatus的狀態; -
利用圖加深對非公平鎖源碼的理解;
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/3tqBo47GtG3ljdrig2b8AA