内容简介:我们来看下RxJava自身对其的解释来祭出google翻译,当数据流通过异步步骤时,每个步骤可以以不同的速度执行不同的操作。
我们来看下RxJava自身对其的解释
When the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, a so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.
来祭出google翻译,
当数据流通过异步步骤时,每个步骤可以以不同的速度执行不同的操作。
为了避免压倒这些步骤,这些步骤通常表现为由于临时缓冲或需要跳过/丢弃数据而增加的内存使用量,所以应用所谓的背压,这是流量控制的一种形式,其中步骤可以表达多少
物品准备好了。
这允许在通常无法知道上游将向其发送多少项的步骤的情况下约束数据流的存储器使用。
简而言之, 当 异步
情况下, 上流(被观察者)发送数据过快, 而下流(消费者)无法及时处理数据, 导致缓存内存增大, 最终导致OOM, 这个时候, 背压就是为了处理这种情况.
Observable / Observer
我们都知道 RxJava2
主要是增加了使用背压机制下调用的api, 而原来的 Observable / Observer
组合是不支持背压的, 那么我们先回顾下原来的 Observable / Observer
组合使用的订阅流程
Observable // 新建 ObservableCreate .create(ObservableOnSubscribe<Int> { var i = 0 while (true){ it.onNext(i ++) } }) // 新建 ObservableSubscribeOn .subscribeOn(Schedulers.io()) // 新建 ObservableObserveOn .observeOn(Schedulers.newThread()) // 返回 LambdaObserver .subscribe{ Thread.sleep(5000) Log.d("rxjava", it.toString()) }
Observable / Observer
组合的订阅消费流程 和线程切换原理在去年都曾经写过, 这里就不再赘述, 我们主要看下, 上游的缓冲池(buffer)的做法, 相关代码在 ObserveOnObserver#onNext
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { ... @Override public void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.upstream, d)) { this.upstream = d; ... queue = new SpscLinkedArrayQueue<T>(bufferSize); downstream.onSubscribe(this); } } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } ... } public final class SpscLinkedArrayQueue<T> implements SimplePlainQueue<T> { public SpscLinkedArrayQueue(final int bufferSize) { int p2capacity = Pow2.roundToPowerOfTwo(Math.max(8, bufferSize)); int mask = p2capacity - 1; AtomicReferenceArray<Object> buffer = new AtomicReferenceArray<Object>(p2capacity + 1); producerBuffer = buffer; producerMask = mask; adjustLookAheadStep(p2capacity); consumerBuffer = buffer; consumerMask = mask; producerLookAhead = mask - 1; // we know it's all empty to start with soProducerIndex(0L); } @Override public boolean offer(final T e) { if (null == e) { throw new NullPointerException("Null is not a valid element"); } // local load of field to avoid repeated loads after volatile reads final AtomicReferenceArray<Object> buffer = producerBuffer; final long index = lpProducerIndex(); final int mask = producerMask; final int offset = calcWrappedOffset(index, mask); if (index < producerLookAhead) { return writeToQueue(buffer, e, index, offset); } else { final int lookAheadStep = producerLookAheadStep; // go around the buffer or resize if full (unless we hit max capacity) int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask); if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room return writeToQueue(buffer, e, index, offset); } else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full return writeToQueue(buffer, e, index, offset); } else { resize(buffer, index, offset, e, mask); // add a buffer and link old to new return true; } } } }
省略掉大部分不相关代码, 这里大致做的是, 当被观察者被订阅的时候, 会初始化一个 SpscLinkedArrayQueue
类对象, 他的内部实际维护了一个 AtomicReferenceArray
对象, 这个就是我们的缓冲队列, 他的初始默认容量是128, 当我们调用到 onNext
方法的时候, 会首先判断buffer是否已满, 若未满, 则进行插入; 若已满, 则会进行扩容处理, 在异步情况下, 我们的下游无法消费掉buffer内的缓存, 导致内存占用越来越大, 最终导致OOM
Flowable / Subscriber
好, 快速过完无背压机制下的订阅流程后, 我们来看下RxJava2是怎么通过 Flowable / Subscriber
来处理背压的
订阅响应流程
首先, 我们来跟着基础Demo来看下Flowable的订阅流程
Flowable // 创建 FlowableCreate 对象 .create(FlowableOnSubscribe<Int> { for(i in 0..150){ it.onNext(i) Log.d("send", i.toString()) } }, BackpressureStrategy.LATEST) // 创建 FlowableSubscribeOn 对象 .subscribeOn(Schedulers.io()) // 创建 FlowableObserveOn 对象 .observeOn(AndroidSchedulers.mainThread()) .subscribe (object : FlowableSubscriber<Int> { lateinit var s: Subscription var i = 1 override fun onComplete() { Log.d("rxjava", "complete") } override fun onSubscribe(s: Subscription) { this.s = s s.request(130) } override fun onNext(t: Int?) { Log.e("rxjava receive", "${i++} $t") } override fun onError(t: Throwable?) { t?.printStackTrace() } }) Thread.sleep(5000)
-
create
操作符用来初始化一个FlowableCreate
对象, 并返回public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) { ObjectHelper.requireNonNull(source, "source is null"); ObjectHelper.requireNonNull(mode, "mode is null"); return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode)); }
-
subscribeOn
操作符 将FlowableCreate
进行包装, 以FlowableSubscribeOn
对象返回public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return subscribeOn(scheduler, !(this instanceof FlowableCreate)); } public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn)); }
-
observeOn
操作符 将FlowableSubscribeOn
进行包装, 并以FlowableObserveOn
对象返回public final Flowable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
注意这里传入的 butterSize()
方法, 他的详细代码如下
static final int BUFFER_SIZE; static { BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128)); } public static int bufferSize() { return BUFFER_SIZE; }
在 FlowableObserveOn
类里, prefetch
表示的是初始的缓存池大小, 可以看出, 如果不指定buffer大小, 那么我们默认大小就是 128
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> { ... public FlowableObserveOn( Flowable<T> source, Scheduler scheduler, boolean delayError, int prefetch) { super(source); this.scheduler = scheduler; this.delayError = delayError; // 预约的缓存池初始化大小 this.prefetch = prefetch; } ... }
-
subscribe
操作符, 以当前Demo为例, 我们这里生成了一个LambdaSubscriber
, 而后调用到的是我们上个操作符流程返回的Flowable#subscribe
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) { return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE); }
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe) { ... LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; }
public final void subscribe(FlowableSubscriber<? super T> s) { ObjectHelper.requireNonNull(s, "s is null"); try { Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s); ... subscribeActual(z); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Subscription has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
最终走到 Flowable#subscribeActual(Subscriber<? super T> s)
方法, 它是个抽象方法, 由调用方 FlowableObserveOn
对象实现
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> { ... @Override public void subscribeActual(Subscriber<? super T> s) { Worker worker = scheduler.createWorker(); if (s instanceof ConditionalSubscriber) { source.subscribe(new ObserveOnConditionalSubscriber<T>( (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch)); } else { source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch)); } } ... }
-
这里
source
对象是我们前面被包装的FlowableSubscribeOn
对象, 是不是很熟悉?FlowableSubscribeOn
类继承于Flowable
, 而Flowable#subscribe(FlowableSubscriber<? super T> s)
方法最终走到Flowable#subscribeActual(Subscriber<? super T> s)
,上文已知这是个抽象方法, 最终执行到FlowableSubscribeOn#subscribeActual
, 并将LambdaSubscriber
包装成ObserveOnSubscriber
作为参数传递进去public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> { ... @Override public void subscribeActual(final Subscriber<? super T> s) { Scheduler.Worker w = scheduler.createWorker(); final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests); // s 为 ObserveOnSubscriber 对象 s.onSubscribe(sos); w.schedule(sos); } ... }
- 我们先来看下
s.onSubscribe(sos);
执行代码static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T> implements FlowableSubscriber<T> { ... @Override public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.upstream, s)) { this.upstream = s; if (s instanceof QueueSubscription) { @SuppressWarnings("unchecked") QueueSubscription<T> f = (QueueSubscription<T>) s; int m = f.requestFusion(ANY | BOUNDARY); if (m == SYNC) { sourceMode = SYNC; queue = f; done = true; downstream.onSubscribe(this); return; } else if (m == ASYNC) { sourceMode = ASYNC; queue = f; downstream.onSubscribe(this); s.request(prefetch); return; } } // 初始化缓存队列 // prefetch 默认大小为128 queue = new SpscArrayQueue<T>(prefetch); // 下流订阅执行 downstream.onSubscribe(this); // 上流的subscription request 执行 s.request(prefetch); } } ... }
- 我们先来看下
可以看到, 在这个时候, 就会执行到消费者的 onSubscribe
方法, 然后会执行到 SubscribeOnSubscriber#request
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread> implements FlowableSubscriber<T>, Subscription, Runnable { @Override public void request(final long n) { if (SubscriptionHelper.validate(n)) { Subscription s = this.upstream.get(); if (s != null) { requestUpstream(n, s); } else { BackpressureHelper.add(requested, n); s = this.upstream.get(); if (s != null) { long r = requested.getAndSet(0L); if (r != 0L) { requestUpstream(r, s); } } } } } }
SubscribeOnSubscriber
实现了 Subscription
的接口, request
方法主要用来处理内部缓存最大值
public interface Subscription { /** * No events will be sent by a {@link Publisher} until demand is signaled via this method. * <p> * It can be called however often and whenever needed—but the outstanding cumulative demand must never exceed Long.MAX_VALUE. * An outstanding cumulative demand of Long.MAX_VALUE may be treated by the {@link Publisher} as "effectively unbounded". * <p> * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled. * <p> * A {@link Publisher} can send less than is requested if the stream ends but * then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}. * * @param n the strictly positive number of elements to requests to the upstream {@link Publisher} */ public void request(long n); public void cancel(); }
- 这里, 和原来的线程切换原理相同, 根据
scheduler.createWorker
根据工厂模式的帮助, 创建新的Worker
对象, 它的内部的工作原理都是通过线程池来执行对应的Runnable
, 而我们包装后所得的SubscribeOnSubscriber
对象正是继承于Runnable
, 我们可以看他内部实现的run
方法static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread> implements FlowableSubscriber<T>, Subscription, Runnable { ... SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) { this.downstream = actual; this.worker = worker; this.source = source; this.upstream = new AtomicReference<Subscription>(); this.requested = new AtomicLong(); this.nonScheduledRequests = !requestOn; } @Override public void run() { lazySet(Thread.currentThread()); Publisher<T> src = source; source = null; src.subscribe(this); } ... }
这里的 source
是在 FlowableSubscribeOn
初始化的时候, 通过构造函数传入的, 而我们知道, 他的内部是 FlowableCreate
public FlowableSubscribeOn(Flowable<T> source, Scheduler scheduler, boolean nonScheduledRequests) { super(source); this.scheduler = scheduler; this.nonScheduledRequests = nonScheduledRequests; }
- 根据前面的套路, 我们直接去看
FlowableCreate
下的subscribeActual
实现方法@Override public void subscribeActual(Subscriber<? super T> t) { BaseEmitter<T> emitter; switch (backpressure) { case MISSING: { emitter = new MissingEmitter<T>(t); break; } case ERROR: { emitter = new ErrorAsyncEmitter<T>(t); break; } case DROP: { emitter = new DropAsyncEmitter<T>(t); break; } case LATEST: { emitter = new LatestAsyncEmitter<T>(t); break; } default: { emitter = new BufferAsyncEmitter<T>(t, bufferSize()); break; } } // SubscribeOnSubscriber.onSubscribe t.onSubscribe(emitter); try { // 执行被观察者发送事件 source.subscribe(emitter); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); emitter.onError(ex); } }
可以看到, 根据我们选择的背压策略, 新建不同类型继承于 BaseEmitter
类对象.
- 而这时候, 首先执行的是
SubscribeOnSubscriber#onSubscribe
@Override public void onSubscribe(Subscription s) { if (SubscriptionHelper.setOnce(this.upstream, s)) { // 初始128 long r = requested.getAndSet(0L); if (r != 0L) { requestUpstream(r, s); } } } // 切换线程到正确的目标线程上 // 最终执行 Subscription#request(n) void requestUpstream(final long n, final Subscription s) { if (nonScheduledRequests || Thread.currentThread() == get()) { s.request(n); } else { worker.schedule(new Request(s, n)); } }
这里的 Subscription
是前面 BaseEmitter
对象, 将其内部的缓存容量大小设为默认的128大小.
abstract static class BaseEmitter<T> extends AtomicLong implements FlowableEmitter<T>, Subscription { @Override public final void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(this, n); onRequested(); } } void onRequested() { // default is no-op } }
-
到这里我们的订阅整个流程已经完成, 然后我们通过实现的被观察者的发送事件, 通过
emitter
来循环发送自增整型i
- 我们来看下排除掉背压控制后的响应流程, 关于几个背压策略我们后文细讲, 这里我们需要知道
MissingEmitter
是不会处理背压, 所以我们从他的onNext
看起static final class MissingEmitter<T> extends BaseEmitter<T> { MissingEmitter(Subscriber<? super T> downstream) { super(downstream); } @Override public void onNext(T t) { if (isCancelled()) { return; } if (t != null) { downstream.onNext(t); } else { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } ... } }
直接执行下流的
onNext
方法, 而它的下流是SubscribeOnSubscriber
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread> implements FlowableSubscriber<T>, Subscription, Runnable { @Override public void onNext(T t) { downstream.onNext(t); } }
- 我们来看下排除掉背压控制后的响应流程, 关于几个背压策略我们后文细讲, 这里我们需要知道
-
SubscribeOnSubscriber
的下流是ObserveOnSubscriber
, 他的onNext
方法在父类BaseObserveOnSubscriber
中abstract static class BaseObserveOnSubscriber<T> extends BasicIntQueueSubscription<T> implements FlowableSubscriber<T>, Runnable { @Override public final void onNext(T t) { if (done) { return; } if (sourceMode == ASYNC) { trySchedule(); return; } if (!queue.offer(t)) { upstream.cancel(); error = new MissingBackpressureException("Queue is full?!"); done = true; } trySchedule(); } final void trySchedule() { if (getAndIncrement() != 0) { return; } worker.schedule(this); } @Override public final void run() { if (outputFused) { runBackfused(); } else if (sourceMode == SYNC) { runSync(); } else { runAsync(); } } }
ObserveOnSubscriber
内部维护的 queue
是 SpscArrayQueue
, 当维护的队列满了以后, 直接返回false, 而不会再做扩容操作, 当检测到内部缓存队列已满的时候, 会设置 done
为true, 异常为 MissingBackpressureException
@Override public boolean offer(E e) { if (null == e) { throw new NullPointerException("Null is not a valid element"); } // local load of field to avoid repeated loads after volatile reads final int mask = this.mask; final long index = producerIndex.get(); final int offset = calcElementOffset(index, mask); if (index >= producerLookAhead) { int step = lookAheadStep; if (null == lvElement(calcElementOffset(index + step, mask))) { // LoadLoad producerLookAhead = index + step; } else if (null != lvElement(offset)) { return false; } } soElement(offset, e); // StoreStore soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() return true; }
来看下他的异步处理
@Override void runAsync() { int missed = 1; final Subscriber<? super T> a = downstream; final SimpleQueue<T> q = queue; long e = produced; for (;;) { long r = requested.get(); while (e != r) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); cancelled = true; upstream.cancel(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); e++; if (e == limit) { if (r != Long.MAX_VALUE) { r = requested.addAndGet(-e); } upstream.request(e); e = 0L; } } if (e == r && checkTerminated(done, q.isEmpty(), a)) { return; } int w = get(); if (missed == w) { produced = e; missed = addAndGet(-missed); if (missed == 0) { break; } } else { missed = w; } } }
final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) { if (cancelled) { clear(); return true; } if (d) { if (delayError) { if (empty) { cancelled = true; Throwable e = error; if (e != null) { a.onError(e); } else { a.onComplete(); } worker.dispose(); return true; } } else { Throwable e = error; if (e != null) { cancelled = true; clear(); a.onError(e); worker.dispose(); return true; } else if (empty) { cancelled = true; a.onComplete(); worker.dispose(); return true; } } } return false; }
执行流程基本与 ObservableObserveOn
相同, 这里不再赘述.
可以注意到, 当 done
为true并且 error
不为空的时候, 会调用到下流的 onError
, 最终是抛出 MissingBackpressureException
异常
背压策略
Emitter的类图如下
BaseEmitter
我们首先看下 BaseEmitter
, 它是一个抽象类, 由于继承 AtomicLong
, 所以本身可维护下流缓存阈值, 而他内部维护的 downsteam
表示的是下流的 Subscriber
, 在本文的demo中, 它就是 SubscribeOnSubscriber
对象, 他的内部还实现了 request(long n)
方法, 这个方法主要是设置内部的缓存允许阈值, 详细代码我们再上文有带到.
NoOverflowBaseAsyncEmitter
NoOverflowBaseAsyncEmitter
继承于 BaseEmitter
, 并覆写了onNext, 这里的代码也比较容易理解, 在缓存大小允许范围内发送数据, 则直接发送, 并计数-1, 否则调用 onOverflow
抽象方法, 可以看出 onOverflow
就是出现背压情况时候的实际处理策略
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> { @Override public final void onNext(T t) { if (isCancelled()) { return; } ... if (get() != 0) { downstream.onNext(t); // 阈值-1 BackpressureHelper.produced(this, 1); } else { onOverflow(); } } // 超出阈值情况处理 abstract void onOverflow(); }
ErrorAsyncEmitter
ErrorAsyncEmitter
继承于 NoOverflowBaseAsyncEmitter
, 当出现背压情况的时候, 则直接调用 onError
, 抛出 MissingBackpressureException
异常
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> { ... @Override void onOverflow() { onError(new MissingBackpressureException("create: could not emit value due to lack of requests")); } }
DropAsyncEmitter
DropAsyncEmitter
继承于 NoOverflowBaseAsyncEmitter
, 背压策略不做任务处理, 则当超出缓存允许大小的时候, 对应的发送数据就不会再调用到下流的响应事件, 相当于drop
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> { ... @Override void onOverflow() { // nothing to do } }
LatestAsyncEmitter
它的内部维护了一个 AtomicReference
的 queue
, 可以理解为其自身的缓存池, 而这个池只有一个内存大小, 它在调用到 onNext
方法的时候, 将发送的数据存入 queue
, 并调用 drain
, 我们主要看下 drain
方法
void drain() { // 省略保证线程安全相关代码 ... final Subscriber<? super T> a = downstream; final AtomicReference<T> q = queue; for (;;) { long r = get(); long e = 0L; while (e != r) { if (isCancelled()) { q.lazySet(null); return; } boolean d = done; T o = q.getAndSet(null); boolean empty = o == null; if (d && empty) { Throwable ex = error; if (ex != null) { error(ex); } else { complete(); } return; } if (empty) { break; } a.onNext(o); e++; } if (e == r) { if (isCancelled()) { q.lazySet(null); return; } boolean d = done; boolean empty = q.get() == null; if (d && empty) { Throwable ex = error; if (ex != null) { error(ex); } else { complete(); } return; } } if (e != 0) { BackpressureHelper.produced(this, e); } ... } }
我们可以看到, 它在内部缓存阈值允许范围内, 会遍历读取 queue
的数据, 并将其发送给下流,当然这里 queue
的缓存大小也只有一个, 所以才会出现消费的最后一个数据会是上流发送的最后一个数据
BufferAsyncEmitter
BufferAsyncEmitter
内部维护了一个128大小的 SpscLinkedArrayQueue
缓存队列, SpscLinkedArrayQueue
我们在 ObserveOnObserver
中见过, 他在存入数据的时候, 当达到队列最大长度的时候, 会进行扩容处理, 而通过 BufferAsyncEmitter
每次发送数据的时候, 首先会存入到 queue
中, 然后调用 drain
, 这里的代码与 LatestAsyncEmitter
也很相似, 区别只在于缓冲区的区别, 而当缓存区一再扩容的时候, 有可能出现OOM的现象, 他的逻辑其实与 ObserveOnObserver
内的消费逻辑是一样的
void drain() { ... final Subscriber<? super T> a = downstream; final SpscLinkedArrayQueue<T> q = queue; for (;;) { long r = get(); long e = 0L; while (e != r) { if (isCancelled()) { q.clear(); return; } boolean d = done; T o = q.poll(); boolean empty = o == null; if (d && empty) { Throwable ex = error; if (ex != null) { error(ex); } else { complete(); } return; } if (empty) { break; } a.onNext(o); e++; } if (e == r) { if (isCancelled()) { q.clear(); return; } boolean d = done; boolean empty = q.isEmpty(); if (d && empty) { Throwable ex = error; if (ex != null) { error(ex); } else { complete(); } return; } } if (e != 0) { BackpressureHelper.produced(this, e); } ... } }
背压总结
我们再回头看下, 几种背压策略的效果
策略 | 效果 |
---|---|
MISSING | 无任何背压策略执行, 除非调用 onBackpressureXXX |
ERROR | 抛出异常 |
BUFFER | 内部维护可扩容的缓存池, 效果与 Observer 一样, 可能导致OOM |
DROP | 如果下流无法跟上上流发射速度, 则会丢弃这块数据 |
LATEST | 当下流无法跟上上流的发射速度的时候, 则只保存最近生产的数据 |
以上所述就是小编给大家介绍的《RxJava源码解析(三)-背压》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
嵌入式系统开发之道
2011-12 / 69.00元
《嵌入式系统开发之道:菜鸟成长日志与项目经理的私房菜》用平易朴实的语言,以一个完整的嵌入式系统的开发流程为架构,通过一位“菜鸟”工程师与项目经理的诙谐对话,故事性地带出嵌入式系统概念及开发要素,并点出要成为一名称职的嵌入式系统工程师,在实际工作中所必须具备的各项知识及技能。 《嵌入式系统开发之道:菜鸟成长日志与项目经理的私房菜》可以分为三大部分:第1、3、4、17、18、19章和附录D为嵌入......一起来看看 《嵌入式系统开发之道》 这本书的介绍吧!