sync-atomic 設計與實現

概述

atomic 提供了原子同步操作原語,整個過程無需加鎖,也不會產生 goroutine 上下文切換。

API

atomic API

Swap 操作由 SwapT 系列函數 (例如 SwapInt32, SwapInt64) 實現,等價於如下的原子操作:

old = *addr
*addr = new
return old

CAS 操作由 CompareAndSwapT 系列函數 (例如 CompareAndSwapInt32, CompareAndSwapInt64) 實現,等價於如下的原子操作:

if *addr == old {
    *addr = new
    return true
}
return false

Add 操作由 AddT 系列函數 (例如 AddInt32, AddInt64) 實現,等價於如下的原子操作:

*addr += delta
return *addr

Load And Store 操作由 LoadT and StoreT 系列函數 (例如 LoadInt32, LoadInt64) 實現,等價於如下的原子操作:

*addr = val

內部實現

我們來探究一下 sync/atomic 包的內部實現,文件目錄路徑爲 $GOROOT/src/sync/atomic,筆者的 Go 版本爲 go1.19 linux/amd64。需要注意的是,該文件內只給出了函數的定義,函數的實現爲在對應的 asm.s 彙編文件中。

函數聲明

Swap, CAS 等操作的函數原型全部定義在 doc.go 文件中。

// 在 386 上, 64 位函數使用的指令在 Pentium MMX 之前不可用
// 在非 Linux ARM 上,64 位函數使用的指令在 ARMv6k core 之前不可使用
// 在 ARM 上,386 和 32 位 MIPS,調用方負責以原子的方式訪問 64 位對齊的 64 位字
// 全局變量、局部變量、已經分配的結構體、數組、切片中的第一個字可以依賴於 64 位對齊

func SwapInt32(addr *int32, new int32) (old int32)

func SwapInt64(addr *int64, new int64) (old int64)

...


func StoreUintptr(addr *uintptr, val uintptr)

func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

函數實現

Swap, CAS 等操作的函數實現全部在 asm.s 彙編文件中,但是該文件中的函數並不是直接實現,而是 “套了一層殼”, 最終的實現在平臺體系結構對應的彙編文件中,目錄爲$GOROOT/src/runtime/internal/atomic。例如: 筆者的 Go 版本爲 go1.19 linux/amd64, 那麼就會跳轉到 runtime/internal/atomic/atomic_amd64.s 文件對應的函數。

不同平臺體系結構的實現

這裏以 ·SwapInt32 彙編函數爲例,來看下函數的跳轉和實現。

# asm.s 文件

TEXT ·SwapInt32(SB),NOSPLIT,$0
 JMP runtime∕internal∕atomic·Xchg(SB)

函數的實現中使用了 JMP 跳轉指令,最終跳轉到了文件 $GOROOT/src/runtime/internal/atomic/atomic_amd64.s 中的 ·Xchg 函數。

// atomic_amd64.s 文件

TEXT ·Xchg(SB), NOSPLIT, $0-20
 MOVQ ptr+0(FP), BX   // 參數 1,8 字節 *int32 指針
 MOVL new+8(FP), AX   // 參數 2,4 字節 int32
 XCHGL AX, 0(BX)       // 交換指令
 MOVL AX, ret+16(FP)  // 交換後的 AX(old value) 寫入 FP 僞寄存器返回值位
 RET

// 上面的彙編代碼等價於如下 Go 代碼
// uint32 Xchg(ptr *uint32, new uint32)
// Atomically:
// old := *ptr
// *ptr = new
// return old

atomic.Value

接下來看一下原子數據類型 atomic.Value 的內部實現。

Value 對象

Value 數據類型提供了一致性原子性的 Swap, CAS 等方法,其中

type Value struct {
    v any
}

ifaceWords 對象

ifaceWords 對象是 Value 對象對應的 數據類型 + 值 的內部抽象表示,雖然 Value 對象相關方法的參數類型是 any,但是內部操作的都是 ifaceWords 對象 (通過類型轉換機制)。

type ifaceWords struct {
 typ  unsafe.Pointer // 類型
 data unsafe.Pointer // 值
}

Value.Load 方法

Load 返回最新設置的值,如果未對該值調用過 Store 方法 (Value 處於零值狀態),返回 nil。

func (v *Value) Load() (val any) {
 // 類型轉化
 vp := (*ifaceWords)(unsafe.Pointer(v))
 // 原子獲取值類型
 typ := LoadPointer(&vp.typ)
    // 從未調用過 Store 方法
 if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {
  return nil
 }
 
 // 原子獲取值
 data := LoadPointer(&vp.data)
 // 返回值類型轉換
 vlp := (*ifaceWords)(unsafe.Pointer(&val))
 // 返回值類型
 vlp.typ = typ
 // 返回值數據值
 vlp.data = data
 return
}

Value.Store 方法

Store 方法設置 Value 對象的值,對於某個特定的 Value 對象,每次調用 Store 方法時都必須使用相同的數據類型值 (例如第一次存儲的是 int 類型, 那麼之後調用時必須傳遞 int 類型), 類型不一致時會產生 panic, 傳遞 nil 參數 ( Store(nil) ) 同樣會產生 panic

func (v *Value) Store(val any) {
 if val == nil {
  // 參數爲 nil, 直接 panic
  panic("sync/atomic: store of nil value into Value")
 }
 
 // 參數類型轉換
 vp := (*ifaceWords)(unsafe.Pointer(v))
 // 返回值類型轉換
 vlp := (*ifaceWords)(unsafe.Pointer(&val))
 
 for {
  // 原子獲取參數值數據類型
  typ := LoadPointer(&vp.typ)
  
  if typ == nil {
   // 如果類型爲 nil, 嘗試開始第一次 Store
   // 禁止搶佔(避免操作未完成時,被其他 goroutine 搶佔),其他 goroutine 可以使用自旋鎖來等待完成
   // 同時可以避免 GC 的時候看到 unsafe.Pointer(^uintptr(0)) 這個中間狀態的值
   
   runtime_procPin() // 禁止搶佔 (具體的實現這裏先忽略)
   
   if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
    // 如果設置值的類型時失敗
    // 說明當前 goroutine 在禁止搶佔執行結束前,已經有其他 goroutine 設置完了值類型
    runtime_procUnpin() // 恢復搶佔
    continue
   }
   
   // 完成第一次 Store 操作
   StorePointer(&vp.data, vlp.data)
   StorePointer(&vp.typ, vlp.typ)
   
   runtime_procUnpin() // 恢復搶佔 (具體的實現這裏先忽略)
   return
  }
  
  if typ == unsafe.Pointer(&firstStoreInProgress) {
   // 第一次 Store 操作正在執行中,等待...
   // [待優化項] 因爲在第一次 Store 禁用了搶佔,所以可以使用自旋鎖等待完成
   continue
  }
  
  // 已經完成了第一次 Store 操作, 檢查類型並覆蓋數據(檢查是否和第一次設置的數據類型一致)
  if typ != vlp.typ {
   // 當前數據類型和第一次設置的數據類型不一致,產生 panic
   panic("sync/atomic: store of inconsistently typed value into Value")
  }
  
  StorePointer(&vp.data, vlp.data)
  return
 }
}

Value.Swap 方法

Swap 方法將新值存儲到 Value 對象,並返回 Value 對象的舊值,如果 Value 爲空, 則返回 nil。對於某個特定的 Value 對象,每次調用 Swap 方法時都必須使用相同的數據類型值 (例如第一次存儲的是 int 類型, 那麼之後調用時必須傳遞 int 類型), 類型不一致時會產生 panic, 傳遞 nil 參數 ( Swap(nil) ) 同樣會產生 panic

func (v *Value) Swap(new any) (old any) {
 if new == nil {
  // 參數爲 nil, 直接 panic
  panic("sync/atomic: swap of nil value into Value")
 }
 
 // 當前值類型轉換
 vp := (*ifaceWords)(unsafe.Pointer(v))
 // 參數類型轉換
 np := (*ifaceWords)(unsafe.Pointer(&new))
 
 for {
  // 原子獲取當前值數據類型
  typ := LoadPointer(&vp.typ)
  
  if typ == nil {
     // Value 對象還沒有被設置
     // 嘗試開始第一次 Store 操作
   
     // 流程和 Store 方法內部類型,這裏直接省略掉
     ...

   return nil
  }
  
  ...
  
  // 已經完成了第一次 Store 操作, 檢查類型並覆蓋數據(檢查是否和第一次設置的數據類型一致)
  if typ != np.typ {
   // 當前數據類型和第一次設置的數據類型不一致,產生 panic
   panic("sync/atomic: swap of inconsistently typed value into Value")
  }
  
  // 返回值類型轉換
  op := (*ifaceWords)(unsafe.Pointer(&old))
  op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data)
  return old
 }
}

Value.CompareAndSwap 方法

CompareAndSwap 執行 CAS 操作,對於某個特定的 Value 對象,每次調用 CompareAndSwap 方法時都必須使用相同的數據類型值 (例如第一次存儲的是 int 類型, 那麼之後調用時必須傳遞 int 類型), 類型不一致時會產生 panic, 傳遞 nil 參數 ( CompareAndSwap(old, nil) ) 同樣會產生 panic

func (v *Value) CompareAndSwap(old, new any) (swapped bool) {
 if new == nil {
  // 參數爲 nil, 直接 panic
  panic("sync/atomic: compare and swap of nil value into Value")
 }
 
 // 當前值類型轉換
 vp := (*ifaceWords)(unsafe.Pointer(v))
 // 參數新值類型轉換
 np := (*ifaceWords)(unsafe.Pointer(&new))
 // 參數舊值類型轉換
 op := (*ifaceWords)(unsafe.Pointer(&old))
 
 if op.typ != nil && np.typ != op.typ {
  // 新值和舊值數據類型不一致
  panic("sync/atomic: compare and swap of inconsistently typed values")
 }
 
 for {
  // 原子獲取當前值數據類型
  typ := LoadPointer(&vp.typ)
  if typ == nil {
   if old != nil {
    // 當前值和參數值類型不一樣
    return false
   }

    // 流程和 Store 方法內部類型,這裏直接省略掉
   ...
   
   return true
  }
  
  ...
  
  if typ != np.typ {
   // 參數新值數據類型和當前值的數據類型不一致,產生 panic
   panic("sync/atomic: compare and swap of inconsistently typed value into Value")
  }
  
  // CompareAndSwapPointer 函數只能確保 vp.data 從獲取到之後沒有發生變化
  data := LoadPointer(&vp.data)
  
  // 拷貝當前值的變量,然後和參數舊值進行比較 
  var i any
  // 當前值類型
  (*ifaceWords)(unsafe.Pointer(&i)).typ = typ
  // 當前值數據
  (*ifaceWords)(unsafe.Pointer(&i)).data = data
  if i != old {
   // 當前值已經發生變化
   return false
  }
  
  // 調用 CAS 操作 (內部再調用對應的彙編)
  return CompareAndSwapPointer(&vp.data, data, np.data)
 }
}

小結

atomic 實現了同步算法的底層內存原子操作原語,其內部實現主要是彙編 (各個平臺對應各自不同的指令,通過編譯器鏈接),使用這些函數時需要更加謹慎, 官方給出的建議是除了特殊的、底層的應用程序外,其他情況最好使用 channel 或其他同步原語來完成 (但是從大多數開源組件實現代碼來看,並沒有遵守官方的建議)。

標準庫還提供了 atomic.Value 原子數據類型,並且爲該類型實現了常見的原子操作,如 Load, Store, CAS 等, 在實現類似 對象需要原子操作 這樣的功能時可以直接複用該類型,例如標準庫 context 包的內部實現中裏面就用到了 atomic.Value

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/lk8FgA7nFzkmN8F2fRQyvQ