Skip to content

Instantly share code, notes, and snippets.

@mecaneer23
Created June 24, 2025 03:23
Show Gist options
  • Select an option

  • Save mecaneer23/7b43f779c55fbf8057e09d74f21848e7 to your computer and use it in GitHub Desktop.

Select an option

Save mecaneer23/7b43f779c55fbf8057e09d74f21848e7 to your computer and use it in GitHub Desktop.
A queue.Queue alternative that allows viewing the current state of the queue
"""
Debug queue
Use as drop-in replacement for queue.Queue
when debugging
Allows for reading current queue state
Much less efficient than a normal queue
in regard to both time and space
"""
from queue import Queue as StockQueue
from threading import Lock
from typing import Generic, TypeVar
T = TypeVar("T")
class Queue(Generic[T]):
"""A queue that has a readable debug state"""
def __init__(self) -> None:
self.queue: StockQueue[T] = StockQueue()
self.items: list[T] = []
def put(
self,
item: T,
block: bool = True, # noqa: FBT001, FBT002
timeout: float | None = None,
) -> None:
"""Add an item to the queue"""
self.queue.put(item, block, timeout)
self.items.append(item)
def get(
self,
block: bool = True, # noqa: FBT001, FBT002
timeout: float | None = None,
) -> T:
"""Get an item from the queue"""
if len(self.items) > 0:
self.items.pop(0)
return self.queue.get(block, timeout)
def empty(self) -> bool:
"""Return True if the queue is empty"""
return self.queue.empty()
def get_items(self) -> list[T]:
"""Get internal list of items from the queue"""
return self.items
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment