Created
December 8, 2025 20:58
-
-
Save pozylon/c1df29efb044e8d06e3fdda0b326c289 to your computer and use it in GitHub Desktop.
Remove Obsolete TXT entries created by Traefik from Exoscale Managed DNS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Script to remove all _acme-challenge TXT DNS entries from Exoscale DNS. | |
| Usage: | |
| export EXOSCALE_API_KEY=<your_api_key> | |
| export EXOSCALE_API_SECRET=<your_api_secret> | |
| python remove_acme_challenges.py [--dry-run] [--domain <domain>] | |
| Options: | |
| --dry-run Show what would be deleted without actually deleting | |
| --domain Only process a specific domain (optional) | |
| """ | |
| import argparse | |
| import base64 | |
| import hashlib | |
| import hmac | |
| import json | |
| import os | |
| import sys | |
| import time | |
| from urllib.parse import urljoin | |
| import requests | |
| # Exoscale API v2 base URL | |
| API_BASE_URL = "https://api-ch-gva-2.exoscale.com/v2" | |
| def get_credentials(): | |
| """Get Exoscale API credentials from environment variables.""" | |
| api_key = os.environ.get("EXOSCALE_API_KEY") | |
| api_secret = os.environ.get("EXOSCALE_API_SECRET") | |
| if not api_key or not api_secret: | |
| print("Error: EXOSCALE_API_KEY and EXOSCALE_API_SECRET environment variables must be set") | |
| sys.exit(1) | |
| return api_key, api_secret | |
| def sign_request(api_key, api_secret, method, path, body="", query_params=None, expiration=None): | |
| """ | |
| Sign an Exoscale API v2 request using HMAC-SHA256. | |
| The message to sign consists of 5 parts joined by newlines: | |
| 1. Request line: "METHOD /path" | |
| 2. Request body (or empty) | |
| 3. Query parameter values concatenated (or empty) | |
| 4. Signed headers (currently empty) | |
| 5. Expiration timestamp | |
| """ | |
| if expiration is None: | |
| expiration = int(time.time()) + 600 # 10 minutes from now | |
| # Build the message to sign | |
| request_line = f"{method} {path}" | |
| # Query params (sorted by key) | |
| query_string = "" | |
| signed_query_args = "" | |
| if query_params: | |
| sorted_params = sorted(query_params.items()) | |
| query_string = "".join(v for k, v in sorted_params) | |
| signed_query_args = ";".join(k for k, v in sorted_params) | |
| # Message parts | |
| message_parts = [ | |
| request_line, | |
| body if body else "", | |
| query_string, | |
| "", # signed headers (empty for now) | |
| str(expiration), | |
| ] | |
| message = "\n".join(message_parts) | |
| # Compute HMAC-SHA256 signature | |
| signature = hmac.new( | |
| api_secret.encode("utf-8"), | |
| message.encode("utf-8"), | |
| hashlib.sha256 | |
| ).digest() | |
| signature_b64 = base64.b64encode(signature).decode("utf-8") | |
| # Build authorization header | |
| auth_parts = [f"credential={api_key}"] | |
| if signed_query_args: | |
| auth_parts.append(f"signed-query-args={signed_query_args}") | |
| auth_parts.append(f"expires={expiration}") | |
| auth_parts.append(f"signature={signature_b64}") | |
| auth_header = "EXO2-HMAC-SHA256 " + ",".join(auth_parts) | |
| return auth_header | |
| def make_request(api_key, api_secret, method, endpoint, body=None, query_params=None): | |
| """Make a signed request to the Exoscale API v2.""" | |
| path = f"/v2{endpoint}" | |
| url = f"{API_BASE_URL.rstrip('/v2')}{path}" | |
| body_str = json.dumps(body) if body else "" | |
| auth_header = sign_request( | |
| api_key, api_secret, method, path, body_str, query_params | |
| ) | |
| headers = { | |
| "Authorization": auth_header, | |
| "Content-Type": "application/json", | |
| } | |
| if query_params: | |
| url += "?" + "&".join(f"{k}={v}" for k, v in sorted(query_params.items())) | |
| response = requests.request( | |
| method=method, | |
| url=url, | |
| headers=headers, | |
| data=body_str if body_str else None, | |
| ) | |
| if response.status_code >= 400: | |
| print(f"Error: API request failed with status {response.status_code}") | |
| print(f"Response: {response.text}") | |
| return None | |
| if response.text: | |
| return response.json() | |
| return {} | |
| def list_domains(api_key, api_secret): | |
| """List all DNS domains.""" | |
| result = make_request(api_key, api_secret, "GET", "/dns-domain") | |
| if result and "dns-domains" in result: | |
| return result["dns-domains"] | |
| return [] | |
| def list_records(api_key, api_secret, domain_id): | |
| """List all DNS records for a domain.""" | |
| result = make_request(api_key, api_secret, "GET", f"/dns-domain/{domain_id}/record") | |
| if result and "dns-domain-records" in result: | |
| return result["dns-domain-records"] | |
| return [] | |
| def delete_record(api_key, api_secret, domain_id, record_id): | |
| """Delete a DNS record.""" | |
| result = make_request(api_key, api_secret, "DELETE", f"/dns-domain/{domain_id}/record/{record_id}") | |
| return result is not None | |
| def find_acme_challenge_records(api_key, api_secret, domain_filter=None): | |
| """Find all _acme-challenge TXT records across all domains.""" | |
| domains = list_domains(api_key, api_secret) | |
| if not domains: | |
| print("No DNS domains found.") | |
| return [] | |
| acme_records = [] | |
| for domain in domains: | |
| domain_name = domain.get("unicode-name", domain.get("id")) | |
| domain_id = domain.get("id") | |
| if domain_filter and domain_name != domain_filter: | |
| continue | |
| print(f"Checking domain: {domain_name}") | |
| records = list_records(api_key, api_secret, domain_id) | |
| for record in records: | |
| record_name = record.get("name", "") | |
| record_type = record.get("type", "") | |
| # Check if this is an _acme-challenge TXT record | |
| if record_type == "TXT" and "_acme-challenge" in record_name: | |
| acme_records.append({ | |
| "domain_id": domain_id, | |
| "domain_name": domain_name, | |
| "record_id": record.get("id"), | |
| "record_name": record_name, | |
| "record_content": record.get("content", ""), | |
| }) | |
| return acme_records | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description="Remove all _acme-challenge TXT DNS entries from Exoscale DNS" | |
| ) | |
| parser.add_argument( | |
| "--dry-run", | |
| action="store_true", | |
| help="Show what would be deleted without actually deleting", | |
| ) | |
| parser.add_argument( | |
| "--domain", | |
| type=str, | |
| default=None, | |
| help="Only process a specific domain", | |
| ) | |
| args = parser.parse_args() | |
| api_key, api_secret = get_credentials() | |
| print("Searching for _acme-challenge TXT records...") | |
| print() | |
| acme_records = find_acme_challenge_records(api_key, api_secret, args.domain) | |
| if not acme_records: | |
| print("\nNo _acme-challenge TXT records found.") | |
| return | |
| print(f"\nFound {len(acme_records)} _acme-challenge TXT record(s):\n") | |
| for record in acme_records: | |
| print(f" Domain: {record['domain_name']}") | |
| print(f" Record: {record['record_name']}") | |
| print(f" Content: {record['record_content'][:50]}..." if len(record['record_content']) > 50 else f" Content: {record['record_content']}") | |
| print() | |
| if args.dry_run: | |
| print("Dry run mode - no records will be deleted.") | |
| return | |
| # Confirm deletion | |
| confirm = input(f"Delete {len(acme_records)} record(s)? [y/N]: ") | |
| if confirm.lower() != "y": | |
| print("Aborted.") | |
| return | |
| # Delete records | |
| deleted_count = 0 | |
| failed_count = 0 | |
| for record in acme_records: | |
| print(f"Deleting {record['record_name']} from {record['domain_name']}...", end=" ") | |
| if delete_record(api_key, api_secret, record["domain_id"], record["record_id"]): | |
| print("OK") | |
| deleted_count += 1 | |
| else: | |
| print("FAILED") | |
| failed_count += 1 | |
| print() | |
| print(f"Deleted: {deleted_count}, Failed: {failed_count}") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment