SpringBoot 通用限流方案

一、背景

限流對於一個微服務架構系統來說具有非常重要的意義,否則其中的某個微服務將成爲整個系統隱藏的雪崩因素,爲什麼這麼說?

舉例來講,某個 SAAS 平臺有 100 多個微服務應用,但是作爲底層的某個或某幾個應用來說,將會被所有上層應用頻繁調用,業務高峯期時,如果底層應用不做限流處理,該應用必將面臨着巨大的壓力,尤其是那些個別被高頻調用的接口來說,最直接的表現就是導致後續新進來的請求阻塞、排隊、響應超時... 最後直到該服務所在 JVM 資源被耗盡。

二、限流概述

在大多數的微服務架構在設計之初,比如在技術選型階段,架構師會從一個全局的視角去規劃技術棧的組合,比如結合當前產品的現狀考慮是使用 dubbo?還是 springcloud?作爲微服務治理的底層框架。甚至爲了滿足快速的上線、迭代和交付,直接以 springboot 爲基座進行開發,後續再引入新的技術棧等...

所以在談論某個業務場景具體的技術解決方案時不可一概而論,而是需要結合產品和業務的現狀綜合評估,以限流來說,在下面的不同的技術架構下具體在選擇的時候可能也不一樣。

2.1 dubbo 服務治理模式

選擇 dubbo 框架作爲基礎服務治理對於那種偏向內部平臺的應用還是不錯的,dubbo 底層走 netty,這一點相比 http 協議來說,在一定場景下還是具有優勢的,如果選擇 dubbo,在選擇限流方案上可以做如下的參考。

2.1.1 dubbo 框架級限流

dubbo 官方提供了完善的服務治理,能夠滿足大多數開發場景中的需求,針對限流這個場景,具體來說包括如下手段,具體的配置,可以參考官方手冊;

客戶端限流

服務端限流

2.1.2 線程池設置

多線程併發操作一定離不開線程池,Dubbo 自身提供了支持了四種線程池類型支持。生產者<dubbo:protocol>標籤中可配置線程池關鍵參數,線程池類型、阻塞隊列大小、核心線程數量等,通過配置生產端的線程池數量可以在一定程度上起到限流的效果。

2.1.3 集成第三方組件

如果是 springboot 框架的項目,可以考慮直接引入地方的組件或 SDK,比如 hystrix,guava,sentinel 原生 SDK 等,如果技術實力足夠強甚至可以考慮自己造輪子。

2.2 springcloud 服務治理模式

如果你的服務治理框架選用的是 springcloud 或 springcloud-alibaba,其框架自身的生態中已經包含了相應的限流組件,可以實現開箱即用,下面列舉幾種常用的基於 springcloud 框架的限流組件。

2.2.1 hystrix

Hystrix 是 Netflix 開源的一款容錯框架,在 springcloud 早期推出市場的時候,作爲 springcloud 生態中用於限流、熔斷、降級的一款組件。

Hystrix 提供了限流功能,在 springcloud 架構的系統中,可以在網關啓用 Hystrix,進行限流處理,每個微服務也可以各自啓用 Hystrix 進行限流。

Hystrix 默認使用線程隔離模式,可以通過線程數 + 隊列大小進行限流,具體參數配置可以參考官網相關資料。

2.2.2 sentinel

Sentinel 號稱分佈式系統的流量防衛兵,屬於 springcloud-alibaba 生態中的重要組件,面向分佈式服務架構的流量控制組件,主要以流量爲切入點,從限流、流量整形、熔斷降級、系統負載保護、熱點防護等多個維度來幫助開發者保障微服務的穩定性。

2.3 網關層限流

隨着微服務規模的增加,整個系統中很多微服務都需要實現限流這種需求時,就可以考慮在網關這一層進行限流了,通常來說,網關層的限流面向的是通用的業務,比如那些惡意的請求,爬蟲,攻擊等,簡單來說,網關層面的限流提供了一層對系統整體的保護措施。

三、常用限流策略

3.1 限流常用的算法

不管是哪種限流組件,其底層的限流實現算法大同小異,這裏列舉幾種常用的限流算法以供瞭解。

3.1.1 令牌桶算法

令牌桶算法是目前應用最爲廣泛的限流算法,顧名思義,它有以下兩個關鍵角色:

令牌桶主要涉及到 2 個過程,即令牌的生成,令牌的獲取

3.1.2 漏桶算法

漏桶算法的前半段和令牌桶類似,但是操作的對象不同,結合下圖進行理解。

令牌桶是將令牌放入桶裏,而漏桶是將訪問請求的數據包放到桶裏。同樣的是,如果桶滿了,那麼後面新來的數據包將被丟棄。

3.1.3 滑動時間窗口

根據下圖,簡單描述下滑動時間窗口這種過程:

滑動窗口其實也是一種計算器算法,它有一個顯著特點,當時間窗口的跨度越長時,限流效果就越平滑。打個比方,如果當前時間窗口只有兩秒,而訪問請求全部集中在第一秒的時候,當時間向後滑動一秒後,當前窗口的計數量將發生較大的變化,拉長時間窗口可以降低這種情況的發生概率

四、通用限流實現方案

拋開網關層的限流先不說,在微服務應用中,考慮到技術棧的組合,團隊人員的開發水平,以及易維護性等因素,一個比較通用的做法是,利用 AOP 技術 + 自定義註解實現對特定的方法或接口進行限流,下面基於這個思路來分別介紹下幾種常用的限流方案的實現。

4.1 基於 guava 限流實現

guava 爲谷歌開源的一個比較實用的組件,利用這個組件可以幫助開發人員完成常規的限流操作,接下來看具體的實現步驟。

4.1.1 引入 guava 依賴

版本可以選擇更高的或其他版本

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>23.0</version>
</dependency>
4.1.2 自定義限流注解

自定義一個限流用的註解,後面在需要限流的方法或接口上面只需添加該註解即可;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
 
@Target(value = ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface RateConfigAnno {
 
    String limitType();
 
    double limitCount() default 5d;
}
4.1.3 限流 AOP 類

通過 AOP 前置通知的方式攔截添加了上述自定義限流注解的方法,解析註解中的屬性值,並以該屬性值作爲 guava 提供的限流參數,該類爲整個實現的核心所在。

import com.alibaba.fastjson2.JSONObject;
import com.google.common.util.concurrent.RateLimiter;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
 
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Objects;
 
@Aspect
@Component
public class GuavaLimitAop {
 
    private static Logger logger = LoggerFactory.getLogger(GuavaLimitAop.class);
 
    @Before("execution(@RateConfigAnno * *(..))")
    public void limit(JoinPoint joinPoint) {
        //1、獲取當前的調用方法
        Method currentMethod = getCurrentMethod(joinPoint);
        if (Objects.isNull(currentMethod)) {
            return;
        }
        //2、從方法註解定義上獲取限流的類型
        String limitType = currentMethod.getAnnotation(RateConfigAnno.class).limitType();
        double limitCount = currentMethod.getAnnotation(RateConfigAnno.class).limitCount();
        //使用guava的令牌桶算法獲取一個令牌,獲取不到先等待
        RateLimiter rateLimiter = RateLimitHelper.getRateLimiter(limitType, limitCount);
        boolean b = rateLimiter.tryAcquire();
        if (b) {
            System.out.println("獲取到令牌");
        }else {
            HttpServletResponse resp = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
            JSONObject jsonObject=new JSONObject();
            jsonObject.put("success",false);
            jsonObject.put("msg","限流中");
            try {
                output(resp, jsonObject.toJSONString());
            }catch (Exception e){
                logger.error("error,e:{}",e);
            }
        }
    }
 
    private Method getCurrentMethod(JoinPoint joinPoint) {
        Method[] methods = joinPoint.getTarget().getClass().getMethods();
        Method target = null;
        for (Method method : methods) {
            if (method.getName().equals(joinPoint.getSignature().getName())) {
                target = method;
                break;
            }
        }
        return target;
    }
 
    public void output(HttpServletResponse response, String msg) throws IOException {
        response.setContentType("application/json;charset=UTF-8");
        ServletOutputStream outputStream = null;
        try {
            outputStream = response.getOutputStream();
            outputStream.write(msg.getBytes("UTF-8"));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            outputStream.flush();
            outputStream.close();
        }
    }
}

其中限流的核心 API 即爲 RateLimiter 這個對象,涉及到的 RateLimitHelper 類如下

import com.google.common.util.concurrent.RateLimiter;
 
import java.util.HashMap;
import java.util.Map;
 
public class RateLimitHelper {
 
    private RateLimitHelper(){}
 
    private static Map<String,RateLimiter> rateMap = new HashMap<>();
 
    public static RateLimiter getRateLimiter(String limitType,double limitCount ){
        RateLimiter rateLimiter = rateMap.get(limitType);
        if(rateLimiter == null){
            rateLimiter = RateLimiter.create(limitCount);
            rateMap.put(limitType,rateLimiter);
        }
        return rateLimiter;
    }
 
}
4.1.4 測試接口

下面添加一個測試接口,測試一下上面的代碼是否生效

@RestController
public class OrderController {
 
    //localhost:8081/save
    @GetMapping("/save")
    @RateConfigAnno(limitType = "saveOrder",limitCount = 1)
    public String save(){
        return "success";
    }
 
}

在接口中爲了模擬出效果,我們將參數設置的非常小,即 QPS 爲 1,可以預想當每秒請求超過 1 時將會出現被限流的提示,啓動工程並驗證接口,每秒 1 次的請求,可以正常得到結果,效果如下:

快速刷接口,將會看到下面的效果

4.2 基於 sentinel 限流實現

在不少同學的意識中,sentinel 通常是需要結合 springcloud-alibaba 框架一起實用的,而且與框架集成之後,可以配合控制檯一起使用達到更好的效果,實際上,sentinel 官方也提供了相對原生的 SDK 可供使用,接下來就以這種方式進行整合。

4.2.1 引入 sentinel 核心依賴包

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.0</version>
</dependency>
4.2.2 自定義限流注解

可以根據需要,添加更多的屬性

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
 
@Target(value = ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface SentinelLimitAnnotation {
 
    String resourceName();
 
    int limitCount() default 5;
 
}
4.2.3 自定義 AOP 類實現限流

該類的實現思路與上述使用 guava 類似,不同的是,這裏使用的是 sentinel 原生的限流相關的 API,對此不夠屬性的可以查閱官方的文檔進行學習,這裏就不展開來說了。

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
 
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
 
@Aspect
@Component
public class SentinelMethodLimitAop {
 
    private static void initFlowRule(String resourceName,int limitCount) {
        List<FlowRule> rules = new ArrayList<>();
        FlowRule rule = new FlowRule();
        //設置受保護的資源
        rule.setResource(resourceName);
        //設置流控規則 QPS
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        //設置受保護的資源閾值
        rule.setCount(limitCount);
        rules.add(rule);
        //加載配置好的規則
        FlowRuleManager.loadRules(rules);
    }
 
    @Pointcut(value = "@annotation(com.congge.sentinel.SentinelLimitAnnotation)")
    public void rateLimit() {
 
    }
 
    @Around("rateLimit()")
    public Object around(ProceedingJoinPoint joinPoint) {
        //1、獲取當前的調用方法
        Method currentMethod = getCurrentMethod(joinPoint);
        if (Objects.isNull(currentMethod)) {
            return null;
        }
        //2、從方法註解定義上獲取限流的類型
        String resourceName = currentMethod.getAnnotation(SentinelLimitAnnotation.class).resourceName();
        if(StringUtils.isEmpty(resourceName)){
            throw new RuntimeException("資源名稱爲空");
        }
        int limitCount = currentMethod.getAnnotation(SentinelLimitAnnotation.class).limitCount();
        initFlowRule(resourceName,limitCount);
 
        Entry entry = null;
        Object result = null;
        try {
            entry = SphU.entry(resourceName);
            try {
                result = joinPoint.proceed();
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        } catch (BlockException ex) {
            // 資源訪問阻止,被限流或被降級
            // 在此處進行相應的處理操作
            System.out.println("blocked");
            return "被限流了";
        } catch (Exception e) {
            Tracer.traceEntry(e, entry);
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
        return result;
    }
 
    private Method getCurrentMethod(JoinPoint joinPoint) {
        Method[] methods = joinPoint.getTarget().getClass().getMethods();
        Method target = null;
        for (Method method : methods) {
            if (method.getName().equals(joinPoint.getSignature().getName())) {
                target = method;
                break;
            }
        }
        return target;
    }
}
4.2.4 自定義測試接口

爲了模擬效果,這裏將 QPS 的數量設置爲 1

//localhost:8081/limit
@GetMapping("/limit")
@SentinelLimitAnnotation(limitCount = 1,resourceName = "sentinelLimit")
public String sentinelLimit(){
    return "sentinelLimit";
}

啓動工程之後,瀏覽器調用接口測試一下,每秒一個請求,可以正常通過

快速刷接口,超過每秒 1 次時,效果如下

這裏只是爲了演示出效果,建議在真實的項目中使用時,對返回結果做一個封裝。

4.3 基於 redis+lua 限流實現

redis 是線程安全的,天然具有線程安全的特性,支持原子性操作,限流服務不僅需要承接超高 QPS,還要保證限流邏輯的執行層面具備線程安全的特性,利用 Redis 這些特性做限流,既能保證線程安全,也能保證性能。基於 redis 的限流實現完整流程如下圖:

結合上面的流程圖,這裏梳理出一個整體的實現思路:

4.3.1 引入 redis 依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
4.3.2 自定義註解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface RedisLimitAnnotation {
 
    /**
     * key
     */
    String key() default "";
    /**
     * Key的前綴
     */
    String prefix() default "";
    /**
     * 一定時間內最多訪問次數
     */
    int count();
    /**
     * 給定的時間範圍 單位()
     */
    int period();
    /**
     * 限流的類型(用戶自定義key或者請求ip)
     */
    LimitType limitType() default LimitType.CUSTOMER;
 
}
4.3.3 自定義 redis 配置類
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component;
 
import java.io.Serializable;
 
@Component
public class RedisConfiguration {
 
    @Bean
    public DefaultRedisScript<Number> redisluaScript() {
        DefaultRedisScript<Number> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("limit.lua")));
        redisScript.setResultType(Number.class);
        return redisScript;
    }
 
    @Bean("redisTemplate")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
 
        //設置value的序列化方式爲JSOn
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        //設置key的序列化方式爲String
        redisTemplate.setKeySerializer(new StringRedisSerializer());
 
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
 
        return redisTemplate;
    }
 
}
4.3.4 自定義限流 AOP 類
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
 
import javax.servlet.http.HttpServletRequest;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
 
@Aspect
@Configuration
public class LimitRestAspect {
 
    private static final Logger logger = LoggerFactory.getLogger(LimitRestAspect.class);
 
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
 
    @Autowired
    private DefaultRedisScript<Number> redisluaScript;
 
 
    @Pointcut(value = "@annotation(com.congge.config.limit.RedisLimitAnnotation)")
    public void rateLimit() {
 
    }
 
    @Around("rateLimit()")
    public Object interceptor(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        Class<?> targetClass = method.getDeclaringClass();
        RedisLimitAnnotation rateLimit = method.getAnnotation(RedisLimitAnnotation.class);
        if (rateLimit != null) {
            HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
            String ipAddress = getIpAddr(request);
            StringBuffer stringBuffer = new StringBuffer();
            stringBuffer.append(ipAddress).append("-")
                    .append(targetClass.getName()).append("- ")
                    .append(method.getName()).append("-")
                    .append(rateLimit.key());
            List<String> keys = Collections.singletonList(stringBuffer.toString());
            //調用lua腳本,獲取返回結果,這裏即爲請求的次數
            Number number = redisTemplate.execute(
                    redisluaScript,
                    keys,
                    rateLimit.count(),
                    rateLimit.period()
            );
            if (number != null && number.intValue() != 0 && number.intValue() <= rateLimit.count()) {
                logger.info("限流時間段內訪問了第:{} 次", number.toString());
                return joinPoint.proceed();
            }
        } else {
            return joinPoint.proceed();
        }
        throw new RuntimeException("訪問頻率過快,被限流了");
    }
 
    /**
     * 獲取請求的IP方法
     * @param request
     * @return
     */
    private static String getIpAddr(HttpServletRequest request) {
        String ipAddress = null;
        try {
            ipAddress = request.getHeader("x-forwarded-for");
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("Proxy-Client-IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("WL-Proxy-Client-IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getRemoteAddr();
            }
            // 對於通過多個代理的情況,第一個IP爲客戶端真實IP,多個IP按照','分割
            if (ipAddress != null && ipAddress.length() > 15) {
                if (ipAddress.indexOf(",") > 0) {
                    ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
                }
            }
        } catch (Exception e) {
            ipAddress = "";
        }
        return ipAddress;
    }
 
}

該類要做的事情和上面的兩種限流措施類似,不過在這裏核心的限流是通過讀取 lua 腳步,通過參數傳遞給 lua 腳步實現的。

4.3.5 自定義 lua 腳本

在工程的 resources 目錄下,添加如下的 lua 腳本

local key = "rate.limit:" .. KEYS[1]
 
local limit = tonumber(ARGV[1])
 
local current = tonumber(redis.call('get', key) or "0")
 
if current + 1 > limit then
  return 0
else
   -- 沒有超閾值,將當前訪問數量+1,並設置2秒過期(可根據自己的業務情況調整)
   redis.call("INCRBY", key,"1")
   redis.call("expire", key,"2")
   return current + 1
end
4.3.6 添加測試接口
@RestController
public class RedisController {
 
    //localhost:8081/redis/limit
    @GetMapping("/redis/limit")
    @RedisLimitAnnotation(key = "queryFromRedis",period = 1, count = 1)
    public String queryFromRedis(){
        return "success";
    }
 
}

爲了模擬效果,這裏將 QPS 設置爲 1 ,啓動工程後(提前啓動 redis 服務),調用一下接口,正常的效果如下:

快速刷接口,超過每秒 1 次的請求時看到如下效果

五、自定義 starter 限流實現

上面通過案例介紹了幾種常用的限流實現,不過細心的同學可以看到,這些限流的實現都是在具體的工程模塊中嵌入的,事實上,在真實的微服務開發中,一個項目可能包含了衆多的微服務模塊,爲了減少重複造輪子,避免每個微服務模塊中單獨實現,可以考慮將限流的邏輯實現封裝成一個 SDK,即作爲一個 springboot 的 starter 的方式被其他微服務模塊進行引用即可。這也是目前很多生產實踐中比較通用的做法,接下來看看具體的實現吧。

5.1 前置準備

創建一個空的 springboot 工程,工程目錄結構如下圖,目錄說明:

5.2 代碼整合完成步驟

5.2.1 導入基礎的依賴

這裏包括如下幾個必須的依賴,其他的依賴可以結合自身的情況合理選擇;

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/>
    </parent>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
 
    <dependencies>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
 
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
 
        <!-- guava-->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.0</version>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>
 
        <dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-core</artifactId>
            <version>1.8.0</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>
 
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.22</version>
        </dependency>
 
    </dependencies>
 
    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/**</include>
                </includes>
            </resource>
        </resources>
    </build>
5.2.2 自定義註解

目前該 SDK 支持三種限流方式,即後續其他微服務工程中可以通過添加這 3 種註解即可實現限流,分別是基於 guava 的令牌桶,基於 sentinel 的限流,基於 java 自帶的 Semaphore 限流,三個自定義註解類如下:

令牌桶

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
 
public @interface TokenBucketLimiter {
    int value() default 50;
}

Semaphore

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ShLimiter {
    int value() default 50;
}

sentinel

@Target(value = ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface SentinelLimiter {
 
    String resourceName();
 
    int limitCount() default 50;
 
}
5.2.3 限流實現 AOP 類

具體的限流在 AOP 中進行實現,思路和上一章節類似,即通過環繞通知的方式,先解析那些添加了限流注解的方法,然後解析裏面的參數,進行限流的業務實現。

基於 guava 的 aop 實現

import com.alibaba.fastjson2.JSONObject;
import com.congge.annotation.TokenBucketLimiter;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.cglib.core.ReflectUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
 
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
@Aspect
@Component
@Slf4j
public class GuavaLimiterAop {
 
    private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<String, RateLimiter>();
 
    @Pointcut("@annotation(com.congge.annotation.TokenBucketLimiter)")
    public void aspect() {
    }
 
    @Around(value = "aspect()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        log.debug("準備限流");
        Object target = point.getTarget();
        String targetName = target.getClass().getName();
        String methodName = point.getSignature().getName();
        Object[] arguments = point.getArgs();
        Class<?> targetClass = Class.forName(targetName);
        Class<?>[] argTypes = ReflectUtils.getClasses(arguments);
        Method method = targetClass.getDeclaredMethod(methodName, argTypes);
        // 獲取目標method上的限流注解@Limiter
        TokenBucketLimiter limiter = method.getAnnotation(TokenBucketLimiter.class);
        RateLimiter rateLimiter = null;
        Object result = null;
        if (null != limiter) {
            // 以 class + method + parameters爲key,避免重載、重寫帶來的混亂
            String key = targetName + "." + methodName + Arrays.toString(argTypes);
            rateLimiter = rateLimiters.get(key);
            if (null == rateLimiter) {
                // 獲取限定的流量
                // 爲了防止併發
                rateLimiters.putIfAbsent(key, RateLimiter.create(limiter.value()));
                rateLimiter = rateLimiters.get(key);
            }
            boolean b = rateLimiter.tryAcquire();
            if(b){
                log.debug("得到令牌,準備執行業務");
                result = point.proceed();
            }else {
                HttpServletResponse resp = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
                JSONObject jsonObject=new JSONObject();
                jsonObject.put("success",false);
                jsonObject.put("msg","限流中");
                try {
                    output(resp, jsonObject.toJSONString());
                }catch (Exception e){
                    log.error("error,e:{}",e);
                }
            }
        } else {
            result = point.proceed();
        }
        log.debug("退出限流");
        return result;
    }
 
    public void output(HttpServletResponse response, String msg) throws IOException {
        response.setContentType("application/json;charset=UTF-8");
        ServletOutputStream outputStream = null;
        try {
            outputStream = response.getOutputStream();
            outputStream.write(msg.getBytes("UTF-8"));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            outputStream.flush();
            outputStream.close();
        }
    }
}

基於 Semaphore 的 aop 實現

import com.congge.annotation.ShLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cglib.core.ReflectUtils;
import org.springframework.stereotype.Component;
 
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
 
@Aspect
@Component
@Slf4j
public class SemaphoreLimiterAop {
 
    private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
    private final static Logger LOG = LoggerFactory.getLogger(SemaphoreLimiterAop.class);
 
    @Pointcut("@annotation(com.congge.annotation.ShLimiter)")
    public void aspect() {
 
    }
 
    @Around(value = "aspect()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        log.debug("進入限流aop");
        Object target = point.getTarget();
        String targetName = target.getClass().getName();
        String methodName = point.getSignature().getName();
        Object[] arguments = point.getArgs();
        Class<?> targetClass = Class.forName(targetName);
        Class<?>[] argTypes = ReflectUtils.getClasses(arguments);
        Method method = targetClass.getDeclaredMethod(methodName, argTypes);
        // 獲取目標method上的限流注解@Limiter
        ShLimiter limiter = method.getAnnotation(ShLimiter.class);
        Object result = null;
        if (null != limiter) {
            // 以 class + method + parameters爲key,避免重載、重寫帶來的混亂
            String key = targetName + "." + methodName + Arrays.toString(argTypes);
            // 獲取限定的流量
            Semaphore semaphore = semaphores.get(key);
            if (null == semaphore) {
                semaphores.putIfAbsent(key, new Semaphore(limiter.value()));
                semaphore = semaphores.get(key);
            }
            try {
                semaphore.acquire();
                result = point.proceed();
            } finally {
                if (null != semaphore) {
                    semaphore.release();
                }
            }
        } else {
            result = point.proceed();
        }
        log.debug("退出限流");
        return result;
    }
 
}

基於 sentinel 的 aop 實現

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.congge.annotation.SentinelLimiter;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
 
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
 
@Aspect
@Component
public class SentinelLimiterAop {
 
    private static void initFlowRule(String resourceName,int limitCount) {
        List<FlowRule> rules = new ArrayList<>();
        FlowRule rule = new FlowRule();
        //設置受保護的資源
        rule.setResource(resourceName);
        //設置流控規則 QPS
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        //設置受保護的資源閾值
        rule.setCount(limitCount);
        rules.add(rule);
        //加載配置好的規則
        FlowRuleManager.loadRules(rules);
    }
 
    @Pointcut(value = "@annotation(com.congge.annotation.SentinelLimiter)")
    public void rateLimit() {
 
    }
 
    @Around("rateLimit()")
    public Object around(ProceedingJoinPoint joinPoint) {
        //1、獲取當前的調用方法
        Method currentMethod = getCurrentMethod(joinPoint);
        if (Objects.isNull(currentMethod)) {
            return null;
        }
        //2、從方法註解定義上獲取限流的類型
        String resourceName = currentMethod.getAnnotation(SentinelLimiter.class).resourceName();
        if(StringUtils.isEmpty(resourceName)){
            throw new RuntimeException("資源名稱爲空");
        }
        int limitCount = currentMethod.getAnnotation(SentinelLimiter.class).limitCount();
        initFlowRule(resourceName,limitCount);
 
        Entry entry = null;
        Object result = null;
        try {
            entry = SphU.entry(resourceName);
            try {
                result = joinPoint.proceed();
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        } catch (BlockException ex) {
            // 資源訪問阻止,被限流或被降級
            // 在此處進行相應的處理操作
            System.out.println("blocked");
            return "被限流了";
        } catch (Exception e) {
            Tracer.traceEntry(e, entry);
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
        return result;
    }
 
    private Method getCurrentMethod(JoinPoint joinPoint) {
        Method[] methods = joinPoint.getTarget().getClass().getMethods();
        Method target = null;
        for (Method method : methods) {
            if (method.getName().equals(joinPoint.getSignature().getName())) {
                target = method;
                break;
            }
        }
        return target;
    }
 
}
5.2.4 配置自動裝配 AOP 實現

在 resources 目錄下創建上述的spring.factories文件,內容如下,通過這種方式配置後,其他應用模塊引入了當前的 SDK 的 jar 之後,就可以實現開箱即用了;

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.congge.aop.SemaphoreLimiterAop,\
  com.congge.aop.GuavaLimiterAop,\
  com.congge.aop.SemaphoreLimiterAop
5.2.5 將工程打成 jar 進行安裝

這一步比較簡單就跳過了

5.2.6 在其他的工程中引入上述 SDK
<dependency>
    <groupId>cm.congge</groupId>
    <artifactId>biz-limit</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
5.2.7 編寫測試接口

在其他工程中,編寫一個測試接口,並使用上面的註解,這裏以 guava 的限流注解爲例進行說明

import com.congge.annotation.TokenBucketLimiter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
public class SdkController {
 
    //localhost:8081/query
    @GetMapping("/query")
    @TokenBucketLimiter(1)
    public String queryUser(){
        return "queryUser";
    }
 
}
5.2.8 功能測試

啓動當前的工程後,正常調用接口,每秒一次的請求,可以正常得到結果

快速刷接口,QPS 超過 1 之後,將會觸發限流,看到如下效果

通過上面這種方式,也可以得到預期的效果,其他兩種限流注解有興趣的同學也可以繼續測試驗證,篇幅原因就不再贅述了。

上述通過 starter 的方式實現了一種更優雅的限流集成方式,也是生產中比較推薦的一種方式,不過當前的案例還比較粗糙,需要使用的同學還需根據自己的情況完善裏面的邏輯,進一步的封裝以期得到更好的效果。

六、寫在文末

本文通過較大的篇幅結合實際案例詳細闡述了微服務中限流的一些實現方案,限流對於一個穩定運行的系統來說具有很重要的意義,可以說是服務治理中一個重要的方面,希望對看到的同學有所幫助,謝謝觀看。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/dZRSIhdxloeSpHGAN5ORhg