Streams in Node.js are very powerful. They provide support for processing potentially large files, and also abstract data processing and transmission in many scenarios. Because they are so easy to use, in practice we often write utility functions/libraries based on them. However, if we overlook certain characteristics of streams, the functions/libraries we write may fail to achieve the desired effect in some cases, or may plant hidden landmines. This article provides two tips that I think are useful when writing stream-based utilities.
1. Be wary of EventEmitter memory leaks
In a function that may be called multiple times, if you need to add event listeners to a stream to perform certain operations, then you need to be wary of memory leaks caused by repeatedly adding listeners:
'use strict';

const fs = require('fs');
const co = require('co');

// Resolves with the next chunk from `stream`, with the value delivered by a
// later 'readable'/'end' event, or with null once the stream is no longer
// readable.
// NOTE(review): this is the article's deliberately leaky version — every call
// attaches fresh 'error' and 'end' listeners that are never removed.
function getSomeDataFromStream (stream) {
  const buffered = stream.read();
  if (buffered) return Promise.resolve(buffered);
  if (!stream.readable) return Promise.resolve(null);
  return new Promise((resolve, reject) => {
    stream.once('readable', () => resolve(stream.read()));
    stream.on('error', reject);
    stream.on('end', resolve);
  });
}

const stream = fs.createReadStream('/Path/to/a/big/file');

co(function *() {
  let chunk;
  while ((chunk = yield getSomeDataFromStream(stream)) !== null) {
    console.log(chunk);
  }
}).catch(console.error);
In the above code, the getSomeDataFromStream function completes the Promise when the stream reports an error or runs out of data, by listening to the error event and the end event. However, when executing the code, we will soon see a warning message in the console: (node) warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit. This is because each time this function is called, we add an additional error event listener and end event listener to the incoming stream. To avoid this potential memory leak, we need to ensure that after each call to the function completes, all the listeners added by that call are removed, keeping the function pollution-free:
// Reads the next chunk of data from `stream`.
// Resolves with a chunk when one becomes available, with `null` when the
// stream has ended, and rejects if the stream emits 'error'.
// Every listener added by a call is removed before the promise settles, so
// repeated calls do not accumulate listeners (no EventEmitter leak warning).
function getSomeDataFromStream (stream) {
  let data = stream.read();
  if (data) return Promise.resolve(data);
  if (!stream.readable) return Promise.resolve(null);
  return new Promise((resolve, reject) => {
    stream.once('readable', onData);
    stream.on('error', onError);
    stream.on('end', onEnd);
    function onData () {
      done();
      resolve(stream.read());
    }
    function onError (err) {
      done();
      reject(err);
    }
    // BUG FIX: the original registered `done` itself as the 'end' listener,
    // which removed the listeners but never settled the promise — a call
    // pending when the stream ended would hang forever. Resolve with `null`
    // so callers' `!== null` termination checks work.
    function onEnd () {
      done();
      resolve(null);
    }
    function done () {
      stream.removeListener('readable', onData);
      stream.removeListener('error', onError);
      stream.removeListener('end', onEnd);
    }
  });
}
2. Ensure that the callback of a tool function is called only after the data has been processed
Tool functions often expose a callback-function parameter that is invoked with a specified value after all the data in the stream has been processed. The usual approach is to attach the callback to the stream's end event, but if the per-chunk processing is a time-consuming asynchronous operation, the callback may be called before all the data has been processed:
'use strict';

const fs = require('fs');

const stream = fs.createReadStream('/Path/to/a/big/file');

// Demonstrates the pitfall: `callback` is wired directly to the stream's
// 'end' event, but the per-chunk work is asynchronous, so 'end' (and
// therefore `callback`) can fire while chunks are still being processed.
function processSomeData (stream, callback) {
  stream.on('data', (data) => {
    // Simulate a slow asynchronous operation on the chunk.
    setTimeout(() => console.log(data), 2000);
  });
  stream.on('end', () => {
    // ...
    callback();
  });
}

processSomeData(stream, () => console.log('end'));
In the above code, the callback may be called before all the data has been processed, because the stream's end event is triggered as soon as the data in the stream has been read out — not when it has finished being processed. So we need to additionally check whether the data has been processed:
// Invokes `callback` only after the stream has ended AND every chunk's
// asynchronous processing has completed.
function processSomeData (stream, callback) {
  let started = 0;   // chunks received from the stream
  let done = 0;      // chunks whose async processing has finished
  let ended = false; // whether 'end' has been seen
  // Fire the callback once both conditions hold.
  const maybeFinish = () => {
    if (ended && started === done) callback();
  };
  stream.on('data', (data) => {
    started += 1;
    // Simulate a slow asynchronous operation on the chunk.
    setTimeout(() => {
      console.log(data);
      done += 1;
      maybeFinish();
    }, 2000);
  });
  stream.on('end', () => {
    ended = true;
    // ...
    maybeFinish();
  });
}
In this way, the callback will be triggered after all data has been processed.