Home > Web Front-end > JS Tutorial > An article about readable streams in Node

An article about readable streams in Node

青灯夜游
Release: 2023-02-02 20:13:03
forward
1897 people have browsed it

This article will help you interpret the Node.js stream source code, gain an in-depth understanding of Node's readable streams, and take a look at its basic principles, usage and working mechanisms. I hope it will be helpful to everyone!

An article about readable streams in Node

1. Basic concepts

##1.1. Historical evolution of streams

Streams are not a concept unique to Nodejs. They were introduced decades ago in the Unix operating system, and programs can interact with each other on streams through the pipe operator (|).

The pipe operator (|) can be used in MacOS and Linux based on Unix systems. It can convert the output of the process on the left side of the operator into the input on the right side.

In Node, if we use the traditional readFile to read a file, the file will be read into the memory from beginning to end. Only after all the contents have been read will the file be loaded into the memory. File contents are processed uniformly.

There will be two disadvantages in doing this:

  • Memory aspect: taking up a lot of memory

  • Time aspect: need to wait for data The data will not be processed until the entire payload of There are three types of streams, they are all instances of EventEmitter in Node.js:

Readable Stream

  • Writable Stream (Writable Stream)

  • Readable and writable full-duplex stream (Duplex Stream)

  • Transform Stream (Transform Stream)

  • In order to study this part in depth and gradually understand the concept of streams in Node.js, and because the source code part is relatively complicated, I decided to start learning this part from the readable stream.

1.2. What is a Stream

A stream is an abstract data structure, a collection of data, in which the data stored The type can only be the following types (only for the case where objectMode === false): string

Buffer

  • We can think of the stream as These collections of data are just like liquids. We first save these liquids in a container (the internal buffer BufferList of the stream). When the corresponding event is triggered, we pour the liquid inside into the pipe and notify Others have their own containers on the other side of the pipe to catch the liquid inside for disposal.

1.3. What is a Readable Stream An article about readable streams in Node

A readable stream is a stream One type, it has two modes and three statesTwo reading modes:

Flow mode: data will be read from the underlying system and passed through EventEmitter Pass the data to the registered event handler as quickly as possible

  • Pause mode: In this mode, the data will not be read, and the Stream.read() method must be explicitly called To read data from the stream

  • Three states:

readableFlowing === null: No data will be generated, call Stream.pipe (), Stream.resume will change its status to true, start generating data and actively trigger events

  • readableFlowing === false: The flow of data will be suspended at this time, but it will not Pause the generation of data, so there will be a data backlog

  • readableFlowing === true: generate and consume data normally

  • 2. Basic principles

2.1. Internal state definition (ReadableState)

ReadableState

_readableState: ReadableState {
  objectMode: false, // 操作除了string、Buffer、null之外的其他类型的数据需要把这个模式打开
  highWaterMark: 16384, // 水位限制,1024 \* 16,默认16kb,超过这个限制则会停止调用\_read()读数据到buffer中
  buffer: BufferList { head: null, tail: null, length: 0 }, // Buffer链表,用于保存数据
  length: 0, // 整个可读流数据的大小,如果是objectMode则与buffer.length相等
  pipes: [], // 保存监听了该可读流的所有管道队列
  flowing: null, // 可独流的状态 null、false、true
  ended: false, // 所有数据消费完毕
  endEmitted: false, // 结束事件收否已发送
  reading: false, // 是否正在读取数据
  constructed: true, // 流在构造好之前或者失败之前,不能被销毁
  sync: true, // 是否同步触发'readable'/'data'事件,或是等到下一个tick
  needReadable: false, // 是否需要发送readable事件
  emittedReadable: false, // readable事件发送完毕
  readableListening: false, // 是否有readable监听事件
  resumeScheduled: false, // 是否调用过resume方法
  errorEmitted: false, // 错误事件已发送
  emitClose: true, // 流销毁时,是否发送close事件
  autoDestroy: true, // 自动销毁,在'end'事件触发后被调用
  destroyed: false, // 流是否已经被销毁
  errored: null, // 标识流是否报错
  closed: false, // 流是否已经关闭
  closeEmitted: false, // close事件是否已发送
  defaultEncoding: 'utf8', // 默认字符编码格式
  awaitDrainWriters: null, // 指向监听了&#39;drain&#39;事件的writer引用,类型为null、Writable、Set<Writable>
  multiAwaitDrain: false, // 是否有多个writer等待drain事件 
  readingMore: false, // 是否可以读取更多数据
  dataEmitted: false, // 数据已发送
  decoder: null, // 解码器
  encoding: null, // 编码器
  [Symbol(kPaused)]: null
},
Copy after login

2.2. Internal data storage implementation (BufferList)

BufferList is a container used to store internal data in a stream. It is designed in the form of a linked list. There are three attributes head, tail and length. I represent each node in the BufferList as a BufferNode, and the type of Data inside depends on the objectMode.

This data structure obtains header data faster than Array.prototype.shift().

2.2.1. Data storage typeAn article about readable streams in Node

If objectMode === true: Then data can be of any type, and whatever data is pushed will be stored.

objectMode=true

const Stream = require(&#39;stream&#39;);
const readableStream = new Stream.Readable({
  objectMode: true,
  read() {},
});

readableStream.push({ name: &#39;lisa&#39;});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(true);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(&#39;lisa&#39;);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(666);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(() => {});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(Symbol(1));
console.log(readableStream._readableState.buffer.tail);
readableStream.push(BigInt(123));
console.log(readableStream._readableState.buffer.tail);
Copy after login

Run result:

If objectMode === false:

Then data can only be string or Buffer or Uint8ArrayAn article about readable streams in Node

objectMode=false

const Stream = require(&#39;stream&#39;);

const readableStream = new Stream.Readable({

  objectMode: false,

  read() {},

});

readableStream.push({ name: &#39;lisa&#39;});
Copy after login

Running results:

An article about readable streams in Node

2.2.2. 数据存储结构

我们在控制台通过node命令行创建一个可读流,来观察buffer中数据的变化:

164638400399706An article about readable streams in Node

当然在push数据之前我们需要实现他的_read方法,或者在构造函数的参数中实现read方法:

const Stream = require(&#39;stream&#39;);

const readableStream = new Stream.Readable();

RS._read = function(size) {}
Copy after login

或者

const Stream = require(&#39;stream&#39;);

const readableStream = new Stream.Readable({
  
  read(size) {}

});
Copy after login

经过readableStream.push('abc')操作之后,当前的buffer为:

An article about readable streams in Node

可以看到目前的数据存储了,头尾存储的数据都是字符串'abc'的ascii码,类型为Buffer类型,length表示当前保存的数据的条数而非数据内容的大小。

2.2.3. 相关API

打印一下BufferList的所有方法可以得到:

An article about readable streams in Node

除了join是将BufferList序列化为字符串之外,其他都是对数据的存取操作。

这里就不一一讲解所有的方法了,重点讲一下其中的consume 、_getString和_getBuffer。

2.2.3.1. consume

源码地址:BufferList.consume
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
Copy after login

comsume

// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
  const data = this.head.data;
  if (n < data.length) {
    // `slice` is the same for buffers and strings.
    const slice = data.slice(0, n);
    this.head.data = data.slice(n);
    return slice;
  }
  if (n === data.length) {
    // First chunk is a perfect match.
    return this.shift();
  }
  // Result spans more than one buffer.
  return hasStrings ? this.\_getString(n) : this.\_getBuffer(n);
}
Copy after login

代码一共有三个判断条件:

  • 如果所消耗的数据的字节长度小于链表头节点存储数据的长度,则将头节点的数据取前n字节,并把当前头节点的数据设置为切片之后的数据

  • 如果所消耗的数据恰好等于链表头节点所存储的数据的长度,则直接返回当前头节点的数据

An article about readable streams in Node

  • 如果所消耗的数据的长度大于链表头节点的长度,那么会根据传入的第二个参数进行最后一次判断,判断当前的BufferList底层存储的是string还是Buffer

2.2.3.2. _getBuffer

源码地址:BufferList._getBuffer
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
Copy after login

comsume

// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
  const ret = Buffer.allocUnsafe(n);
  const retLen = n;
  let p = this.head;
  let c = 0;
  do {
    const buf = p.data;
    if (n > buf.length) {
      TypedArrayPrototypeSet(ret, buf, retLen - n);
      n -= buf.length;
    } else {
      if (n === buf.length) {
        TypedArrayPrototypeSet(ret, buf, retLen - n);
        ++c;
        if (p.next)
          this.head = p.next;
        else
          this.head = this.tail = null;
      } else {
       TypedArrayPrototypeSet(ret,
                              new Uint8Array(buf.buffer, buf.byteOffset, n),
                              retLen - n);
        this.head = p;
        p.data = buf.slice(n);
      }
      break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}
Copy after login

总的来说就是循环对链表中的节点进行操作,新建一个Buffer数组用于存储返回的数据。

首先从链表的头节点开始取数据,不断的复制到新建的Buffer中,直到某一个节点的数据大于等于要取的长度减去已经取得的长度。

或者说读到链表的最后一个节点后,都还没有达到要取的长度,那么就返回这个新建的Buffer。

2.2.3.3. _getString

源码地址:BufferList._getString
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
Copy after login

comsume

// Consumes a specified amount of characters from the buffered data.
_getString(n) {
  let ret = &#39;&#39;;
  let p = this.head;
  let c = 0;
  do {
    const str = p.data;
    if (n > str.length) {
    ret += str;
    n -= str.length;
  } else {
    if (n === str.length) {
      ret += str;
      ++c;
      if (p.next)
        this.head = p.next;
      else
        this.head = this.tail = null;
    } else {
      ret += StringPrototypeSlice(str, 0, n);
      this.head = p;
      p.data = StringPrototypeSlice(str, n);
    }
    break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}
Copy after login

对于操作字符串来说和操作Buffer是一样的,也是循环从链表的头部开始读数据,只是进行数据的拷贝存储方面有些差异,还有就是_getString操作返回的数据类型是string类型。

2.3. 为什么可读流是EventEmitter的实例?

对于这个问题而言,首先要了解什么是发布订阅模式,发布订阅模式在大多数API中都有重要的应用,无论是Promise还是Redux,基于发布订阅模式实现的高级API随处可见。

它的优点在于能将事件的相关回调函数存储到队列中,然后在将来的某个时刻通知到对方去处理数据,从而做到关注点分离,生产者只管生产数据和通知消费者,而消费者则只管处理对应的事件及其对应的数据,而Node.js流模式刚好符合这一特点。

那么Node.js流是怎样实现基于EventEmitter创建实例的呢?

这部分源码在这儿:stream/legacy
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
Copy after login

legacy

function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);
Copy after login

然后在可读流的源码中有这么几行代码:

这部分源码在这儿:readable
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
Copy after login

legacy

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
Copy after login

首先将Stream的原型对象继承自EventEmitter,这样Stream的所有实例都可以访问到EventEmitter上的方法。

同时通过ObjectSetPrototypeOf(Stream, EE)将EventEmitter上的静态方法也继承过来,并在Stream的构造函数中,借用构造函数EE来实现所有EventEmitter中的属性的继承,然后在可读流里,用同样的的方法实现对Stream类的原型继承和静态属性继承,从而得到:

Readable.prototype.__proto__ === Stream.prototype;

Stream.prototype.__proto__ === EE.prototype

因此:

Readable.prototype.__proto__.__proto__ === EE.prototype

所以捋着可读流的原型链可以找到EventEmitter的原型,实现对EventEmitter的继承

2.4. 相关API的实现

这里会按照源码文档中API的出现顺序来展示,且仅解读其中的核心API实现。

注:此处仅解读Node.js可读流源码中所声明的函数,不包含外部引入的函数定义,同时为了减少篇幅,不会将所有代码都拷贝下来。

Readable.prototype

Stream {
  destroy: [Function: destroy],
  _undestroy: [Function: undestroy],
  _destroy: [Function (anonymous)],
  push: [Function (anonymous)],
  unshift: [Function (anonymous)],
  isPaused: [Function (anonymous)],
  setEncoding: [Function (anonymous)],
  read: [Function (anonymous)],
  _read: [Function (anonymous)],
  pipe: [Function (anonymous)],
  unpipe: [Function (anonymous)],
  on: [Function (anonymous)],
  addListener: [Function (anonymous)],
  removeListener: [Function (anonymous)],
  off: [Function (anonymous)],
  removeAllListeners: [Function (anonymous)],
  resume: [Function (anonymous)],
  pause: [Function (anonymous)],
  wrap: [Function (anonymous)],
  iterator: [Function (anonymous)],
  [Symbol(nodejs.rejection)]: [Function (anonymous)],
  [Symbol(Symbol.asyncIterator)]: [Function (anonymous)]
}
Copy after login

2.4.1. push

readable.push

Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};
Copy after login

push方法的主要作用就是将数据块通过触发'data'事件传递给下游管道,或者将数据存储到自身的缓冲区中。

以下代码为相关伪代码,仅展示主流程:

readable.push

function readableAddChunk(stream, chunk, encoding, addToFront) {
  const state = stream.\_readableState;
  if (chunk === null) { // push null 流结束信号,之后不能再写入数据
    state.reading = false;
    onEofChunk(stream, state);
  } else if (!state.objectMode) { // 如果不是对象模式
    if (typeof chunk === &#39;string&#39;) {
      chunk = Buffer.from(chunk);
    } else if (chunk instanceof Buffer) { //如果是Buffer
    // 处理一下编码
    } else if (Stream.\_isUint8Array(chunk)) {
      chunk = Stream.\_uint8ArrayToBuffer(chunk);
    } else if (chunk != null) {
      err = new ERR\_INVALID\_ARG\_TYPE(&#39;chunk&#39;, [&#39;string&#39;, &#39;Buffer&#39;, &#39;Uint8Array&#39;], chunk);
    }
  }

  if (state.objectMode || (chunk && chunk.length > 0)) { // 是对象模式或者chunk是Buffer
    // 这里省略几种数据的插入方式的判断
    addChunk(stream, state, chunk, true);
  }
}

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync &&
    stream.listenerCount(&#39;data&#39;) > 0) { // 如果处于流动模式,有监听data的订阅者
      stream.emit(&#39;data&#39;, chunk);
  } else { // 否则保存数据到缓冲区中
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) {
      state.buffer.unshift(chunk);
    } else {
      state.buffer.push(chunk);
    }
  }
  maybeReadMore(stream, state); // 尝试多读一点数据
}
Copy after login

push操作主要分为对objectMode的判断,不同的类型对传入的数据会做不同的操作:

  • objectMode === false: 将数据(chunk)转换成Buffer
  • objectMode === true: 将数据原封不动的传递给下游

其中addChunk的第一个判断主要是处理Readable处于流动模式、有data监听器、并且缓冲区数据为空时的情况。

这时主要将数据passthrough透传给订阅了data事件的其他程序,否则就将数据保存到缓冲区里面。

2.4.2. read

除去对边界条件的判断、流状态的判断,这个方法主要有两个操作

  • 调用用户实现的_read方法,对执行结果进行处理

  • 从缓冲区buffer中读取数据,并触发'data'事件

readable.read

// 如果read的长度大于hwm,则会重新计算hwm
if (n > state.highWaterMark) {
  state.highWaterMark = computeNewHighWaterMark(n);  
}
// 调用用户实现的\_read方法
try {
  const result = this.\_read(state.highWaterMark);
  if (result != null) {
    const then = result.then;
    if (typeof then === &#39;function&#39;) {
      then.call(
        result,
        nop,
        function(err) {
          errorOrDestroy(this, err);
        });
    }
  }
} catch (err) {
  errorOrDestroy(this, err);
}
Copy after login

如果说用户实现的_read方法返回的是一个promise,则调用这个promise的then方法,将成功和失败的回调传入,便于处理异常情况。

read方法从缓冲区里读区数据的核心代码如下:

readable.read

function fromList(n, state) {
  // nothing buffered.
  if (state.length === 0)
    return null;
  let ret;
  if (state.objectMode)
    ret = state.buffer.shift();
  else if (!n || n >= state.length) { // 处理n为空或者大于缓冲区的长度的情况
    // Read it all, truncate the list.
    if (state.decoder) // 有解码器,则将结果序列化为字符串
      ret = state.buffer.join(&#39;&#39;);
    else if (state.buffer.length === 1) // 只有一个数据,返回头节点数据
      ret = state.buffer.first();
    else // 将所有数据存储到一个Buffer中
      ret = state.buffer.concat(state.length);
    state.buffer.clear(); // 清空缓冲区
  } else {
    // 处理读取长度小于缓冲区的情况
    ret = state.buffer.consume(n, state.decoder);
  }
  return ret;
}
Copy after login

2.4.3. _read

用户初始化Readable stream时必须实现的方法,可以在这个方法里调用push方法,从而持续的触发read方法,当我们push null时可以停止流的写入操作。

示例代码:

readable._read

const Stream = require(&#39;stream&#39;);
const readableStream = new Stream.Readable({
  read(hwm) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 122) {
      this.push(null);
    }
  },
});
readableStream.currentCharCode = 97;
readableStream.pipe(process.stdout);
// abcdefghijklmnopqrstuvwxyz%
Copy after login

2.4.4. pipe(重要)

将一个或多个writable流绑定到当前的Readable流上,并且将Readable流切换到流动模式。

这个方法里面有很多的事件监听句柄,这里不会一一介绍:

readable.pipe

Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this.\_readableState;
  state.pipes.push(dest); // 收集Writable流

  src.on(&#39;data&#39;, ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      pause();
    }
  }
  // Tell the dest that it&#39;s being piped to.
  dest.emit(&#39;pipe&#39;, src);
  // 启动流,如果流处于暂停模式
  if (dest.writableNeedDrain === true) {
    if (state.flowing) {
      pause();
    }
  } else if (!state.flowing) {
    src.resume();
  }
  return dest;
}
Copy after login

pipe操作和Linux的管道操作符'|'非常相似,将左侧输出变为右侧输入,这个方法会将可写流收集起来进行维护,并且当可读流触发'data'事件。

有数据流出时,就会触发可写流的写入事件,从而做到数据传递,实现像管道一样的操作。并且会自动将处于暂停模式的可读流变为流动模式。

2.4.5. resume

使流从'暂停'模式切换到'流动'模式,如果设置了'readable'事件监听,那么这个方法其实是没有效果的

readable.resume

Readable.prototype.resume = function() {
  const state = this._readableState;
  if (!state.flowing) {
    state.flowing = !state.readableListening; // 是否处于流动模式取决于是否设置了&#39;readable&#39;监听句柄
    resume(this, state);
  }
};

function resume(stream, state) {
  if (!state.resumeScheduled) { // 开关,使resume_方法仅在同一个Tick中调用一次
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  if (!state.reading) {
    stream.read(0);
  }
  state.resumeScheduled = false;
  stream.emit(&#39;resume&#39;);
  flow(stream);
}

function flow(stream) { // 当流处于流模式该方法会不断的从buffer中读取数据,直到缓冲区为空
  const state = stream._readableState;
  while (state.flowing && stream.read() !== null); 
  // 因为这里会调用read方法,设置了&#39;readable&#39;事件监听器的stream,也有可能会调用read方法,
  //从而导致数据不连贯(不影响data,仅影响在&#39;readable&#39;事件回调中调用read方法读取数据)
}
Copy after login

2.4.6. pause

将流从流动模式转变为暂停模式,停止触发'data'事件,将所有的数据保存到缓冲区

readable.pause

Readable.prototype.pause = function() {
  if (this._readableState.flowing !== false) {
    debug(&#39;pause&#39;);
    this._readableState.flowing = false;
    this.emit(&#39;pause&#39;);
  }
  return this;
};
Copy after login

2.5. 使用方法与工作机制

使用方法在BufferList部分已经讲过了,创建一个Readable实例,并实现其_read()方法,或者在构造函数的第一个对象参数中实现read方法。

2.5.1. 工作机制

An article about readable streams in Node

Only the general process is drawn here, as well as the mode conversion triggering conditions of the Readable stream.

Among them:

  • needReadable(true): pause mode and buffer data
  • push: If in flow mode, there is no data in the buffer and the 'data' event will be triggered; otherwise, the data will be saved to the buffer and the 'readable' event will be triggered according to the needReadable status
  • read: When reading data with length=0, the data in the buffer has reached hwm or overflowed and needs to trigger the 'readable' event; read the data from the buffer and trigger the 'data' event
  • resume: There is a 'readable' listener , this method has no effect; otherwise, the stream will be changed from pause mode to flow mode, and the data in the buffer will be cleared
  • readable trigger condition: the 'readable' event is bound and there is data in the buffer, push When there is data in the buffer, and needReadable === true, and when reading data with length=0, the data in the buffer has reached hwm or overflowed

3. Summary

  • In order to solve the memory problem and time problem, Node.js implements its own stream, so that the data can be read into the memory in small pieces for consumers to consume
  • Streams are not a concept unique to Node.js. They were introduced in the Unix operating system decades ago.
  • There are four types of streams: readable stream, writable stream, readable and writable stream , conversion stream, they all inherit the instance methods and static methods of EventEmiiter, and are all instances of EE
  • The underlying container of the stream is based on BufferList, which is a customized linked list implementation. The head and tail are A "pointer" points to the next node reference
  • The readable stream has two modes and three states. In the flow mode, the data will be sent to the consumer through EventEmitter
  • Based on the stream we can achieve Chain processing of data, and different stream processing functions can be assembled to implement various operations on the stream and convert it into the data we want

For more node-related knowledge, please visit:nodejs tutorial!

The above is the detailed content of An article about readable streams in Node. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:juejin.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template