我是靠谱客的博主 粗犷心情,最近开发中收集的这篇文章主要介绍Rxjava 背压笔记,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

RxJava是一个观察者模式的架构,当这个架构中被观察者(Observable)和观察者(Subscriber)处在不同的线程环境中时,由于者各自的工作量不一样,导致它们产生事件和处理事件的速度不一样,这就会出现两种情况:

  • 被观察者产生事件慢一些,观察者处理事件很快。那么观察者就会等着被观察者发送事件。
  • 被观察者产生事件的速度很快,而观察者处理很慢。那就出问题了,如果不作处理的话,事件会堆积起来,最终挤爆你的内存,导致程序崩溃。

下面我们用代码演示一下这种崩溃的场景:

//被观察者在主线程中,每1ms发送一个事件
Observable.interval(1, TimeUnit.MILLISECONDS)
//.subscribeOn(Schedulers.newThread())
//将观察者的工作放在新线程环境中
.observeOn(Schedulers.newThread())
//观察者处理每1000ms才处理一个事件
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("TAG","---->"+aLong);
}
});

在上面的代码中,被观察者发送事件的速度是观察者处理速度的1000倍

这段代码运行之后:


...
Caused by: rx.exceptions.MissingBackpressureException
...
...

首先什么样的情况下会抛出MissingBackpressureException呢?在Rxjava 1.x中的observeOn,因为切换了消费者的线程,因此内部实现了用队列存储事件.在Android中默认的buffersize大小是16,因此当消费者比生产者慢时,队列中的数目积累到超过16个,就会抛出MissingBackpressureException,那应该如何去解决这个问题呢,这时就需要进行Flow Control(流控)了.

Flow Control(详解)有哪些方式呢?大概是有四种:

  • (1) 背压(Backpressure)。
  • (2) 节流(Throttling)。
  • (3) 打包处理。
  • (4) 调用栈阻塞(Callstack blocking)。

背压(Backpressure)

Backpressure,也称为Reactive Pull,就是下游需要多少(具体是通过下游的request请求指定需要多少),上游就发送多少。这有点类似于TCP里的流量控制,接收方根据自己的接收窗口的情况来控制接收速率,并通过反向的ACK包来控制发送方的发送速率。

这种方案只对于所谓的cold Observable有效。而hot Observable是无效的

节流(Throttling)

节流(Throttling),说白了就是丢弃。消费不过来,就处理其中一部分,剩下的丢弃。还是举音视频直播的例子,在下游处理不过来的时候,就需要丢弃数据包。

而至于处理哪些和丢弃哪些数据,就有不同的策略。主要有三种策略:

  • sample (也叫throttleLast):在一段时间内,只处理最后一个数据
  • throttleFirst:在一段时间内,只处理第一个数据
  • debounce (也叫throttleWithTimeout):发送一个数据,开始计时,到了规定时间内,若没有再发送数据,则开始处理数据,反之重新开始计时

打包处理

打包就是把上游来的小包裹打成大包裹,分发到下游。这样下游需要处理的包裹的个数就减少了。RxJava中提供了两类这样的机制:buffer和window。

buffer和window的功能基本一样,只是输出格式不太一样:buffer打包后的包裹用一个List表示,而window打包后的包裹又是一个Observable

调用栈阻塞(Callstack blocking)

这是一种特殊情况,阻塞住整个调用栈(Callstack blocking)。之所以说这是一种特殊情况,是因为这种方式只适用于整个调用链都在一个线程上同步执行的情况,这要求中间的各个operator都不能启动新的线程。在平常使用中这种应该是比较少见的,因为我们经常使用subscribeOn或observeOn来切换执行线程,而且有些复杂的operator本身也会在内部启动新的线程来处理。另外,如果真的出现了完全同步的调用链,前面的另外三种Flow Control思路仍然可能是适用的,只不过这种阻塞的方式更简单,不需要额外的支持。

这里举个例子把调用栈阻塞和前面的Backpressure比较一下。“调用栈阻塞”相当于很多车行驶在盘山公路上,而公路只有一条车道。那么排在最前面的第一辆车就挡住了整条路,后面的车也只能排在后面。而“Backpressure”相当于银行办业务时的窗口叫号,窗口主动叫某个号过去(相当于请求),那个人才过去办理

下面我们详细解释下背压:

需要强调两点:

  • 背压策略的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
  • 背压(Backpressure)并不是一个像flatMap一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。

那么我们再回看上面的程序异常就很好理解了,就是当被观察者发送事件速度过快的情况下,我们没有做流速控制,导致了异常。

那么背压(Backpressure)策略具体是哪如何实现流速控制的呢?

在RxJava的观察者模型中, 被观察者是主动的推送数据给观察者,观察者是被动接收的 。而响应式拉取则反过来, 观察者主动从被观察者那里去拉取数据,而被观察者变成被动的等待通知再发送数据 。这种叫做 响应式拉取;

结构示意图如下:


观察者可以根据自身实际情况按需拉取数据,而不是被动接收(也就相当于告诉上游观察者把速度慢下来),最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。

代码实例如下:

//被观察者将产生100000个事件
Observable observable=Observable.range(1,100000);
class MySubscriber extends Subscriber<T> {
@Override
public void onStart() {
//一定要在onStart中通知被观察者先发送一个事件
request(1);
}
@Override
public void onCompleted() {
...
}
@Override
public void onError(Throwable e) {
...
}
@Override
public void onNext(T n) {
...
...
//处理完毕之后,在通知被观察者发送下一个事件
request(1);
}
}
observable.observeOn(Schedulers.newThread())
.subscribe(MySubscriber);

在代码中,传递事件开始前的onstart()中,调用了request(1),通知被观察者先发送一个事件,然后在onNext()中处理完事件,再次调用request(1),通知被观察者发送下一个事件....

注意在onNext()方法中,最好最后再调用request()方法.

如果你想取消这种backpressure 策略,调用quest(Long.MAX_VALUE)即可。

实际上,在上面的代码中,你也可以不需要调用request(n)方法去拉取数据,程序依然能完美运行,这是因为range --> observeOn,这一段中间过程本身就是响应式拉取数据,observeOn这个操作符内部有一个缓冲区,Android环境下长度是16,它会告诉range最多发送16个事件,充满缓冲区即可。不过话说回来,在观察者中使用request(n)这个方法可以使背压的策略表现得更加直观,更便于理解

如果你足够细心,会发现,在开头展示异常情况的代码中,使用的是interval这个操作符,但是在这里使用了range操作符,为什么呢?

这是因为interval操作符本身并不支持背压策略,它并不响应request(n),也就是说,它发送事件的速度是不受控制的,而range这类操作符是支持背压的,它发送事件的速度可以被控制。

那么到底什么样的Observable是支持背压的呢?

引出 Hot and Cold Observables(原创详解)

  • Cold Observables:指的是那些在订阅之后才开始发送事件的Observable(每个Subscriber都能独立的接收到完整的事件)。
  • Hot Observables:指的是那些在创建了Observable之后,(不管是否订阅)就开始发送事件的Observable,一创建就开始发送数据.

其中:我们经常用到的Observable.create 就是 Cold Observable,而 just, range, timer 和 from 这些创建的同样是 Cold Observable。

cold observable 相当于响应式拉(就是observer处理完了一个事件就从observable拉取下一个事件),hot observable通常不能很好的处理响应式拉模型,但它却是处理流量控制问题的不二候选人,例如使用onBackpressureBuffer或者onBackpressureDrop 操作符,和其他操作符比如operators, throttling, buffers, or windows.

Hot Observable这一类是不支持背压的,而是Cold Observable这一类中也有一部分并不支持背压(比如interval,timer等操作符创建的Observable)。所以在Rxjava2.0就做了一些升级

不过此次更新中,出现了两种观察者模式:

  • Observable(被观察者)/Observer(观察者)
  • Flowable(被观察者)/Subscriber(观察者)

RxJava2.X中,Observeable用于订阅Observer,是不支持背压的,而Flowable用于订阅Subscriber,是支持背压(Backpressure)的。

在Rxjava 1.x中有一些操作符是可以让不支持背压的Observeable转换成支持背压的Observeable;
有下面四种operator:
  • onBackpressureBuffer
  • onBackpressureDrop
  • onBackpressureLatest
  • onBackpressureBlock(已过期)

它们转化成的Observable分别具有不同的Backpressure策略。

而在RxJava 2.x中,Observable不再支持Backpressure,而是改用Flowable来专门支持Backpressure。上面提到的四种operator的前三种分别对应Flowable的三种Backpressure策略:

  • BackpressureStrategy.BUFFER
  • BackpressureStrategy.DROP
  • BackpressureStrategy.LATEST

onBackpressureBuffer是不丢弃数据的处理方式。把上游收到的全部缓存下来,等下游来请求再发给下游。相当于一个水库。但上游太快,水库(buffer)就会溢出。

onBackpressureDrop和onBackpressureLatest比较类似,都会丢弃数据。这两种策略相当于一种令牌机制(或者配额机制),下游通过request请求产生令牌(配额)给上游,上游接到多少令牌,就给下游发送多少数据。当令牌数消耗到0的时候,上游开始丢弃数据。但这两种策略在令牌数为0的时候有一点微妙的区别:onBackpressureDrop直接丢弃数据,不缓存任何数据;而onBackpressureLatest则缓存最新的一条数据,这样当上游接到新令牌的时候,它就先把缓存的上一条“最新”数据发送给下游。可以结合下面两幅图来理解。

onBackpressureBlock是看下游有没有需求,有需求就发给下游,下游没有需求,不丢弃,但试图堵住上游的入口(能不能真堵得住还得看上游的情况了),自己并不缓存。这种策略已经废弃不用。

最后

以上就是粗犷心情为你收集整理的Rxjava 背压笔记的全部内容,希望文章能够帮你解决Rxjava 背压笔记所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部