Last active
February 25, 2026 22:34
-
-
Save vishalsachdev/cae45dc48e2b26d92fced8fd488e303c to your computer and use it in GitHub Desktop.
Export all Granola meeting notes to local markdown files (bypasses 30-day MCP limit)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Export all Granola meeting notes to local markdown files. | |
| Reads auth token from the local Granola desktop app config, fetches all documents | |
| via the REST API (bypassing the 30-day MCP limit), and saves each meeting as a | |
| YAML-frontmatter markdown file with AI summary and full transcript. | |
| Usage: | |
| pip install requests | |
| python export.py # Export all meetings | |
| python export.py --output ./notes # Custom output directory | |
| """ | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import time | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| import requests | |
# --- Configuration ---

# Set to datetime(...) to only export meetings before a date, or None for all.
CUTOFF_DATE = None

# Output directory: honor "--output <dir>" on the command line, else default.
# BUG FIX: the original one-liner indexed argv past the end and raised a bare
# IndexError when "--output" was the final argument; exit cleanly instead.
if "--output" in sys.argv:
    try:
        OUTPUT_DIR = Path(sys.argv[sys.argv.index("--output") + 1])
    except IndexError:
        sys.exit("error: --output requires a directory argument")
else:
    OUTPUT_DIR = Path("./granola-export")

# Where the Granola desktop app stores its auth tokens (macOS path).
SUPABASE_PATH = Path.home() / "Library/Application Support/Granola/supabase.json"
API_BASE = "https://api.granola.ai"
PAGE_SIZE = 100  # documents per page for the get-documents endpoint
REQUEST_DELAY = 0.5  # seconds between transcript API calls
def load_auth_token() -> str:
    """Return the current access token stored by the Granola desktop app.

    The app keeps its WorkOS tokens JSON-encoded as a *string* inside
    supabase.json, so two decode passes are required.
    """
    raw = json.loads(SUPABASE_PATH.read_text())
    workos = json.loads(raw["workos_tokens"])
    return workos["access_token"]
def make_slug(title: str, max_len: int = 50) -> str:
    """Turn *title* into a lowercase, hyphen-separated filename slug.

    Runs of non-alphanumeric characters collapse to a single hyphen; the
    result is truncated to *max_len* at a word (hyphen) boundary. Empty
    or all-punctuation titles yield "untitled".
    """
    if not title:
        return "untitled"
    hyphenated = re.sub(r"[^a-z0-9]+", "-", title.lower())
    slug = hyphenated.strip("-")
    if len(slug) > max_len:
        # rsplit keeps everything before the last hyphen in the truncated
        # prefix; with no hyphen present, the prefix survives unchanged.
        slug = slug[:max_len].rsplit("-", 1)[0]
    return slug if slug else "untitled"
def fetch_all_documents(headers: dict) -> list[dict]:
    """Page through the get-documents endpoint and return every document."""
    collected: list[dict] = []
    offset = 0
    while True:
        payload = {
            "limit": PAGE_SIZE,
            "offset": offset,
            "include_last_viewed_panel": True,
        }
        resp = requests.post(
            f"{API_BASE}/v2/get-documents",
            json=payload,
            headers=headers,
        )
        resp.raise_for_status()
        page = resp.json().get("docs", [])
        collected.extend(page)
        print(f" Fetched {len(page)} documents (offset={offset}, total so far={len(collected)})")
        # A short page means we've reached the end of the collection.
        if len(page) < PAGE_SIZE:
            return collected
        offset += PAGE_SIZE
        time.sleep(REQUEST_DELAY)
def fetch_transcript(doc_id: str, headers: dict) -> list[dict]:
    """Retrieve the transcript segments for one document from the API."""
    response = requests.post(
        f"{API_BASE}/v1/get-document-transcript",
        headers=headers,
        json={"document_id": doc_id},
    )
    response.raise_for_status()
    return response.json()
def prosemirror_to_markdown(node: dict, depth: int = 0) -> str:
    """Render a ProseMirror JSON node (and its subtree) as markdown text.

    *depth* tracks list nesting so bullet/ordered lists indent correctly;
    non-dict or empty input renders as the empty string.
    """
    if not isinstance(node, dict) or not node:
        return ""

    kind = node.get("type", "")
    kids = node.get("content", [])
    attrs = node.get("attrs", {})

    # Leaf text node: wrap the raw text in each inline mark, innermost first.
    if kind == "text":
        rendered = node.get("text", "")
        for mark in node.get("marks", []):
            mark_kind = mark.get("type", "")
            if mark_kind == "bold":
                rendered = f"**{rendered}**"
            elif mark_kind == "italic":
                rendered = f"*{rendered}*"
            elif mark_kind == "link":
                href = mark.get("attrs", {}).get("href", "")
                rendered = f"[{rendered}]({href})"
            elif mark_kind == "code":
                rendered = f"`{rendered}`"
        return rendered

    inner = "".join(prosemirror_to_markdown(kid, depth) for kid in kids)

    if kind == "doc":
        return inner
    if kind == "paragraph":
        return inner + "\n"
    if kind == "heading":
        hashes = "#" * attrs.get("level", 1)
        return f"{hashes} {inner}\n\n"
    if kind == "bulletList":
        rendered_items = "".join(
            render_list_item(kid, depth, ordered=False) for kid in kids
        )
        # Only a top-level list gets a trailing blank line.
        return rendered_items + ("\n" if depth == 0 else "")
    if kind == "orderedList":
        rendered_items = "".join(
            render_list_item(kid, depth, ordered=True, index=pos)
            for pos, kid in enumerate(kids, 1)
        )
        return rendered_items + ("\n" if depth == 0 else "")
    if kind == "listItem":
        return inner
    if kind == "horizontalRule":
        return "---\n\n"
    if kind == "blockquote":
        quoted = inner.strip().split("\n")
        return "\n".join(f"> {line}" for line in quoted) + "\n\n"
    if kind == "codeBlock":
        lang = attrs.get("language", "")
        return f"```{lang}\n{inner}```\n\n"
    if kind == "hardBreak":
        return "\n"
    # Unknown node types: render children transparently.
    return inner
def render_list_item(node: dict, depth: int, ordered: bool = False, index: int = 1) -> str:
    """Render one listItem node (and any nested lists) with indentation.

    The first paragraph inside the item carries the bullet/number marker;
    later paragraphs are hang-indented under it. Nested lists recurse with
    depth + 1.
    """
    pad = " " * depth
    marker = f"{index}. " if ordered else "- "
    parts: list[str] = []
    for pos, child in enumerate(node.get("content", [])):
        child_kind = child.get("type", "")
        if child_kind == "paragraph":
            inner = "".join(
                prosemirror_to_markdown(grandchild, depth)
                for grandchild in child.get("content", [])
            )
            lead = marker if pos == 0 else " "
            parts.append(f"{pad}{lead}{inner}\n")
        elif child_kind in ("bulletList", "orderedList"):
            # Nested list: each sub-item renders one level deeper.
            for sub_index, sub_node in enumerate(child.get("content", []), 1):
                parts.append(
                    render_list_item(
                        sub_node,
                        depth + 1,
                        ordered=(child_kind == "orderedList"),
                        index=sub_index,
                    )
                )
        else:
            # Fallback: render any other child node as markdown.
            rendered = prosemirror_to_markdown(child, depth)
            if rendered.strip():
                parts.append(f"{pad} {rendered}")
    return "".join(parts)
def format_transcript(segments: list[dict]) -> str:
    """Format transcript segments into readable markdown paragraphs.

    Each non-empty segment becomes its own paragraph, prefixed with an
    [HH:MM:SS] timestamp when start_timestamp parses as ISO-8601.
    """
    if not segments:
        return "*No transcript available.*\n"
    rendered: list[str] = []
    for segment in segments:
        body = segment.get("text", "").strip()
        if not body:
            continue
        stamp = ""
        raw_start = segment.get("start_timestamp", "")
        if raw_start:
            try:
                parsed = datetime.fromisoformat(raw_start.replace("Z", "+00:00"))
            except (ValueError, TypeError):
                pass  # unparseable timestamp: emit the text without one
            else:
                stamp = f"[{parsed.strftime('%H:%M:%S')}] "
        rendered.append(f"{stamp}{body}")
    if not rendered:
        return "*No transcript available.*\n"
    return "\n\n".join(rendered) + "\n"
def extract_participants(doc: dict) -> list[str]:
    """Extract participant display strings from document metadata.

    Prefers "Name <email>" when both are known, else whichever is present.
    Sources, in order: people.creator, people.attendees, then any calendar
    attendees whose email was not already collected.
    """
    found: list[str] = []
    people = doc.get("people") or {}
    if isinstance(people, dict):
        # Meeting creator first.
        creator = people.get("creator", {})
        if creator:
            creator_name = creator.get("name", "")
            creator_email = creator.get("email", "")
            if creator_name and creator_email:
                found.append(f"{creator_name} <{creator_email}>")
            elif creator_email:
                found.append(creator_email)
            elif creator_name:
                found.append(creator_name)
        # Then every listed attendee.
        for attendee in people.get("attendees", []):
            attendee_email = attendee.get("email", "")
            full_name = (
                attendee.get("details", {})
                .get("person", {})
                .get("name", {})
                .get("fullName", "")
            )
            if full_name and attendee_email:
                found.append(f"{full_name} <{attendee_email}>")
            elif attendee_email:
                found.append(attendee_email)
            elif full_name:
                found.append(full_name)
    # Supplement from google_calendar_event attendees if the list was sparse.
    calendar_event = doc.get("google_calendar_event") or {}
    seen_emails = {p.split("<")[-1].rstrip(">") for p in found if "<" in p}
    seen_emails.update(p for p in found if "@" in p and "<" not in p)
    for attendee in calendar_event.get("attendees", []):
        attendee_email = attendee.get("email", "")
        if attendee_email and attendee_email not in seen_emails:
            found.append(attendee_email)
            seen_emails.add(attendee_email)
    return found
def yaml_escape(value: str) -> str:
    """Escape a string for safe use as a single-line YAML frontmatter scalar.

    Values containing YAML-significant characters are wrapped in double
    quotes. BUG FIX: the original escaped only double quotes, so a value
    containing a literal newline, tab, or backslash produced invalid YAML
    inside the quoted scalar; those are now escaped too.
    """
    if not value:
        return '""'
    specials = (':', '#', '"', "'", '[', ']', '{', '}', ',', '&', '*', '?',
                '|', '-', '<', '>', '=', '!', '%', '@', '`', '\\', '\n', '\t')
    if any(c in value for c in specials):
        # Backslashes first so later escape sequences are not double-escaped.
        escaped = (
            value.replace('\\', '\\\\')
                 .replace('"', '\\"')
                 .replace('\n', '\\n')
                 .replace('\t', '\\t')
        )
        return f'"{escaped}"'
    return value
def build_markdown(doc: dict, transcript_segments: list[dict]) -> str:
    """Build the full markdown file content for a document.

    Layout: YAML frontmatter (title, date, meeting_id, link, participants),
    H1 title, AI summary/notes section, then the transcript.

    ROBUSTNESS: the notes fallback now fires whenever structured panel
    content is unavailable — both when last_viewed_panel is missing entirely
    and when the panel exists but its content is not a dict — so a document
    with only notes_markdown is never exported without its notes.
    """
    doc_id = doc.get("id", "unknown")
    title = doc.get("title") or "Untitled Meeting"
    created_at = doc.get("created_at", "")
    participants = extract_participants(doc)
    link = f"https://notes.granola.ai/d/{doc_id}"

    # Parse date for frontmatter; fall back to the raw API value on failure.
    date_str = ""
    if created_at:
        try:
            dt = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
            date_str = dt.strftime("%Y-%m-%d %H:%M:%S %Z")
        except (ValueError, TypeError):
            date_str = created_at

    # Build YAML frontmatter.
    lines = ["---"]
    lines.append(f"title: {yaml_escape(title)}")
    lines.append(f"date: {date_str}")
    lines.append(f"meeting_id: {doc_id}")
    lines.append(f"link: {link}")
    if participants:
        lines.append("participants:")
        for p in participants:
            lines.append(f" - {yaml_escape(p)}")
    else:
        lines.append("participants: []")
    lines.append("---")
    lines.append("")

    # Title
    lines.append(f"# {title}")
    lines.append("")

    # Panel content (AI summary/notes), else raw notes_markdown.
    panel = doc.get("last_viewed_panel")
    panel_content = panel.get("content") if panel else None
    if panel_content and isinstance(panel_content, dict):
        lines.append(f"## {panel.get('title', 'Notes')}")
        lines.append("")
        md = prosemirror_to_markdown(panel_content)
        # Collapse runs of 3+ newlines left over from block rendering.
        md = re.sub(r"\n{3,}", "\n\n", md).strip()
        lines.append(md)
        lines.append("")
    else:
        notes_md = doc.get("notes_markdown", "")
        if notes_md:
            lines.append("## Notes")
            lines.append("")
            lines.append(notes_md.strip())
            lines.append("")

    # Transcript
    lines.append("## Transcript")
    lines.append("")
    lines.append(format_transcript(transcript_segments))
    return "\n".join(lines)
def main():
    """Entry point: authenticate, fetch all documents, and export each one.

    Skips files that already exist on disk, so the export is resumable;
    rate-limits transcript fetches via REQUEST_DELAY.
    """
    print("Granola Meeting Export")
    print("=" * 60)
    print(f"Cutoff date: {CUTOFF_DATE.isoformat() if CUTOFF_DATE else 'None (all meetings)'}")
    print(f"Output dir: {OUTPUT_DIR}")
    print()

    # Load auth
    print("Loading auth token...")
    token = load_auth_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    # Fetch all documents
    print("Fetching documents from API...")
    all_docs = fetch_all_documents(headers)
    print(f"Total documents: {len(all_docs)}")
    print()

    # Filter docs if a cutoff date is set; docs without created_at are dropped
    # either way, since the filename needs a date.
    if CUTOFF_DATE:
        filtered_docs = []
        for doc in all_docs:
            created_at = doc.get("created_at", "")
            if not created_at:
                continue
            try:
                dt = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
                # TypeError here also covers comparing aware vs naive datetimes.
                if dt < CUTOFF_DATE:
                    filtered_docs.append(doc)
            except (ValueError, TypeError):
                continue
        old_docs = filtered_docs
        print(f"Documents older than {CUTOFF_DATE.date()}: {len(old_docs)}")
    else:
        old_docs = [d for d in all_docs if d.get("created_at")]
        print(f"Exporting all {len(old_docs)} documents")

    # Sort by date ascending (oldest first)
    old_docs.sort(key=lambda d: d.get("created_at", ""))
    print()

    # BUG FIX: create the output directory up front; previously a fresh run
    # crashed with FileNotFoundError on the first write_text call.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Process each document
    exported = 0
    skipped_exists = 0
    skipped_error = 0
    for i, doc in enumerate(old_docs, 1):
        doc_id = doc.get("id", "unknown")
        title = doc.get("title") or "Untitled"
        created_at = doc.get("created_at", "")
        # Build filename: YYYY-MM-DD-<slug>.md
        try:
            dt = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
            date_prefix = dt.strftime("%Y-%m-%d")
        except (ValueError, TypeError):
            date_prefix = "unknown-date"
        slug = make_slug(title)
        filename = f"{date_prefix}-{slug}.md"
        filepath = OUTPUT_DIR / filename
        # Skip if already exists (makes re-runs incremental).
        # NOTE(review): the published snippet printed a literal "(unknown)"
        # in these two progress lines — almost certainly a redaction/scrape
        # artifact; printing the actual filename is clearly the intent.
        if filepath.exists():
            print(f" [{i}/{len(old_docs)}] SKIP (exists): {filename}")
            skipped_exists += 1
            continue
        print(f" [{i}/{len(old_docs)}] Exporting: {filename}")
        print(f" Title: {title[:70]}")
        # Fetch transcript (best-effort: export notes even if this fails).
        try:
            transcript = fetch_transcript(doc_id, headers)
        except Exception as e:
            print(f" WARNING: Could not fetch transcript: {e}")
            transcript = []
        # Build and write markdown
        try:
            md_content = build_markdown(doc, transcript)
            filepath.write_text(md_content, encoding="utf-8")
            exported += 1
        except Exception as e:
            print(f" ERROR: Failed to write file: {e}")
            skipped_error += 1
        # Rate limiting
        time.sleep(REQUEST_DELAY)

    print()
    print("=" * 60)
    print("Export complete!")
    print(f" Exported: {exported}")
    print(f" Skipped (exist):{skipped_exists}")
    print(f" Errors: {skipped_error}")
    print(f" Total processed:{len(old_docs)}")
# Run the export only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment