redis 源碼:通信流程分析

大家好,我是盼盼!

最近想學習一下 redis 源碼,先看一下 redis 通信流程。由於功力有限,不足之處望大家指正。服務端和客戶端通信,一般都是服務端先啓動,那先從服務端的源碼看起。

//事件處理器的狀態
typedef struct aeEventLoop {
    // 目前已註冊的最大描述符
    int maxfd;   /* highest file descriptor currently registered */
    // 目前已追蹤的最大描述符
    int setsize; /* max number of file descriptors tracked */
    // 用於生成時間事件 id
    long long timeEventNextId;
    // 最後一次執行時間事件的時間
    time_t lastTime;     /* Used to detect system clock skew */
    // 已註冊的文件事件
    aeFileEvent *events; /* Registered events */
    // 已就緒的文件事件
    aeFiredEvent *fired; /* Fired events */
    // 時間事件
    aeTimeEvent *timeEventHead;
    // 事件處理器的開關
    int stop;
    // 多路複用庫的私有數據
    void *apidata; /* This is used for polling API specific data */
    // 在處理事件前要執行的函數
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

下面要對事件處理器進行初始化。

//事件狀態
typedef struct aeApiState {
    // epoll_event 實例描述符
    int epfd;
    // 事件槽
    struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    if (!state) return -1;
    // 初始化事件槽空間
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    // 創建 epoll 實例
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    // 賦值給 eventLoop
    eventLoop->apidata = state;
    return 0;
}

上面創建的 epoll 句柄和初始化的事件槽保存到傳入的 eventLoop 事件對象中。這個事件對象保存在全局的一個 redisserver 中,redisServer 中結構體成員很多,這裏只展示一個。

struct redisServer {
    //...
    // 事件狀態
    aeEventLoop *el;
    // 一個鏈表,保存了所有客戶端狀態結構
    list *clients;              /* List of active clients */
    /...
};

aeEventLoop *el 存儲剛纔創建的事件狀態。

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
    int s, rv;
    char _port[6];  /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;
    snprintf(_port,6,"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;    /* No effect if bindaddr != NULL */
    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;
        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;
        goto end;
    }
    if (p == NULL) {
        anetSetError(err, "unable to bind socket");
        goto error;
    }
error:
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}

上面的函數用來打開監聽端口。

 // 爲 TCP 連接關聯連接應答(accept)處理器
    // 用於接受並應答客戶端的 connect() 調用
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, //使文件讀關聯一個函數
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
/*
 * 根據 mask 參數的值,監聽 fd 文件的狀態,
 * 當 fd 可用時,執行 proc 函數
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    if (fd >= eventLoop->setsize) return AE_ERR;
    // 取出文件事件結構
    aeFileEvent *fe = &eventLoop->events[fd];
    // 監聽指定 fd 的指定事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    // 設置文件事件類型,以及事件的處理器
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    // 私有數據
    fe->clientData = clientData;
    // 如果有需要,更新事件處理器的最大 fd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

aeCreateFileEvent 函數用來註冊回調用,參數 aeEventLoop *eventLoop 就是前面初始化的事件處理器的狀態,當 AE_READABLE 產生時就會調用 acceptTcpHandler 函數,這時是有客戶端 connect 了。
前面已經初始化了一定數量的處理器,aeApiAddEvent 把所有的事件對象都註冊到 epoll,後面接着設置對應 AE_READABLE 和 AE_WRITABLE 對應的回調函數。

/* File event structure
 *
 * 文件事件結構
 */
typedef struct aeFileEvent {
    // 監聽事件類型掩碼,
    // 值可以是 AE_READABLE 或 AE_WRITABLE ,
    // 或者 AE_READABLE | AE_WRITABLE
    int mask; /* one of AE_(READABLE|WRITABLE) */
    // 讀事件處理器
    aeFileProc *rfileProc;
    // 寫事件處理器
    aeFileProc *wfileProc;
    // 多路複用庫的私有數據
    void *clientData;
} aeFileEvent;

上面是文件事件結構的結構體,對應的讀和寫的回調函數都保存在 aeFileEvent(文件事件) 中,aeFileEvent(文件事件) 就是 aeEventLoop(事件處理器狀態) 的成員,aeEventLoop(事件處理器狀態) 就是 redisServer 結構體中 aeEventLoop *el(事件狀態成員),所有的這些都保存在全局的 redisServer 結構體中。接下來就是事件處理器主循環中。

/*
 * 事件處理器的主循環
 */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // 如果有需要在事件處理前執行的函數,那麼運行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 開始處理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);//一直循環調用這個函數等到消息
    }
}
//處理所有已到達的時間事件,以及所有已就緒的文件事件。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        // 獲取最近的時間事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 如果時間事件存在的話
            // 那麼根據最近可執行時間事件和現在時間的時間差來決定文件事件的阻塞時間
            long now_sec, now_ms;
            /* Calculate the time missing for the nearest
             * timer to fire. */
            // 計算距今最近的時間事件還要多久才能達到
            // 並將該時間距保存在 tv 結構中
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            // 時間差小於 0 ,說明事件已經可以執行了,將秒和毫秒設爲 0 (不阻塞)
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            // 執行到這一步,說明沒有時間事件
            // 那麼根據 AE_DONT_WAIT 是否設置來決定是否阻塞,以及阻塞的時間長度
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // 設置文件事件不阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 文件事件可以阻塞直到有事件到達爲止
                tvp = NULL; /* wait forever */
            }
        }
        // 處理文件事件,阻塞時間由 tvp 決定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 從已就緒數組中獲取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 讀事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 確保讀/寫事件只能執行其中一個
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 寫事件
            if (fe->mask & mask & AE_WRITABLE) {
                printf("can writable\n");
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    // 執行時間事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}
/*
 * 獲取可執行事件
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 等待時間
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);//epoll_wait用於向用戶進程返回ready list
    // 有至少一個事件就緒?
    if (retval > 0) {
        int j;
        // 爲已就緒事件設置相應的模式
        // 並加入到 eventLoop 的 fired 數組中
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    // 返回已就緒事件個數
    return numevents;
}

aeProcessEvents 一直被循環調用用來處理就緒的文件事件 (時間事件這裏不考慮),通過調用 aeApiPoll 中的 epoll_wait 等待事件的促發。

typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
/* A fired event
 *
 * 已就緒事件
 */
typedef struct aeFiredEvent {
    // 已就緒文件描述符
    int fd;
    // 事件類型掩碼,
    // 值可以是 AE_READABLE 或 AE_WRITABLE
    // 或者是兩者的或
    int mask;
} aeFiredEvent;

上面列出了 epoll 結構體和 aeFiredEvent(已就緒事件結構體),aeFiredEvent 屬於事件處理器狀態 (aeEventLoop) 成員,並循環保存就緒文件事件對應中已就緒描述符和其類型,這些又都保存在事件處理器狀態 (aeEventLoop) 中。
函數返回到 aeProcessEvents 中,然後走對應的回調 (這時候還沒講回調關聯對應的函數)。

現在假如有客戶端來連接了,按前面說的,套接字變的可讀,acceptTcpHandler 被調用,acceptTcpHandler 函數接收客戶端的連接,併爲客戶端創建狀態,並註冊讀取客戶端命令的函數 readQueryFromClient。並把創建的客戶端保存在 redisServer 裏面的 list *clients 裏面。

/*
 * 創建一個新客戶端
 */
redisClient *createClient(int fd) {
    printf("-----------%s--------\n",__FUNCTION__);
    // 分配空間
    redisClient *c = zmalloc(sizeof(redisClient));
    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the Redis commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    // 當 fd 不爲 -1 時,創建帶網絡連接的客戶端
    // 如果 fd 爲 -1 ,那麼創建無網絡連接的僞客戶端
    // 因爲 Redis 的命令必須在客戶端的上下文中使用,所以在執行 Lua 環境中的命令時
    // 需要用到這種僞終端
    if (fd != -1) {
        // 非阻塞
        anetNonBlock(NULL,fd);
        // 禁用 Nagle 算法
        anetEnableTcpNoDelay(NULL,fd);
        // 設置 keep alive
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // 綁定讀事件到事件 loop (開始接收命令請求)
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,   //客戶端連接上之後,再爲客戶端關聯一個讀數據的函數。之前關聯的建立連接
            readQueryFromClient, c) == AE_ERR)            //沒有建立連接之前關聯建立函數,建立連接之後關聯讀數據的函數
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    // 初始化各個屬性
    // 默認數據庫
    selectDb(c,0);
    // 套接字
    c->fd = fd;
    // 名字
    c->name = NULL;
    // 回覆緩衝區的偏移量
    c->bufpos = 0;
    // 查詢緩衝區
    c->querybuf = sdsempty();
    // 查詢緩衝區峯值
    c->querybuf_peak = 0;
    // 命令請求的類型
    c->reqtype = 0;
    // 命令參數數量
    c->argc = 0;
    // 命令參數
    c->argv = NULL;
    // 當前執行的命令和最近一次執行的命令
    c->cmd = c->lastcmd = NULL;
    // 查詢緩衝區中未讀入的命令內容數量
    c->multibulklen = 0;
    // 讀入的參數的長度
    c->bulklen = -1;
    // 已發送字節數
    c->sentlen = 0;
    // 狀態 FLAG
    c->flags = 0;
    // 創建時間和最後一次互動時間
    c->ctime = c->lastinteraction = server.unixtime;
    // 認證狀態
    c->authenticated = 0;
    // 複製狀態
    c->replstate = REDIS_REPL_NONE;
    // 複製偏移量
    c->reploff = 0;
    // 通過 ACK 命令接收到的偏移量
    c->repl_ack_off = 0;
    // 通過 AKC 命令接收到偏移量的時間
    c->repl_ack_time = 0;
    // 客戶端爲從服務器時使用,記錄了從服務器所使用的端口號
    c->slave_listening_port = 0;
    // 回覆鏈表
    c->reply = listCreate();
    // 回覆鏈表的字節量
    c->reply_bytes = 0;
    // 回覆緩衝區大小達到軟限制的時間
    c->obuf_soft_limit_reached_time = 0;
    // 回覆鏈表的釋放和複製函數
    listSetFreeMethod(c->reply,decrRefCountVoid);
    listSetDupMethod(c->reply,dupClientReplyValue);
    // 阻塞類型
    c->btype = REDIS_BLOCKED_NONE;
    // 阻塞超時
    c->bpop.timeout = 0;
    // 造成客戶端阻塞的列表鍵
    c->bpop.keys = dictCreate(&setDictType,NULL);
    // 在解除阻塞時將元素推入到 target 指定的鍵中
    // BRPOPLPUSH 命令時使用
    c->bpop.target = NULL;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    // 進行事務時監視的鍵
    c->watched_keys = listCreate();
    // 訂閱的頻道和模式
    c->pubsub_channels = dictCreate(&setDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    // 如果不是僞客戶端,那麼添加到服務器的客戶端鏈表中
    if (fd != -1) listAddNodeTail(server.clients,c);
    // 初始化客戶端的事務狀態
    initClientMultiState(c);
    // 返回客戶端
    return c;
}
/* With multiplexing we need to take per-client state.
 * Clients are taken in a liked list.
 *
 * 因爲 I/O 複用的緣故,需要爲每個客戶端維持一個狀態。
 *
 * 多個客戶端狀態被服務器用鏈表連接起來。
 */
typedef struct redisClient {
    // 套接字描述符
    int fd;
    // 當前正在使用的數據庫
    redisDb *db;
    // 當前正在使用的數據庫的 id (號碼)
    int dictid;
    // 客戶端的名字
    robj *name;             /* As set by CLIENT SETNAME */
    // 查詢緩衝區
    sds querybuf;
    // 查詢緩衝區長度峯值
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size */
    // 參數數量
    int argc;
    // 參數對象數組
    robj **argv;
    // 記錄被客戶端執行的命令
    struct redisCommand *cmd, *lastcmd;
    // 請求的類型:內聯命令還是多條命令
    int reqtype;
    // 剩餘未讀取的命令內容數量
    int multibulklen;       /* number of multi bulk arguments left to read */
    // 命令內容的長度
    long bulklen;           /* length of bulk argument in multi bulk request */
    // 回覆鏈表
    list *reply;
    // 回覆鏈表中對象的總大小
    unsigned long reply_bytes; /* Tot bytes of objects in reply list */
    // 已發送字節,處理 short write 用
    int sentlen;            /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    // 創建客戶端的時間
    time_t ctime;           /* Client creation time */
    // 客戶端最後一次和服務器互動的時間
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    // 客戶端的輸出緩衝區超過軟性限制的時間
    time_t obuf_soft_limit_reached_time;
    // 客戶端狀態標誌
    int flags;              /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
    // 當 server.requirepass 不爲 NULL 時
    // 代表認證的狀態
    // 0 代表未認證, 1 代表已認證
    int authenticated;      /* when requirepass is non-NULL */
    // 複製狀態
    int replstate;          /* replication state if this is a slave */
    // 用於保存主服務器傳來的 RDB 文件的文件描述符
    int repldbfd;           /* replication DB file descriptor */
    // 讀取主服務器傳來的 RDB 文件的偏移量
    off_t repldboff;        /* replication DB file offset */
    // 主服務器傳來的 RDB 文件的大小
    off_t repldbsize;       /* replication DB file size */
    sds replpreamble;       /* replication DB preamble. */
    // 主服務器的複製偏移量
    long long reploff;      /* replication offset if this is our master */
    // 從服務器最後一次發送 REPLCONF ACK 時的偏移量
    long long repl_ack_off; /* replication ack offset, if this is a slave */
    // 從服務器最後一次發送 REPLCONF ACK 的時間
    long long repl_ack_time;/* replication ack time, if this is a slave */
    // 主服務器的 master run ID
    // 保存在客戶端,用於執行部分重同步
    char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
    // 從服務器的監聽端口號
    int slave_listening_port; /* As configured with: SLAVECONF listening-port */
    // 事務狀態
    multiState mstate;      /* MULTI/EXEC state */
    // 阻塞類型
    int btype;              /* Type of blocking op if REDIS_BLOCKED. */
    // 阻塞狀態
    blockingState bpop;     /* blocking state */
    // 最後被寫入的全局複製偏移量
    long long woff;         /* Last write global replication offset. */
    // 被監視的鍵
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    // 這個字典記錄了客戶端所有訂閱的頻道
    // 鍵爲頻道名字,值爲 NULL
    // 也即是,一個頻道的集合
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    // 鏈表,包含多個 pubsubPattern 結構
    // 記錄了所有訂閱頻道的客戶端的信息
    // 新 pubsubPattern 結構總是被添加到表尾
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */
    /* Response buffer */
    // 回覆偏移量
    int bufpos;
    // 回覆緩衝區
    char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;

當客戶端發送命令過來時,epoll 返回,readQueryFromClient 被調用,注意回調函數的轉變。沒建立連接之前是關聯 acceptTcpHandler,建立連接之後關聯 readQueryFromClient 函數讀取客戶端的數據。

/*
 * 讀取客戶端的查詢緩衝區內容
 */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    printf("-----------%s--------\n",__FUNCTION__);
    redisClient *c = (redisClient*) privdata;
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    // 設置服務器的當前客戶端
    server.current_client = c;
    // 讀入長度(默認爲 16 MB)
    readlen = REDIS_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= REDIS_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining < readlen) readlen = remaining;
    }
    // 獲取查詢緩衝區當前內容的長度
    // 如果讀取出現 short read ,那麼可能會有內容滯留在讀取緩衝區裏面
    // 這些滯留內容也許不能完整構成一個符合協議的命令,
    qblen = sdslen(c->querybuf);
    // 如果有需要,更新緩衝區內容長度的峯值(peak)
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // 爲查詢緩衝區分配空間
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // 讀入內容到查詢緩存
    nread = read(fd, c->querybuf+qblen, readlen);//接收客戶端發送過來的數據到
    // 讀入出錯
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    // 遇到 EOF
    } else if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread) {
        // 根據內容,更新查詢緩衝區(SDS) free 和 len 屬性
        // 並將 '\0' 正確地放到內容的最後
        sdsIncrLen(c->querybuf,nread);
        // 記錄服務器和客戶端最後一次互動的時間
        c->lastinteraction = server.unixtime;
        // 如果客戶端是 master 的話,更新它的複製偏移量
        if (c->flags & REDIS_MASTER) c->reploff += nread;
    } else {
        // 在 nread == -1 且 errno == EAGAIN 時運行
        server.current_client = NULL;
        return;
    }
    // 查詢緩衝區長度超出服務器最大緩衝區長度
    // 清空緩衝區並釋放客戶端
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    // 從查詢緩存重讀取內容,創建參數,並執行命令
    // 函數會執行到緩存中的所有內容都被處理完爲止
    processInputBuffer(c);
    server.current_client = NULL;
}

收到客戶端的命令之後就要分析並執行命令,然後被結果返給客戶端。readQueryFromClient->processInputBuffer->processCommand->addReply->prepareClientToWrite。prepareClientToWrite 這個函數就是註冊回覆客戶端的函數 sendReplyToClient。

int prepareClientToWrite(redisClient *c) {
    // LUA 腳本環境所使用的僞客戶端總是可寫的
    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
    // 客戶端是主服務器並且不接受查詢,
    // 那麼它是不可寫的,出錯
    if ((c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
    // 無連接的僞客戶端總是不可寫的
    if (c->fd <= 0) return REDIS_ERR; /* Fake client */
    // 一般情況,爲客戶端套接字安裝寫處理器到事件循環
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}

每一個階段都關聯一個回調函數,當事件觸發後走回調函數。

redis 源碼還是很值得學習的!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/1pblkpR9McGWKAEEaGBk1g