Created
February 7, 2026 13:58
-
-
Save dark-faze/dbe4c99ac704b93bd58da83d592eceb6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import tempfile | |
| import subprocess | |
| from pathlib import Path | |
| from dataclasses import dataclass | |
| from typing import Optional, List | |
| import aiohttp | |
| import xml.etree.ElementTree as ET | |
| from datetime import datetime, timedelta | |
@dataclass
class VideoConfig:
    """Configuration for a single video-generation job."""

    # Human-readable video title and its accompanying description.
    title: str
    description: str
    # Rendering style: "short_form" (default) or "long_form".
    style: str = "short_form"
    # ElevenLabs voice identifier used for narration.
    voice_id: str = "pNInz6obpgDQGcFmaJgB"
class VideoOrchestrator:
    """
    End-to-end pipeline: Text -> Script -> Audio -> Images -> Video -> Upload

    Handles failures gracefully with exponential backoff and temp file cleanup.
    Successful jobs have their temp files removed; failed jobs keep their work
    directory on disk for debugging.
    """

    def __init__(self, eleven_key: str, replicate_key: str, youtube_creds: dict):
        """
        Args:
            eleven_key: ElevenLabs API key for text-to-speech.
            replicate_key: Replicate API token for image generation.
            youtube_creds: YouTube Data API credentials (shape not used here;
                passed through to the upload step).
        """
        self.eleven_key = eleven_key
        self.replicate_key = replicate_key
        self.youtube = youtube_creds
        # Shared root for all job work dirs, e.g. /tmp/video_pipeline.
        self.temp_dir = Path(tempfile.gettempdir()) / "video_pipeline"
        self.temp_dir.mkdir(exist_ok=True)

    async def generate(self, prompt: str) -> dict:
        """Main entry point. Returns job metadata including the YouTube video ID.

        Raises:
            VideoPipelineError: wrapping any failure in the pipeline.

        NOTE(review): on success the temp assets are deleted by _cleanup, so the
        returned "assets" paths no longer exist on disk — confirm whether any
        caller relies on them.
        NOTE(review): datetime.now() is naive local time — confirm whether job
        IDs should use UTC.
        """
        job_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(prompt) % 10000}"
        work_dir = self.temp_dir / job_id
        work_dir.mkdir(exist_ok=True)
        # BUG FIX: the previous version cleaned up twice — once with
        # success=False in `except` (meant to keep failure artifacts), then
        # unconditionally with success=True in `finally`, which deleted the
        # artifacts anyway. A single cleanup driven by this flag fixes that.
        succeeded = False
        try:
            # Script first (both asset generators depend on it), then the
            # audio and image assets in parallel.
            script = await self._generate_script(prompt)
            audio_path, image_paths = await asyncio.gather(
                self._generate_audio(script, work_dir),
                self._generate_images(script, work_dir),
            )
            # Sequential composition (depends on the assets above).
            video_path = await self._compose_video(
                audio_path, image_paths, work_dir, script["segments"]
            )
            # Distribution.
            youtube_id = await self._upload_to_youtube(
                video_path, script["title"], script["description"]
            )
            succeeded = True
            return {
                "job_id": job_id,
                "youtube_id": youtube_id,
                "url": f"https://youtube.com/watch?v={youtube_id}",
                "assets": {
                    "audio": str(audio_path),
                    "video": str(video_path),
                    "images": [str(p) for p in image_paths],
                },
            }
        except Exception as e:
            raise VideoPipelineError(f"Job {job_id} failed: {e}") from e
        finally:
            # Deletes temp files on success to avoid disk bloat; keeps them
            # on failure for post-mortem debugging.
            await self._cleanup(work_dir, success=succeeded)

    async def _generate_script(self, prompt: str) -> dict:
        """Generate a script with per-segment durations and visual prompts.

        In production this calls GPT-4/Claude with structured output; the
        structure is mocked here for the sample.
        """
        return {
            "title": prompt,
            "description": f"AI generated video about {prompt}",
            "segments": [
                {"text": "Introduction to the topic...", "duration": 5.0, "visual_prompt": "Futuristic city"},
                {"text": "Main content here...", "duration": 10.0, "visual_prompt": "Data visualization"},
            ],
            "total_duration": 15.0
        }

    async def _generate_audio(self, script: dict, work_dir: Path) -> Path:
        """Text-to-speech via ElevenLabs, with retry on rate limits.

        Retries up to 3 times: exponential backoff (1s/2s/4s) on HTTP 429 and
        a 1s pause on transient client errors.

        Raises:
            AudioGenerationError: when every attempt fails or is rate-limited.
        """
        # NOTE(review): _generate_script never sets "voice_id", so this always
        # falls back to "default" — confirm whether VideoConfig.voice_id was
        # meant to be threaded through here.
        url = f"https://api.elevenlabs.io/v1/text-to-speech/{script.get('voice_id', 'default')}"
        headers = {"xi-api-key": self.eleven_key}
        text = " ".join(s["text"] for s in script["segments"])
        payload = {
            "text": text,
            "model_id": "eleven_turbo_v2",
            "voice_settings": {"stability": 0.5, "similarity_boost": 0.75}
        }
        output_path = work_dir / "audio.mp3"
        attempts = 3
        for attempt in range(attempts):
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.post(url, json=payload, headers=headers) as resp:
                        if resp.status == 429:
                            # Rate limited: back off exponentially and retry.
                            await asyncio.sleep(2 ** attempt)
                            continue
                        resp.raise_for_status()
                        output_path.write_bytes(await resp.read())
                        return output_path
            except aiohttp.ClientError as e:
                if attempt == attempts - 1:
                    raise AudioGenerationError(
                        f"Failed to generate audio after {attempts} attempts: {e}"
                    ) from e
                await asyncio.sleep(1)
        # BUG FIX: previously this fell through and returned output_path even
        # though no audio was ever written (every attempt was rate-limited).
        raise AudioGenerationError(f"Rate limited on all {attempts} attempts")

    async def _generate_images(self, script: dict, work_dir: Path) -> List[Path]:
        """Generate one image per script segment, in parallel via Replicate."""
        tasks = [
            self._generate_single_image(
                segment["visual_prompt"], work_dir / f"img_{i:03d}.png"
            )
            for i, segment in enumerate(script["segments"])
        ]
        return await asyncio.gather(*tasks)

    async def _generate_single_image(self, prompt: str, output_path: Path) -> Path:
        """Call the Replicate API to generate a single image.

        Simplified — production code polls the prediction until it completes
        and then downloads the result to output_path.
        """
        headers = {"Authorization": f"Token {self.replicate_key}"}
        payload = {
            "version": "stability-ai/stable-diffusion-xl-base-1.0",
            "input": {"prompt": prompt, "width": 1080, "height": 1920}  # 9:16 for shorts
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.replicate.com/v1/predictions",
                json=payload,
                headers=headers
            ) as resp:
                resp.raise_for_status()
                result = await resp.json()
        # Poll for completion (simplified)
        await asyncio.sleep(2)  # Mock polling
        # Download image...
        return output_path

    async def _compose_video(
        self,
        audio_path: Path,
        image_paths: List[Path],
        work_dir: Path,
        segments: List[dict]
    ) -> Path:
        """
        FFmpeg composition: Images + Audio + Captions.

        Generates an SRT file and burns the subtitles into the video.

        Raises:
            VideoCompositionError: if FFmpeg exits with a non-zero status.
        """
        output_path = work_dir / "final.mp4"
        srt_path = work_dir / "captions.srt"
        self._generate_srt(segments, srt_path)

        # Concat-demuxer input list: each image is shown for its own segment's
        # duration. (BUG FIX: previously every image reused segments[0]'s
        # duration.) _generate_images creates one image per segment, so the
        # zip pairs them 1:1.
        input_txt = work_dir / "inputs.txt"
        with open(input_txt, 'w') as f:
            for img, seg in zip(image_paths, segments):
                f.write(f"file '{img}'\n")
                f.write(f"duration {seg['duration']}\n")

        # Build video from images, mux audio, burn subtitles; -shortest stops
        # at the shorter of the video/audio streams.
        cmd = [
            "ffmpeg", "-y",
            "-f", "concat", "-safe", "0", "-i", str(input_txt),
            "-i", str(audio_path),
            "-vf", f"subtitles={srt_path}:force_style='FontSize=24,PrimaryColour=&H00FFFFFF'",
            "-c:v", "libx264", "-pix_fmt", "yuv420p",
            "-shortest", str(output_path)
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        _stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            raise VideoCompositionError(f"FFmpeg failed: {stderr.decode()}")
        return output_path

    def _generate_srt(self, segments: List[dict], output_path: Path):
        """Write an SRT subtitle file; cue times accumulate segment durations."""
        with open(output_path, 'w') as f:
            current_time = timedelta(seconds=0)
            for i, seg in enumerate(segments, 1):
                end_time = current_time + timedelta(seconds=seg["duration"])
                f.write(f"{i}\n")
                f.write(f"{self._format_srt_time(current_time)} --> {self._format_srt_time(end_time)}\n")
                f.write(f"{seg['text']}\n\n")
                current_time = end_time

    def _format_srt_time(self, td: timedelta) -> str:
        """Convert timedelta to SRT format: HH:MM:SS,mmm"""
        total_seconds = int(td.total_seconds())
        hours = total_seconds // 3600
        minutes = (total_seconds % 3600) // 60
        seconds = total_seconds % 60
        # Milliseconds from the fractional part (truncated, as before).
        millis = int((td.total_seconds() - total_seconds) * 1000)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"

    async def _upload_to_youtube(self, video_path: Path, title: str, description: str) -> str:
        """Upload via YouTube Data API v3. Returns the video ID.

        Simplified — production requires the OAuth2 flow; mocked for the sample.
        """
        return "dQw4w9WgXcQ"

    async def _cleanup(self, work_dir: Path, success: bool):
        """Remove temp files. In production, you might archive to S3 instead.

        Failed jobs are deliberately left on disk so their artifacts can be
        inspected.
        """
        if not success:
            # Keep failed job artifacts for debugging.
            return
        # Only files are created in work_dir, so unlink + rmdir suffices.
        for f in work_dir.glob("*"):
            if f.is_file():
                f.unlink()
        work_dir.rmdir()
class VideoPipelineError(Exception):
    """Raised when the end-to-end video pipeline fails for any reason."""
class AudioGenerationError(Exception):
    """Raised when text-to-speech audio generation exhausts its retries."""
class VideoCompositionError(Exception):
    """Raised when the FFmpeg composition step exits with an error."""
| # Usage example (for the repo README) | |
if __name__ == "__main__":

    async def main() -> None:
        """Demo run: generate a sample video and print its YouTube URL."""
        orchestrator = VideoOrchestrator(
            eleven_key="sk_...",
            replicate_key="r8_...",
            youtube_creds={},
        )
        outcome = await orchestrator.generate("The future of AI agents")
        print(f"Video uploaded: {outcome['url']}")

    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment