赞
踩
变换操作符
- private void RxJava1() {
-
- ObservableSource<T> 另一个Observable;
- Predicate<T> 条件;
- prefetch 预取量;
- maxConcurrency 最大并发数;
-
- //observeOn : 指定观察者观察Observable的调度器
- //subscribeOn : 指定Observable执行任务的调度器
-
- //判断发射的所有数据项是否都满足Predicate条件
- Observable.range(1,10).all(Predicate<T>);
- //两个Observable中那个先发射了数据,就发射他的全部数据
- Observable.range(1,10).ambWith(ObservableSource<T>);
-
- Observable.range(1,10).any(Predicate<T>);
-
- Observable.range(1,10).as(ObservableConverter<T,R>);
-
-
- //定时定数收集Observable发射的数据放进一个集合,然后发射这个集合(包含收集的数据项)
- Observable.range(1,10).buffer(int count);
- Observable.range(1,10).buffer(int count,int skip);
- Observable.range(1,10).buffer(long timeSpan,TimeUnit unit);
- Observable.range(1,10).buffer(long timeSpan,TimeUnit unit,int count);
- Observable.range(1,10).buffer(long timeSpan,long timeSkip,TimeUnit unit);
-
- //发射第一项,否则抛出异常NoSuchElementException
- Flowable.just(1,2,3).blockingFirst(Integer defaultItem);
- //以阻塞方式消耗发射的数据
- Flowable.just(1,2,3).blockingForEach(new Consumer<Integer>);
- //将此Flowable转换为Iterable
- Flowable.just(1,2,3).blockingIterable();
- Flowable.just(1,2,3).blockingIterable(int bufferSize);
- //发射最后一项,否则抛出异常NoSuchElementException
- Flowable.just(1,2,3).blockingLast();
- //发射最后一项,若无数据则发射默认值
- Flowable.just(1,2,3).blockingLast(Integer defaultItem);
- //返回一个Iterable,它将会阻塞,直到此Flowable发出的新项目,并将其返回。
- Flowable.just(1,2,3).blockingLatest();
- //返回一个Iterable,在每次迭代时阻塞,直到此Flowable发出一个新项,然后Iterable返回该项
- Flowable.just(1,2,3).blockingNext();
- //发射单个值,如果它发出多个项目,则抛出IllegalArgumentException
- Flowable.just(1,2,3).blockingSingle();
- //发射单个值,如果它发出多个项目,则抛出IllegalArgumentException,若无数据发射,则返回默认值
- Flowable.just(1,2,3).blockingSingle(Integer defaultItem);
- //将Flowable运行至结束,忽略任何数据和异常
- Flowable.just(1,2,3).blockingSubscribe(new Consumer<>);
-
- //立即执行
- Observable.range(1,10).compose(ObservableTransformer<>);
-
- Observable.range(1,10).cacheWithInitialCapacity(int initialCapacity);
-
- //在发射之前强制将Observable发射的所有数据转换为指定类型
- Observable.range(1,10).cast(Class<T>);
-
- //收集数据到一个集合,数组;T1为Callable发射数据,T2为Observable发射数据,分别组合被收集
- Observable.range(1,10).collect(Callable<U> initialValueSuplier,BiConsumer<T1,T2>);
- //收集数据到一个集合,数组,T1为U值,T2为Observable发射数据
- Observable.range(1,10).collectInto(U,BiConsumer<T1,T2>);
-
-
- //对原始Observable数据应用一个函数
- Observable.range(1,10).map(Function<T,R>);
- //对Observable数据子项进行一个函数变换,返回N个发射这些变换结果的Observables,然后组合这些Obsrvables
- //发射的数据为ObservableSource<T>,当作自己的数据序列发射。
- Observable.range(1,10).flatMap(Function<T,ObservableSource<T>>);
- Observable.range(1,10).flatMap(Function<T,ObservableSource<T>>,boolean delayError,int maxCurrency);
- Observable.range(1,10).flatMapIterable(Function<T,Iterable<T>>);
- //类似FlatMap,只是严格按照顺序
- Observable.range(1,10).concatMap(Function<T,ObservableSource<T>>,int prefetch);
- Observable.range(1,10).concatMapDelayError(Function<T,ObservableSource<T>>);
- //得到一个CompletableSource,
- Observable.range(1,10).concatMapCompletable(Function<T, CompletableSource>);
- Observable.range(1,10).concatMapCompletableDelayError(Function<T, CompletableSource>,boolean delayError);
- Observable.range(1,10).concatMapEager(Function<T,ObservableSource<T>>);
- Observable.range(1,10).concatMapEagerDelayError(Function<T,ObservableSource<T>>,boolean);
- Observable.range(1,10).concatMapIterable(Function<T, Iterator<U>>);
- Observable.range(1,10).concatMapIterable(Function<Integer, Iterator<U>>,int prefetch);
- Observable.range(1,10).concatMapMaybe(Function<T, Maybe<T>>);
- Observable.range(1,10).concatMapMaybeDelayError();
- Observable.range(1,10).concatMapSingle(Function<T, Single<U>>);
- Observable.range(1,10).concatMapSingleDelayError();
- //
- Observable.range(1,10).concatWith(CompletableSource);
- Observable.range(1,10).concatWith(ObservableSource<T>);
- Observable.range(1,10).concatWith(Maybe<T>);
- Observable.range(1,10).concatWith(Single<T>);
-
- //在同一线程,结果同concatMap。当在不同线程,将Observable发射的数据变换为Observables集合,
- // 当原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable
- Observable.range(1,10).switchMap(Function<T,ObservableSource<T>>);
-
- //判断在发射的所有数据项中是否包含指定的数据
- Observable.range(1,10).contains(Object element);
-
- //发射数据项的总数
- Observable.range(1,10).count();
-
- //缓存Observable发射的数据序列并发射相同的数据序列给后续的订阅者
- Observable.range(1,10).cache();
-
- //发射过快timeOut,丢弃前一次的数据;发射过慢timeOut,马上进行发射;用法同throttleWithTimeout
- Observable.range(1,10).debounce(long timeOut,TimeUnit unit);
- Observable.range(1,10).debounce(long timeOut,TimeUnit unit,Scheduler scheduler);
- Observable.range(1,10).debounce(Function<T,ObservableSource<U>>);
-
- //如果原始Observable正常终止后仍然没有发射任何数据,就发射一个默认值,内部调用的switchIfEmpty。
- Observable.range(1,10).defaultIfEmpty(Integer defaultItem);
-
- //延时发射Observable的结果
- Observable.range(1,10).delay(long delay,TimeUnit unit);
- Observable.range(1,10).delay(long delay,TimeUnit unit,boolean delayError);
- Observable.range(1,10).delay(long delay,TimeUnit unit,Scheduler scheduler);
- //延时处理订阅请求
- Observable.range(1,10).delaySubscription(long delay,TimeUnit unit);
- Observable.range(1,10).delaySubscription(long delay,TimeUnit unit,Scheduler scheduler);
-
- //过滤重复数据
- Observable.range(1,10).distinct();
- Observable.range(1,10).distinct(Function<T,Key>);
- //过滤连续重复数据
- Observable.range(1,10).distinctUntilChanged(Function<T,Key>);
- Observable.range(1,10).distinctUntilChanged(BiPredicate<T1,T2>);
-
- //发射指定index项数据,没有则发射默认值
- Observable.range(1,10).elementAt(long index);
- Observable.range(1,10).elementAt(long index,Integer defaultItem);
- Observable.range(1,10).elementAtOrError(long index);
- Observable.range(1,10).firstElement();
- Observable.range(1,10).lastElement();
- //不关心发射的数据,只发射onError,onComplete
- Observable.range(1,10).ignoreElements();
-
- Observable.range(1,10).first(Integer defaultItem);
- Observable.range(1,10).firstOrError();
- Observable.range(1,10).last(Integer defaultItem);
- Observable.range(1,10).lastOrError();
-
- //BlockingObservable是一个阻塞的Observable。普通的Observable 转换为 BlockingObservable,可以使用
- //Observable.toBlocking( )方法或者BlockingObservable.from( )方法。内部通过CountDownLatch实现了阻塞操作。
- //对BlockingObservable发射的每一项数据调用一个方法,会阻塞直到Observable完成。
- Observable.range(1,10).forEach(Consumer<T>);
- Observable.range(1,10).forEachWhile(Predicate<T> onNext,onError,onComplete);
-
- //对每一项数据应用一个函数,按条件分配key,相同key分配到一个组,然后发射这些组
- Observable.range(1,10).groupBy(Function<T,Key>);
- Observable.range(1,10).groupBy(Function<T,Key>,boolean delayError);
- Observable.range(1,10).groupBy(Function<T,Key>,Function<T,V>);
- Observable.range(1,10).groupBy(Function<T,Key>,Function<T,V>,boolean delayError);
-
- //将主题封装在一个可观察的对象中,这使得消费代码无法将其转换回主题,是隐藏实现细节的一种防御机制。
- Observable.range(1,10).hide();
-
- //用于判断Observable有没有发射数据。不为空false,如果只收到了onComplete通知则为true。
- Observable.range(1,10).isEmpty();
-
- //两个Observable在交叉发射时间内,应用一个函数结合他们的数据,只要其中一个发射了onComplete则结束
- //ObLeft.jion(ObRight,Func1,Func2,BiFunc)
- Observable.range(1,10).join(ObservableSource<T> right,Function<T,
- ObservableSource<T>> funcLeft, Function<T,ObservableSource<T>> funcRight,BiFunction<T1,T2,R> funcR);
- //结果相同,只是BiFunction组合的是Observable
- Observable.range(1,10).groupJoin(参数同上);
-
- //所有操作符的基础,内部都有调用
- Observable.range(1,10).lift(ObservableOperator<R,T>);
-
- // 将Observable转换成一个通知列表,发送通知
- Observable.range(1,10).materialize();
- //将通知转换为一个Observable
- Observable.range(1,10).dematerialize();
-
- //组合两个Observable,Maybe,Single,Complable
- Observable.range(1,10).mergeWith(ObservableSource<T>);
-
- //过滤指定类型的数据,与filter类似
- Observable.range(1,10).ofType(Class<T>);
-
- //当原始Observable在遇到错误时,使用备用Observable。可以处理所有的错误
- Observable.range(1,10).onErrorResumeNext(ObservableSource<T> next);
- Observable.range(1,10).onErrorResumeNext(Function<Throwable,ObservableSource<T>>);
- //当原始Observable在遇到错误时发射一个特定的数据
- Observable.range(1,10).onErrorReturn(Function<Throwable,T>);
- Observable.range(1,10).onErrorReturnItem(item);
- //当原始Observable在遇到错误时,使用备用Observable。只能处理异常。
- Observable.range(1,10).onExceptionResumeNext(ObservableSource<T> next);
-
- //当数据切断的时候,要执行此操作,防止内存泄露
- Observable.range(1,10).onTerminateDetach();
-
- //当接收到onComplete()会触发重新订阅,默认情况下运行在一个新的线程上
- Observable.range(1,10).repeat();
- Observable.range(1,10).repeat(long time);
- //直到满足什么条件就重新订阅;BooleanSupplier存储一个boolean值,.getAsBoolean()获取boolean值;
- Observable.range(1,10).repeatUntil(BooleanSupplier stop);
- //当满足什么条件就会重新订阅
- Observable.range(1,10).repeatWhen(Function<Observable<T>,ObservableSource<R>>);
-
- //对数据序列应用一个函数,然后发射结果
- Observable.range(1,10).reduce(BiFunction<T1,T2,R>);
-
- //生成connectableObservable,使得无论什么时候订阅都能收到完整数据
- Observable.range(1,10).replay();
- //参数,1个,2个,3个组合
- Observable.range(1,10).replay(int bufferSize,long Time,TimeUnit unit,Scheduler shceduler);
- Observable.range(1,10).replay(Function<Observable<T>,ObservableSource<R>>);
-
- //当原始Observable在遇到错误时进行重试。
- Observable.range(1,10).retry();
- Observable.range(1,10).retry(long time);
- Observable.range(1,10).retry(Predicate<Throwable>);
- Observable.range(1,10).retry(long time,Predicate<Throwable>);
- Observable.range(1,10).retry(BiPredicate<T,Throwable>);
- Observable.range(1,10).retryUntil(BooleanSupplier stop);
- //当原始Observable在遇到错误,将错误传递给另一个ObservableSource<T>来决定是否要重新订阅这个Observable,
- Observable.range(1,10).retryWhen(Function<Observable<Throwable>,ObservableSource<T>>);
-
- //如果Observable终止时只发射了一个值,返回那个值,否则抛出异常或者发射默认值。
- Observable.range(1,10).single(T defaultItem);
- Observable.range(1,10).singleOrError();
- Observable.range(1,10).singleElement();
-
- //定期发射Observable最近的数据(最后一个)
- Observable.range(1,10).sample(long peroid,TimeUnit unit);
- Observable.range(1,10).sample(long peroid,TimeUnit unit,boolean emitLast);
- Observable.range(1,10).sample(ObservableSource<U>);
- Observable.range(1,10).sample(ObservableSource<U>,boolean emitLast);
- Observable.range(1,10).sample(long peroid,TimeUnit unit,Scheduler scheduler);
- Observable.range(1,10).sample(long peroid,TimeUnit unit,Scheduler scheduler,boolean emitLast);
-
- //与reduce很像,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值。
- Observable.range(1,10).scan(BiFunction<T1,T2,R>);
- Observable.range(1,10).scanWith(...);
-
- //跳过定时定量数据
- Observable.range(1,10).skip(long count);
- Observable.range(1,10).skip(long time,TimeUnit unit);
- Observable.range(1,10).skip(long time,TimeUnit unit,Scheduler scheduler)
- Observable.range(1,10).skipLast(同上);
- Observable.range(1,10).skipLast(同上,boolean delayError);
- //忽略原始Observable发射的数据,直到自定义的Observable发射了一个数据,Observer开始接收数据。
- Observable.range(1,10).skipUntil(ObservableSource<T> other);
- //忽略,直到指定条件不成立
- Observable.range(1,10).skipWhile(Predicate<T>);
- //发射定时定量数据
- Observable.range(1,10).take(同skip);
- Observable.range(1,10).takeLast(同skip );
- Observable.range(1,10).takeUntil(ObservableSource<T> other);
- Observable.range(1,10).takeUntil(Predicate<T>);
- Observable.range(1,10).takeWhile(Predicate<T>);
-
- //共享数据,每个Observer都可已收到相同的数据
- Observable.range(1,10).share();
-
- //强制Observable按次序发射数据并且要求功能是完好的
- Observable.range(1,10).serialize();
-
- //排序
- Observable.range(1,10).sorted();
- //Comparator外比较器
- Observable.range(1,10).sorted(Comparator<T>);
-
- //在发射数据序列之前添加数据序列
- Observable.range(1,10).startWith(T item);
- Observable.range(1,10).startWith(Iterable<T>);
- Observable.range(1,10).startWith(ObservableSource<T> other);
- Observable.range(1,10).startWithArray(Array[]);
-
- //如果原始Observable正常终止后仍然没有发射任何数据,就使用备用的Observable。
- Observable.range(1,10).switchIfEmpty(ObservableSource<T>);
-
- //只发射第一个数据,忽略skipDuration内发射的数据
- Observable.range(1,10).throttleFirst( long skipDuration, TimeUnit unit, Scheduler scheduler);
- Observable.range(1,10).throttleFirst( long windowDuration, TimeUnit unit);
- //定期发射Observable最近的数据(最后一个)
- Observable.range(1,10).throttleLast(同上);
- Observable.range(1,10).throttleLatest(long timeout,TimeUnit unit);
- Observable.range(1,10).throttleLatest(long timeout,TimeUnit unit,boolean emitLast);
- Observable.range(1,10).throttleLatest(long timeout,TimeUnit unit,Scheduler shceduler,boolean emitLast);
- //发射过快timeOut,丢弃前一次的数据;发射过慢timeOut,马上进行发射
- Observable.range(1,10).throttleWithTimeout(long timeout,TimeUnit unit);
- Observable.range(1,10).throttleWithTimeout(long timeout,TimeUnit unit,Scheduler scheduler);
-
- //发射两个数据之间的间隔时间
- Observable.range(1,10).timeInterval();
- Observable.range(1,10).timeInterval(TimeUnit unit);
- Observable.range(1,10).timeInterval(Scheduler scheduler);
- Observable.range(1,10).timeInterval(TimeUnit unit,Scheduler scheduler);
-
- //给Observable发射的每个数据项添加一个时间戳。数据带上时间
- Observable.range(1,10).timestamp(参数同上);
-
- //如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable
- Observable.range(1,10).timeout(long timeout,TimeUnit unit);
- Observable.range(1,10).timeout(long timeout,TimeUnit unit,Scheduler shceduler);
- Observable.range(1,10).timeout(long timeout,TimeUnit unit,ObservableSource<T> other);
-
- //转换
- Observable.range(1,10).to(Function<Observable<T>,R>);
- Observable.range(1,10).toFlowable(BackpressureStrategy);
- Observable.range(1,10).toFuture();
- //收集原始Observable发射的所有数据到一个List集合
- Observable.range(1,10).toList();
- //将数据序列转换为一个Map,根据数据生成Key,Value
- Observable.range(1,10).toMap(Function<T,K>,Function<T,V>);
- //将数据序列转换为一个Map,根据数据生成Key,Value,类似于toMap,不同的地方在于map的value是一个集合。
- Observable.range(1,10).toMultimap(Function<T,K>,Function<T,V>, Collection<V>);
-
- //收集原始Observable发射的所有数据到一个有序List集合
- Observable.range(1,10).toSortedList();
-
- //将原始Observerable拆分成包含n项的窗口,然后发射这些窗口;
- Observable.range(1,10).window(long count);
- Observable.range(1,10).window(long count,long skip);
- Observable.range(1,10).window(long timeSpan,TimeUnit unit);
- Observable.range(1,10).window(long count,long skip,int bufferSize);
-
- Observable.range(1,10).withLatestFrom(ObservableSource<T> other,BiFunction<T2,T2,R>);
- Observable.range(1,10).withLatestFrom(ObservableSource<T>[] others,Function<T[],R>);
- Observable.range(1,10).withLatestFrom(Iterable<ObservableSource<T>> iterables,Function<T[],R>); //按顺序依次组合多个Observable的数据项应用一个函数,直到数据项最少的那个Observable用完数据项为止不再组合,然后发射组合项
- Observable.range(1,10).zipWith(ObservableSource<T>,BiFunction<T1,T2,R>);
- Observable.range(1,10).zipWith(ObservableSource<T>,BiFunction<T1,T2,R>,boolean delayError);
- Observable.range(1,10).zipWith(ObservableSource<T>,BiFunction<T1,T2,R>,boolean delayError,int bufferSize);
- Observable.range(1,10).zipWith(Iterable<T>,BiFunction<T1,T2,R>);
- }
map 对原始Observable的数据进行一个函数操作。
flatMap 对原始Observable的数据进行一个函数操作,然后将每一项数据合并,作为自己的数据序列发射,合并的数据顺序可能交错。
- private void rxJavTest() {
-
- Observable.just(1,2,3)
- .subscribeOn(Schedulers.io())
- .flatMap(new Function<Integer, ObservableSource<Integer>>() {
- @Override
- public ObservableSource<Integer> apply(Integer integer) throws Exception {
- integer = integer * 2;
- return Observable.just(integer);
- }
- }).observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) {
- Log.d(TAG, "accept: " + integer);
- }
- });
- }
输出结果:
accept: 2
accept: 4
accept: 6
concatMap 同flatMap,只是合并后数据严格按照顺序发射。
switchMap 在同一线程,结果同concatMap。当在不同线程,如newThread则只发射最近请求。
- private void rxJavTest() {
-
- Observable.just(1,2,3)
- .subscribeOn(Schedulers.io())
- .switchMap(new Function<Integer, ObservableSource<Integer>>() {
- @Override
- public ObservableSource<Integer> apply(Integer integer) throws Exception {
- integer = integer * 2;
- return Observable.just(integer).subscribeOn(Schedulers.newThread());
- }
- }).observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) {
- Log.d(TAG, "accept: " + integer);
- }
- });
- }
输出结果:
accept: 6
groupBy 对每一项数据应用一个函数,按条件分配key,相同key分配到一个组,然后发射这些组。
- private void rxJavTest() {
-
- Observable.range(1,15)
- .subscribeOn(Schedulers.io())
- .groupBy(new Function<Integer, Integer>() {
- @Override
- public Integer apply(Integer integer) {
- //分为5组
- return integer % 5;
- }
- }).observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
- @Override
- public void accept(GroupedObservable<Integer, Integer> groupedObservable) throws Exception {
- Log.d(TAG, "accept: group key = " + groupedObservable.getKey());
- groupedObservable.subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- Log.d(TAG, "accept: group 数据项 :" + integer);
- }
- });
- }
- });
- }
输出结果:
accept: group key = 1
accept: group 数据项 :1
accept: group 数据项 :6
accept: group 数据项 :11
accept: group key = 2
accept: group 数据项 :2
accept: group 数据项 :7
accept: group 数据项 :12
accept: group key = 3
accept: group 数据项 :3
accept: group 数据项 :8
accept: group 数据项 :13
accept: group key = 4
accept: group 数据项 :4
accept: group 数据项 :9
accept: group 数据项 :14
accept: group key = 0
accept: group 数据项 :5
accept: group 数据项 :10
accept: group 数据项 :15buffer 定期收集原始Observable发射的数据形成一个包裹,然后当成自己的数据序列发射这个包裹。
- private void rxJavTest() {
-
- Observable.range(1, 10)
- .subscribeOn(Schedulers.io())
- .buffer(5)
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer<List<Integer>>() {
- @Override
- public void accept(List<Integer> integers) throws Exception {
- for (int i = 0; i < integers.size(); i++) {
- Log.d(TAG, "list大小" + integers.size()
- +" list包裹中的数据项:" + integers.get(i));
- }
- }
- });
- }
输出结果:
list大小5 list包裹中的数据项:1
list大小5 list包裹中的数据项:2
list大小5 list包裹中的数据项:3
list大小5 list包裹中的数据项:4
list大小5 list包裹中的数据项:5
list大小5 list包裹中的数据项:6
list大小5 list包裹中的数据项:7
list大小5 list包裹中的数据项:8
list大小5 list包裹中的数据项:9
list大小5 list包裹中的数据项:10
bufer(n,skip) Observer每接受到一个原始Observable发射的数据,就用n项数据填充缓存:接受到数据项 --> 之后n-1项 ;然后发射这些缓存。
- private void rxJavTest() {
-
- Observable.range(1, 10)
- .subscribeOn(Schedulers.io())
- .buffer(5,1)
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer<List<Integer>>() {
- @Override
- public void accept(List<Integer> integers) throws Exception {
- Log.d(TAG, "accept: List :" + integers.size());
- for (int i = 0; i < integers.size(); i++) {
- Log.d(TAG, "accept: 数据项:" + integers.get(i));
- }
- }
- });
- }
输出结果:
accept: List :5 [1,2,3,4,5]
accept: List :5 [2,3,4,5,6]
accept: List :5 [3,4,5,6,7]
accept: List :5 [4,5,6,7,8]
accept: List :5 [5,6,7,8,9]
accept: List :5 [6,7,8,9,10]
accept: List :4 [7,8,9,10]
accept: List :3 [8,9,10]
accept: List :2 [9,10]
accept: List :1 [10]
window 将原始Observerable拆分成包含n项的窗口Observable,然后发射这些窗口;
private void rxJavTest() {
- Observable.range(1, 10)
- .subscribeOn(Schedulers.io())
- .window(5)
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer<Observable<Integer>>() {
- @Override
- public void accept(Observable<Integer> integerObservable) throws Exception {
- Log.d(TAG, "accept: 一个Window包含:");
- integerObservable.subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- Log.d(TAG, "accept: 数据项:" + integer);
- }
- });
- }
- });
- }
输出结果:
accept: 一个Window包含:
accept: 数据项:1
accept: 数据项:2
accept: 数据项:3
accept: 数据项:4
accept: 数据项:5
accept: 一个Window包含:
accept: 数据项:6
accept: 数据项:7
accept: 数据项:8
accept: 数据项:9
accept: 数据项:10
repeat 创建一个重复发射原始数据序列的Observable。
(1)repeat(n) 重复发射n次。
(2)repeatWhen 当满足某种条件时才重新订阅和发送原始数据。
- final long start = System.currentTimeMillis();
- Observable.just(1,2,3)
- .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
- @Override
- public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
- return Observable.timer(3,TimeUnit.SECONDS);
- }
- }).subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- Log.d(TAG, "值 :" + integer + " 收到时间 :" + (System.currentTimeMillis() - start));
- }
- });
输出结果:
值 :1 收到时间 :22
值 :2 收到时间 :22
值 :3 收到时间 :22
值 :1 收到时间 :3023
值 :2 收到时间 :3024
值 :3 收到时间 :3024
(3)repeatUntil 直到某个条件就不再重复发射原始数据。
- final long start = System.currentTimeMillis();
- Observable.just(1,2,3)
- .repeatUntil(new BooleanSupplier() {
- @Override
- public boolean getAsBoolean() throws Exception {
- //重复发射数据,直到时间走过50毫秒
- return (System.currentTimeMillis() - start) > 50;
- }
- }).subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- Log.d(TAG, "值 :" + integer + " 收到时间 :" + (System.currentTimeMillis() - start));
- }
- });
打印结果太多,就不贴出了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。