Skip to content

Instantly share code, notes, and snippets.

@fischerdr
Created November 26, 2024 15:47
Show Gist options
  • Select an option

  • Save fischerdr/637f6083a948fb04a944ee87b052d834 to your computer and use it in GitHub Desktop.

Select an option

Save fischerdr/637f6083a948fb04a944ee87b052d834 to your computer and use it in GitHub Desktop.
import os
import logging
import hvac
from argparse import ArgumentParser
def setup_logging():
"""Set up the logging configuration."""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def get_vault_client(vault_addr, token=None, username=None, password=None):
"""Authenticate and return a Vault client."""
client = hvac.Client(url=vault_addr)
if token:
logging.info("Authenticating using provided token.")
client.token = token
elif username and password:
logging.info("Authenticating using username and password.")
client.auth.userpass.login(username=username, password=password)
elif os.getenv("VAULT_TOKEN"):
logging.info("Authenticating using VAULT_TOKEN environment variable.")
client.token = os.getenv("VAULT_TOKEN")
else:
raise ValueError("No authentication method provided.")
if client.is_authenticated():
logging.info("Authentication successful.")
else:
logging.error("Authentication failed.")
raise ValueError("Failed to authenticate with Vault.")
return client
def list_k8s_roles(client, namespace, path):
"""List all Kubernetes auth roles and their settings."""
try:
# Set the namespace
client._adapter.namespace = namespace
# Get all roles under the specified path
roles_path = f"{path}/roles"
roles = client.list(roles_path)
if roles:
logging.info("Found roles in path '%s':", roles_path)
for role in roles.get("data", {}).get("keys", []):
logging.info("Fetching settings for role: %s", role)
role_settings = client.read(f"{roles_path}/{role}")
if role_settings:
logging.info("Role '%s' settings: %s", role, role_settings['data'])
else:
logging.warning("No settings found for role '%s'.", role)
else:
logging.warning("No roles found in path '%s'.", roles_path)
except Exception as e:
logging.error("Failed to list Kubernetes roles: %s", e)
def main():
parser = ArgumentParser(description="HashiCorp Vault Kubernetes Role Checker")
parser.add_argument("--vault_addr", required=True, help="Vault server address.")
parser.add_argument("--namespace", required=True, help="Vault namespace.")
parser.add_argument("--path", required=True, help="Kubernetes auth path.")
parser.add_argument("--token", help="Vault token.")
parser.add_argument("--username", help="Vault username (for userpass auth).")
parser.add_argument("--password", help="Vault password (for userpass auth).")
args = parser.parse_args()
setup_logging()
try:
client = get_vault_client(
vault_addr=args.vault_addr,
token=args.token,
username=args.username,
password=args.password,
)
list_k8s_roles(client, args.namespace, args.path)
except Exception as e:
logging.error("An error occurred: %s", e)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment