Cross-account S3 object copy utility with parallel processing and retry logic
from __future__ import annotations

import concurrent.futures
import logging
import sys
import time
from dataclasses import dataclass, field

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

# ──────────────────────────────────────────────
# CONFIGURATION - Edit these values
# ──────────────────────────────────────────────
SOURCE_PROFILE = None  # AWS CLI profile name, or None to use explicit creds / default chain
SOURCE_REGION = "us-east-1"
SOURCE_BUCKET = "source-bucket-name"
SOURCE_PREFIX = "690d23a0b619789dffde9e08/"

# Explicit source credentials (used only if SOURCE_PROFILE is None)
SOURCE_ACCESS_KEY = None
SOURCE_SECRET_KEY = None
SOURCE_SESSION_TOKEN = None

DEST_PROFILE = None
DEST_REGION = "us-east-1"
DEST_BUCKET = "destination-bucket-name"
DEST_PREFIX = SOURCE_PREFIX  # Set to a different string to remap the prefix

# Explicit destination credentials (used only if DEST_PROFILE is None)
DEST_ACCESS_KEY = None
DEST_SECRET_KEY = None
DEST_SESSION_TOKEN = None

MAX_WORKERS = 20
SKIP_EXISTING = True  # Skip objects that already exist at destination with same size/ETag
DRY_RUN = False  # Set True to list objects without copying

# ──────────────────────────────────────────────
# INTERNAL CONSTANTS
# ──────────────────────────────────────────────
MULTIPART_THRESHOLD = 100 * 1024 * 1024  # 100 MB
MULTIPART_CHUNK_SIZE = 50 * 1024 * 1024  # 50 MB
MAX_RETRIES = 3
RETRY_BACKOFF_BASE = 2  # seconds

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)


@dataclass
class CopyStats:
    total: int = 0
    success: int = 0
    failed: int = 0
    skipped: int = 0
    failed_keys: list = field(default_factory=list)


def create_s3_client(
    region: str,
    profile: str | None = None,
    access_key: str | None = None,
    secret_key: str | None = None,
    session_token: str | None = None,
):
    """Create an S3 client from a named profile or explicit credentials."""
    boto_config = Config(
        retries={"max_attempts": 3, "mode": "adaptive"},
        max_pool_connections=50,
    )
    if profile:
        session = boto3.Session(profile_name=profile, region_name=region)
        return session.client("s3", config=boto_config)
    creds = {}
    if access_key and secret_key:
        creds["aws_access_key_id"] = access_key
        creds["aws_secret_access_key"] = secret_key
        if session_token:
            creds["aws_session_token"] = session_token
    return boto3.client("s3", region_name=region, config=boto_config, **creds)


def list_objects(client, bucket: str, prefix: str) -> list[dict]:
    """List all objects under a prefix. Returns a list of {Key, Size, ETag}."""
    paginator = client.get_paginator("list_objects_v2")
    objects = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            objects.append({"Key": obj["Key"], "Size": obj["Size"], "ETag": obj["ETag"]})
    return objects


def head_object_safe(client, bucket: str, key: str) -> dict | None:
    """Return object metadata or None if it doesn't exist."""
    try:
        return client.head_object(Bucket=bucket, Key=key)
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return None
        raise


def copy_object(
    source_client,
    dest_client,
    source_bucket: str,
    dest_bucket: str,
    src_key: str,
    dest_key: str,
    src_size: int,
    src_etag: str,
) -> tuple[str, str | None]:
    """
    Copy a single object between accounts. Returns (status, error_msg).
    Status is one of: "success", "skipped", "failed".
    """
    if SKIP_EXISTING:
        # Note: objects previously copied via multipart upload carry a different ETag
        # than the source, so very large objects may not be detected as duplicates.
        dest_meta = head_object_safe(dest_client, dest_bucket, dest_key)
        if dest_meta and dest_meta["ContentLength"] == src_size and dest_meta["ETag"] == src_etag:
            return "skipped", None
    last_error = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            if src_size <= MULTIPART_THRESHOLD:
                _copy_simple(source_client, dest_client, source_bucket, dest_bucket, src_key, dest_key)
            else:
                _copy_multipart(source_client, dest_client, source_bucket, dest_bucket, src_key, dest_key)
            return "success", None
        except Exception as e:
            last_error = str(e)
            if attempt < MAX_RETRIES:
                wait = RETRY_BACKOFF_BASE ** attempt
                log.warning("Retry %d/%d for %s (waiting %ds): %s", attempt, MAX_RETRIES, src_key, wait, e)
                time.sleep(wait)
    return "failed", last_error


def _copy_simple(source_client, dest_client, source_bucket, dest_bucket, src_key, dest_key):
    """Download into memory and upload. Suitable for objects <= MULTIPART_THRESHOLD."""
    resp = source_client.get_object(Bucket=source_bucket, Key=src_key)
    content_type = resp.get("ContentType", "binary/octet-stream")
    body = resp["Body"].read()
    dest_client.put_object(Bucket=dest_bucket, Key=dest_key, Body=body, ContentType=content_type)


def _copy_multipart(source_client, dest_client, source_bucket, dest_bucket, src_key, dest_key):
    """Stream large objects via multipart upload to avoid loading everything into memory."""
    resp = source_client.get_object(Bucket=source_bucket, Key=src_key)
    content_type = resp.get("ContentType", "binary/octet-stream")
    stream = resp["Body"]
    mpu = dest_client.create_multipart_upload(Bucket=dest_bucket, Key=dest_key, ContentType=content_type)
    upload_id = mpu["UploadId"]
    parts = []
    try:
        part_number = 1
        while True:
            chunk = stream.read(MULTIPART_CHUNK_SIZE)
            if not chunk:
                break
            part_resp = dest_client.upload_part(
                Bucket=dest_bucket,
                Key=dest_key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=chunk,
            )
            parts.append({"ETag": part_resp["ETag"], "PartNumber": part_number})
            part_number += 1
        dest_client.complete_multipart_upload(
            Bucket=dest_bucket,
            Key=dest_key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
    except Exception:
        dest_client.abort_multipart_upload(Bucket=dest_bucket, Key=dest_key, UploadId=upload_id)
        raise
    finally:
        stream.close()


def copy_s3_objects() -> CopyStats:
    """Copy all objects under SOURCE_PREFIX to DEST_PREFIX in parallel."""
    source_client = create_s3_client(
        region=SOURCE_REGION,
        profile=SOURCE_PROFILE,
        access_key=SOURCE_ACCESS_KEY,
        secret_key=SOURCE_SECRET_KEY,
        session_token=SOURCE_SESSION_TOKEN,
    )
    dest_client = create_s3_client(
        region=DEST_REGION,
        profile=DEST_PROFILE,
        access_key=DEST_ACCESS_KEY,
        secret_key=DEST_SECRET_KEY,
        session_token=DEST_SESSION_TOKEN,
    )
    log.info("Listing objects in s3://%s/%s ...", SOURCE_BUCKET, SOURCE_PREFIX)
    objects = list_objects(source_client, SOURCE_BUCKET, SOURCE_PREFIX)
    stats = CopyStats(total=len(objects))
    if stats.total == 0:
        log.info("No objects found.")
        return stats
    log.info("Found %d objects to copy.", stats.total)

    if DRY_RUN:
        for obj in objects:
            dest_key = obj["Key"].replace(SOURCE_PREFIX, DEST_PREFIX, 1)
            log.info("[DRY RUN] %s -> s3://%s/%s", obj["Key"], DEST_BUCKET, dest_key)
        return stats

    def _task(obj):
        dest_key = obj["Key"].replace(SOURCE_PREFIX, DEST_PREFIX, 1)
        status, err = copy_object(
            source_client,
            dest_client,
            SOURCE_BUCKET,
            DEST_BUCKET,
            obj["Key"],
            dest_key,
            obj["Size"],
            obj["ETag"],
        )
        return obj["Key"], dest_key, status, err

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(_task, obj): obj for obj in objects}
        for future in concurrent.futures.as_completed(futures):
            src_key, dest_key, status, err = future.result()
            if status == "success":
                stats.success += 1
                log.info("Copied %s -> %s", src_key, dest_key)
            elif status == "skipped":
                stats.skipped += 1
                log.debug("Skipped %s (already exists)", src_key)
            else:
                stats.failed += 1
                stats.failed_keys.append(src_key)
                log.error("Failed %s: %s", src_key, err)
            done = stats.success + stats.failed + stats.skipped
            if done % 50 == 0 or done == stats.total:
                log.info("Progress: %d/%d (%.1f%%)", done, stats.total, done / stats.total * 100)

    log.info(
        "Complete: %d success, %d skipped, %d failed, %d total",
        stats.success,
        stats.skipped,
        stats.failed,
        stats.total,
    )
    if stats.failed_keys:
        log.error("Failed keys:\n %s", "\n ".join(stats.failed_keys))
    return stats


if __name__ == "__main__":
    stats = copy_s3_objects()
    sys.exit(1 if stats.failed > 0 else 0)
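Besides running the file directly, the module-level configuration can be overridden before calling copy_s3_objects(). A minimal sketch, assuming the gist is saved as copy_s3_objects.py (the filename is hypothetical):

# sketch: drive the copier from another script and inspect the returned CopyStats
import copy_s3_objects

copy_s3_objects.DRY_RUN = True  # preview the key mapping without copying anything
stats = copy_s3_objects.copy_s3_objects()
print(f"{stats.success} copied, {stats.skipped} skipped, {stats.failed} failed of {stats.total}")

This works because the copy functions read DRY_RUN, SKIP_EXISTING, and the bucket settings from module globals at call time.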