Last active
January 16, 2025 01:14
-
-
Save fischerdr/7c62df6be37359956e6ac2696c01ee50 to your computer and use it in GitHub Desktop.
pip install hvac click ; python vault_traversal.py --url https://vault.example.com --token mytoken --path secret/ --namespace space
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import hvac | |
| import os | |
| import click | |
| import logging | |
| from typing import Optional, List, Dict, Any, Union | |
| from pick import pick | |
| # Configure logging with more detailed format | |
| logging.basicConfig( | |
| level=logging.DEBUG, # Set to DEBUG level for more detailed information | |
| format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Create a specific logger for hvac client operations | |
| hvac_logger = logging.getLogger('hvac.client') | |
| hvac_logger.setLevel(logging.DEBUG) | |
| def debug_token(token: str) -> str: | |
| """ | |
| Safely debug token information without exposing the full token. | |
| Args: | |
| token: The token to debug | |
| Returns: | |
| str: A safe representation of the token for debugging | |
| """ | |
| if not token: | |
| return "No token provided" | |
| # Show only first 4 and last 4 characters of the token | |
| if len(token) > 8: | |
| return f"{token[:4]}...{token[-4:]}" | |
| return "Token too short" | |
| @click.command() | |
| @click.option('--url', default=None, help='Vault server URL') | |
| @click.option('--token', default=None, help='Vault token or path to token file') | |
| @click.option('--username', default=None, help='Username for Vault login') | |
| @click.option('--path', default=None, help='Starting path for traversal') | |
| @click.option('--namespace', default=None, help='Vault namespace') | |
| @click.option('--cert', default=None, help='Path to SSL certificate (PEM) file for verification') | |
| @click.option('--show-token-info', is_flag=True, help='Show detailed token information and permissions') | |
| def main(url: Optional[str], token: Optional[str], username: Optional[str], | |
| path: Optional[str], namespace: Optional[str], cert: Optional[str], | |
| show_token_info: bool = False) -> None: | |
| """ | |
| Main entry point for Vault path traversal tool. | |
| Args: | |
| url: Vault server URL | |
| token: Vault token or path to token file | |
| username: Username for Vault login | |
| path: Starting path for traversal | |
| namespace: Vault namespace | |
| cert: Path to SSL certificate (PEM) file for verification | |
| show_token_info: Flag to show detailed token information | |
| """ | |
| # Setup Vault client | |
| client = setup_vault_client(url, token, username, namespace, cert) | |
| if not client: | |
| return | |
| # Show token information if requested | |
| if show_token_info: | |
| display_token_info(client) | |
| if not path: # If no path specified, exit after showing token info | |
| return | |
| if path: | |
| if not validate_path_access(client, path): | |
| click.echo(f"Unable to access path: {path}") | |
| click.echo("Please verify:") | |
| click.echo("1. The path exists and is a valid KV v2 secrets path") | |
| click.echo("2. Your token has the required permissions") | |
| click.echo("3. The namespace is correct (if using namespaces)") | |
| return | |
| vaults = [os.path.join(path.rstrip('/'), '')] | |
| else: | |
| try: | |
| mounts = client.sys.list_mounted_secrets_engines()['data'] | |
| vaults = [] | |
| for mount, details in mounts.items(): | |
| if details['type'] == 'kv': | |
| if validate_path_access(client, mount.rstrip('/')): | |
| vaults.append(mount) | |
| else: | |
| logger.info(f"Skipping inaccessible mount: {mount}") | |
| if not vaults: | |
| click.echo("No accessible KV v2 secret mounts found.") | |
| click.echo("Please verify your token has the required permissions.") | |
| return | |
| except Exception as e: | |
| if "permission denied" in str(e).lower(): | |
| logger.error("Permission denied when listing secret engines. Please check your token permissions.") | |
| else: | |
| logger.error(f"Error listing secret engines: {str(e)}") | |
| return | |
| # Collect all secrets | |
| all_secrets: List[str] = [] | |
| for vault in vaults: | |
| collect_secrets(client, vault, all_secrets) | |
| if not all_secrets: | |
| click.echo("\nNo accessible secrets found in the specified path(s).") | |
| click.echo("Please verify:") | |
| click.echo("1. The paths contain KV v2 secrets") | |
| click.echo("2. Your token has the required permissions") | |
| click.echo("3. The namespace is correct (if using namespaces)") | |
| return | |
| # Let user pick a secret | |
| title = 'Please choose a secret to view (use arrow keys and Enter to select):' | |
| selected_secret, _ = pick(all_secrets, title) | |
| # Display the selected secret | |
| secret_data = get_secret_data(client, selected_secret) | |
| if secret_data: | |
| click.echo("\nSecret data:") | |
| for key, value in secret_data.items(): | |
| click.echo(f"{key}: {value}") | |
| else: | |
| click.echo("Failed to retrieve secret data") | |
| def setup_vault_client(url: Optional[str], token: Optional[str], | |
| username: Optional[str], namespace: Optional[str], | |
| cert: Optional[str]) -> Optional[hvac.Client]: | |
| """ | |
| Set up and authenticate the Vault client. | |
| Args: | |
| url: Vault server URL | |
| token: Vault token or path to token file | |
| username: Username for Vault login | |
| namespace: Vault namespace | |
| cert: Path to SSL certificate (PEM) file for verification | |
| Returns: | |
| hvac.Client: Authenticated Vault client or None if setup fails | |
| """ | |
| if url is None: | |
| url = os.environ.get('VAULT_ADDR') | |
| if token is None: | |
| token = os.environ.get('VAULT_TOKEN') | |
| if namespace is None: | |
| namespace = os.environ.get('VAULT_NAMESPACE') | |
| if cert is None: | |
| cert = os.environ.get('VAULT_CACERT') | |
| # Debug logging for configuration | |
| logger.debug("Vault Configuration:") | |
| logger.debug(f"URL: {url}") | |
| logger.debug(f"Token Source: {'Environment' if token == os.environ.get('VAULT_TOKEN') else 'Command Line/File'}") | |
| logger.debug(f"Token: {debug_token(token) if token else 'None'}") | |
| logger.debug(f"Namespace: {namespace}") | |
| logger.debug(f"Certificate: {cert}") | |
| if not url: | |
| logger.error("Vault URL must be provided either as an argument or in the VAULT_ADDR environment variable.") | |
| return None | |
| try: | |
| verify: Union[bool, str] = True | |
| if cert: | |
| if not os.path.isfile(cert): | |
| logger.error(f"SSL certificate file not found: {cert}") | |
| return None | |
| verify = cert | |
| logger.info(f"Using SSL certificate for verification: {cert}") | |
| logger.debug("Creating Vault client with URL: %s, Namespace: %s", url, namespace) | |
| client = hvac.Client(url=url, namespace=namespace, verify=verify) | |
| if token: | |
| if os.path.isfile(token): | |
| logger.debug("Reading token from file: %s", token) | |
| with open(token, 'r') as f: | |
| token = f.read().strip() | |
| logger.debug("Token read from file: %s", debug_token(token)) | |
| client.token = token | |
| logger.debug("Token set in client: %s", debug_token(client.token)) | |
| elif username: | |
| logger.debug("Attempting login with username: %s", username) | |
| # Prompt for password securely | |
| password = click.prompt("Enter your Vault password", hide_input=True) | |
| # Attempt to login and get a token | |
| try: | |
| login_response = client.auth.userpass.login(username, password) | |
| client.token = login_response['auth']['client_token'] | |
| logger.debug("Successfully logged in with username, received token: %s", | |
| debug_token(client.token)) | |
| except Exception as e: | |
| logger.error("Failed to login with username/password: %s", str(e)) | |
| return None | |
| else: | |
| logger.error("Either a token or username must be provided.") | |
| return None | |
| # Verify authentication | |
| try: | |
| is_authenticated = client.is_authenticated() | |
| logger.debug("Authentication check result: %s", is_authenticated) | |
| if not is_authenticated: | |
| logger.error("Failed to authenticate with Vault - token validation failed") | |
| return None | |
| # Additional token validation | |
| try: | |
| token_info = client.auth.token.lookup_self() | |
| logger.debug("Token validation successful:") | |
| logger.debug("Token display name: %s", token_info.get('display_name')) | |
| logger.debug("Token policies: %s", token_info.get('policies', [])) | |
| logger.debug("Token TTL: %s seconds", token_info.get('ttl')) | |
| except Exception as e: | |
| logger.error("Failed to lookup token details: %s", str(e)) | |
| return None | |
| except Exception as e: | |
| logger.error("Error during authentication check: %s", str(e)) | |
| return None | |
| return client | |
| except Exception as e: | |
| logger.error("Error connecting to Vault: %s", str(e)) | |
| return None | |
| def get_secret_data(client: hvac.Client, path: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Get secret data from a specific path in Vault. | |
| Args: | |
| client: Authenticated Vault client | |
| path: Full path to the secret | |
| Returns: | |
| Optional[Dict[str, Any]]: Secret data if successful, None otherwise | |
| """ | |
| try: | |
| secret = client.secrets.kv.v2.read_secret_version(path=path) | |
| return secret['data']['data'] | |
| except Exception as e: | |
| logger.error(f"Error reading secret at {path}: {str(e)}") | |
| return None | |
| def collect_secrets(client: hvac.Client, path: str, secrets_list: List[str]) -> None: | |
| """ | |
| Recursively collect all secret paths in Vault. | |
| Args: | |
| client: Authenticated Vault client | |
| path: Current path to traverse | |
| secrets_list: List to store found secret paths | |
| """ | |
| try: | |
| items = client.secrets.kv.v2.list_secrets(path=path)['data']['keys'] | |
| for item in items: | |
| full_path = os.path.join(path, item) | |
| if item.endswith('/'): | |
| collect_secrets(client, full_path, secrets_list) | |
| else: | |
| secrets_list.append(full_path) | |
| except Exception as e: | |
| logger.error(f"Error listing secrets at {path}: {str(e)}") | |
| def traverse(client: hvac.Client, path: str) -> None: | |
| """ | |
| Recursively traverse Vault paths and list secrets. | |
| Args: | |
| client: Authenticated Vault client | |
| path: Path to traverse in Vault | |
| """ | |
| try: | |
| secrets = client.secrets.kv.v2.list_secrets(path=path)['data']['keys'] | |
| except hvac.exceptions.InvalidPath: | |
| return | |
| except Exception as e: | |
| logger.error(f"Error listing secrets at {path}: {str(e)}") | |
| return | |
| for secret in secrets: | |
| full_path = os.path.join(path, secret) | |
| if secret.endswith('/'): | |
| traverse(client, full_path) | |
| else: | |
| logger.info(full_path) | |
| def validate_path_access(client: hvac.Client, path: str) -> bool: | |
| """ | |
| Validate if the client has access to the given path. | |
| Args: | |
| client: Authenticated Vault client | |
| path: Path to validate | |
| Returns: | |
| bool: True if path is accessible, False otherwise | |
| """ | |
| try: | |
| # Try to list the path to check permissions | |
| client.secrets.kv.v2.list_secrets(path=path) | |
| return True | |
| except hvac.exceptions.InvalidPath: | |
| logger.warning(f"Path does not exist or is not a valid KV v2 path: {path}") | |
| return False | |
| except Exception as e: | |
| if "permission denied" in str(e).lower(): | |
| logger.warning(f"Permission denied for path: {path}") | |
| else: | |
| logger.error(f"Error accessing path {path}: {str(e)}") | |
| return False | |
| def display_token_info(client: hvac.Client) -> None: | |
| """ | |
| Display information about the current token including its policies and metadata. | |
| Args: | |
| client: Authenticated Vault client | |
| """ | |
| try: | |
| # Get token information | |
| token_info = client.auth.token.lookup_self() | |
| click.echo("\nToken Information:") | |
| click.echo("-" * 50) | |
| # Display basic token info | |
| click.echo(f"Display Name: {token_info.get('display_name', 'N/A')}") | |
| click.echo(f"Token Type: {token_info.get('type', 'N/A')}") | |
| click.echo(f"Token TTL: {token_info.get('ttl', 'N/A')} seconds") | |
| # Display policies | |
| policies = token_info.get('policies', []) | |
| click.echo("\nPolicies:") | |
| if policies: | |
| for policy in policies: | |
| click.echo(f" - {policy}") | |
| try: | |
| # Try to get policy details | |
| policy_info = client.sys.read_policy(policy) | |
| if policy_info and 'rules' in policy_info: | |
| click.echo(" Permissions:") | |
| rules = policy_info['rules'].split('\n') | |
| for rule in rules: | |
| if rule.strip(): | |
| click.echo(f" {rule.strip()}") | |
| except Exception: | |
| # Skip policy details if we can't read them | |
| continue | |
| else: | |
| click.echo(" No policies attached") | |
| # Display metadata if any | |
| metadata = token_info.get('metadata', {}) | |
| if metadata: | |
| click.echo("\nMetadata:") | |
| for key, value in metadata.items(): | |
| click.echo(f" {key}: {value}") | |
| # Display token capabilities for current path | |
| try: | |
| capabilities = client.auth.token.lookup_self()['data']['capabilities'] | |
| click.echo("\nToken Capabilities:") | |
| for cap in capabilities: | |
| click.echo(f" - {cap}") | |
| except Exception: | |
| click.echo("\nUnable to retrieve token capabilities") | |
| except Exception as e: | |
| logger.error(f"Error retrieving token information: {str(e)}") | |
| click.echo("Unable to retrieve token information. Please check your permissions.") | |
| if __name__ == "__main__": | |
| main() |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
changed to public