Created
February 13, 2026 18:36
-
-
Save stephendolan/8fa6ec68fc34abc90263f41c826afafa to your computer and use it in GitHub Desktop.
transcribe-call-ended
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Tuple Trigger: Transcribe Call | |
| Fires on call-ended. Reads WAV files and Events.txt from the recording | |
| directory, transcribes audio with whisper-cpp, and writes Summary.md. | |
| Requires: | |
| 1. brew install whisper-cpp | |
| 2. Download a GGML model: curl -L -o ~/.local/share/whisper-cpp/models/ggml-large-v3.bin \ | |
| https://huggingface.co/ggerganov/whisper.cpp/resolve/main/ggml-large-v3.bin | |
| Environment variables (from Tuple): | |
| TUPLE_TRIGGER_CALL_ARTIFACTS_DIR - Path to the call artifacts directory | |
| Optional environment variables: | |
| WHISPER_MODEL - Path to a GGML model file (overrides default) | |
| """ | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import tempfile | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
# --- Configuration ---
# Default GGML model location; overridable via the WHISPER_MODEL env var (see main()).
DEFAULT_MODEL = os.path.expanduser(
    "~/.local/share/whisper-cpp/models/ggml-large-v3.bin"
)
# Name of the whisper.cpp CLI binary, resolved via PATH (from `brew install whisper-cpp`).
WHISPER_CLI = "whisper-cli"
| # --- Parsing --- | |
def parse_wav_filename(filename):
    """Pull the user ID and recording start time out of a WAV filename.

    Expected name format: User{id}@{yyyy-MM-dd_HH.mm.ss.SSS}.wav

    Returns:
        A ``(user_id, datetime)`` pair, or ``(None, None)`` when the name
        does not follow the expected pattern.
    """
    pattern = r"User(\d+)@(\d{4}-\d{2}-\d{2}_\d{2}\.\d{2}\.\d{2}\.\d{3})"
    m = re.match(pattern, filename)
    if m is None:
        return None, None
    uid, stamp = m.groups()
    # %f tolerates the 3-digit millisecond field (parsed as microseconds).
    started_at = datetime.strptime(stamp, "%Y-%m-%d_%H.%M.%S.%f")
    return uid, started_at
def parse_event_line(line):
    """Parse one Events.txt line into ``(datetime, message)``.

    Lines look like ``[yyyy/MM/dd HH:mm:ss.SSS] message``; anything else
    yields ``(None, None)``.
    """
    m = re.match(
        r"\[(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] (.+)", line.strip()
    )
    if m is None:
        return None, None
    stamp, message = m.groups()
    return datetime.strptime(stamp, "%Y/%m/%d %H:%M:%S.%f"), message
| # --- Data Loading --- | |
def load_events(events_path):
    """Load categorized events from Events.txt and extract participant names.

    Args:
        events_path: Path to the Events.txt file in the recording directory.

    Returns:
        ``(events, user_names)`` where ``events`` is a list of
        ``(datetime, event_type, data)`` tuples and ``user_names`` maps
        user ID -> display name for everyone seen joining/present.
    """
    events = []
    user_names = {}
    # Maps message prefix -> event type for user-related events
    user_event_types = {
        "This user is:": "self_identified",
        "Peer already on call:": "peer_present",
        "Peer joined call:": "peer_joined",
        "Peer left call:": "peer_left",
    }
    # Explicit encoding: participant names may contain non-ASCII characters,
    # and the platform default encoding is not guaranteed to be UTF-8.
    with open(events_path, encoding="utf-8") as f:
        for line in f:
            dt, message = parse_event_line(line)
            if dt is None:
                continue  # skip malformed / non-event lines
            if "Call joined" in message:
                call_id_match = re.search(r"id = (.+)", message)
                events.append((dt, "call_start", {
                    "call_id": call_id_match.group(1) if call_id_match else "unknown"
                }))
            elif "Call ended" in message:
                events.append((dt, "call_end", {}))
            else:
                user_match = re.search(
                    r'UserID (\d+), "([^"]+)" <([^>]+)>', message
                )
                if not user_match:
                    continue
                for prefix, event_type in user_event_types.items():
                    if prefix in message:
                        uid, name, email = user_match.group(1, 2, 3)
                        user_data = {"user_id": uid, "name": name, "email": email}
                        # Record the name on presence events only; a leave event
                        # alone should not (re)introduce a participant.
                        if event_type != "peer_left":
                            user_names[uid] = name
                        events.append((dt, event_type, user_data))
                        break
    return events, user_names
def transcribe_wav(wav_path, model_path, output_dir):
    """Run whisper-cli on a single WAV file.

    Args:
        wav_path: Path to the input WAV file.
        model_path: Path to the GGML model file.
        output_dir: Directory where whisper-cli writes its JSON output.

    Returns:
        Path to the JSON transcription, or None if transcription failed.
    """
    stem = Path(wav_path).stem
    output_base = os.path.join(output_dir, stem)
    try:
        result = subprocess.run(
            [
                WHISPER_CLI,
                "-m", model_path,
                "-l", "en",
                "-np",              # suppress progress output
                "-oj",              # emit JSON
                "-of", output_base,  # output file base name (no extension)
                wav_path,
            ],
            capture_output=True,
            text=True,
        )
    except FileNotFoundError:
        # whisper-cli is not on PATH: warn with an actionable hint instead of
        # crashing the whole trigger with a traceback.
        print(f" Warning: {WHISPER_CLI} not found; install with: brew install whisper-cpp", file=sys.stderr)
        return None
    if result.returncode != 0:
        print(f" Warning: whisper-cli failed on {Path(wav_path).name}: {result.stderr.strip()}", file=sys.stderr)
        return None
    json_path = output_base + ".json"
    # whisper-cli can exit 0 without producing output; verify the file exists.
    return json_path if os.path.exists(json_path) else None
def load_transcription(json_path):
    """Convert one whisper JSON output into absolute-timestamped segments.

    The user ID and file start time come from the JSON filename (same stem
    as the source WAV). Each returned item is a
    ``(datetime, "speech", {"user_id", "text"})`` tuple; empty-text entries
    are dropped. Returns ``[]`` when the filename cannot be parsed.
    """
    user_id, file_start_dt = parse_wav_filename(Path(json_path).stem)
    if user_id is None:
        return []
    with open(json_path) as f:
        payload = json.load(f)
    segments = []
    for entry in payload.get("transcription", []):
        text = entry["text"].strip()
        if not text:
            continue
        # Offsets are milliseconds relative to the start of this WAV file.
        start = file_start_dt + timedelta(milliseconds=entry["offsets"]["from"])
        segments.append((start, "speech", {"user_id": user_id, "text": text}))
    return segments
| # --- Markdown Generation --- | |
def first_name(full_name):
    """Return the first whitespace-separated word of *full_name*.

    Falls back to the input itself when it contains no words (empty or
    all-whitespace string) rather than raising IndexError.
    """
    parts = full_name.split()
    return parts[0] if parts else full_name
def format_time(dt):
    """Render a datetime as HH:MM:SS for timeline display."""
    return f"{dt:%H:%M:%S}"
def generate_markdown(events, transcriptions, user_names):
    """Render the merged event/speech timeline as a Markdown document.

    Args:
        events: ``(datetime, event_type, data)`` tuples from load_events().
        transcriptions: ``(datetime, "speech", data)`` tuples from
            load_transcription().
        user_names: Mapping of user ID -> display name.

    Returns:
        The complete Markdown text, terminated by a newline.
    """
    all_entries = sorted(events + transcriptions, key=lambda x: x[0])
    lines = []
    if all_entries:
        call_date = all_entries[0][0].strftime("%Y-%m-%d")
        call_time = all_entries[0][0].strftime("%H:%M")
    else:
        call_date = call_time = "Unknown"
    lines.append(f"# Call Summary - {call_date} {call_time}")
    lines.append("")
    if user_names:
        lines.append("## Participants")
        lines.append("")
        for uid, name in sorted(user_names.items(), key=lambda x: x[1]):
            lines.append(f"- {name}")
        lines.append("")
    lines.append("## Timeline")
    lines.append("")
    # Users already on the call when recording started; their later
    # "peer_joined" events are redundant and get suppressed below.
    peers_at_start = {
        data["user_id"]
        for _, event_type, data in all_entries
        if event_type == "peer_present"
    }
    # Track the previous speaker by user ID (not first name) so two distinct
    # participants who happen to share a first name are never merged into
    # one speaker turn.
    last_speaker_id = None
    prev_was_event = False
    for dt, event_type, data in all_entries:
        if event_type == "peer_joined" and data["user_id"] in peers_at_start:
            continue
        time_str = format_time(dt)
        if event_type in ("call_start", "call_end", "self_identified",
                          "peer_present", "peer_joined", "peer_left"):
            # Blank line before an event unless one was just emitted.
            if not prev_was_event and lines and lines[-1] != "":
                lines.append("")
            label = {
                "call_start": "Call started",
                "call_end": "Call ended",
                "self_identified": f"{data.get('name', '?')} joined",
                "peer_present": f"{data.get('name', '?')} was already on the call",
                "peer_joined": f"{data.get('name', '?')} joined",
                "peer_left": f"{data.get('name', '?')} left",
            }[event_type]
            lines.append(f"*{time_str} -- {label}*")
            prev_was_event = True
            last_speaker_id = None
        elif event_type == "speech":
            if prev_was_event:
                lines.append("")
            speaker_name = user_names.get(data["user_id"], f"User{data['user_id']}")
            text = data["text"]
            if data["user_id"] == last_speaker_id:
                # Same speaker continuing: extend their current paragraph.
                lines[-1] += " " + text
            else:
                lines.append(f"**{first_name(speaker_name)}** [{time_str}]: {text}")
            last_speaker_id = data["user_id"]
            prev_was_event = False
    return "\n".join(lines) + "\n"
| # --- Main --- | |
def main():
    """Entry point: validate environment, transcribe WAVs, write Summary.md."""
    recording_dir = os.environ.get("TUPLE_TRIGGER_CALL_ARTIFACTS_DIR")
    if not recording_dir:
        print("Error: TUPLE_TRIGGER_CALL_ARTIFACTS_DIR not set", file=sys.stderr)
        sys.exit(1)
    if not os.path.isdir(recording_dir):
        print(f"Error: Recording directory not found: {recording_dir}", file=sys.stderr)
        sys.exit(1)

    # Model: env override first, then the default install location.
    model_path = os.environ.get("WHISPER_MODEL", DEFAULT_MODEL)
    if not os.path.exists(model_path):
        print(f"Error: Whisper model not found: {model_path}", file=sys.stderr)
        print("Install with: brew install whisper-cpp", file=sys.stderr)
        print(f"Download model to: {model_path}", file=sys.stderr)
        sys.exit(1)

    events_path = os.path.join(recording_dir, "Events.txt")
    if not os.path.exists(events_path):
        print(f"Error: Events.txt not found in {recording_dir}", file=sys.stderr)
        sys.exit(1)

    # Discover WAV files; an empty recording is a clean no-op, not an error.
    wav_files = sorted(Path(recording_dir).glob("*.wav"))
    if not wav_files:
        print("No WAV files found, nothing to transcribe.", file=sys.stderr)
        sys.exit(0)

    total = len(wav_files)
    print(f"Transcribing {total} audio segments...")

    # Transcribe into a temp directory; JSON intermediates are discarded
    # automatically when the context exits.
    all_segments = []
    with tempfile.TemporaryDirectory(prefix="tuple-transcribe-") as tmp_dir:
        for index, wav in enumerate(wav_files, 1):
            print(f" [{index}/{total}] {wav.name}")
            json_path = transcribe_wav(str(wav), model_path, tmp_dir)
            if json_path:
                all_segments.extend(load_transcription(json_path))

    events, user_names = load_events(events_path)
    markdown = generate_markdown(events, all_segments, user_names)

    output_path = os.path.join(recording_dir, "Summary.md")
    with open(output_path, "w") as f:
        f.write(markdown)

    print(f"Written to {output_path}")
    print(f" {len(events)} events, {len(all_segments)} speech segments")
# Run only when executed as a script (e.g. by the Tuple trigger), not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment