内容简介:Android :RxJava2.0到底更新了什么?(含使用建议)
前言
-
Rxjava
由于其 基于事件流的链式调用、逻辑简洁 & 使用简单 的特点,深受各大Android
开发者的欢迎。
如果还不了解RxJava,请看文章: Android:这是一篇 清晰 & 易懂的Rxjava 入门教程
-
RxJava 2.0
已于2016 - 10.29正式发布,对RxJava 1.0
进行了1次重大升级:实际使用的API
及 方法有很大的区别但
RxJava 2.0
的使用思路 和RxJava 1.0
非常类似 -
同时,由于
RxJava 2.0
跟RxJava 1.0
不能共存在1个项目中,所以假如你在使用RxJava 1.0
需要升级到RxJava 2.0
,则需要做一些转变 - 今天,我将为大家带来
RxJava 2.0
相对于RxJava 1.0
的升级总结 & 从RxJava 1.0
升级到RxJava 2.0
需要注意的坑 ,希望大家会喜欢
- 本系列文章主要基于
Rxjava 2.0
- 接下来的时间, 我将持续推出
Android
中Rxjava 2.0
的一系列文章,包括原理、操作符、应用场景、背压等等 ,有兴趣可以继续关注 Carson_Ho的安卓开发笔记 !!
目录
1. 依赖包更改
- 由于
RxJava 2.0
跟RxJava 1.0
不能共存在1个项目中,所以依赖也不能共存,需要进行更换 - 改动如下
// 原本:`RxJava 1.0` 依赖 compile 'io.reactivex:rxandroid:1.2.0' compile 'io.reactivex:rxjava:1.1.5' // 更改:`RxJava 2.0` 依赖 compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxjava:2.0.7' // 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在
2. 增加被观察者的新实现:Flowable
- 由于
RxJava 1.0
中 的被观察者Observable
不能很好地支持背压(Backpressure
) - 所以,在
RxJava 2.0
中 增加了被观察者的新实现Flowable
来支持背压Backpressure
- 而被观察者的旧实现
Observable
不再支持 背压Backpressure
-
Flowable
的使用与Observable
非常类似,关于使用具体请看文章: Android RxJava 背压策略:图文 + 实例 全面解析
- 而被观察者的旧实现
3. 创建被观察者(Observable) & 观察者(Observer) 方式的区别
在`RxJava 2.0 `中,创建被观察者(`Observable`) & 观察者(Observer)的方式也与`RxJava 1.0 `有些区别:
- 对于创建被观察者(
Observable
)
<-- RxJava 1.0 中 创建被观察者 --> Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onNext("Aloha"); subscriber.onCompleted(); } }); <-- RxJava 2.0 中 创建被观察者 --> // 变化1:Observable.OnSubscribe接口名改成ObservableOnSubscribe Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() { // 变化2:复写的call(Subscriber)改成 subscribe (ObservableEmitter) // 注:参数也发生了变化,即Subscriber -> ObservableEmitter = 发射器 @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { // 可发出三种类型的事件:next事件、complete事件&error事件 // 通过调用emitter.onNext(T value) 、onComplete()和onError(Throwable e) e.onNext(1); e.onNext(2); e.onError(new Exception("发生错误了")); e.onComplete(); } });
- 对于创建 观察者(
Observer
)
<-- RxJava 1.0 中 创建观察者(Observer) --> // 方法1:采用 Observer 接口 Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } }; // 方法2:采用 Subscriber 接口(实现了Observer接口的抽象类) // 与Observer接口的区别:对 Observer接口进行了扩展:onStart()、unsubscribe(),但使用方式基本类似 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } }; <-- RxJava 2.0 中 创建观察者(Observer) --> Observer<Integer> observer= new Observer<Integer>() { // 变化1:增加回调方法onSubscribe() // 作用:最先调用该方法,即适合做初始化工作 @Override public void onSubscribe(Disposable d) { // 传入的参数Disposable作用 类似于 Subsciption // 即相当于订阅关系的开关,即可切断 观察者和被观察者的订阅关系 // 注:调用dispose() = 观察者无法接收事件,但被观察者还是会继续发送事件 } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } // 变化2:onCompleted()改成 onComplete() @Override public void onComplete() { } }
4. 简化订阅方法
- 对于简化订阅的方式,
RxJava 1
主要采用ActionX
接口 &FuncX
接口 - 在
RxJava 2
中,主要是对这一系列接口的名字 按照Java8
的命名规则 进行了修改,而使用方法不变
4.1 ActionX 和 FuncX 改名
- 对于
ActionX
接口名的更改
RxJava 1 | RxJava 2 |
---|---|
Action0 | Action |
Action1 | Consumer(接收1个参数) |
Action2 | BiConsumer (接收2个参数) |
ActionN | Consumer |
<-- 示例1 --> Disposable disposable = observable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //这里接收数据项 } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //这里接收onError } }, new Action() { @Override public void run() throws Exception { //这里接收onComplete。 } }); <-- 示例2 --> flowable.subscribe( new Consumer<String>() {//相当于onNext @Override public void accept(String s) throws Exception { } }, new Consumer<Throwable>() {//相当于onError @Override public void accept(Throwable throwable) throws Exception { } }, new Action() {//相当于onComplete,注意这里是Action @Override public void run() throws Exception { } }, new Consumer<Subscription>() {//相当于onSubscribe @Override public void accept(Subscription subscription) throws Exception { } });
4.2 RxJava2的接口方法都允许抛出异常
即,接口方法里加上了 throws Exception
// Action接口 public interface Action { void run() throws Exception; } // Consumer接口 public interface Consumer<T> { void accept(T t) throws Exception; } // 注: // 1. 这意味着,在这些方法里调用会发生异常的方法不需要try-catch // 2. RxJava 2.0 不再支持 null 值,如果传入一个null会抛出 NullPointerException
5. 操作符的改变
- 对于操作符,
RxJava 1.0
与RxJava 2.0
在命名 & 行为上大多数保持了一致 - 需要强调的是
first()
、subscribeWith
()和compose()
操作符
5.1 first()操作符
- 改动如下
RxJava 1.0 | RxJava 2.0 |
---|---|
first() | 改名为:firstElement() |
first(Func1) | 弃用,改用为:filter(predicate).first() |
firstOrDefault(T) | 改名为:first(T) |
firstOrDefault(Func1, T) | 改名为:first(T) |
- 示例
<-- RxJava 1.0 --> Observable .concat(Observable.from(list)) .first(new Func1<Data, Boolean>() { @Override public Boolean call(Data data) { return DataUtils.isAvailable(data); } }).publish(); <-- RxJava 2.0 --> Observable .concat(Observable.fromIterable(list)) .filter(new Predicate<Data>() { @Override public boolean test(@NonNull Data data) throws Exception { return DataUtils.isAvailable(data); } }).firstElement().toObservable().publish();
5.2 subscribeWith()操作符
具体请看下图:
5.3 compose()操作符
主要变动在于:
1. RxJava 1.0
实现的是: rx.Observable.Transformer
接口
继承自 Func1<Observable<T>, Observable<R>>
<-- RxJava 1.0 中的用法 --> private static <T> Observable.Transformer<T, T> createIOSchedulers() { return new Observable.Transformer<T, T>() { @Override public Observable<T> call(Observable<T> tObservable) { return tObservable.subscribeOn(Schedulers.io()) .unsubscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()); } }; } public static <T> Observable.Transformer<JsonResult<T>,T> applySchedulers() { return createIOSchedulers(); } Action1<Integer> onNext = null; String[] items = { "item1", "item2", "item3" }; Subscription subscription = Observable.from(items) .compose(RxUtil.<String>applySchedulers()) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return Integer.valueOf(s); } }) .subscribe(onNext);
-
RxJava 2.0
实现的是io.reactivex.ObservableTansformer<Upstream, Downstream>
一个独立的接口
<-- RxJava 2.0 中的用法 --> public static <T> ObservableTransformer<T, T> io2MainObservable() { return new ObservableTransformer<T, T>() { @Override public ObservableSource<T> apply(Observable<T> upstream) { return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); } }; } public static <T> ObservableTransformer<T, T> applySchedulers() { return io2MainObservable(); } Consumer<Integer> onNext = null; String[] items = { "item1", "item2", "item3" }; Disposable disposable = Observable.fromArray(items) .compose(RxUtil.<String>applySchedulers()) .map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return Integer.valueOf(s); } }) .subscribe(onNext);
6. 额外
6.1 新增Processor
-
作用类似于
Subject
& 继承自Flowable
= 支持背压控制而
Subject
则 不支持背压控制 -
使用如下
//Processor AsyncProcessor<String> processor = AsyncProcessor.create(); processor.subscribe(o -> Log.d("JG",o)); //three processor.onNext("one"); processor.onNext("two"); processor.onNext("three"); processor.onComplete(); //Subject AsyncSubject<String> subject = AsyncSubject.create(); subject.subscribe(o -> Log.d("JG",o));//three subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onComplete();
6.2 更改Single
-
Single
的作用类似于Observable
= 发送数据,但区别在于订阅后只能接受到1次 - 改动如下
<-- 源码分析 --> // 变动1:Single被重新设计为 Reactive-Streams架构,即SingleSubscriber 改为:SingleObserver interface SingleObserver<T> { // 变动2:多了一个回调方法 onSubscribe() void onSubscribe(Disposable d); void onSuccess(T value); void onError(Throwable error); } <-- 具体使用 --> Single<Long> single = Single.just(1l); single.subscribe(new SingleObserver<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onSuccess(Long value) { // 和onNext是一样的 } @Override public void onError(Throwable e) { } }); // 注:普通Observable对象可通过toSingle()转换成Single对象 // 即,Observable.just(1).toSingle()
6.3 更改Completable
-
Completable
的作用类似于Observable
= 发送数据,但区别在于订阅后只能接受Complete
和onError
事件 - 改动如下
// 变动1:Completable被重新设计为 Reactive-Streams架构,即CompletableSubscriber 改为:CompletableObserver interface CompletableObserver<T> { void onSubscribe(Disposable d); void onComplete(); void onError(Throwable error); } <-- 具体使用 --> Completable<Long> Completable = Completable.just(1l); Completable.subscribe(new CompletableObserver<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onComplete(Long value) { } @Override public void onError(Throwable e) { } }); // 注:普通Observable对象可通过toCompletable()转换成Completable对象 // 即,Observable.just(1).toCompletable()
7. 使用建议
对于学习 & 在项目中使用 RxJava
的版本选择,我给出以下建议:
8. 总结
- 本文主要讲解了
RxJava 2.0
相对于RxJava 1.0
的变动 -
从上面可以看到,
RxJava 2.0
相对于RxJava 1.0
最大的改动, 主要是增加了被观察者的新实现:Flowable
& 用于背压(Backpressure
)的处理此处,我建议大家深入了解背压,请看文章: Android RxJava 背压策略:图文 + 实例 全面解析
-
接下来的时间, 我将持续推出
Android
中Rxjava 2.0
的一系列文章,包括原理、操作符、应用场景、背压等等 ,有兴趣可以继续关注 Carson_Ho的安卓开发笔记 !!
请点赞 / 评论帮顶!因为你的鼓励是我写作的最大动力!
以上所述就是小编给大家介绍的《Android :RxJava2.0到底更新了什么?(含使用建议)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Making Things See
Greg Borenstein / Make / 2012-2-3 / USD 39.99
Welcome to the Vision Revolution. With Microsoft's Kinect leading the way, you can now use 3D computer vision technology to build digital 3D models of people and objects that you can manipulate with g......一起来看看 《Making Things See》 这本书的介绍吧!