赞
踩
返回一个Observable,它向源ObservableSource和指定的集合ObservableSource发出的一对值发射指定函数的结果,
同时限制对这些ObservableSources的并发预订的最大数量
public final <R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends R>> mapper) { return flatMap(mapper, false); } public final <R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) { return flatMap(mapper, delayErrors, Integer.MAX_VALUE); } //bufferSize() = 128 public final <R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) { return flatMap(mapper, delayErrors, maxConcurrency, bufferSize()); } public final <R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { public final <R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends R>> onNextMapper, Function<? super Throwable, ? extends ObservableSource<? extends R>> onErrorMapper, Callable<? extends ObservableSource<? extends R>> onCompleteSupplier) { public final <R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends R>> onNextMapper, Function<Throwable, ? extends ObservableSource<? extends R>> onErrorMapper, Callable<? extends ObservableSource<? extends R>> onCompleteSupplier, int maxConcurrency) { public final <R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends R>> mapper, int maxConcurrency) { return flatMap(mapper, false, maxConcurrency, bufferSize()); } public final <U, R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends U>> mapper, BiFunction<? super T, ? super U, ? extends R> resultSelector) { return flatMap(mapper, resultSelector, false, bufferSize(), bufferSize()); } public final <U, R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends U>> mapper, BiFunction<? super T, ? super U, ? extends R> combiner, boolean delayErrors) { return flatMap(mapper, combiner, delayErrors, bufferSize(), bufferSize()); } public final <U, R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends U>> mapper, BiFunction<? super T, ? super U, ? extends R> combiner, boolean delayErrors, int maxConcurrency) { return flatMap(mapper, combiner, delayErrors, maxConcurrency, bufferSize()); } public final <U, R> Observable<R> flatMap( Function<? super T, ? extends ObservableSource<? extends U>> mapper, BiFunction<? super T, ? super U, ? extends R> combiner, int maxConcurrency) { return flatMap(mapper, combiner, false, maxConcurrency, bufferSize()); } // mapper (映射器) : 为源ObservableSource发出的每个项目返回ObservableSource的函数 // combiner (结合): 一个函数,它将源和集合ObservableSources的每一个发出的一项组合在一起, // 并且返回要由结果ObservableSource发出的一项 // delayErrors (延迟错误): // 如果为true,则将当前Observable和所有内部ObservableSource的异常延迟到它们全部终止之前 // 如果为false,则第一个发出异常的信号将立即终止整个序列 // maxConcurrency (最大并发): 可以同时订阅的ObservableSources的最大数量 // bufferSize (缓冲区大小): 要从内部ObservableSources中预取的元素数。 public final <U, R> Observable<R> flatMap( final Function<? super T, ? extends ObservableSource<? extends U>> mapper, final BiFunction<? super T, ? super U, ? extends R> combiner, boolean delayErrors, int maxConcurrency, int bufferSize)
public final <U> Observable<U> flatMapIterable(
final Function<? super T, ? extends Iterable<? extends U>> mapper)
public final <U, V> Observable<V> flatMapIterable(
final Function<? super T, ? extends Iterable<? extends U>> mapper,
BiFunction<? super T, ? super U, ? extends V> resultSelector)
将上游Observable的每个元素映射到MaybeSources中,
订阅所有元素并将它们的onSuccess值以不特定的顺序合并到单个Observable序列中。
public final <R> Observable<R> flatMapMaybe(
Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
return flatMapMaybe(mapper, false);
}
//delayErrors : 延迟错误
public final <R> Observable<R> flatMapMaybe(
Function<? super T, ? extends MaybeSource<? extends R>> mapper,
boolean delayErrors)
将上游Observable的每个元素映射到CompletableSources中,对其进行订阅,
并等待上游和所有CompletableSources完成,从而有选择地延迟所有错误。
public final Completable flatMapCompletable(
Function<? super T, ? extends CompletableSource> mapper) {
return flatMapCompletable(mapper, false);
}
//delayErrors : 延迟错误
public final Completable flatMapCompletable(
Function<? super T, ? extends CompletableSource> mapper,
boolean delayErrors)
将上游Observable的每个元素映射到SingleSources中,
并对其进行订阅并将它们的onSuccess值【以不特定顺序】合并到单个Observable序列中,
可以选择延迟所有错误。
public final <R> Observable<R> flatMapSingle(
Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return flatMapSingle(mapper, false);
}
//delayErrors : 延迟错误
public final <R> Observable<R> flatMapSingle(
Function<? super T, ? extends SingleSource<? extends R>> mapper,
boolean delayErrors)
//正常发射 public void flatMap1() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).flatMap( new Function<Integer, ObservableSource<Integer>>() { /** * @param integer 原始Observable发出的数据 */ @Override public ObservableSource<Integer> apply(Integer integer) throws Exception { System.out.println("Function 原始Observable============" + integer); return Observable.just(integer * 100); } }, new BiFunction<Integer, Integer, Integer>() { /** * @param integer : 原始Observable发出的数据 * @param integer2 : flatmap里Observable发出的的数据 */ @Override public Integer apply(Integer integer, Integer integer2) throws Exception { System.out.println("====BiFunction 原始Observable=========" + integer); System.out.println("====BiFunction flatmap里Observable====" + integer2); return integer * integer2; } }, true, Observable.bufferSize(), Observable.bufferSize()) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("=========onSubscribe========="); } @Override public void onNext(Integer integer) { System.out.println("========onNext==========" + integer); } @Override public void onError(Throwable e) { System.out.println("========onError==========" + e.getMessage()); } @Override public void onComplete() { System.out.println("=========onComplete========="); } }); } //=========onSubscribe========= //Function 原始Observable============1 //====BiFunction 原始Observable=========1 //====BiFunction flatmap里Observable====100 //========onNext==========100 //Function 原始Observable============2 //====BiFunction 原始Observable=========2 //====BiFunction flatmap里Observable====200 //========onNext==========400 //Function 原始Observable============3 //====BiFunction 原始Observable=========3 //====BiFunction flatmap里Observable====300 //========onNext==========900 //=========onComplete========= //发射error public void flatMap2() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new Error("=error=")); emitter.onNext(3); emitter.onComplete(); } }).flatMap( new Function<Integer, ObservableSource<Integer>>() { /** * @param integer 原始Observable发出的数据 */ @Override public ObservableSource<Integer> apply(Integer integer) throws Exception { System.out.println("Function 原始Observable============" + integer); return Observable.just(integer * 100); } }, new BiFunction<Integer, Integer, Integer>() { /** * @param integer : 原始Observable发出的数据 * @param integer2 : flatmap里Observable发出的的数据 */ @Override public Integer apply(Integer integer, Integer integer2) throws Exception { System.out.println("====BiFunction 原始Observable===========" + integer); System.out.println("====BiFunction flatmap里Observable======" + integer2); return integer * integer2; } }, true, Observable.bufferSize(), Observable.bufferSize()) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("=========onSubscribe========="); } @Override public void onNext(Integer integer) { System.out.println("========onNext==========" + integer); } @Override public void onError(Throwable e) { System.out.println("========onError==========" + e.getMessage()); } @Override public void onComplete() { System.out.println("=========onComplete========="); } }); } //=========onSubscribe========= //Function 原始Observable============1 //====BiFunction 原始Observable===========1 //====BiFunction flatmap里Observable======100 //========onNext==========100 //Function 原始Observable============2 //====BiFunction 原始Observable===========2 //====BiFunction flatmap里Observable======200 //========onNext==========400 //========onError===========error=
/** * 我们最终只需要知道成功还是失败可以使用flatMapCompletable */ //======================================================================= // delayErrors = false // flatMapCompletable 处理发射的数据并添加onComplete、onError 直接回调结束 //======================================================================= public void flatMapCompletable1() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onNext(6); emitter.onNext(7); emitter.onNext(8); emitter.onNext(9); emitter.onNext(10); } }).flatMapCompletable( new Function<Integer, CompletableSource>() { @Override public CompletableSource apply(Integer integer) throws Exception { System.out.println("========flatMapCompletable======" + integer); if (integer % 2 == 0) { return Completable.error(new Throwable("-error-")); } else { return Completable.complete(); } } }, false) .subscribe(new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("========onSubscribe=========="); } @Override public void onComplete() { System.out.println("========onComplete=========="); } @Override public void onError(Throwable e) { System.out.println("========onError==========" + e.getMessage()); } }); } //========onSubscribe========== //========flatMapCompletable======1 //========flatMapCompletable======2 //========onError==========-error- public void flatMapCompletable2() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onNext(6); emitter.onError(new Error("-error-")); emitter.onNext(7); emitter.onNext(8); emitter.onNext(9); emitter.onNext(10); } }).flatMapCompletable( new Function<Integer, CompletableSource>() { @Override public CompletableSource apply(Integer integer) throws Exception { System.out.println("========flatMapCompletable======" + integer); if (integer % 2 == 0) { return Completable.error(new Throwable("-error-")); } else { return Completable.complete(); } } }, false) .subscribe(new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("========onSubscribe=========="); } @Override public void onComplete() { System.out.println("========onComplete=========="); } @Override public void onError(Throwable e) { System.out.println("========onError==========" + e.getMessage()); } }); } //========onSubscribe========== //========flatMapCompletable======1 //========flatMapCompletable======2 //========onError==========-error- public void flatMapCompletable3() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onNext(6); emitter.onComplete(); emitter.onNext(7); emitter.onNext(8); emitter.onNext(9); emitter.onNext(10); } }).flatMapCompletable( new Function<Integer, CompletableSource>() { @Override public CompletableSource apply(Integer integer) throws Exception { System.out.println("========flatMapCompletable======" + integer); if (integer % 2 == 0) { return Completable.error(new Throwable("-error-")); } else { return Completable.complete(); } } }, false) .subscribe(new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("========onSubscribe=========="); } @Override public void onComplete() { System.out.println("========onComplete=========="); } @Override public void onError(Throwable e) { System.out.println("========onError==========" + e.getMessage()); } }); } //========onSubscribe========== //========flatMapCompletable======1 //========flatMapCompletable======2 //========onError==========-error- //======================================================================= // delayErrors = true // flatMapCompletable 处理发射的数据并添加onComplete、onError // 但不会结束,直到Observable 发射onComplete、onError 结束 //======================================================================= public void flatMapCompletable4() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onNext(6); emitter.onNext(7); emitter.onNext(8); emitter.onNext(9); emitter.onNext(10); } }).flatMapCompletable( new Function<Integer, CompletableSource>() { @Override public CompletableSource apply(Integer integer) throws Exception { System.out.println("========flatMapCompletable======" + integer); if (integer % 2 == 0) { return Completable.error(new Throwable("-error-")); } else { return Completable.complete(); } } }, true) .subscribe(new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("========onSubscribe=========="); } @Override public void onComplete() { System.out.println("========onComplete=========="); } @Override public void onError(Throwable e) { System.out.println("========onError==========" + e.getMessage()); } }); } //========onSubscribe========== //========flatMapCompletable======1 //========flatMapCompletable======2 //========flatMapCompletable======3 //========flatMapCompletable======4 //========flatMapCompletable======5 //========flatMapCompletable======6 //========flatMapCompletable======7 //========flatMapCompletable======8 //========flatMapCompletable======9 //========flatMapCompletable======10 public void flatMapCompletable5() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onNext(6); emitter.onError(new Error("-error-")); emitter.onNext(7); emitter.onNext(8); emitter.onNext(9); emitter.onNext(10); } }).flatMapCompletable( new Function<Integer, CompletableSource>() { @Override public CompletableSource apply(Integer integer) throws Exception { System.out.println("========flatMapCompletable======" + integer); if (integer % 2 == 0) { return Completable.error(new Throwable("-error-")); } else { return Completable.complete(); } } }, true) .subscribe(new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("========onSubscribe=========="); } @Override public void onComplete() { System.out.println("========onComplete=========="); } @Override public void onError(Throwable e) { System.out.println("========onError==========" + e.getMessage()); } }); } //========onSubscribe========== //========flatMapCompletable======1 //========flatMapCompletable======2 //========flatMapCompletable======3 //========flatMapCompletable======4 //========flatMapCompletable======5 //========flatMapCompletable======6 //========onError==========4 exceptions occurred. public void flatMapCompletable6() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onNext(6); emitter.onComplete(); emitter.onNext(7); emitter.onNext(8); emitter.onNext(9); emitter.onNext(10); } }).flatMapCompletable( new Function<Integer, CompletableSource>() { @Override public CompletableSource apply(Integer integer) throws Exception { System.out.println("========flatMapCompletable======" + integer); if (integer % 2 == 0) { return Completable.error(new Throwable("-error-")); } else { return Completable.complete(); } } }, true) .subscribe(new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("========onSubscribe=========="); } @Override public void onComplete() { System.out.println("========onComplete=========="); } @Override public void onError(Throwable e) { System.out.println("========onError==========" + e.getMessage()); } }); } //========onSubscribe========== //========flatMapCompletable======1 //========flatMapCompletable======2 //========flatMapCompletable======3 //========flatMapCompletable======4 //========flatMapCompletable======5 //========flatMapCompletable======6 //========onError==========3 exceptions occurred.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。