Coverage for pesummary/core/file/read.py: 88.8%
116 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3from pesummary.core.file.formats.base_read import Read
4from pesummary.core.file.formats.bilby import Bilby, bilby_version
5from pesummary.core.file.formats.default import Default
6from pesummary.core.file.formats.pesummary import PESummary, PESummaryDeprecated
7from pesummary.utils.utils import logger
8import os
10__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
13def is_bilby_hdf5_file(path):
14 """Determine if the results file is a bilby hdf5 results file
16 Parameters
17 ----------
18 path: str
19 path to the results file
20 """
21 import h5py
22 f = h5py.File(path, "r")
23 cond = _check_bilby_version(f)
24 f.close()
25 return cond
28def is_bilby_json_file(path):
29 """Determine if the results file is a bilby json results file
31 Parameters
32 ----------
33 path: str
34 path to the results file
35 """
36 import json
37 with open(path, "r") as f:
38 data = json.load(f)
39 return _check_bilby_version(data)
42def _check_bilby_version(data):
43 """Identify if 'bilby' is in the version information extracted from the
44 file
46 Parameters
47 ----------
48 data: iterable
49 contents of the result file, opened with either h5py or json libraries
50 """
51 try:
52 version = bilby_version(data=data)
53 if isinstance(version, bytes):
54 version = version.decode("utf-8")
55 if "bilby" in version:
56 return True
57 return False
58 except Exception:
59 return False
62def _is_pesummary_hdf5_file(path, check_function):
63 """Determine if the results file is a pesummary hdf5 file
65 Parameters
66 ----------
67 path: str
68 path to the results file
69 check_function: func
70 function used to check the result file
71 """
72 import h5py
73 f = h5py.File(path, 'r')
74 outcome = check_function(f)
75 f.close()
76 return outcome
79def is_pesummary_hdf5_file_deprecated(path):
80 """Determine if the results file is a deprecated pesummary hdf5 file
82 Parameters
83 ----------
84 path: str
85 path to the results file
86 """
87 return _is_pesummary_hdf5_file(path, _check_pesummary_file_deprecated)
90def is_pesummary_hdf5_file(path):
91 """Determine if the results file is a pesummary hdf5 file
93 Parameters
94 ----------
95 path: str
96 path to the results file
97 """
98 return _is_pesummary_hdf5_file(path, _check_pesummary_file)
101def _is_pesummary_json_file(path, check_function):
102 """Determine if the result file is a pesummary json file
104 Parameters
105 ----------
106 path: str
107 path to the results file
108 check_function: func
109 function used to check the result file
110 """
111 import json
112 with open(path, "r") as f:
113 data = json.load(f)
114 return check_function(data)
117def is_pesummary_json_file(path):
118 """Determine if the results file is a pesummary json file
120 Parameters
121 ----------
122 path: str
123 path to results file
124 """
125 return _is_pesummary_json_file(path, _check_pesummary_file)
128def is_pesummary_json_file_deprecated(path):
129 """Determine if the results file is a deprecated pesummary json file
131 Parameters
132 ----------
133 path: str
134 path to results file
135 """
136 return _is_pesummary_json_file(path, _check_pesummary_file_deprecated)
139def _check_pesummary_file_deprecated(f):
140 """Check the contents of a dictionary to see if it is a deprecated pesummary
141 dictionary
143 Parameters
144 ----------
145 f: dict
146 dictionary of the contents of the file
147 """
148 if "posterior_samples" in f.keys():
149 try:
150 import collections
152 labels = f["posterior_samples"].keys()
153 if isinstance(labels, collections.abc.KeysView):
154 _label = list(labels)[0]
155 try:
156 _param = list(f["posterior_samples"][_label].keys())[0]
157 samples = f["posterior_samples"][_label][_param]
158 return True
159 except Exception:
160 return False
161 else:
162 return False
163 except Exception:
164 return False
165 else:
166 return False
169def _check_pesummary_file(f):
170 """Check the contents of a dictionary to see if it is a pesummary dictionary
172 Parameters
173 ----------
174 f: dict
175 dictionary of the contents of the file
176 """
177 labels = f.keys()
178 if "version" not in labels:
179 return False
180 try:
181 if all(
182 "posterior_samples" in f[label].keys() for label in labels if
183 label != "version" and label != "history" and label != "strain"
184 ):
185 return True
186 elif all(
187 "mcmc_chains" in f[label].keys() for label in labels if
188 label != "version" and label != "history" and label != "strain"
189 ):
190 return True
191 else:
192 return False
193 except Exception:
194 return False
197CORE_HDF5_LOAD = {
198 is_bilby_hdf5_file: Bilby.load_file,
199 is_pesummary_hdf5_file: PESummary.load_file,
200 is_pesummary_hdf5_file_deprecated: PESummaryDeprecated.load_file
201}
203CORE_JSON_LOAD = {
204 is_bilby_json_file: Bilby.load_file,
205 is_pesummary_json_file: PESummary.load_file,
206 is_pesummary_json_file_deprecated: PESummaryDeprecated.load_file
207}
209CORE_DEFAULT_LOAD = {
210 "default": Default.load_file
211}
213DEFAULT_FORMATS = ["default", "dat", "json", "hdf5", "h5", "txt"]
216def _read(path, load_options, default=CORE_DEFAULT_LOAD, **load_kwargs):
217 """Try and load a result file according to multiple options
219 Parameters
220 ----------
221 path: str
222 path to results file
223 load_options: dict
224 dictionary of checks and loading functions
225 """
226 for check, load in load_options.items():
227 if check(path):
228 try:
229 return load(path, **load_kwargs)
230 except ImportError as e:
231 logger.warning(
232 "Failed due to import error: {}. Using default load".format(
233 e
234 )
235 )
236 return default["default"](path, **load_kwargs)
237 except Exception as e:
238 logger.info(
239 "Failed to read in {} with the {} class because {}".format(
240 path, load, e
241 )
242 )
243 continue
244 if len(load_options):
245 logger.warning(
246 "Using the default load because {} failed the following checks: {}".format(
247 path, ", ".join([function.__name__ for function in load_options.keys()])
248 )
249 )
250 return default["default"](path, **load_kwargs)
253def _file_format(file_format, options):
254 """Return a dictionary of load options. If a file format is specified, then return
255 a dictionary containing only that one load option
257 Parameters
258 ----------
259 file_format: str
260 the file format you wish to use. If None, all possible load options will be
261 returned
262 options: dict
263 dictionary of possible load options. Key is a function which returns True
264 or False depending on whether the input file belongs to that class of objects,
265 value is the load function
266 """
267 if file_format is None:
268 return options
269 elif any(file_format.lower() in key for key in DEFAULT_FORMATS):
270 reduced = {}
271 elif not any(file_format in key.__name__ for key in options.keys()):
272 raise ValueError(
273 "Unrecognised file format. The format must be a substring of any"
274 "of the following: {}".format(
275 ", ".join([key.__name__ for key in options.keys()] + DEFAULT_FORMATS)
276 )
277 )
278 else:
279 reduced = {
280 key: value for key, value in options.items() if file_format in key.__name__
281 }
282 return reduced
285def read(
286 path, HDF5_LOAD=CORE_HDF5_LOAD, JSON_LOAD=CORE_JSON_LOAD,
287 DEFAULT=CORE_DEFAULT_LOAD, file_format=None, **kwargs
288):
289 """Read in a results file.
291 Parameters
292 ----------
293 path: str
294 path to results file
295 format: str
296 the file format you wish to use. Default None. If None, the read
297 function loops through all possible options
298 **kwargs: dict, optional
299 all additional kwargs are passed directly to the load_file class method
300 """
301 path = os.path.expanduser(path)
302 extension = Read.extension_from_path(path)
304 if extension in ["gz"]:
305 from pesummary.utils.utils import unzip
306 path = unzip(path)
307 if extension in ["hdf5", "h5", "hdf"]:
308 options = _file_format(file_format, HDF5_LOAD)
309 return _read(path, options, default=DEFAULT, **kwargs)
310 elif extension == "json":
311 options = _file_format(file_format, JSON_LOAD)
312 return _read(path, options, default=DEFAULT, **kwargs)
313 else:
314 return DEFAULT["default"](path, file_format=file_format, **kwargs)