Source code for bilby_pipe.job_creation.nodes.generation_node

import os

from ...utils import BilbyPipeError, DataDump, log_function_call, logger
from ..node import Node


[docs] class GenerationNode(Node): def __init__(self, inputs, trigger_time, idx, dag, parent=None): """ Node for data generation jobs Parameters: ----------- inputs: bilby_pipe.main.MainInput The user-defined inputs trigger_time: float The trigger time to use in generating analysis data idx: int The index of the data-generation job, used to label data products dag: bilby_pipe.dag.Dag The dag structure parent: bilby_pipe.job_creation.node.Node (optional) Any job to set as the parent to this job - used to enforce dependencies """ super().__init__(inputs, retry=3) if not inputs.osg and inputs.generation_pool == "igwn-pool": raise BilbyPipeError( "Generation job requested to use the igwn-pool " "(OSG, --generation-pool=igwn-pool), but --osg=False" ) else: self.run_node_on_osg = inputs.generation_pool == "igwn-pool"
[docs] self.inputs = inputs
[docs] self.trigger_time = trigger_time
self.inputs.trigger_time = trigger_time
[docs] self.idx = idx
[docs] self.dag = dag
[docs] self.request_cpus = 1
self.setup_arguments() self.arguments.add("label", self.label) self.arguments.add("idx", self.idx) self.arguments.add("trigger-time", self.trigger_time) if self.inputs.injection_file is not None: self.arguments.add("injection-file", self.inputs.injection_file) if self.inputs.timeslide_file is not None: self.arguments.add("timeslide-file", self.inputs.timeslide_file) frame_files, success = self.resolve_frame_files need_scitokens = not success if self.inputs.transfer_files or self.inputs.osg: input_files_to_transfer = list() for attr in [ "complete_ini_file", "prior_file", "injection_file", "gps_file", "timeslide_file", ]: if (value := getattr(self.inputs, attr)) is not None: input_files_to_transfer.append(str(value)) for value in [ self.inputs.psd_dict, self.inputs.spline_calibration_envelope_dict, frame_files, ]: input_files_to_transfer.extend(self.extract_paths_from_dict(value)) input_files_to_transfer.extend(self.inputs.additional_transfer_paths) for ii, fname in enumerate(input_files_to_transfer): if fname.startswith("osdf://") and self._file_needs_authentication( fname ): need_scitokens = True prefix = self.authenticated_file_prefix input_files_to_transfer[ii] = f"{prefix}{fname}" self.extra_lines.extend( self._condor_file_transfer_lines( input_files_to_transfer, [self._relative_topdir(self.inputs.outdir, self.inputs.initialdir)], ) ) self.arguments.add("outdir", os.path.relpath(self.inputs.outdir)) elif new_frames := [ fname for fname in self.extract_paths_from_dict(frame_files) if fname.startswith(self.inputs.data_find_urltype) ]: logger.warning( "The following frame files were identified by gwdatafind for this analysis. " "These frames may not be found by the data generation stage as file " "transfer is not being used. You should either set transfer-files=True or " "pass these frame files to the data-dict option. You may need to " f"remove a prefix, e.g., file://localhost.\n\t{new_frames}" ) if need_scitokens: self.extra_lines.extend(self.scitoken_lines) self.process_node() if parent: self.job.add_parent(parent.job) @property
[docs] def resolve_frame_files(self): """ Resolve frame files from frame_type_dict and data_dict. For each detector, if the frame filepath(s) is given return the filepath(s), otherwise use gwdatafind to resolve the frame files using the provided frame type. Returns: -------- output: list list of frame filepaths success: bool True if frame files are resolved successfully for all detectors """ from gwdatafind import find_urls from gwpy.io.datafind import find_best_frametype from requests.exceptions import HTTPError success = True if self.inputs.gaussian_noise or self.inputs.zero_noise: return list(), success elif self.inputs.channel_dict is None: raise BilbyPipeError( "channel-dict must be provided if not using gaussian-noise or zero-noise" ) data = dict() if self.inputs.frame_type_dict is not None: data = self.inputs.frame_type_dict if self.inputs.data_dict is not None: data.update(self.inputs.data_dict) output = dict() for det in self.inputs.detectors: if ( self.inputs.channel_dict is not None and self.inputs.channel_dict[det] == "GWOSC" ): logger.info(f"Skipping datafind for {det} as GWOSC data is used.") elif isinstance(data.get(det, None), list): output[det] = data[det] elif os.path.exists(data.get(det, "/not/a/real/file")): output[det] = [data[det]] else: start_time = self.inputs.start_time end_time = self.inputs.start_time + self.inputs.duration if ( self.inputs.psd_dict is None or self.inputs.psd_dict.get(det, None) is None ): start_time -= self.inputs.psd_duration datafind_server = os.environ.get( "GWDATAFIND_SERVER", self.inputs.data_find_url ) if det not in data: channel_name = self.inputs.channel_dict[det] if not channel_name.startswith(f"{det}:"): channel_name = f"{det}:{channel_name}" frame_type = find_best_frametype( channel_name, start_time, end_time, host=datafind_server, ) else: frame_type = data[det] kwargs = dict( site=det[0], gpsstart=start_time, gpsend=end_time, urltype=self.inputs.data_find_urltype, host=datafind_server, on_gaps="error", frametype=frame_type, ) log_function_call("gwdatafind.find_urls", kwargs) try: output[det] = find_urls(**kwargs) logger.info(f"Found frame files with {frame_type}") except (HTTPError, RuntimeError): logger.warning( f"Failed to resolve frame files for detector {det}, the generation " "job will attempt with gwpy.get." ) success = False return output, success
@staticmethod
[docs] def extract_paths_from_dict(input): output = list() if isinstance(input, dict): for value in input.values(): if isinstance(value, str): output.append(value) elif isinstance(value, list): output.extend(value) return output
@property
[docs] def executable(self): return self._get_executable_path("bilby_pipe_generation")
@property
[docs] def request_memory(self): return self.inputs.request_memory_generation
@property
[docs] def log_directory(self): return self.inputs.data_generation_log_directory
@property
[docs] def universe(self): if self.inputs.local_generation: logger.debug( "Data generation done locally: please do not use this when " "submitting a large number of jobs" ) universe = "local" else: logger.debug(f"All data will be grabbed in the {self._universe} universe") universe = self._universe return universe
@property
[docs] def job_name(self): job_name = "{}_data{}_{}_generation".format( self.inputs.label, str(self.idx), self.trigger_time ) job_name = job_name.replace(".", "-") return job_name
@property
[docs] def label(self): return self.job_name
@property
[docs] def data_dump_file(self): return DataDump.get_filename(self.inputs.data_directory, self.label)
[docs] def _file_needs_authentication(self, fname): """ Check if a file needs authentication to be accessed, currently the only repositories that need authentication are ligo.osgstorage.org and *.storage.igwn.org. Parameters ---------- fname: str The file name to check """ proprietary_paths = ["igwn", "frames"] return any(path in fname for path in proprietary_paths)
@property
[docs] def scitoken_lines(self): """ Additional lines needed for the submit file to enable access to proprietary files/services. Note that we do not support scoped tokens. This is determined by the method used to issue the scitokens. For more details see `here <https://computing.docs.ligo.org/guide/htcondor/credentials>`_. """ issuer = self.scitoken_issuer if issuer is None: return [] else: return [f"use_oauth_services = {issuer}"]
@property
[docs] def authenticated_file_prefix(self): """ Return the prefix to add to files that need authentication. This is determined by the method used to issue the scitokens. For more details see `here <https://computing.docs.ligo.org/guide/htcondor/credentials>`. """ if self.scitoken_issuer in [None, "scitokens"]: return "" else: return "igwn+"
@property
[docs] def scitoken_issuer(self): """ Return the issuer to use for scitokens. This is determined by the :code:`--scitoken-issuer` argument or the version :code:`HTCondor` running on the current machine. For more details see `here <https://computing.docs.ligo.org/guide/htcondor/credentials>`_. """ if self.inputs.scheduler.lower() != "condor": return None elif ( self.inputs.scitoken_issuer == "local" or _is_htcondor_scitoken_local_issuer() ): return "scitokens" else: return "igwn"
[docs] def _is_htcondor_scitoken_local_issuer(): """ Test whether the machine being used is configured to use a local issuer or not. See `here <https://git.ligo.org/lscsoft/bilby_pipe/-/issues/304#note_1033251>`_ for where this logic comes from. """ try: from htcondor import param except ModuleNotFoundError: logger.warning( "HTCondor python bindings are not installed, assuming local " "issuer for scitokens if using HTCondor." ) return True return param.get("LOCAL_CREDMON_ISSUER", None) is not None