> 웹 프론트엔드 > JS 튜토리얼 > 스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

青灯夜游
풀어 주다: 2022-03-04 17:11:45
앞으로
2030명이 탐색했습니다.

이 기사에서는 Node.js 스트림 소스 코드의 해석을 안내하고 Node 읽을 수 있는 스트림은 물론 기본 원칙, 사용법 및 작동 메커니즘에 대한 심층적인 이해를 제공할 것입니다. 모두에게 도움이 되세요!

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

1. 기본 개념

1.1. 스트림의 역사적 진화

스트림은 Nodejs에만 있는 개념이 아닙니다. 이는 수십 년 전에 Unix 운영 체제에 도입되었으며 프로그램은 파이프 연산자(|)를 통해 스트림에서 서로 상호 작용할 수 있습니다.

파이프 연산자(|)는 Unix 시스템 기반의 MacOS 및 Linux에서 사용할 수 있으며 연산자 왼쪽의 프로세스 출력을 오른쪽의 입력으로 변환할 수 있습니다.

Node에서는 기존의 readFile을 사용하여 파일을 읽으면 파일을 처음부터 끝까지 메모리로 읽어 들입니다. 모든 내용을 읽은 후에는 메모리에 로드된 파일의 내용이 통합됩니다. 다루다.

이 작업에는 두 가지 단점이 있습니다.

  • 메모리: 많은 메모리를 차지합니다.

  • 시간: 데이터 처리를 시작하기 전에 데이터의 전체 페이로드가 로드될 때까지 기다려야 합니다.

위 문제를 해결하기 위해 Node.js는 스트림 개념을 모방하고 구현합니다. Node.js 스트림에는 모두 Node.js의 EventEmitter 인스턴스입니다.

  • Readable Stream(읽기 가능한 스트림)

  • Writable Stream

  • Readable and Writable Full-Duplex Stream(Duplex Stream)

  • Transform Stream

이 부분의 콘텐츠를 자세히 알아보려면, 단계별로 Node.js의 스트림 개념을 이해하고, 소스 코드 부분이 상대적으로 복잡하기 때문에 이 부분을 읽을 수 있는 스트림부터 학습하기로 결정했습니다.

1.2. 스트림이란 무엇인가요?

스트림은 추상적인 데이터 구조이자 데이터의 집합입니다. 여기에 저장되는 데이터 유형은 다음과 같습니다(objectMode ===의 경우). false ):

  • string
  • Buffer

스트림을 액체와 마찬가지로 이러한 데이터의 모음으로 생각할 수 있습니다. 먼저 이러한 액체를 컨테이너(스트림의 내부 버퍼 BufferList)에 저장하고 기다립니다. 해당 이벤트가 발생할 때까지 우리는 내부의 액체를 파이프에 붓고 다른 사람들에게 파이프 반대편에 있는 자신의 용기를 사용하여 내부의 액체를 집어 처리하도록 알립니다.

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

1.3. 읽기 가능한 스트림이란?

읽기 가능한 스트림은 두 가지 모드와 세 가지 상태를 가집니다.

두 가지 읽기 모드:

  • 흐름 모드: 데이터를 읽습니다.

  • Pause 모드: 이 모드에서는 데이터를 읽을 수 없으며 표시해야 합니다. Stream.read() 메서드를 호출하여 데이터를 읽습니다. 스트림

세 가지 상태:

  • readableFlowing === null: Stream.pipe() 및 Stream.resume을 호출하면 상태가 true로 변경되고 데이터 생성이 시작되며 적극적으로 트리거됩니다. events

  • readFlowing === false: 이때 데이터의 흐름은 중단되지만, 데이터 생성은 중단되지 않으므로 데이터 백로그가 발생합니다

  • readableFlowing = == TRUE: 정상 및 소비되는 데이터

2. 기본 원칙

2.1. 내부 상태 정의(ReadableState)

ReadableState

_readableState: ReadableState {
  objectMode: false, // 操作除了string、Buffer、null之外的其他类型的数据需要把这个模式打开
  highWaterMark: 16384, // 水位限制,1024 \* 16,默认16kb,超过这个限制则会停止调用\_read()读数据到buffer中
  buffer: BufferList { head: null, tail: null, length: 0 }, // Buffer链表,用于保存数据
  length: 0, // 整个可读流数据的大小,如果是objectMode则与buffer.length相等
  pipes: [], // 保存监听了该可读流的所有管道队列
  flowing: null, // 可独流的状态 null、false、true
  ended: false, // 所有数据消费完毕
  endEmitted: false, // 结束事件收否已发送
  reading: false, // 是否正在读取数据
  constructed: true, // 流在构造好之前或者失败之前,不能被销毁
  sync: true, // 是否同步触发'readable'/'data'事件,或是等到下一个tick
  needReadable: false, // 是否需要发送readable事件
  emittedReadable: false, // readable事件发送完毕
  readableListening: false, // 是否有readable监听事件
  resumeScheduled: false, // 是否调用过resume方法
  errorEmitted: false, // 错误事件已发送
  emitClose: true, // 流销毁时,是否发送close事件
  autoDestroy: true, // 自动销毁,在'end'事件触发后被调用
  destroyed: false, // 流是否已经被销毁
  errored: null, // 标识流是否报错
  closed: false, // 流是否已经关闭
  closeEmitted: false, // close事件是否已发送
  defaultEncoding: 'utf8', // 默认字符编码格式
  awaitDrainWriters: null, // 指向监听了&#39;drain&#39;事件的writer引用,类型为null、Writable、Set<Writable>
  multiAwaitDrain: false, // 是否有多个writer等待drain事件 
  readingMore: false, // 是否可以读取更多数据
  dataEmitted: false, // 数据已发送
  decoder: null, // 解码器
  encoding: null, // 编码器
  [Symbol(kPaused)]: null
},
로그인 후 복사

2.2.

BufferList는 사용되는 컨테이너입니다. 스트림이 내부 데이터를 저장하기 위해 연결 목록 형태로 설계되었으며 헤드(head), 테일(tail), 길이(length)의 세 가지 속성을 갖습니다.

BufferList의 각 노드를 BufferNode로 표현하고, 그 안에 들어있는 Data의 종류는 objectMode에 따라 달라집니다.

이 데이터 구조는 Array.prototype.shift()보다 더 빠르게 헤더 데이터를 얻습니다.

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

2.2.1. 데이터 저장 유형

objectMode === true:

그러면 데이터는 모든 유형이 될 수 있으며 푸시되는 데이터는 무엇이든 저장됩니다

objectMode =true

const Stream = require(&#39;stream&#39;);
const readableStream = new Stream.Readable({
  objectMode: true,
  read() {},
});

readableStream.push({ name: &#39;lisa&#39;});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(true);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(&#39;lisa&#39;);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(666);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(() => {});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(Symbol(1));
console.log(readableStream._readableState.buffer.tail);
readableStream.push(BigInt(123));
console.log(readableStream._readableState.buffer.tail);
로그인 후 복사

실행 결과:

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

objectMode === false:

데이터는 문자열, Buffer 또는 Uint8Array

objectMode=false

const Stream = require(&#39;stream&#39;);

const readableStream = new Stream.Readable({

  objectMode: false,

  read() {},

});

readableStream.push({ name: &#39;lisa&#39;});
로그인 후 복사

실행 결과:

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

2.2.2. 数据存储结构

我们在控制台通过node命令行创建一个可读流,来观察buffer中数据的变化:

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

当然在push数据之前我们需要实现他的_read方法,或者在构造函数的参数中实现read方法:

const Stream = require(&#39;stream&#39;);

const readableStream = new Stream.Readable();

RS._read = function(size) {}
로그인 후 복사

或者

const Stream = require(&#39;stream&#39;);

const readableStream = new Stream.Readable({
  
  read(size) {}

});
로그인 후 복사

经过readableStream.push('abc')操作之后,当前的buffer为:

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

可以看到目前的数据存储了,头尾存储的数据都是字符串'abc'的ascii码,类型为Buffer类型,length表示当前保存的数据的条数而非数据内容的大小。

2.2.3. 相关API

打印一下BufferList的所有方法可以得到:

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

除了join是将BufferList序列化为字符串之外,其他都是对数据的存取操作。

这里就不一一讲解所有的方法了,重点讲一下其中的consume 、_getString和_getBuffer。

2.2.3.1. consume

源码地址:BufferList.consume
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
로그인 후 복사

comsume

// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
  const data = this.head.data;
  if (n < data.length) {
    // `slice` is the same for buffers and strings.
    const slice = data.slice(0, n);
    this.head.data = data.slice(n);
    return slice;
  }
  if (n === data.length) {
    // First chunk is a perfect match.
    return this.shift();
  }
  // Result spans more than one buffer.
  return hasStrings ? this.\_getString(n) : this.\_getBuffer(n);
}
로그인 후 복사

代码一共有三个判断条件:

  • 如果所消耗的数据的字节长度小于链表头节点存储数据的长度,则将头节点的数据取前n字节,并把当前头节点的数据设置为切片之后的数据

  • 如果所消耗的数据恰好等于链表头节点所存储的数据的长度,则直接返回当前头节点的数据

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

  • 如果所消耗的数据的长度大于链表头节点的长度,那么会根据传入的第二个参数进行最后一次判断,判断当前的BufferList底层存储的是string还是Buffer

2.2.3.2. _getBuffer

源码地址:BufferList._getBuffer
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
로그인 후 복사

comsume

// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
  const ret = Buffer.allocUnsafe(n);
  const retLen = n;
  let p = this.head;
  let c = 0;
  do {
    const buf = p.data;
    if (n > buf.length) {
      TypedArrayPrototypeSet(ret, buf, retLen - n);
      n -= buf.length;
    } else {
      if (n === buf.length) {
        TypedArrayPrototypeSet(ret, buf, retLen - n);
        ++c;
        if (p.next)
          this.head = p.next;
        else
          this.head = this.tail = null;
      } else {
       TypedArrayPrototypeSet(ret,
                              new Uint8Array(buf.buffer, buf.byteOffset, n),
                              retLen - n);
        this.head = p;
        p.data = buf.slice(n);
      }
      break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}
로그인 후 복사

总的来说就是循环对链表中的节点进行操作,新建一个Buffer数组用于存储返回的数据。

首先从链表的头节点开始取数据,不断的复制到新建的Buffer中,直到某一个节点的数据大于等于要取的长度减去已经取得的长度。

或者说读到链表的最后一个节点后,都还没有达到要取的长度,那么就返回这个新建的Buffer。

2.2.3.3. _getString

源码地址:BufferList._getString
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
로그인 후 복사

comsume

// Consumes a specified amount of characters from the buffered data.
_getString(n) {
  let ret = &#39;&#39;;
  let p = this.head;
  let c = 0;
  do {
    const str = p.data;
    if (n > str.length) {
    ret += str;
    n -= str.length;
  } else {
    if (n === str.length) {
      ret += str;
      ++c;
      if (p.next)
        this.head = p.next;
      else
        this.head = this.tail = null;
    } else {
      ret += StringPrototypeSlice(str, 0, n);
      this.head = p;
      p.data = StringPrototypeSlice(str, n);
    }
    break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}
로그인 후 복사

对于操作字符串来说和操作Buffer是一样的,也是循环从链表的头部开始读数据,只是进行数据的拷贝存储方面有些差异,还有就是_getString操作返回的数据类型是string类型。

2.3. 为什么可读流是EventEmitter的实例?

对于这个问题而言,首先要了解什么是发布订阅模式,发布订阅模式在大多数API中都有重要的应用,无论是Promise还是Redux,基于发布订阅模式实现的高级API随处可见。

它的优点在于能将事件的相关回调函数存储到队列中,然后在将来的某个时刻通知到对方去处理数据,从而做到关注点分离,生产者只管生产数据和通知消费者,而消费者则只管处理对应的事件及其对应的数据,而Node.js流模式刚好符合这一特点。

那么Node.js流是怎样实现基于EventEmitter创建实例的呢?

这部分源码在这儿:stream/legacy
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
로그인 후 복사

legacy

function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);
로그인 후 복사

然后在可读流的源码中有这么几行代码:

这部分源码在这儿:readable
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
로그인 후 복사

legacy

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
로그인 후 복사

首先将Stream的原型对象继承自EventEmitter,这样Stream的所有实例都可以访问到EventEmitter上的方法。

同时通过ObjectSetPrototypeOf(Stream, EE)将EventEmitter上的静态方法也继承过来,并在Stream的构造函数中,借用构造函数EE来实现所有EventEmitter中的属性的继承,然后在可读流里,用同样的的方法实现对Stream类的原型继承和静态属性继承,从而得到:

Readable.prototype.__proto__ === Stream.prototype;

Stream.prototype.__proto__ === EE.prototype

因此:

Readable.prototype.__proto__.__proto__ === EE.prototype

所以捋着可读流的原型链可以找到EventEmitter的原型,实现对EventEmitter的继承

2.4. 相关API的实现

这里会按照源码文档中API的出现顺序来展示,且仅解读其中的核心API实现。

注:此处仅解读Node.js可读流源码中所声明的函数,不包含外部引入的函数定义,同时为了减少篇幅,不会将所有代码都拷贝下来。

Readable.prototype

Stream {
  destroy: [Function: destroy],
  _undestroy: [Function: undestroy],
  _destroy: [Function (anonymous)],
  push: [Function (anonymous)],
  unshift: [Function (anonymous)],
  isPaused: [Function (anonymous)],
  setEncoding: [Function (anonymous)],
  read: [Function (anonymous)],
  _read: [Function (anonymous)],
  pipe: [Function (anonymous)],
  unpipe: [Function (anonymous)],
  on: [Function (anonymous)],
  addListener: [Function (anonymous)],
  removeListener: [Function (anonymous)],
  off: [Function (anonymous)],
  removeAllListeners: [Function (anonymous)],
  resume: [Function (anonymous)],
  pause: [Function (anonymous)],
  wrap: [Function (anonymous)],
  iterator: [Function (anonymous)],
  [Symbol(nodejs.rejection)]: [Function (anonymous)],
  [Symbol(Symbol.asyncIterator)]: [Function (anonymous)]
}
로그인 후 복사

2.4.1. push

readable.push

Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};
로그인 후 복사

push方法的主要作用就是将数据块通过触发'data'事件传递给下游管道,或者将数据存储到自身的缓冲区中。

以下代码为相关伪代码,仅展示主流程:

readable.push

function readableAddChunk(stream, chunk, encoding, addToFront) {
  const state = stream.\_readableState;
  if (chunk === null) { // push null 流结束信号,之后不能再写入数据
    state.reading = false;
    onEofChunk(stream, state);
  } else if (!state.objectMode) { // 如果不是对象模式
    if (typeof chunk === &#39;string&#39;) {
      chunk = Buffer.from(chunk);
    } else if (chunk instanceof Buffer) { //如果是Buffer
    // 处理一下编码
    } else if (Stream.\_isUint8Array(chunk)) {
      chunk = Stream.\_uint8ArrayToBuffer(chunk);
    } else if (chunk != null) {
      err = new ERR\_INVALID\_ARG\_TYPE(&#39;chunk&#39;, [&#39;string&#39;, &#39;Buffer&#39;, &#39;Uint8Array&#39;], chunk);
    }
  }

  if (state.objectMode || (chunk && chunk.length > 0)) { // 是对象模式或者chunk是Buffer
    // 这里省略几种数据的插入方式的判断
    addChunk(stream, state, chunk, true);
  }
}

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync &&
    stream.listenerCount(&#39;data&#39;) > 0) { // 如果处于流动模式,有监听data的订阅者
      stream.emit(&#39;data&#39;, chunk);
  } else { // 否则保存数据到缓冲区中
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) {
      state.buffer.unshift(chunk);
    } else {
      state.buffer.push(chunk);
    }
  }
  maybeReadMore(stream, state); // 尝试多读一点数据
}
로그인 후 복사

push操作主要分为对objectMode的判断,不同的类型对传入的数据会做不同的操作:

  • objectMode === false: 将数据(chunk)转换成Buffer
  • objectMode === true: 将数据原封不动的传递给下游

其中addChunk的第一个判断主要是处理Readable处于流动模式、有data监听器、并且缓冲区数据为空时的情况。

这时主要将数据passthrough透传给订阅了data事件的其他程序,否则就将数据保存到缓冲区里面。

2.4.2. read

除去对边界条件的判断、流状态的判断,这个方法主要有两个操作

  • 调用用户实现的_read方法,对执行结果进行处理

  • 从缓冲区buffer中读取数据,并触发'data'事件

readable.read

// 如果read的长度大于hwm,则会重新计算hwm
if (n > state.highWaterMark) {
  state.highWaterMark = computeNewHighWaterMark(n);  
}
// 调用用户实现的\_read方法
try {
  const result = this.\_read(state.highWaterMark);
  if (result != null) {
    const then = result.then;
    if (typeof then === &#39;function&#39;) {
      then.call(
        result,
        nop,
        function(err) {
          errorOrDestroy(this, err);
        });
    }
  }
} catch (err) {
  errorOrDestroy(this, err);
}
로그인 후 복사

如果说用户实现的_read方法返回的是一个promise,则调用这个promise的then方法,将成功和失败的回调传入,便于处理异常情况。

read方法从缓冲区里读区数据的核心代码如下:

readable.read

function fromList(n, state) {
  // nothing buffered.
  if (state.length === 0)
    return null;
  let ret;
  if (state.objectMode)
    ret = state.buffer.shift();
  else if (!n || n >= state.length) { // 处理n为空或者大于缓冲区的长度的情况
    // Read it all, truncate the list.
    if (state.decoder) // 有解码器,则将结果序列化为字符串
      ret = state.buffer.join(&#39;&#39;);
    else if (state.buffer.length === 1) // 只有一个数据,返回头节点数据
      ret = state.buffer.first();
    else // 将所有数据存储到一个Buffer中
      ret = state.buffer.concat(state.length);
    state.buffer.clear(); // 清空缓冲区
  } else {
    // 处理读取长度小于缓冲区的情况
    ret = state.buffer.consume(n, state.decoder);
  }
  return ret;
}
로그인 후 복사

2.4.3. _read

用户初始化Readable stream时必须实现的方法,可以在这个方法里调用push方法,从而持续的触发read方法,当我们push null时可以停止流的写入操作。

示例代码:

readable._read

const Stream = require(&#39;stream&#39;);
const readableStream = new Stream.Readable({
  read(hwm) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 122) {
      this.push(null);
    }
  },
});
readableStream.currentCharCode = 97;
readableStream.pipe(process.stdout);
// abcdefghijklmnopqrstuvwxyz%
로그인 후 복사

2.4.4. pipe(重要)

将一个或多个writable流绑定到当前的Readable流上,并且将Readable流切换到流动模式。

这个方法里面有很多的事件监听句柄,这里不会一一介绍:

readable.pipe

Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this.\_readableState;
  state.pipes.push(dest); // 收集Writable流

  src.on(&#39;data&#39;, ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      pause();
    }
  }
  // Tell the dest that it&#39;s being piped to.
  dest.emit(&#39;pipe&#39;, src);
  // 启动流,如果流处于暂停模式
  if (dest.writableNeedDrain === true) {
    if (state.flowing) {
      pause();
    }
  } else if (!state.flowing) {
    src.resume();
  }
  return dest;
}
로그인 후 복사

pipe操作和Linux的管道操作符'|'非常相似,将左侧输出变为右侧输入,这个方法会将可写流收集起来进行维护,并且当可读流触发'data'事件。

有数据流出时,就会触发可写流的写入事件,从而做到数据传递,实现像管道一样的操作。并且会自动将处于暂停模式的可读流变为流动模式。

2.4.5. resume

使流从'暂停'模式切换到'流动'模式,如果设置了'readable'事件监听,那么这个方法其实是没有效果的

readable.resume

Readable.prototype.resume = function() {
  const state = this._readableState;
  if (!state.flowing) {
    state.flowing = !state.readableListening; // 是否处于流动模式取决于是否设置了&#39;readable&#39;监听句柄
    resume(this, state);
  }
};

function resume(stream, state) {
  if (!state.resumeScheduled) { // 开关,使resume_方法仅在同一个Tick中调用一次
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  if (!state.reading) {
    stream.read(0);
  }
  state.resumeScheduled = false;
  stream.emit(&#39;resume&#39;);
  flow(stream);
}

function flow(stream) { // 当流处于流模式该方法会不断的从buffer中读取数据,直到缓冲区为空
  const state = stream._readableState;
  while (state.flowing && stream.read() !== null); 
  // 因为这里会调用read方法,设置了&#39;readable&#39;事件监听器的stream,也有可能会调用read方法,
  //从而导致数据不连贯(不影响data,仅影响在&#39;readable&#39;事件回调中调用read方法读取数据)
}
로그인 후 복사

2.4.6. pause

将流从流动模式转变为暂停模式,停止触发'data'事件,将所有的数据保存到缓冲区

readable.pause

Readable.prototype.pause = function() {
  if (this._readableState.flowing !== false) {
    debug(&#39;pause&#39;);
    this._readableState.flowing = false;
    this.emit(&#39;pause&#39;);
  }
  return this;
};
로그인 후 복사

2.5. 使用方法与工作机制

使用方法在BufferList部分已经讲过了,创建一个Readable实例,并实现其_read()方法,或者在构造函数的第一个对象参数中实现read方法。

2.5.1. 工作机制

스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석

这里只画了大致的流程,以及Readable流的模式转换触发条件。

其中:

  • needReadable(true): 일시정지 모드 및 버퍼 데이터
  • push: 흐름 모드인 경우, 버퍼에 데이터가 없습니다. 데이터가 'data' 이벤트를 트리거합니다. 그렇지 않으면 데이터를 버퍼에 저장하고 needReadable 상태에 따라 '읽기 가능' 이벤트를 트리거합니다.
  • read: 길이가 0인 데이터를 읽을 때 버퍼의 데이터가 hwm에 도달했거나 오버플로가 '읽기 가능' 이벤트를 트리거해야 합니다. 버퍼에서 데이터를 읽고 '데이터' 이벤트를 트리거해야 합니다.
  • resume: '읽기 가능' 리스너가 있는 경우 그렇지 않으면 이 메서드는 효과가 없습니다. 스트림이 일시 정지 모드에서 흐름 모드로 변경되고 버퍼의 데이터가 지워집니다
  • 읽기 가능한 트리거 조건: '읽기 가능' 이벤트가 바인딩되고 버퍼에 데이터가 있습니다. 데이터를 푸시할 때 데이터가 있습니다. buffer, needReadable === true입니다. 길이가 0인 데이터를 읽을 때 버퍼의 데이터가 hwm 또는 Overflow에 도달했습니다

3. 요약

  • 메모리 문제와 시간 문제를 해결하기 위해 Node .js는 소비자가 소비할 수 있도록 데이터를 작은 조각으로 메모리에 읽어 들일 수 있도록 자체 스트림을 구현합니다. 스트림은 수십 년 전에 Unix 운영 체제에 도입된 고유한 개념이 아닙니다. 네 가지 유형의 스트림: 읽기 가능한 스트림, 쓰기 가능한 스트림, 읽기 및 쓰기 가능한 스트림, 변환 스트림 모두 EventEmiiter의 인스턴스 메서드와 정적 메서드를 상속하며 모두 EE
  • 의 인스턴스입니다. 스트림의 기본 컨테이너는 다음을 기반으로 합니다. BufferList는 시작과 끝에 다음 노드 참조를 가리키는 "포인터"가 있는 사용자 정의 연결 목록 구현입니다.
  • 읽기 가능한 스트림에는 두 가지 모드와 세 가지 상태가 있습니다.
  • 스트림을 기반으로 데이터의 연쇄 처리를 구현할 수 있으며, 다양한 스트림 처리 기능을 조합하여 다양한 대류 작업을 구현하고 이를 원하는 데이터로 변환할 수 있습니다.
  • 노드 관련 더 자세한 내용은 , 방문해주세요:
  • nodejs 튜토리얼
  • !

위 내용은 스트림이란 무엇입니까? Node.js에서 읽을 수 있는 스트림에 대한 심층 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

원천:juejin.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿