Sentinel 集羣限流探索
最近看了下關於分佈式限流的部分,看到 Sentinel 的分佈式限流,也就是集羣限流的部分,想搭個環境看看,結果發現網上關於這方面的內容基本可以說沒有,你甚至很難跑起來他的 demo,就算能跑起來,估計也得自己研究半天,麻煩的要死。
我猜測很重要的原因可能就是 Sentinel 關於這塊做的並不完善,而且從官方的 Issue 中能看出來,其實官方對於這塊後續並沒有計劃去做的更好。
那麼廢話不多說,在此之前,肯定要先說下關於 Sentinel 集羣限流方面的原理,沒有原理一切都是空中樓閣。
集羣限流原理
原理這方面比較好解釋,就是在原本的限流規則中加了一個clusterMode
參數,如果是true
的話,那麼會走集羣限流的模式,反之就是單機限流。
如果是集羣限流,判斷身份是限流客戶端還是限流服務端,客戶端則和服務端建立通信,所有的限流都通過和服務端的交互來達到效果。
對於 Sentinel 集羣限流,包含兩種模式,內嵌式和獨立式。
內嵌式
什麼是內嵌式呢,簡單來說,要限流那麼必然要有個服務端去處理多個客戶端的限流請求,對於內嵌式來說呢,就是整個微服務集羣內部選擇一臺機器節點作爲限流服務端(Sentinel 把這個叫做 token-server),其他的微服務機器節點作爲限流的客戶端(token-client),這樣的做法有缺點也有優點。
限流 - 嵌入式
首先說優點:這種方式部署不需要獨立部署限流服務端,節省獨立部署服務端產生的額外服務器開支,降低部署和維護複雜度。
再說缺點,缺點的話也可以說是整個 Sentinel 在集羣限流這方面做得不夠好的問題。
先說第一個缺點:無自動故障轉移機制。
無論是內嵌式還是獨立式的部署方案,都無法做到自動的故障轉移。
所有的 server 和 client 都需要事先知道 IP 的請求下做出配置,如果 server 掛了,需要手動的修改配置,否則集羣限流會退化成單機限流。
比如你的交易服務有 3 臺機器 A\B\C,其中 A 被手動設置爲 server,B\C 則是作爲 client,當 A 服務器宕機之後,需要手動修改 B\C 中一臺作爲 server,否則整個集羣的機器都將退化回單機限流的模式。
但是,如果 client 掛了,則是不會影響到整個集羣限流的,比如 B 掛了,那麼 A 和 C 將會繼續組成集羣限流。
如果 B 再次重啓成功,那麼又會重新加入到整個集羣限流當中來,因爲會有一個自動重連的機制,默認的時間是 N*2 秒,逐漸遞增的一個時間。
這是想用 Sentinel 做集羣限流並且使用內嵌式需要考慮的問題,要自己去實現自動故障轉移的機制,當然,server 節點選舉也要自己實現了。
對於這個問題,官方提供了可以修改 server/client 的 API 接口,另外一個就是可以基於動態的數據源配置方式,這個我們後面再談。
第二個缺點:適用於單微服務集羣內部限流。
這個其實也是顯而易見的道理,都內部選舉一臺作爲 server 去限流了,如果還跨多個微服務的話,顯然是不太合理的行爲,現實中這種情況肯定也是非常少見的了,當然你非要想跨多個微服務集羣也不是不可以,只要你開心就好。
第三個缺點:server 節點的機器性能會受到一定程度的影響。
這個肯定也比較好理解的,作爲 server 去限流,那麼其他的客戶端肯定要和 server 去通信才能做到集羣限流啊,對不對,所以一定程度上肯定會影響到 server 節點本身服務的性能,但是我覺得問題不大,就當 server 節點多了一個流量比較大的接口好了。
具體上會有多大的影響,我沒有實際對這塊做出實際的測試,如果真的流量非常大,需要實際測試一下這方面的問題。
我認爲影響還是可控的,本身 server 和 client 基於 netty 通信,通信的內容其實也非常的小。
獨立式
說完內嵌式的這些點,然後再說獨立式,也非常好理解,就是單獨部署一臺機器作爲限流服務端 server,就不在本身微服務集羣內部選一臺作爲 server 了。
限流 - 獨立式
很明顯,優點就是解決了上面的缺點。
-
不會和內嵌式一樣,影響到 server 節點的本身性能
-
可以適用於跨多個微服務之間的集羣限流
優點可以說就是解決了內嵌式的兩個缺點,那麼缺點也來了,這同樣也是 Sentinel 本身並沒有幫助我們去解決的問題。
缺點一:需要獨立部署,會產生額外的資源(錢)和運維複雜度
缺點二:server 默認是單機,需要自己實現高可用方案
缺點二很致命啊,官方的 server 實現默認就是單機的,單點問題大家懂的都懂,自己實現高可用,我真的是有點服了。
這麼說 Sentinel 這個集羣限流就是簡單的實現了一下,真正複雜的部分他都沒管,你可以這麼理解。
run 起來
那基本原理大概瞭解之後,還是要真正跑起來看看效果的,畢竟開頭我就說了,網上這方面真的是感覺啥也搜不到,下面以嵌入式集羣的方式舉例。
無論集羣限流還是單機限流的方式,官方都支持寫死配置和動態數據源的配置方式,寫的話下面的代碼中也都有,被我註釋掉了,至於動態數據源的配置,會基於 Apollo 來實現。
理解一下動態數據源的配置方式,基於這個我們可以實現限流規則的動態刷新,還有重點的一點可以做到基於修改配置方式的半自動故障轉移。
動態數據源支持推和拉兩種方式,比如文件系統和 Eureka 就是拉取的方式,定時讀取文件內容的變更,Eureka 則是建立 HTTP 連接,定時獲取元數據的變更。
推送的方式主要是基於事件監聽機制,比如 Apollo 和 Nacos,Redis 官方則是基於 Pub/Sub 來實現,默認的實現方式是基於 Lettuce,如果想用其他的客戶端要自己實現。
限流 - 集羣工作模式
首先,該引入的包還是引入。
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-annotation-aspectj</artifactId>
<version>1.8.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>1.8.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-client-default</artifactId>
<version>1.8.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-server-default</artifactId>
<version>1.8.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-apollo</artifactId>
<version>1.8.4</version>
</dependency>
實現 SPI,在resources
目錄的META-INF/services
下新增名爲com.alibaba.csp.sentinel.init.InitFunc
的文件,內容寫上我們自己實現的類名,比如我的com.irving.demo.init.DemoClusterInitFunc
。
實現InitFunc
接口,重寫init
方法,代碼直接貼出來,這裏整體依賴的是 Apollo 的配置方式,註釋的部分是我在測試的時候寫死代碼的配置方式,也是可以用的。
public class DemoClusterInitFunc implements InitFunc {
private final String namespace = "application";
private final String ruleKey = "demo_sentinel";
private final String ruleServerKey = "demo_cluster";
private final String defaultRuleValue = "[]";
@Override
public void init() throws Exception {
// 初始化 限流規則
initDynamicRuleProperty();
//初始化 客戶端配置
initClientConfigProperty();
// 初始化 服務端配置信息
initClientServerAssignProperty();
registerClusterRuleSupplier();
// token-server的傳輸規則
initServerTransportConfigProperty();
// 初始化 客戶端和服務端狀態
initStateProperty();
}
/**
* 限流規則和熱點限流規則配置
*/
private void initDynamicRuleProperty() {
ReadableDataSource<String, List<FlowRule>> ruleSource = new ApolloDataSource<>(namespace, ruleKey,
defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
}));
FlowRuleManager.register2Property(ruleSource.getProperty());
ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new ApolloDataSource<>(namespace, ruleKey,
defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {
}));
ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
}
/**
* 客戶端配置,註釋的部分是通過Apollo配置,只有一個配置我就省略了
*/
private void initClientConfigProperty() {
// ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new ApolloDataSource<>(namespace, ruleKey,
// defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {
// }));
// ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
ClusterClientConfig clientConfig = new ClusterClientConfig();
clientConfig.setRequestTimeout(1000);
ClusterClientConfigManager.applyNewConfig(clientConfig);
}
/**
* client->server 傳輸配置,設置端口號,註釋的部分是寫死的配置方式
*/
private void initServerTransportConfigProperty() {
ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new ApolloDataSource<>(namespace, ruleServerKey,
defaultRuleValue, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
});
ServerTransportConfig serverTransportConfig = Optional.ofNullable(groupList)
.flatMap(this::extractServerTransportConfig)
.orElse(null);
return serverTransportConfig;
});
ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
// ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig().setIdleSeconds(600).setPort(transPort));
}
private void registerClusterRuleSupplier() {
ClusterFlowRuleManager.setPropertySupplier(namespace -> {
ReadableDataSource<String, List<FlowRule>> ds = new ApolloDataSource<>(this.namespace, ruleKey,
defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
}));
return ds.getProperty();
});
ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
ReadableDataSource<String, List<ParamFlowRule>> ds = new ApolloDataSource<>(this.namespace, ruleKey,
defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {
}));
return ds.getProperty();
});
}
/**
* 服務端配置,設置server端口和IP,註釋的配置是寫死的方式,這個在服務端是不用配置的,只有客戶端需要配置用來連接服務端
*/
private void initClientServerAssignProperty() {
ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new ApolloDataSource<>(namespace, ruleServerKey,
defaultRuleValue, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
});
ClusterClientAssignConfig clusterClientAssignConfig = Optional.ofNullable(groupList)
.flatMap(this::extractClientAssignment)
.orElse(null);
return clusterClientAssignConfig;
});
ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
// ClusterClientAssignConfig serverConfig = new ClusterClientAssignConfig();
// serverConfig.setServerHost("127.0.0.1");
// serverConfig.setServerPort(transPort);
// ConfigSupplierRegistry.setNamespaceSupplier(() -> "trade-center");
// ClusterClientConfigManager.applyNewAssignConfig(serverConfig);
}
private Optional<ClusterClientAssignConfig> extractClientAssignment(List<ClusterGroupEntity> groupList) {
ClusterGroupEntity tokenServer = groupList.stream().filter(x -> x.getState().equals(ClusterStateManager.CLUSTER_SERVER)).findFirst().get();
Integer currentMachineState = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
if (currentMachineState.equals(ClusterStateManager.CLUSTER_CLIENT)) {
String ip = tokenServer.getIp();
Integer port = tokenServer.getPort();
return Optional.of(new ClusterClientAssignConfig(ip, port));
}
return Optional.empty();
}
/**
* 初始化客戶端和服務端狀態,註釋的也是寫死的配置方式
*/
private void initStateProperty() {
ReadableDataSource<String, Integer> clusterModeDs = new ApolloDataSource<>(namespace, ruleServerKey,
defaultRuleValue, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
});
Integer state = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
return state;
});
ClusterStateManager.registerProperty(clusterModeDs.getProperty());
// ClusterStateManager.applyState(ClusterStateManager.CLUSTER_SERVER);
}
private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) {
return groupList.stream()
.filter(x -> x.getMachineId().equalsIgnoreCase(getCurrentMachineId()) && x.getState().equals(ClusterStateManager.CLUSTER_SERVER))
.findAny()
.map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));
}
private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {
return getCurrentMachineId().equals(group.getMachineId());
}
private String getCurrentMachineId() {
// 通過-Dcsp.sentinel.api.port=8719 配置, 默認8719,隨後遞增
return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getPort();
}
private static final String SEPARATOR = "@";
}
基礎類,定義配置的基礎信息。
@Data
public class ClusterGroupEntity {
private String machineId;
private String ip;
private Integer port;
private Integer state;
}
然後是 Apollo 中的限流規則的配置和 server/client 集羣關係的配置。
需要說明一下的就是flowId
,這個是區分限流規則的全局唯一 ID,必須要有,否則集羣限流會有問題。
thresholdType
代表限流模式,默認是 0,代表單機均攤,比如這裏count
限流 QPS=20,有 3 臺機器,那麼集羣限流閾值就是 60,如果是 1 代表全局閾值,也就是count
配置的值就是集羣限流的上限。
demo_sentinel=[
{
"resource": "test_res", //限流資源名
"count": 20, //集羣限流QPS
"clusterMode": true, //true爲集羣限流模式
"clusterConfig": {
"flowId": 111, //這個必須得有,否則會有問題
"thresholdType": 1 //限流模式,默認爲0單機均攤,1是全局閾值
}
}
]
demo_cluster=[
{
"ip": "192.168.3.20",
"machineId": "192.168.3.20@8720",
"port": 9999, //server和client通信接口
"state": 1 //指定爲server
},
{
"ip": "192.168.3.20",
"machineId": "192.168.3.20@8721",
"state": 0
},
{
"ip": "192.168.3.20",
"machineId": "192.168.3.20@8722",
"state": 0
}
]
OK,到這裏代碼和配置都已經 OK,還需要跑起來 Sentinel 控制檯,這個不用教,還有啓動參數。
本地可以直接跑多個客戶端,注意修改端口號:-Dserver.port=9100 -Dcsp.sentinel.api.port=8720
這兩個一塊改,至於怎麼連 Apollo 這塊我就省略了,自己整吧,公司應該都有,不行的話用代碼裏的寫死的方式也可以用。
-Dserver.port=9100 -Dcsp.sentinel.api.port=8720 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dcsp.sentinel.log.use.pid=true
因爲有流量之後控制檯才能看到限流的情況,所以用官方給的限流測試代碼修改一下,放到 Springboot 啓動類中,觸發限流規則的初始化。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
new FlowQpsDemo();
}
}
測試限流代碼:
public class FlowQpsDemo {
private static final String KEY = "test_res";
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 32;
private static int seconds = 60 + 40;
public FlowQpsDemo() {
tick();
simulateTraffic();
}
private static void simulateTraffic() {
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
}
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + " send qps is: " + oneSecondTotal);
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
+ ", pass:" + oneSecondPass
+ ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
// stop = true;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get());
System.exit(0);
}
}
static class RunTask implements Runnable {
@Override
public void run() {
while (!stop) {
Entry entry = null;
try {
entry = SphU.entry(KEY);
// token acquired, means pass
pass.addAndGet(1);
} catch (BlockException e1) {
block.incrementAndGet();
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
Random random2 = new Random();
try {
TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
} catch (InterruptedException e) {
// ignore
}
}
}
}
}
啓動之後查看控制檯,可以看到嵌入式的集羣服務端已經啓動好。
查看限流的情況:
最後爲了測試效果,再啓動一個客戶端,修改端口號爲 9200 和 8721,可以看到新的客戶端已經連接到了服務端,不過這裏顯示的總 QPS 30000 和我們配置的不符,這個不用管他。
好了,這個就是集羣限流原理和使用配置方式,當然了,你可以啓動多臺服務,然後手動修改 Apollo 中的state
參數修改服務端,驗證修改配置的方式是否能實現故障轉移機制,另外就是關閉 client 或者 server 驗證是否回退到單機限流的情況,這裏就不一一測試了,因爲我已經測試過了呀。
對於獨立式的部署方式基本也是一樣的,只是單獨啓動一個服務端的服務,需要手動配置 server,而嵌入式的則不需要,loadServerNamespaceSet
配置爲自己的服務名稱即可。
ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();
ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig()
.setIdleSeconds(600)
.setPort(11111));
ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton(DemoConstants.APP_NAME));
tokenServer.start();
OK,這就是本期的所有內容,我是艾小仙,我們過兩個月再見。
不要問我爲啥突然復活了,只是在家隔離玩遊戲玩的有點無聊了。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/BcmsrvV2xkaBybvxvB6UMQ