-
-
Save rust-play/bedc9114c3492eed1d57c5c8dda38796 to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use async_trait::async_trait; | |
| use serde::{Deserialize, Serialize}; | |
| use thiserror::Error; | |
| use anyhow::{Context, Result}; | |
| use std::time::Duration; | |
| use tokio::task::JoinSet; | |
| use std::sync::Arc; | |
| // --- 1. Error Definitions --- | |
| #[derive(Error, Debug)] | |
| pub enum ProcessorError { | |
| #[error("Network failure while executing task at {url}: {reason}")] | |
| NetworkError { url: String, reason: String }, | |
| #[error("Invalid task configuration: {0}")] | |
| ConfigError(String), | |
| } | |
| // --- 2. Trait Definition --- | |
| #[async_trait] | |
| pub trait AsyncAction: Send + Sync { | |
| async fn perform(&self) -> Result<(), ProcessorError>; | |
| } | |
| // --- 3. Concrete Implementations --- | |
| #[derive(Serialize, Deserialize, Debug, Clone)] | |
| struct WebhookTask { | |
| url: String, | |
| payload: String, | |
| } | |
| #[async_trait] | |
| impl AsyncAction for WebhookTask { | |
| async fn perform(&self) -> Result<(), ProcessorError> { | |
| // Simulate network latency | |
| tokio::time::sleep(Duration::from_millis(500)).await; | |
| if self.url.is_empty() { | |
| return Err(ProcessorError::NetworkError { | |
| url: "N/A".into(), | |
| reason: "URL cannot be empty".into(), | |
| }); | |
| } | |
| println!("--> [Webhook] Sent to {}: {}", self.url, self.payload); | |
| Ok(()) | |
| } | |
| } | |
| #[derive(Serialize, Deserialize, Debug, Clone)] | |
| struct DelayTask { | |
| seconds: u64, | |
| } | |
| #[async_trait] | |
| impl AsyncAction for DelayTask { | |
| async fn perform(&self) -> Result<(), ProcessorError> { | |
| println!("--> [Delay] Starting {}s sleep", self.seconds); | |
| tokio::time::sleep(Duration::from_secs(self.seconds)).await; | |
| println!("--> [Delay] Finished {}s sleep", self.seconds); | |
| Ok(()) | |
| } | |
| } | |
| // --- 4. Polymorphic Container --- | |
| #[derive(Serialize, Deserialize, Debug)] | |
| #[serde(tag = "type", rename_all = "snake_case")] | |
| enum Task { | |
| Webhook(WebhookTask), | |
| Delay(DelayTask), | |
| } | |
| impl Task { | |
| /// Converts the enum variant into a thread-safe trait object. | |
| /// We use Arc to satisfy the 'static lifetime requirement for tokio::spawn. | |
| fn into_trait_object(self) -> Arc<dyn AsyncAction> { | |
| match self { | |
| Task::Webhook(t) => Arc::new(t), | |
| Task::Delay(t) => Arc::new(t), | |
| } | |
| } | |
| } | |
| // --- 5. Main Execution Loop --- | |
| #[tokio::main] | |
| async fn main() -> Result<()> { | |
| let raw_json = r#" | |
| [ | |
| {"type": "webhook", "url": "https://api.a.com", "payload": "ping_1"}, | |
| {"type": "delay", "seconds": 2}, | |
| {"type": "webhook", "url": "https://api.b.com", "payload": "ping_2"}, | |
| {"type": "delay", "seconds": 1} | |
| ] | |
| "#; | |
| // Parse the pipeline | |
| let tasks: Vec<Task> = serde_json::from_str(raw_json) | |
| .context("Failed to parse task JSON")?; | |
| let mut set = JoinSet::new(); | |
| println!("Starting concurrent processing of {} tasks...", tasks.len()); | |
| for task in tasks { | |
| let action = task.into_trait_object(); | |
| // Spawn each task onto the tokio runtime | |
| set.spawn(async move { | |
| action.perform().await | |
| }); | |
| } | |
| // Process results as they finish (out of order) | |
| while let Some(res) = set.join_next().await { | |
| // Outer Result is for the JoinHandle (task panics/cancellation) | |
| // Inner Result is our ProcessorError | |
| let task_result = res.context("Task execution panicked")?; | |
| if let Err(e) = task_result { | |
| eprintln!("Task failed with error: {}", e); | |
| } | |
| } | |
| println!("All tasks finished processing."); | |
| Ok(()) | |
| } | |
| // --- 6. Test Suite --- | |
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Instant;

    /// A tagged JSON object deserializes into the matching `Task` variant.
    #[test]
    fn test_deserialization() {
        let json = r#"{"type": "delay", "seconds": 5}"#;
        let task: Task = serde_json::from_str(json).unwrap();
        match task {
            Task::Delay(d) => assert_eq!(d.seconds, 5),
            // Include the actual value so a failure is diagnosable.
            other => panic!("Expected Delay variant, got {:?}", other),
        }
    }

    /// An empty URL is rejected with a `NetworkError`.
    #[tokio::test]
    async fn test_webhook_error() {
        let task = WebhookTask { url: "".into(), payload: "".into() };
        let result = task.perform().await;
        assert!(matches!(result, Err(ProcessorError::NetworkError { .. })));
    }

    /// Two one-second delays spawned together finish in well under two seconds.
    #[tokio::test]
    async fn test_concurrency_timing() {
        let tasks = vec![
            Task::Delay(DelayTask { seconds: 1 }),
            Task::Delay(DelayTask { seconds: 1 }),
        ];
        let mut set = JoinSet::new();
        let start = Instant::now();
        for task in tasks {
            let action = task.into_trait_object();
            set.spawn(async move { action.perform().await });
        }
        // Check every outcome: a task that panicked or failed early would
        // otherwise still satisfy the timing assertion below.
        while let Some(joined) = set.join_next().await {
            joined
                .expect("delay task panicked or was cancelled")
                .expect("delay task returned an error");
        }
        // If concurrent, two 1s tasks should finish in significantly less than 2s
        assert!(start.elapsed() < Duration::from_millis(1500));
    }

    /// Different task types can live in one collection of trait objects.
    #[tokio::test]
    async fn test_dynamic_dispatch_collection() {
        // Verify we can mix and match types in a standard collection
        let actions: Vec<Arc<dyn AsyncAction>> = vec![
            Arc::new(DelayTask { seconds: 0 }),
            Arc::new(WebhookTask { url: "test".into(), payload: "test".into() }),
        ];
        for action in actions {
            assert!(action.perform().await.is_ok());
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment