Coverage for pesummary/core/file/meta_file.py: 84.6%
364 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import inspect
4import os
5import numpy as np
6import json
7import copy
8from getpass import getuser
9import pandas as pd
10import pesummary
11from pesummary import __version__
12from pesummary.utils.dict import Dict
13from pesummary.utils.samples_dict import SamplesDict
14from pesummary.utils.utils import make_dir, logger
15from pesummary.utils.decorators import open_config
16from pesummary import conf
__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
# Top-level hdf5 groups shared between all analyses (rather than stored under
# a specific analysis label); used as the default `extra_keys` when writing
DEFAULT_HDF5_KEYS = ["version", "history"]
def recursively_save_dictionary_to_hdf5_file(
    f, dictionary, current_path=None, extra_keys=DEFAULT_HDF5_KEYS,
    compression=None
):
    """Recursively save a dictionary to a hdf5 file

    Parameters
    ----------
    f: h5py._hl.files.File
        the open hdf5 file that you would like the data to be saved to
    dictionary: dict
        dictionary of data
    current_path: optional, list
        list of keys indicating the level to save the data in the hdf5 file
    extra_keys: list, optional
        top-level group names which should be created up-front if present in
        `dictionary`. Default DEFAULT_HDF5_KEYS
    compression: int, optional
        optional filter to apply for compression. If you do not want to
        apply compression, compression = None. Default None.
    """
    def _safe_create_hdf5_group(hdf5_file, key):
        # only create the group when it does not already exist; h5py raises
        # if asked to create a duplicate group
        if key not in hdf5_file.keys():
            hdf5_file.create_group(key)
    # pre-create the shared top-level groups (e.g. 'version', 'history')
    for key in extra_keys:
        if key in dictionary:
            _safe_create_hdf5_group(hdf5_file=f, key=key)
    if current_path is None:
        current_path = []
    for k, v in dictionary.items():
        # DataFrames are flattened to a column-major dict so each column is
        # written as its own dataset
        if isinstance(v, pd.DataFrame):
            v = v.to_dict(orient="list")
        if isinstance(v, dict):
            # NOTE(review): the membership check uses a '/'-prefixed path while
            # the creation call does not; at the top level "/".join([]) == ""
            # — this appears to rely on h5py resolving both forms to the root
            # group; confirm before changing either expression
            if k not in f["/" + "/".join(current_path)].keys():
                f["/".join(current_path)].create_group(k)
            path = current_path + [k]
            recursively_save_dictionary_to_hdf5_file(
                f, v, path, extra_keys=extra_keys, compression=compression
            )
        else:
            # pesummary Dict subclasses carry extra metadata which is stored
            # as hdf5 attributes alongside the dataset
            if isinstance(dictionary, Dict):
                attrs = dictionary.extra_kwargs
            else:
                attrs = {}
            create_hdf5_dataset(
                key=k, value=v, hdf5_file=f, current_path=current_path,
                compression=compression, attrs=attrs
            )
def create_hdf5_dataset(
    key, value, hdf5_file, current_path, compression=None, attrs=None
):
    """
    Create a hdf5 dataset in place

    Parameters
    ----------
    key: str
        Key for the new dataset
    value: array-like, str
        Data to store. If you wish to create a softlink to another dataset
        then value should be a string in the form `softlink:/path/to/dataset`
        where `/path/to/dataset/` is the path to the dataset which you wish to
        create a softlink to. Similarly, a string of the form
        `external:file|/path/to/dataset` creates an external link to a dataset
        stored in another file
    hdf5_file: h5py.File
        hdf5 file object
    current_path: list
        Current list of keys within the hdf5 file
    compression: int, optional
        optional filter to apply for compression. If you do not want to
        apply compression, compression = None. Default None.
    attrs: dict, optional
        optional dictionary of attributes to store alongside the dataset.
        Default None, meaning no extra attributes

    Raises
    ------
    TypeError
        if `value` (or its first element) cannot be mapped to a hdf5
        compatible type
    """
    if attrs is None:
        # None sentinel instead of a mutable default argument; an empty dict
        # means 'no attributes to store'
        attrs = {}
    error_message = "Cannot process {}={} from list with type {} for hdf5"
    array_types = (list, pesummary.utils.samples_dict.Array, np.ndarray)
    numeric_types = (float, int, np.number)
    string_types = (str, bytes)
    SOFTLINK = False
    if isinstance(value, array_types):
        import math

        # NOTE: the checks below are order sensitive. Strings, nested arrays
        # and record types must be handled before the math.isnan test, which
        # raises TypeError for non-numeric entries
        if not len(value):
            data = np.array([])
        elif isinstance(value[0], string_types):
            # store strings as fixed-width bytes for hdf5 compatibility
            data = np.array(value, dtype="S")
        elif isinstance(value[0], array_types):
            data = np.array(np.vstack(value))
        elif isinstance(value[0], (tuple, np.record, np.recarray)):
            data = value
        elif all(isinstance(_value, (bool, np.bool_)) for _value in value):
            # booleans are stored as the byte strings 'True'/'False'
            data = np.array([str(_value) for _value in value], dtype="S")
        elif all(_value is None for _value in value):
            data = np.array(["None"] * len(value), dtype="S")
        elif isinstance(value[0], np.void) and value.dtype.names:
            # structured array, e.g. converted posterior samples
            data = value
        elif math.isnan(value[0]):
            data = np.array(["NaN"] * len(value), dtype="S")
        elif isinstance(value[0], numeric_types):
            data = np.array(value)
        else:
            raise TypeError(error_message.format(key, value, type(value[0])))
    elif isinstance(value, string_types[0]) and "softlink:" in value:
        import h5py

        SOFTLINK = True
        hdf5_file["/".join(current_path + [key])] = h5py.SoftLink(
            value.split("softlink:")[1]
        )
    elif isinstance(value, string_types[0]) and "external:" in value:
        import h5py

        # external links are treated like softlinks: no dataset is created
        # in this file, only a reference to `_path` inside `_file`
        SOFTLINK = True
        substring = value.split("external:")[1]
        _file, _path = substring.split("|")
        hdf5_file["/".join(current_path + [key])] = h5py.ExternalLink(
            _file, _path
        )
    elif isinstance(value, string_types):
        data = np.array([value], dtype="S")
    elif isinstance(value, numeric_types):
        data = np.array([value])
    elif isinstance(value, (bool, np.bool_)):
        data = np.array([str(value)], dtype="S")
    elif isinstance(value, complex):
        # only the amplitude is kept; the key is suffixed to record this
        key += "_amp"
        data = np.array(np.abs(value))
    elif value == {}:
        data = np.array(np.array("NaN"))
    elif inspect.isclass(value) or inspect.isfunction(value):
        data = np.array([value.__module__ + value.__name__], dtype="S")
    elif inspect.ismodule(value):
        data = np.array([value.__name__], dtype="S")
    elif value is None:
        data = np.array(["None"], dtype="S")
    else:
        raise TypeError(error_message.format(key, value, type(value)))
    if not SOFTLINK:
        # only compress datasets above the configured size threshold; gzip
        # on tiny datasets costs more than it saves
        if compression is not None and len(data) > conf.compression_min_length:
            kwargs = {"compression": "gzip", "compression_opts": compression}
        else:
            kwargs = {}
        try:
            dset = hdf5_file["/".join(current_path)].create_dataset(
                key, data=data, **kwargs
            )
        except ValueError:
            # presumably hit when current_path is empty and h5py rejects the
            # empty group name; fall back to creating at the file root
            dset = hdf5_file.create_dataset(key, data=data, **kwargs)
        if len(attrs):
            dset.attrs.update(attrs)
class PESummaryJsonEncoder(json.JSONEncoder):
    """Personalised JSON encoder for PESummary
    """
    def default(self, obj):
        """Return a json serializable object for 'obj'

        Parameters
        ----------
        obj: object
            object you wish to make json serializable
        """
        # ordered (predicate, converter) table; first match wins, and any
        # object that matches nothing is passed to the base-class default,
        # which raises TypeError as usual
        dispatch = (
            (lambda o: isinstance(o, np.ndarray), lambda o: o.tolist()),
            (inspect.isfunction, str),
            (lambda o: isinstance(o, np.integer), int),
            (lambda o: isinstance(o, np.floating), float),
            (lambda o: isinstance(o, (bool, np.bool_)), str),
            (lambda o: isinstance(o, bytes), str),
            (lambda o: isinstance(o, type), str),
        )
        for matches, convert in dispatch:
            if matches(obj):
                return convert(obj)
        return json.JSONEncoder.default(self, obj)
class _MetaFile(object):
    """This is a base class to handle the functions to generate a meta file

    Parameters
    ----------
    samples: dict
        dictionary of posterior samples keyed on the analysis label
    labels: list
        list of unique labels, one for each analysis
    config: list
        list of configuration files (paths or dicts), one for each analysis
    injection_data: dict
        dictionary of injected values keyed on the analysis label
    file_versions: dict
        dictionary of file versions keyed on the analysis label
    file_kwargs: dict
        dictionary of file kwargs keyed on the analysis label
    webdir: str, optional
        the web directory of the run; used to derive `outdir` when no
        explicit output directory is given
    hdf5: Bool, optional
        if True, the meta file is saved in hdf5 format rather than json
    priors: dict, optional
        dictionary of priors keyed on prior type then analysis label
    mcmc_samples: Bool, optional
        if True, `samples` contains separate mcmc chains per analysis
    external_hdf5_links: Bool, optional
        if True, each analysis is written to its own hdf5 sub file and
        linked from the main meta file
    hdf5_compression: int, optional
        compression level to apply when writing hdf5 datasets
    history: dict, optional
        history dictionary to store; generated automatically when None
    descriptions: dict, optional
        description string for each analysis label
    """
    def __init__(
        self, samples, labels, config, injection_data, file_versions,
        file_kwargs, webdir=None, result_files=None, hdf5=False, priors=None,
        existing_version=None, existing_label=None, existing_samples=None,
        existing_injection=None, existing_metadata=None, existing_config=None,
        existing_priors=None, existing_metafile=None, outdir=None, existing=None,
        package_information=None, mcmc_samples=False, filename=None,
        external_hdf5_links=False, hdf5_compression=None, history=None,
        descriptions=None
    ):
        # None sentinels replace the previous mutable default arguments
        # ({}); behaviour for callers is unchanged
        if priors is None:
            priors = {}
        if existing_priors is None:
            existing_priors = {}
        if package_information is None:
            package_information = {}
        self.data = {}
        self.webdir = webdir
        self.result_files = result_files
        self.samples = samples
        self.labels = labels
        self.config = config
        self.injection_data = injection_data
        self.file_versions = file_versions
        self.file_kwargs = file_kwargs
        self.hdf5 = hdf5
        self.file_name = filename
        self.external_hdf5_links = external_hdf5_links
        self.hdf5_compression = hdf5_compression
        self.history = history
        self.descriptions = descriptions
        if self.history is None:
            from pesummary.utils.utils import history_dictionary

            try:
                _user = getuser()
            except (ImportError, KeyError):
                # getuser may fail e.g. inside containers with no login name
                _user = ''
            self.history = history_dictionary(creator=_user)
        if self.descriptions is None:
            self.descriptions = {
                label: "No description found" for label in self.labels
            }
        elif not all(label in self.descriptions.keys() for label in self.labels):
            # fill in a default description for any label missing one
            for label in self.labels:
                if label not in self.descriptions.keys():
                    self.descriptions[label] = "No description found"
        self.priors = priors
        self.existing_version = existing_version
        self.existing_labels = existing_label
        self.existing_samples = existing_samples
        self.existing_injection = existing_injection
        self.existing_file_kwargs = existing_metadata
        self.existing_config = existing_config
        self.existing_priors = existing_priors
        self.existing_metafile = existing_metafile
        self.existing = existing
        # the outdir setter needs self.webdir, which is already assigned above
        self.outdir = outdir
        self.package_information = package_information
        if not len(package_information):
            from pesummary.core.cli.inputs import _Input

            self.package_information = _Input.get_package_information()
        self.mcmc_samples = mcmc_samples

        if self.existing_labels is None:
            self.existing_labels = [None]
        if self.existing is not None:
            self.add_existing_data()

    @property
    def outdir(self):
        return self._outdir

    @outdir.setter
    def outdir(self, outdir):
        if outdir is not None:
            self._outdir = os.path.abspath(outdir)
        elif self.webdir is not None:
            # default to the 'samples' directory below the web directory
            self._outdir = os.path.join(self.webdir, "samples")
        else:
            raise Exception("Please provide an output directory for the data")

    @property
    def file_name(self):
        return self._file_name

    @file_name.setter
    def file_name(self, file_name):
        if file_name is not None:
            self._file_name = file_name
        else:
            # default file name depends on the chosen output format
            base = "posterior_samples.{}"
            if self.hdf5:
                self._file_name = base.format("h5")
            else:
                self._file_name = base.format("json")

    @property
    def meta_file(self):
        # full path to the meta file that will be written
        return os.path.join(os.path.abspath(self.outdir), self.file_name)

    def make_dictionary(self):
        """Wrapper function for _make_dictionary
        """
        self._make_dictionary()

    @property
    def _dictionary_structure(self):
        # skeleton of the meta file: one entry per label plus the shared
        # 'version' and 'history' groups
        if self.mcmc_samples:
            posterior = "mcmc_chains"
        else:
            posterior = "posterior_samples"
        dictionary = {
            label: {
                posterior: {}, "injection_data": {}, "version": {},
                "meta_data": {}, "priors": {}, "config_file": {}
            } for label in self.labels
        }
        dictionary["version"] = self.package_information
        dictionary["version"]["pesummary"] = [__version__]
        dictionary["history"] = self.history
        return dictionary

    def _make_dictionary(self):
        """Generate a single dictionary which stores all information
        """
        if self.mcmc_samples:
            posterior = "mcmc_chains"
        else:
            posterior = "posterior_samples"
        dictionary = self._dictionary_structure
        if self.file_kwargs is not None and isinstance(self.file_kwargs, dict):
            if "webpage_url" in self.file_kwargs.keys():
                dictionary["history"]["webpage_url"] = self.file_kwargs["webpage_url"]
            else:
                dictionary["history"]["webpage_url"] = "None"
        for num, label in enumerate(self.labels):
            parameters = self.samples[label].keys()
            # transpose from column-major (per-parameter) to row-major
            # (per-sample) before storing
            samples = np.array([self.samples[label][i] for i in parameters]).T
            dictionary[label][posterior] = {
                "parameter_names": list(parameters), "samples": samples.tolist()
            }
            dictionary[label]["injection_data"] = {
                "parameters": list(parameters),
                "samples": [
                    self.injection_data[label][i] for i in parameters
                ]
            }
            dictionary[label]["version"] = [self.file_versions[label]]
            dictionary[label]["description"] = [self.descriptions[label]]
            dictionary[label]["meta_data"] = self.file_kwargs[label]
            # guard the whole branch on `self.config != {}`: previously the
            # elif still indexed self.config[num] when config was empty,
            # which raised
            if self.config != {} and self.config[num] is not None:
                if not isinstance(self.config[num], dict):
                    # a path was provided; parse the config file into a dict
                    config = self._grab_config_data_from_data_file(
                        self.config[num]
                    )
                    dictionary[label]["config_file"] = config
                else:
                    dictionary[label]["config_file"] = self.config[num]
            for key in self.priors.keys():
                if label in self.priors[key].keys():
                    dictionary[label]["priors"][key] = self.priors[key][label]
        self.data = dictionary

    @staticmethod
    @open_config(index=0)
    def _grab_config_data_from_data_file(file):
        """Return the config data as a dictionary

        Parameters
        ----------
        file: str
            path to the configuration file
        """
        # the open_config decorator replaces `file` with a configparser-like
        # object carrying `error` and `path_to_file` attributes
        config = file
        sections = config.sections()
        data = {}
        if config.error:
            logger.info(
                "Unable to open %s with configparser because %s. The data will "
                "not be stored in the meta file" % (
                    config.path_to_file, config.error
                )
            )
        if sections != []:
            for i in sections:
                data[i] = {}
                for key in config["%s" % (i)]:
                    data[i][key] = config["%s" % (i)]["%s" % (key)]
        return data

    @staticmethod
    def write_to_dat(file_name, samples, header=None):
        """Write samples to a .dat file

        Parameters
        ----------
        file_name: str
            the name of the file that you wish to write the samples to
        samples: np.ndarray
            1d/2d array of samples to write to file
        header: list, optional
            List of strings to write at the beginning of the file. Default
            None, meaning no header is written
        """
        np.savetxt(
            file_name, samples, delimiter=conf.delimiter,
            # previously header=None (the documented default) raised a
            # TypeError inside str.join; write no header instead
            header=conf.delimiter.join(header) if header is not None else "",
            comments=""
        )

    @staticmethod
    def _convert_posterior_samples_to_numpy(
        dictionary, mcmc_samples=False, index=None
    ):
        """Convert the posterior samples from a column-major dictionary
        to a row-major numpy array

        Parameters
        ----------
        dictionary: SamplesDict
            dictionary of posterior samples to convert to a numpy array.
        mcmc_samples: Bool, optional
            if True, the dictionary contains separate mcmc chains
        index: list, optional
            index list passed to `SamplesDict.to_structured_array`

        Examples
        --------
        >>> dictionary = SamplesDict({"mass_1": [1,2,3], "mass_2": [1,2,3]})
        >>> dictionary = _MetaFile._convert_posterior_samples_to_numpy(
        ...     dictionary
        ... )
        >>> print(dictionary)
        ... rec.array([(1., 1.), (2., 2.), (3., 3.)],
        ...     dtype=[('mass_1', '<f4'), ('mass_2', '<f4')])
        """
        # deep copy so the caller's dictionary is left untouched
        samples = copy.deepcopy(dictionary)
        if mcmc_samples:
            # regroup {param: {chain: samples}} into one SamplesDict per
            # chain, then convert each chain independently
            parameters = list(samples.keys())
            chains = samples[parameters[0]].keys()
            data = {
                key: SamplesDict({
                    param: samples[param][key] for param in parameters
                }) for key in chains
            }
            return {
                key: item.to_structured_array() for key, item in data.items()
            }
        return samples.to_structured_array(index=index)

    @staticmethod
    def _create_softlinks(dictionary):
        """Identify duplicated entries in a dictionary and replace them with
        `softlink:/path/to/existing_dataset`. This is required for creating
        softlinks when saved in hdf5 format

        Parameters
        ----------
        dictionary: dict
            nested dictionary of data
        """
        try:
            from pandas.io.json._normalize import nested_to_record
        except ImportError:
            # older pandas versions keep nested_to_record elsewhere
            from pandas.io.json.normalize import nested_to_record

        def modify_dict(key, dictionary, replace):
            """Return a copy of `dictionary` with the entry at the
            '/'-separated path `key` replaced by `replace`
            """
            from functools import reduce
            from operator import getitem

            mod = copy.deepcopy(dictionary)
            key_list = key.split("/")
            reduce(getitem, key_list[:-1], mod)[key_list[-1]] = replace
            return mod

        data = copy.deepcopy(dictionary)
        flat_dictionary = nested_to_record(data, sep='/')
        # reverse mapping: value -> set of '/'-joined paths storing it
        rev_dictionary = {}
        for key, value in flat_dictionary.items():
            try:
                rev_dictionary.setdefault(value, set()).add(key)
            except TypeError:
                # unhashable values (e.g. lists) are keyed on their repr
                rev_dictionary.setdefault(str(value), set()).add(key)

        for key, values in rev_dictionary.items():
            if len(values) > 1:
                # sort so the choice of the canonical dataset is
                # deterministic; set iteration order is arbitrary
                tmp = sorted(values)
                for val in tmp[1:]:
                    data = modify_dict(val, data, "softlink:/{}".format(tmp[0]))
        return data

    def write_marginalized_posterior_to_dat(self):
        """Write the marginalized posterior for each parameter to a .dat file
        """
        if self.mcmc_samples:
            # per-chain .dat output is handled by save_to_dat instead
            return
        for label in self.labels:
            if not os.path.isdir(os.path.join(self.outdir, label)):
                make_dir(os.path.join(self.outdir, label))
            for param, samples in self.samples[label].items():
                self.write_to_dat(
                    os.path.join(
                        self.outdir, label, "{}_{}.dat".format(label, param)
                    ), samples, header=[param]
                )

    @staticmethod
    def save_to_json(data, meta_file):
        """Save the metafile as a json file

        Parameters
        ----------
        data: dict
            dictionary of data to write
        meta_file: str
            path of the file to write the data to
        """
        with open(meta_file, "w") as f:
            json.dump(
                data, f, indent=4, sort_keys=True,
                cls=PESummaryJsonEncoder
            )

    @staticmethod
    def _seperate_dictionary_for_external_links(
        data, labels, sub_file_name="_{label}.h5"
    ):
        """Split `data` into a main dictionary, where each analysis is
        replaced by an `external:file|path` link string, and one sub
        dictionary per analysis to be written to its own file

        Parameters
        ----------
        data: dict
            nested dictionary of data keyed on the analysis label
        labels: list
            list of unique labels, one for each analysis
        sub_file_name: str, optional
            format string giving the name of each sub file
        """
        _data = copy.deepcopy(data)
        # each sub file also carries the shared 'version'/'history' groups
        sub_file_data = {
            label: {
                label: _data[label], "version": _data["version"],
                "history": _data["history"]
            } for label in labels
        }
        meta_file_data = {
            key: item for key, item in _data.items() if key not in labels
        }
        for label in labels:
            meta_file_data[label] = "external:{}|{}".format(
                sub_file_name.format(label=label), label
            )
        return meta_file_data, sub_file_data

    @staticmethod
    def convert_posterior_samples_to_numpy(labels, samples, mcmc_samples=False):
        """Convert a dictionary of multiple posterior samples from a
        column-major dictionary to a row-major numpy array

        Parameters
        ----------
        labels: list
            list of unique labels for each analysis
        samples: MultiAnalysisSamplesDict
            dictionary of multiple posterior samples to convert to a numpy
            array.
        mcmc_samples: Bool, optional
            if True, the dictionary contains separate mcmc chains

        Examples
        --------
        >>> dictionary = MultiAnalysisSamplesDict(
        ...     {"label": {"mass_1": [1,2,3], "mass_2": [1,2,3]}}
        ... )
        >>> dictionary = _MetaFile.convert_posterior_samples_to_numpy(
        ...     dictionary.keys(), dictionary
        ... )
        >>> print(dictionary)
        ... {"label": rec.array([(1., 1.), (2., 2.), (3., 3.)],
        ...     dtype=[('mass_1', '<f4'), ('mass_2', '<f4')])}
        """
        converted_samples = {
            label: _MetaFile._convert_posterior_samples_to_numpy(
                samples[label], mcmc_samples=mcmc_samples
            ) for label in labels
        }
        return converted_samples

    @staticmethod
    def save_to_hdf5(
        data, labels, samples, meta_file, no_convert=False,
        extra_keys=DEFAULT_HDF5_KEYS, mcmc_samples=False,
        external_hdf5_links=False, compression=None, _class=None
    ):
        """Save the metafile as a hdf5 file

        Parameters
        ----------
        data: dict
            dictionary of data to write, keyed on the analysis label
        labels: list
            list of unique labels, one for each analysis
        samples: MultiAnalysisSamplesDict
            dictionary of posterior samples, keyed on the analysis label
        meta_file: str
            path of the file to write the data to
        no_convert: Bool, optional
            if True, skip converting the samples to structured arrays
        extra_keys: list, optional
            top-level groups which are shared between analyses
        mcmc_samples: Bool, optional
            if True, the samples contain separate mcmc chains
        external_hdf5_links: Bool, optional
            if True, write each analysis to its own sub file and link it
        compression: int, optional
            compression level to apply when writing datasets
        _class: class, optional
            class whose conversion methods should be used. Default _MetaFile
        """
        import h5py

        if _class is None:
            _class = _MetaFile
        if mcmc_samples:
            key = "mcmc_chains"
        else:
            key = "posterior_samples"
        if not no_convert:
            # hdf5 stores the samples row-major; convert from the
            # column-major dictionaries
            _samples = _class.convert_posterior_samples_to_numpy(
                labels, samples, mcmc_samples=mcmc_samples
            )
            for label in labels:
                data[label][key] = _samples[label]
                if "injection_data" in data[label].keys():
                    # single-row structured array for the injected values
                    data[label]["injection_data"] = \
                        _class._convert_posterior_samples_to_numpy(
                            SamplesDict({
                                param: samp for param, samp in zip(
                                    data[label]["injection_data"]["parameters"],
                                    data[label]["injection_data"]["samples"]
                                )
                            }), index=[0]
                        )
        if external_hdf5_links:
            from pathlib import Path

            _dir = Path(meta_file).parent
            name = "_{label}.h5"
            _subfile = os.path.join(_dir, name)
            meta_file_data, sub_file_data = (
                _MetaFile._seperate_dictionary_for_external_links(
                    data, labels, sub_file_name=name
                )
            )
            # one sub file per analysis, then a main file of links
            for label in labels:
                with h5py.File(_subfile.format(label=label), "w") as f:
                    recursively_save_dictionary_to_hdf5_file(
                        f, sub_file_data[label], extra_keys=extra_keys + [label],
                        compression=compression
                    )
            with h5py.File(meta_file, "w") as f:
                recursively_save_dictionary_to_hdf5_file(
                    f, meta_file_data, extra_keys=extra_keys,
                    compression=compression
                )
        else:
            with h5py.File(meta_file, "w") as f:
                recursively_save_dictionary_to_hdf5_file(
                    f, data, extra_keys=extra_keys + labels,
                    compression=compression
                )

    def save_to_dat(self):
        """Save the samples to a .dat file
        """
        def _save(parameters, samples, label):
            """Helper function to save the parameters and samples to file
            """
            self.write_to_dat(
                os.path.join(self.outdir, "{}_pesummary.dat".format(label)),
                samples.T, header=parameters
            )

        if self.mcmc_samples:
            # one file per chain rather than one per analysis label
            for label in self.labels:
                parameters = list(self.samples[label].keys())
                for chain in self.samples[label][parameters[0]].keys():
                    samples = np.array(
                        [self.samples[label][i][chain] for i in parameters]
                    )
                    _save(parameters, samples, chain)
            return
        for label in self.labels:
            parameters = self.samples[label].keys()
            samples = np.array([self.samples[label][i] for i in parameters])
            _save(parameters, samples, label)

    def add_existing_data(self):
        """Merge the data stored in an existing meta file into this object
        """
        from pesummary.utils.utils import _add_existing_data

        # _add_existing_data returns the updated object; rebinding the local
        # name keeps the original call pattern
        self = _add_existing_data(self)
class MetaFile(object):
    """This class handles the creation of a metafile storing all information
    from the analysis

    Parameters
    ----------
    inputs: object
        object holding the parsed command line inputs; the attributes read
        here (samples, labels, config, hdf5, ...) are forwarded to _MetaFile
    history: dict, optional
        history dictionary to store in the meta file. Default None
    """
    def __init__(self, inputs, history=None):
        # `logger` is already imported at module level; the previous local
        # `from pesummary.utils.utils import logger` was redundant
        logger.info("Starting to generate the meta file")
        meta_file = _MetaFile(
            inputs.samples, inputs.labels, inputs.config,
            inputs.injection_data, inputs.file_version, inputs.file_kwargs,
            hdf5=inputs.hdf5, webdir=inputs.webdir, result_files=inputs.result_files,
            existing_version=inputs.existing_file_version, existing_label=inputs.existing_labels,
            priors=inputs.priors, existing_samples=inputs.existing_samples,
            existing_injection=inputs.existing_injection_data,
            existing_metadata=inputs.existing_file_kwargs,
            existing_config=inputs.existing_config, existing=inputs.existing,
            existing_priors=inputs.existing_priors,
            existing_metafile=inputs.existing_metafile,
            package_information=inputs.package_information,
            mcmc_samples=inputs.mcmc_samples, filename=inputs.filename,
            external_hdf5_links=inputs.external_hdf5_links,
            hdf5_compression=inputs.hdf5_compression, history=history,
            descriptions=inputs.descriptions
        )
        meta_file.make_dictionary()
        # write the meta file in the requested format
        if not inputs.hdf5:
            meta_file.save_to_json(meta_file.data, meta_file.meta_file)
        else:
            meta_file.save_to_hdf5(
                meta_file.data, meta_file.labels, meta_file.samples,
                meta_file.meta_file, mcmc_samples=meta_file.mcmc_samples,
                external_hdf5_links=meta_file.external_hdf5_links,
                compression=meta_file.hdf5_compression
            )
        # also write per-analysis and per-parameter .dat files
        meta_file.save_to_dat()
        meta_file.write_marginalized_posterior_to_dat()
        logger.info(
            "Finishing generating the meta file. The meta file can be viewed "
            "here: {}".format(meta_file.meta_file)
        )