赞
踩
Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。
当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。
Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。
- fun main() {
- var channel = Channel<Int>()
- CoroutineScope(Dispatchers.Default).launch { // 生产者
- repeat(3) {
- delay(10)
- println("send: $it")
- channel.send(it)
- }
- }
- CoroutineScope(Dispatchers.Default).launch { // 消费者
- repeat(3) {
- delay(100)
- var element = channel.receive()
- println("receive: $element")
- }
- }
- Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
- }
打印如下。
- send: 0
- receive: 0
- send: 1
- receive: 1
- send: 2
- receive: 2
说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。
- fun main() {
- var channel = Channel<Int>(2)
- CoroutineScope(Dispatchers.Default).launch { // 生产者
- repeat(3) {
- delay(10)
- println("send: $it")
- channel.send(it)
- }
- }
- CoroutineScope(Dispatchers.Default).launch { // 消费者
- repeat(3) {
- delay(100)
- var element = channel.receive()
- println("receive: $element")
- }
- }
- Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
- }
打印如下。
- send: 0
- send: 1
- send: 2
- receive: 0
- receive: 1
- receive: 2
说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。
- fun main() {
- var channel = Channel<Int>()
- CoroutineScope(Dispatchers.Default).launch { // 生产者
- repeat(3) {
- println("send: $it")
- channel.send(it)
- }
- }
- CoroutineScope(Dispatchers.Default).launch { // 消费者
- var iterator = channel.iterator()
- while (iterator.hasNext()) {
- var element = iterator.next()
- println("receive: $element")
- }
- }
- Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
- }
打印如下。
- send: 0
- send: 1
- receive: 0
- receive: 1
- send: 2
- receive: 2
- fun main() {
- var channel = Channel<Int>()
- CoroutineScope(Dispatchers.Default).launch { // 生产者
- repeat(3) {
- println("send: $it")
- channel.send(it)
- }
- }
- CoroutineScope(Dispatchers.Default).launch { // 消费者
- for (element in channel) {
- println("receive: $element")
- }
- }
- Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
- }
打印如下。
- send: 0
- receive: 0
- send: 1
- send: 2
- receive: 1
- receive: 2
produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannel;actor 函数用于构造一个消费者协程,并返回一个 SendChannel。
- fun main() {
- var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者
- repeat(3) {
- println("send: $it")
- send(it)
- }
- }
- CoroutineScope(Dispatchers.Default).launch { // 消费者
- for (element in receiveChannel) {
- println("receive: $element")
- }
- }
- Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
- }
打印如下。
- send: 0
- send: 1
- receive: 0
- receive: 1
- send: 2
- receive: 2
- fun main() {
- var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者
- repeat(3) {
- var element = receive()
- println("receive: $element")
- }
- }
- CoroutineScope(Dispatchers.Default).launch { // 消费者
- repeat(3) {
- println("send: $it")
- sendChannel.send(it)
- }
- }
- Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
- }
打印如下。
- send: 0
- send: 1
- receive: 0
- receive: 1
- send: 2
- receive: 2
对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。
- fun main() {
- var channel = Channel<Int>(3)
- CoroutineScope(Dispatchers.Default).launch { // 生产者
- repeat(3) {
- println("send: $it")
- channel.send(it)
- }
- channel.close()
- println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
- }
- CoroutineScope(Dispatchers.Default).launch { // 消费者
- repeat(3) {
- var element = channel.receive()
- println("receive: $element")
- }
- println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
- }
- Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
- }
打印如下。
- send: 0
- send: 1
- send: 2
- producter, isClosedForSend=true, isClosedForReceive=false
- receive: 0
- receive: 1
- receive: 2
- consumer, isClosedForSend=true, isClosedForReceive=true
Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。
- fun main() {
- var channel = Channel<Int>(2)
- CoroutineScope(Dispatchers.Default).launch { // 生产者
- delay(10)
- repeat(3) {
- println("send: $it")
- channel.send(it)
- }
- }
- repeat(2) { index ->
- CoroutineScope(Dispatchers.Default).launch { // 消费者
- for (element in channel) {
- println("receive-$index: $element")
- }
- }
- }
- Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
- }
打印如下。
- send: 0
- send: 1
- send: 2
- receive-0: 0
- receive-0: 2
- receive-1: 1
说明:结果表明,Channel 中同一个元素只会被一个消费者读到。
- fun main() {
- var broadcastChannel = BroadcastChannel<Int>(2)
- CoroutineScope(Dispatchers.Default).launch { // 生产者
- delay(10)
- repeat(3) {
- println("send: $it")
- broadcastChannel.send(it)
- }
- }
- repeat(2) { index ->
- CoroutineScope(Dispatchers.Default).launch { // 消费者
- var receiveChannel = broadcastChannel.openSubscription()
- for (element in receiveChannel) {
- println("receive-$index: $element")
- }
- }
- }
- Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
- }
打印如下。
- send: 0
- send: 1
- send: 2
- receive-0: 0
- receive-0: 1
- receive-0: 2
- receive-1: 0
- receive-1: 1
- receive-1: 2
说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。
- fun main() {
- var channel = Channel<Int>()
- var broadcastChannel = channel.broadcast(2)
- CoroutineScope(Dispatchers.Default).launch { // 生产者
- delay(10)
- repeat(3) {
- println("send: $it")
- broadcastChannel.send(it)
- }
- }
- repeat(2) { index ->
- CoroutineScope(Dispatchers.Default).launch { // 消费者
- var receiveChannel = broadcastChannel.openSubscription()
- for (element in receiveChannel) {
- println("receive-$index: $element")
- }
- }
- }
- Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
- }
打印如下。
- send: 0
- send: 1
- send: 2
- receive-1: 0
- receive-1: 1
- receive-1: 2
- receive-0: 0
- receive-0: 1
- receive-0: 2
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。