Created
February 16, 2026 22:55
-
-
Save macguru/134e55b67bb2f265de0b02a29b793df3 to your computer and use it in GitHub Desktop.
Helper for implementing observations using cancellable async/await functions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import Foundation | |
| import Synchronization | |
| /// Synchronization mechanism to support cooperative and cancellable waiting on a value that _might_ become available _at some point in the future_. | |
/// Synchronization mechanism to support cooperative and cancellable waiting on a value that _might_ become available _at some point in the future_.
///
/// Any number of tasks can await ``value`` concurrently. They are all resumed once ``complete(_:)`` is called, or individually when their task is cancelled.
public final class UncertainFuture<Value: Sendable>: Sendable {
    /// Initializes a new uncertain future. It starts out pending, with no waiters.
    public init() {}

    /// Completes the future with the passed value.
    ///
    /// All callers currently suspended in ``value`` are resumed with `value`. The value is also stored, so later accesses of ``value`` return it without suspending.
    ///
    /// - Parameter value: The value delivered to current and future callers of ``value``.
    /// - Precondition: The future has not been completed before. Completing it a second time is a programmer error and traps.
    public func complete(_ value: Value) {
        // Only the state transition happens inside the critical section. The waiters are handed out of the lock
        // and resumed afterwards, so the mutex is never held while calling into the concurrency runtime.
        let waiters = state.withLock { state -> [Waiter] in
            guard case let .pending(waiters) = state else {
                fatalError("Attempted to complete UncertainFuture twice!")
            }

            state = .completed(value)
            return waiters
        }

        // Once the state is `.completed`, the cancellation handler can no longer find these waiters, which makes
        // this the only place that resumes them.
        for waiter in waiters {
            waiter.continuation.resume(returning: value)
        }
    }

    /// Returns the value of the uncertain future, as soon as it's available.
    ///
    /// Calls will suspend and be resumed only when a value has been provided through ``complete(_:)`` or the calling task has been cancelled. If the future has already been completed, the value is returned without waiting.
    ///
    /// - Throws: Aborts the waiting and throws a `CancellationError` when the enclosing task has been cancelled. A task that is already cancelled when it accesses the value receives the error even if the future has been completed.
    public var value: Value {
        get async throws {
            try await getValue()
        }
    }

    // MARK: - Internal

    /// The uncertain future starts out pending with no waiters queued up.
    private let state = Mutex<State>(.pending([]))

    /// Lifecycle stages of an uncertain future
    private enum State {
        /// The uncertain future is currently suspending calls waiting for a value
        case pending([Waiter])

        /// The value has been provided and can be returned immediately
        case completed(Value)
    }

    /// Descriptor of a suspended caller
    private struct Waiter: Sendable {
        /// Unique id so that the waiter can be found again in case of cancellation
        let id: UUID

        /// The continuation on which the waiter has been suspended
        let continuation: CheckedContinuation<Value, any Error>
    }

    /// Cancellable suspending function used to implement the awaiting getter
    ///
    /// - Returns: The value passed to ``complete(_:)``.
    /// - Throws: `CancellationError` if the calling task is or becomes cancelled before a value has been delivered.
    private func getValue() async throws -> Value {
        // ID needs to be created first to be available in both closures of the cancellation handler.
        let id = UUID()

        return try await withTaskCancellationHandler {
            try await withCheckedThrowingContinuation { continuation in
                // Decide under the lock whether the caller can be answered right away (non-nil result) or has
                // been queued as a waiter (nil result).
                let immediate = state.withLock { state -> Result<Value, any Error>? in
                    // Immediately resume if the task is already cancelled, no need to wait. This check must stay
                    // inside the lock: the cancellation handler takes the same lock, so either it runs first and
                    // the flag is visible here, or it runs later and finds the waiter registered below.
                    guard !Task.isCancelled else {
                        return .failure(CancellationError())
                    }

                    switch state {
                    case let .completed(value):
                        // Immediately resume with the existing value
                        return .success(value)

                    case var .pending(waiters):
                        // Not yet completed, append as new waiter and suspend
                        waiters.append(Waiter(id: id, continuation: continuation))
                        state = .pending(waiters)
                        return nil
                    }
                }

                // The continuation was not registered in this case, so nobody else can resume it.
                if let immediate {
                    continuation.resume(with: immediate)
                }
            }
        } onCancel: {
            let cancelled = state.withLock { state -> Waiter? in
                // Gracefully check if observation is still in place (might have been resumed meanwhile)
                guard case var .pending(waiters) = state,
                      let index = waiters.firstIndex(where: { $0.id == id })
                else { return nil }

                // Remove waiter and update state, so that `complete(_:)` can no longer see it
                let waiter = waiters.remove(at: index)
                state = .pending(waiters)
                return waiter
            }

            // Then cancel the continuation, outside of the critical section
            cancelled?.continuation.resume(throwing: CancellationError())
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import Synchronization | |
| import Testing | |
| import UncertainFuture | |
extension ToolkitTests {
    /// Test suite covering completion, cancellation and lifetime behavior of `UncertainFuture`.
    struct UncertainFutureTests {}
}

extension ToolkitTests.UncertainFutureTests {
    /// Basic completion where a waiting task receives the value.
    @Test func completion() async throws {
        let uncertainFuture = UncertainFuture<String>()

        // Start a task that waits for the uncertain future
        let waitingTask = Task.immediate {
            try await uncertainFuture.value
        }

        // Complete the uncertain future
        uncertainFuture.complete("Hello World")

        // Verify the waiting task receives the value
        #expect(try await waitingTask.value == "Hello World")
    }

    /// Can be completed _before_ any value is awaited.
    @Test func earlyCompletion() async throws {
        let uncertainFuture = UncertainFuture<Int>()

        // Complete first
        uncertainFuture.complete(42)

        // Then access value - should return immediately
        #expect(try await uncertainFuture.value == 42)
    }

    /// Completes multiple tasks concurrently.
    @Test func concurrentWaiters() async throws {
        let uncertainFuture = UncertainFuture<String>()
        let concurrentCount = 5

        try await confirmation(expectedCount: concurrentCount) { confirmation in
            try await withThrowingTaskGroup(of: String.self) { group in
                for _ in 0..<concurrentCount {
                    group.addImmediateTask {
                        let result = try await uncertainFuture.value
                        confirmation.confirm()
                        return result
                    }
                }

                // Complete the uncertain future after all waiters are set up
                uncertainFuture.complete("Shared Value")

                // All waiters should receive the same value
                for try await result in group {
                    #expect(result == "Shared Value")
                }
            }
        }
    }

    /// Complete multiple requests from an independently running task.
    @Test func concurrentCompletion() async throws {
        let uncertainFuture = UncertainFuture<Int>()
        let concurrentCount = 5

        try await withThrowingTaskGroup(of: Int.self) { group in
            for _ in 0..<concurrentCount {
                group.addTask { try await uncertainFuture.value }
            }

            // Deliberately unstructured: the completion may race with the waiters being set up.
            Task {
                uncertainFuture.complete(123)
            }

            // All tasks should return the same value
            for try await result in group {
                #expect(result == 123)
            }
        }
    }

    /// Cancelled waiters receive a cancellation error.
    @Test func cancellation() async throws {
        let uncertainFuture = UncertainFuture<String>()

        // Start a task that waits for the uncertain future and cancel it
        let waitingTask = Task.immediate {
            try await uncertainFuture.value
        }
        waitingTask.cancel()

        // Should throw CancellationError
        await #expect(throws: CancellationError.self) {
            try await waitingTask.value
        }

        // Uncertain future should still be completable for other waiters
        let anotherTask = Task.immediate {
            try await uncertainFuture.value
        }
        uncertainFuture.complete("Still works")
        #expect(try await anotherTask.value == "Still works")
    }

    /// Multiple concurrent waiters can all receive a cancellation error.
    @Test func concurrentCancellation() async throws {
        let uncertainFuture = UncertainFuture<String>()
        let concurrentCount = 6

        try await withThrowingTaskGroup(of: Result<String, any Error>.self) { group in
            // Start multiple concurrent waiters
            for _ in 0..<concurrentCount {
                group.addImmediateTask {
                    await Result {
                        try await uncertainFuture.value
                    }
                }
            }

            // Cancel all tasks
            group.cancelAll()

            // All tasks should throw CancellationError
            for try await result in group {
                #expect(result.failureIfAny is CancellationError)
            }
        }
    }

    /// Cancellation should be respected if it happens before even starting to await the uncertain future.
    @MainActor @Test func earlyCancellation() async throws {
        let uncertainFuture = UncertainFuture<Int>()
        let started = Atomic(false)

        // Unstructured task that starts out cancelled. Both the test and the task are isolated to the main actor,
        // so the task cannot begin running before this test suspends, which is only after `cancel()` below.
        let waitingTask = Task { @MainActor in
            #expect(Task.isCancelled)
            started.store(true, ordering: .relaxed)
            return try await uncertainFuture.value
        }
        waitingTask.cancel()

        // Task should now immediately return with a cancellation error
        await #expect(throws: CancellationError.self) {
            try await waitingTask.value
        }
        #expect(started.load(ordering: .relaxed) == true)
    }

    /// Cancellation should be ignored should it arrive after the completion, even if the waiter hasn't yet resumed execution
    ///
    /// Isolated to the main actor so that the resumed waiter cannot continue running before this test suspends.
    /// Without the isolation the waiter could be picked up by another thread between `complete(_:)` and `cancel()`,
    /// which makes the expectations below racy.
    // NOTE(review): relies on `Task.immediate` inheriting the caller's actor isolation - confirm against SE-0472.
    @MainActor @Test func lateCancellation() async throws {
        let uncertainFuture = UncertainFuture<Int>()
        let continued = Atomic(false)

        // Start an immediate task to ensure it's suspended on the continuation.
        let waitingTask = Task.immediate {
            let value = try await uncertainFuture.value
            #expect(Task.isCancelled)
            continued.store(true, ordering: .relaxed)
            return value
        }

        // Complete the uncertain future and then cancel the task
        uncertainFuture.complete(1099)
        waitingTask.cancel()

        // Task should still not have continued running
        #expect(continued.load(ordering: .relaxed) == false)

        // Run now, continuation has been continued first, so the task should deliver the value
        #expect(try await waitingTask.value == 1099)
        #expect(continued.load(ordering: .relaxed) == true)
    }

    /// Some waiters may receive a cancellation error but others (i.e. not cancelled ones) still receive the value.
    @Test func partialCancellation() async throws {
        let uncertainFuture = UncertainFuture<String>()
        let totalCount = 4
        let cancelledCount = 2
        var tasks: [Task<String, any Error>] = []

        // Start multiple waiters
        for _ in 0..<totalCount {
            tasks.append(Task.immediate {
                try await uncertainFuture.value
            })
        }

        // Cancel some of them
        for index in 0..<cancelledCount {
            tasks[index].cancel()
        }

        // Complete the uncertain future
        uncertainFuture.complete("Partial Success")

        // Check results
        for (index, task) in tasks.enumerated() {
            if index < cancelledCount {
                await #expect(throws: CancellationError.self) {
                    try await task.value
                }
            } else {
                #expect(try await task.value == "Partial Success")
            }
        }
    }

    /// Deallocation after all waiters have finished.
    @Test func deallocation() async throws {
        weak var weakUncertainFuture: UncertainFuture<String>?
        // Assigned exactly once inside the scope below, so no implicitly unwrapped optional is needed.
        let task: Task<String, any Error>

        // Create uncertain future in a scope
        do {
            let uncertainFuture = UncertainFuture<String>()
            weakUncertainFuture = uncertainFuture
            task = Task.immediate {
                try await uncertainFuture.value
            }
        }

        // Uncertain future should still be alive due to task reference
        #expect(weakUncertainFuture != nil)

        // Cancel the task
        task.cancel()
        await #expect(throws: CancellationError.self) {
            try await task.value
        }

        // Now uncertain future should be deallocated
        #expect(weakUncertainFuture == nil)
    }

    #if !os(iOS)
    /// A `fatalError` is raised when the uncertain future is completed twice.
    @Test func preconditionDoubleCompletion() async {
        await #expect(processExitsWith: .failure) {
            let uncertainFuture = UncertainFuture<String>()
            uncertainFuture.complete("First")
            uncertainFuture.complete("Second")
        }
    }
    #endif
}
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Part of my blog post on Certain and Uncertain Futures.