Coverage for pesummary/core/file/meta_file.py: 83.9%
367 statements
« prev ^ index » next — coverage.py v7.4.4, created at 2026-01-15 17:49 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import inspect
4import os
5import numpy as np
6import json
7import copy
8from getpass import getuser
9import pandas as pd
10import pesummary
11from pesummary import __version__
12from pesummary.utils.dict import Dict
13from pesummary.utils.samples_dict import SamplesDict
14from pesummary.utils.utils import make_dir, logger
15from pesummary.utils.decorators import open_config
16from pesummary import conf
18__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
19DEFAULT_HDF5_KEYS = ["version", "history"]
def recursively_save_dictionary_to_hdf5_file(
    f, dictionary, current_path=None, extra_keys=DEFAULT_HDF5_KEYS, compression=None
):
    """Recursively write a (possibly nested) dictionary to an open hdf5 file

    Parameters
    ----------
    f: h5py._hl.files.File
        the open hdf5 file that you would like the data to be saved to
    dictionary: dict
        dictionary of data
    current_path: optional, str
        string to indicate the level to save the data in the hdf5 file
    compression: int, optional
        optional filter to apply for compression. If you do not want to
        apply compression, compression = None. Default None.
    """
    # make sure every reserved top-level group exists at the file root
    for group_name in extra_keys:
        if group_name in dictionary and group_name not in f.keys():
            f.create_group(group_name)
    if current_path is None:
        current_path = []
    for name, entry in dictionary.items():
        # DataFrames are flattened to column-major dictionaries first
        if isinstance(entry, pd.DataFrame):
            entry = entry.to_dict(orient="list")
        if isinstance(entry, dict):
            # descend into the nested mapping, creating the group on demand
            if name not in f["/" + "/".join(current_path)].keys():
                f["/".join(current_path)].create_group(name)
            recursively_save_dictionary_to_hdf5_file(
                f,
                entry,
                current_path + [name],
                extra_keys=extra_keys,
                compression=compression,
            )
        else:
            # leaf value: carry across any extra metadata stored on a Dict
            attrs = dictionary.extra_kwargs if isinstance(dictionary, Dict) else {}
            create_hdf5_dataset(
                key=name,
                value=entry,
                hdf5_file=f,
                current_path=current_path,
                compression=compression,
                attrs=attrs,
            )
def create_hdf5_dataset(
    key, value, hdf5_file, current_path, compression=None, attrs=None
):
    """
    Create a hdf5 dataset in place

    Parameters
    ----------
    key: str
        Key for the new dataset
    value: array-like, str
        Data to store. If you wish to create a softlink to another dataset
        then value should be a string in the form `softlink:/path/to/dataset`
        where `/path/to/dataset/` is the path to the dataset which you wish to
        create a softlink to. A string of the form `external:file|/path`
        creates an external link to a dataset stored in another file
    hdf5_file: h5py.File
        hdf5 file object
    current_path: list
        list of path components within the hdf5 file under which the dataset
        is created
    compression: int, optional
        optional filter to apply for compression. If you do not want to
        apply compression, compression = None. Default None.
    attrs: dict, optional
        optional dictionary of attributes to store alongside the dataset

    Raises
    ------
    TypeError
        if `value` (or its elements) cannot be mapped to a hdf5 type
    """
    if attrs is None:
        # avoid a shared mutable default argument
        attrs = {}
    error_message = "Cannot process {}={} from list with type {} for hdf5"
    array_types = (list, pesummary.utils.samples_dict.Array, np.ndarray, tuple)
    numeric_types = (float, int, np.number)
    string_types = (str, bytes)
    SOFTLINK = False
    if isinstance(value, array_types):
        import math

        if not len(value):
            data = np.array([])
        elif isinstance(value[0], string_types):
            data = np.array(value, dtype="S")
        elif isinstance(value[0], array_types):
            data = np.array(np.vstack(value))
        elif isinstance(value[0], (tuple, np.record, np.recarray)):
            data = value
        elif all(isinstance(_value, (bool, np.bool_)) for _value in value):
            # booleans are stored as the byte strings b"True"/b"False"
            data = np.array([str(_value) for _value in value], dtype="S")
        elif all(_value is None for _value in value):
            data = np.array(["None"] * len(value), dtype="S")
        elif isinstance(value[0], np.void) and value.dtype.names:
            # structured array: stored as-is
            data = value
        elif isinstance(value[0], numeric_types) and math.isnan(value[0]):
            # BUGFIX: guard with isinstance so an unsupported element type
            # falls through to the informative TypeError below instead of
            # crashing inside math.isnan with a cryptic message
            data = np.array(["NaN"] * len(value), dtype="S")
        elif isinstance(value[0], numeric_types):
            data = np.array(value)
        else:
            raise TypeError(error_message.format(key, value, type(value[0])))
    elif isinstance(value, string_types[0]) and "softlink:" in value:
        import h5py

        SOFTLINK = True
        hdf5_file["/".join(current_path + [key])] = h5py.SoftLink(
            value.split("softlink:")[1]
        )
    elif isinstance(value, string_types[0]) and "external:" in value:
        import h5py

        SOFTLINK = True
        substring = value.split("external:")[1]
        _file, _path = substring.split("|")
        hdf5_file["/".join(current_path + [key])] = h5py.ExternalLink(_file, _path)
    elif isinstance(value, string_types):
        data = np.array([value], dtype="S")
    elif isinstance(value, numeric_types):
        data = np.array([value])
    elif isinstance(value, (bool, np.bool_)):
        data = np.array([str(value)], dtype="S")
    elif isinstance(value, complex):
        # NOTE(review): only the amplitude of a complex scalar is persisted;
        # the phase is discarded — confirm this is intentional upstream
        key += "_amp"
        data = np.array(np.abs(value))
    elif isinstance(value, np.random.Generator):
        # persist the bit-generator state so the rng can be reconstructed
        state = value.bit_generator.state
        data = np.array([
            f"state:{str(state['state']['state'])}",
            f"inc:{str(state['state']['inc'])}"
        ], dtype="S")
    elif value == {}:
        # empty dictionaries are stored as a 0-d "NaN" placeholder
        data = np.array(np.array("NaN"))
    elif inspect.isclass(value) or inspect.isfunction(value):
        data = np.array([value.__module__ + value.__name__], dtype="S")
    elif inspect.ismodule(value):
        data = np.array([value.__name__], dtype="S")
    elif value is None:
        data = np.array(["None"], dtype="S")
    else:
        raise TypeError(error_message.format(key, value, type(value)))
    if not SOFTLINK:
        # BUGFIX: skip compression for 0-d (scalar) data — len() is undefined
        # on unsized objects and gzip cannot be applied to scalar datasets
        if (
            compression is not None
            and np.ndim(data) > 0
            and len(data) > conf.compression_min_length
        ):
            kwargs = {"compression": "gzip", "compression_opts": compression}
        else:
            kwargs = {}
        try:
            dset = hdf5_file["/".join(current_path)].create_dataset(
                key, data=data, **kwargs
            )
        except ValueError:
            # current_path may not resolve (e.g. empty path); fall back to root
            dset = hdf5_file.create_dataset(key, data=data, **kwargs)
        if len(attrs):
            dset.attrs.update(attrs)
class PESummaryJsonEncoder(json.JSONEncoder):
    """Personalised JSON encoder for PESummary"""

    def default(self, obj):
        """Return a json serializable object for 'obj'

        Parameters
        ----------
        obj: object
            object you wish to make json serializable
        """
        # ordered (predicate, converter) dispatch table; order matters and
        # mirrors the checks the base encoder cannot handle natively
        dispatch = (
            (lambda candidate: isinstance(candidate, np.ndarray),
             lambda candidate: candidate.tolist()),
            (inspect.isfunction, str),
            (lambda candidate: isinstance(candidate, np.integer), int),
            (lambda candidate: isinstance(candidate, np.floating), float),
            (lambda candidate: isinstance(candidate, (bool, np.bool_)), str),
            (lambda candidate: isinstance(candidate, bytes), str),
            (lambda candidate: isinstance(candidate, type), str),
        )
        for matches, convert in dispatch:
            if matches(obj):
                return convert(obj)
        # anything else: defer to the stock encoder (raises TypeError)
        return super(PESummaryJsonEncoder, self).default(obj)
class _MetaFile(object):
    """This is a base class to handle the functions to generate a meta file"""

    def __init__(
        self,
        samples,
        labels,
        config,
        injection_data,
        file_versions,
        file_kwargs,
        webdir=None,
        result_files=None,
        hdf5=False,
        priors={},
        existing_version=None,
        existing_label=None,
        existing_samples=None,
        existing_injection=None,
        existing_metadata=None,
        existing_config=None,
        existing_priors={},
        existing_metafile=None,
        outdir=None,
        existing=None,
        package_information={},
        mcmc_samples=False,
        filename=None,
        external_hdf5_links=False,
        hdf5_compression=None,
        history=None,
        descriptions=None,
    ):
        """Store all data required to build the meta file.

        NOTE(review): the mutable defaults (priors={}, existing_priors={},
        package_information={}) are read but never mutated in place here, so
        the shared-default pitfall is not triggered — confirm before adding
        any in-place updates to them.
        """
        self.data = {}
        self.webdir = webdir
        self.result_files = result_files
        self.samples = samples
        self.labels = labels
        self.config = config
        self.injection_data = injection_data
        self.file_versions = file_versions
        self.file_kwargs = file_kwargs
        self.hdf5 = hdf5
        # setter derives a default name from self.hdf5 when filename is None
        self.file_name = filename
        self.external_hdf5_links = external_hdf5_links
        self.hdf5_compression = hdf5_compression
        self.history = history
        self.descriptions = descriptions
        if self.history is None:
            from pesummary.utils.utils import history_dictionary
            # getuser() can fail in restricted environments (no login entry)
            try:
                _user = getuser()
            except (ImportError, KeyError):
                _user = ""
            self.history = history_dictionary(creator=_user)
        if self.descriptions is None:
            self.descriptions = {label: "No description found" for label in self.labels}
        elif not all(label in self.descriptions.keys() for label in self.labels):
            # fill in a placeholder for any analysis lacking a description;
            # note this mutates the dictionary passed in by the caller
            for label in self.labels:
                if label not in self.descriptions.keys():
                    self.descriptions[label] = "No description found"
        self.priors = priors
        self.existing_version = existing_version
        self.existing_labels = existing_label
        self.existing_samples = existing_samples
        self.existing_injection = existing_injection
        self.existing_file_kwargs = existing_metadata
        self.existing_config = existing_config
        self.existing_priors = existing_priors
        self.existing_metafile = existing_metafile
        self.existing = existing
        # setter resolves the absolute path (falls back to webdir/samples)
        self.outdir = outdir
        self.package_information = package_information
        if not len(package_information):
            from pesummary.core.cli.inputs import _Input
            self.package_information = _Input.get_package_information()
        self.mcmc_samples = mcmc_samples
        if self.existing_labels is None:
            self.existing_labels = [None]
        if self.existing is not None:
            self.add_existing_data()

    @property
    def outdir(self):
        # absolute path to the directory where output files are written
        return self._outdir

    @outdir.setter
    def outdir(self, outdir):
        if outdir is not None:
            self._outdir = os.path.abspath(outdir)
        elif self.webdir is not None:
            # fall back to the 'samples' sub-directory of the web directory
            self._outdir = os.path.join(self.webdir, "samples")
        else:
            raise Exception("Please provide an output directory for the data")

    @property
    def file_name(self):
        # name (not path) of the meta file to be written
        return self._file_name

    @file_name.setter
    def file_name(self, file_name):
        if file_name is not None:
            self._file_name = file_name
        else:
            # default name: extension chosen from the requested format
            base = "posterior_samples.{}"
            if self.hdf5:
                self._file_name = base.format("h5")
            else:
                self._file_name = base.format("json")

    @property
    def meta_file(self):
        """Full path of the meta file that will be written"""
        return os.path.join(os.path.abspath(self.outdir), self.file_name)

    def make_dictionary(self):
        """Wrapper function for _make_dictionary"""
        self._make_dictionary()

    @property
    def _dictionary_structure(self):
        """Empty per-label skeleton plus file-level version/history entries"""
        if self.mcmc_samples:
            posterior = "mcmc_chains"
        else:
            posterior = "posterior_samples"
        dictionary = {
            label: {
                posterior: {},
                "injection_data": {},
                "version": {},
                "meta_data": {},
                "priors": {},
                "config_file": {},
            }
            for label in self.labels
        }
        dictionary["version"] = self.package_information
        dictionary["version"]["pesummary"] = [__version__]
        dictionary["history"] = self.history
        return dictionary

    def _make_dictionary(self):
        """Generate a single dictionary which stores all information"""
        if self.mcmc_samples:
            posterior = "mcmc_chains"
        else:
            posterior = "posterior_samples"
        dictionary = self._dictionary_structure
        if self.file_kwargs is not None and isinstance(self.file_kwargs, dict):
            if "webpage_url" in self.file_kwargs.keys():
                dictionary["history"]["webpage_url"] = self.file_kwargs["webpage_url"]
            else:
                dictionary["history"]["webpage_url"] = "None"
        for num, label in enumerate(self.labels):
            parameters = self.samples[label].keys()
            # transpose so rows are samples and columns are parameters
            samples = np.array([self.samples[label][i] for i in parameters]).T
            dictionary[label][posterior] = {
                "parameter_names": list(parameters),
                "samples": samples.tolist(),
            }
            dictionary[label]["injection_data"] = {
                "parameters": list(parameters),
                "samples": [self.injection_data[label][i] for i in parameters],
            }
            dictionary[label]["version"] = [self.file_versions[label]]
            dictionary[label]["description"] = [self.descriptions[label]]
            dictionary[label]["meta_data"] = self.file_kwargs[label]
            # a non-dict config entry is treated as a path to a config file
            if (
                self.config != {}
                and self.config[num] is not None
                and not isinstance(self.config[num], dict)
            ):
                config = self._grab_config_data_from_data_file(self.config[num])
                dictionary[label]["config_file"] = config
            elif self.config[num] is not None:
                dictionary[label]["config_file"] = self.config[num]
            for key in self.priors.keys():
                if label in self.priors[key].keys():
                    dictionary[label]["priors"][key] = self.priors[key][label]
        self.data = dictionary

    @staticmethod
    @open_config(index=0)
    def _grab_config_data_from_data_file(file):
        """Return the config data as a dictionary

        Parameters
        ----------
        file: str
            path to the configuration file
        """
        # the open_config decorator replaces `file` with a parsed config object
        config = file
        sections = config.sections()
        data = {}
        if config.error:
            logger.info(
                "Unable to open %s with configparser because %s. The data will "
                "not be stored in the meta file" % (config.path_to_file, config.error)
            )
        if sections != []:
            for i in sections:
                data[i] = {}
                for key in config["%s" % (i)]:
                    data[i][key] = config["%s" % (i)]["%s" % (key)]
        return data

    @staticmethod
    def write_to_dat(file_name, samples, header=None):
        """Write samples to a .dat file

        Parameters
        ----------
        file_name: str
            the name of the file that you wish to write the samples to
        samples: np.ndarray
            1d/2d array of samples to write to file
        header: list, optional
            List of strings to write at the beginning of the file
        """
        np.savetxt(
            file_name,
            samples,
            delimiter=conf.delimiter,
            header=conf.delimiter.join(header),
            comments="",
        )

    @staticmethod
    def _convert_posterior_samples_to_numpy(dictionary, mcmc_samples=False, index=None):
        """Convert the posterior samples from a column-major dictionary
        to a row-major numpy array

        Parameters
        ----------
        dictionary: dict
            dictionary of posterior samples to convert to a numpy array.
        mcmc_samples: Bool, optional
            if True, the dictionary contains separate mcmc chains

        Examples
        --------
        >>> dictionary = {"mass_1": [1,2,3], "mass_2": [1,2,3]}
        >>> dictionary = _MetaFile._convert_posterior_samples_to_numpy(
        ...     dictionary
        ... )
        >>> print(dictionary)
        ... rec.array([(1., 1.), (2., 2.), (3., 3.)],
        ...           dtype=[('mass_1', '<f4'), ('mass_2', '<f4')])
        """
        # deepcopy so the caller's dictionary is never modified
        samples = copy.deepcopy(dictionary)
        if mcmc_samples:
            # one structured array per chain
            parameters = list(samples.keys())
            chains = samples[parameters[0]].keys()
            data = {
                key: SamplesDict({param: samples[param][key] for param in parameters})
                for key in chains
            }
            return {key: item.to_structured_array() for key, item in data.items()}
        return samples.to_structured_array(index=index)

    @staticmethod
    def _create_softlinks(dictionary):
        """Identify duplicated entries in a dictionary and replace them with
        `softlink:/path/to/existing_dataset`. This is required for creating
        softlinks when saved in hdf5 format

        Parameters
        ----------
        dictionary: dict
            nested dictionary of data
        """
        # location of nested_to_record moved between pandas versions
        try:
            from pandas.io.json._normalize import nested_to_record
        except ImportError:
            from pandas.io.json.normalize import nested_to_record

        def modify_dict(key, dictionary, replace):
            """Return a copy of `dictionary` with the value at the
            '/'-separated path `key` replaced by `replace`"""
            from functools import reduce
            from operator import getitem
            mod = copy.deepcopy(dictionary)
            key_list = key.split("/")
            reduce(getitem, key_list[:-1], mod)[key_list[-1]] = replace
            return mod

        data = copy.deepcopy(dictionary)
        flat_dictionary = nested_to_record(data, sep="/")
        # invert the mapping: value -> set of paths holding that value
        rev_dictionary = {}
        for key, value in flat_dictionary.items():
            try:
                rev_dictionary.setdefault(value, set()).add(key)
            except TypeError:
                # unhashable values are keyed by their string representation
                rev_dictionary.setdefault(str(value), set()).add(key)
        # NOTE(review): which duplicate is kept as the real dataset depends on
        # set iteration order and is therefore not deterministic — confirm
        # callers do not rely on a particular path surviving
        for key, values in rev_dictionary.items():
            if len(values) > 1:
                tmp = list(values)
                for val in tmp[1:]:
                    data = modify_dict(val, data, "softlink:/{}".format(tmp[0]))
        return data

    def write_marginalized_posterior_to_dat(self):
        """Write the marginalized posterior for each parameter to a .dat file"""
        if self.mcmc_samples:
            return
        for label in self.labels:
            if not os.path.isdir(os.path.join(self.outdir, label)):
                make_dir(os.path.join(self.outdir, label))
            # one file per parameter: <outdir>/<label>/<label>_<param>.dat
            for param, samples in self.samples[label].items():
                self.write_to_dat(
                    os.path.join(self.outdir, label, "{}_{}.dat".format(label, param)),
                    samples,
                    header=[param],
                )

    @staticmethod
    def save_to_json(data, meta_file):
        """Save the metafile as a json file"""
        with open(meta_file, "w") as f:
            json.dump(data, f, indent=4, sort_keys=True, cls=PESummaryJsonEncoder)

    @staticmethod
    def _seperate_dictionary_for_external_links(
        data, labels, sub_file_name="_{label}.h5"
    ):
        """Split `data` into a top-level dictionary (with `external:` link
        placeholders for each label) and per-label sub-file dictionaries"""
        _data = copy.deepcopy(data)
        sub_file_data = {
            label: {
                label: _data[label],
                "version": _data["version"],
                "history": _data["history"],
            }
            for label in labels
        }
        meta_file_data = {key: item for key, item in _data.items() if key not in labels}
        for label in labels:
            # placeholder understood by create_hdf5_dataset
            meta_file_data[label] = "external:{}|{}".format(
                sub_file_name.format(label=label), label
            )
        return meta_file_data, sub_file_data

    @staticmethod
    def convert_posterior_samples_to_numpy(labels, samples, mcmc_samples=False):
        """Convert a dictionary of multiple posterior samples from a
        column-major dictionary to a row-major numpy array

        Parameters
        ----------
        labels: list
            list of unique labels for each analysis
        samples: MultiAnalysisSamplesDict
            dictionary of multiple posterior samples to convert to a numpy
            array.
        mcmc_samples: Bool, optional
            if True, the dictionary contains separate mcmc chains

        Examples
        --------
        >>> dictionary = MultiAnalysisSamplesDict(
        ...     {"label": {"mass_1": [1,2,3], "mass_2": [1,2,3]}}
        ... )
        >>> dictionary = _MetaFile.convert_posterior_samples_to_numpy(
        ...     dictionary.keys(), dictionary
        ... )
        >>> print(dictionary)
        ... {"label": rec.array([(1., 1.), (2., 2.), (3., 3.)],
        ...           dtype=[('mass_1', '<f4'), ('mass_2', '<f4')])}
        """
        converted_samples = {
            label: _MetaFile._convert_posterior_samples_to_numpy(
                samples[label], mcmc_samples=mcmc_samples
            )
            for label in labels
        }
        return converted_samples

    @staticmethod
    def save_to_hdf5(
        data,
        labels,
        samples,
        meta_file,
        no_convert=False,
        extra_keys=DEFAULT_HDF5_KEYS,
        mcmc_samples=False,
        external_hdf5_links=False,
        compression=None,
        _class=None,
    ):
        """Save the metafile as a hdf5 file"""
        import h5py
        # _class allows subclasses to supply their own conversion routines
        if _class is None:
            _class = _MetaFile
        if mcmc_samples:
            key = "mcmc_chains"
        else:
            key = "posterior_samples"
        if not no_convert:
            # replace column-major sample dictionaries with structured arrays;
            # note this mutates `data` in place before writing
            _samples = _class.convert_posterior_samples_to_numpy(
                labels, samples, mcmc_samples=mcmc_samples
            )
            for label in labels:
                data[label][key] = _samples[label]
                if "injection_data" in data[label].keys():
                    data[label]["injection_data"] = (
                        _class._convert_posterior_samples_to_numpy(
                            SamplesDict(
                                {
                                    param: samp
                                    for param, samp in zip(
                                        data[label]["injection_data"]["parameters"],
                                        data[label]["injection_data"]["samples"],
                                    )
                                }
                            ),
                            index=[0],
                        )
                    )
        if external_hdf5_links:
            from pathlib import Path
            # write one sub-file per label next to the main meta file and
            # link to them via h5py ExternalLinks
            _dir = Path(meta_file).parent
            name = "_{label}.h5"
            _subfile = os.path.join(_dir, name)
            meta_file_data, sub_file_data = (
                _MetaFile._seperate_dictionary_for_external_links(
                    data, labels, sub_file_name=name
                )
            )
            for label in labels:
                with h5py.File(_subfile.format(label=label), "w") as f:
                    recursively_save_dictionary_to_hdf5_file(
                        f,
                        sub_file_data[label],
                        extra_keys=extra_keys + [label],
                        compression=compression,
                    )
            with h5py.File(meta_file, "w") as f:
                recursively_save_dictionary_to_hdf5_file(
                    f, meta_file_data, extra_keys=extra_keys, compression=compression
                )
        else:
            with h5py.File(meta_file, "w") as f:
                recursively_save_dictionary_to_hdf5_file(
                    f, data, extra_keys=extra_keys + labels, compression=compression
                )

    def save_to_dat(self):
        """Save the samples to a .dat file"""
        def _save(parameters, samples, label):
            """Helper function to save the parameters and samples to file"""
            self.write_to_dat(
                os.path.join(self.outdir, "{}_pesummary.dat".format(label)),
                samples.T,
                header=parameters,
            )

        if self.mcmc_samples:
            # one output file per chain rather than per label
            for label in self.labels:
                parameters = list(self.samples[label].keys())
                for chain in self.samples[label][parameters[0]].keys():
                    samples = np.array(
                        [self.samples[label][i][chain] for i in parameters]
                    )
                    _save(parameters, samples, chain)
            return
        for label in self.labels:
            parameters = self.samples[label].keys()
            samples = np.array([self.samples[label][i] for i in parameters])
            _save(parameters, samples, label)

    def add_existing_data(self):
        """Merge data from an existing meta file into this object"""
        from pesummary.utils.utils import _add_existing_data
        # NOTE(review): rebinding the local name `self` has no effect outside
        # this method — presumably _add_existing_data mutates the instance in
        # place and the assignment is redundant; confirm before relying on it
        self = _add_existing_data(self)
class MetaFile(object):
    """This class handles the creation of a metafile storing all information
    from the analysis
    """

    def __init__(self, inputs, history=None):
        from pesummary.utils.utils import logger

        logger.info("Starting to generate the meta file")
        # collect all optional settings pulled from the inputs object
        options = dict(
            hdf5=inputs.hdf5,
            webdir=inputs.webdir,
            result_files=inputs.result_files,
            existing_version=inputs.existing_file_version,
            existing_label=inputs.existing_labels,
            priors=inputs.priors,
            existing_samples=inputs.existing_samples,
            existing_injection=inputs.existing_injection_data,
            existing_metadata=inputs.existing_file_kwargs,
            existing_config=inputs.existing_config,
            existing=inputs.existing,
            existing_priors=inputs.existing_priors,
            existing_metafile=inputs.existing_metafile,
            package_information=inputs.package_information,
            mcmc_samples=inputs.mcmc_samples,
            filename=inputs.filename,
            external_hdf5_links=inputs.external_hdf5_links,
            hdf5_compression=inputs.hdf5_compression,
            history=history,
            descriptions=inputs.descriptions,
        )
        writer = _MetaFile(
            inputs.samples,
            inputs.labels,
            inputs.config,
            inputs.injection_data,
            inputs.file_version,
            inputs.file_kwargs,
            **options
        )
        writer.make_dictionary()
        # write in the requested format
        if inputs.hdf5:
            writer.save_to_hdf5(
                writer.data,
                writer.labels,
                writer.samples,
                writer.meta_file,
                mcmc_samples=writer.mcmc_samples,
                external_hdf5_links=writer.external_hdf5_links,
                compression=writer.hdf5_compression,
            )
        else:
            writer.save_to_json(writer.data, writer.meta_file)
        writer.save_to_dat()
        writer.write_marginalized_posterior_to_dat()
        logger.info(
            "Finishing generating the meta file. The meta file can be viewed "
            "here: {}".format(writer.meta_file)
        )