Skip to content

Instantly share code, notes, and snippets.

@aadishv
Created January 31, 2026 01:00
Show Gist options
  • Select an option

  • Save aadishv/aa4b8418e2768d276fc7c6e37dddbc20 to your computer and use it in GitHub Desktop.

Select an option

Save aadishv/aa4b8418e2768d276fc7c6e37dddbc20 to your computer and use it in GitHub Desktop.
slop
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use wincode::{SchemaRead, SchemaWrite, config::{Configuration, Deserialize, Serialize}};
use std::time::Duration;
/// Marker trait for values that can travel over a transport.
///
/// It adds no methods of its own: it only bundles the `wincode` bounds, requiring that the
/// type can be read and written with `Configuration` and that both the decoded type (`Dst`)
/// and the encoded source type (`Src`) are `Self`.
pub trait Transportable<'a>: SchemaRead<'a, Configuration, Dst = Self> + SchemaWrite<Configuration, Src = Self> {}
/// Record of the values a transport has handled so far.
///
/// The transports in this file append to `packets` and overwrite `last_frame` together, so
/// `last_frame` mirrors the final element of `packets` once anything has been recorded.
#[derive(Clone)]
pub struct History<T> {
    /// Every recorded value, in the order it was recorded.
    pub packets: Vec<T>,
    /// The most recently recorded value, or `None` while nothing has been recorded.
    pub last_frame: Option<T>,
}
/// Conversion between a full value (`Self`) and a transportable wire form `D`.
///
/// Both directions receive an owned `History<Self>` of full values, so an implementation can
/// encode a value relative to earlier ones. The compressed transports in this file pass the
/// history as it stood before the current value was recorded.
pub trait Compressible<'a, D: Transportable<'a>>: Sized {
/// Produces the wire form of `self`, given the supplied history of earlier full values.
fn compress(&self, history: History<Self>) -> D;
/// Rebuilds a full value from `frame`, given the supplied history of earlier full values.
fn decompress(frame: D, history: History<Self>) -> Self;
}
/// Something that can send transportable frames of type `D`.
///
/// NOTE(review): no implementor or caller of this trait is visible in this file.
pub trait TransportHost<'a, D: Transportable<'a>> {
/// Sends one frame; the signature offers no way to report failure.
fn send(&self, frame: D);
}
/// Demo payload: a byte list wrapped in a newtype.
///
/// Its `Compressible` implementation in this file sends only the appended bytes when the
/// previous frame is a strict prefix of the current one.
#[derive(SchemaRead, SchemaWrite, Clone, Debug)]
pub struct GrowingList(pub Vec<u8>);
/// Wire form of a `GrowingList`.
#[derive(SchemaRead, SchemaWrite, Clone, Debug)]
pub enum CompressedGrowingList {
/// The complete byte list; decoding does not need any earlier frame.
Scratch(Vec<u8>),
/// Only the bytes to append to the previous full frame.
Concat(Vec<u8>),
}
// Opt both the full list and its wire form into `Transportable`; the required wincode
// bounds are supplied by the `SchemaRead`/`SchemaWrite` derives on each type.
impl Transportable<'_> for GrowingList {}
impl Transportable<'_> for CompressedGrowingList {}
impl Compressible<'_, CompressedGrowingList> for GrowingList {
fn compress(&self, history: History<Self>) -> CompressedGrowingList {
if let Some(last) = history.last_frame {
if self.0.starts_with(&last.0) && self.0.len() > last.0.len() {
return CompressedGrowingList::Concat(self.0[last.0.len()..].to_vec());
}
}
CompressedGrowingList::Scratch(self.0.clone())
}
fn decompress(frame: CompressedGrowingList, history: History<Self>) -> Self {
match frame {
CompressedGrowingList::Scratch(data) => GrowingList(data),
CompressedGrowingList::Concat(data) => {
let mut full = history.last_frame.unwrap().0;
full.extend(data);
GrowingList(full)
}
}
}
}
/// Receives length-prefixed `T` frames from standard input on a background task.
///
/// Every frame that decodes successfully is appended to a shared [`History`].
pub struct StdinTransport<T>
where
    T: for<'a> Transportable<'a> + Send + Sync + Clone + 'static,
{
    /// Decoded frames, shared with the background reader task.
    pub(crate) history: Arc<Mutex<History<T>>>,
}

impl<T> StdinTransport<T>
where
    T: for<'a> Transportable<'a> + Send + Sync + Clone + 'static,
{
    /// Creates the transport and spawns its reader task with `tokio::spawn`.
    ///
    /// Wire format: a 4-byte little-endian length followed by that many payload bytes.
    /// The task stops at the first failed read. A frame whose payload does not decode is
    /// reported on stderr and skipped, so a consumer that depends on seeing every frame
    /// can tell the stream has a gap.
    ///
    /// # Panics
    ///
    /// The reader task panics if the history mutex is poisoned.
    pub fn new() -> Self {
        let history = Arc::new(Mutex::new(History {
            packets: Vec::new(),
            last_frame: None,
        }));
        let shared = Arc::clone(&history);

        tokio::spawn(async move {
            let mut stdin = tokio::io::stdin();
            let mut len_buf = [0u8; 4];
            loop {
                if stdin.read_exact(&mut len_buf).await.is_err() {
                    break;
                }
                // NOTE(review): the length comes straight off the wire and is not capped, so
                // a corrupt prefix can request an allocation of up to 4 GiB.
                let len = u32::from_le_bytes(len_buf) as usize;
                let mut buffer = vec![0u8; len];
                if stdin.read_exact(&mut buffer).await.is_err() {
                    break;
                }

                match T::deserialize(&buffer, Configuration::default()) {
                    Ok(packet) => {
                        // The guard is dropped before the next `.await`.
                        let mut hist = shared.lock().unwrap();
                        hist.packets.push(packet.clone());
                        hist.last_frame = Some(packet);
                    }
                    Err(_) => {
                        eprintln!("StdinTransport: dropping undecodable {}-byte frame", len);
                    }
                }
            }
        });

        Self { history }
    }

    /// Returns a clone of everything received so far.
    ///
    /// # Panics
    ///
    /// Panics if the history mutex is poisoned.
    pub fn get_history(&self) -> History<T> {
        self.history.lock().unwrap().clone()
    }
}
/// Writes length-prefixed `T` frames to standard output from a background task.
///
/// Packets are queued through an unbounded channel and recorded in a shared [`History`]
/// at the moment they are accepted by [`StdoutTransport::send`].
pub struct StdoutTransport<T>
where
    T: for<'a> Transportable<'a> + Send + Sync + Clone + 'static,
{
    /// Packets accepted for sending, in the order they were queued.
    pub(crate) history: Arc<Mutex<History<T>>>,
    /// Queue feeding the background writer task.
    tx: tokio::sync::mpsc::UnboundedSender<T>,
}

impl<T> StdoutTransport<T>
where
    T: for<'a> Transportable<'a> + Send + Sync + Clone + 'static,
{
    /// Creates the transport and spawns its writer task with `tokio::spawn`.
    ///
    /// Wire format: a 4-byte little-endian length followed by the serialized payload, with
    /// a flush after every frame. A packet that fails to serialize, or whose payload does
    /// not fit the `u32` length prefix, is reported on stderr and skipped. The task stops
    /// at the first write error, because a partially written frame would leave the stream
    /// unreadable for the peer.
    pub fn new() -> Self {
        let history = Arc::new(Mutex::new(History {
            packets: Vec::new(),
            last_frame: None,
        }));
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<T>();

        tokio::spawn(async move {
            let mut stdout = tokio::io::stdout();
            while let Some(packet) = rx.recv().await {
                let bytes = match T::serialize(&packet, Configuration::default()) {
                    Ok(bytes) => bytes,
                    Err(_) => {
                        eprintln!("StdoutTransport: packet failed to serialize; skipped");
                        continue;
                    }
                };

                // A plain `as u32` would silently truncate and corrupt the length prefix.
                if bytes.len() > u32::MAX as usize {
                    eprintln!(
                        "StdoutTransport: {}-byte frame exceeds the u32 length prefix; skipped",
                        bytes.len()
                    );
                    continue;
                }
                let len = bytes.len() as u32;

                if stdout.write_all(&len.to_le_bytes()).await.is_err()
                    || stdout.write_all(&bytes).await.is_err()
                    || stdout.flush().await.is_err()
                {
                    eprintln!("StdoutTransport: write to stdout failed; stopping writer");
                    break;
                }
            }
        });

        Self { history, tx }
    }

    /// Queues `packet` for writing and records it in the history.
    ///
    /// The history is updated here rather than in the writer task, so a `get_history()`
    /// call made right after `send()` already contains the packet. If the writer task has
    /// stopped, the packet is dropped and not recorded.
    ///
    /// # Panics
    ///
    /// Panics if the history mutex is poisoned.
    pub fn send(&self, packet: T) {
        // The channel send never blocks, so holding the lock across it is cheap and keeps
        // history order identical to queue order under concurrent callers.
        let mut hist = self.history.lock().unwrap();
        if self.tx.send(packet.clone()).is_ok() {
            hist.packets.push(packet.clone());
            hist.last_frame = Some(packet);
        }
    }

    /// Returns a clone of everything accepted for sending so far.
    ///
    /// # Panics
    ///
    /// Panics if the history mutex is poisoned.
    pub fn get_history(&self) -> History<T> {
        self.history.lock().unwrap().clone()
    }
}
/// Receives compressed `C` frames from standard input and exposes them decompressed as `T`.
///
/// A [`StdinTransport<C>`] reads the raw frames; a second background task polls its
/// history and decompresses every new frame into this transport's own history.
pub struct CompressedStdinTransport<T, C>
where
    T: for<'a> Compressible<'a, C> + Send + Sync + Clone + 'static,
    C: for<'a> Transportable<'a> + Send + Sync + Clone + 'static,
{
    /// Decompressed values, in arrival order.
    history: Arc<Mutex<History<T>>>,
    /// Compressed frames as recorded by the inner stdin transport.
    inner_history: Arc<Mutex<History<C>>>,
}

impl<T, C> CompressedStdinTransport<T, C>
where
    T: for<'a> Compressible<'a, C> + Send + Sync + Clone + 'static,
    C: for<'a> Transportable<'a> + Send + Sync + Clone + 'static,
{
    /// Creates the transport, its inner [`StdinTransport<C>`], and the decompression task.
    ///
    /// The task wakes every 50 ms, copies only the compressed frames it has not handled
    /// yet, and decompresses them in order. Each frame is decompressed against a clone of
    /// the history as it stood before that frame.
    ///
    /// The history lock is not held while `T::decompress` runs, so a panic inside it ends
    /// the task without poisoning the mutex that `get_history` uses.
    pub fn new() -> Self {
        let history = Arc::new(Mutex::new(History {
            packets: Vec::new(),
            last_frame: None,
        }));
        // The `StdinTransport` value is dropped when `new` returns; only the shared
        // history handle is kept.
        let inner_transport = StdinTransport::<C>::new();
        let inner_history = Arc::clone(&inner_transport.history);

        let output = Arc::clone(&history);
        let source = Arc::clone(&inner_history);
        tokio::spawn(async move {
            let mut processed = 0usize;
            loop {
                // Copy just the unseen tail instead of the whole list on every poll.
                let fresh: Vec<C> = {
                    let inner = source.lock().unwrap();
                    inner
                        .packets
                        .get(processed..)
                        .map(|tail| tail.to_vec())
                        .unwrap_or_default()
                };
                processed += fresh.len();

                for compressed in fresh {
                    // This task is the only writer of `output`, so the snapshot is still
                    // current when the result is pushed below.
                    let snapshot = output.lock().unwrap().clone();
                    let decompressed = T::decompress(compressed, snapshot);
                    let mut hist = output.lock().unwrap();
                    hist.packets.push(decompressed.clone());
                    hist.last_frame = Some(decompressed);
                }

                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        });

        Self { history, inner_history }
    }

    /// Returns a clone of the decompressed history.
    ///
    /// # Panics
    ///
    /// Panics if the history mutex is poisoned.
    pub fn get_history(&self) -> History<T> {
        self.history.lock().unwrap().clone()
    }

    /// Returns a clone of the compressed frames recorded by the inner transport.
    ///
    /// # Panics
    ///
    /// Panics if the inner history mutex is poisoned.
    pub fn get_compressed_history(&self) -> History<C> {
        self.inner_history.lock().unwrap().clone()
    }
}
/// Compresses `T` values into `C` frames and sends them through a [`StdoutTransport<C>`].
pub struct CompressedStdoutTransport<T, C>
where
    T: for<'a> Compressible<'a, C> + Send + Sync + Clone + 'static,
    C: for<'a> Transportable<'a> + Send + Sync + Clone + 'static,
{
    /// Uncompressed values passed to `send`, in order.
    history: Arc<Mutex<History<T>>>,
    /// Transport that carries the compressed frames.
    inner: StdoutTransport<C>,
}

impl<T, C> CompressedStdoutTransport<T, C>
where
    T: for<'a> Compressible<'a, C> + Send + Sync + Clone + 'static,
    C: for<'a> Transportable<'a> + Send + Sync + Clone + 'static,
{
    /// Creates the transport with an empty history and a fresh inner [`StdoutTransport<C>`].
    pub fn new() -> Self {
        let history = Arc::new(Mutex::new(History {
            packets: Vec::new(),
            last_frame: None,
        }));
        let inner = StdoutTransport::<C>::new();
        Self { history, inner }
    }

    /// Compresses `packet` against the current history, records it, and queues the result.
    ///
    /// # Panics
    ///
    /// Panics if the history mutex is poisoned.
    pub fn send(&self, packet: T) {
        // Compress, record and enqueue under one lock. A frame may be encoded relative to
        // the previous one, so frames must be queued in the order they were compressed,
        // even when several threads call `send` at once.
        let mut hist = self.history.lock().unwrap();
        let compressed = packet.compress(hist.clone());
        hist.packets.push(packet.clone());
        hist.last_frame = Some(packet);
        self.inner.send(compressed);
    }

    /// Returns a clone of the uncompressed values sent so far.
    ///
    /// # Panics
    ///
    /// Panics if the history mutex is poisoned.
    pub fn get_history(&self) -> History<T> {
        self.history.lock().unwrap().clone()
    }

    /// Returns the inner transport's history of compressed frames.
    pub fn get_compressed_history(&self) -> History<C> {
        self.inner.get_history()
    }
}
/// Demo entry point: runs the sender when `--sender` is among the arguments, otherwise
/// the receiver. Both roles loop forever.
#[tokio::main]
async fn main() {
    if std::env::args().any(|arg| arg == "--sender") {
        run_sender().await;
    } else {
        run_receiver().await;
    }
}

/// Sends a byte list that grows by one element every 500 ms and logs the raw and
/// compressed sizes to stderr.
async fn run_sender() {
    let transport = CompressedStdoutTransport::<GrowingList, CompressedGrowingList>::new();
    let mut next = 0u8;
    let mut data = Vec::new();
    loop {
        data.push(next);
        transport.send(GrowingList(data.clone()));

        // NOTE(review): if the inner transport records its history asynchronously, the
        // compressed frame read here can still be the previous one.
        let raw = transport.get_history().last_frame;
        let comp = transport.get_compressed_history().last_frame;
        if let (Some(raw), Some(comp)) = (raw, comp) {
            let comp_size = match comp {
                CompressedGrowingList::Scratch(v) | CompressedGrowingList::Concat(v) => v.len(),
            };
            eprintln!("Sender: Raw {} bytes -> Compressed {} bytes", raw.0.len(), comp_size);
        }

        // Values cycle through 0..20 while the list itself keeps growing.
        next = (next + 1) % 20;
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
}

/// Polls the receiving transport every 100 ms and logs each newly decompressed packet,
/// together with the kind of frame it arrived as, to stderr.
async fn run_receiver() {
    let transport = CompressedStdinTransport::<GrowingList, CompressedGrowingList>::new();
    let mut seen = 0;
    loop {
        let history = transport.get_history();
        // Read after the decompressed history: every decompressed packet comes from a
        // compressed frame that was recorded first, so this list is at least as long.
        let c_history = transport.get_compressed_history();

        let fresh = history
            .packets
            .iter()
            .zip(&c_history.packets)
            .enumerate()
            .skip(seen);
        for (i, (packet, c_packet)) in fresh {
            let c_type = match c_packet {
                CompressedGrowingList::Scratch(_) => "Scratch",
                CompressedGrowingList::Concat(_) => "Concat",
            };
            eprintln!("Receiver: Packet {} [Type: {}] -> {:?}", i, c_type, packet);
        }
        seen = history.packets.len();

        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment