首頁 web前端 js教程 Node.js中的串流(Stream)介紹_node.js

Node.js中的串流(Stream)介紹_node.js

May 16, 2016 pm 04:07 PM
node.js stream

什麼是流?

說到流,就涉及到一個*nix的概念:管道——在*nix中,流在Shell中被實現為可以通過|(管道符) 進行橋接的數據,一個進程的輸出(stdout)可直接作為下一個進程的輸入(stdin)。

在Node中,流(Stream)的概念與之類似,代表一種資料流可供橋接的能力。

pipe

流化的精髓在於 .pipe()方法。可供橋接的能力,在於資料流的兩端(上游/下游 或稱為 讀/寫流)以一個 .pipe()方法進行橋接。

偽代碼的表現形式為:

複製程式碼 程式碼如下:

//上游.pipe(下游)
Readable.pipe(Writable);

流的分類

這裡並不打算討論所謂的Node  v0.4 之前的「經典」流。那麼,流分為這麼幾類(皆為抽象介面:

1.stream.Readable    可讀流(需要實作_read方法,重點在於資料流讀取的細部
2.stream.Writable     可寫流(需要實作_write方法,重點在於資料流寫入的細節
3.stream.Duplex        可讀/寫流(上兩個接口,關注點以上為兩個接口的細部
4.stream.Transform  繼承自Duplex(需要實作_transform方法,重點在於資料區塊的處理

簡單來說:

1).pipe() 的擁有者一定具備Readable 流(並不局限於)能力,它擁有'readable'/'data'/'end'/'close'/'error' 一系列事件可供訂閱,也提供.read()/.pause()/.resume()等一系列方法供呼叫;
2).pipe() 的參數一定具備Writable 流(並不局限於)能力,它擁有'drain'/'pipe'/'unpipe'/'error'/'finish' 事件可供訪問,也提供.write ()/.end() 等一系列方法供呼叫

什麼鬼

有沒有一絲絲焦慮?別急,做為一個說人話的低級碼工,我會把Stream掰開了和您扯一扯的。

Stream類,在 Node.js的源碼 裡,是這麼定義的:

複製程式碼 程式碼如下:

var EE = require('events').EventEmitter;
var util = require('util');
util.inherits(Stream, EE);
 
function Stream() {
  EE.call(this);
}

可以看出,本質上,Stream是一個EventEmitter,那意味著它具備事件驅動的功能(.emit/.on...)。眾所周知,“Node.js 就是基於V8的事件驅動平台”,實現了事件驅動的流式編程,具備了和Node一樣的異步回調的特徵。

例如在Readable 流中,有一個readable 事件,在一個暫停的唯讀流中,只要有資料塊準備好可讀時,它就會被發送給訂閱者(Readable 流有哪些呢?express中的req,ftp或mutli-form上傳元件的req.part,系統中的標準輸入process.stdin等)。有了readable 事件,我們可以做個處理shell 指令輸出的分析器之類的工具:

複製程式碼 程式碼如下:

process.stdin.on('readable', function(){
   var buf = process.stdin.read();
   if(buf){
      var data = buf.toString();
      // parsing data ...                                }
});

這樣呼叫:

複製程式碼 程式碼如下:

head -10 some.txt | node parser.js

對於 Readable 流,我們還可以訂閱它的 data 和 end 事件,以獲取資料塊並在流枯竭時獲得通知,如 經典socket示例 中那樣:

複製程式碼 程式碼如下:

req.on('connect', function(res, socket, head) {
    socket.on('data', function(chunk) {
      console.log(chunk.toString());
    });
    socket.on('end', function() {
      proxy.close();
    });
  });

Readable流狀態的切換
要注意的是,Readable 流有兩種狀態:flowing mode(激流) 和 pause  mode(暫停)。前者根本停不下來,誰被pipe上了就馬上不停的給;後者會暫停,直到下游顯式的呼叫 Stream.read() 請求才讀取資料塊。 Readable 流初始化時是 pause mode的。

這兩種狀態可以互為切換的,其中,

有以下任一行為,pause 轉 flowing:

1.對 Readable 流新增一個data事件訂閱
2.對 Readable 呼叫 .resume() 明確開啟flowing
3.呼叫 Readable 流的 .pipe(writable) ,橋接到一個 Writable 流上

有以下任一行為,flowing 轉回 pause:

1.Readable 流還沒有 pipe 到任何流上,可調 .pause() 暫停
2.Readable 流已經 pipe 到了流上,需 remove 掉所有 data 事件訂閱,並且調用 .unpipe()方法逐一解除與下游流的關係

妙用

結合流的非同步特性,我可以寫出這樣的應用:直接將 用戶A 的輸出橋接到 用戶B 的頁面上輸出:

複製程式碼 程式碼如下:

router.post('/post', function(req, res) {
    var destination = req.headers['destination']; //發給誰
    cache[destionation] = req;
    //是的,不返回,所以最好是個ajax請求
});

用戶B請求的時候:

複製程式碼 程式碼如下:

router.get('/inbox', function(req, res){
    var user = req.headers['user'];
    cache.find(user, function(err, previousReq){ //找到先前存的req
       var form = new multiparty.Form();
       form.parse(previousReq);  // 有文件給我
       form.on('part', function (part) {
            part.pipe(res); //流式大法佳:)
 
            part.on('error', function (err) {
                console.log(err);
                messaging.setRequestDone(uniqueID);
                return res.end(err);
            });
        });
    });
});

參考

how to write node programs with streams: stream-handbook

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

圖文詳解Node V8引擎的記憶體和GC 圖文詳解Node V8引擎的記憶體和GC Mar 29, 2023 pm 06:02 PM

這篇文章帶大家深入了解NodeJS V8引擎的記憶體和垃圾回收器(GC),希望對大家有幫助!

一文聊聊Node中的記憶體控制 一文聊聊Node中的記憶體控制 Apr 26, 2023 pm 05:37 PM

基於無阻塞、事件驅動建立的Node服務,具有記憶體消耗低的優點,非常適合處理海量的網路請求。在海量請求的前提下,就需要考慮「記憶體控制」的相關問題了。 1. V8的垃圾回收機制與記憶體限制 Js由垃圾回收機

深入聊聊Node中的File模組 深入聊聊Node中的File模組 Apr 24, 2023 pm 05:49 PM

文件模組是對底層文件操作的封裝,例如文件讀寫/打開關閉/刪除添加等等文件模組最大的特點就是所有的方法都提供的**同步**和**異步**兩個版本,具有sync 字尾的方法都是同步方法,沒有的都是異

一起聊聊Node中的事件循環 一起聊聊Node中的事件循環 Apr 11, 2023 pm 07:08 PM

事件循環是 Node.js 的基本組成部分,透過確保主執行緒不被阻塞來實現非同步編程,了解事件循環對建立高效應用程式至關重要。以下這篇文章就來帶大家深入了解Node中的事件循環 ,希望對大家有幫助!

深入了解Node中的Buffer 深入了解Node中的Buffer Apr 25, 2023 pm 07:49 PM

一開始的時候 JS 只在瀏覽器端運行,對於 Unicode 編碼的字串容易處理,但對於二進位和非 Unicode 編碼的字串處理困難。並且二進制是電腦最底層的資料格式,視訊/音訊/程式/網路包

node無法用npm指令怎麼辦 node無法用npm指令怎麼辦 Feb 08, 2023 am 10:09 AM

node無法用npm指令是因為沒有正確配置環境變量,其解決方法是:1、開啟“系統屬性”;2、找到“環境變數”->“系統變數”,然後編輯環境變數;3、找到nodejs所在的資料夾;4、點選「確定」即可。

IntelliJ IDEA中如何調試Java Stream操作 IntelliJ IDEA中如何調試Java Stream操作 May 09, 2023 am 11:25 AM

Stream作業是Java8推出的一大亮點!雖然java.util.stream很強大,但還是有很多開發者在實際工作中很少使用,其中吐槽最多的一個原因就是不好調試,一開始確實是這樣,因為stream這樣的流式操作在DEBUG的時候,是一行程式碼,直接下一步的時候,其實一下就過了好多操作,這樣我們就很難判斷到底是裡面的哪一行出了問題。外掛:JavaStreamDebugger如果你用的IDEA版本比較新的話,這個插件已經是自備的了,就不需要安裝了。如果還沒安裝的話,就手動安裝一下,然後繼續下面

java8的stream怎麼取max java8的stream怎麼取max May 14, 2023 pm 03:43 PM

java8的stream取maxpublicstaticvoidmain(String[]args){Listlist=Arrays.asList(1,2,3,4,5,6);Integermax=list.stream().max((a,b)->{if (a>b){return1;}elsereturn-1;}).get();System.out.println(max);}注意點:這裡判斷大小是透過正負數和0值。而不是直接寫成if(a>b){returna;}elseretur

See all articles