DDD 落地之事件驅動模型

一. 前言

hello,everyone。一日不見,如隔 24 小時。

週末的時候寫了一文帶你落地 DDD[1], 發現大家對於新的領域與知識都挺感興趣的。後面將會出幾篇 DDD 系列文章給大家介紹 mvc 遷移 DDD 實際要做的一些步驟。

DDD 的理念中有一個是貫穿始終的,業務邊界與解耦。我最開始不瞭解 DDD 的時候,我就覺得事件驅動模型能夠非常好的解耦系統功能。當然,這個是我比較菜,在接觸 DDD 之後纔開始對事件驅動模型做深度應用與瞭解。其實無論是在 spring 的框架中還是在日常 MVC 代碼的編寫過程中,巧用事件驅動模型都能很好的提高代碼的可維護性。

因此,本文將對 DDD 中使用事件驅動模型建立與踩坑做一個系統性的介紹。從應用層面出發,幫助大家更好的去進行架構遷移。

二. 事件驅動模型

2.1. 爲什麼需要事件驅動模型

一個框架,一門技術,使用之前首先要清楚,什麼樣的業務場景需要使用這個東西。爲什麼要用跟怎麼樣把他用好更加重要。

假設我們現在有一個比較龐大的單體服務的訂單系統,有下面一個業務需求:創建訂單後,需要下發優惠券,給用戶增長積分

先看一下,大多數同學在單體服務內的寫法。【假設訂單,優惠券,積分均爲獨立 service】

//在orderService內部定義一個放下
@Transactional(rollbackFor = Exception.class)
public void createOrder(CreateOrderCommand command){
  //創建訂單
  Long orderId = this.doCreate(command);
  //發送優惠券
  couponService.sendCoupon(command,orderId);
  //增長積分
  integralService.increase(command.getUserId,orderId);
}

上面這樣的代碼在線上運行會不會有問題?不會!

那爲什麼要改呢?

原因是,業務需求在不斷迭代的過程中,與當前業務非強相關的主流程業務,隨時都有可能被替換或者升級。

雙 11 大促,用戶下單的同時需要給每個用戶贈送幾個小禮品,那你又要寫一個函數了,拼接在主方法的後面。雙 11 結束,這段要代碼要被註釋。有一年大促,贈送的東西改變,代碼又要加回來。。。。

來來回回的,訂單邏輯變得又臭又長,註釋的代碼邏輯很多還不好閱讀與理解。

如果用了事件驅動模型,那麼當第一步創建訂單成功之後,發佈一個創建訂單成功的領域事件。優惠券服務,積分服務,贈送禮品等等監聽這個事件,對監聽到的事件作出相應的處理。

事件驅動模型代碼

//在orderService內部定義一個放下
@Transactional(rollbackFor = Exception.class)
public void createOrder(CreateOrderCommand command){
  //創建訂單
  Long orderId = this.doCreate(command);
  publish(orderCreateEvent);
}
//各個需要監聽的服務
public void handlerEvent(OrderCreateEvent event){
//邏輯處理
}

代碼解耦,高度符合開閉原則

2.2. 事件驅動模型選型

2.2.1.JDK 中時間驅動機制

JDK 爲我們提供的事件驅動 (EventListener、EventObject)、觀察者模式 (Observer)。

JDK 不僅提供了Observable類、Observer接口支持觀察者模式,而且也提供了EventObjectEventListener接口來支持事件監聽模式。

觀察者 (Observer) 相當於事件監聽者(監聽器) ,被觀察者 (Observable) 相當於事件源和事件,執行邏輯時通知 observer 即可觸發 oberver 的 update, 同時可傳被觀察者和參數。簡化了事件 - 監聽模式的實現

// 觀察者,實現此接口即可
public interface Observer {
  /**
  * 當被觀察的對象發生變化時候,這個方法會被調用
  * Observable o:被觀察的對象
  * Object arg:傳入的參數
  **/
  void update(Observable o, Object arg);
}
// 它是一個Class
public class Observable {
  // 是否變化,決定了後面是否調用update方法
  private boolean changed = false;
  // 用來存放所有`觀察自己的對象`的引用,以便逐個調用update方法
  // 需要注意的是:1.8的jdk源碼爲Vector(線程安全的),有版本的源碼是ArrayList的集合實現;
  private Vector<Observer> obs;
  public Observable() {
  obs = new Vector<>();
  }
  public synchronized void addObserver(Observer o); //添加一個觀察者 注意調用的是addElement方法,   添加到末尾   所以執行時是倒序執行的
  public synchronized void deleteObserver(Observer o);
  public synchronized void deleteObservers(); //刪除所有的觀察者
  // 循環調用所有的觀察者的update方法
  public void notifyObservers();
  public void notifyObservers(Object arg);
  public synchronized int countObservers() {
  return obs.size();
  }
  // 修改changed的值
  protected synchronized void setChanged() {
    changed = true;
  }
  protected synchronized void clearChanged() {
    changed = false;
  }
  public synchronized boolean hasChanged() {
    return changed;
  }
}

2.2.2.spring 中的事件驅動機制

spring 在 4.2 之後提供了@EventListener註解,讓我們更便捷的使用監聽。

瞭解過 spring 啓動流程的同學都知道,Spring 容器刷新的時候會發布ContextRefreshedEvent事件,因此若我們需要監聽此事件,直接寫個監聽類即可。

@Slf4j
@Component
public class ApplicationRefreshedEventListener implements   ApplicationListener<ContextRefreshedEvent> {
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        //解析這個事件,做你想做的事,嘿嘿
    }
}

同樣的我們也可以自己來定義一個事件,通過ApplicationEventPublisher發送。

/**
 * 領域事件基類
 *
 * @author baiyan
 * @date 2021/09/07
 */
@Getter
@Setter
@NoArgsConstructor
public abstract class BaseDomainEvent<T> implements Serializable {
    private static final long serialVersionUID = 1465328245048581896L;
    /**
     * 領域事件id
     */
    private String demandId;
    /**
     * 發生時間
     */
    private LocalDateTime occurredOn;
    /**
     * 領域事件數據
     */
    private T data;
    public BaseDomainEvent(String demandId, T data) {
        this.demandId = demandId;
        this.data = data;
        this.occurredOn = LocalDateTime.now();
    }
}

定義統一的業務總線發送事件

/**
 * 領域事件發佈接口
 *
 * @author baiyan
 * @date  2021/09/07
 */
public interface DomainEventPublisher {
    /**
     * 發佈事件
     *
     * @param event 領域事件
     */
    void publishEvent(BaseDomainEvent event);
}
/**
 * 領域事件發佈實現類
 *
 * @author baiyan
 * @date  2021/09/07
 */
@Component
@Slf4j
public class DomainEventPublisherImpl implements DomainEventPublisher {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    @Override
    public void publishEvent(BaseDomainEvent event) {
        log.debug("發佈事件,event:{}", event.toString());
        applicationEventPublisher.publishEvent(event);
    }
}

監聽事件

@Component
@Slf4j
public class UserEventHandler {
    @EventListener
    public void handleEvent(DomainEvent event) {
       //doSomething
    }
}

蕪湖,起飛~

相比較與 JDK 提供的觀察者模型的事件驅動,spring 提供的方式就是 yyds。

2.3. 事件驅動之事務管理

平時我們在完成某些數據的入庫後,發佈了一個事件。後續我們進行操作記錄在 es 的記載,但是這時 es 可能集羣響應超時了,操作記錄入庫失敗報錯。但是從業務邏輯上來看,操作記錄的入庫失敗,不應該影響到主流程的邏輯執行,需要事務獨立。亦或是,如果主流程執行出錯了,那麼我們需要觸發一個事件,發送釘釘消息到羣裏進行線上業務監控,需要在主方法邏輯中拋出異常再調用此事件。這時,我們如果使用的是@EventListener,上述業務場景的實現就是比較麻煩的邏輯了。

爲了解決上述問題,Spring 爲我們提供了兩種方式:

(1)@TransactionalEventListener註解。

(2) 事務同步管理器TransactionSynchronizationManager

本文針對@TransactionalEventListener進行一下解析。

我們可以從命名上直接看出,它就是個EventListener,在 Spring4.2+,有一種叫做@TransactionEventListener的方式,能夠實現在控制事務的同時,完成對對事件的處理。

//被@EventListener標註,表示它能夠監聽事件
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {
  //表示當前事件跟隨消息發送方事務的出發時機,默認爲消息發送方事務提交之後才進行處理。
   TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
   //true時不論發送方是否存在事務均出發當前事件處理邏輯
   boolean fallbackExecution() default false;
   //監聽的事件具體類型,還是建議指定一下,避免監聽到子類的一些情況出現
   @AliasFor(annotation = EventListener.class, attribute = "classes")
   Class<?>[] value() default {};
   //指向@EventListener對應的值
   @AliasFor(annotation = EventListener.class, attribute = "classes")
   Class<?>[] classes() default {};
   //指向@EventListener對應的值
   String condition() default "";
}
public enum TransactionPhase {
   // 指定目標方法在事務commit之前執行
   BEFORE_COMMIT,
   // 指定目標方法在事務commit之後執行
    AFTER_COMMIT,
    // 指定目標方法在事務rollback之後執行
    AFTER_ROLLBACK,
   // 指定目標方法在事務完成時執行,這裏的完成是指無論事務是成功提交還是事務回滾了
   AFTER_COMPLETION
  }

我們知道,Spring 的事件監聽機制(發佈訂閱模型)實際上並不是異步的(默認情況下),而是同步的來將代碼進行解耦。而@TransactionEventListener仍是通過這種方式,但是加入了回調的方式來解決,這樣就能夠在事務進行 Commited,Rollback… 等時候纔去進行 Event 的處理,來達到事務同步的目的。

三. 實踐及踩坑

針對是事件驅動模型裏面的@TransactionEventListener@EventListener假設兩個業務場景。

新增用戶,關聯角色,增加關聯角色賦權操作記錄。

1.統一事務:上述三個操作事務一體,無論哪個發生異常,數據統一回滾。

2獨立事務:上述三個操作事務獨立,事件一旦發佈,後續發生任意異常均不影響。

3.1. 統一事務

用戶新增

@Service
@Slf4j
public class UserServiceImpl implements UserService {
    @Autowired
    DomainEventPublisher domainEventPublisher;
    @Transactional(rollbackFor = Exception.class)
    public void createUser(){
        //省略非關鍵代碼
        save(user);
        domainEventPublisher.publishEvent(userEvent);
    }
}

用戶角色關聯

@Component
@Slf4j
public class UserEventHandler {
    @Autowired
    DomainEventPublisher domainEventPublisher;
    @Autowired
    UserRoleService userRoleService;
    @EventListener
    public void handleEvent(UserEvent event) {
        log.info("接受到用戶新增事件:"+event.toString());
        //省略部分數據組裝與解析邏輯
        userRoleService.save(userRole);
        domainEventPublisher.publishEvent(userRoleEvent);
    }
}

用戶角色操作記錄

@Component
@Slf4j
public class UserRoleEventHandler {
    @Autowired
    UserRoleRecordService userRoleRecordService;
    @EventListener
    public void handleEvent(UserRoleEvent event) {
        log.info("接受到userRole事件:"+event.toString());
        //省略部分數據組裝與解析邏輯
        userRoleRecordService.save(record);
    }
}

以上即爲同一事務下的一個邏輯,任意方法內拋出異常,所有數據的插入邏輯都會回滾。

給出一下結論,@EventListener標註的方法是被加入在當前事務的執行邏輯裏面的,與主方法事務一體。

踩坑 1:

嚴格意義上來說這裏不算是把主邏輯從業務中拆分出來了,還是在同步的事務中,當然這個也是有適配場景的,大家爲了代碼簡潔性與函數級邏輯清晰可以這麼做。但是這樣做其實不是那麼 DDD,DDD 中應用服務的一個方法即爲一個用例,裏面貫穿了主流程的邏輯,既然是當前系統內強一致性的業務,那就應該在一個應用服務中體現。當然這個是屬於業務邊界的。舉例的場景來看,用戶與賦權顯然不是強一致性的操作,賦權失敗,不應該影響我新增用戶,所以這個場景下做 DDD 改造,不建議使用統一事務。

踩坑 2:

listener 裏面的執行邏輯可能比較耗時,需要做異步化處理,在UserEventHandler方法上標註@Async,那麼這裏與主邏輯的方法事務就隔離開了,監聽器內的事務開始獨立,將不會影響到 userService 內的事務。例如其他代碼不變的情況下用戶角色服務代碼修改如下

@Component
@Slf4j
public class UserEventHandler {
    @Autowired
    DomainEventPublisher domainEventPublisher;
    @Autowired
    UserRoleService userRoleService;
    @EventListener
    @Async
    public void handleEvent(UserEvent event) {
        log.info("接受到用戶新增事件:"+event.toString());
        //省略部分數據組裝與解析邏輯
        userRoleService.save(userRole);
        domainEventPublisher.publishEvent(userRoleEvent);
        throw new RuntimeException("製造一下異常");
    }
}

發現,用戶新增了,用戶角色關聯關係新增了,但是操作記錄沒有新增。第一個結果好理解,第二個結果就奇怪了把,事件監聽裏面拋了異常,但是居然數據保存成功了。

這裏其實是因爲UserEventHandlerhandleEvent方法外層爲嵌套@TransactionaluserRoleService.save操作結束,事務就提交了,後續的拋異常也不影響。爲了保持事務一致,在方法上加一個@Transactional即可。

3.2. 獨立事務

@EventListener 作爲驅動加載業務分散代碼管理還挺好的。但是在 DDD 層面,事務數據被雜糅在一起,除了問題一層層找也麻煩,而且數據捆綁較多,還是比較建議使用@TransactionalEventListene

用戶新增

@Service
@Slf4j
public class UserServiceImpl implements UserService {
    @Autowired
    DomainEventPublisher domainEventPublisher;
    @Transactional(rollbackFor = Exception.class)
    public void createUser(){
        //省略非關鍵代碼
        save(user);
        domainEventPublisher.publishEvent(userEvent);
    }
}

用戶角色關聯

@Component
@Slf4j
public class UserEventHandler {
    @Autowired
    DomainEventPublisher domainEventPublisher;
    @Autowired
    UserRoleService userRoleService;
    @TransactionalEventListener
    public void handleEvent(UserEvent event) {
        log.info("接受到用戶新增事件:"+event.toString());
        //省略部分數據組裝與解析邏輯
        userRoleService.save(userRole);
        domainEventPublisher.publishEvent(userRoleEvent);
    }
}

用戶角色操作記錄

@Component
@Slf4j
public class UserRoleEventHandler {
    @Autowired
    UserRoleRecordService userRoleRecordService;
    @TransactionalEventListener
    public void handleEvent(UserRoleEvent event) {
        log.info("接受到userRole事件:"+event.toString());
        //省略部分數據組裝與解析邏輯
        userRoleRecordService.save(record);
    }
}

一樣的代碼,把註解從@EventListener更換爲@TransactionalEventListener。執行之後發現了一個神奇的問題,用戶角色操作記錄數據沒有入庫!!!

捋一捋邏輯看看,換了個註解,就出現這個問題了,比較一下 · 兩個註解的區別。 @TransactionalEventListener事務獨立,且默認註解phase參數值爲 TransactionPhase.AFTER_COMMIT,即爲主邏輯方法事務提交後在執行。而我們知道 spring 中事務的提交關鍵代碼在AbstractPlatformTransactionManager.commitTransactionAfterReturning

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
   if (txInfo != null && txInfo.getTransactionStatus() != null) {
      if (logger.isTraceEnabled()) {
         logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
      }
      //斷點處
      txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
   }
}

配置文件中添加以下配置

logging:
  level:
    org:
      mybatis: debug

在上述代碼的地方打上斷點,再次執行邏輯。

發現,第一次userService保存數據進入此斷點,然後進入到userRoleService.save邏輯,此處不進入斷點,後續的操作記錄的事件處理方法也沒有進入。

在來看一下日誌

- 2021-09-07 19:54:38.166, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
- 2021-09-07 19:54:38.166, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:38.167, DEBUG, [,,], [http-nio-8088-exec-6], o.m.s.t.SpringManagedTransaction - JDBC Connection [com.mysql.cj.jdbc.ConnectionImpl@1832a0d9] will be managed by Spring
- 2021-09-07 19:54:38.184, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.423, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.423, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.423, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.430,  INFO, [,,], [http-nio-8088-exec-6], com.examp.event.demo.UserEventHandler - 接受到用戶新增事件:com.examp.event.demo.UserEvent@385db2f9
- 2021-09-07 19:54:53.602, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
- 2021-09-07 19:54:53.602, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@9af2818] was not registered for synchronization because synchronization is not active
- 2021-09-07 19:54:53.603, DEBUG, [,,], [http-nio-8088-exec-6], o.m.s.t.SpringManagedTransaction - JDBC Connection [com.mysql.cj.jdbc.ConnectionImpl@1832a0d9] will be managed by Spring
- 2021-09-07 19:54:53.622, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@9af2818]

注意看接受到用戶新增事件之後的日誌,SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@9af2818] was not registered for synchronization because synchronization is not active說明當前事件是無事務執行的邏輯。再回過頭去看一下@TransactionalEventListener,默認配置是在事務提交後才進行事件執行的,但是這裏事務都沒有,自然也就不會觸發事件了。

看圖捋一下代碼邏輯

那怎麼解決上面的問題呢?

其實這個東西還是比較簡單的:

  1. 可以對監聽此事件的邏輯無腦標註@TransactionalEventListener(fallbackExecution = true), 無論事件發送方是否有事務都會觸發事件。

  2. 在第二個發佈事件的上面標註一個@Transactional(propagation = Propagation.REQUIRES_NEW),切記不可直接標註@Transactional,這樣因爲 userService 上事務已經提交,而@Transactional默認事務傳播機制爲Propagation.REQUIRED, 如果當前沒有事務,就新建一個事務,如果已經存在一個事務,加入到這個事務中。

userService 中的事務還存在,只是已經被提交,無法再加入,也就是會導致操作記錄仍舊無法被插入。

將配置修改爲

logging:
  level:
    org: debug

可以看到日誌

- 2021-09-07 20:26:29.900, DEBUG, [,,], [http-nio-8088-exec-2], o.s.j.d.DataSourceTransactionManager - Cannot register Spring after-completion synchronization with existing transaction - processing Spring after-completion callbacks immediately, with outcome status 'unknown'

四. DDD 中的事件驅動應用

理清楚 spring 中事件驅動模型之後,我們所要做的就是開始解耦業務邏輯。

通過事件風暴理清楚業務用例,設計完成聚合根【ps:其實我覺得設計聚合根是最難的,業務邊界是需要團隊成員達成共識的地方,不是研發說了算的】,劃分好業務領域邊界,將原先雜糅在 service 裏面的各個邏輯根據聚合根進行:

1. 對於聚合的每次命令操作,都至少一個領域事 件發佈出去,表示操作的執行結果 2. 每一個領域事件都將被保存到事件存儲中 3. 從資源庫獲取聚合時,將根據發生在聚合上的 事件來重建聚合,事件的重放順序與其產生順序相同 4. 聚合快照:將聚合的某一事件發生時的狀態快 照序列化存儲下來。

五. 總結

本文着重介紹了事件驅動模型的概念與應用,並對實際可能出現的業務邏輯做了分析與避坑。最後對於 DDD 中如何進行以上事件驅動模型進行了分析。

當然我覺得到這裏大家應該對事件模型有了一個清晰的認知了,但是對於 DDD 中應用還是有些模糊。千言萬語匯成一句話:與聚合核心邏輯有關的,走應用服務編排,與核心邏輯無關的,走事件驅動模型,採用獨立事務模式。至於數據一致性,就根據大家自己相關的業務來決定了,方法與踩坑都告訴了大家了。

你我都是架構師!!!

六. 引用及參考

@TransactionalEventListener 的使用和實現原理 [6]

【小家 Spring】從 Spring 中的 (ApplicationEvent) 事件驅動機制出發,聊聊【觀察者模式】【監聽者模式】【發佈訂閱模式】【消息隊列 MQ】【EventSourcing】...[7]

七. 聯繫我

文中如有不正確之處,歡迎指正,寫文不易,點個贊吧,麼麼噠~

釘釘:louyanfeng25

微信:baiyan_lou

公衆號:柏炎大叔

引用鏈接

[1] 一文帶你落地 DDD: https://juejin.cn/post/7004002483601145863
[2] 一文帶你落地 DDD: https://juejin.cn/post/7004002483601145863
[3] DDD 落地之事件驅動模型: https://juejin.cn/post/7005175434555949092
[4] DDD 落地之倉儲: https://juejin.cn/post/7006595886646034463
[5] DDD 落地之架構分層: https://juejin.cn/post/7007382308667785253
[6] @TransactionalEventListener 的使用和實現原理: https://blog.csdn.net/qq_41378597/article/details/105748703
[7] 【小家 Spring】從 Spring 中的 (ApplicationEvent) 事件驅動機制出發,聊聊【觀察者模式】【監聽者模式】【發佈訂閱模式】【消息隊列 MQ】【EventSourcing】...: https://blog.csdn.net/f641385712/article/details/88806639

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/6vIZVSHi5cJHGUJT66gvxw