我是靠谱客的博主 贤惠鲜花,这篇文章主要介绍Rxjava2源码分析(一):Flowable的创建和基本使用过程分析,现在分享给大家,希望可以做个参考。

本文用于记录一下自己学习Rxjava2源码的过程。
首先是最简单的一个使用方法(未做线程切换),用来学习Flowable的创建和使用。
Flowable
.create(new FlowableOnSubscribe<Object>() {
@Override

public void subscribe(@NonNull FlowableEmitter<Object> e) throws Exception {
}
}, BackpressureStrategy.ERROR)
.subscribe(new FlowableSubscriber<Object>() {
@Override

public void onSubscribe(@NonNull Subscription s) {
}
@Override

public void onNext(Object o) {
}
@Override

public void onError(Throwable t) {
}
@Override

public void onComplete() {
}
});


一、先看下create这个方法
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");

ObjectHelper.requireNonNull(mode, "mode is null");

return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}

1、首先看其参数,其参数为FlowableOnSubscribe和BackpressureStrategy

FlowableOnSubscribe为一个接口,里面只有一个subscribe方法,该方法参数为FlowableEmitter
public interface FlowableOnSubscribe<T> {


void subscribe(@NonNull FlowableEmitter<T> e) throws Exception;
}
public interface FlowableEmitter<T> extends Emitter<T> {


void setDisposable(@Nullable Disposable s);


void setCancellable(@Nullable Cancellable c);


long requested();


boolean isCancelled();


FlowableEmitter<T> serialize();
}
FlowableEmitter是一个接口,继承自Emitter接口
public interface Emitter<T> {


void onNext(@NonNull T value);


void onError(@NonNull Throwable error);


void onComplete();
}
Emitter接口里面的方法是不是很熟悉,这3个方法就是对数据的回调处理了。
因此create方法的第一个参数大概作用就可以知道了,创建一个匿名对象,这个对象处理数据供回调操作。

BackpressureStrategy这是一个枚举作用后面再说。

2、再来看create里面的代码,前两句用来判断参数是否为空,主要看最后一句
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
先看onAssembly这个方法的参数,new了一个FlowableCreate对象,传入了create的参数,看一下FlowableCreate这个类,继承自Flwable,其构造方法保存了FlowableOnSubscribe、BackpressureStrategy这两个对象
public final class FlowableCreate<T> extends Flowable<T> {
final FlowableOnSubscribe<T> source;

final BackpressureStrategy backpressure;

public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;

this.backpressure = backpressure;

}
}
接着进入onAssembly方法
public static <T> Flowable<T> onAssembly(@NonNull Flowable<T> source) {
Function<? super Flowable, ? extends Flowable> f = onFlowableAssembly;

if (f != null) {
return apply(f, source);

}
return source;
}
这里先去获取了Function对象f,但是这时候f为null,所以直接反回了source,也就是刚刚new的FlowableCreate对象,保存了FlowableOnSubscribe、BackpressureStrategy这两个对象,到此为止create结束。

二、接着看subscribe方法,先看参数,这里new了个FlowableSubscriber对象

public interface FlowableSubscriber<T> extends Subscriber<T>
FlowableSubscriber为一个接口,继承自Subscriber,Subscriber也是一个接口,里面的方法是不是和Emitter里的方法很像,不过现在他们还没有关系,接着往下看
public interface Subscriber<T> {


public void onSubscribe(Subscription s);


public void onNext(T t);


public void onError(Throwable t);


public void onComplete();
}
里面的方法是不是和Emitter里的方法很像,不过现在他们还没有关系,接着往下看,进入subscribe方法里面
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");

try {
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);


ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");


subscribeActual(z);

} catch (NullPointerException e) { // NOPMD

throw e;

} catch (Throwable e) {
Exceptions.throwIfFatal(e);



RxJavaPlugins.onError(e);


NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");

npe.initCause(e);

throw npe;

}
}
主要看try里的代码
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber"); 
subscribeActual(z);
1、第一句调用了onSubscribe方法,进入方法里面
public static <T> Subscriber<? super T> onSubscribe(@NonNull Flowable<T> source, @NonNull Subscriber<? super T> subscriber) {
BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> f = onFlowableSubscribe;

if (f != null) {
return apply(f, source, subscriber);

}
return subscribe:;
}
该方法传入两个参数Flowable、Subscriber,调用该方法时传入传入的是this(即create创建出来的FlowableCreate对象),s(即subscribe方法new的FlowableSubscriber对象
该方法内部BiFunction对象f为null所以直接反回了FlowableSubscriber对象。
所以第一句代码主要作用就是把FlowableSubscriber对象赋值给z
2、第二句检查z是否为null
3、这句代码调用了Flowable中的
protected abstract void subscribeActual(Subscriber<? super T> s);
这个方法,这是个抽象方法。但是由于create方法,此时的Flowable对象是他的子类FlowableCreate
进入FlowableCreate类里面的subscribeActual方法
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;


switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);

break;

}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);

break;

}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);

break;

}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);

break;

}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());

break;

}
}
t.onSubscribe(emitter);

try {
source.subscribe(emitter);

} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

emitter.onError(ex);

}
}
此时枚举对象BackpressureStrategy派上用场了,根据枚举类型创建不同子类的BaseEmitter对象,这里以ErrorAsyncEmitter为例。
ErrorAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;


NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
super(actual);

}
@Override

public final void onNext(T t) {
if (isCancelled()) {
return;

}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));

return;

}
if (get() != 0) {
actual.onNext(t);

BackpressureHelper.produced(this, 1);

} else {
onOverflow();

}
}
abstract void onOverflow();
}
这个类里面有个onNext方法,把参数赋给了Subscriber的onNext方法,这里就关联上了onNext方法。

implements FlowableEmitter<T>, Subscription {
private static final long serialVersionUID = 7326289992464377023L;


final Subscriber<? super T> actual;


final SequentialDisposable serial;


BaseEmitter(Subscriber<? super T> actual) {
this.actual = actual;

this.serial = new SequentialDisposable();

}
@Override

public void onComplete() {
if (isCancelled()) {
return;

}
try {
actual.onComplete();

} finally {
serial.dispose();

}
}
@Override

public void onError(Throwable e) {
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");

}
if (isCancelled()) {
RxJavaPlugins.onError(e);

return;

}
try {
actual.onError(e);

} finally {
serial.dispose();

}
}
@Override

public final void cancel() {
serial.dispose();

onUnsubscribed();

}
void onUnsubscribed() {
// default is no-op

}
@Override

public final boolean isCancelled() {
return serial.isDisposed();

}
@Override

public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);

onRequested();

}
}
void onRequested() {
// default is no-op

}
@Override

public final void setDisposable(Disposable s) {
serial.update(s);

}
@Override

public final void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));

}
@Override

public final long requested() {
return get();

}
@Override

public final FlowableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);

}
}
最终到了BaseEmitter这个了,该类实现了FlowableEmitter这个接口
这个类保存了FlowableSubscriber对象,同时可以看到onErroronComplete这两个方法,此时EmitterSubscriber的方法关联完毕。onErroronComplete这两个方法是互斥的,所以调用一个后另一个就不执行,具体的过程这里不分析。

继续看这句代码onSubscribet.onSubscribe(emitter);emitter作为参数传给了onSubscribe用于回调

最后这句代码:
source.subscribe(emitter);
source为最开始create的参数,即我们new的FlowableOnSubscribe对象。这句代码把emitter作为参数传给了FlowableOnSubscribe对象

整个过程完成了
总结起来步骤为:
1、create方法创建了FlowableCreate对象,这个对象保存了一个FlowableOnSubscribe对象,FlowableOnSubscribe包含了一个subscribe(@NonNull FlowableEmitter<T> e)方法,该方法调用Emitter里的onNext、onError、onComplete方法。
2、subscribe方法创建了一个Emitter对象,通过这个对象关联了FlowableOnSubscribe对象和FlowableSubscriber对象,在FlowableOnSubscribe的subscribe方法中获取数据后调用emitter的方法把数据传递给FlowableSubscriber,用于处理结果。



最后

以上就是贤惠鲜花最近收集整理的关于Rxjava2源码分析(一):Flowable的创建和基本使用过程分析的全部内容,更多相关Rxjava2源码分析(一):Flowable内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(38)

评论列表共有 0 条评论

立即
投稿
返回
顶部