萬字長文帶你深入 Redis 底層數據結構

Redis 數據庫的數據結構

Redis 的鍵值對中的 key 就是字符串對象,而 value 就是指 Redis 的數據類型,可以是 String,也可以是 List、Hash、Set、 Zset 的數據類型。

其實是 Redis 底層使用了一個全局哈希表保存所有鍵值對,哈希表的最大好處就是 O(1) 的時間複雜度快速查找到鍵值對。哈希表其實就是一個數組,數組中的元素叫做哈希桶。

struct redisServer {
   //...
    redisDb *db;
 //...
 int dbnum; //默認16個
}

typedef struct redisDb {
    dict *dict;     //全局hash表
    //...
} redisDb;

struct dict {
   //...
     dictht ht[2]; //兩個dictEntry,一個開始爲空,rehash遷移時使用
    //...
 long rehashidx; /* rehashing not in progress if rehashidx == -1 */
};

typedef struct dictht {
    dictEntry **table;        // 哈希表節點數組
    unsigned long size;       // 哈希表大小
    unsigned long sizemask;   // 哈希表大小掩碼,用於計算索引值,總是等於size-1
    unsigned long used;       // 該哈希表已有節點的數量
} dictht;

struct dictEntry {//具體的對象
    void *key; //key
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;//value
    struct dictEntry *next;    //下一個節點的指針
    void *metadata[];
};

void *key 和 void *value 指針指向的就是 Redis 對象。Redis 中有全局 hash 表,key 是 String,value 是不同類型的對象,如果是 Java,那可以直接用 Map<String,Object> 來通用表示。而 Redis 直接由 C 語言實現,因此具體的每個對象都由 redisObject 結構表示,用 type 來表示具體類型,如下:

typedef struct redisObject {
    unsigned type: 4;        // 對象類型
    unsigned storage: 2;     // REDIS_VM_MEMORY or REDIS_VM_SWAPPING
    unsigned encoding: 4;    // 對象所使用的編碼
    unsigned lru: 22;        // lru time (relative to server.lruclock)
    int refcount;            // 對象的引用計數
    void *ptr;               // 指向對象的底層實現數據結構
} robj;

如圖,Redis 數據類型(也叫 Redis 對象)和底層數據結構的對應關圖:

這幾個值都是可以修改的,沒必要記;在 redis.conf 裏

hash-max-listpack-entries 512
hash-max-listpack-value 64

zset-max-listpack-entries 128
zset-max-listpack-value 64

SDS

Simple Dynamic String,簡單動態字符串

C 語言中的缺陷

獲取字符串長度複雜度爲 O(n)

在 C 語言裏,字符數組的結尾位置用 “\0” 表示,意思是指字符串的結束。
因此 c 語言獲取字符串長度的函數 strlen,就是遍歷字符數組中的每一個字符,遇到字符爲 “\0” 後,就會停止遍歷,然後返回已經統計到的字符個數,即爲字符串長度,因此複雜度爲 O(n)

字符串只能保存文本數據

字符數組的結尾位置用 “\0” 表示
因此,除了字符串的末尾之外,字符串裏面不能含有 “\0” 字符,否則最先被程序讀入的 “\0” 字符將被誤認爲是字符串結尾,這個限制使得 C 語言的字符串只能保存文本數據,不能保存像圖片、音頻、視頻文化這樣的二進制數據

有可能發生緩衝區溢出

C 語言的字符串是不會記錄自身的緩衝區大小的,所以 strcat 函數假定程序員在執行這個函數時,已經爲 dest 分配了足夠多的內存,可以容納 src 字符串中的所有內容,而一旦這個假定不成立,就會發生緩衝區溢出將可能會造成程序運行終止。

SDS 結構

SDS 的 API 使用二進制的方式來處理 SDS 存放在 buf[] 裏的數據,使得 Redis 不僅可以保存文本數據,也可以保存任意格式的二進制數據。

SDS 的動態其實指的就是動態擴容

hisds hi_sdsMakeRoomFor(hisds s, size_t addlen)
{
    ... ...
    // s目前的剩餘空間已足夠,無需擴展,直接返回
    if (avail >= addlen)
        return s;
    //獲取目前s的長度
    len = hi_sdslen(s);
    sh = (char *)s - hi_sdsHdrSize(oldtype);
    //擴展之後 s 至少需要的長度
    newlen = (len + addlen);
    //根據新長度,爲s分配新空間所需要的大小
    if (newlen < HI_SDS_MAX_PREALLOC)
        //新長度<HI_SDS_MAX_PREALLOC 則分配所需空間*2的空間
  //默認定義HI_SDS_MAX_PREALLOC爲(1024*1024)即1M
        newlen *= 2;
    else
        //否則,分配長度爲目前長度+1MB
        newlen += HI_SDS_MAX_PREALLOC;
       ...
}

// #define HI_SDS_MAX_PREALLOC (1024*1024)

雙向鏈表

Redis 中的鏈表是雙向鏈表結構

typedef struct listNode {
    //前置節點
    struct listNode *prev;
    //後置節點
    struct listNode *next;
    //節點的值
    void *value;
} listNode;

不過,Redis 在 listNode 結構體基礎上又封裝了 list 這個數據結構;類似於 Java 定義了 Node 節點,同時再此基礎上封裝了 LinkedList

typedef struct list {
 //鏈表頭節點
    listNode *head;
    //鏈表尾節點
    listNode *tail;
    //節點值複製函數
    void *(*dup)(void *ptr);
    //節點值釋放函數
    void (*free)(void *ptr);
    //節點值比較函數
    int (*match)(void *ptr, void *key);
    //鏈表節點數量
    unsigned long len;
} list;

Redis 的鏈表的優缺點與鏈表的優缺點一致

壓縮列表

壓縮列表是 Redis 爲了節約內存而開發的,它是由連續內存塊組成的順序型數據結構,類似於數組。

壓縮列表有四個重要字段:

在壓縮列表中,如果要查找定位第一個元素和最後一個元素,可以通過表頭三個字段(zllen)的長度直接定位,複雜度是 O(1)。而查找其他元素時,就沒有這麼高效了,只能逐個查找,此時的複雜度就是 O(N) 了,因此壓縮列表不適合保存過多的元素。

壓縮列表節點(entry)的構成如下:entry 中的字段:

往壓縮列表中插入數據時,壓縮列表會根據數據類型是字符串還是整數,以及數據的大小,會使用不同空間大小的 prevlen 和 encoding 這兩個元素裏保存的信息,這種根據數據大小和類型進行不同的空間大小分配的設計思想,正是 Redis 爲了節省內存而採用的。

壓縮列表裏的每個節點中的 prevlen 屬性都記錄了前一個節點的長度,而且 prevlen 屬性的空間大小跟前一個節點長度值有關,比如:

連鎖更新

壓縮列表新增某個元素或修改某個元素時,如果空間不不夠,壓縮列表佔用的內存空間就需要重新分配。由於 prevlen 是前一個節點的長度,當新插入的元素較大時,那麼就可能會導致後續元素的 prevlen 佔用空間都發生變化(比如說 1 字節編程 5 字節),如果當前節點的 prevlen 屬性從 1 字節變成 5 字節後導致下一個節點的 prevlen 屬性也變大,那可能就會而引起「連鎖更新」問題,導致每個元素的空間都要重新分配,造成訪問壓縮列表性能的下降。

壓縮列表的優缺點

優點:

缺點:

目前已被 listpack 替代

哈希表

全局哈希表 和 底層哈希表的對象都是使用的這個結構

哈希表中的節點結構

struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;     /* Next entry in the same hash bucket. */
    void *metadata[];          
};

dictEntry 結構裏不僅包含指向鍵和值的指針,還包含了指向下一個哈希表節點的指針,這個指針可以將多個哈希值相同的鍵值對鏈接起來,以此來解決哈希衝突的問題,這就是鏈式哈希。

關於解決 hash 衝突問題可以看這篇文章:解決哈希衝突的三種方法

而 redis 是先通過拉鍊法解決,再通過 rehash 來解決 hash 衝突問題的,即再 hash 法

rehash

Redis 定義一個 dict 結構體,這個結構體裏定義了兩個哈希表(ht_table[2])。

struct dict {
   //...
    dictEntry **ht_table[2]; //兩個dictEntry,一個開始爲空,rehash遷移時使用
    //...
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
};

在正常服務請求階段,插入的數據,都會寫入到哈希表 1,此時的哈希表 2  並沒有被分配空間。隨着數據逐步增多(根據負載因子判斷),觸發了 rehash 操作,這個過程分爲如下三步:

如果哈希表 1的數據量非常大,那麼在遷移至哈希表 2的時候,因爲會涉及大量的數據拷貝,此時可能會對 Redis 造成阻塞,無法服務其他請求。因此 redis 採用了漸進式 rehash

漸進式 rehash 步驟如下:

  1. 先給哈希表 2分配空間;

  2. 在 rehash 進行期間,每次哈希表元素進行新增、刪除、查找或者更新操作時,Redis 除了會執行對應的操作之外,還會順序將哈希表 1中索引位置上的所有 key-value 遷移到哈希表 2上;

  3. 隨着處理客戶端發起的哈希表操作請求數量越多,最終在某個時間點會把哈希表 1的所有 key-value 遷移到哈希表 2,從而完成 rehash 操作。

這樣就把一次性大量數據遷移工作的開銷,分攤到了多次處理請求的過程中,避免了一次性 rehash 的耗時操作。

在進行漸進式 rehash 的過程中,會有兩個哈希表,所以在漸進式 rehash 進行期間,哈希表元素的刪除、查找、更新等操作都會在這兩個哈希表進行。比如,在漸進式 rehash 進行期間,查找一個 key 的值的話,先會在哈希表 1裏面進行查找,如果沒找到,就會繼續到哈希表 2 裏面進行找到。新增一個 key-value 時,會被保存到哈希表 2裏面,而哈希表 1則不再進行任何添加操作,這樣保證了哈希表 1的 key-value 數量只會減少,隨着 rehash 操作的完成,最終哈希表 1就會變成空表。

哈希表的查找過程:

dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    uint64_t h, idx, table;

    if (dictSize(d) == 0) return NULL; /* dict is empty */
    if (dictIsRehashing(d)) _dictRehashStep(d);//檢查是否正在漸進式 rehash,如果是,那就rehash一步
    h = dictHashKey(d, key);//計算key的hash值
 //哈希表元素的刪除、查找、更新等操作都會在兩個哈希表進行
    for (table = 0; table <= 1; table++) {
        idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
        he = d->ht_table[table][idx];
        while(he) {
            void *he_key = dictGetKey(he);
            if (key == he_key || dictCompareKeys(d, key, he_key))
                return he;
            he = dictGetNext(he);
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

關鍵在於哈希表插入時會去檢查是都正在 Rehash,如果不是,那就往 0 號 hash 表中插入;如果是,那就直接往 1 號 hash 表中插入,因爲如果正在 Rehash 還往 0 號 hash 表中插入,那麼最終還是要 rehash 到 1 號 hash 表的

int htidx = dictIsRehashing(d) ? 1 : 0;

rehash 的觸發條件

負載因子 = 哈希表已保存節點數量 / 哈希表大小

觸發 rehash 操作的條件,主要有兩個:

整數集合

當一個 Set 對象只包含整數值元素,並且元素數量不大時,就會使用整數集這個數據結構作爲底層實現。

typedef struct intset {
    uint32_t encoding;
   //集合包含的元素數量
    uint32_t length;
    //保存元素的數組
    int8_t contents[];
} intset;

保存元素的容器是一個 contents 數組,雖然 contents 被聲明爲 int8_t 類型的數組,但是實際上 contents 數組並不保存任何 int8_t 類型的元素,contents 數組的真正類型取決於 intset 結構體裏的 encoding 屬性的值。比如:

整數集合升級

將一個新元素加入到整數集合裏面,如果新元素的類型(int32_t)比整數集合現有所有元素的類型(int16_t)都要長時,整數集合需要先進行升級,也就是按新元素的類型(int32_t)擴展 contents 數組的空間大小,然後才能將新元素加入到整數集合裏,當然升級的過程中,也要維持整數集合的有序性。

整數集合升級的好處:
如果要讓一個數組同時保存 int16_t、int32_t、int64_t 類型的元素,最簡單做法就是直接使用 int64_t 類型的數組。不過這樣的話,當如果元素都是 int16_t 類型的,就會造成內存浪費。

使用整數集合主要思想就是爲了節省內存開銷。

跳錶

跳錶的優勢是能支持平均 O(logN) 複雜度的節點查找。

跳錶是在鏈表基礎上改進過來的,實現了一種「多層」的有序鏈表,這樣的好處是能快讀定位數據。

typedef struct zskiplist {
 //便於在O(1)時間複雜度內訪問跳錶的頭節點和尾節點;
    struct zskiplistNode *header, *tail;
 //跳錶的長度
    unsigned long length;
 //跳錶的最大層數
    int level;
} zskiplist;

typedef struct zskiplistNode {
    //Zset 對象的元素值
    sds ele;
    //元素權重值
    double score;
    //後向指針,指向前一個節點,目的是爲了方便從跳錶的尾節點開始訪問節點,方便倒序查找。
    struct zskiplistNode *backward;
  
    //節點的level數組,保存每層上的前向指針和跨度
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

跳錶結構如下:

跳錶的相鄰兩層的節點數量最理想的比例是 2:1,查找複雜度可以降低到 O(logN)。

Redis 中的跳錶是兩步兩步跳的嗎?

如果採用新增節點或者刪除節點時,來調整跳錶節點以維持比例 2:1 的方法的話,顯然是會帶來額外開銷的。

跳錶在創建節點時候,會生成範圍爲 [0-1] 的一個隨機數,如果這個隨機數小於 0.25(相當於概率 25%),那麼層數就增加 1 層,然後繼續生成下一個隨機數,直到隨機數的結果大於 0.25 結束,最終確定該節點的層數。因爲隨機數取值在[0,0.25)範圍內概率不會超過 25%,所以這也說明了增加一層的概率不會超過 25%。這樣的話,當插入一個新結點時,只需修改前後結點的指針,而其它結點的層數就不需要隨之改變了,這樣就降低插入操作的複雜度。

// #define ZSKIPLIST_P 0.25
int zslRandomLevel(void) {
    static const int threshold = ZSKIPLIST_P*RAND_MAX;
    int level = 1; //初始化爲一級索引
    while (random() < threshold)
        level += 1;//隨機數小於 0.25就增加一層
 //如果level 沒有超過最大層數就返回,否則就返回最大層數
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

爲什麼不用 AVL 樹

listpack

listpack,目的是替代壓縮列表,它最大特點是 listpack 中每個節點不再包含前一個節點的長度了, 解決壓縮列表的連鎖更新問題

listpack 採用了壓縮列表的很多優秀的設計,比如還是用一塊連續的內存空間來緊湊地保存數據,並且爲了節省內存的開銷,listpack 節點與壓縮列表一樣採用不同的編碼方式保存不同大小的數據

typedef struct {
    /* When string is used, it is provided with the length (slen). */
    unsigned char *sval;
    uint32_t slen;
    /* When integer is used, 'sval' is NULL, and lval holds the value. */
    long long lval;
} listpackEntry;

listpack 沒有壓縮列表中記錄前一個節點長度的字段了,listpack 只記錄當前節點的長度,向 listpack 加入一個新元素的時候,不會影響其他節點的長度字段的變化,從而避免了壓縮列表的連鎖更新問題。

quicklist

目前版本的 quicklist 其實是 雙向鏈表 + listpack 的組合,因爲一個 quicklist 就是一個鏈表,而鏈表中的每個元素又是一個 listpack。

typedef struct quicklist {
 //quicklist的鏈表頭
    quicklistNode *head;
 //quicklist的鏈表尾
    quicklistNode *tail;
 //所有listpacks中的總元素個數
    unsigned long count;        /* total count of all entries in all listpacks */
    //quicklistNodes的個數
 unsigned long len;          /* number of quicklistNodes */
    signed int fill : QL_FILL_BITS;       /* fill factor for individual nodes */
    unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
    unsigned int bookmark_count: QL_BM_BITS;
    quicklistBookmark bookmarks[];
} quicklist;

typedef struct quicklistEntry {
    const quicklist *quicklist;
    quicklistNode *node;
 //指向listpack的指針
    unsigned char *zi;
    unsigned char *value;
    long long longval;
    size_t sz;
    int offset;
} quicklistEntry;

typedef struct quicklistNode {
 //前一個quicklistNode
    struct quicklistNode *prev;
 //下一個quicklistNode
    struct quicklistNode *next;
    unsigned char *entry;
 //listpack的的字節大小
    size_t sz;             /* entry size in bytes */
 //listpack列表的元素個數
    unsigned int count : 16;     /* count of items in listpack */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* PLAIN==1 or PACKED==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int dont_compress : 1; /* prevent compression of entry that will be used later */
    unsigned int extra : 9; /* more bits to steal for future usage */
} quicklistNode;

BitMap

詳情請看 位圖 [1],源碼部分就不多做介紹了

HyperLogLog

HyperLogLog 算法是一種非常巧妙的近似統計海量去重元素數量的算法。它內部維護了 16384 個桶(bucket)來記錄各自桶的元素數量。當一個元素到來時,它會散列到其中一個桶,以一定的概率影響這個桶的計數值。因爲是概率算法,所以單個桶的計數值並不準確,但是將所有的桶計數值進行調合均值累加起來,結果就會非常接近真實的計數值。

爲了便於理解 HyperLogLog 算法,我們先簡化它的計數邏輯。因爲是去重計數,如果是準確的去重,肯定需要用到 set 集合,使用集合來記錄所有的元素,然後使用 scard 指令來獲取集合大小就可以得到總的計數。因爲元素特別多,單個集合會特別大,所以將集合打散成 16384 個小集合。當元素到來時,通過 hash 算法將這個元素分派到其中的一個小集合存儲,同樣的元素總是會散列到同樣的小集合。這樣總的計數就是所有小集合大小的總和。使用這種方式精確計數除了可以增加元素外,還可以減少元素。

集合打散並沒有什麼明顯好處,因爲總的內存佔用並沒有減少。HyperLogLog 肯定不是這個算法,它需要對這個小集合進行優化,壓縮它的存儲空間,讓它的內存變得非常微小。HyperLogLog 算法中每個桶所佔用的空間實際上只有 6 個 bit,這 6 個 bit 自然是無法容納桶中所有元素的,它記錄的是桶中元素數量的對數值。

爲了說明這個對數值具體是個什麼東西,我們先來考慮一個小問題。一個隨機的整數值,這個整數的尾部有一個 0 的概率是 50%,要麼是 0 要麼是 1。同樣,尾部有兩個 0 的概率是 25%,有三個零的概率是 12.5%,以此類推,有 k 個 0 的概率是 2^(-k)。如果我們隨機出了很多整數,整數的數量我們並不知道,但是我們記錄了整數尾部連續 0 的最大數量 K。我們就可以通過這個 K 來近似推斷出整數的數量,這個數量就是 2^K。

當然結果是非常不準確的,因爲可能接下來你隨機了非常多的整數,但是末尾連續零的最大數量 K 沒有變化,但是估計值還是 2^K。你也許會想到要是這個 K 是個浮點數就好了,每次隨機一個新元素,它都可以稍微往上漲一點點,那麼估計值應該會準確很多。

HyperLogLog 通過分配 16384 個桶,然後對所有的桶的最大數量 K 進行調合平均來得到一個平均的末尾零最大數量 K# ,K# 是一個浮點數,使用平均後的 2^K# 來估計元素的總量相對而言就會準確很多。不過這只是簡化算法,真實的算法還有很多修正因子,因爲涉及到的數學理論知識過於繁多,這裏就不再精確描述。

下面我們看看 Redis HyperLogLog 算法的具體實現。我們知道一個 HyperLogLog 實際佔用的空間大約是 13684 * 6bit / 8 = 12k 字節。但是在計數比較小的時候,大多數桶的計數值都是零。如果 12k 字節裏面太多的字節都是零,那麼這個空間是可以適當節約一下的。Redis 在計數值比較小的情況下采用了稀疏存儲,稀疏存儲的空間佔用遠遠小於 12k 字節。相對於稀疏存儲的就是密集存儲,密集存儲會恆定佔用 12k 字節。

密集存儲結構

不論是稀疏存儲還是密集存儲,Redis 內部都是使用字符串位圖來存儲 HyperLogLog 所有桶的計數值。密集存儲的結構非常簡單,就是連續 16384 個 6bit 串成的字符串位圖。

那麼給定一個桶編號,如何獲取它的 6bit 計數值呢?這 6bit 可能在一個字節內部,也可能會跨越字節邊界。我們需要對這一個或者兩個字節進行適當的移位拼接纔可以得到計數值。

假設桶的編號爲 idx,這個 6bit 計數值的起始字節位置偏移用 offset_bytes 表示,它在這個字節的起始比特位置偏移用 offset_bits 表示。我們有

offset_bytes = (idx * 6) / 8
offset_bits = (idx * 6) % 8

前者是商,後者是餘數。比如 bucket 2 的字節偏移是 1,也就是第 2 個字節。它的位偏移是 4,也就是第 2 個字節的第 5 個位開始是 bucket 2 的計數值。需要注意的是字節位序是左邊低位右邊高位,而通常我們使用的字節都是左邊高位右邊低位,我們需要在腦海中進行倒置。

如果 offset_bits 小於等於 2,那麼這 6bit 在一個字節內部,可以直接使用下面的表達式得到計數值 val

val = buffer[offset_bytes] >> offset_bits  # 向右移位

如果 offset_bits 大於 2,那麼就會跨越字節邊界,這時需要拼接兩個字節的位片段。

# 低位值
low_val = buffer[offset_bytes] >> offset_bits
# 低位個數
low_bits = 8 - offset_bits
# 拼接,保留低6位
val = (high_val << low_bits | low_val) & 0b111111

不過下面 Redis 的源碼要晦澀一點,看形式它似乎只考慮了跨越字節邊界的情況。這是因爲如果 6bit 在單個字節內,上面代碼中的 high_val 的值是零,所以這一份代碼可以同時照顧單字節和雙字節。

// 獲取指定桶的計數值
#define HLL_DENSE_GET_REGISTER(target,p,regnum) do { \
    uint8_t *_p = (uint8_t*) p; \
    unsigned long _byte = regnum*HLL_BITS/8; 
    unsigned long _fb = regnum*HLL_BITS&7;  # %8 = &7
    unsigned long _fb8 = 8 - _fb; \
    unsigned long b0 = _p[_byte]; \
    unsigned long b1 = _p[_byte+1]; \
    target = ((b0 >> _fb) | (b1 << _fb8)) & HLL_REGISTER_MAX; \
} while(0)

// 設置指定桶的計數值
#define HLL_DENSE_SET_REGISTER(p,regnum,val) do { \
    uint8_t *_p = (uint8_t*) p; \
    unsigned long _byte = regnum*HLL_BITS/8; \
    unsigned long _fb = regnum*HLL_BITS&7; \
    unsigned long _fb8 = 8 - _fb; \
    unsigned long _v = val; \
    _p[_byte] &= ~(HLL_REGISTER_MAX << _fb); \
    _p[_byte] |= _v << _fb; \
    _p[_byte+1] &= ~(HLL_REGISTER_MAX >> _fb8); \
    _p[_byte+1] |= _v >> _fb8; \
} while(0)

稀疏存儲結構

稀疏存儲適用於很多計數值都是零的情況。下圖表示了一般稀疏存儲計數值的狀態。

當多個連續桶的計數值都是零時,Redis 使用了一個字節來表示接下來有多少個桶的計數值都是零:00xxxxxx。前綴兩個零表示接下來的 6bit 整數值加 1 就是零值計數器的數量,注意這裏要加 1 是因爲數量如果爲零是沒有意義的。比如 00010101 表示連續 22 個零值計數器。6bit 最多隻能表示連續 64 個零值計數器,所以 Redis 又設計了連續多個多於 64 個的連續零值計數器,它使用兩個字節來表示:01xxxxxx yyyyyyyy,後面的 14bit 可以表示最多連續 16384 個零值計數器。這意味着 HyperLogLog 數據結構中 16384 個桶的初始狀態,所有的計數器都是零值,可以直接使用 2 個字節來表示。

如果連續幾個桶的計數值非零,那就使用形如 1vvvvvxx 這樣的一個字節來表示。中間 5bit 表示計數值,尾部 2bit 表示連續幾個桶。它的意思是連續 (xx +1) 個計數值都是 (vvvvv + 1)。比如 10101011 表示連續 4 個計數值都是 11。注意這兩個值都需要加 1,因爲任意一個是零都意味着這個計數值爲零,那就應該使用零計數值的形式來表示。注意計數值最大隻能表示到 32,而 HyperLogLog 的密集存儲單個計數值用 6bit 表示,最大可以表示到 63。當稀疏存儲的某個計數值需要調整到大於 32 時,Redis 就會立即轉換 HyperLogLog 的存儲結構,將稀疏存儲轉換成密集存儲。

Redis 爲了方便表達稀疏存儲,它將上面三種字節表示形式分別賦予了一條指令。

  1. ZERO:len 單個字節表示 00[len-1],連續最多 64 個零計數值

  2. VAL:value,len 單個字節表示 1[value-1][len-1],連續 len 個值爲 value 的計數值

  3. XZERO:len 雙字節表示 01[len-1],連續最多 16384 個零計數值

#define HLL_SPARSE_XZERO_BIT 0x40 /* 01xxxxxx */
#define HLL_SPARSE_VAL_BIT 0x80 /* 1vvvvvxx */
#define HLL_SPARSE_IS_ZERO(p) (((*(p)) & 0xc0) == 0) /* 00xxxxxx */
#define HLL_SPARSE_IS_XZERO(p) (((*(p)) & 0xc0) == HLL_SPARSE_XZERO_BIT)
#define HLL_SPARSE_IS_VAL(p) ((*(p)) & HLL_SPARSE_VAL_BIT)
#define HLL_SPARSE_ZERO_LEN(p) (((*(p)) & 0x3f)+1)
#define HLL_SPARSE_XZERO_LEN(p) (((((*(p)) & 0x3f) << 8) | (*((p)+1)))+1)
#define HLL_SPARSE_VAL_VALUE(p) ((((*(p)) >> 2) & 0x1f)+1)
#define HLL_SPARSE_VAL_LEN(p) (((*(p)) & 0x3)+1)
#define HLL_SPARSE_VAL_MAX_VALUE 32
#define HLL_SPARSE_VAL_MAX_LEN 4
#define HLL_SPARSE_ZERO_MAX_LEN 64
#define HLL_SPARSE_XZERO_MAX_LEN 16384

上圖可以使用指令形式表示如下

Redis 從稀疏存儲轉換到密集存儲的條件是:

  1. 任意一個計數值從 32 變成 33,因爲 VAL 指令已經無法容納,它能表示的計數值最大爲 32

  2. 稀疏存儲佔用的總字節數超過 3000 字節,這個閾值可以通過 hll_sparse_max_bytes 參數進行調整。

計數緩存

前面提到 HyperLogLog 表示的總計數值是由 16384 個桶的計數值進行調和平均後再基於因子修正公式計算得出來的。它需要遍歷所有的桶進行計算纔可以得到這個值,中間還涉及到很多浮點運算。這個計算量相對來說還是比較大的。

所以 Redis 使用了一個額外的字段來緩存總計數值,這個字段有 64bit,最高位如果爲 1 表示該值是否已經過期,如果爲 0, 那麼剩下的 63bit 就是計數值。

當 HyperLogLog 中任意一個桶的計數值發生變化時,就會將計數緩存設爲過期,但是不會立即觸發計算。而是要等到用戶顯示調用 pfcount 指令時纔會觸發重新計算刷新緩存。緩存刷新在密集存儲時需要遍歷 16384 個桶的計數值進行調和平均,但是稀疏存儲時沒有這麼大的計算量。也就是說只有當計數值比較大時纔可能產生較大的計算量。另一方面如果計數值比較大,那麼大部分 pfadd 操作根本不會導致桶中的計數值發生變化。

這意味着在一個極具變化的 HLL 計數器中頻繁調用 pfcount 指令可能會有少許性能問題。關於這個性能方面的擔憂在 Redis 作者 antirez 的博客中也提到了。不過作者做了仔細的壓力的測試,發現這是無需擔心的,pfcount 指令的平均時間複雜度就是 O(1)。

After this change even trying to add elements at maximum speed using a pipeline of 32 elements with 50 simultaneous clients, PFCOUNT was able to perform as well as any other O(1) command with very small constant times.

源碼解析

接下來通過源碼來看一下 pfadd 和 pfcount 兩個命令的具體流程。在這之前我們首先要了解的是 HyperLogLog 的頭結構體和創建一個 HyperLogLog 對象的步驟。

HyperLogLog 頭結構體

struct hllhdr {
    char magic[4];      /* "HYLL" */
    uint8_t encoding;   /* HLL_DENSE or HLL_SPARSE. */
    uint8_t notused[3]; /* Reserved for future use, must be zero. */
    uint8_t card[8];    /* Cached cardinality, little endian. */
    uint8_t registers[]; /* Data bytes. */
};

創建 HyperLogLog 對象

#define HLL_P 14 /* The greater is P, the smaller the error. */
#define HLL_REGISTERS (1<<HLL_P) /* With P=14, 16384 registers. */
#define HLL_SPARSE_XZERO_MAX_LEN 16384


#define HLL_SPARSE_XZERO_SET(p,len) do { \
    int _l = (len)-1; \
    *(p) = (_l>>8) | HLL_SPARSE_XZERO_BIT; \
    *((p)+1) = (_l&0xff); \
} while(0)

/* Create an HLL object. We always create the HLL using sparse encoding.
 * This will be upgraded to the dense representation as needed. */
robj *createHLLObject(void) {
    robj *o;
    struct hllhdr *hdr;
    sds s;
    uint8_t *p;
    int sparselen = HLL_HDR_SIZE +
                    (((HLL_REGISTERS+(HLL_SPARSE_XZERO_MAX_LEN-1)) /
                     HLL_SPARSE_XZERO_MAX_LEN)*2);
    int aux;

    /* Populate the sparse representation with as many XZERO opcodes as
     * needed to represent all the registers. */
    aux = HLL_REGISTERS;
    s = sdsnewlen(NULL,sparselen);
    p = (uint8_t*)s + HLL_HDR_SIZE;
    while(aux) {
        int xzero = HLL_SPARSE_XZERO_MAX_LEN;
        if (xzero > aux) xzero = aux;
        HLL_SPARSE_XZERO_SET(p,xzero);
        p += 2;
        aux -= xzero;
    }
    serverAssert((p-(uint8_t*)s) == sparselen);

    /* Create the actual object. */
    o = createObject(OBJ_STRING,s);
    hdr = o->ptr;
    memcpy(hdr->magic,"HYLL",4);
    hdr->encoding = HLL_SPARSE;
    return o;
}

這裏 sparselen=HLL_HDR_SIZE+2,因爲初始化時默認所有桶的計數值都是 0。其他過程不難理解,用的存儲方式是我們前面提到過的稀疏存儲,創建的對象實質上是一個字符串對象,這也是字符串命令可以操作 HyperLogLog 對象的原因。

PFADD 命令

/* PFADD var ele ele ele ... ele => :0 or :1 */
void pfaddCommand(client *c) {
    robj *o = lookupKeyWrite(c->db,c->argv[1]);
    struct hllhdr *hdr;
    int updated = 0, j;

    if (o == NULL) {
        /* Create the key with a string value of the exact length to
         * hold our HLL data structure. sdsnewlen() when NULL is passed
         * is guaranteed to return bytes initialized to zero. */
        o = createHLLObject();
        dbAdd(c->db,c->argv[1],o);
        updated++;
    } else {
        if (isHLLObjectOrReply(c,o) != C_OK) return;
        o = dbUnshareStringValue(c->db,c->argv[1],o);
    }
    /* Perform the low level ADD operation for every element. */
    for (j = 2; j < c->argc; j++) {
        int retval = hllAdd(o, (unsigned char*)c->argv[j]->ptr,
                               sdslen(c->argv[j]->ptr));
        switch(retval) {
        case 1:
            updated++;
            break;
        case -1:
            addReplySds(c,sdsnew(invalid_hll_err));
            return;
        }
    }
    hdr = o->ptr;
    if (updated) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
        server.dirty++;
        HLL_INVALIDATE_CACHE(hdr);
    }
    addReply(c, updated ? shared.cone : shared.czero);
}

PFADD 命令會先判斷 key 是否存在,如果不存在,則創建一個新的 HyperLogLog 對象;如果存在,會調用 isHLLObjectOrReply() 函數檢查這個對象是不是 HyperLogLog 對象,檢查方法主要是檢查魔數是否正確,存儲結構是否正確以及頭結構體的長度是否正確等。

一切就緒後,纔可以調用 hllAdd() 函數添加元素。hllAdd 函數很簡單,只是根據存儲結構判斷需要調用 hllDenseAdd() 函數還是 hllSparseAdd() 函數。

密集存儲結構只是比較新舊計數值,如果新計數值大於就計數值,就將其替代。

而稀疏存儲結構要複雜一些:

  1. 判斷是否需要調整爲密集存儲結構,如果不需要則繼續進行,否則就先調整爲密集存儲結構,然後執行添加操作

  2. 我們需要先定位要修改的字節段,通過循環計算每一段表示的桶的範圍是否包括要修改的桶

  3. 定位到桶後,如果這個桶已經是 VAL,並且計數值大於當前要添加的計數值,則返回 0,如果小於當前計數值,就進行更新

  4. 如果是 ZERO,並且長度爲 1,那麼可以直接把它替換爲 VAL,並且設置計數值

  5. 如果不是上述兩種情況,則需要對現有的存儲進行拆分

PFCOUNT 命令

/* PFCOUNT var -> approximated cardinality of set. */
void pfcountCommand(client *c) {
    robj *o;
    struct hllhdr *hdr;
    uint64_t card;

    /* Case 1: multi-key keys, cardinality of the union.
     *
     * When multiple keys are specified, PFCOUNT actually computes
     * the cardinality of the merge of the N HLLs specified. */
    if (c->argc > 2) {
        uint8_t max[HLL_HDR_SIZE+HLL_REGISTERS], *registers;
        int j;

        /* Compute an HLL with M[i] = MAX(M[i]_j). */
        memset(max,0,sizeof(max));
        hdr = (struct hllhdr*) max;
        hdr->encoding = HLL_RAW; /* Special internal-only encoding. */
        registers = max + HLL_HDR_SIZE;
        for (j = 1; j < c->argc; j++) {
            /* Check type and size. */
            robj *o = lookupKeyRead(c->db,c->argv[j]);
            if (o == NULL) continue; /* Assume empty HLL for non existing var.*/
            if (isHLLObjectOrReply(c,o) != C_OK) return;

            /* Merge with this HLL with our 'max' HHL by setting max[i]
             * to MAX(max[i],hll[i]). */
            if (hllMerge(registers,o) == C_ERR) {
                addReplySds(c,sdsnew(invalid_hll_err));
                return;
            }
        }

        /* Compute cardinality of the resulting set. */
        addReplyLongLong(c,hllCount(hdr,NULL));
        return;
    }

    /* Case 2: cardinality of the single HLL.
     *
     * The user specified a single key. Either return the cached value
     * or compute one and update the cache. */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o == NULL) {
        /* No key? Cardinality is zero since no element was added, otherwise
         * we would have a key as HLLADD creates it as a side effect. */
        addReply(c,shared.czero);
    } else {
        if (isHLLObjectOrReply(c,o) != C_OK) return;
        o = dbUnshareStringValue(c->db,c->argv[1],o);

        /* Check if the cached cardinality is valid. */
        hdr = o->ptr;
        if (HLL_VALID_CACHE(hdr)) {
            /* Just return the cached value. */
            card = (uint64_t)hdr->card[0];
            card |= (uint64_t)hdr->card[1] << 8;
            card |= (uint64_t)hdr->card[2] << 16;
            card |= (uint64_t)hdr->card[3] << 24;
            card |= (uint64_t)hdr->card[4] << 32;
            card |= (uint64_t)hdr->card[5] << 40;
            card |= (uint64_t)hdr->card[6] << 48;
            card |= (uint64_t)hdr->card[7] << 56;
        } else {
            int invalid = 0;
            /* Recompute it and update the cached value. */
            card = hllCount(hdr,&invalid);
            if (invalid) {
                addReplySds(c,sdsnew(invalid_hll_err));
                return;
            }
            hdr->card[0] = card & 0xff;
            hdr->card[1] = (card >> 8) & 0xff;
            hdr->card[2] = (card >> 16) & 0xff;
            hdr->card[3] = (card >> 24) & 0xff;
            hdr->card[4] = (card >> 32) & 0xff;
            hdr->card[5] = (card >> 40) & 0xff;
            hdr->card[6] = (card >> 48) & 0xff;
            hdr->card[7] = (card >> 56) & 0xff;
            /* This is not considered a read-only command even if the
             * data structure is not modified, since the cached value
             * may be modified and given that the HLL is a Redis string
             * we need to propagate the change. */
            signalModifiedKey(c->db,c->argv[1]);
            server.dirty++;
        }
        addReplyLongLong(c,card);
    }
}

如果要計算多個 HyperLogLog 的基數,則需要將多個 HyperLogLog 對象合併,這裏合併方法是將所有的 HyperLogLog 對象合併到一個名爲 max 的對象中,max 採用的是密集存儲結構,如果被合併的對象也是密集存儲結構,則循環比較每一個計數值,將大的那個存入 max。如果被合併的是稀疏存儲,則只需要比較 VAL 即可。

如果計算單個 HyperLogLog 對象的基數,則先判斷對象頭結構體中的基數緩存是否有效,如果有效,可直接返回。如果已經失效,則需要重新計算基數,並修改原有緩存,這也是 PFCOUNT 命令不被當做只讀命令的原因。

推薦工具

給大家推薦一個幫助理解 HyperLogLog 原理的工具:http://content.research.neustar.biz/blog/hll.html,有興趣的話可以去學習一下。

GEO

特點:

和 zset 的相似之處

GEO 的 key 裏面需要保存各個 member 和經緯度,而且經緯度還必須得能夠排序,所以這個結構其實和 redis 的 zset 結構其實挺像的,唯一的區別可能在於 zset 只有一個 score,而 GEO 有經度和緯度,所以能用一個 score 來保存經度和緯度就可以解決問題了。其實 redis 的確也是這麼做的,而且 GEO 的底層其實就是在 zset 的結果上做了一層封裝,所以按照嚴格意義上講 GEO 並不是 redis 的一種新的數據類型。

GEO 的 hash 編碼方式

爲了能高效地對經緯度進行比較,Redis 採用了業界廣泛使用的 GeoHash 編碼方法,這 個方法的基本原理就是 “二分區間,區間編碼”。GeoHash 是一種地理位置編碼方法。 由 Gustavo Niemeyer 和 G.M. Morton 於 2008 年發明,它將地理位置編碼爲一串簡短的字母和數字。它是一種分層的空間數據結構,將空間細分爲網格形狀的桶,這是所謂的 z 順序曲線的衆多應用之一,通常是空間填充曲線。

當要對一組經緯度進行 GeoHash 編碼時,要先對經度和緯度分別編碼,然後再把經緯度各自的編碼組合成一個最終編碼。

首先,來看下經度和緯度的單獨編碼過程。以經緯度 116.37,39.86 爲例,首先看經度  116.37

對緯度的編碼方式,和對經度的一樣,只是緯度的範圍是 [-90,90],下面這張表顯示了對 緯度值 39.86 的編碼過程。

剛剛計算的經緯度 (116.37,39.86) 的各自編碼值是 11010 和 10111,進行交叉組合,偶數位是經度,奇數位是緯度,組合之後, 第 0 位是經度的第 0 位 1,第 1 位是緯度的第 0 位 1,第 2 位是經度的第 1 位 1,第 3 位是緯度的第 1 位 0,以此類推,就能得到最終編碼值 1110011101,如下圖所示

用了 GeoHash 編碼後,原來無法用一個權重分數表示的一組經緯度 (116.37,39.86) 就 可以用 1110011101 這一個值來表示,就可以保存爲 Sorted Set 的權重分數了。最後根據上述得到的二進制值,以 5 位爲一組,進行 base32 編碼

最後獲得的結果就是一組經緯度的 geohash 值。

地理位置二維轉一維

上文講了 GeoHash 的計算步驟,僅僅說明是什麼而沒有說明爲什麼?爲什麼分別給經度和維度編碼?爲什麼需要將經緯度兩串編碼交叉組合成一串編碼?本節試圖回答這一問題。

如下圖所示,將二進制編碼的結果填寫到空間中,當將空間劃分爲四塊時候,編碼的順序分別是左下角 00,左上角 01,右下腳 10,右上角 11,也就是類似於 Z 的曲線,當我們遞歸的將各個塊分解成更小的子塊時,編碼的順序是自相似的(分形),每一個子塊也形成 Z 曲線,這種類型的曲線被稱爲 Peano 空間填充曲線。

這種類型的空間填充曲線的優點是將二維空間轉換成一維曲線(事實上是分形維),對大部分而言,編碼相似的距離也相近, 但 Peano 空間填充曲線最大的缺點就是突變性,有些編碼相鄰但距離卻相差很遠,比如 0111 與 1000,編碼是相鄰的,但距離相差很大。

所以,爲了避免查詢不準確問題,我們可以同時查詢給定經緯度所在的方格周圍的 4 個或 8 個方格。

源碼

本質上 redis 中的 geo 就是對 geohash 的封裝,具體 geohash 相關的代碼就不給大家列了 (可自行查閱),就給大家介紹下 redis geo 裏的大體流程。

/* GEOADD key [CH] [NX|XX] long lat name [long2 lat2 name2 ... longN latN nameN] */
void geoaddCommand(client *c) {
    int xx = 0, nx = 0, longidx = 2;
    int i;
    /* 解析可選參數 */
    while (longidx < c->argc) {
        char *opt = c->argv[longidx]->ptr;
        if (!strcasecmp(opt,"nx")) nx = 1;
        else if (!strcasecmp(opt,"xx")) xx = 1;
        else if (!strcasecmp(opt,"ch")) {}
        else break;
        longidx++;
    }
    if ((c->argc - longidx) % 3 || (xx && nx)) {
        /* 解析所有的經緯度值和member,並對其個數做校驗 */
            addReplyErrorObject(c,shared.syntaxerr);
        return;
    }
    /* 構建zadd的參數數組 */
    int elements = (c->argc - longidx) / 3;
    int argc = longidx+elements*2; /* ZADD key [CH] [NX|XX] score ele ... */
    robj **argv = zcalloc(argc*sizeof(robj*));
    argv[0] = createRawStringObject("zadd",4);
    for (i = 1; i < longidx; i++) {
        argv[i] = c->argv[i];
        incrRefCount(argv[i]);
    }
    /* 以3個參數爲一組,將所有的經緯度和member信息從參數列表裏解析出來,並放到zadd的參數數組中 */
    for (i = 0; i < elements; i++) {
        double xy[2];
        if (extractLongLatOrReply(c, (c->argv+longidx)+(i*3),xy) == C_ERR) {
            for (i = 0; i < argc; i++)
                if (argv[i]) decrRefCount(argv[i]);
            zfree(argv);
            return;
        }
        /* 將經緯度座標轉化成score信息 */
        GeoHashBits hash;
        geohashEncodeWGS84(xy[0], xy[1], GEO_STEP_MAX, &hash);
        GeoHashFix52Bits bits = geohashAlign52Bits(hash);
        robj *score = createObject(OBJ_STRING, sdsfromlonglong(bits));
        robj *val = c->argv[longidx + i * 3 + 2];
        argv[longidx+i*2] = score;
        argv[longidx+1+i*2] = val;
        incrRefCount(val);
    }
    /* 轉化成zadd命令所需要的參數格式*/
    replaceClientCommandVector(c,argc,argv);
    zaddCommand(c);
}

geo 的存儲只是 zset 包了一層,zet 底層就是跳錶,具體可以看上面跳錶內容。

void georadiusGeneric(client *c, int srcKeyIndex, int flags) {
    robj *storekey = NULL;
    int storedist = 0; /* 0 for STORE, 1 for STOREDIST. */
    /* 根據key找找到對應的zojb */
    robj *zobj = NULL;
    if ((zobj = lookupKeyReadOrReply(c, c->argv[srcKeyIndex], shared.emptyarray)) == NULL ||
        checkType(c, zobj, OBJ_ZSET)) {
        return;
    }
    /* 解析請求中的經緯度值 */
    int base_args;
    GeoShape shape = {0};
    if (flags & RADIUS_COORDS) {
    /*
     * 各種必選參數的解析,省略細節代碼,主要是解析座標點信息和半徑   
     */ 
    }
    /* 解析所有的可選參數. */
    int withdist = 0, withhash = 0, withcoords = 0;
    int frommember = 0, fromloc = 0, byradius = 0, bybox = 0;
    int sort = SORT_NONE;
    int any = 0; /* any=1 means a limited search, stop as soon as enough results were found. */
    long long count = 0;  /* Max number of results to return. 0 means unlimited. */
    if (c->argc > base_args) {
    /*
     * 各種可選參數的解析,省略細節代碼   
     */ 
    }
    /* Get all neighbor geohash boxes for our radius search
     * 獲取到要查找範圍內所有的9個geo鄰域 */
    GeoHashRadius georadius = geohashCalculateAreasByShapeWGS84(&shape);
    /* 創建geoArray存儲結果列表 */
    geoArray *ga = geoArrayCreate();
    /* 掃描9個區域中是否有滿足條的點,有就放到geoArray中 */
    membersOfAllNeighbors(zobj, georadius, &shape, ga, any ? count : 0);
    /* 如果沒有匹配結果,返回空對象 */
    if (ga->used == 0 && storekey == NULL) {
        addReply(c,shared.emptyarray);
        geoArrayFree(ga);
        return;
    }
    long result_length = ga->used;
    long returned_items = (count == 0 || result_length < count) ?
                          result_length : count;
    long option_length = 0;
    /* 
     * 後續一些參數邏輯,比如處理排序,存儲……
     */
    // 釋放geoArray佔用的空間 
    geoArrayFree(ga);
}

上述代碼刪減了大量細節,有興趣的同學可以自行查閱 (redis 中的 src/geo.c)。不過可以看出 georadius 的整體流程非常清晰:

  1. 解析請求參數。

  2. 計算目標座標所在的 geohash 和 8 個鄰居。

  3. 在 zset 中查找這 9 個區域中滿足距離限制的所有點集。

  4. 處理排序等後續邏輯。

  5. 清理臨時存儲空間。

Reference

[1]

位圖: https://www.seven97.top/cs-basics/data-structure/bitmap.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/gg6N6e90ZtbA0MH4KIEHlg