Created February 13, 2026 19:40
ENA/Galaxy stress test: fetch random small accessions, upload via ascp, run cat1
#!/usr/bin/env python3
"""
Fetch 100 random small (<500MB) SRR/ERR accessions from ENA and write their FTP links to a file.
"""
import csv
import io
import random
import sys
import urllib.error
import urllib.parse
import urllib.request

ENA_SEARCH_URL = "https://www.ebi.ac.uk/ena/portal/api/search"
FTP_OUTPUT_FILE = "ftp_links.txt"
ASCP_OUTPUT_FILE = "ascp_links.txt"
TARGET_COUNT = 100
MAX_BYTES = 500_000_000  # 500 MB
MIN_BYTES = 1_000_000  # 1 MB (skip tiny files)


def query_ena(base_lo, base_hi, limit=1000):
    """Query ENA for read runs within a base_count range, return rows with FTP links under MAX_BYTES."""
    params = {
        "result": "read_run",
        "query": f"base_count>={base_lo} AND base_count<={base_hi}",
        "fields": "run_accession,fastq_ftp,fastq_bytes",
        "format": "tsv",
        "limit": limit,
    }
    url = f"{ENA_SEARCH_URL}?{urllib.parse.urlencode(params)}"
    print(f"Querying ENA (base_count {base_lo/1e6:.0f}M-{base_hi/1e6:.0f}M, limit={limit})...")
    req = urllib.request.Request(url)
    req.add_header("Accept", "text/plain")
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            data = resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")
        print(f" HTTP {e.code}: {body[:500]}", file=sys.stderr)
        raise
    reader = csv.DictReader(io.StringIO(data), delimiter="\t")
    rows = []
    for row in reader:
        if not row.get("fastq_ftp"):
            continue
        try:
            # fastq_bytes is ';'-separated for multi-file (paired) runs; those fail int() and are skipped.
            fbytes = int(row.get("fastq_bytes", 0))
        except (ValueError, TypeError):
            continue
        acc = row.get("run_accession", "")
        if not (acc.startswith("SRR") or acc.startswith("ERR")):
            continue
        if MIN_BYTES <= fbytes <= MAX_BYTES:
            rows.append(row)
    print(f" Got {len(rows)} rows with FTP links within size range")
    return rows


def main():
    # Query multiple base_count ranges to get a diverse pool
    # ~1 base ≈ 0.5 bytes compressed, so 500MB ≈ ~1 billion bases
    ranges = [
        (1_000_000, 50_000_000),       # 1M - 50M bases
        (50_000_000, 200_000_000),     # 50M - 200M bases
        (200_000_000, 500_000_000),    # 200M - 500M bases
        (500_000_000, 1_000_000_000),  # 500M - 1B bases
    ]
    pool = []
    for lo, hi in ranges:
        pool += query_ena(lo, hi, limit=2000)
    print(f"\nTotal pool: {len(pool)} accessions")
    if len(pool) < TARGET_COUNT:
        print(f"Warning: only found {len(pool)} accessions (wanted {TARGET_COUNT})")
    selected = random.sample(pool, min(TARGET_COUNT, len(pool)))
    ftp_lines = []
    for row in selected:
        # fastq_ftp can contain multiple files separated by ';'
        for ftp_path in row["fastq_ftp"].split(";"):
            ftp_path = ftp_path.strip()
            if ftp_path:
                ftp_lines.append(f"ftp://{ftp_path}")
    with open(FTP_OUTPUT_FILE, "w") as f:
        for link in ftp_lines:
            f.write(link + "\n")
    # Generate ascp links: ftp://ftp.sra.ebi.ac.uk/... -> ascp://fasp.sra.ebi.ac.uk/...
    ascp_lines = [link.replace("ftp://ftp.", "ascp://fasp.") for link in ftp_lines]
    with open(ASCP_OUTPUT_FILE, "w") as f:
        for link in ascp_lines:
            f.write(link + "\n")
    # Print summary
    accession_types = {"SRR": 0, "ERR": 0, "DRR": 0, "other": 0}
    total_bytes = 0
    for row in selected:
        acc = row["run_accession"]
        total_bytes += int(row["fastq_bytes"])
        matched = False
        for prefix in ("SRR", "ERR", "DRR"):
            if acc.startswith(prefix):
                accession_types[prefix] += 1
                matched = True
                break
        if not matched:
            accession_types["other"] += 1
    print(f"\nSelected {len(selected)} accessions:")
    for k, v in accession_types.items():
        if v > 0:
            print(f" {k}: {v}")
    print(f" Total size: {total_bytes / 1e9:.1f} GB")
    print(f" Total FTP links: {len(ftp_lines)}")
    print(f" Written to: {FTP_OUTPUT_FILE}")
    print(f" Written to: {ASCP_OUTPUT_FILE}")


if __name__ == "__main__":
    main()
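The two output files differ only by the host rewrite in the list comprehension above. As a minimal illustration of that mapping (the directory layout shown follows the usual ENA FASTQ layout, but the exact path is illustrative rather than taken from a real query):

# Illustrative fastq_ftp value as returned by the ENA portal API (note: no scheme prefix).
sample_ftp_path = "ftp.sra.ebi.ac.uk/vol1/fastq/ERR021/ERR021360/ERR021360.fastq.gz"

ftp_url = f"ftp://{sample_ftp_path}"
ascp_url = ftp_url.replace("ftp://ftp.", "ascp://fasp.")

print(ftp_url)   # ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR021/ERR021360/ERR021360.fastq.gz
print(ascp_url)  # ascp://fasp.sra.ebi.ac.uk/vol1/fastq/ERR021/ERR021360/ERR021360.fastq.gz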
#!/usr/bin/env python3
"""
Stress-test loop: every hour, generate fresh ascp links from ENA,
upload them as a collection to Galaxy, and run cat1 on the collection.
"""
import argparse
import csv
import io
import json
import random
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime

# --- ENA config ---
ENA_SEARCH_URL = "https://www.ebi.ac.uk/ena/portal/api/search"
TARGET_COUNT = 100
MAX_BYTES = 500_000_000
MIN_BYTES = 1_000_000
BASE_COUNT_RANGES = [
    (1_000_000, 50_000_000),
    (50_000_000, 200_000_000),
    (200_000_000, 500_000_000),
    (500_000_000, 1_000_000_000),
]
INTERVAL_SECONDS = 3600  # 1 hour


# ---- ENA helpers (from fetch_small_accessions.py) ----
def query_ena(base_lo, base_hi, limit=2000):
    params = {
        "result": "read_run",
        "query": f"base_count>={base_lo} AND base_count<={base_hi}",
        "fields": "run_accession,fastq_ftp,fastq_bytes",
        "format": "tsv",
        "limit": limit,
    }
    url = f"{ENA_SEARCH_URL}?{urllib.parse.urlencode(params)}"
    print(f"  ENA query (base_count {base_lo/1e6:.0f}M-{base_hi/1e6:.0f}M, limit={limit})...")
    req = urllib.request.Request(url)
    req.add_header("Accept", "text/plain")
    with urllib.request.urlopen(req, timeout=120) as resp:
        data = resp.read().decode("utf-8")
    reader = csv.DictReader(io.StringIO(data), delimiter="\t")
    rows = []
    for row in reader:
        if not row.get("fastq_ftp"):
            continue
        try:
            fbytes = int(row.get("fastq_bytes", 0))
        except (ValueError, TypeError):
            continue
        acc = row.get("run_accession", "")
        if not (acc.startswith("SRR") or acc.startswith("ERR")):
            continue
        if MIN_BYTES <= fbytes <= MAX_BYTES:
            rows.append(row)
    print(f"  {len(rows)} rows")
    return rows


def generate_ascp_links():
    """Query ENA and return a list of ascp URLs."""
    pool = []
    for lo, hi in BASE_COUNT_RANGES:
        pool += query_ena(lo, hi)
    if not pool:
        return []
    selected = random.sample(pool, min(TARGET_COUNT, len(pool)))
    ascp_urls = []
    total_bytes = 0
    for row in selected:
        total_bytes += int(row["fastq_bytes"])
        for ftp_path in row["fastq_ftp"].split(";"):
            ftp_path = ftp_path.strip()
            if ftp_path:
                ascp_urls.append(f"ftp://{ftp_path}".replace("ftp://ftp.", "ascp://fasp."))
    print(f"  Selected {len(selected)} accessions, {len(ascp_urls)} links, {total_bytes/1e9:.1f} GB total")
    return ascp_urls


# ---- Galaxy helpers (from submit_to_galaxy.py) ----
def galaxy_post(url, payload, auth_headers):
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=data, method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json, text/plain, */*")
    for k, v in auth_headers.items():
        req.add_header(k, v)
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")
        print(f"  Galaxy HTTP {e.code} from {url}:", file=sys.stderr)
        print(f"  {body[:2000]}", file=sys.stderr)
        raise


def submit_and_run(base_url, history_id, auth_headers, ascp_urls):
    """Upload ascp URLs as a collection, then run cat1 on it."""
    # Step 1: fetch/upload
    elements = []
    for url in ascp_urls:
        # Extract run ID from URL, e.g. ERR021360 from .../ERR021360.fastq.gz
        name = url.rsplit("/", 1)[-1].split(".")[0]
        elements.append({
            "dbkey": "?",
            "ext": "fastqsanger.gz",
            "name": name,
            "space_to_tab": False,
            "to_posix_lines": False,
            "auto_decompress": False,
            "deferred": True,
            "src": "url",
            "url": url,
        })
    fetch_payload = {
        "history_id": history_id,
        "targets": [{
            "auto_decompress": False,
            "destination": {"type": "hdca"},
            "collection_type": "list",
            "name": "ascp collection",
            "elements": elements,
        }],
        "auto_decompress": True,
    }
    print(f"  Uploading {len(ascp_urls)} URLs to Galaxy...")
    fetch_result = galaxy_post(f"{base_url}/api/tools/fetch", fetch_payload, auth_headers)
    hdca_id = fetch_result["output_collections"][0]["id"]
    print(f"  Collection created (hdca_id: {hdca_id})")
    # Step 2: run cat1
    cat1_payload = {
        "history_id": history_id,
        "tool_id": "cat1",
        "tool_version": "1.0.0",
        "__tags": [],
        "inputs": {
            "input1": {
                "batch": True,
                "product": False,
                "values": [{"id": hdca_id, "src": "hdca", "map_over_type": None}],
            }
        },
    }
    print("  Running cat1 on collection (retrying until populated)...")
    tools_url = f"{base_url}/api/tools"
    while True:
        try:
            cat1_result = galaxy_post(tools_url, cat1_payload, auth_headers)
            break
        except urllib.error.HTTPError as e:
            if e.code == 400:
                print("  Collection not yet populated, retrying in 5s...")
                time.sleep(5)
            else:
                raise
    jobs = cat1_result.get("jobs", [])
    print(f"  Submitted {len(jobs)} job(s)")
    for job in jobs:
        print(f"    Job {job['id']}: state={job['state']}")
    return hdca_id


# ---- Main loop ----
def main():
    parser = argparse.ArgumentParser(
        description="Every hour: generate ascp links, upload to Galaxy, run cat1"
    )
    parser.add_argument(
        "--galaxy-url",
        default="https://test.galaxyproject.org",
    )
    parser.add_argument("--history-id", required=True)
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--api-key")
    group.add_argument("--galaxy-session")
    parser.add_argument(
        "--interval",
        type=int,
        default=INTERVAL_SECONDS,
        help=f"Seconds between iterations (default: {INTERVAL_SECONDS})",
    )
    args = parser.parse_args()
    base_url = args.galaxy_url.rstrip("/")
    if args.api_key:
        auth_headers = {"x-api-key": args.api_key}
    else:
        auth_headers = {"Cookie": f"galaxysession={args.galaxy_session}"}
    iteration = 0
    while True:
        iteration += 1
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n{'='*60}")
        print(f"Iteration {iteration} at {now}")
        print(f"{'='*60}")
        try:
            print("Step 1: Generating ascp links from ENA...")
            ascp_urls = generate_ascp_links()
            if not ascp_urls:
                print("  No URLs generated, skipping this iteration")
            else:
                print("Step 2: Submitting to Galaxy and running cat1...")
                submit_and_run(base_url, args.history_id, auth_headers, ascp_urls)
                print("Done.")
        except Exception as e:
            print(f"ERROR: {e}", file=sys.stderr)
        print(f"\nSleeping {args.interval}s until next iteration...")
        time.sleep(args.interval)


if __name__ == "__main__":
    main()
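The loop above submits cat1 and immediately moves on; it never checks whether the jobs finish. If you want each iteration to also report on the previous batch, a small poller along these lines could be added. This is a sketch only: it assumes Galaxy's GET /api/jobs/{job_id} route returns JSON with a "state" field, and the poll_job_states helper name is mine, not part of the gist.

import json
import time
import urllib.request

def poll_job_states(base_url, job_ids, auth_headers, timeout=600, interval=15):
    """Poll job states until every job reaches a terminal-looking state or the timeout expires.

    Sketch only: assumes GET {base_url}/api/jobs/{job_id} returns JSON with a 'state' field.
    """
    terminal = {"ok", "error", "deleted"}  # assumed terminal states
    deadline = time.time() + timeout
    states = {}
    while time.time() < deadline:
        for job_id in job_ids:
            req = urllib.request.Request(f"{base_url}/api/jobs/{job_id}")
            for k, v in auth_headers.items():
                req.add_header(k, v)
            with urllib.request.urlopen(req, timeout=60) as resp:
                states[job_id] = json.loads(resp.read().decode("utf-8")).get("state")
        if all(s in terminal for s in states.values()):
            break
        time.sleep(interval)
    return states

To use it, submit_and_run() would also need to return the job IDs it prints (for example, [job["id"] for job in jobs]); the same auth_headers dict built in main() can be passed through unchanged.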
#!/usr/bin/env python3
"""
Read ascp_links.txt and POST them to a Galaxy instance's fetch API,
mirroring the request structure in curl.sh.
"""
import argparse
import json
import sys
import urllib.error
import urllib.request

ASCP_FILE = "ascp_links.txt"


def build_payload(history_id, urls):
    """Build the Galaxy fetch tool JSON payload."""
    elements = []
    for url in urls:
        name = url.rsplit("/", 1)[-1].split(".")[0]
        elements.append({
            "dbkey": "?",
            "ext": "fastqsanger.gz",
            "name": name,
            "space_to_tab": False,
            "to_posix_lines": False,
            "auto_decompress": False,
            "deferred": True,
            "src": "url",
            "url": url,
        })
    return {
        "history_id": history_id,
        "targets": [
            {
                "auto_decompress": False,
                "destination": {"type": "hdca"},
                "collection_type": "list",
                "name": "ascp collection",
                "elements": elements,
            }
        ],
        "auto_decompress": True,
    }


def galaxy_request(url, payload, args):
    """Make an authenticated POST to a Galaxy API endpoint and return parsed JSON."""
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=data, method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json, text/plain, */*")
    if args.api_key:
        req.add_header("x-api-key", args.api_key)
    else:
        req.add_header("Cookie", f"galaxysession={args.galaxy_session}")
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            body = resp.read().decode("utf-8")
            result = json.loads(body)
            return result
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")
        print(f"HTTP {e.code}:", file=sys.stderr)
        try:
            print(json.dumps(json.loads(body), indent=2), file=sys.stderr)
        except json.JSONDecodeError:
            print(body[:2000], file=sys.stderr)
        sys.exit(1)


def build_cat1_payload(history_id, hdca_id):
    """Build the cat1 tool payload to run on a dataset collection."""
    return {
        "history_id": history_id,
        "tool_id": "cat1",
        "tool_version": "1.0.0",
        "__tags": [],
        "inputs": {
            "input1": {
                "batch": True,
                "product": False,
                "values": [
                    {"id": hdca_id, "src": "hdca", "map_over_type": None}
                ],
            }
        },
    }


def main():
    parser = argparse.ArgumentParser(
        description="Submit ascp links to a Galaxy fetch API, then run cat1 on the collection"
    )
    parser.add_argument(
        "--galaxy-url",
        default="https://test.galaxyproject.org",
        help="Galaxy instance URL (default: https://test.galaxyproject.org)",
    )
    parser.add_argument(
        "--history-id",
        required=True,
        help="Galaxy history ID to upload into",
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "--api-key",
        help="Galaxy API key for authentication",
    )
    group.add_argument(
        "--galaxy-session",
        help="Galaxy session cookie value",
    )
    parser.add_argument(
        "--ascp-file",
        default=ASCP_FILE,
        help=f"File containing ascp links, one per line (default: {ASCP_FILE})",
    )
    args = parser.parse_args()
    with open(args.ascp_file) as f:
        urls = [line.strip() for line in f if line.strip()]
    if not urls:
        print("No URLs found in", args.ascp_file, file=sys.stderr)
        sys.exit(1)
    print(f"Read {len(urls)} URLs from {args.ascp_file}")
    base_url = args.galaxy_url.rstrip("/")
    # Step 1: Fetch/upload the ascp links as a collection
    fetch_url = f"{base_url}/api/tools/fetch"
    fetch_payload = build_payload(args.history_id, urls)
    print(f"Step 1: POSTing {len(urls)} URLs to {fetch_url} ...")
    fetch_result = galaxy_request(fetch_url, fetch_payload, args)
    hdca_id = fetch_result["output_collections"][0]["id"]
    collection_name = fetch_result["output_collections"][0].get("name", "")
    print(f"  Created collection '{collection_name}' (hdca_id: {hdca_id})")
    # Step 2: Run cat1 tool on the collection
    tools_url = f"{base_url}/api/tools"
    cat1_payload = build_cat1_payload(args.history_id, hdca_id)
    print(f"Step 2: Running cat1 on collection {hdca_id} ...")
    cat1_result = galaxy_request(tools_url, cat1_payload, args)
    jobs = cat1_result.get("jobs", [])
    print(f"  Submitted {len(jobs)} job(s)")
    for job in jobs:
        print(f"    Job {job['id']}: state={job['state']}")
    print(json.dumps(cat1_result, indent=2))


if __name__ == "__main__":
    main()
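One difference from the hourly loop above: this script runs cat1 immediately after the fetch request, with no retry, so the tool call can fail if Galaxy has not finished populating the collection. A possible guard is to wait for the collection first. The sketch below assumes the history-contents route GET /api/histories/{history_id}/contents/dataset_collections/{hdca_id} and a "populated" field in its response (verify against your Galaxy version); wait_until_populated is a hypothetical helper, not part of the gist.

import json
import time
import urllib.request

def wait_until_populated(base_url, history_id, hdca_id, auth_headers, timeout=300, interval=5):
    """Poll the collection until Galaxy reports it populated, or give up after `timeout` seconds.

    Sketch only: the endpoint and the 'populated' field are assumptions about the Galaxy API.
    """
    url = f"{base_url}/api/histories/{history_id}/contents/dataset_collections/{hdca_id}"
    deadline = time.time() + timeout
    while time.time() < deadline:
        req = urllib.request.Request(url)
        for k, v in auth_headers.items():
            req.add_header(k, v)
        with urllib.request.urlopen(req, timeout=60) as resp:
            hdca = json.loads(resp.read().decode("utf-8"))
        if hdca.get("populated"):
            return True
        time.sleep(interval)
    return False

In main() this would sit between Step 1 and Step 2, called with an auth-headers dict built the same way as in the hourly loop (for example {"x-api-key": args.api_key}).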