有圖解有案例,我終於把 Condition 的原理講透徹了

哈嘍大家好,我是阿 Q!

20 張圖圖解 ReentrantLock 加鎖解鎖原理文章一發,便引發了大家激烈的討論,更有小夥伴前來彈窗:平時加解鎖都是直接使用Synchronized關鍵字來實現的,簡單好用,爲啥還要引用ReentrantLock呢?

爲了解決小夥伴的疑問,我們來對兩者做個簡單的比較吧:

相同點

兩者都是 “可重入鎖”,即當前線程獲取到鎖對象之後,如果想繼續獲取鎖對象還是可以繼續獲取的,只不過鎖對象的計數器進行“+1” 操作就可以了。

不同點

  1. ReentrantLock是基於API實現的,Synchronized是依賴於JVM實現的;

  2. ReentrantLock可以響應中斷,Synchronized是不可以的;

  3. ReentrantLock可以指定是公平鎖還是非公平鎖,而Synchronized只能是非公平鎖;

  4. ReentrantLocklock是同步非阻塞,採用的是樂觀併發策略,Synchronized是同步阻塞的,使用的是悲觀併發策略;

  5. ReentrantLock藉助Condition可以實現多路選擇通知,Synchronized通過wait()notify()/notifyAll()方法可以實現等待 / 通知機制(單路通知);

綜上所述,ReentrantLock還是有區別於Synchronized的使用場景的,今天我們就來聊一聊它的多路選擇通知功能。

實戰

沒有實戰的 “紙上談兵” 都是扯淡,今天我們反其道而行,先拋出實戰Demo

場景描述

加油站爲了吸引更多的車主前來加油,在加油站投放了自動洗車機來爲加油的汽車提供免費洗車服務。我們規定汽車必須按照 “加油 -> 洗車 ->駛離”的流程來加油,等前一輛汽車駛離之後才允許下一輛車進來加油。

代碼實現

首先創建鎖對象並生成三個Condition

/**
 * 控制線程喚醒的標誌
 */
private int flag = 1;

/**
 * 創建鎖對象
 */
private Lock lock = new ReentrantLock();

/**
 * 等待隊列
 * c1對應加油
 * c2對應洗車
 * c3對應開車
 */
Condition c1 = lock.newCondition();
Condition c2 =  lock.newCondition();
Condition c3 =  lock.newCondition();

然後聲明加油、清洗、駛離的方法,並規定加完油之後去洗車並駛離加油站

/**
 * 汽車加油
 */
public void fuelUp(int num) {
 lock.lock();
 try {
  while (flag!=1){
   c1.await();
  }
  System.out.println("第"+num+"輛車開始加油");
  flag = 2;
  c2.signal();
 } catch (InterruptedException e) {
  e.printStackTrace();
 } finally {
  lock.unlock();
 }

}

/**
 * 汽車清洗
 */
public void carWash(int num) {
 lock.lock();
 try {
  while (flag!=2){
   c2.await();
  }
  System.out.println("第"+num+"輛車開始清洗");
  flag = 3;
  c3.signal();
 } catch (InterruptedException e) {
  e.printStackTrace();
 } finally {
  lock.unlock();
 }
}

/**
 * 駛離
 */
public void drive(int num) {
 lock.lock();
 try {
  while (flag!=3){
   c3.await();
  }
  System.out.println("第"+num+"輛車已經駛離加油站");
  flag = 1;
  c1.signal();
 } catch (InterruptedException e) {
  e.printStackTrace();
 } finally {
  lock.unlock();
 }
}

其中await爲等待方法,signal爲喚醒方法。

最後我們來定義main方法,模擬一下 3 輛車同時到達加油站的場景

public static void main(String[] args) {
 CarOperation carOperation = new CarOperation();
 //汽車加油
 new Thread(()->{
  for (int i = 1; i < 4; i++) {
   carOperation.fuelUp(i);
  }
 },"fuelUp").start();

 //汽車清洗
 new Thread(()->{
  for (int i = 1; i < 4; i++) {
   carOperation.carWash(i);
  }
 },"carRepair").start();

 //駛離
 new Thread(()->{
  for (int i = 1; i < 4; i++) {
   carOperation.drive(i);
  }
 },"drive").start();
}

使用是不是很絲滑?爲了加深大家對Condition的理解,接下來我們用圖解的方式分析一波Condition的原理~

圖解

大家都看到了,上邊的案例都是圍繞Condition來操作的,那什麼是Condition呢?Condition是一個接口,裏邊定義了線程等待和喚醒的方法。

代碼中調用的lock.newCondition()實際調用的是Sync類中的newCondition方法,而ConditionObject就是Condition的實現類。

final ConditionObject newCondition() {
    return new ConditionObject();
}

我們發現它處於AQS的內部,沒法直接實例化,所以需要配合ReentrantLock來使用。

ConditionObject

ConditionObject內部維護了一個基於NodeFIFO單向隊列,我們把它稱爲等待隊列firstWaiter指向首節點,lastWaiter指向尾節點,Node中的nextWaiter指向隊列中的下一個元素,並且等待隊列中節點的waitStatus都是 - 2。

瞭解了ConditionObject的數據結構之後,我們就從源碼角度來圖解一下ReentrantLock的等待 / 喚醒機制。

await

首先找到AQS類中await的源碼

public final void await() throws InterruptedException {
 if (Thread.interrupted())
  throw new InterruptedException();
 //將當前線程封裝成node加入等待隊列尾部
 Node node = addConditionWaiter();
 int savedState = fullyRelease(node);
 int interruptMode = 0;
    //檢測此節點的線程是否在同步隊上,如果不在,則說明該線程還不具備競爭鎖的資格,則繼續等待直到檢測到此節點在同步隊列上
 while (!isOnSyncQueue(node)) {
        //當node處於等待隊列時,掛起當前線程。
  LockSupport.park(this);
        //如果發生了中斷,則跳出循環,結束等待
  if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
   break;
 }
    //被喚醒後該節點一定會在AQS隊列上,
    //之前分析過acquireQueued方法獲取不到鎖會繼續阻塞
    //獲取到了鎖,中斷過返回true,未中斷過返回false
    //獲取到鎖存在中斷並且不是中斷喚醒的線程將中斷模式設置爲重新中斷
 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  interruptMode = REINTERRUPT;
 if (node.nextWaiter != null) // clean up if cancelled
        //清除條件隊列中所有狀態不爲 CONDITION 的結點
  unlinkCancelledWaiters();
 if (interruptMode != 0)
  reportInterruptAfterWait(interruptMode);
}

如果線程中斷,清除中斷標記並拋出異常。

查看 addConditionWaiter

該方法的作用是將當前線程封裝成node加入等待隊列尾部

private Node addConditionWaiter() {
 Node t = lastWaiter;
 if (t != null && t.waitStatus != Node.CONDITION) {
  //將不處於等待狀態的結點從等待隊列中移除
  unlinkCancelledWaiters();
  t = lastWaiter;
 }
 Node node = new Node(Thread.currentThread(), Node.CONDITION);
 //尾節點爲空
 if (t == null)
        //將首節點指向node
  firstWaiter = node;
 else
  //將尾節點的nextWaiter指向node節點
  t.nextWaiter = node;
 //尾節點指向node
 lastWaiter = node;
 return node;
}

首先將 t 指向尾節點,如果尾節點不爲空並且它的waitStatus!=-2,則將不處於等待狀態的結點從等待隊列中移除,並且將 t 指向新的尾節點。

將當前線程封裝成waitStatus爲 - 2 的節點追加到等待隊列尾部。

如果尾節點爲空,則隊列爲空,將首尾節點都指向當前節點。

如果尾節點不爲空,證明隊列中有其他節點,則將當前尾節點的nextWaiter指向當前節點,將當前節點置爲尾節點。

接着我們來查看下unlinkCancelledWaiters()方法——將不處於等待狀態的結點從等待隊列中移除。

private void unlinkCancelledWaiters() {
 Node t = firstWaiter;
 //trail是t的前驅結點
 Node trail = null;
 while (t != null) {
  //next爲t的後繼結點
  Node next = t.nextWaiter;
  //如果t節點的waitStatus不爲-2即失效節點
  if (t.waitStatus != Node.CONDITION) {
   t.nextWaiter = null;
   //如果t的前驅節點爲空,則將首節點指向next
   if (trail == null)
    firstWaiter = next;
   else
    //t的前驅結點不爲空,將前驅節點的後繼指針指向next
    trail.nextWaiter = next;
   //如果next爲null,則將尾節點指向t的前驅節點
   if (next == null)
    lastWaiter = trail;
  }
  else
   trail = t;
  t = next;
 }
}

t 爲當前節點,trail爲 t 的前驅節點,next爲 t 的後繼節點。

while方法會從首節點順着等待隊列往後尋找waitStatus!=-2的節點,將當前節點的nextWaiter置爲空。

如果當前節點的前驅節點爲空,代表當前節點爲首節點,則將 next 設置爲首節點;

如果不爲空,則將前驅節點的nextWaiter指向後繼節點。

如果後繼節點爲空,則直接將前驅節點設置爲尾節點。

查看 fullyRelease

從名字也差不多能明白該方法的作用是徹底釋放鎖資源。

final int fullyRelease(Node node) {
 //釋放鎖失敗爲true,釋放鎖成功爲false
 boolean failed = true;
 try {
     //獲取當前鎖的state
  int savedState = getState();
  //釋放鎖成功的話
  if (release(savedState)) {
   failed = false;
   return savedState;
  } else {
   throw new IllegalMonitorStateException();
  }
 } finally {
  if (failed)
   //釋放鎖失敗的話將節點狀態置爲取消
   node.waitStatus = Node.CANCELLED;
 }
}

最重要的就是release方法,而我們上文中已經講過了,release執行成功的話,當前線程已經釋放了鎖資源。

查看 isOnSyncQueue

判斷當前線程所在的Node是否在同步隊列中(同步隊列即AQS隊列)。在這裏有必要給大家看一下同步隊列與等待隊列的關係圖了。

final boolean isOnSyncQueue(Node node) {
 if (node.waitStatus == Node.CONDITION || node.prev == null)
  return false;
 if (node.next != null) 
  return true;
 //node節點的next爲null
 return findNodeFromTail(node);
}

如果當前節點的waitStatus=-2,說明它在等待隊列中,返回false;如果當前節點有前驅節點,則證明它在AQS隊列中,但是前驅節點爲空,說明它是頭節點,而頭節點是不參與鎖競爭的,也返回false

如果當前節點既不在等待隊列中,又不是AQS中的頭結點且存在next節點,說明它存在於AQS中,直接返回true

接着往下看,如果當前節點的next爲空,該節點可能是tail節點,也可能是該節點的next還未賦值,所以需要從後往前遍歷節點。

private boolean findNodeFromTail(Node node) {
 Node t = tail;
 for (;;) {
  //先用尾節點來判斷,然後用隊列中的節點依次來判斷
  if (t == node)
   return true;
  //節點爲空,說明找到頭也不在AQS隊列中,返回false
  if (t == null)
   return false;
  t = t.prev;
 }
}

在遍歷過程中,如果隊列中有節點等於當前節點,返回true;如果找到頭節點也沒找到,則返回false

我們回到awaitwhile循環處,如果返回false,說明該節點不在同步隊列中,進入循環中掛起該線程。

知識點補充

阿 Q 的理解是線程被喚醒會存在兩種情況:一種是調用signal/signalAll喚醒線程;一種是通過線程中斷信號,喚醒線程並拋出中斷異常。

查看 checkInterruptWhileWaiting(難點)

該方法的作用是判斷當前線程是否發生過中斷,如果未發生中斷返回0,如果發生了中斷返回1或者-1

private int checkInterruptWhileWaiting(Node node) {
 return Thread.interrupted() ?
  (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  0;
}

我們來看看transferAfterCancelledWait方法是如果區分1-1

final boolean transferAfterCancelledWait(Node node) {
 //cas嘗試將node的waitStatus設置爲0
 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  //將node節點由等待隊列加入AQS隊列
  enq(node);
  return true;
 }
 //cas失敗後,看看隊列是不是已經在AQS隊列中,如果不在,則通過yield方法給其它線程讓路
 while (!isOnSyncQueue(node))
  Thread.yield();
    //如果已經在AQS隊列中,則返回false
 return false;
}

那什麼情況下cas操作會成功?什麼情況下又會失敗呢?

當線程接收到中斷信號時會被喚醒,此時nodewaitStatus=-2,所以會cas成功,同時會將node從等待隊列轉移到AQS隊列中。

當線程先通過signal喚醒後接收到中斷信號,由於signal已經將nodewaitStatus設置爲 - 2 了,所以此時會cas失敗。

舉例

大家可以用下邊的例子在transferAfterCancelledWait中打斷點測試一下,相信就明瞭了。

public class CarOperation {
 //創建一個重入鎖
    private Lock lock = new ReentrantLock();

    //聲明等待隊列
    Condition c1 = lock.newCondition();
 
    /*
     * 等待操作
     */
 public void await() {
        lock.lock();
        try {
            System.out.println("開始阻塞");
            c1.await();
            System.out.println("喚醒之後繼續執行");
        } catch (InterruptedException e) {
            System.out.println("喚醒但是拋出異常了");
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

     /*
     * 喚醒操作
     */
    public void signal() {
        lock.lock();
        try {
            c1.signal();
            System.out.println("喚醒了。。。。。。。。。。。。。。");
        } finally {
            lock.unlock();
        }
    }
}

中斷測試

public static void main(String[] args) {
 CarOperation carOperation = new CarOperation();
 Thread t1 = new Thread(()->{
        //等待,掛起線程
  carOperation.await();
 });
 t1.start();
 try {
        //模擬其它線程搶佔資源執行過程
  Thread.sleep(10000);
        //發出線程中斷信號
  t1.interrupt();
 } catch (InterruptedException exception) {
  exception.printStackTrace();
 }
}

先喚醒後中斷測試

public static void main(String[] args) {
    CarOperation carOperation = new CarOperation();
    Thread t1 = new Thread(()->{
        carOperation.await();
    });
    t1.start();
    try {
        Thread.sleep(10000);
        //先喚醒線程
        carOperation.signal();
        //後中斷
        t1.interrupt();
    } catch (InterruptedException exception) {
        exception.printStackTrace();
    }
}

查看 reportInterruptAfterWait

//要麼拋出異常,要麼重新中斷。
private void reportInterruptAfterWait(int interruptMode)
 throws InterruptedException {
 if (interruptMode == THROW_IE)
  throw new InterruptedException();
 else if (interruptMode == REINTERRUPT)
  selfInterrupt();
}

以上就是await的全部內容了,我們先來做個簡單的總結。

總結

如果你哪個地方存在疑問可以小窗阿 Q!

signal

接下來我們再來捋一捋喚醒的過程

public final void signal() {
    //當前線程是否是鎖的持有者,不是的話拋出異常
 if (!isHeldExclusively())
  throw new IllegalMonitorStateException();
 Node first = firstWaiter;
 if (first != null)
        //具體的喚醒過程
  doSignal(first);
}

private void doSignal(Node first) {
 do {
        //獲取頭結點的下一個節點並賦值爲頭結點
  if ( (firstWaiter = first.nextWaiter) == null)
   lastWaiter = null;
        //將之前的頭節點置爲空
  first.nextWaiter = null;
        //將頭結點從等待隊列轉移到AQS隊列中,如果轉移失敗,則尋找下一個節點繼續轉移
 } while (!transferForSignal(first) &&
    (first = firstWaiter) != null);
}

首先將等待隊列的頭結點從等待隊列中取出來

然後執行transferForSignal方法進行轉移

final boolean transferForSignal(Node node) {
 //將node的waitStatus設置爲0,如果設置失敗說明node的節點已經不在等待隊列中了,返回false
 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  return false;
 //將node從等待隊列轉移到AQS隊列,並返回node的前驅節點
 Node p = enq(node);
 //獲取node前驅節點的狀態
 int ws = p.waitStatus;
 //如果該節點是取消狀態或者將其設置爲喚醒狀態失敗(說明本身已經是喚醒狀態了),所以可以去喚醒node節點所在的線程
 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  //喚醒當前節點
  LockSupport.unpark(node.thread);
 return true;
}

將等待隊列的頭結點從等待隊列轉移到AQS隊列中,如果轉移失敗,說明該節點已被取消,直接返回false,然後將first指向新的頭結點重新進行轉移。如果轉移成功則根據前驅節點的狀態判斷是否直接喚醒當前線程。

怎麼樣?喚醒的邏輯是不是超級簡單?我們也按例做個簡單的總結。

總結

從等待隊列的隊首開始,嘗試對隊首節點執行喚醒操作,如果節點已經被取消了,就嘗試喚醒下一個節點。

對首節點執行喚醒操作時,首先將節點轉移到同步隊列,如果前驅節點的狀態爲取消狀態或設置前驅節點的狀態爲喚醒狀態失敗,那麼就立即喚醒當前節點對應的線程,否則不執行喚醒操作。

以上就是今天的全部內容了,我們下期再見。感興趣的可以關注下公衆號,也可以來技術羣討論問題呦!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/UeeZ3gVLo8Ze1sCwJ9dn9Q