Ein lesbarer Stream ist ein Stream, der Daten erzeugt für den Programmverbrauch. Zu den gängigen Datenproduktionsmethoden gehören das Lesen von Festplattendateien, das Lesen von Netzwerkanforderungsinhalten usw. Schauen Sie sich das vorherige Beispiel an, was ein Stream ist:
const rs = fs.createReadStream(filePath);
rs ist ein lesbarer Stream, und seine Methode zur Datenproduktion besteht darin, ihn zu erzeugen read Die Festplattendatei console process.stdin ist ebenfalls ein lesbarer Stream:
process.stdin.pipe(process.stdout);
Sie können die Konsoleneingabe mit einem einfachen Satz ausdrucken. Die Art und Weise, wie Process.stdin Daten erzeugt, besteht darin, die Eingaben des Benutzers in der Konsole zu lesen Eingang.
Schauen Sie sich noch einmal die Definition lesbarer Streams an:
Lesbare Streams sind Streams, die Daten für den Programmverbrauch erzeugen.
Zusätzlich zum vom System bereitgestellten
fs.CreateReadStream
Verwendeter Gulp oder Vinyl Die von -fs bereitgestellte src-Methode verwendet auch lesbare Streams
gulp.src(['*.js', 'dist/**/*.scss'])
Wenn Sie Daten auf eine bestimmte Art und Weise produzieren und dem Programm zum Verbrauch übergeben möchten, wie fangen Sie an?
Nur zwei einfache Schritte:
Readable
-Klasse_read
-Methode, rufen Siethis.push
auf, um die erzeugten Daten in die Warteschlange zu stellen, damit sie von derReadable-Klasse gelesen werden können Der größte Teil der vom lesbaren Stream zu erledigenden Arbeiten ist abgeschlossen. Sie müssen ihn nur erben und dann die Methode zum Erzeugen von Daten in die _read-Methode schreiben, um einen benutzerdefinierten lesbaren Stream zu implementieren.
Zum Beispiel: Implementieren Sie einen Stream, der alle 100 Millisekunden eine Zufallszahl generiert (nicht sehr nützlich)
const Readable = require('stream').Readable; class RandomNumberStream extends Readable { constructor(max) { super() } _read() { const ctx = this; setTimeout(() => { const randomNumber = parseInt(Math.random() * 10000); // 只能 push 字符串或 Buffer,为了方便显示打一个回车 ctx.push(`${randomNumber}\n`); }, 100); } } module.exports = RandomNumberStream;
Der Klassenvererbungsteil des Codes ist sehr einfach, schauen Sie sich hauptsächlich die Implementierung an _read-Methode, es gibt mehrere Ein bemerkenswerter Punkt
Führen Sie es aus, um den Effekt zu sehen
const RandomNumberStream = require('./RandomNumberStream'); const rns = new RandomNumberStream(); rns.pipe(process.stdout);
Auf diese Weise können Sie sehen, dass die Zahlen kontinuierlich im Steuerelement angezeigt werden. Auf der Plattform wurde ein lesbarer Stream implementiert, der Zufallszahlen generiert. Es gibt noch ein paar kleine Zu lösende Probleme
Schieben Sie alle 100 Millisekunden eine Zahl in den Puffer. Wenn beispielsweise das Lesen einer lokalen Datei immer abgeschlossen ist, wie stoppt man und zeigt an, dass die Daten gespeichert sind wurde gelesen?
Schieben Sie einfach eine Null in den Puffer, damit Verbraucher definieren können, wie viele Zufallszahlen benötigt werden:
const Readable = require('stream').Readable; class RandomNumberStream extends Readable { constructor(max) { super() this.max = max; } _read() { const ctx = this; setTimeout(() => { if (ctx.max) { const randomNumber = parseInt(Math.random() * 10000); // 只能 push 字符串或 Buffer,为了方便显示打一个回车 ctx.push(`${randomNumber}\n`); ctx.max -= 1; } else { ctx.push(null); } }, 100); } } module.exports = RandomNumberStream;
Der Code verwendet eine maximale Kennung, damit Verbraucher die Anforderungen angeben können. Die Anzahl der Zeichen kann bei der Instanziierung angegeben werden
const RandomNumberStream = require('./'); const rns = new RandomNumberStream(5); rns.pipe(process.stdout);
Auf diese Weise können Sie sehen, dass die Konsole nur 5 Zeichen druckt
Aufmerksame Schüler Möglicherweise ist Ihnen aufgefallen, dass beim Generieren einer Zufallszahl alle 100 Millisekunden nicht setInterval aufgerufen wird, sondern setTimeout verwendet wird. Warum wird es nur verzögert und nicht wiederholt, aber das Ergebnis ist korrekt?
Dies erfordert das Verständnis der beiden Funktionsweisen von Streams
Der Stream befindet sich standardmäßig im Pausenmodus, was bedeutet, dass das Programm die Methode read() explizit aufrufen muss Im obigen Beispiel können die Daten ohne Aufruf abgerufen werden, da der Stream über die Methode „pipe()“ in den Flussmodus geschaltet wird, sodass die Methode „_read()“ automatisch wiederholt aufgerufen wird, bis die Daten gelesen werden, also jedes Mal, wenn _read( ) Die Methode muss die Daten nur einmal lesen
Die folgenden Methoden können verwendet werden, um den Flow vom Standard-Pausemodus in den Flow-Modus umzuschalten:
Es gibt zwei Möglichkeiten, vom Flussmodus in den Pausenmodus zu wechseln:
Nach der Verwendung der Pipe( )-Methode werden die Daten aus dem lesbaren Stream gestreamt. Es ist ein beschreibbarer Stream eingetreten, aber es scheint für den Benutzer eine Blackbox zu sein. Wie fließen die Daten? Beim Umschalten zwischen Flow-Modus und Pausenmodus gibt es zwei wichtige Begriffe
Diese beiden Mechanismen sind der Grund, warum das Programm den Datenfluss steuern kann. Sobald das Datenereignis des lesbaren Streams überwacht wird, kann der Code neu geschrieben werden das ruft den Stream oben auf
const RandomNumberStream = require('./RandomNumberStream'); const rns = new RandomNumberStream(5); rns.on('data', chunk => { console.log(chunk); });
这样可以看到控制台打印出了类似下面的结果
<Buffer 39 35 37 0a> <Buffer 31 30 35 37 0a> <Buffer 38 35 31 30 0a> <Buffer 33 30 35 35 0a> <Buffer 34 36 34 32 0a>
当可读流生产出可供消费的数据后就会触发 data 事件,data 事件监听器绑定后,数据会被尽可能地传递。data 事件的监听器可以在第一个参数收到可读流传递过来的 Buffer 数据,这也就是控制台打印的 chunk,如果想显示为数字,可以调用 Buffer 的 toString() 方法
当数据处理完成后还会触发一个
end
事件,因为流的处理不是同步调用,所以如果希望完事后做一些事情就需要监听这个事件,在代码最后追加一句:
rns.on('end', () => { console.log('done'); });复制代码
这样可以在数据接收完了显示 done
,当然数据处理过程中出现了错误会触发 error 事件,可以监听做异常处理:
rns.on('error', (err) => { console.log(err); });复制代码
流在暂停模式下需要程序显式调用 read() 方法才能得到数据,read() 方法会从内部缓冲区中拉取并返回若干数据,当没有更多可用数据时,会返回null
使用 read() 方法读取数据时,如果传入了 size 参数,那么它会返回指定字节的数据;当指定的size字节不可用时,则返回null。如果没有指定size参数,那么会返回内部缓冲区中的所有数据
现在有一个矛盾,在流动模式下流生产出了数据,然后触发 data 事件通知给程序,这样很方便。在暂停模式下需要程序去读取,那么就有一种可能是读取的时候还没生产好,如果使用轮询的方式未免效率有些低
NodeJS 提供了一个
readable的事件,事件在可读流准备好数据的时候触发,也就是先监听这个事件,收到通知有数据了再去读取就好了:
const rns = new RandomNumberStream(5); rns.on('readable', () => { let chunk; while((chunk = rns.read()) !== null){ console.log(chunk); } });
这样可以读取到数据,值得注意的一点是并不是每次调用 read() 方法都可以返回数据,前面提到了如果可用的数据没有达到 size 那么返回 null,所以在程序中加了个判断
const stream = fs.createReadStream('/dev/input/event0'); stream.on('readable', callback);复制代码
在流动模式会不会有这样的问题:可读流在创建好的时候就生产数据了,如果在绑定 readable 事件之前就生产了某些数据,触发了 readable 事件,在极端情况下会造成数据丢失吗?
事实并不会,按照 NodeJS event loop 程序创建流和调用事件监听在一个事件队列里面,生产数据和事件监听都是异步操作,而 on 监听事件使用了 process.nextTick
会保证在数据生产之前被绑定好,相关知识可以看定时器章节中对 event loop 的解读
到这里可能对 data事件、readable事件触发时机, read() 方法每次读多少数据,什么时候返回 null 还有一定的疑问,在后续可写流章节会在 back pressure 部分结合源码介绍相关机制
推荐教程:《JS教程》
Das obige ist der detaillierte Inhalt vonDie Verwendung von Stream-lesbaren Streams in Node.js. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!