Ray Job Dispatch and Tuning — Isaac Lab Documentation
Title: Ray Job Dispatch and Tuning#
URL Source: https://isaac-sim.github.io/IsaacLab/main/source/features/ray.html
Published Time: Thu, 11 Sep 2025 17:00:56 GMT
Markdown Content: Isaac Lab supports Ray for streamlining dispatching multiple training jobs (in parallel and in series), and hyperparameter tuning, both on local and remote configurations.
This independent community contributed walkthrough video demonstrates some of the core functionality of the Ray integration covered in this overview. Although there may be some differences in the codebase (such as file names being shortened) since the creation of the video, the general workflow is the same.
Attention
This functionality is experimental, and has been tested only on Linux.
Overview#
The Ray integration is useful for the following.
-
Dispatching several training jobs in parallel or sequentially with minimal interaction.
-
Tuning hyperparameters; in parallel or sequentially with support for multiple GPUs and/or multiple GPU Nodes.
-
Using the same training setup everywhere (on cloud and local) with minimal overhead.
-
Resource Isolation for training jobs (resource-wrapped jobs).
The core functionality of the Ray workflow consists of two main scripts that enable the orchestration of resource-wrapped and tuning aggregate jobs. In resource-wrapped aggregate jobs, each sub-job and its resource requirements are defined manually, enabling resource isolation. For tuning aggregate jobs, individual jobs are generated automatically based on a hyperparameter sweep configuration.
Both resource-wrapped and tuning aggregate jobs dispatch individual jobs to a designated Ray cluster, which leverages the cluster’s resources (e.g., a single workstation node or multiple nodes) to execute these jobs with workers in parallel and/or sequentially.
By default, jobs use all available resources on each available GPU-enabled node for each sub-job worker. This can be changed through specifying the --num_workers argument for resource-wrapped jobs, or --num_workers_per_node for tuning jobs, which is especially critical for parallel aggregate job processing on local/virtual multi-GPU machines. Tuning jobs assume homogeneous node resource composition for nodes with GPUs.
The three following files contain the core functionality of the Ray integration.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
"""
This script dispatches sub-job(s) (individual jobs, use :file:tuner.py for tuning jobs)
to worker(s) on GPU-enabled node(s) of a specific cluster as part of an resource-wrapped aggregate
job. If no desired compute resources for each sub-job are specified,
this script creates one worker per available node for each node with GPU(s) in the cluster.If the desired resources for each sub-job is specified,the maximum number of workers possible with the desired resources are created for each nodewith GPU(s) in the cluster. It is also possible to split available node resources for each nodeinto the desired number of workers with the --num_workers flag, to be able to easilyparallelize sub-jobs on multi-GPU nodes. Due to Isaac Lab requiring a GPU,this ignores all CPU only nodes such as loggers.Sub-jobs are matched with node(s) in a cluster via the following relation:sorted_nodes = Node sorted by descending GPUs, then descending CPUs, then descending RAM, then node IDnode_submitted_to = sorted_nodes[job_index % total_node_count]To check the ordering of sorted nodes, supply the --test argument and run the script.Sub-jobs are separated by the + delimiter. The --sub_jobs argument must be the lastargument supplied to the script.If there is more than one available worker, and more than one sub-job,sub-jobs will be executed in parallel. If there are more sub-jobs than workers, sub-jobs willbe dispatched to workers as they become available. There is no limit on the numberof sub-jobs that can be near-simultaneously submitted.This script is meant to be executed on a Ray cluster head node as an aggregate cluster job.To submit aggregate cluster jobs such as this script to one or more remote clusters,see :file:../submit_isaac_ray_job.py.KubeRay clusters on Google GKE can be created with :file:../launch.pyUsage:.. code-block:: bash # Ensure that sub-jobs are separated by the + delimiter. # Generic Templates----------------------------------- ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py -h # No resource isolation; no parallelization: ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py --sub_jobs <JOB0>+<JOB1>+<JOB2> # Automatic Resource Isolation; Example A: needed for parallelization ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py \ --num_workers <NUM_TO_DIVIDE_TOTAL_RESOURCES_BY> \ --sub_jobs <JOB0>+<JOB1> # Manual Resource Isolation; Example B: needed for parallelization ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py --num_cpu_per_worker <CPU> \ --gpu_per_worker <GPU> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1> # Manual Resource Isolation; Example C: Needed for parallelization, for heterogeneous workloads ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py --num_cpu_per_worker <CPU> \ --gpu_per_worker <GPU1> <GPU2> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1> # to see all arguments ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py -h"""import argparseimport util
def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None: """ Provided a list of jobs, dispatch jobs to one worker per available node, unless otherwise specified by resource constraints.
Args: jobs: bash commands to execute on a Ray cluster args: The arguments for resource allocation
""" job_objs = [] util.ray_init( ray_address=args.ray_address, runtime_env={ "py_modules": None if not args.py_modules else args.py_modules, }, log_to_driver=False, ) gpu_node_resources = util.get_gpu_node_resources(include_id=True, include_gb_ram=True)
if any([args.gpu_per_worker, args.cpu_per_worker, args.ram_gb_per_worker]) and args.num_workers:
raise ValueError("Either specify only num_workers or only granular resources(GPU,CPU,RAM_GB).")
num_nodes = len(gpu_node_resources)
# Populate arguments
formatted_node_resources = {
"gpu_per_worker": [gpu_node_resources[i]["GPU"] for i in range(num_nodes)],
"cpu_per_worker": [gpu_node_resources[i]["CPU"] for i in range(num_nodes)],
"ram_gb_per_worker": [gpu_node_resources[i]["ram_gb"] for i in range(num_nodes)],
"num_workers": args.num_workers, # By default, 1 worker por node
}
args = util.fill_in_missing_resources(args, resources=formatted_node_resources, policy=min)
print(f"[INFO]: Number of GPU nodes found: {num_nodes}")
if args.test:
jobs = ["nvidia-smi"] * num_nodes
for i, job in enumerate(jobs):
gpu_node = gpu_node_resources[i % num_nodes]
print(f"[INFO]: Creating job {i + 1} of {len(jobs)} with job '{job}' to node {gpu_node}")
print(
f"[INFO]: Resource parameters: GPU: {args.gpu_per_worker[i]}"
f" CPU: {args.cpu_per_worker[i]} RAM {args.ram_gb_per_worker[i]}"
)
print(f"[INFO] For the node parameters, creating {args.num_workers[i]} workers")
num_gpus = args.gpu_per_worker[i] / args.num_workers[i]
num_cpus = args.cpu_per_worker[i] / args.num_workers[i]
memory = (args.ram_gb_per_worker[i] * 1024**3) / args.num_workers[i]
job_objs.append(
util.Job(
cmd=job,
name=f"Job-{i + 1}",
resources=util.JobResource(num_gpus=num_gpus, num_cpus=num_cpus, memory=memory),
node=util.JobNode(
specific="node_id",
node_id=gpu_node["id"],
),
)
)
# submit jobs
util.submit_wrapped_jobs(jobs=job_objs, test_mode=args.test, concurrent=False)
if name == "main": parser = argparse.ArgumentParser(description="Submit multiple jobs with optional GPU testing.") parser = util.add_resource_arguments(arg_parser=parser) parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.") parser.add_argument( "--test", action="store_true", help=( "Run nvidia-smi test instead of the arbitrary job," "can use as a sanity check prior to any jobs to check " "that GPU resources are correctly isolated." ), ) parser.add_argument( "--py_modules", type=str, nargs="*", default=[], help=( "List of python modules or paths to add before running the job. Example: --py_modules my_package/my_package" ), ) parser.add_argument( "--sub_jobs", type=str, nargs=argparse.REMAINDER, help="This should be last wrapper argument. Jobs separated by the + delimiter to run on a cluster.", ) args = parser.parse_args() if args.sub_jobs is not None: jobs = " ".join(args.sub_jobs) formatted_jobs = jobs.split("+") else: formatted_jobs = [] print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}") wrap_resources_to_jobs(jobs=formatted_jobs, args=args)
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
import argparse import importlib.util import os import subprocess import sys from time import sleep, time
import ray import util from ray import air, tune from ray.tune.search.optuna import OptunaSearch from ray.tune.search.repeater import Repeater
"""This script breaks down an aggregate tuning job, as defined by a hyperparameter sweep configuration,into individual jobs (shell commands) to run on the GPU-enabled nodes of the cluster.By default, one worker is created for each GPU-enabled node in the cluster for each individual job.To use more than one worker per node (likely the case for multi-GPU machines), supply thenum_workers_per_node argument.Each hyperparameter sweep configuration should include the workflow,runner arguments, and hydra arguments to vary.This assumes that all workers in a cluster are homogeneous. For heterogeneous workloads,create several heterogeneous clusters (with homogeneous nodes in each cluster),then submit several overall-cluster jobs with :file:../submit_job.py.KubeRay clusters on Google GKE can be created with :file:../launch.pyTo report tune metrics on clusters, a running MLFlow server with a known URI that the cluster hasaccess to is required. For KubeRay clusters configured with :file:../launch.py, this is includedautomatically, and can be easily found with with :file:grok_cluster_with_kubectl.pyUsage:.. code-block:: bash ./isaaclab.sh -p scripts/reinforcement_learning/ray/tuner.py -h # Examples # Local ./isaaclab.sh -p scripts/reinforcement_learning/ray/tuner.py --run_mode local \ --cfg_file scripts/reinforcement_learning/ray/hyperparameter_tuning/vision_cartpole_cfg.py \ --cfg_class CartpoleTheiaJobCfg # Remote (run grok cluster or create config file mentioned in :file:submit_job.py) ./isaaclab.sh -p scripts/reinforcement_learning/ray/submit_job.py \ --aggregate_jobs tuner.py \ --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \ --cfg_class CartpoleTheiaJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>"""
DOCKER_PREFIX = "/workspace/isaaclab/"
BASE_DIR = os.path.expanduser("~")
PYTHON_EXEC = "./isaaclab.sh -p"
WORKFLOW = "scripts/reinforcement_learning/rl_games/train.py"
NUM_WORKERS_PER_NODE = 1 # needed for local parallelism
PROCESS_RESPONSE_TIMEOUT = 200.0 # seconds to wait before killing the process when it stops responding
MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS = 1000 # maximum number of lines to read from the training process logs
MAX_LOG_EXTRACTION_ERRORS = 2 # maximum allowed LogExtractionErrors before we abort the whole training
class IsaacLabTuneTrainable(tune.Trainable):
"""The Isaac Lab Ray Tune Trainable.
This class uses the standalone workflows to start jobs, along with the hydra integration.
This class achieves Ray-based logging through reading the tensorboard logs from
the standalone workflows. This depends on a config generated in the format of
:class:JobCfg
"""
def setup(self, config: dict) -> None:
"""Get the invocation command, return quick for easy scheduling.""" self.data = None self.time_since_last_proc_response = 0.0 self.invoke_cmd = util.get_invocation_command_from_cfg(cfg=config, python_cmd=PYTHON_EXEC, workflow=WORKFLOW) print(f"[INFO]: Recovered invocation with {self.invoke_cmd}") self.experiment = None
def reset_config(self, new_config: dict):
"""Allow environments to be re-used by fetching a new invocation command""" self.setup(new_config) return True
def step(self) -> dict:
if self.experiment is None: # start experiment
# When including this as first step instead of setup, experiments get scheduled faster
# Don't want to block the scheduler while the experiment spins up
print(f"[INFO]: Invoking experiment as first step with {self.invoke_cmd}...")
try:
experiment = util.execute_job(
self.invoke_cmd,
identifier_string="",
extract_experiment=True, # Keep this as True to return a valid dictionary
persistent_dir=BASE_DIR,
max_lines_to_search_logs=MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS,
max_time_to_search_logs=PROCESS_RESPONSE_TIMEOUT,
)
except util.LogExtractionError:
self.data = {
"LOG_EXTRACTION_ERROR_STOPPER_FLAG": True,
"done": True,
}
return self.data
self.experiment = experiment
print(f"[INFO]: Tuner recovered experiment info {experiment}")
self.proc = experiment["proc"]
self.experiment_name = experiment["experiment_name"]
self.isaac_logdir = experiment["logdir"]
self.tensorboard_logdir = self.isaac_logdir + "/" + self.experiment_name
self.done = False
if self.proc is None:
raise ValueError("Could not start trial.")
proc_status = self.proc.poll()
if proc_status is not None: # process finished, signal finish
self.data["done"] = True
print(f"[INFO]: Process finished with {proc_status}, returning...")
else: # wait until the logs are ready or fresh
data = util.load_tensorboard_logs(self.tensorboard_logdir)
while data is None:
data = util.load_tensorboard_logs(self.tensorboard_logdir)
proc_status = self.proc.poll()
if proc_status is not None:
break
sleep(2) # Lazy report metrics to avoid performance overhead
if self.data is not None:
data_ = {k: v for k, v in data.items() if k != "done"}
self_data_ = {k: v for k, v in self.data.items() if k != "done"}
unresponsiveness_start_time = time()
while util._dicts_equal(data_, self_data_):
self.time_since_last_proc_response = time() - unresponsiveness_start_time
data = util.load_tensorboard_logs(self.tensorboard_logdir)
data_ = {k: v for k, v in data.items() if k != "done"}
proc_status = self.proc.poll()
if proc_status is not None:
break
if self.time_since_last_proc_response > PROCESS_RESPONSE_TIMEOUT:
self.time_since_last_proc_response = 0.0
print("[WARNING]: Training workflow process is not responding, terminating...")
self.proc.terminate()
try:
self.proc.wait(timeout=20)
except subprocess.TimeoutExpired:
print("[ERROR]: The process did not terminate within timeout duration.")
self.proc.kill()
self.proc.wait()
self.data = data
self.data["done"] = True
return self.data
sleep(2) # Lazy report metrics to avoid performance overhead
self.data = data
self.data["done"] = False
return self.data
def default_resource_request(self):
"""How many resources each trainable uses. Assumes homogeneous resources across gpu nodes, and that each trainable is meant for one node, where it uses all available resources.""" resources = util.get_gpu_node_resources(one_node_only=True) if NUM_WORKERS_PER_NODE != 1: print("[WARNING]: Splitting node into more than one worker") return tune.PlacementGroupFactory( [{"CPU": resources["CPU"] / NUM_WORKERS_PER_NODE, "GPU": resources["GPU"] / NUM_WORKERS_PER_NODE}], strategy="STRICT_PACK", )
class LogExtractionErrorStopper(tune.Stopper): """Stopper that stops all trials if multiple LogExtractionErrors occur.
Args: max_errors: The maximum number of LogExtractionErrors allowed before terminating the experiment. """
def __init__ (self, max_errors: int):
self.max_errors = max_errors
self.error_count = 0
def __call__ (self, trial_id, result):
"""Increments the error count if trial has encountered a LogExtractionError.
It does not stop the trial based on the metrics, always returning False. """ if result.get("LOG_EXTRACTION_ERROR_STOPPER_FLAG", False): self.error_count += 1 print( f"[ERROR]: Encountered LogExtractionError {self.error_count} times. " f"Maximum allowed is {self.max_errors}." ) return False
def stop_all(self):
"""Returns true if number of LogExtractionErrors exceeds the maximum allowed, terminating the experiment.""" if self.error_count > self.max_errors: print("[FATAL]: Encountered LogExtractionError more than allowed, aborting entire tuning run... ") return True else: return False
def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None: """Invoke an Isaac-Ray tuning run.
Log either to a local directory or to MLFlow. Args: cfg: Configuration dictionary extracted from job setup args: Command-line arguments related to tuning. """ # Allow for early exit os.environ["TUNE_DISABLE_STRICT_METRIC_CHECKING"] = "1"
print("[WARNING]: Not saving checkpoints, just running experiment...")
print("[INFO]: Model parameters and metrics will be preserved.")
print("[WARNING]: For homogeneous cluster resources only...")
# Get available resources
resources = util.get_gpu_node_resources()
print(f"[INFO]: Available resources {resources}")
if not ray.is_initialized():
ray.init(
address=args.ray_address,
log_to_driver=True,
num_gpus=len(resources),
)
print(f"[INFO]: Using config {cfg}")
# Configure the search algorithm and the repeater
searcher = OptunaSearch(
metric=args.metric,
mode=args.mode,
)
repeat_search = Repeater(searcher, repeat=args.repeat_run_count)
if args.run_mode == "local": # Standard config, to file
run_config = air.RunConfig(
storage_path="/tmp/ray",
name=f"IsaacRay-{args.cfg_class}-tune",
verbose=1,
checkpoint_config=air.CheckpointConfig(
checkpoint_frequency=0, # Disable periodic checkpointing
checkpoint_at_end=False, # Disable final checkpoint
),
stop=LogExtractionErrorStopper(max_errors=MAX_LOG_EXTRACTION_ERRORS),
)
elif args.run_mode == "remote": # MLFlow, to MLFlow server
mlflow_callback = MLflowLoggerCallback(
tracking_uri=args.mlflow_uri,
experiment_name=f"IsaacRay-{args.cfg_class}-tune",
save_artifact=False,
tags={"run_mode": "remote", "cfg_class": args.cfg_class},
)
run_config = ray.train.RunConfig(
name="mlflow",
storage_path="/tmp/ray",
callbacks=[mlflow_callback],
checkpoint_config=ray.train.CheckpointConfig(checkpoint_frequency=0, checkpoint_at_end=False),
stop=LogExtractionErrorStopper(max_errors=MAX_LOG_EXTRACTION_ERRORS),
)
else:
raise ValueError("Unrecognized run mode.")
# Configure the tuning job
tuner = tune.Tuner(
IsaacLabTuneTrainable,
param_space=cfg,
tune_config=tune.TuneConfig(
metric=args.metric,
mode=args.mode,
search_alg=repeat_search,
num_samples=args.num_samples,
reuse_actors=True,
),
run_config=run_config,
)
# Execute the tuning
tuner.fit()
# Save results to mounted volume
if args.run_mode == "local":
print("[DONE!]: Check results with tensorboard dashboard")
else:
print("[DONE!]: Check results with MLFlow dashboard")
class JobCfg: """To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable, at a minimum, the tune job should inherit from this class."""
def __init__ (self, cfg: dict):
""" Runner args include command line arguments passed to the task. For example: cfg["runner_args"]["headless_singleton"] = "--headless" cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras" """ assert "runner_args" in cfg, "No runner arguments specified." """ Task is the desired task to train on. For example: cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"]) """ assert "--task" in cfg["runner_args"], "No task specified." """ Hydra args define the hyperparameters varied within the sweep. For example: cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"]) """ assert "hydra_args" in cfg, "No hyperparameters specified." self.cfg = cfg
if name == "main": parser = argparse.ArgumentParser(description="Tune Isaac Lab hyperparameters.") parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.") parser.add_argument( "--cfg_file", type=str, default="hyperparameter_tuning/vision_cartpole_cfg.py", required=False, help="The relative filepath where a hyperparameter sweep is defined", ) parser.add_argument( "--cfg_class", type=str, default="CartpoleRGBNoTuneJobCfg", required=False, help="Name of the hyperparameter sweep class to use", ) parser.add_argument( "--run_mode", choices=["local", "remote"], default="remote", help=( "Set to local to use ./isaaclab.sh -p python, set to " "remote to use /workspace/isaaclab/isaaclab.sh -p python" ), ) parser.add_argument( "--workflow", default=None, # populated with RL Games help="The absolute path of the workflow to use for the experiment. By default, RL Games is used.", ) parser.add_argument( "--mlflow_uri", type=str, default=None, required=False, help="The MLFlow Uri.", ) parser.add_argument( "--num_workers_per_node", type=int, default=1, help="Number of workers to run on each GPU node. Only supply for parallelism on multi-gpu nodes", )
parser.add_argument("--metric", type=str, default="rewards/time", help="What metric to tune for.")
parser.add_argument(
"--mode",
choices=["max", "min"],
default="max",
help="What to optimize the metric to while tuning",
)
parser.add_argument(
"--num_samples",
type=int,
default=100,
help="How many hyperparameter runs to try total.",
)
parser.add_argument(
"--repeat_run_count",
type=int,
default=3,
help="How many times to repeat each hyperparameter config.",
)
parser.add_argument(
"--process_response_timeout",
type=float,
default=PROCESS_RESPONSE_TIMEOUT,
help="Training workflow process response timeout.",
)
parser.add_argument(
"--max_lines_to_search_experiment_logs",
type=float,
default=MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS,
help="Max number of lines to search for experiment logs before terminating the training workflow process.",
)
parser.add_argument(
"--max_log_extraction_errors",
type=float,
default=MAX_LOG_EXTRACTION_ERRORS,
help="Max number number of LogExtractionError failures before we abort the whole tuning run.",
)
args = parser.parse_args()
PROCESS_RESPONSE_TIMEOUT = args.process_response_timeout
MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS = int(args.max_lines_to_search_experiment_logs)
print(
"[INFO]: The max number of lines to search for experiment logs before (early) terminating the training "
f"workflow process is set to {MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS}.\n"
"[INFO]: The process response timeout, used while updating tensorboard scalars and searching for "
f"experiment logs, is set to {PROCESS_RESPONSE_TIMEOUT} seconds."
)
MAX_LOG_EXTRACTION_ERRORS = int(args.max_log_extraction_errors)
print(
"[INFO]: Max number of LogExtractionError failures before we abort the whole tuning run is "
f"set to {MAX_LOG_EXTRACTION_ERRORS}.\n"
)
NUM_WORKERS_PER_NODE = args.num_workers_per_node
print(f"[INFO]: Using {NUM_WORKERS_PER_NODE} workers per node.")
if args.run_mode == "remote":
BASE_DIR = DOCKER_PREFIX # ensure logs are dumped to persistent location
PYTHON_EXEC = DOCKER_PREFIX + PYTHON_EXEC[2:]
if args.workflow is None:
WORKFLOW = DOCKER_PREFIX + WORKFLOW
else:
WORKFLOW = args.workflow
print(f"[INFO]: Using remote mode {PYTHON_EXEC=} {WORKFLOW=}")
if args.mlflow_uri is not None:
import mlflow
mlflow.set_tracking_uri(args.mlflow_uri)
from ray.air.integrations.mlflow import MLflowLoggerCallback
else:
raise ValueError("Please provide a result MLFLow URI server.")
else: # local
PYTHON_EXEC = os.getcwd() + "/" + PYTHON_EXEC[2:]
if args.workflow is None:
WORKFLOW = os.getcwd() + "/" + WORKFLOW
else:
WORKFLOW = args.workflow
BASE_DIR = os.getcwd()
print(f"[INFO]: Using local mode {PYTHON_EXEC=} {WORKFLOW=}")
file_path = args.cfg_file
class_name = args.cfg_class
print(f"[INFO]: Attempting to use sweep config from {file_path=} {class_name=}")
module_name = os.path.splitext(os.path.basename(file_path))[0]
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
print(f"[INFO]: Successfully imported {module_name} from {file_path}")
if hasattr(module, class_name):
ClassToInstantiate = getattr(module, class_name)
print(f"[INFO]: Found correct class {ClassToInstantiate}")
instance = ClassToInstantiate()
print(f"[INFO]: Successfully instantiated class '{class_name}' from {file_path}")
cfg = instance.cfg
print(f"[INFO]: Grabbed the following hyperparameter sweep config: \n {cfg}")
invoke_tuning_run(cfg, args)
else:
raise AttributeError(f"[ERROR]:Class '{class_name}' not found in {file_path}")
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
""" This script dispatches one or more user-defined Python tasks to workers in a Ray cluster. Each task, along with its resource requirements and execution parameters, is specified in a YAML configuration file. Users may define the number of CPUs, GPUs, and the amount of memory to allocate per task via the config file.
Key features:
- Fine-grained, per-task resource management via config fields (
num_gpus,num_cpus,memory).- Parallel execution of multiple tasks using available resources across the Ray cluster.- Option to specify node affinity for tasks, e.g., by hostname, node ID, or any node.- Optional batch (simultaneous) or independent scheduling of tasks.Task scheduling and distribution are handled via Ray’s built-in resource manager.YAML configuration fields:---------------------------pip: List of extra pip packages to install before running any tasks.-py_modules: List of additional Python module paths (directories or files) to include in the runtime environment.-concurrent: (bool) It determines task dispatch semantics: - Ifconcurrent: true, all tasks are scheduled as a batch. The script waits until sufficient resources are available for every task in the batch, then launches all tasks together. If resources are insufficient, all tasks remain blocked until the cluster can support the full batch. - Ifconcurrent: false, tasks are launched as soon as resources are available for each individual task, and Ray independently schedules them. This may result in non-simultaneous task start times.-tasks: List of task specifications, each with: -name: String identifier for the task. -py_args: Arguments to the Python interpreter (e.g., script/module, flags, user arguments). -num_gpus: Number of GPUs to allocate (float or string arithmetic, e.g., "22"). -num_cpus: Number of CPUs to allocate (float or string). -memory: Amount of RAM in bytes (int or string). -node(optional): Node placement constraints. -specific(str): Type of node placement, supporthostname,node_id, orany. -any: Place the task on any available node. -hostname: Place the task on a specific hostname.hostnamemust be specified in the node field. -node_id: Place the task on a specific node ID.node_idmust be specified in the node field. -hostname(str): Specific hostname to place the task on. -node_id(str): Specific node ID to place the task on.Typical usage:---------------.. code-block:: bash # Print help and argument details: python task_runner.py -h # Submit tasks defined in a YAML file to the Ray cluster (auto-detects Ray head address): python task_runner.py --task_cfg /path/to/tasks.yamlYAML configuration example-1:---------------------------.. code-block:: yaml pip: ["xxx"] py_modules: ["my_package/my_package"] concurrent: false tasks: - name: "Isaac-Cartpole-v0" py_args: "-m torch.distributed.run --nnodes=1 --nproc_per_node=2 --rdzv_endpoint=localhost:29501 /workspace/isaaclab/scripts/reinforcement_learning/rsl_rl/train.py --task=Isaac-Cartpole-v0 --max_iterations 200 --headless --distributed" num_gpus: 2 num_cpus: 10 memory: 10737418240 - name: "script need some dependencies" py_args: "script.py --option arg" num_gpus: 0 num_cpus: 1 memory: 10102410241024YAML configuration example-2:---------------------------.. code-block:: yaml pip: ["xxx"] py_modules: ["my_package/my_package"] concurrent: true tasks: - name: "Isaac-Cartpole-v0-multi-node-train-1" py_args: "-m torch.distributed.run --nproc_per_node=1 --nnodes=2 --node_rank=0 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint=localhost:5555 /workspace/isaaclab/scripts/reinforcement_learning/rsl_rl/train.py --task=Isaac-Cartpole-v0 --headless --distributed --max_iterations 1000" num_gpus: 1 num_cpus: 10 memory: 10102410241024 node: specific: "hostname" hostname: "xxx" - name: "Isaac-Cartpole-v0-multi-node-train-2" py_args: "-m torch.distributed.run --nproc_per_node=1 --nnodes=2 --node_rank=1 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint=x.x.x.x:5555 /workspace/isaaclab/scripts/reinforcement_learning/rsl_rl/train.py --task=Isaac-Cartpole-v0 --headless --distributed --max_iterations 1000" num_gpus: 1 num_cpus: 10 memory: 10102410241024 node: specific: "hostname" hostname: "xxx"To stop all tasks early, press Ctrl+C; the script will cancel all running Ray tasks."""import argparseimport yamlfrom datetime import datetimeimport util def parse_args() -> argparse.Namespace: """ Parse command-line arguments for the Ray task runner.
Returns: argparse.Namespace: The namespace containing parsed CLI arguments:
- task_cfg (str): Path to the YAML task file.
- ray_address (str): Ray cluster address.
- test (bool): Whether to run a GPU resource isolation sanity check. """ parser = argparse.ArgumentParser(description="Run tasks from a YAML config file.") parser.add_argument("--task_cfg", type=str, required=True, help="Path to the YAML task file.") parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.") parser.add_argument( "--test", action="store_true", help=( "Run nvidia-smi test instead of the arbitrary job," "can use as a sanity check prior to any jobs to check " "that GPU resources are correctly isolated." ), ) return parser.parse_args()
def parse_task_resource(task: dict) -> util.JobResource: """ Parse task resource requirements from the YAML configuration.
Args:
task (dict): Dictionary representing a single task's configuration.
Keys may include num_gpus, num_cpus, and memory, each either
as a number or evaluatable string expression.
Returns: util.JobResource: Resource object with the parsed values. """ resource = util.JobResource() if "num_gpus" in task: resource.num_gpus = eval(task["num_gpus"]) if isinstance(task["num_gpus"], str) else task["num_gpus"] if "num_cpus" in task: resource.num_cpus = eval(task["num_cpus"]) if isinstance(task["num_cpus"], str) else task["num_cpus"] if "memory" in task: resource.memory = eval(task["memory"]) if isinstance(task["memory"], str) else task["memory"] return resource
def run_tasks( tasks: list[dict], args: argparse.Namespace, runtime_env: dict | None = None, concurrent: bool = False ) -> None: """ Submit tasks to the Ray cluster for execution.
Args: tasks (list[dict]): A list of task configuration dictionaries. args (argparse.Namespace): Parsed command-line arguments. runtime_env (dict | None): Ray runtime environment configuration containing:
- pip (list[str] | None): Additional pip packages to install.
- py_modules (list[str] | None): Python modules to include in the environment. concurrent (bool): Whether to launch tasks simultaneously as a batch, or independently as resources become available.
Returns: None """ job_objs = [] util.ray_init(ray_address=args.ray_address, runtime_env=runtime_env, log_to_driver=False) for task in tasks: resource = parse_task_resource(task) print(f"[INFO] Creating job {task['name']} with resource={resource}") job = util.Job( name=task["name"], py_args=task["py_args"], resources=resource, node=util.JobNode( specific=task.get("node", {}).get("specific"), hostname=task.get("node", {}).get("hostname"), node_id=task.get("node", {}).get("node_id"), ), ) job_objs.append(job) start = datetime.now() print(f"[INFO] Creating {len(job_objs)} jobs at {start.strftime('%H:%M:%S.%f')} with runtime env={runtime_env}") # submit jobs util.submit_wrapped_jobs( jobs=job_objs, test_mode=args.test, concurrent=concurrent, ) end = datetime.now() print( f"[INFO] All jobs completed at {end.strftime('%H:%M:%S.%f')}, took {(end - start).total_seconds():.2f} seconds." )
def main() -> None: """ Main entry point for the Ray task runner script.
Reads the YAML task configuration file, parses CLI arguments, and dispatches tasks to the Ray cluster.
Returns: None """ args = parse_args() with open(args.task_cfg) as f: config = yaml.safe_load(f) tasks = config["tasks"] runtime_env = { "pip": None if not config.get("pip") else config["pip"], "py_modules": None if not config.get("py_modules") else config["py_modules"], } concurrent = config.get("concurrent", False) run_tasks( tasks=tasks, args=args, runtime_env=runtime_env, concurrent=concurrent, )
if name == "main": main()
The following script can be used to submit aggregate jobs to one or more Ray cluster(s), which can be used for running jobs on a remote cluster or simultaneous jobs with heterogeneous resource requirements.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
"""
This script submits aggregate job(s) to cluster(s) described in a
config file containing name: <NAME> address: http://<IP>:<PORT> on
a new line for each cluster. For KubeRay clusters, this file
can be automatically created with :file:grok_cluster_with_kubectl.py
Aggregate job(s) are matched with cluster(s) via the following relation:
cluster_line_index_submitted_to = job_index % total_cluster_countAggregate jobs are separated by the * delimiter. The --aggregate_jobs argument must bethe last argument supplied to the script.An aggregate job could be a :file:../tuner.py tuning job, which automaticallycreates several individual jobs when started on a cluster. Alternatively, an aggregate jobcould be a :file:'../wrap_resources.py resource-wrapped job,which may contain several individual sub-jobs separated bythe + delimiter. An aggregate job could also be a :file:../task_runner.py multi-task submission job,where each sub-job and its resource requirements are defined in a YAML configuration file.In this mode, :file:../task_runner.py` will read the YAML file (via --task_cfg), andsubmit all defined sub-tasks to the Ray cluster, supporting per-job resource specification andreal-time streaming of sub-job outputs.If there are more aggregate jobs than cluster(s), aggregate jobs will be submittedas clusters become available via the defined relation above. If there are less aggregate job(s)than clusters, some clusters will not receive aggregate job(s). The maximum number ofaggregate jobs that can be run simultaneously is equal to the number of workers created bydefault by a ThreadPoolExecutor on the machine submitting jobs due to fetching the log output afterjobs finish, which is unlikely to constrain overall-job submission.Usage:.. code-block:: bash # Example; submitting a tuning job python3 scripts/reinforcement_learning/ray/submit_job.py \ --aggregate_jobs /workspace/isaaclab/scripts/reinforcement_learning/ray/tuner.py \ --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \ --cfg_class CartpoleTheiaJobCfg --mlflow_uri <ML_FLOW_URI> # Example: Submitting resource wrapped job python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs wrap_resources.py --test # Example: submitting tasks with specific resources, and supporting pip packages and py_modules # You may use relative paths for task_cfg and py_modules, placing them in the scripts/reinforcement_learning/ray directory, which will be uploaded to the cluster. python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs task_runner.py --task_cfg tasks.yaml # For all command line arguments python3 scripts/reinforcement_learning/ray/submit_job.py -h"""import argparseimport osimport timefrom concurrent.futures import ThreadPoolExecutorfrom ray import job_submission
script_directory = os.path.dirname(os.path.abspath( file ))
CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}
def read_cluster_spec(fn: str | None = None) -> list[dict]: if fn is None: cluster_spec_path = os.path.expanduser("~/.cluster_config") else: cluster_spec_path = os.path.expanduser(fn)
if not os.path.exists(cluster_spec_path):
raise FileNotFoundError(f"Cluster spec file not found at {cluster_spec_path}")
clusters = []
with open(cluster_spec_path) as f:
for line in f:
parts = line.strip().split(" ")
http_address = parts[3]
cluster_info = {"name": parts[1], "address": http_address}
print(f"[INFO] Setting {cluster_info['name']}") # with {cluster_info['num_gpu']} GPUs.")
clusters.append(cluster_info)
return clusters
def submit_job(cluster: dict, job_command: str) -> None: """ Submits a job to a single cluster, prints the final result and Ray dashboard URL at the end. """ address = cluster["address"] cluster_name = cluster["name"] print(f"[INFO]: Submitting job to cluster '{cluster_name}' at {address}") # with {num_gpus} GPUs.") client = job_submission.JobSubmissionClient(address) runtime_env = {"working_dir": CONFIG["working_dir"], "executable": CONFIG["executable"]} print(f"[INFO]: Checking contents of the directory: {CONFIG['working_dir']}") try: dir_contents = os.listdir(CONFIG["working_dir"]) print(f"[INFO]: Directory contents: {dir_contents}") except Exception as e: print(f"[INFO]: Failed to list directory contents: {str(e)}") entrypoint = f"{CONFIG['executable']} {job_command}" print(f"[INFO]: Attempting entrypoint {entrypoint=} in cluster {cluster}") job_id = client.submit_job(entrypoint=entrypoint, runtime_env=runtime_env) status = client.get_job_status(job_id) while status in [job_submission.JobStatus.PENDING, job_submission.JobStatus.RUNNING]: time.sleep(5) status = client.get_job_status(job_id)
final_logs = client.get_job_logs(job_id)
print("----------------------------------------------------")
print(f"[INFO]: Cluster {cluster_name} Logs: \n")
print(final_logs)
print("----------------------------------------------------")
def submit_jobs_to_clusters(jobs: list[str], clusters: list[dict]) -> None: """ Submit all jobs to their respective clusters, cycling through clusters if there are more jobs than clusters. """ if not clusters: raise ValueError("No clusters available for job submission.")
if len(jobs) < len(clusters):
print("[INFO]: Less jobs than clusters, some clusters will not receive jobs")
elif len(jobs) == len(clusters):
print("[INFO]: Exactly one job per cluster")
else:
print("[INFO]: More jobs than clusters, jobs submitted as clusters become available.")
with ThreadPoolExecutor() as executor:
for idx, job_command in enumerate(jobs):
# Cycle through clusters using modulus to wrap around if there are more jobs than clusters
cluster = clusters[idx % len(clusters)]
executor.submit(submit_job, cluster, job_command)
if name == "main": parser = argparse.ArgumentParser(description="Submit multiple GPU jobs to multiple Ray clusters.") parser.add_argument("--config_file", default="~/.cluster_config", help="The cluster config path.") parser.add_argument( "--aggregate_jobs", type=str, nargs=argparse.REMAINDER, help="This should be last argument. The aggregate jobs to submit separated by the * delimiter.", ) args = parser.parse_args() if args.aggregate_jobs is not None: jobs = " ".join(args.aggregate_jobs) formatted_jobs = jobs.split("*") if len(formatted_jobs) > 1: print("Warning; Split jobs by cluster with the * delimiter") else: formatted_jobs = [] print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}") clusters = read_cluster_spec(args.config_file) submit_jobs_to_clusters(formatted_jobs, clusters)
The following script can be used to extract KubeRay cluster information for aggregate job submission.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
import argparse import os import re import subprocess import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed
"""This script requires that kubectl is installed and KubeRay was used to create the cluster.Creates a config file containing name: <NAME> address: http://<IP>:<PORT> ona new line for each cluster, and also fetches the MLFlow URI.Usage:.. code-block:: bash python3 scripts/reinforcement_learning/ray/grok_cluster_with_kubectl.py # For options, supply -h arg"""
def get_namespace() -> str: """Get the current Kubernetes namespace from the context, fallback to default if not set""" try: namespace = ( subprocess.check_output(["kubectl", "config", "view", "--minify", "--output", "jsonpath={..namespace}"]) .decode() .strip() ) if not namespace: namespace = "default" except subprocess.CalledProcessError: namespace = "default" return namespace
def get_pods(namespace: str = "default") -> list[tuple]: """Get a list of all of the pods in the namespace""" cmd = ["kubectl", "get", "pods", "-n", namespace, "--no-headers"] output = subprocess.check_output(cmd).decode() pods = [] for line in output.strip().split("\n"): fields = line.split() pod_name = fields[0] status = fields[2] pods.append((pod_name, status)) return pods
def get_clusters(pods: list, cluster_name_prefix: str) -> set: """ Get unique cluster name(s). Works for one or more clusters, based off of the number of head nodes. Excludes MLflow deployments. """ clusters = set() for pod_name, _ in pods: # Skip MLflow pods if "-mlflow" in pod_name: continue
match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
if match:
# Get base name without head/worker suffix (skip workers)
if "head" in pod_name:
base_name = match.group(1).split("-head")[0]
clusters.add(base_name)
return sorted(clusters)
def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") -> str: """ Get MLflow service information if it exists in the namespace with the given prefix. Only works for a single cluster instance. Args: namespace: Kubernetes namespace cluster_prefix: Base cluster name (without -head/-worker suffixes) Returns: MLflow service URL """ # Strip any -head or -worker suffixes to get base name if namespace is None: namespace = get_namespace() pods = get_pods(namespace=namespace) clusters = get_clusters(pods=pods, cluster_name_prefix=cluster_prefix) if len(clusters) > 1: raise ValueError("More than one cluster matches prefix, could not automatically determine mlflow info.") mlflow_name = f"{cluster_prefix}-mlflow"
cmd = ["kubectl", "get", "svc", mlflow_name, "-n", namespace, "--no-headers"]
try:
output = subprocess.check_output(cmd).decode()
fields = output.strip().split()
# Get cluster IP
cluster_ip = fields[2]
port = "5000" # Default MLflow port
# This needs to be http to be resolved. HTTPS can't be resolved
# This should be fine as it is on a subnet on the cluster regardless
return f"http://{cluster_ip}:{port}"
except subprocess.CalledProcessError as e:
raise ValueError(f"Could not grok MLflow: {e}") # Fixed f-string
def check_clusters_running(pods: list, clusters: set) -> bool: """ Check that all of the pods in all provided clusters are running.
Args: pods (list): A list of tuples where each tuple contains the pod name and its status. clusters (set): A set of cluster names to check.
Returns: bool: True if all pods in any of the clusters are running, False otherwise. """ clusters_running = False for cluster in clusters: cluster_pods = [p for p in pods if p[0].startswith(cluster)] total_pods = len(cluster_pods) running_pods = len([p for p in cluster_pods if p[1] == "Running"]) if running_pods == total_pods and running_pods > 0: clusters_running = True break return clusters_running
def get_ray_address(head_pod: str, namespace: str = "default", ray_head_name: str = "head") -> str: """ Given a cluster head pod, check its logs, which should include the ray address which can accept job requests.
Args: head_pod (str): The name of the head pod. namespace (str, optional): The Kubernetes namespace. Defaults to "default". ray_head_name (str, optional): The name of the ray head container. Defaults to "head".
Returns: str: The ray address if found, None otherwise.
Raises: ValueError: If the logs cannot be retrieved or the ray address is not found. """ cmd = ["kubectl", "logs", head_pod, "-c", ray_head_name, "-n", namespace] try: output = subprocess.check_output(cmd).decode() except subprocess.CalledProcessError as e: raise ValueError( f"Could not enter head container with cmd {cmd}: {e}Perhaps try a different namespace or ray head name." ) match = re.search(r"RAY_ADDRESS='([^']+)'", output) if match: return match.group(1) else: return None
def process_cluster(cluster_info: dict, ray_head_name: str = "head") -> str: """ For each cluster, check that it is running, and get the Ray head address that will accept jobs.
Args: cluster_info (dict): A dictionary containing cluster information with keys 'cluster', 'pods', and 'namespace'. ray_head_name (str, optional): The name of the ray head container. Defaults to "head".
Returns: str: A string containing the cluster name and its Ray head address, or an error message if the head pod or Ray address is not found. """ cluster, pods, namespace = cluster_info head_pod = None for pod_name, status in pods: if pod_name.startswith(cluster + "-head"): head_pod = pod_name break if not head_pod: return f"Error: Could not find head pod for cluster {cluster}\n"
# Get RAY_ADDRESS and status
ray_address = get_ray_address(head_pod, namespace=namespace, ray_head_name=ray_head_name)
if not ray_address:
return f"Error: Could not find RAY_ADDRESS for cluster {cluster}\n"
# Return only cluster and ray address
return f"name: {cluster} address: {ray_address}\n"
def main(): # Parse command-line arguments parser = argparse.ArgumentParser(description="Process Ray clusters and save their specifications.") parser.add_argument("--prefix", default="isaacray", help="The prefix for the cluster names.") parser.add_argument("--output", default="~/.cluster_config", help="The file to save cluster specifications.") parser.add_argument("--ray_head_name", default="head", help="The metadata name for the ray head container") parser.add_argument( "--namespace", help="Kubernetes namespace to use. If not provided, will detect from current context." ) args = parser.parse_args()
# Get namespace from args or detect it
current_namespace = args.namespace if args.namespace else get_namespace()
print(f"Using namespace: {current_namespace}")
cluster_name_prefix = args.prefix
cluster_spec_file = os.path.expanduser(args.output)
# Get all pods
pods = get_pods(namespace=current_namespace)
# Get clusters
clusters = get_clusters(pods, cluster_name_prefix)
if not clusters:
print(f"No clusters found with prefix {cluster_name_prefix}")
return
# Wait for clusters to be running
while True:
pods = get_pods(namespace=current_namespace)
if check_clusters_running(pods, clusters):
break
print("Waiting for all clusters to spin up...")
time.sleep(5)
print("Checking for MLflow:")
# Check MLflow status for each cluster
for cluster in clusters:
try:
mlflow_address = get_mlflow_info(current_namespace, cluster)
print(f"MLflow address for {cluster}: {mlflow_address}")
except ValueError as e:
print(f"ML Flow not located: {e}")
print()
# Prepare cluster info for parallel processing
cluster_infos = []
for cluster in clusters:
cluster_pods = [p for p in pods if p[0].startswith(cluster)]
cluster_infos.append((cluster, cluster_pods, current_namespace))
# Use ThreadPoolExecutor to process clusters in parallel
results = []
results_lock = threading.Lock()
with ThreadPoolExecutor() as executor:
future_to_cluster = {
executor.submit(process_cluster, info, args.ray_head_name): info[0] for info in cluster_infos
}
for future in as_completed(future_to_cluster):
cluster_name = future_to_cluster[future]
try:
result = future.result()
with results_lock:
results.append(result)
except Exception as exc:
print(f"{cluster_name} generated an exception: {exc}")
# Sort results alphabetically by cluster name
results.sort()
# Write sorted results to the output file (Ray info only)
with open(cluster_spec_file, "w") as f:
for result in results:
f.write(result)
print(f"Cluster spec information saved to {cluster_spec_file}")
# Display the contents of the config file
with open(cluster_spec_file) as f:
print(f.read())
if name == "main": main()
The following script can be used to easily create clusters on Google GKE.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
import argparse import pathlib import subprocess import yaml
import util from jinja2 import Environment, FileSystemLoader from kubernetes import config
"""This script helps create one or more KubeRay clusters.Usage:.. code-block:: bash # If the head node is stuck on container creating, make sure to create a secret python3 scripts/reinforcement_learning/ray/launch.py -h # Examples # The following creates 8 GPUx1 nvidia l4 workers python3 scripts/reinforcement_learning/ray/launch.py --cluster_host google_cloud \ --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \ --num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1 # The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers, # and 2 GPUx4 nvidia-tesla-t4 GPU workers python3 scripts/reinforcement_learning/ray/launch.py --cluster_host google_cloud \ --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \ --num_workers 1 2 --num_clusters 1 \ --worker_accelerator nvidia-l4 nvidia-tesla-t4 --gpu_per_worker 1 2 4"""RAY_DIR = pathlib.Path( file ).parent
def apply_manifest(args: argparse.Namespace) -> None: """Provided a Jinja templated ray.io/v1alpha1 file, populate the arguments and create the cluster. Additionally, create kubernetes containers for resources separated by '---' from the rest of the file.
Args: args: Possible arguments concerning cluster parameters. """ # Load Kubernetes configuration config.load_kube_config()
# Set up Jinja2 environment for loading templates
templates_dir = RAY_DIR / "cluster_configs" / args.cluster_host
file_loader = FileSystemLoader(str(templates_dir))
jinja_env = Environment(loader=file_loader, keep_trailing_newline=True, autoescape=True)
# Define template filename
template_file = "kuberay.yaml.jinja"
# Convert args namespace to a dictionary
template_params = vars(args)
# Load and render the template
template = jinja_env.get_template(template_file)
file_contents = template.render(template_params)
# Parse all YAML documents in the rendered template
all_yamls = []
for doc in yaml.safe_load_all(file_contents):
all_yamls.append(doc)
# Convert back to YAML string, preserving multiple documents
cleaned_yaml_string = ""
for i, doc in enumerate(all_yamls):
if i > 0:
cleaned_yaml_string += "\n---\n"
cleaned_yaml_string += yaml.dump(doc)
# Apply the Kubernetes manifest using kubectl
try:
print(cleaned_yaml_string)
subprocess.run(["kubectl", "apply", "-f", "-"], input=cleaned_yaml_string, text=True, check=True)
except subprocess.CalledProcessError as e:
exit(f"An error occurred while running `kubectl`: {e}")
def parse_args() -> argparse.Namespace: """ Parse command-line arguments for Kubernetes deployment script.
Returns: argparse.Namespace: Parsed command-line arguments. """ arg_parser = argparse.ArgumentParser( description="Script to apply manifests to create Kubernetes objects for Ray clusters.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, )
arg_parser.add_argument(
"--cluster_host",
type=str,
default="google_cloud",
choices=["google_cloud"],
help=(
"In the cluster_configs directory, the name of the folder where a tune.yaml.jinja"
"file exists defining the KubeRay config. Currently only google_cloud is supported."
),
)
arg_parser.add_argument(
"--name",
type=str,
required=False,
default="isaacray",
help="Name of the Kubernetes deployment.",
)
arg_parser.add_argument(
"--namespace",
type=str,
required=False,
default="default",
help="Kubernetes namespace to deploy the Ray cluster.",
)
arg_parser.add_argument(
"--service_acount_name", type=str, required=False, default="default", help="The service account name to use."
)
arg_parser.add_argument(
"--image",
type=str,
required=True,
help="Docker image for the Ray cluster pods.",
)
arg_parser.add_argument(
"--worker_accelerator",
nargs="+",
type=str,
default=["nvidia-l4"],
help="GPU accelerator name. Supply more than one for heterogeneous resources.",
)
arg_parser = util.add_resource_arguments(arg_parser, cluster_create_defaults=True)
arg_parser.add_argument(
"--num_clusters",
type=int,
default=1,
help="How many Ray Clusters to create.",
)
arg_parser.add_argument(
"--num_head_cpu",
type=float, # to be able to schedule partial CPU heads
default=8,
help="The number of CPUs to give the Ray head.",
)
arg_parser.add_argument("--head_ram_gb", type=int, default=8, help="How many gigs of ram to give the Ray head")
args = arg_parser.parse_args()
return util.fill_in_missing_resources(args, cluster_creation_flag=True)
def main(): args = parse_args()
if "head" in args.name:
raise ValueError("For compatibility with other scripts, do not include head in the name")
if args.num_clusters == 1:
apply_manifest(args)
else:
default_name = args.name
for i in range(args.num_clusters):
args.name = default_name + "-" + str(i)
apply_manifest(args)
if name == "main": main()
Docker-based Local Quickstart#
First, follow the Docker Guide to set up the NVIDIA Container Toolkit and Docker Compose.
Then, run the following steps to start a tuning run.
Build the base image, but we don't need to run it
python3 docker/container.py start && python3 docker/container.py stop
Build the tuning image with extra deps
docker build -t isaacray -f scripts/reinforcement_learning/ray/cluster_configs/Dockerfile .
Start the tuning image - symlink so that changes in the source folder show up in the container
docker run -v $(pwd)/source:/workspace/isaaclab/source -it --gpus all --net=host --entrypoint /bin/bash isaacray
Start the Ray server within the tuning image
echo "import ray; ray.init(); import time; [time.sleep(10) for _ in iter(int, 1)]" | ./isaaclab.sh -p
In a different terminal, run the following.
In a new terminal (don't close the above) , enter the image with a new shell.
docker container ps docker exec -it <ISAAC_RAY_IMAGE_ID_FROM_CONTAINER_PS> /bin/bash
Start a tuning run, with one parallel worker per GPU
./isaaclab.sh -p scripts/reinforcement_learning/ray/tuner.py
--cfg_file scripts/reinforcement_learning/ray/hyperparameter_tuning/vision_cartpole_cfg.py
--cfg_class CartpoleTheiaJobCfg
--run_mode local
--workflow scripts/reinforcement_learning/rl_games/train.py
--num_workers_per_node <NUMBER_OF_GPUS_IN_COMPUTER>
To view the training logs, in a different terminal, run the following and visit localhost:6006 in a browser afterwards.
In a new terminal (don't close the above) , enter the image with a new shell.
docker container ps docker exec -it <ISAAC_RAY_IMAGE_ID_FROM_CONTAINER_PS> /bin/bash
Start a tuning run, with one parallel worker per GPU
tensorboard --logdir=.
Submitting resource-wrapped individual jobs instead of automatic tuning runs is described in the following file.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
"""
This script dispatches sub-job(s) (individual jobs, use :file:tuner.py for tuning jobs)
to worker(s) on GPU-enabled node(s) of a specific cluster as part of an resource-wrapped aggregate
job. If no desired compute resources for each sub-job are specified,
this script creates one worker per available node for each node with GPU(s) in the cluster.If the desired resources for each sub-job is specified,the maximum number of workers possible with the desired resources are created for each nodewith GPU(s) in the cluster. It is also possible to split available node resources for each nodeinto the desired number of workers with the --num_workers flag, to be able to easilyparallelize sub-jobs on multi-GPU nodes. Due to Isaac Lab requiring a GPU,this ignores all CPU only nodes such as loggers.Sub-jobs are matched with node(s) in a cluster via the following relation:sorted_nodes = Node sorted by descending GPUs, then descending CPUs, then descending RAM, then node IDnode_submitted_to = sorted_nodes[job_index % total_node_count]To check the ordering of sorted nodes, supply the --test argument and run the script.Sub-jobs are separated by the + delimiter. The --sub_jobs argument must be the lastargument supplied to the script.If there is more than one available worker, and more than one sub-job,sub-jobs will be executed in parallel. If there are more sub-jobs than workers, sub-jobs willbe dispatched to workers as they become available. There is no limit on the numberof sub-jobs that can be near-simultaneously submitted.This script is meant to be executed on a Ray cluster head node as an aggregate cluster job.To submit aggregate cluster jobs such as this script to one or more remote clusters,see :file:../submit_isaac_ray_job.py.KubeRay clusters on Google GKE can be created with :file:../launch.pyUsage:.. code-block:: bash # Ensure that sub-jobs are separated by the + delimiter. # Generic Templates----------------------------------- ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py -h # No resource isolation; no parallelization: ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py --sub_jobs <JOB0>+<JOB1>+<JOB2> # Automatic Resource Isolation; Example A: needed for parallelization ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py \ --num_workers <NUM_TO_DIVIDE_TOTAL_RESOURCES_BY> \ --sub_jobs <JOB0>+<JOB1> # Manual Resource Isolation; Example B: needed for parallelization ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py --num_cpu_per_worker <CPU> \ --gpu_per_worker <GPU> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1> # Manual Resource Isolation; Example C: Needed for parallelization, for heterogeneous workloads ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py --num_cpu_per_worker <CPU> \ --gpu_per_worker <GPU1> <GPU2> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1> # to see all arguments ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py -h"""import argparseimport util
def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None: """ Provided a list of jobs, dispatch jobs to one worker per available node, unless otherwise specified by resource constraints.
Args: jobs: bash commands to execute on a Ray cluster args: The arguments for resource allocation
""" job_objs = [] util.ray_init( ray_address=args.ray_address, runtime_env={ "py_modules": None if not args.py_modules else args.py_modules, }, log_to_driver=False, ) gpu_node_resources = util.get_gpu_node_resources(include_id=True, include_gb_ram=True)
if any([args.gpu_per_worker, args.cpu_per_worker, args.ram_gb_per_worker]) and args.num_workers:
raise ValueError("Either specify only num_workers or only granular resources(GPU,CPU,RAM_GB).")
num_nodes = len(gpu_node_resources)
# Populate arguments
formatted_node_resources = {
"gpu_per_worker": [gpu_node_resources[i]["GPU"] for i in range(num_nodes)],
"cpu_per_worker": [gpu_node_resources[i]["CPU"] for i in range(num_nodes)],
"ram_gb_per_worker": [gpu_node_resources[i]["ram_gb"] for i in range(num_nodes)],
"num_workers": args.num_workers, # By default, 1 worker por node
}
args = util.fill_in_missing_resources(args, resources=formatted_node_resources, policy=min)
print(f"[INFO]: Number of GPU nodes found: {num_nodes}")
if args.test:
jobs = ["nvidia-smi"] * num_nodes
for i, job in enumerate(jobs):
gpu_node = gpu_node_resources[i % num_nodes]
print(f"[INFO]: Creating job {i + 1} of {len(jobs)} with job '{job}' to node {gpu_node}")
print(
f"[INFO]: Resource parameters: GPU: {args.gpu_per_worker[i]}"
f" CPU: {args.cpu_per_worker[i]} RAM {args.ram_gb_per_worker[i]}"
)
print(f"[INFO] For the node parameters, creating {args.num_workers[i]} workers")
num_gpus = args.gpu_per_worker[i] / args.num_workers[i]
num_cpus = args.cpu_per_worker[i] / args.num_workers[i]
memory = (args.ram_gb_per_worker[i] * 1024**3) / args.num_workers[i]
job_objs.append(
util.Job(
cmd=job,
name=f"Job-{i + 1}",
resources=util.JobResource(num_gpus=num_gpus, num_cpus=num_cpus, memory=memory),
node=util.JobNode(
specific="node_id",
node_id=gpu_node["id"],
),
)
)
# submit jobs
util.submit_wrapped_jobs(jobs=job_objs, test_mode=args.test, concurrent=False)
if name == "main": parser = argparse.ArgumentParser(description="Submit multiple jobs with optional GPU testing.") parser = util.add_resource_arguments(arg_parser=parser) parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.") parser.add_argument( "--test", action="store_true", help=( "Run nvidia-smi test instead of the arbitrary job," "can use as a sanity check prior to any jobs to check " "that GPU resources are correctly isolated." ), ) parser.add_argument( "--py_modules", type=str, nargs="*", default=[], help=( "List of python modules or paths to add before running the job. Example: --py_modules my_package/my_package" ), ) parser.add_argument( "--sub_jobs", type=str, nargs=argparse.REMAINDER, help="This should be last wrapper argument. Jobs separated by the + delimiter to run on a cluster.", ) args = parser.parse_args() if args.sub_jobs is not None: jobs = " ".join(args.sub_jobs) formatted_jobs = jobs.split("+") else: formatted_jobs = [] print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}") wrap_resources_to_jobs(jobs=formatted_jobs, args=args)
The task_runner.py dispatches Python tasks to a Ray cluster via a single declarative YAML file. This approach allows users to specify additional pip packages and Python modules for each run. Fine-grained resource allocation is supported, with explicit control over the number of CPUs, GPUs, and memory assigned to each task. The runner also offers advanced scheduling capabilities: tasks can be restricted to specific nodes by hostname or node ID, and supports two launch modes: tasks can be executed independently as resources become available, or grouped into a simultaneous batch—ideal for multi-node training jobs—which ensures that all tasks launch together only when sufficient resources are available across the cluster.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
""" This script dispatches one or more user-defined Python tasks to workers in a Ray cluster. Each task, along with its resource requirements and execution parameters, is specified in a YAML configuration file. Users may define the number of CPUs, GPUs, and the amount of memory to allocate per task via the config file.
Key features:
- Fine-grained, per-task resource management via config fields (
num_gpus,num_cpus,memory).- Parallel execution of multiple tasks using available resources across the Ray cluster.- Option to specify node affinity for tasks, e.g., by hostname, node ID, or any node.- Optional batch (simultaneous) or independent scheduling of tasks.Task scheduling and distribution are handled via Ray’s built-in resource manager.YAML configuration fields:---------------------------pip: List of extra pip packages to install before running any tasks.-py_modules: List of additional Python module paths (directories or files) to include in the runtime environment.-concurrent: (bool) It determines task dispatch semantics: - Ifconcurrent: true, all tasks are scheduled as a batch. The script waits until sufficient resources are available for every task in the batch, then launches all tasks together. If resources are insufficient, all tasks remain blocked until the cluster can support the full batch. - Ifconcurrent: false, tasks are launched as soon as resources are available for each individual task, and Ray independently schedules them. This may result in non-simultaneous task start times.-tasks: List of task specifications, each with: -name: String identifier for the task. -py_args: Arguments to the Python interpreter (e.g., script/module, flags, user arguments). -num_gpus: Number of GPUs to allocate (float or string arithmetic, e.g., "22"). -num_cpus: Number of CPUs to allocate (float or string). -memory: Amount of RAM in bytes (int or string). -node(optional): Node placement constraints. -specific(str): Type of node placement, supporthostname,node_id, orany. -any: Place the task on any available node. -hostname: Place the task on a specific hostname.hostnamemust be specified in the node field. -node_id: Place the task on a specific node ID.node_idmust be specified in the node field. -hostname(str): Specific hostname to place the task on. -node_id(str): Specific node ID to place the task on.Typical usage:---------------.. code-block:: bash # Print help and argument details: python task_runner.py -h # Submit tasks defined in a YAML file to the Ray cluster (auto-detects Ray head address): python task_runner.py --task_cfg /path/to/tasks.yamlYAML configuration example-1:---------------------------.. code-block:: yaml pip: ["xxx"] py_modules: ["my_package/my_package"] concurrent: false tasks: - name: "Isaac-Cartpole-v0" py_args: "-m torch.distributed.run --nnodes=1 --nproc_per_node=2 --rdzv_endpoint=localhost:29501 /workspace/isaaclab/scripts/reinforcement_learning/rsl_rl/train.py --task=Isaac-Cartpole-v0 --max_iterations 200 --headless --distributed" num_gpus: 2 num_cpus: 10 memory: 10737418240 - name: "script need some dependencies" py_args: "script.py --option arg" num_gpus: 0 num_cpus: 1 memory: 10102410241024YAML configuration example-2:---------------------------.. code-block:: yaml pip: ["xxx"] py_modules: ["my_package/my_package"] concurrent: true tasks: - name: "Isaac-Cartpole-v0-multi-node-train-1" py_args: "-m torch.distributed.run --nproc_per_node=1 --nnodes=2 --node_rank=0 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint=localhost:5555 /workspace/isaaclab/scripts/reinforcement_learning/rsl_rl/train.py --task=Isaac-Cartpole-v0 --headless --distributed --max_iterations 1000" num_gpus: 1 num_cpus: 10 memory: 10102410241024 node: specific: "hostname" hostname: "xxx" - name: "Isaac-Cartpole-v0-multi-node-train-2" py_args: "-m torch.distributed.run --nproc_per_node=1 --nnodes=2 --node_rank=1 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint=x.x.x.x:5555 /workspace/isaaclab/scripts/reinforcement_learning/rsl_rl/train.py --task=Isaac-Cartpole-v0 --headless --distributed --max_iterations 1000" num_gpus: 1 num_cpus: 10 memory: 10102410241024 node: specific: "hostname" hostname: "xxx"To stop all tasks early, press Ctrl+C; the script will cancel all running Ray tasks."""import argparseimport yamlfrom datetime import datetimeimport util def parse_args() -> argparse.Namespace: """ Parse command-line arguments for the Ray task runner.
Returns: argparse.Namespace: The namespace containing parsed CLI arguments:
- task_cfg (str): Path to the YAML task file.
- ray_address (str): Ray cluster address.
- test (bool): Whether to run a GPU resource isolation sanity check. """ parser = argparse.ArgumentParser(description="Run tasks from a YAML config file.") parser.add_argument("--task_cfg", type=str, required=True, help="Path to the YAML task file.") parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.") parser.add_argument( "--test", action="store_true", help=( "Run nvidia-smi test instead of the arbitrary job," "can use as a sanity check prior to any jobs to check " "that GPU resources are correctly isolated." ), ) return parser.parse_args()
def parse_task_resource(task: dict) -> util.JobResource: """ Parse task resource requirements from the YAML configuration.
Args:
task (dict): Dictionary representing a single task's configuration.
Keys may include num_gpus, num_cpus, and memory, each either
as a number or evaluatable string expression.
Returns: util.JobResource: Resource object with the parsed values. """ resource = util.JobResource() if "num_gpus" in task: resource.num_gpus = eval(task["num_gpus"]) if isinstance(task["num_gpus"], str) else task["num_gpus"] if "num_cpus" in task: resource.num_cpus = eval(task["num_cpus"]) if isinstance(task["num_cpus"], str) else task["num_cpus"] if "memory" in task: resource.memory = eval(task["memory"]) if isinstance(task["memory"], str) else task["memory"] return resource
def run_tasks( tasks: list[dict], args: argparse.Namespace, runtime_env: dict | None = None, concurrent: bool = False ) -> None: """ Submit tasks to the Ray cluster for execution.
Args: tasks (list[dict]): A list of task configuration dictionaries. args (argparse.Namespace): Parsed command-line arguments. runtime_env (dict | None): Ray runtime environment configuration containing:
- pip (list[str] | None): Additional pip packages to install.
- py_modules (list[str] | None): Python modules to include in the environment. concurrent (bool): Whether to launch tasks simultaneously as a batch, or independently as resources become available.
Returns: None """ job_objs = [] util.ray_init(ray_address=args.ray_address, runtime_env=runtime_env, log_to_driver=False) for task in tasks: resource = parse_task_resource(task) print(f"[INFO] Creating job {task['name']} with resource={resource}") job = util.Job( name=task["name"], py_args=task["py_args"], resources=resource, node=util.JobNode( specific=task.get("node", {}).get("specific"), hostname=task.get("node", {}).get("hostname"), node_id=task.get("node", {}).get("node_id"), ), ) job_objs.append(job) start = datetime.now() print(f"[INFO] Creating {len(job_objs)} jobs at {start.strftime('%H:%M:%S.%f')} with runtime env={runtime_env}") # submit jobs util.submit_wrapped_jobs( jobs=job_objs, test_mode=args.test, concurrent=concurrent, ) end = datetime.now() print( f"[INFO] All jobs completed at {end.strftime('%H:%M:%S.%f')}, took {(end - start).total_seconds():.2f} seconds." )
def main() -> None: """ Main entry point for the Ray task runner script.
Reads the YAML task configuration file, parses CLI arguments, and dispatches tasks to the Ray cluster.
Returns: None """ args = parse_args() with open(args.task_cfg) as f: config = yaml.safe_load(f) tasks = config["tasks"] runtime_env = { "pip": None if not config.get("pip") else config["pip"], "py_modules": None if not config.get("py_modules") else config["py_modules"], } concurrent = config.get("concurrent", False) run_tasks( tasks=tasks, args=args, runtime_env=runtime_env, concurrent=concurrent, )
if name == "main": main()
To use this script, run a command similar to the following (replace tasks.yaml with your actual configuration file):
python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs task_runner.py --task_cfg tasks.yaml
For detailed instructions on how to write your tasks.yaml file, please refer to the comments in task_runner.py.
Tip: Place the tasks.yaml file in the scripts/reinforcement_learning/ray directory so that it is included when the working_dir is uploaded. You can then reference it using a relative path in the command.
Transferring files from the running container can be done as follows.
docker container ps docker cp <ISAAC_RAY_IMAGE_ID_FROM_CONTAINER_PS>:</path/in/container/file> </path/on/host/>
For tuning jobs, specify the tuning job / hyperparameter sweep as child class of JobCfg . The included JobCfg only supports the rl_games workflow due to differences in environment entrypoints and hydra arguments, although other workflows will work if provided a compatible JobCfg.
class JobCfg: """To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable, at a minimum, the tune job should inherit from this class."""
def __init__ (self, cfg: dict):
""" Runner args include command line arguments passed to the task. For example: cfg["runner_args"]["headless_singleton"] = "--headless" cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras" """ assert "runner_args" in cfg, "No runner arguments specified." """ Task is the desired task to train on. For example: cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"]) """ assert "--task" in cfg["runner_args"], "No task specified." """ Hydra args define the hyperparameters varied within the sweep. For example: cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"]) """ assert "hydra_args" in cfg, "No hyperparameters specified." self.cfg = cfg
For example, see the following Cartpole Example configurations.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
import pathlib import sys
Allow for import of items from the ray workflow.
CUR_DIR = pathlib.Path( file ).parent UTIL_DIR = CUR_DIR.parent sys.path.extend([str(UTIL_DIR), str(CUR_DIR)]) import util import vision_cfg from ray import tune
class CartpoleRGBNoTuneJobCfg(vision_cfg.CameraJobCfg): def init (self, cfg: dict = {}): cfg = util.populate_isaac_ray_cfg_args(cfg) cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"]) super(). init (cfg, vary_env_count=False, vary_cnn=False, vary_mlp=False)
class CartpoleRGBCNNOnlyJobCfg(vision_cfg.CameraJobCfg): def init (self, cfg: dict = {}): cfg = util.populate_isaac_ray_cfg_args(cfg) cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"]) super(). init (cfg, vary_env_count=False, vary_cnn=True, vary_mlp=False)
class CartpoleRGBJobCfg(vision_cfg.CameraJobCfg): def init (self, cfg: dict = {}): cfg = util.populate_isaac_ray_cfg_args(cfg) cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"]) super(). init (cfg, vary_env_count=True, vary_cnn=True, vary_mlp=True)
class CartpoleResNetJobCfg(vision_cfg.ResNetCameraJob): def init (self, cfg: dict = {}): cfg = util.populate_isaac_ray_cfg_args(cfg) cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-ResNet18-v0"]) super(). init (cfg)
class CartpoleTheiaJobCfg(vision_cfg.TheiaCameraJob): def init (self, cfg: dict = {}): cfg = util.populate_isaac_ray_cfg_args(cfg) cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"]) super(). init (cfg)
Remote Clusters#
Select one of the following methods to create a Ray cluster to accept and execute dispatched jobs.
KubeRay Setup#
If using KubeRay clusters on Google GKE with the batteries-included cluster launch file, the following dependencies are also needed.
python3 -p -m pip install kubernetes Jinja2
For use on Kubernetes clusters with KubeRay, such as Google Kubernetes Engine or Amazon Elastic Kubernetes Service, kubectl is required, and can be installed via the Kubernetes website .
Google Cloud is currently the only platform tested, although any cloud provider should work if one configures the following.
Attention
The ray command should be modified to use Isaac python, which could be achieved in a fashion similar to
sed -i "1i $(echo "#!/workspace/isaaclab/_isaac_sim/python.sh")" \
/isaac-sim/kit/python/bin/ray && ln -s /isaac-sim/kit/python/bin/ray /usr/local/bin/ray
.
-
An container registry (NGC, GCS artifact registry, AWS ECR, etc) with an Isaac Lab image configured to support Ray. See
cluster_configs/Dockerfileto see how to modify theisaac-lab-basecontainer for Ray compatibility. Ray should use the isaac sim python shebang, andnvidia-smishould work within the container. Be careful with the setup here as paths need to be configured correctly for everything to work. It’s likely that the example dockerfile will work out of the box and can be pushed to the registry, as long as the base image has already been built as in the container guide. -
A Kubernetes setup with available NVIDIA RTX (likely
l4orl40ortesla-t4ora10) GPU-passthrough node-pool resources, that has access to your container registry/storage bucket and has the Ray operator enabled with correct IAM permissions. This can be easily achieved with services such as Google GKE or AWS EKS, provided that your account or organization has been granted a GPU-budget. It is recommended to use manual kubernetes services as opposed to “autopilot” services for cost-effective experimentation as this way clusters can be completely shut down when not in use, although this may require installing the Nvidia GPU Operator . -
An MLFlow server that your cluster has access to (already included for Google Cloud, which can be referenced for the format and MLFlow integration).
-
A
kuberay.yaml.ninjafile that describes how to allocate resources (already included for Google Cloud, which can be referenced for the format and MLFlow integration).
Ray Clusters (Without Kubernetes) Setup#
Attention
Modify the Ray command to use Isaac Python like in KubeRay clusters, and follow the same steps for creating an image/cluster permissions.
See the Ray Clusters Overview or Anyscale for more information.
Also, create an MLFlow server that your local host and cluster have access to.
Shared Steps Between KubeRay and Pure Ray Part I#
1.) Install Ray on your local machine.
python3 -p -m pip install ray[default]==2.31.0
2.) Build the Isaac Ray image, and upload it to your container registry of choice.
Login with NGC (nvcr.io) registry first, see docker steps in repo.
python3 docker/container.py start
Build the special Isaac Lab Ray Image
docker build -t <REGISTRY/IMAGE_NAME> -f scripts/reinforcement_learning/ray/cluster_configs/Dockerfile .
Push the image to your registry of choice.
docker push <REGISTRY/IMAGE_NAME>
KubeRay Clusters Only#
k9s is a great tool for monitoring your clusters that can easily be installed with snap install k9s --devmode.
1.) Verify cluster access, and that the correct operators are installed.
Verify cluster access
kubectl cluster-info
If using a manually managed cluster (not Autopilot or the like)
verify that there are node pools
kubectl get nodes
Check that the ray operator is installed on the cluster
should list rayclusters.ray.io , rayjobs.ray.io , and rayservices.ray.io
kubectl get crds | grep ray
Check that the NVIDIA Driver Operator is installed on the cluster
should list clusterpolicies.nvidia.com
kubectl get crds | grep nvidia
2.) Create the KubeRay cluster and an MLFlow server for receiving logs that your cluster has access to. This can be done automatically for Google GKE, where instructions are included in the following creation file.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
import argparse import pathlib import subprocess import yaml
import util from jinja2 import Environment, FileSystemLoader from kubernetes import config
"""This script helps create one or more KubeRay clusters.Usage:.. code-block:: bash # If the head node is stuck on container creating, make sure to create a secret python3 scripts/reinforcement_learning/ray/launch.py -h # Examples # The following creates 8 GPUx1 nvidia l4 workers python3 scripts/reinforcement_learning/ray/launch.py --cluster_host google_cloud \ --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \ --num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1 # The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers, # and 2 GPUx4 nvidia-tesla-t4 GPU workers python3 scripts/reinforcement_learning/ray/launch.py --cluster_host google_cloud \ --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \ --num_workers 1 2 --num_clusters 1 \ --worker_accelerator nvidia-l4 nvidia-tesla-t4 --gpu_per_worker 1 2 4"""RAY_DIR = pathlib.Path( file ).parent
def apply_manifest(args: argparse.Namespace) -> None: """Provided a Jinja templated ray.io/v1alpha1 file, populate the arguments and create the cluster. Additionally, create kubernetes containers for resources separated by '---' from the rest of the file.
Args: args: Possible arguments concerning cluster parameters. """ # Load Kubernetes configuration config.load_kube_config()
# Set up Jinja2 environment for loading templates
templates_dir = RAY_DIR / "cluster_configs" / args.cluster_host
file_loader = FileSystemLoader(str(templates_dir))
jinja_env = Environment(loader=file_loader, keep_trailing_newline=True, autoescape=True)
# Define template filename
template_file = "kuberay.yaml.jinja"
# Convert args namespace to a dictionary
template_params = vars(args)
# Load and render the template
template = jinja_env.get_template(template_file)
file_contents = template.render(template_params)
# Parse all YAML documents in the rendered template
all_yamls = []
for doc in yaml.safe_load_all(file_contents):
all_yamls.append(doc)
# Convert back to YAML string, preserving multiple documents
cleaned_yaml_string = ""
for i, doc in enumerate(all_yamls):
if i > 0:
cleaned_yaml_string += "\n---\n"
cleaned_yaml_string += yaml.dump(doc)
# Apply the Kubernetes manifest using kubectl
try:
print(cleaned_yaml_string)
subprocess.run(["kubectl", "apply", "-f", "-"], input=cleaned_yaml_string, text=True, check=True)
except subprocess.CalledProcessError as e:
exit(f"An error occurred while running `kubectl`: {e}")
def parse_args() -> argparse.Namespace: """ Parse command-line arguments for Kubernetes deployment script.
Returns: argparse.Namespace: Parsed command-line arguments. """ arg_parser = argparse.ArgumentParser( description="Script to apply manifests to create Kubernetes objects for Ray clusters.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, )
arg_parser.add_argument(
"--cluster_host",
type=str,
default="google_cloud",
choices=["google_cloud"],
help=(
"In the cluster_configs directory, the name of the folder where a tune.yaml.jinja"
"file exists defining the KubeRay config. Currently only google_cloud is supported."
),
)
arg_parser.add_argument(
"--name",
type=str,
required=False,
default="isaacray",
help="Name of the Kubernetes deployment.",
)
arg_parser.add_argument(
"--namespace",
type=str,
required=False,
default="default",
help="Kubernetes namespace to deploy the Ray cluster.",
)
arg_parser.add_argument(
"--service_acount_name", type=str, required=False, default="default", help="The service account name to use."
)
arg_parser.add_argument(
"--image",
type=str,
required=True,
help="Docker image for the Ray cluster pods.",
)
arg_parser.add_argument(
"--worker_accelerator",
nargs="+",
type=str,
default=["nvidia-l4"],
help="GPU accelerator name. Supply more than one for heterogeneous resources.",
)
arg_parser = util.add_resource_arguments(arg_parser, cluster_create_defaults=True)
arg_parser.add_argument(
"--num_clusters",
type=int,
default=1,
help="How many Ray Clusters to create.",
)
arg_parser.add_argument(
"--num_head_cpu",
type=float, # to be able to schedule partial CPU heads
default=8,
help="The number of CPUs to give the Ray head.",
)
arg_parser.add_argument("--head_ram_gb", type=int, default=8, help="How many gigs of ram to give the Ray head")
args = arg_parser.parse_args()
return util.fill_in_missing_resources(args, cluster_creation_flag=True)
def main(): args = parse_args()
if "head" in args.name:
raise ValueError("For compatibility with other scripts, do not include head in the name")
if args.num_clusters == 1:
apply_manifest(args)
else:
default_name = args.name
for i in range(args.num_clusters):
args.name = default_name + "-" + str(i)
apply_manifest(args)
if name == "main": main()
For other cloud services, the kuberay.yaml.ninja will be similar to that of Google’s.
Jinja is used for templating here as full helm setup is excessive for application
apiVersion: ray.io/v1alpha1 kind: RayCluster metadata: name: {{ name }} namespace: {{ namespace }} spec: rayVersion: "2.8.0" enableInTreeAutoscaling: true autoscalerOptions: upscalingMode: Default idleTimeoutSeconds: 120 imagePullPolicy: Always securityContext: {} envFrom: []
headGroupSpec: rayStartParams: block: "true" dashboard-host: 0.0.0.0 dashboard-port: "8265" port: "6379" include-dashboard: "true" ray-debugger-external: "true" object-manager-port: "8076" num-gpus: "0" num-cpus: "0" # prevent scheduling jobs to the head node - workers only headService: apiVersion: v1 kind: Service metadata: name: {{ name }}-head spec: type: LoadBalancer template: metadata: labels: app.kubernetes.io/instance: tuner app.kubernetes.io/name: kuberay cloud.google.com/gke-ray-node-type: head spec: serviceAccountName: {{ service_account_name }} affinity: {} securityContext: fsGroup: 100 containers: - env: image: {{ image }} imagePullPolicy: Always name: head resources: limits: cpu: "{{ num_head_cpu }}" memory: {{ head_ram_gb }}G nvidia.com/gpu: "0" requests: cpu: "{{ num_head_cpu }}" memory: {{ head_ram_gb }}G nvidia.com/gpu: "0" securityContext: {} volumeMounts: - mountPath: /tmp/ray name: ray-logs command: ["/bin/bash", "-c", "ray start --head --port=6379 --object-manager-port=8076 --dashboard-host=0.0.0.0 --dashboard-port=8265 --include-dashboard=true && tail -f /dev/null"] - image: fluent/fluent-bit:1.9.6 name: fluentbit resources: limits: cpu: 100m memory: 128Mi requests: cpu: 100m memory: 128Mi volumeMounts: - mountPath: /tmp/ray name: ray-logs imagePullSecrets: [] nodeSelector: iam.gke.io/gke-metadata-server-enabled: "true" volumes: - configMap: name: fluentbit-config name: fluentbit-config - name: ray-logs emptyDir: {}
workerGroupSpecs: {% for it in range(gpu_per_worker|length) %} - groupName: "{{ worker_accelerator[it] }}x{{ gpu_per_worker[it] }}-cpu-{{ cpu_per_worker[it] }}-ram-gb-{{ ram_gb_per_worker[it] }}" replicas: {{ num_workers[it] }} maxReplicas: {{ num_workers[it] }} minReplicas: {{ num_workers[it] }} rayStartParams: block: "true" ray-debugger-external: "true" replicas: "{{num_workers[it]}}" template: metadata: annotations: {} labels: app.kubernetes.io/instance: tuner app.kubernetes.io/name: kuberay cloud.google.com/gke-ray-node-type: worker spec: serviceAccountName: {{ service_account_name }} affinity: {} securityContext: fsGroup: 100 containers: - env: - name: NVIDIA_VISIBLE_DEVICES value: "all" - name: NVIDIA_DRIVER_CAPABILITIES value: "compute,utility"
image: {{ image }}
imagePullPolicy: Always
name: ray-worker
resources:
limits:
cpu: "{{ cpu_per_worker[it] }}"
memory: {{ ram_gb_per_worker[it] }}G
nvidia.com/gpu: "{{ gpu_per_worker[it] }}"
requests:
cpu: "{{ cpu_per_worker[it] }}"
memory: {{ ram_gb_per_worker[it] }}G
nvidia.com/gpu: "{{ gpu_per_worker[it] }}"
securityContext: {}
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
command: ["/bin/bash", "-c", "ray start --address={{name}}-head.{{ namespace }}.svc.cluster.local:6379 && tail -f /dev/null"]
- image: fluent/fluent-bit:1.9.6
name: fluentbit
resources:
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 100m
memory: 128Mi
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
imagePullSecrets: []
nodeSelector:
cloud.google.com/gke-accelerator: {{ worker_accelerator[it] }}
iam.gke.io/gke-metadata-server-enabled: "true"
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
volumes:
- configMap:
name: fluentbit-config
name: fluentbit-config
- name: ray-logs
emptyDir: {}
{% endfor %}
ML Flow Server - for fetching logs
apiVersion: apps/v1 kind: Deployment metadata: name: {{name}}-mlflow namespace: {{ namespace }} spec: replicas: 1 selector: matchLabels: app: mlflow template: metadata: labels: app: mlflow spec: containers: - name: mlflow image: ghcr.io/mlflow/mlflow:v2.9.2 ports: - containerPort: 5000 command: ["mlflow"] args: - server - --host=0.0.0.0 - --port=5000 - --backend-store-uri=sqlite:///mlflow.db
ML Flow Service (for port forwarding, kubectl port-forward service/{name}-mlflow 5000:5000)
apiVersion: v1 kind: Service metadata: name: {{name}}-mlflow namespace: {{ namespace }} spec: selector: app: mlflow ports:
- port: 5000 targetPort: 5000 type: ClusterIP
3.) Fetch the KubeRay cluster IP addresses, and the MLFLow Server IP. This can be done automatically for KubeRay clusters, where instructions are included in the following fetching file. The KubeRay clusters are saved to a file, but the MLFLow Server IP is printed.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
import argparse import os import re import subprocess import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed
"""This script requires that kubectl is installed and KubeRay was used to create the cluster.Creates a config file containing name: <NAME> address: http://<IP>:<PORT> ona new line for each cluster, and also fetches the MLFlow URI.Usage:.. code-block:: bash python3 scripts/reinforcement_learning/ray/grok_cluster_with_kubectl.py # For options, supply -h arg"""
def get_namespace() -> str: """Get the current Kubernetes namespace from the context, fallback to default if not set""" try: namespace = ( subprocess.check_output(["kubectl", "config", "view", "--minify", "--output", "jsonpath={..namespace}"]) .decode() .strip() ) if not namespace: namespace = "default" except subprocess.CalledProcessError: namespace = "default" return namespace
def get_pods(namespace: str = "default") -> list[tuple]: """Get a list of all of the pods in the namespace""" cmd = ["kubectl", "get", "pods", "-n", namespace, "--no-headers"] output = subprocess.check_output(cmd).decode() pods = [] for line in output.strip().split("\n"): fields = line.split() pod_name = fields[0] status = fields[2] pods.append((pod_name, status)) return pods
def get_clusters(pods: list, cluster_name_prefix: str) -> set: """ Get unique cluster name(s). Works for one or more clusters, based off of the number of head nodes. Excludes MLflow deployments. """ clusters = set() for pod_name, _ in pods: # Skip MLflow pods if "-mlflow" in pod_name: continue
match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
if match:
# Get base name without head/worker suffix (skip workers)
if "head" in pod_name:
base_name = match.group(1).split("-head")[0]
clusters.add(base_name)
return sorted(clusters)
def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") -> str: """ Get MLflow service information if it exists in the namespace with the given prefix. Only works for a single cluster instance. Args: namespace: Kubernetes namespace cluster_prefix: Base cluster name (without -head/-worker suffixes) Returns: MLflow service URL """ # Strip any -head or -worker suffixes to get base name if namespace is None: namespace = get_namespace() pods = get_pods(namespace=namespace) clusters = get_clusters(pods=pods, cluster_name_prefix=cluster_prefix) if len(clusters) > 1: raise ValueError("More than one cluster matches prefix, could not automatically determine mlflow info.") mlflow_name = f"{cluster_prefix}-mlflow"
cmd = ["kubectl", "get", "svc", mlflow_name, "-n", namespace, "--no-headers"]
try:
output = subprocess.check_output(cmd).decode()
fields = output.strip().split()
# Get cluster IP
cluster_ip = fields[2]
port = "5000" # Default MLflow port
# This needs to be http to be resolved. HTTPS can't be resolved
# This should be fine as it is on a subnet on the cluster regardless
return f"http://{cluster_ip}:{port}"
except subprocess.CalledProcessError as e:
raise ValueError(f"Could not grok MLflow: {e}") # Fixed f-string
def check_clusters_running(pods: list, clusters: set) -> bool: """ Check that all of the pods in all provided clusters are running.
Args: pods (list): A list of tuples where each tuple contains the pod name and its status. clusters (set): A set of cluster names to check.
Returns: bool: True if all pods in any of the clusters are running, False otherwise. """ clusters_running = False for cluster in clusters: cluster_pods = [p for p in pods if p[0].startswith(cluster)] total_pods = len(cluster_pods) running_pods = len([p for p in cluster_pods if p[1] == "Running"]) if running_pods == total_pods and running_pods > 0: clusters_running = True break return clusters_running
def get_ray_address(head_pod: str, namespace: str = "default", ray_head_name: str = "head") -> str: """ Given a cluster head pod, check its logs, which should include the ray address which can accept job requests.
Args: head_pod (str): The name of the head pod. namespace (str, optional): The Kubernetes namespace. Defaults to "default". ray_head_name (str, optional): The name of the ray head container. Defaults to "head".
Returns: str: The ray address if found, None otherwise.
Raises: ValueError: If the logs cannot be retrieved or the ray address is not found. """ cmd = ["kubectl", "logs", head_pod, "-c", ray_head_name, "-n", namespace] try: output = subprocess.check_output(cmd).decode() except subprocess.CalledProcessError as e: raise ValueError( f"Could not enter head container with cmd {cmd}: {e}Perhaps try a different namespace or ray head name." ) match = re.search(r"RAY_ADDRESS='([^']+)'", output) if match: return match.group(1) else: return None
def process_cluster(cluster_info: dict, ray_head_name: str = "head") -> str: """ For each cluster, check that it is running, and get the Ray head address that will accept jobs.
Args: cluster_info (dict): A dictionary containing cluster information with keys 'cluster', 'pods', and 'namespace'. ray_head_name (str, optional): The name of the ray head container. Defaults to "head".
Returns: str: A string containing the cluster name and its Ray head address, or an error message if the head pod or Ray address is not found. """ cluster, pods, namespace = cluster_info head_pod = None for pod_name, status in pods: if pod_name.startswith(cluster + "-head"): head_pod = pod_name break if not head_pod: return f"Error: Could not find head pod for cluster {cluster}\n"
# Get RAY_ADDRESS and status
ray_address = get_ray_address(head_pod, namespace=namespace, ray_head_name=ray_head_name)
if not ray_address:
return f"Error: Could not find RAY_ADDRESS for cluster {cluster}\n"
# Return only cluster and ray address
return f"name: {cluster} address: {ray_address}\n"
def main(): # Parse command-line arguments parser = argparse.ArgumentParser(description="Process Ray clusters and save their specifications.") parser.add_argument("--prefix", default="isaacray", help="The prefix for the cluster names.") parser.add_argument("--output", default="~/.cluster_config", help="The file to save cluster specifications.") parser.add_argument("--ray_head_name", default="head", help="The metadata name for the ray head container") parser.add_argument( "--namespace", help="Kubernetes namespace to use. If not provided, will detect from current context." ) args = parser.parse_args()
# Get namespace from args or detect it
current_namespace = args.namespace if args.namespace else get_namespace()
print(f"Using namespace: {current_namespace}")
cluster_name_prefix = args.prefix
cluster_spec_file = os.path.expanduser(args.output)
# Get all pods
pods = get_pods(namespace=current_namespace)
# Get clusters
clusters = get_clusters(pods, cluster_name_prefix)
if not clusters:
print(f"No clusters found with prefix {cluster_name_prefix}")
return
# Wait for clusters to be running
while True:
pods = get_pods(namespace=current_namespace)
if check_clusters_running(pods, clusters):
break
print("Waiting for all clusters to spin up...")
time.sleep(5)
print("Checking for MLflow:")
# Check MLflow status for each cluster
for cluster in clusters:
try:
mlflow_address = get_mlflow_info(current_namespace, cluster)
print(f"MLflow address for {cluster}: {mlflow_address}")
except ValueError as e:
print(f"ML Flow not located: {e}")
print()
# Prepare cluster info for parallel processing
cluster_infos = []
for cluster in clusters:
cluster_pods = [p for p in pods if p[0].startswith(cluster)]
cluster_infos.append((cluster, cluster_pods, current_namespace))
# Use ThreadPoolExecutor to process clusters in parallel
results = []
results_lock = threading.Lock()
with ThreadPoolExecutor() as executor:
future_to_cluster = {
executor.submit(process_cluster, info, args.ray_head_name): info[0] for info in cluster_infos
}
for future in as_completed(future_to_cluster):
cluster_name = future_to_cluster[future]
try:
result = future.result()
with results_lock:
results.append(result)
except Exception as exc:
print(f"{cluster_name} generated an exception: {exc}")
# Sort results alphabetically by cluster name
results.sort()
# Write sorted results to the output file (Ray info only)
with open(cluster_spec_file, "w") as f:
for result in results:
f.write(result)
print(f"Cluster spec information saved to {cluster_spec_file}")
# Display the contents of the config file
with open(cluster_spec_file) as f:
print(f.read())
if name == "main": main()
Ray Clusters Only (Without Kubernetes)#
1.) Verify cluster access.
2.) Create a ~/.cluster_config file, where name: <NAME> address: http://<IP>:<PORT> is on a new line for each unique cluster. For one cluster, there should only be one line in this file.
3.) Start an MLFLow Server to receive the logs that the ray cluster has access to, and determine the server URI.
Dispatching Steps Shared Between KubeRay and Pure Ray Part II#
1.) Test that your cluster is operational with the following.
Test that NVIDIA GPUs are visible and that Ray is operation with the following command:
python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs wrap_resources.py --test
2.) Submitting tuning and/or resource-wrapped jobs is described in the submit_job.py file.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
"""
This script submits aggregate job(s) to cluster(s) described in a
config file containing name: <NAME> address: http://<IP>:<PORT> on
a new line for each cluster. For KubeRay clusters, this file
can be automatically created with :file:grok_cluster_with_kubectl.py
Aggregate job(s) are matched with cluster(s) via the following relation:
cluster_line_index_submitted_to = job_index % total_cluster_countAggregate jobs are separated by the * delimiter. The --aggregate_jobs argument must bethe last argument supplied to the script.An aggregate job could be a :file:../tuner.py tuning job, which automaticallycreates several individual jobs when started on a cluster. Alternatively, an aggregate jobcould be a :file:'../wrap_resources.py resource-wrapped job,which may contain several individual sub-jobs separated bythe + delimiter. An aggregate job could also be a :file:../task_runner.py multi-task submission job,where each sub-job and its resource requirements are defined in a YAML configuration file.In this mode, :file:../task_runner.py` will read the YAML file (via --task_cfg), andsubmit all defined sub-tasks to the Ray cluster, supporting per-job resource specification andreal-time streaming of sub-job outputs.If there are more aggregate jobs than cluster(s), aggregate jobs will be submittedas clusters become available via the defined relation above. If there are less aggregate job(s)than clusters, some clusters will not receive aggregate job(s). The maximum number ofaggregate jobs that can be run simultaneously is equal to the number of workers created bydefault by a ThreadPoolExecutor on the machine submitting jobs due to fetching the log output afterjobs finish, which is unlikely to constrain overall-job submission.Usage:.. code-block:: bash # Example; submitting a tuning job python3 scripts/reinforcement_learning/ray/submit_job.py \ --aggregate_jobs /workspace/isaaclab/scripts/reinforcement_learning/ray/tuner.py \ --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \ --cfg_class CartpoleTheiaJobCfg --mlflow_uri <ML_FLOW_URI> # Example: Submitting resource wrapped job python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs wrap_resources.py --test # Example: submitting tasks with specific resources, and supporting pip packages and py_modules # You may use relative paths for task_cfg and py_modules, placing them in the scripts/reinforcement_learning/ray directory, which will be uploaded to the cluster. python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs task_runner.py --task_cfg tasks.yaml # For all command line arguments python3 scripts/reinforcement_learning/ray/submit_job.py -h"""import argparseimport osimport timefrom concurrent.futures import ThreadPoolExecutorfrom ray import job_submission
script_directory = os.path.dirname(os.path.abspath( file ))
CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}
def read_cluster_spec(fn: str | None = None) -> list[dict]: if fn is None: cluster_spec_path = os.path.expanduser("~/.cluster_config") else: cluster_spec_path = os.path.expanduser(fn)
if not os.path.exists(cluster_spec_path):
raise FileNotFoundError(f"Cluster spec file not found at {cluster_spec_path}")
clusters = []
with open(cluster_spec_path) as f:
for line in f:
parts = line.strip().split(" ")
http_address = parts[3]
cluster_info = {"name": parts[1], "address": http_address}
print(f"[INFO] Setting {cluster_info['name']}") # with {cluster_info['num_gpu']} GPUs.")
clusters.append(cluster_info)
return clusters
def submit_job(cluster: dict, job_command: str) -> None: """ Submits a job to a single cluster, prints the final result and Ray dashboard URL at the end. """ address = cluster["address"] cluster_name = cluster["name"] print(f"[INFO]: Submitting job to cluster '{cluster_name}' at {address}") # with {num_gpus} GPUs.") client = job_submission.JobSubmissionClient(address) runtime_env = {"working_dir": CONFIG["working_dir"], "executable": CONFIG["executable"]} print(f"[INFO]: Checking contents of the directory: {CONFIG['working_dir']}") try: dir_contents = os.listdir(CONFIG["working_dir"]) print(f"[INFO]: Directory contents: {dir_contents}") except Exception as e: print(f"[INFO]: Failed to list directory contents: {str(e)}") entrypoint = f"{CONFIG['executable']} {job_command}" print(f"[INFO]: Attempting entrypoint {entrypoint=} in cluster {cluster}") job_id = client.submit_job(entrypoint=entrypoint, runtime_env=runtime_env) status = client.get_job_status(job_id) while status in [job_submission.JobStatus.PENDING, job_submission.JobStatus.RUNNING]: time.sleep(5) status = client.get_job_status(job_id)
final_logs = client.get_job_logs(job_id)
print("----------------------------------------------------")
print(f"[INFO]: Cluster {cluster_name} Logs: \n")
print(final_logs)
print("----------------------------------------------------")
def submit_jobs_to_clusters(jobs: list[str], clusters: list[dict]) -> None: """ Submit all jobs to their respective clusters, cycling through clusters if there are more jobs than clusters. """ if not clusters: raise ValueError("No clusters available for job submission.")
if len(jobs) < len(clusters):
print("[INFO]: Less jobs than clusters, some clusters will not receive jobs")
elif len(jobs) == len(clusters):
print("[INFO]: Exactly one job per cluster")
else:
print("[INFO]: More jobs than clusters, jobs submitted as clusters become available.")
with ThreadPoolExecutor() as executor:
for idx, job_command in enumerate(jobs):
# Cycle through clusters using modulus to wrap around if there are more jobs than clusters
cluster = clusters[idx % len(clusters)]
executor.submit(submit_job, cluster, job_command)
if name == "main": parser = argparse.ArgumentParser(description="Submit multiple GPU jobs to multiple Ray clusters.") parser.add_argument("--config_file", default="~/.cluster_config", help="The cluster config path.") parser.add_argument( "--aggregate_jobs", type=str, nargs=argparse.REMAINDER, help="This should be last argument. The aggregate jobs to submit separated by the * delimiter.", ) args = parser.parse_args() if args.aggregate_jobs is not None: jobs = " ".join(args.aggregate_jobs) formatted_jobs = jobs.split("*") if len(formatted_jobs) > 1: print("Warning; Split jobs by cluster with the * delimiter") else: formatted_jobs = [] print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}") clusters = read_cluster_spec(args.config_file) submit_jobs_to_clusters(formatted_jobs, clusters)
3.) For tuning jobs, specify the tuning job / hyperparameter sweep as a JobCfg . The included JobCfg only supports the rl_games workflow due to differences in environment entrypoints and hydra arguments, although other workflows will work if provided a compatible JobCfg.
class JobCfg: """To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable, at a minimum, the tune job should inherit from this class."""
def __init__ (self, cfg: dict):
""" Runner args include command line arguments passed to the task. For example: cfg["runner_args"]["headless_singleton"] = "--headless" cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras" """ assert "runner_args" in cfg, "No runner arguments specified." """ Task is the desired task to train on. For example: cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"]) """ assert "--task" in cfg["runner_args"], "No task specified." """ Hydra args define the hyperparameters varied within the sweep. For example: cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"]) """ assert "hydra_args" in cfg, "No hyperparameters specified." self.cfg = cfg
For example, see the following Cartpole Example configurations.
Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
import pathlib import sys
Allow for import of items from the ray workflow.
CUR_DIR = pathlib.Path( file ).parent UTIL_DIR = CUR_DIR.parent sys.path.extend([str(UTIL_DIR), str(CUR_DIR)]) import util import vision_cfg from ray import tune
class CartpoleRGBNoTuneJobCfg(vision_cfg.CameraJobCfg): def init (self, cfg: dict = {}): cfg = util.populate_isaac_ray_cfg_args(cfg) cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"]) super(). init (cfg, vary_env_count=False, vary_cnn=False, vary_mlp=False)
class CartpoleRGBCNNOnlyJobCfg(vision_cfg.CameraJobCfg): def init (self, cfg: dict = {}): cfg = util.populate_isaac_ray_cfg_args(cfg) cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"]) super(). init (cfg, vary_env_count=False, vary_cnn=True, vary_mlp=False)
class CartpoleRGBJobCfg(vision_cfg.CameraJobCfg): def init (self, cfg: dict = {}): cfg = util.populate_isaac_ray_cfg_args(cfg) cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"]) super(). init (cfg, vary_env_count=True, vary_cnn=True, vary_mlp=True)
class CartpoleResNetJobCfg(vision_cfg.ResNetCameraJob): def init (self, cfg: dict = {}): cfg = util.populate_isaac_ray_cfg_args(cfg) cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-ResNet18-v0"]) super(). init (cfg)
class CartpoleTheiaJobCfg(vision_cfg.TheiaCameraJob): def init (self, cfg: dict = {}): cfg = util.populate_isaac_ray_cfg_args(cfg) cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"]) super(). init (cfg)
To view the tuning results, view the MLFlow dashboard of the server that you created. For KubeRay, this can be done through port forwarding the MLFlow dashboard with the following.
kubectl port-forward service/isaacray-mlflow 5000:5000
Then visit the following address in a browser.
localhost:5000
If the MLFlow port is forwarded like above, it can be converted into tensorboard logs with this following command.
./isaaclab.sh -p scripts/reinforcement_learning/ray/mlflow_to_local_tensorboard.py \
--uri http://localhost:5000 --experiment-name IsaacRay-<CLASS_JOB_CFG>-tune --download-dir test
Kubernetes Cluster Cleanup#
For the sake of conserving resources, and potentially freeing precious GPU resources for other people to use on shared compute platforms, please destroy the Ray cluster after use. They can be easily recreated! For KubeRay clusters, this can be done as follows.
kubectl get raycluster | egrep 'isaacray' | awk '{print $1}' | xargs kubectl delete raycluster && kubectl get deployments | egrep 'mlflow' | awk '{print $1}' | xargs kubectl delete deployment && kubectl get services | egrep 'mlflow' | awk '{print $1}' | xargs kubectl delete service && kubectl get services | egrep 'isaacray' | awk '{print $1}' | xargs kubectl delete service
Links/Buttons:
- #
- Ray
- independent community contributed walkthrough video
- Docker Guide
- Kubernetes website
- Nvidia GPU Operator
- MLFlow server
- Ray Clusters Overview
- Anyscale
- k9s