Heim > Web-Frontend > js-Tutorial > Eine kurze Diskussion über Node.js: Streams verstehen

Eine kurze Diskussion über Node.js: Streams verstehen

高洛峰
Freigeben: 2016-12-28 13:22:31
Original
1158 Leute haben es durchsucht

Stream ist eine abstrakte Schnittstelle in node.js, die auf EventEmitter basiert, und außerdem eine erweiterte Kapselung von Buffer, die zur Verarbeitung von Streaming-Daten verwendet wird. Das Stream-Modul stellt verschiedene APIs bereit, damit wir Stream einfach verwenden können.

Streams werden wie folgt in vier Typen unterteilt:

Lesbarer, lesbarer Stream

Beschreibbarer, beschreibbarer Stream

Duplex, Lesen und Schreiben stream

Transformation, erweitertes Duplex, kann die geschriebenen Daten ändern

1. Lesbarer lesbarer Stream

Ein lesbarer Stream kann über stream.Readable erstellt werden. Es gibt zwei Modi: innehalten und fließen.

Im Flow-Modus werden Daten automatisch vom Downstream-System gelesen und mithilfe des Datenereignisses ausgegeben. Im Pause-Modus muss die Methode stream.read() explizit aufgerufen werden, um die Daten zu lesen und auszulösen das Datenereignis.

Alle lesbaren Streams befinden sich zunächst im Pausenmodus und können mit den folgenden Methoden in den Fließmodus geschaltet werden:

Das Ereignis „Daten“ abhören

Call-Stream . resume()-Methode

Rufen Sie die stream.pipe()-Methode auf, um Daten in einen beschreibbaren Stream auszugeben.

Ähnlich können Sie auch in den Pausenmodus wechseln:

Wenn das Pipe-Ziel nicht festgelegt ist, rufen Sie einfach die Methode stream.pause() auf.

Wenn das Pipe-Ziel festgelegt ist, müssen Sie alle Daten-Listener entfernen und die Methode stream.unpipe() aufrufen.

Im Readable-Objekt befindet sich ein _readableSate-Objekt, über das Sie darauf zugreifen können Erfahren Sie, in welchem ​​Modus sich der Stream derzeit befindet, wie unten gezeigt:

readable._readableState.flowing = null, es gibt keinen Datenkonsumenten, der Stream erzeugt keine Daten

readable._readableState.flowing = wahr, es ist Fließmodus

readable._readableState.flowing = false, im Pausenmodus

Warum Streaming zum Abrufen von Daten verwenden

Für kleine Dateien verwenden Die Methode fs.readFile() zum Lesen ist bequemer, um Daten abzurufen. Wenn Sie jedoch große Dateien lesen müssen, z. B. Dateien mit einer Größe von mehreren G, verbraucht die Verwendung dieser Methode viel Speicher und führt sogar dazu, dass das Programm dies tut Absturz. In diesem Fall ist es sinnvoller, Streams für die Verarbeitung zu verwenden. Die Verwendung segmentierter Lesevorgänge führt nicht zu dem Problem der Speicherexplosion.

Datenereignis

wird ausgelöst, wenn der Stream Datenblöcke für Verbraucher bereitstellt. Dies kann der Fall sein, wenn in den Flow-Modus gewechselt wird oder wenn die Methode readable.read() aufgerufen wird Wenn gültige Datenblöcke verwendet werden, verwenden Sie Folgendes:

const fs = require('fs');
 
const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
rs.on('data',(chunk)=>{
  chunkArr.push(chunk);
  chunkLen+=chunk.length;
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});
Nach dem Login kopieren

lesbares Ereignis

wird ausgelöst, wenn im Stream verfügbare Daten vorhanden sind, in die es unterteilt werden kann Es gibt zwei Arten: neue verfügbare Daten und Erreichen des Endes des Streams. Die erstere Methode stream.read() gibt die verfügbaren Daten zurück und die letztere gibt null zurück, wie unten gezeigt:

const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
 
rs.on('readable',()=>{
  var chunk = null;
  //这里需要判断是否到了流的末尾
  if((chunk = rs.read()) !== null){
    chunkArr.push(chunk);
    chunkLen+=chunk.length;
  }
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});
Nach dem Login kopieren

Pause und Fortsetzen Methoden

stream Die .pause()-Methode versetzt den Stream in den Pausenmodus und stoppt die Auslösung des „Daten“-Ereignisses. Die stream.resume()-Methode versetzt den Stream in den Fließmodus und setzt die Auslösung des „Daten“-Ereignisses fort . Es kann auch verwendet werden, um alle Daten zu verbrauchen, wie unten gezeigt:

const rs = fs.createReadStream('./下载.png');
rs.on('data',(chunk)=>{
  console.log(`接收到${chunk.length}字节数据...`);
  rs.pause();
  console.log(`数据接收将暂停1.5秒.`);
  setTimeout(()=>{
    rs.resume();
  },1000);
});
rs.on('end',(chunk)=>{
  console.log(`数据接收完毕`);
});
Nach dem Login kopieren

pipe(destination[, options])-Methode

pipe()-Methode bindet einen beschreibbaren Stream an einen lesbaren Stream und wechselt automatisch in den Flow-Modus, alle Daten werden in einen beschreibbaren Stream ausgegeben und der Datenstrom wird gut verwaltet, sodass kein Datenverlust auftritt. Die Verwendung ist wie folgt:

const rs = fs.createReadStream('./app.js');
rs.pipe(process.stdout);
Nach dem Login kopieren

Das Obige führt das ein Datenverbrauch mehrerer lesbarer Streams. Für einen lesbaren Stream ist es jedoch am besten, nur eine davon auszuwählen, und es wird empfohlen, die Methode „pipe()“ zu verwenden.

2. Beschreibbarer Stream

Alle beschreibbaren Streams werden basierend auf der Klasse stream.Writable erstellt. Nach der Erstellung können Daten in den Stream geschrieben werden.

write(chunk[, binding][, callback])-Methode

write()-Methode schreibt Daten in einen beschreibbaren Stream. Parameterbedeutung:

Chunk, String oder Puffer

Codierung, wenn der Chunk eine Zeichenfolge ist, ist es die Chunk-Codierung

Callback, die Callback-Funktion, wenn die aktuellen Chunk-Daten auf die Festplatte geschrieben werden

Der Rückgabewert dieser Methode ist ein boolescher Wert. Wenn er falsch ist, bedeutet dies, dass der Datenblock, der geschrieben werden muss, zwischengespeichert ist und die Cachegröße zu diesem Zeitpunkt den HighWaterMark-Schwellenwert überschreitet, andernfalls ist er wahr .

Verwenden Sie Folgendes:

const ws = fs.createWriteStream('./test.txt');
ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');});
ws.end('done.')
Nach dem Login kopieren

Gegendruckmechanismus

Wenn die Schreibgeschwindigkeit des beschreibbaren Streams nicht mit der Lesegeschwindigkeit des lesbaren Streams mithalten kann, Die durch die Schreibmethode hinzugefügten Daten werden zwischengespeichert und nehmen allmählich zu, was dazu führt, dass viel Speicher belegt wird. Wir hoffen, ein Datenelement zu verbrauchen und dann ein anderes Datenelement zu lesen, damit der Speicher auf dem gleichen Niveau bleibt. Wie geht das? Sie können den Rückgabewert der Schreibmethode verwenden, um den Cache-Status des beschreibbaren Streams und des „Drain“-Ereignisses zu ermitteln und den Modus des lesbaren Streams rechtzeitig umzuschalten, wie unten gezeigt:

function copy(src,dest){
  src = path.resolve(src);
  dest = path.resolve(dest);
  const rs = fs.createReadStream(src);
  const ws = fs.createWriteStream(dest);
  console.log('正在复制中...');
  const stime = +new Date();
  rs.on('data',(chunk)=>{
    if(null === ws.write(chunk)){
      rs.pause();
    }
  });
  ws.on('drain',()=>{
    rs.resume();
  });
  rs.on('end',()=>{
    const etime = +new Date();
    console.log(`已完成,用时:${(etime-stime)/1000}秒`);
    ws.end();
  });
  function calcProgress(){
     
  }
}
copy('./CSS权威指南 第3版.pdf','./javascript.pdf');
Nach dem Login kopieren

Drain-Ereignis

Wenn die Methode Writable.write() false zurückgibt, wird das Drain-Ereignis ausgelöst und der obige Gegendruckmechanismus hat dieses Ereignis verwendet.

Finish-Ereignis

Nachdem die stream.end()-Methode aufgerufen und alle Pufferdaten in das Downstream-System geschrieben wurden, wird dieses Ereignis ausgelöst, wie unten gezeigt:

const ws = fs.createWriteStream('./alphabet.txt');
const alphabetStr = 'abcdefghijklmnopqrstuvwxyz';
ws.on('finish',()=>{
  console.log('done.');
});
for(let letter of alphabetStr.split()){
  ws.write(letter);
}
ws.end();//必须调用
Nach dem Login kopieren

end([chunk][, binding][, callback]) method

Nachdem die

end()-Methode aufgerufen wurde, kann die stream.write()-Methode nicht mehr aufgerufen werden write Die verantwortlichen Daten lösen einen Fehler aus.

3. Duplex-Lese- und Schreibstream

Der Duplex-Stream implementiert gleichzeitig die Schnittstellen der lesbaren und beschreibbaren Klassen und ist sowohl ein lesbarer als auch ein beschreibbarer Stream. Beispielsweise sind „Zlib-Streams“, „Krypto-Streams“, „TCP-Sockets“ usw. allesamt Duplex-Streams.

4. Fluss transformieren

Duplex流的扩展,区别在于,Transform流自动将写入端的数据变换后添加到可读端。例如:'zlib streams'、'crypto streams'等都是Transform流。

5、四种流的实现

stream模块提供的API可以让我们很简单的实现流,该模块使用require('stream')引用,我们只要继承四种流中的一个基类(stream.Writable, stream.Readable, stream.Duplex, or stream.Transform),然后实现它的接口就可以了,需要实现的接口如下所示:

| Use-case | Class | Method(s) to implement |
| ------------- |-------------| -----|
| Reading only | Readable | _read |
| Writing only | Writable | _write, _writev |
| Reading and writing | Duplex | _read, _write, _writev |
| Operate on written data, then read the result | Transform | _transform, _flush |

Readable流实现

如上所示,我们只要继承Readable类并实现_read接口即可,,如下所示:

const Readable = require('stream').Readable;
const util = require('util');
const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split();
/*function AbReadable(){
  if(!this instanceof AbReadable){
    return new AbReadable();
  }
  Readable.call(this);
}
util.inherits(AbReadable,Readable);
AbReadable.prototype._read = function(){
  if(!alphabetArr.length){
    this.push(null);
  }else{
    this.push(alphabetArr.shift());
  }
};
 
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*class AbReadable extends Readable{
  constructor(){
    super();
  }
  _read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
}
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*const abReadable = new Readable({
  read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
});
abReadable.pipe(process.stdout);*/
 
const abReadable = Readable();
abReadable._read = function(){
  if (!alphabetArr.length) {
    this.push(null);
  } else {
    this.push(alphabetArr.shift());
  }
}
abReadable.pipe(process.stdout);
Nach dem Login kopieren

以上代码使用了四种方法创建一个Readable可读流,必须实现_read()方法,以及用到了readable.push()方法,该方法的作用是将指定的数据添加到读取队列。

Writable流实现

我们只要继承Writable类并实现_write或_writev接口,如下所示(只使用两种方法):

/*class MyWritable extends Writable{
  constructor(){
    super();
  }
  _write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
}
const myWritable = new MyWritable();*/
const myWritable = new Writable({
  write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
});
myWritable.on('finish',()=>{
  process.stdout.write('done');
})
myWritable.write('a');
myWritable.write('b');
myWritable.write('c');
myWritable.end();
Nach dem Login kopieren

Duplex流实现

实现Duplex流,需要继承Duplex类,并实现_read和_write接口,如下所示:

class MyDuplex extends Duplex{
  constructor(){
    super();
    this.source = [];
  }
  _read(){
    if (!this.source.length) {
      this.push(null);
    } else {
      this.push(this.source.shift());
    }
  }
  _write(chunk,encoding,cb){
    this.source.push(chunk);
    cb();
  }
}
 
const myDuplex = new MyDuplex();
myDuplex.on('finish',()=>{
  process.stdout.write('write done.')
});
myDuplex.on('end',()=>{
  process.stdout.write('read done.')
});
myDuplex.write('\na\n');
myDuplex.write('c\n');
myDuplex.end('b\n');
myDuplex.pipe(process.stdout);
Nach dem Login kopieren

上面的代码实现了_read()方法,可作为可读流来使用,同时实现了_write()方法,又可作为可写流来使用。

Transform流实现

实现Transform流,需要继承Transform类,并实现_transform接口,如下所示:

class MyTransform extends Transform{
  constructor(){
    super();
  }
  _transform(chunk, encoding, callback){
    chunk = (chunk+'').toUpperCase();
    callback(null,chunk);
  }
}
const myTransform = new MyTransform();
myTransform.write('hello world!');
myTransform.end();
myTransform.pipe(process.stdout);
Nach dem Login kopieren

上面代码中的_transform()方法,其第一个参数,要么为error,要么为null,第二个参数将被自动转发给readable.push()方法,因此该方法也可以使用如下写法:

_transform(chunk, encoding, callback){
  chunk = (chunk+'').toUpperCase()
  this.push(chunk)
  callback();
}
Nach dem Login kopieren

Object Mode流实现

我们知道流中的数据默认都是Buffer类型,可读流的数据进入流中便被转换成buffer,然后被消耗,可写流写入数据时,底层调用也将其转化为buffer。但将构造函数的objectMode选择设置为true,便可产生原样的数据,如下所示:

const rs = Readable();
rs.push('a');
rs.push('b');
rs.push(null);
rs.on(&#39;data&#39;,(chunk)=>{console.log(chunk);});//<Buffer 61>与<Buffer 62>
 
const rs1 = Readable({objectMode:!0});
rs1.push(&#39;a&#39;);
rs1.push(&#39;b&#39;);
rs1.push(null);
rs1.on(&#39;data&#39;,(chunk)=>{console.log(chunk);});//a与b
Nach dem Login kopieren

下面利用Transform流实现一个简单的CSS压缩工具,如下所示:

function minify(src,dest){
  const transform = new Transform({
    transform(chunk,encoding,cb){
      cb(null,(chunk.toString()).replace(/[\s\r\n\t]/g,&#39;&#39;));
    }
  });
  fs.createReadStream(src,{encoding:&#39;utf8&#39;}).pipe(transform).pipe(fs.createWriteStream(dest));
}
minify(&#39;./reset.css&#39;,&#39;./reset.min.css&#39;);
Nach dem Login kopieren

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持PHP中文网。

更多浅谈Node.js:理解stream相关文章请关注PHP中文网!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage