This article introduces the Node.js stream module and shows how to use streams to build high-performance Node.js applications. I hope you find it helpful!
When you type characters on a keyboard, read a file from disk, or download a file from the Internet, a stream of information (bits) flows through different devices and applications.
If you learn to process these byte streams, you will be able to build high-performance, valuable applications. For example, when you watch a video on YouTube, you don't have to wait for the full video to download. Once a small amount has been buffered, the video starts playing, and the rest keeps downloading while you watch.
Node.js includes a built-in module, stream, that lets us process streaming data. In this article, we will explain how to use stream through several simple examples. We will also describe how to build pipelines that combine different streams into high-performance applications for complex cases.
Before we dive into building applications, it is important to understand the features provided by the Node.js stream module.
Let's start!
The Node.js stream module provides four types of streams: readable, writable, duplex, and transform.
For more details, see the official Node.js documentation: https://nodejs.org/api/stream.html#stream_types_of_streams
Let's take a high-level look at each stream type.
Readable streams can read data from a specific data source, most commonly from a file system. Other common uses of readable streams in Node.js applications are:
- process.stdin: reading user input in a terminal application through stdin.
- http.IncomingMessage: reading the content of an incoming request in an HTTP server, or reading the server's HTTP response in an HTTP client.

Writable streams let you write data from an application to a specific destination, such as a file. process.stdout is a writable stream that writes data to standard output, and console.log uses it internally.
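As a minimal sketch of the writable side, the following writes a few lines to a file with fs.createWriteStream() and also writes directly to process.stdout. The temp-file path and the writeLines helper are hypothetical names chosen for illustration.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical output file in the OS temp directory, used only for this demo.
const outPath = path.join(os.tmpdir(), 'writable-demo.txt');

// Write each line to a file through a writable stream, then report completion.
function writeLines(lines, callback) {
  const writable = fs.createWriteStream(outPath);
  // 'finish' fires after end() has been called and all data has been flushed.
  writable.on('finish', () => callback(null, outPath));
  writable.on('error', callback);
  for (const line of lines) {
    // write() buffers the chunk; a false return value means the internal buffer is full.
    writable.write(`${line}\n`);
  }
  writable.end();
}

// process.stdout is a writable stream too; console.log() writes to it internally.
process.stdout.write('Hello from process.stdout\n');
```

For large amounts of data you would normally respect write()'s return value and wait for the 'drain' event before writing more; the sketch skips that because it only writes a handful of short lines.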
Next come duplex and transform streams, which can be thought of as hybrid stream types built on top of readable and writable streams.
A duplex stream combines a readable stream and a writable stream: it can both write data to a specific destination and read data from a source. The most common example of a duplex stream is net.Socket, which is used to read data from and write data to a socket.
It is important to note that the readable and writable sides of a duplex stream operate independently of each other: data written to one side does not flow out of the other.
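To make that independence concrete, here is a toy duplex stream built with the stream.Duplex constructor. It is a hypothetical illustration, not how net.Socket is implemented: the readable side emits a fixed greeting, and whatever is written to the writable side is merely recorded.

```javascript
const { Duplex } = require('stream');

// Toy duplex stream (hypothetical, for illustration): its readable side emits a
// fixed greeting, while its writable side only records what is written to it.
function createToyDuplex() {
  const received = [];
  const duplex = new Duplex({
    read() {
      // Readable side: emit one chunk, then signal end-of-stream with null.
      this.push('hello from the readable side');
      this.push(null);
    },
    write(chunk, encoding, callback) {
      // Writable side: store the data; it is NOT forwarded to the readable side.
      received.push(chunk.toString());
      callback();
    },
  });
  return { duplex, received };
}
```

Writing "ping" to such a stream never makes "ping" appear on its readable side. A net.Socket behaves the same way: what you write goes to the remote peer, and what you read comes from it.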
A transform stream is similar to a duplex stream, but in a transform stream the readable and writable sides are related. The crypto.Cipher class is a good example: it implements an encryption stream. Through a crypto.Cipher stream, an application can write plaintext data to the writable side of the stream and read encrypted ciphertext from the readable side. This type of stream is called a transform stream because it transforms the data passing through it.
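The following sketch shows the Cipher transform in action. It assumes AES-256-CBC with a randomly generated key and IV (choices made for this demo, not by the article) and uses crypto.createCipheriv(), which returns a crypto.Cipher instance: plaintext is written to the writable side and ciphertext is collected from the readable side. The decrypt helper, built on crypto.createDecipheriv(), exists only to check the round trip.

```javascript
const crypto = require('crypto');

// Demo-only key material; a real application must generate, store, and share it securely.
const key = crypto.randomBytes(32); // 256-bit key for aes-256-cbc
const iv = crypto.randomBytes(16);  // 128-bit IV (the AES block size)

// Write plaintext into the cipher's writable side, collect ciphertext from its readable side.
function encrypt(plaintext) {
  const cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
  const chunks = [];
  return new Promise((resolve, reject) => {
    cipher.on('data', (chunk) => chunks.push(chunk));
    cipher.on('end', () => resolve(Buffer.concat(chunks)));
    cipher.on('error', reject);
    cipher.write(plaintext);
    cipher.end();
  });
}

// Non-streaming decryption, used only to verify the result.
function decrypt(ciphertext) {
  const decipher = crypto.createDecipheriv('aes-256-cbc', key, iv);
  return Buffer.concat([decipher.update(ciphertext), decipher.final()]).toString('utf8');
}
```

Usage: encrypt('plain text').then((ct) => console.log(decrypt(ct))) prints the original plaintext.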
Note: Another transform stream is stream.PassThrough. stream.PassThrough passes data from the writable side to the readable side without any transformation. This may sound redundant, but PassThrough streams are very helpful for building custom streams and stream pipelines (for example, creating multiple copies of a stream's data).
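To illustrate the "multiple copies" use case, here is a minimal sketch that fans one readable source out into two PassThrough streams. The duplicate() and collect() helpers are hypothetical names. It uses .pipe() for brevity, because pipeline() connects streams in a single chain rather than fanning one source out to several destinations.

```javascript
const { PassThrough, Readable } = require('stream');

// Collect everything a readable stream emits into a single string.
function collect(readable) {
  return new Promise((resolve, reject) => {
    let text = '';
    readable.on('data', (chunk) => { text += chunk.toString(); });
    readable.on('end', () => resolve(text));
    readable.on('error', reject);
  });
}

// Fan one source out into two PassThrough streams so that two consumers
// each receive a full copy of the data.
function duplicate(source) {
  const copyA = new PassThrough();
  const copyB = new PassThrough();
  source.pipe(copyA);
  source.pipe(copyB);
  return [copyA, copyB];
}

// Usage: both copies see the same data.
const [first, second] = duplicate(Readable.from(['chunk-1 ', 'chunk-2']));
Promise.all([collect(first), collect(second)]).then(([a, b]) => console.log(a === b)); // true
```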
Once a readable stream is connected to a source that produces data, such as a file, there are several ways to read data through the stream.
First, create a simple text file named myfile, 85 bytes in size, containing the following string:
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.
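If you want to follow along, one quick way to create myfile is the snippet below (a convenience sketch, not part of the original setup). Note that the string has no trailing newline; adding one would make the file 86 bytes.

```javascript
const fs = require('fs');

// Create the 85-byte sample file used by the reading examples.
const text = 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.';
fs.writeFileSync('./myfile', text);

console.log(fs.statSync('./myfile').size); // 85
```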
Now, let's look at two different ways of reading data from a readable stream.
Using data events
The most common way to read data from a readable stream is to listen for the data events emitted by the stream. The following code demonstrates this approach:
```javascript
const fs = require('fs');

const readable = fs.createReadStream('./myfile', { highWaterMark: 20 });

readable.on('data', (chunk) => {
  console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`);
});
```
The highWaterMark property is passed to fs.createReadStream as an option and determines how much data is buffered inside the stream before it is flushed to the reading mechanism (in this case, our data handler). By default, highWaterMark for readable fs streams is 64 KB. We deliberately override it with 20 bytes to trigger multiple data events.
If you run the program above, it reads the 85 bytes of myfile in five iterations. You will see the following output in the console:
```
Read 20 bytes
"Lorem ipsum dolor si"

Read 20 bytes
"t amet, consectetur "

Read 20 bytes
"adipiscing elit. Cur"

Read 20 bytes
"abitur nec mauris tu"

Read 5 bytes
"rpis."
```
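The data handler in that example never learns when the stream is finished or whether reading failed. A minimal sketch that also listens for the 'end' and 'error' events could look like this; readWithEvents is a hypothetical helper name.

```javascript
const fs = require('fs');

// Read a file via 'data' events, reporting each chunk size once reading ends.
function readWithEvents(filePath, callback) {
  const readable = fs.createReadStream(filePath, { highWaterMark: 20 });
  const sizes = [];
  readable.on('data', (chunk) => sizes.push(chunk.length));
  // 'end' fires once all data has been consumed.
  readable.on('end', () => callback(null, sizes));
  // 'error' fires e.g. when the file does not exist. Without an 'error'
  // listener, the error would be thrown as an uncaught exception.
  readable.on('error', (err) => callback(err));
}

// Usage: readWithEvents('./myfile', (err, sizes) => console.log(err || sizes));
```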
Using async iterators
Another way to read data from a readable stream is to use an async iterator:
```javascript
const fs = require('fs');

const readable = fs.createReadStream('./myfile', { highWaterMark: 20 });

(async () => {
  for await (const chunk of readable) {
    console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`);
  }
})();
```
If you run this program, you will get the same output as in the previous example.
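One convenient property of the async iterator approach is that stream errors surface as exceptions, so ordinary try/catch or promise rejection handling applies. The sketch below (readAll is a hypothetical helper) collects a whole file into a string. It calls setEncoding('utf8') so that multi-byte characters split across chunk boundaries are decoded correctly, which calling chunk.toString() on each chunk does not guarantee.

```javascript
const fs = require('fs');

// Read a whole file through the async iterator; a failed read (for example,
// a missing file) rejects the returned promise instead of emitting 'error'.
async function readAll(filePath) {
  const readable = fs.createReadStream(filePath, { highWaterMark: 20 });
  readable.setEncoding('utf8'); // chunks are now decoded strings
  let text = '';
  let chunks = 0;
  for await (const chunk of readable) {
    chunks += 1;
    text += chunk;
  }
  return { chunks, text };
}

// Usage: readAll('./myfile').then(console.log).catch(console.error);
```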
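When a listener is attached to a readable stream's data event, the stream switches to the "flowing" state (unless it has been explicitly paused). You can check whether a stream is flowing through the stream object's readableFlowing property.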
We can slightly modify the previous example to demonstrate this with the data handler:
```javascript
const fs = require('fs');

const readable = fs.createReadStream('./myfile', { highWaterMark: 20 });

let bytesRead = 0;

console.log(`before attaching 'data' handler. is flowing: ${readable.readableFlowing}`);

readable.on('data', (chunk) => {
  console.log(`Read ${chunk.length} bytes`);
  bytesRead += chunk.length;

  // Stop reading after 60 bytes have been read from the readable stream
  if (bytesRead === 60) {
    readable.pause();
    console.log(`after pause() call. is flowing: ${readable.readableFlowing}`);

    // Resume reading after waiting for 1 second
    setTimeout(() => {
      readable.resume();
      console.log(`after resume() call. is flowing: ${readable.readableFlowing}`);
    }, 1000);
  }
});

console.log(`after attaching 'data' handler. is flowing: ${readable.readableFlowing}`);
```
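In this example, we read myfile through a readable stream, but after reading 60 bytes we temporarily pause the data flow for 1 second. We also print the value of the readableFlowing property at different points to see how it changes.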
If you run the program above, you will get the following output:
```
before attaching 'data' handler. is flowing: null
after attaching 'data' handler. is flowing: true
Read 20 bytes
Read 20 bytes
Read 20 bytes
after pause() call. is flowing: false
after resume() call. is flowing: true
Read 20 bytes
Read 5 bytes
```
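We can explain the output as follows:

- When the program starts, readableFlowing is null, because we have not provided any mechanism for consuming the stream.
- After the data handler is attached, the readable stream switches to "flowing" mode and readableFlowing becomes true.
- Once 60 bytes have been read, the stream is paused by calling pause(), and readableFlowing changes to false.
- After waiting 1 second, calling resume() switches the stream back to "flowing" mode, and readableFlowing changes to true. The rest of the file content then flows through the stream.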
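With streams, applications don't need to keep large binary objects in memory: small chunks of data can be processed as soon as they are received.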
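In this section, let's combine different streams to build a real application that can handle large amounts of data. We will use a small utility program to generate the SHA-256 hash of a given file.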
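But first, we need to create a large 4 GB dummy file for testing. You can do this with a simple shell command:

On macOS:
```shell
mkfile -n 4g 4gb_file
```

On Linux:
```shell
xfs_mkfile 4096m 4gb_file
```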
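If neither command is available (for example on Windows), a cross-platform alternative, which is our assumption rather than part of the article, is to let Node.js create the file: fs.truncateSync() extends a file to the requested length and fills the new space with zero bytes. createZeroFile is a hypothetical helper name.

```javascript
const fs = require('fs');

// Create (or overwrite) filePath as a zero-filled file of sizeInBytes bytes.
function createZeroFile(filePath, sizeInBytes) {
  fs.writeFileSync(filePath, '');         // create the file, or empty an existing one
  fs.truncateSync(filePath, sizeInBytes); // extend it with zero bytes to the target size
}

// Usage: a 4 GB test file (4 * 1024 ** 3 = 4294967296 bytes).
// createZeroFile('./4gb_file', 4 * 1024 ** 3);
```

On many file systems the result is a sparse file, so it is created almost instantly while still reading back as 4 GB of zeros.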
After creating the dummy file 4gb_file, let's generate its SHA-256 hash without using the stream module:
```javascript
const fs = require("fs");
const crypto = require("crypto");

fs.readFile("./4gb_file", (readErr, data) => {
  if (readErr) return console.log(readErr);
  const hash = crypto.createHash("sha256").update(data).digest("base64");
  fs.writeFile("./checksum.txt", hash, (writeErr) => {
    // Log the write error, if any (the original referenced an undefined `err`)
    writeErr && console.error(writeErr);
  });
});
```
If you run the code above, you may get the following error:
```
RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB
    at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) {
  code: 'ERR_FS_FILE_TOO_LARGE'
}
```
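This error occurs because fs.readFile() tries to load the entire file into a single buffer, and Node.js cannot handle arbitrarily large buffers: as the message says, fs.readFile() rejects files larger than 2 GB. More generally, the maximum size of a single buffer depends on your system's architecture (and on the Node.js version). You can check it with the buffer.constants.MAX_LENGTH constant of the built-in buffer module.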
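For example, the limit can be printed like this. The exact value depends on the Node.js version and the CPU architecture, so no output is shown here.

```javascript
const { constants } = require('buffer');

// Largest Buffer this Node.js build can allocate, in bytes.
console.log(constants.MAX_LENGTH);
// The same limit expressed in GiB, for readability.
console.log(`${(constants.MAX_LENGTH / 1024 ** 3).toFixed(2)} GiB`);
```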
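Even if the error above didn't occur, keeping large files in memory is problematic. The available physical memory limits how much memory our application can use, and high memory usage also hurts CPU performance, because garbage collection becomes expensive.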
Reducing the app's memory footprint with pipeline()
Now, let's see how to modify the application to use streams and avoid this error:
```javascript
const fs = require("fs");
const crypto = require("crypto");
const { pipeline } = require("stream");

const hashStream = crypto.createHash("sha256");
hashStream.setEncoding("base64");

const inputStream = fs.createReadStream("./4gb_file");
const outputStream = fs.createWriteStream("./checksum.txt");

pipeline(
  inputStream,
  hashStream,
  outputStream,
  (err) => {
    err && console.error(err);
  }
);
```
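In this example, we use the streaming interface of the object returned by crypto.createHash. It is a transform stream, hashStream, that can generate a hash for arbitrarily large files.

To feed the file's content into this transform stream, we use fs.createReadStream to create a readable stream, inputStream, for 4gb_file. We then pipe the output of the hashStream transform stream into the writable stream outputStream, which fs.createWriteStream creates for checksum.txt.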
If you run the program above, you will find the SHA-256 hash of the 4 GB file in checksum.txt.
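On Node.js 15 or later, the same idea can be written with the promise-based pipeline() from the stream/promises module. The sketch below (sha256OfFile is a hypothetical name) returns the base64 digest instead of writing it to checksum.txt, by ending the pipeline with an async function that consumes the hash stream's output.

```javascript
const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream/promises'); // available in Node.js 15 and later

// Stream a file through a SHA-256 transform stream and resolve with the base64 digest.
async function sha256OfFile(filePath) {
  let digest = '';
  await pipeline(
    fs.createReadStream(filePath),
    crypto.createHash('sha256').setEncoding('base64'),
    // Final stage: an async function that consumes the hash stream's readable side.
    async function (source) {
      for await (const chunk of source) {
        digest += chunk;
      }
    }
  );
  return digest;
}

// Usage (assumes ./4gb_file exists):
// sha256OfFile('./4gb_file').then(console.log, console.error);
```

Only one chunk of the file is held in memory at a time, so memory usage stays flat regardless of file size.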
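pipeline() vs. pipe()
In the previous example, we used the pipeline function to connect multiple streams. Another common approach is to use the .pipe() function, as shown below:

```javascript
inputStream
  .pipe(hashStream)
  .pipe(outputStream);
```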
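However, .pipe() is not recommended in production applications, for several reasons. If one of the piped streams is closed or emits an error, pipe() does not automatically destroy the connected streams, which can cause memory leaks in the application. Likewise, pipe() does not automatically forward errors across streams so that they can be handled in one place.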
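pipeline() was introduced to address these problems, so it is recommended to use pipeline() instead of pipe() to connect different streams. We can rewrite the pipe() example above to use the pipeline() function as follows: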
```javascript
pipeline(
  inputStream,
  hashStream,
  outputStream,
  (err) => {
    err && console.error(err);
  }
);
```
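pipeline() accepts a callback function as its last argument. Any error from the connected streams triggers this callback, which makes it easy to handle errors in one place.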
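To see the single error callback at work, the following sketch feeds a missing input file into the checksum pipeline; hashToFile is a hypothetical helper, and the input path in the usage line is deliberately nonexistent. The read stream's ENOENT error arrives in the pipeline() callback, and by then pipeline() has destroyed the other streams, which the callback reports.

```javascript
const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream');

// Hash inputPath into outputPath; report the error plus whether the
// downstream streams were destroyed by pipeline().
function hashToFile(inputPath, outputPath, done) {
  const inputStream = fs.createReadStream(inputPath);
  const hashStream = crypto.createHash('sha256').setEncoding('base64');
  const outputStream = fs.createWriteStream(outputPath);
  pipeline(inputStream, hashStream, outputStream, (err) => {
    done(err, {
      hashDestroyed: hashStream.destroyed,
      outputDestroyed: outputStream.destroyed,
    });
  });
}

// Usage: hashToFile('./missing_file', './checksum.txt', (err, state) => console.error(err && err.code, state));
```

With .pipe(), the same missing file would emit 'error' only on inputStream, and hashStream and outputStream would stay open unless you cleaned them up yourself.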
Using streams in Node.js helps us build high-performance applications that can handle large amounts of data.
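In this article, we covered:

- The four types of Node.js streams: readable, writable, duplex, and transform.
- Reading data from readable streams with data events or async iterators.
- Reducing memory usage by connecting multiple streams with pipeline().

A brief word of caution: you probably won't encounter many situations where streams are strictly necessary, and stream-based solutions add complexity to your application. Make sure the benefits of using streams outweigh the complexity they bring.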
The above is the detailed content of Let's talk about the Node.js stream module and see how to build high-performance applications. For more information, please follow other related articles on the PHP Chinese website!