Java 是如何實現線程間通信的?
世界以痛吻我,要我報之以歌 —— 泰戈爾《飛鳥集》
雖然通常每個子線程只需要完成自己的任務,但是有時我們希望多個線程一起工作來完成一個任務,這就涉及到線程間通信。
關於線程間通信本文涉及到的方法和類包括:thread.join()、object.wait()、object.notify()、CountdownLatch、CyclicBarrier、FutureTask、Callable。
接下來將用幾個例子來介紹如何在 Java 中實現線程間通信:
-
如何讓兩個線程依次執行,即一個線程等待另一個線程執行完成後再執行?
-
如何讓兩個線程以指定的方式有序相交執行?
-
有四個線程:A、B、C、D,如何實現 D 在 A、B、C 都同步執行完畢後執行?
-
三個運動員分開準備,然後在每個人準備好後同時開始跑步。
-
子線程完成任務後,將結果返回給主線程。
1. 如何讓兩個線程依次執行?
假設有兩個線程:A 和 B,這兩個線程都可以按照順序打印數字,代碼如下:
public class Test01 {
public static void main(String[] args) throws InterruptedException {
demo1();
}
public static void demo1() {
Thread a = new Thread(() -> {
printNumber("A");
});
Thread b = new Thread(() -> {
printNumber("B");
});
a.start();
b.start();
}
public static void printNumber(String threadName) {
int i = 0;
while (i++ < 3) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadName + " print: " + i);
}
}
}
得到的結果如下:
A print: 1
B print: 1
B print: 2
A print: 2
A print: 3
B print: 3
可以看到 A 和 B 同時打印數字,如果我們希望 B 在 A 執行完成之後開始執行,那麼可以使用 thread.join() 方法實現,代碼如下:
public static void demo2() {
Thread a = new Thread(() -> {
printNumber("A");
});
Thread b = new Thread(() -> {
System.out.println("B 等待 A 執行");
try {
a.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
printNumber("B");
});
a.start();
b.start();
}
得到的結果如下:
B 等待 A 執行
A print: 1
A print: 2
A print: 3
B print: 1
B print: 2
B print: 3
我們可以看到該 a.join() 方法會讓 B 等待 A 完成打印。
thread.join() 方法的作用就是阻塞當前線程,等待調用 join() 方法的線程執行完畢後再執行後面的代碼。
查看 join() 方法的源碼,內部是調用了 join(0) ,如下:
public final void join() throws InterruptedException {
join(0);
}
查看 join(0) 的源碼如下:
// 注意這裏使用了 sychronized 加鎖,鎖對象是線程的實例對象
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
// 調用 join(0) 執行下面的代碼
if (millis == 0) {
// 這裏使用 while 循環的目的是爲了避免虛假喚醒
// 如果當前線程存活則調用 wait(0), 0 表示永久等待,直到調用 notifyAll() 或者 notify() 方法
// 當線程結束的時候會調用 notifyAll() 方法
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
從源碼中可以看出 join(long millis) 方法是通過 wait(long timeout) (Object 提供的方法)方法實現的,調用 wait 方法之前,當前線程必須獲得對象的鎖,所以此 join 方法使用了 synchronized 加鎖,鎖對象是線程的實例對象。其中 wait(0) 方法會讓當前線程阻塞等待,直到另一個線程調用此對象的 notify() 或者 notifyAll() 方法纔會繼續執行。當調用 join 方法的線程結束的時候會調用 notifyAll() 方法,所以 join() 方法可以實現一個線程等待另一個調用 join() 的線程結束後再執行。
虛假喚醒:一個線程在沒有被通知、中斷、超時的情況下被喚醒;
虛假喚醒可能導致條件不成立的情況下執行代碼,破壞被鎖保護的約束關係;
爲什麼使用 while 循環來避免虛假喚醒:
在 if 塊中使用 wait 方法,是非常危險的,因爲一旦線程被喚醒,並得到鎖,就不會再判斷 if 條件而執行 if 語句塊外的代碼,所以建議凡是先要做條件判斷,再 wait 的地方,都使用 while 循環來做,循環會在等待之前和之後對條件進行測試。
2. 如何讓兩個線程按照指定的方式有序相交?
如果現在我們希望 B 線程在 A 線程打印 1 後立即打印 1,2,3,然後 A 線程繼續打印 2,3,那麼我們需要更細粒度的鎖來控制執行順序。
在這裏,我們可以利用 object.wait() 和 object.notify() 方法,代碼如下:
public static void demo3() {
Object lock = new Object();
Thread A = new Thread(() -> {
synchronized (lock) {
System.out.println("A 1");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A 2");
System.out.println("A 3");
}
});
Thread B = new Thread(() -> {
synchronized (lock) {
System.out.println("B 1");
System.out.println("B 2");
System.out.println("B 3");
lock.notify();
}
});
A.start();
B.start();
}
得到的結果如下:
A 1
B 1
B 2
B 3
A 2
A 3
上述代碼的執行流程如下:
-
首先我們創建一個由 A 和 B 共享的對象鎖:lock = new Object();
-
當 A 拿到鎖時,先打印 1,然後調用 lock.wait() 方法進入等待狀態,然後交出鎖的控制權;
-
B 不會被執行,直到 A 調用該 lock.wait() 方法釋放控制權並且 B 獲得鎖;
-
B 拿到鎖後打印 1,2,3,然後調用 lock.notify() 方法喚醒正在等待的 A;
-
A 喚醒後繼續打印剩餘的 2,3。
爲了便於理解,我將上面的代碼添加了日誌,代碼如下:
public static void demo3() {
Object lock = new Object();
Thread A = new Thread(() -> {
System.out.println("INFO:A 等待獲取鎖");
synchronized (lock) {
System.out.println("INFO:A 獲取到鎖");
System.out.println("A 1");
try {
System.out.println("INFO:A 進入 waiting 狀態,放棄鎖的控制權");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("INFO:A 被 B 喚醒繼續執行");
System.out.println("A 2");
System.out.println("A 3");
}
});
Thread B = new Thread(() -> {
System.out.println("INFO:B 等待獲取鎖");
synchronized (lock) {
System.out.println("INFO:B 獲取到鎖");
System.out.println("B 1");
System.out.println("B 2");
System.out.println("B 3");
System.out.println("INFO:B 執行結束,調用 notify 方法喚醒 A");
lock.notify();
}
});
A.start();
B.start();
}
得到的結果如下:
INFO:A 等待獲取鎖
INFO:A 獲取到鎖
A 1
INFO:A 進入 waiting 狀態,放棄鎖的控制權
INFO:B 等待獲取鎖
INFO:B 獲取到鎖
B 1
B 2
B 3
INFO:B 執行結束,調用 notify 方法喚醒 A
INFO:A 被 B 喚醒繼續執行
A 2
A 3
3. 線程 D 在 A、B、C 都同步執行完畢後執行
thread.join() 前面介紹的方法允許一個線程在等待另一個線程完成運行後繼續執行。但是如果我們將 A、B、C 依次加入到 D 線程中,就會讓 A、B、C 依次執行,而我們希望它們三個同步運行。
我們要實現的目標是:A、B、C 三個線程可以同時開始運行,各自獨立運行完成後通知 D;D 不會開始運行,直到 A、B 和 C 都運行完畢。所以我們 CountdownLatch 用來實現這種類型的通信。它的基本用法是:
-
創建一個計數器,並設置一個初始值, CountdownLatch countDownLatch = new CountDownLatch(3);
-
調用 countDownLatch.await() 進入等待狀態,直到計數值變爲 0;
-
在其他線程調用 countDownLatch.countDown(),該方法會將計數值減一;
-
當計數器的值變爲 0 時,countDownLatch.await() 等待線程中的方法會繼續執行下面的代碼。
實現代碼如下:
public static void runDAfterABC() {
int count = 3;
CountDownLatch countDownLatch = new CountDownLatch(count);
new Thread(() -> {
System.out.println("INFO: D 等待 A B C 運行完成");
try {
countDownLatch.await();
System.out.println("INFO: A B C 運行完成,D 開始運行");
System.out.println("D is working");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
for (char threadName = 'A'; threadName <= 'C' ; threadName++) {
final String name = String.valueOf(threadName);
new Thread(() -> {
System.out.println(name + " is working");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " finished");
countDownLatch.countDown();
}).start();
}
}
得到的結果如下:
INFO: D 等待 A B C 運行完成
A is working
B is working
C is working
C finished
B finished
A finished
INFO: A B C 運行完成,D 開始運行
D is working
其實 CountDownLatch 它本身就是一個倒數計數器,我們把初始的 count 值設置爲 3。D 運行的時候,首先調用該 countDownLatch.await() 方法檢查計數器的值是否爲 0,如果不是 0 則保持等待狀態. A、B、C 運行完畢後,分別使用 countDownLatch.countDown() 方法將倒數計數器減 1。計數器將減爲 0,然後通知 await() 方法結束等待,D 開始繼續執行。
因此,CountDownLatch 適用於一個線程需要等待多個線程的情況。
4. 三個運動員分開準備同時開跑
這一次,A、B、C 這三個線程都需要分別準備,等三個線程都準備好後開始同時運行,我們應該如何做到這一點?
CountDownLatch 可以用來計數,但完成計數的時候,只有一個線程的一個 await() 方法會得到響應,所以多線程不能在同一時間被觸發。爲了達到線程相互等待的效果,我們可以使用該 CyclicBarrier,其基本用法爲:
-
首先創建一個公共對象 CyclicBarrier,並設置同時等待的線程數,CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
-
這些線程同時開始準備,準備好後,需要等待別人準備好,所以調用 cyclicBarrier.await() 方法等待別人;
-
當指定的需要同時等待的線程都調用了該 cyclicBarrier.await() 方法時,意味着這些線程準備好了,那麼這些線程就會開始同時繼續執行。
想象一下有三個跑步者需要同時開始跑步,所以他們需要等待其他人都準備好,實現代碼如下:
public static void runABCWhenAllReady() {
int count = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(count);
Random random = new Random();
for (char threadName = 'A'; threadName <= 'C' ; threadName++) {
final String name = String.valueOf(threadName);
new Thread(() -> {
int prepareTime = random.nextInt(10000);
System.out.println(name + " 準備時間:" + prepareTime);
try {
Thread.sleep(prepareTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " 準備好了,等待其他人");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " 開始跑步");
}).start();
}
}
得到結果如下:
A 準備時間:1085
B 準備時間:7729
C 準備時間:8444
A 準備好了,等待其他人
B 準備好了,等待其他人
C 準備好了,等待其他人
C 開始跑步
A 開始跑步
B 開始跑步
CyclicBarrier 的作用就是等待多個線程同時執行。
5. 子線程將結果返回給主線程
在實際開發中,往往我們需要創建子線程來做一些耗時的任務,然後將執行結果傳回主線程。那麼如何在 Java 中實現呢?
一般在創建線程的時候,我們會把 Runnable 對象傳遞給 Thread 執行,Runable 的源碼如下:
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
可以看到 Runable 是一個函數式接口,該接口中的 run 方法沒有返回值,那麼如果要返回結果,可以使用另一個類似的接口 Callable。
函數式接口:只有一個方法的接口
Callable 接口的源碼如下:
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
可以看出,最大的區別 Callable 在於它返回的是泛型。
那麼接下來的問題是,如何將子線程的結果傳回去呢?Java 有一個類,FutureTask,它可以與 一起工作 Callable,但請注意,get 用於獲取結果的方法會阻塞主線程。FutureTask 本質上還是一個 Runnable,所以可以直接傳到 Thread 中。
public static void getResultInWorker() {
Callable<Integer> callable = () -> {
System.out.println("子任務開始執行");
Thread.sleep(1000);
int result = 0;
for (int i = 0; i <= 100; i++) {
result += i;
}
System.out.println("子任務執行完成並返回結果");
return result;
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
try {
System.out.println("開始執行 futureTask.get()");
Integer result = futureTask.get();
System.out.println("執行的結果:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
得到的結果如下:
開始執行 futureTask.get()
子任務開始執行
子任務執行完成並返回結果
執行的結果:5050
可以看出在主線程調用 futureTask.get() 方法時阻塞了主線程;然後 Callable 開始在內部執行並返回操作的結果;然後 futureTask.get() 得到結果,主線程恢復運行。
在這裏我們可以瞭解到,FutureTask 和 Callable 可以直接在主線程中獲取子線程的結果,但是它們會阻塞主線程。當然,如果你不希望阻塞主線程,可以考慮使用 ExecutorService 把 FutureTask 到線程池來管理執行。
來源:
https://juejin.cn/post/7004401589385609246
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/NDOCKAGV4l6ZjyBoeiOHAg