Skip to content

Instantly share code, notes, and snippets.

@alexengrig
Created January 5, 2026 17:38
Show Gist options
  • Select an option

  • Save alexengrig/504ddcda282086318af8a621e8e66588 to your computer and use it in GitHub Desktop.

Select an option

Save alexengrig/504ddcda282086318af8a621e8e66588 to your computer and use it in GitHub Desktop.
pthread_*
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <csignal>
#include <cstring>
#include <pthread.h>
#include <thread>
#include <chrono>
#include <fstream>
#include <ctime>
#include <memory>
// Process exit status returned by main(). It starts at 0 and is overwritten with a
// non-zero code when the producer cannot open its input or a thread cannot be created.
static int exit_code{0};
/// Settings taken from the command line (filled in by main()).
struct Config {
    /// Number of consumer threads to start.
    int consumer_count{0};
    /// Upper bound, in milliseconds, for the random sleep a consumer takes after each value;
    /// 0 disables the sleep.
    int max_consumer_sleep_ms{0};
    /// When true, statements wrapped in the DBG macro are executed.
    bool debug{false};
};
namespace {
// The one configuration object of the program (internal linkage). main() fills it in
// from argv; the thread routines and the DBG macro read it.
Config app_config{};
}  // namespace
// Executes `stmt` only when app_config.debug is set. The do/while wrapper turns the
// expansion into a single statement, so `DBG(...);` is safe inside an unbraced if/else.
// As with any function-like macro, `stmt` must not contain a comma outside parentheses.
#define DBG(stmt)               \
    do {                        \
        if (app_config.debug) { \
            stmt;               \
        }                       \
    } while (false)
/// State shared between the producer and the consumers: a bounded FIFO of integers
/// plus the mutex and condition variables used to coordinate access to it.
struct SharedState {
    /// Values produced but not yet consumed.
    std::queue<int> values;
    /// The producer waits while `values` holds at least this many elements.
    size_t max_size{1024};
    /// Locked around every access to `values` and `is_producer_active`.
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    /// Signalled after a push; consumers wait on it while the queue is empty.
    pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
    /// Signalled after a pop; the producer waits on it while the queue is full.
    pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
    /// Cleared by run_threads() once the producer thread has been joined.
    bool is_producer_active{true};
};

// The single shared instance used by all thread routines in this file.
static SharedState shared_state{};
/// Argument handed to consumer_routine() through pthread_create().
struct ConsumerArgs {
    /// Zero-based consumer number; used in debug output and mixed into the RNG seed.
    int index{0};
};
/// Argument handed to consumer_interruptor_routine() through pthread_create().
/// Both pointers are non-owning; the pointees live in run_threads().
struct InterruptorArgs {
    /// Handles of the consumer threads the interruptor may try to cancel.
    std::vector<pthread_t> *consumer_threads{nullptr};
    /// Handle of the producer thread.
    pthread_t *producer_t{nullptr};
};
// Becomes 1 once SIGTERM has been delivered; the thread routines poll it to stop early.
static volatile std::sig_atomic_t sigterm_received = 0;

// SIGTERM handler: only records that the signal arrived. Nothing else is done here,
// which keeps the handler limited to a single store to a sig_atomic_t flag.
static void on_sigterm(int /*signo*/) {
    sigterm_received = 1;
}
/**
 * Producer thread entry point.
 *
 * Reads whitespace-separated integers from "in.txt" and pushes them into
 * shared_state.values, waiting while the queue already holds max_size elements.
 * The loop ends at end of input, at the first token that does not parse as an int,
 * or as soon as sigterm_received is observed.
 *
 * Thread cancellation is disabled for the whole routine. pthread_cond_wait() is a
 * cancellation point that re-acquires the mutex before the thread is torn down, so a
 * cancel request arriving while the producer waits for free space would leave
 * shared_state.mutex locked forever. Instead of relying on cancellation, the wait is
 * bounded so the SIGTERM flag is re-checked periodically (a signal handler cannot
 * safely signal a condition variable to wake the thread).
 *
 * @return always nullptr. If "in.txt" cannot be opened, exit_code is set to 30.
 */
void *producer_routine(void * /*unused*/) {
    constexpr long kNsPerSec = 1'000'000'000L;
    // Upper bound on how long a blocked producer takes to notice SIGTERM.
    constexpr long kSigtermPollNs = 50'000'000L;

    // Never restored: the thread keeps cancellation disabled until it returns.
    int previous_cancel_state = 0;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &previous_cancel_state);

    DBG(std::cout << "[producer] start" << std::endl);
    std::ifstream ifs("in.txt");
    if (!ifs.is_open()) {
        std::cerr << "[producer] cannot open file: in.txt" << std::endl;
        exit_code = 30;
        return nullptr;
    }

    int value;
    while (ifs >> value) {
        if (sigterm_received) {
            DBG(std::cout << "[producer] stopped by SIGTERM" << std::endl);
            break;
        }

        pthread_mutex_lock(&shared_state.mutex);
        while (!sigterm_received && shared_state.values.size() >= shared_state.max_size) {
            // Timed wait (default condvar clock is CLOCK_REALTIME): wake up on a
            // not_full signal or after kSigtermPollNs, whichever comes first, and
            // re-evaluate the loop condition.
            timespec deadline{};
            clock_gettime(CLOCK_REALTIME, &deadline);
            deadline.tv_nsec += kSigtermPollNs;
            if (deadline.tv_nsec >= kNsPerSec) {
                deadline.tv_sec += 1;
                deadline.tv_nsec -= kNsPerSec;
            }
            pthread_cond_timedwait(&shared_state.not_full, &shared_state.mutex, &deadline);
        }
        if (sigterm_received) {
            pthread_mutex_unlock(&shared_state.mutex);
            DBG(std::cout << "[producer] stopped by SIGTERM" << std::endl);
            break;
        }
        shared_state.values.push(value);
        pthread_cond_signal(&shared_state.not_empty);
        pthread_mutex_unlock(&shared_state.mutex);
    }

    DBG(std::cout << "[producer] finish" << std::endl);
    return nullptr;
}
/**
 * Consumer thread entry point.
 *
 * Disables cancellation for itself, then repeatedly takes one value from
 * shared_state.values and adds it to a running sum. After each value it optionally
 * sleeps for a random 0..max_consumer_sleep_ms milliseconds. The loop ends when
 * sigterm_received is set, or when the producer is no longer active and the queue
 * is empty.
 *
 * @param arg pointer to this consumer's ConsumerArgs.
 * @return a heap-allocated int holding the partial sum; the joiner takes ownership.
 */
void *consumer_routine(void *arg) {
    const auto *self = static_cast<ConsumerArgs *>(arg);

    // Per-thread rand_r() state: wall-clock time, thread id and consumer index XOR-ed together.
    unsigned rng_state = static_cast<unsigned>(time(nullptr));
    rng_state ^= static_cast<unsigned>(pthread_self());
    rng_state ^= self->index;

    // The saved state is never restored; the thread returns with cancellation still disabled.
    int saved_cancel_state = 0;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &saved_cancel_state);

    int partial_sum = 0;
    DBG(std::cout << "[consumer-" << self->index << "] start" << std::endl);

    for (;;) {
        pthread_mutex_lock(&shared_state.mutex);

        // Sleep on not_empty until there is a value, the producer is done, or SIGTERM arrived.
        while (!(sigterm_received || !shared_state.is_producer_active || !shared_state.values.empty())) {
            pthread_cond_wait(&shared_state.not_empty, &shared_state.mutex);
        }

        // Leave on SIGTERM, or when nothing is left and nothing more will be produced.
        if (sigterm_received || (!shared_state.is_producer_active && shared_state.values.empty())) {
            pthread_mutex_unlock(&shared_state.mutex);
            DBG(std::cout << "[consumer-" << self->index << "] stopped by "
                          << (sigterm_received ? "SIGTERM" : "producer") << std::endl);
            break;
        }

        const int item = shared_state.values.front();
        shared_state.values.pop();
        pthread_cond_signal(&shared_state.not_full);
        pthread_mutex_unlock(&shared_state.mutex);

        partial_sum += item;
        DBG(std::cout << "[consumer-" << self->index << "] got value = " << item
                      << ", partial_sum = " << partial_sum << std::endl);

        if (app_config.max_consumer_sleep_ms <= 0) {
            continue;  // sleeping between values is disabled
        }
        const int pause_ms = rand_r(&rng_state) % (app_config.max_consumer_sleep_ms + 1);
        DBG(std::cout << "[consumer-" << self->index << "] start to sleep " << pause_ms << " ms" << std::endl);
        std::this_thread::sleep_for(std::chrono::milliseconds(pause_ms));
        DBG(std::cout << "[consumer-" << self->index << "] finish to sleep " << pause_ms << " ms" << std::endl);
    }

    DBG(std::cout << "[consumer-" << self->index << "] finish with partial_sum = " << partial_sum << std::endl);
    // pthread entry points can only return void*, so the sum is handed back on the heap.
    return new int(partial_sum);
}
/**
 * Interruptor thread entry point.
 *
 * While the producer is active, repeatedly picks a random consumer thread and sends
 * it a cancellation request, then sleeps for a random 1..max_consumer_sleep_ms+1
 * milliseconds (1 ms when sleeping is disabled). The loop ends when the producer is
 * no longer active or when sigterm_received is set.
 *
 * On SIGTERM the routine wakes every thread blocked on the queue's condition
 * variables so that each of them re-checks the flag and exits on its own. It does not
 * cancel the producer: that thread may already have been joined by the time the flag
 * is seen (cancelling a joined thread id is undefined), and a thread cancelled inside
 * pthread_cond_wait() terminates with the mutex held.
 *
 * @param arg pointer to InterruptorArgs; may be null, in which case no consumer is
 *            ever cancelled. The producer_t field is not used by this routine.
 * @return always nullptr.
 */
void *consumer_interruptor_routine(void *arg) {
    auto *args = static_cast<InterruptorArgs *>(arg);
    DBG(std::cout << "[interruptor] start" << std::endl);
    unsigned seed = static_cast<unsigned>(pthread_self()) ^
                    static_cast<unsigned>(time(nullptr));
    while (true) {
        if (sigterm_received) {
            // Wake a producer waiting for space and consumers waiting for data.
            // Broadcasting under the mutex guarantees a thread is either already
            // waiting (and gets woken) or has not yet tested its predicate.
            pthread_mutex_lock(&shared_state.mutex);
            pthread_cond_broadcast(&shared_state.not_full);
            pthread_cond_broadcast(&shared_state.not_empty);
            pthread_mutex_unlock(&shared_state.mutex);
            DBG(std::cout << "[interruptor] stopped by SIGTERM" << std::endl);
            break;
        }

        pthread_mutex_lock(&shared_state.mutex);
        const bool should_stop = !shared_state.is_producer_active;
        pthread_mutex_unlock(&shared_state.mutex);
        if (should_stop) {
            DBG(std::cout << "[interruptor] stopped by producer" << std::endl);
            break;
        }

        if (args && args->consumer_threads && !args->consumer_threads->empty()) {
            const auto &threads = *args->consumer_threads;
            const size_t idx = rand_r(&seed) % threads.size();
            pthread_cancel(threads[idx]);
        }

        const int ms = (app_config.max_consumer_sleep_ms > 0)
                           ? 1 + (rand_r(&seed) % (app_config.max_consumer_sleep_ms + 1))
                           : 1;
        std::this_thread::sleep_for(std::chrono::milliseconds(ms));
    }
    DBG(std::cout << "[interruptor] finish" << std::endl);
    return nullptr;
}
int run_threads() {
// consumers
std::vector<pthread_t> consumer_ts(app_config.consumer_count);
std::vector<ConsumerArgs> consumer_args(app_config.consumer_count);
for (int i = 0; i < app_config.consumer_count; ++i) {
consumer_args[i].index = i;
if (int rc = pthread_create(&consumer_ts[i], nullptr, &consumer_routine, &consumer_args[i]); rc != 0) {
std::cerr << "pthread_create(consumer-" << i << ") failed: " << std::strerror(rc) << std::endl;
exit_code = 11;
return 0;
}
}
// producer
pthread_t producer_t{};
if (int rc = pthread_create(&producer_t, nullptr, &producer_routine, nullptr); rc != 0) {
std::cerr << "pthread_create(producer) failed: " << std::strerror(rc) << std::endl;
exit_code = 14;
return 0;
}
// interruptor
pthread_t interruptor_t{};
InterruptorArgs interruptor_args{&consumer_ts, &producer_t};
if (int rc = pthread_create(&interruptor_t, nullptr, &consumer_interruptor_routine, &interruptor_args); rc != 0) {
std::cerr << "pthread_create(interruptor) failed: " << std::strerror(rc) << std::endl;
exit_code = 13;
return 0;
}
// join and stop producing
pthread_join(producer_t, nullptr);
pthread_mutex_lock(&shared_state.mutex);
shared_state.is_producer_active = false;
pthread_cond_broadcast(&shared_state.not_empty);
pthread_mutex_unlock(&shared_state.mutex);
DBG(std::cout << "[runner] producer is joined" << std::endl);
// join interruptor
pthread_join(interruptor_t, nullptr);
DBG(std::cout << "[runner] interruptor is joined" << std::endl);
// join and sum consuming
int total_sum = 0;
for (int i = 0; i < app_config.consumer_count; ++i) {
void *ret = nullptr;
pthread_join(consumer_ts[i], &ret);
DBG(std::cout << "[runner] consumer-" << i << " is joined"<< std::endl);
if (ret) {
std::unique_ptr<int> partial{static_cast<int *>(ret)};
total_sum += *partial;
}
}
return total_sum;
}
// Parses `text` as a base-10 int and stores it in `out`. Returns false, leaving `out`
// untouched, when the text is not a number, is out of range for int, or has trailing
// characters after the number (std::stoi alone would accept "12abc" as 12).
static bool parse_int_arg(const char *text, int &out) {
    try {
        const std::string s{text};
        std::size_t used = 0;
        const int parsed = std::stoi(s, &used);
        if (used != s.size()) {
            return false;
        }
        out = parsed;
        return true;
    } catch (...) {
        // std::stoi reports failure only through exceptions; any of them means "invalid".
        return false;
    }
}

/**
 * Program entry point.
 *
 * Usage: <program> <consumer_count> <max_sleep_ms> [debug=0|1]
 *
 * Parses the arguments into app_config, installs the SIGTERM handler, runs the
 * threads and prints the total sum on stdout.
 *
 * Exit codes produced here: 1 wrong argument count, 2 arguments out of range
 * (consumer_count < 1 or max_sleep_ms < 0), 3 arguments that are not integers,
 * 4 sigaction() failure. Otherwise the value of exit_code is returned.
 */
int main(int argc, char **argv) {
    if (argc < 3 || argc > 4) {
        // argv[0] may be null when argc is 0; never stream a null char pointer.
        const char *program = (argc > 0 && argv[0] != nullptr) ? argv[0] : "program";
        std::cerr << "Usage: " << program << " <consumer_count> <max_sleep_ms> [debug=0|1]" << std::endl;
        return 1;
    }

    // parse args
    int debug_flag = 0;
    const bool parsed_ok = parse_int_arg(argv[1], app_config.consumer_count)
                           && parse_int_arg(argv[2], app_config.max_consumer_sleep_ms)
                           && (argc != 4 || parse_int_arg(argv[3], debug_flag));
    if (!parsed_ok) {
        std::cerr << "Invalid arguments" << std::endl;
        return 3;
    }
    app_config.debug = (debug_flag != 0);
    if (app_config.consumer_count < 1 || app_config.max_consumer_sleep_ms < 0) {
        std::cerr << "Invalid arguments" << std::endl;
        return 2;
    }
    DBG(std::cout << "[main] Config: "
                  << "consumer_count = " << app_config.consumer_count
                  << ", max_consumer_sleep_ms = " << app_config.max_consumer_sleep_ms
                  << ", debug = " << app_config.debug << std::endl);

    // install SIGTERM handler
    {
        struct sigaction sa{};
        sa.sa_handler = on_sigterm;
        sigemptyset(&sa.sa_mask);
        sa.sa_flags = 0;
        if (sigaction(SIGTERM, &sa, nullptr) != 0) {
            std::perror("sigaction(SIGTERM)");
            return 4;
        }
    }

    const int sum = run_threads();
    // In debug mode the label and the number end up on the same line.
    DBG(std::cout << "[main] Total sum: ");
    std::cout << sum << std::endl;
    return exit_code;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment