导航菜单

  • 0.api
  • 0.Async
  • 0.module
  • 1.ES2015
  • 2.Promise
  • 3.Node
  • 4.NodeInstall
  • 5.REPL
  • 6.NodeCore
  • 7.module&NPM
  • 8.Encoding
  • 9.Buffer
  • 10.fs
  • 11.Stream-1
  • 11.Stream-2
  • 11.Stream-3
  • 11.Stream-4
  • 12-Network-2
  • 12.NetWork-3
  • 12.Network-1
  • 13.tcp
  • 14.http-1
  • 14.http-2
  • 15.compress
  • 16.crypto
  • 17.process
  • 18.yargs
  • 19.cache
  • 20.action
  • 21.https
  • 22.cookie
  • 23.session
  • 24.express-1
  • 24.express-2
  • 24.express-3
  • 24.express-4
  • 25.koa-1
  • 26.webpack-1-basic
  • 26.webpack-2-optimize
  • 26.webpack-3-file
  • 26.webpack-4.tapable
  • 26.webpack-5-AST
  • 26.webpack-6-sources
  • 26.webpack-7-loader
  • 26.webpack-8-plugin
  • 26.webpack-9-hand
  • 26.webpack-10-prepare
  • 28.redux
  • 28.redux-jwt-back
  • 28.redux-jwt-front
  • 29.mongodb-1
  • 29.mongodb-2
  • 29.mongodb-3
  • 29.mongodb-4
  • 29.mongodb-5
  • 29.mongodb-6
  • 30.cms-1-mysql
  • 30.cms-2-mysql
  • 30.cms-3-mysql
  • 30.cms-4-nunjucks
  • 30.cms-5-mock
  • 30.cms-6-egg
  • 30.cms-7-api
  • 30.cms-8-roadhog
  • 30.cms-9-yaml
  • 30.cms-10-umi
  • 30.cms-12-dva
  • 30.cms-13-dva-ant
  • 30.cms-14-front
  • 30.cms-15-deploy
  • 31.dva
  • 31.cms-13-dva-antdesign
  • 33.redis
  • 34.unittest
  • 35.jwt
  • 36.websocket-1
  • 36.websocket-2
  • 38.chat-api-1
  • 38.chat-api-2
  • 38.chat-3
  • 38.chat-api-3
  • 38.chat
  • 38.chat2
  • 38.chat2
  • 39.crawl-0
  • 39.crawl-1
  • 39.crawl-2
  • 40.deploy
  • 41.safe
  • 42.test
  • 43.nginx
  • 44.enzyme
  • 45.docker
  • 46.elastic
  • 47.oauth
  • 48.wxpay
  • index
  • 52.UML
  • 53.design
  • index
  • 54.linux
  • 57.ts
  • 56.react-ssr
  • 58.ts_react
  • 59.ketang
  • 59.ketang2
  • 61.1.devops-linux
  • 61.2.devops-vi
  • 61.3.devops-user
  • 61.4.devops-auth
  • 61.5.devops-shell
  • 61.6.devops-install
  • 61.7.devops-system
  • 61.8.devops-service
  • 61.9.devops-network
  • 61.10.devops-nginx
  • 61.11.devops-docker
  • 61.12.devops-jekins
  • 61.13.devops-groovy
  • 61.14.devops-php
  • 61.15.devops-java
  • 61.16.devops-node
  • 61.17.devops-k8s
  • 62.1.react-basic
  • 62.2.react-state
  • 62.3.react-high
  • 62.4.react-optimize
  • 62.5.react-hooks
  • 62.6.react-immutable
  • 62.7.react-mobx
  • 62.8.react-source
  • 63.1.redux
  • 63.2.redux-middleware
  • 63.3.redux-hooks
  • 63.4.redux-saga
  • 63.5.redux-saga-hand
  • 64.1.router
  • 64.2.router-connected
  • 65.1.typescript
  • 65.2.typescript
  • 65.3.typescript
  • 65.4.antd
  • 65.4.definition
  • 66-1.vue-base
  • 66-2.vue-component
  • 66-3.vue-cli3.0
  • 66-4.$message组件
  • 66-5.Form组件
  • 66-6.tree
  • 66-7.vue-router-apply
  • 66-8.axios-apply
  • 66-9.vuex-apply
  • 66-10.jwt-vue
  • 66-11.vue-ssr
  • 66-12.nuxt-apply
  • 66-13.pwa
  • 66-14.vue单元测试
  • 66-15.权限校验
  • 67-1-network
  • 68-2-wireshark
  • 7.npm2
  • 69-hooks
  • 70-deploy
  • 71-hmr
  • 72.deploy
  • 73.import
  • 74.mobile
  • 75.webpack-1.文件分析
  • 75.webpack-2.loader
  • 75.webpack-3.源码流程
  • 75.webpack-4.tapable
  • 75.webpack-5.prepare
  • 75.webpack-6.resolve
  • 75.webpack-7.loader
  • 75.webpack-8.module
  • 75.webpack-9.chunk
  • 75.webpack-10.asset
  • 75.webpack-11.实现
  • 76.react_optimize
  • 77.ts_ketang_back
  • 77.ts_ketang_front
  • 78.vue-domdiff
  • 79.grammar
  • 80.tree
  • 81.axios
  • 82.1.react
  • 82.2.react-high
  • 82.3.react-router
  • 82.4.redux
  • 82.5.redux_middleware
  • 82.6.connected
  • 82.7.saga
  • 82.8.dva
  • 82.8.dva-source
  • 82.9.roadhog
  • 82.10.umi
  • 82.11.antdesign
  • 82.12.ketang-front
  • 82.12.ketang-back
  • 83.upload
  • 84.graphql
  • 85.antpro
  • 86.1.uml
  • 86.2.design
  • 87.postcss
  • 88.react16-1
  • 89.nextjs
  • 90.react-test
  • 91.react-ts
  • 92.rbac
  • 93.tsnode
  • 94.1.JavaScript
  • 94.2.JavaScript
  • 94.3.MODULE
  • 94.4.EventLoop
  • 94.5.文件上传
  • 94.6.https
  • 94.7. nginx
  • 95.1. react
  • 95.2.react
  • 96.1.react16
  • 96.2.fiber
  • 96.3.fiber
  • 97.serverless
  • 98.websocket
  • 100.1.react-basic
  • 101.1.monitor
  • 101.2.monitor
  • 102.java
  • 103.1.webpack-usage
  • 103.2.webpack-bundle
  • 103.3.webpack-ast
  • 103.4.webpack-flow
  • 103.5.webpack-loader
  • 103.6.webpack-tapable
  • 103.7.webpack-plugin
  • 103.8.webpack-optimize1
  • 103.9.webpack-optimize2
  • 103.10.webpack-hand
  • 103.11.webpack-hmr
  • 103.11.webpack5
  • 103.13.splitChunks
  • 103.14.webpack-sourcemap
  • 103.15.webpack-compiler1
  • 103.15.webpack-compiler2
  • 103.16.rollup.1
  • 103.16.rollup.2
  • 103.16.rollup.3
  • 103.16.vite.basic
  • 103.16.vite.source
  • 103.16.vite.plugin
  • 103.16.vite.1
  • 103.16.vite.2
  • 103.17.polyfill
  • 104.1.binary
  • 104.2.binary
  • 105.skeleton
  • 106.1.react
  • 106.2.react_hooks
  • 106.3.react_router
  • 106.4.redux
  • 106.5.redux_middleware
  • 106.6.connected-react-router
  • 106.6.redux-first-history
  • 106.7.redux-saga
  • 106.8.dva
  • 106.9.umi
  • 106.10.ketang
  • 106.11.antdesign
  • 106.12.antpro
  • 106.13.router-6
  • 106.14.ssr
  • 106.15.nextjs
  • 106.16.1.cms
  • 106.16.2.cms
  • 106.16.3.cms
  • 106.16.4.cms
  • 106.16.mobx
  • 106.17.fomily
  • 107.fiber
  • 108.http
  • 109.1.webpack_usage
  • 109.2.webpack_source
  • 109.3.dll
  • 110.nest.js
  • 111.xstate
  • 112.Form
  • 113.redux-saga
  • 114.react+typescript
  • 115.immer
  • 116.pro5
  • 117.css-loader
  • 118.1.umi-core
  • 119.2.module-federation
  • 119.1.module-federation
  • 120.create-react-app
  • 121.react-scripts
  • 122.react-optimize
  • 123.jsx-runtime
  • 124.next.js
  • 125.1.linux
  • 125.2.linux-vi
  • 125.3.linux-user
  • 125.4.linux-auth
  • 125.5.linux-shell
  • 125.6.linux-install
  • 125.7.linux-system
  • 125.8.linux-service
  • 125.9.linux-network
  • 125.10.nginx
  • 125.11.docker
  • 125.12.ci
  • 125.13.k8s
  • 125.14.k8s
  • 125.15.k8s
  • 125.16.k8s
  • 126.11.react-1
  • 126.12.react-2
  • 126.12.react-3
  • 126.12.react-4
  • 126.12.react-5
  • 126.12.react-6
  • 126.12.react-7
  • 126.12.react-8
  • 127.frontend
  • 128.rollup
  • 129.px2rem-loader
  • 130.health
  • 131.hooks
  • 132.keepalive
  • 133.vue-cli
  • 134.react18
  • 134.2.react18
  • 134.3.react18
  • 135.function
  • 136.toolkit
  • 137.lerna
  • 138.create-vite
  • 139.cli
  • 140.antd
  • 141.react-dnd
  • 142.1.link
  • 143.1.gulp
  • 143.2.stream
  • 143.3.gulp
  • 144.1.closure
  • 144.2.v8
  • 144.3.gc
  • 145.react-router-v6
  • 146.browser
  • 147.lighthouse
  • 148.1.basic
  • 148.2.basic
  • 148.3.basic
  • 148.4.basic
  • 148.5.basic
  • 149.1.vite
  • 149.2.vite
  • 149.3.vite
  • 149.4.vite
  • 150.react-window
  • 151.react-query
  • 152.useRequest
  • 153.transition
  • 154.emotion
  • 155.1.formily
  • 155.2.formily
  • 155.3.formily
  • 155.3.1.mobx.usage
  • 155.3.2.mobx.source
  • 156.vue-loader
  • 103.11.mf
  • 157.1.react18
  • 158.umi4
  • 159.rxjs
  • 159.rxjs2
  • 160.bff
  • 161.zustand
  • 162.vscode
  • 163.emp
  • 164.cors
  • 1.流的分类
  • 2.Readable(可读流)
    • 2.1 1.readableStream.js
    • 2.2 readableStream.js
    • 2.3 Readable.js
    • 2.4 Stream.js
  • 3.Writable(可写流)
    • 3.1 基本实现
      • 3.1.1 2.writableStream.js
      • 3.1.2 writableStream.js
      • 3.1.3 Writable.js
    • 3.2 highWaterMark
      • 3.2.1 3.highWaterMark.js
      • 3.2.2 Writable.js
  • 4.pipe(管道)
    • 4.1 3.pipe.js
    • 4.2 Readable.js
  • 5.Duplex()双工流)
    • 5.1 4.duplexStream.js
    • 5.2 duplexStream.js
    • 5.3 Duplex.js
  • 6.Transform(转换流)
    • 6.1 5.transformStream.js
    • 6.2 transformStream.js
    • 6.3 Transform.js
  • 7.objectMode(对象模式)
    • 7.1 6.objectMode.js
  • 8.through2
    • 8.1 7.through2.js
    • 8.2 through2.js
    • 8.3 through2.obj
    • 8.4 through2.js
    • 8.5 data.txt

1.流的分类 #

  • Readable 可读流
  • Writable 可写流
  • Duplex 双工流
  • Transform 转换流
  • PassThrough 传递流

2.Readable(可读流) #

2.1 1.readableStream.js #

1.readableStream.js

const readableStream = require('./readableStream');
readableStream.on('data', (data) => {
    console.log(data);
    readableStream.pause();
});

2.2 readableStream.js #

readableStream.js

const Readable = require('./Readable');
const readableIterator = (function (count) {
    return {
        next() {
            count++;
            if (count <= 5) {
                return { done: false, value: count + '' };
            } else {
                return { done: true, value: null }
            }
        }
    }
})(0)

const readableStream = new Readable({
    read() {
        let { done, value } = readableIterator.next();
        if (done) {
            this.push(null);
        } else {
            this.push(value);
        }
    }
});
module.exports = readableStream;

2.3 Readable.js #

Readable.js

const Stream = require('./Stream');
var { inherits } = require('util');
function Readable(options) {
    Stream.call(this, options);
    this._readableState = { ended: false, buffer: [], flowing: false };
    if (options.read) this._read = options.read;
}
inherits(Readable, Stream);
Readable.prototype.on = function (event, fn) {
    Stream.prototype.on.call(this, event, fn);
    if (event === 'data') {
        this.resume();
    }
}
Readable.prototype.resume = function () {
    this._readableState.flowing = true;
    while (this.read());
}
Readable.prototype.pause = function () {
    this._readableState.flowing = false;
}
Readable.prototype.read = function () {
    if (!this._readableState.ended && this._readableState.flowing) {
        this._read();
    }
    let data = this._readableState.buffer.shift();
    if (data) {
        this.emit('data', data);
    }
    return data;
}
Readable.prototype.push = function (chunk) {
    if (chunk === null) {
        this._readableState.ended = true;
    } else {
        this._readableState.buffer.push(chunk);
    }
}
module.exports = Readable;

2.4 Stream.js #

Stream.js

const EventEmitter = require('events');
var { inherits } = require('util');
function Stream(options) {
     this.options = options;
    EventEmitter.call(this);
}
inherits(Stream, EventEmitter);
module.exports = Stream;

3.Writable(可写流) #

3.1 基本实现 #

3.1.1 2.writableStream.js #

2.writableStream.js

let writableStream = require('./writableStream');
writableStream.write('1');
writableStream.write('2');
writableStream.write('3');
writableStream.write('4');
writableStream.write('5');
writableStream.end();

3.1.2 writableStream.js #

writableStream.js

const Writable = require('./Writable');
const writableStream = new Writable({
    write(data, encoding, next) {
        console.log(data.toString(encoding));
        setTimeout(next, 1000);
    }
});
module.exports = writableStream;

3.1.3 Writable.js #

Writable.js

const Stream = require('./Stream');
var { inherits } = require('util');
function Writable(options) {
    Stream.call(this, options);
    this._writableState = {
        ended: false,
        writing: false,
        buffer: []
    };
    if (options.write) this._write = options.write;
}
inherits(Writable, Stream);
Writable.prototype.write = function (chunk) {
    if (this._writableState.ended) {
        return;
    }
    if (this._writableState.writing) {
        this._writableState.buffer.push(chunk);
    } else {
        this._writableState.writing = true;
        this._write(chunk, 'utf8', () => this.next());
    }
}
Writable.prototype.next = function () {
    this._writableState.writing = false;
    if (this._writableState.buffer.length > 0) {
        this._write(this._writableState.buffer.shift(), 'utf8', () => this.next());
    }
}
Writable.prototype.end = function () {
    this._writableState.ended = true;
}
module.exports = Writable;

3.2 highWaterMark #

3.2.1 3.highWaterMark.js #

3.highWaterMark.js

//const { Writable } = require('stream');
const Writable = require('./Writable');
class WritableStream extends Writable {
    _write = (data, encoding, next) => {
        console.log(data.toString());
        setTimeout(next, 1000);
    }
}
const writableStream = new WritableStream({
    highWaterMark: 1
});
writableStream.on('finish', () => {
    console.log('finish');
});
let canWrite = writableStream.write('1');
console.log('canWrite:1', canWrite);
canWrite = writableStream.write('2');
console.log('canWrite:2', canWrite);
canWrite = writableStream.write('3');
console.log('canWrite:3', canWrite);
writableStream.once('drain', () => {
    console.log('drain');
    let canWrite = writableStream.write('4');
    console.log('canWrite:4', canWrite);
    canWrite = writableStream.write('5');
    console.log('canWrite:5', canWrite);
    canWrite = writableStream.write('6');
    console.log('canWrite:6', canWrite);
});

/**
1
canWrite:1 false
canWrite:2 false
canWrite:3 false
2
3
drain
4
canWrite:4 false
canWrite:5 false
canWrite:6 false
5
6
 */

3.2.2 Writable.js #

Writable.js

const Stream = require('./Stream');
var { inherits } = require('util');
function Writable(options) {
    Stream.call(this, options);
    this._writableState = {
        ended: false,
        writing: false,
        buffer: [],
+       bufferSize: 0
    };
    if (options.write) this._write = options.write;
}
inherits(Writable, Stream);
Writable.prototype.write = function (chunk) {
    if (this._writableState.ended) {
        return;
    }
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, 'utf8');
    this._writableState.bufferSize += chunk.length;
+   let canWrite = this.options.highWaterMark > this._writableState.bufferSize;
+   if (this._writableState.writing) {
+       this._writableState.buffer.push(chunk);
+   } else {
+       this._writableState.writing = true;
+       this._write(chunk, 'utf8', () => this.next());
+   }
+   return canWrite;
}
Writable.prototype.next = function () {
    this._writableState.writing = false;
+   if (this._writableState.buffer.length > 0) {
+       let chunk = this._writableState.buffer.shift();
+       this._write(chunk, 'utf8', () => {
+           this._writableState.bufferSize -= chunk.length;
+           this.next();
+       })
+   } else {
+       this.emit('drain');
+   }
}
Writable.prototype.end = function () {
    this._writableState.ended = true;
}
module.exports = Writable;

4.pipe(管道) #

4.1 3.pipe.js #

3.pipe.js

const readableStream = require('./readableStream');
const writableStream = require('./writableStream');
readableStream.pipe(writableStream);

4.2 Readable.js #

Readable.js

const Stream = require('./Stream');
var { inherits } = require('util');
function Readable(options) {
    Stream.call(this, options);
    this._readableState = { ended: false, buffer: [], flowing: false };
    if (options.read) this._read = options.read;
}
inherits(Readable, Stream);
Readable.prototype.on = function (event, fn) {
    Stream.prototype.on.call(this, event, fn);
    if (event === 'data') {
        this.resume();
    }
}
Readable.prototype.resume = function () {
    this._readableState.flowing = true;
    while (this.read());
}
Readable.prototype.pause = function () {
    this._readableState.flowing = false;
}
Readable.prototype.read = function () {
    if (!this._readableState.ended && this._readableState.flowing) {
        this._read();
    }
    let data = this._readableState.buffer.shift();
    if (data) {
        this.emit('data', data);
    }
    return data;
}
Readable.prototype.push = function (chunk) {
    if (chunk === null) {
        this._readableState.ended = true;
    } else {
        this._readableState.buffer.push(chunk);
    }
}
+Readable.prototype.pipe = function (dest) {
+    this.on('data', (chunk) => {
+        dest.write(chunk);
+    })
+    this.on('end', () => {
+        dest.end();
+    });
+}
module.exports = Readable;

5.Duplex()双工流) #

5.1 4.duplexStream.js #

4.duplexStream.js

const duplexStream = require('./duplexStream');
duplexStream.pipe(duplexStream);

5.2 duplexStream.js #

duplexStream.js

const Duplex = require('./Duplex');
const readableIterator = (function (count) {
    return {
        next() {
            count++;
            if (count <= 5) {
                return { done: false, value: count + '' };
            } else {
                return { done: true, value: null }
            }
        }
    }
})(0)
const duplexStream = new Duplex({
    read() {
        let { done, value } = readableIterator.next();
        if (done) {
            this.push(null);
        } else {
            this.push(value);
        }
    },
    write(data, encoding, next) {
        console.log(data.toString(encoding));
        setTimeout(next, 1000);
    }
});
module.exports = duplexStream;

5.3 Duplex.js #

Duplex.js

const Readable = require('./Readable');
const Writable = require('./Writable');
var { inherits } = require('util');
inherits(Duplex, Readable);
const keys = Object.keys(Writable.prototype);
for (let v = 0; v < keys.length; v++) {
    const method = keys[v];
    if (!Duplex.prototype[method]) {
        Duplex.prototype[method] = Writable.prototype[method];
    }
}
function Duplex(options) {
    Readable.call(this, options);
    Writable.call(this, options);
}

module.exports = Duplex;

6.Transform(转换流) #

6.1 5.transformStream.js #

5.transformStream.js

const readableStream = require('./readableStream');
const transformStream = require('./transformStream');
const writableStream = require('./writableStream');
readableStream.pipe(transformStream).pipe(writableStream);

6.2 transformStream.js #

transformStream.js

const Transform = require('./Transform');
const transformStream = new Transform({
    transform(buffer, encoding, next) {
        let transformed = buffer.toString(encoding) + '$';
        next(null, transformed);
    }
});
module.exports = transformStream;

6.3 Transform.js #

Transform.js

const Duplex = require('./Duplex');
var { inherits } = require('util');
inherits(Transform, Duplex);
function Transform(options) {
    Duplex.call(this, options);
    if (options.transform) this._transform = options.transform;
}
Transform.prototype._write = function (chunk, encoding, next) {
    this._transform(chunk, encoding, (err, data) => {
        if (data) {
            this.push(data);
        }
        next(err);
    });
}
Transform.prototype._read = function () {

}
module.exports = Transform;

7.objectMode(对象模式) #

  • 默认情况下,流处理的数据是Buffer/String类型的值
  • 有一个objectMode标志,我们可以设置它让流可以接受任何JavaScript对象

7.1 6.objectMode.js #

6.objectMode.js

const { Readable, Writable } = require('stream');
const readableIterator = (function (count) {
    return {
        next() {
            count++;
            if (count <= 5) {
                return { done: false, value: { id: count + '' } };
            } else {
                return { done: true, value: null }
            }
        }
    }
})(0)
const readableStream = new Readable({
    objectMode: true,
    read() {
        let { done, value } = readableIterator.next();
        if (done) {
            this.push(null);
        } else {
            this.push(value);
        }
    }
});
const writableStream = new Writable({
    objectMode: true,
    write(data, encoding, next) {
        console.log(data);
        setTimeout(next, 1000);
    }
});
readableStream.pipe(writableStream);

8.through2 #

  • through2是一个简单的流处理模块,它提供了一个简单的接口,可以让我们更加方便地处理流

8.1 7.through2.js #

const fs = require('fs');
const through2 = require('./through2');
const readableStream = require('./readableStream');
const writableStream = require('./writableStream');
const transformStream = through2(function (chunk, encoding, next) {
    let transformed = chunk.toString(encoding) + '$';
    next(null, transformed);
});
readableStream.pipe(transformStream).pipe(writableStream);

8.2 through2.js #

through2.js

const fs = require('fs');
const through2 = require('./through2');
const readableStream = require('./readableStream');
const writableStream = require('./writableStream');
const transformStream = through2(function (chunk, encoding, next) {
    let transformed = chunk.toString(encoding) + '$';
    next(null, transformed);
});
readableStream.pipe(transformStream).pipe(writableStream);

8.3 through2.obj #

8.through2.js

const fs = require('fs');
const through2 = require('through2');
const fileStream = fs.createReadStream('data.txt', { highWaterMark: 10 });
const all = [];
fileStream.pipe(
    through2.obj(function (chunk, encoding, next) {
        this.push(JSON.parse(chunk))
        next();
    })).on('data', (data) => {
        all.push(data)
    }).on('end', () => {
        console.log(all);
    })

8.4 through2.js #

through2.js

const Transform = require('./Transform');
const { Transform } = require('stream');
function through2(transform) {
    return new Transform({
        transform
    });
}
through2.obj = function (transform) {
    return new Transform({
        objectMode: true,
        transform
    });
}
module.exports = through2;

8.5 data.txt #

data.txt

{"id":1}
{"id":2}
{"id":3}

访问验证

请输入访问令牌

Token不正确,请重新输入