赞
踩
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
既然我们已经知道背压问题是如何产生的,就要去尝试正确地处理它,大致解决方案策略在于,如果你有一个流,你需要一个缓冲区,以防数据产生的速度快于消耗的速度,所以往往就会针对这个背压策略进行些讨论
对于以上问题,通过学习Flow
里的背压策略,相信可以很快就知道答案了
Flow
的背压机制由于Flow
是基于协程中使用的,它不需要一些巧妙设计的解决方案来明确处理背压,在Flow
中,不同于一些传统的响应式框架,它的背压管理是使用Kotlin
挂起函数suspend
实现的,看下源码你会发现,它里面所有的函数方法都是使用suspend
修饰符标记,这个修饰符就是为了暂停调度者的执行不阻塞线程。因此,Flow<T>
在同一个协程中发射和收集时,如果收集器跟不上数据流,它可以简单地暂停元素的发射,直到它准备好接收更多。看到这,是不是觉得有点难懂…
简单举个例子,假设我们拥有一个烤箱,可以用来烤面包,由于烤箱容量的限制,一次只能烤4个面包,如果你试着一次烤8个面包,会大大加大烤箱的承载负荷,这已经远远超过了它的内存使用量,很有可能会因此烧掉你的面包。
回顾下之前所说的,当我们消耗的速度比生产的速度慢的时候,就会产生背压,下面用代码来模拟下这个过程
fun currentTime() = System.currentTimeMillis()
fun threadName() = Thread.currentThread().name
var start: Long = 0
fun createEmitter(): Flow<Int> =
(1..5)
.asFlow()
.onStart { start = currentTime() }
.onEach {
delay(1000L)
print("Emit $it (${currentTime() - start}ms) ")
}
fun main() {
runBlocking {
val time = measureTimeMillis {
createEmitter().collect {
print("\nCollect $it starts ${start - currentTime()}ms")
delay(3000L)
println(" Collect $it ends ${currentTime() - start}ms")
}
}
print("\nCollected in $time ms")
}
}
看下输出结果,如下图所示
这样整个过程下来,大概需要20多秒才能结束,这里我们模拟了接收元素比发送元素慢的情况,因此就需要一个背压机制,而这正是Flow本质中的,它并不需要另外的设计来解决背压
buffer
进行缓存收集为了使缓冲和背压处理正常工作,我们需要在单独的协程中运行收集器。这就是.buffer()
操作符进来的地方,它是将所有发出的项目发送Channel
到在单独的协程中运行的收集器。
public fun <T> Flow<T>.buffer(
capacity: Int = BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>
它还为我们提供了缓冲功能,我们可以指定capacity
我们的缓冲区和处理策略onBufferOverflow
,所以当Buffer
溢出的时候,它为我们提供了三个选项
enum BufferOverflow {
SUSPEND,
DROP_OLDEST,
DROP_LATEST
}
SUSPEND
:会将当前协程挂起,直到缓冲区中的数据被消费了DROP_OLDEST
:它会丢弃最老的数据DROP_LATEST
: 它会丢弃最新的数据好的,我们回到上文所展示的模拟示例,这时候我们可以加入缓冲收集buffer
,不指定任何参数,这样默认就是使用SUSPEND
,它会将当前协程进行挂起
此时当收集器繁忙的时候,程序就开始缓冲,并在第一次收集方法调用结束的时候,两次发射后再次开始收集,此时流程的耗时时长缩短到大约16秒就可以执行完毕,如下图所示输出结果
conflate
解决conflate
操作符于Channel
中的Conflate
模式是一直的,新数据会直接覆盖掉旧数据,它不设缓冲区,也就是缓冲区大小为 0,丢弃旧数据,也就是采取 DROP_OLDEST
策略,那么不就等于buffer(0,BufferOverflow.DROP_OLDEST)
,可以看下它的源码可以佐证我们的判断
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
在某些情况下,由于根本原因是解决生产消费速率不匹配的问题,我们需要做一些取舍的操作,conflate
将丢弃掉旧数据,只有在收集器空闲之前发出的最后一个元素才被收集,将上文的模拟实例改为conflate
执行,你会发现我们直接丢弃掉了2和4,或者说新的数据直接覆盖掉了它们,整个流程只需要10秒左右就执行完成了
collectLatest
解决通过官方介绍,我们知道collectLatest
作用在于当原始流发出一个新的值的时候,前一个值的处理将被取消,也就是不会被接收, 和conflate
的区别在于它不会用新的数据覆盖,而是每一个都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消
深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上鸿蒙开发知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
课程,涵盖了95%以上鸿蒙开发知识点,真正体系化!**
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。