Maison > interface Web > js tutoriel > le corps du texte

Une brève discussion sur Node.js : comprendre les flux

高洛峰
Libérer: 2016-12-28 13:22:31
original
1128 Les gens l'ont consulté

Stream est une interface abstraite dans node.js, basée sur EventEmitter, et est également une encapsulation avancée de Buffer, utilisée pour traiter les données en streaming. Le module stream fournit diverses API afin que nous puissions utiliser Stream facilement. Les flux

sont divisés en quatre types, comme suit :

Flux lisible et lisible

Flux inscriptible et inscriptible

Duplex, lecture et écriture stream

Transform, Duplex étendu, peut modifier les données écrites

1 Flux lisible lisible

Un flux lisible peut être créé via stream.Readable, il a deux modes : faites une pause et coulez.

En mode flux, les données seront automatiquement lues à partir du système en aval et sorties à l'aide de l'événement de données ; en mode pause, la méthode stream.read() doit être explicitement appelée pour lire les données et déclencher. l'événement de données.

Tous les flux lisibles sont initialement en mode pause et peuvent être basculés en mode flux via les méthodes suivantes :

Écoutez l'événement « données »

Flux d'appel .Méthode curriculum vitae()

Appelez la méthode stream.pipe() pour générer des données dans un flux inscriptible. Writable

De même, vous pouvez également passer en mode pause :

Si la cible du canal n'est pas définie, appelez simplement la méthode stream.pause().

Si la cible du canal est définie, vous devez supprimer tous les écouteurs de données et appeler la méthode stream.unpipe()

Il y a un objet _readableSate dans l'objet Readable, à travers lequel vous pouvez obtenir Sachez dans quel mode se trouve actuellement le flux, comme indiqué ci-dessous :

readable._readableState.flowing = null, il n'y a pas de consommateur de données, le flux ne produit pas de données

readable._readableState.flowing = vrai, c'est le mode fluide

readable._readableState.flowing = false, en mode pause

Pourquoi utiliser le streaming pour obtenir des données

Pour les petits fichiers, utilisez la méthode fs.readFile() pour lire Il est plus pratique de récupérer des données, mais lorsque vous avez besoin de lire des fichiers volumineux, tels que des fichiers de plusieurs Go, l'utilisation de cette méthode consommera beaucoup de mémoire et entraînera même la lecture du programme. accident. Dans ce cas, il est plus approprié d'utiliser des flux pour le traitement. L'utilisation de la lecture segmentée ne provoquera pas de problème « d'explosion » de mémoire.

l'événement de données

est déclenché lorsque le flux fournit des blocs de données aux consommateurs. Cela peut se produire lors du passage en mode flux, ou lorsque la méthode readable.read() est appelée et qu'il y en a. Lorsque des blocs de données valides sont utilisés, utilisez ce qui suit :

const fs = require('fs');
 
const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
rs.on('data',(chunk)=>{
  chunkArr.push(chunk);
  chunkLen+=chunk.length;
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});
Copier après la connexion

l'événement lisible

est déclenché lorsqu'il y a des données disponibles dans le flux qui peuvent être lues. deux types, nouveau de données disponibles et atteint la fin du flux, l'ancienne méthode stream.read() renvoie les données disponibles, et la seconde renvoie null, comme indiqué ci-dessous :

const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
 
rs.on('readable',()=>{
  var chunk = null;
  //这里需要判断是否到了流的末尾
  if((chunk = rs.read()) !== null){
    chunkArr.push(chunk);
    chunkLen+=chunk.length;
  }
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});
Copier après la connexion

pause et reprise méthodes

stream La méthode .pause() met le flux en mode pause et arrête le déclenchement de l'événement 'data'. La méthode stream.resume() met le flux en mode flow et reprend le déclenchement de l'événement 'data'. . Il peut également être utilisé pour consommer toutes les données, comme indiqué ci-dessous :

const rs = fs.createReadStream('./下载.png');
rs.on('data',(chunk)=>{
  console.log(`接收到${chunk.length}字节数据...`);
  rs.pause();
  console.log(`数据接收将暂停1.5秒.`);
  setTimeout(()=>{
    rs.resume();
  },1000);
});
rs.on('end',(chunk)=>{
  console.log(`数据接收完毕`);
});
Copier après la connexion

méthode pipe(destination[, options])

la méthode pipe() lie un flux inscriptible à un flux lisible. flux et passe automatiquement en mode flux, toutes les données sont sorties vers un flux inscriptible et le flux de données est bien géré afin qu'aucune perte de données ne se produise :

const rs = fs.createReadStream('./app.js');
rs.pipe(process.stdout);
Copier après la connexion

Ce qui précède présente le. consommation de données de plusieurs méthodes de flux lisibles, mais pour un flux lisible, il est préférable de n'en choisir qu'une seule, et il est recommandé d'utiliser la méthode pipe().

2. Flux inscriptible

Tous les flux inscriptibles sont créés en fonction de la classe stream.Writable Après la création, les données peuvent être écrites dans le flux.

méthode write(chunk[, encoding][, callback])

la méthode write() écrit des données dans un flux accessible en écriture :

chunk, string ou buffer

encodage, si le chunk est une chaîne, c'est l'encodage du chunk

callback, la fonction de rappel lorsque les données actuelles du chunk sont écrites sur le disque

La valeur de retour de cette méthode est une valeur booléenne. Si elle est fausse, cela signifie que le bloc de données qui doit être écrit est mis en cache et que la taille du cache dépasse le seuil highWaterMark à ce moment-là, sinon c'est vrai. .

Utilisez ce qui suit :

const ws = fs.createWriteStream('./test.txt');
ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');});
ws.end('done.')
Copier après la connexion

Mécanisme de contre-pression

Si la vitesse d'écriture du flux inscriptible ne peut pas suivre la vitesse de lecture du flux lisible, les données ajoutées par la méthode d'écriture seront mises en cache et augmenteront progressivement, entraînant l'occupation d'une grande quantité de mémoire. Ce que nous espérons, c'est consommer une donnée puis lire une autre donnée, afin que la mémoire soit maintenue au même niveau. Comment faire cela ? Vous pouvez utiliser la valeur de retour de la méthode write pour déterminer l'état du cache du flux inscriptible et l'événement « drain », et changer le mode du flux lisible dans le temps, comme indiqué ci-dessous :

function copy(src,dest){
  src = path.resolve(src);
  dest = path.resolve(dest);
  const rs = fs.createReadStream(src);
  const ws = fs.createWriteStream(dest);
  console.log('正在复制中...');
  const stime = +new Date();
  rs.on('data',(chunk)=>{
    if(null === ws.write(chunk)){
      rs.pause();
    }
  });
  ws.on('drain',()=>{
    rs.resume();
  });
  rs.on('end',()=>{
    const etime = +new Date();
    console.log(`已完成,用时:${(etime-stime)/1000}秒`);
    ws.end();
  });
  function calcProgress(){
     
  }
}
copy('./CSS权威指南 第3版.pdf','./javascript.pdf');
Copier après la connexion

événement drain

Si la méthode Writable.write() renvoie false, l'événement drain sera déclenché, qui a été utilisé par le mécanisme de contre-pression ci-dessus.

événement de fin

Une fois la méthode stream.end() appelée et toutes les données du tampon écrites dans le système en aval, cet événement sera déclenché, comme indiqué ci-dessous :

const ws = fs.createWriteStream('./alphabet.txt');
const alphabetStr = 'abcdefghijklmnopqrstuvwxyz';
ws.on('finish',()=>{
  console.log('done.');
});
for(let letter of alphabetStr.split()){
  ws.write(letter);
}
ws.end();//必须调用
Copier après la connexion

Méthode end([chunk][, encoding][, callback])

Une fois la méthode

end() appelée, la méthode stream.write() ne peut plus être appelée pour écrire Les données responsables généreront l'erreur.

3. Flux de lecture et d'écriture duplex

Le flux duplex implémente les interfaces des classes Readable et Writable en même temps, et est à la fois un flux lisible et un flux inscriptible. Par exemple, les « flux zlib », les « flux crypto », les « sockets TCP », etc. sont tous des flux Duplex.

4. Transformer le flux

Duplex流的扩展,区别在于,Transform流自动将写入端的数据变换后添加到可读端。例如:'zlib streams'、'crypto streams'等都是Transform流。

5、四种流的实现

stream模块提供的API可以让我们很简单的实现流,该模块使用require('stream')引用,我们只要继承四种流中的一个基类(stream.Writable, stream.Readable, stream.Duplex, or stream.Transform),然后实现它的接口就可以了,需要实现的接口如下所示:

| Use-case | Class | Method(s) to implement |
| ------------- |-------------| -----|
| Reading only | Readable | _read |
| Writing only | Writable | _write, _writev |
| Reading and writing | Duplex | _read, _write, _writev |
| Operate on written data, then read the result | Transform | _transform, _flush |

Readable流实现

如上所示,我们只要继承Readable类并实现_read接口即可,,如下所示:

const Readable = require('stream').Readable;
const util = require('util');
const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split();
/*function AbReadable(){
  if(!this instanceof AbReadable){
    return new AbReadable();
  }
  Readable.call(this);
}
util.inherits(AbReadable,Readable);
AbReadable.prototype._read = function(){
  if(!alphabetArr.length){
    this.push(null);
  }else{
    this.push(alphabetArr.shift());
  }
};
 
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*class AbReadable extends Readable{
  constructor(){
    super();
  }
  _read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
}
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*const abReadable = new Readable({
  read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
});
abReadable.pipe(process.stdout);*/
 
const abReadable = Readable();
abReadable._read = function(){
  if (!alphabetArr.length) {
    this.push(null);
  } else {
    this.push(alphabetArr.shift());
  }
}
abReadable.pipe(process.stdout);
Copier après la connexion

以上代码使用了四种方法创建一个Readable可读流,必须实现_read()方法,以及用到了readable.push()方法,该方法的作用是将指定的数据添加到读取队列。

Writable流实现

我们只要继承Writable类并实现_write或_writev接口,如下所示(只使用两种方法):

/*class MyWritable extends Writable{
  constructor(){
    super();
  }
  _write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
}
const myWritable = new MyWritable();*/
const myWritable = new Writable({
  write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
});
myWritable.on('finish',()=>{
  process.stdout.write('done');
})
myWritable.write('a');
myWritable.write('b');
myWritable.write('c');
myWritable.end();
Copier après la connexion

Duplex流实现

实现Duplex流,需要继承Duplex类,并实现_read和_write接口,如下所示:

class MyDuplex extends Duplex{
  constructor(){
    super();
    this.source = [];
  }
  _read(){
    if (!this.source.length) {
      this.push(null);
    } else {
      this.push(this.source.shift());
    }
  }
  _write(chunk,encoding,cb){
    this.source.push(chunk);
    cb();
  }
}
 
const myDuplex = new MyDuplex();
myDuplex.on('finish',()=>{
  process.stdout.write('write done.')
});
myDuplex.on('end',()=>{
  process.stdout.write('read done.')
});
myDuplex.write('\na\n');
myDuplex.write('c\n');
myDuplex.end('b\n');
myDuplex.pipe(process.stdout);
Copier après la connexion

上面的代码实现了_read()方法,可作为可读流来使用,同时实现了_write()方法,又可作为可写流来使用。

Transform流实现

实现Transform流,需要继承Transform类,并实现_transform接口,如下所示:

class MyTransform extends Transform{
  constructor(){
    super();
  }
  _transform(chunk, encoding, callback){
    chunk = (chunk+'').toUpperCase();
    callback(null,chunk);
  }
}
const myTransform = new MyTransform();
myTransform.write('hello world!');
myTransform.end();
myTransform.pipe(process.stdout);
Copier après la connexion

上面代码中的_transform()方法,其第一个参数,要么为error,要么为null,第二个参数将被自动转发给readable.push()方法,因此该方法也可以使用如下写法:

_transform(chunk, encoding, callback){
  chunk = (chunk+'').toUpperCase()
  this.push(chunk)
  callback();
}
Copier après la connexion

Object Mode流实现

我们知道流中的数据默认都是Buffer类型,可读流的数据进入流中便被转换成buffer,然后被消耗,可写流写入数据时,底层调用也将其转化为buffer。但将构造函数的objectMode选择设置为true,便可产生原样的数据,如下所示:

const rs = Readable();
rs.push('a');
rs.push('b');
rs.push(null);
rs.on(&#39;data&#39;,(chunk)=>{console.log(chunk);});//<Buffer 61>与<Buffer 62>
 
const rs1 = Readable({objectMode:!0});
rs1.push(&#39;a&#39;);
rs1.push(&#39;b&#39;);
rs1.push(null);
rs1.on(&#39;data&#39;,(chunk)=>{console.log(chunk);});//a与b
Copier après la connexion

下面利用Transform流实现一个简单的CSS压缩工具,如下所示:

function minify(src,dest){
  const transform = new Transform({
    transform(chunk,encoding,cb){
      cb(null,(chunk.toString()).replace(/[\s\r\n\t]/g,&#39;&#39;));
    }
  });
  fs.createReadStream(src,{encoding:&#39;utf8&#39;}).pipe(transform).pipe(fs.createWriteStream(dest));
}
minify(&#39;./reset.css&#39;,&#39;./reset.min.css&#39;);
Copier après la connexion

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持PHP中文网。

更多浅谈Node.js:理解stream相关文章请关注PHP中文网!

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal
À propos de nous Clause de non-responsabilité Sitemap
Site Web PHP chinois:Formation PHP en ligne sur le bien-être public,Aidez les apprenants PHP à grandir rapidement!