赞
踩
虽说Rxjava显然已经有些过时了,但是有些公司还在使用,为了能适应更多的业务代码,并提高自己的开发效率,所以这里仅做个Rxjava使用的总结,不涉及Rxjava内部的实现原理。
RxJava的核心就是异步数据流和响应式编程。
我们平时开发过程中的网络请求、数据库读写、文件读写、定时任务等各种耗时操作,都可以使用RxJava来完成。
在平时的开发中,我们可以把所有的事件(数据)我们都可以看成是一条河流,它可以被观察,被过滤等操作,也可以将多条河流汇合成一条新河流。
只需要引入如下两个依赖即可使用rxjava:
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
被观察者去订阅观察者
)Observable、Flowable、Single、Completable、Maybe都是被观察者,这几种被观察者可以通过 toObservable、toFlowable、toSingle、toCompletable、toMaybe 相互转化。
创建被观察者
的各种操作符:create()、just() 等等。
create操作符演示:
private fun testRxjava() { Observable.create(object : ObservableOnSubscribe<String> { override fun subscribe(emitter: ObservableEmitter<String>) { // 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到 emitter.onNext("发送一个事件1") emitter.onNext("发送一个事件2") // onComplete方法表示事件发送结束 emitter.onComplete() } }).subscribe(object : Observer<String> { override fun onSubscribe(d: Disposable) { // 被观察者和观察者建立订阅之后回调的方法 Log.i("testLog", "onSubscribe..") } override fun onNext(t: String) { Log.i("testLog", "onNext.. t = $t") } override fun onError(e: Throwable) { // 可以在subscribe方法中手动抛出异常,如emitter.onError(Throwable("test error!")) // 但是会导致 onComplete() 方法无法回调,也就是说onComplete和onError是互斥的 Log.i("testLog", "onError.. e = ${e.message}") } override fun onComplete() { Log.i("testLog", "onComplete..") } }) }
日志打印如下:
I/testLog: onSubscribe..
I/testLog: onNext.. t = 发送一个事件1
I/testLog: onNext.. t = 发送一个事件2
I/testLog: onComplete..
当然,我们也可以使用Consumer
(消费者)充当观察者,Consumer只有一个方法即accept
,比起Observer
需要实现四个方法
来说,显然更加简洁了,
而当我们想要处理异常时,只需要再多传入一个专门接收Throwable
的Consumer
即可。使用如下:
Observable.create(object : ObservableOnSubscribe<String> { override fun subscribe(emitter: ObservableEmitter<String>) { // 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到 emitter.onNext("发送一个事件1") emitter.onNext("发送一个事件2") emitter.onError(Throwable("test error!")) // onComplete方法表示事件发送结束 emitter.onComplete() } }).subscribe( // 第一个Consumer,用来处理事件 object : Consumer<String> { override fun accept(t: String) { Log.i("testLog", "accept.. t = $t") } }, // 异常将会在这里的Consumer中处理 object : Consumer<Throwable> { override fun accept(t: Throwable?) { Log.i("testLog", "accept onError.. e = ${t?.message}") } }) 日志打印如下: I/testLog: accept.. t = 发送一个事件1 I/testLog: accept.. t = 发送一个事件2 I/testLog: accept onError.. e = test error!
just操作符:
通过just操作符,可以非常简单的完成create操作符实现的事情。
just 方法传参限制最多为10个,另外,just 内部实际上是调用了 fromArray
操作符方法,而fromArray
方法是不限制传参数量的。
just操作符的使用如下所示:
private fun testRxjava() {
val consumer = object : Consumer<Any> {
override fun accept(t: Any?) {
Log.i("testLog", "accept.. t = $t")
}
}
Observable.just("a", 2, "3")
.subscribe(consumer)
}
日志打印如下:
I/testLog: accept.. t = a
I/testLog: accept.. t = 2
I/testLog: accept.. t = 3
map() / flatMap() / concatMap() 等。
map()
操作符可以将被观察者发送的数据类型转变成其他的类型。flatMap()
可以将事件序列中的元素进行整合加工,返回一个新的被观察者,在网络请求场景中比较常用。concatMap()
和flatMap()
基本一样,只不过concatMap()
转化出来的事件是有序的,而flatMap()
转化出来的事件是无序的。map()操作符的使用如下:
private fun testRxjava() { val consumer = object : Consumer<Any> { override fun accept(t: Any?) { Log.i("testLog", "accept.. t = $t") } } Observable.just("2") .map(object : Function<String, Int> { override fun apply(t: String): Int { // 事件发送出来后进入apply方法 // 这里将这个事件(字符串2)转化为int类型并加1,然后返回 return t.toString().toInt() + 1 } }).subscribe(consumer) } // 日志打印如下: I/testLog: accept.. t = 3
flatMap()操作符的使用如下:
private fun testRxjava() { val consumer = object : Consumer<Any> { override fun accept(t: Any?) { Log.i("testLog", "accept.. t = $t") } } Observable.just("1", "2", "3", "4", "5") .flatMap(object : Function<String, ObservableSource<Any>> { override fun apply(t: String): ObservableSource<Any> { // ObservableSource是被观察者的顶层父类,所以其实就是生成一个新的被观察者并返回 // 这里拿到的 t 是无序的,如果需要有序,则使用concatMap即可 // 这种场景类比于当前需要请求基于上一次请求的结果 return Observable.just(t + "3") } }).subscribe(consumer) }
concat()
作用基本一样,只是concat()
是串行的,而merge()
是并行的private fun testRxjava() { val consumer = object : Consumer<Any> { override fun accept(t: Any?) { Log.i("testLog", "accept.. t = $t") } } // 将两个被观察者进行整合,得到一个新的被观察者 Observable.concat( Observable.just("1"), Observable.just("2") ).subscribe(consumer) } 日志打印如下: I/testLog: accept.. t = 1 I/testLog: accept.. t = 2
subscribeOn()
:用来决定执行subscribe()
方法所处的线程,也就是发射事件所在的线程,该方法需要传入一个Scheduler对象,Schedulers.io()
和Schedulers.newThread()
都可以拿到一个Scheduler对象,它们都可以开启一个子线程,只是Schedulers.io()
的底层实现是线程池的形式。
observeOn()
:用来决定下游事件被处理所处的线程,该方法同样需要传入一个Scheduler对象,一般是该方法来切换回到主线程。
下面是它们的用法:
private fun testRxjava() { Observable.create(object : ObservableOnSubscribe<String> { override fun subscribe(emitter: ObservableEmitter<String>) { // 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到 Thread.sleep(2000) emitter.onNext("发送一个事件1,当前线程为:" + Thread.currentThread().name) emitter.onNext("发送一个事件2,当前线程为:" + Thread.currentThread().name) // onComplete方法表示事件发送结束 emitter.onComplete() } }) .subscribeOn(Schedulers.newThread()) // 决定上游事件被处理所处的线程 .observeOn(AndroidSchedulers.mainThread()) // 决定下游事件被处理所处的线程 .subscribe(object : Observer<String> { override fun onSubscribe(d: Disposable) { // 被观察者和观察者建立订阅之后回调的方法 Log.i("testLog", "onSubscribe.." + Thread.currentThread().name) } override fun onNext(t: String) { Log.i("testLog", "onNext.. t = $t " + Thread.currentThread().name) } override fun onError(e: Throwable) { // 可以在subscribe方法中手动抛出异常,如emitter.onError(Throwable("test error!")) // 但是会导致 onComplete() 方法无法回调,也就是说onComplete和onError是互斥的 Log.i("testLog", "onError.. e = ${e.message + Thread.currentThread().name}") } override fun onComplete() { Log.i("testLog", "onComplete.." + Thread.currentThread().name) } }) } 日志打印如下: I/testLog: onSubscribe..main I/testLog: onNext.. t = 发送一个事件1,当前线程为:RxNewThreadScheduler-1 main I/testLog: onNext.. t = 发送一个事件2,当前线程为:RxNewThreadScheduler-1 main I/testLog: onComplete..main
filter():过滤掉某些事件
private fun testRxjava() { val consumer = object : Consumer<Any> { override fun accept(t: Any?) { Log.i("testLog", "accept.. t = $t") } } // 发送[1,10]范围内的数值 Observable.range(1, 10) .filter(object : Predicate<Int> { override fun test(t: Int): Boolean { if (t < 5) { // 如果 t 小于5,就将 t 过滤掉 return true } else { // 把 >= 5的t保留 return false } } }) .subscribe(consumer) } 日志打印如下: I/testLog: accept.. t = 1 I/testLog: accept.. t = 2 I/testLog: accept.. t = 3 I/testLog: accept.. t = 4
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。