导航菜单

  • 0.api
  • 0.Async
  • 0.module
  • 1.ES2015
  • 2.Promise
  • 3.Node
  • 4.NodeInstall
  • 5.REPL
  • 6.NodeCore
  • 7.module&NPM
  • 8.Encoding
  • 9.Buffer
  • 10.fs
  • 11.Stream-1
  • 11.Stream-2
  • 11.Stream-3
  • 11.Stream-4
  • 12-Network-2
  • 12.NetWork-3
  • 12.Network-1
  • 13.tcp
  • 14.http-1
  • 14.http-2
  • 15.compress
  • 16.crypto
  • 17.process
  • 18.yargs
  • 19.cache
  • 20.action
  • 21.https
  • 22.cookie
  • 23.session
  • 24.express-1
  • 24.express-2
  • 24.express-3
  • 24.express-4
  • 25.koa-1
  • 26.webpack-1-basic
  • 26.webpack-2-optimize
  • 26.webpack-3-file
  • 26.webpack-4.tapable
  • 26.webpack-5-AST
  • 26.webpack-6-sources
  • 26.webpack-7-loader
  • 26.webpack-8-plugin
  • 26.webpack-9-hand
  • 26.webpack-10-prepare
  • 28.redux
  • 28.redux-jwt-back
  • 28.redux-jwt-front
  • 29.mongodb-1
  • 29.mongodb-2
  • 29.mongodb-3
  • 29.mongodb-4
  • 29.mongodb-5
  • 29.mongodb-6
  • 30.cms-1-mysql
  • 30.cms-2-mysql
  • 30.cms-3-mysql
  • 30.cms-4-nunjucks
  • 30.cms-5-mock
  • 30.cms-6-egg
  • 30.cms-7-api
  • 30.cms-8-roadhog
  • 30.cms-9-yaml
  • 30.cms-10-umi
  • 30.cms-12-dva
  • 30.cms-13-dva-ant
  • 30.cms-14-front
  • 30.cms-15-deploy
  • 31.dva
  • 31.cms-13-dva-antdesign
  • 33.redis
  • 34.unittest
  • 35.jwt
  • 36.websocket-1
  • 36.websocket-2
  • 38.chat-api-1
  • 38.chat-api-2
  • 38.chat-3
  • 38.chat-api-3
  • 38.chat
  • 38.chat2
  • 38.chat2
  • 39.crawl-0
  • 39.crawl-1
  • 39.crawl-2
  • 40.deploy
  • 41.safe
  • 42.test
  • 43.nginx
  • 44.enzyme
  • 45.docker
  • 46.elastic
  • 47.oauth
  • 48.wxpay
  • index
  • 52.UML
  • 53.design
  • index
  • 54.linux
  • 57.ts
  • 56.react-ssr
  • 58.ts_react
  • 59.ketang
  • 59.ketang2
  • 61.1.devops-linux
  • 61.2.devops-vi
  • 61.3.devops-user
  • 61.4.devops-auth
  • 61.5.devops-shell
  • 61.6.devops-install
  • 61.7.devops-system
  • 61.8.devops-service
  • 61.9.devops-network
  • 61.10.devops-nginx
  • 61.11.devops-docker
  • 61.12.devops-jekins
  • 61.13.devops-groovy
  • 61.14.devops-php
  • 61.15.devops-java
  • 61.16.devops-node
  • 61.17.devops-k8s
  • 62.1.react-basic
  • 62.2.react-state
  • 62.3.react-high
  • 62.4.react-optimize
  • 62.5.react-hooks
  • 62.6.react-immutable
  • 62.7.react-mobx
  • 62.8.react-source
  • 63.1.redux
  • 63.2.redux-middleware
  • 63.3.redux-hooks
  • 63.4.redux-saga
  • 63.5.redux-saga-hand
  • 64.1.router
  • 64.2.router-connected
  • 65.1.typescript
  • 65.2.typescript
  • 65.3.typescript
  • 65.4.antd
  • 65.4.definition
  • 66-1.vue-base
  • 66-2.vue-component
  • 66-3.vue-cli3.0
  • 66-4.$message组件
  • 66-5.Form组件
  • 66-6.tree
  • 66-7.vue-router-apply
  • 66-8.axios-apply
  • 66-9.vuex-apply
  • 66-10.jwt-vue
  • 66-11.vue-ssr
  • 66-12.nuxt-apply
  • 66-13.pwa
  • 66-14.vue单元测试
  • 66-15.权限校验
  • 67-1-network
  • 68-2-wireshark
  • 7.npm2
  • 69-hooks
  • 70-deploy
  • 71-hmr
  • 72.deploy
  • 73.import
  • 74.mobile
  • 75.webpack-1.文件分析
  • 75.webpack-2.loader
  • 75.webpack-3.源码流程
  • 75.webpack-4.tapable
  • 75.webpack-5.prepare
  • 75.webpack-6.resolve
  • 75.webpack-7.loader
  • 75.webpack-8.module
  • 75.webpack-9.chunk
  • 75.webpack-10.asset
  • 75.webpack-11.实现
  • 76.react_optimize
  • 77.ts_ketang_back
  • 77.ts_ketang_front
  • 78.vue-domdiff
  • 79.grammar
  • 80.tree
  • 81.axios
  • 82.1.react
  • 82.2.react-high
  • 82.3.react-router
  • 82.4.redux
  • 82.5.redux_middleware
  • 82.6.connected
  • 82.7.saga
  • 82.8.dva
  • 82.8.dva-source
  • 82.9.roadhog
  • 82.10.umi
  • 82.11.antdesign
  • 82.12.ketang-front
  • 82.12.ketang-back
  • 83.upload
  • 84.graphql
  • 85.antpro
  • 86.1.uml
  • 86.2.design
  • 87.postcss
  • 88.react16-1
  • 89.nextjs
  • 90.react-test
  • 91.react-ts
  • 92.rbac
  • 93.tsnode
  • 94.1.JavaScript
  • 94.2.JavaScript
  • 94.3.MODULE
  • 94.4.EventLoop
  • 94.5.文件上传
  • 94.6.https
  • 94.7. nginx
  • 95.1. react
  • 95.2.react
  • 96.1.react16
  • 96.2.fiber
  • 96.3.fiber
  • 97.serverless
  • 98.websocket
  • 100.1.react-basic
  • 101.1.monitor
  • 101.2.monitor
  • 102.java
  • 103.1.webpack-usage
  • 103.2.webpack-bundle
  • 103.3.webpack-ast
  • 103.4.webpack-flow
  • 103.5.webpack-loader
  • 103.6.webpack-tapable
  • 103.7.webpack-plugin
  • 103.8.webpack-optimize1
  • 103.9.webpack-optimize2
  • 103.10.webpack-hand
  • 103.11.webpack-hmr
  • 103.11.webpack5
  • 103.13.splitChunks
  • 103.14.webpack-sourcemap
  • 103.15.webpack-compiler1
  • 103.15.webpack-compiler2
  • 103.16.rollup.1
  • 103.16.rollup.2
  • 103.16.rollup.3
  • 103.16.vite.basic
  • 103.16.vite.source
  • 103.16.vite.plugin
  • 103.16.vite.1
  • 103.16.vite.2
  • 103.17.polyfill
  • 104.1.binary
  • 104.2.binary
  • 105.skeleton
  • 106.1.react
  • 106.2.react_hooks
  • 106.3.react_router
  • 106.4.redux
  • 106.5.redux_middleware
  • 106.6.connected-react-router
  • 106.6.redux-first-history
  • 106.7.redux-saga
  • 106.8.dva
  • 106.9.umi
  • 106.10.ketang
  • 106.11.antdesign
  • 106.12.antpro
  • 106.13.router-6
  • 106.14.ssr
  • 106.15.nextjs
  • 106.16.1.cms
  • 106.16.2.cms
  • 106.16.3.cms
  • 106.16.4.cms
  • 106.16.mobx
  • 106.17.fomily
  • 107.fiber
  • 108.http
  • 109.1.webpack_usage
  • 109.2.webpack_source
  • 109.3.dll
  • 110.nest.js
  • 111.xstate
  • 112.Form
  • 113.redux-saga
  • 114.react+typescript
  • 115.immer
  • 116.pro5
  • 117.css-loader
  • 118.1.umi-core
  • 119.2.module-federation
  • 119.1.module-federation
  • 120.create-react-app
  • 121.react-scripts
  • 122.react-optimize
  • 123.jsx-runtime
  • 124.next.js
  • 125.1.linux
  • 125.2.linux-vi
  • 125.3.linux-user
  • 125.4.linux-auth
  • 125.5.linux-shell
  • 125.6.linux-install
  • 125.7.linux-system
  • 125.8.linux-service
  • 125.9.linux-network
  • 125.10.nginx
  • 125.11.docker
  • 125.12.ci
  • 125.13.k8s
  • 125.14.k8s
  • 125.15.k8s
  • 125.16.k8s
  • 126.11.react-1
  • 126.12.react-2
  • 126.12.react-3
  • 126.12.react-4
  • 126.12.react-5
  • 126.12.react-6
  • 126.12.react-7
  • 126.12.react-8
  • 127.frontend
  • 128.rollup
  • 129.px2rem-loader
  • 130.health
  • 131.hooks
  • 132.keepalive
  • 133.vue-cli
  • 134.react18
  • 134.2.react18
  • 134.3.react18
  • 135.function
  • 136.toolkit
  • 137.lerna
  • 138.create-vite
  • 139.cli
  • 140.antd
  • 141.react-dnd
  • 142.1.link
  • 143.1.gulp
  • 143.2.stream
  • 143.3.gulp
  • 144.1.closure
  • 144.2.v8
  • 144.3.gc
  • 145.react-router-v6
  • 146.browser
  • 147.lighthouse
  • 148.1.basic
  • 148.2.basic
  • 148.3.basic
  • 148.4.basic
  • 148.5.basic
  • 149.1.vite
  • 149.2.vite
  • 149.3.vite
  • 149.4.vite
  • 150.react-window
  • 151.react-query
  • 152.useRequest
  • 153.transition
  • 154.emotion
  • 155.1.formily
  • 155.2.formily
  • 155.3.formily
  • 155.3.1.mobx.usage
  • 155.3.2.mobx.source
  • 156.vue-loader
  • 103.11.mf
  • 157.1.react18
  • 158.umi4
  • 159.rxjs
  • 159.rxjs2
  • 160.bff
  • 161.zustand
  • 162.vscode
  • 163.emp
  • 164.cors
  • 1. 流的概念
  • 2.可读流createReadStream
    • 2.1 创建可读流
    • 2.2 监听data事件
    • 2.3 监听end事件
    • 2.4 监听error事件
    • 2.5 监听open事件
    • 2.6 监听close事件
    • 2.7 设置编码
    • 2.8 暂停和恢复触发data
  • 3.可写流createWriteStream
    • 3.1 创建可写流
    • 3.2 write方法
    • 3.3 end方法
    • 3.4 drain方法
    • 3.5 finish方法
  • 4.pipe方法
    • 4.1 pipe方法的原理
    • 4.2 pipe用法
    • 4.3 unpipe用法
    • 4.4 cork
    • 4.5 uncork
  • 5. 简单实现
    • 5.1 可读流的简单实现
    • 5.2 可写流的简单实现
    • 5.3 pipe
  • 5.4 暂停模式

1. 流的概念 #

  • 流是一组有序的,有起点和终点的字节数据传输手段
  • 它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
  • 流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP 服务器request和response对象都是流。

2.可读流createReadStream #

实现了stream.Readable接口的对象,将对象数据读取为流数据,当监听data事件后,开始发射数据

// Factory attached to fs: builds a ReadStream for the given path and options.
fs.createReadStream = function(path, options) {
  return new ReadStream(path, options);
};
// ReadStream is wired to inherit from Readable.
util.inherits(ReadStream, Readable);

2.1 创建可读流 #

var rs = fs.createReadStream(path,[options]);
  1. path读取文件的路径
  2. options
    • flags打开文件要做的操作,默认为'r'
    • encoding默认为null
    • start开始读取的索引位置
    • end结束读取的索引位置(包括结束位置)
    • highWaterMark读取缓存区默认的大小64kb

如果指定utf8编码highWaterMark要大于3个字节

2.2 监听data事件 #

流切换到流动模式,数据会被尽可能快的读出

// Print every chunk handed to the 'data' listener.
rs.on('data', (chunk) => {
    console.log(chunk);
});

2.3 监听end事件 #

该事件会在读完数据后被触发

// Report completion once the 'end' event fires.
rs.on('end', () => {
    console.log('读取完成');
});

2.4 监听error事件 #

// Print whatever error object the 'error' event carries.
rs.on('error', (error) => {
    console.log(error);
});

2.5 监听open事件 #

// The 'open' event carries the file descriptor, not an error; the previous
// handler logged an undeclared `err` and threw a ReferenceError.
rs.on('open', function (fd) {
    console.log(fd);
});

2.6 监听close事件 #

// This handler declares no parameters; the previous version logged an
// undeclared `err` and threw a ReferenceError when the event fired.
rs.on('close', function () {
    console.log('close');
});

2.7 设置编码 #

与指定{encoding:'utf8'}效果相同,设置编码

// Set the stream's encoding to 'utf8'.
rs.setEncoding('utf8');

2.8 暂停和恢复触发data #

通过pause()方法和resume()方法

// Pause as soon as a chunk arrives, print it, and resume two seconds after
// this script started.
rs.on('data', (chunk) => {
    rs.pause();
    console.log(chunk);
});
setTimeout(() => {
    rs.resume();
}, 2000);

3.可写流createWriteStream #

实现了stream.Writable接口的对象来将流数据写入到对象中

// Factory attached to fs: builds a WriteStream for the given path and options.
fs.createWriteStream = function(path, options) {
  return new WriteStream(path, options);
};

// WriteStream is wired to inherit from Writable.
util.inherits(WriteStream, Writable);

3.1 创建可写流 #

var ws = fs.createWriteStream(path,[options]);
  1. path写入的文件路径
  2. options
    • flags打开文件要做的操作,默认为'w'
    • encoding默认为utf8
    • highWaterMark写入缓存区的默认大小16kb

3.2 write方法 #

ws.write(chunk,[encoding],[callback]);
  1. chunk写入的数据buffer/string
  2. encoding编码格式chunk为字符串时有用,可选
  3. callback 写入成功后的回调

返回值为布尔值,系统缓存区满时为false,未满时为true

3.3 end方法 #

ws.end(chunk,[encoding],[callback]);

表明接下来没有数据要被写入 Writable。通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据。如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数。

3.4 drain方法 #

  • 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发
  • 建议, 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块
    // Back-pressure demo: keep writing single characters until write()
    // returns false, then continue from the 'drain' handler.
    let fs = require('fs');
    let ws = fs.createWriteStream('./2.txt', {
      flags: 'w',
      encoding: 'utf8',
      highWaterMark: 3
    });
    let i = 10;
    function write() {
      let canContinue = true;
      for (; i && canContinue; i--) {
        canContinue = ws.write("1");
        console.log(canContinue);
      }
    }
    write();
    ws.on('drain', function () {
      console.log("drain");
      write();
    });

3.5 finish方法 #

在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统之后, 'finish' 事件将被触发。

// Write 100 greeting lines, close the stream with a final chunk, and report
// when 'finish' fires.
var writer = fs.createWriteStream('./2.txt');
Array.from({ length: 100 }, (_, index) => index).forEach((index) => {
  writer.write(`hello, ${index}!\n`);
});
writer.end('结束\n');
writer.on('finish', function () {
  console.error('所有的写入已经完成!');
});

4.pipe方法 #

4.1 pipe方法的原理 #

// Hand-rolled pipe: copy 1.txt into 2.txt, pausing the reader whenever the
// writer reports a full buffer and resuming once it drains.
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', (chunk) => {
    var accepted = ws.write(chunk);
    if (accepted) {
        return;
    }
    rs.pause();
});
ws.on('drain', () => {
    rs.resume();
});
rs.on('end', () => {
    ws.end();
});

4.2 pipe用法 #

readStream.pipe(writeStream);
// Copy 1.txt into 2.txt by piping the read stream into the write stream.
var source = fs.createReadStream('./1.txt');
var target = fs.createWriteStream('./2.txt');
source.pipe(target);

将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存。

4.3 unpipe用法 #

  • readable.unpipe()方法将之前通过stream.pipe()方法绑定的流分离
  • 如果 destination 没有传入, 则所有绑定的流都会被分离.
    // Pipe 1.txt into 2.txt, then detach the destination after one second and
    // close it by hand.
    let fs = require('fs');
    var from = fs.createReadStream('./1.txt');
    var to = fs.createWriteStream('./2.txt');
    from.pipe(to);
    setTimeout(() => {
    console.log('关闭向2.txt的写入');
    // Detach the stream that was actually piped to; the previous code passed
    // an undefined identifier (`writable`) and threw a ReferenceError.
    from.unpipe(to);
    console.log('手工关闭文件流');
    to.end();
    }, 1000);

4.4 cork #

调用 writable.cork() 方法将强制所有写入数据都存放到内存中的缓冲区里。 直到调用 stream.uncork() 或 stream.end() 方法时,缓冲区里的数据才会被输出。

4.5 uncork #

writable.uncork()将输出在stream.cork()方法被调用之后缓冲在内存中的所有数据。

// Cork the stream, issue two writes, and uncork on the next tick.
stream.cork();
['1', '2'].forEach((piece) => {
    stream.write(piece);
});
process.nextTick(function () {
    stream.uncork();
});

5. 简单实现 #

5.1 可读流的简单实现 #

// Demo: read ./1.txt from offset 3 to offset 7 in 3-byte chunks with the
// hand-written ReadStream and log each lifecycle event.
let fs = require('fs');
let ReadStream = require('./ReadStream');
// The module exports an ES2015 class, which cannot be invoked without `new`
// (calling it as a plain function throws a TypeError).
let rs = new ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    start: 3,
    end: 7,
    highWaterMark: 3
});
rs.on('open', function () {
    console.log("open");
});
rs.on('data', function (data) {
    console.log(data);
});
rs.on('end', function () {
    console.log("end");
});
rs.on('close', function () {
    console.log("close");
});
/**
 Sample output recorded by the original author (the exact chunks depend on
 the contents of ./1.txt):
 open
 456
 789
 end
 close
 **/
let fs = require('fs');
let EventEmitter = require('events');

/**
 * Minimal flowing-mode file reader built on EventEmitter.
 *
 * NOTE: despite its name this class reads from a file (it calls fs.read and
 * emits 'data'); the identifier is kept because the module exports it under
 * this name.
 *
 * Events emitted: 'open' (fd), 'data' (chunk), 'end', 'close', 'error' (err).
 */
class WriteStream extends EventEmitter {
    /**
     * @param {string} path - File to read.
     * @param {object} [options]
     * @param {string} [options.flags='r'] - Flags passed to fs.open.
     * @param {number} [options.mode=0o666] - Mode passed to fs.open.
     * @param {string} [options.encoding] - When set, chunks are emitted as strings.
     * @param {number} [options.start=0] - First byte offset to read.
     * @param {number} [options.end] - Last byte offset to read (inclusive).
     * @param {boolean} [options.autoClose=true] - Close the fd after 'end' or an error.
     * @param {number} [options.highWaterMark=65536] - Maximum bytes per read.
     */
    constructor(path, options) {
        super();
        options = options || {};
        this.path = path;
        this.fd = options.fd;
        this.flags = options.flags || 'r';
        this.mode = options.mode || 0o666;
        this.encoding = options.encoding;
        this.start = options.start || 0;
        this.pos = this.start;
        // This own property shadows the prototype's end() method on every instance.
        this.end = options.end;
        this.flowing = false;
        this.autoClose = options.autoClose !== false;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.length = 0;
        this.on('newListener', (type) => {
            // Start the read loop only once; a second 'data' listener must not
            // spawn a parallel loop that re-reads the same offsets.
            if (type === 'data' && !this.flowing) {
                this.flowing = true;
                this.read();
            }
        });
        this.on('end', () => {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.open();
    }

    /**
     * Reads the next chunk (at most highWaterMark bytes, never past `end`),
     * emits it as 'data' and keeps going while the stream is flowing.
     * Defers itself until 'open' when no file descriptor is available yet.
     */
    read() {
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this.read());
        }
        // `end` is inclusive, hence the +1; a numeric check keeps `end: 0` valid.
        const hasEnd = typeof this.end === 'number';
        const n = hasEnd ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
        if (n <= 0) {
            return this.emit('end');
        }
        fs.read(this.fd, this.buffer, 0, n, this.pos, (err, bytesRead) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            if (!bytesRead) {
                return this.emit('end');
            }
            // this.buffer is reused by the next read, so hand out a copy
            // (toString already copies when an encoding is set).
            const data = this.encoding
                ? this.buffer.toString(this.encoding, 0, bytesRead)
                : Buffer.from(this.buffer.slice(0, bytesRead));
            this.pos += bytesRead;
            this.emit('data', data);
            if (hasEnd && this.pos > this.end) {
                return this.emit('end');
            }
            if (this.flowing) {
                this.read();
            }
        });
    }

    /** Opens the file and emits 'open' with the descriptor, or 'error'. */
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) return this.emit('error', err);
            this.fd = fd;
            this.emit('open', fd);
        });
    }


    /**
     * Closes the stream when autoClose is on. Only reachable through the
     * prototype: instances carry an own `end` property set in the constructor.
     */
    end() {
        if (this.autoClose) {
            this.destroy();
        }
    }

    /** Closes the file descriptor (if one was opened) and emits 'close'. */
    destroy() {
        if (typeof this.fd !== 'number') {
            return this.emit('close');
        }
        fs.close(this.fd, () => {
            this.emit('close');
        });
    }

}

module.exports = WriteStream;

5.2 可写流的简单实现 #

// Demo: write ten "1" characters through the hand-written write stream with a
// 3-byte highWaterMark, continuing from 'drain' whenever write() returns false.
let fs = require('fs');
let FileWriteStream = require('./FileWriteStream');
// The module exports an ES2015 class, which cannot be invoked without `new`.
let ws = new FileWriteStream('./2.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3
});
let i = 10;
function write(){
    let  flag = true;
    while(i&&flag){
        // The IIFE captures the current value of i for the write callback.
        flag = ws.write("1",'utf8',(function(i){
            return function(){
                console.log(i);
            }
        })(i));
        i--;
        console.log(flag);
    }
}
write();
ws.on('drain',()=>{
    console.log("drain");
    write();
});
/**
 Sample output recorded by the original author:
 10
 9
 8
 drain
 7
 6
 5
 drain
 4
 3
 2
 drain
 1
 **/
let fs = require('fs');
let EventEmitter = require('events');
/**
 * Minimal file write stream built on EventEmitter.
 *
 * Writes are serialised: one chunk is handed to fs.write at a time and the
 * rest wait in `buffers`. `length` tracks bytes accepted but not yet written.
 *
 * Events emitted: 'open' (fd), 'drain', 'finish', 'end', 'close', 'error' (err).
 */
class WriteStream extends  EventEmitter{
    /**
     * @param {string} path - File to write.
     * @param {object} [options]
     * @param {string} [options.flags='w'] - Flags passed to fs.open.
     * @param {number} [options.mode=0o666] - Mode passed to fs.open.
     * @param {string} [options.encoding] - Default encoding for string chunks.
     * @param {number} [options.start=0] - Byte offset of the first write.
     * @param {boolean} [options.autoClose=true] - Close the fd after end() or an error.
     * @param {number} [options.highWaterMark=16384] - Pending-byte threshold for back-pressure.
     */
    constructor(path, options) {
        super();
        options = options || {};
        this.path = path;
        this.fd = options.fd;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 0o666;
        this.encoding = options.encoding;
        this.start = options.start || 0;
        this.pos = this.start;
        this.writing = false;
        // True once write() has returned false and 'drain' is still owed.
        this.needDrain = false;
        // True once end() has been called.
        this.ending = false;
        this.autoClose = options.autoClose !== false;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.buffers = [];
        this.length = 0;
        this.open();
    }

    /** Opens the file and emits 'open' with the descriptor, or 'error'. */
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) return this.emit('error', err);
            this.fd = fd;
            this.emit('open', fd);
        })
    }

    /**
     * Queues a chunk for writing.
     *
     * @param {Buffer|string} chunk - Data to write.
     * @param {string|Function} [encoding] - Encoding for string chunks, or the callback.
     * @param {Function} [cb] - Called once this chunk has been written.
     * @returns {boolean} false when pending bytes reached highWaterMark; the
     *   caller should then wait for 'drain'.
     */
    write(chunk, encoding, cb) {
        if (typeof encoding == 'function') {
            cb = encoding;
            encoding = null;
        }
        if (this.ending) {
            this.emit('error', new Error('write after end'));
            return false;
        }

        // Honour the per-call encoding before falling back to the stream default.
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding || this.encoding || 'utf8');
        let len = chunk.length;
        this.length += len;
        let ret = this.length < this.highWaterMark;
        if (!ret) {
            this.needDrain = true;
        }
        if (this.writing) {
            this.buffers.push({
                chunk,
                encoding,
                cb,
            });
        } else {
            this.writing = true;
            this._write(chunk, encoding, this._afterWrite(cb));
        }
        return ret;
    }

    /**
     * Writes one chunk at the current position, waiting for 'open' when no
     * file descriptor is available yet. Calls `cb` after a successful write.
     */
    _write(chunk, encoding, cb) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this._write(chunk, encoding, cb));
        }
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            this.length -= written;
            this.pos += written;
            cb && cb();
        });
    }

    /** Builds the completion handler: run the caller's callback, then move on. */
    _afterWrite(cb) {
        return () => {
            if (cb) {
                cb();
            }
            this.clearBuffer();
        };
    }

    /**
     * Writes the next queued chunk; once the queue is empty, finishes the
     * stream if end() was called, otherwise emits 'drain' when one is owed.
     */
    clearBuffer() {
        let data = this.buffers.shift();
        if (data) {
            this._write(data.chunk, data.encoding, this._afterWrite(data.cb));
        } else {
            this.writing = false;
            if (this.ending) {
                this._finish();
            } else if (this.needDrain) {
                this.needDrain = false;
                this.emit('drain');
            }
        }
    }

    /**
     * Signals that no more data will be written. Pending writes are flushed
     * before the stream finishes and, with autoClose, the file is closed.
     *
     * @param {Buffer|string} [chunk] - Optional final chunk.
     * @param {string|Function} [encoding] - Encoding for a string chunk, or the callback.
     * @param {Function} [cb] - Registered as a one-time 'finish' listener.
     */
    end(chunk, encoding, cb) {
        if (typeof chunk == 'function') {
            cb = chunk;
            chunk = null;
            encoding = null;
        } else if (typeof encoding == 'function') {
            cb = encoding;
            encoding = null;
        }
        if (this.ending) {
            return;
        }
        if (chunk !== undefined && chunk !== null) {
            this.write(chunk, encoding);
        }
        this.ending = true;
        if (cb) {
            this.once('finish', cb);
        }
        if (!this.writing) {
            this._finish();
        }
    }

    /** Emits 'finish' (and 'end', kept for existing listeners), then closes. */
    _finish() {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this._finish());
        }
        this.emit('finish');
        this.emit('end');
        if (this.autoClose) {
            this.destroy();
        }
    }

    /** Closes the file descriptor (if one was opened) and emits 'close'. */
    destroy() {
        if (typeof this.fd != 'number') {
            return this.emit('close');
        }
        fs.close(this.fd, () => {
            this.emit('close');
        })
    }

}

module.exports = WriteStream;

5.3 pipe #

// Demo: pipe ./1.txt into ./2.txt using the hand-written read and write
// streams, both with a 3-byte highWaterMark.
let fs = require('fs');
let ReadStream = require('./ReadStream');
// Both modules export ES2015 classes, which cannot be invoked without `new`.
let rs = new ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = new FileWriteStream('./2.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3
});
rs.pipe(ws);
/**
 * Forwards every chunk to `dest`, pausing this stream whenever dest.write()
 * returns false and resuming on dest's 'drain'. Ends `dest` on 'end'.
 *
 * @param {object} dest - Destination exposing write(), end() and 'drain'.
 * @returns {object} dest, so that pipe calls can be chained.
 */
ReadStream.prototype.pipe = function (dest) {
    this.on('data', (data)=>{
        let flag = dest.write(data);
        if(!flag){
            this.pause();
        }
    });
    dest.on('drain', ()=>{
        this.resume();
    });
    this.on('end', ()=>{
        dest.end();
    });
    return dest;
}
/** Leaves flowing mode. */
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
/**
 * Re-enters flowing mode and restarts reading. Does nothing when the stream
 * is already flowing, so a 'drain' that arrives without a prior pause does
 * not trigger an extra read().
 */
ReadStream.prototype.resume = function(){
    if (this.flowing) {
        return;
    }
    this.flowing = true;
    this.read();
}

5.4 暂停模式 #

// Demo of paused mode: on every 'readable' event pull a single byte and log
// the stream's `length` before, right after, and 500 ms after the read.
let fs =require('fs');
let ReadStream2 = require('./ReadStream2');
const readOptions = {
    start:3,
    end:8,
    encoding:'utf8',
    highWaterMark:3
};
let rs = new ReadStream2('./1.txt', readOptions);
const logLength = () => {
    console.log('rs.buffer.length',rs.length);
};
rs.on('readable', () => {
    console.log('readable');
    logLength();
    const piece = rs.read(1);
    console.log(piece);
    logLength();

    setTimeout(logLength, 500);
});
let fs = require('fs');
let EventEmitter = require('events');
/**
 * Minimal paused-mode file reader built on EventEmitter.
 *
 * File data is pulled into an internal queue (`buffers`, `length` bytes in
 * total) and handed out through read(n). 'readable' is emitted when data
 * arrives after a read() found the queue empty; 'end' is emitted once, after
 * the source is exhausted and the queue has been fully consumed.
 *
 * NOTE(review): nothing in this class emits 'data', so pause(), resume() and
 * pipe() have no effect on data delivery until flowing mode is implemented.
 *
 * Events emitted: 'open', 'readable', 'end', 'close', 'error' (err).
 */
class ReadStream extends EventEmitter {
    /**
     * @param {string} path - File to read.
     * @param {object} [options]
     * @param {number} [options.highWaterMark=65536] - Bytes per read and queue refill threshold.
     * @param {string} [options.flags='r'] - Flags passed to fs.open.
     * @param {string} [options.encoding] - When set, read() returns strings.
     * @param {number} [options.mode=0o666] - Mode passed to fs.open.
     * @param {number} [options.start=0] - First byte offset to read.
     * @param {number} [options.end] - Last byte offset to read (inclusive).
     * @param {boolean} [options.autoClose=true] - Close the fd after 'end' or an error.
     */
    constructor(path, options) {
        super();
        options = options || {};
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        // `options.autoClose || true` was always true; only an explicit false disables it.
        this.autoClose = options.autoClose !== false;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing = undefined;
        this.needReadable = false;
        // True while an fs.read is in flight, so refills never overlap.
        this.reading = false;
        // True once the source has no more bytes to deliver.
        this.ended = false;
        this.endEmitted = false;
        this.length = 0;
        this.buffers = [];
        this.on('end', () => {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
        this.open();
    }

    /** Opens the file and emits 'open', or 'error' (closing first when autoClose is on). */
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                // Stop here: there is no descriptor to announce via 'open'.
                return this.emit('error', err);
            }
            this.fd = fd;
            this.emit('open');
        });
    }

    /**
     * Takes up to `n` bytes from the internal queue and refills it when it
     * drops below highWaterMark.
     *
     * @param {number} [n] - Bytes wanted; a non-numeric value means
     *   "everything currently queued", 0 only triggers a refill.
     * @returns {Buffer|string|undefined} The data (a string when an encoding
     *   is set), or undefined when fewer than `n` bytes are queued or the
     *   file is not open yet.
     */
    read(n) {
        if (typeof this.fd != 'number') {
            // Retry with the same size once the file is open.
            this.once('open', () => this.read(n));
            return;
        }
        n = parseInt(n,10);
        if(n != n){
            n = this.length;
        }
        if(this.length ==0)
            this.needReadable = true;
        let ret;
        if (n > 0 && n <= this.length) {
            ret = this._consume(n);
            if (this.encoding) ret = ret.toString(this.encoding);
        }

        if (this.ended) {
            if (this.length == 0) {
                this._emitEnd();
            }
        } else if (!this.reading && this.length < this.highWaterMark) {
            this._fill();
        }
        return ret;
    }

    /** Removes exactly `n` queued bytes (n <= this.length) and returns them as one Buffer. */
    _consume(n) {
        const ret = Buffer.alloc(n);
        let offset = 0;
        while (offset < n) {
            const head = this.buffers[0];
            const take = Math.min(head.length, n - offset);
            head.copy(ret, offset, 0, take);
            offset += take;
            if (take === head.length) {
                this.buffers.shift();
            } else {
                // Keep the unread tail of a partially consumed chunk at the front.
                this.buffers[0] = head.slice(take);
            }
        }
        this.length -= n;
        return ret;
    }

    /** Reads the next chunk from the file into the queue. */
    _fill() {
        // `end` is inclusive, hence the +1; a numeric check keeps `end: 0` valid.
        const hasEnd = typeof this.end == 'number';
        const m = hasEnd ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
        if (m <= 0) {
            this.ended = true;
            return this._notify();
        }
        this.reading = true;
        fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
            this.reading = false;
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            if (bytesRead > 0) {
                // this.buffer is reused by the next read, so queue a copy.
                this.buffers.push(Buffer.from(this.buffer.slice(0, bytesRead)));
                this.pos += bytesRead;
                this.length += bytesRead;
                this.bytesRead += bytesRead;
                if (hasEnd && this.pos > this.end) {
                    this.ended = true;
                }
            } else {
                this.ended = true;
            }
            this._notify();
        });
    }

    /** Emits 'readable' when one is owed, then 'end' if nothing is left. */
    _notify() {
        if (this.needReadable) {
            // Reset before emitting so a read() inside the handler can
            // request the next 'readable'.
            this.needReadable = false;
            this.emit('readable');
        }
        if (this.ended && this.length == 0) {
            this._emitEnd();
        }
    }

    /** Emits 'end' at most once. */
    _emitEnd() {
        if (this.endEmitted) {
            return;
        }
        this.endEmitted = true;
        this.emit('end');
    }

    /** Closes the file descriptor (if one was opened) and emits 'close'. */
    destroy() {
        if (typeof this.fd != 'number') {
            this.closed = true;
            return this.emit('close');
        }
        fs.close(this.fd, (err) => {
            this.closed = true;
            this.emit('close');
        });
    }

    /** Clears the flowing flag. */
    pause() {
        this.flowing = false;
    }

    /** Sets the flowing flag and calls read(). */
    resume() {
        this.flowing = true;
        this.read();
    }

    /**
     * Wires 'data' to dest.write(), dest's 'drain' to resume() and 'end' to
     * dest.end().
     *
     * @param {object} dest - Destination exposing write(), end() and 'drain'.
     * @returns {object} dest, so that pipe calls can be chained.
     */
    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
        return dest;
    }

}

module.exports = ReadStream;
/**
 * if (n !== 0)
 state.emittedReadable = false; 只要要读的字节数不是0就需要触发readable事件
 如果传入的是NaN,则将n赋为缓存区的长度,第一次就是0

 缓存区为0就开始读吧
 如果n等于0就返回null,state.needReadable = true;
 如果缓存区为0,是  state.needReadable = true; 需要触发readable

 **/
  • streaming_in_node
  • stream-handbook

访问验证

请输入访问令牌

Token不正确,请重新输入