目錄
引入Stream
Stream 的分類
Readable Stream
Writable Stream
Duplex Stream 与 Transform Stream
pipe
实现流
实现可读流
实现可写流
实现双工流
实现转换流
首頁 web前端 js教程 聊聊Node.js中如何實作Stream流(可讀、可寫、雙工和轉換流)

聊聊Node.js中如何實作Stream流(可讀、可寫、雙工和轉換流)

Dec 22, 2021 pm 07:23 PM
node.js stream串流

這篇文章帶大家了解一下Node中的之Stream,介紹一下引入Stream,實現可讀流、可寫流、雙工流和轉換流的方法,希望對大家有所幫助!

聊聊Node.js中如何實作Stream流(可讀、可寫、雙工和轉換流)

引入Stream

假設我們有這麼一個需求,我們需要複製一個檔案中的內容到另一個檔案中,我們會寫出以下程式碼

const fs = require('fs');
const path = require('path');

const copy = (source, target) => {
    fs.readFile(path.resolve(source), (err, data) => {
        if(err) {
            throw new Error(err.toString());
            return;
        }
        fs.writeFile(path.resolve(target), data, (err) => {
            if(!err) {
                console.log("复制成功!");
            }
        })
    })
}
登入後複製

上面的程式碼很簡單,就是先讀取source 檔案裡面的內容,然後將內容寫入到target 檔案中。它的特點是需要讀取完 source 裡面的所有內容,然後將內容寫入到 target 中。

這樣做就有一個缺點,當我們讀取大檔案時,可能會發生記憶體不夠用的情況,因為它會先將檔案的所有內容都讀取到記憶體;另外還就是時間,一次讀取一個大檔案到內存,是需要比較長的時間的,用戶可能會有卡頓的感覺。

另一種解決方法就是邊讀邊寫,讀取部分檔案內容,然後將內容寫入到新檔案中,這樣在記憶體中的資料只是部分內容,不會佔有太多的記憶體,由於是邊讀編寫,使用者可以很快的得到回應,提高使用者體驗。

網上找到一幅動圖來形象的展示使用流前後資料的流動情況

聊聊Node.js中如何實作Stream流(可讀、可寫、雙工和轉換流)

聊聊Node.js中如何實作Stream流(可讀、可寫、雙工和轉換流)

##Node.js 提供我們Stream 的API,它是專門用來處理大檔案的。因為資料是一部分一部分的處理,就像是水流一樣,所以這個模組的名稱稱為 Stream。

const fs = require('fs');

function copy(source, target) {
    const rs = fs.createReadStream(source);
    const ws = fs.createWriteStream(target);

    rs.on('data', data => {
        ws.write(data);
    });

    rs.on('end', () => {
        ws.end();
    });
}
登入後複製

上面程式碼的細節將在後文揭曉。

Stream 的分類

Stream 可以分成四類

    Readable:可讀流,資料的提供者
  • Writeable:可寫流,資料的消費者
  • Duplex:可寫可讀流(雙工流)
  • Transform:是Duplex 的特殊情況,轉換流,對輸入的資料進行處理,然後輸出
可讀流與可寫流是基礎,常見的可讀流與可寫流如下

可讀流可寫入流HTTP Request#HTTP Reponsefs read streamsfs write streams#process.stdinprocess.stdout# TCP socketsTCP socketszlib streamszlib streamscrypto streams#crypto streams

Stream 是 EventEmitter 的实例,有自定义的事件。

Readable Stream

可读流有两个模式,暂停模式与流动模式。当我们创建一个流时,如果我们监听了 readable 事件,它就会来到暂停模式,在暂停模式下,它会不断的读取数据到缓冲区,当读取到的数据超过预设的大小时,它由属性 highWaterMark 指定(默认为 64kB),便会触发 readable 事件,readable 事件的触发有两种情况:

  • 缓存区中的数据达到 highWaterMark 预设的大小
  • 数据源的数据已经被读取完毕
const fs = require('fs');

const rs = fs.createReadStream('a.txt', {
    highWaterMark: 1 // 缓存区最多存储 1 字节
});

rs.on('readable', () => {
    let data;
    while(data=rs.read()) {
        console.log(data.toString());
    }
})
登入後複製

上面的程序设置 highWaterMark 为 1,即每次读取到一个字节便会触发 readable 命令,每次当触发 readable 命令时,我们调用可读流的 read([size]) 方法从缓冲区中读取数据(读取到的数据为 Buffer),然后打印到控制台。

当我们为可读流绑定 data 事件时,可读流便会切换到流动状态,当位于流动状态时,可读流会自动的从文件中读取内容到缓冲区,当缓冲区中的内容大于设定的 highWaterMark 的大小时,便会触发 data 事件,将缓冲区中的数据传递给 data 事件绑定的函数。以上过程会自动不断进行。当文件中的所有内容都被读取完成时,那么就会触发 end 事件。

const fs = require('fs');

const rs = fs.createReadStream('a.txt', {
    highWaterMark: 2
});

rs.on('data', data => {
    console.log(data.toString());
});

rs.on('end', () => {
    console.log("文件读取完毕!");
});
登入後複製

暂停模式像是手动步枪,而流动模式则像是自动步枪。暂停模式与流动模式也可以相互切换,通过 pause() 可以从流动状态切换到暂停状态,通过 resume() 则可以从暂停模式切换到流动模式。

可读流的一个经典实例就是 http 中的请求对象 req,下面的程序展示了通过监听 reqdata 事件来读取 HTTP 请求体中的内容

const http = require('http');

const app = http.createServer();

app.on('request', (req, res) => {
    let datas = [];
    req.on('data', data => {
        datas.push(data);
    });

    req.on('end', () => {
        req.body = Buffer.concat(datas);
        // 当读取完 body 中的内容之后,将内容返回给客户端
        res.end(req.body);
    });
})

app.listen(3000, () => {
    console.log("服务启动在 3000 端口... ...");
})
登入後複製

聊聊Node.js中如何實作Stream流(可讀、可寫、雙工和轉換流)

Writable Stream

可写流与可读流相似,当我们向可写流写入数据时(通过可写流的 write() 方法写数据),会直接将数据写入到文件中,如果写入的数据比较慢的话,那就就会将数据写入到缓冲区,当缓冲区中的内容达到 highWaterMark 设定的大小时,write 方法就会返回一个 false,表明不能接受更多的数据了。

当缓冲区中的数据全部被消费完了(写入了文件中或者被别的流消费了),那么就会触发 drain 事件。

const fs = require('fs');

const ws = fs.createWriteStream('b.txt', {
    highWaterMark: 16 * 1024
});

function writeMillionTimes(writer, data, encoding, callback) {
    let i = 10000;
    
    write();

    function write() {
        // 表示是否可以向可写流中写入数据
        let ok = true;
        while(i-- > 0 && ok) {
            // 当 writer.write() 方法返回  false 表示不可写入数据
            ok = writer.write(data, encoding, i === 0 ? callback : null);
        }

        if(i > 0) {
            // 说明 ok 为 false,即不能向缓冲区中写入内容了
            console.log("drain", i);
            // 监听 drain 事件,当队列消费完毕时继续调用 write() 方法写入
            writer.once('drain', write);
        }
    }
}

writeMillionTimes(ws, 'simple', 'utf-8', () => {
    console.log("end");
})
登入後複製

输出为

drain 7268
drain 4536
drain 1804
end
登入後複製

说明有三次缓冲区中的内容达到了 16KB,可以验算上面的数字之间的差值,在乘以 6(simple 的字节数),大小大约为 16 * 1024 左右,如

(72684536)6=1639216384=161024(7268 - 4536) * 6 = 16392 \approx 16384 = 16 * 1024

我们还可以调用可写流的 end() 方法,表示将缓存中的内容清空写入文件,并关闭文件,此时会触发 close 事件

const fs = require('fs');

const ws = fs.createWriteStream('b.txt');

ws.write('Hello');
ws.write('World');
ws.end('!');

ws.on('close', () => {
    console.log("close"); // close
})
登入後複製

当调用 end() 方法之后就不能调用 write() 方法了,否则会报错

const fs = require('fs');

const ws = fs.createWriteStream('b.txt');

ws.write('Hello');
ws.write('World');
ws.end('!');

ws.write('write again'); // Error [ERR_STREAM_WRITE_AFTER_END]: write after end
登入後複製

当调用 end() 方法之后,并且数据缓冲区中的数据已经写入之后会触发可写流的 finish 事件

const fs = require('fs');

const ws = fs.createWriteStream('b.txt');

ws.write('Hello');
ws.write('World');
ws.end('!');

ws.on('close', () => {
    console.log("close");
});

ws.on('finish', () => {
    console.log("finish");
});
登入後複製

打印结果是

finish
close
登入後複製

说明 finish 事件会在 close 事件之前被触发。

可写流的经典例子就是 http 模块的响应对象 res,下面的程序演示了当请求到来时,我们读取一个 html 页面返回给客户端

const http = require('http');
const fs = require('fs');

const app = http.createServer();

app.on('request', (req, res) => {
    const rs = fs.createReadStream('index.html');
    
    rs.on('data', data => {
        res.write(data);
    })

    rs.on('end', () => {
        res.end()
    });
});

app.listen(3000, () => {
    console.log("服务启动在 3000 端口 ... ...");
})
登入後複製

Duplex Stream 与 Transform Stream

Duplex,即双工的意思,它既可以接收数据,也可以输出数据,它的输入和输出之间可以没有任何的关系,就像是一个部件内部有两个独立的系统。Duplex 继承了可读流(Readable),并且拥有可写流(Writable)的所有方法。

Transform Stream 继承了 Duplex Stream,它同样具有可读流与可写流的能力,并且它的输出与输入之间是有关系的,中间做了一次转换。常见的转换流有 zlibcrypto

出于文章结构的考虑,在这里不详细讲解这两个流,在后文中会实现这两个流,以加深对这两个流的理解。

pipe

我们可以混合使用可读流与可写流来进行文件的复制

const fs = require('fs');

function copy(source, target) {
    const rs = fs.createReadStream(source);
    const ws = fs.createWriteStream(target);

    rs.on('data', data => {
        ws.write(data);
    });

    rs.on('end', () => {
        ws.end();
    });
}

copy('a.txt', 'b.txt');
登入後複製

但是上面的写法却不被建议使用,因为没有考虑到可读流与可写流速度之间的差异,如果可读流输出数据的速度大于可写流写入数据的速度,这个时候就会有数据一直堆压在缓存区,导致占用过高的内存,专业术语叫做积压。

我们需要改善上面的程序,具体做法就是当 write() 方法返回 false 时,我们切换可读流的模式为暂停模式,当可写流触发了 drain 事件时,我们便将可读流的状态切换为流动模式

const fs = require('fs');

function copy(source, target) {
    const rs = fs.createReadStream(source);
    const ws = fs.createWriteStream(target);

    rs.on('data', data => {
        if (!ws.write(data)) {
            rs.pause();
        }
    });

    rs.on('end', () => {
        ws.end();
    });

    ws.on('drain', () => {
        rs.resume();
    })
}
登入後複製

那是不是每次我们使用流都需要写这么多的代码,当然不是。官方为可读流提供了一个 pipe(ws) 方法,pipe 方法接收一个可写流,它的作用就是将可读流中数据写入到可写流中去,并且它内部有做速度差异的处理。所以上面的写法可以改为下面的版本

const fs = require('fs');

function copy(source, target) {
    const rs = fs.createReadStream(source);
    const ws = fs.createWriteStream(target);

    rs.pipe(ws);
}
登入後複製

当我们调用 pipe 方法时,会触发可写流的 pipe 事件。pipe 的实现参考如下

Readable.prototype.pipe = function(ws) {
    this.on('data', data => {
        if (!ws.write(data)) {
            this.pause();
        }
    });

    ws.on('drain', () => {
        this.resume();
    });

    // 触发 pipe 事件
    ws.emit('pipe', this);

    // 返回可写流,以支持链式调用
    return ws;
}
登入後複製

这里给出官网画的一个有关 pipe 的流程图

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+
登入後複製

实现流

在本节中我们来实现具体的流,通过实现流可以进一步加深对 Stream 内部工作细节的理解。

实现可读流

上面我们都是通过 fs.createReadableStream() 方法来得到一个可读流的,在这里我们自己实现一个可读流。实现可读流只需要继承 Readable 类,然后实现 _read() 方法即可

const { Readable } = require(&#39;stream&#39;);

class IeteratorReadableStream extends Readable {
    constructor(iterator) {
        super();
        this.iterator = iterator;
    }

    _read() {
        let data = this.iterator.next();
        // console.log(data);
        if(data.done) {
            this.push(null);
        } else {
            // 必须 push 字符串或者 Buffer
            this.push(data.value+&#39;&#39;);
        }
    }
}

module.exports = IeteratorReadableStream;
登入後複製

上述我们实现了一个可读流,可读流接收一个迭代器作为参数,这个迭代器作为这个可读流的数据源。可读流会自动的调用 _read 获取数据,在 _read 方法中我们从迭代器中获取数据,并且调用了 push 方法,该方法的作用就是将数据放入到缓存区中,只能向其中 push 字符串或者 Buffer,当我们向其中 push null 时就表示数据已经被全部读取完毕。

所以可读流的执行逻辑为,每次调用 _read 方法从数据源读取数据,并将数据存入缓存区,然后触发 data 事件,将缓存区中的数据作为参数传递给 data 事件绑定的回调函数,循环上述过程直到向缓存区 push null 时,就表示数据源中的数据已经被读取完毕,此时会触发 end 事件。

我们创建一个迭代器作为数据源传入

const IeteratorReadableStream = require(&#39;./IteratorReadableStream&#39;);

function *getData() {
    for(let i = 0; i < 5; i++) {
        yield i;
    }
}

let rs = new IeteratorReadableStream(getData());

rs.on(&#39;data&#39;, data => {
    console.log(data.toString());
});

rs.on(&#39;end&#39;, () => {
    console.log("迭代结束");
});
登入後複製

输出为

0
1
2
3
4
迭代结束
登入後複製

实现可写流

实现可写流的过程同实现可读流的过程类似,首先需要继承 Writable 类,接着实现 _write 方法即可

const fs = require(&#39;fs&#39;);
const { Writable } = require(&#39;stream&#39;);

class FileWritableStream extends Writable {
    constructor(filepath) {
        super();
        this.filepath = filepath;
    }

    _write(chunk, encoding, callback) {
        fs.appendFile(this.filepath, chunk, {
            encoding
        }, callback)
    }
}
登入後複製

上面我们实现了一个可写流,这个可写流接收一个文件路径作为参数,它的作用就是向这个文件中追加数据,每次当我们调用可写流的 write() 方法时,它会向缓冲区写入数据,当达到阈值时,便会调用 _write() 方法将数据新增到文件中。

process.stdin.pipe(new FileWritableStream(&#39;c.txt&#39;));
登入後複製

上面这行代码的作用就是将从标准输入的字符输出到 c.txt 中。

实现双工流

Duplex Stream 既可以作为可读流,也可以作为可写流,并且它的输入与输出之间可以没有关系。Duplex Stream 继承了 Readable,并且拥有 Writable 的所有,我们只要分别实现 _read()_write() 方法即可

const { Duplex } = require(&#39;stream&#39;);

class CustomDuplexStream extends Duplex {
    constructor() {
        super();
        this.currentCharCode = 65;
    }

    _read() {
        if(this.currentCharCode <= 90) {
            this.push(String.fromCharCode(this.currentCharCode++))
        } else {
            this.push(null);
        }
    }

    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    }
}
登入後複製

上面双工流的可读流部分就是将大写的 26 个字母添加进了缓存区,而可写流部分就是直接将数据输出到控制台。可见双工流可读流与可写流之间并没有任何的关系

const dp = new CustomDuplexStream();

dp.write("1");
dp.write("2");
dp.end();

dp.pipe(process.stdout);
登入後複製

输出为

1
2
ABCDEFGHIJKLMNOPQRSTUVWXYZ
登入後複製

实现转换流

Tranform Stream 是 Duplex 的特例,它也是一个双工流,不过它的输入和输出之间有关联,它的内部通过 _transform() 方法将可写流接收到的数据经过转换后传入到可读流中,所以我们要实现转换流,只需要实现 _transform() 方法即可

const { Transform } = require(&#39;stream&#39;);

class UpperTransformStream extends Transform {
    _transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
}
登入後複製

上面我们实现了一个转换流,它可以将输入的小写字符转化为大写字符然后输出

const ts = new UpperTransformStream();
const rs = fs.createReadStream(&#39;a.txt&#39;);
rs.pipe(ts).pipe(process.stdout);
登入後複製

上面程序会读取 a.txt 中的所有字符,将字符转换为大写然后输出在控制台。

转换流在实际应用中还是比较多的,这里介绍一个 Node.js 内置的转换流 zlib,它的作用对文件进行解压缩,将文件压缩为压缩文件,或者将压缩文件解压为正常文件,这不就是一个典型的转换流嘛!

const zlib = require(&#39;zlib&#39;);
const fs = require(&#39;fs&#39;);

const args = process.argv.slice(2);

const source = fs.createReadStream(args[0]);
const target = fs.createWriteStream(args[1]);
const gzip = zlib.createGzip();

source.pipe(gzip).pipe(target);
登入後複製

我们可以通过

node gzip.js Graph.md Graph.md.gz
登入後複製

来运行上面的程序,它可以将 Graph.md 使用 gzip 压缩为 Graph.md.gz。

聊聊Node.js中如何實作Stream流(可讀、可寫、雙工和轉換流)

文件大小从 201KB 压缩到了 51KB

同样的我们也可以通过 zlib.createGunzip() 来创建一个解压缩的转换流,具体细节同压缩文件相同,不做介绍。

本篇文章到此结束,想必到这里已经对 Stream 的使用已经有所了解了,但是本篇文章并没有列举所有有关 Stream 的 API,如果想更加详细的了解 Stream 的 API,那么接下去就可以阅读官方文档(http://nodejs.cn/api/stream.html)了。

更多node相关知识,请访问:nodejs 教程!!

以上是聊聊Node.js中如何實作Stream流(可讀、可寫、雙工和轉換流)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

圖文詳解Node V8引擎的記憶體和GC 圖文詳解Node V8引擎的記憶體和GC Mar 29, 2023 pm 06:02 PM

這篇文章帶大家深入了解NodeJS V8引擎的記憶體和垃圾回收器(GC),希望對大家有幫助!

一文聊聊Node中的記憶體控制 一文聊聊Node中的記憶體控制 Apr 26, 2023 pm 05:37 PM

基於無阻塞、事件驅動建立的Node服務,具有記憶體消耗低的優點,非常適合處理海量的網路請求。在海量請求的前提下,就需要考慮「記憶體控制」的相關問題了。 1. V8的垃圾回收機制與記憶體限制 Js由垃圾回收機

聊聊如何選擇一個最好的Node.js Docker映像? 聊聊如何選擇一個最好的Node.js Docker映像? Dec 13, 2022 pm 08:00 PM

選擇一個Node的Docker映像看起來像是小事,但是映像的大小和潛在漏洞可能會對你的CI/CD流程和安全造成重大的影響。那我們要如何選擇一個最好Node.js Docker映像呢?

Node.js 19正式發布,聊聊它的 6 大功能! Node.js 19正式發布,聊聊它的 6 大功能! Nov 16, 2022 pm 08:34 PM

Node 19已正式發布,以下這篇文章就來帶大家詳解了解Node.js 19的 6 大特性,希望對大家有幫助!

深入聊聊Node中的File模組 深入聊聊Node中的File模組 Apr 24, 2023 pm 05:49 PM

文件模組是對底層文件操作的封裝,例如文件讀寫/打開關閉/刪除添加等等文件模組最大的特點就是所有的方法都提供的**同步**和**異步**兩個版本,具有sync 字尾的方法都是同步方法,沒有的都是異

一起聊聊Node中的事件循環 一起聊聊Node中的事件循環 Apr 11, 2023 pm 07:08 PM

事件循環是 Node.js 的基本組成部分,透過確保主執行緒不被阻塞來實現非同步編程,了解事件循環對建立高效應用程式至關重要。以下這篇文章就來帶大家深入了解Node中的事件循環 ,希望對大家有幫助!

聊聊Node.js中的 GC (垃圾回收)機制 聊聊Node.js中的 GC (垃圾回收)機制 Nov 29, 2022 pm 08:44 PM

Node.js 是如何做 GC (垃圾回收)的?下面這篇文章就來帶大家了解一下。

聊聊用pkg將Node.js專案打包為執行檔的方法 聊聊用pkg將Node.js專案打包為執行檔的方法 Dec 02, 2022 pm 09:06 PM

如何用pkg打包nodejs可執行檔?以下這篇文章跟大家介紹一下使用pkg將Node專案打包為執行檔的方法,希望對大家有幫助!

See all articles