Go 中祕而不宣的數據結構 Treap:平衡樹不一定就用紅黑樹
treap 是一棵二叉樹,它同時維護二叉搜索樹 (BST) 和堆的屬性, 所以由此得名 (tree + heap ⇒ treap)。
從形式上講,treap (tree + heap) 是一棵二叉樹,其節點包含兩個值,一個 key 和一個 priority,這樣 key 保持 BST 屬性,priority 是一個保持 heap 屬性的隨機值(至於是最大堆還是最小堆並不重要)。相對於其他的平衡二叉搜索樹,treap 的特點是實現簡單,且能基本實現隨機平衡的結構。屬於弱平衡樹。
treap 由 Raimund Siedel 和 Cecilia Aragon 於 1989 年提出。
treap 通常也被稱爲 “笛卡爾樹”,因爲它很容易嵌入到笛卡爾平面中:
具體來說,treap 是一種在二叉樹中存儲鍵值對 (X,Y) 的數據結構,其特點是:按 X 值滿足二叉搜索樹的性質,同時按 Y 值滿足二叉堆的性質。如果樹中某個節點包含值 (X₀,Y₀),那麼:
-
左子樹中所有節點的 X 值都滿足
X ≤ X₀(BST 屬性) -
右子樹中所有節點的 X 值都滿足
X₀ ≤ X(BST 屬性) -
左右子樹中所有節點的 Y 值都滿足 Y ≤ Y₀ (堆屬性。這裏以最大堆爲例)
在這種實現中, X 是鍵(同時也是存儲在 Treap 中的值),並且 Y 稱爲優先級。如果沒有優先級,則 treap 將是一個常規的二叉搜索樹。
優先級(前提是每個節點的優先級都不相同)的特殊之處在於:它們可以確定性地決定樹的最終結構(不會受到插入數據順序的影響)。這一點是可以通過相關定理來證明的。這裏有個巧妙的設計:如果我們隨機分配這些優先級值,就能在平均情況下得到一棵比較平衡的樹(避免樹退化成鏈表)。這樣就能保證主要操作(如查找、插入、刪除等)的時間複雜度保持在 O(log N) 水平。正是因爲這種隨機分配優先級的特點,這種數據結構也被稱爲 "隨機二叉搜索樹"。

Treap 維護堆性質的方法用到了旋轉,且只需要進行兩種旋轉操作,因此編程複雜度較紅黑樹、AVL 樹要小一些。
紅黑樹的操作:插入_以最大堆爲例_給節點隨機分配一個優先級,先和二叉搜索樹的插入一樣,先把要插入的點插入到一個葉子上,然後跟維護堆一樣進行以下操作:
-
如果當前節點的優先級比父節點大就進行 2. 或 3. 的操作
-
如果當前節點是父節點的左子葉就右旋
-
如果當前節點是父節點的右子葉就左旋。
刪除
因爲 treap 滿足堆性質,所以只需要把要刪除的節點旋轉到葉節點上,然後直接刪除就可以了。具體的方法就是每次找到優先級最大的子葉,向與其相反的方向旋轉,直到那個節點被旋轉到了葉節點,然後直接刪除。
查找
和一般的二叉搜索樹一樣,但是由於 treap 的隨機化結構,Treap 中查找的期望複雜度是 O(logn)
以上是 treap 數據結構的背景知識,如果你想了解更多關於 treap 的知識,你可以參考
-
https://en.wikipedia.org/wiki/Treap
-
https://medium.com/carpanese/a-visual-introduction-to-treap-data-structure-part-1-6196d6cc12ee
-
https://cp-algorithms.com/data_structures/treap.html
Go 運行時的 treap 和用途
在 Go 運行時 sema.go#semaRoot[1] 中,定義了一個數據結構 semaRoot:
type semaRoot struct {
lock mutex
treap *sudog // 不重複的等待者(goroutine)的平衡樹(treap)的根節點
nwait atomic.Uint32 // 等待者(goroutine)的數量
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
success bool
waiters uint16
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
這是 Go 語言互斥鎖 (Mutex) 底層實現中的關鍵數據結構,用於管理等待獲取互斥鎖的 goroutine 隊列。我們已經知道,在獲取 sync.Mutex 時,如果鎖已經被其它 goroutine 獲取,那麼當前請求鎖的 goroutine 會被 block 住,就會被放入到這樣一個數據結構中 (所以你也知道這個數據結構中的 goroutine 都是唯一的,不重複)。
semaRoot 保存了一個平衡樹,樹中的 sudog 節點都有不同的地址 (s.elem) , 每個 sudog 可能通過 s.waitlink 指向一個鏈表,該鏈表包含等待相同地址的其他 sudog。對具有相同地址的 sudog 內部鏈表的操作時間複雜度都是 O(1).。掃描頂層 semaRoot 列表的時間複雜度是 O(log n), 其中 n 是具有被阻塞 goroutine 的不同地址的數量(這些地址會散列到給定的 semaRoot)。
semaRoot 的 treap *sudog 其實就是一個 treap, 我們來看看它的實現。
增加一個元素(入隊)
增加一個等待的 goroutine(sudog) 到 semaRoot 的 treap 中,如果 lifo 爲 true,則將 s 替換到 t 的位置,否則將 s 添加到 t 的等待列表的末尾。
func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
// 設置這個要加入的節點
s.g = getg()
s.elem = unsafe.Pointer(addr)
s.next = nil
s.prev = nil
s.waiters = 0
var last *sudog
pt := &root.treap
// 從根節點開始
for t := *pt; t != nil; t = *pt { // ①
// 如果地址已經在列表中,則加入到這個地址的鏈表中
if t.elem == unsafe.Pointer(addr) {
// 如果地址已經在列表中,並且指定了先入後出flag,這是一個替換操作
if lifo {
// 替換操作
*pt = s
s.ticket = t.ticket
... // 把t的各種信息複製給s
} else {
// 增加到到等待列表的末尾
if t.waittail == nil {
t.waitlink = s
} else {
t.waittail.waitlink = s
}
t.waittail = s
s.waitlink = nil
if t.waiters+1 != 0 {
t.waiters++
}
}
return
}
last = t
// 二叉搜索樹查找
if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) { // ②
pt = &t.prev
} else {
pt = &t.next
}
}
// 爲新節點設置ticket.這個ticket是一個隨機值,作爲隨機堆的優先級,用於保持treap的平衡。
s.ticket = cheaprand() | 1 // ③
s.parent = last
*pt = s
// 根據優先級(ticket)旋轉以保持treap的平衡
for s.parent != nil && s.parent.ticket > s.ticket { // ④
if s.parent.prev == s {
root.rotateRight(s.parent) // ⑤
} else {
if s.parent.next != s {
panic("semaRoot queue")
}
root.rotateLeft(s.parent) // ⑥
}
}
}
① 是遍歷 treap 的過程,當然它是通過搜索二叉樹的方式實現。addr就是我們一開始講的 treap 的 key,也就是 s.elem。首先檢查 addr 已經在 treap 中,如果存在,那麼就把 s 加入到 addr 對應的 sudog 鏈表中,或者替換掉 addr 對應的 sudog。
這個addr, 如果對於sync.Mutex來說,就是 Mutex.sema字段的地址。
type Mutex struct {
state int32
sema uint32
}
所以對於阻塞在同一個sync.Mutex上的 goroutine,他們的addr是相同的,所以他們會被加入到同一個sudog鏈表中。如果是不同的sync.Mutex鎖,他們的addr是不同的,那麼他們會被加入到這個 treap 不同的節點。
進而,你可以知道,這個rootSema是維護多個sync.Mutex的等待隊列的,可以快速找到不同的sync.Mutex的等待隊列, 也可以維護同一個sync.Mutex的等待隊列。這給了我們啓發,如果你有類似的需求,可以參考這個實現。
③ 就是設置這個節點的優先級,它是一個隨機值,用於保持 treap 的平衡。這裏有個技巧就是總是把優先級最低位設置爲 1,這樣保證優先級不爲 0. 因爲優先級經常和 0 做比較,我們將最低位設置爲 1,就表明優先級已經設置。
④ 就是將這個新加入的節點旋轉到合適的位置,以保持 treap 的平衡。這裏的旋轉操作就是上面提到的左旋和右旋。稍後看。
移除一個元素(出隊)
對應的,還有出對的操作。這個操作就是從 treap 中移除一個節點,這個節點就是一個等待的 goroutine(sudog)。
dequeue 搜索並找到在semaRoot中第一個因addr而阻塞的goroutine。比如需要喚醒一個 goroutine, 讓它繼續執行 (比如直接將鎖交給它,或者喚醒它去爭搶鎖)。
func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now, tailtime int64) {
ps := &root.treap
s := *ps
for ; s != nil; s = *ps { // ①, 二叉搜索樹查找
if s.elem == unsafe.Pointer(addr) { // ②
goto Found
}
if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
ps = &s.prev
} else {
ps = &s.next
}
}
return nil, 0, 0
Found: // ③
...
if t := s.waitlink; t != nil { // ④
*ps = t
...
} else { // ⑤
// 旋轉s到葉節點,以便刪除
for s.next != nil || s.prev != nil {
if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
root.rotateRight(s)
} else {
root.rotateLeft(s)
}
}
// ⑤ 刪除s
if s.parent != nil {
if s.parent.prev == s {
s.parent.prev = nil
} else {
s.parent.next = nil
}
} else {
root.treap = nil
}
tailtime = s.acquiretime
}
... // 清理s的不需要的信息
return s, now, tailtime
}
① 是遍歷 treap 的過程,當然它是通過搜索二叉樹的方式實現。addr就是我們一開始講的 treap 的 key,也就是 s.elem。如果找到了,就跳到 Found 標籤。如果沒有找到,就返回 nil。
④ 是檢查這個地址上是不是有多個等待的 goroutine,如果有,就把這個節點替換成鏈表中的下一個節點。把這個節點從 treap 中移除並返回。如果就一個 goroutine,那麼把這個移除掉後,需要旋轉 treap,直到這個節點被旋轉到葉節點,然後刪除這個節點。
這裏的旋轉操作就是上面提到的左旋和右旋。
左旋 rotateLeft
rotateLeft 函數將以 x 爲根的子樹左旋,使其變爲 y 爲根的子樹。左旋之前的結構爲 (x a (y b c)),旋轉後變爲 (y (x a b) c)。
func (root *semaRoot) rotateLeft(x *sudog) {
// p -> (x a (y b c))
p := x.parent
y := x.next
b := y.prev
y.prev = x // ①
x.parent = y // ②
x.next = b // ③
if b != nil {
b.parent = x // ④
}
y.parent = p // ⑤
if p == nil {
root.treap = y // ⑥
} else if p.prev == x { // ⑦
p.prev = y
} else {
if p.next != x {
throw("semaRoot rotateLeft")
}
p.next = y
}
}
具體步驟:
-
將
y設爲x的父節點 (②),x設爲y的左子節點 (①)。 -
將
b設爲x的右子節點 (③),並更新其父節點爲x(④)。 -
更新
y的父節點爲p(⑤),即x的原父節點。如果p爲 nil,則 y 成爲新的樹根 (⑥)。 -
根據
y是p的左子節點還是右子節點,更新對應的指針 (⑦)。
左旋爲
右旋 rotateRight
rotateRight 旋轉以節點 y 爲根的樹。將 (y (x a b) c) 變爲 (x a (y b c))。
func (root *semaRoot) rotateRight(y *sudog) {
// p -> (y (x a b) c)
p := y.parent
x := y.prev
b := x.next
x.next = y // ①
y.parent = x // ②
y.prev = b // ③
if b != nil {
b.parent = y // ④
}
x.parent = p // ⑤
if p == nil {
root.treap = x // ⑥
} else if p.prev == y { // ⑦
p.prev = x
} else {
if p.next != y {
throw("semaRoot rotateRight")
}
p.next = x
}
}
具體步驟:
-
將 y 設爲 x 的右子節點 (①), x 設爲 y 的父節點 (②)
-
將 b 設爲 y 的左子節點 (③),並更新其父節點爲 y(④)
-
更新 x 的父節點爲 p(⑤),即 y 的原父節點。如果 p 爲 nil,則 x 成爲新的樹根 (⑥)
-
根據 x 是 p 的左子節點還是右子節點,更新對應的指針 (⑦)
右旋爲
理解了左旋和右旋,你就理解了出隊代碼中這一段爲什麼把當前節點旋轉到葉結點中了:
// 旋轉s到葉節點,以便刪除
for s.next != nil || s.prev != nil {
if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
root.rotateRight(s)
} else {
root.rotateLeft(s)
}
}
整體上看,treap 這個數據結構確實簡單可維護。左旋和右旋的代碼量很少,結合圖看起來也容易理解。出入隊的代碼也很簡單,只是簡單的二叉搜索樹的操作,加上旋轉操作。
這是我介紹的 Go 祕而不宣的數據結構第三篇,希望你喜歡。你還希望看到 Go 運行時和標準庫中的哪些數據結構呢,歡迎留言。
參考資料
[1]
sema.go#semaRoot: https://github.com/golang/go/blob/master/src/runtime/sema.go#L40
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/VLKMHMs0iQyzPvikcNjUNQ