spring-cloud gateway 網關調優

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator
                .getRoutes()
                //individually filter routes so that filterWhen error delaying is not a problem
                .concatMap(route -> Mono
                        .just(route)
                        .filterWhen(r -> {
                            // add the current route we are testing
                            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                            return r.getPredicate().apply(exchange);
                        })
                        //instead of immediately stopping main flux due to error, log and swallow it
                        .doOnError(e -> logger.error("Error applying predicate for route: "+route.getId(), e))
                        .onErrorResume(e -> Mono.empty())
                )
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                .next()
                //TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });

        /* TODO: trace logging
            if (logger.isTraceEnabled()) {
                logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
            }*/
    }

遍歷所有的路由規則直到找到一個符合的,路由過多是排序越往後自然越慢,但是也考慮到地方項目只有 10 個,但是我們還是試一試。
我們把這部分源碼抽出來自己修改一下,先寫死一個路由

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        if (this.routeLocator instanceof CachingRouteLocator) {
            CachingRouteLocator cachingRouteLocator = (CachingRouteLocator) this.routeLocator;
            // 這裏的getRouteMap()也是新加的方法
            return cachingRouteLocator.getRouteMap().next().map(map ->
                    map.get(“api-user”))
                    //這裏寫死一個路由id
                    .switchIfEmpty(matchRoute(exchange));
        }

        return matchRoute(exchange);
    }
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        //String md5Key = getMd5Key(exchange);
        String appId = exchange.getRequest().getHeaders().getFirst("M-Sy-AppId");
        String serviceId = exchange.getRequest().getHeaders().getFirst("M-Sy-Service");     
        String token = exchange.getRequest().getHeaders().getFirst("M-Sy-Token");
        String path = exchange.getRequest().getURI().getRawPath();
        StringBuilder value = new StringBuilder();
        String md5Key = "";
        if(StringUtils.isNotBlank(token)) {
            try {
                Map<String, Object> params =  (Map<String, Object>) redisTemplate.opsForValue().get("token:" + token);
                if(null !=params && !params.isEmpty()) {
                    JSONObject user = JSONObject.parseObject(params.get("user").toString());
                    appId = user.getString("appId");
                    serviceId = user.getString("serviceid");
                }
            }catch(Exception e) {
                e.printStackTrace();
            }
        }
        if(StringUtils.isBlank(appId) || StringUtils.isBlank(serviceId)) {
            md5Key = DigestUtils.md5Hex(path);
        }else {
            value.append(appId);
            value.append(serviceId);
            value.append(path);
            md5Key = DigestUtils.md5Hex(value.toString());
        }

        if (logger.isDebugEnabled()) {
            logger.info("Route matched before: " + routes.containsKey(md5Key));
        }
        if ( routes.containsKey(md5Key)
         && this.routeLocator instanceof CachingRouteLocator) {
            final String key = md5Key;
            CachingRouteLocator cachingRouteLocator = (CachingRouteLocator) this.routeLocator;
            // 注意,這裏的getRouteMap()也是新加的方法
            return cachingRouteLocator.getRouteMap().next().map(map ->
                    map.get(routes.get(key)))
                    // 這裏保證如果適配不到,仍然走老的官方適配邏輯
                    .switchIfEmpty(matchRoute(exchange,md5Key));
        }

        return matchRoute(exchange,md5Key);
    }

    private Mono<Route> matchRoute(ServerWebExchange exchange,String md5Key) {
        //String md5Key = getMd5Key(exchange);
        return this.routeLocator
                .getRoutes()
                //individually filter routes so that filterWhen error delaying is not a problem
                .concatMap(route -> Mono
                        .just(route)
                        .filterWhen(r -> {
                            // add the current route we are testing
                            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                            return r.getPredicate().apply(exchange);
                        })
                        //instead of immediately stopping main flux due to error, log and swallow it
                        .doOnError(e -> logger.error("Error applying predicate for route: "+route.getId(), e))
                        .onErrorResume(e -> Mono.empty())
                )
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                .next()
                //TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                        logger.debug("緩存"+routes.get(md5Key));
                    }
                    // redisTemplate.opsForValue().set(ROUTE_KEY+md5Key,  route.getId(), 5, TimeUnit.MINUTES);
                    routes.put(md5Key, route.getId());
                    validateRoute(route, exchange);
                    return route;
                });

        /* TODO: trace logging
            if (logger.isTraceEnabled()) {
                logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
            }*/
    }

此次修改後路由有了一個較大的提升,開始繼續分析拒絕請求以及卡頓問題。
考慮到是不是 netty 依據電腦的配置做了限制?在自己的筆記本上限制連接在 200 左右,在服務器上在 2000 左右
查了許多資料發現 netty 的對外配置並不是很多,不像 tomcat、undertow 等等
目前使用的 scg 版本較舊沒有辦法將 netty 修改爲 tomcat 或者 undertow,於是我在官網下載了最新的 scg 並將啓動容器修改爲 tomcat 和 undertow 依次進行了嘗試,發現都沒有 200 的限制。

DEFAULT_IO_WORKER_COUNT:如果環境變量有設置 reactor.ipc.netty.workerCount,則用該值;沒有設置則取 Math.max(Runtime.getRuntime().availableProcessors(), 4)))

JSONObject message = new JSONObject();
try {
           Thread.sleep(30000);
} catch (InterruptedException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
}
ServerHttpResponse response = exchange.getResponse();
message.put("code", 4199);
message.put("msg""模擬堵塞");

byte[] bits = message.toJSONString().getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = response.bufferFactory().wrap(bits);
response.setStatusCode(HttpStatus.UNAUTHORIZED);
 // 指定編碼,否則在瀏覽器中會中文亂碼
 response.getHeaders().add("Content-Type""application/json;charset=UTF-8");
 return response.writeWith(Mono.just(buffer));

通過模擬堵塞測試,發現該參數用於控制接口的返回數量,這應該就是壓測時接口卡頓返回的原因了,通過壓測發現該參數在 16 核 cpu 的 3 倍時表現已經較好。16 核 cpu4 倍時單機 scg 壓測時沒有卡頓,但是單機壓 15000 時 cpu 大概在 70-80。

DEFAULT_IO_SELECT_COUNT:如果環境變量有設置 reactor.ipc.netty.selectCount,則用該值;沒有設置則取 - 1,表示沒有 selector thread

找到源碼 reactor.ipc.netty.resources.DefaultLoopResources
看到這段代碼

if (selectCount == -1) {
            this.selectCount = workerCount;
            this.serverSelectLoops = this.serverLoops;
            this.cacheNativeSelectLoops = this.cacheNativeServerLoops;
  }else {
            this.selectCount = selectCount;
            this.serverSelectLoops =
                    new NioEventLoopGroup(selectCount, threadFactory(this, "select-nio"));
            this.cacheNativeSelectLoops = new AtomicReference<>();
}

總結

source: www.icode9.com/content-4-1057716.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/fYgsMgeCpASvk46tn3RuuA