用反應器模式和 epoll 構建百萬併發服務器並測試

此處的百萬併發指的是可以建立至少 100w 個客戶端連接,不考慮業務處理。

反應器模式下的 epoll 相比起普通的 epoll 不同在於:普通的 epoll 在獲取到就緒狀態的 event 結構體之後,先判斷是什麼類型的 fd,再進行操作。而 reactor 先判斷是什麼類型的事件,再進行操作。本文從頭用 reactor 設計模式來構建一個 epoll 服務器,這個過程中每次發生 IO 事件都要注意維護用戶空間的數據結構和內核的 epoll 實例,下面是構建百萬併發服務器的詳細地步驟(暫不包含併發量測試):

導入相關的包並設置宏


聲明事件處理函數


構建存儲數據結構


設計一個結構體,用於存儲 IO 相關的信息,並建立全局數組。後續需要不斷地維護該數組,一旦發生了 IO 事件就需要進行更新。結構體的內容包括:1、文件描述符。2、接收到的數據和數據長度。3、要發送的數據和數據長度。4、用於處理接收數據的回調函數,請求連接和連接成功後接收到數據,要調用不同的處理函數。5、用於處理發送數據的回調函數。

編寫 epoll 實例的維護函數


包括修改節點和增加節點的步驟

實現 accept 事件的回調函數


某個文件描述符接收到數據之後,相應地要維護全局數組,以及 epoll 實例。全局數組將相應的節點的 fd 屬性進行修改,並初始化數據區爲 0,同時還要選擇處理接收數據的回調函數。

實現 recv 事件的回調函數


除了相應的數據區,還要注意修改 epoll 紅黑樹裏相應節點的狀態。

實現 send 事件的回調函數


較爲簡單,全局數組不需要維護,但是內核的 epoll 實例需要維護。

編寫初始化服務器的函數


相關視頻推薦


6 種 epoll 的設計,讓你吊打面試官

手把手帶你實現 epoll 組件,爲 tcp 併發的實現 epoll

準備 4 臺虛擬機,我們一起來實現服務器百萬級併發

建立紅黑樹實例並建立服務器套接字


一共要建立 20 個服務器套接字。同樣的,每成功建立一次,都要維護全局數組和 epoll 實例。注意套接字描述符的接收數據回調函數選擇是 accept_cb;

檢測就緒狀態的 fd 並處理


根據檢測到的事件進行處理,而不是根據文件描述符是服務器套接字還是通信套接字進行處理。值得注意是處理 EPOLLIN 事件中,雖然代碼上調用的都是 recv_callback 函數,但實際上如果 i 是服務器套接字,那麼調用的依然是 accpet_callback。這是因爲 union 特性,union 的屬性如果都是同一類型,那麼進行賦值之後,無論調用哪一個屬性都是都可以調用同一個值。

百萬併發


  1. 整個過程涉及到一些時間處理的細節,是爲了計算服務器處理併發的效率。

  2. 全局數組設置很大,這是爲了處理百萬併發,不是很科學但是能用。

  3. 百萬併發處理一方面體現在三方面,全局數組的大小,epoll 模型,服務器套接字的數量。

測試


測試用客戶端代碼


#include <stdio.h>
#include <string.h>
#include <stdlib.h>
 
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/time.h>
#include <unistd.h>
 
#define MAX_BUFFER              128
#define MAX_EPOLLSIZE   (384*1024)
#define MAX_PORT                20
 
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
 
int isContinue = 0;
 
static int ntySetNonblock(int fd) {
        int flags;
 
        flags = fcntl(fd, F_GETFL, 0);
        if (flags < 0) return flags;
        flags |= O_NONBLOCK;
        if (fcntl(fd, F_SETFL, flags) < 0) return -1;
        return 0;
}
 
static int ntySetReUseAddr(int fd) {
        int reuse = 1;
        return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}
 
 
 
int main(int argc, char **argv) {
        if (argc <= 2) {
                printf("Usage: %s ip port\n", argv[0]);
                exit(0);
        }
 
        const char *ip = argv[1];
        int port = atoi(argv[2]);
        int connections = 0;
        char buffer[128] = {0};
        int i = 0, index = 0;
 
        struct epoll_event events[MAX_EPOLLSIZE];
 
        int epoll_fd = epoll_create(MAX_EPOLLSIZE);
 
        strcpy(buffer, " Data From MulClient\n");
 
        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(struct sockaddr_in));
 
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = inet_addr(ip);
 
        struct timeval tv_begin;
        gettimeofday(&tv_begin, NULL);
 
        while (1) {
                if (++index >= MAX_PORT) index = 0;
 
                struct epoll_event ev;
                int sockfd = 0;
 
                if (!isContinue) {
                        sockfd = socket(AF_INET, SOCK_STREAM, 0);
                        if (sockfd == -1) {
                                perror("socket");
                                goto err;
                        }
 
                        //ntySetReUseAddr(sockfd);
                        addr.sin_port = htons(port+index);
 
                        if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
                                perror("connect");
                                goto err;
                        }
                        //ntySetNonblock(sockfd);
                        ntySetReUseAddr(sockfd);
 
                        //sprintf(buffer, "Hello Server: client --> %d\n", connections);
                        //send(sockfd, buffer, strlen(buffer), 0);
 
                        ev.data.fd = sockfd;
                        ev.events = EPOLLIN;
                        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
 
                        connections ++;
                }
                //connections ++;
                if (connections % 1000 == 999) {
                        struct timeval tv_cur;
                        memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
 
                        gettimeofday(&tv_begin, NULL);
 
                        int time_used = TIME_SUB_MS(tv_begin, tv_cur);
                        printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);
 
                        int nfds = epoll_wait(epoll_fd, events, connections, 100);
                        for (i = 0;i < nfds;i ++) {
                                int clientfd = events[i].data.fd;
 
                                if (events[i].events & EPOLLOUT) {
                                        sprintf(buffer, "data from %d\n", clientfd);
                                        send(sockfd, buffer, strlen(buffer), 0);
                                } else if (events[i].events & EPOLLIN) {
                                        char rBuffer[MAX_BUFFER] = {0};
                                        ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
                                        if (length > 0) {
                                                printf(" RecvBuffer:%s\n", rBuffer);
 
                                                if (!strcmp(rBuffer, "quit")) {
                                                        isContinue = 0;
                                                }
 
                                        } else if (length == 0) {
                                                printf(" Disconnect clientfd:%d\n", clientfd);
                                                connections --;
                                                close(clientfd);
                                        } else {
                                                if (errno == EINTR) continue;
 
                                                printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
                                                close(clientfd);
                                        }
                                } else {
                                        printf(" clientfd:%d, errno:%d\n", clientfd, errno);
                                        close(clientfd);
                                }
                        }
                }
 
                usleep(100);
        }
 
        return 0;
 
err:
        printf("error : %s\n", strerror(errno));
        return 0;
 
}

需要的環境,1 臺 8G8 核的 ubuntu(配置服務器),3 臺 4G4 核的 ubuntu(配置客戶端,每個客戶端負責的連接數儘量平均,也就是 33.33w)。如果電腦資源足夠,性能夠好,建議把配置做大些,問題會少不少。

解決連接上限問題


先用一臺客戶端嘗試向服務器發起大量連接,發現報錯。

再看看服務器的情況,同樣是報錯。

我們用 ulimit -a 查看服務器情況發現是一個進程最多能打開 1024 個文件,也就是建立 1024 個文件描述符。於是用 ulimit -n 解決,客戶端和服務器都需要解決這些問題。

解決五元不組問題


服務器每建立一個連接,都會在內核中建立一個 tcp 控制塊,簡稱 tcb。tcb 包含了以下元素:源 ip,目的 ip,源 port,目的 port,協議類型(tcp,udp 等)。

通常協議類型和目的 ip(服務器 ip)是固定下來的。百萬併發確保每個連接都對應剩下 3 個元素的不同的排列組合,若不能保證,產生的問題叫 “五元不組”。

源 port 一般不被客戶端指定,一臺機子可用的 port 大概是(65535-1024)個,1024 是因爲部分非用戶主動空控制的程序也在使用端口,在高併發下情況下 1024-65535 可能會無法用盡。

因此解決 “五元不組” 通常由以下幾種解法:

  1. 源 ip 多樣化:多準備幾個不同 ip 的客戶端。

  2. 目的 port 多樣化:讓服務器多開幾個 listen port 並且負載均衡。

  3. 源 port 多樣化,修改 / etc/sysctl.conf 文件,明確指定客戶端在本地的端口的可用範圍。

本文中兩種方法都用,基於解法 1 配置了 3 臺不同的 ubuntu,每臺上都有 1 個客戶端。基於解法 2 服務器開了 20 個監聽端口,並且客戶端循環地進行 connect。

基於解法 3,則需要修改客戶端所在的 ubuntu 的配置文件。

如圖,在相關配置文件的末尾添加一條語句:

解決內存泄漏的問題


服務器每建立一個 clientfd,都需要相應的數據結構儲存 clinetfd。普通的數組往往容易發生越棧從而發生段錯誤,不瞭解的人往往會以爲這是因爲代碼的邏輯錯誤。我們在服務器中建立了百萬長度的數組進行存儲,不夠優雅但是有效。

解決效率受限的問題


在併發連接的時候,經常連着連着就發現,每一千個連接的耗時突然暴增,這是受到了系統級別的文件描述符數量的限制。也需要調整。往客戶端的配置文件裏添加這句話。

該命令與 ulimit -n 的差別在於作用的級別不同,前者作用於系統層面,需要 root 權限,後者作用於當前用戶的會話級別。

解決進程制終止問題


進程被強制終止可能發生在客戶端,也有可能發生在服務器。

有些機子可能會發生服務器進程 CPU 和內存消耗過大,被強制 kill 掉的情況。這是由兩種解法:

  1. 把配置做高,內存更大。

  2. 修改內核 tcp 協議棧內存、發送緩衝區、接收緩衝區的大小。

CPU 和內核消耗的監控命令:htop

在 conf 文件中可以繼續添加以下語句,分別用於修改內核 tcp 協議棧內存、單個連接發送緩衝區、單個連接接收緩衝區的最小值、默認值、最大值,可以逐步逐步地進行測試什麼數據合適。

注意,也有可能是客戶端配置不夠高,發生了崩潰,導致服務器大量斷連從而也崩潰。也有可能是因爲進行被強制終止,內存被強制回收導致的。重點是用 htop 觀察誰的內存爆了。

成功完成百萬併發測試


原文地址:https://blog.csdn.net/weixin_73240939/article/details/134917698?spm=1001.2014.3001.5501

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/WZW2qQP7bMEhaqA19kNgRQ