Skip to content

Instantly share code, notes, and snippets.

@luoyetx
Created June 11, 2015 12:25
Show Gist options
  • Select an option

  • Save luoyetx/886caae406808bee15ee to your computer and use it in GitHub Desktop.

Select an option

Save luoyetx/886caae406808bee15ee to your computer and use it in GitHub Desktop.
多线程非阻塞队列
#include "mqueue.h"
#include <iostream>
#include <string>
#include <vector>
#include <random>
#include <Windows.h>
using namespace std;
// Payload passed between threads through the queue.
//
// Copying and assignment use the compiler-generated member-wise operations:
// every member manages its own storage, so no hand-written copy constructor
// or operator= is required.
struct Data {
    std::string name;                // single name attached to the node
    int id = 0;                      // zero by default so a default-constructed Data never holds an indeterminate value
    std::vector<int> ids;            // variable-length list of integers
    std::vector<std::string> names;  // variable-length list of names
};
// Fixed pool of sample integers (not referenced by the code visible in this file).
std::vector<int> ids = {
    1, 2, 3, 5, 8, 6, 4, 7
};

// Fixed pool of sample names; indexed with `rand() % names.size()` to pick one.
std::vector<std::string> names = {
    "Alice", "Bob", "Cize", "Dave", "Ela", "Frank",
    "Gill", "Hibbe", "Ive", "Jack", "Kim"
};
// Producer thread entry point.
//
// Builds `nfd` Data nodes filled with rand()-generated values, enqueues each
// one on `q`, and prints a summary of the node after every enqueue.
//
// q: queue the nodes are pushed onto.
//
// NOTE(review): rand() is called without any seeding in this function, and the
// C library does not guarantee rand() to be thread-safe — confirm this is
// acceptable for the demo.
void productor(MQueue<Data> *q) {
    const int nfd = 10;
    // std::thread::id has no standard hash() member; std::hash is the portable
    // way to turn the id into a number. Widened to unsigned long long so it
    // matches the %llu conversion below on every platform.
    const unsigned long long self = static_cast<unsigned long long>(
        std::hash<std::thread::id>()(std::this_thread::get_id()));
    for (int i = 0; i < nfd; i++) {
        Data node;
        node.id = rand();
        node.name = names[rand() % names.size()];
        int size = rand() % 100;
        for (int j = 0; j < size; j++) {
            node.ids.push_back(rand());
        }
        size = rand() % 100;
        for (int j = 0; j < size; j++) {
            node.names.push_back(names[rand() % names.size()]);
        }
        q->enqueue(node);
        // size() returns size_t; cast explicitly so the argument type agrees
        // with the conversion specifier (a mismatch is undefined behaviour).
        printf("Productor Thread %llu has put node into Queue\nnode.name = %s\nnode.id = %d\nnode.ids.size = %llu\nnode.names.size = %llu\n",
               self, node.name.c_str(), node.id,
               static_cast<unsigned long long>(node.ids.size()),
               static_cast<unsigned long long>(node.names.size()));
    }
}
// Consumer thread entry point.
//
// Repeatedly tries to dequeue a node from `q`. After every attempt it sleeps
// for 500-999 ms; a successful dequeue prints the node and resets the failure
// counter. The function returns after 5 consecutive failed attempts.
//
// q: queue the nodes are taken from.
void consumer(MQueue<Data> *q) {
    int failed = 0;
    Data node;
    // std::thread::id has no standard hash() member; std::hash is the portable
    // way to turn the id into a number. Widened to unsigned long long so it
    // matches the %llu conversion below on every platform.
    const unsigned long long self = static_cast<unsigned long long>(
        std::hash<std::thread::id>()(std::this_thread::get_id()));
    while (failed < 5) {
        if (!q->dequeue(node)) {
            ++failed;
            printf("Consumer Thread %llu get None form Queue\n", self);
            Sleep(rand()%500 + 500);
            continue;
        }
        Sleep(rand() % 500 + 500);
        failed = 0;
        // size() returns size_t; cast explicitly so the argument type agrees
        // with the conversion specifier (a mismatch is undefined behaviour).
        printf("Consumer Thread %llu gets\nnode.name = %s\nnode.id = %d\nnode.ids.size = %llu\nnode.names.size = %llu\n",
               self, node.name.c_str(), node.id,
               static_cast<unsigned long long>(node.ids.size()),
               static_cast<unsigned long long>(node.names.size()));
    }
}
int main(int argc, char *argv[]) {
MQueue<Data> q;
int nfp = 2;
int nfc = 4;
vector<thread> pros;
vector<thread> cons;
for (int i = 0; i < nfp; ++i) {
printf("Creating %dth productor\n", i);
pros.push_back(thread(productor, &q));
}
for (int i = 0; i < nfc; ++i) {
printf("Creating %dth consumer\n", i);
cons.push_back(thread(consumer, &q));
}
for (auto& t : pros) {
t.join();
}
for (auto& t : cons) {
t.join();
}
printf("ALL DONE\n");
return 0;
}
#ifndef MQUEUE_H
#define MQUEUE_H
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
// Mutex-protected FIFO queue that can be shared between threads.
//
// No operation waits for data to arrive: dequeue() on an empty queue returns
// false immediately instead of blocking the caller.
//
// Requirements on T: default-constructible (for emptyNode_), copy-constructible
// (enqueue from an lvalue) and assignable (dequeue writes into the caller's
// object).
template<class T>
class MQueue {
public:
    typedef T NodeType;

    MQueue() {}
    MQueue(const MQueue &) = delete;
    MQueue& operator=(const MQueue &) = delete;
    ~MQueue() {}

    // Appends a copy of `node` to the back of the queue.
    // Takes a const reference so const objects can be enqueued as well.
    void enqueue(const NodeType &node) {
        // lock_guard releases the mutex even if push() throws.
        std::lock_guard<std::mutex> lock(mutex_);
        q_.push(node);
    }

    // Appends `node` to the back of the queue by moving from it
    // (used for temporaries and std::move'd arguments).
    void enqueue(NodeType &&node) {
        std::lock_guard<std::mutex> lock(mutex_);
        q_.push(std::move(node));
    }

    // Removes the front element and stores it in `node`.
    // Returns true on success; returns false and leaves `node` untouched when
    // the queue is empty.
    bool dequeue(NodeType &node) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (q_.empty()) {
            return false;
        }
        // The element is popped right after, so move out of it rather than
        // deep-copying while the lock is held.
        node = std::move(q_.front());
        q_.pop();
        return true;
    }

    // Returns true when the queue holds no elements at the time of the call.
    // Takes the lock so the read does not race with enqueue()/dequeue().
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return q_.empty();
    }

public:
    NodeType emptyNode_;          // not used by any member function; kept so existing references still compile
    mutable std::mutex mutex_;    // guards q_; mutable so empty() can lock it while being const
    std::queue<NodeType> q_;      // underlying FIFO storage
};
#endif // MQUEUE_H
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment