我是靠谱客的博主 瘦瘦海燕,最近开发中收集的这篇文章主要介绍[译] Kotlin协程的设计-coroutine创建的过程是什么样的?[译] Kotlin协程的设计-coroutine创建的过程是什么样的?1、定义2、 CPS — Continuation Passing Style(延续传递风格)3、Kotlin coroutine原则,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

[译] Kotlin协程的设计-coroutine创建的过程是什么样的?

原文:Design of Kotlin Coroutines

注:这是在medium上面读到的一篇关于Kotlin协程的文章,个人觉得写的很好,对Kotlin协程的理解有很大的帮助,故此翻译。由于个人水平问题,可能存在错漏的地方,仅供参考。

部分术语直接使用英文单词,对开发者来说,可能这样更直观一点

很多地方的英文单词和汉语术语是可以互换的

coroutine-协程

coroutineContext-协程上下文

context-上下文

continuation-延续

我们很多人都使用coroutines,但是谁知道coroutine创建的过程是什么样的?这篇blog的结构如下所示:

1、定义

2、CPS — 延续传递风格(Continuation Passing Style)

3、Kotlin coroutine原则

  • 3.1 Coroutine构建

    • 3.1.1 launch()
    • 3.1.2 start()
    • 3.1.3 invoke()
    • 3.1.4 startCoroutineCancellable()
    • 3.1.5 resumeWithCancellable()
    • 3.1.6 resumeWith()
    • 3.1.7 invokeSuspend()
    • 3.1.8 Summary of coroutine construction
  • 3.2 字节码分析

1、定义

coroutine是什么?

一个coroutine是一个可挂起的计算的实例,从概念上讲,它类似于一个thread,从某种意义上讲,它包含一个代码块与其他代码同时执行,然而一个coroutine并不和任何thread绑定。它有可能在某个线程挂起执行,然后在另一个线程恢复

在异步编程里面,tasks在不同的线程里面并行执行,而且不需要等待其他tasks的结束。不恰当的多线程使用有可能导致CPU的高度使用,或者增加CPU的运行周期,从而导致你的应用性能急剧下降,因此,线程是一种昂贵的资源。Coroutines是thread的轻量级替代品。

suspend函数是什么?

suspend函数是可以被started、paused和resume的一种函数。关于suspend函数需要被记忆的最重要的其中一点是:它们只允许被其他的coroutine或者另一个suspend函数调用

当一个coroutine被挂起时,其运行的线程可以被其他coroutines使用。coroutine的延续不一定是同一个线程。在这里,我们得出的结论是,我们可以在少数线程上面同时运行许多coroutines。接下来你会看到它是怎么工作的。

suspending vs Non-suspending/常规 函数

  • 一个suspend函数包含一个或多个挂起点(suspention Points),一个常规函数没有挂起点。一个挂起点代表了方法体内的一个声明,代表在方法体内,可以暂停执行,以在后面的一段时间后恢复

  • 普通函数无法直接调用suspend函数,因为它不支持挂起点

  • suspend函数可以调用普通函数,因为它没有挂起点

这是一个suspend函数例子:

suspend fun functionA(): String {
     return  "hello"
}

正如我们上面说的,我们不能在普通函数里面调用suspend函数。让我们反编译一下functionA(),看看里面发生了什么。

使用Tools->Kotlin->Show Kotlin Bytecode.

@Nullable
public static final Object functionA(@NotNull Continuation $completion) {
  return "hello";
}

你可能疑惑Continuation参数是怎么来的,以及是什么意思。现在我们会解释它是怎么来的,一会我们会展示它是什么。每个suspend函数都会经过一个CPS-Continuation passing style转换。

我们可以通过接下来的例子观察coroutine的挂起。想象我们正在玩一款游戏,我们想要suspend(暂停),然后继续在我们离开的地方玩。在这个场景下,我们将游戏存档,当我们想继续的时候,我们可以在档案中的上一个暂停点恢复。当过程被suspend时,它会返回一个Continuation(延续)。

2、 CPS — Continuation Passing Style(延续传递风格)

在一个有n个参数(p_1,p_2,p_3,…,p_n)以及返回类型为T的CPS转换suspend函数中,它们获得了一个Continuation<T>类型的参数p_n+1,返回类型是Any?,返回类型被转换为Any?,原因是:

  • 如果方法返回了一个结果,我们得到T
  • 如果方法suspend(挂起),它返回信号值COROUTINE_SUSPENDED,它代表挂起状态

3、Kotlin coroutine原则

让我们用一个示例来浏览coroutine过程,了解coroutine是如何创建的,并解释器流程。

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        lifecycleScope.launch {
            val randomNum = getRandomNum() // suspension point #1
            val sqrt = getSqrt(randomNum.toDouble()) // suspension point #2
            log(sqrt.toString())
        }
    }

    private suspend fun getRandomNum(): Int {
        delay(1000)
        return (1..1000).shuffled().first()
    }

    private suspend fun getSqrt(num: Double): Double {
        delay(2000)
        return sqrt(num)
    }

    private fun log(text: String) {
        Log.i(this@MainActivity::class.simpleName, text)
    }
}

在上面的例子中,我们有两个suspend函数。getRandomNum()函数获取一个从1到1000的随机数,将其传递给另一个suspend函数getSqrt()来获取其根。

我们在第六行添加一个断点,在debug模式中运行来获得执行coroutine体之前的堆栈。我们想通过这个来看coroutine创建过程是什么样的。

这是一个堆栈:

invokeSuspend:75, MainActivity$startCoroutine$1 (me.aleksandarzekovic.exploringcoroutines)
resumeWith:33, BaseContinuationImpl (kotlin.coroutines.jvm.internal)
resumeCancellableWith:266, DispatchedContinuationKt (kotlinx.coroutines.internal)
startCoroutineCancellable:30, CancellableKt (kotlinx.coroutines.intrinsics)
startCoroutineCancellable$default:25, CancellableKt (kotlinx.coroutines.intrinsics)
invoke:110, CoroutineStart (kotlinx.coroutines)
start:126, AbstractCoroutine (kotlinx.coroutines)
launch:56, BuildersKt__Builders_commonKt (kotlinx.coroutines)
launch:1, BuildersKt (kotlinx.coroutines)
launch$default:47, BuildersKt__Builders_commonKt (kotlinx.coroutines)
launch$default:1, BuildersKt (kotlinx.coroutines)
startCoroutine:75, MainActivity (me.aleksandarzekovic.exploringcoroutines)
onCreate:71, MainActivity (me.aleksandarzekovic.exploringcoroutines)
...

执行顺序是:

3.1 coroutine构建

3.1.1 launch()

启动一个新的coroutine,无需阻塞当前线程,并返回类型为Job的coroutine引用

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

CoroutineScope

为新的coroutines定义一个作用域。每个协程构建器(像launch,async,等等)都是CoroutineScope的一个扩展函数,并继承其CoroutineContext用来自动传播其所有的元素(Element)和取消(cancellation)

CoroutineScope是仅有一个coroutineContext属性的接口。

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

注意:在我们的例子中,我们使用launch coroutine构建器,并且我们将通过它解释创建coroutine。其他coroutine构建器相对这个是非常直观的

launch coroutine构建器包含三个参数:

  • context - 附加到CoroutineScope.coroutineContext的协程上下文
  • start - coroutine启动选项,默认值是CoroutineStart.DEFAULT
  • block - 将会在提供的作用域中执行的协程代码。编译器在编译阶段为suspend lambda函数生成一个继承SuspendLambda且实现Function2的内部类:
final class me/aleksandarzekovic/exploringcoroutines/MainActivity$onCreate$1 
  extends kotlin/coroutines/jvm/internal/SuspendLambda 
    implements kotlin/jvm/functions/Function2

这意味着kotlin为每个coroutine在编译阶段生成了一个SuspendLambda匿名内部类。这是协程体类(coroutine body class)。内部实现了两个方法:

  • invokeSuspend() - 包含我们协程体中的代码,它在内部处理状态值。coroutine中最重要的状态变化逻辑,被称之为状态机,正是被包含在它里面的
  • create() - 方法接收一个Continuation对象,然后创建并返回一个协程体类的对象。

Coroutine state machine(协程状态机)

Kotlin以状态机来实现挂起函数,因为这样的实现不需要特定的运行时支持。这决定了Kotlin coroutines必须要显式的suspend标记(方法染色):编译器必须知道哪些方法可能是suspend的,以将其变为状态机

状态机的状态对应着挂起点,在我们的例子中:

该代码块中有三个方法:

  • L0:直到挂起点#1
  • L1:直到挂起点#2
  • L2:直到结束

带有这三个状态的状态机生成的伪代码像这样:

// The initial state of the state machine
int label = 0
A a = null
B b = null

void resumeWith(Object result) {
    if (label == 0) goto L0
    if (label == 1) goto L1
    if (label == 2) goto L2
    else throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")

  L0:
    // result is expected to be `null` at this invocation
    label = 1
    // 'this' is passed as a continuation
    result = getRandomNum(this) 
    if (result == COROUTINE_SUSPENDED) return
  L1:
    A a = (A) result
    label = 2
    val result = getSqrt(a, this) // 'this' is passed as a continuation
    if (result == COROUTINE_SUSPENDED) return
  L2:
    B b = (B) result
    log(String.valueOf(b))
    label = -1 // No more steps are allowed
    return
}    

coroutine的逻辑被封装在invokeSuspend方法中,我们之前有提到。

SuspendLambda的继承关系是:

Continuation(延续)

代表一个挂起点之后的延续,返回一个类型为T的值

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

Continuations非常重要,因为它允许coroutine的延续。每个suspend函数都与Continuation生成的子类型相关联,它处理挂起的实现

  • context - 与该延续对应的coroutine的上下文
  • resumeWith() - 用来在挂起点之间传递结果。它以最后一个挂起点的结果(或异常)调用并恢复coroutine的执行

BaseContinuationImpl

BaseContinuationImpl 的主要源码如下:

internal abstract  class  BaseContinuationImpl (...) {
     // Implement resumeWith of Continuation
     // It is final and cannot be overridden!
    public  final override fun resumeWith (result: Result<Any?>) {
        // ...
        val  outcome  = invokeSuspend(param)
        // ...
    }
    // For implementation 
    protected abstract fun invokeSuspend (result: Result<Any?>) : Any?
}

invokeSuspend() - 是一个抽象方法,是在编译阶段生成的协程体类中实现的

resumeWith() 方法的实现通常被invokeSuspend()方法调用

ContinuationImpl

ContinuationImpl继承了BaseContinuationImpl类。它的功能是通过拦截器生成一个DispatchedContinuation对象,这也是一个Continuation。我们将在3.2.4节讨论它。

让我们继续分析launch()的方法体

newCoroutineContext()

newCoroutineContext - 为一个新的coroutine创建上下文。如果没有指定其他dispatcher或者ContinuationInterceptor,它将会使用Dispatchers.Default ,并且为 调试设施(如果开关为开的话)和 JVM上的copyable-thread-local设施 添加额外的可选支持。

newCoroutineContext是CoroutineScope的一个扩展函数。它的功能是合并CoroutineScope继承的context和通过参数传进来的context,并返回一个新的context。

让我们简要介绍一下CoroutineContext

CoroutineContext

CoroutineContext是一个由Elements对象组成的不可变的索引联合集(immutable indexed union set),Element对象比如CoroutineName、CoroutineId、CoroutineExceptionHandler、ContinuationInterceptor、CoroutineDispatcher、Job。此集合中的每个元素都包含一个独特的Key

想象我们想控制我们的coroutine运行在某个线程或者线程池上。取决于我们想把任务运行在主线程上,或者任务是CPU或者IO型的,我们将会使用dispatcher

Dispatchers是coroutine提供的线程调度器,使用其来切换线程或者是指定coroutine运行的线程。Dispatchers中有四种类型的调度器:

  • Dispatchers.Default
  • Dispatchers.IO
  • Dispatchers.Main
  • Dispatchers.Unconfined

这些调度器都是CoroutineDispatcher(协程调度器)。我们在上面提到,协程调度器是CoroutineContext的一个元素,这意味着这些调度器都是CoroutineContext的元素。

我们说CoroutineContext类似于包含不同元素的集合。我们可以通过adding/removing一个元素,或者合并两个已有的context来创建一个新的context。plus(+)操作符作为Set.plus的扩展,返回两个context的组合,加号右边的元素会替代左边相同key的元素。

没有任何元素的上下文可以被创建为EmptyCoroutineContext的一个实例。

import kotlinx.coroutines.*

fun main() {
    val coroutineName = CoroutineName("C#1") + CoroutineName("C#2")
    println(coroutineName)
}

// result
// CoroutineName(C#2)
import kotlinx.coroutines.*

fun main() {
    val coroutineContext = CoroutineName("C#1") + Dispatchers.Default
    println(coroutineContext)
}

// result
// [CoroutineName(C#1), Dispatchers.Default]
import kotlinx.coroutines.*

fun main() {
    val firstCoroutineContext = CoroutineName("C#1") + Dispatchers.Default
    println(firstCoroutineContext)
    
    val secondCoroutineContext = Job() + Dispatchers.IO
    println(secondCoroutineContext)
    
    val finalCoroutineContext = firstCoroutineContext + secondCoroutineContext
    println(finalCoroutineContext)
}

// result
// [CoroutineName(C#1), Dispatchers.Default]
// [JobImpl{Active}@39a054a5, Dispatchers.IO]
// [CoroutineName(C#1), JobImpl{Active}@39a054a5, Dispatchers.IO]

一个CoroutineContext从来不会被重写,而是与一个已有的合并。现在我们了解了一些CoroutineContext的知识,我们可以回到刚才离开的地方,launch构建器中的newCoroutineContext。

让我们定义不同的context,便于我们理解:

  • scope context - CoroutineScope中定义的context
  • (passed context)传入的context - 构建器函数接收一个CoroutineContext实例作为第一个参数
  • (parent context)父上下文 - 构建器中的suspend代码块参数拥有一个CoroutineScope接收器,该接收器本身提供了CoroutineContext,这个context并不是一个新的context!(译注:这里有些绕,指的context是StandaloneCoroutine里面的context)
  • 新的coroutine创建它自己的子Job实例(使用此作业的上下文作为其parent),并将父context+它自己的Job定义为它的coroutine context(子context)。我们在稍后更详细地看到我们是怎么得出这个结论的。

(译注:上面这一段有些绕口,其实就是public final override val context: CoroutineContext = parentContext + this 这段代码)

在定义了新的上下文(父上下文)之后,我们可以创建一个新的coroutine:

StandaloneCoroutine
我们使用新的上下文(父上下文)来创建coroutine。start参数的默认值是CoroutineStart.DEFAULT,在这个例子中,我们创建了StandaloneCoroutine(继承自AbstractCoroutine),返回值是一个Job。StandaloneCoroutine是一个协程对象。

注意:如果我们把start设置为lazy,我们会创建一个LazyStandaloneCoroutine,LazyStandaloneCoroutine是继承自StandaloneCoroutine的,StandaloneCoroutine是继承自AbstractCoroutine的。

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

StandaloneCoroutine中,只有handleJobException方法被重写,用来处理没有被父协程处理的异常。这里调用的start方法是父类AbstractCoroutine的方法。

@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
  
  // ...
  
  public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
      start(block, receiver, this)
  }
  
  // ...
}

AbstractCoroutine类实现了JobSupport类和Job,Continuation以及CoroutineScope接口。AbstractCoroutine类主要负责协程的恢复和结果的返回。

JobSupport
JobSupport是Job的特殊实现。AbstractCoroutine可以被当做一个Job来控制coroutine的生命周期,它可以实现Continuation接口,也可以被当做Continuation使用。

Job

AbstractCoroutine的上下文是我们通过参数传递(父context)加上当前的coroutine,由于我们知道AbstractCoroutine既是Job又是CoroutineScope,我们知道我们的协程上下文包含一个Job元素。这个context是协程上下文(子context)。

@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    // ...

    /**
     * The context of this coroutine that includes this coroutine as a [Job].
     */
    @Suppress("LeakingThis")
    public final override val context: CoroutineContext = parentContext + this
    
    // ...
}

堆栈中的第二步是coroutine.start(start, coroutine, block)

3.1.2 start()

使用给定的代码块和启动策略启动协程。该方法在协程中最多被执行一次。

@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
  
  // ...
  
  public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
      start(block, receiver, this)
  }
  
  // ...
}

AbstractCoroutine#start()方法调用start()方法。CoroutineStart是一个枚举类,invoke()方法是内部重写的。在这个场景中,start()方法会调用CoroutineStart.invoke()方法。

3.1.3 invoke()

定义协程构建器的启动选项,它在launch、async以及其他协程构建器中使用start参数传递

@InternalCoroutinesApi
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(completion)
        ATOMIC -> block.startCoroutine(completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(completion)
        LAZY -> Unit // will start lazily
}

CoroutineStart是一个有四个类型的枚举类:

  • DEFAULT - 根据其上下文,立即执行coroutine
  • LAZY - 懒启动coroutine,或者被需要的时候
  • ATOMIC - 根据上下文原子化(一种无法被取消的方式)执行coroutine
  • UNDISPATCHED - 立即执行,直到遇到当前线程的第一个挂起点

在这里,DEFAULT用于作为实例

3.1.4 startCoroutineCancellable()

以一种可取消的方式使用此方法启动coroutine,以便在等待调度时可以取消。

/**
 * param completion is AbstractCoroutine
 * return a Continuation
 */
internal fun <R , T> (suspend ( R ) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =   
  runSafely(completion) {   
      createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))   
  }

runSafely()运行给定的代码块,如果发生异常,则将completion完成。理由:我们在自己的调度器上异步运行协程的时候会调用startCoroutineCancellable。因此,如果在coroutine启动的过程中调度器抛出异常,协程永远无法被结束,所以我们应该将调度器异常视为原因并恢复completion(译注:这里的completion指的都是传递的那个参数)

private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    try {
        block()
    } catch (e: Throwable) {
        completion.resumeWith(Result.failure(e))
    }
}

startCoroutineCancellable的实现是一个链式调用,让我们看下这个链:

1、createCoroutineUnintercepted() - 是调用协程体的扩展函数,协程体被编译成SuspendLambda的子类(协程体类),所以这就是BaseContinuationImpl。

// kotlin/libraries/stdlib/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt

@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

create()方法创建一个协程体的实例,在这里我们获得了协程类的实例。

@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
    Intrinsics.checkNotNullParameter(completion, "completion");
    Function2 var3 = new <anonymous constructor>(completion);
    return var3;
 }

2、intercepted() - 使用ContinuationInterceptor来拦截continuation(延续).

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
   (this as? ContinuationImpl)?.intercepted() ?: this

this是一个继承自ContinuationImpl的协程体的实例

public fun intercepted(): Continuation<Any?> =
    intercepted
      ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

如果intercepted为空,通过使用context中指定的interceptor拦截协程体类,并返回包装后的协程体对象。

context[ContinuationInterceptor] - 获取集合中的调度器(译注:这里指的应该是拦截器),并调用interceptContinuation()。
interceptContinuation()方法用来将协程体的延续包装成一个DispatchedContinuation。

// CoroutineDispatcher

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> 
  = DispatchedContinuation(this, continuation)

DispatchedContinuation

DispatchedContinuation代表了协程体的延续对象,并持有线程调度器。它的功能是使用线程调度器来调度协程体到指定的线程上执行。

注意在构造器中,它需要一个调度器和continuation,并且它实现了Continuation<T>DispatchedTask<T>

3、resumeCancellableWith() - Continuation的一个扩展函数

@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

如果this不是拦截或者包装的协程体对象,resumeWith(result)将会被调用。

否则,如果this是拦截的且用DispatchedContinuation类包装的对象,resumeCancellableWith(result, onCancellation)会被调用

inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

如果你查看CoroutineDispatcher的源码,dispatcher.isDispatchNeeded()的返回值总是true,只有Dispatchers.Unconfined会重写并返回false。

如果dispatcher.isDispatchNeeded()返回false,我们将直接在协程体中调用resumeWith()方法。

dispatcher.dispatch(context, this)实际上等同于将代码的执行过程分发给默认的线程池。第二个参数是Runnable,我们在这里传递的是this,因为DispatchedContinuation间接地实现了Runnable接口。

Dispatchers.Defaul是DefaultScheduler,DefaultScheduler是一个单例类,所以只有一个对象会被创建并在任何地方使用。DefaultScheduler是SchedulerCoroutineDispatcher的一个子类。

@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler

internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) { 
    ... 
}

internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
    
    ...
    
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
    
    ...
}

Dispatchers.Default#dispatch()调用SchedulerCoroutineDispatcher的dispatch()方法,它将会调用coroutineScheduler.dispatch()

CoroutineScheduler

CoroutineScheduler是Kotlin中实现的线程池,它提供了coroutine可以运行的线程,这意味着它会生成它们(译注:应该是线程池可以生成线程)

CoroutineScheduler是Executor的一个子类,它的execute()方法也是转发给dispatch()方法。

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
  
  ...
  
  override fun execute(command: Runnable) = dispatch(command)
  
  ...
  
  fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // Increment blocking tasks anyway
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
  
  internal inner class Worker private constructor() : Thread() { ... }
}

在dispatch方法中,我们看到以下内容:

  • createTask() - 我们通过传递的Runnable创建Task,该task实际上是一个DispatchedContinuation
  • currentWorker() - 获得当前正在执行的线程。worker是CoroutineScheduler的一个内部类
  • currentWorker.submitToLocalQueue() - 添加task到工作线程的本地队列并等待执行

Worker

Worker是Kotlin coroutine的线程。Worker继承自Thread,其实际上是Java线程的一个封装。我们得出结论,Worker就是Thread。

让我们分析一下Worker是怎么执行task的。

internal inner class Worker private constructor() : Thread() {
    ...
    override fun run() = runWorker()

    private fun runWorker() {
        ...
        while (!isTerminated && state != WorkerState.TERMINATED) {
            val task = findTask(mayHaveLocalTasks)
            if (task != null) {
                ...
                executeTask(task)
                continue
            } 
            ...
        }
        ...
    }
    ...
}

Worker会重写Thread的run方法,然后会调用runWorker方法。在一个while循环中,将会经常试图在本地工作队列中拉取task。如果有一个task需要被执行,executeTask(task)会被调用来执行它。

我们看一下executeTask(task)方法做了什么:

internal inner class Worker private constructor() : Thread() {
    ...
    private fun executeTask(task: Task) {
        val taskMode = task.mode
        idleReset(taskMode)
        beforeTask(taskMode)
        runSafely(task)
        afterTask(taskMode)
    }
    
    ...
    
    fun runSafely(task: Task) {
        try {
            task.run()
        } 
        ...
    }
}

internal abstract class Task(
    @JvmField var submissionTime: Long,
    @JvmField var taskContext: TaskContext
) : Runnable {
    constructor() : this(0, NonBlockingContext)
    inline val mode: Int get() = taskContext.taskMode // TASK_XXX
}

runSafely()方法中,我们调用task.run()。Task是一个Runnable且Runnable#run()实际上意味着我们的协程任务实际上正在执行。

DispatchedContinuation继承自DispatchedTask类,DispatchedTask继承自SchedulerTask,SchedulerTask实现了Runnable接口。我们看到DispatchedTask最终实现了Runable接口,所以我们DispatchedTask中run()的实现。

internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {

   public final override fun run() {
     // ...
     try {
         val delegate = delegate as DispatchedContinuation<T>
         val continuation = delegate.continuation
         withContinuationContext(continuation, delegate.countOrElement) {
             // ...
             val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
             if (job != null && !job.isActive) {
                 val cause = job.getCancellationException()
                 cancelCompletedResult(state, cause)
                 continuation.resumeWithStackTrace(cause)
             } else {
                 if (exception != null) {
                     continuation.resumeWithException(exception)
                 } else {
                     continuation.resume(getSuccessfulResult(state))
                 }
             }
         }
     }
     // ...
   }
}

在run()方法中,原始的协程延续对象是通过DispatchedContinuation获取的,resumeWithStackTrace,resumeWithException和resume是扩展函数,它们会触发resumeWith()方法。

/**
 * Resumes the execution of the corresponding coroutine passing [value] as the return value of the last suspension point.
 */
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

/**
 * Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the
 * last suspension point.
 */
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    resumeWith(Result.failure(recoverStackTrace(exception, this)))
}

3.1.5 resumeWith

最终的resumeWith()实现在BaseContinuationImpl类中:

public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

protected abstract fun invokeSuspend(result: Result<Any?>): Any?

让我们注意invokeSuspend()方法

3.1.6 invokeSuspend

协程体将会根据状态机顺序执行直到suspend函数被调用。当我们调用它时,方法将会返回COROUTINE_SUSPEND标志,并且直接返回退出循环以及Continuation体,在这种情况下,不会发生线程阻塞。

当一个方法需要被suspend时,状态机会存储上一次的结果作为延续体的一个变量。当suspend函数恢复,Continuation的resumeWith()方法会被调用,然后invokeSuspend()方法会被调用。这样,协程体后面的代码也可以继续执行。

3.1.7 协程构建的总结

1、协程体(挂起lambda方法)在编译期会被编译成内部类,它继承SuspendLambda并实现Function2,特定的继承链是:
SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation

2、CoroutineScope#launch()创建一个协程,根据默认的启动模式CoroutineStart.DEFAULT,创建一个协程对象StandaloneCoroutine并启动StandaloneCoroutine#start(start, coroutine, block)

3、StandaloneCoroutine是AbstractCoroutine的一个子类,且StandaloneCoroutine#start()的实现可以在AbstractCoroutine (AbstractCoroutine#start())中找到。AbstractCoroutine#start() 会触发CoroutineStart#invoke()

4、由于我们例子中的调度器是Dispatchers.Default,我们调用协程体的startCoroutineCancellable()方法来执行逻辑CoroutineStart#invoke()

5、startCoroutineCancellable是一个链式调用:
createCoroutineUnintercepted().intercepted().resumeCancellableWith()

6、createCoroutineUnintercepted创建一个协程体对象

7、intercepted()使用一个interceptor/scheduler来将协程体对象包装为DispatchedContinuation。DispatchedContinuation代表协程体类的一个延续对象,并包含调度器。

8、由于调度器是Dispatchers.Default且isDispatchNeeded() 方法返回true,DispatchedContinuation#resumeCancellableWith() 使用线程调度器的dispatcher#dispatch(context, this)来调度。

9、Dispatchers.Default#dispatch()调用SchedulerCoroutineDispatcher的dispatch()方法,它会调用CoroutineScheduler#dispatch()

10、CoroutineScheduler申请一个线程Worker并在Worker#run()方法中触发DispatchedContinuation的run方法(DispatchedTask#run())

11、run方法会触发resumeWith()方法。协程体的调用实际上是一个resumeWith()方法的调用。

3.2 字节码分析

让我们分析一下我们例子的字节码。使用Tools->Kotlin->Show Kotlin Bytecode,然后点击Decompile生成对应的反编译的Java代码。

BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
   int label;

   @Nullable
   public final Object invokeSuspend(@NotNull Object $result) {
      Object var10000;
      label17: {
         Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         MainActivity var6;
         switch(this.label) {
         case 0:
            ResultKt.throwOnFailure($result);
            var6 = MainActivity.this;
            this.label = 1;
            var10000 = var6.getRandomNum(this);
            if (var10000 == var5) {
               return var5;
            }
            break;
         case 1:
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break;
         case 2:
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break label17;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }

         int randomNum = ((Number)var10000).intValue();
         var6 = MainActivity.this;
         double var10001 = (double)randomNum;
         this.label = 2;
         var10000 = var6.getSqrt(var10001, this);
         if (var10000 == var5) {
            return var5;
         }
      }

      double sqrt = ((Number)var10000).doubleValue();
      MainActivity.this.log(String.valueOf(sqrt));
      return Unit.INSTANCE;
   }

   @NotNull
   public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
      Intrinsics.checkNotNullParameter(completion, "completion");
      Function2 var3 = new <anonymous constructor>(completion);
      return var3;
   }

   public final Object invoke(Object var1, Object var2) {
      return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
   }
}), 3, (Object)null);

我们看一下invokeSuspend方法:

1、15,16行,我们检查var10000是否为COROUTINE_SUSPENDED,如果是的话,意味着我们没有可用的返回值,并等待可用的返回值返回。在我们的例子中,getRandomNum()将会返回COROUTINE_SUSPENDED。在这之前我们把label设为1。

为什么getRandomNum()方法返回COROUTINE_SUSPENDED?

private final Object getRandomNum(Continuation var1) {
  Object $continuation;
  label20: {
     // ...
  }

  Object $result = ((<undefinedtype>)$continuation).result;
  Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
  switch(((<undefinedtype>)$continuation).label) {
  case 0:
     ResultKt.throwOnFailure($result);
     ((<undefinedtype>)$continuation).label = 1;
     if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) {
        return var5;
     }
     break;
  case 1:
     ResultKt.throwOnFailure($result);
     break;
  default:
     throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
  }

  byte var2 = 1;
  return CollectionsKt.first(CollectionsKt.shuffled((Iterable)(new IntRange(var2, 1000))));
}

在13行,delay()方法被调用,我们看一下它的实现。

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    postDelayed(Runnable {
        with(continuation) { resumeUndispatched(Unit) }
    }, timeMillis)
}

suspendCancellableCoroutine的返回是COROUTINE_SUSPENDED。需要suspend并等待结果返回。可以看到这里的delay的逻辑与Handler机制相似(Handler.postDelayed)。当执行完毕后,continuation.resume() -> BaseContinuationImpl.resumeWith()-> SuspendLambda.invokeSuspend()将被调用来恢复。

当getRandomNum()方法执行结束完毕且可用的返回值被返回后,invokeSuspend()会被调用。此时label=1。首先我们调用throwOnFailure,如果结果失败的话会抛出异常。如果结果成功,我们赋值给var10000,然后break当前的执行逻辑(32-39行)。

2、15,16行(译注:这里应该是36-37行),getSqrt()的执行过程与getRandomNum()类似。此时label=2。当getRandomNum()(译注:应该是sqrt())的结果返回,invokeSuspend将会被再次调用,同样调用throwOnFailure,如果结果失败会抛出异常。如果结果成功,调用break label17,跳转到101行执行后续的逻辑。

现在你可以探索其他的协程构建器,并看它们之间有什么具体的区别。如果有你有什么疑问的话,你可以通过LinkedIn写信给我。

希望你喜欢这篇文章,请继续关注更多!

最后

以上就是瘦瘦海燕为你收集整理的[译] Kotlin协程的设计-coroutine创建的过程是什么样的?[译] Kotlin协程的设计-coroutine创建的过程是什么样的?1、定义2、 CPS — Continuation Passing Style(延续传递风格)3、Kotlin coroutine原则的全部内容,希望文章能够帮你解决[译] Kotlin协程的设计-coroutine创建的过程是什么样的?[译] Kotlin协程的设计-coroutine创建的过程是什么样的?1、定义2、 CPS — Continuation Passing Style(延续传递风格)3、Kotlin coroutine原则所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(57)

评论列表共有 0 条评论

立即
投稿
返回
顶部