深入剖析 Spring WebFlux

作者:vivo 互聯網服務器團隊 - Zhou Changqing

一、WebFlux 簡介

WebFlux 是 Spring Framework5.0 中引入的一種新的反應式 Web 框架。通過 Reactor 項目實現 Reactive Streams 規範,完全異步和非阻塞框架。本身不會加快程序執行速度,但在高併發情況下藉助異步 IO 能夠以少量而穩定的線程處理更高的吞吐,規避文件 IO / 網絡 IO 阻塞帶來的線程堆積。

1.1 WebFlux 的特性

WebFlux 具有以下特性:

1.2 WebFlux 的設計目標

二、Spring WebFlux 組件介紹

2.1 HTTPHandler

一個簡單的處理請求和響應的抽象,用來適配不同 HTTP 服務容器的 API。

圖片

2.2 WebHandler

一個用於處理業務請求抽象接口,定義了一系列處理行爲。相關核心實現類如下;

圖片

2.3 DispatcherHandler

請求處理的總控制器,實際工作是由多個可配置的組件來處理。

圖片

WebFlux 是兼容 Spring MVC 基於 @Controller,@RequestMapping 等註解的編程開發方式的,可以做到平滑切換。

2.4 Functional Endpoints

這是一個輕量級函數編程模型。是基於 @Controller,@RequestMapping 等註解的編程模型的替代方案,提供一套函數式 API 用於創建 Router,Handler 和 Filter。調用處理組件如下:

圖片

簡單的 RouterFuntion 路由註冊和業務處理過程:

@Bean
public RouterFunction<ServerResponse> initRouterFunction() {
    return RouterFunctions.route()
        .GET("/hello/{name}", serverRequest -> {
            String name = serverRequest.pathVariable("name");
            return ServerResponse.ok().bodyValue(name);
        }).build();
}

請求轉發處理過程:

圖片

2.5 Reactive Stream

這是一個重要的組件,WebFlux 就是利用 Reactor 來重寫了傳統 Spring MVC 邏輯。其中 Flux 和 Mono 是 Reactor 中兩個關鍵概念。掌握了這兩個概念才能理解 WebFlux 工作方式。

Flux 和 Mono 都實現了 Reactor 的 Publisher 接口, 屬於時間發佈者,對消費者提供訂閱接口,當有事件發生的時候,Flux 或者 Mono 會通過回調消費者的相應的方法來通知消費者相應的事件。這就是所謂的響應式編程模型。

Mono 工作流程圖

圖片

只會在發送出單個結果後完成。

Flux 工作流程圖

圖片

發送出零個或者多個,可能無限個結果後才完成。

對於流式媒體類型:application/stream+json 或者 text/event-stream ,可以讓調用端獲得服務器滾動結果。
對於非流類型:application/json  WebFlux 默認JSON編碼器會將序列化的JSON 一次性刷新到網絡,這並不意味着阻塞,因爲結果Flux<?> 是以反應式方式寫入網絡的,沒有任何障礙。

三、WebFlux 工作原理

3.1 組件裝配過程

圖片

流程相關源碼解析 -WebFluxAutoConfiguration

@Configuration
//條件裝配 只有啓動的類型是REACTIVE時加載
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
//只有存在 WebFluxConfigurer實例  時加載
@ConditionalOnClass(WebFluxConfigurer.class)
//在不存在  WebFluxConfigurationSupport實例時 加載
@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })
//在之後裝配
@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class,
      CodecsAutoConfiguration.class, ValidationAutoConfiguration.class })
//自動裝配順序
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class WebFluxAutoConfiguration {
   @Configuration
   @EnableConfigurationProperties({ ResourceProperties.class, WebFluxProperties.class })
   //接口編程 在裝配WebFluxConfig 之前要先 裝配EnableWebFluxConfiguration
   @Import({ EnableWebFluxConfiguration.class })
   public static class WebFluxConfig implements WebFluxConfigurer {
      //隱藏部分源碼
     /**
     * Configuration equivalent to {@code @EnableWebFlux}.
     */
   } 
    @Configuration
    public static class EnableWebFluxConfiguration
            extends DelegatingWebFluxConfiguration {
        //隱藏部分代碼
    }
    @Configuration
    @ConditionalOnEnabledResourceChain
    static class ResourceChainCustomizerConfiguration {
        //隱藏部分代碼
    }
    private static class ResourceChainResourceHandlerRegistrationCustomizer
            implements ResourceHandlerRegistrationCustomizer {
        //隱藏部分代碼
    }

WebFluxAutoConfiguration 自動裝配時先自動

裝配 EnableWebFluxConfiguration 

而 EnableWebFluxConfiguration->

-> DelegatingWebFluxConfiguration 

-> WebFluxConfigurationSupport。

最終 WebFluxConfigurationSupport 不僅配置 DispatcherHandler 還同時配置了其他很多 WebFlux 核心組件包括 異常處理器 WebExceptionHandler, 映射處理器處理器 HandlerMapping,請求適配器 HandlerAdapter,響應處理器 HandlerResultHandler 等。

DispatcherHandler 創建初始化過程如下;

public class WebFluxConfigurationSupport implements ApplicationContextAware {
   //隱藏部分代碼
   @Nullable
   public final ApplicationContext getApplicationContext() {
      return this.applicationContext;
   }
  //隱藏部分代碼
   @Bean
   public DispatcherHandler webHandler() {
      return new DispatcherHandler();
   }
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
   @Nullable
   private List<HandlerMapping> handlerMappings;
   @Nullable
   private List<HandlerAdapter> handlerAdapters;
   @Nullable
   private List<HandlerResultHandler> resultHandlers;
     @Override
   public void setApplicationContext(ApplicationContext applicationContext) {
    initStrategies(applicationContext);
   }
   protected void initStrategies(ApplicationContext context) {
       //注入handlerMappings
      Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerMapping.class, true, false);
      ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
      AnnotationAwareOrderComparator.sort(mappings);
      this.handlerMappings = Collections.unmodifiableList(mappings);
     //注入handlerAdapters
      Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerAdapter.class, true, false);
      this.handlerAdapters = new ArrayList<>(adapterBeans.values());
      AnnotationAwareOrderComparator.sort(this.handlerAdapters);
      //注入resultHandlers
      Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerResultHandler.class, true, false);
      this.resultHandlers = new ArrayList<>(beans.values());
      AnnotationAwareOrderComparator.sort(this.resultHandlers);
   }

流程相關源碼解析

HTTPHandlerAutoConfiguration

上面已講解過 WebFlux 核心組件裝載過程,那麼這些組件又是什麼時候注入到對應的容器上下文中的呢?其實是在刷新容器上下文時注入進去的。

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#onRefresh

public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext
      implements ConfigurableWebServerApplicationContext {
   @Override
   protected void onRefresh() {
      super.onRefresh();
      try {
         createWebServer();
      }
      catch (Throwable ex) {
         throw new ApplicationContextException("Unable to start reactive web server", ex);
      }
   }
   private void createWebServer() {
      WebServerManager serverManager = this.serverManager;
      if (serverManager == null) {
         String webServerFactoryBeanName = getWebServerFactoryBeanName();
         ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
         boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
         // 這裏創建容器管理時注入httpHandler
         this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
         getBeanFactory().registerSingleton("webServerGracefulShutdown",
               new WebServerGracefulShutdownLifecycle(this.serverManager));
         // 註冊一個 web容器啓動服務類,該類繼承了SmartLifecycle
         getBeanFactory().registerSingleton("webServerStartStop",
               new WebServerStartStopLifecycle(this.serverManager));
      }
      initPropertySources();
   }
   protected HttpHandler getHttpHandler() {
    String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
    if (beanNames.length == 0) {
      throw new ApplicationContextException(
          "Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");
    }
    if (beanNames.length > 1) {
      throw new ApplicationContextException(
          "Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "
              + StringUtils.arrayToCommaDelimitedString(beanNames));
    }
        //容器上下文獲取httpHandler
    return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
  }

而這個 HTTPHandler 是

由 HTTPHandlerAutoConfiguration 裝配進去的。

@Configuration
@ConditionalOnClass({ DispatcherHandler.class, HttpHandler.class })
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@ConditionalOnMissingBean(HttpHandler.class)
@AutoConfigureAfter({ WebFluxAutoConfiguration.class })
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class HttpHandlerAutoConfiguration {
   @Configuration
   public static class AnnotationConfig {
      private ApplicationContext applicationContext;
      public AnnotationConfig(ApplicationContext applicationContext) {
         this.applicationContext = applicationContext;
      }
      //構建WebHandler
      @Bean
      public HttpHandler httpHandler() {
         return WebHttpHandlerBuilder.applicationContext(this.applicationContext)
               .build();
      }
   }

流程相關源碼解析 - web 容器

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer 。在創建 WebServerManager 容器管理器時會獲取對應 web 容器實例,並注入響應的 HTTPHandler。

class WebServerManager {
   private final ReactiveWebServerApplicationContext applicationContext;
   private final DelayedInitializationHttpHandler handler;
   private final WebServer webServer;
   WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
         Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
      this.applicationContext = applicationContext;
      Assert.notNull(factory, "Factory must not be null");
      this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
      this.webServer = factory.getWebServer(this.handler);
   }
}

以 Tomcat 容器爲例展示創建過程,使用的是 TomcatHTTPHandlerAdapter 來連接 Servlet 請求到 HTTPHandler 組件。

public class TomcatReactiveWebServerFactory extends AbstractReactiveWebServerFactory implements ConfigurableTomcatWebServerFactory {
    //隱藏部分代碼   
    @Override
    public WebServer getWebServer(HttpHandler httpHandler) {
        if (this.disableMBeanRegistry) {
            Registry.disableRegistry();
        }
        Tomcat tomcat = new Tomcat();
        File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");
        tomcat.setBaseDir(baseDir.getAbsolutePath());
        Connector connector = new Connector(this.protocol);
        connector.setThrowOnFailure(true);
        tomcat.getService().addConnector(connector);
        customizeConnector(connector);
        tomcat.setConnector(connector);
        tomcat.getHost().setAutoDeploy(false);
        configureEngine(tomcat.getEngine());
        for (Connector additionalConnector : this.additionalTomcatConnectors) {
            tomcat.getService().addConnector(additionalConnector);
        }
        TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler);
        prepareContext(tomcat.getHost(), servlet);
        return getTomcatWebServer(tomcat);
    }
}

最後 Spring 容器加載後通過 SmartLifecycle 實現類

WebServerStartStopLifecycle 來啓動 Web 容器。

WebServerStartStopLifecycle 註冊過程詳見:

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer

3.2 完整請求處理流程

圖片

(引用自:https://blog.csdn.net)

該圖給出了一個 HTTP 請求處理的調用鏈路。是採用 Reactor Stream 方式書寫,只有最終調用 subscirbe 才真正執行業務邏輯。基於 WebFlux 開發時要避免 controller 中存在阻塞邏輯。列舉下面例子可以看到 Spring MVC 和 Spring Webflux 之間的請求處理區別。

@RestControllerpublic
class TestController {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @GetMapping("sync")
    public String sync() {
        logger.info("sync method start");
        String result = this.execute();
        logger.info("sync method end");
        return result;
    }
    @GetMapping("async/mono")
    public Mono<String> asyncMono() {
        logger.info("async method start");
        Mono<String> result = Mono.fromSupplier(this::execute);
        logger.info("async method end");
        return result;
    }
    private String execute() {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }
}

日誌輸出

2021-05-31 20:14:52.384  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method start
2021-05-31 20:14:57.385  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method end
2021-05-31 20:15:09.659  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method start
2021-05-31 20:15:09.660  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method end

從上面例子可以看出 sync() 方法阻塞了請求,而 asyncMono() 沒有阻塞請求並立刻返回的。asyncMono() 方法具體業務邏輯 被包裹在了 Mono 中 Supplier 中的了。當 execute 處理完業務邏輯後通過回調方式響應給瀏覽器。

四、存儲支持

一旦控制層使用了 Spring Webflux 則安全認證層、數據訪問層都必須使用 Reactive API 才真正實現異步非阻塞。

NOSQL Database

Relational Database

五、總結

關於 Spring MVC 和 Spring WebFlux 測評很多,本文引用下做簡單說明。參考:《Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC》。

基本依賴

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-data-r2dbc</artifactId> 
</dependency> 
<!-- r2dbc 連接池 --> 
<dependency> 
    <groupId >io.r2dbc</groupId> 
    <artifactId>r2dbc-pool</artifactId> 
</dependency> 
<!--r2dbc mysql 庫--> 
<dependency> 
    <groupId>dev.miku</groupId> 
    <artifactId>r2dbc- mysql</artifactId> 
</dependency> 
<!--自動配置需要引入一個嵌入式數據庫類型對象--> 
<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-data-jdbc</artifactId> 
</dependency>
<!-- 反應方程式 web 框架 webflux--> 
<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-webflux</artifactId> 
</dependency>

相同數據下效果如下**;**

圖片

Spring MVC + JDBC 在低併發下表現最好,但 WebFlux + R2DBC 在高併發下每個處理請求使用的內存最少。

圖片

Spring WebFlux + R2DBC 在高併發下,吞吐量表現優異。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/H6rkH9RkMaknelK2YPmo3w