深入理解 node 中的文件流
爲什麼要使用文件流
想象這樣一個場景,我要處理一個 10G 的文件,但我的內存大小隻有 2G,該怎麼辦?
我們可以分 5 次讀取文件,每次只讀取 2G 的數據,這樣就可以解決這個問題,那麼這個分段讀取的過程就是流!
在 node 中 stream
模塊封裝了流的基本操作,文件流也是直接依賴的此模塊,這裏我們藉助文件流來深入理解 stream
文件可讀流
讀取文件,將文件內容一點一點的讀入內存當中。
使用方式
我們先看一下基本的使用方式。
const fs = require('fs')
const rs = fs.createReadStream('./w-test.js')
rs.on('data', (chunk) => {
console.log(chunk)
})
rs.on('close', () => {
console.log('close')
})
如上代碼所示,我們通過 fs.createStream()
創建了一個可讀流,用來讀取 w-test.js 文件。
當 on('data')
時,會自動的讀取文件數據,每次默認讀取 64kb 的內容,也可以通過 highWaterMark
參數來動態改變每次內容流程的閾值。
文件讀取完畢後會自動觸發 close
事件。
如下代碼爲 createReadStream
可以配置的參數
const rs = fs.createReadStream('./w-test.js', {
flags: 'r', // 文件系統表示,這裏是指以可讀的形式操作文件
encoding: null, // 編碼方式
autoClose: false, // 讀取完畢時是否自動觸發 close 事件
start: 0, // 開始讀取的位置
end: 2, // 結束讀取的位置
highWaterMark: 2 // 每次讀取的內容大小
})
注意: start 跟 end 都是包含的,即 [start, end]。
其實,fs.crateReadStream
就是返回一個 fs.ReadStream
類的實例,所以上述代碼就等同於:
const rs = new fs.ReadStream('./w-test.js', {
flags: 'r', // 文件系統表示,這裏是指以可讀的形式操作文件
encoding: null, // 編碼方式
autoClose: false, // 讀取完畢時是否自動觸發 close 事件
start: 0, // 開始讀取的位置
end: 2, // 結束讀取的位置
highWaterMark: 2 // 每次讀取的內容大小
})
文件可讀流的實現
瞭解完使用方式,那我們就應該嘗試從原理上去搞定它,接下來,我們手寫一個可讀流。
fs.read / fs.open
可讀流的本質就是分批讀取文件數據,而 fs.read()
方法可以控制讀取文件的內容大小
fs.read(fd, buffer, offset, length, position, callback)
-
fd:要讀取的文件描述符
-
buffer:數據要被寫入的 buffer(將讀取到的文件內容寫入到此 buffer 內)
-
offset:buffer 中開始寫入的偏移量(從 buffer 的第幾個索引開始寫入)
-
length:讀取的字節數(從文件中讀取幾個字節)
-
postion:指定從文件中開始讀取的位置(從文件的第幾個字節開始讀)
-
callback:回調函數
-
err
-
bytesRead:實際讀取的字節數
讀取文件需要一個文件標識符,我們應該需要使用 fs.open
來獲取
-
path:文件路徑
-
flags:文件系統標誌,默認值:'r'。意思是要對文件進行什麼操作,常見的有以下幾種:
-
r:打開文件用於讀取
-
w:打開文件用於寫入
-
a:打開文件用於追加
-
mode:文件操作權限,默認值:0o666(可讀寫)。
-
callback:回調函數。函數上攜帶的參數如下:
-
err:如果失敗,則值爲錯誤原因
-
fd(number):文件描述符,讀取、寫入文件時都要用到這個值
初始化
首先,ReadStream
是一個類,從表現上來看這個類可以監聽事件即 on('data')
,所以我們應該讓它繼承自 EventEmitter
,如下代碼:
class ReadStream extends EventEmitter {
constructor() {
super();
}
}
然後我們初始化參數,並打開文件,如下代碼(代碼中會對關鍵代碼作註釋):
class ReadStream extends EventEmitter {
constructor(path, options = {}) {
super()
// 解析參數
this.path = path
this.flags = options.flags ?? 'r'
this.encoding = options.encoding ?? 'utf8'
this.autoClose = options.autoClose ?? true
this.start = options.start ?? 0
this.end = options.end ?? undefined
this.highWaterMark = options.highWaterMark ?? 16 * 1024
// 文件的偏移量
this.offset = this.start
// 是否處於流動狀態,調用 pause 或 resume 方法時會用到,下文會講到
this.flowing = false
// 打開文件
this.open()
// 當綁定新事件時會觸發 newListener
// 這裏當綁定 data 事件時,自動觸發文件的讀取
this.on('newListener', (type) => {
if (type === 'data') {
// 標記爲開始流動
this.flowing = true
// 開始讀取文件
this.read()
}
})
}
}
-
讀取文件之前,我們要先打開文件,即
this.open()
。 -
on('newListener')
是 EventEmitter 的一個事件,每當我們綁定新的事件時都會觸發newListener
,例如:當我們on('data')
時,會觸發newListener
事件,並且 type 爲'data'。 -
這裏當我們監聽到
data
事件綁定(即on('data')
)時,就開始讀取文件即this.read()
,this.read()
是我們核心方法。
open
open
方法如下:
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
// 文件打開失敗觸發 error 事件
this.emit('error', err)
return
}
// 記錄文件標識符
this.fd = fd
// 文件打開成功後觸發 open 事件
this.emit('open')
})
}
當打開文件後記錄下文件標識符,即 this.fd
read
read
方法如下:
read() {
// 由於 ```fs.open``` 是異步操作,
// 所以當調用 read 方法時,文件可能還沒有打開
// 所以我們要等 open 事件觸發之後,再次調用 read 方法
if (typeof this.fd !== 'number') {
this.once('open', () => this.read())
return
}
// 申請一個 highWaterMark 字節的 buffer,
// 用來存儲從文件讀取的內容
const buf = Buffer.alloc(this.highWaterMark)
// 開始讀取文件
// 每次讀取時,都記錄下文件的偏移量
fs.read(this.fd, buf, 0, buf.length, this.offset, (err, bytesRead) => {
this.offset += bytesRead
// bytesRead 爲實際讀取的文件字節數
// 如果 bytesRead 爲 0,則代表沒有讀取到內容,即讀取完畢
if (bytesRead) {
// 每次讀取都觸發 data 事件
this.emit('data', buf.slice(0, bytesRead))
// 如果處於流動狀態,則繼續讀取
// 這裏當調用 pause 方法時,會將 this.flowing 置爲 false
this.flowing && this.read()
} else {
// 讀取完畢後觸發 end 事件
this.emit('end')
// 如果可以自動關閉,則關閉文件並觸發 close 事件
this.autoClose && fs.close(this.fd, () => this.emit('close'))
}
})
}
上述每行代碼都有註釋,相信也不難理解,這裏有幾個關鍵點要注意一下
-
一定要等文件打開後才能開始讀取文件,但是文件打開是一個異步操作,我們並不知道具體的打開完畢時間,所以,我們會在文件打開後觸發一個
on('open')
事件,read 方法內會等open
事件觸發後再次重新調用read()
-
fs.read()
方法之前有講過,可以從前文回顧裏看一下 手寫 fs 核心方法 -
this.flowing
屬性是用來判斷是否是流動的,會用對應的pasue()
方法與resume()
來控制,下面我們來看一下這兩個方法。
pause
pause() {
this.flowing =false
}
resume
resume() {
if (!this.flowing) {
this.flowing = true
this.read()
}
}
完整代碼
const { EventEmitter } = require('events')
const fs = require('fs')
class ReadStream extends EventEmitter {
constructor(path, options = {}) {
super()
this.path = path
this.flags = options.flags ?? 'r'
this.encoding = options.encoding ?? 'utf8'
this.autoClose = options.autoClose ?? true
this.start = options.start ?? 0
this.end = options.end ?? undefined
this.highWaterMark = options.highWaterMark ?? 16 * 1024
this.offset = this.start
this.flowing = false
this.open()
this.on('newListener', (type) => {
if (type === 'data') {
this.flowing = true
this.read()
}
})
}
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
this.emit('error', err)
return
}
this.fd = fd
this.emit('open')
})
}
pause() {
this.flowing =false
}
resume() {
if (!this.flowing) {
this.flowing = true
this.read()
}
}
read() {
if (typeof this.fd !== 'number') {
this.once('open', () => this.read())
return
}
const buf = Buffer.alloc(this.highWaterMark)
fs.read(this.fd, buf, 0, buf.length, this.offset, (err, bytesRead) => {
this.offset += bytesRead
if (bytesRead) {
this.emit('data', buf.slice(0, bytesRead))
this.flowing && this.read()
} else {
this.emit('end')
this.autoClose && fs.close(this.fd, () => this.emit('close'))
}
})
}
}
文件可寫流
顧名思義了,將內容一點一點寫入到文件裏去。
fs.write
-
fd:要被寫入的文件描述符
-
buffer:將指定 buffer 的內容寫入到文件中去
-
offset:指定 buffer 的寫入位置(從 buffer 的第 offset 個索引讀取內容寫入到文件中去)
-
length:指定要寫入的字節數
-
position:文件的偏移量(從文件的第 position 個字節開始寫入)
使用方式
// 使用方式 1:
const ws = fs.createWriteStream('./w-test.js')
// 使用方式 2:
const ws = new WriteStream('./w-test.js', {
flags: 'w',
encoding: 'utf8',
autoClose: true,
highWaterMark: 2
})
// 寫入文件
const flag = ws.write('2')
ws.on('drain', () => console.log('drain'))
-
ws.write()
寫入文件。這裏有一個返回值,代表是否已經達到最大緩存。當我們同步連續調用多次write()
時,並不是每次調用都立即寫入文件,而是同一時間只能執行一次寫入操作,所以剩下的會被寫入到緩存中,等上一次寫入完畢後再從緩存中依次取出執行。所以,這時就會有一個最大的緩存大小,默認爲 64kb。而這裏的返回值則代表是否還可以繼續寫入,也就是:是否達到了最大緩存。true 代表可以繼續寫入。 -
ws.on('drain')
,如果調用ws.write()
返回 false,則當可以繼續寫入數據到流時會觸發'drain' 事件。
文件可寫流的實現
初始化
先定義 WriteStream
類,並繼承 EventEmitter
, 然後,初始化參數。注意看代碼註釋
const { EventEmitter } = require('events')
const fs = require('fs')
class WriteStream extends EventEmitter {
constructor(path, options = {}) {
super()
// 初始化參數
this.path = path
this.flags = options.flags ?? 'w'
this.encoding = options.encoding ?? 'utf8'
this.autoClose = options.autoClose ?? true
this.highWaterMark = options.highWaterMark ?? 16 * 1024
this.offset = 0 // 文件讀取偏移量
this.cache = [] // 緩存的要被寫入的內容
// 將要被寫入的總長度,包括緩存中的內容長度
this.writtenLen = 0
// 是否正在執行寫入操作,
// 如果正在寫入,那以後的操作需放入 this.cache
this.writing = false
// 是否應該觸發 drain 事件
this.needDrain = false
// 打開文件
this.open()
}
}
open()
跟 ReadStream 一樣的代碼。
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
this.emit('error', err)
return
}
this.fd = fd
this.emit('open')
})
}
write()
執行寫入操作
write(chunk, encoding, cb = () => {}) {
// 初始化被寫入的內容
// 如果時字符串,則轉爲 buffer
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding)
// 計算要被寫入的長度
this.writtenLen += chunk.length
// 判斷是否已經超過 highWaterMark
const hasLimit = this.writtenLen >= this.highWaterMark
// 是否需要觸發 drain
// 如果超過 highWaterMark,則代表需要觸發
this.needDrain = hasLimit
// 如果沒有正在寫入的內容,則調用 _write 直接開始寫入
// 否則放入 cache 中
// 寫入完成後,調用 clearBuffer,從緩存中拿取最近一次內容開始寫入
if (!this.writing) {
this.writing = true
this._write(chunk, () => {
cb()
this.clearBuffer()
})
} else {
this.cache.push({
chunk: chunk,
cb
})
}
return !hasLimit
}
// 寫入操作
_write(chunk, cb) {
if (typeof this.fd !== 'number') {
this.once('open', () => this._write(chunk, cb))
return
}
// 寫入文件
fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, bytesWritten) => {
if (err) {
this.emit('error', err)
return
}
// 計算偏移量
this.offset += bytesWritten
// 寫入完畢,則減去當前寫入的長度
this.writtenLen -= bytesWritten
cb()
})
}
-
首先初始化要被寫入的內容,只支持 buffer 跟字符串,如果是字符串則直接轉爲 buffer。
-
計算要被寫入的總長度,即
this.writtenLen += chunk.length
-
判斷是否已經超過 highWaterMark
-
判斷是否需要觸發 drain
-
判斷是否已經有正在被寫入的內容了,如果沒有則調用
_write()
直接寫入,如果有則放入緩存中。當_write()
寫入完畢後,調用clearBuffer()
方法,從this.cache
中取出最先被緩存的內容進行寫入操作。clearBuffer 方法如下所示
clearBuffer()
clearBuffer() {
// 取出緩存
const data = this.cache.shift()
if (data) {
const { chunk, cb } = data
// 繼續進行寫入操作
this._write(chunk, () => {
cb()
this.clearBuffer()
})
return
}
// 觸發 drain
this.needDrain && this.emit('drain')
// 寫入完畢,將writing置爲false
this.writing = false
// needDrain 置爲 false
this.needDrain = false
}
完整代碼
const { EventEmitter } = require('events')
const fs = require('fs')
class WriteStream extends EventEmitter {
constructor(path, options = {}) {
super()
this.path = path
this.flags = options.flags ?? 'w'
this.encoding = options.encoding ?? 'utf8'
this.autoClose = options.autoClose ?? true
this.highWaterMark = options.highWaterMark ?? 16 * 1024
this.offset = 0
this.cache = []
this.writtenLen = 0
this.writing = false
this.needDrain = false
this.open()
}
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
this.emit('error', err)
return
}
this.fd = fd
this.emit('open')
})
}
clearBuffer() {
const data = this.cache.shift()
if (data) {
const { chunk, cb } = data
this._write(chunk, () => {
cb()
this.clearBuffer()
})
return
}
this.needDrain && this.emit('drain')
this.writing = false
this.needDrain = false
}
write(chunk, encoding, cb = () => {}) {
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding)
this.writtenLen += chunk.length
const hasLimit = this.writtenLen >= this.highWaterMark
this.needDrain = hasLimit
if (!this.writing) {
this.writing = true
this._write(chunk, () => {
cb()
this.clearBuffer()
})
} else {
this.cache.push({
chunk: chunk,
cb
})
}
return !hasLimit
}
_write(chunk, cb) {
if (typeof this.fd !== 'number') {
this.once('open', () => this._write(chunk, cb))
return
}
fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, bytesWritten) => {
if (err) {
this.emit('error', err)
return
}
this.offset += bytesWritten
this.writtenLen -= bytesWritten
cb()
})
}
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/NCe0hkeXPLb9YS1Wr-Lxtg