Skip to content

Instantly share code, notes, and snippets.

@pozylon
Created December 8, 2025 20:58
Show Gist options
  • Select an option

  • Save pozylon/c1df29efb044e8d06e3fdda0b326c289 to your computer and use it in GitHub Desktop.

Select an option

Save pozylon/c1df29efb044e8d06e3fdda0b326c289 to your computer and use it in GitHub Desktop.
Remove Obsolete TXT entries created by Traefik from Exoscale Managed DNS
#!/usr/bin/env python3
"""
Script to remove all _acme-challenge TXT DNS entries from Exoscale DNS.
Usage:
export EXOSCALE_API_KEY=<your_api_key>
export EXOSCALE_API_SECRET=<your_api_secret>
python remove_acme_challenges.py [--dry-run] [--domain <domain>]
Options:
--dry-run Show what would be deleted without actually deleting
--domain Only process a specific domain (optional)
"""
import argparse
import base64
import hashlib
import hmac
import json
import os
import sys
import time
from urllib.parse import urljoin
import requests
# Exoscale API v2 base URL (includes the "/v2" version prefix).
# NOTE(review): the hostname names a single zone (ch-gva-2); presumably any
# zone endpoint can serve DNS requests - confirm against the Exoscale docs.
API_BASE_URL = "https://api-ch-gva-2.exoscale.com/v2"
def get_credentials():
    """Read the Exoscale API credentials from the environment.

    Returns:
        A ``(api_key, api_secret)`` tuple taken from the
        ``EXOSCALE_API_KEY`` and ``EXOSCALE_API_SECRET`` variables.

    Raises:
        SystemExit: with status 1 when either variable is unset or empty.
    """
    api_key = os.environ.get("EXOSCALE_API_KEY")
    api_secret = os.environ.get("EXOSCALE_API_SECRET")
    if not api_key or not api_secret:
        # Diagnostics belong on stderr so they never mix with regular output.
        print(
            "Error: EXOSCALE_API_KEY and EXOSCALE_API_SECRET environment variables must be set",
            file=sys.stderr,
        )
        sys.exit(1)
    return api_key, api_secret
def sign_request(api_key, api_secret, method, path, body="", query_params=None, expiration=None):
    """Build the ``Authorization`` header value for an Exoscale API v2 request.

    The signed message consists of five parts joined by newlines:

    1. Request line: ``"METHOD /path"``
    2. Request body (or empty)
    3. Query parameter values, sorted by key and concatenated (or empty)
    4. Signed headers (always empty here)
    5. Expiration timestamp

    Args:
        api_key: API key placed in the ``credential=`` field.
        api_secret: Secret used as the HMAC-SHA256 key.
        method: HTTP method, e.g. ``"GET"``.
        path: Request path, e.g. ``"/v2/dns-domain"``.
        body: Serialized request body; ``None`` or ``""`` means no body.
        query_params: Optional mapping of query parameter names to values.
        expiration: Unix timestamp after which the signature is invalid.
            Defaults to 10 minutes from now.

    Returns:
        The header value, starting with ``"EXO2-HMAC-SHA256 "``.
    """
    if expiration is None:
        expiration = int(time.time()) + 600  # 10 minutes from now

    sorted_params = sorted((query_params or {}).items())
    # Values are coerced to str so numeric parameters (e.g. a page size
    # passed as an int) can be signed instead of raising TypeError in join().
    query_values = "".join(str(value) for _, value in sorted_params)
    signed_query_args = ";".join(key for key, _ in sorted_params)

    message = "\n".join(
        [
            f"{method} {path}",
            body if body else "",
            query_values,
            "",  # signed headers (none are signed)
            str(expiration),
        ]
    )

    digest = hmac.new(
        api_secret.encode("utf-8"),
        message.encode("utf-8"),
        hashlib.sha256,
    ).digest()
    signature_b64 = base64.b64encode(digest).decode("utf-8")

    auth_parts = [f"credential={api_key}"]
    # The signed-query-args field is only present when parameters were signed.
    if signed_query_args:
        auth_parts.append(f"signed-query-args={signed_query_args}")
    auth_parts.append(f"expires={expiration}")
    auth_parts.append(f"signature={signature_b64}")
    return "EXO2-HMAC-SHA256 " + ",".join(auth_parts)
def make_request(api_key, api_secret, method, endpoint, body=None, query_params=None, timeout=30):
    """Make a signed request to the Exoscale API v2.

    Args:
        api_key: Exoscale API key.
        api_secret: Exoscale API secret.
        method: HTTP method, e.g. ``"GET"`` or ``"DELETE"``.
        endpoint: Path below the ``/v2`` prefix, e.g. ``"/dns-domain"``.
        body: Optional JSON-serializable request body.
        query_params: Optional mapping of query parameters.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON response, ``{}`` when the response body is empty,
        or ``None`` when the request failed (an error is printed).
    """
    path = f"/v2{endpoint}"

    # Remove the "/v2" *suffix* from the base URL. str.rstrip("/v2") would
    # strip any trailing run of the characters "/", "v" and "2", which eats
    # into hostnames that happen to end with one of those characters.
    base_url = API_BASE_URL.rstrip("/")
    if base_url.endswith("/v2"):
        base_url = base_url[: -len("/v2")]
    url = f"{base_url}{path}"

    body_str = json.dumps(body) if body else ""
    auth_header = sign_request(
        api_key, api_secret, method, path, body_str, query_params
    )
    headers = {
        "Authorization": auth_header,
        "Content-Type": "application/json",
    }

    # Let requests URL-encode the parameters; they are passed in sorted key
    # order, the same order sign_request lists them in signed-query-args.
    params = sorted(query_params.items()) if query_params else None

    try:
        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            params=params,
            data=body_str if body_str else None,
            timeout=timeout,  # without a timeout a stalled connection hangs forever
        )
    except requests.RequestException as exc:
        # Report network failures the same way as HTTP errors: print + None.
        print(f"Error: API request failed: {exc}")
        return None

    if response.status_code >= 400:
        print(f"Error: API request failed with status {response.status_code}")
        print(f"Response: {response.text}")
        return None
    if response.text:
        return response.json()
    return {}
def list_domains(api_key, api_secret):
    """Return the DNS domains reported by the API, or ``[]`` if none come back."""
    payload = make_request(api_key, api_secret, "GET", "/dns-domain")
    # A failed request (None) or a payload without the key both mean "nothing".
    if not payload or "dns-domains" not in payload:
        return []
    return payload["dns-domains"]
def list_records(api_key, api_secret, domain_id):
    """Return the DNS records of the domain ``domain_id``, or ``[]`` if none come back."""
    endpoint = f"/dns-domain/{domain_id}/record"
    payload = make_request(api_key, api_secret, "GET", endpoint)
    # A failed request (None) or a payload without the key both mean "nothing".
    if not payload or "dns-domain-records" not in payload:
        return []
    return payload["dns-domain-records"]
def delete_record(api_key, api_secret, domain_id, record_id):
    """Delete one DNS record; return ``True`` unless the API request failed."""
    endpoint = f"/dns-domain/{domain_id}/record/{record_id}"
    response = make_request(api_key, api_secret, "DELETE", endpoint)
    # make_request signals failure with None; any other value counts as success.
    succeeded = response is not None
    return succeeded
def find_acme_challenge_records(api_key, api_secret, domain_filter=None):
    """Find all _acme-challenge TXT records across all domains.

    Args:
        api_key: Exoscale API key.
        api_secret: Exoscale API secret.
        domain_filter: When given, only the domain whose name equals this
            value is inspected.

    Returns:
        A list of dicts with the keys ``domain_id``, ``domain_name``,
        ``record_id``, ``record_name`` and ``record_content``. The two text
        fields ``record_name`` and ``record_content`` are always strings.
    """
    domains = list_domains(api_key, api_secret)
    if not domains:
        print("No DNS domains found.")
        return []

    acme_records = []
    for domain in domains:
        domain_id = domain.get("id")
        # Fall back to the id when the domain carries no display name.
        domain_name = domain.get("unicode-name", domain_id)
        if domain_filter and domain_name != domain_filter:
            continue
        print(f"Checking domain: {domain_name}")
        for record in list_records(api_key, api_secret, domain_id):
            # `or ""` (rather than a .get default) also covers a field that
            # is present but null, which would otherwise break the `in` test
            # below and the len()/slicing done by callers on the content.
            record_name = record.get("name") or ""
            record_type = record.get("type") or ""
            if record_type != "TXT" or "_acme-challenge" not in record_name:
                continue
            acme_records.append({
                "domain_id": domain_id,
                "domain_name": domain_name,
                "record_id": record.get("id"),
                "record_name": record_name,
                "record_content": record.get("content") or "",
            })
    return acme_records
def main():
    """Command-line entry point: list and optionally delete ACME TXT records.

    Parses ``--dry-run`` and ``--domain``, prints every matching record, asks
    for confirmation and then deletes the records one by one.

    Returns:
        ``0`` when nothing failed (including dry runs and aborted runs),
        ``1`` when at least one deletion failed.
    """
    parser = argparse.ArgumentParser(
        description="Remove all _acme-challenge TXT DNS entries from Exoscale DNS"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be deleted without actually deleting",
    )
    parser.add_argument(
        "--domain",
        type=str,
        default=None,
        help="Only process a specific domain",
    )
    args = parser.parse_args()

    api_key, api_secret = get_credentials()

    print("Searching for _acme-challenge TXT records...")
    print()
    acme_records = find_acme_challenge_records(api_key, api_secret, args.domain)
    if not acme_records:
        print("\nNo _acme-challenge TXT records found.")
        return 0

    print(f"\nFound {len(acme_records)} _acme-challenge TXT record(s):\n")
    for record in acme_records:
        content = record["record_content"]
        if len(content) > 50:
            # Keep the listing readable: show only the first 50 characters.
            content = f"{content[:50]}..."
        print(f" Domain: {record['domain_name']}")
        print(f" Record: {record['record_name']}")
        print(f" Content: {content}")
        print()

    if args.dry_run:
        print("Dry run mode - no records will be deleted.")
        return 0

    # Confirm deletion
    try:
        confirm = input(f"Delete {len(acme_records)} record(s)? [y/N]: ")
    except EOFError:
        # stdin is closed (non-interactive run): treat it as "no" rather
        # than crashing with a traceback.
        print()
        confirm = ""
    if confirm.lower() != "y":
        print("Aborted.")
        return 0

    # Delete records
    deleted_count = 0
    failed_count = 0
    for record in acme_records:
        # Flush so the progress text is visible while the request is running,
        # not only once the trailing "OK"/"FAILED" completes the line.
        print(
            f"Deleting {record['record_name']} from {record['domain_name']}...",
            end=" ",
            flush=True,
        )
        if delete_record(api_key, api_secret, record["domain_id"], record["record_id"]):
            print("OK")
            deleted_count += 1
        else:
            print("FAILED")
            failed_count += 1

    print()
    print(f"Deleted: {deleted_count}, Failed: {failed_count}")
    return 1 if failed_count else 0


if __name__ == "__main__":
    # Propagate main()'s status so failed deletions yield a non-zero exit code.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment