【Redis 源碼】集羣之分佈式 cluster 建立集羣關係

前言:

redis 在 redis3.0 版本之後推出 redis cluster 模式集羣,redis cluster 是官方提供的分佈式解決方案。當一個 redis 節點掛了可以快速切到另一個節點中。當遇到單機內存、併發瓶頸時可以考慮使用 redis cluster。
集羣的內容會比較長,這一章會分爲兩篇作爲描述:
《集羣之分佈式 cluster 建立集羣關係》與《集羣之分佈式 cluster 原理》

(一)cluster 基礎知識

1.1 瞭解 cluser

一個 redis 集羣通常由多個節點組成,起初每個階段都是獨立的個體。它們都在自己的集羣當中。如果要構建一個真正的集羣,我們必須將各個獨立的節點連接在一起,構成一個包含多節點的集羣。

如圖中所示:

ZoxQgk

我們要構成這些節點,可以通過命令或者配置構成。

1.2 配置 cluster

  1. 配置信息
port 6389
#開啓cluster模式
cluster-enabled yes

#配置節點之間超時時間
cluster-node-timeout 15000

#這個配置很重要,cluster開啓必須重命名指定cluster-config-file
#不能與別的節點相同,否則會啓動失敗,最好按主機+端口命名
#其次,該文件保存了本節點與其他節點的信息及關係
cluster-config-file nodes-6389.conf
  1. 創建 redis cluster
redis-cli --cluster create 127.0.0.1:6379 127.0.0.1:6380  127.0.0.1:6381 127.0.0.1:6389 127.0.0.1:6390 127.0.0.1:6391 --cluster-replicas 1

該條命令時 redis5.0 客戶端才支持的。除了這種方式還可以使用 redis-trib.rb

在創建節點過程中如果時遇到 [ERR] Not all 16384 slots are covered by nodes. 錯誤,
或者使用過程中遇到 (error) CLUSTERDOWN The cluster is down。
可以執行如下命令:

redis-cli --cluster fix 127.0.0.1:6379

修復過程中,我們可以看到我們集羣中有 16384 個槽可以分別指派給集羣中的各個節點。

1.3 基礎命令

#查看當前節點
CLUSTER NODES

#將 ip 和 port 所指定的節點添加到集羣中
CLUSTER MEET <ip> <port> . 

#從集羣中移除 node_id 指定的節點
CLUSTER FORGET <node_id>

#將當前節點設置爲 node_id 指定的節點的從節點
CLUSTER REPLICATE <node_id>

#將節點的配置文件保存到硬盤裏面
CLUSTER SAVECONFIG

#將一個或多個槽(slot)指派(assign)給當前節點
CLUSTER ADDSLOTS <slot> [slot ...]

#移除一個或多個槽對當前節點的指派
CLUSTER DELSLOTS <slot> [slot ...]

#移除當前節點所有槽
CLUSTER FLUSHSLOTS

#將槽 slot 指派給 node_id 指定的節點,如果槽已經指派給另一個節點,
#那麼先讓另一個節點刪除該槽,然後再進行指派
CLUSTER SETSLOT <slot> NODE <node_id>

#將本節點的槽 slot 遷移到 node_id 指定的節點中
CLUSTER SETSLOT <slot> MIGRATING <node_id>

#從 node_id 指定的節點中導入槽 slot 到本節點
CLUSTER SETSLOT <slot> IMPORTING <node_id>

#取消對槽 slot 的導入(import)或者遷移(migrate)
CLUSTER SETSLOT <slot> STABLE

#計算鍵 key 應該被放置在哪個槽上
CLUSTER KEYSLOT <key>

#返回槽 slot 目前包含的鍵值對數量
CLUSTER COUNTKEYSINSLOT <slot>

#返回 count 個 slot 槽中的鍵
CLUSTER GETKEYSINSLOT <slot> <count>

(二) 源碼分析

2.1 基礎結構

#define CLUSTER_SLOTS 16384  //對應卡槽最大數量
typedef struct clusterNode {
    mstime_t ctime;              /* 創建節點時間. */
    char name[CLUSTER_NAMELEN];  /* 節點名稱 hex 字節串, 40個字節 */
    int flags;                   /* 節點標識,CLUSTER_NODE_... */
    uint64_t configEpoch;        /* 節點當前的配置紀元,用於實現故障轉移 */
    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
    int numslots;                         /* 此節點處理的插槽數 */
    int numslaves;                        /* 如果這是主節點,則從節點的數量 */
    struct clusterNode **slaves;         /*  節點從節點指針 */
    struct clusterNode *slaveof;         /* 指向主節點的指針。*/
    mstime_t ping_sent;      /* 最新一個 ping 時間 */
    mstime_t pong_received;  /* 最新一個回覆 pong 時間*/
    mstime_t fail_time;      /* 設置失敗標誌的Unix時間 */
    mstime_t voted_time;     /* 上一次投票時間 */
    mstime_t repl_offset_time;  /* 我們收到此節點偏移量的Unix時間 */
    mstime_t orphaned_time;     /* 孤立主條件開始的時間 */
    long long repl_offset;      /* 此節點的最後一個已知複製偏移量. */
    char ip[NET_IP_STR_LEN];  /* 節點IP地址 */
    int port;                   /* 節點端口 */
    int cport;                  /* 此節點的最新已知羣集端口. */
    clusterLink *link;          /* 節點 TCP/IP 連接信息  */
    list *fail_reports;         /* 失敗節點列表 */
} clusterNode;

CLUSTER_SLOTS宏是我們cluster卡槽數量,link保存了連接節點所需的有關信息, 比如套接字描述符, 輸入緩衝區和輸出緩衝區:
typedef struct clusterLink {
    mstime_t ctime;             /* Link 創建時間 */
    int fd;                     /* TCP socket 描述符 */
    sds sndbuf;                 /* 輸出緩衝區 */
    sds rcvbuf;                 /* 輸入緩衝區 */
    struct clusterNode *node;   /* 與這個連接相關聯的節點,如果沒有的話就爲 NULL */
} clusterLink;

每個連接都會都會維護一個clusterState狀態。
typedef struct clusterState {
    clusterNode *myself;   /* 指向當前節點指針 */
    uint64_t currentEpoch; /* 集羣當前的配置紀元,用於故障恢復 */
    int state;            /* CLUSTER_OK, CLUSTER_FAIL, ... */
    int size;             /* 集羣中至少處理着一個槽的節點的數量 */
    dict *nodes;          /* 集羣節點名單 對應 clusterNode 結構體 */
    //...省略
} clusterState;

2.2 初始化 cluster

cluster 命令方法:

void clusterCommand(client *c) {
    if (server.cluster_enabled == 0) { //判斷是否開啓cluster
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }
    //判斷參數必須是4個或者5個,且第二個參數等於meet
    if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
        /* CLUSTER MEET <ip> <port> [cport] */
        long long port, cport;

        if (getLongLongFromObject(c->argv[3], &port) != C_OK) { //獲得端口參數
            addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }

        if (c->argc == 5) { //5個參數時
            if (getLongLongFromObject(c->argv[4], &cport) != C_OK) { // 獲得cport
                addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
                                    (char*)c->argv[4]->ptr);
                return;
            }
        } else {
            cport = port + CLUSTER_PORT_INCR; //默認清楚port + 10000
        }

        if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 && .  //握手
            errno == EINVAL)
        {
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
            addReply(c,shared.ok);
        }
    }
    //。。。省略
}

握手函數源碼

int clusterStartHandshake(char *ip, int port, int cport) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;

    /* IP健全性檢查 */
    if (inet_pton(AF_INET,ip,
            &(((struct sockaddr_in *)&sa)->sin_addr)))
    {
        sa.ss_family = AF_INET;
    } else if (inet_pton(AF_INET6,ip,
            &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
    {
        sa.ss_family = AF_INET6;
    } else {
        errno = EINVAL;
        return 0;
    }

    /* 端口健全性檢測 */
    if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
        errno = EINVAL;
        return 0;
    }

    /* 網絡ip地址轉寒*/
    memset(norm_ip,0,NET_IP_STR_LEN);
    if (sa.ss_family == AF_INET)
        inet_ntop(AF_INET,
            (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
            norm_ip,NET_IP_STR_LEN);
    else
        inet_ntop(AF_INET6,
            (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
            norm_ip,NET_IP_STR_LEN);
    //判斷是否正在握手中,防止重複握手
    if (clusterHandshakeInProgress(norm_ip,port,cport)) {
        errno = EAGAIN;
        return 0;
    }

    /* 創建node節點結構*/
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    n->cport = cport;
    //添加nodes節點到server.cluster->nodes中
    clusterAddNode(n);
    return 1;
}

加入 node 到 server.cluster->nodes 後,serverCron 中會調用 clusterCron。該函數中會判斷處於握手狀態的節點是否握手超時,如果是。則調用 clusterDelNode 函數刪除節點。如果節點的節點 TCP/IP 連接信息 等於空時(link == NULL),會發起 tcp/ip 連接,且將 fd 信息保存到節點 link->fd 中。已經 ping 信息。(該部分下一章詳細講解)

2.3 鍵值設置流程


1)第一步通過 6379 端口登錄

#redis-cli -p 6379 -c

2)下斷點 processCommand

int processCommand(client *c) {
     //。。。省略
    /* 如果啓用羣集,請在此處執行羣集重定向。
    *但是,如果發生以下情況,則不執行重定向:
    *1)這個命令的發送者是我們的master。
    *2)命令沒有鍵參數。*/
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        //獲得key屬於哪個slot,內部通過keyHashSlot函數計算slot
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) { //判斷node不是自己
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code); //跳轉
            return C_OK;
        }
    }
    //。。。省略
}

計算 slot 分佈函數,keyHashSlot

unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)   //計算{的位置
        if (key[s] == '{') break;

    /* 沒有{情況下直接 crc16(key) % 16384 = crc16(key) & 0x3FFF */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* 有'{'情況,計算}的位置  */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* 沒有 '}'或者不是{} 閉合 */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* {key}閉合時取中間的key */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}

函數中計算 slot 的分佈情況, 以 crc16(key) % 16384 。


上圖爲計算 slots


上圖爲跳轉函數


上圖爲跳轉後展示

總結:

1.cluster 中是採用 hash 槽,有 16384 個槽可以分別指派給集羣中的各個節點。每個節點都會記錄哪些槽分配給自己,哪些槽指派給其他節點。在 node_xxx.conf 中也可以體現。
2.redis5.0 客戶端通過 redis-cli --cluster 可以創建 cluster 集羣,通過 redis-cli --cluster fix 可以修復集羣重新分配槽。
3. 調用命令時,會發起調用 processCommand 函數,函數中如果開啓 cluster,計算的 slot 後得到 node 節點不是自己。則客戶端連接則會跳轉。
4.CLUSTER MEET 命令可以加入 cluster,加入時發起握手相關操作,檢測服務正常後,加入節點信息到 server.cluster->nodes 中。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/lEPnBQuFPRUTno2O7kPI8A