Kotlin 協程用法淺析及在京東 APP 業務中實踐

前言

協程定義及設計的目的:協程是一種併發設計模式,是一套由 Kotlin 提供的線程框架。開發者使用協程框架可以通過結構化併發機制在同一作用域下,把運行的不同線程的代碼寫在同一個代碼塊裏並執行,簡化異步執行的代碼,使得我們的代碼顯得線性。

用法淺析

基礎概念

使用協程前我們需要先了解幾個概念:

協程的創建與啓動

launch
 coroutineScope.launch(Dispatchers.IO) { // 示例(1)
     // 運行在IO線程
 }
 coroutineScope.launch(Dispatchers.Main) { // 示例(2)
     // 運行在UI線程
 }

在上述代碼中,演示了一個協程的創建,我們以實例(1)爲例,它的含義是通過 coroutineScope 作用域的擴展函數 launch 創建了一個運行在 IO 線程的協程,大家可以看到代碼還是很清晰的,這時候就可以在協程中做一些耗時性的操作。同理實例(2)中創建了一個運行在 UI 線程的協程。

val job: Job = coroutineScope.launch(Dispatchers.IO, CoroutineStart.LAZY) { // 示例(1)
                    // 運行在IO
               }
               job.start()

在上述代碼中,我們將示例(1)進行了改造,調用 launch 函數時,新增了一個參數 CoroutineStart.LAZY,並將返回的 Job 對象賦值給變量 job。

默認情況下,協程的啓動模式爲 CoroutineStart.DEFAULT,即協程創建完成之後會立即執行,示例中設置啓動模式爲 CoroutineStart.LAZY,這時候 launch 函數創建了協程,並沒有啓動它,此時協程的啓動需要依靠 Job 的 start 等函數進行啓動。

Job 是一個具有生命週期的並且可以被取消的後臺工作或者說異步任務,Job 內提供了 isActive、isCompleted、isCancelled 屬性用以判斷協程的狀態,以及啓動協程 start()、取消協程 cancel() 等操作的 api。

async 併發
 coroutineScope.launch(Dispatchers.Main) {
     val async1 = async(Dispatchers.IO) { // 網絡請求1
         "模擬用戶信息數據獲取"
     }
     val async2 = async(Dispatchers.IO) { // 網絡請求2
         "模擬企業信息數據獲取"
     }
     handleData(async1.await(), async2.await()) // 模擬合併數據
 }

在上述代碼中通過 async 發起兩個協程獲取數據,並通過 await() 獲取到請求結果,因爲並行發起,所以速度也是挺快的。

通過 async 創建的協程返回值是一個 Deferred,Deferred 帶有延遲的意思,可以通俗理解成要等一等才能拿到結果,Deferred 也是一個 Job,它是 Job 的一個子類, 所以具有 Job 同樣的功能。

當然 async 默認的啓動模式和 launch 一樣,也是 CoroutineStart.DEFAULT 立即執行,當將啓動模式設置爲 CoroutineStart.LAZY 時可以通過 await() 啓動協程,也可以通過 Job 的 start() 函數啓動。

Kotlin 協程優勢

在這一章節中,會通過幾個示例對比,來體現 Kotlin 協程的優勢在哪裏,同時筆者建議閱讀此章節的時候不要太在意實現的細節,關注不同方式的實現風格就好。

    /** 獲取用戶信息 */
    private fun getUserInfo() { // 示例(1)
        apiService.getUserInfo().enqueue(object : Callback<UserInfoEntry> {
            override fun onResponse(c: Call<UserInfoEntry>, re: Response<UserInfoEntry>) {
                runOnUiThread {
                    tvName.text = response.body()?.userName
                }
            }

            override fun onFailure(call: Call<UserInfoEntry>, t: Throwable) {
            }
        })
    }

    /** 獲取用戶信息 協程*/
    private fun getUserInfoByCoroutine() { // 示例(2)
        coroutineScope.launch(Dispatchers.Main) {
            val userInfo = coroutineApiService.getUserInfo()
            tvName.text = userInfo.userName
        }
    }

這是一個獲取用戶信息的網絡請求示例,通過普通的 CallBack 方式及 Kotlin 協 程的方式分別實現。

現在我們對比一下兩種方式的實現,看看協程的實現有什麼優化的地方?首先在協程的實現中沒有了 CallBack 的回調,其次在刷新 UI 的時候並沒有切換到主線程的操作,最後代碼量也是比較簡潔的。

其實還好,第一種方式在我們在開發中,這種 CallBack 的回調,應該應用過無數次了,寫起來也是分分鐘的事情,並不會多麼困難。確實,這樣 Kotlin 協程的優勢也不是那麼明顯了。

接下來我們看一個複雜一些的場景,以上文講解 async 時提到過的合併用戶信息數據和企業信息數據爲例,我們看看更詳細的實現,在這裏複述一下場景:“存在兩個接口,一個用於獲取用戶個人信息、一個用於獲取企業信息,需要兩個接口數據都獲取到的時候纔可以進行 UI 的刷新”。

      
  /** 開始獲取數據 */
    private fun start() {
        getUserInfo()
        getCompanyInfo()
    }

  /** 獲取用戶信息 */
    private fun getUserInfo() {
        apiService.getUserInfo().enqueue(object : Callback<UserInfoEntry> {
            override fun onResponse(c: Call<UserInfoEntry>, r: Response<UserInfoEntry>) {
                // 判斷是不是已經拿到公司信息了
                // 刷新UI handle.post()
            }

            override fun onFailure(call: Call<UserInfoEntry>, t: Throwable) {
            }
        })
    }

    /** 獲取公司信息 */
    private fun getCompanyInfo() {
        apiService.getCompanyInfo().enqueue(object : Callback<UserInfoEntry> {
            override fun onResponse(c: Call<UserInfoEntry>, r: Response<UserInfoEntry>) {
                // 判斷是不是已經拿到用戶信息了
                // 刷新UI handle.post()
            }
          
            override fun onFailure(call: Call<UserInfoEntry>, t: Throwable) {
            }
        })
    }

在這種方式中,我們將兩個接口請求封裝了兩個 API,同時發起網絡請求,相對使用上不能說不方便,關鍵在於數據的處理上,用戶信息的數據拿到之後需要判斷企業信息是不是也獲取到了,同理企業信息的數據也是一樣,現在只有兩組數據的合併,如果涉及更多信息類型數據的獲取,相應的邏輯處理就變的越來越複雜了。

當然如果改成串行的邏輯也是很好處理的,比如先獲取用戶信息數據,獲取之後再進行企業信息數據的讀取,但是這種方式犧牲了時間,本來可以並行的請求,變成串行,請求時間加長。

    /** 獲取信息 kotlin協程 */
    private fun getKotlinInfo() {
        coroutineScope.launch(Dispatchers.Main) {
            val userInfo = async {
                apiService.getUserInfo()
            } // 獲取用戶信息
            val companyInfo = async {
                apiService.getCompanyInfo()
            } // 公司信息
            MergeEntry(userInfo.await(), companyInfo.await())
        }
    }

這是 Kotlin 協程的實現方式,使用 CoroutineScope 的 async 構建器實現,在需要更多請求時,它的邏輯處理很方便,多一個請求多一個 async 即可,並行的請求節省時間,而且消除了回調,並且不需要切換線程。

協程的使用

在瞭解了協程的創建、啓動及優勢之後,現在有一個問題我們什麼時候使用協程?當我們需要處理耗時數據的時候,這時候可以使用協程切換到子線程執行,當處理完數據需要刷新 UI 的時候可以使用協程切換到主線程,其實需要指定運行線程的時候就可以用協程處理。

 coroutineScope.launch(Dispatchers.IO) { // 運行在IO線程
     handleFileData() // 模擬讀文件耗時操作
     launch(Dispatchers.Main) { // 數據處理完成刷新UI
         tvName.text = ""
     }
 }

在上述代碼中,有一個耗時讀文件操作,所以這裏使用了協程,通過 launch 切換到 IO 線程處理耗時操作,處理完成之後通過 launch 函數切到 Main 線程刷新 UI, 好像沒毛病,我們繼續看下一段代碼。

    coroutineScope.launch(Dispatchers.IO) {// 運行在IO線程
        handleFileData() // 模擬讀文件
        launch(Dispatchers.Main) {
            // 數據處理完成刷新UI
            launch(Dispatchers.IO) {
                // 處理數據
                launch(Dispatchers.Main) {
                    // 數據處理完成刷新UI
                    launch(Dispatchers.IO) {
                        launch(Dispatchers.Main) {
                            launch(Dispatchers.IO) {
                                launch(Dispatchers.Main) {
                                }
                            }
                        }
                    }
                }
            }
        }
    }

這個示例演示的場景比較極端,很少在開發中會遇到 IO 與 Main 線程切換如此頻繁,在這裏只是爲了暴露問題。前面我們說過 Kolin 協程消除了回調,但在這個示例中卻表現的很回調,層層嵌套。

因爲單單使用 launch、async 協程構建器函數並不能很好的處理這種複雜的需要頻繁切換線程的場景,爲了解決示例中的問題,Kotlin 協程爲我們提供了一些另外的函數來配合使用, 比如 withContext 掛起函數。

withContext 掛起函數

withContext 是 Kotlin 協程提供的掛起函數, 它提供給的功能有:

    coroutineScope.launch(Dispatchers.Main) { // 在主線程開啓一個協程
        val data = withContext(Dispatchers.IO) { // 切到IO線程處理耗時操作
            handleFileData() // 在IO線程運行
        }
        tvName.text = data // withContext函數體執行完,自定切換到主線程刷新UI
    }

    coroutineScope.launch(Dispatchers.Main) {
        withContext(Dispatchers.IO) { // **操作(1)**
            // 切換IO線程
            // ... 在IO線程執行
        }
        // .. 在UI線程執行  **操作(2)**
        withContext(Dispatchers.IO) {
            // 切換IO線程
            // ... 在IO線程執行
        }
        // .. 在UI線程執行
        withContext(Dispatchers.IO) {
            // 切換IO線程
            // ... 在IO線程執行
        }
        // .. 在UI線程執行
        // ...等等...
    }

使用 withContext 改造之後,消除了嵌套,代碼變得清晰,所以,Kotlin 協程除了 launch 等擴展函數之外,還需要 withContext 等掛起函數,纔可體現它的優勢。

這裏有必要提一下,在沒有使用協程的時候,開啓一個線程,代碼就會出現兩個分支,比如上述代碼中的操作(1),切到了 IO 線程執行,這是一個分支,緊接着是執行操作(2),這是另一個分支,這兩個分支各走各的,“幾乎同步執行”;

但在協程中,操作(1)使用 withContext 掛起函數切換到 IO 線程去執行它的操作後,並不會執行操作(2),而是等待操作(1)的 withContext 執行完成之後,切換線程回到 Main 線程中時,操作(2)纔會執行,後續的 supend 章節會有講解。

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {}

在上面的示例中 getData() 是一個普通的函數,在其中調用的 withContext 掛起函數時,提示報錯信息:suspend function 'withContext' should be called only from a coroutine or another supend function,意思是說 withContext 是一個被 suspend 修飾的函數,它應該在協程或者另一個 spspend 函數中調用。源碼中 withContext 被 suspend 修飾。

suspend

suspend 是 Kotlin 協程的一個關鍵字,由 suspend 修飾的函數爲掛起函數,掛起函數只能在協程或者另一個掛起函數中調用。

掛起函數?掛起的是誰?

剛纔我們說被 suspend 修飾的函數是掛起函數,掛起從字面意思可以理解爲不執行了或者說是暫停了,這裏有一個疑問,掛起的是誰?是線程?函數?還是協程?

其實掛起的是協程,可以理解爲在協程中執行到 suspend 掛起函數的時候,就會暫停協程後續代碼的執行,我們分析一下下面代碼的執行流程。

    coroutineScope.launch(Dispatchers.Main) { // 在主線程開啓一個協程  (1)
        val data = withContext(Dispatchers.IO) { // 切到IO線程處理耗時操作 (2)
            handleFileData() // 在IO線程運行 (3)
        }
        tvName.text = data // withContext函數體執行完,自定切換到主線程刷新UI (4)
    }

那麼後續的代碼什麼時候執行?協程掛起了,對應的也有恢復的操作,這裏就涉及協程的恢復了,當 withContext 掛起函數執行完成之後,協程會重新切回原來的線程(如果掛起前的線程是一個子線程,有可能會因爲線程空閒而被回收,切回來的線程並不一定百分百是原來的線程)繼續執行剩餘的代碼,比如示例中刷新 UI 的操作。

總結一下 Kotlin 協程掛起的概念,什麼是掛起?可以理解爲兩個操作:

更通俗一些,當 Kotlin 協程執行到一個掛起函數時,會將線程切換到掛起函數指定的線程中執行,後續的代碼將被暫停執行,當掛起函數執行完成之後,會將線程重新切回原來的線程,恢復剩餘代碼的執行,這就是掛起。

另外說一下掛起的非阻塞式:

還是以上面的代碼爲例,操作(1)在 Main 線程中啓動了一個協程,協程執行到操作(2)時,切到 IO 線程中執行操作(3),此時操作(4)被暫停,不執行了,但 Main 線程被阻塞了嗎?並沒有,主線程該幹嘛就幹嘛去了,這就是掛起的非阻塞式,雖然被掛起了,但掛起的是自己,是協程,並沒有阻塞原來的線程。

京東 APP 業務實踐

本文以核心樓層數據處理進行講解,該業務需要將兜底數據和接口下發的動態數據進行組裝,最終整合成業務所需的數據源。

dependencies {implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.8'}

/** 協程作用域 */
private val scope = MainScope()

private fun assembleDataList(response: PlatformResponse?) = scope.launch(
            CoroutineExceptionHandler { _, exception ->
                /** 未捕獲的異常處理 */
            })
    {
        val localStaticData: Deferred<MutableList<BaseTemplateEntity>?> = async(start = CoroutineStart.LAZY) { getLocalStaticData() }
        val dynamicData: Deferred<MutableList<BaseTemplateEntity>?> = async(start = CoroutineStart.LAZY) { getDynamicData(response) }
        getAssembleDataListFunc(localStaticData.await(), dynamicData.await())
    }

我們通過作用域構建器擴展函數 launch 在當前的 MainScope 下創建新的協程並啓動,在 launch 函數的 lambda 表達式中,我們使用了 async 函數並聲明 start 參數設置爲 CoroutineStart.LAZY 惰性模式創建一個子協程(但該協程並不會立即執行),該函數會返回一個 Deferred 對象,Deferred 是帶有返回值的 Job 擴展(類似於 Java 中的 Futuer 對象),只有當我們主動調用 Deferred 的 await 或 start 函數時,該子協程纔會真正執行。

與 RxJava 實現方案對比

private void assembleDataList(PlatformResponse response) {
    Observable<List<BaseTemplateEntity>> localStaticData = getLocalStaticData();
    Observable<List<BaseTemplateEntity>> assembleData = getDynamicData(response);
    Func2<List<BaseTemplateEntity>, List<BaseTemplateEntity>, List<BaseTemplateEntity>> assembleData = getAssembleDataListFunc();
    Observable<List<BaseTemplateEntity>> observable = Observable.zip(localStaticData, assembleData, assembleData);
        subscribe(observable, callback);
    }

通過實現代碼可以看出,我們使用 zip 操作符,將 localStaticData 和 assembleData 這兩個觀察者發送的事件序列,在組合後生成一個新的事件序列併發送(此處我們不討論 localStaticData 和 assembleData 這兩個事件序列是串行還是並行執行)。

Observable firstObservable = Observable.create(new Observable.OnSubscribe<CacheBean>() {
            @Override
            public void call(Subscriber<? super CacheBean> subscriber) {
                if (subscriber != null && !subscriber.isUnsubscribed()) {
                    subscriber.onNext(handleCacheBean());
                    subscriber.onCompleted();
                    RxUtil.unSubscribeSafely(subscriber);
                }
            }
        });
Observable secondObservable = Observable.just(new CacheBean(null, "0"));
firstObservable.timeout(TIME_OUT, TimeUnit.SECONDS)
               .onErrorResumeNext(secondObservable)
               .subscribe();

協程的優點:用同步的方式寫異步執行的代碼,使得代碼邏輯更加簡潔和清晰;輕量級,佔用更少的系統資源;執行效率高;掛起函數較於實現 Runnable 或 Callable 接口更加方便可控;線程切換很簡單。協程的缺點:有一定學習成本,由於是基於 Kotlin 語言,需有一定語言基礎。

協程和 RxJava 我們應該如何選擇?

經過協程和 RxJava 的對比,我們也對各框架有所瞭解,但談到應該如何選擇這個話題,筆者以爲如果你已經對 RxJava 重度使用,其實沒必要刻意遷移到協程,RxJava 功能強大目前仍是很流行的異步編程框架,基於 RxJava 的拓展庫 RxKotlin 也可以滿足在 kotlin 語言環境下使用 RxJava 開發。如果你已經有一定 Kotlin 開發經驗,又喜歡嘗試新鮮事物,協程是個不錯的選擇,其非阻塞時的掛起可以讓開發人員用同步的風格編寫異步代碼,提高開發效率同時也降低了維護成本。協程的概念越來越普及,尤其已在 Flutter 跨平臺框架中廣泛使用,勢必會成爲趨勢。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/SqzGseEragyTB68VK_oBow