什麼是流?如何理解流?以下這篇文章就來帶大家深入了解一下Node中的串流(Stream),希望對大家有幫助!
作者最近在開發中經常使用pipe 函數,只知道這是流的管道,卻不知道他是如何工作的,所以抱著一探究竟的心理乾脆就從流開始學起,隨便將看過的知識和源碼整理成一篇文章分享給大家。
串流(Stream)在 Nodejs 中是個十分基礎的概念,許多基礎模組都是基於流實現的,扮演著十分重要的角色。同時流也是一個十分難以理解的概念,這主要是相關的文檔比較缺少,對於NodeJs 初學者來說,理解流往往需要花很多時間理解,才能真正掌握這個概念,所幸的是,對於大部分NodeJs使用者來說,只是用來開發Web 應用,對流的不充分認識並不影響使用。但是,理解流能夠對 NodeJs 中的其他模組有更好的理解,同時在某些情況下,使用流來處理資料會有更好的效果。 【相關教學推薦:nodejs影片教學】
對於串流的使用者來說,可以將流看作一個數組,我們只需要關注從中獲取(消費)和寫入(生產)就可以了。
對於串流的開發者(使用stream模組建立一個新實例),專注的是如何實現流程中的一些方法,通常專注於兩點,目標資源是誰和如何操作目標資源,確定了後就需要根據流的不同狀態和事件來對目標資源進行操作
緩衝池。緩衝池通常位於電腦的RAM(記憶體)。
舉一個常見的緩衝區的例子,我們在觀看線上影片的時候,如果你的網速很快,緩衝區總是會被立即填充,然後發送給系統播放,然後立即緩衝下一段影片。觀看的過程中,不會有卡頓。如果網路速度很慢,則會看到loading,表示緩衝區正在被填充,當填充完成後資料傳送給系統,才能看到這段影片。 NodeJs 流的快取池是一個 Buffer 鍊錶,每一次想快取池中加入資料都會重新建立一個 Buffer 節點插入到鍊錶尾部。 EventEmitterNodeJs 中對 Stream 是一個實作了 EventEmitter 的抽象接口,所以我會先簡單的介紹一下 EventEmitter。 EventEmitter 是一個實現事件發布訂閱功能的類,其中常用的幾個方法(on, once, off, emit)相信大家都耳熟能詳了,就不一一介紹了。const { EventEmitter } = require('events') const eventEmitter = new EventEmitter() // 为 eventA 事件绑定处理函数 eventEmitter.on('eventA', () => { console.log('eventA active 1'); }); // 为 eventB 事件绑定处理函数 eventEmitter.on('eventB', () => { console.log('eventB active 1'); }); eventEmitter.once('eventA', () => { console.log('eventA active 2'); }); // 触发 eventA eventEmitter.emit('eventA') // eventA active 1 // eventA active 2
EventEmitter 有兩個叫做
newListener 和
removeListener 的事件,當你在一個事件物件中加入任何事件監聽函數後,都會觸發
newListener(eventEmitter.emit('newListener')),當一個處理函數被移除時同理會觸發
removeListener。
removeListener 將在其執行前被觸發,這意味著
once 綁定的監聽函數是先被移除才被觸發的。
const { EventEmitter } = require('events') const eventEmitter = new EventEmitter() eventEmitter.on('newListener', (event, listener)=>{ console.log('newListener', event, listener) }) eventEmitter.on('removeListener', (event, listener) => { console.log('removeListener', event, listener) }) //newListener removeListener[Function(anonymous)] eventEmitter.on('eventA', () => { console.log('eventA active 1'); }); //newListener eventA [Function (anonymous)] function listenerB() { console.log('eventB active 1'); } eventEmitter.on('eventB', listenerB); // newListener eventB [Function (anonymous)] eventEmitter.once('eventA', () => { console.log('eventA active 2'); }); // newListener eventA [Function (anonymous)] eventEmitter.emit('eventA') // eventA active 1 // removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] } // eventA active 2 eventEmitter.off('eventB', listenerB) // removeListener eventB[Function: listenerB]
interface ReadableStream extends EventEmitter { readable: boolean; read(size?: number): string | Buffer; setEncoding(encoding: BufferEncoding): this; pause(): this; resume(): this; isPaused(): boolean; pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T; unpipe(destination?: WritableStream): this; unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>; } interface WritableStream extends EventEmitter { writable: boolean; write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean; write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean; end(cb?: () => void): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this; } interface ReadWriteStream extends ReadableStream, WritableStream { }
磁碟寫入資料的速度是遠低於記憶體的,我們想像記憶體和磁碟之間有一個“管道”,“管道”中是“流”,內存的數據流入管道是非常快的,當管道塞滿時,內存中就會產生數據背壓,數據積壓在內存中,佔用資源。
NodeJs Stream 的解決方法是為每個流的快取池
(就是圖中寫入佇列)設定一個浮標值,當其中數據量達到這個浮標值後,往快取池再次push
資料時就會回傳false,表示目前流中快取池內容已經達到浮標值,不希望再有資料寫入了,這時我們應該立即停止資料的生產,防止快取池過大產生背壓。
可讀流(Readable)是流的一種類型,他有兩種模式三種狀態
#兩種讀取模式:
流動模式:資料會從底層系統讀取寫入到緩衝區,當緩衝區被寫滿後自動透過EventEmitter 盡快的將資料傳遞給所註冊的事件處理程序中
暫停模式:在這種模式下將不會主動觸發EventEmitter 傳輸數據,必須顯示的呼叫Readable.read()
方法來從緩衝區讀取數據,read 會觸發回應到EventEmitter 事件。
三種狀態:
readableFlowing === null(初始狀態)
readableFlowing === false(暫停模式)
readableFlowing === true(流動模式)
初始時流的readable. readableFlowing
為null
新增data事件後變成true 。呼叫pause()
、unpipe()
、或接收到背壓或新增readable
事件,則readableFlowing
會被設為false ,在這個狀態下,為data 事件綁定監聽器不會使readableFlowing 切換到 true。
呼叫resume()
可以讓可讀流的readableFlowing
切換到 true
移除所有的readable 事件是讓 readableFlowing變成null 的唯一方法。
事件名稱 | 說明 |
---|---|
readable | 當緩衝區有新的可讀取資料時觸發(每一個想快取池插入節點都會觸發) |
#data | 每一次消費資料後都會觸發) |
#data | 每次消費資料後都會觸發,參數是本次消費的資料 |
close | #流關閉時觸發 |
說明 |
const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// 缓存池浮标值 }) readStreams.on('readable', () => { console.log('缓冲区满了') readStreams.read()// 消费缓存池的所有数据,返回结果并且触发data事件 }) readStreams.on('data', (data) => { console.log('data') })
https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
当 size 为 0 会触发 readable 事件。
当缓存池中的数据长度达到浮标值 highWaterMark
后,就不会在主动请求生产数据,而是等待数据被消费后在生产数据
暂停状态的流如果不调用 read
来消费数据时,后续也不会在触发 data
和 readable
,当调用 read
消费时会先判断本次消费后剩余的数据长度是否低于 浮标值
,如果低于 浮标值
就会在消费前请求生产数据。这样在 read
后的逻辑执行完成后新的数据大概率也已经生产完成,然后再次触发 readable
,这种提前生产下一次消费的数据存放在缓存池的机制也是缓存流为什么快的原因
流动状态下的流有两种情况
他们的区别仅仅在于数据生产后缓存池是否还存在数据,如果存在数据则将生产的数据 push 到缓存池等待消费,如果不存在则直接将数据交给 data 而不加入缓存池。
值得注意的是当一个缓存池中存在数据的流从暂停模式进入的流动模式时,会先循环调用 read 来消费数据只到返回 null
暂停模式下,一个可读流读创建时,模式是暂停模式,创建后会自动调用 _read
方法,把数据从数据源 push
到缓冲池中,直到缓冲池中的数据达到了浮标值。每当数据到达浮标值时,可读流会触发一个 " readable
" 事件,告诉消费者有数据已经准备好了,可以继续消费。
一般来说, 'readable'
事件表明流有新的动态:要么有新的数据,要么到达流的尽头。所以,数据源的数据被读完前,也会触发一次 'readable'
事件;
消费者 " readable
" 事件的处理函数中,通过 stream.read(size)
主动消费缓冲池中的数据。
const { Readable } = require('stream') let count = 1000 const myReadable = new Readable({ highWaterMark: 300, // 参数的 read 方法会作为流的 _read 方法,用于获取源数据 read(size) { // 假设我们的源数据上 1000 个1 let chunk = null // 读取数据的过程一般是异步的,例如IO操作 setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) } }) // 每一次成功 push 数据到缓存池后都会触发 readable myReadable.on('readable', () => { const chunk = myReadable.read()//消费当前缓存池中所有数据 console.log(chunk.toString()) })
值得注意的是, 如果 read(size) 的 size 大于浮标值,会重新计算新的浮标值,新浮标值是size的下一个二次幂(size <= 2^n,n取最小值)
// hwm 不会大于 1GB. const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { // 1GB限制 n = MAX_HWM; } else { //取下一个2最高幂,以防止过度增加hwm n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; } return n; }
所有可读流开始的时候都是暂停模式,可以通过以下方法可以切换至流动模式:
data
" 事件句柄;resume
”方法;pipe
" 方法把数据发送到可写流流动模式下,缓冲池里面的数据会自动输出到消费端进行消费,同时,每次输出数据后,会自动回调 _read
方法,把数据源的数据放到缓冲池中,如果此时缓存池中不存在数据则会直接吧数据传递给 data 事件,不会经过缓存池;直到流动模式切换至其他暂停模式,或者数据源的数据被读取完了( push(null)
);
可读流可以通过以下方式切换回暂停模式:
stream.pause()
。stream.unpipe()
可以移除多个管道目标。const { Readable } = require('stream') let count = 1000 const myReadable = new Readable({ highWaterMark: 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) } }) myReadable.on('data', data => { console.log(data.toString()) })
相对可读流来说,可写流要简单一些。
当生产者调用 write(chunk)
时,内部会根据一些状态(corked,writing等)选择是否缓存到缓冲队列中或者调用 _write
,每次写完数据后,会尝试清空缓存队列中的数据。如果缓冲队列中的数据大小超出了浮标值(highWaterMark),消费者调用 write(chunk)
后会返回 false
,这时候生产者应该停止继续写入。
那么什么时候可以继续写入呢?当缓冲中的数据都被成功 _write
之后,清空了缓冲队列后会触发 drain
事件,这时候生产者可以继续写入数据。
当生产者需要结束写入数据时,需要调用 stream.end
方法通知可写流结束。
const { Writable, Duplex } = require('stream') let fileContent = '' const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// 会作为_write方法 setTimeout(()=>{ fileContent += chunk callback()// 写入结束后调用 }, 500) } }) myWritable.on('close', ()=>{ console.log('close', fileContent) }) myWritable.write('123123')// true myWritable.write('123123')// false myWritable.end()
注意,在缓存池中数据到达浮标值后,此时缓存池中可能存在多个节点,在清空缓存池的过程中(循环调用_read),并不会向可读流一样尽量一次消费长度为浮标值的数据,而是每次消费一个缓冲区节点,即使这个缓冲区长度于浮标值不一致也是如此
const { Writable } = require('stream') let fileContent = '' const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) { setTimeout(()=>{ fileContent += chunk console.log('消费', chunk.toString()) callback()// 写入结束后调用 }, 100) } }) myWritable.on('close', ()=>{ console.log('close', fileContent) }) let count = 0 function productionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end() } } productionData() myWritable.on('drain', productionData)
上述是一个浮标值为 10
的可写流,现在数据源是一个 0——20
到连续的数字字符串,productionData
用于写入数据。
首先第一次调用 myWritable.write("0")
时,因为缓存池不存在数据,所以 "0"
不进入缓存池,而是直接交给 _wirte
,myWritable.write("0")
返回值为 true
当执行 myWritable.write("1")
时,因为 _wirte
的 callback
还未调用,表明上一次数据还未写入完,位置保证数据写入的有序性,只能创建一个缓冲区将 "1"
加入缓存池中。后面 2-9
都是如此
当执行 myWritable.write("10")
时,此时缓冲区长度为 9
(1-9),还未到达浮标值, "10"
继续作为一个缓冲区加入缓存池中,此时缓存池长度变为 11
,所以 myWritable.write("1")
返回 false
,这意味着缓冲区的数据已经足够,我们需要等待 drain
事件通知时再生产数据。
100ms过后,_write("0", encoding, callback)
的 callback
被调用,表明 "0"
已经写入完成。然后会检查缓存池中是否存在数据,如果存在则会先调用 _read
消费缓存池的头节点("1"
),然后继续重复这个过程直到缓存池为空后触发 drain
事件,再次执行 productionData
调用 myWritable.write("11")
,触发第1步开始的过程,直到流结束。
在理解了可读流与可写流后,双工流就好理解了,双工流事实上是继承了可读流然后实现了可写流(源码是这么写的,但是应该说是同时实现了可读流和可写流更加好)。
Duplex 流需要同时实现下面两个方法
实现 _read() 方法,为可读流生产数据
实现 _write() 方法,为可写流消费数据
上面两个方法如何实现在上面可写流可读流的部分已经介绍过了,这里需要注意的是,双工流是存在两个独立的缓存池分别提供给两个流,他们的数据源也不一样
以 NodeJs 的标准输入输出流为例:
// 每当用户在控制台输入数据(_read),就会触发data事件,这是可读流的特性 process.stdin.on('data', data=>{ process.stdin.write(data); }) // 每隔一秒向标准输入流生产数据(这是可写流的特性,会直接输出到控制台上),不会触发data setInterval(()=>{ process.stdin.write('不是用户控制台输入的数据') }, 1000)
可以将 Duplex 流视为具有可写流的可读流。两者都是独立的,每个都有独立的内部缓冲区。读写事件独立发生。
Duplex Stream ------------------| Read <----- External Source You ------------------| Write -----> External Sink ------------------|登入後複製
Transform 流是双工的,其中读写以因果关系进行。双工流的端点通过某种转换链接。读取要求发生写入。
Transform Stream --------------|-------------- You Write ----> ----> Read You --------------|--------------登入後複製
对于创建 Transform 流,最重要的是要实现 _transform
方法而不是 _write
或者 _read
。 _transform
中对可写流写入的数据做处理(消费)然后为可读流生产数据。
转换流还经常会实现一个 `_flush` 方法,他会在流结束前被调用,一般用于对流的末尾追加一些东西,例如压缩文件时的一些压缩信息就是在这里加上的
const { write } = require('fs') const { Transform, PassThrough } = require('stream') const reurce = '1312123213124341234213423428354816273513461891468186499126412' const transform = new Transform({ highWaterMark: 10, transform(chunk ,encoding, callback){// 转换数据,调用push将转换结果加入缓存池 this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){// end触发前执行 this.push('<<<') callback() } }) // write 不断写入数据 let count = 0 transform.write('>>>') function productionData() { let flag = true while (count <= 20 && flag) { flag = transform.write(count.toString()) count++ } if (count > 20) { transform.end() } } productionData() transform.on('drain', productionData) let result = '' transform.on('data', data=>{ result += data.toString() }) transform.on('end', ()=>{ console.log(result) // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<< })
管道是将上一个程序的输出作为下一个程序的输入,这是管道在 Linux 中管道的作用。NodeJs 中的管道其实也类似,它管道用于连接两个流,上游的流的输出会作为下游的流的输入。
管道 sourec.pipe(dest, options) 要求 sourec 是可读的,dest是可写的。其返回值是 dest。
对于处于管道中间的流既是下一个流的上游也是上一个流的下游,所以其需要时一个可读可写的双工流,一般我们会使用转换流来作为管道中间的流。
https://github1s.com/nodejs/node/blob/v17.0.0/lib/internal/streams/legacy.js#L16-L33
Stream.prototype.pipe = function(dest, options) { const source = this; function ondata(chunk) { if (dest.writable && dest.write(chunk) === false && source.pause) { source.pause(); } } source.on('data', ondata); function ondrain() { if (source.readable && source.resume) { source.resume(); } } dest.on('drain', ondrain); // ...后面的代码省略 }
pipe 的实现非常清晰,当上游的流发出 data 事件时会调用下游流的 write 方法写入数据,然后立即调用 source.pause() 使得上游变为暂停状态,这主要是为了防止背压。
当下游的流将数据消费完成后会调用 source.resume() 使上游再次变为流动状态。
我们实现一个将 data 文件中所有 1
替换为 @
然后输出到 result 文件到管道。
const { Transform } = require('stream') const { createReadStream, createWriteStream } = require('fs') // 一个位于管道中的转换流 function createTransformStream(){ return new Transform({ transform(chunk, encoding, callback){ this.push(chunk.toString().replace(/1/g, '@')) callback() } }) } createReadStream('./data') .pipe(createTransformStream()) .pipe(createWriteStream('./result'))
在管道中只存在两个流时,其功能和转换流有点类似,都是将一个可读流与一个可写流串联起来,但是管道可以串联多个流。
原文地址:https://juejin.cn/post/7077511716564631566
作者:月夕
更多node相关知识,请访问:nodejs 教程!
以上是什麼是流(Stream)?如何理解Nodejs中的流的詳細內容。更多資訊請關注PHP中文網其他相關文章!