Nodejs の読み取り可能なストリームに関する簡単な説明。読み取り可能なストリームを実装するにはどうすればよいですか?

青灯夜游
リリース: 2021-07-08 10:06:38
転載
2838 人が閲覧しました

この記事では、Nodejs のストリームを紹介し、Node で読み取り可能なストリームがどのように実装されるかを見ていきます。一定の参考値があるので、困っている友達が参考になれば幸いです。

Nodejs の読み取り可能なストリームに関する簡単な説明。読み取り可能なストリームを実装するにはどうすればよいですか?

#ストリームの概念

ストリームは、Node.js でストリーミング データを処理するための抽象インターフェイスです。ストリーム モジュールは、ストリーム インターフェイスを実装するオブジェクトを構築するために使用されます。 [推奨学習: "

nodejs チュートリアル "]

ストリームの役割

大きなファイルの読み取りと書き込みのプロセスでは、一度に読み取られてメモリに書き込まれます。読み取りと書き込みの回数は毎回制御できます。

ストリームの分類

1. 読み取り可能なストリーム - Readable

例: fs.createReadStream;

ソース コードの場所: lib/_stream_readable.js

2.書き込み可能なストリーム-Writable

例: fs.createWriteStream;

ソース コードの場所: lib /_stream_writable .js

3. 二重ストリーム - 二重: 読み取りと書き込みの機能を満たします

例: net.Socket();

ソース コードの場所: lib/ _stream_duplex.js

4. 変換ストリーム - Transform: 目的: 圧縮、トランスコーディング

例:

const { Transform } = require('stream');
Transform.call(this, '要转换的数据');//具体的使用详情 见node官网
ログイン後にコピー

-ソース コードの場所: lib/_stream_tranform.js

Readable ストリームのファイル読み取りプロセス

#ファイル コードの読み取りプロセス
    #
    const path = require("path");
    const aPath = path.join(__dirname, "a.txt");//需要读取的文件
    const fs = require("fs");
    let rs = fs.createReadStream(aPath, {
      flags: "r",
      encoding: null,//默认编码格式是buffer,深挖buffer又要学习字符编码,留个坑 到时候写一个编码规范的学习整理
      autoClose: true,//相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close
      start: 0,
      highWaterMark: 3,//每次读取的个数 默认是64*1024个字节
    });
    
    rs.on("open", function (fd) {
      // fd  number类型
      console.log("fd", fd);
    });
    // 他会监听用户,绑定了data事件,就会触发对应的回调,不停的触发
    rs.on("data", function (chunk) {
    //这里会打印的是ascII 值 ,所以可以toString查看详情自己看得懂的样子
      console.log({ chunk }, "chunk.toString", chunk.toString()); 
      //如果想每一段事件 读一点 可以用rs.pause() 做暂停,然后计时器 里rs.resume()再次触发data事件
      rs.pause();//暂停读取
    });
    rs.on("close", function () {
      //当文件读取完毕后 会 触发 end事件
      console.log("close");
    });
    setInterval(() => {
      rs.resume(); //再次触发data,直到读完数据为止
    }, 1000);
    ログイン後にコピー
  • 余談: ファイル ストリームと通常のファイル ストリームについて話したいと思います。読み取り可能なストリーム 読み取りストリームの違い
  • 1. オープンとクローズはファイル ストリームに固有です。オープンとクローズをサポートするのはファイル ストリームです。
2. 読み取り可能なストリームは利用可能です (( 'data ')、on('end')、on('error')、resume、pause; したがって、これらのメソッドがサポートされている限り、それは読み取り可能なストリームです

書き込みのプロセス書き込み可能なストリームによるファイル

ファイル コードの書き込みプロセス

    const fs = require("fs");
    const path = require("path");
    const bPath = path.join(__dirname, "b.txt");
    let ws = fs.createWriteStream(bPath, {
    //参数和可读流的类似
      flags: "w",
      encoding: "utf-8",
      autoClose: true,
      start: 0,
      highWaterMark: 3,
    });
    ws.on("open", function (fd) {
      console.log("open", fd);
    });
    ws.on("close", function () {
      console.log("close");
    });
    
    //write的参数string 或者buffer,ws.write 还有一个boolea的返回值表示是真实写入文件还是放入缓存中
    ws.write("1");
    let flag = ws.write("1");
    console.log({ flag });//true
    flag = ws.write("1");
    console.log({ flag });//true
    flag = ws.write("1");
    console.log({ flag });//false
    ログイン後にコピー
  • 二重ストリームの書き込みおよび読み取りプロセス

ローカル サービスの例を書く

  • 1. サーバー (サーバー コード) の実装
  • const net = require("net"); //net 模块是 node自己封装的tcp层
    //socket 就是双工流 能读能写  http源码就是用net模块写的 基于tcp
    const server = net.createServer(function (socket) {
      socket.on("data", function (data) {//监听客户端发来的消息
        console.log(data.toString)
        socket.write("server:hello");//写入server:hello
      });
      socket.on("end", function () {
        console.log("客户端关闭");
      });
    });
    server.on("err", function (err) {
      console.log(err);
    });
    server.listen(8080);//服务端监听8080端口
    ログイン後にコピー
2. クライアント (クライアント) の実装

const net = require("net"); //net 模块是 node自己封装的tcp层
const socket = new net.Socket(); //
socket.connect(8080, "localhost"); //  表示链接服务器本地8080端口
socket.on("connect", function (data) {
  //和服务器建立链接后
  socket.write("connect server");
});
socket.on("data", function (data) {
  //监听数据,读取服务器传来的数据
  console.log(data.toString());
  socket.destroy()
});
socket.write('ok')
socket.on("error", function (err) {
  console.log(err);
});
ログイン後にコピー

3.余談になりますが、TCP の 3 ウェイ ハンドシェイクと 4 ウェイ ウェーブを確認するには、Wireshark (パケット キャプチャ ツール) を使用して、上記のコードを通じて実際のプロセスを確認できます。

変換フロー変換プロセス

変換フローは、入力を許可し、データに対して特定の操作を実行した後に出力を返す二重ストリームの一種です。この 2 つは依存関係があります。

コード プロセス(

この例の参考資料

Nodejs の読み取り可能なストリームに関する簡単な説明。読み取り可能なストリームを実装するにはどうすればよいですか?読み取り可能ストリームの実装

ブレークポイントに従い、まず読み取り可能なストリームの呼び出しプロセスを理解します

読み取りプロセスの以前のコード読み取り可能なストリーム ファイルはブレークポイントの例として使用されます。

rs.on('open')

rs.on('open') は、入力するブレークポイント エントリです。

1. Stream.prototype.on.call を通じて Stream クラスを継承します。Nodejs の読み取り可能なストリームに関する簡単な説明。読み取り可能なストリームを実装するにはどうすればよいですか?

ソース ファイルの場所: dlib/_stream_readable.js はありません (ここを直接調べました)ブレークポイントを通過しましたが、それも見つかりませんでした)

Nodejs の読み取り可能なストリームに関する簡単な説明。読み取り可能なストリームを実装するにはどうすればよいですか? 次に、クリックして、Stream が EventEmitter のサブクラスであることを確認します。パブリッシュとサブスクライブのサポート

2. 監視対象のイベント タイプがデータまたは読み取り可能であり、次のイベントの監視を継続しないかどうか Nodejs の読み取り可能なストリームに関する簡単な説明。読み取り可能なストリームを実装するにはどうすればよいですか?

#rs.on('data') Nodejs の読み取り可能なストリームに関する簡単な説明。読み取り可能なストリームを実装するにはどうすればよいですか?

Nodejs の読み取り可能なストリームに関する簡単な説明。読み取り可能なストリームを実装するにはどうすればよいですか?データ部分は 2 つのことを行います

    1. フロー (デフォルト値は null) が false でないかどうかを判断します 自動再開メソッドが実行されてファイルの読み取りを続行します (ここでの私の場合は rs.pause(); 手動でフローの値を false に設定します。引き続き呼び出されません)
  • 2. 次に、rs.pause () を呼び出さない場合は、resume で何が行われるかを確認するためにresume を呼び出し続けます

  • #2.1 最後に、stream.read() が呼び出されてファイルの読み取りが続行され、ファイルが読み取られた後、エミットエンドに移動し、順番にイベントを閉じます。 : したがって、ファイルが読み取られるまで、データはデフォルトでファイルの読み取りを継続します。ファイルの読み取りを制御可能にしたい場合は、私と同じことができます。rs.pause()

Nodejs の読み取り可能なストリームに関する簡単な説明。読み取り可能なストリームを実装するにはどうすればよいですか? を使用してください。

自分で実装するには

実装のアイデア

继承EventEmitter发布订阅管理我们的事件

const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {

}
module.exports = ReadStream;
ログイン後にコピー

数据初始化

constructor(path, options = {}) {
    super();
    //参考fs 写实例需要用到的参数
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding - options.encoding || null;//默认编码格式是buffer
    this.autoClose = options.autoClose || true;//相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close
    this.start = options.start || 0;//数据读取的开始位置
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;//默认一次读取64个字节的数据 
    this.offset = this.start;//fs.read的偏移量
    this.fd = undefined; //初始化fd 用于 open成功后的fd做赋值  供 read里使用
    this.flowing = false;//实现pause和resume备用,设置flag,当监听到data事件的时候 改 flowing为true,
    this.open(); //初始化的时候就要调用open
    this.on("readStreamListener", function (type) {
      // console.log(type)//这里打印就能看到 实例上所有 通过on 绑定的事件名称
      if (type === "data") {
      //监听到data事件的时候 改 flowing为true
        this.flowing = true;
        this.read();
      }
    });
    }
ログイン後にコピー

文件读取方法read,pause,resume,open和destroy的实现

open()

 open() {
 // 调用fs.open 读取目标文件 
    fs.open(this.path, this.flags, (err, fd) => { 
      this.fd = fd; //赋值一个fd 供后面的 read()方式使用,文件读取成功,fd是返回一个数字
      this.emit("open", fd);
    });
ログイン後にコピー

read()

 read() {
   // console.log("一开始read里的", this.fd); //但是这样依旧拿不到 open后的fd,用 发布订阅 通过on来获取 绑定的事件type
    //这里要做一个容错处理 ,因为open是异步读取文件,read里无法马上拿到open结果
  if (typeof this.fd !== "number") {
      //订阅open,给绑定一个回调事件read 直到this.fd有值
      return this.once("open", () => this.read());
    }
 }
  //fd打开后 调用fs.read
  //实例上的start值是未知number,存在实际剩余的可读的文件大小<highWaterMar的情况 ,用howMuchToRead 替换highWaterMark 去做fs.read的每次读取buffer的大小
    let howMuchToRead = this.end
      ? Math.min(this.end - this.offset + 1, this.highWaterMark)
      : this.highWaterMark;
  //定义一个用户 传进来的highWaterMark 大小的buffer对象
    const buffer = Buffer.alloc(this.highWaterMark);
       //读取文件中的内容fd给buffer 从0位置开始,每次读取howMuchToRead个。插入数据,同时更新偏移量
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead,
      this.offset,
      (err, bytesRead) => {
        if (bytesRead) {
          // 每读完一次,偏移量=已经读到的数量
          this.offset += bytesRead;
          this.emit("data", buffer.slice(0, bytesRead));
          //写到这里实例上的data 已经可以打印出数据了 但是 继续读取 调用this.read() 直到bytesRead不存在 说明数据读取完毕了 走else
          //回调 this.read();时候判断 this.flowing 是否为true
          //pause调用后this.flowing将为false
          if (this.flowing) {
            this.read();
          }
        } else {
          // 执行到这 bytesRead不存在说明  文件数据读取完毕了已经 触发end
          this.emit("end");//emit 实例上绑定的end事件
          //destroy 还没写到 稍等 马上后面就实现...
          this.destroy();
        }
      }
    );
ログイン後にコピー

resume()

文件读取不去data事件,会触发对应的回调,不停的触发 所以想要变可控可以手动调用 resume()& pause()

  • pause的实现,调用的时候设置 this.flowing=false,打断 read()
  pause() {
    this.flowing = false;
  }
ログイン後にコピー

pause()

  • pause 打断 read()多次读取,可以使用resume 打开 this.flowing=true 并调用read
resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }
ログイン後にコピー

destroy()

  • 文件open不成功时候抛错时调用
  • 文件读取完毕后&&this.autoClose===true ,read()里文件读取end的时候 就执行close
  destroy(err) {
    if (err) {
      this.emit("error");
    }
    // 把close放destroy里 并 在read里调用
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
ログイン後にコピー

完整代码

  • 实现代码
/**
 *实现简单的可读流
 */

const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    //参考fs 写实例需要用到的参数
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding - options.encoding || null;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.fd = undefined;
    this.offset = this.start;
    this.flowing = false;
    this.open(); 
    this.on("newListener", function (type) {
      if (type === "data") {
        this.flowing = true;
        this.read();
      }
    });
  }
  destroy(err) {
    if (err) {
      this.emit("error");
    }
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        return this.destroy(err);
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }
  resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }
  pause() {
    this.flowing = false;
  }

  read() {
    if (typeof this.fd !== "number") {
      return this.once("open", () => this.read());
    }
    let howMuchToRead = this.end
      ? Math.min(this.end - this.offset + 1, this.highWaterMark)
      : this.highWaterMark;
    const buffer = Buffer.alloc(this.highWaterMark);
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead,
      this.offset,
      (err, bytesRead) => {
        if (bytesRead) {
          this.offset += bytesRead;
          this.emit("data", buffer.slice(0, bytesRead));
          if (this.flowing) {
            this.read();
          }
        } else {
          this.emit("end");
          this.destroy();
        }
      }
    );
  }
}

module.exports = ReadStream;
ログイン後にコピー
  • 调用代码
const ReadStream = require("./initReadStream");
let rs = new ReadStream(aPath, {
  flags: "r",
  encoding: null, //默认编码格式是buffer
  autoClose: true, //相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close
  start: 0,
  highWaterMark: 3, //每次读取的个数 默认是64*1024个字节
});
ログイン後にコピー

可写流的实现

待续...

更多编程相关知识,请访问:编程视频!!

以上がNodejs の読み取り可能なストリームに関する簡単な説明。読み取り可能なストリームを実装するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:juejin.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
関連するチュートリアル
人気のおすすめ
最新のコース
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート
私たちについて 免責事項 Sitemap
PHP中国語ウェブサイト:福祉オンライン PHP トレーニング,PHP 学習者の迅速な成長を支援します!