Last active
September 4, 2024 03:01
-
-
Save daipham3213/410b83277c45d54578a6d5536d65cbb6 to your computer and use it in GitHub Desktop.
asyncio event loop with worker pool, can be used in a native python thread
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """This module provides a custom event loop for the health manager service. | |
| The event loop is implemented as a thread that runs an asyncio event loop. | |
| This allows the health manager to run periodic tasks and other asynchronous | |
| operations without blocking the main thread. | |
| The event loop is started when the module is imported and runs until the | |
| program exits. The main thread can interact with the event loop by submitting | |
| tasks to it using the `spawn` function. | |
| """ | |
| import asyncio | |
| import atexit | |
| import inspect | |
| import logging | |
| import random | |
| import threading | |
| import typing as ty | |
| from functools import partial | |
| class EventLoop(threading.Thread): | |
| def __init__(self, group=None, name=None, maxsize=1000, logger=None): | |
| super().__init__(group=group, name=name, daemon=True) | |
| self._maxsize = maxsize | |
| self._logger = logger or logging.getLogger(__name__) | |
| self._loop = asyncio.new_event_loop() | |
| self._queue = asyncio.Queue(maxsize=self._maxsize) | |
| self._tasks = [] | |
| atexit.register(self._loop.stop) | |
| def run(self): | |
| """ Will start up worker pool and reset exception state """ | |
| self._loop.run_forever() | |
| def start(self): | |
| """Spawn N workers to process items from the queue.""" | |
| for _ in range(self._maxsize): | |
| self._loop.create_task(self._worker()) | |
| super().start() | |
| def spawn(self, fn: ty.Callable, *args, **kwargs): | |
| """Spawn a new task on the event loop.""" | |
| future = self._loop.create_future() | |
| coro = self._wrapper(fn, *args, **kwargs) | |
| asyncio.run_coroutine_threadsafe(self._spawn(future, coro), self._loop) | |
| return future | |
| async def _wrapper(self, fn: ty.Callable, *args, **kwargs): | |
| """Wrap a synchronous function in a coroutine.""" | |
| if inspect.iscoroutinefunction(fn): | |
| return await fn(*args, **kwargs) | |
| return fn(*args, **kwargs) | |
| def schedule(self, fn: ty.Callable, interval: int, *args, **kwargs): | |
| """Schedule a task to run at regular intervals.""" | |
| coro = partial(self._wrapper, fn, *args, **kwargs) | |
| future = self._loop.create_future() | |
| asyncio.run_coroutine_threadsafe( | |
| self._schedule(future, coro, interval), | |
| self._loop) | |
| return future | |
| async def _schedule(self, | |
| future: asyncio.Future, | |
| coro: ty.Coroutine, | |
| interval: int): | |
| """Internal function that schedules a task.""" | |
| while True: | |
| await self._queue.put((future, coro)) | |
| await asyncio.sleep(interval) | |
| async def _worker(self): | |
| """Worker function that processes items from the queue.""" | |
| while True: | |
| future: asyncio.Future | |
| coro: ty.Callable | |
| future, coro = await self._queue.get() | |
| try: | |
| if future.cancelled(): | |
| continue | |
| if future.done(): | |
| continue | |
| result = await coro() | |
| future.set_result(result) | |
| except Exception as ex: | |
| future.set_exception(ex) | |
| finally: | |
| self._queue.task_done() | |
| async def _spawn(self, future: asyncio.Future, coro: ty.Coroutine): | |
| """Fire a new item into the queue.""" | |
| await self._queue.put((future, coro)) |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example usage