重磅出擊,20 張圖帶你徹底瞭解 ReentrantLock 加鎖解鎖的原理

哈嘍大家好,我是阿 Q。

最近是上班忙項目,下班帶娃,忙的不可開交,連摸魚的時間都沒有了。今天趁假期用圖解的方式從源碼角度給大家說一下ReentrantLock加鎖解鎖的全過程。繫好安全帶,發車了。

簡單使用

在聊它的源碼之前,我們先來做個簡單的使用說明。當我在IDEA中創建了一個簡單的Demo之後,它會給出以下提示

提示文字

在使用阻塞等待獲取鎖的方式中,必須在try代碼塊之外,並且在加鎖方法與try代碼塊之間沒有任何可能拋出異常的方法調用,避免加鎖成功後,在finally中無法解鎖。

java.concurrent.LockShouldWithTryFinallyRule.rule.desc

還舉了兩個例子,正確案例如下:

Lock lock = new XxxLock();
// ...
lock.lock();
try {
    doSomething();
    doOthers();
} finally {
    lock.unlock();
}

錯誤案例如下:

Lock lock = new XxxLock();
// ...
try {
    // 如果在此拋出異常,會直接執行 finally 塊的代碼
    doSomething();
    // 不管鎖是否成功,finally 塊都會執行
    lock.lock();
    doOthers();

} finally {
    lock.unlock();
}

AQS

上邊的案例中加鎖調用的是lock()方法,解鎖用的是unlock()方法,而通過查看源碼發現它們都是調用的內部靜態抽象類Sync的相關方法。

abstract static class Sync extends AbstractQueuedSynchronizer

Sync 是通過繼承AbstractQueuedSynchronizer來實現的,沒錯,AbstractQueuedSynchronizer就是AQS的全稱。AQS內部維護着一個FIFO的雙向隊列(CLH),ReentrantLock也是基於它來實現的,先來張圖感受下。

Node 屬性

//此處是 Node 的部分屬性
static final class Node {
 
 //排他鎖標識
 static final Node EXCLUSIVE = null;

 //如果帶有這個標識,證明是失效了
 static final int CANCELLED =  1;
 
 //具有這個標識,說明後繼節點需要被喚醒
 static final int SIGNAL = -1;

 //Node對象存儲標識的地方
 volatile int waitStatus;

 //指向上一個節點
 volatile Node prev;

 //指向下一個節點
 volatile Node next;
 
 //當前Node綁定的線程
 volatile Thread thread;
 
 //返回前驅節點即上一個節點,如果前驅節點爲空,拋出異常
 final Node predecessor() throws NullPointerException {
  Node p = prev;
  if (p == null)
   throw new NullPointerException();
  else
   return p;
 }
}

對於裏邊的waitStatus屬性,我們需要做個解釋:(非常重要)

AQS 屬性

// 頭結點
private transient volatile Node head;

// 尾結點
private transient volatile Node tail;

//0->1 拿到鎖,大於0 說明當前已經有線程佔用了鎖資源
private volatile int state;

今天我們先簡單瞭解下AQS的構造以幫助大家更好的理解ReentrantLock,至於深層次的東西先不做展開!

加鎖

AQS的結構有了基本瞭解之後,我們正式進入主題——加鎖。從源碼中可以看出鎖被分爲公平鎖非公平鎖

/**
 * 公平鎖代碼
 */
final void lock() {
    acquire(1);
}

/**
 * 非公平鎖代碼
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

初步查看代碼發現非公平鎖似乎包含公平鎖的邏輯,所以我們就從 “非公平鎖” 開始。

非公平鎖

final void lock() {
    //通過 CAS 的方式嘗試將 state 從0改爲1,
    //如果返回 true,代表修改成功,獲得鎖資源;
    //如果返回false,代表修改失敗,未獲取鎖資源
    if (compareAndSetState(0, 1))
        // 將屬性exclusiveOwnerThread設置爲當前線程,該屬性是AQS的父類提供的
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

compareAndSetState():底層調用的是unsafecompareAndSwapInt,該方法是原子操作;

假設有兩個線程(t1t2)在競爭鎖資源,線程 1 獲取鎖資源之後,執行setExclusiveOwnerThread操作,設置屬性值爲當前線程t1

此時,當t2想要獲取鎖資源,調用lock()方法之後,執行compareAndSetState(0, 1)返回false,會走else執行acquire()方法。

方法查看

public final void accquire(int arg) {
    // tryAcquire 再次嘗試獲取鎖資源,如果嘗試成功,返回true,嘗試失敗返回false
    if (!tryAcquire(arg) &&
        // 走到這,代表獲取鎖資源失敗,需要將當前線程封裝成一個Node,追加到AQS的隊列中
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 線程中斷
        selfInterrupt();
}

accquire()中涉及的方法比較多,我們將進行拆解,一個一個來分析,順序:tryAcquire() -> addWaiter() -> acquireQueued()

查看 tryAcquire() 方法

//AQS中
protected boolean tryAcquire(int arg) {
    //AQS 是基類,具體實現在自己的類中實現,我們去查看“非公平鎖”中的實現
    throw new UnsupportedOperationException();
}

//ReentrantLock 中
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}


final boolean nonfairTryAcquire(int acquires) {
 // 獲取當前線程
 final Thread current = Thread.currentThread();
 //獲取AQS 的 state 
 int c = getState();
 // 如果 state 爲0,代表嘗試再次獲取鎖資源
 if (c == 0) {
  // 步驟同上:通過 CAS 的方式嘗試將 state 從0改爲1,
  //如果返回 true,代表修改成功,獲得鎖資源;
  //如果返回false,代表修改失敗,未獲取鎖資源
  if (compareAndSetState(0, acquires)) {
   //設置屬性爲當前線程
   setExclusiveOwnerThread(current);
   return true;
  }
 }
 //當前佔有鎖資源的線程是否是當前線程,如果是則證明是可重入操作
 else if (current == getExclusiveOwnerThread()) {
  //將 state + 1
  int nextc = c + acquires;
  //爲什麼會小於 0 呢?因爲最大值 + 1 後會將符號位的0改爲1 會變成負數(可參考Integer.MAX_VALUE + 1)
  if (nextc < 0) // overflow
   //加1後小於0,超出鎖可重入的最大值,拋異常
   throw new Error("Maximum lock count exceeded");
  //設置 state 狀態
  setState(nextc);
  return true;
 }
 return false;
}

因爲線程 1 已經獲取到了鎖,此時state爲 1,所以不走nonfairTryAcquire()if。又因爲當前是線程 2,不是佔有當前鎖的線程 1,所以也不會走else if,即tryAcquire()方法返回false

查看 addWaiter() 方法

走到本方法中,代表獲取鎖資源失敗。addWaiter()將沒有獲取到鎖資源的線程甩到隊列的尾部。

private Node addWaiter(Node mode) {
 //創建 Node 類,並且設置 thread 爲當前線程,設置爲排它鎖
 Node node = new Node(Thread.currentThread(), mode);
 // 獲取 AQS 中隊列的尾部節點
 Node pred = tail;
 // 如果 tail == null,說明是空隊列,
 // 不爲 null,說明現在隊列中有數據,
 if (pred != null) {
  // 將當前節點的 prev 指向剛纔的尾部節點,那麼當前節點應該設置爲尾部節點
  node.prev = pred;
  // CAS 將 tail 節點設置爲當前節點
  if (compareAndSetTail(pred, node)) {
   // 將之前尾節點的 next 設置爲當前節點
   pred.next = node;
   // 返回當前節點
   return node;
  }
 }
 enq(node);
 return node;
}

tail不爲空,即隊列中有數據時,我們來圖解一下pred!=null代碼塊中的代碼。初始化狀態如下,pred指向尾節點,node指向新的節點。

node.prev = pred;node的前驅節點設置爲pred指向的節點

compareAndSetTail(pred, node)通過CAS的方式嘗試將當前節點node設置爲尾結點,此處我們假設設置成功,則FIFO隊列的tail指向node節點。

pred.next = node;pred節點的後繼節點設置爲node節點,此時node節點成功進入FIFO隊列尾部。

而當pred爲空,即隊列中沒有節點或將node節點設置爲尾結點失敗時,會走enq()方法。我們列舉的例子就符合pred爲空的情況,就讓我們以例子爲基礎繼續分析吧。

//現在沒人排隊,我是第一個 || 前邊CAS失敗也會進入這個位置重新往隊列尾巴去塞
private Node enq(final Node node) {
 //死循環
 for (;;) {
  //重新獲取tail節點
  Node t = tail;
  // 沒人排隊,隊列爲空
  if (t == null) {
   // 初始化一個 Node 爲 head,而這個head 沒有意義
   if (compareAndSetHead(new Node()))
    // 將頭尾都指向了這個初始化的Node,第一次循環結束
    tail = head;
  } else {
   // 有人排隊,往隊列尾巴塞
   node.prev = t;
   // CAS 將 tail 節點設置爲當前節點
   if (compareAndSetTail(t, node)) {
    //將之前尾節點的 next 設置爲當前節點
    t.next = node;
    return t;
   }
  }
 }
}

進入死循環,首先會走if方法的邏輯,通過CAS的方式嘗試將一個新節點設置爲head節點,然後將tail也指向新節點。可以看出隊列中的頭節點只是個初始化的節點,沒有任何意義。

繼續走死循環中的代碼,此時t不爲null,所以會走else方法。將node的前驅節點指向t,通過CAS方式將當前節點node設置爲尾結點,然後將t的後繼節點指向node。此時線程 2 的節點就被成功塞入FIFO隊列尾部。

查看 acquireQueued() 方法

將已經在隊列中的node嘗試去獲取鎖否則掛起。

final boolean acquireQueued(final Node node, int arg) {
 // 獲取鎖資源的標識,失敗爲 true,成功爲 false
 boolean failed = true;
 try {
  // 線程中斷的標識,中斷爲 true,不中斷爲 false
  boolean interrupted = false;
  for (;;) {
   // 獲取當前節點的上一個節點
   final Node p = node.predecessor();
   //p爲頭節點,嘗試獲取鎖操作
   if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null;
    // 將獲取鎖失敗標識置爲false
    failed = false;
    // 獲取到鎖資源,不會被中斷
    return interrupted;
   }
   // p 不是 head 或者 沒拿到鎖資源,
   if (shouldParkAfterFailedAcquire(p, node) &&
    // 基於 Unsafe 的 park方法,掛起線程
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {
  if (failed)
   cancelAcquire(node);
 }
}

這裏又出現了一次死循環,首先獲取當前節點的前驅節點 p,如果 p 是頭節點 (頭節點沒有意義),說明nodehead後的第一個節點,此時當前獲取鎖資源的線程 1 可能會釋放鎖,所以線程 2 可以再次嘗試獲取鎖。

假設獲取成功,證明拿到鎖資源了,將node節點設置爲head節點,並將node節點的prethread設置爲null。因爲拿到鎖資源了,node節點就不需要排隊了。

將頭節點 p 的next置爲null,此時 p 節點就不在隊列中存在了,可以幫助GC回收 (可達性分析)。failed設置爲false,表明獲取鎖成功;interruptedfalse,則線程不會中斷。

如果 p 不是head節點或者沒有拿到鎖資源,會執行下邊的代碼,因爲我們的線程 1 沒有釋放鎖資源,所以線程 2 獲取鎖失敗,會繼續往下執行。

//該方法的作用是保證上一個節點的waitStatus狀態爲-1(爲了喚醒後繼節點)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 //獲取上一個節點的狀態,該狀態爲-1,纔會喚醒下一個節點。
 int ws = pred.waitStatus;
 // 如果上一個節點的狀態是SIGNAL即-1,可以喚醒下一個節點,直接返回true
 if (ws == Node.SIGNAL)
  return true;
 // 如果上一個節點的狀態大於0,說明已經失效了
 if (ws > 0) {
  do {
   // 將node 的節點與 pred 的前一個節點相關聯,並將前一個節點賦值給 pred
   node.prev = pred = pred.prev;
  } while (pred.waitStatus > 0); // 一直找到小於等於0的
  // 將重新標識好的最近的有效節點的 next 指向當前節點
  pred.next = node;
 } else {
  // 小於等於0,但是不等於-1,將上一個有效節點狀態修改爲-1
  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 }
 return false;
}

只有節點的狀態爲 - 1,纔會喚醒後一個節點,如果節點狀態未設置,默認爲 0。

圖解一下ws>0的過程,因爲ws>0的節點爲失效節點,所以do...while中會重複向前查找前驅節點,直到找到第一個ws<=0的節點爲止,將node節點掛到該節點上。

我們的pred是頭結點且未設置狀態,所以狀態爲 0,會走else。通過CAS嘗試將pred節點的waitStatus設置爲 - 1,表明node節點需要被pred喚醒。

shouldParkAfterFailedAcquire()返回false,繼續執行acquireQueued()中的死循環。

步驟和上邊一樣,node的前驅節點還是head,繼續嘗試獲取鎖。如果線程 1 釋放了鎖,線程 2 就可以拿到,返回true;否則繼續調用shouldParkAfterFailedAcquire(),因爲上一步已經將前驅結點的ws設置爲 - 1 了,所以直接返回true

執行parkAndCheckInterrupt()方法,通過UNSAFE.park();方法阻塞當前線程 2。等以後執行unpark方法的時候,如果node是頭節點後的第一個節點,會進入acquireQueued()方法中走if (p == head && tryAcquire(arg))的邏輯獲取鎖資源並結束死循環。

查看 cancelAcquire() 方法

該方法執行的機率約等於 0,爲什麼這麼說呢?因爲針對failed屬性,只有JVM內部出現問題時,纔可能出現異常,執行該方法。

// node 爲當前節點
private void cancelAcquire(Node node) {
 if (node == null)
  return;
 node.thread = null;
 // 上一個節點
 Node pred = node.prev;
 // 節點狀態大於0,說明節點失效
 while (pred.waitStatus > 0)
  node.prev = pred = pred.prev;

 // 將第一個不是失效節點的後繼節點聲明出來
 Node predNext = pred.next;
 // 節點狀態變爲失效
 node.waitStatus = Node.CANCELLED;
 // node爲尾節點,cas設置pred爲尾節點
 if (node == tail && compareAndSetTail(node, pred)) {
  //cas將pred的next設置爲null
  compareAndSetNext(pred, predNext, null);
 } else {
  int ws;
  // 中間節點
  // 如果上一個節點不是head 節點
  if (pred != head &&
   ((ws = pred.waitStatus) == Node.SIGNAL ||
    // 前邊已經判斷了大於0的操作,
    // pred 是需要喚醒後繼節點的,所以當 waitStatus 不爲 -1 時,需要將 pred 節點的 waitStatus 設置爲 -1 
    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
   pred.thread != null) {
   Node next = node.next;
   if (next != null && next.waitStatus <= 0)
    // CAS 嘗試將 pred 的 next 指向當前節點的 next
    compareAndSetNext(pred, predNext, next);
  } else {
   // head 節點,喚醒後繼節點
   unparkSuccessor(node);
  }

  node.next = node; // help GC
 }
}

執行到while時找到前驅節點中最近的有效節點,把當前節點node掛到有效節點後邊,可以過濾掉當前節點前的失效節點。聲明出有效節點的第一個後繼無效節點predNext,並把當前的node節點狀態設置爲失效狀態。

if中的操作:如果當前節點是尾節點,CAS嘗試將最近的有效節點設置爲尾節點,並將尾節點的next設置爲null

else中的操作:

如果pred節點不是頭結點即中間節點,並且predwaitStatus爲 - 1 或者waitStatus<=0,爲了讓pred節點能喚醒後繼節點,需要設置爲 - 1,並且pred節點的線程不爲空。獲取node節點的後繼節點,如果後繼節點有效,CAS嘗試將prednext指向node節點的next

當其他節點來找有效節點的時候走當前nodeprev這條線,而不是再一個一個往前找,可以提高效率。

如果是頭結點則喚醒後繼節點。

最後將node節點的next指向自己。

解鎖

釋放鎖是不區分公平鎖和非公平鎖的,釋放鎖的核心是將state由大於 0 的數置爲 0。廢話不多說,直接上代碼

//釋放鎖方法
public void unlock() {
 sync.release(1);
}


public final boolean release(int arg) {
  //嘗試釋放鎖資源,如果釋放成功,返回true
 if (tryRelease(arg)) {
  Node h = head;
  // head 不爲空且 head 的 ws 不爲0(如果爲0,代表後邊沒有其他線程掛起)
  if (h != null && h.waitStatus != 0)
   // AQS的隊列中有 node 在排隊,並且線程已經掛起
   // 需要喚醒被掛起的 Node
   unparkSuccessor(h);
  return true;
 }
 // 代表釋放一次沒有完全釋放
 return false;
}

如果釋放鎖成功,需要獲取head節點。如果頭結點不爲空且waitStatus不爲 0,則證明有node在排隊,執行喚醒掛起其他node的操作。

查看 tryRelease() 方法

protected final boolean tryRelease(int releases) {
 //獲取當前鎖的狀態,先進行減1操作,代表釋放一次鎖資源
 int c = getState() - releases;
 //如果釋放鎖的線程不是佔用鎖的線程,直接拋出異常
 if (Thread.currentThread() != getExclusiveOwnerThread())
  throw new IllegalMonitorStateException();
 boolean free = false;
 // 如果 c 爲0 ,代表鎖完全釋放了,如果不爲0,代表鎖之前重入了,一次沒釋放掉,等待下次再次執行時,再次判斷
 if (c == 0) {
  // 釋放鎖標誌爲 true,代表完全釋放了
  free = true;
  // 將佔用互斥鎖的標識置爲 null
  setExclusiveOwnerThread(null);
 }
 // 設置 state 狀態
 setState(c);
 return free;
}

我們的例子中線程 1 佔用鎖資源,線程 1 釋放鎖之後,state爲 0。進入if操作,將釋放標誌更新爲true,將FIFO隊列的exclusiveOwnerThread標誌置爲null

查看 unparkSuccessor() 方法

用於喚醒AQS中被掛起的線程。

// 注意當前的 node 節點是 head 節點
private void unparkSuccessor(Node node) {
 //獲取 head 的狀態
 int ws = node.waitStatus;
 if (ws < 0)
  // CAS 將 node 的 ws 設置爲0,代表當前 node 接下來會捨棄
  compareAndSetWaitStatus(node, ws, 0);

 // 獲取頭節點的下一個節點
 Node s = node.next;
 // 如果下一個節點爲null 或者 下一個節點爲失效節點,需要找到離 head 最近的有效node
 if (s == null || s.waitStatus > 0) {
  s = null;
  // 從尾節點開始往前找不等於null且不是node的節點
  for (Node t = tail; t != null && t != node; t = t.prev)
   // 如果該節點有效,則將s節點指向t節點
   if (t.waitStatus <= 0)
    s = t;
 }
  // 找到最近的node後,直接喚醒
 if (s != null)
  LockSupport.unpark(s.thread);
}

問題解析:爲什麼要從尾結點往前查找呢?

因爲在addWaiter方法中是先給prev指針賦值,最後纔將上一個節點的next指針賦值,爲了避免防止丟失節點或者跳過節點,必須從後往前找。

我們舉例中head節點的狀態爲-1,通過CAS的方式將head節點的waitStatus設置爲 0。

我們的頭結點的後繼節點是線程 2 所在的節點,不爲null,所以這邊會執行unpark操作,從上邊的acquireQueued()內的parkAndCheckInterrupt()方法繼續執行。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    //返回目標線程是否中斷的布爾值:中斷返回true,不中斷返回false,且返回後會重置中斷狀態爲未中斷
    return Thread.interrupted();
}

因爲線程 2 未中斷,所以返回false。繼續執行acquireQueued()中的死循環

for (;;) {
    // 獲取當前節點的上一個節點
    final Node p = node.predecessor();
    //p爲頭節點,嘗試獲取鎖操作
    if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null;
        // 將獲取鎖失敗標識置爲false
        failed = false;
        // 獲取到鎖資源,不會被中斷
        return interrupted;
    }
    // p 不是 head 或者 沒拿到鎖資源,
    if (shouldParkAfterFailedAcquire(p, node) &&
        // 基於 Unsafe 的 park方法,掛起線程
        parkAndCheckInterrupt())
        interrupted = true;
}

此時 p 是頭節點,且能獲取鎖成功,將exclusiveOwnerThread設置爲線程 2,即線程 2 獲取鎖資源。

node節點設置爲head節點,並將node節點的prethread設置爲null。因爲拿到鎖資源了,node節點就不需要排隊了。

將頭節點 p 的next置爲null,此時 p 節點就不在隊列中存在了,可以幫助GC回收 (可達性分析)。failed設置爲false,表明獲取鎖成功;interruptedfalse,則線程不會中斷。

爲什麼被喚醒的線程要調用 Thread.interrupted() 清除中斷標記

從上邊的方法可以看出,當parkAndCheckInterrupt()方法返回true時,即Thread.interrupted()方法返回了true,也就是該線程被中斷了。爲了讓被喚醒的線程繼續執行後續獲取鎖的操作,就需要讓中斷的線程像沒有被中斷過一樣繼續往下執行,所以在返回中斷標記的同時要清除中斷標記,將其設置爲false

清除中斷標記之後不代表該線程不需要中斷了,所以在parkAndCheckInterrupt()方法返回true時,要自己設置一箇中斷標誌interrupted = true,爲的就是當獲取到鎖資源執行完相關的操作之後進行中斷補償,故而需要執行selfInterrupt()方法中斷線程。

以上就是我們加鎖解鎖的圖解過程了。最後我們再來說一下公平鎖和非公平鎖的區別。

區別

前邊已經說過了,似乎非公平鎖包含了公平鎖的全部操作。打開公平鎖的代碼,我們發現accquire()方法中只有該方法的實現有點區別。

hasQueuedPredecessors()返回false時纔會嘗試獲取鎖資源。該方法代碼實現如下

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

總結

以上就是我們的全部內容了,我們在最後再做個總結:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/3tqBo47GtG3ljdrig2b8AA