DDD 落地之事件驅動模型
一. 前言
hello,everyone。一日不見,如隔 24 小時。
週末的時候寫了一文帶你落地 DDD[1], 發現大家對於新的領域與知識都挺感興趣的。後面將會出幾篇 DDD 系列文章給大家介紹 mvc 遷移 DDD 實際要做的一些步驟。
DDD 的理念中有一個是貫穿始終的,業務邊界與解耦。我最開始不瞭解 DDD 的時候,我就覺得事件驅動模型能夠非常好的解耦系統功能。當然,這個是我比較菜,在接觸 DDD 之後纔開始對事件驅動模型做深度應用與瞭解。其實無論是在 spring 的框架中還是在日常 MVC 代碼的編寫過程中,巧用事件驅動模型都能很好的提高代碼的可維護性。
因此,本文將對 DDD 中使用事件驅動模型建立與踩坑做一個系統性的介紹。從應用層面出發,幫助大家更好的去進行架構遷移。
二. 事件驅動模型
2.1. 爲什麼需要事件驅動模型
一個框架,一門技術,使用之前首先要清楚,什麼樣的業務場景需要使用這個東西。爲什麼要用跟怎麼樣把他用好更加重要。
假設我們現在有一個比較龐大的單體服務的訂單系統,有下面一個業務需求:創建訂單後,需要下發優惠券,給用戶增長積分
先看一下,大多數同學在單體服務內的寫法。【假設訂單,優惠券,積分均爲獨立 service】
//在orderService內部定義一個放下
@Transactional(rollbackFor = Exception.class)
public void createOrder(CreateOrderCommand command){
//創建訂單
Long orderId = this.doCreate(command);
//發送優惠券
couponService.sendCoupon(command,orderId);
//增長積分
integralService.increase(command.getUserId,orderId);
}
上面這樣的代碼在線上運行會不會有問題?不會!
那爲什麼要改呢?
原因是,業務需求在不斷迭代的過程中,與當前業務非強相關的主流程業務,隨時都有可能被替換或者升級。
雙 11 大促,用戶下單的同時需要給每個用戶贈送幾個小禮品,那你又要寫一個函數了,拼接在主方法的後面。雙 11 結束,這段要代碼要被註釋。有一年大促,贈送的東西改變,代碼又要加回來。。。。
來來回回的,訂單邏輯變得又臭又長,註釋的代碼邏輯很多還不好閱讀與理解。
如果用了事件驅動模型,那麼當第一步創建訂單成功之後,發佈一個創建訂單成功的領域事件。優惠券服務,積分服務,贈送禮品等等監聽這個事件,對監聽到的事件作出相應的處理。
事件驅動模型代碼
//在orderService內部定義一個放下
@Transactional(rollbackFor = Exception.class)
public void createOrder(CreateOrderCommand command){
//創建訂單
Long orderId = this.doCreate(command);
publish(orderCreateEvent);
}
//各個需要監聽的服務
public void handlerEvent(OrderCreateEvent event){
//邏輯處理
}
代碼解耦,高度符合開閉原則
2.2. 事件驅動模型選型
2.2.1.JDK 中時間驅動機制
JDK 爲我們提供的事件驅動 (EventListener、EventObject
)、觀察者模式 (Observer
)。
JDK 不僅提供了Observable
類、Observer
接口支持觀察者模式,而且也提供了EventObject
、EventListener
接口來支持事件監聽模式。
觀察者 (Observer) 相當於事件監聽者(監聽器) ,被觀察者 (Observable
) 相當於事件源和事件,執行邏輯時通知 observer 即可觸發 oberver 的 update, 同時可傳被觀察者和參數。簡化了事件 - 監聽模式的實現
// 觀察者,實現此接口即可
public interface Observer {
/**
* 當被觀察的對象發生變化時候,這個方法會被調用
* Observable o:被觀察的對象
* Object arg:傳入的參數
**/
void update(Observable o, Object arg);
}
// 它是一個Class
public class Observable {
// 是否變化,決定了後面是否調用update方法
private boolean changed = false;
// 用來存放所有`觀察自己的對象`的引用,以便逐個調用update方法
// 需要注意的是:1.8的jdk源碼爲Vector(線程安全的),有版本的源碼是ArrayList的集合實現;
private Vector<Observer> obs;
public Observable() {
obs = new Vector<>();
}
public synchronized void addObserver(Observer o); //添加一個觀察者 注意調用的是addElement方法, 添加到末尾 所以執行時是倒序執行的
public synchronized void deleteObserver(Observer o);
public synchronized void deleteObservers(); //刪除所有的觀察者
// 循環調用所有的觀察者的update方法
public void notifyObservers();
public void notifyObservers(Object arg);
public synchronized int countObservers() {
return obs.size();
}
// 修改changed的值
protected synchronized void setChanged() {
changed = true;
}
protected synchronized void clearChanged() {
changed = false;
}
public synchronized boolean hasChanged() {
return changed;
}
}
2.2.2.spring 中的事件驅動機制
spring 在 4.2 之後提供了@EventListener
註解,讓我們更便捷的使用監聽。
瞭解過 spring 啓動流程的同學都知道,Spring 容器刷新的時候會發布ContextRefreshedEvent
事件,因此若我們需要監聽此事件,直接寫個監聽類即可。
@Slf4j
@Component
public class ApplicationRefreshedEventListener implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
//解析這個事件,做你想做的事,嘿嘿
}
}
同樣的我們也可以自己來定義一個事件,通過ApplicationEventPublisher
發送。
/**
* 領域事件基類
*
* @author baiyan
* @date 2021/09/07
*/
@Getter
@Setter
@NoArgsConstructor
public abstract class BaseDomainEvent<T> implements Serializable {
private static final long serialVersionUID = 1465328245048581896L;
/**
* 領域事件id
*/
private String demandId;
/**
* 發生時間
*/
private LocalDateTime occurredOn;
/**
* 領域事件數據
*/
private T data;
public BaseDomainEvent(String demandId, T data) {
this.demandId = demandId;
this.data = data;
this.occurredOn = LocalDateTime.now();
}
}
定義統一的業務總線發送事件
/**
* 領域事件發佈接口
*
* @author baiyan
* @date 2021/09/07
*/
public interface DomainEventPublisher {
/**
* 發佈事件
*
* @param event 領域事件
*/
void publishEvent(BaseDomainEvent event);
}
/**
* 領域事件發佈實現類
*
* @author baiyan
* @date 2021/09/07
*/
@Component
@Slf4j
public class DomainEventPublisherImpl implements DomainEventPublisher {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void publishEvent(BaseDomainEvent event) {
log.debug("發佈事件,event:{}", event.toString());
applicationEventPublisher.publishEvent(event);
}
}
監聽事件
@Component
@Slf4j
public class UserEventHandler {
@EventListener
public void handleEvent(DomainEvent event) {
//doSomething
}
}
蕪湖,起飛~
相比較與 JDK 提供的觀察者模型的事件驅動,spring 提供的方式就是 yyds。
2.3. 事件驅動之事務管理
平時我們在完成某些數據的入庫後,發佈了一個事件。後續我們進行操作記錄在 es 的記載,但是這時 es 可能集羣響應超時了,操作記錄入庫失敗報錯。但是從業務邏輯上來看,操作記錄的入庫失敗,不應該影響到主流程的邏輯執行,需要事務獨立。亦或是,如果主流程執行出錯了,那麼我們需要觸發一個事件,發送釘釘消息到羣裏進行線上業務監控,需要在主方法邏輯中拋出異常再調用此事件。這時,我們如果使用的是@EventListener
,上述業務場景的實現就是比較麻煩的邏輯了。
爲了解決上述問題,Spring 爲我們提供了兩種方式:
(1)@TransactionalEventListener
註解。
(2) 事務同步管理器TransactionSynchronizationManager
。
本文針對@TransactionalEventListener
進行一下解析。
我們可以從命名上直接看出,它就是個EventListener
,在 Spring4.2+,有一種叫做@TransactionEventListener
的方式,能夠實現在控制事務的同時,完成對對事件的處理。
//被@EventListener標註,表示它能夠監聽事件
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {
//表示當前事件跟隨消息發送方事務的出發時機,默認爲消息發送方事務提交之後才進行處理。
TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
//true時不論發送方是否存在事務均出發當前事件處理邏輯
boolean fallbackExecution() default false;
//監聽的事件具體類型,還是建議指定一下,避免監聽到子類的一些情況出現
@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] value() default {};
//指向@EventListener對應的值
@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] classes() default {};
//指向@EventListener對應的值
String condition() default "";
}
public enum TransactionPhase {
// 指定目標方法在事務commit之前執行
BEFORE_COMMIT,
// 指定目標方法在事務commit之後執行
AFTER_COMMIT,
// 指定目標方法在事務rollback之後執行
AFTER_ROLLBACK,
// 指定目標方法在事務完成時執行,這裏的完成是指無論事務是成功提交還是事務回滾了
AFTER_COMPLETION
}
我們知道,Spring 的事件監聽機制(發佈訂閱模型)實際上並不是異步的(默認情況下),而是同步的來將代碼進行解耦。而@TransactionEventListener
仍是通過這種方式,但是加入了回調的方式來解決,這樣就能夠在事務進行 Commited,Rollback… 等時候纔去進行 Event 的處理,來達到事務同步的目的。
三. 實踐及踩坑
針對是事件驅動模型裏面的@TransactionEventListener
與@EventListener
假設兩個業務場景。
新增用戶,關聯角色,增加關聯角色賦權操作記錄。
1.統一事務
:上述三個操作事務一體,無論哪個發生異常,數據統一回滾。
2獨立事務
:上述三個操作事務獨立,事件一旦發佈,後續發生任意異常均不影響。
3.1. 統一事務
用戶新增
@Service
@Slf4j
public class UserServiceImpl implements UserService {
@Autowired
DomainEventPublisher domainEventPublisher;
@Transactional(rollbackFor = Exception.class)
public void createUser(){
//省略非關鍵代碼
save(user);
domainEventPublisher.publishEvent(userEvent);
}
}
用戶角色關聯
@Component
@Slf4j
public class UserEventHandler {
@Autowired
DomainEventPublisher domainEventPublisher;
@Autowired
UserRoleService userRoleService;
@EventListener
public void handleEvent(UserEvent event) {
log.info("接受到用戶新增事件:"+event.toString());
//省略部分數據組裝與解析邏輯
userRoleService.save(userRole);
domainEventPublisher.publishEvent(userRoleEvent);
}
}
用戶角色操作記錄
@Component
@Slf4j
public class UserRoleEventHandler {
@Autowired
UserRoleRecordService userRoleRecordService;
@EventListener
public void handleEvent(UserRoleEvent event) {
log.info("接受到userRole事件:"+event.toString());
//省略部分數據組裝與解析邏輯
userRoleRecordService.save(record);
}
}
以上即爲同一事務下的一個邏輯,任意方法內拋出異常,所有數據的插入邏輯都會回滾。
給出一下結論,@EventListener
標註的方法是被加入在當前事務的執行邏輯裏面的,與主方法事務一體。
踩坑 1:
嚴格意義上來說這裏不算是把主邏輯從業務中拆分出來了,還是在同步的事務中,當然這個也是有適配場景的,大家爲了代碼簡潔性與函數級邏輯清晰可以這麼做。但是這樣做其實不是那麼 DDD,DDD 中應用服務的一個方法即爲一個用例,裏面貫穿了主流程的邏輯,既然是當前系統內強一致性的業務,那就應該在一個應用服務中體現。當然這個是屬於業務邊界的。舉例的場景來看,用戶與賦權顯然不是強一致性的操作,賦權失敗,不應該影響我新增用戶,所以這個場景下做 DDD 改造,不建議使用統一事務。
踩坑 2:
listener 裏面的執行邏輯可能比較耗時,需要做異步化處理,在UserEventHandler
方法上標註@Async
,那麼這裏與主邏輯的方法事務就隔離開了,監聽器內的事務開始獨立,將不會影響到 userService 內的事務。例如其他代碼不變的情況下用戶角色服務代碼修改如下
@Component
@Slf4j
public class UserEventHandler {
@Autowired
DomainEventPublisher domainEventPublisher;
@Autowired
UserRoleService userRoleService;
@EventListener
@Async
public void handleEvent(UserEvent event) {
log.info("接受到用戶新增事件:"+event.toString());
//省略部分數據組裝與解析邏輯
userRoleService.save(userRole);
domainEventPublisher.publishEvent(userRoleEvent);
throw new RuntimeException("製造一下異常");
}
}
發現,用戶新增了,用戶角色關聯關係新增了,但是操作記錄沒有新增。第一個結果好理解,第二個結果就奇怪了把,事件監聽裏面拋了異常,但是居然數據保存成功了。
這裏其實是因爲UserEventHandler
的handleEvent
方法外層爲嵌套@Transactional
,userRoleService.save
操作結束,事務就提交了,後續的拋異常也不影響。爲了保持事務一致,在方法上加一個@Transactional
即可。
3.2. 獨立事務
@EventListener 作爲驅動加載業務分散代碼管理還挺好的。但是在 DDD 層面,事務數據被雜糅在一起,除了問題一層層找也麻煩,而且數據捆綁較多,還是比較建議使用@TransactionalEventListene
用戶新增
@Service
@Slf4j
public class UserServiceImpl implements UserService {
@Autowired
DomainEventPublisher domainEventPublisher;
@Transactional(rollbackFor = Exception.class)
public void createUser(){
//省略非關鍵代碼
save(user);
domainEventPublisher.publishEvent(userEvent);
}
}
用戶角色關聯
@Component
@Slf4j
public class UserEventHandler {
@Autowired
DomainEventPublisher domainEventPublisher;
@Autowired
UserRoleService userRoleService;
@TransactionalEventListener
public void handleEvent(UserEvent event) {
log.info("接受到用戶新增事件:"+event.toString());
//省略部分數據組裝與解析邏輯
userRoleService.save(userRole);
domainEventPublisher.publishEvent(userRoleEvent);
}
}
用戶角色操作記錄
@Component
@Slf4j
public class UserRoleEventHandler {
@Autowired
UserRoleRecordService userRoleRecordService;
@TransactionalEventListener
public void handleEvent(UserRoleEvent event) {
log.info("接受到userRole事件:"+event.toString());
//省略部分數據組裝與解析邏輯
userRoleRecordService.save(record);
}
}
一樣的代碼,把註解從@EventListener
更換爲@TransactionalEventListener
。執行之後發現了一個神奇的問題,用戶角色操作記錄數據沒有入庫!!!
捋一捋邏輯看看,換了個註解,就出現這個問題了,比較一下 · 兩個註解的區別。 @TransactionalEventListener
事務獨立,且默認註解phase
參數值爲 TransactionPhase.AFTER_COMMIT,即爲主邏輯方法事務提交後在執行。而我們知道 spring 中事務的提交關鍵代碼在AbstractPlatformTransactionManager.commitTransactionAfterReturning
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
//斷點處
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
配置文件中添加以下配置
logging:
level:
org:
mybatis: debug
在上述代碼的地方打上斷點,再次執行邏輯。
發現,第一次userService
保存數據進入此斷點,然後進入到userRoleService.save
邏輯,此處不進入斷點,後續的操作記錄的事件處理方法也沒有進入。
在來看一下日誌
- 2021-09-07 19:54:38.166, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
- 2021-09-07 19:54:38.166, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:38.167, DEBUG, [,,], [http-nio-8088-exec-6], o.m.s.t.SpringManagedTransaction - JDBC Connection [com.mysql.cj.jdbc.ConnectionImpl@1832a0d9] will be managed by Spring
- 2021-09-07 19:54:38.184, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.423, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.423, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.423, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.430, INFO, [,,], [http-nio-8088-exec-6], com.examp.event.demo.UserEventHandler - 接受到用戶新增事件:com.examp.event.demo.UserEvent@385db2f9
- 2021-09-07 19:54:53.602, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
- 2021-09-07 19:54:53.602, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@9af2818] was not registered for synchronization because synchronization is not active
- 2021-09-07 19:54:53.603, DEBUG, [,,], [http-nio-8088-exec-6], o.m.s.t.SpringManagedTransaction - JDBC Connection [com.mysql.cj.jdbc.ConnectionImpl@1832a0d9] will be managed by Spring
- 2021-09-07 19:54:53.622, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@9af2818]
注意看接受到用戶新增事件
之後的日誌,SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@9af2818] was not registered for synchronization because synchronization is not active
說明當前事件是無事務執行的邏輯。再回過頭去看一下@TransactionalEventListener
,默認配置是在事務提交後才進行事件執行的,但是這裏事務都沒有,自然也就不會觸發事件了。
看圖捋一下代碼邏輯
那怎麼解決上面的問題呢?
其實這個東西還是比較簡單的:
-
可以對監聽此事件的邏輯無腦標註
@TransactionalEventListener(fallbackExecution = true)
, 無論事件發送方是否有事務都會觸發事件。 -
在第二個發佈事件的上面標註一個
@Transactional(propagation = Propagation.REQUIRES_NEW)
,切記不可直接標註@Transactional
,這樣因爲 userService 上事務已經提交,而@Transactional
默認事務傳播機制爲Propagation.REQUIRED
, 如果當前沒有事務,就新建一個事務,如果已經存在一個事務,加入到這個事務中。
userService 中的事務還存在,只是已經被提交,無法再加入,也就是會導致操作記錄仍舊無法被插入。
將配置修改爲
logging:
level:
org: debug
可以看到日誌
- 2021-09-07 20:26:29.900, DEBUG, [,,], [http-nio-8088-exec-2], o.s.j.d.DataSourceTransactionManager - Cannot register Spring after-completion synchronization with existing transaction - processing Spring after-completion callbacks immediately, with outcome status 'unknown'
四. DDD 中的事件驅動應用
理清楚 spring 中事件驅動模型之後,我們所要做的就是開始解耦業務邏輯。
通過事件風暴理清楚業務用例,設計完成聚合根【ps:其實我覺得設計聚合根是最難的,業務邊界是需要團隊成員達成共識的地方,不是研發說了算的】
,劃分好業務領域邊界,將原先雜糅在 service 裏面的各個邏輯根據聚合根進行:
1. 對於聚合的每次命令操作,都至少一個領域事 件發佈出去,表示操作的執行結果 2. 每一個領域事件都將被保存到事件存儲中 3. 從資源庫獲取聚合時,將根據發生在聚合上的 事件來重建聚合,事件的重放順序與其產生順序相同 4. 聚合快照:將聚合的某一事件發生時的狀態快 照序列化存儲下來。
五. 總結
本文着重介紹了事件驅動模型的概念與應用,並對實際可能出現的業務邏輯做了分析與避坑。最後對於 DDD 中如何進行以上事件驅動模型進行了分析。
當然我覺得到這裏大家應該對事件模型有了一個清晰的認知了,但是對於 DDD 中應用還是有些模糊。千言萬語匯成一句話:與聚合核心邏輯有關的,走應用服務編排,與核心邏輯無關的,走事件驅動模型,採用獨立事務模式。至於數據一致性,就根據大家自己相關的業務來決定了,方法與踩坑都告訴了大家了。
你我都是架構師!!!
六. 引用及參考
@TransactionalEventListener 的使用和實現原理 [6]
【小家 Spring】從 Spring 中的 (ApplicationEvent) 事件驅動機制出發,聊聊【觀察者模式】【監聽者模式】【發佈訂閱模式】【消息隊列 MQ】【EventSourcing】...[7]
七. 聯繫我
文中如有不正確之處,歡迎指正,寫文不易,點個贊吧,麼麼噠~
釘釘:louyanfeng25
微信:baiyan_lou
公衆號:柏炎大叔
引用鏈接
[1]
一文帶你落地 DDD: https://juejin.cn/post/7004002483601145863
[2]
一文帶你落地 DDD: https://juejin.cn/post/7004002483601145863
[3]
DDD 落地之事件驅動模型: https://juejin.cn/post/7005175434555949092
[4]
DDD 落地之倉儲: https://juejin.cn/post/7006595886646034463
[5]
DDD 落地之架構分層: https://juejin.cn/post/7007382308667785253
[6]
@TransactionalEventListener 的使用和實現原理: https://blog.csdn.net/qq_41378597/article/details/105748703
[7]
【小家 Spring】從 Spring 中的 (ApplicationEvent) 事件驅動機制出發,聊聊【觀察者模式】【監聽者模式】【發佈訂閱模式】【消息隊列 MQ】【EventSourcing】...: https://blog.csdn.net/f641385712/article/details/88806639
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/6vIZVSHi5cJHGUJT66gvxw