Coverage for pesummary/gw/file/formats/base_read.py: 80.6%
283 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import numpy as np
4from pesummary.gw.file.standard_names import standard_names
5from pesummary.core.file.formats.base_read import (
6 Read, SingleAnalysisRead, MultiAnalysisRead
7)
8from pesummary.utils.utils import logger
9from pesummary.utils.samples_dict import SamplesDict
10from pesummary.utils.decorators import open_config
11from pesummary.gw.conversions import convert
13__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
16def _translate_parameters(parameters, samples):
17 """Translate parameters to a standard names
19 Parameters
20 ----------
21 parameters: list
22 list of parameters used in the analysis
23 samples: list
24 list of samples for each parameters
25 """
26 path = ("https://git.ligo.org/lscsoft/pesummary/blob/master/pesummary/"
27 "gw/file/standard_names.py")
28 parameters_not_included = [
29 i for i in parameters if i not in standard_names.keys()
30 ]
31 if len(parameters_not_included) > 0:
32 logger.debug(
33 "PESummary does not have a 'standard name' for the following "
34 "parameters: {}. This means that comparison plots between "
35 "different codes may not show these parameters. If you want to "
36 "assign a standard name for these parameters, please add an MR "
37 "which edits the following file: {}. These parameters will be "
38 "added to the result pages and meta file as is.".format(
39 ", ".join(parameters_not_included), path
40 )
41 )
42 standard_params = [i for i in parameters if i in standard_names.keys()]
43 converted_params = [
44 standard_names[i] if i in standard_params else i for i in
45 parameters
46 ]
47 return converted_params, samples
50def _add_log_likelihood(parameters, samples):
51 """Add zero log_likelihood samples to the posterior table
53 Parameters
54 ----------
55 parameters: list
56 list of parameters stored in the table
57 samples: 2d list
58 list of samples for each parameter. Columns correspond to a given
59 parameter
60 """
61 if "log_likelihood" not in parameters:
62 parameters.append("log_likelihood")
63 samples = np.vstack(
64 [np.array(samples).T, np.zeros(len(samples))]
65 ).T
66 return parameters, samples
69class GWRead(Read):
70 """Base class to read in a results file
72 Parameters
73 ----------
74 path_to_results_file: str
75 path to the results file you wish to load
76 remove_nan_likelihood_samples: Bool, optional
77 if True, remove samples which have log_likelihood='nan'. Default True
79 Attributes
80 ----------
81 parameters: list
82 list of parameters stored in the result file
83 converted_parameters: list
84 list of parameters that have been derived from the sampled distributions
85 samples: 2d list
86 list of samples stored in the result file
87 samples_dict: dict
88 dictionary of samples stored in the result file keyed by parameters
89 input_version: str
90 version of the result file passed.
91 extra_kwargs: dict
92 dictionary of kwargs that were extracted from the result file
93 converted_parameters: list
94 list of parameters that have been added
96 Methods
97 -------
98 to_dat:
99 save the posterior samples to a .dat file
100 to_latex_table:
101 convert the posterior samples to a latex table
102 generate_latex_macros:
103 generate a set of latex macros for the stored posterior samples
104 to_lalinference:
105 convert the posterior samples to a lalinference result file
106 generate_all_posterior_samples:
107 generate all posterior distributions that may be derived from
108 sampled distributions
109 """
110 def __init__(self, path_to_results_file, **kwargs):
111 super(GWRead, self).__init__(path_to_results_file, **kwargs)
113 @property
114 def calibration_spline_posterior(self):
115 return None
117 Read.attrs.update({"approximant": "approximant"})
119 def load(self, function, _data=None, **kwargs):
120 """Load a results file according to a given function
122 Parameters
123 ----------
124 function: func
125 callable function that will load in your results file
126 """
127 data = _data
128 if _data is None:
129 data = self.load_from_function(
130 function, self.path_to_results_file, **kwargs
131 )
132 parameters, samples = self.translate_parameters(
133 data["parameters"], data["samples"]
134 )
135 _add_likelihood = kwargs.get("add_zero_likelihood", True)
136 if not self.check_for_log_likelihood(parameters) and _add_likelihood:
137 logger.warning(
138 "Failed to find 'log_likelihood' in result file. Setting "
139 "every sample to have log_likelihood 0"
140 )
141 parameters, samples = self.add_log_likelihood(parameters, samples)
142 data.update(
143 {
144 "parameters": parameters, "samples": samples,
145 "injection": data["injection"]
146 }
147 )
148 super(GWRead, self).load(function, _data=data, **kwargs)
149 if self.injection_parameters is not None:
150 self.injection_parameters = self.convert_injection_parameters(
151 self.injection_parameters, extra_kwargs=self.extra_kwargs,
152 disable_convert=kwargs.get("disable_injection_conversion", False)
153 )
154 if self.priors is not None and len(self.priors):
155 if self.priors["samples"] != {}:
156 priors = self.priors["samples"]
157 self.priors["samples"] = self.convert_and_translate_prior_samples(
158 priors, disable_convert=kwargs.get(
159 "disable_prior_conversion", False
160 )
161 )
163 def convert_and_translate_prior_samples(self, priors, disable_convert=False):
164 """
165 """
166 default_parameters = list(priors.keys())
167 default_samples = [
168 [priors[parameter][i] for parameter in default_parameters] for i
169 in range(len(priors[default_parameters[0]]))
170 ]
171 parameters, samples = self.translate_parameters(
172 default_parameters, default_samples
173 )
174 if not disable_convert:
175 return convert(
176 parameters, samples, extra_kwargs=self.extra_kwargs
177 )
178 return SamplesDict(parameters, samples)
180 def convert_injection_parameters(
181 self, data, extra_kwargs={"sampler": {}, "meta_data": {}},
182 disable_convert=False, sampled_parameters=None, **kwargs
183 ):
184 """Apply the conversion module to the injection data
186 Parameters
187 ----------
188 data: dict
189 dictionary of injection data keyed by the parameter
190 extra_kwargs: dict, optional
191 optional kwargs to pass to the conversion module
192 disable_convert: Bool, optional
193 if True, do not convert injection parameters
194 """
195 import math
196 from pesummary.gw.file.injection import GWInjection
197 kwargs.update({"extra_kwargs": extra_kwargs})
198 _data = data.copy()
199 for key, item in data.items():
200 if math.isnan(np.atleast_1d(item)[0]):
201 _ = _data.pop(key)
202 if len(_data):
203 converted = GWInjection(
204 _data, conversion=not disable_convert, conversion_kwargs=kwargs
205 )
206 _param = list(converted.keys())[0]
207 _example = converted[_param]
208 if not len(_example.shape):
209 for key, item in converted.items():
210 converted[key] = [item]
211 else:
212 converted = _data
213 for i in sampled_parameters:
214 if i not in list(converted.keys()):
215 converted[i] = float('nan')
216 return converted
218 def write(self, package="core", **kwargs):
219 """Save the data to file
221 Parameters
222 ----------
223 package: str, optional
224 package you wish to use when writing the data
225 kwargs: dict, optional
226 all additional kwargs are passed to the pesummary.io.write function
227 """
228 return super(GWRead, self).write(package="gw", **kwargs)
230 def _grab_injection_parameters_from_file(self, path, **kwargs):
231 """Extract data from an injection file
233 Parameters
234 ----------
235 path: str
236 path to injection file
237 """
238 from pesummary.gw.file.injection import GWInjection
239 return super(GWRead, self)._grab_injection_parameters_from_file(
240 path, cls=GWInjection, **kwargs
241 )
243 def interpolate_calibration_spline_posterior(self, **kwargs):
244 from pesummary.gw.file.calibration import Calibration
245 from pesummary.utils.utils import iterator
246 if self.calibration_spline_posterior is None:
247 return
248 total = []
249 log_frequencies, amplitudes, phases = self.calibration_spline_posterior
250 keys = list(log_frequencies.keys())
251 _iterator = iterator(
252 None, desc="Interpolating calibration posterior", logger=logger,
253 tqdm=True, total=len(self.samples) * 2 * len(keys)
254 )
255 with _iterator as pbar:
256 for key in keys:
257 total.append(
258 Calibration.from_spline_posterior_samples(
259 np.array(log_frequencies[key]),
260 np.array(amplitudes[key]), np.array(phases[key]),
261 pbar=pbar, **kwargs
262 )
263 )
264 return total, log_frequencies.keys()
266 @staticmethod
267 def translate_parameters(parameters, samples):
268 """Translate parameters to a standard names
270 Parameters
271 ----------
272 parameters: list
273 list of parameters used in the analysis
274 samples: list
275 list of samples for each parameters
276 """
277 return _translate_parameters(parameters, samples)
279 @staticmethod
280 def _check_definition_of_inclination(parameters):
281 """Check the definition of inclination given the other parameters
283 Parameters
284 ----------
285 parameters: list
286 list of parameters used in the study
287 """
288 theta_jn = False
289 spin_angles = ["tilt_1", "tilt_2", "a_1", "a_2"]
290 names = [
291 standard_names[i] for i in parameters if i in standard_names.keys()]
292 if all(i in names for i in spin_angles):
293 theta_jn = True
294 if theta_jn:
295 if "theta_jn" not in names and "inclination" in parameters:
296 logger.warning("Because the spin angles are in your list of "
297 "parameters, the angle 'inclination' probably "
298 "refers to 'theta_jn'. If this is a mistake, "
299 "please change the definition of 'inclination' to "
300 "'iota' in your results file")
301 index = parameters.index("inclination")
302 parameters[index] = "theta_jn"
303 else:
304 if "inclination" in parameters:
305 index = parameters.index("inclination")
306 parameters[index] = "iota"
307 return parameters
309 def add_fixed_parameters_from_config_file(self, config_file):
310 """Search the conifiguration file and add fixed parameters to the
311 list of parameters and samples
313 Parameters
314 ----------
315 config_file: str
316 path to the configuration file
317 """
318 self._add_fixed_parameters_from_config_file(
319 config_file, self._add_fixed_parameters)
321 @staticmethod
322 @open_config(index=2)
323 def _add_fixed_parameters(parameters, samples, config_file):
324 """Open a LALInference configuration file and add the fixed parameters
325 to the list of parameters and samples
327 Parameters
328 ----------
329 parameters: list
330 list of existing parameters
331 samples: list
332 list of existing samples
333 config_file: str
334 path to the configuration file
335 """
336 from pesummary.gw.file.standard_names import standard_names
338 config = config_file
339 if not config.error:
340 fixed_data = {}
341 if "engine" in config.sections():
342 fixed_data = {
343 key.split("fix-")[1]: item for key, item in
344 config.items("engine") if "fix" in key}
345 for i in fixed_data.keys():
346 fixed_parameter = i
347 fixed_value = fixed_data[i]
348 try:
349 param = standard_names[fixed_parameter]
350 if param in parameters:
351 pass
352 else:
353 parameters.append(param)
354 for num in range(len(samples)):
355 samples[num].append(float(fixed_value))
356 except Exception:
357 if fixed_parameter == "logdistance":
358 if "luminosity_distance" not in parameters:
359 parameters.append(standard_names["distance"])
360 for num in range(len(samples)):
361 samples[num].append(float(fixed_value))
362 if fixed_parameter == "costheta_jn":
363 if "theta_jn" not in parameters:
364 parameters.append(standard_names["theta_jn"])
365 for num in range(len(samples)):
366 samples[num].append(float(fixed_value))
367 return parameters, samples
368 return parameters, samples
371class GWSingleAnalysisRead(GWRead, SingleAnalysisRead):
372 """Base class to read in a results file which contains a single analysis
374 Parameters
375 ----------
376 path_to_results_file: str
377 path to the results file you wish to load
378 remove_nan_likelihood_samples: Bool, optional
379 if True, remove samples which have log_likelihood='nan'. Default True
381 Attributes
382 ----------
383 parameters: list
384 list of parameters stored in the result file
385 converted_parameters: list
386 list of parameters that have been derived from the sampled distributions
387 samples: 2d list
388 list of samples stored in the result file
389 samples_dict: dict
390 dictionary of samples stored in the result file keyed by parameters
391 input_version: str
392 version of the result file passed.
393 extra_kwargs: dict
394 dictionary of kwargs that were extracted from the result file
395 converted_parameters: list
396 list of parameters that have been added
398 Methods
399 -------
400 to_dat:
401 save the posterior samples to a .dat file
402 to_latex_table:
403 convert the posterior samples to a latex table
404 generate_latex_macros:
405 generate a set of latex macros for the stored posterior samples
406 to_lalinference:
407 convert the posterior samples to a lalinference result file
408 generate_all_posterior_samples:
409 generate all posterior distributions that may be derived from
410 sampled distributions
411 """
412 def __init__(self, *args, **kwargs):
413 super(GWSingleAnalysisRead, self).__init__(*args, **kwargs)
415 def check_for_log_likelihood(self, parameters):
416 """Return True if 'log_likelihood' is in a list of sampled parameters
418 Parameters
419 ----------
420 parameters: list
421 list of sampled parameters
422 """
423 if "log_likelihood" in parameters:
424 return True
425 return False
427 def add_log_likelihood(self, parameters, samples):
428 """Add log_likelihood samples to a posterior table
430 Parameters
431 ----------
432 parameters: list
433 list of parameters stored in the table
434 samples: 2d list
435 list of samples for each parameter. Columns correspond to a given
436 parameter
437 """
438 return _add_log_likelihood(parameters, samples)
440 def generate_all_posterior_samples(self, **kwargs):
441 """Generate all posterior samples via the conversion module
443 Parameters
444 ----------
445 **kwargs: dict
446 all kwargs passed to the conversion module
447 """
448 if "no_conversion" in kwargs.keys():
449 no_conversion = kwargs.pop("no_conversion")
450 else:
451 no_conversion = False
452 if not no_conversion:
453 from pesummary.gw.conversions import convert
455 data = convert(
456 self.parameters, self.samples, extra_kwargs=self.extra_kwargs,
457 return_dict=False, **kwargs
458 )
459 self.parameters = data[0]
460 self.converted_parameters = self.parameters.added
461 self.samples = data[1]
462 if kwargs.get("return_kwargs", False):
463 self.extra_kwargs = data[2]
465 def convert_injection_parameters(self, *args, **kwargs):
466 return super(GWSingleAnalysisRead, self).convert_injection_parameters(
467 *args, sampled_parameters=self.parameters, **kwargs
468 )
470 def to_lalinference(self, **kwargs):
471 """Save the PESummary results file object to a lalinference hdf5 file
473 Parameters
474 ----------
475 kwargs: dict
476 all kwargs are passed to the pesummary.io.write.write function
477 """
478 return self.write(file_format="lalinference", package="gw", **kwargs)
481class GWMultiAnalysisRead(GWRead, MultiAnalysisRead):
482 """Base class to read in a results file which contains multiple analyses
484 Parameters
485 ----------
486 path_to_results_file: str
487 path to the results file you wish to load
488 remove_nan_likelihood_samples: Bool, optional
489 if True, remove samples which have log_likelihood='nan'. Default True
490 """
491 def __init__(self, *args, **kwargs):
492 super(GWMultiAnalysisRead, self).__init__(*args, **kwargs)
494 def load(self, *args, **kwargs):
495 super(GWMultiAnalysisRead, self).load(*args, **kwargs)
496 if "psd" in self.data.keys():
497 from pesummary.gw.file.psd import PSDDict
499 try:
500 self.psd = {
501 label: PSDDict(
502 {ifo: value for ifo, value in psd_data.items()}
503 ) for label, psd_data in self.data["psd"].items()
504 }
505 except (KeyError, AttributeError):
506 self.psd = self.data["psd"]
507 if "calibration" in self.data.keys():
508 from pesummary.gw.file.calibration import Calibration
510 try:
511 self.calibration = {
512 label: {
513 ifo: Calibration(value) for ifo, value in
514 calibration_data.items()
515 } for label, calibration_data in
516 self.data["calibration"].items()
517 }
518 except (KeyError, AttributeError):
519 self.calibration = self.data["calibration"]
520 if "prior" in self.data.keys() and "calibration" in self.data["prior"].keys():
521 from pesummary.gw.file.calibration import CalibrationDict
523 try:
524 self.priors["calibration"] = {
525 label: CalibrationDict(calibration_data) for
526 label, calibration_data in
527 self.data["prior"]["calibration"].items()
528 }
529 except (KeyError, AttributeError):
530 pass
531 if "skymap" in self.data.keys():
532 from pesummary.gw.file.skymap import SkyMapDict, SkyMap
534 try:
535 self.skymap = SkyMapDict({
536 label: SkyMap(skymap["data"], skymap["meta_data"])
537 for label, skymap in self.data["skymap"].items()
538 })
539 except (KeyError, AttributeError):
540 self.skymap = self.data["skymap"]
541 if "gwdata" in self.data.keys():
542 try:
543 from pesummary.gw.file.strain import StrainDataDict, StrainData
544 mydict = {}
545 for IFO, value in self.data["gwdata"].items():
546 channel = [ch for ch in value.keys() if "_attrs" not in ch][0]
547 if "{}_attrs".format(channel) in value.keys():
548 _attrs = value["{}_attrs".format(channel)]
549 else:
550 _attrs = {}
551 mydict[IFO] = StrainData(value[channel], **_attrs)
552 self.gwdata = StrainDataDict(mydict)
553 except (KeyError, AttributeError):
554 pass
556 def convert_and_translate_prior_samples(self, priors, disable_convert=False):
557 """
558 """
559 from pesummary.utils.samples_dict import MultiAnalysisSamplesDict
561 mydict = {}
562 for num, label in enumerate(self.labels):
563 if label in priors.keys() and len(priors[label]):
564 default_parameters = list(priors[label].keys())
565 default_samples = np.array(
566 [priors[label][_param] for _param in default_parameters]
567 ).T
568 parameters, samples = self.translate_parameters(
569 [default_parameters], [default_samples]
570 )
571 if not disable_convert:
572 mydict[label] = convert(
573 parameters[0], samples[0], extra_kwargs=self.extra_kwargs[num]
574 )
575 else:
576 mydict[label] = SamplesDict(parameters[0], samples[0])
577 else:
578 mydict[label] = {}
579 return MultiAnalysisSamplesDict(mydict)
581 def check_for_log_likelihood(self, parameters):
582 if all("log_likelihood" in p for p in parameters):
583 return True
584 return False
586 @staticmethod
587 def translate_parameters(parameters, samples):
588 """Translate parameters to a standard names
590 Parameters
591 ----------
592 parameters: list
593 list of parameters used in the analysis
594 samples: list
595 list of samples for each parameters
596 """
597 converted_params = []
598 for _parameters, _samples in zip(parameters, samples):
599 converted_params.append(
600 _translate_parameters(_parameters, _samples)[0]
601 )
602 return converted_params, samples
604 def add_log_likelihood(self, parameters, samples):
605 """
606 """
607 parameters_logl, samples_logl = [], []
608 for _parameters, _samples in zip(parameters, samples):
609 pp, ss = _add_log_likelihood(_parameters, _samples)
610 parameters_logl.append(pp)
611 samples_logl.append(ss)
612 return parameters_logl, samples_logl
614 def generate_all_posterior_samples(self, labels=None, **conversion_kwargs):
615 if "no_conversion" in conversion_kwargs.keys():
616 no_conversion = conversion_kwargs.pop("no_conversion")
617 else:
618 no_conversion = False
619 if no_conversion:
620 return
621 from pesummary.gw.conversions import convert
623 converted_params, converted_samples, converted_kwargs = [], [], []
624 _converted_params = []
625 for label, param, samples, kwargs in zip(
626 self.labels, self.parameters, self.samples, self.extra_kwargs
627 ):
628 if labels is not None and label not in labels:
629 converted_params.append(param)
630 _converted_params.append([])
631 converted_samples.append(samples)
632 if kwargs.get("return_kwargs", False):
633 converted_kwargs.append(kwargs)
634 continue
635 if label in conversion_kwargs.keys():
636 _conversion_kwargs = conversion_kwargs[label]
637 else:
638 _conversion_kwargs = conversion_kwargs
639 if _conversion_kwargs.get("evolve_spins", False):
640 if not _conversion_kwargs.get("return_kwargs", False):
641 _conversion_kwargs["return_kwargs"] = True
642 data = convert(
643 param, samples, extra_kwargs=kwargs, return_dict=False,
644 **_conversion_kwargs
645 )
646 converted_params.append(data[0])
647 _converted_params.append(data[0].added)
648 converted_samples.append(data[1])
649 if kwargs.get("return_kwargs", False):
650 converted_kwargs.append(data[2])
651 self.parameters = converted_params
652 self.converted_parameters = _converted_params
653 self.samples = converted_samples
654 if converted_kwargs != []:
655 self.extra_kwargs = {
656 label: converted_kwargs[num] for num, label in enumerate(
657 self.labels
658 )
659 }
661 def convert_injection_parameters(
662 self, data, extra_kwargs={"sampler": {}, "meta_data": {}},
663 disable_convert=False
664 ):
665 """Apply the conversion module to the injection data
666 """
667 for num, label in enumerate(self.labels):
668 _identifier = label
669 if isinstance(data, dict):
670 _data = data[label]
671 else:
672 _data = data[num]
673 _identifier = num
674 data[_identifier] = super(
675 GWMultiAnalysisRead, self
676 ).convert_injection_parameters(
677 _data, extra_kwargs=extra_kwargs[num],
678 disable_convert=disable_convert,
679 sampled_parameters=self.parameters[num]
680 )
681 return data