Linux 驚羣效應解析,讓性能不再 “浪費”
在 Linux 服務器的複雜世界裏,性能優化如同一場永無止境的馬拉松。衆多影響性能的 “暗礁” 中,“驚羣效應” 頗具隱蔽性卻又影響深遠。想象這樣一個場景:多個進程或線程宛如一羣在巢穴中休憩的鳥兒,同時阻塞等待着同一個事件的發生,如同等待遠方傳來的食物信號。一旦該事件出現,所有 “鳥兒” 瞬間被驚醒,一擁而上。但殘酷的現實是,最終僅有一隻 “鳥兒” 能成功獲取並處理這個事件,其餘的只能失望而歸,重新回到巢穴中繼續等待。這,便是 Linux 驚羣效應的生動寫照。
驚羣效應絕非無害的小插曲。它會引發系統對用戶進程或線程頻繁進行無效調度,大量寶貴的 CPU 時間浪費在進程間的上下文切換上,使得系統性能如斷了線的風箏般直線下滑。同時,爲保證僅有一個進程能獲取資源,開發者不得不引入鎖機制,這無疑又給系統增加了額外開銷。從早期的 Linux 內核版本到如今,驚羣效應不斷演變,accept() 函數的驚羣問題在 Linux 2.6 版本後已得到內核層面的妥善解決,可 epoll 驚羣等新問題又接踵而至,持續挑戰着開發者的智慧。接下來,讓我們深入剖析 Linux 驚羣效應的方方面面,探尋讓性能不再 “浪費” 的有效策略 。
Part1 驚羣效應是什麼?
想象這樣一個場景:在一個寧靜的廣場上,一羣鴿子正在悠閒地踱步。你手中拿着一把穀物,向鴿羣扔去。瞬間,所有的鴿子都被驚動,它們撲騰着翅膀,一擁而上,瘋狂地爭奪食物。然而,最終只有一隻幸運的鴿子能夠搶到那塊最大的穀物,其他鴿子則只能失望地回到原地,繼續等待下一次機會。這就是生活中的 “驚羣” 現象,看似平常,卻蘊含着有趣的資源競爭和效率問題。
在 Linux 操作系統的世界裏,也存在着類似的現象,被稱爲 “驚羣效應(Thundering Herd Problem)” 。當多個進程或線程同時阻塞等待同一個事件(如網絡連接請求、文件可讀可寫等)時,一旦這個事件發生,所有等待的進程或線程都會被喚醒。但實際上,只有一個進程或線程能夠真正處理這個事件,其他被喚醒的進程或線程發現自己無法獲取所需資源(如無法 accept 新連接、文件已被其他進程鎖定等),只能再次進入阻塞狀態,重新等待下一次事件的發生。這種現象就如同廣場上的鴿子,被一塊食物驚動後蜂擁而上,卻只有一隻鴿子能得到食物,其他鴿子的努力都白費了,造成了不必要的系統開銷和性能浪費。
在網絡編程中,當使用多進程或多線程模型處理 socket 連接時,驚羣效應尤爲常見。比如,父進程創建一個 socket,綁定端口並開始監聽(listen),然後通過 fork 創建多個子進程,每個子進程都繼承了父進程的 socket,並調用 accept 開始監聽等待網絡連接。此時,如果有新的連接到來,所有阻塞在 accept 系統調用上的子進程都會被喚醒,但最終只有一個子進程能夠成功 accept 這個連接,其他子進程則會在處理該事件失敗後重新休眠。這不僅浪費了系統資源,還可能導致系統性能的下降,特別是在高併發場景下,這種影響更爲明顯。
Part2 驚羣效應的危害
驚羣效應就像隱藏在 Linux 系統背後的幽靈,悄無聲息地對系統性能產生着負面影響。一旦它出現,系統就會陷入一系列的困境,導致資源浪費和性能下降 。下面,我們就來詳細分析驚羣效應的危害。
2.1 系統性能損耗
在驚羣效應的影響下,Linux 內核對用戶進程或線程頻繁地進行無效的調度和上下文切換,這使得系統性能大打折扣。上下文切換(context switch)過高時,CPU 就像一個忙碌的搬運工,不斷地在寄存器和運行隊列之間奔波,將大量的時間花費在進程或線程的切換上,而不是執行真正有意義的工作。
以一個簡單的 Web 服務器爲例,假設服務器採用多進程模型來處理客戶端請求,每個進程都阻塞在 accept 系統調用上等待新的連接。當有新的連接到來時,所有阻塞在 accept 上的進程都會被喚醒。然而,只有一個進程能夠成功 accept 這個連接,其他進程則會發現連接已經被處理,只能再次進入阻塞狀態。
在這個過程中,CPU 需要對這些被喚醒的進程進行調度,保存和加載它們的寄存器狀態,執行系統調度器的代碼,這些操作都需要消耗 CPU 的時間和資源。此外,多核 CPU 中的緩存也會受到影響,因爲不同進程的頻繁切換可能導致緩存中的數據失效,從而增加了 CPU 從內存中讀取數據的次數,進一步降低了系統性能。
2.2 資源競爭與鎖開銷
爲了確保在驚羣效應發生時只有一個進程或線程能夠得到資源,我們需要對資源操作進行加鎖保護。這無疑加大了系統的開銷,引入了新的性能瓶頸。以常見的鎖機制爲例,當一個進程或線程獲取到鎖並處理事件時,其他進程或線程需要等待鎖的釋放。在等待過程中,這些進程或線程可能會被阻塞,從而導致 CPU 資源的浪費。
一些常見的服務器軟件,如 Nginx,通過鎖機制來解決驚羣效應 。Nginx 的鎖機制默認是開啓的,它通過互斥鎖來保證只有一個進程能夠處理新的連接請求,從而避免了驚羣效應帶來的性能問題。然而,鎖機制本身也會帶來一定的開銷,包括鎖的申請、釋放以及可能的死鎖檢測等操作,這些都會消耗 CPU 等資源,對系統性能產生一定的影響。如果鎖的粒度設置不當,還可能導致其他進程或線程長時間等待,進一步降低系統的併發處理能力。
Part3 常見驚羣場景
3.1 accept 驚羣
在網絡編程中,accept 驚羣是一種較爲常見的驚羣場景。其場景通常是這樣的:主進程首先創建一個 socket,然後通過 bind 函數將其綁定到指定的 IP 地址和端口上,接着調用 listen 函數使 socket 進入監聽狀態 。完成這些初始化操作後,主進程通過 fork 系統調用創建多個子進程。這些子進程會繼承主進程的 socket,並開始循環調用 accept 函數,阻塞等待新的連接到來。
當有新的連接請求到達時,系統內核會檢測到這個事件,並喚醒所有阻塞在 accept 調用上的子進程。然而,實際上只有一個子進程能夠成功 accept 這個新連接,其他子進程在嘗試 accept 時會返回 EAGAIN 錯誤,表示資源暫時不可用,然後重新進入阻塞狀態,繼續等待下一次連接請求。
在 Linux 2.6 版本之前,accept 驚羣問題確實存在,這給服務器的性能帶來了一定的影響。不過,從 Linux 2.6 版本開始,內核開發者對這一問題進行了優化和解決。其核心原理是引入了互斥等待變量(WQ_FLAG_EXCLUSIVE 標誌) 。當多個進程阻塞在 accept 調用上時,內核會爲每個進程的等待隊列入口設置 WQ_FLAG_EXCLUSIVE 標誌,並將它們添加到等待隊列的尾部。當有新的連接到來時,內核調用 wake_up 函數,它會在喚醒第一個帶有 WQ_FLAG_EXCLUSIVE 標誌的進程後就停止喚醒其他進程,從而避免了所有進程被喚醒的情況,大大提高了系統的性能和效率。
下面是一個簡單的驗證代碼示例,用於測試 accept 驚羣問題在 Linux 2.6 內核及之後版本中的情況:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <string.h>
#include <netinet/in.h>
#include <unistd.h>
#define PROCESS_NUM 10
int main() {
int fd = socket(PF_INET, SOCK_STREAM, 0);
int connfd;
int pid;
char sendbuff[1024];
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(1234);
bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
listen(fd, 1024);
int i;
for (i = 0; i < PROCESS_NUM; ++i) {
pid = fork();
if (pid == 0) {
while (1) {
connfd = accept(fd, (struct sockaddr *)NULL, NULL);
sprintf(sendbuff, "接收到accept事件的進程PID = %d\n", getpid());
send(connfd, sendbuff, strlen(sendbuff)+1, 0);
printf("process %d accept success\n", getpid());
close(connfd);
}
}
}
wait(0);
return 0;
}
編譯並運行上述代碼,然後使用 telnet 連接到該服務器程序(例如:telnet 127.0.0.1 1234)。從運行結果中可以觀察到,只有一個進程能夠成功 accept 新連接,並輸出相應的信息,其他進程並未被喚醒,這就證明了在 Linux 2.6 內核及之後版本中,accept 驚羣問題已經得到了解決 。通過這個簡單的示例,我們可以更直觀地理解 accept 驚羣問題以及內核的解決方式。
3.2 epoll 驚羣
epoll 是 Linux 內核中一種高效的 I/O 事件通知機制,常用於實現高性能的網絡服務器。然而,在使用 epoll 時,如果設計不當,也可能會出現驚羣效應,導致系統性能下降。epoll 驚羣主要有以下兩種場景:
①fork 前創建 epollfd
在這種場景下,主進程首先創建一個用於監聽的 socket(listenfd),然後創建一個 epoll 實例(epollfd) 。接着,主進程通過 fork 系統調用創建多個子進程。這些子進程會繼承主進程的 listenfd 和 epollfd。每個子進程都會將 listenfd 添加到自己的 epoll 實例中,然後調用 epoll_wait 函數,阻塞等待事件的發生。當有新的連接請求到達時,內核會檢測到這個事件,並向所有子進程的 epoll 實例發送事件通知。這就導致所有子進程的 epoll_wait 函數都會返回,觸發 epoll 驚羣。
不過,與早期的 accept 驚羣類似,Linux 內核已經意識到了這個問題,並通過類似的加鎖機制來解決。內核在處理 epoll 事件通知時,會確保只有一個子進程能夠真正處理這個新連接事件,其他子進程雖然被喚醒,但在嘗試處理事件時會發現資源已被佔用,從而避免了無效的競爭和資源浪費 。
②fork 後創建 epollfd
另一種容易出現 epoll 驚羣的場景是在 fork 之後創建 epollfd。具體過程如下:主進程首先創建一個用於監聽的 socket(listendfd),然後通過 fork 系統調用創建多個子進程。每個子進程在創建後,各自獨立地創建自己的 epoll 實例(epollfd),並將從主進程繼承而來的 listenfd 添加到自己的 epoll 實例中。之後,每個子進程調用 epoll_wait 函數,阻塞等待事件的發生。當有新的連接請求到達時,內核會檢測到這個事件,並向所有子進程的 epoll 實例發送事件通知。
由於每個子進程的 epoll 實例是獨立的,內核無法確定應該將事件通知發送給哪個子進程,因此會喚醒所有子進程,導致 epoll 驚羣的發生。與 fork 前創建 epollfd 的情況不同,這種場景下的驚羣問題內核並沒有提供直接的解決方案,因爲內核難以判斷應該喚醒哪個子進程來處理新連接事件 。
下面是一個驗證 fork 後創建 epollfd 導致 epoll 驚羣的代碼示例:
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/wait.h>
#define IP "127.0.0.1"
#define PORT 8888
#define PROCESS_NUM 4
#define MAXEVENTS 64
static int create_and_bind() {
int fd = socket(PF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
inet_pton(AF_INET, IP, &serveraddr.sin_addr);
serveraddr.sin_port = htons(PORT);
bind(fd, (struct sockaddr *) &serveraddr, sizeof(serveraddr));
return fd;
}
static int make_socket_non_blocking(int sfd) {
int flags, s;
flags = fcntl(sfd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl(sfd, F_SETFL, flags);
if (s == -1) {
perror("fcntl");
return -1;
}
return 0;
}
int worker(int sfd, int efd, struct epoll_event *events, int k) {
/* The event loop */
while(1) {
int n, i;
n = epoll_wait(efd, events, MAXEVENTS, -1);
sleep(1); // 確保能夠看到驚羣現象
printf("worker %d return from epoll_wait!\n", k);
for (i = 0; i < n; i++) {
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
} else if (sfd == events[i].data.fd) {
while(1) {
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
in_len = sizeof(in_addr);
infd = accept(sfd, &in_addr, &in_len);
if (infd == -1) {
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK)) {
break;
} else {
perror("accept");
break;
}
}
s = getnameinfo(&in_addr, in_len,
hbuf, sizeof(hbuf),
sbuf, sizeof(sbuf),
NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0) {
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s)\n", infd, hbuf, sbuf);
}
make_socket_non_blocking(infd);
struct epoll_event ev;
ev.data.fd = infd;
ev.events = EPOLLIN | EPOLLET;
if (epoll_ctl(efd, EPOLL_CTL_ADD, infd, &ev) == -1) {
perror("epoll_ctl");
close(infd);
}
}
continue;
} else {
// 處理其他事件
}
}
}
return 0;
}
int main(int argc, char *argv[]) {
int sfd, efd;
struct epoll_event event;
struct epoll_event *events;
int i;
sfd = create_and_bind();
if (sfd == -1) {
abort();
}
if (make_socket_non_blocking(sfd) == -1) {
abort();
}
if (listen(sfd, SOMAXCONN) == -1) {
perror("listen");
abort();
}
for (i = 0; i < PROCESS_NUM; ++i) {
pid_t pid = fork();
if (pid == -1) {
perror("fork");
abort();
} else if (pid == 0) {
efd = epoll_create1(0);
if (efd == -1) {
perror("epoll_create1");
abort();
}
event.data.fd = sfd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event) == -1) {
perror("epoll_ctl: listen_sock");
abort();
}
events = calloc(MAXEVENTS, sizeof event);
if (events == NULL) {
perror("calloc");
abort();
}
worker(sfd, efd, events, i);
free(events);
close(efd);
return 0;
}
}
for (i = 0; i < PROCESS_NUM; ++i) {
wait(NULL);
}
close(sfd);
return 0;
}
編譯並運行上述代碼,然後使用 telnet 連接到該服務器程序(例如:telnet 127.0.0.1 8888)。從運行結果中可以觀察到,當有新連接到來時,多個子進程的 epoll_wait 函數都會返回,輸出 “worker X return from epoll_wait!” 的信息,這表明多個子進程被同時喚醒,驗證了 fork 後創建 epollfd 會導致 epoll 驚羣的問題 。
3.3 poll/select 驚羣問題
先說結論,poll/select 是存在驚羣問題的。下面用代碼驗證,再看看代碼流程。
client 代碼仍然使用 accept 時的代碼。下面是 poll 模式下 server 端代碼:
root@master:~# cat poll_thunder.c
#include<stdio.h>
#include<stdlib.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<sys/wait.h>
#include<string.h>
#include<netinet/in.h>
#include<unistd.h>
#include <errno.h>
#include <poll.h>
#define PROCESS_NUM 10
int main()
{
int fd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
int connfd;
int pid;
int status = 1;
char sendbuff[1024];
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(2222);
bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
listen(fd, 1024);
int i, new_fd, ret=0;
struct pollfd clientfds[1];
for(i = 0; i < PROCESS_NUM; ++i){
pid = fork();
if(pid == 0){
//printf("I'm pid: %d, poll on : %d\n", getpid(), fd);
while (1) {
printf("I'm pid: %d, poll on : %d\n", getpid(), fd);
clientfds[0].fd = fd;
clientfds[0].events=POLLIN;
ret = poll(clientfds, 2, -1);
if (ret < 0)
{
perror("poll error");
}
else if(ret == 0)
{
printf("poll timeout\n");
continue;
}
if (clientfds[0].revents&POLLIN) {
new_fd = accept(fd, (struct sockaddr *)NULL, NULL);;
if(new_fd < 0)
{
printf("accept failed: %d on pid: %d\n", errno, getpid());
printf("\n");
continue;
}
printf("new read event: accept new_fd: %d on pid: %d\n", new_fd, getpid());
close(new_fd);
}
printf("\n");
if (clientfds[0].revents&POLLOUT) {
printf("new write event in pid: %d\n", getpid());
}
if (clientfds[0].revents&POLLERR) {
printf("new error event in pid: %d\n", getpid());
}
}
}
}
//int status;
wait(0);
return 0;
}
client 連接 server 一次,可看到有四個進程被喚醒,只有一個進程接收了新連接。
root@master:~# ./poll
I'm pid: 18907, poll on : 3
I'm pid: 18908, poll on : 3
I'm pid: 18909, poll on : 3
I'm pid: 18910, poll on : 3
I'm pid: 18911, poll on : 3
I'm pid: 18912, poll on : 3
I'm pid: 18913, poll on : 3
I'm pid: 18914, poll on : 3
I'm pid: 18915, poll on : 3
I'm pid: 18916, poll on : 3
new read event: accept new_fd: 4 on pid: 18915 -->只有此進程接收新連接成功
accept failed: 11 on pid: 18916
I'm pid: 18915, poll on : 3
I'm pid: 18916, poll on : 3
accept failed: 11 on pid: 18907
I'm pid: 18907, poll on : 3
accept failed: 11 on pid: 18908
I'm pid: 18908, poll on : 3
這裏有一個問題,按說應該 10 個進程都被喚醒了,爲什麼只有四個進程執行到 accept 處?
因爲進程被喚醒後,會調用目標文件的 poll 函數獲取發生的事件通知用戶程序,用戶程序調用 accept 後,會將發生的事件清空。如果清空事件前,被喚醒的進程調用 poll 還會獲取到發生的事件,用戶程序再調用 accept 會返回失敗。但是清空事件後,被喚醒的進程調用 poll 獲取不到事件,也就不會通知用戶程序,而是繼續睡眠,這個情況通過 log 是看不到的,可以通過 strace 觀察 server。或者在進程被喚醒後,在調用 accept 前 sleep 一段時間,讓所有進程都有時間調用 poll 獲取事件就會看到如下 log,10 個進程都被喚醒並且都調用 accept,但是仍然只有一個進程能成功。
root@master:~# ./poll
I'm pid: 20130, poll on : 3
I'm pid: 20131, poll on : 3
I'm pid: 20132, poll on : 3
I'm pid: 20133, poll on : 3
I'm pid: 20135, poll on : 3
I'm pid: 20136, poll on : 3
I'm pid: 20137, poll on : 3
I'm pid: 20138, poll on : 3
I'm pid: 20134, poll on : 3
I'm pid: 20139, poll on : 3
//10個進程都被喚醒執行accept了,但是隻有一個能成功接收新連接
accept failed: 11 on pid: 20136
new read event: accept new_fd: 4 on pid: 20133
accept failed: 11 on pid: 20131
I'm pid: 20136, poll on : 3
I'm pid: 20131, poll on : 3
accept failed: 11 on pid: 20139
I'm pid: 20139, poll on : 3
accept failed: 11 on pid: 20132
I'm pid: 20133, poll on : 3
accept failed: 11 on pid: 20135
I'm pid: 20132, poll on : 3
I'm pid: 20135, poll on : 3
accept failed: 11 on pid: 20137
accept failed: 11 on pid: 20130
I'm pid: 20137, poll on : 3
I'm pid: 20130, poll on : 3
accept failed: 11 on pid: 20134
I'm pid: 20134, poll on : 3
accept failed: 11 on pid: 20138
I'm pid: 20138, poll on : 3
用戶調用 poll 後,在內核態流程調用:
do_sys_poll->do_poll->do_pollfd-> f.file->f_op->poll, 對於socket來說,poll函數爲調用sock_poll
static unsigned int sock_poll(struct file *file, poll_table *wait)
return busy_flag | sock->ops->poll(file, sock, wait); //tcp_poll
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
unsigned int mask;
struct sock *sk = sock->sk;
const struct tcp_sock *tp = tcp_sk(sk);
sock_rps_record_flow(sk);
sock_poll_wait(file, sk_sleep(sk), wait);
if (sk->sk_state == TCP_LISTEN)
return inet_csk_listen_poll(sk);
}
//如果全連接隊列不爲空,則直接返回POLLIN事件
static inline unsigned int inet_csk_listen_poll(const struct sock *sk)
{
return !reqsk_queue_empty(&inet_csk(sk)->icsk_accept_queue) ?
(POLLIN | POLLRDNORM) : 0;
}
static inline void sock_poll_wait(struct file *filp,
wait_queue_head_t *wait_address, poll_table *p)
{
if (!poll_does_not_wait(p) && wait_address) {
poll_wait(filp, wait_address, p);
/* We need to be sure we are in sync with the
* socket flags modification.
*
* This memory barrier is paired in the wq_has_sleeper.
*/
smp_mb();
}
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
//p->_qproc爲__pollwait
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
}
/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
struct poll_table_entry *entry = poll_get_entry(pwq);
if (!entry)
return;
entry->filp = get_file(filp);
entry->wait_address = wait_address;
entry->key = p->_key;
init_waitqueue_func_entry(&entry->wait, pollwake);
entry->wait.private = pwq;
add_wait_queue(wait_address, &entry->wait);
}
void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
{
unsigned long flags;
//將exclusive標誌清除掉,這樣有10個進程調用select,就會將10個進程添加到等待隊列中。
//因爲沒有exclusive,所以這10個進程都會被喚醒。
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
__add_wait_queue(q, wait);
spin_unlock_irqrestore(&q->lock, flags);
}
**喚醒進程流程:**喚醒流程和 accept 中喚醒流程相同,只不過最後調用__wake_up_common 時,因爲 poll 添加到等待隊列時,沒有設置 exclusive,所以所有進程都會被喚醒。被喚醒的進程再調用 accept 接收新連接,但是隻有一個進程會成功,其餘 9 個都返回錯誤 EAGAIN。
Part4 解決驚羣效應的方法
既然驚羣效應會對系統性能產生如此大的危害,那麼我們該如何解決它呢?在 Linux 系統中,有多種方法可以有效地應對驚羣效應,下面我們將詳細介紹這些方法。
4.1 epoll + EPOLLEXCLUSIVE
爲了解決 epoll 驚羣問題,Linux 4.5 版本引入了 EPOLLEXCLUSIVE 標誌 。當我們在使用 epoll 時,爲監聽 socket 添加 EPOLLEXCLUSIVE 標誌後,就可以保證同一個事件只喚醒一個等待的進程或線程,從而避免了不必要的喚醒操作,大大提高了系統的性能和效率。
下面是一個使用 epoll + EPOLLEXCLUSIVE 的示例代碼:
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define PORT 8080
#define MAX_EVENTS 10
int main() {
int server_fd, epfd;
struct sockaddr_in address;
struct epoll_event event, events[MAX_EVENTS];
int new_socket;
ssize_t valread;
char buffer[1024] = {0};
// 創建socket
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket failed");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 綁定socket
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) == -1) {
perror("bind failed");
close(server_fd);
exit(EXIT_FAILURE);
}
// 監聽socket
if (listen(server_fd, 10) == -1) {
perror("listen failed");
close(server_fd);
exit(EXIT_FAILURE);
}
// 創建epoll實例
epfd = epoll_create1(0);
if (epfd == -1) {
perror("epoll_create1 failed");
close(server_fd);
exit(EXIT_FAILURE);
}
// 將監聽socket添加到epoll實例中,並設置EPOLLEXCLUSIVE標誌
event.data.fd = server_fd;
event.events = EPOLLIN | EPOLLEXCLUSIVE;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
perror("epoll_ctl: listen_sock");
close(server_fd);
close(epfd);
exit(EXIT_FAILURE);
}
while (1) {
int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (n == -1) {
perror("epoll_wait failed");
break;
}
for (int i = 0; i < n; i++) {
if (events[i].data.fd == server_fd) {
new_socket = accept(server_fd, NULL, NULL);
if (new_socket == -1) {
perror("accept failed");
continue;
}
// 處理新連接
valread = read(new_socket, buffer, 1024);
if (valread == -1) {
perror("read failed");
close(new_socket);
} else {
printf("Message from client: %s\n", buffer);
send(new_socket, buffer, strlen(buffer), 0);
}
close(new_socket);
}
}
}
close(server_fd);
close(epfd);
return 0;
}
在上述代碼中,我們首先創建了一個 socket,並將其綁定到指定的端口上,然後創建了一個 epoll 實例 。接着,我們將監聽 socket 添加到 epoll 實例中,並設置了 EPOLLEXCLUSIVE 標誌。在事件循環中,通過 epoll_wait 等待事件的發生,當有新的連接到來時,只有一個進程或線程會被喚醒並處理該連接 。需要注意的是,EPOLLEXCLUSIVE 標誌只能用於監聽 socket,不適用於普通文件或管道。如果在不支持 EPOLLEXCLUSIVE 標誌的低版本 Linux 內核中,我們可以考慮使用其他方法來解決驚羣效應,如負載均衡或線程池模型等 。
4.2 負載均衡(多進程綁定)
使用負載均衡算法(如 SO_REUSEPORT)是另一種有效的解決驚羣效應的方法。從 Linux 3.9 及以上版本開始,內核支持 SO_REUSEPORT 選項,該選項允許多個 socket 綁定到同一個端口 。這樣,當有新的連接到來時,內核會自動將連接分配到不同的進程,實現負載均衡,從而避免了驚羣效應。
下面是一個設置 SO_REUSEPORT 的示例代碼:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#define PORT 8888
#define WORKER 4
int worker(int i) {
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &address.sin_addr);
address.sin_port = htons(PORT);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
if (listenfd == -1) {
perror("socket");
return -1;
}
int val = 1;
// 設置SO_REUSEPORT選項
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)) == -1) {
perror("setsockopt");
close(listenfd);
return -1;
}
if (bind(listenfd, (struct sockaddr *)&address, sizeof(address)) == -1) {
perror("bind");
close(listenfd);
return -1;
}
if (listen(listenfd, 5) == -1) {
perror("listen");
close(listenfd);
return -1;
}
while (1) {
struct sockaddr_in client_addr;
socklen_t client_addrlen = sizeof(client_addr);
int connfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_addrlen);
if (connfd == -1) {
perror("accept");
continue;
}
char buffer[1024];
ssize_t nbytes = recv(connfd, buffer, sizeof(buffer), 0);
if (nbytes > 0) {
buffer[nbytes] = '\0';
printf("Worker %d received: %s\n", i, buffer);
send(connfd, buffer, nbytes, 0);
}
close(connfd);
}
close(listenfd);
return 0;
}
int main() {
int i;
for (i = 0; i < WORKER; i++) {
pid_t pid = fork();
if (pid == -1) {
perror("fork");
return -1;
} else if (pid == 0) {
worker(i);
exit(0);
}
}
for (i = 0; i < WORKER; i++) {
wait(NULL);
}
return 0;
}
在這個示例中,我們創建了多個子進程,每個子進程都創建一個 socket,並設置 SO_REUSEPORT 選項,然後綁定到同一個端口 。當有新的連接到來時,內核會自動將連接分配給不同的子進程,實現了負載均衡,有效地避免了驚羣效應 。這種方式使得每個進程都能獨立地處理自己的連接請求,減少了進程間的競爭和無效喚醒,提高了系統的併發處理能力 。
4.3 線程池模型
線程池模型是一種常見的解決驚羣效應的方法,它通過預先創建一定數量的線程,將這些線程放入線程池中,當有新的連接請求到來時,主線程將請求分配給線程池中的一個線程進行處理 。這樣可以減少線程的頻繁創建與銷燬,降低上下文切換成本,提高系統的性能和響應速度。
線程池模型的工作原理如下:
-
主線程:負責監聽 socket,接收新的連接請求 。當有新連接到來時,主線程將連接請求放入任務隊列中,並通知線程池中的線程有新任務到來 。
-
線程池:包含若干個預先創建好的線程,這些線程處於空閒狀態時會在條件變量上等待 。當任務隊列中有新任務時,線程池中的一個線程會被喚醒,從任務隊列中取出任務並進行處理 。處理完成後,線程將再次回到條件變量上等待新任務 。
下面是一個簡單的線程池模型示例代碼:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#define PORT 8080
#define THREAD_NUM 4
#define QUEUE_SIZE 100
typedef struct {
int fd;
} Task;
typedef struct {
Task queue[QUEUE_SIZE];
int front;
int rear;
pthread_mutex_t mutex;
pthread_cond_t cond;
} TaskQueue;
TaskQueue taskQueue;
void initTaskQueue() {
taskQueue.front = 0;
taskQueue.rear = 0;
pthread_mutex_init(&taskQueue.mutex, NULL);
pthread_cond_init(&taskQueue.cond, NULL);
}
void enqueueTask(int fd) {
pthread_mutex_lock(&taskQueue.mutex);
while ((taskQueue.rear + 1) % QUEUE_SIZE == taskQueue.front) {
pthread_cond_wait(&taskQueue.cond, &taskQueue.mutex);
}
taskQueue.queue[taskQueue.rear] = (Task){fd};
taskQueue.rear = (taskQueue.rear + 1) % QUEUE_SIZE;
pthread_cond_signal(&taskQueue.cond);
pthread_mutex_unlock(&taskQueue.mutex);
}
int dequeueTask() {
pthread_mutex_lock(&taskQueue.mutex);
while (taskQueue.front == taskQueue.rear) {
pthread_cond_wait(&taskQueue.cond, &taskQueue.mutex);
}
int fd = taskQueue.queue[taskQueue.front].fd;
taskQueue.front = (taskQueue.front + 1) % QUEUE_SIZE;
pthread_cond_signal(&taskQueue.cond);
pthread_mutex_unlock(&taskQueue.mutex);
return fd;
}
void *worker(void *arg) {
while (1) {
int fd = dequeueTask();
char buffer[1024] = {0};
ssize_t valread = read(fd, buffer, 1024);
if (valread == -1) {
perror("read failed");
} else {
printf("Message from client: %s\n", buffer);
send(fd, buffer, strlen(buffer), 0);
}
close(fd);
}
return NULL;
}
int main() {
int server_fd, new_socket;
struct sockaddr_in address;
pthread_t threads[THREAD_NUM];
// 創建socket
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket failed");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 綁定socket
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) == -1) {
perror("bind failed");
close(server_fd);
exit(EXIT_FAILURE);
}
// 監聽socket
if (listen(server_fd, 10) == -1) {
perror("listen failed");
close(server_fd);
exit(EXIT_FAILURE);
}
initTaskQueue();
// 創建線程池
for (int i = 0; i < THREAD_NUM; i++) {
if (pthread_create(&threads[i], NULL, worker, NULL) != 0) {
perror("pthread_create failed");
exit(EXIT_FAILURE);
}
}
while (1) {
struct sockaddr_in client_addr;
socklen_t client_addrlen = sizeof(client_addr);
new_socket = accept(server_fd, (struct sockaddr *)&client_addr, &client_addrlen);
if (new_socket == -1) {
perror("accept failed");
continue;
}
enqueueTask(new_socket);
}
// 等待線程結束
for (int i = 0; i < THREAD_NUM; i++) {
pthread_join(threads[i], NULL);
}
close(server_fd);
pthread_mutex_destroy(&taskQueue.mutex);
pthread_cond_destroy(&taskQueue.cond);
return 0;
}
在這個示例中,我們創建了一個任務隊列和一個線程池 。主線程負責監聽 socket,接收新的連接請求,並將連接請求放入任務隊列中 。線程池中的線程從任務隊列中取出任務並進行處理 。通過這種方式,我們有效地避免了驚羣效應,提高了系統的性能和併發處理能力 。
4.4 自旋鎖 + 喚醒機制優化
在一些鎖競爭比較激烈的場景下,我們可以通過使用自旋鎖或優化喚醒機制來解決驚羣效應 。自旋鎖是一種特殊的鎖,當一個線程嘗試獲取自旋鎖時,如果鎖已經被其他線程持有,該線程不會立即進入睡眠狀態,而是在原地不斷地嘗試獲取鎖,直到獲取到鎖爲止 。這種方式可以減少線程的上下文切換開銷,提高系統的性能 。
除了自旋鎖,我們還可以通過優化喚醒機制來控制線程的喚醒順序,確保每次只喚醒一個等待線程 。例如,我們可以結合條件變量(condition variable)和信號量(semaphore)來實現更精細的喚醒控制 。
下面是一個簡單的使用自旋鎖和條件變量來優化喚醒機制的示例代碼:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define THREAD_NUM 10
pthread_spinlock_t spinlock;
pthread_cond_t cond;
int shared_resource = 0;
void *thread_function(void *arg) {
int id = *(int *)arg;
pthread_spin_lock(&spinlock);
while (shared_resource == 0) {
pthread_cond_wait(&cond, &spinlock);
}
// 處理共享資源
printf("Thread %d is processing shared resource\n", id);
shared_resource--;
pthread_spin_unlock(&spinlock);
return NULL;
}
int main() {
pthread_t threads[THREAD_NUM];
int thread_ids[THREAD_NUM];
pthread_spin_init(&spinlock, PTHREAD_PROCESS_PRIVATE);
pthread_cond_init(&cond, NULL);
// 創建線程
for (int i = 0; i < THREAD_NUM; i++) {
thread_ids[i] = i;
if (pthread_create(&threads[i], NULL, thread_function, &thread_ids[i]) != 0) {
perror("pthread_create failed");
return 1;
}
}
// 模擬資源準備好
pthread_spin_lock(&spinlock);
shared_resource = 1;
pthread_cond_signal(&cond);
pthread_spin_unlock(&spinlock);
// 等待線程結束
for (int i = 0; i < THREAD_NUM; i++) {
if (pthread_join(threads[i], NULL) != 0) {
perror("pthread_join failed");
return 1;
}
}
pthread_spin_destroy(&spinlock);
pthread_cond_destroy(&cond);
return 0;
}
在這個示例中,我們使用了自旋鎖和條件變量來保護共享資源 。當共享資源準備好時,主線程通過條件變量喚醒一個等待的線程,而不是喚醒所有線程,從而避免了驚羣效應 。通過這種方式,我們可以有效地提高系統在鎖競爭場景下的性能和效率 。
Part5epoll 驚羣案例實戰********
還是先說結論:ET 模式下不存在驚羣問題,LT 模式下存在。
下面通過代碼驗證,再分析實現流程。
root@master:~# cat epoll_thunder.c
#include<stdio.h>
#include<stdlib.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<sys/wait.h>
#include<string.h>
#include<netinet/in.h>
#include<unistd.h>
#include <errno.h>
#include<sys/epoll.h>
#define MAXEVENTS 64
#define PROCESS_NUM 10
int main()
{
int fd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
int connfd;
int pid;
int i, epoll_fd, new_fd, ret=0, num;
struct epoll_event event;
struct epoll_event *events;
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(2222);
bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
listen(fd, 1024);
if ((epoll_fd = epoll_create(MAXEVENTS))< 0) {
perror("epoll_create");
exit(1);
}
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
//event.events = EPOLLIN;
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) < 0){
perror("epoll_ctl");
exit(1);
}
events = calloc(MAXEVENTS, sizeof(event));
for(i = 0; i < PROCESS_NUM; ++i) {
pid = fork();
if(pid == 0) {
while (1) {
printf("I'm pid: %d, epoll on : %d\n", getpid(), fd);
num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
if (num < 0) {
printf("epoll_wait failed %d, on pid %d\n", errno, getpid());
continue;
}
for(i = 0; i < num; ++i){
if((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))){
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
}else if(fd == events[i].data.fd){
new_fd = accept(fd, (struct sockaddr *)NULL, NULL);;
if(new_fd < 0)
{
printf("accept failed: %d on pid: %d\n", errno, getpid());
printf("\n");
continue;
}
printf("new read event: accept new_fd: %d on pid: %d\n", new_fd, getpid());
close(new_fd);
}
}
}
}
}
wait(0);
return 0;
}
在 epoll_thunder.c 中,如果 events 加上標誌 EPOLLET 就是 ET 模式,不加的話默認是 LT 模式。
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event)
ET 模式測試結果如下,始終只有一個進程接收新連接,並且是同一個進程。這是因爲調用 epoll_wait 堵塞時,添加 wait 節點加到了 ep 等待隊列頭部,並且是 exclusive 的,而喚醒進程時總是從隊列頭部開始,由於設置了 exclusive,所以只好喚醒一個進程。
**爲什麼總是同一個進程呢?**是因爲第一個被喚醒的進程馬上又調用 epoll_wait 將其再次加入等待隊列頭部,所以下次事件到來時仍然喚醒同一個進程。
root@ubuntu:/home/jk/socket# ./epoll
I'm pid: 119280, epoll on : 3
I'm pid: 119281, epoll on : 3
I'm pid: 119282, epoll on : 3
I'm pid: 119285, epoll on : 3
I'm pid: 119283, epoll on : 3
I'm pid: 119287, epoll on : 3
I'm pid: 119284, epoll on : 3
I'm pid: 119286, epoll on : 3
I'm pid: 119288, epoll on : 3
I'm pid: 119289, epoll on : 3
new read event: accept new_fd: 5 on pid: 119289
I'm pid: 119289, epoll on : 3
new read event: accept new_fd: 5 on pid: 119289
I'm pid: 119289, epoll on : 3
new read event: accept new_fd: 5 on pid: 119289
I'm pid: 119289, epoll on : 3
下面將進程被喚醒後 sleep 一段時間 20s,然後用 client 連接 server,會發現新連接總是被等待隊列頭部的進程處理。
root@master:~# ./epoll
I'm pid: 27656, epoll on : 3
I'm pid: 27657, epoll on : 3
I'm pid: 27658, epoll on : 3
I'm pid: 27659, epoll on : 3
I'm pid: 27660, epoll on : 3
I'm pid: 27661, epoll on : 3
I'm pid: 27662, epoll on : 3
I'm pid: 27663, epoll on : 3
I'm pid: 27664, epoll on : 3
I'm pid: 27665, epoll on : 3
new read event: accept new_fd: 5 on pid: 27665 -->第一次連接被27665進程處理,然後sleep 20s
new read event: accept new_fd: 5 on pid: 27664 -->第二次連接被27664進程處理,然後sleep 20s
new read event: accept new_fd: 5 on pid: 27663 -->第三次連接被27663進程處理,然後sleep 20s
I'm pid: 27665, epoll on : 3 -->進程27665 sleep結束,重新開始wait
new read event: accept new_fd: 5 on pid: 27665 -->第四次連接又被27663進程處理
LT 模式下,測試結果如下,從結果看,好像也是隻有一個進程被喚醒了,但是實際上喚醒進程不只一個
root@ubuntu:/home/jk/socket# ./epoll
I'm pid: 119318, epoll on : 3
I'm pid: 119319, epoll on : 3
I'm pid: 119320, epoll on : 3
I'm pid: 119324, epoll on : 3
I'm pid: 119321, epoll on : 3
I'm pid: 119326, epoll on : 3
I'm pid: 119322, epoll on : 3
I'm pid: 119323, epoll on : 3
I'm pid: 119325, epoll on : 3
I'm pid: 119327, epoll on : 3
new read event: accept new_fd: 5 on pid: 119327
I'm pid: 119327, epoll on : 3
new read event: accept new_fd: 5 on pid: 119327
I'm pid: 119327, epoll on : 3
new read event: accept new_fd: 5 on pid: 119327
I'm pid: 119327, epoll on : 3
LT 模式下,如果添加 sleep 後,就會看到所有進程都被喚醒了。喚醒多少個進程和被喚醒進程處理時間長短有關係,如果處理越快,喚醒的進程越少。
num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
if (num < 0) {
printf("epoll_wait failed %d, on pid %d\n", errno, getpid());
continue;
}
sleep(2);
再次執行,可看到所有進程都被喚醒了
root@ubuntu:/home/jk/socket# ./epoll
I'm pid: 119480, epoll on : 3
I'm pid: 119481, epoll on : 3
I'm pid: 119482, epoll on : 3
I'm pid: 119483, epoll on : 3
I'm pid: 119484, epoll on : 3
I'm pid: 119485, epoll on : 3
I'm pid: 119486, epoll on : 3
I'm pid: 119487, epoll on : 3
I'm pid: 119488, epoll on : 3
I'm pid: 119489, epoll on : 3
new read event: accept new_fd: 5 on pid: 119489
accept failed: 11 on pid: 119488
I'm pid: 119488, epoll on : 3
I'm pid: 119489, epoll on : 3
accept failed: 11 on pid: 119487
I'm pid: 119487, epoll on : 3
accept failed: 11 on pid: 119486
I'm pid: 119486, epoll on : 3
accept failed: 11 on pid: 119485
I'm pid: 119485, epoll on : 3
accept failed: 11 on pid: 119484
I'm pid: 119484, epoll on : 3
accept failed: 11 on pid: 119483
I'm pid: 119483, epoll on : 3
accept failed: 11 on pid: 119482
I'm pid: 119482, epoll on : 3
accept failed: 11 on pid: 119481
I'm pid: 119481, epoll on : 3
accept failed: 11 on pid: 119480
I'm pid: 119480, epoll on : 3
代碼分析
調用 epll_wait 時,如果需要堵塞等待,則將調用進程加入到 ep 等待隊列中,設置 exclusive,並且添加到隊列頭部,如果有新連接到來,也只會喚醒一個進程。
init_waitqueue_entry(&wait, current);
__add_wait_queue_exclusive(&ep->wq, &wait);
這樣看貌似 epoll_wait 已經解決了驚羣問題,但在 LT 和 ET 模式下處理流程的差別導致了 LT 模式下驚羣問題。
假如調用 epoll_ctl 將一個 fd 加入到 epoll 進行監聽,會調用目標文件 fd 的 poll 函數,將 wait 節點 (fd 上有新事件發生時,調用 ep_poll_callback 喚醒監聽進程) 加入到目標文件 fd 的等待隊列中,再 fork 十個進程調用 epoll_wait 等待事件到來, 這樣就會有 10 個進程堵塞在 ep 的等待隊列中(如果有事件發生時,則調用 default_wake_function 喚醒堵塞進程)。
如果此時 client 和 server 完成了三次握手,則會調用 socket 的 fd 等待隊列上的 task,即會調用 ep_poll_callback,將發生事件的 fd 添加到就緒鏈表 rdlist 中,如果 ep 的等待隊列中不爲空 (此例不爲空,有 10 個節點),則會喚醒第一個進程 (因爲添加了 exclusive 標誌,所以只會喚醒第一個進程)。
ep_poll_callback
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
第一個進程被喚醒後,首先將自己從 ep 的等待隊列中刪除,然後調用 rdlist 上發生事件的 fd 的 poll 函數獲取發生的事件,將其傳遞到用戶程序,如果是用戶程序感興趣的事件,用戶程序再調用 accept 接收新連接。
ep_send_events_proc
revents = ep_item_poll(epi, &pt);
if (revents) {
__put_user(revents, &uevent->events)
如果是 LT 模式,則將目標文件再次添加到就緒鏈表 rdlist,如果是 ET 模式,就不會將目標文件再次添加到就緒鏈表 rdlist。
ep_send_events_proc
revents = ep_item_poll(epi, &pt);
if (revents) {
if (!(epi->event.events & EPOLLET))
list_add_tail(&epi->rdllink, &ep->rdllist);
在 LT 模式下,rdlist 不爲空,並且 ep->wq 中還有 9 個進程在堵塞等待,則又會喚醒第二個進程;第二個進程被喚醒後,後面處理方式和第一個進程相同。如果第一個進程調用 accept 把事件取走了,則第二個進程調用目標文件 poll 函數時就得不到事件,ep->wq 中的其他進程就不會被喚醒了;如果每個被喚醒進程調用 accept 的時間更長,則會喚醒更多的進程。
在 ET 模式下,rdlist 爲空,就不會喚醒等待隊列上的其他進程了。epoll 下涉及兩個等待隊列:
-
a. 調用 epoll_ctl 時,將進程添加到目標文件的等待隊列中,目標文件發生事件時調用 ep_poll_callback,判斷如果 ep 等待隊列不爲空,則喚醒 ep 等待隊列上的進程。
-
b. 調用 epoll_wait 時,將進程添加到 ep 的等待隊列中。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/nN4sYAz49p7aM9OC2OIXbQ