【Redis 源碼】位圖 SETBIT、BITCOUNT
前言:
位圖並不是一個特殊的數據結構,位圖其實就是一個字符串,位圖這種結構佔用空間特別小。如果是數億以上的用戶如果存儲活躍,如果是用 key/value 去存儲每個節點都需要數 G 去存儲,存儲是很大問題。如果換做位圖去存儲則可以大大節約空間, 不過適應於用戶 ID 連續性的。除此之外也可以用作比如說點讚的存儲。
(一)命令解析
setbit 存儲解析:
1)上圖中操作 setbit bitstr 0 1 和 setbit bitstr 7 1 其實只佔用 1 個字節,創建 sds 創建 byte+1 多加了一個字節。所以看到是 2 個字節。
2) 上圖中操作 setbit bitstr 25 1 此時的 byte 等於 4 個字節,多加一個。所以是 5 個字節。
3) byte 計算形式等於 offset / 8 。這個 byte 其實是計算存儲到那個數組字節中。1 個字節 = 8bit
4)計算存儲在哪個 bit 裏, (offset % 8) + 1。
如圖:
(二)setbit 源碼解析
setbit 命令,bitops.c 中:
void setbitCommand(client *c) {
robj *o;
char *err = "bit is not an integer or out of range";
size_t bitoffset;
ssize_t byte, bit;
int byteval, bitval;
long on;
if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK) //獲得offset偏移,字符串有一個512MB的限制
return;
if (getLongFromObjectOrReply(c,c->argv[3],&on,err) != C_OK) //獲得值
return;
/* 當前值只能是1和0 */
if (on & ~1) {
addReplyError(c,err);
return;
}
if ((o = lookupStringForBitCommand(c,bitoffset)) == NULL) return; //查找或者創建字符串sds或擴容
/* Get current values */
byte = bitoffset >> 3; //btye = bitoffset / 8 計算字節位置
byteval = ((uint8_t*)o->ptr)[byte]; //獲得存儲到字節
/*
假設bitoffset=25。當前(1 << (7 - (bitoffset & 0x7)))= 64
bit = 64對應二進制 = 0100 0000
bit用於做位運算
*/
bit = 7 - (bitoffset & 0x7);
bitval = byteval & (1 << bit);
/* Update byte with new bit value and return original value */
byteval &= ~(1 << bit);
byteval |= ((on & 0x1) << bit);
((uint8_t*)o->ptr)[byte] = byteval;
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
server.dirty++;
addReply(c, bitval ? shared.cone : shared.czero);
}
獲得 offset 偏移
int getBitOffsetFromArgument(client *c, robj *o, size_t *offset, int hash, int bits) {
long long loffset;
char *err = "bit offset is not an integer or out of range";
char *p = o->ptr;
size_t plen = sdslen(p);
int usehash = 0;
/* Handle #<offset> form. */
if (p[0] == '#' && hash && bits > 0) usehash = 1;
if (string2ll(p+usehash,plen-usehash,&loffset) == 0) {
addReplyError(c,err);
return C_ERR;
}
/* Adjust the offset by 'bits' for #<offset> form. */
if (usehash) loffset *= bits;
/* 512MB字符串限制,不能超過 */
if ((loffset < 0) || ((unsigned long long)loffset >> 3) >= (512*1024*1024))
{
addReplyError(c,err);
return C_ERR;
}
*offset = (size_t)loffset;
return C_OK;
}
查找或者創建字符串 sds 或擴容函數,bitops.c 中:
robj *lookupStringForBitCommand(client *c, size_t maxbit) {
size_t byte = maxbit >> 3; //btye = bitoffset / 8 計算字節位置
robj *o = lookupKeyWrite(c->db,c->argv[1]);
if (o == NULL) {
o = createObject(OBJ_STRING,sdsnewlen(NULL, byte+1)); //創建時多創建一個字節
dbAdd(c->db,c->argv[1],o);
} else {
if (checkType(c,o,OBJ_STRING)) return NULL; //檢測不是字符串類型返回
o = dbUnshareStringValue(c->db,c->argv[1],o); //獲得robj對象
o->ptr = sdsgrowzero(o->ptr,byte+1); //內部使用了sdsMakeRoomFor重新計算sds字符串長度
}
return o;
}
(三)理解 variable-precision SWAR 算法
統計一個位數組中非 0 位的數量,數學上稱作:”Hanmming Weight“(漢明重量)。最高的是 variable-precision SWAR 算法,可以在常數時間內計算出多個字節的非 0 數目。
swar 函數:
int swar(uint32_t i)
{
//計算每2位二進制數中1的個數
i = ( i & 0x55555555) + ((i >> 1) & 0x55555555);
//計算每4位二進制數中1的個數
i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
//計算每8位二進制數中1的個數
i = (i & 0x0F0F0F0F) + ((i >> 4) & 0x0F0F0F0F);
//將每8位二進制數中1的個數和相加,並移至最低位8位
i = (i * 0x01010101) >> 24;
return i;
}
1)第一次計算
通過 (i & 0x55555555) + ((i >> 1) & 0x55555555)計算後所得 0x51618294,( i & 0x55555555) 得到每兩位奇數的 1 的個數,而 ((i >> 1) & 0x55555555) 在每兩位上得到偶數的 1 個數。二者相加得到的就是每兩位上 1 的個數。
每兩位的取值範圍二進制:01 、10、11。其實就是 1、2、3。
2)第二次計算
通過 (i & 0x33333333) + ((i >> 2) & 0x33333333) 計算後所得 0x21312231。計算每 4 位的 1 的個數。
3)第三次計算
通過 (i & 0x0F0F0F0F) + ((i >> 2) & 0x0F0F0F0F) 計算後所得 0x3040404。計算每 8 位的 1 的個數。
4)第四次計算
0x3040404 * 0x0101010 計算所得其實是一個 long 類型的數字 0x3070b0f0c0804。但是由於 uint32_t 只佔四位。多餘部分會被移除所以只剩下 0x0f0c0804,而 0x0f0c0804 對應的二進制是 00001111 00001100 00001000 00000100 。右移動 24 位之後剩下 00001111,而 00001111 對應的就是 15。
(四)bitcount 源碼解析
bitcount 命令
void bitcountCommand(client *c) {
robj *o;
long start, end, strlen;
unsigned char *p;
char llbuf[LONG_STR_SIZE];
。。。省略
/* Precondition: end >= 0 && end < strlen, so the only condition where
* zero can be returned is: start > end. */
if (start > end) {
addReply(c,shared.czero);
} else {
long bytes = end-start+1;
addReplyLongLong(c,redisPopcount(p+start,bytes)); //統計字節數組中的1數量
}
}
統計字節數組中的 1 數量函數
size_t redisPopcount(void *s, long count) {
size_t bits = 0;
unsigned char *p = s;
uint32_t *p4;
/**
預先生成對應的bit表,因爲一個字節非負數是可以是0~255個數字。所以生成了一個256數組。
比如說0對應的二進制一個1都沒有,bitsinbyte[0]則爲0
255對應二進制全是1,bitsinbyte[255]則爲8
*/
static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8};
/* 計算對齊,以4字節對齊。多餘部分先計算到bits中 */
while((unsigned long)p & 3 && count) {
bits += bitsinbyte[*p++];
count--; //減少統計字節數
}
/*
開始用variable-precision SWAR算法計算“1”的個數,
計算以 28 字節寬度統計,節約計算時間
*/
p4 = (uint32_t*)p;
while(count>=28) { //統計字節 >= 28
uint32_t aux1, aux2, aux3, aux4, aux5, aux6, aux7;
aux1 = *p4++;
aux2 = *p4++;
aux3 = *p4++;
aux4 = *p4++;
aux5 = *p4++;
aux6 = *p4++;
aux7 = *p4++;
count -= 28;
aux1 = aux1 - ((aux1 >> 1) & 0x55555555); //第一步
aux1 = (aux1 & 0x33333333) + ((aux1 >> 2) & 0x33333333); //第二步
aux2 = aux2 - ((aux2 >> 1) & 0x55555555);
aux2 = (aux2 & 0x33333333) + ((aux2 >> 2) & 0x33333333);
aux3 = aux3 - ((aux3 >> 1) & 0x55555555);
aux3 = (aux3 & 0x33333333) + ((aux3 >> 2) & 0x33333333);
aux4 = aux4 - ((aux4 >> 1) & 0x55555555);
aux4 = (aux4 & 0x33333333) + ((aux4 >> 2) & 0x33333333);
aux5 = aux5 - ((aux5 >> 1) & 0x55555555);
aux5 = (aux5 & 0x33333333) + ((aux5 >> 2) & 0x33333333);
aux6 = aux6 - ((aux6 >> 1) & 0x55555555);
aux6 = (aux6 & 0x33333333) + ((aux6 >> 2) & 0x33333333);
aux7 = aux7 - ((aux7 >> 1) & 0x55555555);
aux7 = (aux7 & 0x33333333) + ((aux7 >> 2) & 0x33333333);
bits += ((((aux1 + (aux1 >> 4)) & 0x0F0F0F0F) +
((aux2 + (aux2 >> 4)) & 0x0F0F0F0F) +
((aux3 + (aux3 >> 4)) & 0x0F0F0F0F) +
((aux4 + (aux4 >> 4)) & 0x0F0F0F0F) +
((aux5 + (aux5 >> 4)) & 0x0F0F0F0F) +
((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) +
((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24; //第三步
}
/* 計算剩餘部分字節 */
p = (unsigned char*)p4;
while(count--) bits += bitsinbyte[*p++];
return bits;
}
總結:
- 位圖適合解決大數據存儲減少存儲資源。
2)setbit 是通過 sds 字符串存儲,是按照位存儲。
3)bitcount 採用 SWAR 算法,其實還採用了 bitsinbyte 預生成數組去減少計算開銷。
4)setbit 的最大限制是 512MB
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/JDB_0GKV1xB6VJV9LkVtMw