Kotlin Coroutine Internals
Let’s explore the internals of coroutines based on the Kotlin Proposals — Coroutines, which proposed the design of Kotlin Coroutines.
Coroutine
The original text describes a coroutine as an instance of suspendable computation in a single sentence. As such, the most essential characteristic of a coroutine is its ability to be suspended. So what exactly does it mean for a coroutine to be suspendable?
Suspension
According to the original text, being suspendable means that a coroutine can temporarily pause the execution of its code on the current thread and yield the thread to allow the execution of other coroutines’ code. It also implies that a suspended coroutine can be resumed on a different thread.
State Machine
Let’s consider a coroutine as one big function. The fact that a coroutine, i.e., a function, can be suspended during execution means that we must repeatedly execute the same function multiple times to obtain its return value. This is not a typical control flow that we are familiar with. Let’s examine how a suspendable function might look like using the example below:
class MyCoroutine {
private var label = 0
private var result: Result<Any?> = Result.success(null)
operator fun invoke(): Result<Any?> {
result = when (label) {
0 -> {
label = 1
Result.success(sayHello())
}
1 -> {
label = 2
Result.success(saySomething())
}
2 -> {
label = 3
Result.success(sayBye())
}
3 -> {
label = -1
Result.success("Done!")
}
else -> Result.failure(Error("Too many invokation"))
}
return result
}
private fun sayHello() {
println("Hello, World!")
}
private fun saySomething() {
println("I can suspend!")
}
private fun sayBye() {
println("Bye!")
}
}
fun main() {
val myCoroutine = MyCoroutine()
myCoroutine() // Hello, World!
myCoroutine() // I can suspend!
myCoroutine() // Bye!
val result = myCoroutine()
println(result.getOrNull()) // Done!
}
The MyCoroutine function is a suspendable function. To make a function suspendable, it is necessary to manage variables related to the function’s execution (e.g., program counter). Here, MyCoroutine utilizes the label variable to control the execution flow internally based on suspensions.
This explicit control of execution flow based on states is known as the state machine. Kotlin’s coroutines are also implemented using the state machine approach. The example above is a simple illustration of this metaphorical representation.
Suspension Point
In the previous example, there were three suspensions during the execution of the code. In coroutines, the points in the code where suspensions occur are called suspension points. Suspension points are crucial for achieving the core objective of coroutines, which is concurrency. Coroutines pause execution at suspension points, allowing newly created or waiting for coroutines to run on the thread where the suspended coroutine is located.
For example, by setting suspension points at idle CPU time during I/O operations, a new coroutine can be scheduled instead of the suspended coroutine, effectively utilizing CPU resources.
This concept appears similar to context switching the operating system (OS) performs. However, while preemptive multitasking typically utilizes time-slicing algorithms to perform context switches forcibly, coroutines follow the cooperative multitasking approach, where coroutine switches occur at suspension points defined by user code. In cooperative multitasking, it is essential to generate suspension points appropriately to distribute CPU resources among coroutines efficiently. So, how are suspension points created?
Let’s modify a part of the previous example to make only the sayHello() function a suspension point. If you run the code, you will notice that the “Hello World” statement is printed only during the initial invocation of the myCoroutine function, while the remaining statements are printed upon subsequent invocations.
val COROUTINE_SUSPEND = "FOO" // Special Symbol
class MySuspendingFunction() {
private var label = 0
operator fun invoke(): Result<Any?>? {
while (true) {
when (label) {
0 -> {
label = 1
val result = sayHello()
if (result == COROUTINE_SUSPEND) return null
}
1 -> {
label = 2
val result = saySomething()
if (result == COROUTINE_SUSPEND) return null
}
2 -> {
label = 3
val result = sayBye()
if (result == COROUTINE_SUSPEND) return null
}
3 -> {
label = -1
return Result.success("Done!")
}
else -> return Result.failure(Error("Too many invokation"))
}
}
label++
}
private fun sayHello(): Any {
println("Hello, World!")
return COROUTINE_SUSPEND
}
private fun saySomething(): Any {
println("I can suspend!")
return Unit
}
private fun sayBye(): Any {
println("Bye!")
return Unit
}
}
fun main() {
val fn = MySuspendingFunction()
fn() // Hello, World!
println("================================")
val result = fn() // I can suspend! + Bye!
println(result?.getOrNull()) // Done!
}
// Hello, World!
// ================================
// I can suspend!
// Bye!
// Done!
In Kotlin, the Kotlin compiler generates bytecode for suspend functions as a state machine. It also utilizes a special enum value called COROUTINE_SUSPEND to create suspension points.
Suspending Function
A suspending function is a special function that has the potential to be a suspension point. However, not all suspending functions necessarily include suspension points.
For example, the sayHello() function below does not act as a suspension point. On the other hand, the sayHelloAfter() function does function as a suspension point. More precisely, the delay function within the sayHelloAfter function is the suspension point.
suspend fun sayHello() {
println("Hello, World!")
}
suspend fun sayHelloAfter(delay: Long) {
delay(delay)
println("Hello, World!")
}
To understand how suspension points are created in Kotlin, it’s important to know about the concept of Continuation.
Continuation
A Continuation is an object that effectively owns the code block and execution context executed within a coroutine. It plays a crucial role in the execution and suspension of coroutines. The Continuation object is responsible for managing the execution of the coroutine, including handling suspension and resumption points. It encapsulates the logic needed to suspend a coroutine, save its state, and resume its execution from the point it left off.
In this sense, coroutines are a collection of one or more continuations connected in a sequence. Even functions like launch, which we commonly know as coroutine builders, internally create Continuation objects. The startCoroutine() function, used internally in Kotlin to create coroutines, also creates a Continuation.
// https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/src/kotlin/coroutines/Continuation.kt#L112
public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
Continuation in Kotlin is an interface that represents the shape of a continuation. It provides the resumeWith() method, which is used to resume the execution of the code block associated with the continuation.
// https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/src/kotlin/coroutines/Continuation.kt
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
The reason why the Continuation interface doesn’t provide a suspend() method is because the decision to create a suspension point is determined by suspend functions themselves. As mentioned earlier, the Kotlin compiler transforms suspend functions into a state machine form, and the decision to suspend is based on the return value of the suspend function. Therefore, there is no need to explicitly invoke a function like suspend() by injecting the Continuation into the suspend function. If you want to create a suspension point, you simply need to return COROUTINE_SUSPENDED within the suspend function.
Now, let’s discuss the purpose of the resumeWith() method. The resumeWith() method is used to provide a callback mechanism for external functions, such as I/O operations, that may introduce CPU idle time and can later resume the execution of a coroutine. It allows these external functions to notify the continuation when their work is complete, and the coroutine can be resumed.
For example, one of the most commonly used suspend functions in the Kotlin standard library, delay, is implemented roughly in the following form:
suspend fun delay(ms: Long) = suspendCoroutine { continuation ->
Timer().schedule(object : TimerTask() {
override fun run() {
continuation.resume(Unit)
}
}, ms)
}
The suspendCoroutine function is provided by the Kotlin standard library. It allows suspend functions to receive the Continuation object as a parameter, giving them direct control over the callback mechanism.
When using suspendCoroutine, if you don’t explicitly call resumeWith() on the Continuation object within the suspend function, the suspend function is implemented to return COROUTINE_SUSPENDED, indicating that it should behave as a suspension point.
In the example you provided, the delay() function uses suspendCoroutine to create a suspension point. It schedules the resumption of the coroutine on a separate thread using a timer or delay mechanism. When the delay is complete, the resumeWith() function is called on the Continuation object to resume the coroutine’s execution.
By returning COROUTINE_SUSPENDED or explicitly calling resumeWith(), the suspend function determines whether it should suspend or resume immediately.
Applying this, you can also implement functions that asynchronously read files as suspend functions.
import kotlin.coroutines.*
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler
import java.nio.file.StandardOpenOption
suspend fun readAsync(file: File, buffer: ByteBuffer): Int = suspendCoroutine { continuation ->
val channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)
channel.read(buffer, 0L, continuation, object : CompletionHandler<Int, Continuation<Int>> {
override fun completed(result: Int, attachment: Continuation<Int>) {
continuation.resume(result)
channel.close()
}
override fun failed(exc: Throwable, attachment: Continuation<Int>) {
continuation.resumeWithException(exc)
channel.close()
}
})
}
Continuation Passing Style
In Kotlin, concurrency is implemented by having suspend functions return COROUTINE_SUSPENDED to create a suspension point and by calling Continuation.resumeWith() (usually delegating to an external thread) to resume the coroutine.
To control the Continuation object within a suspend function, a reference to the Continuation instance is required. To achieve this, the Kotlin compiler adds the Continuation as an additional parameter to every suspend function during the transformation process. This transformation process is known as Continuation Passing Style (CPS) transformation.
suspend fun <T> sayHello(): T
Here is what the code looks like after undergoing CPS conversion:
fun <T> sayHello(continuation: Continuation<T>): Any?
The reason the return type of the function is Any? is because all suspend functions are designed to be able to return a special value called COROUTINE_SUSPENDED. Kotlin adopts CPS transformation to allow suspend functions to call Continuation.resumeWith().
Implementation
By utilizing the concepts we’ve discussed so far, we can implement coroutines in a simplified manner. Coroutines include many more features, such as specifying the dispatcher to resume a suspended coroutine, cancellation support, structured concurrency, and more. The example provided focuses on the core concepts of coroutine implementation, namely the state machine and CPS, to aid in understanding.
import java.util.*
val COROUTINE_SUSPEND = "FOO"
// continuations
interface Continuation<T> {
fun resumeWith(result: Result<T>): Any?
}
abstract class ContinuationImpl(var next: Continuation<Any?>?) : Continuation<Any?> {
abstract fun invokeSuspend(): Any?
override fun resumeWith(result: Result<Any?>): Result<Any?>? {
var current = this
var result = result
while (true) {
with(current) {
result =
try {
val r = invokeSuspend()
if (r === COROUTINE_SUSPEND) return null
Result.success(r)
} catch (e: Throwable) {
Result.failure(e)
}
if (next != null && next is ContinuationImpl) {
current = next as ContinuationImpl
} else { // reached end
return result
}
}
}
}
}
// suspending functions (CPS applied)
var label = 0
fun mySuspendingFunction(continuation: Continuation<Any?>): Any? {
return when (label) {
0 -> {
label = 1
sayHello(continuation)
}
1 -> {
label = -1
saySomething(continuation)
sayBye(continuation)
}
else -> Error("Too many invokation")
}
}
fun sayHello(continuation: Continuation<Any?>): Any? {
println("Hello, World!")
return delay(3000, continuation)
}
fun saySomething(continuation: Continuation<Any?>): Any? {
println("I can suspend!")
return null
}
fun sayBye(continuation: Continuation<Any?>): Any? {
println("Bye!")
return null
}
// util methods
fun delay(ms: Long, continuation: Continuation<Any?>): Any? {
Timer().schedule(object : TimerTask() {
override fun run() {
continuation.resumeWith(Result.success(Unit))
}
}, ms)
return COROUTINE_SUSPEND
}
fun ((continuation: Continuation<Any?>) -> Any?).startCoroutine(): Any? {
val continuation = this.toContinuation(null)
return continuation.resumeWith(Result.success(Unit))
}
fun ((continuation: Continuation<Any?>) -> Any?).toContinuation(continuation: Continuation<Any?>?): Continuation<Any?> {
return object : ContinuationImpl(continuation) {
override fun invokeSuspend(): Any? {
return this@toContinuation(this)
}
}
}
fun main() {
::mySuspendingFunction.startCoroutine()
println("I'm not blocking!")
println("==========================")
Thread.sleep(4000) // prevent program being terminated
}
// Hello, World!
// I'm not blocking!
// ==========================
// (after 3 seconds..)
// I can suspend!
// Bye!
Conclusion
In this article, we have explored the underlying principles of Kotlin coroutines, focusing on the concepts of state machine and CPS (Continuation Passing Style). In the next article, we will dive into CoroutineContext and ContinuationInterceptor, which play a crucial role in the scheduling of coroutines.
Kotlin — Coroutine Internals was originally published in Better Programming on Medium, where people are continuing the conversation by highlighting and responding to this story.