Created
January 29, 2026 11:59
-
-
Save justincormack/59a7f3e781ca044f9194eaa624aaa0c0 to your computer and use it in GitHub Desktop.
rustfs conditional PUT concurrency issues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Conditional PUT Race Condition Test | |
| // | |
| // This test verifies whether RustFS correctly handles concurrent conditional PUT | |
| // requests from multiple nodes. With If-None-Match: "*", only ONE writer should | |
| // succeed when creating a new object - all others should get 412 Precondition Failed. | |
| // | |
| // Usage: | |
| // cargo run --bin conditional_put_race_test | |
| // | |
| // Or run the test directly: | |
| // cargo test --package rustfs_e2e_test --lib conditional_put_race_test -- --nocapture | |
| use aws_sdk_s3::config::{Credentials, Region}; | |
| use aws_sdk_s3::error::SdkError; | |
| use aws_sdk_s3::Client; | |
| use bytes::Bytes; | |
| use std::sync::Arc; | |
| use std::time::Instant; | |
| use tokio::sync::Barrier; | |
| /// Access key of the static credentials that `create_client` gives to every client. | |
| const ACCESS_KEY: &str = "rustfsadmin"; | |
| /// Secret key paired with `ACCESS_KEY` in the same static credentials. | |
| const SECRET_KEY: &str = "rustfsadmin"; | |
| /// Bucket used for every bucket, object and conditional-PUT request in this file. | |
| const BUCKET: &str = "test-bucket"; | |
| /// Create an S3 client pointing to a specific endpoint | |
| async fn create_client(endpoint: &str) -> Client { | |
| let creds = Credentials::new(ACCESS_KEY, SECRET_KEY, None, None, "static"); | |
| let config = aws_config::defaults(aws_config::BehaviorVersion::latest()) | |
| .region(Region::new("us-east-1")) | |
| .credentials_provider(creds) | |
| .endpoint_url(endpoint) | |
| .load() | |
| .await; | |
| Client::from_conf( | |
| aws_sdk_s3::Config::from(&config) | |
| .to_builder() | |
| .force_path_style(true) | |
| .build(), | |
| ) | |
| } | |
| /// Ensure the test bucket exists | |
| async fn ensure_bucket(client: &Client) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { | |
| match client.create_bucket().bucket(BUCKET).send().await { | |
| Ok(_) => Ok(()), | |
| Err(SdkError::ServiceError(e)) => { | |
| let code = e.err().meta().code().unwrap_or(""); | |
| if code == "BucketAlreadyExists" || code == "BucketAlreadyOwnedByYou" { | |
| Ok(()) | |
| } else { | |
| Err(e.into_err().into()) | |
| } | |
| } | |
| Err(e) => Err(e.into()), | |
| } | |
| } | |
| /// Delete object if it exists | |
| async fn cleanup_object(client: &Client, key: &str) { | |
| let _ = client.delete_object().bucket(BUCKET).key(key).send().await; | |
| } | |
| /// Attempt conditional PUT with If-None-Match: "*" | |
| /// Returns Ok(true) if succeeded, Ok(false) if got 412, Err for other errors | |
| async fn conditional_put( | |
| client: &Client, | |
| key: &str, | |
| data: &[u8], | |
| client_id: usize, | |
| ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> { | |
| let result = client | |
| .put_object() | |
| .bucket(BUCKET) | |
| .key(key) | |
| .body(Bytes::from(data.to_vec()).into()) | |
| .if_none_match("*") | |
| .send() | |
| .await; | |
| match result { | |
| Ok(resp) => { | |
| println!( | |
| " Client {} SUCCEEDED - ETag: {}", | |
| client_id, | |
| resp.e_tag().unwrap_or("none") | |
| ); | |
| Ok(true) | |
| } | |
| Err(SdkError::ServiceError(e)) => { | |
| let code = e.err().meta().code().unwrap_or(""); | |
| if code == "PreconditionFailed" { | |
| println!(" Client {} got 412 PreconditionFailed (correct rejection)", client_id); | |
| Ok(false) | |
| } else { | |
| println!(" Client {} got error: {}", client_id, code); | |
| Err(e.into_err().into()) | |
| } | |
| } | |
| Err(e) => { | |
| println!(" Client {} got error: {}", client_id, e); | |
| Err(e.into()) | |
| } | |
| } | |
| } | |
| /// Run a single race test iteration | |
| async fn run_race_test( | |
| clients: &[Client], | |
| test_key: &str, | |
| iteration: usize, | |
| ) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> { | |
| // Clean up any existing object | |
| cleanup_object(&clients[0], test_key).await; | |
| // Small delay to ensure deletion propagates | |
| tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; | |
| // Verify object doesn't exist | |
| let head_result = clients[0] | |
| .head_object() | |
| .bucket(BUCKET) | |
| .key(test_key) | |
| .send() | |
| .await; | |
| if head_result.is_ok() { | |
| println!("Warning: Object still exists after cleanup, skipping iteration {}", iteration); | |
| return Ok(0); | |
| } | |
| println!("\n=== Iteration {} ===", iteration); | |
| println!("Launching {} concurrent conditional PUTs to different nodes...", clients.len()); | |
| // Use a barrier to synchronize all clients | |
| let barrier = Arc::new(Barrier::new(clients.len())); | |
| let test_key = test_key.to_string(); | |
| let mut handles = vec![]; | |
| for (i, client) in clients.iter().enumerate() { | |
| let client = client.clone(); | |
| let barrier = barrier.clone(); | |
| let key = test_key.clone(); | |
| let data = format!("data from client {}", i).into_bytes(); | |
| let handle = tokio::spawn(async move { | |
| // Wait for all clients to be ready | |
| barrier.wait().await; | |
| // All clients fire simultaneously | |
| conditional_put(&client, &key, &data, i).await | |
| }); | |
| handles.push(handle); | |
| } | |
| // Collect results | |
| let mut success_count = 0; | |
| for handle in handles { | |
| match handle.await { | |
| Ok(Ok(true)) => success_count += 1, | |
| Ok(Ok(false)) => {} // Expected rejection | |
| Ok(Err(e)) => println!(" Error: {}", e), | |
| Err(e) => println!(" Task error: {}", e), | |
| } | |
| } | |
| println!("Result: {} out of {} succeeded", success_count, clients.len()); | |
| if success_count > 1 { | |
| println!(">>> RACE CONDITION DETECTED! Multiple writers succeeded when only 1 should have."); | |
| } else if success_count == 1 { | |
| println!(">>> Correct behavior: exactly 1 writer succeeded."); | |
| } else { | |
| println!(">>> Unexpected: no writers succeeded."); | |
| } | |
| Ok(success_count) | |
| } | |
| #[tokio::main] | |
| async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { | |
| // Configure your endpoints here | |
| let endpoints = vec![ | |
| "http://localhost:9000", | |
| "http://localhost:9001", | |
| "http://localhost:9002", | |
| "http://localhost:9003", | |
| ]; | |
| println!("Conditional PUT Race Condition Test"); | |
| println!("===================================="); | |
| println!("Endpoints: {:?}", endpoints); | |
| println!(); | |
| // Create clients for each endpoint | |
| let mut clients = vec![]; | |
| for endpoint in &endpoints { | |
| clients.push(create_client(endpoint).await); | |
| } | |
| // Ensure bucket exists (use first client) | |
| println!("Ensuring test bucket '{}' exists...", BUCKET); | |
| ensure_bucket(&clients[0]).await?; | |
| println!("Bucket ready.\n"); | |
| // Run multiple iterations to increase chance of catching race | |
| let iterations = 10; | |
| let mut total_races_detected = 0; | |
| let mut total_correct = 0; | |
| let start = Instant::now(); | |
| for i in 1..=iterations { | |
| let test_key = format!("race-test-{}-{}", std::process::id(), i); | |
| match run_race_test(&clients, &test_key, i).await { | |
| Ok(success_count) => { | |
| if success_count > 1 { | |
| total_races_detected += 1; | |
| } else if success_count == 1 { | |
| total_correct += 1; | |
| } | |
| } | |
| Err(e) => { | |
| println!("Iteration {} failed: {}", i, e); | |
| } | |
| } | |
| // Cleanup | |
| cleanup_object(&clients[0], &test_key).await; | |
| // Small delay between iterations | |
| tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; | |
| } | |
| let elapsed = start.elapsed(); | |
| println!("\n"); | |
| println!("===================================="); | |
| println!("SUMMARY"); | |
| println!("===================================="); | |
| println!("Total iterations: {}", iterations); | |
| println!("Correct (1 winner): {}", total_correct); | |
| println!("Race conditions: {}", total_races_detected); | |
| println!("Time elapsed: {:?}", elapsed); | |
| println!(); | |
| if total_races_detected > 0 { | |
| println!("CONCLUSION: Race condition confirmed!"); | |
| println!("Conditional PUT is NOT safe for multi-node concurrent writes."); | |
| println!("This affects Iceberg/Delta Lake table commits and similar use cases."); | |
| } else { | |
| println!("CONCLUSION: No race conditions detected in {} iterations.", iterations); | |
| println!("This doesn't prove safety - try more iterations or adjust timing."); | |
| } | |
| Ok(()) | |
| } | |
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs twenty race iterations against four local endpoints and prints how
    /// many of them saw more than one successful writer.
    ///
    /// The test only reports the count; it makes no assertion about it.
    #[tokio::test]
    #[ignore = "requires running RustFS cluster"]
    async fn test_conditional_put_race() {
        let endpoints = [
            "http://localhost:9000",
            "http://localhost:9001",
            "http://localhost:9002",
            "http://localhost:9003",
        ];

        // One client per endpoint, created in endpoint order.
        let mut clients = Vec::with_capacity(endpoints.len());
        for endpoint in &endpoints {
            clients.push(create_client(endpoint).await);
        }

        ensure_bucket(&clients[0]).await.expect("Failed to create bucket");

        let mut races_detected = 0;
        for round in 1..=20 {
            let test_key = format!("test-race-{}", round);
            // Failed iterations are ignored; only multi-winner rounds count.
            if matches!(
                run_race_test(&clients, &test_key, round).await,
                Ok(winners) if winners > 1
            ) {
                races_detected += 1;
            }
            cleanup_object(&clients[0], &test_key).await;
        }

        // We expect races to be detected given the architecture
        println!("\nRaces detected: {}/20", races_detected);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment