Skip to content

🧩 SbatchMan Internal API

If you wish, you can interact directly with the SbatchMan internal API.

__all__ module-attribute

__all__ = ['SbatchManError', 'ProjectNotInitializedError', 'ProjectExistsError', 'get_cluster_name', 'get_max_queued_jobs', 'set_max_queued_jobs', 'Job', 'Status', 'init_project', 'SlurmConfig', 'PbsConfig', 'LocalConfig', 'create_local_config', 'create_slurm_config', 'create_pbs_config', 'create_configs_from_file', 'launch_jobs_from_file', 'launch_job', 'job_submit', 'jobs_list', 'jobs_df', 'count_active_jobs', 'archive_jobs', 'delete_jobs', 'archive_job', 'unarchive_job', 'update_jobs_status']

__version__ module-attribute

__version__ = version('sbatchman')

Job dataclass

Job(config_name: str, cluster_name: str, exp_dir: str, command: str, status: str, scheduler: str, tag: str, job_id: int, queued_timestamp: str, exitcode: Optional[int] = None, preprocess: Optional[str] = None, postprocess: Optional[str] = None, archive_name: Optional[str] = None, variables: Optional[dict[str, Any]] = None, start_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None)

archive_name class-attribute instance-attribute

archive_name: Optional[str] = None

cluster_name instance-attribute

cluster_name: str

command instance-attribute

command: str

config_name instance-attribute

config_name: str

end_timestamp class-attribute instance-attribute

end_timestamp: Optional[str] = None

exitcode class-attribute instance-attribute

exitcode: Optional[int] = None

exp_dir instance-attribute

exp_dir: str

job_id instance-attribute

job_id: int

postprocess class-attribute instance-attribute

postprocess: Optional[str] = None

preprocess class-attribute instance-attribute

preprocess: Optional[str] = None

queued_timestamp instance-attribute

queued_timestamp: str

scheduler instance-attribute

scheduler: str

start_timestamp class-attribute instance-attribute

start_timestamp: Optional[str] = None

status instance-attribute

status: str

tag instance-attribute

tag: str

variables class-attribute instance-attribute

variables: Optional[dict[str, Any]] = None

get_job_base_path

get_job_base_path() -> Path
Source code in src/sbatchman/core/job.py
107
108
109
110
111
def get_job_base_path(self) -> Path:
  """
  Return the directory that holds this job's files.

  Active jobs (no ``archive_name``) live under ``get_experiments_dir() / exp_dir``;
  archived jobs live under ``get_archive_dir() / archive_name / exp_dir``.
  """
  if not self.archive_name:
    return get_experiments_dir() / self.exp_dir
  return get_archive_dir() / self.archive_name / self.exp_dir

get_job_config

get_job_config() -> BaseConfig

Returns the configuration of the job. It will specialize the class to either SlurmConfig, LocalConfig or PbsConfig

Source code in src/sbatchman/core/job.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def get_job_config(self) -> BaseConfig:
  """
  Returns the configuration of the job. It will specialize the class to either SlurmConfig, LocalConfig or PbsConfig

  The project's configurations YAML file is read, the entry for
  ``self.cluster_name`` is looked up, and its ``configs[self.config_name]``
  mapping is turned into the config class matching the cluster's ``scheduler``.

  Returns:
    A SlurmConfig, PbsConfig or LocalConfig built from the stored parameters,
    with ``name`` and ``cluster_name`` taken from this job.

  Raises:
    ConfigurationNotFoundError: If the project configurations file does not exist.
    ConfigurationError: If the cluster or the configuration is missing from the
      file, or the cluster's scheduler is not one of slurm, pbs, local.
  """
  configs_file_path = get_project_configs_file_path()

  if not configs_file_path.exists():
    raise ConfigurationNotFoundError(
      f"Configuration '{self.config_name}' for cluster '{self.cluster_name}' not found: "
      f"configurations file '{configs_file_path}' does not exist."
    )

  # Context manager closes the handle deterministically; an empty file loads
  # as None, which is treated as "no clusters defined".
  with open(configs_file_path, 'r') as f:
    all_configs = yaml.safe_load(f) or {}

  if self.cluster_name not in all_configs:
    raise ConfigurationError(f"Could not find cluster '{self.cluster_name}' in configurations.yaml file ({configs_file_path})")

  cluster_entry = all_configs[self.cluster_name]
  # .get() so a missing key surfaces as a ConfigurationError below instead of a KeyError.
  scheduler = cluster_entry.get('scheduler')
  cluster_configs = cluster_entry.get('configs') or {}
  if self.config_name not in cluster_configs:
    raise ConfigurationError(f"Could not find configuration '{self.config_name}' in configurations.yaml file ({configs_file_path})")

  # Work on a copy of the loaded mapping, inject the identity fields and drop
  # any per-config 'scheduler' key, which is not passed to the config class.
  config_dict = dict(cluster_configs[self.config_name])
  config_dict['name'] = self.config_name
  config_dict['cluster_name'] = self.cluster_name
  config_dict.pop('scheduler', None)

  if scheduler == 'slurm':
    return SlurmConfig(**config_dict)
  elif scheduler == 'pbs':
    return PbsConfig(**config_dict)
  elif scheduler == 'local':
    return LocalConfig(**config_dict)
  else:
    raise ConfigurationError(f"No class found for scheduler '{scheduler}'. Supported schedulers are: slurm, pbs, local.")

get_job_script_path

get_job_script_path() -> Path
Source code in src/sbatchman/core/job.py
113
114
def get_job_script_path(self) -> Path:
  """Return the path of the job's ``run.sh`` script inside its base directory."""
  return self.get_job_base_path().joinpath("run.sh")

get_metadata_path

get_metadata_path() -> Path

Returns the path to the metadata.yaml file for this job. If the job is archived, it will return the path in the archive directory. Otherwise, it returns the path in the active experiments directory.

Source code in src/sbatchman/core/job.py
142
143
144
145
146
147
148
def get_metadata_path(self) -> Path:
  """
  Return the path of this job's ``metadata.yaml``.

  The file lives in the job's base directory (``get_job_base_path()``), so an
  archived job resolves inside its archive and an active job inside the
  experiments directory.
  """
  return self.get_job_base_path().joinpath("metadata.yaml")

get_run_time

get_run_time() -> Optional[float]

Returns the job runtime in seconds, or None if start or end time not available.

Source code in src/sbatchman/core/job.py
209
210
211
212
213
214
215
216
217
218
219
220
def get_run_time(self) -> Optional[float]:
  """
  Returns the job runtime in seconds, or None if start or end time not available.

  Only the first 22 characters of each timestamp are parsed, using the
  ``%Y%m%d_%H%M%S.%f`` format; an unparseable timestamp also yields None.
  """
  stamp_format = "%Y%m%d_%H%M%S.%f"
  begin, finish = self.start_timestamp, self.end_timestamp
  if begin and finish:
    try:
      elapsed = datetime.strptime(finish[:22], stamp_format) - datetime.strptime(begin[:22], stamp_format)
    except ValueError:
      return None
    return elapsed.total_seconds()
  return None

get_stderr

get_stderr() -> Optional[str]

Returns the contents of the stderr log file for this job, or None if not found.

Source code in src/sbatchman/core/job.py
132
133
134
135
136
137
138
139
140
def get_stderr(self) -> Optional[str]:
  """
  Return the full text of this job's stderr log, or None when the file is absent.
  """
  log_file = self.get_stderr_path()
  if not log_file.exists():
    return None
  with open(log_file, "r") as handle:
    return handle.read()

get_stderr_path

get_stderr_path() -> Path
Source code in src/sbatchman/core/job.py
129
130
def get_stderr_path(self) -> Path:
  """Return the path of the job's ``stderr.log`` inside its base directory."""
  return self.get_job_base_path().joinpath("stderr.log")

get_stdout

get_stdout() -> Optional[str]

Returns the contents of the stdout log file for this job, or None if not found.

Source code in src/sbatchman/core/job.py
119
120
121
122
123
124
125
126
127
def get_stdout(self) -> Optional[str]:
  """
  Return the full text of this job's stdout log, or None when the file is absent.
  """
  log_file = self.get_stdout_path()
  if not log_file.exists():
    return None
  with open(log_file, "r") as handle:
    return handle.read()

get_stdout_path

get_stdout_path() -> Path
Source code in src/sbatchman/core/job.py
116
117
def get_stdout_path(self) -> Path:
  """Return the path of the job's ``stdout.log`` inside its base directory."""
  return self.get_job_base_path().joinpath("stdout.log")

get_time_in_queue

get_time_in_queue() -> Optional[float]

Returns the time spent in queue in seconds, or None if not queued or start time not available.

Source code in src/sbatchman/core/job.py
195
196
197
198
199
200
201
202
203
204
205
206
207
def get_time_in_queue(self) -> Optional[float]:
  """
  Returns the time spent in queue in seconds, or None if not queued or start time not available.

  Both timestamps are parsed from their first 22 characters with the
  ``%Y%m%d_%H%M%S.%f`` format; None is returned if either cannot be parsed.
  """
  if not self.queued_timestamp or not self.start_timestamp:
    return None
  try:
    queued = datetime.strptime(self.queued_timestamp[:22], "%Y%m%d_%H%M%S.%f")
    started = datetime.strptime(self.start_timestamp[:22], "%Y%m%d_%H%M%S.%f")
  except ValueError:
    return None
  return (started - queued).total_seconds()

parse_command_args

parse_command_args() -> Union[tuple[None, None, None], tuple[str, List[Any], dict[Any, Any]]]

Parses the command string if it is a simple CLI command (no pipes, redirections, or shell operators). Returns (executable, positional_args, args_dict), where positional_args is a list of positional arguments (not associated with any flag) and args_dict maps argument names to values. Returns (None, None, None) if the command is not a simple CLI command.

Source code in src/sbatchman/core/job.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def parse_command_args(self) -> Union[tuple[None, None, None], tuple[str, List[Any], dict[Any, Any]]]:
  """
  Parses the command string if it is a simple CLI command (no pipes, redirections, or shell operators).

  Returns:
    A tuple ``(executable, positional_args, args_dict)``:
      - ``executable``: the first token of the command.
      - ``positional_args``: tokens not associated with any flag.
      - ``args_dict``: maps flag names (leading dashes stripped) to values.
        ``--key=value`` stores ``value``; ``--key``/``-k`` stores ``True`` and
        takes the next non-flag token as its value, if there is one.
    ``(None, None, None)`` is returned when the command contains a shell
    operator, is empty, or cannot be tokenized (e.g. unbalanced quotes).

  NOTE: any token starting with a single '-' (including negative numbers
  such as ``-5``) is treated as a flag, not as a value.
  """
  # A plain substring test: any of these means it is not a simple argv list.
  if any(op in self.command for op in ['|', '>', '<', ';', '&&', '||']):
    return None, None, None

  try:
    tokens = shlex.split(self.command)
  except ValueError:
    # Unbalanced quotes or a trailing escape: not a parseable simple command.
    return None, None, None
  if not tokens:
    return None, None, None

  executable = tokens[0]
  args_dict = {}
  positional_args = []
  key = None  # flag still waiting for a value, if any
  for token in tokens[1:]:
    if token.startswith('--'):
      if '=' in token:
        k, v = token[2:].split('=', 1)
        args_dict[k] = v
        key = None
      else:
        key = token[2:]
        args_dict[key] = True
    elif token.startswith('-') and len(token) > 1:
      key = token[1:]
      args_dict[key] = True
    else:
      if key:
        args_dict[key] = token
        key = None
      else:
        positional_args.append(token)
  return executable, positional_args, args_dict

write_job_id

write_job_id()

Updates the job_id in the metadata.yaml file. This is used to update the job_id after the job has been submitted.

Source code in src/sbatchman/core/job.py
176
177
178
179
180
181
182
183
184
def write_job_id(self):
  """
  Updates the job_id in the metadata.yaml file.
  This is used to update the job_id after the job has been submitted.

  The file is edited in place by ``perl -i -pe``, rewriting lines that start
  with ``job_id: <digits>``. Nothing happens when the metadata file does not
  exist; a failing perl run raises ``subprocess.CalledProcessError``.
  """
  metadata_file = self.get_metadata_path()
  if not metadata_file.exists():
    return
  substitution = f"s/^job_id: [0-9]*/job_id: {int(self.job_id)}/"
  subprocess.run(["perl", "-i", "-pe", substitution, str(metadata_file)], check=True)

write_job_status

write_job_status()

Updates the status in the metadata.yaml file.

Source code in src/sbatchman/core/job.py
186
187
188
189
190
191
192
193
def write_job_status(self):
  """
  Updates the status in the metadata.yaml file.

  Uses ``sed -i`` with the sed ``c`` (change line) command to replace lines
  starting with ``status:`` by ``status: <str(self.status)>``. Nothing happens
  when the metadata file does not exist; a failing sed run raises
  ``subprocess.CalledProcessError``.
  NOTE(review): ``sed -i`` without a suffix argument is GNU sed syntax; BSD/macOS sed may reject it.
  """
  metadata_file = self.get_metadata_path()
  if not metadata_file.exists():
    return
  sed_script = f"/^status:/c\\status: {str(self.status)}"
  subprocess.run(["sed", "-i", sed_script, str(metadata_file)], check=True)

write_metadata

write_metadata(override_status=True)

Saves the current job state to its metadata.yaml file.

Source code in src/sbatchman/core/job.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def write_metadata(self, override_status=True):
  """Saves the current job state to its metadata.yaml file.

  The parent directory is created if needed. When the file already exists,
  non-null ``start_timestamp``/``end_timestamp`` values on disk win over the
  in-memory ones, and so does ``status`` if ``override_status`` is False.
  Path and Status values are written as their ``str()`` form.
  """
  metadata_file = self.get_metadata_path()
  metadata_file.parent.mkdir(parents=True, exist_ok=True)
  job_dict = asdict(self)

  preserved_fields = ["start_timestamp", "end_timestamp"]
  if not override_status:
    preserved_fields.append("status")

  if metadata_file.exists():
    with open(metadata_file, "r") as f:
      on_disk = yaml.safe_load(f) or {}
    for field_name in preserved_fields:
      if field_name in on_disk and on_disk[field_name] is not None:
        job_dict[field_name] = on_disk[field_name]

  # Path and Status objects are not YAML-native; store their string form instead.
  job_dict = {
    key: str(value) if isinstance(value, (Path, Status)) else value
    for key, value in job_dict.items()
  }

  with open(metadata_file, "w") as f:
    yaml.dump(job_dict, f, default_flow_style=False)

ProjectExistsError

ProjectExistsError(message='SbatchMan root present already. Enjoy using SbatchMan!')

Bases: SbatchManError

Raised when the SbatchMan root directory is already present.

Source code in src/sbatchman/exceptions.py
13
14
15
def __init__(self, message="SbatchMan root present already. Enjoy using SbatchMan!"):
  """Keep ``message`` on the instance (``err.message``) and forward it to ``Exception``."""
  self.message = message
  super().__init__(message)

message instance-attribute

message = message

ProjectNotInitializedError

ProjectNotInitializedError(message="SbatchMan root not found. Please run 'sbatchman init' or specify a directory.")

Bases: SbatchManError

Raised when the SbatchMan root directory cannot be found.

Source code in src/sbatchman/exceptions.py
7
8
9
def __init__(self, message="SbatchMan root not found. Please run 'sbatchman init' or specify a directory."):
  """Keep ``message`` on the instance (``err.message``) and forward it to ``Exception``."""
  self.message = message
  super().__init__(message)

message instance-attribute

message = message

SbatchManError

Bases: Exception

Base exception for SbatchMan.

Status

Bases: Enum

CANCELLED class-attribute instance-attribute

CANCELLED = 'CANCELLED'

COMPLETED class-attribute instance-attribute

COMPLETED = 'COMPLETED'

FAILED class-attribute instance-attribute

FAILED = 'FAILED'

FAILED_SUBMISSION class-attribute instance-attribute

FAILED_SUBMISSION = 'FAILED_SUBMISSION'

OTHER class-attribute instance-attribute

OTHER = 'OTHER'

QUEUED class-attribute instance-attribute

QUEUED = 'QUEUED'

RUNNING class-attribute instance-attribute

RUNNING = 'RUNNING'

SUBMITTING class-attribute instance-attribute

SUBMITTING = 'SUBMITTING'

TIMEOUT class-attribute instance-attribute

TIMEOUT = 'TIMEOUT'

UNKNOWN class-attribute instance-attribute

UNKNOWN = 'UNKNOWN'

archive_job

archive_job(job: Job, archive_name: str) -> None

Archives a single active job to the named archive, creating the archive if it does not already exist.

Parameters:

Name Type Description Default
job Job

The Job instance to archive.

required
archive_name str

Name of the destination archive. The directory `<archive_root>/<archive_name>` will be created if absent.

required

Raises:

Type Description
ValueError

If the job's experiment directory cannot be resolved.

FileNotFoundError

If the source experiment directory does not exist.

Source code in src/sbatchman/core/jobs_manager.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def archive_job(job: Job, archive_name: str) -> None:
  """
  Archives a single active job to the named archive, creating the archive if
  it does not already exist.

  The job's directory is moved from <experiments_dir>/<exp_dir> to
  <archive_root>/<archive_name>/<exp_dir>. Then ``job.archive_name`` is set
  and the job metadata is rewritten in the new location.

  Args:
      job:          The Job instance to archive.
      archive_name: Name of the destination archive.  The directory
                    <archive_root>/<archive_name> will be created if absent.

  Raises:
      ValueError:  If the job's experiment directory cannot be resolved.
      FileNotFoundError: If the source experiment directory does not exist.
      FileExistsError: If the destination directory already exists in the
                       archive. shutil.move would otherwise nest the job
                       directory inside the existing one.
  """
  if not job.exp_dir:
    raise ValueError(f"Job {job!r} has no exp_dir set.")

  exp_dir_root = get_experiments_dir()
  archive_root = get_archive_dir()

  source_job_dir = exp_dir_root / job.exp_dir
  if not source_job_dir.exists():
    raise FileNotFoundError(f"Source directory not found: {source_job_dir}")

  dest_job_dir = archive_root / archive_name / job.exp_dir
  if dest_job_dir.exists():
    raise FileExistsError(f"Destination directory already exists: {dest_job_dir}")
  # Create the archive (or just the destination sub-directory) if needed.
  dest_job_dir.parent.mkdir(parents=True, exist_ok=True)

  shutil.move(str(source_job_dir), str(dest_job_dir))
  # Record the archive only after the move succeeded, so a failed move does
  # not leave the in-memory job pointing at a directory that does not exist.
  job.archive_name = archive_name

  # Rewrite metadata in the new location so it stays consistent.
  job.write_metadata()

archive_jobs

archive_jobs(archive_name: str, overwrite: bool = False, cluster_name: Optional[str] = None, config_name: Optional[str] = None, tag: Optional[str] = None, status: Optional[List[Status]] = None) -> List[Job]

Archives jobs matching the filter criteria.

Source code in src/sbatchman/core/jobs_manager.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def archive_jobs(archive_name: str, overwrite: bool = False, cluster_name: Optional[str] = None, config_name: Optional[str] = None, tag: Optional[str] = None, status: Optional[List[Status]] = None) -> List[Job]:
  """
  Archives jobs matching the filter criteria.

  Active jobs selected by ``jobs_list`` (filtered by cluster, config, tag and
  status) are moved from <experiments_dir>/<exp_dir> to
  <archive_root>/<archive_name>/<exp_dir>. Their metadata is rewritten with
  ``archive_name`` set.

  Args:
    archive_name: Name of the destination archive.
    overwrite: If True, an existing archive with this name is deleted first
      (including every job already stored in it).
    cluster_name, config_name, tag, status: Filters forwarded to ``jobs_list``.

  Returns:
    The jobs that were actually moved into the archive. Jobs whose ``exp_dir``
    is empty or whose directory is missing on disk are skipped and not returned.

  Raises:
    ArchiveExistsError: If the archive already exists and ``overwrite`` is False.
  """
  archive_path = get_archive_dir() / archive_name
  if archive_path.exists():
    if not overwrite:
      raise ArchiveExistsError(
        f"Archive '{archive_name}' already exists. Use --overwrite to replace it."
      )
    shutil.rmtree(archive_path)

  candidates = jobs_list(from_archived=False, cluster_name=cluster_name, config_name=config_name, tag=tag, status=status)

  exp_dir_root = get_experiments_dir()
  archived: List[Job] = []

  for job in candidates:
    # An empty exp_dir would resolve to the experiments root itself and move
    # every experiment into the archive, so such jobs are skipped.
    if not job.exp_dir:
      continue
    source_job_dir = exp_dir_root / job.exp_dir
    if not source_job_dir.exists():
      continue

    dest_job_dir = archive_path / job.exp_dir
    dest_job_dir.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(source_job_dir), str(dest_job_dir))

    # Update and rewrite metadata only once the job lives in the archive.
    job.archive_name = archive_name
    job.write_metadata()
    archived.append(job)

  return archived

count_active_jobs

count_active_jobs() -> int

Counts the number of jobs that are currently queued or running by querying squeue.

Returns:

Type Description
int

The number of jobs with QUEUED or RUNNING status.

Source code in src/sbatchman/core/jobs_manager.py
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
def count_active_jobs() -> int:
  """
  Counts the number of jobs that are currently queued or running by querying squeue.

  Runs ``squeue --me -h -t PENDING,RUNNING`` with a 30 second timeout and
  counts the non-blank output lines. This is best-effort: if squeue cannot be
  run, times out, or exits non-zero, the result is 0.

  Returns:
    The number of jobs with QUEUED or RUNNING status.
  """
  import subprocess

  squeue_cmd = ["squeue", "--me", "-h", "-t", "PENDING,RUNNING"]
  try:
    proc = subprocess.run(squeue_cmd, capture_output=True, text=True, timeout=30)
  except Exception:
    # Deliberately broad: e.g. squeue missing on non-SLURM hosts, or a timeout.
    return 0

  if proc.returncode != 0:
    return 0
  return sum(1 for row in proc.stdout.strip().split('\n') if row.strip())

create_configs_from_file

create_configs_from_file(file_path: Path, overwrite: bool = False) -> List[BaseConfig]

Parses a YAML file to create a list of job configurations.

This function reads a YAML configuration file, processes variables, and generates a list of configuration objects.

The YAML file structure should be as follows:

- An optional `variables` section at the root to define substitution variables. These can be single values, lists, or file paths with wildcards (glob patterns).
- Cluster names as top-level keys.
- Each cluster must define a `scheduler` (e.g., `'slurm'`).
- Each cluster can have a `default_conf` dictionary to specify common parameters for all jobs on that cluster.
- Each cluster must have a `configs` list, where each item is a dictionary representing a job configuration.

The function expands configurations based on the variables used. If a configuration's name or parameters reference variables that are lists or expand from wildcards, it creates a Cartesian product of all possible variable combinations, generating a distinct configuration for each one.

Parameters:

Name Type Description Default
file_path Path

The path to the YAML configuration file.

required
overwrite bool

If True, indicates that existing configurations with the same name can be overwritten. Defaults to False.

False

Returns:

Type Description
List[BaseConfig]

List[BaseConfig]: A list of fully resolved configuration objects (e.g., SlurmConfig) created from the file.

Raises:

Type Description
ConfigurationError

If the file is not found, contains invalid YAML, or does not adhere to the required structure (e.g., missing 'scheduler' key, root is not a dictionary).

Source code in src/sbatchman/core/config_manager.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def _collect_config_used_vars(config_params: dict) -> set:
  """Return the variable names referenced by a config's name, its string values, or strings inside its list values."""
  used_vars = set(_extract_used_vars(config_params['name']))
  for value in config_params.values():
    if isinstance(value, str):
      used_vars |= _extract_used_vars(value)
    elif isinstance(value, list):
      # List parameters (e.g. env) are substituted during expansion, so the
      # variables they reference must also drive the Cartesian product.
      for item in value:
        if isinstance(item, str):
          used_vars |= _extract_used_vars(item)
  return used_vars


def _merge_config_params(default_conf: dict, config_params: dict, var_dict: dict) -> dict:
  """Merge one config's params over a deep copy of the cluster defaults, substituting the variables in ``var_dict``.

  Scalars are substituted and override defaults. Non-empty lists are appended
  to the default list for the same key, with that list's strings substituted
  as well; empty lists leave the default untouched. Any other value (dict,
  None, ...) is copied over as-is.
  """
  final_params = deepcopy(default_conf)
  for k, v in config_params.items():
    if isinstance(v, (int, float, str)):
      final_params[k] = _substitute(v, var_dict)
    elif isinstance(v, list):
      if not v:
        continue
      if final_params.get(k) and isinstance(final_params[k], list):
        final_params[k] = [
          _substitute(item, var_dict) if isinstance(item, str) else item
          for item in final_params[k]
        ]
      else:
        final_params[k] = []
      final_params[k].extend(_substitute(item, var_dict) if isinstance(item, str) else item for item in v)
    else:
      # Previously such values were silently dropped on this path; copy them
      # like the non-expanded path does, where config params override defaults.
      final_params[k] = deepcopy(v)
  return final_params


def create_configs_from_file(file_path: Path, overwrite: bool = False) -> List[BaseConfig]:
  """Parses a YAML file to create a list of job configurations.

  This function reads a YAML configuration file, processes variables, and
  generates a list of configuration objects.

  The YAML file structure should be as follows:
  - An optional `variables` section at the root to define substitution
    variables. These can be single values, lists, or file paths with
    wildcards (glob patterns).
  - Cluster names as top-level keys.
  - Each cluster must define a `scheduler` (e.g., 'slurm').
  - Each cluster can have a `default_conf` dictionary to specify common
    parameters for all jobs on that cluster.
  - Each cluster must have a `configs` list, where each item is a
    dictionary representing a job configuration.

  The function expands configurations based on the variables used. If a
  configuration's name or parameters (including strings inside list
  parameters) reference variables that are lists or expand from wildcards,
  it creates a Cartesian product of all possible variable combinations,
  generating a distinct configuration for each one. Variables are iterated
  in sorted order, so the output order is reproducible.

  Args:
    file_path (Path): The path to the YAML configuration file.
    overwrite (bool, optional): If True, indicates that existing
      configurations with the same name can be overwritten.
      Defaults to False.

  Returns:
    List[BaseConfig]: A list of fully resolved configuration objects
      (e.g., SlurmConfig) created from the file.

  Raises:
    ConfigurationError: If the file is not found, contains invalid YAML,
      or does not adhere to the required structure (e.g., missing
      'scheduler' key, root is not a dictionary, a config has no 'name').
  """
  try:
    with open(file_path, 'r') as f:
      data = yaml.safe_load(f)
  except FileNotFoundError as e:
    raise ConfigurationError(f"Configuration file not found at: {file_path}") from e
  except yaml.YAMLError as e:
    raise ConfigurationError(f"Error parsing YAML file: {e}") from e

  # Validate the root before touching it: an empty file loads as None and a
  # list root has no .pop(); both used to escape as AttributeError.
  if not isinstance(data, dict):
    raise ConfigurationError("The root of the configuration file must be a dictionary of clusters.")

  # Handle variables at the top level (an empty `variables:` key loads as None).
  variables = data.pop("variables", None) or {}
  if not isinstance(variables, dict):
    raise ConfigurationError("The 'variables' section must be a dictionary.")
  expanded_vars = {k: _load_variable_values(v) for k, v in variables.items()}

  created_configs: List[BaseConfig] = []

  for cluster_name, cluster_configs in data.items():
    if not isinstance(cluster_configs, dict):
      raise ConfigurationError(f"Cluster '{cluster_name}' must be a dictionary with a 'scheduler' and a 'configs' list.")
    scheduler = cluster_configs.get("scheduler")
    if not scheduler:
      raise ConfigurationError(f"Cluster '{cluster_name}' must have a 'scheduler' defined.")

    default_conf = cluster_configs.get("default_conf") or {}
    configs = cluster_configs.get("configs") or []

    for config_params in configs:
      if not isinstance(config_params, dict) or 'name' not in config_params:
        raise ConfigurationError(f"Every configuration of cluster '{cluster_name}' must be a dictionary with a 'name' key.")
      config_name_template = config_params['name']

      used_vars = _collect_config_used_vars(config_params)
      # Only expand over variables that are actually used; sorted so the
      # product (and thus the returned list) is not hash-order dependent.
      relevant_vars = {k: expanded_vars[k] for k in sorted(used_vars) if k in expanded_vars}
      if relevant_vars:
        keys, values = zip(*relevant_vars.items())
        for combination in itertools.product(*values):
          var_dict = dict(zip(keys, combination))
          final_params = _merge_config_params(default_conf, config_params, var_dict)
          final_params["name"] = _substitute(config_name_template, var_dict)
          final_params["cluster_name"] = cluster_name
          final_params["overwrite"] = overwrite
          created_configs.append(_create_config_from_params(scheduler, final_params))
      else:
        # No variables to expand: config params simply override the defaults.
        final_params = default_conf.copy()
        final_params.update(config_params)
        final_params["name"] = config_name_template
        final_params["cluster_name"] = cluster_name
        final_params["overwrite"] = overwrite
        created_configs.append(_create_config_from_params(scheduler, final_params))

  return created_configs

create_local_config

create_local_config(name: str, cluster_name: Optional[str] = None, env: Optional[List[str]] = None, modules: Optional[List[str]] = None, time: Optional[str] = None, overwrite: bool = False) -> LocalConfig

Creates and saves a configuration file for local execution.

Parameters:

Name Type Description Default
name str

The name of the configuration.

required
cluster_name Optional[str]

The name of the cluster this configuration belongs to. Defaults to the system's hostname.

None
env Optional[List[str]]

A list of environment variables to set.

None
modules Optional[List[str]]

A list of modules to load in sbatch scripts before running commands.

None
time Optional[str]

Walltime (e.g., 01-00:00:00).

None
overwrite bool

If True, overwrite an existing configuration with the same name.

False

Returns:

Type Description
LocalConfig

The newly created (and saved) configuration object.

Source code in src/sbatchman/core/config_manager.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def create_local_config(
  name: str,
  cluster_name: Optional[str] = None,
  env: Optional[List[str]] = None,
  modules: Optional[List[str]] = None,
  time: Optional[str] = None,
  overwrite: bool = False,
) -> LocalConfig:
  """Creates and saves a configuration file for local execution.

  Args:
    name: The name of the configuration.
    cluster_name: The name of the cluster this configuration belongs to.
      Defaults to the value returned by ``get_cluster_name()``.
    env: A list of environment variables to set.
    modules: A list of modules to load before running commands.
    time: Walltime (e.g., 01-00:00:00).
    overwrite: If True, overwrite an existing configuration with the same name.

  Returns:
    The newly created LocalConfig, after it has been saved with ``save_config``.
  """
  config = LocalConfig(
    name=name,
    cluster_name=cluster_name or get_cluster_name(),
    env=env,
    time=time,
    modules=modules,
  )
  config.save_config(overwrite)
  return config

create_pbs_config

create_pbs_config(name: str, cluster_name: Optional[str] = None, queue: Optional[str] = None, cpus: Optional[int] = None, mem: Optional[str] = None, walltime: Optional[str] = None, env: Optional[List[str]] = None, custom_headers: Optional[List[str]] = None, overwrite: bool = False) -> PbsConfig

Creates and saves a PBS configuration file.

Parameters:

Name Type Description Default
name str

The name of the configuration.

required
cluster_name Optional[str]

The name of the cluster this configuration belongs to. Defaults to the system's hostname.

None
queue Optional[str]

The PBS queue to submit the job to.

None
cpus Optional[int]

The number of CPUs to request.

None
mem Optional[str]

The amount of memory to request (e.g., "16gb", "100mb").

None
walltime Optional[str]

The maximum wall time for the job (e.g., "24:00:00").

None
env Optional[List[str]]

A list of environment variables to set.

None
overwrite bool

If True, overwrite an existing configuration with the same name.

False
custom_headers Optional[List[str]]

Custom scheduler headers (e.g., ['#PBS -l my_resource=my_value'])

None

Returns:

Type Description
PbsConfig

The newly created (and saved) configuration object.

Source code in src/sbatchman/core/config_manager.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def create_pbs_config(
  name: str,
  cluster_name: Optional[str] = None,
  queue: Optional[str] = None,
  cpus: Optional[int] = None,
  mem: Optional[str] = None,
  walltime: Optional[str] = None,
  env: Optional[List[str]] = None,
  custom_headers: Optional[List[str]] = None,
  overwrite: bool = False,
) -> PbsConfig:
  """Creates and saves a PBS configuration file.

  Args:
    name: The name of the configuration.
    cluster_name: The name of the cluster this configuration belongs to.
      Defaults to the value returned by ``get_cluster_name()``.
    queue: The PBS queue to submit the job to.
    cpus: The number of CPUs to request.
    mem: The amount of memory to request (e.g., "16gb", "100mb").
    walltime: The maximum wall time for the job (e.g., "24:00:00").
    env: A list of environment variables to set.
    custom_headers: Custom scheduler headers (e.g., ['#PBS -l my_resource=my_value']).
    overwrite: If True, overwrite an existing configuration with the same name.

  Returns:
    The newly created PbsConfig, after it has been saved with ``save_config``.
  """
  config = PbsConfig(
    name=name,
    cluster_name=cluster_name or get_cluster_name(),
    queue=queue,
    cpus=cpus,
    mem=mem,
    walltime=walltime,
    env=env,
    custom_headers=custom_headers,
  )
  config.save_config(overwrite)
  return config

create_slurm_config

create_slurm_config(name: str, cluster_name: Optional[str] = None, partition: Optional[str] = None, nodes: Optional[str] = None, ntasks: Optional[str] = None, tasks_per_node: Optional[int] = None, cpus_per_task: Optional[int] = None, mem: Optional[str] = None, account: Optional[str] = None, time: Optional[str] = None, gpus: Optional[int] = None, constraint: Optional[str] = None, nodelist: Optional[Union[str, List[str]]] = None, exclude: Optional[List[str]] = None, qos: Optional[str] = None, reservation: Optional[str] = None, exclusive: Optional[bool] = False, modules: Optional[List[str]] = None, env: Optional[List[str]] = None, custom_headers: Optional[List[str]] = None, overwrite: bool = False) -> SlurmConfig

Creates and saves a SLURM configuration file.

Parameters:

Name Type Description Default
name str

The name of the configuration.

required
cluster_name Optional[str]

The name of the cluster this configuration belongs to. Defaults to the system's hostname.

None
partition Optional[str]

The SLURM partition (queue) to submit the job to.

None
nodes Optional[str]

The number of nodes to request.

None
ntasks Optional[str]

The number of tasks to run.

None
tasks_per_node Optional[int]

The number of tasks per node.

None
cpus_per_task Optional[int]

The number of CPUs to request per task.

None
mem Optional[str]

The amount of memory to request (e.g., "16G", "100M").

None
account Optional[str]

The account to charge for the job.

None
time Optional[str]

The maximum wall time for the job (e.g., "01-00:00:00").

None
gpus Optional[int]

The number of GPUs to request.

None
constraint Optional[str]

Specific features required for the job's nodes.

None
nodelist Optional[Union[str, List[str]]]

A specific list of nodes to use (either a string or a list of strings to concatenate using "," as separator).

None
exclude Optional[List[str]]

A specific list of nodes NOT to use.

None
qos Optional[str]

The Quality of Service for the job.

None
reservation Optional[str]

The reservation to use for the job.

None
exclusive Optional[bool]

Enables the --exclusive flag (may not work on some clusters).

False
modules Optional[List[str]]

Modules to load with module load.

None
env Optional[List[str]]

A list of environment variables to set.

None
overwrite bool

If True, overwrite an existing configuration with the same name.

False
custom_headers Optional[List[str]]

Custom scheduler headers (e.g., ['#SBATCH --my_header=my_value'])

None

Returns:

Type Description
SlurmConfig

The newly created (and saved) configuration object.

Source code in src/sbatchman/core/config_manager.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def create_slurm_config(
  name: str,
  cluster_name: Optional[str] = None,
  partition: Optional[str] = None,
  nodes: Optional[str] = None,
  ntasks: Optional[str] = None,
  tasks_per_node: Optional[int] = None,
  cpus_per_task: Optional[int] = None,
  mem: Optional[str] = None,
  account: Optional[str] = None,
  time: Optional[str] = None,
  gpus: Optional[int] = None,
  constraint: Optional[str] = None,
  nodelist: Optional[Union[str,List[str]]] = None,
  exclude: Optional[List[str]] = None,
  qos: Optional[str] = None,
  reservation: Optional[str] = None,
  exclusive: Optional[bool] = False,
  modules: Optional[List[str]] = None,
  env: Optional[List[str]] = None,
  custom_headers: Optional[List[str]] = None,
  overwrite: bool = False,
) -> SlurmConfig:
  """Creates and saves a SLURM configuration file.

  Args:
    name: The name of the configuration.
    cluster_name: The name of the cluster this configuration belongs to.
      Defaults to the globally configured cluster name (``get_cluster_name()``).
    partition: The SLURM partition (queue) to submit the job to.
    nodes: The number of nodes to request.
    ntasks: The number of tasks to run.
    tasks_per_node: The number of tasks per node.
    cpus_per_task: The number of CPUs to request per task.
    mem: The amount of memory to request (e.g., "16G", "100M").
    account: The account to charge for the job.
    time: The maximum wall time for the job (e.g., "01-00:00:00").
    gpus: The number of GPUs to request. Passed to SlurmConfig as a string;
      left as None when not provided.
    constraint: Specific features required for the job's nodes.
    nodelist: A specific list of nodes to use (either a string or a list of strings to concatenate using "," as separator).
    exclude: A specific list of nodes NOT to use.
    qos: The Quality of Service for the job.
    reservation: The reservation to use for the job.
    exclusive: Enables the --exclusive flag (may not work on some clusters).
    modules: Modules to load with `module load`.
    env: A list of environment variables to set.
    overwrite: If True, overwrite an existing configuration with the same name.
    custom_headers: Custom scheduler headers (e.g., ['#SBATCH --my_header=my_value'])

  Returns:
    The newly created SlurmConfig, after ``config.save_config(overwrite)``
    has been called on it.
  """
  config = SlurmConfig(
    name=name,
    cluster_name=cluster_name if cluster_name else get_cluster_name(),
    partition=partition,
    nodes=nodes,
    ntasks=ntasks,
    tasks_per_node=tasks_per_node,
    cpus_per_task=cpus_per_task,
    mem=mem,
    account=account,
    time=time,
    # str(None) would produce the literal string "None", which is truthy and
    # would likely be rendered as a bogus GPU request; only stringify real values.
    gpus=str(gpus) if gpus is not None else None,
    constraint=constraint,
    nodelist=nodelist,
    exclude=exclude,
    qos=qos,
    reservation=reservation,
    exclusive=exclusive,
    modules=modules,
    env=env,
    custom_headers=custom_headers,
  )
  config.save_config(overwrite)
  return config

delete_jobs

delete_jobs(cluster_name: Optional[str] = None, config_name: Optional[str] = None, tag: Optional[str] = None, id: Optional[int] = None, archive_name: Optional[str] = None, archived: bool = False, not_archived: bool = False, status: Optional[List[Status]] = None, variables: Optional[Dict[str, Any]] = None) -> int

Deletes jobs matching the filter criteria.

Parameters:

Name Type Description Default
cluster_name Optional[str]

Filter by cluster name.

None
config_name Optional[str]

Filter by configuration name.

None
tag Optional[str]

Filter by tag.

None
id Optional[int]

Filter by id (only one will be deleted).

None
archive_name Optional[str]

If provided, only delete jobs from this archive.

None
archived bool

If True, delete only archived jobs.

False
not_archived bool

If True, delete only active jobs.

False
status Optional[List[Status]]

Filter jobs by status.

None
variables Optional[Dict[str, Any]]

Filter jobs by variable values.

None

Returns:

Type Description
int

The number of deleted jobs.

Source code in src/sbatchman/core/jobs_manager.py
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
def delete_jobs(
  cluster_name: Optional[str] = None,
  config_name: Optional[str] = None,
  tag: Optional[str] = None,
  id: Optional[int] = None,
  archive_name: Optional[str] = None,
  archived: bool = False,
  not_archived: bool = False,
  status: Optional[List[Status]] = None,
  variables: Optional[Dict[str, Any]] = None
) -> int:
  """
  Deletes jobs matching the filter criteria.

  For each matching job its experiment directory is removed, then any parent
  directories left empty are pruned, stopping before the experiments root
  (active jobs) or the archive root (archived jobs).

  Args:
    cluster_name: Filter by cluster name.
    config_name: Filter by configuration name.
    tag: Filter by tag.
    id: Filter by id (only one will be deleted). ``0`` is a real id here
      (it is the placeholder id of jobs that were never successfully
      submitted), so only ``None`` disables this filter.
    archive_name: If provided, only delete jobs from this archive.
    archived: If True, delete only archived jobs.
    not_archived: If True, delete only active jobs.
      NOTE: ``archived``/``not_archived`` are forwarded to ``jobs_list`` as
      ``from_archived``/``from_active``; when both are False ``jobs_list``
      scans nothing and 0 is returned.
    status: Filter jobs by status.
    variables: Filter jobs by variable values.

  Returns:
    The number of deleted jobs.
  """
  jobs_to_delete = jobs_list(
    cluster_name=cluster_name,
    config_name=config_name,
    tag=tag,
    archive_name=archive_name,
    from_active=not_archived,
    from_archived=archived,
    status=status,
    update_jobs=False,
    variables=variables
  )

  # Delete at most one by id. Compare against None so that id=0 does not
  # silently disable the filter and delete every other matching job.
  if id is not None:
    jobs_to_delete = [j for j in jobs_to_delete if int(j.job_id) == int(id)]

  if not jobs_to_delete:
    return 0

  exp_dir_root = get_experiments_dir()
  archive_root = get_archive_dir()

  for job in jobs_to_delete:
    if job.archive_name:
      stop_dir = archive_root
      job_dir = archive_root / job.archive_name / job.exp_dir
    else:
      stop_dir = exp_dir_root
      job_dir = exp_dir_root / job.exp_dir

    if job_dir.exists():
      shutil.rmtree(job_dir)

    # Prune parent directories that are now empty. rmdir() only removes empty
    # directories, and the `in parents` check keeps us strictly below stop_dir.
    parent_dir = job_dir.parent
    try:
      while (
        stop_dir in parent_dir.parents
        and parent_dir.is_dir()
        and not any(parent_dir.iterdir())
      ):
        parent_dir.rmdir()
        parent_dir = parent_dir.parent
    except OSError:
      # Best-effort cleanup: a concurrent process may have removed the
      # directory or written into it; either way, leave it alone.
      pass

  return len(jobs_to_delete)

job_submit

job_submit(job: Job, force: bool = False, previous_job_id: Optional[int] = None, ignore_archived: bool = False, ignore_conf_in_dup_check: bool = False, ignore_commands_in_dup_check: bool = False)
Source code in src/sbatchman/core/launcher.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def job_submit(
  job: Job,
  force: bool = False,
  previous_job_id: Optional[int] = None,
  ignore_archived: bool = False,
  ignore_conf_in_dup_check: bool = False,
  ignore_commands_in_dup_check: bool = False,
) -> Job:
  """
  Submits a Job description to the current cluster's scheduler.

  A fresh experiment directory ``<cluster>/<config>/<tag>/<timestamp>[_N]`` is
  created under the experiments dir. The configuration template is rendered
  into ``run.sh`` there. A new Job record (status SUBMITTING, job_id 0) is
  written and registered before the scheduler is invoked.

  Args:
    job: Job description. config_name, command, tag, preprocess, postprocess,
      variables and cluster_name are read from it. If cluster_name is None it
      is set in place to the global cluster name.
    force: If True, submit even when an identical job already exists.
    previous_job_id: Forwarded to slurm_submit / pbs_submit (unused for local).
    ignore_archived: Forwarded to job_exists.
    ignore_conf_in_dup_check: Forwarded to job_exists.
    ignore_commands_in_dup_check: Forwarded to job_exists.

  Returns:
    The newly created Job record, with job_id set by the scheduler.

  Raises:
    JobSubmitError: On a cluster-name mismatch, an unknown scheduler, or a
      failed submission. On a failed submission the job status is first
      recorded as FAILED_SUBMISSION and the error is written to its stderr file.
    ConfigurationError: If no cluster name is available, or a local config
      cannot be loaded.
    ConfigurationNotFoundError: If the configuration script does not exist.
    JobExistsError: If an identical job exists and force is False.
  """
  try:
    config_cluster_name = get_cluster_name()
    if job.cluster_name is None: # Use global cluster name if not provided
      job.cluster_name = config_cluster_name
    elif job.cluster_name != config_cluster_name: # Mismatch in cluster names
      raise JobSubmitError(
        f"Cluster name '{job.cluster_name}' does not match the globally set cluster name '{config_cluster_name}'. "
        "You may be running jobs meant for a different cluster. If you want to change this cluster name, use 'sbatchman set-cluster-name <cluster_name>' to set a new global default."
      )
  except ClusterNameNotSetError:
    if not job.cluster_name:
      raise ConfigurationError(
        "Cluster name not specified and not set globally. "
        "Please provide '--cluster-name' or use 'sbatchman set-cluster-name <cluster_name>' to set a global default."
      )

  scheduler = get_scheduler_from_cluster_name(job.cluster_name)

  config_path = get_project_config_dir() / job.cluster_name / f"{job.config_name}.sh"
  if not config_path.exists():
    raise ConfigurationNotFoundError(f"Configuration '{job.config_name}' for cluster '{job.cluster_name}' not found at '{config_path}'.")
  # Use a context manager so the file handle is closed deterministically.
  with open(config_path, "r") as config_file:
    template_script = config_file.read()

  j_exists, where = job_exists(
    job.command,
    job.config_name,
    job.cluster_name,
    job.tag,
    job.preprocess,
    job.postprocess,
    ignore_archived=ignore_archived,
    ignore_conf_in_dup_check=ignore_conf_in_dup_check,
    ignore_commands_in_dup_check=ignore_commands_in_dup_check,
  )
  if j_exists and not force:
    raise JobExistsError(
      f"An identical job already exists{'' if where == 'active' else '(in some archive)'} for config '{job.config_name}'{'(ignored)'if ignore_conf_in_dup_check else ''} with tag '{job.tag}'. " +
      ("\nUse '--force' to submit it anyway." if where == 'active' else "\nUse '--ignore-archived or -ia' to ignore archived jobs.")
    )

  # Capture the Current Working Directory at the time of launch
  submission_cwd = Path.cwd()

  # Create a unique, nested directory for this experiment run:
  # <cluster_name>/<config_name>/<tag>/<timestamp>[_N]
  timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
  base_exp_dir_local = Path(job.cluster_name) / job.config_name / job.tag / timestamp
  exp_dir_local = base_exp_dir_local
  exp_dir = get_experiments_dir() / exp_dir_local
  counter = 1
  while exp_dir.exists():
    exp_dir_local = base_exp_dir_local.with_name(f"{base_exp_dir_local.name}_{counter}")
    exp_dir = get_experiments_dir() / exp_dir_local
    counter += 1
  exp_dir.mkdir(parents=True, exist_ok=False)

  # Prepare the final runnable script by filling the template placeholders
  final_script_content = template_script.replace(
    "{JOB_NAME}", f'{job.tag}-{job.config_name}'
  ).replace(
    "{EXP_DIR}", str(exp_dir.resolve())
  ).replace(
    "{CWD}", str(submission_cwd.resolve())
  ).replace(
    "{PREPROCESS}", str(job.preprocess) if job.preprocess is not None else ''
  ).replace(
    "{CMD}", str(job.command)
  ).replace(
    "{POSTPROCESS}", str(job.postprocess) if job.postprocess is not None else ''
  )

  run_script_path = exp_dir / "run.sh"
  with open(run_script_path, "w") as f:
    f.write(final_script_content)
  run_script_path.chmod(0o755)

  queued_ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S.%f")
  job = Job(
    config_name=job.config_name,
    cluster_name=job.cluster_name,
    exp_dir=str(exp_dir_local),
    command=job.command,
    status=Status.SUBMITTING.value,
    scheduler=scheduler,
    job_id=0,
    tag=job.tag,
    archive_name=None,
    preprocess=job.preprocess,
    postprocess=job.postprocess,
    variables=job.variables if job.variables is not None else {},
    queued_timestamp=queued_ts,
  )

  job.write_metadata()

  # Register in cache to speed up subsequent checks in the same process
  register_job(job)

  try:
    # Submit the job using the scheduler's own logic
    if scheduler == 'slurm':
      job.job_id = slurm_submit(run_script_path, exp_dir, previous_job_id)
    elif scheduler == 'pbs':
      job.job_id = pbs_submit(run_script_path, exp_dir, previous_job_id)
    elif scheduler == 'local':
      console.print(f"✅ Submitting job with command '[bold cyan]{job.command}[/bold cyan]'.")
      config = load_local_config(job.config_name)
      if config is None:
        raise ConfigurationError(f'Couldn\'t find configuration `{job.config_name}`')
      job.job_id, timed_out = config.local_submit(run_script_path, exp_dir)
      if timed_out:
        job.status = Status.TIMEOUT.value
        job.end_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S.%f")
        job.write_job_status()
    else:
      raise JobSubmitError(f"No submission class found for scheduler '{scheduler}'. Supported schedulers are: slurm, pbs, local.")

    job.write_metadata(override_status=False)

  except (ValueError, FileNotFoundError) as e:
    job.status = Status.FAILED_SUBMISSION.value
    job.end_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S.%f")
    job.write_metadata()
    err_str = "Failed to submit job. Error: " + str(e)
    with open(job.get_stderr_path(), 'w+') as err_file:
      err_file.write(err_str)
    raise JobSubmitError(err_str) from e
  except subprocess.CalledProcessError as e:
    job.status = Status.FAILED_SUBMISSION.value
    job.end_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S.%f")
    job.write_metadata()
    # output/stderr may be None (not captured); build the message explicitly
    # so an empty stderr no longer blanks out the whole message.
    output = e.output or ""
    stderr = e.stderr or ""
    err_str = (
      f"Job submission failed with error code {e.returncode}.\n"
      f"Output stream:\n{output}\n"
      f"Error stream:\n{stderr}"
    )
    with open(job.get_stderr_path(), 'w+') as err_file:
      err_file.write(err_str)
    raise JobSubmitError(err_str) from e

  return job

jobs_df

jobs_df(cluster_name: Optional[str] = None, config_name: Optional[str] = None, tag: Optional[str] = None, include_archived: bool = False) -> DataFrame

Returns a pandas DataFrame of jobs, with optional filtering. Arguments — cluster_name: filter by cluster name; config_name: filter by configuration name; tag: filter by tag; include_archived: if True, also include archived jobs. Returns a pandas DataFrame with one row of job metadata per job.

Source code in src/sbatchman/core/jobs_manager.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def jobs_df(
  cluster_name: Optional[str] = None,
  config_name: Optional[str] = None,
  tag: Optional[str] = None,
  include_archived: bool = False
) -> pd.DataFrame:
  """
  Builds a pandas DataFrame with one row per job, optionally filtered.

  Args:
    cluster_name: Only keep jobs from this cluster.
    config_name: Only keep jobs using this configuration.
    tag: Only keep jobs with this tag.
    include_archived: When True, archived jobs are listed as well.

  Returns:
    A DataFrame whose columns are the Job instance attributes.
  """
  selected = jobs_list(
    cluster_name=cluster_name,
    config_name=config_name,
    tag=tag,
    from_archived=include_archived,
  )
  return pd.DataFrame([vars(job) for job in selected])

jobs_list

jobs_list(cluster_name: Optional[str] = None, config_name: Optional[str] = None, tag: Optional[str] = None, status: Optional[List[Status]] = None, archive_name: Optional[str] = None, from_active: bool = True, from_archived: bool = False, update_jobs: bool = True, variables: Optional[Dict[str, Any]] = None) -> List[Job]

Lists active and/or archived jobs, with optional filtering. Updates the status of active jobs by default. Args: cluster_name: Filter by cluster name. config_name: Filter by configuration name. tag: Filter by tag. status: Filter by a set of Status. archive_name: If provided, only include jobs from this archive. from_active: If True, include active jobs. from_archived: If True, include archived jobs. update_jobs: If True, update the status of active jobs before listing. variables: Filter by variable values. Returns: A list of Job objects matching the filter criteria.

Source code in src/sbatchman/core/jobs_manager.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
def _collect_job_metadata_paths(
  base_dir: Path,
  cluster_name: Optional[str],
  config_name: Optional[str],
  tag: Optional[str],
) -> List[Path]:
  """Return candidate ``metadata.yaml`` paths under ``base_dir/<cluster>/<config>/<tag>/<timestamp>/``.

  The cluster, config and tag levels are filtered through
  ``_get_matching_subdirs``. Every subdirectory at the timestamp level is
  included. The directory levels are walked explicitly (fixed depth) instead
  of a recursive glob, which is much faster on large experiment trees.
  ``metadata.yaml`` is optimistically assumed to exist, to avoid a stat()
  per job. Presumably ``_load_job_metadata`` tolerates a missing file — verify.
  """
  paths: List[Path] = []
  for cluster_dir in _get_matching_subdirs(base_dir, cluster_name):
    for config_dir in _get_matching_subdirs(cluster_dir, config_name):
      for tag_dir in _get_matching_subdirs(config_dir, tag):
        try:
          with os.scandir(tag_dir) as it:
            for entry in it:
              if entry.is_dir():
                paths.append(Path(entry.path) / "metadata.yaml")
        except OSError:
          # Unreadable or vanished tag directory: skip it.
          continue
  return paths


def jobs_list(
  cluster_name: Optional[str] = None,
  config_name: Optional[str] = None,
  tag: Optional[str] = None,
  status: Optional[List[Status]] = None,
  archive_name: Optional[str] = None,
  from_active: bool = True,
  from_archived: bool = False,
  update_jobs: bool = True,
  variables: Optional[Dict[str, Any]] = None
) -> List[Job]:
  """
  Lists active and/or archived jobs, with optional filtering. Updates the status of active jobs by default.

  Active jobs live at ``<experiments>/<cluster>/<config>/<tag>/<timestamp>/``.
  Archived jobs live at ``<archive>/<archive_name>/<cluster>/<config>/<tag>/<timestamp>/``.
  Metadata files are loaded concurrently in a thread pool.

  Args:
    cluster_name: Filter by cluster name.
    config_name: Filter by configuration name.
    tag: Filter by tag.
    status: Filter by a set of Status (Status members or their string values).
    archive_name: If provided, only include jobs from this archive (archived jobs only).
    from_active: If True, include active jobs.
    from_archived: If True, include archived jobs.
    update_jobs: If True, update the status of active jobs before listing.
    variables: Filter by variable values (forwarded to ``_load_job_metadata``).

  Returns:
    A list of Job objects matching the filter criteria, in directory-scan order.
  """
  if update_jobs:
    update_jobs_status()

  paths_to_process: List[Path] = []

  # Scan active jobs
  if from_active:
    paths_to_process.extend(
      _collect_job_metadata_paths(get_experiments_dir(), cluster_name, config_name, tag)
    )

  # Scan archived jobs (one extra directory level: the archive name)
  if from_archived:
    for archive_dir in _get_matching_subdirs(get_archive_dir(), archive_name):
      paths_to_process.extend(
        _collect_job_metadata_paths(archive_dir, cluster_name, config_name, tag)
      )

  # Loading is I/O bound, so use many threads. executor.map keeps results in
  # the same order as paths_to_process, making the listing deterministic.
  max_workers = min(100, len(paths_to_process) + 1)
  with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    loaded = executor.map(lambda p: _load_job_metadata(p, variables), paths_to_process)
    jobs = [job for job in loaded if job]

  if status:
    wanted = {s.value if isinstance(s, Status) else str(s) for s in status}
    jobs = [j for j in jobs if str(j.status) in wanted]

  return jobs

launch_job

launch_job(config_name: str, command: str, cluster_name: Optional[str] = None, tag: str = 'notag', preprocess: Optional[str] = None, postprocess: Optional[str] = None, force: bool = False, previous_job_id: Optional[int] = None, variables: Optional[Dict[str, Any]] = None, dry_run: bool = False, max_queued_jobs: Optional[int] = None, ignore_archived: bool = False, ignore_conf_in_dup_check: bool = False, ignore_commands_in_dup_check: bool = False) -> Job

Launches an experiment based on a configuration name. Args: config_name: The name of the configuration to use. command: The command to run for this job. cluster_name: Optional; if not provided, will use the global cluster name. tag: A tag for this experiment run, used in directory structure. preprocess: Optional; a command to run before the main command. postprocess: Optional; a command to run after the main command. previous_job_id: Optional; if this is set, the job will be only launched after the previous is done. max_queued_jobs: Optional; if set, will wait before submitting if the queue has this many jobs. Returns: A Job object representing the launched job. Raises: ConfigurationError: If there is a mismatch in cluster names or if the cluster name is not set. ClusterNameNotSetError: If the cluster name is not set globally and not provided. ConfigurationNotFoundError: If the configuration file does not exist. JobSubmitError: If there is an error during job submission.

Source code in src/sbatchman/core/launcher.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def launch_job(
  config_name: str,
  command: str,
  cluster_name: Optional[str] = None,
  tag: str = "notag",
  preprocess: Optional[str] = None,
  postprocess: Optional[str] = None,
  force: bool = False,
  previous_job_id: Optional[int] = None,
  variables: Optional[Dict[str, Any]] = None,
  dry_run: bool = False,
  max_queued_jobs: Optional[int] = None,
  ignore_archived: bool = False,
  ignore_conf_in_dup_check: bool = False,
  ignore_commands_in_dup_check: bool = False,
) -> Job:
  """
  Launches an experiment based on a configuration name.

  Args:
    config_name: The name of the configuration to use.
    command: The command to run for this job.
    cluster_name: Optional; if not provided, will use the global cluster name.
    tag: A tag for this experiment run, used in directory structure.
    preprocess: Optional; a command to run before the main command.
    postprocess: Optional; a command to run after the main command.
    force: If True, skip the duplicate-job check and submit anyway.
    previous_job_id: Optional; if this is set, the job will be only launched after the previous is done.
    variables: Optional; variables recorded on the job. A tuple value
      ``(value, filename)`` is stored as ``variables[k] = value`` and
      ``variables[f"{k}_filename"] = filename``.
    dry_run: If True, build and return the Job without waiting for a queue
      slot, creating the experiment directory, writing run.sh or submitting.
    max_queued_jobs: Optional; if set, will wait before submitting if the queue has this many jobs.
    ignore_archived: Forwarded to job_exists.
    ignore_conf_in_dup_check: Forwarded to job_exists.
    ignore_commands_in_dup_check: Forwarded to job_exists.
  Returns:
    A Job object representing the launched (or, for dry runs, prepared) job.
  Raises:
    ConfigurationError: If the cluster name is neither provided nor set globally,
      or a local configuration cannot be loaded.
    ConfigurationNotFoundError: If the configuration file does not exist.
    JobExistsError: If an identical job already exists and force is False.
    JobSubmitError: If the cluster name mismatches the global one, or if
      submission fails. On a failed submission the job status is recorded as
      FAILED_SUBMISSION and the error is written to the job's stderr file first.
  """

  try:
    config_cluster_name = get_cluster_name()
    if cluster_name is None: # Use global cluster name if not provided
      cluster_name = config_cluster_name
    elif cluster_name != config_cluster_name: # Mismatch in cluster names
      raise JobSubmitError(
        f"Cluster name '{cluster_name}' does not match the globally set cluster name '{config_cluster_name}'. "
        "You may be running jobs meant for a different cluster. If you want to change this cluster name, use 'sbatchman set-cluster-name <cluster_name>' to set a new global default."
      )
  except ClusterNameNotSetError:
    if not cluster_name:
      raise ConfigurationError(
        "Cluster name not specified and not set globally. "
        "Please provide '--cluster-name' or use 'sbatchman set-cluster-name <cluster_name>' to set a global default."
      )

  scheduler = get_scheduler_from_cluster_name(cluster_name)

  config_path = get_project_config_dir() / cluster_name / f"{config_name}.sh"
  if not config_path.exists():
    raise ConfigurationNotFoundError(f"Configuration '{config_name}' for cluster '{cluster_name}' not found at '{config_path}'.")
  # Use a context manager so the file handle is closed deterministically.
  with open(config_path, "r") as config_file:
    template_script = config_file.read()

  # Wait for queue slot if limit is configured (skip for dry runs)
  if not dry_run:
    wait_for_queue_slot(max_queued_jobs)

  if not force:
    j_exists, where = job_exists(
      command,
      config_name,
      cluster_name,
      tag,
      preprocess,
      postprocess,
      ignore_archived=ignore_archived,
      ignore_conf_in_dup_check=ignore_conf_in_dup_check,
      ignore_commands_in_dup_check=ignore_commands_in_dup_check,
    )
    if j_exists:
      raise JobExistsError(
        f"An identical job already exists{'' if where == 'active' else '(in some archive)'} for config '{config_name}'{'(ignored)'if ignore_conf_in_dup_check else ''} with tag '{tag}'. " +
        ("\nUse '--force' to submit it anyway." if where == 'active' else "\nUse '--ignore-archived or -ia' to ignore archived jobs.")
      )

  # Capture the Current Working Directory at the time of launch
  submission_cwd = Path.cwd()

  # Create a unique, nested directory for this experiment run:
  # <cluster_name>/<config_name>/<tag>/<timestamp>[_N]
  timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
  base_exp_dir_local = Path(cluster_name) / config_name / tag / timestamp
  exp_dir_local = base_exp_dir_local
  exp_dir = get_experiments_dir() / exp_dir_local
  counter = 1
  while exp_dir.exists():
    exp_dir_local = base_exp_dir_local.with_name(f"{base_exp_dir_local.name}_{counter}")
    exp_dir = get_experiments_dir() / exp_dir_local
    counter += 1
  if not dry_run:
    exp_dir.mkdir(parents=True, exist_ok=False)

  # Prepare the final runnable script by filling the template placeholders
  final_script_content = template_script.replace(
    "{JOB_NAME}", f'{tag}-{config_name}'
  ).replace(
    "{EXP_DIR}", str(exp_dir.resolve())
  ).replace(
    "{CWD}", str(submission_cwd.resolve())
  ).replace(
    "{PREPROCESS}", str(preprocess) if preprocess is not None else ''
  ).replace(
    "{CMD}", str(command)
  ).replace(
    "{POSTPROCESS}", str(postprocess) if postprocess is not None else ''
  )

  if not dry_run:
    run_script_path = exp_dir / "run.sh"
    with open(run_script_path, "w") as f:
      f.write(final_script_content)
    run_script_path.chmod(0o755)

  queued_ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S.%f")
  # Expand (value, filename) tuples into two separate variables.
  job_vars = {}
  if variables is not None:
    for k, v in variables.items():
      if isinstance(v, tuple):
        job_vars[k] = v[0]
        job_vars[f'{k}_filename'] = v[1]
      else:
        job_vars[k] = v
  job = Job(
    config_name=config_name,
    cluster_name=cluster_name,
    exp_dir=str(exp_dir_local),
    command=command,
    status=Status.SUBMITTING.value,
    scheduler=scheduler,
    job_id=0,
    tag=tag,
    archive_name=None,
    preprocess=preprocess,
    postprocess=postprocess,
    variables=job_vars,
    queued_timestamp=queued_ts,
  )

  if dry_run:
    return job

  job.write_metadata()

  try:
    # Submit the job using the scheduler's own logic
    if scheduler == 'slurm':
      job.job_id = slurm_submit(run_script_path, exp_dir, previous_job_id)
    elif scheduler == 'pbs':
      job.job_id = pbs_submit(run_script_path, exp_dir, previous_job_id)
    elif scheduler == 'local':
      console.print(f"✅ Submitting job with command '[bold cyan]{job.command}[/bold cyan]'.")
      config = load_local_config(config_name)
      if config is None:
        raise ConfigurationError(f'Couldn\'t find configuration `{config_name}`')
      job.job_id, timed_out = config.local_submit(run_script_path, exp_dir)
      if timed_out:
        job.status = Status.TIMEOUT.value
        job.end_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S.%f")
        job.write_job_status()
    else:
      raise JobSubmitError(f"No submission class found for scheduler '{scheduler}'. Supported schedulers are: slurm, pbs, local.")

    job.write_metadata(override_status=False)

  except (ValueError, FileNotFoundError) as e:
    job.status = Status.FAILED_SUBMISSION.value
    job.end_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S.%f")
    job.write_metadata()
    err_str = "Failed to submit job. Error: " + str(e)
    with open(job.get_stderr_path(), 'w+') as err_file:
      err_file.write(err_str)
    raise JobSubmitError(err_str) from e
  except subprocess.CalledProcessError as e:
    job.status = Status.FAILED_SUBMISSION.value
    job.end_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S.%f")
    job.write_metadata()
    # output/stderr may be None (not captured); build the message explicitly
    # so an empty stderr no longer blanks out the whole message.
    output = e.output or ""
    stderr = e.stderr or ""
    err_str = (
      f"Job submission failed with error code {e.returncode}.\n"
      f"Output stream:\n{output}\n"
      f"Error stream:\n{stderr}"
    )
    with open(job.get_stderr_path(), 'w+') as err_file:
      err_file.write(err_str)
    raise JobSubmitError(err_str) from e

  # Returning here (not in a `finally`) lets submission errors propagate as
  # documented instead of being silently swallowed.
  return job

launch_jobs_from_file

launch_jobs_from_file(jobs_file_path: Path, force: bool = False, dry_run: bool = False, filter_tags: Optional[List[str]] = None, filter_variables: Optional[Dict[str, Any]] = None, ignore_archived: bool = False, ignore_conf_in_dup_check: bool = False, ignore_commands_in_dup_check: bool = False) -> List[Job]

Launches jobs based on a YAML configuration file. Args: jobs_file_path: Path to the YAML file containing job definitions. force: If True, will overwrite existing jobs with the same configuration. dry_run: If True, will return the list of jobs but will not launch them (warning: won't work for sequential jobs) filter_tags: If provided, only launch jobs whose tag matches one of these values. filter_variables: If provided, only launch jobs where variables match all key=value pairs. Returns: A list of Job objects representing the launched jobs. Raises: ConfigurationError: If the jobs file is not found or has invalid syntax.

Source code in src/sbatchman/core/launcher.py
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
def launch_jobs_from_file(
  jobs_file_path: Path,
  force: bool = False,
  dry_run: bool = False,
  filter_tags: Optional[List[str]] = None,
  filter_variables: Optional[Dict[str, Any]] = None,
  ignore_archived: bool = False,
  ignore_conf_in_dup_check: bool = False,
  ignore_commands_in_dup_check: bool = False,
) -> List[Job]:
  """Launches jobs based on a YAML configuration file.

  Settings cascade from the top level of the file, to each entry of ``jobs``,
  to each entry of that job's ``config_jobs`` list. ``command``,
  ``preprocess``, ``postprocess`` and ``cluster_name`` are taken from the
  innermost level that defines them. ``variables`` mappings are merged, with
  inner keys winning. Jobs without a ``config``, and ``config_jobs`` entries
  without a ``tag``, are skipped. So is anything whose ``cluster_name``
  differs from this machine's cluster name (when both are known).

  Args:
    jobs_file_path: Path to the YAML file containing job definitions.
    force: If True, will overwrite existing jobs with the same configuration.
    dry_run: If True, will return the list of jobs but will not launch them
      (warning: won't work for sequential jobs).
    filter_tags: If provided, only launch jobs whose tag matches one of these
      ``fnmatch``-style glob patterns.
    filter_variables: If provided, only launch jobs where variables match all
      key=value pairs.
    ignore_archived: Forwarded unchanged to ``_launch_job_combinations``
      (presumably: do not consider archived jobs when checking for
      duplicates -- confirm there).
    ignore_conf_in_dup_check: Forwarded unchanged to
      ``_launch_job_combinations`` (presumably: ignore the config when
      detecting duplicate jobs -- confirm there).
    ignore_commands_in_dup_check: Forwarded unchanged to
      ``_launch_job_combinations`` (presumably: ignore the command when
      detecting duplicate jobs -- confirm there).

  Returns:
    A list of Job objects representing the launched jobs.

  Raises:
    FileNotFoundError: If ``jobs_file_path`` does not exist.
    yaml.YAMLError: If the file is not valid YAML.
    ValueError: If the top level of the file is not a YAML mapping.
    Errors raised while expanding variables or launching individual jobs
    propagate unchanged.
  """
  with open(jobs_file_path, "r", encoding="utf-8") as f:
    # safe_load returns None for an empty document; treat that as "no jobs"
    # instead of crashing on ``None.get`` below.
    config = yaml.safe_load(f) or {}
  if not isinstance(config, dict):
    raise ValueError(
      f"Invalid jobs file {jobs_file_path}: expected a YAML mapping at the top level, "
      f"got {type(config).__name__}."
    )

  global_is_sequential = bool(config.get("sequential"))
  global_command = config.get("command", None)
  global_preprocess = config.get("preprocess", None)
  global_postprocess = config.get("postprocess", None)
  global_cluster_name = config.get("cluster_name", None)

  machine_cluster_name = None
  try:
    machine_cluster_name = get_cluster_name()
  except ClusterNameNotSetError:
    pass  # Unknown machine cluster: cluster_name filtering is disabled below.

  if global_is_sequential:
    console.print('[yellow]Jobs will be scheduled sequentially.[/yellow]')

  # Prepare global variable values (expand files if needed). ``or {}`` also
  # tolerates a ``variables:`` key left empty, which YAML parses as None.
  expanded_global_vars = {
    k: _load_variable_values(v) for k, v in (config.get("variables") or {}).items()
  }

  def tag_selected(tag: str) -> bool:
    # True when no tag filter is set or the tag matches one of the glob patterns.
    return filter_tags is None or any(fnmatch.fnmatch(tag, pattern) for pattern in filter_tags)

  def runs_on_this_machine(cluster_name: Optional[str]) -> bool:
    # Only skip when both the target and the machine cluster names are known and differ.
    return cluster_name is None or machine_cluster_name is None or cluster_name == machine_cluster_name

  # Keyword options forwarded unchanged to every _launch_job_combinations call.
  launch_options = {
    "dry_run": dry_run,
    "filter_tags": filter_tags,
    "filter_variables": filter_variables,
    "ignore_archived": ignore_archived,
    "ignore_conf_in_dup_check": ignore_conf_in_dup_check,
    "ignore_commands_in_dup_check": ignore_commands_in_dup_check,
  }

  launched_jobs = []
  previous_job_id = None

  for job_def in config.get("jobs") or []:
    job_config_template = job_def.get("config")
    if not job_config_template:
      continue  # Skip job definition if it has no config

    job_command_template = job_def.get("command", global_command)
    job_preprocess_template = job_def.get("preprocess", global_preprocess)
    job_postprocess_template = job_def.get("postprocess", global_postprocess)
    job_cluster_name = job_def.get("cluster_name", global_cluster_name)
    expanded_job_vars = {
      k: _load_variable_values(v) for k, v in (job_def.get("variables") or {}).items()
    }
    # Merge global and job-specific variables (job-level keys win).
    merged_job_vars = {**expanded_global_vars, **expanded_job_vars}

    config_jobs = job_def.get("config_jobs") or []
    if not config_jobs:
      # No config_jobs: launch with the job definition's own context.
      job_tag = job_def.get("tag", "default")
      if not tag_selected(job_tag) or not runs_on_this_machine(job_cluster_name):
        continue
      previous_job_id = _launch_job_combinations(
        job_config_template,
        job_command_template,
        job_tag,
        job_preprocess_template,
        job_postprocess_template,
        job_cluster_name,
        merged_job_vars,
        launched_jobs,
        force,
        global_is_sequential,
        previous_job_id,
        **launch_options,
      )
      continue

    for entry in config_jobs:
      tag_name = entry.get("tag")
      if not tag_name:
        continue  # Skip matrix entry if it has no tag

      # Early tag filter for static tags only; tags containing variables such
      # as {model} are filtered later, after substitution.
      if filter_tags is not None and not _extract_used_vars(tag_name) and not tag_selected(tag_name):
        continue

      entry_cluster_name = entry.get("cluster_name", job_cluster_name)
      # Check the cluster before expanding entry variables, so entries meant
      # for other clusters are skipped without expanding (possibly file-backed)
      # variable values.
      if not runs_on_this_machine(entry_cluster_name):
        continue

      expanded_entry_vars = {
        k: _load_variable_values(v) for k, v in (entry.get("variables") or {}).items()
      }
      # Merge all variables: global -> job -> entry (innermost wins).
      final_vars = {**merged_job_vars, **expanded_entry_vars}

      previous_job_id = _launch_job_combinations(
        job_config_template,
        entry.get("command", job_command_template),
        tag_name,
        entry.get("preprocess", job_preprocess_template),
        entry.get("postprocess", job_postprocess_template),
        entry_cluster_name,
        final_vars,
        launched_jobs,
        force,
        global_is_sequential,
        previous_job_id,
        **launch_options,
      )

  return launched_jobs

unarchive_job

unarchive_job(job: Job) -> None

Moves a single archived job back to the active experiments directory.

Parameters:

Name Type Description Default
job Job

The Job instance to restore. job.archive_name must be set.

required

Raises:

Type Description
ValueError

If the job has no archive_name or no exp_dir.

FileNotFoundError

If the archived experiment directory does not exist.

Source code in src/sbatchman/core/jobs_manager.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def unarchive_job(job: Job) -> None:
  """
  Moves a single archived job back to the active experiments directory.

  The directory ``<archive dir>/<job.archive_name>/<job.exp_dir>`` is moved to
  ``<experiments dir>/<job.exp_dir>`` (parent directories are created as
  needed). Only after the move succeeds is ``job.archive_name`` cleared and
  the job's metadata rewritten via ``job.write_metadata()``.

  Args:
      job: The Job instance to restore.  ``job.archive_name`` must be set.

  Raises:
      ValueError: If the job has no archive_name or no exp_dir.
      FileNotFoundError: If the archived experiment directory does not exist.
      FileExistsError: If the destination directory already exists in the
          active experiments tree. ``shutil.move`` would otherwise nest the
          archived directory inside it instead of restoring it in place.
  """
  if not job.archive_name:
    raise ValueError(f"Job {job!r} is not archived (archive_name is not set).")
  if not job.exp_dir:
    raise ValueError(f"Job {job!r} has no exp_dir set.")

  archive_root = get_archive_dir()
  exp_dir_root = get_experiments_dir()

  source_job_dir = archive_root / job.archive_name / job.exp_dir
  if not source_job_dir.exists():
    raise FileNotFoundError(f"Archived directory not found: {source_job_dir}")

  dest_job_dir = exp_dir_root / job.exp_dir
  if dest_job_dir.exists():
    raise FileExistsError(f"Active experiment directory already exists: {dest_job_dir}")
  dest_job_dir.parent.mkdir(parents=True, exist_ok=True)

  shutil.move(str(source_job_dir), str(dest_job_dir))

  # Clear the archive tag only once the move has succeeded, so a failed move
  # leaves the in-memory job consistent with where its files actually are.
  # It is still cleared before write_metadata() so the active metadata is clean.
  job.archive_name = None
  job.write_metadata()

update_jobs_status

update_jobs_status() -> int

Updates the status of active jobs on the current cluster by querying the scheduler.

Returns:

Type Description
int

The number of jobs whose status was updated.

Source code in src/sbatchman/core/jobs_manager.py
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
def update_jobs_status() -> int:
  """
  Refreshes the status of this cluster's active jobs by querying the scheduler.

  Every active (non-archived) job of the current cluster is handed to
  ``_update_single_job_status`` on a thread pool, so the checks run concurrently.

  Returns:
    How many of those per-job checks returned a truthy result (i.e. were updated).
  """
  jobs_to_check = jobs_list(
    cluster_name=get_cluster_name(),
    from_active=True,
    from_archived=False,
    update_jobs=False,
  )

  with concurrent.futures.ThreadPoolExecutor() as pool:
    pending = [pool.submit(_update_single_job_status, one_job) for one_job in jobs_to_check]
    # Future.result() re-raises any exception raised inside a worker thread.
    return sum(1 for done in concurrent.futures.as_completed(pending) if done.result())