Android :RxJava2.0到底更新了什么?(含使用建议)

栏目: Java · 发布时间: 6年前

内容简介:Android :RxJava2.0到底更新了什么?(含使用建议)

前言

  • Rxjava 由于其 基于事件流的链式调用、逻辑简洁 & 使用简单 的特点,深受各大 Android 开发者的欢迎。

Android :RxJava2.0到底更新了什么?(含使用建议)

如果还不了解RxJava,请看文章: Android:这是一篇 清晰 & 易懂的Rxjava 入门教程

  • RxJava 2.0 已于2016 - 10.29正式发布,对 RxJava 1.0 进行了1次重大升级:实际使用的 API 及 方法有很大的区别

    RxJava 2.0 的使用思路 和 RxJava 1.0 非常类似

  • 同时,由于 RxJava 2.0RxJava 1.0 不能共存在1个项目中,所以假如你在使用 RxJava 1.0 需要升级到 RxJava 2.0 ,则需要做一些转变

  • 今天,我将为大家带来 RxJava 2.0 相对于 RxJava 1.0 的升级总结 & 从 RxJava 1.0 升级到 RxJava 2.0 需要注意的坑 ,希望大家会喜欢
  1. 本系列文章主要基于 Rxjava 2.0
  2. 接下来的时间, 我将持续推出 AndroidRxjava 2.0 的一系列文章,包括原理、操作符、应用场景、背压等等 ,有兴趣可以继续关注 Carson_Ho的安卓开发笔记 !!

Android :RxJava2.0到底更新了什么?(含使用建议)

目录

Android :RxJava2.0到底更新了什么?(含使用建议)

1. 依赖包更改

  • 由于 RxJava 2.0RxJava 1.0 不能共存在1个项目中,所以依赖也不能共存,需要进行更换
  • 改动如下
// 原本:`RxJava 1.0` 依赖
compile 'io.reactivex:rxandroid:1.2.0'
compile 'io.reactivex:rxjava:1.1.5'

// 更改:`RxJava 2.0` 依赖
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'

// 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在

2. 增加被观察者的新实现:Flowable

  • 由于 RxJava 1.0 中 的被观察者 Observable 不能很好地支持背压( Backpressure
  • 所以,在 RxJava 2.0 增加了被观察者的新实现 Flowable 来支持背压 Backpressure
    1. 而被观察者的旧实现 Observable 不再支持 背压 Backpressure
    2. Flowable 的使用与 Observable 非常类似,关于使用具体请看文章: Android RxJava 背压策略:图文 + 实例 全面解析

3. 创建被观察者(Observable) & 观察者(Observer) 方式的区别

在`RxJava 2.0 `中,创建被观察者(`Observable`) & 观察者(Observer)的方式也与`RxJava 1.0 `有些区别:

  • 对于创建被观察者( Observable
<-- RxJava 1.0 中 创建被观察者 -->
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

<-- RxJava 2.0 中 创建被观察者 -->
// 变化1:Observable.OnSubscribe接口名改成ObservableOnSubscribe 
Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {

            // 变化2:复写的call(Subscriber)改成 subscribe (ObservableEmitter)    
            // 注:参数也发生了变化,即Subscriber -> ObservableEmitter = 发射器
           @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 可发出三种类型的事件:next事件、complete事件&error事件
                // 通过调用emitter.onNext(T value) 、onComplete()和onError(Throwable e)
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onComplete();
            }
        });
  • 对于创建 观察者( Observer
<-- RxJava 1.0 中 创建观察者(Observer) -->
// 方法1:采用 Observer 接口
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

// 方法2:采用 Subscriber 接口(实现了Observer接口的抽象类)
// 与Observer接口的区别:对 Observer接口进行了扩展:onStart()、unsubscribe(),但使用方式基本类似

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

<-- RxJava 2.0 中 创建观察者(Observer) -->
        Observer<Integer> observer= new Observer<Integer>() {

            // 变化1:增加回调方法onSubscribe()
            // 作用:最先调用该方法,即适合做初始化工作
            @Override
            public void onSubscribe(Disposable d) {
            // 传入的参数Disposable作用 类似于 Subsciption
            // 即相当于订阅关系的开关,即可切断 观察者和被观察者的订阅关系
            // 注:调用dispose() = 观察者无法接收事件,但被观察者还是会继续发送事件
            }

            @Override
            public void onNext(Integer value) {
            }

            @Override
            public void onError(Throwable e) {
            }
            // 变化2:onCompleted()改成 onComplete()
            @Override
            public void onComplete() {
            }
        }

4. 简化订阅方法

  • 对于简化订阅的方式, RxJava 1 主要采用 ActionX 接口 & FuncX 接口
  • RxJava 2 中,主要是对这一系列接口的名字 按照 Java8 的命名规则 进行了修改,而使用方法不变

4.1 ActionX 和 FuncX 改名

  • 对于 ActionX 接口名的更改
RxJava 1 RxJava 2
Action0 Action
Action1 Consumer(接收1个参数)
Action2 BiConsumer (接收2个参数)
ActionN Consumer
<-- 示例1 -->
   Disposable disposable = observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                  //这里接收数据项
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
              //这里接收onError
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
              //这里接收onComplete。
            }
        });

<-- 示例2 -->
flowable.subscribe(
        new Consumer<String>() {//相当于onNext
            @Override
            public void accept(String s) throws Exception {
            }
        }, new Consumer<Throwable>() {//相当于onError
            @Override
            public void accept(Throwable throwable) throws Exception {
            }
        }, new Action() {//相当于onComplete,注意这里是Action
            @Override
            public void run() throws Exception {
            }
        }, new Consumer<Subscription>() {//相当于onSubscribe
            @Override
            public void accept(Subscription subscription) throws Exception {
            }
        });

4.2 RxJava2的接口方法都允许抛出异常

即,接口方法里加上了 throws Exception

// Action接口
public interface Action {
    void run() throws Exception;
}

// Consumer接口
public interface Consumer<T> {
    void accept(T t) throws Exception;
}

// 注:
  // 1. 这意味着,在这些方法里调用会发生异常的方法不需要try-catch
  // 2. RxJava 2.0 不再支持 null 值,如果传入一个null会抛出 NullPointerException

5. 操作符的改变

  • 对于操作符, RxJava 1.0RxJava 2.0 在命名 & 行为上大多数保持了一致
  • 需要强调的是 first()subscribeWith ()和 compose() 操作符

5.1 first()操作符

  • 改动如下
RxJava 1.0 RxJava 2.0
first() 改名为:firstElement()
first(Func1) 弃用,改用为:filter(predicate).first()
firstOrDefault(T) 改名为:first(T)
firstOrDefault(Func1, T) 改名为:first(T)

- 示例

<-- RxJava 1.0 -->
Observable
          .concat(Observable.from(list))
          .first(new Func1<Data, Boolean>() {
                @Override
                public Boolean call(Data data) {
                    return DataUtils.isAvailable(data);
                }
            }).publish();

<-- RxJava 2.0 -->
Observable
          .concat(Observable.fromIterable(list))
          .filter(new Predicate<Data>() {

                @Override
                public boolean test(@NonNull Data data) throws Exception {
                    return DataUtils.isAvailable(data);
                }
            }).firstElement().toObservable().publish();

5.2 subscribeWith()操作符

具体请看下图:

Android :RxJava2.0到底更新了什么?(含使用建议)

5.3 compose()操作符

主要变动在于:

1. RxJava 1.0 实现的是: rx.Observable.Transformer 接口

继承自 Func1<Observable<T>, Observable<R>>

<-- RxJava 1.0 中的用法 -->
private static <T> Observable.Transformer<T, T> createIOSchedulers() {
        return new Observable.Transformer<T, T>() {
            @Override
            public Observable<T> call(Observable<T> tObservable) {
                return tObservable.subscribeOn(Schedulers.io())
                        .unsubscribeOn(AndroidSchedulers.mainThread())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

    public static <T> Observable.Transformer<JsonResult<T>,T> applySchedulers() {
        return createIOSchedulers();
    }

Action1<Integer> onNext = null;
String[] items = { "item1", "item2", "item3" };
Subscription subscription = Observable.from(items)
                                      .compose(RxUtil.<String>applySchedulers())
                                      .map(new Func1<String, Integer>() {
                                                  @Override public Integer call(String s) {
                                                      return Integer.valueOf(s);
                                                  }
                                              })
                                      .subscribe(onNext);
  1. RxJava 2.0 实现的是 io.reactivex.ObservableTansformer<Upstream, Downstream>

一个独立的接口

<-- RxJava 2.0 中的用法 -->
public static <T> ObservableTransformer<T, T> io2MainObservable() {
        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

    public static <T> ObservableTransformer<T, T> applySchedulers() {
        return io2MainObservable();
    }

Consumer<Integer> onNext = null;
String[] items = { "item1", "item2", "item3" };
Disposable disposable = Observable.fromArray(items)
                                  .compose(RxUtil.<String>applySchedulers())
                                  .map(new Function<String, Integer>() {
                                              @Override public Integer apply(String s) throws Exception {
                                                  return Integer.valueOf(s);
                                              }
                                          })
                                  .subscribe(onNext);

6. 额外

6.1 新增Processor

  • 作用类似于 Subject & 继承自 Flowable = 支持背压控制

    Subject 则 不支持背压控制

  • 使用如下

//Processor
    AsyncProcessor<String> processor = AsyncProcessor.create();
    processor.subscribe(o -> Log.d("JG",o)); //three
    processor.onNext("one");
    processor.onNext("two");
    processor.onNext("three");
    processor.onComplete();

//Subject
    AsyncSubject<String> subject = AsyncSubject.create();
    subject.subscribe(o -> Log.d("JG",o));//three
    subject.onNext("one");
    subject.onNext("two");
    subject.onNext("three");
    subject.onComplete();

6.2 更改Single

  • Single 的作用类似于 Observable = 发送数据,但区别在于订阅后只能接受到1次
  • 改动如下
<-- 源码分析 -->
// 变动1:Single被重新设计为 Reactive-Streams架构,即SingleSubscriber 改为:SingleObserver
interface SingleObserver<T> {
    // 变动2:多了一个回调方法 onSubscribe()
    void onSubscribe(Disposable d); 
    void onSuccess(T value);
    void onError(Throwable error);
}

<-- 具体使用 -->
Single<Long> single = Single.just(1l);

single.subscribe(new SingleObserver<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onSuccess(Long value) {
        // 和onNext是一样的
    }

    @Override
    public void onError(Throwable e) {
    }
});

// 注:普通Observable对象可通过toSingle()转换成Single对象
// 即,Observable.just(1).toSingle()

6.3 更改Completable

  • Completable 的作用类似于 Observable = 发送数据,但区别在于订阅后只能接受 CompleteonError 事件
  • 改动如下
// 变动1:Completable被重新设计为 Reactive-Streams架构,即CompletableSubscriber 改为:CompletableObserver
interface CompletableObserver<T> {
    void onSubscribe(Disposable d);
    void onComplete();
    void onError(Throwable error);
}

<-- 具体使用 -->
Completable<Long> Completable = Completable.just(1l);
Completable.subscribe(new CompletableObserver<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onComplete(Long value) {

    }

    @Override
    public void onError(Throwable e) {
    }
});

// 注:普通Observable对象可通过toCompletable()转换成Completable对象
// 即,Observable.just(1).toCompletable()

7. 使用建议

对于学习 & 在项目中使用 RxJava 的版本选择,我给出以下建议:

Android :RxJava2.0到底更新了什么?(含使用建议)

8. 总结

  • 本文主要讲解了 RxJava 2.0 相对于 RxJava 1.0 的变动
  • 从上面可以看到, RxJava 2.0 相对于 RxJava 1.0 最大的改动, 主要是增加了被观察者的新实现: Flowable & 用于背压( Backpressure )的处理

    此处,我建议大家深入了解背压,请看文章: Android RxJava 背压策略:图文 + 实例 全面解析

  • 接下来的时间, 我将持续推出 AndroidRxjava 2.0 的一系列文章,包括原理、操作符、应用场景、背压等等 ,有兴趣可以继续关注 Carson_Ho的安卓开发笔记 !!

Android :RxJava2.0到底更新了什么?(含使用建议)

请点赞 / 评论帮顶!因为你的鼓励是我写作的最大动力!


以上所述就是小编给大家介绍的《Android :RxJava2.0到底更新了什么?(含使用建议)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Making Things See

Making Things See

Greg Borenstein / Make / 2012-2-3 / USD 39.99

Welcome to the Vision Revolution. With Microsoft's Kinect leading the way, you can now use 3D computer vision technology to build digital 3D models of people and objects that you can manipulate with g......一起来看看 《Making Things See》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具