@wlinds
Created December 19, 2025 09:03
CLI Util: GBQ / PostgreSQL | CLI tool for transferring data between Google BigQuery and PostgreSQL
"""
GBQ-Utils: CLI tool for transferring data between Google BigQuery and PostgreSQL
REQ:
pip install pandas pandas-gbq psycopg2-binary python-dotenv google-cloud-bigquery sqlalchemy
CONFIG (example files shown below):
1. database.ini - PostgreSQL connection settings (host, port, user, sslmode)
2. .env file - DB_PASSWORD and DB_NAME (defaults to 'default_pool')
3. Google Cloud credentials - Uses default application credentials (gcloud auth)
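Example database.ini (illustrative values, adjust to your environment):
    [postgresql]
    host = localhost
    port = 5432
    user = postgres
    sslmode = require
Example .env (illustrative values):
    DB_PASSWORD=your_password
    DB_NAME=default_pool
Application default credentials can be set up with:
    gcloud auth application-default login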
USAGE:
# Download all tables from BigQuery dataset 'demo' to PostgreSQL schema 'demo'
python3 main.py download --dataset demo
# Download specific tables only
python3 main.py download --dataset demo --tables users orders
# Download to a different PostgreSQL schema
python3 main.py download --dataset demo --schema my_schema
# Upload all tables from PostgreSQL schema to BigQuery
python3 main.py upload --schema demo
# Export a BigQuery table to CSV
python3 main.py export --source bq --dataset demo --table users -o exports/users.csv
# Export a PostgreSQL table to Excel
python3 main.py export --source pg --schema demo --table users -o exports/users.xlsx
# List all tables in a BigQuery dataset
python3 main.py list --source bq --dataset demo
# Append instead of replace existing tables
python3 main.py download --dataset demo --if-exists append
"""
import argparse
import os
import sys
from configparser import ConfigParser
from contextlib import contextmanager
import pandas as pd
import pandas_gbq
from dotenv import load_dotenv
from google.cloud import bigquery
from sqlalchemy import create_engine, text

def get_pg_engine():
    load_dotenv()
    parser = ConfigParser()
    parser.read(os.path.join(os.path.dirname(__file__), "database.ini"))
    if not parser.has_section("postgresql"):
        raise RuntimeError("Section postgresql not found in database.ini")
    p = dict(parser.items("postgresql"))
    p["password"] = os.getenv("DB_PASSWORD")
    p["database"] = os.getenv("DB_NAME", "default_pool")
    conn_str = f"postgresql+psycopg2://{p['user']}:{p['password']}@{p['host']}:{p['port']}/{p['database']}"
    if p.get("sslmode"):
        conn_str += f"?sslmode={p['sslmode']}"
    return create_engine(conn_str)

@contextmanager
def pg_engine():
    engine = get_pg_engine()
    try:
        yield engine
    finally:
        engine.dispose()


@contextmanager
def bq_client():
    client = bigquery.Client()
    try:
        yield client
    finally:
        client.close()

# --- List tables ---
def list_bq_tables(dataset):
    with bq_client() as client:
        # list_tables accepts a dataset ID string directly
        return [t.table_id for t in client.list_tables(dataset)]

def list_pg_tables(schema):
    with pg_engine() as engine:
        with engine.connect() as conn:
            result = conn.execute(text(
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = :schema AND table_type = 'BASE TABLE'"
            ), {"schema": schema})
            return [row[0] for row in result]

# --- Download: BigQuery -> PostgreSQL ---
def download_table(dataset, table_name, pg_schema=None, if_exists="replace"):
    pg_schema = pg_schema or dataset
    print(f"Downloading {dataset}.{table_name} -> {pg_schema}.{table_name}")
    try:
        with bq_client() as client, pg_engine() as engine:
            with engine.connect() as conn:
                conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{pg_schema}"'))
                conn.commit()
            df = client.query(f"SELECT * FROM `{dataset}`.`{table_name}`").to_dataframe()
            if df.empty:
                print(" Empty, skipping")
                return True
            # Normalize object columns and coerce datetimes so to_sql maps column types cleanly
            for col in df.columns:
                if df[col].dtype == "object":
                    df[col] = df[col].astype(str).replace("nan", None)
                elif pd.api.types.is_datetime64_any_dtype(df[col]):
                    df[col] = pd.to_datetime(df[col], errors="coerce")
            df.to_sql(table_name, engine, schema=pg_schema, if_exists=if_exists,
                      index=False, method="multi", chunksize=10000)
            print(f" Wrote {len(df)} rows")
            return True
    except Exception as e:
        print(f" Error: {e}")
        return False

# --- Upload: PostgreSQL -> BigQuery ---
def upload_table(pg_schema, table_name, bq_dataset=None, if_exists="replace"):
    bq_dataset = bq_dataset or pg_schema
    print(f"Uploading {pg_schema}.{table_name} -> {bq_dataset}.{table_name}")
    try:
        with pg_engine() as engine:
            df = pd.read_sql_query(f'SELECT * FROM "{pg_schema}"."{table_name}"', engine)
            if df.empty:
                print(" Empty, skipping")
                return True
            pandas_gbq.to_gbq(df, f"{bq_dataset}.{table_name}", if_exists=if_exists, progress_bar=True)
            print(f" Uploaded {len(df)} rows")
            return True
    except Exception as e:
        print(f" Error: {e}")
        return False

# --- Batch operations ---
def process_tables(tables, operation, list_func, name, **kwargs):
    if tables is None:
        tables = list_func()
    print(f"Found {len(tables)} tables")
    results = {"successful": [], "failed": []}
    for i, table in enumerate(tables, 1):
        print(f"\n[{i}/{len(tables)}] {table}")
        if operation(table_name=table, **kwargs):
            results["successful"].append(table)
        else:
            results["failed"].append(table)
    print(f"\n=== {name} Summary ===")
    print(f"Successful: {len(results['successful'])}, Failed: {len(results['failed'])}")
    return results

# --- Export ---
def export_table(source, dataset_or_schema, table_name, output_path, fmt="csv"):
    print(f"Exporting {dataset_or_schema}.{table_name} to {output_path}")
    try:
        if source == "bq":
            with bq_client() as client:
                df = client.query(f"SELECT * FROM `{dataset_or_schema}`.`{table_name}`").to_dataframe()
        else:
            with pg_engine() as engine:
                df = pd.read_sql_query(f'SELECT * FROM "{dataset_or_schema}"."{table_name}"', engine)
        if df.empty:
            print(" Empty table")
            return False
        print(f" Found {len(df)} rows")
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
        writers = {
            "csv": lambda: df.to_csv(output_path, index=False),
            "excel": lambda: df.to_excel(output_path, index=False),
            "json": lambda: df.to_json(output_path, orient="records", indent=2),
            "parquet": lambda: df.to_parquet(output_path, index=False),
        }
        writers.get(fmt, writers["csv"])()
        print(f" Exported to {output_path}")
        return True
    except Exception as e:
        print(f" Error: {e}")
        return False

# --- CLI ---
def main():
    parser = argparse.ArgumentParser(
        description="GBQ-Utils: Transfer data between BigQuery and PostgreSQL",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 main.py download --dataset demo
  python3 main.py download --dataset demo --tables MyTable MySecondTable
  python3 main.py upload --schema demo
  python3 main.py export --source bq --dataset demo --table MyTable -o exports/mytable.csv
  python3 main.py list --source bq --dataset demo
"""
    )
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # Download
    dl = subparsers.add_parser("download", help="Download from BigQuery to PostgreSQL")
    dl.add_argument("--dataset", "-d", required=True)
    dl.add_argument("--tables", "-t", nargs="+")
    dl.add_argument("--schema", "-s")
    dl.add_argument("--if-exists", choices=["replace", "append", "fail"], default="replace")

    # Upload
    ul = subparsers.add_parser("upload", help="Upload from PostgreSQL to BigQuery")
    ul.add_argument("--schema", "-s", required=True)
    ul.add_argument("--tables", "-t", nargs="+")
    ul.add_argument("--dataset", "-d")
    ul.add_argument("--if-exists", choices=["replace", "append", "fail"], default="replace")

    # Export
    ex = subparsers.add_parser("export", help="Export table/view to file")
    ex.add_argument("--source", choices=["bq", "pg"], required=True)
    ex.add_argument("--dataset", "-d")
    ex.add_argument("--schema", "-s")
    ex.add_argument("--table", "-t", required=True)
    ex.add_argument("--output", "-o", required=True)
    ex.add_argument("--format", "-f", choices=["csv", "excel", "json", "parquet"])

    # List
    ls = subparsers.add_parser("list", help="List tables in dataset/schema")
    ls.add_argument("--source", choices=["bq", "pg"], required=True)
    ls.add_argument("--dataset", "-d")
    ls.add_argument("--schema", "-s")
    args = parser.parse_args()
    if args.command is None:
        parser.print_help()
        sys.exit(1)

    if args.command == "download":
        results = process_tables(
            args.tables, download_table,
            lambda: list_bq_tables(args.dataset), "Download",
            dataset=args.dataset, pg_schema=args.schema, if_exists=args.if_exists
        )
        sys.exit(0 if not results["failed"] else 1)
    elif args.command == "upload":
        results = process_tables(
            args.tables, upload_table,
            lambda: list_pg_tables(args.schema), "Upload",
            pg_schema=args.schema, bq_dataset=args.dataset, if_exists=args.if_exists
        )
        sys.exit(0 if not results["failed"] else 1)
    elif args.command == "export":
        # Infer the output format from the file extension when --format is not given
        fmt = args.format or {".csv": "csv", ".xlsx": "excel", ".xls": "excel",
                              ".json": "json", ".parquet": "parquet"}.get(
            os.path.splitext(args.output)[1].lower(), "csv")
        ds = args.dataset if args.source == "bq" else args.schema
        if not ds:
            print(f"Error: --{'dataset' if args.source == 'bq' else 'schema'} required")
            sys.exit(1)
        success = export_table(args.source, ds, args.table, args.output, fmt)
        sys.exit(0 if success else 1)
    elif args.command == "list":
        ds = args.dataset if args.source == "bq" else args.schema
        if not ds:
            print(f"Error: --{'dataset' if args.source == 'bq' else 'schema'} required")
            sys.exit(1)
        tables = list_bq_tables(ds) if args.source == "bq" else list_pg_tables(ds)
        print(f"Found {len(tables)} tables:")
        for t in sorted(tables):
            print(f" {t}")


if __name__ == "__main__":
    main()
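
# Programmatic use (illustrative sketch, assuming this file is saved as main.py):
#   from main import download_table, process_tables, list_bq_tables
#   process_tables(None, download_table, lambda: list_bq_tables("demo"), "Download",
#                  dataset="demo", pg_schema="demo", if_exists="replace")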