This article mainly introduces the source code analysis of Node.js pipe. The editor thinks it is quite good. Now I will share it with you and give it as a reference. Let’s follow the editor and take a look.
From the previous two articles, we learned. If you want to write Readable data to Writable, you must first manually read the data into memory and then write it to Writable. In other words, every time you pass data, you need to write the following template code
readable.on('readable', (err) => { if(err) throw err writable.write(readable.read()) })
For ease of use, Node.js provides the pipe() method, which allows us to Pass data elegantly
readable.pipe(writable)
Now, let’s see how it is implemented
pipe
First you need to call the pipe() method of Readable
// lib/_stream_readable.js Readable.prototype.pipe = function(dest, pipeOpts) { var src = this; var state = this._readableState; // 记录 Writable switch (state.pipesCount) { case 0: state.pipes = dest; break; case 1: state.pipes = [state.pipes, dest]; break; default: state.pipes.push(dest); break; } state.pipesCount += 1; // ... src.once('end', endFn); dest.on('unpipe', onunpipe); // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保证 error 事件触发时,onerror 首先被执行 prependListener(dest, 'error', onerror); // ... dest.once('close', onclose); // ... dest.once('finish', onfinish); // ... // 触发 Writable 的 pipe 事件 dest.emit('pipe', src); // 将 Readable 改为 flow 模式 if (!state.flowing) { debug('pipe resume'); src.resume(); } return dest; };
When executing the pipe() function, first record the Writable into state.pipes, and then bind the relevant event, finally if the Readable is not in flow mode, call resume() to change the Readable to the flow mode
Transfer data
Readable triggers after getting the data from the data source data event, execute ondata()
ondata() related code:
// lib/_stream_readable.js // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况 // 详情见 https://github.com/nodejs/node/issues/7278 var increasedAwaitDrain = false; function ondata(chunk) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); if (false === ret && !increasedAwaitDrain) { // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况 if (((state.pipesCount === 1 && state.pipes === dest) || (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1) ) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; increasedAwaitDrain = true; } // 进入 pause 模式 src.pause(); } }
In the ondata(chunk) function, pass dest.write(chunk) Write data to Writable
At this time, src.push(chunk) may be called or unpipeed inside _write(), which will cause awaitDrain to increase multiple times and cannot be cleared, causing Readable to get stuck
When no more data can be written to Writable, Readable will enter pause mode until all drain events are triggered
Trigger drain events and execute ondrain()
// lib/_stream_readable.js var ondrain = pipeOnDrain(src); function pipeOnDrain(src) { return function() { var state = src._readableState; debug('pipeOnDrain', state.awaitDrain); if (state.awaitDrain) state.awaitDrain--; // awaitDrain === 0,且有 data 监听器 if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { state.flowing = true; flow(src); } }; }
When each drain event is triggered, awaitDrain will be reduced until awaitDrain is 0. At this time, call flow(src) to make the Readable enter the flow mode
At this point, the entire data transfer cycle has been established, and the data will flow into the Writable continuously along the cycle until all data is written.
unpipe
No matter whether there is an error during the writing process, unpipe() will be executed in the end
// lib/_stream_readable.js // ... function unpipe() { debug('unpipe'); src.unpipe(dest); } // ... Readable.prototype.unpipe = function(dest) { var state = this._readableState; var unpipeInfo = { hasUnpiped: false }; // 啥也没有 if (state.pipesCount === 0) return this; // 只有一个 if (state.pipesCount === 1) { if (dest && dest !== state.pipes) return this; // 没有指定就 unpipe 所有 if (!dest) dest = state.pipes; state.pipes = null; state.pipesCount = 0; state.flowing = false; if (dest) dest.emit('unpipe', this, unpipeInfo); return this; } // 没有指定就 unpipe 所有 if (!dest) { var dests = state.pipes; var len = state.pipesCount; state.pipes = null; state.pipesCount = 0; state.flowing = false; for (var i = 0; i < len; i++) dests[i].emit('unpipe', this, unpipeInfo); return this; } // 找到指定 Writable,并 unpipe var index = state.pipes.indexOf(dest); if (index === -1) return this; state.pipes.splice(index, 1); state.pipesCount -= 1; if (state.pipesCount === 1) state.pipes = state.pipes[0]; dest.emit('unpipe', this, unpipeInfo); return this; };
Readable.prototype The .unpipe() function selects an execution strategy based on the state.pipes property and dest parameter. Finally, the unpipe event of dest will be triggered
After the unpipe event is triggered, call onunpipe() to clean up the relevant data
// lib/_stream_readable.js function onunpipe(readable, unpipeInfo) { debug('onunpipe'); if (readable === src) { if (unpipeInfo && unpipeInfo.hasUnpiped === false) { unpipeInfo.hasUnpiped = true; // 清理相关数据 cleanup(); } } }
End
In the entire pipeline process, Readable is the active party (responsible for the entire pipeline process: including data transfer, unpipe and exception handling), and Writable is the passive party (only needs to trigger the drain event)
Summary Let’s take a look at the pipeline process:
First execute readbable.pipe(writable) to connect readable and writable
When there is data in readable , readable.emit('data'), writes data to writable
If writable.write(chunk) returns false, enter pause mode and wait for the drain event to trigger
After all drain events are triggered, enter the flow mode again and write data
No matter the data writing is completed or an interruption occurs, unpipe()# will be called in the end.
The above is the detailed content of Node.js pipe() method introduction. For more information, please follow other related articles on the PHP Chinese website!