赞
踩
阅读过实战系列三的文章的同学可能有印象,我有提到过Kotlin Flow和Room数据库能很好的配合使用。此外,不久前官方推出了DataStore也是基于Kotlin Flow,所以本节我们先来了解下Kotlin Flow的使用方法。
Kotlin的官方解释引入Flow的场景是:
Return multiple asynchronously computed values over time
这句话很重要,它包括几个关键含义:
- 返回值有多个
- 返回值是分开的
- 返回的是异步结果
接下来我们看看单个含义在Kotlin中如何实现:
返回多个值我们能很容易的想到使用Collections。例如一个
返回List
的listExample
的函数。
fun listExample(): List<Int> {
return listOf(1, 2, 3)
}
listExample().forEach { value -> println(value) }
// 1, 2, 3
上面的情况多个值是作为一个整体一起返回的,如果让多个值分开返回就得使用Kotlin的另外一个数据类型Sequences。
fun sequenceExample(): Sequence<Int> { return sequence { Thread.sleep(1000) // 假装在执行耗时操作 yield(1) // 返回值 Thread.sleep(1000) yield(2) Thread.sleep(1000) yield(3) } } sequenceExample().forEach { value -> println(value) println(Date().time) } // 结果: I/System.out: 1 I/System.out: 1605153441181 I/System.out: 2 I/System.out: 1605153442182 I/System.out: 3 I/System.out: 1605153443185
结果可以看出,通过使用sequence可以实现多个结果分批次单个数值的返回。
异步返回可以使用susupend函数。
suspend fun simpleAsyn(): List<Int> {
delay(1000) // 假装在执行异步耗时操作
return listOf(1, 2, 3)
}
fun simpleFlow(): Flow<Int> {
return flow {
delay(1000) // 假装在执行异步耗时操作
emit(1) // 返回值
delay(1000)
emit(2)
delay(1000)
emit(3)
}
}
simpleFlow().collect { value -> println(value) }
// 1, 2, 3
结论:Flow可以简单的理解为异步的Sequence。
Flow可以理解成异步的Sequence,也具有和Sequence类似的一些特性。
我们的代码如下,打印结果出来可能会比较费解。
val streamOfInts: Flow<Int> = flow { println("开始发送===>") for (i in 1..3) { delay(100) println("发送初始值===>$i") emit(i) } println("结束发送===>") } streamOfInts.map { value -> value * value } println("Flow完成===>") // 打印结果: println("Flow完成===>")
为什么呢? 因为Flow是惰性的,streamOfInts这个Flow没有执行结束操作函数,Flow中的各种中间操作函数不会立即执行
结束操作函数 — Flow执行函数后的返回结果不是Flow,那这个函数就是结束操作函数
中间操作函数 — Flow执行函数后的返回结果仍然是Flow
我们接下来修改代码,执行collect这个函数,它的意义是接收Flow传过来的值:
val streamOfInts: Flow<Int> = flow { println("开始发送===>") for (i in 1..3) { delay(100) println("发送初始值===>$i") emit(i) } println("结束发送===>") } streamOfInts.map { value -> value * value }.collect { println("最终收到 ===> $it") } // 打印结果: I/System.out: 开始发送===> I/System.out: 发送初始值===>1 I/System.out: 最终收到 ===> 1 I/System.out: 发送初始值===>2 I/System.out: 最终收到 ===> 4 I/System.out: 发送初始值===>3 I/System.out: 最终收到 ===> 9 I/System.out: 结束发送===> I/System.out: Flow完成===>
熟悉RxJava的同学是不是似曾相识的感觉,没错Flow就是冷信号,只有被订阅后才会发送值。
上面的结果我们能看出来,分别对每个值进行了所有的操作后才进行下一个值的处理,即先处理完1,再处理2,再处理3。
1 - 1 - 2 - 4 - 3 - 9
flowOf
函数flowOf(1, 2, 3)
// 1, 2, 3
flowOf(listOf(1,2,3))
// [1, 2, 3]
asFlow
函数listOf(1, 2, 3).asFlow()
// 1, 2, 3
asFlow
函数fun flowBuilderFunction(): Int {
return 10
}
::flowBuilderFunction.asFlow()
// 10
asFlow
函数suspend fun flowBuilderFunction(): Int {
return 10
}
::flowBuilderFunction.asFlow()
// 10
asFlow
函数arrayOf(1,2,3).asFlow()
// 1, 2, 3
asFlow
函数LongRange(1, 5).asFlow().collect { value -> println(value) }
// 1, 2, 3, 4, 5
emptyFlow<String>()
- 如果两个相邻的值生产出来的时间间隔超过了
[timeout]
毫秒,就忽过滤掉前一个值- 最后一个值不受影响,总是会被释放
emit
。
flow { emit(1) delay(3000) emit(2) delay(1000) emit(3) delay(1000) emit(4) }.debounce(2000) // 结果:1 4 // 解释: // 2和1的间隔大于2000,1被释放 // 3和2的间隔小于2000, 2被忽略 // 4和3的间隔小于2000, 3被忽略 // 4是最后一个值不受timeout值的影响, 4被释放
[timeout]
可以传毫秒,也可以传Duration
注意:还是个实验性的方法,使用的时候需要加上
@ExperimentalTime
flow {
emit(1)
delay(3000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.debounce(2000.milliseconds)
// 结果:1 4
- 如果生产的值和上个发送的值相同,值就会被过滤掉
flow { emit(1) emit(1) emit(2) emit(2) emit(3) emit(4) }.distinctUntilChanged() // 结果:1 2 3 4 // 解释: // 第一个1被释放 // 第二个1由于和第一个1相同,被过滤掉 // 第一个2被释放 // 第二个2由于和第一个2相同,被过滤掉 // 第一个3被释放 // 第一个4被释放
- 可以传参
(old: T, new: T) -> Boolean
,进行自定义的比较
private class Person(val age: Int, val name: String)
flow {
emit(Person(20, "张三"))
emit(Person(21, "李四"))
emit(Person(21, "王五"))
emit(Person(22, "赵六"))
}.distinctUntilChanged{old, new -> old.age == new.age }
.collect{ value -> println(value.name) }
// 结果:张三 李四 赵六
// 解释:本例子定义如果年龄相同就认为是相同的值,所以王五被过滤掉了
- 可以用
distinctUntilChangedBy
转换成年龄进行对比
flow {
emit(Person(20, "张三"))
emit(Person(21, "李四"))
emit(Person(21, "王五"))
emit(Person(22, "赵六"))
}.distinctUntilChangedBy { person -> person.age }
// 结果:张三 李四 赵六
对每个值进行转换
flow { emit(1) emit(2) emit(3) emit(4) }.transform { if (it % 2 == 0) { emit(it * it) } } // 结果:4 16 // 解释: // 1 不是偶数,被忽略 // 2 是偶数,2的平方4 // 3 不是偶数,被忽略 // 4 是偶数,4的平方16
第一个值被释放之前被执行
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.onStart { emit(1000) }
// 结果:1000 1 2 3 4
// 解释:
// 第一个值1被释放的时候调用了emit(1000), 所以1000在1之前被释放
最后一个值释放完成之后被执行
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.onCompletion { emit(1000) }
// 结果:1 2 3 4 1000
// 解释:
// 第一个值4被释放的时候调用了emit(1000), 所以1000在4之后被释放
忽略最开始的
[count]
个值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.drop(2)
// 结果:3 4
// 解释:
// 最开始释放的两个值(1,2)被忽略了
判断第一个值如果满足
(T) -> Boolean
这个条件就忽略
flow { emit(1) emit(2) emit(3) emit(4) }.dropWhile { it % 2 == 0 } // 结果:1 2 3 4 // 解释: // 第一个值不是偶数,所以1被释放 flow { emit(1) emit(2) emit(3) emit(4) }.dropWhile { it % 2 != 0 } // 结果:2 3 4 // 解释: // 第一个值是偶数,所以1被忽略
只释放前面
[count]
个值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.take(2)
// 结果:1 2
// 解释:
// 前面两个值被释放
判断第一个值如果满足
(T) -> Boolean
这个条件就释放
flow { emit(1) emit(2) emit(3) emit(4) }.takeWhile { it%2 != 0 } // 结果:1 // 解释: // 第一个值满足是奇数条件 flow { emit(1) emit(2) emit(3) emit(4) }.takeWhile { it%2 == 0 } // 结果:无 // 解释: // 第一个值不满足是奇数条件
可以切换CoroutineContext
说明:flowOn只影响该运算符之前的CoroutineContext,对它之后的CoroutineContext没有任何影响
withContext(Dispatchers.Main) { flow { println("Thread name1 = ${Thread.currentThread().name}") emit(1) emit(2) emit(3) emit(4) } .map { println("Thread name2 = ${Thread.currentThread().name}") it * it } .filter { println("Thread name3 = ${Thread.currentThread().name}") it > 9 } .flowOn(Dispatchers.IO) .collect{value -> println("Thread name4 = ${Thread.currentThread().name}") println(value) } } // 结果: // Thread name1 = DefaultDispatcher-worker-1 // Thread name2 = DefaultDispatcher-worker-1 // Thread name3 = DefaultDispatcher-worker-1 // Thread name2 = DefaultDispatcher-worker-1 // Thread name3 = DefaultDispatcher-worker-1 // Thread name2 = DefaultDispatcher-worker-1 // Thread name3 = DefaultDispatcher-worker-1 // Thread name2 = DefaultDispatcher-worker-1 // Thread name3 = DefaultDispatcher-worker-1 // Thread name4 = main // 16 // 解释: // flowOn(Dispatchers.IO)之前的flow,map,filter都是在Dispatchers.IO上执行 // flowOn(Dispatchers.IO)之后的collect则由withContext(Dispatchers.Main)决定
将flow的多个任务分配到不同的协程中去执行,加快执行的速度。
val flow1 = flow { delay(2000) // 假设耗时任务 emit(1) // 释放值 delay(2000) emit(2) delay(2000) emit(3) delay(2000) emit(4) } flow1.collect { value -> println(value) delay(4000) } // 结果 // 2020-11-16 13:48:37.144 24060-24116/com.johnny.flowdemo I/System.out: 1 // 2020-11-16 13:48:43.150 24060-24116/com.johnny.flowdemo I/System.out: 2 // 2020-11-16 13:48:49.160 24060-24116/com.johnny.flowdemo I/System.out: 3 // 2020-11-16 13:48:55.166 24060-24116/com.johnny.flowdemo I/System.out: 4 // 解释: // 4个耗时操作每个2000毫秒,加上collect的4000毫秒,所以每个值的时间间隔是6000毫秒。 val flow2 = flow { delay(2000) // 假设耗时任务 emit(1) // 释放值 delay(2000) emit(2) delay(2000) emit(3) delay(2000) emit(4) }.buffer() flow2.collect { value -> println(value) delay(4000) } // 结果 // 2020-11-16 13:51:11.290 24253-24299/com.johnny.flowdemo I/System.out: 1 // 2020-11-16 13:51:15.293 24253-24299/com.johnny.flowdemo I/System.out: 2 // 2020-11-16 13:51:19.297 24253-24300/com.johnny.flowdemo I/System.out: 3 // 2020-11-16 13:51:23.301 24253-24300/com.johnny.flowdemo I/System.out: 4 // 解释: // 4个耗时操作被分配到了不同的协程中执行,总共耗时了大约2000毫秒。collect收到的4个值差不多同时,所以每个值依次收到的时间间隔是4000毫秒。
如果值的生产速度大于值的消耗速度,就忽略掉中间未来得及处理的值,只处理最新的值。
val flow1 = flow { delay(2000) emit(1) delay(2000) emit(2) delay(2000) emit(3) delay(2000) emit(4) }.conflate() flow1.collect { value -> println(value) delay(5000) } // 结果: 1 3 4 // 解释: // 2000毫秒后生产了1这个值,交由collect执行,花费了5000毫秒,当1这个值执行collect完成后已经经过了7000毫秒。 // 这7000毫秒中,生产了2,但是collect还没执行完成又生产了3,所以7000毫秒以后会直接执行3的collect方法,忽略了2这个值 // collect执行完3后,还有一个4,继续执行。
将原始的
Flow<T>
通过[transform]
转换成Flow<Flow<T>>
,然后将Flow<Flow<T>>
释放的Flow<T>
其中释放的值一个个释放。
flow { delay(1000) emit(1) delay(1000) emit(2) delay(1000) emit(3) delay(1000) emit(4) }.flatMapConcat { flow { emit("$it 产生第一个flow值") delay(2500) emit("$it 产生第二个flow值") } }.collect { value -> println(value) } // 结果 // I/System.out: 1 产生第一个flow值 // I/System.out: 1 产生第二个flow值 // I/System.out: 2 产生第一个flow值 // I/System.out: 2 产生第二个flow值 // I/System.out: 3 产生第一个flow值 // I/System.out: 3 产生第二个flow值 // I/System.out: 4 产生第一个flow值 // I/System.out: 4 产生第二个flow值 // 解释: // 原始Flow<Int>通过flatMapConcat被转换成Flow<Flow<Int>> // 原始Flow<Int>首先释放1,接着Flow<Flow<Int>> 就会释放 1产生第一个flow值 和 1产生第二个flow值 两个值 // Flow<Int>释放2,... // Flow<Int>释放3,... // Flow<Int>释放4,...
和flatMapConcat类似,只是少了一步Map操作。
flow { delay(1000) emit(flow { emit("1 产生第一个flow值") delay(2000) emit("1 产生第二个flow值") }) delay(1000) emit(flow { emit("2 产生第一个flow值") delay(2000) emit("3 产生第二个flow值") }) delay(1000) emit(flow { emit("3 产生第一个flow值") delay(2000) emit("3 产生第二个flow值") }) delay(1000) emit(flow { emit("4 产生第一个flow值") delay(2500) emit("4 产生第二个flow值") }) }.flattenConcat() // 结果 // I/System.out: 1 产生第一个flow值 // I/System.out: 1 产生第二个flow值 // I/System.out: 2 产生第一个flow值 // I/System.out: 2 产生第二个flow值 // I/System.out: 3 产生第一个flow值 // I/System.out: 3 产生第二个flow值 // I/System.out: 4 产生第一个flow值 // I/System.out: 4 产生第二个flow值
将原始的
Flow<T>
通过[transform]
转换成Flow<Flow<T>>
,然后将Flow<Flow<T>>
释放的Flow<T>
其中释放的值一个个释放。它与flatMapConcat的区别是:
Flow<Flow<T>>
释放的Flow<T>
其中释放的值没有顺序性,谁先产生谁先释放。
flow { delay(1000) emit(1) delay(1000) emit(2) delay(1000) emit(3) delay(1000) emit(4) }.flatMapMerge { flow { emit("$it 产生第一个flow值") delay(2500) emit("$it 产生第二个flow值") } }.collect { value -> println(value) } // 结果 // I/System.out: 1 产生第一个flow值 // I/System.out: 2 产生第一个flow值 // I/System.out: 3 产生第一个flow值 // I/System.out: 1 产生第二个flow值 // I/System.out: 4 产生第一个flow值 // I/System.out: 2 产生第二个flow值 // I/System.out: 3 产生第二个flow值 // I/System.out: 4 产生第二个flow值 // 解释: // 原始Flow<Int>首先释放1, 第二个Flow<Flow<Int>> 释放 1产生第一个flow值,但是 1产生第二个flow值是3500毫秒才释放,2 产生第一个flow值 是2000毫秒释放, 3 产生第一个flow值 是3000毫秒释放,3500毫秒时刻才是 1产生第二个flow值 的释放
和 flattenMerge类似,只是少了一步Map操作。
将
Iterable<Flow<T>>
合并成一个Flow<T>
val flow1 = listOf( flow { emit(1) delay(500) emit(2) }, flow { emit(3) delay(500) emit(4) }, flow { emit(5) delay(500) emit(6) } ) flow1.merge().collect { value -> println("$value") } // 结果: 1 3 5 2 4 6 // 解释: // 按Iterable的顺序和耗时顺序依次释放值
原始flow会触发
transformLatest
转换后的flow, 当原始flow有新的值释放后,transformLatest
转换后的flow会被取消,接着触发新的转换后的flow
flow { emit(1) delay(1000) emit(2) delay(2000) emit(3) delay(3000) emit(4) }.transformLatest { value -> delay(2500) emit(value * value ) } // 结果: 9 16 // 解释: // 原始Flow释放1以后,转换后的Flow还没来得及释放1,原始Flow释放2 // 原始Flow释放2以后,转换后的Flow还没来得及释放4,原始Flow释放3 // 原始Flow释放3以后,转换后的Flow有足够的时间释放9 // 原始Flow释放4以后,转换后的Flow有足够的时间释放16
和transformLatest类似, 原始flow会触发
transformLatest
转换后的flow, 当原始flow有新的值释放后,transformLatest
转换后的flow会被取消,接着触发新的转换后的flow
区别:flatMapLatest的
transform
转换成的是Flow<T>
, transformLatest的transform
转换成的是Unit
flow { emit(1) delay(1000) emit(2) delay(2000) emit(3) delay(3000) emit(4) }.flatMapLatest { value -> flow { delay(2500) emit(value * value ) } } // 结果: 9 16 // 解释: // 原始Flow释放1以后,转换后的Flow还没来得及释放1,原始Flow释放2 // 原始Flow释放2以后,转换后的Flow还没来得及释放4,原始Flow释放3 // 原始Flow释放3以后,转换后的Flow有足够的时间释放9 // 原始Flow释放4以后,转换后的Flow有足够的时间释放16
和transformLatest类似, 原始flow会触发
transformLatest
转换后的flow, 当原始flow有新的值释放后,transformLatest
转换后的flow会被取消,接着触发新的转换后的flow
区别:mapLatest的
transform
转换成的是T
,flatMapLatest的transform
转换成的是Flow<T>
,transformLatest的transform
转换成的是Unit
通过
predicate
进行过滤,满足条件则被释放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.filter { it % 2 == 0 }
// 结果: 2 4
// 解释:
// 2和4满足it % 2 == 0,被释放
通过
predicate
进行过滤,不满足条件则被释放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.filterNot { it % 2 == 0 }
// 结果: 1 3
// 解释:
// 1和3不满足it % 2 == 0,被释放
如果是某个数据类型则被释放
flow {
emit(1)
emit("2")
emit("3")
emit(4)
}.filterIsInstance<String>()
// 结果: "2" "3"
// 解释:
// "2" "3"是String类型,被释放
如果数据是非空,则被释放
flow {
emit(1)
emit("2")
emit("3")
emit(null)
}.filterNotNull()
// 结果: 1 "2" "3"
将一个值转换成另外一个值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.map { it * it }
// 结果: 1 4 9 16
// 解释:
// 将1,2,3,4转换成对应的平方数
将一个非空值转换成另外一个值
将值封装成IndexedValue对象
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.withIndex()
// 结果:
// I/System.out: IndexedValue(index=0, value=1)
// I/System.out: IndexedValue(index=1, value=2)
// I/System.out: IndexedValue(index=2, value=3)
// I/System.out: IndexedValue(index=3, value=4)
每个值释放的时候可以执行的一段代码
flow { emit(1) emit(2) emit(3) emit(4) }.onEach { println("接收到$it") } // 结果: I/System.out: 接收到1 I/System.out: 1 I/System.out: 接收到2 I/System.out: 2 I/System.out: 接收到3 I/System.out: 3 I/System.out: 接收到4 I/System.out: 4
有一个初始值,然后每个值都和初始值进行运算,然后这个值作为后一个值的初始值
flow { emit(1) emit(2) emit(3) emit(4) }.scan(100) { acc, value -> acc * value } // 结果: 100 100 200 600 2400 // 解释: // 初始值 100 // 1 100 * 1 = 100 // 2 100 * 2 = 200 // 3 200 * 3 = 600 // 4 600 * 4 = 2400
和scan类似,但是没有初始值,最开始是它本身
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.runningReduce { acc, value ->
acc * value
}
// 结果: 1 2 6 24
// 解释:
// 1 1
// 2 1 * 2 = 2
// 3 2 * 3 = 6
// 4 6 * 4 = 24
任意一个flow释放值且都有释放值后会调用combine后的代码块,且值为每个flow的最新值。
val flow1 = flowOf(1, 2, 3, 4).onEach { delay(10) } val flow2 = flowOf("a", "b", "c", "d").onEach { delay(20) } flow1.combine(flow2) { first, second -> "$first$second" }.collect { println("$it") } // 结果:1a 2a 2b 3b 4b 4c 4d // 解释: // 开始 --- flow1 释放 1,flow2 释放 a, 释放1a // 10毫秒 --- flow1 释放 2,释放2a // 20毫秒 --- flow2 释放 b,此时释放2b // 30毫秒 --- flow1 释放 3,此时释放3b // 40毫秒 --- flow1 释放 4,此时释放4b // 40毫秒 --- flow2 释放 c,此时释放4c // 60毫秒 --- flow2 释放 d,此时释放4d
说明:Combine能够接受的参数类型非常多,但是效果都是如上类似。
接收值,一直有用,无需再做介绍。
scope.launch { flow.collect() }
的缩写, 代表在某个协程上下文环境中去接收释放的值
val flow1 = flow { delay(1000) emit(1) delay(1000) emit(2) delay(1000) emit(3) delay(1000) emit(4) } flow1.onEach { println("$it") } .launchIn(GlobalScope) // 结果:1 2 3 4
和
withIndex
对应的,接收封装的IndexedValue
val flow1 = flow { emit(1) emit(2) emit(3) emit(4) }.withIndex() flow1.collectIndexed { index, value -> println("index = $index, value = $value") } // 结果: // I/System.out: index = 0, value = IndexedValue(index=0, value=1) // I/System.out: index = 1, value = IndexedValue(index=1, value=2) // I/System.out: index = 2, value = IndexedValue(index=2, value=3) // I/System.out: index = 3, value = IndexedValue(index=3, value=4)
collectLatest与collect的区别是,如果有新的值释放,上一个值的操作如果没执行完则将会被取消
val flow1 = flow { emit(1) delay(1000) emit(2) delay(1000) emit(3) delay(2000) emit(4) } flow1.collectLatest { println("正在计算收到的值 $it") delay(1500) println("收到的值 $it") } // 结果: // I/System.out: 正在计算收到的值 1 // I/System.out: 正在计算收到的值 2 // I/System.out: 正在计算收到的值 3 // I/System.out: 收到的值 3 // I/System.out: 正在计算收到的值 4 // I/System.out: 收到的值 4 // 解释: // 1间隔1000毫秒后释放2,2间隔1000毫秒后释放3,这间隔小于需要接收的时间1500毫秒,所以当2和3 到来后,之前的操作被取消了。 // 3和4 之间的间隔够长能够等待执行完毕,4是最后一个值也能执行
将释放的值转换成List
flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.toList())
// 结果:[1, 2, 3, 4]
将释放的值转换成Set
flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.toSet())
// 结果:[1, 2, 3, 4]
- 计算释放值的个数
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.count())
// 结果:4
- 计算满足某一条件的释放值的个数
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.count { it % 2 == 0 })
// 结果:2
// 解释:
// 偶数有2个值 2 4
和runningReduce类似,但是只计算最后的结果。
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.reduce { acc, value -> acc * value })
// 结果:24
// 解释:计算最后的结果,1 * 2 * 3 * 4 = 24
和scan类似,有一个初始值,但是只计算最后的结果。
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.fold(100) { acc, value -> acc * value })
// 结果:2400
// 解释:计算最后的结果,100 * 1 * 2 * 3 * 4 = 2400
只接收一个值的Flow
注意:多于1个或者没有值都会报错
val flow1 = flow {
emit(1)
}
println(flow1.single())
// 结果:1
接收一个值的Flow或者一个空值的Flow
- 接收释放的第一个值/接收第一个值或者空值
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.first())
// 结果:1
- 接收第一个满足某个条件的值
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.first { it % 2 == 0})
// 结果:2
try { flow { emit(1) emit(2) emit(3) emit(4) }.collect { println("接收值 $it") check(it <= 1) { "$it 大于1" } } } catch (e: Throwable) { println("收到了异常: $e") } // 结果: // I/System.out: 接收值 1 // I/System.out: 接收值 2 // I/System.out: 收到了异常: java.lang.IllegalStateException: 2 大于1 // 解释: // 收到2的时候就抛出了异常,让后flow被取消,异常被捕获
catch
函数
catch
函数能够捕获之前产生的异常,之后的异常无法捕获。
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.map {
check(it <= 1) { "$it 大于1" }
it
}
.catch { e -> println("Caught $e") }
.collect()
// 结果:
// Caught java.lang.IllegalStateException: 2 大于1
Flow的取消可以执行CoroutineScope.cancel
GlobalScope.launch { val flow1 = flow { emit(1) emit(2) emit(3) emit(4) } flow1.collect { value -> println("$value") if (value >= 3) { cancel() } } } // 结果:1 2 3
我们了解了Flow是冷信号,类似于RxJava中的observables
。Kotlin还提供了一个类似于RxJava中的Subject
的StateFlow。MutableStateFlow是相当于值可变的StateFlow
它有如下几个特点:
例如:网络请求中(Requesting),网络请求完成(Request Complete),网络请求失败(Request Fail)。
distinctUntilChanged
类似collect
对于它不是必需的,StateFlow创建的时候就能开始释放值class CounterModel {
private val _counter = MutableStateFlow(0) // 私有使用MutableStateFlow
val counter = _counter.asStateFlow() // 对外公开只读的StateFlow
fun inc() {
_counter.value++ //更改值
}
}
val aModel = CounterModel()
val bModel = CounterModel()
val sumFlow: Flow<Int> = aModel.counter.combine(bModel.counter) { a, b -> a + b }
// 两个计时器的结果相加
本文主要讲解了Kotlin Flow
的基础知识,下一篇我们将来介绍Kotlin Flow
在UI界面编写,网络数据请求和数据库中是如何使用的。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。