Codpoe

Front-end Developer @bytedance

关于 RxJS 的数据流

RxJS

在写 Android 的时候,RxJava 和 RxAndroid 几乎是每个项目的标配,同时,RxJava 也是 Rx 系列里 GitHub star 数最多的,可见 Rx 在 Java 领域的普及度很高,而 RxJava 的各种文章也真心多。而在前端领域,RxJS 就相对没那么流行了。

至于原因,一方面前端的各种框架 Vue 等等也可以做到类似的功能,有时候并没有必要再引入 RxJS,另一方面是 RxJS 上手有点难(当初入门 RxJava 的时候,扔物线大大的入门文章我反复看了好几遍。。。),还有就是我觉得吧,是 Java 本身的异步有点难用。。。

但毋庸置疑,RxJS 的数据流操作能力是相当强大且富有想象力的。

饿了么前端的 RxJS 入门文章

最近看了饿了么前端博客里的 RxJS 入门文章–让我们一起来学习 RxJS,文章写得还是很易懂的,里面用『画板』这个例子初步展示了 RxJS 的强大数据流操作能力。

说 Rx 就必然要说 Rx 里五花八门的操作符(operator)了。『画板』这个例子里面主要用到了mapswitchMapswitchMapTobufferCount这几个操作符。饿了么这篇文章并没有具体介绍这几个操作符,我就看着文档,敲着 demo,试图理解并区分这几个操作符,真的会晕。。。

  • map map操作符是比较常用的,用于映射数据,说白了就是对数据做操作与转换。比如把源数据1, 2, 3通过map((data) => (data * 10))转换为10, 20, 30

  • switchMap switchMap用于改变Observable,应该是等同于map+switch

  • switchMapTo switchMapTo类似于switchMap,区别在于switchMapTo不接收参数,也就是说,它不可以对上游数据进行操作,仅仅只是改变数据流的Observable;而switchMap接收参数,可以对上游数据进行操作,并据此发出相应的Observable

  • bufferCount 用于将数据按照参数的设置集合在一起。在旧版本的 RxJS 中,并没有bufferCount这个操作符,但是有bufferWithCount,作用是一样的。bufferCount可接收两个参数,第一个参数是数据集合的大小,第二个参数是跳过的数据的数量。第二个参数是可选的。举个栗子:设源数据是1, 2, 3,经过bufferCount(2, 1)之后,得到的数据就是[1, 2], [2, 3]

问题来了

按照饿了么文章提到的思路,我就去按瓢画葫芦了。代码如下:

let down$ = Rx.Observable.fromEvent(canvas, 'mousedown').map(ev => 'down');
let up$ = Rx.Observable.fromEvent(canvas, 'mouseup').map(ev => 'up');
let move$ = Rx.Observable.fromEvent(canvas, 'mousemove');

down$
  .merge(up$)
  .switchMap(action => (action === 'down' ? move$ : Rx.Observable.empty()))
  .map(ev => ({
    offsetX: ev.offsetX,
    offsetY: ev.offsetY,
  }))
  .bufferCount(2, 1)
  .subscribe(res => {
    ctx.moveTo(res[0].offsetX, res[0].offsetY);
    ctx.lineTo(res[1].offsetX, res[1].offsetY);
    ctx.stroke();
  });
let down$ = Rx.Observable.fromEvent(canvas, 'mousedown').map(ev => 'down');
let up$ = Rx.Observable.fromEvent(canvas, 'mouseup').map(ev => 'up');
let move$ = Rx.Observable.fromEvent(canvas, 'mousemove');

down$
  .merge(up$)
  .switchMap(action => (action === 'down' ? move$ : Rx.Observable.empty()))
  .map(ev => ({
    offsetX: ev.offsetX,
    offsetY: ev.offsetY,
  }))
  .bufferCount(2, 1)
  .subscribe(res => {
    ctx.moveTo(res[0].offsetX, res[0].offsetY);
    ctx.lineTo(res[1].offsetX, res[1].offsetY);
    ctx.stroke();
  });

好像没什么问题,但是在测试的时候,发现后面画的线总是会和前面的线连在一起,如图所示:

这好像与原意不符啊,原本想要的效果是每次画的线要分开的。那么问题来了,到底哪个环节出了差错呢?一脸懵 B。

解决

想了想,前面的线跟后面的连在一起,应该是成对出现的坐标出了问题,导致上一次画线的末尾坐标跟下一下画线的初始坐标配对了,那么是数据流出现了问题吧。那就捋一捋数据流吧。

鼠标按下,触发mousedown事件,发射出down,但未松开鼠标,所以没有触发mouseup事件,也就没有将数据改成['down', 'up'],接下来鼠标按着移动,数据经过switchMapTo变换,Observable变成了mousemove,画了一会,鼠标松开,触发mouseup事件,发射出up...

形象地,实际的数据流为:

down$:       down-------------down--------
up$:         -----------up----------------
merge:       -----------up----down--------
switchMapTo: move0-move1------move2-move3-
bufferCount: [move0, move1]-[move1, move2]-[move2, move3]

result:      [move0, move1]-[move1, move2]-[move2, move3]
down$:       down-------------down--------
up$:         -----------up----------------
merge:       -----------up----down--------
switchMapTo: move0-move1------move2-move3-
bufferCount: [move0, move1]-[move1, move2]-[move2, move3]

result:      [move0, move1]-[move1, move2]-[move2, move3]

然而按照预期想法,move0, move1应是同一条线上的,move2, move3是另一条线上的,所以预期的数据流应是这样的:

result: [move0, move1]-[move2, move3]
result: [move0, move1]-[move2, move3]

这显然和实际情况不符,这就是问题所在:bufferCount错误地将两个原本不应该在一起的坐标合并了。所以要解决这个问题,就要改变bufferCount的位置。

具体地说,就是不能在主数据流里用bufferCount,而应该在每次都是新发射出的mousemove数据流里用,先使坐标们正确地合并在一起,然后才发射到主数据流上。

down$:       down-------------down--------
up$:         -----------up----------------
merge:       -----------up----down--------
switchMapTo: move0-move1------move2-move3-
- bufferCount:
             [move0, move1]-[move2, move3]

result:      [move0, move1]-[move2, move3]
down$:       down-------------down--------
up$:         -----------up----------------
merge:       -----------up----down--------
switchMapTo: move0-move1------move2-move3-
- bufferCount:
             [move0, move1]-[move2, move3]

result:      [move0, move1]-[move2, move3]

根据上面这个数据流,修改代码如下:

let down$ = Rx.Observable.fromEvent(canvas, 'mousedown').map(ev => 'down');
let up$ = Rx.Observable.fromEvent(canvas, 'mouseup').map(ev => 'up');
let move$ = Rx.Observable.fromEvent(canvas, 'mousemove')
  .map(ev => ({
    offsetX: ev.offsetX,
    offsetY: ev.offsetY,
  }))
  .bufferCount(2, 1); // 此操作符的位置很关键

down$
  .merge(up$)
  .switchMap(action => (action === 'down' ? move$ : Rx.Observable.empty()))
  .subscribe(res => {
    ctx.moveTo(res[0].offsetX, res[0].offsetY);
    ctx.lineTo(res[1].offsetX, res[1].offsetY);
    ctx.stroke();
  });
let down$ = Rx.Observable.fromEvent(canvas, 'mousedown').map(ev => 'down');
let up$ = Rx.Observable.fromEvent(canvas, 'mouseup').map(ev => 'up');
let move$ = Rx.Observable.fromEvent(canvas, 'mousemove')
  .map(ev => ({
    offsetX: ev.offsetX,
    offsetY: ev.offsetY,
  }))
  .bufferCount(2, 1); // 此操作符的位置很关键

down$
  .merge(up$)
  .switchMap(action => (action === 'down' ? move$ : Rx.Observable.empty()))
  .subscribe(res => {
    ctx.moveTo(res[0].offsetX, res[0].offsetY);
    ctx.lineTo(res[1].offsetX, res[1].offsetY);
    ctx.stroke();
  });

其实后来看了饿了么这篇文章 Demo 的代码,也是这样的。

最后

 通过这个『画板』的栗子,可以看出 RxJS 强大的数据操作能力,但是也要时刻注意代码所表示的数据流是否真的如你的预期,所以要注意操作符序列的顺序。最后,画数据流图会有很大帮助。