nginx worker 進程循環

        worker 進程啓動後,其首先會初始化自身運行所需要的環境,然後會進入一個循環,在該循環中不斷檢查是否有需要執行的事件,然後處理事件。在這個過程中,worker 進程也是需要與 master 進程交互的,更有甚者,worker 進程作爲一個子進程,也是可以接收命令行指令(比如 kill 等)以進行相應邏輯的處理的。那麼 worker 進程是如何與 master 或者命令行指令進行交互的呢?本文首先會對 worker 進程與 master 進程交互方式,以及 worker 進程如何處理命令行指令的流程進行講解,然後會從源碼上對 worker 進程交互的整個工作流程進行介紹。

1. worker 與 master 進程交互方式

        這裏首先需要說明的是,無論是 master 還是外部命令的方式,nginx 都是通過標誌位的方式來處理相應的指令的,也即在接收到一個指令(無論是 master 還是外部命令)的時候,worker 會在其回調方法中設置與該指令相對應的標誌位,然後在 worker 進程在其自身的循環中處理完事件之後會依次檢查這些標誌位是否爲真,是則根據該標誌位的作用執行相應的邏輯。

        對於 worker 進程與 master 進程的交互,其是通過 socket 管道的方式進行的。在ngx_process.h文件中聲明瞭一個ngx_process_t結構體,這裏我們主要關注其 channel 屬性:

typedef struct {
  	
    
    ngx_socket_t channel[2];
} ngx_process_t;

        這裏的ngx_process_t結構體的作用是存儲某個進程相關的信息的,比如 pid、channel、status 等。每個進程中都有一個ngx_processes數組,數組元素就是這裏的ngx_process_t結構體,也就是說每個進程都會通過ngx_processes數組保存其餘進程的基本信息。其聲明如下:

extern ngx_process_t ngx_processes[NGX_MAX_PROCESSES];

        這裏我們就可以看出,每個進程都會一個與之對應的 channel 數組,這個數組的長度爲 2,其是與 master 進程進行交互的管道流。在 master 進程創建每一個子進程的之前,都會創建一個 channel 數組,該數組的創建方法爲:

int socketpair(int domain, int type, int protocol, int sv[2]);

        這個方法的主要作用是創建一對匿名的已經連接的套接字,也就是說,如果在一個套接字中寫入數據,那麼在另一個套接字中就可以接收到寫入的數據。通過這種方式,如果在父進程中往管道的一邊寫入數據,那麼在子進程就可以在另一邊接收到數據,這樣就可以實現父子進程的數據通信了。

        在 master 進程啓動完子進程之後,子進程會保有 master 進程中相應的數據,也包括這裏的 channel 數組。如此,master 進程就可以通過 channel 數組實現與子進程的通信了。

2. worker 處理外部命令

        對於外部命令,其本質上是通過signals數組中定義的各個信號以及回調方法進行處理的。在 master 進程初始化基本環境的時候,會將signals數組中指定的信號回調方法設置到對應的信號中。由於 worker 進程會繼承 master 進程的基本環境,因而 worker 進程在接收到這裏設置的信號之後,也會調用對應的回調方法。而該回調方法的主要邏輯也僅僅只是設置相應的標誌位的值。關於 nginx 接收到信號之後如何設置對應的標誌位,可以參照本人前面的文章 (nginx master 工作循環 超鏈接),這裏不再贅述。

3. 源碼講解

        master 進程是通過ngx_start_worker_processes()方法啓動各個子進程的,如下是該方法源碼:

static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) {
  ngx_int_t i;
  ngx_channel_t ch;
  
  ngx_memzero(&ch, sizeof(ngx_channel_t));
  ch.command = NGX_CMD_OPEN_CHANNEL;

  for (i = 0; i < n; i++) {

    
    
    
    
    ngx_spawn_process(cycle, ngx_worker_process_cycle, 
                      (void *) (intptr_t) i, "worker process", type);

    
    
    
    
    
    
    
    
    ch.pid = ngx_processes[ngx_process_slot].pid;
    ch.slot = ngx_process_slot;
    ch.fd = ngx_processes[ngx_process_slot].channel[0];

    
    ngx_pass_open_channel(cycle, &ch);
  }
}

        這裏我們主要需要關注上面的啓動子進程的方法調用,也即這裏的ngx_spawn_process()方法,該方法的第二個參數是一個方法,在啓動子進程之後,子進程就會進入該方法所指定的循環中。而在ngx_spawn_process()方法中,master 進程會爲當前新創建的子進程創建一個 channel 數組,以用於與當前子進程進行通信。如下是ngx_spawn_process()方法的源碼:

ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) {
  u_long on;
  ngx_pid_t pid;
  ngx_int_t s;

  if (respawn >= 0) {
    s = respawn;

  } else {
    
    
    
    
    for (s = 0; s < ngx_last_process; s++) {
      if (ngx_processes[s].pid == -1) {
        break;
      }
    }

    
    if (s == NGX_MAX_PROCESSES) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                    "no more than %d processes can be spawned",
                    NGX_MAX_PROCESSES);
      return NGX_INVALID_PID;
    }
  }

  
  
  if (respawn != NGX_PROCESS_DETACHED) {

    

    
    
    
    
    
    
    
    
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "socketpair() failed while spawning \"%s\"", name);
      return NGX_INVALID_PID;
    }

    ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
                   "channel %d:%d",
                   ngx_processes[s].channel[0],
                   ngx_processes[s].channel[1]);

    
    if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    ngx_nonblocking_n
                        " failed while spawning \"%s\"",
                    name);
      ngx_close_channel(ngx_processes[s].channel, cycle->log);
      return NGX_INVALID_PID;
    }

    
    if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    ngx_nonblocking_n
                        " failed while spawning \"%s\"",
                    name);
      ngx_close_channel(ngx_processes[s].channel, cycle->log);
      return NGX_INVALID_PID;
    }

    on = 1;
    
    if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "ioctl(FIOASYNC) failed while spawning \"%s\"", name);
      ngx_close_channel(ngx_processes[s].channel, cycle->log);
      return NGX_INVALID_PID;
    }

    
    
    
    if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "fcntl(F_SETOWN) failed while spawning \"%s\"", name);
      ngx_close_channel(ngx_processes[s].channel, cycle->log);
      return NGX_INVALID_PID;
    }

    
    if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
                    name);
      ngx_close_channel(ngx_processes[s].channel, cycle->log);
      return NGX_INVALID_PID;
    }

    
    if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
                    name);
      ngx_close_channel(ngx_processes[s].channel, cycle->log);
      return NGX_INVALID_PID;
    }

    
    
    
    ngx_channel = ngx_processes[s].channel[1];

  } else {
    
    ngx_processes[s].channel[0] = -1;
    ngx_processes[s].channel[1] = -1;
  }

  ngx_process_slot = s;


  
  
  
  
  pid = fork();

  switch (pid) {

    case -1:
      
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "fork() failed while spawning \"%s\"", name);
      ngx_close_channel(ngx_processes[s].channel, cycle->log);
      return NGX_INVALID_PID;

    case 0:
      
      
      ngx_pid = ngx_getpid();
      proc(cycle, data);
      break;

    default:
      
      break;
  }

  ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);

  
  ngx_processes[s].pid = pid;
  ngx_processes[s].exited = 0;

  if (respawn >= 0) {
    return pid;
  }

  
  ngx_processes[s].proc = proc;
  ngx_processes[s].data = data;
  ngx_processes[s].name = name;
  ngx_processes[s].exiting = 0;

  switch (respawn) {

    case NGX_PROCESS_NORESPAWN:
      ngx_processes[s].respawn = 0;
      ngx_processes[s].just_spawn = 0;
      ngx_processes[s].detached = 0;
      break;

    case NGX_PROCESS_JUST_SPAWN:
      ngx_processes[s].respawn = 0;
      ngx_processes[s].just_spawn = 1;
      ngx_processes[s].detached = 0;
      break;

    case NGX_PROCESS_RESPAWN:
      ngx_processes[s].respawn = 1;
      ngx_processes[s].just_spawn = 0;
      ngx_processes[s].detached = 0;
      break;

    case NGX_PROCESS_JUST_RESPAWN:
      ngx_processes[s].respawn = 1;
      ngx_processes[s].just_spawn = 1;
      ngx_processes[s].detached = 0;
      break;

    case NGX_PROCESS_DETACHED:
      ngx_processes[s].respawn = 0;
      ngx_processes[s].just_spawn = 0;
      ngx_processes[s].detached = 1;
      break;
  }

  if (s == ngx_last_process) {
    ngx_last_process++;
  }

  return pid;
}

        ngx_spawn_process()方法最後會 fork() 一個子進程以執行其第二個參數所指定的回調方法。但是在這之前,我們需要說明的是,其通過socketpair()方法調用會創建一對匿名的 socket,然後將其存儲在當前進程的 channel 數組中,如此就完成了 channel 數組的創建。

        worker 進程啓動之後會執行ngx_worker_process_cycle()方法,該方法首先會對 worker 進程進行初始化,其中就包括對繼承而來的 channel 數組的處理。由於 master 進程和 worker 進程都保有 channel 數組所指代的 socket 描述符,而本質上 master 進程和各個 worker 進程只需要保有該數組的某一邊的描述符即可。因而這裏 worker 進程在初始化過程中,會關閉其所保存的另一邊的描述符。在 nginx 中,master 進程統一的會保留 channel 數組的 0 號位的 socket 描述符,關閉 1 號位的 socket 描述符,而 worker 進程則會關閉 0 號位的 socket 描述符,保留 1 號位的描述符。這樣 master 進程需要與 worker 進程通信時,就只需要往 channel[0] 中寫入數據,而 worker 進程則會監聽 channel[1],從而接收到 master 進程的數據寫入。這裏我們首先看一下 worker 進程的初始化方法ngx_worker_process_init()的源碼:

static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) {
  sigset_t set;
  ngx_int_t n;
  ngx_time_t *tp;
  ngx_uint_t i;
  ngx_cpuset_t *cpu_affinity;
  struct rlimit rlmt;
  ngx_core_conf_t *ccf;
  ngx_listening_t *ls;

  
  if (ngx_set_environment(cycle, NULL) == NULL) {
    
    exit(2);
  }

  ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

  
  if (worker >= 0 && ccf->priority != 0) {
    if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "setpriority(%d) failed", ccf->priority);
    }
  }

  
  if (ccf->rlimit_nofile != NGX_CONF_UNSET) {
    rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile;
    rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile;

    if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "setrlimit(RLIMIT_NOFILE, %i) failed",
                    ccf->rlimit_nofile);
    }
  }

  
  
  if (ccf->rlimit_core != NGX_CONF_UNSET) {
    rlmt.rlim_cur = (rlim_t) ccf->rlimit_core;
    rlmt.rlim_max = (rlim_t) ccf->rlimit_core;

    if (setrlimit(RLIMIT_CORE, &rlmt) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "setrlimit(RLIMIT_CORE, %O) failed",
                    ccf->rlimit_core);
    }
  }

  
  if (geteuid() == 0) {
    
    if (setgid(ccf->group) == -1) {
      ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                    "setgid(%d) failed", ccf->group);
      
      exit(2);
    }

    
    if (initgroups(ccf->username, ccf->group) == -1) {
      ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                    "initgroups(%s, %d) failed",
                    ccf->username, ccf->group);
    }

    
    if (setuid(ccf->user) == -1) {
      ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                    "setuid(%d) failed", ccf->user);
      
      exit(2);
    }
  }

  
  
  if (worker >= 0) {
    
    cpu_affinity = ngx_get_cpu_affinity(worker);

    if (cpu_affinity) {
      
      ngx_setaffinity(cpu_affinity, cycle->log);
    }
  }


  if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "prctl(PR_SET_DUMPABLE) failed");
  }



  if (ccf->working_directory.len) {
    
    if (chdir((char *) ccf->working_directory.data) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "chdir(\"%s\") failed", ccf->working_directory.data);
      
      exit(2);
    }
  }

  
  sigemptyset(&set);

  
  
  
  
  if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {
    ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                  "sigprocmask() failed");
  }

  tp = ngx_timeofday();
  srandom(((unsigned) ngx_pid << 16) ^ tp->sec ^ tp->msec);

  ls = cycle->listening.elts;
  for (i = 0; i < cycle->listening.nelts; i++) {
    ls[i].previous = NULL;
  }

  
  for (i = 0; cycle->modules[i]; i++) {
    if (cycle->modules[i]->init_process) {
      if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
        
        exit(2);
      }
    }
  }

  
  for (n = 0; n < ngx_last_process; n++) {

    if (ngx_processes[n].pid == -1) {
      continue;
    }

    if (n == ngx_process_slot) {
      continue;
    }

    if (ngx_processes[n].channel[1] == -1) {
      continue;
    }

    if (close(ngx_processes[n].channel[1]) == -1) {
      ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                    "close() channel failed");
    }
  }

  
  if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
    ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                  "close() channel failed");
  }


  ngx_last_process = 0;


  
  
  
  
  
  
  if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
                            ngx_channel_handler)
      == NGX_ERROR) {
    
    exit(2);
  }
}

        該方法主要是對 worker 進程進行初始化,這裏我們主要需要關注最後會遍歷ngx_processes數組,這個數組中保存了當前 nginx 中各個進程的相關信息。在遍歷過程中,會關閉當前進程保有的其餘進程的 channel[1] 句柄,而保留有 channel[0] 句柄,這樣當前進程如果需要與其他進程通信,也只需要往目標進程的 channel[0] 中寫入數據即可。在遍歷完成之後,當前進程就會關閉自身的 channel[0] 句柄,而保留 channel[1] 句柄。最後,會通過ngx_add_channel_event()方法爲當前進程添加對 channel[1] 的監聽事件,這裏在調用ngx_add_channel_event()方法時傳入的第二個參數是ngx_channel,該參數是在前面的ngx_spawn_process()方法中賦值的,指向的就是當前進程的 channel[1] 的 socket 句柄。

        關於ngx_add_channel_event()方法,其本質就是創建一個ngx_event_t結構體的事件,然後將其添加到當前所使用的事件模型(比如 epoll)句柄中。這裏不再贅述該方法的實現源碼,不過我們需要關注的是該事件觸發時的回調方法,即調用ngx_add_channel_event()方法時傳入的第三個參數ngx_channel_handler()方法。如下是該方法的源碼:

static void ngx_channel_handler(ngx_event_t *ev) {
  ngx_int_t n;
  ngx_channel_t ch;
  ngx_connection_t *c;

  if (ev->timedout) {
    ev->timedout = 0;
    return;
  }

  c = ev->data;

  for (;;) {

    
    n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);

    
    if (n == NGX_ERROR) {
      if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
        ngx_del_conn(c, 0);
      }

      ngx_close_connection(c);
      return;
    }

    if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {
      if (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR) {
        return;
      }
    }

    if (n == NGX_AGAIN) {
      return;
    }

    
    switch (ch.command) {
      
      case NGX_CMD_QUIT:
        ngx_quit = 1;
        break;

        
      case NGX_CMD_TERMINATE:
        ngx_terminate = 1;
        break;

        
      case NGX_CMD_REOPEN:
        ngx_reopen = 1;
        break;

        
      case NGX_CMD_OPEN_CHANNEL:
        ngx_processes[ch.slot].pid = ch.pid;
        ngx_processes[ch.slot].channel[0] = ch.fd;
        break;

        
      case NGX_CMD_CLOSE_CHANNEL:
        if (close(ngx_processes[ch.slot].channel[0]) == -1) {
          ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                        "close() channel failed");
        }

        ngx_processes[ch.slot].channel[0] = -1;
        break;
    }
  }
}

        在ngx_channel_handler()方法中,主要是讀取所監聽的 socket 句柄中的數據,而數據是以一個ngx_channel_t結構體所承載的,這個ngx_channel_t是 nginx 所統一使用的 master 與 worker 進程進行通信的結構體,其會指定當前發生的事件類型,以及發生該事件的進程信息。如下是ngx_channel_t結構體的聲明:

typedef struct {
  	
    ngx_uint_t command;
  	
    ngx_pid_t pid;
  	
    ngx_int_t slot;
  	
    ngx_fd_t fd;
} ngx_channel_t;

       在從當前進程的 channel[1] 中讀取了ngx_channel_t結構體的數據之後,ngx_channel_handler()方法會根據發生的事件類型更新相應的標誌位的狀態,並且會更新當前進程的ngx_processes數組中對應的發生事件的進程的狀態信息。

        在處理了 master 進程所發送的事件之後,worker 進程就會繼續其循環,在該循環中會檢查其所關注的標誌位的狀態,然後會根據這些狀態執行對應的邏輯。如下是 worker 進程工作的循環的源碼:

static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) {
  ngx_int_t worker = (intptr_t) data;

  ngx_process = NGX_PROCESS_WORKER;
  ngx_worker = worker;

  
  ngx_worker_process_init(cycle, worker);

  ngx_setproctitle("worker process");

  for (;;) {

    if (ngx_exiting) {
      
      
      
      
      if (ngx_event_no_timers_left() == NGX_OK) {
        ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
        ngx_worker_process_exit(cycle);
      }
    }

    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

    
    
    ngx_process_events_and_timers(cycle);

    
    if (ngx_terminate) {
      ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
      ngx_worker_process_exit(cycle);
    }

    
    
    
    
    
    
    if (ngx_quit) {
      ngx_quit = 0;
      ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "gracefully shutting down");
      ngx_setproctitle("worker process is shutting down");

      if (!ngx_exiting) {
        ngx_exiting = 1;
        ngx_set_shutdown_timer(cycle);
        ngx_close_listening_sockets(cycle);
        ngx_close_idle_connections(cycle);
      }
    }

    
    if (ngx_reopen) {
      ngx_reopen = 0;
      ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
      ngx_reopen_files(cycle, -1);
    }
  }
}

        可以看到,worker 進程主要處理了 nginx 是否退出相關的標誌位,還處理了 nginx 是否重新讀取了配置文件的標誌位。

4. 小結

        本文首先對 master-worker 進程交互的基本原理進行了講解,然後深入到源碼中講解了 nginx 是如何實現 master 和 worker 進程的相互通信的。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://my.oschina.net/zhangxufeng/blog/3163109