Skip to content

Instantly share code, notes, and snippets.

@cheeseonamonkey
Last active February 5, 2026 06:33
Show Gist options
  • Select an option

  • Save cheeseonamonkey/2a7ba0ea01278bafca8c1722523cac21 to your computer and use it in GitHub Desktop.

Select an option

Save cheeseonamonkey/2a7ba0ea01278bafca8c1722523cac21 to your computer and use it in GitHub Desktop.
snowflake iran (run a Tor Snowflake proxy in Docker without any headaches; ~40% of all Snowflake traffic is estimated to be Iranian)
#!/usr/bin/env python3
"""
snowflake.py — Snowflake proxy manager + robust impact stats (Docker)
- One file
- Privacy-safe (no IPs, no geo, no guessing)
- Horizontal scaling
"""
from __future__ import annotations
import argparse, os, re, subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional
from collections import Counter
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
# Shared rich console for all output.
CONSOLE = Console()
# Official Tor Project proxy image pulled by `docker run`.
IMAGE = "thetorproject/snowflake-proxy:latest"
# Container name prefix; replicas are named "<BASE>-0", "<BASE>-1", ...
BASE = "snowflake-proxy"
# Default value passed to the proxy's -summary-interval flag.
DEFAULT_INTERVAL = "15m"
# Matches the proxy's periodic summary log line, e.g.
# "In the last 15m0s, there were 4 ... Traffic Relayed ↓ 12 MB, ↑ 3 MB."
SUMMARY_RE = re.compile(
r"In the last\s+(?P<int>[^,]+),\s+there were\s+(?P<conns>\d+).*?Traffic Relayed\s+↓\s*(?P<down>[^,]+),\s*↑\s*(?P<up>[^.]+)",
re.I,
)
# A size token like "12.3 MB" or "4 GiB" (decimal and binary units).
SIZE_RE = re.compile(r"(?P<n>\d+(?:\.\d+)?)\s*(?P<u>B|KB|MB|GB|KiB|MiB|GiB)", re.I)
# Multipliers to convert a SIZE_RE match to bytes (keys upper-cased).
UNIT = {
"B":1,"KB":1_000,"MB":1_000_000,"GB":1_000_000_000,
"KIB":1024,"MIB":1024**2,"GIB":1024**3,
}
# Components of a Go-style duration string, e.g. "15m0s" -> ("15","m"),("0","s").
DURATION_RE = re.compile(r"(?P<value>\d+)(?P<unit>[hms])", re.I)
def duration_seconds(text: str) -> int:
    """Convert a Go-style duration string (e.g. '1h15m0s') to seconds.

    Unknown text yields 0 components; in that case fall back to one
    hour (3600 s) so downstream rate math never divides by zero.
    """
    factors = {"h": 3600, "m": 60, "s": 1}
    total = sum(
        int(value) * factors[unit.lower()]
        for value, unit in DURATION_RE.findall(text)
    )
    return total if total else 3600
def parse_timestamp(line: str) -> Optional[datetime]:
    """Extract a UTC timestamp from the first two whitespace-separated fields.

    Accepts both slash- and dash-separated date formats; returns None when
    the line is too short or does not start with a recognizable timestamp.
    """
    fields = line.split()
    if len(fields) < 2:
        return None
    candidate = " ".join(fields[:2])
    for pattern in ("%Y/%m/%d %H:%M:%S", "%Y-%m-%d %H:%M:%S"):
        try:
            parsed = datetime.strptime(candidate, pattern)
        except ValueError:
            continue
        return parsed.replace(tzinfo=timezone.utc)
    return None
def must_root():
    """Abort with exit code 1 unless the effective UID is root (0)."""
    if os.geteuid() == 0:
        return
    CONSOLE.print("[red]Run as root.[/red]")
    raise SystemExit(1)
def sh(cmd):
    """Run *cmd* (argv list), capturing stdout/stderr as text."""
    return subprocess.run(cmd, capture_output=True, text=True)
def docker(cmd):
    """Invoke the docker CLI with the given subcommand arguments."""
    return sh(["docker", *cmd])
def parse_bytes(s: str) -> Optional[int]:
    """Parse the first size token in *s* (e.g. '12.3 MB') into bytes.

    Returns None when no recognizable size token is present.
    """
    match = SIZE_RE.search(s)
    if match is None:
        return None
    quantity = float(match["n"])
    multiplier = UNIT[match["u"].upper()]
    return int(quantity * multiplier)
@dataclass
class SummaryRow:
    """One parsed periodic summary line from a proxy's log."""
    ts: datetime   # when the summary was emitted (UTC)
    interval: int  # reporting window length, in seconds
    conns: int     # connections served during the window
    up: int        # bytes relayed upstream
    down: int      # bytes relayed downstream

    @property
    def total(self) -> int:
        """Total bytes relayed in this window (both directions)."""
        return self.down + self.up
def containers(n: int) -> List[str]:
    """Return the managed container names: '<BASE>-0' .. '<BASE>-(n-1)'."""
    names: List[str] = []
    for index in range(n):
        names.append(f"{BASE}-{index}")
    return names
def logs(name: str) -> List[str]:
    """Return a container's log lines (stdout and stderr merged).

    Returns an empty list when `docker logs` fails (e.g. no such container).
    """
    result = docker(["logs", name])
    if result.returncode != 0:
        return []
    combined = result.stdout
    if result.stderr:
        if combined:
            combined += "\n"
        combined += result.stderr
    return combined.splitlines()
def parse(lines: List[str]) -> List[SummaryRow]:
    """Convert raw log lines into SummaryRow records.

    Lines that are not summary lines, or lack a parseable timestamp,
    are silently skipped. Unparseable sizes count as 0 bytes.
    """
    rows: List[SummaryRow] = []
    for line in lines:
        match = SUMMARY_RE.search(line)
        if match is None:
            continue
        stamp = parse_timestamp(line)
        if stamp is None:
            continue
        rows.append(SummaryRow(
            stamp,
            duration_seconds(match["int"]),
            int(match["conns"]),
            parse_bytes(match["up"]) or 0,
            parse_bytes(match["down"]) or 0,
        ))
    return rows
# ---------- commands ----------
def start(a):
    """Start a.replicas proxy containers, replacing any with the same names.

    Each container runs on the host network with a generous file-descriptor
    limit and restarts automatically unless explicitly stopped.
    """
    for name in containers(a.replicas):
        # Remove a stale container of the same name before recreating it.
        docker(["rm", "-f", name])
        docker([
            "run", "-d",
            "--name", name,
            "--network", "host",
            "--restart", "unless-stopped",
            "--ulimit", "nofile=1048576:1048576",
            "--cpus", str(a.cpus),
            IMAGE,
            "-summary-interval", a.interval,
        ])
    CONSOLE.print(Panel(
        f"replicas: {a.replicas}\n"
        f"cpus/container: {a.cpus}\n"
        f"interval: {a.interval}",
        title="snowflake started"
    ))
def stop(a):
    """Force-remove every managed proxy container."""
    for name in containers(a.replicas):
        docker(["rm", "-f", name])
    CONSOLE.print(Panel("all proxies stopped", title="snowflake"))
def stats(a):
    """Aggregate per-interval summaries from all containers and report impact.

    Reads `docker logs` for each managed replica (plus the legacy unnumbered
    container name), parses the periodic traffic summaries, and prints:
    an impact table, a top-intervals leaderboard, restart/error diagnostics,
    and effectiveness hints. Privacy-safe: uses only aggregate counters.

    Fix: `summaries_per_hour` was computed twice (identical expression);
    the redundant recomputation has been removed.
    """
    rows: List[SummaryRow] = []
    errors = Counter()
    restarts = 0
    seen_names = set()
    def ingest(name: str) -> None:
        # Fold one container's logs into the shared accumulators, at most once.
        nonlocal restarts
        if name in seen_names:
            return
        seen_names.add(name)
        log_lines = logs(name)
        if not log_lines:
            return
        rows.extend(parse(log_lines))
        for ln in log_lines:
            if "Proxy starting" in ln:
                restarts += 1
            if "ERROR" in ln:
                msg = ln.split("ERROR", 1)[1].strip()
                errors[msg or "ERROR"] += 1
    for c in containers(a.replicas):
        ingest(c)
    # Also pick up a legacy single container named just BASE, if present.
    ingest(BASE)
    if not rows:
        CONSOLE.print(Panel("No summaries yet. Check docker logs?", title="snowflake"))
        return
    rows.sort(key=lambda r: r.ts)
    now = datetime.now(timezone.utc)
    start = rows[0].ts
    # Floor of 0.01 h avoids divide-by-zero right after startup.
    uptime_hours = max((now - start).total_seconds()/3600, 0.01)
    coverage_seconds = sum(r.interval for r in rows)
    coverage_hours = coverage_seconds / 3600
    total_conns = sum(r.conns for r in rows)
    down_total = sum(r.down for r in rows)
    up_total = sum(r.up for r in rows)
    total_bytes = down_total + up_total
    per_interval = [r.total for r in rows]
    avg_interval_mib = (total_bytes / len(rows)) / 1024**2
    avg_throughput_kib = (total_bytes / coverage_seconds) / 1024 if coverage_seconds else 0
    summaries_per_hour = len(rows) / uptime_hours
    sessions_per_hour = total_conns / uptime_hours
    mib_per_session = (total_bytes/1024**2)/total_conns if total_conns else 0
    busiest = max(rows, key=lambda r: r.total)
    peak_conn = max(rows, key=lambda r: r.conns)
    quietest = min(rows, key=lambda r: r.total)
    throughput_row = max(rows, key=lambda r: (r.total / r.interval) if r.interval else 0)
    peak_kib_s = (throughput_row.total / throughput_row.interval / 1024) if throughput_row.interval else 0
    last_age_min = (now - rows[-1].ts).total_seconds() / 60
    t = Table(title="impact (since uptime)")
    t.add_column("metric")
    t.add_column("value", justify="right")
    t.add_row("uptime", f"{uptime_hours:.2f} h")
    t.add_row("reported coverage", f"{coverage_hours:.2f} h ({len(rows)} summaries)")
    t.add_row("sessions", str(total_conns))
    t.add_row("sessions/hour", f"{sessions_per_hour:.2f}")
    t.add_row("MiB relayed", f"{total_bytes/1024**2:.2f} (↓{down_total/1024**2:.2f} / ↑{up_total/1024**2:.2f})")
    t.add_row("downstream share", f"{(down_total/total_bytes*100) if total_bytes else 0:.1f}%")
    t.add_row("MiB/hour", f"{(total_bytes/1024**2)/uptime_hours:.2f}")
    t.add_row("MiB/session", f"{mib_per_session:.2f}")
    t.add_row("avg interval", f"{avg_interval_mib:.2f} MiB / {avg_throughput_kib:.2f} KiB/s")
    t.add_row("peak hour", f"{busiest.ts.strftime('%Y-%m-%d %H:%M')} ({busiest.total/1024**2:.2f} MiB)")
    t.add_row("peak throughput", f"{peak_kib_s:.2f} KiB/s @ {throughput_row.ts.strftime('%Y-%m-%d %H:%M')}")
    t.add_row("max sessions interval", f"{peak_conn.conns} @ {peak_conn.ts.strftime('%Y-%m-%d %H:%M')}")
    t.add_row("quietest hour", f"{quietest.total/1024**2:.2f} MiB @ {quietest.ts.strftime('%Y-%m-%d %H:%M')}")
    t.add_row("interval min", f"{min(per_interval)/1024**2:.2f} MiB")
    t.add_row("interval max", f"{max(per_interval)/1024**2:.2f} MiB")
    t.add_row("summaries/hour", f"{summaries_per_hour:.2f}")
    cpu_hours = uptime_hours * a.replicas * a.cpus
    if cpu_hours:
        t.add_row("MiB / CPU-hour", f"{(total_bytes/1024**2)/cpu_hours:.2f}")
    t.add_row("last summary age", f"{last_age_min:.1f} min")
    CONSOLE.print(t)
    leaderboard = Table(title="top intervals (by MiB)", show_footer=False)
    leaderboard.add_column("#", justify="right")
    leaderboard.add_column("time (UTC)")
    leaderboard.add_column("MiB", justify="right")
    leaderboard.add_column("sessions", justify="right")
    leaderboard.add_column("MiB/s", justify="right")
    top_intervals = sorted(rows, key=lambda r: r.total, reverse=True)[:3]
    for idx, row in enumerate(top_intervals, 1):
        kib_per_s = (row.total / row.interval / 1024) if row.interval else 0
        leaderboard.add_row(
            str(idx),
            row.ts.strftime("%Y-%m-%d %H:%M"),
            f"{row.total/1024**2:.2f}",
            str(row.conns),
            f"{kib_per_s:.2f}",
        )
    if top_intervals:
        CONSOLE.print(leaderboard)
    err_message = "none" if not errors else "\n".join(f"{count}× {msg}" for msg, count in errors.most_common(4))
    CONSOLE.print(Panel(
        f"Proxy restarts: {restarts}\nErrors:\n{err_message}",
        title="diagnostics",
    ))
    hints = []
    if total_conns/uptime_hours < a.replicas:
        hints.append("Low utilization: consider fewer replicas or longer runtime.")
    if last_age_min > 30:
        hints.append("Summaries stale: proxy may be idle or blocked.")
    if restarts > 1:
        hints.append("Multiple restarts observed; inspect host-level stability.")
    if a.replicas > (os.cpu_count() or 1) * 2:
        hints.append("Replica count likely exceeds CPU capacity.")
    CONSOLE.print(Panel(
        "\n".join(hints or ["Proxy appears healthy and utilized."]),
        title="effectiveness hints"
    ))
def build():
    """Construct the CLI parser with start/stop/stats subcommands."""
    parser = argparse.ArgumentParser()
    sub = parser.add_subparsers(dest="cmd")

    cmd_start = sub.add_parser("start")
    cmd_start.add_argument("--replicas", type=int, default=2)
    cmd_start.add_argument("--cpus", type=float, default=1.0)
    cmd_start.add_argument("--interval", default=DEFAULT_INTERVAL)
    cmd_start.set_defaults(func=start)

    cmd_stop = sub.add_parser("stop")
    cmd_stop.add_argument("--replicas", type=int, default=2)
    cmd_stop.set_defaults(func=stop)

    cmd_stats = sub.add_parser("stats")
    cmd_stats.add_argument("--replicas", type=int, default=2)
    cmd_stats.add_argument("--cpus", type=float, default=1.0)
    cmd_stats.set_defaults(func=stats)

    return parser
def main():
    """Entry point: require root, parse the CLI, dispatch to the subcommand.

    When invoked with no subcommand, behaves like `stats` with its default
    options. Fix: previously the bare-invocation path set only `cmd` and
    `func`, but since no subparser had run, the Namespace lacked the
    `replicas`/`cpus` attributes that stats() reads, crashing with
    AttributeError. The stats defaults are now filled in explicitly.
    """
    must_root()
    a = build().parse_args()
    if not a.cmd:
        a.cmd = "stats"
        a.func = stats
        # Mirror the `stats` subparser defaults, which never ran.
        if not hasattr(a, "replicas"):
            a.replicas = 2
        if not hasattr(a, "cpus"):
            a.cpus = 1.0
    a.func(a)
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment