我是靠谱客的博主 贤惠鲜花,这篇文章主要介绍Rxjava2源码分析(一):Flowable的创建和基本使用过程分析,现在分享给大家,希望可以做个参考。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
复制代码
本文用于记录一下自己学习Rxjava2源码的过程。
复制代码
复制代码
首先是最简单的一个使用方法(未做线程切换),用来学习Flowable的创建和使用。
复制代码
复制代码
Flowable
.create(new FlowableOnSubscribe<Object>() {
@Override

public void subscribe(@NonNull FlowableEmitter<Object> e) throws Exception {
}
}, BackpressureStrategy.ERROR)
.subscribe(new FlowableSubscriber<Object>() {
@Override

public void onSubscribe(@NonNull Subscription s) {
}
@Override

public void onNext(Object o) {
}
@Override

public void onError(Throwable t) {
}
@Override

public void onComplete() {
}
});
复制代码
1
复制代码
1
复制代码
1
一、先看下create这个方法
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");

ObjectHelper.requireNonNull(mode, "mode is null");

return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
复制代码
1
复制代码
1
1、首先看其参数,其参数为FlowableOnSubscribe和BackpressureStrategy
复制代码
1
复制代码
1
FlowableOnSubscribe为一个接口,里面只有一个subscribe方法,该方法参数为FlowableEmitter
复制代码
1
2
3
4
5
6
7
8
9
复制代码
public interface FlowableOnSubscribe<T> {


void subscribe(@NonNull FlowableEmitter<T> e) throws Exception;
}
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码
复制代码
public interface FlowableEmitter<T> extends Emitter<T> {


void setDisposable(@Nullable Disposable s);


void setCancellable(@Nullable Cancellable c);


long requested();


boolean isCancelled();
复制代码


FlowableEmitter<T> serialize();
}
复制代码
1
FlowableEmitter是一个接口,继承自Emitter接口
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
复制代码
public interface Emitter<T> {
复制代码


void onNext(@NonNull T value);
复制代码


void onError(@NonNull Throwable error);
复制代码


void onComplete();
}
复制代码
1
Emitter接口里面的方法是不是很熟悉,这3个方法就是对数据的回调处理了。
复制代码
1
因此create方法的第一个参数大概作用就可以知道了,创建一个匿名对象,这个对象处理数据供回调操作。
复制代码
1
复制代码
1
BackpressureStrategy这是一个枚举作用后面再说。
复制代码
1
2
复制代码
1
2、再来看create里面的代码,前两句用来判断参数是否为空,主要看最后一句
复制代码
1
复制代码
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
复制代码
1
先看onAssembly这个方法的参数,new了一个FlowableCreate对象,传入了create的参数,看一下FlowableCreate这个类,继承自Flwable,其构造方法保存了FlowableOnSubscribe、BackpressureStrategy这两个对象
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码
复制代码
复制代码
public final class FlowableCreate<T> extends Flowable<T> {
final FlowableOnSubscribe<T> source;

final BackpressureStrategy backpressure;

public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;

this.backpressure = backpressure;

}
复制代码
}
复制代码
1
接着进入onAssembly方法
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码
复制代码
public static <T> Flowable<T> onAssembly(@NonNull Flowable<T> source) {
Function<? super Flowable, ? extends Flowable> f = onFlowableAssembly;

if (f != null) {
return apply(f, source);

}
return source;
}
复制代码
1
这里先去获取了Function对象f,但是这时候f为null,所以直接反回了source,也就是刚刚new的FlowableCreate对象,保存了FlowableOnSubscribe、BackpressureStrategy这两个对象,到此为止create结束。
复制代码
1
2
复制代码
1
二、接着看subscribe方法,先看参数,这里new了个FlowableSubscriber对象
复制代码
1
复制代码
1
复制代码
public interface FlowableSubscriber<T> extends Subscriber<T>
复制代码
1
FlowableSubscriber为一个接口,继承自Subscriber,Subscriber也是一个接口,里面的方法是不是和Emitter里的方法很像,不过现在他们还没有关系,接着往下看
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码
public interface Subscriber<T> {
复制代码


public void onSubscribe(Subscription s);
复制代码

复制代码

public void onNext(T t);
复制代码


public void onError(Throwable t);
复制代码

复制代码

public void onComplete();
}
复制代码
1
里面的方法是不是和Emitter里的方法很像,不过现在他们还没有关系,接着往下看,进入subscribe方法里面
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
复制代码
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");

try {
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);


ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");


subscribeActual(z);

} catch (NullPointerException e) { // NOPMD

throw e;

} catch (Throwable e) {
Exceptions.throwIfFatal(e);



RxJavaPlugins.onError(e);


NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");

npe.initCause(e);

throw npe;

}
}
复制代码
1
主要看try里的代码
复制代码
1
2
3
4
复制代码
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
复制代码
复制代码
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber"); 
subscribeActual(z);
复制代码
1
1、第一句调用了onSubscribe方法,进入方法里面
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码
public static <T> Subscriber<? super T> onSubscribe(@NonNull Flowable<T> source, @NonNull Subscriber<? super T> subscriber) {
BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> f = onFlowableSubscribe;

if (f != null) {
return apply(f, source, subscriber);

}
return subscribe:;
}
复制代码
1
该方法传入两个参数Flowable、Subscriber,调用该方法时传入传入的是this(即create创建出来的FlowableCreate对象),s(即subscribe方法new的FlowableSubscriber对象
复制代码
1
该方法内部BiFunction对象f为null所以直接反回了FlowableSubscriber对象。
复制代码
1
所以第一句代码主要作用就是把FlowableSubscriber对象赋值给z
复制代码
1
2、第二句检查z是否为null
复制代码
1
3、这句代码调用了Flowable中的
复制代码
protected abstract void subscribeActual(Subscriber<? super T> s);
复制代码
1
这个方法,这是个抽象方法。但是由于create方法,此时的Flowable对象是他的子类FlowableCreate
复制代码
1
进入FlowableCreate类里面的subscribeActual方法
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
复制代码
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;


switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);

break;

}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);

break;

}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);

break;

}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);

break;

}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());

break;

}
}
t.onSubscribe(emitter);

try {
source.subscribe(emitter);

} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

emitter.onError(ex);

}
}
复制代码
1
此时枚举对象BackpressureStrategy派上用场了,根据枚举类型创建不同子类的BaseEmitter对象,这里以ErrorAsyncEmitter为例。
复制代码
1
2
3
4
5
复制代码
ErrorAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
复制代码
1
2
复制代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
复制代码
复制代码
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;


NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
super(actual);

}
@Override

public final void onNext(T t) {
if (isCancelled()) {
return;

}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));

return;

}
if (get() != 0) {
actual.onNext(t);

BackpressureHelper.produced(this, 1);

} else {
onOverflow();

}
}
abstract void onOverflow();
}
复制代码
1
这个类里面有个onNext方法,把参数赋给了Subscriber的onNext方法,这里就关联上了onNext方法。
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
复制代码
复制代码

复制代码
implements FlowableEmitter<T>, Subscription {
private static final long serialVersionUID = 7326289992464377023L;


final Subscriber<? super T> actual;


final SequentialDisposable serial;


BaseEmitter(Subscriber<? super T> actual) {
this.actual = actual;

this.serial = new SequentialDisposable();

}
@Override

public void onComplete() {
if (isCancelled()) {
return;

}
try {
actual.onComplete();

} finally {
serial.dispose();

}
}
@Override

public void onError(Throwable e) {
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");

}
if (isCancelled()) {
RxJavaPlugins.onError(e);

return;

}
try {
actual.onError(e);

} finally {
serial.dispose();

}
}
@Override

public final void cancel() {
serial.dispose();

onUnsubscribed();

}
void onUnsubscribed() {
// default is no-op

}
@Override

public final boolean isCancelled() {
return serial.isDisposed();

}
@Override

public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);

onRequested();

}
}
void onRequested() {
// default is no-op

}
@Override

public final void setDisposable(Disposable s) {
serial.update(s);

}
@Override

public final void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));

}
@Override

public final long requested() {
return get();

}
@Override

public final FlowableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);

}
}
复制代码
1
最终到了BaseEmitter这个了,该类实现了FlowableEmitter这个接口
复制代码
1
这个类保存了FlowableSubscriber对象,同时可以看到onErroronComplete这两个方法,此时EmitterSubscriber的方法关联完毕。onErroronComplete这两个方法是互斥的,所以调用一个后另一个就不执行,具体的过程这里不分析。
复制代码
1
2
复制代码
1
继续看这句代码onSubscribet.onSubscribe(emitter);emitter作为参数传给了onSubscribe用于回调
复制代码
1
复制代码
1
最后这句代码:
复制代码
1
复制代码
source.subscribe(emitter);
复制代码
1
source为最开始create的参数,即我们new的FlowableOnSubscribe对象。这句代码把emitter作为参数传给了FlowableOnSubscribe对象
复制代码
1
2
复制代码
1
整个过程完成了
复制代码
1
总结起来步骤为:
复制代码
1
1、create方法创建了FlowableCreate对象,这个对象保存了一个FlowableOnSubscribe对象,FlowableOnSubscribe包含了一个subscribe(@NonNull FlowableEmitter<T> e)方法,该方法调用Emitter里的onNext、onError、onComplete方法。
复制代码
1
2、subscribe方法创建了一个Emitter对象,通过这个对象关联了FlowableOnSubscribe对象和FlowableSubscriber对象,在FlowableOnSubscribe的subscribe方法中获取数据后调用emitter的方法把数据传递给FlowableSubscriber,用于处理结果。
复制代码
1
复制代码
1
2
复制代码
1
2

最后

以上就是贤惠鲜花最近收集整理的关于Rxjava2源码分析(一):Flowable的创建和基本使用过程分析的全部内容,更多相关Rxjava2源码分析(一):Flowable内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(44)

评论列表共有 0 条评论

立即
投稿
返回
顶部