Go 高性能編程技法

作者:dablelv,騰訊 IEGggG 後臺開發工程師

代碼的穩健、可讀和高效是我們每一個 coder 的共同追求。本文將結合 Go 語言特性,爲書寫效率更高的代碼,從常用數據結構、內存管理和併發,三個方面給出相關建議。話不多說,讓我們一起學習 Go 高性能編程的技法吧。

常用數據結構

1. 反射雖好,切莫貪杯

標準庫 reflect 爲 Go 語言提供了運行時動態獲取對象的類型和值以及動態創建對象的能力。反射可以幫助抽象和簡化代碼,提高開發效率。

Go 語言標準庫以及很多開源軟件中都使用了 Go 語言的反射能力,例如用於序列化和反序列化的 json、ORM 框架 gorm、xorm 等。

1.1 優先使用 strconv 而不是 fmt

基本數據類型與字符串之間的轉換,優先使用 strconv 而不是 fmt,因爲前者性能更佳。

// Bad
for i := 0; i < b.N; i++ {
 s := fmt.Sprint(rand.Int())
}

BenchmarkFmtSprint-4    143 ns/op    2 allocs/op

// Good
for i := 0; i < b.N; i++ {
 s := strconv.Itoa(rand.Int())
}

BenchmarkStrconv-4    64.2 ns/op    1 allocs/op

爲什麼性能上會有兩倍多的差距,因爲 fmt 實現上利用反射來達到範型的效果,在運行時進行類型的動態判斷,所以帶來了一定的性能損耗。

1.2 少量的重複不比反射差

有時,我們需要一些工具函數。比如從 uint64 切片過濾掉指定的元素。

利用反射,我們可以實現一個類型泛化支持擴展的切片過濾函數。

// DeleteSliceElms 從切片中過濾指定元素。注意:不修改原切片。
func DeleteSliceElms(i interface{}, elms ...interface{}) interface{} {
 // 構建 map set。
 m := make(map[interface{}]struct{}, len(elms))
 for _, v := range elms {
  m[v] = struct{}{}
 }
 // 創建新切片,過濾掉指定元素。
 v := reflect.ValueOf(i)
 t := reflect.MakeSlice(reflect.TypeOf(i), 0, v.Len())
 for i := 0; i < v.Len(); i++ {
  if _, ok := m[v.Index(i).Interface()]; !ok {
   t = reflect.Append(t, v.Index(i))
  }
 }
 return t.Interface()
}

很多時候,我們可能只需要操作一個類型的切片,利用反射實現的類型泛化擴展的能力壓根沒用上。退一步說,如果我們真地需要對 uint64 以外類型的切片進行過濾,拷貝一次代碼又何妨呢?可以肯定的是,絕大部份場景,根本不會對所有類型的切片進行過濾,那麼反射帶來好處我們並沒有充分享受,但卻要爲其帶來的性能成本買單。

// DeleteU64liceElms 從 []uint64 過濾指定元素。注意:不修改原切片。
func DeleteU64liceElms([]uint64, elms ...uint64) []uint64 {
 // 構建 map set。
 m := make(map[uint64]struct{}, len(elms))
 for _, v := range elms {
  m[v] = struct{}{}
 }
 // 創建新切片,過濾掉指定元素。
 t := make([]uint64, 0, len(i))
 for _, v := range i {
  if _, ok := m[v]; !ok {
   t = append(t, v)
  }
 }
 return t
}

下面看一下二者的性能對比。

func BenchmarkDeleteSliceElms(b *testing.B) {
 slice := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9}
 elms := []interface{}{uint64(1), uint64(3), uint64(5), uint64(7), uint64(9)}
 for i := 0; i < b.N; i++ {
  _ = DeleteSliceElms(slice, elms...)
 }
}

func BenchmarkDeleteU64liceElms(b *testing.B) {
 slice := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9}
 elms := []uint64{1, 3, 5, 7, 9}
 for i := 0; i < b.N; i++ {
  _ = DeleteU64liceElms(slice, elms...)
 }
}

運行上面的基準測試。

go test -bench=. -benchmem main/reflect 
goos: darwin
goarch: amd64
pkg: main/reflect
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkDeleteSliceElms-12              1226868               978.2 ns/op           296 B/op         16 allocs/op
BenchmarkDeleteU64liceElms-12            8249469               145.3 ns/op            80 B/op          1 allocs/op
PASS
ok      main/reflect    3.809s

可以看到,反射涉及了額外的類型判斷和大量的內存分配,導致其對性能的影響非常明顯。隨着切片元素的遞增,每一次判斷元素是否在 map 中,因爲 map 的 key 是不確定的類型,會發生變量逃逸,觸發堆內存的分配。所以,可預見的是當元素數量增加時,性能差異會越來大。

當使用反射時,請問一下自己,我真地需要它嗎?

1.3 慎用 binary.Read 和 binary.Write

binary.Read 和 binary.Write 使用反射並且很慢。如果有需要用到這兩個函數的地方,我們應該手動實現這兩個函數的相關功能,而不是直接去使用它們。

encoding/binary 包實現了數字和字節序列之間的簡單轉換以及 varints 的編碼和解碼。varints 是一種使用可變字節表示整數的方法。其中數值本身越小,其所佔用的字節數越少。Protocol Buffers 對整數採用的便是這種編碼方式。

其中數字與字節序列的轉換可以用如下三個函數:

// Read 從結構化二進制數據 r 讀取到 data。data 必須是指向固定大小值的指針或固定大小值的切片。
func Read(r io.Reader, order ByteOrder, data interface{}) error
// Write 將 data 的二進制表示形式寫入 w。data 必須是固定大小的值或固定大小值的切片,或指向此類數據的指針。
func Write(w io.Writer, order ByteOrder, data interface{}) error
// Size 返回 Wirte 函數將 v 寫入到 w 中的字節數。
func Size(v interface{}) int

下面以我們熟知的 C 標準庫函數 ntohl() 函數爲例,看看 Go 利用 binary 包如何實現。

// Ntohl 將網絡字節序的 uint32 轉爲主機字節序。
func Ntohl(bys []byte) uint32 {
 r := bytes.NewReader(bys)
 err = binary.Read(buf, binary.BigEndian, &num)
}

// 如將 IP 127.0.0.1 網絡字節序解析到 uint32
fmt.Println(Ntohl([]byte{0x7f, 0, 0, 0x1})) // 2130706433 <nil>

如果我們針對 uint32 類型手動實現一個 ntohl() 呢?

func NtohlNotUseBinary(bys []byte) uint32 {
 return uint32(bys[3]) | uint32(bys[2])<<8 | uint32(bys[1])<<16 | uint32(bys[0])<<24
}

// 如將 IP 127.0.0.1 網絡字節序解析到 uint32
fmt.Println(NtohlNotUseBinary([]byte{0x7f, 0, 0, 0x1})) // 2130706433

該函數也是參考了 encoding/binary 包針對大端字節序將字節序列轉爲 uint32 類型時的實現。

下面看下剝去反射前後二者的性能差異。

func BenchmarkNtohl(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _, _ = Ntohl([]byte{0x7f, 0, 0, 0x1})
 }
}

func BenchmarkNtohlNotUseBinary(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = NtohlNotUseBinary([]byte{0x7f, 0, 0, 0x1})
 }
}

運行上面的基準測試,結果如下:

go test -bench=BenchmarkNtohl.* -benchmem main/reflect
goos: darwin
goarch: amd64
pkg: main/reflect
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkNtohl-12                       13026195                81.96 ns/op           60 B/op          4 allocs/op
BenchmarkNtohlNotUseBinary-12           1000000000               0.2511 ns/op          0 B/op          0 allocs/op
PASS
ok      main/reflect    1.841s

可見使用反射實現的 encoding/binary 包的性能相較於針對具體類型實現的版本,性能差異非常大。

2. 避免重複的字符串到字節切片的轉換

不要反覆從固定字符串創建字節 slice,因爲重複的切片初始化會帶來性能損耗。相反,請執行一次轉換並捕獲結果。

// Bad
for i := 0; i < b.N; i++ {
 w.Write([]byte("Hello world"))
}

BenchmarkBad-4   50000000   22.2 ns/op

// Good
data := []byte("Hello world")
for i := 0; i < b.N; i++ {
 w.Write(data)
}

BenchmarkGood-4  500000000   3.25 ns/op

3. 指定容器容量

儘可能指定容器容量,以便爲容器預先分配內存。這將在後續添加元素時減少通過複製來調整容器大小。

3.1 指定 map 容量提示

在儘可能的情況下,在使用 make() 初始化的時候提供容量信息。

make(map[T1]T2, hint)

向 make() 提供容量提示會在初始化時嘗試調整 map 的大小,這將減少在將元素添加到 map 時爲 map 重新分配內存。

注意,與 slice 不同。map capacity 提示並不保證完全的搶佔式分配,而是用於估計所需的 hashmap bucket 的數量。因此,在將元素添加到 map 時,甚至在指定 map 容量時,仍可能發生分配。

// Bad
m := make(map[string]os.FileInfo)

files, _ := ioutil.ReadDir("./files")
for _, f := range files {
    m[f.Name()] = f
}
// m 是在沒有大小提示的情況下創建的; 在運行時可能會有更多分配。

// Good
files, _ := ioutil.ReadDir("./files")

m := make(map[string]os.FileInfo, len(files))
for _, f := range files {
    m[f.Name()] = f
}
// m 是有大小提示創建的;在運行時可能會有更少的分配。
3.2 指定切片容量

在儘可能的情況下,在使用 make() 初始化切片時提供容量信息,特別是在追加切片時。

make([]T, length, capacity)

與 map 不同,slice capacity 不是一個提示:編譯器將爲提供給 make() 的 slice 的容量分配足夠的內存,這意味着後續的 append() 操作將導致零分配(直到 slice 的長度與容量匹配,在此之後,任何 append 都可能調整大小以容納其他元素)。

const size = 1000000

// Bad
for n := 0; n < b.N; n++ {
 data := make([]int, 0)
   for k := 0; k < size; k++ {
     data = append(data, k)
  }
}

BenchmarkBad-4    219    5202179 ns/op

// Good
for n := 0; n < b.N; n++ {
 data := make([]int, 0, size)
   for k := 0; k < size; k++ {
     data = append(data, k)
  }
}

BenchmarkGood-4   706    1528934 ns/op

執行基準測試:

go test -bench=^BenchmarkJoinStr -benchmem 
BenchmarkJoinStrWithOperator-8    66930670    17.81 ns/op    0 B/op    0 allocs/op
BenchmarkJoinStrWithSprintf-8      7032921    166.0 ns/op    64 B/op   4 allocs/op

4. 字符串拼接方式的選擇

4.1 行內拼接字符串推薦使用運算符 +

行內拼接字符串爲了書寫方便快捷,最常用的兩個方法是:

行內字符串的拼接,主要追求的是代碼的簡潔可讀。fmt.Sprintf() 能夠接收不同類型的入參,通過格式化輸出完成字符串的拼接,使用非常方便。但因其底層實現使用了反射,性能上會有所損耗。

運算符 + 只能簡單地完成字符串之間的拼接,非字符串類型的變量需要單獨做類型轉換。行內拼接字符串不會產生內存分配,也不涉及類型地動態轉換,所以性能上優於fmt.Sprintf()

從性能出發,兼顧易用可讀,如果待拼接的變量不涉及類型轉換且數量較少(<=5),行內拼接字符串推薦使用運算符 +,反之使用 fmt.Sprintf()

下面看下二者的性能對比。

// Good
func BenchmarkJoinStrWithOperator(b *testing.B) {
 s1, s2, s3 := "foo""bar""baz"
 for i := 0; i < b.N; i++ {
  _ = s1 + s2 + s3
 }
}

// Bad
func BenchmarkJoinStrWithSprintf(b *testing.B) {
 s1, s2, s3 := "foo""bar""baz"
 for i := 0; i < b.N; i++ {
  _ = fmt.Sprintf("%s%s%s", s1, s2, s3)
 }
}

執行基準測試結果如下:

go test -bench=^BenchmarkJoinStr -benchmem .
BenchmarkJoinStrWithOperator-8    70638928    17.53 ns/op     0 B/op    0 allocs/op
BenchmarkJoinStrWithSprintf-8      7520017    157.2 ns/op    64 B/op    4 allocs/op
4.2 非行內拼接字符串推薦使用 strings.Builder

字符串拼接還有其他的方式,比如strings.Join()strings.Builderbytes.Bufferbyte[],這幾種不適合行內使用。當待拼接字符串數量較多時可考慮使用。

先看下其性能測試的對比。

func BenchmarkJoinStrWithStringsJoin(b *testing.B) {
 s1, s2, s3 := "foo""bar""baz"
 for i := 0; i < b.N; i++ {
  _ = strings.Join([]string{s1, s2, s3}"")
 }
}

func BenchmarkJoinStrWithStringsBuilder(b *testing.B) {
 s1, s2, s3 := "foo""bar""baz"
 for i := 0; i < b.N; i++ {
  var builder strings.Builder
  _, _ = builder.WriteString(s1)
  _, _ = builder.WriteString(s2)
  _, _ = builder.WriteString(s3)
 }
}

func BenchmarkJoinStrWithBytesBuffer(b *testing.B) {
 s1, s2, s3 := "foo""bar""baz"
 for i := 0; i < b.N; i++ {
  var buffer bytes.Buffer
  _, _ = buffer.WriteString(s1)
  _, _ = buffer.WriteString(s2)
  _, _ = buffer.WriteString(s3)
 }
}

func BenchmarkJoinStrWithByteSlice(b *testing.B) {
 s1, s2, s3 := "foo""bar""baz"
 for i := 0; i < b.N; i++ {
  var bys []byte
  bys= append(bys, s1...)
  bys= append(bys, s2...)
  _ = append(bys, s3...)
 }
}

func BenchmarkJoinStrWithByteSlicePreAlloc(b *testing.B) {
 s1, s2, s3 := "foo""bar""baz"
 for i := 0; i < b.N; i++ {
  bys:= make([]byte, 0, 9)
  bys= append(bys, s1...)
  bys= append(bys, s2...)
  _ = append(bys, s3...)
 }
}

基準測試結果如下:

go test -bench=^BenchmarkJoinStr .
goos: windows
goarch: amd64
pkg: main/perf
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkJoinStrWithStringsJoin-8               31543916                36.39 ns/op
BenchmarkJoinStrWithStringsBuilder-8            30079785                40.60 ns/op
BenchmarkJoinStrWithBytesBuffer-8               31663521                39.58 ns/op
BenchmarkJoinStrWithByteSlice-8                 30748495                37.34 ns/op
BenchmarkJoinStrWithByteSlicePreAlloc-8         665341896               1.813 ns/op

從結果可以看出,strings.Join()strings.Builderbytes.Bufferbyte[] 的性能相近。如果結果字符串的長度是可預知的,使用 byte[] 且預先分配容量的拼接方式性能最佳。

所以如果對性能要求非常嚴格,或待拼接的字符串數量足夠多時,建議使用  byte[] 預先分配容量這種方式。

綜合易用性和性能,一般推薦使用strings.Builder來拼接字符串。

string.Builder也提供了預分配內存的方式 Grow:

func BenchmarkJoinStrWithStringsBuilderPreAlloc(b *testing.B) {
 s1, s2, s3 := "foo""bar""baz"
 for i := 0; i < b.N; i++ {
  var builder strings.Builder
  builder.Grow(9)
  _, _ = builder.WriteString(s1)
  _, _ = builder.WriteString(s2)
  _, _ = builder.WriteString(s3)
 }
}

使用了 Grow 優化後的版本的性能測試結果如下。可以看出相較於不預先分配空間的方式,性能提升了很多。

BenchmarkJoinStrWithStringsBuilderPreAlloc-8    60079003                20.95 ns/op

5. 遍歷 []struct{} 使用下標而不是 range

Go 中遍歷切片或數組有兩種方式,一種是通過下標,一種是 range。二者在功能上沒有區別,但是在性能上會有區別嗎?

5.1 []int

首先看一下遍歷基本類型切片時二者的性能差別,以 []int 爲例。

// genRandomIntSlice 生成指定長度的隨機 []int 切片
func genRandomIntSlice(n int) []int {
 rand.Seed(time.Now().UnixNano())
 nums := make([]int, 0, n)
 for i := 0; i < n; i++ {
  nums = append(nums, rand.Int())
 }
 return nums
}

func BenchmarkIndexIntSlice(b *testing.B) {
 nums := genRandomIntSlice(1024)
 for i := 0; i < b.N; i++ {
  var tmp int
  for k := 0; k < len(nums); k++ {
   tmp = nums[k]
  }
  _ = tmp
 }
}

func BenchmarkRangeIntSlice(b *testing.B) {
 nums := genRandomIntSlice(1024)
 for i := 0; i < b.N; i++ {
  var tmp int
  for _, num := range nums {
   tmp = num
  }
  _ = tmp
 }
}

運行測試結果如下:

go test -bench=IntSlice$ .
goos: windows
goarch: amd64
pkg: main/perf
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkIndexIntSlice-8         5043324               236.2 ns/op
BenchmarkRangeIntSlice-8         5076255               239.1 ns/op

genRandomIntSlice() 函數用於生成指定長度元素類型爲 int 的切片。從最終的結果可以看到,遍歷 []int 類型的切片,下標與 range 遍歷性能幾乎沒有區別。

5.2 []struct{}

那麼對於稍微複雜一點的 []struct 類型呢?

type Item struct {
 id  int
 val [1024]byte
}

func BenchmarkIndexStructSlice(b *testing.B) {
 var items [1024]Item
 for i := 0; i < b.N; i++ {
  var tmp int
  for j := 0; j < len(items); j++ {
   tmp = items[j].id
  }
  _ = tmp
 }
}

func BenchmarkRangeIndexStructSlice(b *testing.B) {
 var items [1024]Item
 for i := 0; i < b.N; i++ {
  var tmp int
  for k := range items {
   tmp = items[k].id
  }
  _ = tmp
 }
}

func BenchmarkRangeStructSlice(b *testing.B) {
 var items [1024]Item
 for i := 0; i < b.N; i++ {
  var tmp int
  for _, item := range items {
   tmp = item.id
  }
  _ = tmp
 }
}

運行測試結果如下:

go test -bench=StructSlice$ .
goos: windows
goarch: amd64
pkg: main/perf
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkIndexStructSlice-8              5079468               234.9 ns/op
BenchmarkRangeIndexStructSlice-8         5087448               236.2 ns/op
BenchmarkRangeStructSlice-8                38716               32265 ns/op

可以看出,兩種通過 index 遍歷 []struct 性能沒有差別,但是 range 遍歷 []struct 中元素時,性能非常差。

range 只遍歷 []struct 下標時,性能比 range 遍歷  []struct 值好很多。從這裏我們應該能夠知道二者性能差別之大的原因。

Item 是一個結構體類型 ,Item 由兩個字段構成,一個類型是 int,一個是類型是 [1024]byte,如果每次遍歷 []Item,都會進行一次值拷貝,所以帶來了性能損耗。

此外,因爲 range 時獲取的是值拷貝的副本,所以對副本的修改,是不會影響到原切片。

5.3 []*struct

那如果切片中是指向結構體的指針,而不是結構體呢?

// genItems 生成指定長度 []*Item 切片
func genItems(n int) []*Item {
 items := make([]*Item, 0, n)
 for i := 0; i < n; i++ {
  items = append(items, &Item{id: i})
 }
 return items
}

func BenchmarkIndexPointer(b *testing.B) {
 items := genItems(1024)
 for i := 0; i < b.N; i++ {
  var tmp int
  for k := 0; k < len(items); k++ {
   tmp = items[k].id
  }
  _ = tmp
 }
}

func BenchmarkRangePointer(b *testing.B) {
 items := genItems(1024)
 for i := 0; i < b.N; i++ {
  var tmp int
  for _, item := range items {
   tmp = item.id
  }
  _ = tmp
 }
}

執行性能測試結果:

go test -bench=Pointer$ main/perf
goos: windows
goarch: amd64
pkg: main/perf
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkIndexPointer-8           773634              1521 ns/op
BenchmarkRangePointer-8           752077              1514 ns/op

切片元素從結構體 Item 替換爲指針 *Item 後,for 和 range 的性能幾乎是一樣的。而且使用指針還有另一個好處,可以直接修改指針對應的結構體的值。

5.4 小結

range 在迭代過程中返回的是元素的拷貝,index 則不存在拷貝。

如果 range 迭代的元素較小,那麼 index 和 range 的性能幾乎一樣,如基本類型的切片 []int。但如果迭代的元素較大,如一個包含很多屬性的 struct 結構體,那麼 index 的性能將顯著地高於 range,有時候甚至會有上千倍的性能差異。對於這種場景,建議使用 index。如果使用 range,建議只迭代下標,通過下標訪問元素,這種使用方式和 index 就沒有區別了。如果想使用 range 同時迭代下標和值,則需要將切片 / 數組的元素改爲指針,才能不影響性能。

內存管理

1. 使用空結構體節省內存

1.1 不佔內存空間

在 Go 中,我們可以使用 unsafe.Sizeof 計算出一個數據類型實例需要佔用的字節數。

package main

import (
 "fmt"
 "unsafe"
)

func main() {
 fmt.Println(unsafe.Sizeof(struct{}{}))
}

運行上面的例子將會輸出:

go run main.go
0

可以看到,Go 中空結構體 struct{} 是不佔用內存空間,不像 C/C++ 中空結構體仍佔用 1 字節。

1.2 用法

因爲空結構體不佔據內存空間,因此被廣泛作爲各種場景下的佔位符使用。一是節省資源,二是空結構體本身就具備很強的語義,即這裏不需要任何值,僅作爲佔位符,達到的代碼即註釋的效果。

1.2.1 實現集合(Set)

Go 語言標準庫沒有提供 Set 的實現,通常使用 map 來代替。事實上,對於集合來說,只需要 map 的鍵,而不需要值。即使是將值設置爲 bool 類型,也會多佔據 1 個字節,那假設 map 中有一百萬條數據,就會浪費 1MB 的空間。

因此呢,將 map 作爲集合(Set)使用時,可以將值類型定義爲空結構體,僅作爲佔位符使用即可。

type Set map[string]struct{}

func (s Set) Has(key string) bool {
 _, ok := s[key]
 return ok
}

func (s Set) Add(key string) {
 s[key] = struct{}{}
}

func (s Set) Delete(key string) {
 delete(s, key)
}

func main() {
 s := make(Set)
 s.Add("foo")
 s.Add("bar")
 fmt.Println(s.Has("foo"))
 fmt.Println(s.Has("bar"))
}

如果想使用 Set 的完整功能,如初始化(通過切片構建一個 Set)、Add、Del、Clear、Contains 等操作,可以使用開源庫 golang-set

1.2.2 不發送數據的信道
func worker(ch chan struct{}) {
 <-ch
 fmt.Println("do something")
}

func main() {
 ch := make(chan struct{})
 go worker(ch)
 ch <- struct{}{}
 close(ch)
}

有時候使用 channel 不需要發送任何的數據,只用來通知子協程(goroutine)執行任務,或只用來控制協程的併發。這種情況下,使用空結構體作爲佔位符就非常合適了。

1.2.3 僅包含方法的結構體
type Door struct{}

func (d Door) Open() {
 fmt.Println("Open the door")
}

func (d Door) Close() {
 fmt.Println("Close the door")
}

在部分場景下,結構體只包含方法,不包含任何的字段。例如上面例子中的 Door,在這種情況下,Door 事實上可以用任何的數據結構替代。

type Door int
type Door bool

無論是 int 還是 bool 都會浪費額外的內存,因此呢,這種情況下,聲明爲空結構體最合適。

2. struct 佈局要考慮內存對齊

2.1 爲什麼需要內存對齊

CPU 訪問內存時,並不是逐個字節訪問,而是以字長(word size)爲單位訪問。比如 32 位的 CPU ,字長爲 4 字節,那麼 CPU 訪問內存的單位也是 4 字節。

這麼設計的目的,是減少 CPU 訪問內存的次數,加大 CPU 訪問內存的吞吐量。比如同樣讀取 8 個字節的數據,一次讀取 4 個字節那麼只需要讀取 2 次。

CPU 始終以字長訪問內存,如果不進行內存對齊,很可能增加 CPU 訪問內存的次數,例如:

變量 a、b 各佔據 3 字節的空間,內存對齊後,a、b 佔據 4 字節空間,CPU 讀取 b 變量的值只需要進行一次內存訪問。如果不進行內存對齊,CPU 讀取 b 變量的值需要進行 2 次內存訪問。第一次訪問得到 b 變量的第 1 個字節,第二次訪問得到 b 變量的後兩個字節。

從這個例子中也可以看到,內存對齊對實現變量的原子性操作也是有好處的,每次內存訪問是原子的,如果變量的大小不超過字長,那麼內存對齊後,對該變量的訪問就是原子的,這個特性在併發場景下至關重要。

簡言之:合理的內存對齊可以提高內存讀寫的性能,並且便於實現變量操作的原子性。

2.2 Go 內存對齊規則

編譯器一般爲了減少 CPU 訪存指令週期,提高內存的訪問效率,會對變量進行內存對齊。Go 作爲一門追求高性能的後臺編程語言,當然也不例外。

Go Language Specification 中 Size and alignment guarantees 描述了內存對齊的規則。

1.For a variable x of any type: unsafe.Alignof(x) is at least 1. 2.For a variable x of struct type: unsafe.Alignof(x) is the largest of all the values unsafe.Alignof(x.f) for each field f of x, but at least 1. 3.For a variable x of array type: unsafe.Alignof(x) is the same as the alignment of a variable of the array's element type.

其中函數 unsafe.Alignof 用於獲取變量的對齊係數。對齊係數決定了字段的偏移和變量的大小,兩者必須是對齊係數的整數倍。

2.3 合理的 struct 佈局

因爲內存對齊的存在,合理的 struct 佈局可以減少內存佔用,提高程序性能。

type demo1 struct {
 a int8
 b int16
 c int32
}

type demo2 struct {
 a int8
 c int32
 b int16
}

func main() {
 fmt.Println(unsafe.Sizeof(demo1{})) // 8
 fmt.Println(unsafe.Sizeof(demo2{})) // 12
}

可以看到,同樣的字段,因字段排列順序不同,最終會導致不一樣的結構體大小。

每個字段按照自身的對齊係數來確定在內存中的偏移量,一個字段因偏移而浪費的大小也不同。

接下來逐個分析,首先是 demo1:a 是第一個字段,默認是已經對齊的,從第 0 個位置開始佔據 1 字節。b 是第二個字段,對齊係數爲 2,因此,必須空出 1 個字節,偏移量纔是 2 的倍數,從第 2 個位置開始佔據 2 字節。c 是第三個字段,對齊倍數爲 4,此時,內存已經是對齊的,從第 4 個位置開始佔據 4 字節即可。

因此 demo1 的內存佔用爲 8 字節。

對於 demo2:a 是第一個字段,默認是已經對齊的,從第 0 個位置開始佔據 1 字節。c 是第二個字段,對齊倍數爲 4,因此,必須空出 3 個字節,偏移量纔是 4 的倍數,從第 4 個位置開始佔據 4 字節。b 是第三個字段,對齊倍數爲 2,從第 8 個位置開始佔據 2 字節。

demo2 的對齊係數由 c 的對齊係數決定,也是 4,因此,demo2 的內存佔用爲 12 字節。

因此,在對內存特別敏感的結構體的設計上,我們可以通過調整字段的順序,將字段寬度從小到大由上到下排列,來減少內存的佔用。

2.4 空結構與空數組對內存對齊的影響

空結構與空數組在 Go 中比較特殊。沒有任何字段的空 struct{} 和沒有任何元素的 array 佔據的內存空間大小爲 0。

因爲這一點,空 struct{} 或空 array 作爲其他 struct 的字段時,一般不需要內存對齊。但是有一種情況除外:即當 struct{} 或空 array 作爲結構體最後一個字段時,需要內存對齊。因爲如果有指針指向該字段,返回的地址將在結構體之外,如果此指針一直存活不釋放對應的內存,就會有內存泄露的問題(該內存不因結構體釋放而釋放)。

type demo3 struct {
 a struct{}
 b int32
}
type demo4 struct {
 b int32
 a struct{}
}

func main() {
 fmt.Println(unsafe.Sizeof(demo3{})) // 4
 fmt.Println(unsafe.Sizeof(demo4{})) // 8
}

可以看到,demo3{} 的大小爲 4 字節,與字段 b 佔據空間一致,而 demo4{} 的大小爲 8 字節,即額外填充了 4 字節的空間。

3. 減少逃逸,將變量限制在棧上

變量逃逸一般發生在如下幾種情況:

知道變量逃逸的原因後,我們可以有意識的控制變量不發生逃逸,將其控制在棧上,減少堆變量的分配,降低 GC 成本,提高程序性能。

3.1 小的拷貝好過引用

小的拷貝好過引用,什麼意思呢,就是儘量使用棧變量而不是堆變量。下面舉一個反常識的例子,來證明小的拷貝比在堆上創建引用變量要好。

我們都知道 Go 裏面的 Array 以 pass-by-value 方式傳遞後,再加上其長度不可擴展,考慮到性能我們一般很少使用它。實際上,凡事無絕對。有時使用數組進行拷貝傳遞,比使用切片要好。

// copy/copy.go

const capacity = 1024

func arrayFibonacci() [capacity]int {
 var d [capacity]int
 for i := 0; i < len(d); i++ {
  if i <= 1 {
   d[i] = 1
   continue
  }
  d[i] = d[i-1] + d[i-2]
 }
 return d
}

func sliceFibonacci() []int {
 d := make([]int, capacity)
 for i := 0; i < len(d); i++ {
  if i <= 1 {
   d[i] = 1
   continue
  }
  d[i] = d[i-1] + d[i-2]
 }
 return d
}

下面看一下性能對比。

func BenchmarkArray(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = arrayFibonacci()
 }
}

func BenchmarkSlice(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = sliceFibonacci()
 }
}

運行上面的基準測試,將得到如下結果。

go test -bench=. -benchmem -gcflags="-l" main/copy
goos: darwin
goarch: amd64
pkg: main/copy
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkArray-12         692400              1708 ns/op               0 B/op          0 allocs/op
BenchmarkSlice-12         464974              2242 ns/op            8192 B/op          1 allocs/op
PASS
ok      main/copy       3.908s

從測試結果可以看出,對數組的拷貝性能卻比使用切片要好。爲什麼會這樣呢?

sliceFibonacci() 函數中分配的局部變量切片因爲要返回到函數外部,所以發生了逃逸,需要在堆上申請內存空間。從測試也過也可以看出,arrayFibonacci() 函數沒有內存分配,完全在棧上完成數組的創建。這裏說明了對於一些短小的對象,棧上覆制的成本遠小於在堆上分配和回收操作。

需要注意,運行上面基準測試時,傳遞了禁止內聯的編譯選項 "-l",如果發生內聯,那麼將不會出現變量的逃逸,就不存在堆上分配內存與回收的操作了,二者將看不出性能差異。

編譯時可以藉助選項 -gcflags=-m 查看編譯器對上面兩個函數的優化決策。

go build  -gcflags=-m copy/copy.go
# command-line-arguments
copy/copy.go:5:6: can inline arrayFibonacci
copy/copy.go:17:6: can inline sliceFibonacci
copy/copy.go:18:11: make([]int, capacity) escapes to heap

可以看到,arrayFibonacci() 和 sliceFibonacci() 函數均可內聯。sliceFibonacci() 函數中定義的局部變量切片逃逸到了堆。

那麼多大的變量纔算是小變量呢?對 Go 編譯器而言,超過一定大小的局部變量將逃逸到堆上,不同的 Go 版本的大小限制可能不一樣。一般是 <64KB,局部變量將不會逃逸到堆上。

3.2 返回值 VS 返回指針

值傳遞會拷貝整個對象,而指針傳遞只會拷貝地址,指向的對象是同一個。返回指針可以減少值的拷貝,但是會導致內存分配逃逸到堆中,增加垃圾回收 (GC) 的負擔。在對象頻繁創建和刪除的場景下,傳遞指針導致的 GC 開銷可能會嚴重影響性能。

一般情況下,對於需要修改原對象值,或佔用內存比較大的結構體,選擇返回指針。對於只讀的佔用內存較小的結構體,直接返回值能夠獲得更好的性能。

3.3 返回值使用確定的類型

如果變量類型不確定,那麼將會逃逸到堆上。所以,函數返回值如果能確定的類型,就不要使用 interface{}。

我們還是以上面斐波那契數列函數爲例,看下返回值爲確定類型和 interface{} 的性能差別。

const capacity = 1024

func arrayFibonacci() [capacity]int {
 var d [capacity]int
 for i := 0; i < len(d); i++ {
  if i <= 1 {
   d[i] = 1
   continue
  }
  d[i] = d[i-1] + d[i-2]
 }
 return d
}

func arrayFibonacciIfc() interface{} {
 var d [capacity]int
 for i := 0; i < len(d); i++ {
  if i <= 1 {
   d[i] = 1
   continue
  }
  d[i] = d[i-1] + d[i-2]
 }
 return d
}
func BenchmarkArray(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = arrayFibonacci()
 }
}

func BenchmarkIfc(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = arrayFibonacciIfc()
 }
}

運行上面的基準測試結果如下:

go test -bench=. -benchmem main/copy
goos: darwin
goarch: amd64
pkg: main/copy
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkArray-12         832418              1427 ns/op               0 B/op          0 allocs/op
BenchmarkIfc-12           380626              2861 ns/op            8192 B/op          1 allocs/op
PASS
ok      main/copy       3.742s

可見,函數返回值使用 interface{} 返回時,編譯器無法確定返回值的具體類型,導致返回值逃逸到堆上。當發生了堆上內存的申請與回收時,性能會差一點。

4.sync.Pool 複用對象

4.1 簡介

sync.Pool 是 sync 包下的一個組件,可以作爲保存臨時取還對象的一個 “池子”。個人覺得它的名字有一定的誤導性,因爲 Pool 裏裝的對象可以被無通知地被回收,可能 sync.Cache 是一個更合適的名字。

sync.Pool 是可伸縮的,同時也是併發安全的,其容量僅受限於內存的大小。存放在池中的對象如果不活躍了會被自動清理。

4.2 作用

對於很多需要重複分配、回收內存的地方,sync.Pool 是一個很好的選擇。頻繁地分配、回收內存會給 GC 帶來一定的負擔,嚴重的時候會引起 CPU 的毛刺,而 sync.Pool 可以將暫時不用的對象緩存起來,待下次需要的時候直接使用,不用再次經過內存分配,複用對象的內存,減輕 GC 的壓力,提升系統的性能。

一句話總結:用來保存和複用臨時對象,減少內存分配,降低 GC 壓力。

4.3 如何使用

sync.Pool 的使用方式非常簡單,只需要實現 New 函數即可。對象池中沒有對象時,將會調用 New 函數創建。

假設我們有一個 “學生” 結構體,並複用改結構體對象。

type Student struct {
 Name   string
 Age    int32
 Remark [1024]byte
}

var studentPool = sync.Pool{
    New: func() interface{} { 
        return new(Student) 
    },
}

然後調用 Pool 的 Get() 和 Put() 方法來獲取和放回池子中。

stu := studentPool.Get().(*Student)
json.Unmarshal(buf, stu)
studentPool.Put(stu)
4.4 性能差異

我們以 bytes.Buffer 字節緩衝器爲例,利用 sync.Pool 複用 bytes.Buffer 對象,避免重複創建與回收內存,來看看對性能的提升效果。

var bufferPool = sync.Pool{
 New: func() interface{} {
  return &bytes.Buffer{}
 },
}

var data = make([]byte, 10000)

func BenchmarkBufferWithPool(b *testing.B) {
 for n := 0; n < b.N; n++ {
  buf := bufferPool.Get().(*bytes.Buffer)
  buf.Write(data)
  buf.Reset()
  bufferPool.Put(buf)
 }
}

func BenchmarkBuffer(b *testing.B) {
 for n := 0; n < b.N; n++ {
  var buf bytes.Buffer
  buf.Write(data)
 }
}

測試結果如下:

go test -bench=. -benchmem main/pool
goos: darwin
goarch: amd64
pkg: main/pool
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkBufferWithPool-12      11987966                97.12 ns/op            0 B/op          0 allocs/op
BenchmarkBuffer-12               1246887              1020 ns/op           10240 B/op          1 allocs/op
PASS
ok      main/pool       3.510s

這個例子創建了一個 bytes.Buffer 對象池,每次只執行 Write 操作,及做一次數據拷貝,耗時幾乎可以忽略。而內存分配和回收的耗時佔比較多,因此對程序整體的性能影響更大。從測試結果也可以看出,使用了 Pool 複用對象,每次操作不再有內存分配。

4.5 在標準庫中的應用

Go 標準庫也大量使用了 sync.Pool,例如 fmt 和 encoding/json。以 fmt 包爲例,我們看下其是如何使用 sync.Pool 的。

我們可以看一下最常用的標準格式化輸出函數 Printf() 函數。

// Printf formats according to a format specifier and writes to standard output.
// It returns the number of bytes written and any write error encountered.
func Printf(format string, a ...interface{}) (n int, err error) {
 return Fprintf(os.Stdout, format, a...)
}

繼續看 Fprintf() 的定義。

// Fprintf formats according to a format specifier and writes to w.
// It returns the number of bytes written and any write error encountered.
func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error) {
 p := newPrinter()
 p.doPrintf(format, a)
 n, err = w.Write(p.buf)
 p.free()
 return
}

Fprintf() 函數的參數是一個 io.Writer,Printf() 傳的是 os.Stdout,相當於直接輸出到標準輸出。這裏的 newPrinter 用的就是 sync.Pool。

// go version go1.17 darwin/amd64

// pp is used to store a printer's state and is reused with sync.Pool to avoid allocations.
type pp struct {
    buf buffer
    ...
}

var ppFree = sync.Pool{
 New: func() interface{} { return new(pp) },
}

// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
 p := ppFree.Get().(*pp)
 p.panicking = false
 p.erroring = false
 p.wrapErrs = false
 p.fmt.init(&p.buf)
 return p
}

// free saves used pp structs in ppFree; avoids an allocation per invocation.
func (p *pp) free() {
 // Proper usage of a sync.Pool requires each entry to have approximately
 // the same memory cost. To obtain this property when the stored type
 // contains a variably-sized buffer, we add a hard limit on the maximum buffer
 // to place back in the pool.
 //
 // See https://golang.org/issue/23199
 if cap(p.buf) > 64<<10 {
  return
 }

 p.buf = p.buf[:0]
 p.arg = nil
 p.value = reflect.Value{}
 p.wrappedErr = nil
 ppFree.Put(p)
}

fmt.Printf() 的調用是非常頻繁的,利用 sync.Pool 複用 pp 對象能夠極大地提升性能,減少內存佔用,同時降低 GC 壓力。

併發編程

1. 關於鎖

1.1 無鎖化

加鎖是爲了避免在併發環境下,同時訪問共享資源產生的安全問題。那麼,在併發環境下,是否必須加鎖?答案是否定的。並非所有的併發都需要加鎖。適當地降低鎖的粒度,甚至採用無鎖化的設計,更能提升併發能力。

無鎖化主要有兩種實現,無鎖數據結構和串行無鎖。

1.1.1 無鎖數據結構

利用硬件支持的原子操作可以實現無鎖的數據結構,原子操作可以在 lock-free 的情況下保證併發安全,並且它的性能也能做到隨 CPU 個數的增多而線性擴展。很多語言都提供 CAS 原子操作(如 Go 中的 atomic 包和 C++11 中的 atomic 庫),可以用於實現無鎖數據結構,如無鎖鏈表。

我們以一個簡單的線程安全單向鏈表的插入操作來看下無鎖編程和普通加鎖的區別。

package list

import (
 "fmt"
 "sync"
 "sync/atomic"

 "golang.org/x/sync/errgroup"
)

// Node 鏈表節點
type Node struct {
 Value interface{}
 Next  *Node
}

//
// 有鎖單向鏈表的簡單實現
//

// WithLockList 有鎖單向鏈表
type WithLockList struct {
 Head *Node
 mu   sync.Mutex
}

// Push 將元素插入到鏈表的首部
func (l *WithLockList) Push(v interface{}) {
 l.mu.Lock()
 defer l.mu.Unlock()
 n := &Node{
  Value: v,
  Next:  l.Head,
 }
 l.Head = n
}

// String 有鎖鏈表的字符串形式輸出
func (l WithLockList) String() string {
 s := ""
 cur := l.Head
 for {
  if cur == nil {
   break
  }
  if s != "" {
   s += ","
  }
  s += fmt.Sprintf("%v", cur.Value)
  cur = cur.Next
 }
 return s
}

//
// 無鎖單向鏈表的簡單實現
//

// LockFreeList 無鎖單向鏈表
type LockFreeList struct {
 Head atomic.Value
}

// Push 有鎖
func (l *LockFreeList) Push(v interface{}) {
 for {
  head := l.Head.Load()
  headNode, _ := head.(*Node)
  n := &Node{
   Value: v,
   Next:  headNode,
  }
  if l.Head.CompareAndSwap(head, n) {
   break
  }
 }
}

// String 有鎖鏈表的字符串形式輸出
func (l LockFreeList) String() string {
 s := ""
 cur := l.Head.Load().(*Node)
 for {
  if cur == nil {
   break
  }
  if s != "" {
   s += ","
  }
  s += fmt.Sprintf("%v", cur.Value)
  cur = cur.Next
 }
 return s
}

上面的實現有幾點需要注意一下:

(1)無鎖單向鏈表實現時在插入時需要進行 CAS 操作,即調用CompareAndSwap()方法進行插入,如果插入失敗則進行 for 循環多次嘗試,直至成功。

(2)爲了方便打印鏈表內容,實現一個String()方法遍歷鏈表,且使用值作爲接收者,避免打印對象指針時無法生效。

  1. If an operand implements method String() string, that method will be invoked to convert the object to a string, which will then be formatted as required by the verb (if any).

我們分別對兩種鏈表做一個併發寫入的操作驗證一下其功能。

package main

import (
 "fmt"
 
 "main/list"
)

// ConcurWriteWithLockList 併發寫入有鎖鏈表
func ConcurWriteWithLockList(l *WithLockList) {
 var g errgroup.Group
 // 10 個協程併發寫入鏈表
 for i := 0; i < 10; i++ {
  i := i
  g.Go(func() error {
   l.Push(i)
   return nil
  })
 }
 _ = g.Wait()
}

// ConcurWriteLockFreeList 併發寫入無鎖鏈表
func ConcurWriteLockFreeList(l *LockFreeList) {
 var g errgroup.Group
 // 10 個協程併發寫入鏈表
 for i := 0; i < 10; i++ {
  i := i
  g.Go(func() error {
   l.Push(i)
   return nil
  })
 }
 _ = g.Wait()
}

func main() {
 // 併發寫入與遍歷打印有鎖鏈表
 l1 := &list.WithLockList{}
 list.ConcurWriteWithLockList(l1)
 fmt.Println(l1)

 // 併發寫入與遍歷打印無鎖鏈表
 l2 := &list.LockFreeList{}
 list.ConcurWriteLockFreeList(l2)
 fmt.Println(l2)
}

注意,多次運行上面的main()函數的結果可能會不相同,因爲併發是無序的。

8,7,6,9,5,4,3,1,2,0
9,8,7,6,5,4,3,2,0,1

下面再看一下鏈表 Push 操作的基準測試,對比一下有鎖與無鎖的性能差異。

func BenchmarkWriteWithLockList(b *testing.B) {
 l := &WithLockList{}
 for n := 0; n < b.N; n++ {
  l.Push(n)
 }
}
BenchmarkWriteWithLockList-8    14234166                83.58 ns/op

func BenchmarkWriteLockFreeList(b *testing.B) {
 l := &LockFreeList{}
 for n := 0; n < b.N; n++ {
  l.Push(n)
 }
}
BenchmarkWriteLockFreeList-8    15219405                73.15 ns/op

可以看出無鎖版本比有鎖版本性能高一些。

1.1.2 串行無鎖

串行無鎖是一種思想,就是避免對共享資源的併發訪問,改爲每個併發操作訪問自己獨佔的資源,達到串行訪問資源的效果,來避免使用鎖。不同的場景有不同的實現方式。比如網絡 I/O 場景下將單 Reactor 多線程模型改爲主從 Reactor 多線程模型,避免對同一個消息隊列鎖讀取。

這裏我介紹的是後臺微服務開發經常遇到的一種情況。我們經常需要併發拉取多方面的信息,匯聚到一個變量上。那麼此時就存在對同一個變量互斥寫入的情況。比如批量併發拉取用戶信息寫入到一個 map。此時我們可以將每個協程拉取的結果寫入到一個臨時對象,這樣便將併發地協程與同一個變量解綁,然後再將其匯聚到一起,這樣便可以不用使用鎖。即獨立處理,然後合併。

爲了模擬上面的情況,簡單地寫個示例程序,對比下性能。

import (
 "sync"

 "golang.org/x/sync/errgroup"
)

// ConcurWriteMapWithLock 有鎖併發寫入 map
func ConcurWriteMapWithLock() map[int]int {
 m := make(map[int]int)
 var mu sync.Mutex
 var g errgroup.Group
 // 10 個協程併發寫入 map
 for i := 0; i < 10; i++ {
  i := i
  g.Go(func() error {
   mu.Lock()
   defer mu.Unlock()
   m[i] = i * i
   return nil
  })
 }
 _ = g.Wait()
 return m
}

// ConcurWriteMapLockFree 無鎖併發寫入 map
func ConcurWriteMapLockFree() map[int]int {
 m := make(map[int]int)
 // 每個協程獨佔一 value
 values := make([]int, 10)
 // 10 個協程併發寫入 map
 var g errgroup.Group
 for i := 0; i < 10; i++ {
  i := i
  g.Go(func() error {
   values[i] = i * i
   return nil
  })
 }
 _ = g.Wait()
 // 匯聚結果到 map
 for i, v := range values {
  m[i] = v
 }
 return m
}

看下二者的性能差異:

func BenchmarkConcurWriteMapWithLock(b *testing.B) {
 for n := 0; n < b.N; n++ {
  _ = ConcurWriteMapWithLock()
 }
}
BenchmarkConcurWriteMapWithLock-8         218673              5089 ns/op

func BenchmarkConcurWriteMapLockFree(b *testing.B) {
 for n := 0; n < b.N; n++ {
  _ = ConcurWriteMapLockFree()
 }
}
BenchmarkConcurWriteMapLockFree-8         316635              4048 ns/op
1.2 減少鎖競爭

如果加鎖無法避免,則可以採用分片的形式,減少對資源加鎖的次數,這樣也可以提高整體的性能。

比如 Golang 優秀的本地緩存組件  bigcachego-cachefreecache 都實現了分片功能,每個分片一把鎖,採用分片存儲的方式減少加鎖的次數從而提高整體性能。

以一個簡單的示例,通過對map[uint64]struct{}分片前後併發寫入的對比,來看下減少鎖競爭帶來的性能提升。

var (
 num = 1000000
 m0  = make(map[int]struct{}, num)
 mu0 = sync.RWMutex{}
 m1  = make(map[int]struct{}, num)
 mu1 = sync.RWMutex{}
)

// ConWriteMapNoShard 不分片寫入一個 map。
func ConWriteMapNoShard() {
 g := errgroup.Group{}
 for i := 0; i < num; i++ {
  g.Go(func() error {
   mu0.Lock()
   defer mu0.Unlock()
   m0[i] = struct{}{}
   return nil
  })
 }
 _ = g.Wait()
}

// ConWriteMapTwoShard 分片寫入兩個 map。
func ConWriteMapTwoShard() {
 g := errgroup.Group{}
 for i := 0; i < num; i++ {
  g.Go(func() error {
   if i&1 == 0 {
    mu0.Lock()
    defer mu0.Unlock()
    m0[i] = struct{}{}
    return nil
   }
   mu1.Lock()
   defer mu1.Unlock()
   m1[i] = struct{}{}
   return nil
  })
 }
 _ = g.Wait()
}

看下二者的性能差異:

func BenchmarkConWriteMapNoShard(b *testing.B) {
 for i := 0; i < b.N; i++ {
  ConWriteMapNoShard()
 }
}
BenchmarkConWriteMapNoShard-12                 3         472063245 ns/op

func BenchmarkConWriteMapTwoShard(b *testing.B) {
 for i := 0; i < b.N; i++ {
  ConWriteMapTwoShard()
 }
}
BenchmarkConWriteMapTwoShard-12                4         310588155 ns/op

可以看到,通過對分共享資源的分片處理,減少了鎖競爭,能明顯地提高程序的併發性能。可以預見的是,隨着分片粒度地變小,性能差距會越來越大。當然,分片粒度不是越小越好。因爲每一個分片都要配一把鎖,那麼會帶來很多額外的不必要的開銷。可以選擇一個不太大的值,在性能和花銷上尋找一個平衡。

1.3 優先使用共享鎖而非互斥鎖

如果併發無法做到無鎖化,優先使用共享鎖而非互斥鎖。

所謂互斥鎖,指鎖只能被一個 Goroutine 獲得。共享鎖指可以同時被多個 Goroutine 獲得的鎖。

Go 標準庫 sync 提供了兩種鎖,互斥鎖(sync.Mutex)和讀寫鎖(sync.RWMutex),讀寫鎖便是共享鎖的一種具體實現。

1.3.1 sync.Mutex

互斥鎖的作用是保證共享資源同一時刻只能被一個 Goroutine 佔用,一個 Goroutine 佔用了,其他的 Goroutine 則阻塞等待。

sync.Mutex 提供了兩個導出方法用來使用鎖。

Lock()   // 加鎖
Unlock()   // 釋放鎖

我們可以通過在訪問共享資源前前用 Lock 方法對資源進行上鎖,在訪問共享資源後調用 Unlock 方法來釋放鎖,也可以用 defer 語句來保證互斥鎖一定會被解鎖。在一個 Go 協程調用 Lock 方法獲得鎖後,其他請求鎖的協程都會阻塞在 Lock 方法,直到鎖被釋放。

1.3.2 sync.RWMutex

讀寫鎖是一種共享鎖,也稱之爲多讀單寫鎖 (multiple readers, single writer lock)。在使用鎖時,對獲取鎖的目的操作做了區分,一種是讀操作,一種是寫操作。因爲同一時刻允許多個 Gorouine 獲取讀鎖,所以是一種共享鎖。但寫鎖是互斥的。

一般來說,有如下幾種情況:

sync.RWMutex 提供了五個導出方法用來使用鎖。

Lock()    // 加寫鎖
Unlock()   // 釋放寫鎖
RLock()    // 加讀鎖
RUnlock()   // 釋放讀鎖
RLocker() Locker // 返回讀鎖,使用 Lock() 和 Unlock() 進行 RLock() 和 RUnlock()

讀寫鎖的存在是爲了解決讀多寫少時的性能問題,讀場景較多時,讀寫鎖可有效地減少鎖阻塞的時間。

1.3.3 性能對比

大部分業務場景是讀多寫少,所以使用讀寫鎖可有效提高對共享數據的訪問效率。最壞的情況,只有寫請求,那麼讀寫鎖頂多退化成互斥鎖。所以優先使用讀寫鎖而非互斥鎖,可以提高程序的併發性能。

接下來,我們測試三種情景下,互斥鎖和讀寫鎖的性能差異。

首先根據互斥鎖和讀寫鎖分別實現對共享 map 的併發讀寫。

// OpMapWithMutex 使用互斥鎖讀寫 map。
// rpct 爲讀操作佔比。
func OpMapWithMutex(rpct int) {
 m := make(map[int]struct{})
 mu := sync.Mutex{}
 var wg sync.WaitGroup
 for i := 0; i < 100; i++ {
  i := i
  wg.Add(1)
  go func() {
   defer wg.Done()
   mu.Lock()
   defer mu.Unlock()
   // 寫操作。
   if i >= rpct {
    m[i] = struct{}{}
    time.Sleep(time.Microsecond)
    return
   }
   // 讀操作。
   _ = m[i]
   time.Sleep(time.Microsecond)
  }()
 }
 wg.Wait()
}

// OpMapWithRWMutex 使用讀寫鎖讀寫 map。
// rpct 爲讀操作佔比。
func OpMapWithRWMutex(rpct int) {
 m := make(map[int]struct{})
 mu := sync.RWMutex{}
 var wg sync.WaitGroup
 for i := 0; i < 100; i++ {
  i := i
  wg.Add(1)
  go func() {
   defer wg.Done()
   // 寫操作。
   if i >= rpct {
    mu.Lock()
    defer mu.Unlock()
    m[i] = struct{}{}
    time.Sleep(time.Microsecond)
    return
   }
   // 讀操作。
   mu.RLock()
   defer mu.RUnlock()
   _ = m[i]
   time.Sleep(time.Microsecond)
  }()
 }
 wg.Wait()
}

入參 rpct 用來調節讀操作的佔比,來模擬讀寫佔比不同的場景。rpct 設爲 80 表示讀多寫少 (讀佔 80%),rpct 設爲 50 表示讀寫一致 (各佔 50%),rpct 設爲 20 表示讀少寫多 (讀佔 20%)。

func BenchmarkMutexReadMore(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithMutex(80)
 }
}

func BenchmarkRWMutexReadMore(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithRWMutex(80)
 }
}

func BenchmarkMutexRWEqual(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithMutex(50)
 }
}

func BenchmarkRWMutexRWEqual(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithRWMutex(50)
 }
}

func BenchmarkMutexWriteMore(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithMutex(20)
 }
}

func BenchmarkRWMutexWriteMore(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithRWMutex(20)
 }
}

執行當前包下的所有基準測試,結果如下:

dablelv@DABLELV-MB0 mutex % go test -bench=.
goos: darwin
goarch: amd64
pkg: main/mutex
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkMutexReadMore-12                   2462            485917 ns/op
BenchmarkRWMutexReadMore-12                 8074            145690 ns/op
BenchmarkMutexRWEqual-12                    2406            498673 ns/op
BenchmarkRWMutexRWEqual-12                  4124            303693 ns/op
BenchmarkMutexWriteMore-12                  1906            532350 ns/op
BenchmarkRWMutexWriteMore-12                2462            432386 ns/op
PASS
ok      main/mutex      9.532s

可見讀多寫少的場景,使用讀寫鎖併發性能會更優。可以預見的是如果寫佔比更低,那麼讀寫鎖帶的併發效果會更優。

這裏需要注意的是,因爲每次讀寫 map 的操作耗時很短,所以每次睡眠一微秒(百萬分之一秒)來增加耗時,不然對共享資源的訪問耗時,小於鎖處理的本身耗時,那麼使用讀寫鎖帶來的性能優化效果將變得不那麼明顯,甚至會降低性能。

2. 限制協程數量

2.1 協程數過多的問題
2.1.1 程序崩潰

Go 程(goroutine)是由 Go 運行時管理的輕量級線程。通過它我們可以輕鬆實現併發編程。但是當我們無限開闢協程時,將會遇到致命的問題。

func main() {
 var wg sync.WaitGroup
 for i := 0; i < math.MaxInt32; i++ {
  wg.Add(1)
  go func(i int) {
   defer wg.Done()
   fmt.Println(i)
   time.Sleep(time.Second)
  }(i)
 }
 wg.Wait()
}

這個例子實現了 math.MaxInt32 個協程的併發,2^31 - 1 約爲 20 億個,每個協程內部幾乎沒有做什麼事情。正常的情況下呢,這個程序會亂序輸出 0 ~ 2^31-1 個數字。

程序會像預期的那樣順利的運行嗎?

go run main.go
...
108668
1142025
panic: too many concurrent operations on a single file or socket (max 1048575)

goroutine 1158408 [running]:
internal/poll.(*fdMutex).rwlock(0xc0000ae060, 0x0)
        /usr/local/go/src/internal/poll/fd_mutex.go:147 +0x11b
internal/poll.(*FD).writeLock(...)
        /usr/local/go/src/internal/poll/fd_mutex.go:239
internal/poll.(*FD).Write(0xc0000ae060, {0xc12cadf690, 0x8, 0x8})
        /usr/local/go/src/internal/poll/fd_unix.go:262 +0x72
os.(*File).write(...)
        /usr/local/go/src/os/file_posix.go:49
os.(*File).Write(0xc0000ac008, {0xc12cadf690, 0x1, 0xc12ea62f50})
        /usr/local/go/src/os/file.go:176 +0x65
fmt.Fprintln({0x10c00e0, 0xc0000ac008}{0xc12ea62f90, 0x1, 0x1})
        /usr/local/go/src/fmt/print.go:265 +0x75
fmt.Println(...)
        /usr/local/go/src/fmt/print.go:274
main.main.func1(0x0)
        /Users/dablelv/work/code/test/main.go:16 +0x8f
...

運行的結果是程序直接崩潰了,關鍵的報錯信息是:

panic: too many concurrent operations on a single file or socket (max 1048575)

對單個 file/socket 的併發操作個數超過了系統上限,這個報錯是 fmt.Printf 函數引起的,fmt.Printf 將格式化後的字符串打印到屏幕,即標準輸出。在 Linux 系統中,標準輸出也可以視爲文件,內核(Kernel)利用文件描述符(File Descriptor)來訪問文件,標準輸出的文件描述符爲 1,錯誤輸出文件描述符爲 2,標準輸入的文件描述符爲 0。

簡而言之,系統的資源被耗盡了。

那如果我們將 fmt.Printf 這行代碼去掉呢?那程序很可能會因爲內存不足而崩潰。這一點更好理解,每個協程至少需要消耗 2KB 的空間,那麼假設計算機的內存是 4GB,那麼至多允許 4GB/2KB = 1M 個協程同時存在。那如果協程中還存在着其他需要分配內存的操作,那麼允許併發執行的協程將會數量級地減少。

2.1.2 協程的代價

前面的例子過於極端,一般情況下程序也不會無限開闢協程,旨在說明協程數量是有限制的,不能無限開闢。

如果我們開闢很多協程,但不會導致程序崩潰,可以嗎?如果真要這麼做的話,我們應該清楚地知道,協程雖然輕量,但仍有開銷。

Go 的開銷主要是三個方面:創建(佔用內存)、調度(增加調度器負擔)和刪除(增加 GC 壓力)。

空間上,一個 Go 程佔用約 2K 的內存,在源碼 src/runtime/runtime2.go 裏面,我們可以找到 Go 程的結構定義 type g struct。

時間上,協程調度也會有 CPU 開銷。我們可以利用 runntime.Gosched() 讓當前協程主動讓出 CPU 去執行另外一個協程,下面看一下協程之間切換的耗時。

const NUM = 10000

func cal() {
 for i := 0; i < NUM; i++ {
  runtime.Gosched()
 }
}

func main() {
 // 只設置一個 Processor
 runtime.GOMAXPROCS(1)
 start := time.Now().UnixNano()
 go cal()
 for i := 0; i < NUM; i++ {
  runtime.Gosched()
 }
 end := time.Now().UnixNano()
 fmt.Printf("total %vns per %vns", end-start, (end-start)/NUM)
}

運行輸出:

total 997200ns per 99ns

可見一次協程的切換,耗時大概在 100ns,相對於線程的微秒級耗時切換,性能表現非常優秀,但是仍有開銷。

package main

import (
 "fmt"
 "runtime"
 "runtime/debug"
 "sync"
 "time"
)

func createLargeNumGoroutine(num int, wg *sync.WaitGroup) {
 wg.Add(num)
 for i := 0; i < num; i++ {
  go func() {
   defer wg.Done()
  }()
 }
}

func main() {
 // 只設置一個 Processor 保證 Go 程串行執行
 runtime.GOMAXPROCS(1)
 // 關閉GC改爲手動執行
 debug.SetGCPercent(-1)

 var wg sync.WaitGroup
 createLargeNumGoroutine(1000, &wg)
 wg.Wait()
 t := time.Now()
 runtime.GC() // 手動GC
 cost := time.Since(t)
 fmt.Printf("GC cost %v when goroutine num is %v\n", cost, 1000)

 createLargeNumGoroutine(10000, &wg)
 wg.Wait()
 t = time.Now()
 runtime.GC() // 手動GC
 cost = time.Since(t)
 fmt.Printf("GC cost %v when goroutine num is %v\n", cost, 10000)

 createLargeNumGoroutine(100000, &wg)
 wg.Wait()
 t = time.Now()
 runtime.GC() // 手動GC
 cost = time.Since(t)
 fmt.Printf("GC cost %v when goroutine num is %v\n", cost, 100000)
}

運行輸出:

GC cost 0s when goroutine num is 1000
GC cost 2.0027ms when goroutine num is 10000
GC cost 30.9523ms when goroutine num is 100000

當創建的 Go 程數量越多,GC 耗時越大。

上面的分析目的是爲了儘可能地量化 Goroutine 的開銷。雖然官方宣稱用 Golang 寫併發程序的時候隨便起個成千上萬的 Goroutine 毫無壓力,但當我們起十萬、百萬甚至千萬個 Goroutine 呢?Goroutine 輕量的開銷將被放大。

2.2 限制協程數量

系統地資源是有限,協程是有代價的,爲了保護程序,提高性能,我們應主動限制併發的協程數量。

可以利用信道 channel 的緩衝區大小來實現。

func main() {
 var wg sync.WaitGroup
 ch := make(chan struct{}, 3)
 for i := 0; i < 10; i++ {
  ch <- struct{}{}
  wg.Add(1)
  go func(i int) {
   defer wg.Done()
   log.Println(i)
   time.Sleep(time.Second)
   <-ch
  }(i)
 }
 wg.Wait()
}

上例中創建了緩衝區大小爲 3 的 channel,在沒有被接收的情況下,至多發送 3 個消息則被阻塞。開啓協程前,調用ch <- struct{}{},若緩存區滿,則阻塞。協程任務結束,調用 <-ch 釋放緩衝區。

sync.WaitGroup 並不是必須的,例如 Http 服務,每個請求天然是併發的,此時使用 channel 控制併發處理的任務數量,就不需要 sync.WaitGroup。

運行結果如下:

2022/03/06 20:37:02 0
2022/03/06 20:37:02 2
2022/03/06 20:37:02 1
2022/03/06 20:37:03 3
2022/03/06 20:37:03 4
2022/03/06 20:37:03 5
2022/03/06 20:37:04 6
2022/03/06 20:37:04 7
2022/03/06 20:37:04 8
2022/03/06 20:37:05 9

從日誌中可以很容易看到,每秒鐘只併發執行了 3 個任務,達到了協程併發控制的目的。

2.3 協程池化

上面的例子只是簡單地限制了協程開闢的數量。在此基礎之上,基於對象複用的思想,我們可以重複利用已開闢的協程,避免協程的重複創建銷燬,達到池化的效果。

協程池化,我們可以自己寫一個協程池,但不推薦這麼做。因爲已經有成熟的開源庫可供使用,無需再重複造輪子。目前有很多第三方庫實現了協程池,可以很方便地用來控制協程的併發數量,比較受歡迎的有:

下面以 panjf2000/ants 爲例,簡單介紹其使用。

ants 是一個簡單易用的高性能 Goroutine 池,實現了對大規模 Goroutine 的調度管理和複用,允許使用者在開發併發程序的時候限制 Goroutine 數量,複用協程,達到更高效執行任務的效果。

package main

import (
 "fmt"
 "time"

 "github.com/panjf2000/ants"
)

func main() {
 // Use the common pool
 for i := 0; i < 10; i++ {
  i := i
  ants.Submit(func() {
   fmt.Println(i)
  })
 }
 time.Sleep(time.Second)
}

使用 ants,我們簡單地使用其默認的協程池,直接將任務提交併發執行。默認協程池的缺省容量 math.MaxInt32。

如果自定義協程池容量大小,可以調用 NewPool 方法來實例化具有給定容量的池,如下所示:

// Set 10000 the size of goroutine pool
p, _ := ants.NewPool(10000)
2.4 小結

Golang 爲併發而生。Goroutine 是由 Go 運行時管理的輕量級線程,通過它我們可以輕鬆實現併發編程。Go 雖然輕量,但天下沒有免費的午餐,無休止地開闢大量 Go 程勢必會帶來性能影響,甚至程序崩潰。所以,我們應儘可能的控制協程數量,如果有需要,請複用它。

3. 使用 sync.Once 避免重複執行

3.1 簡介

sync.Once 是 Go 標準庫提供的使函數只執行一次的實現,常應用於單例模式,例如初始化配置、保持數據庫連接等。作用與 init 函數類似,但有區別。

在多數情況下,sync.Once 被用於控制變量的初始化,這個變量的讀寫滿足如下三個條件:

3.2 原理

sync.Once 用來保證函數只執行一次。要達到這個效果,需要做到兩點:

3.2.1 源碼

下面看一下 sync.Once 結構,其有兩個變量。使用 done 統計函數執行次數,使用鎖 m 實現線程安全。果不其然,和上面的猜想一致。

// Once is an object that will perform exactly one action.
//
// A Once must not be copied after first use.
type Once struct {
 // done indicates whether the action has been performed.
 // It is first in the struct because it is used in the hot path.
 // The hot path is inlined at every call site.
 // Placing done first allows more compact instructions on some architectures (amd64/386),
 // and fewer instructions (to calculate offset) on other architectures.
 done uint32
 m    Mutex
}

sync.Once 僅提供了一個導出方法 Do(),參數 f 是隻會被執行一次的函數,一般爲對象初始化函數。

// go version go1.17 darwin/amd64

// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
//  var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
//  config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func()) {
 // Note: Here is an incorrect implementation of Do:
 //
 // if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
 //  f()
 // }
 //
 // Do guarantees that when it returns, f has finished.
 // This implementation would not implement that guarantee:
 // given two simultaneous calls, the winner of the cas would
 // call f, and the second would return immediately, without
 // waiting for the first's call to f to complete.
 // This is why the slow path falls back to a mutex, and why
 // the atomic.StoreUint32 must be delayed until after f returns.

 if atomic.LoadUint32(&o.done) == 0 {
  // Outlined slow-path to allow inlining of the fast-path.
  o.doSlow(f)
 }
}

func (o *Once) doSlow(f func()) {
 o.m.Lock()
 defer o.m.Unlock()
 if o.done == 0 {
  defer atomic.StoreUint32(&o.done, 1)
  f()
 }
}

拋去大段的註釋,可以看到 sync.Once 實現非常簡潔。Do() 函數中,通過對成員變量 done 的判斷,來決定是否執行傳入的任務函數。執行任務函數前,通過鎖保證任務函數的執行和 done 的修改是一個互斥操作。在執行任務函數前,對 done 做一個二次判斷,來保證任務函數只會被執行一次,done 只會被修改一次。

3.2.2  done 爲什麼是第一個字段

從字段 done 前有一段註釋,說明了 done 爲什麼是第一個字段。

done 在熱路徑中,done 放在第一個字段,能夠減少 CPU 指令,也就是說,這樣做能夠提升性能。

熱路徑(hot path)是程序非常頻繁執行的一系列指令,sync.Once 絕大部分場景都會訪問 o.done,在熱路徑上是比較好理解的。如果 hot path 編譯後的機器碼指令更少,更直接,必然是能夠提升性能的。

爲什麼放在第一個字段就能夠減少指令呢?因爲結構體第一個字段的地址和結構體的指針是相同的,如果是第一個字段,直接對結構體的指針解引用即可。如果是其他的字段,除了結構體指針外,還需要計算與第一個值的偏移(calculate offset)。在機器碼中,偏移量是隨指令傳遞的附加值,CPU 需要做一次偏移值與指針的加法運算,才能獲取要訪問的值的地址。因爲,訪問第一個字段的機器代碼更緊湊,速度更快。

參考 What does “hot path” mean in the context of sync.Once? - StackOverflow

3.3 性能差異

我們以一個簡單示例,來說明使用 sync.Once 保證函數只會被執行一次和多次執行,二者的性能差異。

考慮一個簡單的場景,函數 ReadConfig 需要讀取環境變量,並轉換爲對應的配置。環境變量在程序執行前已經確定,執行過程中不會發生改變。ReadConfig 可能會被多個協程併發調用,爲了提升性能(減少執行時間和內存佔用),使用 sync.Once 是一個比較好的方式。

type Config struct {
 GoRoot string
 GoPath string
}

var (
 once   sync.Once
 config *Config
)

func ReadConfigWithOnce() *Config {
 once.Do(func() {
  config = &Config{
   GoRoot: os.Getenv("GOROOT"),
   GoPath: os.Getenv("GOPATH"),
  }
 })
 return config
}

func ReadConfig() *Config {
 return &Config{
  GoRoot: os.Getenv("GOROOT"),
  GoPath: os.Getenv("GOPATH"),
 }
}

我們看下二者的性能差異。

func BenchmarkReadConfigWithOnce(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = ReadConfigWithOnce()
 }
}

func BenchmarkReadConfig(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = ReadConfig()
 }
}

執行測試結果如下:

go test -bench=. main/once
goos: darwin
goarch: amd64
pkg: main/once
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkReadConfigWithOnce-12          670438965                1.732 ns/op
BenchmarkReadConfig-12                  13339154                87.46 ns/op
PASS
ok      main/once       3.006s

sync.Once 中保證了 Config 初始化函數僅執行了一次,避免了多次重複初始化,在併發環境下很有用。

4. 使用 sync.Cond 通知協程

4.1 簡介

sync.Cond 是基於互斥鎖 / 讀寫鎖實現的條件變量,用來協調想要訪問共享資源的那些 Goroutine,當共享資源的狀態發生變化的時候,sync.Cond 可以用來通知等待條件發生而阻塞的 Goroutine。

sync.Cond 基於互斥鎖 / 讀寫鎖,它和互斥鎖的區別是什麼呢?

互斥鎖 sync.Mutex 通常用來保護共享的臨界資源,條件變量 sync.Cond 用來協調想要訪問共享資源的 Goroutine。當共享資源的狀態發生變化時,sync.Cond 可以用來通知被阻塞的 Goroutine。

4.2 使用場景

sync.Cond 經常用在多個 Goroutine 等待,一個 Goroutine 通知(事件發生)的場景。如果是一個通知,一個等待,使用互斥鎖或 channel 就能搞定了。

我們想象一個非常簡單的場景:

有一個協程在異步地接收數據,剩下的多個協程必須等待這個協程接收完數據,才能讀取到正確的數據。在這種情況下,如果單純使用 chan 或互斥鎖,那麼只能有一個協程可以等待,並讀取到數據,沒辦法通知其他的協程也讀取數據。

這個時候,就需要有個全局的變量來標誌第一個協程數據是否接受完畢,剩下的協程,反覆檢查該變量的值,直到滿足要求。或者創建多個 channel,每個協程阻塞在一個 channel 上,由接收數據的協程在數據接收完畢後,逐個通知。總之,需要額外的複雜度來完成這件事。

Go 語言在標準庫 sync 中內置一個 sync.Cond 用來解決這類問題。

4.3 原理

sync.Cond 內部維護了一個等待隊列,隊列中存放的是所有在等待這個 sync.Cond 的 Go 程,即保存了一個通知列表。sync.Cond 可以用來喚醒一個或所有因等待條件變量而阻塞的 Go 程,以此來實現多個 Go 程間的同步。

sync.Cond 的定義如下:

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
 noCopy noCopy

 // L is held while observing or changing the condition
 L Locker

 notify  notifyList
 checker copyChecker
}

每個 Cond 實例都會關聯一個鎖 L(互斥鎖 *Mutex,或讀寫鎖 *RWMutex),當修改條件或者調用 Wait 方法時,必須加鎖。

sync.Cond 的四個成員函數定義如下:

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
 return &Cond{L: l}
}

NewCond 創建 Cond 實例時,需要關聯一個鎖。

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait() {
 c.checker.check()
 t := runtime_notifyListAdd(&c.notify)
 c.L.Unlock()
 runtime_notifyListWait(&c.notify, t)
 c.L.Lock()
}

Wait 用於阻塞調用者,等待通知。調用 Wait 會自動釋放鎖 c.L,並掛起調用者所在的 goroutine。如果其他協程調用了 Signal 或 Broadcast 喚醒了該協程,那麼 Wait 方法在結束阻塞時,會重新給 c.L 加鎖,並且繼續執行 Wait 後面的代碼。

對條件的檢查,使用了 for !condition() 而非 if,是因爲當前協程被喚醒時,條件不一定符合要求,需要再次 Wait 等待下次被喚醒。爲了保險起,使用 for 能夠確保條件符合要求後,再執行後續的代碼。

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
 c.checker.check()
 runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
 c.checker.check()
 runtime_notifyListNotifyAll(&c.notify)
}

Signal 只喚醒任意 1 個等待條件變量 c 的 goroutine,無需鎖保護。Broadcast 喚醒所有等待條件變量 c 的 goroutine,無需鎖保護。

4.4 使用示例

我們實現一個簡單的例子,三個協程調用 Wait() 等待,另一個協程調用 Broadcast() 喚醒所有等待的協程。

var done = false

func read(name string, c *sync.Cond) {
 c.L.Lock()
 for !done {
  c.Wait()
 }
 log.Println(name, "starts reading")
 c.L.Unlock()
}

func write(name string, c *sync.Cond) {
 log.Println(name, "starts writing")
 time.Sleep(time.Second)
 done = true
 log.Println(name, "wakes all")
 c.Broadcast()
}

func main() {
 cond := sync.NewCond(&sync.Mutex{})

 go read("reader1", cond)
 go read("reader2", cond)
 go read("reader3", cond)
 write("writer", cond)

 time.Sleep(time.Second * 3)
}

運行輸出:

go run main.go
2022/03/07 17:20:09 writer starts writing
2022/03/07 17:20:10 writer wakes all
2022/03/07 17:20:10 reader3 starts reading
2022/03/07 17:20:10 reader1 starts reading
2022/03/07 17:20:10 reader2 starts reading

更多關於 sync.Cond 的討論可參考 How to correctly use sync.Cond? - StackOverflow

4.5 注意事項

sync.Cond 不能被複制的原因,並不是因爲其內部嵌套了 Locker。因爲 NewCond 時傳入的 Mutex/RWMutex 指針,對於 Mutex 指針複製是沒有問題的。

主要原因是 sync.Cond 內部是維護着一個 Goroutine 通知隊列 notifyList。如果這個隊列被複制的話,那麼就在併發場景下導致不同 Goroutine 之間操作的 notifyList.wait、notifyList.notify 並不是同一個,這會導致出現有些 Goroutine 會一直阻塞。

從等待隊列中按照順序喚醒,先進入等待隊列,先被喚醒。

調用 Wait() 函數前,需要先獲得條件變量的成員鎖,原因是需要互斥地變更條件變量的等待隊列。在 Wait() 返回前,會重新上鎖。

參考文獻

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Lv2XTD-SPnxT2vnPNeREbg