Redis 6 中的多線程是如何實現的!?
大家好,我是飛哥!
Redis 是一個高性能服務端的典範。它通過多路複用 epoll 來管理海量的用戶連接,只使用一個線程來通過事件循環來處理所有用戶請求,就可以達到每秒數萬 QPS 的處理能力。下圖是單線程版本 Redis 工作的核心原理圖(詳情參見:單線程 Redis 如何做到每秒數萬 QPS 的超高處理能力!)。
單線程的 Redis 雖然性能很高,但是卻有兩個問題。一個問題是沒有辦法充分發揮現代 CPU 的多核處理能力,一個實例只能使用一個核的能力。二是如果某個用戶請求的處理過程卡住一段時間,會導致其它所有的請求都會出現超時的情況。所以,在線上的 redis 使用過程時是明確禁止使用 keys * 等長耗時的操作的。
那如何改進呢,思路和方向其實很明確。那就是和其它的主流程序一樣引入多線程,用更多的線程來分擔這些可能耗時的操作。事實上 Redis 也確實這麼幹了,在 6.0 以後的版本里,開始支持了多線程。我們今天就來領略一下 Redis 的多線程是如何實現的。
一、多線程 Redis 服務啓動
首先獲取多線程版本 Redis 的源碼
# git clone https://github.com/redis/redis
# cd redis
# git checkout -b 6.2.0 6.2.0
默認情況下多線程是默認關閉的。如果想要啓動多線程,需要在配置文件中做適當的修改。相關的配置項是 io-threads 和 io-threads-do-reads 兩個。
#vi /usr/local/soft/redis6/conf/redis.conf
io-threads 4 #啓用的 io 線程數量
io-threads-do-reads yes #讀請求也使用io線程
其中 io-threads 表示要啓動的 io 線程的數量。io-threads-do-reads 表示是否在讀階段也使用 io 線程,默認是隻在寫階段使用 io 線程的。
現在假設我們已經打開了如上兩項多線程配置。帶着這個假設,讓我們進入到 Redis 的 main 入口函數。
//file: src/server.c
int main(int argc, char **argv) {
......
// 1.1 主線程初始化
initServer();
// 1.2 啓動 io 線程
InitServerLast();
// 進入事件循環
aeMain(server.el);
}
1.1 主線程初始化
在 initServer 這個函數內,Redis 主線程做了這麼幾件重要的事情。
-
初始化讀任務隊列、寫任務隊列
-
創建一個 epoll 對象
-
對配置的監聽端口進行 listen
-
把 listen socket 讓 epoll 給管理起來
//file: src/server.c
void initServer() {
// 1 初始化 server 對象
server.clients_pending_write = listCreate();
server.clients_pending_read = listCreate();
......
// 2 初始化回調 events,創建 epoll
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
// 3 綁定監聽服務端口
listenToPort(server.port,server.ipfd,&server.ipfd_count);
// 4 註冊 accept 事件處理器
for (j = 0; j < server.ipfd_count; j++) {
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL);
}
...
}
接下來我們分別來看。
初始化 server 對象
在 initServer 的一開頭,先是對 server 的各種成員變量進行初始化。值得注意的是 clients_pending_write 和 clients_pending_read 這兩個成員,它們分別是寫任務隊列和讀任務隊列。將來主線程產生的任務都會放在放在這兩個任務隊列裏。
主線程會根據這兩個任務隊列來進行任務哈希散列,以將任務分配到多個線程中進行處理。
aeCreateEventLoop 處理
我們來看 aeCreateEventLoop 詳細邏輯。它會初始化事件回調 event,並且創建了一個 epoll 對象出來。
//file:src/ae.c
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
eventLoop = zmalloc(sizeof(*eventLoop);
//將來的各種回調事件就都會存在這裏
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
......
aeApiCreate(eventLoop);
return eventLoop;
}
我們注意一下 eventLoop->events,將來在各種事件註冊的時候都會保存到這個數組裏。
//file:src/ae.h
typedef struct aeEventLoop {
......
aeFileEvent *events; /* Registered events */
}
具體創建 epoll 的過程在 ae_epoll.c 文件下的 aeApiCreate 中。在這裏,真正調用了 epoll_create
//file:src/ae_epoll.c
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
state->epfd = epoll_create(1024);
eventLoop->apidata = state;
return 0;
}
綁定監聽服務端口
我們再來看 Redis 中的 listen 過程,它在 listenToPort 函數中。調用鏈條很長,依次是 listenToPort => anetTcpServer => _anetTcpServer => anetListen。在 anetListen 中,就是簡單的 bind 和 listen 的調用。
//file:src/anet.c
static int anetListen(......) {
bind(s,sa,len);
listen(s, backlog);
......
}
註冊事件回調函數
前面我們調用 aeCreateEventLoop 創建了 epoll,調用 listenToPort 進行了服務端口的 bind 和 listen。接着就調用的 aeCreateFileEvent 就是來註冊一個 accept 事件處理器。
我們來看 aeCreateFileEvent 具體代碼。
//file: src/ae.c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// 取出一個文件事件結構
aeFileEvent *fe = &eventLoop->events[fd];
// 監聽指定 fd 的指定事件
aeApiAddEvent(eventLoop, fd, mask);
// 設置文件事件類型,以及事件的處理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
// 私有數據
fe->clientData = clientData;
}
函數 aeCreateFileEvent 一開始,從 eventLoop->events 獲取了一個 aeFileEvent 對象。
接下來調用 aeApiAddEvent。這個函數其實就是對 epoll_ctl 的一個封裝。主要就是實際執行 epoll_ctl EPOLL_CTL_ADD。
//file:src/ae_epoll.c
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
// add or mod
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
......
// epoll_ctl 添加事件
epoll_ctl(state->epfd,op,fd,&ee);
return 0;
}
每一個 eventLoop->events 元素都指向一個 aeFileEvent 對象。在這個對象上,設置了三個關鍵東西
-
rfileProc:讀事件回調
-
wfileProc:寫事件回調
-
clientData:一些額外的擴展數據
將來 當 epoll_wait 發現某個 fd 上有事件發生的時候,這樣 redis 首先根據 fd 到 eventLoop->events 中查找 aeFileEvent 對象,然後再看 rfileProc、wfileProc 就可以找到讀、寫回調處理函數。
回頭看 initServer 調用 aeCreateFileEvent 時傳參來看。
//file: src/server.c
void initServer() {
......
for (j = 0; j < server.ipfd_count; j++) {
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL);
}
}
listen fd 對應的讀回調函數 rfileProc 事實上就被設置成了 acceptTcpHandler,寫回調沒有設置,私有數據 client_data 也爲 null。
1.2 io 線程啓動
在主線程啓動以後,會調用 InitServerLast => initThreadedIO 來創建多個 io 線程。
將來這些 IO 線程會配合主線程一起共同來處理所有的 read 和 write 任務。
我們來看 InitServerLast 創建 IO 線程的過程。
//file:src/server.c
void InitServerLast() {
initThreadedIO();
......
}
//file:src/networking.c
void initThreadedIO(void) {
//如果沒開啓多 io 線程配置就不創建了
if (server.io_threads_num == 1) return;
//開始 io 線程的創建
for (int i = 0; i < server.io_threads_num; i++) {
pthread_t tid;
pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i)
io_threads[i] = tid;
}
}
在 initThreadedIO 中調用 pthread_create 庫函數創建線程,並且註冊線程回調函數 IOThreadMain。
//file:src/networking.c
void *IOThreadMain(void *myid) {
long id = (unsigned long)myid;
while(1) {
//循環等待任務
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
}
//允許主線程來關閉自己
......
//遍歷當前線程等待隊列裏的請求 client
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
}
}
是將當前線程等待隊列 io_threads_list[id] 裏所有的請求 client,依次取出處理。其中讀操作通過 readQueryFromClient 處理, 寫操作通過 writeToClient 處理。其中 io_threads_list[id] 中的任務是主線程分配過來的,後面我們將會看到。
二、主線程事件循環
接着我們進入到 Redis 最重要的 aeMain,這個函數就是一個死循環(Redis 不退出的話),不停地執行 aeProcessEvents 函數。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
其中 aeProcessEvents 就是所謂的事件分發器。它通過調用 epoll_wait 來發現所發生的各種事件,然後調用事先註冊好的處理函數進行處理。
接着看 aeProcessEvents 函數。
//file:src/ae.c
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 2.3 事件循環處理3:epoll_wait 前進行讀寫任務隊列處理
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
//epoll_wait發現事件並進行處理
numevents = aeApiPoll(eventLoop, tvp);
// 從已就緒數組中獲取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
//如果是讀事件,並且有讀回調函數
//2.1 如果是 listen socket 讀事件,則處理新連接請求
//2.2 如果是客戶連接socket 讀事件,處理客戶連接上的讀請求
fe->rfileProc()
//如果是寫事件,並且有寫回調函數
fe->wfileProc()
......
}
其中 aeApiPoll 就是對 epoll_wait 的一個封裝而已。
//file: src/ae_epoll.c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
// 等待事件
aeApiState *state = eventLoop->apidata;
epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
...
}
aeProcessEvents 就是調用 epoll_wait 來發現事件。當發現有某個 fd 上事件發生以後,則調爲其事先註冊的事件處理器函數 rfileProc 和 wfileProc。
2.1 事件循環處理 1:新連接到達
在 1.1 節中我們看到,主線程初始化的時候,將 listen socket 上的讀事件處理函數註冊成了 acceptTcpHandler。也就是說如果有新連接到達的時候,acceptTcpHandler 將會被執行到。
在這個函數內,主要完成如下幾件事情。
-
調用 accept 接收連接
-
創建一個 redisClient 對象
-
添加到 epoll
-
註冊讀事件處理函數
接下來讓我們進入 acceptTcpHandler 源碼。
//file:src/networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
......
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
其中 netTcpAccept 調用 accept 系統調用獲取連接,就不展開了。我們看 acceptCommonHandler。
//file: src/networking.c
static void acceptCommonHandler(int fd, int flags) {
// 創建客戶端
redisClient *c;
if ((c = createClient(fd)) == NULL) {
}
}
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
// 爲用戶連接註冊讀事件處理器
if (conn) {
...
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
selectDb(c,0);
c->id = client_id;
c->resp = 2;
c->conn = conn;
......
}
在上面的代碼中,我們重點關注 connSetReadHandler(conn, readQueryFromClient)
, 這一行是將這個新連接的讀事件處理函數設置成了 readQueryFromClient。
2.2 事件循環處理 2:用戶命令請求到達
在上面我們看到了, Redis 把用戶連接上的讀請求處理函數設置成了 readQueryFromClient,這意味着當用戶連接上有命令發送過來的時候,會進入 readQueryFromClient 開始執行。
在多線程版本的 readQueryFromClient 中,處理邏輯非常簡單,僅僅只是將發生讀時間的 client 放到了任務隊列裏而已。
來詳細看 readQueryFromClient 代碼。
//file:src/networking.c
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
//如果啓動 threaded I/O 的話,直接入隊
if (postponeClientRead(c)) return;
//處理用戶連接讀請求
......
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = connRead(c->conn, c->querybuf+qblen, readlen);
processInputBuffer(c);
}
在 postponeClientRead 中判斷,是不是開啓了多 io 線程,如果開啓了的話,那就將有請求數據到達的 client 直接放到讀任務隊列(server.clients_pending_read)中就算是完事。我們看下 postponeClientRead。
//file:src/networking.c
int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
listAddNodeHead 就是把這個 client 對象添加到 server.clients_pending_read 而已。
2.3 事件循環處理 3:epoll_wait 前進行任務處理
在 aeProcessEvents 中假如 aeApiPoll(epoll_wait) 中的事件都處理完了以後,則會進入下一次的循環再次進入 aeProcessEvents。
而這一次中 beforesleep 將會處理前面讀事件處理函數添加的讀任務隊列了。
//file:src/ae.c
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 參見 2.4 事件循環處理3:epoll_wait 前進行任務處理
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
//epoll_wait發現事件並進行處理
numevents = aeApiPoll(eventLoop, tvp);
......
}
在 beforeSleep 裏會依次處理兩個任務隊列。先處理讀任務隊列,解析其中的請求,並處理之。然後將處理結果寫到緩存中,同時寫到寫任務隊列中。緊接着 beforeSleep 會進入寫任務隊列處理,會將處理結果寫到 socket 裏,進行真正的數據發送。
我們來看 beforeSleep 的代碼,這個函數中最重要的兩個調用是 handleClientsWithPendingReadsUsingThreads(處理讀任務隊列),handleClientsWithPendingWritesUsingThreads(處理寫任務隊列)
//file:src/server.c
void beforeSleep(struct aeEventLoop *eventLoop) {
//處理讀任務隊列
handleClientsWithPendingReadsUsingThreads();
//處理寫任務隊列
handleClientsWithPendingWritesUsingThreads();
......
}
值得注意的是,如果開啓了多 io 線程的話,handleClientsWithPendingReadsUsingThreads 和 handleClientsWithPendingWritesUsingThreads 中將會是主線程、io 線程一起配合來處理的。所以我們單獨分兩個小節來闡述。
三、主線程 && io 線程處理讀請求
在 handleClientsWithPendingReadsUsingThreads 中,主線程會遍歷讀任務隊列 server.clients_pending_read,把其中的請求分配到每個 io 線程的處理隊列 io_threads_list[target_id] 中。然後通知各個 io 線程開始處理。
3.1 主線程分配任務
我們來看 handleClientsWithPendingReadsUsingThreads 詳細代碼。
//file:src/networking.c
//當開啓了 reading + parsing 多線程 I/O
//read handler 僅僅只是把 clients 推到讀隊列裏
//而這個函數開始處理該任務隊列
int handleClientsWithPendingReadsUsingThreads(void) {
//訪問讀任務隊列 server.clients_pending_read
listRewind(server.clients_pending_read,&li);
//把每一個任務取出來
//添加到指定線程的任務隊列裏 io_threads_list[target_id]
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
//啓動Worker線程,處理讀請求
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
//主線程處理 0 號任務隊列
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
//需要先幹掉 CLIENT_PENDING_READ 標誌
//否則 readQueryFromClient 並不處理,而是入隊
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
//主線程等待其它線程處理完畢
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
//再跑一遍任務隊列,目的是處理輸入
while(listLength(server.clients_pending_read)) {
......
processInputBuffer(c);
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
clientInstallWriteHandler(c);
}
}
在主線程中將任務分別放到了 io_threads_list 的第 0 到第 N 個元素裏。並對 1 : N 號線程通過 setIOPendingCount 發消息,告訴他們起來處理。這時候 io 線程將會在 IOThreadMain 中收到消息並開始處理讀任務。
//file:src/networking.c
void *IOThreadMain(void *myid) {
while(1) {
//遍歷當前線程等待隊列裏的請求 client
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
}
}
在 io 線程中,從自己的 io_threads_list[id] 中遍歷獲取待處理的 client。如果發現是讀請求處理,則進入 readQueryFromClient 開始處理特定的 client。
而主線程在分配完 1 :N 任務隊列讓其它 io 線程處理後,自己則開始處理第 0 號任務池。同樣是會進入到 readQueryFromClient 中來執行。
//file:src/networking.c
int handleClientsWithPendingReadsUsingThreads(void) {
......
//主線程處理 0 號任務隊列
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
//需要先幹掉 CLIENT_PENDING_READ 標誌
//否則 readQueryFromClient 並不處理,而是入隊
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
......
}
所以無論是主線程還是 io 線程,處理客戶端的讀事件都是會進入 readQueryFromClient。我們來看其源碼。
3.2 讀請求處理
//file:src/networking.c
void readQueryFromClient(connection *conn) {
//讀取請求
nread = connRead(c->conn, c->querybuf+qblen, readlen);
//處理請求
processInputBuffer(c);
}
在 connRead 中就是調用 read 將 socket 中的命令讀取出來,就不展開看了。接着在 processInputBuffer 中將輸入緩衝區中的數據解析成對應的命令。解析完命令後真正開始處理它。
//file:src/networking.c
void processInputBuffer(client *c) {
while(c->qb_pos < sdslen(c->querybuf)) {
//解析命令
......
//真正開始處理 command
processCommandAndResetClient(c);
}
}
函數 processCommandAndResetClient 會調用 processCommand,查詢命令並開始執行。執行的核心方法是 call 函數,我們直接看它。
//file:src/server.c
void call(client *c, int flags) {
// 查找處理命令,
struct redisCommand *real_cmd = c->cmd;
// 調用命令處理函數
c->cmd->proc(c);
......
}
在 server.c 中定義了每一個命令對應的處理函數
//file:src/server.c
struct redisCommand redisCommandTable[] = {
{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
......
{"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
......
}
對於 get 命令來說,其對應的命令處理函數就是 getCommand。也就是說當處理 GET 命令執行到 c->cmd->proc
的時候會進入到 getCommand 函數中來。
//file: src/t_string.c
void getCommand(client *c) {
getGenericCommand(c);
}
int getGenericCommand(client *c) {
robj *o;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
return C_OK;
...
addReplyBulk(c,o);
return C_OK;
}
getGenericCommand 方法會調用 lookupKeyReadOrReply 來從內存中查找對應的 key 值。如果找不到,則直接返回 C_OK;如果找到了,調用 addReplyBulk 方法將值添加到輸出緩衝區中。
//file: src/networking.c
void addReplyBulk(client *c, robj *obj) {
addReplyBulkLen(c,obj);
addReply(c,obj);
addReply(c,shared.crlf);
}
3.3 寫處理結果到發送緩存區
其主體是調用 addReply 來設置回覆數據。在 addReply 方法中做了兩件事情:
-
prepareClientToWrite 判斷是否需要返回數據,並且將當前 client 添加到等待寫返回數據隊列中。
-
調用 _addReplyToBuffer 和 _addReplyObjectToList 方法將返回值寫入到輸出緩衝區中,等待寫入 socekt
//file:src/networking.c
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
} else {
......
}
}
先來看 prepareClientToWrite 的詳細實現,
//file: src/networking.c
int prepareClientToWrite(client *c) {
......
if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
clientInstallWriteHandler(c);
}
//file:src/networking.c
void clientInstallWriteHandler(client *c) {
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
其中 server.clients_pending_write 就是我們說的任務隊列,隊列中的每一個元素都是有待寫返回數據的 client 對象。在 prepareClientToWrite 函數中,把 client 添加到任務隊列 server.clients_pending_write 裏就算完事。
接下再來 _addReplyToBuffer,該方法是向固定緩存中寫,如果寫不下的話就繼續調用 _addReplyStringToList 往鏈表裏寫。簡單起見,我們只看 _addReplyToBuffer 的代碼。
//file:src/networking.c
int _addReplyToBuffer(client *c, const char *s, size_t len) {
......
// 拷貝到 client 對象的 Response buffer 中
memcpy(c->buf+c->bufpos,s,len);
c->bufpos+=len;
return C_OK;
}
要注意的是,本節的讀請求處理過程是主線程和 io 線程在並行執行的。主線程在處理完後會等待其它的 io 線程處理。在所有的讀請求都處理完後,主線程 beforeSleep 中對 handleClientsWithPendingReadsUsingThreads 的調用就結束了。
四、主線程 && io 線程配合處理寫請求
當所有的讀請求處理完後,handleClientsWithPendingReadsUsingThreads 會退出。主線程會緊接着進入 handleClientsWithPendingWritesUsingThreads 中來處理。
//file:src/server.c
void beforeSleep(struct aeEventLoop *eventLoop) {
//處理讀任務隊列
handleClientsWithPendingReadsUsingThreads();
//處理寫任務隊列
handleClientsWithPendingWritesUsingThreads();
......
}
4.1 主線程分配任務
//file:src/networking.c
int handleClientsWithPendingWritesUsingThreads(void) {
//沒有開啓多線程的話,仍然是主線程自己寫
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
......
//獲取待寫任務
int processed = listLength(server.clients_pending_write);
//在N個任務列表中分配該任務
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}
//hash的方式進行分配
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
//告訴對應的線程該開始幹活了
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
//主線程自己也會處理一些
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
//循環等待其它線程結束處理
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
......
}
在 io 線程中收到消息後,開始遍歷自己的任務隊列 io_threads_list[id],並將其中的 client 挨個取出來開始處理。
//file:src/networking.c
void *IOThreadMain(void *myid) {
while(1) {
//遍歷當前線程等待隊列裏的請求 client
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
}
}
listEmpty(io_threads_list[id]);
}
}
4.2 寫請求處理
由於這次任務隊列裏都是寫請求,所以 io 線程會進入 writeToClient。而主線程在分配完任務以後,自己開始處理起了 io_threads_list[0],並也進入到 writeToClient。
//file:src/networking.c
int writeToClient(int fd, client *c, int handler_installed) {
while(clientHasPendingReplies(c)) {
// 先發送固定緩衝區
if (c->bufpos > 0) {
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
......
// 再發送回復鏈表中數據
} else {
o = listNodeValue(listFirst(c->reply));
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
......
}
}
}
writeToClient 中的主要邏輯就是調用 write 系統調用讓內核幫其把數據發送出去即可。由於每個命令的處理結果大小是不固定的。所以 Redis 採用的做法用固定的 buf + 可變鏈表來儲存結果字符串。這裏自然發送的時候就需要分別對固定緩存區和鏈表來進行發送了。
當所有的寫請求也處理完後,beforeSleep 就退出了。主線程將會再次調用 epoll_wait 來發現請求,進入下一輪的用戶請求處理。
五、總結
//file: src/server.c
int main(int argc, char **argv) {
......
// 1.1 主線程初始化
initServer();
// 1.2 啓動 io 線程
InitServerLast();
// 進入事件循環
aeMain(server.el);
}
在 initServer 這個函數內,Redis 做了這麼三件重要的事情。
-
創建一個 epoll 對象
-
對配置的監聽端口進行 listen
-
把 listen socket 讓 epoll 給管理起來
在 initThreadedIO 中調用 pthread_create 庫函數創建線程,並且註冊線程回調函數 IOThreadMain。在 IOThreadMain 中等待其隊列 io_threads_list[id] 產生請求,當有請求到達的時候取出 client,依次處理。其中讀操作通過 readQueryFromClient 處理, 寫操作通過 writeToClient 處理。
主線程在 aeMain 函數中,是一個無休止的循環,它是 Redis 中最重要的部分。它先是調用事件分發器發現事件。如果有新連接請求到達的時候,執行 accept 接收新連接,併爲其註冊事件處理函數。
當用戶連接上有命令請求到達的時候,主線程在 read 處理函數中將其添加到讀發送隊列中。然後接着在 beforeSleep 中開啓對讀任務隊列和寫任務隊列的處理。總體工作過程如下圖所示。
在這個處理過程中,對讀任務隊列和寫任務隊列的處理都是多線程並行進行的(前提是開篇我們開啓了多 IO 線程並且也併發處理讀)。
當讀任務隊列和寫任務隊列的都處理完的時候,主線程再一次調用 epoll_wait 去發現新的待處理事件,如此往復循環進行處理。
至此,多線程版本的 Redis 的工作原理就介紹完了。坦白講,我覺得這種多線程模型實現的並不足夠的好。
原因是主線程是在處理讀、寫任務隊列的時候還要等待其它的 io 線程處理完才能進入下一步。假設這時有 10 個用戶請求到達,其中 9 個處理耗時需要 1 ms,而另外一個命令需要 1 s。則這時主線程仍然會等待這個 io 線程處理 1s 結束後才能進入後面的處理。整個 Redis 服務還是被一個耗時的命令給 block 住了。
我倒是希望我的理解哪裏有問題。因爲這種方式真的是沒能很好地併發起來。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/MU8cxoKS3rU9mN_CY3WxWQ