深入理解併發和並行

1 併發與並行

爲什麼操作系統上可以同時運行多個程序而用戶感覺不出來?

因爲操作系統營造出了可以同時運行多個程序的假象,通過調度進程以及快速切換 CPU 上下文,每個進程執行一會就停下來,切換到下個被調度到的進程上,這種切換速度非常快,人無法感知到,從而產生了多個任務同時運行的錯覺。

併發(concurrent) 是指的在宏觀上多個程序或任務在同時運行,而在微觀上這些程序交替執行,可以提高系統的資源利用率和吞吐量。

通常一個 CPU 內核在一個時間片只能執行一個線程(某些 CPU 採用超線程技術,物理核心數和邏輯核心數形成一個 1:2 的關係,比如 4 核 CPU,邏輯處理器會有 8 個,可以同時跑 8 個線程),如果 N 個內核同時執行 N 個線程,就叫做並行(parallel) 。我們編寫的多線程代碼具備併發特性,而不一定會並行。因爲能否並行取決於操作系統的調度,程序員無法控制,但是調度算法會盡量讓不同線程使用不同的 CPU 核心,所以在實際使用中幾乎總是會並行。如果多個任務在一個內核中順序執行,就是串行(Serial) ,如下圖所示:

串行、併發、並行

併發、並行的執行時間

併發與並行的任務處理方式

併發是多個程序在一段時間內同時執行的現象,而並行是多個任務在同一時刻同時執行,也是多核 CPU 的重要特性。

這裏有一個疑問:併發一定並行嗎?

併發並不一定並行。併發是邏輯上的同時發生,而並行是物理上的同時發生。併發可以跑在一個處理器上通過時間片進行切換,而並行需要兩個或兩個以上的線程跑在不同的處理器上。如果同一個任務的多個線程始終運行在不變的 CPU 核心上,那就不是並行。

舉一個生活中的例子:

2 多核調度算法

在多核 CPU 系統中,調度算法的主要目標是有效地利用所有可用的 CPU 核心,以提高系統的整體性能和資源利用率。下面是一些常見的多核 CPU 調度算法:

  1. 搶佔式調度(Preemptive Scheduling) :這種調度算法允許操作系統隨時中斷當前正在執行的任務,並將處理器分配給其他任務。在多核系統中,搶佔式調度器可以將任務遷移到其他核心上,以充分利用系統資源。

  2. 公平調度(Fair Scheduling) :公平調度算法旨在公平地分配 CPU 時間給系統中的所有任務,以確保每個任務都有機會在一定的時間內執行。在多核系統中,公平調度器通常會嘗試平衡各個核心上的負載,以避免出現某些核心過載而其他核心處於空閒狀態的情況。

  3. 負載均衡調度(Load Balancing) :負載均衡調度算法用於在多核系統中平衡各個核心上的任務負載,以確保所有核心都能夠充分利用。這可以通過將任務從負載較重的核心遷移到負載較輕的核心來實現,或者通過動態地將新任務分配給負載較輕的核心來實現。

  4. 優先級調度(Priority Scheduling) :優先級調度算法允許爲每個任務分配一個優先級,並根據優先級來決定任務的執行順序。在多核系統中,可以根據任務的優先級將其分配給不同的核心,以確保高優先級任務優先得到執行。

  5. 混合調度(Hybrid Scheduling) :混合調度算法結合了多種調度策略的優點,以適應不同的應用場景和系統配置。例如,可以將公平調度算法和負載均衡調度算法結合起來,以在系統中實現公平且高效的任務調度。

這些調度算法可以根據系統的需求進行組合和調整,以實現對多核 CPU 系統資源的有效管理和利用。

搶佔式調度(Preemptive Scheduling)的使用最爲廣泛,它允許操作系統在任何時候中斷當前正在執行的任務,並將處理器分配給其他任務。這種調度策略使得操作系統能夠及時響應各種事件和請求,從而提高系統的響應性和實時性。

在搶佔式調度中,每個任務都被賦予一個優先級,操作系統會根據任務的優先級來決定哪個任務應該在當前時間片執行。如果某個高優先級任務準備就緒並且當前正在執行的任務的優先級低於它,操作系統會中斷當前任務的執行,並將處理器分配給高優先級任務,從而實現搶佔。搶佔式調度的主要優點包括:

  1. 實時性:搶佔式調度允許操作系統及時地響應外部事件和請求,從而滿足實時性要求。

  2. 靈活性:操作系統可以根據任務的優先級動態地調整任務的執行順序,以適應不同的系統負載和需求。

  3. 公平性:搶佔式調度可以確保高優先級任務得到及時執行,而不會被低優先級任務長時間佔用處理器。

  4. 多任務併發:通過在任務之間進行快速的切換,搶佔式調度可以實現多任務併發執行,從而提高系統的吞吐量和效率。

搶佔式調度也存在一些挑戰和限制:

  1. 上下文切換開銷:頻繁的任務切換會導致上下文切換的開銷增加,可能會影響系統的性能。

  2. 優先級反轉:如果低優先級任務持有某些資源而高優先級任務需要訪問這些資源,可能會導致優先級反轉問題,從而影響系統的實時性。

  3. 飢餓問題:如果某個任務的優先級始終較低,並且總是被更高優先級的任務搶佔,可能會導致該任務長時間無法執行,出現飢餓問題。

搶佔式調度在許多操作系統中得到了廣泛應用,包括 Windows、Linux、MacOS 等,它爲實時系統和響應式系統提供了一種高效的任務調度機制。

3 Java 並行編程

在編碼層面上看,採用 Java 語言創建多線程代碼,不需要程序員打上並行的標記,因爲爲了充分利用計算資源,操作系統一定會盡可能調度多線程到不同的 CPU 核心上。併發的任務通常有多線程競爭資源和頻繁的 CPU 上下文切換,這些都會降低執行效率。

在實際的業務場景裏,許多計算任務其實互不干擾,最後彙總結果就可以了,比如統計不同用戶的每日活動次數。它們不存在競爭資源,並行處理的效率非常高,Java 語言提供了多線程並行執行的 API。

3.1 Future

在 Java 併發編程中,Future 是一種用於表示異步計算結果的接口。它允許你提交一個任務並且在將來的某個時候獲取任務的結果。Future 的原理是通過一個佔位符來表示異步操作的結果,在任務完成之前,可以通過 Future 對象獲取佔位符,並且在需要的時候等待任務的完成並獲取結果。Future 接口定義了異步計算結果的標準,具體的異步計算由實現了 Future 接口的類來執行,比如 ExecutorService 的 submit 方法會返回一個 Future 對象,用於跟蹤任務的執行狀態和結果。

Future 提供了以下主要方法:

看看下面這個代碼示例:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class FutureParallelExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
 
        Callable<Integer> task1 = () -> {
            // 模擬耗時計算
            Thread.sleep(2000);
            return 10;
        };
 
        Callable<Integer> task2 = () -> {
            // 模擬耗時計算
            Thread.sleep(3000);
            return 20;
        };
 
        Future<Integer> future1 = executor.submit(task1);
        Future<Integer> future2 = executor.submit(task2);
 
        // 異步執行,繼續執行下面的代碼
        System.out.println("Asynchronous computation is executing.");
 
        // 獲取第一個任務的結果
        Integer result1 = future1.get(); // 這將會阻塞直到任務1完成
        System.out.println("Task 1 result: " + result1);
 
        // 獲取第二個任務的結果
        Integer result2 = future2.get(); // 這將會阻塞直到任務2完成
        System.out.println("Task 2 result: " + result2);
 
        // 關閉ExecutorService
        executor.shutdown();
    }
}

在這個例子中,啓動了兩個異步任務,並分別獲取了它們的 Future 對象。通過 Future.get() 方法,我們可以等待任務完成並獲取結果。ExecutorService 使用了一個固定的線程池,大小爲 2。這意味着兩個任務將會並行執行。

3.2 Fork / Join

Fork / Join 框架是 Java 7 中新增的併發編程工具,主要有兩個步驟,第一是 fork:將一個大任務分成很多個小任務;第二是 join:將第一個任務的結果 join 起來,生成最後的結果。如果第一步中並沒有任何返回值,join 將會等到所有的小任務都結束。

斐波那契數列由意大利數學家斐波那契首次提出,這個數列從第三項開始,每一項都等於前兩項之和,通常以遞歸方式定義,即 F(0)=1,F(1)=1,對於 n>=2 的任何正整數 n,F(n)=F(n-1)+F(n-2),數列的前幾個數字是 1,1,2,3,5,8,13,21,34。我們嘗試使用遞歸計算第 n 項的數值,代碼如下:

/**
* 遞歸實現斐波那契數列
**/
public class FibonacciRecursion {
    public static int fibonacciRecursive(int n) {
        if (n <= 1)
            return n;
        return fibonacciRecursive(n - 1) + fibonacciRecursive(n - 2);
    }
 
    public static void main(String[] args) {
        int n = 10;
        System.out.println("Fibonacci of " + n + " is " + fibonacciRecursive(n));
    }
}

以上代碼輸出結果是:55。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* fork/join實現斐波那契數列
**/
public class FibonacciFork extends RecursiveTask<Integer> {
    final int n;
 
    public FibonacciFork(int n) {
        this.n = n;
    }
 
    @Override
    protected Integer compute() {
        if (n <= 1)
            return n;
        FibonacciFork f1 = new FibonacciFork(n - 1);
        FibonacciFork f2 = new FibonacciFork(n - 2);
 
        f1.fork();
        return f2.compute() + f1.join();
    }
 
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        FibonacciFork fib = new FibonacciFork(10);
        Integer result = pool.invoke(fib);
        System.out.println(result);
    }
}

以上代碼中,定義了 RecursiveTask 的子類 FibonacciFork 類,用於計算斐波那契數列的第 n 項。在 main 方法中,創建了一個 ForkJoinPool 並提交了任務執行。這個任務會遞歸地分解成更小的子任務,並且使用 fork/join 模式來並行處理這些子任務,最後通過 join 方法獲取子任務的結果並累加,輸出結果是:55。

3.3 Stream API

Java 8 加入了新特性 Stream API(叫做流式計算或並行流),極大地提升了處理集合數據的靈活性與效率。Stream API 簡化了集合操作的代碼量,還通過 lambda 表達式增強了函數式編程風格,核心邏輯是將數據集合分成多個小塊,然後在多個處理器上並行處理,最後將結果合併成一個結果集。Java Stream API 的底層原理主要涉及兩個方面:流的管道化操作和惰性求值:

我們來看看簡單的代碼示例:

public class StreamDemo {

    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
              .reduce((a, b) -> a + b)
              .ifPresent(System.out::println); // 輸出結果:45
    }
}

上面代碼創建了一個包含 1 到 9 整數的並行流,然後通過 reduce 方法計算所有數字的和,並打印結果。在默認情況下,這些操作是在單線程中按順序逐個執行的。

public class StreamParallelDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
              .parallel()
              .reduce((a, b) -> a + b)
              .ifPresent(System.out::println);
    }
}

上面代碼調用了 parallel() 後,reduce() 方法內部邏輯發生了變化,它會根據當前線程池資源分配任務,並行地在不同的工作線程上執行累加操作,而不是串行執行的。

Java 並行流是基於 Fork/Join 框架實現的,它使用了多線程來處理流操作。在多核環境中,Fork/Join 框架會根據系統資源自動調整任務分配,儘可能多地利用空閒核心,更充分地發揮硬件潛力。比如,當 CPU 具有 8 個內核時,並行計算的耗時遠小於串行計算耗時的 8 倍,但是由於線程創建、銷燬以及上下文切換等開銷,實際性能提升並非線性。

並行計算並不總是適用於所有場景,特別是在數據集較小或者任務分解後產生的子任務粒度較小時,線程管理的開銷可能超過並行計算帶來的優勢。如果硬件只有單核或少核,則並行計算效果有限甚至可能會因線程切換而降低效率。綜合考慮以下因素:

3.4 CompletableFuture

CompletableFuture 是一個實現了 Future 接口的類,它提供了一種更加靈活和強大的方式來進行異步編程。CompletableFuture 可以用來表示一個異步計算的結果,並且提供了豐富的方法來處理異步操作的完成、組合多個異步操作、處理異常等。CompletableFuture 相比於傳統的 Future 接口,具有以下優勢:

  1. 更加靈活的方法鏈:CompletableFuture 提供了一系列的方法,可以鏈式地進行異步操作,比如 thenApply、thenAccept、thenCompose 等,使得代碼更加簡潔清晰。

  2. 組合多個異步操作:CompletableFuture 允許你組合多個異步操作,可以按照順序執行、並行執行,或者根據一定的條件來執行。

  3. 異常處理:CompletableFuture 提供了 exceptionally 和 handle 等方法來處理異步操作中的異常情況,使得異常處理變得更加靈活。

  4. 支持回調函數:你可以通過 thenApply、thenAccept 等方法設置回調函數,以便在異步操作完成時執行特定的操作。

  5. 可編程式地完成異步操作:CompletableFuture 提供了 complete、completeExceptionally 等方法,可以手動地完成異步操作,從而更加靈活地控制異步任務的執行過程。

我們看看簡單的代碼示例:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {

    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            // 異步任務,返回結果爲100
            return 100;
        });

        // 在任務完成後輸出結果
        future.thenAccept(result -> System.out.println("異步任務結果爲:" + result));
    }
}
import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {

    public static void main(String[] args) {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            // 異步任務1,返回結果爲100
            return 100;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            // 異步任務2,返回結果爲200
            return 200;
        });

        // 將兩個異步任務的結果相加
        CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);

        // 在組合任務完成後輸出結果
        combinedFuture.thenAccept(result -> System.out.println("兩個異步任務的結果之和爲:" + result));
    }
}
import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {

    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            // 模擬一個可能發生異常的異步任務
            if (Math.random() < 0.5) {
                throw new RuntimeException("Oops! Something went wrong.");
            }
            return 100;
        });

        // 處理異常情況
        future.exceptionally(throwable -> {
            System.out.println("異步任務發生異常:" + throwable.getMessage());
            return null; // 返回默認值或者做其他處理
        });

        // 在任務完成後輸出結果
        future.thenAccept(result -> System.out.println("異步任務結果爲:" + result));
    }
}
ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("執行異步操作。。。");
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, executorService);
        System.out.println("結果:"+voidCompletableFuture.get());

這些示例展示了使用 CompletableFuture 進行異步編程的一些常見用法,包括簡單的異步任務、組合多個 CompletableFuture、處理異常情況等。總的來說,CompletableFuture 是 Java 併發編程中一個強大而靈活的工具,它使得異步編程變得更加簡單、清晰和可控。

4 總結

要更好地掌握 Java 併發編程技能,可以採取以下幾個步驟:

  1. 學習基礎知識: 對 Java 併發編程的基本概念和術語有清晰的理解,比如線程、鎖、同步、併發問題等。可以通過閱讀相關的書籍、教程或者在線課程來學習。

  2. 熟悉併發工具類: Java 提供了豐富的併發工具類,比如 Thread、Runnable、Executor、ThreadPoolExecutor、Semaphore、CountDownLatch 等。深入瞭解這些工具類的使用方法和特性,以及在不同場景下的應用。

  3. 掌握多線程編程: 多線程編程是 Java 併發編程的核心,要熟練掌握如何創建線程、管理線程生命週期、線程同步和通信等技術。瞭解線程的狀態、優先級、調度方式等概念,以及如何避免常見的多線程問題,比如死鎖、競態條件等。

  4. 深入理解併發模型: 瞭解併發模型,比如共享內存模型和消息傳遞模型,以及它們的優缺點。掌握在這些模型下如何設計和實現併發程序。

  5. 學習併發設計模式: 掌握常見的併發設計模式,比如生產者 - 消費者模式、讀寫鎖模式、工作竊取模式等。瞭解這些模式的原理和實現方式,以及在實際項目中的應用。

  6. 實踐項目經驗: 通過實際項目來鍛鍊併發編程技能,嘗試在項目中應用所學的知識解決實際的併發問題。可以選擇一些開源項目或者自己構建小型項目來練習。

作者:編碼專家
鏈接:
https://juejin.cn/post/7353950023175209012

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/2BG-YVqPML3ToZPQC82RNw