@tkanhe
Created February 13, 2026 13:50
Cross-account S3 object copy utility with parallel processing and retry logic

from __future__ import annotations

import concurrent.futures
import logging
import sys
import time
from dataclasses import dataclass, field

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

# ──────────────────────────────────────────────
# CONFIGURATION - Edit these values
# ──────────────────────────────────────────────
SOURCE_PROFILE = None  # AWS CLI profile name, or None to use explicit creds / default chain
SOURCE_REGION = "us-east-1"
SOURCE_BUCKET = "source-bucket-name"
SOURCE_PREFIX = "690d23a0b619789dffde9e08/"

# Explicit source credentials (used only if SOURCE_PROFILE is None)
SOURCE_ACCESS_KEY = None
SOURCE_SECRET_KEY = None
SOURCE_SESSION_TOKEN = None

DEST_PROFILE = None
DEST_REGION = "us-east-1"
DEST_BUCKET = "destination-bucket-name"
DEST_PREFIX = SOURCE_PREFIX  # Set to a different string to remap the prefix

# Explicit destination credentials (used only if DEST_PROFILE is None)
DEST_ACCESS_KEY = None
DEST_SECRET_KEY = None
DEST_SESSION_TOKEN = None

MAX_WORKERS = 20
SKIP_EXISTING = True  # Skip objects already present at the destination with the same size/ETag (multipart ETags depend on part layout, so some identical large objects may still be re-copied)
DRY_RUN = False  # Set True to list objects without copying

# ──────────────────────────────────────────────
# INTERNAL CONSTANTS
# ──────────────────────────────────────────────
MULTIPART_THRESHOLD = 100 * 1024 * 1024  # 100 MB
MULTIPART_CHUNK_SIZE = 50 * 1024 * 1024  # 50 MB
MAX_RETRIES = 3
RETRY_BACKOFF_BASE = 2  # seconds

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)


@dataclass
class CopyStats:
    total: int = 0
    success: int = 0
    failed: int = 0
    skipped: int = 0
    failed_keys: list[str] = field(default_factory=list)


def create_s3_client(
    region: str,
    profile: str | None = None,
    access_key: str | None = None,
    secret_key: str | None = None,
    session_token: str | None = None,
):
    """Create an S3 client from a named profile or explicit credentials."""
    boto_config = Config(
        retries={"max_attempts": 3, "mode": "adaptive"},
        max_pool_connections=50,
    )
    if profile:
        session = boto3.Session(profile_name=profile, region_name=region)
        return session.client("s3", config=boto_config)
    creds = {}
    if access_key and secret_key:
        creds["aws_access_key_id"] = access_key
        creds["aws_secret_access_key"] = secret_key
        if session_token:
            creds["aws_session_token"] = session_token
    return boto3.client("s3", region_name=region, config=boto_config, **creds)


def list_objects(client, bucket: str, prefix: str) -> list[dict]:
    """List all objects under a prefix. Returns list of {Key, Size, ETag}."""
    paginator = client.get_paginator("list_objects_v2")
    objects = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            objects.append({"Key": obj["Key"], "Size": obj["Size"], "ETag": obj["ETag"]})
    return objects


def head_object_safe(client, bucket: str, key: str) -> dict | None:
    """Return object metadata or None if it doesn't exist."""
    try:
        return client.head_object(Bucket=bucket, Key=key)
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return None
        raise


def copy_object(
    source_client,
    dest_client,
    source_bucket: str,
    dest_bucket: str,
    src_key: str,
    dest_key: str,
    src_size: int,
    src_etag: str,
) -> tuple[str, str | None]:
    """
    Copy a single object between accounts. Returns (status, error_msg).
    Status is one of: "success", "skipped", "failed".
    """
    if SKIP_EXISTING:
        dest_meta = head_object_safe(dest_client, dest_bucket, dest_key)
        if dest_meta and dest_meta["ContentLength"] == src_size and dest_meta["ETag"] == src_etag:
            return "skipped", None
    last_error = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            if src_size <= MULTIPART_THRESHOLD:
                _copy_simple(source_client, dest_client, source_bucket, dest_bucket, src_key, dest_key)
            else:
                _copy_multipart(source_client, dest_client, source_bucket, dest_bucket, src_key, dest_key)
            return "success", None
        except Exception as e:
            last_error = str(e)
            if attempt < MAX_RETRIES:
                wait = RETRY_BACKOFF_BASE**attempt
                log.warning("Retry %d/%d for %s (waiting %ds): %s", attempt, MAX_RETRIES, src_key, wait, e)
                time.sleep(wait)
    return "failed", last_error


def _copy_simple(source_client, dest_client, source_bucket, dest_bucket, src_key, dest_key):
    """Download into memory and upload. Suitable for objects <= MULTIPART_THRESHOLD."""
    resp = source_client.get_object(Bucket=source_bucket, Key=src_key)
    content_type = resp.get("ContentType", "binary/octet-stream")
    body = resp["Body"].read()
    dest_client.put_object(Bucket=dest_bucket, Key=dest_key, Body=body, ContentType=content_type)


def _copy_multipart(source_client, dest_client, source_bucket, dest_bucket, src_key, dest_key):
    """Stream large objects via multipart upload to avoid loading everything into memory."""
    resp = source_client.get_object(Bucket=source_bucket, Key=src_key)
    content_type = resp.get("ContentType", "binary/octet-stream")
    stream = resp["Body"]
    mpu = dest_client.create_multipart_upload(Bucket=dest_bucket, Key=dest_key, ContentType=content_type)
    upload_id = mpu["UploadId"]
    parts = []
    try:
        part_number = 1
        while True:
            chunk = stream.read(MULTIPART_CHUNK_SIZE)
            if not chunk:
                break
            part_resp = dest_client.upload_part(
                Bucket=dest_bucket,
                Key=dest_key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=chunk,
            )
            parts.append({"ETag": part_resp["ETag"], "PartNumber": part_number})
            part_number += 1
        dest_client.complete_multipart_upload(
            Bucket=dest_bucket,
            Key=dest_key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
    except Exception:
        dest_client.abort_multipart_upload(Bucket=dest_bucket, Key=dest_key, UploadId=upload_id)
        raise
    finally:
        stream.close()


def copy_s3_objects() -> CopyStats:
    """Copy all objects under SOURCE_PREFIX to DEST_PREFIX in parallel."""
    source_client = create_s3_client(
        region=SOURCE_REGION,
        profile=SOURCE_PROFILE,
        access_key=SOURCE_ACCESS_KEY,
        secret_key=SOURCE_SECRET_KEY,
        session_token=SOURCE_SESSION_TOKEN,
    )
    dest_client = create_s3_client(
        region=DEST_REGION,
        profile=DEST_PROFILE,
        access_key=DEST_ACCESS_KEY,
        secret_key=DEST_SECRET_KEY,
        session_token=DEST_SESSION_TOKEN,
    )

    log.info("Listing objects in s3://%s/%s ...", SOURCE_BUCKET, SOURCE_PREFIX)
    objects = list_objects(source_client, SOURCE_BUCKET, SOURCE_PREFIX)
    stats = CopyStats(total=len(objects))
    if stats.total == 0:
        log.info("No objects found.")
        return stats
    log.info("Found %d objects to copy.", stats.total)

    if DRY_RUN:
        for obj in objects:
            dest_key = obj["Key"].replace(SOURCE_PREFIX, DEST_PREFIX, 1)
            log.info("[DRY RUN] %s -> s3://%s/%s", obj["Key"], DEST_BUCKET, dest_key)
        return stats

    def _task(obj):
        dest_key = obj["Key"].replace(SOURCE_PREFIX, DEST_PREFIX, 1)
        status, err = copy_object(
            source_client,
            dest_client,
            SOURCE_BUCKET,
            DEST_BUCKET,
            obj["Key"],
            dest_key,
            obj["Size"],
            obj["ETag"],
        )
        return obj["Key"], dest_key, status, err

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(_task, obj): obj for obj in objects}
        for future in concurrent.futures.as_completed(futures):
            src_key, dest_key, status, err = future.result()
            if status == "success":
                stats.success += 1
                log.info("Copied %s -> %s", src_key, dest_key)
            elif status == "skipped":
                stats.skipped += 1
                log.debug("Skipped %s (already exists)", src_key)
            else:
                stats.failed += 1
                stats.failed_keys.append(src_key)
                log.error("Failed %s: %s", src_key, err)
            done = stats.success + stats.failed + stats.skipped
            if done % 50 == 0 or done == stats.total:
                log.info("Progress: %d/%d (%.1f%%)", done, stats.total, done / stats.total * 100)

    log.info(
        "Complete: %d success, %d skipped, %d failed, %d total",
        stats.success,
        stats.skipped,
        stats.failed,
        stats.total,
    )
    if stats.failed_keys:
        log.error("Failed keys:\n %s", "\n ".join(stats.failed_keys))
    return stats


if __name__ == "__main__":
    stats = copy_s3_objects()
    sys.exit(1 if stats.failed > 0 else 0)
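
A minimal usage sketch (assumptions: the gist is saved as copy_s3.py and is importable; the CONFIGURATION block at the top has already been edited). The script can also simply be run directly with `python copy_s3.py`, in which case the process exits with status 1 if any copy failed.

# Hypothetical driver script; the module name copy_s3 is an assumption.
import copy_s3

copy_s3.DRY_RUN = True  # preview: list what would be copied without writing anything
stats = copy_s3.copy_s3_objects()
print(f"{stats.success} copied, {stats.skipped} skipped, {stats.failed} failed of {stats.total}")

Because copy_s3_objects() reads the module-level settings at call time, flags such as DRY_RUN or SKIP_EXISTING can be toggled this way before invoking it.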