Created
March 1, 2026 12:17
-
-
Save ayush-that/58b02799c43de2c12b866b89c41f26ee to your computer and use it in GitHub Desktop.
Audiobook generation pipeline for 'The Accidental CTO' using Sarvam AI TTS.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Audiobook generation pipeline for 'The Accidental CTO' using Sarvam AI TTS.""" | |
| import argparse | |
| import base64 | |
| import json | |
| import os | |
| import re | |
| import time | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| import requests | |
| from dotenv import load_dotenv | |
| from mutagen.id3 import ID3, TIT2, TPE1, TALB, TRCK | |
| from mutagen.mp3 import MP3 | |
| from pydub import AudioSegment | |
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Source book lives in a sibling repository checkout — TODO confirm layout.
BOOK_PATH = Path(__file__).resolve().parent.parent / "The-Accidental-CTO" / "The Accidental CTO.md"
# All artifacts are written next to this script.
OUTPUT_DIR = Path(__file__).resolve().parent
CHUNKS_DIR = OUTPUT_DIR / "chunks"          # per-chunk MP3s (resume granularity)
CHAPTERS_DIR = OUTPUT_DIR / "chapters"      # one concatenated MP3 per chapter
PROGRESS_FILE = OUTPUT_DIR / "progress.json"  # resume bookkeeping
MANIFEST_FILE = OUTPUT_DIR / "manifest.json"  # final metadata summary
FULL_BOOK_FILE = OUTPUT_DIR / "the_accidental_cto_full.mp3"
SARVAM_TTS_URL = "https://api.sarvam.ai/text-to-speech"
ANTHROPIC_API_URL = "https://api.anthropic.com/v1/messages"
# Per-request text limit sent to the TTS API.
MAX_CHUNK_CHARS = 2400
NORMALIZE_BATCH_CHARS = 6000  # process ~6k chars at a time through LLM
REQUEST_INTERVAL = 1.1  # seconds between API calls
REQUEST_TIMEOUT = 30    # seconds per TTS HTTP request
NORMALIZED_DIR = OUTPUT_DIR / "normalized"  # LLM-normalized text cache
# Blockquoted lines starting with one of these are read as "command executed"
# instead of being spoken verbatim.
SHELL_COMMANDS = (
    "ssh", "htop", "pg_dump", "scp", "psql", "kubectl", "docker",
    "sudo", "apt", "yum", "npm", "pip", "git", "curl", "wget",
    "systemctl", "nginx", "redis-cli", "mysql", "mongod",
)
| # --------------------------------------------------------------------------- | |
| # Data model | |
| # --------------------------------------------------------------------------- | |
@dataclass
class Chapter:
    """One audiobook section: a numbered chapter or the closing Dedication."""
    index: int      # 1-based sequential position in the book
    title: str      # header text without the leading "## "
    slug: str       # filesystem-safe identifier derived from the title
    raw_text: str   # original markdown for the section
    clean_text: str = ""  # filled in by preprocessing / LLM normalization
    chunks: list[str] = field(default_factory=list)  # TTS-sized text pieces
| # --------------------------------------------------------------------------- | |
| # Step 1: Parse chapters | |
| # --------------------------------------------------------------------------- | |
def parse_chapters(text: str) -> list[Chapter]:
    """Split the markdown book into chapters on ``## Chapter`` headers.

    Key Takeaways sections stay within their parent chapter.
    The Dedication section at the end is included as the final chapter.
    Everything before the first ``## Chapter`` header is skipped.

    Args:
        text: Full markdown source of the book.

    Returns:
        Chapters in order of appearance, with 1-based ``index`` and
        ``raw_text`` populated; ``clean_text``/``chunks`` are filled later.

    Raises:
        RuntimeError: If no ``## Chapter N:`` header is found.
    """
    chapter_pattern = re.compile(r"^## Chapter \d+:(?!.*Key Takeaways)", re.MULTILINE)
    dedication_pattern = re.compile(r"^# Dedication", re.MULTILINE)
    # Find all ## Chapter header positions
    matches = list(chapter_pattern.finditer(text))
    if not matches:
        raise RuntimeError("No chapter headers found in the book.")
    chapters: list[Chapter] = []
    for i, match in enumerate(matches):
        start = match.start()
        # Section extends to the next ## Chapter header (or dedication / end)
        if i + 1 < len(matches):
            end = matches[i + 1].start()
        else:
            # After last ## Chapter, check for Dedication
            ded = dedication_pattern.search(text, match.end())
            end = ded.start() if ded else len(text)
        # The header may be the very last line with no trailing newline;
        # fall back to the section end instead of letting index() raise.
        newline_pos = text.find("\n", start)
        header_end = newline_pos if newline_pos != -1 else end
        header_line = text[start:header_end].strip()
        raw = text[start:end].rstrip()
        # "Key Takeaways" ## headers stay inside the chapter because we
        # split only on "## Chapter N:" headers.
        title = re.sub(r"^##\s*", "", header_line).strip()
        chapters.append(Chapter(
            index=len(chapters) + 1,
            title=title,
            slug=_slugify(title),
            raw_text=raw,
        ))
    # Include Dedication as the final section. Search only AFTER the last
    # chapter header (matching the in-loop search above) so a stray
    # "# Dedication" earlier in the file cannot swallow whole chapters.
    ded_match = dedication_pattern.search(text, matches[-1].end())
    if ded_match:
        chapters.append(Chapter(
            index=len(chapters) + 1,
            title="Dedication",
            slug="dedication",
            raw_text=text[ded_match.start():].rstrip(),
        ))
    return chapters
| def _slugify(title: str) -> str: | |
| s = title.lower() | |
| s = re.sub(r"[^a-z0-9\s-]", "", s) | |
| s = re.sub(r"[\s-]+", "_", s).strip("_") | |
| return s[:60] | |
| # --------------------------------------------------------------------------- | |
| # Step 2: Preprocess text for TTS | |
| # --------------------------------------------------------------------------- | |
def preprocess_text(raw: str) -> str:
    """Strip markdown syntax from *raw* so the result is speakable prose.

    Code fences and tables become short spoken placeholders, links keep
    only their label text, emphasis markers are removed, headers become
    plain sentences, and blockquoted shell commands are described rather
    than read verbatim.
    """
    result = raw
    # Fenced code blocks -> spoken placeholder.
    result = re.sub(
        r"```[\s\S]*?```",
        "A code example is shown in the book.",
        result,
    )
    # Markdown tables (two or more consecutive |...| lines) -> placeholder.
    result = re.sub(
        r"(?:^[ \t]*\|.*\|[ \t]*\n){2,}",
        lambda m: "A comparison table is shown in the book.",
        result,
        flags=re.MULTILINE,
    )
    # Images are dropped entirely; hyperlinks keep only their label.
    result = re.sub(r"!\[[^\]]*\]\([^)]*\)", "", result)
    result = re.sub(r"\[([^\]]+)\]\([^)]*\)", r"\1", result)
    # Emphasis and inline-code markers.
    result = re.sub(r"\*\*([^*]+)\*\*", r"\1", result)
    result = re.sub(r"(?<!\w)_([^_]+)_(?!\w)", r"\1", result)
    result = re.sub(r"`([^`]+)`", r"\1", result)
    # HTML line breaks and escaped asterisks left over from export.
    result = re.sub(r"<br\s*/?>", "", result)
    result = re.sub(r"\\\*\\\*", "", result)
    result = re.sub(r"\\\*", "", result)
    # Headers become plain sentences: "## Title" -> "Title."
    result = re.sub(r"^#{1,6}\s+(.+)$", r"\1.", result, flags=re.MULTILINE)

    def _speak_blockquote(line: str) -> str:
        """Rewrite a blockquote line for speech; pass other lines through."""
        stripped = line.strip()
        if not stripped.startswith(">"):
            return line
        content = stripped.lstrip(">").strip()
        words = content.split()
        first_word = words[0] if words else ""
        # Strip angle-bracket emails/urls before command detection.
        first_word_clean = re.sub(r"<[^>]+>", "", first_word).strip()
        if first_word_clean in SHELL_COMMANDS:
            return f'The command {first_word_clean} was executed.'
        # Keep as dialogue / regular quote.
        return content

    result = "\n".join(_speak_blockquote(line) for line in result.split("\n"))
    # Collapse blank-line runs and horizontal whitespace.
    result = re.sub(r"\n{3,}", "\n\n", result)
    result = re.sub(r"[ \t]+", " ", result)
    return result.strip()
| # --------------------------------------------------------------------------- | |
| # Step 2.5: Normalize text for speech using LLM | |
| # --------------------------------------------------------------------------- | |
# System prompt sent verbatim to the LLM on every normalization call;
# any edit here changes how the audiobook text is rewritten for speech.
NORMALIZE_SYSTEM_PROMPT = """\
You are a text normalizer preparing written content for a text-to-speech audiobook engine. \
The TTS engine reads text literally, so you must rewrite the text so it sounds natural when spoken aloud.
Rules — apply ALL of these:
1. PARENTHESES: Remove parentheses. Integrate the content naturally using commas, dashes, or rephrasing.
- "Redis (an in-memory cache)" → "Redis, an in-memory cache,"
- "(pronounced 'koo-ber-net-ees')" → "pronounced koo-ber-net-ees"
- "(~500 ms)" → "around 500 milliseconds"
2. ACRONYMS & ABBREVIATIONS: Expand or space out for speech.
- CPU/CPUs → "C P U" / "C P Us"
- API/APIs → "A P I" / "A P Is"
- CDN → "C D N", SQL → "S Q L", NoSQL → "No S Q L"
- AWS → "A W S", DNS → "D N S", CI/CD → "C I C D"
- SLA → "S L A", ORM → "O R M", CTO → "C T O"
- e.g. → "for example", i.e. → "that is", etc. → "etcetera", vs. → "versus"
- ms → "milliseconds", GB → "gigabytes", MB → "megabytes", kHz → "kilohertz"
- 10x → "ten times", 100x → "hundred times"
3. HYPHENS & DASHES: Replace decorative/separator hyphens and em-dashes.
- "---" or "—" used as separators → use commas or periods instead
- Keep hyphens in compound words ("real-time", "read-only")
4. SPECIAL CHARACTERS: Remove or replace.
- URLs → just say the domain name or remove entirely
- Email addresses → remove or say naturally
- ~ → "approximately" or "around"
- / when meaning "or" → "or"
- >= → "greater than or equal to", <= → "less than or equal to"
- != → "not equal to", == → "equals"
- % → "percent"
- & → "and"
- Remove any remaining *, #, _, `, |, >, < characters that are markdown artifacts
5. NUMBERS & UNITS:
- "1000" → "one thousand", "10,000" → "ten thousand" (for round numbers)
- Keep specific numbers as digits: "2,500 characters" is fine
- "48000" → "forty-eight thousand"
- "v2" → "version 2", "v3" → "version 3"
6. CODE & TECHNICAL TERMS: Make them speakable.
- File paths like "/etc/nginx/conf.d" → just remove or say "the nginx config file"
- Variable names in camelCase or snake_case → space them out: "readReplica" → "read replica"
- Config keys → just the meaningful word
- Any remaining code-like text → rephrase or remove
7. HINDI/URDU PHRASES: Keep them exactly as written. Do not translate or modify.
8. DO NOT change the meaning, add new information, or remove meaningful content.
9. DO NOT add any commentary, explanations, or meta-text.
10. Keep the output roughly the same length as the input. Do not significantly expand or compress.
11. Output ONLY the normalized text, nothing else.\
"""
def normalize_for_speech(text: str, anthropic_key: str, chapter_slug: str) -> str:
    """Normalize text for natural TTS using Claude Haiku. Caches results."""
    cache_file = NORMALIZED_DIR / f"{chapter_slug}.txt"
    if cache_file.exists():
        cached = cache_file.read_text(encoding="utf-8")
        if cached.strip():
            return cached
    NORMALIZED_DIR.mkdir(parents=True, exist_ok=True)

    # Pack whole paragraphs into batches no larger than NORMALIZE_BATCH_CHARS
    # so each LLM call stays comfortably inside the context window.
    batches: list[str] = []
    pending = ""
    for para in text.split("\n\n"):
        merged = f"{pending}\n\n{para}".strip() if pending else para
        if len(merged) <= NORMALIZE_BATCH_CHARS:
            pending = merged
            continue
        if pending:
            batches.append(pending)
        pending = para
    if pending:
        batches.append(pending)

    pieces: list[str] = []
    for i, batch in enumerate(batches):
        print(f" Normalizing batch {i + 1}/{len(batches)} ({len(batch)} chars)...")
        pieces.append(_call_anthropic(batch, anthropic_key))
        if i < len(batches) - 1:
            time.sleep(0.5)  # gentle rate limiting

    normalized = "\n\n".join(pieces)
    cache_file.write_text(normalized, encoding="utf-8")
    return normalized
def _call_anthropic(text: str, api_key: str) -> str:
    """Call Claude Haiku to normalize a batch of text.

    Retries up to 3 times on 429s, 5xx responses, and timeouts. Any other
    4xx error propagates via ``raise_for_status``. If every attempt fails
    softly, the original *text* is returned unchanged so one bad batch
    does not abort the whole normalization pass.
    """
    headers = {
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",  # required Anthropic API version header
        "content-type": "application/json",
    }
    payload = {
        "model": "claude-haiku-4-5-20251001",
        "max_tokens": 8192,
        "system": NORMALIZE_SYSTEM_PROMPT,
        "messages": [
            {"role": "user", "content": f"Normalize this text for audiobook TTS:\n\n{text}"}
        ],
    }
    for attempt in range(3):
        try:
            resp = requests.post(
                ANTHROPIC_API_URL,
                headers=headers,
                json=payload,
                timeout=60,
            )
            if resp.status_code == 200:
                data = resp.json()
                # Messages API returns a list of content blocks; first is the text.
                return data["content"][0]["text"]
            if resp.status_code == 429:
                # Linear backoff on rate limits: 5s, 10s, 15s.
                time.sleep(5 * (attempt + 1))
                continue
            if resp.status_code >= 500:
                time.sleep(2)
                continue
            resp.raise_for_status()
        except requests.exceptions.Timeout:
            time.sleep(2)
            continue
    # All retries exhausted: degrade gracefully to the raw text.
    print(f" WARNING: Normalization failed, using raw text")
    return text
| # --------------------------------------------------------------------------- | |
| # Step 3: Chunk text | |
| # --------------------------------------------------------------------------- | |
def chunk_text(text: str) -> list[str]:
    """Split text into chunks of at most MAX_CHUNK_CHARS characters.

    Paragraphs are packed greedily; a paragraph that alone exceeds the
    limit is broken down further at sentence/clause/word boundaries.
    """
    chunks: list[str] = []
    buffer = ""
    for para in re.split(r"\n\n+", text):
        para = para.strip()
        if not para:
            continue
        tentative = f"{buffer}\n\n{para}".strip() if buffer else para
        if len(tentative) <= MAX_CHUNK_CHARS:
            buffer = tentative
            continue
        # Adding this paragraph would overflow: flush what we have.
        if buffer:
            chunks.append(buffer)
            buffer = ""
        if len(para) > MAX_CHUNK_CHARS:
            # The paragraph alone is oversized; break it at finer boundaries.
            chunks.extend(_split_long_paragraph(para))
        else:
            buffer = para
    if buffer:
        chunks.append(buffer)
    return chunks
def _split_long_paragraph(para: str) -> list[str]:
    """Split a long paragraph at sentence, then clause, then word boundaries."""
    # Prefer the coarsest boundary that actually divides the paragraph:
    # sentence enders first, then commas/semicolons.
    for boundary in (r"(?<=[.?!])\s+", r"(?<=[,;])\s+"):
        pieces = re.split(boundary, para)
        if len(pieces) > 1:
            return _merge_pieces(pieces)
    # Last resort: whitespace-delimited words (never split mid-word).
    return _merge_pieces(para.split(), joiner=" ")
| def _merge_pieces(pieces: list[str], joiner: str = " ") -> list[str]: | |
| chunks: list[str] = [] | |
| current = "" | |
| for piece in pieces: | |
| candidate = f"{current}{joiner}{piece}".strip() if current else piece | |
| if len(candidate) <= MAX_CHUNK_CHARS: | |
| current = candidate | |
| else: | |
| if current: | |
| chunks.append(current) | |
| current = piece | |
| if current: | |
| chunks.append(current) | |
| return chunks | |
| # --------------------------------------------------------------------------- | |
| # Step 4: Synthesize a single chunk via Sarvam TTS | |
| # --------------------------------------------------------------------------- | |
def synthesize_chunk(text: str, api_key: str) -> bytes:
    """Call Sarvam TTS and return raw MP3 bytes.

    Makes up to 4 attempts (initial + 3 retries): exponential backoff on
    429s, a flat 2 s wait on 5xx responses and timeouts.

    Raises:
        RuntimeError: When every attempt fails.
        requests.HTTPError: On non-retryable 4xx responses.
    """
    headers = {
        "Content-Type": "application/json",
        "api-subscription-key": api_key,  # Sarvam uses a custom auth header
    }
    payload = {
        "text": text,
        "target_language_code": "en-IN",
        "model": "bulbul:v3",
        "speaker": "aditya",
        "pace": 1.0,
        "speech_sample_rate": 48000,
        "output_audio_codec": "mp3",
    }
    backoff = 5  # base seconds for the 429 exponential backoff
    for attempt in range(4):  # initial + 3 retries
        try:
            resp = requests.post(
                SARVAM_TTS_URL,
                headers=headers,
                json=payload,
                timeout=REQUEST_TIMEOUT,
            )
            if resp.status_code == 200:
                data = resp.json()
                # Audio arrives base64-encoded in the "audios" list.
                return base64.b64decode(data["audios"][0])
            if resp.status_code == 429:
                # 5s, 10s, 20s, 40s across attempts.
                wait = backoff * (2 ** attempt)
                print(f" Rate limited (429). Waiting {wait}s...")
                time.sleep(wait)
                continue
            if resp.status_code >= 500:
                print(f" Server error ({resp.status_code}). Retrying in 2s...")
                time.sleep(2)
                continue
            resp.raise_for_status()
        except requests.exceptions.Timeout:
            print(f" Timeout on attempt {attempt + 1}. Retrying...")
            time.sleep(2)
            continue
    raise RuntimeError(f"Failed to synthesize chunk after retries. Last text: {text[:80]}...")
| # --------------------------------------------------------------------------- | |
| # Step 5: Generate audio for a full chapter | |
| # --------------------------------------------------------------------------- | |
def generate_chapter_audio(chapter: Chapter, api_key: str) -> list[Path]:
    """Synthesize all chunks for a chapter, saving MP3 files. Supports resume.

    A chunk file already on disk is treated as completed and skipped, so an
    interrupted run restarts safely. Progress is also recorded in the
    progress JSON after every synthesized chunk.

    Returns:
        Paths of all chunk MP3s for the chapter, in order.
    """
    ch_label = f"ch{chapter.index:02d}"
    ch_dir = CHUNKS_DIR / ch_label
    ch_dir.mkdir(parents=True, exist_ok=True)
    progress = _load_progress()
    chunk_files: list[Path] = []
    for i, chunk_text_content in enumerate(chapter.chunks, start=1):
        filename = f"{ch_label}_{i:03d}.mp3"
        filepath = ch_dir / filename
        if filepath.exists():
            # Resume support: an existing file means this chunk is done.
            print(f" Chunk {i}/{len(chapter.chunks)} already exists, skipping.")
            chunk_files.append(filepath)
            continue
        print(f" Synthesizing chunk {i}/{len(chapter.chunks)} ({len(chunk_text_content)} chars)...")
        audio_bytes = synthesize_chunk(chunk_text_content, api_key)
        filepath.write_bytes(audio_bytes)
        chunk_files.append(filepath)
        # Update progress after each chunk so a crash loses at most one call.
        progress.setdefault("chapters", {})
        progress["chapters"].setdefault(ch_label, {"completed_chunks": 0, "total_chunks": len(chapter.chunks)})
        progress["chapters"][ch_label]["completed_chunks"] = i
        _save_progress(progress)
        # Rate limiting between consecutive API calls.
        if i < len(chapter.chunks):
            time.sleep(REQUEST_INTERVAL)
    return chunk_files
| # --------------------------------------------------------------------------- | |
| # Step 6: Concatenate chapter chunks into a single chapter MP3 | |
| # --------------------------------------------------------------------------- | |
def concatenate_chapter(chapter: Chapter) -> Path:
    """Join all chunk MP3s of *chapter* into one chapter MP3.

    Chunks are separated by 300 ms of silence and exported at 192 kbps,
    48 kHz.

    Raises:
        RuntimeError: When no chunk files exist for the chapter.
    """
    ch_label = f"ch{chapter.index:02d}"
    chunk_dir = CHUNKS_DIR / ch_label
    output_path = CHAPTERS_DIR / f"{ch_label}_{chapter.slug}.mp3"
    CHAPTERS_DIR.mkdir(parents=True, exist_ok=True)

    chunk_files = sorted(chunk_dir.glob(f"{ch_label}_*.mp3"))
    if not chunk_files:
        raise RuntimeError(f"No chunk files found for {ch_label}")

    gap = AudioSegment.silent(duration=300)
    combined = AudioSegment.empty()
    for position, chunk_path in enumerate(chunk_files):
        if position > 0:
            combined += gap
        combined += AudioSegment.from_mp3(str(chunk_path))

    combined.export(
        str(output_path),
        format="mp3",
        bitrate="192k",
        parameters=["-ar", "48000"],
    )
    print(f" Exported {output_path.name} ({len(combined) / 1000:.1f}s)")
    return output_path
| # --------------------------------------------------------------------------- | |
| # Step 7: Concatenate all chapters into the full audiobook | |
| # --------------------------------------------------------------------------- | |
def concatenate_full_book(chapter_files: list[Path]) -> Path:
    """Join chapter MP3s into the full audiobook, 2 s of silence between chapters."""
    pause = AudioSegment.silent(duration=2000)
    book = AudioSegment.empty()
    for position, chapter_path in enumerate(chapter_files):
        print(f" Loading {chapter_path.name}...")
        if position > 0:
            book += pause
        book += AudioSegment.from_mp3(str(chapter_path))
    book.export(
        str(FULL_BOOK_FILE),
        format="mp3",
        bitrate="192k",
        parameters=["-ar", "48000"],
    )
    print(f" Full book exported: {FULL_BOOK_FILE.name} ({len(book) / 1000:.1f}s)")
    return FULL_BOOK_FILE
| # --------------------------------------------------------------------------- | |
| # Step 8: Add metadata (ID3 tags) and write manifest | |
| # --------------------------------------------------------------------------- | |
def add_metadata(chapters: list[Chapter], chapter_files: list[Path], total_cost_chars: int):
    """Write ID3 tags on chapter/full-book MP3s and emit manifest.json.

    Missing chapter files are skipped (single-chapter mode). Durations in
    the manifest come from mutagen; unreadable files record 0.0 seconds.
    """
    # Tag individual chapter files (skip missing files in single-chapter mode)
    for chapter, filepath in zip(chapters, chapter_files):
        if not filepath.exists():
            continue
        try:
            audio = MP3(str(filepath))
            if audio.tags is None:
                audio.add_tags()
        except Exception:
            # NOTE(review): retrying the identical MP3() call here looks
            # dubious — if MP3() itself failed, this raises again. Left as-is.
            audio = MP3(str(filepath))
            audio.add_tags()
        audio.tags.add(TIT2(encoding=3, text=chapter.title))  # title
        audio.tags.add(TPE1(encoding=3, text="Subhash Choudhary"))  # artist
        audio.tags.add(TALB(encoding=3, text="The Accidental CTO"))  # album
        audio.tags.add(TRCK(encoding=3, text=str(chapter.index)))  # track no.
        audio.save()
    # Tag full book if it exists
    if FULL_BOOK_FILE.exists():
        try:
            audio = MP3(str(FULL_BOOK_FILE))
            if audio.tags is None:
                audio.add_tags()
        except Exception:
            audio = MP3(str(FULL_BOOK_FILE))
            audio.add_tags()
        audio.tags.add(TIT2(encoding=3, text="The Accidental CTO"))
        audio.tags.add(TPE1(encoding=3, text="Subhash Choudhary"))
        audio.tags.add(TALB(encoding=3, text="The Accidental CTO"))
        audio.save()
    # Build manifest summarizing the whole run.
    manifest = {
        "title": "The Accidental CTO",
        "author": "Subhash Choudhary",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_chapters": len(chapters),
        "total_speakable_chars": total_cost_chars,
        # Pricing assumption: Rs 30 per 10k characters — TODO confirm rate.
        "estimated_cost_inr": round(total_cost_chars / 10000 * 30, 2),
        "chapters": [],
    }
    for chapter, filepath in zip(chapters, chapter_files):
        duration = 0.0
        if filepath.exists():
            try:
                audio = MP3(str(filepath))
                duration = audio.info.length
            except Exception:
                # Best-effort: an unreadable file just reports 0.0 seconds.
                pass
        manifest["chapters"].append({
            "index": chapter.index,
            "title": chapter.title,
            "slug": chapter.slug,
            "chunks": len(chapter.chunks),
            "chars": len(chapter.clean_text),
            "duration_seconds": round(duration, 2),
            "file": filepath.name,
        })
    MANIFEST_FILE.write_text(json.dumps(manifest, indent=2))
    print(f" Manifest written to {MANIFEST_FILE.name}")
| # --------------------------------------------------------------------------- | |
| # Progress helpers | |
| # --------------------------------------------------------------------------- | |
def _load_progress() -> dict:
    """Load the resume-progress file; return {} when absent or unreadable.

    A half-written or corrupt progress.json must not abort the run — the
    chunk files on disk are the real source of truth for resuming, so the
    worst case of losing progress data is re-checking existing chunks.
    """
    if PROGRESS_FILE.exists():
        try:
            return json.loads(PROGRESS_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}
def _save_progress(data: dict):
    """Persist *data* to the progress file as pretty-printed JSON."""
    serialized = json.dumps(data, indent=2)
    PROGRESS_FILE.write_text(serialized)
| # --------------------------------------------------------------------------- | |
| # Main pipeline | |
| # --------------------------------------------------------------------------- | |
def main():
    """Run the full pipeline: parse -> preprocess -> normalize -> chunk -> TTS -> concat -> tag."""
    parser = argparse.ArgumentParser(description="Generate audiobook from The Accidental CTO")
    parser.add_argument("--dry-run", action="store_true", help="Parse and show stats without making API calls")
    parser.add_argument("--chapter", type=int, help="Generate a single chapter by sequential index")
    parser.add_argument("--skip-tts", action="store_true", help="Skip TTS; only concatenate and tag existing chunks")
    parser.add_argument("--skip-normalize", action="store_true", help="Skip LLM normalization step")
    args = parser.parse_args()
    # Load API keys from the repo-root .env file.
    load_dotenv(OUTPUT_DIR.parent / ".env")
    # NOTE(review): "SARVAM_API+KEY" looks like a typo'd env-var fallback,
    # but the error message below mentions it too — presumably deliberate.
    api_key = os.getenv("SARVAM_API_KEY") or os.getenv("SARVAM_API+KEY") or ""
    anthropic_key = os.getenv("ANTHROPIC_API_KEY") or ""
    if not api_key and not args.dry_run and not args.skip_tts:
        print("ERROR: No API key found. Set SARVAM_API_KEY or SARVAM_API+KEY in .env")
        return
    if not anthropic_key and not args.dry_run and not args.skip_normalize:
        print("ERROR: No ANTHROPIC_API_KEY found in .env (needed for text normalization)")
        return
    # Step 1: Parse chapters
    print("=" * 60)
    print("STEP 1: Parsing chapters")
    print("=" * 60)
    book_text = BOOK_PATH.read_text(encoding="utf-8")
    chapters = parse_chapters(book_text)
    print(f" Found {len(chapters)} chapters (including Dedication)")
    # Step 2: Preprocess (strip markdown down to speakable prose)
    print("\nSTEP 2: Preprocessing text for TTS")
    print("=" * 60)
    for ch in chapters:
        ch.clean_text = preprocess_text(ch.raw_text)
        print(f" Ch {ch.index:2d}: {ch.title[:50]:50s} | raw={len(ch.raw_text):6d} -> clean={len(ch.clean_text):6d}")
    # Step 2.5: Normalize for speech using LLM (cached per chapter slug)
    if not args.skip_normalize:
        print("\nSTEP 2.5: Normalizing text for speech (Claude Haiku)")
        print("=" * 60)
        for ch in chapters:
            cache_file = NORMALIZED_DIR / f"{ch.slug}.txt"
            if cache_file.exists() and cache_file.read_text(encoding="utf-8").strip():
                print(f" Ch {ch.index:2d}: cached ({len(cache_file.read_text()):,} chars)")
                ch.clean_text = cache_file.read_text(encoding="utf-8")
            else:
                print(f" Ch {ch.index:2d}: normalizing {ch.title[:40]}...")
                ch.clean_text = normalize_for_speech(ch.clean_text, anthropic_key, ch.slug)
                print(f" -> {len(ch.clean_text):,} chars")
    else:
        print("\n Skipping normalization (--skip-normalize)")
    # Step 3: Chunk into TTS-sized pieces and report cost estimates
    print("\nSTEP 3: Chunking text")
    print("=" * 60)
    total_chunks = 0
    total_chars = 0
    for ch in chapters:
        ch.chunks = chunk_text(ch.clean_text)
        total_chunks += len(ch.chunks)
        total_chars += len(ch.clean_text)
        print(f" Ch {ch.index:2d}: {len(ch.chunks):3d} chunks")
    print(f"\n Total chunks: {total_chunks}")
    print(f" Total speakable chars: {total_chars:,}")
    # Cost model: Rs 30 per 10k chars, ~85 INR/USD — TODO confirm rates.
    print(f" Estimated cost: Rs {total_chars / 10000 * 30:.0f} (~${total_chars / 10000 * 30 / 85:.1f} USD)")
    print(f" Estimated API time: {total_chunks * REQUEST_INTERVAL / 60:.1f} min")
    if args.dry_run:
        print("\n[DRY RUN] Stopping before TTS. No API calls made.")
        return
    # Determine which chapters to process (single-chapter or all)
    if args.chapter:
        targets = [ch for ch in chapters if ch.index == args.chapter]
        if not targets:
            print(f"ERROR: Chapter {args.chapter} not found (valid: 1-{len(chapters)})")
            return
    else:
        targets = chapters
    # Step 4 & 5: Synthesize chunks via Sarvam TTS
    if not args.skip_tts:
        print(f"\nSTEP 4-5: Synthesizing audio ({len(targets)} chapter(s))")
        print("=" * 60)
        for ch in targets:
            print(f"\n --- Chapter {ch.index}: {ch.title} ({len(ch.chunks)} chunks) ---")
            generate_chapter_audio(ch, api_key)
    # Step 6: Concatenate chunks into per-chapter MP3s
    print(f"\nSTEP 6: Concatenating chapter audio")
    print("=" * 60)
    chapter_files: list[Path] = []
    for ch in targets:
        ch_label = f"ch{ch.index:02d}"
        ch_dir = CHUNKS_DIR / ch_label
        if ch_dir.exists() and list(ch_dir.glob("*.mp3")):
            filepath = concatenate_chapter(ch)
            chapter_files.append(filepath)
        else:
            print(f" Skipping Ch {ch.index} (no chunks found)")
    # Step 7: Concatenate full book (only if every chapter was produced)
    if not args.chapter and len(chapter_files) == len(chapters):
        print(f"\nSTEP 7: Concatenating full audiobook")
        print("=" * 60)
        concatenate_full_book(chapter_files)
    elif args.chapter:
        print(f"\n Skipping full book concatenation (single chapter mode)")
    # Step 8: Metadata and manifest
    print(f"\nSTEP 8: Adding metadata and writing manifest")
    print("=" * 60)
    # For the manifest, always use the full chapters list: build the
    # expected path for every chapter; add_metadata skips missing files.
    all_chapter_files: list[Path] = []
    for ch in chapters:
        expected = CHAPTERS_DIR / f"ch{ch.index:02d}_{ch.slug}.mp3"
        all_chapter_files.append(expected)
    add_metadata(chapters, all_chapter_files, total_chars)
    print("\nDone!")
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment