億級流量下的分佈式鎖架構解密!

作者個人研發的在高併發場景下,提供的簡單、穩定、可擴展的延遲消息隊列框架,具有精準的定時任務和延遲隊列處理功能。自開源半年多以來,已成功爲十幾家中小型企業提供了精準定時調度方案,經受住了生產環境的考驗。爲使更多童鞋受益,現給出開源框架地址:

https://github.com/sunshinelyz/mykit-delay

PS: 歡迎各位 Star 源碼,也可以 pr 你牛逼哄哄的代碼。      

寫在前面

最近,很多小夥伴留言說,在學習高併發編程時,不太明白分佈式鎖是用來解決什麼問題的,還有不少小夥伴甚至連分佈式鎖是什麼都不太明白。明明在生產環境上使用了自己開發的分佈式鎖,爲什麼還會出現問題呢?同樣的程序,加上分佈式鎖後,性能差了幾個數量級!這又是爲什麼呢?今天,我們就來說說如何在高併發環境下實現分佈式鎖,不是所有的鎖都是高併發的。

萬字長文,帶你深入解密高併發環境下的分佈式鎖架構,不是所有的鎖都是分佈式鎖!!!

究竟什麼樣的鎖才能更好的支持高併發場景呢?今天,我們就一起解密高併發環境下典型的分佈式鎖架構,結合【高併發】專題下的其他文章,學以致用。

鎖用來解決什麼問題呢?

在我們編寫的應用程序或者高併發程序中,不知道大家有沒有想過一個問題,就是我們爲什麼需要引入鎖?鎖爲我們解決了什麼問題呢?

在很多業務場景下,我們編寫的應用程序中會存在很多的 資源競爭 的問題。而我們在高併發程序中,引入鎖,就是爲了解決這些資源競爭的問題。

電商超賣問題

這裏,我們可以列舉一個簡單的業務場景。比如,在電子商務(商城)的業務場景中,提交訂單購買商品時,首先需要查詢相應商品的庫存是否足夠,只有在商品庫存數量足夠的前提下,才能讓用戶成功的下單。下單時,我們需要在庫存數量中減去用戶下單的商品數量,並將庫存操作的結果數據更新到數據庫中。整個流程我們可以簡化成下圖所示。

很多小夥伴也留言說,讓我給出代碼,這樣能夠更好的學習和掌握相關的知識。好吧,這裏,我也給出相應的代碼片段吧。我們可以使用下面的代碼片段來表示用戶的下單操作,我這裏將商品的庫存信息保存在了 Redis 中。

@RequestMapping("/submitOrder")
public String submitOrder(){
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        stock -= 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        logger.debug("庫存扣減成功,當前庫存爲:{}", stock);
    }else{
        logger.debug("庫存不足,扣減庫存失敗");
        throw new OrderException("庫存不足,扣減庫存失敗");
    }
    return "success";
}

注意:上述代碼片段比較簡單,只是爲了方便大家理解,真正項目中的代碼就不能這麼寫了。

上述的代碼看似是沒啥問題的,但是我們不能只從代碼表面上來觀察代碼的執行順序。這是因爲在 JVM 中代碼的執行順序未必是按照我們書寫代碼的順序執行的。即使在 JVM 中代碼是按照我們書寫的順序執行,那我們對外提供的接口一旦暴露出去,就會有成千上萬的客戶端來訪問我們的接口。所以說,我們暴露出去的接口是會被併發訪問的。

試問,上面的代碼在高併發環境下是線程安全的嗎?答案肯定不是線程安全的,因爲上述扣減庫存的操作會出現並行執行的情況。

我們可以使用 Apache JMeter 來對上述接口進行測試,這裏,我使用 Apache JMeter 對上述接口進行測試。

在 Jmeter 中,我將線程的併發度設置爲 3,接下來的配置如下所示。

以 HTTP GET 請求的方式來併發訪問提交訂單的接口。此時,運行 JMeter 來訪問接口,命令行會打印出下面的日誌信息。

庫存扣減成功,當前庫存爲:49
庫存扣減成功,當前庫存爲:49
庫存扣減成功,當前庫存爲:49

這裏,我們明明請求了 3 次,也就是說,提交了 3 筆訂單,爲什麼扣減後的庫存都是一樣的呢?這種現象在電商領域有一個專業的名詞叫做  “超賣”

如果一個大型的高併發電商系統,比如淘寶、天貓、京東等,出現了超賣現象,那損失就無法估量了!架構設計和開發電商系統的人員估計就要通通下崗了。所以,作爲技術人員,我們一定要嚴謹的對待技術,嚴格做好系統的每一個技術環節。

JVM 中提供的鎖

JVM 中提供的 synchronized 和 Lock 鎖,相信大家並不陌生了,很多小夥伴都會使用這些鎖,也能使用這些鎖來實現一些簡單的線程互斥功能。那麼,作爲立志要成爲架構師的你,是否瞭解過 JVM 鎖的底層原理呢?

JVM 鎖原理

說到 JVM 鎖的原理,我們就不得不限說說 Java 中的對象頭了。

Java 中的對象頭

每個 Java 對象都有對象頭。如果是⾮數組類型,則⽤ 2 個字寬來存儲對象頭,如果是數組,則會⽤ 3 個字寬來存儲對象頭。在 32 位處理器中,⼀個字寬是 32 位;在 64 位虛擬機中,⼀個字寬是 64 位。

對象頭的內容如下表 。

N0qVrR

Mark Work 的格式如下所示。

ju9BeQ

可以看到,當對象狀態爲偏向鎖時, Mark Word 存儲的是偏向的線程 ID;當狀態爲輕量級鎖時, Mark Word 存儲的是指向線程棧中 Lock Record 的指針;當狀態爲重量級鎖時, Mark Word 爲指向堆中的 monitor 對象的指針 。

有關 Java 對象頭的知識,參考《深入淺出 Java 多線程》。

JVM 鎖原理

簡單點來說,JVM 中鎖的原理如下。

在 Java 對象的對象頭上,有一個鎖的標記,比如,第一個線程執行程序時,檢查 Java 對象頭中的鎖標記,發現 Java 對象頭中的鎖標記爲未加鎖狀態,於是爲 Java 對象進行了加鎖操作,將對象頭中的鎖標記設置爲鎖定狀態。第二個線程執行同樣的程序時,也會檢查 Java 對象頭中的鎖標記,此時會發現 Java 對象頭中的鎖標記的狀態爲鎖定狀態。於是,第二個線程會進入相應的阻塞隊列中進行等待。

這裏有一個關鍵點就是 Java 對象頭中的鎖標記如何實現。

JVM 鎖的短板

JVM 中提供的 synchronized 和 Lock 鎖都是 JVM 級別的,大家都知道,當運行一個 Java 程序時,會啓動一個 JVM 進程來運行我們的應用程序。synchronized 和 Lock 在 JVM 級別有效,也就是說,synchronized 和 Lock 在同一 Java 進程內有效。如果我們開發的應用程序是分佈式的,那麼只是使用 synchronized 和 Lock 來解決分佈式場景下的高併發問題,就會顯得有點力不從心了。

synchronized 和 Lock 支持 JVM 同一進程內部的線程互斥

synchronized 和 Lock 在 JVM 級別能夠保證高併發程序的互斥,我們可以使用下圖來表示。

但是,當我們將應用程序部署成分佈式架構,或者將應用程序在不同的 JVM 進程中運行時,synchronized 和 Lock 就不能保證分佈式架構和多 JVM 進程下應用程序的互斥性了。

synchronized 和 Lock 不能實現多 JVM 進程之間的線程互斥

分佈式架構和多 JVM 進程的本質都是將應用程序部署在不同的 JVM 實例中,也就是說,其本質還是多 JVM 進程。

分佈式鎖

我們在實現分佈式鎖時,可以參照 JVM 鎖實現的思想,JVM 鎖在爲對象加鎖時,通過改變 Java 對象的對象頭中的鎖的標誌位來實現,也就是說,所有的線程都會訪問這個 Java 對象的對象頭中的鎖標誌位。

我們同樣以這種思想來實現分佈式鎖,當我們將應用程序進行拆分並部署成分佈式架構時,所有應用程序中的線程訪問共享變量時,都到同一個地方去檢查當前程序的臨界區是否進行了加鎖操作,而是否進行了加鎖操作,我們在統一的地方使用相應的狀態來進行標記。

可以看到,在分佈式鎖的實現思想上,與 JVM 鎖相差不大。而在實現分佈式鎖中,保存加鎖狀態的服務可以使用 MySQL、Redis 和 Zookeeper 實現。

但是,在互聯網高併發環境中, 使用 Redis 實現分佈式鎖的方案是使用的最多的。 接下來,我們就使用 Redis 來深入解密分佈式鎖的架構設計。

Redis 如何實現分佈式鎖

Redis 命令

在 Redis 中,有一個不常使用的命令如下所示。

SETNX key value

這條命令的含義就是 “SET if Not Exists”,即不存在的時候纔會設置值。

只有在 key 不存在的情況下,將鍵 key 的值設置爲 value。如果 key 已經存在,則 SETNX 命令不做任何操作。

這個命令的返回值如下。

所以,我們在分佈式高併發環境下,可以使用 Redis 的 SETNX 命令來實現分佈式鎖。假設此時有線程 A 和線程 B 同時訪問臨界區代碼,假設線程 A 首先執行了 SETNX 命令,並返回結果 1,繼續向下執行。而此時線程 B 再次執行 SETNX 命令時,返回的結果爲 0,則線程 B 不能繼續向下執行。只有當線程 A 執行 DELETE 命令將設置的鎖狀態刪除時,線程 B 纔會成功執行 SETNX 命令設置加鎖狀態後繼續向下執行。

引入分佈式鎖

瞭解瞭如何使用 Redis 中的命令實現分佈式鎖後,我們就可以對下單接口進行改造了,加入分佈式鎖,如下所示。

/**
* 爲了演示方便,我這裏就簡單定義了一個常量作爲商品的id
* 實際工作中,這個商品id是前端進行下單操作傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通過stringRedisTemplate來調用Redis的SETNX命令,key爲商品的id,value爲字符串“binghe”
    //實際上,value可以爲任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        stock -= 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        logger.debug("庫存扣減成功,當前庫存爲:{}", stock);
    }else{
        logger.debug("庫存不足,扣減庫存失敗");
        throw new OrderException("庫存不足,扣減庫存失敗");
    }
    //業務執行完成,刪除PRODUCT_ID key
    stringRedisTemplate.delete(PRODUCT_ID);
    return "success";
}

那麼,在上述代碼中,我們加入了分佈式鎖的操作,那上述代碼是否能夠在高併發場景下保證業務的原子性呢?答案是可以保證業務的原子性。但是,在實際場景中,上面實現分佈式鎖的代碼是不可用的!!

假設當線程 A 首先執行 stringRedisTemplate.opsForValue() 的 setIfAbsent() 方法返回 true,繼續向下執行,正在執行業務代碼時,拋出了異常,線程 A 直接退出了 JVM。此時,stringRedisTemplate.delete(PRODUCT_ID); 代碼還沒來得及執行,之後所有的線程進入提交訂單的方法時,調用 stringRedisTemplate.opsForValue() 的 setIfAbsent() 方法都會返回 false。導致後續的所有下單操作都會失敗。這就是分佈式場景下的死鎖問題。

所以,上述代碼中實現分佈式鎖的方式在實際場景下是不可取的!!

引入 try-finally 代碼塊

說到這,相信小夥伴們都能夠想到,使用 try-finall 代碼塊啊,接下來,我們爲下單接口的方法加上 try-finally 代碼塊。

/**
* 爲了演示方便,我這裏就簡單定義了一個常量作爲商品的id
* 實際工作中,這個商品id是前端進行下單操作傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通過stringRedisTemplate來調用Redis的SETNX命令,key爲商品的id,value爲字符串“binghe”
    //實際上,value可以爲任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("庫存扣減成功,當前庫存爲:{}", stock);
        }else{
            logger.debug("庫存不足,扣減庫存失敗");
            throw new OrderException("庫存不足,扣減庫存失敗");
        }
    }finally{
         //業務執行完成,刪除PRODUCT_ID key
     stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

那麼,上述代碼是否真正解決了死鎖的問題呢?我們在寫代碼時,不能只盯着代碼本身,覺得上述代碼沒啥問題了。實際上,生產環境是非常複雜的。如果線程在成功加鎖之後,執行業務代碼時,還沒來得及執行刪除鎖標誌的代碼,此時,服務器宕機了,程序並沒有優雅的退出 JVM。也會使得後續的線程進入提交訂單的方法時,因無法成功的設置鎖標誌位而下單失敗。所以說,上述的代碼仍然存在問題。

引入 Redis 超時機制

在 Redis 中可以設置緩存的自動過期時間,我們可以將其引入到分佈式鎖的實現中,如下代碼所示。

/**
* 爲了演示方便,我這裏就簡單定義了一個常量作爲商品的id
* 實際工作中,這個商品id是前端進行下單操作傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通過stringRedisTemplate來調用Redis的SETNX命令,key爲商品的id,value爲字符串“binghe”
    //實際上,value可以爲任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    try{
        stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("庫存扣減成功,當前庫存爲:{}", stock);
        }else{
            logger.debug("庫存不足,扣減庫存失敗");
            throw new OrderException("庫存不足,扣減庫存失敗");
        }
    }finally{
         //業務執行完成,刪除PRODUCT_ID key
     stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

在上述代碼中,我們加入瞭如下一行代碼來爲 Redis 中的鎖標誌設置過期時間。

stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);

此時,我們設置的過期時間爲 30 秒。

那麼問題來了,這樣是否就真正的解決了問題呢?上述程序就真的沒有坑了嗎?答案是還是有坑的!!

“坑位” 分析

我們在下單操作的方法中爲分佈式鎖引入了超時機制,此時的代碼還是無法真正避免死鎖的問題,那 “坑位” 到底在哪裏呢?試想,當程序執行完 stringRedisTemplate.opsForValue().setIfAbsent()方法後,正要執行 stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS)代碼時,服務器宕機了,你還別說,生產壞境的情況非常複雜,就是這麼巧,服務器就宕機了。此時,後續請求進入提交訂單的方法時,都會因爲無法成功設置鎖標誌而導致後續下單流程無法正常執行。

既然我們找到了上述代碼的 “坑位”,那我們如何將這個” 坑“填上?如何解決這個問題呢?別急,Redis 已經提供了這樣的功能。我們可以在向 Redis 中保存數據的時候,可以同時指定數據的超時時間。所以,我們可以將代碼改造成如下所示。

/**
* 爲了演示方便,我這裏就簡單定義了一個常量作爲商品的id
* 實際工作中,這個商品id是前端進行下單操作傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通過stringRedisTemplate來調用Redis的SETNX命令,key爲商品的id,value爲字符串“binghe”
    //實際上,value可以爲任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe", 30, TimeUnit.SECONDS);
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("庫存扣減成功,當前庫存爲:{}", stock);
        }else{
            logger.debug("庫存不足,扣減庫存失敗");
            throw new OrderException("庫存不足,扣減庫存失敗");
        }
    }finally{
         //業務執行完成,刪除PRODUCT_ID key
     stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

在上述代碼中,我們在向 Redis 中設置鎖標誌位的時候就設置了超時時間。此時,只要向 Redis 中成功設置了數據,則即使我們的業務系統宕機,Redis 中的數據過期後,也會自動刪除。後續的線程進入提交訂單的方法後,就會成功的設置鎖標誌位,並向下執行正常的下單流程。

到此,上述的代碼基本上在功能角度解決了程序的死鎖問題,那麼,上述程序真的就完美了嗎?哈哈,很多小夥伴肯定會說不完美!確實,上面的代碼還不是完美的,那大家知道哪裏不完美嗎?接下來,我們繼續分析。

在開發集成角度分析代碼

在我們開發公共的系統組件時,比如我們這裏說的分佈式鎖,我們肯定會抽取一些公共的類來完成相應的功能來供系統使用。

這裏,假設我們定義了一個 RedisLock 接口,如下所示。

public interface RedisLock{
    //加鎖操作
    boolean tryLock(String key, long timeout, TimeUnit unit);
    //解鎖操作
    void releaseLock(String key);
}

接下來,使用 RedisLockImpl 類實現 RedisLock 接口,提供具體的加鎖和解鎖實現,如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        return stringRedisTemplate.opsForValue().setIfAbsent(key, "binghe", timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        stringRedisTemplate.delete(key);
    }
}

在開發集成的角度來說,當一個線程從上到下執行時,首先對程序進行加鎖操作,然後執行業務代碼,執行完成後,再進行釋放鎖的操作。理論上,加鎖和釋放鎖時,操作的 Redis Key 都是一樣的。但是,如果其他開發人員在編寫代碼時,並沒有調用 tryLock() 方法,而是直接調用了 releaseLock() 方法,並且他調用 releaseLock() 方法傳遞的 key 與你調用 tryLock() 方法傳遞的 key 是一樣的。那此時就會出現問題了,他在編寫代碼時,硬生生的將你加的鎖釋放了!!!

所以,上述代碼是不安全的,別人能夠隨隨便便的將你加的鎖刪除,這就是鎖的誤刪操作,這是非常危險的,所以,上述的程序存在很嚴重的問題!!

那如何實現只有加鎖的線程才能進行相應的解鎖操作呢? 繼續向下看。

如何實現加鎖和解鎖的歸一化?

什麼是加鎖和解鎖的歸一化呢?簡單點來說,就是一個線程執行了加鎖操作後,後續必須由這個線程執行解鎖操作,加鎖和解鎖操作由同一個線程來完成。

爲了解決只有加鎖的線程才能進行相應的解鎖操作的問題,那麼,我們就需要將加鎖和解鎖操作綁定到同一個線程中,那麼,如何將加鎖操作和解鎖操作綁定到同一個線程呢?其實很簡單,相信很多小夥伴都想到了—— 使用 ThreadLocal 實現 。沒錯,使用 ThreadLocal 類確實能夠解決這個問題。

此時,我們將 RedisLockImpl 類的代碼修改成如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        String uuid = UUID.randomUUID().toString();
        threadLocal.set(uuid);
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
          stringRedisTemplate.delete(key);   
        }
    }
}

上述代碼的主要邏輯爲:在對程序執行嘗試加鎖操作時,首先生成一個 uuid,將生成的 uuid 綁定到當前線程,並將傳遞的 key 參數操作 Redis 中的 key,生成的 uuid 作爲 Redis 中的 Value,保存到 Redis 中,同時設置超時時間。當執行解鎖操作時,首先,判斷當前線程中綁定的 uuid 是否和 Redis 中存儲的 uuid 相等,只有二者相等時,纔會執行刪除鎖標誌位的操作。這就避免了一個線程對程序進行了加鎖操作後,其他線程對這個鎖進行了解鎖操作的問題。

繼續分析

我們將加鎖和解鎖的方法改成如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    private String lockUUID;
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        String uuid = UUID.randomUUID().toString();
        threadLocal.set(uuid);
        lockUUID = uuid;
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(lockUUID.equals(stringRedisTemplate.opsForValue().get(key))){
          stringRedisTemplate.delete(key);   
        }
    }
}

相信很多小夥伴都會看出上述代碼存在什麼問題了!!沒錯,那就是 線程安全的問題。

所以,這裏,我們需要使用 ThreadLocal 來解決線程安全問題。

可重入性分析

在上面的代碼中,當一個線程成功設置了鎖標誌位後,其他的線程再設置鎖標誌位時,就會返回失敗。還有一種場景就是在提交訂單的接口方法中,調用了服務 A,服務 A 調用了服務 B,而服務 B 的方法中存在對同一個商品的加鎖和解鎖操作。

所以,服務 B 成功設置鎖標誌位後,提交訂單的接口方法繼續執行時,也不能成功設置鎖標誌位了。也就是說,目前實現的分佈式鎖沒有可重入性。

這裏,就存在可重入性的問題了。我們希望設計的分佈式鎖 具有可重入性 ,那什麼是可重入性呢?簡單點來說,就是同一個線程,能夠多次獲取同一把鎖,並且能夠按照順序進行解決操作。

其實,在 JDK 1.5 之後提供的鎖很多都支持可重入性,比如 synchronized 和 Lock。

如何實現可重入性呢?

映射到我們加鎖和解鎖方法時,我們如何支持同一個線程能夠多次獲取到鎖(設置鎖標誌位)呢?可以這樣簡單的設計:如果當前線程沒有綁定 uuid,則生成 uuid 綁定到當前線程,並且在 Redis 中設置鎖標誌位。如果當前線程已經綁定了 uuid,則直接返回 true,證明當前線程之前已經設置了鎖標誌位,也就是說已經獲取到了鎖,直接返回 true。

結合以上分析,我們將提交訂單的接口方法代碼改造成如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
         threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }else{
            isLocked = true;   
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
          stringRedisTemplate.delete(key);   
        }
    }
}

這樣寫看似沒有啥問題,但是大家細想一下,這樣寫就真的 OK 了嗎?

可重入性的問題分析

既然上面分佈式鎖的可重入性是存在問題的,那我們就來分析下問題的根源在哪裏!

假設我們提交訂單的方法中,首先使用 RedisLock 接口對代碼塊添加了分佈式鎖,在加鎖後的代碼中調用了服務 A,而服務 A 中也存在調用 RedisLock 接口的加鎖和解鎖操作。而多次調用 RedisLock 接口的加鎖操作時,只要之前的鎖沒有失效,則會直接返回 true,表示成功獲取鎖。也就是說,無論調用加鎖操作多少次,最終只會成功加鎖一次。而執行完服務 A 中的邏輯後,在服務 A 中調用 RedisLock 接口的解鎖方法,此時,會將當前線程所有的加鎖操作獲得的鎖全部釋放掉。

我們可以使用下圖來簡單的表示這個過程。

那麼問題來了,如何解決可重入性的問題呢?

解決可重入性問題

相信很多小夥伴都能夠想出使用計數器的方式來解決上面可重入性的問題,沒錯,就是使用計數器來解決。 整體流程如下所示。

那麼,體現在程序代碼上是什麼樣子呢?我們來修改 RedisLockImpl 類的代碼,如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
         threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }else{
            isLocked = true;   
        }
        //加鎖成功後將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //計數器減爲0時釋放鎖
            if(count == null || --count <= 0){
              stringRedisTemplate.delete(key);      
            }
        }
    }
}

至此,我們基本上解決了分佈式鎖的可重入性問題。

說到這裏,我還要問大家一句,上面的解決問題的方案真的沒問題了嗎?

阻塞與非阻塞鎖

在提交訂單的方法中,當獲取 Redis 分佈式鎖失敗時,我們直接返回了 failure 來表示當前請求下單的操作失敗了。試想,在高併發環境下,一旦某個請求獲得了分佈式鎖,那麼,在這個請求釋放鎖之前,其他的請求調用下單方法時,都會返回下單失敗的信息。在真實場景中,這是非常不友好的。我們可以將後續的請求進行阻塞,直到當前請求釋放鎖後,再喚醒阻塞的請求獲得分佈式鎖來執行方法。

所以,我們設計的分佈式鎖需要支持 阻塞和非阻塞 的特性。

那麼,如何實現阻塞呢?我們可以使用自旋來實現,繼續修改 RedisLockImpl 的代碼如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
         threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //如果獲取鎖失敗,則自旋獲取鎖,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
        }else{
            isLocked = true;   
        }
        //加鎖成功後將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //計數器減爲0時釋放鎖
            if(count == null || --count <= 0){
              stringRedisTemplate.delete(key);      
            }
        }
    }
}

在分佈式鎖的設計中,阻塞鎖和非阻塞鎖 是非常重要的概念,大家一定要記住這個知識點。

鎖失效問題

儘管我們實現了分佈式鎖的阻塞特性,但是還有一個問題是我們不得不考慮的。那就是 鎖失效 的問題。

當程序執行業務的時間超過了鎖的過期時間會發生什麼呢? 想必很多小夥伴都能夠想到,那就是前面的請求沒執行完,鎖過期失效了,後面的請求獲取到分佈式鎖,繼續向下執行了,程序無法做到真正的互斥,無法保證業務的原子性了。

那如何解決這個問題呢?答案就是:我們必須保證在業務代碼執行完畢後,才能釋放分佈式鎖。 方案是有了,那如何實現呢?

說白了,我們需要在業務代碼中,時不時的執行下面的代碼來保證在業務代碼沒執行完時,分佈式鎖不會因超時而被釋放。

springRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);

這裏,我們需要定義一個定時策略來執行上面的代碼,需要注意的是:我們不能等到 30 秒後再執行上述代碼,因爲 30 秒時,鎖已經失效了。例如,我們可以每 10 秒執行一次上面的代碼。

有些小夥伴說,直接在 RedisLockImpl 類中添加一個 while(true) 循環來解決這個問題,那我們就這樣修改下 RedisLockImpl 類的代碼,看看有沒有啥問題。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
         threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //如果獲取鎖失敗,則自旋獲取鎖,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
            //定義更新鎖的過期時間
            while(true){
                Integer count = threadLocalInteger.get();
                //當前鎖已經被釋放,則退出循環
                if(count == 0 || count <= 0){
                    break;
                }
                springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
                try{
                    //每隔10秒執行一次
                    Thread.sleep(10000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }else{
            isLocked = true;   
        }
        //加鎖成功後將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //計數器減爲0時釋放鎖
            if(count == null || --count <= 0){
              stringRedisTemplate.delete(key);      
            }
        }
    }
}

相信小夥伴們看了代碼就會發現哪裏有問題了:更新鎖過期時間的代碼肯定不能這麼去寫。因爲這麼寫會 導致當前線程在更新鎖超時時間的 while(true) 循環中一直阻塞而無法返回結果。 所以,我們不能將當前線程阻塞,需要異步執行定時任務來更新鎖的過期時間。

此時,我們繼續修改 RedisLockImpl 類的代碼,將定時更新鎖超時的代碼放到一個單獨的線程中執行,如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
         threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //如果獲取鎖失敗,則自旋獲取鎖,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
            //啓動新線程來執行定時任務,更新鎖過期時間
           new Thread(new UpdateLockTimeoutTask(uuid, stringRedisTemplate, key)).start();
        }else{
            isLocked = true;   
        }
        //加鎖成功後將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        String uuid = stringRedisTemplate.opsForValue().get(key);
        if(threadLocal.get().equals(uuid)){
            Integer count = threadLocalInteger.get();
            //計數器減爲0時釋放鎖
            if(count == null || --count <= 0){
              stringRedisTemplate.delete(key); 
                //獲取更新鎖超時時間的線程並中斷
                long threadId = stringRedisTemplate.opsForValue().get(uuid);
                Thread updateLockTimeoutThread = ThreadUtils.getThreadByThreadId(threadId);
                if(updateLockTimeoutThread != null){
                     //中斷更新鎖超時時間的線程
                    updateLockTimeoutThread.interrupt();   
                    stringRedisTemplate.delete(uuid);
                }
            }
        }
    }
}

創建 UpdateLockTimeoutTask 類來執行更新鎖超時的時間。

public class UpdateLockTimeoutTask implements Runnable{
    //uuid
    private long uuid;
    private StringRedisTemplate stringRedisTemplate;
    private String key;
    public UpdateLockTimeoutTask(long uuid, StringRedisTemplate stringRedisTemplate, String key){
        this.uuid = uuid;
        this.stringRedisTemplate = stringRedisTemplate;
        this.key = key;
    }
    @Override
    public void run(){
        //以uuid爲key,當前線程id爲value保存到Redis中
        stringRedisTemplate.opsForValue().set(uuid, Thread.currentThread().getId());
         //定義更新鎖的過期時間
        while(true){
            springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
            try{
                //每隔10秒執行一次
                Thread.sleep(10000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

接下來,我們定義一個 ThreadUtils 工具類,這個工具類中有一個根據線程 id 獲取線程的方法 getThreadByThreadId(long threadId)。

public class ThreadUtils{
    //根據線程id獲取線程句柄
    public static Thread getThreadByThreadId(long threadId){
        ThreadGroup group = Thread.currentThread().getThreadGroup();
        while(group != null){
            Thread[] threads = new Thread[(int)(group.activeCount() * 1.2)];
            int count = group.enumerate(threads, true);
            for(int i = 0; i < count; i++){
                if(threadId == threads[i].getId()){
                    return threads[i];
                }
            }
        }
    }
}

上述解決分佈式鎖失效的問題在分佈式鎖領域有一個專業的術語叫做 “異步續命” 。需要注意的是:當業務代碼執行完畢後,我們需要停止更新鎖超時時間的線程。所以,這裏,我對程序的改動是比較大的,首先,將更新鎖超時的時間任務重新定義爲一個 UpdateLockTimeoutTask 類,並將 uuid 和 StringRedisTemplate 注入到任務類中,在執行定時更新鎖超時時間時,首先將當前線程保存到 Redis 中,其中 Key 爲傳遞進來的 uuid。

在首先獲取分佈式鎖後,重新啓動線程,並將 uuid 和 StringRedisTemplate 傳遞到任務類中執行任務。當業務代碼執行完畢後,調用 releaseLock() 方法釋放鎖時,我們會通過 uuid 從 Redis 中獲取更新鎖超時時間的線程 id,並通過線程 id 獲取到更新鎖超時時間的線程,調用線程的 interrupt() 方法來中斷線程。

此時,當分佈式鎖釋放後,更新鎖超時的線程就會由於線程中斷而退出了。

實現分佈式鎖的基本要求

結合上述的案例,我們可以得出實現分佈式鎖的基本要求:

通用分佈式解決方案

在互聯網行業,分佈式鎖是一個繞不開的話題,同時,也有很多通用的分佈式鎖解決方案,其中,用的比較多的一種方案就是使用開源的 Redisson 框架來解決分佈式鎖問題。

有關 Redisson 分佈式鎖的使用方案大家可以參考《【高併發】你知道嗎?大家都在使用 Redisson 實現分佈式鎖了!!

既然 Redisson 框架已經很牛逼了,我們直接使用 Redisson 框架是否能夠 100% 的保證分佈式鎖不出問題呢?答案是無法 100% 的保證。因爲在分佈式領域沒有哪一家公司或者架構師能夠保證 100% 的不出問題,就連阿里這樣的大公司、阿里的首席架構師這樣的技術大牛也不敢保證 100% 的不出問題。

在分佈式領域,無法做到 100% 無故障,我們追求的是幾個 9 的目標,例如 99.999% 無故障。

CAP 理論

在分佈式領域,有一個非常重要的理論叫做 CAP 理論。

在分佈式領域中,是必須要保證分區容錯性的,也就是必須要保證 “P”,所以,我們只能保證 CP 或者 AP。

這裏,我們可以使用 Redis 和 Zookeeper 來進行簡單的對比,我們可以使用 Redis 實現 AP 架構的分佈式鎖,使用 Zookeeper 實現 CP 架構的分佈式鎖。

在基於 Redis 實現的 AP 架構的分佈式鎖模型中,向 Redis 節點 1 寫入數據後,會立即返回結果,之後在 Redis 中會以異步的方式來同步數據。

在基於 Zookeeper 實現的 CP 架構的分佈式模型中,向節點 1 寫入數據後,會等待數據的同步結果,當數據在大多數 Zookeeper 節點間同步成功後,纔會返回結果數據。

當我們使用基於 Redis 的 AP 架構實現分佈式鎖時,需要注意一個問題,這個問題可以使用下圖來表示。

也就是 Redis 主從節點之間的數據同步失敗,假設線程向 Master 節點寫入了數據,而 Redis 中 Master 節點向 Slave 節點同步數據失敗了。此時,另一個線程讀取的 Slave 節點中的數據,發現沒有添加分佈式鎖,此時就會出現問題了!!!

所以,在設計分佈式鎖方案時,也需要注意 Redis 節點之間的數據同步問題。

紅鎖的實現

在 Redisson 框架中,實現了紅鎖的機制,Redisson 的 RedissonRedLock 對象實現了 Redlock 介紹的加鎖算法。該對象也可以用來將多個 RLock 對象關聯爲一個紅鎖,每個 RLock 對象實例可以來自於不同的 Redisson 實例。當紅鎖中超過半數的 RLock 加鎖成功後,纔會認爲加鎖是成功的,這就提高了分佈式鎖的高可用。

我們可以使用 Redisson 框架來實現紅鎖。

public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
 RLock lock1 = redisson1.getLock("lock1");
 RLock lock2 = redisson2.getLock("lock2");
 RLock lock3 = redisson3.getLock("lock3");
 RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
 try {
  // 同時加鎖:lock1 lock2 lock3, 紅鎖在大部分節點上加鎖成功就算成功。
  lock.lock();
  // 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
  boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
 } catch (InterruptedException e) {
  e.printStackTrace();
 } finally {
  lock.unlock();
 }
}

其實,在實際場景中,紅鎖是很少使用的。這是因爲使用了紅鎖後會影響高併發環境下的性能,使得程序的體驗更差。所以,在實際場景中,我們一般都是要保證 Redis 集羣的可靠性。同時,使用紅鎖後,當加鎖成功的 RLock 個數不超過總數的一半時,會返回加鎖失敗,即使在業務層面任務加鎖成功了,但是紅鎖也會返回加鎖失敗的結果。另外,使用紅鎖時,需要提供多套 Redis 的主從部署架構,同時,這多套 Redis 主從架構中的 Master 節點必須都是獨立的,相互之間沒有任何數據交互。

高併發 “黑科技” 與致勝奇招

假設,我們就是使用 Redis 來實現分佈式鎖,假設 Redis 的讀寫併發量在 5 萬左右。我們的商城業務需要支持的併發量在 100 萬左右。如果這 100 萬的併發全部打入 Redis 中,Redis 很可能就會掛掉,那麼,我們如何解決這個問題呢?接下來,我們就一起來探討這個問題。

在高併發的商城系統中,如果採用 Redis 緩存數據,則 Redis 緩存的併發處理能力是關鍵,因爲很多的前綴操作都需要訪問 Redis。而異步削峯只是基本的操作,關鍵還是要保證 Redis 的併發處理能力。

解決這個問題的關鍵思想就是:分而治之,將商品庫存分開放。

暗度陳倉

我們在 Redis 中存儲商品的庫存數量時,可以將商品的庫存進行 “分割” 存儲來提升 Redis 的讀寫併發量。

例如,原來的商品的 id 爲 10001,庫存爲 1000 件,在 Redis 中的存儲爲 (10001, 1000),我們將原有的庫存分割爲 5 份,則每份的庫存爲 200 件,此時,我們在 Redia 中存儲的信息爲 (10001_0, 200),(10001_1, 200),(10001_2, 200),(10001_3, 200),(10001_4, 200)。

此時,我們將庫存進行分割後,每個分割後的庫存使用商品 id 加上一個數字標識來存儲,這樣,在對存儲商品庫存的每個 Key 進行 Hash 運算時,得出的 Hash 結果是不同的,這就說明,存儲商品庫存的 Key 有很大概率不在 Redis 的同一個槽位中,這就能夠提升 Redis 處理請求的性能和併發量。

分割庫存後,我們還需要在 Redis 中存儲一份商品 id 和分割庫存後的 Key 的映射關係,此時映射關係的 Key 爲商品的 id,也就是 10001,Value 爲分割庫存後存儲庫存信息的 Key,也就是 10001_0,10001_1,10001_2,10001_3,10001_4。在 Redis 中我們可以使用 List 來存儲這些值。

在真正處理庫存信息時,我們可以先從 Redis 中查詢出商品對應的分割庫存後的所有 Key,同時使用 AtomicLong 來記錄當前的請求數量,使用請求數量對從 Redia 中查詢出的商品對應的分割庫存後的所有 Key 的長度進行求模運算,得出的結果爲 0,1,2,3,4。再在前面拼接上商品 id 就可以得出真正的庫存緩存的 Key。此時,就可以根據這個 Key 直接到 Redis 中獲取相應的庫存信息。

同時,我們可以將分隔的不同的庫存數據分別存儲到不同的 Redis 服務器中,進一步提升 Redis 的併發量。

移花接木

在高併發業務場景中,我們可以直接使用 Lua 腳本庫(OpenResty)從負載均衡層直接訪問緩存。

這裏,我們思考一個場景:如果在高併發業務場景中,商品被瞬間搶購一空。此時,用戶再發起請求時,如果系統由負載均衡層請求應用層的各個服務,再由應用層的各個服務訪問緩存和數據庫,其實,本質上已經沒有任何意義了,因爲商品已經賣完了,再通過系統的應用層進行層層校驗已經沒有太多意義了!!而應用層的併發訪問量是以百爲單位的,這又在一定程度上會降低系統的併發度。

爲了解決這個問題,此時,我們可以在系統的負載均衡層取出用戶發送請求時攜帶的用戶 id,商品 id 和活動 id 等信息,直接通過 Lua 腳本等技術來訪問緩存中的庫存信息。如果商品的庫存小於或者等於 0,則直接返回用戶商品已售完的提示信息,而不用再經過應用層的層層校驗了。

重磅福利

微信搜一搜【冰河技術】微信公衆號,關注這個有深度的程序員,每天閱讀超硬核技術乾貨,公衆號內回覆【PDF】有我準備的一線大廠面試資料和我原創的超硬核 PDF 技術文檔,以及我爲大家精心準備的多套簡歷模板(不斷更新中),希望大家都能找到心儀的工作,學習是一條時而鬱鬱寡歡,時而開懷大笑的路,加油。如果你通過努力成功進入到了心儀的公司,一定不要懈怠放鬆,職場成長和新技術學習一樣,不進則退。如果有幸我們江湖再見!

另外,我開源的各個 PDF,後續我都會持續更新和維護,感謝大家長期以來對冰河的支持!!

寫在最後

如果你覺得冰河寫的還不錯,請微信搜索並關注「 冰河技術 」微信公衆號,跟冰河學習高併發、分佈式、微服務、大數據、互聯網和雲原生技術,「 冰河技術 」微信公衆號更新了大量技術專題,每一篇技術文章乾貨滿滿!不少讀者已經通過閱讀「 冰河技術 」微信公衆號文章,吊打面試官,成功跳槽到大廠;也有不少讀者實現了技術上的飛躍,成爲公司的技術骨幹!如果你也想像他們一樣提升自己的能力,實現技術能力的飛躍,進大廠,升職加薪,那就關注「 冰河技術 」微信公衆號吧,每天更新超硬核技術乾貨,讓你對如何提升技術能力不再迷茫!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/0j8VOI2A6Js4eZNc628n6Q