微服務拆分治理最佳實踐
一、背景
部門中維護了一個老系統,功能都耦合在一個單體應用中 (300 + 接口),表也放在同一個庫中 (200 + 表),導致系統存在很多風險和缺陷。經常出現問題:如數據庫的單點、性能問題,應用的擴展受限,複雜性高等問題。
從下圖可見。各業務相互耦合無明確邊界,調用關係錯綜複雜。
隨着業務快速發展,各種問題越來越明顯,急需對系統進行微服務改造優化。經過思考,整體改造將分爲三個階段進行:
-
數據庫拆分:數據庫按照業務垂直拆分。
-
應用拆分:應用按照業務垂直拆分。
-
數據訪問權限收口:數據權限按照各自業務領域,歸屬到各自的應用,應用與數據庫一對一,禁止交叉訪問。
二、數據庫拆分
單體數據庫的痛點:未進行業務隔離,一個慢 SQL 易導致系統整體出現問題;吞吐量高,讀寫壓力大,性能下降;
數據庫改造
根據業務劃分,我們計劃將數據庫拆分爲 9 個業務庫。數據同步方式採用主從複製的方式,並且通過 binlog 過濾將對應的表和數據同步到對應的新數據庫中。
代碼改造方案
如果一個接口中操作了多張表,之前這些表屬於同一個庫,數據庫拆分後可能會分屬於不同的庫。所以需要針對代碼進行相應的改造。
目前存在問題的位置:
-
數據源選擇:系統之前是支持多數據源切換的,在 service 上添加註解來選擇數據源。數據庫拆分後出現的情況是同一個 service 中操作的多個 mapper 從屬於不同的庫。
-
事務:事務註解目前是存在於 service 上的,並且事務會緩存數據庫鏈接,一個事務內不支持同時操作多個數據庫。
改造點梳理:
-
同時寫入多個庫,且是同一事務的接口 6 個:需改造數據源,需改造事務,需要關注分佈式事務;
-
同時寫入多個庫,且不是同一事務的接口 50+:需改造數據源,需改造事務,無需關注分佈式事務;
-
同時讀取多個庫 或 讀取一個庫寫入另一個庫的接口 200+:需改造數據源,但無需關注事務;
-
涉及多個庫的表的聯合查詢 8 個:需進行代碼邏輯改造
梳理方式:
採用部門中的切面工具,抓取入口和表的調用關係(可識別表的讀 / 寫操作),找到一個接口中操作了多個表,並且多個表分屬於不同業務庫的情況;
分佈式事務:
進行應用拆分和數據收口之後,是不存在分佈式事務的問題的,因爲操作第二個庫會調用對應系統的 RPC 接口進行操作。所以本次不會正式支持分佈式事務,而是採用代碼邏輯保證一致性的方式來解決;
方案一
將 service 中分別操作多個庫的 mapper,抽取成多個 Service。分別添加切換數據源註解和事務註解。
問題: 改動位置多,涉及改動的每個方法都需要梳理歷史業務;service 存在很多嵌套調用的情況,有時難以理清邏輯;修改 200 + 位置改動工作量大,風險高;
方案二
如圖所示,方案二將數據源註解移動到 Mapper 上,並使用自定義的事務實現來處理事務。
將多數據源註解放到 Mapper 上的好處是,不需要梳理代碼邏輯,只需要在 Mapper 上添加對應數據源名稱即可。但是這樣又有新的問題出現,
-
問題 1:如上圖,事務的是配置在 Service 層,當事務開啓時,數據源的連接並沒有獲取到,因爲真正的數據源配置在 Mapper 上。所以會報錯,這個錯誤可以通過多數據源組件的默認數據源功能解決。
-
問題 2:mybatis 的事務實現會緩存數據庫鏈接。當第一次緩存了數據庫鏈接後,後續配置在 mapper 上的數據源註解並不會重新獲取數據庫鏈接,而是直接使用緩存起來的數據庫鏈接。如果後續的 mapper 要操作其餘數據庫,會出現找不到表的情況。鑑於以上問題,我們開發了一個自定義的事務實現類,用來解決這個問題。
下面將對方案中出現的兩個組件進行簡要說明原理。
多數據源組件
多數據源組件是單個應用連接多個數據源時使用的工具,其核心原理是通過配置文件將數據庫鏈接在程序啓動時初始化好,在執行到存在註解的方法時,通過切面獲取當前的數據源名稱來切換數據源,當一次調用涉及多個數據源時,會利用棧的特性解決數據源嵌套的問題。
/**
* 切面方法
*/
public Object switchDataSourceAroundAdvice(ProceedingJoinPoint pjp) throws Throwable {
//獲取數據源的名字
String dsName = getDataSourceName(pjp);
boolean dataSourceSwitched = false;
if (StringUtils.isNotEmpty(dsName)
&& !StringUtils.equals(dsName, StackRoutingDataSource.getCurrentTargetKey())) {
// 見下一段代碼
StackRoutingDataSource.setTargetDs(dsName);
dataSourceSwitched = true;
}
try {
// 執行切面方法
return pjp.proceed();
} catch (Throwable e) {
throw e;
} finally {
if (dataSourceSwitched) {
StackRoutingDataSource.clear();
}
}
}
public static void setTargetDs(String dbName) {
if (dbName == null) {
throw new NullPointerException();
}
if (contextHolder.get() == null) {
contextHolder.set(new Stack<String>());
}
contextHolder.get().push(dbName);
log.debug("set current datasource is " + dbName);
}
StackRoutingDataSource 繼承 AbstractRoutingDataSource 類,AbstractRoutingDataSource 是 spring-jdbc 包提供的一個了 AbstractDataSource 的抽象類,它實現了 DataSource 接口的用於獲取數據庫鏈接的方法。
自定義事務實現
從方案二的圖中可以看到默認的事務實現使用的是 mybatis 的 SpringManagedTransaction。
如上圖,Transaction 和 SpringManagedTransaction 都是 mybatis 提供的類,他提供了接口供 SqlSession 使用,處理事務操作。
通過下邊的一段代碼可以看到,事務對象中存在 connection 變量,首次獲得數據庫鏈接後,後續當前事務內的所有數據庫操作都不會重新獲取數據庫鏈接,而是會使用現有的數據庫鏈接,從而無法支持跨庫操作。
public class SpringManagedTransaction implements Transaction {
private static final Log LOGGER = LogFactory.getLog(SpringManagedTransaction.class);
private final DataSource dataSource;
private Connection connection;
private boolean isConnectionTransactional;
private boolean autoCommit;
public SpringManagedTransaction(DataSource dataSource) {
notNull(dataSource, "No DataSource specified");
this.dataSource = dataSource;
}
// 下略
}
MultiDataSourceManagedTransaction 是我們自定義的事務實現,繼承自 SpringManagedTransaction 類,並在內部支持維護多個數據庫鏈接。每次執行數據庫操作時,會根據數據源名稱判斷,如果當前數據源沒有緩存的鏈接則重新獲取鏈接。這樣,service 上的事務註解其實控制了多個單庫事務,且作用域範圍相同,一起進行提交或回滾。
代碼如下:
public class MultiDataSourceManagedTransaction extends SpringManagedTransaction {
private DataSource dataSource;
public ConcurrentHashMap<String, Connection> CON_MAP = new ConcurrentHashMap<>();
public MultiDataSourceManagedTransaction(DataSource dataSource) {
super(dataSource);
this.dataSource = dataSource;
}
@Override
public Connection getConnection() throws SQLException {
Method getCurrentTargetKey;
String dataSourceKey;
try {
getCurrentTargetKey = dataSource.getClass().getDeclaredMethod("getCurrentTargetKey");
getCurrentTargetKey.setAccessible(true);
dataSourceKey = (String) getCurrentTargetKey.invoke(dataSource);
} catch (Exception e) {
log.error("MultiDataSourceManagedTransaction invoke getCurrentTargetKey 異常", e);
return null;
}
if (CON_MAP.get(dataSourceKey) == null) {
Connection connection = dataSource.getConnection();
if (!TransactionSynchronizationManager.isActualTransactionActive()) {
connection.setAutoCommit(true);
} else {
connection.setAutoCommit(false);
}
CON_MAP.put(dataSourceKey, connection);
return connection;
}
return CON_MAP.get(dataSourceKey);
}
@Override
public void commit() throws SQLException {
if (CON_MAP == null || CON_MAP.size() == 0) {
return;
}
Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
for (Map.Entry<String, Connection> entry : entries) {
Connection value = entry.getValue();
if (!value.isClosed() && !value.getAutoCommit()) {
value.commit();
}
}
}
@Override
public void rollback() throws SQLException {
if (CON_MAP == null || CON_MAP.size() == 0) {
return;
}
Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
for (Map.Entry<String, Connection> entry : entries) {
Connection value = entry.getValue();
if (value == null) {
continue;
}
if (!value.isClosed() && !value.getAutoCommit()) {
entry.getValue().rollback();
}
}
}
@Override
public void close() throws SQLException {
if (CON_MAP == null || CON_MAP.size() == 0) {
return;
}
Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
for (Map.Entry<String, Connection> entry : entries) {
DataSourceUtils.releaseConnection(entry.getValue(), this.dataSource);
}
CON_MAP.clear();
}
}
注:上面並不是分佈式事務。在數據收口之前,它只存在於同一個 JVM 中。如果項目允許,可以考慮使用 Atomikos 和 Mybatis 整合的方案。
數據安全性
本次進行了很多代碼改造,如何保證數據安全,保證數據不丟失,我們的機制如下,分爲三種情況進行討論:
-
跨庫事務:6 處,採用了代碼保證一致性的改造方式;上線前經過重點測試,保證邏輯無問題;
-
單庫事務:依賴於自定義事務實現,針對自定義事務實現這一個類進行充分測試即可,測試範圍小,安全性有保障;
-
其餘單表操作:相關修改是在 mapper 上添加了數據源切換註解,改動位置幾百處,幾乎是無腦改動,但也存在遺漏或錯改的可能;測試同學可以覆蓋到核心業務流程,但邊緣業務可能會遺漏;我們添加了線上監測機制,當出現找不到表的錯誤時(說明數據源切換註解添加錯誤),記錄當前執行 sql 並報警,我們進行邏輯修復與數據處理。
綜上,通過對三種情況的處理來保證數據的安全性。
三、應用拆分
系統接近單體架構,存在以下風險:
-
系統性風險:一個組件缺陷會導致整個進程崩潰,如內存泄漏、死鎖。
-
複雜性高:系統代碼繁多,每次修改代碼都心驚膽戰,任何一個 bug 都可能導致整個系統崩潰,不敢優化代碼導致代碼可讀性也越來越差。
-
測試環境衝突,測試效率低:業務都耦合在一個系統,只要有需求就會出現環境搶佔,需要額外拉分支合併代碼。
拆分方案
與數據庫拆分相同,系統拆分也是根據業務劃分拆成 9 個新系統。
方案一:搭建空的新系統,然後將老系統的相關代碼挪到新系統。
-
優點:一步到位。
-
缺點:需要主觀挑選代碼,然後挪到新系統,可視爲做了全量業務邏輯的變動,需要全量測試,風險高,週期長。
方案二:從老系統原樣複製出 9 個新系統,然後直接上線,通過流量路由將老系統流量轉發到新系統,後續再對新系統的冗餘代碼做刪減。
-
優點:拆分速度快, 首次上線前無業務邏輯改動,風險低;後續刪減代碼時依據接口調用量情況來判定,也可視爲無業務邏輯的改動,風險較低,並且各系統可各自進行,無需整體排期, 較爲靈活。
-
缺點:分爲了兩步,拆分上線和刪減代碼
我們在考慮拆分風險和拆分效率後,最終選擇了方案二。
拆分實踐
1、搭建新系統
直接複製老系統代碼,修改系統名稱,部署即可
2、流量路由
路由器是拆分的核心,負責分發流量到新系統,同時需要支持識別測試流量,讓測試同學可以提前在線上測試新系統。我們這邊用 filter 來作爲路由器的,源碼見下方。
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws ServletException, IOException {
HttpServletRequest servletRequest = (HttpServletRequest) request;
HttpServletResponse servletResponse = (HttpServletResponse) response;
// 路由開關(0-不路由, 1-根據指定請求頭路由, 2-全量路由)
final int systemRouteSwitch = configUtils.getInteger("system_route_switch", 1);
if (systemRouteSwitch == 0) {
filterChain.doFilter(request, response);
return;
}
// 只路由測試流量
if (systemRouteSwitch == 1) {
// 檢查請求頭是否包含測試流量標識 包含才進行路由
String systemRoute = ((HttpServletRequest) request).getHeader("systemRoute");
if (systemRoute == null || !systemRoute.equals("1")) {
filterChain.doFilter(request, response);
return;
}
}
String systemRouteMapJsonStr = configUtils.getString("route.map", "");
Map<String, String> map = JSONObject.parseObject(systemRouteMapJsonStr, Map.class);
String rootUrl = map.get(servletRequest.getRequestURI());
if (StringUtils.isEmpty(rootUrl)) {
log.error("路由失敗,本地服務內部處理。原因:請求地址映射不到對應系統, uri : {}", servletRequest.getRequestURI());
filterChain.doFilter(request, response);
return;
}
String targetURL = rootUrl + servletRequest.getRequestURI();
if (servletRequest.getQueryString() != null) {
targetURL = targetURL + "?" + servletRequest.getQueryString();
}
RequestEntity<byte[]> requestEntity = null;
try {
log.info("路由開始 targetURL = {}", targetURL);
requestEntity = createRequestEntity(servletRequest, targetURL);
ResponseEntity responseEntity = restTemplate.exchange(requestEntity, byte[].class);
if (requestEntity != null && requestEntity.getBody() != null && requestEntity.getBody().length > 0) {
log.info("路由完成-請求信息: requestEntity = {}, body = {}", requestEntity.toString(), new String(requestEntity.getBody()));
} else {
log.info("路由完成-請求信息: requestEntity = {}", requestEntity != null ? requestEntity.toString() : targetURL);
}
HttpHeaders headers = responseEntity.getHeaders();
String resp = null;
if (responseEntity.getBody() != null && headers != null && headers.get("Content-Encoding") != null && headers.get("Content-Encoding").contains("gzip")) {
byte[] bytes = new byte[30 * 1024];
int len = new GZIPInputStream(new ByteArrayInputStream((byte[]) responseEntity.getBody())).read(bytes, 0, bytes.length);
resp = new String(bytes, 0, len);
}
log.info("路由完成-響應信息: targetURL = {}, headers = {}, resp = {}", targetURL, JSON.toJSONString(headers), resp);
if (headers != null && headers.containsKey("Location") && CollectionUtils.isNotEmpty(headers.get("Location"))) {
log.info("路由完成-需要重定向到 {}", headers.get("Location").get(0));
((HttpServletResponse) response).sendRedirect(headers.get("Location").get(0));
}
addResponseHeaders(servletRequest, servletResponse, responseEntity);
writeResponse(servletResponse, responseEntity);
} catch (Exception e) {
if (requestEntity != null && requestEntity.getBody() != null && requestEntity.getBody().length > 0) {
log.error("路由異常-請求信息: requestEntity = {}, body = {}", requestEntity.toString(), new String(requestEntity.getBody()), e);
} else {
log.error("路由異常-請求信息: requestEntity = {}", requestEntity != null ? requestEntity.toString() : targetURL, e);
}
response.setCharacterEncoding("UTF-8");
((HttpServletResponse) response).addHeader("Content-Type", "application/json");
response.getWriter().write(JSON.toJSONString(ApiResponse.failed("9999", "網絡繁忙哦~,請您稍後重試")));
}
}
3、接口抓取 & 歸類
路由 filter 是根據接口路徑將請求分發到各個新系統的,所以需要抓取一份接口和新系統的映射關係。
我們這邊自定義了一個註解 @TargetSystem,用註解標識接口應該路由到的目標系統域名
@TargetSystem(value = "http://order.demo.com")
@GetMapping("/order/info")
public ApiResponse orderInfo(String orderId) {
return ApiResponse.success();
}
歷獲取所有 controller 根據接口地址和註解生成路由映射關係 map
/**
* 生成路由映射關係MAP
* key:接口地址 ,value:路由到目標新系統的域名
*/
public Map<String, String> generateRouteMap() {
Map<RequestMappingInfo, HandlerMethod> handlerMethods = requestMappingHandlerMapping.getHandlerMethods();
Set<Map.Entry<RequestMappingInfo, HandlerMethod>> entries = handlerMethods.entrySet();
Map<String, String> map = new HashMap<>();
for (Map.Entry<RequestMappingInfo, HandlerMethod> entry : entries) {
RequestMappingInfo key = entry.getKey();
HandlerMethod value = entry.getValue();
Class declaringClass = value.getMethod().getDeclaringClass();
TargetSystem targetSystem = (TargetSystem) declaringClass.getAnnotation(TargetSystem.class);
String targetUrl = targetSystem.value();
String s1 = key.getPatternsCondition().toString();
String url = s1.substring(1, s1.length() - 1);
map.put(url, targetUrl);
}
return map;
}
4、測試流量識別
測試可以用利用抓包工具 charles,爲每個請求都添加固定的請求頭,也就是測試流量標識,路由器攔截請求後判斷請求頭內是否包含測試流量標,包含就路由到新系統,不包含就是線上流量留在老系統執行。
5、需求代碼合併
執行系統拆分的過程中,還是有需求正在並行開發,並且需求代碼是寫在老系統的,系統拆分完成上線後,需要將這部分需求的代碼合併到新系統,同時要保證 git 版本記錄不能丟失,那應該怎麼做呢?
我們利用了 git 可以添加多個多個遠程倉庫來解決需求合併的痛點,命令:git remote add origin 倉庫地址,把新系統的 git 倉庫地址添加爲老系統 git 的遠程倉庫,老系統的 git 變動就可以同時 push 到所有新系統的倉庫內,新系統 pull 下代碼後進行合併。
6、上線風險
風險一:JOB 在新老系統並行執行。新系統是複製的老系統,JOB 也會複製過來,導致新老系統有相同的 JOB,如果這時候上線新系統,新系統的 JOB 就會執行,老系統的 JOB 也一直在 run,這樣一個 JOB 就會執行 2 次。新系統剛上線還沒經過測試驗證,這時候執行 JOB 是有可能失敗的。以上 2 種情況都會引起線上 Bug,影響系統穩定性。
風險二:新系統提前消費 MQ。和風險一一樣,新系統監聽和老系統一樣的 topic,如果新系統直接上線,消息是有可能被新系統消費的,新系統剛上線還沒經過測試驗證,消費消息有可能會出異常,造成消息丟失或其他問題,影響系統穩定性。
如何解決以上 2 個上線風險呢?
我們用 “動態開關” 解決了上述風險,爲新老系統的 JOB 和 MQ 都加了開關,用開關控制 JOB 和 MQ 在新 / 老系統執行。上線後新系統的 JOB 和 MQ 都是關掉的,待 QA 測試通過後,把老系統的 JOB 和 MQ 關掉,把新系統的 JOB 和 MQ 打開就可以了。
系統瘦身
拆分的時候已經梳理出了一份 “入口映射關係 map”,每個新系統只需要保留自己系統負責的接口、JOB、MQ 代碼就可以了,除此之外都可以進行刪除。
拆分帶來的好處
-
系統架構更合理,可用性更高:即使某個服務掛了也不會導致整個系統崩潰
-
複雜性可控:每個系統都是單一職責,系統邏輯清晰
-
系統性能提升上限大:可以針對每個系統做優化,如加緩存
-
測試環境衝突的問題解決,不會因爲多個系統需求並行而搶佔環境
四、數據訪問權限接口
問題介紹
數據訪問權限未收口:一個業務的數據庫被其餘業務應用直接訪問,未通過 rpc 接口將數據訪問權限收口到數據擁有方自己的應用。數據訪問邏輯分散,存在業務耦合,阻礙後續迭代和優化。
問題產生的背景:之前是單體應用和單體數據庫,未進行業務隔離。在進行數據庫拆分和系統拆分時,爲解決系統穩定性的問題需快速上線,所以未優化拆分後跨業務訪問數據庫的情況。本階段是對數據庫拆分和應用拆分的延伸和補充。
改造過程
- RPC 接口統計(如圖一)
進行比對,如程序入口歸類和調用的業務 DB 歸類不一致,則認爲 Dao 方法需提供 RPC 接口
經統計,應用訪問非本業務數據庫的位置有 260+。由於涉及位置多,人工改造成本高、效率較低,且有錯改和漏掉的風險,我們採用了開發工具,用工具進行代碼生成和批量修改的方式進行改造。
- RPC 接口生成(如圖二)
-
讀取需要生成 RPC 接口的 Dao 文件,進行解析
-
獲取文件名稱,Dao 方法列表,import 導包列表等,放入 ClassContext 上下文
-
匹配 api、rpc 文件模板,從 classContext 內取值替換模板變量,通過 package 路徑生成 java 文件到指定服務內
-
批量將服務內 Dao 名稱後綴替換爲 Rpc 服務名,減少人工改動風險,例:SettleRuleDao -> SettleRuleRpc
名詞解釋:
-
ftl:Freemarker 模板的文件後綴名,FreeMarker 是一個模版引擎,一個基於文本的模板輸出工具。
-
interfaceName:用存放 api 文件名稱
-
className:用於存放 serviceImpl 文件名稱
-
methodList:用於存放方法列表,包含入參、出參、返回值等信息
-
importList:用於存放 api 和 impl 文件內其他引用實體的導包路徑
-
apiPackage:用於存放生成的 Api 接口類包名
-
implPackage:用於存放生成的 Api 實現類包名
-
rpcPackage:用於存放生成的 rpc 調用類包名
- 灰度方案(如圖三)
-
數據操作統一走 RPC 層處理,初期階段 RPC 層兼顧 RPC 調用,也有之前的 DAO 調用,使用開關切換。
-
RPC 層進行雙讀,進行 Api 層和 Dao 層返回結果的比對,前期優先返回 Dao 層結果,驗證無問題後,在全量返回 RPC 的結果,清除其他業務數據庫連接。
-
支持開關一鍵切換,按流量進行灰度,降低數據收口風險
收益
-
業務數據解耦,數據操作統一由各自垂直系統進行,入口統一
-
方便後續在接口粒度上增加緩存和降級處理
五、總結
以上,是我們對單體系統的改造過程,經過了三步優化、上線,將單體系統平滑過渡到了微服務結構。解決了數據庫的單點問題、性能問題,應用業務得到了簡化,更利於分工,迭代。並且可以針對各業務單獨進行優化升級,擴容、縮容,提升了資源的利用率。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Orbm7viIOCW--OkG3SpwOg