当前位置:   article > 正文

Rxjava_io.reactivex.rxjava2:rxjava

io.reactivex.rxjava2:rxjava
  1. 依赖
  2. 五种被观察者
  3. 订阅者
  4. 创建操作符
  5. 转换操作符
  6. 组合操作符
  7. 功能操作符
  8. 过滤操作符
  9. 条件操作符
  10. 自定义Observer
  11. 对上下游所处线程进行封装

1.依赖

implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1'//实现防抖功能
  • 1
  • 2
  • 3

2.五种被观察者
(1)Observer

    Observer observer = new Observer<Object>() {
        private Disposable disposable;
        
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            //事件订阅成功回调,返回Disposable请求实例,可以通过d.dispose()中断传递请求
            //最先被回调 一定执行在主线程里
            System.out.println("onSubscribe()");
            //可以获取到d
            disposable = d;
        }

        @Override
        public void onNext(@NonNull Object o) {
            响应事件的方法,接收事件数据
            System.out.println("onNext()" + o);
            //可以切断传递
            disposable.dispose();
        }

        @Override
        public void onError(@NonNull Throwable e) {
            //事件队列异常,不再接收新的事件
            System.out.println("onError()" + e);
        }

        @Override
        public void onComplete() {
            //事件队列完结,不再接收新的事件
            System.out.println("onComplete()");
        }
    };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

3.订阅者

4.创建操作符
(1) create()

在这里插入图片描述
(2)just()
简化Create操作,最多只能传入10个参数
在这里插入图片描述
(3)fromArray()
简化Create操作 可能传入无限个参数
在这里插入图片描述

(4)fromFuture()
与线程安全有关
在这里插入图片描述
(5)fromIterable()
可以传入继承了Iterable接口的集合
在这里插入图片描述
(6)range()
可以方便确定范围

在这里插入图片描述
5.转换操作符
(1)map()
可以将被观察者发送的数据类型转变为其他类型
在这里插入图片描述

(2)flatMap()
可以将事件序列中的元素进行加工,一一返回新的被观察者(s)
在这里插入图片描述

可以处理嵌套for循环、嵌套网络请求的情况
  • 1

(3)concatMap()
在flatMap的基础上保证转发出来的事件是有序的

(4)buffer()
从需要发射的事件序列中放进缓存区,达一定数量时一起发送

在这里插入图片描述

6.组合操作符
(1)concat()
将多个被观察者所需要发送的事件组合在一个被观察者中,一起发送(即调用一个被观察者,多次调用onNext()方法)最多发送4个

在这里插入图片描述
(2)concatArray()
在concat()的基础上可以传无限个被观察者

(3)merge()
在concat()的基础上,实现并行操作

7.功能操作符
(1)subscribeOn()
确定上游事件所处线程
(2)observeOn()
确定下游事件所处线程
(3)参数

  • Schedulers.newThread() — 总是启用新线程,并在新线程中执行操作;多用于耗时操作
  • Schedulers.io() — 通常用于网络、读写文件等IO密集型的操作,行为模式和new Thread()差不多,只是IO的内部是一个无上限的线程池,可重用空闲的线程,更高效
  • AndroidSchedulers.mainThread() — Android的主线程;用于更新UI
  • Schedulers.computation() — 代表CPU计算密集型的操作

在这里插入图片描述

8.过滤操作符
(1)filter()
满足条件true则通过;不满足条件false过滤掉
在这里插入图片描述

9.条件操作符

10.自定义Observer

  1. 继承Observer
  2. 确定泛型类型
  3. 重写4个主要方法
  4. 使用模板设计模式
abstract class APIResponse<T>(val context : Context)
    : Observer<LoginResponseWrapper<T>>{

    private var isShow = true

    //次构造函数 ->  主动控制Dialog显示
    constructor(context: Context,isShow : Boolean) : this(context){
        this.isShow = isShow
    }

    //模板设计模式
    abstract fun success(data : T ?)
    abstract fun failure(errorMsg: String ? )


    //订阅---最先调用
    override fun onSubscribe(d: Disposable) {
        // 弹出 加载框
        if (isShow) {
            LoadingDialog.show(context)
        }
    }

    //向下游---上游流下了的数据
    override fun onNext(t: LoginResponseWrapper<T>) {

        if (t.data == null) {
            // 失败
            failure("登录失败了,请检查原因:msg:${t.errorMsg}")
        } else {
            // 成功
            success(t.data)
        }
    }

    //错误---上游流下了的错误
    override fun onError(e: Throwable) {
        // 取消加载
        LoadingDialog.cancel()

        failure(e.message)
    }

    //完成---传递数据完成后
    override fun onComplete() {
        // 取消加载
        LoadingDialog.cancel()
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

在这里插入图片描述11.对上下游所处线程进行封装

    private static <UD> ObservableTransformer<UD, UD> rxud() {
        return new ObservableTransformer<UD, UD>() {
            @Override
            public ObservableSource<UD> apply(Observable<UD> upstream) {
                return  upstream.subscribeOn(Schedulers.io())     // 给上面代码分配异步线程
                        .observeOn(AndroidSchedulers.mainThread()) // 给下面代码分配主线程;
                        //hook
                        .map(new Function<UD, UD>() {
                            @Override
                            public UD apply(UD ud) throws Exception {
                                Log.d(TAG, "apply: 我监听到你了,居然再执行");
                                return ud;
                            }
                        });
            }
        };
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/300813
推荐阅读
相关标签
  

闽ICP备14008679号