当前位置:   article > 正文

java xmap_RxJava2操作符(FlatMap学习)

the mapper returned a null observablesource

FlatMap是RxJava2变换操作符中比较重要的一个,本文我们来学习一下它的内部变换过程。

使用FlatMap变换方法如下:

private void init() {

Observer observer = new Observer() {

//.....代码省略

};

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) {

Log.i(TAG, "subscribe--运行线程:" + Thread.currentThread().getName());

emitter.onNext(1);

emitter.onNext(2);

emitter.onNext(3);

emitter.onComplete();

}

}).subscribeOn(Schedulers.io())

.subscribeOn(Schedulers.computation())

.observeOn(AndroidSchedulers.mainThread())

//FlatMap变换

.flatMap(new Function>() {

@Override

public ObservableSource apply(Integer integer) {

//将int类型参数转换为string类型参数,然后用just操作符将其重新发射出去

return Observable.just(String.valueOf(integer));

}

})

.subscribe(observer);

}

点进这个flatMap方法看下:

public final Observable flatMap(Function super T, ? extends ObservableSource extends R>> mapper) {

//调用两个参数的FlatMap

return flatMap(mapper, false);

}

public final Observable flatMap(Function super T, ? extends ObservableSource extends R>> mapper, boolean delayErrors) {

//调用三个参数的FlatMap

return flatMap(mapper, delayErrors, Integer.MAX_VALUE);

}

public final Observable flatMap(Function super T, ? extends ObservableSource extends R>> mapper, boolean delayErrors, int maxConcurrency) {

//调用四个参数的FlatMap

return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());

}

public final Observable flatMap(Function super T, ? extends ObservableSource extends R>> mapper,

boolean delayErrors, int maxConcurrency, int bufferSize) {

ObjectHelper.requireNonNull(mapper, "mapper is null");

ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");

ObjectHelper.verifyPositive(bufferSize, "bufferSize");

//如果上游的Observable类型是ScalarCallable类型的(比如上游的observable是通过Observable.just创建的等等,这种情况比较少见)

if (this instanceof ScalarCallable) {

@SuppressWarnings("unchecked")

T v = ((ScalarCallable)this).call();

if (v == null) {

return empty();

}

return ObservableScalarXMap.scalarXMap(v, mapper);

}

//上游类型不是ScalarCallable类型,返回ObservableFlatMap(一般情况下都是返回这个)

return RxJavaPlugins.onAssembly(new ObservableFlatMap(this, mapper, delayErrors, maxConcurrency, bufferSize));

}

FlatMap操作符通过一系列调用,最终生成了一个ObservableFlatMap对象,ObservableFlatMap类的狗造方法接收五个参数,简要介绍下:

ObservableSource source:保存上游的observable对象。

Function super T, ? extends ObservableSource extends U>> mapper:调用flatMap操作符时传入的Function接口实现类对象。

boolean delayErrors:当订阅出现异常时,是否立即发送错误(备注:如果delayErrors为true,则第一个出现异常的序列将直接终止整个序列;如果delayErrors为false,则该异常将被推迟,直到整个任务序列被异常终止)。

int maxConcurrency:可以同时订阅的ObservableSource的最大数量(由于FlatMap是一对多变换,因此可能需要多个临时的Observable来辅助变换,最后再将这多个临时的Observable合并为一个将数据发射出去。这里的最大数量就是这多个临时的Observable数量)。

int bufferSize:数据缓冲区的缓存大小(默认为128)。

看下ObservableFlatMap这个类:

public final class ObservableFlatMap extends AbstractObservableWithUpstream {

final Function super T, ? extends ObservableSource extends U>> mapper;

final boolean delayErrors;

final int maxConcurrency;

final int bufferSize;

public ObservableFlatMap(ObservableSource source,

Function super T, ? extends ObservableSource extends U>> mapper,

boolean delayErrors, int maxConcurrency, int bufferSize) {

super(source);

this.mapper = mapper;

this.delayErrors = delayErrors;

this.maxConcurrency = maxConcurrency;

this.bufferSize = bufferSize;

}

@Override

public void subscribeActual(Observer super U> t) {

//如果上游的Observable是ScalarCallable(ScalarCallable接口继承了Callable接口)类型的,

//则委托ObservableScalarXMap执行数据下发,程序返回

//(备注:这里几乎不会执行,在调用FlatMap操作符创建新的Observable对象时就已经经过该类型的判断,参见上面的代码,

//因此若最终返回ObservableFlatMap对象,上游observable对象必定不是ScalarCallable类型)

if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {

return;

}

//如果上游Observable对象不是ScalarCallable类型,通过MergeObserver来实现具体的数据变换以及下发

source.subscribe(new MergeObserver(t, mapper, delayErrors, maxConcurrency, bufferSize));

}

//......代码省略

}

在subscribeActual方法内通过MergeObserver来包装下游的observer,并将其他参数传递进去,辅助变换。在RxJava中这些包装类的设计思路基本都是类似的,因此就不做过多描述了,我们看下MergeObserver内部的onNext方法:

public void onNext(T t) {

// safeguard against misbehaving sources

if (done) {

return;

}

ObservableSource extends U> p;

try {

//保存调用mapper.apply(t)方法生成的Observable,接收上游数据,用于接下来的数据变换

p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

s.dispose();

onError(e);

return;

}

if (maxConcurrency != Integer.MAX_VALUE) {

synchronized (this) {

if (wip == maxConcurrency) {

sources.offer(p);

return;

}

wip++;

}

}

//调用方法

subscribeInner(p);

}

由于RxJava默认传入的maxConcurrency是Integer.MAX_VALUE(这也是我们通常使用的方式),因此直接调用 subscribeInner(p),这个p就是保存我们调用FlatMap操作符时实现的Function接口生成的Observable(p在初始化是调用了mapper.apply(t)方法,t为上游Observable发射的数据),用来进行数据变换。看下这个方法:

void subscribeInner(ObservableSource extends U> p) {

for (;;) {

if (p instanceof Callable) {

//1、如果传入的ObservableSource是Callable类型的(比如上面的示例代码生成的ObservableJust对象,但这属于特例)程序走这里,

//(ScalarCallable接口继承了Callable接口),

//这里tryEmitScalar执行的过程和下面的InnerObserver执行过程极为类似,这里我们就只介绍InnerObserver的执行过程

tryEmitScalar(((Callable extends U>)p));

if (maxConcurrency != Integer.MAX_VALUE) {

synchronized (this) {

p = sources.poll();

if (p == null) {

wip--;

break;

}

}

} else {

break;

}

} else {

//2、Function对象内部返回的observable不是Callable类型的,则为每个observable创建一个InnerObserver,

//本文中我们只关心这里,上面的数据下发过程以此类似,因此我们只要分析一个方面就行了

InnerObserver inner = new InnerObserver(this, uniqueId++);

//MergeObserver内部有一个保存InnerObserver的数组observers,

//因此这个addInner方法就是将每次新建的InnerObserver保存到这个数组中

if (addInner(inner)) {

//对每次创建的InnerObserver执行订阅

p.subscribe(inner);

}

break;

}

}

}

本文我们只分析使用InnerObserver这种方式,InnerObserver又包装了当前的MergeObserver,并将新创建的InnerObserver保存到InnerObserver数组中。看下addInner方法:

boolean addInner(InnerObserver inner) {

//这里通过死循环来保存新建的InnerObserver,确保保存成功

for (;;) {

//从observers获取InnerObserver数组,这个observers是一个AtomicReference类型,

//确保在多线程环境下只有一个InnerObserver数组对象

InnerObserver, ?>[] a = observers.get();

//保存失败的唯一条件是外部取消订阅

if (a == CANCELLED) {

inner.dispose();

return false;

}

int n = a.length;

//保存新建的InnerObserver的目标数组

InnerObserver, ?>[] b = new InnerObserver[n + 1];

//源数组到目标数组的迁移

System.arraycopy(a, 0, b, 0, n);

//保存新建的InnerObserver

b[n] = inner;

//将目标数组设置回observers中,完成保存InnerObserver的数组的更新

if (observers.compareAndSet(a, b)) {

return true;

}

}

}

新建的InnerObserver保存成功后,调用p.subscribe(inner)开始下发数据(这里的p就是Function返回的Observable),依次调用InnerObserver的onSubscribe,onNext,onComplete/onError方法,我们看下这几个方法:

//建立订阅关系,获取disposable订阅状态管理对象

public void onSubscribe(Disposable s) {

if (DisposableHelper.setOnce(this, s)) {

//如果s是QueueDisposable类型

if (s instanceof QueueDisposable) {

@SuppressWarnings("unchecked")

QueueDisposable qd = (QueueDisposable) s;

//获取合并标记

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

//这里根据订阅类型是同步的还是异步的执行相应的操作

if (m == QueueDisposable.SYNC) {

fusionMode = m;

queue = qd;

done = true;

//同步情况下,直接发射数据

parent.drain();

return;

}

if (m == QueueDisposable.ASYNC) {

fusionMode = m;

queue = qd;

}

}

}

}

接着看onNext方法:

public void onNext(U t) {

//由于在onSubscribe中条件(s instanceof QueueDisposable)为false,因此fusionMode值还是0,走第一个条件

if (fusionMode == QueueDisposable.NONE) {

parent.tryEmit(t, this);

} else {

parent.drain();

}

}

看下tryEmit(t, this)方法:

void tryEmit(U value, InnerObserver inner) {

//1、判断VALUE==0是否成立,如果成立则将其设置为1,并执行条件内的方法

if (get() == 0 && compareAndSet(0, 1)) {

//2、调用下游observer的onNext方法,下发数据

actual.onNext(value);

//3、每次下发数据后将VALUE减一,并判断减一后的VALUE是否为0,如果为0,则表示数据都已下发完毕,方法结束

if (decrementAndGet() == 0) {

return;

}

} else {

SimpleQueue q = inner.queue;

if (q == null) {

q = new SpscLinkedArrayQueue(bufferSize);

inner.queue = q;

}

//4、将接收到的上游数据缓存到队列中

q.offer(value);

//5、每次将一个数据缓存到缓存队列中后,将VALUE加一,

//然后判断VALUE加一之前的值,若不等于0,直接返回,结束方法;若等于0,执行步骤6

if (getAndIncrement() != 0) {

return;

}

}

//6、循环获取缓存队列中的数据

drainLoop();

}

MergeObserver继承了AtomicInteger,主要是保证数据的下发是一个一个进行的。

简要介绍下tryEmit方法的运行流程:

tryEmit方法第一次调用时,get() == 0成立,调用compareAndSet(0, 1)将VALUE设置为1,此时执行条件内部语句,开始下发数据。数据下发执行完后调用decrementAndGet()==0,这里是先将VALUE减一,VALUE值变为0,因此VALUE==0成立,方法结束。

如果在下发数据执行期间,上游有新的数据传递过来,此时get() == 0不成立,进入步骤4,将上游发送过来的数据保存到缓存队列中。

执行完步骤4后,调用getAndIncrement()将VALUE值加一,并判断VALUE加一之前的值,若不等于0,结束方法;若等于0,执行步骤6。

步骤6是从队列中循环取出数据并下发给下游observer。其执行条件是步骤3或步骤5的条件判断不成立,什么时候不成立呢?

假设有两个线程在执行,分别称为线程A和线程B,假设线程A先抢到了CPU权限,步骤1条件成立,此时VALUE值为1,但是在执行步骤3之前,线程B抢占了CPU执行权限,线程A处于休眠状态,由于此时VALUE值为1,get() == 0不成立,此时线程B就会将数据缓存到队列中。

一种情况是:线程B继续执行,并在步骤5处将VALUE加一(VALUE值变为2),由于VALUE原先的值为1,因此方法结束,线程B让出CPU权限。线程A获取CPU权限,原先的步骤3继续执行(VALUE的值变为1),条件不成立,执行步骤6,从队列中循环取出数据并下发。

另一种情况是:线程B在执行步骤5之前,失去了CPU的权限。原线程A抢到了CPU权限,执行步骤3(VALUE值变为0),条件3判断成立,线程A中这个方法结束,线程A让出CPU权限。线程B获取CPU权限,继续执行步骤5(VALUE值变为1),但getAndIncrement()方法获取的是VALUE原来的值,原来的值为0,因此条件不成立,执行步骤6,从队列中循环取出数据并下发。

tryEmit方法介绍完了,再来看下drainLoop()这个方法:

void drainLoop() {

final Observer super U> child = this.actual;

int missed = 1;

for (;;) {

//检查订阅是否被终止

if (checkTerminate()) {

return;

}

//获取MergeObserver内的缓存队列

SimplePlainQueue svq = queue;

//通常情况下,MergeObserver内的缓存队列都是空的

if (svq != null) {

//如果缓存队列里面有数据,开始循环

for (;;) {

U o;

for (;;) {

//再次检查订阅是否被终止

if (checkTerminate()) {

return;

}

//从缓存队列中取一个数据

o = svq.poll();

if (o == null) {

break;

}

//调用下游observer的onNext方法,下发数据

child.onNext(o);

}

if (o == null) {

break;

}

}

}

boolean d = done;

svq = queue;

InnerObserver, ?>[] inner = observers.get();

int n = inner.length;

//调用下游observer的onComplete或onError方法

if (d && (svq == null || svq.isEmpty()) && n == 0) {

Throwable ex = errors.terminate();

if (ex != ExceptionHelper.TERMINATED) {

if (ex == null) {

child.onComplete();

} else {

child.onError(ex);

}

}

return;

}

//处理MergeObserver内的InnerObserver数组

boolean innerCompleted = false;

if (n != 0) {

long startId = lastId;

int index = lastIndex;

if (n <= index || inner[index].id != startId) {

if (n <= index) {

index = 0;

}

int j = index;

for (int i = 0; i < n; i++) {

if (inner[j].id == startId) {

break;

}

j++;

if (j == n) {

j = 0;

}

}

index = j;

lastIndex = j;

lastId = inner[j].id;

}

int j = index;

sourceLoop:

//循环处理InnerObserver数组内的每个InnerObserver对象

for (int i = 0; i < n; i++) {

if (checkTerminate()) {

return;

}

@SuppressWarnings("unchecked")

InnerObserver is = (InnerObserver)inner[j];

//处理InnerObserver内的缓存队列,如果有缓存数据,则将其发射出去

for (;;) {

if (checkTerminate()) {

return;

}

SimpleQueue q = is.queue;

if (q == null) {

break;

}

U o;

for (;;) {

try {

o = q.poll();

} catch (Throwable ex) {

Exceptions.throwIfFatal(ex);

is.dispose();

errors.addThrowable(ex);

if (checkTerminate()) {

return;

}

removeInner(is);

innerCompleted = true;

i++;

continue sourceLoop;

}

if (o == null) {

break;

}

//发射InnerObserver内缓存队列缓存的数据

child.onNext(o);

if (checkTerminate()) {

return;

}

}

if (o == null) {

break;

}

}

boolean innerDone = is.done;

SimpleQueue innerQueue = is.queue;

//如果InnerObserver处理完毕,并且其内部缓存队列的数据都已发射出去

if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {

//将InnerObserver从数组中移除

removeInner(is);

if (checkTerminate()) {

return;

}

//设置InnerObserver状态为complete

innerCompleted = true;

}

j++;

if (j == n) {

j = 0;

}

}

lastIndex = j;

lastId = inner[j].id;

}

//当前InnerObserver已经处理完毕,继续循环处理数组中下一个InnerObserver

if (innerCompleted) {

//传入的默认maxConcurrency值是Integer.MAX_VALUE,因此if条件内的代码不会执行

if (maxConcurrency != Integer.MAX_VALUE) {

ObservableSource extends U> p;

synchronized (this) {

p = sources.poll();

if (p == null) {

wip--;

continue;

}

}

subscribeInner(p);

}

//结束当前循环,开始下一循环处理下一个InnerObserver

continue;

}

//每次数据下发完毕,将VALUE值减一

missed = addAndGet(-missed);

//如果VALUE值变为0,表示缓存队列的数据已全部下发完毕,退出循环,方法结束

if (missed == 0) {

break;

}

}

}

至此整个FlatMap操作符的流程就分析完了,总结下:

根据上游Observable对象的类型是不是ScalarCallable类型,FlatMap决定返回相应的新的Observable对象,一般情况下返回的都是ObservableFlatMap对象(ObservableScalarXMap对象处理数据分发的方式与ObservableFlatMap类似,这里我们只分析ObservableFlatMap)。

下游订阅时,触发ObservableFlatMap的subscribeActual方法,触发上游subscribe --> subscribeActual --> 调用MergeObserver的onSubscribe建立订阅关系,上游调用onNext下发数据 --> 调用MergeObserver的onNext,数据下发完毕或者出错调用MergeObserver的onComplete或onError。

在MergeObserver的onNext方法中,获取FlatMap操作符接收的Function对象返回的数据变换Observable(暂时命名为observableA),并为每个返回的observableA创建一个对应的InnerObserver对象(暂时命名为innerObserverA),然后直行订阅observableA.subscribe(innerObserverA),最终将上游的数据经过变换后重新发射出去。

InnerObserver内部有一个缓存队列,用于缓存变换后的数据,其onNext方法内部最终还是调用的MergeObserver的tryEmit方法,将变换后的数据重新发射到下游observer。MergeObserver实现了AtomicInteger类,采用CAS操作保证了数据下发操作的原子性(即每次只有一个数据下发,在当前数据下发过程中,如果上游有新的数据到来,则将新的数据保存到InnerObserver的缓存队列中。等当前数据下发完毕后,再从InnerObserver的缓存队列中取出数据并将其下发给下游observer)。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/177392
推荐阅读
相关标签
  

闽ICP备14008679号