from collections import defaultdict
from dataclasses import dataclass, field
import operator
from typing import Dict, Optional, Tuple, Union
import numpy as np
from numpy.lib.recfunctions import structured_to_unstructured
from astropy.table import Row, hstack, setdiff, vstack
from gwpy.table import EventTable
from gwpy.table.filters import in_segmentlist, not_in_segmentlist
from ligo.segments import segment, segmentlist
from . import utils
DEFAULT_TIME_NAME = "time"
DEFAULT_DELTA_TIME = np.inf
DEFAULT_COLUMN_VALUE = -np.inf
DEFAULT_MAX_STRIDE = np.inf
DEFAULT_MAX_SAMPLES = np.inf
FEATURE_NAME_TEMPLATE = "%s-%s"
def feature_name(channel, col):
return FEATURE_NAME_TEMPLATE % (channel, col)
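# e.g. feature_name("H1:AUX-CHANNEL", "snr") -> "H1:AUX-CHANNEL-snr"
# (the channel and column names here are illustrative)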
class Downselect(object):
"""
    an object that downselects a list of triggers to a preferred one,
otherwise returning a default value
"""
_required_kwargs = []
def __init__(self, default=DEFAULT_COLUMN_VALUE, **kwargs):
self._default = default
for kwarg in self._required_kwargs:
assert kwarg in kwargs, "required kwarg=%s missing" % kwarg
self._kwargs = kwargs
@property
def default(self):
return self._default
def __call__(self, trgs, gps):
raise NotImplementedError
class DownselectLoudest(Downselect):
"""
    take the loudest trigger within some time window
    if there is no trigger within the window, return the default value
"""
_required_kwargs = ["window", "time", "significance"]
@property
def window(self):
return self._kwargs["window"]
@property
def time(self):
return self._kwargs["time"]
@property
def significance(self):
return self._kwargs["significance"]
def __call__(self, trgs, gps):
# time order
sort_idx = np.argsort(trgs[self.time])
sort_trgs = np.array(trgs[sort_idx])
        # allocate output; defaults are filled in below for empty windows
        loudest = np.empty(len(gps), dtype=trgs.dtype)
start_idx = np.searchsorted(
sort_trgs[self.time], gps - self.window, side="left"
)
end_idx = np.searchsorted(sort_trgs[self.time], gps + self.window, side="right")
bins = [sort_trgs[s:e] for s, e in zip(start_idx, end_idx)]
for idx, bin_ in enumerate(bins):
if bin_.size == 0:
loudest[idx] = self.default
else:
max_idx = np.argmax(bin_[self.significance])
loudest[idx] = bin_[max_idx]
return loudest
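# Illustrative usage sketch (the trigger table, "time"/"snr" column names, and
# GPS times below are assumed, not part of this module):
#
#   trgs = EventTable(
#       rows=[(100.0, 5.0), (100.5, 8.0), (103.0, 6.0)], names=("time", "snr")
#   )
#   downselect = DownselectLoudest(window=1.0, time="time", significance="snr")
#   loudest = downselect(trgs, np.array([100.2, 200.0]))
#   # loudest[0] is the snr=8.0 trigger; loudest[1] is filled with the default value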
class DownselectPointy(Downselect):
"""
    take the trigger with the smallest pointy-Poisson p-value
"""
_required_kwargs = ["window", "time", "significance"]
def __call__(self, trgs, gps):
raise NotImplementedError
class DeltaTimeTransform:
def __init__(
self,
default=DEFAULT_COLUMN_VALUE,
default_delta_time=DEFAULT_DELTA_TIME,
time=DEFAULT_TIME_NAME,
):
self.default = default
self.default_time = default_delta_time
self.column_name = "delta{time}"
    def __call__(self, column, t, t0=None, **kwargs):
        # compute time differences elementwise, falling back to the default
        # delta time wherever the column still holds the default (missing) value
        return (
            np.where(t != self.default, t - t0, self.default_time),
            self.column_name.format(time=column),
        )
class IdentityTransform:
    def __call__(self, column, val, **kwargs):
        return val, column
# assumed type alias for the column transforms defined above; referenced by
# Selector.transformer below (no ColumnTransformer is imported in this module)
ColumnTransformer = Union[DeltaTimeTransform, IdentityTransform]
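# Illustrative sketch (the values below are assumed): DeltaTimeTransform maps a
# column of trigger times into time differences relative to reference times t0,
# while IdentityTransform passes values through unchanged.
#
#   xform = DeltaTimeTransform()
#   values, name = xform("time", np.array([10.0, 20.0]), t0=np.array([9.5, 21.0]))
#   # values -> array([ 0.5, -1.0]), name -> "deltatime"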
class Whitener(object):
"""
an object that manages whitening tables from quiver.vectorize
    remembers the transformations and serves as a utility for classifiers
to map back and forth
"""
def __init__(self):
self.means = defaultdict(float)
self.stdvs = defaultdict(float)
    def train(self, data, names):
"""
assumes data.shape = (Nsamples, Nfeatures)
"""
_, Nnames = data.shape
assert Nnames == len(names), "incompatible number of features/columns"
for i, name in enumerate(names):
self.means[name] = np.mean(data[:, i])
s = np.std(data[:, i])
self.stdvs[name] = s if s != 0 else 1.0
    def whiten(self, data, names):
"""
        map colored data into whitened data with the transformation stored in this object
assumes data.shape = (Nsamples, Nfeatures)
modifies data in place, but also returns a reference
"""
Nsamp, Nnames = data.shape
assert Nnames == len(names), "incompatible number of features/columns"
means, stdvs = self._2array(names)
ones = np.ones(Nsamp)
data -= np.outer(ones, means)
data /= np.outer(ones, stdvs)
return data
    def color(self, data, names):
"""
the inverse of whiten
assumes data.shape = (Nsamples, Nfeatures)
modifies data in place, but also returns a reference
"""
Nsamp, Nnames = data.shape
assert Nnames == len(names), "incompatible number of features/columns"
means, stdvs = self._2array(names)
ones = np.ones(Nsamp)
data *= np.outer(ones, stdvs)
data += np.outer(ones, means)
return data
def _2array(self, names):
Nnames = len(names)
means = np.empty(Nnames, dtype="float")
stdvs = np.empty(Nnames, dtype="float")
for i, name in enumerate(names):
if name not in self.means:
raise KeyError(
"whitener has not been trained on data for name=%s" % name
)
means[i] = self.means[name]
stdvs[i] = self.stdvs[name]
return means, stdvs
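# Illustrative sketch of the whiten/color round trip (the data and feature
# names below are assumed):
#
#   names = ["H1:AUX-CHANNEL-snr", "H1:AUX-CHANNEL-frequency"]
#   data = np.random.randn(100, 2) * 5.0 + 3.0
#   whitener = Whitener()
#   whitener.train(data, names)
#   white = whitener.whiten(data.copy(), names)   # ~zero mean, unit variance per column
#   colored = whitener.color(white, names)        # recovers the original data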
@dataclass
class Selector:
channels: Tuple
time: str = DEFAULT_TIME_NAME
downselector: Union[Downselect, None] = None
transformer: Union[ColumnTransformer, None] = None
bounds: dict = field(default_factory=dict)
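# Illustrative sketch of a Selector configuration (channel and column names are
# assumed):
#
#   selector = Selector(
#       channels=("H1:AUX-CHANNEL_1", "H1:AUX-CHANNEL_2"),
#       time="time",
#       downselector=DownselectLoudest(window=0.5, time="time", significance="snr"),
#       transformer=IdentityTransform(),
#       bounds={"snr": (5.0, np.inf)},
#   )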
@dataclass
class DataChunk:
features: Dict
columns: Tuple
start: int
end: int
segs: Optional[segmentlist] = None
skip_filter: bool = True
def __post_init__(self):
if self.segs is None:
self.segs = segmentlist([segment(self.start, self.end)])
@property
def channels(self):
return self.features.keys()
@property
def duration(self):
return self.end - self.start
@property
def livetime(self):
return utils.livetime(self.segs)
def copy(self):
return self.__class__(
start=self.start,
end=self.end,
segs=self.segs,
columns=self.columns,
features={
channel: table.copy() for channel, table in self.features.items()
},
)
def __add__(self, other):
assert set(self.columns) == set(
other.columns
), "DataChunks must have the same columns to be combined"
# update span, segments
start = min(self.start, other.start)
end = max(self.end, other.end)
segs = self.segs | other.segs
# combine features
features = {}
self_channels = set(self.channels)
other_channels = set(other.channels)
# combine features for channels present in both chunks
common_channels = self_channels & other_channels
for channel in common_channels:
features[channel] = vstack(
[self.features[channel], other.features[channel]],
join_type="exact",
)
# add features for channels present in only one chunk
for channel in self_channels - other_channels:
features[channel] = self.features[channel]
for channel in other_channels - self_channels:
features[channel] = other.features[channel]
return DataChunk(
start=start, end=end, segs=segs, columns=self.columns, features=features
)
    def filter(
self,
channels=None,
columns=None,
segs=None,
time=DEFAULT_TIME_NAME,
bounds=None,
):
"""
update segments and filters out data that don't span segments
NOTE: this requires knowledge of the "time" key within data
"""
# check that all columns in bounds will be returned
if bounds is None:
bounds = dict()
else:
            for col in bounds.keys():
                assert col in self.columns, (
                    "bounds can only be placed on included columns; "
                    f"column={col} is not included"
                )
# format and validate
if channels:
if isinstance(channels, str):
channels = [channels]
if segs is None:
segs = self.segs
        assert (self.segs & segs) == segs, (
            "new segs must be a subset of existing segs\nsegs=%s\nnew segs=%s"
            % (self.segs, segs)
        )
if columns:
if isinstance(columns, str):
columns = [columns]
for col in columns:
assert col in self.columns, "column=%s not in DataChunk" % col
# filter by columns
if columns:
for channel in self.features.keys():
self.features[channel] = self.features[channel][columns]
# filter by segments
        self.segs = segmentlist(segs)  # make a copy
for channel in self.features.keys():
self.features[channel] = self.features[channel].filter(
(time, in_segmentlist, self.segs)
)
# filter by bounds
for channel, data in self.features.items():
for col, (min_, max_) in bounds.items():
data = data.filter(
(col, operator.ge, min_),
(col, operator.le, max_),
)
self.features[channel] = data
return self
    def flush(self, max_stride=DEFAULT_MAX_STRIDE, time=DEFAULT_TIME_NAME):
"""
remove data to a target span and number of samples
"""
# update span and filter
self.start = max(self.start, self.end - max_stride)
self.segs &= segmentlist([segment(self.start, self.end)])
return self.filter(time=time)
    def target_times(self, time, target_channel, target_bounds, segs=None):
"""
A convenience function to extract target times.
"""
# filter by bounds
trgs = self.features[target_channel].copy()
for col, (min_, max_) in target_bounds.items():
trgs = trgs.filter(f"{col} >= {min_}", f"{col} <= {max_}")
target_times = trgs[time]
# filter times by segments
if segs is not None:
target_times = target_times[utils.times_in_segments(target_times, segs)]
return target_times
    def random_times(
self, time, target_channel, dirty_bounds, dirty_window, random_rate, segs=None
):
"""
A convenience function to extract random times.
"""
# filter by bounds
trgs = self.features[target_channel].copy()
for col, (min_, max_) in dirty_bounds.items():
trgs = trgs.filter(f"{col} >= {min_}", f"{col} <= {max_}")
# generate dirty segments
dirty_times = trgs[time]
dirty_seg = utils.times2segments(dirty_times, dirty_window)
# only draw from segments that are outside of dirty segs but within segs
random_times = utils.draw_random_times(self.segs - dirty_seg, rate=random_rate)
# filter times by segments
if segs is not None:
random_times = random_times[utils.times_in_segments(random_times, segs)]
# find segments where data is clean
clean_seg = self.segs - dirty_seg
if segs is not None:
clean_seg &= segs
return random_times, clean_seg
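# Illustrative sketch of DataChunk usage (channel, column names, and spans are
# assumed):
#
#   chunk = DataChunk(
#       features={
#           "H1:AUX-CHANNEL_1": EventTable(rows=[(100.0, 7.5)], names=("time", "snr"))
#       },
#       columns=("time", "snr"),
#       start=0,
#       end=1000,
#   )
#   chunk.filter(bounds={"snr": (5.0, np.inf)})  # keep only triggers with snr >= 5
#   chunk.flush(max_stride=512)                  # keep only the most recent 512 seconds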
def combine_chunks(datachunks):
"""
combine chunks into a single DataChunk
"""
if isinstance(datachunks, DataChunk) or not datachunks:
return datachunks
elif len(datachunks) == 1:
return datachunks[0]
else:
chunk = datachunks[0].copy()
for other in datachunks[1:]:
chunk += other
return chunk
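# Illustrative sketch (chunk_1 and chunk_2 are assumed DataChunks with the same
# columns): the combined chunk spans the union of their segments.
#
#   combined = combine_chunks([chunk_1, chunk_2])
#   assert combined.segs == (chunk_1.segs | chunk_2.segs)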
class FeatureVector(Row):
"""A feature vector representing a single row in a Dataset."""
@property
def time(self):
return self["time"]
@property
def label(self):
return self["label"]
@property
def rank(self):
return self["rank"]
@property
def hash(self):
return self["hash"]
def is_evaluated(self):
return not np.isnan(self.rank)
@property
def features(self):
return self[list(set(self.colnames) - set(["time", "label", "rank", "hash"]))]
class DataTable(EventTable):
Row = FeatureVector
def as_unstructured(self):
return structured_to_unstructured(self.as_array())
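# Illustrative sketch (the column values are assumed): DataTable rows behave as
# FeatureVectors and the table can be viewed as a plain 2D float array.
#
#   table = DataTable([[100.0, 200.0], [1.0, 0.0]], names=("time", "label"))
#   table[0].time            # -> 100.0
#   table.as_unstructured()  # -> ndarray of shape (2, 2)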
class Dataset:
""" """
def __init__(
self,
times=None,
labels=None,
ranks=None,
hashes=None,
start=None,
end=None,
segs=None,
dataloader=None,
):
self._is_loaded = False
# define the provenance for this dataset
if dataloader:
self._dataloader = dataloader
self._start = dataloader.start
self._end = dataloader.end
self._segs = dataloader.segs
else:
self._dataloader = None
self._start = start
self._end = end
self._segs = (
segs if segs is not None else segmentlist([segment(start, end)])
)
# set up dataset properties
self._datatable = DataTable()
self._datatable["time"] = (
times if times is not None else np.array([], dtype="float")
)
num_rows = len(self._datatable["time"])
self._datatable["label"] = (
labels if labels is not None else np.full(num_rows, np.nan)
)
self._datatable["rank"] = (
ranks if ranks is not None else np.full(num_rows, np.nan)
)
self._datatable["hash"] = (
hashes if hashes is not None else np.full(num_rows, "", dtype="S30")
)
self._datachunk = None
self._selector = None
@property
def start(self):
return self._start
@start.setter
def start(self, start):
self._start = start
self.segs = self.segs & segmentlist([segment(self.start, self.end)])
@property
def end(self):
return self._end
@end.setter
def end(self, end):
self._end = end
self.segs = self.segs & segmentlist([segment(self.start, self.end)])
@property
def segs(self):
return self._segs
@segs.setter
def segs(self, segs):
"""updates segments and checks coverage"""
if segs:
assert (
segmentlist([segment(self.start, self.end)]) & segs
) == segs, "segments must be a subset of start, end times"
self._segs = segs
@property
def labels(self):
return self._datatable["label"]
@property
def ranks(self):
return self._datatable["rank"]
@property
def hashes(self):
return self._datatable["hash"]
@property
def times(self):
return self._datatable["time"]
@property
def features(self):
"""
return the set of features corresponding to this dataset, loading data as needed
"""
if not self.is_loaded():
self.load_data()
if self.is_tabular():
columns = list(
set(self._datatable.colnames) - set(["time", "label", "rank", "hash"])
)
return self._datatable[columns]
else:
return self._datachunk.features
@property
def selector(self):
return self._selector
def __len__(self):
return len(self.times)
def __getitem__(self, item):
return self._datatable.__getitem__(item)
def is_configured(self):
return bool(self._selector)
def is_loaded(self):
return self._is_loaded
def is_evaluated(self):
return not np.isnan(self._datatable["rank"]).any()
def is_tabular(self):
if not self.is_configured():
return None
else:
return bool(self.selector.downselector)
def configure(self, selector):
"""
configure this dataset with a Selector to define how
features are loaded and/or transformed
"""
if self._selector:
# ensure loading / processing based on selector is done again
# FIXME: can do better by checking if selector would perform an
# identical operation, and if so don't reprocess data
self._is_loaded = False
self._datachunk = None
if self.is_tabular():
self._datatable.keep_columns(["time", "label", "rank", "hash"])
self._selector = selector
def validate(self):
"""
verify all the FeatureVectors within this dataset are within segments
"""
for time in self.times:
assert utils.time_in_segments(
time, self.segs
), "time(%.6f) not in segments %s!" % (time, self.segs)
def filter(self, segs=None):
"""
updates segments and filters out vectors that are not included,
returning a dataset of the removed vectors
"""
if segs is None:
segs = self._segs
# generate removed dataset
removed = Dataset.from_datatable(
self._datatable.filter(("time", not_in_segmentlist, segs)),
start=self.start,
end=self.end,
segs=(segmentlist([segment(self.start, self.end)]) - self.segs),
dataloader=self._dataloader,
)
# filter current dataset
self._datatable = self._datatable.filter(("time", in_segmentlist, segs))
return removed
def vectors2classes(self):
"""
this only makes sense if labels are integers: 0, 1
one should not use this if labels are floats or something
"""
datasets = []
classes = (1, 0) # (glitch, clean)
for class_ in classes:
dataset = Dataset.from_datatable(
self._datatable.filter(f"label == {class_}"),
start=self.start,
end=self.end,
segs=self.segs,
dataloader=self._dataloader,
)
datasets.append(dataset)
return tuple(datasets)
def vectors2num(self):
"""
compute the number of glitch and clean samples in the feature set
do this by counting their labels
return num_gch, num_cln
"""
num_glitch = np.nansum(self.labels)
num_unlabeled = np.count_nonzero(np.isnan(self.labels))
# axiomatic because of how we define labels in 2-class classification
num_clean = len(self.labels) - num_glitch - num_unlabeled
return num_glitch, num_clean
def evaluate(self, ranks, hashes=None):
"""
evaluate feature vectors in this dataset.
Assumes ordering of ranks is the same as the ordering of vectors
"""
assert len(ranks) == len(
self._datatable
), "ranks must have the same length as dataset"
self._datatable["rank"] = ranks
if hashes is not None:
            if not isinstance(hashes, str):  # unless a single hash string was given
assert len(hashes) == len(
self
), "hashes must be a single string or a list for each feature vector"
self._datatable["hash"] = hashes
return self
def copy(self):
"""
return a copy of this dataset
"""
new = Dataset(
self.times,
start=self.start,
end=self.end,
segs=segmentlist(self.segs),
dataloader=self._dataloader,
)
new._datatable = self._datatable.copy()
if self._datachunk:
new._datachunk = self._datachunk.copy()
return new
def append(self, other):
"""
a convenience method for adding vectors from another dataset into this one
"""
assert not (
self.is_loaded() ^ other.is_loaded()
), "can't mix a dataset that is loaded with one that is not loaded"
assert not (
bool(self._dataloader) ^ bool(other._dataloader)
), "can't mix a dataset with a dataloader with one that doesn't have one"
assert not (
bool(self._datachunk) ^ bool(other._datachunk)
), "can't mix a dataset with a datachunk with one that doesn't have one"
if self._dataloader and other._dataloader:
self._dataloader = self._dataloader + other._dataloader
if self._datachunk and other._datachunk:
self._datachunk += other._datachunk
self._datatable = vstack([self._datatable, other._datatable], join_type="exact")
self._start = min(self.start, other.start)
self._end = max(self.end, other.end)
self._segs = self.segs | other.segs
def __iadd__(self, other):
self.append(other)
return self
def remove(self, other):
"""
removes all feature vectors in a target dataset from the current dataset
does not assume any particular sorting
NOTE: modifies dataset in place
"""
self._datatable = setdiff(self._datatable, other._datatable)
def __isub__(self, other):
self.remove(other)
return self
def flush(self, max_stride=DEFAULT_MAX_STRIDE, max_samples=DEFAULT_MAX_SAMPLES):
"""
remove feature vectors from this dataset to a target span and number of samples
"""
        # update span and segments of dataset, then filter
        self._start = max(self.start, self.end - max_stride)
        self._segs &= segmentlist([segment(self.start, self.end)])
        removed = self.filter()
# remove more samples as necessary. this assumes implicit time ordering
# (we throw away old stuff first), but that's not strictly enforced
if len(self._datatable) > max_samples:
removed._datatable = vstack(
[
self._datatable[: len(self._datatable) - max_samples],
removed._datatable,
],
join_type="exact",
)
self._datatable = self._datatable[-max_samples:]
return removed
def load_data(self, verbose=False):
"""
load data into this dataset from its corresponding data loader
"""
if self.is_loaded():
return
assert (
self.is_configured()
), "dataset needs to be configured via .configure() before data can be loaded"
# load features and initialize dataset
self._datachunk = self._extract_data(verbose=verbose)
if self.is_tabular():
assert len(
self.times
), "must have at least one FeatureVector to vectorize a dataset"
data, names = self._downselect_data()
data, names = self._transform_data(data, names)
self._datatable = hstack(
[DataTable(data, names=names), self._datatable], join_type="exact"
)
self._is_loaded = True
@classmethod
def from_datatable(
cls, datatable, start=None, end=None, segs=None, dataloader=None
):
"""
construct a dataset from a datatable
"""
dataset = cls(
datatable["time"],
labels=datatable["label"],
ranks=datatable["rank"],
hashes=datatable["hash"],
start=start,
end=end,
segs=segs,
dataloader=dataloader,
)
dataset._datatable = datatable
return dataset
@classmethod
def from_datachunks(
cls, datachunks, times=None, labels=None, ranks=None, hashes=None
):
"""
construct a dataset from datachunks
"""
# merge chunks
chunk = combine_chunks(datachunks)
dataset = cls(
times=times,
labels=labels,
ranks=ranks,
hashes=hashes,
start=chunk.start,
end=chunk.end,
segs=chunk.segs,
)
dataset._datachunk = chunk
return dataset
def _extract_data(self, verbose=False):
"""
extract the relevant data from a data loader
NOTE: does not return a structured array. Instead returns
data, names
"""
# if datachunk is already defined, return that instead
# after filtering accordingly
if self._datachunk:
if self._datachunk.skip_filter:
return self._datachunk
else:
return self._datachunk.filter(
channels=self.selector.channels,
time=self.selector.time,
bounds=self.selector.bounds,
segs=self.segs,
)
# define max extent for data to minimize I/O by estimating the
# density of features requested. if dataset is dense enough, the I/O
# savings will be minimal by breaking up the query into discrete segments
if self.is_tabular():
select = self.selector.downselector
            if len(self.times) * 2 * select.window < utils.livetime(self.segs):
gps_segs = segmentlist(
[
segment(gps - select.window, gps + select.window)
for gps in self.times
]
).coalesce()
else:
gps_segs = segmentlist(
[
segment(
min(self.times) - select.window,
max(self.times) + select.window,
)
]
)
relevant_segs = gps_segs & self.segs
else:
relevant_segs = None
# query data
return self._dataloader.query(
channels=self.selector.channels,
time=self.selector.time,
bounds=self.selector.bounds,
segs=relevant_segs,
verbose=verbose,
)
def _downselect_data(self):
"""
downselect data given a downselecting scheme
"""
select = self.selector.downselector
Ncol = len(self._datachunk.columns)
# generate place holders
data = np.empty(
(len(self.times), len(self.selector.channels) * Ncol), dtype="float"
)
# iterate over channels and select the best features
# filling in default values as needed
# NOTE: assumes ordering is consistent, which should be true
for i, channel in enumerate(self.selector.channels):
if channel in self._datachunk.features:
data[:, i * Ncol : (i + 1) * Ncol] = structured_to_unstructured(
select(self._datachunk.features[channel], self.times)
)
else:
data[:, i * Ncol : (i + 1) * Ncol] = select.default
return data, self._names(self.selector.channels, select)
def _transform_data(self, data, names):
"""
transform the data extracted via self.extract_data into a dataset
"""
dtype = []
column_map = self.selector.transformer
for i, (channel, column) in enumerate(names):
data[:, i], column = column_map(column, data[:, i], t0=self.times)
dtype.append(feature_name(channel, column))
return data, dtype
def _names(self, channels, select):
return [
(channel, col) for channel in channels for col in self._datachunk.columns
]