目次
ストリームを理解する方法
キャッシュ プール
EventEmitter
Stream
バックプレッシャー問題
Readable
暂停模式
流动模式
Writable
Duplex
Transform
Pipe
ホームページ ウェブフロントエンド jsチュートリアル ストリームとは何ですか? Nodejs のストリームを理解する方法

ストリームとは何ですか? Nodejs のストリームを理解する方法

May 06, 2022 pm 07:27 PM
nodejs node.js node

ストリームとは何ですか?流れをどう理解するか? Node のストリーム (Stream) については、次の記事で詳しく説明していますので、お役に立てれば幸いです。

ストリームとは何ですか? Nodejs のストリームを理解する方法

筆者は最近開発でパイプ関数をよく使っているのですが、ストリームパイプということだけは知っていて、仕組みがよく分からないので、まずは流れから学び始め、読んだ知識やソースコードを記事にまとめてみんなで共有しましょう。

ストリームは Nodejs の非常に基本的な概念であり、多くの基本モジュールがストリームに基づいて実装され、非常に重要な役割を果たします。同時に、フローは理解するのが非常に難しい概念でもあります。これは主に、関連ドキュメントが不足していることが原因です。NodeJ の初心者にとって、この概念を真に習得する前に、フローを理解するのに多くの時間がかかります。幸いなことに、ほとんどの NodeJ では、ユーザーにとっては Web アプリケーションの開発にのみ使用され、ストリームの理解が不十分でも使用には影響しません。ただし、ストリームを理解すると、NodeJ の他のモジュールをより深く理解できるようになり、場合によっては、ストリームを使用してデータを処理した方が良い結果が得られることがあります。 [関連チュートリアルの推奨事項: nodejs ビデオ チュートリアル ]

ストリームを理解する方法

  • ストリームのユーザーは、次のことができます。ストリームを配列として考えると、ストリームからの取得 (消費) と書き込み (生成) だけに集中する必要があります。

  • ストリーム開発者 (ストリーム モジュールを使用して新しいインスタンスを作成する) は、ストリームにいくつかのメソッドを実装する方法に焦点を当てます。通常は、ターゲットが誰であるかという 2 つの点に焦点を当てます。リソースとその操作方法。ターゲット リソースが決定したら、ストリームのさまざまな状態やイベントに従ってターゲット リソース上で操作する必要があります。

キャッシュ プール

NodeJs のすべて すべてのストリームにはバッファー プールがあります。バッファー プールの目的は、ストリームの効率を高めることです。データの生成と消費に時間がかかる場合は、事前にデータを生成し、次の消費の前にバッファプールに保存します。ただし、バッファ プールは常に使用されているわけではなく、たとえば、キャッシュ プールが空の場合、データは本番後にキャッシュ プールに置かれず、直接消費されます。 。

データ生成の速度がデータ消費の速度よりも速い場合、余ったデータはどこかで待機します。データ生成速度がプロセスデータ消費速度よりも遅い場合、データはどこかに一定量蓄積されてから消費されます。 (開発者はデータの生成と消費の速度を制御できません。適切なタイミングでデータの生成または消費を試みることしかできません。)

データが待機し、データが蓄積され、その後発生する場所。 バッファプールです。バッファ プールは通常、コンピュータの RAM (メモリ) にあります。

一般的なバッファの例を挙げると、オンライン ビデオを視聴する場合、インターネット速度が非常に速い場合、バッファは常にすぐにいっぱいになり、再生のためにシステムに送信され、すぐにバッファされます。 。視聴中に遅延が発生することはありません。ネットワーク速度が非常に遅い場合は、バッファがいっぱいになっていることを示す読み込み中が表示されます。いっぱいになると、データがシステムに送信され、このビデオが表示されます。

NodeJs ストリームのキャッシュ プールはバッファ リンク リストであり、キャッシュ プールにデータを追加するたびにバッファ ノードが再作成され、リンク リストの最後に挿入されます。

EventEmitter

NodeJs の Stream は EventEmitter を実装した抽象インターフェイスなので、最初に EventEmitter について簡単に紹介します。

EventEmitter はイベントの発行とサブスクリプションの機能を実装するクラスで、一般的に使用されるいくつかのメソッド (on、once、off、emit) は誰もがよく知っていると思われるので、個別に紹介することはしません。

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

// 为 eventA 事件绑定处理函数
eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});

// 为 eventB 事件绑定处理函数
eventEmitter.on('eventB', () => {
    console.log('eventB active 1');
});

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});

// 触发 eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2
ログイン後にコピー

EventEmitter には、newListenerremoveListener という 2 つのイベントがあることに注意してください。イベントリスニング関数がトリガーされると、newListener (eventEmitter.emit('newListener')) がトリガーされ、ハンドラー関数が削除されると、同様に removeListener がトリガーされます。

また、一度バインドされた処理関数は 1 回だけ実行され、removeListener はその実行前にトリガーされることにも注意してください。これは、once バインディングを意味します。トリガーされる前に、まず削除されます。

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

eventEmitter.on('newListener', (event, listener)=>{
    console.log('newListener', event, listener)
})

eventEmitter.on('removeListener', (event, listener) => {
    console.log('removeListener', event, listener)
})
//newListener removeListener[Function(anonymous)]


eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});
//newListener eventA [Function (anonymous)]

function listenerB() { console.log('eventB active 1'); }
eventEmitter.on('eventB', listenerB);
// newListener eventB [Function (anonymous)]

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});
// newListener eventA [Function (anonymous)]

eventEmitter.emit('eventA')
// eventA active 1
// removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] }
// eventA active 2

eventEmitter.off('eventB', listenerB)
// removeListener eventB[Function: listenerB]
ログイン後にコピー

しかし、これは後続のコンテンツにとっては重要ではありません。

Stream

Stream は、Node.js でストリーミング データを処理するための抽象インターフェイスです。ストリームは実際のインターフェイスではなく、すべてのストリームの総称です。実際のインターフェイスは、ReadableStream、WritableStream、および ReadWriteStream です。

interface ReadableStream extends EventEmitter {
    readable: boolean;
    read(size?: number): string | Buffer;
    setEncoding(encoding: BufferEncoding): this;
    pause(): this;
    resume(): this;
    isPaused(): boolean;
    pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
    unpipe(destination?: WritableStream): this;
    unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
    wrap(oldStream: ReadableStream): this;
    [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
    writable: boolean;
    write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
    write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
    end(cb?: () => void): this;
    end(data: string | Uint8Array, cb?: () => void): this;
    end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }
ログイン後にコピー

ReadableStream と WritableStream はどちらも EventEmitter クラスを継承するインターフェイスであることがわかります (ts のインターフェイスは型をマージするだけであるため、クラスを継承できます)。

上記のインターフェイスに対応する実装クラスは、Readable、Writable、Duplex です。

NodeJ には 4 種類のストリームがあります。

  • Readable 読み取り可能ストリーム (ReadableStream を実装)
  • Writable 書き込み可能ストリーム (WritableStream を実装)
  • 二重読み取り可能および書き込み可能ストリーム (Readable を継承した後に WritableStream を実装)
  • 変換変換ストリーム (Duplex を継承)

バックプレッシャー問題

ディスクのデータ書き込み速度はメモリの速度よりもはるかに遅いです。メモリとディスクの間にはギャップがあると考えられます。 「パイプライン」、「パイプライン」とは「流れ」を意味します。メモリ内のデータは非常に速くパイプに流れ込みます。パイプがいっぱいになると、メモリ内にデータ バック プレッシャーが発生し、データがメモリ内にバックログされて占有されます。リソース。

ストリームとは何ですか? Nodejs のストリームを理解する方法

NodeJs ストリームの解決策は、各ストリームの バッファ プール (つまり、図の書き込みキュー) に float 値を設定することです。その中のデータ量がこの float 値に達した後、push データがキャッシュ プールに再度プッシュされると false が返され、現在のストリームのキャッシュ プールの内容が float 値に達したことを示します。 、これ以上データが書き込まれる予定はありません。現時点では、大きすぎるキャッシュ プールによって引き起こされるバック プレッシャーを防ぐために、データの生成をただちに停止する必要があります。

Readable

読み取り可能なストリーム (Readable) はストリームの一種で、2 つのモードと 3 つの状態があります。

2 つの読み取りモード:

  • フロー モード: データは基盤となるシステムからバッファーに読み書きされます。バッファーがいっぱいになると、データは EventEmitter を通じてできるだけ早く登録されたイベント ハンドラーに自動的に渡されます。

  • 一時停止モード: このモードでは、EventEmitter はデータ送信のためにアクティブにトリガーされず、バッファからデータを読み取るために Readable.read() メソッドを明示的に呼び出す必要があります。 EventEmitter イベントへの応答をトリガーします。

3 つの状態:

  • readableFlowing === null (初期状態)

  • readableFlowing = == false (一時停止モード)

  • readableFlowing === true (フロー モード)

初期フロー readable.readableFlowing null

のデータ イベントを追加すると true になります。 pause()unpipe() が呼び出されるか、バック プレッシャーが受信されるか、readable イベントが追加されると、readableFlowing が実行されます。 false に設定します。この状態では、リスナーをデータ イベントにバインドしても、readableFlowing が true に切り替わりません。

resume() を呼び出して、読み取り可能なストリームの readableFlowing を true に切り替えます

すべての読み取り可能なイベントを削除して、readableFlowing を有効にしますnullになる方法。

#イベント名説明##読めるdatacloseerror
バッファーに新しい読み取り可能なデータがあるときにトリガーされます (ノードがキャッシュ プールに挿入されるたびにトリガーされます)
その後毎回トリガーされますデータの消費 トリガーされる、パラメータは今回消費されるデータです
ストリームが閉じられたときにトリガーされます
ストリームでエラーが発生したときにトリガー
メソッド名説明read(size)size の長さのデータを消費します。現在のデータが size 未満であることを示すには null を返します。それ以外の場合は、データを返します。今回消費した。サイズが渡されない場合、キャッシュ プール内のすべてのデータが消費されることを意味します
const fs = require(&#39;fs&#39;);

const readStreams = fs.createReadStream(&#39;./EventEmitter.js&#39;, {
    highWaterMark: 100// 缓存池浮标值
})

readStreams.on(&#39;readable&#39;, () => {
    console.log(&#39;缓冲区满了&#39;)
    readStreams.read()// 消费缓存池的所有数据,返回结果并且触发data事件
})


readStreams.on(&#39;data&#39;, (data) => {
    console.log(&#39;data&#39;)
})
ログイン後にコピー

https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527

当 size 为 0 会触发 readable 事件。

当缓存池中的数据长度达到浮标值 highWaterMark 后,就不会在主动请求生产数据,而是等待数据被消费后在生产数据

暂停状态的流如果不调用 read 来消费数据时,后续也不会在触发 datareadable,当调用 read 消费时会先判断本次消费后剩余的数据长度是否低于 浮标值,如果低于 浮标值 就会在消费前请求生产数据。这样在 read 后的逻辑执行完成后新的数据大概率也已经生产完成,然后再次触发 readable,这种提前生产下一次消费的数据存放在缓存池的机制也是缓存流为什么快的原因

流动状态下的流有两种情况

  • 生产速度慢于消费速度时:这种情况下每一个生产数据后一般缓存池中都不会有剩余数据,直接将本次生产的数据传递给 data 事件即可(因为没有进入缓存池,所以也不用调用 read 来消费),然后立即开始生产新数据,待上一次数据消费完后新数据才生产好,再次触发 data ,一只到流结束。
  • 生产速度快于消费速度时:此时每一次生产完数据后一般缓存池都还存在未消费的数据,这种情况一般会在消费数据时开始生产下一次消费的数据,待旧数据消费完后新数据已经生产完并且放入缓存池

他们的区别仅仅在于数据生产后缓存池是否还存在数据,如果存在数据则将生产的数据 push 到缓存池等待消费,如果不存在则直接将数据交给 data 而不加入缓存池。

值得注意的是当一个缓存池中存在数据的流从暂停模式进入的流动模式时,会先循环调用 read 来消费数据只到返回 null

暂停模式

ストリームとは何ですか? Nodejs のストリームを理解する方法

暂停模式下,一个可读流读创建时,模式是暂停模式,创建后会自动调用 _read 方法,把数据从数据源 push 到缓冲池中,直到缓冲池中的数据达到了浮标值。每当数据到达浮标值时,可读流会触发一个 " readable " 事件,告诉消费者有数据已经准备好了,可以继续消费。

一般来说, &#39;readable&#39; 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。所以,数据源的数据被读完前,也会触发一次 &#39;readable&#39; 事件;

消费者 " readable " 事件的处理函数中,通过 stream.read(size) 主动消费缓冲池中的数据。

const { Readable } = require(&#39;stream&#39;)

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    // 参数的 read 方法会作为流的 _read 方法,用于获取源数据
    read(size) {
        // 假设我们的源数据上 1000 个1
        let chunk = null
        // 读取数据的过程一般是异步的,例如IO操作
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = &#39;1&#39;.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})
// 每一次成功 push 数据到缓存池后都会触发 readable
myReadable.on(&#39;readable&#39;, () => {
    const chunk = myReadable.read()//消费当前缓存池中所有数据
    console.log(chunk.toString())
})
ログイン後にコピー

值得注意的是, 如果 read(size) 的 size 大于浮标值,会重新计算新的浮标值,新浮标值是size的下一个二次幂(size <= 2^n,n取最小值)

//  hwm 不会大于 1GB.
const MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // 1GB限制
    n = MAX_HWM;
  } else {
    //取下一个2最高幂,以防止过度增加hwm
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}
ログイン後にコピー

流动模式

ストリームとは何ですか? Nodejs のストリームを理解する方法

所有可读流开始的时候都是暂停模式,可以通过以下方法可以切换至流动模式:

  • 添加 " data " 事件句柄;
  • 调用 “ resume ”方法;
  • 使用 " pipe " 方法把数据发送到可写流

流动模式下,缓冲池里面的数据会自动输出到消费端进行消费,同时,每次输出数据后,会自动回调 _read 方法,把数据源的数据放到缓冲池中,如果此时缓存池中不存在数据则会直接吧数据传递给 data 事件,不会经过缓存池;直到流动模式切换至其他暂停模式,或者数据源的数据被读取完了( push(null) );

可读流可以通过以下方式切换回暂停模式:

  • 如果没有管道目标,则调用 stream.pause()
  • 如果有管道目标,则移除所有管道目标。调用 stream.unpipe() 可以移除多个管道目标。
const { Readable } = require(&#39;stream&#39;)

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    read(size) {
        let chunk = null
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = &#39;1&#39;.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})

myReadable.on(&#39;data&#39;, data => {
    console.log(data.toString())
})
ログイン後にコピー

Writable

相对可读流来说,可写流要简单一些。

ストリームとは何ですか? Nodejs のストリームを理解する方法

当生产者调用 write(chunk) 时,内部会根据一些状态(corked,writing等)选择是否缓存到缓冲队列中或者调用 _write,每次写完数据后,会尝试清空缓存队列中的数据。如果缓冲队列中的数据大小超出了浮标值(highWaterMark),消费者调用 write(chunk) 后会返回 false,这时候生产者应该停止继续写入。

那么什么时候可以继续写入呢?当缓冲中的数据都被成功 _write 之后,清空了缓冲队列后会触发 drain 事件,这时候生产者可以继续写入数据。

当生产者需要结束写入数据时,需要调用 stream.end 方法通知可写流结束。

const { Writable, Duplex } = require(&#39;stream&#39;)
let fileContent = &#39;&#39;
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {// 会作为_write方法
        setTimeout(()=>{
            fileContent += chunk
            callback()// 写入结束后调用
        }, 500)
    }
})

myWritable.on(&#39;close&#39;, ()=>{
    console.log(&#39;close&#39;, fileContent)
})
myWritable.write(&#39;123123&#39;)// true
myWritable.write(&#39;123123&#39;)// false
myWritable.end()
ログイン後にコピー

注意,在缓存池中数据到达浮标值后,此时缓存池中可能存在多个节点,在清空缓存池的过程中(循环调用_read),并不会向可读流一样尽量一次消费长度为浮标值的数据,而是每次消费一个缓冲区节点,即使这个缓冲区长度于浮标值不一致也是如此

const { Writable } = require(&#39;stream&#39;)


let fileContent = &#39;&#39;
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {
        setTimeout(()=>{
            fileContent += chunk
            console.log(&#39;消费&#39;, chunk.toString())
            callback()// 写入结束后调用
        }, 100)
    }
})

myWritable.on(&#39;close&#39;, ()=>{
    console.log(&#39;close&#39;, fileContent)
})

let count = 0
function productionData(){
    let flag = true
    while (count <= 20 && flag){
        flag = myWritable.write(count.toString())
        count++
    }
    if(count > 20){
        myWritable.end()
    }
}
productionData()
myWritable.on(&#39;drain&#39;, productionData)
ログイン後にコピー

上述是一个浮标值为 10 的可写流,现在数据源是一个 0——20 到连续的数字字符串,productionData 用于写入数据。

  • 首先第一次调用 myWritable.write("0") 时,因为缓存池不存在数据,所以 "0" 不进入缓存池,而是直接交给 _wirtemyWritable.write("0") 返回值为 true

  • 当执行 myWritable.write("1") 时,因为 _wirtecallback 还未调用,表明上一次数据还未写入完,位置保证数据写入的有序性,只能创建一个缓冲区将 "1" 加入缓存池中。后面 2-9 都是如此

  • 当执行 myWritable.write("10") 时,此时缓冲区长度为 9(1-9),还未到达浮标值, "10" 继续作为一个缓冲区加入缓存池中,此时缓存池长度变为 11,所以 myWritable.write("1") 返回 false,这意味着缓冲区的数据已经足够,我们需要等待 drain 事件通知时再生产数据。

  • 100ms过后,_write("0", encoding, callback)callback 被调用,表明 "0" 已经写入完成。然后会检查缓存池中是否存在数据,如果存在则会先调用 _read 消费缓存池的头节点("1"),然后继续重复这个过程直到缓存池为空后触发 drain 事件,再次执行 productionData

  • 调用 myWritable.write("11"),触发第1步开始的过程,直到流结束。

Duplex

在理解了可读流与可写流后,双工流就好理解了,双工流事实上是继承了可读流然后实现了可写流(源码是这么写的,但是应该说是同时实现了可读流和可写流更加好)。

ストリームとは何ですか? Nodejs のストリームを理解する方法

Duplex 流需要同时实现下面两个方法

  • 实现 _read() 方法,为可读流生产数据

  • 实现 _write() 方法,为可写流消费数据

上面两个方法如何实现在上面可写流可读流的部分已经介绍过了,这里需要注意的是,双工流是存在两个独立的缓存池分别提供给两个流,他们的数据源也不一样

以 NodeJs 的标准输入输出流为例:

  • 当我们在控制台输入数据时会触发其 data 事件,这证明他有可读流的功能,每一次用户键入回车相当于调用可读的 push 方法推送生产的数据。
  • 当我们调用其 write 方法时也可以向控制台输出内容,但是不会触发 data 事件,这说明他有可写流的功能,而且有独立的缓冲区,_write 方法的实现内容就是让控制台展示文字。
// 每当用户在控制台输入数据(_read),就会触发data事件,这是可读流的特性
process.stdin.on(&#39;data&#39;, data=>{
    process.stdin.write(data);
})

// 每隔一秒向标准输入流生产数据(这是可写流的特性,会直接输出到控制台上),不会触发data
setInterval(()=>{
    process.stdin.write(&#39;不是用户控制台输入的数据&#39;)
}, 1000)
ログイン後にコピー

Transform

ストリームとは何ですか? Nodejs のストリームを理解する方法

可以将 Duplex 流视为具有可写流的可读流。两者都是独立的,每个都有独立的内部缓冲区。读写事件独立发生。

                             Duplex Stream
                          ------------------|
                    Read  <-----               External Source
            You           ------------------|  
                    Write ----->               External Sink
                          ------------------|
ログイン後にコピー

Transform 流是双工的,其中读写以因果关系进行。双工流的端点通过某种转换链接。读取要求发生写入。

                                 Transform Stream
                           --------------|--------------
            You     Write  ---->                   ---->  Read  You
                           --------------|--------------
ログイン後にコピー

对于创建 Transform 流,最重要的是要实现 _transform 方法而不是 _write 或者 _read_transform 中对可写流写入的数据做处理(消费)然后为可读流生产数据。

转换流还经常会实现一个 `_flush` 方法,他会在流结束前被调用,一般用于对流的末尾追加一些东西,例如压缩文件时的一些压缩信息就是在这里加上的
ログイン後にコピー
const { write } = require(&#39;fs&#39;)
const { Transform, PassThrough } = require(&#39;stream&#39;)

const reurce = &#39;1312123213124341234213423428354816273513461891468186499126412&#39;

const transform = new Transform({
    highWaterMark: 10,
    transform(chunk ,encoding, callback){// 转换数据,调用push将转换结果加入缓存池
        this.push(chunk.toString().replace(&#39;1&#39;, &#39;@&#39;))
        callback()
    },
    flush(callback){// end触发前执行
        this.push(&#39;<<<&#39;)
        callback()
    }
})


// write 不断写入数据
let count = 0
transform.write(&#39;>>>&#39;)
function productionData() {
    let flag = true
    while (count <= 20 && flag) {
        flag = transform.write(count.toString())
        count++
    }
    if (count > 20) {
        transform.end()
    }
}
productionData()
transform.on(&#39;drain&#39;, productionData)


let result = &#39;&#39;
transform.on(&#39;data&#39;, data=>{
    result += data.toString()
})
transform.on(&#39;end&#39;, ()=>{
    console.log(result)
    // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<<
})
ログイン後にコピー

Pipe

管道是将上一个程序的输出作为下一个程序的输入,这是管道在 Linux 中管道的作用。NodeJs 中的管道其实也类似,它管道用于连接两个流,上游的流的输出会作为下游的流的输入。

ストリームとは何ですか? Nodejs のストリームを理解する方法

管道 sourec.pipe(dest, options) 要求 sourec 是可读的,dest是可写的。其返回值是 dest。

对于处于管道中间的流既是下一个流的上游也是上一个流的下游,所以其需要时一个可读可写的双工流,一般我们会使用转换流来作为管道中间的流。

https://github1s.com/nodejs/node/blob/v17.0.0/lib/internal/streams/legacy.js#L16-L33

Stream.prototype.pipe = function(dest, options) {
  const source = this;

  function ondata(chunk) {
    if (dest.writable && dest.write(chunk) === false && source.pause) {
      source.pause();
    }
  }

  source.on(&#39;data&#39;, ondata);

  function ondrain() {
    if (source.readable && source.resume) {
      source.resume();
    }
  }

  dest.on(&#39;drain&#39;, ondrain);
  // ...后面的代码省略
}
ログイン後にコピー

pipe 的实现非常清晰,当上游的流发出 data 事件时会调用下游流的 write 方法写入数据,然后立即调用 source.pause() 使得上游变为暂停状态,这主要是为了防止背压。

当下游的流将数据消费完成后会调用 source.resume() 使上游再次变为流动状态。

我们实现一个将 data 文件中所有 1 替换为 @ 然后输出到 result 文件到管道。

const { Transform } = require(&#39;stream&#39;)
const { createReadStream, createWriteStream } = require(&#39;fs&#39;)

// 一个位于管道中的转换流
function createTransformStream(){
    return new Transform({
        transform(chunk, encoding, callback){
            this.push(chunk.toString().replace(/1/g, &#39;@&#39;))
            callback()
        }
    })
}
createReadStream(&#39;./data&#39;)
.pipe(createTransformStream())
.pipe(createWriteStream(&#39;./result&#39;))
ログイン後にコピー

在管道中只存在两个流时,其功能和转换流有点类似,都是将一个可读流与一个可写流串联起来,但是管道可以串联多个流。

原文地址:https://juejin.cn/post/7077511716564631566

作者:月夕

更多node相关知识,请访问:nodejs 教程

以上がストリームとは何ですか? Nodejs のストリームを理解する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

Nodejs はバックエンド フレームワークですか? Nodejs はバックエンド フレームワークですか? Apr 21, 2024 am 05:09 AM

Node.js は、高いパフォーマンス、スケーラビリティ、クロスプラットフォーム サポート、豊富なエコシステム、開発の容易さなどの機能を備えているため、バックエンド フレームワークとして使用できます。

Nodejsをmysqlデータベースに接続する方法 Nodejsをmysqlデータベースに接続する方法 Apr 21, 2024 am 06:13 AM

MySQL データベースに接続するには、次の手順に従う必要があります。 mysql2 ドライバーをインストールします。 mysql2.createConnection() を使用して、ホスト アドレス、ポート、ユーザー名、パスワード、データベース名を含む接続オブジェクトを作成します。 connection.query() を使用してクエリを実行します。最後に connection.end() を使用して接続を終了します。

Nodejsのグローバル変数とは何ですか Nodejsのグローバル変数とは何ですか Apr 21, 2024 am 04:54 AM

Node.js には次のグローバル変数が存在します。 グローバル オブジェクト: グローバル コア モジュール: プロセス、コンソール、require ランタイム環境変数: __dirname、__filename、__line、__column 定数: unknown、null、NaN、Infinity、-Infinity

Nodejs インストール ディレクトリ内の npm ファイルと npm.cmd ファイルの違いは何ですか? Nodejs インストール ディレクトリ内の npm ファイルと npm.cmd ファイルの違いは何ですか? Apr 21, 2024 am 05:18 AM

Node.js インストール ディレクトリには、npm と npm.cmd という 2 つの npm 関連ファイルがあります。違いは次のとおりです。拡張子が異なります。npm は実行可能ファイルで、npm.cmd はコマンド ウィンドウのショートカットです。 Windows ユーザー: npm.cmd はコマンド プロンプトから使用できますが、npm はコマンド ラインからのみ実行できます。互換性: npm.cmd は Windows システムに固有ですが、npm はクロスプラットフォームで使用できます。使用上の推奨事項: Windows ユーザーは npm.cmd を使用し、他のオペレーティング システムは npm を使用します。

PIノードティーチング:PIノードとは何ですか? PIノードをインストールしてセットアップする方法は? PIノードティーチング:PIノードとは何ですか? PIノードをインストールしてセットアップする方法は? Mar 05, 2025 pm 05:57 PM

ピン張りのノードの詳細な説明とインストールガイドこの記事では、ピネットワークのエコシステムを詳細に紹介します - PIノードは、ピン系生態系における重要な役割であり、設置と構成の完全な手順を提供します。 Pinetworkブロックチェーンテストネットワークの発売後、PIノードは多くの先駆者の重要な部分になり、テストに積極的に参加し、今後のメインネットワークリリースの準備をしています。まだピン張りのものがわからない場合は、ピコインとは何かを参照してください。リストの価格はいくらですか? PIの使用、マイニング、セキュリティ分析。パインワークとは何ですか?ピン競技プロジェクトは2019年に開始され、独占的な暗号通貨PIコインを所有しています。このプロジェクトは、誰もが参加できるものを作成することを目指しています

NodejsとJavaの間に大きな違いはありますか? NodejsとJavaの間に大きな違いはありますか? Apr 21, 2024 am 06:12 AM

Node.js と Java の主な違いは、設計と機能です。 イベント駆動型とスレッド駆動型: Node.js はイベント駆動型で、Java はスレッド駆動型です。シングルスレッドとマルチスレッド: Node.js はシングルスレッドのイベント ループを使用し、Java はマルチスレッド アーキテクチャを使用します。ランタイム環境: Node.js は V8 JavaScript エンジン上で実行され、Java は JVM 上で実行されます。構文: Node.js は JavaScript 構文を使用し、Java は Java 構文を使用します。目的: Node.js は I/O 集中型のタスクに適しており、Java は大規模なエンタープライズ アプリケーションに適しています。

Nodejs はバックエンド開発言語ですか? Nodejs はバックエンド開発言語ですか? Apr 21, 2024 am 05:09 AM

はい、Node.js はバックエンド開発言語です。これは、サーバー側のビジネス ロジックの処理、データベース接続の管理、API の提供などのバックエンド開発に使用されます。

Nodejsプロジェクトをサーバーにデプロイする方法 Nodejsプロジェクトをサーバーにデプロイする方法 Apr 21, 2024 am 04:40 AM

Node.js プロジェクトのサーバー デプロイメント手順: デプロイメント環境を準備します。サーバー アクセスの取得、Node.js のインストール、Git リポジトリのセットアップ。アプリケーションをビルドする: npm run build を使用して、デプロイ可能なコードと依存関係を生成します。コードをサーバーにアップロードします: Git またはファイル転送プロトコル経由。依存関係をインストールする: サーバーに SSH で接続し、npm install を使用してアプリケーションの依存関係をインストールします。アプリケーションを開始します。node Index.js などのコマンドを使用してアプリケーションを開始するか、pm2 などのプロセス マネージャーを使用します。リバース プロキシの構成 (オプション): Nginx や Apache などのリバース プロキシを使用して、トラフィックをアプリケーションにルーティングします。

See all articles