Node-js 是如何跑起來的

一個 TCP 連接的案例

TCP 服務端

const net = require('net');

const server = new net.Server();

server.listen(9999, '127.0.0.1'() ={
   console.log(`server is listening on ${server.address().address}:${server.address().port}`);
});

server.on('connection'(socket) ={
   server.getConnections((err, connections) ={
    console.log('current clients is: ', connections);
   });

    socket.on('data'(data) ={
        console.log(`received data: ${data.toString()}`);
    });
});

TCP 客戶端

const net = require('net');

const client = new net.Socket();

client.connect({
    port: 9999,
    address: '127.0.0.1',
});

client.on('connect'() ={
   console.log('connect success');

    client.write(`Hello Server!, I'${Math.round(Math.random() * 100)}`);
});

疑問❓

核心架構

NodeJS 架構

NodeJS 源碼分爲三層:JS、C++ 以及 C。

JS 層

JS 層提供面向用戶的調用底層能力的接口,即各種 NodeJS 原生模塊,如 net、http、fs、DNS 以及 path 等

C++ 層

C++ 層主要通過 V8 爲 JS 層提供與底層交互的能力,起到類似橋樑的作用,通過 V8 不僅實現 JS 的解釋執行,還擴展的 JS 的能力邊界

C 層

C 層主要包括 Libuv 這一跨平臺的異步 IO 庫以及其他第三方 C 庫

啓動過程

分析

  1. 註冊 C++ 模塊

RegisterBuiltinModules 函數的作用是註冊一系列 C++ 模塊,通過宏定義展開,最終變成如下邏輯:

   void RegisterBuiltinModules() {  
      _register_async_wrap();  
      _register_buffer();
      _register_fs();
      _register_url();
      // ...
    }

通過註冊函數,將各個 C++ 模塊維護在 modlist_internal 這一鏈表中,後續在原生 JS 模塊中調用 C++ 模塊時就可以根據模塊名找到對應的模塊。

  1. 創建 Environment 對象

Environment 在 NodeJS 中是一個運行時的環境對象,很多全局變量都託管在該類上,創建完 environment 後,就將其和 Context 進行綁定,後續 V8 可通過 context 獲取 env 對象。

下面簡單介紹一下 V8 的 isolate 、 context、scope、handle 等對象。

isolate 是一個獨立隔離實例的環境,同一時刻只能被一個線程進入;

context 可以理解爲執行上下文對象,可以導入不同的環境變量和函數;

Scope 指的是作用域,可看成是句柄的容器,一個作用域裏面可以有很多個句柄;

HandleScope 是用來管理 Handle 的,而 Context::Scope 僅僅用來管理 Context 對象。

Handle 是 V8 引用對象的技術手段,Handle 分爲 Local 和 Persistent 兩種。Local 是局部的,它同時被 HandleScope 進行管理。 persistent,類似於全局的,不受 HandleScope 的管理,其作用域可以延伸到不同的函數。

  1. 初始化 loader 和執行上下文

RunBootstrapping 主要調用了 BootstrapInternalLoaders 和 BootstrapNode 函數。

BootstrapInternalLoaders 用於編譯執行 /lib/internal/bootstrap/loader.js,它的具體邏輯是爲了 NodeJS 能在 JS 層 通過 binding 函數加載 C++ 模塊,以便在原生 JS 模塊中調用 C++ 模塊。

BootstrapNode 用於初始化執行上下文,暴露 global 對象在全局上下文中,編譯執行 /lib/internal/bootstrap/node,從而設置一些全局變量或方法到 global 或者 process

// lib/internal/bootstrap/node.js

// proces 掛載一系列屬性方法
{
  process.dlopen = rawMethods.dlopen;
  process.uptime = rawMethods.uptime;

  // TODO(joyeecheung): either remove them or make them public
  process._getActiveRequests = rawMethods._getActiveRequests;
  process._getActiveHandles = rawMethods._getActiveHandles;

  // TODO(joyeecheung): remove these
  process.reallyExit = rawMethods.reallyExit;
  process._kill = rawMethods._kill;

  const wrapped = perThreadSetup.wrapProcessMethods(rawMethods);
  process._rawDebug = wrapped._rawDebug;
  process.hrtime = wrapped.hrtime;
  process.hrtime.bigint = wrapped.hrtimeBigInt;
  process.cpuUsage = wrapped.cpuUsage;
  process.resourceUsage = wrapped.resourceUsage;
  process.memoryUsage = wrapped.memoryUsage;
  process.kill = wrapped.kill;
  process.exit = wrapped.exit;

  process.openStdin = function() {
    process.stdin.resume();
    return process.stdin;
  };
}
// global 掛載一系列屬性和方法
if (!config.noBrowserGlobals) {
  // Override global console from the one provided by the VM
  // to the one implemented by Node.js
  // https://console.spec.whatwg.org/#console-namespace
  exposeNamespace(global, 'console', createGlobalConsole(global.console));

  const { URL, URLSearchParams } = require('internal/url');
  // https://url.spec.whatwg.org/#url
  exposeInterface(global, 'URL', URL);
  // https://url.spec.whatwg.org/#urlsearchparams
  exposeInterface(global, 'URLSearchParams', URLSearchParams);

  const {
    TextEncoder, TextDecoder
  } = require('internal/encoding');
  // https://encoding.spec.whatwg.org/#textencoder
  exposeInterface(global, 'TextEncoder', TextEncoder);
  // https://encoding.spec.whatwg.org/#textdecoder
  exposeInterface(global, 'TextDecoder', TextDecoder);

  // https://html.spec.whatwg.org/multipage/webappapis.html#windoworworkerglobalscope
  const timers = require('timers');
  defineOperation(global, 'clearInterval', timers.clearInterval);
  defineOperation(global, 'clearTimeout', timers.clearTimeout);
  defineOperation(global, 'setInterval', timers.setInterval);
  defineOperation(global, 'setTimeout', timers.setTimeout);

  defineOperation(global, 'queueMicrotask', queueMicrotask);

  // Non-standard extensions:
  defineOperation(global, 'clearImmediate', timers.clearImmediate);
  defineOperation(global, 'setImmediate', timers.setImmediate);
}
// ...
  1. 初始化 Libuv

這裏對事件循環的部分階段做一些初始化的操作,創建一個默認的 event_loop 結構體用於管理後續各個階段產生的任務

void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
  HandleScope handle_scope(isolate());
  Context::Scope context_scope(context());

  CHECK_EQ(0, uv_timer_init(event_loop(), timer_handle()));
  uv_unref(reinterpret_cast<uv_handle_t*>(timer_handle()));

  uv_check_init(event_loop(), immediate_check_handle());
  uv_unref(reinterpret_cast<uv_handle_t*>(immediate_check_handle()));

  uv_idle_init(event_loop(), immediate_idle_handle());

  uv_check_start(immediate_check_handle(), CheckImmediate);
  uv_prepare_init(event_loop()&idle_prepare_handle_);
  uv_check_init(event_loop()&idle_check_handle_);
  uv_async_init(
      event_loop(),
      &task_queues_async_,
      [](uv_async_t* async) {
        Environment* env = ContainerOf(
            &Environment::task_queues_async_, async);
        env->CleanupFinalizationGroups();
        env->RunAndClearNativeImmediates();
      });
  uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
  uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
  uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
  // ...
}
  1. 執行用戶 JS 代碼

StartExecution 用於加載用戶 JS 代碼並執行

// src/node.cc
MaybeLocal<Value> StartExecution(Environment* env, StartExecutionCallback cb) {
  // ...
  if (!first_argv.empty() && first_argv != "-") {
    return StartExecution(env, "internal/main/run_main_module");
  }
  // ...
}
// lib/internal/main/run_main_module.js
require('internal/modules/cjs/loader').Module.runMain(process.argv[1]);
  1. 進入 Libuv 事件循環

執行完用戶 JS 代碼,用戶代碼就會往 Libuv 中註冊一些任務,然後進入整個事件循環,直到沒有待處理的任務,Libuv 則會退出事件循環,進而退出 NodeJS 進程。

  // src/node_main_instance.cc
  do {
    uv_run(env->event_loop(), UV_RUN_DEFAULT);

    per_process::v8_platform.DrainVMTasks(isolate_);

    more = uv_loop_alive(env->event_loop());
    if (more && !env->is_stopping()) continue;

    if (!uv_loop_alive(env->event_loop())) {
      EmitBeforeExit(env.get());
    }

    // Emit `beforeExit` if the loop became alive either after emitting
    // event, or after running some callbacks.
    more = uv_loop_alive(env->event_loop());
  } while (more == true && !env->is_stopping());

源代碼概覽

// src/node_main.cc
int main(int argc, char* argv[]) {
    return node::Start(argc, argv);
}
// src/node.cc
namespace node {
    int Start(int argc, char** argv) {
        
        InitializationResult result = InitializeOncePerProcess(argc, argv);
        // ...
        
        NodeMainInstance main_instance(¶ms,
                               uv_default_loop(),
                               per_process::v8_platform.Platform(),
                               result.args,
                               result.exec_args,
                               indexes);
        result.exit_code = main_instance.Run();
    }
    
    InitializationResult InitializeOncePerProcess(int argc, char** argv) {
        // ...
      {
        result.exit_code =
            InitializeNodeWithArgs(&(result.args)&(result.exec_args)&errors);
            //...
      }
    
      V8::Initialize();
      return result;
    }
    
    int InitializeNodeWithArgs(std::vector<std::string>* argv,
                               std::vector<std::string>* exec_argv,
                               std::vector<std::string>* errors) {
      
      // ...
      // Register built-in modules
      binding::RegisterBuiltinModules();
      // ...
    }
    
    MaybeLocal<Value> Environment::RunBootstrapping() {
      EscapableHandleScope scope(isolate_);
    
      //...
      if (BootstrapInternalLoaders().IsEmpty()) {
        return MaybeLocal<Value>();
      }
    
      Local<Value> result;
      if (!BootstrapNode().ToLocal(&result)) {
        return MaybeLocal<Value>();
      }
      
      //...
    
      return scope.Escape(result);
    }
}
// src/node_main_instance.cc
namespace node {
    int NodeMainInstance::Run() {
      // ...
      DeleteFnPtr<Environment, FreeEnvironment> env =
          CreateMainEnvironment(&exit_code);
    
      if (exit_code == 0) {
        LoadEnvironment(env.get());
        // ...
        {
          // ...
          do {
            uv_run(env->event_loop(), UV_RUN_DEFAULT);
    
            per_process::v8_platform.DrainVMTasks(isolate_);
    
            more = uv_loop_alive(env->event_loop());
            if (more && !env->is_stopping()) continue;
    
            if (!uv_loop_alive(env->event_loop())) {
              EmitBeforeExit(env.get());
            }
    
            // Emit `beforeExit` if the loop became alive either after emitting
            // event, or after running some callbacks.
            more = uv_loop_alive(env->event_loop());
          } while (more == true && !env->is_stopping());
        }
      }
      // ...
    }
    
    NodeMainInstance::CreateMainEnvironment(int* exit_code) {

      // ...
      context = NewContext(isolate_);
      Context::Scope context_scope(context);
    
      DeleteFnPtr<Environment, FreeEnvironment> env { CreateEnvironment(
          isolate_data_.get(),
          context,
          args_,
          exec_args_,
          EnvironmentFlags::kDefaultFlags) };
    
      return env;
    }
}
// src/environment.cc
namespace node {
    void LoadEnvironment(Environment* env) {
      USE(LoadEnvironment(env,
                          StartExecutionCallback{},
                          {}));
    }
    
    MaybeLocal<Value> LoadEnvironment(
        Environment* env,
        StartExecutionCallback cb,
        std::unique_ptr<InspectorParentHandle> removeme) {
      env->InitializeLibuv(per_process::v8_is_profiling);
      env->InitializeDiagnostics();
    
      return StartExecution(env, cb);
    }
    
    Environment* CreateEnvironment(IsolateData* isolate_data,
                               Local<Context> context,
                               int argc,
                               const char* const* argv,
                               int exec_argc,
                               const char* const* exec_argv) {
      return CreateEnvironment(
          isolate_data, context,
          std::vector<std::string>(argv, argv + argc),
          std::vector<std::string>(exec_argv, exec_argv + exec_argc));
    }
    
    Environment* CreateEnvironment(
        IsolateData* isolate_data,
        Local<Context> context,
        const std::vector<std::string>& args,
        const std::vector<std::string>& exec_args,
        EnvironmentFlags::Flags flags,
        ThreadId thread_id,
        std::unique_ptr<InspectorParentHandle> inspector_parent_handle) {
      Isolate* isolate = context->GetIsolate();
      HandleScope handle_scope(isolate);
      Context::Scope context_scope(context);
      // TODO(addaleax): This is a much better place for parsing per-Environment
      // options than the global parse call.
      Environment* env = new Environment(
          isolate_data,
          context,
          args,
          exec_args,
          flags,
          thread_id);
      // ... 
      if (env->RunBootstrapping().IsEmpty()) {
        FreeEnvironment(env);
        return nullptr;
      }
    
      return env;
    }
}

Libuv 架構

Libuv 是 NodeJS 的核心組件,是一個跨平臺的處理異步 I/O 請求的 C 庫,從架構來看,它把各類請求主要分爲兩大類:網絡 I/O 相關請求,以及文件 I/O、DNS Ops 以及 User code 組成的請求。

對於網絡 I/O 相關請求,根據 OS 平臺的不同,分別採用了 Linux 的 epoll、OSX 和 BSD 類 OS 的 kqueue、SunOS 的 event ports 以及 Windows 的 IOCP 等 I/O 讀寫機制。

對於 File I/O 爲代表的請求,則使用線程池實現異步請求處理,具有更好的跨平臺特性。

事件循環 event loop

在 Node 應用啓動後,就會進入 Libuv 事件循環中,每一輪循環 Libuv 都會處理維護在各個階段的任務隊列的回調節點,在回調節點中可能會產生新的任務,任務可能在當前循環或是下個循環繼續被處理。

以下是 Libuv 的執行流程圖:

下面簡述一下各個階段代表的含義:

  1. 首先判斷當前事件循環是否處於 alive 狀態,否則退出整個事件循環。alive 狀態表示是否有 active 狀態的 handle 和 request,closing 狀態的 handle

  2. 基於系統時間更新時間戳

  3. 判斷由定時器組成的小頂堆中那個節點超時,超時則執行定時器回調

  4. 執行 pending 回調任務,一般 I/O 回調添加的錯誤或寫數據成功的任務都會在下一個事件循環的 pending 階段執行

  5. 執行 idle 階段的回調任務

  6. 執行 prepare 階段的回調任務

  7. 調用各平臺的 I/O 讀寫接口,最多等待 timeout 時間(定時器最快過期時間),期間如果有數據返回,則執行 I/O 對應的回調

  8. 執行 check 階段的回調任務

  9. 執行 closing 階段的回調任務

  10. 重新回到流程 1

源碼概覽

// src/unix/core.c
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int can_sleep;

  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  while (r != 0 && loop->stop_flag == 0) {
    uv__update_time(loop);
    uv__run_timers(loop);

    can_sleep =
        QUEUE_EMPTY(&loop->pending_queue) && QUEUE_EMPTY(&loop->idle_handles);

    uv__run_pending(loop);
    uv__run_idle(loop);
    uv__run_prepare(loop);

    timeout = 0;
    if ((mode == UV_RUN_ONCE && can_sleep) || mode == UV_RUN_DEFAULT)
      timeout = uv__backend_timeout(loop);

    uv__metrics_inc_loop_count(loop);
    // 動態設置 epoll_wait 的超時時間
    uv__io_poll(loop, timeout);

    for (r = 0; r < 8 && !QUEUE_EMPTY(&loop->pending_queue); r++)
      uv__run_pending(loop);

    uv__metrics_update_idle_time(loop);

    uv__run_check(loop);
    uv__run_closing_handles(loop);

    if (mode == UV_RUN_ONCE) {
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

  /* The if statement lets gcc compile it to a conditional store. Avoids
   * dirtying a cache line.
   */
  if (loop->stop_flag != 0)
    loop->stop_flag = 0;

  return r;
}

任務調度

瞭解了 Libuv 的事件循環流程,接下來結合 JS 代碼具體看看 NodeJS 是如何進行任務調度的。

目前,主要有五種主要類型的隊列被 Libuv 的事件循環所處理:

除了以上五種主要的任務列表,還有額外兩種不屬於 libuv 而是作爲 NodeJS 一部分的任務隊列:

nextTicks 隊列和其他微任務隊列會在事件循環每一階段穿插調用,nextTicks 優先級會比其他微任務隊列更高。

示例

// timer -> pending -> idle -> prepare -> poll io -> check -> close

// timer phase
setTimeout(() ={
    Promise.resolve().then(() ={
        console.log('promise resolve in timeout');
        process.nextTick(() ={
            console.log('tick task in timeout promise');
        });
    })
    process.nextTick(() ={
        console.log('tick task in timeout');
        process.nextTick(() ={
            console.log("tick task in timeout->tick");
        })
    });
    console.log('timer task');

}, 0);

// check phase
setImmediate(() ={
    process.nextTick(() ={
        console.log('imeediate->tick task')
    });
    console.log('immediate task');
});

Promise.resolve().then(() ={
    console.log('promise resolve');
});

process.nextTick(() ={
    console.log("tick task");
});

console.log('run main thread');
// result
run main thread
tick task
promise resolve
timer task
tick task in timeout
tick task in timeout->tick
promise resolve in timeout
tick task in timeout promise
immediate task
imeediate->tick task

現在解讀一下以上的執行流程:

NodeJS 在經過一系列初始化工作後,開始執行用戶 JS 代碼,解釋執行過程中,分別把 setTimeout、setImmediate、Promise、nextTick 函數的回調插入 timer、immediate、microtask 和 nexttick 隊列。

  1. 執行主線程 console.log("run main thread"),打印 "run main thread"

  2. 進入 timer 階段前,發現 nextTick 、promise 隊列有任務如下:

nextTicks = [() ={
    console.log("tick task");
}];

microtasks = [() ={
    console.log('promise resolve');
}];

分別打印 "tick task" 以及 "promise resolve"

  1. 進入 timer 階段,執行定時器回調,定時器回調中再次往 microtask 和 nextTick 插入新的任務如下:
nextTicks = [() ={
        console.log('tick task in timeout');
        process.nextTick(() ={
            console.log("tick task in timeout->tick");
        })
    }];

microtasks = [() ={
        console.log('promise resolve in timeout');
        process.nextTick(() ={
            console.log('tick task in timeout promise');
        });
    }];

打印主線程任務中的 "timer task",再進入下一階段,發現 nextTicks 和 microtasks 隊列爲非空,執行微任務。由於 nextTicks 優先級更高,先打印 "tick task in timeout",然後又往 nextTicks 插入 () => {console.log("tick task in timeout->tick")} ,繼續執行 nextTicks 任務打印 "tick task in timeout->tick"。

此時 nextTicks 隊列已空,執行 miacrotasks 隊列,打印 "promise resolve in timeout",此時又往 nextTicks 插入任務 () => {console.log('tick task in timeout promise')},繼續執行 nextTick 任務,打印 "tick task in timeout promise"。

進入 check 階段(Immediate),爲 nextTicks 添加 () => {console.log('imeediate->tick task') },主線程打印 "immediate task",進入下一階段前先執行 nextTicks 任務,打印 'imeediate->tick task'

拓展

setImmediate(() => console.log('this is set immediate 1'));
setImmediate(() => console.log('this is set immediate 2'));
setImmediate(() => console.log('this is set immediate 3'));

setTimeout(() => console.log('this is set timeout 1'), 0);
setTimeout(() ={
    console.log('this is set timeout 2');
    process.nextTick(() => console.log('this is process.nextTick added inside setTimeout'));
}, 0);
setTimeout(() => console.log('this is set timeout 3'), 0);
setTimeout(() => console.log('this is set timeout 4'), 0);
setTimeout(() => console.log('this is set timeout 5'), 0);

process.nextTick(() => console.log('this is process.nextTick 1'));
process.nextTick(() ={
    process.nextTick(console.log.bind(console, 'this is the inner next tick inside next tick'));
});
process.nextTick(() => console.log('this is process.nextTick 2'));
process.nextTick(() => console.log('this is process.nextTick 3'));
process.nextTick(() => console.log('this is process.nextTick 4'));

I/O 模型

得益於 Libuv 這一跨平臺的高性能異步 I/O 庫,使得 NodeJS 在處理 I/O 密集型任務上十分彰顯優勢。下面結合不同的 I/O 模型,對比分析一下 NodeJS 目前工程實踐所採用的 I/O 模型的優越性。

首先理清一下阻塞和非阻塞、異步和同步的概念:

在應用程序通過 I/O 函數申請讀寫數據時,如果在數據就緒前進程一直在等待的,就是阻塞 I/O,即發起 I/O 請求時是阻塞的

數據從內核緩衝區到到用戶內存複製過程中,需要用戶進程等待,就是同步 I/O,即實際的 I/O 讀寫是同步的

同步阻塞

圖片來源:https://www.51cto.com/article/693213.html

在網絡編程中,當調用 recvfrom 獲取客戶端數據時,首先會阻塞進程,等待數據通過網卡到內核緩衝區;當數據就緒後,再同步等待指代數據從內核緩衝區拷貝到用戶空間,此時用戶進程再進行數據處理。

同步阻塞 I/O 模型是最簡單的 I/O 模型,好處是使用簡單,通常在 fd 較少、數據就緒很快的場景,缺點是如果內核數據一直沒準備好,則用戶進程將會一直阻塞無法執行後續任務。

以網絡編程爲例,默認情況下 socket 是 blocking 的,即函數 accept , recvfrom 等,需等待函數執行結束之後才能夠返回 (此時操作系統切換到其他進程執行)。accpet 等待到有 client 連接請求並接受成功之後,recvfrom 需要讀取完 client 發送的數據之後才能夠返回

// 創建套接字
int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// IP 和端口配置 ...
// ...
// 綁定 IP 和端口
bind(serv_sock, ...);
// 監聽來自監聽型套接字的請求
listen(serv_sock, ...);

int clnt_sock;
while (1) {
    // 創建新的通信型套接字用於接收來自客戶端的請求,此時會阻塞程序執行,直到有請求到來
    clnt_sock = accept(serv_sock, ...);
    // 接收客戶端的數據,同步阻塞 I/O,等待數據就緒
    recvfrom(clnt_sock, ...);
    // 處理數據
    handle(data);
}

同步非阻塞

圖片來源:https://www.51cto.com/article/693213.html

同步非阻塞 I/O 的特點是當用戶進程發起網絡讀請求時,如果內核緩衝區還沒接收到客戶端數據,會立即返回 EWOULDBLOCK 錯誤碼,而不會阻塞用戶進程,用戶進程可結合輪詢調用方式繼續發起 recvfrom 調用,直到數據就緒,然後同步等待數據從內核緩衝區複製到用戶空間,然後用戶進程進行數據處理。

同步非阻塞 I/O 的優勢在於當發起 I/O 請求時不會阻塞用戶進程,一定程度上提升了程序的性能,但是爲了及時獲取數據的就緒狀態,需要頻繁輪詢,這樣也會消耗不小的 CPU 資源。

以網絡編程爲例,可設置 socket 爲 non-blocking 模式,使用 socket() 創建的 socket 默認是阻塞的;可使用函數 fcntl 可設置創建的 socket 爲非阻塞的,這樣使用原本 blocking 的各種函數(accept、recvfrom),可以立即獲得返回結果。通過判斷返回的 errno 瞭解狀態:

這樣就實現同步非阻塞 I/O 請求:

// 創建套接字
int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// 設置 socket 爲非阻塞 I/O
int flags = fcntl(serv_sock, F_GETFL, 0);
fcntl(serv_sock, F_SETFL, flags | O_NONBLOCK);


// IP 和端口配置 ...
// ...
// 綁定 IP 和端口
bind(serv_sock, ...);
// 監聽來自監聽型套接字的請求
listen(serv_sock, ...);


// 創建新的通信型套接字用於接收來自客戶端的請求
int clnt_sock;

while (1) {
    // 在non-blocking模式下,如果返回值爲-1,且 errno == EAGAIN 或errno == EWOULDBLOCK 表示no connections 沒有新連接請求;
    clnt_sock = accept(serv_sock, ...);
    if (clnt_sock == -1 && errno == EAGAIN) {
        fprintf(stderr, "no client connections\n");
        continue;
    } else if (clnt_sock == -1) {
        perror("accept failed");
    }
    
    // 接收客戶端的數據,同步非阻塞 I/O,在non-blocking模式下,如果返回值爲-1,且 errno == EAGAIN表示沒有可接受的數據或正在接受尚未完成;
    while (1) {
        int ret = recvfrom(clnt_sock, ...);
        if (ret == -1 && errno == EAGAIN) {
        fprintf(stderr, "no data ready\n");
            continue;
        } else if (ret == -1) {
            perror("read failed");
        }
        // 處理數據
        handle(data);
    }
}

I/O 多路複用

圖片來源:https://www.51cto.com/article/693213.html

上述兩種 I/O 模型均是面向單個客戶端連接的,同一時間只能處理一個 client 請求,雖然可以通過多進程 / 多線程的方法解決,但是多進程 / 多線程需要考慮額外的資源消耗以及同步互斥的相關問題。

爲了高效解決多個 fd 的狀態監聽,I/O 多路複用技術應運而生。

I/O 多路複用的核心思想是可以同時監聽多個不同的 fd(網絡環境下即是網絡套接字),當套接字中的任何一個數據就緒了,就可以通知用戶進程,此時用戶進程再發起 recvfrom 請求去讀取數據。

以網絡編程爲例,可通過維護一個需要監聽的所有 socket 的 fd 列表,然後調用 select/epoll 等監聽函數,如果 fd 列表中所有 socket 都沒有數據就緒,則 select/epoll 會阻塞,直到有一個 socket 接收到數據,然後喚醒進程。

int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// 設置 socket 爲非阻塞 I/O
int flags = fcntl(serv_sock, F_GETFL, 0);
fcntl(serv_sock, F_SETFL, flags | O_NONBLOCK);

// IP 和端口配置 ...
// ...
// 綁定 IP 和端口
bind(serv_sock, ...);
// 監聽來自監聽型套接字的請求
listen(serv_sock, ...);

// 存放需要監聽的 socket 列表
fd_set readfds;
// 添加 需要監聽的 socket 到 readfds
FD_SET(serv_sock, readfds);

// 創建新的通信型套接字用於接收來自客戶端的請求
int clnt_sock;
// 調用 select 返回的結果值
int res;
// 預計可接受的最大連接數量,select 最大支持 1024 個 fd
int maxfd = 1000;
while (1) {
    // 調用 select 阻塞監聽 fd 列表,直到有一個 socket 接收到請求,喚醒進程
    res = select(maxfd + 1, &readfds, ...);
    if (res == -1) {
        perror("select failed");
        exit(EXIT_FAILURE);
    } else if (res == 0) {
        fprintf(stderr, "no socket ready for read\n");
    }
    // 遍歷每個 socket,如果是 serv_sock 則 accept,否則進行讀操作
    for (int i = 0; i <= maxfd; i++) {
        // 是否 socket 是否在 監聽的 fd 列表中
        if (!FD_ISSET(i, &readfds)) {
            continue;
        }
        
        if (i == serv_sock) {
            // 當前請求是 server sock,則建立 accept 連接
            clnt_sock = accpet(serv_sock, ...);
            // 將新建立的客戶端連接添加進行 readfds 監聽列表中
            FD_SET(clnt_sock, &readfds);
        } else {
            // 當請求是客戶端的 socket,接收客戶端的數據,此時數據已經就緒,將數據從內核空間複製到用戶空間
            int ret = recvfrom(i, ...);
            if (ret == -1 && errno == EAGAIN) {
            fprintf(stderr, "no data ready\n");
                continue;
            } else if (ret == -1) {
                perror("read failed");
            }
            // 處理數據
            handle(data);
        }
    }
}

上面是使用 select 函數實現的 I/O 多路複用,實際在 Libuv 採用的是 epoll 函數,epoll 函數是爲了解決 select 的以下缺點而誕生的:

圖片來源:https://www.51cto.com/article/693213.html

epoll 的優勢在於:

int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// 設置 socket 爲非阻塞 I/O
int flags = fcntl(serv_sock, F_GETFL, 0);
fcntl(serv_sock, F_SETFL, flags | O_NONBLOCK);

// IP 和端口配置 ...
// ...
// 綁定 IP 和端口
bind(serv_sock, ...);
// 監聽來自監聽型套接字的請求
listen(serv_sock, ...);

// 創建 epoll 對象
int MAX_EVENTS = 5; // 告訴內核可能需要監聽的 fd 數量,如果使用時大於該數,則內核會申請動態申請工多空間
int epoll_fd = epoll_create(MAX_EVENTS);

if (epoll_fd == -1) {
    printf("epoll_create error!\n");
    return -1;
}

// 註冊 serv_sock 所監聽的事件
struct epoll_event ev;
struct epoll_event events[MAX_EVENTS];

ev.data.fd = serv_sock; // 設置該事件的 fd 爲 serv_sock
ev.events = EPOLLIN; // 設置監聽 serv_sock 的可讀事件


// 添加 serv_sock 到 epoll 的可讀事件監聽隊列中
int ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serv_sock, &ev);

if (ret == -1) {
    printf("epoll_ctl error!\n");
    return -1;
}

int connfd = 0; // 與客戶端連接成功後的通信型 fd
while (1) {
    // int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
    // 等待 epoll_fd 中的事件,如果 serv_sock 有可讀事件發生,則函數返回就緒後的 fd 數量
    // 最後一個 timeout 參數可用來控制 epoll_wait 的等待事件發生的時間,-1 爲阻塞等待,0 爲非阻塞立即返回
    int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
    
    for (int i = 0; i < nfds; i++) {
        // 客戶端發起請求
        if (events[i].data.fd == serv_sock) {
            connfd = accept(serv_sock, ...);
            if (connfd == -1) {
                printf("accept error!\n");
            }
            
            ev.data.fd = connfd; // 設置該事件的 fd 爲當前的 connfd
            ev.events = EPOLLIN; // 設置當前的 connfd 的可讀事件
             
             // 添加當前 connfd 到 epoll 的可讀事件監聽隊列中
            if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, connfd, &ev) == -1) {
                printf("epoll_ctl add error!\n");
                return -1;
            }
        } else {
            // 某個來自客戶端的請求的數據已就緒
            int ret = recvfrom(i, ...);
            if (ret == -1 && errno == EAGAIN) {
            fprintf(stderr, "no data ready\n");
                continue;
            } else if (ret == -1) {
                perror("read failed");
            }
            // 處理數據
            handle(data);
        }
    }
}

以上就是基於 epoll 機制的事件驅動型的 I/O 多路複用模型,服務器通過註冊文件描述符及其對應監聽的事件到 epoll(epoll_ctl),epoll 開始阻塞監聽事件直到有某個 fd 的監聽事件觸發(epoll_wait),然後就遍歷就緒事件,根據 fd 類型的不同執行不同的任務。

服務器架構

單進程單線程 · 串行模型

int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// IP 和端口配置 ...
// ...
// 綁定 IP 和端口
bind(serv_sock, ...);
// 監聽來自監聽型套接字的請求
listen(serv_sock, ...);

int clnt_sock;
while (1) {
    // 創建新的通信型套接字用於接收來自客戶端的請求,此時會阻塞程序執行,直到有請求到來
    clnt_sock = accept(serv_sock, ...);
    // 接收客戶端的數據,同步阻塞 I/O,等待數據就緒
    recvfrom(clnt_sock, ...);
    // 處理數據
    handle(data);
}

單進程單線程 · 串行處理請求是最簡單的服務器架構,先從經過三次握手,然後從連接隊列中獲取客戶端連接節點(accept 返回的套接字),然後從客戶端的套接字獲取數據進行處理,接下來再進行下個連接節點處理。

在併發連接數較大的情況下,並且採用的是阻塞式 I/O 模型,那麼處理客戶端連接的效率就會非常低。

多進程 / 多線程

單進程串行處理請求因爲阻塞 I/O 導致連接隊列中的節點被阻塞導致處理效率低下,通過把請求分給多個進程處理從而提升效率,人多力量大。在多進程 / 多線程架構下,如果一個請求發送阻塞 I/O,那麼操作系統會掛起該進程,接着調度其他進程,實現併發處理能力的提高。

但這種架構模式下的性能瓶頸在於系統的進程數、線程數是有限的,開闢進程和線程的開銷也是需要考慮的問題,系統資源消耗高。

int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// IP 和端口配置 ...
// ...
// 綁定 IP 和端口
bind(serv_sock, ...);
// 監聽來自監聽型套接字的請求
listen(serv_sock, ...);

int clnt_sock;
int pid;
while (1) {
    // 創建新的通信型套接字用於接收來自客戶端的請求,此時會阻塞程序執行,直到有請求到來
    clnt_sock = accept(serv_sock, ...);
    pid = fork();
    
    if (pid < 0) {
        perror("fork failed");
        return -1;
    }
    if (fork() > 0) {
        // 父進程
        continue;
    } else {
        // 子進程
        // 接收客戶端的數據,同步阻塞 I/O,等待數據就緒
        recvfrom(clnt_sock, ...);
        // 處理數據
        handle(data);
    }
}

單進程單線程 · 事件驅動

除了通過多進程 / 多線程方式去應對併發量大的場景,基於 I/O 多路複用模型的單進程單線程 · 事件驅動架構也是較好的解決方案,同時由於是單線程,所以不會因開闢大量進場 / 線程所帶來的資源開銷以及同步互斥的問題。

單線程不適合執行 CPU 密集型任務,因爲如果任務一直佔用 CPU 時間,則後續任務無法執行,因此針對大量 CPU 計算、引起進程阻塞的任務,可引入線程池技術去解決。

目前 NodeJS 就是採用這種設計架構,所有 JS 代碼跑在主線程中(單線程),基於 I/O 多路複用的模型去實現事件驅動的多讀寫請求的管理,配合線程池,將 CPU 密集型任務從主線程分離出來,以保證主線程的高效響應。

解答

緊接着 NodeJS 執行用戶 JS 代碼,用戶 JS 代碼執行一些初始化的邏輯以及往事件循環註冊任務,然後進程就進入事件循環的階段。

整個事件循環分爲 7 個階段,timer 處理定時器任務,pending 處理 poll io 階段的成功或錯誤回調,idle、prepare、check 是自定義階段,poll io 主要處理網絡 I/O,文件 I/O 等任務,close 處理關閉的回調任務,同時在各個事件階段還會穿插微任務隊列。

以開篇的 TCP 服務爲例,當創建 TCP 服務器調用原生 JS 的 net 模塊的 server.listen 方法後, net 模塊就會引用 C++ 的 TCP 模塊實例化一個 TCP 服務器,內部調用了 Libuv 的 uv_tcp_init 方法,該方法封裝了 C 中用於創建套接字的 socket 函數;接着就是調用 C++ 的 TCP 模塊的 Bind 方法,該方法封裝了 Libuv 的 uv_ip_addr 以及 uv_tcp_bind,分別用於設置 TCP 的 IP 地址和端口信息以及調用 C 中的 bind 方法用於綁定地址信息。

然後 net 模塊註冊 onconnect 回調函數,該函數將在客戶端請求到來後,在 Libuv 的 poll io 階段執行,onconnect 函數調用了 C++ 的 ConnectionWrap::OnConnection 方法,內部調用了 Libuv 的 uv_accpet 去接收來自客戶端的連接。最後調用 TCP 實例的 listen 方法使得服務器進入被動監聽狀態,listen 使用了 C++ 的 TCPWrap::Listen 方法,該方法是對 uv_listen 的封裝,最終調用的 C 的 listen 方法。

當客戶端請求通過網卡傳遞過來,對應的監聽型 socket 發生狀態變更,事件循環模塊根據命中之前設置的可讀事件,將 onconnection 回調插入 poll io 階段的任務隊列,當新一輪的事件循環到達 poll io 時執行回調,調用 accept 方法創建與客戶端的通信型 socket,此時進入進程阻塞,經過三次握手後,建立與客戶端的連接,將用戶 JS 的回調插入 poll io 的任務隊列,在新一輪的事件循環中進行數據的處理。

TCP 服務器在啓動之後,就往 NodeJS 的事件循環系統插入 listen 的監聽任務,該任務會一直阻塞監聽(不超過 timeout)來自客戶端的請求,當發生請求後,建立連接然後進行數據處理後,再會進入監聽請求的阻塞狀態,新一輪的事件循環發現 poll io 隊列還有任務所以不會退出事件循環,從而驅動進程一直運行。

NodeJS 採用的是單線程 + 事件驅動的服務端架構,首先對於事件循環以外的代碼會在初始化時執行完,然後進程就進入事件循環,針對網絡 I/O NodeJS 底層採用的是 I/O 多路複用模型,通過監聽就緒的連接做到從容應對大併發連接。對於網絡數據而言,當調用阻塞的 recvfrom 處理來自的網絡的數據,此時數據已經就緒,所以數據處理起來很快,如果是大文件,則需要業務代碼自行開闢線程去處理;對於文件 I/O,NodeJS 底層採用線程池的機制,在主線程外開闢工作線程去處理本地大文件,在處理完後通過事件通知機制告訴上層 JS 代碼。

參考資料

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/TD7K22i_N_Hp1sVK7n1gpQ