Created
January 5, 2026 17:38
-
-
Save alexengrig/504ddcda282086318af8a621e8e66588 to your computer and use it in GitHub Desktop.
pthread_*
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include <iostream> | |
| #include <vector> | |
| #include <queue> | |
| #include <string> | |
| #include <csignal> | |
| #include <cstring> | |
| #include <pthread.h> | |
| #include <thread> | |
| #include <chrono> | |
| #include <fstream> | |
| #include <ctime> | |
| #include <memory> | |
// Process exit status returned by main(). Stays 0 unless a thread-creation
// failure or the producer's file-open failure records a non-zero code.
static int exit_code{0};
// Command-line settings of the program.
struct Config {
    // Number of consumer threads to start.
    int consumer_count{0};
    // Upper bound (inclusive, in milliseconds) for the random pause a consumer
    // takes after each value; 0 disables the pause.
    int max_consumer_sleep_ms{0};
    // When true, DBG(...) statements are executed.
    bool debug{false};
};
| namespace { | |
| Config app_config; | |
| } | |
// Executes `stmt` only when app_config.debug is set. The do/while(0) wrapper
// makes the macro behave like a single statement after `if`/`else`.
#define DBG(stmt)                 \
    do {                          \
        if (app_config.debug) {   \
            stmt;                 \
        }                         \
    } while (0)
// Bounded queue shared by the producer and the consumers, together with the
// mutex and condition variables that guard it.
struct SharedState {
    // Values produced but not yet consumed.
    std::queue<int> values;
    // The producer waits while `values` holds at least this many items.
    size_t max_size{1024};
    // Protects `values` and `is_producer_active`.
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    // Signalled when a value is pushed, and broadcast when production ends.
    pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
    // Signalled when a value is popped.
    pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
    // Cleared once the producer thread has been joined.
    bool is_producer_active{true};
};

// The single shared-state instance used by every thread in this file.
static SharedState shared_state;
// Argument block handed to each consumer thread.
struct ConsumerArgs {
    // Zero-based consumer number; used in debug output and to vary the RNG seed.
    int index{0};
};
// Argument block handed to the interruptor thread. Both pointers are
// non-owning and must stay valid while the interruptor runs.
struct InterruptorArgs {
    // Handles of the consumer threads the interruptor may target.
    std::vector<pthread_t> *consumer_threads{nullptr};
    // Handle of the producer thread.
    pthread_t *producer_t{nullptr};
};
// Set to 1 by the SIGTERM handler; polled by the worker threads to stop early.
static volatile sig_atomic_t sigterm_received = 0;

// SIGTERM handler: only records that the signal arrived.
static void on_sigterm(int /*signum*/) {
    sigterm_received = 1;
}
| void *producer_routine(void * /*unused*/) { | |
| DBG(std::cout << "[producer] start" << std::endl); | |
| std::ifstream ifs("in.txt"); | |
| if (!ifs.is_open()) { | |
| std::cerr << "[producer] cannot open file: in.txt" << std::endl; | |
| exit_code = 30; | |
| return nullptr; | |
| } | |
| int value; | |
| while (ifs >> value) { | |
| if (sigterm_received) { | |
| DBG(std::cout << "[producer] stopped by SIGTERM" << std::endl); | |
| break; | |
| } | |
| pthread_mutex_lock(&shared_state.mutex); | |
| while (!sigterm_received && shared_state.values.size() >= shared_state.max_size) { | |
| pthread_cond_wait(&shared_state.not_full, &shared_state.mutex); | |
| } | |
| if (sigterm_received) { | |
| pthread_mutex_unlock(&shared_state.mutex); | |
| DBG(std::cout << "[producer] stopped by SIGTERM" << std::endl); | |
| break; | |
| } | |
| shared_state.values.push(value); | |
| pthread_cond_signal(&shared_state.not_empty); | |
| pthread_mutex_unlock(&shared_state.mutex); | |
| } | |
| DBG(std::cout << "[producer] finish" << std::endl); | |
| return nullptr; | |
| } | |
| void *consumer_routine(void *arg) { | |
| auto *args = static_cast<ConsumerArgs *>(arg); | |
| unsigned seed = args->index ^ | |
| static_cast<unsigned>(pthread_self()) ^ | |
| static_cast<unsigned>(time(nullptr)); | |
| int old_cancel_state = 0; | |
| pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state); | |
| int partial_sum = 0; | |
| DBG(std::cout << "[consumer-" << args->index << "] start" << std::endl); | |
| while (true) { | |
| int value = 0; | |
| pthread_mutex_lock(&shared_state.mutex); | |
| while (!sigterm_received && shared_state.is_producer_active && shared_state.values.empty()) { | |
| pthread_cond_wait(&shared_state.not_empty, &shared_state.mutex); | |
| } | |
| if (sigterm_received || (!shared_state.is_producer_active && shared_state.values.empty())) { | |
| pthread_mutex_unlock(&shared_state.mutex); | |
| DBG(std::cout << "[consumer-" << args->index << "] stopped by " | |
| << (sigterm_received ? "SIGTERM" : "producer") << std::endl); | |
| break; | |
| } | |
| value = shared_state.values.front(); | |
| shared_state.values.pop(); | |
| pthread_cond_signal(&shared_state.not_full); | |
| pthread_mutex_unlock(&shared_state.mutex); | |
| partial_sum += value; | |
| DBG(std::cout << "[consumer-" << args->index << "] got value = " << value | |
| << ", partial_sum = " << partial_sum << std::endl); | |
| if (app_config.max_consumer_sleep_ms > 0) { | |
| int ms = rand_r(&seed) % (app_config.max_consumer_sleep_ms + 1); | |
| DBG(std::cout << "[consumer-" << args->index << "] start to sleep " << ms << " ms" << std::endl); | |
| std::this_thread::sleep_for(std::chrono::milliseconds(ms)); | |
| DBG(std::cout << "[consumer-" << args->index << "] finish to sleep " << ms << " ms" << std::endl); | |
| } | |
| } | |
| // pthread_setcancelstate(old_cancel_state, nullptr); | |
| DBG(std::cout << "[consumer-" << args->index << "] finish with partial_sum = " << partial_sum << std::endl); | |
| return new int(partial_sum); | |
| } | |
| void *consumer_interruptor_routine(void *arg) { | |
| auto *args = static_cast<InterruptorArgs *>(arg); | |
| DBG(std::cout << "[interruptor] start" << std::endl); | |
| unsigned seed = static_cast<unsigned>(pthread_self()) ^ | |
| static_cast<unsigned>(time(nullptr)); | |
| while (true) { | |
| if (sigterm_received) { | |
| pthread_cancel(*args->producer_t); | |
| DBG(std::cout << "[interruptor] stopped by SIGTERM" << std::endl); | |
| break; | |
| } | |
| pthread_mutex_lock(&shared_state.mutex); | |
| bool should_stop = !shared_state.is_producer_active; | |
| pthread_mutex_unlock(&shared_state.mutex); | |
| if (should_stop) { | |
| DBG(std::cout << "[interruptor] stopped by producer" << std::endl); | |
| break; | |
| } | |
| if (args && args->consumer_threads && !args->consumer_threads->empty()) { | |
| const auto &threads = *args->consumer_threads; | |
| size_t idx = rand_r(&seed) % threads.size(); | |
| pthread_t target = threads[idx]; | |
| pthread_cancel(target); | |
| } | |
| int ms = (app_config.max_consumer_sleep_ms > 0) | |
| ? 1 + (rand_r(&seed) % (app_config.max_consumer_sleep_ms + 1)) | |
| : 1; | |
| std::this_thread::sleep_for(std::chrono::milliseconds(ms)); | |
| } | |
| DBG(std::cout << "[interruptor] finish" << std::endl); | |
| return nullptr; | |
| } | |
| int run_threads() { | |
| // consumers | |
| std::vector<pthread_t> consumer_ts(app_config.consumer_count); | |
| std::vector<ConsumerArgs> consumer_args(app_config.consumer_count); | |
| for (int i = 0; i < app_config.consumer_count; ++i) { | |
| consumer_args[i].index = i; | |
| if (int rc = pthread_create(&consumer_ts[i], nullptr, &consumer_routine, &consumer_args[i]); rc != 0) { | |
| std::cerr << "pthread_create(consumer-" << i << ") failed: " << std::strerror(rc) << std::endl; | |
| exit_code = 11; | |
| return 0; | |
| } | |
| } | |
| // producer | |
| pthread_t producer_t{}; | |
| if (int rc = pthread_create(&producer_t, nullptr, &producer_routine, nullptr); rc != 0) { | |
| std::cerr << "pthread_create(producer) failed: " << std::strerror(rc) << std::endl; | |
| exit_code = 14; | |
| return 0; | |
| } | |
| // interruptor | |
| pthread_t interruptor_t{}; | |
| InterruptorArgs interruptor_args{&consumer_ts, &producer_t}; | |
| if (int rc = pthread_create(&interruptor_t, nullptr, &consumer_interruptor_routine, &interruptor_args); rc != 0) { | |
| std::cerr << "pthread_create(interruptor) failed: " << std::strerror(rc) << std::endl; | |
| exit_code = 13; | |
| return 0; | |
| } | |
| // join and stop producing | |
| pthread_join(producer_t, nullptr); | |
| pthread_mutex_lock(&shared_state.mutex); | |
| shared_state.is_producer_active = false; | |
| pthread_cond_broadcast(&shared_state.not_empty); | |
| pthread_mutex_unlock(&shared_state.mutex); | |
| DBG(std::cout << "[runner] producer is joined" << std::endl); | |
| // join interruptor | |
| pthread_join(interruptor_t, nullptr); | |
| DBG(std::cout << "[runner] interruptor is joined" << std::endl); | |
| // join and sum consuming | |
| int total_sum = 0; | |
| for (int i = 0; i < app_config.consumer_count; ++i) { | |
| void *ret = nullptr; | |
| pthread_join(consumer_ts[i], &ret); | |
| DBG(std::cout << "[runner] consumer-" << i << " is joined"<< std::endl); | |
| if (ret) { | |
| std::unique_ptr<int> partial{static_cast<int *>(ret)}; | |
| total_sum += *partial; | |
| } | |
| } | |
| return total_sum; | |
| } | |
| int main(int argc, char **argv) { | |
| if (argc < 3 || argc > 4) { | |
| std::cerr << "Usage: " << argv[0] << " <consumer_count> <max_sleep_ms> [debug=0|1]" << std::endl; | |
| return 1; | |
| } | |
| // parse args | |
| try { | |
| app_config.consumer_count = std::stoi(argv[1]); | |
| app_config.max_consumer_sleep_ms = std::stoi(argv[2]); | |
| app_config.debug = (argc == 4) ? (std::stoi(argv[3]) != 0) : false; | |
| if (app_config.consumer_count < 1 || app_config.max_consumer_sleep_ms < 0) { | |
| std::cerr << "Invalid arguments" << std::endl; | |
| return 2; | |
| } | |
| DBG(std::cout << "[main] Config: " | |
| << "consumer_count = " << app_config.consumer_count | |
| << ", max_consumer_sleep_ms = " << app_config.max_consumer_sleep_ms | |
| << ", debug = " << app_config.debug << std::endl; | |
| ); | |
| } catch (...) { | |
| std::cerr << "Invalid arguments" << std::endl; | |
| return 3; | |
| } | |
| // install SIGTERM handler | |
| { | |
| struct sigaction sa{}; | |
| sa.sa_handler = on_sigterm; | |
| sigemptyset(&sa.sa_mask); | |
| sa.sa_flags = 0; | |
| if (sigaction(SIGTERM, &sa, nullptr) != 0) { | |
| std::perror("sigaction(SIGTERM)"); | |
| return 4; | |
| } | |
| } | |
| int sum = run_threads(); | |
| DBG(std::cout << "[main] Total sum: "); | |
| std::cout << sum << std::endl; | |
| return exit_code; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment