Created
March 8, 2025 19:30
-
-
Save zlatent/3a553bf685b170f35f9144b669922e5b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/opt/miniconda3/bin/python | |
| import duckdb | |
| import glob | |
| import sys | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, Union | |
| import re | |
# Module-wide logging: INFO and above, each record prefixed with a
# timestamp and its level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Logger shared by every function in this script.
logger = logging.getLogger(__name__)
def sanitize_table_name(table_name: str) -> str:
    """
    Turn an arbitrary string into a name usable as a SQL identifier.

    Every character for which ``str.isalnum()`` is false is replaced by an
    underscore; leading ASCII digits and underscores are then removed.

    Args:
        table_name (str): Raw name, e.g. a file name without its extension

    Returns:
        str: The cleaned name, or 'table' if nothing is left after cleaning
    """
    cleaned_chars = []
    for ch in table_name:
        if ch.isalnum():
            cleaned_chars.append(ch)
        else:
            cleaned_chars.append('_')

    # Drop leading digits/underscores so the result does not begin with them
    candidate = ''.join(cleaned_chars).lstrip('0123456789_')

    if not candidate:
        # Nothing usable survived the cleaning; use a generic name instead
        return 'table'
    return candidate
def analyze_table(con: duckdb.DuckDBPyConnection, table_name: str) -> None:
    """
    Log the schema, a three-row sample and the row count of a table.

    Any failure is logged as an error instead of being raised, so a problem
    with one table does not abort the caller.

    Args:
        con: DuckDB connection
        table_name: Name of the table to analyze
    """
    # Quote the identifier (doubling embedded quotes) so that names which
    # collide with SQL keywords, e.g. "order", can still be queried.
    quoted_name = '"' + table_name.replace('"', '""') + '"'

    try:
        # Relation.show() prints to stdout and returns None, which used to put
        # the literal text "None" in the log. Render the relation to text so
        # the table itself ends up in the log record.
        schema_info = con.sql(f"DESCRIBE {quoted_name}")
        logger.info(f"Schema for {table_name}:\n{str(schema_info)}")

        sample_rows = con.sql(f"SELECT * FROM {quoted_name} LIMIT 3")
        logger.info(f"First few rows of {table_name}:\n{str(sample_rows)}")

        record_count = con.sql(f"SELECT count(*) FROM {quoted_name}").fetchone()[0]
        logger.info(f"Number of records in {table_name}: {record_count:,}")

        logger.info("-" * 72)
    except Exception as e:
        # Best-effort diagnostics: report and carry on
        logger.error(f"Error analyzing table {table_name}: {str(e)}")
def import_csvs_to_duckdb(
    database_name: str = ':memory:',
    directory: Union[str, Path] = '.'
) -> Optional[duckdb.DuckDBPyConnection]:
    """
    Import all CSV files from the specified directory into DuckDB tables.

    Each ``*.csv`` / ``*.csv.gz`` file directly inside ``directory`` becomes
    one table named after the file (extension removed, then sanitized). All
    columns are loaded as VARCHAR. A file that fails to import is logged and
    skipped; the remaining files are still processed.

    Args:
        database_name: Name of the DuckDB database to use
        directory: Directory containing CSV files

    Returns:
        DuckDB connection object (the caller must close it), or None if no
        CSV files were found or a fatal error occurred
    """
    con = None
    try:
        con = duckdb.connect(database_name)

        # Convert directory to Path object and resolve it
        dir_path = Path(directory).resolve()

        # Search the requested directory rather than the process's current
        # working directory; sorted so the import order is reproducible.
        csv_files = sorted(
            path
            for pattern in ('*.csv', '*.csv.gz')
            for path in dir_path.glob(pattern)
            if path.is_file()
        )

        if not csv_files:
            logger.info(f"No CSV files found in directory: {dir_path}")
            con.close()
            con = None  # already closed; nothing left for the error path to release
            return None

        logger.info(f"Found {len(csv_files)} CSV files to process")

        for csv_path in csv_files:
            csv_file = csv_path.name
            # Create table name from file name (without .csv / .csv.gz)
            table_name = sanitize_table_name(re.sub(r'\.csv(\.gz)?$', '', csv_file))

            try:
                logger.info(f"Importing {csv_file} into table {table_name}")

                # The table name is quoted so that names equal to SQL keywords
                # are accepted; the sanitized name cannot contain a quote.
                # The file path is passed as a bound parameter, not spliced in.
                query = f"""
                    CREATE TABLE "{table_name}" AS
                    SELECT * FROM read_csv(?,
                        normalize_names=true,
                        all_varchar=true,
                        sample_size=10000,
                        header=true
                    )
                """
                con.execute(query, [str(csv_path)])

                # Analyze the newly created table
                analyze_table(con, table_name)
            except Exception as e:
                # One bad file must not stop the remaining imports
                logger.error(f"Error importing {csv_file}: {str(e)}")
                continue

        return con
    except Exception as e:
        logger.error(f"Fatal error: {str(e)}")
        # Do not leak the connection (and its file lock) on the error path
        if con is not None:
            try:
                con.close()
            except Exception as close_error:
                logger.error(f"Error closing database connection: {str(close_error)}")
        return None
if __name__ == "__main__":
    # Command-line entry point: import every CSV in --directory into the
    # DuckDB database given by --database, then close the connection.
    import argparse

    parser = argparse.ArgumentParser(description='Import CSV files into DuckDB')
    parser.add_argument('--database', default=':memory:',
                        help='DuckDB database name (default: in-memory)')
    parser.add_argument('--directory', default='.',
                        help='Directory containing CSV files (default: current directory)')
    args = parser.parse_args()

    db_connection = import_csvs_to_duckdb(args.database, args.directory)

    # None means "nothing imported"; compare explicitly instead of relying on
    # the truthiness of the connection object.
    if db_connection is not None:
        try:
            db_connection.close()
        except Exception as e:
            logger.error(f"Error closing database connection: {str(e)}")
        else:
            # Only report success when close() actually succeeded
            logger.info("Database connection closed.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment