Node.js에서 스트림 판독 가능 스트림 사용

hzc
풀어 주다: 2020-06-17 09:28:12
앞으로
1983명이 탐색했습니다.

읽기 가능한 스트림은 프로그램 소비를 위한 데이터를 생성하는 데 사용되는 스트림입니다. 일반적인 데이터 생성 방법에는 디스크 파일 읽기, 네트워크 요청 콘텐츠 읽기 등이 포함됩니다. 스트림이 무엇인지에 대한 이전 예를 살펴보세요.

const rs = fs.createReadStream(filePath);
로그인 후 복사

rs는 읽기 가능한 스트림이고 데이터 생성 방법은 디스크 파일을 읽는 것입니다. 콘솔 process.stdin은 읽기 가능한 스트림이기도 합니다.

process.stdin.pipe(process.stdout);
로그인 후 복사

간단한 문장으로 콘솔 입력을 인쇄할 수 있습니다. process.stdin이 데이터를 생성하는 방식은 콘솔에서 사용자 입력을 읽는 것입니다.

읽기 가능한 스트림의 정의를 다시 살펴보겠습니다.

읽기 가능한 스트림은 프로그램 소비를 위한 데이터를 생성하는 스트림입니다.

사용자 정의 읽기 가능한 스트림


시스템에서 제공하는

fs.CreateReadStream

외에도 gulp나 Vinyl-fs

gulp.src(['*.js', 'dist/**/*.scss'])
로그인 후 복사

에서 제공하는 src 메소드를 사용할 때 읽기 가능한 스트림을 사용할 수도 있습니다. 어떤 방식으로든 사용하고 싶은 경우 데이터는 특정 방식으로 생성되어 프로그램에 제공되어 소비됩니다. 그러면 변경은 어떻게 시작되나요? stream 스트림 모듈의

readable

    클래스
  1. honerit

    _Read
  2. 메서드를 호출하려면 생성 된 데이터를 큐에 넣으려면
  3. this.push

    를 호출하십시오. to be read

    Readable 클래스는 읽기 가능한 스트림에 대한 대부분의 작업을 이미 완료했습니다. 이를 상속한 다음 _read 메서드에 데이터를 생성하는 메서드를 작성하여 사용자 정의 읽기 가능한 스트림을 구현하기만 하면 됩니다.

    예: 100밀리초마다 난수를 생성하는 스트림 구현(별로 유용하지 않음)
  4. const Readable = require('stream').Readable;
    class RandomNumberStream extends Readable {
        constructor(max) {
            super()
        }
        _read() {
            const ctx = this;
            setTimeout(() => {
                const randomNumber = parseInt(Math.random() * 10000);
                // 只能 push 字符串或 Buffer,为了方便显示打一个回车
                ctx.push(`${randomNumber}\n`);
            }, 100);
        }
    }
    module.exports = RandomNumberStream;
    로그인 후 복사
코드의 클래스 상속 부분은 매우 간단합니다. 주로 _read 메서드의 구현을 살펴보세요. 몇 가지 가치가 있는 것들이 있습니다. noting

기본적으로 Readable 클래스에 _read 메소드 구현이 있지만 우리가 하는 일은 재정의하고 다시 쓰는 것입니다. _read 메소드에는 데이터 양을 지정하는 데 사용되는 매개변수 크기가 있습니다. 읽어서 read 메소드로 반환하지만 이는 단지 참조일 뿐입니다. 데이터, 많은 구현에서는 이 매개변수를 무시합니다. 여기서도 나중에 자세히 설명하겠습니다. 버퍼 개념은 나중에 언급할 예정입니다. 수도관에 압착되어 소비될 수 있다는 점은 일시적으로 이해됩니다.

push의 내용은 숫자가 아닌 문자열 또는 버퍼일 수 있습니다.

    실행해 보면 효과를 볼 수 있습니다
  1. const RandomNumberStream = require('./RandomNumberStream');
    const rns = new RandomNumberStream();
    rns.pipe(process.stdout);
    로그인 후 복사
  2. 이렇게 하면 콘솔에 숫자가 계속 표시되고, 난수를 생성하는 읽기 가능한 스트림이 나오는 것을 볼 수 있습니다. 아직 해결해야 할 몇 가지 작은 문제가 있습니다
  3. 중지 방법
  4. 100밀리초마다 숫자를 버퍼에 푸시합니다. 따라서 로컬 파일 읽기가 항상 완료될 때와 마찬가지로 중지하고 데이터가 있음을 표시하는 방법도 있습니다. 읽다?
  5. 버퍼에 null을 푸시하면 소비자가 필요한 난수 수를 정의할 수 있도록 코드를 수정할 수 있습니다.
  6. const Readable = require('stream').Readable;
    class RandomNumberStream extends Readable {
        constructor(max) {
            super()
            this.max = max;
        }
        _read() {
            const ctx = this;
            setTimeout(() => {
                if (ctx.max) {
                    const randomNumber = parseInt(Math.random() * 10000);
                    // 只能 push 字符串或 Buffer,为了方便显示打一个回车
                    ctx.push(`${randomNumber}\n`);
                    ctx.max -= 1;
                } else {
                    ctx.push(null);
                }
            }, 100);
        }
    }
    module.exports = RandomNumberStream;
    로그인 후 복사
  7. 코드는 최대 식별자를 사용하여 소비자가 인스턴스화된 문자 수를 지정할 수 있도록 합니다. 시간만 지정하세요
const RandomNumberStream = require('./');
const rns = new RandomNumberStream(5);
rns.pipe(process.stdout);
로그인 후 복사

이렇게 하면 콘솔이 5자만 인쇄한다는 것을 알 수 있습니다

setInterval 대신 setTimeout이 필요한 이유

주의 깊은 학생들은 100밀리초마다 난수를 생성하는 것이 setInterval이 아닌 setTimeout이라는 것을 알 수 있습니다. 왜 지연되고 반복되지 않는데 결과는 정확합니까?

이를 위해서는 스트림이 작동하는 두 가지 방식을 이해해야 합니다.

플로우 모드: 기본 시스템에서 데이터를 읽고 가능한 한 빨리 애플리케이션에 제공합니다.

일시 중지 모드: read() 메서드를 명시적으로 호출해야 합니다. read 여러 데이터 블록

스트림은 기본적으로 일시 중지 모드에 있습니다. 이는 프로그램이 read() 메서드를 명시적으로 호출해야 함을 의미합니다. 그러나 위의 예에서는 스트림이 전환되기 때문에 호출하지 않고도 데이터를 얻을 수 있습니다. 파이프() 메소드가 플로우 모드가 되어 데이터를 읽을 때까지 _read() 메소드가 자동으로 반복 호출되므로 매번 _read() 메소드에서 데이터를 한 번만 읽으면 됩니다

흐름 모드와 일시 중지 모드 간 전환

다음 방법을 사용하여 스트림을 기본 일시 중지 모드에서 흐름 모드로 전환할 수 있습니다.

  1. 데이터 이벤트 리스너를 추가하여 데이터 모니터링을 시작합니다.
  2. 이력서() 메서드를 호출하여 데이터 흐름 시작
데이터 전송을 위해 파이프() 메서드 호출 또 다른 쓰기 가능한 스트림 받기

흐름 모드에서 일시 중지 모드로 전환하는 방법에는 두 가지가 있습니다.

스트림에 파이프()가 없으면 다음을 호출합니다. Pause() 메서드는 stream
  1. pipe()를 일시 중지하고 제거할 수 있습니다. 모든 데이터 이벤트를 모니터링하려면 unpipe() 메서드를 호출하세요
  2. data event
  3. pipe() 메서드를 사용한 후 데이터는 쓰기 가능 상태로 들어갑니다. 읽을 수 있는 스트림에서 스트리밍되지만 사용자에게는 블랙박스처럼 보입니다. 데이터의 흐름 방향은 어떻게 되나요? 흐름 모드와 일시 정지 모드 사이를 전환할 때 두 가지 중요한 용어가 있습니다

흐름 모드에 해당하는 데이터 이벤트
  1. 일시 중지 모드에 해당하는 read() 메서드
  2. 이 두 가지 메커니즘이 프로그램이 데이터 흐름을 구동할 수 있는 이유입니다. 흐름 모드 데이터 이벤트를 살펴보겠습니다. 읽기 가능한 스트림의 데이터 이벤트가 모니터링되면 스트림은 위의 스트림을 호출하는 코드를 다시 작성할 수 있습니다.
    const RandomNumberStream = require('./RandomNumberStream');
    const rns = new RandomNumberStream(5);
    rns.on('data', chunk => {
      console.log(chunk);
    });
    로그인 후 복사

    这样可以看到控制台打印出了类似下面的结果

    <Buffer 39 35 37 0a>
    <Buffer 31 30 35 37 0a>
    <Buffer 38 35 31 30 0a>
    <Buffer 33 30 35 35 0a>
    <Buffer 34 36 34 32 0a>
    로그인 후 복사

    当可读流生产出可供消费的数据后就会触发 data 事件,data 事件监听器绑定后,数据会被尽可能地传递。data 事件的监听器可以在第一个参数收到可读流传递过来的 Buffer 数据,这也就是控制台打印的 chunk,如果想显示为数字,可以调用 Buffer 的 toString() 方法

    当数据处理完成后还会触发一个

    end

    事件,因为流的处理不是同步调用,所以如果希望完事后做一些事情就需要监听这个事件,在代码最后追加一句:

    rns.on('end', () => {
      console.log('done');
    });复制代码
    로그인 후 복사

    这样可以在数据接收完了显示 done ,当然数据处理过程中出现了错误会触发 error 事件,可以监听做异常处理:

    rns.on('error', (err) => {
      console.log(err);
    });复制代码
    로그인 후 복사

    read(size)

    流在暂停模式下需要程序显式调用 read() 方法才能得到数据,read() 方法会从内部缓冲区中拉取并返回若干数据,当没有更多可用数据时,会返回null

    使用 read() 方法读取数据时,如果传入了 size 参数,那么它会返回指定字节的数据;当指定的size字节不可用时,则返回null。如果没有指定size参数,那么会返回内部缓冲区中的所有数据

    现在有一个矛盾,在流动模式下流生产出了数据,然后触发 data 事件通知给程序,这样很方便。在暂停模式下需要程序去读取,那么就有一种可能是读取的时候还没生产好,如果使用轮询的方式未免效率有些低

    NodeJS 提供了一个

    readable的事件,事件在可读流准备好数据的时候触发,也就是先监听这个事件,收到通知有数据了再去读取就好了:

    const rns = new RandomNumberStream(5);
    rns.on('readable', () => {
      let chunk;
      while((chunk = rns.read()) !== null){
        console.log(chunk);
      }
    });
    로그인 후 복사

    这样可以读取到数据,值得注意的一点是并不是每次调用 read() 方法都可以返回数据,前面提到了如果可用的数据没有达到 size 那么返回 null,所以在程序中加了个判断

    数据会不会漏掉

    const stream = fs.createReadStream('/dev/input/event0');
    stream.on('readable', callback);复制代码
    로그인 후 복사

    在流动模式会不会有这样的问题:可读流在创建好的时候就生产数据了,如果在绑定 readable 事件之前就生产了某些数据,触发了 readable 事件,在极端情况下会造成数据丢失吗?

    事实并不会,按照 NodeJS event loop 程序创建流和调用事件监听在一个事件队列里面,生产数据和事件监听都是异步操作,而 on 监听事件使用了 process.nextTick 会保证在数据生产之前被绑定好,相关知识可以看定时器章节中对 event loop 的解读

    到这里可能对 data事件、readable事件触发时机, read() 方法每次读多少数据,什么时候返回 null 还有一定的疑问,在后续可写流章节会在 back pressure 部分结合源码介绍相关机制

    推荐教程:《JS教程

위 내용은 Node.js에서 스트림 판독 가능 스트림 사용의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:juejin.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿