What is a stream? How do we understand it? This article will give you an in-depth understanding of streams (Stream) in Node.js. I hope it helps you!
I have often used the `pipe` function in development recently. I only knew that it was a "stream pipe", but not how it worked, so I decided to find out by starting from streams themselves, and to compile the knowledge and source code I have read into an article to share with everyone.
Stream is a very basic concept in Node.js. Many core modules are implemented based on streams, and streams play a very important role. At the same time, streams are a very difficult concept to understand, mainly due to the lack of relevant documentation; Node.js beginners often need a lot of time before they truly master them. Fortunately, for most Node.js users who only develop web applications, an insufficient understanding of streams does not affect their work. However, understanding streams leads to a better understanding of other Node.js modules, and in some cases processing data with streams gives better results.
For users of streams, you can think of a stream as an array: we only need to focus on reading (consuming) from it and writing (producing) to it.
For stream developers (those who use the stream module to create new instances), the focus is on how to implement the stream's methods. They usually care about two things: what the target resource is and how to operate it. Once the target resource is determined, it must be operated on according to the stream's different states and events.
All streams in Node.js have buffer pools. The purpose of the buffer pool is to increase the efficiency of the stream: when both the production and the consumption of data take time, we can produce data in advance and store it in the buffer pool before the next consumption. However, the buffer pool is not always in use; for example, when the buffer pool is empty, data will not be put into the buffer pool after production but will be consumed directly.
If data is produced faster than it is consumed, the excess data will wait somewhere. If data is produced slower than it is consumed, the data will accumulate to a certain amount somewhere before being consumed. (Developers cannot control the speed of production and consumption; they can only try to produce or consume data at the right time.)
The place where data waits, accumulates, and is then consumed is the buffer pool. The buffer pool is usually located in the computer's RAM (memory).
To give a common example of buffering: when we watch an online video, if your network speed is very fast, the buffer is always filled immediately, its contents are sent to the system for playback, and the next segment is buffered right away, so there is no lag while watching. If the network speed is very slow, you will see a loading indicator, which means the buffer is being filled; when filling is complete, the data is sent to the system and you can see the video.
The buffer pool of a Node.js stream is a linked list of Buffers. Every time data is added to the buffer pool, a Buffer node is re-created and inserted at the end of the linked list.
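To make this structure concrete, here is a minimal sketch of a buffer pool as a linked list of Buffer nodes. This is a simplified illustration, not Node.js's actual internal implementation; the `BufferPool` class and its methods are hypothetical names for demonstration.

```js
// Simplified illustration of a buffer pool as a linked list of Buffer nodes.
// This is NOT Node.js's internal implementation; names are hypothetical.
class BufferPool {
  constructor() {
    this.head = null // oldest node
    this.tail = null // newest node
    this.length = 0  // total bytes stored
  }

  // Producing: wrap the chunk in a new node and append it to the tail
  push(chunk) {
    const node = { data: Buffer.from(chunk), next: null }
    if (this.tail) this.tail.next = node
    else this.head = node
    this.tail = node
    this.length += node.data.length
  }

  // Consuming: remove and return the node at the head
  shift() {
    if (!this.head) return null
    const { data } = this.head
    this.head = this.head.next
    if (!this.head) this.tail = null
    this.length -= data.length
    return data
  }
}

const pool = new BufferPool()
pool.push('hello')
pool.push('world')
console.log(pool.shift().toString()) // hello
console.log(pool.length)             // 5 (the bytes of 'world' still buffered)
```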
Stream in Node.js is an abstract interface that extends EventEmitter, so I will briefly introduce EventEmitter first.
EventEmitter is a class that implements event publish/subscribe. Its commonly used methods (`on`, `once`, `off`, `emit`) should be familiar to everyone, so I will not introduce them one by one.
```js
const { EventEmitter } = require('events')
const eventEmitter = new EventEmitter()

// Bind a handler to the eventA event
eventEmitter.on('eventA', () => {
  console.log('eventA active 1')
})

// Bind a handler to the eventB event
eventEmitter.on('eventB', () => {
  console.log('eventB active 1')
})

eventEmitter.once('eventA', () => {
  console.log('eventA active 2')
})

// Trigger eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2
```
It is worth noting that EventEmitter has two built-in events called `newListener` and `removeListener`. After any listener function is added to an event object, `newListener` is triggered (via `eventEmitter.emit('newListener')`). When a handler function is removed, `removeListener` is triggered in the same way.

It should also be noted that a handler bound with `once` will only be executed once; `removeListener` is triggered before its execution, which means a listener bound with `once` is removed first and then triggered.
```js
const { EventEmitter } = require('events')
const eventEmitter = new EventEmitter()

eventEmitter.on('newListener', (event, listener) => {
  console.log('newListener', event, listener)
})
eventEmitter.on('removeListener', (event, listener) => {
  console.log('removeListener', event, listener)
})
// newListener removeListener [Function (anonymous)]

eventEmitter.on('eventA', () => {
  console.log('eventA active 1')
})
// newListener eventA [Function (anonymous)]

function listenerB() {
  console.log('eventB active 1')
}
eventEmitter.on('eventB', listenerB)
// newListener eventB [Function: listenerB]

eventEmitter.once('eventA', () => {
  console.log('eventA active 2')
})
// newListener eventA [Function (anonymous)]

eventEmitter.emit('eventA')
// eventA active 1
// removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] }
// eventA active 2

eventEmitter.off('eventB', listenerB)
// removeListener eventB [Function: listenerB]
```
But this is not important for our subsequent content.
Stream is the abstract interface for processing streaming data in Node.js. Stream is not an actual interface but a general term for all streams. The actual interfaces are ReadableStream, WritableStream, and ReadWriteStream.
```ts
interface ReadableStream extends EventEmitter {
  readable: boolean;
  read(size?: number): string | Buffer;
  setEncoding(encoding: BufferEncoding): this;
  pause(): this;
  resume(): this;
  isPaused(): boolean;
  pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
  unpipe(destination?: WritableStream): this;
  unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
  wrap(oldStream: ReadableStream): this;
  [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
  writable: boolean;
  write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
  write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
  end(cb?: () => void): this;
  end(data: string | Uint8Array, cb?: () => void): this;
  end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }
```
As you can see, ReadableStream and WritableStream are both interfaces that extend the EventEmitter class (interfaces in TypeScript can extend classes, because doing so only merges types).
The implementation classes corresponding to the above interfaces are Readable, Writable, and Duplex.

There are 4 types of streams in Node.js:

- Readable (readable stream)
- Writable (writable stream)
- Duplex (duplex stream, both readable and writable)
- Transform (transform stream, a Duplex that modifies or transforms data as it is written and read)
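To make the four types concrete, here is a minimal runnable sketch using the built-in `stream` module (Duplex is left out of the pipeline since Transform is itself a special kind of Duplex):

```js
const { Readable, Writable, Transform } = require('stream')

// A readable stream that produces two chunks and then ends
const readable = Readable.from(['hello', 'world'])

// A transform stream that upper-cases whatever passes through it
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

// A writable stream that just logs whatever it receives
const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log('received:', chunk.toString())
    callback()
  }
})

readable.pipe(upperCase).pipe(writable)
// received: HELLO
// received: WORLD
```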
The speed at which a disk writes data is much lower than that of memory. We can imagine a "pipe" between memory and disk, where "pipe" implies "flow": data in memory flows into the pipe very quickly, and when the pipe is full, data back pressure is generated in memory; the data backs up in memory and occupies resources.
The solution of Node.js streams is to set a float value, the `highWaterMark`, for each stream's buffer pool (that is, the write queue). When the amount of data in it reaches this float value, pushing data into the buffer pool again will return `false`, indicating that the buffer pool content has reached the float value and no more data is expected to be written. At this point we should immediately stop producing data to prevent the back pressure caused by an oversized buffer pool.
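As an illustration of reacting to that `false` return value, the sketch below writes to a file stream, stops producing when `write()` returns `false`, and resumes on the `drain` event (the output path and chunk count are made up for the example):

```js
const fs = require('fs')

// Hypothetical output file, for illustration only
const writable = fs.createWriteStream('./output.txt', {
  highWaterMark: 16 * 1024 // float value of the write queue: 16 KB
})

let i = 0
function produce() {
  let canWriteMore = true
  // Keep producing until the float value is reached or we are done
  while (i < 10000 && canWriteMore) {
    // write() returns false once the buffer pool reaches highWaterMark
    canWriteMore = writable.write(`chunk ${i++}\n`)
  }
  if (i < 10000) {
    // Stop producing and wait for the buffer pool to drain
    writable.once('drain', produce)
  } else {
    writable.end()
  }
}

produce()
```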
Readable stream (Readable) is a type of stream. It has two modes and three states.
Two reading modes:
- Flowing mode: data is automatically read from the underlying system into the buffer, and when the buffer is full it is passed to the registered event handlers through EventEmitter as quickly as possible
- Paused mode: in this mode EventEmitter is not actively triggered to transmit data; the `Readable.read()` method must be called explicitly to read data from the buffer, and `read` triggers the corresponding EventEmitter events (see the sketch after this list)
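A minimal sketch contrasting the two modes, assuming readables built from in-memory arrays:

```js
const { Readable } = require('stream')

// Paused mode: wait for 'readable', then pull data out explicitly with read()
const paused = Readable.from(['a', 'b', 'c'])
paused.on('readable', () => {
  let chunk
  while ((chunk = paused.read()) !== null) {
    console.log('paused mode read:', chunk)
  }
})

// Flowing mode: attaching a 'data' listener makes chunks arrive automatically
const flowing = Readable.from(['x', 'y', 'z'])
flowing.on('data', (chunk) => {
  console.log('flowing mode data:', chunk)
})
```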
Three states:
- `readableFlowing === null` (initial state)
- `readableFlowing === false` (paused mode)
- `readableFlowing === true` (flowing mode)
The `readableFlowing` of a readable stream is initially `null`. It becomes `true` after a `data` event is added. When `pause()` or `unpipe()` is called, or back pressure is received, or a `readable` event is added, `readableFlowing` is set to `false`; in this state, binding a listener to the `data` event will not switch `readableFlowing` to `true`.

Calling `resume()` switches the `readableFlowing` of the readable stream back to `true`.

Removing all `readable` events is the only way to make `readableFlowing` become `null` again.
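A small sketch of these transitions, with logged values reflecting the behavior described above:

```js
const { Readable } = require('stream')

const readable = Readable.from(['a', 'b', 'c'])
console.log(readable.readableFlowing) // null (initial state)

// Adding a 'data' listener switches readableFlowing to true
readable.on('data', (chunk) => console.log('data:', chunk))
console.log(readable.readableFlowing) // true

// pause() switches readableFlowing to false
readable.pause()
console.log(readable.readableFlowing) // false

// resume() switches it back to true
readable.resume()
console.log(readable.readableFlowing) // true
```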
| Event name | Description |
| --- | --- |
| `readable` | Triggered when there is new readable data in the buffer (triggered every time a node is inserted into the buffer pool) |
| `data` | Triggered every time data is consumed; the parameter is the data consumed this time |
| `close` | Triggered when the stream is closed |
| `error` | Triggered when an error occurs in the stream |
| Method name | Description |
| --- | --- |
| `read(size)` | Consumes data of length `size`. Returns `null` if the current data is less than `size`; otherwise returns the data consumed this time. When `size` is not passed, it consumes all the data in the buffer pool |
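A brief usage sketch of `read(size)` in paused mode, assuming a stream holding 10 bytes:

```js
const { Readable } = require('stream')

// A readable stream holding 10 bytes, ended immediately
const readable = new Readable({ read() {} })
readable.push('0123456789')
readable.push(null) // signal end of data

readable.on('readable', () => {
  let chunk
  // Consume 4 bytes at a time. read(size) returns null when fewer than
  // `size` bytes are buffered; since this stream has already ended,
  // the final call returns the remaining 2 bytes instead.
  while ((chunk = readable.read(4)) !== null) {
    console.log('consumed:', chunk.toString())
  }
})
// consumed: 0123
// consumed: 4567
// consumed: 89
```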