|
#!/usr/bin/env python3 |
|
|
|
import argparse |
|
import mimetypes |
|
import os |
|
import re |
|
import sys |
|
import time |
|
from pathlib import Path |
|
import requests |
|
from tqdm import tqdm |
|
|
|
# Path appended to the host URL to form the base of every API call.
API_SUFFIX = "/api"
|
|
|
class OutlineClient:
    """Small client for the Outline HTTP API.

    Every API call is a JSON ``POST`` to ``<host> + API_SUFFIX + "/" + method``
    sent through one shared ``requests.Session`` that carries the bearer token.
    """

    # Seconds to wait after an HTTP 429 when ``Retry-After`` is absent/unusable.
    DEFAULT_RETRY_WAIT = 60
    # Number of attempts made per call before giving up on rate limiting.
    MAX_ATTEMPTS = 5

    def __init__(self, host: str, api_key: str, *, timeout: float = 60.0):
        """Create a client.

        Args:
            host: Base URL of the Outline instance; a trailing ``/`` is ignored.
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            timeout: Value handed to ``requests`` as ``timeout`` for every
                request, so a stalled connection cannot hang the export.
        """
        self.base = host.rstrip("/") + API_SUFFIX
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        })

    @staticmethod
    def _parse_retry_after(value) -> int:
        """Turn a ``Retry-After`` header value into a non-negative wait in seconds.

        Anything that is not a finite number (missing header, HTTP-date form,
        ``inf``/``nan``) falls back to ``DEFAULT_RETRY_WAIT``.
        """
        try:
            wait = int(float(value))
        except (TypeError, ValueError, OverflowError):
            # TypeError: header missing; ValueError: non-numeric or NaN;
            # OverflowError: int(float("inf")).
            return OutlineClient.DEFAULT_RETRY_WAIT
        # time.sleep() rejects negative values, so clamp instead of crashing.
        return max(0, wait)

    def _post(self, method: str, payload: dict) -> dict:
        """Call API *method* with *payload* and return the response's ``data``.

        HTTP 429 responses are retried (up to ``MAX_ATTEMPTS`` attempts in
        total) after sleeping for the advertised ``Retry-After`` interval.

        Returns:
            The ``data`` member of the JSON envelope. Callers in this file
            treat it as a dict or as a list depending on the method.

        Raises:
            RuntimeError: on a non-2xx/3xx status, a body that is not JSON, an
                envelope whose ``ok`` is falsy, or when every attempt was
                rate limited.
        """
        url = f"{self.base}/{method}"
        for _ in range(self.MAX_ATTEMPTS):
            r = self.session.post(url, json=payload, timeout=self.timeout)
            if r.status_code == 429:
                wait = self._parse_retry_after(r.headers.get("Retry-After"))
                print(f"Rate limit hit. Waiting {wait}s...")
                time.sleep(wait)
                continue
            if not r.ok:
                raise RuntimeError(f"{method} \u2192 {r.status_code}: {r.text}")
            try:
                data = r.json()
            except ValueError as err:
                # e.g. an HTML page from a proxy; report it like other failures.
                raise RuntimeError(
                    f"{method} returned a non-JSON response: {r.text[:200]!r}"
                ) from err
            if not data.get("ok"):
                raise RuntimeError(f"{method} error: {data}")
            return data["data"]
        raise RuntimeError(f"{method} failed after retries")

    def get_or_create_collection(self, name: str, description: str = "") -> str:
        """Return the id of the collection called *name*, creating it if needed.

        Names are compared case-insensitively. *description* is only used when
        a new collection is created.
        """
        wanted = name.lower()
        # NOTE(review): only the single response of this call is inspected;
        # confirm the server-side ``query`` filter makes paging unnecessary.
        for col in self._post("collections.list", {"query": name}):
            if col["name"].lower() == wanted:
                return col["id"]
        created = self._post("collections.create", {
            "name": name,
            "description": description,
            # U+1F4C1 FILE FOLDER. The original literal was mis-encoded and
            # lost its trailing bytes; this glyph is a best-guess restoration.
            "icon": guess_icon(name, default="\U0001F4C1"),
        })
        return created["id"]

    def update_collection_description(self, collection_id: str, description: str):
        """Replace the description of the collection *collection_id*."""
        self._post("collections.update", {
            "id": collection_id,
            "description": description,
        })

    def create_document(self, title: str, text: str, collection_id: str) -> dict:
        """Create and publish a document; return the API's ``data`` object."""
        return self._post("documents.create", {
            "title": title,
            "text": text,
            "collectionId": collection_id,
            "icon": guess_icon(title),
            "publish": True,
        })

    def upload_attachment(self, path: Path) -> str:
        """Upload the file at *path* as an attachment and return its URL.

        Registers the attachment via ``attachments.create`` and then posts the
        file as multipart form data to the ``uploadUrl`` from that response.

        Raises:
            RuntimeError: if ``attachments.create`` fails (see ``_post``).
            requests.HTTPError: if the upload itself is rejected.
        """
        mime = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
        resp = self._post("attachments.create", {
            "name": path.name,
            "contentType": mime,
            "size": path.stat().st_size,
        })
        # Sent outside the session, presumably so the session's JSON
        # Content-Type and bearer token are not attached to the upload.
        # NOTE(review): assumes ``uploadUrl`` is an absolute URL - confirm for
        # self-hosted instances.
        with path.open("rb") as fh:
            upload = requests.post(
                resp["uploadUrl"],
                data=resp["form"],
                files={"file": (path.name, fh, mime)},
                timeout=self.timeout,
            )
        upload.raise_for_status()
        return resp["attachment"]["url"]
|
|
|
# Ordered (keywords, icon) rules; the first rule with a keyword contained in
# the lower-cased title wins. Icons are written as escape sequences so the
# source stays ASCII and cannot be corrupted by a wrong file encoding again.
#
# NOTE(review): the previous literals were mojibake (UTF-8 read as a Greek
# single-byte charset). Glyphs marked "reconstructed" follow from the bytes
# that survived; those marked "best guess" had no recoverable bytes - confirm.
_ICON_RULES = (
    (("home",), "\U0001F3E0"),                   # house building (reconstructed)
    (("intro", "start"), "\U0001F680"),          # rocket (best guess)
    (("install", "setup"), "\U0001F6E0\uFE0F"),  # hammer and wrench (reconstructed)
    (("faq", "help"), "\u2753"),                 # question mark (reconstructed)
    (("guide", "tutorial"), "\U0001F4D6"),       # open book (best guess)
    (("advanced",), "\U0001F52C"),               # microscope (reconstructed)
    (("api",), "\U0001F50C"),                    # electric plug (best guess)
    (("diagram", "arch"), "\U0001F3D7\uFE0F"),   # building construction (best guess)
)


def guess_icon(title: str, default: str = "\U0001F4C4") -> str:
    """Choose an emoji icon for *title* based on keywords it contains.

    Matching is a case-insensitive substring test, so e.g. ``"Installation"``
    matches the ``install`` keyword. Rules are tried in declaration order.

    Args:
        title: Document or collection title.
        default: Icon returned when no keyword matches (U+1F4C4 PAGE FACING
            UP unless overridden; best-guess restoration of a mis-encoded
            literal).

    Returns:
        The icon of the first matching rule, or *default*.
    """
    lowered = title.lower()
    for keywords, icon in _ICON_RULES:
        if any(word in lowered for word in keywords):
            return icon
    return default
|
|
|
# Markdown image: ![alt](target). Group 1 is everything between the parentheses.
IMG_RE = re.compile(r"!\[[^\]]*]\(([^)]+)\)")

# Markdown link: [label](target). Group 1 is the label, group 2 the target.
# The pattern has no guard against a leading "!", so it also matches the
# "[alt](target)" part of an image.
MD_LINK_RE = re.compile(r"\[([^\]]+)]\(([^)]+)\)")
|
|
|
def rewrite_markdown(md: str, root: Path, client: "OutlineClient") -> str:
    """Upload local images referenced in *md* and point the Markdown at them.

    For every ``![alt](target)`` whose target (the text up to the first space)
    resolves, relative to *root*, to an existing regular file, the file is
    uploaded with ``client.upload_attachment`` and the whole target is replaced
    by the returned URL. Other images are left untouched.

    Args:
        md: Markdown source.
        root: Directory against which relative image paths are resolved.
        client: Object providing ``upload_attachment(path) -> str``.

    Returns:
        The Markdown with uploaded images rewritten.
    """
    # Each distinct file is uploaded once per call, however often it appears.
    uploaded = {}

    def sub_img(m):
        path = (root / m.group(1).split(" ")[0]).resolve()
        # is_file(), not exists(): an empty or directory target would
        # otherwise reach open() and raise.
        if not path.is_file():
            return m.group(0)
        if path not in uploaded:
            uploaded[path] = client.upload_attachment(path)
        whole, offset = m.group(0), m.start()
        # Splice by position so alt text that happens to equal the path is
        # left alone (str.replace would rewrite it too).
        return whole[: m.start(1) - offset] + uploaded[path] + whole[m.end(1) - offset:]

    return IMG_RE.sub(sub_img, md)
|
|
|
def fix_internal_links(md: str, url_map: dict[str, tuple[str, str]]) -> str:
    """Rewrite Markdown links that point at known documents.

    A link matches when the lower-cased base name of its target (directory,
    extension and ``#fragment`` removed; anything after a ``|`` ignored) is a
    key of *url_map*. Matching links become ``[label](/doc/<slug>-<urlId>)``,
    where the slug is derived from the mapped title. Images and unknown
    targets are returned unchanged.

    Args:
        md: Markdown source.
        url_map: Lower-cased file stem -> ``(title, urlId)``.

    Returns:
        The Markdown with matching links rewritten.
    """
    def replace_link(m):
        # MD_LINK_RE also matches the "[alt](src)" tail of an image; an image
        # must never be turned into a document link.
        if m.start() > 0 and md[m.start() - 1] == "!":
            return m.group(0)
        label = m.group(1).strip()
        link = m.group(2).split("|")[0].strip()
        # Drop "#fragment" so "Page#section" resolves like "Page".
        target = link.split("#", 1)[0]
        base = os.path.splitext(os.path.basename(target))[0].lower()
        entry = url_map.get(base)
        if entry is None:
            return m.group(0)
        title, urlid = entry
        slug = re.sub(r"[^\w\s-]", "", title).strip().lower()
        slug = re.sub(r"[\s_-]+", "-", slug)
        return f"[{label}](/doc/{slug}-{urlid})"

    return MD_LINK_RE.sub(replace_link, md)
|
|
|
def pretty_title(path: Path) -> str:
    """Derive a display title from the file name of *path*.

    Each run of ``-``/``_`` in the stem becomes a single space and the result
    is title-cased with ``str.title``.
    """
    words = re.split(r"[-_]+", path.stem)
    spaced = " ".join(words)
    return spaced.title()
|
|
|
def export_repo(repo_path: Path, client: OutlineClient, collection_name: str):
    """Upload every ``*.md`` file below *repo_path* into one Outline collection.

    Steps:
      1. Read each file, upload its local images and derive a title.
      2. Get or create the collection; a file whose stem is ``home`` (any
         case) supplies the collection description.
      3. Create one document per file, rewriting the links that can already
         be resolved.
      4. Revisit documents whose links could only be resolved once every
         document existed (forward links and self links) and update them.
      5. Write the final, fully linked description to the collection.

    Args:
        repo_path: Directory searched recursively for Markdown files.
        client: API client used for all uploads.
        collection_name: Name of the target collection.
    """
    files = sorted(p for p in repo_path.rglob("*.md") if p.is_file())
    home_path = next((f for f in files if f.stem.lower() == "home"), None)
    home_raw = home_path.read_text(encoding="utf-8") if home_path else ""

    docs = []
    for f in tqdm(files, desc="Prepare"):
        raw = f.read_text(encoding="utf-8")
        docs.append((f, pretty_title(f), rewrite_markdown(raw, f.parent, client)))

    # No urlIds exist yet, so the initial description uses a placeholder id;
    # it is overwritten in the last step.
    temp_map = {f.stem.lower(): (title, "temp") for f, title, _ in docs}
    collection_id = client.get_or_create_collection(
        collection_name,
        fix_internal_links(home_raw, temp_map).strip(),
    )

    # NOTE: files sharing a stem in different directories share one url_map
    # key; the one uploaded last wins.
    url_map = {}
    created = []
    for f, title, content in tqdm(docs, desc="Upload"):
        first_pass = fix_internal_links(content, url_map)
        doc = client.create_document(title, first_pass, collection_id)
        url_map[f.stem.lower()] = (title, doc["urlId"])
        created.append((doc["id"], content, first_pass))

    # At creation time a document could only link to documents uploaded before
    # it. Now that the map is complete, re-resolve from the original content
    # and update only the documents whose text actually changes.
    for doc_id, content, first_pass in tqdm(created, desc="Relink"):
        final = fix_internal_links(content, url_map)
        if final != first_pass:
            # The client has no public wrapper for this method.
            client._post("documents.update", {"id": doc_id, "text": final})

    if home_raw.strip():
        fixed_home = fix_internal_links(home_raw, url_map).strip()
        client.update_collection_description(collection_id, fixed_home)
|
|
|
def main():
    """Command-line entry point: parse arguments and export the repository.

    The collection name defaults to the stem of the repository directory.
    Exits with an error message when the path is not a directory or when no
    collection name can be derived.
    """
    p = argparse.ArgumentParser()
    p.add_argument("repo", type=Path)
    p.add_argument("--api-key", required=True)
    p.add_argument("--collection")
    p.add_argument("--host", default="https://app.getoutline.com")
    args = p.parse_args()

    if not args.repo.is_dir():
        sys.exit("Not a directory: " + str(args.repo))

    # Resolve first: Path(".").stem is "", which would otherwise be sent as
    # the collection name when the script is run from inside the repository.
    collection = args.collection or args.repo.resolve().stem
    if not collection:
        p.error("cannot derive a collection name from the path; pass --collection")

    client = OutlineClient(args.host, args.api_key)
    export_repo(args.repo, client, collection)
|
|
|
# Run the exporter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()