一文徹底搞懂線程安全問題
前言
關於線程安全問題是一塊非常基礎的知識,但基礎不代表簡單,一個人的基本功能往往能決定他是否可以寫出高質量、高性能的代碼。關於什麼是 synchronized、Lock、volatile,相信大家都能道出一二,但概念都懂一用就懵,一不小心還能寫出一個死鎖出來。
本文將基於生產者消費者模式加一個個具體案例,循序漸進的講解線程安全問題的誕生背景以及解決方案,一文幫你抓住 synchronized 的應用場景,以及與 Lock 的區別。
- 線程安全問題的誕生背景以及解決方式
1.1 爲什麼線程間需要通信?
線程是 CPU 執行的基本單位,爲了提高 CPU 的使用率以及模擬多個應用程序同時運行的場景,便衍生出了多線程的概念。
在 JVM 架構下堆內存、方法區是可以被線程共享的,那爲什麼要這樣設計呢?
舉個例子簡要描述下:
現要做一個網絡請求,請求響應後渲染到手機界面。Android 爲了提升用戶體驗將 main 線程當作 UI 線程,只做界面渲染,耗時操作應交由到工作線程。如若在 UI 線程執行耗時操作可能會出現阻塞現象,最直觀的感受就是界面卡死。網絡請求屬於 IO 操作會出現阻塞想象,前面提到 UI 線程不允許出現阻塞現象,所以網絡請求必須扔到工作線程,但拿到數據包後怎麼傳遞給 UI 線程呢?最常規的做法就是回調接口,將 HTTP 數據包解析成本地模型,再通過接口將本地模型對應的堆內存地址值傳遞到 UI 線程。
如果您正在學習 Spring Boot,那麼推薦一個連載多年還在繼續更新的免費教程:http://blog.didispace.com/spring-boot-learning-2x/
工作線程將堆內存對象地址值交給 UI 線程這一過程,就是線程間通信,也是 JVM 將堆內存設置爲線程共享的原因,關於線程間通信用一句通俗易懂的話描述就是:"多個線程操作同一資源",這一資源位於堆內存或方法區
1.2 單生產單消費引發的安全問題
"多個線程操作同一資源",聽起來如此的簡單,殊不知一不小心便可能引發致命問題。喲,此話怎講呢?,不急,容我娓娓道來...
案例
現有一個車輛公司,主要經營四輪小汽車和兩輪自行車,工人負責生產,銷售員負責售賣。
以上案例如何通過應用程序來實現?思路如下:
定義一個車輛資源類,可以設置爲小汽車和自行車
public class Resource {
//一輛車對應一個id
private int id;
//車名
private String name;
//車的輪子數
private int wheelNumber;
//標記(後面會用到)
private boolean flag = false;
...
忽略setter、getter
...
@Override
public String toString() {
return "id=" + id + "--- + wheelNumber;
}
}
定義一個工人線程任務,專門用來生產四輪小汽車和倆輪自行車,爲生產者
public class Input implements Runnable{
private Resource r;
public Input(Resource r){
this.r = r;
}
public void run() {
//無限生產車輛
for(int i =0;;i++){
if(i%2==0){
r.setId(i);//設置車的id
r.setName("小汽車");//設置車類型
r.setWheelNumber(4);//設置車的輪子數
}else{
r.setId(i);//設置車的id
r.setName("電動車");//設置車類型
r.setWheelNumber(2);//設置車的輪子數
}
}
}
}
定義一個銷售員線程任務,專門用來銷售車輛,爲消費者
public class Output implements Runnable{
private Resource r;
public Output(Resource r){
this.r = r;
}
public void run() {
//無限消費車輛
for(;;){
//消費車輛
System.out.println(r.toString());
}
}
}
開始生產、消費
//資源對象,對應車輛
Resource r = new Resource();
//生產者runnable,對應工人
Input in = new Input(r);
//消費者runnable,對應銷售員
Output out = new Output(r);
Thread t1 = new Thread(in);
Thread t2 = new Thread(out);
//開啓生產者線程
t1.start();
//開啓消費者線程
t2.start();
打印結果:
...
id=51--- name=電動車--- wheelNumber=2
id=52--- name=小汽車--- wheelNumber=2
...
一切有條不紊的進行,老闆數着鈔票那叫一個開心。喫水不忘挖井人,正當老闆準備給員工發獎金時,出現了一個嚴重問題 編號爲 52 的小汽車少裝了倆輪子!!!得,獎金不僅沒了,還得連夜排查問題
導致原因:
tips:流程對應上面打印結果。下同
-
生產者線程得到 CPU 執行權,將 name 和
wheelNumber
分別設置爲電動車和 2,隨後 CPU 切換到了消費者線程。 -
消費者線程得到 CPU 執行權,此時 name 和
wheelNumber
別爲電動車和 2,隨後打印name=電動車--- wheelNumber=2
,CPU 切換到了生產者線程。 -
生產者線程再次得到 CPU 執行權,將 name 設置爲小汽車 (未對 wheelNumber 進行設置),此時 name 和
wheelNumber
分別爲小汽車和 2,CPU 切換到了消費者線程。 -
消費者線程得到 CPU 執行權,此時 name 和
wheelNumber
別爲小汽車和 2,隨後打印name=小汽車--- wheelNumber=2
-
如果您正在學習 Spring Boot,那麼推薦一個連載多年還在繼續更新的免費教程:http://blog.didispace.com/spring-boot-learning-2x/
工人:"生產到一半你銷售員就拿去賣了,這鍋我不背"
解決方案:
導致原因其實就是生產者對 Resource 的一次操作還未結束,消費者強行介入了。此時可以引入 synchronized 關鍵字,使得生產者一次工作結束前消費者不得介入
更改後的代碼如下:
#Input
public void run() {
//無限生產車輛
for(int i =0;;i++){
synchronized(r){
if(i%2==0){
r.setId(i);//設置車的id
r.setName("小汽車");//設置車類型
r.setWheelNumber(4);//設置車的輪子數
}else{
r.setId(i);//設置車的id
r.setName("電動車");//設置車類型
r.setWheelNumber(2);//設置車的輪子數
}
}
}
}
#Output
public void run() {
for(;;){
synchronized(r){
//消費車輛
System.out.println(r.toString());
}
}
}
生產者和消費者 for 循環中都加了一個 synchronized,對應的鎖是 r,修改後重新執行
...
id=79--- name=電動車--- wheelNumber=2
id=80--- name=小汽車--- wheelNumber=4
id=80--- name=小汽車--- wheelNumber=4
...
一切又恢復了正常。但又暴露出一個更嚴重的問題,編號爲 80 的小汽車被消費 (銷售) 了兩次
也既銷售員把一輛車賣給了兩個客戶,真乃商業奇才啊!!!
導致原因:
-
生產者線程得到 CPU 執行權,將 name 和
wheelNumber
分別設置爲小汽車和 4,隨後 CPU 執行權切換到了消費者線程。 -
消費者線程得到 CPU 執行權,此時 name 和
wheelNumber
別爲小汽車和 4,隨後打印name=小汽車--- wheelNumber=4
,但消費後 CPU 執行權並未切換到生產者線程,而是由消費者線程繼續執行,於是就出現了編號爲 80 的小汽車被打印(消費)了兩次
解決方案:
產生問題的原因就是消費者把資源消費後未處於等待狀態,而是繼續消費。此時可以引入 wait、notify 機制,使得銷售員售賣完一輛車後處於等待狀態,當工人重新生產一輛新車後再通知銷售員,銷售員接收到工人消息後再進行售賣。
更改後的代碼如下:
#Input
public void run() {
//無限生產車輛
for(int i =0;;i++){
synchronized(r){
//flag爲true的時候代表已經生產過,此時將當前線程wait,等待消費者消費
if(r.isFlag()){
try {
r.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(i%2==0){
r.setId(i);//設置車的id
r.setName("小汽車");//設置車的型號
r.setWheel(4);//設置車的輪子數
}else{
r.setId(i);//設置車的id
r.setName("電動車");//設置車的型號
r.setWheel(2);//設置車的輪子數
}
r.setFlag(true);
//將線程池中的線程喚醒
r.notify();
}
}
}
#Output
public void run() {
//無限消費車輛
for(;;){
synchronized(r){
//flag爲false,代表當前生產的車已經被消費掉,
//進入wait狀態等待生產者生產
if(!r.isFlag()){
try {
r.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消費車輛
System.out.println(r.toString());
r.setFlag(false);
//將線程池中的線程喚醒
r.notify();
}
}
}
打印結果:
...
id=129--- name=電動車--- wheelNumber=2
id=130--- name=小汽車--- wheelNumber=4
id=131--- name=電動車--- wheelNumber=2
...
這次真的沒問題了,工人和銷售員都如願以償的拿到了老闆發的獎金
注意點 1:
synchronized 括號內傳入的是一把鎖,可以是任意類型的對象,生產者消費者必須使用同一把鎖才能實現同步操作。這樣設計的目的是爲了更靈活使用同步代碼塊,否則整個進程那麼多 synchronized,鎖誰不鎖誰根本不明確
注意點 2:
wait、notify 其實是 object 的方法,它們只能在 synchronized 代碼塊內由鎖進行調用,否則就會拋異常。每一把鎖對應線程池的一塊區域,被 wait 的線程會被放入到鎖對應的線程池區域,並且釋放鎖。notify 會隨機喚醒鎖對應線程池區域的任意一個線程,線程被喚醒後會重新上鎖,注意是隨機喚醒任意一個線程
- 由死鎖問題看顯示鎖 Lock 的應用場景
2.1 何爲死鎖?
關於死鎖,顧名思義應該是鎖死了,它可以使線程處於假死狀態但又沒真死,卡在半道又無法被回收。
舉個例子:
class Deadlock1 implements Runnable{
private Object lock1;
private Object lock2;
public Deadlock1(Object obj1,Object obj2){
this.lock1 = obj1;
this.lock2 = obj2;
}
public void run() {
while(true){
synchronized(lock1){
System.out.println("Deadlock1----lock1");
synchronized(lock2){
System.out.println("Deadlock1----lock2");
}
}
}
}
}
class Deadlock2 implements Runnable{
private Object lock1;
private Object lock2;
public Deadlock2(Object obj1,Object obj2){
this.lock1 = obj1;
this.lock2 = obj2;
}
public void run() {
while(true){
synchronized(lock2){
System.out.println("Deadlock2----lock2");
synchronized(lock1){
System.out.println("Deadlock2----lock1");
}
}
}
}
}
#運行
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static void main(String[] args) {
Deadlock1 d1 = new Deadlock1(lock1,lock2);
Deadlock2 d2 = new Deadlock2(lock1,lock2);
Thread t1 = new Thread(d1);
Thread t2 = new Thread(d2);
t1.start();
t2.start();
}
運行後打印結果:
Deadlock1----lock1
Deadlock2----lock2
run()
方法中寫的是無限循環,按理來說應該是無限打印。但程序運行後,在我沒有終止控制檯的情況下只打印了這兩行數據。實際上這一過程引發了死鎖,具體緣由如下:
-
線程 t1 執行,判斷了第一個同步代碼塊,此時鎖 lock1 可用,於是持着鎖 lock1 進入了第一個同步代碼塊,打印了:
Deadlock1----lock1
,隨後線程切換到了線程 t2 -
線程 t2 執行,判斷第一個同步代碼塊,此時鎖 lock2 可用,於是持着鎖 lock2 進入了第一個同步代碼塊,打印了:
Deadlock2----lock2
,接着向下執行,判斷鎖 lock1 不可用 (因爲鎖 lock1 已經被線程 t1 所佔用),於是線程 t1 進行等待. 隨後再次切換到線程 t1 -
線程 t1 執行,判斷第二個同步代碼塊,此時鎖 lock2 不可用 (因爲所 lock2 已經被線程 t2 所佔用),線程 t1 也進入了等待狀態
通過以上描述可知:線程 t1 持有線程 t2 需要的鎖進行等待,線程 t2 持有線程 t1 所需要的鎖進行等待,兩個線程各自拿着對方需要的鎖處於一種僵持現象,導致線程假死即死鎖
以上案例只是死鎖的一種,死鎖的標準就是判斷線程是否處於假死狀態
2.2 多生產多消費場景的死鎖如何避免?
第一小節主要是在講單生產單消費,爲了進一步提升運行效率可以適當引入多生產多消費,既多個生產者多個消費者。繼續引用第一小節案例,稍作改動:
//生產者任務
class Input implements Runnable{
private Resource r;
//將i寫爲成員變量而不是寫在for循環中是爲了方便講解下面多生產多消費的內容,沒必要糾結這點
private int i = 0;
public Input(Resource r){
this.r = r;
}
public void run() {
//無限生產車輛
for(;;){
synchronized(r){
//flag爲true的時候代表已經生產過,此時將當前線程wait,等待消費者消費
if(r.isFlag()){
try {
r.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(i%2==0){
r.setId(i);//設置車的id
r.setName("小汽車");//設置車的型號
r.setWhell(4);//設置車的輪子數
}else{
r.setId(i);//設置車的id
r.setName("電動車");//設置車的型號
r.setWhell(2);//設置車的輪子數
}
i++;
r.setFlag(true);
//將線程池中的線程喚醒
r.notify();
}
}
}
}
public static void main(String[] args) {
Resource r = new Resource();
Input in = new Input(r);
Output out = new Output(r);
Thread in1= new Thread(in);
Thread in2 = new Thread(in);
Thread out1 = new Thread(out);
Thread out2 = new Thread(out);
in1.start();//開啓生產者1線程
in2 .start();//開啓生產者2線程
out1 .start();//開啓消費者1線程
out2 .start();//開啓消費者2線程
}
運行結果:
id=211--- name=自行車--- wheelNumber=2
id=220--- name=小汽車--- wheelNumber=4
id=220--- name=小汽車--- wheelNumber=4
id=220--- name=小汽車--- wheelNumber=4
...
安全問題又產生了,編號爲 211-220 的車輛未被打印,也即生產了未被消費。同時編號爲 220 的車輛被打印了三次。先彆着急,我接着給大家分析:
-
生產者線程 in1 得到執行權,生產了 id 爲 211 的車輛,將 flag 置爲 true,循環回來再判斷標記爲 true,此時執
wait()
方法進入等待狀態 -
生產者線程 in2 得到執行權,判斷標記爲 true,執行
wait()
方法進入等待狀態。 -
消費者線程 out1 得到執行權,判斷標記爲 true,不進行等待而是選擇了消費 id 爲 211 的車輛,消費完畢後將標記置爲 false 並執行
notify()
將線程池中的任意一個線程給喚醒,假設喚醒的是 in1 -
生產者線程 in1 再次得到執行權,此時生產者線程 in1 被喚醒後不會判斷標記而是選擇生產一輛 id 爲 1 的車輛,隨後將標記置爲 true 並執行
notify()
將線程池中任意一個線程給喚醒,假設喚醒的是 in2 -
生產者線程 in2 再次得到執行權,此時生產者線程 in2 被喚醒後不會判斷標記而是直接生產了一輛 id 爲 212 的車輛,隨後喚醒 in1 生產 id 爲 213 的車輛,再喚醒 in2.....
以上即爲編號 211-220 的車輛未被打印的原因,編號爲 220 車輛重複打印同理。
如何解決?其實很簡單,將生產者和消費者判斷 flag 地方的 if 更改成 while, 被喚醒後重新再判斷標記即可。代碼就不重複貼了,運行結果如下:
id=0--- name=小汽車--- wheelNumber=4
id=1--- name=電動車--- wheelNumber=2
id=2--- name=小汽車--- wheelNumber=4
id=3--- name=電動車--- wheelNumber=2
id=4--- name=小汽車--- wheelNumber=4
看起來很正常,但在我沒有關控制檯的情況下打印到編號爲 4 的車輛時停了,沒錯,死鎖出現了,具體原因如下:
-
線程 in1 開始執行,生產了一輛車將 flag 置爲 true,循環回來判斷 flag 進入
wait()
狀態,此時線程池中進行等待的線程有:in1 -
線程 in2 開始執行,判斷 flag 爲 true 進入
wait()
狀態,此時線程池中進行等待的線程有:in1,in2 -
線程 out1 開始執行,判斷 flag 爲 true,消費了一輛汽車將 flag 置爲 false 並喚醒一個線程,我們假定喚醒的爲 in1(這裏需要注意,被喚醒並不意味着會立刻執行,只是當前具備着執行資格但並不具備執行權),線程 out1 循環回來判讀 flag 進入 wait 狀態,此時線程池中的線程有 in2,out1,隨後 out2 得到執行權
-
線程 out2 開始執行,判斷標記爲 false,進入等待狀態,此時線程池中的線程有 in2,out1,out2
-
線程 in1 開始執行,判斷標記爲 false,生產了一輛汽車必將 flag 置爲 true 並喚醒線程池中的一個線程,我們假定喚醒的是 in2,隨後 in1 循環判斷 flag 進入
wait()
狀態,此時線程池中的線程有 in1,out1,out2 -
線程 int2 得到執行權,判斷標記爲 false,進入
wait()
狀態,此時線程池中的線程有 in1,in2,out1,out2
如果您正在學習 Spring Boot,那麼推薦一個連載多年還在繼續更新的免費教程:http://blog.didispace.com/spring-boot-learning-2x/
所有生產者消費者線程都被 wait 掉了,導致了死鎖現象的產生。根本原因在於生產者 wait 後理應喚醒消費者,而不是喚醒生產者,object 還有一個方法notifyAll()
,它可以喚醒鎖對應線程池區域的所有線程,所以將 notify 替換成 notifyAll 即可解決以上死鎖問題
2.3 通過 Lock 優雅的解決死鎖問題
2.2 提到的notifyAll
是可以解決死鎖問題,但不夠優雅,因爲notifyAll()
會喚醒對應線程池所有線程,單其實只需要喚醒一個即可,多了就會造成線程反覆被 wait,進而會造成性能問題。所以後來 Java 在 1.5 版本引入了顯示鎖 Lock 的概念,它可以靈活的指定 wait、notify 的作用域,專門用來解決此類問題。
通過顯示鎖 Lock 對 2.2 死鎖問題改進後代碼如下:
#生產者
class Input implements Runnable{
private Resource r;
private int i = 0;
private Lock lock;
private Condition in_con;//生產者監視器
private Condition out_con;//消費者監視器
public Input(Resource r,Lock lock,Condition in_con,Condition out_con){
this.r = r;
this.lock = lock;
this.in_con = in_con;
this.out_con = out_con;
}
public void run() {
//無限生產車輛
for(;;){
lock.lock();//獲取鎖
//flag爲true的時候代表已經生產過,此時將當前線程wait,等待消費者消費
while(r.isFlag()){
try {
in_con.await();//跟wait作用相同
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(i%2==0){
r.setId(i);//設置車的id
r.setName("小汽車");//設置車的型號
r.setWhell(4);//設置車的輪子數
}else{
r.setId(i);//設置車的id
r.setName("電動車");//設置車的型號
r.setWhell(2);//設置車的輪子數
}
i++;
r.setFlag(true);
//將線程池中的消費者線程喚醒
out_con.signal();
lock.unlock();//釋放鎖
}
}
}
//消費者
class Output implements Runnable{
private Resource r;
private Lock lock;
private Condition in_con;//生產者監視器
private Condition out_con;//消費者監視器
public Output(Resource r,Lock lock,Condition in_con,Condition out_con){
this.r = r;
this.lock = lock;
this.in_con = in_con;
this.out_con = out_con;
}
public void run() {
//無限消費車輛
for(;;){
lock.lock();//獲取鎖
while(!r.isFlag()){
try {
out_con.await();//將消費者線程wait
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(r.toString());
r.setFlag(false);
in_con.signal();//喚醒生產者線程
lock.unlock();//釋放鎖
}
}
}
public static void main(String[] args) {
Resource r = new Resource();
Lock lock = new ReentrantLock();
//生產者監視器
Condition in_con = lock.newCondition();
//消費者監視器
Condition out_con = lock.newCondition();
Input in = new Input(r,lock,in_con,out_con);
Output out = new Output(r,lock,in_con,out_con);
Thread t1 = new Thread(in);
Thread t2 = new Thread(in);
Thread t3 = new Thread(out);
Thread t4 = new Thread(out);
t1.start();//開啓生產者線程
t2.start();//開啓生產者線程
t3.start();//開啓消費者線程
t4.start();//開啓消費者線程
}
這次就真的沒問題了。其中 Lock 對應 synchronized,Condition 爲 Lock 下的監視器,每一個監視器對應一個 wait、notify 作用域,註釋寫的很清楚就不再贅述
綜上所述
-
多線程是用來提升 CUP 使用率的
-
多個線程訪問同一資源可能會引發安全問題
-
synchronized 配合 wait、notify 可以解決線程安全問題
-
Lock 可以解決 synchronized 下 wait、notify 的侷限性
本想一文理清所有關於線程安全的問題,但到這發現篇幅已經很長啦,爲了不影響閱讀體驗先到此爲止吧~~
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/0X1aHH06UEOvK9-g9--v7A