Golang 分佈式系統

一、分佈式 id 生成器

在高併發場景中,通常需要類似 MySQL 自增 id 一樣不斷增長且不會重複的 id。

比如某電商雙 11 時,在 0:00 開始,會有千萬到億級的訂單湧入,每秒要處理 10w+ 的訂單。在將訂單插入數據庫前,我們需要給訂單一個唯一的 id 再插入數據庫內。也正因爲訂單量大,一個無意義的純數字 id 在對數據庫進行增刪改查時不能起到優化作用。此 id 內應該包含一些時間信息,這樣即使後端的系統對消息進行了分庫分表,也能夠以時間順序對這些消息進行排序。

Twitter 的 snowflake 算法是這種場景下的一個典型解法,原理如圖:

首先確定的是,id 數值長度是 64 位,int64 類型,被分爲四個部分(不含開頭的符號 / unused ):

這樣的機制可以支持一臺機器在一毫秒內能夠產生4096條消息。一秒共 409.6w 條消息。從值域上來講完全夠用。

數據中心 id 加上實例 id 共有 10 位,每個數據中心可以部署 32 臺實例,搭建 32 個數據中心,所以可以一共部署 1024 臺實例。

41 位的時間戳(毫秒爲單位)能夠使用 69 年。

1 worker_id 分配

timestampdatacenter_idworker_idsequence_id這四個字段中,timestampsequence_id是由程序在運行期生成的。但datacenter_idworker_id需要在部署階段就能夠獲取得到,並且一旦程序啓動之後,就是不可更改的了。如果可以隨意更改,可能被不慎修改,造成最終生成的 id 有衝突。

一般不同數據中心的機器,會提供對應的獲取數據中心 id 的 API,所以datacenter_id我們可以在部署階段輕鬆地獲取到。而 worker_id 是我們邏輯上給機器分配的一個 id,這個要怎麼辦呢?比較簡單的想法是由能夠提供這種自增 id 功能的工具來支持,比如 MySQL:

mysql> insert into a (ip) values("10.1.2.101");
Query OK, 1 row affected (0.00 sec)

mysql> select last_insert_id();
+------------------+
| last_insert_id() |
+------------------+
|                2 |
+------------------+
1 row in set (0.00 sec)

從 MySQL 中獲取到worker_id之後,就把這個worker_id直接持久化到本地,以避免每次上線時都需要獲取新的worker_id。讓單實例的worker_id可以始終保持不變。

當然,使用 MySQL 相當於給我們簡單的 id 生成服務增加了一個外部依賴。依賴越多,我們的服務的可運維性就越差。

考慮到集羣中即使有單個 id 生成服務的實例掛了,也就是損失一段時間的一部分 id,所以我們也可以更簡單暴力一些,把worker_id直接寫在 worker 的配置中,上線時,由部署腳本完成worker_id字段替換。

2 開源實例

2.1 標準 snowflake

github.com/bwmarrin/snowflake 是一個相當輕量化的 snowflake 的 Go 實現。其文檔對各位使用的定義如下圖所示:

此庫和標準的 snowflake 實現方式全完一致,使用也比較簡單:

package main

import (
    // 示例代碼只導入提供核心功能的package,其他內置package自行導入,下同
    "github.com/bwmarrin/snowflake"
)

func main() {
    node, err := snowflake.NewNode(1)
    if err != nil {
        println(err.Error())
        os.Exit(1)
    }

    for i := 0; i < 20; i++ {
        id := node.Generate()

        fmt.Printf("Int64  ID: %d\n", id)
        fmt.Printf("String ID: %s\n", id)
        fmt.Printf("Base2  ID: %s\n", id.Base2())
        fmt.Printf("Base64 ID: %s\n", id.Base64())

        fmt.Printf("ID Time  : %d\n", id.Time())

        fmt.Printf("ID Node  : %d\n", id.Node())

        fmt.Printf("ID Step  : %d\n", id.Step())

        fmt.Println("---------------------------------------------")
    }

}

這個庫因爲是單文件,所以也方便定製,其源碼本身就預留了一些可定製字段:

var (
    // Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC in milliseconds
    // You may customize this to set a different epoch for your application.
    Epoch int64 = 1288834974657

    // NodeBits holds the number of bits to use for Node
    // Remember, you have a total 22 bits to share between Node/Step
    NodeBits uint8 = 10

    // StepBits holds the number of bits to use for Step
    // Remember, you have a total 22 bits to share between Node/Step
    StepBits uint8 = 12
)

Epoch爲起始時間,NodeBits是實例 id 的位長,StepBits是自增 id 的位長。

2.2 sonyflake

sonyflake 同樣是受 Twitter 的 snowflake 啓發而來,但 sonyflake 側重於多主機 / 實例的生命週期和性能,所以與 snowflake 使用不同的位分配:

sonyflake 的優點和缺點:

sonyflake 在啓動階段需要配置參數:

func NewSonyflake(st Settings) *Sonyflake

Settings數據結構如下:

type Settings struct {
    StartTime      time.Time
    MachineID      func() (uint16, error)
    CheckMachineID func(uint16) bool
}

使用示例:

package main

import (
    "github.com/sony/sonyflake"
)

func getMachineID() (uint16, error) {
    var machineID uint16 = 6
    return machineID, nil
}

func checkMachineID(machineID uint16) bool {
    existsMachines := []uint16{1, 2, 3, 4, 5}
    for _, v := range existsMachines {
        if v == machineID {
            return false
        }
    }
    return true
}

func main() {
    t, _ := time.Parse("2006-01-02""2021-01-01")
    settings := sonyflake.Settings{
        StartTime:      t,
        MachineID:      getMachineID,
        CheckMachineID: checkMachineID,
    }

    sf := sonyflake.NewSonyflake(settings)

    for i := 0; i < 10; i++ {
        id, err := sf.NextID()
        if err != nil {
            fmt.Println(err)
            os.Exit(1)
        }
        fmt.Println(id)
    }

}

二、分佈式鎖

單機程序併發或並行修改全局變量時,需要對修改行爲加鎖以創造臨界區。如果不加鎖,得到的結果將不準確,如:

package main

import (
    "sync"
)

// 全局變量
var counter int

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
        defer wg.Done()
            counter++
        }()
    }

    wg.Wait()
    println(counter)
}

多次運行結果不同:

➜  go run main.go              
884
➜  go run main.go
957
➜  go run main.go
923

1 進程內加鎖

想要得到正確的結果,要把計數器的操作代碼部分加上鎖:

    var wg sync.WaitGroup
    var lock sync.Mutex
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            lock.Lock()
            counter++
            lock.Unlock()
        }()
    }
    wg.Wait()
    println(counter)

這樣能夠得到正確結果:

➜  go run main.go
1000

2 trylock

在某些場景,我們只希望一個任務有單一的執行者,而不像計數器一樣,所有的 Goroutine 都成功執行。後續的 Goroutine 在搶鎖失敗後,需要放棄執行,這時候就需要嘗試加鎖 / trylock

嘗試加鎖,在加鎖成功後執行後續流程,失敗時不可以阻塞,而是直接返回加鎖的結果。

在 Go 語言中可以用大小爲 1 的 Channel 來模擬 trylock:

// Lock try lock
type Lock struct {
    c chan struct{}
}

// Lock try lock, return lock result
func (l Lock) Lock() bool {
    result := false
    select {
    case <-l.c:
        result = true
    default:
    }
    return result
}

// Unlock the try lock
func (l Lock) Unlock() {
    l.c <- struct{}{}
}

// NewLock generate a try lock
func NewLock() Lock {
    var l Lock
    l.c = make(chan struct{}, 1)
    l.c <- struct{}{}
    return l
}

func main() {
    var lock = NewLock()
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if !lock.Lock() {
                println("lock failed")
                return
            }
            counter++
            println("current counter: ", counter)
            lock.Unlock()
        }()
    }
    wg.Wait()
}

每個 Goruntine 只有成功執行了Lock()纔會繼續執行後續代碼,因此在Unlock()時可以保證 Lock 結構體裏的 Channel 一定是空的,所以不會阻塞也不會失敗。

在單機系統中,trylock 並不是一個好選擇,因爲大量的 Goruntine 搶鎖會無意義地佔用 cpu 資源,這就是活鎖。

活鎖指是的程序看起來在正常執行,但 cpu 週期被浪費在搶鎖而非執行任務上,從而程序整體的執行效率低下。活鎖的問題定位起來很麻煩,所以在單機場景下,不建議使用這種鎖。

3 基於 Redis 的 setnx

在分佈式場景中,也需要 “搶佔” 的邏輯,可以用 Redis 的setnx實現:

package main

import (
    "github.com/go-redis/redis"
)

func setnx() {
    client := redis.NewClient(&redis.Options{})

    var lockKey = "counter_lock"
    var counterKey = "counter"

    // lock
    resp := client.SetNX(lockKey, 1, time.Second*5)
    lockStatus, err := resp.Result()
    if err != nil || !lockStatus {
        println("lock failed")
        return
    }

    // counter++
    getResp := client.Get(counterKey)
    cntValue, err := getResp.Int64()
    if err == nil || err == redis.Nil {
        cntValue++
        resp := client.Set(counterKey, cntValue, 0)
        _, err := resp.Result()
        if err != nil {
            println(err)
        }
    }
    println("current counter is ", cntValue)

    // unlock
    delResp := client.Del(lockKey)
    unlockStatus, err := delResp.Result()
    if err == nil && unlockStatus > 0 {
        println("unlock success")
    } else {
        println("unlock failed", err)
    }
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            setnx()
        }()
    }
    wg.Wait()
}

運行結果:

➜  go run main.go
lock failed
lock failed
lock failed
lock failed
lock failed
lock failed
lock failed
lock failed
current counter is  34
lock failed
unlock success

通過代碼和執行結果可以看到,遠程調用setnx運行流程上和單機的 trolock 非常相似,如果獲取鎖失敗,那麼相關的任務邏輯就不會繼續向後執行。

setnx很適合高併發場景下用來爭搶一些 “唯一” 的資源。比如交易攝合系統中賣家發起訂單,多個買家會對其進行併發爭搶。這種場景我們沒有辦法依賴具體的時間來判斷先後,因爲不同設備的時間不能保證使用的是統一的時間,也就不能保證時序。

所以,我們需要依賴於這些請求到達 redis 節點的順序來做正確的搶鎖操作。

如果用戶的網絡環境比較差,是必然搶不到的。

4 基於 ZooKeeper

基於 ZooKeeper 的鎖與基於 Redis 的鎖不同之處在於 Lock 成功之前會一直阻塞,這與單機場景中的mutex.Lock很相似。

package main

import (
    "github.com/go-zookeeper/zk"
)

func main() {
    c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second)
    if err != nil {
        panic(err)
    }
    l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll))
    err = l.Lock()
    if err != nil {
        panic(err)
    }
    println("lock success, do your business logic")

    time.Sleep(time.Second * 10) // 模擬業務處理

    l.Unlock()
    println("unlock success, finish business logic")
}

其原理也是基於臨時 Sequence 節點和 watch API,例如我們這裏使用的是/lock節點。Lock 會在該節點下的節點列表中插入自己的值,只要節點下的子節點發生變化,就會通知所有 watch 該節點的程序。這時候程序會檢查當前節點下最小的子節點的 id 是否與自己的一致。如果一致,說明加鎖成功了。

這種分佈式的阻塞鎖比較適合分佈式任務調度場景,但不適合高頻次持鎖時間短的搶鎖場景。按照 Google 的 Chubby 論文裏的闡述,基於強一致協議的鎖適用於粗粒度的加鎖操作。這裏的粗粒度指鎖佔用時間較長。我們在使用時也應思考在自己的業務場景中使用是否合適。

5 基於 etcd

etcd 是與 ZooKeeper 類似的分佈式組件,也能實現與 ZooKeeper 鎖相似的功能:

package main

import (
    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
c := make(chan os.Signal)
    signal.Notify(c)

    client, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }
    defer client.Close()

    lockKey := "/lock"

    go func() {
        session, err := concurrency.NewSession(client)
        if err != nil {
            panic(err)
        }
        m := concurrency.NewMutex(session, lockKey)
        if err := m.Lock(context.Background()); err != nil {
            panic("go1 get mutex failed " + err.Error())
        }
        fmt.Printf("go1 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(10) * time.Second)
        m.Unlock(context.Background())
        fmt.Printf("go1 release lock\n")
    }()

    go func() {
        time.Sleep(time.Duration(2) * time.Second)
        session, err := concurrency.NewSession(client)
        if err != nil {
            panic(err)
        }
        m := concurrency.NewMutex(session, lockKey)
        if err := m.Lock(context.Background()); err != nil {
            panic("go2 get mutex failed " + err.Error())
        }
        fmt.Printf("go2 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(2) * time.Second)
        m.Unlock(context.Background())
        fmt.Printf("go2 release lock\n")
    }()

    <-c
}

三、延時任務系統

實時處理並反饋給用戶是系統的主要內容,但有時也會遇到非實時的任務需求,比如在確定的時間點發布重要公告,或者需要在用戶做了一件事情的幾分鐘後對其作出特定動作,比如發個優惠券。

如果業務規模比較小,有時也可以通過數據庫配置輪詢來對這種任務進行簡單處理,但一旦上了規模,就要尋找更好的解決方案。

一般有兩種思路來解決這個問題:

  1. 實現一套類似 crontab 的分佈式定時任務管理系統

  2. 實現一個支持定時發送消息的消息隊列

兩種思路衍生出了一些不同的系統,但本質都差不多,都是需要實現一個定時器(timer)。

在單機的場景下定時器其實並不少見,如在和網絡數據庫連接時會調用SetReadDeadline()函數,就是在本地創建一個定時器,在到達指定的時間後,我們會接收到定時器的通知,告訴我們時間已到,如果此時讀取未完成,就可以認爲是發生了網絡問題,從而中斷讀取。

下面從定時器開始, 探究延時任務系統的實現。

1 定時器的實現

定時器(timer)的實現在工業界已經是有解的問題了,常見的就是時間堆(堆排序)和時間輪。

1.1 時間堆

是一種經過排序的完全二叉樹,其中任一非終端節點的數據值均不大於(或不小於)其左子節點和右子節點的值。

最常見的時間堆一般用最小堆實現。

最小堆:根節點的鍵值是所有堆節點鍵值中最小者的堆

如下圖:

對於定時器來說,如果堆頂元素比當前的時間還要大,說明堆項內所有元素都比當前時間大,進而說明這個時刻我們還沒有必要對時間堆進行任何處理。定時檢查的時間複雜度是 𝑂(1) 。

當我們發現堆項的元素小於當前時間時,說明可能已經有一批事件過期了,這時進行正常的彈出的堆調操作就好。每次堆調整的時間複雜度是 𝑂(𝑙𝑜𝑔𝑁) 。

Go 自身的內置定時器就是用時間堆實現的,不過使用的不是二叉樹堆,而是扁平一些的四叉堆:

性質:父節點比其 4 個子節點都小,子節點之間沒有特別的大小關係要求。

四叉堆中元素超時和堆調整與二叉堆沒有本質區別。

1.2 時間輪

用時間輪來實現定時器時,需要定義每一個格子的 “刻度”,可以將時間想象成一個時鐘,中心有秒針順時針轉動,每次轉動到一個刻度時,需要去查看該刻度掛載的任務列表是否已經有到期的任務。

從結構上來講,時間輪和哈希表很相似,如果把哈希算法定義爲:觸發時間 % 時間輪元素大小,那麼這就是一個簡單的哈希表。在哈希衝突時,採用鏈表掛載哈希衝突的定時器。

除了這種單層時間輪,還有一些時間輪採用多層實現。

用 Go 實現的時間輪項目不多,下面這個是其中性能較好的時間輪的延時任務示例:

package main

import (
    "github.com/antlabs/timer"
)

// 一次性定時器
func after(tm timer.Timer) {
    var wg sync.WaitGroup
    wg.Add(2)
    defer wg.Wait()

    go func() {
        defer wg.Done()
        tm.AfterFunc(1*time.Second, func() {
            log.Printf("after 1 second\n")
        })
    }()

    go func() {
        defer wg.Done()
        tm.AfterFunc(10*time.Second, func() {
            log.Printf("after 10 seconds\n")
        })
    }()

    go func() {
        defer wg.Done()
        tm.AfterFunc(30*time.Second, func() {
            log.Printf("after 30 seconds\n")
        })
    }()

    go func() {
        defer wg.Done()
        tm.AfterFunc(time.Minute, func() {
            log.Printf("after 1 minute\n")
        })
    }()

    go func() {
        defer wg.Done()
        tm.AfterFunc(time.Minute+30*time.Second, func() {
            log.Printf("after 1 minute and 30 seconds\n")
        })
    }()

    go func() {
        defer wg.Done()
        tm.AfterFunc(2*time.Minute+45*time.Second, func() {
            log.Printf("after 2 minutes and 45 seconds\n")
        })
    }()
}

// 週期性定時器
func schedule(tm timer.Timer) {
    tm.ScheduleFunc(500*time.Millisecond, func() {
        log.Printf("schedule 500 milliseconds\n")
    })

    tm.ScheduleFunc(time.Second, func() {
        log.Printf("schedule 1 second\n")
    })

    tm.ScheduleFunc(20*time.Second, func() {
        log.Printf("schedule 20 seconds\n")
    })

    tm.ScheduleFunc(1*time.Minute, func() {
        log.Printf("schedule 1 minute\n")
    })
}

func main() {
    tm := timer.NewTimer()
    defer tm.Stop()

    // go after(tm)
    go schedule(tm)

    go func() {
        time.Sleep(2*time.Minute + 50*time.Second)
        tm.Stop()
    }()

    tm.Run()
}

2 任務分發

有了基本的定時器實現方案,如果我們開發的是單機系統,就可以開始幹活了,但如果是分佈式,還需要把 “定時” 或“延時”的任務分發出去。

示例圖:

每個實例每隔一個小時,會從數據庫裏獲取下一個小時需要處理的定時任務,獲取的時候只取符合task_id % shard_count = shard_id的任務。

當這些定時任務被觸發之後需要通知用戶側,有兩種思路實現:

  1. 將任務被觸發的信息封裝成一條消息,發往消息隊列,由用戶側對消息隊列進行監聽

  2. 對用戶預先配置的回調函數進行調用

兩種方案各有優缺點,如果採用1,那麼如果消息隊列出現故障會導致整個系統不可用,當然,現在的隊列消息一般都是高可用,大多數時候不用擔心這個問題。其次,一般業務流程中間走消息隊列的話會導致延時增加,定時任務若必須在觸發後的幾十毫秒到幾百毫秒內完成的話,採取消息隊列就有一定的風險。

如果採用2,會加重定時任務系統的負擔。單機的定時器執行時最害怕的就是回調函數執行時間過長,這樣會阻塞後續的任務執行。在分佈式場景下,這種憂慮依然存在——一個不嚴謹的業務回調可能會直接拖垮整個定時任務系統。所以我們還要考慮在回調的基礎上增加經過測試的超時時間設置,並且對由用戶填入的超時時間做慎重的審覈。

3 數據再平衡的冪等考量

當任務執行集羣的機器故障時,需要對任務進行重新分配。按照之前的求模策略,對這臺機器還沒有處理的任務進行重新分配就比較麻煩,如果實際運行的線上系統,還要在故障時的任務平衡方面花更多的心思。

參考 Elasticsearch 的數據分佈設計,每份任務數據都有多個副本,這裏假設有兩個副本,如下圖所示:

一份數據雖然有兩個持有者,但持有者持有的副本會進行區分,比如持有的是主副本還是非主副本。

一個任務只會在持有主副本的節點上被執行。

當有機器故障時,任務數據需要進行數據再平衡的工作,比如節點 1 掛了:

節點 1的數據會被遷移到節點 2節點 3上。

當然,也可以用稍微複雜一點的思路,比如對集羣中的節點進行角色劃分,由協調節點來做這種故障時的任務重新分配工作,考慮到高可用,協調節點可能也需要有 1 至 2 個備用節點以防不測。

使用消息隊列時,很多隊列不支持exactly once語義,這種情況下需要讓用戶自己來負責消息的去重或者消費的冪等處理。

四、分佈式搜索引擎

MySQL 很脆弱,數據庫系統本身要保證實時和強一致性,所以其功能設計上都是爲了滿足這種一致性需求。比如 write ahead log 的設計,基於 B+ 樹實現的索引和數據組織,以及基於 MVCC 實現的事務等等。

關係型數據庫一般被用於實現 OLTP 系統:

聯機事務處理OLTP, Online transaction processing)是指透過信息系統、電腦網絡及數據庫,以在線交易的方式處理一般即時性的作業資料,和更早期傳統數據庫系統大量批量的作業方式並不相同。OLTP 通常被運用於自動化的資料處理工作,如訂單輸入、金融業務… 等反覆性的日常性交易活動。和其相對的是屬於決策分析層次的聯機分析處理(OLAP)。

在互聯網的業務場景中,也有一些實時性要求不高(可以接受多秒的延遲),但是查詢複雜性卻很高的場景。比如,在電商的 WMS 系統中,或者在大多數業務場景豐富的 CRM 或者客服系統中,可能需要提供幾十個字段的隨意組合查詢功能。這種系統的數據維度天生衆多,比如一個電商的 WMS 中對一件貨物的描述,可能有下面這些字段:

倉庫 id,入庫時間,庫位分區 id,儲存貨架 id,入庫操作員 id,出庫操作員 id,庫存數量,過期時間,SKU 類型,產品品牌,產品分類,內件數量

除了上述信息,如果商品在倉庫內有流轉。可能還有關聯的流程 id,當前的流轉狀態等。

如果經營一個大型電商,每天千萬級別的訂單,那麼在這個數據庫中查詢和建立合適的索引都是一件非常難的事情。

在 CRM 或客服類系統中,常常有根據關鍵字進行搜索的需求,大型互聯網公司每天接收數以萬計的用戶請求。考慮到事件溯源,用戶的請求至少要保存 2~3 年,這些數據是千萬級甚至上億級的數據,如果根據關鍵字進行一次 like 查詢,可能整個 MySQL 就崩潰了。

這時,就需要搜索引擎解決。

1 搜索引擎

Elasticsearch是開源分佈式搜索引擎的霸主,其依賴於 Lucene 實現,在部署和運維方面做了很多優化。當今建立一個分佈式搜索引擎比起 Sphinx 的時代已經容易很多了,只需要簡單配置客戶端 IP 和端口就行。

1.1 倒排列表

雖然Elasticsearch是針對搜索場景來定製的,但如前文所言,實際應用中常常用Elasticsearch來作爲數據庫使用,就是因爲倒排列表的特性。可以用比較樸素的觀點來理解倒排索引:

Elasticsearch中的數據進行查詢時,本質就是求多個排好序的序列求交集。非數值類型字段涉及到分詞問題,大多數內部使用場景下,可以直接使用默認的 bi-gram 分詞:

即將所有TiT(i+1)組成一個詞(在Elasticsearch中叫 term),然後再編排其倒排列表,這樣倒排列表大概就是這樣的:

當用戶搜索 “天氣很好” 時,其實就是求:天氣、氣很、很好三組倒排列表的交集。但這裏的相等判斷邏輯有些特殊,用僞代碼表示:

func equal() {
    if postEntry.docID of '天氣' == postEntry.docID of '氣很' &&
        postEntry.offset + 1 of '天氣' == postEntry.offset of '氣很' {
            return true
    }

    if postEntry.docID of '氣很' == postEntry.docID of '很好' &&
        postEntry.offset + 1 of '氣很' == postEntry.offset of '很好' {
        return true
    }

    if postEntry.docID of '天氣' == postEntry.docID of '很好' &&
        postEntry.offset + 2 of '天氣' == postEntry.offset of '很好' {
        return true
    }

    return false
}

多個有序列表求交集的時間複雜度是 𝑂(𝑁∗𝑀) ,𝑁 爲給定列表中元素數最小的集合,𝑀 爲給定列表的個數。

在整個算法中起決定作用的一是最短的倒排列表的長度,其次是詞數總和,一般詞數不會很大,所以起決定性作用的,一般是所有倒排列表中最短的那一個的長度。

因此,文檔總數很多的情況下,搜索詞的倒排列表最短的那一個不長時,搜索速度也會很快。如果用關係型數據庫,那就需要按照索引來慢慢掃描。

2 查詢 DSL

Elasticsearch定義了一套查詢 DSL,當我們把Elasticsearch當數據庫使用時,需要用到其 bool 查詢,如:

{
  "query"{
    "bool"{
      "must"[
        {
          "match"{
            "field_1"{
              "query""1",
              "type""phrase"
            }
          }
        },
        {
          "match"{
            "field_2"{
              "query""2",
              "type""phrase"
            }
          }
        },
        {
          "match"{
            "field_3"{
              "query""3",
              "type""phrase"
            }
          }
        },
        {
          "match"{
            "field_4"{
              "query""4",
              "type""phrase"
            }
          }
        }
      ]
    }
  },
  "from": 0,
  "size"1
}

看起來比較麻煩,但表達的意思很簡單:

if field_1 == 1 && field_2 == 2 && field_3 == 3 && field_4 == 4 {
    return true
}

用 bool should query 可以表示 or 的邏輯:

{
  "query"{
    "bool"{
      "should"[
        {
          "match"{
            "field_1"{
              "query""1",
              "type""phrase"
            }
          }
        },
        {
          "match"{
            "field_2"{
              "query""3",
              "type""phrase"
            }
          }
        }
      ]
    }
  },
  "from": 0,
  "size"1
}

這裏表示的是類似:

if field_1 == 1 || field_2 == 2 {
    return true
}

這些 Go 代碼裏 if 後面跟着的表達式在編程語言中有專有名詞來表達Boolean Expression

4 > 1
5 == 2
3 < i && x > 10

ElasticsearchBool Query方案,就是用 json 來表達了這種程序語言中的 Boolean Expression,爲什麼可以這麼做呢?因爲 json 本身是可以表達樹形結構的,程序代碼在被編繹器 parse 後,也會變成 AST,而 AST 抽象語法樹就是樹形結構。理論上 json 能夠完備地表達一段程序代碼被 parse 之後的結果。這裏的 Boolean Expression 被編繹器 parse 後也會生成差不多的樹形結構,而且只是整個編繹器實現的一個很小的子集。

3 基於 client SDK 開發

elasticsearch 官方 Go 客戶端版本已更新到 8,且客戶端內容較多,單獨寫了一篇文章以供參考:Elasticsearch Go 客戶端。

因爲 Lucene 的性質,本質上搜索引擎內的數據是不可變的,所以如果要對文檔進行更新,Lucene 內部是按照 id 進行完全覆蓋(本質是取同一 id 最新的 segment 中的數據)的操作,所以與插入的情況是一樣的。

使用Elasticsearch作爲數據庫使用時,需要注意,因爲Elasticsearch有索引合併的操作,所以從數據插入到Elasticsearch中到可以查詢需要一段時間(由Elasticsearch的 refresh_interval 決定)。所以千萬不要把Elasticsearch當成強一致的關係型數據庫使用。

4 將 sql 轉換爲 DSL

比如一段 bool 表達式user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1),寫成 SQL 是如下形式:

select * from xxx 
where user_id = 1 and
(
    product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1
)

寫成Elasticsearch的 DSL 是如下形式:

{
  "query"{
    "bool"{
      "must"[
        {
          "match"{
            "user_id"{
              "query""1",
              "type""phrase"
            }
          }
        },
        {
          "match"{
            "product_id"{
              "query""1",
              "type""phrase"
            }
          }
        },
        {
          "bool"{
            "should"[
              {
                "match"{
                  "star_num"{
                    "query""4",
                    "type""phrase"
                  }
                }
              },
              {
                "match"{
                  "star_num"{
                    "query""5",
                    "type""phrase"
                  }
                }
              }
            ]
          }
        },
        {
          "match"{
            "banned"{
              "query""1",
              "type""phrase"
            }
          }
        }
      ]
    }
  },
  "from": 0,
  "size"1
}

Elasticsearch的 DSL 雖然很好理解,但是手寫起來非常費勁。前面提供了基於 SDK 的方式來寫,但也不足夠靈活。

SQL 的 where 部分就是 boolean expression。這種 bool 表達式被解析後,和Elasticsearch的 DSL 結構長得差不多,所以可以直接將 SQL 轉換爲 DSL。

SQL 的 where 被 parse 之後的結構和Elasticsearch的 DSL 結構對比:

既然結構上完全一致,邏輯上就可以相互轉換。以廣度優先對 AST 樹進行遍歷,然後將二元表達式轉換成 json 字符串,再拼裝起來就可以了,參考:github.com/cch123/elasticsql

5 異構數據同步

在實際應用中,很少直接向搜索引擎中寫入數據,更爲常見方式是,將 MySQL 或其他關係型數據中的數據同步到搜索引擎中,而搜索引擎的使用方式只能對數據進行查詢,無法進行修改和刪除。

常見的同步方案有兩種。

5.1 通過時間戳進行增量數據同步

這種同步方式與業務強綁定,倒如 WMS 系統中的出庫單,並不需要非常實時,稍微有延遲也可以接受,每分鐘從 MySQL 的出庫單表中,把最近十分鐘創建的所有出庫單取出,批量存入Elasticsearch中,取數據的操作需要執行的邏輯可以表達爲下面的 SQL:

select * from wms_orders where update_time >= date_sub(now(), interval 10 minute);

當然,考慮到邊界情況,可以讓這個時間段的數據與前一次的有一些重疊:

select * from wms_orders where update_time >= date_sub(
    now(), interval 11 minute
);

取最近 11 分鐘有變動的數據覆蓋更新到Elasticsearch中,這種方案的缺點顯而易見,必須要求業務數據嚴格遵守一定的規範。比如必須有 update_time 字段,並且每次創建和更新都要保證該字段有正確的時間值,否則同步邏輯就會丟失數據。

5.2 通過 binlog 進行數據同步

業界使用較多的是阿里開源的 Canal 來進行 binlog 解析與同步。canal 會僞裝成 MySQL 的從庫,然後解析好行格式的 binlog,再以更容易解析的格式(如 json)發送到消息隊列。

由下游的 kafka 消費者負責把上游數據表的自增主鍵作爲Elasticsearch的文檔的 id 進行寫入,這樣可以保證每次接收到 binlog 時,對應 id 的數據都被覆蓋更新爲最新。MySQL 的 Row 格式的 binlog 會將每條記錄的所有字段都提供給下游,所以向異構數據目標同步數據時,不需要考慮數據是插入還是更新,只要一律按 id 進行覆蓋即可。

這種模式同樣需要業務遵守一條數據表規範,即表中必須有唯一主鍵 id 來保證我們進入Elasticsearch的數據不會發生重複。一旦不遵守該規範,那麼就會在同步時導致數據重複。當然,你也可以爲每一張需要的表去定製消費者的邏輯,但這不是通用系統討論的範疇。

五、負載均衡

1 常見的負載均衡思路

如果不考慮均衡的話,現在有 n 個服務節點,完成業務流程只需要從這 n 箇中挑選其中的一個,有幾種思路:

  1. 按順序挑:例如上次選了第一臺,這次就選第二臺,下次選第三臺,依此類推。如果已經到了最後一臺,那麼下一次從第一臺開始。這種情況下可以把服務節點信息都存儲在數組中,每次請求完成下游之後,將一個索引後移即可,在移到盡頭時再移回數組開頭。

  2. 隨機挑一個:每次都隨機挑,真隨機僞隨機均可。假設選第 x 臺機器,那麼 x 可描述爲rand.Intn() % n

  3. 根據某種權重,對下游節點進行排序,選擇權重最大或最小的那一個。

當然,實際場景不可能無腦輪詢或者無腦隨機,如果對下游請求失敗了,還需要某種機制來進行重試。如果純粹隨機,存在一定的可能性讓下一次選擇的節點仍是問題節點。

2 基於洗牌算法的負載均衡

如果隨機選取每次發送請求的節點,在遇到下游出問題時換其他節點重試,需要設計一個大小和節點數組大小一致的索引數組,每次來新的請求,對索引數組做洗牌,然後取第一個元素作爲選中的服務節點。如果請求失敗,那麼選擇下一個節點重試,依此類推:

var endpoints = []string {
    "100.69.62.1:3232",
    "100.69.62.32:3232",
    "100.69.62.42:3232",
    "100.69.62.81:3232",
    "100.69.62.11:3232",
    "100.69.62.113:3232",
    "100.69.62.101:3232",
}

// 重點在這個 shuffle
func shuffle(slice []int) {
    for i := 0; i < len(slice); i++ {
        a := rand.Intn(len(slice))
        b := rand.Intn(len(slice))
        slice[a], slice[b] = slice[b], slice[a]
    }
}

func request(params map[string]interface{}) error {
    var indexes = []int {0,1,2,3,4,5,6}
    var err error

    shuffle(indexes)
    maxRetryTimes := 3

    idx := 0
    for i := 0; i < maxRetryTimes; i++ {
        err = apiRequest(params, indexes[idx])
        if err == nil {
            break
        }
        idx++
    }

    if err != nil {
        // logging
        return err
    }

    return nil
}

循環一遍 slice,隨機生成兩個索引,交換這兩個索引對應的值,完成洗牌,看起來沒有什麼問題。

2.1 錯誤的洗牌導致的負載不均衡

但其實上述洗牌是有問題的,存在兩個問題:

  1. 沒有隨機種子。在沒有隨機種子的情況下,rand.Intn()返回的僞隨機數序列是固定的。

  2. 洗牌不均勻。不均勻會導致整個數組第一個節點有大概率被選中,所以多個節點的負載分佈不均衡

第一個問題不用多說,簡單但容易犯,特別注意即可規避。

第二個問題用概率簡單解釋。如果每次挑選都是真隨機,那麼這種洗牌的結果是 77=823543,但我們需要的是每次選中的數字的概率相等,則應該有 7!=5040 種結果,823543 明顯不是 5040 的倍數,所以肯定有某個數字以比其他數字更高的概率被選擇,所以這種代碼是錯誤的,計算概率的過程很複雜,直接上結論:** 索引爲 0 的元素被選中的概率約爲 22%,其他元素被選中的概率約爲 13%**。

2.2 修正洗牌算法

從數學上得到過證明的還是經典 fisher-yates 算法,主要思路爲每次隨機挑選一個值,放到數組末尾。然後在 n-1 個元素的數組中再隨機挑選一個值放在數組末尾,依此類推。

func shuffle(indexes []int) {
    for i:=len(indexes); i>0; i-- {
        lastIdx := i - 1
        idx := rand.Intn(i)
        indexes[lastIdx], indexes[idx] = indexes[idx], indexes[lastIdx]
    }
}

Go 標準庫中內置了該算法:

func shuffle(n int) []int {
    b := rand.Perm(n)
    return b
}

在當前的場景下,只要用rand.Perm就可以得到想要的索引數組了。

3 ZooKeeper 集羣的隨機節點挑選問題

本節中的場景是從 N 個節點中選擇一個節點發送請求,初始請求結束之後,後續請求會重新對數組洗牌,所以每兩個請求之間沒有關聯。因此本節最開始的洗牌算法,不初始化隨機庫的種子理論上也沒有什麼問題。

但在一些特殊的場景下,例如使用 ZooKeeper 時,客戶端初始化從多個服務節點中挑選一個節點後,是會向該節點建立長連接的。之後客戶端請求都會發往該節點,直到該節點不可用纔會在節點列表中挑選下一個節點。在這種場景下,初始連接節點的選擇就必須是真隨機了,否則,所有客戶端啓動時,都會去連接同一個 ZooKeeper 的實例,根本無法起到負載均衡的目的。

如果在日常開發中,當前業務也是類似的場景,就必須考慮一下是否會發生類似的情況。

rand設置種子:

rand.Seed(time.Now().UnixNano())

4 負載均衡算法效果驗證

不考慮加權負載均衡的情況,對上述原shuffle函數和改良過的shuffle函數進行簡單對比:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func init() {
    rand.Seed(time.Now().UnixNano())
}

func shuffle1(slice []int) {
    for i := 0; i < len(slice); i++ {
        a := rand.Intn(len(slice))
        b := rand.Intn(len(slice))
        slice[a], slice[b] = slice[b], slice[a]
    }
}

func shuffle2(indexes []int) {
    for i := len(indexes); i > 0; i-- {
        lastIdx := i - 1
        idx := rand.Intn(i)
        indexes[lastIdx], indexes[idx] = indexes[idx], indexes[lastIdx]
    }
}

func main() {
    var cnt1 = map[int]int{}
    for i := 0; i < 1000000; i++ {
        var sl = []int{0, 1, 2, 3, 4, 5, 6}
        shuffle1(sl)
        cnt1[sl[0]]++
    }

    var cnt2 = map[int]int{}
    for i := 0; i < 1000000; i++ {
        var sl = []int{0, 1, 2, 3, 4, 5, 6}
        shuffle2(sl)
        cnt2[sl[0]]++
    }

    fmt.Println(cnt1, "\n", cnt2)
}

結果:

map[0:223847 1:128878 2:129454 3:129201 4:129353 5:129609 6:129658] 
map[0:142507 1:142003 2:143590 3:142715 4:142487 5:143064 6:143634]

六 分佈式配置管理

在分佈式系統中,常困擾我們的還有上線問題。

雖然目前有一些優雅的重啓方案,但實際應用中可能受限於系統內部的運行情況而無法做到真正的優雅。比如爲了對去下游的流量進行限制,在內存中堆積一些數據,並對堆積設定時間或總量的閾值。在任意閾值達到之後將數據統一發送給下游,以避免頻繁的請求超出下游的承載能力讓下游崩潰。這種情況下重啓要做到優雅就比較難了。

所以應該儘量避免採用或繞過上線的方式對線上程序做一些修改。比較典型的修改內容就是程序的配置項。

1 場景舉例

1.1 報表系統

在一些偏 OLAP 或者離線的數據平臺中,經過長期的迭代開發,整個系統的功能模塊已經漸漸穩定,可變動的項只出現在數據層,而數據層的變動大多可以認爲是 SQL 的變動。架構師們自然而然地會想着把這些變動項抽離到系統外部。

當業務提出了新的需求時,需要將新的 SQL 錄入到系統內部,或者簡單修改一個老的 SQL,不對系統進行上線,就可以直接完成修改。

1.2 業務配置

大公司的平臺部門服務衆多業務線,在平臺內爲各業務線分配唯一 id。平臺本身也由多個模塊組成,這些模塊需要共享相同的業務線定義。當公司新開產品線時,需要能夠在短時間內打通所有平臺系統的流程,這時每個系統都走上線流程肯定來不及。另外需要對這種公共配置進行統一管理,同時對其增減邏輯也做統一管理。這些信息變更時,需要自動通知業務方的系統,而不需要人力介入或只需簡單介入。

除業務管理外,很多互聯網公司會按照城市來鋪展自己的業務。在某個城市未開城之前,理論上所有模塊都應該認爲帶有該城市 id 的數據都是髒數據並自動過濾。如果業務開城,就應該將這個新的城市 id 加入到白名單中,這樣業務流程便可以自動運轉。

互聯網公司的運營系統中會有各種類型的運營活動,有些運營活動推出後可能出現超出預期的事件(比如公關危機),需要緊急將系統下線。這時會用到一些開關來快速關閉相應的功能,或者快速將想要剔除的活動 id 從白名單中剔除。

2 使用 etcd 實現配置更新

使用 etcd 實現一個簡單的配置讀取和動態更新流程,以此來了解線上的配置更新流程。

2.1 配置定義

簡單的配置,可以將內容完全存儲在 etcd 中,如:

etcdctl put /configs/remote_config.json -- '{"addr":"127.0.0.1:1080","aes_key":"01B345B7A9ABC00F0123456789ABCDAF","https":false,"secret":"","private_key_path":"","cert_file_path":""}'
etcdctl get /configs/remote_config.json
{
    "addr" : "127.0.0.1:1080",
    "aes_key" : "01B345B7A9ABC00F0123456789ABCDAF",
    "https" : false,
    "secret" : "",
    "private_key_path" : "",
    "cert_file_path" : ""
}

2.2 新建 etcd client

    client, err = clientv3.New(clientv3.Config{
        Endpoints:   []string{"http://127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }

2.3 獲取配置

    var resp *clientv3.GetResponse
    resp, err = client.Get(context.Background()"/configs/remote_config.json")
    if err != nil {
        log.Fatalln(err)
    }
    log.Println(resp)

client結構體已完成kv接口,可以直接調用Get()方法獲取值。

2.4 更新訂閱

    watch := client.Watch(context.Background()"/configs/remote_config.json")

    for wresp := range watch {
        for _, ev := range wresp.Events {
            log.Println("new values is ", string(ev.Kv.Value))
            err = json.Unmarshal(ev.Kv.Value, &appConfig)
            if err != nil {
                log.Fatalln(err)
            }
        }
    }

通過訂閱 config 路徑的變動事件,在該路徑下內容發生變化時,客戶端側可以收到變動通知,並收到變動後的字符串值。

2.5 完整代碼

導包時也只保留了核心包。

package main

import (
    clientv3 "go.etcd.io/etcd/client/v3"
)

type Config struct {
    Addr           string `json:"addr"`
    AesKey         string `json:"aes_key"`
    HTTPS          bool   `json:"https"`
    Secret         string `json:"secret"`
    PrivateKeyPath string `json:"private_key_path"`
    CertFilePath   string `json:"cert_file_path"`
}

var (
    appConfig  Config
    configPath = `/configs/remote_config.json`
    client     *clientv3.Client
    err        error
)

func init() {
    client, err = clientv3.New(clientv3.Config{
        Endpoints:   []string{"http://127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }

    initConfig()
}

func watchAndUpdate() {
    watch := client.Watch(context.Background(), configPath)

    for wresp := range watch {
        for _, ev := range wresp.Events {
            log.Println("new values is ", string(ev.Kv.Value))
            err = json.Unmarshal(ev.Kv.Value, &appConfig)
            if err != nil {
                log.Fatalln(err)
            }
        }
    }

}

func initConfig() {
    var resp *clientv3.GetResponse
    resp, err = client.Get(context.Background(), configPath)
    if err != nil {
        panic(err)
    }
    json.Unmarshal(resp.Kvs[0].Value, &appConfig)
}

func getConfig() Config {
    return appConfig
}

func main() {
    defer client.Close()
    watchAndUpdate()
}

上述代碼在更新配置時進行了一系列操作:watch 響應、json 解析,這些操作都不具備原子性。當單個業務請求流程中多次獲取 config 時,有可能因爲中途 config 發生變化而導致單個請求前後邏輯不一致。因此,在使用類似方法進行更新配置時,需要在單個請求的生命週期內使用相同的配置。具體實現方式可以是隻在請求開始時獲取一次配置,然後依次向下透傳。當然具體情況需要具體分析。

3 配置膨脹

隨着業務的發展,配置系統本身所承載的壓力可能也會越來越大,配置文件可能成千上萬,客戶端也可能成千上萬,此時,將配置內容存儲在 etcd 內部就不合適了。

隨着配置文件數量的膨脹,除了存儲系統本身的吞吐量問題,還有配置信息的管理問題。

需要對相應的配置進行權限管理,需要根據業務量進行配置存儲的集羣劃分。如果客戶端太多,導致配置存儲系統無法承受瞬時大量的 QPS,可能還需要在客戶端側進行緩存優化等等。

這也是爲什麼大公司都會針對自己的業務額外開發一套複雜配置系統的原因。

4 配置版本管理

在配置管理過程中,難免出現用戶誤操作的情況,例如在更新配置時,輸入了無法解析的配置。此時可以通過配置校驗來解決。

有時錯誤的配置可能不是格式上的問題,而是邏輯上的問題。比如寫 SQL 時少 select 了一個字段,更新配置時,不小心丟掉了 json 字符串裏的一個 field 而導致程序無法理解新的配置而運行異常。爲了快速止損,最快且最有效的辦法就是進行版本管理,並支持按版本回滾。

在配置進行更新時,要爲每份配置的新內容賦予一個版本號,並將修改前的內容和版本號記錄下來,當發現新配置出現問題時,能夠及時回滾。

常見的做法是,使用 MySQL 來存儲配置文件或配置字符串的不同版本內容,在需要回滾時,只需進行簡單查詢即可。

5 客戶端容錯

在業務系統的配置被剝離到配置中心後,並不意味着系統就可以高枕無憂了。

當配置中心本身宕機時,也需要一定的容錯能力,至少保證在其宕機期間,業務仍然可以運轉。這要求系統能夠在配置中心宕機時,也能夠拿到需要的配置信息,即使這些信息不是最新版本。

具體來講,在給業務提供配置讀取的 SDK 時,最好能夠將拿到的配置在業務機器的磁盤上也緩存一份。這樣遠程配置中心不可用時,可以直接用硬盤上的內容繼續運行。當重新連接上配置中心時,再把相應的內容進行更新。

加入緩存之後,必須考慮數據一致性的問題。當有個別業務機器因爲網絡錯誤而與其他機器配置不一致時,應該能夠從監控系統中知曉。

使用一種手段可能解決配置更新的痛點,但同時此手段也可能帶來新的問題。實際開發中,要對每一步決策多多思考,以使自己能夠在發生問題時不會手足無措。

轉自:

cnblogs.com/thepoy/p/14573822.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ect-fmjvQVLaOwIw7bEowg