Raycast script: YouTube Transcribe with ElevenLabs Scribe - downloads audio, transcribes with diarization, generates SRT subtitles and speaker-formatted output
#!/usr/bin/env python3

# Required parameters:
# @raycast.schemaVersion 1
# @raycast.title YouTube Transcribe with Diarization
# @raycast.mode silent

# Optional parameters:
# @raycast.icon 🎙️
# @raycast.packageName Media Tools
# @raycast.argument1 { "type": "text", "placeholder": "YouTube URLs (comma or newline separated)" }
# @raycast.needsConfirmation false

# Documentation:
# @raycast.author Yigit Konur
# @raycast.authorURL https://github.com/yigitkonur
# @raycast.description Downloads YouTube audio, transcribes with ElevenLabs Scribe (diarization + auto language), generates SRT/MD and copies transcript. Runs in background. Supports multiple URLs in parallel. Uses MD cache.
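
# Setup (inferred from the paths and imports below): yt-dlp is expected at
# /opt/homebrew/bin/yt-dlp (e.g. `brew install yt-dlp`), the `elevenlabs`
# Python SDK must be installed (`pip install elevenlabs`), and an
# ELEVENLABS_API_KEY must be available via the environment or a .env file
# next to this script.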

import sys
import os
import re
import json
import tempfile
import subprocess
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Load environment variables from .env file
SCRIPT_DIR = Path(__file__).parent
ENV_FILE = SCRIPT_DIR / ".env"


def load_env():
    """Load environment variables from a .env file."""
    if ENV_FILE.exists():
        with open(ENV_FILE) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#') and '=' in line:
                    key, value = line.split('=', 1)
                    key = key.strip()
                    value = value.strip()
                    # Don't override existing env vars
                    if key and key not in os.environ:
                        os.environ[key] = value


load_env()
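
# Example .env file this loader expects (KEY=VALUE pairs, one per line;
# '#' lines are skipped and existing environment variables are not overridden;
# the key value shown is a placeholder):
#
#   ELEVENLABS_API_KEY=sk_your_key_here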

# Constants
SRT_OUTPUT_DIR = SCRIPT_DIR / "srt"
MD_OUTPUT_DIR = SCRIPT_DIR / "md"
YT_DLP_PATH = "/opt/homebrew/bin/yt-dlp"
SOUND_SUCCESS = "/System/Library/Sounds/Glass.aiff"
SOUND_ERROR = "/System/Library/Sounds/Basso.aiff"

# Parallel processing settings
MAX_PARALLEL_DOWNLOADS = 3
# ElevenLabs API rate limit consideration. Note: this cap is not currently
# enforced; each worker transcribes inline, so effective transcription
# concurrency is bounded by MAX_PARALLEL_DOWNLOADS.
MAX_PARALLEL_TRANSCRIPTIONS = 2


def play_sound(success: bool = True) -> None:
    """Play a notification sound on macOS."""
    sound_file = SOUND_SUCCESS if success else SOUND_ERROR
    try:
        subprocess.run(["afplay", sound_file], capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        pass

# SRT formatting constants (Netflix/BBC best practices)
MAX_CHARS_PER_LINE = 42
MAX_DURATION_SECONDS = 7.0
MIN_DURATION_SECONDS = 0.833  # ~5/6 second
MAX_READING_SPEED_CPS = 21  # chars per second
PAUSE_THRESHOLD_SECONDS = 0.4  # pause that triggers new block

# Thread-safe print lock
print_lock = threading.Lock()


def safe_print(*args, **kwargs):
    """Thread-safe print for worker threads (not called in the current silent-mode flow)."""
    with print_lock:
        print(*args, **kwargs)


@dataclass
class VideoResult:
    """Holds all results for a single video processing run."""
    url: str
    success: bool = False
    error: Optional[str] = None
    video_info: Optional[Dict] = None
    transcript_data: Optional[Dict] = None
    srt_path: Optional[Path] = None
    md_path: Optional[Path] = None
    diarized_text: str = ""
    md_content: str = ""


@dataclass
class Word:
    """Represents a single word with timing and speaker info."""
    text: str
    start: float
    end: float
    speaker: Optional[str] = None
    confidence: float = 1.0


@dataclass
class SubtitleBlock:
    """Represents a single SRT subtitle block."""
    index: int
    start: float
    end: float
    speaker: Optional[str]
    text: str

    def to_srt(self, speaker_map: Optional[Dict[str, str]] = None) -> str:
        """Convert the block to an SRT format string."""
        start_ts = format_srt_timestamp(self.start)
        end_ts = format_srt_timestamp(self.end)
        # Add speaker prefix if available (use friendly name from map)
        if self.speaker:
            friendly_name = speaker_map.get(self.speaker, self.speaker) if speaker_map else self.speaker
            display_text = f"[{friendly_name}]\n{self.text}"
        else:
            display_text = self.text
        return f"{self.index}\n{start_ts} --> {end_ts}\n{display_text}\n"


def format_srt_timestamp(seconds: float) -> str:
    """Convert seconds to SRT timestamp format (HH:MM:SS,mmm)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    millis = int((seconds % 1) * 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


def sanitize_filename(name: str) -> str:
    """Sanitize a string for use as a filename."""
    # Remove invalid characters
    sanitized = re.sub(r'[<>:"/\\|?*]', '', name)
    # Replace whitespace runs with underscores
    sanitized = re.sub(r'\s+', '_', sanitized)
    # Limit length
    return sanitized[:100]
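
# Example: sanitize_filename('My Video: Part 1/2') -> 'My_Video_Part_12'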


def is_sentence_end(text: str) -> bool:
    """Check if text ends with sentence-ending punctuation (optionally followed by a closing quote)."""
    return bool(re.search(r'[.!?][\'"\u00BB\u201C\u201D]?\s*$', text.strip()))
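
# Examples: is_sentence_end('done.') -> True; is_sentence_end('done!"') -> True;
# is_sentence_end('done,') -> False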


def download_youtube_audio(url: str, output_dir: Path) -> Tuple[Path, Dict]:
    """
    Download audio from a YouTube URL using yt-dlp.

    Returns:
        Tuple of (audio_file_path, video_metadata)
    """
    # Verify yt-dlp exists
    if not Path(YT_DLP_PATH).exists():
        raise FileNotFoundError(f"yt-dlp not found at {YT_DLP_PATH}")

    # Create temp filename pattern
    output_template = str(output_dir / "%(id)s.%(ext)s")

    # First, extract info without downloading
    info_cmd = [
        YT_DLP_PATH,
        "--dump-json",
        "--no-playlist",
        url
    ]
    try:
        result = subprocess.run(
            info_cmd,
            capture_output=True,
            text=True,
            check=True,
            timeout=60
        )
        video_info = json.loads(result.stdout)
        video_id = video_info.get('id', 'unknown')
        video_title = video_info.get('title', 'Unknown Title')
        duration = video_info.get('duration', 0)
    except subprocess.TimeoutExpired:
        raise RuntimeError("Timeout while fetching video info")
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed to get video info: {e.stderr}")
    except json.JSONDecodeError:
        raise RuntimeError("Failed to parse video info")

    # Download audio
    download_cmd = [
        YT_DLP_PATH,
        "--format", "bestaudio/best",
        "--extract-audio",
        "--audio-format", "mp3",
        "--audio-quality", "192K",
        "--no-playlist",
        "--output", output_template,
        url
    ]
    try:
        subprocess.run(
            download_cmd,
            capture_output=True,
            text=True,
            check=True,
            timeout=300
        )
    except subprocess.TimeoutExpired:
        raise RuntimeError("Download timeout (>5 minutes)")
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Download failed: {e.stderr}")

    # Find the downloaded file
    audio_file = output_dir / f"{video_id}.mp3"
    if not audio_file.exists():
        # Fall back to any mp3 file in the output directory
        mp3_files = list(output_dir.glob("*.mp3"))
        if mp3_files:
            audio_file = mp3_files[0]
        else:
            raise FileNotFoundError("Downloaded audio file not found")

    return audio_file, {
        'id': video_id,
        'title': video_title,
        'duration': duration
    }
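
# Roughly equivalent shell invocation of the download step, for manual debugging:
#   yt-dlp --format bestaudio/best --extract-audio --audio-format mp3 \
#       --audio-quality 192K --no-playlist -o '<tmpdir>/%(id)s.%(ext)s' '<URL>'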


def transcribe_with_elevenlabs(audio_path: Path) -> Dict:
    """
    Transcribe audio using the ElevenLabs Scribe API with diarization.

    Returns:
        Transcript data with text, language, words, and speakers.
    """
    try:
        from elevenlabs import ElevenLabs
    except ImportError:
        raise ImportError(
            "The 'elevenlabs' package is not installed. "
            "Install it with: pip install elevenlabs"
        )

    # Get API key
    api_key = os.environ.get("ELEVENLABS_API_KEY")
    if not api_key:
        raise ValueError(
            "ELEVENLABS_API_KEY environment variable not set.\n"
            "Get your API key from: https://elevenlabs.io/app/settings/api-keys\n"
            "Set it with: export ELEVENLABS_API_KEY='your_key_here'"
        )

    client = ElevenLabs(api_key=api_key)

    # Read audio file
    with open(audio_path, 'rb') as f:
        audio_data = f.read()

    # Call Scribe API
    try:
        transcript = client.speech_to_text.convert(
            file=audio_data,
            model_id="scribe_v1",  # Options: scribe_v1, scribe_v1_experimental, scribe_v2
            diarize=True,  # Enable speaker diarization
            timestamps_granularity="word",  # Word-level timestamps
            tag_audio_events=False,  # Disable (laughter), (music), etc.
            # language_code=None means auto-detect
        )
    except Exception as e:
        error_str = str(e)
        if "rate" in error_str.lower():
            raise RuntimeError("Rate limited by ElevenLabs API. Please wait and try again.")
        raise RuntimeError(f"Transcription failed: {e}")

    # Parse response - ElevenLabs returns Pydantic models
    result = {
        'transcript': getattr(transcript, 'text', ''),
        'language': getattr(transcript, 'language_code', 'unknown'),
        'words': [],
        'speakers': set()
    }

    # Extract words with timing and speaker info
    words_list = getattr(transcript, 'words', None)
    if words_list:
        for w in words_list:
            # Clean text - remove extra whitespace
            text = getattr(w, 'text', '').strip()
            if not text:
                continue
            word = Word(
                text=text,
                start=float(getattr(w, 'start', 0)),
                end=float(getattr(w, 'end', 0)),
                speaker=getattr(w, 'speaker_id', None),  # ElevenLabs uses speaker_id
                confidence=float(getattr(w, 'confidence', 1.0))
            )
            result['words'].append(word)
            if word.speaker:
                result['speakers'].add(word.speaker)

    return result
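
# Illustrative result shape (field names taken from the parsing above; values
# are made up):
#   {'transcript': 'Hello there ...', 'language': 'en',
#    'words': [Word(text='Hello', start=0.0, end=0.42, speaker='speaker_0', confidence=0.98), ...],
#    'speakers': {'speaker_0', 'speaker_1'}}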


def build_subtitle_blocks(words: List[Word]) -> List[SubtitleBlock]:
    """
    Build SRT subtitle blocks from word-level data.

    Rules (Netflix/BBC best practices):
    1. Max 42 characters per line
    2. Duration: 0.833s - 7s per block
    3. Break on speaker change (immediate new block)
    4. Break on sentence end (., !, ?)
    5. Break on pause > 0.4s
    6. Max reading speed: 21 chars/sec
    """
    if not words:
        return []

    blocks: List[SubtitleBlock] = []
    current_words: List[Word] = []
    current_speaker: Optional[str] = None
    block_index = 1

    def flush_block():
        """Create a subtitle block from the accumulated words."""
        nonlocal block_index, current_words
        if not current_words:
            return
        text = ' '.join(w.text for w in current_words)
        start_time = current_words[0].start
        end_time = current_words[-1].end
        # Ensure minimum duration
        if end_time - start_time < MIN_DURATION_SECONDS:
            end_time = start_time + MIN_DURATION_SECONDS
        # Line wrapping for readability (max 42 chars per line, 2 lines max)
        wrapped_text = wrap_subtitle_text(text)
        blocks.append(SubtitleBlock(
            index=block_index,
            start=start_time,
            end=end_time,
            speaker=current_speaker,
            text=wrapped_text
        ))
        block_index += 1
        current_words = []

    for word in words:
        # Check for speaker change - immediate new block
        if word.speaker != current_speaker and current_words:
            flush_block()
            current_speaker = word.speaker
        # Set speaker for first word
        if not current_words:
            current_speaker = word.speaker
        # Check for pause between words
        if current_words:
            last_word_end = current_words[-1].end
            pause = word.start - last_word_end
            if pause > PAUSE_THRESHOLD_SECONDS:
                flush_block()
        current_words.append(word)

        # Calculate current block properties
        current_text = ' '.join(w.text for w in current_words)
        current_duration = current_words[-1].end - current_words[0].start

        # Check if sentence ended
        if is_sentence_end(word.text):
            flush_block()
            continue
        # Check duration limit
        if current_duration >= MAX_DURATION_SECONDS:
            flush_block()
            continue
        # Check character limit (with buffer for next word)
        if len(current_text) >= MAX_CHARS_PER_LINE * 2 - 10:  # 2 lines, with buffer
            flush_block()
            continue
        # Check reading speed
        if current_duration > 0:
            reading_speed = len(current_text) / current_duration
            if reading_speed > MAX_READING_SPEED_CPS and len(current_words) > 3:
                # Split to maintain readable speed
                flush_block()

    # Flush remaining words
    flush_block()
    return blocks
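
# Illustrative split: "Hi." (speaker_0) followed by "Hello there." (speaker_1)
# yields separate blocks; the sentence end flushes the first block, and the
# next block starts under the new speaker.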


def wrap_subtitle_text(text: str, max_line_length: int = MAX_CHARS_PER_LINE) -> str:
    """
    Wrap subtitle text for readability.

    Rules:
    - Max 42 characters per line
    - Max 2 lines per block
    - Break at natural points (spaces, after punctuation)
    - First line can be longer than second
    """
    if len(text) <= max_line_length:
        return text

    words = text.split()
    lines = []
    current_line = []
    current_length = 0
    for word in words:
        word_len = len(word) + (1 if current_line else 0)  # +1 for space
        if current_length + word_len <= max_line_length:
            current_line.append(word)
            current_length += word_len
        else:
            if current_line:
                lines.append(' '.join(current_line))
            current_line = [word]
            current_length = len(word)
            # Max 2 lines; anything beyond is dropped (the block builder's
            # character cap makes overflow here rare)
            if len(lines) >= 2:
                break
    if current_line and len(lines) < 2:
        lines.append(' '.join(current_line))
    return '\n'.join(lines)
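
# Example: wrap_subtitle_text('the quick brown fox jumps over the lazy sleeping dog')
#   -> 'the quick brown fox jumps over the lazy\nsleeping dog'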


def generate_srt_content(blocks: List[SubtitleBlock], speakers: set) -> str:
    """Generate complete SRT file content with friendly speaker names."""
    # Create speaker map: speaker_0 -> Speaker 1, etc.
    speaker_map = {}
    for i, speaker_id in enumerate(sorted(speakers), 1):
        speaker_map[speaker_id] = f"Speaker {i}"
    return '\n'.join(block.to_srt(speaker_map) for block in blocks)


def generate_diarized_text(words: List[Word]) -> str:
    """
    Generate speaker-formatted output.

    Format:
        Speaker 1: concatenated text until speaker change
        Speaker 2: concatenated text until speaker change
    """
    if not words:
        return ""

    output_lines = []
    current_speaker: Optional[str] = None
    current_text: List[str] = []

    # Map speaker IDs to friendly names
    speaker_map: Dict[str, str] = {}
    speaker_counter = 1

    def get_speaker_name(speaker_id: Optional[str]) -> str:
        nonlocal speaker_counter
        if not speaker_id:
            return "Unknown"
        if speaker_id not in speaker_map:
            speaker_map[speaker_id] = f"Speaker {speaker_counter}"
            speaker_counter += 1
        return speaker_map[speaker_id]

    def flush_speaker():
        # Emit accumulated text even when the speaker is unknown (None),
        # so undiarized transcripts are not silently dropped
        if current_text:
            name = get_speaker_name(current_speaker)
            text = ' '.join(current_text)
            output_lines.append(f"{name}: {text}")

    for word in words:
        if word.speaker != current_speaker:
            flush_speaker()
            current_speaker = word.speaker
            current_text = []
        current_text.append(word.text)

    # Flush last speaker
    flush_speaker()
    return '\n\n'.join(output_lines)
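
# Example output:
#   Speaker 1: Welcome back to the show.
#
#   Speaker 2: Thanks for having me. Glad to be here.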


def copy_to_clipboard(text: str) -> bool:
    """Copy text to the clipboard using pbcopy (macOS)."""
    try:
        subprocess.run(
            ['pbcopy'],
            input=text.encode('utf-8'),
            check=True
        )
        return True
    except subprocess.CalledProcessError:
        return False


def extract_video_id(url: str) -> Optional[str]:
    """Extract the 11-character YouTube video ID from a URL."""
    patterns = [
        r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/shorts/)([a-zA-Z0-9_-]{11})',
        r'[?&]v=([a-zA-Z0-9_-]{11})',
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None
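
# Example: extract_video_id('https://youtu.be/dQw4w9WgXcQ') -> 'dQw4w9WgXcQ'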


def find_cached_result(video_id: str) -> Optional[Tuple[Path, str, str, str]]:
    """
    Check whether a cached MD file exists for this video ID.

    Returns (md_path, md_content, diarized_text, title) if found, None otherwise.
    """
    if not MD_OUTPUT_DIR.exists():
        return None
    # Search for MD files containing this video ID
    for md_file in MD_OUTPUT_DIR.glob("*.md"):
        try:
            content = md_file.read_text(encoding='utf-8')
            # Check if this file contains the video ID in the source URL
            if video_id in content:
                # Extract diarized text (everything after "## Transcript\n\n")
                if "## Transcript" in content:
                    parts = content.split("## Transcript\n\n", 1)
                    if len(parts) == 2:
                        diarized_text = parts[1].strip()
                        # Extract title from first line
                        first_line = content.split('\n')[0]
                        title = first_line.replace('# ', '').strip()
                        return (md_file, content, diarized_text, title)
        except Exception:
            continue
    return None


def parse_urls(input_text: str) -> List[str]:
    """Parse multiple YouTube URLs from input (comma or newline separated)."""
    # Split by comma or newline
    raw_urls = re.split(r'[,\n]+', input_text)
    # Clean and validate URLs
    youtube_patterns = [
        r'(https?://)?(www\.)?youtube\.com/watch\?v=[\w-]+',
        r'(https?://)?(www\.)?youtu\.be/[\w-]+',
        r'(https?://)?(www\.)?youtube\.com/shorts/[\w-]+'
    ]
    valid_urls = []
    for url in raw_urls:
        url = url.strip()
        if not url:
            continue
        # Check if valid YouTube URL
        if any(re.match(pattern, url) for pattern in youtube_patterns):
            # Ensure https:// prefix
            if not url.startswith('http'):
                url = 'https://' + url
            valid_urls.append(url)
    return valid_urls
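
# Example (hypothetical IDs): parse_urls('youtu.be/abc12345678, https://www.youtube.com/watch?v=XYZxyzXYZxy')
#   -> ['https://youtu.be/abc12345678', 'https://www.youtube.com/watch?v=XYZxyzXYZxy']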


def process_single_video(url: str, temp_dir: Path, video_index: int, total_videos: int) -> VideoResult:
    """
    Process a single video: download, transcribe, generate outputs.
    Uses the MD cache if available.

    Returns a VideoResult with all data.
    """
    result = VideoResult(url=url)
    prefix = f"[{video_index}/{total_videos}]"  # progress label (unused in silent mode)
    try:
        # Check cache first
        video_id = extract_video_id(url)
        if video_id:
            cached = find_cached_result(video_id)
            if cached:
                md_path, md_content, diarized_text, title = cached
                # Find corresponding SRT file
                srt_stem = md_path.stem  # Same filename without extension
                srt_path = SRT_OUTPUT_DIR / f"{srt_stem}.srt"
                result.video_info = {'id': video_id, 'title': title, 'duration': 0}
                result.transcript_data = {'language': 'cached', 'speakers': set(), 'words': []}
                result.srt_path = srt_path if srt_path.exists() else None
                result.md_path = md_path
                result.diarized_text = diarized_text
                result.md_content = md_content
                result.success = True
                return result

        # Step 1: Download audio (no cache found)
        audio_file, video_info = download_youtube_audio(url, temp_dir)
        result.video_info = video_info

        # Step 2: Transcribe
        transcript_data = transcribe_with_elevenlabs(audio_file)
        result.transcript_data = transcript_data

        # Step 3: Generate outputs
        blocks = build_subtitle_blocks(transcript_data['words'])
        srt_content = generate_srt_content(blocks, transcript_data['speakers'])
        diarized_text = generate_diarized_text(transcript_data['words'])
        result.diarized_text = diarized_text

        # Step 4: Save files
        SRT_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        MD_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_title = sanitize_filename(video_info['title'])

        # Save SRT
        srt_filename = f"{safe_title}_{timestamp}.srt"
        srt_path = SRT_OUTPUT_DIR / srt_filename
        with open(srt_path, 'w', encoding='utf-8') as f:
            f.write(srt_content)
        result.srt_path = srt_path

        # Save MD (the blank line after "## Transcript" is required by the cache parser)
        md_filename = f"{safe_title}_{timestamp}.md"
        md_path = MD_OUTPUT_DIR / md_filename
        md_content = f"""# {video_info['title']}

**Source:** {url}
**Language:** {transcript_data['language']}
**Speakers:** {len(transcript_data['speakers'])}
**Transcribed:** {datetime.now().strftime("%Y-%m-%d %H:%M")}

---

## Transcript

{diarized_text}
"""
        result.md_content = md_content
        with open(md_path, 'w', encoding='utf-8') as f:
            f.write(md_content)
        result.md_path = md_path
        result.success = True
    except Exception as e:
        result.error = str(e)
    return result


def main():
    """Main entry point with parallel batch processing support."""
    # Validate arguments
    if len(sys.argv) < 2:
        play_sound(success=False)
        sys.exit(1)

    input_text = sys.argv[1].strip()
    # Parse URLs (supports comma or newline separated)
    urls = parse_urls(input_text)
    if not urls:
        play_sound(success=False)
        sys.exit(1)

    total_videos = len(urls)
    is_batch = total_videos > 1
    start_time = datetime.now()
    results: List[VideoResult] = []

    try:
        # Create temp directory for all audio files
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            if is_batch:
                # Parallel processing for multiple videos
                with ThreadPoolExecutor(max_workers=MAX_PARALLEL_DOWNLOADS) as executor:
                    futures = {}
                    for i, url in enumerate(urls, 1):
                        # Create unique temp subdir for each video
                        video_temp = temp_path / f"video_{i}"
                        video_temp.mkdir(exist_ok=True)
                        future = executor.submit(
                            process_single_video, url, video_temp, i, total_videos
                        )
                        futures[future] = (i, url)
                    # Collect results as they complete
                    for future in as_completed(futures):
                        result = future.result()
                        results.append(result)
            else:
                # Single video - process directly
                video_temp = temp_path / "video_1"
                video_temp.mkdir(exist_ok=True)
                result = process_single_video(urls[0], video_temp, 1, 1)
                results.append(result)

        # Sort results back into the original URL order
        results_by_url = {r.url: r for r in results}
        results = [results_by_url[url] for url in urls if url in results_by_url]

        # Calculate stats
        successful = [r for r in results if r.success]
        failed = [r for r in results if not r.success]

        # Build combined clipboard content
        if successful:
            clipboard_parts = []
            for r in successful:
                if r.video_info and r.diarized_text:
                    part = f"# {r.video_info['title']}\n\n{r.diarized_text}"
                    clipboard_parts.append(part)
            combined_clipboard = "\n\n---\n\n".join(clipboard_parts)
            copy_to_clipboard(combined_clipboard)
            # Play success sound
            play_sound(success=True)
        else:
            # All failed - play error sound
            play_sound(success=False)
    except Exception:
        play_sound(success=False)
        sys.exit(1)


if __name__ == "__main__":
    main()