【Redis 源碼】位圖 SETBIT、BITCOUNT

前言:

位圖並不是一個特殊的數據結構,位圖其實就是一個字符串,位圖這種結構佔用空間特別小。如果是數億以上的用戶如果存儲活躍,如果是用 key/value 去存儲每個節點都需要數 G 去存儲,存儲是很大問題。如果換做位圖去存儲則可以大大節約空間, 不過適應於用戶 ID 連續性的。除此之外也可以用作比如說點讚的存儲。

(一)命令解析

bzOeY1

setbit 存儲解析:

1)上圖中操作 setbit bitstr 0 1 和 setbit bitstr 7 1 其實只佔用 1 個字節,創建 sds 創建 byte+1 多加了一個字節。所以看到是 2 個字節。
2) 上圖中操作 setbit bitstr 25 1 此時的 byte 等於 4 個字節,多加一個。所以是 5 個字節。
3) byte 計算形式等於 offset / 8 。這個 byte 其實是計算存儲到那個數組字節中。1 個字節 = 8bit
4)計算存儲在哪個 bit 裏, (offset % 8) + 1。
如圖:

(二)setbit 源碼解析

setbit 命令,bitops.c 中:

void setbitCommand(client *c) {
    robj *o;
    char *err = "bit is not an integer or out of range";
    size_t bitoffset;
    ssize_t byte, bit;
    int byteval, bitval;
    long on;

    if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)   //獲得offset偏移,字符串有一個512MB的限制
        return;

    if (getLongFromObjectOrReply(c,c->argv[3],&on,err) != C_OK)        //獲得值
        return;

    /* 當前值只能是1和0 */
    if (on & ~1) {
        addReplyError(c,err);
        return;
    }

    if ((o = lookupStringForBitCommand(c,bitoffset)) == NULL) return;   //查找或者創建字符串sds或擴容

    /* Get current values */
    byte = bitoffset >> 3;    //btye = bitoffset / 8 計算字節位置
    byteval = ((uint8_t*)o->ptr)[byte];  //獲得存儲到字節
    
    /*
    假設bitoffset=25。當前(1 << (7 - (bitoffset & 0x7))= 64
    bit = 64對應二進制 = 0100 0000 
    bit用於做位運算
    */
    bit = 7 - (bitoffset & 0x7);          
    bitval = byteval & (1 << bit);       
    
    /* Update byte with new bit value and return original value */
    byteval &= ~(1 << bit);
    byteval |= ((on & 0x1) << bit);
    ((uint8_t*)o->ptr)[byte] = byteval;
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
    server.dirty++;
    addReply(c, bitval ? shared.cone : shared.czero);
}

獲得 offset 偏移

int getBitOffsetFromArgument(client *c, robj *o, size_t *offset, int hash, int bits) {
    long long loffset;
    char *err = "bit offset is not an integer or out of range";
    char *p = o->ptr;
    size_t plen = sdslen(p);
    int usehash = 0;

    /* Handle #<offset> form. */
    if (p[0] == '#' && hash && bits > 0) usehash = 1;

    if (string2ll(p+usehash,plen-usehash,&loffset) == 0) {
        addReplyError(c,err);
        return C_ERR;
    }

    /* Adjust the offset by 'bits' for #<offset> form. */
    if (usehash) loffset *= bits;

    /* 512MB字符串限制,不能超過 */
    if ((loffset < 0) || ((unsigned long long)loffset >> 3) >= (512*1024*1024))
    {
        addReplyError(c,err);
        return C_ERR;
    }

    *offset = (size_t)loffset;
    return C_OK;
}

查找或者創建字符串 sds 或擴容函數,bitops.c 中:

robj *lookupStringForBitCommand(client *c, size_t maxbit) {
    size_t byte = maxbit >> 3;   //btye = bitoffset / 8 計算字節位置
    robj *o = lookupKeyWrite(c->db,c->argv[1]);

    if (o == NULL) {
        o = createObject(OBJ_STRING,sdsnewlen(NULL, byte+1));  //創建時多創建一個字節
        dbAdd(c->db,c->argv[1],o);
    } else {
        if (checkType(c,o,OBJ_STRING)) return NULL;  //檢測不是字符串類型返回
        o = dbUnshareStringValue(c->db,c->argv[1],o); //獲得robj對象
        o->ptr = sdsgrowzero(o->ptr,byte+1);  //內部使用了sdsMakeRoomFor重新計算sds字符串長度
    }
    return o;
}

(三)理解 variable-precision SWAR 算法

統計一個位數組中非 0 位的數量,數學上稱作:”Hanmming Weight“(漢明重量)。最高的是 variable-precision SWAR 算法,可以在常數時間內計算出多個字節的非 0 數目。

ShnpkS

swar 函數:

int swar(uint32_t i)
{
    //計算每2位二進制數中1的個數
    i = ( i & 0x55555555) + ((i >> 1) & 0x55555555);
    //計算每4位二進制數中1的個數
    i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
    //計算每8位二進制數中1的個數
    i = (i & 0x0F0F0F0F) + ((i >> 4) & 0x0F0F0F0F);
    //將每8位二進制數中1的個數和相加,並移至最低位8位
    i = (i * 0x01010101) >> 24;
    return i;
}

1)第一次計算

ofDaAl

通過 (i & 0x55555555) + ((i >> 1) & 0x55555555)計算後所得 0x51618294,( i & 0x55555555) 得到每兩位奇數的 1 的個數,而 ((i >> 1) & 0x55555555) 在每兩位上得到偶數的 1 個數。二者相加得到的就是每兩位上 1 的個數。
每兩位的取值範圍二進制:01 、10、11。其實就是 1、2、3。

2)第二次計算

WB1V6u

通過 (i & 0x33333333) + ((i >> 2) & 0x33333333) 計算後所得 0x21312231。計算每 4 位的 1 的個數。

3)第三次計算

mpDALU

通過 (i & 0x0F0F0F0F) + ((i >> 2) & 0x0F0F0F0F) 計算後所得 0x3040404。計算每 8 位的 1 的個數。

4)第四次計算

ly0nDM

0x3040404 * 0x0101010 計算所得其實是一個 long 類型的數字 0x3070b0f0c0804。但是由於 uint32_t 只佔四位。多餘部分會被移除所以只剩下 0x0f0c0804,而 0x0f0c0804 對應的二進制是 00001111 00001100 00001000 00000100 。右移動 24 位之後剩下 00001111,而 00001111 對應的就是 15。

(四)bitcount 源碼解析

bitcount 命令

void bitcountCommand(client *c) {
    robj *o;
    long start, end, strlen;
    unsigned char *p;
    char llbuf[LONG_STR_SIZE];

    。。。省略
    
    /* Precondition: end >= 0 && end < strlen, so the only condition where
     * zero can be returned is: start > end. */
    if (start > end) {
        addReply(c,shared.czero);
    } else {
        long bytes = end-start+1;

        addReplyLongLong(c,redisPopcount(p+start,bytes)); //統計字節數組中的1數量
    }
}

統計字節數組中的 1 數量函數

size_t redisPopcount(void *s, long count) {
    size_t bits = 0;
    unsigned char *p = s;
    uint32_t *p4;
    /**
    預先生成對應的bit表,因爲一個字節非負數是可以是0~255個數字。所以生成了一個256數組。
    比如說0對應的二進制一個1都沒有,bitsinbyte[0]則爲0
    255對應二進制全是1,bitsinbyte[255]則爲8
    */
    static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8};

    /* 計算對齊,以4字節對齊。多餘部分先計算到bits中 */
    while((unsigned long)p & 3 && count) {
        bits += bitsinbyte[*p++];
        count--;  //減少統計字節數
    }

    /* 
    開始用variable-precision SWAR算法計算“1”的個數,
    計算以 28 字節寬度統計,節約計算時間
     */
    p4 = (uint32_t*)p;
    while(count>=28) { //統計字節 >= 28
        uint32_t aux1, aux2, aux3, aux4, aux5, aux6, aux7;

        aux1 = *p4++;
        aux2 = *p4++;
        aux3 = *p4++;
        aux4 = *p4++;
        aux5 = *p4++;
        aux6 = *p4++;
        aux7 = *p4++;
        count -= 28;

        aux1 = aux1 - ((aux1 >> 1) & 0x55555555);  //第一步
        aux1 = (aux1 & 0x33333333) + ((aux1 >> 2) & 0x33333333); //第二步
        aux2 = aux2 - ((aux2 >> 1) & 0x55555555);
        aux2 = (aux2 & 0x33333333) + ((aux2 >> 2) & 0x33333333);
        aux3 = aux3 - ((aux3 >> 1) & 0x55555555);
        aux3 = (aux3 & 0x33333333) + ((aux3 >> 2) & 0x33333333);
        aux4 = aux4 - ((aux4 >> 1) & 0x55555555);
        aux4 = (aux4 & 0x33333333) + ((aux4 >> 2) & 0x33333333);
        aux5 = aux5 - ((aux5 >> 1) & 0x55555555);
        aux5 = (aux5 & 0x33333333) + ((aux5 >> 2) & 0x33333333);
        aux6 = aux6 - ((aux6 >> 1) & 0x55555555);
        aux6 = (aux6 & 0x33333333) + ((aux6 >> 2) & 0x33333333);
        aux7 = aux7 - ((aux7 >> 1) & 0x55555555);
        aux7 = (aux7 & 0x33333333) + ((aux7 >> 2) & 0x33333333);
        bits += ((((aux1 + (aux1 >> 4)) & 0x0F0F0F0F) +
                    ((aux2 + (aux2 >> 4)) & 0x0F0F0F0F) +
                    ((aux3 + (aux3 >> 4)) & 0x0F0F0F0F) +
                    ((aux4 + (aux4 >> 4)) & 0x0F0F0F0F) +
                    ((aux5 + (aux5 >> 4)) & 0x0F0F0F0F) +
                    ((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) +
                    ((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24; //第三步
    }
    
    /* 計算剩餘部分字節 */
    p = (unsigned char*)p4;
    while(count--) bits += bitsinbyte[*p++];
    return bits;
}

總結:

  1. 位圖適合解決大數據存儲減少存儲資源。
    2)setbit 是通過 sds 字符串存儲,是按照位存儲。
    3)bitcount 採用 SWAR 算法,其實還採用了 bitsinbyte 預生成數組去減少計算開銷。
    4)setbit 的最大限制是 512MB
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/JDB_0GKV1xB6VJV9LkVtMw