목차
스트림 이해 방법
캐시 풀
EventEmitter
배압 문제
Readable
暂停模式
流动模式
Writable
Duplex
Transform
Pipe
웹 프론트엔드 JS 튜토리얼 스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

May 06, 2022 pm 07:27 PM
nodejs node.js node

플로우란 무엇인가요? 흐름을 이해하는 방법? 다음 기사는 Node의 스트림에 대한 심층적인 이해를 제공할 것입니다. 도움이 되기를 바랍니다!

스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

저자는 최근 개발에서 파이프 기능을 자주 사용했습니다. 스트림을 위한 파이프라인이라는 것만 알 뿐 어떻게 작동하는지 모르겠습니다. 그래서 알아보겠다는 마음으로 간단히 시작합니다. 내가 읽은 지식과 소스 코드를 기사로 정리하여 모두와 공유합니다.

Stream은 Nodejs의 매우 기본적인 개념입니다. 많은 기본 모듈이 스트림을 기반으로 구현되며 매우 중요한 역할을 합니다. 동시에 흐름은 이해하기 매우 어려운 개념이기도 합니다. 이는 주로 관련 문서가 부족하기 때문입니다. 다행히도 NodeJ 초보자의 경우 이 개념을 완전히 익히기 전에 흐름을 이해하는 데 많은 시간이 걸립니다. 대부분의 NodeJ에서는 사용자의 경우 웹 애플리케이션을 개발하는 데만 사용됩니다. 스트림에 대한 이해가 부족해도 사용에 영향을 미치지 않습니다. 그러나 스트림을 이해하면 NodeJ의 다른 모듈을 더 잘 이해할 수 있으며 경우에 따라 스트림을 사용하여 데이터를 처리하면 더 나은 결과를 얻을 수 있습니다. [관련 튜토리얼 추천 : nodejs 동영상 튜토리얼]

스트림 이해 방법

  • 스트림 사용자의 경우 스트림을 배열로 간주할 수 있으며, 스트림을 가져오고(소비하고) 쓰는 데만 집중하면 됩니다. 그것(생산)은 괜찮습니다.

  • 스트림 개발자의 경우(스트림 모듈을 사용하여 새 인스턴스 생성) 스트림에서 일부 메서드를 구현하는 방법에 중점을 둡니다. 일반적으로 대상 리소스가 누구인지, 대상 리소스를 작동하는 방법이라는 두 가지 사항에 중점을 둡니다. . 일단 결정되면 스트림의 다양한 상태와 이벤트에 따라 대상 리소스를 운영해야 합니다

캐시 풀

NodeJ의 모든 스트림에는 버퍼 풀이 있습니다. 버퍼 풀의 목적은 스트림의 효율성을 높이는 것입니다. 데이터가 생산되고 소비될 때 시간이 걸릴 때 미리 데이터를 생산하여 다음 소비 전에 버퍼 풀에 저장할 수 있습니다. 그러나 버퍼 풀이 항상 사용되는 것은 아닙니다. 예를 들어 캐시 풀이 비어 있으면 생성 후 데이터가 캐시 풀에 저장되지 않고 직접 소비됩니다. .

데이터 생산 속도가 데이터 소비 속도보다 빠르면 초과된 데이터가 어딘가에서 기다리고 있을 것입니다. 데이터 생성 속도가 처리 데이터 소비 속도보다 느린 경우, 데이터는 어딘가에 일정량 축적되어 소비됩니다. (개발자는 데이터의 생산과 소비의 속도를 통제할 수 없으며, 어떤 시간에 데이터를 생산하거나 소비하려고만 할 수 있습니다.)

데이터가 기다리고, 축적되고, 발생하는 그 곳. 버퍼풀 입니다. 버퍼 풀은 일반적으로 컴퓨터의 RAM(메모리)에 위치합니다.

일반적인 버퍼의 예를 들자면, 온라인 동영상을 볼 때 인터넷 속도가 빠르면 버퍼는 항상 즉시 채워진 다음 재생을 위해 시스템으로 전송되고 다음 동영상이 즉시 버퍼링됩니다. 시청하는 동안 지연이 발생하지 않습니다. 네트워크 속도가 매우 느린 경우 로딩이 표시되며, 이는 버퍼가 채워지고 있음을 나타내는 것입니다. 채우기가 완료되면 데이터가 시스템으로 전송되며 이 비디오를 볼 수 있습니다.

NodeJs 스트림의 캐시 풀은 버퍼 연결 목록입니다. 캐시 풀에 데이터를 추가할 때마다 버퍼 노드가 다시 생성되어 연결 목록 끝에 삽입됩니다.

EventEmitter

NodeJs의 Stream은 EventEmitter를 구현하는 추상 인터페이스이므로 먼저 EventEmitter를 간략하게 소개하겠습니다.

EventEmitter는 이벤트 게시 및 구독 기능을 구현하는 클래스입니다. 일반적으로 사용되는 몇 가지 방법(on, Once, off, Emit)은 누구에게나 친숙할 것으로 생각되므로 하나씩 소개하지 않겠습니다.

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

// 为 eventA 事件绑定处理函数
eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});

// 为 eventB 事件绑定处理函数
eventEmitter.on('eventB', () => {
    console.log('eventB active 1');
});

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});

// 触发 eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2
로그인 후 복사

EventEmitter에는 newListenerremoveListener라는 두 가지 이벤트가 있다는 점에 주목할 가치가 있습니다. newListener(eventEmitter.emit('newListener')) 함수가 트리거됩니다. 처리 함수가 제거되면 removeListener가 동일한 방식으로 트리거됩니다. EventEmitter 有两个叫做 newListenerremoveListener 的事件,当你向一个事件对象中添加任何事件监听函数后,都会触发 newListener(eventEmitter.emit('newListener')),当一个处理函数被移除时同理会触发 removeListener

还需要注意的是, once 绑定的处理函数只会执行一次,removeListener 将在其执行前被触发,这意味着 once

한 번 바인딩된 핸들러 함수는 한 번만 실행되며 removeListener는 실행 전에 트리거됩니다. 즉, 한 번으로 바인딩된 리스너는 해당 기능은 트리거되기 전에 먼저 제거됩니다.

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

eventEmitter.on('newListener', (event, listener)=>{
    console.log('newListener', event, listener)
})

eventEmitter.on('removeListener', (event, listener) => {
    console.log('removeListener', event, listener)
})
//newListener removeListener[Function(anonymous)]


eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});
//newListener eventA [Function (anonymous)]

function listenerB() { console.log('eventB active 1'); }
eventEmitter.on('eventB', listenerB);
// newListener eventB [Function (anonymous)]

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});
// newListener eventA [Function (anonymous)]

eventEmitter.emit('eventA')
// eventA active 1
// removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] }
// eventA active 2

eventEmitter.off('eventB', listenerB)
// removeListener eventB[Function: listenerB]
로그인 후 복사

하지만 나중에 보게 될 내용에는 이것이 중요하지 않습니다.

Stream

Stream은 Node.js에서 스트리밍 데이터를 처리하기 위한 추상 인터페이스입니다. 스트림은 실제 인터페이스가 아니라 모든 스트림을 가리키는 일반적인 용어입니다. 실제 인터페이스는 ReadableStream, WritableStream 및 ReadWriteStream입니다.

interface ReadableStream extends EventEmitter {
    readable: boolean;
    read(size?: number): string | Buffer;
    setEncoding(encoding: BufferEncoding): this;
    pause(): this;
    resume(): this;
    isPaused(): boolean;
    pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
    unpipe(destination?: WritableStream): this;
    unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
    wrap(oldStream: ReadableStream): this;
    [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
    writable: boolean;
    write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
    write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
    end(cb?: () => void): this;
    end(data: string | Uint8Array, cb?: () => void): this;
    end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }
로그인 후 복사

ReadableStream과 WritableStream은 모두 EventEmitter 클래스를 상속하는 인터페이스임을 알 수 있습니다(ts의 인터페이스는 병합 유형이므로 클래스를 상속할 수 있습니다).

위 인터페이스에 해당하는 구현 클래스는 각각 Readable, Writable 및 Duplex입니다.

NodeJ에는 4가지 유형의 스트림이 있습니다. 🎜
  • 읽기 가능하고 읽기 가능한 스트림(ReadableStream 구현)
  • 쓰기 가능한 쓰기 가능한 스트림(WritableStream 구현)
  • 이중 읽기 및 쓰기 가능 스트림(Readable을 상속한 후 WritableStream 구현)
  • 변환 변환 스트림(Duplex 상속)

배압 문제

The 디스크에 데이터를 쓰는 속도는 메모리의 속도보다 훨씬 낮습니다. 우리는 메모리와 디스크 사이에 "파이프"가 있고 "파이프"는 메모리의 데이터가 흐르는 것을 상상합니다. 파이프가 막히면 메모리에 데이터 역압이 발생하고 데이터가 메모리에 축적되어 리소스를 차지하게 됩니다.

스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

NodeJs Stream의 솔루션은 데이터 양이 이 float에 도달하면 각 스트림의 캐시 풀(즉, 그림의 쓰기 대기열)에 대해 float 값을 설정하는 것입니다. 값, 캐시로 이동 풀 데이터를 다시 push하면 false를 반환합니다. 이는 현재 스트림의 캐시 풀 콘텐츠가 float 값에 도달했으며 더 이상 데이터가 없을 것으로 예상됨을 의미합니다. 이때 캐싱을 방지하기 위해 데이터 생성을 즉시 중단해야 합니다. 缓存池(就是图中写入队列)设置一个浮标值,当其中数据量达到这个浮标值后,往缓存池再次 push 数据时就会返回 false,表示当前流中缓存池内容已经达到浮标值,不希望再有数据写入了,这时我们应该立即停止数据的生产,防止缓存池过大产生背压。

Readable

可读流(Readable)是流的一种类型,他有两种模式三种状态

两种读取模式:

  • 流动模式:数据会从底层系统读取写入到缓冲区,当缓冲区被写满后自动通过 EventEmitter 尽快的将数据传递给所注册的事件处理程序中

  • 暂停模式:在这种模式下将不会主动触发 EventEmitter 传输数据,必须显示的调用 Readable.read() 方法来从缓冲区中读取数据,read 会触发响应到 EventEmitter 事件。

三种状态:

  • readableFlowing === null(初始状态)

  • readableFlowing === false(暂停模式)

  • readableFlowing === true(流动模式)

初始时流的 readable.readableFlowingnull

添加data事件后变为 true 。调用 pause()unpipe()、或接收到背压或者添加 readable 事件,则 readableFlowing 会被设为 false ,在这个状态下,为 data 事件绑定监听器不会使 readableFlowing 切换到 true

调用 resume() 可以让可读流的 readableFlowing

읽기 가능

읽기 가능 스트림(Readable)은 스트림 유형입니다. 여기에는 두 가지 모드와 세 가지 상태가 있습니다.

두 가지 읽기 모드:
    Flow 모드: 기본 시스템에서 버퍼로 데이터를 읽고 씁니다. 버퍼가 가득 차면 데이터는 가능한 한 빨리 EventEmitter를 통해 등록된 이벤트 핸들러에 자동으로 전달됩니다. 세 가지 상태:
      readableFlowing === null(초기 상태) readableFlowing === false(일시 중지 모드) readFlowing === true(흐름 모드) 초기 흐름의 reader.readableFlowingnull이며 데이터 이벤트를 추가한 후 true가 됩니다. pause(), unpipe()가 호출되거나 backPressure가 수신되거나 readable 이벤트가 추가되면 readableFlowing</code > will이 false로 설정되어 있습니다.<strong>이 상태에서는 리스너를 데이터 이벤트에 바인딩해도 readableFlowing이 true로 전환되지 않습니다</strong>. </tr><tr><code>resume()을 호출하여 읽기 가능한 흐름의 readableFlowing을 true로 전환하세요.
      프로그램의 일시 중지 모드: 이 모드에서는 EventEmitter가 데이터를 전송하기 위해 적극적으로 트리거되지 않습니다. 버퍼 읽기 이벤트에서 데이터를 읽으려면 Readable.read() 메서드를 명시적으로 호출해야 합니다. EventEmitter에 대한 응답으로 트리거됩니다.
      모든 읽기 가능한 이벤트를 제거하는 것이 readableFlowing을 null로 만드는 유일한 방법입니다. 버퍼에 새로 읽을 수있는 데이터가있을 때 (캐시 풀에 삽입 된 모든 노드에 대해 트리거링) 데이터가 소비될 때마다 트리거됩니다. 매개변수는 스트림이 닫힐 때 트리거됩니다. 설명
      read(size)
      const fs = require(&#39;fs&#39;);
      
      const readStreams = fs.createReadStream(&#39;./EventEmitter.js&#39;, {
          highWaterMark: 100// 缓存池浮标值
      })
      
      readStreams.on(&#39;readable&#39;, () => {
          console.log(&#39;缓冲区满了&#39;)
          readStreams.read()// 消费缓存池的所有数据,返回结果并且触发data事件
      })
      
      
      readStreams.on(&#39;data&#39;, (data) => {
          console.log(&#39;data&#39;)
      })
      로그인 후 복사

      https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527

      当 size 为 0 会触发 readable 事件。

      当缓存池中的数据长度达到浮标值 highWaterMark 后,就不会在主动请求生产数据,而是等待数据被消费后在生产数据

      暂停状态的流如果不调用 read 来消费数据时,后续也不会在触发 datareadable,当调用 read 消费时会先判断本次消费后剩余的数据长度是否低于 浮标值,如果低于 浮标值 就会在消费前请求生产数据。这样在 read 后的逻辑执行完成后新的数据大概率也已经生产完成,然后再次触发 readable,这种提前生产下一次消费的数据存放在缓存池的机制也是缓存流为什么快的原因

      流动状态下的流有两种情况

      • 生产速度慢于消费速度时:这种情况下每一个生产数据后一般缓存池中都不会有剩余数据,直接将本次生产的数据传递给 data 事件即可(因为没有进入缓存池,所以也不用调用 read 来消费),然后立即开始生产新数据,待上一次数据消费完后新数据才生产好,再次触发 data ,一只到流结束。
      • 生产速度快于消费速度时:此时每一次生产完数据后一般缓存池都还存在未消费的数据,这种情况一般会在消费数据时开始生产下一次消费的数据,待旧数据消费完后新数据已经生产完并且放入缓存池

      他们的区别仅仅在于数据生产后缓存池是否还存在数据,如果存在数据则将生产的数据 push 到缓存池等待消费,如果不存在则直接将数据交给 data 而不加入缓存池。

      值得注意的是当一个缓存池中存在数据的流从暂停模式进入的流动模式时,会先循环调用 read 来消费数据只到返回 null

      暂停模式

      스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

      暂停模式下,一个可读流读创建时,模式是暂停模式,创建后会自动调用 _read 方法,把数据从数据源 push 到缓冲池中,直到缓冲池中的数据达到了浮标值。每当数据到达浮标值时,可读流会触发一个 " readable " 事件,告诉消费者有数据已经准备好了,可以继续消费。

      一般来说, &#39;readable&#39; 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。所以,数据源的数据被读完前,也会触发一次 &#39;readable&#39; 事件;

      消费者 " readable " 事件的处理函数中,通过 stream.read(size) 主动消费缓冲池中的数据。

      const { Readable } = require(&#39;stream&#39;)
      
      let count = 1000
      const myReadable = new Readable({
          highWaterMark: 300,
          // 参数的 read 方法会作为流的 _read 方法,用于获取源数据
          read(size) {
              // 假设我们的源数据上 1000 个1
              let chunk = null
              // 读取数据的过程一般是异步的,例如IO操作
              setTimeout(() => {
                  if (count > 0) {
                      let chunkLength = Math.min(count, size)
                      chunk = &#39;1&#39;.repeat(chunkLength)
                      count -= chunkLength
                  }
                  this.push(chunk)
              }, 500)
          }
      })
      // 每一次成功 push 数据到缓存池后都会触发 readable
      myReadable.on(&#39;readable&#39;, () => {
          const chunk = myReadable.read()//消费当前缓存池中所有数据
          console.log(chunk.toString())
      })
      로그인 후 복사

      值得注意的是, 如果 read(size) 的 size 大于浮标值,会重新计算新的浮标值,新浮标值是size的下一个二次幂(size <= 2^n,n取最小值)

      //  hwm 不会大于 1GB.
      const MAX_HWM = 0x40000000;
      function computeNewHighWaterMark(n) {
        if (n >= MAX_HWM) {
          // 1GB限制
          n = MAX_HWM;
        } else {
          //取下一个2最高幂,以防止过度增加hwm
          n--;
          n |= n >>> 1;
          n |= n >>> 2;
          n |= n >>> 4;
          n |= n >>> 8;
          n |= n >>> 16;
          n++;
        }
        return n;
      }
      로그인 후 복사

      流动模式

      스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

      所有可读流开始的时候都是暂停模式,可以通过以下方法可以切换至流动模式:

      • 添加 " data " 事件句柄;
      • 调用 “ resume ”方法;
      • 使用 " pipe " 方法把数据发送到可写流

      流动模式下,缓冲池里面的数据会自动输出到消费端进行消费,同时,每次输出数据后,会自动回调 _read 方法,把数据源的数据放到缓冲池中,如果此时缓存池中不存在数据则会直接吧数据传递给 data 事件,不会经过缓存池;直到流动模式切换至其他暂停模式,或者数据源的数据被读取完了( push(null) );

      可读流可以通过以下方式切换回暂停模式:

      • 如果没有管道目标,则调用 stream.pause()
      • 如果有管道目标,则移除所有管道目标。调用 stream.unpipe() 可以移除多个管道目标。
      const { Readable } = require(&#39;stream&#39;)
      
      let count = 1000
      const myReadable = new Readable({
          highWaterMark: 300,
          read(size) {
              let chunk = null
              setTimeout(() => {
                  if (count > 0) {
                      let chunkLength = Math.min(count, size)
                      chunk = &#39;1&#39;.repeat(chunkLength)
                      count -= chunkLength
                  }
                  this.push(chunk)
              }, 500)
          }
      })
      
      myReadable.on(&#39;data&#39;, data => {
          console.log(data.toString())
      })
      로그인 후 복사

      Writable

      相对可读流来说,可写流要简单一些。

      스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

      当生产者调用 write(chunk) 时,内部会根据一些状态(corked,writing等)选择是否缓存到缓冲队列中或者调用 _write,每次写完数据后,会尝试清空缓存队列中的数据。如果缓冲队列中的数据大小超出了浮标值(highWaterMark),消费者调用 write(chunk) 后会返回 false,这时候生产者应该停止继续写入。

      那么什么时候可以继续写入呢?当缓冲中的数据都被成功 _write 之后,清空了缓冲队列后会触发 drain 事件,这时候生产者可以继续写入数据。

      当生产者需要结束写入数据时,需要调用 stream.end 方法通知可写流结束。

      const { Writable, Duplex } = require(&#39;stream&#39;)
      let fileContent = &#39;&#39;
      const myWritable = new Writable({
          highWaterMark: 10,
          write(chunk, encoding, callback) {// 会作为_write方法
              setTimeout(()=>{
                  fileContent += chunk
                  callback()// 写入结束后调用
              }, 500)
          }
      })
      
      myWritable.on(&#39;close&#39;, ()=>{
          console.log(&#39;close&#39;, fileContent)
      })
      myWritable.write(&#39;123123&#39;)// true
      myWritable.write(&#39;123123&#39;)// false
      myWritable.end()
      로그인 후 복사

      注意,在缓存池中数据到达浮标值后,此时缓存池中可能存在多个节点,在清空缓存池的过程中(循环调用_read),并不会向可读流一样尽量一次消费长度为浮标值的数据,而是每次消费一个缓冲区节点,即使这个缓冲区长度于浮标值不一致也是如此

      const { Writable } = require(&#39;stream&#39;)
      
      
      let fileContent = &#39;&#39;
      const myWritable = new Writable({
          highWaterMark: 10,
          write(chunk, encoding, callback) {
              setTimeout(()=>{
                  fileContent += chunk
                  console.log(&#39;消费&#39;, chunk.toString())
                  callback()// 写入结束后调用
              }, 100)
          }
      })
      
      myWritable.on(&#39;close&#39;, ()=>{
          console.log(&#39;close&#39;, fileContent)
      })
      
      let count = 0
      function productionData(){
          let flag = true
          while (count <= 20 && flag){
              flag = myWritable.write(count.toString())
              count++
          }
          if(count > 20){
              myWritable.end()
          }
      }
      productionData()
      myWritable.on(&#39;drain&#39;, productionData)
      로그인 후 복사

      上述是一个浮标值为 10 的可写流,现在数据源是一个 0——20 到连续的数字字符串,productionData 用于写入数据。

      • 首先第一次调用 myWritable.write("0") 时,因为缓存池不存在数据,所以 "0" 不进入缓存池,而是直接交给 _wirtemyWritable.write("0") 返回值为 true

      • 当执行 myWritable.write("1") 时,因为 _wirtecallback 还未调用,表明上一次数据还未写入完,位置保证数据写入的有序性,只能创建一个缓冲区将 "1" 加入缓存池中。后面 2-9 都是如此

      • 当执行 myWritable.write("10") 时,此时缓冲区长度为 9(1-9),还未到达浮标值, "10" 继续作为一个缓冲区加入缓存池中,此时缓存池长度变为 11,所以 myWritable.write("1") 返回 false,这意味着缓冲区的数据已经足够,我们需要等待 drain 事件通知时再生产数据。

      • 100ms过后,_write("0", encoding, callback)callback 被调用,表明 "0" 已经写入完成。然后会检查缓存池中是否存在数据,如果存在则会先调用 _read 消费缓存池的头节点("1"),然后继续重复这个过程直到缓存池为空后触发 drain 事件,再次执行 productionData

      • 调用 myWritable.write("11"),触发第1步开始的过程,直到流结束。

      Duplex

      在理解了可读流与可写流后,双工流就好理解了,双工流事实上是继承了可读流然后实现了可写流(源码是这么写的,但是应该说是同时实现了可读流和可写流更加好)。

      스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

      Duplex 流需要同时实现下面两个方法

      • 实现 _read() 方法,为可读流生产数据

      • 实现 _write() 方法,为可写流消费数据

      上面两个方法如何实现在上面可写流可读流的部分已经介绍过了,这里需要注意的是,双工流是存在两个独立的缓存池分别提供给两个流,他们的数据源也不一样

      以 NodeJs 的标准输入输出流为例:

      • 当我们在控制台输入数据时会触发其 data 事件,这证明他有可读流的功能,每一次用户键入回车相当于调用可读的 push 方法推送生产的数据。
      • 当我们调用其 write 方法时也可以向控制台输出内容,但是不会触发 data 事件,这说明他有可写流的功能,而且有独立的缓冲区,_write 方法的实现内容就是让控制台展示文字。
      // 每当用户在控制台输入数据(_read),就会触发data事件,这是可读流的特性
      process.stdin.on(&#39;data&#39;, data=>{
          process.stdin.write(data);
      })
      
      // 每隔一秒向标准输入流生产数据(这是可写流的特性,会直接输出到控制台上),不会触发data
      setInterval(()=>{
          process.stdin.write(&#39;不是用户控制台输入的数据&#39;)
      }, 1000)
      로그인 후 복사

      Transform

      스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

      可以将 Duplex 流视为具有可写流的可读流。两者都是独立的,每个都有独立的内部缓冲区。读写事件独立发生。

                                   Duplex Stream
                                ------------------|
                          Read  <-----               External Source
                  You           ------------------|  
                          Write ----->               External Sink
                                ------------------|
      로그인 후 복사

      Transform 流是双工的,其中读写以因果关系进行。双工流的端点通过某种转换链接。读取要求发生写入。

                                       Transform Stream
                                 --------------|--------------
                  You     Write  ---->                   ---->  Read  You
                                 --------------|--------------
      로그인 후 복사

      对于创建 Transform 流,最重要的是要实现 _transform 方法而不是 _write 或者 _read_transform 中对可写流写入的数据做处理(消费)然后为可读流生产数据。

      转换流还经常会实现一个 `_flush` 方法,他会在流结束前被调用,一般用于对流的末尾追加一些东西,例如压缩文件时的一些压缩信息就是在这里加上的
      로그인 후 복사
      const { write } = require(&#39;fs&#39;)
      const { Transform, PassThrough } = require(&#39;stream&#39;)
      
      const reurce = &#39;1312123213124341234213423428354816273513461891468186499126412&#39;
      
      const transform = new Transform({
          highWaterMark: 10,
          transform(chunk ,encoding, callback){// 转换数据,调用push将转换结果加入缓存池
              this.push(chunk.toString().replace(&#39;1&#39;, &#39;@&#39;))
              callback()
          },
          flush(callback){// end触发前执行
              this.push(&#39;<<<&#39;)
              callback()
          }
      })
      
      
      // write 不断写入数据
      let count = 0
      transform.write(&#39;>>>&#39;)
      function productionData() {
          let flag = true
          while (count <= 20 && flag) {
              flag = transform.write(count.toString())
              count++
          }
          if (count > 20) {
              transform.end()
          }
      }
      productionData()
      transform.on(&#39;drain&#39;, productionData)
      
      
      let result = &#39;&#39;
      transform.on(&#39;data&#39;, data=>{
          result += data.toString()
      })
      transform.on(&#39;end&#39;, ()=>{
          console.log(result)
          // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<<
      })
      로그인 후 복사

      Pipe

      管道是将上一个程序的输出作为下一个程序的输入,这是管道在 Linux 中管道的作用。NodeJs 中的管道其实也类似,它管道用于连接两个流,上游的流的输出会作为下游的流的输入。

      스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

      管道 sourec.pipe(dest, options) 要求 sourec 是可读的,dest是可写的。其返回值是 dest。

      对于处于管道中间的流既是下一个流的上游也是上一个流的下游,所以其需要时一个可读可写的双工流,一般我们会使用转换流来作为管道中间的流。

      https://github1s.com/nodejs/node/blob/v17.0.0/lib/internal/streams/legacy.js#L16-L33

      Stream.prototype.pipe = function(dest, options) {
        const source = this;
      
        function ondata(chunk) {
          if (dest.writable && dest.write(chunk) === false && source.pause) {
            source.pause();
          }
        }
      
        source.on(&#39;data&#39;, ondata);
      
        function ondrain() {
          if (source.readable && source.resume) {
            source.resume();
          }
        }
      
        dest.on(&#39;drain&#39;, ondrain);
        // ...后面的代码省略
      }
      로그인 후 복사

      pipe 的实现非常清晰,当上游的流发出 data 事件时会调用下游流的 write 方法写入数据,然后立即调用 source.pause() 使得上游变为暂停状态,这主要是为了防止背压。

      当下游的流将数据消费完成后会调用 source.resume() 使上游再次变为流动状态。

      我们实现一个将 data 文件中所有 1 替换为 @ 然后输出到 result 文件到管道。

      const { Transform } = require(&#39;stream&#39;)
      const { createReadStream, createWriteStream } = require(&#39;fs&#39;)
      
      // 一个位于管道中的转换流
      function createTransformStream(){
          return new Transform({
              transform(chunk, encoding, callback){
                  this.push(chunk.toString().replace(/1/g, &#39;@&#39;))
                  callback()
              }
          })
      }
      createReadStream(&#39;./data&#39;)
      .pipe(createTransformStream())
      .pipe(createWriteStream(&#39;./result&#39;))
      로그인 후 복사

      在管道中只存在两个流时,其功能和转换流有点类似,都是将一个可读流与一个可写流串联起来,但是管道可以串联多个流。

      原文地址:https://juejin.cn/post/7077511716564631566

      作者:月夕

      更多node相关知识,请访问:nodejs 教程

      위 내용은 스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

      본 웹사이트의 성명
      본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

      핫 AI 도구

      Undresser.AI Undress

      Undresser.AI Undress

      사실적인 누드 사진을 만들기 위한 AI 기반 앱

      AI Clothes Remover

      AI Clothes Remover

      사진에서 옷을 제거하는 온라인 AI 도구입니다.

      Undress AI Tool

      Undress AI Tool

      무료로 이미지를 벗다

      Clothoff.io

      Clothoff.io

      AI 옷 제거제

      Video Face Swap

      Video Face Swap

      완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

      뜨거운 도구

      메모장++7.3.1

      메모장++7.3.1

      사용하기 쉬운 무료 코드 편집기

      SublimeText3 중국어 버전

      SublimeText3 중국어 버전

      중국어 버전, 사용하기 매우 쉽습니다.

      스튜디오 13.0.1 보내기

      스튜디오 13.0.1 보내기

      강력한 PHP 통합 개발 환경

      드림위버 CS6

      드림위버 CS6

      시각적 웹 개발 도구

      SublimeText3 Mac 버전

      SublimeText3 Mac 버전

      신 수준의 코드 편집 소프트웨어(SublimeText3)

      nodejs는 백엔드 프레임워크인가요? nodejs는 백엔드 프레임워크인가요? Apr 21, 2024 am 05:09 AM

      Node.js는 고성능, 확장성, 크로스 플랫폼 지원, 풍부한 생태계, 개발 용이성 등의 기능을 제공하므로 백엔드 프레임워크로 사용할 수 있습니다.

      nodejs를 mysql 데이터베이스에 연결하는 방법 nodejs를 mysql 데이터베이스에 연결하는 방법 Apr 21, 2024 am 06:13 AM

      MySQL 데이터베이스에 연결하려면 다음 단계를 따라야 합니다. mysql2 드라이버를 설치합니다. mysql2.createConnection()을 사용하여 호스트 주소, 포트, 사용자 이름, 비밀번호 및 데이터베이스 이름이 포함된 연결 개체를 만듭니다. 쿼리를 수행하려면 Connection.query()를 사용하세요. 마지막으로 Connection.end()를 사용하여 연결을 종료합니다.

      nodejs의 전역 변수는 무엇입니까 nodejs의 전역 변수는 무엇입니까 Apr 21, 2024 am 04:54 AM

      Node.js에는 다음과 같은 전역 변수가 존재합니다. 전역 개체: 전역 핵심 모듈: 프로세스, 콘솔, 필수 런타임 환경 변수: __dirname, __filename, __line, __column 상수: undefine, null, NaN, Infinity, -Infinity

      nodejs 설치 디렉토리에 있는 npm과 npm.cmd 파일의 차이점은 무엇입니까? nodejs 설치 디렉토리에 있는 npm과 npm.cmd 파일의 차이점은 무엇입니까? Apr 21, 2024 am 05:18 AM

      Node.js 설치 디렉터리에는 npm과 npm.cmd라는 두 가지 npm 관련 파일이 있습니다. 차이점은 다음과 같습니다. 확장자가 다릅니다. npm은 실행 파일이고 npm.cmd는 명령 창 바로 가기입니다. Windows 사용자: npm.cmd는 명령 프롬프트에서 사용할 수 있으며, npm은 명령줄에서만 실행할 수 있습니다. 호환성: npm.cmd는 Windows 시스템에만 해당되며 npm은 크로스 플랫폼에서 사용할 수 있습니다. 사용 권장사항: Windows 사용자는 npm.cmd를 사용하고, 기타 운영 체제는 npm을 사용합니다.

      PI 노드 교육 : PI 노드 란 무엇입니까? Pi 노드를 설치하고 설정하는 방법은 무엇입니까? PI 노드 교육 : PI 노드 란 무엇입니까? Pi 노드를 설치하고 설정하는 방법은 무엇입니까? Mar 05, 2025 pm 05:57 PM

      Pinetwork 노드에 대한 자세한 설명 및 설치 안내서이 기사에서는 Pinetwork Ecosystem을 자세히 소개합니다. Pi 노드, Pinetwork 생태계의 주요 역할을 수행하고 설치 및 구성을위한 전체 단계를 제공합니다. Pinetwork 블록 체인 테스트 네트워크가 출시 된 후, PI 노드는 다가오는 주요 네트워크 릴리스를 준비하여 테스트에 적극적으로 참여하는 많은 개척자들의 중요한 부분이되었습니다. 아직 Pinetwork를 모른다면 Picoin이 무엇인지 참조하십시오. 리스팅 가격은 얼마입니까? PI 사용, 광업 및 보안 분석. Pinetwork 란 무엇입니까? Pinetwork 프로젝트는 2019 년에 시작되었으며 독점적 인 Cryptocurrency Pi Coin을 소유하고 있습니다. 이 프로젝트는 모든 사람이 참여할 수있는 사람을 만드는 것을 목표로합니다.

      nodejs와 java 사이에 큰 차이가 있나요? nodejs와 java 사이에 큰 차이가 있나요? Apr 21, 2024 am 06:12 AM

      Node.js와 Java의 주요 차이점은 디자인과 기능입니다. 이벤트 중심 대 스레드 중심: Node.js는 이벤트 중심이고 Java는 스레드 중심입니다. 단일 스레드 대 다중 스레드: Node.js는 단일 스레드 이벤트 루프를 사용하고 Java는 다중 스레드 아키텍처를 사용합니다. 런타임 환경: Node.js는 V8 JavaScript 엔진에서 실행되는 반면 Java는 JVM에서 실행됩니다. 구문: Node.js는 JavaScript 구문을 사용하고 Java는 Java 구문을 사용합니다. 목적: Node.js는 I/O 집약적인 작업에 적합한 반면, Java는 대규모 엔터프라이즈 애플리케이션에 적합합니다.

      nodejs는 백엔드 개발 언어인가요? nodejs는 백엔드 개발 언어인가요? Apr 21, 2024 am 05:09 AM

      예, Node.js는 백엔드 개발 언어입니다. 서버 측 비즈니스 로직 처리, 데이터베이스 연결 관리, API 제공 등 백엔드 개발에 사용됩니다.

      nodejs 프로젝트를 서버에 배포하는 방법 nodejs 프로젝트를 서버에 배포하는 방법 Apr 21, 2024 am 04:40 AM

      Node.js 프로젝트의 서버 배포 단계: 배포 환경 준비: 서버 액세스 권한 획득, Node.js 설치, Git 저장소 설정. 애플리케이션 빌드: npm run build를 사용하여 배포 가능한 코드와 종속성을 생성합니다. Git 또는 파일 전송 프로토콜을 통해 서버에 코드를 업로드합니다. 종속성 설치: SSH를 서버에 연결하고 npm install을 사용하여 애플리케이션 종속성을 설치합니다. 애플리케이션 시작: node index.js와 같은 명령을 사용하여 애플리케이션을 시작하거나 pm2와 같은 프로세스 관리자를 사용합니다. 역방향 프록시 구성(선택 사항): Nginx 또는 Apache와 같은 역방향 프록시를 사용하여 트래픽을 애플리케이션으로 라우팅합니다.

      See all articles
      는 데이터를 소비합니다. null을 반환하면 현재 데이터가 크기보다 작다는 의미이며, 그렇지 않으면 이번에 소비된 데이터가 반환됩니다. size를 넘기지 못한다면 캐시풀에 있는 모든 데이터를 소모한다는 뜻입니다