概述
RxJava2中最大的更新就是Flowable了,它支持背压模式。就是处理一个快速更新的被观察者,和一个缓慢处理的观察者时的出现的oom问题。让我们先看看它的使用
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
for (int i = 0; i < 10; i++) {//十个客官排队
e.onNext(i + "");//一个个跃跃欲试啊
}
e.onComplete();//完成
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())//上游io线程
.observeOn(AndroidSchedulers.mainThread())//下游UI线程
.subscribe(new Subscriber<String>() {
Subscription mSub;
@Override
public void onSubscribe(Subscription subscription) {
mSub = subscription;
mSub.request(1);//来一个吧~
}
@Override
public void onNext(String str) {
Log.e("flowable", "onNext ====>> " + str);//来喽~~
mSub.request(1);//那下一位客官里面请~~
}
@Override
public void onError(Throwable t) {
Log.e("flowable", "onError ====>> " + t.getMessage().toString());
}
@Override
public void onComplete() {
Log.e("flowable", "onComplete");
}
});
运行结果
凑字数23333
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>> 0
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>> 1
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>> 2
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>> 3
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>> 4
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>> 5
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>> 6
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>> 7
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>> 8
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>> 9
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onComplete
进入正题,
Flowable.create()方法有两个参数,一个是FlowableOnSubscribe,另一个参数就是背压的策略类型。他是枚举类型的参数。
public enum BackpressureStrategy {
/**
* 不指定背压策略
*/
MISSING,
/**
* 如果放入Flowable的异步缓存池中的数据超限(128)了,就会抛出MissingBackpressureException异常
*/
ERROR,
/**
* 缓存池没有固定大小,可无限添加直到oom
*/
BUFFER,
/**
* 如果异步缓存池满了,就会丢掉将要放入缓存池中的数据
*/
DROP,
/**
* 如果异步缓存池满了,会丢掉将要放入缓存池的数据,但不管缓存池的状态如何,都会讲最后一条强行放入缓存池中
*/
LATEST
}
之前例子中我们指定的背压策略为BUFFER,那我进入create方法中看看是怎么完成的策略模式。
create方法返回了一个FlowableCreate对象,这个类继承了Flowable,重写了其中的subscribeActual方法
public final class FlowableCreate<T> extends Flowable<T> {
final FlowableOnSubscribe<T> source;
final BackpressureStrategy backpressure;
//正好就是Flowable.create中的两个参数
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;//策略的父类,发射器
switch (backpressure) {//根据枚举来判断创建那种发射器
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);//调用观察者subscriber的onSubscribe,返回一个发射器
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}}
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Subscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
s = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(s, "Plugin returned null Subscriber");
subscribeActual(s);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
而当我们调用subscribe方法,也就是观察者和被观察者发生订阅关系时,就会调用subscribeActual方法,设置发射器的类型。并调用观察者Subscriber的onSubscribe方法返回一个发射器(发射器实现了Subscription接口)。
@Override
public void onSubscribe(Subscription subscription) {
mSub = subscription;
mSub.request(1);//来一个吧~
}
我们就是使用这个发射器实例的request方法,实现相应式拉动。
最后
以上就是笨笨汽车为你收集整理的Re:Android RxJava2中Flowable使用的策略模式的全部内容,希望文章能够帮你解决Re:Android RxJava2中Flowable使用的策略模式所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复