一文徹底搞懂線程安全問題

前言

關於線程安全問題是一塊非常基礎的知識,但基礎不代表簡單,一個人的基本功能往往能決定他是否可以寫出高質量、高性能的代碼。關於什麼是 synchronized、Lock、volatile,相信大家都能道出一二,但概念都懂一用就懵,一不小心還能寫出一個死鎖出來。

本文將基於生產者消費者模式加一個個具體案例,循序漸進的講解線程安全問題的誕生背景以及解決方案,一文幫你抓住 synchronized 的應用場景,以及與 Lock 的區別。

  1. 線程安全問題的誕生背景以及解決方式

1.1 爲什麼線程間需要通信?

線程是 CPU 執行的基本單位,爲了提高 CPU 的使用率以及模擬多個應用程序同時運行的場景,便衍生出了多線程的概念。

在 JVM 架構下堆內存、方法區是可以被線程共享的,那爲什麼要這樣設計呢?

舉個例子簡要描述下:

現要做一個網絡請求,請求響應後渲染到手機界面。Android 爲了提升用戶體驗將 main 線程當作 UI 線程,只做界面渲染,耗時操作應交由到工作線程。如若在 UI 線程執行耗時操作可能會出現阻塞現象,最直觀的感受就是界面卡死。網絡請求屬於 IO 操作會出現阻塞想象,前面提到 UI 線程不允許出現阻塞現象,所以網絡請求必須扔到工作線程,但拿到數據包後怎麼傳遞給 UI 線程呢?最常規的做法就是回調接口,將 HTTP 數據包解析成本地模型,再通過接口將本地模型對應的堆內存地址值傳遞到 UI 線程。

如果您正在學習 Spring Boot,那麼推薦一個連載多年還在繼續更新的免費教程:http://blog.didispace.com/spring-boot-learning-2x/

工作線程將堆內存對象地址值交給 UI 線程這一過程,就是線程間通信,也是 JVM 將堆內存設置爲線程共享的原因,關於線程間通信用一句通俗易懂的話描述就是:"多個線程操作同一資源",這一資源位於堆內存或方法區

1.2 單生產單消費引發的安全問題

"多個線程操作同一資源",聽起來如此的簡單,殊不知一不小心便可能引發致命問題。喲,此話怎講呢?,不急,容我娓娓道來...

案例

現有一個車輛公司,主要經營四輪小汽車和兩輪自行車,工人負責生產,銷售員負責售賣。

以上案例如何通過應用程序來實現?思路如下:

定義一個車輛資源類,可以設置爲小汽車和自行車

public class Resource {
    //一輛車對應一個id
    private int id;
    //車名
    private String name;
    //車的輪子數
    private int wheelNumber;
    //標記(後面會用到)
    private boolean flag = false;
    ...
    忽略setter、getter
    ...
    @Override
    public String toString() {
        return "id=" + id + "---  + wheelNumber;
    }
}

定義一個工人線程任務,專門用來生產四輪小汽車和倆輪自行車,爲生產者

public class Input implements Runnable{
    private Resource r;
    public Input(Resource r){
        this.r = r;
    }
    public void run() {
        //無限生產車輛
        for(int i =0;;i++){
            if(i%2==0){
                r.setId(i);//設置車的id
                r.setName("小汽車");//設置車類型
                r.setWheelNumber(4);//設置車的輪子數
            }else{
                r.setId(i);//設置車的id
                r.setName("電動車");//設置車類型
                r.setWheelNumber(2);//設置車的輪子數
            }
        }
    }
}

定義一個銷售員線程任務,專門用來銷售車輛,爲消費者

public class Output implements Runnable{
    private Resource r;
    public Output(Resource r){
        this.r = r;
    }
    public void run() {
        //無限消費車輛
        for(;;){
            //消費車輛
            System.out.println(r.toString());
        }
    }
}

開始生產、消費

//資源對象,對應車輛
Resource r = new Resource();
//生產者runnable,對應工人
Input in = new Input(r);
//消費者runnable,對應銷售員
Output out = new Output(r);
Thread t1 = new Thread(in);
Thread t2 = new Thread(out);
//開啓生產者線程
t1.start();
//開啓消費者線程
t2.start();

打印結果:

...
id=51--- name=電動車--- wheelNumber=2
id=52--- name=小汽車--- wheelNumber=2
...

一切有條不紊的進行,老闆數着鈔票那叫一個開心。喫水不忘挖井人,正當老闆準備給員工發獎金時,出現了一個嚴重問題 編號爲 52 的小汽車少裝了倆輪子!!!得,獎金不僅沒了,還得連夜排查問題

導致原因:

tips:流程對應上面打印結果。下同

工人:"生產到一半你銷售員就拿去賣了,這鍋我不背"

解決方案:

導致原因其實就是生產者對 Resource 的一次操作還未結束,消費者強行介入了。此時可以引入 synchronized 關鍵字,使得生產者一次工作結束前消費者不得介入

更改後的代碼如下:

#Input
public void run() {
   //無限生產車輛
   for(int i =0;;i++){
       synchronized(r){
           if(i%2==0){
               r.setId(i);//設置車的id
               r.setName("小汽車");//設置車類型
               r.setWheelNumber(4);//設置車的輪子數
           }else{
               r.setId(i);//設置車的id
               r.setName("電動車");//設置車類型
               r.setWheelNumber(2);//設置車的輪子數
           }
       }
    }      
}
    
#Output
public void run() {
   for(;;){
       synchronized(r){
           //消費車輛
           System.out.println(r.toString());
       }
   }
}

生產者和消費者 for 循環中都加了一個 synchronized,對應的鎖是 r,修改後重新執行

...
id=79--- name=電動車--- wheelNumber=2
id=80--- name=小汽車--- wheelNumber=4
id=80--- name=小汽車--- wheelNumber=4
...

一切又恢復了正常。但又暴露出一個更嚴重的問題,編號爲 80 的小汽車被消費 (銷售) 了兩次

也既銷售員把一輛車賣給了兩個客戶,真乃商業奇才啊!!!

導致原因:
解決方案:

產生問題的原因就是消費者把資源消費後未處於等待狀態,而是繼續消費。此時可以引入 wait、notify 機制,使得銷售員售賣完一輛車後處於等待狀態,當工人重新生產一輛新車後再通知銷售員,銷售員接收到工人消息後再進行售賣。

更改後的代碼如下:

#Input
public void run() {
    //無限生產車輛
    for(int i =0;;i++){
         synchronized(r){
              //flag爲true的時候代表已經生產過,此時將當前線程wait,等待消費者消費
              if(r.isFlag()){
                  try {
                      r.wait();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
              if(i%2==0){
                  r.setId(i);//設置車的id
                  r.setName("小汽車");//設置車的型號
                  r.setWheel(4);//設置車的輪子數
              }else{
                  r.setId(i);//設置車的id
                  r.setName("電動車");//設置車的型號
                  r.setWheel(2);//設置車的輪子數
              }
              r.setFlag(true);
              //將線程池中的線程喚醒
              r.notify();
        }
    }
}
#Output
public void run() {
    //無限消費車輛
    for(;;){
        synchronized(r){
             //flag爲false,代表當前生產的車已經被消費掉,
             //進入wait狀態等待生產者生產
             if(!r.isFlag()){
                 try {
                     r.wait();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
             //消費車輛
             System.out.println(r.toString());
             r.setFlag(false);
             //將線程池中的線程喚醒
             r.notify();
        }
    }
}

打印結果:

...
id=129--- name=電動車--- wheelNumber=2
id=130--- name=小汽車--- wheelNumber=4
id=131--- name=電動車--- wheelNumber=2
...

這次真的沒問題了,工人和銷售員都如願以償的拿到了老闆發的獎金

注意點 1:

synchronized 括號內傳入的是一把鎖,可以是任意類型的對象,生產者消費者必須使用同一把鎖才能實現同步操作。這樣設計的目的是爲了更靈活使用同步代碼塊,否則整個進程那麼多 synchronized,鎖誰不鎖誰根本不明確

注意點 2:

wait、notify 其實是 object 的方法,它們只能在 synchronized 代碼塊內由鎖進行調用,否則就會拋異常。每一把鎖對應線程池的一塊區域,被 wait 的線程會被放入到鎖對應的線程池區域,並且釋放鎖。notify 會隨機喚醒鎖對應線程池區域的任意一個線程,線程被喚醒後會重新上鎖,注意是隨機喚醒任意一個線程

  1. 由死鎖問題看顯示鎖 Lock 的應用場景

2.1 何爲死鎖?

關於死鎖,顧名思義應該是鎖死了,它可以使線程處於假死狀態但又沒真死,卡在半道又無法被回收。

舉個例子:

class Deadlock1 implements Runnable{
    private Object lock1;
    private Object lock2;
    public Deadlock1(Object obj1,Object obj2){
        this.lock1 = obj1;
        this.lock2 = obj2;
    }
    public void run() {
        while(true){
            synchronized(lock1){
                System.out.println("Deadlock1----lock1");
                synchronized(lock2){
                    System.out.println("Deadlock1----lock2");
                }
            }
        }
    }
}
class Deadlock2 implements Runnable{
    private Object lock1;
    private Object lock2;
    public Deadlock2(Object obj1,Object obj2){
        this.lock1 = obj1;
        this.lock2 = obj2;
    }
    public void run() {
        while(true){
            synchronized(lock2){
                System.out.println("Deadlock2----lock2");
                synchronized(lock1){
                    System.out.println("Deadlock2----lock1");
                }
            }
        }
    }
}
#運行
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static void main(String[] args) {
      Deadlock1 d1 = new Deadlock1(lock1,lock2);
      Deadlock2 d2 = new Deadlock2(lock1,lock2);
      Thread t1 = new Thread(d1);
      Thread t2 = new Thread(d2);
      t1.start();
      t2.start();
}

運行後打印結果:

Deadlock1----lock1
Deadlock2----lock2

run()方法中寫的是無限循環,按理來說應該是無限打印。但程序運行後,在我沒有終止控制檯的情況下只打印了這兩行數據。實際上這一過程引發了死鎖,具體緣由如下:

通過以上描述可知:線程 t1 持有線程 t2 需要的鎖進行等待,線程 t2 持有線程 t1 所需要的鎖進行等待,兩個線程各自拿着對方需要的鎖處於一種僵持現象,導致線程假死即死鎖

以上案例只是死鎖的一種,死鎖的標準就是判斷線程是否處於假死狀態

2.2 多生產多消費場景的死鎖如何避免?

第一小節主要是在講單生產單消費,爲了進一步提升運行效率可以適當引入多生產多消費,既多個生產者多個消費者。繼續引用第一小節案例,稍作改動:

//生產者任務
class Input implements Runnable{
    private Resource r;
    //將i寫爲成員變量而不是寫在for循環中是爲了方便講解下面多生產多消費的內容,沒必要糾結這點
    private int i = 0;
    public Input(Resource r){
        this.r = r;
    }
    public void run() {
        //無限生產車輛
        for(;;){
            synchronized(r){
                //flag爲true的時候代表已經生產過,此時將當前線程wait,等待消費者消費
                if(r.isFlag()){
                    try {
                        r.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if(i%2==0){
                    r.setId(i);//設置車的id
                    r.setName("小汽車");//設置車的型號
                    r.setWhell(4);//設置車的輪子數
                }else{
                    r.setId(i);//設置車的id
                    r.setName("電動車");//設置車的型號
                    r.setWhell(2);//設置車的輪子數
                }
                i++;
                r.setFlag(true);
                //將線程池中的線程喚醒
                r.notify();
            }
        }
    }
}

public static void main(String[] args) {
        Resource r = new Resource();
        Input in = new Input(r);
        Output out = new Output(r);
        Thread in1= new Thread(in);
        Thread in2 = new Thread(in);
        Thread out1 = new Thread(out);
        Thread out2 = new Thread(out);
        in1.start();//開啓生產者1線程
        in2 .start();//開啓生產者2線程
        out1 .start();//開啓消費者1線程
        out2 .start();//開啓消費者2線程
}

運行結果:

id=211--- name=自行車--- wheelNumber=2
id=220--- name=小汽車--- wheelNumber=4
id=220--- name=小汽車--- wheelNumber=4
id=220--- name=小汽車--- wheelNumber=4
...

安全問題又產生了,編號爲 211-220 的車輛未被打印,也即生產了未被消費。同時編號爲 220 的車輛被打印了三次。先彆着急,我接着給大家分析:

以上即爲編號 211-220 的車輛未被打印的原因,編號爲 220 車輛重複打印同理。

如何解決?其實很簡單,將生產者和消費者判斷 flag 地方的 if 更改成 while, 被喚醒後重新再判斷標記即可。代碼就不重複貼了,運行結果如下:

id=0--- name=小汽車--- wheelNumber=4
id=1--- name=電動車--- wheelNumber=2
id=2--- name=小汽車--- wheelNumber=4
id=3--- name=電動車--- wheelNumber=2
id=4--- name=小汽車--- wheelNumber=4

看起來很正常,但在我沒有關控制檯的情況下打印到編號爲 4 的車輛時停了,沒錯,死鎖出現了,具體原因如下:

如果您正在學習 Spring Boot,那麼推薦一個連載多年還在繼續更新的免費教程:http://blog.didispace.com/spring-boot-learning-2x/

所有生產者消費者線程都被 wait 掉了,導致了死鎖現象的產生。根本原因在於生產者 wait 後理應喚醒消費者,而不是喚醒生產者,object 還有一個方法notifyAll(),它可以喚醒鎖對應線程池區域的所有線程,所以將 notify 替換成 notifyAll 即可解決以上死鎖問題

2.3 通過 Lock 優雅的解決死鎖問題

2.2 提到的notifyAll是可以解決死鎖問題,但不夠優雅,因爲notifyAll()會喚醒對應線程池所有線程,單其實只需要喚醒一個即可,多了就會造成線程反覆被 wait,進而會造成性能問題。所以後來 Java 在 1.5 版本引入了顯示鎖 Lock 的概念,它可以靈活的指定 wait、notify 的作用域,專門用來解決此類問題。

通過顯示鎖 Lock 對 2.2 死鎖問題改進後代碼如下:

#生產者
class Input implements Runnable{
    private Resource r;
    private int i = 0;
    private Lock lock;
    private Condition in_con;//生產者監視器
    private Condition out_con;//消費者監視器
    public Input(Resource r,Lock lock,Condition in_con,Condition out_con){
        this.r = r;
        this.lock = lock;
        this.in_con = in_con;
        this.out_con = out_con;
    }
    public void run() {
        //無限生產車輛
        for(;;){
            lock.lock();//獲取鎖
            //flag爲true的時候代表已經生產過,此時將當前線程wait,等待消費者消費
            while(r.isFlag()){
                try {
                    in_con.await();//跟wait作用相同
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if(i%2==0){
                r.setId(i);//設置車的id
                r.setName("小汽車");//設置車的型號
                r.setWhell(4);//設置車的輪子數
            }else{
                r.setId(i);//設置車的id
                r.setName("電動車");//設置車的型號
                r.setWhell(2);//設置車的輪子數
            }
            i++;
            r.setFlag(true);
            //將線程池中的消費者線程喚醒
            out_con.signal();
            lock.unlock();//釋放鎖
        }
    }
}
//消費者
class Output implements Runnable{
    private Resource r;
    private Lock lock;
    private Condition in_con;//生產者監視器
    private Condition out_con;//消費者監視器
    public Output(Resource r,Lock lock,Condition in_con,Condition out_con){
        this.r = r;
        this.lock = lock;
        this.in_con = in_con;
        this.out_con = out_con;
    }
    public void run() {
        //無限消費車輛
        for(;;){
            lock.lock();//獲取鎖
            while(!r.isFlag()){
                try {
                    out_con.await();//將消費者線程wait
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(r.toString());
            r.setFlag(false);
            in_con.signal();//喚醒生產者線程
            lock.unlock();//釋放鎖
        }
    }
}
public static void main(String[] args) {
        Resource r = new Resource();
        Lock lock = new ReentrantLock();
        //生產者監視器
        Condition in_con = lock.newCondition();
        //消費者監視器
        Condition out_con = lock.newCondition();
        Input in = new Input(r,lock,in_con,out_con);
        Output out = new Output(r,lock,in_con,out_con);
        Thread t1 = new Thread(in);
        Thread t2 = new Thread(in);
        Thread t3 = new Thread(out);
        Thread t4 = new Thread(out);
        t1.start();//開啓生產者線程
        t2.start();//開啓生產者線程
        t3.start();//開啓消費者線程
        t4.start();//開啓消費者線程
    }

這次就真的沒問題了。其中 Lock 對應 synchronized,Condition 爲 Lock 下的監視器,每一個監視器對應一個 wait、notify 作用域,註釋寫的很清楚就不再贅述

綜上所述

本想一文理清所有關於線程安全的問題,但到這發現篇幅已經很長啦,爲了不影響閱讀體驗先到此爲止吧~~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/0X1aHH06UEOvK9-g9--v7A