Rxjs

Rxjs使用观察者模式,用于编写异步队列和事件处理

后端不需要处理前端事件,所以Nestjs主要使用Rxjs的异步队列功能

Rxjs的核心组成成分:

  • Observable:可观察的物件
  • Subscription:用于监听Observable
  • Operators:纯函数,可以处理管道的数据,如map、filter、concat、reduce等

Nestjs中内置Rxjs,无需安装,并且Nestjs也有一些基于Rxjs封装的API

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import { of, Observable, interval, take } from "rxjs"
import { map, filter, findIndex, reduce } from "rxjs/operators"

// Observable,被观察的物件
const observable = new Observable((subscribe)=> {
// 发送消息
subscribe.next(114)
subscribe.next(514)
subscribe.next(1919)

setTimeout(() => {
subscribe.next(810)
// 通知结束消息发送
subscribe.complete()
}, 3000)
})

// Subscription,监听Observable
observable.subscribe({
// 接收消息
next: (num)=> {
console.log(num)
}
})

interval也是一种Observable,可以按时间间隔发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 从0开始每隔500ms输出一个下一个整数,直到5为止(5不被输出)
// interval:间隔时间 pipe:管道 take:停止输出的条件 subscribe:输出的观察者
// interval(500).pipe(take(5)).subscribe(e=> {
// console.log(e)
// })

// map和take类似,但是每个整数会被转为这样的对象
// const subs = interval(500).pipe(map(v=>({n:v}))).subscribe(e=> {
// console.log(e)
// if(e.n === 10) {
// // 停止观察,10会被输出
// subs.unsubscribe()
// }
// })

// pipe内可以使用多个Operator
const subs = interval(500).pipe(map(v=>({n:v})),filter(v=>v.n%2===0)).subscribe(e=> {
console.log(e)
if(e.n === 10) {
// 停止观察,10会被输出
subs.unsubscribe()
}
})

of也是一种Observable,可以自定义填充数据:

1
2
3
4
5
6
7
8
9
// 数据被规定为1,2,3,4,5,6
const subs = of(1,2,3,4,5,6).pipe(map(v=>({n:v})),filter(v=>v.n%2===0)).subscribe(e=> {
console.log(e)
})

// 当操作出错时,retry会重试,重试几次取决于传的参数
const subs = of(1,2,3,4,5,6).pipe(retry(3), map(v=>({n:v}))).subscribe(e=> {
console.log(e)
})

也可以处理DOM事件:

1
2
3
4
5
const click$ = fromEvent(document, 'click').pipe(map(v=>v.target))

click$.subscribe(e=> {
console.log(e)
})