RxJS
RxJS世界中有⼀种特殊的对象,称为“流”(stream)
代表“流”的变量标⽰符,都是⽤$符号结尾,这是RxJS编程中普遍使⽤的风格,被称为“芬兰式命名法”(Finnish Notation)。
流对象中“流淌”的是数据,⽽通过subscribe函数可以添加函数对数据进⾏操作,上⾯的代码中,对holdTime$对象有两个subscribe调⽤,⼀个⽤来更新DOM,另⼀个⽤来调⽤API请求。
const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(), (mouseUpEvent, mouseDownEvent) => { return mouseUpEvent.timestamp- mouseDownEvent.timestamp; }); holdTime$.subscribe(ms => { document.querySelector('#hold-time').innerText = ms; }); holdTime$.flatMap(ms => Rx.Observable.ajax('https://timing-sense-score-board.herokuapp.com/score/' + ms)) .map(e => e.response) .subscribe(res => { document.querySelector('#rank').innerText = '你超过了' + res.rank + '% 的用户'; });
但是在RxJS实现中,没有这样纠缠不清的变量,如果你仔细看,会发现所有的变量其实都没有“变”,赋值时是什么值,就会⼀直保持这些值。
在jQuery的实现中,我们的代码看起来就是⼀串指令的组合;在RxJS的代码中,代码是⼀个⼀个函数,每个函数只是对输⼊的参数做了响应,然后返回结果。
解决什么问题:保证代码质量,控制代码复杂度,保证代码的可维护性。
函数式编程
⾮常强调使⽤函数来解决问题的⼀种编程⽅式。
函数式编程对函数的使⽤有⼀些特殊的要求:
- 声明式(Declarative)
- 纯函数(Pure Function)
- 数据不可变性(Immutability)
JavaScript并不是纯粹的函数式编程语⾔,但是,通过应⽤⼀些编程规范,再借助⼀点⼯具的帮助,我们完全可以⽤JavaScript写出函数式的代码。RxJS就是辅助我们写出函数式代码的⼀种⼯具。
响应式编程(Reactive Programming)
Reactive Extension
也叫ReactiveX,或者简称Rx,指的是实践响应式编程的⼀套⼯具
Rx(包括RxJS)诞⽣的主要⽬的虽然是解决异步处理的问题,但并不表⽰Rx不适合同步的数据处理,实际上,使⽤RxJS之后⼤部分代码不需要关⼼⾃⼰是被同步执⾏还是异步执⾏,所以处理起来会更加简单
RxJS擅长处理异步操作,因为它对数据采⽤“推”的处理⽅式,当⼀个 数据产⽣的时候,被推送给对应的处理函数,这个处理函数不⽤关⼼数据 是同步产⽣的还是异步产⽣的,这样就把开发者从命令式异步处理的枷锁 中解放了出来。
RxJS中的数据流可能包含复杂的功能,但是可以分解成很多⼩的部分 来实现,实现某⼀个⼩功能的函数就是操作符。可以说,学习RxJS就是学习如何组合操作符来解决复杂问题。
Observable和Observer
Observable就是“可以被观察的对象”即“可被观察者”,⽽Observer就是“观察者”,连接两者的桥梁就是Observable对象的函数subscribe。RxJS中的数据流就是Observable对象。
安装
npm install rxjs yarn add ts-node yarn add typescript
配置package.json
"scripts": { "start": "ts-node ./index.ts" },
就可以用yarn start来执行代码了
开始操作
import {Observable} from 'rxjs' const onSubscribe = observer => { observer.next(1); observer.next(2); observer.next(3); } const source$ = new Observable(onSubscribe) const theObserver = { next: item => console.log(item) } source$.subscribe(theObserver)
首先就要创建要给Observable对象,需要一个函数作为参数,这个函数参数完全决定了Observable对象的⾏为,也是通过这个函数里面observer.next(1)
将数据推送给observer。而observer则必须含有一个next属性的函数,用来接受推送过来的数据。Observable完全控制着observer,也掌握着啥时候推送数据的主动权,observer只要被动的接受,处理数据就可以了,所以异步的时候,observer也不用关心数据何时产生。
source$叫做数据流对象。通过subscribe函数将theObserver和source$关联起来。观察者订阅了一个数据流?我这样理解。
值得注意的是theObserver并不完全等于onSubscribe中的observer,theObserver会被经过一层包装。
observer除了有next字段外,还有complete用来在obserable数据全部提供完毕的终结信号,还有一个error字段用来在出错的时候调用,值得注意的时候,rxjs中,终结状态只会有一个,要么error,要么complete,因此这两个函数只会有一个被调用。
observer的简单形式:
source$.subscribe( item => console.log(item), err => console.log(err), () => console.log('No More Data') );
subscribe的三个参数依次传next,error,complete三个函数就可以了,如果不关心error,就第二个参数给个null占位即可。
observer可以主动取消订阅数据,做法是返回了一个含有unsubscribe字段的对象:
import {Observable} from 'rxjs' const onSubscribe = observer => { let num = 1 const timer = setInterval(() => { console.log('onSubscribe', num) observer.next(num++); }, 1000) return { unsubscribe: () => { clearInterval(timer) } } } const source$ = new Observable(onSubscribe) const theObserver = { next: item => console.log(item) } const subcription = source$.subscribe(theObserver) setTimeout(() => { subcription.unsubscribe() }, 3000)
值得注意的是,退订操作只是不会再调用next了,数据其实还是在不断产生的,注释掉clearInterval(timer)
就能看到,虽然observer没有再收到新数据了,但是数据还是一直在生产中,定时器一直在运转。
操作符
操作符就是数据到observer之前对数据做处理用的。
import { Observable } from "rxjs"; import { map } from "rxjs/operators"; const onSubscribe = observer => { observer.next(1); observer.next(2); observer.next(3); }; const source$ = Observable.create(onSubscribe); source$.pipe(map((x: number) => x * x)).subscribe(console.log);
用Observable的create来创建Observable对象,然后pipe中用map来处理数据,然后再交给observer。这里的map跟js中的map稍微不同,rxjs的map会产生一个新的Observable对象,这样也不会对上游的Observable产生影响,符合函数式编程的数据不可变要求。通过下面代码可以更清楚。
const source$ = Observable.create(onSubscribe); const mapped$ = source$.map(x => x*x); mapped$.subscribe(console.log);
操作符分实例操作符和静态操作符。
实例操作符:比如map,可认为是这样添加的:
Observable.prototype.map = implementationOfMap;
静态操作符:比如of,可认为是这样添加的:
Observable.of = functionToImplementOf;
区别就是一个是写在prototype上的,一个不是。静态的可以通过类直接调用,而实例的(写在prototype里的)是通过该类的每个实例都能调用的。
书上因为是v5版本,但是现在已经是v6了,所以这一块好像稍微有点区别。但是总的来说,主要不变的是导入方式和使用方式。测试了下v6中这样引用和使用:
import { of } from "rxjs"; import { map } from "rxjs/operators"; const source$ = of(1, 2, 3) source$.pipe(map((x: number) => x * x)).subscribe(console.log);
⽆论是静态操作符还是实例操作符,它们都会返回⼀个Observable对象。在链式调⽤中,静态操作符只能出现在⾸位,实例操作符则可以出现在任何位置
有的操作符两者都是,因此会发现在rxjs和rxjs/operators中都可以引入。比如merge。具体导入哪个就看是用在链式中间还是开头了。
操作符的实现
每个操作符都是一个函数,都必须考虑下面这些:
- 返回⼀个全新的Observable对象
- 对上游和下游的订阅及退订处理
- 处理异常情况
- 及时释放资源
创建类操作符
所谓创建类操作符,就是⼀些能够创造出⼀个Observable对象的⽅法,所谓“创造”,并不只是说返回⼀个Observable对象,因为任何⼀个操作符都会返回Observable对象,这⾥所说的创造,是指这些操作符不依赖于其他Observable对象,这些操作符可以凭空或者根据其他数据源创造出⼀个Observable对象。
只是不依赖于其他Observable对象而已,并不代表不需要输入,很多都是需要接受参数的。往往不会从其他Observable对象获取数据,在数据管道中,创建类操作符就是数据流的源头
of:列举数据。range:指定范围,步长为1的产生数据。generate:循环创建(类似for循环)。
合并操作符
这里记一下高阶Observable,就像高阶函数返回的是函数而不是具体数据一样,高阶Observable操作的就是Observable。
switchAll
总是switch到最新的Observable。
const s$ = interval(1000).pipe( take(3), map(x => interval(700).pipe( map(y => x + ":" + y), take(2) ) ) ); s$.pipe( switchAll() ).subscribe(console.log) // 0:0 // 1:0 // 2:0 // 2:1
分别在1,2,3秒的时候会产生三个Observable,姑且叫s1,s2,s3,在1.7s的时候s1产生第一个数据0:0,而s1的第二个数据要等到1.7+0.7s=2.4s的时候,而在2s的时候s2产生了,因此switch到s2,取消掉s1的订阅,2.7s的时候产生第二个数据1:0,同理,s2的第二个数据出来前就会switch到s3,然后没有新的Observable产生,一直订阅s3到结束。
如果把上面的700改成1500,那么就只有s3会产生数据,其他两个在产生数据前就被switch了。
exhaust
switch是来了新的就换新的,而exhaust是只要当前的数据没有吐完则忽略这期间新产生的Observable,等到吐完了才等待之后新产生的Observable。
如果直接把上面的switchAll换成exhaust,s2这个Observable就生不逢时的被忽略了,不会输出它产生的数据。
过滤数据流
回压控制
zip之类的合并操作符,必须一个个数据匹配来生成新数据,如果两条数据流产生数据的时间相隔很久,一个快一个慢,那么快的那条数据流的数据就会积压过多。这个时候,可以选择的放弃一些数据,这种叫做有损回压控制,Lossy Backpressure Control。
throttleTime:throttleTime(2000),产生一个数据后,放弃掉之后两秒内的数据,等待两秒之后的数据。这玩意如下代码,测试下每次输出的数据都不一样。
interval(1000).pipe( throttleTime(2000) ).subscribe(console.log)
不会如预想的一样输出0,2,4,6……把2000改小一点,比如1999就可以。
debounceTime:debounceTime(2000),第一个数据产生后,先保存着,两秒内如果没有新数据产生则输出这个,否则重新计时。
总结:
throttleTime是控制一段时间内爆发的数据只有一个能被处理,其他都舍弃掉。比如用户进行大量点击的时候,并不需要去响应所有的点击,处理一段时间内的一个即可。
debounceTime则是控制当数据爆发速度下降到某个程度的时候才处理。比如用户一直在滚动网页,只要他滚动没停说明他对当前的内容不感兴趣,那么就没必要加载图片等操作。
上面两个都是time结尾的操作符,表示的是通过时间来控制。对应的还有不带time的操作符,表示的是用数据流来控制数据。
throttle:
const s$ = interval(1000).pipe( map(x => `from s ${x}`) ) const f = v => { console.log('f got a value', v) return timer(1900) } s$.pipe( throttle(f) ).subscribe(console.log)
throttle每往下游传递⼀个数据,都关上了上下游之间闸门,只有当f产⽣数据的时候才打开这个闸门。并且,当阀门关闭的时候,throttle会退订f返回的Observable,然后重新订阅这个Observable。因此,这个Observable只有产生的第一个数据会有用(用来告诉阀门要打开了),因为之后就重启了下订阅。f产生什么数据并不重要,重要的是产生数据的时机。
auditTime:与throttleTime类似,throttleTime放出时间段内的第一个数据,其它的舍弃。而auditTime放出时间段内的最后一个数据。对应的,也有不带time的audit操作符。要注意的是:如果节流的时间过长,而这时上游已经完结了,那么auditTime就不会产生数据了。比如如下代码:
timer(1000).pipe( auditTime(2000) ).subscribe(console.log)
2000时间过长,这之前的数据全部舍弃掉了,等到阀门开启的时候,上游已经没有数据产生了,因此没有任何输出。
sampleTime:采样。按等分时间来采样,返回时间段内的最后一个数据。与上面的auditTime一样都是返回时间段内的最后一个数据,不同的是auditTime时间段的开始是由上游产生数据来触发的,而sampleTime的时间段与上游何时产生数据无关。
转化数据流
map:注意下map是有第二个参数
of(1, 2, 3, 4).pipe( map(function(x) { return `${x}+${this.name}`}, {name: 'kobe'}) ).subscribe(console.log)
第二个参数指代是第一个参数中的this,注意的是,如果要使用这个this,这个函数写成箭头函数是不行的,因为箭头形式的函数定义里,this是绑定于定义环境的,map的第⼆个参数也就不会起到任何作用。
虽然有这个功能,但是本着纯函数不依赖于其他数据的原则,还是不要用这个功能为好。
缓存窗口:无损回压控制
操作符带window的返回的都是Observable类型的,带buffer的则是返回的数组。
buffertTime,windowTime:都是根据时间来界定区间
timer(0, 100).pipe( windowTime(400), ).subscribe(x => x.subscribe(console.log))
输出看不出他是怎么处理的,实际上每400ms会把上游给到的数据放到新建的Observable里给下游。
timer(0, 100).pipe( bufferTime(400), ).subscribe(console.log)
400ms内的数据放数组里然后给下游。
异常处理
try...catch的问题:1. 只能处理同步代码 2. 可以用回调函数的方式来处理异步的问题,但是那又会陷入回调地狱。
promise解决了上述问题,但是promise的问题是请求失败后不能重试,以及不强制要求异常被捕获。
RxJs的解决办法:对错误处理分两类,恢复和重试。
重试的本质就是:取消上游的订阅,然后重新订阅。
多播
subject与普通的Observable不同的在于,他有自己的状态,他知道有哪些人订阅了他,而Observable则对observer一视同仁,来了就给数据。因为有自己的状态,所以complete了或者error了就结束了,不能重新开始了。
subject设计出来用来解决多播问题,因为subject既是Observable又是Observer,加上他又是Hot数据流,所以让一个Subject去订阅一个单播,然后只要去订阅这个Subject那么就能得到单播数据源的Hot数据流了。
multicast
const s$ = interval(1000) const p$ = s$.pipe(multicast(() => new Subject())) p$.subscribe(x => console.log('ob1', x)) setTimeout(() => { p$.subscribe(x => console.log('ob2', x)) }, 4000) p$.connect()
上面代码在ts中会报错:类型'Observable <any>'不存在属性'connect'
这是ts的锅,因为pipe总是返回Observable。办法是强制返回类型:
const p$ = s$.pipe(multicast(() => new Subject())) as ConnectableObservable<number>
使用multicast必须connect,也就是把啥时候放出数据的主动权交给了用户,毕竟这是Hot数据流,错过了数据可就没了,因为放开数据阀门的时机很重要。
如果需求是只要有人订阅了,那么就让subject去拿上游的数据,然后订阅者都没了,因为没人关心上游数据了,自然就可以取消订阅了,那么就可以使用refCount,可以不用connect,因此也可以去掉强制返回类型的代码。
const p$ = s$.pipe( multicast(() => new Subject()), refCount() )
multicast(() => new Subject()) 和 multicast(new Subject()) 的区别
const s$ = interval(1000).pipe(take(3)); const p$ = s$.pipe( multicast(new Subject()), refCount() ) p$.subscribe(x => console.log("ob1", x)); setTimeout(() => { p$.subscribe(x => console.log("ob2", x)); }, 4000);
上面的代码只会输出ob1的消息,因为等到ob2来订阅时,s$的数据已经结束,那么就会调用下游,也就是中间人subject的complete()方法,subject将不能再被使用。
而如果换成multicast(() => new Subject()),
,则在ob1结束后,ob2仍能正常订阅收到0,1,2数据。因为此时的subect对象是一个重新构建的新对象。
但是也因为ob1和ob2没有时间上的重叠,使得最后看起来像是订阅了一个Cold Observable。
publish 和 share
这两个操作符不过是基于multicast而做的而已,publish省去了传递subject对象参数,但是依然需要connect,share则是加了refCount而已。
Schedule
const s$ = range(0, 3) console.log('before subscribe') s$.subscribe(console.log, null, () => console.log('complete')) console.log('after subscribe') // before subscribe // 0 // 1 // 2 // complete // after subscribe
这样的输出没毛病,只是可以看出,这段代码是同步输出的,那么如果range产生的数据量很大的话,那么由于js是单线程,直到数据产生完这段时间其他任务都无法被执行,只能等待着。
range有第三个参数是一个schedule。asap是as soon as possible的缩写,是一个schedule实例。
const s$ = range(0, 3, asap) // before subscribe // after subscribe // 0 // 1 // 2 // complete
asap把产生每⼀个数据的工作都通过MicroTask来实现,避免了同步调用,让别的任务有执行的机会,而不会一直让range独占线程。
asap和async两个Scheduler都是利用事件循环来实现异步的效果,两者的不同,就是asap会尽量使用Micro Task,而async利用的是Macro Task。
所谓事件循环可以看作一个死循环,不断的拿任务队列里的任务给调用栈去执行。
任务队列则包括Micro Task和Macro Task,Macro Task就是普通的任务队列,而Micro Task拥有更高的优先级,里面的任务会被优先处理。