概述
系列文章
RxJava系列之简介和观察者设计模式
RxJava系列之上游与下游
RxJava系列之常用创建型操作符
RxJava系列之常用变换操作符
RxJava系列之常用过滤操作符
RxJava系列之常用条件操作符
RxJava系列之常用合并操作符
RxJava系列之常用异常操作符
RxJava系列之线程切换实战
RxJava系列之背压模式
RxJava系列之配合Retrofit
RxJava系列之泛型高级
RxJava系列之手写create操作符
RxJava系列之手写create操作符增加泛型限定
RxJava系列之手写just操作符
RxJava系列之手写map操作符
RxJava系列之手写切换线程
背压模式
背压模式的由来:
RxJava1.X的时候,还没有背压模式, 我们的上游不停的发射,我们的下游处理不过来,就会照成内存泄漏
RxJava2.X之后,增加背压模式,Observable Flowable(解决背压)
Observable — > Flowable(解决背压)
什么时候用Observable<—>Observer, 什么使用Flowable<—>Subscriber ?
答:发射的事件,大量的事件(1000个),并且考虑到下游处理不过来,就需要使用Flowable
被压模式中的策略
// ERROR 放入缓存池,如果池子满了 水缸 max 128
// BackpressureStrategy.ERROR // todo 上游不停的发射大量事件,下游阻塞了 处理不过来,放入缓存池,如果池子满了,就会抛出异常
BackpressureStrategy.BUFFER // todo 上游不停的发射大量事件,下游阻塞了 处理不过来,放入缓存池,”等待“下游来接收事件处理
// BackpressureStrategy.DROP // todo 上游不停的发射大量事件,下游阻塞了 处理不过来,放入缓存池,如果池子满了,就会把后面发射的事件丢弃(1 ~ 5000 池子满了4000, 4001 ~ 5000丢弃)
// BackpressureStrategy.LATEST // todo 上游不停的发射大量事件,下游阻塞了 处理不过来,只存储 128个事件
测试代码
public void r01(View view) {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
// todo 1 上游不停的发射大量事件
for (int i = 0; i < Integer.MAX_VALUE; i++) {
e.onNext(i); // todo 1
}
e.onComplete();
}
},
// ERROR
放入缓存池,如果池子满了
水缸
max 128
// BackpressureStrategy.ERROR // todo 上游不停的发射大量事件,下游阻塞了 处理不过来,放入缓存池,如果池子满了,就会抛出异常
BackpressureStrategy.BUFFER // todo
上游不停的发射大量事件,下游阻塞了 处理不过来,放入缓存池,”等待“下游来接收事件处理
// BackpressureStrategy.DROP //
todo
上游不停的发射大量事件,下游阻塞了 处理不过来,放入缓存池,如果池子满了,就会把后面发射的事件丢弃
// (1 ~ 5000
池子满了4000,
4001 ~ 5000丢弃)
// BackpressureStrategy.LATEST // todo 上游不停的发射大量事件,下游阻塞了 处理不过来,只存储 128个事件
)
.subscribeOn(Schedulers.io()) // 给上游分配异步线程
.observeOn(AndroidSchedulers.mainThread())
// 给下游分配 主线程
// 订阅
.subscribe(
// 下游的简化版
/*new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
}*/
// 完整版本的下游 观察者
new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
subscription = s;
// 如果同步的 不执行此s.request();,(等待下游,发现下游没有去处理)会抛出异常, 外界在调用subscription.request(10); 无效果
// 如果是异步的,不执行此s.request();,不会发生异常(上游不会等待下游)不会发生异常, 外界在调用subscription.request(10); 是ok的
// s.request(5); // 只请求输出 5次,给下游打印
// s.request(100);
// 只请求输出 100次,给下游打印
// s.request(500); // 只请求给下游输出 500个事件
// s.request(128); // 取出129给事件,给下游
// s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
// todo 2 下游阻塞了 处理不过来
try {
Thread.currentThread().sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// TODO 一旦下游处理了一次上游的事件,缓存池 - 1
Log.d(TAG, "onNext: " + integer);
}
// onError: create: could not emit value due to lack of requests
上游还有剩余的事件,无法处理,因为没有去请求
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
}
);
}
/**
* 外界调用 subscription.request(10); 异步的才有效
* @param view
*/
public void r02(View view) {
/**
*
todo 同步的 之前打印不出来,同步的 需要等待 下游处理后,然后再发射后面的事件,由于等待下游 没有request,所以就抛出异常 create: could not emit value due to lack of requests
*
之后
我们再 r02 方法中 点击 执行 subscription.request(1); 没效果
*
* todo 异步的:上游不停的发射, subscription.request(1); 就可以取出来了
*/
if (subscription != null)
// subscription.request(1); // 点击一下按钮,就接收一个,取出来 给下游处理
subscription.request(10);
}
public void r03(View view) {
// Observable.just()
// Flowable.just();
// Observable.fromArray()
// Flowable.fromArray()
}
总结
1.// ERROR 放入缓存池,如果池子满了 水缸 max 128
BackpressureStrategy.ERROR // todo 上游不停的发射大量事件,下游阻塞了 处理不过来,放入缓存池,如果池子满了,就会抛出异常
2.BackpressureStrategy.BUFFER // todo 上游不停的发射大量事件,下游阻塞了 处理不过来,放入缓存池,”等待“下游来接收事件处理
3.同步的,没有执行Subscription s.request(), 当上游发射1,下游无法处理(没有执行s.request()),会抛出异常
4.异步的,上游不停的发射,可以在r02方法中,s.request(10) 可以取出来给 下游接收事件处理的
5.一旦下游处理了一次上游的事件,缓存池 - 1
6.Observable 它的升级版 Flowable:功能强大,还有背压模式。
如果我们会使用Observable, 那么一定会使用Flowable
Flowable的设计,是按照Observable 依葫芦画瓢来设计Flowable,所以使用才一摸一样,只不过类名不同而已, Flowable还增加了背压模式
1.Observable<—>Observer, Flowable<—>Subscriber 对应关系
2.Observable的设计和 Flowable一致的,在Observable的基础上 增加了一套Flowable的代码,而且增加的时候 依葫芦画瓢的,Flowable增加了背压模式
3.Observable–Observer下游 – onSubscribe(Disposable d)切断下游(水管)
4.Flowable—Subscriber下游 – onSubscribe(Subscription s) 取出(s.request(5))事件 给下游接收使用
最后
以上就是鲤鱼自行车为你收集整理的RxJava系列之背压模式系列文章背压模式被压模式中的策略测试代码总结的全部内容,希望文章能够帮你解决RxJava系列之背压模式系列文章背压模式被压模式中的策略测试代码总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复