Golang: 詳解併發編程基礎之原子操作 -atomic 包-

前言

最近想寫一個併發編程系列的文章,使用Go也有一段時間了,但是對併發的理解不是很透徹,藉着這次總結,希望能更進一步。我們以 "原子操作" 開篇,對於併發操作而言,原子操作是個非常現實的問題,比較典型的應用的就是i++操作,併發情況下,同時對內存中的i進行讀取,就會產生與預期不符的結果,所以Go語言中的sync/atomic就是解決這個問題的,接下來我們一起來看一看Go的原子操作。

什麼是原子性、原子操作

原子 (atomic) 本意是 "不能被進一步分割的最小粒子",而原子操作 (atomic operation) 意爲 "不可中斷的一個或一系列操作"。其實用大白話說出來就是讓多個線程對同一塊內存的操作是串行的,不會因爲併發操作把內存寫的不符合預期。我們來看這樣一個例子:假設現在是一個銀行賬戶系統,用戶 A 想要自己從自己的賬戶中轉 1 萬元到用戶 B 的賬戶上,直到轉帳成功完成一個事務,主要做這兩件事:

假設在操作一的時候,系統發生了故障,導致給 B 賬戶添加款項失敗了,那麼就要進行回滾。回滾就是回到事務之前的狀態,我們把這種要麼一起成功的操作叫做原子操作,而原子性就是要麼完整的被執行、要麼完全不執行。

如何保證原子性

在處理器層面,可以採用總線加鎖或者對緩存加鎖的方式來實現多處理器之間的原子操作。通過加鎖保證從系統內存中讀取或寫入一個字節是原子的,也就是當一個處理器讀取一個字節時,其他處理器不能訪問這個字節的內存地址。

總線鎖:處理器提供一個Lock#信號,當一個處理器上在總線上輸出此信號時,其他處理器的請求將被阻塞住,那麼該處理器可以獨佔共享內存。總線鎖會把CPU和內存之間的通信鎖住了,在鎖定期間,其他處理就不能操作其他內存地址的數據,所以總線鎖定的開銷比較大,所以處理會在某些場合使用緩存鎖進行優化。緩存鎖:內存區域如果被緩存在處理器上的緩存行中,並且在Lock#操作期間,那麼當它執行操作回寫到內存時,處理不在總線上聲言Lock#信號,而是修改內部的內存地址,並允許它的緩存一致機制來保證操作的原子性,因爲緩存一致性機制會阻止同時修改由兩個以上處理器緩存的內存區域的數據,其他處理器回寫已被鎖定的緩存行的數據時,就會使緩存無效。

鎖機制雖然可以保證原子性,但是鎖機制會存在以下問題:

上面我們說的都是悲觀鎖,要解決這種低效的問題,我們可以採用樂觀鎖,每次不加鎖,而是假設沒有衝突去完成某項操作,如果因爲衝突失敗就重試,直到成功爲止。也就是我們接下來要說的 CAS(compare and swap).

CAS 的全稱爲Compare And Swap,直譯就是比較交換。是一條 CPU 的原子指令,其作用是讓CPU先進行比較兩個值是否相等,然後原子地更新某個位置的值,其實現方式是給予硬件平臺的彙編指令,在intelCPU中,使用的cmpxchg指令,就是說CAS是靠硬件實現的,從而在硬件層面提升效率。簡述過程是這樣:

假設包含 3 個參數內存位置 (V)、預期原值(A) 和新值(B)。V表示要更新變量的值,E表示預期值,N表示新值。僅當V值等於E值時,纔會將V的值設爲N,如果V值和E值不同,則說明已經有其他線程在做更新,則當前線程什麼都不做,最後CAS返回當前V的真實值。CAS 操作時抱着樂觀的態度進行的,它總是認爲自己可以成功完成操作。基於這樣的原理,CAS 操作即使沒有鎖,也可以發現其他線程對於當前線程的干擾。

僞代碼可以這樣寫:

func CompareAndSwap(int *addr,int oldValue,int newValue) bool{
    if *addr == nil{
        return false
    }
    if *addr == oldValue {
        *addr = newValue
        return true
    }
    return false
}

不過上面的代碼可能會發生一個問題,也就是ABA問題,因爲 CAS 需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是 A,變成了 B,又變成了 A,那麼使用 CAS 進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA 問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麼 A-B-A 就會變成 1A-2B-3A。

go 語言中如何進行原子操作

Go語言標準庫中,sync/atomic包將底層硬件提供的原子操作封裝成了Go的函數,主要分爲 5 個系列的函數,分別是:

old = *addr
*addr = new
return old
if *addr == old{
    *addr = new
    return ture
}
return false
*addr += delta
return *addr

Go語言在1.4版本時添加一個新的類型Value,此類型的值就相當於一個容器,可以被用來 "原子地" 存儲 (store) 和加載 (Load) 任意類型的值。這些使用起來都還比較簡單,就不寫例子了,接下來我們一起看一看這些方法是如何實現的。

源碼解析

由於系列比較多。底層實現的方法也大同小異,這裏就主要分析一下Value的實現方法吧。爲什麼不分析其他系列的呢?因爲原子操作由底層硬件支持,所以看其他系列實現都要看彙編,Go 的彙編是基於Plan9的,這個彙編語言真的資料甚少,我也是真的不懂,水平不夠,也不自討苦吃了,等後面真的能看懂這些彙編了,再來分析吧。這個網站有一些關於plan9彙編的知識,有興趣可以看一看:http://doc.cat-v.org/plan_9/4th_edition/papers/asm。

Value結構

我們先來看一下Value的結構:

type Value struct {
 v interface{}
}

Value結構裏就只有一個字段,是 interface 類型,雖然這裏是interface類型,但是這裏要注意,第一次Store寫入的類型就確定了之後寫入的類型,否則會發生panic。因爲這裏是interface類型,所以爲了之後寫入與讀取操作方便,又在這個包裏定義了一個ifaceWords結構,其實他就是一個空interface,他的作用就是將interface分解成類型和數值。結構如下:

// ifaceWords is interface{} internal representation.
type ifaceWords struct {
 typ  unsafe.Pointer
 data unsafe.Pointer
}

Value的寫入操作

我們一起來看一看他是如何實現寫入操作的:

// Store sets the value of the Value to x.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(x interface{}) {
 if x == nil {
  panic("sync/atomic: store of nil value into Value")
 }
 vp := (*ifaceWords)(unsafe.Pointer(v))
 xp := (*ifaceWords)(unsafe.Pointer(&x))
 for {
  typ := LoadPointer(&vp.typ)
  if typ == nil {
   // Attempt to start first store.
   // Disable preemption so that other goroutines can use
   // active spin wait to wait for completion; and so that
   // GC does not see the fake type accidentally.
   runtime_procPin()
   if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
    runtime_procUnpin()
    continue
   }
   // Complete first store.
   StorePointer(&vp.data, xp.data)
   StorePointer(&vp.typ, xp.typ)
   runtime_procUnpin()
   return
  }
  if uintptr(typ) == ^uintptr(0) {
   // First store in progress. Wait.
   // Since we disable preemption around the first store,
   // we can wait with active spinning.
   continue
  }
  // First store completed. Check type and overwrite data.
  if typ != xp.typ {
   panic("sync/atomic: store of inconsistently typed value into Value")
  }
  StorePointer(&vp.data, xp.data)
  return
 }
}

// Disable/enable preemption, implemented in runtime.
func runtime_procPin()
func runtime_procUnpin()

這段代碼中的註釋集已經告訴了我們,調用Store方法寫入的類型必須與原類型相同,不一致便會發生 panic。接下來分析代碼實現:

  1. 首先判斷條件寫入參數不能爲nil,否則觸發panic

  2. 通過使用unsafe.PointeroldValuenewValue轉換成ifaceWords類型。方便我們獲取他的原始類型 (typ) 和值(data).

  3. 爲了保證原子性,所以這裏使用一個for換來處理,當已經有Store正在進行寫入時,會進行等待.

  4. 如果還沒寫入過數據,那麼獲取不到原始類型,就會開始第一次寫入操作,這裏會把先調用runtime_procPin()方法禁止調度器對當前 goroutine 的搶佔(preemption),這樣也可以防止GC線程看到假類型。

  5. 調用CAS方法來判斷當前地址是否有被搶佔,這裏大家可能對unsafe.Pointer(^uintptr(0))這一句話有點不明白,因爲是第一個寫入數據,之前是沒有數據的,所以通過這樣一箇中間值來做判斷,如果失敗就會解除搶佔鎖,解除禁止調度器,繼續循環等待.

  6. 設置中間值成功後,我們接下來就可以安全的把v設爲傳入的新值了,這裏會先寫入值,在寫入類型 (typ),因爲我們會根據 ty 來做完成判斷。

  7. 第一次寫入沒完成,我們還會通過uintptr(typ) == ^uintptr(0)來進行判斷,因爲還是第一次放入的中間類型,他依然會繼續等待第一次完成。

  8. 如果第一次寫入完成,會檢查上一次寫入的類型與這次寫入的類型是否一致,不一致則會拋出panic.

這裏代碼量沒有多少,相信大家一定看懂了吧~。

Value的讀操作

先看一下代碼:

// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (x interface{}) {
 vp := (*ifaceWords)(unsafe.Pointer(v))
 typ := LoadPointer(&vp.typ)
 if typ == nil || uintptr(typ) == ^uintptr(0) {
  // First store not yet completed.
  return nil
 }
 data := LoadPointer(&vp.data)
 xp := (*ifaceWords)(unsafe.Pointer(&x))
 xp.typ = typ
 xp.data = data
 return
}

讀取操作的代碼就很簡單了:1. 第一步使用unsafe.PointeroldValue轉換成ifaceWords類型,然後獲取他的類型,如果沒有類型或者類型出去中間值,那麼說明現在還沒數據或者第一次寫入還沒有完成。2. 通過檢查後,調用LoadPointer方法可以獲取他的值,然後構造一個新interfacetypdata返回。

小彩蛋

前面我們在說 CAS 時,說到了ABA問題,所以我就寫了demo試一試Go標準庫atomic.CompareAndSwapXXX方法是否有解決這個問題,看運行結果是沒有,所以這裏大家使用的時候要注意一下 (雖然我也沒想到什麼現在什麼業務場景會出現這個問題,但是還是要注意一下,需要自己評估)。

func main()  {
 var share uint64 = 1
 wg := sync.WaitGroup{}
 wg.Add(3)
 // 協程1,期望值是1,欲更新的值是2
 go func() {
  defer wg.Done()
  swapped := atomic.CompareAndSwapUint64(&share,1,2)
  fmt.Println("goroutine 1",swapped)
 }()
 // 協程2,期望值是1,欲更新的值是2
 go func() {
  defer wg.Done()
  time.Sleep(5 * time.Millisecond)
  swapped := atomic.CompareAndSwapUint64(&share,1,2)
  fmt.Println("goroutine 2",swapped)
 }()
 // 協程3,期望值是2,欲更新的值是1
 go func() {
  defer wg.Done()
  time.Sleep(1 * time.Millisecond)
  swapped := atomic.CompareAndSwapUint64(&share,2,1)
  fmt.Println("goroutine 3",swapped)
 }()
 wg.Wait()
 fmt.Println("main exit")
}

總結

原子操作是併發編程的一個基礎,也是爲我學習sync.once打基礎,好啦,現在你們應該知道下篇文章的內容是什麼啦,敬請期待~。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/-KE6jNKkVxNrm-e3laibzw