(六)温故知新系列之RXJS——RXJS操作符基础(转化类)

Source

前言

合并类操作符把多个数据流汇合为⼀个数据 流,但是汇合之前数据是怎样,在汇合之后还是那样;过滤类操作符可以 筛选掉⼀些数据,其中回压控制的过滤类操作符还可以改变数据传递给下 游的时间,但是数据本⾝不会变化,怎么进就怎么出。这⼀章会介绍RxJS 对于数据的转化处理,也就是让数据管道中的数据发⽣变化。

转化类和过滤类操作符都对数据做⼀些处理,但是过滤类做的处理是 筛选,决定哪些数据传递给下游,并不对数据本⾝做处理;⽽转化类操作 符不做过滤,会对每个具体数据做⼀些转化。

  • 将每个元素用映射函数产生新的数据 —— map
  • 将数据流中的每个元素映射为同一个数据 —— mapTo
  • 提取数据流中每个数据的某个字段 —— pluck
  • 产生高阶 Observable 对象 —— windowTime、windowCount、windowWhen、windowToggle和window
  • 产生数组构成的数据流 —— bufferTime、bufferCount、bufferWhen、bufferToggel和buffer
  • 映射产生高阶 Observable 对象然后合并 —— concatMap、mergeMap、switchMap、exhaustMap
  • 产生规约运算结果组成的数据流 —— scan和mergeScan

对数据的转化可以分为两种:

  • 对每个数据做转化。上游的数据和下游的数据依然是⼀对⼀的关系, 只不过传给下游的数据已经是另⼀个数据,⽐如上游传下来的是数据A, 传给下游的是数据f(A),其中f是⼀个函数,以A为输⼊返回⼀个新的数据。* 不转化单个数据,⽽是把数据重新组合。⽐如上游传下来的是A、 B、C三个数据,传给下游的是⼀个数组数据[A,B,C],并没有改变上游 数据本⾝,只是把它们都塞到了⼀个数组对象中传给下游。在RxJS中,创建类操作符是数据流的源头,其余所有操作符最重要的 三类就是合并类、过滤类和转化类。 使⽤RxJS解决问题绝⼤ 部分时间就是在使⽤这三种操作符,所以,⼀定要掌握这⼀章的知识,同时要及时复习⼀下前⾯的章节内容。

映射数据

假如上游的数据是A、B、C、D的序列,那么可以认为经过转化类操 作符之后,就会变成f(A)、f(B)、f(C)、f(D)的序列,其中f是⼀ 个函数,作⽤于上游数据之后,产⽣的就是传给下游新的数据。

1.map 对源 observable 的每个值应用投射函数

与JS不同之处是map这个操作符可以映射⼀段时间上异步产⽣ 的数据。因为RxJS的数据处理⽅式是“推”,每当上游推下来⼀个数据, map就把这个数据作为参数传给map的参数函数,然后再把函数执⾏的返回 值推给下游。

map除了必须要有的函数参数project,还有⼀个可选参数 thisArg,⽤于指定函数project执⾏时的this值。

const { of, interval } = rxjs;
const { map, } = rxjs.operators;
const source$ = of(3, 1, 4);

const mapFunc = function (value, index) {return `${value} ${this.separator} ${index}`;
};
const context = { separator: '=>' };

const result$ = source$.pipe(map(mapFunc, context));
result$.subscribe(console.log,
);
// 3 => 0
// 1 => 1
// 4 => 2 

mapFunc这个函数是map的第⼀个参数,充当project的功能,同 时,map还有第⼆个参数context对象,如果⽤上这个参数,那么mapFunc在 每次执⾏的时候,this就是map的这个参数context。

注意

mapFunc的定义使⽤了不同的函数表达式,⽽不是箭头形式的函数定 义,因为箭头形式的函数定义⾥,this是绑定于定义环境的,map的第⼆个 参数也就不会起到任何作⽤。

这是map的⼀个⼩的功能细节,但是,并不建议使⽤,因为按照函数 式编程的原则,应该尽量让函数成为纯函数,如果⼀个函数的执⾏依赖于 this,那么就难以预料这个函数的执⾏结果,并不是什么好事。所以,虽然 我们知道map有这个功能,但要尽量避免使⽤它。

2.mapTo 将每个发出值映射成字符串

mapTo这个函数完全可以⽤ map来实现:

const { of, interval } = rxjs;
const { map, } = rxjs.operators;
const source$ = of(3, 1, 4);

const result$ = source$.pipe(map(() => 'A')); // A A A
const result$ = source$.pipe(mapTo('A')); // A A A

result$.subscribe(console.log,
); 

3.pluck 选择属性来发出。

pluck就是把上游数据中特定字段 的值“拔”出来。

const { of, interval } = rxjs;
const { pluck , } = rxjs.operators;
const source$ = of({ name: 'RxJS', version: 'v4' },{ name: 'React', version: 'v15' },{ name: 'React', version: 'v16' },{ name: 'RxJS', version: 'v5' }
);
const result$ = source$.pipe(pluck('name'))
result$.subscribe(console.log,
);
// RxJS
// React
// React
// RxJS 

##缓存窗口:⽆损回压控制

⽆损的回压控制 就是把上游在⼀段时间内产⽣的数据放到⼀个数据集合⾥,然后把这个数 据集合⼀次丢给下游。⾥所说的“数据集合”,可以是⼀个数组,也可 以是⼀个Observable对象。

RxJS有两组操作符对两种数据集合类型分别提 供⽀持,⽀持数组的以buffer开头,⽀持Observable对象的以window开头。

回压控制的过程中,并没有像map和mapTo那样映射产⽣新的 数据,只是把多个上游数据缓存起来,当时机合适时,把缓存的数据汇聚 到⼀个数组或者Observable对象传给下游,所以也算是转化类操作符的范 畴。

如果数据管道中使⽤了这样的转化类操作符,下游必须要做 对应的处理,原本下游预期的是⼀个⼀个独⽴的数据,现在会接收到数组 或者Observable对象,⾄于如何处理这些类型数据,决定权完全在下游。

⽆损的回压控制,实际上就是把数据取舍的决策权交给了下游。

对于回压控制,如果使⽤过滤类操作符,虽然是有损的回压控制,但 是好处就是对下游来说是透明的,有没有使⽤过滤类操作符不影响下游的 处理⽅式;如果使⽤转化类操作符,代价就是下游需要对应改变,好处就 是对数据⽆损。如果并不确定该如何对数据做取舍,那就适合⽤转化类操 作符。

4.windowTime和bufferTime 在每个提供的时间跨度内,收集源 obsercvable 中的值的 observable。

基本⽤法就是 ⽤⼀个参数来指定产⽣缓冲窗⼜的间隔:

const { of, timer } = rxjs;
const { windowTime, } = rxjs.operators;

const source$ = timer(0, 100);
const result$ = source$.pipe(windowTime(400)
)
result$.subscribe(res => {console.log('NEW WINDOW!') }
);

// 每400毫秒输出一个 NEW WINDOW! 

windowTime的参数是400,也就会把时间划分为连续的400毫秒 长度区块,在每个时间区块中,上游传下来的数据不会直接送给下游,⽽ 是在该时间区块的开始就新创建⼀个Observable对象推送给下游,然后在 这个时间区块内上游产⽣的数据放到这个新创建的Observable对象中。

windowTime产⽣的Observable对象中每个数据 依然是Observable对象,也就是⼀个⾼阶Observable对象。在每个400毫秒 的时间区间内,上游的每个数据都被传送给对应时间区间的内部 Observable对象中,当400毫秒时间⼀到,这个区间的内部Observable对象 就会完结。

bufferTime产⽣的是普通的Observable对象,其中的数据是数组形式, bufferTime会把时间区块内的数据缓存,在时间区块结束的时候把所有缓 存的数据放在⼀个数组⾥传给下游。很明显,windowTime把上游数据传递 出去是不需要延迟的,⽽bufferTime则需要缓存上游的数据,这也就是其 名字中带buffer(缓存)的原因。

5.windowCount和bufferCount 每发出x个项就开启一个新窗口。

const { of, timer } = rxjs;
const { windowCount, } = rxjs.operators;

const source$ = timer(0, 100);
const result$ = source$.pipe(windowCount(4)
)
result$.subscribe(res => {console.log('NEW WINDOW!',res)}
); 

理解了windowTime 和 windowBuffer 也不难理解这两个带count的操作符了,不过多赘述了。

⾼阶的map

所有⾼阶map的操作符都有⼀个函数参数project,但是和普通map不 同,普通map只是把⼀个数据映射为另⼀个数据,⽽⾼阶map的函数参数 project把⼀个数据映射为⼀个Observable对象。

先来看普通的map使⽤这个project会产⽣什么样的结果:

const { of, timer, interval } = rxjs;
const { map, take } = rxjs.operators;

const project = (value, index) => {return interval(100).pipe(take(5))
}

const source$ = interval(200);
const result$ = source$.pipe(map(project)
)

result$.subscribe(console.log);// {} 

可以看到,在这⾥map产⽣的是⼀个⾼阶Observable对象,project返回 的结果成为这个⾼阶Observable对象的每个内部Observable对象。

所谓⾼阶map,所做的事情就是⽐普通的map更进⼀步,不只是把 project返回的结果丢给下游就完事,⽽是把每个内部Observable中的数据做 组合,通俗⼀点说就是“砸平”,最后传给下游的依然是普通的⼀阶 Observable对象。

所有xxxxMap名称模式的操作符,都是⼀个map加上⼀个“砸平”操作的 组合,理解这样的本质之后,就容易理解⾼阶map了,其实就是把上图中map产⽣的⾼阶Observable利⽤对应的组合操作符合并为⼀阶的 Observable对象。

6.concatMap 将值映射成内部 observable,并按顺序订阅和发出。

把上⾯使⽤map的代码改为使⽤concatMap

const { of, timer, interval } = rxjs;
const { concatMap, take } = rxjs.operators;

const project = (value, index) => {return interval(100).pipe(take(5))
}

const source$ = interval(200);
const result$ = source$.pipe(concatMap(project)
)

result$.subscribe(console.log); // 01234 01234... 

第⼀个内部Observable对象中的数据被完整传递给了 concatMap的下游,但是,第⼆个产⽣的内部Observable对象没有那么快处 理,只有到第⼀个内部Observable对象完结之后,concatMap才会去订阅第 ⼆个内部Observable,这样就导致第⼆个内部Observable对象中的数据排在 了后⾯,绝不会和第⼀个内部Observable对象中的数据交叉。

concatMap适合处理需要顺序连接不同Observable对象中数据的操作, 有⼀个特别适合使⽤concatMap的应⽤例⼦,就是⽹页应⽤中的拖拽操作。 在⽹页应⽤中,拖拽操作就是⽤户的⿏标在某个DOM元素上按下去, 然后拖动这个DOM元素,最后松开⿏标这整个过程,⽽且⽤户在⼀个⽹页 可以做完⼀个拖拽动作之后再做⼀个拖拽动作,这个过程是重复的,拖拽 涉及的事件包括mousedown、mousemove和mouseup,所以拖拽功能控制得 好的关键,就是要做好这⼏个事件的处理。

如果把mousemove的序列看作是⼀ 个Observable对象,整个过程可以看作是⼀个⾼阶Observable对象,其中每 ⼀个内部Observable对象由mousedown事件引发,每⼀个内部Observable对 象就是以mouseup结束的mousemove数据序列,⽽且,每⼀⾏都是⾸尾相接 的,不存在数据的交叉。

7.mergeMap 对于每个内部Observable对象直接合并。

mergeMap对于每个内部Observable对象直接合并, 也就是任何内部Observable对象中的数据,来⼀个给下游传⼀个,不做任 何等待。

const { of, timer, interval } = rxjs;
const { mergeMap, take } = rxjs.operators;

const project = (value, index) => {return interval(100).pipe(take(5))
}

const source$ = interval(200).pipe(take(2));
const result$ = source$.pipe(mergeMap(project)
)

result$.subscribe(console.log); // 0120314 ... 

mergeMap能够解决异步操作的问题,最典型的应⽤场景就是对于 AJAX请求的处理。在⼀个⽹页应⽤中,⼀个很典型的场景,每点击某个 元素就需要发送⼀个AJAX请求给服务器端,同时还要根据返回结果更新 ⽹页中的状态,AJAX的处理当然是⼀个异步过程,使⽤传统的⽅法来解 决这样的异步过程代码会⼗分繁杂。

但是,如果把⽤户的点击操作看作⼀个数据流,把AJAX的返回结果 也看作⼀个数据流,那么这个问题的解法就是完全另⼀个样⼦,可以⾮常 简洁,下⾯是⽰例代码:

const sendButton = document.querySelector('#send'); Rx.Observable.fromEvent(sendButton, 'click').mergeMap(() => { return Rx.Observable.ajax(apiUrl); }).subscribe(result => { // 正常处理AJAX返回的结果 }); 

其中,mergeMap的函数参数部分只需要考虑如何调⽤AJAX,然后返 回⼀个包含结果的Observable对象,剩下来如何将AJAX结果传递给下游, 交给mergeMap就可以了。虽然是⼀个异步操作,但是整个代码依然是同步 的感觉,这就是RxJS的优势。

8.switchMap 映射成 observable,完成前一个内部 observable,发出值。

上⾯介绍的mergeMap适合处理AJAX请求,但是使⽤mergeMap存在⼀ 个问题,就是每⼀个上游的数据都会引发调⽤AJAX⽽且把AJAX结果传递 给下游,在某些场景下,这样的处理未必合适。⽐如,当⽤户点击某个按 钮时获取RxJS项⽬在GitHub上当前的star个数,⽤户可能快速点击这个按 钮,但是他们肯定是希望获得最新的数据,如果使⽤mergeMap可能就不会 获得预计的结果。

⽤户点击按钮,⼀个AJAX请求发出去,这时候RxJS的star数为9907, 不过因为⽹络速度⽐较慢的原因,这个AJAX请求的延时⽐较⼤,⽤户等 不及了,又点了⼀次按钮,又⼀个AJAX请求发出去了。这时候,第⼀个 AJAX请求已经获得了数据9907,⽽恰在此时世界某个地⽅的开发者也很 喜欢RxJS,点击了RxJS项⽬的star,于是RxJS的star数变成了9908,然后, ⽤户触发的第⼆个AJAX也到了,拿到了9908的数据。只要涉及输⼊输 出,延时就是不可预期的,先发出去的AJAX未必就会先返回,完全有可 能第⼆个AJAX请求的结果⽐第⼀个更早返回,这时候使⽤mergeMap就会 出问题了,⽤户会先看到9908,然后又会被第⼀个AJAX请求的返回修改 为9907,毫⽆疑问,9907并不是最新的数据。

switchMap依然在上游产⽣数据的时候去调⽤函数参数project,但它和 concatMap和mergeMap都不⼀样的是,后产⽣的内部Observable对象优先级 总是更⾼,只要有新的内部Observable对象产⽣,就⽴刻退订之前的内部 Observable对象,改为从最新的内部Observable对象拿数据。就像switch的 含义⼀样,switchMap做的是⼀个“切换”,只要有更新的内部Observable对 象,就切换到最新的内部Observable对象。

const { of, timer, interval } = rxjs;
const { switchMap, take } = rxjs.operators;

const project = (value, index) => {return interval(100).pipe(take(5))
}

const source$ = interval(200).pipe(take(2));
const result$ = source$.pipe(switchMap(project)
)
result$.subscribe(console.log); // 001234 

switchMap这个特点适⽤于总是要获取最新AJAX请求返回的应⽤,只 需要把上⾯使⽤mergeMap来合并AJAX请求的代码中改为⽤switchMap就可以了。

9.exhaustMap 映射成内部 observable,忽略其他值直到该 observable 完成。

exhaustMap对数据的处理策略和switchMap正好相反,先产⽣的内部 Observable优先级总是更⾼,后产⽣的内部Observable对象被利⽤的唯⼀机 会,就是之前的内部Observable对象已经完结。

10.scan 随着时间的推移进行归并。

scan和 reduce的区别在于scan对上游每⼀个数据都会产⽣⼀个规约结果,⽽reduce 是对上游所有数据进⾏规约,reduce最多只给下游传递⼀个数据,如果上 游数据永不完结,那reduce也永远不会产⽣数据,⽽scan完全可以处理⼀个 永不完结的上游Observable对象。

const { of, timer, interval } = rxjs;
const { scan, take } = rxjs.operators;

const project = (value, index) => {return interval(100).pipe(take(5))
}

const source$ = interval(100);
const result$ = source$.pipe(scan((accumulation, value) => {return accumulation + value;})
)

result$.subscribe(console.log); // 0 1 2 3 6 10 15 21 ... 

其中,source$间隔100毫秒产⽣⼀个数值序列,scan的规约函数参数把 之前规约的值加上当前数据作为规约结果,每⼀次上游产⽣数据的时候, 这个规约函数都会被调⽤,结果会传给下游,同时结果也会由scan保存, 作为下⼀次调⽤规约函数时的accumulation参数。

小结

最简单的数据转化只是把上游的某个数据转化为对应的⼀个下游数 据,但是数据转化不限于单个数据的转化,还包括把上游的多个数据合并 为⼀个数据传给下游。

这种合并转化操作不同于合并类操作符的操作,因 为合并类操作符只是搬运上游数据,并不会改变数据⾃⾝。 转化类操作符也可以⽤来控制回压,这是⼀种⽆损的回压控制⽅法, 本质上是把如何过滤掉⽆关信息的决策权交给了下游。

最后

整理了一套《前端大厂面试宝典》,包含了HTML、CSS、JavaScript、HTTP、TCP协议、浏览器、VUE、React、数据结构和算法,一共201道面试题,并对每个问题作出了回答和解析。

有需要的小伙伴,可以点击文末卡片领取这份文档,无偿分享

部分文档展示:



文章篇幅有限,后面的内容就不一一展示了

有需要的小伙伴,可以点下方卡片免费领取