스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

青灯夜游
풀어 주다: 2022-05-16 20:56:23
앞으로
2061명이 탐색했습니다.

플로우란 무엇인가요? 흐름을 이해하는 방법? 다음 기사는 Node의 스트림에 대한 심층적인 이해를 제공할 것입니다. 도움이 되기를 바랍니다!

스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

저자는 최근 개발에서 파이프 기능을 자주 사용했습니다. 스트림을 위한 파이프라인이라는 것만 알 뿐 어떻게 작동하는지 모르겠습니다. 그래서 알아보겠다는 마음으로 간단히 시작합니다. 내가 읽은 지식과 소스 코드를 기사로 정리하여 모두와 공유합니다.

Stream은 Nodejs의 매우 기본적인 개념입니다. 많은 기본 모듈이 스트림을 기반으로 구현되며 매우 중요한 역할을 합니다. 동시에 흐름은 이해하기 매우 어려운 개념이기도 합니다. 이는 주로 관련 문서가 부족하기 때문입니다. 다행히도 NodeJ 초보자의 경우 이 개념을 완전히 익히기 전에 흐름을 이해하는 데 많은 시간이 걸립니다. 대부분의 NodeJ에서는 사용자의 경우 웹 애플리케이션을 개발하는 데만 사용됩니다. 스트림에 대한 이해가 부족해도 사용에 영향을 미치지 않습니다. 그러나 스트림을 이해하면 NodeJ의 다른 모듈을 더 잘 이해할 수 있으며 경우에 따라 스트림을 사용하여 데이터를 처리하면 더 나은 결과를 얻을 수 있습니다. [관련 튜토리얼 추천 : nodejs 동영상 튜토리얼]

스트림 이해 방법

  • 스트림 사용자의 경우 스트림을 배열로 간주할 수 있으며, 스트림을 가져오고(소비하고) 쓰는 데만 집중하면 됩니다. 그것(생산)은 괜찮습니다.

  • 스트림 개발자의 경우(스트림 모듈을 사용하여 새 인스턴스 생성) 스트림에서 일부 메서드를 구현하는 방법에 중점을 둡니다. 일반적으로 대상 리소스가 누구인지, 대상 리소스를 작동하는 방법이라는 두 가지 사항에 중점을 둡니다. . 일단 결정되면 스트림의 다양한 상태와 이벤트에 따라 대상 리소스를 운영해야 합니다

캐시 풀

NodeJ의 모든 스트림에는 버퍼 풀이 있습니다. 버퍼 풀의 목적은 스트림의 효율성을 높이는 것입니다. 데이터가 생산되고 소비될 때 시간이 걸릴 때 미리 데이터를 생산하여 다음 소비 전에 버퍼 풀에 저장할 수 있습니다. 그러나 버퍼 풀이 항상 사용되는 것은 아닙니다. 예를 들어 캐시 풀이 비어 있으면 생성 후 데이터가 캐시 풀에 저장되지 않고 직접 소비됩니다. .

데이터 생산 속도가 데이터 소비 속도보다 빠르면 초과된 데이터가 어딘가에서 기다리고 있을 것입니다. 데이터 생성 속도가 처리 데이터 소비 속도보다 느린 경우, 데이터는 어딘가에 일정량 축적되어 소비됩니다. (개발자는 데이터의 생산과 소비의 속도를 통제할 수 없으며, 어떤 시간에 데이터를 생산하거나 소비하려고만 할 수 있습니다.)

데이터가 기다리고, 축적되고, 발생하는 그 곳. 버퍼풀 입니다. 버퍼 풀은 일반적으로 컴퓨터의 RAM(메모리)에 위치합니다.

일반적인 버퍼의 예를 들자면, 온라인 동영상을 볼 때 인터넷 속도가 빠르면 버퍼는 항상 즉시 채워진 다음 재생을 위해 시스템으로 전송되고 다음 동영상이 즉시 버퍼링됩니다. 시청하는 동안 지연이 발생하지 않습니다. 네트워크 속도가 매우 느린 경우 로딩이 표시되며, 이는 버퍼가 채워지고 있음을 나타내는 것입니다. 채우기가 완료되면 데이터가 시스템으로 전송되며 이 비디오를 볼 수 있습니다.

NodeJs 스트림의 캐시 풀은 버퍼 연결 목록입니다. 캐시 풀에 데이터를 추가할 때마다 버퍼 노드가 다시 생성되어 연결 목록 끝에 삽입됩니다.

EventEmitter

NodeJs의 Stream은 EventEmitter를 구현하는 추상 인터페이스이므로 먼저 EventEmitter를 간략하게 소개하겠습니다.

EventEmitter는 이벤트 게시 및 구독 기능을 구현하는 클래스입니다. 일반적으로 사용되는 몇 가지 방법(on, Once, off, Emit)은 누구에게나 친숙할 것으로 생각되므로 하나씩 소개하지 않겠습니다.

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

// 为 eventA 事件绑定处理函数
eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});

// 为 eventB 事件绑定处理函数
eventEmitter.on('eventB', () => {
    console.log('eventB active 1');
});

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});

// 触发 eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2
로그인 후 복사

EventEmitter에는 newListenerremoveListener라는 두 가지 이벤트가 있다는 점에 주목할 가치가 있습니다. newListener(eventEmitter.emit('newListener')) 함수가 트리거됩니다. 처리 함수가 제거되면 removeListener가 동일한 방식으로 트리거됩니다. EventEmitter 有两个叫做 newListenerremoveListener 的事件,当你向一个事件对象中添加任何事件监听函数后,都会触发 newListener(eventEmitter.emit('newListener')),当一个处理函数被移除时同理会触发 removeListener

还需要注意的是, once 绑定的处理函数只会执行一次,removeListener 将在其执行前被触发,这意味着 once

한 번 바인딩된 핸들러 함수는 한 번만 실행되며 removeListener는 실행 전에 트리거됩니다. 즉, 한 번으로 바인딩된 리스너는 해당 기능은 트리거되기 전에 먼저 제거됩니다.

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

eventEmitter.on('newListener', (event, listener)=>{
    console.log('newListener', event, listener)
})

eventEmitter.on('removeListener', (event, listener) => {
    console.log('removeListener', event, listener)
})
//newListener removeListener[Function(anonymous)]


eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});
//newListener eventA [Function (anonymous)]

function listenerB() { console.log('eventB active 1'); }
eventEmitter.on('eventB', listenerB);
// newListener eventB [Function (anonymous)]

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});
// newListener eventA [Function (anonymous)]

eventEmitter.emit('eventA')
// eventA active 1
// removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] }
// eventA active 2

eventEmitter.off('eventB', listenerB)
// removeListener eventB[Function: listenerB]
로그인 후 복사

하지만 나중에 보게 될 내용에는 이것이 중요하지 않습니다.

Stream

Stream은 Node.js에서 스트리밍 데이터를 처리하기 위한 추상 인터페이스입니다. 스트림은 실제 인터페이스가 아니라 모든 스트림을 가리키는 일반적인 용어입니다. 실제 인터페이스는 ReadableStream, WritableStream 및 ReadWriteStream입니다.

interface ReadableStream extends EventEmitter {
    readable: boolean;
    read(size?: number): string | Buffer;
    setEncoding(encoding: BufferEncoding): this;
    pause(): this;
    resume(): this;
    isPaused(): boolean;
    pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
    unpipe(destination?: WritableStream): this;
    unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
    wrap(oldStream: ReadableStream): this;
    [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
    writable: boolean;
    write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
    write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
    end(cb?: () => void): this;
    end(data: string | Uint8Array, cb?: () => void): this;
    end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }
로그인 후 복사

ReadableStream과 WritableStream은 모두 EventEmitter 클래스를 상속하는 인터페이스임을 알 수 있습니다(ts의 인터페이스는 병합 유형이므로 클래스를 상속할 수 있습니다).

위 인터페이스에 해당하는 구현 클래스는 각각 Readable, Writable 및 Duplex입니다.

NodeJ에는 4가지 유형의 스트림이 있습니다. 🎜
  • 읽기 가능하고 읽기 가능한 스트림(ReadableStream 구현)
  • 쓰기 가능한 쓰기 가능한 스트림(WritableStream 구현)
  • 이중 읽기 및 쓰기 가능 스트림(Readable을 상속한 후 WritableStream 구현)
  • 변환 변환 스트림(Duplex 상속)

배압 문제

The 디스크에 데이터를 쓰는 속도는 메모리의 속도보다 훨씬 낮습니다. 우리는 메모리와 디스크 사이에 "파이프"가 있고 "파이프"는 메모리의 데이터가 흐르는 것을 상상합니다. 파이프가 막히면 메모리에 데이터 역압이 발생하고 데이터가 메모리에 축적되어 리소스를 차지하게 됩니다.

스트림이란 무엇입니까? Nodejs에서 스트림을 이해하는 방법

NodeJs Stream의 솔루션은 데이터 양이 이 float에 도달하면 각 스트림의 캐시 풀(즉, 그림의 쓰기 대기열)에 대해 float 값을 설정하는 것입니다. 값, 캐시로 이동 풀 데이터를 다시 push하면 false를 반환합니다. 이는 현재 스트림의 캐시 풀 콘텐츠가 float 값에 도달했으며 더 이상 데이터가 없을 것으로 예상됨을 의미합니다. 이때 캐싱을 방지하기 위해 데이터 생성을 즉시 중단해야 합니다. 缓存池(就是图中写入队列)设置一个浮标值,当其中数据量达到这个浮标值后,往缓存池再次 push 数据时就会返回 false,表示当前流中缓存池内容已经达到浮标值,不希望再有数据写入了,这时我们应该立即停止数据的生产,防止缓存池过大产生背压。

Readable

可读流(Readable)是流的一种类型,他有两种模式三种状态

两种读取模式:

  • 流动模式:数据会从底层系统读取写入到缓冲区,当缓冲区被写满后自动通过 EventEmitter 尽快的将数据传递给所注册的事件处理程序中

  • 暂停模式:在这种模式下将不会主动触发 EventEmitter 传输数据,必须显示的调用 Readable.read() 方法来从缓冲区中读取数据,read 会触发响应到 EventEmitter 事件。

三种状态:

  • readableFlowing === null(初始状态)

  • readableFlowing === false(暂停模式)

  • readableFlowing === true(流动模式)

初始时流的 readable.readableFlowingnull

添加data事件后变为 true 。调用 pause()unpipe()、或接收到背压或者添加 readable 事件,则 readableFlowing 会被设为 false ,在这个状态下,为 data 事件绑定监听器不会使 readableFlowing 切换到 true

调用 resume() 可以让可读流的 readableFlowing

읽기 가능

읽기 가능 스트림(Readable)은 스트림 유형입니다. 여기에는 두 가지 모드와 세 가지 상태가 있습니다.

두 가지 읽기 모드: