本文用于记录一下自己学习Rxjava2源码的过程。首先是最简单的一个使用方法(未做线程切换),用来学习Flowable的创建和使用。Flowable .create(new FlowableOnSubscribe<Object>() { @Override public void subscribe(@NonNull FlowableEmitter<Object> e) throws Exception { } }, BackpressureStrategy.ERROR) .subscribe(new FlowableSubscriber<Object>() { @Override public void onSubscribe(@NonNull Subscription s) { } @Override public void onNext(Object o) { } @Override public void onError(Throwable t) { } @Override public void onComplete() { } });
一、先看下create这个方法
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) { ObjectHelper.requireNonNull(source, "source is null"); ObjectHelper.requireNonNull(mode, "mode is null"); return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode)); }
1、首先看其参数,其参数为FlowableOnSubscribe和BackpressureStrategy
FlowableOnSubscribe为一个接口,里面只有一个subscribe方法,该方法参数为FlowableEmitter
public interface FlowableOnSubscribe<T> { void subscribe(@NonNull FlowableEmitter<T> e) throws Exception; }
public interface FlowableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable s); void setCancellable(@Nullable Cancellable c); long requested(); boolean isCancelled();FlowableEmitter<T> serialize(); }
FlowableEmitter是一个接口,继承自Emitter接口
public interface Emitter<T> {void onNext(@NonNull T value);void onError(@NonNull Throwable error);void onComplete(); }
Emitter接口里面的方法是不是很熟悉,这3个方法就是对数据的回调处理了。
因此create方法的第一个参数大概作用就可以知道了,创建一个匿名对象,这个对象处理数据供回调操作。
BackpressureStrategy这是一个枚举作用后面再说。
2、再来看create里面的代码,前两句用来判断参数是否为空,主要看最后一句
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
先看onAssembly这个方法的参数,new了一个FlowableCreate对象,传入了create的参数,看一下FlowableCreate这个类,继承自Flwable,其构造方法保存了FlowableOnSubscribe、BackpressureStrategy这两个对象
public final class FlowableCreate<T> extends Flowable<T> { final FlowableOnSubscribe<T> source; final BackpressureStrategy backpressure; public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) { this.source = source; this.backpressure = backpressure; }}
接着进入onAssembly方法
public static <T> Flowable<T> onAssembly(@NonNull Flowable<T> source) { Function<? super Flowable, ? extends Flowable> f = onFlowableAssembly; if (f != null) { return apply(f, source); } return source; }
这里先去获取了Function对象f,但是这时候f为null,所以直接反回了source,也就是刚刚new的FlowableCreate对象,保存了FlowableOnSubscribe、BackpressureStrategy这两个对象,到此为止create结束。
二、接着看subscribe方法,先看参数,这里new了个FlowableSubscriber对象
public interface FlowableSubscriber<T> extends Subscriber<T>
FlowableSubscriber为一个接口,继承自Subscriber,Subscriber也是一个接口,里面的方法是不是和Emitter里的方法很像,不过现在他们还没有关系,接着往下看
public interface Subscriber<T> {public void onSubscribe(Subscription s);
public void onNext(T t);public void onError(Throwable t);
public void onComplete(); }
里面的方法是不是和Emitter里的方法很像,不过现在他们还没有关系,接着往下看,进入subscribe方法里面
public final void subscribe(FlowableSubscriber<? super T> s) { ObjectHelper.requireNonNull(s, "s is null"); try { Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s); ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber"); subscribeActual(z); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
主要看try里的代码
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber"); subscribeActual(z);
1、第一句调用了onSubscribe方法,进入方法里面
public static <T> Subscriber<? super T> onSubscribe(@NonNull Flowable<T> source, @NonNull Subscriber<? super T> subscriber) { BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> f = onFlowableSubscribe; if (f != null) { return apply(f, source, subscriber); } return subscribe:; }
该方法传入两个参数Flowable、Subscriber,调用该方法时传入传入的是this(即create创建出来的FlowableCreate对象),s(即subscribe方法new的FlowableSubscriber对象)
该方法内部BiFunction对象f为null所以直接反回了FlowableSubscriber对象。
所以第一句代码主要作用就是把FlowableSubscriber对象赋值给z
2、第二句检查z是否为null
3、这句代码调用了Flowable中的protected abstract void subscribeActual(Subscriber<? super T> s);
这个方法,这是个抽象方法。但是由于create方法,此时的Flowable对象是他的子类FlowableCreate
进入FlowableCreate类里面的subscribeActual方法
public void subscribeActual(Subscriber<? super T> t) { BaseEmitter<T> emitter; switch (backpressure) { case MISSING: { emitter = new MissingEmitter<T>(t); break; } case ERROR: { emitter = new ErrorAsyncEmitter<T>(t); break; } case DROP: { emitter = new DropAsyncEmitter<T>(t); break; } case LATEST: { emitter = new LatestAsyncEmitter<T>(t); break; } default: { emitter = new BufferAsyncEmitter<T>(t, bufferSize()); break; } } t.onSubscribe(emitter); try { source.subscribe(emitter); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); emitter.onError(ex); } }
此时枚举对象BackpressureStrategy派上用场了,根据枚举类型创建不同子类的BaseEmitter对象,这里以ErrorAsyncEmitter为例。
ErrorAsyncEmitter(Subscriber<? super T> actual) { super(actual); }
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> { private static final long serialVersionUID = 4127754106204442833L; NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) { super(actual); } @Override public final void onNext(T t) { if (isCancelled()) { return; } if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (get() != 0) { actual.onNext(t); BackpressureHelper.produced(this, 1); } else { onOverflow(); } } abstract void onOverflow(); }
这个类里面有个onNext方法,把参数赋给了Subscriber的onNext方法,这里就关联上了onNext方法。
implements FlowableEmitter<T>, Subscription { private static final long serialVersionUID = 7326289992464377023L; final Subscriber<? super T> actual; final SequentialDisposable serial; BaseEmitter(Subscriber<? super T> actual) { this.actual = actual; this.serial = new SequentialDisposable(); } @Override public void onComplete() { if (isCancelled()) { return; } try { actual.onComplete(); } finally { serial.dispose(); } } @Override public void onError(Throwable e) { if (e == null) { e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (isCancelled()) { RxJavaPlugins.onError(e); return; } try { actual.onError(e); } finally { serial.dispose(); } } @Override public final void cancel() { serial.dispose(); onUnsubscribed(); } void onUnsubscribed() { // default is no-op } @Override public final boolean isCancelled() { return serial.isDisposed(); } @Override public final void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(this, n); onRequested(); } } void onRequested() { // default is no-op } @Override public final void setDisposable(Disposable s) { serial.update(s); } @Override public final void setCancellable(Cancellable c) { setDisposable(new CancellableDisposable(c)); } @Override public final long requested() { return get(); } @Override public final FlowableEmitter<T> serialize() { return new SerializedEmitter<T>(this); } }
最终到了BaseEmitter这个了,该类实现了FlowableEmitter这个接口
这个类保存了FlowableSubscriber对象,同时可以看到onError、onComplete这两个方法,此时Emitter和Subscriber的方法关联完毕。onError、onComplete这两个方法是互斥的,所以调用一个后另一个就不执行,具体的过程这里不分析。
继续看这句代码onSubscribet.onSubscribe(emitter);把emitter作为参数传给了onSubscribe用于回调
最后这句代码:
source.subscribe(emitter);
source为最开始create的参数,即我们new的FlowableOnSubscribe对象。这句代码把emitter作为参数传给了FlowableOnSubscribe对象。
整个过程完成了
总结起来步骤为:
1、create方法创建了FlowableCreate对象,这个对象保存了一个FlowableOnSubscribe对象,FlowableOnSubscribe包含了一个subscribe(@NonNull FlowableEmitter<T> e)方法,该方法调用Emitter里的onNext、onError、onComplete方法。
2、subscribe方法创建了一个Emitter对象,通过这个对象关联了FlowableOnSubscribe对象和FlowableSubscriber对象,在FlowableOnSubscribe的subscribe方法中获取数据后调用emitter的方法把数据传递给FlowableSubscriber,用于处理结果。
最后
以上就是贤惠鲜花最近收集整理的关于Rxjava2源码分析(一):Flowable的创建和基本使用过程分析的全部内容,更多相关Rxjava2源码分析(一):Flowable内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复