Source code for bilby_pipe.job_creation.slurm

#!/usr/bin/env python
"""
Module containing the tools for outputting slurm submission scripts
"""

import os
import subprocess

from ..utils import logger


class SubmitSLURM(object):
    def __init__(self, dag):
        self.dag = dag.pycondor_dag
        self.submit_dir = dag.inputs.submit_directory
        self.submit = dag.inputs.submit
        self.label = dag.inputs.label
        self.scheduler = dag.scheduler
        self.scheduler_args = dag.scheduler_args
        self.scheduler_module = dag.scheduler_module
        self.scheduler_env = dag.scheduler_env
        self.scheduler_analysis_time = dag.scheduler_analysis_time

    def run_local_generation(self):
        # Iterate over a copy: nodes are removed from the DAG as we go, and
        # mutating a list while iterating over it would skip entries
        for node in list(self.dag.nodes):
            if "_generation" in node.name:
                # Run the job locally
                cmd = " ".join([node.executable, node.args[0].arg])
                subprocess.run(cmd, shell=True)
                # Remove the node from the parent lists of its children
                for other_node in self.dag.nodes:
                    if node in other_node.parents:
                        other_node.parents.remove(node)
                self.dag.nodes.remove(node)
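
    # Illustration (hypothetical trace, not executed): for a DAG with nodes
    # [label_generation, label_analysis], where the analysis node lists the
    # generation node among its parents, run_local_generation runs the
    # generation command on this host and prunes the DAG to [label_analysis]
    # with an empty parents list, so the analysis job is later submitted
    # without an afterok dependency.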

    def write_master_slurm(self):
        """ Translate dag content to SLURM script """
        slurm_args_master = {
            "mem": "1G",
            "nodes": 1,
            "ntasks-per-node": 1,
            "time": "00:10:00",
            "output": f"{self.submit_dir}/{self.label}_master_slurm.out",
            "error": f"{self.submit_dir}/{self.label}_master_slurm.err",
            "job-name": f"{self.label}_master",
        }

        # reformat slurm options
        if self.scheduler_args is not None:
            slurm_args_custom = {
                arg.split("=")[0]: arg.split("=")[1]
                for arg in self.scheduler_args.split()
            }
            slurm_args_master.update(slurm_args_custom)
        else:
            slurm_args_custom = {}

        with open(self.slurm_master_bash, "w") as f:
            f.write("#!/bin/bash\n")
            for key, val in slurm_args_master.items():
                f.write(f"#SBATCH --{key}={val}\n")

            if self.scheduler_module:
                for module in self.scheduler_module:
                    if module is not None:
                        f.write(f"\nmodule load {module}\n")

            # if self.scheduler_env is not None:
            #     f.write("\nsource {}\n".format(self.scheduler_env))

            # assign new job ID to each process
            jids = range(len(self.dag.nodes))
            job_names = [node.name for node in self.dag.nodes]
            # create dict for assigning ID to job name
            job_dict = dict(zip(job_names, jids))

            for node, indx in zip(self.dag.nodes, jids):
                # get output file path from dag and use for slurm
                output_file = self._output_name_from_dag(node.extra_lines)

                # Generate the real slurm arguments from the dag node and
                # the parsed slurm args
                job_slurm_args = {
                    "mem": f"{int(float(node.request_memory.rstrip('GB')))}G",
                    "nodes": 1,
                    "ntasks-per-node": node.request_cpus,
                    "time": node.slurm_walltime,
                    "output": output_file,
                    "error": output_file.replace(".out", ".err"),
                    "job-name": node.name,
                }
                job_slurm_args.update(slurm_args_custom)

                # build the submit string for this node
                submit_str = f"\njid{indx}=($(sbatch"
                for key, val in job_slurm_args.items():
                    submit_str += f" --{key}={val}"

                # get list of all parents associated with job
                parents = [job.name for job in node.parents]
                if len(parents) > 0:
                    # only run subsequent jobs after parent has
                    # *successfully* completed
                    submit_str += " --dependency=afterok"
                    for parent in parents:
                        submit_str += f":${{jid{job_dict[parent]}[-1]}}"

                job_script = self._write_individual_processes(
                    node.name, node.executable, node.args[0].arg
                )
                submit_str += f" {job_script}))\n\n"
                submit_str += (
                    f'echo "jid{indx} ${{jid{indx}[-1]}}" >> {self.slurm_id_file}'
                )
                f.write(f"{submit_str}\n")

        # print out how to submit
        command_line = f"sbatch {self.slurm_master_bash}"
        if self.submit:
            subprocess.run([command_line], shell=True)
        else:
            logger.info(
                f"slurm scripts written, to run jobs submit:\n$ {command_line}"
            )
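
    # For orientation, a hypothetical fragment of the master script this
    # method emits for a two-node DAG (paths, labels, and resources are
    # illustrative, not taken from a real run):
    #
    #     #!/bin/bash
    #     #SBATCH --mem=1G
    #     #SBATCH --nodes=1
    #     #SBATCH --ntasks-per-node=1
    #     #SBATCH --time=00:10:00
    #     #SBATCH --job-name=label_master
    #
    #     jid0=($(sbatch --mem=4G ... --job-name=label_generation submit/label_generation.sh))
    #
    #     echo "jid0 ${jid0[-1]}" >> submit/slurm_ids
    #     jid1=($(sbatch --mem=8G ... --dependency=afterok:${jid0[-1]} submit/label_analysis.sh))
    #
    #     echo "jid1 ${jid1[-1]}" >> submit/slurm_ids
    #
    # sbatch prints "Submitted batch job <id>", so capturing its output in a
    # bash array makes ${jidN[-1]} the numeric job id that later
    # --dependency=afterok clauses can reference.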

    def _write_individual_processes(self, name, executable, args):
        fname = name + ".sh"
        job_path = self.submit_dir + "/" + fname
        with open(job_path, "w") as ff:
            ff.write("#!/bin/bash\n")
            if self.scheduler_module:
                for module in self.scheduler_module:
                    if module is not None:
                        ff.write(f"\nmodule load {module}\n")
            if self.scheduler_env is not None:
                ff.write(f"\nsource {self.scheduler_env}\n\n")
                # Call python from the venv on the script directly to avoid
                # "bad interpreter" from shebang exceeding 128 chars
                job_str = f"python {executable} {args}\n\n"
            else:
                job_str = f"{executable} {args}\n\n"
            ff.write(job_str)
        return job_path
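
    # A hypothetical wrapper produced by this method when a scheduler module
    # and an environment are configured (all names illustrative):
    #
    #     #!/bin/bash
    #
    #     module load gcc/9.2.0
    #
    #     source /path/to/venv/bin/activate
    #
    #     python /path/to/venv/bin/bilby_pipe_analysis outdir/label_config.ini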

    @property
    def slurm_master_bash(self):
        """ Create filename for master script """
        filebasename = "_".join(["slurm", self.label, "master.sh"])
        return os.path.join(self.submit_dir, filebasename)

    @property
    def slurm_id_file(self):
        """ Create the file that should store the slurm ids of the jobs """
        return os.path.join(self.submit_dir, "slurm_ids")

    @staticmethod
    def _output_name_from_dag(extra_lines):
        # probably a faster way to do this, but the list is short so this
        # should be fine
        for line in extra_lines:
            if line.startswith("output"):
                path = line[9:]  # strip the 'output = ' prefix
                path = path.replace("_$(Cluster)", "")
                path = path.replace("_$(Process)", "")
                return path
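
A minimal, self-contained sketch of the path handling above (the file path is
hypothetical): the static helper strips the condor $(Cluster)/$(Process)
macros from a condor-style "output = ..." extra line.

    extra_lines = ["output = outdir/log/label_$(Cluster)_$(Process).out"]
    print(SubmitSLURM._output_name_from_dag(extra_lines))
    # -> outdir/log/label.out

And a typical driver, assuming dag is a bilby_pipe Dag object exposing the
attributes read in __init__ (hypothetical code, not part of this module):

    submitter = SubmitSLURM(dag)
    submitter.run_local_generation()  # run *_generation nodes on this host
    submitter.write_master_slurm()    # write master and per-node scripts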