Created
February 3, 2026 22:14
-
-
Save adamski/c7fca78beb55dc8414bac00ca14d4653 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Shopify Configuration (required) | |
| # Get these from Dev Dashboard > Settings | |
| SHOPIFY_STORE=your-store-name | |
| SHOPIFY_CLIENT_ID=your-client-id | |
| SHOPIFY_CLIENT_SECRET=your-client-secret | |
| # PayPal Configuration (optional - for PayPal fee tracking) | |
| PAYPAL_CLIENT_ID=your-client-id | |
| PAYPAL_SECRET=your-secret | |
| PAYPAL_BASE_URL=https://api-m.paypal.com | |
| # Use https://api-m.sandbox.paypal.com for testing |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Shopify Sales Data Extractor with PayPal Fee Integration | |
| Requirements: | |
| pip install requests python-dotenv | |
| Setup: | |
| 1. Create a Shopify Custom App in your store admin | |
| - Settings > Apps and sales channels > Develop apps | |
| - Create app, configure scopes: read_orders, read_shopify_payments_payouts | |
| - Install the app, then copy its Client ID and Client secret into .env (the script exchanges them for an access token) | |
| 2. Create a PayPal REST API app at developer.paypal.com | |
| - Get Client ID and Secret | |
| 3. Create .env file with credentials (see .env.example) | |
| Usage: | |
| python shopify_reports.py weekly --product 123456789 --week 2025-01-06 | |
| python shopify_reports.py weekly --product 123456789 --week 2025-01-06 --csv sales.csv | |
| python shopify_reports.py monthly --month 2025-01 | |
| python shopify_reports.py products --month 2025-01 | |
| python shopify_reports.py products --month 2025-01 --csv products.csv | |
| """ | |
| import os | |
| import json | |
| import csv | |
| import calendar | |
| import argparse | |
| import requests | |
| from datetime import datetime, timedelta | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| from enum import Enum | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| except ImportError: | |
| pass # dotenv is optional | |
| # ============================================================================= | |
| # Configuration | |
| # ============================================================================= | |
# Shopify credentials; all three are read from the environment (or .env).
SHOPIFY_STORE = os.environ.get("SHOPIFY_STORE")  # store subdomain, without ".myshopify.com"
SHOPIFY_CLIENT_ID = os.environ.get("SHOPIFY_CLIENT_ID")
SHOPIFY_CLIENT_SECRET = os.environ.get("SHOPIFY_CLIENT_SECRET")
SHOPIFY_API_VERSION = "2025-01"

# PayPal credentials are optional; the base URL falls back to the live API
# (point it at https://api-m.sandbox.paypal.com for testing).
PAYPAL_CLIENT_ID = os.environ.get("PAYPAL_CLIENT_ID")
PAYPAL_SECRET = os.environ.get("PAYPAL_SECRET")
PAYPAL_BASE_URL = os.environ.get("PAYPAL_BASE_URL", "https://api-m.paypal.com")

# In-memory Shopify token cache; holds "token" and "expires_at" once populated.
_shopify_token_cache: dict = {}
def get_shopify_access_token() -> str:
    """
    Return a Shopify access token obtained via the client credentials grant.

    A token held in the module-level ``_shopify_token_cache`` is reused while
    its recorded expiry lies in the future; otherwise a new one is requested
    from the store's ``/admin/oauth/access_token`` endpoint and cached.

    Raises:
        requests.HTTPError: if the token endpoint answers with an error status.
        requests.Timeout: if the token endpoint does not answer in time.
    """
    now = datetime.now().timestamp()

    # Serve from the cache while the stored expiry is still ahead of us.
    cached_token = _shopify_token_cache.get("token")
    if cached_token and _shopify_token_cache.get("expires_at", 0) > now:
        return cached_token

    resp = requests.post(
        f"https://{SHOPIFY_STORE}.myshopify.com/admin/oauth/access_token",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data={
            "grant_type": "client_credentials",
            "client_id": SHOPIFY_CLIENT_ID,
            "client_secret": SHOPIFY_CLIENT_SECRET,
        },
        # Without a timeout, requests waits indefinitely on a stalled connection.
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()

    # Update the cache in place (no rebinding, so no ``global`` is needed).
    # 60 seconds are shaved off the lifetime as a safety margin.
    _shopify_token_cache.update(
        token=data["access_token"],
        expires_at=now + data.get("expires_in", 86399) - 60,
    )
    return _shopify_token_cache["token"]
| # ============================================================================= | |
| # Data Models | |
| # ============================================================================= | |
class ReportType(Enum):
    """String identifiers for report kinds.

    Not referenced by any other code in the visible part of this file.
    """

    WEEKLY_SALES = "weekly_sales"
    MONTHLY_REVENUE = "monthly_revenue"
@dataclass
class SaleRecord:
    """One order row of the weekly sales report."""

    # Order identity and date (the weekly generator stores the first 10
    # characters of the order's createdAt value in ``created_at``).
    order_id: str
    order_name: str
    created_at: str

    # Monetary breakdown of the order.
    gross_amount: float
    subtotal: float
    tax: float
    shipping: float
    discounts: float

    # Payment details; the optional values are None when not known.
    payment_gateway: str
    gateway_transaction_id: Optional[str]
    shopify_fee: Optional[float]
    paypal_fee: Optional[float]
    net_amount: Optional[float]
@dataclass
class WeeklySalesReport:
    """Order-level detail for a given week."""

    week_start: str
    week_end: str
    product_id: str
    orders: list[SaleRecord] = field(default_factory=list)

    @property
    def total_orders(self) -> int:
        """Number of orders in the report."""
        return len(self.orders)

    @property
    def gross_revenue(self) -> float:
        """Sum of every order's gross amount."""
        return sum(o.gross_amount for o in self.orders)

    @property
    def total_tax(self) -> float:
        """Sum of every order's tax."""
        return sum(o.tax for o in self.orders)

    @property
    def total_shipping(self) -> float:
        """Sum of every order's shipping charge."""
        return sum(o.shipping for o in self.orders)

    @property
    def total_discounts(self) -> float:
        """Sum of every order's discounts."""
        return sum(o.discounts for o in self.orders)

    @property
    def paypal_fees(self) -> float:
        """Sum of the known PayPal fees; orders without one count as 0."""
        return sum(o.paypal_fee or 0 for o in self.orders)

    def to_csv(self, filename: str):
        """Export order-level detail to CSV.

        The PayPal fee cell is left blank only when the fee is unknown (None);
        a fee of exactly 0.0 is written as a number.
        """
        # Explicit UTF-8 so the output does not depend on the platform locale.
        with open(filename, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow([
                "Order", "Date", "Gross", "Subtotal", "Tax",
                "Shipping", "Discounts", "Gateway", "PayPal Fee"
            ])
            for o in self.orders:
                # `o.paypal_fee or ""` would also blank out a genuine 0.0 fee.
                fee_cell = "" if o.paypal_fee is None else o.paypal_fee
                writer.writerow([
                    o.order_name, o.created_at, o.gross_amount, o.subtotal,
                    o.tax, o.shipping, o.discounts, o.payment_gateway, fee_cell,
                ])
@dataclass
class MonthlyRevenueReport:
    """Payout-aligned report with accurate Shopify fees."""

    month: str  # "2025-01"
    payouts: list[dict] = field(default_factory=list)
    paypal_transactions: list[dict] = field(default_factory=list)

    @staticmethod
    def _amount(container: Optional[dict], key: str) -> float:
        """Read ``container[key]["amount"]`` as a float, treating any missing
        or null level as 0.

        ``dict.get(key, {})`` only covers an absent key; a key that is present
        with a JSON ``null`` value yields None, so every level is guarded with
        ``or`` instead.
        """
        money = (container or {}).get(key) or {}
        return float(money.get("amount") or 0)

    @property
    def shopify_gross(self) -> float:
        """Sum of the payouts' ``gross`` amounts."""
        return sum(self._amount(p, "gross") for p in self.payouts)

    @property
    def shopify_fees(self) -> float:
        """Sum of the absolute charge and adjustment fees over all payouts."""
        total = 0
        for p in self.payouts:
            summary = p.get("summary")
            total += abs(self._amount(summary, "chargesFee"))
            total += abs(self._amount(summary, "adjustmentsFee"))
        return total

    @property
    def shopify_net(self) -> float:
        """Sum of the payouts' ``net`` amounts."""
        return sum(self._amount(p, "net") for p in self.payouts)

    @property
    def paypal_gross(self) -> float:
        """Sum of the PayPal transactions' gross amounts."""
        return sum(t["gross"] for t in self.paypal_transactions)

    @property
    def paypal_fees(self) -> float:
        """Sum of the absolute PayPal fees."""
        return sum(abs(t["fee"]) for t in self.paypal_transactions)

    @property
    def paypal_net(self) -> float:
        """Sum of the PayPal transactions' net amounts."""
        return sum(t["net"] for t in self.paypal_transactions)

    @property
    def total_gross(self) -> float:
        """Shopify plus PayPal gross."""
        return self.shopify_gross + self.paypal_gross

    @property
    def total_fees(self) -> float:
        """Shopify plus PayPal fees."""
        return self.shopify_fees + self.paypal_fees

    @property
    def total_net(self) -> float:
        """Shopify plus PayPal net."""
        return self.shopify_net + self.paypal_net
@dataclass
class ProductRevenue:
    """Aggregated revenue figures for one product."""

    product_id: str
    product_title: str
    units_sold: int
    gross_revenue: float
    tax: float
    discounts: float
    allocated_fees: float

    @property
    def net_revenue(self) -> float:
        """Gross minus tax minus allocated fees.

        Discounts are not subtracted again: they are already reflected in gross.
        """
        after_tax = self.gross_revenue - self.tax
        return after_tax - self.allocated_fees
@dataclass
class ProductRevenueReport:
    """Per-product revenue breakdown for a month."""

    month: str
    start_date: str
    end_date: str
    products: list[ProductRevenue] = field(default_factory=list)
    # Fee totals as fetched for the period; set by the report generator.
    total_fees: float = 0.0
    shopify_fees: float = 0.0
    paypal_fees: float = 0.0

    @property
    def total_gross(self) -> float:
        """Sum of the listed products' gross revenue."""
        return sum(p.gross_revenue for p in self.products)

    @property
    def total_tax(self) -> float:
        """Sum of the listed products' tax."""
        return sum(p.tax for p in self.products)

    @property
    def total_discounts(self) -> float:
        """Sum of the listed products' discounts."""
        return sum(p.discounts for p in self.products)

    @property
    def total_units(self) -> int:
        """Sum of the listed products' units sold."""
        return sum(p.units_sold for p in self.products)

    @property
    def total_allocated_fees(self) -> float:
        """Sum of the fees allocated to the listed products.

        This can be lower than ``total_fees`` when not every fee could be
        attributed to a listed product.
        """
        return sum(p.allocated_fees for p in self.products)

    @property
    def total_net(self) -> float:
        """Sum of the listed products' net revenue."""
        return sum(p.net_revenue for p in self.products)

    def to_csv(self, filename: str):
        """Export product revenue to CSV, best net revenue first, followed by
        a totals row in which every cell is the sum of its column."""
        # Product titles are free text; writing UTF-8 explicitly avoids
        # encode errors on platforms whose default encoding is narrower.
        with open(filename, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow([
                "Product ID", "Product", "Units", "Gross", "Tax",
                "Discounts", "Allocated Fees", "Net Revenue"
            ])
            for p in sorted(self.products, key=lambda x: x.net_revenue, reverse=True):
                writer.writerow([
                    p.product_id, p.product_title, p.units_sold,
                    f"{p.gross_revenue:.2f}", f"{p.tax:.2f}",
                    f"{p.discounts:.2f}", f"{p.allocated_fees:.2f}",
                    f"{p.net_revenue:.2f}"
                ])
            # Totals row: the fee cell sums the "Allocated Fees" column so that
            # Gross - Tax - Fees equals the Net cell of the same row.
            writer.writerow([
                "", "TOTAL", self.total_units,
                f"{self.total_gross:.2f}", f"{self.total_tax:.2f}",
                f"{self.total_discounts:.2f}", f"{self.total_allocated_fees:.2f}",
                f"{self.total_net:.2f}"
            ])
| # ============================================================================= | |
| # Shopify GraphQL Client | |
| # ============================================================================= | |
class ShopifyGraphQL:
    """Small client for the Shopify Admin GraphQL endpoint of one store."""

    # Seconds to wait for Shopify before giving up; without a timeout,
    # requests waits indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, store: str, version: str):
        """
        Args:
            store: store subdomain (the part before ``.myshopify.com``).
            version: Admin API version segment used in the endpoint URL.
        """
        self.store = store
        self.endpoint = f"https://{store}.myshopify.com/admin/api/{version}/graphql.json"

    def _get_headers(self) -> dict:
        """Get headers with fresh access token."""
        return {
            "Content-Type": "application/json",
            "X-Shopify-Access-Token": get_shopify_access_token(),
        }

    def execute(self, query: str, variables: Optional[dict] = None) -> dict:
        """POST a GraphQL document and return the decoded JSON response.

        Raises:
            requests.HTTPError: on a non-success HTTP status.
            RuntimeError: when the response carries a top-level ``errors`` key.
        """
        payload = {"query": query}
        if variables:
            payload["variables"] = variables
        resp = requests.post(
            self.endpoint,
            json=payload,
            headers=self._get_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        resp.raise_for_status()
        data = resp.json()
        if "errors" in data:
            raise RuntimeError(f"GraphQL errors: {data['errors']}")
        return data

    def _paginate(self, query: str, variables: dict, path: tuple) -> list[dict]:
        """Collect every node of a cursor-paginated connection.

        Args:
            query: GraphQL document that accepts a ``$cursor`` variable.
            variables: the other variables to send with each page request.
            path: keys leading from ``data`` to the connection object.

        A missing or null object anywhere along ``path`` ends pagination with
        whatever has been collected so far.
        """
        nodes: list[dict] = []
        cursor = None
        while True:
            result = self.execute(query, {**variables, "cursor": cursor})
            connection = result.get("data") or {}
            for key in path:
                connection = connection.get(key) or {}
            nodes.extend(edge["node"] for edge in connection.get("edges") or [])
            page_info = connection.get("pageInfo") or {}
            if not page_info.get("hasNextPage"):
                return nodes
            cursor = page_info.get("endCursor")

    def get_orders(self, product_id: str, start_date: str, end_date: str) -> list[dict]:
        """Fetch paid orders created in the date range; when ``product_id`` is
        truthy, keep only orders with a line item for that product."""
        query = """
        query GetOrders($query: String!, $cursor: String) {
          orders(first: 50, query: $query, after: $cursor) {
            edges {
              node {
                id
                name
                createdAt
                totalPriceSet { shopMoney { amount currencyCode } }
                subtotalPriceSet { shopMoney { amount currencyCode } }
                currentTotalTaxSet { shopMoney { amount currencyCode } }
                totalShippingPriceSet { shopMoney { amount currencyCode } }
                currentTotalDiscountsSet { shopMoney { amount currencyCode } }
                totalRefundedSet { shopMoney { amount currencyCode } }
                taxLines { title rate priceSet { shopMoney { amount } } }
                transactions(first: 10) {
                  gateway
                  kind
                  status
                  amountSet { shopMoney { amount } }
                  receiptJson
                }
                lineItems(first: 50) {
                  edges {
                    node {
                      product { id }
                      quantity
                      originalTotalSet { shopMoney { amount } }
                    }
                  }
                }
              }
            }
            pageInfo {
              hasNextPage
              endCursor
            }
          }
        }
        """
        # NOTE(review): how Shopify's search treats a bare date in
        # `created_at:<=` (start vs. end of day) is not visible here — confirm
        # that orders placed on end_date are included.
        query_filter = f"created_at:>={start_date} created_at:<={end_date} financial_status:paid"
        orders = self._paginate(query, {"query": query_filter}, ("orders",))
        if not product_id:
            return orders

        target_gid = f"gid://shopify/Product/{product_id}"
        # `product` is null on some line items, hence the `or {}` guard.
        # Only the first 50 line items of each order are inspected.
        return [
            order for order in orders
            if any(
                (item["node"]["product"] or {}).get("id") == target_gid
                for item in order["lineItems"]["edges"]
            )
        ]

    def get_payouts(self, start_date: str, end_date: str) -> list[dict]:
        """Fetch Shopify Payments payouts with fee breakdown.

        Every payout is paged through and those whose ``issuedAt`` date
        (first 10 characters) falls within ``[start_date, end_date]`` are
        returned. Returns an empty list when the response has no
        ``shopifyPaymentsAccount``.
        """
        query = """
        query GetPayouts($cursor: String) {
          shopifyPaymentsAccount {
            payouts(first: 50, after: $cursor) {
              edges {
                node {
                  id
                  issuedAt
                  net { amount currencyCode }
                  gross { amount currencyCode }
                  summary {
                    chargesFee { amount currencyCode }
                    adjustmentsFee { amount currencyCode }
                    refundsFee { amount currencyCode }
                  }
                }
              }
              pageInfo {
                hasNextPage
                endCursor
              }
            }
          }
        }
        """
        payouts = self._paginate(query, {}, ("shopifyPaymentsAccount", "payouts"))
        # Dates are compared as strings, which works because both sides are
        # zero-padded YYYY-MM-DD.
        return [
            payout for payout in payouts
            if start_date <= (payout.get("issuedAt") or "")[:10] <= end_date
        ]

    def get_all_orders_with_line_items(self, start_date: str, end_date: str) -> list[dict]:
        """Fetch all paid orders in the date range with line item details for
        the product revenue report."""
        query = """
        query GetOrdersWithLineItems($query: String!, $cursor: String) {
          orders(first: 50, query: $query, after: $cursor) {
            edges {
              node {
                id
                name
                createdAt
                totalPriceSet { shopMoney { amount } }
                lineItems(first: 100) {
                  edges {
                    node {
                      product { id title }
                      title
                      quantity
                      originalTotalSet { shopMoney { amount } }
                      discountedTotalSet { shopMoney { amount } }
                      taxLines { priceSet { shopMoney { amount } } }
                    }
                  }
                }
                transactions(first: 5) {
                  gateway
                  kind
                  status
                }
              }
            }
            pageInfo {
              hasNextPage
              endCursor
            }
          }
        }
        """
        query_filter = f"created_at:>={start_date} created_at:<={end_date} financial_status:paid"
        return self._paginate(query, {"query": query_filter}, ("orders",))
| # ============================================================================= | |
| # PayPal Client | |
| # ============================================================================= | |
class PayPalClient:
    """Client for PayPal's OAuth token and transaction reporting endpoints."""

    # Seconds to wait for PayPal before giving up on a request.
    REQUEST_TIMEOUT = 30
    # Page size requested from the reporting endpoint.
    PAGE_SIZE = 100

    def __init__(self, client_id: str, secret: str, base_url: str):
        self.base_url = base_url
        self.client_id = client_id
        self.secret = secret
        self.access_token = None  # populated by authenticate()

    def authenticate(self):
        """Get OAuth token from PayPal and store it on ``self.access_token``.

        Raises:
            requests.HTTPError: on a non-success HTTP status.
        """
        resp = requests.post(
            f"{self.base_url}/v1/oauth2/token",
            auth=(self.client_id, self.secret),
            data={"grant_type": "client_credentials"},
            headers={"Accept": "application/json"},
            timeout=self.REQUEST_TIMEOUT,
        )
        resp.raise_for_status()
        self.access_token = resp.json()["access_token"]

    @staticmethod
    def _parse_transaction(txn: dict) -> dict:
        """Reduce one ``transaction_details`` entry to id, gross, fee and net.

        Missing amounts count as 0; ``net`` is gross minus the absolute fee.
        """
        info = txn.get("transaction_info", {})
        gross = float(info.get("transaction_amount", {}).get("value", 0))
        fee = float(info.get("fee_amount", {}).get("value", 0))
        return {
            "transaction_id": info.get("transaction_id"),
            "gross": gross,
            "fee": fee,
            "net": gross - abs(fee),
        }

    def get_transactions(self, start_date: str, end_date: str) -> list[dict]:
        """
        Fetch all transactions in date range.

        Returns list with transaction_id, gross, fee, net amounts.
        Authenticates first when no access token is held yet.

        Raises:
            requests.HTTPError: on a non-success HTTP status.
        """
        if not self.access_token:
            self.authenticate()

        transactions = []
        page = 1
        while True:
            resp = requests.get(
                f"{self.base_url}/v1/reporting/transactions",
                headers={"Authorization": f"Bearer {self.access_token}"},
                params={
                    "start_date": f"{start_date}T00:00:00Z",
                    "end_date": f"{end_date}T23:59:59Z",
                    "fields": "transaction_info,payer_info,cart_info",
                    "page_size": self.PAGE_SIZE,
                    "page": page,
                },
                timeout=self.REQUEST_TIMEOUT,
            )
            resp.raise_for_status()
            data = resp.json()

            details = data.get("transaction_details", [])
            transactions.extend(self._parse_transaction(txn) for txn in details)

            # Prefer the server-reported page count: inferring the end from a
            # short page alone requests one page too many whenever the last
            # page happens to be exactly full.
            total_pages = data.get("total_pages")
            if not details:
                break
            if total_pages is not None:
                if page >= int(total_pages):
                    break
            elif len(details) < self.PAGE_SIZE:
                break
            page += 1
        return transactions
| # ============================================================================= | |
| # Utility Functions | |
| # ============================================================================= | |
def extract_paypal_txn_id(transaction: dict) -> Optional[str]:
    """Extract PayPal transaction ID from Shopify transaction receipt.

    The receipt is read from ``receiptJson`` (falling back to ``receipt``) and
    may be a JSON string or an already-decoded dict.

    Returns:
        The first non-empty value among the receipt's ``id``,
        ``transaction_id`` and ``paypal_transaction_id`` keys, or None when
        the receipt is missing, is not valid JSON, is not a JSON object, or
        holds none of those keys.
    """
    receipt = transaction.get("receiptJson") or transaction.get("receipt") or {}
    if isinstance(receipt, str):
        try:
            receipt = json.loads(receipt)
        except json.JSONDecodeError:
            return None

    # Valid JSON is not necessarily an object (it can be a list, string or
    # number); only a dict supports the key lookups below.
    if not isinstance(receipt, dict):
        return None

    # PayPal stores ID in various places depending on integration
    for key in ("id", "transaction_id", "paypal_transaction_id"):
        value = receipt.get(key)
        if value:
            return value
    return None
def validate_config():
    """Report whether all required Shopify settings are present.

    Prints the missing variable names plus a sample .env layout and returns
    False when any of them is unset or empty; returns True otherwise.
    """
    required = (
        ("SHOPIFY_STORE", SHOPIFY_STORE),
        ("SHOPIFY_CLIENT_ID", SHOPIFY_CLIENT_ID),
        ("SHOPIFY_CLIENT_SECRET", SHOPIFY_CLIENT_SECRET),
    )
    missing = [name for name, value in required if not value]
    if not missing:
        return True

    print(f"Error: Missing required environment variables: {', '.join(missing)}")
    for line in (
        "\nCreate a .env file with:",
        " SHOPIFY_STORE=your-store-name",
        " SHOPIFY_CLIENT_ID=your-client-id",
        " SHOPIFY_CLIENT_SECRET=your-client-secret",
        " PAYPAL_CLIENT_ID=xxxxx (optional)",
        " PAYPAL_SECRET=xxxxx (optional)",
    ):
        print(line)
    return False
| # ============================================================================= | |
| # Report Generators | |
| # ============================================================================= | |
def generate_weekly_sales(product_id: str, week_start: str) -> WeeklySalesReport:
    """Generate weekly sales report for a specific product.

    Args:
        product_id: Shopify product ID used to filter orders.
        week_start: first day of the week as "YYYY-MM-DD"; the report covers
            this day plus the following six.

    Returns:
        A WeeklySalesReport with one SaleRecord per matching order. PayPal
        fees are filled in only when PayPal credentials are configured and
        the order's receipt yields an ID found among the PayPal transactions.

    Raises:
        ValueError: if ``week_start`` is not a valid "YYYY-MM-DD" date.
    """
    start = datetime.strptime(week_start, "%Y-%m-%d")
    end_str = (start + timedelta(days=6)).strftime("%Y-%m-%d")

    shopify = ShopifyGraphQL(SHOPIFY_STORE, SHOPIFY_API_VERSION)

    # PayPal is optional; a failure here is reported but does not abort the run.
    paypal_txns = {}
    if PAYPAL_CLIENT_ID and PAYPAL_SECRET:
        paypal = PayPalClient(PAYPAL_CLIENT_ID, PAYPAL_SECRET, PAYPAL_BASE_URL)
        try:
            paypal_txns = {t["transaction_id"]: t for t in paypal.get_transactions(week_start, end_str)}
        except Exception as e:
            print(f"Warning: Could not fetch PayPal transactions: {e}")

    orders = shopify.get_orders(product_id, week_start, end_str)

    records = []
    for order in orders:
        # Kind and status are compared case-insensitively, as in
        # _get_order_gateway: the GraphQL API reports these enums in upper
        # case, so an exact match against "sale"/"capture" never succeeds.
        payment_txn = next(
            (
                t for t in order.get("transactions") or []
                if (t.get("kind") or "").lower() in ("sale", "capture")
                and (t.get("status") or "").upper() == "SUCCESS"
            ),
            {},
        )
        gateway = payment_txn.get("gateway") or "unknown"
        paypal_txn_id = extract_paypal_txn_id(payment_txn)

        matched = paypal_txns.get(paypal_txn_id) if paypal_txn_id else None
        paypal_fee = abs(matched["fee"]) if matched is not None else None

        records.append(SaleRecord(
            order_id=order["id"],
            order_name=order["name"],
            created_at=order["createdAt"][:10],
            gross_amount=float(order["totalPriceSet"]["shopMoney"]["amount"]),
            subtotal=float(order["subtotalPriceSet"]["shopMoney"]["amount"]),
            tax=float(order["currentTotalTaxSet"]["shopMoney"]["amount"]),
            shipping=float(order["totalShippingPriceSet"]["shopMoney"]["amount"]),
            discounts=float(order["currentTotalDiscountsSet"]["shopMoney"]["amount"]),
            payment_gateway=gateway,
            gateway_transaction_id=paypal_txn_id,
            shopify_fee=None,
            paypal_fee=paypal_fee,
            net_amount=None,
        ))

    return WeeklySalesReport(
        week_start=week_start,
        week_end=end_str,
        product_id=product_id,
        orders=records,
    )
def generate_monthly_revenue(month: str) -> MonthlyRevenueReport:
    """
    Generate monthly revenue report aligned with payout periods.
    Includes accurate Shopify Payments fees.

    Args:
        month: "2025-01" format (an unpadded month such as "2025-1" is
            accepted and normalised)

    Raises:
        ValueError: if ``month`` is not "<year>-<month>" with a valid month.
    """
    year, mon = map(int, month.split("-"))
    # Rebuild the month with zero padding: the payout filter compares date
    # strings lexicographically, which only works for padded YYYY-MM-DD.
    month = f"{year:04d}-{mon:02d}"
    start_date = f"{month}-01"
    last_day = calendar.monthrange(year, mon)[1]
    end_date = f"{month}-{last_day:02d}"

    shopify = ShopifyGraphQL(SHOPIFY_STORE, SHOPIFY_API_VERSION)
    payouts = shopify.get_payouts(start_date, end_date)

    # PayPal is optional; a failure here is reported but does not abort the run.
    paypal_txns = []
    if PAYPAL_CLIENT_ID and PAYPAL_SECRET:
        paypal = PayPalClient(PAYPAL_CLIENT_ID, PAYPAL_SECRET, PAYPAL_BASE_URL)
        try:
            paypal_txns = paypal.get_transactions(start_date, end_date)
        except Exception as e:
            print(f"Warning: Could not fetch PayPal transactions: {e}")

    return MonthlyRevenueReport(
        month=month,
        payouts=payouts,
        paypal_transactions=paypal_txns,
    )
def _get_order_gateway(order: dict) -> str:
    """Determine which payment gateway an order used.

    The first transaction whose kind is sale/capture and whose status is
    SUCCESS (both compared case-insensitively) decides the result.

    Returns:
        "paypal" or "shopify_payments" for those gateways, the lower-cased
        gateway name for any other gateway, and "unknown" when the order has
        no successful payment transaction or that transaction names no gateway.
    """
    for txn in order.get("transactions") or []:
        # Values may be null in the API response, hence `or ""` before
        # calling string methods on them.
        kind = (txn.get("kind") or "").lower()
        status = (txn.get("status") or "").upper()
        if kind not in ("sale", "capture") or status != "SUCCESS":
            continue

        gateway = (txn.get("gateway") or "").lower()
        if "paypal" in gateway:
            return "paypal"
        if "shopify_payments" in gateway or gateway == "shopify payments":
            return "shopify_payments"
        return gateway or "unknown"
    return "unknown"
def generate_product_revenue(month: str, filter_product_id: Optional[str] = None) -> ProductRevenueReport:
    """
    Generate per-product revenue report with gateway-aware fee allocation.

    Shopify Payments fees are allocated only to products sold via Shopify Payments.
    PayPal fees are allocated only to products sold via PayPal.
    Each product receives the share of a gateway's fees that matches its share
    of that gateway's gross across ALL products of the month, also when the
    report itself is filtered down to a single product.

    Args:
        month: "2025-01" format (an unpadded month such as "2025-1" is
            accepted and normalised)
        filter_product_id: Optional product ID to filter to a single product

    Raises:
        ValueError: if ``month`` is not "<year>-<month>" with a valid month.
    """
    year, mon = map(int, month.split("-"))
    # Rebuild the month with zero padding: the payout filter compares date
    # strings lexicographically, which only works for padded YYYY-MM-DD.
    month = f"{year:04d}-{mon:02d}"
    start_date = f"{month}-01"
    last_day = calendar.monthrange(year, mon)[1]
    end_date = f"{month}-{last_day:02d}"

    shopify = ShopifyGraphQL(SHOPIFY_STORE, SHOPIFY_API_VERSION)

    # Fetch all orders with line items
    print("Fetching orders...")
    orders = shopify.get_all_orders_with_line_items(start_date, end_date)
    print(f" Found {len(orders)} orders")

    # Fetch total fees from payouts
    print("Fetching Shopify Payments fees...")
    payouts = shopify.get_payouts(start_date, end_date)
    shopify_fees = 0.0
    for p in payouts:
        summary = p.get("summary", {})
        shopify_fees += abs(float(summary.get("chargesFee", {}).get("amount", 0)))
        shopify_fees += abs(float(summary.get("adjustmentsFee", {}).get("amount", 0)))
    print(f" Shopify fees: ${shopify_fees:.2f}")

    # Fetch PayPal fees (optional; a failure is reported but not fatal)
    paypal_fees = 0.0
    if PAYPAL_CLIENT_ID and PAYPAL_SECRET:
        print("Fetching PayPal fees...")
        paypal = PayPalClient(PAYPAL_CLIENT_ID, PAYPAL_SECRET, PAYPAL_BASE_URL)
        try:
            paypal_txns = paypal.get_transactions(start_date, end_date)
            paypal_fees = sum(abs(t["fee"]) for t in paypal_txns)
            print(f" PayPal fees: ${paypal_fees:.2f}")
        except Exception as e:
            print(f" Warning: Could not fetch PayPal transactions: {e}")
    total_fees = shopify_fees + paypal_fees

    # Aggregate by product, tracking gateway breakdown
    product_data: dict[str, dict] = {}
    gateway_counts = {"shopify_payments": 0, "paypal": 0, "other": 0}
    # Gateway gross across every line item of the month; these are the
    # denominators of the fee allocation and must not be affected by the
    # product filter.
    total_gross_shopify = 0.0
    total_gross_paypal = 0.0

    for order in orders:
        gateway = _get_order_gateway(order)
        if gateway in ("shopify_payments", "paypal"):
            gateway_counts[gateway] += 1
        else:
            gateway_counts["other"] += 1

        for item_edge in order.get("lineItems", {}).get("edges", []):
            item = item_edge["node"]
            product = item.get("product")
            if product:
                product_id = product["id"].replace("gid://shopify/Product/", "")
                product_title = product.get("title") or item.get("title", "Unknown")
            else:
                # Line items without a product are pooled under one key.
                product_id = "deleted"
                product_title = item.get("title", "Deleted/Custom Product")

            gross = float(item.get("discountedTotalSet", {}).get("shopMoney", {}).get("amount", 0))

            # Accumulate the store-wide gateway totals BEFORE applying the
            # filter; otherwise a filtered product would be the only
            # contributor and would be charged 100% of the month's fees.
            if gateway == "shopify_payments":
                total_gross_shopify += gross
            elif gateway == "paypal":
                total_gross_paypal += gross

            # Skip if filtering to a specific product
            if filter_product_id and product_id != filter_product_id:
                continue

            quantity = item.get("quantity", 0)
            original = float(item.get("originalTotalSet", {}).get("shopMoney", {}).get("amount", 0))
            discount = original - gross
            tax = sum(
                float(tax_line.get("priceSet", {}).get("shopMoney", {}).get("amount", 0))
                for tax_line in item.get("taxLines", [])
            )

            entry = product_data.setdefault(product_id, {
                "title": product_title,
                "units": 0,
                "gross": 0.0,
                "tax": 0.0,
                "discounts": 0.0,
                "gross_shopify": 0.0,  # Gross from Shopify Payments orders
                "gross_paypal": 0.0,   # Gross from PayPal orders
            })
            entry["units"] += quantity
            entry["gross"] += gross
            entry["tax"] += tax
            entry["discounts"] += discount
            # Track gross by gateway for this product
            if gateway == "shopify_payments":
                entry["gross_shopify"] += gross
            elif gateway == "paypal":
                entry["gross_paypal"] += gross

    print("\nGateway breakdown:")
    print(f" Shopify Payments: {gateway_counts['shopify_payments']} orders, ${total_gross_shopify:,.2f} gross")
    print(f" PayPal: {gateway_counts['paypal']} orders, ${total_gross_paypal:,.2f} gross")
    if gateway_counts["other"] > 0:
        print(f" Other: {gateway_counts['other']} orders")

    # Build product revenue list with gateway-aware fee allocation
    products = []
    for product_id, data in product_data.items():
        # A gateway without any gross has nothing to allocate against.
        shopify_fee_share = (
            (data["gross_shopify"] / total_gross_shopify) * shopify_fees
            if total_gross_shopify > 0 else 0.0
        )
        paypal_fee_share = (
            (data["gross_paypal"] / total_gross_paypal) * paypal_fees
            if total_gross_paypal > 0 else 0.0
        )
        products.append(ProductRevenue(
            product_id=product_id,
            product_title=data["title"],
            units_sold=data["units"],
            gross_revenue=data["gross"],
            tax=data["tax"],
            discounts=data["discounts"],
            allocated_fees=shopify_fee_share + paypal_fee_share,
        ))
    products.sort(key=lambda p: p.net_revenue, reverse=True)

    return ProductRevenueReport(
        month=month,
        start_date=start_date,
        end_date=end_date,
        products=products,
        total_fees=total_fees,
        shopify_fees=shopify_fees,
        paypal_fees=paypal_fees,
    )
| # ============================================================================= | |
| # CLI Interface | |
| # ============================================================================= | |
def _run_weekly(args) -> int:
    """Print the weekly sales report and optionally export it to CSV."""
    report = generate_weekly_sales(args.product, args.week)
    print(f"\n{'='*50}")
    print(f"Weekly Sales: {report.week_start} to {report.week_end}")
    print(f"{'='*50}")
    print(f"Product ID: {report.product_id}")
    print(f"Orders: {report.total_orders}")
    print(f"Gross Revenue: ${report.gross_revenue:.2f}")
    print(f" Subtotal: ${sum(o.subtotal for o in report.orders):.2f}")
    print(f" Tax: ${report.total_tax:.2f}")
    print(f" Shipping: ${report.total_shipping:.2f}")
    print(f" Discounts: -${report.total_discounts:.2f}")
    if report.paypal_fees > 0:
        print(f"PayPal Fees: ${report.paypal_fees:.2f}")
    if args.csv:
        report.to_csv(args.csv)
        print(f"\nExported to {args.csv}")
    return 0


def _run_monthly(args) -> int:
    """Print the monthly revenue report."""
    report = generate_monthly_revenue(args.month)
    print(f"\n{'='*50}")
    print(f"Monthly Revenue: {report.month}")
    print(f"{'='*50}")
    if report.payouts:
        print(f"\nShopify Payments ({len(report.payouts)} payouts):")
        print(f" Gross: ${report.shopify_gross:.2f}")
        print(f" Fees: ${report.shopify_fees:.2f}")
        print(f" Net: ${report.shopify_net:.2f}")
    else:
        print("\nShopify Payments: No payouts found")
    if report.paypal_transactions:
        print(f"\nPayPal ({len(report.paypal_transactions)} transactions):")
        print(f" Gross: ${report.paypal_gross:.2f}")
        print(f" Fees: ${report.paypal_fees:.2f}")
        print(f" Net: ${report.paypal_net:.2f}")
    elif PAYPAL_CLIENT_ID and PAYPAL_SECRET:
        # PayPal is only queried when BOTH credentials are set, so only then
        # does an empty result mean "no transactions".
        print("\nPayPal: No transactions found")
    else:
        print("\nPayPal: Not configured")
    print(f"\n{'-'*50}")
    print(f"Total Gross: ${report.total_gross:.2f}")
    print(f"Total Fees: ${report.total_fees:.2f}")
    print(f"Total Net: ${report.total_net:.2f}")
    return 0


def _run_products(args) -> int:
    """Print the per-product revenue table and optionally export it to CSV.

    Returns 1 when a specific product was requested but nothing was found.
    """
    report = generate_product_revenue(args.month, args.product)
    print(f"\n{'='*110}")
    if args.product:
        print(f"Product Revenue: {report.month} (Product ID: {args.product})")
    else:
        print(f"Product Revenue: {report.month}")
    print(f"{'='*110}")

    if not report.products:
        if args.product:
            print(f"\nError: No data found for product ID '{args.product}'")
            print("Run without --product to see all products and their IDs.")
            return 1
        print("\nNo products found for this period.")
        return 0

    print(f"\n{'Product':<32} {'ID':<16} {'Units':>6} {'Gross':>12} {'Tax':>10} {'Fees':>10} {'Net':>12}")
    print("-" * 110)
    for p in report.products:
        title = p.product_title[:30] + ".." if len(p.product_title) > 32 else p.product_title
        print(f"{title:<32} {p.product_id:<16} {p.units_sold:>6} ${p.gross_revenue:>10,.2f} ${p.tax:>8,.2f} ${p.allocated_fees:>8,.2f} ${p.net_revenue:>10,.2f}")
    print("-" * 110)
    # The totals row sums the Fees column shown above, so that
    # Gross - Tax - Fees matches the Net total of the same row; the
    # period-wide fee figures are printed separately below.
    allocated_total = sum(p.allocated_fees for p in report.products)
    print(f"{'TOTAL':<32} {'':<16} {report.total_units:>6} ${report.total_gross:>10,.2f} ${report.total_tax:>8,.2f} ${allocated_total:>8,.2f} ${report.total_net:>10,.2f}")
    print("\nFee breakdown:")
    print(f" Shopify Payments: ${report.shopify_fees:.2f}")
    print(f" PayPal: ${report.paypal_fees:.2f}")
    print("\nNote: Fees allocated by gateway - Shopify fees to Shopify orders, PayPal fees to PayPal orders")
    if args.csv:
        report.to_csv(args.csv)
        print(f"\nExported to {args.csv}")
    return 0


def main():
    """Parse the command line, validate configuration and run the selected
    report. Returns the process exit code (0 on success, 1 on failure)."""
    parser = argparse.ArgumentParser(
        description="Shopify Sales & Revenue Reports",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python shopify_reports.py weekly --product 123456789 --week 2025-01-06
python shopify_reports.py weekly --product 123456789 --week 2025-01-06 --csv sales.csv
python shopify_reports.py monthly --month 2025-01
python shopify_reports.py products --month 2025-01
python shopify_reports.py products --month 2025-01 --csv revenue.csv
"""
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Weekly sales command
    weekly = subparsers.add_parser("weekly", help="Weekly sales report for a product")
    weekly.add_argument("--product", required=True, help="Shopify product ID")
    weekly.add_argument("--week", required=True, help="Week start date (Monday), e.g., 2025-01-06")
    weekly.add_argument("--csv", help="Export to CSV file")

    # Monthly revenue command
    monthly = subparsers.add_parser("monthly", help="Monthly revenue report with fees")
    monthly.add_argument("--month", required=True, help="Month, e.g., 2025-01")

    # Product revenue command
    products = subparsers.add_parser("products", help="Per-product revenue breakdown")
    products.add_argument("--month", required=True, help="Month, e.g., 2025-01")
    products.add_argument("--product", help="Filter to specific product ID (optional)")
    products.add_argument("--csv", help="Export to CSV file")

    args = parser.parse_args()
    if not validate_config():
        return 1

    # The subcommand is required, so args.command is always one of these keys.
    handlers = {
        "weekly": _run_weekly,
        "monthly": _run_monthly,
        "products": _run_products,
    }
    return handlers[args.command](args)
if __name__ == "__main__":
    # Raise SystemExit directly: the bare `exit` helper is installed by the
    # `site` module and is missing under `python -S` or in frozen builds.
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment