赞
踩
人性的背后是白云苍狗,愿我们都能做生活的高手
目录
3.2.1由 Observable 通过 create 操作符来创建
Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。
ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。
ReactiveX不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。
①使用观察者模式:
Rx可以方便的创建事件流和数据流
使用查询式的操作符组合和变换数据流
可以订阅任何可观察的数据流并执行操作
②简化代码:
对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题
③使用Observable
Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操作符,它让你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。
Observable通过使用最佳的方式访问异步数据序列填补了这个间隙
Rx的Observable模型让你可以像使用集合数据一样操作异步事件流,对异步事件流使用各种简单、可组合的操作。
④响应式编程
Rx提供了一系列的操作符,可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和复合变得非常高效。
可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好。使用Observable,在数据准备好时,生产者将数据推送给消费者。数据可以同步或异步的到达,这种方式更灵活。
- implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
- implementation 'io.reactivex.rxjava2:rxjava:2.0.7'
- //创建Observable
- Observable<String> observable =Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- //发送事件
- e.onNext("袁震");
- //事件发送完成
- e.onComplete();
- }
- });
接收一个集合作为输入,然后每次输出一个元素给subscriber。
- List<String> list =new ArrayList<>();
- list.add("袁震1");
- list.add("袁震2");
- list.add("袁震3");
- list.add("袁震4");
- list.add("袁震5");
- list.add("袁震6");
- Observable.fromArray(list)
- .subscribe(new Observer<List<String>>() {
- @Override
- public void onSubscribe(Disposable d) {
- // 开始分发
- }
- @Override
- public void onNext(List<String> strings) {
- // 拿到事件
- }
- @Override
- public void onError(Throwable e) {
- // 错误事件
- }
- @Override
- public void onComplete() {
- // 完成事件
- }
- });
- Observable.just(list)
- .subscribe(new Observer<List<String>>() {
- @Override
- public void onSubscribe(Disposable d) {
-
- }
- @Override
- public void onNext(List<String> strings) {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
创建Observer消费
- Observer<List<String>> observer =new Observer<List<String>>() {
- @Override
- public void onSubscribe(Disposable d) {
-
- }
- @Override
- public void onNext(List<String> strings) {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- };
- //事件订阅
- Observable.just(list)
- .subscribe(observer);
在 RxJava 中,提供了一个名为 Scheduler 的线程调度器,RxJava 内部提供了4个调度器,分别是:
1. Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程;
2. Schedulers.newThread(): 开启新线程操作;
3. Schedulers.immediate(): 默认指定的线程,也就是当前线程;
4. Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU;
5. AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;
我们可以通过 subscribeOn() 和 observeOn() 这两个方法来进行线程调度
举一个简单的加载图片的案例:
- public class MainActivity extends AppCompatActivity {
- private final static String IMGPATH = "https://image.baidu.com/search/detail?ct=503316480&z=0&ipn=d&word=%E9%BE%99%E7%8F%A0%E5%9B%BE%E7%89%87&step_word=&hs=0&pn=14&spn=0&di=7264239678495129601&pi=0&rn=1&tn=baiduimagedetail&is=0%2C0&istype=0&ie=utf-8&oe=utf-8&in=&cl=2&lm=-1&st=undefined&cs=142232831%2C114825227&os=3731017878%2C1223082555&simid=142232831%2C114825227&adpicid=0&lpn=0&ln=1644&fr=&fmq=1706601641304_R&fm=&ic=undefined&s=undefined&hd=undefined&latest=undefined©right=undefined&se=&sme=&tab=0&width=undefined&height=undefined&face=undefined&ist=&jit=&cg=&bdtype=0&oriquery=&objurl=https%3A%2F%2Fimg9.51tietu.net%2Fpic%2F20190919%2Fjekrbvfuaasjekrbvfuaas.jpg&fromurl=ippr_z2C%24qAzdH3FAzdH3Fooo_z%26e3Bc8ptjp7_z%26e3BgjpAzdH3FrAzdH3F8dn9a890_z%26e3Bip4s&gsm=1e&rpstart=0&rpnum=0&islist=&querylist=&nojc=undefined&dyTabStr=MCwzLDEsMiw2LDUsNCw4LDcsOQ%3D%3D&lid=10588899187206391380";
- private ImageView img;
-
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_main);
- img = findViewById(R.id.img);
- //创建Observable
- Observable.just(IMGPATH)//发送图片地址
- .map(new Function<String, Bitmap>() {
- @Override
- public Bitmap apply(String s) throws Exception {
- URL url = new URL(IMGPATH);
- HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
- httpURLConnection.setConnectTimeout(5000);
- int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
- if (responseCode == HttpURLConnection.HTTP_OK) {
- InputStream inputStream = httpURLConnection.getInputStream();
- Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
- return bitmap;
- }
- return null;
- }
- })
- .subscribeOn(Schedulers.io())//上面是异步
- .observeOn(AndroidSchedulers.mainThread())//下面是主线程
- .subscribe(new Observer<Bitmap>() {
- @Override
- public void onSubscribe(Disposable d) {
-
- }
-
- @Override
- public void onNext(Bitmap bitmap) {
- img.setImageBitmap(bitmap);
- }
-
- @Override
- public void onError(Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });
-
- }
- }
上面的案例中使用到了map,map的作用就是将发送的事件或事件序列,加工后转换成不同的事件或事件序列
上述案例中Observable 发送了一个 String 事件,也就是产生一个url,通过 map 操作符进行变换,返回Drawable对象,这个变换指的就是通过url进行网络图片请求,返回一个Bitmap
如果我要在转换为birmap之后,再加一个日志,可以写为:
- public class MainActivity extends AppCompatActivity {
- private final static String IMGPATH = "https://image.baidu.com/search/detail?ct=503316480&z=0&ipn=d&word=%E9%BE%99%E7%8F%A0%E5%9B%BE%E7%89%87&step_word=&hs=0&pn=14&spn=0&di=7264239678495129601&pi=0&rn=1&tn=baiduimagedetail&is=0%2C0&istype=0&ie=utf-8&oe=utf-8&in=&cl=2&lm=-1&st=undefined&cs=142232831%2C114825227&os=3731017878%2C1223082555&simid=142232831%2C114825227&adpicid=0&lpn=0&ln=1644&fr=&fmq=1706601641304_R&fm=&ic=undefined&s=undefined&hd=undefined&latest=undefined©right=undefined&se=&sme=&tab=0&width=undefined&height=undefined&face=undefined&ist=&jit=&cg=&bdtype=0&oriquery=&objurl=https%3A%2F%2Fimg9.51tietu.net%2Fpic%2F20190919%2Fjekrbvfuaasjekrbvfuaas.jpg&fromurl=ippr_z2C%24qAzdH3FAzdH3Fooo_z%26e3Bc8ptjp7_z%26e3BgjpAzdH3FrAzdH3F8dn9a890_z%26e3Bip4s&gsm=1e&rpstart=0&rpnum=0&islist=&querylist=&nojc=undefined&dyTabStr=MCwzLDEsMiw2LDUsNCw4LDcsOQ%3D%3D&lid=10588899187206391380";
- private ImageView img;
-
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_main);
- img = findViewById(R.id.img);
- //创建Observable
- Observable.just(IMGPATH)//发送图片地址
- .map(new Function<String, Bitmap>() {
- @Override
- public Bitmap apply(String s) throws Exception {
- URL url = new URL(IMGPATH);
- HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
- httpURLConnection.setConnectTimeout(5000);
- int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
- if (responseCode == HttpURLConnection.HTTP_OK) {
- InputStream inputStream = httpURLConnection.getInputStream();
- Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
- return bitmap;
- }
- return null;
- }
- })
- .map(new Function<Bitmap, Bitmap>() {
- @Override
- public Bitmap apply(Bitmap bitmap) throws Exception {
- System.out.println("下载图片");
- return bitmap;
- }
- })
- .subscribeOn(Schedulers.io())//上面是异步
- .observeOn(AndroidSchedulers.mainThread())//下面是主线程
- .subscribe(new Observer<Bitmap>() {
- @Override
- public void onSubscribe(Disposable d) {
-
- }
-
- @Override
- public void onNext(Bitmap bitmap) {
- img.setImageBitmap(bitmap);
- }
-
- @Override
- public void onError(Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });
-
- }
map 操作符,是一对一的变换,并且返回的是变换后的对象。而 flatMap 操作符可以适应一对多,并且返回的是一个 Observable
举个例子:
- public class Address {
-
- private String name;
-
- public Address(String name) {
- this.name = name;
- }
- }
- public class YuanZhen {
-
- private String name;
-
- private int age;
-
- public Address address;
-
- public YuanZhen(String name, int age, Address address) {
- this.name = name;
- this.age = age;
- this.address = address;
- }
-
- public Address getAddress() {
- return address;
- }
- }
- List<YuanZhen> list =new ArrayList<>();
- list.add(new YuanZhen("袁震1",18,new Address("家里")));
- list.add(new YuanZhen("袁震2",19,new Address("家里")));
- list.add(new YuanZhen("袁震3",20,new Address("家里")));
- Observable.just(list)
- .flatMap(new Function<List<YuanZhen>, ObservableSource<Address>>() {
- @Override
- public ObservableSource<Address> apply(List<YuanZhen> yuanZhen) throws Exception {
- return Observable.fromArray(yuanZhen.get(0).getAddress());
- }
- }).subscribe(new Observer<Address>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onNext(Address address) {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
通过上面的代码可以看出,map 与 flatMap 这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:
flatMap 返回的是一个Observable对象,而 map 返回的是一个普通转换后的对象;
flatMap 返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件;而 map 变换后返回的对象直接发到Subscriber回调中;
flatMap 变换后产生的每一个Observable对象发送的事件,最后都汇入同一个Observable,进而发送给Subscriber回调;
map返回类型 与 flatMap 返回的Observable事件类型,可以与原来的事件类型一样;
可以对一个Observable多次使用 map 和 flatMap;
debounce:N个事件发生的时间间隔太近,就过滤掉前N-1个事件,保留最后一个事件
- Observable.just(list)
- .debounce(1000, TimeUnit.MILLISECONDS)//1s内的事件被丢弃
throttleFirst:与debounce类似,也是时间间隔太短,就丢弃事件
添加依赖:
implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1' // 操作功能防抖
- RxView.clicks(img)
- .throttleFirst(2000, TimeUnit.MILLISECONDS) // 2秒钟之内 响应一次
- .subscribe(new Observer<Object>() {
- @Override
- public void onSubscribe(Disposable d) {
-
- }
- @Override
- public void onNext(Object o) {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
可以做定时操作,就是延迟执行。时间间隔由timer控制。
- Observable.timer(3,TimeUnit.MICROSECONDS)
- .subscribe(new Observer<Long>() {
- @Override
- public void onSubscribe(Disposable d) {
-
- }
- @Override
- public void onNext(Long aLong) {
- System.out.println("我是袁震");
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
3s后输出我是袁震
定时的周期性操作,与timer的区别就在于它可以重复操作。事件间隔由interval控制
- Observable.interval(3,TimeUnit.MICROSECONDS)
- .subscribe(new Observer<Long>() {
- @Override
- public void onSubscribe(Disposable d) {
-
- }
- @Override
- public void onNext(Long aLong) {
- System.out.println("我是袁震");
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
每隔3s输出一次我是袁震
在Activity#onDestroy()的时候或者不需要继续执行的时候应该取消订阅
- Observable<String> observable =Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- //发送事件
- e.onNext("袁震");
- //事件发送完成
- e.onComplete();
- }
- });
- observable.unsubscribeOn(AndroidSchedulers.mainThread());
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。