概述
RxJava源码(从 just 开始)
RxJava源码(简单操作符)
RxJava源码(线程)
RxJava源码(背压)
RxJava源码(RxBinding)
RxJava源码(衍生 RxLifecycle)
先来看下背压方式的简单实现:
// 1、创建Flowable对象
val flowable = Flowable
.create<Int>(object : FlowableOnSubscribe<Int?> {
override fun subscribe(emitter: FlowableEmitter<Int?>) {
Log.e("TAG", "subscribe: " + emitter.requested())
for (i in 0..127) {
Log.e("TAG", "subscribe: $i")
emitter.onNext(i)
}
}
}, BackpressureStrategy.DROP)
// 2、创建Subscriber观察者对象
val subscriber: Subscriber<Int> = object : Subscriber<Int> {
var mSubscription: Subscription? = null
override fun onSubscribe(s: Subscription) {
Log.e("TAG", "onSubscribe")
mSubscription = s
s.request(1)
}
override fun onNext(integer: Int) {
Log.e("TAG", "onNext: $integer")
}
override fun onError(t: Throwable) {
Log.e("TAG", "onError: ", t)
}
override fun onComplete() {
Log.e("TAG", "onComplete")
}
}
// 3、发生订阅关系
flowable.subscribe(subscriber)
在创建 Flowable 的第二个参数中设置 BackpressureStrategy 策略。
Flowable 提供了以下几种策略:
public enum BackpressureStrategy {
MISSING,
ERROR,
BUFFER,
DROP,
LATEST
}
现在就开始从源码角度看了:
由前几天我们知道一定会到 FlowableCreate 中的 subscribeActual 方法中。
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
这里由传入不同的策略,会生成对应的 BaseEmitter策略子类。
ERROR:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时。直接抛出异常MissingBackpressureException
MISSING:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时。友好提示:缓存区满了
BUFFER:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时。将缓存区大小设置成无限大,直至OOM。
DROP:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时。超过缓存区大小(128)的事件丢弃。
LATEST:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时。只保存最新(最后)事件,超过缓存区大小(128)的事件丢弃。
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription
BaseEmitter 集成 AtomicLong,long 的操作保持原子型。由于生产者和消费者可以在不同的线程操作,可能会带来线程不安全,所以采用了 AtomicLong 线程安全的 Long 来保存可消费的数据。
实现了 FlowableEmitter 和 Subscription 。
FlowableEmitter 类继承 Emitter ,在发射的之前可以通过 requestd 方法判断下游还可以处理多少,这样就完成了响应式拉取的核心东西。
public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
long requested();
boolean isCancelled();
@NonNull
FlowableEmitter<T> serialize();
boolean tryOnError(@NonNull Throwable t);
}
Subscription 这里就不解释了。
t.onSubscribe(emitter);实则执行了 LambdaSubscriber 的 onSubscribe ,进而 RequestMax 的 accept 中。
public enum RequestMax implements Consumer<Subscription> {
INSTANCE;
@Override
public void accept(Subscription t) throws Exception {
t.request(Long.MAX_VALUE);
}
}
经过一系列,它会调用 FlowableCreate 的 request 方法:
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
public static boolean validate(long n) {
if (n <= 0) {
return false;
}
return true;
}
先判断传入的 n 是否大于 0 是否合法,合法就更改可以处理的保存的记录数,n 这里是 Long.MAX_VALUE。
public static long add(AtomicLong requested, long n) {
for (;;) {
long r = requested.get();
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long u = addCap(r, n);
if (requested.compareAndSet(r, u)) {
return r;
}
}
}
public static long addCap(long a, long b) {
long u = a + b;
if (u < 0L) {
return Long.MAX_VALUE;
}
return u;
}
调用 requested.get(),实际取出当前还有多少数据没有被消费,都被消费了,直接返回 Long.MAX_VALUE,还有数据,addCap 计算出来的 u,并保存在 DropAsyncEmitter 类型的 requested 中;
source.subscribe(emitter);就是我们的自定义事件开始:
override fun subscribe(emitter: FlowableEmitter<Int>) {
for (i in 0..127) {
println("create: $i")
emitter.onNext(i)
}
}
source.subscribe(emitter) 执行中,通常会用 emitter 去触发 emitter.onNext(int),这样会触发事件流,也就到了 DropAsyncEmitter 中的 onNext 方法。
DropAsyncEmitter 没有重写,自然在其父类中。
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() != 0) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
abstract void onOverflow();
}
前面都是检测合法性,重要的是下面的判断,get() 实则是 Long.MAX_VALUE 或者 Long.MAX_VALUE+ 没有消费的事件,即刚才保存的 u,之后再通过 LambdaSubscriber 订阅再发送到我们自定义的观察者中。
如果 get() != 0 ,说明下游可以处理数据,那就发送一个数据,否则则丢弃不管了(该事件到这就停止了),也就是 Drop 策略。get() != 0 则处理完成自定义的观察者 onNext 后,将记录的处理数据减一,再次将新的值赋给 DropAsyncEmitter 类型的 requested。
public static long produced(AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
}
}
}
知道发送的原理了,我们接前文的异步看下。
当 Flowable 执行异步的时候会再 observeOn 中会创建一个 FlowableObserveOn 对象,其中将 BUFFER_SIZE 传入其中,赋值给参数 prefetch。
BUFFER_SIZE 为 128。
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
public static int bufferSize() {
return BUFFER_SIZE;
}
public final Flowable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
FlowableObserveOn 在 subscribeActual 中,将 ScheduledWorker 封装成为 ObserveOnSubscriber,此时 ObserveOnSubscriber 的 onSubscribe 中调用
queue = new SpscArrayQueue<T>(prefetch);//prefetch = 128
创建了长度 128 的 SimpleQueue 队列,这里缓存了发射的数据。
MISSING
@Override
public void onNext(T t) {
if (isCancelled()) {
return;
}
if (t != null) {
downstream.onNext(t);
} else {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (; ; ) {
long r = get();
if (r == 0L || compareAndSet(r, r - 1)) {
return;
}
}
}
ERROR
没有发送处理
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
默认的BUFFER则新创建了SpscLinkedArrayQueue队列
BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
super(actual);
this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
this.wip = new AtomicInteger();
}
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
queue.offer(t);
drain();
}
LATEST
LatestAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
this.queue = new AtomicReference<T>();
this.wip = new AtomicInteger();
}
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
queue.set(t);
drain();
}
LatestAsyncEmitter 和 BufferAsyncEmitter 各有个的实现,这里就不详细介绍了。
最后
以上就是单薄抽屉为你收集整理的RxJava源码(背压)的全部内容,希望文章能够帮你解决RxJava源码(背压)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复