什麼是流?
說到流,就涉及到一個*nix的概念:管道——在*nix中,流在Shell中被實現為可以通過|(管道符) 進行橋接的數據,一個進程的輸出(stdout)可直接作為下一個進程的輸入(stdin)。
在Node中,流(Stream)的概念與之類似,代表一種資料流可供橋接的能力。
pipe
流化的精髓在於 .pipe()方法。可供橋接的能力,在於資料流的兩端(上游/下游 或稱為 讀/寫流)以一個 .pipe()方法進行橋接。
偽代碼的表現形式為:
流的分類
這裡並不打算討論所謂的Node v0.4 之前的「經典」流。那麼,流分為這麼幾類(皆為抽象介面:
1.stream.Readable 可讀流(需要實作_read方法,重點在於資料流讀取的細部
2.stream.Writable 可寫流(需要實作_write方法,重點在於資料流寫入的細節
3.stream.Duplex 可讀/寫流(上兩個接口,關注點以上為兩個接口的細部
4.stream.Transform 繼承自Duplex(需要實作_transform方法,重點在於資料區塊的處理
簡單來說:
1).pipe() 的擁有者一定具備Readable 流(並不局限於)能力,它擁有'readable'/'data'/'end'/'close'/'error' 一系列事件可供訂閱,也提供.read()/.pause()/.resume()等一系列方法供呼叫;
2).pipe() 的參數一定具備Writable 流(並不局限於)能力,它擁有'drain'/'pipe'/'unpipe'/'error'/'finish' 事件可供訪問,也提供.write ()/.end() 等一系列方法供呼叫
什麼鬼
有沒有一絲絲焦慮?別急,做為一個說人話的低級碼工,我會把Stream掰開了和您扯一扯的。
Stream類,在 Node.js的源碼 裡,是這麼定義的:
可以看出,本質上,Stream是一個EventEmitter,那意味著它具備事件驅動的功能(.emit/.on...)。眾所周知,“Node.js 就是基於V8的事件驅動平台”,實現了事件驅動的流式編程,具備了和Node一樣的異步回調的特徵。
例如在Readable 流中,有一個readable 事件,在一個暫停的唯讀流中,只要有資料塊準備好可讀時,它就會被發送給訂閱者(Readable 流有哪些呢?express中的req,ftp或mutli-form上傳元件的req.part,系統中的標準輸入process.stdin等)。有了readable 事件,我們可以做個處理shell 指令輸出的分析器之類的工具:
這樣呼叫:
對於 Readable 流,我們還可以訂閱它的 data 和 end 事件,以獲取資料塊並在流枯竭時獲得通知,如 經典socket示例 中那樣:
Readable流狀態的切換
要注意的是,Readable 流有兩種狀態:flowing mode(激流) 和 pause mode(暫停)。前者根本停不下來,誰被pipe上了就馬上不停的給;後者會暫停,直到下游顯式的呼叫 Stream.read() 請求才讀取資料塊。 Readable 流初始化時是 pause mode的。
這兩種狀態可以互為切換的,其中,
有以下任一行為,pause 轉 flowing:
1.對 Readable 流新增一個data事件訂閱
2.對 Readable 呼叫 .resume() 明確開啟flowing
3.呼叫 Readable 流的 .pipe(writable) ,橋接到一個 Writable 流上
有以下任一行為,flowing 轉回 pause:
1.Readable 流還沒有 pipe 到任何流上,可調 .pause() 暫停
2.Readable 流已經 pipe 到了流上,需 remove 掉所有 data 事件訂閱,並且調用 .unpipe()方法逐一解除與下游流的關係
妙用
結合流的非同步特性,我可以寫出這樣的應用:直接將 用戶A 的輸出橋接到 用戶B 的頁面上輸出:
用戶B請求的時候:
參考
how to write node programs with streams: stream-handbook