赞
踩
1.依赖
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1'//实现防抖功能
2.五种被观察者
(1)Observer
Observer observer = new Observer<Object>() { private Disposable disposable; @Override public void onSubscribe(@NonNull Disposable d) { //事件订阅成功回调,返回Disposable请求实例,可以通过d.dispose()中断传递请求 //最先被回调 一定执行在主线程里 System.out.println("onSubscribe()"); //可以获取到d disposable = d; } @Override public void onNext(@NonNull Object o) { 响应事件的方法,接收事件数据 System.out.println("onNext()" + o); //可以切断传递 disposable.dispose(); } @Override public void onError(@NonNull Throwable e) { //事件队列异常,不再接收新的事件 System.out.println("onError()" + e); } @Override public void onComplete() { //事件队列完结,不再接收新的事件 System.out.println("onComplete()"); } };
3.订阅者
4.创建操作符
(1) create()
(2)just()
简化Create操作,最多只能传入10个参数
(3)fromArray()
简化Create操作 可能传入无限个参数
(4)fromFuture()
与线程安全有关
(5)fromIterable()
可以传入继承了Iterable接口的集合
(6)range()
可以方便确定范围
5.转换操作符
(1)map()
可以将被观察者发送的数据类型转变为其他类型
(2)flatMap()
可以将事件序列中的元素进行加工,一一返回新的被观察者(s)
可以处理嵌套for循环、嵌套网络请求的情况
(3)concatMap()
在flatMap的基础上保证转发出来的事件是有序的
(4)buffer()
从需要发射的事件序列中放进缓存区,达一定数量时一起发送
6.组合操作符
(1)concat()
将多个被观察者所需要发送的事件组合在一个被观察者中,一起发送(即调用一个被观察者,多次调用onNext()方法)最多发送4个
(2)concatArray()
在concat()的基础上可以传无限个被观察者
(3)merge()
在concat()的基础上,实现并行操作
7.功能操作符
(1)subscribeOn()
确定上游事件所处线程
(2)observeOn()
确定下游事件所处线程
(3)参数
8.过滤操作符
(1)filter()
满足条件true则通过;不满足条件false过滤掉
9.条件操作符
10.自定义Observer
abstract class APIResponse<T>(val context : Context) : Observer<LoginResponseWrapper<T>>{ private var isShow = true //次构造函数 -> 主动控制Dialog显示 constructor(context: Context,isShow : Boolean) : this(context){ this.isShow = isShow } //模板设计模式 abstract fun success(data : T ?) abstract fun failure(errorMsg: String ? ) //订阅---最先调用 override fun onSubscribe(d: Disposable) { // 弹出 加载框 if (isShow) { LoadingDialog.show(context) } } //向下游---上游流下了的数据 override fun onNext(t: LoginResponseWrapper<T>) { if (t.data == null) { // 失败 failure("登录失败了,请检查原因:msg:${t.errorMsg}") } else { // 成功 success(t.data) } } //错误---上游流下了的错误 override fun onError(e: Throwable) { // 取消加载 LoadingDialog.cancel() failure(e.message) } //完成---传递数据完成后 override fun onComplete() { // 取消加载 LoadingDialog.cancel() } }
11.对上下游所处线程进行封装
private static <UD> ObservableTransformer<UD, UD> rxud() { return new ObservableTransformer<UD, UD>() { @Override public ObservableSource<UD> apply(Observable<UD> upstream) { return upstream.subscribeOn(Schedulers.io()) // 给上面代码分配异步线程 .observeOn(AndroidSchedulers.mainThread()) // 给下面代码分配主线程; //hook .map(new Function<UD, UD>() { @Override public UD apply(UD ud) throws Exception { Log.d(TAG, "apply: 我监听到你了,居然再执行"); return ud; } }); } }; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。