Skip to content

Instantly share code, notes, and snippets.

@daipham3213
Last active September 4, 2024 03:01
Show Gist options
  • Select an option

  • Save daipham3213/410b83277c45d54578a6d5536d65cbb6 to your computer and use it in GitHub Desktop.

Select an option

Save daipham3213/410b83277c45d54578a6d5536d65cbb6 to your computer and use it in GitHub Desktop.
asyncio event loop with worker pool, can be used in a native python thread
"""This module provides a custom event loop for the health manager service.
The event loop is implemented as a thread that runs an asyncio event loop.
This allows the health manager to run periodic tasks and other asynchronous
operations without blocking the main thread.
The event loop is started when the module is imported and runs until the
program exits. The main thread can interact with the event loop by submitting
tasks to it using the `spawn` function.
"""
import asyncio
import atexit
import inspect
import logging
import random
import threading
import typing as ty
from functools import partial
class EventLoop(threading.Thread):
def __init__(self, group=None, name=None, maxsize=1000, logger=None):
super().__init__(group=group, name=name, daemon=True)
self._maxsize = maxsize
self._logger = logger or logging.getLogger(__name__)
self._loop = asyncio.new_event_loop()
self._queue = asyncio.Queue(maxsize=self._maxsize)
self._tasks = []
atexit.register(self._loop.stop)
def run(self):
""" Will start up worker pool and reset exception state """
self._loop.run_forever()
def start(self):
"""Spawn N workers to process items from the queue."""
for _ in range(self._maxsize):
self._loop.create_task(self._worker())
super().start()
def spawn(self, fn: ty.Callable, *args, **kwargs):
"""Spawn a new task on the event loop."""
future = self._loop.create_future()
coro = self._wrapper(fn, *args, **kwargs)
asyncio.run_coroutine_threadsafe(self._spawn(future, coro), self._loop)
return future
async def _wrapper(self, fn: ty.Callable, *args, **kwargs):
"""Wrap a synchronous function in a coroutine."""
if inspect.iscoroutinefunction(fn):
return await fn(*args, **kwargs)
return fn(*args, **kwargs)
def schedule(self, fn: ty.Callable, interval: int, *args, **kwargs):
"""Schedule a task to run at regular intervals."""
coro = partial(self._wrapper, fn, *args, **kwargs)
future = self._loop.create_future()
asyncio.run_coroutine_threadsafe(
self._schedule(future, coro, interval),
self._loop)
return future
async def _schedule(self,
future: asyncio.Future,
coro: ty.Coroutine,
interval: int):
"""Internal function that schedules a task."""
while True:
await self._queue.put((future, coro))
await asyncio.sleep(interval)
async def _worker(self):
"""Worker function that processes items from the queue."""
while True:
future: asyncio.Future
coro: ty.Callable
future, coro = await self._queue.get()
try:
if future.cancelled():
continue
if future.done():
continue
result = await coro()
future.set_result(result)
except Exception as ex:
future.set_exception(ex)
finally:
self._queue.task_done()
async def _spawn(self, future: asyncio.Future, coro: ty.Coroutine):
"""Fire a new item into the queue."""
await self._queue.put((future, coro))
@daipham3213
Copy link
Author

Example usage

# The global event loop instance
loop = EventLoop()
loop.start()


def schedule_task(n):
    print(f"task {n} started")


tasks = []
for i in range(10):
    t = loop.schedule(schedule_task, random.randint(1, 5), i)
    tasks.append(t)

tasks[2].cancel()

# Wait for all tasks to complete
loop.join()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment