實戰:如何設計一個高性能網關

作者:煙味 i,來源:cnblogs,鏈接:

https://cnblogs.com/2YSP/p/14223892.html

背景

最近在 github 上看了 soul 網關的設計,突然就來了興趣準備自己從零開始寫一個高性能的網關。經過兩週時間的開發,我的網關 ship-gate 核心功能基本都已完成,最大的缺陷就是前端功底太差沒有管理後臺😤。

02

設計

2.1 技術選型

網關是所有請求的入口,所以要求有很高的吞吐量,爲了實現這點可以使用請求異步化來解決。目前一般有以下兩種方案:

Servlet3 已經支持異步,這種方案使用比較多,京東,有贊和 Zuul,都用的是這種方案。

Netty 爲高併發而生,目前唯品會的網關使用這個策略,在唯品會的技術文章中在相同的情況下 Netty 是每秒 30w + 的吞吐量,Tomcat 是 13w+, 可以看出是有一定的差距的,但是 Netty 需要自己處理 HTTP 協議,這一塊比較麻煩。

後面發現 Soul 網關是基於 Spring WebFlux(底層 Netty)的,不用太關心 HTTP 協議的處理,於是決定也用 Spring WebFlux。

網關的第二個特點是具備可擴展性,比如 Netflix Zuul 有 preFilters,postFilters 等在不同的階段方便處理不同的業務,基於責任鏈模式將請求進行鏈式處理即可實現。

在微服務架構下,服務都會進行多實例部署來保證高可用,請求到達網關時,網關需要根據 URL 找到所有可用的實例,這時就需要服務註冊和發現功能,即註冊中心。

現在流行的註冊中心有 Apache 的 Zookeeper 和阿里的 Nacos 兩種(consul 有點小衆),因爲之前寫 RPC 框架時已經用過了 Zookeeper,所以這次就選擇了 Nacos。

2.2 需求清單

首先要明確目標,即開發一個具備哪些特性的網關,總結下後如下:

自定義路由規則

可基於 version 的路由規則設置,路由對象包括 DEFAUL,HEADER 和 QUERY 三種,匹配方式包括 =、regex、like 三種。

跨語言

HTTP 協議天生跨語言

高性能

Netty 本身就是一款高性能的通信框架,同時 server 將一些路由規則等數據緩存到 JVM 內存避免請求 admin 服務。

高可用

支持集羣模式防止單節點故障,無狀態。

灰度發佈

灰度發佈(又名金絲雀發佈)是指在黑與白之間,能夠平滑過渡的一種發佈方式。在其上可以進行 A/B testing,即讓一部分用戶繼續用產品特性 A,一部分用戶開始用產品特性 B,如果用戶對 B 沒有什麼反對意見,那麼逐步擴大範圍,把所有用戶都遷移到 B 上面來。通過特性一可以實現。

接口鑑權

基於責任鏈模式,用戶開發自己的鑑權插件即可。

負載均衡

支持多種負載均衡算法,如隨機,輪詢,加權輪詢等。利用 SPI 機制可以根據配置進行動態加載。

2.3 架構設計

在參考了一些優秀的網關 Zuul,Spring Cloud Gateway,Soul 後,將項目劃分爲以下幾個模塊。

它們之間的關係如圖:

網關設計

注意: 這張圖與實際實現有點出入,Nacos push 到本地緩存的那個環節沒有實現,目前只有 ship-sever 定時輪詢 pull 的過程。ship-admin 從 Nacos 獲取註冊服務信息的過程,也改成了 ServiceA 啓動時主動發生 HTTP 請求通知 ship-admin。

2.4 表結構設計

03

編碼

3.1 ship-client-spring-boot-starter

首先創建一個 spring-boot-starter 命名爲 ship-client-spring-boot-starter,不知道如何自定義 starter 的可以看我以前寫的《開發自己的 starter》。

其核心類 AutoRegisterListener 就是在項目啓動時做了兩件事:

  1. 將服務信息註冊到 Nacos 註冊中心

  2. 通知 ship-admin 服務上線了並註冊下線 hook。

代碼如下:

  1* Created by 2YSP on 2020/12/21
  2*/
  3public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
  4
  5   private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);
  6
  7   private volatile AtomicBoolean registered = new AtomicBoolean(false);
  8
  9   private final ClientConfigProperties properties;
 10
 11   @NacosInjected
 12   private NamingService namingService;
 13
 14   @Autowired
 15   private RequestMappingHandlerMapping handlerMapping;
 16
 17   private final ExecutorService pool;
 18
 19   /**
 20* url list to ignore
 21*/
 22   private static List<String> ignoreUrlList = new LinkedList<>();
 23
 24   static {
 25       ignoreUrlList.add("/error");
 26   }
 27
 28   public AutoRegisterListener(ClientConfigProperties properties) {
 29       if (!check(properties)) {
 30           LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");
 31           throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");
 32       }
 33       this.properties = properties;
 34       pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
 35   }
 36
 37   /**
 38* check the ClientConfigProperties
 39*
 40* @param properties
 41* @return
 42*/
 43   private boolean check(ClientConfigProperties properties) {
 44       if (properties.getPort() == null| properties.getContextPath() == null
 45              | properties.getVersion() == null| properties.getAppName() == null
 46              | properties.getAdminUrl() == null) {
 47           return false;
 48       }
 49       return true;
 50   }
 51
 52
 53   @Override
 54   public void onApplicationEvent(ContextRefreshedEvent event) {
 55       if (!registered.compareAndSet(false, true)) {
 56           return;
 57       }
 58       doRegister();
 59       registerShutDownHook();
 60   }
 61
 62   /**
 63* send unregister request to admin when jvm shutdown
 64*/
 65   private void registerShutDownHook() {
 66       final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
 67       final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
 68       unregisterAppDTO.setAppName(properties.getAppName());
 69       unregisterAppDTO.setVersion(properties.getVersion());
 70       unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
 71       unregisterAppDTO.setPort(properties.getPort());
 72       Runtime.getRuntime().addShutdownHook(new Thread(() -> {
 73           OkhttpTool.doPost(url, unregisterAppDTO);
 74           LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
 75       }));
 76   }
 77
 78   /**
 79* register all interface info to register center
 80*/
 81   private void doRegister() {
 82       Instance instance = new Instance();
 83       instance.setIp(IpUtil.getLocalIpAddress());
 84       instance.setPort(properties.getPort());
 85       instance.setEphemeral(true);
 86       Map<String, String> metadataMap = new HashMap<>();
 87       metadataMap.put("version", properties.getVersion());
 88       metadataMap.put("appName", properties.getAppName());
 89       instance.setMetadata(metadataMap);
 90       try {
 91           namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
 92       } catch (NacosException e) {
 93           LOGGER.error("register to nacos fail", e);
 94           throw new ShipException(e.getErrCode(), e.getErrMsg());
 95       }
 96       LOGGER.info("register interface info to nacos success!");
 97       // send register request to ship-admin
 98       String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
 99       RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
100       OkhttpTool.doPost(url, registerAppDTO);
101       LOGGER.info("register to ship-admin success!");
102   }
103
104
105   private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
106       RegisterAppDTO registerAppDTO = new RegisterAppDTO();
107       registerAppDTO.setAppName(properties.getAppName());
108       registerAppDTO.setContextPath(properties.getContextPath());
109       registerAppDTO.setIp(instance.getIp());
110       registerAppDTO.setPort(instance.getPort());
111       registerAppDTO.setVersion(properties.getVersion());
112       return registerAppDTO;
113   }
114}
115
116

ship-sever 項目主要包括了兩個部分內容, 1. 請求動態路由的主流程 2. 本地緩存數據和 ship-admin 及 nacos 同步,這部分在後面 3.3 再講。

ship-server 實現動態路由的原理是利用 WebFilter 攔截請求,然後將請求教給 plugin chain 去鏈式處理。

PluginFilter 根據 URL 解析出 appName,然後將啓用的 plugin 組裝成 plugin chain。

 1public class PluginFilter implements WebFilter {
 2
 3   private ServerConfigProperties properties;
 4
 5   public PluginFilter(ServerConfigProperties properties) {
 6       this.properties = properties;
 7   }
 8
 9   @Override
10   public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
11       String appName = parseAppName(exchange);
12       if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
13           throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
14       }
15       PluginChain pluginChain = new PluginChain(properties, appName);
16       pluginChain.addPlugin(new DynamicRoutePlugin(properties));
17       pluginChain.addPlugin(new AuthPlugin(properties));
18       return pluginChain.execute(exchange, pluginChain);
19   }
20
21   private String parseAppName(ServerWebExchange exchange) {
22       RequestPath path = exchange.getRequest().getPath();
23       String appName = path.value().split("/")[1];
24       return appName;
25   }
26}```
27
28PluginChain繼承了AbstractShipPlugin並持有所有要執行的插件。
29
30```java
31* @Author: Ship
32* @Description:
33* @Date: Created in 2020/12/25
34*/
35public class PluginChain extends AbstractShipPlugin {
36   /**
37* the pos point to current plugin
38*/
39   private int pos;
40   /**
41* the plugins of chain
42*/
43   private List<ShipPlugin> plugins;
44
45   private final String appName;
46
47   public PluginChain(ServerConfigProperties properties, String appName) {
48       super(properties);
49       this.appName = appName;
50   }
51
52   /**
53* add enabled plugin to chain
54*
55* @param shipPlugin
56*/
57   public void addPlugin(ShipPlugin shipPlugin) {
58       if (plugins == null) {
59           plugins = new ArrayList<>();
60       }
61       if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
62           return;
63       }
64       plugins.add(shipPlugin);
65       // order by the plugin's order
66       plugins.sort(Comparator.comparing(ShipPlugin::order));
67   }
68
69   @Override
70   public Integer order() {
71       return null;
72   }
73
74   @Override
75   public String name() {
76       return null;
77   }
78
79   @Override
80   public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
81       if (pos == plugins.size()) {
82           return exchange.getResponse().setComplete();
83       }
84       return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
85   }
86
87   public String getAppName() {
88       return appName;
89   }
90
91}
92
93

AbstractShipPlugin 實現了 ShipPlugin 接口,並持有 ServerConfigProperties 配置對象。

  1public abstract class AbstractShipPlugin implements ShipPlugin {
  2
  3   protected ServerConfigProperties properties;
  4
  5   public AbstractShipPlugin(ServerConfigProperties properties) {
  6       this.properties = properties;
  7   }
  8}```
  9
 10ShipPlugin接口定義了所有插件必須實現的三個方法order(),name()和execute()。
 11
 12```java
 13public interface ShipPlugin {
 14   /**
 15* lower values have higher priority
 16*
 17* @return
 18*/
 19   Integer order();
 20
 21   /**
 22* return current plugin name
 23*
 24* @return
 25*/
 26   String name();
 27
 28   Mono<Void> execute(ServerWebExchange exchange,PluginChain pluginChain);
 29
 30}```
 31
 32DynamicRoutePlugin繼承了抽象類AbstractShipPlugin,包含了動態路由的主要業務邏輯。
 33
 34```java
 35* @Author: Ship
 36* @Description:
 37* @Date: Created in 2020/12/25
 38*/
 39public class DynamicRoutePlugin extends AbstractShipPlugin {
 40
 41   private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);
 42
 43   private static WebClient webClient;
 44
 45   private static final Gson gson = new GsonBuilder().create();
 46
 47   static {
 48       HttpClient httpClient = HttpClient.create()
 49               .tcpConfiguration(client ->
 50                       client.doOnConnected(conn ->
 51                               conn.addHandlerLast(new ReadTimeoutHandler(3))
 52                                       .addHandlerLast(new WriteTimeoutHandler(3)))
 53                               .option(ChannelOption.TCP_NODELAY, true)
 54               );
 55       webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
 56               .build();
 57   }
 58
 59   public DynamicRoutePlugin(ServerConfigProperties properties) {
 60       super(properties);
 61   }
 62
 63   @Override
 64   public Integer order() {
 65       return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
 66   }
 67
 68   @Override
 69   public String name() {
 70       return ShipPluginEnum.DYNAMIC_ROUTE.getName();
 71   }
 72
 73   @Override
 74   public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
 75       String appName = pluginChain.getAppName();
 76       ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
 77//        LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));
 78       // request service
 79       String url = buildUrl(exchange, serviceInstance);
 80       return forward(exchange, url);
 81   }
 82
 83   /**
 84* forward request to backend service
 85*
 86* @param exchange
 87* @param url
 88* @return
 89*/
 90   private Mono<Void> forward(ServerWebExchange exchange, String url) {
 91       ServerHttpRequest request = exchange.getRequest();
 92       ServerHttpResponse response = exchange.getResponse();
 93       HttpMethod method = request.getMethod();
 94
 95       WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
 96           headers.addAll(request.getHeaders());
 97       });
 98
 99       WebClient.RequestHeadersSpec<?> reqHeadersSpec;
100       if (requireHttpBody(method)) {
101           reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
102       } else {
103           reqHeadersSpec = requestBodySpec;
104       }
105       // nio->callback->nio
106       return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
107               .onErrorResume(ex -> {
108                   return Mono.defer(() -> {
109                       String errorResultJson = "";
110                       if (ex instanceof TimeoutException) {
111                           errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";
112                       } else {
113                           errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";
114                       }
115                       return ShipResponseUtil.doResponse(exchange, errorResultJson);
116                   }).then(Mono.empty());
117               }).flatMap(backendResponse -> {
118                   response.setStatusCode(backendResponse.statusCode());
119                   response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
120                   return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
121               });
122   }
123
124   /**
125* weather the http method need http body
126*
127* @param method
128* @return
129*/
130   private boolean requireHttpBody(HttpMethod method) {
131       if (method.equals(HttpMethod.POST)| method.equals(HttpMethod.PUT)| method.equals(HttpMethod.PATCH)) {
132           return true;
133       }
134       return false;
135   }
136
137   private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
138       ServerHttpRequest request = exchange.getRequest();
139       String query = request.getURI().getQuery();
140       String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");
141       String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;
142       if (!StringUtils.isEmpty(query)) {
143           url = url + "?" + query;
144       }
145       return url;
146   }
147
148
149   /**
150* choose an ServiceInstance according to route rule config and load balancing algorithm
151*
152* @param appName
153* @param request
154* @return
155*/
156   private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
157       List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);
158       if (CollectionUtils.isEmpty(serviceInstances)) {
159           LOGGER.error("service instance of {} not find", appName);
160           throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
161       }
162       String version = matchAppVersion(appName, request);
163       if (StringUtils.isEmpty(version)) {
164           throw new ShipException("match app version error");
165       }
166       // filter serviceInstances by version
167       List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
168       //Select an instance based on the load balancing algorithm
169       LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
170       ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
171       return serviceInstance;
172   }
173
174
175   private String matchAppVersion(String appName, ServerHttpRequest request) {
176       List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);
177       rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
178       for (AppRuleDTO rule : rules) {
179           if (match(rule, request)) {
180               return rule.getVersion();
181           }
182       }
183       return null;
184   }
185
186
187   private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
188       String matchObject = rule.getMatchObject();
189       String matchKey = rule.getMatchKey();
190       String matchRule = rule.getMatchRule();
191       Byte matchMethod = rule.getMatchMethod();
192       if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
193           return true;
194       } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
195           String param = request.getQueryParams().getFirst(matchKey);
196           if (!StringUtils.isEmpty(param)) {
197               return StringTools.match(param, matchMethod, matchRule);
198           }
199       } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
200           HttpHeaders headers = request.getHeaders();
201           String headerValue = headers.getFirst(matchKey);
202           if (!StringUtils.isEmpty(headerValue)) {
203               return StringTools.match(headerValue, matchMethod, matchRule);
204           }
205       }
206       return false;
207   }
208
209}
210
211

3.3 數據同步

app 數據同步

後臺服務(如訂單服務)啓動時,只將服務名,版本,ip 地址和端口號註冊到了 Nacos,並沒有實例的權重和啓用的插件信息怎麼辦?

一般在線的實例權重和插件列表都是在管理界面配置,然後動態生效的,所以需要 ship-admin 定時更新實例的權重和插件信息到註冊中心。

對應代碼 ship-admin 的 NacosSyncListener

 1* @Author: Ship
 2* @Description:
 3* @Date: Created in 2020/12/30
 4*/
 5@Configuration
 6public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent> {
 7
 8   private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);
 9
10   private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
11           new ShipThreadFactory("nacos-sync", true).create());
12
13   @NacosInjected
14   private NamingService namingService;
15
16   @Value("${nacos.discovery.server-addr}")
17   private String baseUrl;
18
19   @Resource
20   private AppService appService;
21
22   @Override
23   public void onApplicationEvent(ContextRefreshedEvent event) {
24       if (event.getApplicationContext().getParent() != null) {
25           return;
26       }
27       String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
28       scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS);
29   }
30
31   class NacosSyncTask implements Runnable {
32
33       private NamingService namingService;
34
35       private String url;
36
37       private AppService appService;
38
39       private Gson gson = new GsonBuilder().create();
40
41       public NacosSyncTask(NamingService namingService, String url, AppService appService) {
42           this.namingService = namingService;
43           this.url = url;
44           this.appService = appService;
45       }
46
47       /**
48* Regular update weight,enabled plugins to nacos instance
49*/
50       @Override
51       public void run() {
52           try {
53               // get all app names
54               ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
55               if (CollectionUtils.isEmpty(services.getData())) {
56                   return;
57               }
58               List<String> appNames = services.getData();
59               List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
60               for (AppInfoDTO appInfo : appInfos) {
61                   if (CollectionUtils.isEmpty(appInfo.getInstances())) {
62                       continue;
63                   }
64                   for (ServiceInstance instance : appInfo.getInstances()) {
65                       Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
66                       String resp = OkhttpTool.doPut(url, queryMap, "");
67                       LOGGER.debug("response :{}", resp);
68                   }
69               }
70
71           } catch (Exception e) {
72               LOGGER.error("nacos sync task error", e);
73           }
74       }
75
76       private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
77           Map<String, Object> map = new HashMap<>();
78           map.put("serviceName", appInfo.getAppName());
79           map.put("groupName", NacosConstants.APP_GROUP_NAME);
80           map.put("ip", instance.getIp());
81           map.put("port", instance.getPort());
82           map.put("weight", instance.getWeight().doubleValue());
83           NacosMetadata metadata = new NacosMetadata();
84           metadata.setAppName(appInfo.getAppName());
85           metadata.setVersion(instance.getVersion());
86           metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
87           map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
88           map.put("ephemeral", true);
89           return map;
90       }
91   }
92}
93
94

ship-server 再定時從 Nacos 拉取 app 數據更新到本地 Map 緩存。

 1* @Author: Ship
 2* @Description: sync data to local cache
 3* @Date: Created in 2020/12/25
 4*/
 5@Configuration
 6public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent> {
 7
 8   private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
 9           new ShipThreadFactory("service-sync", true).create());
10
11   @NacosInjected
12   private NamingService namingService;
13
14   @Autowired
15   private ServerConfigProperties properties;
16
17   @Override
18   public void onApplicationEvent(ContextRefreshedEvent event) {
19       if (event.getApplicationContext().getParent() != null) {
20           return;
21       }
22       scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)
23               , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
24       WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
25       websocketSyncCacheServer.start();
26   }
27
28
29   class DataSyncTask implements Runnable {
30
31       private NamingService namingService;
32
33       public DataSyncTask(NamingService namingService) {
34           this.namingService = namingService;
35       }
36
37       @Override
38       public void run() {
39           try {
40               // get all app names
41               ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
42               if (CollectionUtils.isEmpty(services.getData())) {
43                   return;
44               }
45               List<String> appNames = services.getData();
46               // get all instances
47               for (String appName : appNames) {
48                   List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
49                   if (CollectionUtils.isEmpty(instanceList)) {
50                       continue;
51                   }
52                   ServiceCache.add(appName, buildServiceInstances(instanceList));
53                   List<String> pluginNames = getEnabledPlugins(instanceList);
54                   PluginCache.add(appName, pluginNames);
55               }
56               ServiceCache.removeExpired(appNames);
57               PluginCache.removeExpired(appNames);
58
59           } catch (NacosException e) {
60               e.printStackTrace();
61           }
62       }
63
64       private List<String> getEnabledPlugins(List<Instance> instanceList) {
65           Instance instance = instanceList.get(0);
66           Map<String, String> metadata = instance.getMetadata();
67           // plugins: DynamicRoute,Auth
68           String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
69           return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
70       }
71
72       private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
73           List<ServiceInstance> list = new LinkedList<>();
74           instanceList.forEach(instance -> {
75               Map<String, String> metadata = instance.getMetadata();
76               ServiceInstance serviceInstance = new ServiceInstance();
77               serviceInstance.setAppName(metadata.get("appName"));
78               serviceInstance.setIp(instance.getIp());
79               serviceInstance.setPort(instance.getPort());
80               serviceInstance.setVersion(metadata.get("version"));
81               serviceInstance.setWeight((int) instance.getWeight());
82               list.add(serviceInstance);
83           });
84           return list;
85       }
86   }
87}
88
89

路由規則數據同步

同時,如果用戶在管理後臺更新了路由規則,ship-admin 需要推送規則數據到 ship-server,這裏參考了 soul 網關的做法利用 websocket 在第一次建立連接後進行全量同步,此後路由規則發生變更就只作增量同步。

服務端 WebsocketSyncCacheServer:

 1* @Author: Ship
 2* @Description:
 3* @Date: Created in 2020/12/28
 4*/
 5public class WebsocketSyncCacheServer extends WebSocketServer {
 6
 7   private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);
 8
 9   private Gson gson = new GsonBuilder().create();
10
11   private MessageHandler messageHandler;
12
13   public WebsocketSyncCacheServer(Integer port) {
14       super(new InetSocketAddress(port));
15       this.messageHandler = new MessageHandler();
16   }
17
18
19   @Override
20   public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
21       LOGGER.info("server is open");
22   }
23
24   @Override
25   public void onClose(WebSocket webSocket, int i, String s, boolean b) {
26       LOGGER.info("websocket server close...");
27   }
28
29   @Override
30   public void onMessage(WebSocket webSocket, String message) {
31       LOGGER.info("websocket server receive message:\n[{}]", message);
32       this.messageHandler.handler(message);
33   }
34
35   @Override
36   public void onError(WebSocket webSocket, Exception e) {
37
38   }
39
40   @Override
41   public void onStart() {
42       LOGGER.info("websocket server start...");
43   }
44
45
46   class MessageHandler {
47
48       public void handler(String message) {
49           RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
50           if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {
51               return;
52           }
53           Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
54                   .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
55           if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
56                  | OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
57               RouteRuleCache.add(map);
58           } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
59               RouteRuleCache.remove(map);
60           }
61       }
62   }
63}
64
65

客戶端 WebsocketSyncCacheClient:

 1* @Author: Ship
 2* @Description:
 3* @Date: Created in 2020/12/28
 4*/
 5@Component
 6public class WebsocketSyncCacheClient {
 7
 8   private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);
 9
10   private WebSocketClient client;
11
12   private RuleService ruleService;
13
14   private Gson gson = new GsonBuilder().create();
15
16   public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
17RuleService ruleService) {
18       if (StringUtils.isEmpty(serverWebSocketUrl)) {
19           throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
20       }
21       this.ruleService = ruleService;
22       ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
23               new ShipThreadFactory("websocket-connect", true).create());
24       try {
25           client = new WebSocketClient(new URI(serverWebSocketUrl)) {
26               @Override
27               public void onOpen(ServerHandshake serverHandshake) {
28                   LOGGER.info("client is open");
29                   List<AppRuleDTO> list = ruleService.getEnabledRule();
30                   String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
31                   send(msg);
32               }
33
34               @Override
35               public void onMessage(String s) {
36               }
37
38               @Override
39               public void onClose(int i, String s, boolean b) {
40               }
41
42               @Override
43               public void onError(Exception e) {
44                   LOGGER.error("websocket client error", e);
45               }
46           };
47
48           client.connectBlocking();
49           //使用調度線程池進行斷線重連,30秒進行一次
50           executor.scheduleAtFixedRate(() -> {
51               if (client != null && client.isClosed()) {
52                   try {
53                       client.reconnectBlocking();
54                   } catch (InterruptedException e) {
55                       LOGGER.error("reconnect server fail", e);
56                   }
57               }
58           }, 10, 30, TimeUnit.SECONDS);
59
60       } catch (Exception e) {
61           LOGGER.error("websocket sync cache exception", e);
62           throw new ShipException(e.getMessage());
63       }
64   }
65
66   public <T> void send(T t) {
67       while (!client.getReadyState().equals(ReadyState.OPEN)) {
68           LOGGER.debug("connecting ...please wait");
69       }
70       client.send(gson.toJson(t));
71   }
72}
73
74

04

測試

4.1 動態路由測試

1、本地啓動 nacos ,sh startup.sh -m standalone

2、啓動 ship-admin

3、本地啓動兩個 ship-example 實例。

實例 1 配置:

 1ship:
 2 http:
 3   app-name: order
 4   version: gray_1.0
 5   context-path: /order
 6   port: 8081
 7   admin-url: 127.0.0.1:9001
 8
 9 server:
10 port: 8081
11
12 nacos:
13 discovery:
14   server-addr: 127.0.0.1:8848
15
16

實例 2 配置:

 1ship:
 2 http:
 3   app-name: order
 4   version: prod_1.0
 5   context-path: /order
 6   port: 8082
 7   admin-url: 127.0.0.1:9001
 8
 9 server:
10 port: 8082
11
12 nacos:
13 discovery:
14   server-addr: 127.0.0.1:8848
15
16

4、在數據庫添加路由規則配置,該規則表示當 http header 中的 name=ship 時請求路由到 gray_1.0 版本的節點。

5、啓動 ship-server, 看到以下日誌時則可以進行測試了。

12021-01-02 19:57:09.159  INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer      : websocket server receive message:
2 [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
3
4

6、用 Postman 請求http://localhost:9000/order/user/add,POST 方式,header 設置 name=ship,可以看到只有實例 1 有日誌顯示。

1==========add user,version:gray_1.0
2
3

4.2 性能壓測

壓測環境:

壓測結果

05

總結

千里之行始於足下,開始以爲寫一個網關會很難,但當你實際開始行動時就會發現其實沒那麼難,所以邁出第一步很重要。過程中也遇到了很多問題,還在 github 上給 soul 和 nacos 這兩個開源項目提了兩個 issue,後來發現是自己的問題,尷尬😅。

本文代碼已全部上傳到 https://github.com/2YSP/ship-gate

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/AULqNKeaYl6o95gq0_ScPQ