用反應器模式和 epoll 構建百萬併發服務器並測試
此處的百萬併發指的是可以建立至少 100w 個客戶端連接,不考慮業務處理。
反應器模式下的 epoll 相比起普通的 epoll 不同在於:普通的 epoll 在獲取到就緒狀態的 event 結構體之後,先判斷是什麼類型的 fd,再進行操作。而 reactor 先判斷是什麼類型的事件,再進行操作。本文從頭用 reactor 設計模式來構建一個 epoll 服務器,這個過程中每次發生 IO 事件都要注意維護用戶空間的數據結構和內核的 epoll 實例,下面是構建百萬併發服務器的詳細地步驟(暫不包含併發量測試):
導入相關的包並設置宏
聲明事件處理函數
構建存儲數據結構
設計一個結構體,用於存儲 IO 相關的信息,並建立全局數組。後續需要不斷地維護該數組,一旦發生了 IO 事件就需要進行更新。結構體的內容包括:1、文件描述符。2、接收到的數據和數據長度。3、要發送的數據和數據長度。4、用於處理接收數據的回調函數,請求連接和連接成功後接收到數據,要調用不同的處理函數。5、用於處理發送數據的回調函數。
編寫 epoll 實例的維護函數
包括修改節點和增加節點的步驟
實現 accept 事件的回調函數
某個文件描述符接收到數據之後,相應地要維護全局數組,以及 epoll 實例。全局數組將相應的節點的 fd 屬性進行修改,並初始化數據區爲 0,同時還要選擇處理接收數據的回調函數。
實現 recv 事件的回調函數
除了相應的數據區,還要注意修改 epoll 紅黑樹裏相應節點的狀態。
實現 send 事件的回調函數
較爲簡單,全局數組不需要維護,但是內核的 epoll 實例需要維護。
編寫初始化服務器的函數
相關視頻推薦
手把手帶你實現 epoll 組件,爲 tcp 併發的實現 epoll
建立紅黑樹實例並建立服務器套接字
一共要建立 20 個服務器套接字。同樣的,每成功建立一次,都要維護全局數組和 epoll 實例。注意套接字描述符的接收數據回調函數選擇是 accept_cb;
檢測就緒狀態的 fd 並處理
根據檢測到的事件進行處理,而不是根據文件描述符是服務器套接字還是通信套接字進行處理。值得注意是處理 EPOLLIN 事件中,雖然代碼上調用的都是 recv_callback 函數,但實際上如果 i 是服務器套接字,那麼調用的依然是 accpet_callback。這是因爲 union 特性,union 的屬性如果都是同一類型,那麼進行賦值之後,無論調用哪一個屬性都是都可以調用同一個值。
百萬併發
-
整個過程涉及到一些時間處理的細節,是爲了計算服務器處理併發的效率。
-
全局數組設置很大,這是爲了處理百萬併發,不是很科學但是能用。
-
百萬併發處理一方面體現在三方面,全局數組的大小,epoll 模型,服務器套接字的數量。
測試
測試用客戶端代碼
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/time.h>
#include <unistd.h>
#define MAX_BUFFER 128
#define MAX_EPOLLSIZE (384*1024)
#define MAX_PORT 20
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
int isContinue = 0;
static int ntySetNonblock(int fd) {
int flags;
flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return flags;
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0) return -1;
return 0;
}
static int ntySetReUseAddr(int fd) {
int reuse = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}
int main(int argc, char **argv) {
if (argc <= 2) {
printf("Usage: %s ip port\n", argv[0]);
exit(0);
}
const char *ip = argv[1];
int port = atoi(argv[2]);
int connections = 0;
char buffer[128] = {0};
int i = 0, index = 0;
struct epoll_event events[MAX_EPOLLSIZE];
int epoll_fd = epoll_create(MAX_EPOLLSIZE);
strcpy(buffer, " Data From MulClient\n");
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip);
struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);
while (1) {
if (++index >= MAX_PORT) index = 0;
struct epoll_event ev;
int sockfd = 0;
if (!isContinue) {
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
perror("socket");
goto err;
}
//ntySetReUseAddr(sockfd);
addr.sin_port = htons(port+index);
if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
perror("connect");
goto err;
}
//ntySetNonblock(sockfd);
ntySetReUseAddr(sockfd);
//sprintf(buffer, "Hello Server: client --> %d\n", connections);
//send(sockfd, buffer, strlen(buffer), 0);
ev.data.fd = sockfd;
ev.events = EPOLLIN;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
connections ++;
}
//connections ++;
if (connections % 1000 == 999) {
struct timeval tv_cur;
memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
gettimeofday(&tv_begin, NULL);
int time_used = TIME_SUB_MS(tv_begin, tv_cur);
printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);
int nfds = epoll_wait(epoll_fd, events, connections, 100);
for (i = 0;i < nfds;i ++) {
int clientfd = events[i].data.fd;
if (events[i].events & EPOLLOUT) {
sprintf(buffer, "data from %d\n", clientfd);
send(sockfd, buffer, strlen(buffer), 0);
} else if (events[i].events & EPOLLIN) {
char rBuffer[MAX_BUFFER] = {0};
ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
if (length > 0) {
printf(" RecvBuffer:%s\n", rBuffer);
if (!strcmp(rBuffer, "quit")) {
isContinue = 0;
}
} else if (length == 0) {
printf(" Disconnect clientfd:%d\n", clientfd);
connections --;
close(clientfd);
} else {
if (errno == EINTR) continue;
printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
close(clientfd);
}
} else {
printf(" clientfd:%d, errno:%d\n", clientfd, errno);
close(clientfd);
}
}
}
usleep(100);
}
return 0;
err:
printf("error : %s\n", strerror(errno));
return 0;
}
需要的環境,1 臺 8G8 核的 ubuntu(配置服務器),3 臺 4G4 核的 ubuntu(配置客戶端,每個客戶端負責的連接數儘量平均,也就是 33.33w)。如果電腦資源足夠,性能夠好,建議把配置做大些,問題會少不少。
解決連接上限問題
先用一臺客戶端嘗試向服務器發起大量連接,發現報錯。
再看看服務器的情況,同樣是報錯。
我們用 ulimit -a 查看服務器情況發現是一個進程最多能打開 1024 個文件,也就是建立 1024 個文件描述符。於是用 ulimit -n 解決,客戶端和服務器都需要解決這些問題。
解決五元不組問題
服務器每建立一個連接,都會在內核中建立一個 tcp 控制塊,簡稱 tcb。tcb 包含了以下元素:源 ip,目的 ip,源 port,目的 port,協議類型(tcp,udp 等)。
通常協議類型和目的 ip(服務器 ip)是固定下來的。百萬併發確保每個連接都對應剩下 3 個元素的不同的排列組合,若不能保證,產生的問題叫 “五元不組”。
源 port 一般不被客戶端指定,一臺機子可用的 port 大概是(65535-1024)個,1024 是因爲部分非用戶主動空控制的程序也在使用端口,在高併發下情況下 1024-65535 可能會無法用盡。
因此解決 “五元不組” 通常由以下幾種解法:
-
源 ip 多樣化:多準備幾個不同 ip 的客戶端。
-
目的 port 多樣化:讓服務器多開幾個 listen port 並且負載均衡。
-
源 port 多樣化,修改 / etc/sysctl.conf 文件,明確指定客戶端在本地的端口的可用範圍。
本文中兩種方法都用,基於解法 1 配置了 3 臺不同的 ubuntu,每臺上都有 1 個客戶端。基於解法 2 服務器開了 20 個監聽端口,並且客戶端循環地進行 connect。
基於解法 3,則需要修改客戶端所在的 ubuntu 的配置文件。
如圖,在相關配置文件的末尾添加一條語句:
解決內存泄漏的問題
服務器每建立一個 clientfd,都需要相應的數據結構儲存 clinetfd。普通的數組往往容易發生越棧從而發生段錯誤,不瞭解的人往往會以爲這是因爲代碼的邏輯錯誤。我們在服務器中建立了百萬長度的數組進行存儲,不夠優雅但是有效。
解決效率受限的問題
在併發連接的時候,經常連着連着就發現,每一千個連接的耗時突然暴增,這是受到了系統級別的文件描述符數量的限制。也需要調整。往客戶端的配置文件裏添加這句話。
該命令與 ulimit -n 的差別在於作用的級別不同,前者作用於系統層面,需要 root 權限,後者作用於當前用戶的會話級別。
解決進程制終止問題
進程被強制終止可能發生在客戶端,也有可能發生在服務器。
有些機子可能會發生服務器進程 CPU 和內存消耗過大,被強制 kill 掉的情況。這是由兩種解法:
-
把配置做高,內存更大。
-
修改內核 tcp 協議棧內存、發送緩衝區、接收緩衝區的大小。
CPU 和內核消耗的監控命令:htop
在 conf 文件中可以繼續添加以下語句,分別用於修改內核 tcp 協議棧內存、單個連接發送緩衝區、單個連接接收緩衝區的最小值、默認值、最大值,可以逐步逐步地進行測試什麼數據合適。
注意,也有可能是客戶端配置不夠高,發生了崩潰,導致服務器大量斷連從而也崩潰。也有可能是因爲進行被強制終止,內存被強制回收導致的。重點是用 htop 觀察誰的內存爆了。
成功完成百萬併發測試
原文地址:https://blog.csdn.net/weixin_73240939/article/details/134917698?spm=1001.2014.3001.5501
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/WZW2qQP7bMEhaqA19kNgRQ