Spring Cloud 與響應式微服務

衆所周知,Spring Cloud 服務間的調用方式是使用的 RESTful API,我們平時都是 R estTemplate 或 Feign 來調用的,這兩種方式其實說到底都是同步的方式。

Spring 支持響應式編程。那麼我們能不能在 Spring Cloud 的服務間調用的時候用這種異步非阻塞的方式呢?隨着 Spring Cloud Finchley 的發佈,這一切均可以實現。

本文我們就用 WebFlux、Spring Data Reactive 從頭到腳構建一個響應式的微服務。

準備

爲了完成這個示例,我們需要:

服務註冊中心:我們要用到服務發現和服務註冊,這裏用一個單節點的 Eureka Server 來做。

兩個微服務:帳戶服務和客戶服務。每個微服務都有自己的數據庫,且對外暴露簡單的響應式 API,用於檢索和存儲數據。另外,客戶服務與帳戶服務可以相互通信,以獲取客戶的所有帳戶,並通過客戶服務 API 方法返回。

數據庫:因爲現在還沒幾個數據庫有實現了反應式數據訪問的可用驅動,Spring Data Reactive 目前僅支持 MangoDB、Redis 和 Cassandra,簡單起見我們就用 MangoDB。MangoDB 我這裏使用 Docker 來創建,一切均用默認配置(主要是懶 ,這樣就不用去改 Spring Boot 的配置文件了)

docker run -d --name mongo -p 27017:27017 mongo

實戰

服務註冊中心

新建一個基本的 Spring Boot 工程,命名爲 eureka-server。

pom.xml 中依賴如下:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

配置文件 application.yml 配置如下:

spring:
  application:
    name: eureka-server
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
    service-url:
      defaultZone: http://localhost:8000/eureka/
server:
  port: 8000

在啓動類上加上 @EnableEurekaServer 註解:

@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}

賬戶服務

新建一個基本的 Spring Boot 工程,命名爲 cloud-account。

如果是使用 Spring Initializr 話,引入 Lombok、Reactive Web、Reactive MongoDB 和 Eureka Discovery 這四個依賴。

最終 pom.xml 中應有以下依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

配置文件 application.yml

spring:
  application:
    name: cloud-account
server:
  port: 8100
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8000/eureka/

創建賬戶的實體類,其中 @AllArgsConstructor、@NoArgsConstructor 和 @Data 都是 Lombok 提供註解,不瞭解的可以自行學習,這裏不多說了。

@AllArgsConstructor
@NoArgsConstructor
@Data
@Document(collection = "accounts")
public class Account {
    @Id
    private String id;
    private String customerId;
    private Double amount;
}

我們使用 Spring Data Reactive。與非響應式 Spring Data 的 CrudReposity 對應的,響應式的 Spring Data 也提供了相應的 Repository 庫:ReactiveCrudReposity,我們也可以使用它的子接口 ReactiveMongoRepository。

public interface AccountMongoReactiveRepository extends ReactiveCrudRepository<Account, String> {
    Flux<Account> findByCustomerId(String customerId);
}

爲賬戶服務創建對應的 Controller,這裏只簡單提供一個查詢客戶的所有賬戶的接口。爲了在後面測試負載均衡,這裏加上了調用時間戳的打印。

@RequestMapping("/account")
@RestController
public class AccountController {
    @Autowired
    private AccountMongoReactiveRepository repository;
    @GetMapping("/customer/{customer}")
    public Flux<Account> findByCustomer(@PathVariable(name = "customer") String customer) {
        System.out.println("Customer => " + customer + " [ " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss.SSS")) + " ]");
        return repository.findByCustomerId(customer);
    }
}

客戶服務

新建一個基本的 Spring Boot 工程,命名爲 cloud-customer,POM 依賴和之前的 cloud-account 的一模一樣。

配置文件如下,僅是改了服務名和端口號:

spring:
  application:
    name: cloud-customer
server:
  port: 8200
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8000/eureka/

創建一個 Customer 的實體類:

@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "customers")
public class Customer {
    @Id
    private String id;
    private String name;
    private String mobile;
}

數據訪問層直接繼承 ReactiveCrudRepository,我們便有了基本的 CRUD 能力:

public interface CustomerMongoReactiveRepository extends ReactiveCrudRepository<Customer, String> {
}

因爲我們只是示例,不做複雜的業務邏輯,所以省略了 Service 層,在 Controller 裏邊直接將 CRUD 的操作代理給了 Repository。

@RestController
@RequestMapping("/customer")
public class CustomerController {
    @Autowired private CustomerMongoReactiveRepository repository;
    @Autowired private WebClient.Builder webClientBuilder;
    @GetMapping("")
    public Flux<Customer> list() {
        return repository.findAll();
    }
    @GetMapping("/{id}")
    public Mono<Customer> get(@PathVariable String id) {
        return repository.findById(id);
    }
    @PostMapping("")
    public Mono<Customer> create(@RequestBody Customer customer) {
        return repository.save(customer);
    }
    @PutMapping("/{id}")
    public Mono<Customer> update(@PathVariable("id") String id, @RequestBody Customer customer) {
        customer.setId(id);
        return repository.save(customer);
    }
    @DeleteMapping("/{id}")
    public Mono<Void> delete(@PathVariable String id) {
        return repository.deleteById(id);
    }
}

到這裏,我們的服務註冊中心和兩個微服務就都好了。但是,這兩個微服務之間還是完全獨立的,沒有相互間的服務調用。現在我們來實現之前說的需求:客戶服務與帳戶服務可以相互通信,以獲取客戶的所有帳戶,並通過客戶服務 API 方法返回。

首先創建一個 Java Config,這裏我們不再使用 RestTemplate 來調用服務,而是 WebClient。這個配置看起來和註冊 RestTemplate 時差不多,但是要注意這裏註冊的 Bean 是 WebClient.Builder。

@Configuration
public class WebClientConfig {
    @Bean
    @LoadBalanced
    public WebClient.Builder loadBalancedWebClientBuilder() {
        return WebClient.builder();
    }
}

除了這種寫法,還有一種寫法是:

public class MyClass {
    @Autowired
    private LoadBalancerExchangeFilterFunction lbFunction;
    public Mono<String> doOtherStuff() {
        return WebClient.builder().baseUrl("http://cloud-account/account")
            .filter(lbFunction)
            .build()
            .get()
            .uri("")
            .retrieve()
            .bodyToMono(String.class);
    }
}

下邊的是錯誤的寫法,會拋出異常:

@Bean
@LoadBalanced
public WebClient loadBalancedWebClient() {
    return WebClient.builder().baseUrl("http://cloud-account/account").build();
}

然後在 CustomerController 實現這個端點:

@GetMapping("/{id}/account")
public Flux<Account> getAllAccounts(@PathVariable String id) {
    return webClientBuilder.baseUrl("http://cloud-account/account/").build()
        .get().uri("/customer/" + id)
        .retrieve()
        .bodyToFlux(Account.class);
}

這裏需要在 cloud-customer 裏創建一個 DTO Account,因爲和 cloud-account 裏的完全一樣,就省略了。

測試

同時啓動兩個 cloud-account 服務:

然後不斷請求 http://localhost:8200/customer/5ae15fa640f1687f200d8941/account 接口,從下圖的時間戳可以看出 WebClient 會輪流請求兩個服務,達到了負載均衡的效果。

總結

我們從服務提供、服務調用、數據訪問三個方面均使用了響應式編程(Reactive Programming),可以說是做到了 Full Reactive Stack Backend。相信你對響應式編程及其在 Web 應用、微服務架構中如何發揮作用有了更多的體會,本文實戰是比較基礎的,因爲切換到響應式思維方式並非易事,希望能夠通過上手編寫代碼體會響應式編程的感覺。

作者:Yibo

來源:haoyizebo.com/posts/a0cb2c47/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/xa4i81vEeyhdRcuLYg3Ptw