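In Practice: How to Design a High-Performance Gateway
Author: 煙味 i, source: cnblogs, link: https://cnblogs.com/2YSP/p/14223892.html
Background
Recently I read through the design of the soul gateway on GitHub and got interested enough to write a high-performance gateway of my own from scratch. After two weeks of development, the core features of my gateway, ship-gate, are basically complete. Its biggest shortcoming is that it has no admin console, because my front-end skills are too weak 😤.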
02
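Design
2.1 Technology Selection
A gateway is the entry point for all requests, so it needs very high throughput. Making request handling asynchronous is one way to get there. There are currently two common approaches:
- Tomcat/Jetty + NIO + Servlet 3
Servlet 3 already supports asynchronous processing. This approach is widely used: JD.com, Youzan, and Zuul all rely on it.
- Netty + NIO
Netty was built for high concurrency, and Vipshop's gateway currently uses this approach. According to Vipshop's technical articles, under the same conditions Netty reaches a throughput of 300,000+ requests per second versus 130,000+ for Tomcat, so the gap is noticeable. The drawback is that with Netty you have to handle the HTTP protocol yourself, which is tedious.
Later I found that the Soul gateway is built on Spring WebFlux (which runs on Netty underneath), so there is little need to worry about HTTP protocol handling. I therefore decided to use Spring WebFlux as well.
The second characteristic of a gateway is extensibility. Netflix Zuul, for example, has preFilters, postFilters, and so on, which make it easy to handle different concerns at different stages. This can be implemented by passing each request through a chain of handlers, following the chain-of-responsibility pattern.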
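As a rough illustration of the chain-of-responsibility idea, the following sketch runs a request through an ordered list of filters, each of which does its own work and then hands control to the next one. The names `GatewayFilter`, `FilterChain`, and `ChainDemo`, and the use of a `StringBuilder` as the "request", are hypothetical simplifications and not the types used by Zuul or ship-gate.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical filter contract: lower order values run first. */
interface GatewayFilter {
    int order();

    void handle(StringBuilder trace, FilterChain chain);
}

/** Hypothetical chain that hands the request to each filter in turn. */
class FilterChain {
    private final List<GatewayFilter> filters = new ArrayList<>();
    private int pos; // index of the next filter to run

    FilterChain add(GatewayFilter filter) {
        filters.add(filter);
        filters.sort(Comparator.comparingInt(GatewayFilter::order));
        return this;
    }

    /** Runs the next filter, or does nothing once the chain is exhausted. */
    void next(StringBuilder trace) {
        if (pos < filters.size()) {
            filters.get(pos++).handle(trace, this);
        }
    }
}

class ChainDemo {
    /** Builds a filter that records its name and then passes control on. */
    static GatewayFilter named(String name, int order) {
        return new GatewayFilter() {
            @Override
            public int order() {
                return order;
            }

            @Override
            public void handle(StringBuilder trace, FilterChain chain) {
                trace.append(name).append(">"); // this filter's own work
                chain.next(trace);              // pass control down the chain
            }
        };
    }

    static String run() {
        StringBuilder trace = new StringBuilder();
        new FilterChain()
                .add(named("route", 2))
                .add(named("auth", 1))
                .next(trace);
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // filters run sorted by order: auth, then route
    }
}
```

A filter that wants to short-circuit the request (for example, a failed authentication check) simply returns without calling `chain.next`.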
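In a microservice architecture, services are deployed as multiple instances to ensure high availability. When a request reaches the gateway, the gateway needs to find all available instances based on the URL, which requires service registration and discovery, that is, a registry.
The popular registries today are Apache ZooKeeper and Alibaba's Nacos (Consul is somewhat niche). Since I had already used ZooKeeper when writing an RPC framework, I chose Nacos this time.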
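2.2 Requirements
First, the goal needs to be clear: which features should the gateway have? In summary:
Custom routing rules
Routing rules can be configured per version. There are three match objects (DEFAULT, HEADER, and QUERY) and three match methods (=, regex, and like).
Cross-language
HTTP is inherently cross-language.
High performance
Netty is itself a high-performance networking framework. In addition, the server caches data such as routing rules in JVM memory so that it does not have to call the admin service on every request.
High availability
Supports cluster mode to avoid a single point of failure; the gateway is stateless.
Grayscale release
Grayscale release (also known as canary release) is a release approach that allows a smooth transition between black and white. It enables A/B testing: some users keep using feature A while others start using feature B. If users have no objections to B, the scope is gradually expanded until all users are migrated to B. This can be implemented with the first feature, custom routing rules.
API authentication
Based on the chain-of-responsibility pattern; users only need to develop their own authentication plugin.
Load balancing
Supports multiple load-balancing algorithms, such as random, round-robin, and weighted round-robin. They can be loaded dynamically according to configuration through the SPI mechanism.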
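To make the load-balancing requirement concrete, here is a minimal sketch of two interchangeable strategies selected by a configured name. It is not ship-gate's implementation: `Node`, `Balancer`, `RoundRobinBalancer`, `WeightedRandomBalancer`, and `Balancers` are hypothetical names, the static map merely stands in for SPI-based discovery, and weighted random selection is used in place of weighted round-robin to keep the example short. Weights are assumed to be non-negative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical instance record; a real service instance carries more fields. */
class Node {
    final String address;
    final int weight;

    Node(String address, int weight) {
        this.address = address;
        this.weight = weight;
    }
}

/** Hypothetical strategy contract. */
interface Balancer {
    Node chooseOne(List<Node> nodes);
}

/** Cycles through the nodes in order. */
class RoundRobinBalancer implements Balancer {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public Node chooseOne(List<Node> nodes) {
        // floorMod keeps the index valid even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return nodes.get(index);
    }
}

/** Picks a node with probability proportional to its weight. */
class WeightedRandomBalancer implements Balancer {
    @Override
    public Node chooseOne(List<Node> nodes) {
        int total = 0;
        for (Node node : nodes) {
            total += node.weight;
        }
        if (total <= 0) {
            // no usable weights: fall back to a uniform choice
            return nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
        }
        int offset = ThreadLocalRandom.current().nextInt(total);
        for (Node node : nodes) {
            offset -= node.weight;
            if (offset < 0) {
                return node;
            }
        }
        return nodes.get(nodes.size() - 1);
    }
}

/** Stand-in for SPI lookup: resolves a strategy from its configured name. */
class Balancers {
    private static final Map<String, Balancer> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("roundRobin", new RoundRobinBalancer());
        REGISTRY.put("weightedRandom", new WeightedRandomBalancer());
    }

    static Balancer byName(String name) {
        Balancer balancer = REGISTRY.get(name);
        if (balancer == null) {
            throw new IllegalArgumentException("unknown load balancer: " + name);
        }
        return balancer;
    }
}
```

With a real SPI setup, the registry map would presumably be filled from `java.util.ServiceLoader` rather than hard-coded, so that new strategies can be added without touching the gateway code.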
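2.3 Architecture Design
After studying several excellent gateways (Zuul, Spring Cloud Gateway, and Soul), I divided the project into the following modules.
The relationships between them are shown in the figure:
[Figure: gateway design]
Note: this figure differs slightly from the actual implementation. The step where Nacos pushes to the local cache is not implemented; currently ship-server only pulls by polling on a schedule. Likewise, instead of ship-admin fetching registered service information from Nacos, ServiceA now sends an HTTP request to notify ship-admin when it starts.
2.4 Table Schema Design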
03
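Coding
3.1 ship-client-spring-boot-starter
First, create a spring-boot-starter named ship-client-spring-boot-starter. If you are not sure how to write a custom starter, see my earlier post "Developing Your Own Starter".
Its core class, AutoRegisterListener, does two things when the application starts:
- Registers the service information with the Nacos registry.
- Notifies ship-admin that the service is online and registers a shutdown hook for going offline.
The code is as follows: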
```java
/**
 * Created by 2YSP on 2020/12/21
 */
public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent> {

    private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);

    // guards against registering more than once if the event fires repeatedly
    private final AtomicBoolean registered = new AtomicBoolean(false);

    private final ClientConfigProperties properties;

    @NacosInjected
    private NamingService namingService;

    @Autowired
    private RequestMappingHandlerMapping handlerMapping;

    private final ExecutorService pool;

    /**
     * url list to ignore
     */
    private static List<String> ignoreUrlList = new LinkedList<>();

    static {
        ignoreUrlList.add("/error");
    }

    public AutoRegisterListener(ClientConfigProperties properties) {
        if (!check(properties)) {
            LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");
            throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");
        }
        this.properties = properties;
        pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    /**
     * check the ClientConfigProperties
     *
     * @param properties
     * @return
     */
    private boolean check(ClientConfigProperties properties) {
        if (properties.getPort() == null || properties.getContextPath() == null
                || properties.getVersion() == null || properties.getAppName() == null
                || properties.getAdminUrl() == null) {
            return false;
        }
        return true;
    }


    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        doRegister();
        registerShutDownHook();
    }

    /**
     * send unregister request to admin when jvm shutdown
     */
    private void registerShutDownHook() {
        final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
        final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
        unregisterAppDTO.setAppName(properties.getAppName());
        unregisterAppDTO.setVersion(properties.getVersion());
        unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
        unregisterAppDTO.setPort(properties.getPort());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            OkhttpTool.doPost(url, unregisterAppDTO);
            LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
        }));
    }

    /**
     * register all interface info to register center
     */
    private void doRegister() {
        Instance instance = new Instance();
        instance.setIp(IpUtil.getLocalIpAddress());
        instance.setPort(properties.getPort());
        instance.setEphemeral(true);
        Map<String, String> metadataMap = new HashMap<>();
        metadataMap.put("version", properties.getVersion());
        metadataMap.put("appName", properties.getAppName());
        instance.setMetadata(metadataMap);
        try {
            namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
        } catch (NacosException e) {
            LOGGER.error("register to nacos fail", e);
            throw new ShipException(e.getErrCode(), e.getErrMsg());
        }
        LOGGER.info("register interface info to nacos success!");
        // send register request to ship-admin
        String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
        RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
        OkhttpTool.doPost(url, registerAppDTO);
        LOGGER.info("register to ship-admin success!");
    }


    private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
        RegisterAppDTO registerAppDTO = new RegisterAppDTO();
        registerAppDTO.setAppName(properties.getAppName());
        registerAppDTO.setContextPath(properties.getContextPath());
        registerAppDTO.setIp(instance.getIp());
        registerAppDTO.setPort(instance.getPort());
        registerAppDTO.setVersion(properties.getVersion());
        return registerAppDTO;
    }
}
```
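3.2 ship-server
The ship-server project mainly consists of two parts: 1. the main flow of dynamic request routing, and 2. synchronizing the local cache with ship-admin and Nacos, which is covered later in section 3.3.
ship-server implements dynamic routing by intercepting requests with a WebFilter and then handing each request to a plugin chain for chained processing.
PluginFilter parses the appName from the URL and then assembles the enabled plugins into a plugin chain.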
```java
public class PluginFilter implements WebFilter {

    private ServerConfigProperties properties;

    public PluginFilter(ServerConfigProperties properties) {
        this.properties = properties;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String appName = parseAppName(exchange);
        if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
            throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
        }
        PluginChain pluginChain = new PluginChain(properties, appName);
        pluginChain.addPlugin(new DynamicRoutePlugin(properties));
        pluginChain.addPlugin(new AuthPlugin(properties));
        return pluginChain.execute(exchange, pluginChain);
    }

    private String parseAppName(ServerWebExchange exchange) {
        RequestPath path = exchange.getRequest().getPath();
        // the first path segment is the app name, e.g. /order/user/add -> order
        String appName = path.value().split("/")[1];
        return appName;
    }
}
```
PluginChain extends AbstractShipPlugin and holds all the plugins to be executed.

```java
/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/25
 */
public class PluginChain extends AbstractShipPlugin {
    /**
     * the pos point to current plugin
     */
    private int pos;
    /**
     * the plugins of chain
     */
    private List<ShipPlugin> plugins;

    private final String appName;

    public PluginChain(ServerConfigProperties properties, String appName) {
        super(properties);
        this.appName = appName;
    }

    /**
     * add enabled plugin to chain
     *
     * @param shipPlugin
     */
    public void addPlugin(ShipPlugin shipPlugin) {
        if (plugins == null) {
            plugins = new ArrayList<>();
        }
        if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
            return;
        }
        plugins.add(shipPlugin);
        // order by the plugin's order
        plugins.sort(Comparator.comparing(ShipPlugin::order));
    }

    @Override
    public Integer order() {
        return null;
    }

    @Override
    public String name() {
        return null;
    }

    @Override
    public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
        // every plugin has run: complete the response
        if (pos == plugins.size()) {
            return exchange.getResponse().setComplete();
        }
        return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
    }

    public String getAppName() {
        return appName;
    }

}
```
AbstractShipPlugin implements the ShipPlugin interface and holds the ServerConfigProperties configuration object.

```java
public abstract class AbstractShipPlugin implements ShipPlugin {

    protected ServerConfigProperties properties;

    public AbstractShipPlugin(ServerConfigProperties properties) {
        this.properties = properties;
    }
}
```
The ShipPlugin interface defines the three methods that every plugin must implement: order(), name(), and execute().

```java
public interface ShipPlugin {
    /**
     * lower values have higher priority
     *
     * @return
     */
    Integer order();

    /**
     * return current plugin name
     *
     * @return
     */
    String name();

    Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain);

}
```
DynamicRoutePlugin extends the abstract class AbstractShipPlugin and contains the main business logic for dynamic routing.

```java
/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/25
 */
public class DynamicRoutePlugin extends AbstractShipPlugin {

    private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);

    private static WebClient webClient;

    private static final Gson gson = new GsonBuilder().create();

    static {
        HttpClient httpClient = HttpClient.create()
                .tcpConfiguration(client ->
                        client.doOnConnected(conn ->
                                conn.addHandlerLast(new ReadTimeoutHandler(3))
                                        .addHandlerLast(new WriteTimeoutHandler(3)))
                                .option(ChannelOption.TCP_NODELAY, true)
                );
        webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();
    }

    public DynamicRoutePlugin(ServerConfigProperties properties) {
        super(properties);
    }

    @Override
    public Integer order() {
        return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
    }

    @Override
    public String name() {
        return ShipPluginEnum.DYNAMIC_ROUTE.getName();
    }

    @Override
    public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
        String appName = pluginChain.getAppName();
        ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
//        LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));
        // request service
        String url = buildUrl(exchange, serviceInstance);
        return forward(exchange, url);
    }

    /**
     * forward request to backend service
     *
     * @param exchange
     * @param url
     * @return
     */
    private Mono<Void> forward(ServerWebExchange exchange, String url) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        HttpMethod method = request.getMethod();

        WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
            headers.addAll(request.getHeaders());
        });

        WebClient.RequestHeadersSpec<?> reqHeadersSpec;
        if (requireHttpBody(method)) {
            reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
        } else {
            reqHeadersSpec = requestBodySpec;
        }
        // nio->callback->nio
        return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
                .onErrorResume(ex -> {
                    return Mono.defer(() -> {
                        String errorResultJson = "";
                        if (ex instanceof TimeoutException) {
                            errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";
                        } else {
                            errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";
                        }
                        return ShipResponseUtil.doResponse(exchange, errorResultJson);
                    }).then(Mono.empty());
                }).flatMap(backendResponse -> {
                    response.setStatusCode(backendResponse.statusCode());
                    response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
                    return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
                });
    }

    /**
     * whether the http method needs an http body
     *
     * @param method
     * @return
     */
    private boolean requireHttpBody(HttpMethod method) {
        if (method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) || method.equals(HttpMethod.PATCH)) {
            return true;
        }
        return false;
    }

    private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
        ServerHttpRequest request = exchange.getRequest();
        String query = request.getURI().getQuery();
        // strip the leading /{appName} segment before forwarding
        String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");
        String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;
        if (!StringUtils.isEmpty(query)) {
            url = url + "?" + query;
        }
        return url;
    }


    /**
     * choose a ServiceInstance according to route rule config and load balancing algorithm
     *
     * @param appName
     * @param request
     * @return
     */
    private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
        List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);
        if (CollectionUtils.isEmpty(serviceInstances)) {
            LOGGER.error("service instance of {} not find", appName);
            throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
        }
        String version = matchAppVersion(appName, request);
        if (StringUtils.isEmpty(version)) {
            throw new ShipException("match app version error");
        }
        // filter serviceInstances by version
        List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
        // select an instance based on the load balancing algorithm
        LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
        ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
        return serviceInstance;
    }


    private String matchAppVersion(String appName, ServerHttpRequest request) {
        List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);
        // try rules from highest to lowest priority; the first match wins
        rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
        for (AppRuleDTO rule : rules) {
            if (match(rule, request)) {
                return rule.getVersion();
            }
        }
        return null;
    }


    private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
        String matchObject = rule.getMatchObject();
        String matchKey = rule.getMatchKey();
        String matchRule = rule.getMatchRule();
        Byte matchMethod = rule.getMatchMethod();
        if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
            return true;
        } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
            String param = request.getQueryParams().getFirst(matchKey);
            if (!StringUtils.isEmpty(param)) {
                return StringTools.match(param, matchMethod, matchRule);
            }
        } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
            HttpHeaders headers = request.getHeaders();
            String headerValue = headers.getFirst(matchKey);
            if (!StringUtils.isEmpty(headerValue)) {
                return StringTools.match(headerValue, matchMethod, matchRule);
            }
        }
        return false;
    }

}
```
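The `match` method delegates to `StringTools.match`, whose body is not shown in this article. The sketch below is one plausible reading of the three match methods listed in the requirements (=, regex, like), not the project's actual code: the class name `MatchSketch` is hypothetical, the numeric codes are assumptions, and "like" is interpreted as a substring check.

```java
import java.util.regex.Pattern;

/** Hypothetical stand-in for StringTools.match; the method codes are assumed. */
class MatchSketch {
    static final byte EQUAL = 1; // assumed code for "="
    static final byte REGEX = 2; // assumed code for "regex"
    static final byte LIKE = 3;  // assumed code for "like"

    /**
     * Returns true when the request value satisfies the rule.
     *
     * @param value       header or query parameter value taken from the request
     * @param matchMethod one of the assumed codes above
     * @param matchRule   expected value, regular expression, or substring
     */
    static boolean match(String value, Byte matchMethod, String matchRule) {
        if (value == null || matchMethod == null || matchRule == null) {
            return false;
        }
        switch (matchMethod) {
            case EQUAL:
                return value.equals(matchRule);
            case REGEX:
                // the whole value must match the expression
                return Pattern.matches(matchRule, value);
            case LIKE:
                return value.contains(matchRule);
            default:
                return false;
        }
    }
}
```

Under these assumptions, a HEADER rule with key `name`, method `EQUAL`, and rule `ship` matches only requests whose `name` header is exactly `ship`.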
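3.3 Data Synchronization
App data synchronization
When a backend service (such as the order service) starts, it registers only its service name, version, IP address, and port with Nacos. What about the instance weight and the enabled plugins, which are not included?
In practice, the weights and plugin lists of online instances are configured in the admin UI and then take effect dynamically, so ship-admin needs to periodically update instance weights and plugin information in the registry.
The corresponding code is NacosSyncListener in ship-admin: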
```java
/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/30
 */
@Configuration
public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);

    private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
            new ShipThreadFactory("nacos-sync", true).create());

    @NacosInjected
    private NamingService namingService;

    @Value("${nacos.discovery.server-addr}")
    private String baseUrl;

    @Resource
    private AppService appService;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // only react to the root application context
        if (event.getApplicationContext().getParent() != null) {
            return;
        }
        String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
        scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS);
    }

    class NacosSyncTask implements Runnable {

        private NamingService namingService;

        private String url;

        private AppService appService;

        private Gson gson = new GsonBuilder().create();

        public NacosSyncTask(NamingService namingService, String url, AppService appService) {
            this.namingService = namingService;
            this.url = url;
            this.appService = appService;
        }

        /**
         * Regular update weight,enabled plugins to nacos instance
         */
        @Override
        public void run() {
            try {
                // get all app names
                ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                if (CollectionUtils.isEmpty(services.getData())) {
                    return;
                }
                List<String> appNames = services.getData();
                List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
                for (AppInfoDTO appInfo : appInfos) {
                    if (CollectionUtils.isEmpty(appInfo.getInstances())) {
                        continue;
                    }
                    for (ServiceInstance instance : appInfo.getInstances()) {
                        Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
                        String resp = OkhttpTool.doPut(url, queryMap, "");
                        LOGGER.debug("response :{}", resp);
                    }
                }

            } catch (Exception e) {
                LOGGER.error("nacos sync task error", e);
            }
        }

        private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
            Map<String, Object> map = new HashMap<>();
            map.put("serviceName", appInfo.getAppName());
            map.put("groupName", NacosConstants.APP_GROUP_NAME);
            map.put("ip", instance.getIp());
            map.put("port", instance.getPort());
            map.put("weight", instance.getWeight().doubleValue());
            NacosMetadata metadata = new NacosMetadata();
            metadata.setAppName(appInfo.getAppName());
            metadata.setVersion(instance.getVersion());
            metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
            map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
            map.put("ephemeral", true);
            return map;
        }
    }
}
```
ship-server then periodically pulls the app data from Nacos and updates its local Map cache.

```java
/**
 * @Author: Ship
 * @Description: sync data to local cache
 * @Date: Created in 2020/12/25
 */
@Configuration
public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent> {

    private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
            new ShipThreadFactory("service-sync", true).create());

    @NacosInjected
    private NamingService namingService;

    @Autowired
    private ServerConfigProperties properties;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() != null) {
            return;
        }
        scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)
                , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
        WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
        websocketSyncCacheServer.start();
    }


    class DataSyncTask implements Runnable {

        private NamingService namingService;

        public DataSyncTask(NamingService namingService) {
            this.namingService = namingService;
        }

        @Override
        public void run() {
            try {
                // get all app names
                ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                if (CollectionUtils.isEmpty(services.getData())) {
                    return;
                }
                List<String> appNames = services.getData();
                // get all instances
                for (String appName : appNames) {
                    List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
                    if (CollectionUtils.isEmpty(instanceList)) {
                        continue;
                    }
                    ServiceCache.add(appName, buildServiceInstances(instanceList));
                    List<String> pluginNames = getEnabledPlugins(instanceList);
                    PluginCache.add(appName, pluginNames);
                }
                ServiceCache.removeExpired(appNames);
                PluginCache.removeExpired(appNames);

            } catch (Exception e) {
                // catch everything: an uncaught exception would cancel all later runs of this scheduled task
                e.printStackTrace();
            }
        }

        private List<String> getEnabledPlugins(List<Instance> instanceList) {
            Instance instance = instanceList.get(0);
            Map<String, String> metadata = instance.getMetadata();
            // plugins: DynamicRoute,Auth
            String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
            return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
        }

        private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
            List<ServiceInstance> list = new LinkedList<>();
            instanceList.forEach(instance -> {
                Map<String, String> metadata = instance.getMetadata();
                ServiceInstance serviceInstance = new ServiceInstance();
                serviceInstance.setAppName(metadata.get("appName"));
                serviceInstance.setIp(instance.getIp());
                serviceInstance.setPort(instance.getPort());
                serviceInstance.setVersion(metadata.get("version"));
                serviceInstance.setWeight((int) instance.getWeight());
                list.add(serviceInstance);
            });
            return list;
        }
    }
}
```
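The local cache that the sync task writes to (`ServiceCache`) is not shown in the article. A minimal sketch of such a cache, assuming it is a static map keyed by app name, could look like the following. `ServiceCacheSketch` is a hypothetical name, and instances are reduced to plain address strings to keep the example self-contained.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical local cache keyed by app name; values are instance addresses for brevity. */
class ServiceCacheSketch {
    private static final Map<String, List<String>> CACHE = new ConcurrentHashMap<>();

    /** Replaces the cached instance list of one app with the freshly pulled one. */
    static void add(String appName, List<String> instances) {
        CACHE.put(appName, instances);
    }

    /** Returns the cached instances, or an empty list for an unknown app. */
    static List<String> getAllInstances(String appName) {
        return CACHE.getOrDefault(appName, Collections.emptyList());
    }

    /** Drops every app that the registry no longer reports. */
    static void removeExpired(Collection<String> liveAppNames) {
        CACHE.keySet().retainAll(liveAppNames);
    }
}
```

Because requests read the cache while the scheduled task writes to it, a concurrent map is used here; replacing a whole list per app avoids readers ever seeing a half-updated list.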
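Routing rule data synchronization
Meanwhile, if a user updates routing rules in the admin console, ship-admin needs to push the rule data to ship-server. Following the Soul gateway's approach, this uses WebSocket: a full synchronization is performed when the connection is first established, and after that only incremental synchronization is performed whenever routing rules change.
Server side, WebsocketSyncCacheServer: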
```java
/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/28
 */
public class WebsocketSyncCacheServer extends WebSocketServer {

    private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);

    private Gson gson = new GsonBuilder().create();

    private MessageHandler messageHandler;

    public WebsocketSyncCacheServer(Integer port) {
        super(new InetSocketAddress(port));
        this.messageHandler = new MessageHandler();
    }


    @Override
    public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
        LOGGER.info("server is open");
    }

    @Override
    public void onClose(WebSocket webSocket, int i, String s, boolean b) {
        LOGGER.info("websocket server close...");
    }

    @Override
    public void onMessage(WebSocket webSocket, String message) {
        LOGGER.info("websocket server receive message:\n[{}]", message);
        this.messageHandler.handler(message);
    }

    @Override
    public void onError(WebSocket webSocket, Exception e) {
        // log instead of silently swallowing the error
        LOGGER.error("websocket server error", e);
    }

    @Override
    public void onStart() {
        LOGGER.info("websocket server start...");
    }


    class MessageHandler {

        public void handler(String message) {
            RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
            if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {
                return;
            }
            // group the pushed rules by app name before touching the cache
            Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
                    .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
            if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
                    || OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
                RouteRuleCache.add(map);
            } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
                RouteRuleCache.remove(map);
            }
        }
    }
}
```
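The handler above applies each pushed operation to `RouteRuleCache`, which the article does not show. As a hedged sketch of how incremental updates could be merged, the following cache treats INSERT and UPDATE as "replace by rule id" and DELETE as "remove by rule id". `RuleSketch` and `RuleCacheSketch` are hypothetical names, and identifying rules by `id` is an assumption based on the `id` field in the sample message shown in the test section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, trimmed-down routing rule. */
class RuleSketch {
    final int id;
    final String version;

    RuleSketch(int id, String version) {
        this.id = id;
        this.version = version;
    }
}

/** Hypothetical rule cache keyed by app name. */
class RuleCacheSketch {
    private static final Map<String, List<RuleSketch>> CACHE = new ConcurrentHashMap<>();

    /** INSERT or UPDATE: a rule with an already cached id replaces the old one. */
    static void add(Map<String, List<RuleSketch>> changes) {
        changes.forEach((appName, rules) -> CACHE.compute(appName, (key, cached) -> {
            List<RuleSketch> merged = new ArrayList<>();
            if (cached != null) {
                merged.addAll(cached);
            }
            for (RuleSketch rule : rules) {
                merged.removeIf(old -> old.id == rule.id);
                merged.add(rule);
            }
            return merged;
        }));
    }

    /** DELETE: removes the given rules by id and drops apps left without rules. */
    static void remove(Map<String, List<RuleSketch>> changes) {
        changes.forEach((appName, rules) -> CACHE.computeIfPresent(appName, (key, cached) -> {
            List<RuleSketch> remaining = new ArrayList<>(cached);
            for (RuleSketch rule : rules) {
                remaining.removeIf(old -> old.id == rule.id);
            }
            if (remaining.isEmpty()) {
                return null; // returning null removes the app's entry
            }
            return remaining;
        }));
    }

    /** Returns a copy so callers can sort the list without touching the cache. */
    static List<RuleSketch> getRules(String appName) {
        List<RuleSketch> rules = CACHE.get(appName);
        return rules == null ? new ArrayList<>() : new ArrayList<>(rules);
    }
}
```

With this shape, the full synchronization on connect is just an INSERT carrying every enabled rule, and later changes reuse the same two methods.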
Client side, WebsocketSyncCacheClient:
```java
/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/28
 */
@Component
public class WebsocketSyncCacheClient {

    private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);

    private WebSocketClient client;

    private RuleService ruleService;

    private Gson gson = new GsonBuilder().create();

    public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
                                    RuleService ruleService) {
        if (StringUtils.isEmpty(serverWebSocketUrl)) {
            throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
        }
        this.ruleService = ruleService;
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                new ShipThreadFactory("websocket-connect", true).create());
        try {
            client = new WebSocketClient(new URI(serverWebSocketUrl)) {
                @Override
                public void onOpen(ServerHandshake serverHandshake) {
                    LOGGER.info("client is open");
                    // full sync: push every enabled rule once the connection is established
                    List<AppRuleDTO> list = ruleService.getEnabledRule();
                    String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
                    // resolves to WebSocketClient.send(String), so msg is not serialized twice
                    send(msg);
                }

                @Override
                public void onMessage(String s) {
                }

                @Override
                public void onClose(int i, String s, boolean b) {
                }

                @Override
                public void onError(Exception e) {
                    LOGGER.error("websocket client error", e);
                }
            };

            client.connectBlocking();
            // reconnect via the scheduled pool if the connection drops, checking every 30 seconds
            executor.scheduleAtFixedRate(() -> {
                if (client != null && client.isClosed()) {
                    try {
                        client.reconnectBlocking();
                    } catch (InterruptedException e) {
                        LOGGER.error("reconnect server fail", e);
                    }
                }
            }, 10, 30, TimeUnit.SECONDS);

        } catch (Exception e) {
            LOGGER.error("websocket sync cache exception", e);
            throw new ShipException(e.getMessage());
        }
    }

    public <T> void send(T t) {
        // note: this busy-waits until the connection is open
        while (!client.getReadyState().equals(ReadyState.OPEN)) {
            LOGGER.debug("connecting ...please wait");
        }
        client.send(gson.toJson(t));
    }
}
```
04
Testing
4.1 Dynamic Routing Test
1. Start Nacos locally: sh startup.sh -m standalone
2. Start ship-admin.
3. Start two ship-example instances locally.
Configuration for instance 1:
```yaml
ship:
  http:
    app-name: order
    version: gray_1.0
    context-path: /order
    port: 8081
    admin-url: 127.0.0.1:9001

server:
  port: 8081

nacos:
  discovery:
    server-addr: 127.0.0.1:8848
```
Configuration for instance 2:

```yaml
ship:
  http:
    app-name: order
    version: prod_1.0
    context-path: /order
    port: 8082
    admin-url: 127.0.0.1:9001

server:
  port: 8082

nacos:
  discovery:
    server-addr: 127.0.0.1:8848
```
4. Add a routing rule to the database. This rule means that when the HTTP header name=ship is present, the request is routed to a node running version gray_1.0.
5. Start ship-server. Once you see the following log, you can start testing.

```
2021-01-02 19:57:09.159 INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer : websocket server receive message:
 [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
```

6. Use Postman to send a POST request to http://localhost:9000/order/user/add with the header name=ship. Only instance 1 prints a log line.

```
==========add user,version:gray_1.0
```
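4.2 Performance Load Test
Test environment:
- MacBook Pro 13-inch
- Processor: 2.3 GHz quad-core Intel Core i7
- Memory: 16 GB 3733 MHz LPDDR4X
- Number of backend nodes: one
- Load-testing tool: wrk
- Result: with 20 threads and 500 connections, throughput was roughly 9,400 requests per second.
[Figure: load test results]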
05
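Summary
A journey of a thousand miles begins with a single step. At first I thought writing a gateway would be very hard, but once you actually start, you find it is not that hard, so taking the first step matters. I also ran into many problems along the way, and even filed two issues on GitHub against the open-source projects soul and nacos, only to discover later that the problems were on my side. Awkward 😅.
All the code for this article has been uploaded to https://github.com/2YSP/ship-gate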
Source: https://mp.weixin.qq.com/s/AULqNKeaYl6o95gq0_ScPQ