流操作 API 源码实现
文件可读流实现
处理错误和文件描述符(fd)
const fs = require('fs')
const EventEmitter = require('events')
/**
 * Minimal file read stream: stores its options and opens the file.
 *
 * Events:
 *  - 'open'  (fd)  the file was opened; the descriptor is also kept on `this.fd`
 *  - 'error' (err) fs.open failed
 */
class $ReadStream extends EventEmitter {
  /**
   * @param {string} path - File to open.
   * @param {object} [options]
   * @param {string} [options.flags='r'] - Flags handed to fs.open.
   * @param {number} [options.mode=438] - Mode handed to fs.open (438 === 0o666).
   * @param {boolean} [options.autoClose=true] - Stored only; this class does not read it.
   * @param {number} [options.start=0] - Stored only; this class does not read it.
   * @param {number} [options.end] - Stored only; this class does not read it.
   * @param {number} [options.highWaterMark=65536] - Stored only; this class does not read it.
   */
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'r'
    this.mode = options.mode || 438
    // `options.autoClose || true` can never be false, so default only when the option is absent.
    this.autoClose = options.autoClose == null ? true : options.autoClose
    this.start = options.start || 0
    this.end = options.end
    this.highWaterMark = options.highWaterMark || 64 * 1024
    this.open()
  }

  /** Opens `this.path` asynchronously and emits 'open' or 'error'. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
        return
      }
      this.fd = fd
      this.emit('open', fd)
    })
  }
}
// Demo: open test.txt and report either the descriptor or the failure.
// The class defined in this snippet is $ReadStream; the previous
// `new $FileReadStream(...)` referenced a name that does not exist here.
const rs = new $ReadStream('test.txt')
rs.on('open', fd => {
  console.log('open', fd)
})
rs.on('error', err => {
  console.log('error', err)
})
js
处理数据读取、close、end 事件
const fs = require('fs')
const EventEmitter = require('events')
/**
 * File read stream that starts reading as soon as a 'data' listener is added.
 *
 * Events:
 *  - 'open'  (fd)     the file was opened
 *  - 'data'  (Buffer) one chunk of at most `highWaterMark` bytes
 *  - 'end'            fs.read reported zero bytes (nothing left to read)
 *  - 'close'          the descriptor was closed
 *  - 'error' (err)    open, read or close failed
 */
class $FileReadStream extends EventEmitter {
  /**
   * @param {string} path - File to read.
   * @param {object} [options]
   * @param {string} [options.flags='r'] - Flags handed to fs.open.
   * @param {number} [options.mode=438] - Mode handed to fs.open (438 === 0o666).
   * @param {boolean} [options.autoClose=true] - Close the descriptor after 'end' or a read error.
   * @param {number} [options.start=0] - File position of the first byte to read.
   * @param {number} [options.end] - Stored only; this version of the class does not honour it.
   * @param {number} [options.highWaterMark=65536] - Maximum chunk size in bytes.
   */
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'r'
    this.mode = options.mode || 438
    // `options.autoClose || true` can never be false, so default only when the option is absent.
    this.autoClose = options.autoClose == null ? true : options.autoClose
    this.start = options.start || 0
    this.end = options.end
    this.highWaterMark = options.highWaterMark || 64 * 1024
    // Reading begins at `start`, not unconditionally at byte 0.
    this.readOffset = this.start
    // Set once the read loop has been started, so that additional 'data'
    // listeners do not start parallel loops that would duplicate chunks.
    this.flowing = false
    this.open()
    this.on('newListener', type => {
      if (type === 'data' && !this.flowing) {
        this.flowing = true
        this.read()
      }
    })
  }

  /** Opens `this.path` asynchronously and emits 'open' or 'error'. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
        return
      }
      this.fd = fd
      this.emit('open', fd)
    })
  }

  /**
   * Reads one chunk and re-invokes itself until fs.read reports zero bytes.
   * When the descriptor is not available yet, the call is deferred to 'open'.
   */
  read() {
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this.read())
    }
    const buf = Buffer.alloc(this.highWaterMark)
    fs.read(this.fd, buf, 0, this.highWaterMark, this.readOffset, (err, readBytes) => {
      if (err) {
        this.emit('error', err)
        if (this.autoClose) this.close()
        return
      }
      if (readBytes) {
        this.readOffset += readBytes
        // Only the filled part of the buffer is handed to listeners.
        this.emit('data', buf.slice(0, readBytes))
        this.read()
        return
      }
      this.emit('end')
      if (this.autoClose) this.close()
    })
  }

  /** Closes the descriptor (if one is open) and emits 'close'. */
  close() {
    if (typeof this.fd !== 'number') return
    const fd = this.fd
    // Forget the descriptor first so a repeated close() is a no-op.
    this.fd = null
    fs.close(fd, err => {
      if (err) {
        this.emit('error', err)
        return
      }
      this.emit('close')
    })
  }
}
// -----------------
// Demo: stream test.txt three bytes at a time and trace the lifecycle events.
// 'open' and 'error' listeners can be attached the same way when needed.
const rs = new $FileReadStream('test.txt', { highWaterMark: 3 })
const logChunk = chunk => console.log('data', chunk)
rs.on('data', logChunk)
for (const eventName of ['end', 'close']) {
  rs.on(eventName, () => console.log(eventName))
}
js
处理 end 配置
const fs = require('fs')
const EventEmitter = require('events')
/**
 * File read stream that honours an inclusive `end` position.
 *
 * Events:
 *  - 'open'  (fd)     the file was opened
 *  - 'data'  (Buffer) one chunk of at most `highWaterMark` bytes
 *  - 'end'            fs.read reported zero bytes (file or range exhausted)
 *  - 'close'          the descriptor was closed
 *  - 'error' (err)    open, read or close failed
 */
class $FileReadStream extends EventEmitter {
  /**
   * @param {string} path - File to read.
   * @param {object} [options]
   * @param {string} [options.flags='r'] - Flags handed to fs.open.
   * @param {number} [options.mode=438] - Mode handed to fs.open (438 === 0o666).
   * @param {boolean} [options.autoClose=true] - Close the descriptor after 'end' or a read error.
   * @param {number} [options.start=0] - File position of the first byte to read.
   * @param {number} [options.end] - File position of the last byte to read (inclusive).
   * @param {number} [options.highWaterMark=65536] - Maximum chunk size in bytes.
   */
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'r'
    this.mode = options.mode || 438
    // `options.autoClose || true` can never be false, so default only when the option is absent.
    this.autoClose = options.autoClose == null ? true : options.autoClose
    this.start = options.start || 0
    this.end = options.end
    this.highWaterMark = options.highWaterMark || 64 * 1024
    // Reading begins at `start`, not unconditionally at byte 0.
    this.readOffset = this.start
    // Set once the read loop has been started, so that additional 'data'
    // listeners do not start parallel loops that would duplicate chunks.
    this.flowing = false
    this.open()
    this.on('newListener', type => {
      if (type === 'data' && !this.flowing) {
        this.flowing = true
        this.read()
      }
    })
  }

  /** Opens `this.path` asynchronously and emits 'open' or 'error'. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
        return
      }
      this.fd = fd
      this.emit('open', fd)
    })
  }

  /**
   * Reads one chunk and re-invokes itself until fs.read reports zero bytes.
   * When the descriptor is not available yet, the call is deferred to 'open'.
   */
  read() {
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this.read())
    }
    const buf = Buffer.alloc(this.highWaterMark)
    // `end` is compared against null/undefined so that `end: 0` (read only
    // the first byte) is respected; a truthiness test would ignore it.
    const remaining = this.end == null
      ? this.highWaterMark
      : this.end - this.readOffset + 1
    // Clamp at 0: a negative length (start beyond end) is rejected by fs.read.
    const howMuchToRead = Math.max(0, Math.min(remaining, this.highWaterMark))
    fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
      if (err) {
        this.emit('error', err)
        if (this.autoClose) this.close()
        return
      }
      if (readBytes) {
        this.readOffset += readBytes
        // Only the filled part of the buffer is handed to listeners.
        this.emit('data', buf.slice(0, readBytes))
        this.read()
        return
      }
      this.emit('end')
      if (this.autoClose) this.close()
    })
  }

  /** Closes the descriptor (if one is open) and emits 'close'. */
  close() {
    if (typeof this.fd !== 'number') return
    const fd = this.fd
    // Forget the descriptor first so a repeated close() is a no-op.
    this.fd = null
    fs.close(fd, err => {
      if (err) {
        this.emit('error', err)
        return
      }
      this.emit('close')
    })
  }
}
// -----------------
// Demo: read bytes 0..7 of test.txt in chunks of three and trace the events.
// 'open' and 'error' listeners can be attached the same way when needed.
const readOptions = { end: 7, highWaterMark: 3 }
const rs = new $FileReadStream('test.txt', readOptions)
rs.on('data', function onData(chunk) {
  console.log('data', chunk)
})
rs.on('end', function onEnd() {
  console.log('end')
})
rs.on('close', function onClose() {
  console.log('close')
})
js
链表
链表结构
文件可写流的 write 方法工作时,有些待写入的内容需要在缓冲区排队等待,遵循先进先出的规则。为了保存这些数据,新版 node 中采用了链表结构来存储它们。
为什么不采用数组存储数据?
- 数组缺点:
- 数组存储数据的长度具有上限
- 数组存在塌陷问题,需要频繁移动位置
链表是一系列节点的集合,每个节点都具有指向下一个节点的属性。
链表分类:
- 双向链表
- 最常用,查询速度比较快
- 单向链表
- 循环链表
单向链表实现
/**
* 1. node + head + null
* 2. head -> null
* 3. size
* 4. next element
* 5. add、delete、set、get、clear
*/
/**
 * One cell of a singly linked list.
 * `element` holds the payload and `next` refers to the following cell.
 */
class Node {
  /**
   * @param {*} element - Value stored in this cell.
   * @param {Node|null} [next] - The cell that follows this one.
   */
  constructor(element, next) {
    Object.assign(this, { element, next })
  }
}
/**
 * Singly linked list with index-based access.
 * `head` is the first node (or null when empty) and `size` the node count.
 */
class LinkedList {
  constructor() {
    this.head = null
    this.size = 0
  }

  /**
   * Inserts `element` at `index`, or appends it when called as add(element).
   * @param {number} index - Insert position in [0, size]; the element itself in the one-argument form.
   * @param {*} [element] - Value to store.
   * @throws {Error} 'out of bounds' when the position is outside [0, size].
   */
  add(index, element) {
    // The argument count (not an `undefined` check) selects the append form,
    // so add(0, undefined) still inserts an undefined element at position 0.
    if (arguments.length === 1) {
      element = index
      index = this.size
    }
    if (index < 0 || index > this.size) {
      throw new Error('out of bounds')
    }
    if (index === 0) {
      this.head = new Node(element, this.head)
    } else {
      const prevNode = this._getNode(index - 1)
      prevNode.next = new Node(element, prevNode.next)
    }
    this.size++
  }

  /**
   * Unlinks the node at `index`.
   * @param {number} index - Position in [0, size).
   * @throws {Error} 'out of bounds' when no node exists at `index`.
   */
  remove(index) {
    // Written as a negated range test so undefined/NaN are rejected too.
    // Without this guard an empty list or index === size dereferenced null,
    // and a negative index decremented `size` without removing anything valid.
    if (!(index >= 0 && index < this.size)) {
      throw new Error('out of bounds')
    }
    if (index === 0) {
      this.head = this.head.next
    } else {
      const prevNode = this._getNode(index - 1)
      prevNode.next = prevNode.next.next
    }
    this.size--
  }

  /**
   * Replaces the value stored at `index`.
   * @throws {Error} 'out of bounds' when no node exists at `index`.
   */
  set(index, element) {
    const node = this._getNode(index)
    node.element = element
  }

  /** Drops every node. */
  clear() {
    this.head = null
    this.size = 0
  }

  /**
   * Returns the node (not the bare element) stored at `index`.
   * @throws {Error} 'out of bounds' when no node exists at `index`.
   */
  get(index) {
    return this._getNode(index)
  }

  /** Walks `index` links from the head and returns the node reached. */
  _getNode(index) {
    if (index < 0 || index >= this.size) {
      throw new Error('out of bounds')
    }
    let currentNode = this.head
    for (let i = 0; i < index; i++) {
      currentNode = currentNode.next
    }
    return currentNode
  }
}
// ------------------------
// Demo: build a three-node list, overwrite the middle value, then clear it.
const l1 = new LinkedList()
for (const label of ['node01', 'node02']) {
  l1.add(label)
}
l1.add(1, 'node3')
// l1.remove(index) can be tried here to unlink a node before the update.
l1.set(1, 'node3-3')
const firstNode = l1.get(0)
console.log(firstNode)
l1.clear()
console.log(l1)
js
链表实现队列
// linked_list.js
class LinkedList {
  /**
   * Unlinks the node at `index` and returns it, so the list can back a queue.
   *
   * This excerpt shows only `remove`; it relies on `head`, `size` and
   * `_getNode` being provided by the rest of the class.
   *
   * @param {number} index - Position of the node to remove.
   * @returns {object|undefined} The removed node, or undefined when there is
   *   no node at that position (empty list, or index equal to the size).
   */
  remove(index) {
    let curNode = null
    if (index === 0) {
      curNode = this.head
      if (!curNode) return undefined
      this.head = curNode.next
    } else {
      const prevNode = this._getNode(index - 1)
      curNode = prevNode.next
      if (!curNode) return undefined
      // Link the predecessor to the removed node's successor. Using
      // `curNode.next.next` here dropped one node too many and threw when
      // the removed node was the tail.
      prevNode.next = curNode.next
    }
    this.size--
    return curNode
  }
}
js
// linked_queue.js
const { LinkedList } = require('./linked_list')
/**
 * FIFO queue that delegates storage to a LinkedList kept on `linkedList`.
 */
class Queue {
  constructor() {
    this.linkedList = new LinkedList()
  }

  /** Appends `element` at the tail of the underlying list. */
  push(element) {
    const { linkedList } = this
    linkedList.add(element)
  }

  /** Removes position 0 of the underlying list and returns whatever `remove` returns. */
  shift() {
    const removed = this.linkedList.remove(0)
    return removed
  }
}
// Demo: enqueue two items, then dequeue three times; the last shift shows
// what an already-empty queue returns.
const q = new Queue()
console.log(q)
for (const item of ['node1', 'node2']) {
  q.push(item)
}
for (let attempt = 0; attempt < 3; attempt++) {
  console.log(q.shift())
}
js
文件可写流实现
可写流基础实现
const fs = require('fs')
const EventListener = require('events')
const { Queue } = require('./linked_queue')
/**
 * Minimal file write stream: stores its options and opens the file.
 *
 * Events:
 *  - 'open'  (fd)  the file was opened; the descriptor is also kept on `this.fd`
 *  - 'error' (err) fs.open failed
 */
class $WriteStream extends EventListener {
  /**
   * @param {string} path - File to open.
   * @param {object} [options]
   * @param {string} [options.flags='w'] - Flags handed to fs.open.
   * @param {number} [options.mode=438] - Mode handed to fs.open (438 === 0o666).
   * @param {boolean} [options.autoClose=true] - Stored only; this class does not read it.
   * @param {number} [options.start=0] - Stored only; this class does not read it.
   * @param {string} [options.encoding='utf8'] - Stored only; this class does not read it.
   * @param {number} [options.highWaterMark=16384] - Stored only; this class does not read it.
   */
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'w'
    this.mode = options.mode || 438
    // `options.autoClose || true` can never be false, so default only when the option is absent.
    this.autoClose = options.autoClose == null ? true : options.autoClose
    this.start = options.start || 0
    this.encoding = options.encoding || 'utf8'
    this.highWaterMark = options.highWaterMark || 16 * 1024
    this.open()
  }

  /** Opens `this.path` asynchronously and emits 'open' or 'error'. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
        // Stop here: without a descriptor there is nothing to announce via 'open'.
        return
      }
      this.fd = fd
      this.emit('open', fd)
    })
  }
}
// Demo: create the stream and report the descriptor once the file is open.
const ws = new $WriteStream('./test04.txt', {})
const reportOpen = fd => {
  console.log('open file: ', fd)
}
ws.on('open', reportOpen)
js
write 方法实现以及 cache 使用
const fs = require('fs')
const EventListener = require('events')
const { Queue } = require('./linked_queue')
/**
 * File write stream with a FIFO cache for writes issued while another write
 * is still in flight.
 *
 * Events:
 *  - 'open'  (fd)  the file was opened
 *  - 'drain'       the cache emptied after write() had returned false
 *  - 'error' (err) open or write failed
 */
class $WriteStream extends EventListener {
  /**
   * @param {string} path - File to write.
   * @param {object} [options]
   * @param {string} [options.flags='w'] - Flags handed to fs.open.
   * @param {number} [options.mode=438] - Mode handed to fs.open (438 === 0o666).
   * @param {boolean} [options.autoClose=true] - Stored only; this class does not read it.
   * @param {number} [options.start=0] - File position of the first write.
   * @param {string} [options.encoding='utf8'] - Default encoding for string chunks.
   * @param {number} [options.highWaterMark=16384] - Pending-byte threshold at which write() returns false.
   */
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'w'
    this.mode = options.mode || 438
    // `options.autoClose || true` can never be false, so default only when the option is absent.
    this.autoClose = options.autoClose == null ? true : options.autoClose
    this.start = options.start || 0
    this.encoding = options.encoding || 'utf8'
    this.highWaterMark = options.highWaterMark || 16 * 1024
    this.writeOffset = this.start // file position of the next write
    this.writing = false // true while a write (or queue flush) is in flight
    this.writeLen = 0 // bytes accepted but not yet written
    this.needDrain = false // a write() returned false and 'drain' is still owed
    this.cache = new Queue()
    this.open()
  }

  /** Opens `this.path` asynchronously and emits 'open' or 'error'. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
        // Stop here: without a descriptor there is nothing to announce via 'open'.
        return
      }
      this.fd = fd
      this.emit('open', fd)
    })
  }

  /**
   * Accepts a chunk, writing it immediately or queueing it behind the write
   * that is currently in flight.
   * @param {string|Buffer} chunk - Data to write.
   * @param {string|Function} [encoding] - Encoding for string chunks, or the callback.
   * @param {Function} [cb] - Called once this chunk has been handed to fs.write
   *   and its callback fired; receives the error when the write failed.
   * @returns {boolean} false once pending bytes reach `highWaterMark`.
   */
  write(chunk, encoding, cb) {
    if (typeof encoding === 'function') {
      cb = encoding
      encoding = undefined
    }
    // Unknown or missing encodings fall back to the stream default.
    const enc = Buffer.isEncoding(encoding) ? encoding : this.encoding
    let buf = chunk
    if (!Buffer.isBuffer(buf)) {
      buf = typeof buf === 'string' ? Buffer.from(buf, enc) : Buffer.from(buf)
    }
    this.writeLen += buf.length
    const belowMark = this.writeLen < this.highWaterMark
    // Only ever raise the flag here; it is lowered when 'drain' is emitted.
    // Overwriting it could cancel a 'drain' an earlier caller is waiting for.
    if (!belowMark) this.needDrain = true
    if (this.writing) {
      // A write is in flight: queue this chunk behind it.
      this.cache.push({
        chunk: buf,
        encoding: enc,
        cb
      })
    } else {
      this.writing = true
      this._write(buf, enc, err => {
        cb && cb(err)
        // Flush whatever was queued while this write was running.
        this._clearBuffer()
      })
    }
    return belowMark
  }

  /**
   * Writes one buffer at `writeOffset`, waiting for 'open' when necessary.
   * Partial writes are not retried.
   */
  _write(chunk, encoding, cb) {
    if (typeof this.fd !== 'number') {
      return this.once('open', () => {
        return this._write(chunk, encoding, cb)
      })
    }
    // The third argument is the offset inside `chunk`, so it is always 0;
    // the file position is the last positional argument.
    fs.write(this.fd, chunk, 0, chunk.length, this.writeOffset, (err, written) => {
      if (err) {
        // The chunk is abandoned; release its share of the pending count.
        this.writeLen -= chunk.length
        this.emit('error', err)
        cb && cb(err)
        return
      }
      this.writeOffset += written
      this.writeLen -= written
      cb && cb()
    })
  }

  /** Writes queued chunks one after another, then emits 'drain' if owed. */
  _clearBuffer() {
    const data = this.cache.shift()
    if (data) {
      // The queue hands back a list node; the payload lives on `element`.
      this._write(data.element.chunk, data.element.encoding, err => {
        data.element.cb && data.element.cb(err)
        this._clearBuffer()
      })
      return
    }
    // Queue is empty: allow the next write() to go straight to disk.
    this.writing = false
    if (this.needDrain) {
      this.needDrain = false
      this.emit('drain')
    }
  }
}
// Demo: three writes against a 2-byte highWaterMark; the later ones are
// queued behind the first and 'drain' fires once the queue is empty.
const ws = new $WriteStream('./test04.txt', {
  highWaterMark: 2
})
ws.on('open', fd => {
  console.log('open file: ', fd)
})
// The encoding name must not carry a leading space (' utf8' is not a valid encoding).
let flag = ws.write('1', 'utf8', () => {
  console.log('write success')
})
// console.log('flag: ', flag)
flag = ws.write('23', 'utf8', () => {
  console.log('write success')
})
// console.log('flag: ', flag)
flag = ws.write('月落', 'utf8', () => {
  console.log('write success')
})
ws.on('drain', () => {
  console.log('drain')
})
js
pipe 方法实现
文件读写操作的终极语法糖,无论是文件可写流还是可读流,核心目的还是为了完成数据读取和数据写入,本质还是执行文件拷贝行为。
基础使用
const fs = require('fs')
// Demo: copy test04.txt to test05.txt with the built-in streams.
// Small highWaterMark values are used on both sides
// (the defaults are 64kb for reading and 16kb for writing).
const readOptions = { highWaterMark: 4 }
const writeOptions = { highWaterMark: 1 }
const rs = fs.createReadStream('./test04.txt', readOptions)
const ws = fs.createWriteStream('./test05.txt', writeOptions)
rs.pipe(ws)
js
自定义实现
const fs = require('fs')
const EventEmitter = require('events')
class $ReadStream extends EventEmitter {
  /**
   * Forwards every 'data' chunk to `ws`, pausing this stream while the
   * destination reports back-pressure and resuming it on 'drain'.
   *
   * This excerpt shows only `pipe`; `pause()` and `resume()` are called on
   * this stream and must be provided by the rest of the class.
   *
   * @param {object} ws - Destination exposing `write(chunk)` and `on('drain', fn)`;
   *   its `end()` is called, when present, once this stream emits 'end'.
   * @returns {object} The destination, so calls can be chained.
   */
  pipe(ws) {
    this.on('data', data => {
      // write() returning false means the destination wants the source to wait.
      if (!ws.write(data)) {
        this.pause()
      }
    })
    ws.on('drain', () => {
      this.resume()
    })
    // Finish the destination when the source is exhausted, as the built-in pipe does.
    this.once('end', () => {
      if (typeof ws.end === 'function') {
        ws.end()
      }
    })
    return ws
  }
}
js
const fs = require('fs')
const { $ReadStream } = require('./read_stream')
// Demo: pipe the custom read stream (4-byte chunks instead of the 64kb
// default) into a built-in write stream.
const sourceOptions = { highWaterMark: 4 }
const rs = new $ReadStream('./test04.txt', sourceOptions)
const ws = fs.createWriteStream('./test05.txt')
rs.pipe(ws)
js