Created
November 26, 2024 15:47
-
-
Save fischerdr/637f6083a948fb04a944ee87b052d834 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import logging | |
| import hvac | |
| from argparse import ArgumentParser | |
| def setup_logging(): | |
| """Set up the logging configuration.""" | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
| def get_vault_client(vault_addr, token=None, username=None, password=None): | |
| """Authenticate and return a Vault client.""" | |
| client = hvac.Client(url=vault_addr) | |
| if token: | |
| logging.info("Authenticating using provided token.") | |
| client.token = token | |
| elif username and password: | |
| logging.info("Authenticating using username and password.") | |
| client.auth.userpass.login(username=username, password=password) | |
| elif os.getenv("VAULT_TOKEN"): | |
| logging.info("Authenticating using VAULT_TOKEN environment variable.") | |
| client.token = os.getenv("VAULT_TOKEN") | |
| else: | |
| raise ValueError("No authentication method provided.") | |
| if client.is_authenticated(): | |
| logging.info("Authentication successful.") | |
| else: | |
| logging.error("Authentication failed.") | |
| raise ValueError("Failed to authenticate with Vault.") | |
| return client | |
| def list_k8s_roles(client, namespace, path): | |
| """List all Kubernetes auth roles and their settings.""" | |
| try: | |
| # Set the namespace | |
| client._adapter.namespace = namespace | |
| # Get all roles under the specified path | |
| roles_path = f"{path}/roles" | |
| roles = client.list(roles_path) | |
| if roles: | |
| logging.info("Found roles in path '%s':", roles_path) | |
| for role in roles.get("data", {}).get("keys", []): | |
| logging.info("Fetching settings for role: %s", role) | |
| role_settings = client.read(f"{roles_path}/{role}") | |
| if role_settings: | |
| logging.info("Role '%s' settings: %s", role, role_settings['data']) | |
| else: | |
| logging.warning("No settings found for role '%s'.", role) | |
| else: | |
| logging.warning("No roles found in path '%s'.", roles_path) | |
| except Exception as e: | |
| logging.error("Failed to list Kubernetes roles: %s", e) | |
| def main(): | |
| parser = ArgumentParser(description="HashiCorp Vault Kubernetes Role Checker") | |
| parser.add_argument("--vault_addr", required=True, help="Vault server address.") | |
| parser.add_argument("--namespace", required=True, help="Vault namespace.") | |
| parser.add_argument("--path", required=True, help="Kubernetes auth path.") | |
| parser.add_argument("--token", help="Vault token.") | |
| parser.add_argument("--username", help="Vault username (for userpass auth).") | |
| parser.add_argument("--password", help="Vault password (for userpass auth).") | |
| args = parser.parse_args() | |
| setup_logging() | |
| try: | |
| client = get_vault_client( | |
| vault_addr=args.vault_addr, | |
| token=args.token, | |
| username=args.username, | |
| password=args.password, | |
| ) | |
| list_k8s_roles(client, args.namespace, args.path) | |
| except Exception as e: | |
| logging.error("An error occurred: %s", e) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment