# Source code for idq.io.triggers.omicron

import glob
import os

import numpy as np
from tqdm import tqdm

from gwpy.table import EventTable
from gwpy.table.filters import in_segmentlist
from gwtrigfind import find_trigger_files
from lal.utils import CacheEntry

from ... import exceptions
from ... import names
from ... import utils
from ... import hookimpl
from . import DataLoader


# trigger-file extension used when the loader's kwargs don't supply "format"
DEFAULT_OMICRON_EXT = "h5"
# window passed to EventTable.cluster for ROOT-format triggers
# (presumably seconds around the time column — TODO confirm units)
DEFAULT_CLUSTER_WINDOW = 0.1


def find_triggers(
    channel,
    start,
    end,
    ext,
    rootdir=None,
    instrument=None,
    all_gps_dirs=False,
    **kwargs,
):
    """Find Omicron triggers for a given channel"""
    # with no explicit root directory, defer to gwtrigfind's
    # automatic trigger-file discovery
    if not rootdir:
        return find_trigger_files(channel, "omicron", start, end, ext=ext)

    # otherwise assume the standard on-disk Omicron layout:
    # <rootdir>/<NAME>_OMICRON/<gps dir>/<filename>
    omicron_channel = "{}_OMICRON".format(channel.split(":")[1])
    filename = names.channel2omicron_filename(instrument, omicron_channel, suffix=ext)
    # either scan every GPS directory, or only those covering [start, end)
    gps_dirs = ["*"] if all_gps_dirs else utils.start_end2gps_dirs(start, end)

    # collect all files matching the expected pattern in each GPS directory
    matches = []
    for gps_dir in gps_dirs:
        pattern = os.path.join(rootdir, omicron_channel, gps_dir, filename)
        matches += glob.glob(pattern)
    return matches


class OmicronDataLoader(DataLoader):
    """an extension meant to read Omicron triggers off disk"""

    # column configuration: Omicron columns are both the default and the
    # full set callers may request
    _default_columns = utils.OMICRON_COLUMNS
    _allowed_columns = utils.OMICRON_COLUMNS
    _required_kwargs = ["instrument"]
    # per-extension keyword arguments forwarded to EventTable.read
    _format_args = {
        "h5": {"format": "hdf5", "path": "triggers"},
        "root": {"format": "root", "treename": "triggers"},
    }

    def _query(self, channels=None, bounds=None, verbose=False, **kwargs):
        """Load Omicron triggers from disk for the requested channels.

        Parameters
        ----------
        channels : list of str, optional
            channels to load; if `None`, channels are discovered by listing
            the trigger root directory for this instrument
        bounds : dict, optional
            mapping of column name -> (min, max) used to filter triggers
        verbose : bool, optional
            if `True`, show a progress bar while loading

        Returns
        -------
        dict
            mapping of channel name -> `EventTable`; channels with no
            triggers map to an empty table

        Raises
        ------
        exceptions.NoDataError
            if no triggers were found for any channel
        """
        if bounds is None:
            bounds = {}
        if channels is None:
            # auto-discover channels from the directory layout:
            # <rootdir>/<NAME>_OMICRON/... -> "<ifo>:<NAME>"
            ifo = self.kwargs["instrument"]
            if "rootdir" in self.kwargs:
                rootdir = self.kwargs["rootdir"]
            else:
                rootdir = os.path.join(os.sep, "home", "detchar", "triggers", ifo)
            channels = [
                f"{ifo}:{channel.replace('_OMICRON', '')}"
                for channel in os.listdir(rootdir)
            ]

        data = {}
        # dtype used to build empty tables for channels with no triggers
        dtype = [(key, "float") for key in self.columns]

        # set up filters for selection: always restrict to the analysis
        # segments, plus any user-supplied column bounds
        selection = [(utils.OMICRON_TIMECOL, in_segmentlist, self.segs)]
        if bounds:
            for col, (min_, max_) in bounds.items():
                selection.extend([f"{col} >= {min_}", f"{col} <= {max_}"])

        # read in triggers
        ext = self.kwargs.get("format", DEFAULT_OMICRON_EXT)
        for channel in tqdm(channels, desc="loading data", disable=not verbose):
            # generate file cache and filter by segments
            # NOTE: some production Omicron files are found
            # with insufficient permissions which can cause
            # problems. we can optionally skip over these
            # problematic files with `skip_bad_files`.
            try:
                cache = find_triggers(
                    channel, self.start, self.end, ext, **self.kwargs
                )
            except ValueError:
                # no triggers discoverable for this channel; skip it
                continue
            else:
                cache = [CacheEntry.from_T050017(path) for path in cache]
                cache = [
                    c for c in cache if self.segs.intersects_segment(c.segment)
                ]
                if self.kwargs.get("skip_bad_files", False):
                    cache = [c for c in cache if os.access(c.path, os.R_OK)]
            if not cache:
                continue

            # read in triggers
            data[channel] = EventTable.read(
                cache,
                columns=self.columns,
                selection=selection,
                nproc=self.kwargs.get("nproc", 1),
                **self._format_args[ext],
            )

            # cluster triggers if needed (ROOT-format files are stored
            # unclustered, unlike the hdf5 products)
            if ext == "root":
                data[channel] = data[channel].cluster(
                    utils.OMICRON_TIMECOL,
                    utils.OMICRON_SIGCOL,
                    DEFAULT_CLUSTER_WINDOW,
                )

        if not data:
            raise exceptions.NoDataError(self.start, self.end - self.start)

        # fill in missing channels with empty tables so every requested
        # channel has an entry in the result
        for channel in channels:
            if channel not in data:
                data[channel] = EventTable(data=np.array([], dtype=dtype))

        return data
@hookimpl
def get_dataloaders():
    """Plugin hook: register `OmicronDataLoader` under the ``omicron`` key."""
    return {
        "omicron": OmicronDataLoader,
    }