This document describes how coroutine usage on the server is supported within our frameworks, outlining best practices and providing educational guidance for developers.
This is not an exhaustive explanation of coroutines and all their features. For comprehensive coroutine documentation, refer to the official Kotlin coroutines documentation. This guide focuses specifically on server-side usage within our frameworks (Misk, event-client, etc.), addressing the unique considerations and patterns that apply to server environments.
While this guide lays out safe usage patterns for coroutines and suspending functions, it is ultimately up to the developer to choose an architecture that is appropriate for their service, can be maintained by their team, and is something they are comfortable with. This document does not prescribe usage but rather enables safe usage when coroutines are the right choice.
A common misconception is that coroutines are simply lightweight threads. While coroutines enable concurrent programming, they are fundamentally different from threads. A coroutine is a block of code with an associated context, not a unit of execution managed by the operating system.
A coroutine is a block of code that executes with an associated context. This context is similar to thread-local storage, but instead of being tied to a thread, it's tied to the coroutine itself. The context contains important information including:
- CoroutineContext: Data local to the coroutine (analogous to thread-local variables)
- Dispatcher: Specifies which pool of threads the coroutine can be resumed on
The Dispatcher determines where suspending calls resume execution:
- Shared thread pool: Multiple coroutines share a pool of worker threads (e.g., `Dispatchers.Default`, `Dispatchers.IO`)
- Specific single thread: Execution is confined to a particular thread (e.g., Android's Main thread, Kafka polling thread)
- Dedicated blocking thread: A single thread that blocks until completion (e.g., `runBlocking`)
Example: If your application does nothing other than call runBlocking in main() and make suspending calls, it is essentially a single-threaded application running a single coroutine.
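A minimal sketch of that scenario (with a hypothetical `fetchUser` standing in for a real suspending call):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// The whole application is one coroutine confined to the main thread.
fun main() = runBlocking {
    val user = fetchUser() // suspends; runBlocking parks the main thread until resumed
    println("Fetched $user")
}

// Hypothetical suspending call standing in for a network request.
suspend fun fetchUser(): String {
    delay(100)
    return "user-123"
}
```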
A suspending function is a function marked with the `suspend` keyword that can only be called from within a coroutine (or another suspending function). Under the hood, a suspending function is conceptually syntactic sugar for returning a `CompletableFuture` or `CompletionStage` in Java.
In Java, using futures changes the programming paradigm from procedural to continuation-based:
Continuation-Based Programming (Java Futures):
- An asynchronous operation is invoked, returning a `CompletableFuture`
- The caller designates what code to execute when the operation completes (the continuation)
- The developer decides whether the calling thread blocks until completion or continues with other work
- If non-blocking, the developer must specify which thread executes the continuation when the operation completes
In Java, these are composed using `CompletionStage` methods:
- Synchronous composition: `.thenApply()`, `.thenAccept()` - the continuation runs synchronously on the thread that completes the operation
- Asynchronous composition: `.thenApplyAsync()`, `.thenAcceptAsync()` - the continuation runs on the default asynchronous execution facility or a specified `Executor`
Coroutine Equivalent:
Kotlin coroutines provide the same functionality with cleaner syntax:
- Suspending call: Equivalent to a `CompletionStage` being returned
- Dispatcher in context: Equivalent to the asynchronous execution facility or `Executor`
- Procedural syntax: Despite being continuation-based under the hood, suspending functions look and behave like regular procedural code
This means you get the benefits of asynchronous, non-blocking code while maintaining the readability and structure of procedural programming.
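To make the comparison concrete, here is a minimal sketch of the same composition in both styles (assumes the `await()` extension from `kotlinx-coroutines-jdk8` is on the classpath; `fetchGreetingAsync` is a hypothetical operation):

```kotlin
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.future.await

// Hypothetical asynchronous operation returning a future.
fun fetchGreetingAsync(): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "hello" }

// Continuation-based: the caller wires up what runs when the future completes.
fun callbackStyle() {
    fetchGreetingAsync()
        .thenApply { it.uppercase() } // continuation
        .thenAccept { println(it) }   // next continuation
}

// Coroutine-based: the compiler generates the continuation; the code reads procedurally.
suspend fun proceduralStyle() {
    val greeting = fetchGreetingAsync().await() // suspends instead of blocking
    println(greeting.uppercase())
}
```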
The threading model of an application should not be dictated by the creation of coroutines; the two are separate but intertwined concerns. Because coroutines do not require a specific threading model or dedicated threads, it is efficient to choose a threading model whose thread count closely matches the underlying computing capability: with 10 cores, roughly 10 threads is more efficient than 10,000. The caveat is that this will change with Project Loom and lightweight threads, but the concept is the same.
If the threading model makes use of a small number of shared threads, it is important that none of those threads are blocked from execution. This is the "footgun" we most often describe with coroutines, as it can sometimes be unclear that a function will eventually block.
In the Misk framework, the thread-per-request model is used, meaning each thread handles its request in a self-contained world, sharing minimal to no data with others. This design philosophy ensures that threads don't interfere with each other, making concurrency easier to manage and debugging simpler, while still allowing for parallel execution by assigning separate threads to different requests.
Misk's support for coroutines does not change this threading model. All of the above benefits of single thread per request are maintained, but the developer now has access to using coroutines for a more idiomatic, continuation-based parallel composition of tasks.
That said, the developer can still change the threading model. This is a common anti-pattern when the framework does not help maintain its threading model, because the caller is left to make that decision. Since coroutines commonly use shared thread pools for efficiency, doing so can appear to be a good default choice when it is not.
Example of the Anti-Pattern:
If a developer calls runBlocking and changes the context to Dispatchers.IO:
```kotlin
runBlocking(Dispatchers.IO) {
    // action handler code
}
```

A new threading model is introduced: the request's single thread is now parked, and the rest of the action handler runs on a shared thread, thus changing the threading model! This is precisely what the changes to the frameworks (Misk, event-client, etc.) are meant to address, providing a way to use coroutines that extends the framework's threading model rather than replacing it.
When building a library that provides suspending functions, you may encounter times when a suspending call needs to make a blocking call, or needs access to a specific thread or threads. This might occur when integrating with system calls that don't have suspending support, using 3rd-party libraries that only provide blocking APIs, or when working with APIs that require execution on a specific thread (like Kafka's single-threaded consumer model).
A common mistake is to hardcode Dispatchers.IO within your library's suspending functions:
```kotlin
// ❌ Anti-pattern: Hardcoded dispatcher
class DataFetcher {
    suspend fun fetchData() = withContext(Dispatchers.IO) {
        // blocking call
        blockingApiCall()
    }
}
```

Why this is problematic:
- Dictates threading model: Your library is forcing a specific threading model on all callers
- Breaks thread-local context: Switching to `Dispatchers.IO` loses thread-local data (tracing, authentication, request context)
- Prevents caller control: The caller cannot control the threading policy for your library
- Violates best practices: Libraries should respect the caller's threading model, not impose their own
Instead of hardcoding the dispatcher, inject it as a constructor parameter with a sensible default:
```kotlin
// ✅ Best practice: Injected dispatcher
class DataFetcher(
    private val blockingDispatcher: CoroutineDispatcher = Dispatchers.IO.limitedParallelism(10)
) {
    suspend fun fetchData() = withContext(blockingDispatcher) {
        // blocking call
        blockingApiCall()
    }
}
```

Why this is better:
- Caller controls threading: The service using your library can provide its own dispatcher
- Service-level policy: Threading policy can be centralized at the service level and shared across components
- Default convenience: The default parameter provides a sensible fallback for simple use cases
- Testability: Tests can inject a test dispatcher for deterministic behavior
- Respects threading model: Still follows the principle of not dictating the threading model—the caller has the choice
The best practice is to define threading policy at the service level and share it across all components that need it:
```kotlin
// Service-level configuration
class MyService(
    // Shared dispatcher for all blocking I/O in this service
    private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO.limitedParallelism(20)
) {
    private val dataFetcher = DataFetcher(ioDispatcher)
    private val fileWriter = FileWriter(ioDispatcher)
    private val databaseClient = DatabaseClient(ioDispatcher)

    // All components share the same threading policy
    suspend fun processRequest(request: Request): Response {
        val data = dataFetcher.fetchData(request.id)
        fileWriter.writeFile(data)
        databaseClient.saveRecord(data)
        return Response(data)
    }
}
```

Benefits of service-level policy:
- Centralized control: One place to configure threading behavior for the entire service
- Consistent behavior: All components use the same threading policy
- Easy to tune: Adjust parallelism limits based on service needs and load testing
- Shared resources: Efficient thread reuse across all components
- Not leaked to callers: External callers don't need to know about internal threading decisions
You might ask: "If libraries shouldn't dictate threading models, why inject dispatchers at all?"
The answer: Inject dispatchers not because every class should have its own threading model, but because threading policy should be local to the service and shared, not leaked to every caller or hardwired to Dispatchers.IO.
- Service-local: The service decides its threading policy based on its needs
- Shared: Multiple components within the service share the same policy
- Not leaked: External callers don't need to manage or know about internal threading
- Not hardwired: The policy can be changed without modifying library code
There are specific cases where a library or framework should dictate threading policy:
Example: Kafka Consumer
The Kafka consumer API is not thread-safe: all operations (polling, committing offsets, etc.) must be confined to a single thread at a time. This is a specific requirement of the Kafka client implementation:
```kotlin
class KafkaConsumerWrapper<K, V>(
    private val consumer: KafkaConsumer<K, V>
) {
    // Kafka requires single-threaded access
    private val kafkaDispatcher = Dispatchers.IO.limitedParallelism(1)

    suspend fun poll(timeout: Duration): ConsumerRecords<K, V> =
        withContext(kafkaDispatcher) {
            consumer.poll(timeout)
        }

    suspend fun commitSync() =
        withContext(kafkaDispatcher) {
            consumer.commitSync()
        }
}
```

In this case, the library must control the threading to meet Kafka's requirements. However, this is not the general case—most libraries should inject dispatchers rather than hardcode them.
When building library suspending functions that need to make blocking calls:
- ✅ DO inject dispatchers as constructor parameters with sensible defaults
- ✅ DO allow the service to define and share threading policy across components
- ✅ DO use `Dispatchers.IO.limitedParallelism()` as a default for blocking I/O
- ❌ DON'T hardcode `Dispatchers.IO` or other dispatchers in your library code
- ❌ DON'T force a specific threading model on your callers

⚠️ EXCEPTION: Only dictate threading when there are specific requirements (like Kafka's single-thread constraint)
This guide focuses on two primary use cases for coroutines in server-side applications:
Parallel composition allows multiple independent operations to execute concurrently, reducing overall latency without requiring additional threads. This is particularly valuable for:
- Aggregating data from multiple sources (databases, microservices, APIs)
- Batch operations where each item can be processed independently
- Fan-out/fan-in patterns where you need to gather results from multiple concurrent operations
Benefits include reduced request latency, better resource utilization, and cleaner code compared to callback-based approaches.
As the Kotlin ecosystem matures, more libraries provide suspending APIs (gRPC, database drivers, HTTP clients). Using coroutines allows:
- Seamless integration with these modern libraries without impedance mismatch
- Idiomatic Kotlin code that leverages language features and tooling
- Structured concurrency for automatic lifecycle management and error handling
- Better composability when combining multiple suspending operations
The following sections provide detailed guidance on both use cases.
Parallel composition of tasks involves combining multiple processes or activities to run concurrently (at the same time) rather than sequentially. This approach aims to reduce overall execution time, maximize resource utilization, and improve efficiency.
Concurrency vs. Parallelism: While often used interchangeably, concurrency means a system can handle multiple tasks by switching between them, even on a single processor. Parallelism means tasks are literally running simultaneously on multiple processing units (e.g., multi-core CPUs).
Multithreading is a concurrency model where multiple threads execute tasks, potentially in parallel on different CPU cores. IO-bound operations block the executing thread, so achieving concurrency often requires creating additional threads. This introduces hazards such as race conditions, deadlocks, and heavy OS-level context switching due to large thread counts. While the programming paradigm is familiar and procedural, synchronization complexity increases rapidly.
Asynchronous programming starts tasks without waiting for them to finish, enabling concurrency through callbacks, promises, or futures. It excels for IO-bound operations because they do not require blocking a thread. Architectures range from single-threaded event loops (e.g., Node.js) to thread pools sized to CPU cores, providing true parallelism while minimizing context switching. The cost is a shift toward callback- or reactive-driven design rather than straightforward procedural flow.
Traditional multithreading using thread pools remains appropriate for CPU-bound work or when integrating with legacy systems. This approach uses explicit thread management via executors to achieve parallelism.
Example: Batch Processing with Thread Pool
```kotlin
fun batchGetAttributes(
    accountHolderAttributeMap: Map<AccountHolderToken, List<AttributeType>>,
): Map<AccountHolderToken, AttributeFacade> {
    val executor = Executors.newCachedThreadPool()
    try {
        val futures = accountHolderAttributeMap.map { (token, attributes) ->
            executor.submit<Pair<AccountHolderToken, AttributeFacade>> {
                val request =
                    FetchDataRequest.Builder()
                        .key(token.toKey())
                        .types(emptyList())
                        .attribute_types(attributes)
                        .consistent_read(false)
                        .build()
                val response = service.FetchData().executeBlocking(request)
                token to AttributeFacade(response.attribute_facade)
            }
        }
        return futures.map { it.get() }.toMap()
    } finally {
        executor.shutdown()
    }
}
```

This example demonstrates:
- `Executors.newCachedThreadPool()` creates threads on-demand for each task
- Each account holder's attributes are fetched on a separate thread
- Requires N threads for N concurrent operations
- `Future.get()` blocks until all operations complete
- If any operation throws an exception, `Future.get()` will throw that exception
- All operations complete before errors are handled (no early cancellation)
- Manual executor lifecycle management with proper shutdown
- Thread blocking for aggregation, which can be inefficient for IO-bound operations
Asynchronous programming with callbacks allows non-blocking concurrency without explicit thread management. This pattern uses callback-based APIs (like gRPC's enqueue) to compose parallel operations, aggregating results as they complete.
Example: Batch Processing with Asynchronous Callbacks
```kotlin
fun batchGetAttributes(
    accountHolderAttributeMap: Map<AccountHolderToken, List<AttributeType>>,
): Map<AccountHolderToken, AttributeFacade> {
    val futures = accountHolderAttributeMap.map { (token, attributes) ->
        val future = CompletableFuture<Pair<AccountHolderToken, AttributeFacade>>()
        val request =
            FetchDataRequest.Builder()
                .key(token.toKey())
                .types(emptyList())
                .attribute_types(attributes)
                .consistent_read(false)
                .build()
        service.FetchData().enqueue(request, object : Callback<FetchDataResponse> {
            override fun onSuccess(response: FetchDataResponse) {
                future.complete(token to AttributeFacade(response.attribute_facade))
            }

            override fun onFailure(exception: Exception) {
                future.completeExceptionally(exception)
            }
        })
        future
    }
    return CompletableFuture.allOf(*futures.toTypedArray())
        .thenApply { futures.map { it.get() }.toMap() }
        .get()
}
```

This example demonstrates:
- `CompletableFuture` to bridge callback-based APIs with future composition
- `enqueue()` for non-blocking asynchronous execution
- Requires 1 thread - callbacks execute on the gRPC thread pool
- Callbacks complete futures via `complete()` or `completeExceptionally()`
- `CompletableFuture.allOf()` aggregates multiple futures
- If any operation fails, the exception is thrown when calling `.get()`
- All operations complete before errors are handled (no early cancellation)
- Composable async operations but still requires blocking `.get()` for final result
Structured concurrency ensures that all child coroutines complete before their parent scope finishes, providing predictable lifecycle management and automatic cleanup. This pattern uses coroutineScope or supervisorScope to create a boundary for concurrent operations:
- `coroutineScope`: If any child coroutine fails, all other children are cancelled and the exception is propagated. Use this when all operations must succeed together.
- `supervisorScope`: Child coroutines fail independently without affecting siblings. Use this when you want to collect partial results even if some operations fail.
Both scopes suspend until all children complete, ensuring no coroutines are left running after the scope exits.
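As a minimal sketch of the difference (with hypothetical failing work), the same two `async` calls behave differently under each scope:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.supervisorScope

// coroutineScope: one failure cancels the sibling and the exception propagates.
suspend fun failFast(): Int = coroutineScope {
    val a = async { delay(100); 1 }
    val b = async<Int> { error("boom") } // failure cancels `a` and rethrows
    a.await() + b.await()
}

// supervisorScope: siblings are isolated; a failure surfaces only when awaited.
suspend fun partialResults(): Int = supervisorScope {
    val a = async { delay(100); 1 }
    val b = async<Int> { error("boom") } // `a` keeps running
    a.await() + runCatching { b.await() }.getOrDefault(0)
}
```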
Example: Batch Processing with Structured Concurrency
```kotlin
suspend fun batchGetAttributes(
    accountHolderAttributeMap: Map<AccountHolderToken, List<AttributeType>>,
): Map<AccountHolderToken, AttributeFacade> = coroutineScope {
    accountHolderAttributeMap
        .map { (token, attributes) ->
            async {
                val request =
                    FetchDataRequest.Builder()
                        .key(token.toKey())
                        .types(emptyList())
                        .attribute_types(attributes)
                        .consistent_read(false)
                        .build()
                val response = service.FetchData().execute(request)
                token to AttributeFacade(response.attribute_facade)
            }
        }
        .awaitAll()
        .toMap()
}
```

This example demonstrates:
- `coroutineScope` establishes a structured concurrency boundary
- Each account holder's attributes are fetched in parallel using `async`
- Requires 1 thread - coroutines suspend during IO operations
- `awaitAll()` waits for all parallel operations to complete
- If any operation fails, all other coroutines in the scope are immediately cancelled
- Automatic cancellation propagation enables fail-fast behavior and resource cleanup
- Results are collected into a Map for easy access
Multiple threads introduce several challenges that should be carefully considered:
- Thread-Local Data Loss: Thread-local variables (like request context, user authentication, tracing information) don't automatically propagate to spawned threads. This can break observability and context tracking across your application.
- Tracing and Observability: Distributed tracing spans and correlation IDs stored in thread-locals may not propagate automatically when work moves to different threads. This data must be explicitly managed and propagated across thread boundaries, making it difficult to trace requests through your system without additional instrumentation.
- Thread Model Complications: Introducing shared thread pools can change your application's threading model and scaling characteristics. If additional thread pools are used, make sure to not change the underlying threading model or create additional thread contention.
- Misk Thread-Per-Request Model: Misk currently uses a thread-per-request model where each request has a dedicated thread for blocking work. This ensures that blocking operations in one request won't impact other requests in progress. Introducing additional thread pools can complicate this model.
- Library Dispatcher Injection: When building libraries, inject dispatchers rather than hardcoding them. This allows service-level threading policy to be centralized and shared, rather than having each library component create its own thread pools. See "Best Practices for Library Suspending Functions with Blocking Calls" for details.
Recommendation: Avoid using multiple threads for parallel composition if possible. Prefer structured concurrency (Pattern 3) which maintains thread-local context and works seamlessly with Misk's threading model.
| Criteria | Pattern 1: Multiple Concurrent Threads | Pattern 2: Multiple Asynchronous Calls | Pattern 3: Structured Concurrency (Preferred) |
|---|---|---|---|
| Single Threaded | ❌ No (N threads) | ✅ Yes (1 thread) | ✅ Yes (1 thread) |
| Structured Concurrency | ❌ No | ❌ No | ✅ Yes |
| Idiomatic Kotlin | ❌ No | ❌ No | ✅ Yes |
| Procedural Programming Paradigm | ✅ Yes | ❌ No (callback-based) | ✅ Yes |
Recommendations:
- Pattern 3 (Structured Concurrency) is preferred for most server-side use cases: it provides idiomatic Kotlin code with automatic lifecycle management, efficient resource usage, and fail-fast error handling
- Pattern 1 (Multiple Threads) may be appropriate for CPU-bound work or when integrating with legacy blocking APIs that cannot be easily adapted
- Pattern 2 (Asynchronous Callbacks) is useful when working with existing callback-based libraries, but consider wrapping them in suspend functions to use Pattern 3 instead
Developer Responsibility:
Ultimately, the developer is responsible for the approach they choose. While Pattern 3 is generally recommended, your familiarity and comfort level with each pattern should be factored into the decision. If you're more experienced with traditional multithreading or callback-based programming and need to deliver working code quickly, it may be pragmatic to use what you know well. However, investing time to learn structured concurrency will pay dividends in code maintainability, debugging ease, and alignment with Kotlin idioms for future work.
If a library contains suspending functions, integration is straightforward. You have two options:
You can call suspending functions with runBlocking to convert them to blocking calls. If taking this approach, make sure not to change the threading model or switch to a shared thread. You already have a dedicated thread that fully supports blocking, so prefer to continue using that thread.
Example:
```kotlin
fun handleRequest(): Response {
    val result = runBlocking {
        thirdPartyLibrary.fetchData() // Suspending function
    }
    return Response(result)
}
```

The other option is to convert your action or callback handler to suspending. There is no downside to this since the threading model is preserved and the thread fully supports blocking. Doing this will allow easy integration.
Keep in mind that suspension is transitive: a function that calls a suspending function must itself be suspending, and so on all the way up the call chain. In this case, the callback entry point is already suspending.
Example:
```kotlin
suspend fun handleRequest(): Response {
    val result = thirdPartyLibrary.fetchData() // Suspending function
    return Response(result)
}
```

If working with libraries that have interfaces or callbacks with suspending functions, it is best to use the framework support for suspension. The biggest reason this is preferred is that there will probably be shared code, like clients or tasks, that you will implement either as suspending or not. Since these functions are called from the library, it is very likely that they will use structured concurrency.
A key piece of structured concurrency is being able to cancel child coroutines for cleanup when a sibling fails. The same applies to timeouts, as certain coroutines may have an execution timeout.
Coroutine cancellation is cooperative and relies on the coroutine being in a suspended state or transitioning to a suspended state for cancellation. Cancelling a running coroutine does nothing to the running thread. If the coroutine is in a blocking call, it won't be cancelled since the cancellation logic does nothing to the underlying thread.
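As a standalone sketch of cooperative cancellation (with a hypothetical CPU-bound `computeChunk`, run on `Dispatchers.Default` here purely so the example is self-contained), a loop with no suspension points must check for cancellation explicitly:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val job = launch(Dispatchers.Default) {
        repeat(1_000_000) { i ->
            ensureActive()  // cooperative check; without it, cancellation would never take effect here
            computeChunk(i) // hypothetical CPU-bound work with no suspension points
        }
    }
    delay(10)
    job.cancelAndJoin() // completes at the next ensureActive() check
}

fun computeChunk(i: Int) { /* CPU-bound work that never suspends */ }
```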
To properly allow for cancellation, any blocking calls should be wrapped in runInterruptible, which looks for a coroutine cancellation exception and attempts to interrupt the thread. Depending on the threading, your blocking calls may also need to be moved to a block-safe thread.
Example of wrapping blocking calls:
```kotlin
suspend fun <T> BlockingQueue<T>.awaitTake(): T =
    runInterruptible(dispatcher) { take() } // `dispatcher` is an injected block-safe dispatcher
```

However, if the common code is suspending, this does not need to be dealt with. And if your entry points from the framework are suspending, you are free to call these suspending functions as well as other blocking calls as needed.
Recommendation: When integrating with 3rd-party libraries that provide suspending APIs, prefer making your framework entry points suspending. This allows you to leverage structured concurrency, proper cancellation, and seamless integration without worrying about wrapping blocking calls or managing thread interruption.
When testing suspending functions and coroutines, prefer using runTest from the kotlinx-coroutines-test library. This creates a TestScope with several test-friendly characteristics that make coroutine testing more reliable and predictable.
- Virtual Time: Time is controlled by the test framework rather than real wall-clock time. This allows you to test time-dependent behavior (delays, timeouts) instantly without actually waiting.
- TestDispatcher: Provides predictable coroutine scheduling. All coroutines launched within the test scope use a controlled dispatcher, making tests deterministic and eliminating race conditions.
- Automatic Completion Checking: The test fails if coroutines don't complete after a timeout period, helping catch infinite loops or deadlocks.
- Uncaught Exception Detection: The test fails automatically if any coroutine throws an uncaught exception, ensuring errors don't go unnoticed.
- Deterministic Execution: Coroutines execute in a predictable order, making tests reproducible and easier to debug.
```kotlin
class MyServiceTest {
    @Test
    fun testBatchGetAttributes() = runTest {
        val service = MyService()
        val tokens = listOf(token1, token2, token3)

        val result = service.batchGetAttributes(tokens)

        assertEquals(3, result.size)
        // Assertions on result
    }
}
```

Virtual time allows you to test time-dependent behavior without waiting:
```kotlin
@Test
fun testWithDelay() = runTest {
    val startTime = testScheduler.currentTime
    delay(1000) // Virtual delay - completes instantly
    val endTime = testScheduler.currentTime
    assertEquals(1000, endTime - startTime) // Virtual time advanced
}

@Test
fun testWithTimeout() = runTest {
    // This block will run with a timeout of 100ms
    val result = withTimeout(100.milliseconds) {
        // Simulate some work that might take time
        delay(50) // Virtual delay skipped by runTest
        "Operation Successful"
    }
    assertNotNull(result)
}

@Test
fun testTimeoutFailure() = runTest {
    // The block will time out because the delay exceeds the timeout
    val e = assertFailsWith<TimeoutCancellationException> {
        withTimeout(50.milliseconds) {
            delay(100) // Virtual delay skipped, but still times out
            "Should not reach here"
        }
    }
    // Expected: The block is cancelled and an exception is thrown
    println("Caught expected timeout exception: ${e.message}")
}

@Test
fun testParallelFetch() = runTest {
    val service = MyService()

    val result1 = async { service.fetchResource1() }
    val result2 = async { service.fetchResource2() }
    val results = listOf(result1.await(), result2.await())

    // Verify both results are not null
    results.forEach { assertNotNull(it) }
}

@Test
fun testErrorPropagation() = runTest {
    val service = MyService()

    assertFailsWith<ServiceException> {
        service.batchGetAttributesWithFailure()
    }
    // Test automatically fails if exception is uncaught
}
```

If your code uses specific dispatchers (e.g., an injected `CoroutineDispatcher` for blocking calls), you can inject a test dispatcher to ensure deterministic behavior in your tests.
Understanding StandardTestDispatcher:
The StandardTestDispatcher is a test dispatcher that provides controlled, deterministic execution of coroutines:
- Requires explicit advancement: Coroutines don't execute automatically. You must explicitly advance time using `advanceUntilIdle()`, `advanceTimeBy()`, or `runCurrent()` (see the sketch below).
- Virtual time control: Works with the test scheduler to control virtual time progression.
- Deterministic execution: Ensures coroutines execute in a predictable order, making tests reproducible.
- Integrates with `runTest`: When you use `runTest`, it automatically creates a `StandardTestDispatcher` and handles time advancement for you.
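To illustrate the explicit-advancement behavior, here is a minimal sketch: a coroutine launched inside `runTest` is queued on the test's `StandardTestDispatcher` and does not run until the scheduler is advanced.

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertFalse
import kotlin.test.assertTrue

class ExplicitAdvancementTest {
    @Test
    fun launchedCoroutineNeedsAdvancement() = runTest {
        var done = false
        launch { done = true } // queued on the test's StandardTestDispatcher

        assertFalse(done) // has not run yet: nothing advanced the scheduler
        runCurrent()      // run everything scheduled at the current virtual time
        assertTrue(done)
    }
}
```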
When to use StandardTestDispatcher explicitly:
You typically need to create a StandardTestDispatcher explicitly when:
- Your production code accepts an injected `CoroutineDispatcher` (e.g., for blocking I/O)
- You want to test that your code properly uses the injected dispatcher
- You need to verify behavior with custom dispatcher configurations
Example: Testing code with an injected dispatcher
```kotlin
// Production code with injected dispatcher
class DataRepository(
    private val blockingDispatcher: CoroutineDispatcher = Dispatchers.IO.limitedParallelism(10)
) {
    suspend fun fetchData(id: String): Data = withContext(blockingDispatcher) {
        // Blocking call
        legacyApi.fetchDataBlocking(id)
    }
}

// Test with injected StandardTestDispatcher
@Test
fun testWithCustomDispatcher() = runTest {
    // Create a StandardTestDispatcher that shares the test scheduler
    val testDispatcher = StandardTestDispatcher(testScheduler)
    val repository = DataRepository(testDispatcher)

    // Launch the suspending call
    val result = repository.fetchData("test-id")

    // Assertions
    assertEquals(expectedData, result)
}
```

Key points:
- Pass `testScheduler` from the `runTest` scope to the `StandardTestDispatcher` constructor. This ensures the test dispatcher shares the same virtual time control.
- The `runTest` block automatically advances time and waits for all coroutines to complete, so you typically don't need to manually call `advanceUntilIdle()`.
- This approach allows you to test that your code correctly uses the injected dispatcher without actually creating real threads or thread pools.
- ✅ DO use `runTest` for all coroutine tests
- ✅ DO rely on virtual time for testing delays and timeouts
- ✅ DO test both success and failure scenarios
- ✅ DO verify that structured concurrency properly cancels child coroutines on failure
- ✅ DO inject dispatchers when testing code that uses specific threading behavior
- ❌ DON'T use `runBlocking` in tests - it doesn't provide virtual time or test-friendly features
- ❌ DON'T use real delays (`Thread.sleep`) - they make tests slow and flaky
- ❌ DON'T rely on timing-based assertions - use virtual time instead
- Coroutines Are Not Threads: Coroutines are blocks of code with an associated context, not OS-managed execution units. They enable concurrent programming while maintaining procedural syntax.
- Suspending Functions Are Syntactic Sugar: Under the hood, suspending functions are equivalent to Java's `CompletableFuture` or `CompletionStage`, but with cleaner, procedural syntax.
- Respect the Threading Model: Don't introduce new dispatchers or thread pools unless absolutely necessary. The framework's threading model (like Misk's thread-per-request) should be preserved.
- Structured Concurrency Is Preferred: Pattern 3 (Structured Concurrency with `coroutineScope`/`supervisorScope`) provides automatic lifecycle management, fail-fast error handling, and efficient single-threaded execution.
- Single Thread, Multiple Coroutines: With Misk's coroutine support, you can perform parallel composition of tasks on a single thread without introducing thread pools or changing the threading model.
- Cancellation Is Cooperative: Coroutine cancellation only works when coroutines are suspended or transitioning to a suspended state. Blocking calls prevent cancellation unless wrapped in `runInterruptible`.
- Use `limitedParallelism()` for Blocking Calls: When you need to make blocking calls within coroutines, use `Dispatchers.IO.limitedParallelism(N)` to create an isolated view with controlled parallelism, avoiding unbounded thread creation.
- Service-Level Threading Policy: Define threading policy at the service level and inject dispatchers into library components. This centralizes control and allows efficient resource sharing without hardcoding dispatchers.
- Test with `runTest`: Use `runTest` from `kotlinx-coroutines-test` for testing coroutines. It provides virtual time, deterministic execution, and automatic error detection, making tests fast and reliable.
- Choose What Works for Your Team: While structured concurrency is recommended, ultimately you should choose an architecture that is appropriate for your service, maintainable by your team, and comfortable for developers.
- ✅ DO use `coroutineScope` for operations where all tasks must succeed together (fail-fast behavior)
- ✅ DO use `supervisorScope` when you want to collect partial results even if some operations fail
- ✅ DO use `async` and `awaitAll()` for parallel composition of independent tasks
- ✅ DO preserve the caller's threading model by not introducing your own dispatcher
- ✅ DO make your framework entry points (action handlers, callbacks) suspending when integrating with suspending libraries
- ❌ DON'T call `runBlocking(Dispatchers.IO)` - this changes the threading model
- ❌ DON'T introduce new thread pools or dispatchers unless you have a specific, well-understood reason
- ❌ DON'T mix blocking and suspending calls without understanding the implications
- ❌ DON'T use `GlobalScope` - it bypasses structured concurrency and can lead to leaked coroutines
- ✅ DO prefer making your entry points suspending for seamless integration
- ✅ DO use `runBlocking` (without a dispatcher) if you need to call suspending functions from blocking code
- ✅ DO wrap blocking calls in `runInterruptible` when implementing suspending interfaces that need proper cancellation support
- ❌ DON'T change the threading model when wrapping suspending calls
- ✅ DO prefer Pattern 3 (Structured Concurrency) for most server-side use cases
- ✅ DO consider Pattern 1 (Multiple Threads) only for CPU-bound work or legacy integration
- ✅ DO consider Pattern 2 (Asynchronous Callbacks) when working with existing callback-based libraries
- ✅ DO factor in your team's familiarity and comfort level with each pattern
- ✅ DO understand the trade-offs: thread count, error handling, cancellation behavior
- ✅ DO implement suspending APIs without introducing dispatchers for non-blocking operations
- ✅ DO inject dispatchers (with sensible defaults) when your library needs to make blocking calls
- ✅ DO let the caller dictate the threading model through dispatcher injection
- ✅ DO allow service-level threading policy to be shared across library components
- ✅ DO provide blocking wrappers using `runBlocking` (without changing dispatcher) if needed
- ✅ DO consider providing `CompletableFuture`-based APIs for maximum flexibility
- ❌ DON'T hardcode `Dispatchers.IO` or other dispatchers in your library code
- ❌ DON'T force a specific threading model on your callers
⚠️ EXCEPTION: Only dictate threading when there are specific requirements (like Kafka's single-thread constraint)
- ✅ DO use `runTest` from `kotlinx-coroutines-test` for all coroutine tests
- ✅ DO rely on virtual time for testing delays and timeouts (instant execution)
- ✅ DO test both success and failure scenarios with structured concurrency
- ✅ DO verify that child coroutines are properly cancelled on failure
- ✅ DO inject `StandardTestDispatcher` when testing code with custom dispatchers
- ✅ DO pass `testScheduler` to injected test dispatchers for virtual time control
- ❌ DON'T use `runBlocking` in tests - it lacks virtual time and test-friendly features
- ❌ DON'T use real delays (`Thread.sleep`) - they make tests slow and flaky
- ❌ DON'T rely on timing-based assertions - use virtual time instead
- ✅ DO understand that suspending functions are transitive - all callers must be suspending
- ✅ DO remember that thread-local data (tracing, authentication, context) is preserved with structured concurrency
- ✅ DO use structured concurrency to get automatic cleanup and fail-fast error handling
- ✅ DO test your understanding with small examples before applying to production code
- ❌ DON'T assume coroutines automatically make everything faster - they enable better resource utilization for IO-bound work
For parallel composition of suspending calls on a single thread:
```kotlin
suspend fun fetchMultiple() = coroutineScope {
    val result1 = async { fetchData1() }
    val result2 = async { fetchData2() }
    CombinedResult(result1.await(), result2.await())
}
```

For collecting partial results even if some operations fail:
```kotlin
suspend fun fetchMultipleWithPartialResults() = supervisorScope {
    val result1 = async { fetchData1() } // May fail
    val result2 = async { fetchData2() } // May fail
    val result3 = async { fetchData3() } // May fail

    // Collect results, handling failures individually
    val data1 = runCatching { result1.await() }.getOrNull()
    val data2 = runCatching { result2.await() }.getOrNull()
    val data3 = runCatching { result3.await() }.getOrNull()
    PartialResult(data1, data2, data3)
}
```

For parallel composition of blocking calls using coroutines:
```kotlin
suspend fun fetchMultiple() = coroutineScope {
    val dispatcher = Dispatchers.IO.limitedParallelism(10)
    val result1 = async(dispatcher) { fetchDataBlocking1() }
    val result2 = async(dispatcher) { fetchDataBlocking2() }
    CombinedResult(result1.await(), result2.await())
}
```

For parallel composition without using coroutines:
```kotlin
fun fetchMultiple(): CombinedResult {
    val executor = Executors.newCachedThreadPool()
    try {
        val future1 = executor.submit { fetchDataBlocking1() }
        val future2 = executor.submit { fetchDataBlocking2() }
        return CombinedResult(future1.get(), future2.get())
    } finally {
        executor.shutdown()
    }
}
```

For calling suspending functions from blocking code:
```kotlin
fun blockingFunction() {
    val result = runBlocking { // No dispatcher specified
        suspendingFunction()
    }
}
```

For wrapping blocking calls with cancellation support:
```kotlin
suspend fun <T> BlockingQueue<T>.awaitTake(): T =
    runInterruptible(dispatcher) { take() } // `dispatcher` is an injected block-safe dispatcher
```

For library functions that need blocking calls:
```kotlin
class MyLibrary(
    private val blockingDispatcher: CoroutineDispatcher = Dispatchers.IO.limitedParallelism(10)
) {
    suspend fun fetchData() = withContext(blockingDispatcher) {
        // blocking call here
        blockingApiCall()
    }
}
```

For testing suspending functions:
```kotlin
@Test
fun testSuspendingFunction() = runTest {
    val result = myService.fetchData()
    assertEquals(expectedData, result)
}
```

For testing with injected dispatchers:
```kotlin
@Test
fun testWithCustomDispatcher() = runTest {
    val testDispatcher = StandardTestDispatcher(testScheduler)
    val repository = DataRepository(testDispatcher)
    val result = repository.fetchData("test-id")
    assertEquals(expectedData, result)
}
```

If I use async/await in my action handler, will the parallel composition happen on a single thread?

Yes, that is the default behavior if you don't introduce any more threads via Dispatchers.
Example using async/await:
```kotlin
suspend fun fetchMultipleResources(): CombinedResult = coroutineScope {
    val userDeferred = async { userService.fetchUser() }
    val accountDeferred = async { accountService.fetchAccount() }
    val settingsDeferred = async { settingsService.fetchSettings() }

    CombinedResult(
        user = userDeferred.await(),
        account = accountDeferred.await(),
        settings = settingsDeferred.await()
    )
}
```

In this example, all three service calls are composed in parallel on a single thread. Each suspending call releases the thread while waiting for the IO operation to complete, allowing other coroutines to execute.
If I use async/await in my action handler, and I call a blocking function, will it create thread starvation?
No, you still have a single thread for your request and are fully able to make blocking calls on that thread as you always could. However, if you create multiple concurrent coroutines with async, and each call blocks the thread, they will run serially, not in parallel.
Example of the problem:
```kotlin
suspend fun fetchMultipleResources(): CombinedResult = coroutineScope {
    val userDeferred = async { userService.fetchUserBlocking() }              // Blocks the thread
    val accountDeferred = async { accountService.fetchAccountBlocking() }     // Waits for user to complete
    val settingsDeferred = async { settingsService.fetchSettingsBlocking() }  // Waits for account to complete

    CombinedResult(
        user = userDeferred.await(),
        account = accountDeferred.await(),
        settings = settingsDeferred.await()
    )
}
```

In this case, even though you've used async, the blocking calls prevent true parallel composition. The coroutines will execute serially because each one blocks the single thread until completion. To achieve parallel composition, you must use suspending functions that properly suspend rather than block.
If I want to still use async/await but I need to make blocking calls, should I just use an executor and threads?
You could, but it's better to still use async/await and call `async` with `Dispatchers.IO.limitedParallelism()`. This does a few things:
- Allows you to use the async/await API
- Allows you to still use structured concurrency
- Is very lightweight: by default it creates no new threads, since it is a view over the existing dispatcher that reuses its threads

This means it has the efficiency of a shared thread pool, but gives you an isolated view.
Example:
```kotlin
suspend fun fetchMultipleResourcesWithBlocking(): CombinedResult = coroutineScope {
    val blockingDispatcher = Dispatchers.IO.limitedParallelism(10)

    val userDeferred = async(blockingDispatcher) { userService.fetchUserBlocking() }
    val accountDeferred = async(blockingDispatcher) { accountService.fetchAccountBlocking() }
    val settingsDeferred = async(blockingDispatcher) { settingsService.fetchSettingsBlocking() }

    CombinedResult(
        user = userDeferred.await(),
        account = accountDeferred.await(),
        settings = settingsDeferred.await()
    )
}
```

In this example, the blocking calls execute on the `Dispatchers.IO` thread pool with a limited parallelism of 10, allowing true parallel execution while maintaining structured concurrency and efficient thread reuse.
If I have a library with suspending functions and I would like the library calls to compose concurrent tasks, should I introduce a Dispatcher for the caller?
No, the caller should dictate the threading model. If the calling coroutine context has a single thread, then all non-blocking operations should be on that single thread. If the calling context uses a small thread pool like the Default dispatcher, it should use that.
Why this matters:
When you write a library with suspending functions, you should respect the caller's threading model. The caller has chosen their dispatcher for a reason—whether it's a single thread per request (like Misk), a shared thread pool, or a specific thread (like Android's Main thread). By not introducing your own dispatcher, you:
- Preserve thread-local data (tracing, authentication, request context)
- Respect the caller's performance and concurrency characteristics
- Avoid unexpected thread switching and context loss
- Allow the caller to control resource usage
Example - Library code (correct approach):
```kotlin
// Library function - no dispatcher specified
suspend fun fetchUserData(userId: String): UserData = coroutineScope {
    val profile = async { fetchProfile(userId) }
    val preferences = async { fetchPreferences(userId) }
    val settings = async { fetchSettings(userId) }

    UserData(
        profile = profile.await(),
        preferences = preferences.await(),
        settings = settings.await()
    )
}
```

The library function uses `coroutineScope` and `async` without specifying a dispatcher, allowing it to inherit the caller's context and threading model.
What if my library function needs to make blocking calls?
If your library needs to make blocking calls (e.g., calling a 3rd-party blocking API or system call), you should inject a dispatcher rather than hardcoding it. This allows the service to control threading policy while still respecting the principle of not dictating the threading model.
Example - Library with blocking calls:
```kotlin
// Library class with injected dispatcher
class UserDataRepository(
    private val blockingDispatcher: CoroutineDispatcher = Dispatchers.IO.limitedParallelism(10)
) {
    // Suspending function that needs to make blocking calls
    suspend fun fetchUserData(userId: String): UserData = coroutineScope {
        val profile = async(blockingDispatcher) {
            // Blocking call to legacy API
            legacyApi.fetchProfileBlocking(userId)
        }
        val preferences = async(blockingDispatcher) {
            // Blocking call to file system
            fileSystem.readPreferencesBlocking(userId)
        }

        UserData(
            profile = profile.await(),
            preferences = preferences.await()
        )
    }
}

// Service-level usage with shared dispatcher
class MyService(
    private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO.limitedParallelism(20)
) {
    private val userRepo = UserDataRepository(ioDispatcher)

    suspend fun handleRequest(userId: String) = userRepo.fetchUserData(userId)
}
```

By injecting the dispatcher:
- The library doesn't hardcode `Dispatchers.IO`
- The service controls the threading policy (parallelism limits, thread pool configuration)
- Multiple components can share the same dispatcher for efficient resource usage
- Tests can inject a test dispatcher for deterministic behavior
See the "Best Practices for Managing Custom Dispatchers" section for more details.
Does a library have to expose suspending functions in order to compose concurrent tasks internally?

No, but there are a couple of options for composing them:

Option 1: Provide a CompletableFuture-based API

This provides the most flexibility as it can be called and managed as a completion stage in a "reactive" type caller paradigm. It can be blocked on with `.get()` for a blocking procedural paradigm, or it can be converted to a continuation and suspended on with `.await()`.

Downside: Building the library API requires moving away from the procedural paradigm.
Example:
```kotlin
// Library provides CompletableFuture API
fun fetchUserDataAsync(userId: String): CompletableFuture<UserData> {
    // Implementation using CompletableFuture
}

// Caller can use it three ways:

// 1. Reactive/callback style
fetchUserDataAsync(userId).thenApply { data -> processData(data) }

// 2. Blocking style
val data = fetchUserDataAsync(userId).get()

// 3. Suspending style
suspend fun getUser(userId: String) = fetchUserDataAsync(userId).await()
```

Option 2: Provide a suspending API with a blocking wrapper

Following best practices, this should not introduce new dispatchers or threads to the context. It can create new coroutines and make use of structured concurrency for error handling and cleanup.
The blocking version of the API can simply delegate in a runBlocking. Because this is a blocking call, runBlocking is appropriate. It should not call runBlocking(Dispatchers.IO) and introduce a new dispatcher—it should maintain the single-threaded model.
Warning: `runBlocking(Dispatchers.IO)` is dangerous because, as described above, it changes the threading model out from under the service.
Example:
```kotlin
// Library provides suspending API
suspend fun fetchUserData(userId: String): UserData = coroutineScope {
    val profile = async { fetchProfile(userId) }
    val preferences = async { fetchPreferences(userId) }

    UserData(
        profile = profile.await(),
        preferences = preferences.await()
    )
}

// Blocking wrapper - correct approach
fun fetchUserDataBlocking(userId: String): UserData = runBlocking {
    fetchUserData(userId)
}

// WRONG - Don't do this!
fun fetchUserDataBlockingWrong(userId: String): UserData = runBlocking(Dispatchers.IO) {
    fetchUserData(userId) // Changes threading model!
}
```

In this approach, the suspending version is the primary implementation, and the blocking version is a thin wrapper using `runBlocking` without changing the dispatcher. This maintains the caller's threading model while providing both APIs.