Node-js 流源碼解讀之可讀流

1. 基本概念

1.1. 流的歷史演變

流不是 Node.js 特有的概念。它們是幾十年前在 Unix 操作系統中引入的,程序可以通過管道運算符(|)對流進行相互交互。

在基於 Unix 系統的 MacOS 以及 Linux 中都可以使用管道運算符(|),它可以將運算符左側進程的輸出轉換成右側的輸入。

在 Node 中,我們使用傳統的 readFile 去讀取文件的話,會將文件從頭到尾都讀到內存中,當所有內容都被讀取完畢之後纔會對加載到內存中的文件內容進行統一處理。

這樣做會有兩個缺點:

  1. 內存方面:佔用大量內存

  2. 時間方面:需要等待數據的整個有效負載都加載完纔會開始處理數據

爲了解決上述問題,Node.js 效仿並實現了流的概念,在 Node.js 流中,一共有四種類型的流,它們都是 Node.js 中 EventEmitter 的實例:

  1. 可讀流(Readable Stream)

  2. 可寫流(Writable Stream)

  3. 可讀可寫全雙工流(Duplex Stream)

  4. 轉換流(Transform Stream)

爲了深入學習這部分的內容,循序漸進的理解 Node.js 中流的概念,並且由於源碼部分較爲複雜,本人決定先從可讀流開始學習這部分內容。

1.2. 什麼是流(Stream)

流是一種抽象的數據結構,是數據的集合,其中存儲的數據類型只能爲以下類型(僅針對 objectMode === false 的情況):

我們可以把流看作這些數據的集合,就像液體一樣,我們先把這些液體保存在一個容器裏(流的內部緩衝區 BufferList),等到相應的事件觸發的時候,我們再把裏面的液體倒進管道里,並通知其他人在管道的另一側拿自己的容器來接裏面的液體進行處理。

1.3. 什麼是可讀流(Readable Stream)

可讀流是流的一種類型,它有兩種模式三種狀態。

兩種讀取模式:

  1. 流動模式:數據會從底層系統讀取,並通過 EventEmitter 儘快的將數據傳遞給所註冊的事件處理程序中

  2. 暫停模式:在這種模式下將不會讀取數據,必須顯示的調用 Stream.read () 方法來從流中讀取數據

三種狀態:

  1. readableFlowing === null:不會產生數據,調用 Stream.pipe ()、Stream.resume 會使其狀態變爲 true,開始產生數據並主動觸發事件

  2. readableFlowing === false:此時會暫停數據的流動,但不會暫停數據的生成,因此會產生數據積壓

  3. readableFlowing === true:正常產生和消耗數據

2. 基本原理

2.1. 內部狀態定義(ReadableState)

ReadableState

_readableState: ReadableState {
  objectMode: false, // 操作除了string、Buffer、null之外的其他類型的數據需要把這個模式打開
  highWaterMark: 16384, // 水位限制,1024 \* 16,默認16kb,超過這個限制則會停止調用\_read()讀數據到buffer中
  buffer: BufferList { head: null, tail: null, length: 0 }, // Buffer鏈表,用於保存數據
  length: 0, // 整個可讀流數據的大小,如果是objectMode則與buffer.length相等
  pipes: [], // 保存監聽了該可讀流的所有管道隊列
  flowing: null, // 可獨流的狀態 null、false、true
  ended: false, // 所有數據消費完畢
  endEmitted: false, // 結束事件收否已發送
  reading: false, // 是否正在讀取數據
  constructed: true, // 流在構造好之前或者失敗之前,不能被銷燬
  sync: true, // 是否同步觸發'readable'/'data'事件,或是等到下一個tick
  needReadable: false, // 是否需要發送readable事件
  emittedReadable: false, // readable事件發送完畢
  readableListening: false, // 是否有readable監聽事件
  resumeScheduled: false, // 是否調用過resume方法
  errorEmitted: false, // 錯誤事件已發送
  emitClose: true, // 流銷燬時,是否發送close事件
  autoDestroy: true, // 自動銷燬,在'end'事件觸發後被調用
  destroyed: false, // 流是否已經被銷燬
  errored: null, // 標識流是否報錯
  closed: false, // 流是否已經關閉
  closeEmitted: false, // close事件是否已發送
  defaultEncoding: 'utf8', // 默認字符編碼格式
  awaitDrainWriters: null, // 指向監聽了'drain'事件的writer引用,類型爲null、Writable、Set<Writable>
  multiAwaitDrain: false, // 是否有多個writer等待drain事件 
  readingMore: false, // 是否可以讀取更多數據
  dataEmitted: false, // 數據已發送
  decoder: null, // 解碼器
  encoding: null, // 編碼器
  [Symbol(kPaused)]: null
},

2.2. 內部數據存儲實現(BufferList)

BufferList 是用於流保存內部數據的容器,它被設計爲了鏈表的形式,一共有三個屬性 head、tail 和 length。

BufferList 中的每一個節點我把它表示爲了 BufferNode,裏面的 Data 的類型取決於 objectMode。

這種數據結構獲取頭部的數據的速度快於 Array.prototype.shift()。

2.2.1. 數據存儲類型

如果 objectMode === true:

那麼 data 則可以爲任意類型,push 的是什麼數據則存儲的就是什麼數據。

objectMode=true

const Stream = require('stream');
const readableStream = new Stream.Readable({
  objectMode: true,
  read() {},
});

readableStream.push({ name: 'lisa'});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(true);
console.log(readableStream._readableState.buffer.tail);
readableStream.push('lisa');
console.log(readableStream._readableState.buffer.tail);
readableStream.push(666);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(() => {});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(Symbol(1));
console.log(readableStream._readableState.buffer.tail);
readableStream.push(BigInt(123))
console.log(readableStream._readableState.buffer.tail);

運行結果:

如果 objectMode === false:

那麼 data 只能爲 string 或者 Buffer 或者 Uint8Array 。

objectMode=false

const Stream = require('stream');
const readableStream = new Stream.Readable({
  objectMode: false,
  read() {},
});
readableStream.push({ name: 'lisa'});

運行結果:

2.2.2. 數據存儲結構

我們在控制檯通過 node 命令行創建一個可讀流,來觀察 buffer 中數據的變化:

當然在 push 數據之前我們需要實現它的 _read 方法,或者在構造函數的參數中實現 read 方法:

const Stream = require('stream');
const readableStream = new Stream.Readable();
RS._read = function(size) {}

或者

const Stream = require('stream');
const readableStream = new Stream.Readable({
  read(size) {}
});

經過 readableStream.push ('abc') 操作之後,當前的 buffer 爲:

可以看到目前的數據存儲了,頭尾存儲的數據都是字符串'abc' 的 ascii 碼,類型爲 Buffer 類型,length 表示當前保存的數據的條數而非數據內容的大小。

2.2.3. 相關 API

打印一下 BufferList 的所有方法可以得到:

除了 join 是將 BufferList 序列化爲字符串之外,其他都是對數據的存取操作。

這裏就不一一講解所有的方法了,重點講一下其中的 consume 、_getString 和_getBuffer。

2.2.3.1. consume

源碼地址:BufferList.consume

comsume

// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
  const data = this.head.data;
  if (n < data.length) {
    // `slice` is the same for buffers and strings.
    const slice = data.slice(0, n);
    this.head.data = data.slice(n);
    return slice;
  }
  if (n === data.length) {
    // First chunk is a perfect match.
    returnthis.shift();
  }
  // Result spans more than one buffer.
  return hasStrings ? this.\_getString(n) : this.\_getBuffer(n);
}

代碼一共有三個判斷條件:

1. 如果所消耗的數據的字節長度小於鏈表頭節點存儲數據的長度,則將頭節點的數據取前 n 字節,並把當前頭節點的數據設置爲切片之後的數據

  1. 如果所消耗的數據恰好等於鏈表頭節點所存儲的數據的長度,則直接返回當前頭節點的數據

  1. 如果所消耗的數據的長度大於鏈表頭節點的長度,那麼會根據傳入的第二個參數進行最後一次判斷,判斷當前的 BufferList 底層存儲的是 string 還是 Buffer

2.2.3.2. _getBuffer

源碼地址:BufferList._getBuffer

comsume

// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
  const ret = Buffer.allocUnsafe(n);
  const retLen = n;
  let p = this.head;
  let c = 0;
  do {
    const buf = p.data;
    if (n > buf.length) {
      TypedArrayPrototypeSet(ret, buf, retLen - n);
      n -= buf.length;
    } else {
      if (n === buf.length) {
        TypedArrayPrototypeSet(ret, buf, retLen - n);
        ++c;
        if (p.next)
          this.head = p.next;
        else
          this.head = this.tail = null;
      } else {
       TypedArrayPrototypeSet(ret,
                              newUint8Array(buf.buffer, buf.byteOffset, n),
                              retLen - n);
        this.head = p;
        p.data = buf.slice(n);
      }
      break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}

總的來說就是循環對鏈表中的節點進行操作,新建一個 Buffer 數組用於存儲返回的數據。

首先從鏈表的頭節點開始取數據,不斷的複製到新建的 Buffer 中,直到某一個節點的數據大於等於要取的長度減去已經取得的長度。

或者說讀到鏈表的最後一個節點後,都還沒有達到要取的長度,那麼就返回這個新建的 Buffer。

2.2.3.3. _getString

源碼地址:BufferList._getString

comsume

// Consumes a specified amount of characters from the buffered data.
_getString(n) {
  let ret = '';
  let p = this.head;
  let c = 0;
  do {
    const str = p.data;
    if (n > str.length) {
    ret += str;
    n -= str.length;
  } else {
    if (n === str.length) {
      ret += str;
      ++c;
      if (p.next)
        this.head = p.next;
      else
        this.head = this.tail = null;
    } else {
      ret += StringPrototypeSlice(str, 0, n);
      this.head = p;
      p.data = StringPrototypeSlice(str, n);
    }
    break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}

對於操作字符串來說和操作 Buffer 是一樣的,也是循環從鏈表的頭部開始讀數據,只是進行數據的拷貝存儲方面有些差異,還有就是 _getString 操作返回的數據類型是 string 類型。

2.3. 爲什麼可讀流是 EventEmitter 的實例?

對於這個問題而言,首先要了解什麼是發佈訂閱模式,發佈訂閱模式在大多數 API 中都有重要的應用,無論是 Promise 還是 Redux,基於發佈訂閱模式實現的高級 API 隨處可見。

它的優點在於能將事件的相關回調函數存儲到隊列中,然後在將來的某個時刻通知到對方去處理數據,從而做到關注點分離,生產者只管生產數據和通知消費者,而消費者則只管處理對應的事件及其對應的數據,而 Node.js 流模式剛好符合這一特點。

那麼 Node.js 流是怎樣實現基於 EventEmitter 創建實例的呢?

這部分源碼在這兒:stream/legacy

legacy

function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);

然後在可讀流的源碼中有這麼幾行代碼:

這部分源碼在這兒:readable

legacy

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

首先將 Stream 的原型對象繼承自 EventEmitter,這樣 Stream 的所有實例都可以訪問到 EventEmitter 上的方法。

同時通過 ObjectSetPrototypeOf (Stream, EE) 將 EventEmitter 上的靜態方法也繼承過來,並在 Stream 的構造函數中,借用構造函數 EE 來實現所有 EventEmitter 中的屬性的繼承,然後在可讀流裏,用同樣的的方法實現對 Stream 類的原型繼承和靜態屬性繼承,從而得到:

Readable.prototype.proto === Stream.prototype;

Stream.prototype.proto === EE.prototype

因此:

Readable.prototype.proto.proto === EE.prototype

所以捋着可讀流的原型鏈可以找到 EventEmitter 的原型,實現對 EventEmitter 的繼承。

2.4. 相關 API 的實現

這裏會按照源碼文檔中 API 的出現順序來展示,且僅解讀其中的核心 API 實現。

注:此處僅解讀 Node.js 可讀流源碼中所聲明的函數,不包含外部引入的函數定義,同時爲了減少篇幅,不會將所有代碼都拷貝下來。

Readable.prototype

Stream {
  destroy: [Function: destroy],
  _undestroy: [Function: undestroy],
  _destroy: [Function (anonymous)],
  push: [Function (anonymous)],
  unshift: [Function (anonymous)],
  isPaused: [Function (anonymous)],
  setEncoding: [Function (anonymous)],
  read: [Function (anonymous)],
  _read: [Function (anonymous)],
  pipe: [Function (anonymous)],
  unpipe: [Function (anonymous)],
  on: [Function (anonymous)],
  addListener: [Function (anonymous)],
  removeListener: [Function (anonymous)],
  off: [Function (anonymous)],
  removeAllListeners: [Function (anonymous)],
  resume: [Function (anonymous)],
  pause: [Function (anonymous)],
  wrap: [Function (anonymous)],
  iterator: [Function (anonymous)],
  [Symbol(nodejs.rejection)]: [Function (anonymous)],
  [Symbol(Symbol.asyncIterator)]: [Function (anonymous)]
}

2.4.1. push

readable.push

Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};

push 方法的主要作用就是將數據塊通過觸發'data' 事件傳遞給下游管道,或者將數據存儲到自身的緩衝區中。

以下代碼爲相關僞代碼,僅展示主流程:

readable.push

function readableAddChunk(stream, chunk, encoding, addToFront) {
  const state = stream.\_readableState;
  if (chunk === null) { // push null 流結束信號,之後不能再寫入數據
    state.reading = false;
    onEofChunk(stream, state);
  } elseif (!state.objectMode) { // 如果不是對象模式
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk);
    } elseif (chunk instanceof Buffer) { //如果是Buffer
    // 處理一下編碼
    } elseif (Stream.\_isUint8Array(chunk)) {
      chunk = Stream.\_uint8ArrayToBuffer(chunk);
    } elseif (chunk != null) {
      err = new ERR\_INVALID\_ARG\_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  if (state.objectMode || (chunk && chunk.length > 0)) { // 是對象模式或者chunk是Buffer
    // 這裏省略幾種數據的插入方式的判斷
    addChunk(stream, state, chunk, true);
  }
}

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync &&
    stream.listenerCount('data') > 0) { // 如果處於流動模式,有監聽data的訂閱者
      stream.emit('data', chunk);
  } else { // 否則保存數據到緩衝區中
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) {
      state.buffer.unshift(chunk);
    } else {
      state.buffer.push(chunk);
    }
  }
  maybeReadMore(stream, state); // 嘗試多讀一點數據
}

push 操作主要分爲對 objectMode 的判斷,不同的類型對傳入的數據會做不同的操作:

其中 addChunk 的第一個判斷主要是處理 Readable 處於流動模式、有 data 監聽器、並且緩衝區數據爲空時的情況。

這時主要將數據 passthrough 透傳給訂閱了 data 事件的其他程序,否則就將數據保存到緩衝區裏面。

2.4.2. read

除去對邊界條件的判斷、流狀態的判斷,這個方法主要有兩個操作

  1. 調用用戶實現的_read 方法,對執行結果進行處理

  2. 從緩衝區 buffer 中讀取數據,並觸發'data' 事件

readable.read

// 如果read的長度大於hwm,則會重新計算hwm
if (n > state.highWaterMark) {
  state.highWaterMark = computeNewHighWaterMark(n);  
}
// 調用用戶實現的\_read方法
try {
  const result = this.\_read(state.highWaterMark);
  if (result != null) {
    const then = result.then;
    if (typeof then === 'function') {
      then.call(
        result,
        nop,
        function(err) {
          errorOrDestroy(this, err);
        });
    }
  }
} catch (err) {
  errorOrDestroy(this, err);
}

如果說用戶實現的_read 方法返回的是一個 promise,則調用這個 promise 的 then 方法,將成功和失敗的回調傳入,便於處理異常情況。

read 方法從緩衝區裏讀區數據的核心代碼如下:

readable.read

function fromList(n, state) {
  // nothing buffered.
  if (state.length === 0)
    returnnull;
  let ret;
  if (state.objectMode)
    ret = state.buffer.shift();
  elseif (!n || n >= state.length) { // 處理n爲空或者大於緩衝區的長度的情況
    // Read it all, truncate the list.
    if (state.decoder) // 有解碼器,則將結果序列化爲字符串
      ret = state.buffer.join('');
    elseif (state.buffer.length === 1) // 只有一個數據,返回頭節點數據
      ret = state.buffer.first();
    else// 將所有數據存儲到一個Buffer中
      ret = state.buffer.concat(state.length);
    state.buffer.clear(); // 清空緩衝區
  } else {
    // 處理讀取長度小於緩衝區的情況
    ret = state.buffer.consume(n, state.decoder);
  }
  return ret;
}

2.4.3. _read

用戶初始化 Readable stream 時必須實現的方法,可以在這個方法裏調用 push 方法,從而持續的觸發 read 方法,當我們 push null 時可以停止流的寫入操作。

示例代碼:

readable._read

const Stream = require('stream');
const readableStream = new Stream.Readable({
  read(hwm) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 122) {
      this.push(null);
    }
  },
});
readableStream.currentCharCode = 97;
readableStream.pipe(process.stdout);
// abcdefghijklmnopqrstuvwxyz%

2.4.4. pipe(重要)

將一個或多個 writable 流綁定到當前的 Readable 流上,並且將 Readable 流切換到流動模式。

這個方法裏面有很多的事件監聽句柄,這裏不會一一介紹:

readable.pipe

Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this.\_readableState;
  state.pipes.push(dest); // 收集Writable流

  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      pause();
    }
  }
  // Tell the dest that it's being piped to.
  dest.emit('pipe', src);
  // 啓動流,如果流處於暫停模式
  if (dest.writableNeedDrain === true) {
    if (state.flowing) {
      pause();
    }
  } elseif (!state.flowing) {
    src.resume();
  }
  return dest;
}

pipe 操作和 Linux 的管道操作符 '|' 非常相似,將左側輸出變爲右側輸入,這個方法會將可寫流收集起來進行維護,並且當可讀流觸發'data' 事件。

有數據流出時,就會觸發可寫流的寫入事件,從而做到數據傳遞,實現像管道一樣的操作。並且會自動將處於暫停模式的可讀流變爲流動模式。

2.4.5. resume

使流從 '暫停' 模式切換到 '流動' 模式,如果設置了'readable' 事件監聽,那麼這個方法其實是沒有效果的

readable.resume

Readable.prototype.resume = function() {
  const state = this._readableState;
  if (!state.flowing) {
    state.flowing = !state.readableListening; // 是否處於流動模式取決於是否設置了'readable'監聽句柄
    resume(this, state);
  }
};

function resume(stream, state) {
  if (!state.resumeScheduled) { // 開關,使resume_方法僅在同一個Tick中調用一次
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  if (!state.reading) {
    stream.read(0);
  }
  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);
}

function flow(stream) { // 當流處於流模式該方法會不斷的從buffer中讀取數據,直到緩衝區爲空
  const state = stream._readableState;
  while (state.flowing && stream.read() !== null); 
  // 因爲這裏會調用read方法,設置了'readable'事件監聽器的stream,也有可能會調用read方法,
  //從而導致數據不連貫(不影響data,僅影響在'readable'事件回調中調用read方法讀取數據)
}

2.4.6. pause

將流從流動模式轉變爲暫停模式,停止觸發'data' 事件,將所有的數據保存到緩衝區

readable.pause

Readable.prototype.pause = function() {
  if (this._readableState.flowing !== false) {
    debug('pause');
    this._readableState.flowing = false;
    this.emit('pause');
  }
  returnthis;
};

2.5. 使用方法與工作機制

使用方法在 BufferList 部分已經講過了,創建一個 Readable 實例,並實現其_read () 方法,或者在構造函數的第一個對象參數中實現 read 方法。

2.5.1. 工作機制

這裏只畫了大致的流程,以及 Readable 流的模式轉換觸發條件。

其中:

3. 總結

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/O59JUnbWshJSWrV8NBL5Ng