复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49复制代码本文用于记录一下自己学习Rxjava2源码的过程。复制代码复制代码首先是最简单的一个使用方法(未做线程切换),用来学习Flowable的创建和使用。复制代码复制代码Flowable .create(new FlowableOnSubscribe<Object>() { @Override public void subscribe(@NonNull FlowableEmitter<Object> e) throws Exception { } }, BackpressureStrategy.ERROR) .subscribe(new FlowableSubscriber<Object>() { @Override public void onSubscribe(@NonNull Subscription s) { } @Override public void onNext(Object o) { } @Override public void onError(Throwable t) { } @Override public void onComplete() { } });
复制代码
1
复制代码
1
复制代码
1一、先看下create这个方法
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13复制代码public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) { ObjectHelper.requireNonNull(source, "source is null"); ObjectHelper.requireNonNull(mode, "mode is null"); return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode)); }
复制代码
1
复制代码
11、首先看其参数,其参数为FlowableOnSubscribe和BackpressureStrategy
复制代码
1
复制代码
1FlowableOnSubscribe为一个接口,里面只有一个subscribe方法,该方法参数为FlowableEmitter
复制代码
1
2
3
4
5
6
7
8
9复制代码public interface FlowableOnSubscribe<T> { void subscribe(@NonNull FlowableEmitter<T> e) throws Exception; }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30复制代码复制代码public interface FlowableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable s); void setCancellable(@Nullable Cancellable c); long requested(); boolean isCancelled();复制代码FlowableEmitter<T> serialize(); }
复制代码
1FlowableEmitter是一个接口,继承自Emitter接口
复制代码
1
2
3
4
5
6
7
8
9
10
11
12复制代码public interface Emitter<T> {复制代码void onNext(@NonNull T value);复制代码void onError(@NonNull Throwable error);复制代码void onComplete(); }
复制代码
1Emitter接口里面的方法是不是很熟悉,这3个方法就是对数据的回调处理了。
复制代码
1因此create方法的第一个参数大概作用就可以知道了,创建一个匿名对象,这个对象处理数据供回调操作。
复制代码
1
复制代码
1BackpressureStrategy这是一个枚举作用后面再说。
复制代码
1
2
复制代码
12、再来看create里面的代码,前两句用来判断参数是否为空,主要看最后一句
复制代码
1复制代码return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
复制代码
1先看onAssembly这个方法的参数,new了一个FlowableCreate对象,传入了create的参数,看一下FlowableCreate这个类,继承自Flwable,其构造方法保存了FlowableOnSubscribe、BackpressureStrategy这两个对象
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21复制代码复制代码复制代码public final class FlowableCreate<T> extends Flowable<T> { final FlowableOnSubscribe<T> source; final BackpressureStrategy backpressure; public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) { this.source = source; this.backpressure = backpressure; }复制代码}
复制代码
1接着进入onAssembly方法
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25复制代码复制代码public static <T> Flowable<T> onAssembly(@NonNull Flowable<T> source) { Function<? super Flowable, ? extends Flowable> f = onFlowableAssembly; if (f != null) { return apply(f, source); } return source; }
复制代码
1这里先去获取了Function对象f,但是这时候f为null,所以直接反回了source,也就是刚刚new的FlowableCreate对象,保存了FlowableOnSubscribe、BackpressureStrategy这两个对象,到此为止create结束。
复制代码
1
2
复制代码
1二、接着看subscribe方法,先看参数,这里new了个FlowableSubscriber对象
复制代码
1
复制代码
1复制代码public interface FlowableSubscriber<T> extends Subscriber<T>
复制代码
1FlowableSubscriber为一个接口,继承自Subscriber,Subscriber也是一个接口,里面的方法是不是和Emitter里的方法很像,不过现在他们还没有关系,接着往下看
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13复制代码public interface Subscriber<T> {复制代码public void onSubscribe(Subscription s);复制代码
复制代码public void onNext(T t);复制代码public void onError(Throwable t);复制代码
复制代码public void onComplete(); }
复制代码
1里面的方法是不是和Emitter里的方法很像,不过现在他们还没有关系,接着往下看,进入subscribe方法里面
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63复制代码public final void subscribe(FlowableSubscriber<? super T> s) { ObjectHelper.requireNonNull(s, "s is null"); try { Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s); ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber"); subscribeActual(z); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
复制代码
1主要看try里的代码
复制代码
1
2
3
4复制代码Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);复制代码复制代码ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber"); subscribeActual(z);
复制代码
11、第一句调用了onSubscribe方法,进入方法里面
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17复制代码public static <T> Subscriber<? super T> onSubscribe(@NonNull Flowable<T> source, @NonNull Subscriber<? super T> subscriber) { BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> f = onFlowableSubscribe; if (f != null) { return apply(f, source, subscriber); } return subscribe:; }
复制代码
1该方法传入两个参数Flowable、Subscriber,调用该方法时传入传入的是this(即create创建出来的FlowableCreate对象),s(即subscribe方法new的FlowableSubscriber对象)
复制代码
1该方法内部BiFunction对象f为null所以直接反回了FlowableSubscriber对象。
复制代码
1所以第一句代码主要作用就是把FlowableSubscriber对象赋值给z
复制代码
12、第二句检查z是否为null
复制代码
13、这句代码调用了Flowable中的复制代码protected abstract void subscribeActual(Subscriber<? super T> s);
复制代码
1这个方法,这是个抽象方法。但是由于create方法,此时的Flowable对象是他的子类FlowableCreate
复制代码
1进入FlowableCreate类里面的subscribeActual方法
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95复制代码public void subscribeActual(Subscriber<? super T> t) { BaseEmitter<T> emitter; switch (backpressure) { case MISSING: { emitter = new MissingEmitter<T>(t); break; } case ERROR: { emitter = new ErrorAsyncEmitter<T>(t); break; } case DROP: { emitter = new DropAsyncEmitter<T>(t); break; } case LATEST: { emitter = new LatestAsyncEmitter<T>(t); break; } default: { emitter = new BufferAsyncEmitter<T>(t, bufferSize()); break; } } t.onSubscribe(emitter); try { source.subscribe(emitter); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); emitter.onError(ex); } }
复制代码
1此时枚举对象BackpressureStrategy派上用场了,根据枚举类型创建不同子类的BaseEmitter对象,这里以ErrorAsyncEmitter为例。
复制代码
1
2
3
4
5复制代码ErrorAsyncEmitter(Subscriber<? super T> actual) { super(actual); }
复制代码
1
2复制代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97复制代码复制代码abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> { private static final long serialVersionUID = 4127754106204442833L; NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) { super(actual); } @Override public final void onNext(T t) { if (isCancelled()) { return; } if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (get() != 0) { actual.onNext(t); BackpressureHelper.produced(this, 1); } else { onOverflow(); } } abstract void onOverflow(); }
复制代码
1这个类里面有个onNext方法,把参数赋给了Subscriber的onNext方法,这里就关联上了onNext方法。
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218复制代码复制代码
复制代码implements FlowableEmitter<T>, Subscription { private static final long serialVersionUID = 7326289992464377023L; final Subscriber<? super T> actual; final SequentialDisposable serial; BaseEmitter(Subscriber<? super T> actual) { this.actual = actual; this.serial = new SequentialDisposable(); } @Override public void onComplete() { if (isCancelled()) { return; } try { actual.onComplete(); } finally { serial.dispose(); } } @Override public void onError(Throwable e) { if (e == null) { e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (isCancelled()) { RxJavaPlugins.onError(e); return; } try { actual.onError(e); } finally { serial.dispose(); } } @Override public final void cancel() { serial.dispose(); onUnsubscribed(); } void onUnsubscribed() { // default is no-op } @Override public final boolean isCancelled() { return serial.isDisposed(); } @Override public final void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(this, n); onRequested(); } } void onRequested() { // default is no-op } @Override public final void setDisposable(Disposable s) { serial.update(s); } @Override public final void setCancellable(Cancellable c) { setDisposable(new CancellableDisposable(c)); } @Override public final long requested() { return get(); } @Override public final FlowableEmitter<T> serialize() { return new SerializedEmitter<T>(this); } }
复制代码
1最终到了BaseEmitter这个了,该类实现了FlowableEmitter这个接口
复制代码
1这个类保存了FlowableSubscriber对象,同时可以看到onError、onComplete这两个方法,此时Emitter和Subscriber的方法关联完毕。onError、onComplete这两个方法是互斥的,所以调用一个后另一个就不执行,具体的过程这里不分析。
复制代码
1
2
复制代码
1继续看这句代码onSubscribet.onSubscribe(emitter);把emitter作为参数传给了onSubscribe用于回调
复制代码
1
复制代码
1最后这句代码:
复制代码
1复制代码source.subscribe(emitter);
复制代码
1source为最开始create的参数,即我们new的FlowableOnSubscribe对象。这句代码把emitter作为参数传给了FlowableOnSubscribe对象。
复制代码
1
2
复制代码
1整个过程完成了
复制代码
1总结起来步骤为:
复制代码
11、create方法创建了FlowableCreate对象,这个对象保存了一个FlowableOnSubscribe对象,FlowableOnSubscribe包含了一个subscribe(@NonNull FlowableEmitter<T> e)方法,该方法调用Emitter里的onNext、onError、onComplete方法。
复制代码
12、subscribe方法创建了一个Emitter对象,通过这个对象关联了FlowableOnSubscribe对象和FlowableSubscriber对象,在FlowableOnSubscribe的subscribe方法中获取数据后调用emitter的方法把数据传递给FlowableSubscriber,用于处理结果。
复制代码
1
复制代码
1
2
复制代码
1
2
最后
以上就是贤惠鲜花最近收集整理的关于Rxjava2源码分析(一):Flowable的创建和基本使用过程分析的全部内容,更多相关Rxjava2源码分析(一):Flowable内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复