【Redis 源碼】集羣之主從複製 replication

前言:

說到主從大家應該都不陌生,也應該都清楚主從解決服務的哪些問題。單臺服務器的支撐能力是有限的,爲了提高我們的 QPS 或者說數據的容災。主從服務則起到了相應的作用。
不過主從複製也會有一些缺點,比如說 “高可用問題”,“單服務器資源有限問題”。針對高可用問題我們後續解析 redis 集羣的哨兵,針對單服務器問題,我們會解析 redis Cluster 分佈式應用。

Uswmtz

(一) 主從相關操作

1.1 如何建立主從關係

建立主從關係可以大致可以分爲三種形式:
1)通過配置 redis.conf

slaveof 127.0.0.1 6379

2)通過啓動 slaveof 參數

#redis-server --port  6380 --slaveof 127.0.0.1 6379

3)通過命令

>127.0.0.1:6381> slaveof 127.0.0.1 6380

說明:當我們使用 slaveof 時比如說 127.0.0.1:6380 服務調用 slaveof 127.0.0.1 6379, 其實這時候就是將 127.0.0.1:6379 當作是 127.0.0.1:6380 的主服務,127.0.0.1:6380 則是 127.0.0.1:6379 的從服務。

1.2 如何查看主從

info replication

主從信息介紹:

ILJEQC

1.3 相關參數設置

VDuKf1

server.c

int processCommand(client *c) { 
    //...省略
       
    /* 默認從不支持寫入,需修改配置。
    server.repl_slave_ro參數爲replica-read-only設置,默認情況下是不支持寫入 */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return C_OK;
    }
    //...省略
}

(二) 主從複製原理及源碼分析

2.1 slaveof 建立主從過程

(1)保存主節點信息
當主從建立時會保存主信息到 server.masterhost(連接地址) 和 server.masterport(端口)中,server 爲 redisServer 結構體。

(2) 建立 socket 連接
當 server.repl_state 設置 REPL_STATE_CONNECT 宏時,則 serverCron 中調用 replicationCron 的函數中會調用 connectWithMaster 建立與主服務器的 socket 連接, 並且 server.repl_state 參數設置爲 REPL_STATE_CONNECTING 狀態。

(3) 心跳檢測 PING
當調用 connectWithMaster 建立連接時,會創建事件調用 syncWithMaster,建立連接成功後 server.repl_state 的狀態爲 REPL_STATE_CONNECTING 會發起一個 PING 檢測心跳。並且 server.repl_state 狀態會更改爲 REPL_STATE_RECEIVE_PONG,接收到兩端有效回覆後 一個肯定的 + PONG 回覆或驗證。此時 server.repl_state 狀態變爲 REPL_STATE_SEND_AUTH;

(4) 驗證授權
驗證授權會有兩種情況,一種是沒有賬號密碼直接 server.repl_state 變爲 REPL_STATE_SEND_PORT 狀態。另外一種是登錄服務端授權後 server.repl_state 變爲 REPL_STATE_SEND_PORT 狀態。

(5) 信息同步
信息同步時會同步端口、ip 地址等信息.

(6) 接收 rdb 載入

(7)連接建立完畢

2.2 建立過程源碼分析

server.h 主從同步到狀態

#define REPL_STATE_NONE 0       /*  未開啓主從同步情況 */
#define REPL_STATE_CONNECT 1    /* 待發起連接主服務器 */
#define REPL_STATE_CONNECTING 2  /* 主服務器連接成功 */
/* --- Handshake states, must be ordered --- */
#define REPL_STATE_RECEIVE_PONG 3 /* 已經發起PING操作,等待接收主服務器PONG回覆 */
#define REPL_STATE_SEND_AUTH 4    /*待發起主服務器密碼驗證 */
#define REPL_STATE_RECEIVE_AUTH 5 /* 已經發起主服務器認證“auth 密碼”操作,等待主服務器回覆 */
#define REPL_STATE_SEND_PORT 6    /* 待發送端口號 REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 7 /* 已發起端口號,等待主服務器回覆 REPLCONF reply */
#define REPL_STATE_SEND_IP 8      /* 待發送ip地址, REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 9   /* 已發送ip地址,等待主服務器回覆 REPLCONF reply */
#define REPL_STATE_SEND_CAPA 10    /* 主從複製進行優化升級 REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 11 /*等待主服務器回覆 REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 12    /* 待發送 PSYNC命令 */
#define REPL_STATE_RECEIVE_PSYNC 13 /* 等待 PSYNC命令回覆 */
/* --- End of handshake states --- */
#define REPL_STATE_TRANSFER 14      /* 正在接收rdb文件 */
#define REPL_STATE_CONNECTED 15     /* 數據載入成功,主從複製建立完畢 */

replication.c 第一步 保存信息

void replicaofCommand(client *c) {
    //..省略
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {   //slaveof on one 取消主從
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        //...省略
        replicationSetMaster(c->argv[1]->ptr, port);  //設置主從信息
        //...省略
    }
    addReply(c,shared.ok);
}

void replicationSetMaster(char *ip, int port) {
    int was_master = server.masterhost == NULL;

    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);   //保存master host信息
    server.masterport = port;         //保存master 端口信息
    //...省略
    server.repl_state = REPL_STATE_CONNECT; //設置等待連接狀態
    server.repl_down_since = 0;
}

保存信息到 server.masterhost 和 server.masterport 中,並且設置 server.repl_state 狀態未等待連接狀態。

replication.c 第二步創建 socket 連接

void replicationCron(void) {
    static long long replication_cron_loops = 0;
    //。。。省略

    /* 如果是REPL_STATE_CONNECT狀態,連接到主服務器 */
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) { //創建socket連接
            serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    // ... 省略
}

int connectWithMaster(void) {
    int fd;

    fd = anetTcpNonBlockBestEffortBindConnect(NULL,
        server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); //建立master socket連接
    if (fd == -1) {
        serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return C_ERR;
    }
    //創建事件syncWithMaster方法
    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        serverLog(LL_WARNING,"Can't create readable event for SYNC");
        return C_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REPL_STATE_CONNECTING;  //設置連接中狀態
    return C_OK;
}

replication.c 第三步到第七步

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err = NULL;
    int dfd = -1, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* 未開啓主從同步情況 */
    if (server.repl_state == REPL_STATE_NONE) {
        close(fd);
        return;
    }


    /* 第三步:發起ping 到master */
    if (server.repl_state == REPL_STATE_CONNECTING) {
        serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* 刪除可寫事件以使可讀事件保持不變已經註冊 */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REPL_STATE_RECEIVE_PONG; //設置等待PONG回覆
     
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); //發起ping到master
        if (err) goto write_error;
        return;
    }

    /* 等待PONG回覆 */
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        //..省略
        server.repl_state = REPL_STATE_SEND_AUTH; //設置等待授權狀態
    }

    /* 第四步:發起auth命令到master授權 */
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
        if (server.masterauth) { //設置密碼情況
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else {  //未設置密碼情況
            server.repl_state = REPL_STATE_SEND_PORT;   //設置待發送端口號
        }
    }

    /* 等待auth授權結果 */
    if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        //。。。省略
        server.repl_state = REPL_STATE_SEND_PORT; //設置待發送端口號
    }

    /* 第五步:信息同步 ,同步端口 */
    if (server.repl_state == REPL_STATE_SEND_PORT) {
        sds port = sdsfromlonglong(server.slave_announce_port ?
            server.slave_announce_port : server.port);
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "listening-port",port, NULL);  //同步端口
        sdsfree(port);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_PORT;
        return;
    }

    /* 第五步:信息同步 ,回覆同步端口 */
    if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        //。。。省略
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_IP;
    }

    /* 第五步:信息同步 ,同步端口 */
    if (server.repl_state == REPL_STATE_SEND_IP &&
        server.slave_announce_ip == NULL)
    {
            server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* 同步ip地址 */
    if (server.repl_state == REPL_STATE_SEND_IP) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "ip-address",server.slave_announce_ip, NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_IP;
        return;
    }

    /* 回覆同步ip地址. */
    if (server.repl_state == REPL_STATE_RECEIVE_IP) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        //。。。省略
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /*  告訴master 當前 (slave) 的支持能力。
     *  REPLCONF capa eof capa  psync2 
     *
     * EOF:支持EOF風格的RDB傳輸,用於無盤複製。
     * PSYNC2:支持PSYNC v2, 服務端返回標示 +CONTINUE <new repl ID>    
     *
     * master 會忽略它不支持的能力. */
    if (server.repl_state == REPL_STATE_SEND_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "capa","eof","capa","psync2",NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_CAPA;
        return;
    }

    /* 回覆支持能力. */
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        //。。。省略
        server.repl_state = REPL_STATE_SEND_PSYNC;
    }

    /* slaveTryPartialResynchronization() 函數中啓動有兩個作用
     * 1.獲取主讀取主運行ID和偏移量,併發起 PSYNC  {replid} {offset}命令複製.。
     * 2.讀取PSYNC命令狀態,判斷是部分同步還是完整同步。
     */
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
            err = sdsnew("Write error sending the PSYNC command.");
            goto write_error;
        }
        server.repl_state = REPL_STATE_RECEIVE_PSYNC;
        return;
    }
    
    // ...省略
    
    // 讀取狀態
    psync_result = slaveTryPartialResynchronization(fd,1);
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */


    /* 爲批量傳輸準備合適的臨時文件 */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
        goto error;
    }

    /* 創建事件調用readSyncBulkPayload函數,該函數爲接收和加載rdb文件,加載完成後會更新狀態爲REPL_STATE_CONNECTED */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }
    //..省略
}

2.3 心跳包檢測

主從節點在建立連接後,它們之間維護着長連接並彼此發送心跳命令:


(1) slave 主發 REPLCONF 進行 ACK 校驗 【127.0.0.1:6380 往 127.0.0.1:6379】

當前往從發送 REPLCONF ACK 19695 命令

(2)主往從發送 PING 【127.0.0.1:6379 往 127.0.0.1:6380】

當前往主發送 PING 命令

replication.c 心跳包代碼

 void replicationCron(void) {   
    /* 不時地向主服務器發送ACK。
     * 請注意,我們不會定期向不支持PSYNC和複製偏移量的主機發送ack。
     */
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();

  
    /* master 每N秒鐘給slave 一次ping */
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
        listLength(server.slaves))
    {
        ping_argv[0] = createStringObject("PING",4);
        replicationFeedSlaves(server.slaves, server.slaveseldb,
            ping_argv, 1);
        decrRefCount(ping_argv[0]);
    }
}

server.repl_ping_slave_period 參數爲 redis.conf 中的 repl-ping-replica-period 參數,定義心跳(PING)間隔,默認爲 10 秒。

總結:

  1. 建立主從關鍵有三種形式:redis 啓動,redis 配置,redis 命令。
  2. 從庫默認情況是不支持寫入操作,需要 redis.conf 配置 slave-read-only 參數。
  3. 主從之間是存在心跳響應,主會往從發 PING,從會往主發 ACK 校驗。
  4. 主從模式如果是數據可靠性服務,可以提高可靠性解決數據容災問題。
    5.info replication 命令可以查看相關主從信息和複製偏移量,解決主從中遇到複製失敗問題。
    6.slaveof no one 命令可以取消主從複製。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/IPnggBPaqpaj06DlHFhivA