Created
January 31, 2026 01:00
-
-
Save aadishv/aa4b8418e2768d276fc7c6e37dddbc20 to your computer and use it in GitHub Desktop.
slop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::sync::{Arc, Mutex}; | |
| use tokio::io::{AsyncReadExt, AsyncWriteExt}; | |
| use wincode::{SchemaRead, SchemaWrite, config::{Configuration, Deserialize, Serialize}}; | |
| use std::time::Duration; | |
| pub trait Transportable<'a>: SchemaRead<'a, Configuration, Dst = Self> + SchemaWrite<Configuration, Src = Self> {} | |
/// A log of frames of type `T`, plus a shortcut to the newest one.
///
/// The transports in this file append to `packets` and overwrite `last_frame`
/// together, so `last_frame` mirrors the final element of `packets` when they
/// are the only writers.
#[derive(Clone, Debug)]
pub struct History<T> {
    /// Every recorded frame, in the order it was recorded.
    pub packets: Vec<T>,
    /// The most recently recorded frame, or `None` while nothing has been recorded.
    pub last_frame: Option<T>,
}

// Implemented by hand rather than derived so that an empty history is available
// for every `T`, not only for `T: Default`.
impl<T> Default for History<T> {
    /// Returns an empty history: no packets and no last frame.
    fn default() -> Self {
        History {
            packets: Vec::new(),
            last_frame: None,
        }
    }
}
| pub trait Compressible<'a, D: Transportable<'a>>: Sized { | |
| fn compress(&self, history: History<Self>) -> D; | |
| fn decompress(frame: D, history: History<Self>) -> Self; | |
| } | |
| pub trait TransportHost<'a, D: Transportable<'a>> { | |
| fn send(&self, frame: D); | |
| } | |
| #[derive(SchemaRead, SchemaWrite, Clone, Debug)] | |
| pub struct GrowingList(pub Vec<u8>); | |
| #[derive(SchemaRead, SchemaWrite, Clone, Debug)] | |
| pub enum CompressedGrowingList { | |
| Scratch(Vec<u8>), | |
| Concat(Vec<u8>), | |
| } | |
| impl Transportable<'_> for GrowingList {} | |
| impl Transportable<'_> for CompressedGrowingList {} | |
| impl Compressible<'_, CompressedGrowingList> for GrowingList { | |
| fn compress(&self, history: History<Self>) -> CompressedGrowingList { | |
| if let Some(last) = history.last_frame { | |
| if self.0.starts_with(&last.0) && self.0.len() > last.0.len() { | |
| return CompressedGrowingList::Concat(self.0[last.0.len()..].to_vec()); | |
| } | |
| } | |
| CompressedGrowingList::Scratch(self.0.clone()) | |
| } | |
| fn decompress(frame: CompressedGrowingList, history: History<Self>) -> Self { | |
| match frame { | |
| CompressedGrowingList::Scratch(data) => GrowingList(data), | |
| CompressedGrowingList::Concat(data) => { | |
| let mut full = history.last_frame.unwrap().0; | |
| full.extend(data); | |
| GrowingList(full) | |
| } | |
| } | |
| } | |
| } | |
| pub struct StdinTransport<T> | |
| where | |
| T: for<'a> Transportable<'a> + Send + Sync + Clone + 'static, | |
| { | |
| pub(crate) history: Arc<Mutex<History<T>>>, | |
| } | |
| impl<T> StdinTransport<T> | |
| where | |
| T: for<'a> Transportable<'a> + Send + Sync + Clone + 'static, | |
| { | |
| pub fn new() -> Self { | |
| let history = Arc::new(Mutex::new(History { | |
| packets: Vec::new(), | |
| last_frame: None, | |
| })); | |
| let history_clone = history.clone(); | |
| tokio::spawn(async move { | |
| let mut stdin = tokio::io::stdin(); | |
| loop { | |
| let mut len_buf = [0u8; 4]; | |
| if stdin.read_exact(&mut len_buf).await.is_err() { break; } | |
| let len = u32::from_le_bytes(len_buf) as usize; | |
| let mut buffer = vec![0u8; len]; | |
| if stdin.read_exact(&mut buffer).await.is_err() { break; } | |
| if let Ok(packet) = T::deserialize(&buffer, Configuration::default()) { | |
| let mut hist = history_clone.lock().unwrap(); | |
| hist.packets.push(packet.clone()); | |
| hist.last_frame = Some(packet); | |
| } | |
| } | |
| }); | |
| Self { history } | |
| } | |
| pub fn get_history(&self) -> History<T> { | |
| self.history.lock().unwrap().clone() | |
| } | |
| } | |
| pub struct StdoutTransport<T> | |
| where | |
| T: for<'a> Transportable<'a> + Send + Sync + Clone + 'static, | |
| { | |
| pub(crate) history: Arc<Mutex<History<T>>>, | |
| tx: tokio::sync::mpsc::UnboundedSender<T>, | |
| } | |
| impl<T> StdoutTransport<T> | |
| where | |
| T: for<'a> Transportable<'a> + Send + Sync + Clone + 'static, | |
| { | |
| pub fn new() -> Self { | |
| let history = Arc::new(Mutex::new(History { | |
| packets: Vec::new(), | |
| last_frame: None, | |
| })); | |
| let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<T>(); | |
| let history_clone = history.clone(); | |
| tokio::spawn(async move { | |
| let mut stdout = tokio::io::stdout(); | |
| while let Some(packet) = rx.recv().await { | |
| { | |
| let mut hist = history_clone.lock().unwrap(); | |
| hist.packets.push(packet.clone()); | |
| hist.last_frame = Some(packet.clone()); | |
| } | |
| if let Ok(bytes) = T::serialize(&packet, Configuration::default()) { | |
| let len = bytes.len() as u32; | |
| let _ = stdout.write_all(&len.to_le_bytes()).await; | |
| let _ = stdout.write_all(&bytes).await; | |
| let _ = stdout.flush().await; | |
| } | |
| } | |
| }); | |
| Self { history, tx } | |
| } | |
| pub fn send(&self, packet: T) { | |
| let _ = self.tx.send(packet); | |
| } | |
| pub fn get_history(&self) -> History<T> { | |
| self.history.lock().unwrap().clone() | |
| } | |
| } | |
| pub struct CompressedStdinTransport<T, C> | |
| where | |
| T: for<'a> Compressible<'a, C> + Send + Sync + Clone + 'static, | |
| C: for<'a> Transportable<'a> + Send + Sync + Clone + 'static, | |
| { | |
| history: Arc<Mutex<History<T>>>, | |
| inner_history: Arc<Mutex<History<C>>>, | |
| } | |
| impl<T, C> CompressedStdinTransport<T, C> | |
| where | |
| T: for<'a> Compressible<'a, C> + Send + Sync + Clone + 'static, | |
| C: for<'a> Transportable<'a> + Send + Sync + Clone + 'static, | |
| { | |
| pub fn new() -> Self { | |
| let history = Arc::new(Mutex::new(History { | |
| packets: Vec::new(), | |
| last_frame: None, | |
| })); | |
| let inner_transport = StdinTransport::<C>::new(); | |
| let inner_history = inner_transport.history.clone(); | |
| let history_clone = history.clone(); | |
| let inner_history_clone = inner_history.clone(); | |
| tokio::spawn(async move { | |
| let mut processed = 0; | |
| loop { | |
| let inner_packets = { | |
| let h = inner_history_clone.lock().unwrap(); | |
| h.packets.clone() | |
| }; | |
| if inner_packets.len() > processed { | |
| for i in processed..inner_packets.len() { | |
| let compressed = inner_packets[i].clone(); | |
| let mut hist = history_clone.lock().unwrap(); | |
| let decompressed = T::decompress(compressed, hist.clone()); | |
| hist.packets.push(decompressed.clone()); | |
| hist.last_frame = Some(decompressed); | |
| } | |
| processed = inner_packets.len(); | |
| } | |
| tokio::time::sleep(Duration::from_millis(50)).await; | |
| } | |
| }); | |
| Self { history, inner_history } | |
| } | |
| pub fn get_history(&self) -> History<T> { | |
| self.history.lock().unwrap().clone() | |
| } | |
| pub fn get_compressed_history(&self) -> History<C> { | |
| self.inner_history.lock().unwrap().clone() | |
| } | |
| } | |
| pub struct CompressedStdoutTransport<T, C> | |
| where | |
| T: for<'a> Compressible<'a, C> + Send + Sync + Clone + 'static, | |
| C: for<'a> Transportable<'a> + Send + Sync + Clone + 'static, | |
| { | |
| history: Arc<Mutex<History<T>>>, | |
| inner: StdoutTransport<C>, | |
| } | |
| impl<T, C> CompressedStdoutTransport<T, C> | |
| where | |
| T: for<'a> Compressible<'a, C> + Send + Sync + Clone + 'static, | |
| C: for<'a> Transportable<'a> + Send + Sync + Clone + 'static, | |
| { | |
| pub fn new() -> Self { | |
| let history = Arc::new(Mutex::new(History { | |
| packets: Vec::new(), | |
| last_frame: None, | |
| })); | |
| let inner = StdoutTransport::<C>::new(); | |
| Self { history, inner } | |
| } | |
| pub fn send(&self, packet: T) { | |
| let compressed = { | |
| let mut hist = self.history.lock().unwrap(); | |
| let c = packet.compress(hist.clone()); | |
| hist.packets.push(packet.clone()); | |
| hist.last_frame = Some(packet); | |
| c | |
| }; | |
| self.inner.send(compressed); | |
| } | |
| pub fn get_history(&self) -> History<T> { | |
| self.history.lock().unwrap().clone() | |
| } | |
| pub fn get_compressed_history(&self) -> History<C> { | |
| self.inner.get_history() | |
| } | |
| } | |
| #[tokio::main] | |
| async fn main() { | |
| let args: Vec<String> = std::env::args().collect(); | |
| let is_sender = args.contains(&"--sender".to_string()); | |
| if is_sender { | |
| let transport = CompressedStdoutTransport::<GrowingList, CompressedGrowingList>::new(); | |
| let mut i = 0u8; | |
| let mut data = Vec::new(); | |
| loop { | |
| data.push(i); | |
| let packet = GrowingList(data.clone()); | |
| transport.send(packet); | |
| let hist = transport.get_history(); | |
| let c_hist = transport.get_compressed_history(); | |
| if let (Some(raw), Some(comp)) = (hist.last_frame, c_hist.last_frame) { | |
| let raw_size = raw.0.len(); | |
| let comp_size = match comp { | |
| CompressedGrowingList::Scratch(v) => v.len(), | |
| CompressedGrowingList::Concat(v) => v.len(), | |
| }; | |
| eprintln!("Sender: Raw {} bytes -> Compressed {} bytes", raw_size, comp_size); | |
| } | |
| i = (i + 1) % 20; | |
| tokio::time::sleep(Duration::from_millis(500)).await; | |
| } | |
| } else { | |
| let transport = CompressedStdinTransport::<GrowingList, CompressedGrowingList>::new(); | |
| let mut last_len = 0; | |
| loop { | |
| let history = transport.get_history(); | |
| let c_history = transport.get_compressed_history(); | |
| if history.packets.len() > last_len { | |
| for i in last_len..history.packets.len() { | |
| let packet = &history.packets[i]; | |
| let c_packet = &c_history.packets[i]; | |
| let c_type = match c_packet { | |
| CompressedGrowingList::Scratch(_) => "Scratch", | |
| CompressedGrowingList::Concat(_) => "Concat", | |
| }; | |
| eprintln!("Receiver: Packet {} [Type: {}] -> {:?}", i, c_type, packet); | |
| } | |
| last_len = history.packets.len(); | |
| } | |
| tokio::time::sleep(Duration::from_millis(100)).await; | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment