为什么应该使用流
你可能看过这样的代码。
var http = require('http'); var fs = require('fs') var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, dat res.end(data); }); }); server.listen(8000); 复制代码
这段代码中,服务器每收到一次请求,就会先把data.txt读入到内存中,然后再从内存取出返回给客户端。尴尬的是,如果data.txt非常的大,而每次请求都需要先把它全部存到内存,再全部取出,不仅会消耗服务器的内存,也可能造成用户等待时间过长。
幸好,HTTP请求中的request对象和response对象都是流对象,于是我们可以换一种更好的方法:
var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { let stream = fs.createReadStream(__dirname + '/data.txt');//创造可读流 stream.pipe(res);//将可读流写入response }); server.listen(8000); 复制代码
pipe方法如同stream和response之间的一个管道,将data.txt文件一小段一小段地发送到客户端,减小了服务器的内存压力。
比喻理解Stream
在node中,一共有五种类型的流:readable,writable,transform,duplex以及"classic"。其中最核心的是可读流和可写流。我们举个栗子来生动形象地理解它们。
可读流可理解为:从一个装满了水的水桶中一点一点把水抽取出来的过程
可写流可理解为:把从可读流抽出来的水一点一点倒入一个空的桶中的过程
也可以以经典的生产者和消费者的问题来理解Stream,生产者不断在缓存中制造产品,而消费者则不断地从缓存中消费产品
readableStream
可读流(Readable streams)是对提供数据的 源头 (source)的抽象 可读流的流程如图所示
资源的数据流并不是直接流向消费者,而是先 push 到缓存池,缓存池有一个水位标记 highWatermark,超过这个标记阈值,push 的时候会返回 false,从而控制读取数据流的速度,如同水管上的阀门,当水管面装满了水,就暂时关上阀门,不再从资源里“抽水”出来。什么场景下会出现这种情况呢?
-
消费者主动执行了 .pause()
-
消费速度比数据 push 到缓存池的生产速度慢
可读流有两种模式,flowing和pause
-
flowing模式下 可读流可自动从资源读取数据
-
pause模式下 需要显式调用stream.read()方法来读取数据
缓存池就像一个空的水桶,消费者通过管口接水,同时,资源池就像一个水泵,不断地往水桶中泵水,而 highWaterMark 是水桶的浮标,达到阈值就停止蓄水。下面是一个简单的flowing模式 Demo:
const Readable = require('stream').Readable class MyReadable extends Readable{ constructor(dataSource, options){ super(options) this.dataSource = dataSource } //_read表示需要从MyReadable类内部调用该方法 _read(){ const data = this.dataSource.makeData() this.push(data) } } //模拟资源池 const dataSource = { data: new Array('abcdefghijklmnopqrstuvwxyz'), makeData: function(){ if(!this.data.length) return null return this.data.pop() } } const myReadable = new MyReadable(dataSource); myReadable.setEncoding('utf8'); myReadable.on('data', (chunk) => { console.log(chunk); }); 复制代码
另外一种模式是pause模式,这种模式下可读流有三种状态
- readable._readableState.flowing = null 目前没有数据消费者,所以不会从资源库中读取数据
- readable._readableState.flowing = false 暂停从资源库读取数据,但 不会 暂停数据生成,主动触发了 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure)可达到此状态
- readable._readableState.flowing = true 正在从资源库中读取数据,监听 'data' 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法可达到此状态 一个简单的切换状态的demo:
const myReadable = new MyReadable(dataSource); myReadable.setEncoding('utf8'); myReadable.on('data', (chunk) => { console.log(`Received ${chunk.length} bytes of data.`); myReadable.pause() console.log('pausing for 1 second') setTimeout(()=>{ console.log('now restart') myReadable.resume() }, 1000) }); 复制代码
pause模式的流程图如下
资源池会不断地往缓存池输送数据,直到 highWaterMark 阈值,消费者需要主动调用 .read([size]) 函数才会从缓存池取出,并且可以带上 size 参数,用多少就取多少:
const myReadable = new MyReadable(dataSource); myReadable.setEncoding('utf8'); myReadable.on('readable', () => { let chunk; while (null !== (chunk = myReadable.read(5))) {//每次读5个字节 console.log(`Received ${chunk.length} bytes of data.`); } }); 复制代码
这里值得注意的是,readable事件的回调函数没有参数。因为 'readable' 事件将在流中有数据可供读取时就会触发,而在pause模式下读取数据需要显式调用read()才会消费数据 输出为:
Received 5 bytes of data. Received 5 bytes of data. Received 5 bytes of data. Received 5 bytes of data. Received 5 bytes of data. Received 1 bytes of data. 复制代码
readableStream一些需要注意的事件
- 'data' 事件会在流将数据传递给消费者时触发'end' 事件将在流中再没有数据可供消费时触发
- 'end' 事件将在流中再没有数据可供消费时触发
- 'readable'(从字面上看:“可以读的”)事件将在流中有数据可供读取时触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中。'readable' 事件表明 流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。
- 'setEncoding'设置编码会使得该流数据返回指定编码的字符串而不是Buffer对象。
- 'pipe' 事件放到后面详谈。
writableStream
Writable streams 是 destination 的一种抽象,一个writable流指的是只能流进不能流出的流:
readableStream.pipe(writableStream) 复制代码
数据流过来的时候,会直接写入到资源池,当写入速度比较缓慢或者写入暂停时,数据流会进入队列池缓存起来,当生产者写入速度过快,把队列池装满了之后,就会出现「背压」(backpressure),这个时候是需要告诉生产者暂停生产的,当队列释放之后,Writable Stream 会给生产者发送一个 drain 消息,让它恢复生产.
writable.write() 方法向流中写入数据,并在数据处理完成后调用callback。在确认了 chunk 后,如果内部缓冲区的大小小于创建流时设定的 highWaterMark 阈值,函数将返回 true 。 如果返回值为 false (即队列池已经装满),应该停止向流中写入数据,直到 'drain' 事件被触发。
构造一个可写流需要重写_write方法
const Writable = require('stream').writable class MyWritableStream extends Writable{ constructor(options){ super(options) } _write(chunk, encoding, callback){ console.log(chunk) } } 复制代码
一个写入数据10000次的demo,其中可以加深对write方法和drain方法的认识
function writeOneMillionTimes(writer, data, encoding, callback) { let i = 10000; write(); function write() { let ok = true; while(i-- > 0 && ok) { // 写入结束时回调 if(i===0){ writer.write(data, encoding, callback)//当最后一次写入数据即将结束时,再调用callback }else{ ok = writer.write(data, encoding)//写数据还没有结束,不能调用callback } } if (i > 0) { // 这里提前停下了,'drain' 事件触发后才可以继续写入 console.log('drain', i); writer.once('drain', write); } } } const Writable = require('stream').Writable; class MyWritableStream extends Writable{ constructor(options){ super(options) } _write(chunk, encoding, callback){ setTimeout(()=>{ callback(null) },0) } } let writer = new MyWritableStream() writeOneMillionTimes(writer, 'simple', 'utf8', () => { console.log('end'); }); 复制代码
输出是
drain 7268 drain 4536 drain 1804 end复制代码
输出结果说明程序遇到了三次「背压」,如果我们没有在上面绑定 writer.once('drain'),那么最后的结果就是 Stream 将第一次获取的数据消耗完就结束了程序,即只输出drain 7268
pipe
readable.pipe(writable);复制代码
readable 通过 pipe(管道)传输给 writable
Readable.prototype.pipe = function(writable, options) { this.on('data', (chunk) => { let ok = writable.write(chunk); if(!ok) this.pause();// 背压,暂停 }); writable.on('drain', () => { // 恢复 this.resume(); }); // 告诉 writable 有流要导入 writable.emit('pipe', this); // 支持链式调用 return writable; }; 复制代码
核心有5点:
- emit(pipe),通知写入
- write(),新数据过来,写入
- pause(),消费者消费速度慢,暂停写入
- resume(),消费者完成消费,继续写入
- return writable,支持链式调用