Apakah aliran? Bagaimana untuk memahami aliran? Artikel berikut akan memberi anda pemahaman yang mendalam tentang strim dalam Node saya harap ia akan membantu anda!
Penulis sering menggunakan fungsi paip dalam pembangunan baru-baru ini Saya hanya tahu bahawa ia adalah saluran paip, tetapi saya tidak tahu bagaimana ia berfungsi, jadi saya mahu untuk mengetahuinya. Hanya mula belajar dari aliran, dan kumpulkan pengetahuan dan kod sumber yang telah anda baca ke dalam artikel untuk dikongsi dengan semua orang.
Strim ialah konsep yang sangat asas dalam Nodejs Banyak modul asas dilaksanakan berdasarkan aliran dan memainkan peranan yang sangat penting. Pada masa yang sama, aliran juga merupakan konsep yang sangat sukar untuk difahami Ini terutamanya disebabkan oleh kekurangan dokumentasi yang relevan Bagi pemula NodeJ, ia sering mengambil banyak masa untuk memahami aliran sebelum mereka benar-benar dapat menguasai konsep ini. bagi kebanyakan NodeJ, ia Bagi pengguna, ia hanya digunakan untuk membangunkan aplikasi Web Pemahaman yang tidak mencukupi tentang aliran tidak menjejaskan penggunaannya. Walau bagaimanapun, pemahaman aliran boleh membawa kepada pemahaman yang lebih baik tentang modul lain dalam NodeJs, dan dalam beberapa kes, menggunakan strim untuk memproses data akan memberikan hasil yang lebih baik. [Cadangan tutorial berkaitan: tutorial video nodejs]
Untuk pengguna strim, Strim boleh dianggap sebagai tatasusunan, dan kita hanya perlu menumpukan pada mendapatkan (memakan) dan menulis (menghasilkan) daripadanya.
Untuk pembangun strim (menggunakan modul strim untuk mencipta tika baharu), mereka menumpukan pada cara melaksanakan beberapa kaedah dalam strim Mereka biasanya menumpukan pada dua mata, siapa yang menjadi sasaran sumber dan cara mengendalikannya Selepas sumber sasaran ditentukan, ia perlu dikendalikan pada sumber sasaran mengikut keadaan dan peristiwa strim yang berbeza
Semua dalam NodeJs Semua strim mempunyai kumpulan penimbal Tujuan kumpulan penimbal adalah untuk meningkatkan kecekapan strim Apabila pengeluaran dan penggunaan data mengambil masa, kami boleh menghasilkan data terlebih dahulu dan menyimpan ia dalam kolam penampan sebelum penggunaan seterusnya. Walau bagaimanapun, kumpulan penimbal tidak selalu digunakan Contohnya, apabila kumpulan cache kosong, data tidak akan dimasukkan ke dalam kumpulan cache selepas pengeluaran tetapi akan digunakan secara langsung. .
Jika kelajuan pengeluaran data lebih besar daripada kelajuan penggunaan data, lebihan data akan menunggu di suatu tempat. Jika kelajuan pengeluaran data lebih perlahan daripada kelajuan penggunaan data proses, maka data akan terkumpul kepada jumlah tertentu di suatu tempat dan kemudian digunakan. (Pembangun tidak boleh mengawal kelajuan pengeluaran dan penggunaan data, mereka hanya boleh mencuba untuk menghasilkan data atau menggunakan data pada masa apa)
Tempat di mana data menunggu, mengumpul data, dan kemudian berlaku. Ia ialah kolam penimbal. Kolam penimbal biasanya terletak di dalam RAM (memori) komputer.
Berikan contoh penimbal biasa Apabila kami menonton video dalam talian, jika kelajuan Internet anda sangat pantas, penimbal akan sentiasa diisi serta-merta, kemudian dihantar ke sistem untuk main semula, dan kemudian ditimbalkan dengan serta-merta. Tidak akan ada lag semasa tontonan. Jika kelajuan rangkaian sangat perlahan, anda akan melihat pemuatan, menunjukkan bahawa penimbal sedang diisi Apabila pengisian selesai, data dihantar ke sistem dan anda boleh melihat video ini.
Kumpulan cache strim NodeJs ialah senarai terpaut Penimbal Setiap kali anda mahu menambah data pada kumpulan cache, nod Penimbal akan dibuat semula dan dimasukkan ke penghujung senarai terpaut.
Strim dalam NodeJs ialah antara muka abstrak yang melaksanakan EventEmitter, jadi saya akan memperkenalkan EventEmitter secara ringkas terlebih dahulu.
EventEmitter ialah kelas yang melaksanakan fungsi penerbitan dan langganan acara Beberapa kaedah yang biasa digunakan (hidup, sekali, matikan, emit) dipercayai biasa kepada semua orang, jadi saya tidak akan memperkenalkannya satu demi satu.
const { EventEmitter } = require('events') const eventEmitter = new EventEmitter() // 为 eventA 事件绑定处理函数 eventEmitter.on('eventA', () => { console.log('eventA active 1'); }); // 为 eventB 事件绑定处理函数 eventEmitter.on('eventB', () => { console.log('eventB active 1'); }); eventEmitter.once('eventA', () => { console.log('eventA active 2'); }); // 触发 eventA eventEmitter.emit('eventA') // eventA active 1 // eventA active 2
Perlu diperhatikan bahawa EventEmitter
mempunyai dua acara yang dipanggil newListener
dan removeListener
Apabila anda menambah sebarang fungsi mendengar acara pada objek acara, newListener
akan dicetuskan .emit('newListener')), removeListener
juga akan dicetuskan apabila pengendali dialih keluar.
Perlu diingatkan juga bahawa fungsi pemprosesan terikat sekali hanya akan dilaksanakan sekali, removeListener
akan dicetuskan sebelum pelaksanaannya, yang bermaksud fungsi pendengaran terikat once
dialih keluar dahulu Baru sahaja dicetuskan.
const { EventEmitter } = require('events') const eventEmitter = new EventEmitter() eventEmitter.on('newListener', (event, listener)=>{ console.log('newListener', event, listener) }) eventEmitter.on('removeListener', (event, listener) => { console.log('removeListener', event, listener) }) //newListener removeListener[Function(anonymous)] eventEmitter.on('eventA', () => { console.log('eventA active 1'); }); //newListener eventA [Function (anonymous)] function listenerB() { console.log('eventB active 1'); } eventEmitter.on('eventB', listenerB); // newListener eventB [Function (anonymous)] eventEmitter.once('eventA', () => { console.log('eventA active 2'); }); // newListener eventA [Function (anonymous)] eventEmitter.emit('eventA') // eventA active 1 // removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] } // eventA active 2 eventEmitter.off('eventB', listenerB) // removeListener eventB[Function: listenerB]
Tetapi ini tidak penting untuk apa yang kita akan lihat nanti.
Strim ialah antara muka abstrak untuk memproses data penstriman dalam Node.js. Strim bukan antara muka sebenar, tetapi istilah umum untuk semua strim. Antara muka sebenar ialah ReadableStream, WritableStream dan ReadWriteStream.
interface ReadableStream extends EventEmitter { readable: boolean; read(size?: number): string | Buffer; setEncoding(encoding: BufferEncoding): this; pause(): this; resume(): this; isPaused(): boolean; pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T; unpipe(destination?: WritableStream): this; unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>; } interface WritableStream extends EventEmitter { writable: boolean; write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean; write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean; end(cb?: () => void): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this; } interface ReadWriteStream extends ReadableStream, WritableStream { }
Dapat dilihat bahawa ReadableStream dan WritableStream adalah kedua-dua antara muka yang mewarisi kelas EventEmitter (antara muka dalam ts boleh mewarisi kelas, kerana ia hanya jenis penggabungan).
Kelas pelaksanaan yang sepadan dengan antara muka di atas ialah Boleh Dibaca, Boleh Ditulis dan Dupleks masing-masing
Terdapat 4 jenis strim dalam NodeJs:
(iaitu baris gilir tulis dalam gambar), apabila jumlah data mencapai nilai apungan ini Akhirnya, apabila data 缓存池
dikembalikan ke kolam cache sekali lagi, palsu akan dikembalikan, menunjukkan bahawa kandungan kumpulan cache dalam strim semasa telah mencapai nilai apungan dan tiada lagi data dijangka akan ditulis Pada kali ini, kita harus menghentikan pengeluaran data serta-merta untuk mengelakkan kumpulan cache daripada ditulis Tekanan yang berlebihan mewujudkan tekanan belakang. push
.
Readable.read()
readable.readableFlowing
null
Ia menjadi benar selepas menambahkan peristiwa data. Apabila
dipanggil, atau tekanan belakang diterima atau acara pause()
ditambah, unpipe()
akan ditetapkan kepada palsu readable
Dalam keadaan ini, mengikat pendengar kepada peristiwa data tidak akan dayakan readableFlowing Tukar kepada benarreadableFlowing
. Memanggil
kepada benarresume()
readableFlowing
const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// 缓存池浮标值 }) readStreams.on('readable', () => { console.log('缓冲区满了') readStreams.read()// 消费缓存池的所有数据,返回结果并且触发data事件 }) readStreams.on('data', (data) => { console.log('data') })
https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
当 size 为 0 会触发 readable 事件。
当缓存池中的数据长度达到浮标值 highWaterMark
后,就不会在主动请求生产数据,而是等待数据被消费后在生产数据
暂停状态的流如果不调用 read
来消费数据时,后续也不会在触发 data
和 readable
,当调用 read
消费时会先判断本次消费后剩余的数据长度是否低于 浮标值
,如果低于 浮标值
就会在消费前请求生产数据。这样在 read
后的逻辑执行完成后新的数据大概率也已经生产完成,然后再次触发 readable
,这种提前生产下一次消费的数据存放在缓存池的机制也是缓存流为什么快的原因
流动状态下的流有两种情况
他们的区别仅仅在于数据生产后缓存池是否还存在数据,如果存在数据则将生产的数据 push 到缓存池等待消费,如果不存在则直接将数据交给 data 而不加入缓存池。
值得注意的是当一个缓存池中存在数据的流从暂停模式进入的流动模式时,会先循环调用 read 来消费数据只到返回 null
暂停模式下,一个可读流读创建时,模式是暂停模式,创建后会自动调用 _read
方法,把数据从数据源 push
到缓冲池中,直到缓冲池中的数据达到了浮标值。每当数据到达浮标值时,可读流会触发一个 " readable
" 事件,告诉消费者有数据已经准备好了,可以继续消费。
一般来说, 'readable'
事件表明流有新的动态:要么有新的数据,要么到达流的尽头。所以,数据源的数据被读完前,也会触发一次 'readable'
事件;
消费者 " readable
" 事件的处理函数中,通过 stream.read(size)
主动消费缓冲池中的数据。
const { Readable } = require('stream') let count = 1000 const myReadable = new Readable({ highWaterMark: 300, // 参数的 read 方法会作为流的 _read 方法,用于获取源数据 read(size) { // 假设我们的源数据上 1000 个1 let chunk = null // 读取数据的过程一般是异步的,例如IO操作 setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) } }) // 每一次成功 push 数据到缓存池后都会触发 readable myReadable.on('readable', () => { const chunk = myReadable.read()//消费当前缓存池中所有数据 console.log(chunk.toString()) })
值得注意的是, 如果 read(size) 的 size 大于浮标值,会重新计算新的浮标值,新浮标值是size的下一个二次幂(size <= 2^n,n取最小值)
// hwm 不会大于 1GB. const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { // 1GB限制 n = MAX_HWM; } else { //取下一个2最高幂,以防止过度增加hwm n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; } return n; }
所有可读流开始的时候都是暂停模式,可以通过以下方法可以切换至流动模式:
data
" 事件句柄;resume
”方法;pipe
" 方法把数据发送到可写流流动模式下,缓冲池里面的数据会自动输出到消费端进行消费,同时,每次输出数据后,会自动回调 _read
方法,把数据源的数据放到缓冲池中,如果此时缓存池中不存在数据则会直接吧数据传递给 data 事件,不会经过缓存池;直到流动模式切换至其他暂停模式,或者数据源的数据被读取完了( push(null)
);
可读流可以通过以下方式切换回暂停模式:
stream.pause()
。stream.unpipe()
可以移除多个管道目标。const { Readable } = require('stream') let count = 1000 const myReadable = new Readable({ highWaterMark: 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) } }) myReadable.on('data', data => { console.log(data.toString()) })
相对可读流来说,可写流要简单一些。
当生产者调用 write(chunk)
时,内部会根据一些状态(corked,writing等)选择是否缓存到缓冲队列中或者调用 _write
,每次写完数据后,会尝试清空缓存队列中的数据。如果缓冲队列中的数据大小超出了浮标值(highWaterMark),消费者调用 write(chunk)
后会返回 false
,这时候生产者应该停止继续写入。
那么什么时候可以继续写入呢?当缓冲中的数据都被成功 _write
之后,清空了缓冲队列后会触发 drain
事件,这时候生产者可以继续写入数据。
当生产者需要结束写入数据时,需要调用 stream.end
方法通知可写流结束。
const { Writable, Duplex } = require('stream') let fileContent = '' const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// 会作为_write方法 setTimeout(()=>{ fileContent += chunk callback()// 写入结束后调用 }, 500) } }) myWritable.on('close', ()=>{ console.log('close', fileContent) }) myWritable.write('123123')// true myWritable.write('123123')// false myWritable.end()
注意,在缓存池中数据到达浮标值后,此时缓存池中可能存在多个节点,在清空缓存池的过程中(循环调用_read),并不会向可读流一样尽量一次消费长度为浮标值的数据,而是每次消费一个缓冲区节点,即使这个缓冲区长度于浮标值不一致也是如此
const { Writable } = require('stream') let fileContent = '' const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) { setTimeout(()=>{ fileContent += chunk console.log('消费', chunk.toString()) callback()// 写入结束后调用 }, 100) } }) myWritable.on('close', ()=>{ console.log('close', fileContent) }) let count = 0 function productionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end() } } productionData() myWritable.on('drain', productionData)
上述是一个浮标值为 10
的可写流,现在数据源是一个 0——20
到连续的数字字符串,productionData
用于写入数据。
首先第一次调用 myWritable.write("0")
时,因为缓存池不存在数据,所以 "0"
不进入缓存池,而是直接交给 _wirte
,myWritable.write("0")
返回值为 true
当执行 myWritable.write("1")
时,因为 _wirte
的 callback
还未调用,表明上一次数据还未写入完,位置保证数据写入的有序性,只能创建一个缓冲区将 "1"
加入缓存池中。后面 2-9
都是如此
当执行 myWritable.write("10")
时,此时缓冲区长度为 9
(1-9),还未到达浮标值, "10"
继续作为一个缓冲区加入缓存池中,此时缓存池长度变为 11
,所以 myWritable.write("1")
返回 false
,这意味着缓冲区的数据已经足够,我们需要等待 drain
事件通知时再生产数据。
100ms过后,_write("0", encoding, callback)
的 callback
被调用,表明 "0"
已经写入完成。然后会检查缓存池中是否存在数据,如果存在则会先调用 _read
消费缓存池的头节点("1"
),然后继续重复这个过程直到缓存池为空后触发 drain
事件,再次执行 productionData
调用 myWritable.write("11")
,触发第1步开始的过程,直到流结束。
在理解了可读流与可写流后,双工流就好理解了,双工流事实上是继承了可读流然后实现了可写流(源码是这么写的,但是应该说是同时实现了可读流和可写流更加好)。
Duplex 流需要同时实现下面两个方法
实现 _read() 方法,为可读流生产数据
实现 _write() 方法,为可写流消费数据
上面两个方法如何实现在上面可写流可读流的部分已经介绍过了,这里需要注意的是,双工流是存在两个独立的缓存池分别提供给两个流,他们的数据源也不一样
以 NodeJs 的标准输入输出流为例:
// 每当用户在控制台输入数据(_read),就会触发data事件,这是可读流的特性 process.stdin.on('data', data=>{ process.stdin.write(data); }) // 每隔一秒向标准输入流生产数据(这是可写流的特性,会直接输出到控制台上),不会触发data setInterval(()=>{ process.stdin.write('不是用户控制台输入的数据') }, 1000)
可以将 Duplex 流视为具有可写流的可读流。两者都是独立的,每个都有独立的内部缓冲区。读写事件独立发生。
Duplex Stream ------------------| Read <----- External Source You ------------------| Write -----> External Sink ------------------|Salin selepas log masuk
Transform 流是双工的,其中读写以因果关系进行。双工流的端点通过某种转换链接。读取要求发生写入。
Transform Stream --------------|-------------- You Write ----> ----> Read You --------------|--------------Salin selepas log masuk
对于创建 Transform 流,最重要的是要实现 _transform
方法而不是 _write
或者 _read
。 _transform
中对可写流写入的数据做处理(消费)然后为可读流生产数据。
转换流还经常会实现一个 `_flush` 方法,他会在流结束前被调用,一般用于对流的末尾追加一些东西,例如压缩文件时的一些压缩信息就是在这里加上的
const { write } = require('fs') const { Transform, PassThrough } = require('stream') const reurce = '1312123213124341234213423428354816273513461891468186499126412' const transform = new Transform({ highWaterMark: 10, transform(chunk ,encoding, callback){// 转换数据,调用push将转换结果加入缓存池 this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){// end触发前执行 this.push('<<<') callback() } }) // write 不断写入数据 let count = 0 transform.write('>>>') function productionData() { let flag = true while (count <= 20 && flag) { flag = transform.write(count.toString()) count++ } if (count > 20) { transform.end() } } productionData() transform.on('drain', productionData) let result = '' transform.on('data', data=>{ result += data.toString() }) transform.on('end', ()=>{ console.log(result) // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<< })
管道是将上一个程序的输出作为下一个程序的输入,这是管道在 Linux 中管道的作用。NodeJs 中的管道其实也类似,它管道用于连接两个流,上游的流的输出会作为下游的流的输入。
管道 sourec.pipe(dest, options) 要求 sourec 是可读的,dest是可写的。其返回值是 dest。
对于处于管道中间的流既是下一个流的上游也是上一个流的下游,所以其需要时一个可读可写的双工流,一般我们会使用转换流来作为管道中间的流。
https://github1s.com/nodejs/node/blob/v17.0.0/lib/internal/streams/legacy.js#L16-L33
Stream.prototype.pipe = function(dest, options) { const source = this; function ondata(chunk) { if (dest.writable && dest.write(chunk) === false && source.pause) { source.pause(); } } source.on('data', ondata); function ondrain() { if (source.readable && source.resume) { source.resume(); } } dest.on('drain', ondrain); // ...后面的代码省略 }
pipe 的实现非常清晰,当上游的流发出 data 事件时会调用下游流的 write 方法写入数据,然后立即调用 source.pause() 使得上游变为暂停状态,这主要是为了防止背压。
当下游的流将数据消费完成后会调用 source.resume() 使上游再次变为流动状态。
我们实现一个将 data 文件中所有 1
替换为 @
然后输出到 result 文件到管道。
const { Transform } = require('stream') const { createReadStream, createWriteStream } = require('fs') // 一个位于管道中的转换流 function createTransformStream(){ return new Transform({ transform(chunk, encoding, callback){ this.push(chunk.toString().replace(/1/g, '@')) callback() } }) } createReadStream('./data') .pipe(createTransformStream()) .pipe(createWriteStream('./result'))
在管道中只存在两个流时,其功能和转换流有点类似,都是将一个可读流与一个可写流串联起来,但是管道可以串联多个流。
原文地址:https://juejin.cn/post/7077511716564631566
作者:月夕
更多node相关知识,请访问:nodejs 教程!
Atas ialah kandungan terperinci Apakah aliran? Bagaimana untuk memahami aliran dalam Nodejs. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!