Skip to content

Instantly share code, notes, and snippets.

@sshaplygin
Last active December 20, 2025 13:09
Show Gist options
  • Select an option

  • Save sshaplygin/0f9227a4b61c3fcbfaaae291b0faa0dd to your computer and use it in GitHub Desktop.

Select an option

Save sshaplygin/0f9227a4b61c3fcbfaaae291b0faa0dd to your computer and use it in GitHub Desktop.
Thread pool with single worker channel
[package]
name = "threadpool"
version = "0.1.0"
edition = "2024"
[dependencies]
chrono = "0.4.42"
tracing = "0.1.43"
tracing-subscriber = { version = "0.3.22", features = ["fmt", "time", "chrono"] }
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, mpsc};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use tracing::{debug, info, Level};
use tracing_subscriber::{FmtSubscriber, fmt::time::ChronoUtc};
type Job = Box<dyn FnOnce() + Send + 'static>;
type SharedReceiver = Arc<Mutex<mpsc::Receiver<Job>>>;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
impl ThreadPool {
pub fn new(size:usize) -> ThreadPool{
assert!(size > 0);
let (tx, rx) = mpsc::channel();
let receiver = Arc::new(Mutex::new(rx));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool{
workers: workers,
sender: Some(tx),
}
}
pub fn with_cores() -> ThreadPool {
let size = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
debug!("get cores count {}", size);
Self::new(size)
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
if let Some(tx) = self.sender.as_ref() {
tx.send(job).unwrap();
}
}
pub fn submit<F, R>(&self, f: F) -> Receiver<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (res_tx, res_rx) = mpsc::channel::<R>();
let job = Box::new(move || {
let result = f();
let _ = res_tx.send(result);
});
if let Some(tx) = self.sender.as_ref() {
tx.send(job).unwrap();
};
res_rx
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
info!("shutdown thread pool");
for worker in &mut self.workers {
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
debug!("worker thread {} destroyed", worker.id);
}
}
info!("thread pool destroyed");
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, rx: SharedReceiver) -> Worker {
debug!("worker {} call new", id);
let thread = thread::spawn(move || {
debug!("worker {} started and waiting", id);
loop {
let job = {
let rx = rx.lock().unwrap();
rx.recv()
};
match job {
Ok(job) => {
debug!("call job from worker {}", id);
job();
},
Err(_) => {
debug!("job {} graceful shutdown", id);
break;
}
}
}
});
Worker{
id,
thread: Some(thread),
}
}
}
fn main() {
let timer = ChronoUtc::rfc_3339();
let subscriber =FmtSubscriber::builder()
.with_max_level(Level::DEBUG)
.with_timer(timer)
.finish();
tracing::subscriber::set_global_default(subscriber)
.expect("setting default subscriber failed");
let pool = ThreadPool::with_cores();
pool.execute(|| {
thread::sleep(Duration::from_millis(500));
info!("task 1");
});
pool.execute(|| {
thread::sleep(Duration::from_millis(200));
info!("task 2");
});
pool.execute(|| {
thread::sleep(Duration::from_millis(100));
info!("task 3");
});
let future1 = pool.submit(|| {
100 * 100
});
let res1 = future1.recv().unwrap();
println!("result future1 {}", res1)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment