Created
December 19, 2025 04:31
-
-
Save c0ze/34032f3fef3d6b5625945aec76ccc429 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| TTS API Server - Standalone Edge TTS service for Docker deployment. | |
| This is a minimal version without static file serving. | |
| """ | |
| import asyncio | |
| from aiohttp import web | |
| import edge_tts | |
| import json | |
class TTSHandler:
    """Prepare text for speech synthesis and stream Edge TTS audio for it."""

    # Ordered (old, new) substitutions applied by ``prep_text``.
    # Order matters: e.g. "--" must be handled before single characters, and
    # "''" must become a pause before typographic apostrophes are straightened.
    # Typographic quotes are written as escapes so they cannot be silently
    # flattened into ASCII quotes (which would turn the rule into a no-op).
    _REPLACEMENTS = (
        ("--", ", "),
        ("—", ", "),
        (";", ", "),
        (":", ", "),
        ("''", ", "),
        ("\u2019", "'"),   # right single quotation mark -> ASCII apostrophe
        ("\u201c", '"'),   # left double quotation mark -> ASCII double quote
        ("\u201d", '"'),   # right double quotation mark -> ASCII double quote
        ("◇", ""),
        (" . . . ", ", "),
        ("... ", ", "),
        ("«", " "),
        ("»", " "),
        ("[", ""),
        ("]", ""),
        ("&", " and "),
        (" GNU ", " new "),
        ("\n", " \n"),
        ("*", " "),
    )

    def prep_text(self, text_in):
        """Return ``text_in`` cleaned up for TTS, similar to epub2tts.

        Punctuation that should read as a pause is replaced by a comma,
        typographic quotes are normalized to ASCII, decorative characters are
        dropped, and surrounding whitespace is stripped.

        Args:
            text_in: The raw text (``str``) to clean.

        Returns:
            The cleaned string.
        """
        text = text_in
        for old, new in self._REPLACEMENTS:
            text = text.replace(old, new)
        return text.strip()

    async def stream_audio(self, text, voice, rate='+0%', pitch='+0Hz'):
        """Asynchronously yield the audio chunks Edge TTS produces for ``text``.

        The text is cleaned with ``prep_text`` before being sent. Request
        details and periodic progress are printed for debugging.

        Args:
            text: Raw text to speak.
            voice: Voice name passed to ``edge_tts.Communicate``.
            rate: Speaking-rate adjustment string, e.g. ``'+0%'``.
            pitch: Pitch adjustment string, e.g. ``'+0Hz'``.

        Yields:
            The ``data`` payload of every chunk whose type is ``"audio"``.
        """
        cleaned_text = self.prep_text(text)
        separator = '=' * 60

        # Debug: log request details.
        print(f"\n{separator}")
        print("[EDGE TTS REQUEST]")
        print(f" Voice: {voice}")
        print(f" Rate: {rate}, Pitch: {pitch}")
        print(f" Text length: {len(text)} chars (cleaned: {len(cleaned_text)} chars)")
        print(f" Text preview: {text[:50]}{'...' if len(text) > 50 else ''}")
        print(f" Cleaned preview: {cleaned_text[:50]}{'...' if len(cleaned_text) > 50 else ''}")
        print(separator)

        communicate = edge_tts.Communicate(cleaned_text, voice, rate=rate, pitch=pitch)
        chunk_count = 0
        total_bytes = 0
        async for chunk in communicate.stream():
            # Only audio chunks are forwarded; any other chunk type
            # (e.g. "WordBoundary") is ignored.
            if chunk["type"] != "audio":
                continue
            chunk_count += 1
            chunk_size = len(chunk["data"])
            total_bytes += chunk_size
            # Log the first three chunks and then every 50th to limit noise.
            if chunk_count <= 3 or chunk_count % 50 == 0:
                print(f" [EDGE TTS RESPONSE] Chunk #{chunk_count}: {chunk_size} bytes (total: {total_bytes} bytes)")
            yield chunk["data"]

        print(f"[EDGE TTS COMPLETE] Total chunks: {chunk_count}, Total bytes: {total_bytes}")
        print(f"{separator}\n")
| # CORS handling | |
async def handle_cors(request, response):
    """Attach the permissive CORS headers to ``response`` and return it.

    ``request`` is accepted for signature symmetry with the handlers but is
    not consulted; the same headers are set on every response.
    """
    cors_headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
        'Access-Control-Allow-Headers': 'Content-Type',
    }
    for header_name, header_value in cors_headers.items():
        response.headers[header_name] = header_value
    return response
async def get_voices(request):
    """Return the list of available Edge TTS voices as JSON.

    On success the body is whatever ``edge_tts.list_voices()`` returned. If
    fetching or serializing the voices fails, the error is printed and an
    empty JSON list is returned with status 500.

    CORS headers are applied on both paths so that browser clients on other
    origins can also observe the failure response.
    """
    try:
        voices = await edge_tts.list_voices()
        response = web.json_response(voices)
    except Exception as e:
        print("Error fetching voices:", e)
        response = web.json_response([], status=500)
    await handle_cors(request, response)
    return response
async def tts_stream(request):
    """WebSocket endpoint that turns JSON text requests into streamed audio.

    Each incoming text frame must be a JSON object with a ``text`` string and
    a ``voice`` string, plus optional ``rate`` (default ``'+0%'``) and
    ``pitch`` (default ``'+0Hz'``). For every request the audio is sent back
    as binary frames followed by a ``{"type": "complete"}`` text frame.

    Any problem with a request (malformed JSON, missing fields, synthesis
    failure) is reported as ``{"type": "error", "message": ...}`` and the
    connection stays open for further requests.

    Returns:
        The prepared ``web.WebSocketResponse``.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    handler = TTSHandler()
    print("Client connected to TTS stream")

    async def _send_error(message):
        # Errors share one wire format so clients can handle them uniformly.
        await ws.send_str(json.dumps({"type": "error", "message": message}))

    try:
        async for msg in ws:
            if msg.type == web.WSMsgType.ERROR:
                print(f'ws connection closed with exception {ws.exception()}')
                continue
            if msg.type != web.WSMsgType.TEXT:
                continue

            # Validate the request up front: a bad payload must produce an
            # error reply, not tear down the whole session.
            try:
                data = json.loads(msg.data)
            except json.JSONDecodeError as parse_err:
                print(f"Invalid JSON from client: {parse_err}")
                await _send_error(f"Invalid JSON: {parse_err}")
                continue
            if not isinstance(data, dict):
                await _send_error("Request must be a JSON object")
                continue

            text = data.get('text')
            voice = data.get('voice')
            if not isinstance(text, str) or not text.strip():
                await _send_error("Missing or empty 'text'")
                continue
            if not isinstance(voice, str) or not voice:
                await _send_error("Missing or empty 'voice'")
                continue
            rate = data.get('rate', '+0%')
            pitch = data.get('pitch', '+0Hz')

            print(f"Speaking: {voice} - {text[:20]}...")
            try:
                async for audio_chunk in handler.stream_audio(text, voice, rate, pitch):
                    await ws.send_bytes(audio_chunk)
                # Signal end of stream
                await ws.send_str(json.dumps({"type": "complete"}))
            except Exception as stream_err:
                print(f"Error during stream: {stream_err}")
                await _send_error(str(stream_err))
    except Exception as e:
        print("WebSocket Error:", e)
    finally:
        print('Client disconnected')
    return ws
async def handle_options(request):
    """Answer an OPTIONS request with an empty response carrying CORS headers."""
    preflight = web.Response()
    return await handle_cors(request, preflight)
async def health_check(request):
    """Report that the service is up as a small JSON document."""
    payload = {"status": "ok", "service": "edge-tts-api"}
    return web.json_response(payload)
# Application object and its route table: health probe, voice listing,
# the WebSocket TTS endpoint, and the OPTIONS handler for /voices.
app = web.Application()
_route_table = [
    web.get('/health', health_check),
    web.get('/voices', get_voices),
    web.get('/tts', tts_stream),
    web.options('/voices', handle_options),
]
app.add_routes(_route_table)
if __name__ == '__main__':
    import os

    # The listening port comes from TTS_PORT, falling back to 5050.
    port = int(os.getenv('TTS_PORT', 5050))
    print(f"Starting Edge TTS API on port {port}...")
    # Bind on all interfaces so the server is reachable from outside the host.
    web.run_app(app, host='0.0.0.0', port=port)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment