Execute SQL queries on JSON files
| """ | |
| JSON to SQLite Utility | |
| Usage: | |
| 1. Pipe JSON/JSONL from stdin: | |
| cat data.json | python3 json2sql.py 'SELECT * FROM t' | |
| 2. Provide file as argument: | |
| python3 json2sql.py data.json 'SELECT * FROM t' | |
| Features: | |
| - Supports standard JSON arrays and JSONL (line-delimited JSON). | |
| - Automatically flattens nested objects into dot-notation columns (e.g., parent.child). | |
| - Infers schema (types and columns) from the first 1000 records. | |
| - Efficient stream processing for large files. | |
| - Standard table name is 't'. Columns with dots must be quoted in SQL: `SELECT "a.b" FROM t`. | |
| """ | |
import sys
import json
import sqlite3
import os
import itertools
def get_input_stream():
    # Usage:
    #   1. python3 json2sql.py <query>               (reads from stdin)
    #   2. python3 json2sql.py <file.json> <query>
    args = sys.argv[1:]
    query = ""
    input_file = sys.stdin

    if len(args) == 1:
        # Query only: read data from stdin.
        query = args[0]
        if sys.stdin.isatty():
            print("Error: No input provided on stdin and no file specified.", file=sys.stderr)
            sys.exit(1)
    elif len(args) == 2:
        # File plus query.
        filepath = args[0]
        query = args[1]
        if not os.path.isfile(filepath):
            print(f"Error: File '{filepath}' not found.", file=sys.stderr)
            sys.exit(1)
        input_file = open(filepath, 'r')
    else:
        print("Usage:", file=sys.stderr)
        print("  cat data.json | python3 json2sql.py '<query>'", file=sys.stderr)
        print("  python3 json2sql.py data.json '<query>'", file=sys.stderr)
        sys.exit(1)

    return input_file, query
def load_data_generator(f):
    """
    Yields parsed JSON objects from the input file/stream.
    Handles standard JSON arrays, JSONL, and single JSON objects.
    """
    try:
        first_char = f.read(1)
    except Exception:
        return
    if not first_char:
        return

    # Skip any leading whitespace.
    while first_char.isspace():
        first_char = f.read(1)
    if not first_char:
        return

    if first_char == '[':
        # Standard JSON array: read the rest and parse it in one go.
        content = first_char + f.read()
        try:
            data = json.loads(content)
            if isinstance(data, list):
                for item in data:
                    yield item
            else:
                yield data
        except json.JSONDecodeError as e:
            print(f"Error parsing JSON array: {e}", file=sys.stderr)
            sys.exit(1)
    else:
        # Try JSONL first: if the first line parses on its own, stream
        # the remaining lines one object at a time.
        line1_rest = f.readline()
        line1 = first_char + line1_rest
        try:
            obj = json.loads(line1)
            yield obj
            for line in f:
                line = line.strip()
                if line:
                    try:
                        yield json.loads(line)
                    except json.JSONDecodeError:
                        continue
        except json.JSONDecodeError:
            # Fall back to treating the input as one multi-line JSON document.
            rest = f.read()
            full_content = line1 + rest
            try:
                obj = json.loads(full_content)
                if isinstance(obj, list):
                    for item in obj:
                        yield item
                else:
                    yield obj
            except json.JSONDecodeError as e:
                print(f"Error: Could not parse input as JSONL or a JSON object.\nDetails: {e}", file=sys.stderr)
                sys.exit(1)
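# Example: the generator above accepts any of these input shapes:
#   [{"a": 1}, {"a": 2}]            (JSON array)
#   {"a": 1}\n{"a": 2}              (JSONL, one object per line)
#   {"a": 1,\n "b": 2}              (a single pretty-printed object)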
def flatten_dict(d, parent_key='', sep='.'):
    """
    Recursively flattens a nested dictionary.
    Lists and other non-dict values are preserved as-is.
    """
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
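# Example: flatten_dict({"user": {"name": "ada", "id": 7}, "active": True})
# -> {"user.name": "ada", "user.id": 7, "active": True}
# List values such as {"tags": ["a", "b"]} pass through unchanged here and
# are JSON-encoded later by flatten_value().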
def infer_schema_from_sample(sample_data):
    all_keys = set()
    # Flatten the sample first to discover every possible column.
    flattened_sample = []
    for row in sample_data:
        if isinstance(row, dict):
            flat_row = flatten_dict(row)
            flattened_sample.append(flat_row)
            all_keys.update(flat_row.keys())
        else:
            flattened_sample.append(row)

    columns = sorted(all_keys)
    col_types = {}
    for col in columns:
        col_types[col] = "TEXT"  # Default when no value is ever seen.
        # Use the first non-null value to pick a SQLite type.
        for row in flattened_sample:
            if isinstance(row, dict) and col in row and row[col] is not None:
                val = row[col]
                if isinstance(val, int):
                    # Note: bool is a subclass of int, so booleans map to INTEGER.
                    col_types[col] = "INTEGER"
                elif isinstance(val, float):
                    col_types[col] = "REAL"
                else:
                    col_types[col] = "TEXT"
                break
    return columns, col_types
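# Example: for the sample rows {"id": 1, "score": 9.5} and {"id": 2, "name": "x"},
# this returns (['id', 'name', 'score'],
# {'id': 'INTEGER', 'name': 'TEXT', 'score': 'REAL'}).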
def flatten_value(val):
    # Serialize nested containers so they fit in a single SQLite column.
    if isinstance(val, (dict, list)):
        return json.dumps(val)
    return val
def main():
    input_file, query = get_input_stream()
    data_gen = load_data_generator(input_file)

    BATCH_SIZE = 1000
    first_batch = list(itertools.islice(data_gen, BATCH_SIZE))
    if not first_batch:
        print("Warning: No data found.", file=sys.stderr)
        return

    # Infer the schema from the flattened first batch.
    columns, col_types = infer_schema_from_sample(first_batch)

    conn = sqlite3.connect(':memory:')
    table_name = 't'

    # Create the table, quoting column names to allow dot notation.
    col_defs = ", ".join(f'"{col}" {col_types[col]}' for col in columns)
    conn.execute(f'CREATE TABLE {table_name} ({col_defs})')

    quoted_cols = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    insert_stmt = f'INSERT INTO {table_name} ({quoted_cols}) VALUES ({placeholders})'

    def insert_batch(batch):
        rows = []
        for row in batch:
            if not isinstance(row, dict):
                continue
            # Flatten the row before inserting; keys outside the inferred
            # schema are silently dropped.
            flat_row = flatten_dict(row)
            rows.append([flatten_value(flat_row.get(col)) for col in columns])
        conn.executemany(insert_stmt, rows)
        conn.commit()

    insert_batch(first_batch)

    # Stream the remaining records in batches.
    current_batch = []
    for row in data_gen:
        current_batch.append(row)
        if len(current_batch) >= BATCH_SIZE:
            insert_batch(current_batch)
            current_batch = []
    if current_batch:
        insert_batch(current_batch)

    cursor = conn.cursor()
    try:
        cursor.execute(query)
    except sqlite3.OperationalError as e:
        print(f"SQL Error: {e}", file=sys.stderr)
        if "no such table" in str(e):
            print(f"Hint: The table is named '{table_name}'.", file=sys.stderr)
        sys.exit(1)

    if cursor.description:
        headers = [description[0] for description in cursor.description]
        print("\t".join(headers))
    for row in cursor.fetchall():
        print("\t".join(str(x) if x is not None else "NULL" for x in row))

    conn.close()
    if input_file is not sys.stdin:
        input_file.close()


if __name__ == "__main__":
    main()
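As a quick sanity check, the sketch below (file and dataset names are hypothetical) writes a small nested JSON file and runs the script against it, assuming it is saved as json2sql.py:

import json
import subprocess

# Build a tiny nested dataset (hypothetical file name).
with open("users.json", "w") as fh:
    json.dump([{"name": "ada", "stats": {"age": 36}},
               {"name": "linus", "stats": {"age": 25}}], fh)

# Nested keys become dot-notation columns, so they must be quoted in SQL.
result = subprocess.run(
    ["python3", "json2sql.py", "users.json",
     'SELECT name, "stats.age" FROM t ORDER BY "stats.age"'],
    capture_output=True, text=True, check=True,
)
print(result.stdout, end="")
# Expected tab-separated output:
# name    stats.age
# linus   25
# ada     36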