概述
RxJava缺少创建无限自然数流的工厂。 这样的流很有用,例如,当您想通过压缩两个事件的唯一序列号给可能的无限事件流时:
Flowable<Long> naturalNumbers = //???
Flowable<Event> someInfiniteEventStream = //...
Flowable<Pair<Long, Event>> sequenced = Flowable.zip(
naturalNumbers,
someInfiniteEventStream,
Pair::of
);
实现naturalNumbers
令人惊讶地复杂。 在RxJava 1.x中,您可以短暂地放弃不遵守背压的Observable
:
import rx.Observable; //RxJava 1.x
Observable<Long> naturalNumbers = Observable.create(subscriber -> {
long state = 0;
//poor solution :-(
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(state++);
}
});
这样的流没有背压是什么意思? 好吧,基本上,流可以轻松地以CPU内核允许的速度生成事件( state
变量不断增加),每秒数百万。 但是,当使用者无法快速使用事件时,未处理事件的积压开始出现:
naturalNumbers
// .observeOn(Schedulers.io())
.subscribe(
x -> {
//slooow, 1 millisecond
}
);
上面的程序(带有observeOn()
运算符的注释掉)可以正常运行,因为它具有意外的反压。 默认情况下,所有内容在RxJava中都是单线程的,因此生产者和使用者在同一个线程中工作。 实际上,调用subscriber.onNext()
会阻止,因此while
循环会自动对其进行限制。 但是,尝试取消注释observeOn()
,灾难会在几毫秒后发生。 订阅回调在设计上是单线程的。 对于每个元素,它至少需要1毫秒,因此该流每秒可以处理不超过1000个事件。 我们有些幸运。 RxJavaSwift发现这种灾难性状况,并因MissingBackpressureException
而快速失败
我们最大的错误是在生成事件时没有考虑消费者的速度。 顺便说一下,这是响应流背后的核心思想:不允许生产者发出比消费者请求更多的事件。 在RxJava 1.x中,即使实现最简单的流(从头开始考虑背压)也不是一件容易的事。 RxJava 2.x带来了一些便利的运算符,这些运算符建立在先前版本的经验基础之上。 首先RxJava 2.x时不允许你实现Flowable
(背压-aware)的相同的方式,你可以与Observable
。 创建Flowable
会使消费者使消息过载是不可能的:
Flowable<Long> naturalNumbers = Flowable.create(subscriber -> {
long state = 0;
while (!subscriber.isCancelled()) {
subscriber.onNext(state++);
}
}, BackpressureStrategy.DROP);
您是否发现了这个额外的DROP参数? 在解释之前,让我们看一下使用慢速用户订阅时的输出:
0
1
2
3
//...continuous numbers...
126
127
101811682
//...where did my 100M events go?!?
101811683
101811684
101811685
//...continuous numbers...
101811776
//...17M events disappeared again...
101811777
//...
你的旅费可能会改变。 怎么了? observeOn()
运算符在调度程序(线程池)之间切换。 从未决事件队列中合并的线程池。 此队列是有限的,容量为128个元素。 知道此限制的observeOn()
运算符仅从上游请求128个元素(我们的自定义Flowable
)。 此时,它使我们的订户可以处理事件,每毫秒1次。 因此,大约100毫秒后, observeOn()
发现其内部队列几乎为空,并要求更多。 会得到128、129、130…吗? 没有! 我们的Flowable
在这0.1秒内产生了疯狂的事件,并且(令人惊讶地)在该时间范围内成功产生了超过1亿个数字。 他们去哪了 好吧, observeOn()
并没有要求它们,因此DROP
策略(强制性参数)只是丢弃了不需要的事件。
BackpressureStrategy
听起来不对,还有其他策略吗? 是的,很多:
-
BackpressureStrategy.BUFFER
:如果上游产生太多事件,则将它们缓冲在无界队列中。 没有任何事件丢失,但是您的整个应用程序很可能会丢失。 如果幸运的话,OutOfMemoryError
将拯救您。 我停留在5秒以上的GC暂停中。 -
BackpressureStrategy.ERROR
:如果发现事件的过度产生,将抛出MissingBackpressureException
。 这是一个理智(安全)的策略。 -
BackpressureStrategy.LATEST
:类似于DROP
,但是记住上次删除的事件。 万一要求提供更多数据,但我们只是丢弃了所有内容–至少具有最后看到的价值。 -
BackpressureStrategy.MISSING
:没有安全措施,请加以处理。 下游运算符之一(如observeOn()
)最有可能抛出MissingBackpressureException
。 -
BackpressureStrategy.DROP
:删除未请求的事件。
顺便说一句,当您将Observable
变为Flowable
还必须提供BackpressureStrategy
。 RxJava必须知道如何限制过量产生的Observable
。 好的,那么简单的序列自然数流的正确实现是什么?
认识
create()
和generate()
之间的区别在于责任。 假设Flowable.create()
会在不考虑背压的情况下完整地生成流。 它只是在需要时才产生事件。 另一方面,仅允许Flowable.generate()
一次生成一个事件(或完成流)。 背压机制透明地计算出当前需要多少个事件。 generate()
调用适当的次数,例如,在observeOn()
情况下, observeOn()
128次。
因为此运算符一次生成一个事件,所以通常需要某种状态来确定上次出现的时间1 。 这就是generate()
含义:(im)可变状态的持有者和基于该状态生成下一个事件的函数:
Flowable<Long> naturalNumbers =
Flowable.generate(() -> 0L, (state, emitter) -> {
emitter.onNext(state);
return state + 1;
});
generate()
的第一个参数是初始状态(工厂),在本例中为0L
。 现在,每当订户或任何下游操作员要求一定数量的事件时,都会调用lambda表达式。 它的职责是根据提供的状态最多调用一次onNext()
(最多发出一个事件)。 首次调用lambda时, state
等于初始值0L
。 但是,我们可以修改状态并返回其新值。 在此示例中,我们增加了long
以便随后的lambda表达式调用收到state = 1L
。 显然,这种情况不断发生,产生连续的自然数。
这样的编程模型显然比while
循环难。 它还从根本上改变了实现事件源的方式。 与其在任何时候都想推送事件,不如只是被动地等待请求。 下游运营商和订户正在从您的流中提取数据。 这种转变可在管道的所有级别上产生背压。
generate()
有一些风格。 首先,如果您的状态是可变对象,则可以使用不需要返回新状态值的重载版本。 尽管功能较少,但可变状态往往会产生较少的垃圾。 这假设您的状态不断变化,并且每次都传递相同的状态对象实例。 例如,您可以轻松地将Iterator
(也是基于pull的!)变成具有反压奇观的流:
Iterator<Integer> iter = //...
Flowable<String> strings = Flowable.generate(() -> iter, (iterator, emitter) -> {
if (iterator.hasNext()) {
emitter.onNext(iterator.next().toString());
} else {
emitter.onComplete();
}
});
注意,流的类型( <String>
)不必与状态的类型( Iterator<Integer>
)相同。 当然,如果您有Java Collection
并想将其转换为流,则不必先创建迭代器。 使用Flowable.fromIterable()
足够了。 甚至更简单的generate()
版本都假定您根本没有任何状态。 例如随机数流:
Flowable<Double> randoms = Flowable
.generate(emitter -> emitter.onNext(Math.random()));
但老实说,您可能最终将需要一个Random
实例:
Flowable.generate(Random::new, (random, emitter) -> {
emitter.onNext(random.nextBoolean());
});
摘要
如您所见,RxJava 1.x中的Observable.create()
和Flowable.create Flowable.create()
有一些缺点。 如果您真的在乎大量并发系统的可伸缩性和运行状况(否则您将不会读到这篇文章!),则必须了解背压。 如果您真的需要从头开始创建流,而不是使用from*()
系列方法或繁重工作的各种库,请熟悉generate()
。 本质上,您必须学习如何将某些类型的数据源建模为奇特的迭代器。 期待有更多文章解释如何实现更多现实生活流。
这类似于无状态HTTP协议,该协议在服务器上使用称为session *的小状态来跟踪过去的请求。
翻译自: https://www.javacodegeeks.com/2017/08/generating-backpressure-aware-streams-flowable-generate-rxjava-faq.html
最后
以上就是难过向日葵为你收集整理的使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答的全部内容,希望文章能够帮你解决使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复