Go 中 sync-map 使用小結

sync.map

前言

Go 中的 map 不是併發安全的,在 Go1.9 之後,引入了sync.Map, 併發安全的 map。

深入瞭解下

對於 map,我們常用的做法就是加鎖。

對於sync.Map這些操作則是不需要的,來看下sync.Map的特點:

簡單的使用栗子:

func syncMapDemo() {

	var smp sync.Map

	// 數據寫入
	smp.Store("name", "小紅")
	smp.Store("age", 18)

	// 數據讀取
	name, _ := smp.Load("name")
	fmt.Println(name)

	age, _ := smp.Load("age")
	fmt.Println(age)

	// 遍歷
	smp.Range(func(key, value interface{}) bool {
		fmt.Println(key, value)
		return true
	})

	// 刪除
	smp.Delete("age")
	age, ok := smp.Load("age")
	fmt.Println("刪除後的查詢")
	fmt.Println(age, ok)

	// 讀取或寫入,存在就讀取,不存在就寫入
	smp.LoadOrStore("age", 100)
	age, _ = smp.Load("age")
	fmt.Println("不存在")
	fmt.Println(age)

	smp.LoadOrStore("age", 99)
	age, _ = smp.Load("age")
	fmt.Println("存在")
	fmt.Println(age)
}

查看下具體的實現

// sync/map.go
type Map struct {
   // 當寫read map 或讀寫dirty map時 需要上鎖
   mu Mutex

   // read map的 k v(entry) 是不變的,刪除只是打標記,插入新key會加鎖寫到dirty中
   // 因此對read map的讀取無需加鎖
   read atomic.Value // 保存readOnly結構體

   // dirty map 對dirty map的操作需要持有mu鎖
   dirty map[interface{}]*entry

   // 當Load操作在read map中未找到,嘗試從dirty中進行加載時(不管是否存在),misses+1
   // 當misses達到diry map len時,dirty被提升爲read 並且重新分配dirty
   misses int
}

// read map數據結構
type readOnly struct {
   m       map[interface{}]*entry
   // 爲true時代表dirty map中含有m中沒有的元素
   amended bool
}

type entry struct {
   // 指向實際的interface{}
   // p有三種狀態:
   // p == nil: 鍵值已經被刪除,此時,m.dirty==nil 或 m.dirty[k]指向該entry
   // p == expunged: 鍵值已經被刪除, 此時, m.dirty!=nil 且 m.dirty不存在該鍵值
   // 其它情況代表實際interface{}地址 如果m.dirty!=nil 則 m.read[key] 和 m.dirty[key] 指向同一個entry
   // 當刪除key時,並不實際刪除,先CAS entry.p爲nil 等到每次dirty map創建時(dirty提升後的第一次新建Key),會將entry.p由nil CAS爲expunged
   p unsafe.Pointer // *interface{}
}

read mapdirty map 的存儲方式是不一致的。

前者使用 atomic.Value,後者只是單純的使用 map。原因是 read map 使用 lock free 操作,必須保證 load/store 的原子性;而 dirty mapload+store 操作是由 lock(就是 mu)來保護的。

1、read 和 dirty 通過 entry 包裝 value,這樣使得 value 的變化和 map 的變化隔離,前者可以用 atomic 無鎖完成
2、Map 的 read 字段結構體定義爲 readOnly,這只是針對 map[interface{}]*entry 而言的,entry 內的內容以及 amended 字段都是可以變的
3、大部分情況下,對已有 key 的刪除 (entry.p 置爲 nil) 和更新可以直接通過修改 entry.p 來完成

Load

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    // 首先在通過atomic的原子操作讀取內容
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
    // 如果沒在 read 中找到,並且 amended 爲 true,即 dirty 中存在 read 中沒有的 key
	if !ok && read.amended {
       // read調用了atomic的原子性,所以不用加鎖,但是dirty map[interface{}]*entry就需要了,用的互斥鎖
		m.mu.Lock()
        // double check,避免在加鎖的時候dirty map提升爲read map
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
        // 還是沒有找到
		if !ok && read.amended {
            // 從 dirty 中找
			e, ok = m.dirty[key]
            // 不管dirty中有沒有找到 都增加misses計數 該函數可能將dirty map提升爲readmap
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

// 從entry中atomic load實際interface{}
func (e *entry) load() (value interface{}, ok bool) {
  p := atomic.LoadPointer(&e.p)
  if p == nil || p == expunged {
    return nil, false
  }
  return *(*interface{})(p), true
}

梳理下處理的邏輯:

1、首先是 fast path,直接在 read 中找,如果找到了直接調用 entry 的 load 方法,取出其中的值。
2、如果 read 中沒有這個 key,且 amended 爲 fase,說明 dirty 爲空,那直接返回 空和 false。
3、如果 read 中沒有這個 key,且 amended 爲 true,說明 dirty 中可能存在我們要找的 key。當然要先上鎖,再嘗試去 dirty 中查找。在這之前,仍然有一個 double check 的操作。若還是沒有在 read 中找到,那麼就從 dirty 中找。不管 dirty 中有沒有找到,都要 "記一筆",因爲在 dirty 被提升爲 read 之前,都會進入這條路徑

// 增加misses計數,並在必要的時候提升dirty map
// 如果 misses 值小於 m.dirty 的長度,就直接返回。否則,將 m.dirty 晉升爲 read,並清空 dirty,清空 misses 計數值。這樣,之前
// 一段時間新加入的 key 都會進入到 read 中,從而能夠提升 read 的命中率。
func (m *Map) missLocked() {
  m.misses++
  if m.misses < len(m.dirty) {
    return
  }
  // 提升過程很簡單,直接將m.dirty賦給m.read.m
  // 提升完成之後 amended == false m.dirty == nil
  // m.dirty並不立即創建被拷貝元素,而是延遲創建
  m.read.Store(readOnly{m: m.dirty})
  m.dirty = nil
  m.misses = 0
}

對於missLocked會直接將 misses 的值加 1,表示一次未命中,如果 misses 值小於 m.dirty 的長度,就直接返回。否則,將 m.dirty 晉升爲 read,並清空 dirty,清空 misses 計數值。這樣,之前一段時間新加入的 key 都會進入到 read 中,從而能夠提升 read 的命中率。

Store

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
    // 如果read map中存在該key  則嘗試直接更改(由於修改的是entry內部的pointer,因此dirty map也可見)
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			// 如果 read map 中存在該 key,但 p == expunged,則說明 m.dirty != nil 並且 m.dirty 中不存在該 key 值 此時:
			//    a. 將 p 的狀態由 expunged  更改爲 nil
			//    b. dirty map 插入 key
			m.dirty[key] = e
		}
        // 更新 entry.p = value (read map 和 dirty map 指向同一個 entry)
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
        // 如果 read map 中不存在該 key,但 dirty map 中存在該 key,直接寫入更新 entry(read map 中仍然沒有這個 key)
		e.storeLocked(&value)
	} else {
		// 如果read map和dirty map中都不存在該key,則:
		//    a. 如果dirty map爲空,則需要創建dirty map,並從read map中拷貝未刪除的元素
		//    b. 更新amended字段,標識dirty map中存在read map中沒有的key
		//    c. 將k v寫入dirty map中,read.m不變
		if !read.amended {

			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}

// 嘗試直接更新entry 如果p == expunged 返回false
func (e *entry) tryStore(i *interface{}) bool {
  p := atomic.LoadPointer(&e.p)
  if p == expunged {
    return false
  }
  for {
    if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
      return true
    }
    p = atomic.LoadPointer(&e.p)
    if p == expunged {
      return false
    }
  }
}

func (e *entry) unexpungeLocked() (wasExpunged bool) {
  return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

// 如果 dirty map爲nil,則從read map中拷貝元素到dirty map
func (m *Map) dirtyLocked() {
  if m.dirty != nil {
    return
  }

  read, _ := m.read.Load().(readOnly)
  m.dirty = make(map[interface{}]*entry, len(read.m))
  for k, e := range read.m {
    // a. 將所有爲 nil的 p 置爲 expunged
    // b. 只拷貝不爲expunged 的 p
    if !e.tryExpungeLocked() {
      m.dirty[k] = e
    }
  }
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
  p := atomic.LoadPointer(&e.p)
  for p == nil {
    if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
      return true
    }
    p = atomic.LoadPointer(&e.p)
  }
  return p == expunged
}

梳理下流程:

1、首先還是去 read map 中查詢,存在並且 p!=expunged, 直接修改。(由於修改的是 entry 內部的 pointer,因此 dirty map 也可見)
2、如果 read map 中存在該 key,但 p == expunged。加鎖更新 p 的狀態,然後直接更新該 entry (此時 m.dirtynil 或 m.dirty[key]e)
3、如果 read map 中不存在該 Key,但 dirty map 中存在該 key,直接寫入更新 entry(read map 中仍然沒有)
4、如果 read map 和 dirty map 都不存在該 key

Delete

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
    // 從read map中尋找
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
    // 沒找到
	if !ok && read.amended { // read.amended爲true代表dirty map中含有m中沒有的元素
		m.mu.Lock()
        // double check
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
        // 第二次仍然沒找到,但dirty map中存在,則直接從dirty map刪除
		if !ok && read.amended {
			delete(m.dirty, key)
		}
		m.mu.Unlock()
	}
    // 如果read存在,將entry.p 置爲 nil
	if ok {
		e.delete()
	}
}

func (e *entry) delete() (hadValue bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == nil || p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return true
		}
	}
}

梳理下流程:
1、先去 read map 中尋找,如果存在就直接刪除
2、如果沒找到,並且 read.amended 爲 true 代表 dirty map 中存在,依照傳統進行 double check。
3、read map 找到就刪除,沒找到判斷 dirty map 是否存在,存在了就刪除

LoadOrStore

func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
	// 首先還是先去read map中查詢
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		actual, loaded, ok := e.tryLoadOrStore(value)
		if ok {
			return actual, loaded
		}
	}

	m.mu.Lock()
    // double check
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
		    // 如果 read map 中存在該 key,但 p == expunged,則說明 m.dirty != nil 並且 m.dirty 中不存在該 key 值 此時:
		    //    a. 將 p 的狀態由 expunged  更改爲 nil
		    //    b. dirty map 插入 key
			m.dirty[key] = e
		}
		actual, loaded, _ = e.tryLoadOrStore(value)
	} else if e, ok := m.dirty[key]; ok {
        // 如果 read map 中不存在該 key,但 dirty map 中存在該 key
		actual, loaded, _ = e.tryLoadOrStore(value)
        // 不管dirty中有沒有找到 都增加misses計數 該函數可能將dirty map提升爲readmap
		m.missLocked()
	} else {
		if !read.amended {
		    // 如果read map和dirty map中都不存在該key,則:
		    //    a. 如果dirty map爲空,則需要創建dirty map,並從read map中拷貝未刪除的元素
		    //    b. 更新amended字段,標識dirty map中存在read map中沒有的key
		    //    c. 將k v寫入dirty map中,read.m不變
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
		actual, loaded = value, false
	}
	m.mu.Unlock()

	return actual, loaded
}

// 如果entry is expunged則不處理,返回false
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == expunged {
		return nil, false, false
	}
	if p != nil {
		return *(*interface{})(p), true, true
	}

	// Copy the interface after the first load to make this method more amenable
	// to escape analysis: if we hit the "load" path or the entry is expunged, we
	// shouldn't bother heap-allocating.
	ic := i
	for {
		if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
			return i, false, true
		}
		p = atomic.LoadPointer(&e.p)
		if p == expunged {
			return nil, false, false
		}
		if p != nil {
			return *(*interface{})(p), true, true
		}
	}
}

這個函數結合了 Load 和 Store 的功能,如果 map 中存在這個 key,那麼返回這個 key 對應的 value;否則,將 key-value 存入 map。
這在需要先執行 Load 查看某個 key 是否存在,之後再更新此 key 對應的 value 時很有效,因爲 LoadOrStore 可以併發執行。

總結

除了 Load/Store/Delete 之外,sync.Map 還提供了 LoadOrStore/Range 操作,但沒有提供 Len() 方法,這是因爲要統計有效的鍵值對只能先提升 dirty map(dirty map 中可能有 read map 中沒有的鍵值對),再遍歷 m.read(由於延遲刪除,不是所有的鍵值對都有效),這其實就是 Range 做的事情,因此在不添加新數據結構支持的情況下,sync.Map 的長度獲取和 Range 操作是同一複雜度的。這部分只能看官方後續支持。

1、sync.map 是線程安全的,讀取,插入,刪除也都保持着常數級的時間複雜度。

2、通過讀寫分離,降低鎖時間來提高效率,適用於讀多寫少的場景。

3、Range 操作需要提供一個函數,參數是 k,v,返回值是一個布爾值:f func(key, value interface{}) bool。

4、調用 Load 或 LoadOrStore 函數時,如果在 read 中沒有找到 key,則會將 misses 值原子地增加 1,當 misses 增加到和 dirty 的長度相等時,會將 dirty 提升爲 read。以期減少 “讀 miss”。

5、新寫入的 key 會保存到 dirty 中,如果這時 dirty 爲 nil,就會先新創建一個 dirty,並將 read 中未被刪除的元素拷貝到 dirty。

6、當 dirty 爲 nil 的時候,read 就代表 map 所有的數據;當 dirty 不爲 nil 的時候,dirty 才代表 map 所有的數據。

流程圖片

最後附上一張不錯的圖片

參考

【Go sync.Map 實現】https://wudaijun.com/2018/02/go-sync-map-implement/
【深度解密 Go 語言之 sync.map】https://www.cnblogs.com/qcrao-2018/p/12833787.html
【圖片】http://russellluo.com/2017/06/go-sync-map-diagram.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://www.cnblogs.com/ricklz/p/13659397.html