import glob
import os
import numpy as np
from tqdm import tqdm
from gwpy.table import EventTable
from gwpy.table.filters import in_segmentlist
from gwtrigfind import find_trigger_files
from lal.utils import CacheEntry
from ... import exceptions
from ... import names
from ... import utils
from ... import hookimpl
from . import DataLoader
# file extension assumed for Omicron trigger files when the caller
# does not pass an explicit "format" keyword (see OmicronDataLoader._query)
DEFAULT_OMICRON_EXT = "h5"
# window passed to EventTable.cluster for ROOT-format triggers
# (presumably seconds on the Omicron time column — confirm against gwpy docs)
DEFAULT_CLUSTER_WINDOW = 0.1
def find_triggers(
    channel,
    start,
    end,
    ext,
    rootdir=None,
    instrument=None,
    all_gps_dirs=False,
    **kwargs,
):
    """Locate Omicron trigger files for ``channel`` between ``start`` and ``end``.

    If ``rootdir`` is not given, files are auto-discovered via
    :func:`gwtrigfind.find_trigger_files`.  Otherwise a fixed on-disk layout
    of ``rootdir/<CHANNEL>_OMICRON/<gps_dir>/<filename>`` is globbed, where
    the filename convention comes from ``names.channel2omicron_filename``.

    Extra keyword arguments are accepted (and ignored) so callers can pass
    their full kwargs dict through unchanged.
    """
    # no explicit root: delegate discovery entirely to gwtrigfind
    if not rootdir:
        return find_trigger_files(channel, "omicron", start, end, ext=ext)
    # build the expected channel directory / filename for the fixed layout
    omicron_channel = f"{channel.split(':')[1]}_OMICRON"
    filename = names.channel2omicron_filename(instrument, omicron_channel, suffix=ext)
    # either scan every GPS directory, or only those covering [start, end)
    gps_dirs = ["*"] if all_gps_dirs else utils.start_end2gps_dirs(start, end)
    matches = []
    for gps_dir in gps_dirs:
        pattern = os.path.join(rootdir, omicron_channel, gps_dir, filename)
        matches += glob.glob(pattern)
    return matches
class OmicronDataLoader(DataLoader):
    """A :class:`DataLoader` extension that reads Omicron triggers off disk.

    Triggers are located with :func:`find_triggers`, filtered to the loader's
    analysis segments, and read into :class:`gwpy.table.EventTable` objects,
    one per channel.
    """

    # columns read from trigger files by default, and the full set callers
    # are allowed to request
    _default_columns = utils.OMICRON_COLUMNS
    _allowed_columns = utils.OMICRON_COLUMNS
    # this loader cannot run without knowing which detector to target
    _required_kwargs = ["instrument"]
    # per-extension keyword arguments forwarded verbatim to EventTable.read
    _format_args = {
        "h5": {"format": "hdf5", "path": "triggers"},
        "root": {"format": "root", "treename": "triggers"},
    }

    def _query(self, channels=None, bounds=None, verbose=False, **kwargs):
        """Read Omicron triggers for the requested channels.

        Parameters
        ----------
        channels : list of str, optional
            Channels to load; if `None`, discover them by listing the
            trigger root directory for the configured instrument.
        bounds : dict, optional
            Mapping of column name to ``(min, max)`` used to build
            row-selection filters.
        verbose : bool, optional
            Show a progress bar while loading.

        Returns
        -------
        dict
            Mapping of channel name to `~gwpy.table.EventTable`; channels
            with no surviving triggers get an empty table.

        Raises
        ------
        exceptions.NoDataError
            If no channel yielded any triggers at all.
        """
        if bounds is None:
            bounds = {}
        if channels is None:
            # auto-discover channels from the on-disk directory layout:
            # each subdirectory is named <CHANNEL>_OMICRON
            ifo = self.kwargs["instrument"]
            if "rootdir" in self.kwargs:
                rootdir = self.kwargs["rootdir"]
            else:
                # default production trigger location for this instrument
                rootdir = os.path.join(os.sep, "home", "detchar", "triggers", ifo)
            channels = [
                f"{ifo}:{channel.replace('_OMICRON', '')}"
                for channel in os.listdir(rootdir)
            ]
        data = {}
        # dtype used to build empty tables for channels with no triggers,
        # so every returned table has a consistent column layout
        dtype = [(key, "float") for key in self.columns]
        # set up filters for selection: always restrict to the analysis
        # segments, then add any per-column bounds
        selection = [(utils.OMICRON_TIMECOL, in_segmentlist, self.segs)]
        if bounds:
            for col, (min_, max_) in bounds.items():
                selection.extend([f"{col} >= {min_}", f"{col} <= {max_}"])
        # read in triggers
        ext = self.kwargs.get("format", DEFAULT_OMICRON_EXT)
        for channel in tqdm(channels, desc="loading data", disable=not verbose):
            # generate file cache and filter by segments
            # NOTE: some production Omicron files are found
            # with insufficient permissions which can cause
            # problems. we can optionally skip over these
            # problematic files with `skip_bad_files`.
            try:
                cache = find_triggers(channel, self.start, self.end, ext, **self.kwargs)
            except ValueError:
                # no triggers discoverable for this channel; skip it
                continue
            else:
                # parse T050017-style names to get each file's GPS segment,
                # then keep only files overlapping the analysis segments
                cache = [CacheEntry.from_T050017(path) for path in cache]
                cache = [c for c in cache if self.segs.intersects_segment(c.segment)]
                if self.kwargs.get("skip_bad_files", False):
                    cache = [c for c in cache if os.access(c.path, os.R_OK)]
                if not cache:
                    continue
                # read in triggers with the format-specific read options
                data[channel] = EventTable.read(
                    cache,
                    columns=self.columns,
                    selection=selection,
                    nproc=self.kwargs.get("nproc", 1),
                    **self._format_args[ext],
                )
                # ROOT files hold unclustered triggers; cluster them here so
                # both formats return comparable tables
                if ext == "root":
                    data[channel] = data[channel].cluster(
                        utils.OMICRON_TIMECOL, utils.OMICRON_SIGCOL, DEFAULT_CLUSTER_WINDOW
                    )
        if not data:
            raise exceptions.NoDataError(self.start, self.end - self.start)
        # fill in missing channels with empty (but correctly-typed) tables
        for channel in channels:
            if channel not in data:
                data[channel] = EventTable(data=np.array([], dtype=dtype))
        return data
@hookimpl
def get_dataloaders():
    """Register this module's loader under the ``omicron`` key."""
    return dict(omicron=OmicronDataLoader)