RxJS 真·简介

本文来自RxJS文档的Overview

RxJS是Reactive系列的JS版本。它有着下面一些概念。对它们有所理解将让你能得心应手地使用RxJS。

Observable

  • Observable类似函数定义(回调),Observer类似调用函数
  • Observable可以同步或是异步返回值
  • Observable在生命周期里可以返回多个值

Observable有创建(create)、订阅(subscribe)、执行(execute)、析构(dispose)四步。在订阅后,通过create方法创建的Observable体会立即得到执行(不论是同步或是异步内容),同时,传入create的回调函数中可以向Observer调用nexterrorcomplete方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100);
observer.next(200);
setTimeout(() => {
observer.next(300); // happens asynchronously
}, 1000);
});
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');

Observer

Observers只是有next,error,complete三个回调的对象,这三个回调分别用来处理Observable传递的三种不同的状态

1
2
3
4
5
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};

Subscription

Subscription表述了Observable的执行过程。但是主要提供unsubscribe()方法取消Observable执行

主要由Observablesubscribe方法创建,当然还有add()remove()方法组合subscription。

1
2
3
4
5
6
7
8
9
10
11
12
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);
var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
setTimeout(() => {
// Unsubscribes BOTH subscription and childSubscription
subscription.unsubscribe();
}, 1000);

Subject

Subject是一个广播的Observable(类似EventEmitter),它既是Observable又是Observer,既有next方法,又有处理next的回调。

Observable本质的不同是,

  • Subject注册多个回调,Observable指定一个回调
  • 回调触发时机上,Subject通过特定时机触发(即Subject.next),Observable在回调定义后立即触发(即subscribe后)
1
2
3
4
5
6
7
8
9
10
11
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);

Subject又可细分成BehaviorSubject, ReplaySubject, AsyncSubject

  • 使用refCount()替代connect()完成multicasted Observable的自动绑定

BehaviorSubject

存储了释放给消费者的最后一个值。在新消费者订阅时会自动下发。

1
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

ReplaySubject

存储之前释放给消费者的一组值。在新消费者订阅时会自动下发。

1
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

第二个参数描述数据的过期时间

AsyncSubject

只存储最后一次释放的值,并complete状态后下发给消费者

Operator

RxJS的核心概念,读入一个Observable返回一个全新的Observable,是纯函数

Operator分为两类:

  • instance operator 用来对已有Operator链式调用进行改造,是纯函数,如.map()
  • static operator 用来从JS原始值中构造Observable,如.of().from()

RxJS提供的Operator非常多,以至于文档写了个小程序帮助你选择你想要的Operator。借助宝石图(marble diagram),可以更好理解各operator。

Scheduler

Scheduler允许你定义Observable发布消息给Observer的执行环境,具体来说如存储tasks,执行任务的时机和顺序,同步/异步等。

选择上,有Rx.Scheduler.queue(当前事件帧), Rx.Scheduler.asap(microtasks queue), Rx.Scheduler.async(setInterval)。static operator通常使用Scheduler,通过observeOnsubscribeOn两种方法指定。instance operator可以使用一个Scheduler。

现在可以跟着Tutorial使用起RxJS了。Enjoy~