Node.js에 대한 간략한 토론: 스트림 이해

高洛峰
풀어 주다: 2016-12-28 13:22:31
원래의
1128명이 탐색했습니다.

Stream은 EventEmitter를 기반으로 하는 node.js의 추상 인터페이스이자 스트리밍 데이터를 처리하는 데 사용되는 Buffer의 고급 캡슐화이기도 합니다. Stream 모듈은 Stream을 쉽게 사용할 수 있도록 다양한 API를 제공합니다.

스트림은 다음과 같이 네 가지 유형으로 나뉩니다.

읽기 가능, 읽기 가능 스트림

쓰기 가능, 쓰기 가능 스트림

이중, 읽기 및 쓰기 스트림

변환, 확장 이중, 작성된 데이터 수정 가능

1. 읽기 가능한 읽기 가능한 스트림

읽기 가능한 스트림은 스트림을 통해 생성될 수 있습니다. 읽기 가능, 두 가지 모드가 있습니다. 일시 정지와 흐름.

흐름 모드에서는 데이터가 다운스트림 시스템에서 자동으로 읽혀지고 일시 중지 모드에서 데이터 이벤트를 사용하여 출력됩니다. 데이터를 읽고 트리거하려면 stream.read() 메서드를 명시적으로 호출해야 합니다. 데이터 이벤트.

읽을 수 있는 모든 스트림은 처음에는 일시 중지 모드이며 다음 방법을 통해 흐름 모드로 전환할 수 있습니다.

'데이터' 이벤트 수신

호출 스트림 .resume() 메서드

stream.pipe() 메서드를 호출하여 쓰기 가능한 스트림으로 데이터를 출력합니다.

마찬가지로 일시 중지 모드로 전환할 수도 있습니다.

파이프 대상이 설정되지 않은 경우 stream.pause() 메서드를 호출하면 됩니다.

파이프 대상이 설정된 경우 모든 데이터 리스너를 제거하고 stream.unpipe() 메서드를 호출해야 합니다.

Readable 개체에는 _readableSate 개체가 있으며 이를 통해 아래와 같이 스트림이 현재 어떤 모드에 있는지 확인하세요.

readable._readableState.flowing = null, 데이터 소비자가 없으며 스트림이 데이터를 생성하지 않습니다.

readable._readableState.flowing = true, flowing Mode

readable._readableState.flowing = false, 일시 정지 모드

스트리밍을 사용하여 데이터를 가져오는 이유

작은 파일의 경우 다음을 사용하세요. fs.readFile() 메소드를 사용하여 데이터를 검색하는 것이 더 편리하지만, 수 G 크기의 파일과 같은 대용량 파일을 읽어야 할 때 이 메소드를 사용하면 많은 메모리를 소비하고 프로그램이 작동하지 않을 수도 있습니다. 충돌. 이 경우 처리를 위해 스트림을 사용하는 것이 더 적절합니다. 분할된 읽기를 사용하면 메모리 '폭발' 문제가 발생하지 않습니다.

데이터 이벤트

는 스트림이 소비자에게 데이터 블록을 제공할 때 트리거되거나 reader.read() 메서드가 호출될 때 발생할 수 있습니다. 유효한 데이터 블록을 사용하는 경우에는 다음을 사용하십시오.

const fs = require('fs');
 
const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
rs.on('data',(chunk)=>{
  chunkArr.push(chunk);
  chunkLen+=chunk.length;
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});
로그인 후 복사

읽을 수 있는 이벤트

는 스트림에 읽을 수 있는 사용 가능한 데이터가 있을 때 발생합니다. 두 가지 유형, 사용 가능한 데이터 중 새로운 것이 있고 스트림의 끝에 도달하면 전자의 stream.read() 메서드는 사용 가능한 데이터를 반환하고 후자는 아래와 같이 null을 반환합니다.

const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
 
rs.on('readable',()=>{
  var chunk = null;
  //这里需要判断是否到了流的末尾
  if((chunk = rs.read()) !== null){
    chunkArr.push(chunk);
    chunkLen+=chunk.length;
  }
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});
로그인 후 복사

일시 중지 및 재개

stream .pause() 메서드는 스트림을 일시 중지 모드로 전환하고 'data' 이벤트 트리거를 중지합니다. stream.resume() 메서드는 스트림을 흐름 모드로 전환하고 'data' 이벤트 트리거를 재개합니다. . 아래와 같이 모든 데이터를 소비하는 데에도 사용할 수 있습니다.

const rs = fs.createReadStream('./下载.png');
rs.on('data',(chunk)=>{
  console.log(`接收到${chunk.length}字节数据...`);
  rs.pause();
  console.log(`数据接收将暂停1.5秒.`);
  setTimeout(()=>{
    rs.resume();
  },1000);
});
rs.on('end',(chunk)=>{
  console.log(`数据接收完毕`);
});
로그인 후 복사

pipe(destination[, options]) 메서드

pipe() 메서드는 쓰기 가능한 스트림을 읽기 가능한 스트림에 바인딩합니다. stream을 실행하고 자동으로 flow 모드로 전환되며 모든 Data는 쓰기 가능한 스트림으로 출력되며 데이터 손실이 발생하지 않도록 데이터 스트림이 잘 관리됩니다.

const rs = fs.createReadStream('./app.js');
rs.pipe(process.stdout);
로그인 후 복사

위에서 소개한 내용은 다음과 같습니다. 여러 개의 읽기 가능한 스트림 메서드의 데이터 소비가 발생하지만 읽기 가능한 스트림의 경우 그 중 하나만 선택하는 것이 가장 좋으며, Pipe() 메서드를 사용하는 것이 좋습니다.

2. 쓰기 가능한 스트림

모든 쓰기 가능한 스트림은 스트림을 기반으로 생성됩니다. Writable 클래스를 생성한 후 스트림에 데이터를 쓸 수 있습니다.

write(chunk[, 인코딩][, callback]) 메서드

write() 메서드는 쓰기 가능한 스트림에 데이터를 씁니다. 매개변수 의미:

chunk, string or buffer

encoding, 청크가 문자열이면 해당 청크의 인코딩

callback, 현재 청크 데이터가 디스크에 기록될 때의 콜백 함수

이 메소드의 반환 값은 Boolean 값입니다. false이면 기록해야 하는 데이터 블록이 캐시되어 있으며 이때 캐시 크기가 highWaterMark 임계값을 초과했다는 의미이고, 그렇지 않으면 true입니다. .

다음을 사용하세요.

const ws = fs.createWriteStream('./test.txt');
ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');});
ws.end('done.')
로그인 후 복사

배압 메커니즘

쓰기 가능한 스트림의 쓰기 속도가 읽기 가능한 스트림의 읽기 속도를 따라갈 수 없는 경우, 쓰기 방식으로 추가된 데이터는 캐시되어 점차 증가하므로 많은 양의 메모리를 차지하게 됩니다. 우리가 바라는 것은 한 데이터 조각을 소비한 다음 다른 데이터 조각을 읽어서 메모리가 동일한 수준으로 유지되는 것입니다. 어떻게 해야 하나요? 아래와 같이 write 메소드의 반환 값을 사용하여 쓰기 가능한 스트림의 캐시 상태와 'drain' 이벤트를 확인하고 시간에 맞춰 읽기 가능한 스트림의 모드를 전환할 수 있습니다.

function copy(src,dest){
  src = path.resolve(src);
  dest = path.resolve(dest);
  const rs = fs.createReadStream(src);
  const ws = fs.createWriteStream(dest);
  console.log('正在复制中...');
  const stime = +new Date();
  rs.on('data',(chunk)=>{
    if(null === ws.write(chunk)){
      rs.pause();
    }
  });
  ws.on('drain',()=>{
    rs.resume();
  });
  rs.on('end',()=>{
    const etime = +new Date();
    console.log(`已完成,用时:${(etime-stime)/1000}秒`);
    ws.end();
  });
  function calcProgress(){
     
  }
}
copy('./CSS权威指南 第3版.pdf','./javascript.pdf');
로그인 후 복사

배수 이벤트

Writable.write() 메서드가 false를 반환하면 배수 이벤트가 트리거되며 위의 배압 메커니즘은 이 이벤트를 사용했습니다.

finish 이벤트

stream.end() 메서드가 호출되고 모든 버퍼 데이터가 다운스트림 시스템에 기록된 후 아래와 같이 이 이벤트가 트리거됩니다.

const ws = fs.createWriteStream('./alphabet.txt');
const alphabetStr = 'abcdefghijklmnopqrstuvwxyz';
ws.on('finish',()=>{
  console.log('done.');
});
for(let letter of alphabetStr.split()){
  ws.write(letter);
}
ws.end();//必须调用
로그인 후 복사

end([chunk][, 인코딩][, callback]) 메서드

end() 메서드가 호출된 후에는 stream.write() 메서드를 더 이상 호출할 수 없습니다. write 담당 데이터가 오류를 발생시킵니다.

3. 이중 읽기 및 쓰기 스트림

이중 스트림은 Readable 및 Writable 클래스의 인터페이스를 동시에 구현하며 읽기 가능한 스트림이자 쓰기 가능한 스트림입니다. 예를 들어 'zlib 스트림', '암호화 스트림', 'TCP 소켓' 등은 모두 이중 스트림입니다.

4. 흐름 전환

Duplex流的扩展,区别在于,Transform流自动将写入端的数据变换后添加到可读端。例如:'zlib streams'、'crypto streams'等都是Transform流。

5、四种流的实现

stream模块提供的API可以让我们很简单的实现流,该模块使用require('stream')引用,我们只要继承四种流中的一个基类(stream.Writable, stream.Readable, stream.Duplex, or stream.Transform),然后实现它的接口就可以了,需要实现的接口如下所示:

| Use-case | Class | Method(s) to implement |
| ------------- |-------------| -----|
| Reading only | Readable | _read |
| Writing only | Writable | _write, _writev |
| Reading and writing | Duplex | _read, _write, _writev |
| Operate on written data, then read the result | Transform | _transform, _flush |

Readable流实现

如上所示,我们只要继承Readable类并实现_read接口即可,,如下所示:

const Readable = require('stream').Readable;
const util = require('util');
const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split();
/*function AbReadable(){
  if(!this instanceof AbReadable){
    return new AbReadable();
  }
  Readable.call(this);
}
util.inherits(AbReadable,Readable);
AbReadable.prototype._read = function(){
  if(!alphabetArr.length){
    this.push(null);
  }else{
    this.push(alphabetArr.shift());
  }
};
 
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*class AbReadable extends Readable{
  constructor(){
    super();
  }
  _read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
}
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*const abReadable = new Readable({
  read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
});
abReadable.pipe(process.stdout);*/
 
const abReadable = Readable();
abReadable._read = function(){
  if (!alphabetArr.length) {
    this.push(null);
  } else {
    this.push(alphabetArr.shift());
  }
}
abReadable.pipe(process.stdout);
로그인 후 복사

以上代码使用了四种方法创建一个Readable可读流,必须实现_read()方法,以及用到了readable.push()方法,该方法的作用是将指定的数据添加到读取队列。

Writable流实现

我们只要继承Writable类并实现_write或_writev接口,如下所示(只使用两种方法):

/*class MyWritable extends Writable{
  constructor(){
    super();
  }
  _write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
}
const myWritable = new MyWritable();*/
const myWritable = new Writable({
  write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
});
myWritable.on('finish',()=>{
  process.stdout.write('done');
})
myWritable.write('a');
myWritable.write('b');
myWritable.write('c');
myWritable.end();
로그인 후 복사

Duplex流实现

实现Duplex流,需要继承Duplex类,并实现_read和_write接口,如下所示:

class MyDuplex extends Duplex{
  constructor(){
    super();
    this.source = [];
  }
  _read(){
    if (!this.source.length) {
      this.push(null);
    } else {
      this.push(this.source.shift());
    }
  }
  _write(chunk,encoding,cb){
    this.source.push(chunk);
    cb();
  }
}
 
const myDuplex = new MyDuplex();
myDuplex.on('finish',()=>{
  process.stdout.write('write done.')
});
myDuplex.on('end',()=>{
  process.stdout.write('read done.')
});
myDuplex.write('\na\n');
myDuplex.write('c\n');
myDuplex.end('b\n');
myDuplex.pipe(process.stdout);
로그인 후 복사

上面的代码实现了_read()方法,可作为可读流来使用,同时实现了_write()方法,又可作为可写流来使用。

Transform流实现

实现Transform流,需要继承Transform类,并实现_transform接口,如下所示:

class MyTransform extends Transform{
  constructor(){
    super();
  }
  _transform(chunk, encoding, callback){
    chunk = (chunk+'').toUpperCase();
    callback(null,chunk);
  }
}
const myTransform = new MyTransform();
myTransform.write('hello world!');
myTransform.end();
myTransform.pipe(process.stdout);
로그인 후 복사

上面代码中的_transform()方法,其第一个参数,要么为error,要么为null,第二个参数将被自动转发给readable.push()方法,因此该方法也可以使用如下写法:

_transform(chunk, encoding, callback){
  chunk = (chunk+'').toUpperCase()
  this.push(chunk)
  callback();
}
로그인 후 복사

Object Mode流实现

我们知道流中的数据默认都是Buffer类型,可读流的数据进入流中便被转换成buffer,然后被消耗,可写流写入数据时,底层调用也将其转化为buffer。但将构造函数的objectMode选择设置为true,便可产生原样的数据,如下所示:

const rs = Readable();
rs.push('a');
rs.push('b');
rs.push(null);
rs.on(&#39;data&#39;,(chunk)=>{console.log(chunk);});//<Buffer 61>与<Buffer 62>
 
const rs1 = Readable({objectMode:!0});
rs1.push(&#39;a&#39;);
rs1.push(&#39;b&#39;);
rs1.push(null);
rs1.on(&#39;data&#39;,(chunk)=>{console.log(chunk);});//a与b
로그인 후 복사

下面利用Transform流实现一个简单的CSS压缩工具,如下所示:

function minify(src,dest){
  const transform = new Transform({
    transform(chunk,encoding,cb){
      cb(null,(chunk.toString()).replace(/[\s\r\n\t]/g,&#39;&#39;));
    }
  });
  fs.createReadStream(src,{encoding:&#39;utf8&#39;}).pipe(transform).pipe(fs.createWriteStream(dest));
}
minify(&#39;./reset.css&#39;,&#39;./reset.min.css&#39;);
로그인 후 복사

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持PHP中文网。

更多浅谈Node.js:理解stream相关文章请关注PHP中文网!

관련 라벨:
원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
최신 이슈
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿
회사 소개 부인 성명 Sitemap
PHP 중국어 웹사이트:공공복지 온라인 PHP 교육,PHP 학습자의 빠른 성장을 도와주세요!