Home > Web Front-end > JS Tutorial > Nodejs Stream Data Stream User Manual_node.js

Nodejs Stream Data Stream User Manual_node.js

WBOY
Release: 2016-05-16 15:05:02
Original
1734 people have browsed it

1. Introduction

This article introduces the basic method of developing programs using node.js streams.

<code class="hljs mizar">"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."
Doug McIlroy. October 11, 1964</code>
Copy after login

The earliest exposure to Stream was from the early days of Unix. Decades of practice have proven that Stream ideas can easily develop some huge systems. In unix, Stream is implemented through |; in node, as a built-in stream module, many core modules and third-party modules are used. Like Unix, the main operation of node Stream is also .pipe(). Users can use the anti-pressure mechanism to control the balance of reading and writing.

Stream can provide developers with a unified interface that can be reused and control the read and write balance between streams through the abstract Stream interface.

2. Why use Stream

I/O in node is asynchronous, so reading and writing to disk and network require callback functions to read data. The following is a simple code for a file download server:

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);</code>
Copy after login

These codes can achieve the required functions, but the service needs to cache the entire file data into memory before sending the file data. If the "data.txt" file is large and the amount of concurrency is large, a lot of memory will be wasted. Because the user needs to wait until the entire file is cached in memory before accepting the file data, this results in a rather poor user experience. But fortunately, both parameters (req, res) are Stream, so we can use fs.createReadStream() instead of fs.readFile():

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);</code>
Copy after login
The

.pipe() method listens to the 'data' and 'end' events of fs.createReadStream(), so that the "data.txt" file does not need to cache the entire file, and a data block can be sent immediately after the client connection is completed. to the client. Another benefit of using .pipe() is that it can solve the read-write imbalance problem caused when the client delay is very large. If you want to compress the file before sending it, you can use a third-party module:

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);</code>
Copy after login

This way the file will be compressed by browsers that support gzip and deflate. The oppressor module handles all content-encoding.

Stream makes developing programs easy.

3. Basic concepts

There are five basic Streams: readable, writable, transform, duplex, and "classic".

3-1, pipe

All types of Stream collection use .pipe() to create an input-output pair, receive a readable stream src and output its data to a writable stream dst, as follows:

<code class="hljs perl">src.pipe(dst)</code>
Copy after login
The

.pipe(dst) method returns the dst stream, so that multiple .pipe() can be used in succession, as follows:

<code class="hljs perl">a.pipe( b ).pipe( c ).pipe( d )</code>
Copy after login

Functions the same as the following code:

<code class="hljs perl">a.pipe( b );
b.pipe( c );
c.pipe( d );</code>
Copy after login

3-2, readable streams

By calling the .pipe() method of Readable streams, the data of Readable streams can be written to a Writable, Transform, or Duplex stream.

<code class="hljs perl">readableStream.pipe( dst )</code>
Copy after login

1>Create readable stream

Here we create a readable stream!

<code class="hljs perl">var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
$ node read0.js
beep boop
</code>
Copy after login

rs.push( null ) notifies the data receiver that the data has been sent.

Notice that we did not call rs.pipe(process.stdout); before pushing all the data content into the readable stream, but all the data content we pushed in was still completely output. This is because the readable stream All pushed data will be cached until the receiver reads the data. But in many cases, it is better to push the data to the readable stream only when the data is received, rather than caching the entire data. Let’s rewrite the ._read() function:

<code class="hljs javascript">var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);</code>
<code class="hljs bash">$ node read1.js
abcdefghijklmnopqrstuvwxyz</code>
Copy after login

The above code achieves by overriding the _read() method to push data into the readable stream only when the data recipient requests data. The _read() method can also receive a size parameter indicating the data size requested by the data request, but the readable stream can ignore this parameter as needed.

Note that we can also inherit readable streams using util.inherits(). In order to illustrate that the _read() method is only called when the data receiver requests data, we make a delay when pushing data into the readable stream, as follows:

<code class="hljs javascript">var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);</code>
Copy after login

Run the program using the following command and we find that the _read() method is only called 5 times:

<code class="hljs bash">$ node read2.js | head -c5
abcde
_read() called 5 times</code>
Copy after login

The reason for using a timer is that the system needs time to send a signal to notify the program to close the pipe. Process.stdout.on('error', fn) is used to handle the system sending a SIGPIPE signal because the header command closes the pipe, because this will cause process.stdout to trigger the EPIPE event. If you want to create a readable stream that can push data in any form, just set the parameter objectMode to true when creating the stream, for example: Readable({ objectMode: true }).

2>Read readable stream data

In most cases we simply use the pipe method to redirect the data from the readable stream to another form of stream, but in some cases it may be more useful to read data directly from the readable stream. As follows:

<code class="hljs php">process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<buffer 0a="" 61="" 62="" 63="">
<buffer 0a="" 64="" 65="" 66="">
<buffer 0a="" 67="" 68="" 69="">
null</buffer></buffer></buffer></code>

Copy after login

当可读流中有数据可读取时,流会触发'readable' 事件,这样就可以调用.read()方法来读取相关数据,当可读流中没有数据可读取时,.read() 会返回null,这样就可以结束.read() 的调用, 等待下一次'readable' 事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:

<code class="hljs javascript">process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});</code>
Copy after login

如下运行程序发现,输出结果并不完全!

<code class="hljs bash">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<buffer 61="" 62="" 63="">
<buffer 0a="" 64="" 65="">
<buffer 0a="" 66="" 67=""></buffer></buffer></buffer></code>
Copy after login

这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0)可以达到这个目的。

<code class="hljs javascript">process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
process.stdin.read(0);
});</code>
Copy after login

这次运行结果如下:

<code class="hljs xml">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<buffer 0a="" 64="" 65="">
<buffer 0a="" 68="" 69=""></buffer></buffer></code>
Copy after login

我们可以使用 .unshift() 将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面的代码,会按行输出标准输入的内容:

<code class="hljs javascript">var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset++) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset + 1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'</code>
Copy after login

当然,有很多模块可以实现这个功能,如:split 。

3-3、writable streams

writable streams只可以作为.pipe()函数的目的参数。如下代码:

<code class="hljs perl">src.pipe( writableStream );</code>
Copy after login

1>创建 writable stream

重写 ._write(chunk, enc, next) 方法就可以接受一个readable stream的数据。

<code class="hljs php">var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
$ (echo beep; sleep 1; echo boop) | node write0.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70=""></buffer></buffer></code>
Copy after login

第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的时间。如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。如果readable stream写入的数据时对象,那么需要这样创建writable stream

<code class="hljs css">Writable({ objectMode: true })</code>
Copy after login

2>写数据到 writable stream

调用writable stream的.write(data)方法即可完成数据写入。

<code class="hljs vala">process.stdout.write('beep boop\n');</code>
Copy after login

调用.end()方法通知writable stream 数据已经写入完成。

<code class="hljs javascript">var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
$ node writing1.js 
$ cat message.txt
beep boop</code>
Copy after login

如果需要设置writable stream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark,这样如果缓冲区里的数据超过opts.highWaterMark,.write(data)方法会返回false。当缓冲区可写的时候,writable stream会触发'drain' 事件。

3-4、classic streams

Classic streams比较老的接口了,最早出现在node 0.4版本中,但是了解一下其运行原理还是十分有好
处的。当一个流被注册了"data" 事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。

1>classic readable streams

Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法通过检查stream.readable 的值确定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:

<code class="hljs javascript">var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
$ node classic0.js
ABCDEFGHIJ</code>
Copy after login

如果要从classic readable stream中读取数据,注册"data" 和"end"两个事件的回调函数即可,代码如下:

<code class="hljs php">process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
$ (echo beep; sleep 1; echo boop) | node classic1.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70="">
__END__</buffer></buffer></code>
Copy after login

需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个 延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听”data” 和”end”事件了,也不用担心读写不平衡的问题了。当然你也可以用 through代替自己监听”data” 和”end” 事件,如下面的代码:

<code class="hljs php">var through = require('through');
process.stdin.pipe(through(write, end));
function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}
$ (echo beep; sleep 1; echo boop) | node through.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70="">
__END__</buffer></buffer></code>
Copy after login

或者也可以使用concat-stream来缓存整个流的内容:

<code class="hljs oxygene">var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));
$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }</code>
Copy after login

当然如果你非要自己监听"data" 和"end"事件,那么你可以在写数据的流不可写的时候使用.pause()方法暂停Classic readable streams继续触发”data” 事件。等到写数据的流可写的时候再使用.resume() 方法通知流继续触发"data" 事件继续读取
数据。

2>classic writable streams

Classic writable streams 非常简单。只有 .write(buf), .end(buf)和.destroy()三个方法。.end(buf) 方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end() 这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。

4、transform

transform是一个对读入数据过滤然输出的流。

5、duplex

duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream:

<code class="hljs livecodeserver">a.pipe(b).pipe(a)</code>
Copy after login

以上内容是小编给大家介绍的Nodejs Stream 数据流使用手册,希望对大家有所帮助!

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template