import os
from ...utils import BilbyPipeError, logger
from ..node import Node
[docs]
class PESummaryNode(Node):
def __init__(self, inputs, merged_node_list, generation_node_list, dag):
super().__init__(inputs)
[docs]
self.job_name = f"{inputs.label}_pesummary"
n_results = len(merged_node_list)
result_files = [merged_node.result_file for merged_node in merged_node_list]
labels = [merged_node.label for merged_node in merged_node_list]
if self.inputs.transfer_files or self.inputs.osg:
input_files_to_transfer = [
self._relative_topdir(fname, self.inputs.initialdir)
for fname in result_files
] + inputs.additional_transfer_paths
files = [self.inputs.complete_ini_file]
if len(generation_node_list) == 1:
files.append(generation_node_list[0].data_dump_file)
input_files_to_transfer.extend(
[
self._relative_topdir(fname, self.inputs.initialdir)
for fname in files
]
)
for value in [
self.inputs.psd_dict,
self.inputs.spline_calibration_envelope_dict,
]:
input_files_to_transfer.extend(self.extract_paths_from_dict(value))
if self.transfer_container:
input_files_to_transfer.append(self.inputs.container)
input_files_to_transfer, need_scitokens = self.job_needs_authentication(
input_files_to_transfer
)
if need_scitokens:
self.extra_lines.extend(self.scitoken_lines)
self.extra_lines.extend(
self._condor_file_transfer_lines(
input_files_to_transfer,
[self._relative_topdir(self.inputs.webdir, self.inputs.initialdir)],
)
)
self.setup_arguments(
add_ini=False, add_unknown_args=False, add_command_line_args=False
)
self.arguments.add("webdir", self.inputs.webdir)
if self.inputs.email is not None:
self.arguments.add("email", self.inputs.email)
self.arguments.add(
"config", " ".join([self.inputs.complete_ini_file] * n_results)
)
result_files = [os.path.relpath(fname) for fname in result_files]
self.arguments.add("samples", f"{' '.join(result_files)}")
# Using append here as summary pages doesn't take a full name for approximant
self.arguments.append("-a")
self.arguments.append(" ".join([self.inputs.waveform_approximant] * n_results))
if len(generation_node_list) == 1:
self.arguments.add(
"gwdata", os.path.relpath(generation_node_list[0].data_dump_file)
)
elif len(generation_node_list) > 1:
logger.info(
"Not adding --gwdata to PESummary job as there are multiple files"
)
existing_dir = self.inputs.existing_dir
if existing_dir is not None:
self.arguments.add("existing_webdir", existing_dir)
if isinstance(self.inputs.summarypages_arguments, dict):
if "labels" not in self.inputs.summarypages_arguments.keys():
self.arguments.append(f"--labels {' '.join(labels)}")
else:
if len(labels) != len(result_files):
raise BilbyPipeError(
"Please provide the same number of labels for postprocessing "
"as result files"
)
not_recognised_arguments = {}
for key, val in self.inputs.summarypages_arguments.items():
if key == "nsamples_for_skymap":
self.arguments.add("nsamples_for_skymap", val)
elif key == "gw":
self.arguments.add_flag("gw")
elif key == "no_ligo_skymap":
self.arguments.add_flag("no_ligo_skymap")
elif key == "burnin":
self.arguments.add("burnin", val)
elif key == "kde_plot":
self.arguments.add_flag("kde_plot")
elif key == "gracedb":
self.arguments.add("gracedb", val)
elif key == "palette":
self.arguments.add("palette", val)
elif key == "include_prior":
self.arguments.add_flag("include_prior")
elif key == "notes":
self.arguments.add("notes", val)
elif key == "publication":
self.arguments.add_flag("publication")
elif key == "labels":
self.arguments.add("labels", f"{' '.join(val)}")
else:
not_recognised_arguments[key] = val
if not_recognised_arguments != {}:
logger.warn(
"Did not recognise the summarypages_arguments {}. To find "
"the full list of available arguments, please run "
"summarypages --help".format(not_recognised_arguments)
)
self.process_node()
for merged_node in merged_node_list:
self.job.add_parent(merged_node.job)
@property
[docs]
def executable(self):
return self._get_executable_path("summarypages")
@property
[docs]
def request_memory(self):
return "16 GB"
@property
[docs]
def log_directory(self):
return self.inputs.summary_log_directory