基於 Redis 實現分佈式鎖思考

分佈式鎖

基於 redis 實現分佈式鎖思考幾個問題???

synchronized 鎖爲什麼不能應用於分佈式鎖?

synchronized 雖然能夠解決同步問題,但是每次只有一個線程訪問,並且 synchronized 鎖屬於 JVM 鎖,僅適用於單點部署;然而分佈式需要部署多臺實例,屬於不同的 JVM 線程對象

使用 redis 中 setnx 實現分佈式鎖。

//設置分佈式鎖
String lockKey = "product_001_key";
//語義:如何不存在則存入緩存中,且返回true;
//否則已存在,則返回false即加鎖失敗
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "product_001_lock");
if (!result) {
  //沒有加鎖成功,則返回提示等
}
try{
  
}catch() {
  
}finally{
  //釋放鎖
  stringRedisTemplate.delete(lockKey);
}

針對以上設置分佈式鎖思考一下問題?

1. 如果突然服務器宕機,那麼必然造成鎖無法釋放,即造成死鎖?

解決方案:設置超時時間。

//設置分佈式鎖
String lockKey = "product_001_key";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "product_001_lock");
//設置鎖超時時間30s
stringRedisTemplate.expire(lockKey,30, TimeUnit.SECONDS);
if (!result) {
  //沒有加鎖成功,則返回提示等
}
try{
  
}catch() {
  
}finally{
  //釋放鎖
  stringRedisTemplate.delete(lockKey);
}

2. 加鎖和設置超時時間中間引起服務器宕機,則一樣會導致死鎖。

Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "product_001_lock");
//------服務器宕機,則超時時間未設置成功-------
//設置鎖超時時間30s
stringRedisTemplate.expire(lockKey,30, TimeUnit.SECONDS);

解決方案:原子性操作,即同時加鎖和設置超時時間;

即上面的代碼合併成一句操作:

Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"product_001_lock", 30, TimeUnit.SECONDS)

3. 思考超時時間設置是否合理呢?即線程執行時間和鎖超時時間並非一致。

場景:假設設置加鎖超時時間 10s;

高併發場景下,線程 A 執行時間爲 15s,redis 依據超時時間,將其線程 A 加的鎖釋放掉;然後線程 B 獲取鎖,並加鎖成功,此時線程 A 執行結束,執行 finally 代碼塊就會將線程 B 加的鎖釋放。

解決方案:設置線程隨機 ID,釋放鎖時判斷是否爲當前線程加的鎖,即使存在線程 A 因線程執行時間超時被動釋放其鎖,但至少保證當前超時線程不會釋放其他線程加的鎖。但是面對線程執行時間大於設置的超時時間,也是會存在併發問題。

String lockKey = "product_001";
String clientId = UUID.randomUUID().toString();
//設置超時時間,且加鎖和設置線程ID
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,clientId, 30, TimeUnit.SECONDS)`
  
if (!result) {
  //沒有加鎖成功,則返回提示等
}
try{
  
}catch() {
  
}finally{
  //釋放鎖:加鎖線程ID和當前執行線程ID相同,才允許釋放鎖
 if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
        stringRedisTemplate.delete(lockKey);
    }
}

4. 上面場景解決方案:加鎖續命即續線程鎖超時時間

解決方案:加鎖成功時,開啓一個後臺線程,每隔 10s(自定義)判斷當前線程是否還持有鎖,持有鎖則再續命 30s 等

Redission 實現分佈式鎖

實現原理流程:

   String lockKey = "product_001";
        //獲取鎖對象,並未加鎖
        RLock redissonLock = redisson.getLock(lockKey);
        try {
            // **此時加鎖**,實現鎖續命功能
            redissonLock.lock();
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); 
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); 
                System.out.println("扣減成功,剩餘庫存:" + realStock + "");
            } else {
                System.out.println("扣減失敗,庫存不足");
            }
        }finally {
          //釋放鎖
            redissonLock.unlock();
        }

總結

綜上,設計實現分佈式鎖需要滿足一下條件:

  1. 互斥性;在任意時刻,只有一個客戶端能持有鎖。

  2. 不能發生死鎖;即使存在一個線程持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其他線程能加鎖。

  3. 加鎖和解鎖必須是同一個線程。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/M9MUKm_ilu6UTwZ0jI609A