Skip to content

Instantly share code, notes, and snippets.

@yigitkonur
Last active December 8, 2025 21:45
Show Gist options
  • Select an option

  • Save yigitkonur/2bbbde91712a4f059aaab9ef68564be6 to your computer and use it in GitHub Desktop.

Select an option

Save yigitkonur/2bbbde91712a4f059aaab9ef68564be6 to your computer and use it in GitHub Desktop.
Raycast script: YouTube Transcribe with ElevenLabs Scribe - downloads audio, transcribes with diarization, generates SRT subtitles and speaker-formatted output
#!/usr/bin/env python3
# Required parameters:
# @raycast.schemaVersion 1
# @raycast.title YouTube Transcribe with Diarization
# @raycast.mode silent
# Optional parameters:
# @raycast.icon 🎙️
# @raycast.packageName Media Tools
# @raycast.argument1 { "type": "text", "placeholder": "YouTube URLs (comma or newline separated)" }
# @raycast.needsConfirmation false
# Documentation:
# @raycast.author Yigit Konur
# @raycast.authorURL https://github.com/yigitkonur
# @raycast.description Downloads YouTube audio, transcribes with ElevenLabs Scribe (diarization + auto language), generates SRT/MD and copies transcript. Runs in background. Supports multiple URLs in parallel. Uses MD cache.
import sys
import os
import re
import json
import tempfile
import subprocess
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, field
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Location of this script and its optional .env companion file.
SCRIPT_DIR = Path(__file__).parent
ENV_FILE = SCRIPT_DIR / ".env"

def load_env():
    """Populate os.environ from the sibling .env file, if one exists.

    Lines are KEY=VALUE pairs; blank lines, '#' comments and lines without
    an '=' are skipped. Existing environment variables are never overwritten,
    so the real environment always wins over the .env file.
    """
    if not ENV_FILE.exists():
        return
    with open(ENV_FILE) as handle:
        for raw in handle:
            entry = raw.strip()
            # Skip blanks, comments, and lines that are not assignments.
            if not entry or entry.startswith('#') or '=' not in entry:
                continue
            name, _, raw_value = entry.partition('=')
            name = name.strip()
            if name and name not in os.environ:
                os.environ[name] = raw_value.strip()

load_env()
# Constants
# Output directories for generated subtitle (.srt) and markdown (.md) files,
# created next to this script on first use (see process_single_video).
SRT_OUTPUT_DIR = SCRIPT_DIR / "srt"
MD_OUTPUT_DIR = SCRIPT_DIR / "md"
# Absolute path to the yt-dlp binary (Homebrew location on Apple Silicon).
YT_DLP_PATH = "/opt/homebrew/bin/yt-dlp"
# Built-in macOS system sounds used for success / failure notifications.
SOUND_SUCCESS = "/System/Library/Sounds/Glass.aiff"
SOUND_ERROR = "/System/Library/Sounds/Basso.aiff"
# Parallel processing settings
MAX_PARALLEL_DOWNLOADS = 3
# NOTE(review): MAX_PARALLEL_TRANSCRIPTIONS is not referenced anywhere else
# in this script -- presumably a leftover from an earlier two-pool design.
MAX_PARALLEL_TRANSCRIPTIONS = 2 # ElevenLabs API rate limit consideration
def play_sound(success: bool = True) -> None:
    """Play the macOS success or error notification sound.

    Best-effort only: a bad afplay exit code or a missing afplay binary is
    swallowed, so a notification problem can never break the pipeline.
    """
    chosen = SOUND_SUCCESS if success else SOUND_ERROR
    try:
        subprocess.run(["afplay", chosen], capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        # Sound playback is purely cosmetic -- never propagate.
        pass
# SRT formatting constants (Netflix/BBC best practices)
MAX_CHARS_PER_LINE = 42        # max characters per subtitle line
MAX_DURATION_SECONDS = 7.0     # longest a block may stay on screen
MIN_DURATION_SECONDS = 0.833 # ~5/6 second
MAX_READING_SPEED_CPS = 21 # chars per second
PAUSE_THRESHOLD_SECONDS = 0.4 # pause that triggers new block
# Thread-safe print lock shared by all worker threads (see safe_print).
print_lock = threading.Lock()
def safe_print(*args, **kwargs):
    """Print under the module-wide lock so parallel workers never interleave lines."""
    print_lock.acquire()
    try:
        print(*args, **kwargs)
    finally:
        print_lock.release()
@dataclass
class VideoResult:
    """Holds all results for a single video processing."""
    # Original YouTube URL this result corresponds to.
    url: str
    # True once every pipeline step (or a cache hit) completed.
    success: bool = False
    # Human-readable failure reason when success is False.
    error: Optional[str] = None
    # yt-dlp metadata: at least 'id', 'title', 'duration'.
    video_info: Optional[Dict] = None
    # Parsed ElevenLabs response: 'transcript', 'language', 'words', 'speakers'.
    transcript_data: Optional[Dict] = None
    # Paths of the written .srt / .md files (None when not produced).
    srt_path: Optional[Path] = None
    md_path: Optional[Path] = None
    # Speaker-formatted transcript and the full markdown document body.
    diarized_text: str = ""
    md_content: str = ""
@dataclass
class Word:
    """Represents a single word with timing and speaker info."""
    text: str                      # the word itself (whitespace-stripped)
    start: float                   # start time, seconds from audio start
    end: float                     # end time, seconds from audio start
    speaker: Optional[str] = None  # diarization id (ElevenLabs speaker_id)
    confidence: float = 1.0        # recognition confidence reported by the API
@dataclass
class SubtitleBlock:
    """Represents a single SRT subtitle block (one cue)."""
    index: int              # 1-based SRT cue number
    start: float            # cue start, seconds
    end: float              # cue end, seconds
    speaker: Optional[str]  # raw diarization id, or None
    text: str               # already line-wrapped cue text

    def to_srt(self, speaker_map: Optional[Dict[str, str]] = None) -> str:
        """Render this block as one SRT cue: index, timing line, then text.

        When the block has a speaker, a "[Name]" prefix line is added;
        speaker_map translates raw ids to friendly names when provided.
        """
        timing = f"{format_srt_timestamp(self.start)} --> {format_srt_timestamp(self.end)}"
        body = self.text
        if self.speaker:
            label = self.speaker
            if speaker_map:
                label = speaker_map.get(self.speaker, self.speaker)
            body = f"[{label}]\n{body}"
        return f"{self.index}\n{timing}\n{body}\n"
def format_srt_timestamp(seconds: float) -> str:
    """Convert seconds to SRT timestamp format (HH:MM:SS,mmm).

    Sub-second precision is truncated (not rounded) to whole milliseconds.
    """
    whole = int(seconds)
    millis = int((seconds % 1) * 1000)
    minutes, secs = divmod(whole % 3600, 60)
    hours = whole // 3600
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def sanitize_filename(name: str) -> str:
    """Sanitize a string for use as a filename stem.

    Removes characters invalid on common filesystems, collapses whitespace
    runs into single underscores, and truncates to 100 characters. Falls
    back to "untitled" when nothing usable remains (empty or symbol-only
    titles), so callers never build a filename from an empty stem.
    """
    # Remove characters that are invalid in filenames.
    sanitized = re.sub(r'[<>:"/\\|?*]', '', name)
    # Replace whitespace runs with single underscores.
    sanitized = re.sub(r'\s+', '_', sanitized)
    # Limit length.
    sanitized = sanitized[:100]
    # Guard: an empty stem would produce filenames like "_20250101.srt".
    return sanitized or "untitled"
def is_sentence_end(text: str) -> bool:
    """Check if text ends with sentence-ending punctuation (., !, ?).

    An optional closing quote or guillemet after the punctuation mark and
    trailing whitespace are both tolerated.
    """
    candidate = text.strip()
    match = re.search(r'[.!?][\'"\u00BB\u201C\u201D]?\s*$', candidate)
    return match is not None
def download_youtube_audio(url: str, output_dir: Path) -> Tuple[Path, Dict]:
    """
    Download audio from YouTube URL using yt-dlp.

    Runs yt-dlp twice: a metadata-only probe (--dump-json) so files can be
    named after the video ID, then the actual download as a 192K mp3.

    Args:
        url: YouTube video URL (playlists are explicitly disabled).
        output_dir: directory that receives the downloaded mp3.

    Returns:
        Tuple of (audio_file_path, video_metadata) where metadata contains
        'id', 'title' and 'duration'.

    Raises:
        FileNotFoundError: yt-dlp binary missing, or no mp3 was produced.
        RuntimeError: metadata probe or download failed / timed out.
    """
    # Verify yt-dlp exists
    if not Path(YT_DLP_PATH).exists():
        raise FileNotFoundError(f"yt-dlp not found at {YT_DLP_PATH}")
    # Create temp filename pattern
    output_template = str(output_dir / "%(id)s.%(ext)s")
    # First, extract info without downloading
    info_cmd = [
        YT_DLP_PATH,
        "--dump-json",
        "--no-playlist",
        url
    ]
    try:
        result = subprocess.run(
            info_cmd,
            capture_output=True,
            text=True,
            check=True,
            timeout=60  # metadata probe should be quick; a minute is generous
        )
        video_info = json.loads(result.stdout)
        video_id = video_info.get('id', 'unknown')
        video_title = video_info.get('title', 'Unknown Title')
        duration = video_info.get('duration', 0)
    except subprocess.TimeoutExpired:
        raise RuntimeError("Timeout while fetching video info")
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed to get video info: {e.stderr}")
    except json.JSONDecodeError:
        raise RuntimeError("Failed to parse video info")
    # Download audio
    download_cmd = [
        YT_DLP_PATH,
        "--format", "bestaudio/best",
        "--extract-audio",
        "--audio-format", "mp3",
        "--audio-quality", "192K",
        "--no-playlist",
        "--output", output_template,
        url
    ]
    try:
        subprocess.run(
            download_cmd,
            capture_output=True,
            text=True,
            check=True,
            timeout=300  # 5 minutes covers audio extraction for long videos
        )
    except subprocess.TimeoutExpired:
        raise RuntimeError("Download timeout (>5 minutes)")
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Download failed: {e.stderr}")
    # Find the downloaded file; normally named <video_id>.mp3 by the template.
    audio_file = output_dir / f"{video_id}.mp3"
    if not audio_file.exists():
        # Fallback: take any mp3 in the directory. Safe because each video
        # gets its own temp subdirectory (see process_single_video's caller).
        mp3_files = list(output_dir.glob("*.mp3"))
        if mp3_files:
            audio_file = mp3_files[0]
        else:
            raise FileNotFoundError("Downloaded audio file not found")
    return audio_file, {
        'id': video_id,
        'title': video_title,
        'duration': duration
    }
def transcribe_with_elevenlabs(audio_path: Path) -> Dict:
    """
    Transcribe audio using ElevenLabs Scribe API with diarization.

    Args:
        audio_path: path to the audio file (mp3) to transcribe.

    Returns:
        Dict with keys:
            'transcript': full transcript text
            'language':   detected language code, or 'unknown'
            'words':      List[Word] with timing / speaker / confidence
            'speakers':   set of speaker ids seen in the word stream

    Raises:
        ImportError: the elevenlabs SDK is not installed.
        ValueError: ELEVENLABS_API_KEY is not set.
        RuntimeError: the API call failed or was rate limited.
    """
    try:
        from elevenlabs import ElevenLabs
    except ImportError:
        # Fix: the original raised a bare ImportError("elevenlabs") with no
        # guidance on how to resolve it.
        raise ImportError(
            "The 'elevenlabs' package is required. "
            "Install it with: pip install elevenlabs"
        )
    # Get API key
    api_key = os.environ.get("ELEVENLABS_API_KEY")
    if not api_key:
        raise ValueError(
            "ELEVENLABS_API_KEY environment variable not set.\n"
            "Get your API key from: https://elevenlabs.io/app/settings/api-keys\n"
            "Set it with: export ELEVENLABS_API_KEY='your_key_here'"
        )
    client = ElevenLabs(api_key=api_key)
    # Read audio file
    with open(audio_path, 'rb') as f:
        audio_data = f.read()
    # Call Scribe API
    try:
        transcript = client.speech_to_text.convert(
            file=audio_data,
            model_id="scribe_v1",  # Options: scribe_v1, scribe_v1_experimental, scribe_v2
            diarize=True,  # Enable speaker diarization
            timestamps_granularity="word",  # Word-level timestamps
            tag_audio_events=False,  # Disable (laughter), (music), etc.
            # language_code=None means auto-detect
        )
    except Exception as e:
        error_str = str(e)
        if "rate" in error_str.lower():
            raise RuntimeError("Rate limited by ElevenLabs API. Please wait and try again.")
        raise RuntimeError(f"Transcription failed: {e}")
    # Parse response defensively with getattr -- ElevenLabs returns Pydantic
    # models whose exact fields vary by SDK version.
    result = {
        'transcript': getattr(transcript, 'text', ''),
        'language': getattr(transcript, 'language_code', 'unknown'),
        'words': [],
        'speakers': set()
    }
    # Extract words with timing and speaker info
    words_list = getattr(transcript, 'words', None)
    if words_list:
        for w in words_list:
            # Clean text - remove extra whitespace; skip whitespace-only tokens.
            text = getattr(w, 'text', '').strip()
            if not text:
                continue
            word = Word(
                text=text,
                start=float(getattr(w, 'start', 0)),
                end=float(getattr(w, 'end', 0)),
                speaker=getattr(w, 'speaker_id', None),  # ElevenLabs uses speaker_id
                confidence=float(getattr(w, 'confidence', 1.0))
            )
            result['words'].append(word)
            if word.speaker:
                result['speakers'].add(word.speaker)
    # (Removed three unused locals -- detected_lang / num_speakers / num_words --
    # leftovers from deleted debug output.)
    return result
def build_subtitle_blocks(words: List[Word]) -> List[SubtitleBlock]:
    """
    Build SRT subtitle blocks from word-level data.

    Rules (Netflix/BBC best practices):
    1. Max 42 characters per line
    2. Duration: 0.833s - 7s per block
    3. Break on speaker change (immediate new block)
    4. Break on sentence end (., !, ?)
    5. Break on pause > 0.4s
    6. Max reading speed: 21 chars/sec

    NOTE(review): extending a too-short block to MIN_DURATION_SECONDS can
    make it overlap the start of the following block -- confirm whether
    downstream players tolerate that.
    """
    if not words:
        return []
    blocks: List[SubtitleBlock] = []
    current_words: List[Word] = []          # words accumulated for the in-progress block
    current_speaker: Optional[str] = None   # speaker attributed to the in-progress block
    block_index = 1                         # SRT cue numbers are 1-based
    def flush_block():
        """Create a subtitle block from accumulated words."""
        # current_speaker is declared nonlocal but never reassigned here;
        # it is read to tag the emitted block.
        nonlocal block_index, current_words, current_speaker
        if not current_words:
            return
        text = ' '.join(w.text for w in current_words)
        start_time = current_words[0].start
        end_time = current_words[-1].end
        # Ensure minimum duration
        if end_time - start_time < MIN_DURATION_SECONDS:
            end_time = start_time + MIN_DURATION_SECONDS
        # Line wrapping for readability (max 42 chars per line, 2 lines max)
        wrapped_text = wrap_subtitle_text(text)
        blocks.append(SubtitleBlock(
            index=block_index,
            start=start_time,
            end=end_time,
            speaker=current_speaker,
            text=wrapped_text
        ))
        block_index += 1
        current_words = []
    # NOTE(review): loop index i is unused -- leftover from removed code.
    for i, word in enumerate(words):
        # Check for speaker change - immediate new block
        if word.speaker != current_speaker and current_words:
            flush_block()
            current_speaker = word.speaker
        # Set speaker for first word (also re-runs right after any flush,
        # which is a harmless duplicate of the assignment above)
        if not current_words:
            current_speaker = word.speaker
        # Check for pause between words (only when a block is in progress --
        # a flush above empties current_words and skips this check)
        if current_words:
            last_word_end = current_words[-1].end
            pause = word.start - last_word_end
            if pause > PAUSE_THRESHOLD_SECONDS:
                flush_block()
        current_words.append(word)
        # Calculate current block properties (word included, so the limits
        # below may be exceeded by exactly one word before the flush)
        current_text = ' '.join(w.text for w in current_words)
        current_duration = current_words[-1].end - current_words[0].start
        # Check if sentence ended
        if is_sentence_end(word.text):
            flush_block()
            continue
        # Check duration limit
        if current_duration >= MAX_DURATION_SECONDS:
            flush_block()
            continue
        # Check character limit (with buffer for next word)
        if len(current_text) >= MAX_CHARS_PER_LINE * 2 - 10: # 2 lines, with buffer
            flush_block()
            continue
        # Check reading speed; only split once the block has enough words
        # that the split will not produce a tiny fragment
        if current_duration > 0:
            reading_speed = len(current_text) / current_duration
            if reading_speed > MAX_READING_SPEED_CPS and len(current_words) > 3:
                # Split to maintain readable speed
                flush_block()
    # Flush remaining words
    flush_block()
    return blocks
def wrap_subtitle_text(text: str, max_line_length: Optional[int] = None) -> str:
    """
    Wrap subtitle text for readability.

    Rules:
    - Max 42 characters per line (max_line_length; None means the module
      default MAX_CHARS_PER_LINE, resolved at call time so the constant can
      be tuned without re-importing)
    - Target of 2 lines per block
    - Break at spaces (greedy fill)
    - Never drop words: if the text needs more than two lines, the overflow
      is appended to the second line instead of being silently discarded.
      (The previous implementation stopped after two lines and truncated the
      remaining words, losing subtitle text.)

    Args:
        text: single-line subtitle text (space-separated words).
        max_line_length: soft per-line character limit, or None for default.

    Returns:
        The text with newline separators between at most two lines.
    """
    if max_line_length is None:
        max_line_length = MAX_CHARS_PER_LINE
    if len(text) <= max_line_length:
        return text
    words = text.split()
    lines: List[str] = []
    current_line: List[str] = []
    current_length = 0
    for word in words:
        extra = len(word) + (1 if current_line else 0)  # +1 for the joining space
        if current_length + extra <= max_line_length:
            current_line.append(word)
            current_length += extra
        else:
            if current_line:
                lines.append(' '.join(current_line))
            current_line = [word]
            current_length = len(word)
    if current_line:
        lines.append(' '.join(current_line))
    # Keep at most two physical lines; merge any overflow into the second
    # line rather than losing words (a long second line beats missing text).
    if len(lines) > 2:
        lines = [lines[0], ' '.join(lines[1:])]
    return '\n'.join(lines)
def generate_srt_content(blocks: List[SubtitleBlock], speakers: set) -> str:
    """Generate complete SRT file content with friendly speaker names.

    Raw speaker ids are assigned, in sorted order, the labels "Speaker 1",
    "Speaker 2", ... so the naming is stable for a given set of ids.
    """
    friendly_names = {
        raw_id: f"Speaker {n}"
        for n, raw_id in enumerate(sorted(speakers), start=1)
    }
    return '\n'.join(block.to_srt(friendly_names) for block in blocks)
def generate_diarized_text(words: "List[Word]") -> str:
    """
    Generate speaker-formatted output.

    Format:
    Speaker 1: concatenated text until speaker change

    Speaker 2: concatenated text until speaker change

    Words without a speaker id are attributed to "Unknown" instead of being
    dropped. (The previous implementation discarded any run of words whose
    speaker was None, silently losing the entire transcript whenever
    diarization produced no speaker ids.)
    """
    if not words:
        return ""
    output_lines = []
    current_speaker: Optional[str] = None
    current_text: List[str] = []
    # Map raw speaker IDs to friendly names, in order of first appearance.
    speaker_map: Dict[str, str] = {}
    speaker_counter = 1
    def get_speaker_name(speaker_id: Optional[str]) -> str:
        """Return a stable friendly name for a raw id ("Unknown" for None)."""
        nonlocal speaker_counter
        if not speaker_id:
            return "Unknown"
        if speaker_id not in speaker_map:
            speaker_map[speaker_id] = f"Speaker {speaker_counter}"
            speaker_counter += 1
        return speaker_map[speaker_id]
    def flush_speaker():
        """Emit the accumulated run of words as one '<Name>: text' line."""
        # Fix: emit even when current_speaker is None, so undiarized words
        # appear as "Unknown: ..." instead of vanishing from the output.
        if current_text:
            name = get_speaker_name(current_speaker)
            text = ' '.join(current_text)
            output_lines.append(f"{name}: {text}")
    for word in words:
        if word.speaker != current_speaker:
            flush_speaker()
            current_speaker = word.speaker
            current_text = []
        current_text.append(word.text)
    # Flush last speaker
    flush_speaker()
    return '\n\n'.join(output_lines)
def copy_to_clipboard(text: str) -> bool:
    """Copy text to the macOS clipboard via pbcopy.

    Returns:
        True on success, False when pbcopy fails or is unavailable.

    Best-effort, mirroring play_sound: clipboard problems degrade gracefully
    instead of aborting the run.
    """
    try:
        subprocess.run(
            ['pbcopy'],
            input=text.encode('utf-8'),
            check=True
        )
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        # Fix: FileNotFoundError (missing pbcopy binary, e.g. off macOS) was
        # previously uncaught and crashed the whole run.
        return False
def extract_video_id(url: str) -> Optional[str]:
    """Extract the 11-character YouTube video ID from a URL, or None."""
    candidate_patterns = (
        # watch / short-link / shorts URL shapes
        r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/shorts/)([a-zA-Z0-9_-]{11})',
        # bare v= query parameter anywhere in the URL
        r'[?&]v=([a-zA-Z0-9_-]{11})',
    )
    for candidate in candidate_patterns:
        found = re.search(candidate, url)
        if found:
            return found.group(1)
    return None
def find_cached_result(video_id: str) -> Optional[Tuple[Path, str, str, str]]:
    """
    Check if we have a cached MD file for this video ID.

    Scans every .md file in MD_OUTPUT_DIR for one whose body mentions the
    video ID (the source URL is embedded in the markdown header).

    Returns:
        (md_path, md_content, diarized_text, title) if found, None otherwise.
        Fix: the annotation previously advertised a 3-tuple although the
        function returns four values (and the caller unpacks four).
    """
    if not MD_OUTPUT_DIR.exists():
        return None
    # Search for MD files containing this video ID
    for md_file in MD_OUTPUT_DIR.glob("*.md"):
        try:
            content = md_file.read_text(encoding='utf-8')
        except Exception:
            # Unreadable cache files are skipped, never fatal.
            continue
        if video_id not in content:
            continue
        # Extract diarized text (everything after "## Transcript\n\n");
        # files without that section are ignored.
        parts = content.split("## Transcript\n\n", 1)
        if len(parts) != 2:
            continue
        diarized_text = parts[1].strip()
        # Title comes from the markdown H1 on the first line.
        first_line = content.split('\n')[0]
        title = first_line.replace('# ', '').strip()
        return (md_file, content, diarized_text, title)
    return None
def parse_urls(input_text: str) -> List[str]:
    """Parse multiple YouTube URLs from input (comma or newline separated).

    Only strings matching known YouTube URL shapes (watch, youtu.be, shorts)
    are kept; scheme-less URLs get an https:// prefix.
    """
    youtube_patterns = [
        r'(https?://)?(www\.)?youtube\.com/watch\?v=[\w-]+',
        r'(https?://)?(www\.)?youtu\.be/[\w-]+',
        r'(https?://)?(www\.)?youtube\.com/shorts/[\w-]+'
    ]
    valid_urls = []
    # Split by comma or newline, then validate each candidate.
    for candidate in re.split(r'[,\n]+', input_text):
        candidate = candidate.strip()
        if not candidate:
            continue
        if not any(re.match(pattern, candidate) for pattern in youtube_patterns):
            continue
        # Ensure https:// prefix
        if not candidate.startswith('http'):
            candidate = 'https://' + candidate
        valid_urls.append(candidate)
    return valid_urls
def process_single_video(url: str, temp_dir: Path, video_index: int, total_videos: int) -> VideoResult:
    """
    Process a single video: download, transcribe, generate outputs.
    Uses MD cache if available.
    Returns VideoResult with all data.

    Never raises: any failure is captured in VideoResult.error so one bad
    URL cannot abort a parallel batch (main() calls future.result() directly).
    """
    result = VideoResult(url=url)
    # NOTE(review): prefix is unused -- presumably left over from removed
    # progress printing.
    prefix = f"[{video_index}/{total_videos}]"
    try:
        # Check cache first
        video_id = extract_video_id(url)
        if video_id:
            cached = find_cached_result(video_id)
            if cached:
                md_path, md_content, diarized_text, title = cached
                # Find corresponding SRT file
                srt_stem = md_path.stem # Same filename without extension
                srt_path = SRT_OUTPUT_DIR / f"{srt_stem}.srt"
                # Cache hit: reuse saved markdown; duration/speakers unknown,
                # so placeholder values are used.
                result.video_info = {'id': video_id, 'title': title, 'duration': 0}
                result.transcript_data = {'language': 'cached', 'speakers': set(), 'words': []}
                result.srt_path = srt_path if srt_path.exists() else None
                result.md_path = md_path
                result.diarized_text = diarized_text
                result.md_content = md_content
                result.success = True
                return result
        # Step 1: Download audio (no cache found)
        audio_file, video_info = download_youtube_audio(url, temp_dir)
        result.video_info = video_info
        # Step 2: Transcribe
        transcript_data = transcribe_with_elevenlabs(audio_file)
        result.transcript_data = transcript_data
        # Step 3: Generate outputs
        blocks = build_subtitle_blocks(transcript_data['words'])
        srt_content = generate_srt_content(blocks, transcript_data['speakers'])
        diarized_text = generate_diarized_text(transcript_data['words'])
        result.diarized_text = diarized_text
        # Step 4: Save files
        SRT_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        MD_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_title = sanitize_filename(video_info['title'])
        # Save SRT
        srt_filename = f"{safe_title}_{timestamp}.srt"
        srt_path = SRT_OUTPUT_DIR / srt_filename
        with open(srt_path, 'w', encoding='utf-8') as f:
            f.write(srt_content)
        result.srt_path = srt_path
        # Save MD. The blank line after "## Transcript" is load-bearing:
        # find_cached_result() splits cached files on "## Transcript\n\n".
        md_filename = f"{safe_title}_{timestamp}.md"
        md_path = MD_OUTPUT_DIR / md_filename
        md_content = f"""# {video_info['title']}

**Source:** {url}
**Language:** {transcript_data['language']}
**Speakers:** {len(transcript_data['speakers'])}
**Transcribed:** {datetime.now().strftime("%Y-%m-%d %H:%M")}

---

## Transcript

{diarized_text}
"""
        result.md_content = md_content
        with open(md_path, 'w', encoding='utf-8') as f:
            f.write(md_content)
        result.md_path = md_path
        result.success = True
    except Exception as e:
        # Capture the failure; the caller inspects result.success/result.error.
        result.error = str(e)
    return result
def main():
    """Main entry point with parallel batch processing support.

    Reads URLs from argv[1], processes them (in parallel when more than one),
    copies the combined transcripts to the clipboard, and signals the outcome
    via sound only (the Raycast script runs in silent mode).
    """
    # Validate arguments
    if len(sys.argv) < 2:
        play_sound(success=False)
        sys.exit(1)
    input_text = sys.argv[1].strip()
    # Parse URLs (supports comma or newline separated)
    urls = parse_urls(input_text)
    if not urls:
        play_sound(success=False)
        sys.exit(1)
    total_videos = len(urls)
    is_batch = total_videos > 1
    # NOTE(review): start_time is unused -- timing was presumably reported in
    # removed output.
    start_time = datetime.now()
    results: List[VideoResult] = []
    try:
        # Create temp directory for all audio files; removed automatically
        # once processing completes.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            if is_batch:
                # PARALLEL PROCESSING for multiple videos
                with ThreadPoolExecutor(max_workers=MAX_PARALLEL_DOWNLOADS) as executor:
                    futures = {}
                    for i, url in enumerate(urls, 1):
                        # Create unique temp subdir for each video so the
                        # per-video mp3 glob fallback cannot cross-match.
                        video_temp = temp_path / f"video_{i}"
                        video_temp.mkdir(exist_ok=True)
                        future = executor.submit(
                            process_single_video, url, video_temp, i, total_videos
                        )
                        futures[future] = (i, url)
                    # Collect results as they complete
                    for future in as_completed(futures):
                        result = future.result()
                        results.append(result)
            else:
                # Single video - process directly
                video_temp = temp_path / "video_1"
                video_temp.mkdir(exist_ok=True)
                result = process_single_video(urls[0], video_temp, 1, 1)
                results.append(result)
        # Sort results back into original input order (as_completed yields
        # them in completion order).
        results_by_url = {r.url: r for r in results}
        results = [results_by_url[url] for url in urls if url in results_by_url]
        # Calculate stats
        successful = [r for r in results if r.success]
        # NOTE(review): failed is unused -- presumably for a removed summary.
        failed = [r for r in results if not r.success]
        # Build combined clipboard content: one markdown section per video,
        # separated by horizontal rules.
        if successful:
            clipboard_parts = []
            for r in successful:
                if r.video_info and r.diarized_text:
                    part = f"# {r.video_info['title']}\n\n{r.diarized_text}"
                    clipboard_parts.append(part)
            combined_clipboard = "\n\n---\n\n".join(clipboard_parts)
            copy_to_clipboard(combined_clipboard)
            # Play success sound
            play_sound(success=True)
        else:
            # All failed - play error sound
            play_sound(success=False)
    except Exception:
        play_sound(success=False)
        sys.exit(1)

if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment