通過 etcd 和 redis 實現分佈式鎖(全)

Java 鎖實現大全

首先,我們解釋一下,什麼是 Java 中的鎖:

“一段 synchronized(同步塊) 的代碼被一個線程執行之前,他要先拿到執行這段代碼的權限,在 Java 裏邊就是拿到某個同步對象的鎖(一個對象只有一把鎖);

如果這個時候同步對象的鎖被其他線程拿走了,他(這個線程)就只能等了(線程阻塞在鎖池等待隊列中)。 取到鎖後,他就開始執行同步代碼 (被 synchronized 修飾的代碼);線程執行完同步代碼後馬上就把鎖還給同步對象,其他在鎖池中等待的某個線程就可以拿到鎖執行同步代碼了。這樣就保證了同步代碼在統一時刻只有一個線程在執行 "

Java 的鎖實現,如下圖所示(通過數據庫做分佈式鎖開銷太大,因此下圖打叉):

針對分佈式鎖,就需要用外部的方式生成分佈式鎖。

分佈式環境下,鎖定全局唯一資源,作用是:

分佈式事務中,常見的分佈式鎖有:

我們在選擇鎖的時候,需要結合場景。如果 CP 類業務,就應該選擇上表 CP 的 zookeeper 或 redis。AP 類的業務 (例如社交類),就可以選擇 redis 做分佈式鎖。

etcd 實現分佈式鎖詳解

但是,如果業務是 CP 模型,就需要使用強一致的分佈式鎖,如 etcd。

實際上,在容器雲時代,etcd 作爲服務註冊中心被廣泛使用。

etcd 有如下特點:

etcd 提供的分佈式鎖整體方案是:分佈式 celient+etcd,是 client TTL 模式。

etcdv3 默認提供分佈式鎖的功能。我們不用顯性書寫鎖續租,只需要關注申請鎖、釋放鎖即可。續租會自動進行。

   etcd 提供了獨有的集羣管理模式,方便進行極端 case 下的測試,以三個節點的 etcd 集羣爲例:

        1. 單節點停機,不影響持續寫入,不影響讀,結果有一致性。

        2. 當只有一個節點時,讀會停機,寫入正常。

        3. 理論上只要不是多節點同時停機,線上服務不會受影響。

etcd 流程圖如下所示:

在上圖中,etcd client 用於管理 etcd 的鏈接,節點監控、節點複雜均衡(etcd 集羣自己的多活不需要 etcd client 來保證。etcd client 是爲了保證連接到的 etcd 實例是可用的)。

客戶端在請求鎖的時候,會先判斷 etcd client 是否存在,如果存在,則根據負載均衡算法建立與 etcd 鏈接,連上以後,從 etcd 競爭鎖,成功的話,後面鎖的租期自動續租,當鎖使用結束後,釋放鎖。

客戶端在請求鎖的時候,如果沒有 etcd client,那麼會加載 etcd 集羣建立 etcdclient。而這個 etcd client 的目的,本質上是判斷 etcd 集羣哪個實例可用,然後按照算法爲分配一個 etcd 的實例連接。

我們查看 etcd client 的部分代碼(EtcdClient.java):

第一個圈是加載 etcd 節點的列表(node ip);

第一個方框是把節點列表都加入到內存中;

第二個方框是對 etcd 實例做心跳檢查(探活、自動恢復);

下面代碼段是 etcd client 對外提供的 etcd 的操作接口,如競爭鎖、刪除鎖等。

代碼中對鎖的操作採用 curl。下面代碼段用於組裝 curl 的命令:

接下里,我們查看 etcd 心跳檢查方法(EtcdHeartbeatTask.java):

代碼通過嘗試連接來確認 etcd 實例是否是好的:

通過建立 socket 通信來檢驗 etcd 是否可以被連接接:

接下來,我們看 etcd 鎖的實現。它支持可重入鎖、自動續租、競爭鎖、釋放鎖。(** EtcdLock.java**)。

我們查看代碼中聲明鎖的代碼段, 我們可以看到用 etcd 實現分佈式鎖的本質利用它的 Key-value:

在前文中我我們提到了,Java 鎖三種本地事務鎖都都支持重入鎖。支持重入性,表示能夠對共享資源能夠重複加鎖,即當前線程獲取該鎖再次獲取不會被阻塞。

要想支持重入性,就要解決兩個問題:1. 在線程獲取鎖的時候,如果已經獲取鎖的線程是當前線程的話則直接再次獲取成功;2. 由於鎖會被獲取 n 次,那麼只有鎖在被釋放同樣的 n 次之後,該鎖纔算是完全釋放成功。

接下來我們繼續看 etcd 實現分佈式鎖的源碼。

在 BaseLock.java 中,查看如下代碼段:

如果沒有獲取到 etcd 中的 key,表示是空鎖,返回 false;

如果是可重入鎖並且當線程是否已經持有 key,持有的話則返回 true。這就實現了可重入鎖直接拿到鎖,不用競爭和等待的非阻塞模式。

如果沒搶到鎖,則需要重新搶鎖。

我們看一下 EtcdLock.java 中鎖的核心代碼:

上圖第一個方框搶的鎖,是 EtcdClient.java 中定義的casVal方法名。如果沒有搶到鎖,就重複嘗試搶鎖,直到超時退出。此外,如果搶鎖 error,也需要重試搶鎖。

那麼,搶的鎖在哪裏定義的呢?EtcdClient.java 中。

這樣,我們就對上號了。EtcdClient.java 定義訪問監控和訪問 etcd 集羣,定義鎖;BaseLock.java 定義空鎖和可重入鎖的判斷;EtcdLock.java 定義搶鎖的方法。

截止到目前,etcd 做分佈式鎖我們已經介紹完。

redis 實現分佈式鎖詳解

接下來,我們看通過 redis 實現分佈式鎖。

Redis 雖然本身支持多線程,但只是 I/O 支持多線程,但本質上命令處理還是隻唯一線程串行處理。

在通過 redis 實現分佈式鎖的時候,也需要設置 RedisClient.java. 用於配置 java 連接、監控 redi 集羣。

Redis 自身的鎖可以利用 Redis 的 setnx 命令(需要注意的是,分佈式鎖這樣不成 -)。

Redis 加鎖解鎖僞代碼如下:

if (setnx(key, 1) == 1){
    expire(key, 30)
try {
//TODO 業務邏輯
    } finally {
        del(key)
    }
}

上述鎖實現方式存在一些問題:如果 SETNX 成功,在設置鎖超時時間後,服務器掛掉、重啓或網絡問題等,導致 EXPIRE 命令沒有執行,鎖沒有設置超時時間變成死鎖。

有很多開源代碼來解決這個問題,比如使用 lua 腳本。也就是說,雖然 redis 自己有加鎖的命令,但我們在實際應用中不會這樣用,因爲會出現一些問題。

我們看兩段通過 redis 實現分佈式鎖加鎖和解鎖的代碼片段。

加鎖代碼:

public class RedisTool {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
/**
     * 嘗試獲取分佈式鎖
     * @param jedis Redis客戶端
     * @param lockKey 鎖
     * @param requestId 請求標識
     * @param expireTime 超期時間
     * @return 是否獲取成功
     */
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
        }
return false;
    }
}

可以看到,我們加鎖就一行代碼:jedis.set(String key, String value, String nxxx, String expx, int time),這個 set() 方法一共有五個形參:

  1. 第一個爲 key,我們使用 key 來當鎖,因爲 key 是唯一的。

  2. 第二個爲 value,我們傳的是 requestId,很多童鞋可能不明白,有 key 作爲鎖不就夠了嗎,爲什麼還要用到 value?原因就是我們在上面講到可靠性時,分佈式鎖要滿足第四個條件解鈴還須繫鈴人,通過給 value 賦值爲 requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據。requestId 可以使用 UUID.randomUUID().toString() 方法生成。

  3. 第三個爲 nxxx,這個參數我們填的是 NX,意思是 SET IF NOT EXIST,即當 key 不存在時,我們進行 set 操作;若 key 已經存在,則不做任何操作;

  4. 第四個爲 expx,這個參數我們傳的是 PX,意思是我們要給這個 key 加一個過期的設置,具體時間由第五個參數決定。

  5. 第五個爲 time,與第四個參數相呼應,代表 key 的過期時間。

解鎖代碼:

public class RedisTool {
private static final Long RELEASE_SUCCESS = 1L;
/**
     * 釋放分佈式鎖
     * @param jedis Redis客戶端
     * @param lockKey 鎖
     * @param requestId 請求標識
     * @return 是否釋放成功
     */
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
        }
return false;
    }
}

首先獲取鎖對應的 value 值,檢查是否與 requestId 相等,如果相等則刪除鎖(解鎖)。那麼爲什麼要使用 Lua 語言來實現呢?因爲要確保上述操作是原子性的。eval 命令執行 Lua 代碼的時候,Lua 代碼將被當成一個命令去執行,並且直到 eval 命令執行完成,Redis 纔會執行其他命令。

參考文獻:

https://mp.weixin.qq.com/s/qJK61ew0kCExvXrqb7-RSg

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/zFJuIdKeOqVy0nRoGPWo8w