花了 25 分鐘,給小白講 ReentrantLock
Hello World
,我是老田,今天給大家分享併發編程之ReentrantLock
。
剛入行 java 開發或者說在 JDK1.5 之前,遇到併發問題時,第一印象想到的都是synchronized
同步鎖。
關於同步鎖的使用,我們在前面文章中已經聊過。
其中,我們在使用synchronized
的時候,會存在以下幾個問題:
-
synchronized 是不可中斷鎖,需要線程執行完才能釋放鎖。
-
synchronized 是非公平鎖。
-
在 Synchronized 優化以前,synchronized 的性能非常不樂觀,但是自從 Synchronized 引入了偏向鎖,輕量級鎖(自旋鎖)後,性能有所提升。
-
synchronized 的細粒度和靈活度不夠好。
-
....
於是,Doug Lea
大師就搞了個ReentrantLock
,ReentrantLock 誕生於JDK1.5
,位於java.util.concurrent
(簡稱JUC
)包目錄下。
努力從今天開始,成功從 “
零
” 開始
本文內容
大致分爲以下幾個核心知識點:
-
簡單使用
ReentrantLock
-
公平鎖和非公平鎖
-
源碼分析
AQS
中的state
和線程等待隊列(CLH
) -
公平鎖和非公平鎖獲取鎖的方法和不同點
-
超時獲取鎖是如何實現的
-
鎖是怎麼釋放的
文章有點長,因爲涉及到部分源代碼分析。
重點知識點
state、線程同步隊列、CAS
、死循環
簡單使用ReentrantLock
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
test();
}
},"線程1").start();
new Thread(new Runnable() {
@Override
public void run() {
test();
}
},"線程2").start();
}
public static void test() {
try {
//獲取鎖
lock.lock();
System.out.println(Thread.currentThread().getName() + "獲取到鎖了");
//業務代碼,使用部分花費100毫秒
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//釋放鎖放在finally中。
lock.unlock();
System.out.println(Thread.currentThread().getName() + "釋放了鎖");
}
}
}
輸出
線程1獲取到鎖了
線程1釋放了鎖
線程2獲取到鎖了
線程2釋放了鎖
效果和 synchronized 的一樣,線程 1 獲取到鎖了,線程 2 需要等待線程 1 釋放鎖後纔可以獲取鎖。
注意
爲了防止鎖不被釋放,從而造成死鎖,所以強烈建議把鎖釋放放在 finally 模塊中。
ReentrantLock
整體介紹
字面意思爲可重入的鎖,顧名思義ReentrantLock
也是可重入鎖。synchronized 也是可重入鎖。
進入java.util.concurrent.locks.ReentrantLock
中,發現ReentrantLock
有三個內部類;
對應 UML 類圖結構如下:
看到了一個熟悉的身影java.util.concurrent.locks.AbstractQueuedSynchronizer
(江湖人簡稱爲AQS
),這就是傳說中的AQS
(同步隊列器)。
我們都知道鎖分爲公平鎖和非公平鎖。你那麼上面類圖中NonfairSync
爲非公平鎖,FairSync
爲公平鎖。
lock 方法和 unlock 方法
private final Sync sync;
//ReentrantLock無參構造方法
public ReentrantLock() {
sync = new NonfairSync();
}
public void lock() {
sync.lock();
}
從無參構造方法中可以得知ReentrantLock
默認是非公平鎖。
進去NonfairSync
中的 lock
//上鎖,使用final修飾,次方法不能被重寫
final void lock() {
//通過cas操作來修改state狀態,表示爭搶鎖的操作
if (compareAndSetState(0, 1))
//設置當前獲得鎖狀態的線程
setExclusiveOwnerThread(Thread.currentThread());
else
//嘗試去獲取鎖
acquire(1);
}
以下三個方法都是AQS
中的。
compareAndSetState();//通過cas操作來修改state狀態,表示爭搶鎖的操作
setExclusiveOwnerThread();//設置當前獲得鎖狀態的線程
acquire();//嘗試去獲取鎖
更別說ReentrantLock
中的unlock
方法了,直接都是從AQS
裏開始的
//解鎖
public void unlock() {
sync.release(1);
}
再到AQS
中 release 方法
//釋放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//空方法留給子類自己去實現
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
沒法聊了,還是先暫時擱置到這裏,因爲公平鎖和非公平鎖都是繼承自java.util.concurrent.locks.AbstractQueuedSynchronizer
。所以我們不得不先說說AQS
,先把AQS
搞清楚了上面的ReentrantLock
方可繼續。
深入AQS
AbstractQueuedSynchronizer
類如其名,抽象的隊列式的同步器,AQS
定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock/Semaphore/CountDownLatch
等併發工具類,還有阻塞隊列和線程池的實現都有使用到AQS
。因爲是抽象類,就可以想象到很有可能有抽象方法和已經實現好的方法。看源碼中你會發現很多方法是空的,需要子類自己去實現。
它維護了一個 volatile int state(代表共享資源)和一個 FIFO 線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。
state
這裏 volatile 是核心關鍵詞,具體 volatile 的語義,在此不述。state 的訪問方式有三種:
在AQS
中有個很重要的角色
private volatile int state;
//get set
//通過cas操作來修改state狀態,表示爭搶鎖的操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
這段代碼其實邏輯很簡單,就是通過 **CAS
樂觀鎖 ** 的方式來做比較並替換。上面這段代碼的意思是,如果當前內存中的 state 的值和預期值 expect 相等,則替換爲 update,更新成功返回 true,否則返回 false。
這個操作是原子的,不會出現線程安全問題,這裏面涉及到 Unsafe 這個類的操作,一級涉及到 state 這個屬性的意義。關於 Unsafe 類後面會專門寫一篇文章來講它。
細心的朋友估計應該注意到了,前面 lock 和 unlock 方法中涉及到到三個方法,方法參數都是 1。
compareAndSetState(0, 1)
acquire(1);
release(1);
-
當 state=0 時,表示無鎖狀態
-
當 state>0 時,表示已經有線程獲得了鎖,也就是 state=1。但是因爲
ReentrantLock
允許重入,所以同一個線程多次獲得同步鎖的時候,state 會遞增,比如重入 5 次,那麼 state=5。而在釋放鎖的時候,同樣需要釋放 5 次直到 state=0 其他線程纔有資格獲得鎖。
注意:
AQS
不同的實現類對於 state 字段的含義是有所差別的,這個點一定要注意。
線程等待隊列
我們繼續來看AQS
中的隊列,該隊列的實現是一個雙向鏈表,被稱爲sync queue
,它表示所有等待鎖的線程的集合,有點類似於我們前面介紹 synchronized 原理的時候說的wait set
。每個 Node 節點保存了當前線程的同步狀態,等待狀態,前驅和後繼節點等。
我們前面說過,在併發編程中使用隊列通常是將當前線程包裝成某種類型的數據結構扔到等待隊列中,我們先來看看隊列中的每一個節點是怎麼個結構:
static final class Node {
// 共享模式下等待的標記
static final Node SHARED = new Node();
// 獨佔模式下等待的標記
static final Node EXCLUSIVE = null;
// 線程的等待狀態 表示線程已經被取消
static final int CANCELLED = 1;
// 線程的等待狀態 表示後繼線程需要被喚醒
static final int SIGNAL = -1;
// 線程的等待狀態 表示線程在Condtion上
static final int CONDITION = -2;
// 表示下一個acquireShared需要無條件的傳播
static final int PROPAGATE = -3;
/**
* SIGNAL: 當前節點的後繼節點處於等待狀態時,如果當前節點的同步狀態被釋放或者取消,
* 必須喚起它的後繼節點
*
* CANCELLED: 一個節點由於超時或者中斷需要在CLH隊列中取消等待狀態,被取消的節點不會再次等待
*
* CONDITION: 當前節點在等待隊列中,只有當節點的狀態設爲0的時候該節點纔會被轉移到同步隊列
*
* PROPAGATE: 下一次的共享模式同步狀態的獲取將會無條件的傳播
* waitStatus的初始值時0,使用CAS來修改節點的狀態
*/
volatile int waitStatus;
/**
* 當前節點的前驅節點,當前線程依賴它來檢查waitStatus,在入隊的時候才被分配,
* 並且只在出隊的時候才被取消(爲了GC),頭節點永遠不會被取消,一個節點成爲頭節點
* 僅僅是成功獲取到鎖的結果,一個被取消的線程永遠也不會獲取到鎖,線程只取消自身,
* 而不涉及其他節點
*/
volatile Node prev;
/**
* 當前節點的後繼節點,當前線程釋放的才被喚起,在入隊時分配,在繞過被取消的前驅節點
* 時調整,在出隊列的時候取消(爲了GC)
* 如果一個節點的next爲空,我們可以從尾部掃描它的prev,雙重檢查
* 被取消節點的next設置爲指向節點本身而不是null,爲了isOnSyncQueue更容易操作
*/
volatile Node next;
//當前節點的線程,初始化後使用,在使用後失效
volatile Thread thread;
/**
* 鏈接到下一個節點的等待條件,或特殊的值SHARED,因爲條件隊列只有在獨佔模式時才能被訪問,
* 所以我們只需要一個簡單的連接隊列在等待的時候保存節點,然後把它們轉移到隊列中重新獲取
* 因爲條件只能是獨佔性的,我們通過使用特殊的值來表示共享模式
*/
Node nextWaiter;
//如果節點處於共享模式下等待直接返回true
final boolean isShared() {
return nextWaiter == SHARED;
}
//返回當前節點的前驅節點,如果爲空,直接拋出空指針異常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 用來建立初始化的head 或 SHARED的標記
Node() {
}
// 指定線程和模式的構造方法
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 指定線程和節點狀態的構造方法
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
節點信息說完了,我們繼續說 sync queue,AQS 是怎麼使用這個隊列的呢,既然是雙向鏈表,操縱它自然只需要一個頭結點和一個尾節點:
// 頭結點,不代表任何線程,是一個啞結點
private transient volatile Node head;
// 尾節點,每一個請求鎖的線程會加到隊尾
private transient volatile Node tail;
都是用了 volatile 修飾,以確保多線程間保證字段的可見性。
那麼這個同步隊列大整體就應該是這樣的:
不過這裏有一點我們提前說一下,在 AQS 中的隊列是一個 CLH 隊列,它的 head 節點永遠是一個啞結點(dummy node), 它不代表任何線程(某些情況下可以看做是代表了當前持有鎖的線程),因此 head 所指向的 Node 的 thread 屬性永遠是 null。只有從次頭節點往後的所有節點才代表了所有等待鎖的線程。也就是說,在當前線程沒有搶到鎖被包裝成 Node 扔到隊列中時,即使隊列是空的,它也會排在第二個,我們會在它的前面新建一個 dummy 節點 (具體的代碼我們在後面分析源碼時再詳細講)。爲了便於描述,下文中我們把除去 head 節點的隊列稱作是等待隊列,在這個隊列中的節點才代表了所有等待鎖的線程:
在繼續往下之前我們再對着上圖總結一下 Node 節點各個參數的含義:
-
thread
:表示當前 Node 所代表的線程 -
waitStatus
:表示節點所處的等待狀態,共享鎖模式下只需關注三種狀態:SIGNAL
CANCELLED
初始態(0)
-
prev
next
:節點的前驅和後繼 -
nextWaiter
:進作爲標記,值永遠爲 null,表示當前處於獨佔鎖模式
AQS 中有三個重要的點,state、等待隊列還有
當前持有鎖的線程,注意這個屬性是從AbstractOwnableSynchronizer
繼承而來
private transient Thread exclusiveOwnerThread;
//執行的方法(方法修飾符爲protected,只允許子類調用)
//設置當前鎖被thread線程鎖持有,重入鎖條件之一就是使用這個來判斷當前線程是不是鎖持有的線程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
講完AQS
了的整體,我們繼續回到上面的 lock 方法,我們今天的目標是搞定ReentrantLock
。繼續往下看。
深入 lock 方法
非公平鎖
private final Sync sync;
//ReentrantLock無參構造方法
public ReentrantLock() {
sync = new NonfairSync();
}
public void lock() {
sync.lock();
}
NonfairSync
中 lock 方法
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//上鎖,使用final修飾,次方法不能被重寫
final void lock() {
//方法1:通過cas操作來修改state狀態,表示爭搶鎖的操作
if (compareAndSetState(0, 1))
//方法2:設置當前獲得鎖狀態的線程
setExclusiveOwnerThread(Thread.currentThread());
else
//方法3:嘗試去獲取鎖
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
//nonfairTryAcquire方法是Sync中實現的方法
return nonfairTryAcquire(acquires);
}
}
lock 中的方法 1 和方法 2 在前面我們已經說過。接下來我們來說一下方法 3。
這裏方法 3:acquire
是AQS
中的方法
//注意此時的arg=1
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
acquire 方法中共有四個方法:
tryAcquire//嘗試獲得鎖
addWaiter//添加一個等待節點
acquireQueued//
selfInterrupt
下面對上面上個方法逐個說明
tryAcquire 方法
這個方法在 AQS 中是一個空方法,留個子類自己去實現。上面我們使用的是非公平鎖。所以回到NonfairSync
中
//acquires=1
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
這裏方法nonfairTryAcquire
是 Sync 的方法
//acquires=1
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//把state賦值爲1,exclusiveOwnerThread賦值爲thread2,然後返回true
if (compareAndSetState(0, acquires)) {
//把當前持有鎖的線程設置爲當前線程
setExclusiveOwnerThread(current);
return true;
}
}
//判斷當前持有鎖的線程是不是當前線程
else if (current == getExclusiveOwnerThread()) {
//state = state +1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//把state更新
setState(nextc);
return true;
}
return false;
}
上面這段代碼的邏輯大致爲:
1,獲取當前線程
2,獲取當前 state,因爲 state 是 volatile 修飾的,所不用考慮線程可見性問題。
3,判斷 state==0,表示鎖沒有被持有,把 state 設置成 1,把鎖持有線程設置成當前線程,返回 true 表示以獲取鎖。
4,state!=0,判斷持有鎖的線程是不是當前線程
5,是當前線程,state=state+1,返回 true 表示獲取鎖成功。
現任如果線程 1 進來,執行完第三步就結束了。
addWaiter
方法
線程 1 把鎖持有了,把 state 設置成了 1,這時候線程 2 來執行上面的nonfairTryAcquire
方法返回 false,那麼這時候就會執行addWaiter
方法。把上面的獲取 acquire 方法再貼一次:
//注意此時的arg=1
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
addWaiter(Node.EXCLUSIVE)
,再執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法。下面用一個時序圖來看看當前到哪一步了。
那到底這個addWaiter
方法做了寫什麼呢?
static final Node EXCLUSIVE = null;
//因爲Node.EXCLUSIVE=null,所以mode=null
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//1
//此時tail=null
//嘗試快速方式直接放到隊尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失敗則通過enq入隊。
enq(node);
return node;
}
enq 方法
private Node enq(final Node node) {
//CAS"自旋",直到成功加入隊尾
for (;;) {
Node t = tail;
if (t == null) {
//CAS"自旋",直到成功加入隊尾
if (compareAndSetHead(new Node()))
tail = head;
} else {//CAS"自旋",直到成功加入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued
方法
該方法由AQS
實現, 這個方法比較複雜, 主要對上面剛加入隊列的 Node 不斷嘗試以下兩種操作之一。
-
在前驅節點就是 head 節點的時候,繼續嘗試獲取鎖
-
將當前線程掛起, 使 CPU 不再調度它
該線程獲取資源失敗,已經被放入等待隊列尾部了。聰明的你立刻應該能想到該線程下一部該幹什麼了吧:進入等待狀態休息,直到其他線程徹底釋放資源後喚醒自己,自己再拿到資源,然後就可以去幹自己想幹的事了。沒錯,就是這樣!是不是跟醫院排隊拿號有點相似~~acquireQueued() 就是幹這件事:在等待隊列中排隊拿號(中間沒其它事幹可以休息),直到拿到號後再返回。這個函數非常關鍵,繼續看源碼:
final boolean acquireQueued(final Node node, int arg) {
//標記是否成功拿到資源
boolean failed = true;
try {
//標記等待過程中是否被中斷過
boolean interrupted = false;
//又是一個“自旋”!
for (;;) {
//拿到前驅
final Node p = node.predecessor();
//如果前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取資源
//(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
//拿到資源後,將head指向該結點。所以head所指的標杆結點,
//就是當前獲取到資源的那個結點或null。
setHead(node);
// setHead中node.prev已置爲null,此處再將head.next置爲null,
//就是爲了方便GC回收以前的head結點。也就意味着之前拿完資源的結點出隊了!
p.next = null;
failed = false; // 成功獲取資源
return interrupted;//返回等待過程中是否被中斷過
}
//如果自己可以休息了,就通過park()進入waiting狀態,直到被unpark()。如果不可中斷的情況下被中斷了,
//那麼會從park()中醒過來,發現拿不到資源,從而繼續進入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待過程中被中斷過,哪怕只有那麼一次,就將interrupted標記爲true
}
} finally {
// 如果等待過程中沒有成功獲取資源(如timeout,或者可中斷的情況下被中斷了),
//那麼取消結點在隊列中的等待。
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire 方法
此方法主要用於檢查狀態,看看自己是否真的可以去休息了,萬一隊列前邊的線程都放棄了只是瞎站着,那也說不定,對吧!
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驅的狀態
if (ws == Node.SIGNAL)
//如果已經告訴前驅拿完號後通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。
* 注意:那些放棄的結點,由於被自己“加塞”到它們前邊,它們相當於形成一個無引用鏈,稍後就會被保安大叔趕走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它拿完號後通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整個流程中,如果前驅結點的狀態不是 SIGNAL,那麼自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿號。
parkAndCheckInterrup
方法
如果線程找好安全休息點後,那就可以安心去休息了。此方法就是讓線程去休息,真正進入等待狀態。就是傳說中的 park 方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//調用park()使線程進入waiting狀態
return Thread.interrupted();//如果被喚醒,查看自己是不是被中斷的。
}
park() 會讓當前線程進入 waiting 狀態。在此狀態下,有兩種途徑可以喚醒該線程:1)被 unpark();2)被 interrupt()。需要注意的是,Thread.interrupted() 會清除當前線程的中斷標記位。
acquireQueued
方法總結
-
結點進入隊尾後,檢查狀態,找到安全休息點;
-
調用 park() 進入 waiting 狀態,等待 unpark() 或 interrupt() 喚醒自己;
-
被喚醒後,看自己是不是有資格能拿到號。如果拿到,head 指向當前結點,並返回從入隊到拿到號的整個過程中是否被中斷過;如果沒拿到,繼續流程 1。
selfInterrupt
方法
該方法由 AQS 實現, 用於中斷當前線程。由於在整個搶鎖過程中,我們都是不響應中斷的。那如果在搶鎖的過程中發生了中斷怎麼辦呢,總不能假裝沒看見呀。AQS 的做法簡單的記錄有沒有有發生過中斷,如果返回的時候發現曾經發生過中斷,則在退出acquire
方法之前,就調用selfInterrupt
自我中斷一下,就好像將這個發生在搶鎖過程中的中斷 “推遲” 到搶鎖結束以後再發生一樣。
tryAcquire
方法總結
再貼一遍tryAcquire
方法的源碼
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
acquire 的流程
-
調用自定義同步器的
tryAcquire()
嘗試直接去獲取資源,如果成功則直接返回; -
沒成功,則
addWaiter()
將該線程加入等待隊列的尾部,並標記爲獨佔模式; -
acquireQueued()
使線程在等待隊列中休息,有機會時(輪到自己,會被 unpark())會去嘗試獲取資源。獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回 true,否則返回 false。 -
如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷
selfInterrupt()
,將中斷補上。
由於此函數是重中之重,我再用流程圖總結一下:
至此,acquire() 的流程終於算是告一段落了。這也就是ReentrantLock
的lock()
的流程,不信你去看其 lock() 源碼吧,整個函數就是一條 acquire(1)!
不容易滴。搞了半天還只是搞了個 lock 方法。
非公平鎖佔用鎖的的整個流程
公平鎖
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
//直接調用AQS中的acquire方法
acquire(1);
}
//公平鎖和非公平鎖的區別在於公平鎖多方法hasQueuedPredecessors
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//如果當前線程是隊列的最前面或者隊列是空的,則當前線程可以回去鎖
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
公平鎖和非公平鎖的主要區別:公平鎖會考慮前面有沒有線程在等待隊列裏,就是前面有沒有線程先進來,先來先到。
深入 unlock 方法
釋放鎖和鎖的公平性就沒關係了,繼續在ReentrantLock
中的 unlock 方法
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public void unlock() {
sync.release(1);
}
這裏的 release 方法是AQS
中的 release 方法,此方法是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即 state=0), 它會喚醒等待隊列裏的其他線程來獲取資源。這也正是 unlock() 的語義,當然不僅僅只限於unlock()
。
以下是 release() 的源碼:
//有點模板方法模式
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//空方法,留給子類去實現
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
這裏的tryRelease
方法是AQS
的子類Sync
,也就是公平鎖和非公平歲的父類實現的
//釋放當前線程佔用的鎖 releases=1
protected final boolean tryRelease(int releases) {
//計算state=state-1
int c = getState() - releases;
//判斷持有鎖的線程是不是當前線程
//不是拋異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果state==0證明本次鎖釋放成功
if (c == 0) {
free = true;
//鎖持有線程設置成null
setExclusiveOwnerThread(null);
}
//把state設置成0
setState(c);
return free;
}
釋放鎖還是蠻簡單的。到此釋放鎖就結束了。
超時獲取鎖
在ReetrantLock
的tryLock(long timeout, TimeUnit unit)
提供了超時獲取鎖的功能。它的語義是在指定的時間內如果獲取到鎖就返回 true,獲取不到則返回 false。這種機制避免了線程無限期的等待鎖釋放。
繼續看看源碼裏是怎麼實現的
//timeout時間長短
//unit時間單位
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
方法tryAcquireNanos
是AQS
中的方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//判斷是否已被中斷
if (Thread.interrupted())
throw new InterruptedException();
//
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
關於方法 tryAcquire 在前面我們已經說過了。這裏就不再累贅了。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//時長小於等於0就沒有必要再嘗試獲取了,直接返回false沒拿到鎖
if (nanosTimeout <= 0L) return false;
//線程有效期期deadline
final long deadline = System.nanoTime() + nanosTimeout;
//創建一個結點node
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
//死循環嘗試獲取---自旋
for (;;) {
//獲取等待隊列的前驅結點
final Node p = node.predecessor();
//如果前驅是頭節點並且佔用鎖成功,則將當前節點變成頭結點
//tryAcquire方法前面已經說過了就是嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
//成功了,將當前節點變成頭結點
setHead(node);
//方便GC
p.next = null;
failed = false;
return true;
}
//計算當前還剩多少時間
nanosTimeout = deadline - System.nanoTime();
//如果這個死循環把時間耗完了還拿到就返回false,沒拿到鎖
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//再次判斷是否被中斷了,中斷了直接拋中斷異常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驅的狀態
if (ws == Node.SIGNAL)
//如果已經告訴前驅拿完號後通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。
* 注意:那些放棄的結點,由於被自己“加塞”到它們前邊,它們相當於形成一個無引用鏈,
* 稍後就會被保安大叔趕走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它拿完號後通知自己一下。
//有可能失敗,人家說不定剛剛釋放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
超時獲取鎖的總結
如果超時時間設置小於等於 0,則直接返回獲取失敗。線程先入等待隊列,然後開始自旋,嘗試獲取鎖,獲取成功就返回,失敗則在隊列裏找一個安全點把自己掛起直到超時時間過期。
很多人可能會問:這裏爲什麼還需要循環呢?
因爲當前線程節點的前驅狀態可能不是 SIGNAL,那麼在當前這一輪循環中線程不會被掛起,然後更新超時時間,開始新一輪的嘗試
最後
好了,到此我們今天的內容就分享結束了,如果有疑問,或者對文章中某些知識講的不是很清楚的,可以加我微信,我們搬個板凳坐着聊。
參考
https://segmentfault.com/a/1190000015739343 https://blog.csdn.net/u010452388/article/details/90485326 https://www.cnblogs.com/waterystone/p/4920797.html
關注我,分享更多技術知識,記得把我公衆號設置爲 “星標”!
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/EALE52sIS7OH4bIRTczPPw