当前位置:   article > 正文

【Kotlin】Channel简介

【Kotlin】Channel简介

1 前言

        Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。

        当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。

        Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。

2 Channel 中 send 和 receive 案例

2.1 capacity 为 0

  1. fun main() {
  2. var channel = Channel<Int>()
  3. CoroutineScope(Dispatchers.Default).launch { // 生产者
  4. repeat(3) {
  5. delay(10)
  6. println("send: $it")
  7. channel.send(it)
  8. }
  9. }
  10. CoroutineScope(Dispatchers.Default).launch { // 消费者
  11. repeat(3) {
  12. delay(100)
  13. var element = channel.receive()
  14. println("receive: $element")
  15. }
  16. }
  17. Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
  18. }

        打印如下。

  1. send: 0
  2. receive: 0
  3. send: 1
  4. receive: 1
  5. send: 2
  6. receive: 2

        说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。

2.2 capacity 大于 0

  1. fun main() {
  2. var channel = Channel<Int>(2)
  3. CoroutineScope(Dispatchers.Default).launch { // 生产者
  4. repeat(3) {
  5. delay(10)
  6. println("send: $it")
  7. channel.send(it)
  8. }
  9. }
  10. CoroutineScope(Dispatchers.Default).launch { // 消费者
  11. repeat(3) {
  12. delay(100)
  13. var element = channel.receive()
  14. println("receive: $element")
  15. }
  16. }
  17. Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
  18. }

        打印如下。

  1. send: 0
  2. send: 1
  3. send: 2
  4. receive: 0
  5. receive: 1
  6. receive: 2

        说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。

3 Channel 中迭代器

3.1 iterator

  1. fun main() {
  2. var channel = Channel<Int>()
  3. CoroutineScope(Dispatchers.Default).launch { // 生产者
  4. repeat(3) {
  5. println("send: $it")
  6. channel.send(it)
  7. }
  8. }
  9. CoroutineScope(Dispatchers.Default).launch { // 消费者
  10. var iterator = channel.iterator()
  11. while (iterator.hasNext()) {
  12. var element = iterator.next()
  13. println("receive: $element")
  14. }
  15. }
  16. Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
  17. }

        打印如下。

  1. send: 0
  2. send: 1
  3. receive: 0
  4. receive: 1
  5. send: 2
  6. receive: 2

3.2 for in

  1. fun main() {
  2. var channel = Channel<Int>()
  3. CoroutineScope(Dispatchers.Default).launch { // 生产者
  4. repeat(3) {
  5. println("send: $it")
  6. channel.send(it)
  7. }
  8. }
  9. CoroutineScope(Dispatchers.Default).launch { // 消费者
  10. for (element in channel) {
  11. println("receive: $element")
  12. }
  13. }
  14. Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
  15. }

        打印如下。

  1. send: 0
  2. receive: 0
  3. send: 1
  4. send: 2
  5. receive: 1
  6. receive: 2

4 Channel 中 produce 和 actor

        produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannel;actor 函数用于构造一个消费者协程,并返回一个 SendChannel。

4.1 produce

  1. fun main() {
  2. var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者
  3. repeat(3) {
  4. println("send: $it")
  5. send(it)
  6. }
  7. }
  8. CoroutineScope(Dispatchers.Default).launch { // 消费者
  9. for (element in receiveChannel) {
  10. println("receive: $element")
  11. }
  12. }
  13. Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
  14. }

        打印如下。

  1. send: 0
  2. send: 1
  3. receive: 0
  4. receive: 1
  5. send: 2
  6. receive: 2

4.2 actor

  1. fun main() {
  2. var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者
  3. repeat(3) {
  4. var element = receive()
  5. println("receive: $element")
  6. }
  7. }
  8. CoroutineScope(Dispatchers.Default).launch { // 消费者
  9. repeat(3) {
  10. println("send: $it")
  11. sendChannel.send(it)
  12. }
  13. }
  14. Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
  15. }

        打印如下。

  1. send: 0
  2. send: 1
  3. receive: 0
  4. receive: 1
  5. send: 2
  6. receive: 2

5 Channel 的关闭

        对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。

  1. fun main() {
  2. var channel = Channel<Int>(3)
  3. CoroutineScope(Dispatchers.Default).launch { // 生产者
  4. repeat(3) {
  5. println("send: $it")
  6. channel.send(it)
  7. }
  8. channel.close()
  9. println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
  10. }
  11. CoroutineScope(Dispatchers.Default).launch { // 消费者
  12. repeat(3) {
  13. var element = channel.receive()
  14. println("receive: $element")
  15. }
  16. println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
  17. }
  18. Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
  19. }

        打印如下。

  1. send: 0
  2. send: 1
  3. send: 2
  4. producter, isClosedForSend=true, isClosedForReceive=false
  5. receive: 0
  6. receive: 1
  7. receive: 2
  8. consumer, isClosedForSend=true, isClosedForReceive=true

6 BroadcastChannel

        Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。

6.1 Channel 中多个消费者

  1. fun main() {
  2. var channel = Channel<Int>(2)
  3. CoroutineScope(Dispatchers.Default).launch { // 生产者
  4. delay(10)
  5. repeat(3) {
  6. println("send: $it")
  7. channel.send(it)
  8. }
  9. }
  10. repeat(2) { index ->
  11. CoroutineScope(Dispatchers.Default).launch { // 消费者
  12. for (element in channel) {
  13. println("receive-$index: $element")
  14. }
  15. }
  16. }
  17. Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
  18. }

        打印如下。

  1. send: 0
  2. send: 1
  3. send: 2
  4. receive-0: 0
  5. receive-0: 2
  6. receive-1: 1

        说明:结果表明,Channel 中同一个元素只会被一个消费者读到。

6.2 BroadcastChannel 中多个消费者

6.2.1 BroadcastChannel

  1. fun main() {
  2. var broadcastChannel = BroadcastChannel<Int>(2)
  3. CoroutineScope(Dispatchers.Default).launch { // 生产者
  4. delay(10)
  5. repeat(3) {
  6. println("send: $it")
  7. broadcastChannel.send(it)
  8. }
  9. }
  10. repeat(2) { index ->
  11. CoroutineScope(Dispatchers.Default).launch { // 消费者
  12. var receiveChannel = broadcastChannel.openSubscription()
  13. for (element in receiveChannel) {
  14. println("receive-$index: $element")
  15. }
  16. }
  17. }
  18. Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
  19. }

        打印如下。

  1. send: 0
  2. send: 1
  3. send: 2
  4. receive-0: 0
  5. receive-0: 1
  6. receive-0: 2
  7. receive-1: 0
  8. receive-1: 1
  9. receive-1: 2

        说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。

6.2.2 broadcast

  1. fun main() {
  2. var channel = Channel<Int>()
  3. var broadcastChannel = channel.broadcast(2)
  4. CoroutineScope(Dispatchers.Default).launch { // 生产者
  5. delay(10)
  6. repeat(3) {
  7. println("send: $it")
  8. broadcastChannel.send(it)
  9. }
  10. }
  11. repeat(2) { index ->
  12. CoroutineScope(Dispatchers.Default).launch { // 消费者
  13. var receiveChannel = broadcastChannel.openSubscription()
  14. for (element in receiveChannel) {
  15. println("receive-$index: $element")
  16. }
  17. }
  18. }
  19. Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
  20. }

        打印如下。

  1. send: 0
  2. send: 1
  3. send: 2
  4. receive-1: 0
  5. receive-1: 1
  6. receive-1: 2
  7. receive-0: 0
  8. receive-0: 1
  9. receive-0: 2
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/503703
推荐阅读
相关标签
  

闽ICP备14008679号