{ // usage code
    auto tsgq = ThreadSafeQueue<AtomicGrowingQueue<int>, os::Futex>(8);
    tsgq.Put(1);
    assert(tsgq.Pop() == 1);

    // AtomicStackAllocatedQueue takes different constructor arguments than
    // AtomicGrowingQueue, but ThreadSafeQueue doesn't care.
    auto tsfq = ThreadSafeQueue<AtomicStackAllocatedQueue<10, int>, os::Futex>();
    tsfq.Put(1);
    assert(tsfq.Pop() == 1);
}
template <u32 N, typename T>
class AtomicStackAllocatedQueue { //TODO(Jesse): Optimal acquire/release memory ordering (right now every atomic operation uses the most conservative ordering, seq_cst)
public:
    static_assert(N > 1);
    using value_type = T;

    AtomicStackAllocatedQueue(): capacity(N) {}

    AtomicStackAllocatedQueue(AtomicStackAllocatedQueue&& other) {
        if (this == &other) {
            //NOTE(Jesse): Bug? Is this even possible?
            return;
        }
        Cleanup();
        // The backing array is in-place storage, so transfer it element-wise;
        // atomics themselves aren't movable, so go through load/store.
        for (u32 i = 0; i < N; ++i) {
            data[i].store(other.data[i].load());
        }
        head_idx.store(other.head_idx.load());
        tail_idx.store(other.tail_idx.load());
        capacity.store(other.capacity.load());
        other.head_idx = 0;
        other.tail_idx = 0;
        other.capacity = 0;
    }
    AtomicStackAllocatedQueue&
    operator=(AtomicStackAllocatedQueue&& other) {
        if (this == &other) {
            return *this;
        }
        Cleanup();
        // Same element-wise transfer as the move constructor; there is no heap
        // pointer to steal or null out, since the storage lives inside the object.
        for (u32 i = 0; i < N; ++i) {
            data[i].store(other.data[i].load());
        }
        head_idx.store(other.head_idx.load());
        tail_idx.store(other.tail_idx.load());
        capacity.store(other.capacity.load());
        other.head_idx = 0;
        other.tail_idx = 0;
        other.capacity = 0;
        return *this;
    }

    AtomicStackAllocatedQueue(AtomicStackAllocatedQueue const& other) = delete;
    AtomicStackAllocatedQueue&
    operator=(AtomicStackAllocatedQueue const& other) = delete;

    ~AtomicStackAllocatedQueue() {
        Cleanup();
    }
    template <typename U>
    inline void
    Put(U&& item) {
        assert(HasRoom());
        defer(head_idx = (head_idx + 1) % capacity);
        data[head_idx] = forward<U>(item);
    }

    inline T&
    Front() {
        assert(not IsEmpty());
        return data[tail_idx];
    }

    //NOTE(Jesse): Returns by value, which is not ABI-compatible with the STL, due to concerns
    //  around exceptions (which this project disables) and a minor point about copy-by-value
    //  inefficiency. The types handed to this queue usually define move constructors, so the
    //  return shouldn't be problematic (the copy is elided). Otherwise, hopefully this call is
    //  inlined and dead-code elimination removes the dead write.
    //
    //  Alternatively, the return value can be ignored and Front() used instead.
    inline T
    Pop() {
        assert(not IsEmpty());
        defer(tail_idx = (tail_idx + 1) % capacity);
        return move(data[tail_idx]);
    }

    inline bool
    IsEmpty() {
        return head_idx == tail_idx;
    }

    inline bool
    IsFull() {
        return not HasRoom();
    }

    inline bool
    HasRoom() {
        // One slot is always left unused so a full ring is distinguishable from an empty one.
        return (head_idx + 1) % capacity != tail_idx;
    }

    inline bool
    HasItems() {
        return not IsEmpty();
    }
protected:
    void
    Cleanup() {
        // Storage is in-place (no heap allocation), so there is nothing to free;
        // just reset the indices.
        head_idx = 0;
        tail_idx = 0;
        capacity = 0;
    }

    atomic<T> data[N];
    atomic<u32> head_idx = 0;
    atomic<u32> tail_idx = 0;
    atomic<u32> capacity = 0;
};
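// --- Illustration only (not part of the gist's API): a minimal sketch of the
// acquire/release scheme the TODO above refers to, for the single-producer/
// single-consumer case. `SpscOrderingSketch`, `TryPut`, and `TryPop` are
// hypothetical names, and this assumes std::atomic from <atomic> plus the
// gist's `u32` and `move` helpers, rather than the project's `atomic` alias.
#include <atomic>

template <u32 N, typename T>
struct SpscOrderingSketch {
    T slots[N];
    std::atomic<u32> head{0}; // written only by the producer
    std::atomic<u32> tail{0}; // written only by the consumer

    bool TryPut(T item) {
        u32 h = head.load(std::memory_order_relaxed);            // producer owns head
        if ((h + 1) % N == tail.load(std::memory_order_acquire)) // observe consumer's frees
            return false;                                        // ring is full
        slots[h] = move(item);                                   // fill the slot first...
        head.store((h + 1) % N, std::memory_order_release);      // ...then publish it
        return true;
    }

    bool TryPop(T& out) {
        u32 t = tail.load(std::memory_order_relaxed);            // consumer owns tail
        if (t == head.load(std::memory_order_acquire))           // observe producer's publishes
            return false;                                        // ring is empty
        out = move(slots[t]);                                    // read the slot first...
        tail.store((t + 1) % N, std::memory_order_release);      // ...then free it
        return true;
    }
};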
template <typename T>
class AtomicGrowingQueue {
public:
    using value_type = T;

    AtomicGrowingQueue(): data(8, 8) {}
    AtomicGrowingQueue(u32 initial_elements): data(initial_elements, initial_elements) {
        assert(initial_elements >= 2);
    }

    AtomicGrowingQueue(AtomicGrowingQueue&& other) {
        if (this == &other) {
            //NOTE(Jesse): Bug? Is this even possible?
            return;
        }
        Cleanup();
        data = move(other.data);
        head_idx.store(other.head_idx.load());
        tail_idx.store(other.tail_idx.load());
        other.head_idx = 0;
        other.tail_idx = 0;
    }

    AtomicGrowingQueue&
    operator=(AtomicGrowingQueue&& other) {
        if (this == &other) {
            return *this;
        }
        Cleanup();
        data = move(other.data);
        head_idx.store(other.head_idx.load());
        tail_idx.store(other.tail_idx.load());
        other.head_idx = 0;
        other.tail_idx = 0;
        return *this;
    }

    AtomicGrowingQueue(AtomicGrowingQueue const& other) = delete;
    AtomicGrowingQueue&
    operator=(AtomicGrowingQueue const& other) = delete;

    ~AtomicGrowingQueue() {
        Cleanup();
    }
    template <typename U>
    inline void
    Put(U&& item) {
        assert(HasRoom());
        defer(head_idx = (head_idx + 1) % data.Size());
        data[head_idx] = forward<U>(item);
    }

    inline T&
    Front() {
        assert(not IsEmpty());
        return data[tail_idx];
    }

    //NOTE(Jesse): Returns by value, which is not ABI-compatible with the STL, due to concerns
    //  around exceptions (which this project disables) and a minor point about copy-by-value
    //  inefficiency. The types handed to this queue usually define move constructors, so the
    //  return shouldn't be problematic (the copy is elided). Otherwise, hopefully this call is
    //  inlined and dead-code elimination removes the dead write.
    //
    //  Alternatively, the return value can be ignored and Front() used instead.
    inline T
    Pop() {
        assert(not IsEmpty());
        defer(tail_idx = (tail_idx + 1) % data.Size());
        return move(data[tail_idx]);
    }

    inline bool
    IsEmpty() {
        return head_idx == tail_idx;
    }

    inline bool
    IsFull() {
        return not HasRoom();
    }

    inline bool
    HasRoom() {
        // One slot is always left unused so a full ring is distinguishable from an empty one.
        return (head_idx + 1) % data.Size() != tail_idx;
    }

    inline bool
    HasItems() {
        return not IsEmpty();
    }

protected:
    void
    Cleanup() {
        // The Vector frees its own storage; just reset the indices.
        head_idx = 0;
        tail_idx = 0;
    }

    Vector<atomic<T>> data; // sized by the constructors' init lists
    atomic<u32> head_idx = 0;
    atomic<u32> tail_idx = 0;
};
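// --- Illustration only: despite its name, AtomicGrowingQueue as written never
// grows -- Put() asserts HasRoom() instead of resizing. A hypothetical
// wrap-around exercise (`GrowingQueueWrapExample` is not part of the gist):
inline void
GrowingQueueWrapExample() {
    auto q = AtomicGrowingQueue<int>(4); // 4 slots hold at most 3 items (one stays unused)
    q.Put(1); q.Put(2); q.Put(3);
    assert(q.IsFull());
    assert(q.Pop() == 1); // frees a slot at the tail...
    q.Put(4);             // ...which head_idx wraps into
    assert(q.Pop() == 2);
    assert(q.Pop() == 3);
    assert(q.Pop() == 4);
    assert(q.IsEmpty());
}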
template <typename QueueType, typename MutexType>
class ThreadSafeQueue {
public:
    using value_type = typename QueueType::value_type;

    ThreadSafeQueue() = default;

    template <typename... Args>
    explicit ThreadSafeQueue(Args&&... args): queue(forward<Args>(args)...) {}

    template <typename U>
    void
    Put(U&& item) {
        mutex.Lock();
        // Re-check fullness under the lock: another producer can refill the
        // queue between our wakeup and reacquiring the mutex, so a single
        // unlocked IsFull() check would race.
        while (queue.IsFull()) {
            mutex.Unlock();
            wait_mutex.Wait();
            mutex.Lock();
        }
        defer({
            mutex.Unlock();
            if (wait_mutex.IsWaiting()) {
                wait_mutex.Wake();
            }
        });
        queue.Put(forward<U>(item));
    }
    value_type
    Pop() {
        mutex.Lock();
        // Block until a producer publishes an item; the underlying queues
        // assert rather than block when popped while empty.
        while (queue.IsEmpty()) {
            mutex.Unlock();
            wait_mutex.Wait();
            mutex.Lock();
        }
        defer({
            mutex.Unlock();
            if (wait_mutex.IsWaiting()) {
                wait_mutex.Wake();
            }
        });
        return queue.Pop();
    }

private:
    QueueType queue;
    MutexType mutex;
    MutexType wait_mutex;
};
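// --- Illustration only: one producer and one consumer sharing a
// ThreadSafeQueue. Assumes <thread> is available and that os::Futex's
// Wait/Wake behave as the class above expects; `ThreadedExample` is a
// hypothetical name.
#include <thread>

inline void
ThreadedExample() {
    auto q = ThreadSafeQueue<AtomicStackAllocatedQueue<16, int>, os::Futex>();
    std::thread producer([&q] {
        for (int i = 0; i < 1000; ++i) {
            q.Put(i); // blocks via wait_mutex while the ring is full
        }
    });
    std::thread consumer([&q] {
        for (int i = 0; i < 1000; ++i) {
            assert(q.Pop() == i); // blocks while empty; FIFO order holds with one consumer
        }
    });
    producer.join();
    consumer.join();
}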