首頁 > web前端 > js教程 > 深入淺析Node中的Stream(流)

深入淺析Node中的Stream(流)

青灯夜游
發布: 2023-01-29 19:46:30
轉載
2861 人瀏覽過

什麼是流?如何理解流?以下這篇文章就帶大家深入了解Nodejs中的串流(Stream),希望對大家有幫助!

深入淺析Node中的Stream(流)

stream 是一個抽象的資料接口,它繼承了EventEmitter,它能夠發送/接受數據,本質就是讓資料流動起來,如下圖:深入淺析Node中的Stream(流)

流不是Node 中獨有的概念,是作業系統最基本的操作方式,在Linux 中| 是Stream,只是Node 層面對其做了封裝,提供了對應的API

為啥要一點一點?

先用下面的程式碼建立一個文件,大概在400MB 左右【相關教學推薦:nodejs影片教學

Untitled 1.png

當我們使用readFile 去讀取的時候,如下程式碼

Untitled 2.png

正常啟動服務時,佔用10MB 左右的記憶體

Untitled 3.png

使用curl http://127.0.0.1:8000發起請求時,記憶體變成了420MB 左右,和我們建立的檔案大小差不多

Untitled 4.png

#改為使用使用stream 的寫法,程式碼如下

Untitled 5.png

再次發起請求時,發現記憶體只佔用了35MB 左右,相較於readFile 大幅減少

Untitled 6.png

如果我們不採用流的模式,等待大檔案載入完成在操作,會有以下的問題:

  • 記憶體暫用過多,導致系統崩潰
  • CPU 運算速度有限制,且服務於多個程序,大文件加載過大且時間久

總結來說就是,一次性讀取大文件,內存和網路都吃不消

如何才能一點一點?

我們讀取檔案的時候,可以採用讀取完成之後在輸出資料

Untitled 7.png

上述說到stream 繼承了EventEmitter 可以是實現監聽數據。首先將讀取資料改為串流讀取,使用on("data", ()⇒{}) 接收數據,最後透過on("end", ()⇒{} ) 最後的結果

Untitled 8.png

有資料傳遞過來的時候就會觸發data 事件,接收這段資料做處理,最後等待所有的資料全部傳遞完成之後觸發end 事件。

資料的流轉過程

資料從哪裡來—source

資料是從一個地方流向另一個地方,先看看數據的來源。

  • http 請求,請求介面來的資料

    Untitled 9.png

  • #console 控制台,標準輸入stdin

    Untitled 10.png

  • file 文件,讀取文件內容,例如上面的範例

##連接的管道— pipe

在source 和dest 中有一個連接的管道pipe,基本語法為

source.pipe(dest) ,source 和dest 透過pipe 連接,讓資料從source 流向dest

我們不需要向上面的程式碼那樣手動監聽data/end 事件.

pipe 使用時有嚴格的要求,source 必須是可讀流,dest 必須是可寫流

??? 流動的資料到底是什麼?程式碼中的 chunk 是什麼?

到哪裡去—dest

stream 常見的三種輸出方式

  • console 控制台,標準輸出stdout

    Untitled 11.png

  • http 請求,介面請求中的response

    Untitled 12.png

  • #file 文件,寫入檔案

    Untitled 13.png

流的種類

Untitled 14.png

#可讀流Readable Streams

可讀流是對提供資料的來源(source)的抽象

所有的Readable 都實作了stream.Readable 類別定義的介面

Untitled 15.png

? 讀取檔案流建立

fs.createReadStream 建立一個Readable 物件

Untitled 16.png

## 讀取模式

  • ##可讀流有兩種模式,
流動模式(flowing mode)

Untitled 17.png暫停模式(pause mode)

,這個決定了chunk 資料的流動方式:自動流動和手工流動

在ReadableStream 中有一個_readableState 屬性,在其中有一個flowing 的屬性來判斷流的模式,他有三種狀態值:
ture:表示為流動模式

false:表示為暫停模式

    null:初始狀態

  • #可以使用熱水器模型來模擬資料的流動。熱水器水箱(buffer 緩存區)儲存著熱水(需要的資料),當我們打開水龍頭的時候,熱水就會從水箱中不斷流出來,自來水也會不斷的流入水箱,這就是流動模式。當我們關閉水龍頭時,水箱會暫停進水,水龍頭會暫停出水,這就是暫停模式。

  • 流動模式
  • 資料會自動地從底層讀取,形成流動現象,並透過事件提供給應用程式。

    Untitled 18.png

  • 監聽data 事件即可進入該模式
當 data 事件被加入後,可寫流中有資料後會將資料推到該事件回呼函數中,需要自己去消費資料塊,如果不處理則該資料會遺失
呼叫 stream.pipe 方法將資料傳送至Writeable

  • 呼叫stream. resume 方法

    Untitled 19.png

暫停模式
  • 資料會堆積在內部緩衝器中,必須明確調用stream.read() 讀取資料塊

  • 監聽readable 事件 可寫流在資料準備好後會觸發此事件回調,此時需要在回呼函數中使用 stream.read() 來主動消費資料。 readable 事件顯示流有新的動態:要麼有新的數據,要麼流已經讀取所有資料

    Untitled 20.png

  • 兩種模式之間如何進行轉換呢

可讀流在建立完成之後處於初始狀態  //TODO:和網路上的分享不一致

暫停模式切換到流動模式

- 监听 data 事件
- 调用 stream.resume 方法
- 调用 stream.pipe 方法将数据发送到 Writable
登入後複製

Untitled 21.png

流動模式切換到暫停模式

- 移除 data 事件
- 调用 stream.pause 方法
- 调用 stream.unpipe 移除管道目标
登入後複製
Untitled 22.png

Untitled 23.png實作原理

  • 建立可讀流的時候,需要繼承Readable 對象,並且實作_read 方法

    ####建立一個自訂可讀流##### ##########當我們呼叫read 方法時,整體的流程如下:###############doRead######流中維護了一個緩存,當呼叫read 方法的時候來判斷是否需要向底層請求資料###

    當快取區長度為0或小於highWaterMark 這個值得時候就會呼叫_read 去底層取得資料原始碼連結

    Untitled 24.png

Untitled 25.png

可寫流Writeable Stream

可寫流是對資料寫入目的地的一種抽象,是用來消費上游流過來的數據,透過可寫流把資料寫入設備,常見的寫入流就是本機磁碟的寫入

  • #可寫入流的特性Untitled 26.png

  • 透過write 寫入資料

    Untitled 27.pngUntitled 28.png

  • #透過end 寫資料並且關閉流,end = write close

    Untitled 29.png

    #當寫入資料達到highWaterMark 的大小時,會觸發drain 事件

呼叫ws.write( chunk) 傳回false,表示目前緩衝區資料大於或等於highWaterMark 的值,就會觸發drain 事件。其實是起到一個警示作用,我們依舊可以寫入數據,只是未處理的數據會一直積壓在可寫流的內部緩衝區

中,直到積壓沾滿Node.js 緩衝區後,才會被強行中斷

自訂可寫流Untitled 30.png

    所有的Writeable 都實作了stream.Writeable 類別定義的介面
  • #只需要實作_write 方法就能夠將資料寫入底層
  • #透過呼叫呼叫writable.write 方法將資料寫入流中,會調用_write 方法將資料寫入底層
  • 當_write 資料成功後,需要呼叫next 方法去處理下一個資料

必須呼叫writable.end(data)來結束可寫流,data 是可選的。此後,不能再呼叫write 新增數據,否則會報錯

在end 方法呼叫後,當所有底層的寫入操作均完成時,會觸發finish 事件

雙工流Duplex StreamUntitled 31.png

雙工流,既可讀,也可寫入。實際上繼承了Readable 和Writable 的一種流,那它既可以當做可讀流來用又可以當做可寫流來用

自訂的雙工流需要實作Readable 的_read 方法和Writable 的_write 方法Untitled 32.png

net 模組可以用來建立socket,socket 在NodeJS 中是一個典型的Duplex,看一個TCP 用戶端的範例

#client 是一個Duplex,可寫流用於向伺服器發送訊息,可讀流用於接受伺服器訊息,兩個流內的資料並沒有直接的關係

轉換流Transform StreamUntitled 33.png

上述的例子中,可讀流中的資料(0/1)和可寫流中的資料('F','B','B')是隔離的,兩者並沒有產生關係,但對於Transform 來說在可寫端寫入的資料經過變換後會自動添加到可讀端。

Transform 繼承於Duplex,並且已經實作了_write 和_read 方法,只需要實作_tranform 方法可以Untitled 34.png

gulp 基於Stream 的自動化建置工具,看一段官網的範例程式碼

###less → less 轉為css → 執行css 壓縮→ 壓縮後的css#######其實less() 和minifyCss() 都是對輸入的資料做了一些處理,然後交給了輸出資料#########Duplex 和Transform 的選擇###

和上面的範例對比起來,我們發現一個串流同時面向生產者和消費者服務的時候我們會選擇Duplex,當只是對資料做一些轉換工作的時候我們便會選擇使用Tranform

背壓問題

什麼是背壓

背壓問題來自於生產者消費者模式中,消費者處理速度過慢

比如說,我們下載過程,處理速度為3Mb/s,而壓縮過程,處理速度為1Mb/s,這樣的話,很快緩衝區隊列就會形成堆積

要麼導致整個過程記憶體消耗增加,要麼導致整個緩衝區慢,部分資料遺失

Untitled 35.png

什麼是背壓處理

#背壓處理可以理解為一個向上」喊話」的過程

當壓縮處理發現自己的緩衝區資料擠壓超過閾值的時候,就對下載處理“喊話”,我忙不過來了,不要再發了

下載處理收到訊息就暫停向下發送資料

Untitled 36.png

如何處理背壓

#我們有不同的函數將資料從一個進程傳入另一個進程。在Node.js 中,有一個內建函數稱為 .pipe(),同樣地最終,在這個進程的基本層面上我們有二個互不相關的元件:資料的_源頭_,和_消費者_

當 .pipe() 被來源呼叫之後,它通知消費者有資料需要傳輸。管道函數為事件觸發建立了合適的積壓封裝

在資料快取超出了highWaterMark 或寫入的列隊處於繁忙狀態,.write() 會返回 false

#當 false 返回之後,積壓系統介入了。它將暫停從任何發送資料的資料流中進入的 Readable。一旦資料流清空了,drain 事件將被觸發,消耗進來的資料流

一旦佇列全部處理完畢,積壓機制將允許資料再次發送。在使用中的記憶體空間將自我釋放,同時準備接收下一次的批次資料

Untitled 37.png

我們可以看到pipe 的背壓處理:

  • #將資料依照chunk劃分,寫入
  • 當chunk過大,或是佇列忙碌時,暫停讀取
  • 當佇列為空時,繼續讀取資料

更多node相關知識,請造訪:nodejs 教學

以上是深入淺析Node中的Stream(流)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:juejin.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板