赞
踩
FlatMap 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并 后放进一个单独的Observable
FlatMap 操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这 个函数返回一个本身也发射数据的Observable,然后FlatMap 合并这些Observables发射的数 据,最后将合并后的结果当做它自己的数据序列发射。
- Observable.just(1).flatMap(new Func1<Integer, Observable<Integer>>() {
- @Override
- public Observable<Integer> call(final Integer integer) {
- return Observable.create(new Observable.OnSubscribe<Integer>() {
- @Override
- public void call(Subscriber<? super Integer> subscriber) {
- subscriber.onNext(integer);
- subscriber.onCompleted();
- }
- });
- }
- }).subscribe(new Action1<Integer>() {
- @Override
- public void call(Integer integer) {
- log(integer + "");
- }
- });
- }
- public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
- if (getClass() == ScalarSynchronousObservable.class) {
- return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
- }
- return merge(map(func));
- }
这里just产生的是ScalarSynchronousObservable的Observable所以走上面一个分支,调用它的scalarFlatMap
- public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
- return create(new OnSubscribe<R>() {
- @Override
- public void call(final Subscriber<? super R> child) {
- Observable<? extends R> o = func.call(t);
- if (o instanceof ScalarSynchronousObservable) {
- child.setProducer(createProducer(child, ((ScalarSynchronousObservable<? extends R>)o).t));
- } else {
- o.unsafeSubscribe(Subscribers.wrap(child));
- }
- }
- });
- }
这里重新创建了一个实现OnSubscribe接口的类,当我们subscribe时候,会调用它的call,这里会调用我们demo中flatMap中的Func1方法
判断返回Observable的类型
这里我们不是ScalarSynchronousObservable,走下面一个分支
wrap函数很简单
- */
- public static <T> Subscriber<T> wrap(final Subscriber<? super T> subscriber) {
- return new Subscriber<T>(subscriber) {
-
- @Override
- public void onCompleted() {
- subscriber.onCompleted();
- }
-
- @Override
- public void onError(Throwable e) {
- subscriber.onError(e);
- }
-
- @Override
- public void onNext(T t) {
- subscriber.onNext(t);
- }
-
- };
- }
- public void call(Subscriber<? super Integer> subscriber) {
- subscriber.onNext(integer);
- subscriber.onCompleted();
- }
这里传递过来的Subscriber即我们前面wrap的Subscriber
flatMap()
操作符使用你提供的原本会被原始Observable
发送的事件,来创建一个新的Observable
。而且这个操作符,返回的是一个自身发送事件并合并结果的Observable
。可以用于任何由原始Observable
发送出的事件,发送合并后的结果。记住,flatMap()
可能交错的发送事件,最终结果的顺序可能并是不原始Observable
发送时的顺序。为了防止交错的发生(从上面的图可以看出),可以使用与之类似的concatMap()
操作符
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。