Skip to content

Instantly share code, notes, and snippets.

@feliwir
Created October 16, 2025 07:52
Show Gist options
  • Select an option

  • Save feliwir/a43e2747ff2dc5b7316549bfa7b5811c to your computer and use it in GitHub Desktop.

Select an option

Save feliwir/a43e2747ff2dc5b7316549bfa7b5811c to your computer and use it in GitHub Desktop.
use std::{
pin::Pin,
task::{Context, Poll},
};
use chrono::{DateTime, Duration, Utc};
use futures_core::{Stream, stream::LocalBoxStream};
pub struct DiscardingStream<'a, T> {
last_message: Option<DateTime<Utc>>,
discard_interval: Duration,
stream: LocalBoxStream<'a, T>,
}
impl<'a, T> DiscardingStream<'a, T> {
pub fn new(
stream: LocalBoxStream<'a, T>,
discard_interval: Duration,
) -> DiscardingStream<'a, T> {
DiscardingStream {
last_message: None,
discard_interval,
stream,
}
}
}
impl<'a, T> Stream for DiscardingStream<'a, T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(item)) => {
let now = Utc::now();
if let Some(last) = this.last_message {
if (now - last) < this.discard_interval {
// Discard the message
return Poll::Pending;
}
}
this.last_message = Some(now);
Poll::Ready(Some(item))
}
other => other,
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment