深入理解RxJava中的观察者模式

栏目: Java · 发布时间: 7年前

内容简介:深入理解RxJava中的观察者模式

对于Android开发者来说,RxJava应该是不陌生的,如果读者不了解RxJava是什么,推荐两篇文章给大家

通过这两篇文章相信你能对RxJava有一个初步的了解,这篇文章不打算介绍RxJava的具体使用方式,因为我觉得使用方式在文档中其实写的很清楚,这些属于“术”的层面,站在更高层次去理解RxJava的原理和核心内容才是“道”,只有“道”和“术”相结合才能真正的掌握RxJava,本篇文章介绍RxJava中的观察者模式。

RxJava中观察者模式

首先,我们来对RxJava做一个简单的定义和总结,来说明RxJava是什么,引用官方文档的一句话

Rx = Observables + LINQ + Schedulers

Rx本质上是一种编程模型,这种模型具有以下三个特性:

  • Observables 使用观察者模式来实现
  • LINQ 具有LINQ的特性,最主要其实使用了Lambda表达式
  • Schedulers 线程调度器

而RxJava 本质上就是使用 Java 语言实现了Rx的编程模型,通过RxJava解决了Android开发中的回调地狱,线程切换,最重要的是让业务的可扩展性更强,业务代码书写更规范。 今天我们来聊聊Rx模型的第一个特性- 观察者模式

要想理解RxJava中的观察者模式我们先了解一下RxJava中的几个简单的概念:

  • Observer/Subscriber
  • Observable

Observer:英文翻译为观察者,顾名思义它是一个观察者对象(Observer)或者称为订阅者(Subscriber),A对象订阅了某个事件,A对象观察了某个事件。A就是观察者。

Observable既然有观察者,那必然有被观察者,Observable就是被观察者,或者更专业一点说: 可观察事对象(Observable)

一个观察者(Observer)订阅一个可观察对象(Observable),观察者对Observable发射的数据或数据序列作出响应。 这句话其实也解释了为什么Rx被称为响应式编程

举一个很简单的例子来对RxJava是如何串联这几个对象做到响应式编程的,

假设我们需要订阅A地的天气状况,当A地天气发生变化或者气温下降的时候通知到对应的订阅者。

RxJava2中是这样处理的,

Observable.create(new ObservableOnSubscribe<Weather>() {
            @Override
            public void subscribe(ObservableEmitter<Weather> observableEmitter) throws Exception {
                if (!observableEmitter.isDisposed()) {
                    Weather weather = WeatherHelper.getCityWeather("A");
                    if(weather != null){
                        observableEmitter.onNext(weather);
                    } else {
                        observableEmitter.onError(new Exception("获取A地天气失败"));
                    }
                    observableEmitter.onComplete();
                }
            }
        }).map(new Function<Weather, Weather>() {
            @Override
            public Weather apply(Weather weather) throws Exception {
                // 各种变换
                return weather;
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(new Consumer<Weather>() {
            @Override
            public void accept(Weather weather) throws Exception {
                // 处理成功获取天气后的回调
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                // 处理成功获取天气失败的回调
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                // 结束方法
            }
        });

使用输入输出流表示

深入理解RxJava中的观察者模式

上面代码省略了对数据的变换和组合操作,做最基础的代码分析。 分析上面的过程发现

new ObservableOnSubscribe()

对应的是Observable的可观察对象;

new Consumer()

对应的是观察者对象,Rx在观察者的基础上扩展了2个对象 new Consumer() / new Action() 分别用于处理异常和一次任务完成的回调处理;使用了泛型的方式来处理不同可观察对象的数据传递。

从代码层面来分析他是如何把各个对象组合成观察者模式的: 大概的图示如下:

深入理解RxJava中的观察者模式

为什么图这样的?

  • Observable对象创建了一个可观察对象ObservableOnSubscribe
  • 中间观察对象数据做各种变换处理(这个代码中省略),本质上还是返回了一个ObservableOnSubscri对象
  • subscribeOn和observeOn对线程做简单的调度处理
  • Observable对象subscribe了Observer
  • 观察者主动去拉取可观察者的数据,

观察者主动去拉取可观察者的数据,这一点是从源码角度分析得出的,

@SchedulerSupport("none")
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");

        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            this.subscribeActual(observer);
        } catch (NullPointerException var4) {
            throw var4;
        } catch (Throwable var5) {
            Exceptions.throwIfFatal(var5);
            RxJavaPlugins.onError(var5);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(var5);
            throw npe;
        }
    }

简单来理解就是A().B().C().subscribe(xxx);A().B().C()依然返回了一个 Observable对象,最终执行了subscribe方法来,源码中可以看到

subscribeActual(observer);

方法最终拉取可观察者的数据,也就是执行了ObservableOnSubscribe的subscribe方法。

至此,我们有发现有几个问题无法解释:

  • 为什么是Observable对象订阅了Observer,传统的观察者模式不应该是Observer订阅Observable吗?
  • 为什么是Observer去主动拉取事件信息,而不是可观察对象主动推动呢?是否有违观察者模式的定义?

观察者模式的定义和变种

聊到这里必须理清一下什么是观察者模式,观察者模式真的有我们所理解的这么简单吗?我们先看看维基百科关于观察者模式的简洁

观察者模式是软件 设计模式 的一种。在此种模式中,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实时事件处理系统。

深入理解RxJava中的观察者模式

注意这一句话“ 并且在它本身的状态改变时主动发出通知 ”,这是官方的设计模式UML图,实际上在观察者模式中还存在着另外一种观察模式,那就是拉取模型:

  • 推模型,事件通知源主动发起消息推送,订阅者自动接收数据
  • 拉模型,订阅者主动去拉取事件通知

拉取模型和推送模型唯一的区别在于notify方法的触发是依赖于订阅者主动发起通知,事件源才会把数据推送过来的。在执行notify方法的时候,会主动请求事件源的数据:

@Override
    public void notify(Subject subject) {
        // 主动去事件源里拿数据
        State state = ((ConcreteSubjectA) subject).getState();
        System.out.println(getName() + "观察者状态更新为:" + state);
    }

这就是推模型的实现;在Android中典型如消息推动,如果App正常接受消息,那么消息通知是推动过去的,如果App没有启动,那么App会主动去消息服务端拉取未接收的消息。

理解RxJava中的观察者模式

明白了观察者模式的2种基本模型,我们来理解一下前面的2个问题

  • 为什么是Observable对象订阅了Observer,传统的观察者模式不应该是Observer订阅Observable吗?
  • 为什么是Observer去主动拉取事件信息,而不是可观察对象主动推动呢?是否有违观察者模式的定义?

第一层意义:subscribe方法本质上是一个注册方法,所以这就解释了第一个问题,并不是Observable对象订阅了Observer,而是向Observable注册了一个观察者(Observer); 第二层意义:subscribe方法在注册了一个观察者的同时,向事件源拉进行消息的拉取

@SchedulerSupport("none")
   public final void subscribe(Observer<? super T> observer) {
       ObjectHelper.requireNonNull(observer, "observer is null");

       try {
           observer = RxJavaPlugins.onSubscribe(this, observer);
           ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
           this.subscribeActual(observer);
       } catch (NullPointerException var4) {
           throw var4;
       } catch (Throwable var5) {
           Exceptions.throwIfFatal(var5);
           RxJavaPlugins.onError(var5);
           NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
           npe.initCause(var5);
           throw npe;
       }
   }

subscribeActual就是向事件源拉取消息,类似于前面提到的getState()方法。只不过这个方法不直接返回值,而是通过Consumer回调的方式返回。

至此,整个过程就非常清晰了:

创建一个事件源(creat方法)-》对数据源做变换(Map..)-》通过subscribe方法注册一个观察者,并执行拉取数据操作-》线程变换-》通过回调返回数据结果

总结

RxJava中对观察者模式的理解是最关键的,理解了观察者模式后,才能对RxJava的整个设计有本质的了解,知其然更要知其所以然,subscribe方法的两层含义需要结合源码仔细体会,然后在实践中不断的加深理解,才能做到融会贯通。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Understanding Machine Learning

Understanding Machine Learning

Shai Shalev-Shwartz、Shai Ben-David / Cambridge University Press / 2014 / USD 48.51

Machine learning is one of the fastest growing areas of computer science, with far-reaching applications. The aim of this textbook is to introduce machine learning, and the algorithmic paradigms it of......一起来看看 《Understanding Machine Learning》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具