Skip to content

Instantly share code, notes, and snippets.

@varesa
Created February 11, 2026 13:48
Show Gist options
  • Select an option

  • Save varesa/d19489faa6b3e2d032443a1acdac94ef to your computer and use it in GitHub Desktop.

Select an option

Save varesa/d19489faa6b3e2d032443a1acdac94ef to your computer and use it in GitHub Desktop.
import threading
from collections import defaultdict
from datetime import datetime, timezone
import kopf
import kubernetes
from compiler import compile_login_config, compile_roles_properties, compile_users_properties
from hasher import hash_password
from secret_writer import write_compiled_secret
from secrets import PasswordKeyNotFound, PasswordSecretNotFound, read_password_from_secret
# Load the in-cluster config (or local kubeconfig for development)
for _load_config in (
    kubernetes.config.load_incluster_config,
    kubernetes.config.load_kube_config,
):
    try:
        _load_config()
    except kubernetes.config.ConfigException:
        # This source is unavailable; try the next one. When both fail we are
        # running in a test environment without kubeconfig and carry on.
        continue
    break

# API clients shared by every handler in this module.
cr_api = kubernetes.client.CustomObjectsApi()
core_api = kubernetes.client.CoreV1Api()

# One lock per namespace, created on first access, so that reconciliations of
# the same namespace are serialized.
_namespace_locks: dict[str, threading.Lock] = defaultdict(threading.Lock)
def _update_user_status(cr_api, namespace, name, state, message, last_compiled):
    """Patch the status subresource of one ArtemisUser.

    ``state`` and ``message`` are always sent. ``lastCompiled`` is sent only
    when ``last_compiled`` is truthy: the CRD schema requires date-time
    format and rejects empty strings.
    """
    # Optional field, merged into the status only when a timestamp was given.
    timestamp_field = {"lastCompiled": last_compiled} if last_compiled else {}
    cr_api.patch_namespaced_custom_object_status(
        group="example.com",
        version="v1",
        namespace=namespace,
        plural="artemisusers",
        name=name,
        body={"status": {"state": state, "message": message, **timestamp_field}},
    )
def _reconcile_namespace(namespace, cr_api, core_api, logger):
    """Full reconciliation pipeline for one namespace.

    Lists all ArtemisUsers, skips those carrying a ``deletionTimestamp``,
    reads and hashes each password, compiles the config files and writes the
    compiled Secret. Users whose password could not be read are marked
    ``Error`` right away; the remaining users are marked ``Compiled`` only
    after the Secret has been written, so a failed write never leaves a
    misleading ``Compiled`` status behind.

    Raises:
        kopf.TemporaryError: after the Secret was written, when at least one
            user's password Secret or key was missing (retry in 30 seconds).
    """
    result = cr_api.list_namespaced_custom_object(
        group="example.com",
        version="v1",
        plural="artemisusers",
        namespace=namespace,
    )
    active_users = [
        u for u in result.get("items", [])
        if "deletionTimestamp" not in u.get("metadata", {})
    ]

    compiled_users = []
    compiled_names = []
    has_errors = False
    for user in active_users:
        user_name = user["metadata"]["name"]
        spec = user["spec"]
        try:
            password = read_password_from_secret(
                core_api,
                namespace,
                spec["password"]["secretName"],
                spec["password"]["secretKey"],
            )
        except (PasswordSecretNotFound, PasswordKeyNotFound) as e:
            logger.warning("Skipping user %s: %s", user_name, e)
            has_errors = True
            _update_user_status(cr_api, namespace, user_name, "Error", str(e), "")
            continue
        compiled_users.append(
            (spec["username"], hash_password(password), spec["role"])
        )
        compiled_names.append(user_name)

    users_props = compile_users_properties(compiled_users)
    roles_props = compile_roles_properties(compiled_users)
    login_conf = compile_login_config()
    write_compiled_secret(core_api, namespace, users_props, roles_props, login_conf)

    # Only now is the compiled Secret actually in place; stamp every user that
    # made it into the Secret with the same compilation time.
    compiled_at = datetime.now(timezone.utc).isoformat()
    for user_name in compiled_names:
        _update_user_status(
            cr_api, namespace, user_name, "Compiled", "", compiled_at
        )

    if has_errors:
        raise kopf.TemporaryError("Some users had missing password Secrets", delay=30)
@kopf.on.startup()
def configure(settings, **kwargs):
    """Store Kopf's progress and diff-base data in annotations.

    This avoids CRD schema conflicts, since our status schema only defines
    the state, message, and lastCompiled fields.
    """
    persistence = settings.persistence
    persistence.progress_storage = kopf.AnnotationsProgressStorage()
    persistence.diffbase_storage = kopf.AnnotationsDiffBaseStorage()
@kopf.on.create('artemisusers')
@kopf.on.update('artemisusers')
@kopf.on.delete('artemisusers', optional=True)
@kopf.on.resume('artemisusers')
def handle_artemis_user(namespace, name, logger, **kwargs):
    """Recompile the namespace Secret on any ArtemisUser lifecycle event.

    Create, update, delete and resume events all run the same reconciliation
    for the whole namespace, under that namespace's lock.
    """
    logger.info(f"Update triggered by {name} in {namespace}, recompiling secrets")
    namespace_lock = _namespace_locks[namespace]
    with namespace_lock:
        _reconcile_namespace(namespace, cr_api, core_api, logger)
import base64
from datetime import datetime, timezone
import kopf
import kubernetes
from jaas_render import (
LOGIN_CONFIG_KEY,
MANAGED_BY_LABEL_KEY,
MANAGED_BY_LABEL_VALUE,
ROLES_PROPERTIES_KEY,
SECRET_NAME,
USERS_PROPERTIES_KEY,
build_audit_annotations,
render_login_config,
render_roles_properties,
render_users_properties,
)
# Load the in-cluster config (or local kubeconfig for development)
try:
    kubernetes.config.load_incluster_config()
except kubernetes.config.ConfigException:
    # Fall back to the local kubeconfig. A failure here is not caught, so it
    # propagates at import time.
    kubernetes.config.load_kube_config()
# Module-level API clients used by _read_password and the handler.
cr_api = kubernetes.client.CustomObjectsApi()
core_api = kubernetes.client.CoreV1Api()
# Internal issue categories returned by _read_password.
ISSUE_MISSING_SECRET = "missing_secret"
ISSUE_MISSING_KEY = "missing_key"
ISSUE_EMPTY_VALUE = "empty_value"
# Issue category -> status/event reason string.
ISSUE_REASON_MAP = {
    ISSUE_MISSING_SECRET: "SecretMissing",
    ISSUE_MISSING_KEY: "SecretKeyMissing",
    ISSUE_EMPTY_VALUE: "SecretValueEmpty",
}
# Issue category -> human-readable message prefix; _format_issue_message
# appends the Secret name to it.
ISSUE_MESSAGE_MAP = {
    ISSUE_MISSING_SECRET: "missing Secret",
    ISSUE_MISSING_KEY: "missing key in Secret",
    ISSUE_EMPTY_VALUE: "empty value in Secret",
}
# Deterministic priority when multiple issue categories are present.
# The first category in this list that occurred decides the reported reason.
ISSUE_REASON_PRIORITY = [
    ISSUE_MISSING_SECRET,
    ISSUE_MISSING_KEY,
    ISSUE_EMPTY_VALUE,
]
def _read_password(logger, namespace, username, secret_name, secret_key):
    """Fetch one user's password from a Secret in ``namespace``.

    Returns a ``(password, issue)`` pair in which exactly one element is
    ``None``. On failure ``issue`` is a dict with the keys ``category`` (one
    of the ``ISSUE_*`` constants) and ``secret_name``.

    The base64-encoded ``data`` map is consulted first; ``string_data`` is
    only looked at when ``data`` does not contain the key. Any API error
    while reading the Secret is reported as ``ISSUE_MISSING_SECRET``, and an
    undecodable or empty value as ``ISSUE_EMPTY_VALUE``.
    """

    def failure(category):
        # Build the (None, issue) half of the return contract.
        return None, {"category": category, "secret_name": secret_name}

    try:
        secret = core_api.read_namespaced_secret(secret_name, namespace)
    except kubernetes.client.exceptions.ApiException as exc:
        logger.warning(
            "Password Secret %s for user %s could not be read: %s",
            secret_name,
            username,
            exc,
        )
        return failure(ISSUE_MISSING_SECRET)

    encoded_entries = secret.data or {}
    if secret_key in encoded_entries:
        try:
            decoded = base64.b64decode(encoded_entries[secret_key]).decode("utf-8")
        except (ValueError, UnicodeDecodeError) as exc:
            logger.warning(
                "Password Secret %s key %s for user %s is invalid: %s",
                secret_name,
                secret_key,
                username,
                exc,
            )
            return failure(ISSUE_EMPTY_VALUE)
        return (decoded, None) if decoded else failure(ISSUE_EMPTY_VALUE)

    plain_entries = getattr(secret, "string_data", None) or {}
    if secret_key not in plain_entries:
        logger.warning(
            "Password Secret %s missing key %s for user %s",
            secret_name,
            secret_key,
            username,
        )
        return failure(ISSUE_MISSING_KEY)

    plain_value = plain_entries[secret_key]
    return (plain_value, None) if plain_value else failure(ISSUE_EMPTY_VALUE)
def _select_issue_reason(issues):
    """Pick the single reason string to report for a batch of issues.

    The first category in ``ISSUE_REASON_PRIORITY`` that occurs among
    ``issues`` wins and is translated through ``ISSUE_REASON_MAP``. When
    none of the prioritized categories is present, ``"SecretIssues"`` is
    returned.
    """
    present = {entry["category"] for entry in issues}
    return next(
        (
            ISSUE_REASON_MAP[candidate]
            for candidate in ISSUE_REASON_PRIORITY
            if candidate in present
        ),
        "SecretIssues",
    )
def _format_issue_message(issues):
    """Render the issues as one ``"; "``-separated message.

    Each distinct ``(category, secret_name)`` pair contributes one part, in
    order of first appearance, formatted as the category's
    ``ISSUE_MESSAGE_MAP`` text followed by the Secret name.
    """
    # dict.fromkeys drops duplicates while keeping first-seen order.
    distinct_pairs = dict.fromkeys(
        (entry["category"], entry["secret_name"]) for entry in issues
    )
    return "; ".join(
        f"{ISSUE_MESSAGE_MAP[category]} {secret_name}"
        for category, secret_name in distinct_pairs
    )
def _update_status(patch, current_status, state, reason, message, generation):
    """Write the status fields for this resource into ``patch.status``.

    ``lastTransitionTime`` is carried over from ``current_status`` when the
    state is unchanged and a previous timestamp exists; otherwise it is set
    to the current UTC time in ISO 8601 format.
    """
    prior = current_status or {}
    carried_over = (
        prior.get("lastTransitionTime") if prior.get("state") == state else None
    )
    transition_time = carried_over or datetime.now(timezone.utc).isoformat()

    fields = {
        "state": state,
        "reason": reason,
        "message": message,
        "observedGeneration": generation,
        "lastTransitionTime": transition_time,
    }
    for field_name, field_value in fields.items():
        patch.status[field_name] = field_value
@kopf.on.create("artemisusers")
@kopf.on.update("artemisusers")
@kopf.on.resume("artemisusers")
@kopf.on.delete("artemisusers")
def handler(logger, name, namespace, body, patch, status, **kwargs):
    """Re-render the namespace's JAAS Secret from its ArtemisUser resources.

    Every ArtemisUser in ``namespace`` is listed and validated, and its
    password is read from the referenced Secret. Resources that are being
    deleted are left out, so a deleted user disappears from the rendered
    Secret. If any password could not be read, or any resource is invalid,
    the triggering resource's status is set to ``Degraded`` and nothing is
    written. Otherwise the JAAS Secret is replaced (or created when absent)
    and the status is set to ``Ready``.

    Raises:
        kubernetes.client.exceptions.ApiException: when the JAAS Secret
            cannot be created or replaced; the status is set to ``Degraded``
            before the exception is re-raised.
    """
    logger.info("Update triggered by %s in %s, recompiling secrets", name, namespace)
    artemis_users = cr_api.list_namespaced_custom_object(
        group="example.com",
        plural="artemisusers",
        version="v1",
        namespace=namespace,
    )

    users = []
    user_roles = {}
    sources = []
    seen_usernames = set()
    issues = []
    validation_errors = []
    for item in artemis_users.get("items", []):
        metadata = item.get("metadata", {})
        if metadata.get("deletionTimestamp"):
            # The resource is on its way out (it can still be listed while
            # finalizers are pending); it must not end up in the Secret.
            continue
        spec = item.get("spec", {})
        resource_name = metadata.get("name")
        item_generation = metadata.get("generation")
        username = spec.get("username")
        password_spec = spec.get("password", {})
        secret_name = password_spec.get("secretName")
        secret_key = password_spec.get("secretKey")
        roles = spec.get("roles") or []

        if not username:
            logger.warning("ArtemisUser %s missing username", resource_name)
            validation_errors.append(resource_name)
            continue
        if not secret_name or not secret_key:
            logger.warning(
                "ArtemisUser %s missing password Secret reference", resource_name
            )
            validation_errors.append(resource_name)
            continue
        if username in seen_usernames:
            logger.warning("Duplicate username %s detected; skipping update", username)
            validation_errors.append(resource_name)
            continue

        seen_usernames.add(username)
        sources.append({"name": resource_name, "generation": item_generation})
        password, issue = _read_password(
            logger, namespace, username, secret_name, secret_key
        )
        if issue is not None:
            issues.append(issue)
            continue
        if password is None:
            # Defensive: treat "no issue but no password" as an empty value.
            issues.append(
                {"category": ISSUE_EMPTY_VALUE, "secret_name": secret_name}
            )
            continue
        users.append({"username": username, "password": password})
        user_roles[username] = sorted(roles)

    # Status is reported on the resource that triggered this run.
    generation = body.get("metadata", {}).get("generation")
    current_status = status or body.get("status") or {}

    if issues:
        reason = _select_issue_reason(issues)
        message = _format_issue_message(issues)
        kopf.warn(body, reason=reason, message=message)
        _update_status(patch, current_status, "Degraded", reason, message, generation)
        return
    if validation_errors:
        message = "One or more ArtemisUser resources are invalid; see logs."
        _update_status(
            patch, current_status, "Degraded", "InvalidSpec", message, generation
        )
        return

    login_config = render_login_config()
    users_properties = render_users_properties(users)
    roles_properties = render_roles_properties(user_roles)
    annotations = build_audit_annotations(sources)
    secret_body = kubernetes.client.V1Secret(
        metadata=kubernetes.client.V1ObjectMeta(
            name=SECRET_NAME,
            namespace=namespace,
            labels={MANAGED_BY_LABEL_KEY: MANAGED_BY_LABEL_VALUE},
            annotations=annotations,
        ),
        type="Opaque",
        string_data={
            LOGIN_CONFIG_KEY: login_config,
            USERS_PROPERTIES_KEY: users_properties,
            ROLES_PROPERTIES_KEY: roles_properties,
        },
    )

    def mark_render_failed():
        # Shared by both failure paths of the upsert below.
        _update_status(
            patch,
            current_status,
            "Degraded",
            "RenderFailed",
            "Failed to upsert JAAS Secret.",
            generation,
        )

    try:
        existing = core_api.read_namespaced_secret(SECRET_NAME, namespace)
        # Carry the resourceVersion over so the replace is rejected if the
        # Secret changed since it was read.
        secret_body.metadata.resource_version = existing.metadata.resource_version
        core_api.replace_namespaced_secret(SECRET_NAME, namespace, secret_body)
        logger.info("Updated JAAS Secret %s in %s", SECRET_NAME, namespace)
    except kubernetes.client.exceptions.ApiException as exc:
        if exc.status == 404:
            # The Secret does not exist (yet): create it instead.
            try:
                core_api.create_namespaced_secret(namespace, secret_body)
                logger.info("Created JAAS Secret %s in %s", SECRET_NAME, namespace)
            except kubernetes.client.exceptions.ApiException as create_exc:
                logger.error(
                    "Failed to create JAAS Secret %s: %s", SECRET_NAME, create_exc
                )
                mark_render_failed()
                raise
        else:
            logger.error("Failed to upsert JAAS Secret %s: %s", SECRET_NAME, exc)
            mark_render_failed()
            raise

    _update_status(
        patch,
        current_status,
        "Ready",
        "RenderSucceeded",
        "Rendered JAAS Secret.",
        generation,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment