Skip to content

Instantly share code, notes, and snippets.

@AndreFCruz
Last active December 8, 2023 13:30
Show Gist options
  • Select an option

  • Save AndreFCruz/e697250a2324138892391f4ea56966f6 to your computer and use it in GitHub Desktop.

Select an option

Save AndreFCruz/e697250a2324138892391f4ea56966f6 to your computer and use it in GitHub Desktop.
Helper to submit arbitrary executables to the HTCondor scheduler
#!/usr/bin/env python3
from __future__ import annotations
import sys
import random
import logging
from pathlib import Path
from datetime import datetime
from argparse import ArgumentParser
import htcondor
import classad
# USER EMAIL TO NOTIFY ON JOB COMPLETION
USER_EMAIL = None # TODO: fill in with your email if you want to be notified
# Job submission defaults
DEFAULT_REQUEST_CPUS = 1              # CPUs requested per job
DEFAULT_REQUEST_GPUS = 0              # GPUs requested per job (0 = CPU-only)
DEFAULT_REQUEST_MEMORY = "16GB"       # RAM requested per job
DEFAULT_LOGS_SAVE_DIR = Path().cwd() / "logs"  # default dir for .out/.err/.log files
DEFAULT_JOB_BID = 25                  # initial cluster bid; mapped to jobprio as (bid - 1000)
DEFAULT_MAX_RUNNING_PRICE = 100       # price ceiling before the job is restarted
def setup_arg_parser() -> ArgumentParser:
    """Build and return the command-line argument parser for this script."""
    parser = ArgumentParser(description="Launch a job on the HTCondor scheduler.")

    # (flag, type, help-string, default) for every typed argument.
    typed_args = [
        ("executable", str, "[str] Path to the executable to run", None),
        ("--request-cpus", int, "[int] Number of CPUs to request for the job", DEFAULT_REQUEST_CPUS),
        ("--request-gpus", int, "[int] Number of GPUs to request for the job", DEFAULT_REQUEST_GPUS),
        ("--request-memory", str, "[str] Amount of memory to request for the job", DEFAULT_REQUEST_MEMORY),
        ("--request-gpu-memory", str, "[str] Amount of GPU memory to request for the job", None),
        ("--logs-save-dir", str, "[str] Path to the directory where to save the logs", DEFAULT_LOGS_SAVE_DIR),
        ("--concurrency-limit", int, "[int] Maximum number of jobs that can run concurrently", None),
        ("--concurrency-group", str, "[str] Name of the group over which to enforce the concurrency limit", None),
        ("--job-bid", int, "[int] The initial bid for the job", DEFAULT_JOB_BID),
        ("--max-running-price", int, "[int] The maximum running price for the job", DEFAULT_MAX_RUNNING_PRICE),
        ("--num-processes", int, "[int] Number of processes to launch with the given executable", 1),
    ]
    for flag, arg_type, help_str, default in typed_args:
        parser.add_argument(flag, type=arg_type, help=help_str, default=default)

    # Boolean flag: opt into per-process random seeding.
    parser.add_argument(
        "--use-random-seed",
        action="store_true",
        help="[bool] Whether to seed the executable with a random seed",
        default=False,
    )

    # Free-form pass-through arguments for the executable itself.
    parser.add_argument(
        "--executable-args",
        nargs="*",
        help="[str] Variable number of arguments to pass to the executable",
        default=None,
    )
    return parser
def get_current_time_str() -> str:
    """Return the current local timestamp formatted as YYYY-MM-DD_HH-MM-SS."""
    timestamp_format = "%Y-%m-%d_%H-%M-%S"
    return datetime.now().strftime(timestamp_format)
def launch_executable_job(
    executable: str | Path,
    request_cpus: int = DEFAULT_REQUEST_CPUS,
    request_gpus: int = DEFAULT_REQUEST_GPUS,
    request_memory: str = DEFAULT_REQUEST_MEMORY,
    request_gpu_memory: str = None,
    logs_save_dir: str | Path = None,
    concurrency_limit: int = None,
    concurrency_limit_group: str = None,
    use_random_seed: bool = False,
    job_bid: int = DEFAULT_JOB_BID,
    max_running_price: int = DEFAULT_MAX_RUNNING_PRICE,
    num_processes: int = 1,
    executable_args: list[str] = None,
    **submit_kwargs,
):
    """Launch a job on the HTCondor scheduler.

    Parameters
    ----------
    executable : str | Path
        Path to the executable to run.
    request_cpus : int, optional
        Number of CPUs to request for the job, by default 1.
    request_gpus : int, optional
        Number of GPUs to request for the job, by default 0.
    request_memory : str, optional
        Amount of memory to request for the job, by default "16GB".
    request_gpu_memory : str, optional
        Amount of GPU memory to request for the job, by default None.
    logs_save_dir : str | Path, optional
        Path to the directory where to save the logs, by default will save
        to a folder named "logs" on the current working directory.
    concurrency_limit : int, optional
        Maximum number of jobs that can run concurrently, by default will not
        enforce a limit.
    concurrency_limit_group : str, optional
        Name of the group over which to enforce the concurrency limit, by
        default will use the executable name.
    use_random_seed : bool, optional
        Whether to seed the executable with a random seed, by default False.
    job_bid : int, optional
        The initial bid for the job, by default 25.
    max_running_price : int, optional
        The maximum running price for the job, by default 100.
    num_processes : int, optional
        Number of processes to launch, by default 1.
    executable_args : list[str], optional
        Extra command-line arguments to pass to the executable, by default None.
    **submit_kwargs
        Extra keyword arguments to pass to the htcondor.Submit object.

    Raises
    ------
    ValueError
        If GPU memory is requested without requesting at least one GPU.
    """
    curr_time_str = get_current_time_str()
    executable = Path(executable)

    # Name for cluster logs related to this job; create intermediate
    # directories too so nested --logs-save-dir paths don't fail.
    logs_save_dir = Path(logs_save_dir) if logs_save_dir else DEFAULT_LOGS_SAVE_DIR
    logs_save_dir.mkdir(parents=True, exist_ok=True)
    logs_file_path = (
        logs_save_dir
        / f"{executable.name}_$(Cluster).$(Process)_{curr_time_str}"
    ).as_posix()

    # Construct job description
    job_desc_dict = {
        "executable": f"{sys.executable}",  # correct env for the python executable
        "arguments": (
            f"{executable.as_posix()} "
            f"{' '.join(executable_args or [])} "
        ),
        "output": f"{logs_file_path}.out",
        "error": f"{logs_file_path}.err",
        "log": f"{logs_file_path}.log",
        "request_cpus": f"{request_cpus}",
        "request_memory": f"{request_memory}",
        "request_disk": "5GB",
        "jobprio": f"{job_bid - 1000}",
        "+MaxRunningPrice": max_running_price,
        "+RunningPriceExceededAction": classad.quote("restart"),
    }

    # BUGFIX: request GPUs whenever any were asked for. Previously
    # `request_gpus` was only written when a GPU-memory constraint was also
    # given, so a plain --request-gpus N was silently ignored.
    if request_gpus > 0:
        job_desc_dict["request_gpus"] = f"{request_gpus}"

    # Add GPU memory requirement if any
    if request_gpu_memory:
        # Explicit exception instead of `assert` (asserts are stripped under -O)
        if request_gpus <= 0:
            raise ValueError("Must request at least 1 GPU to request GPU memory")
        job_desc_dict["requirements"] = f"(Target.GPUMemory >= {request_gpu_memory})"

    # Add concurrency limit if any
    if concurrency_limit:
        # HTCondor grants a fixed pool of 10,000 tokens per concurrency group;
        # sizing each job's share enforces at most `concurrency_limit` runners.
        resources_per_job = 10_000 // concurrency_limit
        job_desc_dict["concurrency_limits"] = (
            f"{concurrency_limit_group or executable.name}:{resources_per_job}"
        )

    # Seed the executable with a random seed
    if use_random_seed:
        # Random salt so seeds differ across submissions as well as processes
        job_desc_dict["job_seed_macro"] = f"$(Process) + {random.randrange(int(2 ** 31 - 1))}"
        job_desc_dict["job_seed"] = "$INT(job_seed_macro)"
        job_desc_dict["arguments"] += "--seed $(job_seed) "
    elif num_processes > 1:
        # Plain string (was an f-string with no placeholders)
        logging.warning("Multiple processes requested but will likely use the same seed for all processes")

    # Add user email to notify on job completion
    if USER_EMAIL:
        job_desc_dict["notify_user"] = USER_EMAIL
        job_desc_dict["notification"] = "error"

    # Add extra submit kwargs, normalizing "--some-key" -> "some_key"
    job_desc_dict.update({
        key.strip("-").replace("-", "_"): value
        for key, value in submit_kwargs.items()
    })

    # NOTE: removed a leftover `import ipdb; ipdb.set_trace()` debugger
    # breakpoint that would hang any non-interactive submission.

    # Create a submit description object and submit it to the scheduler
    job_description = htcondor.Submit(job_desc_dict)

    # Submit `num_processes` jobs (one cluster, N procs) to the scheduler
    schedd = htcondor.Schedd()
    submit_result = schedd.submit(job_description, count=num_processes)
    print(
        f"Launched {submit_result.num_procs()} processes with "
        f"cluster-ID={submit_result.cluster()}\n")
if __name__ == "__main__":
    # Build the parser and split the command line into recognized arguments
    # plus any extras the parser doesn't know about.
    cli_parser = setup_arg_parser()
    known_args, extra_cli = cli_parser.parse_known_args()

    print(f"Current python executable: '{sys.executable}'\n")
    print(f"Received the following cmd-line args: {known_args}\n")

    # Pair up the unrecognized tokens ("--k1", "v1", "--k2", "v2", ...)
    # into a kwargs dict forwarded to the job description.
    extra_submit_kwargs = dict(zip(extra_cli[::2], extra_cli[1::2]))

    # Launch the job
    launch_executable_job(
        executable=known_args.executable,
        request_cpus=known_args.request_cpus,
        request_gpus=known_args.request_gpus,
        request_memory=known_args.request_memory,
        request_gpu_memory=known_args.request_gpu_memory,
        logs_save_dir=known_args.logs_save_dir,
        concurrency_limit=known_args.concurrency_limit,
        concurrency_limit_group=known_args.concurrency_group,
        use_random_seed=known_args.use_random_seed,
        job_bid=known_args.job_bid,
        max_running_price=known_args.max_running_price,
        num_processes=known_args.num_processes,
        executable_args=known_args.executable_args,
        **extra_submit_kwargs,
    )
@AndreFCruz
Copy link
Author

Usage

usage: condor_submit.py [-h] [--request-cpus REQUEST_CPUS] [--request-gpus REQUEST_GPUS] [--request-memory REQUEST_MEMORY] [--request-gpu-memory REQUEST_GPU_MEMORY] [--logs-save-dir LOGS_SAVE_DIR]
                        [--concurrency-limit CONCURRENCY_LIMIT] [--concurrency-group CONCURRENCY_GROUP] [--job-bid JOB_BID] [--max-running-price MAX_RUNNING_PRICE] [--num-processes NUM_PROCESSES]
                        [--use-random-seed] [--executable-args [EXECUTABLE_ARGS ...]]
                        executable

Launch a job on the HTCondor scheduler.

positional arguments:
  executable            [str] Path to the executable to run

optional arguments:
  -h, --help            show this help message and exit
  --request-cpus REQUEST_CPUS
                        [int] Number of CPUs to request for the job
  --request-gpus REQUEST_GPUS
                        [int] Number of GPUs to request for the job
  --request-memory REQUEST_MEMORY
                        [str] Amount of memory to request for the job
  --request-gpu-memory REQUEST_GPU_MEMORY
                        [str] Amount of GPU memory to request for the job
  --logs-save-dir LOGS_SAVE_DIR
                        [str] Path to the directory where to save the logs
  --concurrency-limit CONCURRENCY_LIMIT
                        [int] Maximum number of jobs that can run concurrently
  --concurrency-group CONCURRENCY_GROUP
                        [str] Name of the group over which to enforce the concurrency limit
  --job-bid JOB_BID     [int] The initial bid for the job
  --max-running-price MAX_RUNNING_PRICE
                        [int] The maximum running price for the job
  --num-processes NUM_PROCESSES
                        [int] Number of processes to launch with the given executable
  --use-random-seed     [bool] Whether to seed the executable with a random seed
  --executable-args [EXECUTABLE_ARGS ...]
                        [str] Variable number of arguments to pass to the executable

Note:
All extra keyword arguments (those not recognized by the argument parser) are added directly to the job description.
For instance: condor_submit.py <recognized-args ...> --unrec-arg1 val1 will add {"unrec_arg1": "val1"} to the job submission parameters.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment