【Redis 源碼】集羣之主從複製 replication
前言:
說到主從大家應該都不陌生,也應該都清楚主從解決服務的哪些問題。單臺服務器的支撐能力是有限的,爲了提高我們的 QPS 或者說數據的容災。主從服務則起到了相應的作用。
不過主從複製也會有一些缺點,比如說 “高可用問題”,“單服務器資源有限問題”。針對高可用問題我們後續解析 redis 集羣的哨兵,針對單服務器問題,我們會解析 redis Cluster 分佈式應用。
(一) 主從相關操作
1.1 如何建立主從關係
建立主從關係可以大致可以分爲三種形式:
1)通過配置 redis.conf
slaveof 127.0.0.1 6379
2)通過啓動 slaveof 參數
#redis-server --port 6380 --slaveof 127.0.0.1 6379
3)通過命令
>127.0.0.1:6381> slaveof 127.0.0.1 6380
說明:當我們使用 slaveof 時比如說 127.0.0.1:6380 服務調用 slaveof 127.0.0.1 6379, 其實這時候就是將 127.0.0.1:6379 當作是 127.0.0.1:6380 的主服務,127.0.0.1:6380 則是 127.0.0.1:6379 的從服務。
1.2 如何查看主從
info replication
主從信息介紹:
1.3 相關參數設置
server.c
int processCommand(client *c) {
//...省略
/* 默認從不支持寫入,需修改配置。
server.repl_slave_ro參數爲replica-read-only設置,默認情況下是不支持寫入 */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
{
addReply(c, shared.roslaveerr);
return C_OK;
}
//...省略
}
(二) 主從複製原理及源碼分析
2.1 slaveof 建立主從過程
(1)保存主節點信息
當主從建立時會保存主信息到 server.masterhost(連接地址) 和 server.masterport(端口)中,server 爲 redisServer 結構體。
(2) 建立 socket 連接
當 server.repl_state 設置 REPL_STATE_CONNECT 宏時,則 serverCron 中調用 replicationCron 的函數中會調用 connectWithMaster 建立與主服務器的 socket 連接, 並且 server.repl_state 參數設置爲 REPL_STATE_CONNECTING 狀態。
(3) 心跳檢測 PING
當調用 connectWithMaster 建立連接時,會創建事件調用 syncWithMaster,建立連接成功後 server.repl_state 的狀態爲 REPL_STATE_CONNECTING 會發起一個 PING 檢測心跳。並且 server.repl_state 狀態會更改爲 REPL_STATE_RECEIVE_PONG,接收到兩端有效回覆後 一個肯定的 + PONG 回覆或驗證。此時 server.repl_state 狀態變爲 REPL_STATE_SEND_AUTH;
(4) 驗證授權
驗證授權會有兩種情況,一種是沒有賬號密碼直接 server.repl_state 變爲 REPL_STATE_SEND_PORT 狀態。另外一種是登錄服務端授權後 server.repl_state 變爲 REPL_STATE_SEND_PORT 狀態。
(5) 信息同步
信息同步時會同步端口、ip 地址等信息.
(6) 接收 rdb 載入
(7)連接建立完畢
2.2 建立過程源碼分析
server.h 主從同步到狀態
#define REPL_STATE_NONE 0 /* 未開啓主從同步情況 */
#define REPL_STATE_CONNECT 1 /* 待發起連接主服務器 */
#define REPL_STATE_CONNECTING 2 /* 主服務器連接成功 */
/* --- Handshake states, must be ordered --- */
#define REPL_STATE_RECEIVE_PONG 3 /* 已經發起PING操作,等待接收主服務器PONG回覆 */
#define REPL_STATE_SEND_AUTH 4 /*待發起主服務器密碼驗證 */
#define REPL_STATE_RECEIVE_AUTH 5 /* 已經發起主服務器認證“auth 密碼”操作,等待主服務器回覆 */
#define REPL_STATE_SEND_PORT 6 /* 待發送端口號 REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 7 /* 已發起端口號,等待主服務器回覆 REPLCONF reply */
#define REPL_STATE_SEND_IP 8 /* 待發送ip地址, REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 9 /* 已發送ip地址,等待主服務器回覆 REPLCONF reply */
#define REPL_STATE_SEND_CAPA 10 /* 主從複製進行優化升級 REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 11 /*等待主服務器回覆 REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 12 /* 待發送 PSYNC命令 */
#define REPL_STATE_RECEIVE_PSYNC 13 /* 等待 PSYNC命令回覆 */
/* --- End of handshake states --- */
#define REPL_STATE_TRANSFER 14 /* 正在接收rdb文件 */
#define REPL_STATE_CONNECTED 15 /* 數據載入成功,主從複製建立完畢 */
replication.c 第一步 保存信息
void replicaofCommand(client *c) {
//..省略
if (!strcasecmp(c->argv[1]->ptr,"no") &&
!strcasecmp(c->argv[2]->ptr,"one")) { //slaveof on one 取消主從
if (server.masterhost) {
replicationUnsetMaster();
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
}
} else {
//...省略
replicationSetMaster(c->argv[1]->ptr, port); //設置主從信息
//...省略
}
addReply(c,shared.ok);
}
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;
sdsfree(server.masterhost);
server.masterhost = sdsnew(ip); //保存master host信息
server.masterport = port; //保存master 端口信息
//...省略
server.repl_state = REPL_STATE_CONNECT; //設置等待連接狀態
server.repl_down_since = 0;
}
保存信息到 server.masterhost 和 server.masterport 中,並且設置 server.repl_state 狀態未等待連接狀態。
replication.c 第二步創建 socket 連接
void replicationCron(void) {
static long long replication_cron_loops = 0;
//。。。省略
/* 如果是REPL_STATE_CONNECT狀態,連接到主服務器 */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) { //創建socket連接
serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
}
}
// ... 省略
}
int connectWithMaster(void) {
int fd;
fd = anetTcpNonBlockBestEffortBindConnect(NULL,
server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); //建立master socket連接
if (fd == -1) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return C_ERR;
}
//創建事件syncWithMaster方法
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
serverLog(LL_WARNING,"Can't create readable event for SYNC");
return C_ERR;
}
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
server.repl_state = REPL_STATE_CONNECTING; //設置連接中狀態
return C_OK;
}
replication.c 第三步到第七步
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err = NULL;
int dfd = -1, maxtries = 5;
int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
UNUSED(el);
UNUSED(privdata);
UNUSED(mask);
/* 未開啓主從同步情況 */
if (server.repl_state == REPL_STATE_NONE) {
close(fd);
return;
}
/* 第三步:發起ping 到master */
if (server.repl_state == REPL_STATE_CONNECTING) {
serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
/* 刪除可寫事件以使可讀事件保持不變已經註冊 */
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
server.repl_state = REPL_STATE_RECEIVE_PONG; //設置等待PONG回覆
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); //發起ping到master
if (err) goto write_error;
return;
}
/* 等待PONG回覆 */
if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
//..省略
server.repl_state = REPL_STATE_SEND_AUTH; //設置等待授權狀態
}
/* 第四步:發起auth命令到master授權 */
if (server.repl_state == REPL_STATE_SEND_AUTH) {
if (server.masterauth) { //設置密碼情況
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else { //未設置密碼情況
server.repl_state = REPL_STATE_SEND_PORT; //設置待發送端口號
}
}
/* 等待auth授權結果 */
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
//。。。省略
server.repl_state = REPL_STATE_SEND_PORT; //設置待發送端口號
}
/* 第五步:信息同步 ,同步端口 */
if (server.repl_state == REPL_STATE_SEND_PORT) {
sds port = sdsfromlonglong(server.slave_announce_port ?
server.slave_announce_port : server.port);
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"listening-port",port, NULL); //同步端口
sdsfree(port);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_PORT;
return;
}
/* 第五步:信息同步 ,回覆同步端口 */
if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
//。。。省略
sdsfree(err);
server.repl_state = REPL_STATE_SEND_IP;
}
/* 第五步:信息同步 ,同步端口 */
if (server.repl_state == REPL_STATE_SEND_IP &&
server.slave_announce_ip == NULL)
{
server.repl_state = REPL_STATE_SEND_CAPA;
}
/* 同步ip地址 */
if (server.repl_state == REPL_STATE_SEND_IP) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_IP;
return;
}
/* 回覆同步ip地址. */
if (server.repl_state == REPL_STATE_RECEIVE_IP) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
//。。。省略
server.repl_state = REPL_STATE_SEND_CAPA;
}
/* 告訴master 當前 (slave) 的支持能力。
* REPLCONF capa eof capa psync2
*
* EOF:支持EOF風格的RDB傳輸,用於無盤複製。
* PSYNC2:支持PSYNC v2, 服務端返回標示 +CONTINUE <new repl ID>
*
* master 會忽略它不支持的能力. */
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"capa","eof","capa","psync2",NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA;
return;
}
/* 回覆支持能力. */
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
//。。。省略
server.repl_state = REPL_STATE_SEND_PSYNC;
}
/* slaveTryPartialResynchronization() 函數中啓動有兩個作用
* 1.獲取主讀取主運行ID和偏移量,併發起 PSYNC {replid} {offset}命令複製.。
* 2.讀取PSYNC命令狀態,判斷是部分同步還是完整同步。
*/
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}
// ...省略
// 讀取狀態
psync_result = slaveTryPartialResynchronization(fd,1);
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
/* 爲批量傳輸準備合適的臨時文件 */
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
}
/* 創建事件調用readSyncBulkPayload函數,該函數爲接收和加載rdb文件,加載完成後會更新狀態爲REPL_STATE_CONNECTED */
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
//..省略
}
2.3 心跳包檢測
主從節點在建立連接後,它們之間維護着長連接並彼此發送心跳命令:
(1) slave 主發 REPLCONF 進行 ACK 校驗 【127.0.0.1:6380 往 127.0.0.1:6379】
當前往從發送 REPLCONF ACK 19695 命令
(2)主往從發送 PING 【127.0.0.1:6379 往 127.0.0.1:6380】
當前往主發送 PING 命令
replication.c 心跳包代碼
void replicationCron(void) {
/* 不時地向主服務器發送ACK。
* 請注意,我們不會定期向不支持PSYNC和複製偏移量的主機發送ack。
*/
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();
/* master 每N秒鐘給slave 一次ping */
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
listLength(server.slaves))
{
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb,
ping_argv, 1);
decrRefCount(ping_argv[0]);
}
}
server.repl_ping_slave_period 參數爲 redis.conf 中的 repl-ping-replica-period 參數,定義心跳(PING)間隔,默認爲 10 秒。
總結:
- 建立主從關鍵有三種形式:redis 啓動,redis 配置,redis 命令。
- 從庫默認情況是不支持寫入操作,需要 redis.conf 配置 slave-read-only 參數。
- 主從之間是存在心跳響應,主會往從發 PING,從會往主發 ACK 校驗。
- 主從模式如果是數據可靠性服務,可以提高可靠性解決數據容災問題。
5.info replication 命令可以查看相關主從信息和複製偏移量,解決主從中遇到複製失敗問題。
6.slaveof no one 命令可以取消主從複製。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/IPnggBPaqpaj06DlHFhivA