天天在用 Stream,你知道如此強大的 Stream 的實現原理嗎?

前面我們已經學會如何使用 Stream API,用起來真的很爽,但簡潔的方法下面似乎隱藏着無盡的祕密,如此強大的 API 是如何實現的呢?比如 Pipeline 是怎麼執行的,每次方法調用都會導致一次迭代嗎?自動並行又是怎麼做到的,線程個數是多少?本節我們學習 Stream 流水線的原理,這是 Stream 實現的關鍵所在。

首先回顧一下容器執行 Lambda 表達式的方式,以ArrayList.forEach()方法爲例,具體代碼如下:

// ArrayList.forEach()
public void forEach(Consumer<? super E> action) {
    ...
    for (int i=0; modCount == expectedModCount && i < size; i++) {
        action.accept(elementData[i]);// 回調方法
    }
    ...
}

我們看到ArrayList.forEach()方法的主要邏輯就是一個 for 循環,在該 for 循環裏不斷調用action.accept()回調方法完成對元素的遍歷。這完全沒有什麼新奇之處,回調方法在 Java GUI 的監聽器中廣泛使用。Lambda 表達式的作用就是相當於一個回調方法,這很好理解。

Stream API 中大量使用 Lambda 表達式作爲回調方法,但這並不是關鍵。理解 Stream 我們更關心的是另外兩個問題:流水線和自動並行。使用 Stream 或許很容易寫入如下形式的代碼:

int longestStringLengthStartingWithA
        = strings.stream()
              .filter(s -> s.startsWith("A"))
              .mapToInt(String::length)
              .max();

上述代碼求出以字母 A 開頭的字符串的最大長度,一種直白的方式是爲每一次函數調用都執一次迭代,這樣做能夠實現功能,但效率上肯定是無法接受的。類庫的實現着使用流水線(Pipeline)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中儘可能多的執行用戶指定的操作。爲講解方便我們彙總了 Stream 的所有操作。

sneAZT

Stream 上的所有操作分爲兩類:中間操作和結束操作,中間操作只是一種標記,只有結束操作纔會觸發實際計算。中間操作又可以分爲無狀態的 (Stateless) 和有狀態的 (Stateful),無狀態中間操作是指元素的處理不受前面元素的影響,而有狀態的中間操作必須等到所有元素處理之後才知道最終結果,比如排序是有狀態操作,在讀取所有元素之前並不能確定排序結果;結束操作又可以分爲短路操作和非短路操作,短路操作是指不用處理全部元素就可以返回結果,比如_找到第一個滿足條件的元素_。之所以要進行如此精細的劃分,是因爲底層對每一種情況的處理方式不同。爲了更好的理解流的中間操作和終端操作,可以通過下面的兩段代碼來看他們的執行過程。

IntStream.range(1, 10)
   .peek(x -> System.out.print("\nA" + x))
   .limit(3)
   .peek(x -> System.out.print("B" + x))
   .forEach(x -> System.out.print("C" + x));

輸出爲:A1B1C1 A2B2C2 A3B3C3 中間操作是懶惰的,也就是中間操作不會對數據做任何操作,直到遇到了最終操作。而最終操作,都是比較熱情的。他們會往前回溯所有的中間操作。也就是當執行到最後的 forEach 操作的時候,它會回溯到它的上一步中間操作,上一步中間操作,又會回溯到上上一步的中間操作,...,直到最初的第一步。第一次 forEach 執行的時候,會回溯 peek 操作,然後 peek 會回溯更上一步的 limit 操作,然後 limit 會回溯更上一步的 peek 操作,頂層沒有操作了,開始自上向下開始執行,輸出:A1B1C1 第二次 forEach 執行的時候,然後會回溯 peek 操作,然後 peek 會回溯更上一步的 limit 操作,然後 limit 會回溯更上一步的 peek 操作,頂層沒有操作了,開始自上向下開始執行,輸出:A2B2C2

... 當第四次 forEach 執行的時候,然後會回溯 peek 操作,然後 peek 會回溯更上一步的 limit 操作,到 limit 的時候,發現 limit(3) 這個 job 已經完成,這裏就相當於循環裏面的 break 操作,跳出來終止循環。

再來看第二段代碼:

IntStream.range(1, 10)
   .peek(x -> System.out.print("\nA" + x))
   .skip(6)
   .peek(x -> System.out.print("B" + x))
   .forEach(x -> System.out.print("C" + x));

輸出爲:A1 A2 A3 A4 A5 A6 A7B7C7 A8B8C8 A9B9C9 第一次 forEach 執行的時候,會回溯 peek 操作,然後 peek 會回溯更上一步的 skip 操作,skip 回溯到上一步的 peek 操作,頂層沒有操作了,開始自上向下開始執行,執行到 skip 的時候,因爲執行到 skip,這個操作的意思就是跳過,下面的都不要執行了,也就是就相當於循環裏面的 continue,結束本次循環。輸出:A1

第二次 forEach 執行的時候,會回溯 peek 操作,然後 peek 會回溯更上一步的 skip 操作,skip 回溯到上一步的 peek 操作,頂層沒有操作了,開始自上向下開始執行,執行到 skip 的時候,發現這是第二次 skip,結束本次循環。輸出:A2

...

第七次 forEach 執行的時候,會回溯 peek 操作,然後 peek 會回溯更上一步的 skip 操作,skip 回溯到上一步的 peek 操作,頂層沒有操作了,開始自上向下開始執行,執行到 skip 的時候,發現這是第七次 skip,已經大於 6 了,它已經執行完了 skip(6) 的 job 了。這次 skip 就直接跳過,繼續執行下面的操作。輸出:A7B7C7

... 直到循環結束。

一種直白的實現方式

Stream_pipeline_naive

仍然考慮上述求最長字符串的程序,一種直白的流水線實現方式是爲每一次函數調用都執一次迭代,並將處理中間結果放到某種數據結構中(比如數組,容器等)。具體說來,就是調用filter()方法後立即執行,選出所有以 A 開頭的字符串並放到一個列表 list1 中,之後讓 list1 傳遞給mapToInt()方法並立即執行,生成的結果放到 list2 中,最後遍歷 list2 找出最大的數字作爲最終結果。程序的執行流程如如所示:

這樣做實現起來非常簡單直觀,但有兩個明顯的弊端:

  1. 迭代次數多。迭代次數跟函數調用的次數相等。

  2. 頻繁產生中間結果。每次函數調用都產生一次中間結果,存儲開銷無法接受。

這些弊端使得效率底下,根本無法接受。如果不使用 Stream API 我們都知道上述代碼該如何在一次迭代中完成,大致是如下形式:

int longest = 0;
for(String str : strings){
    if(str.startsWith("A")){// 1. filter(), 保留以A開頭的字符串
        int len = str.length();// 2. mapToInt(), 轉換成長度
        longest = Math.max(len, longest);// 3. max(), 保留最長的長度
    }
}

採用這種方式我們不但減少了迭代次數,也避免了存儲中間結果,顯然這就是流水線,因爲我們把三個操作放在了一次迭代當中。只要我們事先知道用戶意圖,總是能夠採用上述方式實現跟 Stream API 等價的功能,但問題是 Stream 類庫的設計者並不知道用戶的意圖是什麼。如何在無法假設用戶行爲的前提下實現流水線,是類庫的設計者要考慮的問題。

Stream 流水線解決方案

我們大致能夠想到,應該採用某種方式記錄用戶每一步的操作,當用戶調用結束操作時將之前記錄的操作疊加到一起在一次迭代中全部執行掉。沿着這個思路,有幾個問題需要解決:

  1. 用戶的操作如何記錄?

  2. 操作如何疊加?

  3. 疊加之後的操作如何執行?

  4. 執行後的結果(如果有)在哪裏?

>> 操作如何記錄

Java_stream_pipeline_classes

注意這裏使用的是 “操作 (operation)”一詞,指的是 “Stream 中間操作” 的操作,很多 Stream 操作會需要一個回調函數(Lambda 表達式),因此一個完整的操作是< 數據來源,操作,回調函數 > 構成的三元組。Stream 中使用 Stage 的概念來描述一個完整的操作,並用某種實例化後的 PipelineHelper 來代表 Stage,將具有先後順序的各個 Stage 連到一起,就構成了整個流水線。跟 Stream 相關類和接口的繼承關係圖示。

還有 IntPipeline, LongPipeline, DoublePipeline 沒在圖中畫出,這三個類專門爲三種基本類型(不是包裝類型)而定製的,跟 ReferencePipeline 是並列關係。圖中 Head 用於表示第一個 Stage,即調用調用諸如 _Collection.stream()* 方法產生的 Stage,很顯然這個 Stage 裏不包含任何操作;*StatelessOp * 和 * StatefulOp_ 分別表示無狀態和有狀態的 Stage,對應於無狀態和有狀態的中間操作。

Stream 流水線組織結構示意圖如下:

Stream_pipeline_example

圖中通過Collection.stream()方法得到 Head 也就是 stage0,緊接着調用一系列的中間操作,不斷產生新的 Stream。這些 Stream 對象以雙向鏈表的形式組織在一起,構成整個流水線,由於每個 Stage 都記錄了前一個 Stage 和本次的操作以及回調函數,依靠這種結構就能建立起對數據源的所有操作 。這就是 Stream 記錄操作的方式。

>> 操作如何疊加

以上只是解決了操作記錄的問題,要想讓流水線起到應有的作用我們需要一種將所有操作疊加到一起的方案。你可能會覺得這很簡單,只需要從流水線的 head 開始依次執行每一步的操作(包括回調函數)就行了。這聽起來似乎是可行的,但是你忽略了前面的 Stage 並不知道後面 Stage 到底執行了哪種操作,以及回調函數是哪種形式。換句話說,只有當前 Stage 本身才知道該如何執行自己包含的動作。這就需要有某種協議來協調相鄰 Stage 之間的調用關係。

這種協議由 Sink 接口完成,Sink 接口包含的方法如下表所示:

3txWXH

有了上面的協議,相鄰 Stage 之間調用就很方便了,每個 Stage 都會將自己的操作封裝到一個 Sink 裏,前一個 Stage 只需調用後一個 Stage 的accept()方法即可,並不需要知道其內部是如何處理的。當然對於有狀態的操作,Sink 的begin()end()方法也是必須實現的。比如 Stream.sorted() 是一個有狀態的中間操作,其對應的 Sink.begin() 方法可能創建一個盛放結果的容器,而 accept() 方法負責將元素添加到該容器,最後 end() 負責對容器進行排序。對於短路操作,Sink.cancellationRequested()也是必須實現的,比如 Stream.findFirst() 是短路操作,只要找到一個元素,cancellationRequested() 就應該返回 true,以便調用者儘快結束查找。Sink 的四個接口方法常常相互協作,共同完成計算任務。實際上 Stream API 內部實現的的本質,就是如何重寫 Sink 的這四個接口方法

有了 Sink 對操作的包裝,Stage 之間的調用問題就解決了,執行時只需要從流水線的 head 開始對數據源依次調用每個 Stage 對應的 Sink.{begin(), accept(), cancellationRequested(), end()} 方法就可以了。一種可能的 Sink.accept() 方法流程是這樣的:

void accept(U u){
    1. 使用當前Sink包裝的回調函數處理u
    2. 將處理結果傳遞給流水線下游的Sink
}

Sink 接口的其他幾個方法也是按照這種 [處理 -> 轉發]的模型實現。下面我們結合具體例子看看 Stream 的中間操作是如何將自身的操作包裝成 Sink 以及 Sink 是如何將處理結果轉發給下一個 Sink 的。先看 Stream.map()方法:

// Stream.map(),調用該方法將產生一個新的Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    ...
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override /*opWripSink()方法返回由回調函數包裝而成Sink*/
        Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) {
            return new Sink.ChainedReference<P_OUT, R>(downstream) {
                @Override
                public void accept(P_OUT u) {
                    R r = mapper.apply(u);// 1. 使用當前Sink包裝的回調函數mapper處理u
                    downstream.accept(r);// 2. 將處理結果傳遞給流水線下游的Sink
                }
            };
        }
    };
}

上述代碼看似複雜,其實邏輯很簡單,就是將回調函數 mapper 包裝到一個 Sink 當中。由於 Stream.map() 是一個無狀態的中間操作,所以 map() 方法返回了一個 StatelessOp 內部類對象(一個新的 Stream),調用這個新 Stream 的 opWripSink() 方法將得到一個包裝了當前回調函數的 Sink。

再來看一個複雜一點的例子。Stream.sorted() 方法將對 Stream 中的元素進行排序,顯然這是一個有狀態的中間操作,因爲讀取所有元素之前是沒法得到最終順序的。拋開模板代碼直接進入問題本質,sorted() 方法是如何將操作封裝成 Sink 的呢?sorted() 一種可能封裝的 Sink 代碼如下:

// Stream.sort()方法用到的Sink實現
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;// 存放用於排序的元素
    RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
        super(downstream, comparator);
    }
    @Override
    public void begin(long size) {
        ...
        // 創建一個存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }
    @Override
    public void end() {
        list.sort(comparator);// 只有元素全部接收之後才能開始排序
        downstream.begin(list.size());
        if (!cancellationWasRequested) {// 下游Sink不包含短路操作
            list.forEach(downstream::accept);// 2. 將處理結果傳遞給流水線下游的Sink
        }
        else {// 下游Sink包含短路操作
            for (T t : list) {// 每次都調用cancellationRequested()詢問是否可以結束處理。
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);// 2. 將處理結果傳遞給流水線下游的Sink
            }
        }
        downstream.end();
        list = null;
    }
    @Override
    public void accept(T t) {
        list.add(t);// 1. 使用當前Sink包裝動作處理t,只是簡單的將元素添加到中間列表當中
    }
}

上述代碼完美的展現了 Sink 的四個接口方法是如何協同工作的:

  1. 首先 begin() 方法告訴 Sink 參與排序的元素個數,方便確定中間結果容器的的大小;

  2. 之後通過 accept() 方法將元素添加到中間結果當中,最終執行時調用者會不斷調用該方法,直到遍歷所有元素;

  3. 最後 end() 方法告訴 Sink 所有元素遍歷完畢,啓動排序步驟,排序完成後將結果傳遞給下游的 Sink;

  4. 如果下游的 Sink 是短路操作,將結果傳遞給下游時不斷詢問下游 cancellationRequested() 是否可以結束處理。

>> 疊加之後的操作如何執行

Stream_pipeline_Sink

Sink 完美封裝了 Stream 每一步操作,並給出了 [處理 -> 轉發]的模式來疊加操作。這一連串的齒輪已經咬合,就差最後一步撥動齒輪啓動執行。是什麼啓動這一連串的操作呢?也許你已經想到了啓動的原始動力就是結束操作(Terminal Operation),一旦調用某個結束操作,就會觸發整個流水線的執行。

結束操作之後不能再有別的操作,所以結束操作不會創建新的流水線階段 (Stage),直觀的說就是流水線的鏈表不會在往後延伸了。結束操作會創建一個包裝了自己操作的 Sink,這也是流水線中最後一個 Sink,這個 Sink 只需要處理數據而不需要將結果傳遞給下游的 Sink(因爲沒有下游)。對於 Sink 的[處理 -> 轉發]模型,結束操作的 Sink 就是調用鏈的出口。

我們再來考察一下上游的 Sink 是如何找到下游 Sink 的。一種可選的方案是在 PipelineHelper 中設置一個 Sink 字段,在流水線中找到下游 Stage 並訪問 Sink 字段即可。但 Stream 類庫的設計者沒有這麼做,而是設置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來得到 Sink,該方法的作用是返回一個新的包含了當前 Stage 代表的操作以及能夠將結果傳遞給 downstream 的 Sink 對象。爲什麼要產生一個新對象而不是返回一個 Sink 字段?這是因爲使用 opWrapSink() 可以將當前操作與下游 Sink(上文中的 downstream 參數)結合成新 Sink。試想只要從流水線的最後一個 Stage 開始,不斷調用上一個 Stage 的 opWrapSink() 方法直到最開始(不包括 stage0,因爲 stage0 代表數據源,不包含操作),就可以得到一個代表了流水線上所有操作的 Sink,用代碼表示就是這樣:

// AbstractPipeline.wrapSink()
// 從下游向上遊不斷包裝Sink。如果最初傳入的sink代表結束操作,
// 函數返回時就可以得到一個代表了流水線上所有操作的Sink。
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    ...
    for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

現在流水線上從開始到結束的所有的操作都被包裝到了一個 Sink 裏,執行這個 Sink 就相當於執行整個流水線,執行 Sink 的代碼如下:

// AbstractPipeline.copyInto(), 對spliterator代表的數據執行wrappedSink代表的操作。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    ...
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知開始遍歷
        spliterator.forEachRemaining(wrappedSink);// 迭代
        wrappedSink.end();// 通知遍歷結束
    }
    ...
}

上述代碼首先調用 wrappedSink.begin() 方法告訴 Sink 數據即將到來,然後調用 spliterator.forEachRemaining() 方法對數據進行迭代(Spliterator 是容器的一種迭代器,[參閱](https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/3-Lambda and Collections.md#spliterator)),最後調用 wrappedSink.end() 方法通知 Sink 數據處理結束。邏輯如此清晰。

>> 執行後的結果在哪裏

最後一個問題是流水線上所有操作都執行後,用戶所需要的結果(如果有)在哪裏?首先要說明的是不是所有的 Stream 結束操作都需要返回結果,有些操作只是爲了使用其副作用 (Side-effects),比如使用Stream.forEach()方法將結果打印出來就是常見的使用副作用的場景(事實上,除了打印之外其他場景都應避免使用副作用),對於真正需要返回結果的結束操作結果存在哪裏呢?

特別說明:副作用不應該被濫用,也許你會覺得在 Stream.forEach()裏進行元素收集是個不錯的選擇,就像下面代碼中那樣,但遺憾的是這樣使用的正確性和效率都無法保證,因爲 Stream 可能會並行執行。大多數使用副作用的地方都可以使用 [歸約操作](https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/5-Streams API(II).md) 更安全和有效的完成。

// 錯誤的收集方式
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
      .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
// 正確的收集方式
List<String>results =
     stream.filter(s -> pattern.matcher(s).matches())
             .collect(Collectors.toList());  // No side-effects!

回到流水線執行結果的問題上來,需要返回結果的流水線結果存在哪裏呢?這要分不同的情況討論,下表給出了各種有返回結果的 Stream 結束操作。

s55GKX

  1. 對於表中返回 boolean 或者 Optional 的操作(Optional 是存放 一個 值的容器)的操作,由於值返回一個值,只需要在對應的 Sink 中記錄這個值,等到執行結束時返回就可以了。

  2. 對於歸約操作,最終結果放在用戶調用時指定的容器中(容器類型通過 [收集器](https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/5-Streams API(II).md# 收集器) 指定)。collect(), reduce(), max(), min()都是歸約操作,雖然 max()和 min()也是返回一個 Optional,但事實上底層是通過調用 [reduce()](https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/5-Streams API(II).md# 多面手 reduce) 方法實現的。

  3. 對於返回是數組的情況,毫無疑問的結果會放在數組當中。這麼說當然是對的,但在最終返回數組之前,結果其實是存儲在一種叫做 Node 的數據結構中的。Node 是一種多叉樹結構,元素存儲在樹的葉子當中,並且一個葉子節點可以存放多個元素。這樣做是爲了並行執行方便。關於 Node 的具體結構,我們會在下一節探究 Stream 如何並行執行時給出詳細說明。

結語

本文詳細介紹了 Stream 流水線的組織方式和執行過程,學習本文將有助於理解原理並寫出正確的 Stream 代碼,同時打消你對 Stream API 效率方面的顧慮。如你所見,Stream API 實現如此巧妙,即使我們使用外部迭代手動編寫等價代碼,也未必更加高效。

注:留下本文所用的 JDK 版本,以便有考究癖的人考證:

$ java -version
java version "1.8.0_101"
Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
Java HotSpot(TM) Server VM (build 25.101-b13, mixed mode)
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/MoDFtgP54CyMeAejuaKP2g