首頁 > web前端 > js教程 > 如何利用node做出可讀流動模式

如何利用node做出可讀流動模式

php中世界最好的语言
發布: 2018-06-04 10:48:55
原創
1222 人瀏覽過

這次帶給大家怎樣利用node做出可讀流動模式,利用node做出可讀流動模式的注意事項有哪些,以下就是實戰案例,一起來看一下。

node的可讀流基於事件

可讀流之流動模式,這種流動模式會有一個"開關",每次當"開關"開啟的時候,流動模式起作用,如果將這個"開關"設置成暫停的話,那麼,這個可讀流將不會去讀取文件,直到將這個"開關"重新置為流動。

讀取檔案流程

讀取檔案內容的流程,主要為:

  1. 開啟文件,開啟文件成功,將觸發open事件,如果開啟失敗,觸發error事件和close事件,將文件關閉。

  2. 開始讀取檔案中的內容,監聽data事件,資料處於流動狀態,可透過修改開關的狀態來暫停讀取。

  3. 每次讀取到的內容放入快取中,並透過data事件將資料發佈出去。

  4. 當檔案中的內容讀取完畢之後,就會關閉檔案。

這一系列動作都是基於事件來進行操作的,而node中的事件我們都知道是一種發布訂閱模式來實現的。

下面我們來看看,node是如何使用可讀流來讀取檔案中的內容?

node 可讀流參數

#首先我們透過fs模組來建立一個可讀流,可讀流接受兩個參數:

  1. 第一個參數是要讀取的檔案位址,在這裡指明你要讀取哪個檔案。

  2. 第二個參數是可選項,這個參數是一個對象,用來指定可讀流的一些具體的參數。

如下幾個參數我們來一一說明:

  • #highWaterMark:設定高水位線,這個參數主要用於在讀取文件時,可讀流會將檔案中的內容讀取到快取當中,而這裡我們需要建立一個buffer來快取這些數據,所以這個參數是用來設定buffer的大小,如果不對這個參數設定的話,可讀流預設的配置64k。

  • flags:這個參數主要用於設定檔案的執行模式,比如說我們具體的操作適用於讀取檔案還是寫入檔案等這些操作。如果是寫入檔案的話那我們,使用的是w。如果是讀取檔的話那這個操作符就應該是r。

下面這張表格就說明了不同的符號代表不同意義:

##意思r已讀取文件,檔案不存在報錯##r rswwx#w #wx a#axa ax #
  • autoClose:這個參數主要用於,對檔案的關閉的一些控制。如果文件再開啟的過程或在其他操作的過程中出現了錯誤的情況下,需要將文件進行關閉。那這個參數就是設定檔案是否自動關閉的功能。

  • encoding:node中用buffer來讀取檔案操作的東西二進位資料。這些資料展現出來的話我們是一堆亂碼,所以需要,要我們對這個資料指定一個具體的編碼格式。然後將會對這些資料進行編碼轉化,這樣轉化出來的資料就是我們所能看懂的資料。

  • starts:這個參數主要用來指定從什麼位置開始讀取檔案中的內容,預設的話是從零開始。

  • ends:這個參數主要用來指定定具體要讀取檔案多長的數據,這裡需要說明一下,這個參數是包括本身的位置,也就是所謂的套件前和包後。

下面我們來看看可讀流具體範例:

let fs = require("fs");
let rs = fs.createReadStream("./a.js", {
  highWaterMark: 3,
  encoding: "utf8",
  autoClose: true,
  start: 0,
  end: 9
});
rs.on("open", () => {console.log("open");});
rs.on("close", () => {console.log("close");});
rs.on("data", data => {
  console.log(data);
  rs.pause();//暂停读取 此时流动模式为暂停模式
});
setInterval(() => {
  rs.resume();//重新设置为流动模式,开始读取数据
}, 1000);
rs.on("end", () => { console.log("end"); });
rs.on("error", err => { console.log(err); });
登入後複製

手寫可讀流第一步

#上面我們說過,node可讀流是基於node的核心模組事件來完成的,所以在實作我們自己的可讀流時需要繼承events模組,程式碼如下:

let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {
}
登入後複製
繼承了EventEmitter類,我們就可以使用EventEmitter類別中的各個方法,並且同樣是採用發布訂閱的模式了處理事件。

第二步:處理可讀流配置的參數

#上面我們提到,node中建立可讀流時可以對這個流配置具體的參數,例如

let rs = fs.createReadStream("./a.js", {
  highWaterMark: 3,
  encoding: "utf8",
  autoClose: true,
  start: 0,
  end: 9
});
登入後複製
那麼對於這些參數,我們自己實作的可讀流類別也需要對這些參數進行處理,那麼這些參數該如何進行處理呢?
constructor(path, options = {}) {
  super();
  this.path = path; //指定要读取的文件地址
  this.highWaterMark = options.highWaterMark || 64 * 1024;
  this.autoClose = options.autoClose || true; //是否自动关闭文件
  this.start = options.start || 0; //从文件哪个位置开始读取
  this.pos = this.start; // pos会随着读取的位置改变
  this.end = options.end || null; // null表示没传递
  this.encoding = options.encoding || null;// buffer编码
  this.flags = options.flags || 'r';
  this.flowing = null; // 模式开关
  this.buffer = Buffer.alloc(this.highWaterMark);// 根据设置创建一个buffer存储读出来的数
  this.open();
}
登入後複製
通常配置的原則是以使用者配置的參數為準,如果使用者沒有對這個參數進行設定的話,就採用預設的配置。

實作可讀流第三步:開啟檔案

#這裡原理是使用node模組fs中的open方法。首先我們來回顧下fs.open()方法的使用。
    fs.open(filename,flags,[mode],callback);
    //实例
    fs.open('./1,txt','r',function(err,fd){});
    登入後複製
  1. 這裡要說明下,

    回呼函數callback中有2個參數:

  2. 第一個是error,node中非同步回呼都會傳回的一個參數,用來說明具體的

    錯誤訊息

第二個參數是fd,是文件描述符,用來識別文件,等價於open函數的第一個參數

好了,現在我們來看看我們自己的可讀流的open方法該如何實現吧:

open() {
  fs.open(this.path, this.flags, (err, fd) => { 
    //fd标识的就是当前this.path这个文件,从3开始(number类型)
    if (err) {
      if (this.autoClose) { // 如果需要自动关闭则去关闭文件
        this.destroy(); // 销毁(关闭文件,触发关闭事件)
      }
      this.emit('error', err); // 如果有错误触发error事件
      return;
    }
    this.fd = fd; // 保存文件描述符
    this.emit('open', this.fd); // 触发文件的打开的方法
  });
}
登入後複製
從程式碼上我們可以看出:

fs.open函數是非同步函數,也就是說callback是非同步執行的,在成功開啟檔案的情況下,fd這個屬性也是非同步取得到的,這點要注意。 另外重要的一點是,如果在開啟檔案發生錯誤時,則表示開啟檔案失敗,那麼此時就需要將檔案關閉。

實作可讀流第四步:讀取檔案內容

#上面我們詳細說過,可讀流本身定義了一個"開關",當我們要讀取檔案中的內容的時候,我們需要將這個"開關"打開,那麼node可讀流本身是如何來打開這個"開關"的呢?

監聽data事件

node可讀流透過監聽data事件來實現這個"開關"的開啟:

rs.on("data", data => {
  console.log(data);
});
登入後複製
###當使用者監聽data事件的時候,"開關"開啟,不停的從文件中讀取內容。那麼node是怎麼監聽data事件的呢? ###答案就是事件模組的newListener######這是因為node可讀流是基於事件的,而事件中,伺服器就可以透過newListener事件監聽到從使用者這邊過來的所有事件,每個事件都有對應的類型,當使用者監聽的是data事件的時候,我們就可以取得到,然後就可以去讀取檔案中的內容了,那我們自己的可讀流該如何實現呢? ###
// 监听newListener事件,看是否监听了data事件,如果监听了data事件的话,就开始启动流动模式,读取文件中的内容
this.on("newListener", type => {
  if (type === "data") {
    // 开启流动模式,开始读取文件中的内容
    this.flowing = true;
    this.read();
  }
});
登入後複製

好了,知道了这个"开关"是如何打开的,那么这个时候就到了真正读取文件中内容的关键时候了,先上代码先:

read() {
  // 第一次读取文件的话,有可能文件是还没有打开的,此时this.fd可能还没有值
  if (typeof this.fd !== "number") {
    // 如果此时文件还是没有打开的话,就触发一次open事件,这样文件就真的打开了,然后再读取
    return this.once("open", () => this.read());
  }
  // 具体每次读取多少个字符,需要进行计算,因为最后一次读取倒的可能比highWaterMark小
  let howMuchRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
  fs.read(this.fd, this.buffer, 0, howMuchRead, this.pos, (err, byteRead) => {
    // this.pos 是每次读取文件读取的位置,是一个偏移量,每次读取会发生变化
    this.pos += byteRead;
    // 将读取到的内容转换成字符串串,然后通过data事件,将内容发布出去
    let srr = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);
    // 将内容通过data事件发布出去
    this.emit("data", srr);
    // 当读取到到内容长度和设置的highWaterMark一致的话,并且还是流动模式的话,就继续读取
    if ((byteRead === this.highWaterMark) && this.flowing) {
      return this.read();
    }
    // 没有更多的内容了,此时表示文件中的内容已经读取完毕
    if (byteRead < this.highWaterMark) {
      // 读取完成,发布end方法,并关闭文件
      this.emit("end");
      this.destory();
    }
  });
}
登入後複製

这里我们特别要注意的是:

  1. 文件是否已经打开,是否获取到fd,如果没有打开的话,则再次触发open方法

  2. 分批次读取文件内容,每次读取的内容是变化的,所以位置和偏移量是要动态计算的

  3. 控制读取停止的条件。

实现可读流第五步:关闭文件

好了,到现在,基础的读取工作已经完成,那么就需要将文件关闭了,上面的open和read方法里面都调用了一个方法:destory,没错,这个就是关闭文件的方法,好了,那么我们来看看这个方法该如何实现吧

destory() {
  if (typeof this.fd !== "number") {
    // 发布close事件
    return this.emit("close");
  }
  // 将文件关闭,发布close事件
  fs.close(this.fd, () => {
    this.emit("close");
  });
}
登入後複製

当然这块的原理就是调用fs模块的close方法啦。

实现可读流第六步:暂停和恢复

既然都说了,node可读流有一个神奇的"开关",就像大坝的阀门一样,可以控制水的流动,同样也可以控制水的暂停啦。当然在node可读流中的暂停是停止对文件的读取,恢复就是将开关打开,继续读取文件内容,那么这两个分别对应的方法就是pause()和resume()方法。

那么我们自己的可读流类里面该如何实现这两个方法的功能呢?非常简单:

我们在定义类的私有属性的时候,定义了这样一个属性flowing,当它的值为true时表示开关打开,反之关闭。

pause() {
  this.flowing = false;// 将流动模式设置成暂停模式,不会读取文件
}
resume() {
  this.flowing = true;//将模式设置成流动模式,可以读取文件
  this.read();// 重新开始读取文件
}
登入後複製

相信看了本文案例你已经掌握了方法,更多精彩请关注php中文网其它相关文章!

推荐阅读:

怎样利用JS做出引用传递与值传递

使用JS实做出加密解密操作

#符號
讀取並寫入,檔案不存在報錯
同步讀取檔案並忽略快取
寫入文件,不存在則創建,存在則清空
排它寫入文件
讀取並寫入文件,不存在則創建,存在則清空
和w 類似,排他方式開啟
追加寫入
與a類似,排他方式寫入
讀取並追加寫入,不存在則建立
作用與a 類似,但以排他方式開啟檔案

以上是如何利用node做出可讀流動模式的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板