Last active
December 20, 2025 13:09
-
-
Save sshaplygin/0f9227a4b61c3fcbfaaae291b0faa0dd to your computer and use it in GitHub Desktop.
Thread pool with single worker channel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| [package] | |
| name = "threadpool" | |
| version = "0.1.0" | |
| edition = "2024" | |
| [dependencies] | |
| chrono = "0.4.42" | |
| tracing = "0.1.43" | |
| tracing-subscriber = { version = "0.3.22", features = ["fmt", "time", "chrono"] } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::sync::mpsc::Receiver; | |
| use std::sync::{Arc, Mutex, mpsc}; | |
| use std::thread::{self, JoinHandle}; | |
| use std::time::Duration; | |
| use tracing::{debug, info, Level}; | |
| use tracing_subscriber::{FmtSubscriber, fmt::time::ChronoUtc}; | |
| type Job = Box<dyn FnOnce() + Send + 'static>; | |
| type SharedReceiver = Arc<Mutex<mpsc::Receiver<Job>>>; | |
| pub struct ThreadPool { | |
| workers: Vec<Worker>, | |
| sender: Option<mpsc::Sender<Job>>, | |
| } | |
| impl ThreadPool { | |
| pub fn new(size:usize) -> ThreadPool{ | |
| assert!(size > 0); | |
| let (tx, rx) = mpsc::channel(); | |
| let receiver = Arc::new(Mutex::new(rx)); | |
| let mut workers = Vec::with_capacity(size); | |
| for id in 0..size { | |
| workers.push(Worker::new(id, Arc::clone(&receiver))); | |
| } | |
| ThreadPool{ | |
| workers: workers, | |
| sender: Some(tx), | |
| } | |
| } | |
| pub fn with_cores() -> ThreadPool { | |
| let size = thread::available_parallelism() | |
| .map(|n| n.get()) | |
| .unwrap_or(1); | |
| debug!("get cores count {}", size); | |
| Self::new(size) | |
| } | |
| pub fn execute<F>(&self, f: F) | |
| where | |
| F: FnOnce() + Send + 'static, | |
| { | |
| let job = Box::new(f); | |
| if let Some(tx) = self.sender.as_ref() { | |
| tx.send(job).unwrap(); | |
| } | |
| } | |
| pub fn submit<F, R>(&self, f: F) -> Receiver<R> | |
| where | |
| F: FnOnce() -> R + Send + 'static, | |
| R: Send + 'static, | |
| { | |
| let (res_tx, res_rx) = mpsc::channel::<R>(); | |
| let job = Box::new(move || { | |
| let result = f(); | |
| let _ = res_tx.send(result); | |
| }); | |
| if let Some(tx) = self.sender.as_ref() { | |
| tx.send(job).unwrap(); | |
| }; | |
| res_rx | |
| } | |
| } | |
| impl Drop for ThreadPool { | |
| fn drop(&mut self) { | |
| drop(self.sender.take()); | |
| info!("shutdown thread pool"); | |
| for worker in &mut self.workers { | |
| if let Some(thread) = worker.thread.take() { | |
| thread.join().unwrap(); | |
| debug!("worker thread {} destroyed", worker.id); | |
| } | |
| } | |
| info!("thread pool destroyed"); | |
| } | |
| } | |
| struct Worker { | |
| id: usize, | |
| thread: Option<thread::JoinHandle<()>>, | |
| } | |
| impl Worker { | |
| fn new(id: usize, rx: SharedReceiver) -> Worker { | |
| debug!("worker {} call new", id); | |
| let thread = thread::spawn(move || { | |
| debug!("worker {} started and waiting", id); | |
| loop { | |
| let job = { | |
| let rx = rx.lock().unwrap(); | |
| rx.recv() | |
| }; | |
| match job { | |
| Ok(job) => { | |
| debug!("call job from worker {}", id); | |
| job(); | |
| }, | |
| Err(_) => { | |
| debug!("job {} graceful shutdown", id); | |
| break; | |
| } | |
| } | |
| } | |
| }); | |
| Worker{ | |
| id, | |
| thread: Some(thread), | |
| } | |
| } | |
| } | |
| fn main() { | |
| let timer = ChronoUtc::rfc_3339(); | |
| let subscriber =FmtSubscriber::builder() | |
| .with_max_level(Level::DEBUG) | |
| .with_timer(timer) | |
| .finish(); | |
| tracing::subscriber::set_global_default(subscriber) | |
| .expect("setting default subscriber failed"); | |
| let pool = ThreadPool::with_cores(); | |
| pool.execute(|| { | |
| thread::sleep(Duration::from_millis(500)); | |
| info!("task 1"); | |
| }); | |
| pool.execute(|| { | |
| thread::sleep(Duration::from_millis(200)); | |
| info!("task 2"); | |
| }); | |
| pool.execute(|| { | |
| thread::sleep(Duration::from_millis(100)); | |
| info!("task 3"); | |
| }); | |
| let future1 = pool.submit(|| { | |
| 100 * 100 | |
| }); | |
| let res1 = future1.recv().unwrap(); | |
| println!("result future1 {}", res1) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment