この記事では主に Node.js パイプのソースコード分析を紹介します。編集者がそれを紹介し、参考にさせていただきます。編集者をフォローして見てみましょう
前の 2 つの記事から、私たちは学びました。 Readable データを Writable に書き込む場合は、まず手動でデータをメモリに読み取り、それから Writable に書き込む必要があります。つまり、データを渡すたびに、次のテンプレート コードを記述する必要があります
readable.on('readable', (err) => { if(err) throw err writable.write(readable.read()) })
使いやすいように、Node.js には Pipe() メソッドが用意されており、データをエレガントに渡すことができます
readable.pipe(writable)
それでは、その実装方法を見てみましょう
pipe
まず、Readableのpipe()メソッドを呼び出す必要があります
// lib/_stream_readable.js Readable.prototype.pipe = function(dest, pipeOpts) { var src = this; var state = this._readableState; // 记录 Writable switch (state.pipesCount) { case 0: state.pipes = dest; break; case 1: state.pipes = [state.pipes, dest]; break; default: state.pipes.push(dest); break; } state.pipesCount += 1; // ... src.once('end', endFn); dest.on('unpipe', onunpipe); // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保证 error 事件触发时,onerror 首先被执行 prependListener(dest, 'error', onerror); // ... dest.once('close', onclose); // ... dest.once('finish', onfinish); // ... // 触发 Writable 的 pipe 事件 dest.emit('pipe', src); // 将 Readable 改为 flow 模式 if (!state.flowing) { debug('pipe resume'); src.resume(); } return dest; };
pipe()関数を実行するときは、まずWritableを記録します最後に、Readable がフロー モードでない場合は、Readable がデータ ソースからデータを取得した後、resume() を呼び出してデータを渡します
。 、データイベントをトリガーし、ondata()
ondata() 関連コードを実行します:// lib/_stream_readable.js // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况 // 详情见 https://github.com/nodejs/node/issues/7278 var increasedAwaitDrain = false; function ondata(chunk) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); if (false === ret && !increasedAwaitDrain) { // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况 if (((state.pipesCount === 1 && state.pipes === dest) || (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1) ) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; increasedAwaitDrain = true; } // 进入 pause 模式 src.pause(); } }
ondata(chunk) 関数では、dest.write(chunk) を通じてデータを Writable に書き込みます
// lib/_stream_readable.js var ondrain = pipeOnDrain(src); function pipeOnDrain(src) { return function() { var state = src._readableState; debug('pipeOnDrain', state.awaitDrain); if (state.awaitDrain) state.awaitDrain--; // awaitDrain === 0,且有 data 监听器 if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { state.flowing = true; flow(src); } }; }
各ドレイン イベントがトリガーされると、awaitDrain が 0 になるまで awaitDrain が減らされます。このとき、flow(src)を呼び出して、Readableをフローモードにします
unpipe
書き込みプロセス中にエラーが発生したかどうかに関係なく、unpipe()
// lib/_stream_readable.js // ... function unpipe() { debug('unpipe'); src.unpipe(dest); } // ... Readable.prototype.unpipe = function(dest) { var state = this._readableState; var unpipeInfo = { hasUnpiped: false }; // 啥也没有 if (state.pipesCount === 0) return this; // 只有一个 if (state.pipesCount === 1) { if (dest && dest !== state.pipes) return this; // 没有指定就 unpipe 所有 if (!dest) dest = state.pipes; state.pipes = null; state.pipesCount = 0; state.flowing = false; if (dest) dest.emit('unpipe', this, unpipeInfo); return this; } // 没有指定就 unpipe 所有 if (!dest) { var dests = state.pipes; var len = state.pipesCount; state.pipes = null; state.pipesCount = 0; state.flowing = false; for (var i = 0; i < len; i++) dests[i].emit('unpipe', this, unpipeInfo); return this; } // 找到指定 Writable,并 unpipe var index = state.pipes.indexOf(dest); if (index === -1) return this; state.pipes.splice(index, 1); state.pipesCount -= 1; if (state.pipesCount === 1) state.pipes = state.pipes[0]; dest.emit('unpipe', this, unpipeInfo); return this; };
Readable.prototype.unpipe() 関数は、state.pipes 属性と dest に基づいて実行戦略を選択します。パラメータ。最後に、dest の unpipe イベントがトリガーされ、onunpipe() が呼び出されて関連データがクリーンアップされます (データ転送、アンパイプ、例外処理を含む)。Writable は受動的なパーティーです (トリガーする必要があるだけです)。ドレイン イベント)
読み取り可能にデータがある場合は、readable.emit('data') が書き込みますwritable.write(chunk) が false を返した場合は、一時停止モードに入り、ドレイン イベントがトリガーされるのを待ちます
すべてのドレイン イベントがトリガーされた後、再びフロー モードに入り、データを書き込みます
以上がNode.jsのpipe()メソッドの紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。