從 - 99 打造 Sentinel 高可用集羣限流中間件

接上篇 Sentinel 集羣限流探索,上次簡單提到了集羣限流的原理,然後用官方給的 demo 簡單修改了一下,可以正常運行生效。

這一次需要更進一步,基於 Sentinel 實現內嵌式集羣限流的高可用方案,並且包裝成一箇中間件 starter 提供給三方使用。

對於高可用,我們主要需要解決兩個問題,這無論是使用內嵌或者獨立模式都需要解決的問題,相比而言,內嵌式模式更簡單一點。

  1. 集羣 server 自動選舉

  2. 自動故障轉移

  3. Sentinel-Dashboard 持久化到 Apollo

集羣限流

首先,考慮到大部分的服務可能都不需要集羣限流這個功能,因此實現一個註解用於手動開啓集羣限流模式,只有開啓註解的情況下,纔去實例化集羣限流的 Bean 和限流數據。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({EnableClusterImportSelector.class})
@Documented
public @interface SentinelCluster {
}

public class EnableClusterImportSelector implements DeferredImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata annotationMetadata) {
        return new String[]{ClusterConfiguration.class.getName()};
    }
}

這樣寫好之後,當掃描到有我們的 SentinelCluster 註解的時候,就會去實例化 ClusterConfiguration

@Slf4j
public class ClusterConfiguration implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
    private Environment environment;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(ClusterManager.class);
        beanDefinitionBuilder.addConstructorArgValue(this.environment);
        registry.registerBeanDefinition("clusterManager", beanDefinitionBuilder.getBeanDefinition());
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}

在配置中去實例化用於管理集羣限流的ClusterManager,這段邏輯和我們之前文章中使用到的一般無二,註冊到ApolloDataSource之後自動監聽Apollo的變化達到動態生效的效果。

@Slf4j
public class ClusterManager {
    private Environment environment;
    private String namespace;
    private static final String CLUSTER_SERVER_KEY = "sentinel.cluster.server"; //服務集羣配置
    private static final String DEFAULT_RULE_VALUE = "[]"; //集羣默認規則
    private static final String FLOW_RULE_KEY = "sentinel.flow.rules"; //限流規則
    private static final String DEGRADE_RULE_KEY = "sentinel.degrade.rules"; //降級規則
    private static final String PARAM_FLOW_RULE_KEY = "sentinel.param.rules"; //熱點限流規則
    private static final String CLUSTER_CLIENT_CONFIG_KEY = "sentinel.client.config"; //客戶端配置

    public ClusterManager(Environment environment) {
        this.environment = environment;
        this.namespace = "YourNamespace";
        init();
    }

    private void init() {
        initClientConfig();
        initClientServerAssign();
        registerRuleSupplier();
        initServerTransportConfig();
        initState();
    }

    private void initClientConfig() {
        ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new ApolloDataSource<>(
                namespace,
                CLUSTER_CLIENT_CONFIG_KEY,
                DEFAULT_SERVER_VALUE,
                source -> JacksonUtil.from(source, ClusterClientConfig.class)
        );
        ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
    }

    private void initClientServerAssign() {
        ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new ApolloDataSource<>(
                namespace,
                CLUSTER_SERVER_KEY,
                DEFAULT_SERVER_VALUE,
                new ServerAssignConverter(environment)
        );
        ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
    }

    private void registerRuleSupplier() {
        ClusterFlowRuleManager.setPropertySupplier(ns -> {
            ReadableDataSource<String, List<FlowRule>> ds = new ApolloDataSource<>(
                    namespace,
                    FLOW_RULE_KEY,
                    DEFAULT_RULE_VALUE,
                    source -> JacksonUtil.fromList(source, FlowRule.class));
            return ds.getProperty();
        });
        ClusterParamFlowRuleManager.setPropertySupplier(ns -> {
            ReadableDataSource<String, List<ParamFlowRule>> ds = new ApolloDataSource<>(
                    namespace,
                    PARAM_FLOW_RULE_KEY,
                    DEFAULT_RULE_VALUE,
                    source -> JacksonUtil.fromList(source, ParamFlowRule.class)
            );
            return ds.getProperty();
        });
    }

    private void initServerTransportConfig() {
        ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new ApolloDataSource<>(
                namespace,
                CLUSTER_SERVER_KEY,
                DEFAULT_SERVER_VALUE,
                new ServerTransportConverter(environment)
        );

        ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
    }

    private void initState() {
        ReadableDataSource<String, Integer> clusterModeDs = new ApolloDataSource<>(
                namespace,
                CLUSTER_SERVER_KEY,
                DEFAULT_SERVER_VALUE,
                new ServerStateConverter(environment)
        );

        ClusterStateManager.registerProperty(clusterModeDs.getProperty());
    }
}

這樣的話,一個集羣限流的基本功能已經差不多是 OK 了,上述步驟都比較簡單,按照官方文檔基本都能跑起來,接下來要實現文章開頭提及到的核心的幾個功能了。

自動選舉 & 故障轉移

自動選舉怎麼實現?簡單點,不用考慮那麼多,每臺機器啓動成功之後直接寫入到 Apollo 當中,第一個寫入成功的就是 Server 節點。

這個過程爲了保證併發帶來的問題,我們需要加鎖確保只有一臺機器成功寫入自己的本機信息。

由於我使用 Eureka 作爲註冊中心,Eureka 又有CacheRefreshedEvent本地緩存刷新的事件,基於此每當本地緩存刷新,我們就去檢測當前 Server 節點是否存在,然後根據實際情況去實現選舉。

首先在 spring.factories 中添加我們的監聽器。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.test.config.SentinelEurekaEventListener

監聽器只有當開啓了集羣限流注解SentinelCluster之後纔會生效。

@Configuration
@Slf4j
@ConditionalOnBean(annotation = SentinelCluster.class)
public class SentinelEurekaEventListener implements ApplicationListener<CacheRefreshedEvent> {
    @Resource
    private DiscoveryClient discoveryClient;
    @Resource
    private Environment environment;
    @Resource
    private ApolloManager apolloManager;

    @Override
    public void onApplicationEvent(EurekaClientLocalCacheRefreshedEvent event) {
        if (!leaderAlive(loadEureka(), loadApollo())) {
            boolean tryLockResult = redis.lock; //redis或者其他加分佈式鎖
            if (tryLockResult) {
                try {
                    flush();
                } catch (Exception e) {
                } finally {
                    unlock();
                }
            }
        }
    }
  
    private boolean leaderAlive(List<ClusterGroup> eurekaList, ClusterGroup server) {
        if (Objects.isNull(server)) {
            return false;
        }
        for (ClusterGroup clusterGroup : eurekaList) {
            if (clusterGroup.getMachineId().equals(server.getMachineId())) {
                return true;
            }
        }
        return false;
    }
}

OK,其實看到代碼已經知道我們把故障轉移的邏輯也實現了,其實道理是一樣的。

第一次啓動的時候 Apollo 中的 server 信息是空的,所以第一臺加鎖寫入的機器就是 server 節點,後續如果 server 宕機下線,本地註冊表緩存刷新,對比 Eureka 的實例信息和 Apollo 中的 server,如果 server 不存在,那麼就重新執行選舉的邏輯。

需要注意的是,本地緩存刷新的時間極端情況下可能會達到幾分鐘級別,那麼也就是說在服務下線的可能幾分鐘內沒有重新選舉出新的 server 節點整個集羣限流是不可用的狀態,對於業務要求非常嚴格的情況這個方案就不太適用了。

對於 Eureka 緩存時間同步的問題,可以參考之前的文章 Eureka 服務下線太慢,電話被告警打爆了

Dashboard 持久化改造

到這兒爲止,我們已經把高可用方案實現好了,接下來最後一步,只要通過 Sentinel 自帶的控制檯能夠把配置寫入到 Apollo 中,那麼應用就自然會監聽到配置的變化,達到動態生效的效果。

根據官方的描述,官方已經實現了FlowControllerV2用於集羣限流,同時在測試目錄下有簡單的案例幫助我們快速實現控制檯的持久化的邏輯。

我們只要實現DynamicRuleProvider,同時注入到Controller中使用即可,這裏我們實現flowRuleApolloProvider用於提供從 Apollo 查詢數據,flowRuleApolloPublisher用於寫入限流配置到 Apollo。

@RestController
@RequestMapping(value = "/v2/flow")
public class FlowControllerV2 {
    private final Logger logger = LoggerFactory.getLogger(FlowControllerV2.class);

    @Autowired
    private InMemoryRuleRepositoryAdapter<FlowRuleEntity> repository;

    @Autowired
    @Qualifier("flowRuleApolloProvider")
    private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;
    @Autowired
    @Qualifier("flowRuleApolloPublisher")
    private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;


}

實現方式很簡單,provider 通過 Apollo 的 open-api 從 namespace 中讀取配置,publisher 則是通過 open-api 寫入規則。

@Component("flowRuleApolloProvider")
public class FlowRuleApolloProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {

    @Autowired
    private ApolloManager apolloManager;
    @Autowired
    private Converter<String, List<FlowRuleEntity>> converter;

    @Override
    public List<FlowRuleEntity> getRules(String appName) {
        String rules = apolloManager.loadNamespaceRuleList(appName, ApolloManager.FLOW_RULES_KEY);

        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        return converter.convert(rules);
    }
}

@Component("flowRuleApolloPublisher")
public class FlowRuleApolloPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {

    @Autowired
    private ApolloManager apolloManager;
    @Autowired
    private Converter<List<FlowRuleEntity>, String> converter;

    @Override
    public void publish(String app, List<FlowRuleEntity> rules) {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        apolloManager.writeAndPublish(app, ApolloManager.FLOW_RULES_KEY, converter.convert(rules));
    }
}

ApolloManager實現了通過open-api查詢和寫入配置的能力,使用需要自行配置 Apollo Portal 地址和 token,這裏不贅述,可以自行查看 Apollo 的官方文檔。

@Component
public class ApolloManager {
    private static final String APOLLO_USERNAME = "apollo";
    public static final String FLOW_RULES_KEY = "sentinel.flow.rules";
    public static final String DEGRADE_RULES_KEY = "sentinel.degrade.rules";
    public static final String PARAM_FLOW_RULES_KEY = "sentinel.param.rules";
    public static final String APP_NAME = "YourAppName";

    @Value("${apollo.portal.url}")
    private String portalUrl;
    @Value("${apollo.portal.token}")
    private String portalToken;
    private String apolloEnv;
    private String apolloCluster = "default";
    private ApolloOpenApiClient client;

    @PostConstruct
    public void init() {
        this.client = ApolloOpenApiClient.newBuilder()
                .withPortalUrl(portalUrl)
                .withToken(portalToken)
                .build();
        this.apolloEnv = "default";
    }

    public String loadNamespaceRuleList(String appName, String ruleKey) {
        OpenNamespaceDTO openNamespaceDTO = client.getNamespace(APP_NAME, apolloEnv, apolloCluster, "default");
        return openNamespaceDTO
                .getItems()
                .stream()
                .filter(p -> p.getKey().equals(ruleKey))
                .map(OpenItemDTO::getValue)
                .findFirst()
                .orElse("");
    }

    public void writeAndPublish(String appName, String ruleKey, String value) {
        OpenItemDTO openItemDTO = new OpenItemDTO();
        openItemDTO.setKey(ruleKey);
        openItemDTO.setValue(value);
        openItemDTO.setComment("Add Sentinel Config");
        openItemDTO.setDataChangeCreatedBy(APOLLO_USERNAME);
        openItemDTO.setDataChangeLastModifiedBy(APOLLO_USERNAME);
        client.createOrUpdateItem(APP_NAME, apolloEnv, apolloCluster, "default", openItemDTO);

        NamespaceReleaseDTO namespaceReleaseDTO = new NamespaceReleaseDTO();
        namespaceReleaseDTO.setEmergencyPublish(true);
        namespaceReleaseDTO.setReleasedBy(APOLLO_USERNAME);
        namespaceReleaseDTO.setReleaseTitle("Add Sentinel Config Release");
        client.publishNamespace(APP_NAME, apolloEnv, apolloCluster, "default", namespaceReleaseDTO);
    }

}

對於其他規則,比如降級、熱點限流都可以參考此方式去修改,當然控制檯要做的修改肯定不是這一點點,比如集羣的flowId默認使用的單機自增,這個肯定需要修改,還有頁面的傳參、查詢路由的修改等等,比較繁瑣,就不在此贅述了,總歸也就是工作量的問題。

好了,本期內容就這些,我是艾小仙,我們下期見。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/0_jIshZ73FultM4scvCCfw