当前位置:   article > 正文

在 Kotlin 中使用 RxAndroid 进行反应式编程_kotlin rxandroid

kotlin rxandroid

响应式编程

RxJava是一个响应式实现,将这个概念带到了 Android 平台。Android 应用程序是您开始探索反应式世界的理想场所。使用RxAndroid更容易,这是一个包装异步 UI 事件以更像 RxJava 的库。

在这个 RxAndroid 响应式编程教程中,您将学习如何执行以下操作:

  • 掌握反应式编程的概念。
  • 定义一个Observable
  • 将按钮点击和文本字段上下文更改等异步事件转换为可观察的构造。
  • 转换和过滤可观察项目。
  • 在代码执行中利用 Rx 线程。
  • 将几个 observables 组合成一个流。
  • 把你所有的 observables 变成Flowable结构。
  • 使用 RxJava 的Maybe为应用程序添加最喜欢的功能。

入门

BaseSearchActivity并检查以下可供您使用的功能:

  • showProgress(): 显示进度条的功能...
  • hideProgress(): ... 以及隐藏它的功能。
  • showResult(result: List):显示数据列表的功能。
  • cheeseSearchEngine: 一个字段,它是 的一个实例CheeseSearchEngine。它有一个search函数,当你想搜索奶酪时调用它。它接受文本搜索查询并返回匹配奶酪的列表。

在您的 Android 设备或模拟器上构建并运行该项目。您应该会看到一个空空如也的搜索屏幕:

3a9cc14da5e83ceb8f72ab4dc33a961b.png

 

什么是响应式编程?

命令式编程中,一个表达式被计算一次,然后赋值给一个变量:

  1. var x = 2
  2. var y = 3
  3. var z = x * y // z 是 6
  4. x = 10
  5. // z 仍然是 6

另一方面,响应式编程就是对值变化做出响应

你可能已经做过一些响应式编程——即使你当时没有意识到这一点。

  • 在电子表格中定义单元格值类似于在命令式编程中定义变量。
  • 在电子表格中定义单元表达式类似于在反应式编程中定义和操作可观察对象。

采用以下实现上述示例的电子表格:

049a25f8d5db5cc86cbd3e8f89ccc57f.png

电子表格为单元格 B1 分配值 2,为单元格 B2 分配值 3,并为第三个单元格 B3 分配一个表达式,该表达式将 B1 的值乘以 B2 的值。当表达式中引用的任一组件的值发生变化时,会观察到变化,并在 B3 中自动重新评估表达式:

628cab7ede44e9ce7614b4179bb0f364.png

简而言之,反应式编程的想法是让组件形成更大的画面——可以观察到。并让您的程序监听并在更改发生时使用它们。

RxJava 和 RxKotlin 的区别

您可能知道,由于 Kotlin 与 Java 的语言兼容性,因此可以在 Kotlin 项目中使用 Java 库。如果是这样,那么为什么首先要创建 RxKotlin?RxKotlin 是 RxJava 的 Kotlin 包装器,它还为响应式编程提供了大量有用的扩展功能。

在本文中,我们将专注于使用 RxJava,因为理解这种方法的核心概念至关重要。但是,您将学到的所有内容也适用于 RxKotlin。

注意:请特别查看 build.gradle文件和项目依赖项。除了 UI 库,它包含 RxKotlinRxAndroid包。我们不需要在 RxJava这里明确指定,因为 RxKotlin已经包含它。

RxJava 可观察合约

RxJava 使用了观察者模式。

注意:要刷新您对观察者模式的记忆,您可以访问 Common Design Patterns for Android with Kotlin

在观察者模式中,您有实现两个关键 RxJava 接口的对象:ObservableObserver. 当一个Observable状态改变时,所有Observer订阅它的对象都会收到通知。

Observable接口中的方法之一是subscribe()Observer将调用它来开始订阅。

Observer接口具有三个Observable根据需要调用的方法:

  • onNext(T value)为 .提供一个类型为 T 的新项目Observer
  • onComplete()通知ObserverObservable完成发送项目。
  • onError(Throwable e)通知遇到错误ObserverObservable

通常,行为良好的Observable项目会发出零个或多个项目,这些项目可能会跟随完成或错误。

71d7d6441093338a2012f78801298573.png

圆圈表示已从 observable 发出的项目,黑色块表示完成或错误。举个例子,一个可观察的网络请求。该请求通常会发出单个项目(响应)并立即完成。

可观察到的鼠标移动会发出鼠标坐标,但永远不会完成:

da7a6539b2822ea6e8965dfa2a0d75b7.png

在这里,您可以看到已发出多个项目,但没有显示鼠标已完成或引发错误的块。

在 observable 完成后,不能再发出任何项目。这是一个违反observable行为不 示例:

e5bee3d6767005558f975cffbf1ee451.png

这是一个非常糟糕的 observable,因为它违反了 Observable 合约,因为它在发出完成信号后发出一个项目。

如何创建一个 Observable

有许多库可以帮助您从几乎任何类型的事件中创建 observables。但是,有时您只需要自己动手。此外,这是学习Observable 模式和响应式编程的好方法!

您将使用Observable.create()

Observable<T> create(ObservableOnSubscribe<T> source)

这很好,很简洁,但这是什么意思?source是什么?要了解该签名,您需要知道 ObservableOnSubscribe是什么。这是一个接口:

  1. public interface ObservableOnSubscribe<T> {
  2. void subscribe(ObservableEmitter<T> e) throws Exception;
  3. }

您需要创建的source Observable将需要实现subscribe(),这就需要一个提供一个emitter参数去实现。那么,什么是emitter?

RxJava 的Emitter接口类似于Observer

  1. public interface Emitter<T> {
  2. void onNext(T value);
  3. void onError(Throwable error);
  4. void onComplete();
  5. }

ObservableEmitter还提供了一种取消订阅的方法。

为想象一个调节水流的水龙头。水管就像一根水管Observable,如果你有办法接入它,它愿意输送水流。您构建一个可以打开和关闭的水龙头,就像一个ObservableEmitter,并将它连接到 中的水管Observable.create()。水龙头是反应式的,因为一旦你关闭它,水流——数据——就不再活跃。

是时候创建你的第一个 observable 了!

观察按钮点击

Activity类中添加以下代码:

  1. // 1
  2. private fun createButtonClickObservable(): Observable<String> {
  3. // 2
  4. return Observable.create { emitter ->
  5. // 3
  6. searchButton.setOnClickListener {
  7. // 4
  8. emitter.onNext(queryEditText.text.toString())
  9. }
  10. // 5
  11. emitter.setCancellable {
  12. // 6
  13. searchButton.setOnClickListener(null)
  14. }
  15. }
  16. }

输入上述代码后,您的导入应如下所示:

import io.reactivex.Observable

您已经导入了正确的Observable类,并且您正在使用Kotlin Android 扩展来获取对视图对象的引用。

这是上面代码中发生的事情:

  1. 您声明一个函数,该函数返回一个将发出字符串的可观察对象。
  2. 你用 来创建一个 observable Observable.create(),并为它提供一个新的ObservableOnSubscribe.
  3. searchButton设置一个OnClickListener
  4. 当点击事件发生时,调用onNext发射器并将当前文本值传递给它queryEditText
  5. 保留引用可能会导致 Java 或 Kotlin 中的内存泄漏。一旦不再需要侦听器,就将其删除是一个有用的习惯。但是,当您创建自己的 时,您会怎么称呼Observable?正因为如此,ObservableEmittersetCancellable()。Override cancel(),当 Observable 被释放时,你的实现将被调用,例如当 Observable 完成或所有 Observer 都取消订阅它时。
  6. 对于OnClickListener,删除侦听器的代码是setOnClickListener(null)

现在你已经定义了你的 Observable,你需要设置它的订阅。在此之前,,Consumer. 这是一种接受来自发射器的值的简单方法。

  1. public interface Consumer<T> {
  2. void accept(T t) throws Exception;
  3. }

当你想设置一个对 Observable 的简单订阅时,这个接口很方便。

Observable接口需要多个版本的subscribe(),都具有不同的参数。例如,Observer如果你愿意,你可以传递一个完整的,但是你需要实现所有必要的方法。

如果您的订阅只需要观察者响应发送到的值onNext(),您可以使用subscribe()接收单个的版本Consumer(参数甚至命名为onNext,以使连接清晰)。

当您订阅活动的onStart(). 将以下代码添加到Activity.kt

  1. override fun onStart() {
  2. super.onStart()
  3. // 1
  4. val searchTextObservable = createButtonClickObservable()
  5. searchTextObservable
  6. // 2
  7. .subscribe { query ->
  8. // 3
  9. showResult(cheeseSearchEngine.search(query))
  10. }
  11. }

以下是每个步骤的说明:

  1. 首先,通过调用刚刚编写的方法创建一个 observable。
  2. 订阅 observable subscribe(),并提供一个简单的Consumer.
  3. 最后,执行搜索并显示结果。

构建并运行应用程序。输入一些字母,然后点击搜索按钮。在模拟延迟之后,您应该会看到与您的请求匹配的数据列表:

cf97d8ba0fdaf08e65d0bb917dec6741.png

听起来很好吃!:]

RxJava 线程模型

您已经第一次体验了响应式编程。但是有一个问题:当点击搜索按钮时,UI 会冻结几秒钟。

您可能还会注意到 Android Monitor 中的以下行:

> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames!  The application may be doing too much work on its main thread.

发生这种情况是因为search在主线程上执行。如果search要执行网络请求,Android 将导致应用程序崩溃并出现NetworkOnMainThreadException异常。是时候解决这个问题了。

关于 RxJava 的一个流行神话是它默认是多线程的,类似于AsyncTask. 但是,如果没有另外指定,RxJava 会在调用它的同一线程中完成所有工作。

subscribeOn您可以使用and运算符更改此行为observeOn

subscribeOn应该在运算符链中只调用一次。如果不是,则第一个调用获胜。subscribeOn指定订阅(即创建)observable 的线程。如果您使用从 Android 视图发出事件的可观察对象,则需要确保在 Android UI 线程上完成订阅。

另一方面,可以observeOn在链中调用任意多次。observeOn指定将执行链中下一个运算符的线程。例如:

  1. myObservable // observable will be subscribed on i/o thread
  2. .subscribeOn(Schedulers.io())
  3. .observeOn(AndroidSchedulers.mainThread())
  4. .map { /* this will be called on main thread... */ }
  5. .doOnNext{ /* ...and everything below until next observeOn */ }
  6. .observeOn(Schedulers.io())
  7. .subscribe { /* this will be called on i/o thread */ }

最有用的调度程序是:

  • Schedulers.io(): 适用于网络请求或磁盘操作等 I/O 密集型工作。
  • Schedulers.computation():最适合计算任务,如事件循环和处理回调。
  • AndroidSchedulers.mainThread()在 UI 线程上执行下一个运算符。

Map操作符

运算符将map一个函数应用于一个可观察对象发出的每个项目,并返回另一个发出这些函数调用结果的可观察对象。您还需要它来解决线程问题。

如果您有一个可观察的调用numbers,它发出以下内容:

480c7ef4464ab3e58d903bdae539a884.png

如果你申请map如下:

numbers.map { number -> number * number }

结果如下:

ff08271bc05e198bd99a1c2a5046d349.png

这是用很少的代码迭代多个项目的便捷方式。让我们使用它!

onStart()在类中进行修改,Activity如下所示:

  1. override fun onStart() {
  2. super.onStart()
  3. val searchTextObservable = createButtonClickObservable()
  4. searchTextObservable
  5. // 1
  6. .subscribeOn(AndroidSchedulers.mainThread())
  7. // 2
  8. .observeOn(Schedulers.io())
  9. // 3
  10. .map { searchEngine.search(it) }
  11. // 4
  12. .observeOn(AndroidSchedulers.mainThread())
  13. .subscribe {
  14. showResult(it)
  15. }
  16. }

浏览上面的代码:

  1. 首先,指定链下的代码应该从主线程而不是 I/O 线程开始。在 Android 中,所有适用的代码都View应该在主线程上执行。
  2. 指定应在 I/O 线程上调用 next 运算符。
  3. 对于每个搜索查询,您会返回一个结果列表。
  4. 最后,确保将结果传递到主线程上的列表。

构建并运行您的项目。现在,即使在搜索过程中,UI 也应该是响应式的。

使用 doOnNext 显示进度条

是时候显示进度条了!

为此,您需要一个doOnNext操作员。doOnNext接受 aConsumer并允许您在每次由 observable 发出项目时执行某些操作。

在同一个Activity类中修改onStart()如下:

  1. override fun onStart() {
  2. super.onStart()
  3. val searchTextObservable = createButtonClickObservable()
  4. searchTextObservable
  5. // 1
  6. .observeOn(AndroidSchedulers.mainThread())
  7. // 2
  8. .doOnNext { showProgress() }
  9. .observeOn(Schedulers.io())
  10. .map { searchEngine.search(it) }
  11. .observeOn(AndroidSchedulers.mainThread())
  12. .subscribe {
  13. // 3
  14. hideProgress()
  15. showResult(it)
  16. }
  17. }

依次获取每个编号的评论:

  1. 确保链中的下一个运算符将在主线程上运行。
  2. 添加doOnNext运算符,以便showProgress()每次发出新项目时都会调用它。
  3. hideProgress()当您即将显示结果时,不要忘记调用。

构建并运行您的项目。当您开始搜索时,您应该会看到进度条:

a911823e027d390b69044c20ee77d3e3.png

观察文本变化

如果你想在用户输入一些文本时自动执行搜索,就像谷歌一样?

首先,您需要订阅TextView文本更改。将以下函数添加到Activity类中:

  1. // 1
  2. private fun createTextChangeObservable(): Observable<String> {
  3. // 2
  4. val textChangeObservable = Observable.create<String> { emitter ->
  5. // 3
  6. val textWatcher = object : TextWatcher {
  7. override fun afterTextChanged(s: Editable?) = Unit
  8. override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) = Unit
  9. // 4
  10. override fun onTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
  11. s?.toString()?.let { emitter.onNext(it) }
  12. }
  13. }
  14. // 5
  15. queryEditText.addTextChangedListener(textWatcher)
  16. // 6
  17. emitter.setCancellable {
  18. queryEditText.removeTextChangedListener(textWatcher)
  19. }
  20. }
  21. // 7
  22. return textChangeObservable
  23. }

以下是上述每个步骤的逐个播放:

  1. 声明一个函数,该函数将返回一个可观察到的文本更改。
  2. Create textChangeObservablewith create(),它需要一个ObservableOnSubscribe.
  3. 当观察者进行订阅时,首先要做的是创建一个TextWatcher.
  4. beforeTextChanged()您对和不感兴趣afterTextChanged()。当用户键入并onTextChanged()触发时,您将新的文本值传递给观察者。
  5. TextView通过调用将观察者添加到您的addTextChangedListener()
  6. 不要忘记删除你的观察者。为此,请调用emitter.setCancellable()并覆盖cancel()调用removeTextChangedListener()
  7. 最后,返回创建的 observable。

要查看这个 observable 的实际效果,请将searchTextObservablein onStart()of的声明替换Activity如下:

val searchTextObservable = createTextChangeObservable()

构建并运行您的应用程序。当您开始在以下位置输入文本时,您应该会看到搜索开始TextView

ccd810eed30de59719ba88aeaf9742a1.png

按长度过滤查询

搜索像单个字母一样短的查询是没有意义的。为了解决这个问题,让我们介绍一下强大的filter运算符。

filter仅通过那些满足特定条件的项目。filter接受 a Predicate,这是一个接口,它定义了给定类型的输入需要通过的测试,并带有boolean结果。在这种情况下,Predicate 接受 aStringtrue在字符串的长度是两个或多个字符时返回。

替换return textChangeObservablecreateTextChangeObservable()以下代码:

return textChangeObservable.filter { it.length >= 2 }

一切都将完全一样,除了length小于的文本查询2不会被发送到链中。

运行应用程序;只有当您输入第二个字符时,您才会看到搜索开始:

6c501af04642a7e026d38209b5985c57.png

 

Debounce

您不希望每次查询更改一个符号时都向服务器发送新请求。

debounce是显示反应范式真正威力的运算符之一。很像filter运算符 ,debounce过滤可观察对象发出的项目。但是是否应该过滤掉该项目的决定不是基于该项目是什么,而是基于该项目的发出时间。

debounce在另一个项目的每个项目发射后等待指定的时间量。如果在此等待期间碰巧没有发射任何项目,则最终发射最后一个项目:

50a35ce271d13d904e1ea5862cef5652.png

在中,在 的下方createTextChangeObservable()添加运算符,使语句类似于以下代码:debouncefilterreturn

  1. return textChangeObservable
  2. .filter { it.length >= 2 }
  3. .debounce(1000, TimeUnit.MILLISECONDS) // add this line

运行应用程序。您会注意到只有在您停止快速更改时才开始搜索:

d88a055c1ae5f1f3574589d80924ac9f.gif

debounce在发出最新的查询文本之前等待 1000 毫秒。

Merge运算符

您首先创建了一个响应按钮点击的可观察对象,然后实现了一个响应文本字段更改的可观察对象。但你对两者有何反应?

有很多运算符可以组合 observables。最简单和有用的一种是merge.

merge从两个或多个可观察对象中获取项目并将它们放入单个可观察对象中:

63451b94df17b853fda6ef3516558190.png

将开头更改为onStart()以下内容:

  1. val buttonClickStream = createButtonClickObservable()
  2. val textChangeStream = createTextChangeObservable()
  3. val searchTextObservable = Observable.merge<String>(buttonClickStream, textChangeStream)

运行您的应用程序。使用文本字段和搜索按钮;当您完成输入两个或更多符号或只需按下“搜索”按钮时,搜索将开始。

Flowable

随着 RxJava2 的发布,该框架已经从头开始重新设计,以解决原始库中未解决的一些问题。

Flowable是一个概念,即 observable 发出项目的速度比消费者处理它们的速度要快。如果您使用 observables,它会缓冲项目直到没有更多可用内存,您的应用程序将崩溃并且使用它们将无法使用 firehose API。Flowables 考虑到了这一点,并让您指定 BackPressureStrategy来告诉 flowable 您希望消费者如何处理发出的比消费更快的项目。

背压策略:

  • BUFFER – 处理项目的方式与 RxJava 1 相同,但您也可以添加缓冲区大小。
  • DROP – 丢弃消费者无法处理的任何物品。
  • ERROR – 当下游无法跟上时抛出错误。
  • LATEST – 仅保留 onNext 发出的最新项目,覆盖先前的值。
  • MISSING – 在 onNext 事件期间没有缓冲或丢弃。

将 Observables 变成 Flowables

是时候使用这种策略的新知识将上面的 observables 变成 flowables 了。首先考虑您添加到应用程序中的 observables。您有一个 observable 在单击按钮时发出项目,另一个来自键盘输入。考虑到这两个,您可以想象在第一种情况下您可以使用 LATEST 策略,而在第二种情况下您可以使用 BUFFER。

打开Activity.kt并将您的 observables 修改为以下内容:

  1. val buttonClickStream = createButtonClickObservable()
  2. .toFlowable(BackpressureStrategy.LATEST) // 1
  3. val textChangeStream = createTextChangeObservable()
  4. .toFlowable(BackpressureStrategy.BUFFER) // 2
  1. 使用 LATEST BackpressureStrategy 将按钮点击流转换为可流动的。
  2. 使用 BUFFER BackpressureStrategy 将文本输入更改流转换为可流动的。

最后,将合并运算符更改为也使用Flowable

val searchTextFlowable = Flowable.merge<String>(buttonClickStream, textChangeStream)

现在,更改调用以使用新searchTextFlowable值,而不是之前的Observable

  1. searchTextFlowable
  2. // 1
  3. .observeOn(AndroidSchedulers.mainThread())
  4. // 2
  5. .doOnNext { showProgress() }
  6. .observeOn(Schedulers.io())
  7. .map { cheeseSearchEngine.search(it) }
  8. .observeOn(AndroidSchedulers.mainThread())
  9. .subscribe {
  10. // 3
  11. hideProgress()
  12. showResult(it)
  13. }

重新运行应用程序,您应该会看到一个没有任何可观察对象缺陷的工作应用程序。

Maybe

Maybe是一种发出单个值、无值或错误的计算。它们适用于诸如数据库更新和删除之类的事情。在这里,您将添加一个新功能,使用 Maybe 来收藏应用程序中的一种奶酪,并使用 Maybe 不发出任何值。

打开Adapter类,在 onBindView 中添加如下代码:

  1. // 1
  2. Maybe.create<Boolean> { emitter ->
  3. emitter.setCancellable {
  4. holder.itemView.imageFavorite.setOnClickListener(null)
  5. }
  6. holder.itemView.imageFavorite.setOnClickListener {
  7. emitter.onSuccess((it as CheckableImageView).isChecked) // 2
  8. }
  9. }.toFlowable().onBackpressureLatest() // 3
  10. .observeOn(Schedulers.io())
  11. .map { isChecked ->
  12. cheese.favorite = if (!isChecked) 1 else 0
  13. val database = CheeseDatabase.getInstance(holder.itemView.context).cheeseDao()
  14. database.favoriteCheese(cheese) // 4
  15. cheese.favorite // 5
  16. }
  17. .subscribeOn(AndroidSchedulers.mainThread())
  18. .subscribe {
  19. holder.itemView.imageFavorite.isChecked = it == 1 // 6
  20. }
  1. 从动作创建可能。
  2. 成功时发出检查状态。
  3. 将 Maybe 变成可流动的。
  4. 对 Cheeses 表执行更新。
  5. 返回操作的结果。
  6. 使用发射的结果将轮廓更改为填充的心。

09a757c21e4dc2021b96ceb1a3fc297f.png

注意:在删除操作的上下文中使用 Maybe 可能会更好,但例如在这里你可以收藏奶酪。

RxJava2 & Null

RxJava2 不再支持 Null。提供 null 将立即导致 NullPointerException 或下游信号。您可以在此处阅读有关此更改的所有信息。

RxJava 和 Activity/Fragment 生命周期

setCancellable还记得你设置的那些方法吗?在 observable 取消订阅之前,它们不会触发。

Observable.subscribe()调用返回一个DisposableDisposable是一个有两种方法的接口:

  1. public interface Disposable {
  2. void dispose(); // ends a subscription
  3. boolean isDisposed(); // returns true if resource is disposed (unsubscribed)
  4. }

将以下属性添加到Activity

private lateinit var disposable: Disposable

使用以下代码设置toonStart()的返回值(仅第一行更改):subscribe()disposable

  1. disposable = searchTextObservable // change this line
  2. .observeOn(AndroidSchedulers.mainThread())
  3. .doOnNext { showProgress() }
  4. .observeOn(Schedulers.io())
  5. .map { cheeseSearchEngine.search(it) }
  6. .observeOn(AndroidSchedulers.mainThread())
  7. .subscribe {
  8. hideProgress()
  9. showResult(it)
  10. }

由于您订阅了 observable onStart()onStop()因此将是取消订阅。

将以下代码添加到Activity.kt

  1. @Override
  2. override fun onStop() {
  3. super.onStop()
  4. if (!disposable.isDisposed) {
  5. disposable.dispose()
  6. }
  7. }

就是这样!构建并运行应用程序。您自己不会“观察”任何更改,但现在该应用程序成功地避免了 RxJava 内存泄漏。

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/251091
推荐阅读
相关标签
  

闽ICP备14008679号