Redis 6 中的多線程是如何實現的!?

大家好,我是飛哥!

Redis 是一個高性能服務端的典範。它通過多路複用 epoll 來管理海量的用戶連接,只使用一個線程來通過事件循環來處理所有用戶請求,就可以達到每秒數萬 QPS 的處理能力。下圖是單線程版本 Redis 工作的核心原理圖(詳情參見:單線程 Redis 如何做到每秒數萬 QPS 的超高處理能力!)。

單線程的 Redis 雖然性能很高,但是卻有兩個問題。一個問題是沒有辦法充分發揮現代 CPU 的多核處理能力,一個實例只能使用一個核的能力。二是如果某個用戶請求的處理過程卡住一段時間,會導致其它所有的請求都會出現超時的情況。所以,在線上的 redis 使用過程時是明確禁止使用 keys * 等長耗時的操作的。

那如何改進呢,思路和方向其實很明確。那就是和其它的主流程序一樣引入多線程,用更多的線程來分擔這些可能耗時的操作。事實上 Redis 也確實這麼幹了,在 6.0 以後的版本里,開始支持了多線程。我們今天就來領略一下 Redis 的多線程是如何實現的。

一、多線程 Redis 服務啓動

首先獲取多線程版本 Redis 的源碼

# git clone https://github.com/redis/redis
# cd redis
# git checkout -b 6.2.0 6.2.0

默認情況下多線程是默認關閉的。如果想要啓動多線程,需要在配置文件中做適當的修改。相關的配置項是 io-threads 和 io-threads-do-reads 兩個。

#vi /usr/local/soft/redis6/conf/redis.conf 
io-threads 4 #啓用的 io 線程數量
io-threads-do-reads yes #讀請求也使用io線程

其中 io-threads 表示要啓動的 io 線程的數量。io-threads-do-reads 表示是否在讀階段也使用 io 線程,默認是隻在寫階段使用 io 線程的。

現在假設我們已經打開了如上兩項多線程配置。帶着這個假設,讓我們進入到 Redis 的 main 入口函數。

//file: src/server.c
int main(int argc, char **argv) {
    ......

    // 1.1 主線程初始化
    initServer();

    // 1.2 啓動 io 線程
    InitServerLast();

    // 進入事件循環
    aeMain(server.el);
}

1.1 主線程初始化

在 initServer 這個函數內,Redis 主線程做了這麼幾件重要的事情。

//file: src/server.c
void initServer() {

    // 1 初始化 server 對象
    server.clients_pending_write = listCreate();
    server.clients_pending_read = listCreate();
    ......

    // 2 初始化回調 events,創建 epoll
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    // 3 綁定監聽服務端口
    listenToPort(server.port,server.ipfd,&server.ipfd_count);

    // 4 註冊 accept 事件處理器
    for (j = 0; j < server.ipfd_count; j++) {
        aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL);
    }
    ...
}

接下來我們分別來看。

初始化 server 對象

在 initServer 的一開頭,先是對 server 的各種成員變量進行初始化。值得注意的是 clients_pending_write 和 clients_pending_read 這兩個成員,它們分別是寫任務隊列和讀任務隊列。將來主線程產生的任務都會放在放在這兩個任務隊列裏。

主線程會根據這兩個任務隊列來進行任務哈希散列,以將任務分配到多個線程中進行處理。

aeCreateEventLoop 處理

我們來看 aeCreateEventLoop 詳細邏輯。它會初始化事件回調 event,並且創建了一個 epoll 對象出來。

//file:src/ae.c
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    eventLoop = zmalloc(sizeof(*eventLoop);

    //將來的各種回調事件就都會存在這裏
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    ......

    aeApiCreate(eventLoop);
    return eventLoop;
}

我們注意一下 eventLoop->events,將來在各種事件註冊的時候都會保存到這個數組裏。

//file:src/ae.h
typedef struct aeEventLoop {
    ......
    aeFileEvent *events; /* Registered events */
}

具體創建 epoll 的過程在 ae_epoll.c 文件下的 aeApiCreate 中。在這裏,真正調用了 epoll_create

//file:src/ae_epoll.c
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    state->epfd = epoll_create(1024); 
    eventLoop->apidata = state;
    return 0;
}

綁定監聽服務端口

我們再來看 Redis 中的 listen 過程,它在 listenToPort 函數中。調用鏈條很長,依次是 listenToPort => anetTcpServer => _anetTcpServer => anetListen。在 anetListen 中,就是簡單的 bind 和 listen 的調用。

//file:src/anet.c
static int anetListen(......) {
    bind(s,sa,len);
    listen(s, backlog);
    ......
}

註冊事件回調函數

前面我們調用 aeCreateEventLoop 創建了 epoll,調用 listenToPort 進行了服務端口的 bind 和 listen。接着就調用的 aeCreateFileEvent 就是來註冊一個 accept 事件處理器。

我們來看 aeCreateFileEvent 具體代碼。

//file: src/ae.c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    // 取出一個文件事件結構
    aeFileEvent *fe = &eventLoop->events[fd];

    // 監聽指定 fd 的指定事件
    aeApiAddEvent(eventLoop, fd, mask);

    // 設置文件事件類型,以及事件的處理器
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    // 私有數據
    fe->clientData = clientData;
}

函數 aeCreateFileEvent 一開始,從 eventLoop->events 獲取了一個 aeFileEvent 對象。

接下來調用 aeApiAddEvent。這個函數其實就是對 epoll_ctl 的一個封裝。主要就是實際執行 epoll_ctl EPOLL_CTL_ADD。

//file:src/ae_epoll.c
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    // add or mod
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    ......

    // epoll_ctl 添加事件
    epoll_ctl(state->epfd,op,fd,&ee);
    return 0;
}

每一個 eventLoop->events 元素都指向一個 aeFileEvent 對象。在這個對象上,設置了三個關鍵東西

將來 當 epoll_wait 發現某個 fd 上有事件發生的時候,這樣 redis 首先根據 fd 到 eventLoop->events 中查找 aeFileEvent 對象,然後再看 rfileProc、wfileProc 就可以找到讀、寫回調處理函數。

回頭看 initServer 調用 aeCreateFileEvent 時傳參來看。

//file: src/server.c
void initServer() {
    ......

    for (j = 0; j < server.ipfd_count; j++) {
        aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL);
    }
}

listen fd 對應的讀回調函數 rfileProc 事實上就被設置成了 acceptTcpHandler,寫回調沒有設置,私有數據 client_data 也爲 null。

1.2 io 線程啓動

在主線程啓動以後,會調用 InitServerLast => initThreadedIO 來創建多個 io 線程。

將來這些 IO 線程會配合主線程一起共同來處理所有的 read 和 write 任務。

我們來看 InitServerLast 創建 IO 線程的過程。

//file:src/server.c
void InitServerLast() {
    initThreadedIO();
    ......
}
//file:src/networking.c
void initThreadedIO(void) {
    //如果沒開啓多 io 線程配置就不創建了
    if (server.io_threads_num == 1) return;

    //開始 io 線程的創建
    for (int i = 0; i < server.io_threads_num; i++) {
        pthread_t tid;
        pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i)
        io_threads[i] = tid;
    }
}

在 initThreadedIO 中調用 pthread_create 庫函數創建線程,並且註冊線程回調函數 IOThreadMain。

//file:src/networking.c
void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;

    while(1) {
        //循環等待任務
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }

        //允許主線程來關閉自己
        ......

        //遍歷當前線程等待隊列裏的請求 client
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
    }
}

是將當前線程等待隊列 io_threads_list[id] 裏所有的請求 client,依次取出處理。其中讀操作通過 readQueryFromClient 處理, 寫操作通過 writeToClient 處理。其中 io_threads_list[id] 中的任務是主線程分配過來的,後面我們將會看到。

二、主線程事件循環

接着我們進入到 Redis 最重要的 aeMain,這個函數就是一個死循環(Redis 不退出的話),不停地執行 aeProcessEvents 函數。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

其中 aeProcessEvents 就是所謂的事件分發器。它通過調用 epoll_wait 來發現所發生的各種事件,然後調用事先註冊好的處理函數進行處理。

接着看 aeProcessEvents 函數。

//file:src/ae.c
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    // 2.3 事件循環處理3:epoll_wait 前進行讀寫任務隊列處理
    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

    //epoll_wait發現事件並進行處理
    numevents = aeApiPoll(eventLoop, tvp);

    // 從已就緒數組中獲取事件
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

    //如果是讀事件,並且有讀回調函數
    //2.1 如果是 listen socket 讀事件,則處理新連接請求
    //2.2 如果是客戶連接socket 讀事件,處理客戶連接上的讀請求
    fe->rfileProc()

    //如果是寫事件,並且有寫回調函數
    fe->wfileProc()
    ......
}

其中 aeApiPoll 就是對 epoll_wait 的一個封裝而已。

//file: src/ae_epoll.c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    // 等待事件
    aeApiState *state = eventLoop->apidata;
    epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    ...
}

aeProcessEvents 就是調用 epoll_wait 來發現事件。當發現有某個 fd 上事件發生以後,則調爲其事先註冊的事件處理器函數 rfileProc 和 wfileProc。

2.1 事件循環處理 1:新連接到達

在 1.1 節中我們看到,主線程初始化的時候,將 listen socket 上的讀事件處理函數註冊成了 acceptTcpHandler。也就是說如果有新連接到達的時候,acceptTcpHandler 將會被執行到。

在這個函數內,主要完成如下幾件事情。

接下來讓我們進入 acceptTcpHandler 源碼。

//file:src/networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ......
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip)&cport);
    acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}

其中 netTcpAccept 調用 accept 系統調用獲取連接,就不展開了。我們看 acceptCommonHandler。

//file: src/networking.c
static void acceptCommonHandler(int fd, int flags) {
    // 創建客戶端
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {
    }
}

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    // 爲用戶連接註冊讀事件處理器
    if (conn) {
        ...
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }

    selectDb(c,0);
    c->id = client_id;
    c->resp = 2;
    c->conn = conn;
    ......
}

在上面的代碼中,我們重點關注 connSetReadHandler(conn, readQueryFromClient), 這一行是將這個新連接的讀事件處理函數設置成了 readQueryFromClient。

2.2 事件循環處理 2:用戶命令請求到達

在上面我們看到了, Redis 把用戶連接上的讀請求處理函數設置成了 readQueryFromClient,這意味着當用戶連接上有命令發送過來的時候,會進入 readQueryFromClient 開始執行。

在多線程版本的 readQueryFromClient 中,處理邏輯非常簡單,僅僅只是將發生讀時間的 client 放到了任務隊列裏而已。

來詳細看 readQueryFromClient 代碼。

//file:src/networking.c
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);

    //如果啓動 threaded I/O 的話,直接入隊
    if (postponeClientRead(c)) return;

    //處理用戶連接讀請求
    ......
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    processInputBuffer(c);
}

在 postponeClientRead 中判斷,是不是開啓了多 io 線程,如果開啓了的話,那就將有請求數據到達的 client 直接放到讀任務隊列(server.clients_pending_read)中就算是完事。我們看下 postponeClientRead。

//file:src/networking.c
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

listAddNodeHead 就是把這個 client 對象添加到 server.clients_pending_read 而已。

2.3 事件循環處理 3:epoll_wait 前進行任務處理

在 aeProcessEvents 中假如 aeApiPoll(epoll_wait) 中的事件都處理完了以後,則會進入下一次的循環再次進入 aeProcessEvents。

而這一次中 beforesleep 將會處理前面讀事件處理函數添加的讀任務隊列了。

//file:src/ae.c
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    // 參見 2.4 事件循環處理3:epoll_wait 前進行任務處理
    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

    //epoll_wait發現事件並進行處理
    numevents = aeApiPoll(eventLoop, tvp);
    ......
}

在 beforeSleep 裏會依次處理兩個任務隊列。先處理讀任務隊列,解析其中的請求,並處理之。然後將處理結果寫到緩存中,同時寫到寫任務隊列中。緊接着 beforeSleep 會進入寫任務隊列處理,會將處理結果寫到 socket 裏,進行真正的數據發送。

我們來看 beforeSleep 的代碼,這個函數中最重要的兩個調用是 handleClientsWithPendingReadsUsingThreads(處理讀任務隊列),handleClientsWithPendingWritesUsingThreads(處理寫任務隊列)

//file:src/server.c
void beforeSleep(struct aeEventLoop *eventLoop) {
    //處理讀任務隊列
    handleClientsWithPendingReadsUsingThreads();
    //處理寫任務隊列
    handleClientsWithPendingWritesUsingThreads();
    ......
}

值得注意的是,如果開啓了多 io 線程的話,handleClientsWithPendingReadsUsingThreads 和 handleClientsWithPendingWritesUsingThreads 中將會是主線程、io 線程一起配合來處理的。所以我們單獨分兩個小節來闡述。

三、主線程 && io 線程處理讀請求

在 handleClientsWithPendingReadsUsingThreads 中,主線程會遍歷讀任務隊列 server.clients_pending_read,把其中的請求分配到每個 io 線程的處理隊列 io_threads_list[target_id] 中。然後通知各個 io 線程開始處理。

3.1 主線程分配任務

我們來看 handleClientsWithPendingReadsUsingThreads 詳細代碼。

//file:src/networking.c
//當開啓了 reading + parsing 多線程 I/O 
//read handler 僅僅只是把 clients 推到讀隊列裏
//而這個函數開始處理該任務隊列
int handleClientsWithPendingReadsUsingThreads(void) {

    //訪問讀任務隊列 server.clients_pending_read
    listRewind(server.clients_pending_read,&li);

    //把每一個任務取出來
    //添加到指定線程的任務隊列裏 io_threads_list[target_id]
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    //啓動Worker線程,處理讀請求
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    //主線程處理 0 號任務隊列
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        //需要先幹掉 CLIENT_PENDING_READ 標誌
        //否則 readQueryFromClient 並不處理,而是入隊
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }

    //主線程等待其它線程處理完畢
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    //再跑一遍任務隊列,目的是處理輸入
    while(listLength(server.clients_pending_read)) {
        ......
        processInputBuffer(c);
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }
}

在主線程中將任務分別放到了 io_threads_list 的第 0 到第 N 個元素裏。並對 1 : N 號線程通過 setIOPendingCount 發消息,告訴他們起來處理。這時候 io 線程將會在 IOThreadMain 中收到消息並開始處理讀任務。

//file:src/networking.c
void *IOThreadMain(void *myid) {
    while(1) {
        //遍歷當前線程等待隊列裏的請求 client
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
    }
}

在 io 線程中,從自己的 io_threads_list[id] 中遍歷獲取待處理的 client。如果發現是讀請求處理,則進入 readQueryFromClient 開始處理特定的 client。

而主線程在分配完 1 :N 任務隊列讓其它 io 線程處理後,自己則開始處理第 0 號任務池。同樣是會進入到 readQueryFromClient 中來執行。

//file:src/networking.c
int handleClientsWithPendingReadsUsingThreads(void) {
    ......
    //主線程處理 0 號任務隊列
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        //需要先幹掉 CLIENT_PENDING_READ 標誌
        //否則 readQueryFromClient 並不處理,而是入隊
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    ......
}

所以無論是主線程還是 io 線程,處理客戶端的讀事件都是會進入 readQueryFromClient。我們來看其源碼。

3.2 讀請求處理

//file:src/networking.c
void readQueryFromClient(connection *conn) {

    //讀取請求
    nread = connRead(c->conn, c->querybuf+qblen, readlen);

    //處理請求
    processInputBuffer(c);
}

在 connRead 中就是調用 read 將 socket 中的命令讀取出來,就不展開看了。接着在 processInputBuffer 中將輸入緩衝區中的數據解析成對應的命令。解析完命令後真正開始處理它。

//file:src/networking.c
void processInputBuffer(client *c) {
    while(c->qb_pos < sdslen(c->querybuf)) {
        //解析命令
        ......
        //真正開始處理 command
        processCommandAndResetClient(c);
    }
}

函數 processCommandAndResetClient 會調用 processCommand,查詢命令並開始執行。執行的核心方法是 call 函數,我們直接看它。

//file:src/server.c
void call(client *c, int flags) {

    // 查找處理命令,
    struct redisCommand *real_cmd = c->cmd;

    // 調用命令處理函數
    c->cmd->proc(c);

    ......
}

在 server.c 中定義了每一個命令對應的處理函數

//file:src/server.c
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    ......

    {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    ......
}

對於 get 命令來說,其對應的命令處理函數就是 getCommand。也就是說當處理 GET 命令執行到 c->cmd->proc 的時候會進入到 getCommand 函數中來。

//file: src/t_string.c
void getCommand(client *c) {
    getGenericCommand(c);
}
int getGenericCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
        return C_OK;
    ...
    addReplyBulk(c,o);
    return C_OK;
}

getGenericCommand 方法會調用 lookupKeyReadOrReply 來從內存中查找對應的 key 值。如果找不到,則直接返回 C_OK;如果找到了,調用 addReplyBulk 方法將值添加到輸出緩衝區中。

//file: src/networking.c
void addReplyBulk(client *c, robj *obj) {
    addReplyBulkLen(c,obj);
    addReply(c,obj);
    addReply(c,shared.crlf);
}

3.3 寫處理結果到發送緩存區

其主體是調用 addReply 來設置回覆數據。在 addReply 方法中做了兩件事情:

//file:src/networking.c
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else {
        ......        
    }
}

先來看 prepareClientToWrite 的詳細實現,

//file: src/networking.c
int prepareClientToWrite(client *c) {
    ......

    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
        clientInstallWriteHandler(c);
}

//file:src/networking.c
void clientInstallWriteHandler(client *c) {
    c->flags |= CLIENT_PENDING_WRITE;
    listAddNodeHead(server.clients_pending_write,c);
}

其中 server.clients_pending_write 就是我們說的任務隊列,隊列中的每一個元素都是有待寫返回數據的 client 對象。在 prepareClientToWrite 函數中,把 client 添加到任務隊列 server.clients_pending_write 裏就算完事。

接下再來 _addReplyToBuffer,該方法是向固定緩存中寫,如果寫不下的話就繼續調用 _addReplyStringToList 往鏈表裏寫。簡單起見,我們只看 _addReplyToBuffer 的代碼。

//file:src/networking.c
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    ......
    // 拷貝到 client 對象的 Response buffer 中
    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return C_OK;
}

要注意的是,本節的讀請求處理過程是主線程和 io 線程在並行執行的。主線程在處理完後會等待其它的 io 線程處理。在所有的讀請求都處理完後,主線程 beforeSleep 中對 handleClientsWithPendingReadsUsingThreads 的調用就結束了。

四、主線程 && io 線程配合處理寫請求

當所有的讀請求處理完後,handleClientsWithPendingReadsUsingThreads 會退出。主線程會緊接着進入 handleClientsWithPendingWritesUsingThreads 中來處理。

//file:src/server.c
void beforeSleep(struct aeEventLoop *eventLoop) {
    //處理讀任務隊列
    handleClientsWithPendingReadsUsingThreads();
    //處理寫任務隊列
    handleClientsWithPendingWritesUsingThreads();
    ......
}

4.1 主線程分配任務

//file:src/networking.c
int handleClientsWithPendingWritesUsingThreads(void) {
    //沒有開啓多線程的話,仍然是主線程自己寫
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    ......

    //獲取待寫任務
    int processed = listLength(server.clients_pending_write);

    //在N個任務列表中分配該任務
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        //hash的方式進行分配
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    //告訴對應的線程該開始幹活了
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    //主線程自己也會處理一些
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    //循環等待其它線程結束處理
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }
    ......
}

在 io 線程中收到消息後,開始遍歷自己的任務隊列 io_threads_list[id],並將其中的 client 挨個取出來開始處理。

//file:src/networking.c
void *IOThreadMain(void *myid) {
    while(1) {
        //遍歷當前線程等待隊列裏的請求 client
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } 
        }
        listEmpty(io_threads_list[id]);
    }
}

4.2 寫請求處理

由於這次任務隊列裏都是寫請求,所以 io 線程會進入 writeToClient。而主線程在分配完任務以後,自己開始處理起了 io_threads_list[0],並也進入到 writeToClient。

//file:src/networking.c
int writeToClient(int fd, client *c, int handler_installed) {
    while(clientHasPendingReplies(c)) {
        // 先發送固定緩衝區
        if (c->bufpos > 0) {
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            ......

        // 再發送回復鏈表中數據
        } else {
            o = listNodeValue(listFirst(c->reply));
            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            ......
        }
    }
}

writeToClient 中的主要邏輯就是調用 write 系統調用讓內核幫其把數據發送出去即可。由於每個命令的處理結果大小是不固定的。所以 Redis 採用的做法用固定的 buf + 可變鏈表來儲存結果字符串。這裏自然發送的時候就需要分別對固定緩存區和鏈表來進行發送了。

當所有的寫請求也處理完後,beforeSleep 就退出了。主線程將會再次調用 epoll_wait 來發現請求,進入下一輪的用戶請求處理。

五、總結

//file: src/server.c
int main(int argc, char **argv) {
    ......

    // 1.1 主線程初始化
    initServer();

    // 1.2 啓動 io 線程
    InitServerLast();

    // 進入事件循環
    aeMain(server.el);
}

在 initServer 這個函數內,Redis 做了這麼三件重要的事情。

在 initThreadedIO 中調用 pthread_create 庫函數創建線程,並且註冊線程回調函數 IOThreadMain。在 IOThreadMain 中等待其隊列 io_threads_list[id] 產生請求,當有請求到達的時候取出 client,依次處理。其中讀操作通過 readQueryFromClient 處理, 寫操作通過 writeToClient 處理。

主線程在 aeMain 函數中,是一個無休止的循環,它是 Redis 中最重要的部分。它先是調用事件分發器發現事件。如果有新連接請求到達的時候,執行 accept 接收新連接,併爲其註冊事件處理函數。

當用戶連接上有命令請求到達的時候,主線程在 read 處理函數中將其添加到讀發送隊列中。然後接着在 beforeSleep 中開啓對讀任務隊列和寫任務隊列的處理。總體工作過程如下圖所示。

在這個處理過程中,對讀任務隊列和寫任務隊列的處理都是多線程並行進行的(前提是開篇我們開啓了多 IO 線程並且也併發處理讀)。

當讀任務隊列和寫任務隊列的都處理完的時候,主線程再一次調用 epoll_wait 去發現新的待處理事件,如此往復循環進行處理。

至此,多線程版本的 Redis 的工作原理就介紹完了。坦白講,我覺得這種多線程模型實現的並不足夠的好。

原因是主線程是在處理讀、寫任務隊列的時候還要等待其它的 io 線程處理完才能進入下一步。假設這時有 10 個用戶請求到達,其中 9 個處理耗時需要 1 ms,而另外一個命令需要 1 s。則這時主線程仍然會等待這個 io 線程處理 1s 結束後才能進入後面的處理。整個 Redis 服務還是被一個耗時的命令給 block 住了。

我倒是希望我的理解哪裏有問題。因爲這種方式真的是沒能很好地併發起來。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/MU8cxoKS3rU9mN_CY3WxWQ