Created
December 12, 2025 18:46
-
-
Save RajChowdhury240/fc3b3871fe0ae7502a5d8a30280733b1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| AWS Multi-Account Credential Report Script | |
| Assumes a role across multiple AWS accounts and retrieves IAM credential reports. | |
| """ | |
| import boto3 | |
| import csv | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import datetime | |
| from io import StringIO | |
| from typing import Dict, List, Optional | |
| from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeRemainingColumn | |
| from rich.console import Console | |
| console = Console() | |
| # Configuration | |
| ROLE_NAME = "ca-iam-cie-engineer" | |
| MAX_WORKERS = 10 # Adjust based on your needs | |
| OUTPUT_FILE = f"credential_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
| def get_org_accounts() -> List[Dict[str, str]]: | |
| """Retrieve all active accounts from AWS Organizations.""" | |
| try: | |
| org_client = boto3.client('organizations') | |
| accounts = [] | |
| paginator = org_client.get_paginator('list_accounts') | |
| for page in paginator.paginate(): | |
| for account in page['Accounts']: | |
| if account['Status'] == 'ACTIVE': | |
| accounts.append({ | |
| 'Id': account['Id'], | |
| 'Name': account['Name'], | |
| 'Email': account['Email'] | |
| }) | |
| return accounts | |
| except Exception as e: | |
| console.print(f"[red]Error retrieving accounts: {e}[/red]") | |
| return [] | |
| def assume_role(account_id: str, role_name: str) -> Optional[boto3.Session]: | |
| """Assume a role in the target account.""" | |
| try: | |
| sts_client = boto3.client('sts') | |
| role_arn = f"arn:aws:iam::{account_id}:role/{role_name}" | |
| response = sts_client.assume_role( | |
| RoleArn=role_arn, | |
| RoleSessionName=f"CredentialReport-{account_id}" | |
| ) | |
| credentials = response['Credentials'] | |
| session = boto3.Session( | |
| aws_access_key_id=credentials['AccessKeyId'], | |
| aws_secret_access_key=credentials['SecretAccessKey'], | |
| aws_session_token=credentials['SessionToken'] | |
| ) | |
| return session | |
| except Exception as e: | |
| console.print(f"[yellow]Failed to assume role in {account_id}: {e}[/yellow]") | |
| return None | |
| def get_credential_report(session: boto3.Session) -> Optional[str]: | |
| """Generate and retrieve IAM credential report.""" | |
| try: | |
| iam_client = session.client('iam') | |
| # Generate credential report | |
| max_attempts = 10 | |
| for attempt in range(max_attempts): | |
| response = iam_client.generate_credential_report() | |
| if response['State'] == 'COMPLETE': | |
| break | |
| time.sleep(2) | |
| # Get the credential report | |
| report_response = iam_client.get_credential_report() | |
| report_content = report_response['Content'].decode('utf-8') | |
| return report_content | |
| except Exception as e: | |
| console.print(f"[yellow]Failed to get credential report: {e}[/yellow]") | |
| return None | |
| def parse_credential_report(report_content: str) -> List[Dict]: | |
| """Parse the credential report CSV content.""" | |
| try: | |
| csv_file = StringIO(report_content) | |
| reader = csv.DictReader(csv_file) | |
| return list(reader) | |
| except Exception as e: | |
| console.print(f"[yellow]Failed to parse credential report: {e}[/yellow]") | |
| return [] | |
| def process_account(account: Dict[str, str], role_name: str) -> List[Dict]: | |
| """Process a single account and return credential report data.""" | |
| account_id = account['Id'] | |
| account_name = account['Name'] | |
| account_email = account['Email'] | |
| results = [] | |
| # Assume role | |
| session = assume_role(account_id, role_name) | |
| if not session: | |
| results.append({ | |
| 'AccountID': account_id, | |
| 'AccountName': account_name, | |
| 'Email': account_email, | |
| 'PasswordLastUsed': 'ERROR: Failed to assume role', | |
| 'CredentialReportData': 'N/A' | |
| }) | |
| return results | |
| # Get credential report | |
| report_content = get_credential_report(session) | |
| if not report_content: | |
| results.append({ | |
| 'AccountID': account_id, | |
| 'AccountName': account_name, | |
| 'Email': account_email, | |
| 'PasswordLastUsed': 'ERROR: Failed to get report', | |
| 'CredentialReportData': 'N/A' | |
| }) | |
| return results | |
| # Parse credential report | |
| report_data = parse_credential_report(report_content) | |
| if not report_data: | |
| results.append({ | |
| 'AccountID': account_id, | |
| 'AccountName': account_name, | |
| 'Email': account_email, | |
| 'PasswordLastUsed': 'ERROR: Failed to parse report', | |
| 'CredentialReportData': 'N/A' | |
| }) | |
| return results | |
| # Process each user in the report | |
| for user_data in report_data: | |
| results.append({ | |
| 'AccountID': account_id, | |
| 'AccountName': account_name, | |
| 'Email': account_email, | |
| 'PasswordLastUsed': user_data.get('password_last_used', 'N/A'), | |
| 'CredentialReportData': str(user_data) | |
| }) | |
| return results | |
| def main(): | |
| """Main execution function.""" | |
| console.print("[bold green]AWS Multi-Account Credential Report Script[/bold green]") | |
| console.print(f"Role to assume: {ROLE_NAME}") | |
| console.print(f"Output file: {OUTPUT_FILE}\n") | |
| # Get all active accounts | |
| console.print("[cyan]Retrieving active AWS accounts...[/cyan]") | |
| accounts = get_org_accounts() | |
| if not accounts: | |
| console.print("[red]No accounts found or error retrieving accounts.[/red]") | |
| return | |
| console.print(f"[green]Found {len(accounts)} active accounts[/green]\n") | |
| # Process accounts with multi-threading | |
| all_results = [] | |
| with Progress( | |
| SpinnerColumn(), | |
| TextColumn("[progress.description]{task.description}"), | |
| BarColumn(), | |
| TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), | |
| TimeRemainingColumn(), | |
| console=console | |
| ) as progress: | |
| task = progress.add_task( | |
| f"[cyan]Processing {len(accounts)} accounts...", | |
| total=len(accounts) | |
| ) | |
| with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: | |
| # Submit all tasks | |
| future_to_account = { | |
| executor.submit(process_account, account, ROLE_NAME): account | |
| for account in accounts | |
| } | |
| # Process completed tasks | |
| for future in as_completed(future_to_account): | |
| account = future_to_account[future] | |
| try: | |
| results = future.result() | |
| all_results.extend(results) | |
| except Exception as e: | |
| console.print(f"[red]Error processing {account['Id']}: {e}[/red]") | |
| all_results.append({ | |
| 'AccountID': account['Id'], | |
| 'AccountName': account['Name'], | |
| 'Email': account['Email'], | |
| 'PasswordLastUsed': f'ERROR: {str(e)}', | |
| 'CredentialReportData': 'N/A' | |
| }) | |
| finally: | |
| progress.update(task, advance=1) | |
| # Write results to CSV | |
| console.print(f"\n[cyan]Writing results to {OUTPUT_FILE}...[/cyan]") | |
| try: | |
| with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as csvfile: | |
| fieldnames = ['AccountID', 'AccountName', 'Email', 'PasswordLastUsed', 'CredentialReportData'] | |
| writer = csv.DictWriter(csvfile, fieldnames=fieldnames) | |
| writer.writeheader() | |
| writer.writerows(all_results) | |
| console.print(f"[bold green]✓ Successfully wrote {len(all_results)} records to {OUTPUT_FILE}[/bold green]") | |
| except Exception as e: | |
| console.print(f"[red]Error writing CSV file: {e}[/red]") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment