CLI Util: GBQ / PostgreSQL | CLI tool for transferring data between Google BigQuery and PostgreSQL
| """ | |
| GBQ-Utils: CLI tool for transferring data between Google BigQuery and PostgreSQL | |
| REQ: | |
| pip install pandas pandas-gbq psycopg2-binary python-dotenv google-cloud-bigquery sqlalchemy | |
| CONFIG: | |
| 1. database.ini - PostgreSQL connection settings (host, port, user, sslmode) | |
| 2. .env file - DB_PASSWORD and DB_NAME (defaults to 'default_pool') | |
| 3. Google Cloud credentials - Uses default application credentials (gcloud auth) | |
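
    For illustration only, a minimal database.ini could look like the sketch
    below (host/user/sslmode values are placeholders, not defaults shipped
    with this tool):

        [postgresql]
        host=localhost
        port=5432
        user=postgres
        sslmode=require

    with a companion .env file alongside it:

        DB_PASSWORD=your-password-here
        DB_NAME=default_pool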
USAGE:
    # Download all tables from BigQuery dataset 'demo' to PostgreSQL schema 'demo'
    python3 main.py download --dataset demo

    # Download specific tables only
    python3 main.py download --dataset demo --tables users orders

    # Download to a different PostgreSQL schema
    python3 main.py download --dataset demo --schema my_schema

    # Upload all tables from PostgreSQL schema to BigQuery
    python3 main.py upload --schema demo

    # Export a BigQuery table to CSV
    python3 main.py export --source bq --dataset demo --table users -o exports/users.csv

    # Export a PostgreSQL table to Excel
    python3 main.py export --source pg --schema demo --table users -o exports/users.xlsx

    # List all tables in a BigQuery dataset
    python3 main.py list --source bq --dataset demo

    # Append instead of replace existing tables
    python3 main.py download --dataset demo --if-exists append
"""
import argparse, os, sys
from configparser import ConfigParser
from contextlib import contextmanager
from urllib.parse import quote_plus

import pandas as pd
import pandas_gbq
from dotenv import load_dotenv
from google.cloud import bigquery
from sqlalchemy import create_engine, text


def get_pg_engine():
    load_dotenv()
    parser = ConfigParser()
    parser.read(os.path.join(os.path.dirname(__file__), "database.ini"))
    if not parser.has_section("postgresql"):
        raise RuntimeError("Section postgresql not found in database.ini")
    p = dict(parser.items("postgresql"))
    p["password"] = os.getenv("DB_PASSWORD")
    p["database"] = os.getenv("DB_NAME", "default_pool")
    # URL-encode the password so special characters (@, :, /) can't break the DSN
    conn_str = (f"postgresql+psycopg2://{p['user']}:{quote_plus(p['password'] or '')}"
                f"@{p['host']}:{p['port']}/{p['database']}")
    if p.get("sslmode"):
        conn_str += f"?sslmode={p['sslmode']}"
    return create_engine(conn_str)


@contextmanager
def pg_engine():
    engine = get_pg_engine()
    try:
        yield engine
    finally:
        engine.dispose()


@contextmanager
def bq_client():
    client = bigquery.Client()
    try:
        yield client
    finally:
        client.close()


# --- List tables ---
def list_bq_tables(dataset):
    with bq_client() as client:
        # list_tables() accepts a dataset ID string directly; Client.dataset()
        # is deprecated in newer google-cloud-bigquery releases
        return [t.table_id for t in client.list_tables(dataset)]


def list_pg_tables(schema):
    with pg_engine() as engine:
        with engine.connect() as conn:
            result = conn.execute(text(
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = :schema AND table_type = 'BASE TABLE'"
            ), {"schema": schema})
            return [row[0] for row in result]


# --- Download: BigQuery -> PostgreSQL ---
def download_table(dataset, table_name, pg_schema=None, if_exists="replace"):
    pg_schema = pg_schema or dataset
    print(f"Downloading {dataset}.{table_name} -> {pg_schema}.{table_name}")
    try:
        with bq_client() as client, pg_engine() as engine:
            with engine.connect() as conn:
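                # Note: schema/table identifiers are interpolated into SQL via
                # f-strings (not bind parameters), so names must be trusted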
                conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{pg_schema}"'))
                conn.commit()
            df = client.query(f"SELECT * FROM `{dataset}`.`{table_name}`").to_dataframe()
            if df.empty:
                print(" Empty, skipping")
                return True
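            # Normalize dtypes before writing: object columns may mix strings
            # with NaN/None, and BigQuery timestamps arrive as pandas datetimes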
            for col in df.columns:
                if df[col].dtype == "object":
                    # Dict form avoids the deprecated pad-fill behavior of
                    # replace("nan", None); also map "None", which astype(str)
                    # produces from Python None values
                    df[col] = df[col].astype(str).replace({"nan": None, "None": None})
                elif pd.api.types.is_datetime64_any_dtype(df[col]):
                    df[col] = pd.to_datetime(df[col], errors="coerce")
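            # method="multi" emits multi-row INSERT statements; chunksize caps
            # the number of rows sent per statement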
            df.to_sql(table_name, engine, schema=pg_schema, if_exists=if_exists,
                      index=False, method="multi", chunksize=10000)
            print(f" Wrote {len(df)} rows")
            return True
    except Exception as e:
        print(f" Error: {e}")
        return False


# --- Upload: PostgreSQL -> BigQuery ---
def upload_table(pg_schema, table_name, bq_dataset=None, if_exists="replace"):
    bq_dataset = bq_dataset or pg_schema
    print(f"Uploading {pg_schema}.{table_name} -> {bq_dataset}.{table_name}")
    try:
        with pg_engine() as engine:
            df = pd.read_sql_query(f'SELECT * FROM "{pg_schema}"."{table_name}"', engine)
        if df.empty:
            print(" Empty, skipping")
            return True
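        # No explicit project_id: pandas_gbq falls back to the default project
        # from the ambient Google credentials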
        pandas_gbq.to_gbq(df, f"{bq_dataset}.{table_name}", if_exists=if_exists, progress_bar=True)
        print(f" Uploaded {len(df)} rows")
        return True
    except Exception as e:
        print(f" Error: {e}")
        return False


# --- Batch operations ---
def process_tables(tables, operation, list_func, name, **kwargs):
    if tables is None:
        tables = list_func()
        print(f"Found {len(tables)} tables")
    results = {"successful": [], "failed": []}
    for i, table in enumerate(tables, 1):
        print(f"\n[{i}/{len(tables)}] {table}")
        if operation(table_name=table, **kwargs):
            results["successful"].append(table)
        else:
            results["failed"].append(table)
    print(f"\n=== {name} Summary ===")
    print(f"Successful: {len(results['successful'])}, Failed: {len(results['failed'])}")
    return results


# --- Export ---
def export_table(source, dataset_or_schema, table_name, output_path, fmt="csv"):
    print(f"Exporting {dataset_or_schema}.{table_name} to {output_path}")
    try:
        if source == "bq":
            with bq_client() as client:
                df = client.query(f"SELECT * FROM `{dataset_or_schema}`.`{table_name}`").to_dataframe()
        else:
            with pg_engine() as engine:
                df = pd.read_sql_query(f'SELECT * FROM "{dataset_or_schema}"."{table_name}"', engine)
        if df.empty:
            print(" Empty table")
            return False
        print(f" Found {len(df)} rows")
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
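        # Map each format to a pandas writer; unknown formats fall back to CSV.
        # Note that to_excel needs an Excel engine (openpyxl or xlsxwriter)
        # installed, which the pip install line in the docstring does not include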
        writers = {
            "csv": lambda: df.to_csv(output_path, index=False),
            "excel": lambda: df.to_excel(output_path, index=False),
            "json": lambda: df.to_json(output_path, orient="records", indent=2),
            "parquet": lambda: df.to_parquet(output_path, index=False),
        }
        writers.get(fmt, writers["csv"])()
        print(f" Exported to {output_path}")
        return True
    except Exception as e:
        print(f" Error: {e}")
        return False


# --- CLI ---
def main():
    parser = argparse.ArgumentParser(
        description="GBQ-Utils: Transfer data between BigQuery and PostgreSQL",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 main.py download --dataset demo
  python3 main.py download --dataset demo --tables MyTable MySecondTable
  python3 main.py upload --schema demo
  python3 main.py export --source bq --dataset demo --table MyTable -o exports/mytable.csv
  python3 main.py list --source bq --dataset demo
"""
    )
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # Download
    dl = subparsers.add_parser("download", help="Download from BigQuery to PostgreSQL")
    dl.add_argument("--dataset", "-d", required=True)
    dl.add_argument("--tables", "-t", nargs="+")
    dl.add_argument("--schema", "-s")
    dl.add_argument("--if-exists", choices=["replace", "append", "fail"], default="replace")

    # Upload
    ul = subparsers.add_parser("upload", help="Upload from PostgreSQL to BigQuery")
    ul.add_argument("--schema", "-s", required=True)
    ul.add_argument("--tables", "-t", nargs="+")
    ul.add_argument("--dataset", "-d")
    ul.add_argument("--if-exists", choices=["replace", "append", "fail"], default="replace")

    # Export
    ex = subparsers.add_parser("export", help="Export table/view to file")
    ex.add_argument("--source", choices=["bq", "pg"], required=True)
    ex.add_argument("--dataset", "-d")
    ex.add_argument("--schema", "-s")
    ex.add_argument("--table", "-t", required=True)
    ex.add_argument("--output", "-o", required=True)
    ex.add_argument("--format", "-f", choices=["csv", "excel", "json", "parquet"])

    # List
    ls = subparsers.add_parser("list", help="List tables in dataset/schema")
    ls.add_argument("--source", choices=["bq", "pg"], required=True)
    ls.add_argument("--dataset", "-d")
    ls.add_argument("--schema", "-s")

    args = parser.parse_args()
    if args.command is None:
        parser.print_help()
        sys.exit(1)
| if args.command == "download": | |
| results = process_tables( | |
| args.tables, download_table, | |
| lambda: list_bq_tables(args.dataset), "Download", | |
| dataset=args.dataset, pg_schema=args.schema, if_exists=args.if_exists | |
| ) | |
| sys.exit(0 if not results["failed"] else 1) | |
| elif args.command == "upload": | |
| results = process_tables( | |
| args.tables, upload_table, | |
| lambda: list_pg_tables(args.schema), "Upload", | |
| pg_schema=args.schema, bq_dataset=args.dataset, if_exists=args.if_exists | |
| ) | |
| sys.exit(0 if not results["failed"] else 1) | |
| elif args.command == "export": | |
        fmt = args.format or {".csv": "csv", ".xlsx": "excel", ".xls": "excel",
                              ".json": "json", ".parquet": "parquet"}.get(
                                  os.path.splitext(args.output)[1].lower(), "csv")
        ds = args.dataset if args.source == "bq" else args.schema
        if not ds:
            print(f"Error: --{'dataset' if args.source == 'bq' else 'schema'} required")
            sys.exit(1)
        success = export_table(args.source, ds, args.table, args.output, fmt)
        sys.exit(0 if success else 1)
| elif args.command == "list": | |
| ds = args.dataset if args.source == "bq" else args.schema | |
| if not ds: | |
| print(f"Error: --{'dataset' if args.source == 'bq' else 'schema'} required") | |
| sys.exit(1) | |
| tables = list_bq_tables(ds) if args.source == "bq" else list_pg_tables(ds) | |
| print(f"Found {len(tables)} tables:") | |
| for t in sorted(tables): | |
| print(f" {t}") | |
| if __name__ == "__main__": | |
| main() |