ホームページ > ウェブフロントエンド > jsチュートリアル > Node.js の簡単な説明: ストリームについて

Node.js の簡単な説明: ストリームについて

高洛峰
リリース: 2016-12-28 13:22:31
オリジナル
1161 人が閲覧しました

Stream は、EventEmitter に基づく、node.js の抽象インターフェイスであり、ストリーミング データの処理に使用される Buffer の高度なカプセル化でもあります。ストリームモジュールは、ストリームを簡単に利用できるようにさまざまな API を提供します。

ストリームは次の 4 つのタイプに分類されます:

Readable、読み取り可能なストリーム

Writable、書き込み可能なストリーム

Duplex、読み書き可能なストリーム

Transform、拡張 Duplex、書き込まれたデータを変更可能

1. stream

読み取り可能なストリームは、一時停止とフローの 2 つのモードがある stream.Readable を通じて作成できます。

フロー モードでは、データはダウンストリーム システムから自動的に読み取られ、データ イベントを使用して出力されます。一時停止モードでは、stream.read() メソッドを明示的に呼び出してデータを読み取り、データ イベントをトリガーする必要があります。

すべての読み取り可能なストリームは、最初は一時停止モードにありますが、次のメソッドを通じてフロー モードに切り替えることができます:

「data」イベントをリッスンする

stream.resume() メソッドを呼び出す

stream.pipe( ) メソッドを使用して、データを書き込み可能なストリームに出力します。Writable

同様に、一時停止モードに切り替えることもできます。メソッドは 2 つあります:

パイプターゲットが設定されていない場合は、stream.pause() メソッドを呼び出します。

パイプターゲットが設定されている場合は、すべてのデータモニターを削除し、stream.unpipe()メソッドを呼び出す必要があります

Readableオブジェクトには_readableSateオブジェクトがあり、これを通じてストリームが現在どのモードにあるかを知ることができます。以下のように表示されます:

readable._readableState.flowing = null、データコンシューマーが存在せず、ストリームはデータを生成しません

readable._readableState.flowing = true、フローモードの場合

readable._readableState.flowing = false、in一時停止モード

データの取得にストリームを使用する理由

小さなファイルの場合は、fs.readFile() メソッドを使用してデータを読み取る方が便利ですが、数 G のファイルなどの大きなファイルを読み取る必要がある場合は、サイズが大きい場合、この方法を使用すると大量のメモリが消費され、プログラムがクラッシュすることもあります。この場合、処理にストリームを使用する方が適切です。セグメント化された読み取りを使用すると、メモリの「爆発」問題が発生しません。

data イベント

は、ストリームがデータ ブロックをコンシューマに提供するとき、または readable.read() メソッドが呼び出され、有効なデータ ブロックが存在するときにトリガーされます。以下:

const fs = require('fs');
 
const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
rs.on('data',(chunk)=>{
  chunkArr.push(chunk);
  chunkLen+=chunk.length;
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});
ログイン後にコピー

readableイベント

は、ストリーム内に読み取り可能なデータがある場合にトリガーされます。これは、新しい利用可能なデータとストリームの最後に到達した場合の2つのタイプに分けられます。以下に示すように、メソッドは利用可能なデータを返し、後者は null を返します。

const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
 
rs.on('readable',()=>{
  var chunk = null;
  //这里需要判断是否到了流的末尾
  if((chunk = rs.read()) !== null){
    chunkArr.push(chunk);
    chunkLen+=chunk.length;
  }
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});
ログイン後にコピー

一時停止メソッドと再開メソッド

stream.pause() メソッドはストリームを一時停止モードにし、「data」イベントの発生を停止します。 () メソッドは、ストリームをフロー モードに設定し、以下に示すように、すべてのデータの消費にも使用できる「data」イベント トリガーを復元します。メソッドは、書き込み可能なストリームを読み取り可能なストリームにバインドし、自動的にフロー モードに切り替え、すべてのデータを書き込み可能なストリームに出力し、データ損失の問題を引き起こすことなくデータ ストリームを適切に管理します。 使用方法は次のとおりです。では、複数の読み取り可能なストリームのデータ消費方法を紹介していますが、読み取り可能なストリームの場合は、そのうちの 1 つだけを選択するのが最善であり、pipe() メソッドを使用することをお勧めします。

2. 書き込み可能なストリーム

すべての書き込み可能なストリームは、stream.Writable クラスに基づいて作成されます。作成後、データをストリームに書き込むことができます。

write(chunk[, encoding][, callback]) メソッド

write() メソッドは、書き込み可能なストリームにデータを書き込みます。 パラメータの意味:

chunk、文字列またはバッファ


encoding (チャンクが String の場合)。チャンク

コールバックのエンコード、現在のチャンクデータがディスクに書き込まれるときのコールバック関数

このメソッドの戻り値はブール値です。 false の場合、書き込む必要があるデータブロックを意味します。がキャッシュされており、この時点でキャッシュのサイズが highWaterMark しきい値を超えています。それ以外の場合は true。

次のように使用します:

const rs = fs.createReadStream('./下载.png');
rs.on('data',(chunk)=>{
  console.log(`接收到${chunk.length}字节数据...`);
  rs.pause();
  console.log(`数据接收将暂停1.5秒.`);
  setTimeout(()=>{
    rs.resume();
  },1000);
});
rs.on('end',(chunk)=>{
  console.log(`数据接收完毕`);
});
ログイン後にコピー

バックプレッシャーメカニズム

書き込み可能なストリームの書き込み速度が読み取り可能なストリームの読み取り速度に追いつかない場合、書き込みメソッドによって追加されたデータはキャッシュされ、徐々に増加します。その結果、大量のスペースが占有されているメモリ。私たちが望んでいるのは、あるデータを消費してから別のデータを読み取って、メモリを同じレベルに維持することです。どうすればいいでしょうか?以下に示すように、write メソッドの戻り値を使用して、書き込み可能なストリームと「ドレイン」イベントのキャッシュ ステータスを判断し、時間に合わせて読み取り可能なストリームのモードを切り替えることができます。 Writable.write() メソッドが false を返すと、ドレイン イベントがトリガーされ、上記のバックプレッシャー メカニズムがすでにこのイベントを使用しています。

finish イベント


stream.end() メソッドが呼び出され、すべてのバッファ データがダウンストリーム システムに書き込まれた後、以下に示すように、このイベントがトリガーされます。 [, callback]) method

end() メソッドが呼び出されると、データを書き込むために stream.write() メソッドを呼び出すことができなくなり、エラーがスローされます。


3. 二重読み書きストリーム

二重ストリームは、Readable クラスと Writable クラスのインターフェイスを同時に実装し、読み取り可能なストリームと書き込み可能なストリームの両方です。たとえば、「zlib ストリーム」、「暗号ストリーム」、「TCP ソケット」などはすべて二重ストリームです。

4. 変換フロー

Duplex流的扩展,区别在于,Transform流自动将写入端的数据变换后添加到可读端。例如:'zlib streams'、'crypto streams'等都是Transform流。

5、四种流的实现

stream模块提供的API可以让我们很简单的实现流,该模块使用require('stream')引用,我们只要继承四种流中的一个基类(stream.Writable, stream.Readable, stream.Duplex, or stream.Transform),然后实现它的接口就可以了,需要实现的接口如下所示:

| Use-case | Class | Method(s) to implement |
| ------------- |-------------| -----|
| Reading only | Readable | _read |
| Writing only | Writable | _write, _writev |
| Reading and writing | Duplex | _read, _write, _writev |
| Operate on written data, then read the result | Transform | _transform, _flush |

Readable流实现

如上所示,我们只要继承Readable类并实现_read接口即可,,如下所示:

const Readable = require('stream').Readable;
const util = require('util');
const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split();
/*function AbReadable(){
  if(!this instanceof AbReadable){
    return new AbReadable();
  }
  Readable.call(this);
}
util.inherits(AbReadable,Readable);
AbReadable.prototype._read = function(){
  if(!alphabetArr.length){
    this.push(null);
  }else{
    this.push(alphabetArr.shift());
  }
};
 
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*class AbReadable extends Readable{
  constructor(){
    super();
  }
  _read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
}
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*const abReadable = new Readable({
  read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
});
abReadable.pipe(process.stdout);*/
 
const abReadable = Readable();
abReadable._read = function(){
  if (!alphabetArr.length) {
    this.push(null);
  } else {
    this.push(alphabetArr.shift());
  }
}
abReadable.pipe(process.stdout);
ログイン後にコピー

以上代码使用了四种方法创建一个Readable可读流,必须实现_read()方法,以及用到了readable.push()方法,该方法的作用是将指定的数据添加到读取队列。

Writable流实现

我们只要继承Writable类并实现_write或_writev接口,如下所示(只使用两种方法):

/*class MyWritable extends Writable{
  constructor(){
    super();
  }
  _write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
}
const myWritable = new MyWritable();*/
const myWritable = new Writable({
  write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
});
myWritable.on('finish',()=>{
  process.stdout.write('done');
})
myWritable.write('a');
myWritable.write('b');
myWritable.write('c');
myWritable.end();
ログイン後にコピー

Duplex流实现

实现Duplex流,需要继承Duplex类,并实现_read和_write接口,如下所示:

class MyDuplex extends Duplex{
  constructor(){
    super();
    this.source = [];
  }
  _read(){
    if (!this.source.length) {
      this.push(null);
    } else {
      this.push(this.source.shift());
    }
  }
  _write(chunk,encoding,cb){
    this.source.push(chunk);
    cb();
  }
}
 
const myDuplex = new MyDuplex();
myDuplex.on('finish',()=>{
  process.stdout.write('write done.')
});
myDuplex.on('end',()=>{
  process.stdout.write('read done.')
});
myDuplex.write('\na\n');
myDuplex.write('c\n');
myDuplex.end('b\n');
myDuplex.pipe(process.stdout);
ログイン後にコピー

上面的代码实现了_read()方法,可作为可读流来使用,同时实现了_write()方法,又可作为可写流来使用。

Transform流实现

实现Transform流,需要继承Transform类,并实现_transform接口,如下所示:

class MyTransform extends Transform{
  constructor(){
    super();
  }
  _transform(chunk, encoding, callback){
    chunk = (chunk+'').toUpperCase();
    callback(null,chunk);
  }
}
const myTransform = new MyTransform();
myTransform.write('hello world!');
myTransform.end();
myTransform.pipe(process.stdout);
ログイン後にコピー

上面代码中的_transform()方法,其第一个参数,要么为error,要么为null,第二个参数将被自动转发给readable.push()方法,因此该方法也可以使用如下写法:

_transform(chunk, encoding, callback){
  chunk = (chunk+'').toUpperCase()
  this.push(chunk)
  callback();
}
ログイン後にコピー

Object Mode流实现

我们知道流中的数据默认都是Buffer类型,可读流的数据进入流中便被转换成buffer,然后被消耗,可写流写入数据时,底层调用也将其转化为buffer。但将构造函数的objectMode选择设置为true,便可产生原样的数据,如下所示:

const rs = Readable();
rs.push('a');
rs.push('b');
rs.push(null);
rs.on(&#39;data&#39;,(chunk)=>{console.log(chunk);});//<Buffer 61>与<Buffer 62>
 
const rs1 = Readable({objectMode:!0});
rs1.push(&#39;a&#39;);
rs1.push(&#39;b&#39;);
rs1.push(null);
rs1.on(&#39;data&#39;,(chunk)=>{console.log(chunk);});//a与b
ログイン後にコピー

下面利用Transform流实现一个简单的CSS压缩工具,如下所示:

function minify(src,dest){
  const transform = new Transform({
    transform(chunk,encoding,cb){
      cb(null,(chunk.toString()).replace(/[\s\r\n\t]/g,&#39;&#39;));
    }
  });
  fs.createReadStream(src,{encoding:&#39;utf8&#39;}).pipe(transform).pipe(fs.createWriteStream(dest));
}
minify(&#39;./reset.css&#39;,&#39;./reset.min.css&#39;);
ログイン後にコピー

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持PHP中文网。

更多浅谈Node.js:理解stream相关文章请关注PHP中文网!

関連ラベル:
ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート