Dieser Artikel bietet Ihnen eine Erklärung des Funktionsmechanismus von Node Stream (mit Beispielen). Freunde in Not können sich darauf beziehen.
Wenn Sie Node lernen, müssen Streams ein Konzept sein, das Sie beherrschen müssen. Wenn Sie ein Node-Meister werden möchten, muss Flow ein unverzichtbarer Bestandteil der Kampfkunstgeheimnisse sein.
Zitiert aus Stream-Handbook. Dies zeigt die Bedeutung von Streams für das vertiefte Lernen von Node.
Streaming kann man sich als Übertragungsmöglichkeit vorstellen. Durch Streams können Daten reibungslos und ohne Nebenwirkungen an das Ziel übertragen werden. In Node sind die von Node Stream erstellten Streams für String und Buffer vorgesehen. Im Allgemeinen wird Buffer verwendet. Stream stellt eine Übertragungsfunktion dar, und Buffer ist der Träger für die Übertragung von Inhalten ( kann so verstanden werden: Stream: der Mitnahme-Bruder, Buffer: Ihr Mitnahme-Bruder ). Setzen Sie ObjectMode beim Erstellen eines Streams auf true. Stream kann auch jede Art von JS-Objekt übertragen (außer null, das spezielle Verwendungszwecke in Streams hat).
Jetzt gibt es eine Anforderung, wir müssen eine große Datei an den Kunden übertragen. Bei Verwendung der folgenden Methode
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { fs.readFile('./big.file', (err, data) => { if (err) throw err; res.end(data); }); }); server.listen(8000);
muss bei jedem Empfang einer Anfrage die große Datei in den Speicher eingelesen und dann an den Client übertragen werden. Folgende drei Konsequenzen können auf diese Weise eintreten:
Speichererschöpfung
Verlangsamung anderer Prozesse
Erhöhen Sie die Auslastung des Garbage Collectors
Daher ist diese Methode keine gute Lösung bei der Übertragung großer Dateien. Der Umfang der Parallelität ist groß und Hunderte von Anfragen können leicht den Speicher erschöpfen.
Was wäre, wenn wir Streaming nutzen würden?
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { const src = fs.createReadStream('./big.file'); src.pipe(res); }); server.listen(8000);
Mit dieser Methode wird nicht zu viel Speicher beansprucht. Lesen und übertragen Sie einfach ein wenig. Der gesamte Vorgang verläuft reibungslos und ist sehr elegant. Wenn Sie die Dateien während des Übertragungsprozesses verarbeiten möchten, z. B. Komprimierung, Verschlüsselung usw., ist die Erweiterung ebenfalls einfach (wird später ausführlich vorgestellt).
Streams gibt es überall in Node. Wie aus dem Bild unten ersichtlich ist:
Stream ist in vier Hauptkategorien unterteilt:
Lesbar (lesbarer Stream)
Beschreibbar (beschreibbarer Stream)
Duplex (Duplex-Stream)
Transformation (Konvertierungsstream)
Daten im lesbaren Stream können in den folgenden zwei Datenmodi generiert werden.
Fließender Modus
Nicht fließender Modus
Triggermethoden in den beiden Modi Und die Die Art des Konsums ist unterschiedlich.
Fließender Modus: Daten werden kontinuierlich produziert und bilden ein „fließendes“ Phänomen. Dieser Modus kann durch Abhören des data
-Ereignisses des Streams aufgerufen werden.
Nicht fließender Modus: Sie müssen die Methode read()
explizit aufrufen, um Daten zu erhalten.
Die beiden Modi können ineinander konvertiert werden
Der Anfangszustand des Streams ist Null, durch Abhören des data
-Ereignis oder pipe
-Methode, rufen Sie die resume
-Methode auf, um den Fluss in den Flowing Mode
-Zustand zu übertragen. Rufen Sie die Methode Flowing Mode
im Status pause
auf, um den Stream in den Status Non-Flowing Mode
zu versetzen. Durch Aufrufen der Methode Non-Flowing Mode
im Status resume
kann der Stream auch in den Status Flowing Mode
versetzt werden.
Im Folgenden finden Sie eine detaillierte Einführung in den Betriebsmechanismus des Readable-Streams in den beiden Modi.
Im Flowing Mode-Zustand überwacht der erstellte myReadable-Lesestream direkt das Datenereignis und die Daten fließen kontinuierlich zum Verbrauch aus.
myReadable.on('data',function(chunk){ consume(chunk);//消费流 })
Sobald das Datenereignis überwacht ist, ist der interne Prozess von Readable wie unten gezeigt
Die Kernmethode wird im Inneren gelesen die Stream-Methode, die verschiedene Vorgänge auslöst, wenn der Parameter n unterschiedliche Werte hat. Der Hightwatermark in der Beschreibung unten stellt die Größe des Pufferpools innerhalb des Streams dar.
n=undefiniert (Daten verbrauchen und einen lesbaren Stream auslösen)
n=0 (einen lesbaren Stream auslösen, aber nicht verbrauchen)
n>hightwatermark (ändern Sie den Wert von hightwatermark)
n
n>Puffer (kann null oder alle Daten im Puffer zurückgeben (der letzte Lesevorgang zu diesem Zeitpunkt))
图中黄色标识的_read(),是用户实现流所需要自己实现的方法,这个方法就是实际读取流的方式(可以这样理解,外卖平台给你提供外卖的能力,那_read()方法就相当于你下单点外卖)。后面会详细介绍如何实现_read方法。
以上的流程可以描述为:监听data方法,Readable内部就会调用read方法,来进行触发读流操作,通过判断是同步还是异步读取,来决定读取的数据是否放入缓冲区。如果为异步的,那么就要调用flow方法,来继续触发read方法,来读取流,同时根据size参数判定是否emit('data')来消费流,循环读取。如果是同步的,那就emit('data')来消费流,同时继续触发read方法,来读取流。一旦push方法传入的是null,整个流就结束了。
从使用者的角度来看,在这种模式下,你可以通过下面的方式来使用流
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(chunk){ writeFile1.write(chunk); })
相对于Flowing mode,Non-Flowing Mode要相对简单很多。
消费该模式下的流,需要使用下面的方式
myReadable.on(‘readable’,function(){ const chunk = myReadable.read() consume(chunk);//消费流 })
在Non-Flowing Mode下,Readable内部的流程如下图:
从这个图上看出,你要实现该模式的读流,同样要实现一个_read方法。
整个流程如下:监听readable方法,Readable内部就会调用read方法。调用用户实现的_read方法,来push数据到缓冲池,然后发送emit readable事件,通知用户端消费。
从使用者的角度来看,你可以通过下面的方式来使用该模式下的流
const fs = require('fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('readable',function(chunk) { while (null !== (chunk = myReadable.read())) { writeFile.write(chunk); } });
相对于读流,写流的机制就更容易理解了。
写流使用下面的方式进行数据写入
myWrite.write(chunk);
调用write后,内部Writable的流程如下图所示
类似于读流,实现一个写流,同样需要用户实现一个_write方法。
整个流程是这样的:调用write之后,会首先判定是否要写入缓冲区。如果不需要,那就调用用户实现的_write方法,将流写入到相应的地方,_write会调用一个writeable内部的一个回调函数。
从使用者的角度来看,使用一个写流,采用下面的代码所示的方式。
const fs = require('fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(chunk) { writeFile.write(chunk); })
可以看到,使用写流是非常简单的。
我们先讲解一下如何实现一个读流和写流,再来看Duplex和Transform是什么,因为了解了如何实现一个读流和写流,再来理解Duplex和Transform就非常简单了。
实现自定义的Readable,只需要实现一个_read方法即可,需要在_read方法中调用push方法来实现数据的生产。如下面的代码所示:
const Readable = require('stream').Readable; class MyReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }); } } // 模拟资源池 const dataSource = { data: new Array(10).fill('-'), makeData() { if (!dataSource.data.length) return null; return dataSource.data.pop(); } }; const myReadable = new MyReadable(dataSource,); myReadable.on('readable', () => { let chunk; while (null !== (chunk = myReadable.read())) { console.log(chunk); } });
实现自定义的writable,只需要实现一个_write方法即可。在_write中消费chunk写入到相应地方,并且调用callback回调。如下面代码所示:
const Writable = require('stream').Writable; class Mywritable extends Writable{ constuctor(options){ super(options); } _write(chunk,endcoding,callback){ console.log(chunk); callback && callback(); } } const myWritable = new Mywritable();
双工流:简单理解,就是讲一个Readable流和一个Writable流绑定到一起,它既可以用来做读流,又可以用来做写流。
实现一个Duplex流,你需要同时实现_read和_write方法。
有一点需要注意的是:它所包含的 Readable流和Writable流是完全独立,互不影响的两个流,两个流使用的不是同一个缓冲区。通过下面的代码可以验证
// 模拟资源池1 const dataSource1 = { data: new Array(10).fill('a'), makeData() { if (!dataSource1.data.length) return null; return dataSource1.data.pop(); } }; // 模拟资源池2 const dataSource2 = { data: new Array(10).fill('b'), makeData() { if (!dataSource2.data.length) return null; return dataSource2.data.pop(); } }; const Readable = require('stream').Readable; class MyReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }) } } const Writable = require('stream').Writable; class MyWritable extends Writable{ constructor(options){ super(options); } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback && callback(); } } const Duplex = require('stream').Duplex; class MyDuplex extends Duplex{ constructor(dataSource,options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }) } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback && callback(); } } const myWritable = new MyWritable(); const myReadable = new MyReadable(dataSource1); const myDuplex = new MyDuplex(dataSource1); myReadable.pipe(myDuplex).pipe(myWritable);
打印的结果是
abababababababababab
从这个结果可以看出,myReadable.pipe(myDuplex)
,myDuplex充当的是写流,写入的内容是a;myDuplex.pipe(myWritable)
,myDuplex充当的是读流,往myWritable写的却是b;所以说它所包含的 Readable流和Writable流是完全独立的。
理解了Duplex,就更好理解Transform了。Transform是一个转换流,它既有读的功能又有写的功能,但是它和Duplex不同的是,它的读流和写流共用同一个缓冲区;也就是说,通过它读入什么,那它就能写入什么。
实现一个Transform,你只需要实现一个_transform方法。比如最简单的Transform:PassThrough,其源代码如下所示
PassThrough就是一个Transform,但是这个转换流,什么也没做,相当于一个透明的转换流。可以看到_transform中什么都没有,只是简单的将数据进行回调。
如果我们在这个环节做些扩展,只需要在_transform中直接扩展就行了。比如我们可以对流进行压缩,加密,混淆等等操作。
最后介绍一个流中非常重要的一个概念:背压。要了解这个,我们首先来看下pipe和highWaterMaker是什么。
首先看下下面的代码
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.pipe(writeFile);
上面的代码和下面是等价的
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(data){ var flag = ws.write(data); if(!flag){ // 当前写流缓冲区已满,暂停读数据 readFile.pause(); } }) writeFile.on('drain',function()){ readFile.resume();// 当前写流缓冲区已清空,重新开始读流 } readFile.on('end',function(data){ writeFile.end();//将写流缓冲区的数据全部写入,并且关闭写入的文件 })
pipe所做的操作就是相当于为写流和读流自动做了速度的匹配。
读写流速度不匹配的情况下,一般情况下不会造成什么问题,但是会造成内存增加。内存消耗增加,就有可能会带来一系列的问题。所以在使用的流的时候,强烈推荐使用pipe。
highWaterMaker说白了,就是定义缓冲区的大小。
默认16Kb(Readable最大8M)
可以自定义
背压的概念可以理解为:为了防止读写流速度不匹配而产生的一种调整机制;背压该调整机制的触发时机,受限于highWaterMaker设置的大小。
如上面的代码 var flag = ws.write(data);
,一旦写流的缓冲区满了,那flag
就会置为false,反向促进读流的速度调整。
主要有以下场景
文件操作(复制,压缩,解压,加密等)
下面的就很容易就实现了文件复制的功能。
const fs = require('fs'); const readFile = fs.createReadStream('big.file'); const writeFile = fs.createWriteStream('big_copy.file'); readFile.pipe(writeFile);
那我们想在复制的过程中对文件进行压缩呢?
const fs = require('fs'); const readFile = fs.createReadStream('big.file'); const writeFile = fs.createWriteStream('big.gz'); const zlib = require('zlib'); readFile.pipe(zlib.createGzip()).pipe(writeFile);
实现解压、加密也是类似的。
静态文件服务器
比如需要返回一个html,可以使用如下代码。
var http = require('http'); var fs = require('fs'); http.createServer(function(req,res){ fs.createReadStream('./a.html').pipe(res); }).listen(8000);
Das obige ist der detaillierte Inhalt vonErläuterung des Betriebsmechanismus von Node Stream (mit Beispielen). Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!