微服務拆分治理最佳實踐

一、背景

部門中維護了一個老系統,功能都耦合在一個單體應用中 (300 + 接口),表也放在同一個庫中 (200 + 表),導致系統存在很多風險和缺陷。經常出現問題:如數據庫的單點、性能問題,應用的擴展受限,複雜性高等問題。

從下圖可見。各業務相互耦合無明確邊界,調用關係錯綜複雜。

隨着業務快速發展,各種問題越來越明顯,急需對系統進行微服務改造優化。經過思考,整體改造將分爲三個階段進行:

二、數據庫拆分

單體數據庫的痛點:未進行業務隔離,一個慢 SQL 易導致系統整體出現問題;吞吐量高,讀寫壓力大,性能下降;

數據庫改造

根據業務劃分,我們計劃將數據庫拆分爲 9 個業務庫。數據同步方式採用主從複製的方式,並且通過 binlog 過濾將對應的表和數據同步到對應的新數據庫中。

代碼改造方案

如果一個接口中操作了多張表,之前這些表屬於同一個庫,數據庫拆分後可能會分屬於不同的庫。所以需要針對代碼進行相應的改造。

目前存在問題的位置:

改造點梳理:

梳理方式:

採用部門中的切面工具,抓取入口和表的調用關係(可識別表的讀 / 寫操作),找到一個接口中操作了多個表,並且多個表分屬於不同業務庫的情況;

分佈式事務:

進行應用拆分和數據收口之後,是不存在分佈式事務的問題的,因爲操作第二個庫會調用對應系統的 RPC 接口進行操作。所以本次不會正式支持分佈式事務,而是採用代碼邏輯保證一致性的方式來解決;

方案一

將 service 中分別操作多個庫的 mapper,抽取成多個 Service。分別添加切換數據源註解和事務註解。

問題: 改動位置多,涉及改動的每個方法都需要梳理歷史業務;service 存在很多嵌套調用的情況,有時難以理清邏輯;修改 200 + 位置改動工作量大,風險高;

方案二

如圖所示,方案二將數據源註解移動到 Mapper 上,並使用自定義的事務實現來處理事務。

將多數據源註解放到 Mapper 上的好處是,不需要梳理代碼邏輯,只需要在 Mapper 上添加對應數據源名稱即可。但是這樣又有新的問題出現,

下面將對方案中出現的兩個組件進行簡要說明原理。

多數據源組件

多數據源組件是單個應用連接多個數據源時使用的工具,其核心原理是通過配置文件將數據庫鏈接在程序啓動時初始化好,在執行到存在註解的方法時,通過切面獲取當前的數據源名稱來切換數據源,當一次調用涉及多個數據源時,會利用棧的特性解決數據源嵌套的問題。

/**
 * 切面方法
 */
public Object switchDataSourceAroundAdvice(ProceedingJoinPoint pjp) throws Throwable {
        //獲取數據源的名字
        String dsName = getDataSourceName(pjp);
        boolean dataSourceSwitched = false;
        if (StringUtils.isNotEmpty(dsName)
                && !StringUtils.equals(dsName, StackRoutingDataSource.getCurrentTargetKey())) {
            // 見下一段代碼
            StackRoutingDataSource.setTargetDs(dsName);
            dataSourceSwitched = true;
        }
        try {
            // 執行切面方法
            return pjp.proceed();
        } catch (Throwable e) {
            throw e;
        } finally {
            if (dataSourceSwitched) {
                StackRoutingDataSource.clear();
            }
        }
    }
public static void setTargetDs(String dbName) {
    if (dbName == null) {
  throw new NullPointerException();
    }
    if (contextHolder.get() == null) {
        contextHolder.set(new Stack<String>());
    }
    contextHolder.get().push(dbName);
    log.debug("set current datasource is " + dbName);
}

StackRoutingDataSource 繼承 AbstractRoutingDataSource 類,AbstractRoutingDataSource 是 spring-jdbc 包提供的一個了 AbstractDataSource 的抽象類,它實現了 DataSource 接口的用於獲取數據庫鏈接的方法。

自定義事務實現

從方案二的圖中可以看到默認的事務實現使用的是 mybatis 的 SpringManagedTransaction。

如上圖,Transaction 和 SpringManagedTransaction 都是 mybatis 提供的類,他提供了接口供 SqlSession 使用,處理事務操作。

通過下邊的一段代碼可以看到,事務對象中存在 connection 變量,首次獲得數據庫鏈接後,後續當前事務內的所有數據庫操作都不會重新獲取數據庫鏈接,而是會使用現有的數據庫鏈接,從而無法支持跨庫操作。

public class SpringManagedTransaction implements Transaction {

  private static final Log LOGGER = LogFactory.getLog(SpringManagedTransaction.class);

  private final DataSource dataSource;

  private Connection connection;

  private boolean isConnectionTransactional;

  private boolean autoCommit;

  public SpringManagedTransaction(DataSource dataSource) {
    notNull(dataSource, "No DataSource specified");
    this.dataSource = dataSource;
  }
  // 下略
}

MultiDataSourceManagedTransaction 是我們自定義的事務實現,繼承自 SpringManagedTransaction 類,並在內部支持維護多個數據庫鏈接。每次執行數據庫操作時,會根據數據源名稱判斷,如果當前數據源沒有緩存的鏈接則重新獲取鏈接。這樣,service 上的事務註解其實控制了多個單庫事務,且作用域範圍相同,一起進行提交或回滾。

代碼如下:

public class MultiDataSourceManagedTransaction extends SpringManagedTransaction {
    private DataSource dataSource;

    public ConcurrentHashMap<String, Connection> CON_MAP = new ConcurrentHashMap<>();


    public MultiDataSourceManagedTransaction(DataSource dataSource) {
        super(dataSource);
        this.dataSource = dataSource;
    }

    @Override
    public Connection getConnection() throws SQLException {
        Method getCurrentTargetKey;
        String dataSourceKey;
        try {
            getCurrentTargetKey = dataSource.getClass().getDeclaredMethod("getCurrentTargetKey");
            getCurrentTargetKey.setAccessible(true);
            dataSourceKey = (String) getCurrentTargetKey.invoke(dataSource);
        } catch (Exception e) {
            log.error("MultiDataSourceManagedTransaction invoke getCurrentTargetKey 異常", e);
            return null;
        }

        if (CON_MAP.get(dataSourceKey) == null) {
            Connection connection = dataSource.getConnection();
            if (!TransactionSynchronizationManager.isActualTransactionActive()) {
                connection.setAutoCommit(true);
            } else {
                connection.setAutoCommit(false);
            }
            CON_MAP.put(dataSourceKey, connection);
            return connection;
        }

        return CON_MAP.get(dataSourceKey);
    }

    @Override
    public void commit() throws SQLException {
        if (CON_MAP == null || CON_MAP.size() == 0) {
            return;
        }
        Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
        for (Map.Entry<String, Connection> entry : entries) {
            Connection value = entry.getValue();
            if (!value.isClosed() && !value.getAutoCommit()) {
                value.commit();
            }
        }
    }

    @Override
    public void rollback() throws SQLException {
        if (CON_MAP == null || CON_MAP.size() == 0) {
            return;
        }
        Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
        for (Map.Entry<String, Connection> entry : entries) {
            Connection value = entry.getValue();
            if (value == null) {
                continue;
            }
            if (!value.isClosed() && !value.getAutoCommit()) {
                entry.getValue().rollback();
            }
        }
    }

    @Override
    public void close() throws SQLException {
        if (CON_MAP == null || CON_MAP.size() == 0) {
            return;
        }
        Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
        for (Map.Entry<String, Connection> entry : entries) {
            DataSourceUtils.releaseConnection(entry.getValue(), this.dataSource);
        }
        CON_MAP.clear();
    }
}

注:上面並不是分佈式事務。在數據收口之前,它只存在於同一個 JVM 中。如果項目允許,可以考慮使用 Atomikos 和 Mybatis 整合的方案。

數據安全性

本次進行了很多代碼改造,如何保證數據安全,保證數據不丟失,我們的機制如下,分爲三種情況進行討論:

綜上,通過對三種情況的處理來保證數據的安全性。

三、應用拆分

系統接近單體架構,存在以下風險:

  1. 系統性風險:一個組件缺陷會導致整個進程崩潰,如內存泄漏、死鎖。

  2. 複雜性高:系統代碼繁多,每次修改代碼都心驚膽戰,任何一個 bug 都可能導致整個系統崩潰,不敢優化代碼導致代碼可讀性也越來越差。

  3. 測試環境衝突,測試效率低:業務都耦合在一個系統,只要有需求就會出現環境搶佔,需要額外拉分支合併代碼。

拆分方案

與數據庫拆分相同,系統拆分也是根據業務劃分拆成 9 個新系統。

方案一:搭建空的新系統,然後將老系統的相關代碼挪到新系統。

方案二:從老系統原樣複製出 9 個新系統,然後直接上線,通過流量路由將老系統流量轉發到新系統,後續再對新系統的冗餘代碼做刪減。

我們在考慮拆分風險和拆分效率後,最終選擇了方案二。

拆分實踐

1、搭建新系統

直接複製老系統代碼,修改系統名稱,部署即可

2、流量路由

路由器是拆分的核心,負責分發流量到新系統,同時需要支持識別測試流量,讓測試同學可以提前在線上測試新系統。我們這邊用 filter 來作爲路由器的,源碼見下方。

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws ServletException, IOException {
    HttpServletRequest servletRequest = (HttpServletRequest) request;
    HttpServletResponse servletResponse = (HttpServletResponse) response;

    // 路由開關(0-不路由, 1-根據指定請求頭路由, 2-全量路由)
    final int systemRouteSwitch = configUtils.getInteger("system_route_switch", 1);
    if (systemRouteSwitch == 0) {
        filterChain.doFilter(request, response);
        return;
    }
    // 只路由測試流量
    if (systemRouteSwitch == 1) {
        // 檢查請求頭是否包含測試流量標識 包含才進行路由
        String systemRoute = ((HttpServletRequest) request).getHeader("systemRoute");
        if (systemRoute == null || !systemRoute.equals("1")) {
            filterChain.doFilter(request, response);
            return;
        }
    }

    String systemRouteMapJsonStr = configUtils.getString("route.map", "");
    Map<String, String> map = JSONObject.parseObject(systemRouteMapJsonStr, Map.class);
    String rootUrl = map.get(servletRequest.getRequestURI());

    if (StringUtils.isEmpty(rootUrl)) {
        log.error("路由失敗,本地服務內部處理。原因:請求地址映射不到對應系統, uri : {}", servletRequest.getRequestURI());
        filterChain.doFilter(request, response);
        return;
    }

    String targetURL = rootUrl + servletRequest.getRequestURI();
    if (servletRequest.getQueryString() != null) {
        targetURL = targetURL + "?" + servletRequest.getQueryString();
    }
    RequestEntity<byte[]> requestEntity = null;
    try {
        log.info("路由開始 targetURL = {}", targetURL);
        requestEntity = createRequestEntity(servletRequest, targetURL);
        ResponseEntity responseEntity = restTemplate.exchange(requestEntity, byte[].class);
        if (requestEntity != null && requestEntity.getBody() != null && requestEntity.getBody().length > 0) {
            log.info("路由完成-請求信息: requestEntity = {}, body = {}", requestEntity.toString(), new String(requestEntity.getBody()));
        } else {
            log.info("路由完成-請求信息: requestEntity = {}", requestEntity != null ? requestEntity.toString() : targetURL);
        }

        HttpHeaders headers = responseEntity.getHeaders();
        String resp = null;
        if (responseEntity.getBody() != null && headers != null && headers.get("Content-Encoding") != null && headers.get("Content-Encoding").contains("gzip")) {
            byte[] bytes = new byte[30 * 1024];
            int len = new GZIPInputStream(new ByteArrayInputStream((byte[]) responseEntity.getBody())).read(bytes, 0, bytes.length);
            resp = new String(bytes, 0, len);
        }

        log.info("路由完成-響應信息: targetURL = {}, headers = {}, resp = {}", targetURL, JSON.toJSONString(headers), resp);
        if (headers != null && headers.containsKey("Location") && CollectionUtils.isNotEmpty(headers.get("Location"))) {
            log.info("路由完成-需要重定向到 {}", headers.get("Location").get(0));
            ((HttpServletResponse) response).sendRedirect(headers.get("Location").get(0));
        }
        addResponseHeaders(servletRequest, servletResponse, responseEntity);
        writeResponse(servletResponse, responseEntity);
    } catch (Exception e) {
        if (requestEntity != null && requestEntity.getBody() != null && requestEntity.getBody().length > 0) {
            log.error("路由異常-請求信息: requestEntity = {}, body = {}", requestEntity.toString(), new String(requestEntity.getBody()), e);
        } else {
            log.error("路由異常-請求信息: requestEntity = {}", requestEntity != null ? requestEntity.toString() : targetURL, e);
        }
        response.setCharacterEncoding("UTF-8");
        ((HttpServletResponse) response).addHeader("Content-Type", "application/json");
        response.getWriter().write(JSON.toJSONString(ApiResponse.failed("9999", "網絡繁忙哦~,請您稍後重試")));
    }
}

3、接口抓取 & 歸類

路由 filter 是根據接口路徑將請求分發到各個新系統的,所以需要抓取一份接口和新系統的映射關係。

我們這邊自定義了一個註解 @TargetSystem,用註解標識接口應該路由到的目標系統域名

@TargetSystem(value = "http://order.demo.com")
@GetMapping("/order/info")
public ApiResponse orderInfo(String orderId) {
    return ApiResponse.success();
}

歷獲取所有 controller 根據接口地址和註解生成路由映射關係 map

    /**
     * 生成路由映射關係MAP
     * key:接口地址 ,value:路由到目標新系統的域名
     */
    public Map<String, String> generateRouteMap() {
        Map<RequestMappingInfo, HandlerMethod> handlerMethods = requestMappingHandlerMapping.getHandlerMethods();
        Set<Map.Entry<RequestMappingInfo, HandlerMethod>> entries = handlerMethods.entrySet();
        Map<String, String> map = new HashMap<>();
        for (Map.Entry<RequestMappingInfo, HandlerMethod> entry : entries) {
            RequestMappingInfo key = entry.getKey();
            HandlerMethod value = entry.getValue();
            Class declaringClass = value.getMethod().getDeclaringClass();
            TargetSystem targetSystem = (TargetSystem) declaringClass.getAnnotation(TargetSystem.class);
            String targetUrl = targetSystem.value();
            String s1 = key.getPatternsCondition().toString();
            String url = s1.substring(1, s1.length() - 1);
            map.put(url, targetUrl);
        }
        return map;
    }

4、測試流量識別

測試可以用利用抓包工具 charles,爲每個請求都添加固定的請求頭,也就是測試流量標識,路由器攔截請求後判斷請求頭內是否包含測試流量標,包含就路由到新系統,不包含就是線上流量留在老系統執行。

5、需求代碼合併

執行系統拆分的過程中,還是有需求正在並行開發,並且需求代碼是寫在老系統的,系統拆分完成上線後,需要將這部分需求的代碼合併到新系統,同時要保證 git 版本記錄不能丟失,那應該怎麼做呢?

我們利用了 git 可以添加多個多個遠程倉庫來解決需求合併的痛點,命令:git remote add origin 倉庫地址,把新系統的 git 倉庫地址添加爲老系統 git 的遠程倉庫,老系統的 git 變動就可以同時 push 到所有新系統的倉庫內,新系統 pull 下代碼後進行合併。

6、上線風險

風險一:JOB 在新老系統並行執行。新系統是複製的老系統,JOB 也會複製過來,導致新老系統有相同的 JOB,如果這時候上線新系統,新系統的 JOB 就會執行,老系統的 JOB 也一直在 run,這樣一個 JOB 就會執行 2 次。新系統剛上線還沒經過測試驗證,這時候執行 JOB 是有可能失敗的。以上 2 種情況都會引起線上 Bug,影響系統穩定性。

風險二:新系統提前消費 MQ。和風險一一樣,新系統監聽和老系統一樣的 topic,如果新系統直接上線,消息是有可能被新系統消費的,新系統剛上線還沒經過測試驗證,消費消息有可能會出異常,造成消息丟失或其他問題,影響系統穩定性。

如何解決以上 2 個上線風險呢?

我們用 “動態開關” 解決了上述風險,爲新老系統的 JOB 和 MQ 都加了開關,用開關控制 JOB 和 MQ 在新 / 老系統執行。上線後新系統的 JOB 和 MQ 都是關掉的,待 QA 測試通過後,把老系統的 JOB 和 MQ 關掉,把新系統的 JOB 和 MQ 打開就可以了。

系統瘦身

拆分的時候已經梳理出了一份 “入口映射關係 map”,每個新系統只需要保留自己系統負責的接口、JOB、MQ 代碼就可以了,除此之外都可以進行刪除。

拆分帶來的好處

  1. 系統架構更合理,可用性更高:即使某個服務掛了也不會導致整個系統崩潰

  2. 複雜性可控:每個系統都是單一職責,系統邏輯清晰

  3. 系統性能提升上限大:可以針對每個系統做優化,如加緩存

  4. 測試環境衝突的問題解決,不會因爲多個系統需求並行而搶佔環境

四、數據訪問權限接口

問題介紹

數據訪問權限未收口:一個業務的數據庫被其餘業務應用直接訪問,未通過 rpc 接口將數據訪問權限收口到數據擁有方自己的應用。數據訪問邏輯分散,存在業務耦合,阻礙後續迭代和優化。

問題產生的背景:之前是單體應用和單體數據庫,未進行業務隔離。在進行數據庫拆分和系統拆分時,爲解決系統穩定性的問題需快速上線,所以未優化拆分後跨業務訪問數據庫的情況。本階段是對數據庫拆分和應用拆分的延伸和補充。

改造過程

  1. RPC 接口統計(如圖一)

進行比對,如程序入口歸類和調用的業務 DB 歸類不一致,則認爲 Dao 方法需提供 RPC 接口

經統計,應用訪問非本業務數據庫的位置有 260+。由於涉及位置多,人工改造成本高、效率較低,且有錯改和漏掉的風險,我們採用了開發工具,用工具進行代碼生成和批量修改的方式進行改造。

  1. RPC 接口生成(如圖二)

名詞解釋:

  1. 灰度方案(如圖三)

收益

  1. 業務數據解耦,數據操作統一由各自垂直系統進行,入口統一

  2. 方便後續在接口粒度上增加緩存和降級處理

五、總結

以上,是我們對單體系統的改造過程,經過了三步優化、上線,將單體系統平滑過渡到了微服務結構。解決了數據庫的單點問題、性能問題,應用業務得到了簡化,更利於分工,迭代。並且可以針對各業務單獨進行優化升級,擴容、縮容,提升了資源的利用率。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Orbm7viIOCW--OkG3SpwOg