import os
from ...utils import BilbyPipeError, DataDump, log_function_call, logger
from ..node import Node
[docs]
class GenerationNode(Node):
def __init__(self, inputs, trigger_time, idx, dag, parent=None):
"""
Node for data generation jobs
Parameters:
-----------
inputs: bilby_pipe.main.MainInput
The user-defined inputs
trigger_time: float
The trigger time to use in generating analysis data
idx: int
The index of the data-generation job, used to label data products
dag: bilby_pipe.dag.Dag
The dag structure
parent: bilby_pipe.job_creation.node.Node (optional)
Any job to set as the parent to this job - used to enforce
dependencies
"""
super().__init__(inputs, retry=3)
if not inputs.osg and inputs.generation_pool == "igwn-pool":
raise BilbyPipeError(
"Generation job requested to use the igwn-pool "
"(OSG, --generation-pool=igwn-pool), but --osg=False"
)
else:
self.run_node_on_osg = inputs.generation_pool == "igwn-pool"
[docs]
self.trigger_time = trigger_time
self.inputs.trigger_time = trigger_time
self.setup_arguments()
self.arguments.add("label", self.label)
self.arguments.add("idx", self.idx)
self.arguments.add("trigger-time", self.trigger_time)
if self.inputs.injection_file is not None:
self.arguments.add("injection-file", self.inputs.injection_file)
if self.inputs.timeslide_file is not None:
self.arguments.add("timeslide-file", self.inputs.timeslide_file)
frame_files, success = self.resolve_frame_files
need_scitokens = not success
if self.inputs.transfer_files or self.inputs.osg:
input_files_to_transfer = list()
for attr in [
"complete_ini_file",
"prior_file",
"injection_file",
"gps_file",
"timeslide_file",
]:
if (value := getattr(self.inputs, attr)) is not None:
input_files_to_transfer.append(str(value))
for value in [
self.inputs.psd_dict,
self.inputs.spline_calibration_envelope_dict,
frame_files,
]:
input_files_to_transfer.extend(self.extract_paths_from_dict(value))
input_files_to_transfer.extend(self.inputs.additional_transfer_paths)
for ii, fname in enumerate(input_files_to_transfer):
if fname.startswith("osdf://") and self._file_needs_authentication(
fname
):
need_scitokens = True
prefix = self.authenticated_file_prefix
input_files_to_transfer[ii] = f"{prefix}{fname}"
self.extra_lines.extend(
self._condor_file_transfer_lines(
input_files_to_transfer,
[self._relative_topdir(self.inputs.outdir, self.inputs.initialdir)],
)
)
self.arguments.add("outdir", os.path.relpath(self.inputs.outdir))
elif new_frames := [
fname
for fname in self.extract_paths_from_dict(frame_files)
if fname.startswith(self.inputs.data_find_urltype)
]:
logger.warning(
"The following frame files were identified by gwdatafind for this analysis. "
"These frames may not be found by the data generation stage as file "
"transfer is not being used. You should either set transfer-files=True or "
"pass these frame files to the data-dict option. You may need to "
f"remove a prefix, e.g., file://localhost.\n\t{new_frames}"
)
if need_scitokens:
self.extra_lines.extend(self.scitoken_lines)
self.process_node()
if parent:
self.job.add_parent(parent.job)
@property
[docs]
def resolve_frame_files(self):
"""
Resolve frame files from frame_type_dict and data_dict.
For each detector, if the frame filepath(s) is given
return the filepath(s), otherwise use gwdatafind to
resolve the frame files using the provided frame type.
Returns:
--------
output: list
list of frame filepaths
success: bool
True if frame files are resolved successfully for all detectors
"""
from gwdatafind import find_urls
from gwpy.io.datafind import find_best_frametype
from requests.exceptions import HTTPError
success = True
if self.inputs.gaussian_noise or self.inputs.zero_noise:
return list(), success
elif self.inputs.channel_dict is None:
raise BilbyPipeError(
"channel-dict must be provided if not using gaussian-noise or zero-noise"
)
data = dict()
if self.inputs.frame_type_dict is not None:
data = self.inputs.frame_type_dict
if self.inputs.data_dict is not None:
data.update(self.inputs.data_dict)
output = dict()
for det in self.inputs.detectors:
if (
self.inputs.channel_dict is not None
and self.inputs.channel_dict[det] == "GWOSC"
):
logger.info(f"Skipping datafind for {det} as GWOSC data is used.")
elif isinstance(data.get(det, None), list):
output[det] = data[det]
elif os.path.exists(data.get(det, "/not/a/real/file")):
output[det] = [data[det]]
else:
start_time = self.inputs.start_time
end_time = self.inputs.start_time + self.inputs.duration
if (
self.inputs.psd_dict is None
or self.inputs.psd_dict.get(det, None) is None
):
start_time -= self.inputs.psd_duration
datafind_server = os.environ.get(
"GWDATAFIND_SERVER", self.inputs.data_find_url
)
if det not in data:
channel_name = self.inputs.channel_dict[det]
if not channel_name.startswith(f"{det}:"):
channel_name = f"{det}:{channel_name}"
frame_type = find_best_frametype(
channel_name,
start_time,
end_time,
host=datafind_server,
)
else:
frame_type = data[det]
kwargs = dict(
site=det[0],
gpsstart=start_time,
gpsend=end_time,
urltype=self.inputs.data_find_urltype,
host=datafind_server,
on_gaps="error",
frametype=frame_type,
)
log_function_call("gwdatafind.find_urls", kwargs)
try:
output[det] = find_urls(**kwargs)
logger.info(f"Found frame files with {frame_type}")
except (HTTPError, RuntimeError):
logger.warning(
f"Failed to resolve frame files for detector {det}, the generation "
"job will attempt with gwpy.get."
)
success = False
return output, success
@staticmethod
@property
[docs]
def executable(self):
return self._get_executable_path("bilby_pipe_generation")
@property
[docs]
def request_memory(self):
return self.inputs.request_memory_generation
@property
[docs]
def log_directory(self):
return self.inputs.data_generation_log_directory
@property
[docs]
def universe(self):
if self.inputs.local_generation:
logger.debug(
"Data generation done locally: please do not use this when "
"submitting a large number of jobs"
)
universe = "local"
else:
logger.debug(f"All data will be grabbed in the {self._universe} universe")
universe = self._universe
return universe
@property
[docs]
def job_name(self):
job_name = "{}_data{}_{}_generation".format(
self.inputs.label, str(self.idx), self.trigger_time
)
job_name = job_name.replace(".", "-")
return job_name
@property
[docs]
def label(self):
return self.job_name
@property
[docs]
def data_dump_file(self):
return DataDump.get_filename(self.inputs.data_directory, self.label)
[docs]
def _file_needs_authentication(self, fname):
"""
Check if a file needs authentication to be accessed, currently the only
repositories that need authentication are ligo.osgstorage.org and
*.storage.igwn.org.
Parameters
----------
fname: str
The file name to check
"""
proprietary_paths = ["igwn", "frames"]
return any(path in fname for path in proprietary_paths)
@property
[docs]
def scitoken_lines(self):
"""
Additional lines needed for the submit file to enable access to
proprietary files/services. Note that we do not support scoped tokens.
This is determined by the method used to issue the scitokens. For more details
see `here <https://computing.docs.ligo.org/guide/htcondor/credentials>`_.
"""
issuer = self.scitoken_issuer
if issuer is None:
return []
else:
return [f"use_oauth_services = {issuer}"]
@property
[docs]
def authenticated_file_prefix(self):
"""
Return the prefix to add to files that need authentication. This is
determined by the method used to issue the scitokens. For more details see
`here <https://computing.docs.ligo.org/guide/htcondor/credentials>`.
"""
if self.scitoken_issuer in [None, "scitokens"]:
return ""
else:
return "igwn+"
@property
[docs]
def scitoken_issuer(self):
"""
Return the issuer to use for scitokens. This is determined by the :code:`--scitoken-issuer`
argument or the version :code:`HTCondor` running on the current machine. For more details
see `here <https://computing.docs.ligo.org/guide/htcondor/credentials>`_.
"""
if self.inputs.scheduler.lower() != "condor":
return None
elif (
self.inputs.scitoken_issuer == "local"
or _is_htcondor_scitoken_local_issuer()
):
return "scitokens"
else:
return "igwn"
[docs]
def _is_htcondor_scitoken_local_issuer():
"""
Test whether the machine being used is configured to use a local issuer
or not. See `here <https://git.ligo.org/lscsoft/bilby_pipe/-/issues/304#note_1033251>`_
for where this logic comes from.
"""
try:
from htcondor import param
except ModuleNotFoundError:
logger.warning(
"HTCondor python bindings are not installed, assuming local "
"issuer for scitokens if using HTCondor."
)
return True
return param.get("LOCAL_CREDMON_ISSUER", None) is not None