导航菜单

  • 0.api
  • 0.Async
  • 0.module
  • 1.ES2015
  • 2.Promise
  • 3.Node
  • 4.NodeInstall
  • 5.REPL
  • 6.NodeCore
  • 7.module&NPM
  • 8.Encoding
  • 9.Buffer
  • 10.fs
  • 11.Stream-1
  • 11.Stream-2
  • 11.Stream-3
  • 11.Stream-4
  • 12-Network-2
  • 12.NetWork-3
  • 12.Network-1
  • 13.tcp
  • 14.http-1
  • 14.http-2
  • 15.compress
  • 16.crypto
  • 17.process
  • 18.yargs
  • 19.cache
  • 20.action
  • 21.https
  • 22.cookie
  • 23.session
  • 24.express-1
  • 24.express-2
  • 24.express-3
  • 24.express-4
  • 25.koa-1
  • 26.webpack-1-basic
  • 26.webpack-2-optimize
  • 26.webpack-3-file
  • 26.webpack-4.tapable
  • 26.webpack-5-AST
  • 26.webpack-6-sources
  • 26.webpack-7-loader
  • 26.webpack-8-plugin
  • 26.webpack-9-hand
  • 26.webpack-10-prepare
  • 28.redux
  • 28.redux-jwt-back
  • 28.redux-jwt-front
  • 29.mongodb-1
  • 29.mongodb-2
  • 29.mongodb-3
  • 29.mongodb-4
  • 29.mongodb-5
  • 29.mongodb-6
  • 30.cms-1-mysql
  • 30.cms-2-mysql
  • 30.cms-3-mysql
  • 30.cms-4-nunjucks
  • 30.cms-5-mock
  • 30.cms-6-egg
  • 30.cms-7-api
  • 30.cms-8-roadhog
  • 30.cms-9-yaml
  • 30.cms-10-umi
  • 30.cms-12-dva
  • 30.cms-13-dva-ant
  • 30.cms-14-front
  • 30.cms-15-deploy
  • 31.dva
  • 31.cms-13-dva-antdesign
  • 33.redis
  • 34.unittest
  • 35.jwt
  • 36.websocket-1
  • 36.websocket-2
  • 38.chat-api-1
  • 38.chat-api-2
  • 38.chat-3
  • 38.chat-api-3
  • 38.chat
  • 38.chat2
  • 38.chat2
  • 39.crawl-0
  • 39.crawl-1
  • 39.crawl-2
  • 40.deploy
  • 41.safe
  • 42.test
  • 43.nginx
  • 44.enzyme
  • 45.docker
  • 46.elastic
  • 47.oauth
  • 48.wxpay
  • index
  • 52.UML
  • 53.design
  • index
  • 54.linux
  • 57.ts
  • 56.react-ssr
  • 58.ts_react
  • 59.ketang
  • 59.ketang2
  • 61.1.devops-linux
  • 61.2.devops-vi
  • 61.3.devops-user
  • 61.4.devops-auth
  • 61.5.devops-shell
  • 61.6.devops-install
  • 61.7.devops-system
  • 61.8.devops-service
  • 61.9.devops-network
  • 61.10.devops-nginx
  • 61.11.devops-docker
  • 61.12.devops-jekins
  • 61.13.devops-groovy
  • 61.14.devops-php
  • 61.15.devops-java
  • 61.16.devops-node
  • 61.17.devops-k8s
  • 62.1.react-basic
  • 62.2.react-state
  • 62.3.react-high
  • 62.4.react-optimize
  • 62.5.react-hooks
  • 62.6.react-immutable
  • 62.7.react-mobx
  • 62.8.react-source
  • 63.1.redux
  • 63.2.redux-middleware
  • 63.3.redux-hooks
  • 63.4.redux-saga
  • 63.5.redux-saga-hand
  • 64.1.router
  • 64.2.router-connected
  • 65.1.typescript
  • 65.2.typescript
  • 65.3.typescript
  • 65.4.antd
  • 65.4.definition
  • 66-1.vue-base
  • 66-2.vue-component
  • 66-3.vue-cli3.0
  • 66-4.$message组件
  • 66-5.Form组件
  • 66-6.tree
  • 66-7.vue-router-apply
  • 66-8.axios-apply
  • 66-9.vuex-apply
  • 66-10.jwt-vue
  • 66-11.vue-ssr
  • 66-12.nuxt-apply
  • 66-13.pwa
  • 66-14.vue单元测试
  • 66-15.权限校验
  • 67-1-network
  • 68-2-wireshark
  • 7.npm2
  • 69-hooks
  • 70-deploy
  • 71-hmr
  • 72.deploy
  • 73.import
  • 74.mobile
  • 75.webpack-1.文件分析
  • 75.webpack-2.loader
  • 75.webpack-3.源码流程
  • 75.webpack-4.tapable
  • 75.webpack-5.prepare
  • 75.webpack-6.resolve
  • 75.webpack-7.loader
  • 75.webpack-8.module
  • 75.webpack-9.chunk
  • 75.webpack-10.asset
  • 75.webpack-11.实现
  • 76.react_optimize
  • 77.ts_ketang_back
  • 77.ts_ketang_front
  • 78.vue-domdiff
  • 79.grammar
  • 80.tree
  • 81.axios
  • 82.1.react
  • 82.2.react-high
  • 82.3.react-router
  • 82.4.redux
  • 82.5.redux_middleware
  • 82.6.connected
  • 82.7.saga
  • 82.8.dva
  • 82.8.dva-source
  • 82.9.roadhog
  • 82.10.umi
  • 82.11.antdesign
  • 82.12.ketang-front
  • 82.12.ketang-back
  • 83.upload
  • 84.graphql
  • 85.antpro
  • 86.1.uml
  • 86.2.design
  • 87.postcss
  • 88.react16-1
  • 89.nextjs
  • 90.react-test
  • 91.react-ts
  • 92.rbac
  • 93.tsnode
  • 94.1.JavaScript
  • 94.2.JavaScript
  • 94.3.MODULE
  • 94.4.EventLoop
  • 94.5.文件上传
  • 94.6.https
  • 94.7. nginx
  • 95.1. react
  • 95.2.react
  • 96.1.react16
  • 96.2.fiber
  • 96.3.fiber
  • 97.serverless
  • 98.websocket
  • 100.1.react-basic
  • 101.1.monitor
  • 101.2.monitor
  • 102.java
  • 103.1.webpack-usage
  • 103.2.webpack-bundle
  • 103.3.webpack-ast
  • 103.4.webpack-flow
  • 103.5.webpack-loader
  • 103.6.webpack-tapable
  • 103.7.webpack-plugin
  • 103.8.webpack-optimize1
  • 103.9.webpack-optimize2
  • 103.10.webpack-hand
  • 103.11.webpack-hmr
  • 103.11.webpack5
  • 103.13.splitChunks
  • 103.14.webpack-sourcemap
  • 103.15.webpack-compiler1
  • 103.15.webpack-compiler2
  • 103.16.rollup.1
  • 103.16.rollup.2
  • 103.16.rollup.3
  • 103.16.vite.basic
  • 103.16.vite.source
  • 103.16.vite.plugin
  • 103.16.vite.1
  • 103.16.vite.2
  • 103.17.polyfill
  • 104.1.binary
  • 104.2.binary
  • 105.skeleton
  • 106.1.react
  • 106.2.react_hooks
  • 106.3.react_router
  • 106.4.redux
  • 106.5.redux_middleware
  • 106.6.connected-react-router
  • 106.6.redux-first-history
  • 106.7.redux-saga
  • 106.8.dva
  • 106.9.umi
  • 106.10.ketang
  • 106.11.antdesign
  • 106.12.antpro
  • 106.13.router-6
  • 106.14.ssr
  • 106.15.nextjs
  • 106.16.1.cms
  • 106.16.2.cms
  • 106.16.3.cms
  • 106.16.4.cms
  • 106.16.mobx
  • 106.17.fomily
  • 107.fiber
  • 108.http
  • 109.1.webpack_usage
  • 109.2.webpack_source
  • 109.3.dll
  • 110.nest.js
  • 111.xstate
  • 112.Form
  • 113.redux-saga
  • 114.react+typescript
  • 115.immer
  • 116.pro5
  • 117.css-loader
  • 118.1.umi-core
  • 119.2.module-federation
  • 119.1.module-federation
  • 120.create-react-app
  • 121.react-scripts
  • 122.react-optimize
  • 123.jsx-runtime
  • 124.next.js
  • 125.1.linux
  • 125.2.linux-vi
  • 125.3.linux-user
  • 125.4.linux-auth
  • 125.5.linux-shell
  • 125.6.linux-install
  • 125.7.linux-system
  • 125.8.linux-service
  • 125.9.linux-network
  • 125.10.nginx
  • 125.11.docker
  • 125.12.ci
  • 125.13.k8s
  • 125.14.k8s
  • 125.15.k8s
  • 125.16.k8s
  • 126.11.react-1
  • 126.12.react-2
  • 126.12.react-3
  • 126.12.react-4
  • 126.12.react-5
  • 126.12.react-6
  • 126.12.react-7
  • 126.12.react-8
  • 127.frontend
  • 128.rollup
  • 129.px2rem-loader
  • 130.health
  • 131.hooks
  • 132.keepalive
  • 133.vue-cli
  • 134.react18
  • 134.2.react18
  • 134.3.react18
  • 135.function
  • 136.toolkit
  • 137.lerna
  • 138.create-vite
  • 139.cli
  • 140.antd
  • 141.react-dnd
  • 142.1.link
  • 143.1.gulp
  • 143.2.stream
  • 143.3.gulp
  • 144.1.closure
  • 144.2.v8
  • 144.3.gc
  • 145.react-router-v6
  • 146.browser
  • 147.lighthouse
  • 148.1.basic
  • 148.2.basic
  • 148.3.basic
  • 148.4.basic
  • 148.5.basic
  • 149.1.vite
  • 149.2.vite
  • 149.3.vite
  • 149.4.vite
  • 150.react-window
  • 151.react-query
  • 152.useRequest
  • 153.transition
  • 154.emotion
  • 155.1.formily
  • 155.2.formily
  • 155.3.formily
  • 155.3.1.mobx.usage
  • 155.3.2.mobx.source
  • 156.vue-loader
  • 103.11.mf
  • 157.1.react18
  • 158.umi4
  • 159.rxjs
  • 159.rxjs2
  • 160.bff
  • 161.zustand
  • 162.vscode
  • 163.emp
  • 164.cors
  • 1.RxJS
    • 1.1 RxJS 介绍
    • 1.2 基本概念
    • 1.3 参考链接
  • 2.Observable
    • 2.1 Pull 和 Push
    • 2.2 Stream(流)
    • 2.3 Observable
    • 2.4 src\index.js
    • 2.5 rxjs\index.js
    • 2.6 Observable.js
    • 2.7 Subscriber.js
    • 2.8 isFunction.js
  • 3. of
    • 3.1 src\index.js
    • 3.2 rxjs\index.js
    • 2.3 of.js
    • 3.4 from.js
    • 3.5 innerFrom.js
    • 3.6 isArrayLike.js
    • 3.7 isPromise.js
  • 4. fromEvent
    • 4.1 src\index.js
    • 4.2 rxjs\index.js
    • 4.3 fromEvent.js
    • 4.4 Subscriber.js
    • 4.5 Subscription.js
    • 4.6 Observable.js
  • 5. map&filter
    • 5.1 Operators
    • 5.2 src\index.js
    • 5.3 rxjs\index.js
    • 5.4 Observable.js
    • 5.5 map.js
    • 5.6 filter.js
  • 6. pipe
    • 6.1 src\index.js
    • 6.2 Observable.js
    • 6.3 pipe.js
    • 6.4 identity.js
  • 7.asyncScheduler
    • 7.1 src\index.js
    • 7.2 rxjs\index.js
    • 7.3 Scheduler.js
    • 7.4 AsyncAction.js
    • 7.5 async.js
  • 8.timer
    • 8.1 src\index.js
    • 8.2 rxjs\index.js
    • 8.3 timer.js
  • 9.interval
    • 9.1 src\index.js
    • 9.2 rxjs\index.js
    • 9.3 interval.js
    • 9.4 timer.js
  • 10.take
    • 10.1 src\index.js
    • 10.2 rxjs\index.js
    • 10.3 take.js
  • 11.Subject
    • 11.1 Cold Observable 和 Hot Observable
    • 11.1.1 Cold Observable
    • 11.1.2 Hot Observable
    • 11.1 src\index.js
    • 11.2 rxjs\index.js
    • 11.3 Subject.js

1.RxJS #

1.1 RxJS 介绍 #

  • RxJS 是一个使用可观察序列组合异步和基于事件的程序的库
  • 它提供了一种核心类型,即 Observable,以及卫星类型(Observer,Schedulers,Subjects)和操作符,这些操作符受到 Array 方法(map,filter,reduce,every 等)的启发,可以将异步事件处理为集合
  • ReactiveX 结合了观察者模式和迭代器模式,并将函数式编程与集合相结合,以满足对理想管理事件序列的需求

1.2 基本概念 #

  • Observable 表示可调用的未来值或事件的集合的想法
  • Observer 是一组回调,知道如何监听 Observable 传递的值
  • Subscription 表示 Observable 的执行,主要用于取消执行
  • Operators 是纯函数,使用操作(如 map、filter、concat、reduce 等)处理集合时具有函数式编程风格
  • Subject 等同于 EventEmitter,是将值或事件多播到多个 Observer 的唯一方法
  • Schedulers 是集中式调度程序,用于控制并发,允许我们协调计算发生在例如 setTimeout、requestAnimationFrame 或其他位置的时间

1.3 参考链接 #

  • 官方文档
  • 入门指南
  • 例子
  • 常见问题
  • 中文文档
  • github 源码
  • rxjs 弹珠图
  • rxjs 可视化
  • explorer

2.Observable #

  • Observables(可观察对象)是懒惰的多个值的 Push 集合,可观察对象是一种异步数据流,它可以在将来推送多个值。它们是惰性的,因为它们不会立即开始发送值,直到有人订阅它们

2.1 Pull 和 Push #

  • Pull 和 Push 是两种不同的协议,用于描述数据生产者如何与数据消费者进行通信
  • 在 Pull 系统中,消费者决定何时从数据生产者接收数据。生产者本身并不知道何时将数据传递给消费者
  • 每个JavaScript函数都是Pull系统。函数是数据的生产者,而调用函数的代码通过拉出单个返回值从其调用中消费它
function* generator() {
  yield 1
  yield 2
  yield 3
}

const iterator = generator()
console.log(iterator.next().value) // 1
console.log(iterator.next().value) // 2
console.log(iterator.next().value) // 3
  • 在Push系统中,生产者决定何时将数据发送给消费者。消费者不知道何时会收到该数据
  • Promise是 JavaScript 中最常见的推送系统。Promise向已注册的回调函数(Consumers)提供已解析的值
  • RxJS引入了可观察对象,这是一种新的 JavaScript 推送系统。可观察对象是多个值的生产者,将它们推送到观察者(Consumers)
button.addEventListener("click", function () {
  console.log("Button was clicked!")
})
生产者 消费者
Pull 被动:在请求时生成数据 主动:决定何时请求数据
Push 主动:以自己的速度生成数据 被动:对收到的数据做出反应

2.2 Stream(流) #

  • 流是随着时间变化的值序列

2.3 Observable #

  • Observer(观察者) 是由可观察对象传递的值的消费者。观察者仅仅是一组回调,每种类型的通知由可观察对象传递:next,error 和 complete
  • 要使用Observer(观察者),请将其提供给可观察对象的subscribe
  • 观察者只是带有三个回调的对象,每种类型的通知都有一个回调,可观察对象可能传递这些通知
  • RxJS中的观察者也可能是部分可选的的。如果不提供其中一个回调,可观察对象的执行仍然会正常进行,但是某些类型的通知将被忽略,因为观察者中没有相应的回调
  • 在订阅可观察对象时,您也可以将next回调作为参数提供,而不必附加到观察者对象上,在 observable.subscribe 内部,它将使用回调参数作为next处理程序创建观察者对象
  • 调用或订阅是一个隔离的操作:两次函数调用会触发两个单独的副作用,两次可观察对象订阅会触发两个单独的副作用。与事件发射器(EventEmitters)不同,事件发射器共享副作用并且无论是否存在订阅者都有急切执行,而可观察对象没有共享的执行并且是懒惰的
  • Observables 可以使用 new Observable 或创建操作符创建,使用观察者订阅,执行以向观察者发送 next / error / complete 通知,并且可以对其执行进行处理
  • Observable 的核心关注点
    • 创建 Observables
    • 订阅 Observables
    • 执行 Observables
    • 处理 Observables

2.4 src\index.js #

src\index.js

import { Observable } from "./rxjs"
const observable = new Observable((subscriber) => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)
  subscriber.complete()
})
observable.subscribe({
  next: (value) => console.log("next value:", value),
  complete: () => {
    console.log("complete")
  },
})
observable.subscribe((value) => console.log("next value:", value))

2.5 rxjs\index.js #

src\rxjs\index.js

export { Observable } from "./internal/Observable"

2.6 Observable.js #

src\rxjs\internal\Observable.js

import { Subscriber } from "./Subscriber"

export class Observable {
  constructor(subscribe) {
    if (subscribe) {
      this._subscribe = subscribe
    }
  }
  subscribe(observerOrNext) {
    const subscriber = new Subscriber(observerOrNext)
    this._subscribe(subscriber)
    return subscriber
  }
}

2.7 Subscriber.js #

src\rxjs\internal\Subscriber.js

import { isFunction } from "./util/isFunction"
export class Subscriber {
  isStopped = false
  constructor(observerOrNext) {
    let observer
    if (isFunction(observerOrNext)) {
      observer = {
        next: observerOrNext,
      }
    } else {
      observer = observerOrNext
    }
    this.destination = observer
  }
  next(value) {
    if (!this.isStopped) {
      this.destination.next(value)
    }
  }
  complete() {
    if (!this.isStopped) {
      this.isStopped = true
      this.destination.complete?.()
    }
  }
}

2.8 isFunction.js #

src\rxjs\internal\util\isFunction.js

export function isFunction(value) {
  return typeof value === "function"
}

3. of #

  • RxJS 的 of 操作符允许你创建一个 Observable,它发出一组项目,然后完成
  • 你可以使用它来将任何值发送到一个 Observable 中,例如,你可以使用它来将一个数字数组转换为 Observable
  • of 操作符是同步的,意味着它会立即发出所有的值,并立即完成,如果你需要异步发出值,你可以使用 from 操作符
  • RxJS 的 from 操作符允许你将多种不同的数据类型转换为 Observable,包括数组、类数组对象(如 arguments 对象)、迭代器和可观察对象
  • from 操作符是异步的,意味着它会在内部使用内置的调度

3.1 src\index.js #

src\index.js

import { of, from } from "./rxjs"
const arrayLike = of(1, 2, 3)
arrayLike.subscribe({
  next: (value) => console.log(`arrayLike:`, value),
  complete: () => console.log("arrayLike done"),
})

const promiseLike = from(Promise.resolve(4))

promiseLike.subscribe({
  next: (value) => console.log(`promiseLike:`, value),
  complete: () => console.log("promiseLike done"),
})

3.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
+export { of } from './internal/observable/of';
+export { from } from './internal/observable/from';

2.3 of.js #

src\rxjs\internal\observable\of.js

import { from } from "./from"
export function of(...args) {
  return from(args)
}

3.4 from.js #

src\rxjs\internal\observable\from.js

import { innerFrom } from "./innerFrom"
export function from(input) {
  return innerFrom(input)
}

3.5 innerFrom.js #

src\rxjs\internal\observable\innerFrom.js

import { isArrayLike } from "../util/isArrayLike"
import { isPromise } from "../util/isPromise"
import { Observable } from "../Observable"
export function innerFrom(input) {
  if (input instanceof Observable) {
    return input
  }
  if (input != null) {
    if (isArrayLike(input)) {
      return fromArrayLike(input)
    }
    if (isPromise(input)) {
      return fromPromise(input)
    }
  }
}
export function fromArrayLike(array) {
  return new Observable((subscriber) => {
    for (let i = 0; i < array.length; i++) {
      subscriber.next(array[i])
    }
    subscriber.complete()
  })
}
export function fromPromise(promise) {
  return new Observable((subscriber) => {
    promise.then((value) => {
      subscriber.next(value)
      subscriber.complete()
    })
  })
}

3.6 isArrayLike.js #

src\rxjs\internal\util\isArrayLike.js

export const isArrayLike = (x) =>
  x && typeof x.length === "number" && typeof x !== "function"

3.7 isPromise.js #

src\rxjs\internal\util\isPromise.js

import { isFunction } from "./isFunction"
export function isPromise(value) {
  return isFunction(value?.then)
}

4. fromEvent #

  • RxJS 的 fromEvent 函数允许你将浏览器事件转换为 Observable。它接受两个参数:
    • 第一个参数是事件目标,例如 DOM 元素或window对象
    • 第二个参数是事件名称,例如click或scroll

4.1 src\index.js #

src\index.js

import { fromEvent } from "./rxjs"
const source = fromEvent(document, "click")
const subscriber = source.subscribe(console.log)
setTimeout(() => {
  subscriber.unsubscribe()
}, 1000)

4.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
+export { fromEvent } from './internal/observable/fromEvent';

4.3 fromEvent.js #

src\rxjs\internal\observable\fromEvent.js

import { Observable } from "../Observable"
export function fromEvent(target, eventName) {
  return new Observable((subscriber) => {
    const handler = (...args) => subscriber.next(...args)
    target.addEventListener(eventName, handler)
    return () => target.removeEventListener(eventName, handler)
  })
}

4.4 Subscriber.js #

src\rxjs\internal\Subscriber.js

import { isFunction } from './util/isFunction';
+import { Subscription } from './Subscription';
+export class Subscriber extends Subscription {
    isStopped = false;
    constructor(observerOrNext) {
        super();
        let observer;
        if (isFunction(observerOrNext)) {
            observer = {
                next: observerOrNext
            };
        } else {
            observer = observerOrNext;
        }
        this.destination = observer;
    }
    next(value) {
        if (!this.isStopped) {
            this.destination.next(value);
        }
    }
    complete() {
        if (!this.isStopped) {
            this.isStopped = true;
            this.destination.complete?.();
        }
    }
}

4.5 Subscription.js #

src\rxjs\internal\Subscription.js

export class Subscription {
  _finalizers = []
  unsubscribe() {
    const { _finalizers } = this
    if (_finalizers) {
      for (const finalizer of _finalizers) {
        finalizer()
      }
    }
  }
  add(teardown) {
    this._finalizers.push(teardown)
  }
}

4.6 Observable.js #

src\rxjs\internal\Observable.js

import { Subscriber } from './Subscriber';
export class Observable {
    constructor(subscribe) {
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }
    subscribe(observerOrNext) {
        const subscriber = new Subscriber(observerOrNext);
+       const teardown = this._subscribe(subscriber)
+       subscriber.add(teardown)
        return subscriber;
    }
}

5. map&filter #

  • map操作符允许你对Observable中的每个值进行转换,并返回一个新的 Observable。它接受一个函数作为参数,该函数定义如何转换每个值。
  • filter操作符允许你选择性地过滤 Observable 中的值。它接受一个函数作为参数,该函数定义如何过滤值

5.1 Operators #

  • 在 Rx 中Observable,控制流的状态,是它的基石,但最有用的是它的operator,operator允许复杂的异步代码以声明的方式进行轻松组合的基础单元。 operator主要作用是操作、组合流中的数据
  • 操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作:前面的 Observable 保持不变
  • 操作符本质上是一个纯函数 (pure function),它接收一个 Observable 作为输入,并生成一个新的 Observable 作为输出。订阅输出 Observable 同样会订阅输入 Observable
  • 操作符类型
    • Creation Operators 创建操作符,它们用于创建新的 Observable。这些操作符可以从各种不同的数据源(如数组、对象、Promise 等)创建 Observable,并可以控制 Observable 的行为(如发出值的频率、顺序等),像of、from、timer、interval和fromEvent等
    • Transformation Operators 转换操作符是 RxJS 中的一类特殊的操作符,它们用于将输入 Observable 转换为新的输出 Observable。这些操作符可以对输入 Observable 中的值进行转换、过滤、合并等操作,以便在输出 Observable 中呈现出所需的信息。像map等
    • Combination Operators 组合操作符用于将多个 Observable 合并成一个新的 Observable。这些操作符可以帮助你创建复杂的数据流,并控制它们之间的关系,像merge和concat等
    • Filtering Operators 过滤操作符用于过滤输入 Observable 中的值,只返回符合特定条件的值。这些操作符可以帮助你创建精确的数据流,并且非常实用。像filter等
    • Multicasting Operators 多播操作符用于将单个Observable共享给多个观察者(Observer)。这些操作符可以帮助你控制 Observable 的行为,并有效地利用资源。像share等
  • rxmarbles

5.2 src\index.js #

src\index.js

import { of, map, filter } from "./rxjs"
const subscriber = of(1, 2, 3)
  .pipe(map((val) => val * 2)) // [2,4,6]
  .pipe(filter((val) => val > 3)) //[4,6]
  .pipe(map((data) => data + 1)) //[5,7]
subscriber.subscribe(console.log)

5.3 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
+export { filter } from './internal/operators/filter';
+export { map } from './internal/operators/map';

5.4 Observable.js #

src\rxjs\internal\Observable.js

import { Subscriber } from './Subscriber';
export class Observable {
    constructor(subscribe) {
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }
    subscribe(observerOrNext) {
        const subscriber = new Subscriber(observerOrNext);
        const teardown = this._subscribe(subscriber)
        subscriber.add(teardown)
        return subscriber;
    }
+   pipe(operation) {
+       return operation(this);
+   }
}

5.5 map.js #

src\rxjs\internal\operators\map.js

import { Observable } from "../Observable"
export function map(project) {
  return (source) => {
    const observable = new Observable(function (subscriber) {
      return source.subscribe({
        // 从 subscriber 订阅对象 中获取 观察者 对象
        ...subscriber.destination,
        next: (value) => {
          subscriber.next(project(value))
        },
      })
    })
    return observable
  }
}

5.6 filter.js #

src\rxjs\internal\operators\filter.js

import { Observable } from "../Observable"
export function filter(predicate) {
  return (source) => {
    const observable = new Observable(function (subscriber) {
      return source.subscribe({
        // 从 subscriber 订阅对象 中获取 观察者 对象
        ...subscriber.destination,
        next: (value) => {
          predicate(value) && subscriber.next(value)
        },
      })
    })
    return observable
  }
}

6. pipe #

  • Observable 对象有一个名为 pipe 的方法,允许你将多个操作符链接在一起。它可以让你在单个表达式中执行复杂的数据处理流程。

6.1 src\index.js #

src\index.js

import { of, map, filter } from "./rxjs"
const subscriber = of(1, 2, 3).pipe(
  map((val) => val * 2),
  filter((val) => val > 3),
  map((data) => data + 1)
)
subscriber.subscribe(console.log)

6.2 Observable.js #

src\rxjs\internal\Observable.js

import { Subscriber } from './Subscriber';
+import { pipeFromArray } from './util/pipe';
export class Observable {
    constructor(subscribe) {
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }
    subscribe(observerOrNext) {
        const subscriber = new Subscriber(observerOrNext);
        const teardown = this._subscribe(subscriber)
        subscriber.add(teardown)
        return subscriber;
    }
+   pipe(...operations) {
+       return pipeFromArray(operations)(this);
+   }
}

6.3 pipe.js #

src\rxjs\internal\util\pipe.js

import { identity } from "./identity"
export function pipeFromArray(fns) {
  if (fns.length === 0) {
    return identity
  }
  if (fns.length === 1) {
    return fns[0]
  }
  return function piped(input) {
    return fns.reduce((prev, fn) => fn(prev), input)
  }
}

6.4 identity.js #

src\rxjs\internal\util\identity.js

export function identity(x) {
  return x
}

7.asyncScheduler #

  • 使用 setTimeout(task,duration)调度任务
  • async 调度器通过将任务放在 JavaScript 事件循环队列中异步地调度任务。它最适用于延迟任务的执行或定期执行任务

7.1 src\index.js #

src\index.js

import { asyncScheduler } from "./rxjs"
function task(state) {
  console.log("state: ", state)
  if (state < 5) {
    this.schedule(state + 1, 1000)
  }
}
asyncScheduler.schedule(task, 1000, 0)

7.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
+export { asyncScheduler } from './internal/scheduler/async';

7.3 Scheduler.js #

src\rxjs\internal\Scheduler.js

export class Scheduler {
  constructor(schedulerActionCtor) {
    this.schedulerActionCtor = schedulerActionCtor
  }
  schedule(work, delay = 0, state) {
    return new this.schedulerActionCtor(work).schedule(state, delay)
  }
}

7.4 AsyncAction.js #

src\rxjs\internal\scheduler\AsyncAction.js

export class AsyncAction {
  pending = false
  constructor(work) {
    this.work = work
  }
  schedule(state, delay = 0) {
    this.state = state
    this.delay = delay
    if (this.id != null) {
      this.id = this.recycleAsyncId(this.id)
    }
    this.pending = true
    this.id = this.requestAsyncId(delay)
    return this
  }
  requestAsyncId(delay = 0) {
    return setInterval(this.execute.bind(this), delay)
  }
  execute() {
    this.pending = false
    this.work(this.state)
    if (this.pending === false && this.id !== null) {
      this.id = this.recycleAsyncId(this.id)
    }
  }
  recycleAsyncId(id) {
    if (id !== null) {
      clearInterval(id)
    }
    return null
  }
}

7.5 async.js #

src\rxjs\internal\scheduler\async.js

import { AsyncAction } from "./AsyncAction"
import { Scheduler } from "../Scheduler"
export const asyncScheduler = new Scheduler(AsyncAction)

8.timer #

  • timer 函数是一个工厂函数,可以创建一个发出数字的 Observable,每个数字增加1。它接受两个参数:起始值和间隔时间

8.1 src\index.js #

src\index.js

import { timer } from "./rxjs"
timer(1000).subscribe(() => console.log("timer"))

8.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
+export { timer } from './internal/observable/timer';

8.3 timer.js #

src\rxjs\internal\observable\timer.js

import { Observable } from "../Observable"
import { asyncScheduler } from "../scheduler/async"
export function timer(dueTime = 0, scheduler = asyncScheduler) {
  return new Observable((subscriber) => {
    let n = 0
    return scheduler.schedule(function () {
      subscriber.next(n++)
    }, dueTime)
  })
}

9.interval #

  • interval 函数是一个工厂函数,可以创建一个发出数字的 Observable,每个数字增加 1。它接受一个间隔时间参数,表示每次发送之间的时间间隔
  • interval 函数会一直发送数字,直到你取消订阅。你可以使用 take 操作符限制发送的数字数量

9.1 src\index.js #

src\index.js

import { interval } from "./rxjs"
interval(1000).subscribe((v) => console.log(v))

9.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
export { timer } from './internal/observable/timer';
+export { interval } from './internal/observable/interval';

9.3 interval.js #

src\rxjs\internal\observable\interval.js

import { asyncScheduler } from "../scheduler/async"
import { timer } from "./timer"
export function interval(period = 0, scheduler = asyncScheduler) {
  return timer(period, period, scheduler)
}

9.4 timer.js #

src\rxjs\internal\observable\timer.js

import { Observable } from '../Observable';
import { asyncScheduler } from '../scheduler/async';
+export function timer(dueTime = 0, interval, scheduler = asyncScheduler) {
  return new Observable(subscriber => {
    let n = 0;
    return scheduler.schedule(function () {
      subscriber.next(n++);
+    if (interval >= 0) {
+      this.schedule(undefined, interval);
+    } else {
+      subscriber.complete();
+    }
    }, dueTime);
  });
}

10.take #

  • take 操作符会从 Observable 中取出前 N 个值,然后完成。它是一个过滤操作符,可以用来限制 Observable 发送的值的数量
  • take 操作符会在 Observable 发送完 N 个值之后立即完成,因此你不需要使用 unsubscribe 方法取消订阅

10.1 src\index.js #

src\index.js

import { interval, take } from "./rxjs"
interval(500).pipe(take(3)).subscribe(console.log)
// 0
// 1
// 2

10.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
export { timer } from './internal/observable/timer';
export { interval } from './internal/observable/interval';
+export { take } from './internal/operators/take';

10.3 take.js #

src\rxjs\internal\operators\take.js

import { Observable } from "../Observable"
export function take(count) {
  return (source) => {
    let seen = 0
    const observable = new Observable(function (subscriber) {
      return source.subscribe({
        ...subscriber,
        next: (value) => {
          seen++
          if (seen <= count) {
            subscriber.next(value)
            if (seen >= count) {
              subscriber.complete()
            }
          }
        },
      })
    })
    return observable
  }
}

11.Subject #

  • Subject 是 Observable 的一种特殊类型,它允许将值多播给许多观察者。Subject 就像 EventEmitter
  • 每个 Subject 都是一个 Observable 和一个 Observer。您可以订阅 Subject,并且还可以调用 next 来提供值,以及 error 和 complete
  • 简单来说,Subject 是一种特殊的 Observable,它既可以订阅数据流,也可以向数据流中提交数据。Subject 还具有 Observer 的特性,即可以调用 next、error 和 complete 方法

11.1 Cold Observable 和 Hot Observable #

  • Hot Observable 和 Cold Observable是指两种不同类型的 Observable,它们在执行时的行为有所不同
  • Cold Observable 是一种会在每个观察者订阅时重新开始发出数据的 Observable。每个观察者都有自己的数据流,即使多个观察者订阅同一个 Cold Observable,它们也会收到完全独立的数据流。例如,当你订阅一个 Cold Observable 时,它会从头开始发出数据,不会丢失任何信息
  • Hot Observable是一种在发出数据时无论是否有观察者订阅都会继续发出数据的Observable。每个观察者都会收到相同的数据流,并且会收到所有之前发出的数据。例如,当你订阅一个Hot Observable时,它可能会丢失一些信息,因为它在你订阅之前就已经开始发出数据了
  • 总的来说,Cold Observable 适用于那些需要每个观察者都收到完整数据流的场景,而 Hot Observable 适用于那些数据流是连续不断的,不需要每个观察者都收到完整数据流的场景

11.1.1 Cold Observable #

  • 推送值的生产者producer来自Observable内部。将会推送什么样的值在Observable创建时被定义下来,不会改变
  • producer与observer是一对一的关系,即是 unicast (单播)的
  • 当有observer订阅时,producer会把预先定义好的若干值依次推送给每个observer
  • Cold Observable每次订阅后就只会有一个观察者, 下一个观察者要进行订阅时是一次新的数据流程,因此Cold Observable与observer是一对一关系
  • 数据流的operators基本上都是属于Cold Observable
import { Observable } from "./rxjs"
const source = new Observable((subscriber) => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)
  subscriber.complete()
})

source.subscribe((data) => console.log(`subscriberA: ${data}`))
// 1, 2, 3
source.subscribe((data) => console.log(`subscriberB: ${data}`))
// 1, 2, 3

11.1.2 Hot Observable #

    • 推送值的producer来自observable外部,何时推送以及推送什么样的值在创建时都是未知的。producer与observer是一对多的关系,即multicast (多播)的
  • 每当有observer订阅时,会将observer注册到观察者列表中
  • 当外部的producer被触发或执行时,会将值同时推送给所有的observer

import { Subject } from "./rxjs"
const source = new Subject()
source.subscribe({ next: (data) => console.log(`Subject 第一次订阅: ${data}`) })
source.next(1)
source.next(2)
source.subscribe({ next: (data) => console.log(`Subject 第二次订阅: ${data}`) })
source.next(3)
source.next(4)

11.1 src\index.js #

src\index.js

import { Subject } from "./rxjs"
const subject = new Subject()

subject.subscribe({ next: (data) => console.log("observerA: ", data) })
subject.subscribe({ next: (data) => console.log("observerB: ", data) })

subject.next(1)
subject.next(2)

// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

11.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
export { timer } from './internal/observable/timer';
export { interval } from './internal/observable/interval';
export { take } from './internal/operators/take';
+export { Subject } from './internal/Subject';

11.3 Subject.js #

src\rxjs\internal\Subject.js

import { Subscriber } from "./Subscriber"
export class Subject extends Subscriber {
  observers = []
  subscribe(subscriber) {
    const { observers } = this
    observers.push(subscriber)
  }
  next(value) {
    const copy = this.observers.slice()
    for (const observer of copy) {
      observer.next(value)
    }
  }
  complete() {
    const { observers } = this
    while (observers.length) {
      observers.shift().complete?.()
    }
  }
}

访问验证

请输入访问令牌

Token不正确,请重新输入