赞
踩
RxJava是一个响应式实现,将这个概念带到了 Android 平台。Android 应用程序是您开始探索反应式世界的理想场所。使用RxAndroid更容易,这是一个包装异步 UI 事件以更像 RxJava 的库。
在这个 RxAndroid 响应式编程教程中,您将学习如何执行以下操作:
在BaseSearchActivity
并检查以下可供您使用的功能:
showProgress()
: 显示进度条的功能...hideProgress()
: ... 以及隐藏它的功能。showResult(result: List)
:显示数据列表的功能。cheeseSearchEngine
: 一个字段,它是 的一个实例CheeseSearchEngine
。它有一个search
函数,当你想搜索奶酪时调用它。它接受文本搜索查询并返回匹配奶酪的列表。在您的 Android 设备或模拟器上构建并运行该项目。您应该会看到一个空空如也的搜索屏幕:
命令式编程中,一个表达式被计算一次,然后赋值给一个变量:
- var x = 2
- var y = 3
- var z = x * y // z 是 6
-
- x = 10
- // z 仍然是 6
另一方面,响应式编程就是对值变化做出响应。
你可能已经做过一些响应式编程——即使你当时没有意识到这一点。
采用以下实现上述示例的电子表格:
电子表格为单元格 B1 分配值 2,为单元格 B2 分配值 3,并为第三个单元格 B3 分配一个表达式,该表达式将 B1 的值乘以 B2 的值。当表达式中引用的任一组件的值发生变化时,会观察到变化,并在 B3 中自动重新评估表达式:
简而言之,反应式编程的想法是让组件形成更大的画面——可以观察到。并让您的程序监听并在更改发生时使用它们。
您可能知道,由于 Kotlin 与 Java 的语言兼容性,因此可以在 Kotlin 项目中使用 Java 库。如果是这样,那么为什么首先要创建 RxKotlin?RxKotlin 是 RxJava 的 Kotlin 包装器,它还为响应式编程提供了大量有用的扩展功能。
在本文中,我们将专注于使用 RxJava,因为理解这种方法的核心概念至关重要。但是,您将学到的所有内容也适用于 RxKotlin。
注意:请特别查看build.gradle
文件和项目依赖项。除了 UI 库,它包含RxKotlin
和RxAndroid
包。我们不需要在RxJava
这里明确指定,因为RxKotlin
已经包含它。
RxJava 使用了观察者模式。
注意:要刷新您对观察者模式的记忆,您可以访问 Common Design Patterns for Android with Kotlin。
在观察者模式中,您有实现两个关键 RxJava 接口的对象:Observable
和Observer
. 当一个Observable
状态改变时,所有Observer
订阅它的对象都会收到通知。
Observable
接口中的方法之一是subscribe()
,Observer
将调用它来开始订阅。
该Observer
接口具有三个Observable
根据需要调用的方法:
onNext(T value)
为 .提供一个类型为 T 的新项目Observer
。onComplete()
通知Observer
已Observable
完成发送项目。onError(Throwable e)
通知遇到错误Observer
。Observable
通常,行为良好的Observable
项目会发出零个或多个项目,这些项目可能会跟随完成或错误。
圆圈表示已从 observable 发出的项目,黑色块表示完成或错误。举个例子,一个可观察的网络请求。该请求通常会发出单个项目(响应)并立即完成。
可观察到的鼠标移动会发出鼠标坐标,但永远不会完成:
在这里,您可以看到已发出多个项目,但没有显示鼠标已完成或引发错误的块。
在 observable 完成后,不能再发出任何项目。这是一个违反observable行为不 示例:
这是一个非常糟糕的 observable,因为它违反了 Observable 合约,因为它在发出完成信号后发出一个项目。
有许多库可以帮助您从几乎任何类型的事件中创建 observables。但是,有时您只需要自己动手。此外,这是学习Observable 模式和响应式编程的好方法!
您将使用Observable.create()
:
Observable<T> create(ObservableOnSubscribe<T> source)
这很好,很简洁,但这是什么意思?source是什么?要了解该签名,您需要知道 ObservableOnSubscribe
是什么。这是一个接口:
- public interface ObservableOnSubscribe<T> {
- void subscribe(ObservableEmitter<T> e) throws Exception;
- }
您需要创建的source Observable
将需要实现subscribe()
,这就需要一个提供一个emitter参数去实现。那么,什么是emitter?
RxJava 的Emitter
接口类似于Observer
:
- public interface Emitter<T> {
- void onNext(T value);
- void onError(Throwable error);
- void onComplete();
- }
ObservableEmitter
还提供了一种取消订阅的方法。
为想象一个调节水流的水龙头。水管就像一根水管Observable
,如果你有办法接入它,它愿意输送水流。您构建一个可以打开和关闭的水龙头,就像一个ObservableEmitter
,并将它连接到 中的水管Observable.create()
。水龙头是反应式的,因为一旦你关闭它,水流——数据——就不再活跃。
是时候创建你的第一个 observable 了!
在Activity
类中添加以下代码:
// 1 private fun createButtonClickObservable(): Observable<String> { // 2 return Observable.create { emitter -> // 3 searchButton.setOnClickListener { // 4 emitter.onNext(queryEditText.text.toString()) } // 5 emitter.setCancellable { // 6 searchButton.setOnClickListener(null) } } }
输入上述代码后,您的导入应如下所示:
import io.reactivex.Observable
您已经导入了正确的Observable
类,并且您正在使用Kotlin Android 扩展来获取对视图对象的引用。
这是上面代码中发生的事情:
Observable.create()
,并为它提供一个新的ObservableOnSubscribe
.searchButton
设置一个OnClickListener
。onNext
发射器并将当前文本值传递给它queryEditText
。Observable
?正因为如此,ObservableEmitter
有setCancellable()
。Override cancel()
,当 Observable 被释放时,你的实现将被调用,例如当 Observable 完成或所有 Observer 都取消订阅它时。OnClickListener
,删除侦听器的代码是setOnClickListener(null)
。现在你已经定义了你的 Observable,你需要设置它的订阅。在此之前,,Consumer
. 这是一种接受来自发射器的值的简单方法。
- public interface Consumer<T> {
- void accept(T t) throws Exception;
- }
当你想设置一个对 Observable 的简单订阅时,这个接口很方便。
该Observable
接口需要多个版本的subscribe()
,都具有不同的参数。例如,Observer
如果你愿意,你可以传递一个完整的,但是你需要实现所有必要的方法。
如果您的订阅只需要观察者响应发送到的值onNext()
,您可以使用subscribe()
接收单个的版本Consumer
(参数甚至命名为onNext
,以使连接清晰)。
当您订阅活动的onStart()
. 将以下代码添加到Activity.kt:
- override fun onStart() {
- super.onStart()
- // 1
- val searchTextObservable = createButtonClickObservable()
-
- searchTextObservable
- // 2
- .subscribe { query ->
- // 3
- showResult(cheeseSearchEngine.search(query))
- }
- }
以下是每个步骤的说明:
subscribe()
,并提供一个简单的Consumer
.构建并运行应用程序。输入一些字母,然后点击搜索按钮。在模拟延迟之后,您应该会看到与您的请求匹配的数据列表:
听起来很好吃!:]
您已经第一次体验了响应式编程。但是有一个问题:当点击搜索按钮时,UI 会冻结几秒钟。
您可能还会注意到 Android Monitor 中的以下行:
> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames! The application may be doing too much work on its main thread.
发生这种情况是因为search
在主线程上执行。如果search
要执行网络请求,Android 将导致应用程序崩溃并出现NetworkOnMainThreadException
异常。是时候解决这个问题了。
关于 RxJava 的一个流行神话是它默认是多线程的,类似于AsyncTask
. 但是,如果没有另外指定,RxJava 会在调用它的同一线程中完成所有工作。
subscribeOn
您可以使用and运算符更改此行为observeOn
。
subscribeOn
应该在运算符链中只调用一次。如果不是,则第一个调用获胜。subscribeOn
指定订阅(即创建)observable 的线程。如果您使用从 Android 视图发出事件的可观察对象,则需要确保在 Android UI 线程上完成订阅。
另一方面,可以observeOn
在链中调用任意多次。observeOn
指定将执行链中下一个运算符的线程。例如:
- myObservable // observable will be subscribed on i/o thread
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .map { /* this will be called on main thread... */ }
- .doOnNext{ /* ...and everything below until next observeOn */ }
- .observeOn(Schedulers.io())
- .subscribe { /* this will be called on i/o thread */ }
最有用的调度程序是:
Schedulers.io()
: 适用于网络请求或磁盘操作等 I/O 密集型工作。Schedulers.computation()
:最适合计算任务,如事件循环和处理回调。AndroidSchedulers.mainThread()
在 UI 线程上执行下一个运算符。运算符将map
一个函数应用于一个可观察对象发出的每个项目,并返回另一个发出这些函数调用结果的可观察对象。您还需要它来解决线程问题。
如果您有一个可观察的调用numbers
,它发出以下内容:
如果你申请map
如下:
numbers.map { number -> number * number }
结果如下:
这是用很少的代码迭代多个项目的便捷方式。让我们使用它!
onStart()
在类中进行修改,Activity
如下所示:
override fun onStart() { super.onStart() val searchTextObservable = createButtonClickObservable() searchTextObservable // 1 .subscribeOn(AndroidSchedulers.mainThread()) // 2 .observeOn(Schedulers.io()) // 3 .map { searchEngine.search(it) } // 4 .observeOn(AndroidSchedulers.mainThread()) .subscribe { showResult(it) } }
浏览上面的代码:
View
应该在主线程上执行。构建并运行您的项目。现在,即使在搜索过程中,UI 也应该是响应式的。
是时候显示进度条了!
为此,您需要一个doOnNext
操作员。doOnNext
接受 aConsumer
并允许您在每次由 observable 发出项目时执行某些操作。
在同一个Activity
类中修改onStart()
如下:
override fun onStart() { super.onStart() val searchTextObservable = createButtonClickObservable() searchTextObservable // 1 .observeOn(AndroidSchedulers.mainThread()) // 2 .doOnNext { showProgress() } .observeOn(Schedulers.io()) .map { searchEngine.search(it) } .observeOn(AndroidSchedulers.mainThread()) .subscribe { // 3 hideProgress() showResult(it) } }
依次获取每个编号的评论:
doOnNext
运算符,以便showProgress()
每次发出新项目时都会调用它。hideProgress()
当您即将显示结果时,不要忘记调用。构建并运行您的项目。当您开始搜索时,您应该会看到进度条:
如果你想在用户输入一些文本时自动执行搜索,就像谷歌一样?
首先,您需要订阅TextView
文本更改。将以下函数添加到Activity
类中:
// 1 private fun createTextChangeObservable(): Observable<String> { // 2 val textChangeObservable = Observable.create<String> { emitter -> // 3 val textWatcher = object : TextWatcher { override fun afterTextChanged(s: Editable?) = Unit override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) = Unit // 4 override fun onTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) { s?.toString()?.let { emitter.onNext(it) } } } // 5 queryEditText.addTextChangedListener(textWatcher) // 6 emitter.setCancellable { queryEditText.removeTextChangedListener(textWatcher) } } // 7 return textChangeObservable }
以下是上述每个步骤的逐个播放:
textChangeObservable
with create()
,它需要一个ObservableOnSubscribe
.TextWatcher
.beforeTextChanged()
您对和不感兴趣afterTextChanged()
。当用户键入并onTextChanged()
触发时,您将新的文本值传递给观察者。TextView
通过调用将观察者添加到您的addTextChangedListener()
。emitter.setCancellable()
并覆盖cancel()
调用removeTextChangedListener()
要查看这个 observable 的实际效果,请将searchTextObservable
in onStart()
of的声明替换Activity
如下:
val searchTextObservable = createTextChangeObservable()
构建并运行您的应用程序。当您开始在以下位置输入文本时,您应该会看到搜索开始TextView
:
搜索像单个字母一样短的查询是没有意义的。为了解决这个问题,让我们介绍一下强大的filter
运算符。
filter
仅通过那些满足特定条件的项目。filter
接受 a Predicate
,这是一个接口,它定义了给定类型的输入需要通过的测试,并带有boolean
结果。在这种情况下,Predicate 接受 aString
并true
在字符串的长度是两个或多个字符时返回。
替换return textChangeObservable
为createTextChangeObservable()
以下代码:
return textChangeObservable.filter { it.length >= 2 }
一切都将完全一样,除了length
小于的文本查询2
不会被发送到链中。
运行应用程序;只有当您输入第二个字符时,您才会看到搜索开始:
您不希望每次查询更改一个符号时都向服务器发送新请求。
debounce
是显示反应范式真正威力的运算符之一。很像filter
运算符 ,debounce
过滤可观察对象发出的项目。但是是否应该过滤掉该项目的决定不是基于该项目是什么,而是基于该项目的发出时间。
debounce
在另一个项目的每个项目发射后等待指定的时间量。如果在此等待期间碰巧没有发射任何项目,则最终发射最后一个项目:
在中,在 的下方createTextChangeObservable()
添加运算符,使语句类似于以下代码:debounce
filter
return
- return textChangeObservable
- .filter { it.length >= 2 }
- .debounce(1000, TimeUnit.MILLISECONDS) // add this line
运行应用程序。您会注意到只有在您停止快速更改时才开始搜索:
debounce
在发出最新的查询文本之前等待 1000 毫秒。
您首先创建了一个响应按钮点击的可观察对象,然后实现了一个响应文本字段更改的可观察对象。但你对两者有何反应?
有很多运算符可以组合 observables。最简单和有用的一种是merge
.
merge
从两个或多个可观察对象中获取项目并将它们放入单个可观察对象中:
将开头更改为onStart()
以下内容:
- val buttonClickStream = createButtonClickObservable()
- val textChangeStream = createTextChangeObservable()
-
- val searchTextObservable = Observable.merge<String>(buttonClickStream, textChangeStream)
运行您的应用程序。使用文本字段和搜索按钮;当您完成输入两个或更多符号或只需按下“搜索”按钮时,搜索将开始。
随着 RxJava2 的发布,该框架已经从头开始重新设计,以解决原始库中未解决的一些问题。
Flowable是一个概念,即 observable 发出项目的速度比消费者处理它们的速度要快。如果您使用 observables,它会缓冲项目直到没有更多可用内存,您的应用程序将崩溃并且使用它们将无法使用 firehose API。Flowables 考虑到了这一点,并让您指定 BackPressureStrategy
来告诉 flowable 您希望消费者如何处理发出的比消费更快的项目。
背压策略:
是时候使用这种策略的新知识将上面的 observables 变成 flowables 了。首先考虑您添加到应用程序中的 observables。您有一个 observable 在单击按钮时发出项目,另一个来自键盘输入。考虑到这两个,您可以想象在第一种情况下您可以使用 LATEST 策略,而在第二种情况下您可以使用 BUFFER。
打开Activity.kt并将您的 observables 修改为以下内容:
- val buttonClickStream = createButtonClickObservable()
- .toFlowable(BackpressureStrategy.LATEST) // 1
-
- val textChangeStream = createTextChangeObservable()
- .toFlowable(BackpressureStrategy.BUFFER) // 2
最后,将合并运算符更改为也使用Flowable:
val searchTextFlowable = Flowable.merge<String>(buttonClickStream, textChangeStream)
现在,更改调用以使用新searchTextFlowable
值,而不是之前的Observable
:
- searchTextFlowable
- // 1
- .observeOn(AndroidSchedulers.mainThread())
- // 2
- .doOnNext { showProgress() }
- .observeOn(Schedulers.io())
- .map { cheeseSearchEngine.search(it) }
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe {
- // 3
- hideProgress()
- showResult(it)
- }
重新运行应用程序,您应该会看到一个没有任何可观察对象缺陷的工作应用程序。
Maybe是一种发出单个值、无值或错误的计算。它们适用于诸如数据库更新和删除之类的事情。在这里,您将添加一个新功能,使用 Maybe 来收藏应用程序中的一种奶酪,并使用 Maybe 不发出任何值。
打开Adapter类,在 onBindView 中添加如下代码:
// 1 Maybe.create<Boolean> { emitter -> emitter.setCancellable { holder.itemView.imageFavorite.setOnClickListener(null) } holder.itemView.imageFavorite.setOnClickListener { emitter.onSuccess((it as CheckableImageView).isChecked) // 2 } }.toFlowable().onBackpressureLatest() // 3 .observeOn(Schedulers.io()) .map { isChecked -> cheese.favorite = if (!isChecked) 1 else 0 val database = CheeseDatabase.getInstance(holder.itemView.context).cheeseDao() database.favoriteCheese(cheese) // 4 cheese.favorite // 5 } .subscribeOn(AndroidSchedulers.mainThread()) .subscribe { holder.itemView.imageFavorite.isChecked = it == 1 // 6 }
注意:在删除操作的上下文中使用 Maybe 可能会更好,但例如在这里你可以收藏奶酪。
RxJava2 不再支持 Null。提供 null 将立即导致 NullPointerException 或下游信号。您可以在此处阅读有关此更改的所有信息。
setCancellable
还记得你设置的那些方法吗?在 observable 取消订阅之前,它们不会触发。
该Observable.subscribe()
调用返回一个Disposable
. Disposable
是一个有两种方法的接口:
- public interface Disposable {
- void dispose(); // ends a subscription
- boolean isDisposed(); // returns true if resource is disposed (unsubscribed)
- }
将以下属性添加到Activity
:
private lateinit var disposable: Disposable
使用以下代码设置toonStart()
的返回值(仅第一行更改):subscribe()
disposable
- disposable = searchTextObservable // change this line
- .observeOn(AndroidSchedulers.mainThread())
- .doOnNext { showProgress() }
- .observeOn(Schedulers.io())
- .map { cheeseSearchEngine.search(it) }
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe {
- hideProgress()
- showResult(it)
- }
由于您订阅了 observable onStart()
,onStop()
因此将是取消订阅。
将以下代码添加到Activity.kt:
- @Override
- override fun onStop() {
- super.onStop()
- if (!disposable.isDisposed) {
- disposable.dispose()
- }
- }
就是这样!构建并运行应用程序。您自己不会“观察”任何更改,但现在该应用程序成功地避免了 RxJava 内存泄漏。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。