当前位置:   article > 正文

RxJava(六)变换操作符_the method flatmapiterable(function

the method flatmapiterable(function

变换操作符

  1. private void RxJava1() {
  2.         ObservableSource<T> 另一个Observable;
  3.         Predicate<T> 条件;
  4.         prefetch 预取量;
  5.         maxConcurrency 最大并发数;
  6.         //observeOn : 指定观察者观察Observable的调度器
  7.         //subscribeOn : 指定Observable执行任务的调度器
  8.         //判断发射的所有数据项是否都满足Predicate条件
  9.         Observable.range(1,10).all(Predicate<T>);
  10.         //两个Observable中那个先发射了数据,就发射他的全部数据
  11.         Observable.range(1,10).ambWith(ObservableSource<T>);
  12.         Observable.range(1,10).any(Predicate<T>);
  13.         Observable.range(1,10).as(ObservableConverter<T,R>);
  14.         //定时定数收集Observable发射的数据放进一个集合,然后发射这个集合(包含收集的数据项)
  15.         Observable.range(1,10).buffer(int count);
  16.         Observable.range(1,10).buffer(int count,int skip);
  17.         Observable.range(1,10).buffer(long timeSpan,TimeUnit unit);
  18.         Observable.range(1,10).buffer(long timeSpan,TimeUnit unit,int count);
  19.         Observable.range(1,10).buffer(long timeSpan,long timeSkip,TimeUnit unit);
  20.         //发射第一项,否则抛出异常NoSuchElementException
  21.         Flowable.just(1,2,3).blockingFirst(Integer defaultItem);
  22.         //以阻塞方式消耗发射的数据
  23.         Flowable.just(1,2,3).blockingForEach(new Consumer<Integer>);
  24.         //将此Flowable转换为Iterable
  25.         Flowable.just(1,2,3).blockingIterable();
  26.         Flowable.just(1,2,3).blockingIterable(int bufferSize);
  27.         //发射最后一项,否则抛出异常NoSuchElementException
  28.         Flowable.just(1,2,3).blockingLast();
  29.         //发射最后一项,若无数据则发射默认值
  30.         Flowable.just(1,2,3).blockingLast(Integer defaultItem);
  31.         //返回一个Iterable,它将会阻塞,直到此Flowable发出的新项目,并将其返回。
  32.         Flowable.just(1,2,3).blockingLatest();
  33.         //返回一个Iterable,在每次迭代时阻塞,直到此Flowable发出一个新项,然后Iterable返回该项
  34.         Flowable.just(1,2,3).blockingNext();
  35.         //发射单个值,如果它发出多个项目,则抛出IllegalArgumentException
  36.         Flowable.just(1,2,3).blockingSingle();
  37.         //发射单个值,如果它发出多个项目,则抛出IllegalArgumentException,若无数据发射,则返回默认值
  38.         Flowable.just(1,2,3).blockingSingle(Integer defaultItem);
  39.         //将Flowable运行至结束,忽略任何数据和异常
  40.         Flowable.just(1,2,3).blockingSubscribe(new Consumer<>);
  41.         //立即执行
  42.         Observable.range(1,10).compose(ObservableTransformer<>);
  43.         Observable.range(1,10).cacheWithInitialCapacity(int initialCapacity);
  44.         //在发射之前强制将Observable发射的所有数据转换为指定类型
  45.         Observable.range(1,10).cast(Class<T>);
  46.         //收集数据到一个集合,数组;T1为Callable发射数据,T2为Observable发射数据,分别组合被收集
  47.         Observable.range(1,10).collect(Callable<U> initialValueSuplier,BiConsumer<T1,T2>);
  48.         //收集数据到一个集合,数组,T1为U值,T2为Observable发射数据
  49.         Observable.range(1,10).collectInto(U,BiConsumer<T1,T2>);
  50.         //对原始Observable数据应用一个函数
  51.         Observable.range(1,10).map(Function<T,R>);
  52.         //对Observable数据子项进行一个函数变换,返回N个发射这些变换结果的Observables,然后组合这些Obsrvables
  53.         //发射的数据为ObservableSource<T>,当作自己的数据序列发射。
  54.         Observable.range(1,10).flatMap(Function<T,ObservableSource<T>>);
  55.         Observable.range(1,10).flatMap(Function<T,ObservableSource<T>>,boolean delayError,int maxCurrency);
  56.         Observable.range(1,10).flatMapIterable(Function<T,Iterable<T>>);
  57.         //类似FlatMap,只是严格按照顺序
  58.         Observable.range(1,10).concatMap(Function<T,ObservableSource<T>>,int prefetch);
  59.         Observable.range(1,10).concatMapDelayError(Function<T,ObservableSource<T>>);
  60.         //得到一个CompletableSource,
  61.         Observable.range(1,10).concatMapCompletable(Function<T, CompletableSource>);
  62.         Observable.range(1,10).concatMapCompletableDelayError(Function<T, CompletableSource>,boolean delayError);
  63.         Observable.range(1,10).concatMapEager(Function<T,ObservableSource<T>>);
  64.         Observable.range(1,10).concatMapEagerDelayError(Function<T,ObservableSource<T>>,boolean);
  65.         Observable.range(1,10).concatMapIterable(Function<T, Iterator<U>>);
  66.         Observable.range(1,10).concatMapIterable(Function<Integer, Iterator<U>>,int prefetch);
  67.         Observable.range(1,10).concatMapMaybe(Function<T, Maybe<T>>);
  68.         Observable.range(1,10).concatMapMaybeDelayError();
  69.         Observable.range(1,10).concatMapSingle(Function<T, Single<U>>);
  70.         Observable.range(1,10).concatMapSingleDelayError();
  71.         //
  72.         Observable.range(1,10).concatWith(CompletableSource);
  73.         Observable.range(1,10).concatWith(ObservableSource<T>);
  74.         Observable.range(1,10).concatWith(Maybe<T>);
  75.         Observable.range(1,10).concatWith(Single<T>);
  76.         //在同一线程,结果同concatMap。当在不同线程,将Observable发射的数据变换为Observables集合,
  77.         // 当原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable
  78.         Observable.range(1,10).switchMap(Function<T,ObservableSource<T>>);
  79.         //判断在发射的所有数据项中是否包含指定的数据
  80.         Observable.range(1,10).contains(Object element);
  81.         //发射数据项的总数
  82.         Observable.range(1,10).count();
  83.         //缓存Observable发射的数据序列并发射相同的数据序列给后续的订阅者
  84.         Observable.range(1,10).cache();
  85.         //发射过快timeOut,丢弃前一次的数据;发射过慢timeOut,马上进行发射;用法同throttleWithTimeout
  86.         Observable.range(1,10).debounce(long timeOut,TimeUnit unit);
  87.         Observable.range(1,10).debounce(long timeOut,TimeUnit unit,Scheduler scheduler);
  88.         Observable.range(1,10).debounce(Function<T,ObservableSource<U>>);
  89.         //如果原始Observable正常终止后仍然没有发射任何数据,就发射一个默认值,内部调用的switchIfEmpty。
  90.         Observable.range(1,10).defaultIfEmpty(Integer defaultItem);
  91.         //延时发射Observable的结果
  92.         Observable.range(1,10).delay(long delay,TimeUnit unit);
  93.         Observable.range(1,10).delay(long delay,TimeUnit unit,boolean delayError);
  94.         Observable.range(1,10).delay(long delay,TimeUnit unit,Scheduler scheduler);
  95.         //延时处理订阅请求
  96.         Observable.range(1,10).delaySubscription(long delay,TimeUnit unit);
  97.         Observable.range(1,10).delaySubscription(long delay,TimeUnit unit,Scheduler scheduler);
  98.         //过滤重复数据
  99.         Observable.range(1,10).distinct();
  100.         Observable.range(1,10).distinct(Function<T,Key>);
  101.         //过滤连续重复数据
  102.         Observable.range(1,10).distinctUntilChanged(Function<T,Key>);
  103.         Observable.range(1,10).distinctUntilChanged(BiPredicate<T1,T2>);
  104.         //发射指定index项数据,没有则发射默认值
  105.         Observable.range(1,10).elementAt(long index);
  106.         Observable.range(1,10).elementAt(long index,Integer defaultItem);
  107.         Observable.range(1,10).elementAtOrError(long index);
  108.         Observable.range(1,10).firstElement();
  109.         Observable.range(1,10).lastElement();
  110.         //不关心发射的数据,只发射onError,onComplete
  111.         Observable.range(1,10).ignoreElements();
  112.         Observable.range(1,10).first(Integer defaultItem);
  113.         Observable.range(1,10).firstOrError();
  114.         Observable.range(1,10).last(Integer defaultItem);
  115.         Observable.range(1,10).lastOrError();
  116.         //BlockingObservable是一个阻塞的Observable。普通的Observable 转换为 BlockingObservable,可以使用
  117.         //Observable.toBlocking( )方法或者BlockingObservable.from( )方法。内部通过CountDownLatch实现了阻塞操作。
  118.         //对BlockingObservable发射的每一项数据调用一个方法,会阻塞直到Observable完成。
  119.         Observable.range(1,10).forEach(Consumer<T>);
  120.         Observable.range(1,10).forEachWhile(Predicate<T> onNext,onError,onComplete);
  121.         //对每一项数据应用一个函数,按条件分配key,相同key分配到一个组,然后发射这些组
  122.         Observable.range(1,10).groupBy(Function<T,Key>);
  123.         Observable.range(1,10).groupBy(Function<T,Key>,boolean delayError);
  124.         Observable.range(1,10).groupBy(Function<T,Key>,Function<T,V>);
  125.         Observable.range(1,10).groupBy(Function<T,Key>,Function<T,V>,boolean delayError);
  126.         //将主题封装在一个可观察的对象中,这使得消费代码无法将其转换回主题,是隐藏实现细节的一种防御机制。
  127.         Observable.range(1,10).hide();
  128.         //用于判断Observable有没有发射数据。不为空false,如果只收到了onComplete通知则为true。
  129.         Observable.range(1,10).isEmpty();
  130.         //两个Observable在交叉发射时间内,应用一个函数结合他们的数据,只要其中一个发射了onComplete则结束
  131.         //ObLeft.jion(ObRight,Func1,Func2,BiFunc)
  132.         Observable.range(1,10).join(ObservableSource<T> right,Function<T,
  133.         ObservableSource<T>> funcLeft, Function<T,ObservableSource<T>> funcRight,BiFunction<T1,T2,R> funcR);
  134.         //结果相同,只是BiFunction组合的是Observable
  135.         Observable.range(1,10).groupJoin(参数同上);
  136.         //所有操作符的基础,内部都有调用
  137.         Observable.range(1,10).lift(ObservableOperator<R,T>);
  138.         // 将Observable转换成一个通知列表,发送通知
  139.         Observable.range(1,10).materialize();
  140.         //将通知转换为一个Observable
  141.         Observable.range(1,10).dematerialize();
  142.         //组合两个Observable,Maybe,Single,Complable
  143.         Observable.range(1,10).mergeWith(ObservableSource<T>);
  144.         //过滤指定类型的数据,与filter类似
  145.         Observable.range(1,10).ofType(Class<T>);
  146.         //当原始Observable在遇到错误时,使用备用Observable。可以处理所有的错误
  147.         Observable.range(1,10).onErrorResumeNext(ObservableSource<T> next);
  148.         Observable.range(1,10).onErrorResumeNext(Function<Throwable,ObservableSource<T>>);
  149.         //当原始Observable在遇到错误时发射一个特定的数据
  150.         Observable.range(1,10).onErrorReturn(Function<Throwable,T>);
  151.         Observable.range(1,10).onErrorReturnItem(item);
  152.         //当原始Observable在遇到错误时,使用备用Observable。只能处理异常。
  153.         Observable.range(1,10).onExceptionResumeNext(ObservableSource<T> next);
  154.         //当数据切断的时候,要执行此操作,防止内存泄露
  155.         Observable.range(1,10).onTerminateDetach();
  156.         //当接收到onComplete()会触发重新订阅,默认情况下运行在一个新的线程上
  157.         Observable.range(1,10).repeat();
  158.         Observable.range(1,10).repeat(long time);
  159.         //直到满足什么条件就重新订阅;BooleanSupplier存储一个boolean值,.getAsBoolean()获取boolean值;
  160.         Observable.range(1,10).repeatUntil(BooleanSupplier stop);
  161.         //当满足什么条件就会重新订阅
  162.         Observable.range(1,10).repeatWhen(Function<Observable<T>,ObservableSource<R>>);
  163.         //对数据序列应用一个函数,然后发射结果
  164.         Observable.range(1,10).reduce(BiFunction<T1,T2,R>);
  165.         //生成connectableObservable,使得无论什么时候订阅都能收到完整数据
  166.         Observable.range(1,10).replay();
  167.         //参数,1个,2个,3个组合
  168.         Observable.range(1,10).replay(int bufferSize,long Time,TimeUnit unit,Scheduler shceduler);
  169.         Observable.range(1,10).replay(Function<Observable<T>,ObservableSource<R>>);
  170.         //当原始Observable在遇到错误时进行重试。
  171.         Observable.range(1,10).retry();
  172.         Observable.range(1,10).retry(long time);
  173.         Observable.range(1,10).retry(Predicate<Throwable>);
  174.         Observable.range(1,10).retry(long time,Predicate<Throwable>);
  175.         Observable.range(1,10).retry(BiPredicate<T,Throwable>);
  176.         Observable.range(1,10).retryUntil(BooleanSupplier stop);
  177.         //当原始Observable在遇到错误,将错误传递给另一个ObservableSource<T>来决定是否要重新订阅这个Observable,
  178.         Observable.range(1,10).retryWhen(Function<Observable<Throwable>,ObservableSource<T>>);
  179.         //如果Observable终止时只发射了一个值,返回那个值,否则抛出异常或者发射默认值。
  180.         Observable.range(1,10).single(T defaultItem);
  181.         Observable.range(1,10).singleOrError();
  182.         Observable.range(1,10).singleElement();
  183.         //定期发射Observable最近的数据(最后一个)
  184.         Observable.range(1,10).sample(long peroid,TimeUnit unit);
  185.         Observable.range(1,10).sample(long peroid,TimeUnit unit,boolean emitLast);
  186.         Observable.range(1,10).sample(ObservableSource<U>);
  187.         Observable.range(1,10).sample(ObservableSource<U>,boolean emitLast);
  188.         Observable.range(1,10).sample(long peroid,TimeUnit unit,Scheduler scheduler);
  189.         Observable.range(1,10).sample(long peroid,TimeUnit unit,Scheduler scheduler,boolean emitLast);
  190.         //与reduce很像,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值。
  191.         Observable.range(1,10).scan(BiFunction<T1,T2,R>);
  192.         Observable.range(1,10).scanWith(...);
  193.         //跳过定时定量数据
  194.         Observable.range(1,10).skip(long count);
  195.         Observable.range(1,10).skip(long time,TimeUnit unit);
  196.         Observable.range(1,10).skip(long time,TimeUnit unit,Scheduler scheduler)
  197.         Observable.range(1,10).skipLast(同上);
  198.         Observable.range(1,10).skipLast(同上,boolean delayError);
  199.         //忽略原始Observable发射的数据,直到自定义的Observable发射了一个数据,Observer开始接收数据。
  200.         Observable.range(1,10).skipUntil(ObservableSource<T> other);
  201.         //忽略,直到指定条件不成立
  202.         Observable.range(1,10).skipWhile(Predicate<T>);
  203.         //发射定时定量数据
  204.         Observable.range(1,10).take(同skip);
  205.         Observable.range(1,10).takeLast(同skip );
  206.         Observable.range(1,10).takeUntil(ObservableSource<T> other);
  207.         Observable.range(1,10).takeUntil(Predicate<T>);
  208.         Observable.range(1,10).takeWhile(Predicate<T>);
  209.         //共享数据,每个Observer都可已收到相同的数据
  210.         Observable.range(1,10).share();
  211.         //强制Observable按次序发射数据并且要求功能是完好的
  212.         Observable.range(1,10).serialize();
  213.         //排序
  214.         Observable.range(1,10).sorted();
  215.         //Comparator外比较器
  216.         Observable.range(1,10).sorted(Comparator<T>);
  217.         //在发射数据序列之前添加数据序列
  218.         Observable.range(1,10).startWith(T item);
  219.         Observable.range(1,10).startWith(Iterable<T>);
  220.         Observable.range(1,10).startWith(ObservableSource<T> other);
  221.         Observable.range(1,10).startWithArray(Array[]);
  222.         //如果原始Observable正常终止后仍然没有发射任何数据,就使用备用的Observable。
  223.         Observable.range(1,10).switchIfEmpty(ObservableSource<T>);
  224.         //只发射第一个数据,忽略skipDuration内发射的数据
  225.         Observable.range(1,10).throttleFirst( long skipDuration, TimeUnit unit, Scheduler scheduler);
  226.         Observable.range(1,10).throttleFirst( long windowDuration, TimeUnit unit);
  227.         //定期发射Observable最近的数据(最后一个)
  228.         Observable.range(1,10).throttleLast(同上);
  229.         Observable.range(1,10).throttleLatest(long timeout,TimeUnit unit);
  230.         Observable.range(1,10).throttleLatest(long timeout,TimeUnit unit,boolean emitLast);
  231.         Observable.range(1,10).throttleLatest(long timeout,TimeUnit unit,Scheduler shceduler,boolean emitLast);
  232.         //发射过快timeOut,丢弃前一次的数据;发射过慢timeOut,马上进行发射
  233.         Observable.range(1,10).throttleWithTimeout(long timeout,TimeUnit unit);
  234.         Observable.range(1,10).throttleWithTimeout(long timeout,TimeUnit unit,Scheduler scheduler);
  235.         //发射两个数据之间的间隔时间
  236.         Observable.range(1,10).timeInterval();
  237.         Observable.range(1,10).timeInterval(TimeUnit unit);
  238.         Observable.range(1,10).timeInterval(Scheduler scheduler);
  239.         Observable.range(1,10).timeInterval(TimeUnit unit,Scheduler scheduler);
  240.         //给Observable发射的每个数据项添加一个时间戳。数据带上时间
  241.         Observable.range(1,10).timestamp(参数同上);
  242.         //如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable
  243.         Observable.range(1,10).timeout(long timeout,TimeUnit unit);
  244.         Observable.range(1,10).timeout(long timeout,TimeUnit unit,Scheduler shceduler);
  245.         Observable.range(1,10).timeout(long timeout,TimeUnit unit,ObservableSource<T> other);
  246.         //转换
  247.         Observable.range(1,10).to(Function<Observable<T>,R>);
  248.         Observable.range(1,10).toFlowable(BackpressureStrategy);
  249.         Observable.range(1,10).toFuture();
  250.         //收集原始Observable发射的所有数据到一个List集合
  251.         Observable.range(1,10).toList();
  252.         //将数据序列转换为一个Map,根据数据生成Key,Value
  253.         Observable.range(1,10).toMap(Function<T,K>,Function<T,V>);
  254.         //将数据序列转换为一个Map,根据数据生成Key,Value,类似于toMap,不同的地方在于map的value是一个集合。
  255.         Observable.range(1,10).toMultimap(Function<T,K>,Function<T,V>, Collection<V>);
  256.         //收集原始Observable发射的所有数据到一个有序List集合
  257.         Observable.range(1,10).toSortedList();
  258.         //将原始Observerable拆分成包含n项的窗口,然后发射这些窗口;
  259.         Observable.range(1,10).window(long count);
  260.         Observable.range(1,10).window(long count,long skip);
  261.         Observable.range(1,10).window(long timeSpan,TimeUnit unit);
  262.         Observable.range(1,10).window(long count,long skip,int bufferSize);
  263.         Observable.range(1,10).withLatestFrom(ObservableSource<T> other,BiFunction<T2,T2,R>);
  264.         Observable.range(1,10).withLatestFrom(ObservableSource<T>[] others,Function<T[],R>);
  265.         Observable.range(1,10).withLatestFrom(Iterable<ObservableSource<T>> iterables,Function<T[],R>);        //按顺序依次组合多个Observable的数据项应用一个函数,直到数据项最少的那个Observable用完数据项为止不再组合,然后发射组合项
  266.         Observable.range(1,10).zipWith(ObservableSource<T>,BiFunction<T1,T2,R>);
  267.         Observable.range(1,10).zipWith(ObservableSource<T>,BiFunction<T1,T2,R>,boolean delayError);
  268.         Observable.range(1,10).zipWith(ObservableSource<T>,BiFunction<T1,T2,R>,boolean delayError,int bufferSize);
  269.         Observable.range(1,10).zipWith(Iterable<T>,BiFunction<T1,T2,R>);
  270.     }


map    对原始Observable的数据进行一个函数操作。
flatMap    对原始Observable的数据进行一个函数操作,然后将每一项数据合并,作为自己的数据序列发射,合并的数据顺序可能交错。

  1. private void rxJavTest() {
  2.        Observable.just(1,2,3)
  3.                .subscribeOn(Schedulers.io())
  4.                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
  5.                    @Override
  6.                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
  7.                        integer = integer * 2;
  8.                        return Observable.just(integer);
  9.                    }
  10.                }).observeOn(AndroidSchedulers.mainThread())
  11.                .subscribe(new Consumer<Integer>() {
  12.                    @Override
  13.                    public void accept(Integer integer) {
  14.                        Log.d(TAG, "accept: " + integer);
  15.                    }
  16.                });
  17.     }


输出结果:
accept: 2
accept: 4
accept: 6

concatMap    同flatMap,只是合并后数据严格按照顺序发射。
switchMap    在同一线程,结果同concatMap。当在不同线程,如newThread则只发射最近请求。

  1.  private void rxJavTest() {
  2.        Observable.just(1,2,3)
  3.                .subscribeOn(Schedulers.io())
  4.                .switchMap(new Function<Integer, ObservableSource<Integer>>() {
  5.                    @Override
  6.                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
  7.                        integer = integer * 2;
  8.                        return Observable.just(integer).subscribeOn(Schedulers.newThread());
  9.                    }
  10.                }).observeOn(AndroidSchedulers.mainThread())
  11.                .subscribe(new Consumer<Integer>() {
  12.                    @Override
  13.                    public void accept(Integer integer) {
  14.                        Log.d(TAG, "accept: " + integer);
  15.                    }
  16.                });
  17.     }


输出结果:
accept: 6

groupBy 对每一项数据应用一个函数,按条件分配key,相同key分配到一个组,然后发射这些组。

  1.  private void rxJavTest() {
  2.         
  3.        Observable.range(1,15)
  4.                .subscribeOn(Schedulers.io())
  5.                .groupBy(new Function<Integer, Integer>() {
  6.                    @Override
  7.                    public Integer apply(Integer integer) {
  8.                        //分为5组
  9.                        return integer % 5;
  10.                    }
  11.                }).observeOn(AndroidSchedulers.mainThread())
  12.                .subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
  13.                    @Override
  14.                    public void accept(GroupedObservable<Integer, Integer> groupedObservable) throws Exception {
  15.                        Log.d(TAG, "accept: group key = " + groupedObservable.getKey());
  16.                        groupedObservable.subscribe(new Consumer<Integer>() {
  17.                            @Override
  18.                            public void accept(Integer integer) throws Exception {
  19.                                Log.d(TAG, "accept: group 数据项 :" + integer);
  20.                            }
  21.                        });
  22.                    }
  23.                });
  24.     }



输出结果:
accept: group key = 1
accept: group 数据项 :1
accept: group 数据项 :6
accept: group 数据项 :11
accept: group key = 2
accept: group 数据项 :2
accept: group 数据项 :7
accept: group 数据项 :12
accept: group key = 3
accept: group 数据项 :3
accept: group 数据项 :8
accept: group 数据项 :13
accept: group key = 4
accept: group 数据项 :4
accept: group 数据项 :9
accept: group 数据项 :14
accept: group key = 0
accept: group 数据项 :5
accept: group 数据项 :10
accept: group 数据项 :15buffer    定期收集原始Observable发射的数据形成一个包裹,然后当成自己的数据序列发射这个包裹。

  1. private void rxJavTest() {
  2.         Observable.range(1, 10)
  3.                 .subscribeOn(Schedulers.io())
  4.                 .buffer(5)
  5.                 .observeOn(AndroidSchedulers.mainThread())
  6.                 .subscribe(new Consumer<List<Integer>>() {
  7.                     @Override
  8.                     public void accept(List<Integer> integers) throws Exception {
  9.                         for (int i = 0; i < integers.size(); i++) {
  10.                             Log.d(TAG, "list大小" + integers.size()
  11.                                     +"   list包裹中的数据项:" + integers.get(i));
  12.                         }
  13.                     }
  14.                 });
  15.     }


输出结果:
list大小5 list包裹中的数据项:1
list大小5 list包裹中的数据项:2
list大小5 list包裹中的数据项:3
list大小5 list包裹中的数据项:4
list大小5 list包裹中的数据项:5
list大小5 list包裹中的数据项:6
list大小5 list包裹中的数据项:7
list大小5 list包裹中的数据项:8
list大小5 list包裹中的数据项:9
list大小5 list包裹中的数据项:10

bufer(n,skip)    Observer每接受到一个原始Observable发射的数据,就用n项数据填充缓存:接受到数据项 --> 之后n-1项 ;然后发射这些缓存。

  1. private void rxJavTest() {
  2.         Observable.range(1, 10)
  3.                 .subscribeOn(Schedulers.io())
  4.                 .buffer(5,1)
  5.                 .observeOn(AndroidSchedulers.mainThread())
  6.                 .subscribe(new Consumer<List<Integer>>() {
  7.                     @Override
  8.                     public void accept(List<Integer> integers) throws Exception {
  9.                         Log.d(TAG, "accept: List :" + integers.size());
  10.                         for (int i = 0; i < integers.size(); i++) {
  11.                             Log.d(TAG, "accept: 数据项:" + integers.get(i));
  12.                         }
  13.                     }
  14.                 });
  15.     }



输出结果:
accept: List :5    [1,2,3,4,5]
accept: List :5    [2,3,4,5,6]
accept: List :5    [3,4,5,6,7]
accept: List :5    [4,5,6,7,8]
accept: List :5    [5,6,7,8,9]
accept: List :5    [6,7,8,9,10]
accept: List :4    [7,8,9,10]
accept: List :3    [8,9,10]
accept: List :2    [9,10]
accept: List :1    [10]

window    将原始Observerable拆分成包含n项的窗口Observable,然后发射这些窗口;
private void rxJavTest() {

  1.         Observable.range(1, 10)
  2.                 .subscribeOn(Schedulers.io())
  3.                 .window(5)
  4.                 .observeOn(AndroidSchedulers.mainThread())
  5.                 .subscribe(new Consumer<Observable<Integer>>() {
  6.                     @Override
  7.                     public void accept(Observable<Integer> integerObservable) throws Exception {
  8.                         Log.d(TAG, "accept: 一个Window包含:");
  9.                         integerObservable.subscribe(new Consumer<Integer>() {
  10.                             @Override
  11.                             public void accept(Integer integer) throws Exception {
  12.                                 Log.d(TAG, "accept: 数据项:" + integer);
  13.                             }
  14.                         });
  15.                     }
  16.                 });
  17.     }



输出结果:
accept: 一个Window包含:
accept: 数据项:1
accept: 数据项:2
accept: 数据项:3
accept: 数据项:4
accept: 数据项:5
accept: 一个Window包含:
accept: 数据项:6
accept: 数据项:7
accept: 数据项:8
accept: 数据项:9
accept: 数据项:10

repeat 创建一个重复发射原始数据序列的Observable。
(1)repeat(n) 重复发射n次。
(2)repeatWhen    当满足某种条件时才重新订阅和发送原始数据。

  1.         final long start = System.currentTimeMillis();
  2.         Observable.just(1,2,3)
  3.                 .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
  4.                     @Override
  5.                     public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
  6.                         return Observable.timer(3,TimeUnit.SECONDS);
  7.                     }
  8.                 }).subscribe(new Consumer<Integer>() {
  9.             @Override
  10.             public void accept(Integer integer) throws Exception {
  11.                 Log.d(TAG, "值 :" + integer + " 收到时间 :" + (System.currentTimeMillis() - start));
  12.             }
  13.         });


输出结果:
值 :1 收到时间 :22
值 :2 收到时间 :22
值 :3 收到时间 :22
值 :1 收到时间 :3023
值 :2 收到时间 :3024
值 :3 收到时间 :3024

(3)repeatUntil 直到某个条件就不再重复发射原始数据。

  1. final long start = System.currentTimeMillis();
  2.         Observable.just(1,2,3)
  3.                 .repeatUntil(new BooleanSupplier() {
  4.                     @Override
  5.                     public boolean getAsBoolean() throws Exception {
  6.                         //重复发射数据,直到时间走过50毫秒
  7.                         return (System.currentTimeMillis() - start) > 50;
  8.                     }
  9.                 }).subscribe(new Consumer<Integer>() {
  10.             @Override
  11.             public void accept(Integer integer) throws Exception {
  12.                 Log.d(TAG, "值 :" + integer + " 收到时间 :" + (System.currentTimeMillis() - start));
  13.             }
  14.         });



打印结果太多,就不贴出了。
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/177374
推荐阅读
相关标签