Go 中祕而不宣的數據結構 Treap:平衡樹不一定就用紅黑樹


treap 是一棵二叉樹,它同時維護二叉搜索樹 (BST) 和堆的屬性, 所以由此得名 (tree + heap   ⇒  treap)。

從形式上講,treap (tree + heap) 是一棵二叉樹,其節點包含兩個值,一個  key  和一個  priority,這樣 key 保持 BST 屬性,priority 是一個保持 heap 屬性的隨機值(至於是最大堆還是最小堆並不重要)。相對於其他的平衡二叉搜索樹,treap 的特點是實現簡單,且能基本實現隨機平衡的結構。屬於弱平衡樹。

treap 由 Raimund Siedel 和 Cecilia Aragon 於 1989 年提出。

treap 通常也被稱爲 “笛卡爾樹”,因爲它很容易嵌入到笛卡爾平面中: 

具體來說,treap 是一種在二叉樹中存儲鍵值對 (X,Y) 的數據結構,其特點是:按 X 值滿足二叉搜索樹的性質,同時按 Y 值滿足二叉堆的性質。如果樹中某個節點包含值 (X₀,Y₀),那麼:

在這種實現中,  X 是鍵(同時也是存儲在 Treap 中的值),並且   Y 稱爲優先級。如果沒有優先級,則 treap 將是一個常規的二叉搜索樹。

優先級(前提是每個節點的優先級都不相同)的特殊之處在於:它們可以確定性地決定樹的最終結構(不會受到插入數據順序的影響)。這一點是可以通過相關定理來證明的。這裏有個巧妙的設計:如果我們隨機分配這些優先級值,就能在平均情況下得到一棵比較平衡的樹(避免樹退化成鏈表)。這樣就能保證主要操作(如查找、插入、刪除等)的時間複雜度保持在 O(log N) 水平。正是因爲這種隨機分配優先級的特點,這種數據結構也被稱爲 "隨機二叉搜索樹"。

![](Pasted image 20241026113542.png)

Treap 維護堆性質的方法用到了旋轉,且只需要進行兩種旋轉操作,因此編程複雜度較紅黑樹、AVL 樹要小一些。

紅黑樹的操作:插入_以最大堆爲例_給節點隨機分配一個優先級,先和二叉搜索樹的插入一樣,先把要插入的點插入到一個葉子上,然後跟維護堆一樣進行以下操作:

  1. 如果當前節點的優先級比父節點大就進行 2. 或 3. 的操作

  2. 如果當前節點是父節點的左子葉就右旋

  3. 如果當前節點是父節點的右子葉就左旋。

刪除

因爲 treap 滿足堆性質,所以只需要把要刪除的節點旋轉到葉節點上,然後直接刪除就可以了。具體的方法就是每次找到優先級最大的子葉,向與其相反的方向旋轉,直到那個節點被旋轉到了葉節點,然後直接刪除。

查找

和一般的二叉搜索樹一樣,但是由於 treap 的隨機化結構,Treap 中查找的期望複雜度是 O(logn)

以上是 treap 數據結構的背景知識,如果你想了解更多關於 treap 的知識,你可以參考

Go 運行時的 treap 和用途

在 Go 運行時 sema.go#semaRoot[1] 中,定義了一個數據結構 semaRoot:

type semaRoot struct {
 lock  mutex
 treap *sudog        // 不重複的等待者(goroutine)的平衡樹(treap)的根節點
 nwait atomic.Uint32 // 等待者(goroutine)的數量
}

type sudog struct {
 g *g

 next *sudog
 prev *sudog
 elem unsafe.Pointer // data element (may point to stack)

 acquiretime int64
 releasetime int64
 ticket      uint32

 isSelect bool
 success bool

 waiters uint16

 parent   *sudog // semaRoot binary tree
 waitlink *sudog // g.waiting list or semaRoot
 waittail *sudog // semaRoot
 c        *hchan // channel
}

這是 Go 語言互斥鎖 (Mutex) 底層實現中的關鍵數據結構,用於管理等待獲取互斥鎖的 goroutine 隊列。我們已經知道,在獲取 sync.Mutex 時,如果鎖已經被其它 goroutine 獲取,那麼當前請求鎖的 goroutine 會被 block 住,就會被放入到這樣一個數據結構中 (所以你也知道這個數據結構中的 goroutine 都是唯一的,不重複)。

semaRoot 保存了一個平衡樹,樹中的 sudog 節點都有不同的地址 (s.elem) , 每個 sudog 可能通過 s.waitlink 指向一個鏈表,該鏈表包含等待相同地址的其他 sudog。對具有相同地址的 sudog 內部鏈表的操作時間複雜度都是 O(1).。掃描頂層 semaRoot 列表的時間複雜度是 O(log n), 其中 n 是具有被阻塞 goroutine 的不同地址的數量(這些地址會散列到給定的 semaRoot)。

semaRoottreap *sudog 其實就是一個 treap, 我們來看看它的實現。

增加一個元素(入隊)

增加一個等待的 goroutine(sudog) 到 semaRoottreap 中,如果 lifotrue,則將 s 替換到 t 的位置,否則將 s 添加到 t 的等待列表的末尾。

func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
 // 設置這個要加入的節點
 s.g = getg()
 s.elem = unsafe.Pointer(addr)
 s.next = nil
 s.prev = nil
 s.waiters = 0

 var last *sudog
 pt := &root.treap
 // 從根節點開始
 for t := *pt; t != nil; t = *pt { // ①
  // 如果地址已經在列表中,則加入到這個地址的鏈表中
  if t.elem == unsafe.Pointer(addr) {
   // 如果地址已經在列表中,並且指定了先入後出flag,這是一個替換操作
   if lifo {
    // 替換操作
    *pt = s
    s.ticket = t.ticket
    ... // 把t的各種信息複製給s
   } else {
    // 增加到到等待列表的末尾
    if t.waittail == nil {
     t.waitlink = s
    } else {
     t.waittail.waitlink = s
    }
    t.waittail = s
    s.waitlink = nil
    if t.waiters+1 != 0 {
     t.waiters++
    }
   }
   return
  }
  last = t
  // 二叉搜索樹查找
  if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) { // ②
   pt = &t.prev
  } else {
   pt = &t.next
  }
 }

 // 爲新節點設置ticket.這個ticket是一個隨機值,作爲隨機堆的優先級,用於保持treap的平衡。
 s.ticket = cheaprand() | 1 // ③
 s.parent = last
 *pt = s

 // 根據優先級(ticket)旋轉以保持treap的平衡
 for s.parent != nil && s.parent.ticket > s.ticket { // ④
  if s.parent.prev == s {
   root.rotateRight(s.parent) // ⑤
  } else {
   if s.parent.next != s {
    panic("semaRoot queue")
   }
   root.rotateLeft(s.parent) // ⑥
  }
 }
}

① 是遍歷 treap 的過程,當然它是通過搜索二叉樹的方式實現。addr就是我們一開始講的 treap 的 key,也就是 s.elem。首先檢查 addr 已經在 treap 中,如果存在,那麼就把 s 加入到 addr 對應的 sudog 鏈表中,或者替換掉 addr 對應的 sudog

這個addr, 如果對於sync.Mutex來說,就是 Mutex.sema字段的地址。

type Mutex struct {
 state int32
 sema  uint32
}

所以對於阻塞在同一個sync.Mutex上的 goroutine,他們的addr是相同的,所以他們會被加入到同一個sudog鏈表中。如果是不同的sync.Mutex鎖,他們的addr是不同的,那麼他們會被加入到這個 treap 不同的節點。

進而,你可以知道,這個rootSema是維護多個sync.Mutex的等待隊列的,可以快速找到不同的sync.Mutex的等待隊列, 也可以維護同一個sync.Mutex的等待隊列。這給了我們啓發,如果你有類似的需求,可以參考這個實現。

③ 就是設置這個節點的優先級,它是一個隨機值,用於保持 treap 的平衡。這裏有個技巧就是總是把優先級最低位設置爲 1,這樣保證優先級不爲 0. 因爲優先級經常和 0 做比較,我們將最低位設置爲 1,就表明優先級已經設置。

④ 就是將這個新加入的節點旋轉到合適的位置,以保持 treap 的平衡。這裏的旋轉操作就是上面提到的左旋和右旋。稍後看。

移除一個元素(出隊)

對應的,還有出對的操作。這個操作就是從 treap 中移除一個節點,這個節點就是一個等待的 goroutine(sudog)。

dequeue 搜索並找到在semaRoot中第一個因addr而阻塞的goroutine。比如需要喚醒一個 goroutine, 讓它繼續執行 (比如直接將鎖交給它,或者喚醒它去爭搶鎖)。

func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now, tailtime int64) {
 ps := &root.treap
 s := *ps
 for ; s != nil; s = *ps { // ①, 二叉搜索樹查找
  if s.elem == unsafe.Pointer(addr) { // ②
   goto Found
  }
  if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
   ps = &s.prev
  } else {
   ps = &s.next
  }
 }
 return nil, 0, 0

Found: // ③
 ...
 if t := s.waitlink; t != nil { // ④
  *ps = t
  ...
 } else { // ⑤
  // 旋轉s到葉節點,以便刪除
  for s.next != nil || s.prev != nil {
   if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
    root.rotateRight(s)
   } else {
    root.rotateLeft(s)
   }
  }

  // ⑤ 刪除s
  if s.parent != nil {
   if s.parent.prev == s {
    s.parent.prev = nil
   } else {
    s.parent.next = nil
   }
  } else {
   root.treap = nil
  }
  tailtime = s.acquiretime
 }
 ... // 清理s的不需要的信息
 return s, now, tailtime
}

① 是遍歷 treap 的過程,當然它是通過搜索二叉樹的方式實現。addr就是我們一開始講的 treap 的 key,也就是 s.elem。如果找到了,就跳到 Found 標籤。如果沒有找到,就返回 nil

④ 是檢查這個地址上是不是有多個等待的 goroutine,如果有,就把這個節點替換成鏈表中的下一個節點。把這個節點從 treap 中移除並返回。如果就一個 goroutine,那麼把這個移除掉後,需要旋轉 treap,直到這個節點被旋轉到葉節點,然後刪除這個節點。

這裏的旋轉操作就是上面提到的左旋和右旋。

左旋 rotateLeft

rotateLeft 函數將以 x 爲根的子樹左旋,使其變爲 y 爲根的子樹。左旋之前的結構爲 (x a (y b c)),旋轉後變爲 (y (x a b) c)

func (root *semaRoot) rotateLeft(x *sudog) {
 // p -> (x a (y b c))
 p := x.parent
 y := x.next
 b := y.prev

 y.prev = x // ①
 x.parent = y // ②
 x.next = b // ③
 if b != nil {
  b.parent = x // ④
 }

 y.parent = p // ⑤
 if p == nil {
  root.treap = y // ⑥
 } else if p.prev == x { // ⑦
  p.prev = y
 } else {
  if p.next != x {
   throw("semaRoot rotateLeft")
  }
  p.next = y
 }
}

具體步驟:

 左旋爲

右旋 rotateRight

rotateRight 旋轉以節點 y 爲根的樹。將 (y (x a b) c) 變爲 (x a (y b c))

func (root *semaRoot) rotateRight(y *sudog) {
 // p -> ((x a b) c)
 p := y.parent
 x := y.prev
 b := x.next

 x.next = y // ①
 y.parent = x // ②
 y.prev = b // ③
 if b != nil {
  b.parent = y // ④
 }

 x.parent = p // ⑤
 if p == nil {
  root.treap = x // ⑥
 } else if p.prev == y { // ⑦
  p.prev = x
 } else {
  if p.next != y {
   throw("semaRoot rotateRight")
  }
  p.next = x
 }
}

具體步驟:

 右旋爲 

理解了左旋和右旋,你就理解了出隊代碼中這一段爲什麼把當前節點旋轉到葉結點中了:

  // 旋轉s到葉節點,以便刪除
  for s.next != nil || s.prev != nil {
   if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
    root.rotateRight(s)
   } else {
    root.rotateLeft(s)
   }
  }

整體上看,treap 這個數據結構確實簡單可維護。左旋和右旋的代碼量很少,結合圖看起來也容易理解。出入隊的代碼也很簡單,只是簡單的二叉搜索樹的操作,加上旋轉操作。

這是我介紹的 Go 祕而不宣的數據結構第三篇,希望你喜歡。你還希望看到 Go 運行時和標準庫中的哪些數據結構呢,歡迎留言。

參考資料

[1]

sema.go#semaRoot: https://github.com/golang/go/blob/master/src/runtime/sema.go#L40

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/VLKMHMs0iQyzPvikcNjUNQ