赞
踩
一、库的引入
GitHub下载地址:https://github.com/ReactiveX/RxJava
目前最新版本如图:
根据版本号,引入Rxjava库到我的项目里,如图:
同时为了更好的兼容Android,我也引入了Rxandroid,github地址如下:
https://github.com/ReactiveX/RxAndroid
这样,Rxjava与Rxandriod库的引入以告大吉,接下来就开始使用了。
二、基本使用方法
RxJava使用的是观察者模式。是由:
观察者:监视着被观察者,当被观察者发生变化时通知观察者,然后观察者执行相应的操作;
被观察者:被监视的对象,当某个状态改变时告诉观察者;
订阅(或注册、关联):将观察者与被观察者建立联系。
它三者的关系就好比一个Button的点击事件:
观察者:OnClickListener;
被观察者:Button;
订阅(或注册):setOnClickListener();
而将其对应到RxJava的对象为:
观察者:Observer;
被观察者:Observable;
订阅(活注册):subscribe();
创建方法,以简单打印字符串为例展开:
方法一:Create
示例代码如下:
- public void create_one(View view){
- //创建观察者
- Observer<String> observer = new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) { //最先回调,没有执行onNext、onComplete、onError也会回调
- // d.dispose();//移除订阅关系,执行该方法后,下面的onNext、onError、onComplete都不会执行。
- boolean disposed = d.isDisposed();//判断是否取消了订阅关系,为真就是没有订阅,假就是订阅中
- Log.d(TAG, "onSubscribe:" + d.toString()+";disposed值为:"+disposed);
- }
-
- @Override
- public void onNext(String s) {//被观察者调用onNext时,这里就会回调
- Log.d(TAG, "onNext:" + s);
- }
-
- @Override
- public void onError(Throwable e) {//发送错误时调用
- Log.d(TAG, "onError:" + e.getMessage());
- }
-
- @Override
- public void onComplete() {//数据接收完成时调用
- Log.d(TAG, "onComplete:");
- }
- };
-
- //创建被观察者
- Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- //e只有三个方法onNext、onError、onComplete
- e.onNext("Hello");//发送数据
- // e.onError(new Exception("error"));//发送出错
- e.onComplete();//发送完成。这个方法与OnError只能执行一个,谁在前就执行谁。手动调用这个方法,Observer的onComplete才会执行,onError同理。
- }
- });
-
- //订阅,管着观察者与被观察者
- observable.subscribe(observer);
- }
打印日志输出如下:
12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onSubscribe:null;disposed值为:false
12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onNext:Hello
12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onComplete:
方法二:Create
示例代码如下:
- public void create_two(View view){
- //创建被观察者,同方法一
- Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- //e只有三个方法onNext、onError、onComplete
- e.onNext("Hello 2");//发送数据
- e.onComplete();//发送完成。这个方法与OnError只能执行一个,谁在前就执行谁。手动调用这个方法,observable添加的Action才会执行。
- //e.onError(new Exception("error"));//发送出错,手动调用这个方法后,observable添加的Consumer<Throwable>才会执行。
- }
- });
- observable.subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "accept:s值为:" + s);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG, "accept:throwable值为:" + throwable.getMessage());
- }
- }, new Action() {
- @Override
- public void run() throws Exception {
- Log.e(TAG, "run工作");
- }
- }, new Consumer<Disposable>() {
- @Override
- public void accept(Disposable disposable) throws Exception {
- Log.e(TAG,"disposable值为:"+disposable.isDisposed());
- // disposable.dispose();//移除订阅关系,执行该方法后,上面的Consumer<String>、Consumer<Throwable>、Action都不会执行。
- }
- });
- }
12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: disposable值为:false
12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为:Hello 2
12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: run工作
方法三:just
示例代码如下:
- /**
- * 方法三:just
- */
- public void just(View view){
- //生成被观察者
- Observable<String> observable = Observable.just("just1", "just2");
- //定义观察者,包含订阅
- observable.subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {//这个accept就等于观察者的onNext
- Log.e(TAG, "accept:s值为" + s);
- }
- });
- }
执行后打印日志如下:
方法四:fromArray
示例代码如下:
- public void fromArray(View view){
- //生成被观察者
- Observable<String> observable = Observable.fromArray("from1", "from2", "from3");
- //定义观察者,包含订阅
- observable.subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "accept:s值为" + s);
- }
- });
- }
打印日志如下:
12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from1
12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from2
12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from3
方法五:fromCallable
示例代码如下:
- public void fromCallable(View view){
- //生成被观察者
- Observable<String> observable = Observable.fromCallable(new Callable<String>() {
- @Override
- public String call() throws Exception {
- return "fromCallable";
- }
- });
- //定义观察者,包含订阅
- observable.subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "accept:s值为" + s);
- }
- });
- }
12-26 11:15:21.169 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为fromCallable
方法列表如下:
三、调度器Scheduler与线程控制
调度器种类:
常用的是Schedulers.io()进行耗时操作、AndroidSchedulers.mainThread()更新ui.
1、Schedulers.immediate();
直接在当前线程运行,相当于不指定线程,默认的Scheduler.
2、Schedulers.newThread();
启动新现成,在新的线程中执行操作。
3、Schedulers.io();
I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler,行为模式和newThread()差不多,区别在于io()的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下,io()比newThread()更有效率。不要把计算工作放在io(),可以避免创建不必要的线程。
4、Schedulers.computation();
计算所使用的Scheduler。这个计算是指Cpu密集型计算,即不会被I/O等操作限制性的操作,例如图形的计算。这个Sheduler使用的是固定的线程池,大小为Cpu核数。不要把I/O放在computation中,否则I/O操作等待时间会浪费cpu。用于计算任务,如事件循环和回调处理,不要用于IO操作,默认线程数等于处理器的数量。
5、Schedulsers.from(executor)
使用指定的Executor作为调度器。
6. Schedulers.trampoline()
当其它排队的任务完成后,在当前线程排队开始执行
7. AndroidSchedulers.mainThread()
在RxAndroid中,他指定操作将在Android主线程中执行。
指定线程:
1、observerOn(Schedulers)
指定观察者Observer在哪个线程执行
2、subscribeOn(Scheduler)
指定被观察者Observable在哪个线程执行
线程多次随意切换:
observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。 subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。
示例代码如下:
- public void schedulers(View view){
- Observable.just(1,2,2)//创建被观察者
- .subscribeOn(Schedulers.io())//指定被观察者运行在io线程
- .observeOn(AndroidSchedulers.mainThread())//指定下面的观察者运行在主线程中
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- Log.e(TAG,"integer的值为:"+integer.intValue());
- }
- });
- }
12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:1
12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:2
12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:2
除了将这些调度器传递给RxJava的Observable操作符,你也可以用它们调度你自己的任务。例如:Scheduler.Worker
连接地址:http://wiki.jikexueyuan.com/project/rx-docs/Scheduler.html
四、操作符,比较复杂也非常强大
操作符理解为可以控制流程的方法。
1、操作符的分类
2、变换操作符
变换操作符是用来变换类型的。
种类如下:
map操作符,后面的Function有两个泛型参数,第一个是输入类型,第二个是转换后输出返回的类型,例如下面的示例程序,apply()方法要返回的是第二个参数的类型。
- class User{
- private String name;
- private String password;
-
- public User(String name, String password) {
- this.name = name;
- this.password = password;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
- }
-
- //模拟网络登录
- private boolean login(User user){
- if(user.getName().equals("liming")&&user.getPassword().equals("123")){
- return true;
- }
- return false;
- }
- /**
- * 操作符map
- */
- public void operator_map(View view){
- User user=new User("liming","123");
- Observable.just(user)
- .map(new Function<User, Boolean>() {//操作符map将User转换为需要的结果boolean
- @Override
- public Boolean apply(User user) throws Exception {
- return login(user);//进行网络登录
- }
- }).subscribeOn(Schedulers.io())//网络登录需要在io中操作
- .observeOn(AndroidSchedulers.mainThread())//更新ui,需要在主线程中操作
- .subscribe(new Consumer<Boolean>() {
- @Override
- public void accept(Boolean isLogin) throws Exception {
- Log.e(TAG,"登录是否成功:"+isLogin);
- }
- });
- }
打印日志输出如下:
12-26 15:17:11.509 14096-14096/demo.face.comi.io.rxjavademo E/MainActivity: 登录是否成功:true
比Map要强大,可以做到map不能操作的事,可以递级处理数据,然后传递数据,例如下面的例子,先用户登录,然后获取用户特征值,示例代码如下:(注意所使用User类与login方法同上)
- /**
- * 模拟根据用户是否登录成功,返回用户的轮廓
- */
- private UserProfile profile(boolean b){
- if(b){
- return new UserProfile("漂亮");
- }else{
- return new UserProfile("好丑");
- }
- }
-
- /**
- * 操作符-flatmap
- * @param view
- */
- public void operator_flatmap(View view){
- User user=new User("xiaobao","123456");
- Observable.just(user).flatMap(new Function<User, ObservableSource<Boolean>>() {
- @Override
- public ObservableSource<Boolean> apply(User user) throws Exception {
- return Observable.just(login(user));//用户登录
- }
- }).flatMap(new Function<Boolean, ObservableSource<UserProfile>>() {
- @Override
- public ObservableSource<UserProfile> apply(Boolean aBoolean) throws Exception {
- return Observable.just(profile(aBoolean));//获取用户特征
- }
- }).subscribeOn(Schedulers.io())//控制被监控对象在io线程中
- .observeOn(AndroidSchedulers.mainThread())//监控在主线程中
- .subscribe(new Consumer<UserProfile>() {
- @Override
- public void accept(UserProfile userProfile) throws Exception {//返回用户特征
- //相当于onNext
- Log.e(TAG, "userProfile值为:" + userProfile.profile);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- //onError
- }
- }, new Action() {
- @Override
- public void run() throws Exception {
- //onComplete
- }
- });
- }
打印日志如下:
12-26 15:57:28.089 19512-19512/demo.face.comi.io.rxjavademo E/MainActivity: userProfile值为:好丑
buffer操作符:
缓存发射的数据为一个list,到观察者的时候参数就是一个list。
示例代码如下:
- public void operator_buffer(View view){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- for(int i=0;i<10;i++){
- e.onNext(i+"");
- }
- e.onComplete();
- }
- }).buffer(500, TimeUnit.MILLISECONDS)//缓冲500毫秒内发射的数据
- .subscribe(new Consumer<List<String>>() {
- @Override
- public void accept(List<String> strings) throws Exception {
- //这里strings为list,就是缓冲了500毫秒内发射的String
- Log.e(TAG,"accept的数据大小为:"+strings.size()+";数据为:"+strings.toString());
- }
- });
- }
打印日志如下:
3、过滤操作符
debounce操作符:
被观察者连续发射的数据的时间间隔 如果在指定时间 就被过滤拦截。
示例如下:
- public void operator_debounce(View view){
- Observable.create(new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(ObservableEmitter<Integer> e) throws Exception {
- if(e.isDisposed()) return;
- try{
- for(int i=1;i<10;i++){//发生数据间隔时间分别为100、200。。。。。1000毫秒
- e.onNext(i);//发射数据
- Thread.sleep(100*i);
- }
- }catch (Exception error){
- e.onError(error);
- }
- }
- }).subscribeOn(Schedulers.computation())
- .debounce(500,TimeUnit.MILLISECONDS)//如果发射数据间隔小于500就被过滤拦截掉
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- Log.e(TAG, "accept的数据为:" + integer);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG,"出错:"+throwable.toString());
- }
- });
- }
打印日志如下:
filter操作符:
过滤数据,返回真即使满足条件,不拦截,否者拦截,观察者接收不到。
示例代码如下:
- public void operator_filter(View view){
- Observable.create(new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(ObservableEmitter<Integer> e) throws Exception {
- if(e.isDisposed()) return;
- try{
- for(int i=1;i<10;i++){//发生数据间隔时间分别为100、200。。。。。1000毫秒
- e.onNext(i);//发射数据
- Thread.sleep(100*i);
- }
- e.onComplete();
- }catch (Exception error){
- e.onError(error);
- }
- }
- }).subscribeOn(Schedulers.computation())
- .debounce(500,TimeUnit.MILLISECONDS)
- .filter(new Predicate<Integer>() {//在debounce的基础上加过滤,必须大于6
- @Override
- public boolean test(Integer integer) throws Exception {
- return integer>6;
- }
- })
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- Log.e(TAG, "accept的数据为:" + integer);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG,"出错:"+throwable.toString());
- }
- });
- }
打印日志如下:
12-26 16:45:25.356 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:7
12-26 16:45:26.056 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:8
12-26 16:45:26.856 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:9
take操作符
示例代码如下:
- public void operator_take(View view){
- Observable.just(1,2,3,4,5,6,7)
- .take(3)//发射前3个数据
- .subscribe(new Observer<Integer>() {
- @Override
- public void onSubscribe(Disposable d) {
-
- }
-
- @Override
- public void onNext(Integer integer){
- Log.e(TAG,"onNext的数据为:"+integer);
- }
-
- @Override
- public void onError(Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });
- }
12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:1
12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:2
12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:3
过滤操作符实际使用案例:防止点击按钮连续点击、搜索引擎过滤,避免搜索字段的变化连续请求网络。以下为按钮防连续点击为例,这个功能需要集成RxBinding库。
示例代码如下:
- RxView.clicks(btn_operator_filterLivingExample)
- .throttleFirst(2,TimeUnit.SECONDS)//2秒内的点击只拿第一个,他的都过滤调
- .subscribe(new Consumer<Object>() {
- @Override
- public void accept(Object o) throws Exception {
- Log.e(TAG,"被点击了。。。");
- }
- });
4、组合操作符
startWith操作符
举例如下:在abc之前插入一个d,所以d是最先发出去的数据。
- public void operator_startWith(View view){
- Observable.just("a","b","c")
- .startWith("d")
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG,"accept:"+s);
- }
- });
- }
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:d
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:a
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:b
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:c
merge操作符:
合并观察者
示例代码如下:
- public void operator_merge(View view){
- Observable<String> o1 = Observable.just("a","b","c");
- Observable<String> o2 = Observable.just("d","e","f");
- Observable.merge(o1,o2).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "accept: "+s);
- }
- });
- }
打印日志如下:
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: a
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: b
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: c
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: d
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: e
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: f
combineLatest操作符
当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
示例代码如下:
- public void operator_combineLatest(View view){
- Observable<String> o1 = Observable.just("a","b","c");
- Observable<String> o2 = Observable.just("d","e","f");
-
- //第一个String就是o1 最后的数据,第二个String是o2的每个数据源,第三个String是结合之后返回的类型
- Observable.combineLatest(o1, o2, new BiFunction<String, String, String>() {
- @Override
- public String apply(String s, String s2) throws Exception {
- //s就是o1 最后的数据,s2 就是o2的每个数据源
- return s+s2;
- }
- }).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "accept: "+s);
- }
- });
- }
打印日志如下:
组合操作符实例:
利用combineLatest实现注册的时候所有输入信息(姓名、邮箱、年龄等)合法后才点亮注册按钮,示例代码如下:
- public void operator_combineDemo(View view){
- //skip过滤操作符,这里是过滤掉第一个字符的意思,也就是第一个字符不算数。
- Observable<CharSequence> observableName = RxTextView.textChanges(etName).skip(1);
- Observable<CharSequence> observableEmail = RxTextView.textChanges(etEmail).skip(1);
- Observable<CharSequence> observableAge= RxTextView.textChanges(etAge).skip(1);
- Observable.combineLatest(observableName, observableEmail, observableAge, new Function3<CharSequence, CharSequence, CharSequence, Boolean>() {
- @Override
- public Boolean apply(CharSequence name, CharSequence email, CharSequence age) throws Exception {
- //当输入框name长度大于3,email长度大于5,age长度大于0,这些长度都是在过滤掉第一个字符的前提下。只有这些条件都满足下才返回true
- return name.length()>3&&email.length()>5&&age.length()>0;
- }
- }).subscribe(new Consumer<Boolean>() {
- @Override
- public void accept(Boolean aBoolean) throws Exception {
- if(aBoolean){//返回true才能让注册按钮可以被点击
- Log.e(TAG,"验证通过,按钮可以点击了");
- }
- }
- });
- }
onErrorReturn操作符
让Observable遇到错误时发射一个特殊的项并且正常终止,onErrorRetrun能够捕获在它之前发生的异常,它之后流中的操作发生的异常就它就不会管了。
- public void operator_onErrorReturn(View view){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- for(int i=0;i<=4;i++){
- if(i==2){
- e.onError(new Throwable("出现错误了"));
- }else{
- e.onNext(i+"");
- }
- try{
- Thread.sleep(1000);
- }catch (Exception ex){
- ex.printStackTrace();
- }
- }
- e.onComplete();
- }
- }).subscribeOn(Schedulers.newThread())
- .onErrorReturn(new Function<Throwable, String>() {
- @Override
- public String apply(Throwable throwable) throws Exception {
- Log.e(TAG, "在onErrorReturn处理了: "+throwable.toString() );
- return "10";
- }
- }).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "收到消息: " + s);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG, "结果错误: " + throwable.toString());
- }
- });
- }
12-27 13:51:53.634 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 13:51:54.634 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 13:51:55.635 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 在onErrorReturn处理了: java.lang.Throwable: 出现错误了
12-27 13:51:55.635 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 10
onErrorRsumeNext操作符
和onErrorNext不同的是,onErrorResumeNext是返回一个重新定义的Observable,onErrorNext返回的是发射的数据。
注意onErrorResumeNext拦截的错误是Throwable,不能拦截Exception。 不然它会将错误传递给观察者的onError方法。要拦截Exception请用onExceptionResumeNext。
示例代码如下:
- public void operator_onErrorResumeNext(View view){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- for(int i = 0; i<= 3 ;i++){
- if(i == 2){
- //这里是Throwable
- e.onError(new Throwable("出现错误了"));
- }else{
- e.onNext(i+"");
- }
- try{
- Thread.sleep(1000);
- }catch (Exception ex){
- ex.printStackTrace();
- }
- }
-
- e.onComplete();
- }
- })
- .subscribeOn(Schedulers.newThread())
- .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {
- @Override
- public ObservableSource<? extends String> apply(Throwable throwable) throws Exception {
- //拦截到错误之后,重新定义了被观察者
- return Observable.just("重新定义了被观察者");
- }
- })
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "收到消息: " + s);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG, "结果错误: " + throwable.toString());
- }
- });
- }
12-27 13:58:32.695 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 13:58:33.695 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 13:58:34.697 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 重新定义了被观察者
onExceptionResumeNext操作符
onExceptionResumeNext 和 onErrorResumeNext基本一样,也是收到错误重新定义了新的被观察者。但是有一点不用: 如果onErrorResumeNext收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,onExceptionResumeNext则会继续拦截。
注意onExceptionResumeNext拦截的错误是Exception,不能拦截Throwable。 不然它会将错误传递给观察者的onError方法。要拦截Throwable请用onErrorResumeNext。
示例代码如下:
- public void operator_onExceptionResumeNext(View view){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- for(int i = 0; i<= 3 ;i++){
- if(i == 2){
- //注意这里是Exception
- e.onError(new Exception("出现错误了"));
- }else{
- e.onNext(i+"");
- }
- try{
- Thread.sleep(1000);
- }catch (Exception ex){
- ex.printStackTrace();
- }
- }
- e.onComplete();
- }
- })
- .subscribeOn(Schedulers.newThread())
- .onExceptionResumeNext(new Observable<String>() {
- @Override
- protected void subscribeActual(Observer<? super String> observer) {
- observer.onNext("错误替换的消息");
- observer.onComplete();
- }
- })
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "收到消息: " + s);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG, "结果错误: " + throwable.toString());
- }
- });
- }
输出结果如下:
12-27 14:10:10.213 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:10:11.213 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:10:12.215 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 错误替换的消息
retry操作符
重试的意思,拦截到错误,然后让 被观察者重新发射数据。Throwable和Exception都额可以拦截
它有五种参数方法:
- retry(): 让被观察者重新发射数据,要是一直错误就一直发送了
- retry(BiPredicate): interger是第几次重新发送,Throwable是错误的内容
- retry(long time): 最多让被观察者重新发射数据多少次
- retry(long time,Predicate predicate): 最多让被观察者重新发射数据多少次,在predicate里面进行判断拦截 返回是否继续
- retry(Predicate predicate): 在predicate里面进行判断拦截 返回是否继续
- public void operator_retry(View view){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- for(int i = 0; i<= 3 ;i++){
- if(i == 2){
- e.onError(new Exception("出现错误了"));
- }else{
- e.onNext(i+"");
- }
- try{
- Thread.sleep(1000);
- }catch (Exception ex){
- ex.printStackTrace();
- }
- }
- e.onComplete();
- }
- }).subscribeOn(Schedulers.newThread())
- .retry(new Predicate<Throwable>() {
- @Override
- public boolean test(Throwable throwable) throws Exception {
- Log.e(TAG, "retry错误: "+throwable.toString());
-
- //返回假就是不让重新发射数据了,调用观察者的onError就终止了。
- //返回真就是让被观察者重新发射请求
- return true;
- }
- })
- // .retry(new BiPredicate<Integer, Throwable>() {
- // @Override
- // public boolean test(Integer integer, Throwable throwable) throws Exception {
- // Log.e(TAG, "retry错误: "+integer+" "+throwable.toString());
- //
- // //返回假就是不让重新发射数据了,调用观察者的onError就终止了。
- // //返回真就是让被观察者重新发射请求
- // return true;
- // }
- // })
- // .retry(3)//最多让被观察者重新发射数据3次
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "收到消息: " + s);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG, "结果错误: " + throwable.toString());
- }
- });
- }
打印日志如下:
12-27 14:27:06.456 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:07.457 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:08.457 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了
12-27 14:27:08.459 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:09.460 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:10.460 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了
12-27 14:27:10.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:11.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:12.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了
......
......
一直这样循环下去
retryWhen操作符
retryWhen和retry类似,区别是:retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。
这里如果里面的throwableObservable不进行处理,那么onNext也会拦截处理,这里有个坑。
示例代码如下
- public void operator_retryWhen(View view){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- for (int i = 0; i <= 3; i++) {
- if (i == 2) {
- e.onError(new Exception("出现错误了"));
- } else {
- e.onNext(i + "");
- }
- try {
- Thread.sleep(1000);
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- e.onComplete();
- }
- })
- .subscribeOn(Schedulers.newThread())
- .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
- @Override
- public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
-
- //这里可以发送新的被观察者 Observable
- return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
- @Override
- public ObservableSource<?> apply(Throwable throwable) throws Exception {
-
- //如果发射的onError就终止
- return Observable.error(new Throwable("retryWhen终止啦"));
- }
- });
-
- }
- })
-
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "收到消息: " + s);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG, "结果错误: " + throwable.toString());
- }
- });
- }
6、辅助操作符
delay操作符
整体延迟一段指定的时间再发射来自Observable的发射物。就是延迟。
它有6种方法参数:
- public void operator_delay(View view){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- for(int i=0; i<=3 ;i++){
- e.onNext(i+"");
- }
- try{
- Thread.sleep(1000);
- }catch (Exception ex){
- ex.printStackTrace();
- }
- e.onComplete();
- }
- })
- .delay(3000, TimeUnit.MILLISECONDS)
- //delayError参数如果为假就直接抛出onError,为真就如常延迟执行
- // .delay(3000,TimeUnit.MILLISECONDS,true)
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "收到消息: " + s);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG, "收到错误: " + throwable.toString());
- }
- });
- }
delaySubscription操作符
该操作符跟delay的差别就是:delaySubscription只做一件事,延迟订阅。
do操作符
do操作符有很多个,就相当于生命周期。例如doOnNext在onNext的时候回调。
例如下面的例子:
- public void operator_do(View view){
- Observable.just("1","2")
- .doOnNext(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG,"doOnNext:"+s);
- }
- })
- .doAfterNext(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG,"doAfterNext:"+s);
- }
- })
- .doOnComplete(new Action() {
- @Override
- public void run() throws Exception {
- Log.e(TAG,"doOnComplete:");
- }
- })
- .doOnSubscribe(new Consumer<Disposable>() {//订阅之后回调的方法
- @Override
- public void accept(Disposable disposable) throws Exception {
- Log.e(TAG,"doOnSubscribe");
- }
- })
- .doAfterTerminate(new Action() {
- @Override
- public void run() throws Exception {
- Log.e(TAG,"doAfterTerminate");
- }
- })
- .doFinally(new Action() {
- @Override
- public void run() throws Exception {
- Log.e(TAG,"doFinally:");
- }
- })
- //Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted
- .doOnEach(new Consumer<Notification<String>>() {
- @Override
- public void accept(Notification<String> stringNotification) throws Exception {
- Log.e(TAG, "doOnEach: "+(stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError"));
- }
- })
- //订阅后可以进行取消订阅
- .doOnLifecycle(new Consumer<Disposable>() {
- @Override
- public void accept(Disposable disposable) throws Exception {
- Log.e(TAG, "doOnLifecycle: "+disposable.isDisposed());
- //disposable.dispose();
- }
- }, new Action() {
- @Override
- public void run() throws Exception {
- Log.e(TAG, "doOnLifecycle run: ");
- }
- })
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "收到消息: " + s);
- }
- });
- }
执行结果如下:
materialize把 被观察者Observable转换为Notification通知对象。dematerialize相反了。 注意用了materialize之后,onNext会回调多了一个数据,因为onComplete也回调到这里了。
示例代码如下
- public void operator_materialize(View view){
- Observable.just("1","2")
- .materialize()
- .subscribe(new Consumer<Notification<String>>() {
- @Override
- public void accept(@NonNull Notification<String> stringNotification) throws Exception {
- //这时候的数据就是一个Notification对象了
- Log.e(TAG, (stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError")+": "+stringNotification.getValue());
- }
- });
- }
日志输出如下:
12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onNext: 1
12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onNext: 2
12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onComplete: null
TimeInterval操作符
获取数据发送的时间间隔,就是把数据转换为数据发送的间隔Timed。
有4个参数方法:
- public void operator_timeinterval(View view){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- for(int i=0; i<3; i++){
- e.onNext(i+"");
- Thread.sleep(1000);
- }
- e.onComplete();
- }
- })
- .subscribeOn(Schedulers.newThread())
- .timeInterval()
- .subscribe(new Consumer<Timed<String>>() {
- @Override
- public void accept(@NonNull Timed<String> stringTimed) throws Exception {
- Log.e(TAG, "accept: "+stringTimed.time());
- }
- });
- }
日志输出如下:
- public void operator_timestamp(View view){
- Observable.just("a","b")
- .timestamp()
- .subscribe(new Consumer<Timed<String>>() {
- @Override
- public void accept(@NonNull Timed<String> stringTimed) throws Exception {
- //转换时间
- String date = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss", Locale.CHINA)
- .format(new Date(stringTimed.time()));
- Log.e(TAG, "accept: "+date);
- }
- });
- }
给定多个Observable,只让第一个发射数据的Observable发射全部数据。
ambWith和ambArray差不多,Observable.amb(o1,o2)和o1.ambWith(o2)是一样的效果。
有两个参数方法:
- amb(Iterable);
- ambArray();
- public void operator_amb(View view){
- Observable o1 = Observable.just("a","b","c").delay(1000, TimeUnit.MILLISECONDS);
- Observable o2 = Observable.just("d","e","f");
-
- Observable.ambArray(o1,o2).subscribe(new Consumer<String>() {
- @Override
- public void accept(@NonNull String o) throws Exception {
- Log.e(TAG, "accept: "+o);
- }
- });
- }
输出结果如下:
- public void operator_defaultIfEmpty(View view){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- e.onComplete();
- }
- }).defaultIfEmpty("默认数据")
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(@NonNull String s) throws Exception {
- Log.e(TAG, "accept: "+s);
- }
- });
- }
输入日志如下:
- public void operator_switchIfEmpty(View view){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- e.onComplete();
- }
- }).switchIfEmpty(Observable.just("a","b","c"))
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(@NonNull String s) throws Exception {
- Log.e(TAG, "accept: "+s);
- }
- });
- }
执行结果如下:
- //丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
- public void operator_skipUntil(View view) {
- //skipUntil里面的Observable发射了之后,原始的Observable每隔一秒循环发射的数据才开始被接收到
- Observable.interval(1, TimeUnit.SECONDS)
- .skipUntil(Observable.just("1")).delay(5,TimeUnit.SECONDS)
- .subscribe(new Consumer<Long>() {
- @Override
- public void accept(Long aLong) throws Exception {
- Log.e(TAG, "accept:" + aLong);
- }
- });
- }
- public void operator_skipWhile(View view){
- Observable.interval(1, TimeUnit.SECONDS)
- .observeOn(Schedulers.newThread())
- .skipWhile(new Predicate<Long>() {
- @Override
- public boolean test(@NonNull Long aLong) throws Exception {
- return aLong < 5;
- //返回假,原始的Observable发射的数据才可以接收到
- }
- })
- .subscribe(new Consumer<Long>() {
- @Override
- public void accept(@NonNull Long aLong) throws Exception {
- Log.e(TAG, "accept: "+aLong);
- }
- });
- }
输出日志如下:
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。