This article introduces the Node.js Stream API, working from the basics toward more advanced usage. The details are below.
Basic introduction
In Node.js there are two common ways to read a file: fs.readFile and fs.createReadStream.
fs.readFile is the one every Node.js user knows best; it is simple to understand and easy to use. Its drawback is that the entire file is read into memory first, so reading a large file this way is very inefficient.
fs.createReadStream, by contrast, reads the data through a Stream. It splits the file (data) into chunks and emits specific events as it goes; we can listen for those events and attach handler functions. This approach is less convenient than fs.readFile, but it is very efficient.
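To make the contrast concrete, here is a minimal sketch of both approaches (the file name ./big-file.txt is made up for illustration):

var fs = require('fs');

// fs.readFile: the whole file is buffered in memory before the callback runs
fs.readFile('./big-file.txt', function(err, data) {
  if (err) throw err;
  console.log('readFile got ' + data.length + ' bytes all at once');
});

// fs.createReadStream: the data arrives chunk by chunk
var rs = fs.createReadStream('./big-file.txt');
rs.on('data', function(chunk) {
  console.log('stream got a chunk of ' + chunk.length + ' bytes');
});
rs.on('end', function() {
  console.log('stream finished');
});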
In fact, Streams are not limited to file handling in Node.js; they also appear elsewhere, for example in process.stdin/stdout, http, TCP sockets, zlib, crypto, and so on.
This article is a summary of my learning about the Stream API in Node.js. I hope it will be useful to everyone.
Features
Event-based communication
You can connect streams through pipe
Types
Readable Stream: a readable data stream
Writable Stream: a writable data stream
Duplex Stream: a bidirectional data stream that can be read from and written to at the same time
Transform Stream: a data stream that can be read from and written to, and that can also transform (process) the data as it passes through; a sketch follows below
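As a rough illustration of the last type, here is a minimal Transform stream that upper-cases whatever passes through it (this example is mine, not from the original article):

var stream = require('stream');

// A Transform stream is readable and writable, and may modify the data in between
var upperCaser = new stream.Transform();
upperCaser._transform = function(chunk, encoding, callback) {
  // push the processed chunk to the readable side
  this.push(chunk.toString().toUpperCase());
  callback();
};

process.stdin.pipe(upperCaser).pipe(process.stdout);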
Events
Events for readable data streams
readable: emitted when data is available to be read from the stream
data: attaching a data event listener to a stream that has not been explicitly paused switches it into flowing mode, supplying data to your handler as soon as it is available
end: emitted when all the data has been read. Do not confuse this with writableStream.end(): a writable stream has no end event, only an .end() method
close: emitted when the underlying data source has been closed
error: emitted when an error occurs while reading data
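A small sketch wiring these events onto a readable file stream (the file name is illustrative):

var fs = require('fs');

var rs = fs.createReadStream('./test1.txt');

rs.on('data', function(chunk) {
  console.log('received ' + chunk.length + ' bytes');
});
rs.on('end', function() {
  console.log('no more data');
});
rs.on('close', function() {
  console.log('underlying source closed');
});
rs.on('error', function(err) {
  console.error('read error: ' + err.message);
});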
Events of writable data streams
drain: if writable.write(chunk) returns false, the drain event is emitted once all buffered data has been flushed and the stream can accept writes again
finish: emitted after the .end method has been called and all buffered data has been flushed; like the end event on a readable stream, it marks the end of the writing process
pipe: emitted when the stream is made the destination of a pipe call
unpipe: emitted when the stream is removed as a pipe destination via unpipe
error: emitted when an error occurs while writing data
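As a sketch of how drain is typically used for back-pressure (the loop and file name are made up for illustration):

var fs = require('fs');

var ws = fs.createWriteStream('./out.txt');
var i = 0;

function writeMore() {
  var ok = true;
  while (i < 1000000 && ok) {
    // write() returns false once the internal buffer is full
    ok = ws.write('line ' + i + '\n');
    i++;
  }
  if (i < 1000000) {
    // wait for the buffer to drain before writing again
    ws.once('drain', writeMore);
  } else {
    ws.end();
  }
}

writeMore();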
Modes
A readable stream has two modes: flowing and paused. You can switch a stream between them as follows:
Paused mode -> flowing mode
Add a listener for the data event
Call the resume method
Call the pipe method
Note: if there is no data event listener and no pipe destination when the stream switches into flowing mode, the data will be lost.
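A minimal sketch of switching into flowing mode (the file name is illustrative):

var fs = require('fs');

var rs = fs.createReadStream('./test1.txt');

// Attaching a 'data' listener switches the stream into flowing mode;
// calling rs.resume() or rs.pipe(destination) would have the same effect.
rs.on('data', function(chunk) {
  console.log('flowing: got ' + chunk.length + ' bytes');
});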
Flowing mode -> paused mode
If there is no pipe destination, call the pause method
If there are pipe destinations, remove all data event listeners and call the unpipe method to detach every pipe destination
Note: removing the data event listeners alone does not automatically put the stream into paused mode. Conversely, calling pause while pipe destinations remain does not guarantee that the stream stays paused; once those destinations request data, the stream may continue supplying it.
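A small sketch of pausing a flowing stream and resuming it later (the timing and file name are illustrative):

var fs = require('fs');

var rs = fs.createReadStream('./test1.txt');

rs.on('data', function(chunk) {
  console.log('got ' + chunk.length + ' bytes, pausing for 1s');
  rs.pause();
  setTimeout(function() {
    rs.resume(); // back into flowing mode
  }, 1000);
});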
Usage
Read and write files
var fs = require('fs');

// Create a readable stream
var rs = fs.createReadStream('./test1.txt');

// Create a writable stream
var ws = fs.createWriteStream('./test2.txt');

// Listen for the readable stream's end event
rs.on('end', function() {
  console.log('read test1.txt successfully!');
});

// Listen for the writable stream's finish event
ws.on('finish', function() {
  console.log('write test2.txt successfully!');
});

// Switch the readable stream into flowing mode and pipe it into the writable stream
rs.pipe(ws);

Read a CSV file and upload its data (I have used this in production):

var fs = require('fs');
var es = require('event-stream');
var csv = require('csv');

var parser = csv.parse();
var transformer = csv.transform(function(record) {
  return record.join(',');
});

var data = fs.createReadStream('./demo.csv');

data
  .pipe(parser)
  .pipe(transformer)
  // process the data handed over by the previous stream
  .pipe(es.map(function(data, callback) {
    // upload is the author's own function, defined elsewhere
    upload(data, function(err) {
      callback(err);
    });
  }))
  // roughly equivalent to listening for the previous stream's end event
  .pipe(es.wait(function(err, body) {
    process.stdout.write('done!');
  }));
More Usage
You can refer to https://github.com/jeresig/node-stream-playground. After opening the demo site, just click "add stream" to see the results.
Common Pitfalls
Writing a file with rs.pipe(ws) does not append the contents of rs to the end of ws; it overwrites the existing contents of ws. See the sketch below for how to append instead.
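A sketch of appending instead of overwriting: open the writable stream with the 'a' flag (file names are illustrative):

var fs = require('fs');

var rs = fs.createReadStream('./src.txt');

// The 'a' flag opens the file in append mode rather than truncating it
var ws = fs.createWriteStream('./dest.txt', { flags: 'a' });

rs.pipe(ws);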
A stream that has ended or been closed cannot be reused; you must recreate the stream.
The pipe method returns the destination stream: a.pipe(b) returns b. When attaching event listeners, make sure you are listening on the stream you intend.
So if you want to listen on several streams that are chained together with the pipe method, you have to write:
data
  .on('end', function() {
    console.log('data end');
  })
  .pipe(a)
  .on('end', function() {
    console.log('a end');
  })
  .pipe(b)
  .on('end', function() {
    console.log('b end');
  });
Commonly used class libraries
event-stream: using it feels like functional programming, which I personally like
awesome-nodejs#streams: I have not used the other stream libraries myself, so if you need one, look here
That concludes this introduction to the Stream API in Node.js. I hope you find it useful.