最簡單的服務響應時長優化方法,沒有之一

序言 - From 萬俊峯 Kevin

我們能把服務做到平均延遲基本在 30ms 左右,其中非常大的一個前提是我們大量使用了 MapReduce 技術,讓我們的服務即使調用很多個服務,很多時候也只取決於最慢的那一個請求的時長。

對你現有的服務,不需要優化 DB 操作,不需要優化緩存,不需要重寫業務邏輯,只需要通過 MapReduce 把正交(不相關)的請求並行化,你就可以大幅降低服務響應時長。

本文歐陽安就給大家仔細分析一下 MapReduce 的實現細節。

爲什麼需要 MapReduce

在實際的業務場景中我們常常需要從不同的 rpc 服務中獲取相應屬性來組裝成複雜對象。

比如要查詢商品詳情:

  1. 商品服務 - 查詢商品屬性

  2. 庫存服務 - 查詢庫存屬性

  3. 價格服務 - 查詢價格屬性

  4. 營銷服務 - 查詢營銷屬性

如果是串行調用的話響應時間會隨着 rpc 調用次數呈線性增長,所以我們要優化性能一般會將串行改並行。

簡單的場景下使用 waitGroup 也能夠滿足需求,但是如果我們需要對 rpc 調用返回的數據進行校驗、數據加工轉換、數據彙總呢?繼續使用 waitGroup 就有點力不從心了,go 的官方庫中並沒有這種工具(java 中提供了 CompleteFuture),go-zero 作者依據 mapReduce 架構思想實現了進程內的數據批處理 mapReduce 併發工具類。

設計思路

我們嘗試把自己代入到作者的角色梳理一下併發工具可能的業務場景:

  1. 查詢商品詳情:支持併發調用多個服務來組合產品屬性,支持調用錯誤可以立即結束。

  2. 商品詳情頁自動推薦用戶卡券:支持併發校驗卡券,校驗失敗自動剔除,返回全部卡券。

以上實際都是在進行對輸入數據進行處理最後輸出清洗後的數據,針對數據處理有個非常經典的異步模式:生產者消費者模式。於是我們可以抽象一下數據批處理的生命週期,大致可以分爲三個階段:

  1. 數據生產 generate

  2. 數據加工 mapper

  3. 數據聚合 reducer

其中數據生產是不可或缺的階段,數據加工、數據聚合是可選階段,數據生產與加工支持併發調用,數據聚合基本屬於純內存操作單協程即可。

再來思考一下不同階段之間數據應該如何流轉,既然不同階段的數據處理都是由不同 goroutine 執行的,那麼很自然的可以考慮採用 channel 來實現 goroutine 之間的通信。

如何實現隨時終止流程呢?

很簡單,goroutine 中監聽一個全局的結束 channel 就行。

go-zero 代碼實現

core/mr/mapreduce.go

詳細源碼可查看 https://github.com/Ouyangan/go-zero-annotation/blob/24a5753f19a6a18fc05615cb019ad809aab54232/core/mr/mapreduce.go

前置知識 - channel  基本用法

因爲 MapReduce 源碼中大量使用 channel 進行通信,大概提一下 channel 基本用法:

  1. channel 寫結束後記得關閉
ch := make(chan interface{})
// 寫入完畢需要主動關閉channel
defer func() {
    close(ch)
}()
go func() {
    // v,ok模式 讀取channel
    for {
        v, ok := <-ch
        if !ok {
            return
        }
        t.Log(v)
    }

    // for range模式讀取channel,channel關閉循環自動退出
    for i := range ch {
        t.Log(i)
    }

    // 清空channel,channel關閉循環自動退出
    for range ch {
    }
}()
for i := 0; i < 10; i++ {
    ch <- i
    time.Sleep(time.Second)
}
  1. 已關閉的 channel 依然支持讀取

  2. 限定 channel 讀寫權限

// 只讀channel
func readChan(rch <-chan interface{}) {
    for i := range rch {
        log.Println(i)
    }
}

// 只寫channel
func writeChan(wch chan<- interface{}) {
    wch <- 1
}

接口定義

先來看最核心的三個函數定義:

  1. 數據生產

  2. 數據加工

  3. 數據聚合

// 數據生產func
// source - 數據被生產後寫入source
GenerateFunc func(source chan<- interface{})

// 數據加工func
// item - 生產出來的數據
// writer - 調用writer.Write()可以將加工後的向後傳遞至reducer
// cancel - 終止流程func
MapperFunc func(item interface{}, writer Writer, cancel func(error))

// 數據聚合func
// pipe - 加工出來的數據
// writer - 調用writer.Write()可以將聚合後的數據返回給用戶
// cancel - 終止流程func
ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))

面向用戶的方法定義

使用方法可以查看官方文檔,這裏不做贅述

面向用戶的方法比較多,方法主要分爲兩大類:

  1. 無返回

  2. 執行過程發生錯誤立即終止

  3. 執行過程不關注錯誤

  4. 有返回值

  5. 手動寫入 source,手動讀取聚合數據 channel

  6. 手動寫入 source,自動讀取聚合數據 channel

  7. 外部傳入 source,自動讀取聚合數據 channel

// 併發執行func,發生任何錯誤將會立即終止流程
func Finish(fns ...func() error) error

// 併發執行func,即使發生錯誤也不會終止流程
func FinishVoid(fns ...func())

// 需要用戶手動將生產數據寫入 source,加工數據後返回一個channel供讀取
// opts - 可選參數,目前包含:數據加工階段協程數量
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option)

// 無返回值,不關注錯誤
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option)

// 無返回值,關注錯誤
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option)

// 需要用戶手動將生產數據寫入 source ,並返回聚合後的數據
// generate 生產
// mapper 加工
// reducer 聚合
// opts - 可選參數,目前包含:數據加工階段協程數量
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error)

// 支持傳入數據源channel,並返回聚合後的數據
// source - 數據源channel
// mapper - 讀取source內容並處理
// reducer - 數據處理完畢發送至reducer聚合
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
    opts ...Option) (interface{}, error)

核心方法是 MapReduceWithSource 和 Map,其他方法都在內部調用她倆。弄清楚了 MapReduceWithSource  方法 Map 也不在話下。

MapReduceWithSource 源碼實現

一切都在這張圖裏面了

// 支持傳入數據源channel,並返回聚合後的數據
// source - 數據源channel
// mapper - 讀取source內容並處理
// reducer - 數據處理完畢發送至reducer聚合
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
    opts ...Option) (interface{}, error) {
    // 可選參數設置
    options := buildOptions(opts...)
    // 聚合數據channel,需要手動調用write方法寫入到output中
    output := make(chan interface{})
    // output最後只會被讀取一次
    defer func() {
        // 如果有多次寫入的話則會造成阻塞從而導致協程泄漏
        // 這裏用 for range檢測是否可以讀出數據,讀出數據說明多次寫入了
        // 爲什麼這裏使用panic呢?顯示的提醒用戶用法錯了會比自動修復掉好一些
        for range output {
            panic("more than one element written in reducer")
        }
    }()
    // 創建有緩衝的chan,容量爲workers
    // 意味着最多允許 workers 個協程同時處理數據
    collector := make(chan interface{}, options.workers)
    // 數據聚合任務完成標誌
    done := syncx.NewDoneChan()
    // 支持阻塞寫入chan的writer
    writer := newGuardedWriter(output, done.Done())
    // 單例關閉
    var closeOnce sync.Once
    var retErr errorx.AtomicError
    // 數據聚合任務已結束,發送完成標誌
    finish := func() {
        // 只能關閉一次
        closeOnce.Do(func() {
            // 發送聚合任務完成信號,close函數將會向chan寫入一個零值
            done.Close()
            // 關閉數據聚合chan
            close(output)
        })
    }
    // 取消操作
    cancel := once(func(err error) {
        // 設置error
        if err != nil {
            retErr.Set(err)
        } else {
            retErr.Set(ErrCancelWithNil)
        }
        // 清空source channel
        drain(source)
        // 調用完成方法
        finish()
    })

    go func() {
        defer func() {
            // 清空聚合任務channel
            drain(collector)
            // 捕獲panic
            if r := recover(); r != nil {
                // 調用cancel方法,立即結束
                cancel(fmt.Errorf("%v", r))
            } else {
                // 正常結束
                finish()
            }
        }()
        // 執行數據加工
        // 注意writer.write將加工後數據寫入了output
        reducer(collector, writer, cancel)
    }()
    // 異步執行數據加工
    // source - 數據生產
    // collector - 數據收集
    // done - 結束標誌
    // workers - 併發數
    go executeMappers(func(item interface{}, w Writer) {
        mapper(item, w, cancel)
    }, source, collector, done.Done(), options.workers)
    // reducer將加工後的數據寫入了output,
    // 需要數據返回時讀取output即可
    // 假如output被寫入了超過兩次
    // 則開始的defer func那裏將還可以讀到數據
    // 由此可以檢測到用戶調用了多次write方法
    value, ok := <-output
    if err := retErr.Load(); err != nil {
        return nil, err
    } else if ok {
        return value, nil
    } else {
        return nil, ErrReduceNoOutput
    }
}
// 數據加工
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
    done <-chan lang.PlaceholderType, workers int) {
    // goroutine協調同步信號量
    var wg sync.WaitGroup
    defer func() {
        // 等待數據加工任務完成
        // 防止數據加工的協程還未處理完數據就直接退出了
        wg.Wait()
        // 關閉數據加工channel
        close(collector)
    }()
    // 帶緩衝區的channel,緩衝區大小爲workers
    // 控制數據加工的協程數量
    pool := make(chan lang.PlaceholderType, workers)
    // 數據加工writer
    writer := newGuardedWriter(collector, done)
    for {
        select {
        // 監聽到外部結束信號,直接結束
        case <-done:
            return
        // 控制數據加工協程數量
        // 緩衝區容量-1
        // 無容量時將會被阻塞,等待釋放容量
        case pool <- lang.Placeholder:
            // 阻塞等待生產數據channel
            item, ok := <-input
            // 如果ok爲false則說明input已被關閉或者清空
            // 數據加工完成,執行退出
            if !ok {
                // 緩衝區容量+1
                <-pool
                // 結束本次循環
                return
            }
            // wg同步信號量+1
            wg.Add(1)
            // better to safely run caller defined method
            // 異步執行數據加工,防止panic錯誤
            threading.GoSafe(func() {
                defer func() {
                    // wg同步信號量-1
                    wg.Done()
                    // 緩衝區容量+1
                    <-pool
                }()

                mapper(item, writer)
            })
        }
    }
}

總結

mapReduce 的源碼我大概看了兩個晚上,整體看下來比較累。一方面是我自身 go 語言並不是很熟練尤其是 channel 的用法,導致我需要頻繁停下來查詢相關文檔理解作者的寫法,另一方面是多個 goroutine 之間通過 channel 進行通信實現協作真的蠻燒腦(佩服作者的思維能力)。

其次看源碼時第一遍看起來肯定會比較懵的,其實沒關係找到程序的入口(公共基礎組件一般是面向的方法)先沿着主線一路看下去把每一句代碼都看懂加上註釋,再看支線代碼。

如果有實在看不懂的地方就查查這段代碼的提交記錄非常有可能是解決某個 bug 改動的,比如下面這段代碼我死活看了好多遍都不理解。

// 聚合數據channel,需要手動調用write方法寫入到output中
output := make(chan interface{})
// output最後只會被讀取一次
defer func() {
    // 如果有多次寫入的話則會造成阻塞從而導致協程泄漏
    // 這裏用 for range檢測是否可以讀出數據,讀出數據說明多次寫入了
    // 爲什麼這裏使用panic呢?顯示的提醒用戶用法錯了會比自動修復掉好一些
    for range output {
        panic("more than one element written in reducer")
    }
}()

最後畫出流程圖基本就能把源碼看懂了,對於我而言這方法比較笨但有效。

資料

Go Channel 詳解: https://colobu.com/2016/04/14/Golang-Channels/

go-zero MapReduce 文檔: https://go-zero.dev/cn/mapreduce.html

項目地址

https://github.com/zeromicro/go-zero

歡迎使用 go-zerostar 支持我們!

微信交流羣

關注『微服務實踐』公衆號並點擊 交流羣 獲取社區羣二維碼。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Ec1nuR5Q_QgaC3FqeX1gLg