赞
踩
RxJava 是一个响应式编程框架,里面代码比较复杂,本系列文章将从以下几个角度来分析这个框架。
map、flatmap
。在上一篇文章中,我们了解到调用每个操作符,都会返回一个与之对应的 Observable,且相关的逻辑都封装在里面,因此我们在分析操作符的原理时,可以直接去查找与操作符对应的 Observable。例如 map 对应 ObservableMap
、flatmap 对应 ObservableFlatMap
。
本文我们就来分析一下这两个操作符的原理。
版本:
Gradle 依赖:
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
implementation 'io.reactivex.rxjava2:rxjava:2.2.19'
关联文章:
Client 调用示例代码:
private void testJust() {
Observable.just("11", "22", "33")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
// map操作符是将接收的元素类型转换为另一种类型。
return Integer.parseInt(s);
}
})
.subscribe(new Observer<Integer>() {
//略
});
}
我们知道 RxJava 调用链的串联分为三个步骤,因此我们从这三个步骤来进行分析。
- 创建任务链阶段: 在调用
map
操作符时,会创建一个 ObservableMap 对象。- 逆向逐级订阅阶段: 当 ObservableMap 执行
subscribe()
方法时,会调用ObservableMap.subscribeActual()
方法,这里会生成一个MapObserver
对调用map
操作符之前生成的 Observable 进行订阅 (此处指代调用just
生成的Observable )。- 执行任务链阶段: 执行任务链之后,
MapObserver.onNext()
方法会被触发,因此类型转换的逻辑就在这里面。
下面我们来具体分析一下 ObservableMap
代码。
ObservableMap
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { // 1.此处的 source 指代调用just操作符生成的ObservableFromArray。 super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { /* * 2.当前的MapObserver对ObservableFromArray进行订阅, * 因此MapObserver会接收到ObservableFromArray发出的事件。 * * 注:此处subscribeActual接收的Observer参数,其实是用于订阅ObservableMap的。 */ source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { // 3.执行类型转换操作,这里的mapper就是用户在外部创建的Function对象。 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } /* * 4.将转换过的数据发送给下一个Observer接收者。 * 此处的downstream其实就是外部传入的订阅ObervableMap消息的接收者。 */ downstream.onNext(v); } // ...省略代码... } }
流程图如下:
小结:
Observable 与 Observer 的映射关系:
MapObserver
接收just
操作符生成的ObservableFromArray
发出的事件。用户创建的 Observer
接收map
操作符生成的ObservableMap
发出的事件。
ObservableMap 和 MapObserver 的相关操作逻辑:
- 创建 ObservableMap 对象时,会接收上一个 Observable (即ObservableFromArray )对象。
- 当
ObservableMap.subscribeActual()
被调用时,会让 MapObserver 对 ObservableFromArray 进行订阅,因此 MapObserver 就会接收到 ObservableFromArray 发出的事件。- 执行任务阶段,会调用
MapObserver.onNext()
方法,进行数据的类型转换处理。- 将类型转换后的数据传递给 ObservableMap 的事件接收者。
Client 调用示例代码:
private void testJust() { Observable.just("11", "22", "33") .flatMap(new Function<String, Observable<Integer>>() { @Override public Observable<Integer> apply(String s) throws Exception { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(Integer.parseInt(s)); } }); } .subscribe(new Observer<Integer>() { //略 }); }
flatmap
操作符会生成对应的 ObservableFlatMap,逻辑都在 ObservableFlatMap 中,下面我们来具体分析一下 ObservableFlatMap 代码。
ObservableMap
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends ObservableSource<? extends U>> mapper; // ...省略代码... public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { // 1.source 是指向调用flatmap的Obserable对象,此处示例代码指向ObservableFromArray。 super(source); // 2.mapper是让用户进行逻辑操作的回调。 this.mapper = mapper; // ...省略代码... } @Override public void subscribeActual(Observer<? super U> t) { if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) { return; } // 3.将ObservableFromArray与MergeObserver进行关联,此时MergeObserver就可以接收来自ObservableFromArray的事件。 source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize)); } }
ObservableMap.MergeObserver
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> { final Observer<? super U> downstream; MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { this.downstream = actual; this.mapper = mapper; // ...省略代码... } @Override public void onNext(T t) { // safeguard against misbehaving sources if (done) { return; } ObservableSource<? extends U> p; try { // 4.在任务处理阶段,会触发mapper.apply(t)操作,并返回一个ObservableSource对象。 p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); } // ...省略代码... // 5.对ObservableSource发送的事件进行监听操作。 subscribeInner(p); } @SuppressWarnings("unchecked") void subscribeInner(ObservableSource<? extends U> p) { for (;;) { if (p instanceof Callable) { // ...省略代码... } else { // 6.将当前的MergeObserver传递到InnerObserver,便于后面在InnerObserver中调用MergeObserver方法。 InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++); if (addInner(inner)) { // 7.将用户新创建的ObservableSource与InnerObserver进行关联, // 此时InnerObserver就可以接收用户新创建的ObservableSource发出的事件。 p.subscribe(inner); } break; } } } // 9.这个方法是在 InnerObserver.onNext() 中被调用的。 void tryEmit(U value, InnerObserver<T, U> inner) { if (get() == 0 && compareAndSet(0, 1)) { // 10.downstream 指向用户创建的Observer。 downstream.onNext(value); if (decrementAndGet() == 0) { return; } } else { // ...省略代码... } drainLoop(); } }
ObservableFlatMap.InnerObserver
static final class InnerObserver<T, U> extends AtomicReference<Disposable> implements Observer<U> { final MergeObserver<T, U> parent; InnerObserver(MergeObserver<T, U> parent, long id) { this.id = id; this.parent = parent; } @Override public void onNext(U t) { if (fusionMode == QueueDisposable.NONE) { // 8.parent 就是 ObservableFlatMap.MergeObserver // 所以这里调用 MergeObserver.tryEmit() 方法。 parent.tryEmit(t, this); } else { parent.drain(); } } }
流程图如下:
注:图中的序号与小结中文字的序号无关。
小结:
Observable 与 Observer 的映射关系:
just
操作符生成的ObservableFromArray
所发出的事件被MergeObserver
接收。map
操作符生成的ObservableFlatMap
所发出的事件被用户创建的 Observer
接收。mapper.apply()
操作过程生成的新Observable
所发出的事件被ObservableFlatMap.InnerObserver
接收。
ObservableFlatMap 和 MergeObserver、InnerObserver 的相关操作逻辑:
- 创建 ObservableFlatMap 对象时,会接收上一个 Observable (即ObservableFromArray )对象。
- 当
ObservableFlatMap .subscribeActual()
被调用时,会将ObservableFromArray
与MergeObserver
进行关联,此时 MergeObserver 就会接收到 ObservableFromArray 发出的事件。- 执行任务阶段,在执行
MergeObserver.onNext()
方法时,此时会执行mapper.apply
生成一个新的ObservableSource。- 将
ObservableSource
与InnerObserver
进行关联,此时 InnerObserver 就会接收到 ObservableSource 发出的事件。- 在
InnerObserver.onNext()
内会调用MergeObserver.tryEmit()
方法。- 在
MergeObserver.tryEmit()
方法中最终会调用用户的Observer.onNext()
方法。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。