Skip to content

Instantly share code, notes, and snippets.

@EricsonWillians
Last active October 14, 2025 12:36
Show Gist options
  • Select an option

  • Save EricsonWillians/77a6e7568c8a72aff22ab44b23ec56fb to your computer and use it in GitHub Desktop.

Select an option

Save EricsonWillians/77a6e7568c8a72aff22ab44b23ec56fb to your computer and use it in GitHub Desktop.
This script provides comprehensive domain availability checking across multiple TLDs using robust DNS queries, enhanced WHOIS lookups, and advanced features like bulk checking, export capabilities, and detailed logging.
#!/usr/bin/env python3
"""
Professional Domain Availability Checker
This script provides comprehensive domain availability checking across multiple TLDs using
robust DNS queries, enhanced WHOIS lookups, and advanced features like bulk checking,
export capabilities, and detailed logging.
Features:
- Concurrent domain checking for optimal performance
- Comprehensive DNS record analysis with retry logic
- Enhanced WHOIS verification with intelligent parsing
- Multiple output formats (table, JSON, CSV)
- Detailed logging and error handling
- Bulk domain checking from file input
- Performance metrics and statistics
Author: Professional Domain Checker
Version: 4.0.0
License: MIT
Python: 3.8+
Dependencies:
- dnspython>=2.3.0
- typer>=0.9.0
- rich>=13.0.0
- python-whois>=0.8.0
- requests>=2.28.0 (fallback WHOIS)
- validators>=0.20.0
- python-dateutil>=2.8.0
Installation:
pip install dnspython typer rich python-whois requests validators python-dateutil
"""
import asyncio
import concurrent.futures
import csv
import json
import logging
import re
import socket
import ssl
import sys
import time
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Union, Any, NamedTuple
from dataclasses import dataclass, asdict, field
from enum import Enum
import threading
import typer
from rich.console import Console
from rich.table import Table
from rich.progress import (
Progress, BarColumn, TextColumn, TimeRemainingColumn,
SpinnerColumn, MofNCompleteColumn
)
from rich.logging import RichHandler
from rich.panel import Panel
from rich.text import Text
from rich import box
# Import with better error handling
try:
import dns.resolver
import dns.exception
DNS_AVAILABLE = True
except ImportError as e:
DNS_AVAILABLE = False
DNS_IMPORT_ERROR = str(e)
try:
import whois as python_whois
PYTHON_WHOIS_AVAILABLE = True
except ImportError:
PYTHON_WHOIS_AVAILABLE = False
try:
import requests
REQUESTS_AVAILABLE = True
except ImportError:
REQUESTS_AVAILABLE = False
try:
import validators
VALIDATORS_AVAILABLE = True
except ImportError:
VALIDATORS_AVAILABLE = False
try:
from dateutil import parser as date_parser
DATEUTIL_AVAILABLE = True
except ImportError:
DATEUTIL_AVAILABLE = False
# --- Application identity -------------------------------------------------
APP_NAME = "Professional Domain Checker"
APP_VERSION = "4.0.0"

# --- Execution limits -----------------------------------------------------
# DNS timeout in seconds: the default, and the ceiling DNSChecker clamps to.
DEFAULT_TIMEOUT = 8
MAX_TIMEOUT = 30
# Thread-pool size: the default, and the ceiling DomainChecker clamps to.
DEFAULT_WORKERS = 15
MAX_WORKERS = 50

# TLDs combined with the base name when only a name is given on the CLI.
DEFAULT_TLDS = [
    "com", "net", "org", "io", "co",
    "biz", "info", "us", "tech", "ai",
    "dev", "app", "xyz", "online", "store",
    "site", "me", "tv", "cc",
]

# --- WHOIS behaviour ------------------------------------------------------
# Fallback spacing (seconds) between WHOIS queries for TLDs that have no
# entry in TLD_RATE_LIMITS.
WHOIS_RATE_LIMIT = 1.0
# NOTE(review): the two retry settings below are declared but no use of
# them is visible in this file -- confirm before relying on them.
WHOIS_MAX_RETRIES = 3
WHOIS_RETRY_DELAY = 2.0
# Socket timeout (seconds) for direct port-43 WHOIS queries.
WHOIS_TIMEOUT = 10

# TLD -> WHOIS host used for direct port-43 queries.
WHOIS_SERVERS = {
    # Generic TLDs
    'com': 'whois.verisign-grs.com', 'net': 'whois.verisign-grs.com',
    'org': 'whois.pir.org', 'info': 'whois.afilias.net',
    'biz': 'whois.biz',
    # Country-code TLDs (same order as originally listed)
    'us': 'whois.nic.us', 'io': 'whois.nic.io', 'ai': 'whois.nic.ai',
    'co': 'whois.nic.co', 'me': 'whois.nic.me', 'tv': 'whois.nic.tv',
    'cc': 'whois.nic.cc', 'uk': 'whois.nic.uk', 'de': 'whois.denic.de',
    'fr': 'whois.afnic.fr', 'it': 'whois.nic.it', 'ca': 'whois.cira.ca',
    'au': 'whois.auda.org.au', 'jp': 'whois.jprs.jp', 'kr': 'whois.kr',
    'cn': 'whois.cnnic.cn', 'in': 'whois.registry.in',
    'br': 'whois.registro.br', 'mx': 'whois.mx', 'pl': 'whois.dns.pl',
    'ru': 'whois.tcinet.ru', 'nl': 'whois.domain-registry.nl',
    'be': 'whois.dns.be', 'ch': 'whois.nic.ch', 'se': 'whois.iis.se',
    'no': 'whois.norid.no', 'dk': 'whois.dk-hostmaster.dk',
    'fi': 'whois.fi', 'es': 'whois.nic.es', 'pt': 'whois.dns.pt',
}

# Per-TLD minimum spacing (seconds) between WHOIS queries.
TLD_RATE_LIMITS = {
    'com': 1.0,
    'net': 1.0,
    'org': 0.8,
    'io': 1.5,
    'ai': 2.0,
    'uk': 1.2,
    'de': 1.5,
    'cn': 2.5,
    'jp': 2.0,
    'kr': 1.8,
    'in': 1.5,
    'br': 1.3,
    'au': 1.4,
    'ca': 1.1,
    'mx': 1.6,
}
class DomainStatus(Enum):
    """Outcome of an availability check; each value is the display label."""

    # Outcomes assigned by the DNS / WHOIS checks in this file.
    AVAILABLE = "Available"
    REGISTERED = "Registered"

    # NOTE(review): no code visible in this file assigns these two.
    RESERVED = "Reserved"
    PREMIUM = "Premium"

    # The check failed, or its outcome could not be classified.
    ERROR = "Error"
    UNKNOWN = "Unknown"
@dataclass
class WhoisInfo:
    """WHOIS lookup result for a single domain.

    ``status`` is the only required field; every other field defaults to
    ``None`` (``name_servers`` to an empty list) when the lookup did not
    supply it.
    """

    status: DomainStatus
    registrar: Optional[str] = None
    creation_date: Optional[datetime] = None
    expiration_date: Optional[datetime] = None
    updated_date: Optional[datetime] = None
    name_servers: List[str] = field(default_factory=list)
    registrant_name: Optional[str] = None
    registrant_org: Optional[str] = None
    registrant_country: Optional[str] = None
    admin_email: Optional[str] = None
    tech_email: Optional[str] = None
    dnssec: Optional[str] = None
    whois_server: Optional[str] = None
    raw_response: Optional[str] = None
    response_source: str = "unknown"  # python-whois, direct, api

    def is_available(self) -> bool:
        """Return True when the WHOIS status is ``AVAILABLE``."""
        return self.status == DomainStatus.AVAILABLE

    def days_until_expiration(self) -> Optional[int]:
        """Return whole days until ``expiration_date``, never below 0.

        A naive expiration date is interpreted as UTC. Returns ``None``
        when no expiration date is known.
        """
        if not self.expiration_date:
            return None
        exp_date = self.expiration_date
        if exp_date.tzinfo is None:
            exp_date = exp_date.replace(tzinfo=timezone.utc)
        remaining = exp_date - datetime.now(timezone.utc)
        return max(0, remaining.days)

    def get_summary(self) -> str:
        """Return a one-line, ``" | "``-separated summary of this record.

        The first segment is the status label. For ``REGISTERED`` this is
        "Registered", as before; ERROR/UNKNOWN results are now labelled
        with their own status instead of being reported as "Registered".
        """
        if self.is_available():
            return "Available"

        parts = [self.status.value]
        if self.registrar:
            parts.append(f"via {self.registrar}")

        days_left = self.days_until_expiration()
        if days_left is not None:
            if days_left < 30:
                parts.append(f"⚠️ Expires in {days_left} days!")
            else:
                parts.append(f"Expires: {self.expiration_date.strftime('%Y-%m-%d')}")
        return " | ".join(parts)
@dataclass
class DomainResult:
    """Everything learned about one domain during a check."""

    domain: str
    status: DomainStatus
    dns_records: Dict[str, List[str]] = field(default_factory=dict)
    whois_info: Optional[WhoisInfo] = None
    response_time_ms: float = 0.0
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly dict; dates become ISO-8601 strings.

        The ``whois_info`` key is present only when WHOIS data is attached.
        """
        payload: Dict[str, Any] = {
            'domain': self.domain,
            'status': self.status.value,
            'dns_records': self.dns_records,
            'response_time_ms': self.response_time_ms,
            'timestamp': self.timestamp.isoformat(),
            'error_message': self.error_message
        }

        info = self.whois_info
        if not info:
            return payload

        def iso(moment):
            # Missing dates serialise as None rather than raising.
            return moment.isoformat() if moment else None

        payload['whois_info'] = {
            'status': info.status.value,
            'registrar': info.registrar,
            'creation_date': iso(info.creation_date),
            'expiration_date': iso(info.expiration_date),
            'updated_date': iso(info.updated_date),
            'name_servers': info.name_servers,
            'registrant_name': info.registrant_name,
            'registrant_org': info.registrant_org,
            'registrant_country': info.registrant_country,
            'admin_email': info.admin_email,
            'tech_email': info.tech_email,
            'dnssec': info.dnssec,
            'whois_server': info.whois_server,
            'response_source': info.response_source,
            'days_until_expiration': info.days_until_expiration()
        }
        return payload
class DomainValidator:
    """Stateless helpers for validating and normalising domain names."""

    @staticmethod
    def is_valid_domain(domain: str) -> bool:
        """Return True when ``domain`` is a syntactically valid domain name.

        Uses the ``validators`` package when it is installed; otherwise
        falls back to a label-by-label check (at least two labels, each
        1-63 characters, alphanumeric with interior hyphens).
        """
        if not domain or len(domain) > 253:
            return False
        if VALIDATORS_AVAILABLE:
            return validators.domain(domain) is True

        # Fallback validation
        domain = domain.rstrip('.')
        if not domain:
            return False
        parts = domain.split('.')
        if len(parts) < 2:
            return False
        return all(
            part
            and len(part) <= 63
            and re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?$', part)
            for part in parts
        )

    @staticmethod
    def normalize_domain(domain: str) -> str:
        """Reduce user input (bare name or URL) to a lowercase host name.

        Strips surrounding whitespace, any URL scheme, userinfo, port and
        path, and a trailing dot. Returns ``""`` for empty input.
        """
        if not domain:
            return ""
        domain = domain.strip()

        if '://' in domain:
            # Parse the URL as given. Prefixing another "http://" (as the
            # previous code did) makes the original scheme look like the
            # host, so "https://example.com" came back as "https:".
            parsed = urllib.parse.urlparse(domain)
            domain = parsed.hostname or parsed.netloc

        domain = domain.lower().strip()
        domain = domain.split('/')[0]  # Remove path
        return domain.rstrip('.')

    @staticmethod
    def extract_tld(domain: str) -> str:
        """Return the last dot-separated label, or ``""`` if there is no dot."""
        parts = domain.split('.')
        return parts[-1] if len(parts) > 1 else ""
class EnhancedWhoisChecker:
    """WHOIS lookups via python-whois with a direct port-43 fallback.

    Results are cached per domain for the lifetime of the instance, and
    queries are spaced out per TLD.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # tld -> time.time() value of the query slot most recently handed out
        self.rate_limiter = {}
        # domain -> WhoisInfo
        self.cache = {}
        self.lock = threading.Lock()

    def _apply_rate_limit(self, tld: str) -> None:
        """Block until the next WHOIS query for ``tld`` is allowed.

        The lock is held only while a time slot is reserved; the sleep
        happens outside it so that waiting on one TLD does not stall
        worker threads querying other TLDs.
        """
        rate_limit = TLD_RATE_LIMITS.get(tld, WHOIS_RATE_LIMIT)
        with self.lock:
            now = time.time()
            slot = max(now, self.rate_limiter.get(tld, 0) + rate_limit)
            self.rate_limiter[tld] = slot

        sleep_time = slot - now
        if sleep_time > 0:
            self.logger.debug(f"Rate limiting .{tld}: sleeping {sleep_time:.2f}s")
            time.sleep(sleep_time)

    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse a WHOIS date string; return ``None`` if it cannot be parsed.

        Uses ``dateutil`` when installed, then a fixed list of
        ``strptime`` formats.
        """
        if date_str is None:
            return None
        # Coerce first so a non-string value cannot break .lower() below.
        date_str = str(date_str).strip()
        if not date_str or date_str.lower() in ('none', 'null', 'n/a'):
            return None

        if DATEUTIL_AVAILABLE:
            try:
                return date_parser.parse(date_str)
            except (ValueError, TypeError, OverflowError):
                pass

        # Fallback date parsing
        date_patterns = (
            '%Y-%m-%d',
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%dT%H:%M:%S',
            '%Y-%m-%dT%H:%M:%SZ',
            '%d-%b-%Y',
            '%d.%m.%Y',
            '%m/%d/%Y',
            '%Y/%m/%d',
        )
        for pattern in date_patterns:
            try:
                return datetime.strptime(date_str, pattern)
            except ValueError:
                continue

        self.logger.debug(f"Could not parse date: {date_str}")
        return None

    def _coerce_date(self, value: Any) -> Optional[datetime]:
        """Turn a python-whois date value (datetime, str, or list) into a datetime."""
        if isinstance(value, list):
            value = value[0] if value else None
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            return self._parse_date(value)
        return None

    def _extract_emails(self, text: str) -> List[str]:
        """Return every email address found in ``text``, in order."""
        # The TLD class used to be "[A-Z|a-z]", which also accepted "|".
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        return re.findall(email_pattern, text)

    def _parse_python_whois_response(self, whois_data: Any, domain: str) -> "WhoisInfo":
        """Convert a python-whois result object into a ``WhoisInfo``.

        The domain counts as registered only when the response carries a
        ``domain_name`` that contains ``domain``; otherwise an
        ``AVAILABLE`` record is returned.
        """
        self.logger.debug(f"Parsing python-whois response for {domain}")

        # Extract domain name(s)
        domain_names = getattr(whois_data, 'domain_name', None)
        if domain_names:
            if isinstance(domain_names, list):
                domain_names = [str(d).lower() if d else '' for d in domain_names]
            else:
                domain_names = [str(domain_names).lower()]

        # Check if domain is registered
        wanted = domain.lower()
        is_registered = bool(domain_names and any(
            wanted in dn for dn in domain_names if dn
        ))
        if not is_registered:
            return WhoisInfo(
                status=DomainStatus.AVAILABLE,
                response_source="python-whois",
                raw_response=str(whois_data)[:1000]
            )

        # Extract registrar
        registrar = getattr(whois_data, 'registrar', None)
        if isinstance(registrar, list):
            registrar = registrar[0] if registrar else None

        # Extract dates. They are collected in a dict: assigning through
        # locals() does not update function locals, which previously left
        # all three dates as None.
        dates = {
            attr_name: self._coerce_date(getattr(whois_data, attr_name, None))
            for attr_name in ('creation_date', 'expiration_date', 'updated_date')
        }

        # Extract name servers; the attribute may be missing, None, a
        # single string, or a collection.
        name_servers = getattr(whois_data, 'name_servers', None) or []
        if not isinstance(name_servers, (list, tuple, set)):
            name_servers = [name_servers]
        name_servers = [str(ns).lower().strip() for ns in name_servers if ns]

        # Extract other fields
        registrant_name = getattr(whois_data, 'registrant_name', None)
        registrant_org = getattr(whois_data, 'org', None)
        registrant_country = getattr(whois_data, 'country', None)

        # Extract emails from raw data
        raw_text = str(whois_data)
        emails = self._extract_emails(raw_text)
        admin_email = None
        tech_email = None
        for email in emails:
            lowered = email.lower()
            if 'admin' in lowered:
                admin_email = email
            elif 'tech' in lowered:
                tech_email = email
        if not admin_email and emails:
            admin_email = emails[0]

        # DNSSEC
        dnssec = getattr(whois_data, 'dnssec', None)
        if isinstance(dnssec, list):
            dnssec = ', '.join(str(d) for d in dnssec if d)

        return WhoisInfo(
            status=DomainStatus.REGISTERED,
            registrar=str(registrar) if registrar else None,
            creation_date=dates['creation_date'],
            expiration_date=dates['expiration_date'],
            updated_date=dates['updated_date'],
            name_servers=name_servers,
            registrant_name=str(registrant_name) if registrant_name else None,
            registrant_org=str(registrant_org) if registrant_org else None,
            registrant_country=str(registrant_country) if registrant_country else None,
            admin_email=admin_email,
            tech_email=tech_email,
            dnssec=str(dnssec) if dnssec else None,
            response_source="python-whois",
            raw_response=raw_text[:1000]
        )

    def _direct_whois_query(self, domain: str) -> Optional[str]:
        """Query the TLD's WHOIS server on TCP port 43.

        Returns the decoded response text, or ``None`` when no server is
        known for the TLD or the network exchange fails.
        """
        tld = DomainValidator.extract_tld(domain)
        whois_server = WHOIS_SERVERS.get(tld)
        if not whois_server:
            self.logger.debug(f"No WHOIS server known for .{tld}")
            return None

        try:
            self.logger.debug(f"Direct WHOIS query to {whois_server} for {domain}")
            with socket.create_connection((whois_server, 43), timeout=WHOIS_TIMEOUT) as sock:
                # sendall() loops until the whole query is written;
                # send() may transmit only part of it.
                sock.sendall(f"{domain}\r\n".encode())
                chunks = []
                while True:
                    data = sock.recv(4096)
                    if not data:
                        break
                    chunks.append(data)
            return b"".join(chunks).decode('utf-8', errors='ignore')
        except (OSError, UnicodeError) as e:
            self.logger.debug(f"Direct WHOIS query failed for {domain}: {e}")
            return None

    def _parse_direct_whois_response(self, response: str, domain: str) -> "WhoisInfo":
        """Classify and parse raw WHOIS text from a direct query.

        Returns ``ERROR`` for an empty response, ``AVAILABLE`` when an
        availability phrase is present, ``UNKNOWN`` for very short
        responses, and otherwise ``REGISTERED`` with whichever fields
        could be extracted.
        """
        if not response:
            return WhoisInfo(status=DomainStatus.ERROR, response_source="direct")

        response_lower = response.lower()

        # Check for availability indicators.
        # NOTE(review): the bare 'available' pattern matches anywhere in the
        # text, so any response merely containing that word is classified
        # as available -- consider anchoring it.
        availability_patterns = [
            r'no match', r'not found', r'no entries found',
            r'available', r'no data found', r'no object found',
            r'not registered', r'no information available',
            r'domain status:\s*available', r'status:\s*free'
        ]
        if any(re.search(pattern, response_lower) for pattern in availability_patterns):
            return WhoisInfo(
                status=DomainStatus.AVAILABLE,
                response_source="direct",
                raw_response=response[:1000]
            )

        # Too little text to interpret either way.
        if len(response.strip()) < 50:
            return WhoisInfo(
                status=DomainStatus.UNKNOWN,
                response_source="direct",
                raw_response=response
            )

        # Extract registrar (first pattern that matches wins)
        registrar = None
        registrar_patterns = [
            r'registrar:\s*(.+)',
            r'registrar name:\s*(.+)',
            r'sponsoring registrar:\s*(.+)'
        ]
        for pattern in registrar_patterns:
            match = re.search(pattern, response, re.IGNORECASE)
            if match:
                registrar = match.group(1).strip()
                break

        # Extract dates
        date_patterns = {
            'creation_date': [
                r'creation date:\s*(.+)',
                r'created:\s*(.+)',
                r'registered:\s*(.+)',
                r'domain_dateregistered:\s*(.+)'
            ],
            'expiration_date': [
                r'expir(?:y|ation) date:\s*(.+)',
                r'expires:\s*(.+)',
                r'expiry:\s*(.+)',
                r'domain_datebilleduntil:\s*(.+)'
            ],
            'updated_date': [
                r'updated date:\s*(.+)',
                r'last updated:\s*(.+)',
                r'modified:\s*(.+)',
                r'domain_datelastmodified:\s*(.+)'
            ]
        }
        # Results go into a dict: assigning through locals() does not update
        # function locals, which previously left every date as None.
        dates = {}
        for date_field, patterns in date_patterns.items():
            dates[date_field] = None
            for pattern in patterns:
                match = re.search(pattern, response, re.IGNORECASE)
                if not match:
                    continue
                parsed_date = self._parse_date(match.group(1).strip())
                if parsed_date:
                    dates[date_field] = parsed_date
                    break

        # Extract name servers (de-duplicated, first-seen order)
        name_servers = []
        ns_patterns = [
            r'name server:\s*(.+)',
            r'nameserver:\s*(.+)',
            r'nserver:\s*(.+)',
            r'dns:\s*(.+)'
        ]
        for pattern in ns_patterns:
            for found in re.findall(pattern, response, re.IGNORECASE):
                ns = found.strip().lower()
                if ns and ns not in name_servers:
                    name_servers.append(ns)

        # Extract emails, classifying each by the text around its first
        # occurrence (50 characters on either side of its start).
        admin_email = None
        tech_email = None
        for email in self._extract_emails(response):
            position = response.find(email)
            context = response[max(0, position - 50):position + 50].lower()
            if 'admin' in context:
                admin_email = email
            elif 'tech' in context:
                tech_email = email

        return WhoisInfo(
            status=DomainStatus.REGISTERED,
            registrar=registrar,
            creation_date=dates['creation_date'],
            expiration_date=dates['expiration_date'],
            updated_date=dates['updated_date'],
            name_servers=name_servers,
            admin_email=admin_email,
            tech_email=tech_email,
            response_source="direct",
            raw_response=response[:1000]
        )

    def check_whois(self, domain: str) -> "WhoisInfo":
        """Look up ``domain``: python-whois first, then a direct query.

        The first method that yields a status other than ``UNKNOWN`` wins.
        If neither does, an ``ERROR`` record carrying the last error text
        is returned. Every returned record is cached by domain.
        """
        # Check cache first
        if domain in self.cache:
            self.logger.debug(f"Cache hit for {domain}")
            return self.cache[domain]

        tld = DomainValidator.extract_tld(domain)
        self._apply_rate_limit(tld)

        whois_info = None
        last_error = None

        # Method 1: Try python-whois
        if PYTHON_WHOIS_AVAILABLE:
            try:
                self.logger.debug(f"Trying python-whois for {domain}")
                whois_data = python_whois.whois(domain)
                whois_info = self._parse_python_whois_response(whois_data, domain)
                # Validate the result
                if whois_info.status != DomainStatus.UNKNOWN:
                    self.cache[domain] = whois_info
                    return whois_info
            except Exception as e:
                last_error = e
                error_msg = str(e).lower()
                # The error text itself may indicate that no record exists.
                if any(indicator in error_msg for indicator in [
                    'no match', 'not found', 'no entries',
                    'available', 'no data', 'no object'
                ]):
                    whois_info = WhoisInfo(
                        status=DomainStatus.AVAILABLE,
                        response_source="python-whois-error",
                        raw_response=error_msg
                    )
                    self.cache[domain] = whois_info
                    return whois_info
                self.logger.debug(f"python-whois failed for {domain}: {e}")

        # Method 2: Try direct WHOIS query
        try:
            self.logger.debug(f"Trying direct WHOIS for {domain}")
            response = self._direct_whois_query(domain)
            if response:
                whois_info = self._parse_direct_whois_response(response, domain)
                if whois_info.status != DomainStatus.UNKNOWN:
                    self.cache[domain] = whois_info
                    return whois_info
        except Exception as e:
            last_error = e
            self.logger.debug(f"Direct WHOIS failed for {domain}: {e}")

        # If all methods failed, return error status
        error_msg = str(last_error) if last_error else "All WHOIS methods failed"
        whois_info = WhoisInfo(
            status=DomainStatus.ERROR,
            response_source="error",
            raw_response=error_msg[:1000]
        )
        self.cache[domain] = whois_info
        return whois_info
class DNSChecker:
    """Resolves a fixed set of DNS record types through public resolvers."""

    def __init__(self, timeout: int = DEFAULT_TIMEOUT):
        # Requested timeout is capped at MAX_TIMEOUT.
        self.timeout = min(timeout, MAX_TIMEOUT)
        self.resolver = self._create_resolver()
        self.logger = logging.getLogger(__name__)

    def _create_resolver(self) -> 'dns.resolver.Resolver':
        """Build a resolver bound to three fixed nameserver addresses.

        ``self.timeout`` is used both as the resolver's ``timeout`` and as
        its ``lifetime``.
        """
        configured = dns.resolver.Resolver()
        configured.timeout = self.timeout
        configured.lifetime = self.timeout
        configured.nameservers = ['8.8.8.8', '1.1.1.1', '208.67.222.222']
        return configured

    def check_dns_records(self, domain: str) -> Dict[str, List[str]]:
        """Query each record type for ``domain`` and collect the answers.

        Returns a mapping of record type to answer strings. An empty list
        means no answer; ``["Timeout"]`` or ``["Error: ..."]`` mark a
        failed lookup. NXDOMAIN stops the scan, so later record types are
        absent from the mapping.
        """
        found = {}
        for rtype in ("A", "AAAA", "MX", "NS", "CNAME", "TXT", "SOA"):
            try:
                found[rtype] = list(map(str, self.resolver.resolve(domain, rtype)))
            except dns.resolver.NXDOMAIN:
                # The name does not exist, so the other types are skipped.
                found[rtype] = []
                break
            except dns.resolver.NoAnswer:
                found[rtype] = []
            except dns.exception.Timeout:
                found[rtype] = ["Timeout"]
            except Exception as exc:
                found[rtype] = [f"Error: {str(exc)[:50]}"]
        return found

    def is_domain_active(self, dns_records: Dict[str, List[str]]) -> bool:
        """Return True if any of A/AAAA/MX/NS holds only real records."""
        failure_markers = ("Error:", "Timeout")
        candidates = (dns_records.get(rtype, []) for rtype in ("A", "AAAA", "MX", "NS"))
        return any(
            entries and not any(entry.startswith(failure_markers) for entry in entries)
            for entries in candidates
        )
class DomainChecker:
    """Coordinates DNS and optional WHOIS checks for one or many domains."""

    def __init__(self, timeout: int = DEFAULT_TIMEOUT, max_workers: int = DEFAULT_WORKERS):
        self.timeout = timeout
        # Requested pool size is capped at MAX_WORKERS.
        self.max_workers = min(max_workers, MAX_WORKERS)
        self.dns_checker = DNSChecker(timeout)
        self.whois_checker = EnhancedWhoisChecker()
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _dns_lookup_failed(dns_records: Dict[str, List[str]]) -> bool:
        """Return True when every attempted lookup timed out or errored."""
        return bool(dns_records) and all(
            records and all(r.startswith(("Error:", "Timeout")) for r in records)
            for records in dns_records.values()
        )

    def check_domain(self, domain: str, check_whois: bool = False) -> DomainResult:
        """Check a single domain and return its ``DomainResult``.

        The DNS-derived status is REGISTERED when usable records exist,
        UNKNOWN when every lookup failed (so nothing was learned), and
        AVAILABLE otherwise. When ``check_whois`` is set and WHOIS returns
        AVAILABLE or REGISTERED, that status replaces the DNS-derived one.
        Any exception is converted into an ERROR result.
        """
        start_time = time.time()
        try:
            # Normalize domain
            domain = DomainValidator.normalize_domain(domain)
            if not DomainValidator.is_valid_domain(domain):
                return DomainResult(
                    domain=domain,
                    status=DomainStatus.ERROR,
                    error_message="Invalid domain format",
                    response_time_ms=(time.time() - start_time) * 1000
                )

            # Check DNS
            dns_records = self.dns_checker.check_dns_records(domain)

            # Determine initial status
            if self.dns_checker.is_domain_active(dns_records):
                status = DomainStatus.REGISTERED
            elif self._dns_lookup_failed(dns_records):
                # A resolver outage must not be reported as "Available".
                status = DomainStatus.UNKNOWN
            else:
                status = DomainStatus.AVAILABLE

            # Check WHOIS if requested
            whois_info = None
            if check_whois:
                whois_info = self.whois_checker.check_whois(domain)
                # WHOIS is more authoritative
                if whois_info.status in (DomainStatus.AVAILABLE, DomainStatus.REGISTERED):
                    status = whois_info.status

            return DomainResult(
                domain=domain,
                status=status,
                dns_records=dns_records,
                whois_info=whois_info,
                response_time_ms=(time.time() - start_time) * 1000
            )
        except Exception as e:
            self.logger.error(f"Error checking {domain}: {e}")
            return DomainResult(
                domain=domain,
                status=DomainStatus.ERROR,
                error_message=str(e),
                response_time_ms=(time.time() - start_time) * 1000
            )

    def check_multiple_domains(
        self,
        domains: List[str],
        check_whois: bool = False,
        progress_callback: Optional[callable] = None
    ) -> List[DomainResult]:
        """Check ``domains`` concurrently; results are in completion order.

        ``progress_callback``, when given, is called once per finished
        domain with that domain's ``DomainResult``.
        """
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_domain = {
                executor.submit(self.check_domain, domain, check_whois): domain
                for domain in domains
            }
            for future in concurrent.futures.as_completed(future_to_domain):
                # Only the worker's outcome is guarded. Keeping the callback
                # outside the try means a failing callback can no longer
                # cause a second (error) result to be appended for the
                # same domain.
                try:
                    result = future.result()
                except Exception as e:
                    result = DomainResult(
                        domain=future_to_domain[future],
                        status=DomainStatus.ERROR,
                        error_message=str(e)
                    )
                results.append(result)
                if progress_callback:
                    progress_callback(result)
        return results
class OutputFormatter:
    """Renders ``DomainResult`` lists as a Rich table, JSON text, or CSV text."""

    # Prefixes of the placeholder entries DNSChecker stores for failed lookups.
    _DNS_FAILURE_PREFIXES = ("Error:", "Timeout")

    @staticmethod
    def _record_counts(dns_records: Dict[str, List[str]]) -> List[Tuple[str, int]]:
        """Return ``(record_type, count)`` for types holding only real records.

        Shared by the table and CSV renderers so that timeout/error
        placeholders are never counted as DNS records.
        """
        return [
            (record_type, len(records))
            for record_type, records in dns_records.items()
            if records and not any(
                r.startswith(OutputFormatter._DNS_FAILURE_PREFIXES) for r in records
            )
        ]

    @staticmethod
    def create_table(results: List[DomainResult], console: Console, verbose: bool = False) -> Table:
        """Build the results table, sorted by domain name.

        ``console`` is accepted for interface compatibility and is not
        used. ``verbose`` adds "Registrar" and "Expires" columns.
        """
        table = Table(
            title=f"Domain Availability Check - {len(results)} domains",
            box=box.ROUNDED,
            show_lines=True
        )
        table.add_column("Domain", style="cyan", min_width=20)
        table.add_column("Status", style="bold", min_width=12)
        table.add_column("DNS Records", style="green", min_width=15)
        table.add_column("WHOIS Info", style="magenta", min_width=25)
        table.add_column("Response Time", style="yellow", min_width=12)
        if verbose:
            table.add_column("Registrar", style="blue", min_width=15)
            table.add_column("Expires", style="red", min_width=12)

        for result in sorted(results, key=lambda x: x.domain):
            # Status with colors
            if result.status == DomainStatus.AVAILABLE:
                status_display = "[green]✓ Available[/green]"
            elif result.status == DomainStatus.REGISTERED:
                status_display = "[red]✗ Registered[/red]"
            else:
                status_display = f"[yellow]⚠ {result.status.value}[/yellow]"

            # DNS summary
            dns_summary = [
                f"{record_type}({count})"
                for record_type, count in OutputFormatter._record_counts(result.dns_records)
            ]
            dns_display = ", ".join(dns_summary) if dns_summary else "None"

            # WHOIS summary, truncated to 40 characters
            if result.whois_info:
                whois_display = result.whois_info.get_summary()
            else:
                whois_display = "Not checked"
            if len(whois_display) > 40:
                whois_display = whois_display[:37] + "..."

            row_data = [
                result.domain,
                status_display,
                dns_display,
                whois_display,
                f"{result.response_time_ms:.1f}ms"
            ]

            if verbose and result.whois_info:
                # Registrar, truncated to 20 characters
                registrar = result.whois_info.registrar or "N/A"
                if len(registrar) > 20:
                    registrar = registrar[:17] + "..."

                # Expiration, colour-coded by urgency
                exp_display = "N/A"
                if result.whois_info.expiration_date:
                    days_left = result.whois_info.days_until_expiration()
                    if days_left is not None:
                        if days_left < 30:
                            exp_display = f"[red]{days_left}d[/red]"
                        elif days_left < 90:
                            exp_display = f"[yellow]{days_left}d[/yellow]"
                        else:
                            exp_display = f"{days_left}d"
                row_data.extend([registrar, exp_display])
            elif verbose:
                row_data.extend(["N/A", "N/A"])

            table.add_row(*row_data)
        return table

    @staticmethod
    def to_json(results: List[DomainResult]) -> str:
        """Return the results plus summary counts as indented JSON text."""
        data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "total_domains": len(results),
            "available_count": sum(1 for r in results if r.status == DomainStatus.AVAILABLE),
            "registered_count": sum(1 for r in results if r.status == DomainStatus.REGISTERED),
            "error_count": sum(1 for r in results if r.status == DomainStatus.ERROR),
            "results": [result.to_dict() for result in results]
        }
        return json.dumps(data, indent=2, default=str)

    @staticmethod
    def to_csv(results: List[DomainResult]) -> str:
        """Return the results as CSV text: a header row, then one row per domain."""
        import io

        output = io.StringIO()
        writer = csv.writer(output)

        # Header
        writer.writerow([
            "Domain", "Status", "DNS_Records", "WHOIS_Summary",
            "Registrar", "Creation_Date", "Expiration_Date",
            "Response_Time_ms", "Error_Message"
        ])

        # Data
        for result in results:
            # Failed lookups are excluded, matching the table output;
            # previously a "Timeout" placeholder was counted as one record.
            dns_summary = "; ".join(
                f"{record_type}:{count}"
                for record_type, count in OutputFormatter._record_counts(result.dns_records)
            )

            whois_summary = ""
            registrar = ""
            creation_date = ""
            expiration_date = ""
            info = result.whois_info
            if info:
                whois_summary = info.get_summary()
                registrar = info.registrar or ""
                creation_date = info.creation_date.isoformat() if info.creation_date else ""
                expiration_date = info.expiration_date.isoformat() if info.expiration_date else ""

            writer.writerow([
                result.domain,
                result.status.value,
                dns_summary,
                whois_summary,
                registrar,
                creation_date,
                expiration_date,
                f"{result.response_time_ms:.1f}",
                result.error_message or ""
            ])
        return output.getvalue()
def setup_logging(level: str) -> None:
    """Configure root logging to a Rich console handler and a log file.

    ``level`` is matched case-insensitively against the ``logging``
    module's attributes; an unrecognised name falls back to ``INFO``.
    Log records are also written to ``domain_checker.log``.
    """
    resolved_level = getattr(logging, level.upper(), logging.INFO)
    console_handler = RichHandler(rich_tracebacks=True)
    file_handler = logging.FileHandler("domain_checker.log")
    logging.basicConfig(
        level=resolved_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[console_handler, file_handler],
    )
def load_domains_from_file(file_path: Path) -> List[str]:
    """Read domains from a text file and return them normalised.

    Blank lines and lines starting with ``#`` are skipped. A line may
    hold one domain or several separated by commas.

    Raises:
        typer.BadParameter: if reading or normalising fails for any reason.
    """
    collected = []
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            for raw_line in handle:
                entry = raw_line.strip()
                if not entry or entry.startswith('#'):
                    continue
                # Splitting a comma-free line yields the line itself, so one
                # code path serves both the single and multi-domain forms.
                collected.extend(piece.strip() for piece in entry.split(','))
        return [DomainValidator.normalize_domain(item) for item in collected if item]
    except Exception as e:
        raise typer.BadParameter(f"Error reading file {file_path}: {e}")
# Command-line application object; shell-completion options are disabled.
app = typer.Typer(
    add_completion=False,
    name=APP_NAME,
    help="Professional domain availability checker with enhanced WHOIS support",
)

# Shared Rich console used for all terminal output below.
console = Console()
def version_callback(value: bool):
    """Print the application name and version, then exit, when ``value`` is truthy."""
    if not value:
        return
    console.print(f"[bold cyan]{APP_NAME}[/bold cyan] version [green]{APP_VERSION}[/green]")
    raise typer.Exit()
@app.command()
def main(
    name: Optional[str] = typer.Argument(None, help="Base name for domain checking"),
    tlds: str = typer.Option(",".join(DEFAULT_TLDS), help="Comma-separated TLDs"),
    domains: Optional[str] = typer.Option(None, help="Specific domains to check"),
    bulk_file: Optional[Path] = typer.Option(None, help="File with domains to check"),
    whois: bool = typer.Option(False, help="Enable WHOIS verification"),
    verbose: bool = typer.Option(False, "-v", "--verbose", help="Verbose output"),
    timeout: int = typer.Option(DEFAULT_TIMEOUT, help="Timeout in seconds"),
    workers: int = typer.Option(DEFAULT_WORKERS, help="Number of workers"),
    output_format: str = typer.Option("table", help="Output format: table, json, csv"),
    export_file: Optional[Path] = typer.Option(None, help="Export to file"),
    log_level: str = typer.Option("WARNING", help="Log level"),
    version: Optional[bool] = typer.Option(None, "--version", callback=version_callback, help="Show version")
):
    """Professional domain availability checker."""
    # Flow: validate the environment and options, build the domain list
    # (bulk file > --domains > NAME x TLDs), run the checks with a progress
    # bar, print results and statistics, then optionally export to a file.

    # Validate dependencies
    if not DNS_AVAILABLE:
        console.print(Panel(
            f"[red]Error:[/red] dnspython is required.\n"
            f"Install with: [cyan]pip install dnspython[/cyan]",
            title="Missing Dependency",
            border_style="red"
        ))
        raise typer.Exit(1)

    if whois and not PYTHON_WHOIS_AVAILABLE:
        console.print(Panel(
            "[yellow]Warning:[/yellow] python-whois not installed. WHOIS checks disabled.\n"
            "Install with: [cyan]pip install python-whois[/cyan]",
            title="Optional Dependency",
            border_style="yellow"
        ))
        whois = False

    # Setup logging
    setup_logging(log_level)

    # Validate output format
    if output_format not in ["table", "json", "csv"]:
        console.print(f"[red]Error:[/red] Invalid output format: {output_format}")
        raise typer.Exit(1)

    # Reject values that would make the thread pool raise or make every
    # DNS query time out immediately.
    if workers < 1:
        console.print("[red]Error:[/red] workers must be at least 1")
        raise typer.Exit(1)
    if timeout < 1:
        console.print("[red]Error:[/red] timeout must be at least 1 second")
        raise typer.Exit(1)

    # Determine domains to check
    domains_to_check = []
    if bulk_file:
        if not bulk_file.exists():
            console.print(f"[red]Error:[/red] File not found: {bulk_file}")
            raise typer.Exit(1)
        domains_to_check = load_domains_from_file(bulk_file)
    elif domains:
        domains_to_check = [
            DomainValidator.normalize_domain(d.strip())
            for d in domains.split(',') if d.strip()
        ]
    elif name:
        tld_list = [tld.strip().lower() for tld in tlds.split(',') if tld.strip()]
        base_name = DomainValidator.normalize_domain(name)
        domains_to_check = [f"{base_name}.{tld}" for tld in tld_list]
    else:
        console.print("[red]Error:[/red] Provide name, domains, or bulk-file")
        raise typer.Exit(1)

    # Validate domains
    valid_domains = [d for d in domains_to_check if DomainValidator.is_valid_domain(d)]
    if not valid_domains:
        console.print("[red]Error:[/red] No valid domains to check")
        raise typer.Exit(1)

    # Remove duplicates (first occurrence wins, order preserved)
    unique_domains = list(dict.fromkeys(valid_domains))

    # Initialize checker before the banner so the banner can report the
    # effective (clamped) limits rather than the raw option values.
    checker = DomainChecker(timeout=timeout, max_workers=workers)

    # Show info
    console.print(Panel(
        f"[bold]Checking {len(unique_domains)} domains[/bold]\n"
        f"WHOIS: [cyan]{'Enabled' if whois else 'Disabled'}[/cyan]\n"
        f"Timeout: [cyan]{checker.dns_checker.timeout}s[/cyan]\n"
        f"Workers: [cyan]{checker.max_workers}[/cyan]",
        title=f"{APP_NAME} v{APP_VERSION}",
        border_style="blue"
    ))

    # Check domains with progress
    start_time = time.time()
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TextColumn("•"),
        TimeRemainingColumn(),
        console=console
    ) as progress:
        task = progress.add_task("Checking domains...", total=len(unique_domains))

        def progress_callback(result: DomainResult):
            progress.advance(task)

        results = checker.check_multiple_domains(
            unique_domains,
            check_whois=whois,
            progress_callback=progress_callback
        )
    total_time = time.time() - start_time

    # Sort results
    results.sort(key=lambda x: x.domain)

    # Display results
    formatter = OutputFormatter()
    if output_format == "table":
        table = formatter.create_table(results, console, verbose)
        console.print(table)
    elif output_format == "json":
        # Machine-readable text is printed verbatim: no Rich markup parsing,
        # syntax highlighting, or wrapping at the terminal width, any of
        # which can corrupt the payload.
        console.print(formatter.to_json(results), markup=False, highlight=False, soft_wrap=True)
    elif output_format == "csv":
        console.print(formatter.to_csv(results), markup=False, highlight=False, soft_wrap=True)

    # Show statistics
    available_count = sum(1 for r in results if r.status == DomainStatus.AVAILABLE)
    registered_count = sum(1 for r in results if r.status == DomainStatus.REGISTERED)
    error_count = sum(1 for r in results if r.status == DomainStatus.ERROR)

    stats_table = Table(title="Statistics", box=box.ROUNDED)
    stats_table.add_column("Metric", style="cyan")
    stats_table.add_column("Value", style="green", justify="right")
    stats_table.add_row("Total domains", str(len(results)))
    stats_table.add_row("Available", str(available_count))
    stats_table.add_row("Registered", str(registered_count))
    stats_table.add_row("Errors", str(error_count))
    stats_table.add_row("Total time", f"{total_time:.2f}s")
    # results is non-empty here: unique_domains was checked above.
    stats_table.add_row("Avg response time", f"{sum(r.response_time_ms for r in results)/len(results):.1f}ms")
    console.print(stats_table)

    # Export to file; table output is exported as JSON.
    if export_file:
        file_format = output_format if output_format != "table" else "json"
        try:
            if file_format == "json":
                content = formatter.to_json(results)
            else:
                content = formatter.to_csv(results)
            with open(export_file, 'w', encoding='utf-8') as f:
                f.write(content)
            console.print(f"[green]Results exported to:[/green] {export_file}")
        except Exception as e:
            console.print(f"[red]Export failed:[/red] {e}")

    # Show available domains summary (first ten, then a count of the rest)
    available_domains = [r for r in results if r.status == DomainStatus.AVAILABLE]
    if available_domains:
        console.print(Panel(
            "\n".join([f"• {r.domain}" for r in available_domains[:10]]) +
            (f"\n... and {len(available_domains) - 10} more" if len(available_domains) > 10 else ""),
            title=f"[green]Available Domains ({len(available_domains)})[/green]",
            border_style="green"
        ))
# Run the Typer CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    app()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment