当前位置:   article > 正文

【RxJava】使用_rxjava使用

rxjava使用

RxJava】使用

虽说Rxjava显然已经有些过时了,但是有些公司还在使用,为了能适应更多的业务代码,并提高自己的开发效率,所以这里仅做个Rxjava使用的总结,不涉及Rxjava内部的实现原理。

RxJava的核心就是异步数据流和响应式编程

我们平时开发过程中的网络请求、数据库读写、文件读写、定时任务等各种耗时操作,都可以使用RxJava来完成。

在平时的开发中,我们可以把所有的事件(数据)我们都可以看成是一条河流,它可以被观察,被过滤等操作,也可以将多条河流汇合成一条新河流。

引入RxJava

只需要引入如下两个依赖即可使用rxjava:

implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
  • 1
  • 2

RxJava的几个重要概念

  • 观察者Observer:观察事件变化并处理的主要角色,消费者也可以理解为一种特殊的观察者
  • 被观察者:触发事件并决定什么时候发送事件的主要角色
  • 订阅Subscribe:观察者和被观察者建立关联的操作(代码中的体现经常是被观察者去订阅观察者

Observable、Flowable、Single、Completable、Maybe都是被观察者,这几种被观察者可以通过 toObservable、toFlowable、toSingle、toCompletable、toMaybe 相互转化。

常用操作符及其使用

创建操作符

创建被观察者的各种操作符:create()、just() 等等。

create操作符演示:

private fun testRxjava() {
    Observable.create(object : ObservableOnSubscribe<String> {
        override fun subscribe(emitter: ObservableEmitter<String>) {
            // 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到
            emitter.onNext("发送一个事件1")
            emitter.onNext("发送一个事件2")
            // onComplete方法表示事件发送结束
            emitter.onComplete()
        }
    }).subscribe(object : Observer<String> {
        override fun onSubscribe(d: Disposable) {
            // 被观察者和观察者建立订阅之后回调的方法
            Log.i("testLog", "onSubscribe..")
        }

        override fun onNext(t: String) {
            Log.i("testLog", "onNext.. t = $t")
        }

        override fun onError(e: Throwable) {
            // 可以在subscribe方法中手动抛出异常,如emitter.onError(Throwable("test error!"))
            // 但是会导致 onComplete() 方法无法回调,也就是说onComplete和onError是互斥的
            Log.i("testLog", "onError.. e = ${e.message}")
        }

        override fun onComplete() {
            Log.i("testLog", "onComplete..")
        }

    })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

日志打印如下:

I/testLog: onSubscribe..
I/testLog: onNext.. t = 发送一个事件1
I/testLog: onNext.. t = 发送一个事件2
I/testLog: onComplete..
  • 1
  • 2
  • 3
  • 4

当然,我们也可以使用Consumer(消费者)充当观察者,Consumer只有一个方法即accept,比起Observer需要实现四个方法来说,显然更加简洁了,

而当我们想要处理异常时,只需要再多传入一个专门接收ThrowableConsumer即可。使用如下:

Observable.create(object : ObservableOnSubscribe<String> {
    override fun subscribe(emitter: ObservableEmitter<String>) {
        // 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到
        emitter.onNext("发送一个事件1")
        emitter.onNext("发送一个事件2")
        emitter.onError(Throwable("test error!"))
        // onComplete方法表示事件发送结束
        emitter.onComplete()
    }
}).subscribe(
    // 第一个Consumer,用来处理事件
    object : Consumer<String> {
        override fun accept(t: String) {
            Log.i("testLog", "accept.. t = $t")
        }
    },
    // 异常将会在这里的Consumer中处理
    object : Consumer<Throwable> {
        override fun accept(t: Throwable?) {
            Log.i("testLog", "accept onError.. e = ${t?.message}")
        }
    })

日志打印如下:
I/testLog: accept.. t = 发送一个事件1
I/testLog: accept.. t = 发送一个事件2
I/testLog: accept onError.. e = test error!
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

just操作符

通过just操作符,可以非常简单的完成create操作符实现的事情。

just 方法传参限制最多为10个,另外,just 内部实际上是调用了 fromArray 操作符方法,而fromArray方法是不限制传参数量的。

just操作符的使用如下所示:

private fun testRxjava() {
    val consumer = object : Consumer<Any> {
        override fun accept(t: Any?) {
            Log.i("testLog", "accept.. t = $t")
        }
    }
    Observable.just("a", 2, "3")
        .subscribe(consumer)
}

日志打印如下:
I/testLog: accept.. t = a
I/testLog: accept.. t = 2
I/testLog: accept.. t = 3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

转换操作符

map() / flatMap() / concatMap() 等。

  • map()操作符可以将被观察者发送的数据类型转变成其他的类型
  • flatMap()可以将事件序列中的元素进行整合加工,返回一个新的被观察者,在网络请求场景中比较常用。
  • concatMap()flatMap()基本一样,只不过concatMap()转化出来的事件是有序的,而flatMap()转化出来的事件是无序的

map()操作符的使用如下:

private fun testRxjava() {
    val consumer = object : Consumer<Any> {
        override fun accept(t: Any?) {
            Log.i("testLog", "accept.. t = $t")
        }
    }
    Observable.just("2")
        .map(object : Function<String, Int> {
            override fun apply(t: String): Int {
                // 事件发送出来后进入apply方法
                // 这里将这个事件(字符串2)转化为int类型并加1,然后返回
                return t.toString().toInt() + 1
            }
        }).subscribe(consumer)
}

// 日志打印如下:
I/testLog: accept.. t = 3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

flatMap()操作符的使用如下:

private fun testRxjava() {
    val consumer = object : Consumer<Any> {
        override fun accept(t: Any?) {
            Log.i("testLog", "accept.. t = $t")
        }
    }
    Observable.just("1", "2", "3", "4", "5")
        .flatMap(object : Function<String, ObservableSource<Any>> {
            override fun apply(t: String): ObservableSource<Any> {
                // ObservableSource是被观察者的顶层父类,所以其实就是生成一个新的被观察者并返回
                // 这里拿到的 t 是无序的,如果需要有序,则使用concatMap即可
                // 这种场景类比于当前需要请求基于上一次请求的结果
                return Observable.just(t + "3")
            }

        }).subscribe(consumer)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

组合操作符

  • concat():将多个被观察者进行整合,得到一个新的被观察者
  • merge():和concat()作用基本一样,只是concat()串行的,而merge()并行
private fun testRxjava() {
    val consumer = object : Consumer<Any> {
        override fun accept(t: Any?) {
            Log.i("testLog", "accept.. t = $t")
        }
    }
    // 将两个被观察者进行整合,得到一个新的被观察者
    Observable.concat(
        Observable.just("1"),
        Observable.just("2")
    ).subscribe(consumer)
}

日志打印如下:
I/testLog: accept.. t = 1
I/testLog: accept.. t = 2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

功能操作符

subscribeOn():用来决定执行subscribe()方法所处的线程,也就是发射事件所在的线程,该方法需要传入一个Scheduler对象Schedulers.io()Schedulers.newThread()都可以拿到一个Scheduler对象,它们都可以开启一个子线程,只是Schedulers.io()的底层实现是线程池的形式。

observeOn():用来决定下游事件被处理所处的线程,该方法同样需要传入一个Scheduler对象,一般是该方法来切换回到主线程。

下面是它们的用法:

private fun testRxjava() {
    Observable.create(object : ObservableOnSubscribe<String> {
        override fun subscribe(emitter: ObservableEmitter<String>) {
            // 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到
            Thread.sleep(2000)
            emitter.onNext("发送一个事件1,当前线程为:" + Thread.currentThread().name)
            emitter.onNext("发送一个事件2,当前线程为:" + Thread.currentThread().name)
            // onComplete方法表示事件发送结束
            emitter.onComplete()
        }
    })
        .subscribeOn(Schedulers.newThread()) // 决定上游事件被处理所处的线程
        .observeOn(AndroidSchedulers.mainThread()) // 决定下游事件被处理所处的线程
        .subscribe(object : Observer<String> {
            override fun onSubscribe(d: Disposable) {
                // 被观察者和观察者建立订阅之后回调的方法
                Log.i("testLog", "onSubscribe.." + Thread.currentThread().name)
            }

            override fun onNext(t: String) {
                Log.i("testLog", "onNext.. t = $t " + Thread.currentThread().name)
            }

            override fun onError(e: Throwable) {
                // 可以在subscribe方法中手动抛出异常,如emitter.onError(Throwable("test error!"))
                // 但是会导致 onComplete() 方法无法回调,也就是说onComplete和onError是互斥的
                Log.i("testLog", "onError.. e = ${e.message + Thread.currentThread().name}")
            }

            override fun onComplete() {
                Log.i("testLog", "onComplete.." + Thread.currentThread().name)
            }

        })
}

日志打印如下:
I/testLog: onSubscribe..main
I/testLog: onNext.. t = 发送一个事件1,当前线程为:RxNewThreadScheduler-1 main
I/testLog: onNext.. t = 发送一个事件2,当前线程为:RxNewThreadScheduler-1 main
I/testLog: onComplete..main
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

过滤操作符

filter():过滤掉某些事件

private fun testRxjava() {
    val consumer = object : Consumer<Any> {
        override fun accept(t: Any?) {
            Log.i("testLog", "accept.. t = $t")
        }

    }
    // 发送[1,10]范围内的数值
    Observable.range(1, 10)
        .filter(object : Predicate<Int> {
            override fun test(t: Int): Boolean {
                if (t < 5) {
                    // 如果 t 小于5,就将 t 过滤掉
                    return true
                } else {
                    // 把 >= 5的t保留
                    return false
                }
            }

        })
        .subscribe(consumer)
}

日志打印如下:
I/testLog: accept.. t = 1
I/testLog: accept.. t = 2
I/testLog: accept.. t = 3
I/testLog: accept.. t = 4
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/177360
推荐阅读
相关标签
  

闽ICP备14008679号