Skip to content

Instantly share code, notes, and snippets.

@brandonhimpfen
Created February 22, 2026 20:07
Show Gist options
  • Select an option

  • Save brandonhimpfen/3a03ccaba08b27376bbb047d603c712e to your computer and use it in GitHub Desktop.

Select an option

Save brandonhimpfen/3a03ccaba08b27376bbb047d603c712e to your computer and use it in GitHub Desktop.
Dependency-free SQLite migrations for Python: migrations table + apply migrations in order with transaction safety.
#!/usr/bin/env python3
"""
Basic SQLite migrations runner (no dependencies).
Pattern:
- Keep a list of migrations (id, name, sql)
- Store applied migrations in a table: schema_migrations
- Apply missing migrations in order inside a transaction
Good for:
- Small CLIs/tools
- Lightweight apps
- “Single-file” utilities that still need schema changes over time
"""
from __future__ import annotations
import sqlite3
from dataclasses import dataclass
from typing import Iterable, Sequence
@dataclass(frozen=True)
class Migration:
    """One schema change: a numeric id, a label, and the SQL that performs it.

    Instances are immutable (``frozen=True``).
    """

    # Identity and ordering key. apply_migrations() requires ids to be unique
    # and ascending, and records this value in schema_migrations.id.
    id: int
    # Label stored alongside the id in schema_migrations.name.
    name: str
    # SQL text run when the migration is applied; may contain several statements.
    sql: str
def connect_db(path: str) -> sqlite3.Connection:
    """Open the SQLite database at *path* and return a configured connection.

    The connection is opened with a 5-second timeout, yields ``sqlite3.Row``
    rows (so columns can be read by name), and has foreign-key enforcement
    switched on.
    """
    database = sqlite3.connect(path, timeout=5.0)
    # Name-based column access is relied on by the helpers that read r["id"].
    database.row_factory = sqlite3.Row
    # Turn on foreign-key enforcement for this connection.
    database.execute("PRAGMA foreign_keys = ON;")
    return database
def ensure_migrations_table(conn: sqlite3.Connection) -> None:
    """Create the ``schema_migrations`` bookkeeping table if it is missing.

    The table holds one row per applied migration: its ``id`` (primary key),
    its ``name``, and an ``applied_at`` timestamp that defaults to
    ``datetime('now')`` at insert time. Calling this on a database that
    already has the table changes nothing.
    """
    create_table_sql = (
        "\n"
        "CREATE TABLE IF NOT EXISTS schema_migrations (\n"
        "id INTEGER PRIMARY KEY,\n"
        "name TEXT NOT NULL,\n"
        "applied_at TEXT NOT NULL DEFAULT (datetime('now'))\n"
        ");\n"
    )
    conn.execute(create_table_sql)
def applied_migration_ids(conn: sqlite3.Connection) -> set[int]:
    """Return the ids of every migration recorded in ``schema_migrations``.

    Rows are read by column name, so *conn* must produce mapping-style rows
    (for example ``sqlite3.Row``).
    """
    cursor = conn.execute("SELECT id FROM schema_migrations ORDER BY id;")
    recorded: set[int] = set()
    for record in cursor.fetchall():
        recorded.add(int(record["id"]))
    return recorded
def _split_sql_statements(script: str) -> list[str]:
    """Split *script* into individual SQL statements, in order.

    ``sqlite3.complete_statement`` decides where a statement ends, so a
    semicolon inside a string literal, a comment, or a trigger body does not
    cause a split. Empty statements (for example a stray ``;;``) are dropped.
    A final statement without a terminating semicolon is still returned.
    """
    *chunks, tail = script.split(";")
    statements: list[str] = []
    buffer = ""
    for chunk in chunks:
        buffer += chunk + ";"
        if sqlite3.complete_statement(buffer):
            if buffer.strip().strip(";").strip():
                statements.append(buffer.strip())
            buffer = ""
    leftover = (buffer + tail).strip()
    if leftover:
        statements.append(leftover)
    return statements


def apply_migrations(conn: sqlite3.Connection, migrations: Sequence[Migration]) -> None:
    """Apply every migration whose id is not yet recorded, atomically.

    The ``schema_migrations`` table is created if needed, the list is
    validated, and all pending migrations are then run inside one explicit
    transaction: either every pending migration (and its bookkeeping row) is
    committed, or none is.

    Rules:
    - Migration ids must be unique and listed in ascending order.
    - Each migration should be safe to run exactly once.
    - Migration SQL must not contain its own BEGIN/COMMIT/ROLLBACK, since it
      runs inside the transaction opened here.

    Args:
        conn: Open connection. It must not already have a transaction in
            progress, and must return rows addressable by column name.
        migrations: All known migrations, sorted by ascending id.

    Raises:
        ValueError: If ids are duplicated or not sorted ascending.
        sqlite3.Error: If starting the transaction or any statement fails.
            Work done inside the transaction is rolled back before the
            error propagates.
    """
    ensure_migrations_table(conn)
    already = applied_migration_ids(conn)

    # Sanity checks
    ids = [m.id for m in migrations]
    if len(ids) != len(set(ids)):
        raise ValueError("Duplicate migration IDs detected.")
    if ids != sorted(ids):
        raise ValueError("Migrations must be sorted by id ascending.")

    pending = [m for m in migrations if m.id not in already]
    if not pending:
        return

    # BEGIN stays outside the try block: if it fails (e.g. the caller already
    # has a transaction open) there is nothing of ours to roll back.
    conn.execute("BEGIN;")
    try:
        for m in pending:
            # executescript() would COMMIT the open transaction before running
            # the script, silently breaking atomicity, so each statement is
            # executed individually inside the transaction instead.
            for statement in _split_sql_statements(m.sql):
                conn.execute(statement)
            conn.execute(
                "INSERT INTO schema_migrations (id, name) VALUES (?, ?);",
                (m.id, m.name),
            )
        conn.execute("COMMIT;")
    except BaseException:
        # Undo partial work on any failure (including interrupts), but never
        # let a redundant rollback mask the original error.
        if conn.in_transaction:
            conn.rollback()
        raise
# ---------------------------------------------------------------------------
# Example migrations
# ---------------------------------------------------------------------------
# Full, ordered migration history. Entries are listed by ascending id, which
# apply_migrations() enforces. Ids already recorded in schema_migrations are
# skipped, so editing the SQL of an applied entry does not re-run it; add a
# new entry with the next id instead.
MIGRATIONS: list[Migration] = [
    # 1: base users table.
    Migration(
        id=1,
        name="create_users",
        sql="""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
""",
    ),
    # 2: explicit index on users.email.
    # NOTE(review): email is already declared UNIQUE in migration 1, which
    # presumably gives it an implicit index - confirm whether this one is
    # redundant before copying the pattern.
    Migration(
        id=2,
        name="add_users_index",
        sql="""
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
""",
    ),
    # 3: posts table; each post references users(id) and is removed with its
    # user (ON DELETE CASCADE).
    Migration(
        id=3,
        name="create_posts",
        sql="""
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
title TEXT NOT NULL,
body TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
""",
    ),
]
# ---------------------------------------------------------------------------
# Example usage
# ---------------------------------------------------------------------------
def main() -> None:
    """Run the example: migrate ``example.db`` and print the resulting state.

    Applies MIGRATIONS, then lists the tables found in ``sqlite_master`` and
    the rows recorded in ``schema_migrations``. The connection is closed even
    if a step fails.
    """
    connection = connect_db("example.db")
    try:
        apply_migrations(connection, MIGRATIONS)

        # Verify: which tables exist now?
        table_query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;"
        table_names = []
        for table_row in connection.execute(table_query).fetchall():
            table_names.append(table_row["name"])
        print("Tables:", table_names)

        # Verify: which migrations have been recorded?
        history_query = "SELECT id, name, applied_at FROM schema_migrations ORDER BY id;"
        history = connection.execute(history_query).fetchall()
        print("Applied migrations:")
        for entry in history:
            print(f" {entry['id']:>3} {entry['name']} ({entry['applied_at']})")
    finally:
        connection.close()
# Run the demo only when this file is executed as a script, so importing the
# module for its helpers has no side effects.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment