Last active
October 14, 2025 12:36
-
-
Save EricsonWillians/77a6e7568c8a72aff22ab44b23ec56fb to your computer and use it in GitHub Desktop.
This script provides comprehensive domain availability checking across multiple TLDs using robust DNS queries, enhanced WHOIS lookups, and advanced features like bulk checking, export capabilities, and detailed logging.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Professional Domain Availability Checker | |
| This script provides comprehensive domain availability checking across multiple TLDs using | |
| robust DNS queries, enhanced WHOIS lookups, and advanced features like bulk checking, | |
| export capabilities, and detailed logging. | |
| Features: | |
| - Concurrent domain checking for optimal performance | |
| - Comprehensive DNS record analysis with retry logic | |
| - Enhanced WHOIS verification with intelligent parsing | |
| - Multiple output formats (table, JSON, CSV) | |
| - Detailed logging and error handling | |
| - Bulk domain checking from file input | |
| - Performance metrics and statistics | |
| Author: Professional Domain Checker | |
| Version: 4.0.0 | |
| License: MIT | |
| Python: 3.8+ | |
| Dependencies: | |
| - dnspython>=2.3.0 | |
| - typer>=0.9.0 | |
| - rich>=13.0.0 | |
| - python-whois>=0.8.0 | |
| - requests>=2.28.0 (fallback WHOIS) | |
| - validators>=0.20.0 | |
| - python-dateutil>=2.8.0 | |
| Installation: | |
| pip install dnspython typer rich python-whois requests validators python-dateutil | |
| """ | |
| import asyncio | |
| import concurrent.futures | |
| import csv | |
| import json | |
| import logging | |
| import re | |
| import socket | |
| import ssl | |
| import sys | |
| import time | |
| import urllib.parse | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Set, Tuple, Union, Any, NamedTuple | |
| from dataclasses import dataclass, asdict, field | |
| from enum import Enum | |
| import threading | |
| import typer | |
| from rich.console import Console | |
| from rich.table import Table | |
| from rich.progress import ( | |
| Progress, BarColumn, TextColumn, TimeRemainingColumn, | |
| SpinnerColumn, MofNCompleteColumn | |
| ) | |
| from rich.logging import RichHandler | |
| from rich.panel import Panel | |
| from rich.text import Text | |
| from rich import box | |
# Import with better error handling.
# Each optional third-party dependency is probed here and a module-level
# *_AVAILABLE flag is set, so the rest of the program can degrade
# gracefully (fallback code paths) instead of crashing at import time.
try:
    import dns.resolver
    import dns.exception
    DNS_AVAILABLE = True
except ImportError as e:
    DNS_AVAILABLE = False
    # NOTE(review): DNS_IMPORT_ERROR exists only when the import failed;
    # callers must check DNS_AVAILABLE before referencing it.
    DNS_IMPORT_ERROR = str(e)
try:
    import whois as python_whois
    PYTHON_WHOIS_AVAILABLE = True
except ImportError:
    PYTHON_WHOIS_AVAILABLE = False
try:
    import requests
    REQUESTS_AVAILABLE = True
except ImportError:
    REQUESTS_AVAILABLE = False
try:
    import validators
    VALIDATORS_AVAILABLE = True
except ImportError:
    VALIDATORS_AVAILABLE = False
try:
    from dateutil import parser as date_parser
    DATEUTIL_AVAILABLE = True
except ImportError:
    DATEUTIL_AVAILABLE = False
# Application constants
APP_NAME = "Professional Domain Checker"
APP_VERSION = "4.0.0"
DEFAULT_TIMEOUT = 8    # seconds; per DNS/WHOIS operation
MAX_TIMEOUT = 30       # seconds; hard cap on user-supplied timeouts
DEFAULT_WORKERS = 15   # concurrent checker threads
MAX_WORKERS = 50       # hard cap on worker threads
# TLDs probed when the user supplies a bare name instead of full domains.
DEFAULT_TLDS = [
    "com", "net", "org", "io", "co", "biz", "info", "us", "tech", "ai",
    "dev", "app", "xyz", "online", "store", "site", "me", "tv", "cc"
]
# Enhanced WHOIS configuration
WHOIS_RATE_LIMIT = 1.0   # seconds between queries for TLDs not in TLD_RATE_LIMITS
WHOIS_MAX_RETRIES = 3    # retry budget for WHOIS lookups
WHOIS_RETRY_DELAY = 2.0  # seconds between retries
WHOIS_TIMEOUT = 10       # seconds; socket timeout for direct port-43 queries
# WHOIS servers mapping for direct queries (RFC 3912 port-43 host per TLD).
WHOIS_SERVERS = {
    'com': 'whois.verisign-grs.com',
    'net': 'whois.verisign-grs.com',
    'org': 'whois.pir.org',
    'info': 'whois.afilias.net',
    'biz': 'whois.biz',
    'us': 'whois.nic.us',
    'io': 'whois.nic.io',
    'ai': 'whois.nic.ai',
    'co': 'whois.nic.co',
    'me': 'whois.nic.me',
    'tv': 'whois.nic.tv',
    'cc': 'whois.nic.cc',
    'uk': 'whois.nic.uk',
    'de': 'whois.denic.de',
    'fr': 'whois.afnic.fr',
    'it': 'whois.nic.it',
    'ca': 'whois.cira.ca',
    'au': 'whois.auda.org.au',
    'jp': 'whois.jprs.jp',
    'kr': 'whois.kr',
    'cn': 'whois.cnnic.cn',
    'in': 'whois.registry.in',
    'br': 'whois.registro.br',
    'mx': 'whois.mx',
    'pl': 'whois.dns.pl',
    'ru': 'whois.tcinet.ru',
    'nl': 'whois.domain-registry.nl',
    'be': 'whois.dns.be',
    'ch': 'whois.nic.ch',
    'se': 'whois.iis.se',
    'no': 'whois.norid.no',
    'dk': 'whois.dk-hostmaster.dk',
    'fi': 'whois.fi',
    'es': 'whois.nic.es',
    'pt': 'whois.dns.pt'
}
# TLD-specific rate limits (minimum seconds between queries per TLD);
# TLDs not listed here fall back to WHOIS_RATE_LIMIT.
TLD_RATE_LIMITS = {
    'com': 1.0, 'net': 1.0, 'org': 0.8, 'io': 1.5, 'ai': 2.0,
    'uk': 1.2, 'de': 1.5, 'cn': 2.5, 'jp': 2.0, 'kr': 1.8,
    'in': 1.5, 'br': 1.3, 'au': 1.4, 'ca': 1.1, 'mx': 1.6
}
class DomainStatus(Enum):
    """Enumeration for domain status.

    The values are the human-readable strings used directly in table,
    JSON and CSV output.
    """
    AVAILABLE = "Available"    # no registration detected
    REGISTERED = "Registered"  # active registration found
    RESERVED = "Reserved"      # withheld by the registry
    PREMIUM = "Premium"        # registry premium-priced name
    ERROR = "Error"            # the check itself failed
    UNKNOWN = "Unknown"        # status could not be determined
@dataclass
class WhoisInfo:
    """Enhanced WHOIS information structure.

    Holds the parsed fields of a WHOIS lookup plus metadata about which
    method produced the answer (``response_source``).
    """
    status: DomainStatus                        # availability verdict
    registrar: Optional[str] = None
    creation_date: Optional[datetime] = None
    expiration_date: Optional[datetime] = None
    updated_date: Optional[datetime] = None
    name_servers: List[str] = field(default_factory=list)
    registrant_name: Optional[str] = None
    registrant_org: Optional[str] = None
    registrant_country: Optional[str] = None
    admin_email: Optional[str] = None
    tech_email: Optional[str] = None
    dnssec: Optional[str] = None
    whois_server: Optional[str] = None
    raw_response: Optional[str] = None
    response_source: str = "unknown"  # python-whois, direct, api

    def is_available(self) -> bool:
        """Check if domain is available."""
        return self.status == DomainStatus.AVAILABLE

    def days_until_expiration(self) -> Optional[int]:
        """Return days until expiration (never negative), or None if unknown."""
        if self.expiration_date is None:
            return None
        exp_date = self.expiration_date
        if exp_date.tzinfo is None:
            # Naive registry timestamps are assumed to be UTC so the
            # subtraction below compares two aware datetimes.
            exp_date = exp_date.replace(tzinfo=timezone.utc)
        delta = exp_date - datetime.now(timezone.utc)
        return max(0, delta.days)

    def get_summary(self) -> str:
        """Return a one-line human-readable summary of the WHOIS result."""
        if self.is_available():
            return "Available"
        # Fixed: was `f"Registered"` -- an f-string with no placeholder.
        parts = ["Registered"]
        if self.registrar:
            parts.append(f"via {self.registrar}")
        if self.expiration_date:
            days_left = self.days_until_expiration()
            if days_left is not None:
                if days_left < 30:
                    parts.append(f"⚠️ Expires in {days_left} days!")
                else:
                    parts.append(f"Expires: {self.expiration_date.strftime('%Y-%m-%d')}")
        return " | ".join(parts)
@dataclass
class DomainResult:
    """Outcome of checking one domain: status, DNS data and optional WHOIS."""
    domain: str
    status: DomainStatus
    dns_records: Dict[str, List[str]] = field(default_factory=dict)
    whois_info: Optional[WhoisInfo] = None
    response_time_ms: float = 0.0
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Render the result as a JSON-serializable dictionary."""
        def iso(dt):
            # Datetimes become ISO-8601 strings; None passes through.
            return dt.isoformat() if dt else None

        payload: Dict[str, Any] = {
            'domain': self.domain,
            'status': self.status.value,
            'dns_records': self.dns_records,
            'response_time_ms': self.response_time_ms,
            'timestamp': self.timestamp.isoformat(),
            'error_message': self.error_message,
        }
        info = self.whois_info
        if info:
            # WHOIS details are included only when a lookup was performed.
            payload['whois_info'] = {
                'status': info.status.value,
                'registrar': info.registrar,
                'creation_date': iso(info.creation_date),
                'expiration_date': iso(info.expiration_date),
                'updated_date': iso(info.updated_date),
                'name_servers': info.name_servers,
                'registrant_name': info.registrant_name,
                'registrant_org': info.registrant_org,
                'registrant_country': info.registrant_country,
                'admin_email': info.admin_email,
                'tech_email': info.tech_email,
                'dnssec': info.dnssec,
                'whois_server': info.whois_server,
                'response_source': info.response_source,
                'days_until_expiration': info.days_until_expiration(),
            }
        return payload
class DomainValidator:
    """Enhanced domain validation and normalization helpers."""

    @staticmethod
    def is_valid_domain(domain: str) -> bool:
        """Validate domain format (length and per-label syntax rules)."""
        if not domain or len(domain) > 253:
            return False
        if VALIDATORS_AVAILABLE:
            return validators.domain(domain) is True
        # Fallback validation when the `validators` package is absent.
        domain = domain.rstrip('.')
        if not domain:
            return False
        parts = domain.split('.')
        if len(parts) < 2:
            return False
        for part in parts:
            if not part or len(part) > 63:
                return False
            # Labels are alphanumeric; hyphens allowed only internally.
            if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?$', part):
                return False
        return True

    @staticmethod
    def normalize_domain(domain: str) -> str:
        """Normalize a domain: strip scheme and path, lowercase, drop trailing dot.

        Bug fix: the previous version prefixed "http://" onto inputs that
        already contained "://", so urlparse saw e.g.
        "http://https://example.com" and returned "https:" as the host.
        URLs are now parsed as-is.
        """
        if not domain:
            return ""
        # Remove protocol by parsing the input as a URL.
        if '://' in domain:
            netloc = urllib.parse.urlparse(domain).netloc
            if netloc:
                domain = netloc
        # Clean up: lowercase, strip whitespace/trailing dot, drop any path.
        domain = domain.lower().strip().rstrip('.')
        domain = domain.split('/')[0]
        return domain

    @staticmethod
    def extract_tld(domain: str) -> str:
        """Return the last dot-separated label, or "" for bare labels."""
        parts = domain.split('.')
        return parts[-1] if len(parts) > 1 else ""
class EnhancedWhoisChecker:
    """Enhanced WHOIS checker with multiple methods and intelligent parsing.

    Lookup strategy: try python-whois (when installed), then fall back to
    a raw socket query against a TLD-specific WHOIS server. Results are
    cached per domain and queries are throttled per TLD.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.rate_limiter = {}  # tld -> time.time() of the last query
        self.cache = {}         # domain -> WhoisInfo (never expires)
        self.lock = threading.Lock()

    def _apply_rate_limit(self, tld: str) -> None:
        """Sleep so consecutive queries per TLD respect its rate limit (seconds)."""
        with self.lock:
            rate_limit = TLD_RATE_LIMITS.get(tld, WHOIS_RATE_LIMIT)
            current_time = time.time()
            last_check = self.rate_limiter.get(tld, 0)
            if current_time - last_check < rate_limit:
                sleep_time = rate_limit - (current_time - last_check)
                self.logger.debug(f"Rate limiting .{tld}: sleeping {sleep_time:.2f}s")
                time.sleep(sleep_time)
            self.rate_limiter[tld] = time.time()

    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse a WHOIS date string into a datetime, or None on failure."""
        if not date_str or date_str.lower() in ['none', 'null', 'n/a', '']:
            return None
        date_str = str(date_str).strip()
        # Prefer dateutil's flexible parser when it is available.
        if DATEUTIL_AVAILABLE:
            try:
                return date_parser.parse(date_str)
            except (ValueError, TypeError):
                pass
        # Fallback: common registry date formats, tried in order.
        date_patterns = [
            '%Y-%m-%d',
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%dT%H:%M:%S',
            '%Y-%m-%dT%H:%M:%SZ',
            '%d-%b-%Y',
            '%d.%m.%Y',
            '%m/%d/%Y',
            '%Y/%m/%d'
        ]
        for pattern in date_patterns:
            try:
                return datetime.strptime(date_str, pattern)
            except ValueError:
                continue
        self.logger.debug(f"Could not parse date: {date_str}")
        return None

    def _extract_emails(self, text: str) -> List[str]:
        """Extract email addresses from text, in order of appearance."""
        # Fixed: the original class [A-Z|a-z] also matched a literal '|'
        # in the TLD part of the address.
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        return re.findall(email_pattern, text, re.IGNORECASE)

    def _parse_python_whois_response(self, whois_data: Any, domain: str) -> WhoisInfo:
        """Parse a python-whois response object into a WhoisInfo.

        Fixed: the original assigned parsed dates via ``locals()[name] = ...``,
        which is a no-op inside CPython functions, so creation/expiration/
        updated dates were always dropped. Dates are now collected in a dict.
        """
        self.logger.debug(f"Parsing python-whois response for {domain}")
        # Extract domain name(s); their presence indicates registration.
        domain_names = getattr(whois_data, 'domain_name', None)
        if domain_names:
            if isinstance(domain_names, list):
                domain_names = [d.lower() if d else '' for d in domain_names]
            else:
                domain_names = [str(domain_names).lower()]
        is_registered = bool(domain_names and any(
            domain.lower() in dn for dn in domain_names if dn
        ))
        if not is_registered:
            return WhoisInfo(
                status=DomainStatus.AVAILABLE,
                response_source="python-whois",
                raw_response=str(whois_data)[:1000]
            )
        # Registrar may come back as a list; take the first entry.
        registrar = getattr(whois_data, 'registrar', None)
        if isinstance(registrar, list):
            registrar = registrar[0] if registrar else None
        # Dates: values may be datetime, str, or lists of either.
        dates = {
            'creation_date': None,
            'expiration_date': None,
            'updated_date': None,
        }
        for attr_name in dates:
            date_value = getattr(whois_data, attr_name, None)
            if date_value:
                if isinstance(date_value, list):
                    date_value = date_value[0] if date_value else None
                if isinstance(date_value, datetime):
                    dates[attr_name] = date_value
                elif isinstance(date_value, str):
                    dates[attr_name] = self._parse_date(date_value)
        # Name servers, normalized to lowercase.
        name_servers = getattr(whois_data, 'name_servers', [])
        if name_servers and not isinstance(name_servers, list):
            name_servers = [name_servers]
        name_servers = [ns.lower().strip() for ns in name_servers if ns]
        # Registrant details (attribute names follow python-whois).
        registrant_name = getattr(whois_data, 'registrant_name', None)
        registrant_org = getattr(whois_data, 'org', None)
        registrant_country = getattr(whois_data, 'country', None)
        # Emails: classify by role keyword; first hit is "admin" fallback.
        raw_text = str(whois_data)
        emails = self._extract_emails(raw_text)
        admin_email = None
        tech_email = None
        for email in emails:
            if 'admin' in email.lower():
                admin_email = email
            elif 'tech' in email.lower():
                tech_email = email
        if not admin_email and emails:
            admin_email = emails[0]
        # DNSSEC may also come back as a list.
        dnssec = getattr(whois_data, 'dnssec', None)
        if isinstance(dnssec, list):
            dnssec = ', '.join(str(d) for d in dnssec if d)
        return WhoisInfo(
            status=DomainStatus.REGISTERED,
            registrar=str(registrar) if registrar else None,
            creation_date=dates['creation_date'],
            expiration_date=dates['expiration_date'],
            updated_date=dates['updated_date'],
            name_servers=name_servers,
            registrant_name=str(registrant_name) if registrant_name else None,
            registrant_org=str(registrant_org) if registrant_org else None,
            registrant_country=str(registrant_country) if registrant_country else None,
            admin_email=admin_email,
            tech_email=tech_email,
            dnssec=str(dnssec) if dnssec else None,
            response_source="python-whois",
            raw_response=raw_text[:1000]
        )

    def _direct_whois_query(self, domain: str) -> Optional[str]:
        """Perform a direct WHOIS query (RFC 3912, TCP port 43).

        Returns the raw decoded response, or None when no server is known
        for the TLD or the query fails.
        """
        tld = DomainValidator.extract_tld(domain)
        whois_server = WHOIS_SERVERS.get(tld)
        if not whois_server:
            self.logger.debug(f"No WHOIS server known for .{tld}")
            return None
        try:
            self.logger.debug(f"Direct WHOIS query to {whois_server} for {domain}")
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(WHOIS_TIMEOUT)
                sock.connect((whois_server, 43))
                query = f"{domain}\r\n"
                # Fixed: sendall() -- plain send() may transmit only part
                # of the query buffer.
                sock.sendall(query.encode())
                response = b""
                while True:
                    data = sock.recv(4096)
                    if not data:
                        break
                    response += data
                return response.decode('utf-8', errors='ignore')
        except Exception as e:
            self.logger.debug(f"Direct WHOIS query failed for {domain}: {e}")
            return None

    def _parse_direct_whois_response(self, response: str, domain: str) -> WhoisInfo:
        """Parse a raw port-43 WHOIS response into a WhoisInfo.

        Fixed: date extraction previously assigned into ``locals()``,
        a CPython no-op, so parsed dates were silently discarded.
        """
        if not response:
            return WhoisInfo(status=DomainStatus.ERROR, response_source="direct")
        response_lower = response.lower()
        # Registry-specific phrases that signal an unregistered domain.
        availability_patterns = [
            r'no match', r'not found', r'no entries found',
            r'available', r'no data found', r'no object found',
            r'not registered', r'no information available',
            r'domain status:\s*available', r'status:\s*free'
        ]
        for pattern in availability_patterns:
            if re.search(pattern, response_lower):
                return WhoisInfo(
                    status=DomainStatus.AVAILABLE,
                    response_source="direct",
                    raw_response=response[:1000]
                )
        # Too little data to decide either way.
        if len(response.strip()) < 50:
            return WhoisInfo(
                status=DomainStatus.UNKNOWN,
                response_source="direct",
                raw_response=response
            )
        # A substantial response with no availability marker: treat as
        # registered and extract the details below.
        registrar = None
        registrar_patterns = [
            r'registrar:\s*(.+)',
            r'registrar name:\s*(.+)',
            r'sponsoring registrar:\s*(.+)'
        ]
        for pattern in registrar_patterns:
            match = re.search(pattern, response, re.IGNORECASE)
            if match:
                registrar = match.group(1).strip()
                break
        # Dates: the first pattern per field that parses successfully wins.
        date_patterns = {
            'creation_date': [
                r'creation date:\s*(.+)',
                r'created:\s*(.+)',
                r'registered:\s*(.+)',
                r'domain_dateregistered:\s*(.+)'
            ],
            'expiration_date': [
                r'expir(?:y|ation) date:\s*(.+)',
                r'expires:\s*(.+)',
                r'expiry:\s*(.+)',
                r'domain_datebilleduntil:\s*(.+)'
            ],
            'updated_date': [
                r'updated date:\s*(.+)',
                r'last updated:\s*(.+)',
                r'modified:\s*(.+)',
                r'domain_datelastmodified:\s*(.+)'
            ]
        }
        dates = {
            'creation_date': None,
            'expiration_date': None,
            'updated_date': None,
        }
        for date_field, patterns in date_patterns.items():
            for pattern in patterns:
                match = re.search(pattern, response, re.IGNORECASE)
                if match:
                    parsed_date = self._parse_date(match.group(1).strip())
                    if parsed_date:
                        dates[date_field] = parsed_date
                        break
        # Name servers, deduplicated and lowercased, in response order.
        name_servers = []
        ns_patterns = [
            r'name server:\s*(.+)',
            r'nameserver:\s*(.+)',
            r'nserver:\s*(.+)',
            r'dns:\s*(.+)'
        ]
        for pattern in ns_patterns:
            matches = re.findall(pattern, response, re.IGNORECASE)
            for match in matches:
                ns = match.strip().lower()
                if ns and ns not in name_servers:
                    name_servers.append(ns)
        # Emails: classify by the text surrounding each occurrence.
        emails = self._extract_emails(response)
        admin_email = None
        tech_email = None
        for email in emails:
            pos = response.find(email)
            context = response[max(0, pos - 50):pos + 50].lower()
            if 'admin' in context:
                admin_email = email
            elif 'tech' in context:
                tech_email = email
        return WhoisInfo(
            status=DomainStatus.REGISTERED,
            registrar=registrar,
            creation_date=dates['creation_date'],
            expiration_date=dates['expiration_date'],
            updated_date=dates['updated_date'],
            name_servers=name_servers,
            admin_email=admin_email,
            tech_email=tech_email,
            response_source="direct",
            raw_response=response[:1000]
        )

    def check_whois(self, domain: str) -> WhoisInfo:
        """Check domain WHOIS, trying python-whois then a direct query.

        Results (including terminal errors) are cached per domain.
        """
        # Check cache first.
        if domain in self.cache:
            self.logger.debug(f"Cache hit for {domain}")
            return self.cache[domain]
        tld = DomainValidator.extract_tld(domain)
        self._apply_rate_limit(tld)
        whois_info = None
        last_error = None
        # Method 1: the python-whois library.
        if PYTHON_WHOIS_AVAILABLE:
            try:
                self.logger.debug(f"Trying python-whois for {domain}")
                whois_data = python_whois.whois(domain)
                whois_info = self._parse_python_whois_response(whois_data, domain)
                # Accept anything but UNKNOWN as a definitive answer.
                if whois_info.status != DomainStatus.UNKNOWN:
                    self.cache[domain] = whois_info
                    return whois_info
            except Exception as e:
                last_error = e
                error_msg = str(e).lower()
                # Some registries signal availability only via an error.
                if any(indicator in error_msg for indicator in [
                    'no match', 'not found', 'no entries',
                    'available', 'no data', 'no object'
                ]):
                    whois_info = WhoisInfo(
                        status=DomainStatus.AVAILABLE,
                        response_source="python-whois-error",
                        raw_response=error_msg
                    )
                    self.cache[domain] = whois_info
                    return whois_info
                self.logger.debug(f"python-whois failed for {domain}: {e}")
        # Method 2: direct socket query to the TLD's WHOIS server.
        try:
            self.logger.debug(f"Trying direct WHOIS for {domain}")
            response = self._direct_whois_query(domain)
            if response:
                whois_info = self._parse_direct_whois_response(response, domain)
                if whois_info.status != DomainStatus.UNKNOWN:
                    self.cache[domain] = whois_info
                    return whois_info
        except Exception as e:
            last_error = e
            self.logger.debug(f"Direct WHOIS failed for {domain}: {e}")
        # All methods failed (or stayed UNKNOWN): cache an ERROR result.
        error_msg = str(last_error) if last_error else "All WHOIS methods failed"
        whois_info = WhoisInfo(
            status=DomainStatus.ERROR,
            response_source="error",
            raw_response=error_msg[:1000]
        )
        self.cache[domain] = whois_info
        return whois_info
class DNSChecker:
    """Queries a fixed set of DNS record types through public resolvers."""

    def __init__(self, timeout: int = DEFAULT_TIMEOUT):
        self.timeout = min(timeout, MAX_TIMEOUT)  # cap user-supplied timeouts
        self.resolver = self._create_resolver()
        self.logger = logging.getLogger(__name__)

    def _create_resolver(self) -> 'dns.resolver.Resolver':
        """Build a resolver pinned to well-known public nameservers."""
        res = dns.resolver.Resolver()
        res.timeout = self.timeout
        res.lifetime = self.timeout
        # Google, Cloudflare and OpenDNS, in that order.
        res.nameservers = ['8.8.8.8', '1.1.1.1', '208.67.222.222']
        return res

    def check_dns_records(self, domain: str) -> Dict[str, List[str]]:
        """Resolve common record types; NXDOMAIN short-circuits the scan."""
        collected: Dict[str, List[str]] = {}
        for rtype in ("A", "AAAA", "MX", "NS", "CNAME", "TXT", "SOA"):
            try:
                answers = self.resolver.resolve(domain, rtype)
                collected[rtype] = [str(item) for item in answers]
            except dns.resolver.NXDOMAIN:
                # The domain does not exist at all -- stop probing.
                collected[rtype] = []
                break
            except dns.resolver.NoAnswer:
                collected[rtype] = []
            except dns.exception.Timeout:
                collected[rtype] = ["Timeout"]
            except Exception as exc:
                collected[rtype] = [f"Error: {str(exc)[:50]}"]
        return collected

    def is_domain_active(self, dns_records: Dict[str, List[str]]) -> bool:
        """Return True if any essential record type resolved cleanly."""
        for rtype in ("A", "AAAA", "MX", "NS"):
            entries = dns_records.get(rtype, [])
            # Non-empty and free of error/timeout sentinels counts as active.
            if entries and all(
                not entry.startswith(("Error:", "Timeout")) for entry in entries
            ):
                return True
        return False
class DomainChecker:
    """Coordinates DNS and WHOIS checks for one or many domains."""

    def __init__(self, timeout: int = DEFAULT_TIMEOUT, max_workers: int = DEFAULT_WORKERS):
        self.timeout = timeout
        self.max_workers = min(max_workers, MAX_WORKERS)  # cap thread count
        self.dns_checker = DNSChecker(timeout)
        self.whois_checker = EnhancedWhoisChecker()
        self.logger = logging.getLogger(__name__)

    def check_domain(self, domain: str, check_whois: bool = False) -> DomainResult:
        """Check a single domain via DNS, optionally confirming with WHOIS."""
        started = time.time()
        try:
            domain = DomainValidator.normalize_domain(domain)
            if not DomainValidator.is_valid_domain(domain):
                return DomainResult(
                    domain=domain,
                    status=DomainStatus.ERROR,
                    error_message="Invalid domain format"
                )
            dns_records = self.dns_checker.check_dns_records(domain)
            # DNS activity gives a provisional verdict...
            status = (
                DomainStatus.REGISTERED
                if self.dns_checker.is_domain_active(dns_records)
                else DomainStatus.AVAILABLE
            )
            whois_info = None
            if check_whois:
                whois_info = self.whois_checker.check_whois(domain)
                # ...which a definitive WHOIS answer overrides.
                if whois_info.status in (DomainStatus.AVAILABLE, DomainStatus.REGISTERED):
                    status = whois_info.status
            return DomainResult(
                domain=domain,
                status=status,
                dns_records=dns_records,
                whois_info=whois_info,
                response_time_ms=(time.time() - started) * 1000
            )
        except Exception as exc:
            self.logger.error(f"Error checking {domain}: {exc}")
            return DomainResult(
                domain=domain,
                status=DomainStatus.ERROR,
                error_message=str(exc),
                response_time_ms=(time.time() - started) * 1000
            )

    def check_multiple_domains(
        self,
        domains: List[str],
        check_whois: bool = False,
        progress_callback: Optional[callable] = None
    ) -> List[DomainResult]:
        """Check many domains concurrently; results arrive in completion order."""
        results: List[DomainResult] = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            pending = {
                pool.submit(self.check_domain, name, check_whois): name
                for name in domains
            }
            for finished in concurrent.futures.as_completed(pending):
                try:
                    outcome = finished.result()
                except Exception as exc:
                    # A worker failed outside check_domain's own handler:
                    # record the failure rather than dropping the domain.
                    outcome = DomainResult(
                        domain=pending[finished],
                        status=DomainStatus.ERROR,
                        error_message=str(exc)
                    )
                results.append(outcome)
                if progress_callback:
                    progress_callback(outcome)
        return results
class OutputFormatter:
    """Output formatting utilities.

    Stateless helpers that render a list of DomainResult objects as a
    Rich table, a JSON document, or CSV text.
    """
    @staticmethod
    def create_table(results: List[DomainResult], console: Console, verbose: bool = False) -> Table:
        """Create Rich table.

        NOTE(review): `console` is currently unused inside this method;
        it is kept for interface stability.
        """
        table = Table(
            title=f"Domain Availability Check - {len(results)} domains",
            box=box.ROUNDED,
            show_lines=True
        )
        table.add_column("Domain", style="cyan", min_width=20)
        table.add_column("Status", style="bold", min_width=12)
        table.add_column("DNS Records", style="green", min_width=15)
        table.add_column("WHOIS Info", style="magenta", min_width=25)
        table.add_column("Response Time", style="yellow", min_width=12)
        if verbose:
            # Two extra columns only in verbose mode.
            table.add_column("Registrar", style="blue", min_width=15)
            table.add_column("Expires", style="red", min_width=12)
        for result in sorted(results, key=lambda x: x.domain):
            # Status with colors
            if result.status == DomainStatus.AVAILABLE:
                status_display = "[green]✓ Available[/green]"
            elif result.status == DomainStatus.REGISTERED:
                status_display = "[red]✗ Registered[/red]"
            else:
                status_display = f"[yellow]⚠ {result.status.value}[/yellow]"
            # DNS summary: record types with counts, error entries excluded.
            dns_summary = []
            for record_type, records in result.dns_records.items():
                if records and not any(r.startswith(("Error:", "Timeout")) for r in records):
                    dns_summary.append(f"{record_type}({len(records)})")
            dns_display = ", ".join(dns_summary) if dns_summary else "None"
            # WHOIS summary, truncated to keep the column narrow.
            if result.whois_info:
                whois_display = result.whois_info.get_summary()
            else:
                whois_display = "Not checked"
            if len(whois_display) > 40:
                whois_display = whois_display[:37] + "..."
            # Response time
            time_display = f"{result.response_time_ms:.1f}ms"
            row_data = [
                result.domain,
                status_display,
                dns_display,
                whois_display,
                time_display
            ]
            if verbose and result.whois_info:
                # Registrar (truncated)
                registrar = result.whois_info.registrar or "N/A"
                if len(registrar) > 20:
                    registrar = registrar[:17] + "..."
                # Expiration: color-coded by urgency (<30d red, <90d yellow).
                if result.whois_info.expiration_date:
                    days_left = result.whois_info.days_until_expiration()
                    if days_left is not None:
                        if days_left < 30:
                            exp_display = f"[red]{days_left}d[/red]"
                        elif days_left < 90:
                            exp_display = f"[yellow]{days_left}d[/yellow]"
                        else:
                            exp_display = f"{days_left}d"
                    else:
                        exp_display = "N/A"
                else:
                    exp_display = "N/A"
                row_data.extend([registrar, exp_display])
            elif verbose:
                # Verbose mode but no WHOIS data: pad the extra columns.
                row_data.extend(["N/A", "N/A"])
            table.add_row(*row_data)
        return table
    @staticmethod
    def to_json(results: List[DomainResult]) -> str:
        """Convert results to JSON (with aggregate counts in the envelope)."""
        data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "total_domains": len(results),
            "available_count": sum(1 for r in results if r.status == DomainStatus.AVAILABLE),
            "registered_count": sum(1 for r in results if r.status == DomainStatus.REGISTERED),
            "error_count": sum(1 for r in results if r.status == DomainStatus.ERROR),
            "results": [result.to_dict() for result in results]
        }
        # default=str stringifies anything json cannot serialize natively.
        return json.dumps(data, indent=2, default=str)
    @staticmethod
    def to_csv(results: List[DomainResult]) -> str:
        """Convert results to CSV (one row per domain, header included)."""
        import io
        output = io.StringIO()
        writer = csv.writer(output)
        # Header
        writer.writerow([
            "Domain", "Status", "DNS_Records", "WHOIS_Summary",
            "Registrar", "Creation_Date", "Expiration_Date",
            "Response_Time_ms", "Error_Message"
        ])
        # Data
        for result in results:
            dns_summary = "; ".join([
                f"{k}:{len(v)}" for k, v in result.dns_records.items() if v
            ])
            whois_summary = ""
            registrar = ""
            creation_date = ""
            expiration_date = ""
            if result.whois_info:
                whois_summary = result.whois_info.get_summary()
                registrar = result.whois_info.registrar or ""
                creation_date = result.whois_info.creation_date.isoformat() if result.whois_info.creation_date else ""
                expiration_date = result.whois_info.expiration_date.isoformat() if result.whois_info.expiration_date else ""
            writer.writerow([
                result.domain,
                result.status.value,
                dns_summary,
                whois_summary,
                registrar,
                creation_date,
                expiration_date,
                f"{result.response_time_ms:.1f}",
                result.error_message or ""
            ])
        return output.getvalue()
def setup_logging(level: str) -> None:
    """Configure root logging: Rich console handler plus a log file.

    Unknown level names fall back to INFO.
    """
    logging.basicConfig(
        level=getattr(logging, level.upper(), logging.INFO),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[
            RichHandler(rich_tracebacks=True),
            logging.FileHandler("domain_checker.log"),
        ],
    )
def load_domains_from_file(file_path: Path) -> List[str]:
    """Load domain names from a text file.

    Blank lines and lines starting with ``#`` are ignored; a line may
    contain either a single domain or several comma-separated domains.
    All surviving entries are normalized via DomainValidator.

    Args:
        file_path: Path to the UTF-8 text file to read.

    Returns:
        List of normalized domain strings (may contain invalid domains;
        validation happens in the caller).

    Raises:
        typer.BadParameter: If the file cannot be read or decoded.
    """
    try:
        lines = file_path.read_text(encoding='utf-8').splitlines()
    except (OSError, UnicodeDecodeError) as e:
        # Only genuine read/decode failures are reported as file errors.
        # The previous blanket `except Exception` also mislabeled parsing/
        # normalization bugs as "Error reading file".
        raise typer.BadParameter(f"Error reading file {file_path}: {e}")
    domains: List[str] = []
    for line in lines:
        line = line.strip()
        if not line or line.startswith('#'):
            continue  # skip blanks and comments
        if ',' in line:
            domains.extend(d.strip() for d in line.split(','))
        else:
            domains.append(line)
    return [DomainValidator.normalize_domain(d) for d in domains if d]
# Typer app setup
# Single-command CLI application; shell completion generation is disabled.
app = typer.Typer(
    name=APP_NAME,
    help="Professional domain availability checker with enhanced WHOIS support",
    add_completion=False
)
# Shared Rich console used for all styled terminal output in this module.
console = Console()
def version_callback(value: bool):
    """Typer callback for --version: print app name/version and exit."""
    if not value:
        # Flag not supplied — let normal command processing continue.
        return
    console.print(f"[bold cyan]{APP_NAME}[/bold cyan] version [green]{APP_VERSION}[/green]")
    raise typer.Exit()
@app.command()
def main(
    name: Optional[str] = typer.Argument(None, help="Base name for domain checking"),
    tlds: str = typer.Option(",".join(DEFAULT_TLDS), help="Comma-separated TLDs"),
    domains: Optional[str] = typer.Option(None, help="Specific domains to check"),
    bulk_file: Optional[Path] = typer.Option(None, help="File with domains to check"),
    whois: bool = typer.Option(False, help="Enable WHOIS verification"),
    verbose: bool = typer.Option(False, "-v", "--verbose", help="Verbose output"),
    timeout: int = typer.Option(DEFAULT_TIMEOUT, help="Timeout in seconds"),
    workers: int = typer.Option(DEFAULT_WORKERS, help="Number of workers"),
    output_format: str = typer.Option("table", help="Output format: table, json, csv"),
    export_file: Optional[Path] = typer.Option(None, help="Export to file"),
    log_level: str = typer.Option("WARNING", help="Log level"),
    # is_eager=True makes Typer run the version callback before validating or
    # processing any other parameter — the standard pattern for --version flags.
    version: Optional[bool] = typer.Option(
        None, "--version", callback=version_callback, is_eager=True,
        help="Show version"
    )
):
    """Professional domain availability checker.

    Domains to check come from exactly one of three sources, in priority
    order: --bulk-file, --domains, or the NAME argument combined with --tlds.
    Results are printed in the chosen format, summarized in a statistics
    table, and optionally exported to a file.
    """
    # --- Dependency validation -------------------------------------------
    # dnspython is mandatory; python-whois is optional (WHOIS silently
    # downgrades to disabled with a warning).
    if not DNS_AVAILABLE:
        console.print(Panel(
            f"[red]Error:[/red] dnspython is required.\n"
            f"Install with: [cyan]pip install dnspython[/cyan]",
            title="Missing Dependency",
            border_style="red"
        ))
        raise typer.Exit(1)
    if whois and not PYTHON_WHOIS_AVAILABLE:
        console.print(Panel(
            "[yellow]Warning:[/yellow] python-whois not installed. WHOIS checks disabled.\n"
            "Install with: [cyan]pip install python-whois[/cyan]",
            title="Optional Dependency",
            border_style="yellow"
        ))
        whois = False
    # Setup logging
    setup_logging(log_level)
    # Validate output format
    if output_format not in ["table", "json", "csv"]:
        console.print(f"[red]Error:[/red] Invalid output format: {output_format}")
        raise typer.Exit(1)
    # --- Build the list of domains to check ------------------------------
    domains_to_check = []
    if bulk_file:
        if not bulk_file.exists():
            console.print(f"[red]Error:[/red] File not found: {bulk_file}")
            raise typer.Exit(1)
        domains_to_check = load_domains_from_file(bulk_file)
    elif domains:
        domains_to_check = [
            DomainValidator.normalize_domain(d.strip())
            for d in domains.split(',') if d.strip()
        ]
    elif name:
        # Cross the base name with every requested TLD.
        tld_list = [tld.strip().lower() for tld in tlds.split(',') if tld.strip()]
        base_name = DomainValidator.normalize_domain(name)
        domains_to_check = [f"{base_name}.{tld}" for tld in tld_list]
    else:
        console.print("[red]Error:[/red] Provide name, domains, or bulk-file")
        raise typer.Exit(1)
    # Drop syntactically invalid entries before hitting the network.
    valid_domains = [d for d in domains_to_check if DomainValidator.is_valid_domain(d)]
    if not valid_domains:
        console.print("[red]Error:[/red] No valid domains to check")
        raise typer.Exit(1)
    # Remove duplicates while preserving input order (dict insertion order).
    unique_domains = list(dict.fromkeys(valid_domains))
    # Show a run summary before starting.
    console.print(Panel(
        f"[bold]Checking {len(unique_domains)} domains[/bold]\n"
        f"WHOIS: [cyan]{'Enabled' if whois else 'Disabled'}[/cyan]\n"
        f"Timeout: [cyan]{timeout}s[/cyan]\n"
        f"Workers: [cyan]{workers}[/cyan]",
        title=f"{APP_NAME} v{APP_VERSION}",
        border_style="blue"
    ))
    # --- Run the checks with a live progress bar -------------------------
    checker = DomainChecker(timeout=timeout, max_workers=workers)
    results = []
    start_time = time.time()
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TextColumn("•"),
        TimeRemainingColumn(),
        console=console
    ) as progress:
        task = progress.add_task("Checking domains...", total=len(unique_domains))

        def progress_callback(result: DomainResult):
            # Invoked once per completed domain by the checker.
            progress.advance(task)
        results = checker.check_multiple_domains(
            unique_domains,
            check_whois=whois,
            progress_callback=progress_callback
        )
    total_time = time.time() - start_time
    # Sort results alphabetically for stable, readable output.
    results.sort(key=lambda x: x.domain)
    # --- Display results in the requested format -------------------------
    formatter = OutputFormatter()
    if output_format == "table":
        table = formatter.create_table(results, console, verbose)
        console.print(table)
    elif output_format == "json":
        console.print(formatter.to_json(results))
    elif output_format == "csv":
        console.print(formatter.to_csv(results))
    # --- Statistics summary ----------------------------------------------
    available_count = sum(1 for r in results if r.status == DomainStatus.AVAILABLE)
    registered_count = sum(1 for r in results if r.status == DomainStatus.REGISTERED)
    error_count = sum(1 for r in results if r.status == DomainStatus.ERROR)
    stats_table = Table(title="Statistics", box=box.ROUNDED)
    stats_table.add_column("Metric", style="cyan")
    stats_table.add_column("Value", style="green", justify="right")
    stats_table.add_row("Total domains", str(len(results)))
    stats_table.add_row("Available", str(available_count))
    stats_table.add_row("Registered", str(registered_count))
    stats_table.add_row("Errors", str(error_count))
    stats_table.add_row("Total time", f"{total_time:.2f}s")
    # results is guaranteed non-empty here (valid_domains check above).
    stats_table.add_row("Avg response time", f"{sum(r.response_time_ms for r in results)/len(results):.1f}ms")
    console.print(stats_table)
    # --- Optional export to file -----------------------------------------
    if export_file:
        # "table" is a terminal-only format; fall back to JSON for files.
        file_format = output_format if output_format != "table" else "json"
        try:
            if file_format == "json":
                content = formatter.to_json(results)
            elif file_format == "csv":
                content = formatter.to_csv(results)
            with open(export_file, 'w', encoding='utf-8') as f:
                f.write(content)
            console.print(f"[green]Results exported to:[/green] {export_file}")
        except Exception as e:
            # Best-effort export: report the failure but keep console output.
            console.print(f"[red]Export failed:[/red] {e}")
    # --- Quick summary of available domains (capped at 10 shown) ---------
    available_domains = [r for r in results if r.status == DomainStatus.AVAILABLE]
    if available_domains:
        console.print(Panel(
            "\n".join([f"• {r.domain}" for r in available_domains[:10]]) +
            (f"\n... and {len(available_domains) - 10} more" if len(available_domains) > 10 else ""),
            title=f"[green]Available Domains ({len(available_domains)})[/green]",
            border_style="green"
        ))
# Entry point: delegate argument parsing and dispatch to the Typer app.
if __name__ == "__main__":
    app()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment