@tobz
Last active December 4, 2019 20:18

use std::sync::Arc;

use futures_intrusive::sync::ManualResetEvent;

use crate::wg::{WaitGroup, WaitGroupHandle};

/// Coordinates startup and shutdown across a set of tasks: `start` trips once
/// every task has marked itself started, `done` trips once every task has
/// finished, and `close` signals the tasks to shut down.
pub struct Context {
    start: WaitGroup,
    done: WaitGroup,
    close: Arc<ManualResetEvent>,
}

/// A per-task handle to a `Context`.
pub struct ContextHandle {
    start: WaitGroupHandle,
    done: WaitGroupHandle,
    close: Arc<ManualResetEvent>,
}

impl Context {
    pub fn new() -> Context {
        Context {
            start: WaitGroup::new(),
            done: WaitGroup::new(),
            close: Arc::new(ManualResetEvent::new(false)),
        }
    }

    /// Creates a handle for a new task, registering it with both wait groups.
    pub fn handle(&mut self) -> ContextHandle {
        ContextHandle {
            start: self.start.add(),
            done: self.done.add(),
            close: self.close.clone(),
        }
    }

    /// Waits until every registered task has called `mark_started`.
    pub async fn wait_for_start(&mut self) {
        self.start.wait().await
    }

    /// Waits until every registered task has called `mark_finished`.
    pub async fn wait_for_done(&mut self) {
        self.done.wait().await
    }

    /// Signals all tasks to shut down.
    pub fn close(&mut self) {
        self.close.set()
    }
}

impl ContextHandle {
    pub fn mark_started(&mut self) {
        self.start.done()
    }

    pub fn mark_finished(&mut self) {
        self.done.done()
    }

    /// The shutdown event; tasks should exit once it becomes set.
    pub fn close(&self) -> &ManualResetEvent {
        &self.close
    }
}
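
// A minimal usage sketch of the Context/ContextHandle pair; this function is
// illustrative and not part of the gist. A task marks itself started, runs
// until the close event fires, and then marks itself finished.
async fn example_task(mut handle: ContextHandle) {
    handle.mark_started();
    // ... spawn or drive the task's real work here ...
    handle.close().wait().await;
    handle.mark_finished();
}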

// Imports the gist leaves implicit. `Interval::new_interval` and the
// stream-based `tick.next()` match the tokio 0.2 alpha timer API; `Clock` is
// presumably quanta::Clock, and `Configuration`, `Driver`, `DriverError`, and
// `Redis` are crate-local types that aren't shown.
use std::time::Duration;

use futures::{select, FutureExt, StreamExt};
use tokio::sync::mpsc;
use tokio::timer::Interval;
use tracing::{error, info};

/// Periodically writes a liveness marker to the cache and reads it back,
/// proving the cache is reachable and serving full round trips.
pub struct Liveness {
    name: String,
    handle: ContextHandle,
    driver: Box<dyn Driver + Send + Sync>,
    tx: mpsc::Sender<()>,
    clock: Clock,
}

impl Liveness {
    pub fn new(
        config: &Configuration,
        handle: ContextHandle,
        tx: mpsc::Sender<()>,
    ) -> Result<Liveness, DriverError> {
        let driver = match config.cache_type.as_str() {
            "redis" => {
                let dsn = format!("redis://{}", config.cache_address);
                Box::new(Redis::new(dsn.as_str())?)
            },
            x => return Err(("liveness", format!("failed to configure driver for cache type '{}'", x)).into()),
        };

        Ok(Liveness {
            name: config.node_name.clone(),
            handle,
            driver,
            tx,
            clock: Clock::new(),
        })
    }

    pub async fn run(&mut self) {
        info!("liveness checker started");
        self.handle.mark_started();

        let mut tick = Interval::new_interval(Duration::from_secs(1));
        let close = self.handle.close();

        loop {
            select! {
                _ = tick.next().fuse() => {
                    info!("liveness check fired");

                    // Write a value and read it back.
                    let now = self.clock.now();
                    let base_key = format!("pettycache:liveness:{}", self.name);
                    let loop_key = format!("{}:{}", base_key, now);
                    let now_str = now.to_string();

                    // Set our base key to indicate the last time we ran the liveness check.
                    if let Err(e) = self.driver.set(base_key.as_bytes(), now_str.as_bytes()).await {
                        error!("caught error while setting liveness value: {}", e);
                        continue
                    }

                    // Set a per-iteration key with a 15-second expiry...
                    if let Err(e) = self.driver.set_ex(loop_key.as_bytes(), "liveness".as_bytes(), 15).await {
                        error!("caught error while setting expiring liveness value: {}", e);
                        continue
                    }

                    // ...and read it back to complete the round trip.
                    match self.driver.get(loop_key.as_bytes()).await {
                        Ok(value) => match String::from_utf8(value) {
                            Ok(s) => info!("liveness value: {}", s),
                            Err(_) => error!("liveness value was not valid UTF-8; possible bug"),
                        },
                        Err(e) => error!("caught error while getting liveness value: {}", e),
                    }
                },
                _ = close.wait() => break,
            }
        }

        info!("liveness check shutting down");
        self.handle.mark_finished();
    }
}
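
// The gist doesn't include the types `Liveness` depends on. Judging only by
// the calls made above (`set`, `set_ex`, `get`), the `Driver` trait plausibly
// looks like the sketch below; the exact signatures are an assumption, and
// the async methods would need something like the async-trait crate in
// 2019-era Rust.
#[async_trait::async_trait]
pub trait Driver {
    /// Sets a key to a value.
    async fn set(&mut self, key: &[u8], value: &[u8]) -> Result<(), DriverError>;
    /// Sets a key to a value with a time-to-live, in seconds.
    async fn set_ex(&mut self, key: &[u8], value: &[u8], ttl: usize) -> Result<(), DriverError>;
    /// Gets the value of a key.
    async fn get(&mut self, key: &[u8]) -> Result<Vec<u8>, DriverError>;
}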

// `main` additionally needs these; `Signals`, `SIGINT`, and `SIGTERM` come
// from the signal-hook crate.
use std::thread;

use signal_hook::{iterator::Signals, SIGINT, SIGTERM};

fn main() {
    // Translate SIGINT/SIGTERM into a `ManualResetEvent` that async code can
    // wait on, via a dedicated signal-handling thread.
    let close = Signals::new(&[SIGINT, SIGTERM])
        .map(|s| {
            let terminate = Arc::new(ManualResetEvent::new(false));
            let close = terminate.clone();
            thread::spawn(move || {
                let _ = s.forever().next();
                terminate.set();
            });
            close
        })
        .expect("failed to create signal handler");

    tracing_subscriber::fmt::init();

    run(close)
}

fn run(close: Arc<ManualResetEvent>) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let config = Configuration::testing();
        let mut context = Context::new();

        // Spawn our liveness checker.
        let liveness_handle = context.handle();
        let (liveness_tx, _) = mpsc::channel(32);
        let mut liveness = Liveness::new(&config, liveness_handle, liveness_tx)
            .expect("failed to create liveness checker");
        tokio::spawn(async move { liveness.run().await });

        // Wait for every task to report in before declaring ourselves started.
        context.wait_for_start().await;
        info!("pettycache-server started");

        // Block until a signal arrives, then drive a graceful shutdown.
        close.wait().await;
        info!("got signal to shut down");
        context.close();
        context.wait_for_done().await;
        info!("pettycache-server done");
    })
}

// wg.rs: the wait group implementation pulled in above via `crate::wg`.

use std::sync::{
    Arc, Mutex,
    atomic::{AtomicUsize, Ordering},
};

use tokio::sync::oneshot;

/// A Go-style wait group: `add` registers a participant, each participant
/// calls `done` exactly once, and `wait` resolves once all of them have.
pub struct WaitGroup {
    outstanding: Arc<AtomicUsize>,
    done: bool,
    tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    rx: Option<oneshot::Receiver<()>>,
}

pub struct WaitGroupHandle {
    outstanding: Arc<AtomicUsize>,
    tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}

impl WaitGroup {
    pub fn new() -> WaitGroup {
        let (tx, rx) = oneshot::channel();
        WaitGroup {
            outstanding: Arc::new(AtomicUsize::new(0)),
            done: false,
            tx: Arc::new(Mutex::new(Some(tx))),
            rx: Some(rx),
        }
    }

    pub fn add(&mut self) -> WaitGroupHandle {
        self.outstanding.fetch_add(1, Ordering::SeqCst);
        WaitGroupHandle {
            outstanding: self.outstanding.clone(),
            tx: self.tx.clone(),
        }
    }

    pub async fn wait(&mut self) {
        // If we're not already marked as done, just wait for the final handle
        // to send the all-clear. No need to check for errors here: the sender
        // can't be dropped out from under us, because we hold a reference to it.
        if !self.done {
            let _ = self.rx.take().expect("tried to wait on finished wait group").await;
            self.done = true
        }
    }
}

impl WaitGroupHandle {
    pub fn done(&mut self) {
        // If we're the one that takes the count to zero, we're also responsible
        // for notifying the group itself that everything is done. Whether the
        // receiver is still around or not is not our problem.
        if self.outstanding.fetch_sub(1, Ordering::SeqCst) == 1 {
            let tx = self.tx.lock().unwrap().take().expect("sender not in wait group");
            let _ = tx.send(());
        }
    }
}
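
// A quick usage sketch (illustrative, not part of the gist): each handle
// decrements the outstanding count exactly once via `done`, and `wait`
// resolves when the final handle does so.
async fn wait_group_example() {
    let mut wg = WaitGroup::new();
    let mut handle = wg.add();
    tokio::spawn(async move {
        // ... the task's real work would go here ...
        handle.done();
    });
    wg.wait().await;
}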