Helper to submit arbitrary executables to the HTCondor scheduler
#!/usr/bin/env python3
from __future__ import annotations

import sys
import random
import logging
from pathlib import Path
from datetime import datetime
from argparse import ArgumentParser

import htcondor
import classad

# USER EMAIL TO NOTIFY ON JOB COMPLETION
USER_EMAIL = None  # TODO: fill in with your email if you want to be notified

# Job submission defaults
DEFAULT_REQUEST_CPUS = 1
DEFAULT_REQUEST_GPUS = 0
DEFAULT_REQUEST_MEMORY = "16GB"
DEFAULT_LOGS_SAVE_DIR = Path.cwd() / "logs"
DEFAULT_JOB_BID = 25
DEFAULT_MAX_RUNNING_PRICE = 100
def setup_arg_parser() -> ArgumentParser:
    """Set up the command-line argument parser for this script."""
    parser = ArgumentParser(description="Launch a job on the HTCondor scheduler.")

    # List of command-line arguments: (name, type, help string[, default])
    cli_args = [
        ("executable", str, "[str] Path to the executable to run"),
        ("--request-cpus", int, "[int] Number of CPUs to request for the job", DEFAULT_REQUEST_CPUS),
        ("--request-gpus", int, "[int] Number of GPUs to request for the job", DEFAULT_REQUEST_GPUS),
        ("--request-memory", str, "[str] Amount of memory to request for the job", DEFAULT_REQUEST_MEMORY),
        ("--request-gpu-memory", str, "[str] Amount of GPU memory to request for the job", None),
        ("--logs-save-dir", str, "[str] Path to the directory where to save the logs", DEFAULT_LOGS_SAVE_DIR),
        ("--concurrency-limit", int, "[int] Maximum number of jobs that can run concurrently", None),
        ("--concurrency-group", str, "[str] Name of the group over which to enforce the concurrency limit", None),
        ("--job-bid", int, "[int] The initial bid for the job", DEFAULT_JOB_BID),
        ("--max-running-price", int, "[int] The maximum running price for the job", DEFAULT_MAX_RUNNING_PRICE),
        ("--num-processes", int, "[int] Number of processes to launch with the given executable", 1),
    ]
    for arg in cli_args:
        parser.add_argument(
            arg[0],
            type=arg[1],
            help=arg[2],
            default=(arg[3] if len(arg) > 3 else None),  # NOTE: default value, if any
        )

    # Add boolean arguments
    parser.add_argument(
        "--use-random-seed",
        action="store_true",
        help="[bool] Whether to seed the executable with a random seed",
        default=False,
    )

    # Add variable number of arguments
    parser.add_argument(
        "--executable-args",
        nargs="*",
        help="[str] Variable number of arguments to pass to the executable",
        default=None,
    )
    return parser
def get_current_time_str() -> str:
    """Returns the current timestamp in the format: YYYY-MM-DD_HH-MM-SS"""
    return datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
def launch_executable_job(
    executable: str | Path,
    request_cpus: int = DEFAULT_REQUEST_CPUS,
    request_gpus: int = DEFAULT_REQUEST_GPUS,
    request_memory: str = DEFAULT_REQUEST_MEMORY,
    request_gpu_memory: str | None = None,
    logs_save_dir: str | Path | None = None,
    concurrency_limit: int | None = None,
    concurrency_limit_group: str | None = None,
    use_random_seed: bool = False,
    job_bid: int = DEFAULT_JOB_BID,
    max_running_price: int = DEFAULT_MAX_RUNNING_PRICE,
    num_processes: int = 1,
    executable_args: list[str] | None = None,
    **submit_kwargs,
):
    """Launch a job on the HTCondor scheduler.

    Parameters
    ----------
    executable : str | Path
        Path to the executable to run.
    request_cpus : int, optional
        Number of CPUs to request for the job, by default 1.
    request_gpus : int, optional
        Number of GPUs to request for the job, by default 0.
    request_memory : str, optional
        Amount of memory to request for the job, by default "16GB".
    request_gpu_memory : str, optional
        Amount of GPU memory to request for the job, by default None.
    logs_save_dir : str | Path, optional
        Path to the directory where the logs will be saved; by default a
        folder named "logs" in the current working directory.
    concurrency_limit : int, optional
        Maximum number of jobs that can run concurrently; by default no
        limit is enforced.
    concurrency_limit_group : str, optional
        Name of the group over which to enforce the concurrency limit; by
        default the executable name is used.
    use_random_seed : bool, optional
        Whether to seed the executable with a random seed, by default False.
    job_bid : int, optional
        The initial bid for the job, by default 25.
    max_running_price : int, optional
        The maximum running price for the job, by default 100.
    num_processes : int, optional
        Number of processes to launch, by default 1.
    executable_args : list of str, optional
        Extra command-line arguments to pass to the executable, by default None.
    **submit_kwargs
        Extra keyword arguments to pass to the htcondor.Submit object.
    """
    curr_time_str = get_current_time_str()
    executable = Path(executable)

    # Path prefix for the cluster logs related to this job
    logs_save_dir = Path(logs_save_dir) if logs_save_dir else DEFAULT_LOGS_SAVE_DIR
    logs_save_dir.mkdir(parents=True, exist_ok=True)
    logs_file_path = (
        logs_save_dir
        / f"{executable.name}_$(Cluster).$(Process)_{curr_time_str}"
    ).as_posix()

    # Construct job description
    job_desc_dict = {
        "executable": f"{sys.executable}",  # run under the current Python environment
        "arguments": (
            f"{executable.as_posix()} "
            f"{' '.join(executable_args or [])} "
        ),
        "output": f"{logs_file_path}.out",
        "error": f"{logs_file_path}.err",
        "log": f"{logs_file_path}.log",
        "request_cpus": f"{request_cpus}",
        "request_memory": f"{request_memory}",
        "request_disk": "5GB",
        "jobprio": f"{job_bid - 1000}",
        "+MaxRunningPrice": max_running_price,
        "+RunningPriceExceededAction": classad.quote("restart"),
    }
    # Add GPU memory request, if any
    if request_gpu_memory:
        assert request_gpus > 0, "Must request at least 1 GPU to request GPU memory"
        job_desc_dict["requirements"] = f"(Target.GPUMemory >= {request_gpu_memory})"
        job_desc_dict["request_gpus"] = f"{request_gpus}"

    # Add concurrency limit, if any
    if concurrency_limit:
        # NOTE: assumes each concurrency-limit name has a total quota of 10,000
        # units on this pool; claiming (10_000 // concurrency_limit) units per
        # job caps the number of concurrently running jobs at the given limit
        resources_per_job = 10_000 // concurrency_limit
        job_desc_dict["concurrency_limits"] = (
            f"{concurrency_limit_group or executable.name}:{resources_per_job}"
        )

    # Seed the executable with a per-process random seed
    if use_random_seed:
        # add a random salt so job seeds differ across submissions
        job_desc_dict["job_seed_macro"] = f"$(Process) + {random.randrange(int(2 ** 31 - 1))}"
        job_desc_dict["job_seed"] = "$INT(job_seed_macro)"
        job_desc_dict["arguments"] += "--seed $(job_seed) "
    elif num_processes > 1:
        logging.warning("Multiple processes requested but all will likely use the same seed")
    # Add user email to notify on job completion
    if USER_EMAIL:
        job_desc_dict["notify_user"] = USER_EMAIL
        job_desc_dict["notification"] = "error"

    # Add extra submit kwargs
    job_desc_dict.update({
        key.strip("-").replace("-", "_"): value
        for key, value in submit_kwargs.items()
    })

    # Create a submit description object and submit it to the scheduler
    job_description = htcondor.Submit(job_desc_dict)

    # Submit `num_processes` jobs (processes) under a single cluster
    schedd = htcondor.Schedd()
    submit_result = schedd.submit(job_description, count=num_processes)
    print(
        f"Launched {submit_result.num_procs()} processes with "
        f"cluster-ID={submit_result.cluster()}\n")
if __name__ == "__main__":
    # Set up parser and process cmd-line args; unrecognized args are
    # forwarded to the job description as extra submit kwargs
    parser = setup_arg_parser()
    args, submit_kwargs = parser.parse_known_args()
    print(f"Current python executable: '{sys.executable}'\n")
    print(f"Received the following cmd-line args: {args}\n")

    # Launch the job
    launch_executable_job(
        executable=args.executable,
        request_cpus=args.request_cpus,
        request_gpus=args.request_gpus,
        request_memory=args.request_memory,
        request_gpu_memory=args.request_gpu_memory,
        logs_save_dir=args.logs_save_dir,
        concurrency_limit=args.concurrency_limit,
        concurrency_limit_group=args.concurrency_group,
        use_random_seed=args.use_random_seed,
        job_bid=args.job_bid,
        max_running_price=args.max_running_price,
        num_processes=args.num_processes,
        executable_args=args.executable_args,
        # Convert list of ("--k1", "v1", "--k2", "v2", ...) to a kwargs dict
        **dict(zip(submit_kwargs[::2], submit_kwargs[1::2])),
    )
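The launch_executable_job function can also be imported and called directly from Python, bypassing the CLI. A minimal sketch, assuming the script above is saved as condor_submit.py somewhere on the import path (the module name, train_model.py, and all resource values below are illustrative placeholders, not part of the gist):

# Hypothetical programmatic use of the helper above.
from condor_submit import launch_executable_job

launch_executable_job(
    executable="train_model.py",         # placeholder training script
    request_cpus=4,
    request_gpus=1,
    request_memory="32GB",
    request_gpu_memory="24GB",           # adds the Target.GPUMemory requirement
    use_random_seed=True,                # distinct per-process seeds
    num_processes=5,                     # 5 processes under one cluster ID
    executable_args=["--epochs", "10"],  # forwarded verbatim to the executable
)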
Usage
Note:
All extra keyword arguments (unrecognized by the argument parser) will be added to the job description.
For instance:
condor_submit.py <recognized-args ...> --unrec-arg1 val1 will add {"unrec_arg1": "val1"} to the job submission parameters.
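A fuller hypothetical invocation (train.py and the resource values are placeholders; stream_output is a standard HTCondor submit command, passed through here as an extra kwarg):

./condor_submit.py train.py --request-gpus 1 --request-memory 32GB --use-random-seed --num-processes 3 --stream-output true

Here --stream-output is not recognized by the argument parser, so it is forwarded to the job description as {"stream_output": "true"}.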