Skip to content

Instantly share code, notes, and snippets.

@zlatent
Created March 8, 2025 19:30
Show Gist options
  • Select an option

  • Save zlatent/3a553bf685b170f35f9144b669922e5b to your computer and use it in GitHub Desktop.

Select an option

Save zlatent/3a553bf685b170f35f9144b669922e5b to your computer and use it in GitHub Desktop.
#!/opt/miniconda3/bin/python
import duckdb
import glob
import sys
import logging
from pathlib import Path
from typing import Optional, Union
import re
# Configure logging
# One-time root-logger setup at import time: INFO-level, timestamped
# messages (to stderr by default via the root StreamHandler).
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger, named after this module per stdlib convention.
logger = logging.getLogger(__name__)
def sanitize_table_name(table_name: str) -> str:
    """
    Sanitize the table name to be a valid SQL identifier.

    Non-alphanumeric characters are replaced with underscores. A name that
    starts with a digit or underscore is prefixed with ``t_`` rather than
    stripped, so distinct inputs such as ``2020_sales`` and ``2021_sales``
    cannot collide on the same sanitized name.

    Args:
        table_name (str): Original table name

    Returns:
        str: Sanitized table name ('table' if the input sanitizes to empty)
    """
    # Replace non-alphanumeric chars with underscore
    sanitized = ''.join(c if c.isalnum() else '_' for c in table_name)
    if not sanitized:
        return 'table'  # Fallback if name would be empty
    # SQL identifiers should start with a letter; prefix instead of
    # stripping leading digits/underscores so no uniqueness is lost.
    if not sanitized[0].isalpha():
        sanitized = 't_' + sanitized
    return sanitized
def analyze_table(con: duckdb.DuckDBPyConnection, table_name: str) -> None:
    """
    Analyze and log table information: schema, a 3-row sample, and the
    total record count. Errors are logged, never raised.

    Args:
        con: DuckDB connection
        table_name: Name of the table to analyze
    """
    try:
        # BUG FIX: .show() prints to stdout and returns None, so
        # interpolating it into a log message logged the string 'None'.
        # DuckDB relations stringify to a formatted table, so log them
        # directly instead.
        schema_info = con.sql(f"DESCRIBE {table_name}")
        logger.info(f"Schema for {table_name}:\n{schema_info}")
        sample_rows = con.sql(f"SELECT * FROM {table_name} LIMIT 3")
        logger.info(f"First few rows of {table_name}:\n{sample_rows}")
        record_count = con.sql(f"SELECT count(*) FROM {table_name}").fetchone()[0]
        logger.info(f"Number of records in {table_name}: {record_count:,}")
        logger.info("-" * 72)
    except Exception as e:
        logger.error(f"Error analyzing table {table_name}: {str(e)}")
def import_csvs_to_duckdb(
    database_name: str = ':memory:',
    directory: Union[str, Path] = '.'
) -> Optional[duckdb.DuckDBPyConnection]:
    """
    Import all CSV files from the specified directory into DuckDB tables.

    Each ``*.csv`` / ``*.csv.gz`` file in ``directory`` becomes one table
    named after the sanitized file stem. Files that fail to import are
    logged and skipped; remaining files are still processed.

    Args:
        database_name: Name of the DuckDB database to use
        directory: Directory containing CSV files

    Returns:
        DuckDB connection object or None if no files were processed
    """
    try:
        con = duckdb.connect(database_name)
        # Convert directory to Path object and resolve it
        dir_path = Path(directory).resolve()
        # BUG FIX: the original called glob.glob('*.csv'), which searches
        # the process's current working directory and ignores `directory`
        # entirely. Glob the requested directory instead (sorted for a
        # deterministic import order).
        csv_files = sorted(dir_path.glob('*.csv')) + sorted(dir_path.glob('*.csv.gz'))
        if not csv_files:
            logger.info(f"No CSV files found in directory: {dir_path}")
            con.close()
            return None
        logger.info(f"Found {len(csv_files)} CSV files to process")
        for csv_file in csv_files:
            # Create table name from the file's basename, minus extension.
            table_name = sanitize_table_name(re.sub(r'\.csv(\.gz)?$', '', csv_file.name))
            try:
                logger.info(f"Importing {csv_file} into table {table_name}")
                # The file path is passed as a bound parameter so unusual
                # file names cannot break the SQL; all_varchar defers type
                # decisions to later queries.
                query = f"""
                CREATE TABLE {table_name} AS
                SELECT * FROM read_csv(?,
                    normalize_names=true,
                    all_varchar=true,
                    sample_size=10000, -- Increase sample size for better inference
                    header=true
                )
                """
                con.execute(query, [str(csv_file)])
                # Analyze the newly created table
                analyze_table(con, table_name)
            except Exception as e:
                logger.error(f"Error importing {csv_file}: {str(e)}")
                continue
        return con
    except Exception as e:
        logger.error(f"Fatal error: {str(e)}")
        return None
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse options, run the import, then make
    # sure the connection is released.
    arg_parser = argparse.ArgumentParser(description='Import CSV files into DuckDB')
    arg_parser.add_argument('--database', default=':memory:',
                            help='DuckDB database name (default: in-memory)')
    arg_parser.add_argument('--directory', default='.',
                            help='Directory containing CSV files (default: current directory)')
    cli_args = arg_parser.parse_args()

    connection = import_csvs_to_duckdb(cli_args.database, cli_args.directory)
    if connection:
        try:
            connection.close()
        except Exception as e:
            logger.error(f"Error closing database connection: {str(e)}")
        finally:
            logger.info("Database connection closed.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment