Skip to content

Instantly share code, notes, and snippets.

@rust-play
Created December 21, 2025 12:53
Show Gist options
  • Select an option

  • Save rust-play/bedc9114c3492eed1d57c5c8dda38796 to your computer and use it in GitHub Desktop.

Select an option

Save rust-play/bedc9114c3492eed1d57c5c8dda38796 to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use anyhow::{Context, Result};
use std::time::Duration;
use tokio::task::JoinSet;
use std::sync::Arc;
// --- 1. Error Definitions ---
/// Error type returned by `AsyncAction::perform`.
///
/// The `#[error(...)]` attributes supply the `Display` text for each variant.
#[derive(Error, Debug)]
pub enum ProcessorError {
/// A task could not be executed against `url`; `reason` describes what went wrong.
#[error("Network failure while executing task at {url}: {reason}")]
NetworkError { url: String, reason: String },
/// A task was configured incorrectly; the string describes the problem.
/// NOTE(review): not constructed anywhere in the visible code.
#[error("Invalid task configuration: {0}")]
ConfigError(String),
}
// --- 2. Trait Definition ---
/// A unit of asynchronous work that can be executed through a trait object.
///
/// The `Send + Sync` supertraits allow implementors to be held as
/// `Arc<dyn AsyncAction>` and moved into spawned tasks.
#[async_trait]
pub trait AsyncAction: Send + Sync {
/// Runs the action, returning `Ok(())` on success or a `ProcessorError` on failure.
async fn perform(&self) -> Result<(), ProcessorError>;
}
// --- 3. Concrete Implementations ---
/// Data for a webhook action: where to send and what to send.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct WebhookTask {
/// Destination URL; `perform` rejects an empty string.
url: String,
/// Payload text that `perform` reports as sent.
payload: String,
}
#[async_trait]
impl AsyncAction for WebhookTask {
    /// Simulates delivering `payload` to `url` and prints a confirmation line.
    ///
    /// # Errors
    ///
    /// Returns [`ProcessorError::NetworkError`] when `url` is empty. The check
    /// runs before the simulated latency so an unusable task fails immediately
    /// instead of waiting out the delay first.
    async fn perform(&self) -> Result<(), ProcessorError> {
        // Fail fast: there is no point simulating a round trip to nowhere.
        if self.url.is_empty() {
            return Err(ProcessorError::NetworkError {
                // The real URL is empty, so a placeholder keeps the message readable.
                url: "N/A".into(),
                reason: "URL cannot be empty".into(),
            });
        }

        // Simulate network latency
        tokio::time::sleep(Duration::from_millis(500)).await;

        println!("--> [Webhook] Sent to {}: {}", self.url, self.payload);
        Ok(())
    }
}
/// Data for a delay action.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct DelayTask {
/// Number of whole seconds `perform` sleeps for.
seconds: u64,
}
#[async_trait]
impl AsyncAction for DelayTask {
    /// Sleeps asynchronously for `seconds` seconds, printing one line before
    /// and one line after the pause. Always returns `Ok(())`.
    async fn perform(&self) -> Result<(), ProcessorError> {
        let secs = self.seconds;
        let pause = Duration::from_secs(secs);

        println!("--> [Delay] Starting {}s sleep", secs);
        tokio::time::sleep(pause).await;
        println!("--> [Delay] Finished {}s sleep", secs);

        Ok(())
    }
}
// --- 4. Polymorphic Container ---
/// Serializable wrapper over every supported task kind.
///
/// Internally tagged for serde: the `"type"` field selects the variant, and
/// variant names are written in snake_case (`"webhook"`, `"delay"`). The
/// remaining fields of the object belong to the wrapped struct.
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
enum Task {
/// A webhook delivery described by a `WebhookTask`.
Webhook(WebhookTask),
/// A timed pause described by a `DelayTask`.
Delay(DelayTask),
}
impl Task {
    /// Consumes the task and hands back its inner value as a shared
    /// [`AsyncAction`] trait object.
    ///
    /// The returned `Arc` owns the concrete task, so it can be moved into a
    /// spawned future without borrowing from the caller.
    fn into_trait_object(self) -> Arc<dyn AsyncAction> {
        match self {
            Self::Webhook(webhook) => Arc::new(webhook),
            Self::Delay(delay) => Arc::new(delay),
        }
    }
}
// --- 5. Main Execution Loop ---
#[tokio::main]
async fn main() -> Result<()> {
let raw_json = r#"
[
{"type": "webhook", "url": "https://api.a.com", "payload": "ping_1"},
{"type": "delay", "seconds": 2},
{"type": "webhook", "url": "https://api.b.com", "payload": "ping_2"},
{"type": "delay", "seconds": 1}
]
"#;
// Parse the pipeline
let tasks: Vec<Task> = serde_json::from_str(raw_json)
.context("Failed to parse task JSON")?;
let mut set = JoinSet::new();
println!("Starting concurrent processing of {} tasks...", tasks.len());
for task in tasks {
let action = task.into_trait_object();
// Spawn each task onto the tokio runtime
set.spawn(async move {
action.perform().await
});
}
// Process results as they finish (out of order)
while let Some(res) = set.join_next().await {
// Outer Result is for the JoinHandle (task panics/cancellation)
// Inner Result is our ProcessorError
let task_result = res.context("Task execution panicked")?;
if let Err(e) = task_result {
eprintln!("Task failed with error: {}", e);
}
}
println!("All tasks finished processing.");
Ok(())
}
// --- 6. Test Suite ---
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Instant;

    /// A JSON object tagged `"type": "delay"` deserializes into `Task::Delay`
    /// with its `seconds` field populated.
    #[test]
    fn test_deserialization() {
        let json = r#"{"type": "delay", "seconds": 5}"#;
        let task: Task = serde_json::from_str(json).unwrap();
        // Variants are listed explicitly so adding a new one forces this test
        // to be revisited instead of silently falling into a catch-all.
        match task {
            Task::Delay(d) => assert_eq!(d.seconds, 5),
            Task::Webhook(_) => panic!("Expected Delay variant"),
        }
    }

    /// A webhook with an empty URL is rejected with `NetworkError`.
    #[tokio::test]
    async fn test_webhook_error() {
        let task = WebhookTask { url: "".into(), payload: "".into() };
        let result = task.perform().await;
        assert!(matches!(result, Err(ProcessorError::NetworkError { .. })));
    }

    /// Two one-second delays spawned on a `JoinSet` overlap rather than run
    /// back to back, and both complete successfully.
    #[tokio::test]
    async fn test_concurrency_timing() {
        let tasks = vec![
            Task::Delay(DelayTask { seconds: 1 }),
            Task::Delay(DelayTask { seconds: 1 }),
        ];
        let mut set = JoinSet::new();
        let start = Instant::now();
        for task in tasks {
            let action = task.into_trait_object();
            set.spawn(async move { action.perform().await });
        }
        // Check every outcome: a task that panics or errors out early would
        // otherwise finish "fast" and let the timing assertion pass vacuously.
        while let Some(joined) = set.join_next().await {
            let outcome = joined.expect("delay task should not panic or be cancelled");
            assert!(outcome.is_ok());
        }
        // If concurrent, two 1s tasks should finish in significantly less than 2s
        assert!(start.elapsed() < Duration::from_millis(1500));
    }

    /// Different concrete task types can live in one collection of trait
    /// objects and be driven through the same `perform` call.
    #[tokio::test]
    async fn test_dynamic_dispatch_collection() {
        // Verify we can mix and match types in a standard collection
        let actions: Vec<Arc<dyn AsyncAction>> = vec![
            Arc::new(DelayTask { seconds: 0 }),
            Arc::new(WebhookTask { url: "test".into(), payload: "test".into() }),
        ];
        for action in actions {
            assert!(action.perform().await.is_ok());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment