目錄
自訂可讀流
read(size)
数据会不会漏掉
首頁 web前端 js教程 Node.js中Stream-可讀流的使用

Node.js中Stream-可讀流的使用

Jun 17, 2020 am 09:26 AM
node

可讀流是生產資料用來供程式消費的流。常見的資料生產方式有讀取磁碟檔案、讀取網路請求內容等,看一下前面介紹什麼是流用的例子:

const rs = fs.createReadStream(filePath);
登入後複製

rs 就是一個可讀流,其生產資料的方式是讀取磁碟的文件,控制台process.stdin 也是一個可讀流:

process.stdin.pipe(process.stdout);
登入後複製

透過簡單的一句話可以把控制台的輸入列印出來,process.stdin 生產資料的方式是讀取使用者在控制台的輸入。

回頭再看一下可讀流的定義:

可讀流是生產資料用來供程式消費的流。

自訂可讀流


除了系統提供的

# fs.CreateReadStream

使用過gulp 或vinyl -fs 提供的src 方法時候也在使用可讀流

gulp.src(['*.js', 'dist/**/*.scss'])
登入後複製

如果希望自己以某種特定的方式生產數據,交給程式消費,那麼改如何開始呢?

簡單兩步驟即可

  1. 繼承sream 模組的

    #Readable

    # 類別
  2. 重寫

    _read

    方法,呼叫##this.push

    將生產的資料放入待讀取佇列
Readable 類別已經把可讀流要做的大部分工作完成,只需要繼承它,然後把生產資料的方式寫在_read 方法裡就可以實作一個自訂的可讀流。

舉個例子:實現一個每100 毫秒生產一個隨機數的流(沒什麼用處)

const Readable = require('stream').Readable;
class RandomNumberStream extends Readable {
    constructor(max) {
        super()
    }
    _read() {
        const ctx = this;
        setTimeout(() => {
            const randomNumber = parseInt(Math.random() * 10000);
            // 只能 push 字符串或 Buffer,为了方便显示打一个回车
            ctx.push(`${randomNumber}\n`);
        }, 100);
    }
}
module.exports = RandomNumberStream;
登入後複製
類別繼承部分程式碼很簡單,主要看一下_read 方法的實現,有幾個值得注意的地方

    Readable 類別中預設有_read 方法的實現,不過什麼都沒有做,我們做的是覆蓋重寫
  1. _read 方法有一個參數size ,用來向read 方法指定應該讀取多少數據返回,不過只是一個參考資料,很多實現忽略此參數,我們這裡也忽略了,後面會詳細提到
  2. 透過this.push 向緩衝區推送數據,緩衝區概念後面會提到,暫時理解為擠到了水管中可消費了
  3. push 的內容只能是字串或Buffer,不能是數字
  4. push 方法有第二個參數encoding,用於第一個參數是字串時指定encoding
執行一下看看效果

const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream();
rns.pipe(process.stdout);
登入後複製
這樣可以看到數字源源不斷的顯示到了控制台上,實作了一個產生隨機數的可讀流,還有幾個小問題待解決

如何停下來

每隔100 毫秒向緩衝區推送一個數字,那麼就像是讀取一個本地文件總有讀完的時候,如何停下來識別資料讀取完畢?

向緩衝區push 一個null 就可以,修改一下代碼,允許消費者定義需要多少個隨機數字:

const Readable = require('stream').Readable;
class RandomNumberStream extends Readable {
    constructor(max) {
        super()
        this.max = max;
    }
    _read() {
        const ctx = this;
        setTimeout(() => {
            if (ctx.max) {
                const randomNumber = parseInt(Math.random() * 10000);
                // 只能 push 字符串或 Buffer,为了方便显示打一个回车
                ctx.push(`${randomNumber}\n`);
                ctx.max -= 1;
            } else {
                ctx.push(null);
            }
        }, 100);
    }
}
module.exports = RandomNumberStream;
登入後複製
代碼中使用了一個max 的標識,允許消費者指定需要的字元數,在實例化的時候指定即可

const RandomNumberStream = require('./');
const rns = new RandomNumberStream(5);
rns.pipe(process.stdout);
登入後複製
這樣可以看到控制台只列印了5 個字元

為什麼是setTimeout 而不是setInterval

細心的同學可能注意到,每隔100 毫秒生產一個隨機數並不是調用的setInterval,而是使用的setTimeout,為什麼僅僅是延時了一下並沒有重複生產,結果卻是正確的呢?

這就需要了解流的兩種工作方式

    流動模式:資料由底層系統讀出,並儘可能快地提供給應用程式
  1. #暫停模式:必須顯示地呼叫read() 方法來讀取若干資料塊
流在預設狀態下是處於暫停模式的,也就是需要程式明確的呼叫read() 方法,但上面例子中並沒有調用就可以得到數據,因為流透過pipe() 方法切換成了流動模式,這樣_read() 方法會自動被重複調用,直到數據讀取完畢,所以每次_read()方法裡面只需要讀取一次資料即可

流動模式和暫停模式切換

流從預設的暫停模式切換到流動模式可以使用以下幾種方式:

    透過新增data 事件監聽器來啟動資料監聽
  1. 呼叫resume() 方法啟動資料流
  2. 呼叫pipe() 方法將資料轉接到另一個可寫流
從流動模式切換為暫停模式又兩種方法:

    當流沒有pipe() 時,呼叫pause() 方法可以將流暫停
  1. pipe() 時移除所有data 事件的監聽,再呼叫unpipe() 方法
data 事件

使用了pipe() 方法後資料就從可讀流進入了可寫流,但對使用者好像是個黑盒,資料究竟是怎麼流向的呢?切換流動模式和暫停模式的時候有兩個重要的名詞

    流動模式對應的data 事件
  1. 暫停模式對應的read() 方法
這兩個機制是程式能夠驅動資料流的原因,先來看流動模式data 事件,一旦監聽了可讀流的data 事件,流就進入了流動模式,可以改寫一下上面呼叫流的程式碼

const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);
rns.on('data', chunk => {
  console.log(chunk);
});
登入後複製

这样可以看到控制台打印出了类似下面的结果

<Buffer 39 35 37 0a>
<Buffer 31 30 35 37 0a>
<Buffer 38 35 31 30 0a>
<Buffer 33 30 35 35 0a>
<Buffer 34 36 34 32 0a>
登入後複製

当可读流生产出可供消费的数据后就会触发 data 事件,data 事件监听器绑定后,数据会被尽可能地传递。data 事件的监听器可以在第一个参数收到可读流传递过来的 Buffer 数据,这也就是控制台打印的 chunk,如果想显示为数字,可以调用 Buffer 的 toString() 方法

当数据处理完成后还会触发一个

end

事件,因为流的处理不是同步调用,所以如果希望完事后做一些事情就需要监听这个事件,在代码最后追加一句:

rns.on('end', () => {
  console.log('done');
});复制代码
登入後複製

这样可以在数据接收完了显示 done ,当然数据处理过程中出现了错误会触发 error 事件,可以监听做异常处理:

rns.on('error', (err) => {
  console.log(err);
});复制代码
登入後複製

read(size)

流在暂停模式下需要程序显式调用 read() 方法才能得到数据,read() 方法会从内部缓冲区中拉取并返回若干数据,当没有更多可用数据时,会返回null

使用 read() 方法读取数据时,如果传入了 size 参数,那么它会返回指定字节的数据;当指定的size字节不可用时,则返回null。如果没有指定size参数,那么会返回内部缓冲区中的所有数据

现在有一个矛盾,在流动模式下流生产出了数据,然后触发 data 事件通知给程序,这样很方便。在暂停模式下需要程序去读取,那么就有一种可能是读取的时候还没生产好,如果使用轮询的方式未免效率有些低

NodeJS 提供了一个

readable的事件,事件在可读流准备好数据的时候触发,也就是先监听这个事件,收到通知有数据了再去读取就好了:

const rns = new RandomNumberStream(5);
rns.on('readable', () => {
  let chunk;
  while((chunk = rns.read()) !== null){
    console.log(chunk);
  }
});
登入後複製

这样可以读取到数据,值得注意的一点是并不是每次调用 read() 方法都可以返回数据,前面提到了如果可用的数据没有达到 size 那么返回 null,所以在程序中加了个判断

数据会不会漏掉

const stream = fs.createReadStream('/dev/input/event0');
stream.on('readable', callback);复制代码
登入後複製

在流动模式会不会有这样的问题:可读流在创建好的时候就生产数据了,如果在绑定 readable 事件之前就生产了某些数据,触发了 readable 事件,在极端情况下会造成数据丢失吗?

事实并不会,按照 NodeJS event loop 程序创建流和调用事件监听在一个事件队列里面,生产数据和事件监听都是异步操作,而 on 监听事件使用了 process.nextTick 会保证在数据生产之前被绑定好,相关知识可以看定时器章节中对 event loop 的解读

到这里可能对 data事件、readable事件触发时机, read() 方法每次读多少数据,什么时候返回 null 还有一定的疑问,在后续可写流章节会在 back pressure 部分结合源码介绍相关机制

推荐教程:《JS教程

以上是Node.js中Stream-可讀流的使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
威爾R.E.P.O.有交叉遊戲嗎?
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

nvm 怎麼刪除node nvm 怎麼刪除node Dec 29, 2022 am 10:07 AM

nvm刪除node的方法:1、下載「nvm-setup.zip」並將其安裝在C碟;2、設定環境變量,並透過「nvm -v」指令查看版本號;3、使用「nvm install」指令安裝node;4、透過「nvm uninstall」指令刪除已安裝的node即可。

node專案中如何使用express來處理檔案的上傳 node專案中如何使用express來處理檔案的上傳 Mar 28, 2023 pm 07:28 PM

怎麼處理文件上傳?以下這篇文章為大家介紹一下node專案中如何使用express來處理文件的上傳,希望對大家有幫助!

Node服務怎麼進行Docker鏡像化?極致優化詳解 Node服務怎麼進行Docker鏡像化?極致優化詳解 Oct 19, 2022 pm 07:38 PM

這段時間在開發一個騰訊文檔全品類通用的HTML 動態服務,為了方便各品類接入的生成與部署,也順應上雲的趨勢,考慮使用Docker 的方式來固定服務內容,統一進行製品版本的管理。這篇文章就將我在服務 Docker 化的過程中累積起來的優化經驗分享出來,供大家參考。

深入淺析Node的進程管理工具'pm2” 深入淺析Node的進程管理工具'pm2” Apr 03, 2023 pm 06:02 PM

這篇文章跟大家分享Node的進程管理工具“pm2”,聊聊為什麼需要pm2、安裝和使用pm2的方法,希望對大家有幫助!

Pi Node教學:什麼是Pi節點?如何安裝和設定Pi Node? Pi Node教學:什麼是Pi節點?如何安裝和設定Pi Node? Mar 05, 2025 pm 05:57 PM

PiNetwork節點詳解及安裝指南本文將詳細介紹PiNetwork生態系統中的關鍵角色——Pi節點,並提供安裝和配置的完整步驟。 Pi節點在PiNetwork區塊鏈測試網推出後,成為眾多先鋒積極參與測試的重要環節,為即將到來的主網發布做準備。如果您還不了解PiNetwork,請參考Pi幣是什麼?上市價格多少? Pi用途、挖礦及安全性分析。什麼是PiNetwork? PiNetwork項目始於2019年,擁有其專屬加密貨幣Pi幣。該項目旨在創建一個人人可參與

聊聊用pkg將Node.js專案打包為執行檔的方法 聊聊用pkg將Node.js專案打包為執行檔的方法 Dec 02, 2022 pm 09:06 PM

如何用pkg打包nodejs可執行檔?以下這篇文章跟大家介紹一下使用pkg將Node專案打包為執行檔的方法,希望對大家有幫助!

npm node gyp失敗怎麼辦 npm node gyp失敗怎麼辦 Dec 29, 2022 pm 02:42 PM

npm node gyp失敗是因為“node-gyp.js”跟“Node.js”版本不匹配,其解決辦法:1、透過“npm cache clean -f”清除node快取;2、透過“npm install -g n”安裝n模組;3、透過「n v12.21.0」指令安裝「node v12.21.0」版本即可。

使用Angular和Node進行基於令牌的身份驗證 使用Angular和Node進行基於令牌的身份驗證 Sep 01, 2023 pm 02:01 PM

身份驗證是任何網路應用程式中最重要的部分之一。本教程討論基於令牌的身份驗證系統以及它們與傳統登入系統的差異。在本教程結束時,您將看到一個用Angular和Node.js編寫的完整工作演示。傳統身份驗證系統在繼續基於令牌的身份驗證系統之前,讓我們先來看看傳統的身份驗證系統。使用者在登入表單中提供使用者名稱和密碼,然後點擊登入。發出請求後,透過查詢資料庫在後端驗證使用者。如果請求有效,則使用從資料庫中獲取的使用者資訊建立會話,然後在回應頭中傳回會話訊息,以便將會話ID儲存在瀏覽器中。提供用於存取應用程式中受

See all articles