Skip to content

Instantly share code, notes, and snippets.

@ynkdir
Last active February 7, 2026 16:20
Show Gist options
  • Select an option

  • Save ynkdir/651b36fe902fb2cf8ce23232bb3508a2 to your computer and use it in GitHub Desktop.

Select an option

Save ynkdir/651b36fe902fb2cf8ce23232bb3508a2 to your computer and use it in GitHub Desktop.
Connecting asyncio and WinUI 3 event loops
# /// script
# dependencies = ["win32more"]
# ///
import asyncio
import heapq
import threading
from concurrent.futures import Future
from win32more.Microsoft.UI.Dispatching import DispatcherQueue
from win32more.Microsoft.UI.Xaml import Application, Window
from win32more.Windows.Win32.System.Com import COINIT_APARTMENTTHREADED, CoInitializeEx
from win32more.winui3 import XamlApplication
class GuestEventLoop(asyncio.EventLoop):
def __init__(self, dispatcher):
super().__init__()
self._dispatcher = dispatcher
self._thread = threading.Thread(target=self.run_forever)
def start_guest_loop(self):
asyncio.events._set_running_loop(self)
self._thread.start()
def stop_guest_loop(self):
self._thread.join()
self._thread = None
asyncio.events._set_running_loop(None)
def _dispatch_and_wait(self, callback, *args):
def wrapper():
try:
future.set_result(callback(*args))
except Exception as e:
future.set_exception(e)
future = Future()
self._dispatcher.dispatch(wrapper)
return future.result()
def stop(self):
super().stop()
self._write_to_self() # wakeup loop
# Trio's guest mode model
#
# UI thread Asyncio thread
#
# +------------+
# dispatch | wait |
# +-------+<--------------+------------+
# |process|
# |events |
# +-------+-------------->+------------+
# | wait |
# +------------+
# copied whole code from asyncio/base_events.py and extracted __wait_events() and __process_events()
def _run_once(self):
self.__wait_events()
self._dispatch_and_wait(self.__process_events)
def __wait_events(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
sched_count = len(self._scheduled)
if (
sched_count > asyncio.base_events._MIN_SCHEDULED_TIMER_HANDLES
and self._timer_cancelled_count / sched_count > asyncio.base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION
):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
timeout = self._scheduled[0]._when - self.time()
if timeout > asyncio.base_events.MAXIMUM_SELECT_TIMEOUT:
timeout = asyncio.base_events.MAXIMUM_SELECT_TIMEOUT
elif timeout < 0:
timeout = 0
event_list = self._selector.select(timeout)
self._process_events(event_list)
# Needed to break cycles when an exception occurs.
event_list = None
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
def __process_events(self):
# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
asyncio.base_events.logger.warning(
"Executing %s took %.3f seconds", asyncio.base_events._format_handle(handle), dt
)
finally:
self._current_handle = None
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.
class App(XamlApplication):
    """Sample application that runs an asyncio task through ``GuestEventLoop``.

    The instance also serves as the loop's dispatcher: ``GuestEventLoop``
    calls ``dispatch(fn)`` and this class forwards ``fn`` to the
    ``DispatcherQueue`` obtained in ``_init_asyncio()``.
    """

    def OnLaunched(self, args):
        """Start the guest loop, show an empty window and launch the demo task."""
        self._init_asyncio()
        self._window = Window()
        self._window.Activate()
        # Keep a strong reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected before
        # it finishes.
        self._task = asyncio.create_task(self.heavytask())

    def _init_asyncio(self):
        """Create the guest event loop and tie its lifetime to the dispatcher queue."""
        self._dispatcher = DispatcherQueue.GetForCurrentThread()
        # To close application safely, stop asyncio loop first.
        # The lambdas look up self._loop when the events fire, so it is fine
        # that the attribute is assigned only after they are registered.
        self._dispatcher.ShutdownStarting += lambda s, e: self._loop.stop()
        self._dispatcher.ShutdownCompleted += lambda s, e: self._loop.stop_guest_loop()
        self._loop = GuestEventLoop(self)
        self._loop.start_guest_loop()

    def dispatch(self, fn):
        """Enqueue ``fn`` on the dispatcher queue (the result of ``TryEnqueue`` is ignored)."""
        self._dispatcher.TryEnqueue(fn)

    async def heavytask(self):
        """Demo coroutine: print the native thread id once per second, five times."""
        print(f"heavytask ({threading.get_native_id()}): start")
        for i in range(5):
            await asyncio.sleep(1)
            print(f"heavytask ({threading.get_native_id()}): {i}")
        print(f"heavytask ({threading.get_native_id()}): end")
if __name__ == "__main__":
    # Initialise COM on this thread as apartment-threaded before starting XAML.
    CoInitializeEx(None, COINIT_APARTMENTTHREADED)

    def _create_app(params):
        """Initialization callback passed to Application.Start: create the App instance."""
        return App()

    Application.Start(_create_app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment