Skip to content

Instantly share code, notes, and snippets.

@RajChowdhury240
Last active February 4, 2026 17:48
Show Gist options
  • Select an option

  • Save RajChowdhury240/39165a9ea73293f04602586ef2870843 to your computer and use it in GitHub Desktop.

Select an option

Save RajChowdhury240/39165a9ea73293f04602586ef2870843 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
AWS Sandbox Account Role Scanner
Scans sandbox accounts for roles with 'sandbox-' prefix that don't have matching policies
"""
import csv
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional

import boto3
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, TimeRemainingColumn
from rich.table import Table
# Module-wide Rich console: the status messages, tables and the progress bar
# in this file are all rendered through this single instance.
console = Console()
@dataclass
class AccountInfo:
    """Data class for AWS account information.

    One instance describes a single member account of the AWS Organization
    as returned by the ``list_accounts`` call.
    """

    # Account identifier, also used to build the role ARN that is assumed.
    account_id: str
    # Human-readable account name; matched case-insensitively against "sandbox".
    account_name: str
    # Lifecycle status reported by Organizations; only "ACTIVE" accounts are scanned.
    status: str
    # Contact e-mail of the account, shown in the pre-scan summary table.
    email: str
@dataclass
class RoleFinding:
    """Data class for role findings.

    One instance is one reported role; the field order is also the column
    order of the CSV export.
    """

    # Account in which the role lives.
    account_id: str
    account_name: str
    # Name and full ARN of the offending role.
    role_name: str
    role_arn: str
    # Comma-separated managed policy names, or the literal "None".
    attached_policies: str
    # Comma-separated inline policy names, or the literal "None".
    inline_policies: str
    # Whether any policy name carried the 'sandbox-' prefix.
    has_sandbox_policy: bool
    # Human-readable explanation of why the role was reported.
    issue_description: str
class AWSAccountScanner:
    """Main scanner class for AWS accounts and roles.

    Workflow (see ``run``): list every account of the Organization, keep the
    active ones whose name contains "sandbox", assume a role in each of them
    concurrently, and report every IAM role whose name starts with
    ``sandbox-`` but which has no attached or inline policy whose name starts
    with ``sandbox-``.
    """

    # Prefix shared by the role names that are inspected and by the policy
    # names that satisfy the check.
    SANDBOX_PREFIX = "sandbox-"
    # Maximum number of characters of a policy list shown in the result table.
    TABLE_CELL_LIMIT = 50

    def __init__(self, assume_role_name: str = "ca-iam-cie-engineer", max_workers: int = 10):
        """
        Initialize the scanner
        Args:
            assume_role_name: Role name to assume in target accounts
            max_workers: Maximum number of concurrent threads
        """
        self.assume_role_name = assume_role_name
        self.max_workers = max_workers
        # Both clients use the ambient (default) boto3 credentials.
        self.org_client = boto3.client('organizations')
        self.sts_client = boto3.client('sts')
        self.findings: List["RoleFinding"] = []

    def list_all_accounts(self) -> List["AccountInfo"]:
        """
        List all AWS accounts from AWS Organizations
        Returns:
            List of AccountInfo objects
        Raises:
            Exception: Any error from the Organizations API is reported on the
                console and then re-raised unchanged.
        """
        console.print("[bold blue]Fetching all AWS accounts from Organizations...[/bold blue]")
        accounts = []
        paginator = self.org_client.get_paginator('list_accounts')
        try:
            for page in paginator.paginate():
                for account in page['Accounts']:
                    accounts.append(AccountInfo(
                        account_id=account['Id'],
                        account_name=account['Name'],
                        # NOTE(review): AWS documents 'State' as the successor
                        # of the 'Status' field - confirm against the
                        # Organizations API reference. 'Status' stays the
                        # fallback, so older responses behave as before.
                        status=account.get('State') or account['Status'],
                        email=account['Email'],
                    ))
        except Exception as e:
            console.print(f"[bold red]Error fetching accounts: {str(e)}[/bold red]")
            raise
        return accounts

    def filter_sandbox_accounts(self, accounts: List["AccountInfo"]) -> List["AccountInfo"]:
        """
        Filter accounts that have 'sandbox' in their name and are active
        Args:
            accounts: List of all accounts
        Returns:
            Filtered list of sandbox accounts
        """
        sandbox_accounts = [
            acc for acc in accounts
            if 'sandbox' in acc.account_name.lower() and acc.status == 'ACTIVE'
        ]
        console.print(f"[green]Found {len(sandbox_accounts)} active sandbox accounts[/green]")
        return sandbox_accounts

    def assume_role_in_account(self, account_id: str, role_name: str) -> Optional["boto3.Session"]:
        """
        Assume a role in the target account
        Args:
            account_id: Target account ID
            role_name: Role name to assume
        Returns:
            Boto3 session with assumed role credentials or None if failed
        """
        role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
        try:
            response = self.sts_client.assume_role(
                RoleArn=role_arn,
                RoleSessionName=f"SandboxScanner-{account_id}",
                DurationSeconds=3600
            )
            credentials = response['Credentials']
            return boto3.Session(
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken']
            )
        except Exception as e:
            # Best effort: an account that cannot be entered is skipped by the
            # caller rather than aborting the whole scan.
            console.print(f"[yellow]Failed to assume role in account {account_id}: {str(e)}[/yellow]")
            return None

    def check_role_policies(
        self, iam_client, role_name: str, raise_on_error: bool = False
    ) -> tuple[List[str], List[str], bool]:
        """
        Check if a role has policies with 'sandbox-' prefix
        Args:
            iam_client: IAM client for the account
            role_name: Role name to check
            raise_on_error: When True, an error while listing policies is
                re-raised after being reported. When False (default) the
                error is only reported and the names collected so far are
                returned, so the boolean may be False merely because the
                listing was incomplete.
        Returns:
            Tuple of (attached_policies, inline_policies, has_sandbox_policy)
        """
        attached_policies: List[str] = []
        inline_policies: List[str] = []
        try:
            # Get attached managed policies
            paginator = iam_client.get_paginator('list_attached_role_policies')
            for page in paginator.paginate(RoleName=role_name):
                attached_policies.extend(
                    policy['PolicyName'] for policy in page['AttachedPolicies']
                )
            # Get inline policies
            paginator = iam_client.get_paginator('list_role_policies')
            for page in paginator.paginate(RoleName=role_name):
                inline_policies.extend(page['PolicyNames'])
        except Exception as e:
            console.print(f"[yellow]Error checking policies for role {role_name}: {str(e)}[/yellow]")
            if raise_on_error:
                raise
        has_sandbox_policy = any(
            name.startswith(self.SANDBOX_PREFIX)
            for name in (*attached_policies, *inline_policies)
        )
        return attached_policies, inline_policies, has_sandbox_policy

    def scan_account_roles(self, account: "AccountInfo", progress, task_id) -> List["RoleFinding"]:
        """
        Scan a single account for sandbox roles without matching policies
        Args:
            account: Account to scan
            progress: Rich progress object
            task_id: Task ID for progress tracking
        Returns:
            List of findings
        """
        findings = []
        try:
            # Assume role in the account
            session = self.assume_role_in_account(account.account_id, self.assume_role_name)
            if not session:
                return findings
            iam_client = session.client('iam')
            # List all roles
            paginator = iam_client.get_paginator('list_roles')
            for page in paginator.paginate():
                for role in page['Roles']:
                    role_name = role['RoleName']
                    # Only roles carrying the 'sandbox-' prefix are inspected.
                    if not role_name.startswith(self.SANDBOX_PREFIX):
                        continue
                    try:
                        attached_policies, inline_policies, has_sandbox_policy = \
                            self.check_role_policies(iam_client, role_name, raise_on_error=True)
                    except Exception:
                        # The failure was already reported on the console. An
                        # incomplete policy listing cannot prove that a
                        # 'sandbox-' policy is missing, so the role is skipped
                        # instead of being reported as a false positive.
                        continue
                    # If role has 'sandbox-' prefix but no 'sandbox-' policy
                    if has_sandbox_policy:
                        continue
                    findings.append(RoleFinding(
                        account_id=account.account_id,
                        account_name=account.account_name,
                        role_name=role_name,
                        role_arn=role['Arn'],
                        attached_policies=', '.join(attached_policies) if attached_policies else 'None',
                        inline_policies=', '.join(inline_policies) if inline_policies else 'None',
                        has_sandbox_policy=has_sandbox_policy,
                        issue_description=f"Role '{role_name}' has 'sandbox-' prefix but no matching 'sandbox-' policy"
                    ))
        except Exception as e:
            console.print(f"[red]Error scanning account {account.account_id}: {str(e)}[/red]")
        finally:
            # Advance exactly once per account on every exit path so the
            # progress bar always reaches its total.
            progress.update(task_id, advance=1)
        return findings

    def scan_all_accounts(self, accounts: List["AccountInfo"]) -> List["RoleFinding"]:
        """
        Scan all accounts using multithreading
        Args:
            accounts: List of accounts to scan
        Returns:
            List of all findings
        """
        all_findings = []
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            TimeRemainingColumn(),
            console=console
        ) as progress:
            task = progress.add_task(
                "[cyan]Scanning sandbox accounts...",
                total=len(accounts)
            )
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_account = {
                    executor.submit(self.scan_account_roles, account, progress, task): account
                    for account in accounts
                }
                # Results are merged in completion order, not submission order.
                for future in as_completed(future_to_account):
                    account = future_to_account[future]
                    try:
                        all_findings.extend(future.result())
                    except Exception as e:
                        console.print(f"[red]Exception for account {account.account_id}: {str(e)}[/red]")
        return all_findings

    @staticmethod
    def _truncate(text: str, limit: int = 50) -> str:
        """Return text cut to limit characters plus "..." when it is longer."""
        return text[:limit] + "..." if len(text) > limit else text

    def display_results_table(self, findings: List["RoleFinding"]):
        """
        Display findings in a rich table
        Args:
            findings: List of findings to display
        """
        table = Table(
            title="[bold cyan]Sandbox Role Scan Results[/bold cyan]",
            box=box.ROUNDED,
            show_header=True,
            header_style="bold magenta"
        )
        table.add_column("Account ID", style="cyan", no_wrap=True)
        table.add_column("Account Name", style="green")
        table.add_column("Role Name", style="yellow")
        table.add_column("Attached Policies", style="blue")
        table.add_column("Inline Policies", style="blue")
        table.add_column("Issue", style="red")
        for finding in findings:
            table.add_row(
                finding.account_id,
                finding.account_name,
                finding.role_name,
                # Long policy lists are shortened for the terminal only; the
                # CSV export keeps the full text.
                self._truncate(finding.attached_policies, self.TABLE_CELL_LIMIT),
                self._truncate(finding.inline_policies, self.TABLE_CELL_LIMIT),
                "Missing sandbox- policy"
            )
        console.print(table)

    def save_to_csv(self, findings: List["RoleFinding"], filename: str):
        """
        Save findings to CSV file
        Args:
            findings: List of findings to save
            filename: Output CSV filename; missing parent directories are
                created. Any error is reported on the console, not raised.
        """
        if not findings:
            console.print("[yellow]No findings to save to CSV[/yellow]")
            return
        fieldnames = [
            'account_id', 'account_name', 'role_name', 'role_arn',
            'attached_policies', 'inline_policies', 'has_sandbox_policy',
            'issue_description'
        ]
        try:
            # Without this, a missing output directory would discard the
            # results of the whole scan.
            Path(filename).parent.mkdir(parents=True, exist_ok=True)
            with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
                for finding in findings:
                    writer.writerow(asdict(finding))
            console.print(f"[bold green]Results saved to: {filename}[/bold green]")
        except Exception as e:
            console.print(f"[bold red]Error saving CSV: {str(e)}[/bold red]")

    def run(self, output_dir: str = "/mnt/user-data/outputs"):
        """
        Main execution method
        Args:
            output_dir: Directory that receives the timestamped CSV report.
        Raises:
            Exception: Any unexpected error is reported on the console and
                then re-raised unchanged.
        """
        console.print(Panel.fit(
            "[bold cyan]AWS Sandbox Account Role Scanner[/bold cyan]\n"
            "Scanning for roles with 'sandbox-' prefix missing 'sandbox-' policies",
            border_style="cyan"
        ))
        try:
            # Step 1: List all accounts
            all_accounts = self.list_all_accounts()
            console.print(f"[green]Total accounts found: {len(all_accounts)}[/green]\n")
            # Step 2: Filter sandbox accounts
            sandbox_accounts = self.filter_sandbox_accounts(all_accounts)
            if not sandbox_accounts:
                console.print("[yellow]No sandbox accounts found. Exiting.[/yellow]")
                return
            # Display sandbox accounts
            sandbox_table = Table(title="Sandbox Accounts to Scan", box=box.SIMPLE)
            sandbox_table.add_column("Account ID", style="cyan")
            sandbox_table.add_column("Account Name", style="green")
            sandbox_table.add_column("Email", style="blue")
            for acc in sandbox_accounts:
                sandbox_table.add_row(acc.account_id, acc.account_name, acc.email)
            console.print(sandbox_table)
            console.print()
            # Step 3 & 4: Assume role and scan for mismatched roles
            findings = self.scan_all_accounts(sandbox_accounts)
            # Display and save results
            console.print("\n[bold cyan]Scan Complete![/bold cyan]")
            console.print(f"[green]Total issues found: {len(findings)}[/green]\n")
            if findings:
                self.display_results_table(findings)
                # Save to CSV
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                csv_filename = str(Path(output_dir) / f"sandbox_role_findings_{timestamp}.csv")
                self.save_to_csv(findings, csv_filename)
            else:
                console.print("[green]No issues found! All sandbox roles have matching policies.[/green]")
        except Exception as e:
            console.print(f"[bold red]Fatal error: {str(e)}[/bold red]")
            raise
def main(assume_role_name: str = "ca-iam-cie-engineer", max_workers: int = 10):
    """Main entry point.

    Builds an ``AWSAccountScanner`` with the given configuration and runs it.

    Args:
        assume_role_name: Role name to assume in target accounts
        max_workers: Maximum number of concurrent threads; adjust based on
            your needs
    """
    scanner = AWSAccountScanner(
        assume_role_name=assume_role_name,
        max_workers=max_workers,
    )
    scanner.run()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment