In diesem Artikel wird hauptsächlich die Quellcode-Analyse der Node.js-Pipe vorgestellt. Jetzt werde ich sie mit Ihnen teilen und Ihnen eine Referenz geben. Folgen wir dem Herausgeber, um einen Blick darauf zu werfen
Aus den beiden vorherigen Artikeln haben wir gelernt. Wenn Sie lesbare Daten in Writable schreiben möchten, müssen Sie die Daten zunächst manuell in den Speicher einlesen und sie dann in Writable schreiben. Mit anderen Worten: Jedes Mal, wenn Sie Daten übergeben, müssen Sie den folgenden Vorlagencode schreiben
readable.on('readable', (err) => { if(err) throw err writable.write(readable.read()) })
Zur Vereinfachung der Verwendung stellt Node.js die Methode „pipe()“ bereit , damit wir Daten elegant übergeben können
readable.pipe(writable)
Jetzt wollen wir sehen, wie es implementiert wird
Pipe
Zuerst müssen Sie die Methode „pipe()“ von Readable aufrufen
// lib/_stream_readable.js Readable.prototype.pipe = function(dest, pipeOpts) { var src = this; var state = this._readableState; // 记录 Writable switch (state.pipesCount) { case 0: state.pipes = dest; break; case 1: state.pipes = [state.pipes, dest]; break; default: state.pipes.push(dest); break; } state.pipesCount += 1; // ... src.once('end', endFn); dest.on('unpipe', onunpipe); // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保证 error 事件触发时,onerror 首先被执行 prependListener(dest, 'error', onerror); // ... dest.once('close', onclose); // ... dest.once('finish', onfinish); // ... // 触发 Writable 的 pipe 事件 dest.emit('pipe', src); // 将 Readable 改为 flow 模式 if (!state.flowing) { debug('pipe resume'); src.resume(); } return dest; };
Wenn Sie die Funktion „pipe()“ ausführen, zeichnen Sie zuerst „Writable“ in „state.pipes“ auf, und Binden Sie dann relevante Ereignisse. Wenn sich das Readable nicht im Flow-Modus befindet, rufen Sie „resume()“ auf, um das Readable in den Flow-Modus zu ändern.
Pass data
Readable from Lösen Sie nach dem Abrufen der Daten das Datenereignis aus und führen Sie ondata()
ondata() aus. Zugehöriger Code:
// lib/_stream_readable.js // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况 // 详情见 https://github.com/nodejs/node/issues/7278 var increasedAwaitDrain = false; function ondata(chunk) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); if (false === ret && !increasedAwaitDrain) { // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况 if (((state.pipesCount === 1 && state.pipes === dest) || (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1) ) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; increasedAwaitDrain = true; } // 进入 pause 模式 src.pause(); } }
in ondata (chunk)-Funktion: Schreiben Sie Daten über dest.write(chunk) in Writable
. Zu diesem Zeitpunkt kann src.push(chunk) innerhalb von _write() aufgerufen oder aus der Pipe entfernt werden, was dazu führt, dass „awaitDrain“ um ein Vielfaches erhöht wird Kann nicht gelöscht werden, Readable bleibt hängen
Wenn keine Daten mehr auf das Writable geschrieben werden können, wechselt das Readable in den Pausenmodus, bis alle Drain-Ereignisse ausgelöst werden
Lösen Sie das Drain-Ereignis aus und führen Sie es aus ondrain()
// lib/_stream_readable.js var ondrain = pipeOnDrain(src); function pipeOnDrain(src) { return function() { var state = src._readableState; debug('pipeOnDrain', state.awaitDrain); if (state.awaitDrain) state.awaitDrain--; // awaitDrain === 0,且有 data 监听器 if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { state.flowing = true; flow(src); } }; }
Wenn jedes Drain-Ereignis ausgelöst wird, wird WaitDrain reduziert, bis WaitDrain 0 ist. Rufen Sie zu diesem Zeitpunkt flow(src) auf, um das Readable in den Flussmodus zu versetzen
Zu diesem Zeitpunkt ist der gesamte Datenübertragungszyklus eingerichtet und die Daten fließen kontinuierlich entlang des Zyklus in das Writable, bis alle Daten werden geschrieben
unpipe
Egal ob während des Schreibvorgangs ein Fehler auftritt, unpipe() wird am Ende ausgeführt
// lib/_stream_readable.js // ... function unpipe() { debug('unpipe'); src.unpipe(dest); } // ... Readable.prototype.unpipe = function(dest) { var state = this._readableState; var unpipeInfo = { hasUnpiped: false }; // 啥也没有 if (state.pipesCount === 0) return this; // 只有一个 if (state.pipesCount === 1) { if (dest && dest !== state.pipes) return this; // 没有指定就 unpipe 所有 if (!dest) dest = state.pipes; state.pipes = null; state.pipesCount = 0; state.flowing = false; if (dest) dest.emit('unpipe', this, unpipeInfo); return this; } // 没有指定就 unpipe 所有 if (!dest) { var dests = state.pipes; var len = state.pipesCount; state.pipes = null; state.pipesCount = 0; state.flowing = false; for (var i = 0; i < len; i++) dests[i].emit('unpipe', this, unpipeInfo); return this; } // 找到指定 Writable,并 unpipe var index = state.pipes.indexOf(dest); if (index === -1) return this; state.pipes.splice(index, 1); state.pipesCount -= 1; if (state.pipesCount === 1) state.pipes = state.pipes[0]; dest.emit('unpipe', this, unpipeInfo); return this; };
Lesbar Die Funktion .prototype.unpipe() wählt eine Ausführungsstrategie basierend auf der Eigenschaft state.pipes und dem Parameter dest aus. Schließlich wird das Unpipe-Ereignis von dest ausgelöst
Nachdem das Unpipe-Ereignis ausgelöst wurde, rufen Sie onunpipe() auf, um die relevanten Daten zu bereinigen
// lib/_stream_readable.js function onunpipe(readable, unpipeInfo) { debug('onunpipe'); if (readable === src) { if (unpipeInfo && unpipeInfo.hasUnpiped === false) { unpipeInfo.hasUnpiped = true; // 清理相关数据 cleanup(); } } }
Ende
Im gesamten Pipeline-Prozess ist Readable die aktive Partei (verantwortlich für den gesamten Pipeline-Prozess: einschließlich Datenübertragung, Unpipe und Ausnahmebehandlung) und Writable ist die passive Partei ( muss nur das Drain-Ereignis auslösen)
Um den Pipe-Prozess zusammenzufassen:
Führen Sie zuerst readbable.pipe(writable) aus, um lesbar und beschreibbar zu verbinden
Wenn lesbar ist. Beim Lesen von Daten schreibt readable.emit('data') die Daten in beschreibbar
Wenn writable.write(chunk) false zurückgibt, Wechseln Sie in den Pausenmodus und warten Sie, bis das Drain-Ereignis ausgelöst wird
Nachdem alle Drain-Ereignisse ausgelöst wurden, wechseln Sie erneut in den Flow-Modus und schreiben Sie Daten
Unabhängig davon, ob das Schreiben der Daten abgeschlossen oder unterbrochen ist, ruft unpipe(( )
unpipe() Readable.prototype.unpipe() auf, löst das Unpipe-Ereignis von dest aus und bereinigt zugehörige Daten
Das obige ist der detaillierte Inhalt vonEinführung in die Pipe()-Methode von Node.js. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!