Skip to content

Instantly share code, notes, and snippets.

@ayush-that
Created March 1, 2026 12:17
Show Gist options
  • Select an option

  • Save ayush-that/58b02799c43de2c12b866b89c41f26ee to your computer and use it in GitHub Desktop.

Select an option

Save ayush-that/58b02799c43de2c12b866b89c41f26ee to your computer and use it in GitHub Desktop.
Audiobook generation pipeline for 'The Accidental CTO' using Sarvam AI TTS.
#!/usr/bin/env python3
"""Audiobook generation pipeline for 'The Accidental CTO' using Sarvam AI TTS."""
import argparse
import base64
import json
import os
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
import requests
from dotenv import load_dotenv
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TRCK
from mutagen.mp3 import MP3
from pydub import AudioSegment
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Source markdown: expected in a sibling checkout next to this script's parent.
BOOK_PATH = Path(__file__).resolve().parent.parent / "The-Accidental-CTO" / "The Accidental CTO.md"
# All generated artifacts are written next to this script.
OUTPUT_DIR = Path(__file__).resolve().parent
CHUNKS_DIR = OUTPUT_DIR / "chunks"  # per-chunk MP3s, grouped by chapter
CHAPTERS_DIR = OUTPUT_DIR / "chapters"  # one concatenated MP3 per chapter
PROGRESS_FILE = OUTPUT_DIR / "progress.json"  # resume state for interrupted runs
MANIFEST_FILE = OUTPUT_DIR / "manifest.json"  # final book metadata / stats
FULL_BOOK_FILE = OUTPUT_DIR / "the_accidental_cto_full.mp3"
SARVAM_TTS_URL = "https://api.sarvam.ai/text-to-speech"
ANTHROPIC_API_URL = "https://api.anthropic.com/v1/messages"
MAX_CHUNK_CHARS = 2400  # upper bound on text length per TTS request
NORMALIZE_BATCH_CHARS = 6000  # process ~6k chars at a time through the LLM
REQUEST_INTERVAL = 1.1  # seconds between TTS API calls (client-side rate limit)
REQUEST_TIMEOUT = 30  # seconds, per TTS HTTP request
NORMALIZED_DIR = OUTPUT_DIR / "normalized"  # cache of LLM-normalized chapter text
# Commands that, when found at the start of a blockquote line, are replaced
# with "The command X was executed." instead of being read verbatim by TTS.
SHELL_COMMANDS = (
    "ssh", "htop", "pg_dump", "scp", "psql", "kubectl", "docker",
    "sudo", "apt", "yum", "npm", "pip", "git", "curl", "wget",
    "systemctl", "nginx", "redis-cli", "mysql", "mongod",
)
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass
class Chapter:
    """One audiobook section parsed from the book's markdown."""

    index: int  # 1-based sequential position in the book
    title: str  # header text with the leading "##" stripped
    slug: str  # filesystem-safe identifier derived from the title
    raw_text: str  # original markdown for the section
    clean_text: str = ""  # filled by preprocessing / LLM normalization steps
    chunks: list[str] = field(default_factory=list)  # TTS-sized pieces of clean_text
# ---------------------------------------------------------------------------
# Step 1: Parse chapters
# ---------------------------------------------------------------------------
def parse_chapters(text: str) -> list[Chapter]:
    """Split the markdown book into chapters on ``## Chapter`` headers.

    Key Takeaways sections stay within their parent chapter.
    The Dedication section at the end is included as the final chapter.
    Everything before the first ``## Chapter`` header is skipped.

    Args:
        text: Full markdown source of the book.

    Returns:
        Chapters in book order, re-indexed from 1.

    Raises:
        RuntimeError: if no ``## Chapter N:`` headers are found.
    """
    chapter_pattern = re.compile(r"^## Chapter \d+:(?!.*Key Takeaways)", re.MULTILINE)
    dedication_pattern = re.compile(r"^# Dedication", re.MULTILINE)
    # Find all ## Chapter header positions
    matches = list(chapter_pattern.finditer(text))
    if not matches:
        raise RuntimeError("No chapter headers found in the book.")
    # BUGFIX: search for the Dedication once, anchored AFTER the last chapter
    # header, so a "# Dedication" line earlier in the book cannot be matched.
    # (Previously the final search started from position 0.)
    ded_match = dedication_pattern.search(text, matches[-1].end())
    chapters: list[Chapter] = []
    for i, match in enumerate(matches):
        start = match.start()
        # Section extends to the next ## Chapter header (or dedication / end)
        if i + 1 < len(matches):
            end = matches[i + 1].start()
        else:
            end = ded_match.start() if ded_match else len(text)
        # BUGFIX: tolerate a header that is the file's last line (no newline);
        # str.index would raise ValueError there.
        newline_at = text.find("\n", start)
        header_end = newline_at if newline_at != -1 else len(text)
        header_line = text[start:header_end].strip()
        raw = text[start:end].rstrip()
        # "Key Takeaways" ## headers stay inside the chapter because we split
        # only on "## Chapter N:" headers.
        title = re.sub(r"^##\s*", "", header_line).strip()
        chapters.append(Chapter(
            index=len(chapters) + 1,
            title=title,
            slug=_slugify(title),
            raw_text=raw,
        ))
    # Include Dedication as the final section
    if ded_match:
        chapters.append(Chapter(
            index=len(chapters) + 1,
            title="Dedication",
            slug="dedication",
            raw_text=text[ded_match.start():].rstrip(),
        ))
    return chapters
def _slugify(title: str) -> str:
s = title.lower()
s = re.sub(r"[^a-z0-9\s-]", "", s)
s = re.sub(r"[\s-]+", "_", s).strip("_")
return s[:60]
# ---------------------------------------------------------------------------
# Step 2: Preprocess text for TTS
# ---------------------------------------------------------------------------
def preprocess_text(raw: str) -> str:
    """Convert a chapter's markdown into plain, speakable text.

    Code fences and tables become spoken placeholders; images, links,
    emphasis, inline code, <br> tags and escaped asterisks are stripped;
    headers become "Title." sentences; blockquoted shell commands become
    "The command X was executed."; whitespace is normalized at the end.
    """
    # Ordered (pattern, replacement) pipeline — order matters: code fences
    # must be replaced before inline-code stripping, and header flattening
    # runs before blockquote handling.
    pipeline = (
        # Fenced code blocks -> spoken placeholder
        (re.compile(r"```[\s\S]*?```"), "A code example is shown in the book."),
        # Markdown tables (2+ consecutive |...| lines) -> spoken placeholder
        (re.compile(r"(?:^[ \t]*\|.*\|[ \t]*\n){2,}", re.MULTILINE),
         "A comparison table is shown in the book."),
        # Image references -> removed
        (re.compile(r"!\[[^\]]*\]\([^)]*\)"), ""),
        # Hyperlinks [text](url) -> text
        (re.compile(r"\[([^\]]+)\]\([^)]*\)"), r"\1"),
        # Bold -> plain
        (re.compile(r"\*\*([^*]+)\*\*"), r"\1"),
        # Underscore italics (word-boundary aware) -> plain
        (re.compile(r"(?<!\w)_([^_]+)_(?!\w)"), r"\1"),
        # Inline code -> plain
        (re.compile(r"`([^`]+)`"), r"\1"),
        # HTML line breaks -> removed
        (re.compile(r"<br\s*/?>"), ""),
        # Escaped asterisks (\*\* then \*) -> removed
        (re.compile(r"\\\*\\\*"), ""),
        (re.compile(r"\\\*"), ""),
        # Headers "## Title" -> "Title."
        (re.compile(r"^#{1,6}\s+(.+)$", re.MULTILINE), r"\1."),
    )
    text = raw
    for pattern, replacement in pipeline:
        text = pattern.sub(replacement, text)

    def _render_line(line: str) -> str:
        # Blockquote lines: shell commands become a spoken note; everything
        # else (dialogue, ordinary quotes) keeps its content without ">".
        stripped = line.strip()
        if not stripped.startswith(">"):
            return line
        content = stripped.lstrip(">").strip()
        words = content.split()
        first_word = words[0] if words else ""
        # Drop angle-bracket emails/urls before matching against commands
        first_word = re.sub(r"<[^>]+>", "", first_word).strip()
        if first_word in SHELL_COMMANDS:
            return f'The command {first_word} was executed.'
        return content

    text = "\n".join(_render_line(line) for line in text.split("\n"))
    # Collapse runs of blank lines and horizontal whitespace, trim the edges.
    text = re.sub(r"\n{3,}", "\n\n", text)
    text = re.sub(r"[ \t]+", " ", text)
    return text.strip()
# ---------------------------------------------------------------------------
# Step 2.5: Normalize text for speech using LLM
# ---------------------------------------------------------------------------
# System prompt for the Claude normalization pass (Step 2.5). This text is
# sent verbatim to the model as the "system" field — edit with care, since
# spoken-audio quality depends directly on these rules.
NORMALIZE_SYSTEM_PROMPT = """\
You are a text normalizer preparing written content for a text-to-speech audiobook engine. \
The TTS engine reads text literally, so you must rewrite the text so it sounds natural when spoken aloud.
Rules — apply ALL of these:
1. PARENTHESES: Remove parentheses. Integrate the content naturally using commas, dashes, or rephrasing.
- "Redis (an in-memory cache)" → "Redis, an in-memory cache,"
- "(pronounced 'koo-ber-net-ees')" → "pronounced koo-ber-net-ees"
- "(~500 ms)" → "around 500 milliseconds"
2. ACRONYMS & ABBREVIATIONS: Expand or space out for speech.
- CPU/CPUs → "C P U" / "C P Us"
- API/APIs → "A P I" / "A P Is"
- CDN → "C D N", SQL → "S Q L", NoSQL → "No S Q L"
- AWS → "A W S", DNS → "D N S", CI/CD → "C I C D"
- SLA → "S L A", ORM → "O R M", CTO → "C T O"
- e.g. → "for example", i.e. → "that is", etc. → "etcetera", vs. → "versus"
- ms → "milliseconds", GB → "gigabytes", MB → "megabytes", kHz → "kilohertz"
- 10x → "ten times", 100x → "hundred times"
3. HYPHENS & DASHES: Replace decorative/separator hyphens and em-dashes.
- "---" or "—" used as separators → use commas or periods instead
- Keep hyphens in compound words ("real-time", "read-only")
4. SPECIAL CHARACTERS: Remove or replace.
- URLs → just say the domain name or remove entirely
- Email addresses → remove or say naturally
- ~ → "approximately" or "around"
- / when meaning "or" → "or"
- >= → "greater than or equal to", <= → "less than or equal to"
- != → "not equal to", == → "equals"
- % → "percent"
- & → "and"
- Remove any remaining *, #, _, `, |, >, < characters that are markdown artifacts
5. NUMBERS & UNITS:
- "1000" → "one thousand", "10,000" → "ten thousand" (for round numbers)
- Keep specific numbers as digits: "2,500 characters" is fine
- "48000" → "forty-eight thousand"
- "v2" → "version 2", "v3" → "version 3"
6. CODE & TECHNICAL TERMS: Make them speakable.
- File paths like "/etc/nginx/conf.d" → just remove or say "the nginx config file"
- Variable names in camelCase or snake_case → space them out: "readReplica" → "read replica"
- Config keys → just the meaningful word
- Any remaining code-like text → rephrase or remove
7. HINDI/URDU PHRASES: Keep them exactly as written. Do not translate or modify.
8. DO NOT change the meaning, add new information, or remove meaningful content.
9. DO NOT add any commentary, explanations, or meta-text.
10. Keep the output roughly the same length as the input. Do not significantly expand or compress.
11. Output ONLY the normalized text, nothing else.\
"""
def normalize_for_speech(text: str, anthropic_key: str, chapter_slug: str) -> str:
    """Normalize text for natural TTS using Claude Haiku. Caches results.

    Args:
        text: Preprocessed chapter text (output of preprocess_text).
        anthropic_key: Anthropic API key, passed to _call_anthropic.
        chapter_slug: Cache filename stem; the result is stored at
            NORMALIZED_DIR/<chapter_slug>.txt and reused on later runs.

    Returns:
        The normalized chapter text (from cache when a non-empty cached
        file already exists).
    """
    cache_file = NORMALIZED_DIR / f"{chapter_slug}.txt"
    if cache_file.exists():
        cached = cache_file.read_text(encoding="utf-8")
        # An empty / whitespace-only cache file is treated as missing.
        if cached.strip():
            return cached
    NORMALIZED_DIR.mkdir(parents=True, exist_ok=True)
    # Split into batches to stay within LLM context limits: paragraphs are
    # packed greedily up to NORMALIZE_BATCH_CHARS per batch.
    # NOTE(review): a single paragraph longer than NORMALIZE_BATCH_CHARS
    # still becomes one oversized batch — confirm that is acceptable.
    paragraphs = text.split("\n\n")
    batches = []
    current_batch = ""
    for para in paragraphs:
        candidate = f"{current_batch}\n\n{para}".strip() if current_batch else para
        if len(candidate) <= NORMALIZE_BATCH_CHARS:
            current_batch = candidate
        else:
            if current_batch:
                batches.append(current_batch)
            current_batch = para
    if current_batch:
        batches.append(current_batch)
    normalized_parts = []
    for i, batch in enumerate(batches):
        print(f" Normalizing batch {i + 1}/{len(batches)} ({len(batch)} chars)...")
        result = _call_anthropic(batch, anthropic_key)
        normalized_parts.append(result)
        if i < len(batches) - 1:
            time.sleep(0.5)  # gentle rate limiting between LLM calls
    normalized = "\n\n".join(normalized_parts)
    # Cache for subsequent runs (and for the cache check in main()).
    cache_file.write_text(normalized, encoding="utf-8")
    return normalized
def _call_anthropic(text: str, api_key: str) -> str:
    """Call Claude Haiku to normalize a batch of text.

    Retries up to 3 attempts: backs off progressively on 429, waits 2s on
    5xx, timeouts, and connection failures. Any other 4xx raises via
    raise_for_status(). If all attempts fail, returns the input text
    unchanged so the pipeline can continue with un-normalized text.
    """
    headers = {
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    payload = {
        "model": "claude-haiku-4-5-20251001",
        "max_tokens": 8192,
        "system": NORMALIZE_SYSTEM_PROMPT,
        "messages": [
            {"role": "user", "content": f"Normalize this text for audiobook TTS:\n\n{text}"}
        ],
    }
    for attempt in range(3):
        try:
            resp = requests.post(
                ANTHROPIC_API_URL,
                headers=headers,
                json=payload,
                timeout=60,
            )
            if resp.status_code == 200:
                data = resp.json()
                return data["content"][0]["text"]
            if resp.status_code == 429:
                # Progressive backoff on rate limiting: 5s, 10s, 15s.
                time.sleep(5 * (attempt + 1))
                continue
            if resp.status_code >= 500:
                time.sleep(2)
                continue
            # Other 4xx responses are non-retryable client errors.
            resp.raise_for_status()
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
            # BUGFIX: also retry transient connection failures; previously
            # only Timeout was caught and a dropped connection crashed the
            # whole pipeline instead of retrying.
            time.sleep(2)
            continue
    print(" WARNING: Normalization failed, using raw text")
    return text
# ---------------------------------------------------------------------------
# Step 3: Chunk text
# ---------------------------------------------------------------------------
def chunk_text(text: str) -> list[str]:
    """Split text into chunks of at most MAX_CHUNK_CHARS characters.

    Paragraphs are packed greedily into the current chunk; a paragraph that
    is itself over the limit is broken up by _split_long_paragraph.
    """
    pieces: list[str] = []
    buffer = ""
    for paragraph in re.split(r"\n\n+", text):
        paragraph = paragraph.strip()
        if not paragraph:
            continue
        merged = f"{buffer}\n\n{paragraph}".strip() if buffer else paragraph
        if len(merged) <= MAX_CHUNK_CHARS:
            # Paragraph fits into the chunk being built.
            buffer = merged
            continue
        # Chunk is full: emit it before placing this paragraph.
        if buffer:
            pieces.append(buffer)
            buffer = ""
        if len(paragraph) > MAX_CHUNK_CHARS:
            # Oversized paragraph: delegate to finer-grained splitting.
            pieces.extend(_split_long_paragraph(paragraph))
        else:
            buffer = paragraph
    if buffer:
        pieces.append(buffer)
    return pieces
def _split_long_paragraph(para: str) -> list[str]:
    """Split a long paragraph at sentence, then clause, then word boundaries.

    Each fallback level is tried only when the previous one produced a
    single piece; the pieces are then re-packed by _merge_pieces.
    """
    # Try sentence boundaries first (split after ., ? or !)
    sentences = re.split(r"(?<=[.?!])\s+", para)
    if len(sentences) > 1:
        return _merge_pieces(sentences)
    # Fall back to clause boundaries (comma, semicolon)
    clauses = re.split(r"(?<=[,;])\s+", para)
    if len(clauses) > 1:
        return _merge_pieces(clauses)
    # Last resort: split at word boundaries (never mid-word)
    words = para.split()
    return _merge_pieces(words, joiner=" ")
def _merge_pieces(pieces: list[str], joiner: str = " ") -> list[str]:
    """Greedily pack `pieces` into chunks of at most MAX_CHUNK_CHARS.

    A single piece longer than the limit is emitted on its own rather
    than being split further.
    """
    out: list[str] = []
    acc = ""
    for piece in pieces:
        tentative = piece if not acc else f"{acc}{joiner}{piece}".strip()
        if len(tentative) > MAX_CHUNK_CHARS:
            # Flush the accumulated chunk and start over with this piece.
            if acc:
                out.append(acc)
            acc = piece
        else:
            acc = tentative
    return out + [acc] if acc else out
# ---------------------------------------------------------------------------
# Step 4: Synthesize a single chunk via Sarvam TTS
# ---------------------------------------------------------------------------
def synthesize_chunk(text: str, api_key: str) -> bytes:
    """Call Sarvam TTS and return raw MP3 bytes.

    Makes up to 4 attempts (initial + 3 retries): exponential backoff on
    429, a 2s pause on 5xx, timeouts, and connection failures. Any other
    4xx raises via raise_for_status().

    Raises:
        RuntimeError: if every attempt fails.
    """
    headers = {
        "Content-Type": "application/json",
        "api-subscription-key": api_key,
    }
    payload = {
        "text": text,
        "target_language_code": "en-IN",
        "model": "bulbul:v3",
        "speaker": "aditya",
        "pace": 1.0,
        "speech_sample_rate": 48000,
        "output_audio_codec": "mp3",
    }
    backoff = 5
    for attempt in range(4):  # initial + 3 retries
        try:
            resp = requests.post(
                SARVAM_TTS_URL,
                headers=headers,
                json=payload,
                timeout=REQUEST_TIMEOUT,
            )
            if resp.status_code == 200:
                data = resp.json()
                # The API returns base64-encoded audio in "audios".
                return base64.b64decode(data["audios"][0])
            if resp.status_code == 429:
                wait = backoff * (2 ** attempt)
                print(f" Rate limited (429). Waiting {wait}s...")
                time.sleep(wait)
                continue
            if resp.status_code >= 500:
                print(f" Server error ({resp.status_code}). Retrying in 2s...")
                time.sleep(2)
                continue
            # Other 4xx responses are non-retryable client errors.
            resp.raise_for_status()
        except requests.exceptions.Timeout:
            print(f" Timeout on attempt {attempt + 1}. Retrying...")
            time.sleep(2)
            continue
        except requests.exceptions.ConnectionError:
            # BUGFIX: retry dropped connections like timeouts; previously
            # these propagated and aborted the whole run mid-chapter.
            print(f" Connection error on attempt {attempt + 1}. Retrying...")
            time.sleep(2)
            continue
    raise RuntimeError(f"Failed to synthesize chunk after retries. Last text: {text[:80]}...")
# ---------------------------------------------------------------------------
# Step 5: Generate audio for a full chapter
# ---------------------------------------------------------------------------
def generate_chapter_audio(chapter: Chapter, api_key: str) -> list[Path]:
    """Synthesize all chunks for a chapter, saving MP3 files. Supports resume.

    Args:
        chapter: Chapter whose `chunks` have already been populated.
        api_key: Sarvam API subscription key.

    Returns:
        Paths of all chunk MP3s for the chapter, in order (both newly
        synthesized and pre-existing ones).
    """
    ch_label = f"ch{chapter.index:02d}"
    ch_dir = CHUNKS_DIR / ch_label
    ch_dir.mkdir(parents=True, exist_ok=True)
    progress = _load_progress()
    chunk_files: list[Path] = []
    for i, chunk_text_content in enumerate(chapter.chunks, start=1):
        filename = f"{ch_label}_{i:03d}.mp3"
        filepath = ch_dir / filename
        if filepath.exists():
            # Resume is driven by file existence; progress.json below is
            # informational only and not consulted here.
            print(f" Chunk {i}/{len(chapter.chunks)} already exists, skipping.")
            chunk_files.append(filepath)
            continue
        print(f" Synthesizing chunk {i}/{len(chapter.chunks)} ({len(chunk_text_content)} chars)...")
        audio_bytes = synthesize_chunk(chunk_text_content, api_key)
        filepath.write_bytes(audio_bytes)
        chunk_files.append(filepath)
        # Update progress after every successful chunk so an interrupted run
        # leaves an accurate record on disk.
        progress.setdefault("chapters", {})
        progress["chapters"].setdefault(ch_label, {"completed_chunks": 0, "total_chunks": len(chapter.chunks)})
        progress["chapters"][ch_label]["completed_chunks"] = i
        _save_progress(progress)
        # Rate limiting between TTS calls (skipped after the final chunk)
        if i < len(chapter.chunks):
            time.sleep(REQUEST_INTERVAL)
    return chunk_files
# ---------------------------------------------------------------------------
# Step 6: Concatenate chapter chunks into a single chapter MP3
# ---------------------------------------------------------------------------
def concatenate_chapter(chapter: Chapter) -> Path:
    """Stitch a chapter's chunk MP3s into one chapter file with 300 ms gaps.

    Args:
        chapter: Chapter whose chunk MP3s already exist under CHUNKS_DIR.

    Returns:
        Path of the exported chapter MP3 in CHAPTERS_DIR.

    Raises:
        RuntimeError: if no chunk MP3s exist for the chapter.
    """
    ch_label = f"ch{chapter.index:02d}"
    ch_dir = CHUNKS_DIR / ch_label
    output_path = CHAPTERS_DIR / f"{ch_label}_{chapter.slug}.mp3"
    CHAPTERS_DIR.mkdir(parents=True, exist_ok=True)
    # Lexicographic sort yields playback order because chunk indices are
    # zero-padded (_001, _002, ...).
    chunk_files = sorted(ch_dir.glob(f"{ch_label}_*.mp3"))
    if not chunk_files:
        raise RuntimeError(f"No chunk files found for {ch_label}")
    silence = AudioSegment.silent(duration=300)
    combined = AudioSegment.empty()
    for i, cf in enumerate(chunk_files):
        segment = AudioSegment.from_mp3(str(cf))
        if i > 0:
            # Short pause between chunks, but not before the first one.
            combined += silence
        combined += segment
    combined.export(
        str(output_path),
        format="mp3",
        bitrate="192k",
        parameters=["-ar", "48000"],  # keep the 48 kHz rate requested from TTS
    )
    print(f" Exported {output_path.name} ({len(combined) / 1000:.1f}s)")
    return output_path
# ---------------------------------------------------------------------------
# Step 7: Concatenate all chapters into the full audiobook
# ---------------------------------------------------------------------------
def concatenate_full_book(chapter_files: list[Path]) -> Path:
    """Join all chapter MP3s into one audiobook file with 2-second gaps.

    Exports to FULL_BOOK_FILE at 192 kbps / 48 kHz and returns that path.
    """
    gap = AudioSegment.silent(duration=2000)
    book = AudioSegment.empty()
    for idx, chapter_path in enumerate(chapter_files):
        print(f" Loading {chapter_path.name}...")
        segment = AudioSegment.from_mp3(str(chapter_path))
        # Insert the pause between chapters, not before the first one.
        if idx > 0:
            book += gap
        book += segment
    book.export(
        str(FULL_BOOK_FILE),
        format="mp3",
        bitrate="192k",
        parameters=["-ar", "48000"],
    )
    print(f" Full book exported: {FULL_BOOK_FILE.name} ({len(book) / 1000:.1f}s)")
    return FULL_BOOK_FILE
# ---------------------------------------------------------------------------
# Step 8: Add metadata (ID3 tags) and write manifest
# ---------------------------------------------------------------------------
def add_metadata(chapters: list[Chapter], chapter_files: list[Path], total_cost_chars: int):
    """Write ID3 tags onto chapter / full-book MP3s and emit manifest.json.

    Args:
        chapters: All parsed chapters (titles, slugs, chunk counts).
        chapter_files: Expected chapter MP3 paths, parallel to `chapters`;
            missing files are skipped (single-chapter mode).
        total_cost_chars: Total speakable character count, recorded in the
            manifest and used for the cost estimate.
    """
    # Tag individual chapter files (skip missing files in single-chapter mode)
    for chapter, filepath in zip(chapters, chapter_files):
        if not filepath.exists():
            continue
        try:
            audio = MP3(str(filepath))
            if audio.tags is None:
                audio.add_tags()
        except Exception:
            # NOTE(review): this recovery path re-opens the file and calls
            # add_tags() unconditionally — if the file already carries tags,
            # add_tags() itself raises. Confirm the intended recovery here.
            audio = MP3(str(filepath))
            audio.add_tags()
        audio.tags.add(TIT2(encoding=3, text=chapter.title))  # track title
        audio.tags.add(TPE1(encoding=3, text="Subhash Choudhary"))  # artist/author
        audio.tags.add(TALB(encoding=3, text="The Accidental CTO"))  # album
        audio.tags.add(TRCK(encoding=3, text=str(chapter.index)))  # track number
        audio.save()
    # Tag full book if it exists
    if FULL_BOOK_FILE.exists():
        try:
            audio = MP3(str(FULL_BOOK_FILE))
            if audio.tags is None:
                audio.add_tags()
        except Exception:
            audio = MP3(str(FULL_BOOK_FILE))
            audio.add_tags()
        audio.tags.add(TIT2(encoding=3, text="The Accidental CTO"))
        audio.tags.add(TPE1(encoding=3, text="Subhash Choudhary"))
        audio.tags.add(TALB(encoding=3, text="The Accidental CTO"))
        audio.save()
    # Build the manifest; it lists every chapter even when its audio file
    # does not exist yet (duration then stays 0.0).
    manifest = {
        "title": "The Accidental CTO",
        "author": "Subhash Choudhary",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_chapters": len(chapters),
        "total_speakable_chars": total_cost_chars,
        # Pricing assumption: Rs 30 per 10,000 characters — TODO confirm rate.
        "estimated_cost_inr": round(total_cost_chars / 10000 * 30, 2),
        "chapters": [],
    }
    for chapter, filepath in zip(chapters, chapter_files):
        duration = 0.0
        if filepath.exists():
            try:
                audio = MP3(str(filepath))
                duration = audio.info.length
            except Exception:
                # Unreadable file: report zero duration rather than abort.
                pass
        manifest["chapters"].append({
            "index": chapter.index,
            "title": chapter.title,
            "slug": chapter.slug,
            "chunks": len(chapter.chunks),
            "chars": len(chapter.clean_text),
            "duration_seconds": round(duration, 2),
            "file": filepath.name,
        })
    MANIFEST_FILE.write_text(json.dumps(manifest, indent=2))
    print(f" Manifest written to {MANIFEST_FILE.name}")
# ---------------------------------------------------------------------------
# Progress helpers
# ---------------------------------------------------------------------------
def _load_progress() -> dict:
    """Load resume state from PROGRESS_FILE, or an empty dict if absent."""
    if PROGRESS_FILE.exists():
        return json.loads(PROGRESS_FILE.read_text())
    return {}


def _save_progress(data: dict):
    """Persist resume state to PROGRESS_FILE as pretty-printed JSON."""
    PROGRESS_FILE.write_text(json.dumps(data, indent=2))
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: parse -> preprocess -> normalize -> chunk -> TTS -> join -> tag."""
    parser = argparse.ArgumentParser(description="Generate audiobook from The Accidental CTO")
    parser.add_argument("--dry-run", action="store_true", help="Parse and show stats without making API calls")
    parser.add_argument("--chapter", type=int, help="Generate a single chapter by sequential index")
    parser.add_argument("--skip-tts", action="store_true", help="Skip TTS; only concatenate and tag existing chunks")
    parser.add_argument("--skip-normalize", action="store_true", help="Skip LLM normalization step")
    args = parser.parse_args()
    # Load API keys from the .env next to the book checkout
    load_dotenv(OUTPUT_DIR.parent / ".env")
    # NOTE(review): "SARVAM_API+KEY" looks like a fallback for a typo'd .env
    # entry — confirm whether this alternate name is still needed.
    api_key = os.getenv("SARVAM_API_KEY") or os.getenv("SARVAM_API+KEY") or ""
    anthropic_key = os.getenv("ANTHROPIC_API_KEY") or ""
    # Keys are only required for the steps that actually use them.
    if not api_key and not args.dry_run and not args.skip_tts:
        print("ERROR: No API key found. Set SARVAM_API_KEY or SARVAM_API+KEY in .env")
        return
    if not anthropic_key and not args.dry_run and not args.skip_normalize:
        print("ERROR: No ANTHROPIC_API_KEY found in .env (needed for text normalization)")
        return
    # Step 1: Parse chapters
    print("=" * 60)
    print("STEP 1: Parsing chapters")
    print("=" * 60)
    book_text = BOOK_PATH.read_text(encoding="utf-8")
    chapters = parse_chapters(book_text)
    print(f" Found {len(chapters)} chapters (including Dedication)")
    # Step 2: Preprocess markdown into speakable text
    print("\nSTEP 2: Preprocessing text for TTS")
    print("=" * 60)
    for ch in chapters:
        ch.clean_text = preprocess_text(ch.raw_text)
        print(f" Ch {ch.index:2d}: {ch.title[:50]:50s} | raw={len(ch.raw_text):6d} -> clean={len(ch.clean_text):6d}")
    # Step 2.5: Normalize for speech using LLM
    if not args.skip_normalize:
        print("\nSTEP 2.5: Normalizing text for speech (Claude Haiku)")
        print("=" * 60)
        for ch in chapters:
            cache_file = NORMALIZED_DIR / f"{ch.slug}.txt"
            # Reuse a cached normalization when it exists and is non-empty.
            if cache_file.exists() and cache_file.read_text(encoding="utf-8").strip():
                print(f" Ch {ch.index:2d}: cached ({len(cache_file.read_text()):,} chars)")
                ch.clean_text = cache_file.read_text(encoding="utf-8")
            else:
                print(f" Ch {ch.index:2d}: normalizing {ch.title[:40]}...")
                ch.clean_text = normalize_for_speech(ch.clean_text, anthropic_key, ch.slug)
                print(f" -> {len(ch.clean_text):,} chars")
    else:
        print("\n Skipping normalization (--skip-normalize)")
    # Step 3: Chunk into TTS-sized pieces; also gather cost/time estimates
    print("\nSTEP 3: Chunking text")
    print("=" * 60)
    total_chunks = 0
    total_chars = 0
    for ch in chapters:
        ch.chunks = chunk_text(ch.clean_text)
        total_chunks += len(ch.chunks)
        total_chars += len(ch.clean_text)
        print(f" Ch {ch.index:2d}: {len(ch.chunks):3d} chunks")
    print(f"\n Total chunks: {total_chunks}")
    print(f" Total speakable chars: {total_chars:,}")
    # Cost estimate assumes Rs 30 per 10k chars and Rs 85 per USD.
    print(f" Estimated cost: Rs {total_chars / 10000 * 30:.0f} (~${total_chars / 10000 * 30 / 85:.1f} USD)")
    print(f" Estimated API time: {total_chunks * REQUEST_INTERVAL / 60:.1f} min")
    if args.dry_run:
        print("\n[DRY RUN] Stopping before TTS. No API calls made.")
        return
    # Determine which chapters to process
    # (chapter indices start at 1, so the falsy check on 0 is safe here)
    if args.chapter:
        targets = [ch for ch in chapters if ch.index == args.chapter]
        if not targets:
            print(f"ERROR: Chapter {args.chapter} not found (valid: 1-{len(chapters)})")
            return
    else:
        targets = chapters
    # Step 4 & 5: Synthesize chunks
    if not args.skip_tts:
        print(f"\nSTEP 4-5: Synthesizing audio ({len(targets)} chapter(s))")
        print("=" * 60)
        for ch in targets:
            print(f"\n --- Chapter {ch.index}: {ch.title} ({len(ch.chunks)} chunks) ---")
            generate_chapter_audio(ch, api_key)
    # Step 6: Concatenate chapters from whatever chunks exist on disk
    print(f"\nSTEP 6: Concatenating chapter audio")
    print("=" * 60)
    chapter_files: list[Path] = []
    for ch in targets:
        ch_label = f"ch{ch.index:02d}"
        ch_dir = CHUNKS_DIR / ch_label
        if ch_dir.exists() and list(ch_dir.glob("*.mp3")):
            filepath = concatenate_chapter(ch)
            chapter_files.append(filepath)
        else:
            print(f" Skipping Ch {ch.index} (no chunks found)")
    # Step 7: Concatenate full book (only if processing all chapters)
    if not args.chapter and len(chapter_files) == len(chapters):
        print(f"\nSTEP 7: Concatenating full audiobook")
        print("=" * 60)
        concatenate_full_book(chapter_files)
    elif args.chapter:
        print(f"\n Skipping full book concatenation (single chapter mode)")
    # Step 8: Metadata and manifest
    print(f"\nSTEP 8: Adding metadata and writing manifest")
    print("=" * 60)
    # For the manifest, always use the full chapters list; add_metadata
    # skips any expected file that does not exist yet.
    all_chapter_files: list[Path] = []
    for ch in chapters:
        expected = CHAPTERS_DIR / f"ch{ch.index:02d}_{ch.slug}.mp3"
        all_chapter_files.append(expected)
    add_metadata(chapters, all_chapter_files, total_chars)
    print("\nDone!")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment