EventBus源码分析

栏目: IOS · Android · 发布时间: 4年前

内容简介:首先从订阅开始

首先从订阅开始

EventBus.getDefault().register(this);
复制代码

register 方法会获取传入的object对象的class,通过 findSubscriberMethods 方法来查找这个class中订阅的方法,如下

public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
复制代码

findSubscriberMethods 方法实现如下,其中有一个 ConcurrentHashMap 类型的静态对象 METHOD_CACHE ,是用来缓存对应类的订阅方法的,以便后续再次订阅时不用重新去findMethods,可以直接从缓存中读取。

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
复制代码

查找订阅方法通过 ignoreGeneratedIndex 字段分为两种方式

第一种 findUsingReflection 是通过反射来查找,找到被 @Subscribe 注解修饰的方法,并且根据具体的注解以及方法参数生成一个 SubscriberMethod 对象:

findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
复制代码

第二种 findUsingInfo 是通过apt的方式,提前找到订阅的方法,可以避免通过反射查找方法带来的耗时。

具体使用方法:在gradle配置apt,rebuild项目,会生成一个注解方法索引类,在 EventBusBuilder 中通过 addIndex 方法新建一个该类的对象传入即可。

这边还有一个问题,对于子类重写父类的订阅方法如何处理。在上面的两种方式中在查找完子类的订阅方法后都会继续去查找父类的订阅方法,都通过一个叫做 checkAdd 的方法进行支撑,该方法返回 true 表示可以添加到订阅方法的集合中去。

boolean checkAdd(Method method, Class<?> eventType) {
    // 2 level check: 1st level with event type only (fast), 2ndlevelwith complete signature when required.
    // Usually a subscriber doesn't have methods listening to thesameevent type.
    Object existing = anyMethodByEventType.put(eventType, method);
    if (existing == null) {
        return true;
    } else {
        if (existing instanceof Method) {
            if (!checkAddWithMethodSignature((Method)existing,eventType)) {
                // Paranoia check
                throw new IllegalStateException();
            }
            // Put any non-Method object to "consume" theexistingMethod
            anyMethodByEventType.put(eventType, this);
        }
        return checkAddWithMethodSignature(method, eventType);
    }
}
复制代码

checkAdd 中设置了两种检查方式,第一种是通过 eventType 也就是订阅方法的入参来检查,这种方式比较快,只需要看下之前有没有这种入参的方法就可以了。注释中也指出了通常一个类不会有多个入参相同的订阅方法。

第二种是通过 checkAddWithMethodSignature 方法来检查,源码如下:

private boolean checkAddWithMethodSignature(Method method,Class<?>eventType) {
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(method.getName());
    methodKeyBuilder.append('>').append(eventType.getName());
    String methodKey = methodKeyBuilder.toString();
    Class<?> methodClass = method.getDeclaringClass();
    Class<?> methodClassOld=subscriberClassByMethodKey.put(methodKey, methodClass);
    if (methodClassOld == null||methodClassOld.isAssignableFrom(methodClass)) {
        // Only add if not already found in a sub class
        return true;
    } else {
        // Revert the put, old class is further down theclasshierarchy
        subscriberClassByMethodKey.put(methodKey, methodClassOld);
        return false;
    }
}
复制代码

通过 method 以及 eventType 来生成一个key,来存储方法所在的类。其中 methodClassOld == null ||methodClassOld.isAssignableFrom(methodClass) 这个判断条件对应着两种情况, methodClassOld == null 说明是入参相同但是方法名不同的方法正在被添加,直接返回 true 就可以了 methodClassOld.isAssignableFrom(methodClass) 这个条件是为了过滤掉父类被子类重写的方法,前面说过了查找订阅方法是从子类开始遍历的,此时如果子类重写了父类的订阅方法,那么 methodClassOld 对应的是子类, methodClass 对应的是父类,显然这个判断就会为false,之后进入下面的else分支 return false ,也就是忽略掉父类被子类重写的方法。

查找订阅方法基本就这么点,查找完毕之后需要执行订阅操作,也就是 register 方法中的 subscribe(subscriber, subscriberMethod); 操作,直接看下该方法的实现:

private void subscribe(Object subscriber, SubscriberMethodsubscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber,subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions =subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " +subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority >subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    List<Class<?>> subscribedEvents =typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventTypehave to be considered.
            // Note: Iterating over all events may be inefficientwith lots of sticky events,
            // thus data structure should be changed to allow a moreefficient lookup
            // (e.g. an additional map storing sub classes of superclasses: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries =stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                checkPostStickyEventToSubscription(newSubscriptin, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription,stickyEvent);
        }
    }
}
复制代码

subscriptionsByEventType 这个Map是将 eventType 作为key保存其所对应的订阅方法的集合。该方法将刚查找到的方法添加到对应的集合中去,添加时有这样一层判断 i == size || subscriberMethod.priority >subscriptions.get(i).subscriberMethod.priority 这表示这个集合里的方法会按照所设定的优先级进行排序。紧接着又出现了个Map typesBySubscriber 将订阅者作为key保存一个 Class 的集合,暂时看不出有啥用,就先不管,最后再检查下是不是粘性事件,如果是粘性事件就根据所保存的粘性事件来执行该方法。 eventInheritance 也是在bulider中设置的,如果为 true 则会考虑事件的继承性,如果现在有 eventType 为正在订阅的方法的 eventType 的子类的粘性事件存在,那么这个粘性事件也会被正在订阅的方法接收到,直接说可能比较绕,举个栗子,现在我有两个事件,其中一个是另一个的子类,并且有两个粘性订阅方法,如下:

class EventMessage {
  
    }

    class SubEventMessage extends EventMessage {

    }
    
    @Subscribe(sticky =  true)
    public void onEvent(EventMessage message) {
        // do something
    }

    @Subscribe(sticky =  true)
    public void onSubEvent(SubEventMessage message) {
        // do something
    }
复制代码

当执行 register 时,如果内存中存在着一个类型为 SubEventMessage 的事件,那么订阅的时候 onEvent 方法会被执行,入参是内存中类型为 SubEventMessage 的事件。

现在 register 大致就分析完了,再来看下 unregister 方法:

public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }
复制代码

unregister 方法十分简单, typesBySubscriber 是刚才在进行订阅的时候不知道用来干什么的Map,现在知道是在取消订阅时用到的,这个Map将订阅者作为key,将其所有的订阅方法的 eventType 存入到对应的List中去,取消订阅时将这个List取出来,遍历去移除对应的订阅方法,具体实现在 unsubscribeByEventType 中,也十分简单,就不赘述了。

订阅和取消订阅都看过了,还差个发送事件,发送事件分为 postpostSticky 两种,先看 post

public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }
复制代码

currentPostingThreadState 是个 ThreadLocal ,然后从中取出当前线程的 postingState ,也就是说每个线程都会维护一个自己的 posting 状态,之后会有个循环将事件队列清空,通过 postSingleEvent 方法来进一步处理:

private void postSingleEvent(Object event, PostingThreadStatepostingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event,postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event,postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered forevent " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass !=NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}
复制代码

同样是通过 eventInheritance 来判断是否要涉及 eventType 的父类,之后再通过 postSingleEventForEventType 方法的返回值来得到该事件是否被处理,如果没有被处理,那么会返回 false 进入下一个分支, logNoSubscriberMessagessendNoSubscriberEvents 都是在 builder 中传入的,前者用于没有订阅者处理事件时打印日志,后者用于没有订阅者处理事件时发送一个 NoSubscriberEvent 类型的事件,所以具体是怎么处理事件的还要继续看 postSingleEventForEventType 方法:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }
复制代码

postSingleEventForEventType 方法从 subscriptionsByEventType 中去获取对应事件类型的所有订阅者,如果没有订阅者就返回 false 表示事件没有被处理,否则就遍历所有的订阅者,通过 postToSubscription 方法来处理事件,接着往里看:

private void postToSubscription(Subscription subscription, Objectevent, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster notdecoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " +subscription.subscriberMethod.threadMode);
    }
}
复制代码

在这个方法内终于看到通过区分注解中的 threadMode 来区分不同的处理方式了,先来看下这几种 threadMode 分别代表什么意思:

Mode 含义
POSTING 在当前线程执行
MAIN 在主线程执行
MAIN_ORDERED 在主线程有序执行
BACKGROUND 在后台线程执行
ASYNC 在新的线程执行

可以看到有几个差不多,那具体有什么区别呢?直接从代码里看,先说明几个东西, invokeSubscriber 就是直接调用订阅方法,还有几个后缀为 poster 的变量暂时先理解为调用了 enqueue 方法后,订阅方法就会在某个时间被执行,后面再详细讲。

现在可以看代码了, POSTING 没什么好说的,直接调用 invokeSubscriber ,也就是说在调用 eventBus.post 的线程执行。

MAINMAIN_ORDERED 都是在主线程执行,后者的 ORDERED 体现在什么地方呢,先看下 MAIN 的分支,其中通过 mainThreadPoster.enqueue 插入的事件会在主线程执行,判断当前线程是否是主线程来决定直接调用订阅方法还是通过 mainThreadPoster 来发布,这里应该没什么疑惑的,主要是 MAIN_ORDERED

if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster notdecoupled from subscriber
                invokeSubscriber(subscription, event);
            }
复制代码

mainThreadPoster 不为空时,通过 mainThreadPoster 来发布事件,为空时直接调用订阅方法,说好的在主线程调用呢?这里注释也说明了是不正确的,实际上 mainThreadPoster 为空本身就是种异常情况,具体可以看下它的初始化过程,这里就不细说了。所以下面的 else 分支就先不管了,那么为什么说通过 mainThreadPoster 发布的事件就是“有序”的呢,实际上 mainThreadPoster 内部实现是个 handler ,可以将事件 post 到主线程中去执行,所以说是有序的,这里简单说明下原因:

主线程维护着一个消息队列,循环从里面取出消息来处理,我们知道可以通过 viewpost 方法来获取它绘制完成之后的宽高,原因是 post 方法里的事件会被插入到消息队列的尾部,而 viewmeasure , layout , draw 都在新插入的消息的前面,所以当 post 的方法执行时, view 肯定已经绘制好了。

handler 通过 sendMessage 发送的消息也会被插入到主线程消息队列的尾部,这就是“有序”,比如现在有一个 ImageView ,在它的 onMeasure 中去发布一个事件,如果订阅方法的模式是 MAIN 那么会在 onMeasure 中调用订阅方法,而如果模式是 MAIN_ORDERED 那么会在 ImageView 绘制完成后调用订阅方法。

再来看下 BACKGROUNDASYNC 的区别:

case BACKGROUND:
    if (isMainThread) {
        backgroundPoster.enqueue(subscription, event);
    } else {
        invokeSubscriber(subscription, event);
    }
    break;
case ASYNC:
    asyncPoster.enqueue(subscription, event);
    break;
复制代码

其中 backgroundPosterasyncPoster 都会开启一个新线程来执行订阅方法,暂时当成是一样的就行,那么区别就是 BACKGROUND 模式如果在子线程 post 一个事件,那么会直接在该线程调用订阅方法,只有在主线程 post 事件才会开启一个新线程。而 ASYNC 模式,不管是在哪 post 事件,都会开启一个新线程来调用订阅方法。

最后再看下几个 poster 基本上就看完了,几个 poster 都实现了同一个接口 Poster

interface Poster {

    /**
     * Enqueue an event to be posted for a particular subscription.
     *
     * @param subscription Subscription which will receive the event.
     * @param event        Event that will be posted to subscribers.
     */
    void enqueue(Subscription subscription, Object event);
}
复制代码

可以看到里面只有一个需要实现的方法 enqueue ,是用来插入事件的,这个接口被三个类实现,分别是 HandlerPosterBackgroundPosterAsyncPoster ,上面的 mainThreadPoster 对应的就是 HandlerPoster ,这三个类中都有个类型为 PendingPostQueue 的成员变量,这是个事件队列,具体实现就不看了,这个队列提供了入队和出队的方法。

先看下 HandlerPosterenqueue 方法:

public class HandlerPoster extends Handler implements Poster {
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }
}
复制代码

HandlerPoster 继承了 Handler ,调用 enqueue 方法后会向事件队列中插入一个事件,然后将标记位 handlerActive 设置为 true 表示正在处理事件,然后调用 sendMessage 发送消息通知处理事件。 PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); 这行是用来获取一个消息队列的 Node 用来插入到队列中去, EventBus 维护着一个 pool 用来保存闲置的 Node 当有需要时从中取出一个给事件使用, pool 不够用时才会 new 新的 Node 出来,具体可以看下 PendingPost ,这样做的好处是可以避免频繁创建对象带来的开销。

再看下 HandlerPosterhandleMessage 方法:

public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
复制代码

首先会记录下开始处理事件的时间,然后从事件队列中取出事件,如果为空就将 handlerActive 设置为 false 直接 return 了,如果不为空,就调用 eventBus.invokeSubscriber(pendingPost); 来调用订阅方法,执行完后,再看下时间,如果超出了规定的时间那么重新发送一条消息,本次消息处理结束,等下次轮到自己的时候再处理事件,毕竟不能一直处理队列里的事件而阻塞了主线程,如果没有超出规定事件,那么说明还可以有事件可以处理下一个事件,就会再次进入循环。

BackgroundPosterAsyncPoster 其实和 HandlerPoster 差不多,只是没有用 Handler 而是用了线程池去处理事件,具体就不看了。

对了,还有个发送粘性事件:

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriberwants to remove immediately
    post(event);
}
复制代码

就是在 stickyEvents 这个 map 里存一下。

好了,完了。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

大数据时代小数据分析

大数据时代小数据分析

屈泽中 / 电子工业出版社 / 2015-7-1 / 69.00元

《大数据时代小数据分析》是一本大数据时代下进行小数据分析的入门级教材,通过数据分析的知识点,将各类分析工具进行串联和对比,例如:在进行线性规划的时候可以选择使用Excel或LINGO或Crystal Ball。工具的应用难易结合,让读者循序渐进地学习相关工具。JMP和Mintab用来分析数据,分析的结果使用Excel、LINGO、Crystal Ball来建立数据模型,最后使用Xcelsius来动......一起来看看 《大数据时代小数据分析》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具