import os
from pathlib import Path
from ...utils import check_directory_exists_and_if_not_mkdir, logger
from ..node import Node
class AnalysisNode(Node):
    # If --osg, run analysis nodes on the OSG
    run_node_on_osg = True

def __init__(self, inputs, generation_node, detectors, sampler, parallel_idx, dag):
        super().__init__(inputs, retry=3)
        self.dag = dag
self.generation_node = generation_node
self.detectors = detectors
self.parallel_idx = parallel_idx
self.request_cpus = inputs.request_cpus
data_label = generation_node.job_name
base_name = data_label.replace("generation", "analysis")
self.base_job_name = f"{base_name}_{''.join(detectors)}"
if parallel_idx != "":
self.job_name = f"{self.base_job_name}_{parallel_idx}"
else:
self.job_name = self.base_job_name
self.label = self.job_name
if self.inputs.use_mpi:
self.setup_arguments(
parallel_program=self._get_executable_path(
self.inputs.analysis_executable
)
)
else:
self.setup_arguments()
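        # With HTCondor file transfer (or when running on the OSG), every
        # input the job reads must be listed explicitly: the data dump, the
        # complete ini file, and pre-created checkpoint placeholder files.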
if self.inputs.transfer_files or self.inputs.osg:
data_dump_file = generation_node.data_dump_file
input_files_to_transfer = (
[
str(data_dump_file),
str(self.inputs.complete_ini_file),
]
+ touch_checkpoint_files(
os.path.join(inputs.outdir, "result"),
self.job_name,
inputs.sampler,
inputs.result_format,
)
+ inputs.additional_transfer_paths
)
self.extra_lines.extend(
self._condor_file_transfer_lines(
input_files_to_transfer,
[self._relative_topdir(self.inputs.outdir, self.inputs.initialdir)],
)
)
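            # Transferred jobs execute in a local scratch directory, so the
            # outdir argument must be a relative path.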
self.arguments.add("outdir", os.path.relpath(self.inputs.outdir))
for det in detectors:
self.arguments.add("detectors", det)
self.arguments.add("label", self.label)
self.arguments.add("data-dump-file", generation_node.data_dump_file)
self.arguments.add("sampler", sampler)
self.extra_lines.extend(self._checkpoint_submit_lines())
env_vars = []
if self.request_cpus > 1:
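            # The sampler handles its own parallelism, so prevent OpenMP
            # from oversubscribing the requested CPUs.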
env_vars.append("OMP_NUM_THREADS=1")
# see https://git.ligo.org/computing/helpdesk/-/issues/3837#note_707110
env_vars.append("KMP_AFFINITY='reset'")
if self.disable_hdf5_locking:
env_vars.append("USE_HDF5_FILE_LOCKING=FALSE")
if env_vars:
self.extra_lines.append(f"environment = \"{' '.join(env_vars)}\"")
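        # Add this node to the DAG and require the data-generation job to
        # complete before this analysis job starts.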
self.process_node()
self.job.add_parent(generation_node.job)
@property
def executable(self):
if self.inputs.use_mpi:
return self._get_executable_path("mpiexec")
elif self.inputs.analysis_executable:
return self._get_executable_path(self.inputs.analysis_executable)
else:
return self._get_executable_path("bilby_pipe_analysis")
@property
def request_memory(self):
return self.inputs.request_memory
@property
def log_directory(self):
return self.inputs.data_analysis_log_directory
@property
def result_file(self):
return f"{self.inputs.result_directory}/{self.job_name}_result.{self.inputs.result_format}"
@property
def slurm_walltime(self):
"""Default wall-time for base-name"""
# Seven days
return self.inputs.scheduler_analysis_time
def touch_checkpoint_files(directory, label, sampler, result_format="hdf5"):
"""
Figure out the pathnames required to recover from a checkpoint.
These may change due to upstream changes in Bilby.
"""
def touch_pickle_file(filename):
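        # Create an empty dill-pickled dict so the file exists and is a
        # valid pickle.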
import dill
if not Path(filename).exists():
with open(filename, "wb") as ff:
dill.dump(dict(), ff)
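    # Directory-name stems for samplers whose checkpoint directories do not
    # use the sampler's own name.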
abbreviations = dict(
ptmcmcsampler="ptmcmc_temp",
pymultinest="pm",
ultranest="ultra",
)
check_directory_exists_and_if_not_mkdir(directory=directory)
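    # Touch an (empty) result file so that it exists when listed for file
    # transfer.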
result_file = Path(directory) / f"{label}_result.{result_format}"
result_file.touch()
filenames = [str(result_file)]
if sampler.lower() == "dynesty":
for kind in ["resume", "dynesty"]:
filename = f"{directory}/{label}_{kind}.pickle"
touch_pickle_file(filename)
filenames.append(filename)
elif sampler.lower() == "bilby_mcmc":
filename = f"{directory}/{label}_resume.pickle"
touch_pickle_file(filename)
filenames.append(filename)
    elif sampler.lower() == "ptemcee":
filename = f"{directory}/{label}_checkpoint_resume.pickle"
touch_pickle_file(filename)
filenames.append(filename)
elif sampler.lower() == "nessai":
dirname = f"{directory}/{label}_nessai"
check_directory_exists_and_if_not_mkdir(directory=dirname)
filenames.append(dirname)
subdirectories = ["proposal", "diagnostics"]
for sd in subdirectories:
subdir = os.path.join(dirname, sd)
check_directory_exists_and_if_not_mkdir(subdir)
            filenames.append(subdir)
elif sampler.lower() in [
"cpnest",
"emcee",
"kombine",
"ultranest",
"ptmcmcsampler",
"pymultinest",
"zeus",
]:
name = abbreviations.get(sampler.lower(), sampler.lower())
dirname = f"{directory}/{name}_{label}"
check_directory_exists_and_if_not_mkdir(directory=dirname)
filenames.append(dirname)
elif sampler.lower() == "pypolychord":
dirname = f"{directory}/chains"
check_directory_exists_and_if_not_mkdir(directory=dirname)
filenames.append(dirname)
else:
logger.warning(f"Unable to predict resume files for {sampler}")
return filenames
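
# Example (hypothetical paths and label): pre-create the resume files for a
# dynesty run so that they can be listed as HTCondor transfer inputs:
#
#   files = touch_checkpoint_files("outdir/result", "run1", "dynesty")
#   # files == ["outdir/result/run1_result.hdf5",
#   #           "outdir/result/run1_resume.pickle",
#   #           "outdir/result/run1_dynesty.pickle"]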