nginx 基於 epoll 模型事件驅動流程詳解

epoll 是一種基於事件驅動的模型,其是 nginx 能夠高效處理客戶端請求的重要原因之一。從流程上來講,epoll 模型的使用主要分爲三步:epoll 句柄的創建,監聽文件描述符的添加和等待事件的觸發,本文將介紹 nginx 是如何基於這三個步驟實現客戶端請求的高效處理的。

1. epoll 模型介紹

        在介紹 nginx 的實現原理之前,我們首先需要介紹一下 epoll 模型的基本使用方式。epoll 在使用的時候主要有三個方法:

// 等待需要監聽的文件描述符上對應的事件的發生
int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);

        首先,我們會調用epoll_create()方法創建一個 epoll 實例的句柄,可以將這裏的句柄理解爲一個 eventpoll 結構體實例,而這個結構體中有一個紅黑樹和一個隊列,紅黑樹中主要存儲需要監聽的文件描述符,而隊列則是在所監聽的文件描述符中有指定的事件發生時就會將這些事件添加到隊列中,如下圖所示爲 eventpoll 的示意圖:

        一般來說,這個 epoll 句柄在程序的整個運行週期中只會有一個,比如 nginx 每個 worker 進程就都只維護了一個 epoll 句柄。在創建完句柄之後,對於我們的程序監聽的每一個端口,其實本質上也都是一個文件描述符,這個文件描述符上是可以發生 Accept 事件,也即接收到客戶端請求的。因而,初始時,我們會將需要監聽的端口對應的文件描述符通過epoll_ctl()方法添加到 epoll 句柄中。添加成功之後,這每一個監聽的文件描述符就對應了 eventpoll 的紅黑樹中的一個節點。另外,在調用epoll_ctl()方法添加了文件描述符之後,會將其與相應的設備(網卡)進行關聯,當設備驅動發生某個事件時,就會回調當前文件描述符的回調方法ep_poll_callback(),從而生成一個事件,並且將該事件添加到 eventpoll 的事件隊列中。最後,當我們調用epoll_wait()方法時,就會從 epoll 句柄中獲取對應的事件,本質上就是檢查 eventpoll 的事件隊列是否爲空,如果有事件則將其返回,否則就會等待事件的發生。另外,對於 epoll 的使用,這裏獲取的事件一般都是 Accept 事件,而在處理這個事件的時候,會獲取客戶端的連接的句柄,這個句柄本質上也是一個文件描述符,此時我們則會將其繼續通過epoll_ctl()方法添加到當前的 epoll 句柄中,以繼續通過epoll_wait()方法等待其數據的讀取和寫入事件。

        通過這裏我們可以看出,在 epoll 使用的過程中,會有兩類文件描述符,一類是我們所監聽的端口所對應的文件描述符,這類描述符我們一般監聽其 Accept 事件,以等待客戶端連接,另一類則是每個客戶端連接所對應的一個文件描述符,而這裏描述符我們一般監聽其讀寫事件以接收和發送數據給客戶端。

2. nginx 中 epoll 實現方式

        在前面的文章中,我們講解了 nginx 是如何初始化事件驅動框架的,其中講到事件框架的一個核心模塊的定義如下:

ngx_module_t ngx_event_core_module = {
    NGX_MODULE_V1,
    &ngx_event_core_module_ctx,            /* module context */
    ngx_event_core_commands,               /* module directives */
    NGX_EVENT_MODULE,                      /* module type */
    NULL,                                  /* init master */
    // 該方法主要是在master進程啓動的過程中調用的,用於初始化時間模塊
    ngx_event_module_init,                 /* init module */
    // 該方法是在各個worker進程啓動之後調用的
    ngx_event_process_init,                /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

        這裏我們需要特別注意一下ngx_event_process_init()方法,我們講到,這個方法是在每個 worker 創建的時候進行初始化調用的,這裏面就涉及到兩個非常重要的調用:a. 進行對應的事件模型的初始化;b. 監聽配置文件中指定的各個端口。如下是這兩個步驟的主要代碼:

static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) {
  // 省略部分代碼....
  
  // 在nginx.conf配置文件的events{}配置塊中需要使用use指令指定當前使用的事件模型,
  // 此時就會將所使用的事件模型的索引號存儲在ecf->use中,下面的代碼就是通過這種方式獲取當前
  // 所指定的事件模型所對應的模塊的,然後調用該模塊的actions.init()方法初始化該事件模型
  for (m = 0; cycle->modules[m]; m++) {
    if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
      continue;
    }

    // ecf->use存儲了所選用的事件模型的模塊序號,這裏是找到該模塊
    if (cycle->modules[m]->ctx_index != ecf->use) {
      continue;
    }

    // module即爲所選用的事件模型對應的模塊
    module = cycle->modules[m]->ctx;

    // 調用指定事件模型的初始化方法
    if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
      exit(2);
    }

    break;
  }

  // 省略部分代碼...
  
  ls = cycle->listening.elts;
  for (i = 0; i < cycle->listening.nelts; i++) {

#if (NGX_HAVE_REUSEPORT)
    if (ls[i].reuseport && ls[i].worker != ngx_worker) {
      continue;
    }
#endif

    // 這裏是爲當前所監聽的每一個端口都綁定一個ngx_connection_t結構體
    c = ngx_get_connection(ls[i].fd, cycle->log);

    if (c == NULL) {
      return NGX_ERROR;
    }

    rev = c->read;

    // SOCK_STREAM表示TCP,一般都是TCP,也就是說在接收到客戶端的accept事件之後,
    // 就會調用ngx_event_accept()方法處理該事件
    rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept : ngx_event_recvmsg;

    if ((ngx_event_flags & NGX_USE_EPOLL_EVENT) && ccf->worker_processes > 1) {
        if (ngx_add_event(rev, NGX_READ_EVENT, NGX_EXCLUSIVE_EVENT) == NGX_ERROR) {
            return NGX_ERROR;
        }

        continue;
    }
  }

  return NGX_OK;
}

        對這裏的代碼主要完成了如下幾部分的工作:

        這裏我們首先看一下上面第一點中介紹的module->actions.init(cycle, ngx_timer_resolution)方法調用時是如何初始化 epoll 模塊的。由於是 epoll 模塊,這裏的init()方法指向的就是ngx_epoll_init()方法,如下是該方法的源碼:

static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) {
  ngx_epoll_conf_t *epcf;

  // 獲取解析得到的ngx_epoll_conf_t結構體
  epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

  if (ep == -1) {
    // 創建eventpoll結構體,將創建得到的文件描述符返回
    ep = epoll_create(cycle->connection_n / 2);

    // ep==-1表示創建失敗
    if (ep == -1) {
      ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                    "epoll_create() failed");
      return NGX_ERROR;
    }
  }

  // 如果nevents小於epcf->events,說明event_list數組的長度不夠,因而需要重新申請內存空間
  if (nevents < epcf->events) {
    if (event_list) {
      ngx_free(event_list);
    }

    // 爲event_list重新申請內存空間
    event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events, cycle->log);
    if (event_list == NULL) {
      return NGX_ERROR;
    }
  }

  // 將nevents更新爲配置文件中指定的大小
  nevents = epcf->events;

  ngx_io = ngx_os_io;

  // 這裏將epoll相關的事件操作方法賦值給ngx_event_actions,也就是說後續有相關的事件發生則
  // 都會使用epoll相關的方法
  ngx_event_actions = ngx_epoll_module_ctx.actions;

  // 這裏NGX_USE_CLEAR_EVENT指的是使用ET模式來使用epoll,默認使用ET模式,
  // 而NGX_USE_LEVEL_EVENT表示使用LE模式來使用epoll
#if (NGX_HAVE_CLEAR_EVENT)
  ngx_event_flags = NGX_USE_CLEAR_EVENT
                    #else
                    ngx_event_flags = NGX_USE_LEVEL_EVENT
                    #endif
                        // NGX_USE_GREEDY_EVENT表示每次拉取事件是都嘗試拉取最多的事件
                    | NGX_USE_GREEDY_EVENT
                    | NGX_USE_EPOLL_EVENT;

  return NGX_OK;
}

        可以看到,這裏的ngx_epoll_init()方法主要的作用有兩個:a. 通過epoll_create()方法創建一個 epoll 句柄;b. 設置ngx_event_actions屬性所指向的方法的實現,從而確定ngx_add_event()等宏的實現方法。下面我們來看一下ngx_add_event()是如何將需要監聽的文件描述符添加到 epoll 句柄中的:

static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) {
  int op;
  uint32_t events, prev;
  ngx_event_t *e;
  ngx_connection_t *c;
  struct epoll_event ee;

  // ev->data在使用的過程中存儲的是當前對應的ngx_connection_t,如果是free_connection,
  // 則存儲的是下一個節點的指針
  c = ev->data;

  // 事件類型
  events = (uint32_t) event;

  // 如果是讀事件
  if (event == NGX_READ_EVENT) {
    e = c->write;
    prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN | EPOLLRDHUP)
    events = EPOLLIN | EPOLLRDHUP;  // 設置讀事件類型
#endif

  } else {
    e = c->read;
    prev = EPOLLIN | EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
    events = EPOLLOUT;  // 設置寫事件類型
#endif
  }

  // 根據active標誌位確定是否爲活躍事件,以決定到底是修改還是添加事件
  if (e->active) {
    op = EPOLL_CTL_MOD; // 類型爲修改事件
    events |= prev;

  } else {
    op = EPOLL_CTL_ADD; // 類型爲添加事件
  }

#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
  if (flags & NGX_EXCLUSIVE_EVENT) {
      events &= ~EPOLLRDHUP;
  }
#endif

  // 將flags參數指定的事件添加到監聽列表中
  ee.events = events | (uint32_t) flags;
  // 這裏是將connection指針的最後一位賦值爲ev->instance,然後將其賦值給事件的ptr屬性,通過這種方式檢測事件是否過期
  ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

  ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                 "epoll add event: fd:%d op:%d ev:%08XD",
                 c->fd, op, ee.events);

  // 將事件添加到epoll句柄中
  if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                  "epoll_ctl(%d, %d) failed", op, c->fd);
    return NGX_ERROR;
  }

  // 將事件標記爲活躍狀態
  ev->active = 1;
#if 0
  ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

  return NGX_OK;
}

        這裏的ngx_add_event()方法本質上是比較簡單的,就是將當前的ngx_event_t轉換爲一個epoll_event結構體,並且會設置該結構體中需要監聽的事件類型,然後通過epoll_ctl()方法將當前epoll_event添加到 epoll 句柄中。

        在前面的ngx_event_process_init()方法中,nginx 通過ngx_add_event()方法將各個監聽的端口的描述符添加到 epoll 句柄中之後,就會開始監聽這些描述符上的 accept 連接事件,如果有客戶端連接請求,此時就會回調ngx_event_accept()方法處理該請求,我們來看一下該方法是如何處理客戶端建立連接的請求的:

/**
 * 當客戶端有accept事件到達時,將調用此方法處理該事件
 */
void ngx_event_accept(ngx_event_t *ev) {
  socklen_t socklen;
  ngx_err_t err;
  ngx_log_t *log;
  ngx_uint_t level;
  ngx_socket_t s;
  ngx_event_t *rev, *wev;
  ngx_sockaddr_t sa;
  ngx_listening_t *ls;
  ngx_connection_t *c, *lc;
  ngx_event_conf_t *ecf;
#if (NGX_HAVE_ACCEPT4)
  static ngx_uint_t  use_accept4 = 1;
#endif

  if (ev->timedout) {
    // 如果當前事件超時了,則繼續將其添加到epoll句柄中以監聽accept事件
    if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
      return;
    }

    ev->timedout = 0;
  }

  // 獲取解析event核心配置結構體
  ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);

  if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
    ev->available = ecf->multi_accept;
  }

  lc = ev->data;
  ls = lc->listening;
  ev->ready = 0;

  do {
    socklen = sizeof(ngx_sockaddr_t);

#if (NGX_HAVE_ACCEPT4)
    if (use_accept4) {
        s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);
    } else {
        s = accept(lc->fd, &sa.sockaddr, &socklen);
    }
#else
    // 這裏lc->fd指向的是監聽的文件句柄,調用accept()獲取客戶端的連接,並且將其存儲到sa.sockaddr中
    s = accept(lc->fd, &sa.sockaddr, &socklen);
#endif

    // 檢查當前進程獲取的連接個數是否超過了最大可用連接數的7/8,是則不再繼續接收連接
    ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

    // 獲取新的連接
    c = ngx_get_connection(s, ev->log);

    // 獲取連接失敗則直接返回
    if (c == NULL) {
      if (ngx_close_socket(s) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                      ngx_close_socket_n
                          " failed");
      }

      return;
    }

    // 標記當前爲TCP連接
    c->type = SOCK_STREAM;

    // 爲當前連接創建連接池
    c->pool = ngx_create_pool(ls->pool_size, ev->log);
    if (c->pool == NULL) {
      ngx_close_accepted_connection(c);
      return;
    }

    // 更新socklen的長度
    if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
      socklen = sizeof(ngx_sockaddr_t);
    }

    // 爲sockaddr申請內存空間,並且將客戶端連接地址複製到c->sockaddr中
    c->sockaddr = ngx_palloc(c->pool, socklen);
    if (c->sockaddr == NULL) {
      ngx_close_accepted_connection(c);
      return;
    }

    ngx_memcpy(c->sockaddr, &sa, socklen);

    // 申請ngx_log_t結構體的內存空間
    log = ngx_palloc(c->pool, sizeof(ngx_log_t));
    if (log == NULL) {
      ngx_close_accepted_connection(c);
      return;
    }

    /* set a blocking mode for iocp and non-blocking mode for others */

    if (ngx_inherited_nonblocking) {
      if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
        // 將連接設置爲阻塞模式
        if (ngx_blocking(s) == -1) {
          ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                        ngx_blocking_n
                            " failed");
          ngx_close_accepted_connection(c);
          return;
        }
      }

    } else {
      if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
        // 將連接設置爲非阻塞模式
        if (ngx_nonblocking(s) == -1) {
          ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                        ngx_nonblocking_n
                            " failed");
          ngx_close_accepted_connection(c);
          return;
        }
      }
    }

    *log = ls->log;

    // 設置連接的基本屬性
    c->recv = ngx_recv;
    c->send = ngx_send;
    c->recv_chain = ngx_recv_chain;
    c->send_chain = ngx_send_chain;

    c->log = log;
    c->pool->log = log;

    c->socklen = socklen;
    c->listening = ls;
    c->local_sockaddr = ls->sockaddr;
    c->local_socklen = ls->socklen;

#if (NGX_HAVE_UNIX_DOMAIN)
    if (c->sockaddr->sa_family == AF_UNIX) {
      c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
      c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
      /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
      c->sendfile = 0;
#endif
    }
#endif

    rev = c->read;
    wev = c->write;

    wev->ready = 1;

    if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
      rev->ready = 1;
    }

    if (ev->deferred_accept) {
      rev->ready = 1;
#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP)
      rev->available = 1;
#endif
    }

    rev->log = log;
    wev->log = log;

    // 更新連接使用次數
    c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

    // 將網絡地址更新爲字符串形式的地址
    if (ls->addr_ntop) {
      c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
      if (c->addr_text.data == NULL) {
        ngx_close_accepted_connection(c);
        return;
      }

      c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
                                       c->addr_text.data,
                                       ls->addr_text_max_len, 0);
      if (c->addr_text.len == 0) {
        ngx_close_accepted_connection(c);
        return;
      }
    }

#if (NGX_DEBUG)
    {
    ngx_str_t  addr;
    u_char     text[NGX_SOCKADDR_STRLEN];

    ngx_debug_accepted_connection(ecf, c);

    if (log->log_level & NGX_LOG_DEBUG_EVENT) {
        addr.data = text;
        addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
                                 NGX_SOCKADDR_STRLEN, 1);

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0,
                       "*%uA accept: %V fd:%d", c->number, &addr, s);
    }

    }
#endif

    // 將當前連接添加到epoll句柄中進行監控
    if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
      if (ngx_add_conn(c) == NGX_ERROR) {
        ngx_close_accepted_connection(c);
        return;
      }
    }

    log->data = NULL;
    log->handler = NULL;

    // 建立新連接之後的回調方法
    ls->handler(c);

    if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
      ev->available--;
    }

  } while (ev->available);
}

        這裏客戶端連接的建立過程主要可以分爲如下幾個步驟:

        如此我們就講解了從 epoll 句柄的創建,到指定的端口的監聽,接着處理客戶端連接,並且將客戶端連接對應的文件描述符繼續添加到 epoll 句柄中以監聽讀寫事件的流程。下面我們繼續來看一下 nginx 是如何等待所監聽的這些句柄上的事件的發生的,也即整個事件框架的驅動程序。worker 進程對於事件的處理,主要在ngx_process_events_and_timers()方法中,如下是該方法的源碼:

void ngx_process_events_and_timers(ngx_cycle_t *cycle) {
	// 嘗試獲取共享鎖
  if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
    return;
  }

  // 這裏開始處理事件,對於kqueue模型,其指向的是ngx_kqueue_process_events()方法,
  // 而對於epoll模型,其指向的是ngx_epoll_process_events()方法
  // 這個方法的主要作用是,在對應的事件模型中獲取事件列表,然後將事件添加到ngx_posted_accept_events
  // 隊列或者ngx_posted_events隊列中
  (void) ngx_process_events(cycle, timer, flags);

  // 這裏開始處理accept事件,將其交由ngx_event_accept.c的ngx_event_accept()方法處理;
  ngx_event_process_posted(cycle, &ngx_posted_accept_events);

  // 開始釋放鎖
  if (ngx_accept_mutex_held) {
    ngx_shmtx_unlock(&ngx_accept_mutex);
  }

  // 如果不需要在事件隊列中進行處理,則直接處理該事件
  // 對於事件的處理,如果是accept事件,則將其交由ngx_event_accept.c的ngx_event_accept()方法處理;
  // 如果是讀事件,則將其交由ngx_http_request.c的ngx_http_wait_request_handler()方法處理;
  // 對於處理完成的事件,最後會交由ngx_http_request.c的ngx_http_keepalive_handler()方法處理。

  // 這裏開始處理除accept事件外的其他事件
  ngx_event_process_posted(cycle, &ngx_posted_events);
}

        這裏的ngx_process_events_and_timers()方法我們省略了大部分代碼,只留下了主要的流程。簡而言之,其主要實現瞭如下幾個步驟的工作:

        下面我們來看一下ngx_epoll_process_events()方法是如何處理 epoll 句柄中的事件的:

static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) {
  int events;
  uint32_t revents;
  ngx_int_t instance, i;
  ngx_uint_t level;
  ngx_err_t err;
  ngx_event_t *rev, *wev;
  ngx_queue_t *queue;
  ngx_connection_t *c;

  /* NGX_TIMER_INFINITE == INFTIM */

  ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                 "epoll timer: %M", timer);

  // 通過epoll_wait()方法進行事件的獲取,獲取到的事件將存放在event_list中,並且會將獲取的事件個數返回
  events = epoll_wait(ep, event_list, (int) nevents, timer);

  err = (events == -1) ? ngx_errno : 0;

  // 這裏的ngx_event_timer_alarm是通過一個定時器任務來觸發的,在定時器中會將其置爲1,
  // 從而實現定期更新nginx緩存的時間的目的
  if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
    ngx_time_update();
  }

  if (err) {
    if (err == NGX_EINTR) {

      if (ngx_event_timer_alarm) {
        ngx_event_timer_alarm = 0;
        return NGX_OK;
      }

      level = NGX_LOG_INFO;

    } else {
      level = NGX_LOG_ALERT;
    }

    ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
    return NGX_ERROR;
  }

  // 獲取的事件個數爲0
  if (events == 0) {
    // 如果當前時間類型不爲NGX_TIMER_INFINITE,說明獲取事件超時了,則直接返回
    if (timer != NGX_TIMER_INFINITE) {
      return NGX_OK;
    }

    // 這裏說明時間類型爲NGX_TIMER_INFINITE,但是卻返回了0個事件,說明epoll_wait()調用出現了問題
    ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                  "epoll_wait() returned no events without timeout");
    return NGX_ERROR;
  }

  // 遍歷各個事件
  for (i = 0; i < events; i++) {
    // 每個事件的data.ptr中存儲了當前事件對應的connection對象
    c = event_list[i].data.ptr;

    // 獲取事件中存儲的instance的值
    instance = (uintptr_t) c & 1;
    // 獲取connection指針地址值
    c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

    // 獲取讀事件結構體
    rev = c->read;

    // 如果當前連接的文件描述符爲-1,獲取其instance不等於當前事件的instance,
    // 說明該連接已經過期了,則不對該事件進行處理
    if (c->fd == -1 || rev->instance != instance) {

      /*
       * the stale event from a file descriptor
       * that was just closed in this iteration
       */

      ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                     "epoll: stale event %p", c);
      continue;
    }

    // 獲取當前事件監聽的類型
    revents = event_list[i].events;

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll: fd:%d ev:%04XD d:%p",
                   c->fd, revents, event_list[i].data.ptr);

    // 如果事件發生錯誤,則打印相應的日誌
    if (revents & (EPOLLERR | EPOLLHUP)) {
      ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                     "epoll_wait() error on fd:%d ev:%04XD",
                     c->fd, revents);

      /*
       * if the error events were returned, add EPOLLIN and EPOLLOUT
       * to handle the events at least in one active handler
       */

      revents |= EPOLLIN | EPOLLOUT;
    }

#if 0
    if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "strange epoll_wait() events fd:%d ev:%04XD",
                      c->fd, revents);
    }
#endif

    // 如果當前是讀事件,並且事件是活躍的
    if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
      if (revents & EPOLLRDHUP) {
          rev->pending_eof = 1;
      }

      rev->available = 1;
#endif

      // 將事件標記爲就緒狀態
      rev->ready = 1;

      // 默認是開啓了NGX_POST_EVENTS開關的
      if (flags & NGX_POST_EVENTS) {
        // 如果當前是accept事件,則將其添加到ngx_posted_accept_events隊列中,
        // 如果是讀寫事件,則將其添加到ngx_posted_events隊列中
        queue = rev->accept ? &ngx_posted_accept_events
                            : &ngx_posted_events;

        ngx_post_event(rev, queue);

      } else {
        // 如果不需要分離accept和讀寫事件,則直接處理該事件
        rev->handler(rev);
      }
    }

    // 獲取寫事件結構體
    wev = c->write;

    if ((revents & EPOLLOUT) && wev->active) {

      // 如果當前連接的文件描述符爲-1,獲取其instance不等於當前事件的instance,
      // 說明該連接已經過期了,則不對該事件進行處理
      if (c->fd == -1 || wev->instance != instance) {

        /*
         * the stale event from a file descriptor
         * that was just closed in this iteration
         */

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: stale event %p", c);
        continue;
      }

      // 將當前事件標記爲就緒狀態
      wev->ready = 1;
#if (NGX_THREADS)
      wev->complete = 1;
#endif

      // 由於是寫事件,並且需要標記爲了NGX_POST_EVENTS狀態,
      // 因而將其直接添加到ngx_posted_events隊列中,否則直接處理該事件
      if (flags & NGX_POST_EVENTS) {
        ngx_post_event(wev, &ngx_posted_events);

      } else {
        wev->handler(wev);
      }
    }
  }

  return NGX_OK;
}

        這裏ngx_epoll_process_events()方法首先就是調用epoll_wait()方法獲取所監聽的句柄的事件,然後遍歷獲取的事件,根據事件的類型,如果是 accept 事件,則添加到ngx_posted_accept_events隊列中,如果是讀寫事件,則添加到ngx_posted_events隊列中,而隊列中事件的處理,則在上面介紹的ngx_process_events_and_timers()方法中進行。

4. 小結

        本文首先對 epoll 模型的實現原理進行了講解,然後從源碼的層面對 nginx 是如何基於 epoll 模型實現事件驅動模式的原理進行了講解。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/6BZExEjShtfWXR7yM-2edw