transcribe-call-ended
#!/usr/bin/env python3
"""
Tuple Trigger: Transcribe Call
Fires on call-ended. Reads WAV files and Events.txt from the recording
directory, transcribes audio with whisper-cpp, and writes Summary.md.
Requires:
1. brew install whisper-cpp
2. Download a GGML model: curl -L -o ~/.local/share/whisper-cpp/models/ggml-large-v3.bin \
https://huggingface.co/ggerganov/whisper.cpp/resolve/main/ggml-large-v3.bin
Environment variables (from Tuple):
TUPLE_TRIGGER_CALL_ARTIFACTS_DIR - Path to the call artifacts directory
Optional environment variables:
WHISPER_MODEL - Path to a GGML model file (overrides default)
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime, timedelta
from pathlib import Path

# --- Configuration ---

DEFAULT_MODEL = os.path.expanduser(
    "~/.local/share/whisper-cpp/models/ggml-large-v3.bin"
)
WHISPER_CLI = "whisper-cli"

# --- Parsing ---


def parse_wav_filename(filename):
    """Extract user ID and start timestamp from a WAV filename.

    Format: User{id}@{yyyy-MM-dd_HH.mm.ss.SSS}.wav
    """
    match = re.match(
        r"User(\d+)@(\d{4}-\d{2}-\d{2}_\d{2}\.\d{2}\.\d{2}\.\d{3})", filename
    )
    if not match:
        return None, None
    user_id = match.group(1)
    dt = datetime.strptime(match.group(2), "%Y-%m-%d_%H.%M.%S.%f")
    return user_id, dt
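
# Illustrative example (filename invented to match the format above):
#   parse_wav_filename("User42@2026-02-13_18.36.05.123.wav")
#   -> ("42", datetime(2026, 2, 13, 18, 36, 5, 123000))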


def parse_event_line(line):
    """Parse an Events.txt line -> (datetime, message) or (None, None)."""
    match = re.match(
        r"\[(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] (.+)", line.strip()
    )
    if not match:
        return None, None
    dt = datetime.strptime(match.group(1), "%Y/%m/%d %H:%M:%S.%f")
    return dt, match.group(2)
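
# Illustrative example (log line invented to match the pattern above):
#   parse_event_line("[2026/02/13 18:36:05.123] Call ended")
#   -> (datetime(2026, 2, 13, 18, 36, 5, 123000), "Call ended")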


# --- Data Loading ---


def load_events(events_path):
    """Load categorized events and extract participant names."""
    events = []
    user_names = {}
    # Maps message prefix -> event type for user-related events
    user_event_types = {
        "This user is:": "self_identified",
        "Peer already on call:": "peer_present",
        "Peer joined call:": "peer_joined",
        "Peer left call:": "peer_left",
    }
    with open(events_path) as f:
        for line in f:
            dt, message = parse_event_line(line)
            if dt is None:
                continue
            if "Call joined" in message:
                call_id_match = re.search(r"id = (.+)", message)
                events.append((dt, "call_start", {
                    "call_id": call_id_match.group(1) if call_id_match else "unknown",
                }))
            elif "Call ended" in message:
                events.append((dt, "call_end", {}))
            else:
                user_match = re.search(
                    r'UserID (\d+), "([^"]+)" <([^>]+)>', message
                )
                if not user_match:
                    continue
                for prefix, event_type in user_event_types.items():
                    if prefix in message:
                        uid, name, email = user_match.group(1, 2, 3)
                        user_data = {"user_id": uid, "name": name, "email": email}
                        if event_type != "peer_left":
                            user_names[uid] = name
                        events.append((dt, event_type, user_data))
                        break
    return events, user_names
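
# Illustrative result shape (values invented):
#   events     -> [(datetime(...), "call_start", {"call_id": "abc123"}),
#                  (datetime(...), "peer_joined", {"user_id": "42", ...}), ...]
#   user_names -> {"42": "Ada Lovelace"}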


def transcribe_wav(wav_path, model_path, output_dir):
    """Run whisper-cli on a single WAV file, return the JSON transcription path."""
    stem = Path(wav_path).stem
    output_base = os.path.join(output_dir, stem)
    result = subprocess.run(
        [
            WHISPER_CLI,
            "-m", model_path,    # GGML model file
            "-l", "en",          # transcription language
            "-np",               # suppress progress printing
            "-oj",               # emit a JSON transcription
            "-of", output_base,  # output path, without extension
            wav_path,
        ],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        print(
            f"  Warning: whisper-cli failed on {Path(wav_path).name}: "
            f"{result.stderr.strip()}",
            file=sys.stderr,
        )
        return None
    json_path = output_base + ".json"
    return json_path if os.path.exists(json_path) else None


def load_transcription(json_path):
    """Load a single whisper JSON and convert to absolute-timestamped segments."""
    user_id, file_start_dt = parse_wav_filename(Path(json_path).stem)
    if user_id is None:
        return []
    with open(json_path) as f:
        data = json.load(f)
    segments = []
    for entry in data.get("transcription", []):
        text = entry["text"].strip()
        if not text:
            continue
        offset_ms = entry["offsets"]["from"]
        abs_dt = file_start_dt + timedelta(milliseconds=offset_ms)
        segments.append((abs_dt, "speech", {"user_id": user_id, "text": text}))
    return segments
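
# The whisper-cli JSON is expected to look roughly like this (shape
# inferred from the keys read above; sample values invented):
#   {"transcription": [
#       {"offsets": {"from": 0, "to": 2400}, "text": " Hello, everyone."},
#       ...
#   ]}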


# --- Markdown Generation ---


def first_name(full_name):
    return full_name.split()[0]


def format_time(dt):
    return dt.strftime("%H:%M:%S")


def generate_markdown(events, transcriptions, user_names):
    all_entries = sorted(events + transcriptions, key=lambda x: x[0])
    lines = []
    if all_entries:
        call_date = all_entries[0][0].strftime("%Y-%m-%d")
        call_time = all_entries[0][0].strftime("%H:%M")
    else:
        call_date = call_time = "Unknown"
    lines.append(f"# Call Summary - {call_date} {call_time}")
    lines.append("")
    if user_names:
        lines.append("## Participants")
        lines.append("")
        for _uid, name in sorted(user_names.items(), key=lambda x: x[1]):
            lines.append(f"- {name}")
        lines.append("")
    lines.append("## Timeline")
    lines.append("")
    # Peers already present also emit a "joined" event; remember them so
    # the duplicate join can be skipped below.
    peers_at_start = {
        data["user_id"]
        for _, event_type, data in all_entries
        if event_type == "peer_present"
    }
    last_speaker = None
    prev_was_event = False
    for dt, event_type, data in all_entries:
        time_str = format_time(dt)
        if event_type == "peer_joined" and data["user_id"] in peers_at_start:
            continue
        if event_type in ("call_start", "call_end", "self_identified",
                          "peer_present", "peer_joined", "peer_left"):
            if not prev_was_event and lines and lines[-1] != "":
                lines.append("")
            label = {
                "call_start": "Call started",
                "call_end": "Call ended",
                "self_identified": f"{data.get('name', '?')} joined",
                "peer_present": f"{data.get('name', '?')} was already on the call",
                "peer_joined": f"{data.get('name', '?')} joined",
                "peer_left": f"{data.get('name', '?')} left",
            }[event_type]
            lines.append(f"*{time_str} -- {label}*")
            prev_was_event = True
            last_speaker = None
        elif event_type == "speech":
            if prev_was_event:
                lines.append("")
            speaker_name = user_names.get(data["user_id"], f"User{data['user_id']}")
            speaker_first = first_name(speaker_name)
            text = data["text"]
            if speaker_first == last_speaker:
                # Consecutive segments from the same speaker merge into one line.
                lines[-1] += " " + text
            else:
                lines.append(f"**{speaker_first}** [{time_str}]: {text}")
            last_speaker = speaker_first
            prev_was_event = False
    return "\n".join(lines) + "\n"


# --- Main ---


def main():
    recording_dir = os.environ.get("TUPLE_TRIGGER_CALL_ARTIFACTS_DIR")
    if not recording_dir:
        print("Error: TUPLE_TRIGGER_CALL_ARTIFACTS_DIR not set", file=sys.stderr)
        sys.exit(1)
    if not os.path.isdir(recording_dir):
        print(f"Error: Recording directory not found: {recording_dir}", file=sys.stderr)
        sys.exit(1)

    model_path = os.environ.get("WHISPER_MODEL", DEFAULT_MODEL)
    if not os.path.exists(model_path):
        print(f"Error: Whisper model not found: {model_path}", file=sys.stderr)
        print("Install with: brew install whisper-cpp", file=sys.stderr)
        print(f"Download model to: {model_path}", file=sys.stderr)
        sys.exit(1)

    events_path = os.path.join(recording_dir, "Events.txt")
    if not os.path.exists(events_path):
        print(f"Error: Events.txt not found in {recording_dir}", file=sys.stderr)
        sys.exit(1)

    # Discover WAV files
    wav_files = sorted(Path(recording_dir).glob("*.wav"))
    if not wav_files:
        print("No WAV files found, nothing to transcribe.", file=sys.stderr)
        sys.exit(0)
    print(f"Transcribing {len(wav_files)} audio segments...")

    # Transcribe into a temp directory; each JSON is read eagerly, so the
    # directory can be discarded as soon as the loop finishes.
    with tempfile.TemporaryDirectory(prefix="tuple-transcribe-") as tmp_dir:
        all_segments = []
        for i, wav in enumerate(wav_files, 1):
            print(f"  [{i}/{len(wav_files)}] {wav.name}")
            json_path = transcribe_wav(str(wav), model_path, tmp_dir)
            if json_path:
                all_segments.extend(load_transcription(json_path))

    # Load events
    events, user_names = load_events(events_path)

    # Generate markdown
    markdown = generate_markdown(events, all_segments, user_names)
    output_path = os.path.join(recording_dir, "Summary.md")
    with open(output_path, "w") as f:
        f.write(markdown)
    print(f"Written to {output_path}")
    print(f"  {len(events)} events, {len(all_segments)} speech segments")

if __name__ == "__main__":
    main()