Redis 事件機制是如何實現的?
前言
我們都知道,Redis 是單線程(非嚴謹),你是否想過,一個線程要如何處理來自各個客戶端的各種請求呢?它忙的過來嗎?沒錯,它還真的能忙過來,並且還井井有條。其中多虧了 IO 多路複用,而不僅僅是它,事件機制在其中也是一個不錯的設計。
PS:Redis 高版本已經支持多線程處理某些事情,爲了簡化,這裏不做討論,故下文出現的單線程僅是描述那些必須單線程執行的場景。
嘗試思考
首先,讓我們來思考一下,如果是我們自己來實現,會嘗試如何去做。
對於請求連接處理的思考
最笨的方法,那麼就是來一個客戶端 accept 一次,然後給什麼請求做什麼事情,先來先做,做完走人,對吧。那顯然這樣太慢了,要知道作爲一個緩存,這樣設計要把人給急死。
當然,我們也可以說,來一個我開一個線程單獨處理你,相當於你一來我就單獨找人爲你服務,而服務的人最終會將請求給到一個處理中心,讓處理中心統一去處理,然後將結果返回。但顯然 Redis 沒有那麼多資源讓你浪費。
於是要找人幫忙,那就是 IO 多路複用,至少它能幫我解決前面服務的問題,fd 我就不管了,直接告訴我哪些人來了,並且告訴我有事的是那些人。
反觀機制的思考
既然 epoll_wait 能 告訴我們有那些 socket 已經就緒,那麼我們就處理就緒的這些就可以了。但我們需要一個合理的機制來幫我們來優雅的處理他們,畢竟 Redis 後面只有個單線程在處理。由於處理沒這麼快,肯定需要一個地方來存放未處理的這些事件,那很合理就能想到需要一個類似 buffer 的東西。
所以,對於這個事件機制,我第一個想法就是弄個隊列,或者 ringbuffer 來搞,那不就是一個生產消費者模型嗎?
事件機制
那麼下面我們就來看看 Redis 它是如何設計。
分類
首先 Redis 分了兩類事件
-
fileEvents 文件事件,就是我們之前提到的請求的處理,我們也主要討論這個
-
timedEvents 定時事件,沒錯肯定有一些定時任務觸發的事件在裏面
文件事件處理
OK,看完圖我們就有了一個大致的印象,爲了靈活的處理不同的事件,需要將事件分配給處理器去處理,這裏也是我們之前思考的時候沒有想到的一個設計。通常來說對於任何的處理往往都有這樣一個分配器去分配所有的任務,這樣可以讓擴展更加靈活,如果後續有新的類型,只需要擴展出一個新的處理器就可以了。
源碼分析
https://github.com/redis/redis/blob/9b1d4f003de1b141ea850f01e7104e7e5c670620/src/ae.c#L493
首先入口在 aeMain 這個簡單,就是循環,也正是這個循環處理着所有的事件,我們可以看到,只要不停 (stop),就會一直循環處理
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
然後就是我們重點的 aeProcessEvents 方法,其中重點就是調用 aeApiPoll 獲取當前就緒的事件,然後你就能看到我們的 aeFileEvent 也就是文件事件了,最後還有 processTimeEvents 處理定時事件。那麼事件本身,是如何處理的呢?就是 rfileProc 和 wfileProc 一個處理讀一個處理寫。那麼問題來了,這兩個方法具體是什麼呢?賣個關子,我們先瞅一眼 aeApiPoll
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
struct timeval tv, *tvp = NULL; /* NULL means infinite wait. */
int64_t usUntilTimer;
if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
eventLoop->beforesleep(eventLoop);
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
}
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. 注意這裏!!!!!!!!!!!!!!*/
numevents = aeApiPoll(eventLoop, tvp);
/* Don't process file events if not requested. */
if (!(flags & AE_FILE_EVENTS)) {
numevents = 0;
}
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
int fired = 0; /* Number of events fired for current fd. */
int invert = fe->mask & AE_BARRIER;
if (!invert && fe->mask & mask & AE_READABLE) {
/* rfileProc 在處理什麼事件呢?*/
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
/* wfileProc 在處理什麼事件呢?*/
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
這裏其他都不重要,重點就在我們熟悉的 epoll_wait ,獲取所有就緒的 fd 也就能知道所有需要處理的事件了。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
} else if (retval == -1 && errno != EINTR) {
panic("aeApiPoll: epoll_wait, %s", strerror(errno));
}
return numevents;
}
好了,我們來解密究竟 rfileProc 和 wfileProc 是什麼,aeCreateFileEvent 方法是用於創建 FileEvent 的方法,其中的入參裏面有 aeFileProc 沒錯就是它了。根據不同的類型用不同的 handler 創建不同的 event。也就是說,最終的處理方式是通過參數傳遞進去的。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
小思考
如果是我設計,或許絕大多數情況下就是弄一個對象,而對象根據具體的事件類型執行不同的處理邏輯。最多用一個 策略模式 可能就上天了。而 Redis 的這樣的設計思路,類似一種閉包的設計,或者說函數式編程的一種思路吧,將具體的處理對象,處理方式,處理結果,通通包含在內。我們先不說這樣的設計好不好,但給我的第一印象是,這樣的設計會讓我覺得最終執行的整個處理會更加連貫,並且處理的時候執行的全部邏輯是高度一致的,而處理方式的本身真正做到了可擴展。
總結
那我們通過 Redis 的事件機制能學到什麼呢?
-
這個事件機制的模型很通用也很清晰,包含:接收、循環、處理,三個部分,很標準的設計
-
其中對於任務的處理有一個專門的分配器去分配,這在很多 handler 的設計中非常實用,熟悉 java 的同學應該知道 DispatcherServlet 沒錯這樣的模型會更加的清晰
-
易於擴展,這裏的擴展有兩方面一方面是對於處理器的擴展,之後有其他事件類型只需要增加事件處理器就可以了;而另一方面這裏的擴展還包括了多線程的擴展,方便了同時支持多個事件的處理。
-
其實,Redis 的事件機制是一個標準的 Reactor 模式 是一種基於事件驅動的設計模式,所以我們更多的是要學到這樣設計模式,來運用到以後的編碼中,可以更清晰也易擴展。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/cNDKunSkPwAlI3Ly15rQNg