import os
import numpy as np
import gpstime
from LDAStools import frameCPP as frcpp
from ... import hookimpl
from ... import utils
from ...series import ChannelInfo, Series, SeriesInfo
from . import DiskReporter
[docs]class GWFSeriesReporter(DiskReporter):
"""
store series informaiton in gwf files
NOTE:
GWFSeriesReporter currently only supports a single contiguous segment
and will raise an exception if it finds more than one.
This is most likely the most common use cause
(e.g.: streaming timeseries ignoring segdb), so it's probably fine
"""
_suffix = "gwf"
_MODEL_NAME = "IDQ_MODEL"
_CALIB_NAME = "IDQ_CALIB"
def _write(self, path, series, run=0, **kwargs):
if isinstance(series, list):
assert (
len(series) == 1
), "GWFSeriesReporter only supports a single contiguous segment of data"
series = series[0]
duration = series.dt * len(series)
# make the frame
frame = frcpp.FrameH()
# NOTE: SetName could be fragile
frame.SetName("-".join(os.path.basename(path).split("-")[:-2]))
frame.SetGTime(frcpp.GPSTime(*[int(_) for _ in utils.float2sec_ns(series.t0)]))
frame.SetDt(duration)
frame.SetRun(run)
# add provenance information
current_gps = int(gpstime.gpsnow())
frame.AppendFrHistory(
frcpp.FrHistory(self._MODEL_NAME, current_gps, series.model_id)
)
frame.AppendFrHistory(
frcpp.FrHistory(self._CALIB_NAME, current_gps, series.calibration_id)
)
# add data
for channel, data in series.items():
# make a vector
frvect = frcpp.FrVect(
channel,
frcpp.FrVect.FR_VECT_8R,
1, # number of dimensions
frcpp.Dimension( # the dimension object
len(series), # number of elements
series.dt, # spacing
"s", # units
0, # start time offset relative to the header (set above)
),
"", # dimensionless unit, cause we have probabilities and such
)
# actually populate the object
frvect.GetDataArray()[:] = data
# add that vector to the ProcData
frdata = frcpp.FrProcData(
channel,
"", # no comment
frcpp.FrProcData.TIME_SERIES, # ID as time-series
frcpp.FrProcData.UNKNOWN_SUB_TYPE, # empty sub-type (fseries)
0, # offset of first sample relative to frame start
duration, # duration of data
0.0, # heterodyne frequency
0.0, # phase of heterodyne
0.0, # frequency range
0.0, # resolution bandwidth
)
# add the vector to this data
frdata.AppendData(frvect)
# add ProcData to FrameH
frame.AppendFrProcData(frdata)
# write
frame.Write(frcpp.OFrameFStream(path))
@classmethod
def read(cls, path):
seriesdict = {}
stream = frcpp.IFrameFStream(path)
frame = stream.ReadNextFrame() # assume a single frame
info = None
deltaT = None
# extract data
toc = stream.GetTOC()
for channels, fr_data in [
(toc.GetProc(), stream.ReadFrProcData),
(toc.GetADC(), stream.ReadFrAdcData),
(toc.GetSim(), stream.ReadFrSimData),
]:
for channel in channels:
# magic number: one frame per stream...
data = fr_data(0, channel)
dim = data.data[0].GetDim(0)
# magic number: one FrVect wrapper per channel
data = data.data[0].GetDataArray()
if not info:
info = ChannelInfo.from_channel(channel)
deltaT = dim.dx
series = np.empty_like(data, dtype=float)
series[:] = data[:]
metric = info.channel_to_metric_name(channel)
seriesdict[metric] = Series(channel, series)
# assume there is at least one channel
# NOTE: these should be the same for all channels, so we only read them once
t0 = utils.sec_ns2float(*frame.GetGTime())
# extract histories
histories = dict()
for history in frame.RefHistory():
histories[history.GetName()] = history.GetComment()
for name in [cls._MODEL_NAME, cls._CALIB_NAME]:
if name not in histories:
raise KeyError(
"could not find FrHistory named=%s in %s!" % (name, path)
)
seriesinfo = SeriesInfo(
t0,
deltaT,
info=info,
model_id=histories[cls._MODEL_NAME],
calibration_id=histories[cls._CALIB_NAME],
**seriesdict,
)
# we only know how to handle a single contiguous stretch of data at the moment
return [seriesinfo]
@hookimpl
def get_reporters():
return {"series:gwf": GWFSeriesReporter}