Qu'est-ce qu'un flux ? Comment comprendre les flux dans Nodejs
青灯夜游
Libérer: 2022-05-16 20:56:23
avant
2201 Les gens l'ont consulté
Qu'est-ce que le flux ? Comment comprendre le flux ? L'article suivant vous donnera une compréhension approfondie du Stream dans Node J'espère qu'il vous sera utile !
L'auteur a souvent utilisé la fonction pipe dans le développement récemment. Je sais seulement qu'il s'agit d'un pipeline pour les flux, mais je ne sais pas comment cela fonctionne. Donc, avec la mentalité de découvrir, je pars simplement de. le flux et apprenez-le avec désinvolture. Les connaissances et le code source que j'ai lu sont compilés dans un article à partager avec tout le monde.
Stream est un concept très basique dans Nodejs De nombreux modules de base sont implémentés sur la base de flux et jouent un rôle très important. Dans le même temps, le flux est également un concept très difficile à comprendre. Cela est principalement dû au manque de documentation pertinente. Pour les débutants en NodeJ, il faut souvent beaucoup de temps pour comprendre le flux avant de pouvoir réellement maîtriser ce concept. pour la plupart des NodeJ, c'est le cas. Pour les utilisateurs, il n'est utilisé que pour développer des applications Web. Une compréhension insuffisante des flux n'affecte pas leur utilisation. Cependant, comprendre les flux peut conduire à une meilleure compréhension des autres modules de NodeJs et, dans certains cas, l'utilisation de flux pour traiter les données fournira de meilleurs résultats. [Recommandations de didacticiel associées : Tutoriel vidéo Nodejs]
Comment comprendre les flux
Pour les utilisateurs de flux, le flux peut être considéré comme un tableau, et nous devons uniquement nous concentrer sur l'obtention (la consommation) et l'écriture à partir de ça (Production) va bien.
Pour les développeurs de flux (utilisant le module stream pour créer une nouvelle instance), ils se concentrent sur la façon d'implémenter certaines méthodes dans le flux. Ils se concentrent généralement sur deux points, qui est la ressource cible et comment exploiter la ressource cible. .Une fois déterminé, il est nécessaire d'exploiter les ressources cibles en fonction des différents états et événements du flux
Pool de cache
Tous les flux dans NodeJs ont des pools de tampons. Le but du pool de tampons est d'augmenter l'efficacité de. le flux lorsque les données sont produites et consommées. Lorsque tout cela prend du temps, nous pouvons produire des données à l'avance et les stocker dans le pool tampon avant la prochaine consommation. Cependant, le pool de mémoire tampon n'est pas toujours utilisé. Par exemple, lorsque le pool de cache est vide, les données ne seront pas placées dans le pool de cache après la production mais seront consommées directement.
.
Si la vitesse de production des données est supérieure à la vitesse de consommation des données, les données excédentaires attendront quelque part. Si la vitesse de production des données est inférieure à la vitesse de consommation des données du processus, les données s'accumuleront jusqu'à une certaine quantité quelque part, puis seront consommées. (Les développeurs ne peuvent pas contrôler la vitesse de production et de consommation des données, ils peuvent seulement essayer de produire des données ou de consommer des données à quelle heure)
Cet endroit où les données attendent, accumulent des données, puis se produisent. C'est le pool tampon. Le pool de mémoire tampon est généralement situé dans la RAM (mémoire) de l'ordinateur.
Pour donner un exemple de tampon courant, lorsque nous regardons des vidéos en ligne, si votre vitesse Internet est rapide, le tampon sera toujours rempli immédiatement, puis envoyé au système pour lecture, puis la vidéo suivante sera immédiatement mise en mémoire tampon. Il n'y aura aucun décalage pendant la visualisation. Si la vitesse du réseau est très lente, vous verrez un chargement, indiquant que le tampon est en cours de remplissage. Une fois le remplissage terminé, les données sont envoyées au système et vous pouvez voir cette vidéo.
Le pool de cache du flux NodeJs est une liste chaînée Buffer. Chaque fois que vous souhaitez ajouter des données au pool de cache, un nœud Buffer sera recréé et inséré à la fin de la liste chaînée.
EventEmitter
Stream dans NodeJs est une interface abstraite qui implémente EventEmitter, je vais donc d'abord présenter brièvement EventEmitter.
EventEmitter est une classe qui implémente des fonctions de publication d'événements et d'abonnement. Plusieurs méthodes couramment utilisées (on, once, off, submit) sont considérées comme familières à tout le monde, je ne les présenterai donc pas une par une.
const { EventEmitter } = require('events')
const eventEmitter = new EventEmitter()
// 为 eventA 事件绑定处理函数
eventEmitter.on('eventA', () => {
console.log('eventA active 1');
});
// 为 eventB 事件绑定处理函数
eventEmitter.on('eventB', () => {
console.log('eventB active 1');
});
eventEmitter.once('eventA', () => {
console.log('eventA active 2');
});
// 触发 eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2
Copier après la connexion
Il convient de noter que EventEmitter a deux événements appelés newListener et removeListener lorsque vous ajoutez un événement à un objet événement après avoir écouté. la fonction newListener (eventEmitter.emit('newListener')) sera déclenchée lorsqu'une fonction de traitement est supprimée, removeListener sera déclenchée de la même manière. EventEmitter 有两个叫做 newListener 和 removeListener 的事件,当你向一个事件对象中添加任何事件监听函数后,都会触发 newListener(eventEmitter.emit('newListener')),当一个处理函数被移除时同理会触发 removeListener。
还需要注意的是, once 绑定的处理函数只会执行一次,removeListener 将在其执行前被触发,这意味着 once
Il convient également de noter que la fonction de gestionnaire liée une fois ne sera exécutée qu'une seule fois, et removeListener sera déclenchée avant son exécution, ce qui signifie que l'écouteur lié par once La fonction est d'abord supprimée avant d'être déclenchée.
Mais cela n'a pas d'importance pour ce que nous verrons plus tard. StreamStream est une interface abstraite pour traiter les données en streaming dans Node.js. Stream n'est pas une interface réelle, mais un terme général désignant tous les flux. Les interfaces réelles sont ReadableStream, WritableStream et ReadWriteStream.
On peut voir que ReadableStream et WritableStream sont toutes deux des interfaces qui héritent de la classe EventEmitter (les interfaces dans ts peuvent hériter de classes, car ce ne sont que des types de fusion). Les classes d'implémentation correspondant aux interfaces ci-dessus sont respectivement Readable, Writable et Duplex Il existe 4 types de flux dans NodeJs : 🎜
Flux lisible en lecture (implémente ReadableStream)
Flux inscriptible en écriture (implémente WritableStream)
Flux duplex lisible et inscriptible (implémente WritableStream après avoir hérité de Readable)
Transformer le flux de conversion (hérite de Duplex)
Problème de contre-pression
Le la vitesse d'écriture des données sur le disque est bien inférieure à celle de la mémoire. Nous imaginons qu'il y a un « tuyau » entre la mémoire et le disque, et le « tuyau » est le « flux » dans lequel les données de la mémoire affluent. le tuyau très rapidement. Lorsque le tuyau est bloqué Lorsqu'il est plein, une contre-pression des données se produira dans la mémoire et les données seront accumulées dans la mémoire, occupant des ressources.
La solution pour NodeJs Stream est de définir une valeur flottante pour le pool de cache (c'est-à-dire la file d'attente d'écriture dans l'image) de chaque flux lorsque la quantité de données atteint ce flottant. valeur, accédez au cache Lorsque le pool push à nouveau les données, il renverra false, ce qui signifie que le contenu du pool de cache dans le flux actuel a atteint la valeur flottante et qu'aucune donnée supplémentaire ne devrait être écrit. À ce stade, nous devons arrêter immédiatement la production de données pour empêcher la mise en cache. Un pool surdimensionné crée une contre-pression. 缓存池(就是图中写入队列)设置一个浮标值,当其中数据量达到这个浮标值后,往缓存池再次 push 数据时就会返回 false,表示当前流中缓存池内容已经达到浮标值,不希望再有数据写入了,这时我们应该立即停止数据的生产,防止缓存池过大产生背压。
Readable stream (Readable) est un type de flux Il a deux modes et trois étatsDeux modes de lecture :
Mode flux : les données seront lues et écrites du système sous-jacent dans le tampon. Lorsque le tampon est plein, les données seront automatiquement transmises au gestionnaire d'événements enregistré via EventEmitter dès que possible.
Mode Pause dans le programme : Dans ce mode, EventEmitter ne sera pas activement déclenché pour transmettre des données. La méthode Readable.read() doit être explicitement appelée pour lire les données du tampon de lecture des événements. sera déclenché en réponse à EventEmitter.
Trois états :
readableFlowing === null (état initial)
readableFlowing === false (mode pause)
readableFlowing === true (mode flux)
Le readable.readableFlowing du flux initial est null
et devient vrai après l'ajout de l'événement de données. Lorsque pause(), unpipe() est appelé, ou qu'une contre-pression est reçue ou qu'un événement readable est ajouté, readableFlowing</code > will Est défini sur false,<strong>Dans cet état, lier un écouteur à l'événement de données ne fera pas passer readableFlowing à true</strong>. </tr><tr>Appelez <code>resume() pour faire passer le readableFlowing du flux lisible à true
Supprimer tous les événements lisibles est le seul moyen de rendre readableFlowing nul. Nom de l'événement
Ce sera être déclenché à chaque fois que les données sont consommées. Le paramètre est les données consommées cette fois. Déclenché lorsque le flux est fermé. Déclenché lorsqu'une erreur se produit dans le flux. une longueur de taille. Renvoyer null signifie que les données actuelles sont inférieures à la taille. Sinon, les données consommées cette fois sont renvoyées. Lorsque la taille n'est pas transmise, cela signifie consommer toutes les données du pool de cache
// hwm 不会大于 1GB.
const MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
if (n >= MAX_HWM) {
// 1GB限制
n = MAX_HWM;
} else {
//取下一个2最高幂,以防止过度增加hwm
n--;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n++;
}
return n;
}
Copier après la connexion
流动模式
所有可读流开始的时候都是暂停模式,可以通过以下方法可以切换至流动模式:
添加 " data " 事件句柄;
调用 “ resume ”方法;
使用 " pipe " 方法把数据发送到可写流
流动模式下,缓冲池里面的数据会自动输出到消费端进行消费,同时,每次输出数据后,会自动回调 _read 方法,把数据源的数据放到缓冲池中,如果此时缓存池中不存在数据则会直接吧数据传递给 data 事件,不会经过缓存池;直到流动模式切换至其他暂停模式,或者数据源的数据被读取完了( push(null) );
可读流可以通过以下方式切换回暂停模式:
如果没有管道目标,则调用 stream.pause() 。
如果有管道目标,则移除所有管道目标。调用 stream.unpipe() 可以移除多个管道目标。
const { Readable } = require('stream')
let count = 1000
const myReadable = new Readable({
highWaterMark: 300,
read(size) {
let chunk = null
setTimeout(() => {
if (count > 0) {
let chunkLength = Math.min(count, size)
chunk = '1'.repeat(chunkLength)
count -= chunkLength
}
this.push(chunk)
}, 500)
}
})
myReadable.on('data', data => {
console.log(data.toString())
})
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn