当前位置:   article > 正文

Rxjava(变换类)--FlatMap_observable.flatmap

observable.flatmap

FlatMap 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并 后放进一个单独的Observable


FlatMap 操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这 个函数返回一个本身也发射数据的Observable,然后FlatMap 合并这些Observables发射的数 据,最后将合并后的结果当做它自己的数据序列发射。

  1. Observable.just(1).flatMap(new Func1<Integer, Observable<Integer>>() {
  2. @Override
  3. public Observable<Integer> call(final Integer integer) {
  4. return Observable.create(new Observable.OnSubscribe<Integer>() {
  5. @Override
  6. public void call(Subscriber<? super Integer> subscriber) {
  7. subscriber.onNext(integer);
  8. subscriber.onCompleted();
  9. }
  10. });
  11. }
  12. }).subscribe(new Action1<Integer>() {
  13. @Override
  14. public void call(Integer integer) {
  15. log(integer + "");
  16. }
  17. });
  18. }

看一下flatMap的实现

  1. public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
  2. if (getClass() == ScalarSynchronousObservable.class) {
  3. return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
  4. }
  5. return merge(map(func));
  6. }
这里just产生的是ScalarSynchronousObservable的Observable所以走上面一个分支,调用它的scalarFlatMap

  1. public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
  2. return create(new OnSubscribe<R>() {
  3. @Override
  4. public void call(final Subscriber<? super R> child) {
  5. Observable<? extends R> o = func.call(t);
  6. if (o instanceof ScalarSynchronousObservable) {
  7. child.setProducer(createProducer(child, ((ScalarSynchronousObservable<? extends R>)o).t));
  8. } else {
  9. o.unsafeSubscribe(Subscribers.wrap(child));
  10. }
  11. }
  12. });
  13. }
这里重新创建了一个实现OnSubscribe接口的类,当我们subscribe时候,会调用它的call,这里会调用我们demo中flatMap中的Func1方法

判断返回Observable的类型

这里我们不是ScalarSynchronousObservable,走下面一个分支

wrap函数很简单

  1. */
  2. public static <T> Subscriber<T> wrap(final Subscriber<? super T> subscriber) {
  3. return new Subscriber<T>(subscriber) {
  4. @Override
  5. public void onCompleted() {
  6. subscriber.onCompleted();
  7. }
  8. @Override
  9. public void onError(Throwable e) {
  10. subscriber.onError(e);
  11. }
  12. @Override
  13. public void onNext(T t) {
  14. subscriber.onNext(t);
  15. }
  16. };
  17. }

这里订阅的时候会调用我们创建flatMap中创建Observable的call函数

  1. public void call(Subscriber<? super Integer> subscriber) {
  2. subscriber.onNext(integer);
  3. subscriber.onCompleted();
  4. }


这里传递过来的Subscriber即我们前面wrap的Subscriber


flatMap()操作符使用你提供的原本会被原始Observable发送的事件,来创建一个新的Observable。而且这个操作符,返回的是一个自身发送事件并合并结果的Observable。可以用于任何由原始Observable发送出的事件,发送合并后的结果。记住,flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序。为了防止交错的发生(从上面的图可以看出),可以使用与之类似的concatMap()操作符

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/177398
推荐阅读
相关标签
  

闽ICP备14008679号