Last active
February 4, 2026 17:48
-
-
Save RajChowdhury240/39165a9ea73293f04602586ef2870843 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| AWS Sandbox Account Role Scanner | |
| Scans sandbox accounts for roles with 'sandbox-' prefix that don't have matching policies | |
| """ | |
import csv
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Dict, Optional

import boto3
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, TimeRemainingColumn
from rich.table import Table
# Module-wide Rich console; every status message, table and progress bar
# in this file is written through it.
console = Console()
@dataclass
class AccountInfo:
    """One AWS Organizations member account, reduced to the fields the scanner uses."""

    # Account identifier, taken from the 'Id' field of the Organizations listing.
    account_id: str
    # Account name; sandbox filtering matches against it.
    account_name: str
    # Account status string; only 'ACTIVE' accounts are scanned.
    status: str
    # Email address registered for the account.
    email: str
@dataclass
class RoleFinding:
    """One reported role: a 'sandbox-' role for which no 'sandbox-' policy was found."""

    # Account that owns the role.
    account_id: str
    account_name: str
    # The role itself.
    role_name: str
    role_arn: str
    # Policy names, already flattened to display strings by the scanner.
    attached_policies: str
    inline_policies: str
    # Whether any policy name on the role starts with 'sandbox-'.
    has_sandbox_policy: bool
    # Human-readable explanation of the finding.
    issue_description: str
class AWSAccountScanner:
    """Find 'sandbox-' roles in active sandbox accounts that lack a 'sandbox-' policy.

    The scanner lists the accounts of the AWS Organization, keeps the ACTIVE
    ones whose name contains "sandbox", assumes a role in each of them and
    reports every IAM role whose name starts with 'sandbox-' but that has no
    attached or inline policy whose name starts with 'sandbox-'.
    """

    # Name prefix that both the audited roles and their policies must carry.
    _SANDBOX_PREFIX = 'sandbox-'

    def __init__(
        self,
        assume_role_name: str = "ca-iam-cie-engineer",
        max_workers: int = 10,
        output_dir: str = "/mnt/user-data/outputs",
    ):
        """
        Initialize the scanner and create the Organizations and STS clients.

        Args:
            assume_role_name: Role name to assume in target accounts
            max_workers: Maximum number of concurrent threads
            output_dir: Directory the CSV report is written to by run();
                it is created on demand if it does not exist
        """
        self.assume_role_name = assume_role_name
        self.max_workers = max_workers
        self.output_dir = output_dir
        self.org_client = boto3.client('organizations')
        self.sts_client = boto3.client('sts')
        # Populated by run() with the findings of the most recent scan.
        self.findings: List[RoleFinding] = []

    def list_all_accounts(self) -> "List[AccountInfo]":
        """
        List all AWS accounts from AWS Organizations.

        Returns:
            List of AccountInfo objects, one per account on every result page

        Raises:
            Exception: whatever the Organizations call raised; the error is
                printed first and then re-raised unchanged
        """
        console.print("[bold blue]Fetching all AWS accounts from Organizations...[/bold blue]")
        accounts: List[AccountInfo] = []
        paginator = self.org_client.get_paginator('list_accounts')
        try:
            for page in paginator.paginate():
                accounts.extend(
                    AccountInfo(
                        account_id=account['Id'],
                        account_name=account['Name'],
                        status=account['Status'],
                        email=account['Email'],
                    )
                    for account in page['Accounts']
                )
        except Exception as e:
            console.print(f"[bold red]Error fetching accounts: {str(e)}[/bold red]")
            raise
        return accounts

    def filter_sandbox_accounts(self, accounts: "List[AccountInfo]") -> "List[AccountInfo]":
        """
        Keep the accounts that have 'sandbox' in their name and are active.

        Args:
            accounts: List of all accounts

        Returns:
            Filtered list of sandbox accounts (name match is case-insensitive,
            status must be exactly 'ACTIVE')
        """
        sandbox_accounts = [
            acc for acc in accounts
            if 'sandbox' in acc.account_name.lower() and acc.status == 'ACTIVE'
        ]
        console.print(f"[green]Found {len(sandbox_accounts)} active sandbox accounts[/green]")
        return sandbox_accounts

    def assume_role_in_account(self, account_id: str, role_name: str) -> "Optional[boto3.Session]":
        """
        Assume a role in the target account.

        Args:
            account_id: Target account ID
            role_name: Role name to assume

        Returns:
            Boto3 session with assumed role credentials, or None if the
            attempt failed (the failure is printed, never raised)
        """
        role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
        try:
            response = self.sts_client.assume_role(
                RoleArn=role_arn,
                RoleSessionName=f"SandboxScanner-{account_id}",
                DurationSeconds=3600,
            )
            credentials = response['Credentials']
            return boto3.Session(
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken'],
            )
        except Exception as e:
            console.print(f"[yellow]Failed to assume role in account {account_id}: {str(e)}[/yellow]")
            return None

    def _collect_policy_names(self, iam_client, role_name: str,
                              attached: List[str], inline: List[str]) -> None:
        """Append the role's managed and inline policy names to the given lists.

        Any API error propagates to the caller; names gathered before the
        error stay in the lists.
        """
        paginator = iam_client.get_paginator('list_attached_role_policies')
        for page in paginator.paginate(RoleName=role_name):
            attached.extend(policy['PolicyName'] for policy in page['AttachedPolicies'])
        paginator = iam_client.get_paginator('list_role_policies')
        for page in paginator.paginate(RoleName=role_name):
            inline.extend(page['PolicyNames'])

    def _has_sandbox_policy(self, attached: List[str], inline: List[str]) -> bool:
        """Return True when any of the given policy names starts with 'sandbox-'."""
        return any(name.startswith(self._SANDBOX_PREFIX) for name in attached + inline)

    @staticmethod
    def _truncate(text: str, limit: int = 50) -> str:
        """Return text cut to limit characters plus '...' when it is longer."""
        return text[:limit] + "..." if len(text) > limit else text

    def check_role_policies(self, iam_client, role_name: str) -> tuple[List[str], List[str], bool]:
        """
        Check if a role has policies with 'sandbox-' prefix.

        This is a best-effort call: if listing fails, the error is printed and
        whatever was gathered up to that point is returned, so a False flag
        may simply mean the listing was incomplete.

        Args:
            iam_client: IAM client for the account
            role_name: Role name to check

        Returns:
            Tuple of (attached_policies, inline_policies, has_sandbox_policy)
        """
        attached_policies: List[str] = []
        inline_policies: List[str] = []
        try:
            self._collect_policy_names(iam_client, role_name, attached_policies, inline_policies)
        except Exception as e:
            console.print(f"[yellow]Error checking policies for role {role_name}: {str(e)}[/yellow]")
        has_sandbox_policy = self._has_sandbox_policy(attached_policies, inline_policies)
        return attached_policies, inline_policies, has_sandbox_policy

    def scan_account_roles(self, account: "AccountInfo", progress, task_id) -> "List[RoleFinding]":
        """
        Scan a single account for sandbox roles without matching policies.

        Args:
            account: Account to scan
            progress: Rich progress object
            task_id: Task ID for progress tracking

        Returns:
            List of findings; empty when the role could not be assumed or the
            scan failed (both cases are printed, not raised)
        """
        findings = []
        try:
            session = self.assume_role_in_account(account.account_id, self.assume_role_name)
            if not session:
                # assume_role_in_account has already reported the failure.
                return findings
            iam_client = session.client('iam')
            for page in iam_client.get_paginator('list_roles').paginate():
                for role in page['Roles']:
                    role_name = role['RoleName']
                    if not role_name.startswith(self._SANDBOX_PREFIX):
                        continue
                    attached_policies: List[str] = []
                    inline_policies: List[str] = []
                    try:
                        self._collect_policy_names(
                            iam_client, role_name, attached_policies, inline_policies
                        )
                    except Exception as e:
                        # An incomplete listing cannot prove that the policy is
                        # missing, so do not report the role as a finding.
                        console.print(
                            f"[yellow]Skipping role {role_name} in account {account.account_id}: "
                            f"could not list its policies: {str(e)}[/yellow]"
                        )
                        continue
                    if self._has_sandbox_policy(attached_policies, inline_policies):
                        continue
                    findings.append(RoleFinding(
                        account_id=account.account_id,
                        account_name=account.account_name,
                        role_name=role_name,
                        role_arn=role['Arn'],
                        attached_policies=', '.join(attached_policies) if attached_policies else 'None',
                        inline_policies=', '.join(inline_policies) if inline_policies else 'None',
                        has_sandbox_policy=False,
                        issue_description=f"Role '{role_name}' has 'sandbox-' prefix but no matching 'sandbox-' policy",
                    ))
        except Exception as e:
            console.print(f"[red]Error scanning account {account.account_id}: {str(e)}[/red]")
        finally:
            # Advance exactly once per account, whatever happened above.
            progress.update(task_id, advance=1)
        return findings

    def scan_all_accounts(self, accounts: "List[AccountInfo]") -> "List[RoleFinding]":
        """
        Scan all accounts using multithreading.

        Args:
            accounts: List of accounts to scan

        Returns:
            List of all findings, in the order the per-account scans complete
        """
        all_findings = []
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            TimeRemainingColumn(),
            console=console,
        ) as progress:
            task = progress.add_task(
                "[cyan]Scanning sandbox accounts...",
                total=len(accounts),
            )
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # Each worker advances the shared progress task itself.
                future_to_account = {
                    executor.submit(self.scan_account_roles, account, progress, task): account
                    for account in accounts
                }
                for future in as_completed(future_to_account):
                    account = future_to_account[future]
                    try:
                        all_findings.extend(future.result())
                    except Exception as e:
                        console.print(f"[red]Exception for account {account.account_id}: {str(e)}[/red]")
        return all_findings

    def display_results_table(self, findings: "List[RoleFinding]"):
        """
        Display findings in a rich table.

        Policy lists longer than 50 characters are shortened for display only;
        the finding objects are not modified.

        Args:
            findings: List of findings to display
        """
        table = Table(
            title="[bold cyan]Sandbox Role Scan Results[/bold cyan]",
            box=box.ROUNDED,
            show_header=True,
            header_style="bold magenta",
        )
        table.add_column("Account ID", style="cyan", no_wrap=True)
        table.add_column("Account Name", style="green")
        table.add_column("Role Name", style="yellow")
        table.add_column("Attached Policies", style="blue")
        table.add_column("Inline Policies", style="blue")
        table.add_column("Issue", style="red")
        for finding in findings:
            table.add_row(
                finding.account_id,
                finding.account_name,
                finding.role_name,
                self._truncate(finding.attached_policies),
                self._truncate(finding.inline_policies),
                "Missing sandbox- policy",
            )
        console.print(table)

    def save_to_csv(self, findings: "List[RoleFinding]", filename: str):
        """
        Save findings to CSV file.

        The parent directory of filename is created when it does not exist.
        Errors are printed rather than raised.

        Args:
            findings: List of findings to save
            filename: Output CSV filename
        """
        if not findings:
            console.print("[yellow]No findings to save to CSV[/yellow]")
            return
        fieldnames = [
            'account_id', 'account_name', 'role_name', 'role_arn',
            'attached_policies', 'inline_policies', 'has_sandbox_policy',
            'issue_description',
        ]
        try:
            parent_dir = os.path.dirname(filename)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(asdict(finding) for finding in findings)
            console.print(f"[bold green]Results saved to: {filename}[/bold green]")
        except Exception as e:
            console.print(f"[bold red]Error saving CSV: {str(e)}[/bold red]")

    def run(self):
        """Run the full workflow: list accounts, filter, scan, display and save.

        The findings of the scan are also kept in self.findings. Any fatal
        error is printed and then re-raised.
        """
        console.print(Panel.fit(
            "[bold cyan]AWS Sandbox Account Role Scanner[/bold cyan]\n"
            "Scanning for roles with 'sandbox-' prefix missing 'sandbox-' policies",
            border_style="cyan",
        ))
        try:
            # Step 1: List all accounts
            all_accounts = self.list_all_accounts()
            console.print(f"[green]Total accounts found: {len(all_accounts)}[/green]\n")

            # Step 2: Filter sandbox accounts
            sandbox_accounts = self.filter_sandbox_accounts(all_accounts)
            if not sandbox_accounts:
                console.print("[yellow]No sandbox accounts found. Exiting.[/yellow]")
                return

            # Display sandbox accounts
            sandbox_table = Table(title="Sandbox Accounts to Scan", box=box.SIMPLE)
            sandbox_table.add_column("Account ID", style="cyan")
            sandbox_table.add_column("Account Name", style="green")
            sandbox_table.add_column("Email", style="blue")
            for acc in sandbox_accounts:
                sandbox_table.add_row(acc.account_id, acc.account_name, acc.email)
            console.print(sandbox_table)
            console.print()

            # Step 3 & 4: Assume role and scan for mismatched roles
            findings = self.scan_all_accounts(sandbox_accounts)
            self.findings = findings

            # Display and save results
            console.print("\n[bold cyan]Scan Complete![/bold cyan]")
            console.print(f"[green]Total issues found: {len(findings)}[/green]\n")
            if findings:
                self.display_results_table(findings)
                # Save to CSV
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                csv_filename = os.path.join(
                    self.output_dir, f"sandbox_role_findings_{timestamp}.csv"
                )
                self.save_to_csv(findings, csv_filename)
            else:
                console.print("[green]No issues found! All sandbox roles have matching policies.[/green]")
        except Exception as e:
            console.print(f"[bold red]Fatal error: {str(e)}[/bold red]")
            raise
def main():
    """Script entry point: build a scanner with the default settings and run it."""
    settings = {
        "assume_role_name": "ca-iam-cie-engineer",
        "max_workers": 10,  # Adjust based on your needs
    }
    AWSAccountScanner(**settings).run()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment