Node.js でストリーム (読み取り可能、書き込み可能、二重および変換ストリーム) を実装する方法について話しましょう。
この記事では、Node における Stream を理解し、Readable Stream、Writable Stream、Duplex Stream、Conversion Stream を実現するための Stream 導入方法を紹介します。 。 ヘルプ!
ストリームの紹介
このような要件があると仮定すると、あるファイルの内容を別のファイルにコピーする必要があります。次のコードを書き込みます。
const fs = require('fs'); const path = require('path'); const copy = (source, target) => { fs.readFile(path.resolve(source), (err, data) => { if(err) { throw new Error(err.toString()); return; } fs.writeFile(path.resolve(target), data, (err) => { if(!err) { console.log("复制成功!"); } }) }) }
上記のコードは非常に単純です。つまり、最初に source
ファイルの内容を読み取り、次にその内容を target## に書き込みます。 # ファイル 。その特徴は、
source の
すべての内容 を読み取って、その内容を target に書き込む必要があることです。
で、フローを使用する前と後のデータの流れを生き生きと示すアニメーション画像を見つけました
は、特に大きなファイルの処理に使用される Stream API を提供します。水の流れのようにデータを部分ごとに処理するため、このモジュールの名前はストリームと呼ばれます。 const fs = require('fs');
function copy(source, target) {
const rs = fs.createReadStream(source);
const ws = fs.createWriteStream(target);
rs.on('data', data => {
ws.write(data);
});
rs.on('end', () => {
ws.end();
});
}
#ストリームの分類
ストリームは 4 つのカテゴリに分類できます
Readable: 読み取り可能なストリーム、データ プロバイダー
# Writeable: 書き込み可能なストリーム、データのコンシューマー- Duplex: 書き込み可能および読み取り可能なストリーム (二重ストリーム)
- Transform: これは Duplex の特殊なケースであり、入力用の変換ストリームです。データは処理され、次に出力 #読み取り可能なストリームと書き込み可能なストリームが基本です。一般的な読み取り可能なストリームと書き込み可能なストリームは次のとおりです。
#HTTP リクエスト | |
---|---|
fs 書き込みストリーム | |
process.stdout | |
TCP ソケット | |
暗号ストリーム | #暗号ストリーム|
# Stream 是 EventEmitter 的实例,有自定义的事件。 Readable Stream可读流有两个模式,暂停模式与流动模式。当我们创建一个流时,如果我们监听了
const fs = require('fs'); const rs = fs.createReadStream('a.txt', { highWaterMark: 1 // 缓存区最多存储 1 字节 }); rs.on('readable', () => { let data; while(data=rs.read()) { console.log(data.toString()); } }) ログイン後にコピー 上面的程序设置 当我们为可读流绑定 const fs = require('fs'); const rs = fs.createReadStream('a.txt', { highWaterMark: 2 }); rs.on('data', data => { console.log(data.toString()); }); rs.on('end', () => { console.log("文件读取完毕!"); }); ログイン後にコピー 暂停模式像是手动步枪,而流动模式则像是自动步枪。暂停模式与流动模式也可以相互切换,通过 可读流的一个经典实例就是 const http = require('http'); const app = http.createServer(); app.on('request', (req, res) => { let datas = []; req.on('data', data => { datas.push(data); }); req.on('end', () => { req.body = Buffer.concat(datas); // 当读取完 body 中的内容之后,将内容返回给客户端 res.end(req.body); }); }) app.listen(3000, () => { console.log("服务启动在 3000 端口... ..."); }) ログイン後にコピー Writable Stream可写流与可读流相似,当我们向可写流写入数据时(通过可写流的 当缓冲区中的数据全部被消费完了(写入了文件中或者被别的流消费了),那么就会触发 const fs = require('fs'); const ws = fs.createWriteStream('b.txt', { highWaterMark: 16 * 1024 }); function writeMillionTimes(writer, data, encoding, callback) { let i = 10000; write(); function write() { // 表示是否可以向可写流中写入数据 let ok = true; while(i-- > 0 && ok) { // 当 writer.write() 方法返回 false 表示不可写入数据 ok = writer.write(data, encoding, i === 0 ? callback : null); } if(i > 0) { // 说明 ok 为 false,即不能向缓冲区中写入内容了 console.log("drain", i); // 监听 drain 事件,当队列消费完毕时继续调用 write() 方法写入 writer.once('drain', write); } } } writeMillionTimes(ws, 'simple', 'utf-8', () => { console.log("end"); }) ログイン後にコピー 输出为 drain 7268 drain 4536 drain 1804 end ログイン後にコピー 说明有三次缓冲区中的内容达到了 16KB,可以验算上面的数字之间的差值,在乘以 我们还可以调用可写流的 const fs = require('fs'); const ws = fs.createWriteStream('b.txt'); ws.write('Hello'); ws.write('World'); ws.end('!'); ws.on('close', () => { console.log("close"); // close }) ログイン後にコピー 当调用 const fs = require('fs'); const ws = fs.createWriteStream('b.txt'); ws.write('Hello'); ws.write('World'); ws.end('!'); ws.write('write again'); // Error [ERR_STREAM_WRITE_AFTER_END]: write after end ログイン後にコピー 当调用 const fs = require('fs'); const ws = fs.createWriteStream('b.txt'); ws.write('Hello'); ws.write('World'); ws.end('!'); ws.on('close', () => { console.log("close"); }); ws.on('finish', () => { console.log("finish"); }); ログイン後にコピー 打印结果是 finish close ログイン後にコピー 说明 可写流的经典例子就是 const http = require('http'); const fs = require('fs'); const app = http.createServer(); app.on('request', (req, res) => { const rs = fs.createReadStream('index.html'); rs.on('data', data => { res.write(data); }) rs.on('end', () => { res.end() }); }); app.listen(3000, () => { console.log("服务启动在 3000 端口 ... ..."); }) ログイン後にコピー Duplex Stream 与 Transform StreamDuplex,即双工的意思,它既可以接收数据,也可以输出数据,它的输入和输出之间可以没有任何的关系,就像是一个部件内部有两个独立的系统。Duplex 继承了可读流(Readable),并且拥有可写流(Writable)的所有方法。 Transform Stream 继承了 Duplex Stream,它同样具有可读流与可写流的能力,并且它的输出与输入之间是有关系的,中间做了一次转换。常见的转换流有 出于文章结构的考虑,在这里不详细讲解这两个流,在后文中会实现这两个流,以加深对这两个流的理解。 pipe我们可以混合使用可读流与可写流来进行文件的复制 const fs = require('fs'); function copy(source, target) { const rs = fs.createReadStream(source); const ws = fs.createWriteStream(target); rs.on('data', data => { ws.write(data); }); rs.on('end', () => { ws.end(); }); } copy('a.txt', 'b.txt'); ログイン後にコピー 但是上面的写法却不被建议使用,因为没有考虑到可读流与可写流速度之间的差异,如果可读流输出数据的速度大于可写流写入数据的速度,这个时候就会有数据一直堆压在缓存区,导致占用过高的内存,专业术语叫做积压。 我们需要改善上面的程序,具体做法就是当 const fs = require('fs'); function copy(source, target) { const rs = fs.createReadStream(source); const ws = fs.createWriteStream(target); rs.on('data', data => { if (!ws.write(data)) { rs.pause(); } }); rs.on('end', () => { ws.end(); }); ws.on('drain', () => { rs.resume(); }) } ログイン後にコピー 那是不是每次我们使用流都需要写这么多的代码,当然不是。官方为可读流提供了一个 const fs = require('fs'); function copy(source, target) { const rs = fs.createReadStream(source); const ws = fs.createWriteStream(target); rs.pipe(ws); } ログイン後にコピー 当我们调用 Readable.prototype.pipe = function(ws) { this.on('data', data => { if (!ws.write(data)) { this.pause(); } }); ws.on('drain', () => { this.resume(); }); // 触发 pipe 事件 ws.emit('pipe', this); // 返回可写流,以支持链式调用 return ws; } ログイン後にコピー 这里给出官网画的一个有关 +===================+ x--> Piping functions +--> src.pipe(dest) | x are set up during |===================| x the .pipe method. | Event callbacks | +===============+ x |-------------------| | Your Data | x They exist outside | .on('close', cb) | +=======+=======+ x the data flow, but | .on('data', cb) | | x importantly attach | .on('drain', cb) | | x events, and their | .on('unpipe', cb) | +---------v---------+ x respective callbacks. | .on('error', cb) | | Readable Stream +----+ | .on('finish', cb) | +-^-------^-------^-+ | | .on('end', cb) | ^ | ^ | +-------------------+ | | | | | ^ | | ^ ^ ^ | +-------------------+ +=================+ ^ | ^ +----> Writable Stream +---------> .write(chunk) | | | | +-------------------+ +=======+=========+ | | | | | ^ | +------------------v---------+ ^ | +-> if (!chunk) | Is this chunk too big? | ^ | | emit .end(); | Is the queue busy? | | | +-> else +-------+----------------+---+ | ^ | emit .write(); | | | ^ ^ +--v---+ +---v---+ | | ^-----------------------------------< No | | Yes | ^ | +------+ +---v---+ ^ | | | ^ emit .pause(); +=================+ | | ^---------------^-----------------------+ return false; <-----+---+ | +=================+ | | | ^ when queue is empty +============+ | ^------------^-----------------------< Buffering | | | |============| | +> emit .drain(); | ^Buffer^ | | +> emit .resume(); +------------+ | | ^Buffer^ | | +------------+ add chunk to queue | | <---^---------------------< +============+ ログイン後にコピー 实现流在本节中我们来实现具体的流,通过实现流可以进一步加深对 Stream 内部工作细节的理解。 实现可读流上面我们都是通过 const { Readable } = require('stream'); class IeteratorReadableStream extends Readable { constructor(iterator) { super(); this.iterator = iterator; } _read() { let data = this.iterator.next(); // console.log(data); if(data.done) { this.push(null); } else { // 必须 push 字符串或者 Buffer this.push(data.value+''); } } } module.exports = IeteratorReadableStream; ログイン後にコピー 上述我们实现了一个可读流,可读流接收一个迭代器作为参数,这个迭代器作为这个可读流的数据源。可读流会自动的调用 所以可读流的执行逻辑为,每次调用 我们创建一个迭代器作为数据源传入 const IeteratorReadableStream = require('./IteratorReadableStream'); function *getData() { for(let i = 0; i < 5; i++) { yield i; } } let rs = new IeteratorReadableStream(getData()); rs.on('data', data => { console.log(data.toString()); }); rs.on('end', () => { console.log("迭代结束"); }); ログイン後にコピー 输出为 0 1 2 3 4 迭代结束 ログイン後にコピー 实现可写流实现可写流的过程同实现可读流的过程类似,首先需要继承 const fs = require('fs'); const { Writable } = require('stream'); class FileWritableStream extends Writable { constructor(filepath) { super(); this.filepath = filepath; } _write(chunk, encoding, callback) { fs.appendFile(this.filepath, chunk, { encoding }, callback) } } ログイン後にコピー 上面我们实现了一个可写流,这个可写流接收一个文件路径作为参数,它的作用就是向这个文件中追加数据,每次当我们调用可写流的 process.stdin.pipe(new FileWritableStream('c.txt')); ログイン後にコピー 上面这行代码的作用就是将从标准输入的字符输出到 实现双工流Duplex Stream 既可以作为可读流,也可以作为可写流,并且它的输入与输出之间可以没有关系。Duplex Stream 继承了 Readable,并且拥有 Writable 的所有,我们只要分别实现 const { Duplex } = require('stream'); class CustomDuplexStream extends Duplex { constructor() { super(); this.currentCharCode = 65; } _read() { if(this.currentCharCode <= 90) { this.push(String.fromCharCode(this.currentCharCode++)) } else { this.push(null); } } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } ログイン後にコピー 上面双工流的可读流部分就是将大写的 26 个字母添加进了缓存区,而可写流部分就是直接将数据输出到控制台。可见双工流可读流与可写流之间并没有任何的关系 const dp = new CustomDuplexStream(); dp.write("1"); dp.write("2"); dp.end(); dp.pipe(process.stdout); ログイン後にコピー 输出为 1 2 ABCDEFGHIJKLMNOPQRSTUVWXYZ ログイン後にコピー 实现转换流Tranform Stream 是 Duplex 的特例,它也是一个双工流,不过它的输入和输出之间有关联,它的内部通过 const { Transform } = require('stream'); class UpperTransformStream extends Transform { _transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); callback(); } } ログイン後にコピー 上面我们实现了一个转换流,它可以将输入的小写字符转化为大写字符然后输出 const ts = new UpperTransformStream(); const rs = fs.createReadStream('a.txt'); rs.pipe(ts).pipe(process.stdout); ログイン後にコピー 上面程序会读取 转换流在实际应用中还是比较多的,这里介绍一个 Node.js 内置的转换流 const zlib = require('zlib'); const fs = require('fs'); const args = process.argv.slice(2); const source = fs.createReadStream(args[0]); const target = fs.createWriteStream(args[1]); const gzip = zlib.createGzip(); source.pipe(gzip).pipe(target); ログイン後にコピー 我们可以通过 node gzip.js Graph.md Graph.md.gz ログイン後にコピー 来运行上面的程序,它可以将 Graph.md 使用 gzip 压缩为 Graph.md.gz。 文件大小从 同样的我们也可以通过 本篇文章到此结束,想必到这里已经对 Stream 的使用已经有所了解了,但是本篇文章并没有列举所有有关 Stream 的 API,如果想更加详细的了解 Stream 的 API,那么接下去就可以阅读官方文档(http://nodejs.cn/api/stream.html)了。 更多node相关知识,请访问:nodejs 教程!! 以上がNode.js でストリーム (読み取り可能、書き込み可能、二重および変換ストリーム) を実装する方法について話しましょう。の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。 このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
![]() ホットAIツール![]() Undresser.AI Undressリアルなヌード写真を作成する AI 搭載アプリ ![]() AI Clothes Remover写真から衣服を削除するオンライン AI ツール。 ![]() Undress AI Tool脱衣画像を無料で ![]() Clothoff.ioAI衣類リムーバー ![]() AI Hentai GeneratorAIヘンタイを無料で生成します。 ![]() 人気の記事
R.E.P.O.説明されたエネルギー結晶と彼らが何をするか(黄色のクリスタル)
2週間前
By 尊渡假赌尊渡假赌尊渡假赌
レポ:チームメイトを復活させる方法
4週間前
By 尊渡假赌尊渡假赌尊渡假赌
ハローキティアイランドアドベンチャー:巨大な種を手に入れる方法
4週間前
By 尊渡假赌尊渡假赌尊渡假赌
スプリットフィクションを打ち負かすのにどれくらい時間がかかりますか?
3週間前
By DDD
R.E.P.O.ファイルの保存場所:それはどこにあり、それを保護する方法は?
3週間前
By DDD
![]() ホットツール![]() メモ帳++7.3.1使いやすく無料のコードエディター ![]() SublimeText3 中国語版中国語版、とても使いやすい ![]() ゼンドスタジオ 13.0.1強力な PHP 統合開発環境 ![]() ドリームウィーバー CS6ビジュアル Web 開発ツール ![]() SublimeText3 Mac版神レベルのコード編集ソフト(SublimeText3) ![]() ホットトピック
Gmailメールのログイン入り口はどこですか?
![]() ![]()
Java チュートリアル
![]() ![]()
CakePHP チュートリアル
![]() ![]()
Laravel チュートリアル
![]() ![]()
PHP チュートリアル
![]() ![]() ![]() ノンブロッキングおよびイベント駆動に基づいて構築されたノード サービスには、メモリ消費量が少ないという利点があり、大量のネットワーク リクエストの処理に非常に適しています。大量のリクエストを前提として、「メモリ制御」に関する問題を考慮する必要があります。 1. V8 のガベージ コレクション メカニズムとメモリ制限 Js はガベージ コレクション マシンによって制御されます ![]() この記事では、NodeJS V8 エンジンのメモリとガベージ コレクター (GC) について詳しく説明します。 ![]() Node 19 が正式リリースされましたので、この記事では Node.js 19 の 6 つの主要な機能について詳しく説明します。 ![]() ファイル モジュールは、ファイルの読み取り/書き込み/開く/閉じる/削除の追加など、基礎となるファイル操作をカプセル化したものです。ファイル モジュールの最大の特徴は、すべてのメソッドが **同期** と ** の 2 つのバージョンを提供することです。 asynchronous**、sync サフィックスが付いているメソッドはすべて同期メソッドであり、持たないメソッドはすべて異種メソッドです。 ![]() ノード用の Docker イメージの選択は些細なことのように思えるかもしれませんが、イメージのサイズと潜在的な脆弱性は、CI/CD プロセスとセキュリティに大きな影響を与える可能性があります。では、最適な Node.js Docker イメージを選択するにはどうすればよいでしょうか? ![]() ノードが npm コマンドを使用できない理由は、環境変数が正しく設定されていないためです。解決策は次のとおりです: 1. 「システムのプロパティ」を開きます; 2. 「環境変数」->「システム変数」を見つけて、環境を編集します。変数; 3.nodejs フォルダーの場所を見つけます; 4.「OK」をクリックします。 ![]() Node.js はどのように GC (ガベージ コレクション) を行うのでしょうか?次の記事で詳しく説明します。 ![]() イベント ループは Node.js の基本的な部分であり、メイン スレッドがブロックされていないことを確認することで非同期プログラミングが可能になります。イベント ループを理解することは、効率的なアプリケーションを構築するために重要です。次の記事では、Node のイベント ループについて詳しく説明します。お役に立てれば幸いです。 ![]() |