Last active
December 4, 2019 20:18
-
-
Save tobz/7b78985a56bad1cef30e08332c60ddeb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::sync::Arc; | |
| use futures_intrusive::sync::ManualResetEvent; | |
| use crate::wg::{WaitGroup, WaitGroupHandle}; | |
| pub struct Context { | |
| start: WaitGroup, | |
| done: WaitGroup, | |
| close: Arc<ManualResetEvent>, | |
| } | |
| pub struct ContextHandle { | |
| start: WaitGroupHandle, | |
| done: WaitGroupHandle, | |
| close: Arc<ManualResetEvent>, | |
| } | |
| impl Context { | |
| pub fn new() -> Context { | |
| Context { | |
| start: WaitGroup::new(), | |
| done: WaitGroup::new(), | |
| close: Arc::new(ManualResetEvent::new(false)), | |
| } | |
| } | |
| pub fn handle(&mut self) -> ContextHandle { | |
| ContextHandle { | |
| start: self.start.add(), | |
| done: self.done.add(), | |
| close: self.close.clone(), | |
| } | |
| } | |
| pub async fn wait_for_start(&mut self) { | |
| self.start.wait().await | |
| } | |
| pub async fn wait_for_done(&mut self) { | |
| self.done.wait().await | |
| } | |
| pub fn close(&mut self) { | |
| self.close.set() | |
| } | |
| } | |
| impl ContextHandle { | |
| pub fn mark_started(&mut self) { | |
| self.start.done() | |
| } | |
| pub fn mark_finished(&mut self) { | |
| self.done.done() | |
| } | |
| pub fn close(&self) -> &ManualResetEvent { | |
| &self.close | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| pub struct Liveness { | |
| name: String, | |
| handle: ContextHandle, | |
| driver: Box<dyn Driver + Send + Sync>, | |
| tx: mpsc::Sender<()>, | |
| clock: Clock, | |
| } | |
| impl Liveness { | |
| pub fn new(config: &Configuration, handle: ContextHandle, tx: mpsc::Sender<()>) -> Result<Liveness, DriverError> { | |
| let driver = match config.cache_type.as_str() { | |
| "redis" => { | |
| let dsn = format!("redis://{}", config.cache_address); | |
| Box::new(Redis::new(dsn.as_str())?) | |
| }, | |
| x => return Err(("liveness", format!("failed to configure driver for cache type '{}'", x)).into()) | |
| }; | |
| Ok(Liveness { | |
| name: config.node_name.clone(), | |
| handle, | |
| driver, | |
| tx, | |
| clock: Clock::new(), | |
| }) | |
| } | |
| pub async fn run(&mut self) { | |
| info!("liveness checker started"); | |
| self.handle.mark_started(); | |
| let mut tick = Interval::new_interval(Duration::from_secs(1)); | |
| let close = self.handle.close(); | |
| loop { | |
| select! { | |
| _ = tick.next().fuse() => { | |
| info!("liveness check fired"); | |
| // Write a value and read it back. | |
| let now = self.clock.now(); | |
| let base_key = format!("pettycache:liveness:{}", self.name); | |
| let loop_key = format!("{}:{}", base_key, now); | |
| let now_str = now.to_string(); | |
| // Set our base key to indicate the last time we ran the liveness check. | |
| if let Err(e) = self.driver.set(base_key.as_bytes(), now_str.as_bytes()).await { | |
| error!("caught error while setting liveness value: {}", e); | |
| continue | |
| } | |
| if let Err(e) = self.driver.set_ex(loop_key.as_bytes(), "liveness".as_bytes(), 15).await { | |
| error!("caught error while setting liveness value: {}", e); | |
| continue | |
| } | |
| match self.driver.get(loop_key.as_bytes()).await { | |
| Ok(value) => match String::from_utf8(value) { | |
| Ok(s) => info!("liveness value: {}", s), | |
| Err(_) => error!("liveness value was not valid UTF-8; possible bug"), | |
| }, | |
| Err(e) => error!("caught error while getting liveness value: {}", e), | |
| } | |
| }, | |
| _ = close.wait() => break, | |
| } | |
| } | |
| info!("liveness check shutting down"); | |
| self.handle.mark_finished(); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| fn main() { | |
| let close = Signals::new(&[SIGINT, SIGTERM]) | |
| .map(|s| { | |
| let terminate = Arc::new(ManualResetEvent::new(false)); | |
| let close = terminate.clone(); | |
| thread::spawn(move || { | |
| let _ = s.forever().next(); | |
| terminate.set(); | |
| }); | |
| close | |
| }) | |
| .expect("failed to create signal handler"); | |
| tracing_subscriber::fmt::init(); | |
| run(close) | |
| } | |
| fn run(close: Arc<ManualResetEvent>) { | |
| let rt = tokio::runtime::Runtime::new().unwrap(); | |
| rt.block_on(async { | |
| let config = Configuration::testing(); | |
| let mut context = Context::new(); | |
| // Spawn our liveness checker. | |
| let liveness_handle = context.handle(); | |
| let (liveness_tx, _) = mpsc::channel(32); | |
| let mut liveness = Liveness::new(&config, liveness_handle, liveness_tx) | |
| .expect("failed to create liveness checker"); | |
| tokio::spawn(async move { liveness.run().await }); | |
| context.wait_for_start().await; | |
| info!("pettycache-server started"); | |
| close.wait().await; | |
| info!("got signal to shutdown"); | |
| context.close(); | |
| context.wait_for_done().await; | |
| info!("pettycache-server done"); | |
| }) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::sync::{ | |
| Arc, Mutex, | |
| atomic::{AtomicUsize, Ordering}, | |
| }; | |
| use tokio::sync::oneshot; | |
| pub struct WaitGroup { | |
| outstanding: Arc<AtomicUsize>, | |
| done: bool, | |
| tx: Arc<Mutex<Option<oneshot::Sender<()>>>>, | |
| rx: Option<oneshot::Receiver<()>>, | |
| } | |
| pub struct WaitGroupHandle { | |
| outstanding: Arc<AtomicUsize>, | |
| tx: Arc<Mutex<Option<oneshot::Sender<()>>>>, | |
| } | |
| impl WaitGroup { | |
| pub fn new() -> WaitGroup { | |
| let (tx, rx) = oneshot::channel(); | |
| WaitGroup { | |
| outstanding: Arc::new(AtomicUsize::new(0)), | |
| done: false, | |
| tx: Arc::new(Mutex::new(Some(tx))), | |
| rx: Some(rx), | |
| } | |
| } | |
| pub fn add(&mut self) -> WaitGroupHandle { | |
| self.outstanding.fetch_add(1, Ordering::SeqCst); | |
| WaitGroupHandle { | |
| outstanding: self.outstanding.clone(), | |
| tx: self.tx.clone(), | |
| } | |
| } | |
| pub async fn wait(&mut self) { | |
| // If we're not already marked as being done, then just wait for the final handle to send | |
| // us the all clear. No need to check for errors here, because we can't ever drop the | |
| // handle as we're holding a reference to it. | |
| if !self.done { | |
| let _ = self.rx.take().expect("tried to wait on finished wait group").await; | |
| self.done = true | |
| } | |
| } | |
| } | |
| impl WaitGroupHandle { | |
| pub fn done(&mut self) { | |
| // If we're the ones that take this to zero waiters, then we're also responsible for being | |
| // the ones to notify the group itself that we're done. We also don't care if the receiver | |
| // is still around or not. Not our problem. | |
| if self.outstanding.fetch_sub(1, Ordering::SeqCst) == 1 { | |
| let tx = self.tx.lock().unwrap().take().expect("sender not in wait group"); | |
| let _ = tx.send(()); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment