深入理解 node 中的文件流

爲什麼要使用文件流

想象這樣一個場景,我要處理一個 10G 的文件,但我的內存大小隻有 2G,該怎麼辦?

我們可以分 5 次讀取文件,每次只讀取 2G 的數據,這樣就可以解決這個問題,那麼這個分段讀取的過程就是流!

在 node 中 stream 模塊封裝了流的基本操作,文件流也是直接依賴的此模塊,這裏我們藉助文件流來深入理解 stream

文件可讀流

讀取文件,將文件內容一點一點的讀入內存當中。

使用方式

我們先看一下基本的使用方式。

const fs = require('fs')

const rs = fs.createReadStream('./w-test.js')

rs.on('data'(chunk) ={
  console.log(chunk)
})

rs.on('close'() ={
  console.log('close')
})

如上代碼所示,我們通過 fs.createStream() 創建了一個可讀流,用來讀取 w-test.js 文件。

on('data') 時,會自動的讀取文件數據,每次默認讀取 64kb 的內容,也可以通過 highWaterMark 參數來動態改變每次內容流程的閾值。

文件讀取完畢後會自動觸發 close 事件。

如下代碼爲 createReadStream 可以配置的參數

const rs = fs.createReadStream('./w-test.js'{
  flags: 'r', // 文件系統表示,這裏是指以可讀的形式操作文件
  encoding: null, // 編碼方式
  autoClose: false, // 讀取完畢時是否自動觸發 close 事件
  start: 0, // 開始讀取的位置
  end: 2, // 結束讀取的位置
  highWaterMark: 2 // 每次讀取的內容大小
})

注意: start 跟 end 都是包含的,即 [start, end]。

其實,fs.crateReadStream 就是返回一個 fs.ReadStream 類的實例,所以上述代碼就等同於:

const rs = new fs.ReadStream('./w-test.js'{
  flags: 'r', // 文件系統表示,這裏是指以可讀的形式操作文件
  encoding: null, // 編碼方式
  autoClose: false, // 讀取完畢時是否自動觸發 close 事件
  start: 0, // 開始讀取的位置
  end: 2, // 結束讀取的位置
  highWaterMark: 2 // 每次讀取的內容大小
})

文件可讀流的實現

瞭解完使用方式,那我們就應該嘗試從原理上去搞定它,接下來,我們手寫一個可讀流。

fs.read / fs.open

可讀流的本質就是分批讀取文件數據,而 fs.read() 方法可以控制讀取文件的內容大小

fs.read(fd, buffer, offset, length, position, callback)

讀取文件需要一個文件標識符,我們應該需要使用 fs.open 來獲取

初始化

首先,ReadStream 是一個類,從表現上來看這個類可以監聽事件即 on('data'),所以我們應該讓它繼承自 EventEmitter,如下代碼:

class ReadStream extends EventEmitter {
  constructor() {
    super();
  }
}

然後我們初始化參數,並打開文件,如下代碼(代碼中會對關鍵代碼作註釋):

class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super()

    // 解析參數
    this.path = path
    this.flags = options.flags ?? 'r'
    this.encoding = options.encoding ?? 'utf8'
    this.autoClose = options.autoClose ?? true
    this.start = options.start ?? 0
    this.end = options.end ?? undefined
    this.highWaterMark = options.highWaterMark ?? 16 * 1024

    // 文件的偏移量
    this.offset = this.start

    // 是否處於流動狀態,調用 pause 或 resume 方法時會用到,下文會講到
    this.flowing = false

    // 打開文件
    this.open()

    // 當綁定新事件時會觸發 newListener
    // 這裏當綁定 data 事件時,自動觸發文件的讀取
    this.on('newListener'(type) ={
      if (type === 'data') {
        // 標記爲開始流動
        this.flowing = true
        // 開始讀取文件
        this.read()
      }
    })
  }
}

open

open 方法如下:

open() {
  fs.open(this.path, this.flags, (err, fd) ={
    if (err) {
      // 文件打開失敗觸發 error 事件
      this.emit('error', err)
      return
    }

    // 記錄文件標識符
    this.fd = fd
    // 文件打開成功後觸發 open 事件
    this.emit('open')
  })
}

當打開文件後記錄下文件標識符,即 this.fd

read

read 方法如下:

read() {
  // 由於 ```fs.open``` 是異步操作,
  // 所以當調用 read 方法時,文件可能還沒有打開
  // 所以我們要等 open 事件觸發之後,再次調用 read 方法
  if (typeof this.fd !== 'number') {
    this.once('open'() => this.read())
    return
  }

  // 申請一個 highWaterMark 字節的 buffer,
  // 用來存儲從文件讀取的內容
  const buf = Buffer.alloc(this.highWaterMark)

  // 開始讀取文件
  // 每次讀取時,都記錄下文件的偏移量
  fs.read(this.fd, buf, 0, buf.length, this.offset, (err, bytesRead) ={
    this.offset += bytesRead

    // bytesRead 爲實際讀取的文件字節數
    // 如果 bytesRead 爲 0,則代表沒有讀取到內容,即讀取完畢
    if (bytesRead) {
      // 每次讀取都觸發 data 事件
      this.emit('data', buf.slice(0, bytesRead))
      // 如果處於流動狀態,則繼續讀取
      // 這裏當調用 pause 方法時,會將 this.flowing 置爲 false
      this.flowing && this.read()
    } else {
      // 讀取完畢後觸發 end 事件
      this.emit('end')

      // 如果可以自動關閉,則關閉文件並觸發 close 事件
      this.autoClose && fs.close(this.fd, () => this.emit('close'))
    }
  })
}

上述每行代碼都有註釋,相信也不難理解,這裏有幾個關鍵點要注意一下

pause

pause() {
  this.flowing =false
}

resume

resume() {
  if (!this.flowing) {
    this.flowing = true
    this.read()
  }
}

完整代碼

const { EventEmitter } = require('events')
const fs = require('fs')

class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super()

    this.path = path
    this.flags = options.flags ?? 'r'
    this.encoding = options.encoding ?? 'utf8'
    this.autoClose = options.autoClose ?? true
    this.start = options.start ?? 0
    this.end = options.end ?? undefined
    this.highWaterMark = options.highWaterMark ?? 16 * 1024
    this.offset = this.start
    this.flowing = false

    this.open()

    this.on('newListener'(type) ={
      if (type === 'data') {
        this.flowing = true
        this.read()
      }
    })
  }

  open() {
    fs.open(this.path, this.flags, (err, fd) ={
      if (err) {
        this.emit('error', err)
        return
      }

      this.fd = fd
      this.emit('open')
    })
  }

  pause() {
    this.flowing =false
  }

  resume() {
    if (!this.flowing) {
      this.flowing = true
      this.read()
    }
  }

  read() {
    if (typeof this.fd !== 'number') {
      this.once('open'() => this.read())
      return
    }

    const buf = Buffer.alloc(this.highWaterMark)
    fs.read(this.fd, buf, 0, buf.length, this.offset, (err, bytesRead) ={
      this.offset += bytesRead
      if (bytesRead) {
        this.emit('data', buf.slice(0, bytesRead))
        this.flowing && this.read()
      } else {
        this.emit('end')
        this.autoClose && fs.close(this.fd, () => this.emit('close'))
      }
    })
  }
}

文件可寫流

顧名思義了,將內容一點一點寫入到文件裏去。

fs.write

使用方式

// 使用方式 1:
const ws = fs.createWriteStream('./w-test.js')

// 使用方式 2:
const ws = new WriteStream('./w-test.js'{
  flags: 'w',
  encoding: 'utf8',
  autoClose: true,
  highWaterMark: 2
})

// 寫入文件
const flag = ws.write('2')

ws.on('drain'() => console.log('drain'))

文件可寫流的實現

初始化

先定義 WriteStream 類,並繼承 EventEmitter, 然後,初始化參數。注意看代碼註釋

const { EventEmitter } = require('events')
const fs = require('fs')

class WriteStream extends EventEmitter {
  constructor(path, options = {}) {
    super()

    // 初始化參數
    this.path = path
    this.flags = options.flags ?? 'w'
    this.encoding = options.encoding ?? 'utf8'
    this.autoClose = options.autoClose ?? true
    this.highWaterMark = options.highWaterMark ?? 16 * 1024

    this.offset = 0 // 文件讀取偏移量
    this.cache = [] // 緩存的要被寫入的內容

    // 將要被寫入的總長度,包括緩存中的內容長度
    this.writtenLen = 0

    // 是否正在執行寫入操作,
    // 如果正在寫入,那以後的操作需放入 this.cache
    this.writing = false

    // 是否應該觸發 drain 事件
    this.needDrain = false

    // 打開文件
    this.open()
  }
}

open()

跟 ReadStream 一樣的代碼。

open() {
  fs.open(this.path, this.flags, (err, fd) ={
    if (err) {
      this.emit('error', err)
      return
    }

    this.fd = fd
    this.emit('open')
  })
}

write()

執行寫入操作

write(chunk, encoding, cb = () ={}) {
  // 初始化被寫入的內容
  // 如果時字符串,則轉爲 buffer
  chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding)
  // 計算要被寫入的長度
  this.writtenLen += chunk.length
  // 判斷是否已經超過 highWaterMark
  const hasLimit = this.writtenLen >= this.highWaterMark

  // 是否需要觸發 drain
  // 如果超過 highWaterMark,則代表需要觸發
  this.needDrain = hasLimit

  // 如果沒有正在寫入的內容,則調用 _write 直接開始寫入
  // 否則放入 cache 中
  // 寫入完成後,調用 clearBuffer,從緩存中拿取最近一次內容開始寫入
  if (!this.writing) {
    this.writing = true
    this._write(chunk, () ={
      cb()
      this.clearBuffer()
    })
  } else {
    this.cache.push({
      chunk: chunk,
      cb
    })
  }

  return !hasLimit
}

// 寫入操作
_write(chunk, cb) {
  if (typeof this.fd !== 'number') {
    this.once('open'() => this._write(chunk, cb))
    return
  }

  // 寫入文件
  fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, bytesWritten) ={
    if (err) {
      this.emit('error', err)
      return
    }

    // 計算偏移量
    this.offset += bytesWritten
    // 寫入完畢,則減去當前寫入的長度
    this.writtenLen -= bytesWritten
    cb()
  })
}
  1. 首先初始化要被寫入的內容,只支持 buffer 跟字符串,如果是字符串則直接轉爲 buffer。

  2. 計算要被寫入的總長度,即 this.writtenLen += chunk.length

  3. 判斷是否已經超過 highWaterMark

  4. 判斷是否需要觸發 drain

  5. 判斷是否已經有正在被寫入的內容了,如果沒有則調用 _write() 直接寫入,如果有則放入緩存中。當 _write() 寫入完畢後,調用 clearBuffer() 方法,從 this.cache 中取出最先被緩存的內容進行寫入操作。clearBuffer 方法如下所示

clearBuffer()

clearBuffer() {
  // 取出緩存
  const data = this.cache.shift()
  if (data) {
    const { chunk, cb } = data
    // 繼續進行寫入操作
    this._write(chunk, () ={
      cb()
      this.clearBuffer()
    })
    return
  }

  // 觸發 drain
  this.needDrain && this.emit('drain')
  // 寫入完畢,將writing置爲false
  this.writing = false
  // needDrain 置爲 false
  this.needDrain = false
}

完整代碼

const { EventEmitter } = require('events')
const fs = require('fs')

class WriteStream extends EventEmitter {
  constructor(path, options = {}) {
    super()

    this.path = path
    this.flags = options.flags ?? 'w'
    this.encoding = options.encoding ?? 'utf8'
    this.autoClose = options.autoClose ?? true
    this.highWaterMark = options.highWaterMark ?? 16 * 1024

    this.offset = 0
    this.cache = []
    this.writtenLen = 0
    this.writing = false
    this.needDrain = false

    this.open()
  }

  open() {
    fs.open(this.path, this.flags, (err, fd) ={
      if (err) {
        this.emit('error', err)
        return
      }

      this.fd = fd
      this.emit('open')
    })
  }

  clearBuffer() {
    const data = this.cache.shift()
    if (data) {
      const { chunk, cb } = data
      this._write(chunk, () ={
        cb()
        this.clearBuffer()
      })
      return
    }

    this.needDrain && this.emit('drain')
    this.writing = false
    this.needDrain = false
  }

  write(chunk, encoding, cb = () ={}) {
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding)
    this.writtenLen += chunk.length
    const hasLimit = this.writtenLen >= this.highWaterMark
    this.needDrain = hasLimit

    if (!this.writing) {
      this.writing = true
      this._write(chunk, () ={
        cb()
        this.clearBuffer()
      })
    } else {
      this.cache.push({
        chunk: chunk,
        cb
      })
    }

    return !hasLimit
  }

  _write(chunk, cb) {
    if (typeof this.fd !== 'number') {
      this.once('open'() => this._write(chunk, cb))
      return
    }

    fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, bytesWritten) ={
      if (err) {
        this.emit('error', err)
        return
      }

      this.offset += bytesWritten
      this.writtenLen -= bytesWritten
      cb()
    })
  }
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/NCe0hkeXPLb9YS1Wr-Lxtg