揭祕一致性 Hash 算法應用!

一致性 Hash 算法是解決分佈式緩存等問題的一種算法。本文介紹了一致性 Hash 算法的原理,並給出了一種實現和實際運用的案例。

一、背景

考慮這麼一種場景:我們有三臺緩存服務器編號 node0、node1、node2,現在有 3000 萬個 key,希望可以將這些個 key 均勻的緩存到三臺機器上,你會想到什麼方案呢?

我們可能首先想到的方案是:取模算法 hash(key)%N,即:對 key 進行 hash 運算後取模,N 是機器的數量。

這樣,對 key 進行 hash 後的結果對 3 取模,得到的結果一定是 0、1 或者 2,正好對應服務器 node0、node1、node2,存取數據直接找對應的服務器即可,簡單粗暴,完全可以解決上述的問題。

取模算法雖然使用簡單,但對機器數量取模,在集羣擴容和收縮時卻有一定的侷限性:因爲在生產環境中根據業務量的大小,調整服務器數量是常有的事**,而****服務器數量 N 發生變化後 hash(key)%N 計算的結果也會隨之變化**!

比如:一個服務器節點掛了,計算公式從 hash(key)% 3 變成了 hash(key)% 2,結果會發生變化,此時想要訪問一個 key,這個 key 的緩存位置大概率會發生改變,那麼之前緩存 key 的數據也會失去作用與意義。

大量緩存在同一時間失效,造成緩存的雪崩,進而導致整個緩存系統的不可用,這基本上是不能接受的。爲了解決優化上述情況,一致性 hash 算法應運而生~

二、一致性 Hash 算法詳述

一算法原理

一致性哈希算法在 1997 年由麻省理工學院提出,是一種特殊的哈希算法,在移除或者添加一個服務器時,能夠儘可能小地改變已存在的服務請求與處理請求服務器之間的映射關係;

一致性哈希解決了簡單哈希算法在分佈式哈希表(Distributed Hash Table,DHT)中存在的動態伸縮等問題。

一致性 hash 算法本質上也是一種取模算法。不過,不同於上邊按服務器數量取模,一致性 hash 是對固定值 2^32 取模。

IPv4 的地址是 4 組 8 位 2 進制數組成,所以用 2^32 可以保證每個 IP 地址會有唯一的映射。

我們可以將這 2^32 個值抽象成一個圓環⭕️,圓環的正上方的點代表 0,順時針排列,以此類推:1、2、3… 直到 2^32-1,而這個由 2 的 32 次方個點組成的圓環統稱爲 hash 環。

在對服務器進行映射時,使用 hash(服務器 ip)% 2^32,即:使用服務器 IP 地址進行 hash 計算,用哈希後的結果對 2^32 取模,結果一定是一個 0 到 2^32-1 之間的整數。

而這個整數映射在 hash 環上的位置代表了一個服務器,依次將 node0、node1、node2 三個緩存服務器映射到 hash 環上。

在對對應的 Key 映射到具體的服務器時,需要首先計算 Key 的 Hash 值:hash(key)% 2^32。

:此處的 Hash 函數可以和之前計算服務器映射至 Hash 環的函數不同,只要保證取值範圍和 Hash 環的範圍相同即可(即:2^32)

將 Key 映射至服務器遵循下面的邏輯:

從緩存對象 key 的位置開始,沿順時針方向遇到的第一個服務器,便是當前對象將要緩存到的服務器

假設我們有 “semlinker”、“kakuqo”、“lolo”、“fer” 四個對象,分別簡寫爲 o1、o2、o3 和 o4。

首先,使用哈希函數計算這個對象的 hash 值,值的範圍是 [0,2^32-1]:

圖中對象的映射關係如下:

hash(o1) = k1; hash(o2) = k2;
hash(o3) = k3; hash(o4) = k4;

同時 3 臺緩存服務器,分別爲 CS1、CS2 和 CS3:

則可知,各對象和服務器的映射關係如下:

K1 => CS1
K4 => CS3
K2 => CS2
K3 => CS1

即:

以上便是一致性 Hash 的工作原理。

可以看到,一致性 Hash 就是:將原本單個點的 Hash 映射,轉變爲了在一個環上的某個片段上的映射

下面我們來看幾種服務器擴縮容的場景。

(二)服務器擴縮容場景

假設 CS3 服務器出現故障導致服務下線,這時原本存儲於 CS3 服務器的對象 o4,需要被重新分配至 CS2 服務器,其它對象仍存儲在原有的機器上:

此時受影響的數據只有 CS2 和 CS3 服務器之間的部分數據!

假如業務量激增,我們需要增加一臺服務器 CS4,經過同樣的 hash 運算,該服務器最終落於 t1 和 t2 服務器之間,具體如下圖所示:

此時,只有 t1 和 t2 服務器之間的部分對象需要重新分配。

在以上示例中只有 o3 對象需要重新分配,即它被重新到 CS4 服務器。

在前面我們已經說過:如果使用簡單的取模方法,當新添加服務器時可能會導致大部分緩存失效,而使用一致性哈希算法後,這種情況得到了較大的改善,因爲只有少部分對象需要重新分配!

(三)數據偏斜 & 服務器性能平衡問題

在上面給出的例子中,各個服務器幾乎是平均被均攤到 Hash 環上。

但是在實際場景中很難選取到一個 Hash 函數這麼完美的將各個服務器散列到 Hash 環上。

此時,在服務器節點數量太少的情況下,很容易因爲節點分佈不均勻而造成數據傾斜問題。

如下圖被緩存的對象大部分緩存在 node-4 服務器上,導致其他節點資源浪費,系統壓力大部分集中在 node-4 節點上,這樣的集羣是非常不健康的:

同時,還有另一個問題:

在上面新增服務器 CS4 時,CS4 只分擔了 CS1 服務器的負載,服務器 CS2 和 CS3 並沒有因爲 CS4 服務器的加入而減少負載壓力;如果 CS4 服務器的性能與原有服務器的性能一致甚至可能更高,那麼這種結果並不是我們所期望的。

針對上面的問題,我們可以通過:引入虛擬節點來解決負載不均衡的問題:即將每臺物理服務器虛擬爲一組虛擬服務器,將虛擬服務器放置到哈希環上,如果要確定對象的服務器,需先確定對象的虛擬服務器,再由虛擬服務器確定物理服務器

如下圖所示:

在圖中:o1 和 o2 表示對象,v1~v6 表示虛擬服務器,s1~s3 表示實際的物理服務器。

虛擬節點的 hash 計算通常可以採用:對應節點的 IP 地址加數字編號後綴 hash(10.24.23.227#1) 的方式;

舉個例子,node-1 節點 IP 爲 10.24.23.227,正常計算 node-1 的 hash 值:

假設我們給 node-1 設置三個虛擬節點,node-1#1、node-1#2、node-1#3,對它們進行 hash 後取模:

注意:

(四)使用場景

一致性 hash 在分佈式系統中應該是實現負載均衡的首選算法,它的實現比較靈活,既可以在客戶端實現,也可以在中間件上實現,比如日常使用較多的緩存中間件 memcached 和 redis 集羣都有用到它。

memcached 的集羣比較特殊,嚴格來說它只能算是僞集羣,因爲它的服務器之間不能通信,請求的分發路由完全靠客戶端來的計算出緩存對象應該落在哪個服務器上,而它的路由算法用的就是一致性 hash。

還有 redis 集羣中 hash 槽的概念,雖然實現不盡相同,但思想萬變不離其宗,看完本篇的一致性 hash,你再去理解 redis 槽位就輕鬆多了。

其它的應用場景還有很多:

三、一致性 Hash 算法實現

下面我們根據上面的講述,使用 Golang 實現一個一致性 Hash 算法,這個算法具有一些下面的功能特性:

具體源代碼見:
https://github.com/JasonkayZK/consistent-hashing-demo

下面開始實現吧!

(一)結構體、錯誤以及常量定義

首先定義每一臺緩存服務器的數據結構:core/host.go

type Host struct {
  // the host id: ip:port
  Name string
  // the load bound of the host
  LoadBound int64
}

其中:

其次,定義一致性 Hash 的結構:core/algorithm.go

// Consistent is an implementation of consistent-hashing-algorithm
type Consistent struct {
  // the number of replicas
  replicaNum int
  // the total loads of all replicas
  totalLoad int64
  // the hash function for keys
  hashFunc func(key string) uint64
  // the map of virtual nodes  to hosts
  hostMap map[string]*Host
  // the map of hashed virtual nodes to host name
  replicaHostMap map[uint64]string
  // the hash ring
  sortedHostsHashSet []uint64
  // the hash ring lock
  sync.RWMutex
}

其中:

大概的結構如上所示,下面我們來看一些常量和錯誤的定義。

常量的定義如下:core/algorithm.go

const (
  // The format of the host replica name
  hostReplicaFormat = `%s%d`
)
var (
  // the default number of replicas
  defaultReplicaNum = 10
  // the load bound factor
  // ref: https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
  loadBoundFactor = 0.25
  // the default Hash function for keys
  defaultHashFunc = func(key string) uint64 {
    out := sha512.Sum512([]byte(key))
    return binary.LittleEndian.Uint64(out[:])
  }
)

分別表示:

還有一些錯誤的定義:core/error.go

var (
  ErrHostAlreadyExists = errors.New("host already exists")
  ErrHostNotFound = errors.New("host not found")
)

分別表示服務器已經註冊,以及緩存服務器未找到。

下面來看具體的方法實現!

(二)註冊 / 註銷緩存服務器

註冊緩存服務器的代碼如下:core/algorithm.go

func (c *Consistent) RegisterHost(hostName string) error {
  c.Lock()
  defer c.Unlock()
  if _, ok := c.hostMap[hostName]; ok {
    return ErrHostAlreadyExists
  }
  c.hostMap[hostName] = &Host{
    Name:      hostName,
    LoadBound: 0,
  }
  for i := 0; i < c.replicaNum; i++ {
    hashedIdx := c.hashFunc(fmt.Sprintf(hostReplicaFormat, hostName, i))
    c.replicaHostMap[hashedIdx] = hostName
    c.sortedHostsHashSet = append(c.sortedHostsHashSet, hashedIdx)
  }
  // sort hashes in ascending order
  sort.Slice(c.sortedHostsHashSet, func(i int, j int) bool {
    if c.sortedHostsHashSet[i] < c.sortedHostsHashSet[j] {
      return true
    }
    return false
  })
  return nil
}

代碼比較簡單,簡單說一下。

首先,檢查服務器是否已經註冊,如果已經註冊,則直接返回已經註冊的錯誤。

隨後,創建一個 Host 對象,並且在 for 循環中創建多個虛擬節點:

最後,對 Hash 環進行排序。

這裏使用數組作爲 Hash 環只是爲了便於說明,在實際實現中建議選用其他數據結構進行實現,以獲取更好的性能。

當緩存服務器信息寫入 replicaHostMap 映射以及 Hash 環後,即完成了緩存服務器的註冊。

註銷緩存服務器的代碼如下:core/algorithm.go

func (c *Consistent) UnregisterHost(hostName string) error {
  c.Lock()
  defer c.Unlock()
  if _, ok := c.hostMap[hostName]; !ok {
    return ErrHostNotFound
  }
  delete(c.hostMap, hostName)
  for i := 0; i < c.replicaNum; i++ {
    hashedIdx := c.hashFunc(fmt.Sprintf(hostReplicaFormat, hostName, i))
    delete(c.replicaHostMap, hashedIdx)
    c.delHashIndex(hashedIdx)
  }
  return nil
}
// Remove hashed host index from the hash ring
func (c *Consistent) delHashIndex(val uint64) {
  idx := -1
  l := 0
  r := len(c.sortedHostsHashSet) - 1
  for l <= r {
    m := (l + r) / 2
    if c.sortedHostsHashSet[m] == val {
      idx = m
      break
    } else if c.sortedHostsHashSet[m] < val {
      l = m + 1
    } else if c.sortedHostsHashSet[m] > val {
      r = m - 1
    }
  }
  if idx != -1 {
    c.sortedHostsHashSet = append(c.sortedHostsHashSet[:idx], c.sortedHostsHashSet[idx+1:]...)
  }
}

和註冊緩存服務器相反,將服務器在 Map 映射以及 Hash 環中去除即完成了註銷。

這裏的邏輯和上面註冊的邏輯極爲類似,這裏不再贅述!

(三)查詢 Key(核心)

查詢 Key 是整個一致性 Hash 算法的核心,但是實現起來也並不複雜。

代碼如下:core/algorithm.go

func (c *Consistent) GetKey(key string) (string, error) {
  hashedKey := c.hashFunc(key)
  idx := c.searchKey(hashedKey)
  return c.replicaHostMap[c.sortedHostsHashSet[idx]], nil
}
func (c *Consistent) searchKey(key uint64) int {
  idx := sort.Search(len(c.sortedHostsHashSet), func(i int) bool {
    return c.sortedHostsHashSet[i] >= key
  })
  if idx >= len(c.sortedHostsHashSet) {
    // make search as a ring
    idx = 0
  }
  return idx
}

代碼首先計算 key 的散列值;

隨後,在 Hash 環上 “順時針” 尋找可以緩存的第一臺緩存服務器:

idx := sort.Search(len(c.sortedHostsHashSet), func(i int) bool {
    return c.sortedHostsHashSet[i] >= key
})

注意到,如果 key 比當前 Hash 環中最大的虛擬節點的 hash 值還大,則選擇當前 Hash 環中 hash 值最小的一個節點(即 “環形” 的邏輯):

if idx >= len(c.sortedHostsHashSet) {
    // make search as a ring
    idx = 0
}

searchKey 返回了虛擬節點在 Hash 環數組中的 index。

隨後,我們使用 map 返回 index 對應的緩存服務器的名稱即可。至此,一致性 Hash 算法基本實現,接下來我們來驗證一下。

四、一致性 Hash 算法實踐與檢驗

(一)算法驗證前準備

在驗證算法之前,我們還需要準備幾臺緩存服務器。

爲了簡單起見,這裏使用了 HTTP 服務器作爲緩存服務器,具體代碼如下所示:server/main.go

package main
import (
  "flag"
  "fmt"
  "net/http"
  "sync"
  "time"
)
type CachedMap struct {
  KvMap sync.Map
  Lock  sync.RWMutex
}
var (
  cache = CachedMap{KvMap: sync.Map{}}
  port = flag.String("p", "8080", "port")
  regHost = "http://localhost:18888"
  expireTime = 10
)
func main() {
  flag.Parse()
  stopChan := make(chan interface{})
  startServer(*port)
  <-stopChan
}
func startServer(port string) {
  hostName := fmt.Sprintf("localhost:%s", port)
  fmt.Printf("start server: %s\n", port)
  err := registerHost(hostName)
  if err != nil {
    panic(err)
  }
  http.HandleFunc("/", kvHandle)
  err = http.ListenAndServe(":"+port, nil)
  if err != nil {
    err = unregisterHost(hostName)
    if err != nil {
      panic(err)
    }
    panic(err)
  }
}
func kvHandle(w http.ResponseWriter, r *http.Request) {
  _ = r.ParseForm()
  if _, ok := cache.KvMap.Load(r.Form["key"][0]); !ok {
    val := fmt.Sprintf("hello: %s", r.Form["key"][0])
    cache.KvMap.Store(r.Form["key"][0], val)
    fmt.Printf("cached key: {%s: %s}\n", r.Form["key"][0], val)
    time.AfterFunc(time.Duration(expireTime)*time.Second, func() {
      cache.KvMap.Delete(r.Form["key"][0])
      fmt.Printf("removed cached key after 3s: {%s: %s}\n", r.Form["key"][0], val)
    })
  }
  val, _ := cache.KvMap.Load(r.Form["key"][0])
  _, err := fmt.Fprintf(w, val.(string))
  if err != nil {
    panic(err)
  }
}
func registerHost(host string) error {
  resp, err := http.Get(fmt.Sprintf("%s/register?host=%s", regHost, host))
  if err != nil {
    return err
  }
  defer resp.Body.Close()
  return nil
}
func unregisterHost(host string) error {
  resp, err := http.Get(fmt.Sprintf("%s/unregister?host=%s", regHost, host))
  if err != nil {
    return err
  }
  defer resp.Body.Close()
  return nil
}

代碼接受由命令行指定的 - p 參數指定服務器端口號。

代碼執行後,會調用 startServer 函數啓動一個 http 服務器。

在 startServer 函數中,首先調用 registerHost 在代理服務器上進行註冊(下文會講),並監聽 / 路徑,具體代碼如下:

func startServer(port string) {
  hostName := fmt.Sprintf("localhost:%s", port)
  fmt.Printf("start server: %s\n", port)
  err := registerHost(hostName)
  if err != nil {
    panic(err)
  }
  http.HandleFunc("/", kvHandle)
  err = http.ListenAndServe(":"+port, nil)
  if err != nil {
    err = unregisterHost(hostName)
    if err != nil {
      panic(err)
    }
    panic(err)
  }
}

kvHandle 函數對請求進行處理:

func kvHandle(w http.ResponseWriter, r *http.Request) {
  _ = r.ParseForm()
  if _, ok := cache.KvMap.Load(r.Form["key"][0]); !ok {
    val := fmt.Sprintf("hello: %s", r.Form["key"][0])
    cache.KvMap.Store(r.Form["key"][0], val)
    fmt.Printf("cached key: {%s: %s}\n", r.Form["key"][0], val)
    time.AfterFunc(time.Duration(expireTime)*time.Second, func() {
      cache.KvMap.Delete(r.Form["key"][0])
      fmt.Printf("removed cached key after 3s: {%s: %s}\n", r.Form["key"][0], val)
    })
  }
  val, _ := cache.KvMap.Load(r.Form["key"][0])
  _, err := fmt.Fprintf(w, val.(string))
  if err != nil {
    panic(err)
  }
}

首先,解析來自路徑的參數:?key=xxx。

隨後,查詢服務器中的緩存(爲了簡單起見,這裏使用 sync.Map 來模擬緩存):如果緩存不存在,則寫入緩存,並通過 time.AfterFunc 設置緩存過期時間(expireTime)。

最後,返回緩存。

有了緩存服務器之後,我們還需要一個代理服務器來選擇具體選擇哪個緩存服務器來請求。

代碼如下:proxy/proxy.go

package proxy
import (
  "fmt"
  "github.com/jasonkayzk/consistent-hashing-demo/core"
  "io/ioutil"
  "net/http"
  "time"
)
type Proxy struct {
  consistent *core.Consistent
}
// NewProxy creates a new Proxy
func NewProxy(consistent *core.Consistent) *Proxy {
  proxy := &Proxy{
    consistent: consistent,
  }
  return proxy
}
func (p *Proxy) GetKey(key string) (string, error) {
  host, err := p.consistent.GetKey(key)
  if err != nil {
    return "", err
  }
  resp, err := http.Get(fmt.Sprintf("http://%s?key=%s", host, key))
  if err != nil {
    return "", err
  }
  defer resp.Body.Close()
  body, _ := ioutil.ReadAll(resp.Body)
  fmt.Printf("Response from host %s: %s\n", host, string(body))
  return string(body), nil
}
func (p *Proxy) RegisterHost(host string) error {
  err := p.consistent.RegisterHost(host)
  if err != nil {
    return err
  }
  fmt.Println(fmt.Sprintf("register host: %s success", host))
  return nil
}
func (p *Proxy) UnregisterHost(host string) error {
  err := p.consistent.UnregisterHost(host)
  if err != nil {
    return err
  }
  fmt.Println(fmt.Sprintf("unregister host: %s success", host))
  return nil
}

代理服務器的邏輯很簡單,就是創建一個一致性 Hash 結構:Consistent,把 Consistent 和請求緩存服務器的邏輯進行了一層封裝;

(二)算法驗證

啓動代理服務器的代碼如下:

package main
import (
  "fmt"
  "github.com/jasonkayzk/consistent-hashing-demo/core"
  "github.com/jasonkayzk/consistent-hashing-demo/proxy"
  "net/http"
)
var (
  port = "18888"
  p = proxy.NewProxy(core.NewConsistent(10, nil))
)
func main() {
  stopChan := make(chan interface{})
  startServer(port)
  <-stopChan
}
func startServer(port string) {
  http.HandleFunc("/register", registerHost)
  http.HandleFunc("/unregister", unregisterHost)
  http.HandleFunc("/key", getKey)
  fmt.Printf("start proxy server: %s\n", port)
  err := http.ListenAndServe(":"+port, nil)
  if err != nil {
    panic(err)
  }
}
func registerHost(w http.ResponseWriter, r *http.Request) {
  _ = r.ParseForm()
  err := p.RegisterHost(r.Form["host"][0])
  if err != nil {
    w.WriteHeader(http.StatusInternalServerError)
    _, _ = fmt.Fprintf(w, err.Error())
    return
  }
  _, _ = fmt.Fprintf(w, fmt.Sprintf("register host: %s success", r.Form["host"][0]))
}
func unregisterHost(w http.ResponseWriter, r *http.Request) {
  _ = r.ParseForm()
  err := p.UnregisterHost(r.Form["host"][0])
  if err != nil {
    w.WriteHeader(http.StatusInternalServerError)
    _, _ = fmt.Fprintf(w, err.Error())
    return
  }
  _, _ = fmt.Fprintf(w, fmt.Sprintf("unregister host: %s success", r.Form["host"][0]))
}
func getKey(w http.ResponseWriter, r *http.Request) {
  _ = r.ParseForm()
  val, err := p.GetKey(r.Form["key"][0])
  if err != nil {
    w.WriteHeader(http.StatusInternalServerError)
    _, _ = fmt.Fprintf(w, err.Error())
    return
  }
  _, _ = fmt.Fprintf(w, fmt.Sprintf("key: %s, val: %s", r.Form["key"][0], val))
}

和緩存服務器類似,這裏採用 HTTP 服務器來模擬。

代理服務器監聽 18888 端口的幾個路由:

這裏爲了簡單起見,使用了這種方式進行服務註冊,實際使用時請使用其他組件進行實現!

接下來啓動緩存服務器:

start proxy server: 18888

分別啓動三個緩存服務器:

$ go run server/main.go -p 8080
start server: 8080
$ go run server/main.go -p 8081
start server: 8081
$ go run server/main.go -p 8082
start server: 8082

同時,代理服務器輸出:

register host: localhost:8080 success
register host: localhost:8081 success
register host: localhost:8082 success

可以看到緩存服務器已經成功註冊。

可以使用 curl 命令請求代理服務器獲取緩存 key:

$ curl localhost:18888/key?key=123
key: 123, val: hello: 123

此時,代理服務器輸出:

Response from host localhost:8080: hello: 123

同時,8000 端口的緩存服務器輸出:

cached key: {123: hello: 123}
removed cached key after 10s: {123: hello: 123}

可以看到,8000 端口的服務器對 key 值進行了緩存,並在 10 秒後清除了緩存;

嘗試獲取多個 Key:

Response from host localhost:8082: hello: 45363456
Response from host localhost:8080: hello: 4
Response from host localhost:8082: hello: 1
Response from host localhost:8080: hello: 2
Response from host localhost:8082: hello: 3
Response from host localhost:8080: hello: 4
Response from host localhost:8082: hello: 5
Response from host localhost:8080: hello: 6
Response from host localhost:8082: hello: sdkbnfoerwtnbre
Response from host localhost:8082: hello: sd45555254tg423i5gvj4v5
Response from host localhost:8081: hello: 0
Response from host localhost:8082: hello: 032452345

可以看到不同的 key 被散列到了不同的緩存服務器。

接下來我們通過 debug 查看具體的變量來一探究竟。

開啓 debug,並註冊單個緩存服務器後,查看 Consistent 中的值:

註冊三個緩存服務器後,查看 Consistent 中的值:

從 debug 中的變量,我們就可以很清楚的看到註冊不同數量的服務器時,一致性 Hash 上服務器的動態變化!

以上就是基本的一致性 Hash 算法的實現了!

但是很多時候,我們的緩存服務器需要同時處理大量的緩存請求,而通過上面的算法,我們總是會去同一臺緩存服務器去獲取緩存數據。

如果很多的熱點數據都落在了同一臺緩存服務器上,則可能會出現性能瓶頸。

Google 在 2017 年提出了:含有負載邊界值的一致性 Hash 算法。

下面我們在基本的一致性 Hash 算法的基礎上,實現含有負載邊界值的一致性 Hash!

五、含有負載邊界值的一致性 Hash

(一)算法描述

17 年時,Google 提出了含有負載邊界值的一致性 Hash 算法,此算法主要應用於在實現一致性的同時,實現負載的平均性。

此算法最初由 Vimeo 的 Andrew Rodland 在 haproxy 中實現並開源。

參考:
https://ai.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html

arvix 論文地址:
https://arxiv.org/abs/1608.01350

這個算法將緩存服務器視爲一個含有一定容量的桶(可以簡單理解爲 Hash 桶),將客戶端視爲球,則平均性目標表示爲:所有約等於平均密度(球的數量除以桶的數量):

實際使用時,可以設定一個平均密度的參數ε,將每個桶的容量設置爲平均加載時間的下上限 (1+ε);

具體的計算過程如下:

例如下面的圖:

使用哈希函數將 6 個球和 3 個桶分配給 Hash 環上的隨機位置,假設每個桶的容量設置爲 2,按 ID 值的遞增順序分配球。

然後 6 號球順時針移動,首先擊中 B 桶;但是桶 B 的容量爲 2,並且已經包含球 3 和 4,所以球 6 繼續移動到達桶 C,但該桶也已滿;最後,球 6 最終進入具有備用插槽的桶 A。

(二)算法實現

在上面基本一致性 Hash 算法實現的基礎上,我們繼續實現含有負載邊界值的一致性 Hash 算法。

在覈心算法中添加根據負載情況查詢 Key 的函數,以及增加 / 釋放負載值的函數。

根據負載情況查詢 Key 的函數:core/algorithm.go

func (c *Consistent) GetKeyLeast(key string) (string, error) {
  c.RLock()
  defer c.RUnlock()
  if len(c.replicaHostMap) == 0 {
    return "", ErrHostNotFound
  }
  hashedKey := c.hashFunc(key)
  idx := c.searchKey(hashedKey) // Find the first host that may serve the key
  i := idx
  for {
    host := c.replicaHostMap[c.sortedHostsHashSet[i]]
    loadChecked, err := c.checkLoadCapacity(host)
    if err != nil {
      return "", err
    }
    if loadChecked {
      return host, nil
    }
    i++
    // if idx goes to the end of the ring, start from the beginning
    if i >= len(c.replicaHostMap) {
      i = 0
    }
  }
}
func (c *Consistent) checkLoadCapacity(host string) (bool, error) {
  // a safety check if someone performed c.Done more than needed
  if c.totalLoad < 0 {
    c.totalLoad = 0
  }
  var avgLoadPerNode float64
  avgLoadPerNode = float64((c.totalLoad + 1) / int64(len(c.hostMap)))
  if avgLoadPerNode == 0 {
    avgLoadPerNode = 1
  }
  avgLoadPerNode = math.Ceil(avgLoadPerNode * (1 + loadBoundFactor))
  candidateHost, ok := c.hostMap[host]
  if !ok {
    return false, ErrHostNotFound
  }
  if float64(candidateHost.LoadBound)+1 <= avgLoadPerNode {
    return true, nil
  }
  return false, nil
}

在 GetKeyLeast 函數中,首先根據 searchKey 函數,順時針獲取可能滿足條件的第一個虛擬節點。

隨後調用 checkLoadCapacity 校驗當前緩存服務器的負載數是否滿足條件:

candidateHost.LoadBound+1<=(c.totalLoad+1)/len(hosts)*(1+loadBoundFactor)

如果不滿足條件,則沿着 Hash 環走到下一個虛擬節點,繼續判斷是否滿足條件,直到滿足條件。

這裏使用的是無條件的 for 循環,因爲一定存在低於平均負載 *(1+loadBoundFactor) 的虛擬節點!

增加 / 釋放負載值的函數:core/algorithm.go

func (c *Consistent) Inc(hostName string) {
  c.Lock()
  defer c.Unlock()
  atomic.AddInt64(&c.hostMap[hostName].LoadBound, 1)
  atomic.AddInt64(&c.totalLoad, 1)
}
func (c *Consistent) Done(host string) {
  c.Lock()
  defer c.Unlock()
  if _, ok := c.hostMap[host]; !ok {
    return
  }
  atomic.AddInt64(&c.hostMap[host].LoadBound, -1)
  atomic.AddInt64(&c.totalLoad, -1)
}

邏輯比較簡單,就是原子的對對應緩存服務器進行負載加減一操作。

(三)算法測試

在代理服務器中增加路由:proxy/proxy.go

func (p *Proxy) GetKeyLeast(key string) (string, error) {
  host, err := p.consistent.GetKeyLeast(key)
  if err != nil {
    return "", err
  }
  p.consistent.Inc(host)
  time.AfterFunc(time.Second*10, func() { // drop the host after 10 seconds(for testing)!
    fmt.Printf("dropping host: %s after 10 second\n", host)
    p.consistent.Done(host)
  })
  resp, err := http.Get(fmt.Sprintf("http://%s?key=%s", host, key))
  if err != nil {
    return "", err
  }
  defer resp.Body.Close()
  body, _ := ioutil.ReadAll(resp.Body)
  fmt.Printf("Response from host %s: %s\n", host, string(body))
  return string(body), nil
}

注意:這裏模擬的是單個 key 請求可能會持續 10s 鍾。

啓動代理服務器時增加路由:main.go

func startServer(port string) {
    // ......
  http.HandleFunc("/key_least", getKeyLeast)
  // ......
}
func getKeyLeast(w http.ResponseWriter, r *http.Request) {
  _ = r.ParseForm()
  val, err := p.GetKeyLeast(r.Form["key"][0])
  if err != nil {
    w.WriteHeader(http.StatusInternalServerError)
    _, _ = fmt.Fprintf(w, err.Error())
    return
  }
  _, _ = fmt.Fprintf(w, fmt.Sprintf("key: %s, val: %s", r.Form["key"][0], val))
}

啓動代理服務器,並開啓三臺緩存服務器。

通過下面的命令獲取含有負載邊界的 Key:

$ curl localhost:18888/key_least?key=123
key: 123, val: hello: 123

多次請求後的結果如下:

start proxy server: 18888
register host: localhost:8080 success
register host: localhost:8081 success
register host: localhost:8082 success
Response from host localhost:8080: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8082: hello: 123

可以看到,緩存被均攤到了其他服務器(這是由於一個緩存請求會持續 10s 導致的)!

六、總結

本文拋磚引玉的講解了一致性 Hash 算法的原理,並提供了 Go 的實現。在此基礎之上,根據 Google 的論文實現了帶有負載邊界的一致性 Hash 算法。

當然上面的代碼在實際生產環境下仍然需要部分改進,如:

大家在實際使用時,可以根據需要,搭配實際的組件!

參考資料:

  1. 不會一致性 hash 算法,勸你簡歷別寫搞過負載均衡

  2. 圖解一致性哈希算法

  3. 一致性 hash 算法詳解

  4. 一致哈希

6.Consistent Hashing with Bounded Loads 

  1. 源代碼: https://github.com/JasonkayZK/consistent-hashing-demo

作者簡介

張凱

騰訊後臺開發工程師

騰訊後臺開發工程師,畢業於華南理工大學。目前負責騰訊電子籤的後端開發工作。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/PvKlAaWcotqulUzTapJg3w