SpringBoot 多數據源及事務解決方案

大家好,我是不才陳某~

  1. 背景

一個主庫和 N 個應用庫的數據源,並且會同時操作主庫和應用庫的數據,需要解決以下兩個問題:

本文主要探討這兩個問題的解決方案,希望能對讀者有一定的啓發。

  1. 數據源切換原理

通過擴展 Spring 提供的抽象類AbstractRoutingDataSource,可以實現切換數據源。其類結構如下圖所示:

項目上需要使用的所有數據源和默認數據源。

當 Spring 容器創建AbstractRoutingDataSource對象時,通過調用afterPropertiesSet複製上述目標數據源。由此可見,一旦數據源實例對象創建完畢,業務無法再添加新的數據源。

此方法爲抽象方法,通過擴展這個方法來實現數據源的切換。目標數據源的結構爲:Map<Object, DataSource>其 key 爲lookup key

我們來看官方對這個方法的註釋:

lookup key 通常是綁定在線程上下文中,根據這個 key 去resolvedDataSources中取出 DataSource。

根據目標數據源的管理方式不同,可以使用基於配置文件和數據庫表兩種方式。基於配置文件管理方案無法後續添加新的數據源,而基於數據庫表方案管理,則更加靈活。關注公 z 號:碼猿技術專欄,回覆關鍵詞:1111 獲取阿里內部 Java 性能調優手冊!

  1. 配置文件解決方案

根據上面的分析,我們可以按照下面的步驟去實現:

其流程如下圖所示:

3.1 創建數據源

DynamicDataSource數據源的注入,目前業界主流實現步驟如下:

在配置文件中定義數據源

spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driverClassName=com.mysql.jdbc.Driver
# 主數據源
spring.datasource.druid.master.url=jdbcUrl
spring.datasource.druid.master.username=***
spring.datasource.druid.master.password=***
# 其他數據源
spring.datasource.druid.second.url=jdbcUrl
spring.datasource.druid.second.username=***
spring.datasource.druid.second.password=***

在代碼中配置 Bean

@Configuration
public class DynamicDataSourceConfig {
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource firstDataSource(){
        return DruidDataSourceBuilder.create().build();
    }
 
    @Bean
    @ConfigurationProperties("spring.datasource.druid.second")
    public DataSource secondDataSource(){
        return DruidDataSourceBuilder.create().build();
    }
 
    @Bean
    @Primary
    public DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource) {
        Map<Object, Object> targetDataSources = new HashMap<>(5);
        targetDataSources.put(DataSourceNames.FIRST, firstDataSource);
        targetDataSources.put(DataSourceNames.SECOND, secondDataSource);
        return new DynamicDataSource(firstDataSource, targetDataSources);
    }
}

3.2 AOP 處理

通過DataSourceAspect切面技術來簡化業務上的使用,只需要在業務方法添加@SwitchDataSource註解即可完成動態切換:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface SwitchDataSource {
    String value();
}

DataSourceAspect攔截業務方法,更新當前線程上下文DataSourceContextHolder中存儲的 key,即可實現數據源切換。

3.3 方案不足

基於AbstractRoutingDataSource的多數據源動態切換,有個明顯的缺點,無法動態添加和刪除數據源。在我們的產品中,不能把應用數據源寫死在配置文件。接下來分享一下基於數據庫表的實現方案。

  1. 數據庫表解決方案

我們需要實現可視化的數據源管理,並實時查看數據源的運行狀態。所以我們不能把數據源全部配置在文件中,應該將數據源定義保存到數據庫表。參考AbstractRoutingDataSource的設計思路,實現自定義數據源管理。

4.1 設計數據源表

主庫的數據源信息仍然配置在項目配置文件中,應用庫數據源配置參數,則設計對應的數據表。表結構如下所示:

這個表主要就是DataSource的相關配置參數,其相應的 ORM 操作代碼在此不再贅述,主要是實現數據源的增刪改查操作。

4.2 自定義數據源管理

4.2.1 定義管理接口

通過繼承AbstractDataSource即可實現DynamicDataSource。爲了方便對數據源進行操作,我們定義一個接口DataSourceManager,爲業務提供操作數據源的統一接口。

public interface DataSourceManager {
    void put(String var1, DataSource var2);
 
    DataSource get(String var1);
 
    Boolean hasDataSource(String var1);
 
    void remove(String var1);
 
    void closeDataSource(String var1);
 
    Collection<DataSource> all();
}

該接口主要是對數據表中定義的數據源,提供基礎管理功能。

4.2.2 自定義數據源

DynamicDataSource的實現如下圖所示:

根據前面的分析,AbstractRoutingDataSource是在容器啓動的時候,執行afterPropertiesSet注入數據源對象,完成之後無法對數據源進行修改。DynamicDataSource則實現DataSourceManager接口,可以將數據表中的數據源加載到 dataSources。

4.2.3 切面處理

這一塊的處理跟配置文件數據源方案處理方式相同,都是通過 AOP 技術切換 lookup key。

public DataSource determineTargetDataSource() {
        String lookupKey = DataSourceContextHolder.getKey();
        DataSource dataSource = Optional.ofNullable(lookupKey)
                .map(dataSources::get)
                .orElse(defaultDataSource);
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine DataSource for lookup key [" + lookupKey + "]");
        }
        return dataSource;
    }

4.2.4 管理數據源狀態

在項目啓動的時候,加載數據表中的所有數據源,並執行初始化。初始化操作主要是使用 SpringBoot 提供的DataSourceBuilder類,根據數據源表的定義創建 DataSource。在項目運行過程中,可以使用定時任務對數據源進行保活,爲了提升性能再添加一層緩存。

AbstractRoutingDataSource 只支持單庫事務,切換數據源是在開啓事務之前執行。Spring 使用 DataSourceTransactionManager進行事務管理。開啓事務,會將數據源緩存到DataSourceTransactionObject對象中,後續的 commit 和 rollback 事務操作實際上是使用的同一個數據源。

如何解決切庫事務問題?藉助 Spring 的聲明式事務處理,我們可以在多次切庫操作時強制開啓新的事務:

@SwitchDataSource    
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)

這樣的話,執行切庫操作的時候強制啓動新事務,便可實現多次切庫而且事務能夠生效。但是這種事務方式,存在數據一致性問題:

假若 ServiceB 正常執行提交事務,接着返回 ServiceA 執行並且發生異常。因爲兩次處理是不同的事務,ServiceA 這個事務執行回滾,而 ServiceA 事務已經提交。這樣的話,數據就不一致了。接下來,我們主要討論如何解決多庫的事務問題。

  1. 多庫事務處理

6.1 關於事務的理解

首先有必要理解事務的本質。

  1. 提到 Spring 事務,就離不開事務的四大特性和隔離級別、七大傳播特性。

事務特性和離級別是屬於數據庫範疇。Spring 事務的七大傳播特性是什麼呢?它是 Spring 在當前線程內,處理多個事務操作時的事務應用策略,數據庫事務本身並不存在傳播特性。

2.Spring 事務的定義包括:begin、commit、rollback、close、suspend、resume 等動作。

實際上,只有 commit、rollback、close 是在 JDBC 真實存在的,而其他動作都是應用的語意,而非 JDBC 事務的真實命令。因此,事務真實存在的方法是:setAutoCommit()commit()rollback()

close() 語義爲:

使用 DataSource 並不會執行物理關閉,只是歸還給連接池。

6.2 自定義管理事務

爲了保證在多個數據源中事務的一致性,我們可以手動管理Connetion的事務提交和回滾。考慮到不同 ORM 框架的事務管理實現差異,要求實現自定義事務管理不影響框架層的事務。

這可以通過使用裝飾器設計模式,對Connection進行包裝重寫 commit 和 rolllback 屏蔽其默認行爲,這樣就不會影響到原生Connection和 ORM 框架的默認事務行爲。其整體思路如下圖所示:

這裏並沒有使用前面提到的@SwitchDataSource,這是因爲我們在TransactionAop中已經執行了 lookupKey 的切換。

6.2.1 定義多事務註解

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransaction {
    String transactionManager() default "multiTransactionManager";
    // 默認數據隔離級別,隨數據庫本身默認值
    IsolationLevel isolationLevel() default IsolationLevel.DEFAULT;
    // 默認爲主庫數據源
    String datasourceId() default "default";
    // 只讀事務,若有更新操作會拋出異常
    boolean readOnly() default false;

業務方法只需使用該註解即可開啓事務,datasourceId指定事務用到的數據源,不指定默認爲主庫。

6.2.3 包裝 Connection

自定義事務我們使用包裝過的Connection,屏蔽其中的commit&rollback方法。這樣我們就可以在主事務裏進行統一的事務提交和回滾操作。

public class ConnectionProxy implements Connection {
 
    private final Connection connection;
 
    public ConnectionProxy(Connection connection) {
        this.connection = connection;
    }
 
    @Override
    public void commit() throws SQLException {
        // connection.commit();
    }
 
    public void realCommit() throws SQLException {
        connection.commit();
    }
 
    @Override
    public void close() throws SQLException {
        //connection.close();
    }
 
    public void realClose() throws SQLException {
        if (!connection.getAutoCommit()) {
            connection.setAutoCommit(true);
        }
        connection.close();
    }
 
    @Override
    public void rollback() throws SQLException {
        if(!connection.isClosed())
            connection.rollback();
    }
    ...
}

這裏commit&close方法不執行操作,rollback 執行的前提是連接執行 close 才生效。這樣不管是使用哪個 ORM 框架,其自身事務管理都將失效。事務的控制就交由MultiTransaction控制了。

6.2.4 事務上下文管理

public class TransactionHolder {
    // 是否開啓了一個MultiTransaction
    private boolean isOpen;
    // 是否只讀事務
    private boolean readOnly;
    // 事務隔離級別
    private IsolationLevel isolationLevel;
    // 維護當前線程事務ID和連接關係
    private ConcurrentHashMap<String, ConnectionProxy> connectionMap;
    // 事務執行棧
    private Stack<String> executeStack;
    // 數據源切換棧
    private Stack<String> datasourceKeyStack;
    // 主事務ID
    private String mainTransactionId;
    // 執行次數
    private AtomicInteger transCount;
 
    // 事務和數據源key關係
    private ConcurrentHashMap<String, String> executeIdDatasourceKeyMap;
 
}

每開啓一個事物,生成一個事務 ID 並綁定一個ConnectionProxy。事務嵌套調用,保存事務 ID 和 lookupKey 至棧中,當內層事務執行完畢執行 pop。這樣的話,外層事務只需在棧中執行 peek 即可獲取事務 ID 和 lookupKey。

6.2.5 數據源兼容處理

爲了不影響原生事務的使用,需要重寫getConnection方法。當前線程沒有啓動自定義事務,則直接從數據源中返回連接。

@Override
    public Connection getConnection() throws SQLException {
        TransactionHolder transactionHolder = MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();
        if (Objects.isNull(transactionHolder)) {
            return determineTargetDataSource().getConnection();
        }
        ConnectionProxy ConnectionProxy = transactionHolder.getConnectionMap()
                .get(transactionHolder.getExecuteStack().peek());
        if (ConnectionProxy == null) {
            // 沒開跨庫事務,直接返回
            return determineTargetDataSource().getConnection();
        } else {
            transactionHolder.addCount();
            // 開了跨庫事務,從當前線程中拿包裝過的Connection
            return ConnectionProxy;
        }
    }

6.2.6 切面處理

切面處理的核心邏輯是:維護一個嵌套事務棧,當業務方法執行結束,或者發生異常時,判斷當前棧頂事務 ID 是否爲主事務 ID。如果是的話這時候已經到了最外層事務,這時才執行提交和回滾。詳細流程如下圖所示:

package com.github.mtxn.transaction.aop;
@Aspect
@Component
@Slf4j
@Order(99999)
public class MultiTransactionAop {
 
    @Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)")
    public void pointcut() {
        if (log.isDebugEnabled()) {
            log.debug("start in transaction pointcut...");
        }
    }
 
 
    @Around("pointcut()")
    public Object aroundTransaction(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        // 從切面中獲取當前方法
        Method method = signature.getMethod();
        MultiTransaction multiTransaction = method.getAnnotation(MultiTransaction.class);
        if (multiTransaction == null) {
            return point.proceed();
        }
        IsolationLevel isolationLevel = multiTransaction.isolationLevel();
        boolean readOnly = multiTransaction.readOnly();
        String prevKey = DataSourceContextHolder.getKey();
        MultiTransactionManager multiTransactionManager = Application.resolve(multiTransaction.transactionManager());
        // 切數據源,如果失敗使用默認庫
        if (multiTransactionManager.switchDataSource(point, signature, multiTransaction)) return point.proceed();
        // 開啓事務棧
        TransactionHolder transactionHolder = multiTransactionManager.startTransaction(prevKey, isolationLevel, readOnly, multiTransactionManager);
        Object proceed;
 
        try {
            proceed = point.proceed();
            multiTransactionManager.commit();
        } catch (Throwable ex) {
            log.error("execute method:{}#{},err:", method.getDeclaringClass(), method.getName(), ex);
            multiTransactionManager.rollback();
            throw ExceptionUtils.api(ex, "系統異常:%s", ex.getMessage());
        } finally {
            // 當前事務結束出棧
            String transId = multiTransactionManager.getTrans().getExecuteStack().pop();
            transactionHolder.getDatasourceKeyStack().pop();
            // 恢復上一層事務
            DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());
            // 最後回到主事務,關閉此次事務
            multiTransactionManager.close(transId);
        }
        return proceed;
 
    }
 
 
}
  1. 總結

本文主要介紹了多數據源管理的解決方案 (應用層事務,而非 XA 二段提交保證),以及對多個庫同時操作的事務管理。

需要注意的是,這種方式只適用於單體架構的應用。因爲多個庫的事務參與者都是運行在同一個 JVM 進行。如果是在微服務架構的應用中,則需要使用分佈式事務管理 (譬如:Seata)。

來源:https://blog.csdn.net/qq381332153

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/57nxz7286PO1X6x7XO907Q