觀察者模式在 spring 中的應用

1、觀察者模式簡介

1.1 定義

指多個對象間存在一對多的依賴關係,當一個對象的狀態發生改變時,所有依賴於它的對象都得到通知並被自動更新。這種模式有時又稱作發佈 - 訂閱模式、模型 - 視圖模式,它是對象行爲型模式。

1.2 角色介紹

在觀察者模式中,有以下幾個角色。

主題也叫被觀察者(Subject):

觀察者(Observer):
觀察者接收到消息後,即進行更新操作,對接收到的信息進行處理。

具體的被觀察者(ConcreteSubject):
定義被觀察者自己的業務邏輯,同時定義對哪些事件進行通知。

具體的觀察者(ConcreteObserver):
具體的觀察者,每個觀察者接收到消息後的處理反應是不同的,每個觀察者都有自己的處理邏輯。

1.3 觀察者模式的適用場景

**2、**觀察者模式在 Spring 中的應用

2.1 spring 的事件監聽機制

Spring 事件機制是觀察者模式的實現。ApplicationContext 中事件處理是由 ApplicationEvent 類和 ApplicationListener 接口來提供的。如果一個 Bean 實現了 ApplicationListener 接口,並且已經發布到容器中去,每次 ApplicationContext 發佈一個 ApplicationEvent 事件,這個 Bean 就會接到通知。ApplicationEvent 事件的發佈需要顯示觸發,要麼 Spring 觸發,要麼我們編碼觸發。spring 內置事件由 spring 觸發。我們先來看一下,如何自定義 spring 事件,並使其被監聽和發佈。

2.1.1 事件

事件,ApplicationEvent,該抽象類繼承了 EventObject,EventObject 是 JDK 中的類,並建議所有的事件都應該繼承自 EventObject。

public abstract class ApplicationEvent extends EventObject {
    private static final long serialVersionUID = 7099057708183571937L;
    private final long timestamp = System.currentTimeMillis();
    public ApplicationEvent(Object source) {
        super(source);
    }
    public final long getTimestamp() {
        return this.timestamp;
    }
}
2.1.2 監聽器

ApplicationListener,是一個接口,該接口繼承了 EventListener 接口。EventListener 接口是 JDK 中的,建議所有的事件監聽器都應該繼承 EventListener。

@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    void onApplicationEvent(E var1);
}
2.1.3 事件發佈器

ApplicationEventPublisher,ApplicationContext 繼承了該接口,在 ApplicationContext 的抽象實現類 AbstractApplicationContext 中做了實現下面我們來看一下

org.springframework.context.support.AbstractApplicationContext#publishEvent(java.lang.Object, org.springframework.core.ResolvableType)

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
   Assert.notNull(event, "Event must not be null");
   ApplicationEvent applicationEvent;
   if (event instanceof ApplicationEvent) {
      applicationEvent = (ApplicationEvent) event;
   }
   else {
      applicationEvent = new PayloadApplicationEvent<>(this, event);
      if (eventType == null) {
         eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
      }
   }
   if (this.earlyApplicationEvents != null) {
      this.earlyApplicationEvents.add(applicationEvent);
   }
   else {
//獲取當前注入的發佈器,執行發佈方法
      getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
   }
   if (this.parent != null) {
      if (this.parent instanceof AbstractApplicationContext) {
         ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
      }
      else {
         this.parent.publishEvent(event);
      }
   }
}

我們可以看到,AbstractApplicationContext 中 publishEvent 方法最終執行發佈事件的是 ApplicationEventMulticaster#multicastEvent 方法,下面我們再來一起看一下 multicastEvent 方法:

public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
   ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
   Executor executor = getTaskExecutor();
//拿到所有的監聽器,如果異步執行器不爲空,異步執行
   for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
      if (executor != null) {
         executor.execute(() -> invokeListener(listener, event));
      }
      else {
//執行監聽方法
         invokeListener(listener, event);
      }
   }
}
    protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {
                doInvokeListener(listener, event);
            }
            catch (Throwable err) {
                errorHandler.handleError(err);
            }
        }
        else {
            doInvokeListener(listener, event);
        }
    }
    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {
            listener.onApplicationEvent(event);
        }
        catch (ClassCastException ex) {
            String msg = ex.getMessage();
            if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
                Log logger = LogFactory.getLog(getClass());
                if (logger.isTraceEnabled()) {
                    logger.trace("Non-matching event type for listener: " + listener, ex);
                }
            }
            else {
                throw ex;
            }
        }
    }

上面介紹了非 spring 內置的事件發佈和監聽執行流程。總結一下

2.2 spring 事件實現原理

上面我們講到了 spring 事件的發佈,那麼 spring 事件發佈之後,spring 是如何根據事件找到事件對應的監聽器呢?我們一起來探究一下。

spring 的容器初始化過程想必大家都已十分了解,這裏就不過多贅述,我們直接看 refresh 方法在 refresh 方法中,有這樣兩個方法,initApplicationEventMulticaster() 和 registerListeners()

我們先來看 initApplicationEventMulticaster() 方法

2.2.1 initApplicationEventMulticaster()

org.springframework.context.support.AbstractApplicationContext#initApplicationEventMulticaster

public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster";
protected void initApplicationEventMulticaster() {
//獲得beanFactory
   ConfigurableListableBeanFactory beanFactory = getBeanFactory();
//BeanFactory中是否有ApplicationEventMulticaster
   if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
      this.applicationEventMulticaster =
            beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
      if (logger.isTraceEnabled()) {
         logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
      }
   }
   else {
//如果BeanFactory中不存在,就創建一個SimpleApplicationEventMulticaster
      this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
      beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
      if (logger.isTraceEnabled()) {
         logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
               "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
      }
   }
}

上述代碼我們可以看出,spring 先從 BeanFactory 中獲取 applicationEventMulticaster 如果爲空,則直接創建 SimpleApplicationEventMulticaster

2.2.2 registerListeners()

org.springframework.context.support.AbstractApplicationContext#registerListeners

registerListeners 是將各種實現了 ApplicationListener 的監聽器註冊到 ApplicationEventMulticaster 事件廣播器中

protected void registerListeners() {
   for (ApplicationListener<?> listener : getApplicationListeners()) {
      getApplicationEventMulticaster().addApplicationListener(listener);
   }
   String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
   for (String listenerBeanName : listenerBeanNames) {
//把監聽器註冊到事件發佈器上
      getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
   }
   Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
   this.earlyApplicationEvents = null;
   if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
//如果內置監聽事件集合不爲空
      for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
//執行spring內置的監聽方法
         getApplicationEventMulticaster().multicastEvent(earlyEvent);
      }
   }
}

這裏解釋一下 earlyApplicationListeners

earlyApplicationListeners 的本質還是 ApplicationListener。Spring 單例 Ban 的實例化是在 Refresh 階段實例化的,那麼用戶自定義的一些 ApplicationListener 組件自然也是在這個階段才初始化,但是 Spring 容器啓動過程中,在 Refresh 完成之前還有很多事件:如 Spring 上下文環境準備等事件,這些事件又是 Spring 容器啓動必須要監聽的。所以 Spring 定義了一個 earlyApplicationListeners 集合,這個集合中的 Listener 在 factories 文件中定義好,在容器 Refresh 之前預先實例化好,然後就可以監聽 Spring 容器啓動過程中的所有事件。

當 registerListeners 方法執行完成, 我們的監聽器已經添加到多播器 SimpleApplicationEventMulticaster 中了,並且 earlyEvent 早期事件也已經執行完畢。但是我們發現, 如果自定義了一個監聽器去監聽 spring 內置的事件, 此時並沒有被執行,那我們註冊的監聽器是如何被執行的呢?答案在 finishRefresh 方法中。

2.2.3 finishRefresh

org.springframework.context.support.AbstractApplicationContext#finishRefresh

protected void finishRefresh() {
   clearResourceCaches();
   initLifecycleProcessor();
   getLifecycleProcessor().onRefresh();
   //容器中的類全部初始化完畢後,觸發刷新事件
   publishEvent(new ContextRefreshedEvent(this));
   LiveBeansView.registerApplicationContext(this);
}

如果我們想要實現在 spring 容器中所有 bean 創建完成後做一些擴展功能,我們就可以實現 ApplicationListener 這樣我們就可以實現其功能了。至此,Spring 中同步的事件監聽發佈模式我們就講解完了,當然 Spring 還支持異步的消息監聽執行機制。

2.2.4 spring 中異步的監聽執行機制

我們回過頭來看一下 ApplicationEventMulticaster#pushEvent 方法

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
   Assert.notNull(event, "Event must not be null");
   ApplicationEvent applicationEvent;
   if (event instanceof ApplicationEvent) {
      applicationEvent = (ApplicationEvent) event;
   }
   else {
      applicationEvent = new PayloadApplicationEvent<>(this, event);
      if (eventType == null) {
         eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
      }
   }
   if (this.earlyApplicationEvents != null) {
      this.earlyApplicationEvents.add(applicationEvent);
   }
   else {
    //獲取當前注入的發佈器,執行發佈方法
      getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
   }
   if (this.parent != null) {
      if (this.parent instanceof AbstractApplicationContext) {
         ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
      }
      else {
         this.parent.publishEvent(event);
      }
   }

最終執行發佈事件的是 ApplicationEventMulticaster#multicastEvent 方法,下面我們再來一起看一下 multicastEvent 方法

public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
   ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
   Executor executor = getTaskExecutor();
//拿到所有的監聽器,如果異步執行器不爲空,異步執行
   for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
      if (executor != null) {
         executor.execute(() -> invokeListener(listener, event));
      }
      else {
//執行監聽方法
         invokeListener(listener, event);
      }
   }
}

可以看到,異步事件通知主要依靠 SimpleApplicationEventMulticaster 類中的 Executor 去實現的,如果這個變量不配置的話默認事件通知是同步的, 否則就是異步通知了,要實現同時支持同步通知和異步通知就得從這裏下手;我們上文已經分析過了在 initApplicationEventMulticaster 方法中有這樣一段代碼

if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
   this.applicationEventMulticaster =
         beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
   if (logger.isTraceEnabled()) {
      logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
   }
}

如果 BeanFactory 中已經有了 SimpleApplicationEventMulticaster 則不會重新創建,那麼我們可以在 spring 中註冊一個 SimpleApplicationEventMulticaster 並且向其中注入對應的 Executor 這樣我們就可以得到一個異步執行監聽的 SimpleApplicationEventMulticaster 了,我們的通知就會通過 Executor 異步執行。這樣可以大大提高事件發佈的效率。

在 springboot 項目中我們可以增加一個配置類來實現

@Configuration
@EnableAsync
public class Config {
    @Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
    public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster(){
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
        return simpleApplicationEventMulticaster;
    }
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(300);
        executor.setThreadNamePrefix("thread-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

spring 項目中我們也可以增加如下 xml 配置

<!--定義事件異步處理-->
<bean
         >
        <!-- 線程池維持處於Keep-alive狀態的線程數量。如果設置了allowCoreThreadTimeOut爲true,該值可能爲0。
            併發線程數,想達到真正的併發效果,最好對應CPU的線程數及核心數 -->
        <property  />
        <!-- 最大線程池容量 -->
        <property  />
        <!-- 超過最大線程池容量後,允許的線程隊列數 -->
        <property  />
        <!-- 線程池維護線程所允許的空閒時間 .單位毫秒,默認爲60s,超過這個時間後會將大於corePoolSize的線程關閉,保持corePoolSize的個數 -->
        <property  />
        <!-- 允許核心線程超時: false(默認值)不允許超時,哪怕空閒;true則使用keepAliveSeconds來控制等待超時時間,最終corePoolSize的個數可能爲0 -->
        <property  />
        <!-- 線程池對拒絕任務(無線程可用)的處理策略 -->
        <property >
            <bean />
            <!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主線程直接執行該任務,執行完之後嘗試添加下一個任務到線程池中 -->
            <!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
        </property>
    </bean>
    <!--名字必須是applicationEventMulticaster,因爲AbstractApplicationContext默認找個-->
    <bean>
        <!--注入任務執行器 這樣就實現了異步調用-->
        <property ></property>
 </bean>

3、小結

本文主要講解了觀察者模式在 spring 中的應用及事件監聽機制,JDK 也有實現提供事件監聽機制 Spring 的事件機制也是基於 JDK 來擴展的。Spring 的事件機制默認是同步阻塞的,想要提升對應的效率要考慮異步事件。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/yEYaIdmbsaY2srjRIeYwEQ