Source code for bilby_pipe.job_creation.slurm

#!/usr/bin/env python
"""
Module containing the tools for outputting slurm submission scripts
"""

import os
import subprocess

from ..utils import logger


class SubmitSLURM(object):
    def __init__(self, dag):
        self.dag = dag.pycondor_dag
        self.submit_dir = dag.inputs.submit_directory
        self.submit = dag.inputs.submit
        self.label = dag.inputs.label
        self.scheduler = dag.scheduler
        self.scheduler_args = dag.scheduler_args
        self.scheduler_module = dag.scheduler_module
        self.scheduler_env = dag.scheduler_env
        self.scheduler_analysis_time = dag.scheduler_analysis_time
    def run_local_generation(self):
        # Iterate over a copy of the node list, since nodes are removed
        # from self.dag.nodes inside the loop
        for node in list(self.dag.nodes):
            if "_generation" in node.name:
                # Run the job locally
                cmd = " ".join([node.executable, node.args[0].arg])
                subprocess.run(cmd, shell=True)

                # Drop the completed node from its children's parent lists,
                # then remove it from the dag
                for other_node in self.dag.nodes:
                    if node in other_node.parents:
                        other_node.parents.remove(node)
                self.dag.nodes.remove(node)
    def write_master_slurm(self):
        """Translate the dag content to a SLURM submission script"""
        with open(self.slurm_master_bash, "w") as f:
            # Reformat the user-provided scheduler options as sbatch flags
            if self.scheduler_args is not None:
                slurm_args = " ".join(
                    [f"--{arg}" for arg in self.scheduler_args.split()]
                )
            else:
                slurm_args = ""
            f.write("#!/bin/bash\n")
            for arg in slurm_args.split():
                f.write(f"#SBATCH {arg}\n")
            f.write("#SBATCH --time=00:10:00\n")

            # Write output to standard files
            f.write(
                f"#SBATCH --output={self.submit_dir}/{self.label}_master_slurm.out\n"
            )
            f.write(
                f"#SBATCH --error={self.submit_dir}/{self.label}_master_slurm.err\n"
            )

            if self.scheduler_module:
                for module in self.scheduler_module:
                    if module is not None:
                        f.write(f"\nmodule load {module}\n")

            # if self.scheduler_env is not None:
            #     f.write("\nsource {}\n".format(self.scheduler_env))

            # Assign a new job ID to each process
            jids = range(len(self.dag.nodes))
            job_names = [node.name for node in self.dag.nodes]

            # Create a dict mapping each job name to its ID
            job_dict = dict(zip(job_names, jids))

            for node, indx in zip(self.dag.nodes, jids):
                # Generate the real slurm arguments from the dag node and
                # the parsed slurm args
                job_slurm_args = slurm_args
                job_slurm_args += " --nodes=1"
                job_slurm_args += f" --ntasks-per-node={node.request_cpus}"
                job_slurm_args += (
                    f" --mem={int(float(node.request_memory.rstrip('GB')))}G"
                )
                job_slurm_args += f" --time={node.slurm_walltime}"
                job_slurm_args += f" --job-name={node.name}"

                submit_str = f"\njid{indx}=($(sbatch {job_slurm_args} "

                # Get the list of all parents associated with the job
                parents = [job.name for job in node.parents]
                if len(parents) > 0:
                    # Only run subsequent jobs after the parents have
                    # *successfully* completed
                    submit_str += "--dependency=afterok"
                    for parent in parents:
                        submit_str += f":${{jid{job_dict[parent]}[-1]}}"

                # Get the output-file path from the dag and use it for slurm
                output_file = self._output_name_from_dag(node.extra_lines)
                submit_str += f" --output={output_file}"
                submit_str += f" --error={output_file.replace('.out', '.err')}"

                job_script = self._write_individual_processes(
                    node.name, node.executable, node.args[0].arg
                )
                submit_str += f" {job_script}))\n\n"
                submit_str += (
                    f'echo "jid{indx} ${{jid{indx}[-1]}}" >> {self.slurm_id_file}'
                )
                f.write(f"{submit_str}\n")

        # Print out how to submit
        command_line = f"sbatch {self.slurm_master_bash}"
        if self.submit:
            # Pass the command as a single string, as required with shell=True
            subprocess.run(command_line, shell=True)
        else:
            logger.info(
                f"slurm scripts written, to run jobs submit:\n$ {command_line}"
            )
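    # Illustrative sketch, not part of the module: each dag node produces a
    # stanza in the master script like the one below; the label, resources,
    # and paths here are assumed example values. ${jid0[-1]} is the numeric
    # job ID, i.e. the last word of sbatch's "Submitted batch job N" output.
    #
    #   jid1=($(sbatch --nodes=1 --ntasks-per-node=1 --mem=4G \
    #       --time=24:00:00 --job-name=label_data0_analysis \
    #       --dependency=afterok:${jid0[-1]} \
    #       --output=outdir/log/label_data0_analysis.out \
    #       --error=outdir/log/label_data0_analysis.err \
    #       outdir/submit/label_data0_analysis.sh))
    #
    #   echo "jid1 ${jid1[-1]}" >> outdir/submit/slurm_ids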
    def _write_individual_processes(self, name, executable, args):
        fname = name + ".sh"
        job_path = self.submit_dir + "/" + fname
        with open(job_path, "w") as ff:
            ff.write("#!/bin/bash\n")
            if self.scheduler_module:
                for module in self.scheduler_module:
                    if module is not None:
                        ff.write(f"\nmodule load {module}\n")
            if self.scheduler_env is not None:
                ff.write(f"\nsource {self.scheduler_env}\n\n")
                # Call python from the venv on the script directly to avoid a
                # "bad interpreter" error from a shebang exceeding 128 chars
                job_str = f"python {executable} {args}\n\n"
            else:
                job_str = f"{executable} {args}\n\n"
            ff.write(job_str)
        return job_path
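    # Illustrative sketch, not part of the module: a per-process script as
    # written above when both a module list and a scheduler_env are set; the
    # module name and paths are assumed example values.
    #
    #   #!/bin/bash
    #
    #   module load python/3.10
    #
    #   source /path/to/venv/bin/activate
    #
    #   python /path/to/bilby_pipe_analysis outdir/label_config_complete.ini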
    @property
    def slurm_master_bash(self):
        """Create the filename for the master script"""
        # e.g. "slurm_<label>_master.sh" in the submit directory
        filebasename = "_".join(["slurm", self.label, "master.sh"])
        return os.path.join(self.submit_dir, filebasename)
    @property
    def slurm_id_file(self):
        """Create the file that should store the slurm ids of the jobs"""
        return os.path.join(self.submit_dir, "slurm_ids")
    @staticmethod
    def _output_name_from_dag(extra_lines):
        # The list of extra lines is short, so a linear scan is fine
        for line in extra_lines:
            if line.startswith("output"):
                # Strip the "output = " prefix and the condor-specific
                # $(Cluster)/$(Process) placeholders
                path = line[9:]
                path = path.replace("_$(Cluster)", "")
                path = path.replace("_$(Process)", "")
                return path
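
# A minimal usage sketch, not part of the module: SubmitSLURM is driven with
# a bilby_pipe Dag object; `dag` and the `local_generation` flag are assumed
# example names here.
#
#   submitter = SubmitSLURM(dag)
#   if dag.inputs.local_generation:  # optionally run generation jobs now
#       submitter.run_local_generation()
#   submitter.write_master_slurm()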