Source code for bilby_pipe.job_creation.nodes.merge_node

from ..node import Node


[docs] class MergeNode(Node): def __init__(self, inputs, parallel_node_list, detectors, dag): super().__init__(inputs)
[docs] self.dag = dag
[docs] self.job_name = f"{parallel_node_list[0].base_job_name}_merge"
[docs] self.label = f"{parallel_node_list[0].base_job_name}_merge"
[docs] self.request_cpus = 1
self.setup_arguments( add_ini=False, add_unknown_args=False, add_command_line_args=False ) self.arguments.append("--result") for pn in parallel_node_list: self.arguments.append(pn.result_file) self.arguments.add("outdir", self.inputs.result_directory) self.arguments.add("label", self.label) self.arguments.add("extension", self.inputs.result_format) self.arguments.add_flag("merge") self.process_node() for pn in parallel_node_list: self.job.add_parent(pn.job) @property
[docs] def executable(self): return self._get_executable_path("bilby_result")
@property
[docs] def request_memory(self): return "16 GB"
@property
[docs] def log_directory(self): return self.inputs.data_analysis_log_directory
@property
[docs] def result_file(self): return f"{self.inputs.result_directory}/{self.label}_result.{self.inputs.result_format}"