rxjs 源码分析1-(fromEvent)

栏目: 编程语言 · 发布时间: 5年前

内容简介:我们先从我们最常用的事件处理开始,Rxjs 对事件的处理有一个其实现的效果如下图:

Rxjs 是使用 Observables 的响应式编程的库,它使编写异步或基于回调的代码更容易。我们现在针对 Rxjs 6 来进行源码分析,分析其实现的基本原理, 我们可以根据中文文档来学习 Rxjs 的基本使用,但是这个文档是 Rxjs 5 的版本。其最基本的使用区别如下,Rxjs 6的操作符都放在 pipe (管道)中配置,而Rxjs 5 的版本是直接调用

Rxjs 5

fromEvent(addBtn, 'click')
    .throttleTime(3000)
    .subscribe(() => {
        nameInput.value = +(nameInput.value) + 1
    })
复制代码

Rxjs 6

fromEvent(addBtn, 'click')
    .pipe(throttleTime(3000))
    .subscribe(() => {
        nameInput.value = +(nameInput.value) + 1
    })
复制代码

fromEvent

使用方法

我们先从我们最常用的事件处理开始,Rxjs 对事件的处理有一个 fromEvent 方法, 其最简单的一个范例如下:

import {fromEvent } from './esm2015';

const addBtn = document.getElementById('add')
const minusBtn = document.getElementById('minus')
const nameInput = document.getElementById('name');
fromEvent(addBtn, 'click')  
    .subscribe(() => {
        nameInput.value = +(nameInput.value) + 1
    })

fromEvent(minusBtn, 'click')
    .subscribe(() => {
        nameInput.value = +(nameInput.value) - 1
    })
复制代码

其实现的效果如下图:

rxjs 源码分析1-(fromEvent)

我们点击加号或者减号 button 去给Input赋值。

原理

从上面我们已经可以知道怎么去简单使用 fromEvent , 下面我们根据源代码来一步步深入分析,其基本原理:

export function fromEvent(target, eventName, options, resultSelector) {
    if (isFunction(options)) {
        resultSelector = options;
        options = undefined;
    }
    if (resultSelector) {
        return fromEvent(target, eventName, options).pipe(map(args => isArray(args) ? resultSelector(...args) : resultSelector(args)));
    }
    return new Observable(subscriber => {
        function handler(e) {
            if (arguments.length > 1) {
                subscriber.next(Array.prototype.slice.call(arguments));
            }
            else {
                subscriber.next(e);
            }
        }
        setupSubscription(target, eventName, handler, subscriber, options);
    });
}
复制代码

fromEvent 其实就是一个方法, 可以传入四个参数,我们上面的Demo 只是传递了两个参数,我们先只分析只传递两个参数的情况: fromEvent 最终会返回一个 Observable 对象, 我们可以将上面的Demo 代码,进行简单的处理,如下:

import { fromEvent } from './esm2015';

const addBtn = document.getElementById('add')
const minusBtn = document.getElementById('minus')
const nameInput = document.getElementById('name')
const addFromEventObj = fromEvent(addBtn, 'click')
addFromEventObj.subscribe(() => {
    nameInput.value = +(nameInput.value) + 1
})

const minusFromEventObj = fromEvent(minusBtn, 'click')
minusFromEventObj.subscribe(() => {
    nameInput.value = +(nameInput.value) - 1
})
复制代码

我们将如下代码分成了两个步骤,

fromEvent(addBtn, 'click')  
    .subscribe(() => {
        nameInput.value = +(nameInput.value) + 1
    })

复制代码

const addFromEventObj = fromEvent(addBtn, 'click') 我们可以查看 addFromEventObj 对象,如下截图:

rxjs 源码分析1-(fromEvent)
很简单,就是 Observable 对象,其中有一个重要的属性 _subscribe 的属性, 执行的就是 fromEventreturn

对象的传入的参数:

return new Observable(subscriber => {
        function handler(e) {
            if (arguments.length > 1) {
                subscriber.next(Array.prototype.slice.call(arguments));
            }
            else {
                subscriber.next(e);
            }
        }
        setupSubscription(target, eventName, handler, subscriber, options);
    });
复制代码

其实 fromEvent 方法,很简单,就是返回一个 Observable 对象, 其他的基本就没有什么了。 其中点还是返回的这个对象,我们下面来深入分析 Observable 对象.

Observable

Observable 的中文翻译就是 可观察的 , 表示一个可观察的对象,既然是一个可观察的对象,那观察到变化后,是不是要通知相应的 观察者 呢?

构造函数

我们首先分析 Observable 的构造函数:

constructor(subscribe) {
        this._isScalar = false;
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }
复制代码

其构造函数就接收一个参数 subscribe ,其中文意思就是 订阅 ,其实 订阅 就是 观察者 的概念,然后我们的 可观察的对象观察者 对象就关联起来了。

const addFromEventObj = fromEvent(addBtn, 'click')addFromEventObj 表示创建了一个 可观察的 对象, 有了 可观察的 对象,我们就需要对这个对象进行 订阅 了, 我们下面就来分析 subscribe 方法

subscribe

subscribe的源码如下:

subscribe(observerOrNext, error, complete) {
        const { operator } = this;
        const sink = toSubscriber(observerOrNext, error, complete);
        if (operator) {
            operator.call(sink, this.source);
        }
        else {
            sink.add(
                this.source || 
                (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
                this._subscribe(sink) :
                this._trySubscribe(sink)
            );
        }
        if (config.useDeprecatedSynchronousErrorHandling) {
            if (sink.syncErrorThrowable) {
                sink.syncErrorThrowable = false;
                if (sink.syncErrorThrown) {
                    throw sink.syncErrorValue;
                }
            }
        }
        return sink;
    }
复制代码

我们目前先不考虑添加 operator (操作符)的情况,我们下面来一一分析这个方法:

  1. const sink = toSubscriber(observerOrNext, error, complete); 创建了一个 Subscriber 方法,
  2. 因为 operator 为undefined, 所以条件判断语句会进入 else ,
sink.add(
                this.source || 
                (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
                this._subscribe(sink) :
                this._trySubscribe(sink)
            );
复制代码

其中add()方法里面会先执行 this._trySubscribe(sink) , 然后执行: return this._subscribe(sink); , 我们应该还记得 this._subscribe ,是我们调用 fromEvent 方法, 返回 Observable 对象,传入构造函数的参数,代码如下:

subscriber => {
        function handler(e) {
            if (arguments.length > 1) {
                subscriber.next(Array.prototype.slice.call(arguments));
            }
            else {
                subscriber.next(e);
            }
        }
        setupSubscription(target, eventName, handler, subscriber, options);
    }
复制代码

这里就是给真实的 Dom 元素进行事件的绑定,现在我们点击 button , 就会调用如上的 handler 方法, 主要实现是 subscriber.next(e); , 其'subscriber' 也就是 const sink = toSubscriber(observerOrNext, error, complete); 创建的 sink 对象,其对应就是 esm2015\internal\Subscriber.js 对象,现在我们查看其对应的 next() 方法, 其指向的是:

_next(value) {
        this.destination.next(value);
    }
复制代码

this.destination 属性, 是在 Subscriber 构造函数赋值的, 是一个 SafeSubscriber 对象

default:
        this.syncErrorThrowable = true;
        this.destination = new SafeSubscriber(this, destinationOrNext, error, complete);
        break;
复制代码

其中 destinationOrNext 对应的是 subscribe 方法传入的参数,如下:

const addFromEventObj = fromEvent(addBtn, 'click')
addFromEventObj.subscribe(() => {
    nameInput.value = +(nameInput.value) + 1
})
复制代码

就是

() => {
    nameInput.value = +(nameInput.value) + 1
}
复制代码

下面我们继续来分析 this.destination.next(value); , 其代码如下:

next(value) {
        if (!this.isStopped && this._next) {
            const { _parentSubscriber } = this;
            if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
                this.__tryOrUnsub(this._next, value);
            }
            else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
                this.unsubscribe();
            }
        }
    }
复制代码

最终实现方法是 this.__tryOrUnsub(this._next, value); , 这个方法传递了两个参数:

  1. this._next , 其在 SafeSubscriber 构造函数中对其进行了赋值: next = observerOrNext; ,也就是构造函数的第二个参数, 我们上面已经分析了创建 SafeSubscriber 对象的地方 this.destination = new SafeSubscriber(this, destinationOrNext, error, complete); , 这个 destinationOrNext 就是 subscribe 方法传入的参数,如下:
const addFromEventObj = fromEvent(addBtn, 'click')
addFromEventObj.subscribe(() => {
    nameInput.value = +(nameInput.value) + 1
})
复制代码
  1. value , 也就是 click 事件的对象 MouseEvent

我们接下来具体分析**__tryOrUnsub** 方法,代码如下:

__tryOrUnsub(fn, value) {
        try {
            fn.call(this._context, value);
        }
        catch (err) {
            this.unsubscribe();
            if (config.useDeprecatedSynchronousErrorHandling) {
                throw err;
            }
            else {
                hostReportError(err);
            }
        }
    }
复制代码

其主要实现就是 fn.call(this._context, value); , 就会执行了 subscribe 里面的方法了, 也就是执行:

() => {
    nameInput.value = +(nameInput.value) + 1
}
复制代码

pipe

上面我们已经基本理解了 fromEvent 的基本使用方法,主要分析的是 subscribe 方法,我们现在有个需求,我们要控制Button , 在 3s 时间内, 我们只能点击一次, 以防止,恶意点击按钮. Rxjs 都是基于流来操作, Observable 对象提供了一个 pipe (管道)的方法, 在进入到 subscribe 订阅者方法之前,所以的数据需要进行加工,异常处理, 以保证 subscribe 收到的是正确的数据。我们下面来深入分析 pipe 方法。 我们将我们的Demo 修改如下:

import { fromEvent } from './esm2015';
import { throttleTime } from './esm2015/operators'

const addBtn = document.getElementById('add')
const minusBtn = document.getElementById('minus')
const nameInput = document.getElementById('name')
const addFromEventObj = fromEvent(addBtn, 'click')
const pipeObj = addFromEventObj
    .pipe(throttleTime(1000 * 3))
pipeObj.subscribe(() => {
        nameInput.value = +(nameInput.value) + 1
    })

const minusFromEventObj = fromEvent(minusBtn, 'click')

minusFromEventObj
    .pipe(throttleTime(1000 * 3))
    .subscribe(() => {
        nameInput.value = +(nameInput.value) - 1
    })
复制代码

上面我们通过 .pipe(throttleTime(1000 * 3)) 添加了 管道 ,其中传入了一个 throttleTime (节流)操作符, 下面是 pipe 方法的代码:

pipe(...operations) {
        if (operations.length === 0) {
            return this;
        }
        var opts = pipeFromArray(operations);
        var result = opts(this);
         return result;
    }
复制代码

从上面方法,我们可以看出pipe 方法,可以传入多个操作符, 我们现在先只看下简单的,只传入一个操作符的情况, 如果只传入一个参数 var opts = pipeFromArray(operations);opts 对应的就是我们传入的 throttleTime 函数返回的方法,也就是如下:

export function throttleTime(duration, scheduler = async, config = defaultThrottleConfig) {
    return (source) => {
        return source.lift(new ThrottleTimeOperator(duration, scheduler, config.leading, config.trailing));
    }
}
复制代码

其中 source 就是上面的 this , 也就是 Observable 对象,我们下面可以继续看下 lift 方法:

lift(operator) {
        const observable = new Observable();
        observable.source = this;
        observable.operator = operator;
        return observable;
    }
复制代码

返回了一个新的 observable 对象,只是在添加了 operator 属性。如下代码:

const addFromEventObj = fromEvent(addBtn, 'click')
const pipeObj = addFromEventObj
    .pipe(throttleTime(1000 * 3))
复制代码

pipeObj 如下图所示:

rxjs 源码分析1-(fromEvent)

所以 pipe 方法就是将一个(组)操作符挂载在一个新的 observable 对象的 operator 属性上。 我们还需要重新分析 subscribe 方法

subscribe(observerOrNext, error, complete) {
        const { operator } = this;
        const sink = toSubscriber(observerOrNext, error, complete);
        if (operator) {
            operator.call(sink, this.source);
        }
        else {
            sink.add(
                this.source || 
                (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
                this._subscribe(sink) :
                this._trySubscribe(sink)
            );
        }
        if (config.useDeprecatedSynchronousErrorHandling) {
            if (sink.syncErrorThrowable) {
                sink.syncErrorThrowable = false;
                if (sink.syncErrorThrown) {
                    throw sink.syncErrorValue;
                }
            }
        }
        return sink;
    }
复制代码

我们在调用 subscribe 方法之前,我们已经调用了 pipe 方法, pipe 方法返回的对象, 已经有了 operator 操作符,所以上面的逻辑分支会走 if operator.call(sink, this.source); , call 代码如下:

call(subscriber, source) {
        return source.subscribe(new ThrottleTimeSubscriber(subscriber, this.duration, this.scheduler, this.leading, this.trailing));
    }
复制代码

重新调用了 subscribe 方法, 只是传入的第一个参数 observerOrNext 是一个ThrottleTimeSubscriber对象, 我们重新回到 subscribe 方法, const sink = toSubscriber(observerOrNext, error, complete); , 查看下:

export function toSubscriber(nextOrObserver, error, complete) {
    if (nextOrObserver) {
        if (nextOrObserver instanceof Subscriber) {
            return nextOrObserver;
        }
        if (nextOrObserver[rxSubscriberSymbol]) {
            return nextOrObserver[rxSubscriberSymbol]();
        }
    }
    if (!nextOrObserver && !error && !complete) {
        return new Subscriber(emptyObserver);
    }
    return new Subscriber(nextOrObserver, error, complete);
}
复制代码

从代码可以分析出,如果第一个参数 nextOrObserver 是一个 Subscriber 类型, 就直接返回这个对象, 而我们的 ThrottleTimeSubscriber 是继承与 Subscriber 对象的,所以 sink 就是一个 ThrottleTimeSubscriber 对象。 我们又重新回到 fromEvent 方法, 其中的subscriber指向的就是一个 ThrottleTimeSubscriber 对象。 subscriber.next(e); 调用的也是 ThrottleTimeSubscriber 对象的 next 方法。

return new Observable(subscriber => {
        function handler(e) {
            if (arguments.length > 1) {
                subscriber.next(Array.prototype.slice.call(arguments));
            }
            else {
                subscriber.next(e);
            }
        }
        setupSubscription(target, eventName, handler, subscriber, options);
    });
复制代码

后面我们来继续分析, 这个 operator 操作符是怎么起作用的。

throttleTime

上面我们已经分析, pipe 方法就是将一个(组)操作符挂载在一个新的 observable 对象的 operator 属性上。现在我们来分析操作符具体是怎么工作的,我们可以先以 throttleTime 操作符来进行分析。

上面我们已经分析了,我们在点击Button 的时候,会调用 handler 方法, 其中 subscriber 已经是一个 ThrottleTimeSubscriber 对象。

return new Observable(subscriber => {
        function handler(e) {
            if (arguments.length > 1) {
                subscriber.next(Array.prototype.slice.call(arguments));
            }
            else {
                subscriber.next(e);
            }
        }
        setupSubscription(target, eventName, handler, subscriber, options);
    });
复制代码

我们下面来分析: ThrottleTimeSubscriber.next(e) 方法, 其代码如下:

_next(value) {
        if (this.throttled) {
            if (this.trailing) {
                this._trailingValue = value;
                this._hasTrailingValue = true;
            }
        }
        else {
            this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })
            this.add(this.throttled);
            if (this.leading) {
                this.destination.next(value);
            }
        }
    }
复制代码

这个方法很重要

  1. 首先加了一个 throttled 标记变量, 用来标记是否已经启动了节流开关,一开始是为 undefinded 的 代码会进入 else 分支,
  2. 然后执行 this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this }) 给throttled赋值,下一次进来的时候, throttled 就有值了。
  3. 执行 this.destination.next(value); , 这个方法,会最终调用 subscribe 订阅方法中传递的方法,也就是
addFromEventObj = addFromEventObj.subscribe(() => {
    nameInput.value = +(nameInput.value) + 1
})
复制代码

总结:

这个方法是实现 throttleTime 节流的关键点, 通过判断 throttled 标记来判断是否要执行 subscribe 中的方法。

下面我们来重点分析: this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this }) .

首先: scheduler 指向的是 esm2015\internal\scheduler\AsyncAction.js 对象, 我们查看下其 schedule 方法:

schedule(state, delay = 0) {
        if (this.closed) {
            return this;
        }
        this.state = state;
        const id = this.id;
        const scheduler = this.scheduler;
        if (id != null) {
            this.id = this.recycleAsyncId(scheduler, id, delay);
        }
        this.pending = true;
        this.delay = delay;
        this.id = this.id || this.requestAsyncId(scheduler, this.id, delay);
        return this;
    }
复制代码

其中最重要的是: this.id = this.id || this.requestAsyncId(scheduler, this.id, delay); , 这里调用了一个 requestAsyncId 方法,其代码如下:

requestAsyncId(scheduler, id, delay = 0) {
        return setInterval(scheduler.flush.bind(scheduler, this), delay);
    }
复制代码

哈哈,这里设置了一个定时器 setInterval , 这就是 throttleTime 生效的一个关键点。

总结:

setInterval 实现的功能是定时去清除 throttled 变量值,从而达到,我们在调用throttleTime(1000*3)这个操作符后,点击一次按钮后,3S内不能再次点击,但是3S后,又可以点击的原理

这个定时器的第一个参数是 scheduler.flush.bind(scheduler, this) , 其对应的是: esm2015/internal/scheduler/AsyncScheduler.js 里的flush 方法,其代码如下:

flush(action) {
        const { actions } = this;
        if (this.active) {
            actions.push(action);
            return;
        }
        let error;
        this.active = true;
        do {
            if (error = action.execute(action.state, action.delay)) {
                break;
            }
        } while (action = actions.shift());
        this.active = false;
        if (error) {
            while (action = actions.shift()) {
                action.unsubscribe();
            }
            throw error;
        }
    }
复制代码

会去遍历所有的actions, 然后去执行 execute 方法, 传入的action 就是对应的 AsyncAction 对象, execute 方法如下:

execute(state, delay) {
        if (this.closed) {
            return new Error('executing a cancelled action');
        }
        this.pending = false;
        const error = this._execute(state, delay);
        if (error) {
            return error;
        }
        else if (this.pending === false && this.id != null) {
            this.id = this.recycleAsyncId(this.scheduler, this.id, null);
        }
    }
复制代码

然后会调用 const error = this._execute(state, delay); , _execute 代码如下:

_execute(state, delay) {
        let errored = false;
        let errorValue = undefined;
        try {
            this.work(state);
        }
        catch (e) {
            errored = true;
            errorValue = !!e && e || new Error(e);
        }
        if (errored) {
            this.unsubscribe();
            return errorValue;
        }
    }
复制代码

其中最重要的是调用了 this.work(state) 方法, workthis.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this }) 传递的第一个参数

function dispatchNext(arg) {
    const { subscriber } = arg;
    subscriber.clearThrottle();
}
复制代码

最终会调用 clearThrottle 方法,其代码如下:

clearThrottle() {
        const throttled = this.throttled;
        if (throttled) {
            if (this.trailing && this._hasTrailingValue) {
                this.destination.next(this._trailingValue);
                this._trailingValue = null;
                this._hasTrailingValue = false;
            }
            throttled.unsubscribe();
            this.remove(throttled);
            this.throttled = null;
        }
    }
复制代码

其中很关键的一个步骤是将throttled 这个标记,设置为null this.throttled = null; , 为什么说很重要呢, 还记得我们上面有分析 ThrottleTimeSubscriber , 这个class 下面的 _next 方法,其代码如下:

_next(value) {
        if (this.throttled) {
            if (this.trailing) {
                this._trailingValue = value;
                this._hasTrailingValue = true;
            }
        }
        else {
            this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })
            this.add(this.throttled);
            if (this.leading) {
                this.destination.next(value);
            }
        }
    }
复制代码

这个方法有判断 throttled 这个标记, 如果不为空,其实相当与什么也不做,只有为空的情况下,才会去执行 this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this }) , 也就是这个方法,会去执行我们 subscribe 订阅里面的方法, 从而达到了,节流的效果。

let addFromEventObj = fromEvent(addBtn, 'click')
addFromEventObj = addFromEventObj.pipe(throttleTime(1000 * 10))
addFromEventObj = addFromEventObj.subscribe(() => {
    nameInput.value = +(nameInput.value) + 1
})
复制代码

总结

上面我们已经简单的分析了Rxjs 的基本使用方式,下面是我们这篇文章涉及的几个基本概念

(Observable)可观察的对象, (subscriber)观察者, (pipe)管道, (throttleTime)操作符 下面我们根据这几个概念来总结下Rxjs基于流来处理数据的整个过程(被观察对象-> 数据处理-> 观察者)

  1. 首先需要创建一个可观察对象, 在 esm2015\internal\observable\ 文件夹下都是用来创建可观察对象的方法, 如上面我们Demo 用到的 fromEvent 就是其中一个,然后返回一个 Observable 对象, Observable 还有一个静态方法 create 可以直接创建一个个 Observable 对象
  2. pipe 就是将操作符挂载在 Observable 对象的 operator 属性上, 如果pipe 传递了多个操作符, 则在 source 属性(也是一个 Observable )对象的 operator , 层层递归,从右到左
addFromEventObj = addFromEventObj.pipe(throttleTime(1000 * 2),mapTo(1), scan((init, next) => init + next, 0))
复制代码

其对应的对象如下图:

rxjs 源码分析1-(fromEvent)
  1. 然后调用subscribe方法, 里面的第一个参数就是观察者,如下的subscriber 方法。
let addFromEventObj = fromEvent(addBtn, 'click')
addFromEventObj = addFromEventObj.pipe(throttleTime(1000 * 2),mapTo(1), scan((init, next) => init + next, 0))
const subscriber = value => {
    nameInput.value = value
}
addFromEventObj = addFromEventObj.subscribe(subscriber)
复制代码

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

About Face 3

About Face 3

Alan Cooper、Robert Reimann、David Cronin / John Wiley & Sons / 2007-5-15 / GBP 28.99

* The return of the authoritative bestseller includes all new content relevant to the popularization of how About Face maintains its relevance to new Web technologies such as AJAX and mobile platforms......一起来看看 《About Face 3》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试