赞
踩
这是《使用Kotlin开发一个现代的APP》系列文章的第三部分,还没看过前2部分的,可以先看一下:
【译】使用Kotlin从零开始写一个现代Android 项目-Part1
【译】使用Kotlin从零开始写一个现代Android 项目-Part2
正文开始!
关于RxJava,一个广泛的概念是-RxJava是用于异步编程的API的Java实现,它具有可观察流和响应式的API。实际上,它是这三个概念的结合:观察者模式、迭代器模式和函数式编程。这里也有其他编程语言实现的库,如:RxSwift、RxJs 、RxNet等。
我RxJava上手很难,有时,它确实很令人困惑,如果实施不当,可能会给您带来一些问题。尽管如此,我们还是值得花时间学习它。我将尝试通过简单的步骤来解释RxJava。
首先,让我们回答一些简单的问题,当您开始阅读有关RxJava时,可能会问自己:
答案是否定的,RxJava只是可以在Android开发中使用的又一个库。如果使用Kotlin开发,它也不是必须的,我希望你明白我说的,它只一个很帮助你的库,就像你使用的所以其他库一样。
你可以直接从RxJava2开始,不过,作为Android开发人员,知道这两种情况对你还是有好处的,因为你可能会参与维护其他人的RxJava1代码。
RxJava能用在任何Java开发平台,不仅仅是Android,比如,对于后端开发来说,RxJava 可以与Spring等框架一起使用,RxAndroid是一个库,其中包含在Android中使用RxJava所需的库。因此,如果要在Android开发中使用RxJava,则必须再添加RxAndroid。稍后,我将解释RxAndroid基于RxJava所添加的内容。
我们没有必要另外再添加一个Rx 库了,因为Kotlin与Java是完全兼容的,这里确实有一个RxKotin库:https://github.com/ReactiveX/RxKotlin ,不过该库是在RxJava之上编写的。它只是将Kotlin功能添加到RxJava。您可以将RxJava与Kotlin一起使用,而无需使用RxKotlin库。为了简单起见,在这一部分中我将不使用RxKotlin。
要使用RxJava,你需要在build.gradle
中添加如下代码:
- dependencies {
- ...
- implementation "io.reactivex.rxjava2:rxjava:2.1.8"
- implementation "io.reactivex.rxjava2:rxandroid:2.0.1"
- ...
- }
然后,点击sync
,下载Rxjava库。
我想把RxJava分为以下三部分:
Observables
和 Observers
Observables
和 Observers
我们已经解释了这种模式。您可以将Observable视为数据的源(被观察者
),将Observer视为接收数据的源(观察者
)。
有很多创建Observables的方式,最简单的方法是使用Observable.just()
来获取一个项目并创建Observable来发射该项目。
让我们转到GitRepoRemoteDataSource
类并更改getRepositories
方法,以返回Observable:
- class GitRepoRemoteDataSource {
-
- fun getRepositories() : Observable<ArrayList<Repository>> {
- var arrayList = ArrayList<Repository>()
- arrayList.add(Repository("First from remote", "Owner 1", 100, false))
- arrayList.add(Repository("Second from remote", "Owner 2", 30, true))
- arrayList.add(Repository("Third from remote", "Owner 3", 430, false))
-
- return Observable.just(arrayList).delay(2,TimeUnit.SECONDS)
- }
- }
Observable <ArrayList <Repository >>
表示Observable发出Repository对象的数组列表。如果要创建发出Repository对象的Observable ,则应使用Observable.from(arrayList)
。
.delay(2,TimeUnit.SECONDS)
表示延迟2s后才开始发射数据。
但是,等等!我们并没有高数Observable何时发射数据啊?Observables通常在一些Observer订阅后就开始发出数据。
请注意,我们不再需要以下接口了
- interface OnRepoRemoteReadyCallback {
- fun onRemoteDataReady(data: ArrayList<Repository>)
- }
在GitRepoLocalDataSource:
类中做同样的更改
- class GitRepoLocalDataSource {
-
- fun getRepositories() : Observable<ArrayList<Repository>> {
- var arrayList = ArrayList<Repository>()
- arrayList.add(Repository("First From Local", "Owner 1", 100, false))
- arrayList.add(Repository("Second From Local", "Owner 2", 30, true))
- arrayList.add(Repository("Third From Local", "Owner 3", 430, false))
-
- return Observable.just(arrayList).delay(2, TimeUnit.SECONDS)
- }
-
- fun saveRepositories(arrayList: ArrayList<Repository>) {
- //todo save repositories in DB
- }
- }
同样的,也不需要这个接口了:
- interface OnRepoLocalReadyCallback {
- fun onLocalDataReady(data: ArrayList<Repository>)
- }
现在,我们需要在repository
中返回Observable
- class GitRepoRepository(private val netManager: NetManager) {
-
- private val localDataSource = GitRepoLocalDataSource()
- private val remoteDataSource = GitRepoRemoteDataSource()
-
- fun getRepositories(): Observable<ArrayList<Repository>> {
-
- netManager.isConnectedToInternet?.let {
- if (it) {
- //todo save those data to local data store
- return remoteDataSource.getRepositories()
- }
- }
-
- return localDataSource.getRepositories()
- }
- }
如果网络已连接,我们从远程数据源返回Observable,否则,从本地数据源返回Observable,同样的,我们也不再需要OnRepositoryReadyCallback
接口。
如你所料,我们需要更改在MainViewModel中获取数据的方式。现在我们应该从gitRepoRepository
获取Observable并订阅它。一旦我们向Observer订阅了该Observable,Observable将开始发出数据:
- class MainViewModel(application: Application) : AndroidViewModel(application) {
- ...
-
- fun loadRepositories() {
- isLoading.set(true)
- gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{
- override fun onSubscribe(d: Disposable) {
- //todo
- }
-
- override fun onError(e: Throwable) {
- //todo
- }
-
- override fun onNext(data: ArrayList<Repository>) {
- repositories.value = data
- }
-
- override fun onComplete() {
- isLoading.set(false)
- }
- })
- }
- }
一旦Observer订阅了Observable,onSubscribe
方法将被调用,主要onSubscribe
的参数Disposable
,稍后将讲到它。
每当Observable发出数据时,将调用onNext()
方法。当Observable完成s数据发射时,onComplete()
将被调用一次。之后,Observable终止。
如果发生某些异常,onError()
方法将被回调,然后Observable终止。这意味着Observable将不再发出数据,因此onNext()
不会被调用,也不会调用onComplete()
。
另外,请注意。如果尝试订阅已终止的Observable,则将收到IllegalStateException
。
那么,RxJava如何帮助我们?
onError()
方法中返回,因此我们可以向用户显示适当的错误消息。我们再一次看一下ViewModel的生命周期图
一旦Activity销毁,ViewModel的onCleared
方法将被调用,在onCleared
方法中,我们需要取消所有订阅
- class MainViewModel(application: Application) : AndroidViewModel(application) {
- ...
-
- lateinit var disposable: Disposable
-
- fun loadRepositories() {
- isLoading.set(true)
- gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{
- override fun onSubscribe(d: Disposable) {
- disposable = d
- }
-
- override fun onError(e: Throwable) {
- //if some error happens in our data layer our app will not crash, we will
- // get error here
- }
-
- override fun onNext(data: ArrayList<Repository>) {
- repositories.value = data
- }
-
- override fun onComplete() {
- isLoading.set(false)
- }
- })
- }
-
- override fun onCleared() {
- super.onCleared()
- if(!disposable.isDisposed){
- disposable.dispose()
- }
- }
- }
我们可以优化一下上面的代码:
首先,使用DisposableObserver
替换Observer
,它实现了Disposable并且有dispose()
方法,我们不再需要onSubscribe()
方法,因为我们可以直接在DisposableObserver实例上调用dispose()
。
第二步,替换掉返回Void的.subscribe()
方法,使用.subscribeWith()
方法,他能返回指定的Observer
- class MainViewModel(application: Application) : AndroidViewModel(application) {
- ...
-
- lateinit var disposable: Disposable
-
- fun loadRepositories() {
- isLoading.set(true)
- disposable = gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {
-
- override fun onError(e: Throwable) {
- // todo
- }
-
- override fun onNext(data: ArrayList<Repository>) {
- repositories.value = data
- }
-
- override fun onComplete() {
- isLoading.set(false)
- }
- })
- }
-
- override fun onCleared() {
- super.onCleared()
- if(!disposable.isDisposed){
- disposable.dispose()
- }
- }
- }
上面的代码还可以继续优化:
我们保存了一个Disposable实例,因此,我们才可以在onCleared()
回调中调用dispose()
,但是等等!我们需要为每一个调用都这样做吗?如果有10个回调,那么我们得保存10个实例,在onCleared()
中取消10次订阅?显然不可能,这里有更好的方法,我们应该将它们全部保存在一个存储桶中,并在调用onCleared()
方法时,将它们全部一次处理。我们可以使用CompositeDisposable
。
CompositeDisposable
:可容纳多个Disposable的容器
因此,每次创建一个Disposable,都需要将其添加到CompositeDisposable
中:
- class MainViewModel(application: Application) : AndroidViewModel(application) {
- ...
-
- private val compositeDisposable = CompositeDisposable()
-
- fun loadRepositories() {
- isLoading.set(true)
- compositeDisposable.add(gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {
-
- override fun onError(e: Throwable) {
- //if some error happens in our data layer our app will not crash, we will
- // get error here
- }
-
- override fun onNext(data: ArrayList<Repository>) {
- repositories.value = data
- }
-
- override fun onComplete() {
- isLoading.set(false)
- }
- }))
- }
-
- override fun onCleared() {
- super.onCleared()
- if(!compositeDisposable.isDisposed){
- compositeDisposable.dispose()
- }
- }
- }
感谢Kotlin的扩展函数,我们还可以更进一步:
与C#和Gosu相似,Kotlin提供了使用新功能扩展类的能力,而不必继承该类,也就是扩展函数。
让我们创建一个新的包,叫做extensions
,并且添加一个新的文件RxExtensions.kt
- import io.reactivex.disposables.CompositeDisposable
- import io.reactivex.disposables.Disposable
-
- operator fun CompositeDisposable.plusAssign(disposable: Disposable) {
- add(disposable)
- }
现在我们可以使用+ =
符号将Disposable对象添加到CompositeDisposable实例:
- class MainViewModel(application: Application) : AndroidViewModel(application) {
- ...
-
- private val compositeDisposable = CompositeDisposable()
-
- fun loadRepositories() {
- isLoading.set(true)
- compositeDisposable += gitRepoRepository.getRepositories().subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {
-
- override fun onError(e: Throwable) {
- //if some error happens in our data layer our app will not crash, we will
- // get error here
- }
-
- override fun onNext(data: ArrayList<Repository>) {
- repositories.value = data
- }
-
- override fun onComplete() {
- isLoading.set(false)
- }
- })
- }
-
- override fun onCleared() {
- super.onCleared()
- if (!compositeDisposable.isDisposed) {
- compositeDisposable.dispose()
- }
- }
- }
现在,我们运行程序,当你点击Load Data
按钮,2s之后,程序crash,然后,如果查看日志,您将看到onNext
方法内部发生错误,并且异常的原因是:
java.lang.IllegalStateException: Cannot invoke setValue on a background thread
为何会发生这个异常?
RxJava附带有调度器(Schedulers),使我们可以选择在哪个线程代码上执行。更准确地说,我们可以选择使用subscribeOn()
方在哪个线程执行,observeOn()
方法可以观察哪个线程观察者。通常情况下,我们所有的数据层代码都应该在后台线程执行,例如,如果我们使用Schedulers.newThread()
,每当我们调用它时,调度器都会给我们分配一个新的线程,为了简单起见,Scheduler中还有其他一些方法,我将不在本博文中介绍。
可能您已经知道所有UI代码都是在Android 主线程上完成的。 RxJava是Java库,它不了解Android主线程,这就是我们使用RxAndroid的原因。 RxAndroid使我们可以选择Android Main线程作为执行代码的线程。显然,我们的Observer应该在Android Main线程上运行。
让我们更改一下代码:
- ...
- fun loadRepositories() {
- isLoading.set(true)
- compositeDisposable += gitRepoRepository
- .getRepositories()
- .subscribeOn(Schedulers.newThread())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {
- ...
- })
- }
- ...
然后再运行代码,一切都正常了,nice~
这里还有一些其他的observable 类型
onSuccess()
事件或者异常Observable<T>
一样,不发射数据,或者发射n个数据,或者发射异常,但是Observable不支持背压,而Flowable却支持。为了记住一些概念,我喜欢将它们与现实中的一些例子类比
把它类比成一个通道,如果你向通道中塞入瓶颈能够接受的最多的商品,这将会变得很糟,这里也是同样的,有时,你的观察者无法处理其收到的事件数量,因此需要放慢速度。
你可以看看RxJava 关于背压的文档:https://github.com/ReactiveX/RxJava/wiki/Backpressure-(2.0)
RxJava中,最牛逼的就是它的操作符了,仅用一行代码即可在RxJava中解决一些通常需要10行或更多行的问题。这些是操作符可以帮我们做的:
我给你举一个例子,让我们将数据保存到GitRepoLocalDataSource中。因为我们正在保存数据,所以我们需要Completable来模拟它。假设我们还想模拟1秒的延迟。天真的方法是:
- fun saveRepositories(arrayList: ArrayList<Repository>): Completable {
- return Completable.complete().delay(1,TimeUnit.SECONDS)
- }
为什么说天真?
Completable.complete()
返回一个Completable实例,该实例在订阅后立即完成。
一旦Completable 完成后,它将终止。因此,之后将不执行任何运算符(延迟是运算符之一)。在这种情况下,我们的Completable不会有任何延迟。让我们找解决方法:
- fun saveRepositories(arrayList: ArrayList<Repository>): Completable {
- return Single.just(1).delay(1,TimeUnit.SECONDS).toCompletable()
- }
为什么是这种方式?
Single.just(1)
创建一个Single实例,并且仅发射一个数字1,因为我们用了delay(1,TimeUnit.SECONDS)
,因此发射操作延迟1s。
toCompletable()
返回一个Completable,它丢弃Single的结果,并在此Single调用onSuccess
时调用onComplete
。
因此,上面的代码将返回Completable,并且1s后调用onComplete()
。
现在,我们应该更改我们的GitRepoRepository。让我们回顾一下逻辑。我们检查互联网连接。如果有互联网连接,我们从远程数据源获取数据,将其保存在本地数据源中并返回数据。否则,我们仅从本地数据源获取数据。看一看:
- fun getRepositories(): Observable<ArrayList<Repository>> {
-
- netManager.isConnectedToInternet?.let {
- if (it) {
- return remoteDataSource.getRepositories().flatMap {
- return@flatMap localDataSource.saveRepositories(it)
- .toSingleDefault(it)
- .toObservable()
- }
- }
- }
-
- return localDataSource.getRepositories()
- }
使用了.flatMap
,一旦remoteDataSource.getRepositories()
发射数据,该项目将被映射到发出相同项目的新Observable。我们从Completable创建的新Observable发射的相同项目保存在本地数据存储中,并且将其转换为发出相同发射项的Single。因为我们需要返回Observable,所以我们必须将Single转换为Observable。
很疯狂,huh? 想象一下RxJava还能为我们做些啥!
RxJava是一个非常有用的工具,去使用它,探索它,我相信你会爱上它的!
以上就是本文得全部内容,下一篇文章将是本系列的最后一篇文章,敬请期待!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。