Home > Web Front-end > JS Tutorial > In-depth understanding of the four major flows in Node.js and solving the 'back pressure' problem

In-depth understanding of the four major flows in Node.js and solving the 'back pressure' problem

Release: 2021-09-13 18:28:30
forward
2919 people have browsed it

Node.jsWhat are the four streams? What is a back pressure problem? The following article will take you to understand the four major streams in Node.js, and introduce the backpressure problem and solutions. I hope it will be helpful to you!

In-depth understanding of the four major flows in Node.js and solving the 'back pressure' problem

How to move something from A to B?

Just lift it up, move it to the destination, and put it down.

What if this thing weighs a ton?

Then move it part by part.

In fact, IO means moving things, including network IO and file IO. If the amount of data is small, then all the content can be transferred directly. But if there is a lot of content, loading it into the memory at one time will crash, and The speed is also slow, so you can process it part by part. This is the idea of ​​flow.

Various languages ​​have basically implemented the stream api, and Node.js has also implemented it. The stream api is relatively commonly used. Let’s explore stream below.

This article will answer the following questions:

  • What are the four streams of Node.js

  • How does the generator interact with Readable Stream combines the pause and flow of

  • stream

  • What is the back pressure problem and how to solve it

[Recommended learning: "nodejs Tutorial"]

The 4 kinds of streams of Node.js

Intuitive flow Feel

Flow from one place to another. Obviously there is an outflow side and an inflow side. The outflow side is a readable stream (readable), and the inflow side is writable. Stream(writable).

In-depth understanding of the four major flows in Node.js and solving the back pressure problem

Of course, there are also streams that can both flow in and out. This is called a duplex stream

In-depth understanding of the four major flows in Node.js and solving the back pressure problem

Since it can flow in and out, can we down-convert the incoming content and then flow it out? This kind of flow is called transformation flow (transform)

In-depth understanding of the four major flows in Node.js and solving the back pressure problem

duplex flow The inflow and outflow content do not need to be related, but the inflow and outflow of the transform flow are related. This is the difference between the two.

Stream api

The streams provided by Node.js are the four types introduced above:

const stream = require('stream');

// 可读流
const Readable = stream.Readable;
// 可写流
const Writable = stream.Writable;
// 双工流
const Duplex = stream.Duplex;
// 转换流
const Transform = stream.Transform;
Copy after login

They are all There are methods to be implemented:

  • Readable needs to implement the _read method to return the content

  • Writable needs to implement the _write method to accept the content

  • Duplex needs to implement the _read and _write methods to accept and return content

  • Transform needs to implement the _transform method to convert the received content and return it

Let’s take a look at them separately:

Readable

Readable To implement the _read method, return specific data through push .

const Stream = require('stream');
const readableStream = Stream.Readable();
readableStream._read = function() {
    this.push('阿门阿前一棵葡萄树,');
    this.push('阿东阿东绿的刚发芽,');
    this.push('阿东背着那重重的的壳呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}
readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});
Copy after login

When pushing a null, it means the end of the stream.

The execution effect is as follows:

In-depth understanding of the four major flows in Node.js and solving the back pressure problem

Creating Readable can also be done through inheritance:

const Stream = require('stream');

class ReadableDong extends Stream.Readable {
    constructor() {
        super();
    }
    _read() {
        this.push('阿门阿前一棵葡萄树,');
        this.push('阿东阿东绿的刚发芽,');
        this.push('阿东背着那重重的的壳呀,');
        this.push('一步一步地往上爬。')
        this.push(null);
    }
}

const readableStream = new ReadableDong();

readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});
Copy after login

The readable stream generates content, then It is natural to combine it with the generator:

const Stream = require('stream');

class ReadableDong extends Stream.Readable {
    constructor(iterator) {
        super();
        this.iterator = iterator;
    }
    _read() {
        const next = this.iterator.next();
        if(next.done) {
            return this.push(null);
        } else {
            this.push(next.value)
        }
    }
}
function *songGenerator() {
    yield '阿门阿前一棵葡萄树,';
    yield '阿东阿东绿的刚发芽,';
    yield '阿东背着那重重的的壳呀,';
    yield '一步一步地往上爬。';
}
const songIterator = songGenerator();
const readableStream = new ReadableDong(songIterator);
readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});
Copy after login

This is a readable stream, and the content is returned by implementing the _read method.

Writable

Writable implements the _write method to receive the written content.

const Stream = require('stream');
const writableStream = Stream.Writable();
writableStream._write = function (data, enc, next) {
   console.log(data.toString());
   // 每秒写一次
   setTimeout(() => {
       next();
   }, 1000);
}
writableStream.on('finish', () => console.log('done~'));
writableStream.write('阿门阿前一棵葡萄树,');
writableStream.write('阿东阿东绿的刚发芽,');
writableStream.write('阿东背着那重重的的壳呀,');
writableStream.write('一步一步地往上爬。');
writableStream.end();
Copy after login

Receive the written content, print it out, and call next to process the next written content. The next call here is asynchronous and the frequency can be controlled.

After running for a while, the written content can indeed be processed normally:

In-depth understanding of the four major flows in Node.js and solving the back pressure problem

This is a writable stream, and the write is processed by implementing the _write method Content.

Duplex

Duplex is readable and writable, just implement _read and _write at the same time

const Stream = require('stream');
var duplexStream = Stream.Duplex();
duplexStream._read = function () {
    this.push('阿门阿前一棵葡萄树,');
    this.push('阿东阿东绿的刚发芽,');
    this.push('阿东背着那重重的的壳呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}
duplexStream._write = function (data, enc, next) {
    console.log(data.toString());
    next();
}
duplexStream.on('data', data => console.log(data.toString()));
duplexStream.on('end', data => console.log('read done~'));
duplexStream.write('阿门阿前一棵葡萄树,');
duplexStream.write('阿东阿东绿的刚发芽,');
duplexStream.write('阿东背着那重重的的壳呀,');
duplexStream.write('一步一步地往上爬。');
duplexStream.end();
duplexStream.on('finish', data => console.log('write done~'));
Copy after login

Integrates Readable stream and Writable stream Function, this is Duplex.

In-depth understanding of the four major flows in Node.js and solving the back pressure problem

Transform

Although the Duplex stream can be read and written, there is no connection between the two, and sometimes it is necessary After converting the incoming content, it flows out. At this time, you need to convert the stream Transform.

Transform stream needs to implement the _transform api. We implement the conversion stream that reverses the content:

const Stream = require('stream');
class TransformReverse extends Stream.Transform {
  constructor() {
    super()
  }
  _transform(buf, enc, next) {
    const res = buf.toString().split('').reverse().join('');
    this.push(res)
    next()
  }
}
var transformStream = new TransformReverse();
transformStream.on('data', data => console.log(data.toString()))
transformStream.on('end', data => console.log('read done~'));
transformStream.write('阿门阿前一棵葡萄树');
transformStream.write('阿东阿东绿的刚发芽');
transformStream.write('阿东背着那重重的的壳呀');
transformStream.write('一步一步地往上爬');
transformStream.end()
transformStream.on('finish', data => console.log('write done~'));
Copy after login

After running it, the effect is as follows:

In-depth understanding of the four major flows in Node.js and solving the back pressure problem

流的暂停和流动

我们从 Readable 流中获取内容,然后流入 Writable 流,两边分别做 _read 和 _write 的实现,就实现了流动。

In-depth understanding of the four major flows in Node.js and solving the back pressure problem

背压

但是 read 和 write 都是异步的,如果两者速率不一致呢?

如果 Readable 读入数据的速率大于 Writable 写入速度的速率,这样就会积累一些数据在缓冲区,如果缓冲的数据过多,就会爆掉,会丢失数据。

而如果 Readable 读入数据的速率小于 Writable 写入速度的速率呢?那没关系,最多就是中间有段空闲时期。

这种读入速率大于写入速率的现象叫做“背压”,或者“负压”。也很好理解,写入段压力比较大,写不进去了,会爆缓冲区,导致数据丢失。

这个缓冲区大小可以通过 readableHighWaterMark 和 writableHightWaterMark 来查看,是 16k。

In-depth understanding of the four major flows in Node.js and solving the back pressure problem

解决背压

怎么解决这种读写速率不一致的问题呢?

当没写完的时候,暂停读就行了。这样就不会读入的数据越来越多,驻留在缓冲区。

readable stream 有个 readableFlowing 的属性,代表是否自动读入数据,默认为 true,也就是自动读入数据,然后监听 data 事件就可以拿到了。

当 readableFlowing 设置为 false 就不会自动读了,需要手动通过 read 来读入。

readableStream.readableFlowing = false;

let data;
while((data = readableStream.read()) != null) {
    console.log(data.toString());
}
Copy after login

但自己手动 read 比较麻烦,我们依然可以用自动流入的方式,调用 pause 和 resume 来暂停和恢复就行了。

当调用 writable stream 的 write 方法的时候会返回一个 boolean 值代表是写入了目标还是放在了缓冲区:

  • true: 数据已经写入目标

  • false:目标不可写入,暂时放在缓冲区

我们可以判断返回 false 的时候就 pause,然后等缓冲区清空了就 resume:

const rs = fs.createReadStream(src);
const ws = fs.createWriteStream(dst);
rs.on('data', function (chunk) {
    if (ws.write(chunk) === false) {
        rs.pause();
    }
});
rs.on('end', function () {
    ws.end();
});
ws.on('drain', function () {
    rs.resume();
});
Copy after login

这样就能达到根据写入速率暂停和恢复读入速率的功能,解决了背压问题。

pipe 有背压问题么?

平时我们经常会用 pipe 来直接把 Readable 流对接到 Writable 流,但是好像也没遇到过背压问题,其实是 pipe 内部已经做了读入速率的动态调节了。

const rs = fs.createReadStream(src);
const ws = fs.createWriteStream(dst);

rs.pipe(ws);
Copy after login

总结

流是传输数据时常见的思想,就是一部分一部分的传输内容,是文件读写、网络通信的基础概念。

Node.js 也提供了 stream 的 api,包括 Readable 可读流、Writable 可写流、Duplex 双工流、Transform 转换流。它们分别实现 _read、_write、_read + _write、_transform 方法,来做数据的返回和处理。

创建 Readable 对象既可以直接调用 Readable api 创建,然后重写 _read 方法,也可以继承 Readable 实现一个子类,之后实例化。其他流同理。(Readable 可以很容易的和 generator 结合)

当读入的速率大于写入速率的时候就会出现“背压”现象,会爆缓冲区导致数据丢失,解决的方式是根据 write 的速率来动态 pause 和 resume 可读流的速率。pipe 就没有这个问题,因为内部做了处理。

流是掌握 IO 绕不过去的一个概念,而背压问题也是流很常见的问题,遇到了数据丢失可以考虑是否发生了背压。希望这篇文章能够帮大家理清思路,真正掌握 stream!

更多编程相关知识,请访问:编程视频!!

The above is the detailed content of In-depth understanding of the four major flows in Node.js and solving the 'back pressure' problem. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:微信公众号- 神光的编程秘籍
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Latest Issues
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template