首頁 > web前端 > js教程 > 主體

淺談Node.js:理解stream

高洛峰
發布: 2016-12-28 13:22:31
原創
1125 人瀏覽過

Stream在node.js中是一個抽象的接口,基於EventEmitter,也是一種Buffer的高階封裝,用來處理流資料。流模組便是提供各種API讓我們可以很簡單的使用Stream。

流分為四種類型,如下所示:

Readable,可讀流

Writable,可寫流

Duplex,讀寫流

Transform,擴展的Duplex,可修改寫入的數據,可修改寫入的數據

1、Readable可讀流

透過stream.Readable可建立一個可讀流,它有兩種模式:暫停和流動。


在流動模式下,將自動從下游系統讀取資料並使用data事件輸出;暫停模式下,必須顯示呼叫stream.read()方法讀取數據,並觸發data事件。


所有的可讀流最開始都是暫停模式,可以透過以下方法切換到流動模式:

監聽'data'事件

呼叫stream.resume()方法

方法將呼叫stream.pipe()方法將調用資料輸出到一個可寫流Writable

同樣地,也可以切換到暫停模式,有兩種方法:

如果沒有設定pipe目標,呼叫stream.pause()方法即可。

如果設定了pipe目標,則需要移除所有的data監聽和調用stream.unpipe()方法

在Readable對像中有一個_readableSate的對象,透過該對象可以得知流目前處於什麼模式,如下所示:

readable._readableState.flowing = null,沒有數據消費者,流不產生數據

readable._readableState.flowing = true,處於流動模式

readable._readableState.flowing = false,處於暫停模式

為什麼使用流取資料


對於小文件,使用fs.readFile()方法讀取資料更方便,但需要讀取大文件的時候,例如幾G大小的文件,使用該方法將消耗大量的內存,甚至使程式崩潰。在這種情況下,使用流來處理是更合適的,採用分段讀取,便不會造成內存的'爆倉'問題。

data事件


在stream提供資料區塊給消費者時觸發,有可能是切換到流動模式的時候,也有可能是呼叫readable.read()方法且有有效資料區塊的時候,使用如下所示:

const fs = require('fs');
 
const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
rs.on('data',(chunk)=>{
  chunkArr.push(chunk);
  chunkLen+=chunk.length;
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});
登入後複製

readable事件


當流中有可用數據能被讀取時觸發,分為兩種,新的可用的數據和到達流的末尾,前者stream.read()方法返回可用數據,後者回傳null,如下:

const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
 
rs.on('readable',()=>{
  var chunk = null;
  //这里需要判断是否到了流的末尾
  if((chunk = rs.read()) !== null){
    chunkArr.push(chunk);
    chunkLen+=chunk.length;
  }
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});
登入後複製

pause和resume方法

stream.pause()方法讓流進入暫停模式,並停止'data'事件觸發,stream.resume()方法使流進入流動模式,並恢復'data'事件觸發,也可以用來消費所有數據,如下所示:

const rs = fs.createReadStream('./下载.png');
rs.on('data',(chunk)=>{
  console.log(`接收到${chunk.length}字节数据...`);
  rs.pause();
  console.log(`数据接收将暂停1.5秒.`);
  setTimeout(()=>{
    rs.resume();
  },1000);
});
rs.on('end',(chunk)=>{
  console.log(`数据接收完毕`);
});
登入後複製

pipe(destination[, options])方法

pipe()方法綁定一個可寫流到可讀流上,並自動切換到流動模式,將所有資料輸出到可寫流,以及做好了資料流的管理,不會發生資料遺失的問題,使用如下所示:

const rs = fs.createReadStream('./app.js');
rs.pipe(process.stdout);
登入後複製

以上介紹了多種可讀流的資料消費的方法,但對於一個可讀流,最好只選擇其中的一種,建議使用pipe()方法。

2、Writable可寫流

所有的可寫流都是基於stream.Writable類別創建的,創建之後便可將資料寫入該流。


write(chunk[, encoding][, callback])方法


write()方法向可寫流中寫入數據,參數意義:

chunk,為字串或buffer

encoding,若chunk,為字串為字串,則是chunk的編碼

callback,當前chunk資料寫入磁碟時的回呼函數

該方法的傳回值為布林值,如果為false,則表示需要寫入的資料區塊已快取並且此時快取的大小超出highWaterMark閥值,否則為true。

 使用如下所示:

const ws = fs.createWriteStream('./test.txt');
ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');});
ws.end('done.')
登入後複製

背壓機制


如果可寫流的寫入速度跟不上可讀流的讀取速度,write方法添加的資料將被緩存,逐漸增多,導致佔用內存。我們希望的是消耗一個數據,再去讀取一個數據,這樣記憶體就維持在一個層次。如何做到這一點?可以利用write方法的回傳值來判斷可寫流的快取狀態和'drain'事件,及時切換可讀流的模式,如下所示:

function copy(src,dest){
  src = path.resolve(src);
  dest = path.resolve(dest);
  const rs = fs.createReadStream(src);
  const ws = fs.createWriteStream(dest);
  console.log('正在复制中...');
  const stime = +new Date();
  rs.on('data',(chunk)=>{
    if(null === ws.write(chunk)){
      rs.pause();
    }
  });
  ws.on('drain',()=>{
    rs.resume();
  });
  rs.on('end',()=>{
    const etime = +new Date();
    console.log(`已完成,用时:${(etime-stime)/1000}秒`);
    ws.end();
  });
  function calcProgress(){
     
  }
}
copy('./CSS权威指南 第3版.pdf','./javascript.pdf');
登入後複製

drain事件


如果Writable.write()方法傳回false ,則drain事件將會觸發,上面的背壓機制已經使用了該事件。

finish事件


在呼叫stream.end()方法之後且所有快取區的資料都被寫入到下游系統,就會觸發該事件,如下所示:

const ws = fs.createWriteStream('./alphabet.txt');
const alphabetStr = 'abcdefghijklmnopqrstuvwxyz';
ws.on('finish',()=>{
  console.log('done.');
});
for(let letter of alphabetStr.split()){
  ws.write(letter);
}
ws.end();//必须调用
登入後複製

end([chunk][, encoding][, callback])方法

end()方法被呼叫之後,便不能再呼叫stream.write()方法寫入數據,負責將拋出錯誤。

3、Duplex讀寫流

Duplex流同時實現了Readable與Writable類的接口,既是可讀流,也是可寫流。例如'zlib streams'、'crypto streams'、'TCP sockets'等都是Duplex流。

4、Transform流

Duplex流的扩展,区别在于,Transform流自动将写入端的数据变换后添加到可读端。例如:'zlib streams'、'crypto streams'等都是Transform流。

5、四种流的实现

stream模块提供的API可以让我们很简单的实现流,该模块使用require('stream')引用,我们只要继承四种流中的一个基类(stream.Writable, stream.Readable, stream.Duplex, or stream.Transform),然后实现它的接口就可以了,需要实现的接口如下所示:

| Use-case | Class | Method(s) to implement |
| ------------- |-------------| -----|
| Reading only | Readable | _read |
| Writing only | Writable | _write, _writev |
| Reading and writing | Duplex | _read, _write, _writev |
| Operate on written data, then read the result | Transform | _transform, _flush |

Readable流实现

如上所示,我们只要继承Readable类并实现_read接口即可,,如下所示:

const Readable = require('stream').Readable;
const util = require('util');
const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split();
/*function AbReadable(){
  if(!this instanceof AbReadable){
    return new AbReadable();
  }
  Readable.call(this);
}
util.inherits(AbReadable,Readable);
AbReadable.prototype._read = function(){
  if(!alphabetArr.length){
    this.push(null);
  }else{
    this.push(alphabetArr.shift());
  }
};
 
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*class AbReadable extends Readable{
  constructor(){
    super();
  }
  _read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
}
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*const abReadable = new Readable({
  read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
});
abReadable.pipe(process.stdout);*/
 
const abReadable = Readable();
abReadable._read = function(){
  if (!alphabetArr.length) {
    this.push(null);
  } else {
    this.push(alphabetArr.shift());
  }
}
abReadable.pipe(process.stdout);
登入後複製

以上代码使用了四种方法创建一个Readable可读流,必须实现_read()方法,以及用到了readable.push()方法,该方法的作用是将指定的数据添加到读取队列。

Writable流实现

我们只要继承Writable类并实现_write或_writev接口,如下所示(只使用两种方法):

/*class MyWritable extends Writable{
  constructor(){
    super();
  }
  _write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
}
const myWritable = new MyWritable();*/
const myWritable = new Writable({
  write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
});
myWritable.on('finish',()=>{
  process.stdout.write('done');
})
myWritable.write('a');
myWritable.write('b');
myWritable.write('c');
myWritable.end();
登入後複製

Duplex流实现

实现Duplex流,需要继承Duplex类,并实现_read和_write接口,如下所示:

class MyDuplex extends Duplex{
  constructor(){
    super();
    this.source = [];
  }
  _read(){
    if (!this.source.length) {
      this.push(null);
    } else {
      this.push(this.source.shift());
    }
  }
  _write(chunk,encoding,cb){
    this.source.push(chunk);
    cb();
  }
}
 
const myDuplex = new MyDuplex();
myDuplex.on('finish',()=>{
  process.stdout.write('write done.')
});
myDuplex.on('end',()=>{
  process.stdout.write('read done.')
});
myDuplex.write('\na\n');
myDuplex.write('c\n');
myDuplex.end('b\n');
myDuplex.pipe(process.stdout);
登入後複製

上面的代码实现了_read()方法,可作为可读流来使用,同时实现了_write()方法,又可作为可写流来使用。

Transform流实现

实现Transform流,需要继承Transform类,并实现_transform接口,如下所示:

class MyTransform extends Transform{
  constructor(){
    super();
  }
  _transform(chunk, encoding, callback){
    chunk = (chunk+'').toUpperCase();
    callback(null,chunk);
  }
}
const myTransform = new MyTransform();
myTransform.write('hello world!');
myTransform.end();
myTransform.pipe(process.stdout);
登入後複製

上面代码中的_transform()方法,其第一个参数,要么为error,要么为null,第二个参数将被自动转发给readable.push()方法,因此该方法也可以使用如下写法:

_transform(chunk, encoding, callback){
  chunk = (chunk+'').toUpperCase()
  this.push(chunk)
  callback();
}
登入後複製

Object Mode流实现

我们知道流中的数据默认都是Buffer类型,可读流的数据进入流中便被转换成buffer,然后被消耗,可写流写入数据时,底层调用也将其转化为buffer。但将构造函数的objectMode选择设置为true,便可产生原样的数据,如下所示:

const rs = Readable();
rs.push('a');
rs.push('b');
rs.push(null);
rs.on(&#39;data&#39;,(chunk)=>{console.log(chunk);});//<Buffer 61>与<Buffer 62>
 
const rs1 = Readable({objectMode:!0});
rs1.push(&#39;a&#39;);
rs1.push(&#39;b&#39;);
rs1.push(null);
rs1.on(&#39;data&#39;,(chunk)=>{console.log(chunk);});//a与b
登入後複製

下面利用Transform流实现一个简单的CSS压缩工具,如下所示:

function minify(src,dest){
  const transform = new Transform({
    transform(chunk,encoding,cb){
      cb(null,(chunk.toString()).replace(/[\s\r\n\t]/g,&#39;&#39;));
    }
  });
  fs.createReadStream(src,{encoding:&#39;utf8&#39;}).pipe(transform).pipe(fs.createWriteStream(dest));
}
minify(&#39;./reset.css&#39;,&#39;./reset.min.css&#39;);
登入後複製

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持PHP中文网。

更多浅谈Node.js:理解stream相关文章请关注PHP中文网!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
最新問題
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!