Last active
February 15, 2026 21:25
-
-
Save x42005e1f/857dcc8b6865a11f1ffc7767bb602779 to your computer and use it in GitHub Desktop.
An experimental guest mode for asyncio (inspired by Trio)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # SPDX-FileCopyrightText: 2026 Ilya Egorov <0x42005e1f@gmail.com> | |
| # SPDX-License-Identifier: ISC | |
| # requires-python = ">=3.8" | |
| # dependencies = [ | |
| # "outcome>=1.0.0", | |
| # "sniffio>=1.3.0", | |
| # "wrapt>=2.0.0", | |
| # ] | |
| import asyncio | |
| import sys | |
| from concurrent.futures import ThreadPoolExecutor | |
| import outcome | |
| from sniffio import thread_local as current_async_library_tlocal | |
| from wrapt import AutoObjectProxy | |
# Upper bound, in seconds, on the timeout computed for one blocking
# selector wait (one day); see _compute_nearest_timeout().
_MAXIMUM_SELECT_TIMEOUT = 60 * 60 * 24
# Timeout, in seconds, passed to loop.shutdown_default_executor() on
# Python 3.12+ (five minutes); see _run().
_THREAD_JOIN_TIMEOUT = 60 * 5
# Proxy around an event loop's selector. While ``_guest_selector_future``
# holds a future, the next ``select()`` call returns that future's result
# instead of polling the wrapped selector; otherwise ``select()`` is
# forwarded unchanged.
class _GuestSelector(AutoObjectProxy):
    __slots__ = ("_guest_selector_future",)

    def select(self, /, timeout=None):
        """Return the stashed future's result, or select on the wrapped object.

        When no future is stashed, *timeout* is passed straight through to
        the wrapped selector. When one is stashed, *timeout* is not used and
        the future's ``result()`` is returned (or its exception raised); the
        stash is cleared either way.
        """
        if self._guest_selector_future is None:
            return self.__wrapped__.select(timeout)
        try:
            # The future is intentionally read through ``self`` rather than
            # bound to a local, so this frame does not keep it alive.
            return self._guest_selector_future.result()
        finally:
            self._guest_selector_future = None  # break reference cycles
def _patch_loop(loop, /):
    """Replace ``loop._selector`` with a ``_GuestSelector`` proxy and return *loop*.

    The proxy starts with no stashed future. If the loop also exposes the
    very same object as ``_proactor``, that attribute is redirected to the
    proxy as well.
    """
    original = loop._selector
    proxy = _GuestSelector(original)
    proxy._guest_selector_future = None
    loop._selector = proxy
    if getattr(loop, "_proactor", None) is original:
        loop._proactor = proxy  # Windows
    return loop
# see asyncio.base_events.BaseEventLoop._run_once
def _compute_nearest_timeout(loop, /):
    """Return how long the next selector wait for *loop* may block.

    Returns ``0`` when there are ready callbacks, when the earliest
    scheduled handle is cancelled, or when it is already overdue; ``None``
    when nothing is scheduled at all; otherwise the time remaining until
    the earliest scheduled handle, capped at ``_MAXIMUM_SELECT_TIMEOUT``.
    """
    if loop._ready:
        return 0
    scheduled = loop._scheduled
    if not scheduled:
        return None
    nearest = scheduled[0]
    if nearest.cancelled():
        return 0
    remaining = nearest.when() - loop.time()
    if remaining < 0:
        return 0
    # min() keeps ``remaining`` itself unless it strictly exceeds the cap.
    return min(remaining, _MAXIMUM_SELECT_TIMEOUT)
def _run_until_complete(loop, executor, future, /):
    """Drive *loop* step by step until *future* is done (generator).

    *future* is first wrapped with ``asyncio.ensure_future(..., loop=loop)``.
    Every step saves the sniffio library name and the running asyncio loop
    of the calling thread, clears the current/running loop, sets the library
    name to ``"asyncio"``, executes ``loop.stop(); loop.run_forever()``, and
    then restores the library name and sets both the running and the current
    loop back to the saved running loop.

    After each step the generator yields either:

    * ``None`` when the next wait would have a zero timeout, meaning the
      caller should resume the generator right away; or
    * a ``concurrent.futures`` future for a blocking ``select(timeout)`` on
      the wrapped selector, submitted to *executor*. The same future is
      stashed on the guest selector, and the caller must resume the
      generator only once that future is done.

    One extra step is executed after *future* is observed to be done, then
    the generator finishes.
    """
    guest_selector = loop._selector
    blocking_select = guest_selector.__wrapped__.select
    future = asyncio.ensure_future(future, loop=loop)

    while True:
        saved_library = current_async_library_tlocal.name
        saved_running_loop = asyncio._get_running_loop()
        try:
            asyncio.set_event_loop(None)
            asyncio._set_running_loop(None)
            current_async_library_tlocal.name = "asyncio"

            was_done = future.done()
            # stop() before run_forever() makes the loop run a single pass.
            loop.stop()
            loop.run_forever()
            if was_done:
                break

            timeout = 0 if future.done() else _compute_nearest_timeout(loop)
        finally:
            current_async_library_tlocal.name = saved_library
            asyncio._set_running_loop(saved_running_loop)
            asyncio.set_event_loop(saved_running_loop)

        if timeout is not None and not timeout:  # timeout is zero
            yield None
            continue

        # timeout is non-zero (or unbounded): wait in the worker thread
        guest_selector._guest_selector_future = executor.submit(
            blocking_select,
            timeout,
        )
        yield guest_selector._guest_selector_future
        assert guest_selector._guest_selector_future.done()
# see asyncio.runners._cancel_all_tasks
def _cancel_all_tasks(loop, executor, /):
    """Cancel every task of *loop* and drive the loop until they finish.

    This is a generator: it yields whatever ``_run_until_complete()`` yields
    while the cancelled tasks are being awaited. It finishes immediately,
    without yielding, when the loop has no tasks. Once the tasks are
    finished, each one that ended with an exception (rather than being
    cancelled) is reported through ``loop.call_exception_handler()``.
    """
    pending = asyncio.all_tasks(loop)
    if not pending:
        return

    for task in pending:
        task.cancel()

    # see python/cpython#124640
    async def wait_for_cancelled():
        return await asyncio.gather(*pending, return_exceptions=True)

    yield from _run_until_complete(loop, executor, wait_for_cancelled())

    for task in pending:
        if task.cancelled():
            continue
        error = task.exception()
        if error is None:
            continue
        loop.call_exception_handler({
            "message": "unhandled exception during start_guest_run() shutdown",
            "exception": error,
            "task": task,
        })
def _run(loop, executor, future, /):
    """Drive *future* to completion on *loop*, then shut everything down.

    This is a generator that yields whatever ``_run_until_complete()``
    yields. Whether or not the main phase succeeds, the shutdown phase
    cancels the remaining tasks, shuts down async generators and (on
    Python 3.9+) the loop's default executor, and finally shuts down
    *executor* without waiting and closes *loop*.
    """
    try:
        yield from _run_until_complete(loop, executor, future)
    finally:  # see asyncio.runners.Runner.close
        try:
            yield from _cancel_all_tasks(loop, executor)
            yield from _run_until_complete(
                loop,
                executor,
                loop.shutdown_asyncgens(),
            )
            if sys.version_info >= (3, 9):
                # The timeout argument is only passed on Python 3.12+.
                if sys.version_info >= (3, 12):
                    shutdown_args = (_THREAD_JOIN_TIMEOUT,)
                else:
                    shutdown_args = ()
                yield from _run_until_complete(
                    loop,
                    executor,
                    loop.shutdown_default_executor(*shutdown_args),
                )
        finally:
            try:
                executor.shutdown(wait=False)
            finally:
                loop.close()
def start_guest_run(
    async_fn,
    /,
    *args,
    done_callback,
    run_sync_soon_threadsafe,
    run_sync_soon_not_threadsafe=None,
    host_uses_signal_set_wakeup_fd=False,  # ignored
    loop_factory=None,
    task_factory=None,
    context=None,
    debug=None,
):
    """Start ``async_fn(*args)`` on a new asyncio loop driven by a host loop.

    A fresh event loop is created and patched, a task for
    ``async_fn(*args)`` is created on it, and the first step of the run is
    scheduled through *run_sync_soon_threadsafe*. All further steps are
    scheduled through the two ``run_sync_soon_*`` callables.

    Args:
        async_fn: Async callable to run; it receives ``*args``.
        done_callback: Called once, via *run_sync_soon_not_threadsafe*,
            with an ``outcome`` object: the captured result of the task on
            normal completion, or an ``outcome.Error`` wrapping a
            ``RuntimeError`` if stepping the guest run raised.
        run_sync_soon_threadsafe: Callable taking a zero-argument callable
            to run on the host. It is also invoked from the done-callback
            of the worker thread's future.
        run_sync_soon_not_threadsafe: Same contract, used for scheduling
            from within host callbacks. Defaults to
            *run_sync_soon_threadsafe*.
        host_uses_signal_set_wakeup_fd: Accepted but ignored.
        loop_factory: Zero-argument callable returning the event loop. By
            default ``asyncio.EventLoop`` on Python 3.13+, otherwise
            ``asyncio.ProactorEventLoop`` on win32 and
            ``asyncio.SelectorEventLoop`` elsewhere.
        task_factory: If not ``None``, set on the loop via
            ``set_task_factory()``.
        context: If not ``None``, the ``contextvars`` context used for
            creating the task.
        debug: If not ``None``, passed to ``loop.set_debug()``.

    Returns:
        The guest event loop.
    """
    if run_sync_soon_not_threadsafe is None:
        run_sync_soon_not_threadsafe = run_sync_soon_threadsafe

    if loop_factory is None:
        # Conditional expressions keep the platform-specific attribute
        # lookups lazy, exactly like the equivalent if/elif chain.
        loop_factory = (
            asyncio.EventLoop
            if sys.version_info >= (3, 13)
            else asyncio.ProactorEventLoop
            if sys.platform == "win32"
            else asyncio.SelectorEventLoop
        )

    async def wrapper(*args):
        return await async_fn(*args)

    guest_loop = _patch_loop(loop_factory())
    if task_factory is not None:
        guest_loop.set_task_factory(task_factory)
    if debug is not None:
        guest_loop.set_debug(debug)

    guest_executor = ThreadPoolExecutor(1, thread_name_prefix="asyncio-guest")

    guest_coro = wrapper(*args)
    if context is None:
        guest_task = guest_loop.create_task(guest_coro)
    elif sys.version_info >= (3, 11):
        guest_task = guest_loop.create_task(guest_coro, context=context)
    else:
        guest_task = context.run(guest_loop.create_task, guest_coro)

    guest_run = _run(guest_loop, guest_executor, guest_task)

    def guest_callback(future):
        # Invoked as the done-callback of a blocking select future.
        run_sync_soon_threadsafe(host_callback)

    def host_callback():
        try:
            future = next(guest_run)
        except StopIteration:  # completed
            def guest_shutdown_callback():
                done_callback(outcome.capture(guest_task.result))

            run_sync_soon_not_threadsafe(guest_shutdown_callback)
        except BaseException as exc:  # failed
            message = "The event loop has been closed prematurely"
            exception = RuntimeError(message)
            exception.__cause__ = exc
            if guest_task.done() and not guest_task.cancelled():
                exception.__context__ = guest_task.exception()

            def guest_shutdown_callback():
                nonlocal exception
                try:
                    done_callback(outcome.Error(exception))
                finally:
                    del exception  # break reference cycles

            run_sync_soon_not_threadsafe(guest_shutdown_callback)
        else:  # running
            if future is None:  # timeout is zero
                run_sync_soon_not_threadsafe(host_callback)
            else:  # timeout is non-zero
                future.add_done_callback(guest_callback)

    run_sync_soon_threadsafe(host_callback)
    return guest_loop
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See fleming79/async-kernel#320 (comment).
A brief note regarding
`host_uses_signal_set_wakeup_fd` (for the 46th revision). This parameter is only relevant if the host lives longer than the guest (or if there is another guest; or if the host uses `signal.set_wakeup_fd()` in some special way). Otherwise, it does not matter who exactly will be woken up by signals received by the process: waking up the guest will also wake up the host due to the subsequent `run_sync_soon_threadsafe()`. The 52nd revision is the latest version that uses the greenlet library. It relies on python/cpython#110773, and the trade-off is that it requires Python ≥3.13. You can use it if the latest revision is not suitable for you for some reason.
There is also oremanj/aioguest, which also uses the greenlet approach. But it does not localize the asyncio event loop execution context, does not support multiple asyncio event loops, and has other implementation nuances. Conceptually, however, it adheres to the same solutions as Trio.