Execute SQL queries on JSON files
"""
JSON to SQLite Utility
Usage:
1. Pipe JSON/JSONL from stdin:
cat data.json | python3 json2sql.py 'SELECT * FROM t'
2. Provide file as argument:
python3 json2sql.py data.json 'SELECT * FROM t'
Features:
- Supports standard JSON arrays and JSONL (line-delimited JSON).
- Automatically flattens nested objects into dot-notation columns (e.g., parent.child).
- Infers schema (types and columns) from the first 1000 records.
- Efficient stream processing for large files.
- Standard table name is 't'. Columns with dots must be quoted in SQL: `SELECT "a.b" FROM t`.
"""
import sys
import json
import sqlite3
import os
import itertools
def get_input_stream():
    # Usage:
    #   1. python3 json2sql.py <query>            (reads from stdin)
    #   2. python3 json2sql.py <file.json> <query>
    args = sys.argv[1:]
    query = ""
    input_file = sys.stdin
    if len(args) == 1:
        # Assume stdin
        query = args[0]
        if sys.stdin.isatty():
            print("Error: No input provided on stdin and no file specified.", file=sys.stderr)
            sys.exit(1)
    elif len(args) == 2:
        # Assume file + query
        filepath = args[0]
        query = args[1]
        if not os.path.isfile(filepath):
            print(f"Error: File '{filepath}' not found.", file=sys.stderr)
            sys.exit(1)
        input_file = open(filepath, 'r')
    else:
        print("Usage:", file=sys.stderr)
        print("  cat data.json | python3 json2sql.py '<query>'", file=sys.stderr)
        print("  python3 json2sql.py data.json '<query>'", file=sys.stderr)
        sys.exit(1)
    return input_file, query
def load_data_generator(f):
    """
    Yields parsed JSON objects from the input file/stream.
    Handles standard JSON arrays, JSONL, and single JSON objects.
    """
    try:
        first_char = f.read(1)
    except Exception:
        return
    if not first_char:
        return
    # Skip any leading whitespace to find the first significant character.
    while first_char.isspace():
        first_char = f.read(1)
        if not first_char:
            return
    if first_char == '[':
        # A JSON array: read the whole document and parse it in one go.
        content = first_char + f.read()
        try:
            data = json.loads(content)
            if isinstance(data, list):
                for item in data:
                    yield item
            else:
                yield data
        except json.JSONDecodeError as e:
            print(f"Error parsing JSON array: {e}", file=sys.stderr)
            sys.exit(1)
    else:
        # Try JSONL first: parse the first line on its own.
        line1_rest = f.readline()
        line1 = first_char + line1_rest
        try:
            obj = json.loads(line1)
            yield obj
            for line in f:
                line = line.strip()
                if line:
                    try:
                        yield json.loads(line)
                    except json.JSONDecodeError:
                        continue
        except json.JSONDecodeError:
            # Not JSONL: fall back to parsing the entire input as a single
            # document (e.g., a pretty-printed object spanning multiple lines).
            rest = f.read()
            full_content = line1 + rest
            try:
                obj = json.loads(full_content)
                if isinstance(obj, list):
                    for item in obj:
                        yield item
                else:
                    yield obj
            except json.JSONDecodeError as e:
                print(f"Error: Could not parse input as JSONL or a JSON object.\nDetails: {e}", file=sys.stderr)
                sys.exit(1)
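# Both of these inputs yield the same two objects (illustrative):
#
#   JSON array:  [{"a": 1}, {"a": 2}]
#   JSONL:       {"a": 1}
#                {"a": 2}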
def flatten_dict(d, parent_key='', sep='.'):
    """
    Recursively flattens a nested dictionary.
    Lists and other non-dict types are preserved as values.
    """
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
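# A quick sanity check of the flattening (hypothetical input):
#
#   >>> flatten_dict({"user": {"id": 1, "tags": ["a", "b"]}})
#   {'user.id': 1, 'user.tags': ['a', 'b']}
#
# The list is kept intact here; flatten_value() below serializes it to a
# JSON string before insertion.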
def infer_schema_from_sample(sample_data):
    all_keys = set()
    # Flatten sample data first to find all possible columns
    flattened_sample = []
    for row in sample_data:
        if isinstance(row, dict):
            flat_row = flatten_dict(row)
            flattened_sample.append(flat_row)
            all_keys.update(flat_row.keys())
        else:
            flattened_sample.append(row)
    columns = sorted(all_keys)
    col_types = {}
    for col in columns:
        col_types[col] = "TEXT"  # Default
        for row in flattened_sample:
            if isinstance(row, dict) and col in row and row[col] is not None:
                val = row[col]
                if isinstance(val, int):
                    col_types[col] = "INTEGER"
                elif isinstance(val, float):
                    col_types[col] = "REAL"
                else:
                    col_types[col] = "TEXT"
                break
    return columns, col_types
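# Example of the inferred schema (hypothetical sample):
#
#   >>> infer_schema_from_sample([{"id": 1, "name": "x", "meta": {"score": 1.5}}])
#   (['id', 'meta.score', 'name'],
#    {'id': 'INTEGER', 'meta.score': 'REAL', 'name': 'TEXT'})
#
# The first non-null value seen for a column decides its type; anything
# other than an int or float falls back to TEXT.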
def flatten_value(val):
    if isinstance(val, (dict, list)):
        return json.dumps(val)
    return val
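# Examples (hypothetical values):
#
#   flatten_value([1, 2])    -> '[1, 2]'    (JSON-encoded, stored as TEXT)
#   flatten_value("hello")   -> 'hello'     (scalars pass through unchanged)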
def main():
    input_file, query = get_input_stream()
    data_gen = load_data_generator(input_file)
    BATCH_SIZE = 1000
    first_batch = list(itertools.islice(data_gen, BATCH_SIZE))
    if not first_batch:
        print("Warning: No data found.", file=sys.stderr)
        return
    # Infer schema from the flattened first batch
    columns, col_types = infer_schema_from_sample(first_batch)
    conn = sqlite3.connect(':memory:')
    table_name = 't'
    # Create the table, quoting column names to handle dot notation
    col_defs = ", ".join(f'"{col}" {col_types[col]}' for col in columns)
    create_stmt = f'CREATE TABLE {table_name} ({col_defs})'
    conn.execute(create_stmt)
    quoted_cols = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    insert_stmt = f'INSERT INTO {table_name} ({quoted_cols}) VALUES ({placeholders})'

    def insert_batch(batch):
        rows = []
        for row in batch:
            if not isinstance(row, dict):
                continue
            # Flatten the row before inserting
            flat_row = flatten_dict(row)
            rows.append([flatten_value(flat_row.get(col)) for col in columns])
        conn.executemany(insert_stmt, rows)
        conn.commit()

    insert_batch(first_batch)
    # Stream the rest of the input in batches
    current_batch = []
    for row in data_gen:
        current_batch.append(row)
        if len(current_batch) >= BATCH_SIZE:
            insert_batch(current_batch)
            current_batch = []
    if current_batch:
        insert_batch(current_batch)

    cursor = conn.cursor()
    try:
        cursor.execute(query)
    except sqlite3.OperationalError as e:
        print(f"SQL Error: {e}", file=sys.stderr)
        if "no such table" in str(e):
            print(f"Hint: The table is named '{table_name}'.", file=sys.stderr)
        sys.exit(1)
    if cursor.description:
        headers = [description[0] for description in cursor.description]
        print("\t".join(headers))
        for row in cursor.fetchall():
            print("\t".join(str(x) if x is not None else "NULL" for x in row))
    conn.close()
    if input_file is not sys.stdin:
        input_file.close()
if __name__ == "__main__":
main()
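# A JSONL-flavored run, assuming the script is saved as json2sql.py
# (input and output here are illustrative):
#
#   $ printf '{"a": 1}\n{"a": 2}\n' | python3 json2sql.py 'SELECT SUM(a) AS total FROM t'
#   total
#   3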