Redis 源碼分析 I-O 模型詳解
主流 I/O 模型
阻塞 IO、非阻塞 IO、異步 IO 。
BIO 模型
同步阻塞 模型,一個客戶單對應一個鏈接的處理線程
缺點:
1、IO 中如果進行 read 是阻塞操作,如果請求的鏈接操作不做任何操作,也會導致線程阻塞,浪費線程資源
2、如果線程很多,會導致服務器壓力增加,比如 C10K 問題
引用場景:
BIO 方式運用數目比較小且固定的架構,這種方式對服務器資源要求比較高,但是程序簡單容易理解。
NIO 模型
同步非阻塞,是服務器實現的模式是一個線程可以處理多個請求(鏈接),客戶端發送的鏈接都會註冊到多路複用器 selector 上,多路複用器輪訓到介入的所有 IO 請求進行處理。
應用場景:
NIO 方式適用於鏈接數目多(輕操作) 的架構,比如聊天服務器,彈幕系統,服務器間通訊,編程比較複雜。Java NIO 模型如下圖所示:
總結:
NIO 的三大核心組件:Channel(通道)、Buffer (緩衝區)、Selector (多路複用器)
1、Channel 類似流,每個 Channel 對應一個 buffer 緩衝區。
2、Channel 組冊到 Selector 上,由 Selecotor 根據 Channel 讀寫事件發生時交給空閒線程處理。
3、NIO 中 Buffer 與 Channel 都是可讀可寫的。
NIO 模型實現
在 linux 系統中是通過調用系統內核函數來創建 socket ,selecotor 對應操作系統的 epoll 描述符。可以將 socket 的連接文件描述符綁定到 epoll 文件描述符上,進行事件的異步通知,實現一個線程處理,並且減少大量的無效遍歷,事件處理交給了操作系統的內核,提升效率。
Redis 線程模型
Redis 是一個典型的基於 epoll 的 nio 線程模型, epoll 實例手機所有的事件(連接與讀事件)由一個服務線程處理所有命令。
Redis 底層相關的 epoll 的源碼實現在 src/ae_epoll.c 文件中。
AIO 模型
異步非阻塞、由於操作系統完成後回調通知程序啓動線程去處理,一般適用於鏈接較多且鏈接時間較長的應用。
應用場景:
AIO 方式適用於鏈接數目多且比較長(重操作),比如設備每間隔 2 秒上報狀態。
三種 I/O 模型對比
Redis 線程模型
1、交互模型
2、Reactor 模型
處理流程:
-
主線程往 epoll 內核事件表註冊 socket 上的讀事件。
-
主線程調用 epoll_wait 等待 socket 上數據可讀。
-
當 socket 可讀的時候 ,epoll_wait 通知主線程,主線程則將 socket 可讀事件放入請求隊列。
-
睡眠在請求隊列上的某個工作線程被喚醒,他從 socket 讀取數據,並且處理用戶請求,然後往 epoll 內核事件表中註冊 socket 寫就緒事件。
-
主線程 epoll_wart 等待 socket 可寫
-
當 socket 可寫時, epoll_wait 通知主線程。主線程將 socket 可寫事件放入請求隊列。
-
睡眠在請求隊列中的某個線程被喚醒,他往 socket 上寫服務端處理客戶端請求的結果。
優點和缺點:
-
優點
-
響應快,不必爲單個同步操作阻塞,也不用考慮 fd 跨線程問題。
-
可拓展性,可以很方便的通過 reactor 實例(如 multi reactor)個數來利用 cpu 資源;
-
可複用性,reacotor 本身與具體事件處理邏輯無關,方便複用。
-
缺點
-
共享同一個 reactor 時,若出現較長的讀寫,會影響該 reactor 的響應時間,此時可以考慮 thread-per-connection
3、Reactor 模型示例
服務端 (基於 netty):
// 基於 Java 代碼爲例
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 4096)
.childHandler(new JkvServerInitalizer());
ChannelFuture f = b.bind(SERVER_PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
客戶端 (基於 netty):
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new MyChatClientInitializer());
Channel channel = bootstrap.connect("localhost",SERVER_PORT).sync().channel();
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
channel.writeAndFlush(br.readLine() + "\r\n");
}
}finally {
eventLoopGroup.shutdownGracefully();
}
Redis 的網絡模型
Redis 採用的是單線程 Reactor。單機壓測 QPS 可以達到 10w , 因爲 Redis 主要是以內存讀寫爲主,效率是非常高的。
Redis 服務器是一個事件驅動的程序,服務器需要處理一下兩類事件:
1、文件事件(file event): Redis 服務器通過套接字與客戶端(或者其他 Redis 服務器)進行連接,而文件事件就是服務器對套接字操作的抽。服務器與客戶端(或者其他服務器)的通訊都會產生相應的文件事件,而服務器則通過監聽並且處理這些事件來完成一些列網絡通訊操作
2、 事件事件(time event): Redis 服務器中國呢的一些操作(比如 serverCron 函數)需要在給定的事件點執行,而時間事件就是服務器對着咧定時操作的抽象。
文件事件
Redis 基於 Reactor 模式開發了自己的網絡事件處理器:這個處理器被稱爲文件事件處理器(file event handler)
-
文件事件處理器使用 I/O 多路複用(multiplexing)程序來同時監聽多個套接字,並根據套接字目前執行的任務來爲套接字關聯不同的事件處理器。
-
當被監聽的套接字準備好執行連接應答(accept)、讀取(read)、寫入(write)、關閉(close)等操作時,與操作相對應的文件事件就會產生,這時文件事件處理器就會調用套接字之前關聯好的事件處理器來處理這些事件。
文件事件構成,文件事件處理器的 4 個部分:套接字、 I/O 多路複用程序、文件事件派發器(dispatcher)、以及事件處理器。
多路複用器, 的所有功能都是通過包裝常見的 select、epoll 、evport 和 kququee 這些 i/o 多路複用函數庫來實現了,每個 i/o 多路複用器在 redis 中都對應一個單獨的文件比如:src\ae_epoll.c、src\ac_evport.c、src\ac_kqueue.c、src\ac_select.c 等。
因爲 Redis 每個 I/O 多路複用函數庫都實現了相同的 API , 所以 I/O 多路複用程序的底層實現是可以互換的。
Redis 在 I/O 多路複用程序實現源碼中通過 #include 宏定義了相應的穀子額,程序會在編譯期間自動選擇系統中性能最高的 I/O 多路複用函數庫來作爲 Redis 的 I/O 多路複用程序的底層實現:
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
事件的類型
I/O 多路複用程序可以監聽多個套接字的 ae.h/AE_READABLE 事件和 ae.h/AE_WRITABLE 事件,這兩類事件和套接字操作之間的對應關係如下:
-
當套接字變得可讀時(客戶端對套接字執行 write 操作,或者執行 close 操作),或者有新的可應答(acceptable)套接字出現時(客戶端對服務器的監聽套接字執行 connect 操作),套接字產生 AE_READABLE 事件。
-
當套接字變得可寫時(客戶端對套接字執行 read 操作),套接字產生 AE_WRITABLE 事件。
如果套接字同時可讀可寫,那麼服務器先讀套接字,後寫套接字。
文件事件處理器
1、連接應答處理器
networking.c/acceptTcpHandler 函數是 Redis 的連接應答處理器,這個處理器用於對連接服務器監聽套接字的客戶端進行應答,具體實現爲 sys/socket.h/accept 函數的包裝。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
anetCloexec(cfd);
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
2、命令請求處理器
networking.c/readQueryFromClient 函數是 Redis 的命令請求處理器,這個處理器負責從套接字中讀入客戶端發送的命令請求內容,具體實現爲 unistd.h/read 函數的包裝。
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
/* Update total number of reads on server */
atomicIncr(server.stat_total_reads_processed, 1);
readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
if (remaining > 0 && remaining < readlen) readlen = remaining;
}
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClientAsync(c);
return;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
* copy of the string applied by the last command executed. */
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
atomicIncr(server.stat_net_input_bytes, nread);
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
return;
}
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
processInputBuffer(c);
}
3、命令回覆處理器
networking.c/sendReplyToClient 函數是 Redis 的命令回覆處理器,這個處理器負責將服務器執行命令後得到的命令回覆通過套接字返回給客戶端,具體實現爲 unistd.h/write 函數的包裝。
/* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {
client *c = connGetPrivateData(conn);
writeToClient(c,1);
}
定時事件
實際上 redis 支持的是週期任務事件,即執行完之後不會刪除,而是在重新插入鏈表。
定時器採用鏈表的方式進行管理,新定時任務插入鏈表表頭。
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
具體定時事件處理如下:
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
//刪除定時器
while(te) {
long long id;
/* Remove events scheduled for deletion. */
// 下一輪中事件進行刪除
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc) {
te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs();
}
zfree(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
if (te->when <= now) {
int retval;
id = te->id;
te->refcount++;
// timeProc 返回值 retval 爲事件事件執行的間隔
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
now = getMonotonicUs();
if (retval != AE_NOMORE) {
te->when = now + retval * 1000;
} else {
// 如果超時,那麼標記爲刪除
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
來源: https://juejin.cn/post/7069279726036058142
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Z6ztX1x8EoBWPjSP4Aqqxw