import glob
import os
import shutil
import time
import pluggy
from ligo.segments import segment, segmentlist
from ... import names
from ... import utils
DEFAULT_SHMDIR_AGE = 600
class Reporter(object):
    """Abstract base class handling I/O of models and calibration maps.

    A Reporter knows about file systems and directory structures; the
    specifics of where things will be stored, etc. are fixed upon creation.

    NOTE: this is just a parent class defining the API.  Subclasses must
    implement their own functionality (``report``, ``retrieve``, ``read``
    and ``glob`` all raise ``NotImplementedError`` here).

    Upon instantiation, we require some knowledge of where things will be
    written (``rootdir``) as well as the time range they span
    (``start``, ``end``).
    """

    # subclasses list here the kwargs they cannot operate without;
    # __init__ checks each one is present
    _required_kwargs = []

    def __init__(self, rootdir, start, end, **kwargs):
        """Record the output location and time span; validate kwargs.

        NOTE: the ``assert`` (rather than ``raise``) is kept for backward
        compatibility with existing callers, but it is stripped when
        running under ``python -O``.
        """
        self._rootdir = rootdir
        self._start = start
        self._end = end
        for kwarg in self._required_kwargs:
            assert kwarg in kwargs, "kwarg=%s required" % kwarg
        self.kwargs = kwargs

    @property
    def rootdir(self):
        """Top-level directory under which output is organized."""
        return self._rootdir

    @rootdir.setter
    def rootdir(self, rootdir):
        self._rootdir = rootdir

    @property
    def start(self):
        """Beginning of the time range spanned by this reporter."""
        return self._start

    @start.setter
    def start(self, start):
        self._start = start

    @property
    def end(self):
        """End of the time range spanned by this reporter."""
        return self._end

    @end.setter
    def end(self, end):
        self._end = end

    def preferred(self, nickname):
        """Return the preferred path for ``nickname``.

        The base class has no notion of a preferred file, so this always
        returns None; subclasses may override.
        """
        return None

    def report(self, nickname, obj, preferred=False):
        """
        write a serialized version of the object into a path determined by kwargs
        """
        raise NotImplementedError

    def retrieve(self, nickname, preferred=False):
        """
        the inverse of report
        returns the object that was serialized
        if preferred, looks for the most recent object, which does not necessarily
        correspond to where this thing would write serialized output supported for
        convenient retrieval
        """
        raise NotImplementedError

    @classmethod
    def read(cls, path):
        """
        return the object from the path given
        """
        raise NotImplementedError

    def glob(self, nickname, start, end):
        """return a generator over all existing data associated
        with nickname within [start, end)
        NOTE: this should work without modifying the object!
        """
        raise NotImplementedError
class DiskReporter(Reporter):
    """A Reporter that reads/writes plain text to the local disk.

    This defines a directory structure that it expects (delegating naming
    conventions to ``idq.names``) and works within that paradigm.  It also
    maintains a per-nickname "preferred" cache file so that the most
    recently reported path can be looked up later, and can optionally
    mirror output into a shared-memory directory (``shmdir``).
    """

    # we'll always write plaintxt for this, so suffix doesn't really matter
    _suffix = "txt"
    # per-nickname cache file (relative to rootdir) recording preferred paths
    _cache_template = ".%s-preferred.cache"

    def __init__(self, *args, **kwargs):
        """
        delegates to Reporter.__init__ and then makes sure the correct directory exists
        """
        Reporter.__init__(self, *args, **kwargs)
        # define this upon instantiation so each instance has it set independently
        self._cache_mtime = dict()
        if "shmdir" in self.kwargs:
            shmdir = self.kwargs["shmdir"]
            os.makedirs(shmdir, exist_ok=True)
        # set temp directory: explicit kwarg wins, then $TMPDIR, then /tmp
        if "tmpdir" in self.kwargs:
            self._tmpdir = self.kwargs["tmpdir"]
        elif "TMPDIR" in os.environ:
            self._tmpdir = os.environ["TMPDIR"]
        else:
            self._tmpdir = "/tmp"
        # make temp directory if it doesn't exist
        os.makedirs(self._tmpdir, exist_ok=True)

    @property
    def tmpdir(self):
        """Scratch directory used for atomic write-then-move."""
        return self._tmpdir

    @property
    def suffix(self):
        """File suffix used for all paths written by this reporter."""
        return self._suffix

    def directory(self, start, end):
        """checks to make sure this directory exists whenever its called"""
        directory = names.start2dirMOD1e5(start, basename="START", rootdir=self.rootdir)
        os.makedirs(directory, exist_ok=True)
        return directory

    def path(self, nickname, start, end):
        """
        standardizes file naming conventions via delegations to idq.names
        """
        return os.path.join(
            self.directory(start, end),
            names.start_end2path(nickname, start, end, suffix=self.suffix),
        )

    def _cachepath(self, nickname):
        """Path of the preferred-cache file for ``nickname`` under rootdir."""
        return os.path.join(
            self.rootdir,
            self._cache_template % (nickname),
        )

    def new_preferred(self, nickname):
        """
        determines if there is a new preferred path by checking the mtime of the cache
        Note: this updates self._cache_mtime, which is used to determine whether the
        cache has been touched since we last asked
        """
        cachepath = self._cachepath(nickname)
        if os.path.exists(cachepath):  # path must exist for us to have a preferred
            mtime = os.path.getmtime(cachepath)
            # check this against the stored mtime
            if mtime != self._cache_mtime.get(nickname, None):
                self._cache_mtime[nickname] = mtime  # update the stored mtime
                return True
        return False

    def update_preferred(self, nickname, path):
        """
        write a path into the preferred cachefile
        NOTE: this is *not* automatically called within report.
        users must update the cache themselves.
        """
        cachepath = self._cachepath(nickname)
        # append-only log; _tail reads the last line back out
        with open(cachepath, "a") as file_obj:
            file_obj.write("\n" + path)

    def preferred(self, nickname):
        """
        get the path that redirects to the preferred file
        does this by referencing a cachepath that's automatically defined
        relative to rootdir

        Returns None if the cache has not been touched since we last asked.
        """
        if not self.new_preferred(nickname):
            return None
        else:
            # we can rely on this existing because new_preferred was True
            return self._tail(self._cachepath(nickname))

    def _tail(self, path):
        """
        retrieve the last 'preferred' file from the cache
        see https://stackoverflow.com/a/54278929
        """
        with open(path, "rb") as f:
            try:  # catch OSError in case of a one line file
                f.seek(-2, os.SEEK_END)
                while f.read(1) != b"\n":
                    f.seek(-2, os.SEEK_CUR)
            except OSError:
                f.seek(0)
            return f.readline().strip().decode()

    def report(self, nickname, obj, preferred=False, **kwargs):
        """
        writes string into plaintxt file

        The write is done into tmpdir first and then moved into place so
        readers never see a partially-written file.  Returns the final path.
        """
        path = self.path(nickname, self.start, self.end)
        basename = os.path.basename(path)
        if self.kwargs.get("shmdir_only", False):
            # keep the "final" copy in tmpdir; only shmdir gets the data
            path = os.path.join(self.tmpdir, basename)
        tmppath = os.path.join(self.tmpdir, "." + basename + ".tmp")
        # save to temp directory, and copy/move over as requested
        self._write(tmppath, obj, nickname=nickname, **kwargs)
        self._manage_shmdir(
            tmppath,
            basename,
            age=self.kwargs.get("shmdir_age", DEFAULT_SHMDIR_AGE),
            persist=self.kwargs.get("shmdir_persist", False),
        )
        shutil.move(tmppath, path)
        if preferred:
            self.update_preferred(nickname, path)
        return path

    def _write(self, path, string, **kwargs):
        """Serialize ``string`` as plain text at ``path``."""
        with open(path, "w") as file_obj:
            file_obj.write(string)

    def retrieve(self, nickname, preferred=False):
        """Inverse of report; returns None if no preferred path is available."""
        if preferred:
            path = self.preferred(nickname)
        else:
            path = self.path(nickname, self.start, self.end)
        if path is not None:
            return self.read(path)

    @classmethod
    def read(cls, path):
        """Return the plain-text contents of ``path``."""
        with open(path, "r") as file_obj:
            string = file_obj.read()
        return string

    def _manage_shmdir(self, path, basename, age=DEFAULT_SHMDIR_AGE, persist=False):
        """Mirror ``path`` into shmdir (if configured) and expire old files."""
        if "shmdir" in self.kwargs:
            # assume this directory already exists, see __init__
            shmdir = self.kwargs["shmdir"]
            # copy to a temporary file and then move to make this atomic
            tmppath = os.path.join(shmdir, "." + os.path.basename(path) + ".tmp")
            shutil.copyfile(path, tmppath)
            shutil.move(tmppath, os.path.join(shmdir, basename))
            # get rid of old things in shmdir
            if not persist:
                self._cleanup(shmdir, age=age)

    def _cleanup(self, directory, age=DEFAULT_SHMDIR_AGE):
        """
        Delete files in directory that are older than age given
        """
        for file_ in os.listdir(directory):
            filepath = os.path.join(directory, file_)
            try:
                if time.time() - os.path.getmtime(filepath) > age:
                    # file hasn't been modified in a while
                    os.remove(filepath)
            except OSError:
                # file was already deleted, can happen with parallel workflows
                pass

    def _glob_paths(self, nickname, start, end, data_id=None, all_dirs=False):
        """a generator for the set of paths needed by glob
        NOTE: if there are overlapping ranges, all will be returned.
        They are sorted by start time and ties are broken by durations
        (shorter durations first)
        ALSO NOTE: this returns all data with any overlap with the requested
        range and does NOT truncate or extend it to fit the specified range.
        """
        bounds = []
        span = segmentlist([segment(start, end)])
        if all_dirs:
            gps_dirs = sorted(glob.glob(os.path.join(self.rootdir, "*")))
        else:
            gps_dirs = [
                os.path.join(self.rootdir, f"START-{gps_dir}")
                for gps_dir in utils.start_end2gps_dirs(start, end, lookback=1)
            ]
        for directory in gps_dirs:
            if not os.path.isdir(directory):
                continue
            try:
                # top level directory
                S, E = names.dirMOD1e52start_end(directory)
                if S > end:
                    # we've run off the end
                    break
                for file_ in sorted(
                    glob.glob(
                        os.path.join(
                            directory,
                            names.glob2path(f"{nickname}*", suffix=self.suffix),
                        )
                    )
                ):
                    try:
                        s, e = names.path2start_end(file_, suffix=self.suffix)
                        if s > end:
                            # we've run off the end
                            continue
                        # we already checked (s<end), so we don't check it again
                        if utils.livetime(segmentlist([segment(s, e)]) & span):
                            # some overlap
                            bounds.append((s, e))
                    except ValueError:
                        pass
            except ValueError:
                # bad formatting
                pass
        # remove duplicates and sort
        bounds = list(set(bounds))
        bounds.sort(key=lambda _: _[1])  # sort by end time first
        bounds.sort(key=lambda _: _[0])  # then sort by start time
        for start, end in bounds:
            if data_id is None:
                paths = [self.path(nickname, start, end)]
            else:
                paths = glob.glob(self.path(f"{nickname}_{data_id}", start, end)) + [
                    self.path(nickname, start, end)
                ]
            for path in paths:
                if os.path.exists(path):
                    # there is data for this nickname in this directory
                    yield start, end, path

    def glob(self, nickname, start, end, data_id=None, all_dirs=False):
        """yield all data associated with nickname within [start, end)"""
        for s, e, path in self._glob_paths(
            nickname, start, end, data_id=data_id, all_dirs=all_dirs
        ):
            data = self.read(path)
            yield s, e, data
# marker used to declare hook specifications for the "iDQ" pluggy project
hookspec = pluggy.HookspecMarker("iDQ")

@hookspec
def get_reporters():
    """
    This hook is used to return Reporters in the form:
        {"[type:]format": Reporter}
    where the type (optional if generic) refers
    to a specific reporter handling specific data products:
      - span
      - series
      - segment
      - model
      - calib
      - dataset
    """