@mvdbeek
Created February 13, 2026 19:40
ENA/Galaxy stress test: fetch random small accessions, upload via ascp, run cat1
#!/usr/bin/env python3
"""
Fetch 100 random small (<500 MB) SRR/ERR accessions from ENA and write their
FTP and ascp links to files.
"""
import csv
import io
import random
import sys
import urllib.error
import urllib.parse
import urllib.request

ENA_SEARCH_URL = "https://www.ebi.ac.uk/ena/portal/api/search"
FTP_OUTPUT_FILE = "ftp_links.txt"
ASCP_OUTPUT_FILE = "ascp_links.txt"
TARGET_COUNT = 100
MAX_BYTES = 500_000_000  # 500 MB
MIN_BYTES = 1_000_000  # 1 MB (skip tiny files)


def query_ena(base_lo, base_hi, limit=1000):
    """Query ENA for read runs within a base_count range; return rows with FTP links under MAX_BYTES."""
    params = {
        "result": "read_run",
        "query": f"base_count>={base_lo} AND base_count<={base_hi}",
        "fields": "run_accession,fastq_ftp,fastq_bytes",
        "format": "tsv",
        "limit": limit,
    }
    url = f"{ENA_SEARCH_URL}?{urllib.parse.urlencode(params)}"
    print(f"Querying ENA (base_count {base_lo/1e6:.0f}M-{base_hi/1e6:.0f}M, limit={limit})...")
    req = urllib.request.Request(url)
    req.add_header("Accept", "text/plain")
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            data = resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")
        print(f" HTTP {e.code}: {body[:500]}", file=sys.stderr)
        raise
    reader = csv.DictReader(io.StringIO(data), delimiter="\t")
    rows = []
    for row in reader:
        if not row.get("fastq_ftp"):
            continue
        try:
            # Runs with multiple fastq files report ';'-separated byte counts;
            # those rows fail this int() parse and are skipped.
            fbytes = int(row.get("fastq_bytes", 0))
        except (ValueError, TypeError):
            continue
        acc = row.get("run_accession", "")
        if not (acc.startswith("SRR") or acc.startswith("ERR")):
            continue
        if MIN_BYTES <= fbytes <= MAX_BYTES:
            rows.append(row)
    print(f" Got {len(rows)} rows with FTP links within size range")
    return rows


def main():
    # Query multiple base_count ranges to get a diverse pool.
    # At roughly 0.5 bytes per base after compression, 500 MB corresponds to ~1 billion bases.
    ranges = [
        (1_000_000, 50_000_000),  # 1M - 50M bases
        (50_000_000, 200_000_000),  # 50M - 200M bases
        (200_000_000, 500_000_000),  # 200M - 500M bases
        (500_000_000, 1_000_000_000),  # 500M - 1B bases
    ]
    pool = []
    for lo, hi in ranges:
        pool += query_ena(lo, hi, limit=2000)
    print(f"\nTotal pool: {len(pool)} accessions")
    if len(pool) < TARGET_COUNT:
        print(f"Warning: only found {len(pool)} accessions (wanted {TARGET_COUNT})")
    selected = random.sample(pool, min(TARGET_COUNT, len(pool)))
    ftp_lines = []
    for row in selected:
        # fastq_ftp can contain multiple files separated by ';'
        for ftp_path in row["fastq_ftp"].split(";"):
            ftp_path = ftp_path.strip()
            if ftp_path:
                ftp_lines.append(f"ftp://{ftp_path}")
    with open(FTP_OUTPUT_FILE, "w") as f:
        for link in ftp_lines:
            f.write(link + "\n")
    # Generate ascp links: ftp://ftp.sra.ebi.ac.uk/... -> ascp://fasp.sra.ebi.ac.uk/...
    ascp_lines = [link.replace("ftp://ftp.", "ascp://fasp.") for link in ftp_lines]
    with open(ASCP_OUTPUT_FILE, "w") as f:
        for link in ascp_lines:
            f.write(link + "\n")
    # Print summary
    accession_types = {"SRR": 0, "ERR": 0, "DRR": 0, "other": 0}
    total_bytes = 0
    for row in selected:
        acc = row["run_accession"]
        total_bytes += int(row["fastq_bytes"])
        matched = False
        for prefix in ("SRR", "ERR", "DRR"):
            if acc.startswith(prefix):
                accession_types[prefix] += 1
                matched = True
                break
        if not matched:
            accession_types["other"] += 1
    print(f"\nSelected {len(selected)} accessions:")
    for k, v in accession_types.items():
        if v > 0:
            print(f" {k}: {v}")
    print(f" Total size: {total_bytes / 1e9:.1f} GB")
    print(f" Total FTP links: {len(ftp_lines)}")
    print(f" Written to: {FTP_OUTPUT_FILE}")
    print(f" Written to: {ASCP_OUTPUT_FILE}")


if __name__ == "__main__":
    main()
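Before pointing a Galaxy instance at a fresh batch of links, it can be worth confirming that the generated ENA paths actually resolve. The snippet below is not part of the gist; it is a minimal sanity-check sketch that reads the first few bytes of the first link in ftp_links.txt (urllib.request handles ftp:// URLs natively).

import urllib.request

# Read the first generated FTP link (assumes ftp_links.txt from the script above).
with open("ftp_links.txt") as f:
    first_link = f.readline().strip()

# Pull only the first 16 bytes to confirm the path resolves.
with urllib.request.urlopen(first_link, timeout=60) as resp:
    chunk = resp.read(16)

# A .fastq.gz file should start with the gzip magic bytes 1f 8b.
print(first_link, "->", chunk[:2].hex(), "(expect 1f8b for .fastq.gz)")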
#!/usr/bin/env python3
"""
Stress-test loop: every hour, generate fresh ascp links from ENA,
upload them as a collection to Galaxy, and run cat1 on the collection.
"""
import argparse
import csv
import io
import json
import random
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime

# --- ENA config ---
ENA_SEARCH_URL = "https://www.ebi.ac.uk/ena/portal/api/search"
TARGET_COUNT = 100
MAX_BYTES = 500_000_000
MIN_BYTES = 1_000_000
BASE_COUNT_RANGES = [
    (1_000_000, 50_000_000),
    (50_000_000, 200_000_000),
    (200_000_000, 500_000_000),
    (500_000_000, 1_000_000_000),
]
INTERVAL_SECONDS = 3600  # 1 hour


# ---- ENA helpers (from fetch_small_accessions.py) ----
def query_ena(base_lo, base_hi, limit=2000):
    params = {
        "result": "read_run",
        "query": f"base_count>={base_lo} AND base_count<={base_hi}",
        "fields": "run_accession,fastq_ftp,fastq_bytes",
        "format": "tsv",
        "limit": limit,
    }
    url = f"{ENA_SEARCH_URL}?{urllib.parse.urlencode(params)}"
    print(f" ENA query (base_count {base_lo/1e6:.0f}M-{base_hi/1e6:.0f}M, limit={limit})...")
    req = urllib.request.Request(url)
    req.add_header("Accept", "text/plain")
    with urllib.request.urlopen(req, timeout=120) as resp:
        data = resp.read().decode("utf-8")
    reader = csv.DictReader(io.StringIO(data), delimiter="\t")
    rows = []
    for row in reader:
        if not row.get("fastq_ftp"):
            continue
        try:
            fbytes = int(row.get("fastq_bytes", 0))
        except (ValueError, TypeError):
            continue
        acc = row.get("run_accession", "")
        if not (acc.startswith("SRR") or acc.startswith("ERR")):
            continue
        if MIN_BYTES <= fbytes <= MAX_BYTES:
            rows.append(row)
    print(f" {len(rows)} rows")
    return rows


def generate_ascp_links():
    """Query ENA and return a list of ascp URLs."""
    pool = []
    for lo, hi in BASE_COUNT_RANGES:
        pool += query_ena(lo, hi)
    if not pool:
        return []
    selected = random.sample(pool, min(TARGET_COUNT, len(pool)))
    ascp_urls = []
    total_bytes = 0
    for row in selected:
        total_bytes += int(row["fastq_bytes"])
        for ftp_path in row["fastq_ftp"].split(";"):
            ftp_path = ftp_path.strip()
            if ftp_path:
                ascp_urls.append(f"ftp://{ftp_path}".replace("ftp://ftp.", "ascp://fasp."))
    print(f" Selected {len(selected)} accessions, {len(ascp_urls)} links, {total_bytes/1e9:.1f} GB total")
    return ascp_urls


# ---- Galaxy helpers (from submit_to_galaxy.py) ----
def galaxy_post(url, payload, auth_headers):
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=data, method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json, text/plain, */*")
    for k, v in auth_headers.items():
        req.add_header(k, v)
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")
        print(f" Galaxy HTTP {e.code} from {url}:", file=sys.stderr)
        print(f" {body[:2000]}", file=sys.stderr)
        raise


def submit_and_run(base_url, history_id, auth_headers, ascp_urls):
    """Upload ascp URLs as a deferred collection, then run cat1 on it."""
    # Step 1: fetch/upload
    elements = []
    for url in ascp_urls:
        # Extract the run ID from the URL, e.g. ERR021360 from .../ERR021360.fastq.gz
        name = url.rsplit("/", 1)[-1].split(".")[0]
        elements.append({
            "dbkey": "?",
            "ext": "fastqsanger.gz",
            "name": name,
            "space_to_tab": False,
            "to_posix_lines": False,
            "auto_decompress": False,
            "deferred": True,
            "src": "url",
            "url": url,
        })
    fetch_payload = {
        "history_id": history_id,
        "targets": [{
            "auto_decompress": False,
            "destination": {"type": "hdca"},
            "collection_type": "list",
            "name": "ascp collection",
            "elements": elements,
        }],
        "auto_decompress": True,
    }
    print(f" Uploading {len(ascp_urls)} URLs to Galaxy...")
    fetch_result = galaxy_post(f"{base_url}/api/tools/fetch", fetch_payload, auth_headers)
    hdca_id = fetch_result["output_collections"][0]["id"]
    print(f" Collection created (hdca_id: {hdca_id})")
    # Step 2: run cat1
    cat1_payload = {
        "history_id": history_id,
        "tool_id": "cat1",
        "tool_version": "1.0.0",
        "__tags": [],
        "inputs": {
            "input1": {
                "batch": True,
                "product": False,
                "values": [{"id": hdca_id, "src": "hdca", "map_over_type": None}],
            }
        },
    }
    print(" Running cat1 on collection (retrying until populated)...")
    tools_url = f"{base_url}/api/tools"
    while True:
        try:
            cat1_result = galaxy_post(tools_url, cat1_payload, auth_headers)
            break
        except urllib.error.HTTPError as e:
            if e.code == 400:
                print(" Collection not yet populated, retrying in 5s...")
                time.sleep(5)
            else:
                raise
    jobs = cat1_result.get("jobs", [])
    print(f" Submitted {len(jobs)} job(s)")
    for job in jobs:
        print(f" Job {job['id']}: state={job['state']}")
    return hdca_id


# ---- Main loop ----
def main():
    parser = argparse.ArgumentParser(
        description="Every hour: generate ascp links, upload to Galaxy, run cat1"
    )
    parser.add_argument(
        "--galaxy-url",
        default="https://test.galaxyproject.org",
    )
    parser.add_argument("--history-id", required=True)
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--api-key")
    group.add_argument("--galaxy-session")
    parser.add_argument(
        "--interval",
        type=int,
        default=INTERVAL_SECONDS,
        help=f"Seconds between iterations (default: {INTERVAL_SECONDS})",
    )
    args = parser.parse_args()
    base_url = args.galaxy_url.rstrip("/")
    if args.api_key:
        auth_headers = {"x-api-key": args.api_key}
    else:
        auth_headers = {"Cookie": f"galaxysession={args.galaxy_session}"}
    iteration = 0
    while True:
        iteration += 1
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n{'='*60}")
        print(f"Iteration {iteration} at {now}")
        print(f"{'='*60}")
        try:
            print("Step 1: Generating ascp links from ENA...")
            ascp_urls = generate_ascp_links()
            if not ascp_urls:
                print(" No URLs generated, skipping this iteration")
            else:
                print("Step 2: Submitting to Galaxy and running cat1...")
                submit_and_run(base_url, args.history_id, auth_headers, ascp_urls)
                print("Done.")
        except Exception as e:
            print(f"ERROR: {e}", file=sys.stderr)
        print(f"\nSleeping {args.interval}s until next iteration...")
        time.sleep(args.interval)


if __name__ == "__main__":
    main()
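The retry loop in submit_and_run simply resubmits cat1 on every HTTP 400 until the collection is ready. An alternative is to poll the collection itself and only submit once Galaxy reports it as populated. The following is a minimal sketch, assuming the instance exposes the standard history-contents dataset-collection endpoint and its populated_state field (worth verifying against the target Galaxy version); it reuses the auth_headers dict built in main().

import json
import time
import urllib.request

def wait_until_populated(base_url, history_id, hdca_id, auth_headers, poll=5):
    """Poll the collection until Galaxy reports populated_state == 'ok' (sketch)."""
    url = f"{base_url}/api/histories/{history_id}/contents/dataset_collections/{hdca_id}"
    while True:
        req = urllib.request.Request(url)
        for k, v in auth_headers.items():
            req.add_header(k, v)
        with urllib.request.urlopen(req, timeout=60) as resp:
            info = json.loads(resp.read().decode("utf-8"))
        state = info.get("populated_state")
        if state == "ok":
            return info
        if state == "failed":
            raise RuntimeError(f"collection {hdca_id} failed to populate")
        time.sleep(poll)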
#!/usr/bin/env python3
"""
Read ascp_links.txt and POST the links to a Galaxy instance's fetch API,
mirroring the request structure in curl.sh, then run cat1 on the resulting collection.
"""
import argparse
import json
import sys
import urllib.error
import urllib.request

ASCP_FILE = "ascp_links.txt"


def build_payload(history_id, urls):
    """Build the Galaxy fetch tool JSON payload."""
    elements = []
    for url in urls:
        name = url.rsplit("/", 1)[-1].split(".")[0]
        elements.append({
            "dbkey": "?",
            "ext": "fastqsanger.gz",
            "name": name,
            "space_to_tab": False,
            "to_posix_lines": False,
            "auto_decompress": False,
            "deferred": True,
            "src": "url",
            "url": url,
        })
    return {
        "history_id": history_id,
        "targets": [
            {
                "auto_decompress": False,
                "destination": {"type": "hdca"},
                "collection_type": "list",
                "name": "ascp collection",
                "elements": elements,
            }
        ],
        "auto_decompress": True,
    }


def galaxy_request(url, payload, args):
    """Make an authenticated POST to a Galaxy API endpoint and return parsed JSON."""
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=data, method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json, text/plain, */*")
    if args.api_key:
        req.add_header("x-api-key", args.api_key)
    else:
        req.add_header("Cookie", f"galaxysession={args.galaxy_session}")
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            body = resp.read().decode("utf-8")
            result = json.loads(body)
            return result
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")
        print(f"HTTP {e.code}:", file=sys.stderr)
        try:
            print(json.dumps(json.loads(body), indent=2), file=sys.stderr)
        except json.JSONDecodeError:
            print(body[:2000], file=sys.stderr)
        sys.exit(1)


def build_cat1_payload(history_id, hdca_id):
    """Build the cat1 tool payload to run on a dataset collection."""
    return {
        "history_id": history_id,
        "tool_id": "cat1",
        "tool_version": "1.0.0",
        "__tags": [],
        "inputs": {
            "input1": {
                "batch": True,
                "product": False,
                "values": [
                    {"id": hdca_id, "src": "hdca", "map_over_type": None}
                ],
            }
        },
    }


def main():
    parser = argparse.ArgumentParser(
        description="Submit ascp links to a Galaxy fetch API, then run cat1 on the collection"
    )
    parser.add_argument(
        "--galaxy-url",
        default="https://test.galaxyproject.org",
        help="Galaxy instance URL (default: https://test.galaxyproject.org)",
    )
    parser.add_argument(
        "--history-id",
        required=True,
        help="Galaxy history ID to upload into",
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "--api-key",
        help="Galaxy API key for authentication",
    )
    group.add_argument(
        "--galaxy-session",
        help="Galaxy session cookie value",
    )
    parser.add_argument(
        "--ascp-file",
        default=ASCP_FILE,
        help=f"File containing ascp links, one per line (default: {ASCP_FILE})",
    )
    args = parser.parse_args()
    with open(args.ascp_file) as f:
        urls = [line.strip() for line in f if line.strip()]
    if not urls:
        print("No URLs found in", args.ascp_file, file=sys.stderr)
        sys.exit(1)
    print(f"Read {len(urls)} URLs from {args.ascp_file}")
    base_url = args.galaxy_url.rstrip("/")
    # Step 1: Fetch/upload the ascp links as a collection
    fetch_url = f"{base_url}/api/tools/fetch"
    fetch_payload = build_payload(args.history_id, urls)
    print(f"Step 1: POSTing {len(urls)} URLs to {fetch_url} ...")
    fetch_result = galaxy_request(fetch_url, fetch_payload, args)
    hdca_id = fetch_result["output_collections"][0]["id"]
    collection_name = fetch_result["output_collections"][0].get("name", "")
    print(f" Created collection '{collection_name}' (hdca_id: {hdca_id})")
    # Step 2: Run cat1 tool on the collection
    tools_url = f"{base_url}/api/tools"
    cat1_payload = build_cat1_payload(args.history_id, hdca_id)
    print(f"Step 2: Running cat1 on collection {hdca_id} ...")
    cat1_result = galaxy_request(tools_url, cat1_payload, args)
    jobs = cat1_result.get("jobs", [])
    print(f" Submitted {len(jobs)} job(s)")
    for job in jobs:
        print(f" Job {job['id']}: state={job['state']}")
    print(json.dumps(cat1_result, indent=2))


if __name__ == "__main__":
    main()
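Neither submission path waits for the cat1 jobs to finish; the scripts only print the initial job states. If a wrapper should block until the jobs complete, a small polling helper along these lines could be added. This is a sketch assuming the standard /api/jobs/{id} endpoint and the same header-based authentication used above; it is not part of the gist.

import json
import time
import urllib.request

def wait_for_jobs(base_url, job_ids, auth_headers, poll=10):
    """Poll /api/jobs/{id} until every job reaches a terminal state (sketch)."""
    # Treat paused as terminal too: paused jobs do not progress without intervention.
    terminal = {"ok", "error", "deleted", "paused"}
    pending = set(job_ids)
    while pending:
        for job_id in list(pending):
            req = urllib.request.Request(f"{base_url}/api/jobs/{job_id}")
            for k, v in auth_headers.items():
                req.add_header(k, v)
            with urllib.request.urlopen(req, timeout=60) as resp:
                state = json.loads(resp.read().decode("utf-8")).get("state")
            if state in terminal:
                print(f" Job {job_id}: {state}")
                pending.discard(job_id)
        if pending:
            time.sleep(poll)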