Coverage for pesummary/tests/base.py: 71.9%
306 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-05 13:38 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-05 13:38 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import os
4import numpy as np
5from pathlib import Path
6from pesummary.gw.cli.inputs import WebpagePlusPlottingPlusMetaFileInput
7from pesummary.core.cli.inputs import (
8 WebpagePlusPlottingPlusMetaFileInput as Input
9)
11__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
14class Namespace(object):
15 def __init__(self, **kwargs):
16 self.__dict__.update(kwargs)
19def namespace(args):
20 """Generate a namespace for testing purposes
22 Parameters
23 ----------
24 args: dict
25 dictionary of arguments
26 """
27 base = Namespace()
28 for key in list(args.keys()):
29 setattr(base, key, args[key])
30 return base
33def gw_parameters():
34 parameters = [
35 'mass_1', 'mass_2', 'a_1', 'a_2', 'tilt_1', 'tilt_2', 'phi_jl',
36 'phi_12', 'psi', 'theta_jn', 'ra', 'dec', 'luminosity_distance',
37 'geocent_time', 'log_likelihood', 'mass_ratio', 'total_mass',
38 'chirp_mass', 'symmetric_mass_ratio', 'iota', 'spin_1x', 'spin_1y',
39 'spin_1z', 'spin_2x', 'spin_2y', 'spin_2z', 'chi_p', 'chi_eff',
40 'cos_tilt_1', 'cos_tilt_2', 'redshift', 'comoving_distance',
41 'mass_1_source', 'mass_2_source', 'total_mass_source',
42 'chirp_mass_source', 'phi_1', 'phi_2', 'cos_theta_jn', 'cos_iota',
43 'peak_luminosity_non_evolved', 'final_spin_non_evolved',
44 'final_mass_non_evolved', 'final_mass_source_non_evolved',
45 'radiated_energy_non_evolved', 'inverted_mass_ratio',
46 'viewing_angle', 'chi_p_2spin', 'comoving_volume'
47 ]
48 return parameters
51def get_list_of_files(
52 gw=False, number=1, existing_plot=False, parameters=[], sections=[],
53 outdir=".outdir", extra_gw_pages=True, remove_gw_pages=[]
54):
55 """Return a list of files that should be generated from a typical workflow
56 """
57 from pesummary.conf import gw_2d_plots
58 if not gw and not len(parameters):
59 import string
60 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
61 elif not len(parameters):
62 parameters = gw_parameters()
63 if not gw:
64 label = "core"
65 else:
66 label = "gw"
67 html = [
68 "%s/html/error.html" % (outdir),
69 "%s/html/Version.html" % (outdir),
70 "%s/html/Logging.html" % (outdir),
71 "%s/html/About.html" % (outdir),
72 "%s/html/Downloads.html" % (outdir)]
73 if gw and not len(sections):
74 sections = [
75 "spins", "spin_angles", "timings", "source", "remnant", "others",
76 "masses", "location", "inclination", "energy"
77 ]
78 elif not len(sections):
79 sections = ["A-D", "E-F", "I-L", "M-P", "Q-T"]
80 for num in range(number):
81 html.append("%s/html/%s%s_%s%s.html" % (outdir, label, num, label, num))
82 if gw and "classification" not in remove_gw_pages:
83 html.append("%s/html/%s%s_%s%s_Classification.html" % (outdir, label, num, label, num))
84 html.append("%s/html/%s%s_%s%s_Corner.html" % (outdir, label, num, label, num))
85 html.append("%s/html/%s%s_%s%s_Config.html" % (outdir, label, num, label, num))
86 html.append("%s/html/%s%s_%s%s_Custom.html" % (outdir, label, num, label, num))
87 html.append("%s/html/%s%s_%s%s_All.html" % (outdir, label, num, label, num))
88 html.append("%s/html/%s%s_%s%s_Interactive_Corner.html" % (
89 outdir, label, num, label, num
90 ))
91 for section in sections:
92 html.append("%s/html/%s%s_%s%s_%s_all.html" % (outdir, label, num, label, num, section))
93 for j in parameters:
94 html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, j))
95 if gw:
96 # 2d histogram pages
97 for section in ["location", "masses", "spins", "timings"]:
98 if section == "location":
99 pairs = [p for p in gw_2d_plots if "luminosity_distance" in p]
100 if not any(all(p in parameters for p in _) for _ in pairs):
101 continue
102 elif section == "masses":
103 pairs = [p for p in gw_2d_plots if "mass_1" in p]
104 if not any(all(p in parameters for p in _) for _ in pairs):
105 continue
106 elif section == "spins":
107 pairs = [p for p in gw_2d_plots if "chi_p" in p]
108 if not any(all(p in parameters for p in _) for _ in pairs):
109 continue
110 elif section == "timings":
111 pairs = [p for p in gw_2d_plots if "geocent_time" in p]
112 if not any(all(p in parameters for p in _) for _ in pairs):
113 continue
114 html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, section))
115 if existing_plot:
116 html.append("%s/html/%s%s_%s%s_Additional.html" % (outdir, label, num, label, num))
118 if number > 1:
119 html.append("%s/html/Comparison.html" % (outdir))
120 html.append("%s/html/Comparison_Custom.html" % (outdir))
121 html.append("%s/html/Comparison_All.html" % (outdir))
122 html.append("%s/html/Comparison_Interactive_Ridgeline.html" % (outdir))
123 for j in parameters:
124 if j != "classification":
125 html.append("%s/html/Comparison_%s.html" % (outdir, j))
126 for section in sections:
127 html.append("%s/html/Comparison_%s_all.html" % (outdir, section))
128 return sorted(html)
131def get_list_of_plots(
132 gw=False, number=1, mcmc=False, label=None, outdir=".outdir",
133 comparison=True, psd=False, calibration=False, existing_plot=False,
134 expert=False, waveform=False, parameters=[], extra_gw_plots=True,
135 remove_gw_plots=[]
136):
137 """Return a list of plots that should be generated from a typical workflow
138 """
139 from pesummary.conf import gw_2d_plots
140 if not gw and not len(parameters):
141 import string
143 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
144 elif not len(parameters):
145 parameters = gw_parameters()
146 if not gw and label is None:
147 label = "core"
148 elif label is None:
149 label = "gw"
151 plots = []
152 for num in range(number):
153 for i in ["sample_evolution", "autocorrelation", "1d_posterior", "cdf"]:
154 for j in parameters:
155 plots.append("%s/plots/%s%s_%s_%s.png" % (outdir, label, num, i, j))
156 if mcmc:
157 for j in parameters:
158 plots.append("%s/plots/%s%s_1d_posterior_%s_combined.png" % (outdir, label, num, j))
159 if gw:
160 for pair in gw_2d_plots:
161 if not all(p in parameters for p in pair):
162 continue
163 plots.append("%s/plots/%s%s_2d_posterior_%s_%s.png" % (
164 outdir, label, num, pair[0], pair[1]
165 ))
166 if psd:
167 plots.append("%s/plots/%s%s_psd_plot.png" % (outdir, label, num))
168 if calibration:
169 plots.append("%s/plots/%s%s_calibration_plot.png" % (outdir, label, num))
170 if waveform:
171 plots.append("%s/plots/%s%s_waveform.png" % (outdir, label, num))
172 plots.append("%s/plots/%s%s_waveform_time_domain.png" % (outdir, label, num))
173 if existing_plot:
174 plots.append("%s/plots/test.png" % (outdir))
175 if expert:
176 for j in parameters:
177 plots.append("%s/plots/%s%s_2d_contour_%s_log_likelihood.png" % (outdir, label, num, j))
178 plots.append("%s/plots/%s%s_1d_posterior_%s_bootstrap.png" % (outdir, label, num, j))
179 plots.append("%s/plots/%s%s_sample_evolution_%s_log_likelihood_colored.png" % (outdir, label, num, j))
180 if number > 1 and comparison:
181 for i in ["1d_posterior", "boxplot", "cdf"]:
182 for j in parameters:
183 plots.append("%s/plots/combined_%s_%s.png" % (outdir, i, j))
184 if gw and extra_gw_plots:
185 for num in range(number):
186 plots.append("%s/plots/%s%s_skymap.png" % (outdir, label, num))
187 if "classification" not in remove_gw_plots:
188 plots.append("%s/plots/%s%s.pesummary.em_bright.png" % (outdir, label, num))
189 plots.append("%s/plots/%s%s.pesummary.p_astro.png" % (outdir, label, num))
190 if number > 1 and comparison:
191 plots.append("%s/plots/combined_skymap.png" % (outdir))
193 return sorted(plots)
196def make_argparse(gw=True, extension="json", bilby=False, lalinference=False,
197 number=1, existing=False, disable_expert=True, outdir="./.outdir"):
198 """
199 """
200 default_args = []
201 if gw:
202 from pesummary.gw.cli.parser import ArgumentParser
203 default_args.append("--gw")
204 default_args.append("--nsamples_for_skymap")
205 default_args.append("10")
206 default_args.append("--pastro_category_file")
207 default_args.append(testing_dir + "/rates.yml")
208 default_args.append("--catch_terrestrial_probability_error")
209 else:
210 from pesummary.core.cli.parser import ArgumentParser
211 parser = ArgumentParser()
212 parser.add_all_known_options_to_parser()
213 params, data = make_result_file(
214 extension=extension, gw=gw, bilby=bilby, lalinference=lalinference,
215 outdir=outdir
216 )
217 if not existing:
218 default_args.append("--webdir")
219 else:
220 default_args.append("--existing_webdir")
221 default_args.append(outdir)
222 default_args.append("--samples")
223 for i in range(number):
224 default_args.append("%s/test.%s" % (outdir, extension))
225 default_args.append("--labels")
226 if not existing:
227 for i in range(number):
228 if not gw:
229 default_args.append("core%s" % (i))
230 else:
231 default_args.append("gw%s" % (i))
232 else:
233 if not gw:
234 default_args.append("core1")
235 else:
236 default_args.append("gw1")
237 default_args.append("--config")
238 for i in range(number):
239 default_args.append(testing_dir + "/example_config.ini")
240 if disable_expert:
241 default_args.append("--disable_expert")
242 opts = parser.parse_args(default_args)
243 if gw:
244 func = WebpagePlusPlottingPlusMetaFileInput
245 else:
246 func = Input
247 return opts, func(opts)
250def read_result_file(outdir="./.outdir", extension="json", bilby=False,
251 lalinference=False, pesummary=False):
252 """
253 """
254 if bilby:
255 from bilby.core.result import read_in_result
257 if extension == "json":
258 f = read_in_result(outdir + "/test.json")
259 elif extension == "h5" or extension == "hdf5":
260 f = read_in_result(outdir + "/test.hdf5")
261 posterior = f.posterior
262 posterior = posterior.select_dtypes(include=[float, int])
263 samples = {key: val for key, val in posterior.items()}
264 elif lalinference:
265 import h5py
267 f = h5py.File(outdir + "/test.hdf5", "r")
269 posterior = f["lalinference"]["lalinference_nest"]["posterior_samples"]
270 samples = {
271 i: [j[num] for j in posterior] for num, i in enumerate(
272 posterior.dtype.names
273 )
274 }
275 elif pesummary:
276 from pesummary.gw.file.read import read
278 if extension == "json":
279 f = read(outdir + "/test.json")
280 else:
281 f = read(outdir + "/test.h5")
282 data = f.samples_dict
283 labels = f.labels
284 samples = data[labels[0]]
285 elif extension == "dat":
286 import numpy as np
288 data = np.genfromtxt(outdir + "/test.dat", names=True)
289 samples = {
290 i: [j[num] for j in data] for num, i in enumerate(data.dtype.names)
291 }
292 else:
293 samples = {}
294 return samples
297def make_psd(outdir="./.outdir"):
298 """Make a psd file
299 """
300 frequencies = np.linspace(0, 1024, 1000)
301 strains = np.random.uniform(10, 0.1, 1000)
302 data = np.vstack([frequencies, strains]).T
303 np.savetxt(
304 "{}/psd.dat".format(outdir), data, delimiter="\t",
305 header="\t".join(["frequencies", "strain"])
306 )
309def make_calibration(outdir="./.outdir"):
310 """Make a calibration file
311 """
312 frequencies = np.linspace(0, 1024, 1000)
313 columns = [np.random.uniform(10, 0.1, 1000) for _ in range(6)]
314 data = np.vstack([frequencies] + columns).T
315 np.savetxt(
316 "{}/calibration.dat".format(outdir), data, delimiter="\t",
317 header="\t".join(["frequencies", "a", "b", "c", "d", "e", "f"])
318 )
321def make_injection_file(
322 outdir="./.outdir", extension="json", return_filename=True,
323 return_injection_dict=True
324):
325 """
326 """
327 from pesummary.io import write
329 filename = os.path.join(outdir, "injection.{}".format(extension))
330 parameters = gw_parameters()
331 samples = np.array([[np.random.random()] for i in range(len(parameters))]).T
332 write(parameters, samples, filename=filename, file_format=extension)
333 args = []
334 if return_filename:
335 args.append(filename)
336 if return_injection_dict:
337 args.append({param: samples[0][num] for num, param in enumerate(parameters)})
338 return args
341def make_result_file(outdir="./.outdir/", extension="json", gw=True, bilby=False,
342 lalinference=False, pesummary=False, pesummary_label="label",
343 config=None, psd=None, calibration=None, random_seed=None,
344 n_samples=1000):
345 """Make a result file that can be read in by PESummary
347 Parameters
348 ----------
349 outdir: str
350 directory where you would like to store the result file
351 extension: str
352 the file extension of the result file
353 gw: Bool
354 if True, gw parameters will be used
355 """
356 if random_seed is not None:
357 np.random.seed(random_seed)
358 print(extension, gw, bilby, lalinference, pesummary)
359 if outdir[-1] != "/":
360 outdir += "/"
361 data = np.array([np.random.random(18) for i in range(n_samples)])
362 if gw:
363 parameters = ["mass_1", "mass_2", "a_1", "a_2", "tilt_1", "tilt_2",
364 "phi_jl", "phi_12", "psi", "theta_jn", "ra", "dec",
365 "luminosity_distance", "geocent_time", "redshift",
366 "mass_1_source", "mass_2_source", "log_likelihood"]
367 distance = np.random.random(n_samples) * 500
368 mass_1 = np.random.random(n_samples) * 100
369 q = np.random.random(n_samples) * 100
370 a_1 = np.random.uniform(0, 0.99, n_samples)
371 a_2 = np.random.uniform(0, 0.99, n_samples)
372 for num, i in enumerate(data):
373 data[num][12] = distance[num]
374 data[num][0] = mass_1[num]
375 data[num][1] = mass_1[num] * q[num]
376 data[num][2] = a_1[num]
377 data[num][3] = a_2[num]
378 data[num][15] = mass_1[num] / (1 + data[num][14])
379 data[num][16] = (mass_1[num] * q[num]) / (1 + data[num][15])
380 else:
381 import string
383 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
384 if extension == "dat":
385 np.savetxt(outdir + "test.dat", data, delimiter=" ",
386 header=" ".join(parameters), comments="")
387 elif extension == "csv":
388 np.savetxt(outdir + "test.csv", data, delimiter=",",
389 header=",".join(parameters), comments="")
390 elif extension == "npy":
391 from pesummary.utils.samples_dict import SamplesDict
392 samples = SamplesDict(parameters, np.array(data).T).to_structured_array()
393 np.save(outdir + "test.npy", samples)
394 elif extension == "json" and not bilby and not pesummary and not lalinference:
395 import json
397 dictionary = {"NameOfCode": {"posterior_samples": {key:
398 [i[num] for i in data] for num, key in
399 enumerate(parameters)}}}
400 with open(outdir + "test.json", "w") as f:
401 json.dump(dictionary, f, indent=4, sort_keys=True)
402 elif (extension == "hdf5" or extension == "h5") and not bilby and not pesummary and not lalinference:
403 import h5py
405 h5py_data = np.array(
406 [tuple(i) for i in data], dtype=[tuple([i, 'float64']) for i in parameters])
407 f = h5py.File(outdir + "test.h5", "w")
408 name = f.create_group("NameOfCode")
409 samples = f.create_dataset("posterior_samples", data=h5py_data)
410 f.close()
411 elif bilby and not pesummary and not lalinference:
412 import bilby
413 from bilby.core.result import Result
414 from bilby.core.prior import PriorDict
415 from pandas import DataFrame
417 priors = PriorDict()
418 priors.update({"%s" % (i): bilby.core.prior.Uniform(0.1, 0.5, 0) for i in parameters})
419 posterior_data_frame = DataFrame(data, columns=parameters)
420 injection_parameters = {par: 1. for par in parameters}
421 bilby_object = Result(
422 search_parameter_keys=parameters, samples=data,
423 posterior=posterior_data_frame, label="test",
424 injection_parameters=injection_parameters,
425 priors=priors,
426 log_bayes_factor=0.5, log_evidence_err=0.1, log_noise_evidence=0.1,
427 log_evidence=0.2, version=["bilby=0.5.3:"],
428 meta_data={"likelihood": {"time_marginalization": "True"}})
429 if extension == "json":
430 bilby_object.save_to_file(
431 filename=outdir + "test.json", extension="json")
432 elif extension == "hdf5" or extension == "h5":
433 bilby_object.save_to_file(
434 filename=outdir + "test.hdf5", extension="hdf5")
435 elif lalinference and not bilby and not pesummary:
436 import h5py
438 h5py_data = np.array(
439 [tuple(i) for i in data], dtype=[tuple([i, 'float64']) for i in parameters])
440 f = h5py.File(outdir + "test.hdf5", "w")
441 lalinference = f.create_group("lalinference")
442 nest = lalinference.create_group("lalinference_nest")
443 samples = nest.create_dataset("posterior_samples", data=h5py_data)
444 f.close()
445 elif pesummary and not lalinference and not bilby:
446 dictionary = {
447 pesummary_label: {
448 "posterior_samples": {
449 "parameter_names": parameters,
450 "samples": [list(i) for i in data]
451 },
452 "injection_data": {
453 "injection_values": [float("nan") for i in range(len(parameters))]
454 },
455 "version": ["No version information found"],
456 "meta_data": {
457 "sampler": {"log_evidence": 0.5},
458 "meta_data": {}
459 }
460 },
461 "version": {
462 "pesummary": ["v0.1.7"]
463 }
464 }
465 if config is not None:
466 dictionary[pesummary_label]["config_file"] = config
467 if psd is not None:
468 dictionary[pesummary_label]["psds"] = psds
469 if calibration is not None:
470 dictionary[pesummary_label]["calibration_envelope"] = calibration
472 if extension == "json":
473 import json
475 with open(outdir + "test.json", "w") as f:
476 json.dump(dictionary, f, indent=4, sort_keys=True)
477 elif extension == "hdf5" or extension == "h5":
478 import h5py
479 from pesummary.core.file.meta_file import recursively_save_dictionary_to_hdf5_file
481 f = h5py.File(outdir + "test.h5", "w")
482 recursively_save_dictionary_to_hdf5_file(
483 f, dictionary, extra_keys=list(dictionary.keys())
484 )
485 f.close()
486 return parameters, data
489testing_dir = str(Path(__file__).parent)
490data_dir = str(Path(__file__).parent / "files")