epoll 使用詳解(精髓)

epoll 和 select


相比於 select,epoll 最大的好處在於它不會隨着監聽 fd 數目的增長而降低效率。因爲在內核中的 select 實現中,它是採用輪詢來處理的,輪詢的 fd 數目越多,自然耗時越多。

並且,在 linux/posix_types.h 頭文件有這樣的聲明:

#define __FD_SETSIZE    1024

表示 select 最多同時監聽 1024 個 fd,當然,可以通過修改頭文件再重編譯內核來擴大這個數目,但這似乎並不治本。

一、IO 多路複用的 select


IO 多路複用相對於阻塞式和非阻塞式的好處就是它可以監聽多個 socket ,並且不會消耗過多資源。當用戶進程調用 select 時,它會監聽其中所有 socket 直到有一個或多個 socket 數據已經準備好,否則就一直處於阻塞狀態。select 的缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,select() 所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量的 tcp 鏈接處於非常活躍狀態,但調用 select() 會對所有的 socket 進行一次線性掃描,所以這也浪費了一定的開銷。不過它的好處還有就是它的跨平臺特性。

二、 epoll


epoll 的 ET 是必須對非阻塞的 socket 才能工作,LT 對於阻塞的 socket 也可以

所有 I/O 多路複用操作都是同步的,涵蓋 select/poll。

阻塞 / 非阻塞是相對於同步 I/O 來說的,與異步 I/O 無關。

select/poll/epoll 本身是同步的,可以阻塞也可以不阻塞。

(阻塞和非阻塞 與同步不同步不同;阻塞與否 是自身,異步與否是與外部協作的關係)

skater:

無論是阻塞 I/O、非阻塞 I/O,還是基於非阻塞 I/O 的多路複用都是同步調用。因爲它們在 read 調用時,內核將數據從內核空間拷貝到應用程序空間(epoll 應該是從 mmap),過程都是需要等待的,也就是說這個過程是同步的,如果內核實現的拷貝效率不高,read 調用就會在這個同步過程中等待比較長的時間。

epoll事件:
   EPOLLIN : 表示對應的文件描述符可以讀(包括對端SOCKET正常關閉);
   EPOLLOUT: 表示對應的文件描述符可以寫;
   EPOLLPRI: 表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);
   EPOLLERR: 表示對應的文件描述符發生錯誤;
   EPOLLHUP: 表示對應的文件描述符被掛斷;

epoll 高效的核心是:1、用戶態和內核太共享內存 mmap。2、數據到來採用事件通知機制(而不需要輪詢)。

epoll 的接口


epoll 的接口非常簡單,一共就三個函數:

(1)epoll_create 系統調用

epoll_create 在 C 庫中的原型如下。

int epoll_create(int size);

epoll_create 返回一個句柄,之後 epoll 的使用都將依靠這個句柄來標識。參數 size 是告訴 epoll 所要處理的大致事件數目。不再使用 epoll 時,必須調用 close 關閉這個句柄。

注意:size 參數只是告訴內核這個 epoll 對象會處理的事件大致數目,而不是能夠處理的事件的最大個數。在 Linux 最新的一些內核版本的實現中,這個 size 參數沒有任何意義。

(2)epoll_ctl 系統調用

epoll_ctl 在 C 庫中的原型如下。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);

epoll_ctl 向 epoll 對象中添加、修改或者刪除感興趣的事件,返回 0 表示成功,否則返回–1,此時需要根據 errno 錯誤碼判斷錯誤類型。epoll_wait 方法返回的事件必然是通過 epoll_ctl 添加到 epoll 中的。

參數:

epfd: epoll_create 返回的句柄,

op:的意義見下表:

       EPOLL_CTL_ADD:註冊新的 fd 到 epfd 中;

       EPOLL_CTL_MOD:修改已經註冊的 fd 的監聽事件;

       EPOLL_CTL_DEL:從 epfd 中刪除一個 fd;

fd:需要監聽的 socket 句柄 fd,

event:告訴內核需要監聽什麼事的結構體,struct epoll_event 結構如下:

epoll_data_t;
 
struct epoll_event {
    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
};

__uint32_t events 就要監聽的事件(感興趣的事件):

EPOLLIN :表示對應的文件描述符可以讀(包括對端 SOCKET 正常關閉);

EPOLLOUT:表示對應的文件描述符可以寫;

EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);

EPOLLERR:表示對應的文件描述符發生錯誤;

EPOLLHUP:表示對應的文件描述符被掛斷;

EPOLLET: 將 EPOLL 設爲邊緣觸發 (Edge Triggered) 模式,這是相對於水平觸發 (Level Triggered) 來說的。

EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個 socket 的話,需要再次把這個 socket 加入到 EPOLL 隊列裏

data成員是一個epoll_data聯合,其定義如下:
 
typedef union epoll_data {
 
void *ptr;
 
int fd;
 
uint32_t u32;
 
uint64_t u64;
 
} epoll_data_t;
 
可見,這個 data成員還與具體的使用方式相關。例如,ngx_epoll_module模塊只使用了聯合中的 ptr成員,
作爲指向 ngx_connection_t連接的指針。我們在項目中一般使用的也是 ptr成員,因爲它可以指向任意的結構
體地址。
  1. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epoll_wait 在 C 庫中的原型如下:

int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);

收集在 epoll 監控的事件中已經發生的事件,如果 epoll 中沒有任何一個事件發生,則最多等待 timeout 毫秒後返回。epoll_wait 的返回值表示當前發生的事件個數,如果返回 0,則表示本次調用中沒有事件發生,如果返回–1,則表示出現錯誤,需要檢查 errno 錯誤碼判斷錯誤類型。

epfd:epoll 的描述符。

events:分配好的 epoll_event 結構體數組,epoll 將會把發生的事件複製到 events 數組中(events 不可以是空指針,內核只負責把數據複製到這個 events 數組中,不會去幫助我們在用戶態中分配內存。內核這種做法效率很高)。

maxevents:表示本次可以返回的最大事件數目,通常 maxevents 參數與預分配的 events 數組的大小是相等的。

timeout:表示在沒有檢測到事件發生時最多等待的時間(單位爲毫秒),如果 timeout 爲 0,則表示 epoll_wait 在 rdllist 鏈表中爲空,立刻返回,不會等待。

epoll 有兩種工作模式:LT(水平觸發)模式和 ET(邊緣觸發)模式。

默認情況下,epoll 採用 LT 模式工作,這時可以處理阻塞和非阻塞套接字,而上表中的 EPOLLET 表示可以將一個事件改爲 ET 模式。ET 模式的效率要比 LT 模式高,它只支持非阻塞套接字。

(水平觸發 LT:當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。如果這次沒有把數據一次性全部讀寫完 (如讀寫緩衝區太小),那麼下次調用 epoll_wait() 時,它還會通知你在上次沒讀寫完的文件描述符上繼續讀寫

邊緣觸發 ET:當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。如果這次沒有把數據全部讀寫完 (如讀寫緩衝區太小),那麼下次調用 epoll_wait() 時,它不會通知你,也就是它只會通知你一次,直到該文件描述符上出現第二次可讀寫事件纔會通知你

此可見,水平觸發時如果系統中有大量你不需要讀寫的就緒文件描述符,而它們每次都會返回,這樣會大大降低處理程序檢索自己關心的就緒文件描述符的效率,而邊緣觸發,則不會充斥大量你不關心的就緒文件描述符,從而性能差異,高下立見。)

如何來使用 epoll


1、包含一個頭文件 #include <sys/epoll.h>

2、create_epoll(int maxfds) 來創建一個 epoll 的句柄,其中 maxfds 爲你 epoll 所支持的最大句柄數。這個函數會返回一個新的 epoll 句柄,之後的所有操作將通過這個句柄來進行操作。在用完之後,記得用 close() 來關閉這個創建出來的 epoll 句柄。

3、之後在你的網絡主循環裏面,每一幀的調用 epoll_wait(int epfd, epoll_event events, int max events, int timeout) 來查詢所有的網絡接口,看哪一個可以讀,哪一個可以寫了。基本的語法爲:

nfds = epoll_wait(kdpfd, events, maxevents, -1);

其中 kdpfd 爲用 epoll_create 創建之後的句柄,events 是一個 epoll_event * 的指針,當 epoll_wait 這個函數操作成功之後,epoll_events 裏面將儲存所有的讀寫事件。max_events 是當前需要監聽的所有 socket 句柄數。

最後一個 timeout:是 epoll_wait 的超時,

爲 0 的時候表示馬上返回,

爲 - 1 的時候表示一直等下去,直到有事件返回,

爲任意正整數的時候表示等這麼長的時間,如果一直沒有事件,則返回。

一般如果網絡主循環是單獨的線程的話,可以用 - 1 來等,這樣可以保證一些效率,如果是和主邏輯在同一個線程的話,則可以用 0 來保證主循環的效率。

epoll_wait 範圍之後應該是一個循環,遍利所有的事件。

epoll 通過在 Linux 內核中申請一個簡易的文件系統 (文件系統一般用什麼數據結構實現?B + 樹)。把原先的 select/poll 調用分成了 3 個部分:

1)調用 epoll_create() 建立一個 epoll 對象 (在 epoll 文件系統中爲這個句柄對象分配資源)

2)調用 epoll_ctl 向 epoll 對象中添加這 100 萬個連接的套接字

3)調用 epoll_wait 收集發生的事件的連接

epoll 程序框架


幾乎所有的 epoll 程序都使用下面的框架:

僞代碼:

listenfd 爲全局變量,服務端監聽的套接字的 fd。

關於epoll_wait返回值的一個簡單測試
 
void test(int epollfd)
{
  struct epoll_event events[MAX_EVENT_NUMBER];
  int number;
 
  while (1)
  {
    number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
    printf("number : %2d\n\n", number);
    for (i = 0; i < number; i++)
    {
      sockfd = events[i].data.fd;
 
      if (sockfd == listenfd)
      {/*用戶上線*/
 
      }
      else if (events[i].events & EPOLLIN)
      {/*有數據可讀*/
 
      }
      else if (events[i].events & EPOLLOUT)
      {/*有數據可寫*/
 
      }
      else
      {/*出錯*/
 
      }
    }
  }
}
 
通過測試發現epoll_wait返回值number是不會大於MAX_EVENT_NUMBER的。
 
測試過程中,連接的客戶端數遠大於MAX_EVENT_NUMBER,由此可以推論:epoll_wait()每次返回的是活躍客戶端的個數,每次並將這些活躍的客戶端信息加入到events[MAX_EVENT_NUMBER]。
 
由此可見,活躍客戶端的個數相同的情況下,events[MAX_EVENT_NUMBER]越大,epoll_wait()函數執行次數越少,但是events[MAX_EVENT_NUMBER]越大越消耗存儲資源。
 
所以,MAX_EVENT_NUMBER的選擇應該在效率和資源間取一個平衡點。

示例代碼

for( ; ; )
    {
        nfds = epoll_wait(epfd,events,20,500);
        for(i=0;i<nfds;++i)
        {
            if(events[i].data.fd==listenfd) //服務端監聽的套接字listenfd上有新的連接
            {
                connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept這個連接
                ev.data.fd=connfd;          //把[這個新連接的fd]加入ev
                ev.events=EPOLLIN|EPOLLET;  //[關注該連接的EPOLLIN事件,EPOLLET邊緣觸發]加入ev
                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //ev添加到epoll的監聽隊列中
            }
            else if( events[i].events&EPOLLIN ) //接收到數據,讀socket
            {
                n = read(sockfd, line, MAXLINE)) < 0    //讀
                ev.data.ptr = md;     //md爲自定義類型,添加數據
                ev.events=EPOLLOUT|EPOLLET;
                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改標識符,等待下一個循環時發送數據,異步處理的精髓
                //epfd 就是epoll_create()返回的epfd
            }
            else if(events[i].events&EPOLLOUT) //有數據待發送,寫socket
            {
                struct myepoll_data* md = (myepoll_data*)events[i].data.ptr;    //取數據
                sockfd = md->fd;
                send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );        //發送數據
               
                ev.data.fd=sockfd;
                ev.events=EPOLLIN|EPOLLET;
                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改標識符,等待下一個循環時接收數據
            }
            else
            {
                //其他的處理
            }
        }
    }

大致流程

 struct epoll_event ev, event_list[EVENT_MAX_COUNT];//ev用於註冊事件,event_list用於回傳要處理的事件
 
 listenfd = socket(AF_INET, SOCK_STREAM, 0);
 if(0 != bind(listenfd, (struct sockaddr *)
 if(0 != listen(listenfd, LISTENQ)) //LISTENQ 定義了宏//#define LISTENQ   20  
 
 ev.data.fd = listenfd; //設置與要處理的事件相關的文件描述符
 ev.events = EPOLLIN | EPOLLET; //設置要處理的事件類型EPOLLIN :表示對應的文件描述符可以讀,EPOLLET狀態變化才通知
 
 
epfd = epoll_create(256);  //生成用於處理accept的epoll專用的文件描述符
//註冊epoll事件
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); //epfd epoll實例ID,EPOLL_CTL_ADD添加,listenfd:socket,ev事件(監聽listenfd)
 
 
nfds = epoll_wait(epfd, event_list, EVENT_MAX_COUNT, TIMEOUT_MS); //等待epoll事件的發生
  1. 首先熟悉下 epoll 的三個接口

int epoll_create(int size);

創建 epoll 相關數據結構,其最重要的是

  1. 紅黑樹, 用於存儲需要監控的文件句柄以及事件

  2. 就緒鏈表,用於存儲被觸發的文件句柄以及事件

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

用於設定,修改,或者刪除 監控的文件句柄以及事件

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

阻塞等待 timeout 時間,如果文件句柄上相關事件被觸發,則 epoll_wait 退出,並將觸發的事件 寫入出參 events 參數,觸發的事件個數作爲返回值返回

  1. 如何使用這三個接口寫一個 server
           typedef union epoll_data {
               void    *ptr;
               int      fd; //可以用fd, 也可以用ptr來保存事件對應的文件句柄
               uint32_t u32;
               uint64_t u64;
           } epoll_data_t;
 
           struct epoll_event {
               uint32_t     events;    /* Epoll events */
               epoll_data_t data;      /* User data variable */
           };

實例源碼


原文有相當多的如錯誤:

需要增加到頭文件和錯誤修改

//'/0'->'\0'
//bzero() 替換爲memset (注意二者參數不一樣,bzero將前n個字節設爲0,memset將前n 個字節的值設爲值 c)
//local_addr 由char* 改爲 string
#include <cstdlib>
#include <stdlib.h> //atoi
#include <string.h> //memset
#include <string>   //std:cout  等

修正後的源碼 C++ lnux:

#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
 
//'/0'->'\0'
//bzero() 替換爲memset (注意二者參數不一樣,bzero將前n個字節設爲0,memset將前n 個字節的值設爲值 c)
//local_addr 由char* 改爲 string
#include <cstdlib>
#include <stdlib.h> //atoi
#include <string.h> //memset
#include <string>   //std:cout  等
 
 
 
using namespace std;
 
#define MAXLINE   255              //讀寫緩衝
#define OPEN_MAX  100
#define LISTENQ   20             //listen的第二個參數  定義TCP鏈接未完成隊列的大小(linux >2.6 則表示accpet之前的隊列)
#define SERV_PORT 5000
#define INFTIM    1000
 
#define TIMEOUT_MS      500
#define EVENT_MAX_COUNT 20
 
void setnonblocking(int sock)
{
    int opts;
    opts = fcntl(sock, F_GETFL);
    if(opts < 0)
    {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }
    opts = opts | O_NONBLOCK;
    if(fcntl(sock, F_SETFL, opts) < 0)
    {
        perror("fcntl(sock,SETFL,opts)");
        exit(1);
    }
}
 
int main(int argc, char *argv[])
{
    int i, maxi, listenfd, connfd, sockfd, epfd, nfds, portnumber;
    ssize_t n;
    char line_buff[MAXLINE];
    
 
 
    if ( 2 == argc )
    {
        if( (portnumber = atoi(argv[1])) < 0 )
        {
            fprintf(stderr, "Usage:%s portnumber/r/n", argv[0]);
            //fprintf()函數根據指定的format(格式)(格式)發送信息(參數)到由stream()指定的文件
            //printf 將內容發送到Default的輸出設備,通常爲本機的顯示器,fprintf需要指定輸出設備,可以爲文件,設備。
            //stderr
            return 1;
        }
    }
    else
    {
        fprintf(stderr, "Usage:%s portnumber/r/n", argv[0]);
        return 1;
    }
 
    //聲明epoll_event結構體的變量,ev用於註冊事件,數組用於回傳要處理的事件
    struct epoll_event ev, event_list[EVENT_MAX_COUNT];
 
    //生成用於處理accept的epoll專用的文件描述符
    epfd = epoll_create(256); //生成epoll文件描述符,既在內核申請一空間,存放關注的socket fd上是否發生以及發生事件。size既epoll fd上能關注的最大socket fd數。隨你定好了。只要你有空間。
 
    struct sockaddr_in clientaddr;
    socklen_t clilenaddrLen;
    struct sockaddr_in serveraddr;
    
    listenfd = socket(AF_INET, SOCK_STREAM, 0);//Unix/Linux“一切皆文件”,創建(套接字)文件,id=listenfd
    if (listenfd < 0)
    {
       printf("socket error,errno %d:%s\r\n",errno,strerror(errno));
    }
    //把socket設置爲非阻塞方式
    //setnonblocking(listenfd);
 
    //設置與要處理的事件相關的文件描述符
    ev.data.fd = listenfd;
 
    //設置要處理的事件類型
    ev.events = EPOLLIN | EPOLLET; //EPOLLIN :表示對應的文件描述符可以讀,EPOLLET狀態變化才通知
    //ev.events=EPOLLIN;
 
    //註冊epoll事件
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); //epfd epoll實例ID,EPOLL_CTL_ADD添加,listenfd:socket,ev事件(監聽listenfd)
   
 
    memset(&serveraddr, 0, sizeof(serveraddr));
    serveraddr.sin_family     = AF_INET;
    serveraddr.sin_addr.s_addr=htonl(INADDR_ANY); /*IP,INADDR_ANY轉換過來就是0.0.0.0,泛指本機的意思,也就是表示本機的所有IP*/
    serveraddr.sin_port       = htons(portnumber);
    
    if(0 != bind(listenfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)))
     {
        printf("bind error,errno %d:%s\r\n",errno,strerror(errno));
     }
 
     if(0 != listen(listenfd, LISTENQ)) //LISTENQ 定義了宏
     {
         printf("listen error,errno %d:%s\r\n",errno,strerror(errno));
     }
 
    maxi = 0;
 
    for ( ; ; )
    {
 
        //等待epoll事件的發生
        nfds = epoll_wait(epfd, event_list, EVENT_MAX_COUNT, TIMEOUT_MS); //epoll_wait(int epfd, struct epoll_event * event_list, int maxevents, int timeout),返回需要處理的事件數目
 
        //處理所發生的所有事件
        for(i = 0; i < nfds; ++i)
        {
            if(event_list[i].data.fd == listenfd) //如果新監測到一個SOCKET用戶連接到了綁定的SOCKET端口,建立新的連接。
            {
                clilenaddrLen = sizeof(struct sockaddr_in);//在調用accept()前,要給addrLen賦值,這樣纔不會出錯,addrLen = sizeof(clientaddr);或addrLen = sizeof(struct sockaddr_in);
                connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clilenaddrLen);//(accpet詳解:https://blog.csdn.net/David_xtd/article/details/7087843)
                if(connfd < 0)
                {
                    //perror("connfd<0:connfd= %d",connfd);
                    printf("connfd<0,accept error,errno %d:%s\r\n",errno,strerror(errno));
                    exit(1);
                }
 
                //setnonblocking(connfd);
 
                char *str = inet_ntoa(clientaddr.sin_addr);//將一個32位網絡字節序的二進制IP地址轉換成相應的點分十進制的IP地址
 
                cout << "accapt a connection from " << str << endl;
 
                //設置用於讀操作的文件描述符
                ev.data.fd = connfd;
 
                //設置用於注測的讀操作事件
                ev.events = EPOLLIN | EPOLLET;
                //ev.events=EPOLLIN;
 
                //註冊ev
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev); //將accpet的句柄添加進入(增加監聽的對象)
            }
            else if(event_list[i].events & EPOLLIN) //如果是已經連接的用戶,並且收到數據,那麼進行讀入。
            {
                cout << "EPOLLIN" << endl;
                if ( (sockfd = event_list[i].data.fd) < 0)
                    continue;
 
                
                if ( (n = read(sockfd, line_buff, MAXLINE)) < 0)  //read時fd中的數據如果小於要讀取的數據,就會引起阻塞?
                {
               //當read()或者write()返回-1時,一般要判斷errno
                    if (errno == ECONNRESET)//與客戶端的Socket被客戶端強行被斷開,而服務器還企圖read
                    {
                        close(sockfd);
                        event_list[i].data.fd = -1;
                    }
                    else
                        std::cout << "readline error" << std::endl;
                }
                else if (n == 0) //返回的n爲0時,說明客戶端已經關閉 
                {
                    close(sockfd);
                    event_list[i].data.fd = -1;
                }
 
                line_buff[n] = '\0';
                cout << "read " << line_buff << endl;
 
                //設置用於寫操作的文件描述符
                ev.data.fd = sockfd;
 
                //設置用於注測的寫操作事件
                ev.events = EPOLLOUT | EPOLLET; //EPOLLOUT:表示對應的文件描述符可以寫;
 
                //修改sockfd上要處理的事件爲EPOLLOUT
                //epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
 
            }
            else if(event_list[i].events & EPOLLOUT) // 如果有數據發送
            {
                sockfd = event_list[i].data.fd;
                write(sockfd, line_buff, n);
               
                //設置用於讀操作的文件描述符
                ev.data.fd = sockfd;
                
                //設置用於注測的讀操作事件
                ev.events = EPOLLIN | EPOLLET;
                
                //修改sockfd上要處理的事件爲EPOLIN
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
            }
        }
    }
    return 0;
}

編譯命令

linux 下編譯:g++ epoll.cpp -o epoll

命令行簡單測試

curl  192.168.0.250:5000  -d   "phone=123456789&

相關知識


如何動態的改變 listen 監聽的個數呢?

如果指定值在源代碼中是一個常值,那麼增長其大小需要重新編譯服務器程序。那麼,我們可以爲它設定一個缺省值,不過允許通過命令行選項或者環境變量來覆寫該值。

void Listen(int fd, int backlog)
{
    char *ptr;
    
    if((ptr = getenv("LISTENQ")) != NULL)
        backlog = atoi(ptr);
 
    if(listen(fd, backlog) < 0)
        printf("listen error\n");
}

隊列已滿的情況,如何處理?

當一個客戶 SYN 到達時,若這個隊列是滿的,TCP 就忽略該分節,也就是不會發送 RST。

這麼做的原因在於,隊列已滿的情況是暫時的,客戶 TCP 如果沒收收到 RST,就會重發 SYN,在隊列有空閒的時候處理該請求。如果服務器 TCP 立即響應一個 RST,客戶的 connect 調用就會立即返回一個錯誤,強制應用進程處理這種情況,而不會再次重發 SYN。而且客戶端也不無區別該套接口的狀態,是 “隊列已滿” 還是“該端口沒有在監聽”。

SYN 氾濫攻擊

向某一目標服務器發送大量的 SYN,用以填滿一個或多個 TCP 端口的未完成隊列。每個 SYN 的源 IP 地址都置成隨機數(IP 欺騙),這樣防止攻擊服務器獲悉黑客的真實 IP 地址。通過僞造的 SYN 裝滿未完成連接隊列,使得合法的 SYN 不能排上隊,導致針對合法用戶的服務被拒絕。

防禦方法:

  1. 針對服務器主機的方法。增加連接緩衝隊列長度和縮短連接請求佔用緩衝隊列的超時時間。該方式最簡單,被很多操作系統採用,但防禦性能也最弱。

  2. 針對路由器過濾的方法。由於 DDoS 攻擊,包括 SYN-Flood,都使用地址僞裝技術,所以在路由器上使用規則過濾掉被認爲地址僞裝的包,會有效的遏制攻擊流量。

  3. 針對防火牆的方法。在 SYN 請求連接到真正的服務器之前,使用基於防火牆的網關來測試其合法性。它是一種被普遍採用的專門針對 SYN-Flood 攻擊的防禦機制。

SYN: 同步序列編號(Synchronize Sequence Numbers)

它們的含義是:
SYN表示建立連接,
FIN表示關閉連接,
ACK表示響應,
PSH表示有 
DATA數據傳輸,
RST表示連接重置。

SYN(synchronous 建立聯機)

ACK(acknowledgement 確認)

PSH(push 傳送)

FIN(finish 結束)

RST(reset 重置)

URG(urgent 緊急)

Sequence number(順序號碼)

Acknowledge number(確認號碼)

三次握手:

在TCP/IP協議中,TCP協議提供可靠的連接服務,採用三次握手建立一個連接。
 第一次握手:建立連接時,客戶端發送syn包(syn=j)到服務器,並進入SYN_SEND狀態,等待服務器確認;
第二次握手:服務器收到syn包,必須確認客戶的SYN(ack=j+1),同時自己也發送一個SYN包(syn=k),即SYN+ACK包,此時服務器進入SYN_RECV狀態;
 第三次握手:客戶端收到服務器的SYN+ACK包,向服務器發送確認包ACK(ack=k+1),此包發送完畢,客戶端和服務器進入ESTABLISHED狀態,完成三次握手。完成三次握手,客戶端與服務器開始傳送數據.

第一次握手:主機 A 發送位碼爲 syn=1,隨機產生 seq number=1234567 的數據包到服務器,主機 B 由 SYN=1 知道,A 要求建立聯機;

第二次握手:主機 B 收到請求後要確認聯機信息,向 A 發送 ack number=(主機 A 的 seq+1),syn=1,ack=1,隨機產生 seq=7654321 的包;

第三次握手:主機 A 收到後檢查 ack number 是否正確,即第一次發送的 seq number+1,以及位碼 ack 是否爲 1,若正確,主機 A 會再發送 ack number=(主機 B 的 seq+1),ack=1,主機 B 收到後確認 seq 值與 ack=1 則連接建立成功。

實例代碼二


多進程 Epoll:

#include <sys/types.h>  
#include <sys/socket.h>  
#include <sys/epoll.h>  
#include <netdb.h>  
#include <string.h>  
#include <stdio.h>  
#include <unistd.h>  
#include <fcntl.h>  
#include <stdlib.h>  
#include <errno.h>  
#include <sys/wait.h>  
#define PROCESS_NUM 10  
static int  
create_and_bind (char *port)  
{  
    int fd = socket(PF_INET, SOCK_STREAM, 0);  
    struct sockaddr_in serveraddr;  
    serveraddr.sin_family = AF_INET;  
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);  
    serveraddr.sin_port = htons(atoi(port));  
    bind(fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));  
    return fd;  
}  
    static int  
make_socket_non_blocking (int sfd)  
{  
    int flags, s;  
 
    flags = fcntl (sfd, F_GETFL, 0);  
    if (flags == -1)  
    {  
        perror ("fcntl");  
        return -1;  
    }  
 
    flags |= O_NONBLOCK;  
    s = fcntl (sfd, F_SETFL, flags);  
    if (s == -1)  
    {  
        perror ("fcntl");  
        return -1;  
    }  
 
    return 0;  
}  
  
#define MAXEVENTS 64  
 
int  
main (int argc, char *argv[])  
{  
    int sfd, s;  
    int efd;  
    struct epoll_event event;  
    struct epoll_event *events;  
 
    sfd = create_and_bind("1234");  
    if (sfd == -1)  
        abort ();  
 
    s = make_socket_non_blocking (sfd);  
    if (s == -1)  
        abort ();  
 
    s = listen(sfd, SOMAXCONN);  
    if (s == -1)  
    {  
        perror ("listen");  
        abort ();  
    }  
 
    efd = epoll_create(MAXEVENTS);  
    if (efd == -1)  
    {  
        perror("epoll_create");  
        abort();  
    }  
 
    event.data.fd = sfd;  
    //event.events = EPOLLIN | EPOLLET;  
    event.events = EPOLLIN;  
    s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);  
    if (s == -1)  
    {  
        perror("epoll_ctl");  
        abort();  
    }  
 
    /* Buffer where events are returned */  
    events = calloc(MAXEVENTS, sizeof event);  
            int k;  
    for(k = 0; k < PROCESS_NUM; k++)  
    {  
        int pid = fork();  
        if(pid == 0)  
        {  
 
            /* The event loop */  
            while (1)  
            {  
                int n, i;  
                n = epoll_wait(efd, events, MAXEVENTS, -1);  
                printf("process %d return from epoll_wait!\n", getpid());  
                                       /* sleep here is very important!*/  
                //sleep(2);  
                                       for (i = 0; i < n; i++)  
                {  
                    if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)
                                                    || (!(events[i].events & EPOLLIN)))  
                    {  
                        /* An error has occured on this fd, or the socket is not  
                        ready for reading (why were we notified then?) */  
                        fprintf (stderr, "epoll error\n");  
                        close (events[i].data.fd);  
                        continue;  
                    }  
                    else if (sfd == events[i].data.fd)  
                    {  
                        /* We have a notification on the listening socket, which  
                        means one or more incoming connections. */  
                        struct sockaddr in_addr;  
                        socklen_t in_len;  
                        int infd;  
                        char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];  
 
                        in_len = sizeof in_addr;  
                        infd = accept(sfd, &in_addr, &in_len);  
                        if (infd == -1)  
                        {  
                            printf("process %d accept failed!\n", getpid());  
                            break;  
                        }  
                        printf("process %d accept successed!\n", getpid());  
 
                        /* Make the incoming socket non-blocking and add it to the  
                        list of fds to monitor. */  
                        close(infd); 
                    }  
                }  
            }  
        }  
    }  
    int status;  
    wait(&status);  
    free (events);  
    close (sfd);  
    return EXIT_SUCCESS;  
}

建立 2000 + 個鏈接的測試代碼

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include<arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
 
const int MAXLINE = 5;
int count = 1;
 
static int make_socket_non_blocking(int fd)
{
  int flags, s;
 
  flags = fcntl (fd, F_GETFL, 0);
  if (flags == -1)
    {
      perror ("fcntl");
      return -1;
    }
 
  flags |= O_NONBLOCK;
  s = fcntl (fd, F_SETFL, flags);
  if (s == -1)
    {
      perror ("fcntl");
      return -1;
    }
 
  return 0;
}
 
void sockconn()
{
	int sockfd;
	struct sockaddr_in server_addr;
	struct hostent *host;
	char buf[100];
	unsigned int value = 1;
 
	host = gethostbyname("127.0.0.1");
	sockfd = socket(AF_INET, SOCK_STREAM, 0);
	if (sockfd == -1) {
		perror("socket error\r\n");
		return;
	}
	
	//setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
	
	//make_socket_non_blocking(sockfd);
 
	bzero(&server_addr, sizeof(server_addr));
 
	server_addr.sin_family = AF_INET;
	server_addr.sin_port = htons(8080);
	server_addr.sin_addr = *((struct in_addr*) host->h_addr);
 
	int cn = connect(sockfd, (struct sockaddr *) &server_addr,
			sizeof(server_addr));
	if (cn == -1) {
		printf("connect error errno=%d\r\n", errno);
		return;
 
	}
//	char *buf = "h";
	sprintf(buf, "%d", count);
	count++;
	write(sockfd, buf, strlen(buf));
	close(sockfd);
	
	printf("client send %s\r\n", buf);
	
	return;
}
 
int main(void) {
 
 int i;
 for (i = 0; i < 2000; i++)
 {
 	  sockconn();
 }
 
 return 0;
}

關於 ET、LT 兩種工作模式


水平觸發 LT:

其中 LT 就是與 select 和 poll 類似,當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。如果這次沒有把數據一次性全部讀寫完 (如讀寫緩衝區太小),那麼下次調用 epoll_wait() 時,它還會通知你在上次沒讀寫完的文件描述符上繼續讀寫

邊緣觸發 ET:

當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。如果這次沒有把數據全部讀寫完 (如讀寫緩衝區太小),那麼下次調用 epoll_wait() 時,它不會通知你,也就是它只會通知你一次,直到該文件描述符上出現第二次可讀寫事件纔會通知你

水平觸發:只要緩衝區有數據就會一直觸發

邊沿觸發:只有在緩衝區增加數據的那一刻纔會觸發

由此可見,水平觸發時如果系統中有大量你不需要讀寫的就緒文件描述符(有些 fd 有數據,但是你不處理那些 fd),而它們每次都會返回,這樣會大大降低處理程序檢索自己關心的就緒文件描述符的效率,而邊緣觸發,則不會充斥大量你不關心的就緒文件描述符,從而性能差異,高下立見。

4、關於 ET、LT 兩種工作模式

可以得出這樣的結論:

ET 模式僅當狀態發生變化的時候才獲得通知, 這裏所謂的狀態的變化並不包括緩衝區中還有未處理的數據, 也就是說, 如果要採用 ET 模式, 需要一直 read/write 直到出錯爲止, 很多人反映爲什麼採用 ET 模式只接收了一部分數據就再也得不到通知了, 大多因爲這樣; 而 LT 模式是隻要有數據沒有處理就會一直通知下去的.

epoll 中讀寫數據 的注意事項

在一個非阻塞的 socket 上調用 read/write 函數,返回 EAGAIN 或者 EWOULDBLOCK(注:EAGAIN 就是 EWOULDBLOCK)。

      從字面上看,意思是:

EAGAIN: 再試一次

EWOULDBLOCK:如果這是一個阻塞 socket, 操作將被 block

perror 輸出:Resource temporarily unavailable

總結:

這個錯誤表示資源暫時不夠,可能 read 時, 讀緩衝區沒有數據, 或者 write 時,寫緩衝區滿了。

遇到這種情況,如果是阻塞 socket、 read/write 就要阻塞掉。而如果是非阻塞 socket、 read/write 立即返回 - 1, 同 時 errno 設置爲 EAGAIN。

所以對於阻塞 socket、 read/write 返回 - 1 代表網絡出錯了。但對於非阻塞 socket、read/write 返回 - 1 不一定網絡真的出錯了。可能是 Resource temporarily unavailable。這時你應該再試,直到 Resource available。

本文主要講述 epoll 模型(不完全是針對 epoll)下讀寫數據接口使用的注意事項

1、read  write

函數原型如下:

#include <unistd.h>
ssize_t read(int filedes, void* buf, size_t nbytes)
ssize_t write(int filedes, const void* buf, size_t nbytes)

其中,read 返回實際讀取到的字節數。但實際讀取的字節很有可能少於指定要讀取的字節數 nbytes。因此會分爲:

①返回值大於 0。 讀取正常,返回實際讀取到的字節數

②返回值等於 0。 讀取異常,讀取到文件 filedes 結尾處了。這裏邏輯上要理解爲 read 已經讀取完數據

③返回值小於 0(-1)。 讀取出錯,在處理網絡請求時可能是網絡異常。着重注意當返回 - 1,此時 errno 的值 EAGAIN、EWOULLDBLOCK,表示內核對應的讀緩衝區爲空

而 write 返回的實際寫入字節數正常情況是與制定寫入的字節數 nbytes 相同的,不相等說明寫入異常了,着重注意,此時 errno 的值 EAGAIN、EWOULLDBLOCK,表示內核對應的寫緩衝區爲空。注,EAGAIN 等同於 EWOULLDBLOCK。

總之,這個錯誤表示資源暫時不夠,可能 read 時讀緩衝區沒有數據, 或者 write 時寫緩衝區滿了。遇到這種情況,如果是阻塞 socket、 read/write 就要阻塞掉。而如果是非阻塞 socket、 read/write 立即返回 - 1, 同時 errno 設置爲 EAGAIN。

所以對於阻塞 socket、 read/write 返回 - 1 代表網絡出錯了。但對於非阻塞 socket、read/write 返回 - 1 不一定網絡真的出錯了。可能支持緩衝區空或者滿,這時應該再試,直到 Resource available。

綜上,對於非阻塞的 socket,正確的讀寫操作爲:

LT 模式

讀: 忽略掉 errno = EAGAIN 的錯誤,下次繼續讀;

寫:忽略掉 errno = EAGAIN 的錯誤,下次繼續寫。

對於 select 和 epoll 的 LT 模式,這種讀寫方式是沒有問題的。但對於 epoll 的 ET 模式,這種方式還有漏洞。

下面來介紹下 epoll 事件的兩種模式 LT(水平觸發)和 ET(邊沿觸發),根據可以理解爲,文件描述符的讀寫狀態發生變化纔會觸發 epoll 事件,具體說來如下:二者的差異在於 level-trigger 模式下只要某個 socket 處於 readable/writable 狀態,無論什麼時候進行 epoll_wait 都會返回該 socket;而 edge-trigger 模式下只有某個 socket 從 unreadable 變爲 readable,或從 unwritable 變爲 writable 時,epoll_wait 纔會返回該 socket。如下兩個示意圖:

從 socket 讀數據:      

往 socket 寫數據:

所以在 epoll 的 ET 模式下,正確的讀寫方式爲:

讀: 只要可讀, 就一直讀,直到返回 0,或者 errno = EAGAIN 寫:只要可寫, 就一直寫,直到數據發送完,或者 errno = EAGAIN

這裏的意思是,對於 ET 模式,相當於我們要自己重寫 read 和 write,使其像” 原子操作 “一樣,保證一次 read 或 write 能夠完整的讀完緩衝區的數據或者寫完要寫入緩衝區的數據。因此,實現爲用 while 包住 read 和 write 即可。但是對於 select 或者 LT 模式,我們可以只使用一次 read 和 write,因爲在主程序中會一直 while,而事件再下一次 select 時還會被獲取到。但也可以實現爲用 while 包住 read 和 write。從邏輯上講,一次性把數據讀取完整可以保證數據的完整性。

下面來說明這種” 原子操作 “read 和 write

int n = 0;
while(1)
{
    nread = read(fd, buf + n, BUFSIZ - 1); //讀時,用戶進程指定的接收數據緩衝區大小固定,一般要比數據大
    if(nread < 0)
    {
        if(errno == EAGAIN || errno == EWOULDBLOCK)
        {
            continue;
        }
        else
        {
            break; //or return;
        }
    }
    else if(nread == 0)
    {
        break; //or return. because read the EOF
    }
    else
    {
        n += nread;
    }
}
int data_size = strlen(buf);
int n = 0;
while(1)
{
    nwrite = write(fd, buf + n, data_size);//寫時,數據大小一直在變化
    if(nwrite < data_size)
    {
        if(errno == EAGAIN || errno == EWOULDBLOCK)
        {
            continue;
        }
        else
        {
            break;//or return;        
        }
 
    }
    else
    {
        n += nwrite;
        data_size -= nwrite;
    }
}

正確的 accept,accept 要考慮 2 個問題:

(1) LT 模式下或 ET 模式下,阻塞的監聽 socket, accept 存在的問題

accept 每次都是從已經完成三次握手的 tcp 隊列中取出一個連接,考慮這種情況: TCP 連接被客戶端夭折,即在服務器調用 accept 之前,客戶端主動發送 RST 終止連接,導致剛剛建立的連接從就緒隊列中移出,如果套接口被設置成阻塞模式,服務器就會一直阻塞在 accept 調用上,直到其他某個客戶建立一個新的連接爲止。但是在此期間,服務器單純地阻塞在 accept 調用上,就緒隊列中的其他描述符都得不到處理。

解決辦法是:把監聽套接口設置爲非阻塞,當客戶在服務器調用 accept 之前中止某個連接時,accept 調用可以立即返回 -1, 這時源自 Berkeley 的實現會在內核中處理該事件,並不會將該事件通知給 epoll,而其他實現把 errno 設置爲 ECONNABORTED 或者 EPROTO 錯誤,我們應該忽略這兩個錯誤。

(2) ET 模式下 accept 存在的問題

考慮這種情況:多個連接同時到達,服務器的 TCP 就緒隊列瞬間積累多個就緒連接,由於是邊緣觸發模式,epoll 只會通知一次,accept 只處理一個連接,導致 TCP 就緒隊列中剩下的連接都得不到處理。

解決辦法是:將監聽套接字設置爲非阻塞模式,用 while 循環抱住 accept 調用,處理完 TCP 就緒隊列中的所有連接後再退出循環。如何知道是否處理完就緒隊列中的所有連接呢? accept  返回 -1 並且 errno 設置爲 EAGAIN 就表示所有連接都處理完。

綜合以上兩種情況,服務器應該使用非阻塞地 accept, accept 在 ET 模式下 的正確使用方式爲:

while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote,   
                (size_t *)&addrlen)) > 0) {  
    handle_client(conn_sock);  
}  
if (conn_sock == -1) {  
    if (errno != EAGAIN && errno != ECONNABORTED   
            && errno != EPROTO && errno != EINTR)   
        perror("accept");  
}

一道騰訊後臺開發的面試題:

使用 Linux epoll 模型,水平觸發模式;當 socket 可寫時,會不停的觸發 socket 可寫的事件,如何處理?

需要向 socket 寫數據的時候才把 socket 加入 epoll ,等待可寫事件。接受到可寫事件後,調用 write 或者 send 發送數據。當所有數據都寫完後,把 socket 移出 epoll。

這種方式的缺點是,即使發送很少的數據,也要把 socket 加入 epoll,寫完後在移出 epoll,有一定操作代價。

開始不把 socket 加入 epoll,需要向 socket 寫數據的時候,直接調用 write 或者 send 發送數據。如果返回 EAGAIN,把 socket 加入 epoll,在 epoll 的驅動下寫數據,全部數據發送完畢後,再移出 epoll。

這種方式的優點是:數據不多的時候可以避免 epoll 的事件處理,提高效率。

多路複用


如文初的說明表示,這三者都是 I/O 多路複用機制,且簡要介紹了多路複用的定義,那麼如何更加直觀地瞭解多路複用呢?這裏有張圖:

對於網頁服務器 Nginx 來說,會有很多連接進來, epoll 會把他們都監視起來,然後像撥開關一樣,誰有數據就撥向誰,然後調用相應的代碼處理。

一般來說以下場合需要使用 I/O 多路複用:

爲什麼 epoll 可以支持百萬級別的連接?


  1. 在 server 的處理過程中,大家可以看到其中重要的操作是,使用 epoll_ctl 修改 clientfd 在 epoll 中註冊的 epoll_event, 這個操作首先在紅黑樹中找到 fd 對應的 epoll_event, 然後進行修改,紅黑樹是典型的二叉平衡樹,其時間複雜度是 log2(n), 1 百萬的文件句柄,只需要 16 次左右的查找,速度是非常快的,支持百萬級別毫無壓力

  2. 另外,epoll 通過註冊 fd 上的回調函數,回調函數監控到有事件發生,則準備好相關的數據放到到就緒鏈表裏面去,這個動作非常快,成本也非常小

socket 讀寫返回值的處理


在調用 socket 讀寫函數 read(),write() 時,都會有返回值。如果沒有正確處理返回值,就可能引入一些問題

總結了以下幾點

1 當 read() 或者 write() 函數返回值大於 0 時,表示實際從緩衝區讀取或者寫入的字節數目

2 當 read() 函數返回值爲 0 時,表示對端已經關閉了 socket,這時候也要關閉這個 socket,否則會導致 socket 泄露。netstat 命令查看下,如果有 closewait 狀態的 socket, 就是 socket 泄露了

當 write() 函數返回 0 時,表示當前寫緩衝區已滿,是正常情況,下次再來寫就行了。

3 當 read() 或者 write() 返回 - 1 時,一般要判斷 errno

如果 errno == EINTR, 表示系統當前中斷了,直接忽略

如果 errno == EAGAIN 或者 EWOULDBLOCK,非阻塞 socket 直接忽略; 如果是阻塞的 socket, 一般是讀寫操作超時了,還未返回。這個超時是指 socket 的 SO_RCVTIMEO 與 SO_SNDTIMEO 兩個屬性。所以在使用阻塞 socket 時,不要將超時時間設置的過小。不然返回了 - 1,你也不知道是 socket 連接是真的斷開了,還是正常的網絡抖動。一般情況下,阻塞的 socket 返回了 - 1,都需要關閉重新連接。

  1. 另外,對於非阻塞的 connect, 可能返回 - 1. 這時需要判斷 errno,如果 errno == EINPROGRESS,表示正在處理中,否則表示連接出錯了,需要關閉重連。之後使用 select,檢測到該 socket 的可寫事件時,要判斷 getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &err, &errlen),看 socket 是否出錯了。如果 err 值爲 0, 則表示 connect 成功;否則也應該關閉重連

5 在使用 epoll 時,有 ET 與 LT 兩種模式。ET 模式下,socket 需要 read 或者 write 到返回 - 1 爲止。對於非阻塞的 socket 沒有問題,但是如果是阻塞的 socket,正如第三條中所說的,只有超時纔會返回。所以在 ET 模式下千萬不要使用阻塞的 socket。那麼 LT 模式爲什麼沒問題呢?一般情況下,使用 LT 模式,我們只要調用一次 read 或者 write 函數,如果沒有讀完或者沒有寫完,下次再來就是了。由於已經返回了可讀或者可寫事件,所以可以保證調用一次 read 或者 write 會正常返回。

nread 爲 - 1 且 errno==EAGAIN,說明數據已經讀完,設置 EPOLLOUT。

網絡狀態查詢命令


sar、iostat、lsof

問題記錄


客戶端


1、Cannot assign requested address

大致上是由於客戶端頻繁的連服務器,由於每次連接都在很短的時間內結束,導致很多的 TIME_WAIT,以至於用光了可用的端 口號,所以新的連接沒辦法綁定端口,即 “Cannot assign requested address”。是客戶端的問題不是服務器端的問題。通過 netstat,的確看到很多 TIME_WAIT 狀態的連接。

client 端頻繁建立連接,而端口釋放較慢,導致建立新連接時無可用端口。

netstat -a|grep TIME_WAIT
tcp        0      0 e100069210180.zmf:49477     e100069202104.zmf.tbs:websm TIME_WAIT   
tcp        0      0 e100069210180.zmf:49481     e100069202104.zmf.tbs:websm TIME_WAIT   
tcp        0      0 e100069210180.zmf:49469     e100069202104.zmf.tbs:websm TIME_WAIT   
……

解決辦法

執行命令修改如下內核參數 (需要 root 權限)

sysctl -w net.ipv4.tcp_fin_timeout=30

sysctl -w net.ipv4.tcp_timestamps=1

sysctl -w net.ipv4.tcp_tw_recycle=1

sysctl -w net.ipv4.tcp_tw_reuse = 1

2、2.8 萬左右的鏈接,報錯誤(可能端口用盡)

如果沒有 TIME_WAIT 狀態的連接,那有可能端口用盡,特別是長連接的時候,查看開放的端口範圍:

[root@VM_0_8_centos usr]# sysctl -a |grep port_range
net.ipv4.ip_local_port_range = 32768    60999
sysctl: reading key "net.ipv6.conf.all.stable_secret"
sysctl: reading key "net.ipv6.conf.default.stable_secret"
sysctl: reading key "net.ipv6.conf.eth0.stable_secret"
sysctl: reading key "net.ipv6.conf.lo.stable_secret"
[root@VM_0_8_centos usr]#

60999 - 32768 = 28,231, 剛好 2.8 萬,長連接把端口用光了。修改端口開放範圍:

vi /etc/sysctl.conf net.ipv4.ip_local_port_range = 10000     65535

執行 sysctl -p 使得生效

服務端


影響鏈接不往上走的原因:

1、端口用完了 (客戶端,

查看端口範圍 sysctl -a |grep port_range,返回:net.ipv4.ip_local_port_range = 32768    60999,所以可用端口是 60999-32768 =2.8w )

2、文件 fd 用完了

3、內存用完了

4、網絡

5、配置:fsfile-max = 1048576 #文件 fd 的最大值,

fd=open(),fd 從 3 開始,0:stdin 標準輸入 1:stdout 標準輸出 2:stderr 錯誤輸出

Epoll 難以解決的問題


int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);   還是要從這個 api 說起,這個 api 可以監聽很多個 fd,但是 timeout 只有一個。 有這一個場景: 你想關注 fd == 10 的這個描述符,你希望該描述符有數據到來的時候通知你,並且,如果一直沒有數據來,那麼希望 20s 之後能通知你。 你改怎麼做?

進一步假設,你希望關注 100 個 fd, 並希望這個一百個 fd,從他們加入監聽隊列的時候開始計算,20s 後通知你超時

如果使用這個 epoll_wait 的話,是不是要自己記住所有 fd 的剩餘超時時間呢?

libevent 就解決了這個困擾。

配置和調試


net.ipv4.tcp_syncookies = 1 表示開啓 SYN Cookies。當出現 SYN 等待隊列溢出時,啓用 cookies 來處理,可防範少量 SYN 攻擊,默認爲 0,表示關閉;

net.ipv4.tcp_tw_reuse = 1 表示開啓重用。允許將 TIME-WAIT sockets 重新用於新的 TCP 連接,默認爲 0,表示關閉;

net.ipv4.tcp_tw_recycle = 1 表示開啓 TCP 連接中 TIME-WAIT sockets 的快速回收,默認爲 0,表示關閉。

net.ipv4.tcp_fin_timeout 修改系默認的 TIMEOUT 時間

優化 time_wait 爲啥要開 syncookie 呢? syncookie 可以繞過 seq queue 的限制,跟優化 time_wait 沒有關係

另外得打開 timestamps,才能讓 reuse、recycle 生效。

修改 fin_timeout,如何調?調大調小呢?

// 測試數據

單線程,峯值處理鏈接,貌似目前測試條件可以測試到 2.9K

CONNECT: 2.9K/s

QPS : 27.6 萬

5 萬的鏈接。

QPS : 1.9 萬  

客戶端、服務端支持多少連接


客戶端

現在我們終於可以得出更爲正確的結論了,對於有 1 個 Ip 的客戶端來說,受限於 ip_local_port_range 參數,也受限於 65535。但單 Linux 可以配置多個 ip,有幾個 ip,最大理論值就翻幾倍

多張網卡不是必須的。即使只有一張網卡,也可以配置多 ip。k8s 就是這麼幹的,在 k8s 裏,一臺物理機上可以部署多個 pod。但每一個 pod 都會被分配一個獨立的 ip,所以完全不用擔心物理機上部署了過多的 pod 而影響你用的 pod 裏的 TCP 連接數量。在 ip 給你的那一刻,你的 pod 就和其它應用隔離開了。

服務器端

一條 TCP 連接如果不發送數據的話,消耗內存是 3.3K 左右。如果有數據發送,需要爲每條 TCP 分配發送緩存區,大小受你的參數 net.ipv4.tcp_wmem 配置影響,默認情況下最小是 4K。如果發送結束,緩存區消耗的內存會被回收。

假設你只保持連接不發送數據,那麼你服務器可以建立的連接最大數量 = 你的內存 / 3.3K。 假如是 4GB 的內存,那麼大約可接受的 TCP 連接數量是 100 萬左右。

這個例子裏,我們考慮的前提是在一個進程下 hold 所有的服務器端連接。而在實際中的項目裏,爲了收發數據方便,很多網絡 IO 模型還會爲 TCP 連接再創建一個線程或協程。拿最輕量的 golang 來說,一個協程棧也需要 2KB 的內存開銷。

結論

select、poll、epoll 之間的區別 (搜狗面試)


(1)select==> 時間複雜度 O(n)

它僅僅知道了,有 I/O 事件發生了,卻並不知道是哪那幾個流(可能有一個,多個,甚至全部),我們只能無差別輪詢所有流,找出能讀出數據,或者寫入數據的流,對他們進行操作。所以 select 具有 O(n) 的無差別輪詢複雜度,同時處理的流越多,無差別輪詢時間就越長。

(2)poll==> 時間複雜度 O(n)

poll 本質上和 select 沒有區別,它將用戶傳入的數組拷貝到內核空間,然後查詢每個 fd 對應的設備狀態, 但是它沒有最大連接數的限制,原因是它是基於鏈表來存儲的.

(3)epoll==> 時間複雜度 O(1)

epoll 可以理解爲 event poll,不同於忙輪詢和無差別輪詢,epoll 會把哪個流發生了怎樣的 I/O 事件通知我們。所以我們說 epoll 實際上是事件驅動(每個事件關聯上 fd)的,此時我們對這些流的操作都是有意義的。(複雜度降低到了 O(1))

select,poll,epoll 都是 IO 多路複用的機制。I/O 多路複用就通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作。但 select,poll,epoll 本質上都是同步 I/O,因爲他們都需要在讀寫事件就緒後自己負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步 I/O 則無需自己負責進行讀寫,異步 I/O 的實現會負責把數據從內核拷貝到用戶空間。  

epoll 跟 select 都能提供多路 I/O 複用的解決方案。在現在的 Linux 內核裏有都能夠支持,其中 epoll 是 Linux 所特有,而 select 則應該是 POSIX 所規定,一般操作系統均有實現

select:

select 本質上是通過設置或者檢查存放 fd 標誌位的數據結構來進行下一步處理。這樣所帶來的缺點是:

1、 單個進程可監視的 fd 數量被限制,即能監聽端口的大小有限。

一般來說這個數目和系統內存關係很大,具體數目可以 cat /proc/sys/fs/file-max 察看。32 位機默認是 1024 個。64 位機默認是 2048.

2、 對 socket 進行掃描時是線性掃描,即採用輪詢的方法,效率較低:

當套接字比較多的時候,每次 select() 都要通過遍歷 FD_SETSIZE 個 Socket 來完成調度, 不管哪個 Socket 是活躍的, 都遍歷一遍。這會浪費很多 CPU 時間。如果能給套接字註冊某個回調函數,當他們活躍時,自動完成相關操作,那就避免了輪詢,這正是 epoll 與 kqueue 做的。

3、需要維護一個用來存放大量 fd 的數據結構,這樣會使得用戶空間和內核空間在傳遞該結構時複製開銷大

poll:

poll 本質上和 select 沒有區別,它將用戶傳入的數組拷貝到內核空間,然後查詢每個 fd 對應的設備狀態,如果設備就緒則在設備等待隊列中加入一項並繼續遍歷,如果遍歷完所有 fd 後沒有發現就緒設備,則掛起當前進程,直到設備就緒或者主動超時,被喚醒後它又要再次遍歷 fd。這個過程經歷了多次無謂的遍歷。

它沒有最大連接數的限制,原因是它是基於鏈表來存儲的,但是同樣有一個缺點:

1、大量的 fd 的數組被整體複製於用戶態和內核地址空間之間,而不管這樣的複製是不是有意義。                  

2、poll 還有一個特點是 “水平觸發”,如果報告了 fd 後,沒有被處理,那麼下次 poll 時會再次報告該 fd。

epoll:

epoll 有 EPOLLLT 和 EPOLLET 兩種觸發模式,LT 是默認的模式,ET 是 “高速” 模式。LT 模式下,只要這個 fd 還有數據可讀,每次 epoll_wait 都會返回它的事件,提醒用戶程序去操作,而在 ET(邊緣觸發)模式中,它只會提示一次,直到下次再有數據流入之前都不會再提示了,無 論 fd 中是否還有數據可讀。所以在 ET 模式下,read 一個 fd 的時候一定要把它的 buffer 讀光,也就是說一直讀到 read 的返回值小於請求值,或者 遇到 EAGAIN 錯誤。還有一個特點是,epoll 使用 “事件” 的就緒通知方式,通過 epoll_ctl 註冊 fd,一旦該 fd 就緒,內核就會採用類似 callback 的回調機制來激活該 fd,epoll_wait 便可以收到通知。

epoll 爲什麼要有 EPOLLET 觸發模式?

如果採用 EPOLLLT 模式的話,系統中一旦有大量你不需要讀寫的就緒文件描述符,它們每次調用 epoll_wait 都會返回,這樣會大大降低處理程序檢索自己關心的就緒文件描述符的效率.。而採用 EPOLLET 這種邊沿觸發模式的話,當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。如果這次沒有把數據全部讀寫完 (如讀寫緩衝區太小),那麼下次調用 epoll_wait() 時,它不會通知你,也就是它只會通知你一次,直到該文件描述符上出現第二次可讀寫事件纔會通知你!!!這種模式比水平觸發效率高,系統不會充斥大量你不關心的就緒文件描述符

epoll 的優點:

1、沒有最大併發連接的限制,能打開的 FD 的上限遠大於 1024(1G 的內存上能監聽約 10 萬個端口);

2、效率提升,不是輪詢的方式,不會隨着 FD 數目的增加效率下降。只有活躍可用的 FD 纔會調用 callback 函數;

即 Epoll 最大的優點就在於它只管你 “活躍” 的連接,而跟連接總數無關,因此在實際的網絡環境中,Epoll 的效率就會遠遠高於 select 和 poll。

3、 內存拷貝,利用 mmap() 文件映射內存加速與內核空間的消息傳遞;即 epoll 使用 mmap 減少複製開銷。

select、poll、epoll 區別總結:

1、支持一個進程所能打開的最大連接數

select

單個進程所能打開的最大連接數有 FD_SETSIZE 宏定義,其大小是 32 個整數的大小(在 32 位的機器上,大小就是 3232,同理 64 位機器上 FD_SETSIZE 爲 3264),當然我們可以對進行修改,然後重新編譯內核,但是性能可能會受到影響,這需要進一步的測試。

poll

poll 本質上和 select 沒有區別,但是它沒有最大連接數的限制,原因是它是基於鏈表來存儲的

epoll

雖然連接數有上限,但是很大,1G 內存的機器上可以打開 10 萬左右的連接,2G 內存的機器可以打開 20 萬左右的連接

2、FD 劇增後帶來的 IO 效率問題

select

因爲每次調用時都會對連接進行線性遍歷,所以隨着 FD 的增加會造成遍歷速度慢的 “線性下降性能問題”。

poll

同上

epoll

因爲 epoll 內核中實現是根據每個 fd 上的 callback 函數來實現的,只有活躍的 socket 纔會主動調用 callback,所以在活躍 socket 較少的情況下,使用 epoll 沒有前面兩者的線性下降的性能問題,但是所有 socket 都很活躍的情況下,可能會有性能問題。

3、 消息傳遞方式

select

內核需要將消息傳遞到用戶空間,都需要內核拷貝動作

poll

同上

epoll

epoll 通過內核和用戶空間共享一塊內存來實現的。

總結:

綜上,在選擇 select,poll,epoll 時要根據具體的使用場合以及這三種方式的自身特點。

1、表面上看 epoll 的性能最好,但是在連接數少並且連接都十分活躍的情況下,select 和 poll 的性能可能比 epoll 好,畢竟 epoll 的通知機制需要很多函數回調。

2、select 低效是因爲每次它都需要輪詢。但低效也是相對的,視情況而定,也可通過良好的設計改善

今天對這三種 IO 多路複用進行對比,參考網上和書上面的資料,整理如下:

1、select 實現

select 的調用過程如下所示:

(1)使用 copy_from_user 從用戶空間拷貝 fd_set 到內核空間

(2)註冊回調函數__pollwait

(3)遍歷所有 fd,調用其對應的 poll 方法(對於 socket,這個 poll 方法是 sock_poll,sock_poll 根據情況會調用到 tcp_poll,udp_poll 或者 datagram_poll)

(4)以 tcp_poll 爲例,其核心實現就是__pollwait,也就是上面註冊的回調函數。

(5)__pollwait 的主要工作就是把 current(當前進程)掛到設備的等待隊列中,不同的設備有不同的等待隊列,對於 tcp_poll 來說,其等待隊列是 sk->sk_sleep(注意把進程掛到等待隊列中並不代表進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數據(磁盤設備)後,會喚醒設備等待隊列上睡眠的進程,這時 current 便被喚醒了。

(6)poll 方法返回時會返回一個描述讀寫操作是否就緒的 mask 掩碼,根據這個 mask 掩碼給 fd_set 賦值。

(7)如果遍歷完所有的 fd,還沒有返回一個可讀寫的 mask 掩碼,則會調用 schedule_timeout 是調用 select 的進程(也就是 current)進入睡眠。當設備驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的進程。如果超過一定的超時時間(schedule_timeout 指定),還是沒人喚醒,則調用 select 的進程會重新被喚醒獲得 CPU,進而重新遍歷 fd,判斷有沒有就緒的 fd。

(8)把 fd_set 從內核空間拷貝到用戶空間。

總結:

select 的幾大缺點:

(1)每次調用 select,都需要把 fd 集合從用戶態拷貝到內核態,這個開銷在 fd 很多時會很大

(2)同時每次調用 select 都需要在內核遍歷傳遞進來的所有 fd,這個開銷在 fd 很多時也很大

(3)select 支持的文件描述符數量太小了,默認是 1024

2 poll 實現

poll 的實現和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 結構而不是 select 的 fd_set 結構,其他的都差不多, 管理多個描述符也是進行輪詢,根據描述符的狀態進行處理,但是 poll 沒有最大文件描述符數量的限制。poll 和 select 同樣存在一個缺點就是,包含大量文件描述符的數組被整體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增加而線性增大。

3、epoll

epoll 既然是對 select 和 poll 的改進,就應該能避免上述的三個缺點。那 epoll 都是怎麼解決的呢?在此之前,我們先看一下 epoll 和 select 和 poll 的調用接口上的不同,select 和 poll 都只提供了一個函數——select 或者 poll 函數。而 epoll 提供了三個函數,epoll_create,epoll_ctl 和 epoll_wait,epoll_create 是創建一個 epoll 句柄;epoll_ctl 是註冊要監聽的事件類型;epoll_wait 則是等待事件的產生。

對於第一個缺點,epoll 的解決方案在 epoll_ctl 函數中。每次註冊新的事件到 epoll 句柄中時(在 epoll_ctl 中指定 EPOLL_CTL_ADD),會把所有的 fd 拷貝進內核,而不是在 epoll_wait 的時候重複拷貝。epoll 保證了每個 fd 在整個過程中只會拷貝一次。

對於第二個缺點,epoll 的解決方案不像 select 或 poll 一樣每次都把 current 輪流加入 fd 對應的設備等待隊列中,而只在 epoll_ctl 時把 current 掛一遍(這一遍必不可少)併爲每個 fd 指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調函數,而這個回調函數會把就緒的 fd 加入一個就緒鏈表)。epoll_wait 的工作實際上就是在這個就緒鏈表中查看有沒有就緒的 fd(利用 schedule_timeout() 實現睡一會,判斷一會的效果,和 select 實現中的第 7 步是類似的)。

對於第三個缺點,epoll 沒有這個限制,它所支持的 FD 上限是最大可以打開文件的數目,這個數字一般遠大於 2048, 舉個例子, 在 1GB 內存的機器上大約是 10 萬左右,具體數目可以 cat /proc/sys/fs/file-max 察看, 一般來說這個數目和系統內存關係很大。

總結:

(1)select,poll 實現需要自己不斷輪詢所有 fd 集合,直到設備就緒,期間可能要睡眠和喚醒多次交替。而 epoll 其實也需要調用 epoll_wait 不斷輪詢就緒鏈表,期間也可能多次睡眠和喚醒交替,但是它是設備就緒時,調用回調函數,把就緒 fd 放入就緒鏈表中,並喚醒在 epoll_wait 中進入睡眠的進程。雖然都要睡眠和交替,但是 select 和 poll 在 “醒着” 的時候要遍歷整個 fd 集合,而 epoll 在 “醒着” 的時候只要判斷一下就緒鏈表是否爲空就行了,這節省了大量的 CPU 時間。這就是回調機制帶來的性能提升。

(2)select,poll 每次調用都要把 fd 集合從用戶態往內核態拷貝一次,並且要把 current 往設備等待隊列中掛一次,而 epoll 只要一次拷貝,而且把 current 往等待隊列上掛也只掛一次(在 epoll_wait 的開始,注意這裏的等待隊列並不是設備等待隊列,只是一個 epoll 內部定義的等待隊列)。這也能節省不少的開銷。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_tyXtHTHZPzTlwt9z6JKCA