Created
June 24, 2025 03:23
-
-
Save mecaneer23/7b43f779c55fbf8057e09d74f21848e7 to your computer and use it in GitHub Desktop.
queue.Queue alternate which allows viewing the current state of the queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Debug queue | |
| Use as drop-in replacement for queue.Queue | |
| when debugging | |
| Allows for reading current queue state | |
| Much less efficient than a normal queue | |
| in regard to both time and space | |
| """ | |
| from queue import Queue as StockQueue | |
| from typing import Generic, TypeVar | |
| T = TypeVar("T") | |
| class Queue(Generic[T]): | |
| """A queue that has a readable debug state""" | |
| def __init__(self) -> None: | |
| self.queue: StockQueue[T] = StockQueue() | |
| self.items: list[T] = [] | |
| def put( | |
| self, | |
| item: T, | |
| block: bool = True, # noqa: FBT001, FBT002 | |
| timeout: float | None = None, | |
| ) -> None: | |
| """Add an item to the queue""" | |
| self.queue.put(item, block, timeout) | |
| self.items.append(item) | |
| def get( | |
| self, | |
| block: bool = True, # noqa: FBT001, FBT002 | |
| timeout: float | None = None, | |
| ) -> T: | |
| """Get an item from the queue""" | |
| if len(self.items) > 0: | |
| self.items.pop(0) | |
| return self.queue.get(block, timeout) | |
| def empty(self) -> bool: | |
| """Return True if the queue is empty""" | |
| return self.queue.empty() | |
| def get_items(self) -> list[T]: | |
| """Get internal list of items from the queue""" | |
| return self.items |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment