Skip to content

Instantly share code, notes, and snippets.

@RajChowdhury240
Created December 12, 2025 18:46
Show Gist options
  • Select an option

  • Save RajChowdhury240/fc3b3871fe0ae7502a5d8a30280733b1 to your computer and use it in GitHub Desktop.

Select an option

Save RajChowdhury240/fc3b3871fe0ae7502a5d8a30280733b1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
AWS Multi-Account Credential Report Script
Assumes a role across multiple AWS accounts and retrieves IAM credential reports.
"""
import boto3
import csv
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from io import StringIO
from typing import Dict, List, Optional
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeRemainingColumn
from rich.console import Console
console = Console()
# Configuration
ROLE_NAME = "ca-iam-cie-engineer"
MAX_WORKERS = 10 # Adjust based on your needs
OUTPUT_FILE = f"credential_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
def get_org_accounts() -> List[Dict[str, str]]:
"""Retrieve all active accounts from AWS Organizations."""
try:
org_client = boto3.client('organizations')
accounts = []
paginator = org_client.get_paginator('list_accounts')
for page in paginator.paginate():
for account in page['Accounts']:
if account['Status'] == 'ACTIVE':
accounts.append({
'Id': account['Id'],
'Name': account['Name'],
'Email': account['Email']
})
return accounts
except Exception as e:
console.print(f"[red]Error retrieving accounts: {e}[/red]")
return []
def assume_role(account_id: str, role_name: str) -> Optional[boto3.Session]:
"""Assume a role in the target account."""
try:
sts_client = boto3.client('sts')
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
response = sts_client.assume_role(
RoleArn=role_arn,
RoleSessionName=f"CredentialReport-{account_id}"
)
credentials = response['Credentials']
session = boto3.Session(
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
return session
except Exception as e:
console.print(f"[yellow]Failed to assume role in {account_id}: {e}[/yellow]")
return None
def get_credential_report(session: boto3.Session) -> Optional[str]:
"""Generate and retrieve IAM credential report."""
try:
iam_client = session.client('iam')
# Generate credential report
max_attempts = 10
for attempt in range(max_attempts):
response = iam_client.generate_credential_report()
if response['State'] == 'COMPLETE':
break
time.sleep(2)
# Get the credential report
report_response = iam_client.get_credential_report()
report_content = report_response['Content'].decode('utf-8')
return report_content
except Exception as e:
console.print(f"[yellow]Failed to get credential report: {e}[/yellow]")
return None
def parse_credential_report(report_content: str) -> List[Dict]:
"""Parse the credential report CSV content."""
try:
csv_file = StringIO(report_content)
reader = csv.DictReader(csv_file)
return list(reader)
except Exception as e:
console.print(f"[yellow]Failed to parse credential report: {e}[/yellow]")
return []
def process_account(account: Dict[str, str], role_name: str) -> List[Dict]:
"""Process a single account and return credential report data."""
account_id = account['Id']
account_name = account['Name']
account_email = account['Email']
results = []
# Assume role
session = assume_role(account_id, role_name)
if not session:
results.append({
'AccountID': account_id,
'AccountName': account_name,
'Email': account_email,
'PasswordLastUsed': 'ERROR: Failed to assume role',
'CredentialReportData': 'N/A'
})
return results
# Get credential report
report_content = get_credential_report(session)
if not report_content:
results.append({
'AccountID': account_id,
'AccountName': account_name,
'Email': account_email,
'PasswordLastUsed': 'ERROR: Failed to get report',
'CredentialReportData': 'N/A'
})
return results
# Parse credential report
report_data = parse_credential_report(report_content)
if not report_data:
results.append({
'AccountID': account_id,
'AccountName': account_name,
'Email': account_email,
'PasswordLastUsed': 'ERROR: Failed to parse report',
'CredentialReportData': 'N/A'
})
return results
# Process each user in the report
for user_data in report_data:
results.append({
'AccountID': account_id,
'AccountName': account_name,
'Email': account_email,
'PasswordLastUsed': user_data.get('password_last_used', 'N/A'),
'CredentialReportData': str(user_data)
})
return results
def main():
"""Main execution function."""
console.print("[bold green]AWS Multi-Account Credential Report Script[/bold green]")
console.print(f"Role to assume: {ROLE_NAME}")
console.print(f"Output file: {OUTPUT_FILE}\n")
# Get all active accounts
console.print("[cyan]Retrieving active AWS accounts...[/cyan]")
accounts = get_org_accounts()
if not accounts:
console.print("[red]No accounts found or error retrieving accounts.[/red]")
return
console.print(f"[green]Found {len(accounts)} active accounts[/green]\n")
# Process accounts with multi-threading
all_results = []
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeRemainingColumn(),
console=console
) as progress:
task = progress.add_task(
f"[cyan]Processing {len(accounts)} accounts...",
total=len(accounts)
)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# Submit all tasks
future_to_account = {
executor.submit(process_account, account, ROLE_NAME): account
for account in accounts
}
# Process completed tasks
for future in as_completed(future_to_account):
account = future_to_account[future]
try:
results = future.result()
all_results.extend(results)
except Exception as e:
console.print(f"[red]Error processing {account['Id']}: {e}[/red]")
all_results.append({
'AccountID': account['Id'],
'AccountName': account['Name'],
'Email': account['Email'],
'PasswordLastUsed': f'ERROR: {str(e)}',
'CredentialReportData': 'N/A'
})
finally:
progress.update(task, advance=1)
# Write results to CSV
console.print(f"\n[cyan]Writing results to {OUTPUT_FILE}...[/cyan]")
try:
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['AccountID', 'AccountName', 'Email', 'PasswordLastUsed', 'CredentialReportData']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(all_results)
console.print(f"[bold green]✓ Successfully wrote {len(all_results)} records to {OUTPUT_FILE}[/bold green]")
except Exception as e:
console.print(f"[red]Error writing CSV file: {e}[/red]")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment