Created
July 7, 2025 09:09
-
-
Save SF-300/a490bb13caf87f22781488788cc2e1dd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import typing as t | |
| import asyncio as aio | |
| class Producer[T](t.Protocol): | |
| async def put(self, item: T) -> None: ... | |
| def put_nowait(self, item: T) -> None: ... | |
| def multi_put_nowait[T](queues: t.Iterable[Producer[T]], item: T) -> None: | |
| # Snapshot to avoid modification during iteration | |
| queues = tuple(queues) | |
| errors: list[aio.QueueFull] = [] | |
| if not queues: | |
| return | |
| for q in queues: | |
| try: | |
| q.put_nowait(item) | |
| except aio.QueueFull as e: | |
| errors.append(e) | |
| if errors: | |
| raise ExceptionGroup( | |
| "QueueFull for one or more subscribers during put_nowait", errors | |
| ) | |
| async def multi_put[T](queues: t.Iterable[Producer[T]], item: T) -> None: | |
| # Create a snapshot of queues with their filters to prevent issues with concurrent modification | |
| queues = tuple(queues) | |
| if not queues: | |
| return | |
| results = await aio.gather(*(q.put(item) for q in queues), return_exceptions=True) | |
| errors: list[Exception] = [] | |
| for res in results: | |
| if isinstance(res, Exception): | |
| errors.append(res) | |
| elif isinstance(res, BaseException): | |
| # For non-Exception BaseExceptions (e.g. KeyboardInterrupt, SystemExit, CancelledError from gather itself) | |
| # re-raise them immediately as they are not typically grouped application errors. | |
| raise res | |
| if errors: | |
| raise ExceptionGroup( | |
| "Error(s) occurred while putting item into subscriber queues", errors | |
| ) | |
| class QueueConsumer[T]: | |
| def __init__( | |
| self, | |
| queue: aio.Queue[T], | |
| unsubscribe: t.Callable[[], t.Any], | |
| ) -> None: | |
| self._queue = queue | |
| self._unsubscribe = unsubscribe | |
| async def get(self) -> T: | |
| return await self._queue.get() | |
| def get_nowait(self) -> T: | |
| return self._queue.get_nowait() | |
| def empty(self) -> bool: | |
| return self._queue.empty() | |
| def qsize(self) -> int: | |
| return self._queue.qsize() | |
| def full(self) -> bool: | |
| return self._queue.full() | |
| @property | |
| def maxsize(self) -> int: | |
| return self._queue.maxsize | |
| def task_done(self) -> None: | |
| self._queue.task_done() | |
| def __enter__(self) -> t.Self: | |
| return self | |
| def __exit__(self, *args, **kwargs) -> None: | |
| self._unsubscribe() | |
| _instantly_ready = aio.Event() | |
| _instantly_ready.set() | |
| class QueueProducer[T]: | |
| def __init__(self, maxsize: int = 0) -> None: | |
| # Dictionary: queue -> filter function | |
| self._queues: dict[aio.Queue[T], t.Callable[[T], bool]] = {} | |
| self._maxsize = maxsize | |
| async def put(self, item: T) -> None: | |
| queues = (q for q, filter_fn in self._queues.items() if filter_fn(item)) | |
| return await multi_put(queues, item) | |
| def put_nowait(self, item: T) -> None: | |
| queues = (q for q, filter_fn in self._queues.items() if filter_fn(item)) | |
| return multi_put_nowait(queues, item) | |
| @t.overload | |
| def subscribed( | |
| self, | |
| filter_: t.Callable[[T], bool] = ..., | |
| queue: aio.Queue[T] | None = ..., | |
| ) -> QueueConsumer[T]: ... | |
| @t.overload | |
| def subscribed[F]( | |
| self, | |
| filter_: type[F], | |
| queue: aio.Queue[T] | None = ..., | |
| ) -> QueueConsumer[F]: ... | |
| def subscribed(self, filter_=lambda _: True, queue=None): | |
| if queue is None: | |
| queue = aio.Queue(maxsize=self._maxsize) | |
| if not callable(filter_): | |
| def filter_fn(item, /): | |
| return isinstance(item, filter_) | |
| else: | |
| filter_fn = t.cast(t.Callable[[T], bool], filter_) | |
| self._queues[queue] = filter_fn | |
| def unsubscribe() -> None: | |
| self._queues.pop(queue, None) | |
| return QueueConsumer(queue, unsubscribe) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment