Home Web Front-end JS Tutorial Node.js pipe() method introduction

Node.js pipe() method introduction

Aug 15, 2017 am 10:14 AM
javascript node.js pipe

This article mainly introduces the source code analysis of Node.js pipe. The editor thinks it is quite good. Now I will share it with you and give it as a reference. Let’s follow the editor and take a look.

From the previous two articles, we learned. If you want to write Readable data to Writable, you must first manually read the data into memory and then write it to Writable. In other words, every time you pass data, you need to write the following template code


readable.on('readable', (err) => {
 if(err) throw err

 writable.write(readable.read())
})
Copy after login

For ease of use, Node.js provides the pipe() method, which allows us to Pass data elegantly


readable.pipe(writable)
Copy after login

Now, let’s see how it is implemented

pipe

First you need to call the pipe() method of Readable


// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
 var src = this;
 var state = this._readableState;

 // 记录 Writable
 switch (state.pipesCount) {
  case 0:
   state.pipes = dest;
   break;
  case 1:
   state.pipes = [state.pipes, dest];
   break;
  default:
   state.pipes.push(dest);
   break;
 }
 state.pipesCount += 1;

 // ...

  src.once('end', endFn);

 dest.on('unpipe', onunpipe);
 
 // ...

 dest.on('drain', ondrain);

 // ...

 src.on('data', ondata);

 // ...

 // 保证 error 事件触发时,onerror 首先被执行
 prependListener(dest, 'error', onerror);

 // ...

 dest.once('close', onclose);
 
 // ...

 dest.once('finish', onfinish);

 // ...

 // 触发 Writable 的 pipe 事件
 dest.emit('pipe', src);

 // 将 Readable 改为 flow 模式
 if (!state.flowing) {
  debug('pipe resume');
  src.resume();
 }

 return dest;
};
Copy after login

When executing the pipe() function, first record the Writable into state.pipes, and then bind the relevant event, finally if the Readable is not in flow mode, call resume() to change the Readable to the flow mode

Transfer data

Readable triggers after getting the data from the data source data event, execute ondata()

ondata() related code:


// lib/_stream_readable.js

 // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况
 // 详情见 https://github.com/nodejs/node/issues/7278
 var increasedAwaitDrain = false;
 function ondata(chunk) {
  debug('ondata');
  increasedAwaitDrain = false;
  var ret = dest.write(chunk);
  if (false === ret && !increasedAwaitDrain) {
   // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况
   if (((state.pipesCount === 1 && state.pipes === dest) ||
      (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
     ) && 
     !cleanedUp) {
    debug('false write response, pause', src._readableState.awaitDrain);
    src._readableState.awaitDrain++;
    increasedAwaitDrain = true;
   }
   // 进入 pause 模式
   src.pause();
  }
 }
Copy after login

In the ondata(chunk) function, pass dest.write(chunk) Write data to Writable

At this time, src.push(chunk) may be called or unpipeed inside _write(), which will cause awaitDrain to increase multiple times and cannot be cleared, causing Readable to get stuck

When no more data can be written to Writable, Readable will enter pause mode until all drain events are triggered

Trigger drain events and execute ondrain()


// lib/_stream_readable.js

 var ondrain = pipeOnDrain(src);

 function pipeOnDrain(src) {
  return function() {
   var state = src._readableState;
   debug('pipeOnDrain', state.awaitDrain);
   if (state.awaitDrain)
    state.awaitDrain--;
   // awaitDrain === 0,且有 data 监听器
   if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
    state.flowing = true;
    flow(src);
   }
  };
 }
Copy after login

When each drain event is triggered, awaitDrain will be reduced until awaitDrain is 0. At this time, call flow(src) to make the Readable enter the flow mode

At this point, the entire data transfer cycle has been established, and the data will flow into the Writable continuously along the cycle until all data is written.

unpipe

No matter whether there is an error during the writing process, unpipe() will be executed in the end


// lib/_stream_readable.js

// ...

 function unpipe() {
  debug('unpipe');
  src.unpipe(dest);
 }

// ...

Readable.prototype.unpipe = function(dest) {
 var state = this._readableState;
 var unpipeInfo = { hasUnpiped: false };

 // 啥也没有
 if (state.pipesCount === 0)
  return this;

 // 只有一个
 if (state.pipesCount === 1) {
  if (dest && dest !== state.pipes)
   return this;
  // 没有指定就 unpipe 所有
  if (!dest)
   dest = state.pipes;

  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;
  if (dest)
   dest.emit('unpipe', this, unpipeInfo);
  return this;
 }

 // 没有指定就 unpipe 所有
 if (!dest) {
  var dests = state.pipes;
  var len = state.pipesCount;
  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;

  for (var i = 0; i < len; i++)
   dests[i].emit(&#39;unpipe&#39;, this, unpipeInfo);
  return this;
 }

 // 找到指定 Writable,并 unpipe
 var index = state.pipes.indexOf(dest);
 if (index === -1)
  return this;

 state.pipes.splice(index, 1);
 state.pipesCount -= 1;
 if (state.pipesCount === 1)
  state.pipes = state.pipes[0];

 dest.emit(&#39;unpipe&#39;, this, unpipeInfo);

 return this;
};
Copy after login

Readable.prototype The .unpipe() function selects an execution strategy based on the state.pipes property and dest parameter. Finally, the unpipe event of dest will be triggered

After the unpipe event is triggered, call onunpipe() to clean up the relevant data


// lib/_stream_readable.js

 function onunpipe(readable, unpipeInfo) {
  debug(&#39;onunpipe&#39;);
  if (readable === src) {
   if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
    unpipeInfo.hasUnpiped = true;
    // 清理相关数据
    cleanup();
   }
  }
 }
Copy after login

End

In the entire pipeline process, Readable is the active party (responsible for the entire pipeline process: including data transfer, unpipe and exception handling), and Writable is the passive party (only needs to trigger the drain event)

Summary Let’s take a look at the pipeline process:

  • First execute readbable.pipe(writable) to connect readable and writable

  • When there is data in readable , readable.emit('data'), writes data to writable

  • If writable.write(chunk) returns false, enter pause mode and wait for the drain event to trigger

  • After all drain events are triggered, enter the flow mode again and write data

  • No matter the data writing is completed or an interruption occurs, unpipe()# will be called in the end.

  • ##unpipe() calls Readable.prototype.unpipe(), triggers the unpipe event of dest, and cleans up related data

The above is the detailed content of Node.js pipe() method introduction. For more information, please follow other related articles on the PHP Chinese website!

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

AI Hentai Generator

AI Hentai Generator

Generate AI Hentai for free.

Hot Article

R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Best Graphic Settings
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. How to Fix Audio if You Can't Hear Anyone
4 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: How To Unlock Everything In MyRise
1 months ago By 尊渡假赌尊渡假赌尊渡假赌

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

How to implement an online speech recognition system using WebSocket and JavaScript How to implement an online speech recognition system using WebSocket and JavaScript Dec 17, 2023 pm 02:54 PM

How to use WebSocket and JavaScript to implement an online speech recognition system Introduction: With the continuous development of technology, speech recognition technology has become an important part of the field of artificial intelligence. The online speech recognition system based on WebSocket and JavaScript has the characteristics of low latency, real-time and cross-platform, and has become a widely used solution. This article will introduce how to use WebSocket and JavaScript to implement an online speech recognition system.

WebSocket and JavaScript: key technologies for implementing real-time monitoring systems WebSocket and JavaScript: key technologies for implementing real-time monitoring systems Dec 17, 2023 pm 05:30 PM

WebSocket and JavaScript: Key technologies for realizing real-time monitoring systems Introduction: With the rapid development of Internet technology, real-time monitoring systems have been widely used in various fields. One of the key technologies to achieve real-time monitoring is the combination of WebSocket and JavaScript. This article will introduce the application of WebSocket and JavaScript in real-time monitoring systems, give code examples, and explain their implementation principles in detail. 1. WebSocket technology

How to use JavaScript and WebSocket to implement a real-time online ordering system How to use JavaScript and WebSocket to implement a real-time online ordering system Dec 17, 2023 pm 12:09 PM

Introduction to how to use JavaScript and WebSocket to implement a real-time online ordering system: With the popularity of the Internet and the advancement of technology, more and more restaurants have begun to provide online ordering services. In order to implement a real-time online ordering system, we can use JavaScript and WebSocket technology. WebSocket is a full-duplex communication protocol based on the TCP protocol, which can realize real-time two-way communication between the client and the server. In the real-time online ordering system, when the user selects dishes and places an order

How to implement an online reservation system using WebSocket and JavaScript How to implement an online reservation system using WebSocket and JavaScript Dec 17, 2023 am 09:39 AM

How to use WebSocket and JavaScript to implement an online reservation system. In today's digital era, more and more businesses and services need to provide online reservation functions. It is crucial to implement an efficient and real-time online reservation system. This article will introduce how to use WebSocket and JavaScript to implement an online reservation system, and provide specific code examples. 1. What is WebSocket? WebSocket is a full-duplex method on a single TCP connection.

JavaScript and WebSocket: Building an efficient real-time weather forecasting system JavaScript and WebSocket: Building an efficient real-time weather forecasting system Dec 17, 2023 pm 05:13 PM

JavaScript and WebSocket: Building an efficient real-time weather forecast system Introduction: Today, the accuracy of weather forecasts is of great significance to daily life and decision-making. As technology develops, we can provide more accurate and reliable weather forecasts by obtaining weather data in real time. In this article, we will learn how to use JavaScript and WebSocket technology to build an efficient real-time weather forecast system. This article will demonstrate the implementation process through specific code examples. We

Simple JavaScript Tutorial: How to Get HTTP Status Code Simple JavaScript Tutorial: How to Get HTTP Status Code Jan 05, 2024 pm 06:08 PM

JavaScript tutorial: How to get HTTP status code, specific code examples are required. Preface: In web development, data interaction with the server is often involved. When communicating with the server, we often need to obtain the returned HTTP status code to determine whether the operation is successful, and perform corresponding processing based on different status codes. This article will teach you how to use JavaScript to obtain HTTP status codes and provide some practical code examples. Using XMLHttpRequest

How to use insertBefore in javascript How to use insertBefore in javascript Nov 24, 2023 am 11:56 AM

Usage: In JavaScript, the insertBefore() method is used to insert a new node in the DOM tree. This method requires two parameters: the new node to be inserted and the reference node (that is, the node where the new node will be inserted).

How to get HTTP status code in JavaScript the easy way How to get HTTP status code in JavaScript the easy way Jan 05, 2024 pm 01:37 PM

Introduction to the method of obtaining HTTP status code in JavaScript: In front-end development, we often need to deal with the interaction with the back-end interface, and HTTP status code is a very important part of it. Understanding and obtaining HTTP status codes helps us better handle the data returned by the interface. This article will introduce how to use JavaScript to obtain HTTP status codes and provide specific code examples. 1. What is HTTP status code? HTTP status code means that when the browser initiates a request to the server, the service

See all articles