Last active
September 20, 2025 07:39
-
-
Save cumulus13/a94af45adcb89d2e3076b854ebc34f38 to your computer and use it in GitHub Desktop.
simple language translator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| # author: Hadi Cahyadi (Enhanced with async/await) | |
| # email: cumulus13@gmail.com | |
| import sys | |
| import asyncio | |
| import clipboard | |
| import os | |
| import aiohttp | |
| import aiofiles | |
| from pathlib import Path | |
| from typing import Optional, Tuple, List, Dict, Any | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| import json | |
| import argparse | |
| import shutil | |
| import time | |
| import socket | |
| import logging | |
| from contextlib import asynccontextmanager | |
| # Rich for beautiful console output | |
| from rich import print, console, traceback | |
| from rich.progress import Progress, SpinnerColumn, TextColumn | |
| from rich.panel import Panel | |
| from rich.table import Table | |
| try: | |
| from licface import CustomRichHelpFormatter | |
| except Exception as e: | |
| CustomRichHelpFormatter = argparse.RawDescriptionHelpFormatter | |
| # TTS (kept as sync since pyttsx3 is sync-only) | |
| import pyttsx3 | |
| from threading import Thread | |
| # Configure logging | |
| logging.basicConfig(level=logging.WARNING) | |
| logger = logging.getLogger(__name__) | |
| # Install rich traceback | |
| traceback.install(theme='fruity', max_frames=30, width=shutil.get_terminal_size()[0]) | |
| console = console.Console() | |
| @dataclass | |
| class TranslationResult: | |
| """Result of translation operation""" | |
| text: str | |
| detected_lang: str | |
| confidence: float = 0.0 | |
| @dataclass | |
| class HistoryEntry: | |
| """History entry structure""" | |
| timestamp: str | |
| original: str | |
| translated: str | |
| from_lang: str | |
| to_lang: str | |
| confidence: float = 0.0 | |
| class AsyncTranslator: | |
| """Modern async translator using Google Translate API""" | |
| def __init__(self): | |
| self.session: Optional[aiohttp.ClientSession] = None | |
| self.base_url = "https://translate.googleapis.com/translate_a/single" | |
| self.languages = self._load_languages() | |
| async def __aenter__(self): | |
| """Async context manager entry""" | |
| if not self.session: | |
| timeout = aiohttp.ClientTimeout(total=30) | |
| self.session = aiohttp.ClientSession( | |
| timeout=timeout, | |
| headers={ | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' | |
| } | |
| ) | |
| return self | |
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| """Async context manager exit""" | |
| if self.session: | |
| await self.session.close() | |
| self.session = None | |
| @staticmethod | |
| def _load_languages() -> Dict[str, str]: | |
| """Load language mappings""" | |
| return { | |
| 'af': 'afrikaans', 'sq': 'albanian', 'am': 'amharic', 'ar': 'arabic', | |
| 'hy': 'armenian', 'az': 'azerbaijani', 'eu': 'basque', 'be': 'belarusian', | |
| 'bn': 'bengali', 'bs': 'bosnian', 'bg': 'bulgarian', 'ca': 'catalan', | |
| 'ceb': 'cebuano', 'ny': 'chichewa', 'zh-cn': 'chinese (simplified)', | |
| 'zh-tw': 'chinese (traditional)', 'co': 'corsican', 'hr': 'croatian', | |
| 'cs': 'czech', 'da': 'danish', 'nl': 'dutch', 'en': 'english', | |
| 'eo': 'esperanto', 'et': 'estonian', 'tl': 'filipino', 'fi': 'finnish', | |
| 'fr': 'french', 'fy': 'frisian', 'gl': 'galician', 'ka': 'georgian', | |
| 'de': 'german', 'el': 'greek', 'gu': 'gujarati', 'ht': 'haitian creole', | |
| 'ha': 'hausa', 'haw': 'hawaiian', 'iw': 'hebrew', 'he': 'hebrew', | |
| 'hi': 'hindi', 'hmn': 'hmong', 'hu': 'hungarian', 'is': 'icelandic', | |
| 'ig': 'igbo', 'id': 'indonesian', 'ga': 'irish', 'it': 'italian', | |
| 'ja': 'japanese', 'jw': 'javanese', 'kn': 'kannada', 'kk': 'kazakh', | |
| 'km': 'khmer', 'ko': 'korean', 'ku': 'kurdish (kurmanji)', 'ky': 'kyrgyz', | |
| 'lo': 'lao', 'la': 'latin', 'lv': 'latvian', 'lt': 'lithuanian', | |
| 'lb': 'luxembourgish', 'mk': 'macedonian', 'mg': 'malagasy', 'ms': 'malay', | |
| 'ml': 'malayalam', 'mt': 'maltese', 'mi': 'maori', 'mr': 'marathi', | |
| 'mn': 'mongolian', 'my': 'myanmar (burmese)', 'ne': 'nepali', 'no': 'norwegian', | |
| 'or': 'odia', 'ps': 'pashto', 'fa': 'persian', 'pl': 'polish', | |
| 'pt': 'portuguese', 'pa': 'punjabi', 'ro': 'romanian', 'ru': 'russian', | |
| 'sm': 'samoan', 'gd': 'scots gaelic', 'sr': 'serbian', 'st': 'sesotho', | |
| 'sn': 'shona', 'sd': 'sindhi', 'si': 'sinhala', 'sk': 'slovak', | |
| 'sl': 'slovenian', 'so': 'somali', 'es': 'spanish', 'su': 'sundanese', | |
| 'sw': 'swahili', 'sv': 'swedish', 'tg': 'tajik', 'ta': 'tamil', | |
| 'te': 'telugu', 'th': 'thai', 'tr': 'turkish', 'uk': 'ukrainian', | |
| 'ur': 'urdu', 'ug': 'uyghur', 'uz': 'uzbek', 'vi': 'vietnamese', | |
| 'cy': 'welsh', 'xh': 'xhosa', 'yi': 'yiddish', 'yo': 'yoruba', | |
| 'zu': 'zulu' | |
| } | |
| async def detect_language(self, text: str) -> Tuple[str, float]: | |
| """Detect language with confidence score""" | |
| if not self.session: | |
| raise RuntimeError("Session not initialized. Use async with statement.") | |
| params = { | |
| 'client': 'gtx', | |
| 'dt': 'at', | |
| 'q': text[:500], # Limit text for detection | |
| 'sl': 'auto', | |
| 'tl': 'en' | |
| } | |
| try: | |
| async with self.session.get(self.base_url, params=params) as response: | |
| if response.status == 200: | |
| result = await response.json() | |
| if result and len(result) > 2 and result[2]: | |
| detected_lang = result[2] | |
| confidence = 0.9 if detected_lang != 'auto' else 0.5 | |
| return detected_lang, confidence | |
| except Exception as e: | |
| logger.warning(f"Language detection failed: {e}") | |
| return 'auto', 0.0 | |
| async def translate(self, text: str, src: str = 'auto', dest: str = 'en', max_retries: int = 3) -> TranslationResult: | |
| """Async translate with retry logic""" | |
| if not text.strip(): | |
| return TranslationResult("", src, 0.0) | |
| if not self.session: | |
| raise RuntimeError("Session not initialized. Use async with statement.") | |
| params = { | |
| 'client': 'gtx', | |
| 'dt': ['at', 't', 'bd', 'rm'], | |
| 'q': text, | |
| 'sl': src, | |
| 'tl': dest, | |
| 'ie': 'UTF-8', | |
| 'oe': 'UTF-8' | |
| } | |
| for attempt in range(max_retries): | |
| try: | |
| async with self.session.get(self.base_url, params=params) as response: | |
| if response.status == 200: | |
| result = await response.json() | |
| if result and len(result) > 0 and result[0]: | |
| # Extract translated text | |
| translated = ''.join([part[0] for part in result[0] if part[0]]) | |
| # Extract detected language | |
| detected_lang = result[2] if len(result) > 2 and result[2] else src | |
| return TranslationResult( | |
| text=translated, | |
| detected_lang=detected_lang, | |
| confidence=0.9 if detected_lang != 'auto' else 0.5 | |
| ) | |
| else: | |
| logger.warning(f"HTTP {response.status}: {await response.text()}") | |
| except Exception as e: | |
| if attempt < max_retries - 1: | |
| print(f"[yellow]Retry {attempt + 1}/{max_retries}: {e}[/]") | |
| await asyncio.sleep(1 * (attempt + 1)) # Exponential backoff | |
| else: | |
| raise e | |
| return TranslationResult("Translation failed", src, 0.0) | |
| class TTSManager: | |
| """Thread-safe TTS manager""" | |
| @staticmethod | |
| def speak(text: str, rate: Optional[int] = None) -> None: | |
| """Text-to-speech with configurable rate""" | |
| try: | |
| engine = pyttsx3.init() | |
| # Auto-detect rate or use custom | |
| if rate is None: | |
| rate = 130 if " " in text else 100 | |
| engine.setProperty('rate', rate) | |
| voices = engine.getProperty('voices') | |
| if voices and len(voices) > 1: | |
| engine.setProperty('voice', voices[1].id) | |
| engine.setProperty('volume', 1.0) | |
| engine.say(text) | |
| engine.runAndWait() | |
| engine.stop() | |
| except Exception as e: | |
| logger.error(f"TTS Error: {e}") | |
| @staticmethod | |
| def speak_async(text: str, timeout: int = 100, rate: Optional[int] = None) -> None: | |
| """Run TTS in background thread with timeout""" | |
| def worker(): | |
| TTSManager.speak(text, rate) | |
| thread = Thread(target=worker, daemon=True) | |
| thread.start() | |
| thread.join(timeout=timeout) | |
| class HistoryManager: | |
| """Async history management""" | |
| def __init__(self, history_file: Optional[str] = None): | |
| self.history_file = Path(history_file or os.path.expanduser("~/.transpy_history.json")) | |
| self.max_entries = 100 | |
| async def save_entry(self, original: str, translated: str, src_lang: str, dest_lang: str, confidence: float = 0.0): | |
| """Save translation to history asynchronously""" | |
| entry = HistoryEntry( | |
| timestamp=datetime.now().isoformat(), | |
| original=original, | |
| translated=translated, | |
| from_lang=src_lang, | |
| to_lang=dest_lang, | |
| confidence=confidence | |
| ) | |
| try: | |
| # Load existing history | |
| history = await self._load_history() | |
| # Add new entry and keep last N | |
| history.append(entry.__dict__) | |
| history = history[-self.max_entries:] | |
| # Save back | |
| await self._save_history(history) | |
| except Exception as e: | |
| logger.warning(f"History save failed: {e}") | |
| async def _load_history(self) -> List[Dict[str, Any]]: | |
| """Load history from file""" | |
| if not self.history_file.exists(): | |
| return [] | |
| try: | |
| async with aiofiles.open(self.history_file, 'r', encoding='utf-8') as f: | |
| content = await f.read() | |
| return json.loads(content) | |
| except Exception as e: | |
| logger.warning(f"Failed to load history: {e}") | |
| return [] | |
| async def _save_history(self, history: List[Dict[str, Any]]): | |
| """Save history to file""" | |
| try: | |
| self.history_file.parent.mkdir(parents=True, exist_ok=True) | |
| async with aiofiles.open(self.history_file, 'w', encoding='utf-8') as f: | |
| await f.write(json.dumps(history, ensure_ascii=False, indent=2)) | |
| except Exception as e: | |
| logger.error(f"Failed to save history: {e}") | |
| async def show_history(self, limit: int = 10): | |
| """Display recent history""" | |
| history = await self._load_history() | |
| if not history: | |
| print("[yellow]No translation history found[/]") | |
| return | |
| table = Table(title=f"Recent Translations (last {min(limit, len(history))})") | |
| table.add_column("Time", style="dim") | |
| table.add_column("FromβTo", style="cyan") | |
| table.add_column("Original", style="white", max_width=30) | |
| table.add_column("Translated", style="green", max_width=30) | |
| table.add_column("Conf", style="yellow", justify="center") | |
| for entry in history[-limit:]: | |
| timestamp = entry['timestamp'][:19].replace('T', ' ') | |
| lang_pair = f"{entry['from_lang']}β{entry['to_lang']}" | |
| confidence = f"{entry.get('confidence', 0.0):.1f}" if entry.get('confidence') else "-" | |
| table.add_row( | |
| timestamp, | |
| lang_pair, | |
| entry['original'][:30] + "..." if len(entry['original']) > 30 else entry['original'], | |
| entry['translated'][:30] + "..." if len(entry['translated']) > 30 else entry['translated'], | |
| confidence | |
| ) | |
| console.print(table) | |
| async def clear_history(self): | |
| """Clear translation history""" | |
| if self.history_file.exists(): | |
| self.history_file.unlink() | |
| print("[green]Translation history cleared[/]") | |
| else: | |
| print("[yellow]No history file found[/]") | |
| class TranspyApp: | |
| """Main application class""" | |
| def __init__(self): | |
| self.translator = AsyncTranslator() | |
| self.tts = TTSManager() | |
| self.history = HistoryManager() | |
| def create_parser(self) -> argparse.ArgumentParser: | |
| """Create command line parser""" | |
| parser = argparse.ArgumentParser( | |
| description="π Enhanced Async Translation Tool with TTS and History", | |
| formatter_class=CustomRichHelpFormatter, | |
| prog='tr/translate', | |
| epilog=""" | |
| Examples: | |
| tr "Hello world" # en β id (default) | |
| tr "Selamat pagi" -r # id β en (reverse) | |
| tr "Hello" -f en -t es # en β es (custom) | |
| tr c --clipboard # from clipboard, copy result | |
| tr --history # show recent translations | |
| tr --detect "Bonjour le monde" # detect language only | |
| tr --server # run in server mode | |
| """ | |
| ) | |
| parser.add_argument("TEXT", help='Text to translate (use "c" for clipboard)', nargs='*') | |
| parser.add_argument('-f', '--from', dest='src', help='Source language (default: en)', default="en") | |
| parser.add_argument('-t', '--to', dest='dest', help='Target language (default: id)', default="id") | |
| parser.add_argument('-r', '--reverse', help='Reverse translation direction', action='store_true') | |
| parser.add_argument('-c', '--clipboard', help='Copy result to clipboard', action='store_true') | |
| parser.add_argument('-n', '--no-speak', help='Disable text-to-speech', action='store_true') | |
| parser.add_argument('-T', '--timeout', type=int, help='TTS timeout (seconds)', default=120) | |
| parser.add_argument('--rate', type=int, help='TTS speech rate (words per minute)') | |
| parser.add_argument('--history', action='store_true', help='Show translation history') | |
| parser.add_argument('--clear-history', action='store_true', help='Clear translation history') | |
| parser.add_argument('--detect', help='Detect language of given text') | |
| parser.add_argument('--langs', action='store_true', help='Show common language codes') | |
| parser.add_argument('--all-langs', action='store_true', help='Show all language codes') | |
| parser.add_argument('--no-history', action='store_true', help='Don\'t save to history') | |
| parser.add_argument('-s', '--server', action='store_true', help='Run in server mode') | |
| parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output') | |
| return parser | |
| def show_languages(self, show_all: bool = False): | |
| """Display available languages""" | |
| langs = self.translator.languages | |
| if show_all: | |
| table = Table(title="π All Available Language Codes") | |
| else: | |
| table = Table(title="π Common Language Codes") | |
| # Show only first 25 for common view | |
| langs = dict(list(langs.items())[:25]) | |
| table.add_column("Code", style="yellow", justify="center") | |
| table.add_column("Language", style="cyan") | |
| for code, name in sorted(langs.items()): | |
| table.add_row(code, name.title()) | |
| if not show_all: | |
| table.add_row("...", f"... and {len(self.translator.languages)-25} more") | |
| table.add_row("", "[dim]Use --all-langs for complete list[/]") | |
| console.print(table) | |
| async def detect_language_cmd(self, text: str): | |
| """Handle language detection command""" | |
| async with self.translator: | |
| lang, confidence = await self.translator.detect_language(text) | |
| if lang and lang != 'auto': | |
| lang_name = self.translator.languages.get(lang, 'Unknown') | |
| confidence_str = f"{confidence:.2f}" if confidence > 0 else "unknown" | |
| panel = Panel.fit( | |
| f"[yellow]{lang}[/] ([cyan]{lang_name.title()}[/])\n" | |
| f"[dim]Confidence: {confidence_str}[/]", | |
| title="π Language Detection", | |
| border_style="cyan" | |
| ) | |
| console.print(panel) | |
| else: | |
| print("[red]Language detection failed[/]") | |
| async def translate_text(self, text: str, src: str, dest: str, args): | |
| """Main translation logic""" | |
| if not text.strip(): | |
| print("[yellow]No text to translate[/]") | |
| return | |
| async with self.translator: | |
| # Show translation info | |
| panel = Panel.fit( | |
| f"[cyan]From:[/] [yellow]{src}[/] ({self.translator.languages.get(src, 'auto')})\n" | |
| f"[cyan]To:[/] [green]{dest}[/] ({self.translator.languages.get(dest, dest)})\n" | |
| f"[cyan]Text:[/] [white]\"{text}\"[/]", | |
| title="π Translation Request", | |
| border_style="blue" | |
| ) | |
| console.print(panel) | |
| # TTS for original (if not reverse mode) | |
| if not args.reverse and not args.no_speak and not os.getenv('TRANS_SPEAK') == "0": | |
| console.print("[dim]π Speaking original text...[/]") | |
| self.tts.speak_async(text, args.timeout, args.rate) | |
| # Perform translation with progress indicator | |
| with Progress( | |
| SpinnerColumn(), | |
| TextColumn("[progress.description]{task.description}"), | |
| transient=True | |
| ) as progress: | |
| task = progress.add_task("Translating...", total=None) | |
| try: | |
| result = await self.translator.translate(text, src, dest) | |
| except Exception as e: | |
| console.print(f"[red]Translation failed: {e}[/]") | |
| return | |
| # Display result | |
| result_panel = Panel.fit( | |
| f"[green bold]\"{result.text}\"[/]", | |
| title="β Translation Result", | |
| border_style="green" | |
| ) | |
| console.print(result_panel) | |
| # Show detected language if auto-detect was used | |
| if src == 'auto' and result.detected_lang != src: | |
| detected_name = self.translator.languages.get(result.detected_lang, result.detected_lang) | |
| print(f"[dim]π€ Detected language: {result.detected_lang} ({detected_name.title()}) - confidence: {result.confidence:.2f}[/]") | |
| # TTS for translated text (if reverse mode) | |
| if args.reverse and not args.no_speak and not os.getenv('TRANS_SPEAK') == "0": | |
| console.print("[dim]π Speaking translated text...[/]") | |
| self.tts.speak_async(result.text, args.timeout, args.rate) | |
| # Copy to clipboard | |
| if args.clipboard: | |
| clipboard.copy(result.text) | |
| console.print("[dim]π Copied to clipboard[/]") | |
| # Save to history | |
| if not args.no_history: | |
| await self.history.save_entry( | |
| text, result.text, src, dest, result.confidence | |
| ) | |
| async def run(self): | |
| """Main application entry point""" | |
| parser = self.create_parser() | |
| if len(sys.argv) == 1: | |
| parser.print_help() | |
| return | |
| try: | |
| args = parser.parse_args() | |
| # Configure logging level | |
| if args.verbose: | |
| logging.basicConfig(level=logging.INFO, force=True) | |
| # Handle utility commands | |
| if args.history: | |
| await self.history.show_history() | |
| return | |
| if args.clear_history: | |
| await self.history.clear_history() | |
| return | |
| if args.detect: | |
| await self.detect_language_cmd(args.detect) | |
| return | |
| if args.langs: | |
| self.show_languages(False) | |
| return | |
| if args.all_langs: | |
| self.show_languages(True) | |
| return | |
| # Main translation logic | |
| if not args.TEXT: | |
| print("[yellow]No text provided[/]") | |
| return | |
| text = " ".join(args.TEXT) | |
| # Handle clipboard input | |
| if args.TEXT and args.TEXT[0].lower() == 'c': | |
| text = clipboard.paste() | |
| console.print("[dim]π Using clipboard content[/]") | |
| # Determine languages | |
| if args.reverse: | |
| src_lang = args.dest | |
| dest_lang = args.src | |
| else: | |
| src_lang = args.src | |
| dest_lang = args.dest | |
| # Perform translation | |
| await self.translate_text(text, src_lang, dest_lang, args) | |
| except KeyboardInterrupt: | |
| console.print("\n[yellow]β οΈ Translation cancelled[/]") | |
| except Exception as e: | |
| if args.verbose if 'args' in locals() else False: | |
| console.print_exception() | |
| else: | |
| console.print(f"[red]Error: {e}[/]") | |
| def main(): | |
| """Entry point""" | |
| app = TranspyApp() | |
| try: | |
| asyncio.run(app.run()) | |
| except KeyboardInterrupt: | |
| print("\n[yellow]Goodbye! π[/]") | |
| except Exception as e: | |
| print(f"[red]Fatal error: {e}[/]") | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment