Java 8 的 Stream 流那麼強大,你知道它的原理嗎?

大家好,我是不才陳某~

Java 8 API 添加了一個新的抽象稱爲流 Stream,可以讓你以一種聲明的方式處理數據。

Stream 使用一種類似用 SQL 語句從數據庫查詢數據的直觀方式來提供一種對 Java 集合運算和表達的高階抽象。

Stream API 可以極大提高 Java 程序員的生產力,讓程序員寫出高效率、乾淨、簡潔的代碼。

本文會對 Stream 的實現原理進行剖析。

Stream 的組成與特點

Stream(流)是一個來自數據源的元素隊列並支持聚合操作:

和以前的Collection操作不同, Stream 操作還有兩個基礎的特徵:

和迭代器又不同的是,Stream 可以並行化操作,迭代器只能命令式地、串行化操作。顧名思義,當使用串行方式去遍歷時,每個 item 讀完後再讀下一個 item。而使用並行去遍歷時,數據會被分成多個段,其中每一個都在不同的線程中處理,然後將結果一起輸出。

Stream 的並行操作依賴於 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務和加速處理過程。Java 的並行 API 演變歷程基本如下:

1.0-1.4 中的 java.lang.Thread

5.0 中的 java.util.concurrent

6.0 中的 Phasers 等

7.0 中的 Fork/Join 框架

8.0 中的 Lambda

Stream具有平行處理能力,處理的過程會分而治之,也就是將一個大任務切分成多個小任務,這表示每個任務都是一個操作:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(out::println);

可以看到一行簡單的代碼就幫我們實現了並行輸出集合中元素的功能,但是由於並行執行的順序是不可控的所以每次執行的結果不一定相同。

如果非得相同可以使用forEachOrdered方法執行終止操作:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEachOrdered(out::println);

這裏有一個疑問,如果結果需要有序,是否和我們的並行執行的初衷相悖?是的,這個場景下明顯無需使用並行流,直接用串行流執行即可, 否則性能可能更差,因爲最後又強行將所有並行結果進行了排序。

OK,下面我們先介紹一下Stream接口的相關知識。

BaseStream 接口

Stream的父接口是BaseStream,後者是所有流實現的頂層接口,定義如下:

public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {
    Iterator<T> iterator();

    Spliterator<T> spliterator();

    boolean isParallel();

    S sequential();

    S parallel();

    S unordered();

    S onClose(Runnable closeHandler);

    void close();
}

其中,T爲流中元素的類型,S爲一個BaseStream的實現類,它裏面的元素也是T並且S同樣是自己:

S extends BaseStream<T, S>

是不是有點暈?

其實很好理解,我們看一下接口中對S的使用就知道了:如sequential()parallel()這兩個方法,它們都返回了S實例,也就是說它們分別支持對當前流進行串行或者並行的操作,並返回「改變」後的流對象。

如果是並行一定涉及到對當前流的拆分,即將一個流拆分成多個子流,子流肯定和父流的類型是一致的。子流可以繼續拆分子流,一直拆分下去…

也就是說這裏的SBaseStream的一個實現類,它同樣是一個流,比如StreamIntStreamLongStream等。

Stream 接口

再來看一下Stream的接口聲明:

public interface Stream<T> extends BaseStream<T, Stream<T>>

參考上面的解釋這裏不難理解:即Stream<T>可以繼續拆分爲Stream<T>,我們可以通過它的一些方法來證實:

Stream<T> filter(Predicate<? super T> predicate);
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
Stream<T> sorted();
Stream<T> peek(Consumer<? super T> action);
Stream<T> limit(long maxSize);
Stream<T> skip(long n);
...

這些都是操作流的中間操作,它們的返回結果必須是流對象本身。

關閉流操作

BaseStream 實現了 AutoCloseable 接口,也就是 close() 方法會在流關閉時被調用。同時,BaseStream 中還給我們提供了onClose()方法:

S onClose(Runnable closeHandler);

AutoCloseableclose()接口被調用的時候會觸發調用流對象的onClose()方法,但有幾點需要注意:

並行流和串行流

BaseStream接口中分別提供了並行流串行流兩個方法,這兩個方法可以任意調用若干次,也可以混合調用,但最終只會以最後一次方法調用的返回結果爲準

參考parallel()方法的說明:

Returns an equivalent stream that is parallel. May return

itself, either because the stream was already parallel, or because

the underlying stream state was modified to be parallel.

所以多次調用同樣的方法並不會生成新的流,而是直接複用當前的流對象。

下面的例子裏以最後一次調用parallel()爲準,最終是並行地計算sum

stream.parallel()
   .filter(...)
   .sequential()
   .map(...)
   .parallel()
   .sum();

ParallelStream 背後的男人:ForkJoinPool

ForkJoin 框架是從 JDK7 中新特性,它同 ThreadPoolExecutor 一樣,也實現了 Executor 和 ExecutorService 接口。它使用了一個「無限隊列」來保存需要執行的任務,而線程的數量則是通過構造函數傳入, 如果沒有向構造函數中傳入希望的線程數量,那麼當前計算機可用的 CPU 數量會被設置爲線程數量作爲默認值。

ForkJoinPool 主要用來使用分治法 (Divide-and-Conquer Algorithm) 來解決問題,典型的應用比如_快速排序算法_。這裏的要點在於,ForkJoinPool 需要使用相對少的線程來處理大量的任務。

比如要對 1000 萬個數據進行排序,那麼會將這個任務分割成兩個500 萬的排序任務一個針對這兩組500萬數據的合併任務

以此類推,對於 500 萬的數據也會做出同樣的分割處理,到最後會設置一個閾值來規定當數據規模到多少時,停止這樣的分割處理。比如,當元素的數量小於 10 時,會停止分割,轉而使用插入排序對它們進行排序。那麼到最後,所有的任務加起來會有大概 2000000 + 個。

問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行,想象一下歸併排序的過程。

所以當使用 ThreadPoolExecutor 時,使用分治法會存在問題,因爲 ThreadPoolExecutor 中的線程無法向 任務隊列中再添加一個任務並且在等待該任務完成之後再繼續執行。而使用ForkJoinPool時,就能夠讓其中的線程創建新的任務,並掛起當前的任務,此時線程就能夠從隊列中選擇子任務執行

那麼使用 ThreadPoolExecutor 或者 ForkJoinPool,會有什麼性能的差異呢?

首先,使用 ForkJoinPool 能夠使用數量有限的線程來完成非常多的具有「父子關係」的任務,比如使用 4 個線程來完成超過 200 萬個任務。使用 ThreadPoolExecutor 時,是不可能完成的,因爲 ThreadPoolExecutor 中的 Thread 無法選擇優先執行子任務,需要完成 200 萬個具有父子關係的任務時,也需要 200 萬個線程,顯然這是不可行的。

Work Stealing 原理:

  1. 每個工作線程都有自己的工作隊列 WorkQueue;

  2. 這是一個雙端隊列 dequeue,它是線程私有的;

  3. ForkJoinTask 中 fork 的子任務,將放入運行該任務的工作線程的隊頭,工作線程將以 LIFO 的順序來處理工作隊列中的任務,即堆棧的方式;

  4. 爲了最大化地利用 CPU,空閒的線程將從其它線程的隊列中「竊取」任務來執行

  5. 但是是從工作隊列的尾部竊取任務,以減少和隊列所屬線程之間的競爭;

  6. 雙端隊列的操作:push()/pop() 僅在其所有者工作線程中調用,poll() 是由其它線程竊取任務時調用的;

  7. 當只剩下最後一個任務時,還是會存在競爭,是通過 CAS 來實現的;

用 ForkJoinPool 的眼光來看 ParallelStream

Java 8 爲 ForkJoinPool 添加了一個通用線程池,這個線程池用來處理那些沒有被顯式提交到任何線程池的任務。它是 ForkJoinPool 類型上的一個靜態元素,它擁有的默認線程數量等於運行計算機上的 CPU 數量。

當調用 Arrays 類上添加的新方法時,自動並行化就會發生。

比如用來排序一個數組的並行快速排序,用來對一個數組中的元素進行並行遍歷。自動並行化也被運用在 Java 8 新添加的 Stream API 中。

比如下面的代碼用來遍歷列表中的元素並執行需要的操作:

List<UserInfo> userInfoList =
        DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);

對於列表中的元素的操作都會以並行的方式執行。forEach方法會爲每個元素的計算操作創建一個任務,該任務會被前文中提到的ForkJoinPool中的 commonPool 處理。

以上的並行計算邏輯當然也可以使用ThreadPoolExecutor完成,但是就代碼的可讀性和代碼量而言,使用ForkJoinPool明顯更勝一籌。

對於ForkJoinPool通用線程池的線程數量,通常使用默認值就可以了,即運行時計算機的處理器數量。也可以通過設置系統屬性:-Djava.util.concurrent .ForkJoinPool.common.parallelism=N (N 爲線程數量), 來調整ForkJoinPool的線程數量。

值得注意的是,當前執行的線程也會被用來執行任務,所以最終的線程個數爲N+1,1 就是當前的主線程

這裏就有一個問題,如果你在並行流的執行計算使用了_阻塞操作_,如 I/O,那麼很可能會導致一些問題:

public static String query(String question) {
  List<String> engines = new ArrayList<String>();
  engines.add("http://www.google.com/?q=");
  engines.add("http://duckduckgo.com/?q=");
  engines.add("http://www.bing.com/search?q=");

  // get element as soon as it is available
  Optional<String> result = engines.stream().parallel().map((base) - {
    String url = base + question;
    // open connection and fetch the result
    return WS.url(url).get();
  }).findAny();
  return result.get();
}

這個例子很典型,讓我們來分析一下:

正如我們上面那個列子的情況分析得知,lambda 的執行並不是瞬間完成的, 所有使用 parallel streams 的程序都有可能成爲阻塞程序的源頭, 並且在執行過程中程序中的其他部分將無法訪問這些 workers,這意味着任何依賴 parallel streams 的程序在什麼別的東西佔用着 common ForkJoinPool 時將會變得不可預知並且暗藏危機。

小結:

  1. 當需要處理遞歸分治算法時,考慮使用 ForkJoinPool。

  2. 仔細設置不再進行任務劃分的閾值,這個閾值對性能有影響。

  3. Java 8 中的一些特性會使用到 ForkJoinPool 中的通用線程池。在某些場合下,需要調整該線程池的默認的線程數量

  4. lambda 應該儘量避免副作用,也就是說,避免突變基於堆的狀態以及任何 IO

  5. lambda 應該互不干擾,也就是說避免修改數據源(因爲這可能帶來線程安全的問題)

  6. 避免訪問在流操作生命週期內可能會改變的狀態

並行流的性能

並行流框架的性能受以下因素影響:

源數據結構分爲以下 3 組:

注意:下面幾個部分節選自:Streams 的幕後原理,順便感謝一下作者_Brian Goetz_,寫的太通透了。

NQ 模型

要確定並行性是否會帶來提速,需要考慮的最後兩個因素是:可用的數據量和針對每個數據元素執行的計算量。

在我們最初的並行分解描述中,我們採用的概念是拆分來源,直到分段足夠小,以致解決該分段上的問題的順序方法更高效。分段大小必須依賴於所解決的問題,確切的講,取決於每個元素完成的工作量。

例如,計算一個字符串的長度涉及的工作比計算字符串的 SHA-1 哈希值要少得多。爲每個元素完成的工作越多,“大到足夠利用並行性” 的閾值就越低。類似地,擁有的數據越多, 拆分的分段就越多,而不會與 “太小” 閾值發生衝突。

一個簡單但有用的並行性能模型是 NQ 模型,其中 N 是數據元素數量,Q 是爲每個元素執行的工作量。乘積 N*Q 越大,就越有可能獲得並行提速。對於具有很小的 Q 的問題,比如對數字求和,您通常可能希望看到 N > 10,000 以獲得提速;隨着 Q 增加,獲得提速所需的數據大小將會減小。

並行化的許多阻礙(比如拆分成本、組合成本或遇到順序敏感性)都可以通過 Q 更高的操作來緩解。儘管拆分某個 LinkedList 特徵的結果可能很糟糕,但只要擁有足夠大的 Q,仍然可能獲得並行提速。

遇到順序

遇到順序指的是來源分發元素的順序是否對計算至關重要。一些來源(比如基於哈希的集合和映射)沒有有意義的遇到順序。流標誌 ORDERED 描述了流是否有有意義的遇到順序。

JDK 集合的 spliterator 會根據集合的規範來設置此標誌;

一些中間操作可能注入 ORDERED (sorted()) 或清除它 (unordered())。

如果流沒有遇到順序,大部分流操作都必須遵守該順序。對於順序執行,會「自動保留遇到順序」,因爲元素會按遇到它們的順序自然地處理。

甚至在並行執行中,許多操作(無狀態中間操作和一些終止操作(比如 reduce())),遵守遇到順序不會產生任何實際成本。

但對於其他操作(有狀態中間操作,其語義與遇到順序關聯的終止操作,比如 findFirst() 或 forEachOrdered()), 在並行執行中遵守遇到順序的責任可能很重大。

如果流有一個已定義的遇到順序,但該順序對結果沒有意義, 那麼可以通過使用 unordered() 操作刪除 ORDERED 標誌,加速包含順序敏感型操作的管道的順序執行。

作爲對遇到順序敏感的操作的示例,可以考慮 limit(),它會在指定大小處截斷一個流。在順序執行中實現 limit() 很簡單:保留一個已看到多少元素的計數器,在這之後丟棄任何元素。

但是在並行執行中,實現 limit() 要複雜得多;您需要保留前 N 個元素。此要求大大限制了利用並行性的能力;如果輸入劃分爲多個部分,您只有在某個部分之前的所有部分都已完成後,才知道該部分的結果是否將包含在最終結果中。

因此,該實現一般會錯誤地選擇不使用所有可用的核心,或者緩存整個試驗性結果,直到您達到目標長度。

如果流沒有遇到順序,limit() 操作可以自由選擇任何 N 個元素,這讓執行效率變得高得多。知道元素後可立即將其發往下游, 無需任何緩存,而且線程之間唯一需要執行的協調是發送一個信號來確保未超出目標流長度。

遇到順序成本的另一個不太常見的示例是排序。如果遇到順序有意義,那麼 sorted() 操作會實現一種穩定 排序 (相同的元素按照它們進入輸入時的相同順序出現在輸出中),而對於無序的流,穩定性(具有成本)不是必需的。

distinct() 具有類似的情況:如果流有一個遇到順序,那麼對於多個相同的輸入元素,distinct() 必須發出其中的第一個, 而對於無序的流,它可以發出任何元素 — 同樣可以獲得高效得多的並行實現。

在使用 collect() 聚合時會遇到類似的情形。如果在無序流上執行 collect(groupingBy()) 操作, 與任何鍵對應的元素都必須按它們在輸入中出現的順序提供給下游收集器。

此順序對應用程序通常沒有什麼意義,而且任何順序都沒有意義。在這些情況下,可能最好選擇一個併發 收集器(比如 groupingByConcurrent()),它可以忽略遇到順序, 並讓所有線程直接收集到一個共享的併發數據結構中(比如 ConcurrentHashMap),而不是讓每個線程收集到它自己的中間映射中, 然後再合併中間映射(這可能產生很高的成本)。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/YA20tx0aCMkA2GGbRdpFVQ