重磅出擊,20 張圖帶你徹底瞭解 ReentrantLock 加鎖解鎖的原理
哈嘍大家好,我是阿 Q。
最近是上班忙項目,下班帶娃,忙的不可開交,連摸魚的時間都沒有了。今天趁假期用圖解的方式從源碼角度給大家說一下ReentrantLock
加鎖解鎖的全過程。繫好安全帶,發車了。
簡單使用
在聊它的源碼之前,我們先來做個簡單的使用說明。當我在IDEA
中創建了一個簡單的Demo
之後,它會給出以下提示
提示文字
在使用阻塞等待獲取鎖的方式中,必須在try
代碼塊之外,並且在加鎖方法與try
代碼塊之間沒有任何可能拋出異常的方法調用,避免加鎖成功後,在finally
中無法解鎖。
-
1、如果在
lock
方法與try
代碼塊之間的方法調用拋出異常,那麼無法解鎖,造成其它線程無法成功獲取鎖。 -
2、如果
lock
方法在try
代碼塊之內,可能由於其它方法拋出異常,導致在finally
代碼塊中,unlock
對未加鎖的對象解鎖,它會調用AQS
的tryRelease
方法(取決於具體實現類),拋出IllegalMonitorStateException
異常。 -
3、在
Lock
對象的lock
方法實現中可能拋出unchecked
異常,產生的後果與說明二相同。
java.concurrent.LockShouldWithTryFinallyRule.rule.desc
還舉了兩個例子,正確案例如下:
Lock lock = new XxxLock();
// ...
lock.lock();
try {
doSomething();
doOthers();
} finally {
lock.unlock();
}
錯誤案例如下:
Lock lock = new XxxLock();
// ...
try {
// 如果在此拋出異常,會直接執行 finally 塊的代碼
doSomething();
// 不管鎖是否成功,finally 塊都會執行
lock.lock();
doOthers();
} finally {
lock.unlock();
}
AQS
上邊的案例中加鎖調用的是lock()
方法,解鎖用的是unlock()
方法,而通過查看源碼發現它們都是調用的內部靜態抽象類Sync
的相關方法。
abstract static class Sync extends AbstractQueuedSynchronizer
Sync
是通過繼承AbstractQueuedSynchronizer
來實現的,沒錯,AbstractQueuedSynchronizer
就是AQS
的全稱。AQS
內部維護着一個FIFO
的雙向隊列(CLH
),ReentrantLock
也是基於它來實現的,先來張圖感受下。
Node 屬性
//此處是 Node 的部分屬性
static final class Node {
//排他鎖標識
static final Node EXCLUSIVE = null;
//如果帶有這個標識,證明是失效了
static final int CANCELLED = 1;
//具有這個標識,說明後繼節點需要被喚醒
static final int SIGNAL = -1;
//Node對象存儲標識的地方
volatile int waitStatus;
//指向上一個節點
volatile Node prev;
//指向下一個節點
volatile Node next;
//當前Node綁定的線程
volatile Thread thread;
//返回前驅節點即上一個節點,如果前驅節點爲空,拋出異常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}
對於裏邊的waitStatus
屬性,我們需要做個解釋:(非常重要)
-
CANCELLED(1):當前節點取消獲取鎖。當等待超時或被中斷 (響應中斷),會觸發變更爲此狀態,進入該狀態後節點狀態不再變化;
-
SIGNAL(-1):後面節點等待當前節點喚醒;
-
CONDITION(-2):
Condition
中使用,當前線程阻塞在Condition
,如果其他線程調用了Condition
的signal
方法,這個結點將從等待隊列轉移到同步隊列隊尾,等待獲取同步鎖; -
PROPAGATE(-3):共享模式,前置節點喚醒後面節點後,喚醒操作無條件傳播下去;
-
0:中間狀態,當前節點後面的節點已經喚醒,但是當前節點線程還沒有執行完成;
AQS 屬性
// 頭結點
private transient volatile Node head;
// 尾結點
private transient volatile Node tail;
//0->1 拿到鎖,大於0 說明當前已經有線程佔用了鎖資源
private volatile int state;
今天我們先簡單瞭解下AQS
的構造以幫助大家更好的理解ReentrantLock
,至於深層次的東西先不做展開!
加鎖
對AQS
的結構有了基本瞭解之後,我們正式進入主題——加鎖。從源碼中可以看出鎖被分爲公平鎖和非公平鎖。
/**
* 公平鎖代碼
*/
final void lock() {
acquire(1);
}
/**
* 非公平鎖代碼
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
初步查看代碼發現非公平鎖似乎包含公平鎖的邏輯,所以我們就從 “非公平鎖” 開始。
非公平鎖
final void lock() {
//通過 CAS 的方式嘗試將 state 從0改爲1,
//如果返回 true,代表修改成功,獲得鎖資源;
//如果返回false,代表修改失敗,未獲取鎖資源
if (compareAndSetState(0, 1))
// 將屬性exclusiveOwnerThread設置爲當前線程,該屬性是AQS的父類提供的
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
compareAndSetState()
:底層調用的是unsafe
的compareAndSwapInt
,該方法是原子操作;
假設有兩個線程(t1
、t2
)在競爭鎖資源,線程 1 獲取鎖資源之後,執行setExclusiveOwnerThread
操作,設置屬性值爲當前線程t1
此時,當t2
想要獲取鎖資源,調用lock()
方法之後,執行compareAndSetState(0, 1)
返回false
,會走else
執行acquire()
方法。
方法查看
public final void accquire(int arg) {
// tryAcquire 再次嘗試獲取鎖資源,如果嘗試成功,返回true,嘗試失敗返回false
if (!tryAcquire(arg) &&
// 走到這,代表獲取鎖資源失敗,需要將當前線程封裝成一個Node,追加到AQS的隊列中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 線程中斷
selfInterrupt();
}
accquire()
中涉及的方法比較多,我們將進行拆解,一個一個來分析,順序:tryAcquire() -> addWaiter() -> acquireQueued()
查看 tryAcquire() 方法
//AQS中
protected boolean tryAcquire(int arg) {
//AQS 是基類,具體實現在自己的類中實現,我們去查看“非公平鎖”中的實現
throw new UnsupportedOperationException();
}
//ReentrantLock 中
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 獲取當前線程
final Thread current = Thread.currentThread();
//獲取AQS 的 state
int c = getState();
// 如果 state 爲0,代表嘗試再次獲取鎖資源
if (c == 0) {
// 步驟同上:通過 CAS 的方式嘗試將 state 從0改爲1,
//如果返回 true,代表修改成功,獲得鎖資源;
//如果返回false,代表修改失敗,未獲取鎖資源
if (compareAndSetState(0, acquires)) {
//設置屬性爲當前線程
setExclusiveOwnerThread(current);
return true;
}
}
//當前佔有鎖資源的線程是否是當前線程,如果是則證明是可重入操作
else if (current == getExclusiveOwnerThread()) {
//將 state + 1
int nextc = c + acquires;
//爲什麼會小於 0 呢?因爲最大值 + 1 後會將符號位的0改爲1 會變成負數(可參考Integer.MAX_VALUE + 1)
if (nextc < 0) // overflow
//加1後小於0,超出鎖可重入的最大值,拋異常
throw new Error("Maximum lock count exceeded");
//設置 state 狀態
setState(nextc);
return true;
}
return false;
}
因爲線程 1 已經獲取到了鎖,此時state
爲 1,所以不走nonfairTryAcquire()
的if
。又因爲當前是線程 2,不是佔有當前鎖的線程 1,所以也不會走else if
,即tryAcquire()
方法返回false
。
查看 addWaiter() 方法
走到本方法中,代表獲取鎖資源失敗。addWaiter()
將沒有獲取到鎖資源的線程甩到隊列的尾部。
private Node addWaiter(Node mode) {
//創建 Node 類,並且設置 thread 爲當前線程,設置爲排它鎖
Node node = new Node(Thread.currentThread(), mode);
// 獲取 AQS 中隊列的尾部節點
Node pred = tail;
// 如果 tail == null,說明是空隊列,
// 不爲 null,說明現在隊列中有數據,
if (pred != null) {
// 將當前節點的 prev 指向剛纔的尾部節點,那麼當前節點應該設置爲尾部節點
node.prev = pred;
// CAS 將 tail 節點設置爲當前節點
if (compareAndSetTail(pred, node)) {
// 將之前尾節點的 next 設置爲當前節點
pred.next = node;
// 返回當前節點
return node;
}
}
enq(node);
return node;
}
當tail
不爲空,即隊列中有數據時,我們來圖解一下pred!=null
代碼塊中的代碼。初始化狀態如下,pred
指向尾節點,node
指向新的節點。
node.prev = pred;
將node
的前驅節點設置爲pred
指向的節點
compareAndSetTail(pred, node)
通過CAS
的方式嘗試將當前節點node
設置爲尾結點,此處我們假設設置成功,則FIFO
隊列的tail
指向node
節點。
pred.next = node;
將pred
節點的後繼節點設置爲node
節點,此時node
節點成功進入FIFO
隊列尾部。
而當pred
爲空,即隊列中沒有節點或將node
節點設置爲尾結點失敗時,會走enq()
方法。我們列舉的例子就符合pred
爲空的情況,就讓我們以例子爲基礎繼續分析吧。
//現在沒人排隊,我是第一個 || 前邊CAS失敗也會進入這個位置重新往隊列尾巴去塞
private Node enq(final Node node) {
//死循環
for (;;) {
//重新獲取tail節點
Node t = tail;
// 沒人排隊,隊列爲空
if (t == null) {
// 初始化一個 Node 爲 head,而這個head 沒有意義
if (compareAndSetHead(new Node()))
// 將頭尾都指向了這個初始化的Node,第一次循環結束
tail = head;
} else {
// 有人排隊,往隊列尾巴塞
node.prev = t;
// CAS 將 tail 節點設置爲當前節點
if (compareAndSetTail(t, node)) {
//將之前尾節點的 next 設置爲當前節點
t.next = node;
return t;
}
}
}
}
進入死循環,首先會走if
方法的邏輯,通過CAS
的方式嘗試將一個新節點設置爲head
節點,然後將tail
也指向新節點。可以看出隊列中的頭節點只是個初始化的節點,沒有任何意義。
繼續走死循環中的代碼,此時t
不爲null
,所以會走else
方法。將node
的前驅節點指向t
,通過CAS
方式將當前節點node
設置爲尾結點,然後將t
的後繼節點指向node
。此時線程 2 的節點就被成功塞入FIFO
隊列尾部。
查看 acquireQueued() 方法
將已經在隊列中的node
嘗試去獲取鎖否則掛起。
final boolean acquireQueued(final Node node, int arg) {
// 獲取鎖資源的標識,失敗爲 true,成功爲 false
boolean failed = true;
try {
// 線程中斷的標識,中斷爲 true,不中斷爲 false
boolean interrupted = false;
for (;;) {
// 獲取當前節點的上一個節點
final Node p = node.predecessor();
//p爲頭節點,嘗試獲取鎖操作
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
// 將獲取鎖失敗標識置爲false
failed = false;
// 獲取到鎖資源,不會被中斷
return interrupted;
}
// p 不是 head 或者 沒拿到鎖資源,
if (shouldParkAfterFailedAcquire(p, node) &&
// 基於 Unsafe 的 park方法,掛起線程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這裏又出現了一次死循環,首先獲取當前節點的前驅節點 p,如果 p 是頭節點 (頭節點沒有意義),說明node
是head
後的第一個節點,此時當前獲取鎖資源的線程 1 可能會釋放鎖,所以線程 2 可以再次嘗試獲取鎖。
假設獲取成功,證明拿到鎖資源了,將node
節點設置爲head
節點,並將node
節點的pre
和thread
設置爲null
。因爲拿到鎖資源了,node
節點就不需要排隊了。
將頭節點 p 的next
置爲null
,此時 p 節點就不在隊列中存在了,可以幫助GC
回收 (可達性分析)。failed
設置爲false
,表明獲取鎖成功;interrupted
爲false
,則線程不會中斷。
如果 p 不是head
節點或者沒有拿到鎖資源,會執行下邊的代碼,因爲我們的線程 1 沒有釋放鎖資源,所以線程 2 獲取鎖失敗,會繼續往下執行。
//該方法的作用是保證上一個節點的waitStatus狀態爲-1(爲了喚醒後繼節點)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取上一個節點的狀態,該狀態爲-1,纔會喚醒下一個節點。
int ws = pred.waitStatus;
// 如果上一個節點的狀態是SIGNAL即-1,可以喚醒下一個節點,直接返回true
if (ws == Node.SIGNAL)
return true;
// 如果上一個節點的狀態大於0,說明已經失效了
if (ws > 0) {
do {
// 將node 的節點與 pred 的前一個節點相關聯,並將前一個節點賦值給 pred
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); // 一直找到小於等於0的
// 將重新標識好的最近的有效節點的 next 指向當前節點
pred.next = node;
} else {
// 小於等於0,但是不等於-1,將上一個有效節點狀態修改爲-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
只有節點的狀態爲 - 1,纔會喚醒後一個節點,如果節點狀態未設置,默認爲 0。
圖解一下ws>0
的過程,因爲ws>0
的節點爲失效節點,所以do...while
中會重複向前查找前驅節點,直到找到第一個ws<=0
的節點爲止,將node
節點掛到該節點上。
我們的pred
是頭結點且未設置狀態,所以狀態爲 0,會走else
。通過CAS
嘗試將pred
節點的waitStatus
設置爲 - 1,表明node
節點需要被pred
喚醒。
shouldParkAfterFailedAcquire()
返回false
,繼續執行acquireQueued()
中的死循環。
步驟和上邊一樣,node
的前驅節點還是head
,繼續嘗試獲取鎖。如果線程 1 釋放了鎖,線程 2 就可以拿到,返回true
;否則繼續調用shouldParkAfterFailedAcquire()
,因爲上一步已經將前驅結點的ws
設置爲 - 1 了,所以直接返回true
。
執行parkAndCheckInterrupt()
方法,通過UNSAFE.park();
方法阻塞當前線程 2。等以後執行unpark
方法的時候,如果node
是頭節點後的第一個節點,會進入acquireQueued()
方法中走if (p == head && tryAcquire(arg))
的邏輯獲取鎖資源並結束死循環。
查看 cancelAcquire() 方法
該方法執行的機率約等於 0,爲什麼這麼說呢?因爲針對failed
屬性,只有JVM
內部出現問題時,纔可能出現異常,執行該方法。
// node 爲當前節點
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 上一個節點
Node pred = node.prev;
// 節點狀態大於0,說明節點失效
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 將第一個不是失效節點的後繼節點聲明出來
Node predNext = pred.next;
// 節點狀態變爲失效
node.waitStatus = Node.CANCELLED;
// node爲尾節點,cas設置pred爲尾節點
if (node == tail && compareAndSetTail(node, pred)) {
//cas將pred的next設置爲null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 中間節點
// 如果上一個節點不是head 節點
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
// 前邊已經判斷了大於0的操作,
// pred 是需要喚醒後繼節點的,所以當 waitStatus 不爲 -1 時,需要將 pred 節點的 waitStatus 設置爲 -1
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// CAS 嘗試將 pred 的 next 指向當前節點的 next
compareAndSetNext(pred, predNext, next);
} else {
// head 節點,喚醒後繼節點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
執行到while
時找到前驅節點中最近的有效節點,把當前節點node
掛到有效節點後邊,可以過濾掉當前節點前的失效節點。聲明出有效節點的第一個後繼無效節點predNext
,並把當前的node
節點狀態設置爲失效狀態。
if
中的操作:如果當前節點是尾節點,CAS
嘗試將最近的有效節點設置爲尾節點,並將尾節點的next
設置爲null
。
else
中的操作:
如果pred
節點不是頭結點即中間節點,並且pred
的waitStatus
爲 - 1 或者waitStatus<=0
,爲了讓pred
節點能喚醒後繼節點,需要設置爲 - 1,並且pred
節點的線程不爲空。獲取node
節點的後繼節點,如果後繼節點有效,CAS
嘗試將pred
的next
指向node
節點的next
。
當其他節點來找有效節點的時候走當前node
的prev
這條線,而不是再一個一個往前找,可以提高效率。
如果是頭結點則喚醒後繼節點。
最後將node
節點的next
指向自己。
解鎖
釋放鎖是不區分公平鎖和非公平鎖的,釋放鎖的核心是將state
由大於 0 的數置爲 0。廢話不多說,直接上代碼
//釋放鎖方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//嘗試釋放鎖資源,如果釋放成功,返回true
if (tryRelease(arg)) {
Node h = head;
// head 不爲空且 head 的 ws 不爲0(如果爲0,代表後邊沒有其他線程掛起)
if (h != null && h.waitStatus != 0)
// AQS的隊列中有 node 在排隊,並且線程已經掛起
// 需要喚醒被掛起的 Node
unparkSuccessor(h);
return true;
}
// 代表釋放一次沒有完全釋放
return false;
}
如果釋放鎖成功,需要獲取head
節點。如果頭結點不爲空且waitStatus
不爲 0,則證明有node
在排隊,執行喚醒掛起其他node
的操作。
查看 tryRelease() 方法
protected final boolean tryRelease(int releases) {
//獲取當前鎖的狀態,先進行減1操作,代表釋放一次鎖資源
int c = getState() - releases;
//如果釋放鎖的線程不是佔用鎖的線程,直接拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果 c 爲0 ,代表鎖完全釋放了,如果不爲0,代表鎖之前重入了,一次沒釋放掉,等待下次再次執行時,再次判斷
if (c == 0) {
// 釋放鎖標誌爲 true,代表完全釋放了
free = true;
// 將佔用互斥鎖的標識置爲 null
setExclusiveOwnerThread(null);
}
// 設置 state 狀態
setState(c);
return free;
}
我們的例子中線程 1 佔用鎖資源,線程 1 釋放鎖之後,state
爲 0。進入if
操作,將釋放標誌更新爲true
,將FIFO
隊列的exclusiveOwnerThread
標誌置爲null
。
查看 unparkSuccessor() 方法
用於喚醒AQS
中被掛起的線程。
// 注意當前的 node 節點是 head 節點
private void unparkSuccessor(Node node) {
//獲取 head 的狀態
int ws = node.waitStatus;
if (ws < 0)
// CAS 將 node 的 ws 設置爲0,代表當前 node 接下來會捨棄
compareAndSetWaitStatus(node, ws, 0);
// 獲取頭節點的下一個節點
Node s = node.next;
// 如果下一個節點爲null 或者 下一個節點爲失效節點,需要找到離 head 最近的有效node
if (s == null || s.waitStatus > 0) {
s = null;
// 從尾節點開始往前找不等於null且不是node的節點
for (Node t = tail; t != null && t != node; t = t.prev)
// 如果該節點有效,則將s節點指向t節點
if (t.waitStatus <= 0)
s = t;
}
// 找到最近的node後,直接喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
問題解析:爲什麼要從尾結點往前查找呢?
因爲在addWaiter
方法中是先給prev
指針賦值,最後纔將上一個節點的next
指針賦值,爲了避免防止丟失節點或者跳過節點,必須從後往前找。
我們舉例中head
節點的狀態爲-1
,通過CAS
的方式將head
節點的waitStatus
設置爲 0。
我們的頭結點的後繼節點是線程 2 所在的節點,不爲null
,所以這邊會執行unpark
操作,從上邊的acquireQueued()
內的parkAndCheckInterrupt()
方法繼續執行。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//返回目標線程是否中斷的布爾值:中斷返回true,不中斷返回false,且返回後會重置中斷狀態爲未中斷
return Thread.interrupted();
}
因爲線程 2 未中斷,所以返回false
。繼續執行acquireQueued()
中的死循環
for (;;) {
// 獲取當前節點的上一個節點
final Node p = node.predecessor();
//p爲頭節點,嘗試獲取鎖操作
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
// 將獲取鎖失敗標識置爲false
failed = false;
// 獲取到鎖資源,不會被中斷
return interrupted;
}
// p 不是 head 或者 沒拿到鎖資源,
if (shouldParkAfterFailedAcquire(p, node) &&
// 基於 Unsafe 的 park方法,掛起線程
parkAndCheckInterrupt())
interrupted = true;
}
此時 p 是頭節點,且能獲取鎖成功,將exclusiveOwnerThread
設置爲線程 2,即線程 2 獲取鎖資源。
將node
節點設置爲head
節點,並將node
節點的pre
和thread
設置爲null
。因爲拿到鎖資源了,node
節點就不需要排隊了。
將頭節點 p 的next
置爲null
,此時 p 節點就不在隊列中存在了,可以幫助GC
回收 (可達性分析)。failed
設置爲false
,表明獲取鎖成功;interrupted
爲false
,則線程不會中斷。
爲什麼被喚醒的線程要調用 Thread.interrupted() 清除中斷標記
從上邊的方法可以看出,當parkAndCheckInterrupt()
方法返回true
時,即Thread.interrupted()
方法返回了true
,也就是該線程被中斷了。爲了讓被喚醒的線程繼續執行後續獲取鎖的操作,就需要讓中斷的線程像沒有被中斷過一樣繼續往下執行,所以在返回中斷標記的同時要清除中斷標記,將其設置爲false
。
清除中斷標記之後不代表該線程不需要中斷了,所以在parkAndCheckInterrupt()
方法返回true
時,要自己設置一箇中斷標誌interrupted = true
,爲的就是當獲取到鎖資源執行完相關的操作之後進行中斷補償,故而需要執行selfInterrupt()
方法中斷線程。
以上就是我們加鎖解鎖的圖解過程了。最後我們再來說一下公平鎖和非公平鎖的區別。
區別
前邊已經說過了,似乎非公平鎖包含了公平鎖的全部操作。打開公平鎖的代碼,我們發現accquire()
方法中只有該方法的實現有點區別。
hasQueuedPredecessors()
返回false
時纔會嘗試獲取鎖資源。該方法代碼實現如下
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
-
h==t
時,隊列爲空,表示沒人排隊,可以獲取鎖資源; -
隊列不爲空,頭結點有後繼節點不爲空且 s 節點獲取鎖的線程是當前線程也可以獲取鎖資源,代表鎖重入操作;
總結
以上就是我們的全部內容了,我們在最後再做個總結:
-
代碼使用要合乎規範,避免加鎖成功後,在
finally
中無法解鎖; -
理解
AQS
的FIFO
隊列以及Node
的相關屬性,尤其注意waitStatus
的狀態; -
利用圖加深對非公平鎖源碼的理解;
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/3tqBo47GtG3ljdrig2b8AA