snax.utils module

class snax.utils.ETGFeatureQueue(channels, columns, **kwargs)[source]

Bases: object

Class for storing feature data when the pipeline is running in ETG mode, i.e. reporting all triggers above an SNR threshold. NOTE: assumes that ingested features are time ordered.

append(timestamp, channel, row)[source]
flush()[source]
pop()[source]
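
A minimal usage sketch, assuming the same append/pop interface as TimeseriesFeatureQueue below; the trigger rows are illustrative and any required keyword arguments are omitted:
>>> columns = ['time', 'snr']
>>> channels = ['channel1']
>>> queue = ETGFeatureQueue(channels, columns)
>>> queue.append(123450, 'channel1', {'time': 123450.3, 'snr': 12.0})
>>> queue.append(123450, 'channel1', {'time': 123450.7, 'snr': 8.5})
>>> row = queue.pop()  # oldest buffered timestamp
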
class snax.utils.FeatureData(columns, keys=None, **kwargs)[source]

Bases: object

Base class for saving feature data. Extend for a specific file-based implementation.

append(key, value)[source]
clear()[source]
dump(path)[source]
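
A hedged sketch of extending the base class for a file-based implementation; the dump body here is illustrative only:
>>> class MyFeatureData(FeatureData):
...     def dump(self, path):
...         # serialize the appended (key, value) data to a file under path
...         ...
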
class snax.utils.HDF5ETGFeatureData(columns, keys, **kwargs)[source]

Bases: FeatureData

Saves feature data with varying dataset lengths (when run in ETG mode) to hdf5.

append(timestamp, features)[source]

Append a trigger row to the data structure

NOTE: the timestamp arg is here purely to match the API; it is not used in append

clear()[source]
dump(path, base, start_time, tmp=False)[source]

Saves the current cadence of gps triggers to disk and clears out the data
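
An illustrative sketch; the per-channel layout of the features argument is an assumption, any required keyword arguments are omitted, and the path and base name are hypothetical:
>>> columns = ['time', 'snr']
>>> keys = ['channel1']
>>> data = HDF5ETGFeatureData(columns, keys)
>>> data.append(123450, {'channel1': [{'time': 123450.3, 'snr': 9.1}]})
>>> data.dump('/path/to/features', 'H1-FEATURES', 123450)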

class snax.utils.HDF5TimeseriesFeatureData(columns, keys, **kwargs)[source]

Bases: FeatureData

Saves feature data to hdf5 as regularly sampled timeseries.

append(timestamp, features)[source]

Append a feature buffer to the data structure

clear()[source]
dump(path, base, start_time, tmp=False)[source]

Saves the current cadence of features to disk and clears out the data
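
An illustrative sketch; the assumption here is that the features buffer has the same per-channel layout returned by TimeseriesFeatureQueue.pop() (see below), any required keyword arguments are omitted, and the path and base name are hypothetical:
>>> columns = ['time', 'snr']
>>> channels = ['channel1']
>>> data = HDF5TimeseriesFeatureData(columns, channels)
>>> data.append(123450, {'channel1': [{'time': 123450.3, 'snr': 3.0}]})
>>> data.dump('/path/to/features', 'H1-FEATURES', 123450)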

class snax.utils.TimeseriesFeatureQueue(channels, columns, **kwargs)[source]

Bases: object

Class for storing regularly sampled feature data. NOTE: assumes that ingested features are time ordered.

Example:
>>> # create the queue
>>> columns = ['time', 'snr']
>>> channels = ['channel1']
>>> queue = TimeseriesFeatureQueue(channels, columns, sample_rate=1, buffer_size=1)
>>> # add features
>>> queue.append(123450, 'channel1', {'time': 123450.3, 'snr': 3.0})
>>> queue.append(123451, 'channel1', {'time': 123451.7, 'snr': 6.5})
>>> queue.append(123452, 'channel1', {'time': 123452.4, 'snr': 5.2})
>>> # get oldest feature
>>> row = queue.pop()
>>> row['timestamp']
123450
>>> row['features']['channel1']
[{'time': 123450.3, 'snr': 3.0}]
append(timestamp, channel, row)[source]
flush()[source]
pop()[source]
snax.utils.create_new_dataset(path, base, data, name='data', group=None, tmp=False, metadata=None)[source]

Creates a new dataset containing data and stores it in an hdf5 file at path with base name base. If tmp is set, the file is written as a temporary file. If metadata is given as key-value pairs, it is saved alongside the data.

Returns the filename where the dataset was created.
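
An illustrative call, assuming data is a structured numpy array; the path, base name, and metadata values are hypothetical:
>>> import numpy
>>> data = numpy.zeros(10, dtype=[('time', 'f8'), ('snr', 'f4')])
>>> filename = create_new_dataset('/path/to/features', 'H1-FEATURES', data,
...                               metadata={'sample_rate': 1})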

snax.utils.feature_dtype(columns)[source]

Given a set of columns, returns the numpy dtypes associated with those columns. All time-based columns are double-precision; others are stored in single-precision.
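
For example (the precision rule is as described above; the exact return representation is not shown here):
>>> dtypes = feature_dtype(['time', 'snr'])  # 'time' -> double precision, 'snr' -> single precision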

snax.utils.floor_div(x, n)[source]

Floor an integer x down to the nearest multiple of n by removing its remainder.

>>> floor_div(163, 10)
160
>>> floor_div(158, 10)
150
snax.utils.get_dataset(path, base, name='data', group=None)[source]

Opens a dataset at path with base name base and returns the data.
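
An illustrative read-back of a dataset written with create_new_dataset above; the path and base name are hypothetical:
>>> data = get_dataset('/path/to/features', 'H1-FEATURES')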

snax.utils.get_logger(logname, verbose=False)[source]

Standardizes how loggers are instantiated.
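
For example, assuming the returned object behaves like a standard logging.Logger:
>>> logger = get_logger('snax', verbose=True)
>>> logger.info('pipeline started')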

snax.utils.gps2latency(gps_time)[source]

Given a gps time, measures the latency to ms precision relative to now.

snax.utils.group_indices(indices)[source]

Given a list of indices, groups them into contiguous groups.
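
An illustrative call; the exact representation of the returned groups is an assumption and is not shown here:
>>> groups = group_indices([1, 2, 3, 7, 8, 10])  # contiguous runs: (1, 2, 3), (7, 8), (10,)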

snax.utils.in_new_epoch(new_gps_time, prev_gps_time, gps_epoch)[source]

Returns whether new and old gps times are in different epochs.

>>> in_new_epoch(1234561200, 1234560000, 1000)
True
>>> in_new_epoch(1234561200, 1234560000, 10000)
False
snax.utils.latency_name(stage_name, stage_num, channel, rate=None)[source]

Returns a properly formatted latency element name based on stage, channel, and rate information.

snax.utils.path2cache(rootdir, pathname)[source]

Given a rootdir and a glob-compatible pathname that may contain shell-style wildcards, finds all files that match and populates a Cache. NOTE: this will only work with files that comply with the T050017 file convention.
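
An illustrative call with a hypothetical root directory and pattern; the matched files must follow the T050017 naming convention:
>>> cache = path2cache('/path/to/features', '*/H1-FEATURES-*.h5')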

snax.utils.split_segments_by_stride(span, segs, stride)[source]

Given a span (segment), segments, and a stride, breaks up the segments along stride boundaries and returns a segment list.
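
An illustrative call, assuming ligo.segments-style segment and segmentlist objects (the expected types are an assumption):
>>> from ligo.segments import segment, segmentlist
>>> span = segment(0, 100)
>>> segs = segmentlist([segment(15, 75)])
>>> out = split_segments_by_stride(span, segs, 20)  # pieces split at 20 s stride boundaries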