概述
在项目的下载库部分使用了PublishProcessor来实现支持有背压的Observable,向外提供数据变化状态,同时还对外提供了Single从数据库里面去获取数据。在下载界面我们会首先去subscribe PublishProcessor以监听数据变化,然后去拉取数据,填充到我们的界面里。流程大致如下:
TD.downloader.observeTaskChange()
.filter { it.change == TaskChange.TASK_ADD }
.observeOn(AndroidSchedulers.mainThread())
.`as`(RXUtils.autoDispose(lifecycleOwner))
.subscribe { events ->
...
}
...
TD.downloader.queryAll()
.observeOn(AndroidSchedulers.mainThread())
.subscribe { tasks ->
...
}
在下载库里,我们的query是先与这个PublishProcessor.onNext()运行的,但是在实际运行中我发现在少部分低端手机上面总是先执行了observeTaskChange里面的代码,然后才执行queryAll()里面的代码。通过打印日志,我能很肯定代码运行的流程没问题,而如果我单步去调试,则总是先执行queryAll(),然后再执行observeTaskChange()。一时怀疑甚至怀疑这个Android系统的消息队列有问题,但是想想如果是消息队列序列有问题,那整个系统就完了。回头想想那么问题出在哪里呢??既然问题出在主线程分发,那如果能够在rxandroid打印一下postMessage的线程,就可以确定消息队列的序列了。为了验证这个问题,我把rxandroid的项目下载了下来,然后把项目中的rxandroid库换为了本地的,然后在HandlerScheduler类的schedule方法里sendMessageDelayed前加上了日志,打印了线程ID。打印Log后,我惊呆了,居然PublishProcessor.onNext()在有Bug的情况下根本就不会触发给Android的主线程发消息。于是我们可以猜测,应该是Flowable的其他地方已经把线程起起来了,然后通过Queue去把数据发送到了下游。通过Debug,然后发现FlowableObserveOn里面的trySchedule会在Subscription request的时候去调度线程,而这个request是在subscribe()的时候就会调用,这里就会去给主线程sendMessage了,如果手机配置不高,主线程执行速度慢,这个消息可能会有一定延迟才会收到。而如果这个时候我们调用了PublishProcessor.onNext(),就不会去给主线程发送新的消息,因为FlowableObserveOn里面正在处理,所以直接排队即可。这也就解释了为什么在同一个线程里面先返回了queryAll,后去PublishProcessor.onNext(),会先执行PublishProcessor的消费者代码,后执行queryAll的消费者代码。我这里用Java写了一个简单的Demo验证这个结论:
以下代码的single()线程可以认为是我上面说的Android主线程
public static void main(String[] args) {
PublishProcessor<Integer> publishProcessor = PublishProcessor.create();
Observable<Integer> observable = Observable.fromCallable(() -> 1)
.observeOn(Schedulers.single());
Flowable flowable = publishProcessor.hide().observeOn(Schedulers.single());
Completable.fromCallable((Callable<Integer>) () -> {
Thread.sleep(1000);
return 1;
}).subscribeOn(Schedulers.single())
.subscribe();
flowable.subscribe(res -> System.out.println(res));
observable.subscribe(res -> System.out.println(res));
publishProcessor.onNext(2);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
输出结果是:
2
1
证明了以上结论。
如何解决这个问题
这个问题的核心在于:
核心问题是热Flowable在subscribe的时候就会触发一次线程切换调度,不管有没有发射数据。我们尽量是理清楚是否真的需要处理背压,选择合适的操作符,我这里其实是不需要处理背压的,所以直接加一句toObservable()即可完美解决这个问题。此外我们注意Flowable的subscribe和其他操作符的subscribe调用顺序,也能完美解决这个问题。
最后
以上就是酷炫凉面为你收集整理的RxJava2的Flowable observeOn线程调度顺序问题的全部内容,希望文章能够帮你解决RxJava2的Flowable observeOn线程调度顺序问题所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复