赞
踩
Rx是Reactive Extensions的缩写的简写,可以使用可观察数据流对编程接口进行异步编程,它结合了观察者模式,迭代器模式和函数式的精华。
Rxjava是一种异步数据处理库,也是一种观察者模式。最早是Netflix公司用于重构当前架构时减少REST调用的次数,参考了Microsoft公司的响应式编程,把Microsoft的Rx库迁移到Java JVM中,其中最有名的就是RxJava。
它的特点主要有以下:
RxAndroid
在开发项目的时候,开发者在使用Rxjava时会搭配RxAndroid,他是针对Rxjava在Android平台使用的一个响应式扩展组件。使用RxAndroid的Schedulers(调度器)可以解决Android主线程问题, 多线程等问题。
观察者模式的四大要素
观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到。
扩展的观察者模式
onNext()订阅了一个事件,当事件完成时会回调onComplete(),在完成过程中发生了异常会回调onError()。
依赖
//在Project的gradle下添加maven仓库
maven {
url "https://oss.jfrog.org/libs-snapshot" }
implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
Hello World
//1.创建被观察者 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("hello world"); emitter.onComplete(); } }); //2.创建观察者 Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("onSubscribe():"); } @Override public void onNext(@NonNull String s) { System.out.println("onNext():" + s); } @Override public void onError(@NonNull Throwable e) { System.out.println("onError():" + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete():"); } }; //3.订阅事件 observable.subscribe(observer);
**注意:**onError()和onComplete()只会回调一个。
Create
//链式写法 Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("hello world"); emitter.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe():"+d.toString()); } @Override public void onNext(String o) { System.out.println("onNext():" + o); } @Override public void onError(Throwable e) { System.out.println("onError():" + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete():"); } });
Just
使用将为你创建一个Observable并自动为你调用onNext( )发射数据,just中传递的参数将直接在Observer的onNext()方法中接收到。
Observable.just("hello world").subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
From
将一个Iterable, 一个Future, 或者一个数组转换成一个Observable,遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。
List<String> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { list.add("Hello" + i); } Observable.fromArray(list).subscribe(new Observer<List<String>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<String> strings) { System.out.println(strings); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
Defer
当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。 以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。
value = "2020/12/13"; Observable<String> observable = Observable.defer(new Supplier<ObservableSource<? extends String>>() { @Override public ObservableSource<? extends String> get() throws Throwable { return Observable.just(value); } }); value = "12345"; observable.subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
Empty/Never/Throw
Empty是创建一个不发射任何数据但是正常终止的Observable。 Never是创建一个不发射数据也不终止的Observable。 Throw是创建一个不发射数据以一个错误终止的Observable。 这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。
Observable.defer(new Supplier<ObservableSource<?>>() { @Override public ObservableSource<?> get() throws Throwable { return Observable.error(new Throwable("你写了个bug")); } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Object o) { } @Override public void onError(@NonNull Throwable e) { System.out.println(e.getMessage()); } @Override public void onComplete() { } });
Interval
创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定1秒一次调用onNext()方法。
//TrampolineScheduler不会立即执行,当其他排队任务结束时才执行,TrampolineScheduler运行在主线程。 Observable.interval(1000, TimeUnit.MILLISECONDS, Schedulers.trampoline()).subscribe(new Observer<Long>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Long aLong) { System.out.println(aLong); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
Repeat
创建一个Observable,该Observable的事件可以重复调用。
Observable.just(123).repeat(2).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
Start
返回一个Observable,它发射一个类似于函数声明的值。
Timer
创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。
Observable.timer(2000, TimeUnit.MILLISECONDS,Schedulers.trampoline()).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { System.out.println(aLong); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
Map
Map就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。
//Integer to String
Observable.just(123).map(new Function<Integer, String>() {
@Override
public String apply(Integer s) throws Exception {
return s.toString();
}
}
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。