Coverage for pesummary/core/file/meta_file.py: 83.9%
367 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-05 13:38 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import inspect
4import os
5import numpy as np
6import json
7import copy
8from getpass import getuser
9import pandas as pd
10import pesummary
11from pesummary import __version__
12from pesummary.utils.dict import Dict
13from pesummary.utils.samples_dict import SamplesDict
14from pesummary.utils.utils import make_dir, logger
15from pesummary.utils.decorators import open_config
16from pesummary import conf
18__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
19DEFAULT_HDF5_KEYS = ["version", "history"]
def recursively_save_dictionary_to_hdf5_file(
    f, dictionary, current_path=None, extra_keys=DEFAULT_HDF5_KEYS,
    compression=None
):
    """Recursively write a nested dictionary to an open hdf5 file

    Parameters
    ----------
    f: h5py._hl.files.File
        the open hdf5 file that you would like the data to be saved to
    dictionary: dict
        nested dictionary of data to store
    current_path: list, optional
        list of keys giving the level at which to save the data in the
        hdf5 file
    extra_keys: list, optional
        reserved top-level keys whose groups are created up front
    compression: int, optional
        optional filter to apply for compression. If you do not want to
        apply compression, compression = None. Default None.
    """
    # make sure every reserved top-level group exists before descending
    for group in extra_keys:
        if group in dictionary and group not in f.keys():
            f.create_group(group)
    if current_path is None:
        current_path = []
    for name, entry in dictionary.items():
        # DataFrames are stored column-major, like any other dict of arrays
        if isinstance(entry, pd.DataFrame):
            entry = entry.to_dict(orient="list")
        if not isinstance(entry, dict):
            # leaf node: store as a dataset, carrying across any extra
            # kwargs attached to a pesummary Dict as dataset attributes
            extra = dictionary.extra_kwargs if isinstance(dictionary, Dict) else {}
            create_hdf5_dataset(
                key=name, value=entry, hdf5_file=f, current_path=current_path,
                compression=compression, attrs=extra
            )
            continue
        # sub-dictionary: ensure the group exists, then recurse one level down
        if name not in f["/" + "/".join(current_path)].keys():
            f["/".join(current_path)].create_group(name)
        recursively_save_dictionary_to_hdf5_file(
            f, entry, current_path + [name], extra_keys=extra_keys,
            compression=compression
        )
def create_hdf5_dataset(
    key, value, hdf5_file, current_path, compression=None, attrs=None
):
    """
    Create a hdf5 dataset in place

    Parameters
    ----------
    key: str
        Key for the new dataset
    value: array-like, str
        Data to store. If you wish to create a softlink to another dataset
        then value should be a string in the form `softlink:/path/to/dataset`
        where `/path/to/dataset/` is the path to the dataset which you wish to
        create a softlink to. A string of the form `external:file|path`
        creates an external link to `path` inside `file` instead
    hdf5_file: h5py.File
        hdf5 file object
    current_path: list
        list of keys giving the current location within the hdf5 file
    compression: int, optional
        optional filter to apply for compression. If you do not want to
        apply compression, compression = None. Default None.
    attrs: dict, optional
        optional dictionary of attributes to store alongside the dataset.
        Default None
    """
    if attrs is None:
        # avoid a shared mutable default argument
        attrs = {}
    error_message = "Cannot process {}={} from list with type {} for hdf5"
    array_types = (list, pesummary.utils.samples_dict.Array, np.ndarray)
    numeric_types = (float, int, np.number)
    string_types = (str, bytes)
    SOFTLINK = False
    if isinstance(value, array_types):
        import math

        # branch order matters: each case below only sees values the
        # earlier cases rejected
        if not len(value):
            data = np.array([])
        elif isinstance(value[0], string_types):
            data = np.array(value, dtype="S")
        elif isinstance(value[0], array_types):
            data = np.array(np.vstack(value))
        elif isinstance(value[0], (tuple, np.record, np.recarray)):
            data = value
        elif all(isinstance(_value, (bool, np.bool_)) for _value in value):
            # booleans are stored as their string representation
            data = np.array([str(_value) for _value in value], dtype="S")
        elif all(_value is None for _value in value):
            data = np.array(["None"] * len(value), dtype="S")
        elif isinstance(value[0], np.void) and value.dtype.names:
            # structured array (e.g. converted posterior samples)
            data = value
        elif math.isnan(value[0]):
            data = np.array(["NaN"] * len(value), dtype="S")
        elif isinstance(value[0], numeric_types):
            data = np.array(value)
        else:
            raise TypeError(error_message.format(key, value, type(value[0])))
    elif isinstance(value, str) and "softlink:" in value:
        import h5py

        SOFTLINK = True
        hdf5_file["/".join(current_path + [key])] = h5py.SoftLink(
            value.split("softlink:")[1]
        )
    elif isinstance(value, str) and "external:" in value:
        import h5py

        SOFTLINK = True
        substring = value.split("external:")[1]
        _file, _path = substring.split("|")
        hdf5_file["/".join(current_path + [key])] = h5py.ExternalLink(
            _file, _path
        )
    elif isinstance(value, string_types):
        data = np.array([value], dtype="S")
    elif isinstance(value, numeric_types):
        data = np.array([value])
    elif isinstance(value, (bool, np.bool_)):
        data = np.array([str(value)], dtype="S")
    elif isinstance(value, complex):
        # NOTE(review): only the amplitude is stored (key renamed to
        # '<key>_amp'); the phase is discarded — confirm this is intended
        key += "_amp"
        data = np.array(np.abs(value))
    elif isinstance(value, np.random.Generator):
        # persist the bit generator state so the generator can be restored
        state = value.bit_generator.state
        data = np.array([
            f"state:{str(state['state']['state'])}",
            f"inc:{str(state['state']['inc'])}"
        ], dtype="S")
    elif value == {}:
        # empty dictionaries are stored as a 'NaN' placeholder
        data = np.array(np.array("NaN"))
    elif inspect.isclass(value) or inspect.isfunction(value):
        data = np.array([value.__module__ + value.__name__], dtype="S")
    elif inspect.ismodule(value):
        data = np.array([value.__name__], dtype="S")
    elif value is None:
        data = np.array(["None"], dtype="S")
    else:
        raise TypeError(error_message.format(key, value, type(value)))
    if not SOFTLINK:
        # only compress datasets long enough for gzip to be worthwhile
        if compression is not None and len(data) > conf.compression_min_length:
            kwargs = {"compression": "gzip", "compression_opts": compression}
        else:
            kwargs = {}
        try:
            dset = hdf5_file["/".join(current_path)].create_dataset(
                key, data=data, **kwargs
            )
        except ValueError:
            # fall back to the file root when the joined path is invalid
            dset = hdf5_file.create_dataset(key, data=data, **kwargs)
        if len(attrs):
            dset.attrs.update(attrs)
class PESummaryJsonEncoder(json.JSONEncoder):
    """Personalised JSON encoder for PESummary
    """
    def default(self, obj):
        """Return a json serializable representation of 'obj'

        Parameters
        ----------
        obj: object
            object you wish to make json serializable
        """
        # numpy containers/scalars are converted to the matching builtin
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if inspect.isfunction(obj):
            return str(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        # everything else we know how to handle becomes its string form
        if isinstance(obj, (bool, np.bool_, bytes, type)):
            return str(obj)
        return super(PESummaryJsonEncoder, self).default(obj)
class _MetaFile(object):
    """This is a base class to handle the functions to generate a meta file

    Parameters
    ----------
    samples: dict
        dictionary of posterior samples keyed on analysis label
    labels: list
        list of unique labels, one per analysis
    config: dict/list
        configuration data, indexed per analysis
    injection_data: dict
        dictionary of injected values keyed on analysis label
    file_versions: dict
        dictionary of result-file versions keyed on analysis label
    file_kwargs: dict
        dictionary of metadata keyed on analysis label
    webdir: str, optional
        web directory; when `outdir` is not given, data is written to
        `webdir`/samples
    hdf5: Bool, optional
        if True, the default file name uses the '.h5' extension
    priors: dict, optional
        dictionary of priors keyed on prior type then analysis label
    mcmc_samples: Bool, optional
        if True, samples are treated as separate mcmc chains
    external_hdf5_links: Bool, optional
        if True, each analysis is written to its own hdf5 sub-file
    hdf5_compression: int, optional
        gzip compression level passed to dataset creation
    history: dict, optional
        history metadata; generated from the current user when None
    descriptions: dict, optional
        per-label descriptions; missing entries default to
        'No description found'
    """
    def __init__(
        self, samples, labels, config, injection_data, file_versions,
        file_kwargs, webdir=None, result_files=None, hdf5=False, priors={},
        existing_version=None, existing_label=None, existing_samples=None,
        existing_injection=None, existing_metadata=None, existing_config=None,
        existing_priors={}, existing_metafile=None, outdir=None, existing=None,
        package_information={}, mcmc_samples=False, filename=None,
        external_hdf5_links=False, hdf5_compression=None, history=None,
        descriptions=None
    ):
        self.data = {}
        # NOTE: webdir must be set before outdir; the outdir setter falls
        # back to webdir/samples when outdir is None
        self.webdir = webdir
        self.result_files = result_files
        self.samples = samples
        self.labels = labels
        self.config = config
        self.injection_data = injection_data
        self.file_versions = file_versions
        self.file_kwargs = file_kwargs
        # hdf5 must be set before file_name; the file_name setter uses it
        # to pick the default extension
        self.hdf5 = hdf5
        self.file_name = filename
        self.external_hdf5_links = external_hdf5_links
        self.hdf5_compression = hdf5_compression
        self.history = history
        self.descriptions = descriptions
        if self.history is None:
            from pesummary.utils.utils import history_dictionary

            # getuser can fail in restricted environments; fall back to an
            # empty creator string
            try:
                _user = getuser()
            except (ImportError, KeyError):
                _user = ''
            self.history = history_dictionary(creator=_user)
        if self.descriptions is None:
            self.descriptions = {
                label: "No description found" for label in self.labels
            }
        elif not all(label in self.descriptions.keys() for label in self.labels):
            # fill in a placeholder for any label without a description
            for label in self.labels:
                if label not in self.descriptions.keys():
                    self.descriptions[label] = "No description found"
        self.priors = priors
        self.existing_version = existing_version
        self.existing_labels = existing_label
        self.existing_samples = existing_samples
        self.existing_injection = existing_injection
        self.existing_file_kwargs = existing_metadata
        self.existing_config = existing_config
        self.existing_priors = existing_priors
        self.existing_metafile = existing_metafile
        self.existing = existing
        self.outdir = outdir
        self.package_information = package_information
        if not len(package_information):
            from pesummary.core.cli.inputs import _Input

            self.package_information = _Input.get_package_information()
        self.mcmc_samples = mcmc_samples

        if self.existing_labels is None:
            self.existing_labels = [None]
        if self.existing is not None:
            self.add_existing_data()

    @property
    def outdir(self):
        """str: absolute path to the directory the meta file is written to"""
        return self._outdir

    @outdir.setter
    def outdir(self, outdir):
        # prefer an explicit outdir, fall back to webdir/samples, otherwise
        # fail loudly since there is nowhere to write the data
        if outdir is not None:
            self._outdir = os.path.abspath(outdir)
        elif self.webdir is not None:
            self._outdir = os.path.join(self.webdir, "samples")
        else:
            raise Exception("Please provide an output directory for the data")

    @property
    def file_name(self):
        """str: name of the meta file"""
        return self._file_name

    @file_name.setter
    def file_name(self, file_name):
        if file_name is not None:
            self._file_name = file_name
        else:
            # default name; extension depends on the requested format
            base = "posterior_samples.{}"
            if self.hdf5:
                self._file_name = base.format("h5")
            else:
                self._file_name = base.format("json")

    @property
    def meta_file(self):
        """str: full path to the meta file"""
        return os.path.join(os.path.abspath(self.outdir), self.file_name)

    def make_dictionary(self):
        """Wrapper function for _make_dictionary
        """
        self._make_dictionary()

    @property
    def _dictionary_structure(self):
        """dict: skeleton dictionary with one entry per label plus
        file-level 'version' and 'history' entries"""
        if self.mcmc_samples:
            posterior = "mcmc_chains"
        else:
            posterior = "posterior_samples"
        dictionary = {
            label: {
                posterior: {}, "injection_data": {}, "version": {},
                "meta_data": {}, "priors": {}, "config_file": {}
            } for label in self.labels
        }
        dictionary["version"] = self.package_information
        dictionary["version"]["pesummary"] = [__version__]
        dictionary["history"] = self.history
        return dictionary

    def _make_dictionary(self):
        """Generate a single dictionary which stores all information
        """
        if self.mcmc_samples:
            posterior = "mcmc_chains"
        else:
            posterior = "posterior_samples"
        dictionary = self._dictionary_structure
        if self.file_kwargs is not None and isinstance(self.file_kwargs, dict):
            if "webpage_url" in self.file_kwargs.keys():
                dictionary["history"]["webpage_url"] = self.file_kwargs["webpage_url"]
            else:
                dictionary["history"]["webpage_url"] = "None"
        for num, label in enumerate(self.labels):
            parameters = self.samples[label].keys()
            # transpose so each row is one sample across all parameters
            samples = np.array([self.samples[label][i] for i in parameters]).T
            dictionary[label][posterior] = {
                "parameter_names": list(parameters), "samples": samples.tolist()
            }
            dictionary[label]["injection_data"] = {
                "parameters": list(parameters),
                "samples": [
                    self.injection_data[label][i] for i in parameters
                ]
            }
            dictionary[label]["version"] = [self.file_versions[label]]
            dictionary[label]["description"] = [self.descriptions[label]]
            dictionary[label]["meta_data"] = self.file_kwargs[label]
            # a non-dict config entry is a path to a config file that still
            # needs to be parsed; a dict is stored as-is
            if self.config != {} and self.config[num] is not None and \
                    not isinstance(self.config[num], dict):
                config = self._grab_config_data_from_data_file(self.config[num])
                dictionary[label]["config_file"] = config
            elif self.config[num] is not None:
                dictionary[label]["config_file"] = self.config[num]
            for key in self.priors.keys():
                if label in self.priors[key].keys():
                    dictionary[label]["priors"][key] = self.priors[key][label]
        self.data = dictionary

    @staticmethod
    @open_config(index=0)
    def _grab_config_data_from_data_file(file):
        """Return the config data as a dictionary

        Parameters
        ----------
        file: str
            path to the configuration file; the open_config decorator
            replaces it with an opened configparser-like object
        """
        config = file
        sections = config.sections()
        data = {}
        if config.error:
            # parsing failed; log and return whatever sections (if any)
            # were read
            logger.info(
                "Unable to open %s with configparser because %s. The data will "
                "not be stored in the meta file" % (
                    config.path_to_file, config.error
                )
            )
        if sections != []:
            for i in sections:
                data[i] = {}
                for key in config["%s" % (i)]:
                    data[i][key] = config["%s" % (i)]["%s" % (key)]
        return data

    @staticmethod
    def write_to_dat(file_name, samples, header=None):
        """Write samples to a .dat file

        Parameters
        ----------
        file_name: str
            the name of the file that you wish to write the samples to
        samples: np.ndarray
            1d/2d array of samples to write to file
        header: list, optional
            List of strings to write at the beginning of the file
        """
        np.savetxt(
            file_name, samples, delimiter=conf.delimiter,
            header=conf.delimiter.join(header), comments=""
        )

    @staticmethod
    def _convert_posterior_samples_to_numpy(
        dictionary, mcmc_samples=False, index=None
    ):
        """Convert the posterior samples from a column-major dictionary
        to a row-major numpy array

        Parameters
        ----------
        dictionary: dict
            dictionary of posterior samples to convert to a numpy array.
        mcmc_samples: Bool, optional
            if True, the dictionary contains separate mcmc chains
        index: list, optional
            passed through to `to_structured_array`

        Examples
        --------
        >>> dictionary = {"mass_1": [1,2,3], "mass_2": [1,2,3]}
        >>> dictionry = _Metafile._convert_posterior_samples_to_numpy(
        ...     dictionary
        ... )
        >>> print(dictionary)
        ... rec.array([(1., 1.), (2., 2.), (3., 3.)],
        ...     dtype=[('mass_1', '<f4'), ('mass_2', '<f4')])
        """
        # deepcopy so the caller's dictionary is left untouched
        samples = copy.deepcopy(dictionary)
        if mcmc_samples:
            # regroup {param: {chain: samples}} into one SamplesDict per
            # chain, then convert each chain separately
            parameters = list(samples.keys())
            chains = samples[parameters[0]].keys()
            data = {
                key: SamplesDict({
                    param: samples[param][key] for param in parameters
                }) for key in chains
            }
            return {
                key: item.to_structured_array() for key, item in data.items()
            }
        return samples.to_structured_array(index=index)

    @staticmethod
    def _create_softlinks(dictionary):
        """Identify duplicated entries in a dictionary and replace them with
        `softlink:/path/to/existing_dataset`. This is required for creating
        softlinks when saved in hdf5 format

        Parameters
        ----------
        dictionary: dict
            nested dictionary of data
        """
        # pandas moved nested_to_record between versions
        try:
            from pandas.io.json._normalize import nested_to_record
        except ImportError:
            from pandas.io.json.normalize import nested_to_record

        def modify_dict(key, dictionary, replace):
            """Return a copy of `dictionary` with the entry at the
            '/'-separated `key` path replaced by `replace`
            """
            from functools import reduce
            from operator import getitem

            mod = copy.deepcopy(dictionary)
            key_list = key.split("/")
            reduce(getitem, key_list[:-1], mod)[key_list[-1]] = replace
            return mod

        data = copy.deepcopy(dictionary)
        # flatten to {'a/b/c': value} so identical values can be grouped
        flat_dictionary = nested_to_record(data, sep='/')
        rev_dictionary = {}
        for key, value in flat_dictionary.items():
            # unhashable values are grouped by their string representation
            try:
                rev_dictionary.setdefault(value, set()).add(key)
            except TypeError:
                rev_dictionary.setdefault(str(value), set()).add(key)

        for key, values in rev_dictionary.items():
            if len(values) > 1:
                # keep the first path, replace the rest with softlinks to it
                tmp = list(values)
                for val in tmp[1:]:
                    data = modify_dict(val, data, "softlink:/{}".format(tmp[0]))
        return data

    def write_marginalized_posterior_to_dat(self):
        """Write the marginalized posterior for each parameter to a .dat file
        """
        # not meaningful for separate mcmc chains
        if self.mcmc_samples:
            return
        for label in self.labels:
            if not os.path.isdir(os.path.join(self.outdir, label)):
                make_dir(os.path.join(self.outdir, label))
            # one file per parameter: <outdir>/<label>/<label>_<param>.dat
            for param, samples in self.samples[label].items():
                self.write_to_dat(
                    os.path.join(
                        self.outdir, label, "{}_{}.dat".format(label, param)
                    ), samples, header=[param]
                )

    @staticmethod
    def save_to_json(data, meta_file):
        """Save the metafile as a json file
        """
        with open(meta_file, "w") as f:
            json.dump(
                data, f, indent=4, sort_keys=True,
                cls=PESummaryJsonEncoder
            )

    @staticmethod
    def _seperate_dictionary_for_external_links(
        data, labels, sub_file_name="_{label}.h5"
    ):
        """Split `data` into a main meta-file dictionary plus one
        dictionary per label destined for a separate hdf5 sub-file; the
        per-label entries in the main dictionary become
        'external:<sub_file>|<label>' link placeholders
        """
        _data = copy.deepcopy(data)
        # each sub-file carries the label data plus the shared version and
        # history entries
        sub_file_data = {
            label: {
                label: _data[label], "version": _data["version"],
                "history": _data["history"]
            } for label in labels
        }
        meta_file_data = {
            key: item for key, item in _data.items() if key not in labels
        }
        for label in labels:
            meta_file_data[label] = "external:{}|{}".format(
                sub_file_name.format(label=label), label
            )
        return meta_file_data, sub_file_data

    @staticmethod
    def convert_posterior_samples_to_numpy(labels, samples, mcmc_samples=False):
        """Convert a dictionary of multiple posterior samples from a
        column-major dictionary to a row-major numpy array

        Parameters
        ----------
        labels: list
            list of unique labels for each analysis
        samples: MultiAnalysisSamplesDict
            dictionary of multiple posterior samples to convert to a numpy
            array.
        mcmc_samples: Bool, optional
            if True, the dictionary contains separate mcmc chains

        Examples
        --------
        >>> dictionary = MultiAnalysisSamplesDict(
        ...     {"label": {"mass_1": [1,2,3], "mass_2": [1,2,3]}}
        ... )
        >>> dictionary = _Metafile.convert_posterior_samples_to_numpy(
        ...     dictionary.keys(), dictionary
        ... )
        >>> print(dictionary)
        ... {"label": rec.array([(1., 1.), (2., 2.), (3., 3.)],
        ...     dtype=[('mass_1', '<f4'), ('mass_2', '<f4')])}
        """
        converted_samples = {
            label: _MetaFile._convert_posterior_samples_to_numpy(
                samples[label], mcmc_samples=mcmc_samples
            ) for label in labels
        }
        return converted_samples

    @staticmethod
    def save_to_hdf5(
        data, labels, samples, meta_file, no_convert=False,
        extra_keys=DEFAULT_HDF5_KEYS, mcmc_samples=False,
        external_hdf5_links=False, compression=None, _class=None
    ):
        """Save the metafile as a hdf5 file
        """
        import h5py

        # _class hook lets subclasses reuse this method with their own
        # conversion routines
        if _class is None:
            _class = _MetaFile
        if mcmc_samples:
            key = "mcmc_chains"
        else:
            key = "posterior_samples"
        if not no_convert:
            # NOTE: mutates `data` in place, replacing the column-major
            # sample dictionaries with structured numpy arrays
            _samples = _class.convert_posterior_samples_to_numpy(
                labels, samples, mcmc_samples=mcmc_samples
            )
            for label in labels:
                data[label][key] = _samples[label]
                if "injection_data" in data[label].keys():
                    data[label]["injection_data"] = \
                        _class._convert_posterior_samples_to_numpy(
                            SamplesDict({
                                param: samp for param, samp in zip(
                                    data[label]["injection_data"]["parameters"],
                                    data[label]["injection_data"]["samples"]
                                )
                            }), index=[0]
                        )
        if external_hdf5_links:
            from pathlib import Path

            # write one '_<label>.h5' sub-file per analysis, next to the
            # main meta file, and link to them from the main file
            _dir = Path(meta_file).parent
            name = "_{label}.h5"
            _subfile = os.path.join(_dir, name)
            meta_file_data, sub_file_data = (
                _MetaFile._seperate_dictionary_for_external_links(
                    data, labels, sub_file_name=name
                )
            )
            for label in labels:
                with h5py.File(_subfile.format(label=label), "w") as f:
                    recursively_save_dictionary_to_hdf5_file(
                        f, sub_file_data[label], extra_keys=extra_keys + [label],
                        compression=compression
                    )
            with h5py.File(meta_file, "w") as f:
                recursively_save_dictionary_to_hdf5_file(
                    f, meta_file_data, extra_keys=extra_keys,
                    compression=compression
                )
        else:
            with h5py.File(meta_file, "w") as f:
                recursively_save_dictionary_to_hdf5_file(
                    f, data, extra_keys=extra_keys + labels,
                    compression=compression
                )

    def save_to_dat(self):
        """Save the samples to a .dat file
        """
        def _save(parameters, samples, label):
            """Helper function to save the parameters and samples to file
            """
            self.write_to_dat(
                os.path.join(self.outdir, "{}_pesummary.dat".format(label)),
                samples.T, header=parameters
            )

        if self.mcmc_samples:
            # one file per chain rather than per label
            for label in self.labels:
                parameters = list(self.samples[label].keys())
                for chain in self.samples[label][parameters[0]].keys():
                    samples = np.array(
                        [self.samples[label][i][chain] for i in parameters]
                    )
                    _save(parameters, samples, chain)
            return
        for label in self.labels:
            parameters = self.samples[label].keys()
            samples = np.array([self.samples[label][i] for i in parameters])
            _save(parameters, samples, label)

    def add_existing_data(self):
        """Combine data from an existing metafile with this object
        """
        from pesummary.utils.utils import _add_existing_data

        # NOTE(review): rebinding `self` only affects the local name;
        # presumably `_add_existing_data` mutates the instance in place —
        # confirm against its implementation
        self = _add_existing_data(self)
class MetaFile(object):
    """This class handles the creation of a metafile storing all information
    from the analysis

    Parameters
    ----------
    inputs: pesummary inputs object
        object holding the samples, labels, configs and output settings
        collected from the command line
    history: dict, optional
        history metadata to pass through to `_MetaFile`
    """
    def __init__(self, inputs, history=None):
        from pesummary.utils.utils import logger
        logger.info("Starting to generate the meta file")
        # build the worker object that assembles and writes the data
        meta_file = _MetaFile(
            inputs.samples, inputs.labels, inputs.config,
            inputs.injection_data, inputs.file_version, inputs.file_kwargs,
            hdf5=inputs.hdf5, webdir=inputs.webdir, result_files=inputs.result_files,
            existing_version=inputs.existing_file_version, existing_label=inputs.existing_labels,
            priors=inputs.priors, existing_samples=inputs.existing_samples,
            existing_injection=inputs.existing_injection_data,
            existing_metadata=inputs.existing_file_kwargs,
            existing_config=inputs.existing_config, existing=inputs.existing,
            existing_priors=inputs.existing_priors,
            existing_metafile=inputs.existing_metafile,
            package_information=inputs.package_information,
            mcmc_samples=inputs.mcmc_samples, filename=inputs.filename,
            external_hdf5_links=inputs.external_hdf5_links,
            hdf5_compression=inputs.hdf5_compression, history=history,
            descriptions=inputs.descriptions
        )
        meta_file.make_dictionary()
        # write the assembled dictionary in the requested format
        if not inputs.hdf5:
            meta_file.save_to_json(meta_file.data, meta_file.meta_file)
        else:
            meta_file.save_to_hdf5(
                meta_file.data, meta_file.labels, meta_file.samples,
                meta_file.meta_file, mcmc_samples=meta_file.mcmc_samples,
                external_hdf5_links=meta_file.external_hdf5_links,
                compression=meta_file.hdf5_compression
            )
        # also write plain-text versions of the samples
        meta_file.save_to_dat()
        meta_file.write_marginalized_posterior_to_dat()
        logger.info(
            "Finishing generating the meta file. The meta file can be viewed "
            "here: {}".format(meta_file.meta_file)
        )