響應式編程和協程在 Java 語言的應用
作者簡介
董騏瑞,2018 年加入貝殼找房人工智能技術中心,先後負責推薦系統、搜索推薦,目前是智能匹配組、廣告投放工程負責人。
引言
現代編程中,對於高併發場景不可避免地會使用多線程和異步處理,但是多線程的大規模使用並不會無成本地提升運行效率,隨着線程數的增加,內存的佔用量和上下文切換的開銷都不能忽視。
本文針對 Java 語言的高併發場景,提出了進一步優化性能的技術原理和實現思路,通過 WebFlux 和 Quasar 的合理應用,減少硬件資源佔用,提高資源利用效率,對於提升服務性能收益顯著。
文章也展開介紹了響應式編程和協程技術在 Java 語言的技術現狀、實現方式以及性能表現。
文章最後總結了當前技術背景下,應用該技術的挑戰和難度。
概述
WebFlux
爲什麼使用響應式編程?
首先從 CPU 說起,CPU 的職責是爲了解決計算問題,由於 CPU 的運算速度要比內存讀寫速度快很多,爲了解決這種速度不匹配的矛盾,引入了 CPU 多級緩存。簡單地說,計算機工作時有多個任務,例如 A、B、C 三個程序,對於計算機來說,可能是三個進程,CPU 會把自己的工作劃分切片,每個切片處理一段任務,處理完畢時切換到下一個任務,最大化自己的利用效率。對於進程也是如此,爲了能同時處理更多任務,充分利用多核 CPU 的計算能力,會新建很多線程,期望併發編程最大化利用程序可以使用的計算資源。
然而併發不能完全解決問題,Web 應用往往伴隨着 I/O,例如數據庫請求或網絡調用,這時如果 I/O 阻塞了當前線程,使其處於等待數據的狀態,就很難提高資源的利用效率。當然,可以使用異步回調繞過問題,但不是所有的異步都是非阻塞的,同時,大量的回調不易編寫,容易導致代碼難以維護和閱讀。
此外,併發還涉及到 CPU 上下文切換的成本。實際上,上下文切換的成本並不低。一般情況下,CPU 指令運算只需要 0.38ns,而一次上下文切換卻需要 1500ns。所謂的上下文切換並沒有進行任何有效計算,只是切換了不同進程的寄存器和內存狀態,此外還破壞了緩存,而這令後續的計算更加耗時。舉個例子:把 CPU 計算想象成自己工作的狀態,一天的工作時間不會只做一項事情,可能做 A 工作 1 小時,B 工作 2 小時,C 工作 1 小時…… 如果從事 A 工作 1 分鐘,保存當前進度後從事 B 工作 2 分鐘,再切換工作臺從事 C 工作 30 秒,可能會讓人崩潰。
上面提到的上下文切換隻是部分成本,多線程本身還有內存成本,所以也是爲什麼線程數目配置不合理可能會導致 OOM。
早期的 Java 框架將 Web 請求綁定到 Servlet API 中的線程使得 Web 應用可以並行處理請求。但是,這有一個缺點,Java 的線程是以系統進程的形式實現的,這就意味着線程的切換會伴隨着大量的上下文切換成本。這在早年的 Web 服務中並不是主要的矛盾,而在企業應用要求苛刻的今天就是一個需要解決的矛盾了。
通過增加線程數來支撐業務的同時,卻帶來了更多的線程資源佔用和更多的上下文切換開銷。在這樣的背景下,如何增加 Web 應用的吞吐量?這個時候響應式編程就可以登場了。
什麼是響應式編程?
In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.
from WIKIPEDIA
響應式編程(Reactive Programming)是一種基於數據流和變化傳播的聲明式編程範式。
幾個關鍵詞:數據流、變化傳播、聲明式。
先介紹聲明式,一個比較形象的例子是 Excel,Excel 中可以定義單元格 C1 的公式來自 A1+B1,這樣 A1=1、B1=2 時,此時 C1=3,但這裏定義的並不是賦值 C1=3、而是定義 C1 來自 A1+B1 的關係;此時修改 A1=3 時,由於定義的是關係,C1 自動地從 1+2=3 變爲 3+2=5,這就是變化傳播。即:不需要重新賦值一次 C1 = 新的 A1(3)+B1(2)=5,由 A1 從 1 變更爲 3 觸發了 C1 值的變化。A1、B1 作爲觸發變動的源數據,稱爲數據流。
命令式編程(Imperative)和聲明式編程(Declarative)的區別在於命令式編程主要是告訴機器怎麼做拿到結果(命令機器處理一系列邏輯以獲取結果);聲明式編程是告訴機器要什麼結果(讓機器自己處理邏輯只關注結果)。後者聽起來有點像 OKR 管理,事實上聲明式語言包括:數據庫查詢語言(SQL)、正則表達式、邏輯編程、函數式編程和組態管理系統。
響應式編程(Reactive)是告訴機器數據和結果的關係,響應式編程強調變化的思想:數據流是不斷變化的,但是關係是定義好的,它依賴於事件,通過事件驅動實現處理過程。
舉個例子:
命令式編程類似於回到家,打開客廳的燈,換鞋,打開空調,去換衣間開燈,更換睡衣,關掉換衣間的燈,回客廳看電視;
響應式編程類似於回到家(客廳燈光觸發開、空調觸發開),換鞋,去換衣間更換睡衣(燈光觸發開),回客廳看電視(換衣間燈燈光觸發關)。
可以感受一下關注的主體。簡單的說,通過事件驅動,編程者可以關注於業務的觸發邏輯。
爲什麼響應式編程要強調變化傳播?因爲這樣的計算方式能最大化異步的效果。試想,應用可以處理 100 個請求,但是由於有同步等待,就會導致很多請求佔用的 CPU 資源被浪費了。如果整個流程不存在同步等待的操作,通過事件驅動,那麼 CPU 的利用效率會更高。
響應式編程通過將 CPU 核數和處理線程綁定來實現,處理請求的線程並不需要和實際的請求數一一對應,也就是說此前的處理線程數的網絡請求數是 1:1 的關係,基於響應式編程的處理線程數和網絡請求數是 1:N 的關係。實際的業務處理是通過異步實現的。畢竟硬件層面 CPU 核數不可能和處理線程數達到 1:1 的關係,此前的業務的處理必然涉及到頻繁的上下文切換。如果 CPU 核數、處理線程數和網絡請求數以 1:1:N 的模型進行,處理多個請求的同時就避免了頻繁的上下文切換,此外,使用固定的線程數(一般和 CPU 核數匹配)替代高併發的線程配置,也減少了內存佔用。例如:此前線程數 200 可以使用 8 核代替直接減少了 192M 內存佔用(假設未調整 - Xss 參數,每個線程佔用 1MB 空間)。
響應式編程的設計有些類似多路複用或是觀察者模式,是基於回調觸發的。但是不同於大量的 Callable 的應用,因爲後者容易產生回調地獄(Callable Heal,所謂回調地獄是指大量的回調函數聲明後已經嚴重影響編碼的效率和可維護性)。而通過響應式編程這種基於事件驅動的聲明式編程和數據流的訂閱發佈可以優雅地化解這個問題。
這裏有一段來自 Reactor 3 Reference Guide 的代碼示意,主要功能是:在用戶界面顯示用戶的前五個收藏夾、如果沒有給出建議結果。如下所示:
userService.getFavorites(userId, new Callback<List<String>>() {
public void onSuccess(List<String> list) {
if (list.isEmpty()) {
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) {
UiUtils.submitOnUiThread(() -> {
list.stream()
.limit(5)
.forEach(uiList::show);
});
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
} else {
list.stream()
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId,
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
不難看出上述代碼想表達的就是回調地獄的編碼風格,而使用響應式編程替換上述代碼,如下所示:
userService.getFavorites(userId)
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
很簡潔是不是?實際上,還可以給出限定超時時間(題外話:最開始吸引作者研究該技術的正是這個關鍵詞)。
userService.getFavorites(userId)
.timeout(Duration.ofMillis(800))
.onErrorResume(cacheService.cachedFavoritesFor(userId))
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
再舉一個例子,也可以使用 CompletableFuture 做示例:
CompletableFuture<List<String>> ids = ifhIds();
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
Stream<CompletableFuture<String>> zip =
l.stream().map(i -> {
CompletableFuture<String> nameTask = ifhName(i);
CompletableFuture<Integer> statTask = ifhStat(i);
return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
List<String> results = result.join();
assertThat(results).contains(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
同樣用響應式編程的運算符簡化代碼:
Flux<String> ids = ifhrIds();
Flux<String> combinations =
ids.flatMap(id -> {
Mono<String> nameTask = ifhrName(id);
Mono<Integer> statTask = ifhrStat(id);
return nameTask.zipWith(statTask,
(name, stat) -> "Name " + name + " has stats " + stat);
});
Mono<List<String>> result = combinations.collectList();
List<String> results = result.block();
assertThat(results).containsExactly(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
僅從代碼編寫方面看:儘管編碼風格改變巨大,但是可讀性和編碼靈活性大大提升。
什麼是 WebFlux?
下面循序漸進地介紹一些相關概念。
Reactive Streams:
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.
Reactive Streams 是一項倡議非阻塞、具有背壓的異步流處理標準。
它是一套用於構建高吞吐量、低延遲應用的規範。響應式流規範是 RxJava 等大牛制定的,同時基於該規範對 RxJava 重構形成了 RxJava 2 更奠定了它的權威性。然而,RxJava 傾向於 Android 端的響應式開發,基於 JVM 的響應式開發則依賴於 Reactor。
Reactor:
Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM.
Reactor 是第四代響應式庫,基於 Reactive Streams 規範的實現,旨在構建 JVM 環境的非阻塞應用。
Reactor 屬於 Pivotal 旗下。對於 Pivotal 可能很多人比較陌生,順便提一下 Pivotal 旗下赫赫有名的 Spring 項目。Reactor 是 Pivotal 推出的支持響應式流規範的 Web 實現,同樣具有非阻塞的基礎和背壓的支持。
響應式編程中的模型類似發佈 / 訂閱模式,但是該模式有個弊端,即當訂閱者消費速度較發佈者生產速度慢的情況下,容易產生消息堆積,而響應式編程的背壓概念就是針對該問題提出的。簡單地說,背壓(BackPressure)指的是一種反向反饋的方式,訂閱者聲明其能夠處理的消費數量,發佈者根據該數量生產消息直至下一次反饋。
同時,需要掌握兩個數據流 Publisher:Mono、Flux。其中,Mono 指的是 0 個或 1 個元素的異步序列,Flux 則表示 0 個到 N 個元素的異步序列。它們都提供了豐富的操作符,可以像 Stream API 一樣使用 map、flatmap、filter、zip 等 Operator,當然也有 onErrorReturn、onErrorResume、onErrorMap 等錯誤處理方式。
下圖是 Flux 處理數據的示意圖:
下圖是 Mono 處理數據的示意圖:
那麼,如何創建一個 Flux 或 Mono 並消費呢?可以參考下面的例子:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(Arrays.asList("foo", "bar", "foobar"));
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
Mono<String> noData = Mono.empty();
Mono<String> data = Mono.just("foo");
WebFlux:
The reactive-stack web framework, Spring WebFlux, was added later in version 5.0. It is fully non-blocking, supports Reactive Streams back pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers.
WebFlux 是 Spring Framework 5.0 中引入的新的響應式 Web 框架,與 Spring MVC 不同,它是一套完全異步非阻塞的 Web 框架,並且通過 Reactor 項目實現了 Reactive Streams 規範。適合使用事件驅動的風格構造 Web 應用。運行於 Netty、Undertow、及 Servlet 3.1 以上版本容器中。
併發模型
提到 WebFlux,首先對它的併發模型做下簡單介紹。
請求線程模型:
這種模型下,用戶對於 Web 服務器的請求由不同線程處理,而在很多數據庫請求和網絡 I/O 中是阻塞的。
事實上,這種併發模型是主流的實現方式,當然,它有一個弊端:隨着請求越來越多,一些阻塞的操作令整體模型表現低於預期。
事件循環模型:
我們反覆強調,異步是響應式編程的基礎。因此,在這種模型下,編程風格大幅轉變,程序結構基於異步事件流進行。
這種併發模型使用了較少的線程處理較多的請求,在每個處理線程中並不會阻塞,而是立即註冊回調事件並讓出資源。這便是前文提到的事件驅動(Event Driven)
圖片給出了事件循環的設計圖,可以看出:事件循環依賴少量的線程(通常和實際內核相關,甚至是單線程)進行處理,通過不斷的像平臺註冊事件並立即返回充分利用資源,當觸發完成的回調事件後,通知原調用方。
通過這種模型,可以大幅節省處理線程。
如何實現 WebFlux?
替換 spring-boot-starter 爲 spring-boot-starter-webflux 依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Spring WebFlux 關於 Controller 聲明有兩種方式:功能性和註釋型,基於註釋的在 Spring MVC 中較爲常見:
@RestController
@RequestMapping(value = "/dsp")
public class Controller {
@Autowired
private StrategyService strategyService;
@RequestMapping(value = "/routeA", method = RequestMethod.POST)
public ExampleResponse routeA(@RequestBody Map<String, String> requestBody) {
return strategyService.methodA();
}
@RequestMapping(value = "/routeB", method = RequestMethod.POST)
public ExampleResponse routeB(@RequestBody Map<String, String> requestBody) {
return strategyService.methodB();
}
@RequestMapping(value = "/routeC", method = RequestMethod.POST)
public ExampleResponse routeC(@RequestBody Map<String, String> requestBody) {
return strategyService.methodC();
}
}
基於功能性的方式則需要適應。本文作者更推薦嘗試該方式,使用這種方式會更清晰:
@Configuration
public class Router {
@Bean
public RouterFunction<ServerResponse> route(StrategyHandler strategyHandler) {
return RouterFunctions
.route(RequestPredicates.POST("/dsp/routeA")
.and(RequestPredicates.accept(MediaType.APPLICATION_STREAM_JSON)), strategyHandler::methodA)
.andRoute(RequestPredicates.POST("/dsp/routeB")
.and(RequestPredicates.accept(MediaType.APPLICATION_STREAM_JSON)), strategyHandler::methodB)
.andRoute(RequestPredicates.POST("/dsp/routeC")
.and(RequestPredicates.accept(MediaType.APPLICATION_STREAM_JSON)), strategyHandler::methodC);
}
}
接着,定義一個示例方法:
public Mono<ServerResponse> methodA(ServerRequest serverRequest) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(exampleService.dealWithRequest(
serverRequest.bodyToMono(String.class), serverRequest),
String.class);
}
由於響應式編程主要是基於異步非阻塞的,這對系統提出了一個較爲重要的隱式要求,即:全程不要有阻塞操作,可以使用 WebClient、Lettuce、RJDBC 等技術,這裏以 WebClient 爲例改造項目。
WebClient 是隨着 Spring Framework 5.0 推出的支持響應式編程的非阻塞 HTTP 請求客戶端工具,隨 WebFlux 提供,也就是說,不需要引入額外的依賴。
Mono<String> response = null;
switch (method) {
case GET:
response = WebClient.create()
.method(HttpMethod.GET)
.uri(requestUrl)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.retrieve()
.bodyToMono(String.class);
break;
case POST:
response = WebClient.create()
.method(HttpMethod.POST)
.uri(requestUrl)
.body(Mono.just(requestBody), String.class)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.headers(headers -> headers.addAll(httpHeaders))
.retrieve()
.bodyToMono(String.class);
break;
default:
break;
}
可以看出使用並不複雜,值得一提的是關於 WebClient 有幾個方法:retrieve()、exchange()、subscribe()、block()。
其中,retrieve 和 exchange 都是處理 HTTP 請求並返回響應,區別在於 retrieve 只能拿到 body 信息;exchange 可以獲取更多信息、包括狀態碼、請求頭等信息,然而 exchange() 在 5.3 版本後由於內存連接泄露被廢棄、改爲 exchangeToMono()、exchangeToFlux()。
subscribe 是非阻塞方法,用於異步訂閱響應結果,不會阻塞主線程執行;block 是阻塞方法,可以獲取返回結果,可以在測試中使用,特別注意的是:生產中不建議使用,因爲有限的請求資源會被阻塞掉、極大地影響性能表現。
WebClient 也可以使用 builder 模式,例如下面例子實現了讀寫超時時間的控制,自行嘗試即可。
@Bean
public WebClient webClient()
{
HttpClient httpClient = HttpClient.create()
.tcpConfiguration(client ->
client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100)
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(10))
.addHandlerLast(new WriteTimeoutHandler(10))
)
);
ClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
return WebClient.builder()
.baseUrl("http://localhost:8080/findAll")
.clientConnector(connector)
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}
使用中,如果出現 DataBufferLimitException 異常,是由於 WebFlux 限制了數據緩衝的默認內存限制爲 256KB,可以嘗試調整參數解決:
spring.codec.max-in-memory-size=1MB
事實上,一個成熟的大型項目需要改造的內容遠不止這些,由於篇幅有限,不做過多贅述。
Quasar
爲什麼需要使用協程?
前面提到了使用 WebFlux 改造項目,但遺憾的是,不是所有的方法都是非阻塞的,可以通過切換技術棧進行改造,但還是不可避免的會遇到無法替代的方案。改成響應式編程後,如果存在同步任務,性能不升反降,需要保證全流程幾乎純異步。
這時候直接有效的方式是使用異步方式處理,可能首先想到的是 Future,但即使是 Future 模式,get 的環節也是阻塞處理的,這點需要注意;而 JDK1.8 加入的 CompletableFuture 在使用特殊方法如 whenComplete 是非阻塞的。
如果考慮到編碼的複雜度,一定要通過線程等待的話,或者說有些任務就是併發處理的,如何實現呢?我們知道,每個 Java 線程需要一個棧空間保存掛起狀態,64 位的環境中默認是 1MB,也就是說線程的增加是伴隨着內存使用增長的,線程數過多的後果就是 OutOfMemoryError。是否可以壓縮線程的空間呢,可以通過 - Xss 調節,但是不具備實際意義,因爲遞歸調用的層次過多會導致 StackOverflowError,爲了等待這些阻塞的任務執行要浪費 N*1MB。那麼如何節約這部分資源呢?答案是用輕量的方式實現等待或併發任務,由於使用常規線程做異步等待比較浪費資源,可以考慮使用輕量級的協程做異步等待。即:使用協程。
什麼是協程?
協程也可以稱爲纖程,英文 Fiber 或 Coroutine,主要來自語法層面實現的區別,本文統一稱爲協程。如果說進程和線程的模型可以看做 1:N 的關係;線程和協程也可以看做 1:N 的關係,換言之就是更輕量的線程。
協程的概念並不新鮮,Go 語言的協程已經很出名,實際上 Java 早年也是支持協程的,那時候叫做 GreenThread,運行於用戶空間,但其儘可以用 1:1 綁定,不能與系統內核實現 M:N 綁定,所以後續被線程綁定的模型代替。目前 Java 語言環境的解決方案有:Parallel Universe 公司開發的 Quasar,Oracle 主導的 Project Loom,以及 Kotlin 的語言支持。由於 Loom 目前還沒有正式完成,Kotlin 需要語言混用,本文使用 Quasar 實現。
協程的原理?
首先了解下棧的作用,棧主要用於保存函數調用之後的返回位置。協程的實現方式從棧的角度區分主要分爲兩種:Stackless 和 Stackful。以下是兩大實現方式的主流代表:
Stackless:C#、Scala、Kotlin
Stackful:Go、Quasar、Javaflow
主要區別在於是否需要固定的棧內存,如名字含義 Stackful 是有棧內存的,繼續執行任務時從棧的位置執行,而 Stackless 需要編譯器支持、生成代碼自定義繼續執行的邏輯;至於性能方面 Stackful 稍微影響 CPU 的分支預測,繼而影響性能,但幾乎可以忽略,而 Stackless 沒有影響;內存方面 Stack 需要固定分配佔內存(如 4KB),而 Stackless 幾乎不佔用內存;實現方面,Stackless 的更爲複雜,對編譯器作者的挑戰極高,而 Stackful 不會像 Stackless 那麼高難度。
Stackful 在調用時可以保存棧,暫停和恢復類似線程,主要區別在於調度;而 Stackless 則通過中斷,簡單的理解是個掛起點,類似回調實現。可以看出,大體的原理都是保存回調位置,掛起阻塞的任務,通過恢復來快速切換,充分利用計算資源。
那麼,應該不難理解協程的輕量級含義了,它幾乎沒有自己的棧內存,而且不和實際的線程映射,也就不需要處理器的上下文切換。
總結下來,協程的幾個好處是以下三點:避免了內核態和用戶態切換的成本,在用戶空間切換;用更少的棧空間創建類線程。
Quasar 的原理?
Quasar 的原理主要是通過 SuspendExecution 來實現的,爲什麼這麼說?它會織入字節碼,在方法調用的前後插入代碼用於保存和恢復 Fiber 棧本地變量的狀態,記錄暫停點,隨後 park 掛起 Fiber、拋出 SuspendExecution 異常,當方法被阻塞的時候,該異常被捕獲。當喚醒時,方法被調用,立即跳到那一行。原理不是很簡單,但是性能損失僅約爲 3%~5%。
如何使用 Quasar?
首先需要引入依賴,Quasar 最新版本是 0.8.0 支持 JDK11 及以上,由於公司的 JDK 環境是 1.8,所以使用 0.7.* 的 Quasar 版本。
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.7.4</version>
<classifier>jdk8</classifier>
</dependency>
Quasar 依賴 Java 的字節碼織入,需要修改 JavaAgent,增加啓動命令:
-javaagent:path-to-quasar-jar.jar
修改 JavaAgent 也可以用另一種實現:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- Turn off before production -->
<!-- argLine>-Dco.paralleluniverse.fibers.verifyInstrumentation=true</argLine -->
<!-- Enable if using compile-time (AoT) instrumentation -->
<!-- argLine>-Dco.paralleluniverse.fibers.disableAgentWarning</argLine -->
<!-- Quasar Agent for JDK 8 -->
<argLine>-javaagent:${co.paralleluniverse:quasar-core:jar:jdk8}</argLine>
</configuration>
</plugin>
Quasar 的 Fiber 使用風格和 JDK 內置的 runnable 差別不大,如下:
new Fiber<V>() {
@Override
protected V run() throws SuspendExecution, InterruptedException {
// your code
}
}.start();
如果不需要返回值,則設置泛型 V 爲 Void,返回 null 即可。
new Fiber<Void>(new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start();
當然,可以使用 Lambda 表達式簡化爲:
new Fiber<Void>((SuspendableRunnable) () -> {
// your code
}).start();
但無論哪種,一定要允許方法拋出 SuspendExecution 異常,因爲 Fiber 正是通過捕獲該異常控制掛起及恢復的。
項目中建議使用線程池,因此線程池可以改造爲 Fiber 支持的形式:
@Bean(name = EXECUTOR)
public Executor setThreadPoolExecutor() {
/* Do not drop task */
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(getCorePoolSize());
threadPoolTaskExecutor.setMaxPoolSize(getMaxPoolSize());
threadPoolTaskExecutor.setThreadNamePrefix("Executor_");
threadPoolTaskExecutor.setQueueCapacity(200);
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(false);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
threadPoolTaskExecutor.initialize();
// return threadPoolTaskExecutor;
return new FiberExecutorScheduler("Fiber_", threadPoolTaskExecutor);
}
值得注意的是關於 Fiber 的逃逸問題,大批量的 Fiber 逃逸會嚴重影響性能,可以通過監控避免。
項目中如果大量使用了 ThreadLocal 或者 InheritableThreadLocal,甚至是 TransmittableThreadLocal,要尤其注意 Fiber 下的實際表現。慶幸的是,關於 ThreadLocal 或者 InheritableThreadLocal,Quasar 源碼中有單元測試可供參考。
技術實踐
應用場景
廣告投放。廣告投放負責承接媒體流量,進行受衆定向後,結合廣告召回、排序、出價等策略,進行站外投放等目的,最終達到拉活、拉新等站外引流的效果。
技術挑戰
由於全網媒體流量衆多,廣告服務承載的流量高達幾百億每天,QPS 也是百萬每秒,而媒體給出的 RT 要求是 40~100ms 之間,不難看出,廣告投放對於響應效率要求極爲苛刻。
語言情況
服務語言棧引擎使用 Go 語言;策略系統使用 Java 語言。本篇文章改造的目標是策略系統這部分 Java 語言的應用。
架構簡介
效果回收
在測試項目進行了遷移實驗,可以看到使用了 WebFlux+Quasar 的 TPS 提升明顯,可以達到此前的 294%;平響、錯誤率、95 線幾乎持平。
儘管根據預估還有提升空間,但目前的效果已經非常顯著,後續將進一步思考如何優化模型調用的性能表現及儘快完成遷移。
遷移成本
JavaWeb 有個特點,對於新鮮的技術耳熟能詳,但是實際應用上較爲保守,所以實際使用案例少之又少,加上響應式編程對於傳統編程的區別以及纖程在處理併發細節等問題增加了學習成本,團隊推行難度更大,也需要更謹慎。
由於 Java 的業務衆多,難以保證所有的業務場景都能適用,例如,在 RJDBC 出現前,操作數據庫的阻塞問題始終無法解決,也阻礙了一部分開發者切換 WebFlux 的步伐。
實際項目使用中,要考慮是不是所有的操作都是非阻塞的,這個尤爲重要,響應式編程一旦內部流程有阻塞的過程,不但無法提升性能,反而會嚴重降低性能。所以,如果效果不夠理想,建議關注下是否存在阻塞操作及纖程逃逸現象。
綜上所述,流程的非阻塞、異步化改造和學習曲線註定會有一定的遷移成本。
結語(總結和展望)
在高併發的場景下,常規的性能優化已經難以滿足業務需求。如何深度優化服務吞吐表現以支撐業務發展是持續努力的方向,本文通過使用 WebFlux 技術解決了高併發場景下吞吐量瓶頸的難題,通過使用 Quasar 技術解決了併發計算和異步等待的內存消耗和線程切換成本,同時取得了很理想的結果,TPS 提升約 3 倍。
同時不難發現,將一個成熟的項目做文中的升級是極具挑戰的,主要體現在學習曲線、業務適用性及技術改造等方面。因此,在實際生產中,項目負責人可以根據情況分析、實驗並嘗試,但不代表使用了該技術一定是最合適的選擇。
儘管如此,我們還是欣喜地看到,該技術應用得當的話,性能提升是令人興奮的。同時,可以使代碼編寫更簡潔、業務處理更清晰。
無論如何,實現成本是不能被忽視的,本文作者曾經嘗試將工具二次封裝減少調用成本,降低使用門檻,最後發現,推廣的困難並不侷限於此,實際開發時還是需要掌握原理才能避免出錯。因此,在國內互聯網公司普遍停留於 JDK1.8 版本的背景下,如何嘗試一些新鮮技術並應用於生產環境是個值得思考的問題。
參考文獻
1. [Reactive Programming](https://en.wikipedia.org/wiki/Reactive_programming)
2. [Project Reactor](https://projectreactor.io)
3. [Quasar](http://docs.paralleluniverse.co/quasar)
4. [多線程併發方案的不足——響應式Spring的道法術器](https://blog.csdn.net/get_set/article/details/79553262)
5. [響應式流——響應式Spring的道法術器](https://blog.csdn.net/get_set/article/details/79466402)
6. [讓 CPU 告訴你硬盤和網絡到底有多慢](https://cizixs.com/2017/01/03/how-slow-is-disk-and-network)
7. [外行人都能看懂的WebFlux,錯過了血虧!](https://segmentfault.com/a/1190000021038373)
8. [背壓和響應流標準](https://www.jianshu.com/p/14af721188ea)
9. [Java 9 - 說說響應式流](https://yanbin.blog/java-9-talk-reactive-stream/#more-8877)
10. [命令式編程(Imperative) vs聲明式編程( Declarative)](https://zhuanlan.zhihu.com/p/34445114)
11. [Spring boot Webclient's retrieve vs exchange](https://stackoverflow.com/questions/58410352/spring-boot-webclients-retrieve-vs-exchange)
12. [Spring WebClient](https://howtodoinjava.com/spring-webflux/webclient-get-post-example)
13. [Tag: WebFlux](https://www.baeldung.com/tag/webflux)
14. [Java中的纖程庫 - Quasar](https://colobu.com/2016/07/14/Java-Fiber-Quasar)
15. [Java線程與Xss](https://segmentfault.com/a/1190000004694232)
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/GDswfPmiHoIwdi5XaTELfw