我是靠谱客的博主 笨笨汽车,最近开发中收集的这篇文章主要介绍Re:Android RxJava2中Flowable使用的策略模式,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

RxJava2中最大的更新就是Flowable了,它支持背压模式。就是处理一个快速更新的被观察者,和一个缓慢处理的观察者时的出现的oom问题。让我们先看看它的使用

Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                for (int i = 0; i < 10; i++) {//十个客官排队
                    e.onNext(i + "");//一个个跃跃欲试啊
                }
                e.onComplete();//完成
            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())//上游io线程
                .observeOn(AndroidSchedulers.mainThread())//下游UI线程
                .subscribe(new Subscriber<String>() {

            Subscription mSub;

            @Override
            public void onSubscribe(Subscription subscription) {
                mSub = subscription;
                mSub.request(1);//来一个吧~
            }

            @Override
            public void onNext(String str) {
                Log.e("flowable", "onNext ====>>  " + str);//来喽~~
                mSub.request(1);//那下一位客官里面请~~
            }

            @Override
            public void onError(Throwable t) {
                Log.e("flowable", "onError ====>>  " + t.getMessage().toString());
            }

            @Override
            public void onComplete() {
                Log.e("flowable", "onComplete");
            }
        });
运行结果 凑字数23333
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>>  0
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>>  1
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>>  2
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>>  3
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>>  4
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>>  5
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>>  6
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>>  7
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>>  8
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onNext ====>>  9
01-10 11:06:55.182 23952-23952/com.hzly.text E/flowable: onComplete
进入正题, Flowable.create()方法有两个参数,一个是FlowableOnSubscribe,另一个参数就是背压的策略类型。他是枚举类型的参数。

public enum BackpressureStrategy {
    /**
     * 不指定背压策略
     */
    MISSING,
    /**
     * 如果放入Flowable的异步缓存池中的数据超限(128)了,就会抛出MissingBackpressureException异常
     */
    ERROR,
    /**
     * 缓存池没有固定大小,可无限添加直到oom
     */
    BUFFER,
    /**
     * 如果异步缓存池满了,就会丢掉将要放入缓存池中的数据
     */
    DROP,
    /**
     * 如果异步缓存池满了,会丢掉将要放入缓存池的数据,但不管缓存池的状态如何,都会讲最后一条强行放入缓存池中
     */
    LATEST
}
之前例子中我们指定的背压策略为BUFFER,那我进入create方法中看看是怎么完成的策略模式。

create方法返回了一个FlowableCreate对象,这个类继承了Flowable,重写了其中的subscribeActual方法

public final class FlowableCreate<T> extends Flowable<T> {

    final FlowableOnSubscribe<T> source;

    final BackpressureStrategy backpressure;
	//正好就是Flowable.create中的两个参数
    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;//策略的父类,发射器

        switch (backpressure) {//根据枚举来判断创建那种发射器
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);//调用观察者subscriber的onSubscribe,返回一个发射器
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }}
@BackpressureSupport(BackpressureKind.SPECIAL)
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Subscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            s = RxJavaPlugins.onSubscribe(this, s);
            ObjectHelper.requireNonNull(s, "Plugin returned null Subscriber");
            subscribeActual(s);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
而当我们调用subscribe方法,也就是观察者和被观察者发生订阅关系时,就会调用subscribeActual方法,设置发射器的类型。并调用观察者Subscriber的onSubscribe方法返回一个发射器(发射器实现了Subscription接口)。

@Override
public void onSubscribe(Subscription subscription) {
      mSub = subscription;
      mSub.request(1);//来一个吧~
            }
我们就是使用这个发射器实例的request方法,实现相应式拉动。

最后

以上就是笨笨汽车为你收集整理的Re:Android RxJava2中Flowable使用的策略模式的全部内容,希望文章能够帮你解决Re:Android RxJava2中Flowable使用的策略模式所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(33)

评论列表共有 0 条评论

立即
投稿
返回
顶部