Skip to content

Instantly share code, notes, and snippets.

@dark-faze
Created February 7, 2026 13:58
Show Gist options
  • Select an option

  • Save dark-faze/dbe4c99ac704b93bd58da83d592eceb6 to your computer and use it in GitHub Desktop.

Select an option

Save dark-faze/dbe4c99ac704b93bd58da83d592eceb6 to your computer and use it in GitHub Desktop.
import asyncio
import tempfile
import subprocess
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, List
import aiohttp
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
@dataclass
class VideoConfig:
    """Configuration for a single video-generation job."""

    title: str
    description: str
    # Rendering style: "short_form" or "long_form".
    style: str = "short_form"
    # ElevenLabs voice identifier used for narration.
    voice_id: str = "pNInz6obpgDQGcFmaJgB"
class VideoOrchestrator:
    """
    End-to-end pipeline: Text -> Script -> Audio -> Images -> Video -> Upload

    Handles failures gracefully with exponential backoff and temp file cleanup.
    Failed jobs keep their work directory on disk for debugging; only
    successful jobs have their temp files removed.
    """

    def __init__(self, eleven_key: str, replicate_key: str, youtube_creds: dict):
        """
        Args:
            eleven_key: ElevenLabs API key (text-to-speech).
            replicate_key: Replicate API token (image generation).
            youtube_creds: YouTube Data API credentials mapping.
        """
        self.eleven_key = eleven_key
        self.replicate_key = replicate_key
        self.youtube = youtube_creds
        self.temp_dir = Path(tempfile.gettempdir()) / "video_pipeline"
        # parents=True so this also works when gettempdir() points at a
        # nested path that does not exist yet (fresh container, custom TMPDIR).
        self.temp_dir.mkdir(parents=True, exist_ok=True)

    async def generate(self, prompt: str) -> dict:
        """Main entry point. Returns job metadata including the YouTube video ID.

        Raises:
            VideoPipelineError: wrapping the underlying cause of any stage
                failure (audio, images, composition, upload).
        """
        # NOTE: hash() is salted per process (PYTHONHASHSEED), so the suffix
        # only disambiguates jobs within one run; the timestamp carries the
        # cross-run uniqueness.
        job_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(prompt) % 10000}"
        work_dir = self.temp_dir / job_id
        work_dir.mkdir(parents=True, exist_ok=True)
        try:
            # Script first (both asset generators depend on it), then audio
            # and images run concurrently since they are independent.
            script = await self._generate_script(prompt)
            audio_path, image_paths = await asyncio.gather(
                self._generate_audio(script, work_dir),
                self._generate_images(script, work_dir),
            )
            # Sequential composition (depends on the assets above)
            video_path = await self._compose_video(
                audio_path, image_paths, work_dir, script["segments"]
            )
            # Distribution
            youtube_id = await self._upload_to_youtube(
                video_path, script["title"], script["description"]
            )
            result = {
                "job_id": job_id,
                "youtube_id": youtube_id,
                "url": f"https://youtube.com/watch?v={youtube_id}",
                "assets": {
                    "audio": str(audio_path),
                    "video": str(video_path),
                    "images": [str(p) for p in image_paths],
                },
            }
        except Exception as e:
            # success=False keeps the work dir for debugging.
            # BUG FIX: the previous version also ran a success=True cleanup in
            # a `finally` block, which deleted the very artifacts this failure
            # path intended to preserve.
            await self._cleanup(work_dir, success=False)
            raise VideoPipelineError(f"Job {job_id} failed: {e}") from e
        else:
            # Clean up temp files only after a fully successful run.
            await self._cleanup(work_dir, success=True)
            return result

    async def _generate_script(self, prompt: str) -> dict:
        """Generate a script with per-segment durations and visual prompts.

        In production this calls GPT-4/Claude with structured output; the
        structure is mocked here so the rest of the pipeline can be exercised.
        """
        return {
            "title": prompt,
            "description": f"AI generated video about {prompt}",
            "segments": [
                {"text": "Introduction to the topic...", "duration": 5.0, "visual_prompt": "Futuristic city"},
                {"text": "Main content here...", "duration": 10.0, "visual_prompt": "Data visualization"},
            ],
            "total_duration": 15.0,
        }

    async def _generate_audio(self, script: dict, work_dir: Path) -> Path:
        """Text-to-speech with ElevenLabs. Retries on rate limit.

        Returns:
            Path of the written MP3 file.

        Raises:
            AudioGenerationError: if all 3 attempts fail (rate-limited or
                transport errors).
        """
        # NOTE(review): _generate_script() never puts 'voice_id' in the
        # script dict (it lives on VideoConfig), so this currently always
        # falls back to 'default' — confirm which source is intended.
        url = f"https://api.elevenlabs.io/v1/text-to-speech/{script.get('voice_id', 'default')}"
        headers = {"xi-api-key": self.eleven_key}
        text = " ".join(s["text"] for s in script["segments"])
        payload = {
            "text": text,
            "model_id": "eleven_turbo_v2",
            "voice_settings": {"stability": 0.5, "similarity_boost": 0.75},
        }
        output_path = work_dir / "audio.mp3"
        last_error: Optional[BaseException] = None
        # One session reused across attempts instead of a new one per retry.
        async with aiohttp.ClientSession() as session:
            for attempt in range(3):
                try:
                    async with session.post(url, json=payload, headers=headers) as resp:
                        if resp.status == 429:
                            # Exponential backoff: 1s, 2s, 4s.
                            await asyncio.sleep(2 ** attempt)
                            continue
                        resp.raise_for_status()
                        output_path.write_bytes(await resp.read())
                        return output_path
                except aiohttp.ClientError as e:
                    last_error = e
                    await asyncio.sleep(1)
        # BUG FIX: the previous version fell out of the retry loop (e.g. three
        # consecutive 429s) and returned a path to a file that was never
        # written. Fail loudly instead.
        raise AudioGenerationError(
            f"Failed to generate audio after 3 attempts: {last_error or 'rate limited'}"
        )

    async def _generate_images(self, script: dict, work_dir: Path) -> List[Path]:
        """Generate one image per script segment, all in parallel."""
        tasks = [
            self._generate_single_image(
                segment["visual_prompt"], work_dir / f"img_{i:03d}.png"
            )
            for i, segment in enumerate(script["segments"])
        ]
        return await asyncio.gather(*tasks)

    async def _generate_single_image(self, prompt: str, output_path: Path) -> Path:
        """Call the Replicate API for a single image.

        Simplified: production code polls the prediction until complete and
        downloads the resulting image to output_path.
        """
        headers = {"Authorization": f"Token {self.replicate_key}"}
        payload = {
            "version": "stability-ai/stable-diffusion-xl-base-1.0",
            # 9:16 portrait, sized for shorts.
            "input": {"prompt": prompt, "width": 1080, "height": 1920},
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.replicate.com/v1/predictions",
                json=payload,
                headers=headers,
            ) as resp:
                resp.raise_for_status()
                await resp.json()  # prediction metadata (id, status, ...)
        # Poll for completion (simplified)
        await asyncio.sleep(2)  # Mock polling
        # NOTE(review): the image bytes are never downloaded here, so
        # output_path may not exist on disk — confirm the production version
        # writes it before _compose_video runs.
        return output_path

    async def _compose_video(
        self,
        audio_path: Path,
        image_paths: List[Path],
        work_dir: Path,
        segments: List[dict],
    ) -> Path:
        """
        FFmpeg composition: Images + Audio + Captions.

        Writes an SRT from the segments, builds a concat list pairing each
        image with its own segment's duration, and burns the subtitles into
        the rendered video.

        Raises:
            VideoCompositionError: if ffmpeg exits non-zero.
        """
        output_path = work_dir / "final.mp4"
        # Generate SRT file
        srt_path = work_dir / "captions.srt"
        self._generate_srt(segments, srt_path)
        # Concat-demuxer input list. BUG FIX: the previous version reused
        # segments[0]['duration'] for every image; each image now gets its
        # matching segment's duration.
        input_txt = work_dir / "inputs.txt"
        with open(input_txt, "w") as f:
            for img, seg in zip(image_paths, segments):
                f.write(f"file '{img}'\n")
                f.write(f"duration {seg['duration']}\n")
        cmd = [
            "ffmpeg", "-y",
            "-f", "concat", "-safe", "0", "-i", str(input_txt),
            "-i", str(audio_path),
            # Burn subtitles directly into the video stream.
            "-vf", f"subtitles={srt_path}:force_style='FontSize=24,PrimaryColour=&H00FFFFFF'",
            "-c:v", "libx264", "-pix_fmt", "yuv420p",
            # Stop at the shorter of the video/audio streams.
            "-shortest", str(output_path),
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            raise VideoCompositionError(f"FFmpeg failed: {stderr.decode()}")
        return output_path

    def _generate_srt(self, segments: List[dict], output_path: Path):
        """Write an SRT subtitle file, one cue per segment, back to back."""
        with open(output_path, "w") as f:
            current_time = timedelta(seconds=0)
            for i, seg in enumerate(segments, 1):
                duration = timedelta(seconds=seg["duration"])
                end_time = current_time + duration
                f.write(f"{i}\n")
                f.write(f"{self._format_srt_time(current_time)} --> {self._format_srt_time(end_time)}\n")
                f.write(f"{seg['text']}\n\n")
                # Cues are contiguous: next cue starts where this one ended.
                current_time = end_time

    def _format_srt_time(self, td: timedelta) -> str:
        """Convert a timedelta to SRT time format: HH:MM:SS,mmm"""
        total_seconds = int(td.total_seconds())
        hours = total_seconds // 3600
        minutes = (total_seconds % 3600) // 60
        seconds = total_seconds % 60
        # Fractional remainder becomes the millisecond field.
        millis = int((td.total_seconds() - total_seconds) * 1000)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"

    async def _upload_to_youtube(self, video_path: Path, title: str, description: str) -> str:
        """Upload via YouTube Data API v3. Returns the video ID.

        Simplified: production requires the OAuth2 flow; a fixed ID is
        returned here as a mock.
        """
        return "dQw4w9WgXcQ"

    async def _cleanup(self, work_dir: Path, success: bool):
        """Remove temp files for successful jobs; keep failed-job artifacts.

        In production you might archive to S3 instead of deleting.
        """
        if not success:
            # Keep failed job artifacts for debugging
            return
        for f in work_dir.glob("*"):
            if f.is_file():
                f.unlink()
        # The pipeline never creates subdirectories, so the dir is empty now.
        work_dir.rmdir()
class VideoPipelineError(Exception):
    """Raised when any stage of the video pipeline fails; wraps the cause."""
class AudioGenerationError(Exception):
    """Raised when text-to-speech generation exhausts its retries."""
class VideoCompositionError(Exception):
    """Raised when the FFmpeg composition step exits non-zero."""
# Usage example (for the repo README)
if __name__ == "__main__":

    async def _demo() -> None:
        # Placeholder credentials: replace with real keys before running.
        orchestrator = VideoOrchestrator(
            eleven_key="sk_...",
            replicate_key="r8_...",
            youtube_creds={},
        )
        outcome = await orchestrator.generate("The future of AI agents")
        print(f"Video uploaded: {outcome['url']}")

    asyncio.run(_demo())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment