Last active
February 7, 2026 16:20
-
-
Save ynkdir/651b36fe902fb2cf8ce23232bb3508a2 to your computer and use it in GitHub Desktop.
Connecting Asyncio and Winui3 event loops
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # dependencies = ["win32more"] | |
| # /// | |
| import asyncio | |
| import heapq | |
| import threading | |
| from concurrent.futures import Future | |
| from win32more.Microsoft.UI.Dispatching import DispatcherQueue | |
| from win32more.Microsoft.UI.Xaml import Application, Window | |
| from win32more.Windows.Win32.System.Com import COINIT_APARTMENTTHREADED, CoInitializeEx | |
| from win32more.winui3 import XamlApplication | |
| class GuestEventLoop(asyncio.EventLoop): | |
| def __init__(self, dispatcher): | |
| super().__init__() | |
| self._dispatcher = dispatcher | |
| self._thread = threading.Thread(target=self.run_forever) | |
| def start_guest_loop(self): | |
| asyncio.events._set_running_loop(self) | |
| self._thread.start() | |
| def stop_guest_loop(self): | |
| self._thread.join() | |
| self._thread = None | |
| asyncio.events._set_running_loop(None) | |
| def _dispatch_and_wait(self, callback, *args): | |
| def wrapper(): | |
| try: | |
| future.set_result(callback(*args)) | |
| except Exception as e: | |
| future.set_exception(e) | |
| future = Future() | |
| self._dispatcher.dispatch(wrapper) | |
| return future.result() | |
| def stop(self): | |
| super().stop() | |
| self._write_to_self() # wakeup loop | |
| # Trio's guest mode model | |
| # | |
| # UI thread Asyncio thread | |
| # | |
| # +------------+ | |
| # dispatch | wait | | |
| # +-------+<--------------+------------+ | |
| # |process| | |
| # |events | | |
| # +-------+-------------->+------------+ | |
| # | wait | | |
| # +------------+ | |
| # copied whole code from asyncio/base_events.py and extracted __wait_events() and __process_events() | |
| def _run_once(self): | |
| self.__wait_events() | |
| self._dispatch_and_wait(self.__process_events) | |
| def __wait_events(self): | |
| """Run one full iteration of the event loop. | |
| This calls all currently ready callbacks, polls for I/O, | |
| schedules the resulting callbacks, and finally schedules | |
| 'call_later' callbacks. | |
| """ | |
| sched_count = len(self._scheduled) | |
| if ( | |
| sched_count > asyncio.base_events._MIN_SCHEDULED_TIMER_HANDLES | |
| and self._timer_cancelled_count / sched_count > asyncio.base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION | |
| ): | |
| # Remove delayed calls that were cancelled if their number | |
| # is too high | |
| new_scheduled = [] | |
| for handle in self._scheduled: | |
| if handle._cancelled: | |
| handle._scheduled = False | |
| else: | |
| new_scheduled.append(handle) | |
| heapq.heapify(new_scheduled) | |
| self._scheduled = new_scheduled | |
| self._timer_cancelled_count = 0 | |
| else: | |
| # Remove delayed calls that were cancelled from head of queue. | |
| while self._scheduled and self._scheduled[0]._cancelled: | |
| self._timer_cancelled_count -= 1 | |
| handle = heapq.heappop(self._scheduled) | |
| handle._scheduled = False | |
| timeout = None | |
| if self._ready or self._stopping: | |
| timeout = 0 | |
| elif self._scheduled: | |
| # Compute the desired timeout. | |
| timeout = self._scheduled[0]._when - self.time() | |
| if timeout > asyncio.base_events.MAXIMUM_SELECT_TIMEOUT: | |
| timeout = asyncio.base_events.MAXIMUM_SELECT_TIMEOUT | |
| elif timeout < 0: | |
| timeout = 0 | |
| event_list = self._selector.select(timeout) | |
| self._process_events(event_list) | |
| # Needed to break cycles when an exception occurs. | |
| event_list = None | |
| # Handle 'later' callbacks that are ready. | |
| end_time = self.time() + self._clock_resolution | |
| while self._scheduled: | |
| handle = self._scheduled[0] | |
| if handle._when >= end_time: | |
| break | |
| handle = heapq.heappop(self._scheduled) | |
| handle._scheduled = False | |
| self._ready.append(handle) | |
| def __process_events(self): | |
| # This is the only place where callbacks are actually *called*. | |
| # All other places just add them to ready. | |
| # Note: We run all currently scheduled callbacks, but not any | |
| # callbacks scheduled by callbacks run this time around -- | |
| # they will be run the next time (after another I/O poll). | |
| # Use an idiom that is thread-safe without using locks. | |
| ntodo = len(self._ready) | |
| for i in range(ntodo): | |
| handle = self._ready.popleft() | |
| if handle._cancelled: | |
| continue | |
| if self._debug: | |
| try: | |
| self._current_handle = handle | |
| t0 = self.time() | |
| handle._run() | |
| dt = self.time() - t0 | |
| if dt >= self.slow_callback_duration: | |
| asyncio.base_events.logger.warning( | |
| "Executing %s took %.3f seconds", asyncio.base_events._format_handle(handle), dt | |
| ) | |
| finally: | |
| self._current_handle = None | |
| else: | |
| handle._run() | |
| handle = None # Needed to break cycles when an exception occurs. | |
| class App(XamlApplication): | |
| def OnLaunched(self, args): | |
| self._init_asyncio() | |
| self._window = Window() | |
| self._window.Activate() | |
| asyncio.create_task(self.heavytask()) | |
| def _init_asyncio(self): | |
| self._dispatcher = DispatcherQueue.GetForCurrentThread() | |
| # To close application safely, stop asyncio loop first. | |
| self._dispatcher.ShutdownStarting += lambda s, e: self._loop.stop() | |
| self._dispatcher.ShutdownCompleted += lambda s, e: self._loop.stop_guest_loop() | |
| self._loop = GuestEventLoop(self) | |
| self._loop.start_guest_loop() | |
| def dispatch(self, fn): | |
| self._dispatcher.TryEnqueue(fn) | |
| async def heavytask(self): | |
| print(f"heavytask ({threading.get_native_id()}): start") | |
| for i in range(5): | |
| await asyncio.sleep(1) | |
| print(f"heavytask ({threading.get_native_id()}): {i}") | |
| print(f"heavytask ({threading.get_native_id()}): end") | |
| if __name__ == "__main__": | |
| CoInitializeEx(None, COINIT_APARTMENTTHREADED) | |
| Application.Start(lambda params: App()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment