ストリームとは何ですか?流れをどう理解するか? Node のストリーム (Stream) については、次の記事で詳しく説明していますので、お役に立てれば幸いです。
筆者は最近開発でパイプ関数をよく使っているのですが、ストリームパイプということだけは知っていて、仕組みがよく分からないので、まずは流れから学び始め、読んだ知識やソースコードを記事にまとめてみんなで共有しましょう。
ストリームは Nodejs の非常に基本的な概念であり、多くの基本モジュールがストリームに基づいて実装され、非常に重要な役割を果たします。同時に、フローは理解するのが非常に難しい概念でもあります。これは主に、関連ドキュメントが不足していることが原因です。NodeJ の初心者にとって、この概念を真に習得する前に、フローを理解するのに多くの時間がかかります。幸いなことに、ほとんどの NodeJ では、ユーザーにとっては Web アプリケーションの開発にのみ使用され、ストリームの理解が不十分でも使用には影響しません。ただし、ストリームを理解すると、NodeJ の他のモジュールをより深く理解できるようになり、場合によっては、ストリームを使用してデータを処理した方が良い結果が得られることがあります。 [関連チュートリアルの推奨事項: nodejs ビデオ チュートリアル ]
ストリームを理解する方法
ストリームのユーザーは、次のことができます。ストリームを配列として考えると、ストリームからの取得 (消費) と書き込み (生成) だけに集中する必要があります。
ストリーム開発者 (ストリーム モジュールを使用して新しいインスタンスを作成する) は、ストリームにいくつかのメソッドを実装する方法に焦点を当てます。通常は、ターゲットが誰であるかという 2 つの点に焦点を当てます。リソースとその操作方法。ターゲット リソースが決定したら、ストリームのさまざまな状態やイベントに従ってターゲット リソース上で操作する必要があります。
キャッシュ プール
NodeJs のすべて すべてのストリームにはバッファー プールがあります。バッファー プールの目的は、ストリームの効率を高めることです。データの生成と消費に時間がかかる場合は、事前にデータを生成し、次の消費の前にバッファプールに保存します。ただし、バッファ プールは常に使用されているわけではなく、たとえば、キャッシュ プールが空の場合、データは本番後にキャッシュ プールに置かれず、直接消費されます。
。
データ生成の速度がデータ消費の速度よりも速い場合、余ったデータはどこかで待機します。データ生成速度がプロセスデータ消費速度よりも遅い場合、データはどこかに一定量蓄積されてから消費されます。 (開発者はデータの生成と消費の速度を制御できません。適切なタイミングでデータの生成または消費を試みることしかできません。)
データが待機し、データが蓄積され、その後発生する場所。 バッファプール です。バッファ プールは通常、コンピュータの RAM (メモリ) にあります。
一般的なバッファの例を挙げると、オンライン ビデオを視聴する場合、インターネット速度が非常に速い場合、バッファは常にすぐにいっぱいになり、再生のためにシステムに送信され、すぐにバッファされます。 。視聴中に遅延が発生することはありません。ネットワーク速度が非常に遅い場合は、バッファがいっぱいになっていることを示す読み込み中が表示されます。いっぱいになると、データがシステムに送信され、このビデオが表示されます。
NodeJs ストリームのキャッシュ プールはバッファ リンク リストであり、キャッシュ プールにデータを追加するたびにバッファ ノードが再作成され、リンク リストの最後に挿入されます。
EventEmitter
NodeJs の Stream は EventEmitter を実装した抽象インターフェイスなので、最初に EventEmitter について簡単に紹介します。
EventEmitter はイベントの発行とサブスクリプションの機能を実装するクラスで、一般的に使用されるいくつかのメソッド (on、once、off、emit) は誰もがよく知っていると思われるので、個別に紹介することはしません。
const { EventEmitter } = require('events')
const eventEmitter = new EventEmitter()
// 为 eventA 事件绑定处理函数
eventEmitter.on('eventA', () => {
console.log('eventA active 1');
});
// 为 eventB 事件绑定处理函数
eventEmitter.on('eventB', () => {
console.log('eventB active 1');
});
eventEmitter.once('eventA', () => {
console.log('eventA active 2');
});
// 触发 eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2 ログイン後にコピー
EventEmitter
には、newListener
と removeListener
という 2 つのイベントがあることに注意してください。イベントリスニング関数がトリガーされると、newListener
(eventEmitter.emit('newListener')) がトリガーされ、ハンドラー関数が削除されると、同様に removeListener
がトリガーされます。
また、一度バインドされた処理関数は 1 回だけ実行され、removeListener
はその実行前にトリガーされることにも注意してください。これは、once
バインディングを意味します。トリガーされる前に、まず削除されます。
const { EventEmitter } = require('events')
const eventEmitter = new EventEmitter()
eventEmitter.on('newListener', (event, listener)=>{
console.log('newListener', event, listener)
})
eventEmitter.on('removeListener', (event, listener) => {
console.log('removeListener', event, listener)
})
//newListener removeListener[Function(anonymous)]
eventEmitter.on('eventA', () => {
console.log('eventA active 1');
});
//newListener eventA [Function (anonymous)]
function listenerB() { console.log('eventB active 1'); }
eventEmitter.on('eventB', listenerB);
// newListener eventB [Function (anonymous)]
eventEmitter.once('eventA', () => {
console.log('eventA active 2');
});
// newListener eventA [Function (anonymous)]
eventEmitter.emit('eventA')
// eventA active 1
// removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] }
// eventA active 2
eventEmitter.off('eventB', listenerB)
// removeListener eventB[Function: listenerB] ログイン後にコピー
しかし、これは後続のコンテンツにとっては重要ではありません。
Stream Stream は、Node.js でストリーミング データを処理するための抽象インターフェイスです。ストリームは実際のインターフェイスではなく、すべてのストリームの総称です。実際のインターフェイスは、ReadableStream、WritableStream、および ReadWriteStream です。
interface ReadableStream extends EventEmitter {
readable: boolean;
read(size?: number): string | Buffer;
setEncoding(encoding: BufferEncoding): this;
pause(): this;
resume(): this;
isPaused(): boolean;
pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
unpipe(destination?: WritableStream): this;
unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
wrap(oldStream: ReadableStream): this;
[Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}
interface WritableStream extends EventEmitter {
writable: boolean;
write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
end(cb?: () => void): this;
end(data: string | Uint8Array, cb?: () => void): this;
end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}
interface ReadWriteStream extends ReadableStream, WritableStream { } ログイン後にコピー
ReadableStream と WritableStream はどちらも EventEmitter クラスを継承するインターフェイスであることがわかります (ts のインターフェイスは型をマージするだけであるため、クラスを継承できます)。
上記のインターフェイスに対応する実装クラスは、Readable、Writable、Duplex です。
NodeJ には 4 種類のストリームがあります。
Readable 読み取り可能ストリーム (ReadableStream を実装) Writable 書き込み可能ストリーム (WritableStream を実装) 二重読み取り可能および書き込み可能ストリーム (Readable を継承した後に WritableStream を実装) 変換変換ストリーム (Duplex を継承) バックプレッシャー問題 ディスクのデータ書き込み速度はメモリの速度よりもはるかに遅いです。メモリとディスクの間にはギャップがあると考えられます。 「パイプライン」、「パイプライン」とは「流れ」を意味します。メモリ内のデータは非常に速くパイプに流れ込みます。パイプがいっぱいになると、メモリ内にデータ バック プレッシャーが発生し、データがメモリ内にバックログされて占有されます。リソース。
NodeJs ストリームの解決策は、各ストリームの バッファ プール
(つまり、図の書き込みキュー) に float 値を設定することです。その中のデータ量がこの float 値に達した後、push
データがキャッシュ プールに再度プッシュされると false が返され、現在のストリームのキャッシュ プールの内容が float 値に達したことを示します。 、これ以上データが書き込まれる予定はありません。現時点では、大きすぎるキャッシュ プールによって引き起こされるバック プレッシャーを防ぐために、データの生成をただちに停止する必要があります。
Readable 読み取り可能なストリーム (Readable) はストリームの一種で、2 つのモードと 3 つの状態があります。
2 つの読み取りモード:
フロー モード: データは基盤となるシステムからバッファーに読み書きされます。バッファーがいっぱいになると、データは EventEmitter を通じてできるだけ早く登録されたイベント ハンドラーに自動的に渡されます。
一時停止モード: このモードでは、EventEmitter はデータ送信のためにアクティブにトリガーされず、バッファからデータを読み取るために Readable.read()
メソッドを明示的に呼び出す必要があります。 EventEmitter イベントへの応答をトリガーします。
3 つの状態:
readableFlowing === null (初期状態)
readableFlowing = == false (一時停止モード)
readableFlowing === true (フロー モード)
初期フロー readable.readableFlowing
null
のデータ イベントを追加すると true になります。 pause()
、unpipe()
が呼び出されるか、バック プレッシャーが受信されるか、readable
イベントが追加されると、readableFlowing
が実行されます。 false に設定します。この状態では、リスナーをデータ イベントにバインドしても、readableFlowing が true に切り替わりません。
resume()
を呼び出して、読み取り可能なストリームの readableFlowing
を true に切り替えます
すべての読み取り可能なイベントを削除して、readableFlowing を有効にしますnullになる方法。
#イベント名 説明 ##読めるバッファーに新しい読み取り可能なデータがあるときにトリガーされます (ノードがキャッシュ プールに挿入されるたびにトリガーされます) dataその後毎回トリガーされますデータの消費 トリガーされる、パラメータは今回消費されるデータです closeストリームが閉じられたときにトリガーされます error ストリームでエラーが発生したときにトリガー
メソッド名説明 read(size)size の長さのデータを消費します。現在のデータが size 未満であることを示すには null を返します。それ以外の場合は、データを返します。今回消費した。サイズが渡されない場合、キャッシュ プール内のすべてのデータが消費されることを意味しますconst fs = require('fs');
const readStreams = fs.createReadStream('./EventEmitter.js', {
highWaterMark: 100// 缓存池浮标值
})
readStreams.on('readable', () => {
console.log('缓冲区满了')
readStreams.read()// 消费缓存池的所有数据,返回结果并且触发data事件
})
readStreams.on('data', (data) => {
console.log('data')
}) ログイン後にコピー
https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
当 size 为 0 会触发 readable 事件。
当缓存池中的数据长度达到浮标值 highWaterMark
后,就不会在主动请求生产数据,而是等待数据被消费后在生产数据
暂停状态的流如果不调用 read
来消费数据时,后续也不会在触发 data
和 readable
,当调用 read
消费时会先判断本次消费后剩余的数据长度是否低于 浮标值
,如果低于 浮标值
就会在消费前请求生产数据。这样在 read
后的逻辑执行完成后新的数据大概率也已经生产完成,然后再次触发 readable
,这种提前生产下一次消费的数据存放在缓存池的机制也是缓存流为什么快的原因
流动状态下的流有两种情况
生产速度慢于消费速度时:这种情况下每一个生产数据后一般缓存池中都不会有剩余数据,直接将本次生产的数据传递给 data 事件即可(因为没有进入缓存池,所以也不用调用 read 来消费),然后立即开始生产新数据,待上一次数据消费完后新数据才生产好,再次触发 data ,一只到流结束。 生产速度快于消费速度时:此时每一次生产完数据后一般缓存池都还存在未消费的数据,这种情况一般会在消费数据时开始生产下一次消费的数据,待旧数据消费完后新数据已经生产完并且放入缓存池 他们的区别仅仅在于数据生产后缓存池是否还存在数据,如果存在数据则将生产的数据 push 到缓存池等待消费,如果不存在则直接将数据交给 data 而不加入缓存池。
值得注意的是当一个缓存池中存在数据的流从暂停模式进入的流动模式时,会先循环调用 read 来消费数据只到返回 null
暂停模式
暂停模式下,一个可读流读创建时,模式是暂停模式,创建后会自动调用 _read
方法,把数据从数据源 push
到缓冲池中,直到缓冲池中的数据达到了浮标值。每当数据到达浮标值时,可读流会触发一个 " readable
" 事件,告诉消费者有数据已经准备好了,可以继续消费。
一般来说, 'readable'
事件表明流有新的动态:要么有新的数据,要么到达流的尽头。所以,数据源的数据被读完前,也会触发一次 'readable'
事件;
消费者 " readable
" 事件的处理函数中,通过 stream.read(size)
主动消费缓冲池中的数据。
const { Readable } = require('stream')
let count = 1000
const myReadable = new Readable({
highWaterMark: 300,
// 参数的 read 方法会作为流的 _read 方法,用于获取源数据
read(size) {
// 假设我们的源数据上 1000 个1
let chunk = null
// 读取数据的过程一般是异步的,例如IO操作
setTimeout(() => {
if (count > 0) {
let chunkLength = Math.min(count, size)
chunk = '1'.repeat(chunkLength)
count -= chunkLength
}
this.push(chunk)
}, 500)
}
})
// 每一次成功 push 数据到缓存池后都会触发 readable
myReadable.on('readable', () => {
const chunk = myReadable.read()//消费当前缓存池中所有数据
console.log(chunk.toString())
}) ログイン後にコピー
值得注意的是, 如果 read(size) 的 size 大于浮标值,会重新计算新的浮标值,新浮标值是size的下一个二次幂(size <= 2^n,n取最小值)
// hwm 不会大于 1GB.
const MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
if (n >= MAX_HWM) {
// 1GB限制
n = MAX_HWM;
} else {
//取下一个2最高幂,以防止过度增加hwm
n--;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n++;
}
return n;
} ログイン後にコピー
流动模式
所有可读流开始的时候都是暂停模式,可以通过以下方法可以切换至流动模式:
添加 " data
" 事件句柄; 调用 “ resume
”方法; 使用 " pipe
" 方法把数据发送到可写流 流动模式下,缓冲池里面的数据会自动输出到消费端进行消费,同时,每次输出数据后,会自动回调 _read
方法,把数据源的数据放到缓冲池中,如果此时缓存池中不存在数据则会直接吧数据传递给 data 事件,不会经过缓存池;直到流动模式切换至其他暂停模式,或者数据源的数据被读取完了( push(null)
);
可读流可以通过以下方式切换回暂停模式:
如果没有管道目标,则调用 stream.pause()
。 如果有管道目标,则移除所有管道目标。调用 stream.unpipe()
可以移除多个管道目标。 const { Readable } = require('stream')
let count = 1000
const myReadable = new Readable({
highWaterMark: 300,
read(size) {
let chunk = null
setTimeout(() => {
if (count > 0) {
let chunkLength = Math.min(count, size)
chunk = '1'.repeat(chunkLength)
count -= chunkLength
}
this.push(chunk)
}, 500)
}
})
myReadable.on('data', data => {
console.log(data.toString())
}) ログイン後にコピー
Writable 相对可读流来说,可写流要简单一些。
当生产者调用 write(chunk)
时,内部会根据一些状态(corked,writing等)选择是否缓存到缓冲队列中或者调用 _write
,每次写完数据后,会尝试清空缓存队列中的数据。如果缓冲队列中的数据大小超出了浮标值(highWaterMark),消费者调用 write(chunk)
后会返回 false
,这时候生产者应该停止继续写入。
那么什么时候可以继续写入呢?当缓冲中的数据都被成功 _write
之后,清空了缓冲队列后会触发 drain
事件,这时候生产者可以继续写入数据。
当生产者需要结束写入数据时,需要调用 stream.end
方法通知可写流结束。
const { Writable, Duplex } = require('stream')
let fileContent = ''
const myWritable = new Writable({
highWaterMark: 10,
write(chunk, encoding, callback) {// 会作为_write方法
setTimeout(()=>{
fileContent += chunk
callback()// 写入结束后调用
}, 500)
}
})
myWritable.on('close', ()=>{
console.log('close', fileContent)
})
myWritable.write('123123')// true
myWritable.write('123123')// false
myWritable.end() ログイン後にコピー
注意,在缓存池中数据到达浮标值后,此时缓存池中可能存在多个节点,在清空缓存池的过程中(循环调用_read),并不会向可读流一样尽量一次消费长度为浮标值的数据,而是每次消费一个缓冲区节点,即使这个缓冲区长度于浮标值不一致也是如此
const { Writable } = require('stream')
let fileContent = ''
const myWritable = new Writable({
highWaterMark: 10,
write(chunk, encoding, callback) {
setTimeout(()=>{
fileContent += chunk
console.log('消费', chunk.toString())
callback()// 写入结束后调用
}, 100)
}
})
myWritable.on('close', ()=>{
console.log('close', fileContent)
})
let count = 0
function productionData(){
let flag = true
while (count <= 20 && flag){
flag = myWritable.write(count.toString())
count++
}
if(count > 20){
myWritable.end()
}
}
productionData()
myWritable.on('drain', productionData) ログイン後にコピー
上述是一个浮标值为 10
的可写流,现在数据源是一个 0——20
到连续的数字字符串,productionData
用于写入数据。
首先第一次调用 myWritable.write("0")
时,因为缓存池不存在数据,所以 "0"
不进入缓存池,而是直接交给 _wirte
,myWritable.write("0")
返回值为 true
当执行 myWritable.write("1")
时,因为 _wirte
的 callback
还未调用,表明上一次数据还未写入完,位置保证数据写入的有序性,只能创建一个缓冲区将 "1"
加入缓存池中。后面 2-9
都是如此
当执行 myWritable.write("10")
时,此时缓冲区长度为 9
(1-9),还未到达浮标值, "10"
继续作为一个缓冲区加入缓存池中,此时缓存池长度变为 11
,所以 myWritable.write("1")
返回 false
,这意味着缓冲区的数据已经足够,我们需要等待 drain
事件通知时再生产数据。
100ms过后,_write("0", encoding, callback)
的 callback
被调用,表明 "0"
已经写入完成。然后会检查缓存池中是否存在数据,如果存在则会先调用 _read
消费缓存池的头节点("1"
),然后继续重复这个过程直到缓存池为空后触发 drain
事件,再次执行 productionData
调用 myWritable.write("11")
,触发第1步开始的过程,直到流结束。
Duplex 在理解了可读流与可写流后,双工流就好理解了,双工流事实上是继承了可读流然后实现了可写流(源码是这么写的,但是应该说是同时实现了可读流和可写流更加好)。
Duplex 流需要同时实现下面两个方法
实现 _read() 方法,为可读流生产数据
实现 _write() 方法,为可写流消费数据
上面两个方法如何实现在上面可写流可读流的部分已经介绍过了,这里需要注意的是,双工流是存在两个独立的缓存池分别提供给两个流,他们的数据源也不一样
以 NodeJs 的标准输入输出流为例:
当我们在控制台输入数据时会触发其 data 事件,这证明他有可读流的功能,每一次用户键入回车相当于调用可读的 push 方法推送生产的数据。 当我们调用其 write 方法时也可以向控制台输出内容,但是不会触发 data 事件,这说明他有可写流的功能,而且有独立的缓冲区,_write 方法的实现内容就是让控制台展示文字。 // 每当用户在控制台输入数据(_read),就会触发data事件,这是可读流的特性
process.stdin.on('data', data=>{
process.stdin.write(data);
})
// 每隔一秒向标准输入流生产数据(这是可写流的特性,会直接输出到控制台上),不会触发data
setInterval(()=>{
process.stdin.write('不是用户控制台输入的数据')
}, 1000) ログイン後にコピー
Transform
可以将 Duplex 流视为具有可写流的可读流。两者都是独立的,每个都有独立的内部缓冲区。读写事件独立发生。
Duplex Stream
------------------|
Read <----- External Source
You ------------------|
Write -----> External Sink
------------------| ログイン後にコピー
Transform 流是双工的,其中读写以因果关系进行。双工流的端点通过某种转换链接。读取要求发生写入。
Transform Stream
--------------|--------------
You Write ----> ----> Read You
--------------|-------------- ログイン後にコピー
对于创建 Transform 流,最重要的是要实现 _transform
方法而不是 _write
或者 _read
。 _transform
中对可写流写入的数据做处理(消费)然后为可读流生产数据。
转换流还经常会实现一个 `_flush` 方法,他会在流结束前被调用,一般用于对流的末尾追加一些东西,例如压缩文件时的一些压缩信息就是在这里加上的 ログイン後にコピー
const { write } = require('fs')
const { Transform, PassThrough } = require('stream')
const reurce = '1312123213124341234213423428354816273513461891468186499126412'
const transform = new Transform({
highWaterMark: 10,
transform(chunk ,encoding, callback){// 转换数据,调用push将转换结果加入缓存池
this.push(chunk.toString().replace('1', '@'))
callback()
},
flush(callback){// end触发前执行
this.push('<<<')
callback()
}
})
// write 不断写入数据
let count = 0
transform.write('>>>')
function productionData() {
let flag = true
while (count <= 20 && flag) {
flag = transform.write(count.toString())
count++
}
if (count > 20) {
transform.end()
}
}
productionData()
transform.on('drain', productionData)
let result = ''
transform.on('data', data=>{
result += data.toString()
})
transform.on('end', ()=>{
console.log(result)
// >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<<
}) ログイン後にコピー
Pipe 管道是将上一个程序的输出作为下一个程序的输入,这是管道在 Linux 中管道的作用。NodeJs 中的管道其实也类似,它管道用于连接两个流,上游的流的输出会作为下游的流的输入。
管道 sourec.pipe(dest, options) 要求 sourec 是可读的,dest是可写的。其返回值是 dest。
对于处于管道中间的流既是下一个流的上游也是上一个流的下游,所以其需要时一个可读可写的双工流,一般我们会使用转换流来作为管道中间的流。
https://github1s.com/nodejs/node/blob/v17.0.0/lib/internal/streams/legacy.js#L16-L33
Stream.prototype.pipe = function(dest, options) {
const source = this;
function ondata(chunk) {
if (dest.writable && dest.write(chunk) === false && source.pause) {
source.pause();
}
}
source.on('data', ondata);
function ondrain() {
if (source.readable && source.resume) {
source.resume();
}
}
dest.on('drain', ondrain);
// ...后面的代码省略
} ログイン後にコピー
pipe 的实现非常清晰,当上游的流发出 data 事件时会调用下游流的 write 方法写入数据,然后立即调用 source.pause() 使得上游变为暂停状态,这主要是为了防止背压。
当下游的流将数据消费完成后会调用 source.resume() 使上游再次变为流动状态。
我们实现一个将 data 文件中所有 1
替换为 @
然后输出到 result 文件到管道。
const { Transform } = require('stream')
const { createReadStream, createWriteStream } = require('fs')
// 一个位于管道中的转换流
function createTransformStream(){
return new Transform({
transform(chunk, encoding, callback){
this.push(chunk.toString().replace(/1/g, '@'))
callback()
}
})
}
createReadStream('./data')
.pipe(createTransformStream())
.pipe(createWriteStream('./result')) ログイン後にコピー
在管道中只存在两个流时,其功能和转换流有点类似,都是将一个可读流与一个可写流串联起来,但是管道可以串联多个流。
原文地址:https://juejin.cn/post/7077511716564631566
作者:月夕
更多node相关知识,请访问:nodejs 教程 !
以上がストリームとは何ですか? Nodejs のストリームを理解する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
著者別の最新記事
2023-04-26 17:59:18
2023-04-26 17:47:48
2023-04-26 17:41:42
2023-04-26 17:37:05
2023-04-26 17:31:25
2023-04-26 17:27:32
2023-04-25 19:57:58
2023-04-25 19:53:11
2023-04-25 19:49:11
2023-04-25 19:41:54