Skip to content

Instantly share code, notes, and snippets.

@SF-300
Created July 7, 2025 09:09
Show Gist options
  • Select an option

  • Save SF-300/a490bb13caf87f22781488788cc2e1dd to your computer and use it in GitHub Desktop.

Select an option

Save SF-300/a490bb13caf87f22781488788cc2e1dd to your computer and use it in GitHub Desktop.
import typing as t
import asyncio as aio
class Producer[T](t.Protocol):
async def put(self, item: T) -> None: ...
def put_nowait(self, item: T) -> None: ...
def multi_put_nowait[T](queues: t.Iterable[Producer[T]], item: T) -> None:
# Snapshot to avoid modification during iteration
queues = tuple(queues)
errors: list[aio.QueueFull] = []
if not queues:
return
for q in queues:
try:
q.put_nowait(item)
except aio.QueueFull as e:
errors.append(e)
if errors:
raise ExceptionGroup(
"QueueFull for one or more subscribers during put_nowait", errors
)
async def multi_put[T](queues: t.Iterable[Producer[T]], item: T) -> None:
# Create a snapshot of queues with their filters to prevent issues with concurrent modification
queues = tuple(queues)
if not queues:
return
results = await aio.gather(*(q.put(item) for q in queues), return_exceptions=True)
errors: list[Exception] = []
for res in results:
if isinstance(res, Exception):
errors.append(res)
elif isinstance(res, BaseException):
# For non-Exception BaseExceptions (e.g. KeyboardInterrupt, SystemExit, CancelledError from gather itself)
# re-raise them immediately as they are not typically grouped application errors.
raise res
if errors:
raise ExceptionGroup(
"Error(s) occurred while putting item into subscriber queues", errors
)
class QueueConsumer[T]:
def __init__(
self,
queue: aio.Queue[T],
unsubscribe: t.Callable[[], t.Any],
) -> None:
self._queue = queue
self._unsubscribe = unsubscribe
async def get(self) -> T:
return await self._queue.get()
def get_nowait(self) -> T:
return self._queue.get_nowait()
def empty(self) -> bool:
return self._queue.empty()
def qsize(self) -> int:
return self._queue.qsize()
def full(self) -> bool:
return self._queue.full()
@property
def maxsize(self) -> int:
return self._queue.maxsize
def task_done(self) -> None:
self._queue.task_done()
def __enter__(self) -> t.Self:
return self
def __exit__(self, *args, **kwargs) -> None:
self._unsubscribe()
# Module-level event that is set immediately after creation.
# NOTE(review): nothing in the visible code references this name — confirm
# whether it is still used elsewhere before removing it.
_instantly_ready = aio.Event()
_instantly_ready.set()
class QueueProducer[T]:
def __init__(self, maxsize: int = 0) -> None:
# Dictionary: queue -> filter function
self._queues: dict[aio.Queue[T], t.Callable[[T], bool]] = {}
self._maxsize = maxsize
async def put(self, item: T) -> None:
queues = (q for q, filter_fn in self._queues.items() if filter_fn(item))
return await multi_put(queues, item)
def put_nowait(self, item: T) -> None:
queues = (q for q, filter_fn in self._queues.items() if filter_fn(item))
return multi_put_nowait(queues, item)
@t.overload
def subscribed(
self,
filter_: t.Callable[[T], bool] = ...,
queue: aio.Queue[T] | None = ...,
) -> QueueConsumer[T]: ...
@t.overload
def subscribed[F](
self,
filter_: type[F],
queue: aio.Queue[T] | None = ...,
) -> QueueConsumer[F]: ...
def subscribed(self, filter_=lambda _: True, queue=None):
if queue is None:
queue = aio.Queue(maxsize=self._maxsize)
if not callable(filter_):
def filter_fn(item, /):
return isinstance(item, filter_)
else:
filter_fn = t.cast(t.Callable[[T], bool], filter_)
self._queues[queue] = filter_fn
def unsubscribe() -> None:
self._queues.pop(queue, None)
return QueueConsumer(queue, unsubscribe)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment