当前位置:   article > 正文

android RxJava2版本使用简介_android observable.just

android observable.just

一、库的引入

GitHub下载地址:https://github.com/ReactiveX/RxJava

目前最新版本如图:


根据版本号,引入Rxjava库到我的项目里,如图:


同时为了更好的兼容Android,我也引入了Rxandroid,github地址如下:

https://github.com/ReactiveX/RxAndroid

这样,Rxjava与Rxandriod库的引入以告大吉,接下来就开始使用了。

二、基本使用方法

RxJava使用的是观察者模式。是由:

观察者:监视着被观察者,当被观察者发生变化时通知观察者,然后观察者执行相应的操作;

被观察者:被监视的对象,当某个状态改变时告诉观察者;

订阅(或注册、关联):将观察者与被观察者建立联系。

它三者的关系就好比一个Button的点击事件:

观察者:OnClickListener;

被观察者:Button;

订阅(或注册):setOnClickListener();

而将其对应到RxJava的对象为:

观察者:Observer;

被观察者:Observable;

订阅(活注册):subscribe();

创建方法,以简单打印字符串为例展开:

方法一:Create 

示例代码如下:

  1. public void create_one(View view){
  2. //创建观察者
  3. Observer<String> observer = new Observer<String>() {
  4. @Override
  5. public void onSubscribe(Disposable d) { //最先回调,没有执行onNext、onComplete、onError也会回调
  6. // d.dispose();//移除订阅关系,执行该方法后,下面的onNext、onError、onComplete都不会执行。
  7. boolean disposed = d.isDisposed();//判断是否取消了订阅关系,为真就是没有订阅,假就是订阅中
  8. Log.d(TAG, "onSubscribe:" + d.toString()+";disposed值为:"+disposed);
  9. }
  10. @Override
  11. public void onNext(String s) {//被观察者调用onNext时,这里就会回调
  12. Log.d(TAG, "onNext:" + s);
  13. }
  14. @Override
  15. public void onError(Throwable e) {//发送错误时调用
  16. Log.d(TAG, "onError:" + e.getMessage());
  17. }
  18. @Override
  19. public void onComplete() {//数据接收完成时调用
  20. Log.d(TAG, "onComplete:");
  21. }
  22. };
  23. //创建被观察者
  24. Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
  25. @Override
  26. public void subscribe(ObservableEmitter<String> e) throws Exception {
  27. //e只有三个方法onNext、onError、onComplete
  28. e.onNext("Hello");//发送数据
  29. // e.onError(new Exception("error"));//发送出错
  30. e.onComplete();//发送完成。这个方法与OnError只能执行一个,谁在前就执行谁。手动调用这个方法,Observer的onComplete才会执行,onError同理。
  31. }
  32. });
  33. //订阅,管着观察者与被观察者
  34. observable.subscribe(observer);
  35. }
打印日志输出如下:

12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onSubscribe:null;disposed值为:false
12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onNext:Hello
12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onComplete:


方法二:Create

示例代码如下:

  1. public void create_two(View view){
  2. //创建被观察者,同方法一
  3. Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
  4. @Override
  5. public void subscribe(ObservableEmitter<String> e) throws Exception {
  6. //e只有三个方法onNext、onError、onComplete
  7. e.onNext("Hello 2");//发送数据
  8. e.onComplete();//发送完成。这个方法与OnError只能执行一个,谁在前就执行谁。手动调用这个方法,observable添加的Action才会执行。
  9. //e.onError(new Exception("error"));//发送出错,手动调用这个方法后,observable添加的Consumer<Throwable>才会执行。
  10. }
  11. });
  12. observable.subscribe(new Consumer<String>() {
  13. @Override
  14. public void accept(String s) throws Exception {
  15. Log.e(TAG, "accept:s值为:" + s);
  16. }
  17. }, new Consumer<Throwable>() {
  18. @Override
  19. public void accept(Throwable throwable) throws Exception {
  20. Log.e(TAG, "accept:throwable值为:" + throwable.getMessage());
  21. }
  22. }, new Action() {
  23. @Override
  24. public void run() throws Exception {
  25. Log.e(TAG, "run工作");
  26. }
  27. }, new Consumer<Disposable>() {
  28. @Override
  29. public void accept(Disposable disposable) throws Exception {
  30. Log.e(TAG,"disposable值为:"+disposable.isDisposed());
  31. // disposable.dispose();//移除订阅关系,执行该方法后,上面的Consumer<String>、Consumer<Throwable>、Action都不会执行。
  32. }
  33. });
  34. }

打印日志输出如下:

12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: disposable值为:false
12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为:Hello 2
12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: run工作

方法三:just

示例代码如下:

  1. /**
  2. * 方法三:just
  3. */
  4. public void just(View view){
  5. //生成被观察者
  6. Observable<String> observable = Observable.just("just1", "just2");
  7. //定义观察者,包含订阅
  8. observable.subscribe(new Consumer<String>() {
  9. @Override
  10. public void accept(String s) throws Exception {//这个accept就等于观察者的onNext
  11. Log.e(TAG, "accept:s值为" + s);
  12. }
  13. });
  14. }
执行后打印日志如下:
12-26 11:02:00.577 16781-16781/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为just1
12-26 11:02:00.578 16781-16781/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为just2

方法四:fromArray

示例代码如下:

  1. public void fromArray(View view){
  2. //生成被观察者
  3. Observable<String> observable = Observable.fromArray("from1", "from2", "from3");
  4. //定义观察者,包含订阅
  5. observable.subscribe(new Consumer<String>() {
  6. @Override
  7. public void accept(String s) throws Exception {
  8. Log.e(TAG, "accept:s值为" + s);
  9. }
  10. });
  11. }
打印日志如下:

12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from1
12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from2
12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from3


方法五:fromCallable

示例代码如下:

  1. public void fromCallable(View view){
  2. //生成被观察者
  3. Observable<String> observable = Observable.fromCallable(new Callable<String>() {
  4. @Override
  5. public String call() throws Exception {
  6. return "fromCallable";
  7. }
  8. });
  9. //定义观察者,包含订阅
  10. observable.subscribe(new Consumer<String>() {
  11. @Override
  12. public void accept(String s) throws Exception {
  13. Log.e(TAG, "accept:s值为" + s);
  14. }
  15. });
  16. }

打印日志如下:

12-26 11:15:21.169 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为fromCallable

方法列表如下:


三、调度器Scheduler与线程控制

调度器种类:

常用的是Schedulers.io()进行耗时操作、AndroidSchedulers.mainThread()更新ui.

1、Schedulers.immediate();

直接在当前线程运行,相当于不指定线程,默认的Scheduler.

2、Schedulers.newThread();

启动新现成,在新的线程中执行操作。

3、Schedulers.io();

I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler,行为模式和newThread()差不多,区别在于io()的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下,io()比newThread()更有效率。不要把计算工作放在io(),可以避免创建不必要的线程。

4、Schedulers.computation();

计算所使用的Scheduler。这个计算是指Cpu密集型计算,即不会被I/O等操作限制性的操作,例如图形的计算。这个Sheduler使用的是固定的线程池,大小为Cpu核数。不要把I/O放在computation中,否则I/O操作等待时间会浪费cpu。用于计算任务,如事件循环和回调处理,不要用于IO操作,默认线程数等于处理器的数量。

5、Schedulsers.from(executor)

使用指定的Executor作为调度器。

6. Schedulers.trampoline()
当其它排队的任务完成后,在当前线程排队开始执行

7. AndroidSchedulers.mainThread()
在RxAndroid中,他指定操作将在Android主线程中执行。

指定线程:

1、observerOn(Schedulers)

指定观察者Observer在哪个线程执行

2、subscribeOn(Scheduler)

指定被观察者Observable在哪个线程执行

线程多次随意切换:
observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。 subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

示例代码如下:

  1. public void schedulers(View view){
  2. Observable.just(1,2,2)//创建被观察者
  3. .subscribeOn(Schedulers.io())//指定被观察者运行在io线程
  4. .observeOn(AndroidSchedulers.mainThread())//指定下面的观察者运行在主线程中
  5. .subscribe(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer integer) throws Exception {
  8. Log.e(TAG,"integer的值为:"+integer.intValue());
  9. }
  10. });
  11. }

代码运行结果如下:

12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:1
12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:2
12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:2

除了将这些调度器传递给RxJava的Observable操作符,你也可以用它们调度你自己的任务。例如:Scheduler.Worker

连接地址:http://wiki.jikexueyuan.com/project/rx-docs/Scheduler.html

四、操作符,比较复杂也非常强大

操作符理解为可以控制流程的方法。

1、操作符的分类


2、变换操作符

变换操作符是用来变换类型的。

种类如下:


map操作符,后面的Function有两个泛型参数,第一个是输入类型,第二个是转换后输出返回的类型,例如下面的示例程序,apply()方法要返回的是第二个参数的类型。

  1. class User{
  2. private String name;
  3. private String password;
  4. public User(String name, String password) {
  5. this.name = name;
  6. this.password = password;
  7. }
  8. public String getName() {
  9. return name;
  10. }
  11. public void setName(String name) {
  12. this.name = name;
  13. }
  14. public String getPassword() {
  15. return password;
  16. }
  17. public void setPassword(String password) {
  18. this.password = password;
  19. }
  20. }
  21. //模拟网络登录
  22. private boolean login(User user){
  23. if(user.getName().equals("liming")&&user.getPassword().equals("123")){
  24. return true;
  25. }
  26. return false;
  27. }
  28. /**
  29. * 操作符map
  30. */
  31. public void operator_map(View view){
  32. User user=new User("liming","123");
  33. Observable.just(user)
  34. .map(new Function<User, Boolean>() {//操作符map将User转换为需要的结果boolean
  35. @Override
  36. public Boolean apply(User user) throws Exception {
  37. return login(user);//进行网络登录
  38. }
  39. }).subscribeOn(Schedulers.io())//网络登录需要在io中操作
  40. .observeOn(AndroidSchedulers.mainThread())//更新ui,需要在主线程中操作
  41. .subscribe(new Consumer<Boolean>() {
  42. @Override
  43. public void accept(Boolean isLogin) throws Exception {
  44. Log.e(TAG,"登录是否成功:"+isLogin);
  45. }
  46. });
  47. }

打印日志输出如下:

12-26 15:17:11.509 14096-14096/demo.face.comi.io.rxjavademo E/MainActivity: 登录是否成功:true

flatMap操作符:

比Map要强大,可以做到map不能操作的事,可以递级处理数据,然后传递数据,例如下面的例子,先用户登录,然后获取用户特征值,示例代码如下:(注意所使用User类与login方法同上)

  1. /**
  2. * 模拟根据用户是否登录成功,返回用户的轮廓
  3. */
  4. private UserProfile profile(boolean b){
  5. if(b){
  6. return new UserProfile("漂亮");
  7. }else{
  8. return new UserProfile("好丑");
  9. }
  10. }
  11. /**
  12. * 操作符-flatmap
  13. * @param view
  14. */
  15. public void operator_flatmap(View view){
  16. User user=new User("xiaobao","123456");
  17. Observable.just(user).flatMap(new Function<User, ObservableSource<Boolean>>() {
  18. @Override
  19. public ObservableSource<Boolean> apply(User user) throws Exception {
  20. return Observable.just(login(user));//用户登录
  21. }
  22. }).flatMap(new Function<Boolean, ObservableSource<UserProfile>>() {
  23. @Override
  24. public ObservableSource<UserProfile> apply(Boolean aBoolean) throws Exception {
  25. return Observable.just(profile(aBoolean));//获取用户特征
  26. }
  27. }).subscribeOn(Schedulers.io())//控制被监控对象在io线程中
  28. .observeOn(AndroidSchedulers.mainThread())//监控在主线程中
  29. .subscribe(new Consumer<UserProfile>() {
  30. @Override
  31. public void accept(UserProfile userProfile) throws Exception {//返回用户特征
  32. //相当于onNext
  33. Log.e(TAG, "userProfile值为:" + userProfile.profile);
  34. }
  35. }, new Consumer<Throwable>() {
  36. @Override
  37. public void accept(Throwable throwable) throws Exception {
  38. //onError
  39. }
  40. }, new Action() {
  41. @Override
  42. public void run() throws Exception {
  43. //onComplete
  44. }
  45. });
  46. }
打印日志如下:

12-26 15:57:28.089 19512-19512/demo.face.comi.io.rxjavademo E/MainActivity: userProfile值为:好丑
buffer操作符:

缓存发射的数据为一个list,到观察者的时候参数就是一个list。

示例代码如下:

  1. public void operator_buffer(View view){
  2. Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> e) throws Exception {
  5. for(int i=0;i<10;i++){
  6. e.onNext(i+"");
  7. }
  8. e.onComplete();
  9. }
  10. }).buffer(500, TimeUnit.MILLISECONDS)//缓冲500毫秒内发射的数据
  11. .subscribe(new Consumer<List<String>>() {
  12. @Override
  13. public void accept(List<String> strings) throws Exception {
  14. //这里strings为list,就是缓冲了500毫秒内发射的String
  15. Log.e(TAG,"accept的数据大小为:"+strings.size()+";数据为:"+strings.toString());
  16. }
  17. });
  18. }
打印日志如下:
12-26 16:21:27.925 22722-22722/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据大小为:10;数据为:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

3、过滤操作符

debounce操作符:

被观察者连续发射的数据的时间间隔 如果在指定时间 就被过滤拦截。

示例如下:

  1. public void operator_debounce(View view){
  2. Observable.create(new ObservableOnSubscribe<Integer>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  5. if(e.isDisposed()) return;
  6. try{
  7. for(int i=1;i<10;i++){//发生数据间隔时间分别为100、200。。。。。1000毫秒
  8. e.onNext(i);//发射数据
  9. Thread.sleep(100*i);
  10. }
  11. }catch (Exception error){
  12. e.onError(error);
  13. }
  14. }
  15. }).subscribeOn(Schedulers.computation())
  16. .debounce(500,TimeUnit.MILLISECONDS)//如果发射数据间隔小于500就被过滤拦截掉
  17. .subscribe(new Consumer<Integer>() {
  18. @Override
  19. public void accept(Integer integer) throws Exception {
  20. Log.e(TAG, "accept的数据为:" + integer);
  21. }
  22. }, new Consumer<Throwable>() {
  23. @Override
  24. public void accept(Throwable throwable) throws Exception {
  25. Log.e(TAG,"出错:"+throwable.toString());
  26. }
  27. });
  28. }
打印日志如下:
12-26 16:37:23.194 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:6
12-26 16:37:23.795 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:7
12-26 16:37:24.494 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:8
12-26 16:37:25.295 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:9

filter操作符:

过滤数据,返回真即使满足条件,不拦截,否者拦截,观察者接收不到。

示例代码如下:

  1. public void operator_filter(View view){
  2. Observable.create(new ObservableOnSubscribe<Integer>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  5. if(e.isDisposed()) return;
  6. try{
  7. for(int i=1;i<10;i++){//发生数据间隔时间分别为100、200。。。。。1000毫秒
  8. e.onNext(i);//发射数据
  9. Thread.sleep(100*i);
  10. }
  11. e.onComplete();
  12. }catch (Exception error){
  13. e.onError(error);
  14. }
  15. }
  16. }).subscribeOn(Schedulers.computation())
  17. .debounce(500,TimeUnit.MILLISECONDS)
  18. .filter(new Predicate<Integer>() {//在debounce的基础上加过滤,必须大于6
  19. @Override
  20. public boolean test(Integer integer) throws Exception {
  21. return integer>6;
  22. }
  23. })
  24. .subscribe(new Consumer<Integer>() {
  25. @Override
  26. public void accept(Integer integer) throws Exception {
  27. Log.e(TAG, "accept的数据为:" + integer);
  28. }
  29. }, new Consumer<Throwable>() {
  30. @Override
  31. public void accept(Throwable throwable) throws Exception {
  32. Log.e(TAG,"出错:"+throwable.toString());
  33. }
  34. });
  35. }
打印日志如下:

12-26 16:45:25.356 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:7
12-26 16:45:26.056 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:8
12-26 16:45:26.856 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:9
take操作符

示例代码如下:

  1. public void operator_take(View view){
  2. Observable.just(1,2,3,4,5,6,7)
  3. .take(3)//发射前3个数据
  4. .subscribe(new Observer<Integer>() {
  5. @Override
  6. public void onSubscribe(Disposable d) {
  7. }
  8. @Override
  9. public void onNext(Integer integer){
  10. Log.e(TAG,"onNext的数据为:"+integer);
  11. }
  12. @Override
  13. public void onError(Throwable e) {
  14. }
  15. @Override
  16. public void onComplete() {
  17. }
  18. });
  19. }

打印日志如下:

12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:1
12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:2
12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:3

过滤操作符实际使用案例:防止点击按钮连续点击、搜索引擎过滤,避免搜索字段的变化连续请求网络。以下为按钮防连续点击为例,这个功能需要集成RxBinding库。

示例代码如下:

  1. RxView.clicks(btn_operator_filterLivingExample)
  2. .throttleFirst(2,TimeUnit.SECONDS)//2秒内的点击只拿第一个,他的都过滤调
  3. .subscribe(new Consumer<Object>() {
  4. @Override
  5. public void accept(Object o) throws Exception {
  6. Log.e(TAG,"被点击了。。。");
  7. }
  8. });

4、组合操作符


startWith操作符

举例如下:在abc之前插入一个d,所以d是最先发出去的数据。

  1. public void operator_startWith(View view){
  2. Observable.just("a","b","c")
  3. .startWith("d")
  4. .subscribe(new Consumer<String>() {
  5. @Override
  6. public void accept(String s) throws Exception {
  7. Log.e(TAG,"accept:"+s);
  8. }
  9. });
  10. }

打印日志如下:

12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:d
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:a
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:b
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:c

merge操作符:

合并观察者

示例代码如下:

  1. public void operator_merge(View view){
  2. Observable<String> o1 = Observable.just("a","b","c");
  3. Observable<String> o2 = Observable.just("d","e","f");
  4. Observable.merge(o1,o2).subscribe(new Consumer<String>() {
  5. @Override
  6. public void accept(String s) throws Exception {
  7. Log.e(TAG, "accept: "+s);
  8. }
  9. });
  10. }
打印日志如下:

12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: a
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: b
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: c
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: d
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: e
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: f
combineLatest操作符

当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

示例代码如下:

  1. public void operator_combineLatest(View view){
  2. Observable<String> o1 = Observable.just("a","b","c");
  3. Observable<String> o2 = Observable.just("d","e","f");
  4. //第一个String就是o1 最后的数据,第二个String是o2的每个数据源,第三个String是结合之后返回的类型
  5. Observable.combineLatest(o1, o2, new BiFunction<String, String, String>() {
  6. @Override
  7. public String apply(String s, String s2) throws Exception {
  8. //s就是o1 最后的数据,s2 就是o2的每个数据源
  9. return s+s2;
  10. }
  11. }).subscribe(new Consumer<String>() {
  12. @Override
  13. public void accept(String s) throws Exception {
  14. Log.e(TAG, "accept: "+s);
  15. }
  16. });
  17. }
打印日志如下:
12-26 18:14:38.961 7590-7590/demo.face.comi.io.rxjavademo E/MainActivity: accept: cd
12-26 18:14:38.961 7590-7590/demo.face.comi.io.rxjavademo E/MainActivity: accept: ce
12-26 18:14:38.961 7590-7590/demo.face.comi.io.rxjavademo E/MainActivity: accept: cf

组合操作符实例:

利用combineLatest实现注册的时候所有输入信息(姓名、邮箱、年龄等)合法后才点亮注册按钮,示例代码如下:

  1. public void operator_combineDemo(View view){
  2. //skip过滤操作符,这里是过滤掉第一个字符的意思,也就是第一个字符不算数。
  3. Observable<CharSequence> observableName = RxTextView.textChanges(etName).skip(1);
  4. Observable<CharSequence> observableEmail = RxTextView.textChanges(etEmail).skip(1);
  5. Observable<CharSequence> observableAge= RxTextView.textChanges(etAge).skip(1);
  6. Observable.combineLatest(observableName, observableEmail, observableAge, new Function3<CharSequence, CharSequence, CharSequence, Boolean>() {
  7. @Override
  8. public Boolean apply(CharSequence name, CharSequence email, CharSequence age) throws Exception {
  9. //当输入框name长度大于3,email长度大于5,age长度大于0,这些长度都是在过滤掉第一个字符的前提下。只有这些条件都满足下才返回true
  10. return name.length()>3&&email.length()>5&&age.length()>0;
  11. }
  12. }).subscribe(new Consumer<Boolean>() {
  13. @Override
  14. public void accept(Boolean aBoolean) throws Exception {
  15. if(aBoolean){//返回true才能让注册按钮可以被点击
  16. Log.e(TAG,"验证通过,按钮可以点击了");
  17. }
  18. }
  19. });
  20. }

5、错误处理操作符


onErrorReturn操作符

让Observable遇到错误时发射一个特殊的项并且正常终止,onErrorRetrun能够捕获在它之前发生的异常,它之后流中的操作发生的异常就它就不会管了。

  1. public void operator_onErrorReturn(View view){
  2. Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> e) throws Exception {
  5. for(int i=0;i<=4;i++){
  6. if(i==2){
  7. e.onError(new Throwable("出现错误了"));
  8. }else{
  9. e.onNext(i+"");
  10. }
  11. try{
  12. Thread.sleep(1000);
  13. }catch (Exception ex){
  14. ex.printStackTrace();
  15. }
  16. }
  17. e.onComplete();
  18. }
  19. }).subscribeOn(Schedulers.newThread())
  20. .onErrorReturn(new Function<Throwable, String>() {
  21. @Override
  22. public String apply(Throwable throwable) throws Exception {
  23. Log.e(TAG, "在onErrorReturn处理了: "+throwable.toString() );
  24. return "10";
  25. }
  26. }).subscribe(new Consumer<String>() {
  27. @Override
  28. public void accept(String s) throws Exception {
  29. Log.e(TAG, "收到消息: " + s);
  30. }
  31. }, new Consumer<Throwable>() {
  32. @Override
  33. public void accept(Throwable throwable) throws Exception {
  34. Log.e(TAG, "结果错误: " + throwable.toString());
  35. }
  36. });
  37. }

打印日志如下:

12-27 13:51:53.634 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 13:51:54.634 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 13:51:55.635 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 在onErrorReturn处理了: java.lang.Throwable: 出现错误了
12-27 13:51:55.635 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 10

onErrorRsumeNext操作符

和onErrorNext不同的是,onErrorResumeNext是返回一个重新定义的Observable,onErrorNext返回的是发射的数据。

注意onErrorResumeNext拦截的错误是Throwable,不能拦截Exception。 不然它会将错误传递给观察者的onError方法。要拦截Exception请用onExceptionResumeNext。

示例代码如下:

  1. public void operator_onErrorResumeNext(View view){
  2. Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> e) throws Exception {
  5. for(int i = 0; i<= 3 ;i++){
  6. if(i == 2){
  7. //这里是Throwable
  8. e.onError(new Throwable("出现错误了"));
  9. }else{
  10. e.onNext(i+"");
  11. }
  12. try{
  13. Thread.sleep(1000);
  14. }catch (Exception ex){
  15. ex.printStackTrace();
  16. }
  17. }
  18. e.onComplete();
  19. }
  20. })
  21. .subscribeOn(Schedulers.newThread())
  22. .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {
  23. @Override
  24. public ObservableSource<? extends String> apply(Throwable throwable) throws Exception {
  25. //拦截到错误之后,重新定义了被观察者
  26. return Observable.just("重新定义了被观察者");
  27. }
  28. })
  29. .subscribe(new Consumer<String>() {
  30. @Override
  31. public void accept(String s) throws Exception {
  32. Log.e(TAG, "收到消息: " + s);
  33. }
  34. }, new Consumer<Throwable>() {
  35. @Override
  36. public void accept(Throwable throwable) throws Exception {
  37. Log.e(TAG, "结果错误: " + throwable.toString());
  38. }
  39. });
  40. }

打印日志如下:

12-27 13:58:32.695 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 13:58:33.695 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 13:58:34.697 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 重新定义了被观察者

onExceptionResumeNext操作符

onExceptionResumeNext 和 onErrorResumeNext基本一样,也是收到错误重新定义了新的被观察者。但是有一点不用: 如果onErrorResumeNext收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,onExceptionResumeNext则会继续拦截。

注意onExceptionResumeNext拦截的错误是Exception,不能拦截Throwable。 不然它会将错误传递给观察者的onError方法。要拦截Throwable请用onErrorResumeNext。

示例代码如下:

  1. public void operator_onExceptionResumeNext(View view){
  2. Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> e) throws Exception {
  5. for(int i = 0; i<= 3 ;i++){
  6. if(i == 2){
  7. //注意这里是Exception
  8. e.onError(new Exception("出现错误了"));
  9. }else{
  10. e.onNext(i+"");
  11. }
  12. try{
  13. Thread.sleep(1000);
  14. }catch (Exception ex){
  15. ex.printStackTrace();
  16. }
  17. }
  18. e.onComplete();
  19. }
  20. })
  21. .subscribeOn(Schedulers.newThread())
  22. .onExceptionResumeNext(new Observable<String>() {
  23. @Override
  24. protected void subscribeActual(Observer<? super String> observer) {
  25. observer.onNext("错误替换的消息");
  26. observer.onComplete();
  27. }
  28. })
  29. .subscribe(new Consumer<String>() {
  30. @Override
  31. public void accept(String s) throws Exception {
  32. Log.e(TAG, "收到消息: " + s);
  33. }
  34. }, new Consumer<Throwable>() {
  35. @Override
  36. public void accept(Throwable throwable) throws Exception {
  37. Log.e(TAG, "结果错误: " + throwable.toString());
  38. }
  39. });
  40. }
输出结果如下:

12-27 14:10:10.213 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:10:11.213 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:10:12.215 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 错误替换的消息

retry操作符

重试的意思,拦截到错误,然后让 被观察者重新发射数据。Throwable和Exception都额可以拦截

它有五种参数方法: 
retry(): 让被观察者重新发射数据,要是一直错误就一直发送了 
retry(BiPredicate): interger是第几次重新发送,Throwable是错误的内容 
retry(long time): 最多让被观察者重新发射数据多少次 
retry(long time,Predicate predicate): 最多让被观察者重新发射数据多少次,在predicate里面进行判断拦截 返回是否继续 
retry(Predicate predicate): 在predicate里面进行判断拦截 返回是否继续

示例代码如下:
  1. public void operator_retry(View view){
  2. Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> e) throws Exception {
  5. for(int i = 0; i<= 3 ;i++){
  6. if(i == 2){
  7. e.onError(new Exception("出现错误了"));
  8. }else{
  9. e.onNext(i+"");
  10. }
  11. try{
  12. Thread.sleep(1000);
  13. }catch (Exception ex){
  14. ex.printStackTrace();
  15. }
  16. }
  17. e.onComplete();
  18. }
  19. }).subscribeOn(Schedulers.newThread())
  20. .retry(new Predicate<Throwable>() {
  21. @Override
  22. public boolean test(Throwable throwable) throws Exception {
  23. Log.e(TAG, "retry错误: "+throwable.toString());
  24. //返回假就是不让重新发射数据了,调用观察者的onError就终止了。
  25. //返回真就是让被观察者重新发射请求
  26. return true;
  27. }
  28. })
  29. // .retry(new BiPredicate<Integer, Throwable>() {
  30. // @Override
  31. // public boolean test(Integer integer, Throwable throwable) throws Exception {
  32. // Log.e(TAG, "retry错误: "+integer+" "+throwable.toString());
  33. //
  34. // //返回假就是不让重新发射数据了,调用观察者的onError就终止了。
  35. // //返回真就是让被观察者重新发射请求
  36. // return true;
  37. // }
  38. // })
  39. // .retry(3)//最多让被观察者重新发射数据3次
  40. .subscribe(new Consumer<String>() {
  41. @Override
  42. public void accept(String s) throws Exception {
  43. Log.e(TAG, "收到消息: " + s);
  44. }
  45. }, new Consumer<Throwable>() {
  46. @Override
  47. public void accept(Throwable throwable) throws Exception {
  48. Log.e(TAG, "结果错误: " + throwable.toString());
  49. }
  50. });
  51. }
打印日志如下:

12-27 14:27:06.456 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:07.457 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:08.457 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了
12-27 14:27:08.459 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:09.460 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:10.460 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了
12-27 14:27:10.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:11.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:12.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了

......

......

一直这样循环下去

retryWhen操作符

retryWhen和retry类似,区别是:retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。

这里如果里面的throwableObservable不进行处理,那么onNext也会拦截处理,这里有个坑。

示例代码如下

  1. public void operator_retryWhen(View view){
  2. Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> e) throws Exception {
  5. for (int i = 0; i <= 3; i++) {
  6. if (i == 2) {
  7. e.onError(new Exception("出现错误了"));
  8. } else {
  9. e.onNext(i + "");
  10. }
  11. try {
  12. Thread.sleep(1000);
  13. } catch (Exception ex) {
  14. ex.printStackTrace();
  15. }
  16. }
  17. e.onComplete();
  18. }
  19. })
  20. .subscribeOn(Schedulers.newThread())
  21. .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
  22. @Override
  23. public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
  24. //这里可以发送新的被观察者 Observable
  25. return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
  26. @Override
  27. public ObservableSource<?> apply(Throwable throwable) throws Exception {
  28. //如果发射的onError就终止
  29. return Observable.error(new Throwable("retryWhen终止啦"));
  30. }
  31. });
  32. }
  33. })
  34. .subscribe(new Consumer<String>() {
  35. @Override
  36. public void accept(String s) throws Exception {
  37. Log.e(TAG, "收到消息: " + s);
  38. }
  39. }, new Consumer<Throwable>() {
  40. @Override
  41. public void accept(Throwable throwable) throws Exception {
  42. Log.e(TAG, "结果错误: " + throwable.toString());
  43. }
  44. });
  45. }

日志输出如下:
12-27 14:45:55.551 13167-13358/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:45:56.551 13167-13358/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:45:57.554 13167-13358/demo.face.comi.io.rxjavademo E/MainActivity: 结果错误: java.lang.Throwable: retryWhen终止啦

6、辅助操作符



delay操作符

整体延迟一段指定的时间再发射来自Observable的发射物。就是延迟。

它有6种方法参数:

  • delay(Function):
  • delay(long delay,TimeUnit unit): 指定延迟多长时间
  • delay(long delay,TimeUnit unit,mScheduler scheduler): 指定延迟多长时间并添加调度器
  • delay(long delay,TimeUnit unit,boolean delayError): 指定延迟多长时间。delayError参数如果为假 就直接抛出onError,为真就如常延迟执行。
  • delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟
  • delay(ObservableSource ,Function):
示例如下:

  1. public void operator_delay(View view){
  2. Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> e) throws Exception {
  5. for(int i=0; i<=3 ;i++){
  6. e.onNext(i+"");
  7. }
  8. try{
  9. Thread.sleep(1000);
  10. }catch (Exception ex){
  11. ex.printStackTrace();
  12. }
  13. e.onComplete();
  14. }
  15. })
  16. .delay(3000, TimeUnit.MILLISECONDS)
  17. //delayError参数如果为假就直接抛出onError,为真就如常延迟执行
  18. // .delay(3000,TimeUnit.MILLISECONDS,true)
  19. .subscribe(new Consumer<String>() {
  20. @Override
  21. public void accept(String s) throws Exception {
  22. Log.e(TAG, "收到消息: " + s);
  23. }
  24. }, new Consumer<Throwable>() {
  25. @Override
  26. public void accept(Throwable throwable) throws Exception {
  27. Log.e(TAG, "收到错误: " + throwable.toString());
  28. }
  29. });
  30. }

隔了三秒输出如下日志:
12-27 15:28:10.560 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 15:28:10.561 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 15:28:10.561 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 2
12-27 15:28:10.561 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 3

delaySubscription操作符

该操作符跟delay的差别就是:delaySubscription只做一件事,延迟订阅。

do操作符

do操作符有很多个,就相当于生命周期。例如doOnNext在onNext的时候回调。

例如下面的例子:

  1. public void operator_do(View view){
  2. Observable.just("1","2")
  3. .doOnNext(new Consumer<String>() {
  4. @Override
  5. public void accept(String s) throws Exception {
  6. Log.e(TAG,"doOnNext:"+s);
  7. }
  8. })
  9. .doAfterNext(new Consumer<String>() {
  10. @Override
  11. public void accept(String s) throws Exception {
  12. Log.e(TAG,"doAfterNext:"+s);
  13. }
  14. })
  15. .doOnComplete(new Action() {
  16. @Override
  17. public void run() throws Exception {
  18. Log.e(TAG,"doOnComplete:");
  19. }
  20. })
  21. .doOnSubscribe(new Consumer<Disposable>() {//订阅之后回调的方法
  22. @Override
  23. public void accept(Disposable disposable) throws Exception {
  24. Log.e(TAG,"doOnSubscribe");
  25. }
  26. })
  27. .doAfterTerminate(new Action() {
  28. @Override
  29. public void run() throws Exception {
  30. Log.e(TAG,"doAfterTerminate");
  31. }
  32. })
  33. .doFinally(new Action() {
  34. @Override
  35. public void run() throws Exception {
  36. Log.e(TAG,"doFinally:");
  37. }
  38. })
  39. //Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted
  40. .doOnEach(new Consumer<Notification<String>>() {
  41. @Override
  42. public void accept(Notification<String> stringNotification) throws Exception {
  43. Log.e(TAG, "doOnEach: "+(stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError"));
  44. }
  45. })
  46. //订阅后可以进行取消订阅
  47. .doOnLifecycle(new Consumer<Disposable>() {
  48. @Override
  49. public void accept(Disposable disposable) throws Exception {
  50. Log.e(TAG, "doOnLifecycle: "+disposable.isDisposed());
  51. //disposable.dispose();
  52. }
  53. }, new Action() {
  54. @Override
  55. public void run() throws Exception {
  56. Log.e(TAG, "doOnLifecycle run: ");
  57. }
  58. })
  59. .subscribe(new Consumer<String>() {
  60. @Override
  61. public void accept(String s) throws Exception {
  62. Log.e(TAG, "收到消息: " + s);
  63. }
  64. });
  65. }
执行结果如下:
12-27 15:46:57.702 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnSubscribe
12-27 15:46:57.702 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnLifecycle: false
12-27 15:46:57.702 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnNext:1
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnEach: onNext
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doAfterNext:1
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnNext:2
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnEach: onNext
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 2
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doAfterNext:2
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnComplete:
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnEach: onComplete
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doFinally:
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doAfterTerminate
materialize操作符

materialize把 被观察者Observable转换为Notification通知对象。dematerialize相反了。 注意用了materialize之后,onNext会回调多了一个数据,因为onComplete也回调到这里了。

示例代码如下

  1. public void operator_materialize(View view){
  2. Observable.just("1","2")
  3. .materialize()
  4. .subscribe(new Consumer<Notification<String>>() {
  5. @Override
  6. public void accept(@NonNull Notification<String> stringNotification) throws Exception {
  7. //这时候的数据就是一个Notification对象了
  8. Log.e(TAG, (stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError")+": "+stringNotification.getValue());
  9. }
  10. });
  11. }
日志输出如下:

12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onNext: 1
12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onNext: 2
12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onComplete: null
TimeInterval操作符

获取数据发送的时间间隔,就是把数据转换为数据发送的间隔Timed。

有4个参数方法:

  • timeInterval(): 转换为时间Timed,默认时间单位为毫秒
  • timeInterval(Scheduler): 转换为时间Timed,可以设置调度器
  • timeInterval(TimeUnit): 转换为时间Timed,可以设置时间单位
  • timeInterval(TimeUnit,Scheduler): 转换为时间Timed,可以设置时间单位和调度器
例子如下:
  1. public void operator_timeinterval(View view){
  2. Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> e) throws Exception {
  5. for(int i=0; i<3; i++){
  6. e.onNext(i+"");
  7. Thread.sleep(1000);
  8. }
  9. e.onComplete();
  10. }
  11. })
  12. .subscribeOn(Schedulers.newThread())
  13. .timeInterval()
  14. .subscribe(new Consumer<Timed<String>>() {
  15. @Override
  16. public void accept(@NonNull Timed<String> stringTimed) throws Exception {
  17. Log.e(TAG, "accept: "+stringTimed.time());
  18. }
  19. });
  20. }
日志输出如下:
12-27 16:55:17.608 22853-22896/demo.face.comi.io.rxjavademo E/MainActivity: accept: 10
12-27 16:55:18.608 22853-22896/demo.face.comi.io.rxjavademo E/MainActivity: accept: 1001
12-27 16:55:19.608 22853-22896/demo.face.comi.io.rxjavademo E/MainActivity: accept: 1000
timestamp操作符
给发射的每个数据添加时间,转换了为Timed,和timeInterval的参数一致,但是timestamp获取到的time是时间戳,需要自己转换。
示例代码如下:
  1. public void operator_timestamp(View view){
  2. Observable.just("a","b")
  3. .timestamp()
  4. .subscribe(new Consumer<Timed<String>>() {
  5. @Override
  6. public void accept(@NonNull Timed<String> stringTimed) throws Exception {
  7. //转换时间
  8. String date = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss", Locale.CHINA)
  9. .format(new Date(stringTimed.time()));
  10. Log.e(TAG, "accept: "+date);
  11. }
  12. });
  13. }

日志输出如下:
12-27 16:59:40.259 23241-23241/demo.face.comi.io.rxjavademo E/MainActivity: accept: 2017-12-27 04:59:40
12-27 16:59:40.261 23241-23241/demo.face.comi.io.rxjavademo E/MainActivity: accept: 2017-12-27 04:59:40
7、条件操作符

amb/ambArray/ambWith操作符

给定多个Observable,只让第一个发射数据的Observable发射全部数据。

ambWith和ambArray差不多,Observable.amb(o1,o2)和o1.ambWith(o2)是一样的效果。

有两个参数方法: 
- amb(Iterable); 
- ambArray();

示例代码如下:
  1. public void operator_amb(View view){
  2. Observable o1 = Observable.just("a","b","c").delay(1000, TimeUnit.MILLISECONDS);
  3. Observable o2 = Observable.just("d","e","f");
  4. Observable.ambArray(o1,o2).subscribe(new Consumer<String>() {
  5. @Override
  6. public void accept(@NonNull String o) throws Exception {
  7. Log.e(TAG, "accept: "+o);
  8. }
  9. });
  10. }
输出结果如下:
12-27 17:09:38.495 24304-24304/demo.face.comi.io.rxjavademo E/MainActivity: accept: a
12-27 17:09:38.495 24304-24304/demo.face.comi.io.rxjavademo E/MainActivity: accept: b
12-27 17:09:38.495 24304-24304/demo.face.comi.io.rxjavademo E/MainActivity: accept: c
由于o1延迟发送,所以o2先发送,而又由于ambArray只会让第一个Observable发送全部数据,所以结果就是abc。
defaultIfEmpty操作符
被观察者没有onNext发送数据就调用了onComplete,就发射defaultlfEmpty里面的数据
示例代码如下:
  1. public void operator_defaultIfEmpty(View view){
  2. Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> e) throws Exception {
  5. e.onComplete();
  6. }
  7. }).defaultIfEmpty("默认数据")
  8. .subscribe(new Consumer<String>() {
  9. @Override
  10. public void accept(@NonNull String s) throws Exception {
  11. Log.e(TAG, "accept: "+s);
  12. }
  13. });
  14. }
输入日志如下:
12-27 17:13:44.928 24752-24752/demo.face.comi.io.rxjavademo E/MainActivity: accept: 默认数据

switchIfEmpty操作符

如果发射源没有发射数据就完成了,就发射switchIfEmpty里面新的Observable发射源
示例代码如下:
  1. public void operator_switchIfEmpty(View view){
  2. Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> e) throws Exception {
  5. e.onComplete();
  6. }
  7. }).switchIfEmpty(Observable.just("a","b","c"))
  8. .subscribe(new Consumer<String>() {
  9. @Override
  10. public void accept(@NonNull String s) throws Exception {
  11. Log.e(TAG, "accept: "+s);
  12. }
  13. });
  14. }
执行结果如下:
12-27 17:25:43.048 25581-25581/demo.face.comi.io.rxjavademo E/MainActivity: accept: a
12-27 17:25:43.048 25581-25581/demo.face.comi.io.rxjavademo E/MainActivity: accept: b
12-27 17:25:43.048 25581-25581/demo.face.comi.io.rxjavademo E/MainActivity: accept: c
skipUntil操作符
SkipUntil 订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable
  1. //丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
  2. public void operator_skipUntil(View view) {
  3. //skipUntil里面的Observable发射了之后,原始的Observable每隔一秒循环发射的数据才开始被接收到
  4. Observable.interval(1, TimeUnit.SECONDS)
  5. .skipUntil(Observable.just("1")).delay(5,TimeUnit.SECONDS)
  6. .subscribe(new Consumer<Long>() {
  7. @Override
  8. public void accept(Long aLong) throws Exception {
  9. Log.e(TAG, "accept:" + aLong);
  10. }
  11. });
  12. }

直到等待5秒后才开始输出如下日志:
12-27 17:42:21.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:0
12-27 17:42:22.107 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:1
12-27 17:42:23.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:2
12-27 17:42:24.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:3
12-27 17:42:25.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:4
12-27 17:42:26.107 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:5

.......
skipWhile操作符
和skipUntil不同的是,skipWhile可以判断,返回假才让数据发出去。
示例代码如下:
  1. public void operator_skipWhile(View view){
  2. Observable.interval(1, TimeUnit.SECONDS)
  3. .observeOn(Schedulers.newThread())
  4. .skipWhile(new Predicate<Long>() {
  5. @Override
  6. public boolean test(@NonNull Long aLong) throws Exception {
  7. return aLong < 5;
  8. //返回假,原始的Observable发射的数据才可以接收到
  9. }
  10. })
  11. .subscribe(new Consumer<Long>() {
  12. @Override
  13. public void accept(@NonNull Long aLong) throws Exception {
  14. Log.e(TAG, "accept: "+aLong);
  15. }
  16. });
  17. }
输出日志如下:
12-27 17:46:26.341 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 5
12-27 17:46:27.341 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 6
12-27 17:46:28.342 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 7
12-27 17:46:29.341 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 8
12-27 17:46:30.342 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 9

........

takeUntil() / takeWhile() / takeWhileWithIndex()操作符

和skilUntil、shikWhil相反,他们是开始先发射原始数据,到takeUntil的第二个Observable发射了一个数据或一个通知就不发射原来的数据了。

8、布尔操作符

9、算术和聚合操作符



代码下载地址:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/300696
推荐阅读
相关标签
  

闽ICP备14008679号