多線程間的 5 種通信方式

問題

有兩個線程,A 線程向一個集合裏面依次添加元素 “abc” 字符串,一共添加十次,當添加到第五次的時候,希望 B 線程能夠收到 A 線程的通知,然後 B 線程執行相關的業務操作。線程間通信的模型有兩種:共享內存和消息傳遞,以下方式都是基本這兩種模型來實現的。

一、使用 volatile 關鍵字

基於 volatile 關鍵字來實現線程間相互通信是使用共享內存的思想。大致意思就是多個線程同時監聽一個變量,當這個變量發生變化的時候 ,線程能夠感知並執行相應的業務。這也是最簡單的一種實現方式

public class TestSync {
    //定義共享變量來實現通信,它需要volatile修飾,否則線程不能及時感知
    static volatile boolean notice = false;

    public static void main(String[] args) {
        List<String>  list = new ArrayList<>();
        //線程A
        Thread threadA = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                list.add("abc");
                System.out.println("線程A添加元素,此時list的size爲:" + list.size());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (list.size() == 5)
                    notice = true;
            }
        });
        //線程B
        Thread threadB = new Thread(() -> {
            while (true) {
                if (notice) {
                    System.out.println("線程B收到通知,開始執行自己的業務...");
                    break;
                }
            }
        });
        //需要先啓動線程B
        threadB.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 再啓動線程A
        threadA.start();
    }
}

二、使用 Object 類的 wait()/notify()

Object 類提供了線程間通信的方法:wait()notify()notifyAll(),它們是多線程通信的基礎,而這種實現方式的思想自然是線程間通信。

注意:wait/notify 必須配合 synchronized 使用,wait 方法釋放鎖,notify 方法不釋放鎖。wait 是指在一個已經進入了同步鎖的線程內,讓自己暫時讓出同步鎖,以便其他正在等待此鎖的線程可以得到同步鎖並運行,只有其他線程調用了notify(),notify 並不釋放鎖,只是告訴調用過wait()的線程可以去參與獲得鎖的競爭了,但不是馬上得到鎖,因爲鎖還在別人手裏,別人還沒釋放,調用 wait() 的一個或多個線程就會解除 wait 狀態,重新參與競爭對象鎖,程序如果可以再次得到鎖,就可以繼續向下運行。

public class TestSync {
    public static void main(String[] args) {
        //定義一個鎖對象
        Object lock = new Object();
        List<String>  list = new ArrayList<>();
        // 線程A
        Thread threadA = new Thread(() -> {
            synchronized (lock) {
                for (int i = 1; i <= 10; i++) {
                    list.add("abc");
                    System.out.println("線程A添加元素,此時list的size爲:" + list.size());
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (list.size() == 5)
                        lock.notify();//喚醒B線程
                }
            }
        });
        //線程B
        Thread threadB = new Thread(() -> {
            while (true) {
                synchronized (lock) {
                    if (list.size() != 5) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("線程B收到通知,開始執行自己的業務...");
                }
            }
        });
        //需要先啓動線程B
        threadB.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //再啓動線程A
        threadA.start();
    }
}

由輸出結果,在線程 A 發出 notify() 喚醒通知之後,依然是走完了自己線程的業務之後,線程 B 纔開始執行,正好說明 notify() 不釋放鎖,而 wait() 釋放鎖。

三、使用 JUC 工具類 CountDownLatch

jdk1.5 之後在java.util.concurrent包下提供了很多併發編程相關的工具類,簡化了併發編程代碼的書寫,CountDownLatch 基於 AQS 框架,相當於也是維護了一個線程間共享變量 state。

public class TestSync {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        List<String>  list = new ArrayList<>();
        //線程A
        Thread threadA = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                list.add("abc");
                System.out.println("線程A添加元素,此時list的size爲:" + list.size());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (list.size() == 5)
                    countDownLatch.countDown();
            }
        });
        //線程B
        Thread threadB = new Thread(() -> {
            while (true) {
                if (list.size() != 5) {
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("線程B收到通知,開始執行自己的業務...");
                break;
            }
        });
        //需要先啓動線程B
        threadB.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //再啓動線程A
        threadA.start();
    }
}

四、使用 ReentrantLock 結合 Condition

public class TestSync {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        List<String> list = new ArrayList<>();
        //線程A
        Thread threadA = new Thread(() -> {
            lock.lock();
            for (int i = 1; i <= 10; i++) {
                list.add("abc");
                System.out.println("線程A添加元素,此時list的size爲:" + list.size());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (list.size() == 5)
                    condition.signal();
            }
            lock.unlock();
        });
        //線程B
        Thread threadB = new Thread(() -> {
            lock.lock();
            if (list.size() != 5) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("線程B收到通知,開始執行自己的業務...");
            lock.unlock();
        });
        threadB.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadA.start();
    }
}

這種方式使用起來並不是很好,代碼編寫複雜,而且線程 B 在被 A 喚醒之後由於沒有獲取鎖還是不能立即執行,也就是說,A 在喚醒操作之後,並不釋放鎖。這種方法跟 Object 的 wait()/notify() 一樣。

五、基本 LockSupport 實現線程間的阻塞和喚醒

LockSupport 是一種非常靈活的實現線程間阻塞和喚醒的工具,使用它不用關注是等待線程先進行還是喚醒線程先運行,但是得知道線程的名字。

public class TestSync {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        //線程B
        final Thread threadB = new Thread(() -> {
            if (list.size() != 5) {
                LockSupport.park();
            }
            System.out.println("線程B收到通知,開始執行自己的業務...");
        });
        //線程A
        Thread threadA = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                list.add("abc");
                System.out.println("線程A添加元素,此時list的size爲:" + list.size());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (list.size() == 5)
                    LockSupport.unpark(threadB);
            }
        });
        threadA.start();
        threadB.start();
    }
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/CX1Lqr4eq2arpo6MJpVNrg