Flutter 異步編程指南

一、Dart 中的事件循環模型

在 App 開發中,經常會遇到處理異步任務的場景,如網絡請求、讀寫文件等。Android、iOS 使用的是多線程,而在 Flutter 中爲單線程事件循環,如下圖所示

Dart 中有兩個任務隊列,分別爲 microtask 隊列和 event 隊列,隊列中的任務按照先進先出的順序執行,而 microtask 隊列的執行優先級高於 event 隊列。在 main 方法執行完畢後,會啓動事件循環,首先將 microtask 隊列中的任務逐個執行完畢,再去執行 event 隊列中的任務,每一個 event 隊列中的任務在執行完成後,會再去優先執行 microtask 隊列中的任務,如此反覆,直到清空所有隊列,這個過程就是 Dart 事件循環的處理機制。這種機制可以讓我們更簡單的處理異步任務,不用擔心鎖的問題。我們可以很容易的預測任務執行的順序,但無法準確的預測到事件循環何時會處理到你期望執行的任務。例如創建了一個延時任務,但排在前面的任務結束前是不會處理這個延時任務的,也就說這個任務的等待時間可能會大於指定的延遲時間。

Dart 中的方法一旦開始執行就不會被打斷,而 event 隊列中的事件還來自於用戶輸入、IO、定時器、繪製等,這意味着在兩個隊列中都不適合執行計算量過大的任務,才能保證流暢的 UI 繪製和用戶事件的快速響應。而且當一個任務的代碼發生異常時,只會打斷當前任務,後續任務不受影響,程序更不會退出。從上圖還可以看出,將一個任務加入 microtask 隊列,可以提高任務優先級,但是一般不建議這麼做,除非比較緊急的任務並且計算量不大,因爲 UI 繪製和處理用戶事件是在 event 事件隊列中的,濫用 microtask 隊列可能會影響用戶體驗。

總結下 Dart 事件循環的主要概念:

  1. Dart 中有兩個隊列來執行任務:microtask 隊列和 event 隊列。

  2. 事件循環在 main 方法執行完畢後啓動, microtask 隊列中的任務會被優先處理。

  3. microtask 隊列只處理來自 Dart 內部的任務,event 隊列中有來自 Dart 內部的 Future、Timer、isolate message,還有來自系統的用戶輸入、IO、UI 繪製等外部事件任務。

  4. Dart 中的方法執行不會被打斷,因此兩個隊列中都不適合用來執行計算量大的任務。

  5. 一個任務中未被處理的異常只會打斷當前任務,後續任務不受影響,程序更不會退出。

1.1 向 microtask 隊列中添加任務

可以使用頂層方法 scheduleMicrotask 或者 Future.microtask 方法,如下所示:

scheduleMicrotask(() => print('microtask1'));
Future.microtask(() => print('microtask2'));

使用 Future.microtask 的優勢在於可以在 then 回調中處理任務返回的結果。

1.2 向 event 隊列中添加任務

Future(() => print('event task'));

基於以上理論,通過如下代碼可以驗證 Dart 的事件循環機制:

void main() {
  print('main start');
  Future(() => print('event task1'));
  Future.microtask(() => print('microtask1'));
  Future(() => print('event task1'));
  Future.microtask(() => print('microtask2'));
  print('main stop');

執行結果:

main start
main stop
microtask1
microtask2
event task1
event task1

通過輸出結果可以看到,任務的執行順序並不是按照編寫代碼的順序來的,將任務添加到隊列不會立刻執行,而執行順序也完全符合前面講的規則,當前 main 方法中的代碼執行完畢後,纔會去執行隊列中的任務,且 microTask 隊列的優先級高於 event 隊列。

二、Dart 中的異步實現

在 Dart 中通過 Future 來執行異步任務, Future 是對異步任務狀態的封裝,對任務結果的代理,通過 then 方法可以註冊處理任務結果的回調方法。

創建方法 Future 方式:
Future()
Future.delayed()
Future.microtask()
Future.sync()

2.1 Future()

factory Future(FutureOr<T> computation()) {
  _Future<T> result = new _Future<T>();
  Timer.run(() {
    try {
      result._complete(computation());
    } catch (e, s) {
      _completeWithErrorCallback(result, e, s);
    }
  });
  return result;
}

上面是 Future() 的源碼,可以看到內部是通過啓動一個沒有延遲的計時器來添加任務的,實用 try catch 來捕獲任務代碼中可能出現的異常,我們可以在 catchError 回調中來處理異常。

2.2 Future.delayed()

factory Future.delayed(Duration duration, [FutureOr<T> computation()?]) {
  if (computation == null && !typeAcceptsNull<T>()) {
    throw ArgumentError.value(null, "computation", "The type parameter is not nullable");
  }
  _Future<T> result = new _Future<T>();
  new Timer(duration, () {
    if (computation == null) {
      result._complete(null as T);
    } else {
      try {
        result._complete(computation());
      } catch (e, s) {
        _completeWithErrorCallback(result, e, s);
      }
    }
  });
  return result;
}

Future.delayed() 與 Future() 的區別是通過一個延遲的計時器來添加任務。

2.3 Future.microtask()

factory Future.microtask(FutureOr<T> computation()) {
  _Future<T> result = new _Future<T>();
  scheduleMicrotask(() {
    try {
      result._complete(computation());
    } catch (e, s) {
      _completeWithErrorCallback(result, e, s);
    }
  });
  return result;
}

Future.microtask() 是將任務添加到 microtask 隊列,通過這種可以很方便通過 then 方法中的回調來處理任務的結果。

2.4 Future.sync()

factory Future.sync(FutureOr<T> computation()) {
  try {
    var result = computation();
    if (result is Future<T>) {
      return result;
    } else {
      // TODO(40014): Remove cast when type promotion works.
      return new _Future<T>.value(result as dynamic);
    }
  } catch (error, stackTrace) {
    var future = new _Future<T>();
    AsyncError? replacement = Zone.current.errorCallback(error, stackTrace);
    if (replacement != null) {
      future._asyncCompleteError(replacement.error, replacement.stackTrace);
    } else {
      future._asyncCompleteError(error, stackTrace);
    }
    return future;
  }
}

Future.sync() 中的任務會被立即執行,不會添加到任何隊列。

在第一個章節中講到了可以很容易的預測任務的執行順序,下面我們通過一個例子來驗證:

void main() {
  print('main start');
  Future.microtask(() => print('microtask1'));
  Future.delayed(new Duration(seconds:1), () => print('delayed event'));
  Future(() => print('event1'));
  Future(() => print('event2'));
  Future.microtask(() => print('microtask2'));
  print('main stop');
}

執行結果:

main start
main stop
microtask1
microtask2
event1
event2
delayed event

因爲代碼比較簡單,通過代碼可以很容易地預測到執行結果,下面將複雜度稍微提高。

void main() {
  print('main start');
  Future.microtask(() => print('microtask1'));
  Future.delayed(new Duration(seconds:1), () => print('delayed event'));
  Future(() => print('event1'))
    .then((_) => print('event1 - callback1'))
    .then((_) => print('event1 - callback2'));
  Future(() => print('event2')).then((_) {
    print('event2 - callback1');
    return Future(() => print('event4')).then((_) => print('event4 - callback'));
  }).then((_) {
    print('event2 - callback2');
    Future(() => print('event5')).then((_) => print('event5 - callback'));
  }).then((_) {
    print('event2 - callback3');
    Future.microtask(() => print('microtask3'));
  }).then((_) {
    print('event2 - callback4');
  });
  Future(() => print('event3'));
  Future.sync(() => print('sync task'));
  Future.microtask(() => print('microtask2')).then((_) => print('microtask2 - callbak'));
  print('main stop');
}

執行結果:

main start
sync task
main stop
microtask1
microtask2
microtask2 - callbak
event1
event1 - callback1
event1 - callback2
event2
event2 - callback1
event3
event4
event4 - callback
event2 - callback2
event2 - callback3
event2 - callback4
microtask3
event5
event5 - callback
delayed event

看到結果後你可能會疑惑,爲什麼 event1、event1 - callback1、event1 - callback2 會連續輸出,而 event2 - callback1 輸出後爲什麼是 event3,event5、event5 - callback 爲什麼會在 microtask3 後輸出?

這裏我們補充下 then 方法的一些關鍵知識,理解了這些,上面的輸出結果也就很好理解了:

  1. then 方法中的回調並不是按照它們註冊的順序來執行。

  2. Future 中的任務執行完畢後會立刻執行 then 方法中的回調,並且回調不會被添加到任何隊列中。

  3. 如果 Future 中的任務在 then 方法調用之前已經執行完畢了,那麼會有一個任務被加入到 microtask 隊列中。這個任務執行的就是被傳入 then 方法中的回調。

2.5 catchError、whenComplete

Future(() {
  throw 'error';
}).then((_) {
  print('success');
}).catchError((error) {
  print(error);
}).whenComplete(() {
  print('completed');
});

輸出結果:

error
completed

通過 catchError 方法註冊的回調,可以用來處理任務代碼產生的異常。不管 Future 中的任務執行成功與否,whenComplete 方法都會被調用。

2.6 async、await

使用 async、await 能以更簡潔的編寫異步代碼,是 Dart 提供的一個語法糖。使用 async 關鍵字修飾的方法返回值類型爲 Future,在 async 方法內可以使用 await 關鍵字來修飾異步任務,在方法內部達到同步執行的效果,可以達到簡化代碼和提高可讀性的效果,不過如果想要處理異常,需要實用 try catch 語句來包裹 await 修飾的異步任務。

void main() async {
  print(await getData());
}
Future<int> getData() async {
  final a = await Future.delayed(Duration(seconds: 1), () => 1);
  final b = await Future.delayed(Duration(seconds: 1), () => 1);
  return a + b;
}

三、Isolate 介紹

前面講到耗時任務不適合放到 microtask 隊列或 event 隊列中執行,會導致 UI 卡頓。那麼在 Flutter 中有沒有既可以執行耗時任務又不影響 UI 繪製呢,其實是有的,前面提到 microtask 隊列和 event 隊列是在 main isolate 中運行的,而 isolate 是在線程中運行的,那我們開啓一個新的 isolate 就可以了,相當於開啓一個新的線程,使用多線程的方式來執行任務,Flutter 也爲我們提供了相應的 Api。

3.1 compute

void main() async {
  compute<String, String>(
    getData,
    'Alex',
  ).then((result) {
    print(result);
  });
}
String getData(String name) {
  // 模擬耗時3秒
  sleep(Duration(seconds: 3));
  return 'Hello $name';
}

compute 第一個參數是要執行的任務,第二個參數是要向任務發送的消息,需要注意的是第一個參數只支持頂層參數。使用 compute() 可以方便的執行耗時任務,但是濫用的話也會適得其反,因爲每次調用,相當於新建一個 isolate。上面的代碼執行一個經歷了 isolate 的創建以及銷燬過程,還有數據的傳遞會經歷兩次拷貝,因爲 isolate 之間是完全隔離的,不能共享內存,整個過程除去任務本身的執行時間,也會非常的耗時,isolate 的創建也比較消耗內存,創建過多的 isolate 還有 OOM 的風險。這時我們就需要一個更優的解決方案,減少頻繁創建銷燬 isolate 所帶來的消耗,最好是能創建一個類似於線程池的東西,只要提前初始化好,後面就可以隨時使用,不用擔心會發生前面所講的問題,這時候 LoadBalancer 就派上用場了

3.2 LoadBalancer

// 用來創建 LoadBalancer
Future<LoadBalancer> loadBalancerCreator = LoadBalancer.create(2, IsolateRunner.spawn);
// 全局可用的 loadBalancer
late LoadBalancer loadBalancer;
void main() async {
  // 初始化 LoadBalancer
  loadBalancer = await loadBalancerCreator;
  // 使用 LoadBalancer 執行任務
  final result = await loadBalancer.run<String, String>(getData, 'Alex');
  print(result);
}
String getData(String name) {
  // 模擬耗時3秒
  sleep(Duration(seconds: 3));
  return 'Hello $name';
}

使用 LoadBalancer.create() 方法可以創建出一個 isolate 線程池,能夠指定 isolate 的數量,並自動實現了負載均衡。應用啓動後在合適的時機將其初始化好,後續就有一個全局可用的 LoadBalancer 了。

四、實用經驗

4.1 指定任務的執行順序

在開發中經常會有需要連續執行異步任務的場景,例如下面的例子,後面的一步任務直接需要依賴前面任務的結果,所有任務正常執行完畢纔算成功。

void main() async {
  print(await getData());
}
Future<int> getData() {
  final completer = Completer<int>();
  int value = 0;
  Future(() {
    return 1;
  }).then((result1) {
    value += result1;
    return Future(() {
      return 2;
    }).then((result2) {
      value += result2;
      return Future(() {
        return 3;
      }).then((result3) {
        value += result3;
        completer.complete(value);
      });
    });
  });
  return completer.future;
}

這種方式出現了回調地獄,代碼非常難以閱讀,實際開發中還會有處理異常的代碼,會顯得更加臃腫,編寫難度也大,顯然這種方式是不建議使用的。

4.2 使用 then 的鏈式調用

void main() async {
  print(await getData());
}
Future<int> getData() {
  int value = 0;
  return Future(() => 1).then((result1) {
    value += result1;
    return Future(() => 2);
  }).then((result2) {
    value += result2;
    return Future(() => 3);
  }).then((result3) {
    value += result3;
    return value;
  });
}

回調地獄的問題解決了,代碼可讀性提高很多。

4.3 使用 async、await

void main() async {
  print(await getData());
}
Future<int> getData() async {
  int value = 0;
  value += await Future(() => 1);
  value += await Future(() => 2);
  value += await Future(() => 3);
  return value;
}

效果顯而易見,代碼更加清晰了。

4.4 取消任務

在前面講到了 Dart 方法執行時是不能被中斷的,這就意味着一個 Future 任務開始後必然會走到完成的狀態,但是很多時候我們需要又取消一個異步任務,唯一的辦法就是在任務結束後不執行回調代碼,就可以實現類似取消的效果。

4.5 CancelableOperation

在 Flutter 的 async 包中,提供了一個 CancelableOperation 給我們使用,使用它可以很簡單地實現取消任務的需求。

void main() async {
  // 創建一個可以取消的任務
  final cancelableOperation = CancelableOperation.fromFuture(
    Future(() async {
      print('start');
      await Future.delayed(Duration(seconds: 3)); // 模擬耗時3秒
      print('end');
    }),
    onCancel: () => print('cancel...'),
  );
  // 註冊任務結束後的回調
  cancelableOperation.value.then((val) {
    print('finished');
  });
  // 模擬1秒後取消任務
  Future.delayed(Duration(seconds: 1)).then((_) => cancelableOperation.cancel());
}

CancelableOperation 是對 Future 的代理, 對 Future 的 then 進行了接管,判斷 isCanceled 標記決定是否需要執行用戶提供的回調。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/9RfFXenMn8rO0EeYvVbg7g