高性能分佈式限流:Redis-Lua 真香!

什麼是限流?爲什麼要限流?

限流,這個詞其實並不陌生,在我們生活中也隨處可見。做核酸時,工作人員會在覈酸檢測點的空地上擺放着彎彎曲曲的圍欄,人們排着隊左拐右拐的往前移動,其實這麼做的目的就是限流!因爲核酸檢測的窗口是有限的,一下子進那麼多人,沒那麼多空間讓人們站下,就會造成擁擠,甚至會造成事故。所以需要限流!

同樣的,我們的應用程序也是類似的,任何系統它處理請求的能力都是有限的,一旦請求多到超出系統的處理極限,系統就會崩潰。對於生產環境,崩潰是一個很大的生產事故,保不準就會給公司造成很大的損失,輕則賠款,重則判刑都是有可能的。所以今天我們就來聊一下如何實現高性能的限流。

重試、限流、熔斷、降級被稱爲分佈式系統高可用的四板斧。

不可避免地,

第一,我們一定要設置超時;

第二,要在一些場景裏面去考慮重試的邏輯;

第三,考慮熔斷的邏輯,不要被下游拖死;

第四,一定要有限流的邏輯,不要被上游打死。

當今社會,互聯網公司的流量巨大,系統上線前需要對系統進行全面的流量峯值評估,以判斷系統所能承載的最大瞬時請求數,尤其是像各種秒殺促銷活動,爲了保證系統不被巨大的流量壓垮,會事先評估系統最大請數,並設置限流邏輯,以便在系統流量到達設定的閾值時,拒絕掉這部分流量,從而確保系統不會崩潰。

限流會導致用戶在短時間內(這個時間段是毫秒級的)系統不可用,假設系統設置的每秒流量閾值是 100,理論上一秒內第 101 個及之後的請求都會被限流,相當於拒絕服務,下一秒進來的請求能正常被響應,這也就是爲什麼我們搶購時,一會兒能進頁面一會兒顯示 “請稍後” 之類的提示語。相比於系統的短暫不可用,要比系統崩潰要好太多了。

對於限流有很多方式,最經典的幾種就是,計數器法、滑動窗口、漏桶法、令牌桶等,今天北哥要講的是採用 Redis + Lua 腳本實現高性能的分佈式限流,下面就跟着北哥來一起實戰吧。

分佈式限流

所謂的分佈式限流,其實道理很簡單。分佈式區別於單機限流的場景,它把整個分佈式集羣環境中所有服務器當做一個整體來考量。比如說針對 IP 限流,我們限制了 1 個 IP 每秒最多 10 個訪問,不管來自這個 IP 地址的請求落在了哪臺機器上,只要是訪問了集羣中的服務節點,那麼都會受到限制規則的制約。

從上面的例子不難看出,我們必須將限流信息保存在一個 “中心化” 的組件上,這樣它就可以獲取到集羣中所有機器的訪問狀態。

目前有兩個比較主流的限流方案:

  1. 網關層限流。將限流規則應用在所有流量的入口處

  2. 中間件限流。將限流信息存儲在分佈式環境中某個中間件裏(比如 redis),每個組件都可以從這裏獲取到當前時間的流量統計,從而決定是否放行還是拒絕。

Redis+Lua 實現高性能分佈式限流

這篇文章介紹 Redis+Lua 實現分佈式限流,很多小夥伴不知道 Lua 是什麼,個人理解,Lua 腳本和 MySQL 數據庫的存儲過程比較相似,他們執行一組命令,所有命令的執行要麼全部成功或者失敗,以此達到原子性。也可以把 Lua 腳本理解爲,一段具有業務邏輯的代碼塊。

而 Lua 本身就是一種編程語言(腳本語言),Redis 腳本使用 Lua 解釋器來執行腳本。Reids 2.6 版本通過內嵌支持 Lua 環境。執行腳本的常用命令爲 EVAL。詳細參考 https://www.redis.net.cn/tutorial/3516.html

雖然 Redis 官方沒有直接提供限流相應的 API,但卻支持了 Lua 腳本的功能,可以使用它實現複雜的令牌桶或漏桶算法,也是分佈式系統中實現限流的主要方式之一。

並且通常我們使用 Redis 事務時,並不是直接使用 Redis 自身提供的事務功能,而是使用 Lua 腳本。相比 Redis 事務,Lua 腳本的優點:

Lua 腳本大致邏輯如下:

-- 獲取調用腳本時傳入的第一個key值(用作限流的 key)
local key = KEYS[1]
-- 獲取調用腳本時傳入的第一個參數值(限流大小)
local limit = tonumber(ARGV[1])
-- 獲取當前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")
-- 是否超出限流
if curentLimit + 1 > limit then
    -- 返回(拒絕)
    return 0
else
    -- 沒有超出 value + 1
    redis.call("INCRBY", key, 1)
    -- 設置過期時間
    redis.call("EXPIRE", key, 2)
    -- 返回(放行)
    return 1
end

實戰

首先創建一個 springboot 項目,在 pom.xml 中引入依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>21.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.9</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

配置 RedisTemplate

首先 application.properties 配置 Redis 連接信息

spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0

然後通過 @Bean 配置 RedisTemplate

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.io.Serializable;
/**
 * redis配置類.
 *
 * @author : 北哥 公衆號:BiggerBoy
 * @version : 1.0 2022/09/09
 * @since : 1.0
 */
@Configuration
public class RedisConfig {
    @Bean
    JedisConnectionFactory jedisConnectionFactory() {
        return new JedisConnectionFactory();
    }
    @Bean
    public RedisTemplate<String, Serializable> limitRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Serializable> template = new RedisTemplate<>();
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

創建自定義註解

然後我們創建自定義註解:

import com.biggerboy.redislimiter.enums.LimitType;
import java.lang.annotation.*;
/**
 * redis限流注解.
 *
 * @author : 北哥 公衆號:BiggerBoy
 * @version : 1.0 2022/09/09
 * @since : 1.0
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MyRedisLimiter {
    /**
     * 緩存到Redis的key
     */
    String key();
    /**
     * Key的前綴
     */
    String prefix() default "limiter:";
    /**
     * 給定的時間範圍 單位()
     * 默認1秒 即1秒內超過count次的請求將會被限流
     */
    int period() default 1;
    /**
     * 一定時間內最多訪問的次數
     */
    int count();
    /**
     * 限流的維度(用戶自定義key 或者 調用方ip)
     */
    LimitType limitType() default LimitType.CUSTOMER;
}

創建切面類 RedisLimitAspect

大致邏輯是獲取方法上的註解 MyRedisLimiter,從註解上獲取配置信息,組裝 keys 和參數,然後調用 RedisTemplate 的 execute 方法獲取當前時間內請求數,小於等於 limitCount 則不限流,否則限流降級處理。

/**
 * @param pjp
 * @author 問北(北哥) 公衆號:BiggerBoy
 * @description 切面
 * @date 2022-9-8 18:29:53
 */
@Around("execution(public * *(..)) && @annotation(com.wenbei.annotation.MyRedisLimiter)")
public Object limit(ProceedingJoinPoint pjp) {
    MethodSignature signature = (MethodSignature) pjp.getSignature();
    Method method = signature.getMethod();
    MyRedisLimiter limitAnnotation = method.getAnnotation(MyRedisLimiter.class);
    LimitType limitType = limitAnnotation.limitType();
    int limitPeriod = limitAnnotation.period();
    int limitCount = limitAnnotation.count();
    String key = getKey(limitAnnotation, limitType);
    ImmutableList<String> keys = ImmutableList.of(StringUtils.join(limitAnnotation.prefix(), key));
    try {
        Number count = limitRedisTemplate.execute(redisScript, keys, limitCount, limitPeriod);
        logger.info("try to access, this time count is {} for key: {}", count, key);
        if (count != null && count.intValue() <= limitCount) {
            return pjp.proceed();
        } else {
            demote();//降級
            return null;
        }
    } catch (Throwable e) {
        if (e instanceof RuntimeException) {
            throw new RuntimeException(e.getLocalizedMessage());
        }
        throw new RuntimeException("服務器出現異常,請稍後再試");
    }
}

到這裏,最關鍵的 Lua 是如何使用的還沒講到。我們可以看到上述代碼調用 limitRedisTemplate.execute 參數的第一個是 redisScript,這便是 Redis 用於執行 Lua 腳本的重要支持。

加載 Lua 腳本

在切面類中,我們可以通過初始化加載 Lua 腳本,如下 new ClassPathResource(LIMIT_LUA_PATH)

private static final String LIMIT_LUA_PATH = "limit.lua";
private DefaultRedisScript<Number> redisScript;
@PostConstruct
public void init() {
    redisScript = new DefaultRedisScript<>();
    redisScript.setResultType(Number.class);
    ClassPathResource classPathResource = new ClassPathResource(LIMIT_LUA_PATH);
    try {
        classPathResource.getInputStream();//探測資源是否存在
        redisScript.setScriptSource(new ResourceScriptSource(classPathResource));
    } catch (IOException e) {
        logger.error("未找到文件:{}", LIMIT_LUA_PATH);
    }
}

我們傳入常量 limit.lua,這是 classpath 下創建的腳本文件,Lua 腳本如下,也很簡單,就不在贅述。通常應該在 limit.lua 文件中放置腳本文件,這樣如果需要修改腳本,僅需要修改文件重啓即可。

local count
count = redis.call('get',KEYS[1])
--不超過最大值,則直接返回
if count and tonumber(count) > tonumber(ARGV[1]) then
    return count;
end
    --執行計算器自加
    count = redis.call('incr',KEYS[1])
if tonumber(count) == 1 then
    --從第一次調用開始限流,設置對應key的過期時間
    redis.call('expire',KEYS[1],ARGV[2])
end
return count;

降級

然後在降級方法中寫我們的降級邏輯,通過拋異常或往 HttpServletResponse 寫入返回信息都可以。

/**
 * 降級策略
 * @author 北哥
 * @date 2020/4/8 13:24
 */
private void demote() {
    logger.info("try to access fail, this request will be demoted");
    //throw new RuntimeException("限流了");
    response.setHeader("Content-Type", "text/html;charset=UTF8");
    PrintWriter writer = null;
    try {
        writer = response.getWriter();
        writer.println("訪問失敗,請稍後再試...");
        writer.flush();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (writer != null) {
            writer.close();
        }
    }
}

接口限流

好了,準備工作都 ok 了,下面我們在 controller 接口上加上註解,測試一下。

/**
 * 測試限流controller
 * @author 北哥 公衆號:BiggerBoy
 * @date 2022年9月8日17:02:40
 */
@RestController
public class TestLimiterController {
    /**
     * @author 北哥
     * @description
     * @date 2020/4/8 13:42
     */
    @MyRedisLimiter(key = "limitTest", count = 2)
    @RequestMapping(value = "/limitTest")
    public Long limitTest() {
        System.out.println("limitTest");
        return 1L;
    }
    /**
     * @author 北哥
     * @description
     * @date 2020/4/8 13:42
     */
    @MyRedisLimiter(key = "customer_limit_test", period = 10, count = 3, limitType = LimitType.CUSTOMER)
    @GetMapping("/limitTest2")
    public Integer testLimiter2() {
        System.out.println("limitTest2");
        return 1;
    }
    /**
     * @author 北哥
     * @description
     * @date 2020/4/8 13:42
     */
    @MyRedisLimiter(key = "ip_limit_test", period = 10, count = 3, limitType = LimitType.IP)
    @GetMapping("/limitTest3")
    public Integer testLimiter3() {
        System.out.println("limitTest3");
        return 3;
    }
}

測試

接口限制每秒 2 個請求,我們使用 jmeter1 秒發 10 個請求

結果只有前兩個成功了(上述降級採用的直接拋異常,方便在這裏看到限流時下面時紅色的)

總結

以上 springboot + aop + Lua 限流實現是比較簡單的,旨在讓大家認識下什麼是限流?如何做一個簡單的限流功能,面試要知道這是個什麼東西。上面雖然說了幾種實現限流的方案,但選哪種還要結合具體的業務場景,不能爲了用而用。在真正的場景裏,不止設置一種限流規則,而是會設置多個限流規則共同作用,如連接數、訪問頻率、黑白名單、傳輸速率等。

源碼地址:

https://gitee.com/it-wenbei/redis-limiter.git

參考:

https://www.cnblogs.com/h1763656169/articles/16554906.html

https://www.redis.net.cn/tutorial/3516.html

https://mp.weixin.qq.com/s/kyFAWH3mVNJvurQDt4vchA

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/p8Eb092RCXONENtkhuqe3A