Un flux lisible est un flux qui produit des données pour la consommation du programme. Les méthodes courantes de production de données incluent la lecture de fichiers disque, la lecture du contenu des requêtes réseau, etc. Jetez un œil à l'exemple précédent de ce qu'est un flux :
const rs = fs.createReadStream(filePath);
Copier après la connexion
rs est un flux lisible, et sa méthode de production de données consiste à read Le fichier sur le disque, la console process.stdin est également un flux lisible :
process.stdin.pipe(process.stdout);
Copier après la connexion
Vous pouvez imprimer l'entrée de la console avec une phrase simple. La façon dont process.stdin produit des données est de lire le. entrée de l'utilisateur dans l'entrée de la console.
Retour sur la définition des flux lisibles :
Les flux lisibles sont des flux qui produisent des données destinées à la consommation du programme.
Flux lisible personnalisé
En plus du système fourni
fs.CreateReadStream
Gulp ou vinyle utilisé La méthode src fournie par -fs utilise également des flux lisibles
gulp.src(['*.js', 'dist/**/*.scss'])
Copier après la connexion
Si vous souhaitez produire des données d'une manière spécifique et les transmettre au programme pour consommation, par où commencer ?
Juste deux étapes simples :
- Héritez la classe
Readable
- du module sream _read, appelez this.push pour mettre les données produites dans la file d'attente à lire
La classe lisible a mis La plupart du travail à effectuer par le flux lisible est terminé. Il vous suffit d'en hériter, puis d'écrire la méthode de production de données dans la méthode _read pour implémenter un flux lisible personnalisé.
Par exemple : implémenter un flux qui génère un nombre aléatoire toutes les 100 millisecondes (pas très utile)
const Readable = require('stream').Readable;
class RandomNumberStream extends Readable {
constructor(max) {
super()
}
_read() {
const ctx = this;
setTimeout(() => {
const randomNumber = parseInt(Math.random() * 10000);
// 只能 push 字符串或 Buffer,为了方便显示打一个回车
ctx.push(`${randomNumber}\n`);
}, 100);
}
}
module.exports = RandomNumberStream;
Copier après la connexion
La partie héritage de classe du code est très simple, regardez principalement l'implémentation du Méthode _read, il y en a plusieurs Un point remarquable
La classe Readable a une implémentation par défaut de la méthode _read, mais rien n'est fait. Ce que nous faisons est de remplacer et de réécrire le - _read. méthode qui a un paramètre size , utilisé pour spécifier la quantité de données qui doivent être lues et renvoyées à la méthode read, mais ce n'est qu'une donnée de référence. De nombreuses implémentations ignorent ce paramètre, et nous l'ignorons ici. Nous le mentionnerons en détail plus tard.
- grâce à this.push vers le tampon Push data, la notion de tampon sera évoquée plus tard. Pour l'instant, il est entendu qu'il est pressé dans la conduite d'eau et peut être consommé
- Le. le contenu de push ne peut être qu'une chaîne ou un tampon, pas un nombre
- La méthode push a Le deuxième paramètre d'encodage est utilisé pour spécifier l'encodage lorsque le premier paramètre est une chaîne
-
Exécuter pour voir l'effet
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream();
rns.pipe(process.stdout);
Copier après la connexion
De cette façon, vous pouvez voir que les nombres sont affichés en continu dans le contrôle. Sur la plateforme, un flux lisible qui génère des nombres aléatoires a été implémenté. Il y a encore quelques petits problèmes. à résoudre
Comment arrêter
Pousser un numéro dans le tampon toutes les 100 millisecondes, puis Par exemple, lors de la lecture d'un fichier local, c'est toujours terminé Comment arrêter et indiquer que le. les données ont été lues ?
Envoyez simplement une valeur nulle dans le tampon. Modifiez le code pour permettre aux consommateurs de définir le nombre de nombres aléatoires nécessaires :
const Readable = require('stream').Readable;
class RandomNumberStream extends Readable {
constructor(max) {
super()
this.max = max;
}
_read() {
const ctx = this;
setTimeout(() => {
if (ctx.max) {
const randomNumber = parseInt(Math.random() * 10000);
// 只能 push 字符串或 Buffer,为了方便显示打一个回车
ctx.push(`${randomNumber}\n`);
ctx.max -= 1;
} else {
ctx.push(null);
}
}, 100);
}
}
module.exports = RandomNumberStream;
Copier après la connexion
Le code utilise un identifiant maximum pour permettre aux consommateurs de spécifier les besoins. Le nombre de caractères peut être spécifié lors de l'instanciation
const RandomNumberStream = require('./');
const rns = new RandomNumberStream(5);
rns.pipe(process.stdout);
Copier après la connexion
De cette façon, vous pouvez voir que la console n'imprime que 5 caractères
Pourquoi setTimeout au lieu de setInterval
Étudiants prudents Vous avez peut-être remarqué que la génération d'un nombre aléatoire toutes les 100 millisecondes n'appelle pas setInterval, mais utilise setTimeout. Pourquoi est-il simplement retardé et non répété, mais le résultat est correct ?
Cela nécessite de comprendre les deux façons dont fonctionnent les flux
Mode fluide : les données sont lues par le système sous-jacent et fournies à l'application le plus rapidement possible - Pause mode : La méthode read() doit être appelée explicitement pour lire plusieurs blocs de données.
-
Le flux est en mode pause par défaut, ce qui signifie que le programme doit appeler explicitement la méthode read(). dans l'exemple ci-dessus, les données peuvent être obtenues sans appel, car le flux passe en mode flux via la méthode pipe(), donc la méthode _read() sera automatiquement appelée à plusieurs reprises jusqu'à ce que les données soient lues, donc à chaque fois _read( ) La méthode n'a besoin de lire les données qu'une seule fois
Basculement entre le mode flux et le mode pause
Les méthodes suivantes peuvent être utilisées pour faire passer le flux du mode pause par défaut au mode flux :
Démarrez la surveillance des données en ajoutant un écouteur d'événements de données- Appelez la méthode curriculum vitae() pour démarrer le flux de données
- Appelez la méthode pipe() pour transférer les données vers un autre flux inscriptible
-
Il existe deux manières de passer du mode flux au mode pause :
Lorsque le flux n'a pas de pipe(), appeler la méthode pause() peut mettre le flux en pause- Supprimer tous les écouteurs d'événements de données lorsque pipe(), puis appeler la méthode unpipe()
-
événement de données
Après avoir utilisé le pipe( ), les données seront diffusées à partir du flux lisible. Il est entré dans un flux inscriptible, mais cela semble être une boîte noire pour l'utilisateur. Comment les données circulent-elles ? Il existe deux termes importants lors du basculement entre le mode flux et le mode pause
événement de données correspondant au mode flux- méthode read() correspondant au mode pause
-
Ces deux mécanismes sont la raison pour laquelle le programme peut piloter le flux de données. Examinons d'abord l'événement de données en mode flux. Une fois l'événement de données du flux lisible surveillé, le flux entre en mode flux. qui appelle le flux ci-dessus
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);
rns.on('data', chunk => {
console.log(chunk);
});
Copier après la connexion
这样可以看到控制台打印出了类似下面的结果
<Buffer 39 35 37 0a>
<Buffer 31 30 35 37 0a>
<Buffer 38 35 31 30 0a>
<Buffer 33 30 35 35 0a>
<Buffer 34 36 34 32 0a>
Copier après la connexion
当可读流生产出可供消费的数据后就会触发 data 事件,data 事件监听器绑定后,数据会被尽可能地传递。data 事件的监听器可以在第一个参数收到可读流传递过来的 Buffer 数据,这也就是控制台打印的 chunk,如果想显示为数字,可以调用 Buffer 的 toString() 方法
当数据处理完成后还会触发一个
end
事件,因为流的处理不是同步调用,所以如果希望完事后做一些事情就需要监听这个事件,在代码最后追加一句:
rns.on('end', () => {
console.log('done');
});复制代码
Copier après la connexion
这样可以在数据接收完了显示 done
,当然数据处理过程中出现了错误会触发 error 事件,可以监听做异常处理:
rns.on('error', (err) => {
console.log(err);
});复制代码
Copier après la connexion
read(size)
流在暂停模式下需要程序显式调用 read() 方法才能得到数据,read() 方法会从内部缓冲区中拉取并返回若干数据,当没有更多可用数据时,会返回null
使用 read() 方法读取数据时,如果传入了 size 参数,那么它会返回指定字节的数据;当指定的size字节不可用时,则返回null。如果没有指定size参数,那么会返回内部缓冲区中的所有数据
现在有一个矛盾,在流动模式下流生产出了数据,然后触发 data 事件通知给程序,这样很方便。在暂停模式下需要程序去读取,那么就有一种可能是读取的时候还没生产好,如果使用轮询的方式未免效率有些低
NodeJS 提供了一个
readable的事件,事件在可读流准备好数据的时候触发,也就是先监听这个事件,收到通知有数据了再去读取就好了:
const rns = new RandomNumberStream(5);
rns.on('readable', () => {
let chunk;
while((chunk = rns.read()) !== null){
console.log(chunk);
}
});
Copier après la connexion
这样可以读取到数据,值得注意的一点是并不是每次调用 read() 方法都可以返回数据,前面提到了如果可用的数据没有达到 size 那么返回 null,所以在程序中加了个判断
数据会不会漏掉
const stream = fs.createReadStream('/dev/input/event0');
stream.on('readable', callback);复制代码
Copier après la connexion
在流动模式会不会有这样的问题:可读流在创建好的时候就生产数据了,如果在绑定 readable 事件之前就生产了某些数据,触发了 readable 事件,在极端情况下会造成数据丢失吗?
事实并不会,按照 NodeJS event loop 程序创建流和调用事件监听在一个事件队列里面,生产数据和事件监听都是异步操作,而 on 监听事件使用了 process.nextTick
会保证在数据生产之前被绑定好,相关知识可以看定时器章节中对 event loop 的解读
到这里可能对 data事件、readable事件触发时机, read() 方法每次读多少数据,什么时候返回 null 还有一定的疑问,在后续可写流章节会在 back pressure 部分结合源码介绍相关机制
推荐教程:《JS教程》
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!