Redis 分佈式鎖如何自動續期

大家好,我是頂級架構師。

Redis 實現分佈式鎖

問題

如果這個鎖的過期時間是 30 秒,但是業務運行超過了 30 秒,比如 40 秒,當業務運行到 30 秒的時候,鎖過期了,其他客戶端拿到了這個鎖,怎麼辦

我們可以設置一個合理的過期時間,讓業務能夠在這個時間內完成業務邏輯,但 LockTime 的設置原本就很不容易。

我們只能通過經驗去配置,一個可以接受的值,基本上是這個服務歷史上的平均耗時再增加一定的 buff。總體來說,設置一個合理的過期時間並不容易

我們也可以不設置過期時間,讓業務運行結束後解鎖,但是如果客戶端出現了異常結束了或宕機了,那麼這個鎖就無法解鎖,變成死鎖;

自動續期

我們可以先給鎖設置一個 LockTime,然後啓動一個守護線程,讓守護線程在一段時間後,重新去設置這個鎖的 LockTime。

看起來很簡單,但實現起來並不容易

看門狗

Redisson 的看門狗機制就是這種機制實現自動續期的

Redissson tryLock

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // 1.嘗試獲取鎖
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }

        // 申請鎖的耗時如果大於等於最大等待時間,則申請鎖失敗.
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        current = System.currentTimeMillis();

        /**
         * 2.訂閱鎖釋放事件,並通過 await 方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題:
         * 基於信息量,當鎖被其它資源佔用時,當前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發消息通知待等待的線程進行競爭.
         *
         * 當 this.await 返回 false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱並返回獲取鎖失敗.
         * 當 this.await 返回 true,進入循環嘗試獲取鎖.
         */
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // await 方法內部是用 CountDownLatch 來實現阻塞,獲取 subscribe 異步執行的結果(應用了 Netty 的 Future)
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            // 計算獲取鎖的總耗時,如果大於等於最大等待時間,則獲取鎖失敗.
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;

              }

            /**
             * 3.收到鎖釋放的信號後,在最大等待時間之內,循環一次接着一次的嘗試獲取鎖
             * 獲取鎖成功,則立馬返回 true,
             * 若在最大等待時間之內還沒獲取到鎖,則認爲獲取鎖失敗,返回 false 結束循環
             */
            while (true) {
                long currentTime = System.currentTimeMillis();

                // 再次嘗試獲取鎖
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
                // 超過最大等待時間則返回 false 結束循環,獲取鎖失敗
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                /**
                 * 6.阻塞等待鎖(通過信號量(共享鎖)阻塞,等待解鎖消息):
                 */
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    //如果剩餘時間(ttl)小於wait time ,就在 ttl 時間內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    //則就在wait time 時間範圍內等待可以通過信號量
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                // 更新剩餘的等待時間(最大等待時間-已經消耗的阻塞時間)
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 7.無論是否獲得鎖,都要取消訂閱解鎖消息
            unsubscribe(subscribeFuture, threadId);
        }
        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

看門狗如何自動續期

Redisson 看門狗機制, 只要客戶端加鎖成功,就會啓動一個 Watch Dog。

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

續期原理

續期原理其實就是用 lua 腳本,將鎖的時間重置爲 30s

private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()),
        internalLockLeaseTime, getLockName(threadId));
}

Watch Dog 機制其實就是一個後臺定時任務線程,獲取鎖成功之後,會將持有鎖的線程放入到一個 RedissonLock.EXPIRATION_RENEWAL_MAP裏面,然後每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶端 還持有鎖 key(判斷客戶端是否還持有 key,其實就是遍歷 EXPIRATION_RENEWAL_MAP 裏面線程 id 然後根據線程 id 去 Redis 中查,如果存在就會延長 key 的時間),那麼就會不斷的延長鎖 key 的生存時間。

如果服務宕機了,Watch Dog 機制線程也就沒有了,此時就不會延長 key 的過期時間,到了 30s 之後就會自動過期了,其他線程就可以獲取到鎖。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/vvJY6qzd8crRtWs4x7GgBQ