流操作 API 源码实现

文件可读流实现

处理错误和文件描述符(fd)

const fs = require('fs')

const EventEmitter = require('events')

/**
 * Minimal file read stream (step 1): opens the file and reports the outcome.
 *
 * Events:
 *  - 'open'  (fd)  emitted once fs.open succeeds
 *  - 'error' (err) emitted when fs.open fails
 */
class $ReadStream extends EventEmitter {
  /**
   * @param {string} path - file to open
   * @param {object} [options]
   * @param {string} [options.flags='r'] - flags handed to fs.open
   * @param {number} [options.mode=438] - mode handed to fs.open (438 === 0o666)
   * @param {boolean} [options.autoClose=true] - stored only; not used by this step
   * @param {number} [options.start=0] - stored only; not used by this step
   * @param {number} [options.end] - stored only; not used by this step
   * @param {number} [options.highWaterMark=65536] - stored only; not used by this step
   */
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'r'
    this.mode = options.mode || 438
    // `options.autoClose || true` could never produce false, so an explicit
    // `autoClose: false` was silently discarded. Fall back only when absent.
    this.autoClose = options.autoClose === undefined ? true : options.autoClose
    this.start = options.start || 0
    this.end = options.end
    this.highWaterMark = options.highWaterMark || 64 * 1024

    this.open()
  }

  /**
   * Open the file asynchronously; stores the descriptor on `this.fd` and
   * emits 'open', or emits 'error' when the open fails.
   */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
        return
      }

      this.fd = fd
      this.emit('open', fd)
    })
  }
}

// Demo: open test.txt and log either the file descriptor or the failure.
// The class defined in this snippet is $ReadStream, so that is the name to instantiate.
const rs = new $ReadStream('test.txt')

rs.on('open', fd => {
  console.log('open', fd)
})

rs.on('error', err => {
  console.log('error', err)
})
js

处理数据读取、close、end 事件

const fs = require('fs')

const EventEmitter = require('events')

/**
 * File read stream (step 2): starts reading the file chunk by chunk as soon
 * as the first 'data' listener is attached.
 *
 * Events: 'open' (fd), 'data' (Buffer), 'end', 'close', 'error' (err).
 */
class $FileReadStream extends EventEmitter {
  /**
   * @param {string} path - file to read
   * @param {object} [options]
   * @param {string} [options.flags='r'] - flags handed to fs.open
   * @param {number} [options.mode=438] - mode handed to fs.open
   * @param {boolean} [options.autoClose=true] - close the descriptor after 'end' or a read error
   * @param {number} [options.start=0] - file position of the first byte to read
   * @param {number} [options.end] - stored only; not used by this step
   * @param {number} [options.highWaterMark=65536] - maximum chunk size in bytes
   */
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'r'
    this.mode = options.mode || 438
    // `|| true` could never yield false; fall back only when the option is absent
    this.autoClose = options.autoClose === undefined ? true : options.autoClose
    this.start = options.start || 0
    this.end = options.end
    this.highWaterMark = options.highWaterMark || 64 * 1024
    // Reading begins at `start` (it used to be hard-coded to 0, ignoring the option)
    this.readOffset = this.start
    // True once the read loop has been started, so additional 'data'
    // listeners do not kick off a second, concurrent loop
    this.flowing = false

    this.open()

    // 'newListener' fires for every listener registration on this emitter
    this.on('newListener', type => {
      if (type === 'data' && !this.flowing) {
        this.flowing = true
        this.read()
      }
    })
  }

  /**
   * Open the file asynchronously; stores the descriptor on `this.fd` and
   * emits 'open', or emits 'error' when the open fails.
   */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
        return
      }

      this.fd = fd
      this.emit('open', fd)
    })
  }

  /**
   * Read one chunk of up to `highWaterMark` bytes at `readOffset`, emit it as
   * 'data' and schedule the next read; emits 'end' when no bytes are left.
   */
  read() {
    if (typeof this.fd !== 'number') {
      // The file is not open yet: retry once 'open' has fired
      return this.once('open', () => this.read())
    }

    const buf = Buffer.alloc(this.highWaterMark)

    fs.read(this.fd, buf, 0, this.highWaterMark, this.readOffset, (err, readBytes) => {
      if (err) {
        this.emit('error', err)
        if (this.autoClose) this.close()
        return
      }

      if (readBytes) {
        this.readOffset += readBytes
        // Only the filled part of the buffer is handed to listeners
        this.emit('data', buf.subarray(0, readBytes))
        this.read()
        return
      }

      this.emit('end')
      if (this.autoClose) this.close()
    })
  }

  /** Close the descriptor and emit 'close' afterwards. */
  close() {
    fs.close(this.fd, () => {
      this.emit('close')
    })
  }
}

// -----------------

// Demo: stream test.txt with a 3-byte highWaterMark and trace the event sequence.
const reader = new $FileReadStream('test.txt', { highWaterMark: 3 })

// reader.on('open', fd => console.log('open', fd))
// reader.on('error', err => console.log('error', err))

// EventEmitter#on returns the emitter, so the registrations can be chained
reader
  .on('data', chunk => {
    console.log('data', chunk)
  })
  .on('end', () => {
    console.log('end')
  })
  .on('close', () => {
    console.log('close')
  })
js

处理 end 配置

const fs = require('fs')

const EventEmitter = require('events')

/**
 * File read stream (step 3): like step 2, but honours an inclusive `end`
 * position so only the byte range [start, end] is emitted.
 *
 * Events: 'open' (fd), 'data' (Buffer), 'end', 'close', 'error' (err).
 */
class $FileReadStream extends EventEmitter {
  /**
   * @param {string} path - file to read
   * @param {object} [options]
   * @param {string} [options.flags='r'] - flags handed to fs.open
   * @param {number} [options.mode=438] - mode handed to fs.open
   * @param {boolean} [options.autoClose=true] - close the descriptor after 'end' or a read error
   * @param {number} [options.start=0] - file position of the first byte to read
   * @param {number} [options.end] - file position of the last byte to read (inclusive); unlimited when omitted
   * @param {number} [options.highWaterMark=65536] - maximum chunk size in bytes
   */
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'r'
    this.mode = options.mode || 438
    // `|| true` could never yield false; fall back only when the option is absent
    this.autoClose = options.autoClose === undefined ? true : options.autoClose
    this.start = options.start || 0
    this.end = options.end
    this.highWaterMark = options.highWaterMark || 64 * 1024
    // Reading begins at `start` (it used to be hard-coded to 0, ignoring the option)
    this.readOffset = this.start
    // True once the read loop has been started, so additional 'data'
    // listeners do not kick off a second, concurrent loop
    this.flowing = false

    this.open()

    // 'newListener' fires for every listener registration on this emitter
    this.on('newListener', type => {
      if (type === 'data' && !this.flowing) {
        this.flowing = true
        this.read()
      }
    })
  }

  /**
   * Open the file asynchronously; stores the descriptor on `this.fd` and
   * emits 'open', or emits 'error' when the open fails.
   */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
        return
      }

      this.fd = fd
      this.emit('open', fd)
    })
  }

  /**
   * Read the next chunk at `readOffset`, never past `end`, emit it as 'data'
   * and schedule the next read; emits 'end' when no bytes are left.
   */
  read() {
    if (typeof this.fd !== 'number') {
      // The file is not open yet: retry once 'open' has fired
      return this.once('open', () => this.read())
    }

    const buf = Buffer.alloc(this.highWaterMark)

    // A truthiness test would treat `end: 0` as "no end"; check the type instead.
    // The remaining count is clamped at 0 so `end < start` ends the stream
    // instead of passing a negative length to fs.read.
    const howMuchToRead = typeof this.end === 'number'
      ? Math.max(0, Math.min(this.end - this.readOffset + 1, this.highWaterMark))
      : this.highWaterMark

    fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
      if (err) {
        this.emit('error', err)
        if (this.autoClose) this.close()
        return
      }

      if (readBytes) {
        this.readOffset += readBytes
        // Only the filled part of the buffer is handed to listeners
        this.emit('data', buf.subarray(0, readBytes))
        this.read()
        return
      }

      this.emit('end')
      if (this.autoClose) this.close()
    })
  }

  /** Close the descriptor and emit 'close' afterwards. */
  close() {
    fs.close(this.fd, () => {
      this.emit('close')
    })
  }
}

// -----------------

// Demo: read test.txt up to byte position 7 (inclusive) with a 3-byte highWaterMark.
const readOptions = { end: 7, highWaterMark: 3 }
const reader = new $FileReadStream('test.txt', readOptions)

// reader.on('open', fd => console.log('open', fd))
// reader.on('error', err => console.log('error', err))

// EventEmitter#on returns the emitter, so the registrations can be chained
reader
  .on('data', chunk => {
    console.log('data', chunk)
  })
  .on('end', () => {
    console.log('end')
  })
  .on('close', () => {
    console.log('close')
  })
js

链表

链表结构

文件可写流的 write 方法工作时,有些待写入的内容需要在缓冲区排队等待,遵循先进先出的规则。为了保存这些数据,新版 node 中采用了链表结构来存储它们。

为什么不采用数组存储数据?

  • 数组缺点:
  • 数组存储数据的长度具有上限
  • 数组存在塌陷问题,需要频繁移动位置

链表是一系列节点的集合,每个节点都具有指向下一个节点的属性。

链表分类:

  • 双向链表
    • 最常用,查询速度比较快
  • 单向链表
  • 循环链表

单向链表实现

/**
 * 1. node + head + null
 * 2. head -> null
 * 3. size
 * 4. next element
 * 5. add、delete、set、get、clear
 */

/**
 * One entry of the linked list: the stored value plus a reference to the
 * entry that follows it.
 */
class Node {
  /**
   * @param {*} element - value held by this node
   * @param {Node|null} [next] - the following node
   */
  constructor(element, next) {
    Object.assign(this, { element, next })
  }
}

/**
 * Singly linked list with index-based access.
 * `head` references the first node (null when empty); `size` counts the nodes.
 */
class LinkedList {
  constructor() {
    this.head = null
    this.size = 0
  }

  /**
   * Insert an element.
   * - add(element): append at the end
   * - add(index, element): insert so the new node ends up at `index`
   *
   * @throws {Error} 'out of bounds' when index is outside [0, size]
   */
  add(index, element) {
    // The single-argument form is detected by argument count, so a literal
    // `undefined` can still be stored with add(index, undefined)
    if (arguments.length === 1) {
      element = index
      index = this.size
    }

    if (index < 0 || index > this.size) {
      throw new Error('out of bounds')
    }

    if (index === 0) {
      this.head = new Node(element, this.head)
    } else {
      const prevNode = this._getNode(index - 1)
      prevNode.next = new Node(element, prevNode.next)
    }

    this.size++
  }

  /**
   * Unlink the node at `index`.
   *
   * @param {number} index
   * @returns {Node} the removed node
   * @throws {Error} 'out of bounds' when index is outside [0, size - 1]
   */
  remove(index) {
    // Without this check an empty list or index === size dereferenced null
    // and failed with an unrelated TypeError
    if (index < 0 || index >= this.size) {
      throw new Error('out of bounds')
    }

    let removed

    if (index === 0) {
      removed = this.head
      this.head = removed.next
    } else {
      const prevNode = this._getNode(index - 1)
      removed = prevNode.next
      prevNode.next = removed.next
    }

    this.size--

    return removed
  }

  /**
   * Replace the element stored at `index`.
   * @throws {Error} 'out of bounds' for an invalid index
   */
  set(index, element) {
    const node = this._getNode(index)
    node.element = element
  }

  /** Drop every node. */
  clear() {
    this.head = null
    this.size = 0
  }

  /**
   * @returns {Node} the node at `index` (the node itself, not only its element)
   * @throws {Error} 'out of bounds' for an invalid index
   */
  get(index) {
    return this._getNode(index)
  }

  /**
   * Walk from the head to the node at `index`.
   * @throws {Error} 'out of bounds' when index is outside [0, size - 1]
   */
  _getNode(index) {
    if (index < 0 || index >= this.size) {
      throw new Error('out of bounds')
    }

    let currentNode = this.head

    for (let i = 0; i < index; i++) {
      currentNode = currentNode.next
    }

    return currentNode
  }
}

// ------------------------

// Demo: build a three-node list, overwrite index 1, print the head node,
// then empty the list and print it again.
const list = new LinkedList()

for (const item of ['node01', 'node02']) {
  list.add(item)
}
list.add(1, 'node3')

// list.remove(0)
// list.remove(1)

list.set(1, 'node3-3')

const firstNode = list.get(0)
console.log(firstNode)

list.clear()

console.log(list)
js

链表实现队列

// linked_list.js

class LinkedList {
  /**
   * Unlink the node at `index` and hand it back.
   *
   * Relies on `head`, `size` and `_getNode`, which belong to the rest of the
   * class and are not part of this excerpt.
   *
   * @param {number} index
   * @returns {object|undefined} the removed node, or undefined when there is
   *   no node at that position (e.g. index 0 on an empty list)
   */
  remove(index) {
    let curNode = null

    if (index === 0) {
      curNode = this.head

      // Empty list: nothing to remove, and size must stay untouched
      if (!curNode) return undefined

      this.head = curNode.next
    } else {
      const prevNode = this._getNode(index - 1)

      curNode = prevNode.next

      // prevNode is the last node: there is nothing at `index`
      if (!curNode) return undefined

      // Bypass only the removed node. Linking to `curNode.next.next` also
      // dropped the node after it and crashed when removing the last node.
      prevNode.next = curNode.next
    }

    this.size--

    return curNode
  }
}
js
// linked_queue.js

const { LinkedList } = require('./linked_list')

/**
 * FIFO queue that delegates storage to a LinkedList: elements are appended
 * with `add` and taken from position 0 with `remove`.
 */
class Queue {
  constructor() {
    this.linkedList = new LinkedList()
  }

  /**
   * Append an element to the queue.
   * @param {*} element
   */
  push(element) {
    const { linkedList } = this
    // Exactly one argument: the list treats add(x) as "append"
    linkedList.add(element)
  }

  /**
   * Take the entry at the front of the queue.
   * @returns {*} whatever the list's remove(0) returns
   */
  shift() {
    const front = this.linkedList.remove(0)
    return front
  }
}

// Demo: enqueue two values, then shift three times (one more than was pushed).
const queue = new Queue()

console.log(queue)

for (const name of ['node1', 'node2']) {
  queue.push(name)
}

for (let taken = 0; taken < 3; taken++) {
  console.log(queue.shift())
}
js

文件可写流实现

可写流基础实现

const fs = require('fs')
const EventListener = require('events')

const { Queue } = require('./linked_queue')

/**
 * Minimal file write stream (step 1): opens the target file and reports the outcome.
 *
 * Events:
 *  - 'open'  (fd)  emitted once fs.open succeeds
 *  - 'error' (err) emitted when fs.open fails
 */
class $WriteStream extends EventListener {
  /**
   * @param {string} path - file to open
   * @param {object} [options]
   * @param {string} [options.flags='w'] - flags handed to fs.open
   * @param {number} [options.mode=438] - mode handed to fs.open (438 === 0o666)
   * @param {boolean} [options.autoClose=true] - stored only; not used by this step
   * @param {number} [options.start=0] - stored only; not used by this step
   * @param {string} [options.encoding='utf8'] - stored only; not used by this step
   * @param {number} [options.highWaterMark=16384] - stored only; not used by this step
   */
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'w'
    this.mode = options.mode || 438
    // `|| true` could never yield false; fall back only when the option is absent
    this.autoClose = options.autoClose === undefined ? true : options.autoClose
    this.start = options.start || 0
    this.encoding = options.encoding || 'utf8'
    this.highWaterMark = options.highWaterMark || 16 * 1024

    this.open()
  }

  /**
   * Open the file asynchronously; stores the descriptor on `this.fd` and
   * emits 'open', or emits 'error' when the open fails.
   */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
        // Stop here: there is no descriptor to publish after a failed open
        return
      }

      this.fd = fd
      this.emit('open', fd)
    })
  }
}

// Demo: open ./test04.txt for writing and log the descriptor once it is available.
const writer = new $WriteStream('./test04.txt', {})

const reportOpen = fd => {
  console.log('open file: ', fd)
}

writer.on('open', reportOpen)
js

write 方法实现以及 cache 使用

const fs = require('fs')
const EventListener = require('events')

const { Queue } = require('./linked_queue')

/**
 * File write stream (step 2): writes chunks one at a time, queueing the
 * chunks that arrive while a write is in progress.
 *
 * Events: 'open' (fd), 'drain', 'error' (err).
 */
class $WriteStream extends EventListener {
  /**
   * @param {string} path - file to write
   * @param {object} [options]
   * @param {string} [options.flags='w'] - flags handed to fs.open
   * @param {number} [options.mode=438] - mode handed to fs.open
   * @param {boolean} [options.autoClose=true] - stored only; not used by this step
   * @param {number} [options.start=0] - file position of the first written byte
   * @param {string} [options.encoding='utf8'] - encoding for string chunks without a valid per-call encoding
   * @param {number} [options.highWaterMark=16384] - pending-byte threshold at which write() returns false
   */
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'w'
    this.mode = options.mode || 438
    // `|| true` could never yield false; fall back only when the option is absent
    this.autoClose = options.autoClose === undefined ? true : options.autoClose
    this.start = options.start || 0
    this.encoding = options.encoding || 'utf8'
    this.highWaterMark = options.highWaterMark || 16 * 1024

    // File position of the next write
    this.writeOffset = this.start
    // True while a chunk is being written or queued chunks are being flushed
    this.writing = false
    // Bytes accepted by write() that have not reached the file yet
    this.writeLen = 0
    // True when a write() returned false and 'drain' is still owed
    this.needDrain = false
    // Chunks waiting for the in-flight write to finish
    this.cache = new Queue()

    this.open()
  }

  /**
   * Open the file asynchronously; stores the descriptor on `this.fd` and
   * emits 'open', or emits 'error' when the open fails.
   */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
        // Stop here: there is no descriptor to publish after a failed open
        return
      }

      this.fd = fd
      this.emit('open', fd)
    })
  }

  /**
   * Queue a chunk for writing.
   *
   * @param {string|Buffer|*} chunk - data to write; non-Buffer values go through Buffer.from
   * @param {string} [encoding] - encoding for string chunks; may be omitted as write(chunk, cb)
   * @param {Function} [cb] - called after this chunk has been written
   * @returns {boolean} false once the pending byte count reaches highWaterMark;
   *   the caller should then wait for 'drain'
   */
  write(chunk, encoding, cb) {
    // Support the write(chunk, cb) call shape
    if (typeof encoding === 'function') {
      cb = encoding
      encoding = undefined
    }

    if (typeof chunk === 'string') {
      // Fall back to the stream encoding when the per-call one is missing or unknown
      chunk = Buffer.from(chunk, Buffer.isEncoding(encoding) ? encoding : this.encoding)
    } else if (!Buffer.isBuffer(chunk)) {
      chunk = Buffer.from(chunk)
    }

    this.writeLen += chunk.length

    const flag = this.writeLen < this.highWaterMark

    // Only ever raise the marker here; it is lowered when 'drain' is emitted.
    // Overwriting it on every call could cancel a 'drain' a caller is waiting for.
    if (!flag) {
      this.needDrain = true
    }

    if (this.writing) {
      // A write is in flight: queue this chunk behind it
      this.cache.push({
        chunk,
        encoding,
        cb
      })
    } else {
      this.writing = true
      // Nothing in flight: write immediately
      this._write(chunk, encoding, () => {
        cb && cb()
        // Flush whatever was queued in the meantime
        this._clearBuffer()
      })
    }

    return flag
  }

  /**
   * Write one Buffer at the current file position, deferring until the file
   * is open. Emits 'error' when fs.write fails; `cb` is not called in that case.
   */
  _write(chunk, encoding, cb) {
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this._write(chunk, encoding, cb))
    }

    // The third argument is the offset inside `chunk`, so it must be 0;
    // the file position is carried by `writeOffset`
    fs.write(this.fd, chunk, 0, chunk.length, this.writeOffset, (err, written) => {
      if (err) {
        this.emit('error', err)
        return
      }

      this.writeOffset += written
      this.writeLen -= written

      cb && cb()
    })
  }

  /**
   * Write the next queued chunk; when the queue is empty, mark the stream
   * idle and emit 'drain' if a previous write() returned false.
   */
  _clearBuffer() {
    const data = this.cache.shift()

    if (data) {
      const { chunk, encoding, cb } = data.element

      this._write(chunk, encoding, () => {
        cb && cb()
        this._clearBuffer()
      })
      return
    }

    // Queue empty: let the next write() go straight to the file. Without this
    // reset every later write was queued and never flushed.
    this.writing = false

    if (this.needDrain) {
      this.needDrain = false
      this.emit('drain')
    }
  }
}

// Demo: with a 2-byte highWaterMark the second and third writes exceed the
// threshold, so 'drain' is expected once everything has been flushed.
const ws = new $WriteStream('./test04.txt', {
  highWaterMark: 2
})

ws.on('open', fd => {
  console.log('open file: ', fd)
})

// The encoding name must not carry a leading space (' utf8' is not a valid encoding)
let flag = ws.write('1', 'utf8', () => {
  console.log('write success')
})
// console.log('flag: ', flag)

flag = ws.write('23', 'utf8', () => {
  console.log('write success')
})
// console.log('flag: ', flag)

flag = ws.write('月落', 'utf8', () => {
  console.log('write success')
})

ws.on('drain', () => {
  console.log('drain')
})
js

pipe 方法实现

pipe 是文件读写操作的终极语法糖。无论是文件可写流还是可读流,核心目的都是完成数据读取和数据写入,本质上就是执行文件拷贝行为。

基础使用

const fs = require('fs')

// Copy ./test04.txt into ./test05.txt with the built-in streams.
const sourceOptions = { highWaterMark: 4 } // default: 64kb
const targetOptions = { highWaterMark: 1 } // default: 16kb

const source = fs.createReadStream('./test04.txt', sourceOptions)
const target = fs.createWriteStream('./test05.txt', targetOptions)

source.pipe(target)
js

自定义实现

const fs = require('fs')
const EventEmitter = require('events')

class $ReadStream extends EventEmitter {
  /**
   * Forward every chunk of this stream to `ws`, pausing while the destination
   * reports backpressure and resuming on its 'drain' event.
   *
   * NOTE(review): relies on `pause()` and `resume()` existing on this class;
   * they are not part of this excerpt — confirm they are implemented elsewhere.
   *
   * @param {object} ws - destination exposing write(chunk) and the 'drain'
   *   event; its end() is called when this stream emits 'end', if it has one
   * @returns {object} `ws`, so calls can be chained
   */
  pipe(ws) {
    this.on('data', data => {
      const flag = ws.write(data)

      // write() returning false means the destination buffer is full
      if (!flag) {
        this.pause()
      }
    })

    ws.on('drain', () => {
      this.resume()
    })

    // Finish the destination once the source is exhausted so its descriptor
    // is not left open; skipped for destinations without end()
    this.on('end', () => {
      if (typeof ws.end === 'function') {
        ws.end()
      }
    })

    return ws
  }
}
js
const fs = require('fs')
const { $ReadStream } = require('./read_stream')

// Demo: copy ./test04.txt into ./test05.txt through the custom pipe().
const source = new $ReadStream('./test04.txt', { highWaterMark: 4 }) // default: 64kb
const target = fs.createWriteStream('./test05.txt')

source.pipe(target)
js