Created
June 11, 2015 12:25
-
-
Save luoyetx/886caae406808bee15ee to your computer and use it in GitHub Desktop.
多线程非阻塞队列
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include "mqueue.h" | |
| #include <iostream> | |
| #include <string> | |
| #include <vector> | |
| #include <random> | |
| #include <Windows.h> | |
| using namespace std; | |
| struct Data { | |
| string name; | |
| int id; | |
| vector<int> ids; | |
| vector<string> names; | |
| //Data() {} | |
| //Data(const Data &other) { | |
| // name = other.name; | |
| // id = other.id; | |
| // ids = other.ids; | |
| // names = other.names; | |
| //} | |
| //Data& operator=(const Data &other) { | |
| // name = other.name; | |
| // id = other.id; | |
| // ids = other.ids; | |
| // names = other.names; | |
| // return *this; | |
| //} | |
| }; | |
| vector<int> ids{ 1, 2, 3, 5, 8, 6, 4, 7 }; | |
| vector<string> names{ "Alice", "Bob", "Cize", "Dave", "Ela", "Frank", "Gill", "Hibbe", "Ive", "Jack", "Kim" }; | |
| void productor(MQueue<Data> *q) { | |
| int nfd = 10; | |
| size_t self = this_thread::get_id().hash(); | |
| for (int i = 0; i < nfd; i++) { | |
| Data node; | |
| node.id = rand(); | |
| node.name = names[rand() % names.size()]; | |
| int size = rand() % 100; | |
| for (int j = 0; j < size; j++) { | |
| node.ids.push_back(rand()); | |
| } | |
| size = rand() % 100; | |
| for (int j = 0; j < size; j++) { | |
| node.names.push_back(names[rand() % names.size()]); | |
| } | |
| //printf("Productor Thread %uld is putting node into Queue\n", self); | |
| q->enqueue(node); | |
| printf("Productor Thread %uld has put node into Queue\nnode.name = %s\nnode.id = %d\nnode.ids.size = %d\nnode.names.size = %d\n", \ | |
| self, node.name.c_str(), node.id, node.ids.size(), node.names.size()); | |
| //Sleep(rand()%500); | |
| } | |
| } | |
| void consumer(MQueue<Data> *q) { | |
| int failed = 0; | |
| Data node; | |
| size_t self = this_thread::get_id().hash(); | |
| while (failed < 5) { | |
| //printf("Consumer Thread %uld trying to get data from Queue\n", self); | |
| if (!q->dequeue(node)) { | |
| ++failed; | |
| printf("Consumer Thread %uld get None form Queue\n", self); | |
| Sleep(rand()%500 + 500); | |
| continue; | |
| } | |
| Sleep(rand() % 500 + 500); | |
| failed = 0; | |
| printf("Consumer Thread %uld gets\nnode.name = %s\nnode.id = %d\nnode.ids.size = %d\nnode.names.size = %d\n", \ | |
| self, node.name.c_str(), node.id, node.ids.size(), node.names.size()); | |
| } | |
| } | |
| int main(int argc, char *argv[]) { | |
| MQueue<Data> q; | |
| int nfp = 2; | |
| int nfc = 4; | |
| vector<thread> pros; | |
| vector<thread> cons; | |
| for (int i = 0; i < nfp; ++i) { | |
| printf("Creating %dth productor\n", i); | |
| pros.push_back(thread(productor, &q)); | |
| } | |
| for (int i = 0; i < nfc; ++i) { | |
| printf("Creating %dth consumer\n", i); | |
| cons.push_back(thread(consumer, &q)); | |
| } | |
| for (auto& t : pros) { | |
| t.join(); | |
| } | |
| for (auto& t : cons) { | |
| t.join(); | |
| } | |
| printf("ALL DONE\n"); | |
| return 0; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #ifndef MQUEUE_H | |
| #define MQUEUE_H | |
| #include <queue> | |
| #include <thread> | |
| #include <mutex> | |
| // **class T needs a copy constructor and operator=** | |
| template<class T> | |
| class MQueue { | |
| public: | |
| typedef T NodeType; | |
| MQueue() {} | |
| MQueue(const MQueue &) = delete; | |
| MQueue& operator=(const MQueue &) = delete; | |
| ~MQueue() {} | |
| // enqueue with node | |
| void enqueue(NodeType &node) { | |
| mutex_.lock(); | |
| q_.push(node); | |
| mutex_.unlock(); | |
| } | |
| // dequeue | |
| // return true if success, and node hold the element | |
| // return false if failed | |
| bool dequeue(NodeType &node) { | |
| mutex_.lock(); | |
| if (q_.empty() == true) { | |
| mutex_.unlock(); | |
| return false; | |
| } | |
| node = q_.front(); // maybe time consume. | |
| q_.pop(); | |
| mutex_.unlock(); | |
| return true; | |
| } | |
| bool empty() { | |
| return q_.empty(); | |
| } | |
| public: | |
| NodeType emptyNode_; | |
| std::mutex mutex_; | |
| std::queue<NodeType> q_; | |
| }; | |
| #endif // MQUEUE_H |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment