Node.js のストリームは、使用すること、さらには理解することが難しいことで有名です。 [ビデオチュートリアルの推奨: nodejs ビデオチュートリアル ]
Dominic Tarr の言葉: 「ストリーミングは、Node における最良の、そして最も誤解されているアイデアです。」Redux Creator と React.js コア チームさえもメンバーのダン・アブラモフもノードストリームを恐れています。
#この記事は、ストリームとその使用方法を理解するのに役立ちます。恐れることはありません。完全に理解できます。
ストリームとは何ですか?
ストリームは、Node.js アプリケーションを強化する基本概念の 1 つです。入力データを順次読み取ったり、データを書き込んで出力したりするデータ処理方式です。 ストリーミングは、ファイルの読み取りと書き込み、ネットワーク通信、またはあらゆる種類のエンドツーエンドの情報交換を効率的に処理する方法です。 ストリームの処理方法は非常に独特で、従来のようにファイルを一度に 読み込むのではなく、データブロックを 1 つずつ読み込んでデータの内容を処理します。 . すべてを記憶に留めておいてください。
このアプローチにより、大量のデータ を処理するときにストリームが非常に強力になります。たとえば、ファイルのサイズが利用可能なメモリ領域よりも大きくなり、ファイル全体を読み取ることができなくなる可能性があります。処理のためにメモリに格納されます。そこで流れが生まれます!
ストリームを使用すると、より小さなデータ ブロックを処理し、より大きなファイルを読み取ることができます。 YouTube や Netflix などの「ストリーミング」サービスを例に挙げます。これらのサービスでは、ビデオ ファイルやオーディオ ファイルを即座にダウンロードできません。代わりに、ブラウザはビデオをチャンクの連続ストリームとして受信するため、受信者はほぼ即座に視聴を開始できます。 しかし、ストリーミングはメディアやビッグデータを処理するだけではありません。また、コードに「構成可能性」という力も与えてくれます。構成可能性を念頭に置いて設計するということは、複数のコンポーネントを何らかの方法で組み合わせて同じタイプの結果を生成できることを意味します。 Node.js では、ストリームを介して他の小さなスニペット内のデータを渡して、強力なスニペットを形成できます。ストリームを使用する理由
ストリーミングには、基本的に他のデータ処理方法に比べて 2 つの主な利点があります:Node.js には 4 種類のストリームがあります:
を使用すると、ストリームを使用してデータをファイルに書き込むことができます。
ファイルの内容を読んでみましょう。
request は読み取り可能なストリームであり、
response は書き込み可能なストリームです。
fs モジュールを使用したことがあるかもしれません。このモジュールを使用すると、読み取りおよび書き込み可能なファイル ストリームを使用できるようになります。 Express を使用するときは常にストリームを使用してクライアントと対話することになり、TCP ソケット、TLS スタック、およびその他の接続はすべて Node.js に基づいているため、使用できるすべてのデータベース接続駆動型プログラムでストリームを使用します。
#例
#読み取り可能なストリームを作成するにはどうすればよいですか? 可読性ストリームは最初に要求され、次に初期化されます。
const Stream = require('stream') const readableStream = new Stream.Readable()
これでストリームが初期化され、データを送信できるようになります:
readableStream.push('ping!') readableStream.push('pong!')
Async Iterator# #ストリームを使用する場合は、非同期イテレータを使用することを強くお勧めします。
Axel Rauschmayer 博士によると、非同期反復はデータ コンテナの内容を非同期に取得するためのプロトコルです (つまり、アイテムを取得する前に現在の「タスク」を一時停止できる)。ストリーム非同期反復子の実装では内部 readable イベントが使用されることにも言及する必要があります。 読み取り可能なストリームから読み取る場合、非同期イテレータを使用できます: <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;">import * as fs from &#39;fs&#39;;
async function logChunks(readable) {
for await (const chunk of readable) {
console.log(chunk);
}
}
const readable = fs.createReadStream(
&#39;tmp/test.txt&#39;, {encoding: &#39;utf8&#39;});
logChunks(readable);
// Output:
// &#39;This is a test!\n&#39;</pre><div class="contentsignin">ログイン後にコピー</div></div>
import {Readable} from 'stream'; async function readableToString2(readable) { let result = ''; for await (const chunk of readable) { result += chunk; } return result; } const readable = Readable.from('Good morning!', {encoding: 'utf8'}); assert.equal(await readableToString2(readable), 'Good morning!');
注意,在这种情况下必须使用异步函数,因为我们想返回 Promise。
请切记不要将异步功能与 EventEmitter
混合使用,因为当前在事件处理程序中发出拒绝时,无法捕获拒绝,从而导致难以跟踪错误和内存泄漏。目前的最佳实践是始终将异步函数的内容包装在 try/catch 块中并处理错误,但这很容易出错。 这个 pull request 旨在解决一旦其落在 Node 核心上产生的问题。
要了解有关异步迭代的 Node.js 流的更多信息,请查看这篇很棒的文章。
Readable.from():从可迭代对象创建可读流
stream.Readable.from(iterable, [options])
这是一种实用方法,用于从迭代器中创建可读流,该迭代器保存可迭代对象中包含的数据。可迭代对象可以是同步可迭代对象或异步可迭代对象。参数选项是可选的,除其他作用外,还可以用于指定文本编码。
const { Readable } = require('stream'); async function * generate() { yield 'hello'; yield 'streams'; } const readable = Readable.from(generate()); readable.on('data', (chunk) => { console.log(chunk); });
两种读取模式
根据 Streams API,可读流有效地以两种模式之一运行:flowing和paused。可读流可以处于对象模式,无论处于 flowing 模式还是 paused 模式。
EventEmitter
接口使用事件将其尽快提供给程序。stream.read()
方法以从流中读取数据块。在 flowing 模式中,要从流中读取数据,可以监听数据事件并附加回调。当有大量数据可用时,可读流将发出一个数据事件,并执行你的回调。看下面的代码片段:
var fs = require("fs"); var data = ''; var readerStream = fs.createReadStream('file.txt'); //Create a readable stream readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. // Handle stream events --> data, end, and error readerStream.on('data', function(chunk) { data += chunk; }); readerStream.on('end',function() { console.log(data); }); readerStream.on('error', function(err) { console.log(err.stack); }); console.log("Program Ended");
函数调用 fs.createReadStream()
给你一个可读流。最初流处于静态状态。一旦你侦听数据事件并附加了回调,它就会开始流动。之后将读取大块数据并将其传递给你的回调。流实现者决定发送数据事件的频率。例如,每当有几 KB 的数据被读取时,HTTP 请求就可能发出一个数据事件。当从文件中读取数据时,你可能会决定读取一行后就发出数据事件。
当没有更多数据要读取(结束)时,流将发出结束事件。在以上代码段中,我们监听此事件以在结束时得到通知。
另外,如果有错误,流将发出并通知错误。
在 paused 模式下,你只需在流实例上重复调用 read()
,直到读完所有数据块为止,如以下示例所示:
var fs = require('fs'); var readableStream = fs.createReadStream('file.txt'); var data = ''; var chunk; readableStream.on('readable', function() { while ((chunk=readableStream.read()) != null) { data += chunk; } }); readableStream.on('end', function() { console.log(data) });
read()
函数从内部缓冲区读取一些数据并将其返回。当没有内容可读取时返回 null
。所以在 while
循环中,我们检查是否为 null
并终止循环。请注意,当可以从流中读取大量数据时,将会发出可读事件。
所有 Readable
流均以 paused 模式开始,但可以通过以下方式之一切换为 flowing 模式:
stream.resume()
方法。stream.pipe()
方法将数据发送到可写对象。Readable
可以使以下方法之一切换回 paused 模式:
stream.pause()
方法。stream.unpipe()
方法来删除多个管道目标。一个需要记住的重要概念是,除非提供了一种用于消耗或忽略该数据的机制,否则 Readable
将不会生成数据。如果使用机制被禁用或取消,则 Readable
将会试图停止生成数据。添加 readable
事件处理会自动使流停止 flowing,并通过 read.read()
得到数据。如果删除了 readable
事件处理,那么如果存在 'data' 事件处理,则流将再次开始 flowing。
如何创建可写流?
要将数据写入可写流,你需要在流实例上调用 write()
。如以下示例所示:
var fs = require('fs'); var readableStream = fs.createReadStream('file1.txt'); var writableStream = fs.createWriteStream('file2.txt'); readableStream.setEncoding('utf8'); readableStream.on('data', function(chunk) { writableStream.write(chunk); });
上面的代码很简单。它只是简单地从输入流中读取数据块,并使用 write()
写入目的地。该函数返回一个布尔值,指示操作是否成功。如果为 true
,则写入成功,你可以继续写入更多数据。如果返回 false
,则表示出了点问题,你目前无法写任何内容。可写流将通过发出 drain
事件来通知你什么时候可以开始写入更多数据。
调用 writable.end()
方法表示没有更多数据将被写入 Writable。如果提供,则可选的回调函数将作为 finish
事件的侦听器附加。
// Write 'hello, ' and then end with 'world!'. const fs = require('fs'); const file = fs.createWriteStream('example.txt'); file.write('hello, '); file.end('world!'); // Writing more now is not allowed!
你可以用可写流从可读流中读取数据:
const Stream = require('stream') const readableStream = new Stream.Readable() const writableStream = new Stream.Writable() writableStream._write = (chunk, encoding, next) => { console.log(chunk.toString()) next() } readableStream.pipe(writableStream) readableStream.push('ping!') readableStream.push('pong!') writableStream.end()
还可以用异步迭代器来写入可写流,建议使用
import * as util from 'util'; import * as stream from 'stream'; import * as fs from 'fs'; import {once} from 'events'; const finished = util.promisify(stream.finished); // (A) async function writeIterableToFile(iterable, filePath) { const writable = fs.createWriteStream(filePath, {encoding: 'utf8'}); for await (const chunk of iterable) { if (!writable.write(chunk)) { // (B) // Handle backpressure await once(writable, 'drain'); } } writable.end(); // (C) // Wait until done. Throws if there are errors. await finished(writable); } await writeIterableToFile( ['One', ' line of text.\n'], 'tmp/log.txt'); assert.equal( fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}), 'One line of text.\n');
stream.finished()
的默认版本是基于回调的,但是可以通过 util.promisify()
转换为基于 Promise 的版本(A行)。
在此例中,使用以下两种模式:
Writing to a writable stream while handling backpressure (line B):
在处理 backpressure
时写入可写流(B行):
if (!writable.write(chunk)) { await once(writable, 'drain'); }
关闭可写流,并等待写入完成(C行):
writable.end(); await finished(writable);
pipeline(管道)是一种机制,可以将一个流的输出作为另一流的输入。它通常用于从一个流中获取数据并将该流的输出传递到另一个流。管道操作没有限制。换句话说,管道可用于分多个步骤处理流数据。
在 Node 10.x 中引入了 stream.pipeline()
。这是一种模块方法,用于在流转发错误和正确清理之间进行管道传输,并在管道完成后提供回调。
这是使用管道的例子:
const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib'); // 使用 pipeline API 可以轻松将一系列流 // 通过管道传输在一起,并在管道完全完成后得到通知。 // 一个有效地用 gzip压缩巨大视频文件的管道: pipeline( fs.createReadStream('The.Matrix.1080p.mkv'), zlib.createGzip(), fs.createWriteStream('The.Matrix.1080p.mkv.gz'), (err) => { if (err) { console.error('Pipeline failed', err); } else { console.log('Pipeline succeeded'); } } );
由于pipe
不安全,应使用 pipeline
代替 pipe
。
流模块
Node.js 流模块 提供了构建所有流 API 的基础。
Stream 模块是 Node.js 中默认提供的原生模块。 Stream 是 EventEmitter 类的实例,该类在 Node 中异步处理事件。因此流本质上是基于事件的。
要访问流模块:
const stream = require('stream');
stream
模块对于创建新型流实例非常有用。通常不需要使用 stream
模块来消耗流。
流驱动的 Node API
由于它们的优点,许多 Node.js 核心模块提供了原生流处理功能,最值得注意的是:
net.Socket
是流所基于的主 API 节点,它是以下大多数 API 的基础process.stdin
返回连接到 stdin 的流process.stdout
返回连接到 stdout 的流process.stderr
返回连接到 stderr 的流fs.createReadStream()
创建一个可读的文件流fs.createWriteStream()
创建可写的文件流net.connect()
启动基于流的连接http.request()
返回 http.ClientRequest
类的实例,它是可写流zlib.createGzip()
使用gzip(一种压缩算法)将数据压缩到流中zlib.createGunzip()
解压缩 gzip 流。zlib.createDeflate()
deflate(压缩算法)将数据压缩到流中zlib.createInflate()
解压缩一个deflate流流 备忘单:
查看更多:Node.js 流速查表
以下是与可写流相关的一些重要事件:
error
– パイプラインの書き込みまたは構成中にエラーが発生したことを示します。 pipeline
– このイベントは、読み取り可能なストリームが書き込み可能なストリームに渡されるときに、書き込み可能なストリームによって発行されます。 unpipe
– 読み取り可能なストリームに対して unpipe を呼び出し、宛先ストリームへのパイプ処理を停止すると発行されます。 これがストリームに関する基本のすべてです。ストリーム、パイプ、チェーンは Node.js の中核で最も強力な機能です。ストリームは、I/O を実行するための簡潔で効率的なコードを作成するのに非常に役立ちます。
また、BOB と呼ばれる、期待に値する Node.js 戦略計画 があります。これは、Node.js の内部データ フローを改善することを目的としており、できれば将来的には、Node.js ストリーミング データ インターフェイスのパブリック API。
英語のオリジナルアドレス: https://nodesource.com/blog/ Understanding-streams-in-nodejs
著者: Liz Parody
翻訳: Crazy Technology house
関連する推奨事項:nodejs チュートリアル
以上がNode.js のストリームについての深い理解の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。