Node.js의 4개 스트림은 무엇인가요? 배압 문제란 무엇입니까? 다음 기사에서는 Node.js의 네 가지 주요 스트림을 이해하고 역압 문제와 해결 방법을 소개하겠습니다. 도움이 되기를 바랍니다.
A에서 B로 무언가를 이동하는 방법은 무엇입니까?
들어올려 목적지까지 이동한 후 내려놓으면 됩니다.
이것의 무게가 1톤이라면 어떨까요?
그럼 파트별로 옮겨보세요.
사실 IO는 네트워크 IO, 파일 IO 등을 옮기는 것을 의미합니다. 데이터 양이 적다면 모든 콘텐츠를 직접 전송할 수 있지만, 콘텐츠가 많으면 한꺼번에 메모리에 로드하는 것입니다. 시간이 충돌하고 속도가 느려질 것입니다. 이때 부분적으로 처리하면 됩니다.
다양한 언어가 기본적으로 스트림 API를 구현했으며 Node.js도 이를 구현했습니다. 스트림 API는 아래에서 더 일반적으로 사용됩니다.
이 기사는 다음 질문에 답할 것입니다:
Node.js의 4가지 스트림은 무엇입니까?
생성기는 읽기 가능한 스트림과 어떻게 결합됩니까?
스트림의 일시 중지 및 흐름
무엇입니까? 배압 문제, 해결 방법
[추천 학습: "nodejs tutorial"]
한 곳에서 스트리밍하는 직관적인 느낌
분명히 유출측과 유입측이 있습니다. 유출측은 읽기 가능한 스트림(readable)이고, 유입측은 쓰기 가능한 스트림(writable)입니다.
물론 들어오고 나가는 스트림도 있습니다. 이를 이중 흐름이라고 합니다.
들어오고 나가는 콘텐츠를 변환할 수 있나요? , 이러한 종류의 흐름을 변환 흐름(transform)
이중 흐름이라고 합니다. 이중 흐름의 유입 및 유출 내용은 관련될 필요가 없지만 변환 흐름의 유입 및 유출은 관련됩니다. 둘의 차이점입니다.
Stream api
Node.js에서 제공하는 스트림은 위에서 소개한 4가지 유형입니다.
const stream = require('stream'); // 可读流 const Readable = stream.Readable; // 可写流 const Writable = stream.Writable; // 双工流 const Duplex = stream.Duplex; // 转换流 const Transform = stream.Transform;
모두 구현해야 할 메서드가 있습니다.
Readable은 콘텐츠를 반환하려면 _read 메서드를 구현해야 합니다
Writable은 콘텐츠를 승인하기 위해 _write 메소드를 구현해야 합니다
Duplex는 콘텐츠를 승인하고 반환하기 위해 _read 및 _write 메소드를 구현해야 합니다.
Transform은 수신된 콘텐츠를 변환하고 변환하기 위해 _transform 메소드를 구현해야 합니다. return it
따로 살펴보겠습니다.
Readable
Readable _read 메소드를 구현하려면 push를 통해 특정 데이터를 반환해야 합니다.
const Stream = require('stream'); const readableStream = Stream.Readable(); readableStream._read = function() { this.push('阿门阿前一棵葡萄树,'); this.push('阿东阿东绿的刚发芽,'); this.push('阿东背着那重重的的壳呀,'); this.push('一步一步地往上爬。') this.push(null); } readableStream.on('data', (data)=> { console.log(data.toString()) }); readableStream.on('end', () => { console.log('done~'); });
Null을 푸시하면 스트림의 끝을 의미합니다.
실행 효과는 다음과 같습니다.
읽기 가능 생성은 상속을 통해서도 수행할 수 있습니다.
const Stream = require('stream'); class ReadableDong extends Stream.Readable { constructor() { super(); } _read() { this.push('阿门阿前一棵葡萄树,'); this.push('阿东阿东绿的刚发芽,'); this.push('阿东背着那重重的的壳呀,'); this.push('一步一步地往上爬。') this.push(null); } } const readableStream = new ReadableDong(); readableStream.on('data', (data)=> { console.log(data.toString()) }); readableStream.on('end', () => { console.log('done~'); });
읽기 가능 스트림은 콘텐츠를 생성하므로 생성기와 자연스럽게 결합될 수 있습니다.
const Stream = require('stream'); class ReadableDong extends Stream.Readable { constructor(iterator) { super(); this.iterator = iterator; } _read() { const next = this.iterator.next(); if(next.done) { return this.push(null); } else { this.push(next.value) } } } function *songGenerator() { yield '阿门阿前一棵葡萄树,'; yield '阿东阿东绿的刚发芽,'; yield '阿东背着那重重的的壳呀,'; yield '一步一步地往上爬。'; } const songIterator = songGenerator(); const readableStream = new ReadableDong(songIterator); readableStream.on('data', (data)=> { console.log(data.toString()) }); readableStream.on('end', () => { console.log('done~'); });
읽기 가능 스트림은 다음과 같습니다. , _read 메소드를 구현하여 콘텐츠를 반환합니다.
Writable
Writable _write 메서드를 구현하려면 작성된 내용을 받습니다.
const Stream = require('stream'); const writableStream = Stream.Writable(); writableStream._write = function (data, enc, next) { console.log(data.toString()); // 每秒写一次 setTimeout(() => { next(); }, 1000); } writableStream.on('finish', () => console.log('done~')); writableStream.write('阿门阿前一棵葡萄树,'); writableStream.write('阿东阿东绿的刚发芽,'); writableStream.write('阿东背着那重重的的壳呀,'); writableStream.write('一步一步地往上爬。'); writableStream.end();
작성된 내용을 받아 인쇄하고, 다음으로 작성된 내용을 처리하기 위해 next를 호출합니다. 여기서 다음 호출은 비동기식이며 빈도를 제어할 수 있습니다.
한 동안 실행한 후 작성된 내용은 실제로 정상적으로 처리될 수 있습니다.
이것은 쓰기 가능한 스트림이며 작성된 내용은 _write 메소드를 구현하여 처리됩니다.
Duplex
Duplex는 읽기 및 쓰기가 가능합니다. _read와 _write를 동시에 구현하세요
const Stream = require('stream'); var duplexStream = Stream.Duplex(); duplexStream._read = function () { this.push('阿门阿前一棵葡萄树,'); this.push('阿东阿东绿的刚发芽,'); this.push('阿东背着那重重的的壳呀,'); this.push('一步一步地往上爬。') this.push(null); } duplexStream._write = function (data, enc, next) { console.log(data.toString()); next(); } duplexStream.on('data', data => console.log(data.toString())); duplexStream.on('end', data => console.log('read done~')); duplexStream.write('阿门阿前一棵葡萄树,'); duplexStream.write('阿东阿东绿的刚发芽,'); duplexStream.write('阿东背着那重重的的壳呀,'); duplexStream.write('一步一步地往上爬。'); duplexStream.end(); duplexStream.on('finish', data => console.log('write done~'));
Readable 스트림과 Writable 스트림의 기능을 통합합니다. 이것이 이중 스트림 Duplex입니다.
Transform
이중 스트림은 읽고 쓸 수 있지만 때로는 들어오는 콘텐츠를 변환한 후 흘러나와야 하는 경우가 있습니다. 변형됩니다.
Transform 스트림은 _transform API를 구현해야 합니다. 우리는 콘텐츠를 반전시키는 변환 스트림을 구현합니다:
const Stream = require('stream'); class TransformReverse extends Stream.Transform { constructor() { super() } _transform(buf, enc, next) { const res = buf.toString().split('').reverse().join(''); this.push(res) next() } } var transformStream = new TransformReverse(); transformStream.on('data', data => console.log(data.toString())) transformStream.on('end', data => console.log('read done~')); transformStream.write('阿门阿前一棵葡萄树'); transformStream.write('阿东阿东绿的刚发芽'); transformStream.write('阿东背着那重重的的壳呀'); transformStream.write('一步一步地往上爬'); transformStream.end() transformStream.on('finish', data => console.log('write done~'));
잠시 동안 실행하면 효과는 다음과 같습니다:
我们从 Readable 流中获取内容,然后流入 Writable 流,两边分别做 _read 和 _write 的实现,就实现了流动。
背压
但是 read 和 write 都是异步的,如果两者速率不一致呢?
如果 Readable 读入数据的速率大于 Writable 写入速度的速率,这样就会积累一些数据在缓冲区,如果缓冲的数据过多,就会爆掉,会丢失数据。
而如果 Readable 读入数据的速率小于 Writable 写入速度的速率呢?那没关系,最多就是中间有段空闲时期。
这种读入速率大于写入速率的现象叫做“背压”,或者“负压”。也很好理解,写入段压力比较大,写不进去了,会爆缓冲区,导致数据丢失。
这个缓冲区大小可以通过 readableHighWaterMark 和 writableHightWaterMark 来查看,是 16k。
解决背压
怎么解决这种读写速率不一致的问题呢?
当没写完的时候,暂停读就行了。这样就不会读入的数据越来越多,驻留在缓冲区。
readable stream 有个 readableFlowing 的属性,代表是否自动读入数据,默认为 true,也就是自动读入数据,然后监听 data 事件就可以拿到了。
当 readableFlowing 设置为 false 就不会自动读了,需要手动通过 read 来读入。
readableStream.readableFlowing = false; let data; while((data = readableStream.read()) != null) { console.log(data.toString()); }
但自己手动 read 比较麻烦,我们依然可以用自动流入的方式,调用 pause 和 resume 来暂停和恢复就行了。
当调用 writable stream 的 write 方法的时候会返回一个 boolean 值代表是写入了目标还是放在了缓冲区:
true: 数据已经写入目标
false:目标不可写入,暂时放在缓冲区
我们可以判断返回 false 的时候就 pause,然后等缓冲区清空了就 resume:
const rs = fs.createReadStream(src); const ws = fs.createWriteStream(dst); rs.on('data', function (chunk) { if (ws.write(chunk) === false) { rs.pause(); } }); rs.on('end', function () { ws.end(); }); ws.on('drain', function () { rs.resume(); });
这样就能达到根据写入速率暂停和恢复读入速率的功能,解决了背压问题。
pipe 有背压问题么?
平时我们经常会用 pipe 来直接把 Readable 流对接到 Writable 流,但是好像也没遇到过背压问题,其实是 pipe 内部已经做了读入速率的动态调节了。
const rs = fs.createReadStream(src); const ws = fs.createWriteStream(dst); rs.pipe(ws);
流是传输数据时常见的思想,就是一部分一部分的传输内容,是文件读写、网络通信的基础概念。
Node.js 也提供了 stream 的 api,包括 Readable 可读流、Writable 可写流、Duplex 双工流、Transform 转换流。它们分别实现 _read、_write、_read + _write、_transform 方法,来做数据的返回和处理。
创建 Readable 对象既可以直接调用 Readable api 创建,然后重写 _read 方法,也可以继承 Readable 实现一个子类,之后实例化。其他流同理。(Readable 可以很容易的和 generator 结合)
当读入的速率大于写入速率的时候就会出现“背压”现象,会爆缓冲区导致数据丢失,解决的方式是根据 write 的速率来动态 pause 和 resume 可读流的速率。pipe 就没有这个问题,因为内部做了处理。
流是掌握 IO 绕不过去的一个概念,而背压问题也是流很常见的问题,遇到了数据丢失可以考虑是否发生了背压。希望这篇文章能够帮大家理清思路,真正掌握 stream!
更多编程相关知识,请访问:编程视频!!
위 내용은 '배압' 문제를 해결하기 위한 Node.js의 4가지 주요 스트림에 대한 심층적인 이해의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!