赞
踩
上一篇研究了 Kotlin 协程。上一篇文章重点介绍了协程的一些基础知识,如协程上下文(CoroutineContext)、协程作用域(CoroutineScope)、协程构建器等。如承诺的那样,这是关于流(Flows)的后续文章。
可以异步计算的数据流被称为流(Flow)。Flow,像 LiveData 和 RxJava 流一样,允许你实现观察者模式:由一个对象(源)维护其依赖项列表,称为观察者(收集器),并在任何状态变化时自动通知它们。流使用挂起函数异步产生和消费值。
要创建流,首先你需要创建一个生产者:
- val randomFlow: Flow<Int> = flow {
- repeat(10) { it ->
- emit(it+1) // 向流中发射请求的结果
- delay(1000) // 挂起协程1秒
- }
- }
要收集流,首先你需要启动一个协程,因为流在底层是操作协程的。使用 collect
操作符来收集它发射的值。
- lifecycleScope.launch {
- viewModel.uiStateFlow.collect { it ->
- binding.uiText.text = it.toString()
- }
- }
有两种不同类型的流:
它不会开始产生值,直到有人开始收集它们。它只能有一个订阅者,并且不存储数据。 // 常规流示例
- val coldFlow = flow {
- emit(0)
- emit(1)
- emit(2)
- }
-
- launch { // 第一次调用 collect
- coldFlow.collect { value ->
- println("cold flow collector 1 received: $value")
- }
-
- delay(2500)
-
- // 第二次调用 collect
- coldFlow.collect { value ->
- println("cold flow collector 2 received: $value")
- }
- }
// 结果 // 两个收集器都会从开始获取所有值。 // 对于两个收集器,相应的流都从开始启动。
- flow collector 1 received: [0, 1, 2]
- flow collector 1 received: [0, 1, 2]
即使没有人收集它们,它也会产生值。它可以有多个订阅者,并且可以存储数据。 // SharedFlow 示例
- val sharedFlow = MutableSharedFlow<Int>()
-
- sharedFlow.emit(0)
- sharedFlow.emit(1)
- sharedFlow.emit(2)
- sharedFlow.emit(3)
- sharedFlow.emit(4)
-
- launch {
- sharedFlow.collect { value ->
- println("SharedFlow collector 1 received: $value")
- }
-
- delay(2500)
-
- // 第二次调用 collect
- sharedFlow.collect { value ->
- println("SharedFlow collector 2 received: $value")
- }
- }
// 结果 // 收集器将从它们开始收集的地方获取值。 // 这里,第1个收集器获取了所有值。但第2个收集器只获取了在2500毫秒后发射的值,因为它在2500毫秒后开始收集。
- SharedFlow collector 1 received: [0,1,2,3,4]
- SharedFlow collector 2 received: [2,3,4]
我们可以使用 stateIn()
和 shareIn()
操作符分别将任何冷流转换为热流。
StateFlow — StateFlow 是一个热流,代表一次只持有一个值的状态。它也是一个合流,意味着当新值被发射时,最近值被保留并立即发射给新的收集器。当你需要为状态维护一个单一的真实来源并自动用最新状态更新所有收集器时,它很有用。它始终有一个初始值,并且只存储最近发射的值。
- class HomeViewModel : ViewModel() {
-
- private val _textStateFlow = MutableStateFlow("Hello World")
- val stateFlow =_textStateFlow.asStateFlow()
-
- fun triggerStateFlow(){
- _textStateFlow.value="State flow"
- }
- }
-
- // 在 Activity/Fragment 中收集 StateFlow
- class HomeFragment : Fragment() {
- private val viewModel: HomeViewModel by viewModels()
-
- override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
- super.onViewCreated(view, savedInstanceState)
-
- lifecycleScope.launchWhenStarted {
-
- // 触发流并开始监听值
-
- // collectLatest() 是 Kotlin 的 Flow API 中的一个高阶函数
- // 它允许你从流中收集发射的值,并仅对最近值执行转换。它类似于
- // collect(), 后者用于收集所有发射的值,但 collectLatest 只处理最新发射的值并
- // 忽略尚未处理的任何先前值。
- viewModel.stateFlow.collectLatest {
- binding.stateFlowButton.text = it
- }
- }
- }
-
- // 在 Compose 中收集 StateFlow
- @Compose
- fun HomeScreen() {
- // Compose 提供了 collectAsStateWithLifecycle 函数,它
- // 从流中收集值,并提供最新值以供使用
- // 需要的地方。当新的流值被发射时,我们得到更新的
- // 值,并且重新组合以更新值的状态。
- // 它默认使用 LifeCycle.State.Started 在生命周期处于指定状态时开始收集值,并在其下降时停止。
- val someFlow by viewModel.flow.collectAsStateWithLifecycle()
-
- }
SharedFlow — SharedFlow 是一个热流,可以有多个收集器。它可以独立于收集器发射值,多个收集器可以从流中收集相同的值。当你需要向多个收集器广播值或当你想要有多个订阅者订阅相同的数据流时,它很有用。它没有初始值,你可以配置其重放缓存来为新收集器存储一定数量的先前发射的值。
- class HomeViewModel : ViewModel() {
- private val _events = MutableSharedFlow<Event>() // 私有可变共享流
- val events = _events.asSharedFlow() // 公开暴露为只读共享流
-
- suspend fun produceEvent(event: Event) {
- _events.emit(event) // 直到所有订阅者接收到它才挂起
- }
- }
-
- // 在 Activity/Fragment 中收集 StateFlow
- class HomeFragment : Fragment() {
- private val viewModel: HomeViewModel by viewModels()
-
- override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
- super.onViewCreated(view, savedInstanceState)
-
- lifecycleScope.launchWhenStarted {
-
- // 触发流并开始监听值
-
- // collectLatest() 是 Kotlin 的 Flow API 中的一个高阶函数
- // 它允许你从流中收集发射的值,并仅对最近值执行转换。它类似于
- // collect(), 后者用于收集所有发射的值,但 collectLatest 只处理最新发射的值并
- // 忽略尚未处理的任何先前值。
- viewModel.events.collectLatest {
- binding.eventFlowButton.text = it
- }
- }
- }
-
- // 在 Compose 中收集 StateFlow
- @Compose
- fun HomeScreen() {
- // Compose 提供了 collectAsStateWithLifecycle 函数,它
- // 从流中收集值,并提供最新值以供使用
- // 需要的地方。当新的流值被发射时,我们得到更新的
- // 值,并且重新组合以更新值的状态。
- // 它默认使用 LifeCycle.State.Started 在生命周期处于指定状态时开始收集值,并在其下降时停止。
- val someFlow by viewModel.events.collectAsStateWithLifecycle()
-
- }
Kotlin 流提供了几种处理异常和错误的机制。
try-catch 块 — 处理异常的基本方法之一是在流中使用 try-catch 块。
- flow {
- try {
- emit(productsService.fetchProducts())
- } catch (e: Exception) {
- emitError(e)
- }
- }
catch 操作符 — Flow 中的 catch 操作符允许你通过将错误处理逻辑封装在一个地方来处理异常。
- flow {
- emit(productsService.fetchProducts())
- }.catch { e ->
- emitError(e)
- }
onCompletion 操作符 — 用于在流完成后执行代码,无论是成功完成还是异常完成。
- flow {
- emit(productsService.fetchProducts())
- }.onCompletion { cause ->
- if (cause != null) {
- emitError(cause)
- }
- }
自定义错误处理 — 在 Android 的复杂场景中,我们可以创建自定义操作符或扩展函数,以适合我们应用程序的方式处理错误。
- fun <T> Flow<T>.sampleErrorHandler(): Flow<Result<T>> = transform { value ->
- try {
- emit(Result.Success(value))
- } catch (e: Exception) {
- emit(Result.Error(e))
- }
- }
LiveData 是生命周期感知的,这意味着它自动管理观察者的生命周期,确保仅在观察者处于活动状态时才传递更新。而流默认不是生命周期感知的。我们可以使用 Compose 中的 collectLatest() 或 collectAsStateWithLifecycle() 函数来从流中收集结果。
流提供更多的灵活性,适合更复杂的异步数据操作,而 LiveData 通常用于更简单的 UI 更新。
流提供内置的背压支持,允许控制数据发射和处理的速率,而 LiveData 不支持背压处理。
流提供丰富的操作符集合,用于顺序和结构化处理,而 LiveData 专注于将最新数据传递给观察者。
感谢阅读!如果你学到了新东西,请关注我获取更多
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。