Skip to content

Instantly share code, notes, and snippets.

@tarassh
Created October 18, 2025 17:47
Show Gist options
  • Select an option

  • Save tarassh/359d772899215209276789686fb75d3a to your computer and use it in GitHub Desktop.

Select an option

Save tarassh/359d772899215209276789686fb75d3a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Sum total incoming SOL for a Solana address by scanning full on-chain history
via a Solana JSON-RPC endpoint (using getSignaturesForAddress + getTransaction).
Also identifies and displays the top 10 contributor addresses that sent the most SOL.
Usage:
python sol_incoming_sum.py --address GrUy83AAsibyrcUtpAVA8VgpnQSgyCAb1d8Je8MXNGLJ \
--rpc https://api.mainnet-beta.solana.com \
--max-signatures 0
Notes:
- We compute incoming as the positive lamport delta of the target address per transaction:
incoming += max(0, post_balance - pre_balance)
This captures native SOL transfers, rent rebates, validator rewards, etc.
- Contributors are identified by analyzing balance decreases in the same transactions
where the target address gained SOL. This is a heuristic approach and may not be
100% accurate for complex multi-party transactions.
- Public RPCs may have rate limits and incomplete history. For reliable full-history,
use a full-history provider (e.g., your own RPC node or a data provider).
- Set --max-signatures to 0 (default) to scan all available; otherwise limit.
"""
import argparse
import time
import sys
import requests
from collections import defaultdict
JSON_HEADERS = {"Content-Type": "application/json"}
def rpc_call(rpc_url, payload, timeout=45):
r = requests.post(rpc_url, json=payload, headers=JSON_HEADERS, timeout=timeout)
r.raise_for_status()
return r.json()
def fetch_signatures(rpc_url, address, before=None, limit=1000):
params = [address, {"limit": limit}]
if before:
params[1]["before"] = before
payload = {"jsonrpc": "2.0", "id": 1, "method": "getSignaturesForAddress", "params": params}
resp = rpc_call(rpc_url, payload)
if "error" in resp:
raise RuntimeError(f"RPC error getSignaturesForAddress: {resp['error']}")
return resp.get("result", [])
def batch_get_transactions(rpc_url, signatures, commitment="confirmed", encoding="jsonParsed"):
# JSON-RPC batch request (up to ~50 per batch is typical; we will use 20)
batch = []
for i, sig in enumerate(signatures):
batch.append({
"jsonrpc": "2.0",
"id": i + 1,
"method": "getTransaction",
"params": [sig, {"maxSupportedTransactionVersion": 0, "commitment": commitment, "encoding": encoding}]
})
r = requests.post(rpc_url, json=batch, headers=JSON_HEADERS, timeout=90)
r.raise_for_status()
out = r.json()
# Some servers return list, some may return dict if error; normalize
if isinstance(out, dict):
out = [out]
# Sort back by id to align with signatures order
out.sort(key=lambda x: x.get("id", 0))
return out
def extract_pubkeys(account_keys):
pubkeys = []
for k in account_keys:
if isinstance(k, dict):
# jsonParsed returns dicts like {"pubkey": "...", "signer": bool, "writable": bool}
pubkeys.append(k.get("pubkey"))
else:
pubkeys.append(k)
return pubkeys
def sum_incoming_sol(rpc_url, address, max_signatures=0, sleep_sec=0.2, batch_size=20):
total_in_lamports = 0
total_txs = 0
before = None
scanned_signatures = 0
contributors = defaultdict(int) # Track contributions by address
while True:
to_fetch = 1000 if max_signatures == 0 else min(1000, max_signatures - scanned_signatures)
if to_fetch <= 0:
break
sigs = fetch_signatures(rpc_url, address, before=before, limit=to_fetch)
if not sigs:
break
signatures = [s["signature"] for s in sigs]
# paginate
before = signatures[-1]
scanned_signatures += len(signatures)
# Batch fetch transactions
for i in range(0, len(signatures), batch_size):
batch = signatures[i:i+batch_size]
try:
results = batch_get_transactions(rpc_url, batch)
except (requests.RequestException, RuntimeError) as e:
print(f"Warning: batch getTransaction failed ({e}); retrying once after sleep...", file=sys.stderr)
time.sleep(1.0)
results = batch_get_transactions(rpc_url, batch)
for res in results:
if "error" in res:
continue
inner = res.get("result")
if not inner:
continue
meta = inner.get("meta")
tx = inner.get("transaction")
if not meta or not tx:
continue
# Skip if account keys missing
message = tx.get("message", {})
account_keys = message.get("accountKeys", [])
pubkeys = extract_pubkeys(account_keys)
try:
idx = pubkeys.index(address)
except ValueError:
continue # address not part of account list (rare for getSignaturesForAddress, but be safe)
pre = meta.get("preBalances", [])
post = meta.get("postBalances", [])
if idx >= len(pre) or idx >= len(post):
continue
delta = post[idx] - pre[idx] # lamports
if delta > 0:
total_in_lamports += delta
# Find contributors: look for addresses that lost balance in this transaction
for i, (pre_bal, post_bal) in enumerate(zip(pre, post)):
if i != idx and pre_bal > post_bal: # Address lost balance (potential sender)
loss = pre_bal - post_bal
if i < len(pubkeys) and loss > 0:
contributor_addr = pubkeys[i]
# Only count the portion that went to our target address
# This is a simplified heuristic - in complex transactions this might not be 100% accurate
contributors[contributor_addr] += min(loss, delta)
total_txs += 1
time.sleep(sleep_sec)
return total_in_lamports, total_txs, scanned_signatures, contributors
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--address", required=True, help="Solana address (base58)")
ap.add_argument("--rpc", default="https://api.mainnet-beta.solana.com", help="Solana JSON-RPC URL")
ap.add_argument("--max-signatures", type=int, default=0, help="Maximum signatures to scan (0 = all available)")
ap.add_argument("--sleep-sec", type=float, default=0.2, help="Sleep between batches to avoid rate limits")
ap.add_argument("--batch-size", type=int, default=20, help="getTransaction batch size")
args = ap.parse_args()
addr = args.address.strip()
rpc_url = args.rpc.strip()
lamports, txs_seen, sigs_scanned, contributors = sum_incoming_sol(
rpc_url, addr, max_signatures=args.max_signatures, sleep_sec=args.sleep_sec, batch_size=args.batch_size
)
sol = lamports / 1_000_000_000
print(f"Address: {addr}")
print(f"RPC: {rpc_url}")
print(f"Signatures scanned: {sigs_scanned}")
print(f"Transactions parsed: {txs_seen}")
print(f"Total incoming SOL: {sol:.9f} SOL")
# Display top 10 contributors
if contributors:
print("\nTop 10 Contributors:")
print("-" * 80)
print(f"{'Rank':<4} {'Address':<44} {'Contributed SOL':<15} {'Percentage':<10}")
print("-" * 80)
# Sort contributors by contribution amount (descending)
sorted_contributors = sorted(contributors.items(), key=lambda x: x[1], reverse=True)[:10]
for rank, (contributor_addr, lamports_contributed) in enumerate(sorted_contributors, 1):
sol_contributed = lamports_contributed / 1_000_000_000
percentage = (lamports_contributed / lamports) * 100 if lamports > 0 else 0
print(f"{rank:<4} {contributor_addr:<44} {sol_contributed:<15.9f} {percentage:<10.2f}%")
else:
print("\nNo contributors identified (this may happen if all incoming SOL was from system operations like rewards/rebates)")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment