from ...utils import DataDump, logger
from ..node import Node
[docs]
class GenerationNode(Node):
def __init__(self, inputs, trigger_time, idx, dag, parent=None):
"""
Node for data generation jobs
Parameters:
-----------
inputs: bilby_pipe.main.MainInput
The user-defined inputs
trigger_time: float
The trigger time to use in generating analysis data
idx: int
The index of the data-generation job, used to label data products
dag: bilby_pipe.dag.Dag
The dag structure
parent: bilby_pipe.job_creation.node.Node (optional)
Any job to set as the parent to this job - used to enforce
dependencies
"""
super().__init__(inputs)
[docs]
self.trigger_time = trigger_time
self.setup_arguments()
self.arguments.add("label", self.label)
self.arguments.add("idx", self.idx)
self.arguments.add("trigger-time", self.trigger_time)
if self.inputs.injection_file is not None:
self.arguments.add("injection-file", self.inputs.injection_file)
if self.inputs.timeslide_file is not None:
self.arguments.add("timeslide-file", self.inputs.timeslide_file)
if self.disable_hdf5_locking:
self.extra_lines.append('environment = "HDF5_USE_FILE_LOCKING=FALSE"')
self.process_node()
if parent:
self.job.add_parent(parent.job)
@property
[docs]
def executable(self):
return self._get_executable_path("bilby_pipe_generation")
@property
[docs]
def request_memory(self):
return self.inputs.request_memory_generation
@property
[docs]
def log_directory(self):
return self.inputs.data_generation_log_directory
@property
[docs]
def universe(self):
if self.inputs.local_generation:
logger.debug(
"Data generation done locally: please do not use this when "
"submitting a large number of jobs"
)
universe = "local"
else:
logger.debug(f"All data will be grabbed in the {self._universe} universe")
universe = self._universe
return universe
@property
[docs]
def job_name(self):
job_name = "{}_data{}_{}_generation".format(
self.inputs.label, str(self.idx), self.trigger_time
)
job_name = job_name.replace(".", "-")
return job_name
@property
[docs]
def label(self):
return self.job_name
@property
[docs]
def data_dump_file(self):
return DataDump.get_filename(self.inputs.data_directory, self.label)