Coverage for pesummary/gw/file/formats/base_read.py: 80.1%
297 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-01-15 17:49 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import numpy as np
4from pesummary.gw.file.standard_names import standard_names
5from pesummary.core.file.formats.base_read import (
6 Read, SingleAnalysisRead, MultiAnalysisRead
7)
8from pesummary.utils.utils import logger
9from pesummary.utils.samples_dict import SamplesDict
10from pesummary.utils.decorators import open_config
11from pesummary.gw.conversions import convert
13__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
def _translate_parameters(parameters, samples):
    """Map parameter names onto the PESummary 'standard names'

    Parameters
    ----------
    parameters: list
        list of parameters used in the analysis
    samples: list
        list of samples for each parameters
    """
    path = ("https://git.ligo.org/lscsoft/pesummary/blob/master/pesummary/"
            "gw/file/standard_names.py")
    # parameters with no standard-name mapping are kept as-is and flagged
    unknown = [param for param in parameters if param not in standard_names]
    if unknown:
        logger.debug(
            "PESummary does not have a 'standard name' for the following "
            "parameters: {}. This means that comparison plots between "
            "different codes may not show these parameters. If you want to "
            "assign a standard name for these parameters, please add an MR "
            "which edits the following file: {}. These parameters will be "
            "added to the result pages and meta file as is.".format(
                ", ".join(unknown), path
            )
        )
    translated = [standard_names.get(param, param) for param in parameters]
    return translated, samples
50def _add_log_likelihood(parameters, samples):
51 """Add zero log_likelihood samples to the posterior table
53 Parameters
54 ----------
55 parameters: list
56 list of parameters stored in the table
57 samples: 2d list
58 list of samples for each parameter. Columns correspond to a given
59 parameter
60 """
61 if "log_likelihood" not in parameters:
62 parameters.append("log_likelihood")
63 samples = np.vstack(
64 [np.array(samples).T, np.zeros(len(samples))]
65 ).T
66 return parameters, samples
class GWRead(Read):
    """Base class to read in a results file

    Parameters
    ----------
    path_to_results_file: str
        path to the results file you wish to load
    remove_nan_likelihood_samples: Bool, optional
        if True, remove samples which have log_likelihood='nan'. Default True

    Attributes
    ----------
    parameters: list
        list of parameters stored in the result file
    converted_parameters: list
        list of parameters that have been derived from the sampled distributions
    samples: 2d list
        list of samples stored in the result file
    samples_dict: dict
        dictionary of samples stored in the result file keyed by parameters
    input_version: str
        version of the result file passed.
    extra_kwargs: dict
        dictionary of kwargs that were extracted from the result file

    Methods
    -------
    to_dat:
        save the posterior samples to a .dat file
    to_latex_table:
        convert the posterior samples to a latex table
    generate_latex_macros:
        generate a set of latex macros for the stored posterior samples
    to_lalinference:
        convert the posterior samples to a lalinference result file
    generate_all_posterior_samples:
        generate all posterior distributions that may be derived from
        sampled distributions
    """
    def __init__(self, path_to_results_file, **kwargs):
        super(GWRead, self).__init__(path_to_results_file, **kwargs)

    @property
    def calibration_spline_posterior(self):
        # subclasses that store calibration envelopes override this property
        return None

    # track the 'approximant' attribute alongside the core Read attributes
    Read.attrs.update({"approximant": "approximant"})

    def load(self, function, _data=None, **kwargs):
        """Load a results file according to a given function

        Parameters
        ----------
        function: func
            callable function that will load in your results file
        _data: dict, optional
            pre-loaded data to use instead of calling `function`
        """
        data = _data
        if _data is None:
            data = self.load_from_function(
                function, self.path_to_results_file, **kwargs
            )
        # map raw parameter names onto PESummary standard names
        parameters, samples = self.translate_parameters(
            data["parameters"], data["samples"]
        )
        _add_likelihood = kwargs.get("add_zero_likelihood", True)
        if not self.check_for_log_likelihood(parameters) and _add_likelihood:
            logger.warning(
                "Failed to find 'log_likelihood' in result file. Setting "
                "every sample to have log_likelihood 0"
            )
            parameters, samples = self.add_log_likelihood(parameters, samples)
        # optionally fold the polarisation angle into [0, pi)
        _mod_psi = kwargs.get("psi_mod_pi", True)
        if _mod_psi and "psi" in parameters:
            parameters, samples = self.psi_mod_pi(parameters, samples)
        data.update(
            {
                "parameters": parameters, "samples": samples,
                "injection": data["injection"]
            }
        )
        super(GWRead, self).load(function, _data=data, **kwargs)
        if self.injection_parameters is not None:
            self.injection_parameters = self.convert_injection_parameters(
                self.injection_parameters, extra_kwargs=self.extra_kwargs,
                disable_convert=kwargs.get("disable_injection_conversion", False)
            )
        if self.priors is not None and len(self.priors):
            if self.priors["samples"] != {}:
                priors = self.priors["samples"]
                self.priors["samples"] = self.convert_and_translate_prior_samples(
                    priors, disable_convert=kwargs.get(
                        "disable_prior_conversion", False
                    )
                )

    def convert_and_translate_prior_samples(self, priors, disable_convert=False):
        """Translate prior samples to standard names and optionally pass them
        through the conversion module

        Parameters
        ----------
        priors: dict
            dictionary of prior samples keyed by parameter
        disable_convert: Bool, optional
            if True, do not pass the prior samples through the conversion
            module. Default False
        """
        default_parameters = list(priors.keys())
        # build a row-per-sample 2d list from the column-per-parameter dict
        default_samples = [
            [priors[parameter][i] for parameter in default_parameters] for i
            in range(len(priors[default_parameters[0]]))
        ]
        parameters, samples = self.translate_parameters(
            default_parameters, default_samples
        )
        if not disable_convert:
            return convert(
                parameters, samples, extra_kwargs=self.extra_kwargs
            )
        return SamplesDict(parameters, samples)

    def convert_injection_parameters(
        self, data, extra_kwargs=None, disable_convert=False,
        sampled_parameters=None, **kwargs
    ):
        """Apply the conversion module to the injection data

        Parameters
        ----------
        data: dict
            dictionary of injection data keyed by the parameter
        extra_kwargs: dict, optional
            optional kwargs to pass to the conversion module. Default
            {"sampler": {}, "meta_data": {}}
        disable_convert: Bool, optional
            if True, do not convert injection parameters
        sampled_parameters: list, optional
            list of sampled parameters; any that are missing from the
            converted injection data are filled with NaN
        """
        import math
        from pesummary.gw.file.injection import GWInjection
        if extra_kwargs is None:
            # avoid a shared mutable default argument
            extra_kwargs = {"sampler": {}, "meta_data": {}}
        kwargs.update({"extra_kwargs": extra_kwargs})
        _data = data.copy()
        # drop parameters whose injected value is NaN
        for key, item in data.items():
            if math.isnan(np.atleast_1d(item)[0]):
                _ = _data.pop(key)
        if len(_data):
            converted = GWInjection(
                _data, conversion=not disable_convert, conversion_kwargs=kwargs
            )
            _param = list(converted.keys())[0]
            _example = converted[_param]
            if not len(_example.shape):
                # promote scalar entries to length-1 lists for consistency
                for key, item in converted.items():
                    converted[key] = [item]
        else:
            converted = _data
        if sampled_parameters is not None:
            # pad any sampled parameter without an injected value with NaN
            for i in sampled_parameters:
                if i not in list(converted.keys()):
                    converted[i] = float('nan')
        return converted

    def write(self, package="core", **kwargs):
        """Save the data to file

        Parameters
        ----------
        package: str, optional
            package you wish to use when writing the data
        kwargs: dict, optional
            all additional kwargs are passed to the pesummary.io.write function
        """
        return super(GWRead, self).write(package="gw", **kwargs)

    def _grab_injection_parameters_from_file(self, path, **kwargs):
        """Extract data from an injection file

        Parameters
        ----------
        path: str
            path to injection file
        """
        from pesummary.gw.file.injection import GWInjection
        return super(GWRead, self)._grab_injection_parameters_from_file(
            path, cls=GWInjection, **kwargs
        )

    def interpolate_calibration_spline_posterior(self, **kwargs):
        """Interpolate the stored calibration spline posterior for each
        interferometer, or return None if no spline posterior is stored

        Parameters
        ----------
        **kwargs: dict
            all kwargs passed to Calibration.from_spline_posterior_samples
        """
        from pesummary.gw.file.calibration import Calibration
        from pesummary.utils.utils import iterator
        if self.calibration_spline_posterior is None:
            return
        total = []
        log_frequencies, amplitudes, phases = self.calibration_spline_posterior
        keys = list(log_frequencies.keys())
        _iterator = iterator(
            None, desc="Interpolating calibration posterior", logger=logger,
            tqdm=True, total=len(self.samples) * 2 * len(keys)
        )
        with _iterator as pbar:
            for key in keys:
                total.append(
                    Calibration.from_spline_posterior_samples(
                        np.array(log_frequencies[key]),
                        np.array(amplitudes[key]), np.array(phases[key]),
                        pbar=pbar, **kwargs
                    )
                )
        return total, log_frequencies.keys()

    @staticmethod
    def translate_parameters(parameters, samples):
        """Translate parameters to a standard names

        Parameters
        ----------
        parameters: list
            list of parameters used in the analysis
        samples: list
            list of samples for each parameters
        """
        return _translate_parameters(parameters, samples)

    @staticmethod
    def _check_definition_of_inclination(parameters):
        """Check the definition of inclination given the other parameters

        Parameters
        ----------
        parameters: list
            list of parameters used in the study
        """
        theta_jn = False
        spin_angles = ["tilt_1", "tilt_2", "a_1", "a_2"]
        names = [
            standard_names[i] for i in parameters if i in standard_names.keys()]
        # when all spin angles are sampled, 'inclination' most likely means
        # theta_jn rather than iota
        if all(i in names for i in spin_angles):
            theta_jn = True
        if theta_jn:
            if "theta_jn" not in names and "inclination" in parameters:
                logger.warning("Because the spin angles are in your list of "
                               "parameters, the angle 'inclination' probably "
                               "refers to 'theta_jn'. If this is a mistake, "
                               "please change the definition of 'inclination' to "
                               "'iota' in your results file")
                index = parameters.index("inclination")
                parameters[index] = "theta_jn"
        else:
            if "inclination" in parameters:
                index = parameters.index("inclination")
                parameters[index] = "iota"
        return parameters

    def add_fixed_parameters_from_config_file(self, config_file):
        """Search the configuration file and add fixed parameters to the
        list of parameters and samples

        Parameters
        ----------
        config_file: str
            path to the configuration file
        """
        self._add_fixed_parameters_from_config_file(
            config_file, self._add_fixed_parameters)

    @staticmethod
    @open_config(index=2)
    def _add_fixed_parameters(parameters, samples, config_file):
        """Open a LALInference configuration file and add the fixed parameters
        to the list of parameters and samples

        Parameters
        ----------
        parameters: list
            list of existing parameters
        samples: list
            list of existing samples
        config_file: str
            path to the configuration file
        """
        from pesummary.gw.file.standard_names import standard_names

        config = config_file
        if not config.error:
            fixed_data = {}
            if "engine" in config.sections():
                # NOTE(review): assumes every matching key has the form
                # 'fix-<param>'; a key merely containing 'fix' would raise
                fixed_data = {
                    key.split("fix-")[1]: item for key, item in
                    config.items("engine") if "fix" in key}
            for i in fixed_data.keys():
                fixed_parameter = i
                fixed_value = fixed_data[i]
                try:
                    param = standard_names[fixed_parameter]
                    if param in parameters:
                        pass
                    else:
                        parameters.append(param)
                        for num in range(len(samples)):
                            samples[num].append(float(fixed_value))
                except Exception:
                    # parameters without a standard name: handle the known
                    # LALInference aliases explicitly
                    if fixed_parameter == "logdistance":
                        if "luminosity_distance" not in parameters:
                            parameters.append(standard_names["distance"])
                            for num in range(len(samples)):
                                samples[num].append(float(fixed_value))
                    if fixed_parameter == "costheta_jn":
                        if "theta_jn" not in parameters:
                            parameters.append(standard_names["theta_jn"])
                            for num in range(len(samples)):
                                samples[num].append(float(fixed_value))
            return parameters, samples
        return parameters, samples
class GWSingleAnalysisRead(GWRead, SingleAnalysisRead):
    """Base class to read in a results file which contains a single analysis

    Parameters
    ----------
    path_to_results_file: str
        path to the results file you wish to load
    remove_nan_likelihood_samples: Bool, optional
        if True, remove samples which have log_likelihood='nan'. Default True

    Attributes
    ----------
    parameters: list
        list of parameters stored in the result file
    converted_parameters: list
        list of parameters that have been derived from the sampled distributions
    samples: 2d list
        list of samples stored in the result file
    samples_dict: dict
        dictionary of samples stored in the result file keyed by parameters
    input_version: str
        version of the result file passed.
    extra_kwargs: dict
        dictionary of kwargs that were extracted from the result file

    Methods
    -------
    to_dat:
        save the posterior samples to a .dat file
    to_latex_table:
        convert the posterior samples to a latex table
    generate_latex_macros:
        generate a set of latex macros for the stored posterior samples
    to_lalinference:
        convert the posterior samples to a lalinference result file
    generate_all_posterior_samples:
        generate all posterior distributions that may be derived from
        sampled distributions
    """
    def __init__(self, *args, **kwargs):
        super(GWSingleAnalysisRead, self).__init__(*args, **kwargs)

    def check_for_log_likelihood(self, parameters):
        """Return True if 'log_likelihood' is in a list of sampled parameters

        Parameters
        ----------
        parameters: list
            list of sampled parameters
        """
        # return the membership test directly rather than if/True/False
        return "log_likelihood" in parameters

    def add_log_likelihood(self, parameters, samples):
        """Add log_likelihood samples to a posterior table

        Parameters
        ----------
        parameters: list
            list of parameters stored in the table
        samples: 2d list
            list of samples for each parameter. Columns correspond to a given
            parameter
        """
        return _add_log_likelihood(parameters, samples)

    def psi_mod_pi(self, parameters, samples):
        """Fold the polarisation (psi) between the interval [0, pi). This is
        only performed if there are polarisation samples > pi in the dataset.

        Parameters
        ----------
        parameters: list
            list of parameters stored in the table
        samples: 2d list
            list of samples for each parameter. Columns correspond to a given
            parameter
        """
        from pesummary.gw.conversions.angles import psi_mod_pi
        ind = parameters.index("psi")
        _psi = np.array(samples)[:,ind]
        # leave the table untouched when all samples are already within [0, pi)
        if not np.any(_psi > np.pi):
            return parameters, samples
        logger.warning(
            "Folding the polarisation (psi) between the interval [0, pi). "
            "If this is not desired, pass 'psi_mod_pi=False'."
        )
        _psi_mod = psi_mod_pi(_psi)
        for row in np.arange(len(samples)):
            samples[row][ind] = _psi_mod[row]
        return parameters, samples

    def generate_all_posterior_samples(self, **kwargs):
        """Generate all posterior samples via the conversion module

        Parameters
        ----------
        **kwargs: dict
            all kwargs passed to the conversion module
        """
        # pop with a default rather than the explicit key-check/else dance
        no_conversion = kwargs.pop("no_conversion", False)
        if not no_conversion:
            from pesummary.gw.conversions import convert

            data = convert(
                self.parameters, self.samples, extra_kwargs=self.extra_kwargs,
                return_dict=False, **kwargs
            )
            self.parameters = data[0]
            self.converted_parameters = self.parameters.added
            self.samples = data[1]
            if kwargs.get("return_kwargs", False):
                self.extra_kwargs = data[2]

    def convert_injection_parameters(self, *args, **kwargs):
        """Apply the conversion module to the injection data, padding any
        sampled parameter without an injected value with NaN
        """
        return super(GWSingleAnalysisRead, self).convert_injection_parameters(
            *args, sampled_parameters=self.parameters, **kwargs
        )

    def to_lalinference(self, **kwargs):
        """Save the PESummary results file object to a lalinference hdf5 file

        Parameters
        ----------
        kwargs: dict
            all kwargs are passed to the pesummary.io.write.write function
        """
        return self.write(file_format="lalinference", package="gw", **kwargs)
class GWMultiAnalysisRead(GWRead, MultiAnalysisRead):
    """Base class to read in a results file which contains multiple analyses

    Parameters
    ----------
    path_to_results_file: str
        path to the results file you wish to load
    remove_nan_likelihood_samples: Bool, optional
        if True, remove samples which have log_likelihood='nan'. Default True
    """
    def __init__(self, *args, **kwargs):
        super(GWMultiAnalysisRead, self).__init__(*args, **kwargs)

    def load(self, *args, **kwargs):
        """Load the result file and wrap any stored PSD, calibration, skymap
        and strain data in their pesummary container classes, falling back to
        the raw data when wrapping fails
        """
        super(GWMultiAnalysisRead, self).load(*args, **kwargs)
        if "psd" in self.data.keys():
            from pesummary.gw.file.psd import PSDDict

            try:
                self.psd = {
                    label: PSDDict(
                        {ifo: value for ifo, value in psd_data.items()}
                    ) for label, psd_data in self.data["psd"].items()
                }
            except (KeyError, AttributeError):
                # keep the raw data if it does not match the expected layout
                self.psd = self.data["psd"]
        if "calibration" in self.data.keys():
            from pesummary.gw.file.calibration import Calibration

            try:
                self.calibration = {
                    label: {
                        ifo: Calibration(value) for ifo, value in
                        calibration_data.items()
                    } for label, calibration_data in
                    self.data["calibration"].items()
                }
            except (KeyError, AttributeError):
                self.calibration = self.data["calibration"]
        if "prior" in self.data.keys() and "calibration" in self.data["prior"].keys():
            from pesummary.gw.file.calibration import CalibrationDict

            try:
                self.priors["calibration"] = {
                    label: CalibrationDict(calibration_data) for
                    label, calibration_data in
                    self.data["prior"]["calibration"].items()
                }
            except (KeyError, AttributeError):
                pass
        if "skymap" in self.data.keys():
            from pesummary.gw.file.skymap import SkyMapDict, SkyMap

            try:
                self.skymap = SkyMapDict({
                    label: SkyMap(skymap["data"], skymap["meta_data"])
                    for label, skymap in self.data["skymap"].items()
                })
            except (KeyError, AttributeError):
                self.skymap = self.data["skymap"]
        if "gwdata" in self.data.keys():
            try:
                from pesummary.gw.file.strain import StrainDataDict, StrainData
                mydict = {}
                for IFO, value in self.data["gwdata"].items():
                    # the first non-'_attrs' key is taken as the channel name
                    channel = [ch for ch in value.keys() if "_attrs" not in ch][0]
                    if "{}_attrs".format(channel) in value.keys():
                        _attrs = value["{}_attrs".format(channel)]
                    else:
                        _attrs = {}
                    mydict[IFO] = StrainData(value[channel], **_attrs)
                self.gwdata = StrainDataDict(mydict)
            except (KeyError, AttributeError):
                pass

    def convert_and_translate_prior_samples(self, priors, disable_convert=False):
        """Translate prior samples for each analysis to standard names and
        optionally pass them through the conversion module

        Parameters
        ----------
        priors: dict
            dictionary of prior samples keyed by analysis label
        disable_convert: Bool, optional
            if True, do not pass the prior samples through the conversion
            module. Default False
        """
        from pesummary.utils.samples_dict import MultiAnalysisSamplesDict

        mydict = {}
        for num, label in enumerate(self.labels):
            if label in priors.keys() and len(priors[label]):
                default_parameters = list(priors[label].keys())
                default_samples = np.array(
                    [priors[label][_param] for _param in default_parameters]
                ).T
                parameters, samples = self.translate_parameters(
                    [default_parameters], [default_samples]
                )
                if not disable_convert:
                    mydict[label] = convert(
                        parameters[0], samples[0], extra_kwargs=self.extra_kwargs[num]
                    )
                else:
                    mydict[label] = SamplesDict(parameters[0], samples[0])
            else:
                mydict[label] = {}
        return MultiAnalysisSamplesDict(mydict)

    def check_for_log_likelihood(self, parameters):
        """Return True only when every analysis samples 'log_likelihood'

        Parameters
        ----------
        parameters: 2d list
            list of sampled parameters for each analysis
        """
        # return the aggregate test directly rather than if/True/False
        return all("log_likelihood" in p for p in parameters)

    @staticmethod
    def translate_parameters(parameters, samples):
        """Translate parameters to a standard names

        Parameters
        ----------
        parameters: 2d list
            list of parameters used in each analysis
        samples: 3d list
            list of samples for each analysis
        """
        converted_params = []
        for _parameters, _samples in zip(parameters, samples):
            converted_params.append(
                _translate_parameters(_parameters, _samples)[0]
            )
        return converted_params, samples

    def add_log_likelihood(self, parameters, samples):
        """Add zero log_likelihood samples to the posterior table of each
        analysis

        Parameters
        ----------
        parameters: 2d list
            list of parameters stored for each analysis
        samples: 3d list
            list of samples for each analysis
        """
        parameters_logl, samples_logl = [], []
        for _parameters, _samples in zip(parameters, samples):
            pp, ss = _add_log_likelihood(_parameters, _samples)
            parameters_logl.append(pp)
            samples_logl.append(ss)
        return parameters_logl, samples_logl

    def generate_all_posterior_samples(self, labels=None, **conversion_kwargs):
        """Generate all posterior samples for each analysis via the conversion
        module

        Parameters
        ----------
        labels: list, optional
            if provided, only convert the analyses with these labels
        **conversion_kwargs: dict
            kwargs passed to the conversion module; may also be keyed by
            analysis label for per-analysis kwargs
        """
        # pop with a default rather than the explicit key-check/else dance
        no_conversion = conversion_kwargs.pop("no_conversion", False)
        if no_conversion:
            return
        from pesummary.gw.conversions import convert

        converted_params, converted_samples, converted_kwargs = [], [], []
        _converted_params = []
        for label, param, samples, kwargs in zip(
            self.labels, self.parameters, self.samples, self.extra_kwargs
        ):
            if labels is not None and label not in labels:
                # pass analyses we were not asked to convert through untouched
                converted_params.append(param)
                _converted_params.append([])
                converted_samples.append(samples)
                # NOTE(review): 'return_kwargs' is looked up in the result
                # file's extra kwargs, not in conversion_kwargs — confirm
                # this is intentional
                if kwargs.get("return_kwargs", False):
                    converted_kwargs.append(kwargs)
                continue
            if label in conversion_kwargs.keys():
                _conversion_kwargs = conversion_kwargs[label]
            else:
                _conversion_kwargs = conversion_kwargs
            if _conversion_kwargs.get("evolve_spins", False):
                # spin evolution only reports its settings via return_kwargs
                if not _conversion_kwargs.get("return_kwargs", False):
                    _conversion_kwargs["return_kwargs"] = True
            data = convert(
                param, samples, extra_kwargs=kwargs, return_dict=False,
                **_conversion_kwargs
            )
            converted_params.append(data[0])
            _converted_params.append(data[0].added)
            converted_samples.append(data[1])
            if kwargs.get("return_kwargs", False):
                converted_kwargs.append(data[2])
        self.parameters = converted_params
        self.converted_parameters = _converted_params
        self.samples = converted_samples
        if converted_kwargs != []:
            self.extra_kwargs = {
                label: converted_kwargs[num] for num, label in enumerate(
                    self.labels
                )
            }

    def convert_injection_parameters(
        self, data, extra_kwargs=None, disable_convert=False
    ):
        """Apply the conversion module to the injection data of each analysis

        Parameters
        ----------
        data: dict or list
            injection data keyed by analysis label, or a list ordered like
            `self.labels`
        extra_kwargs: list, optional
            list of kwarg dictionaries, one per analysis, to pass to the
            conversion module
        disable_convert: Bool, optional
            if True, do not convert injection parameters
        """
        if extra_kwargs is None:
            # avoid a shared mutable default argument
            # NOTE(review): this default cannot be indexed by `num` below —
            # callers appear to always supply a per-analysis list; confirm
            extra_kwargs = {"sampler": {}, "meta_data": {}}
        for num, label in enumerate(self.labels):
            _identifier = label
            if isinstance(data, dict):
                _data = data[label]
            else:
                _data = data[num]
                _identifier = num
            data[_identifier] = super(
                GWMultiAnalysisRead, self
            ).convert_injection_parameters(
                _data, extra_kwargs=extra_kwargs[num],
                disable_convert=disable_convert,
                sampled_parameters=self.parameters[num]
            )
        return data