Source code for idq.features

from collections import defaultdict
from dataclasses import dataclass, field
import operator
from typing import Dict, Optional, Tuple, Union

import numpy as np
from numpy.lib.recfunctions import structured_to_unstructured

from astropy.table import Row
from astropy.table import hstack, vstack, setdiff
from gwpy.table import EventTable
from gwpy.table.filters import in_segmentlist, not_in_segmentlist
from ligo.segments import segment, segmentlist

from . import utils


DEFAULT_TIME_NAME = "time"
DEFAULT_DELTA_TIME = np.infty
DEFAULT_COLUMN_VALUE = -np.infty

DEFAULT_MAX_STRIDE = np.infty
DEFAULT_MAX_SAMPLES = np.infty

FEATURE_NAME_TEMPLATE = "%s-%s"


def feature_name(channel, col):
    return FEATURE_NAME_TEMPLATE % (channel, col)


class Downselect(object):
    """
    an object that downselects a list of triggers to a preferred one,
    otherwise returning a default value
    """

    _required_kwargs = []

    def __init__(self, default=DEFAULT_COLUMN_VALUE, **kwargs):
        self._default = default
        for kwarg in self._required_kwargs:
            assert kwarg in kwargs, "required kwarg=%s missing" % kwarg
        self._kwargs = kwargs

    @property
    def default(self):
        return self._default

    def __call__(self, trgs, gps):
        raise NotImplementedError


class DownselectLoudest(Downselect):
    """
    take the loudest thing within some time window
    if there is no trigger within the window, fall back to the default value
    """

    _required_kwargs = ["window", "time", "significance"]

    @property
    def window(self):
        return self._kwargs["window"]

    @property
    def time(self):
        return self._kwargs["time"]

    @property
    def significance(self):
        return self._kwargs["significance"]

    def __call__(self, trgs, gps):
        # time order
        sort_idx = np.argsort(trgs[self.time])
        sort_trgs = np.array(trgs[sort_idx])

        # set defaults
        loudest = np.empty(len(gps), dtype=trgs.dtype)

        start_idx = np.searchsorted(
            sort_trgs[self.time], gps - self.window, side="left"
        )
        end_idx = np.searchsorted(sort_trgs[self.time], gps + self.window, side="right")
        bins = [sort_trgs[s:e] for s, e in zip(start_idx, end_idx)]

        for idx, bin_ in enumerate(bins):
            if bin_.size == 0:
                loudest[idx] = self.default
            else:
                max_idx = np.argmax(bin_[self.significance])
                loudest[idx] = bin_[max_idx]

        return loudest
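

# Illustrative sketch (not part of the original module): how DownselectLoudest picks,
# for each requested GPS time, the single loudest trigger within +/- window seconds.
# The structured trigger array and its "time"/"snr" columns are made-up placeholders.
def _example_downselect_loudest():
    trgs = np.array(
        [(100.00, 5.0), (100.05, 9.0), (100.08, 2.0), (250.00, 3.0)],
        dtype=[("time", float), ("snr", float)],
    )
    downselect = DownselectLoudest(window=0.1, time="time", significance="snr")
    # the first GPS time keeps the snr=9.0 trigger; the second has no trigger in its
    # window, so every column of that row is filled with the default value (-inf)
    return downselect(trgs, np.array([100.0, 200.0]))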


class DownselectPointy(Downselect):
    """
    take the thing with the smallest pointy-poisson p-value
    """

    _required_kwargs = ["window", "time", "significance"]

    def __call__(self, trgs, gps):
        raise NotImplementedError


class ColumnTransformer:
    """
    manages transformations of columns on an individual feature/FeatureVector basis
    useful when the same transformation is applied to the same column from many channels
    """

    def __init__(self, transforms, default):
        self._transforms = transforms
        self._default = default

    def __call__(self, column, val, **kwargs):
        transform = self._transforms.get(column, self._default)
        return transform(column, val, **kwargs)


class DeltaTimeTransformer(ColumnTransformer):
    """
    extends ColumnTransformer specifically to transform raw timestamps
    into relative timestamps
    """

    def __init__(
        self,
        time=DEFAULT_TIME_NAME,
        default=DEFAULT_COLUMN_VALUE,
        default_delta_time=DEFAULT_DELTA_TIME,
        **kwargs,
    ):
        deltat_transform = DeltaTimeTransform(
            time=time, default=default, default_delta_time=default_delta_time
        )
        transforms = {time: deltat_transform}
        super().__init__(transforms, IdentityTransform())


class DeltaTimeTransform:
    def __init__(
        self,
        default=DEFAULT_COLUMN_VALUE,
        default_delta_time=DEFAULT_DELTA_TIME,
        time=DEFAULT_TIME_NAME,
    ):
        self.default = default
        self.default_time = default_delta_time
        self.column_name = "delta{time}"

    def __call__(self, column, t, t0=None, **kwargs):
        return (
            t - t0 if t is not self.default else self.default_time,
            self.column_name.format(time=column),
        )


class IdentityTransform:
    def __call__(self, column, val, **kwargs):
        return val, column
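

# Illustrative sketch (not part of the original module): the transformer maps the raw
# "time" column to a relative "deltatime" column (t - t0) and passes every other
# column through unchanged. The column names and values here are placeholders.
def _example_delta_time_transformer():
    transformer = DeltaTimeTransformer(time="time")
    t0 = np.array([1000.0, 2000.0])
    # the "time" column gets shifted relative to t0 and renamed "deltatime"
    delta, delta_name = transformer("time", np.array([1000.5, 1999.0]), t0=t0)
    # any other column hits the IdentityTransform and is returned untouched
    snr, snr_name = transformer("snr", np.array([8.0, 12.0]), t0=t0)
    return (delta, delta_name), (snr, snr_name)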


class Whitener(object):
    """
    an object that manages whitening tables from quiver.vectorize
    remembers the xformations and serves as a utility for classifiers
    to map back and forth
    """

    def __init__(self):
        self.means = defaultdict(float)
        self.stdvs = defaultdict(float)

    def train(self, data, names):
        """
        assumes data.shape = (Nsamples, Nfeatures)
        """
        _, Nnames = data.shape
        assert Nnames == len(names), "incompatible number of features/columns"

        for i, name in enumerate(names):
            self.means[name] = np.mean(data[:, i])
            s = np.std(data[:, i])
            self.stdvs[name] = s if s != 0 else 1.0

    def whiten(self, data, names):
        """
        map colored data into whitened data with the xformation stored in this object
        assumes data.shape = (Nsamples, Nfeatures)
        modifies data in place, but also returns a reference
        """
        Nsamp, Nnames = data.shape
        assert Nnames == len(names), "incompatible number of features/columns"

        means, stdvs = self._2array(names)
        ones = np.ones(Nsamp)
        data -= np.outer(ones, means)
        data /= np.outer(ones, stdvs)
        return data

    def color(self, data, names):
        """
        the inverse of whiten
        assumes data.shape = (Nsamples, Nfeatures)
        modifies data in place, but also returns a reference
        """
        Nsamp, Nnames = data.shape
        assert Nnames == len(names), "incompatible number of features/columns"

        means, stdvs = self._2array(names)
        ones = np.ones(Nsamp)
        data *= np.outer(ones, stdvs)
        data += np.outer(ones, means)
        return data

    def _2array(self, names):
        Nnames = len(names)
        means = np.empty(Nnames, dtype="float")
        stdvs = np.empty(Nnames, dtype="float")
        for i, name in enumerate(names):
            if name not in self.means:
                raise KeyError(
                    "whitener has not been trained on data for name=%s" % name
                )
            means[i] = self.means[name]
            stdvs[i] = self.stdvs[name]
        return means, stdvs
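

# Illustrative sketch (not part of the original module): train a Whitener on a small
# design matrix, whiten it to roughly zero mean / unit variance per column, then invert
# the mapping with color(). The feature names below are arbitrary placeholders.
def _example_whitener():
    names = ["H1:CHAN_A-snr", "H1:CHAN_B-snr"]
    data = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
    whitener = Whitener()
    whitener.train(data, names)
    whitened = whitener.whiten(data.copy(), names)  # ~zero-mean, unit-variance columns
    recolored = whitener.color(whitened.copy(), names)  # recovers the original data
    return whitened, recolored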


@dataclass
class Selector:
    channels: Tuple
    time: str = DEFAULT_TIME_NAME
    downselector: Union[Downselect, None] = None
    transformer: Union[ColumnTransformer, None] = None
    bounds: dict = field(default_factory=dict)


@dataclass
class DataChunk:
    features: Dict
    columns: Tuple
    start: int
    end: int
    segs: Optional[segmentlist] = None
    skip_filter: bool = True

    def __post_init__(self):
        if self.segs is None:
            self.segs = segmentlist([segment(self.start, self.end)])

    @property
    def channels(self):
        return self.features.keys()

    @property
    def duration(self):
        return self.end - self.start

    @property
    def livetime(self):
        return utils.livetime(self.segs)

    def copy(self):
        return self.__class__(
            start=self.start,
            end=self.end,
            segs=self.segs,
            columns=self.columns,
            features={
                channel: table.copy() for channel, table in self.features.items()
            },
        )

    def __add__(self, other):
        assert set(self.columns) == set(
            other.columns
        ), "DataChunks must have the same columns to be combined"

        # update span, segments
        start = min(self.start, other.start)
        end = max(self.end, other.end)
        segs = self.segs | other.segs

        # combine features
        features = {}
        self_channels = set(self.channels)
        other_channels = set(other.channels)

        # combine features for channels present in both chunks
        common_channels = self_channels & other_channels
        for channel in common_channels:
            features[channel] = vstack(
                [self.features[channel], other.features[channel]],
                join_type="exact",
            )

        # add features for channels present in only one chunk
        for channel in self_channels - other_channels:
            features[channel] = self.features[channel]
        for channel in other_channels - self_channels:
            features[channel] = other.features[channel]

        return DataChunk(
            start=start, end=end, segs=segs, columns=self.columns, features=features
        )

    def filter(
        self,
        channels=None,
        columns=None,
        segs=None,
        time=DEFAULT_TIME_NAME,
        bounds=None,
    ):
        """
        update segments and filter out data that fall outside of them
        NOTE: this requires knowledge of the "time" key within data
        """
        # check that all columns in bounds will be returned
        if bounds is None:
            bounds = dict()
        else:
            for col in bounds.keys():
                assert col in self.columns, (
                    "bounds can only be placed on included columns, "
                    f"which column={col} is not"
                )

        # format and validate
        if channels:
            if isinstance(channels, str):
                channels = [channels]

        if segs is None:
            segs = self.segs
        assert (self.segs & segs) == segs, (
            "new segs must be a strict subset of existing segs\nsegs=%s\nnew segs=%s"
            % (self.segs, segs)
        )

        if columns:
            if isinstance(columns, str):
                columns = [columns]
            for col in columns:
                assert col in self.columns, "column=%s not in DataChunk" % col

        # filter by columns
        if columns:
            for channel in self.features.keys():
                self.features[channel] = self.features[channel][columns]

        # filter by segments
        self.segs = segmentlist(segs)  # make a copy
        for channel in self.features.keys():
            self.features[channel] = self.features[channel].filter(
                (time, in_segmentlist, self.segs)
            )

        # filter by bounds
        for channel, data in self.features.items():
            for col, (min_, max_) in bounds.items():
                data = data.filter(
                    (col, operator.ge, min_),
                    (col, operator.le, max_),
                )
            self.features[channel] = data

        return self

    def flush(self, max_stride=DEFAULT_MAX_STRIDE, time=DEFAULT_TIME_NAME):
        """
        trim data down to a target span
        """
        # update span and filter
        self.start = max(self.start, self.end - max_stride)
        self.segs &= segmentlist([segment(self.start, self.end)])
        return self.filter(time=time)

    def target_times(self, time, target_channel, target_bounds, segs=None):
        """
        A convenience function to extract target times.
        """
        # filter by bounds
        trgs = self.features[target_channel].copy()
        for col, (min_, max_) in target_bounds.items():
            trgs = trgs.filter(f"{col} >= {min_}", f"{col} <= {max_}")
        target_times = trgs[time]

        # filter times by segments
        if segs is not None:
            target_times = target_times[utils.times_in_segments(target_times, segs)]

        return target_times

    def random_times(
        self, time, target_channel, dirty_bounds, dirty_window, random_rate, segs=None
    ):
        """
        A convenience function to extract random times.
        """
        # filter by bounds
        trgs = self.features[target_channel].copy()
        for col, (min_, max_) in dirty_bounds.items():
            trgs = trgs.filter(f"{col} >= {min_}", f"{col} <= {max_}")

        # generate dirty segments
        dirty_times = trgs[time]
        dirty_seg = utils.times2segments(dirty_times, dirty_window)

        # only draw from segments that are outside of dirty segs but within segs
        random_times = utils.draw_random_times(self.segs - dirty_seg, rate=random_rate)

        # filter times by segments
        if segs is not None:
            random_times = random_times[utils.times_in_segments(random_times, segs)]

        # find segments where data is clean
        clean_seg = self.segs - dirty_seg
        if segs is not None:
            clean_seg &= segs

        return random_times, clean_seg


def combine_chunks(datachunks):
    """
    combine chunks into a single DataChunk
    """
    if isinstance(datachunks, DataChunk) or not datachunks:
        return datachunks
    elif len(datachunks) == 1:
        return datachunks[0]
    else:
        chunk = datachunks[0].copy()
        for other in datachunks[1:]:
            chunk += other
        return chunk
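

# Illustrative sketch (not part of the original module): build two adjacent DataChunks
# and merge them with combine_chunks. The channel name, columns, and trigger values
# below are made-up placeholders.
def _example_combine_chunks():
    columns = ("time", "snr")
    chunk1 = DataChunk(
        features={
            "H1:FAKE_CHANNEL": EventTable([[1.0, 2.0], [5.0, 6.0]], names=columns)
        },
        columns=columns,
        start=0,
        end=10,
    )
    chunk2 = DataChunk(
        features={"H1:FAKE_CHANNEL": EventTable([[12.0], [7.5]], names=columns)},
        columns=columns,
        start=10,
        end=20,
    )
    # the result spans [0, 20) with the per-channel trigger tables stacked together
    return combine_chunks([chunk1, chunk2])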


class FeatureVector(Row):
    """A feature vector representing a single row in a Dataset."""

    @property
    def time(self):
        return self["time"]

    @property
    def label(self):
        return self["label"]

    @property
    def rank(self):
        return self["rank"]

    @property
    def hash(self):
        return self["hash"]

    def is_evaluated(self):
        return not np.isnan(self.rank)

    @property
    def features(self):
        return self[list(set(self.colnames) - set(["time", "label", "rank", "hash"]))]


class DataTable(EventTable):
    Row = FeatureVector

    def as_unstructured(self):
        return structured_to_unstructured(self.as_array())
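

# Illustrative sketch (not part of the original module): a DataTable behaves like a
# gwpy EventTable whose rows are FeatureVectors and can be flattened into a plain 2D
# array. The column names and values below are placeholders.
def _example_datatable():
    table = DataTable([[100.0, 200.0], [1.0, 0.0]], names=("time", "label"))
    row = table[0]  # a FeatureVector
    dense = table.as_unstructured()  # shape (2, 2) float array
    return row.time, dense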


class Dataset:
    """
    a collection of feature vectors (times, labels, ranks, hashes) along with
    the machinery needed to load and vectorize the corresponding features
    """

    def __init__(
        self,
        times=None,
        labels=None,
        ranks=None,
        hashes=None,
        start=None,
        end=None,
        segs=None,
        dataloader=None,
    ):
        self._is_loaded = False

        # define the provenance for this dataset
        if dataloader:
            self._dataloader = dataloader
            self._start = dataloader.start
            self._end = dataloader.end
            self._segs = dataloader.segs
        else:
            self._dataloader = None
            self._start = start
            self._end = end
            self._segs = (
                segs if segs is not None else segmentlist([segment(start, end)])
            )

        # set up dataset properties
        self._datatable = DataTable()
        self._datatable["time"] = (
            times if times is not None else np.array([], dtype="float")
        )
        num_rows = len(self._datatable["time"])
        self._datatable["label"] = (
            labels if labels is not None else np.full(num_rows, np.nan)
        )
        self._datatable["rank"] = (
            ranks if ranks is not None else np.full(num_rows, np.nan)
        )
        self._datatable["hash"] = (
            hashes if hashes is not None else np.full(num_rows, "", dtype="S30")
        )

        self._datachunk = None
        self._selector = None

    @property
    def start(self):
        return self._start

    @start.setter
    def start(self, start):
        self._start = start
        self.segs = self.segs & segmentlist([segment(self.start, self.end)])

    @property
    def end(self):
        return self._end

    @end.setter
    def end(self, end):
        self._end = end
        self.segs = self.segs & segmentlist([segment(self.start, self.end)])

    @property
    def segs(self):
        return self._segs

    @segs.setter
    def segs(self, segs):
        """updates segments and checks coverage"""
        if segs:
            assert (
                segmentlist([segment(self.start, self.end)]) & segs
            ) == segs, "segments must be a subset of start, end times"
        self._segs = segs

    @property
    def labels(self):
        return self._datatable["label"]

    @property
    def ranks(self):
        return self._datatable["rank"]

    @property
    def hashes(self):
        return self._datatable["hash"]

    @property
    def times(self):
        return self._datatable["time"]

    @property
    def features(self):
        """
        return the set of features corresponding to this dataset, loading data as needed
        """
        if not self.is_loaded():
            self.load_data()
        if self.is_tabular():
            columns = list(
                set(self._datatable.colnames) - set(["time", "label", "rank", "hash"])
            )
            return self._datatable[columns]
        else:
            return self._datachunk.features

    @property
    def selector(self):
        return self._selector

    def __len__(self):
        return len(self.times)

    def __getitem__(self, item):
        return self._datatable.__getitem__(item)

    def is_configured(self):
        return bool(self._selector)

    def is_loaded(self):
        return self._is_loaded

    def is_evaluated(self):
        return not np.isnan(self._datatable["rank"]).any()

    def is_tabular(self):
        if not self.is_configured():
            return None
        else:
            return bool(self.selector.downselector)

    def configure(self, selector):
        """
        configure this dataset with a Selector to define how features
        are loaded and/or transformed
        """
        if self._selector:
            # ensure loading / processing based on selector is done again
            # FIXME: can do better by checking if selector would perform an
            # identical operation, and if so don't reprocess data
            self._is_loaded = False
            self._datachunk = None
            if self.is_tabular():
                self._datatable.keep_columns(["time", "label", "rank", "hash"])
        self._selector = selector

    def validate(self):
        """
        verify all the FeatureVectors within this dataset are within segments
        """
        for time in self.times:
            assert utils.time_in_segments(
                time, self.segs
            ), "time(%.6f) not in segments %s!" % (time, self.segs)

    def filter(self, segs=None):
        """
        updates segments and filters out vectors that are not included,
        returning a dataset of the removed vectors
        """
        if segs is None:
            segs = self._segs

        # generate removed dataset
        removed = Dataset.from_datatable(
            self._datatable.filter(("time", not_in_segmentlist, segs)),
            start=self.start,
            end=self.end,
            segs=(segmentlist([segment(self.start, self.end)]) - self.segs),
            dataloader=self._dataloader,
        )

        # filter current dataset
        self._datatable = self._datatable.filter(("time", in_segmentlist, segs))

        return removed

    def vectors2classes(self):
        """
        this only makes sense if labels are integers: 0, 1
        one should not use this if labels are floats or something
        """
        datasets = []
        classes = (1, 0)  # (glitch, clean)
        for class_ in classes:
            dataset = Dataset.from_datatable(
                self._datatable.filter(f"label == {class_}"),
                start=self.start,
                end=self.end,
                segs=self.segs,
                dataloader=self._dataloader,
            )
            datasets.append(dataset)

        return tuple(datasets)

    def vectors2num(self):
        """
        compute the number of glitch and clean samples in the feature set
        do this by counting their labels
        return num_gch, num_cln
        """
        num_glitch = np.nansum(self.labels)
        num_unlabeled = np.count_nonzero(np.isnan(self.labels))

        # axiomatic because of how we define labels in 2-class classification
        num_clean = len(self.labels) - num_glitch - num_unlabeled

        return num_glitch, num_clean

    def evaluate(self, ranks, hashes=None):
        """
        evaluate feature vectors in this dataset.
        Assumes ordering of ranks is the same as the ordering of vectors
        """
        assert len(ranks) == len(
            self._datatable
        ), "ranks must have the same length as dataset"
        self._datatable["rank"] = ranks

        if hashes is not None:
            if not isinstance(hashes, str):  # otherwise a single hash applies to all rows
                assert len(hashes) == len(
                    self
                ), "hashes must be a single string or a list for each feature vector"
            self._datatable["hash"] = hashes

        return self

    def copy(self):
        """
        return a copy of this dataset
        """
        new = Dataset(
            self.times,
            start=self.start,
            end=self.end,
            segs=segmentlist(self.segs),
            dataloader=self._dataloader,
        )
        new._datatable = self._datatable.copy()
        if self._datachunk:
            new._datachunk = self._datachunk.copy()
        return new

    def append(self, other):
        """
        a convenience method for adding vectors from another dataset into this one
        """
        assert not (
            self.is_loaded() ^ other.is_loaded()
        ), "can't mix a dataset that is loaded with one that is not loaded"
        assert not (
            bool(self._dataloader) ^ bool(other._dataloader)
        ), "can't mix a dataset with a dataloader with one that doesn't have one"
        assert not (
            bool(self._datachunk) ^ bool(other._datachunk)
        ), "can't mix a dataset with a datachunk with one that doesn't have one"

        if self._dataloader and other._dataloader:
            self._dataloader = self._dataloader + other._dataloader
        if self._datachunk and other._datachunk:
            self._datachunk += other._datachunk

        self._datatable = vstack([self._datatable, other._datatable], join_type="exact")
        self._start = min(self.start, other.start)
        self._end = max(self.end, other.end)
        self._segs = self.segs | other.segs

    def __iadd__(self, other):
        self.append(other)
        return self

    def remove(self, other):
        """
        removes all feature vectors in a target dataset from the current dataset
        does not assume any particular sorting
        NOTE: modifies dataset in place
        """
        self._datatable = setdiff(self._datatable, other._datatable)

    def __isub__(self, other):
        self.remove(other)
        return self

    def flush(self, max_stride=DEFAULT_MAX_STRIDE, max_samples=DEFAULT_MAX_SAMPLES):
        """
        remove feature vectors from this dataset to a target span and number of samples
        """
""" # update span of dataset and filter self._start = max(self.start, self.end - max_stride) removed = self.filter() # remove more samples as necessary. this assumes implicit time ordering # (we throw away old stuff first), but that's not strictly enforced if len(self._datatable) > max_samples: removed._datatable = vstack( [ self._datatable[: len(self._datatable) - max_samples], removed._datatable, ], join_type="exact", ) self._datatable = self._datatable[-max_samples:] return removed def load_data(self, verbose=False): """ load data into this dataset from its corresponding data loader """ if self.is_loaded(): return assert ( self.is_configured() ), "dataset needs to be configured via .configure() before data can be loaded" # load features and initialize dataset self._datachunk = self._extract_data(verbose=verbose) if self.is_tabular(): assert len( self.times ), "must have at least one FeatureVector to vectorize a dataset" data, names = self._downselect_data() data, names = self._transform_data(data, names) self._datatable = hstack( [DataTable(data, names=names), self._datatable], join_type="exact" ) self._is_loaded = True @classmethod def from_datatable( cls, datatable, start=None, end=None, segs=None, dataloader=None ): """ construct a dataset from a datatable """ dataset = cls( datatable["time"], labels=datatable["label"], ranks=datatable["rank"], hashes=datatable["hash"], start=start, end=end, segs=segs, dataloader=dataloader, ) dataset._datatable = datatable return dataset @classmethod def from_datachunks( cls, datachunks, times=None, labels=None, ranks=None, hashes=None ): """ construct a dataset from datachunks """ # merge chunks chunk = combine_chunks(datachunks) dataset = cls( times=times, labels=labels, ranks=ranks, hashes=hashes, start=chunk.start, end=chunk.end, segs=chunk.segs, ) dataset._datachunk = chunk return dataset def _extract_data(self, verbose=False): """ extract the relevant data from a data loader NOTE: does not return a structured array. Instead returns data, names """ # if datachunk is already defined, return that instead # after filtering accordingly if self._datachunk: if self._datachunk.skip_filter: return self._datachunk else: return self._datachunk.filter( channels=self.selector.channels, time=self.selector.time, bounds=self.selector.bounds, segs=self.segs, ) # define max extent for data to minimize I/O by estimating the # density of features requested. 
if dataset is dense enough, the I/O # savings will be minimal by breaking up the query into discrete segments if self.is_tabular(): select = self.selector.downselector if len(self.times) < (utils.livetime(self.segs) * 2 * select.window): gps_segs = segmentlist( [ segment(gps - select.window, gps + select.window) for gps in self.times ] ).coalesce() else: gps_segs = segmentlist( [ segment( min(self.times) - select.window, max(self.times) + select.window, ) ] ) relevant_segs = gps_segs & self.segs else: relevant_segs = None # query data return self._dataloader.query( channels=self.selector.channels, time=self.selector.time, bounds=self.selector.bounds, segs=relevant_segs, verbose=verbose, ) def _downselect_data(self): """ downselect data given a downselecting scheme """ select = self.selector.downselector Ncol = len(self._datachunk.columns) # generate place holders data = np.empty( (len(self.times), len(self.selector.channels) * Ncol), dtype="float" ) # iterate over channels and select the best features # filling in default values as needed # NOTE: assumes ordering is consistent, which should be true for i, channel in enumerate(self.selector.channels): if channel in self._datachunk.features: data[:, i * Ncol : (i + 1) * Ncol] = structured_to_unstructured( select(self._datachunk.features[channel], self.times) ) else: data[:, i * Ncol : (i + 1) * Ncol] = select.default return data, self._names(self.selector.channels, select) def _transform_data(self, data, names): """ transform the data extracted via self.extract_data into a dataset """ dtype = [] column_map = self.selector.transformer for i, (channel, column) in enumerate(names): data[:, i], column = column_map(column, data[:, i], t0=self.times) dtype.append(feature_name(channel, column)) return data, dtype def _names(self, channels, select): return [ (channel, col) for channel in channels for col in self._datachunk.columns ]
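

# Illustrative sketch (not part of the original module): build a Dataset directly from
# an in-memory DataChunk, configure it with a Selector, and read back the vectorized
# features. The channel name, columns, and trigger values are made-up placeholders.
def _example_dataset():
    columns = ("time", "snr")
    chunk = DataChunk(
        features={"H1:FAKE_CHANNEL": EventTable([[100.05], [9.0]], names=columns)},
        columns=columns,
        start=0,
        end=200,
    )
    dataset = Dataset.from_datachunks(
        chunk, times=np.array([100.0, 150.0]), labels=np.array([1.0, 0.0])
    )
    dataset.configure(
        Selector(
            channels=("H1:FAKE_CHANNEL",),
            downselector=DownselectLoudest(window=1.0, time="time", significance="snr"),
            transformer=DeltaTimeTransformer(time="time"),
        )
    )
    # one row per requested time, one column per channel/column pair
    # (e.g. "H1:FAKE_CHANNEL-deltatime" and "H1:FAKE_CHANNEL-snr")
    return dataset.features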