Go 併發編程 — 結構體多字段的原子操作

多字段更新?

併發編程中,原子更新多個字段是常見的需求。

舉個例子,有一個 struct Person 的結構體,裏面有兩個字段。我們先更新 Person.name,再更新 Person.age ,這是兩個步驟,但我們必須保證原子性。

有童鞋可能奇怪了,爲什麼要保證原子性?

我們以一個示例程序開端,公用內存簡化成一個全局變量,開 10 個併發協程去更新。你猜最後的結果是啥?

package main

import (
    "fmt"
    "sync"
    "time"
)

type Person struct {
    name string
    age  int
}

// 全局變量(簡單處理)
var p Person

func update(name string, age int) {
    // 更新第一個字段
    p.name = name
    // 加點隨機性
    time.Sleep(time.Millisecond*200)
    // 更新第二個字段
    p.age = age
}

func main() {
    wg := sync.WaitGroup{}
    wg.Add(10)
    // 10 個協程併發更新
    for i := 0; i < 10; i++ {
        name, age := fmt.Sprintf("nobody:%v", i), i
        go func() {
            defer wg.Done()
            update(name, age)
        }()
    }
    wg.Wait()
    // 結果是啥?你能猜到嗎?
    fmt.Printf("p.name=%s\np.age=%v\n", p.name, p.age)
}

打印結果是啥?你能猜到嗎?

可能是這樣的:

p.name=nobody:2
p.age=3

也可能是:

p.name=nobody:8
p.age=7

按照排列組合來算,一共有 10*10 種結果。

那我們想要什麼結果?我們想要 name 和 age 一定要是匹配的,不能牛頭不對馬嘴。換句話說,name 和 age 的更新一定要原子操作,不能出現未定義的狀態。

我們想要的是 ( nobody:i,i ),正確的結果只能在以下預定的 10 種結果出現:

( nobody:0, 0 )
( nobody:1, 1 )
( nobody:2, 2 )
( nobody:3, 3 )
    ...
( nobody:9, 9 )

這僅僅是一個簡單的示例,童鞋們思考下自己現實的需求,應該是非常常見的。

現在有兩個問題:

第一個問題:這個 demo 觀察下運行時間,用 time 來觀察,時間大概是 200 ms 左右,爲什麼?

root@ubuntu:~/code/gopher/src/atomic_test# time ./atomic_test 
p.name=nobody:8
p.age=7

real 0m0.203s
user 0m0.000s
sys 0m0.000s

如上就是 203 毫秒。劃重點:這個時間大家請先記住了,對我們分析下面的例子有幫助。

這個 200 毫秒是因爲奇伢在 update 函數中故意加入了一點點時延,這樣可以讓程序估計跑慢一點。

每個協程跑 update 的時候至少需要 200 毫秒,10 個協程併發跑,沒有任何互斥,時間重疊,所以整個程序的時間也是差不都 200 毫秒左右。

第二個問題:怎麼解決這個正確性的問題。

大概兩個辦法:

  1. 鎖互斥

  2. 原子操作

下面詳細分析下異同和優劣。

鎖實現

在併發的上下文,用鎖來互斥,這是最常見的思路。 鎖能形成一個臨界區,鎖內的一系列操作任何時刻都只會有一個人更新,如此就能確保更新不會混亂,從而保證多步操作的原子性。

首先配合變量,對應一把互斥鎖:

// 全局變量(簡單處理)
var p Person
// 互斥鎖,保護變量更新
var mu sync.Mutex

更新的邏輯在鎖內:

func update(name string, age int) {
    // 更新:加鎖,邏輯串行化
    mu.Lock()
    defer mu.Unlock()

    // 以下邏輯不變
}

大家按照上面的把程序改了之後,邏輯是不是就正確了。一定是 ( nobody:i,i )配套更新的。

但你注意到另一個可怕的問題嗎?

程序運行變的好慢!!!!

同樣用 time 命令統計下程序運行時間,竟然耗費 2 秒!!!,10 倍的時延增長,每次都是這樣。

root@ubuntu:~/code/gopher/src/atomic_test# time ./atomic_test 
p.name=nobody:8
p.age=8

real 0m2.017s
user 0m0.000s
sys 0m0.000s

不禁要問自己,爲啥?

還記得上面我提到過,一個 update 固定要 200 毫秒。

加鎖之後的 update 函數邏輯全部在鎖內,10 個協程併發跑 update 函數,但由於鎖的互斥性,搶鎖不到就阻塞等待,保證 update 內部邏輯的串行化。

第 1 個協程加上鎖了,後面 9 個都要等待,依次類推。最長的等待時間應該是 1.8 秒。

換句話說,程序串行執行了 10 次 update 函數,時間是累加的。程序 2 秒的運行時延就這樣來的。

加鎖不怕,搶鎖等待纔可怕。在大量併發的時候,由於鎖的互斥特性,這裏的性能可能堪憂。

還有就是搶鎖失敗的話,是要把調度權讓出去的,直到下一次被喚醒。這裏還增加了協程調度的開銷,一來一回可能性能就更慢了下來。

思考:用鎖之後正確性是保證了,某些場景性能可能堪憂。那咋吧?

在本次的例子,下一步的進化就是:原子化操作。

溫馨提示

怕童鞋誤會,聲明一下:鎖不是不能用,是要區分場景,不分場景的性能優化措施是沒有意義的哈。大部分的場景,用鎖沒啥問題。且鎖是可以細化的,比如讀鎖和寫鎖,更新加寫鎖,只讀操作加讀鎖。這樣確實能帶來較大的性能提升,特別是在寫少讀多的時候。

原子操作

其實我們再深究下,這裏本質上是想要保證更新 name 和 age 的原子性,要保證他們配套。其實可以先在局部環境設置好 Person 結構體,然後一把原子賦值給全局變量即可。Go 提供了 atomic.Value 這個類型。

怎麼改造?

首先把併發更新的目標設置爲 atomic.Value 類型:

// 全局變量(簡單處理)
var p atomic.Value

然後 update 函數改造成先局部構造,再原子賦值的方式:

func update(name string, age int) {
    lp := &Person{}
    // 更新第一個字段
    lp.name = name
    // 加點隨機性
    time.Sleep(time.Millisecond * 200)
    // 更新第二個字段
    lp.age = age
    // 原子設置到全局變量
    p.Store(lp)
}

最後 main 函數讀取全局變量打印的地方,需要使用原子 Load 方式:

    // 結果是啥?你能猜到嗎?
    _p := p.Load().(*Person)
    fmt.Printf("p.name=%s\np.age=%v\n", _p.name, _p.age)

這樣就解決併發更新的正確性問題啦。感興趣的童鞋可以運行下,結果都是正確的 ( nobody:i,i )。

下面再看一下程序的運行時間:

root@ubuntu:~/code/gopher/src/atomic_test# time ./atomic_test 
p.name=nobody:7
p.age=7

real 0m0.202s
user 0m0.000s
sys 0m0.000s

竟然是 200 毫秒作用,比鎖的實現時延少 10 倍,並且保證了正確性。

爲什麼會這樣?

因爲這 10 個協程還是併發的,沒有類似於鎖阻塞等待的操作,只有最後 p.Store(lp) 調用內纔有做狀態的同步,而這個時間微乎其微,所以 10 個協程的運行時間是重疊起來的,自然整個程序就只有 200 毫秒左右。

鎖和原子變量都能保證正確的邏輯。在我們這個簡要的場景裏,我相信你已經感受到性能的差距了。

當然了,還是那句話,具體用那個實現要看具體場景,不能一概而論。而且,鎖有自己無可替代的作用,它能保證多個步驟的原子性,而不僅僅是字段的賦值。

相信你已經非常好奇 atomic.Value 了,下面簡要的分析下原理,是否真的很神祕呢?

原理可能要大跌眼鏡。

趁現在我們還不懂內部原理,先思考個問題(不然待會一下子看懂了就沒意思了)?

Value.Store  和 Value.Load 是用來賦值和取值的。我的問題是,這兩個函數里面有沒有用戶數據拷貝?StoreLoad 是否是保證了多字段拷貝的原子性?

提前透露下:並非如此。

atomic.Value 原理

 1   atomic.Value 結構體

atomic.Value  定義於文件 src/sync/atomic/value.go  ,結構本身非常簡單,就是一個空接口:

type Value struct {
    v interface{}
}

在之前文章中,奇伢有分享過 Go 的空接口類型( interface {} )在 Go 內部實現是一個叫做 eface 的結構體( src/runtime/iface.go ):

type eface struct {
    _type *_type
    data  unsafe.Pointer
}

interface {} 是給程序猿用的,eface  是 Go 內部自己用的,位於不同層面的同一個東西,這個請先記住了,因爲 atomic.Value 就利用了這個特性,在 value.go 定義了一個 ifaceWords 的結構體。

劃重點:interface {}efaceifaceWords 這三個結構體內存佈局完全一致,只是用的地方不同而已,本質無差別。這給類型的強制轉化創造了前提。

 2   Value.Store 方法

看一下簡要的代碼,這是一個簡單的 for 循環:

func (v *Value) Store(x interface{}) {
    // 強制轉化類型,轉變成 ifaceWords (三種類型,相同的內存佈局,這是前提)
    vp := (*ifaceWords)(unsafe.Pointer(v))
    xp := (*ifaceWords)(unsafe.Pointer(&x))
    for {
        // 獲取數據類型
        typ := LoadPointer(&vp.typ)
        // 第一個判斷:atomic.Value 初始的時候是 nil 值,那麼就是走這裏進去的;
        if typ == nil {
            runtime_procPin()
            if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
                runtime_procUnpin()
                continue
            }
            // 初始賦值
            StorePointer(&vp.data, xp.data)
            StorePointer(&vp.typ, xp.typ)
            runtime_procUnpin()
            return
        }
        // 第二個判斷:這個也是初始的時候,這是一箇中間狀態;
        if uintptr(typ) == ^uintptr(0) {
            continue
        }
        // 第三個判斷:類型校驗,通過這裏就能看出來,Value 裏面的類型不能變,否則會 panic;
        if typ != xp.typ {
            panic("sync/atomic: store of inconsistently typed value into Value")
        }
        // 劃重點啦:只要過了初始化賦值階段,基本上就是直接跑到這行代碼啦
        StorePointer(&vp.data, xp.data)
        return
    }
}

有幾個點稍微解釋下:

  1. atomic.Value 使用 ^uintptr(0) 作爲第一次存取的標誌位,這個標識位是設置在 type 字段裏,這是一箇中間狀態;

  2. 通過 CompareAndSwapPointer 來確保 ^uintptr(0)  只能被一個執行體搶到,其他沒搶到的走 continue ,再循環一次;

  3. atomic.Value 第一次寫入數據時,將當前協程設置爲不可搶佔,當存儲完畢後,即可解除不可搶佔;

  4. 真正的賦值,無論是第一次,還是後續的 data 賦值,在 Store 內,只涉及到指針的原子操作,不涉及到數據拷貝

這裏有沒有大跌眼鏡?

Store 內部並不是保證多字段的原子拷貝!!!!Store  裏面處理的是個結構體指針。 只通過了 StorePointer 保證了指針的原子賦值操作。

我的天?是這樣的嗎?那何來的原子操作。

核心在於:****Value.Store()  的參數必須是個局部變量(或者說是一塊全新的內存)。

這裏就回答了上面的問題:Store,Load 是否有數據拷貝?

劃重點:沒有!沒動數據

原來你是這樣子的 atomic.Value

回憶一下我上面的 update 函數,真的是局部變量,全新的內存塊

func update(name string, age int) {
    // 注意哦,局部變量哦
    lp := &Person{}
    // 更新字段 。。。。
 
    // 設置的是全新的內存地址給全局的 atomic.Value 變量
    p.Store(lp)
}

又有個問題,你可能會想了,如果 p.Store( /* */ ) 傳入的不是指針,而是一個結構體呢?

事情會是這樣的:

  1. 編譯器識別到這種情況,編譯期間就會多生成一段代碼,用 runtime.convT2E  函數把結構體賦值轉化成 eface (注意,這裏會涉及到結構體數據的拷貝);

  2. 然後再調用 Value.Store 方法,所以就 Store 方法而言,行爲還是不變;

再思考一個問題:既然是指針的操作,爲什麼還要有個 for 循環,還要有個  CompareAndSwapPointer  ?

這是因爲 ifaceWords 是兩個字段的結構體,初始賦值的時候,要賦值類型和數據指針兩部分。

atomic.Value 是服務所有類型,此類需求的,通用封裝。

 3   Value.Load 方法

有寫就有讀嘛,看一下讀的簡要的實現:

func (v *Value) Load() (x interface{}) {
    vp := (*ifaceWords)(unsafe.Pointer(v))
    typ := LoadPointer(&vp.typ)
    // 初始賦值還未完成
    if typ == nil || uintptr(typ) == ^uintptr(0) {
        return nil
    }
    // 劃重點啦:只要過了初始化賦值階段,原子讀的時候基本上就直接跑到這行代碼啦;
    data := LoadPointer(&vp.data)
    xp := (*ifaceWords)(unsafe.Pointer(&x))
    // 賦值類型,和數據結構體的地址
    xp.typ = typ
    xp.data = data
    return
}

哇,太簡單了。處理做了一下初始賦值的判斷(返回 nil ),後續基本就只靠 LoadPointer 函數來個原子讀指針值而已。

總結

  1. interface {}efaceifaceWords  本質是一個東西,同一種內存的三種類型解釋,用在不同層面和場景。它們可以通過強制類型轉化進行切換;

  2. atomic.Value 使用 cas 操作只在初始賦值的時候,一旦賦值過,後續賦值的原子操作更簡單,依賴於 StorePointer ,指針值得原子賦值;

  3. atomic.ValueStoreLoad 方法都不涉及到數據拷貝,只涉及到指針操作;

  4. atomic.Value 的神奇的核心在於:每次 Store 的時候用的是全新的內存塊 !!!LoadStore 都是以完整結構體的地址進行操作,所以纔有原子操作的效果。

  5. atomic.Value 實現多字段原子賦值的原理千萬不要以爲是併發操作同一塊多字段內存,還能保證原子性

後記

說實話,原理讓我大跌眼鏡,當然也讓我們避免踩坑,就怕你以爲 atomic.Value 是萬能的, Store  進去了一個會併發操作的內存塊,那就尷了個尬了。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/fU7AihsT8KXSkZLUx105yg