1. 首页
  2. IT教程

Coroutine(协程)(三)

Coroutine(协程)(三)

一、通道

1.通道基础

一个 Channel 是一个和 BlockingQueue 非常相似的概念。其中一个不同是它代替了阻塞的 put 操作并提供了挂起的 send,还替代了阻塞的 take 操作并提供了挂起的 receive。

val channel = Channel<Int>()
launch {
    // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
    for (x in 1..5) channel.send(x * x)
}
// 这里我们打印了 5 次被接收的整数:
repeat(5) { println(channel.receive()) }
println("Done!")
2.关闭与迭代通道 close
val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // 我们结束发送
}
// 这里我们使用 `for` 循环来打印所有被接收到的元素(直到通道被关闭)
for (y in channel) println(y)
println("Done!")
3.管道

管道是一种一个协程在流中开始生产可能无穷多个元素的模式:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val numbers = produceNumbers() // 从 1 开始生成整数
    val squares = square(numbers) // 整数求平方
    repeat(5) {
        println(squares.receive()) // 输出前五个
    }
    println("Done!") // 至此已完成
    coroutineContext.cancelChildren() // 取消子协程
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // 从 1 开始的无限的整数流
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

所有创建了协程的函数被定义在了 CoroutineScope 的扩展上, 所以我们可以依靠结构化并发来确保没有常驻在我们的应用程序中的全局协程。

4.带缓冲的通道

到目前为止展示的通道都是没有缓冲区的。无缓冲的通道在发送者和接收者相遇时传输元素(也称“对接”)。如果发送先被调用,则它将被挂起直到接收被调用, 如果接收先被调用,它将被挂起直到发送被调用。
Channel() 工厂函数与 produce 建造器通过一个可选的参数 capacity 来指定 缓冲区大小 。缓冲允许发送者在被挂起前发送多个元素, 就像 BlockingQueue 有指定的容量一样,当缓冲区被占满的时候将会引起阻塞。

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>(4) // 启动带缓冲的通道
    val sender = launch { // 启动发送者协程
        repeat(10) {
            println("Sending $it") // 在每一个元素发送前打印它们
            channel.send(it) // 将在缓冲区被占满时挂起
        }
    }
    // 没有接收到东西……只是等待……
    delay(1000)
    sender.cancel() // 取消发送者协程    
}

使用缓冲通道并给 capacity 参数传入 4 它将打印“sending” 五 次,并且在试图发送第五个元素的时候被挂起

二、异常处理与监督

1.异常的传播

协程构建器有两种形式:自动传播异常(launch 与 actor)或向用户暴露异常(async 与 produce)。 当这些构建器用于创建一个协程时,即该协程不是另一个协程的协程, 前者这类构建器将异常视为未捕获异常,类似 Java 的 Thread.uncaughtExceptionHandler, 而后者则依赖用户来最终消费异常,例如通过 await 或 receive

fun main() = runBlocking {
    val job = GlobalScope.launch { // launch 根协程
        println("Throwing exception from launch")
        throw IndexOutOfBoundsException() // 我们将在控制台打印 Thread.defaultUncaughtExceptionHandler
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async { // async 根协程
        println("Throwing exception from async")
        throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}

输出

Throwing exception from launch
Exception in thread "DefaultDispatcher-worker-2 @coroutine#2" java.lang.IndexOutOfBoundsException
Joined failed job
Throwing exception from async
Caught ArithmeticException
2.CoroutineExceptionHandler

类似于 Android 的 uncaughtExceptionPreHandler

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception -> 
        println("CoroutineExceptionHandler got $exception") 
    }
    val job = GlobalScope.launch(handler) { // 根协程,运行在 GlobalScope 中
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) { // 同样是根协程,但使用 async 代替了 launch
        throw ArithmeticException() // 没有打印任何东西,依赖用户去调用 deferred.await()
    }
    joinAll(job, deferred)    
}
3.取消与异常

取消与异常紧密相关。协程内部使用 CancellationException 来进行取消,这个异常会被所有的处理者忽略,所以那些可以被 catch 代码块捕获的异常仅仅应该被用来作为额外调试信息的资源。

三、共享的可变状态与并发

同步访问共享的可变状态,volatile 无济于事

@Volatile // 在 Kotlin 中 `volatile` 是一个注解
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

在简单的计数器场景中,我们可以使用具有 incrementAndGet 原子操作的 AtomicInteger 类
其实这个问题在java中也是这么处理的

1.以细粒度限制线程

限制线程 是解决共享可变状态问题的一种方案:对特定共享状态的所有访问权都限制在单个线程中。

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同一动作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程的作用域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // 将每次自增限制在单线程上下文中
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

这段代码运行非常缓慢,因为它进行了 细粒度 的线程限制。每个增量操作都得使用 withContext(counterContext) 块从多线程 Dispatchers.Default 上下文切换到单线程上下文。

2.以粗粒度限制线程

在实践中,线程限制是在大段代码中执行的,例如:状态更新类业务逻辑中大部分都是限于单线程中。下面的示例演示了这种情况, 在单线程上下文中运行每个协程。

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // 将一切都限制在单线程上下文中
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

这段代码运行更快而且打印出了正确的结果。

3.互斥

该问题的互斥解决方案:使用永远不会同时执行的 关键代码块 来保护共享状态的所有修改。在阻塞的世界中,你通常会为此目的使用 synchronized 或者 ReentrantLock。 在协程中的替代品叫做 Mutex 。它具有 lock 和 unlock 方法, 可以隔离关键的部分。关键的区别在于 Mutex.lock() 是一个挂起函数,它不会阻塞线程。

还有 withLock 扩展函数,可以方便的替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() } 模式:

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // 用锁保护每次自增
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

原创文章,作者:夜风博客,如若转载,请注明出处:https://www.homedt.net/397718.html