Coverage for pesummary/tests/base.py: 73.3%
322 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-01-15 17:49 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2026-01-15 17:49 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import os
4import numpy as np
5from pathlib import Path
6from pesummary.gw.cli.inputs import WebpagePlusPlottingPlusMetaFileInput
7from pesummary.core.cli.inputs import (
8 WebpagePlusPlottingPlusMetaFileInput as Input
9)
11__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
class Namespace(object):
    """Minimal stand-in for ``argparse.Namespace`` used by the tests.

    Any keyword argument passed to the constructor becomes an attribute
    of the instance.
    """
    def __init__(self, **kwargs):
        for attribute, value in kwargs.items():
            setattr(self, attribute, value)
def namespace(args):
    """Generate a namespace for testing purposes

    Parameters
    ----------
    args: dict
        dictionary of arguments
    """
    result = Namespace()
    for key, value in args.items():
        setattr(result, key, value)
    return result
def gw_parameters():
    """Return the list of gravitational-wave parameter names used when
    building test result files and the pages/plots expected from them
    """
    return [
        "mass_1", "mass_2", "a_1", "a_2", "tilt_1", "tilt_2", "phi_jl",
        "phi_12", "psi", "theta_jn", "ra", "dec", "luminosity_distance",
        "geocent_time", "log_likelihood", "mass_ratio", "total_mass",
        "chirp_mass", "symmetric_mass_ratio", "iota", "spin_1x", "spin_1y",
        "spin_1z", "spin_2x", "spin_2y", "spin_2z", "chi_p", "chi_eff",
        "cos_tilt_1", "cos_tilt_2", "redshift", "comoving_distance",
        "mass_1_source", "mass_2_source", "total_mass_source",
        "chirp_mass_source", "phi_1", "phi_2", "cos_theta_jn", "cos_iota",
        "peak_luminosity_non_evolved", "final_spin_non_evolved",
        "final_mass_non_evolved", "final_mass_source_non_evolved",
        "radiated_energy_non_evolved", "inverted_mass_ratio",
        "viewing_angle", "chi_p_2spin", "comoving_volume",
    ]
def get_list_of_files(
    gw=False, number=1, existing_plot=False, parameters=None, sections=None,
    outdir=".outdir", extra_gw_pages=True, remove_gw_pages=None
):
    """Return a list of files that should be generated from a typical workflow

    Parameters
    ----------
    gw: Bool, optional
        if True, return the pages expected from a gw workflow
    number: int, optional
        number of analyses in the workflow
    existing_plot: Bool, optional
        if True, include the 'Additional' page produced for a user provided
        plot
    parameters: list, optional
        parameters which have dedicated pages. Default: the standard
        core/gw test parameter set
    sections: list, optional
        navbar sections. Default: the standard core/gw sections
    outdir: str, optional
        directory containing the webpages
    extra_gw_pages: Bool, optional
        currently unused; kept for backwards compatibility
    remove_gw_pages: list, optional
        gw specific pages which are not expected, e.g. 'classification'
    """
    from pesummary.conf import gw_2d_plots
    # use None sentinels rather than mutable default arguments
    if parameters is None:
        parameters = []
    if sections is None:
        sections = []
    if remove_gw_pages is None:
        remove_gw_pages = []
    if not gw and not len(parameters):
        import string
        parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
    elif not len(parameters):
        parameters = gw_parameters()
    if not gw:
        label = "core"
    else:
        label = "gw"
    html = [
        "%s/html/error.html" % (outdir),
        "%s/html/Version.html" % (outdir),
        "%s/html/Logging.html" % (outdir),
        "%s/html/About.html" % (outdir),
        "%s/html/Downloads.html" % (outdir)]
    if gw and not len(sections):
        sections = [
            "spins", "spin_angles", "timings", "source", "remnant", "others",
            "masses", "location", "inclination", "energy"
        ]
    elif not len(sections):
        sections = ["A-D", "E-F", "I-L", "M-P", "Q-T"]
    for num in range(number):
        html.append("%s/html/%s%s_%s%s.html" % (outdir, label, num, label, num))
        if gw and "classification" not in remove_gw_pages:
            html.append("%s/html/%s%s_%s%s_Classification.html" % (outdir, label, num, label, num))
        html.append("%s/html/%s%s_%s%s_Corner.html" % (outdir, label, num, label, num))
        html.append("%s/html/%s%s_%s%s_Config.html" % (outdir, label, num, label, num))
        html.append("%s/html/%s%s_%s%s_Custom.html" % (outdir, label, num, label, num))
        html.append("%s/html/%s%s_%s%s_All.html" % (outdir, label, num, label, num))
        html.append("%s/html/%s%s_%s%s_Interactive_Corner.html" % (
            outdir, label, num, label, num
        ))
        for section in sections:
            html.append("%s/html/%s%s_%s%s_%s_all.html" % (outdir, label, num, label, num, section))
        for j in parameters:
            html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, j))
        if gw:
            # 2d histogram pages: a section page is only produced when at
            # least one of the configured 2d plots involving the section's
            # representative parameter can be made from `parameters`
            representative = {
                "location": "luminosity_distance",
                "masses": "mass_1",
                "spins": "chi_p",
                "timings": "geocent_time",
            }
            for section, key in representative.items():
                pairs = [p for p in gw_2d_plots if key in p]
                if not any(all(p in parameters for p in _) for _ in pairs):
                    continue
                html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, section))
        if existing_plot:
            html.append("%s/html/%s%s_%s%s_Additional.html" % (outdir, label, num, label, num))
    if number > 1:
        html.append("%s/html/Comparison.html" % (outdir))
        html.append("%s/html/Comparison_Custom.html" % (outdir))
        html.append("%s/html/Comparison_All.html" % (outdir))
        html.append("%s/html/Comparison_Interactive_Ridgeline.html" % (outdir))
        for j in parameters:
            if j != "classification":
                html.append("%s/html/Comparison_%s.html" % (outdir, j))
        for section in sections:
            html.append("%s/html/Comparison_%s_all.html" % (outdir, section))
    return sorted(html)
def get_list_of_plots(
    gw=False, number=1, mcmc=False, label=None, outdir=".outdir",
    comparison=True, psd=False, calibration=False, existing_plot=False,
    expert=False, waveform=False, parameters=None, extra_gw_plots=True,
    remove_gw_plots=None
):
    """Return a list of plots that should be generated from a typical workflow

    Parameters
    ----------
    gw: Bool, optional
        if True, return the plots expected from a gw workflow
    number: int, optional
        number of analyses in the workflow
    mcmc: Bool, optional
        if True, include the combined-chain 1d posterior plots
    label: str, optional
        label used for the plot filenames. Default 'core'/'gw'
    outdir: str, optional
        directory containing the plots
    comparison: Bool, optional
        if True and number > 1, include the comparison plots
    psd: Bool, optional
        if True, include the psd plot
    calibration: Bool, optional
        if True, include the calibration plot
    existing_plot: Bool, optional
        if True, include the user provided 'test.png'
    expert: Bool, optional
        if True, include the expert diagnostic plots
    waveform: Bool, optional
        if True, include the waveform plots
    parameters: list, optional
        parameters which have dedicated plots. Default: the standard
        core/gw test parameter set
    extra_gw_plots: Bool, optional
        if True, include the skymap and classification plots
    remove_gw_plots: list, optional
        gw specific plots which are not expected, e.g. 'classification'
    """
    from pesummary.conf import gw_2d_plots
    # use None sentinels rather than mutable default arguments
    if parameters is None:
        parameters = []
    if remove_gw_plots is None:
        remove_gw_plots = []
    if not gw and not len(parameters):
        import string
        parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
    elif not len(parameters):
        parameters = gw_parameters()
    if not gw and label is None:
        label = "core"
    elif label is None:
        label = "gw"
    plots = []
    for num in range(number):
        for i in ["sample_evolution", "autocorrelation", "1d_posterior", "cdf"]:
            for j in parameters:
                plots.append("%s/plots/%s%s_%s_%s.png" % (outdir, label, num, i, j))
        if mcmc:
            for j in parameters:
                plots.append("%s/plots/%s%s_1d_posterior_%s_combined.png" % (outdir, label, num, j))
        if gw:
            # only 2d plots whose parameters are both available are made
            for pair in gw_2d_plots:
                if not all(p in parameters for p in pair):
                    continue
                plots.append("%s/plots/%s%s_2d_posterior_%s_%s.png" % (
                    outdir, label, num, pair[0], pair[1]
                ))
        if psd:
            plots.append("%s/plots/%s%s_psd_plot.png" % (outdir, label, num))
        if calibration:
            plots.append("%s/plots/%s%s_calibration_plot.png" % (outdir, label, num))
        if waveform:
            plots.append("%s/plots/%s%s_waveform.png" % (outdir, label, num))
            plots.append("%s/plots/%s%s_waveform_time_domain.png" % (outdir, label, num))
        if existing_plot:
            plots.append("%s/plots/test.png" % (outdir))
        if expert:
            for j in parameters:
                plots.append("%s/plots/%s%s_2d_contour_%s_log_likelihood.png" % (outdir, label, num, j))
                plots.append("%s/plots/%s%s_1d_posterior_%s_bootstrap.png" % (outdir, label, num, j))
                plots.append("%s/plots/%s%s_sample_evolution_%s_log_likelihood_colored.png" % (outdir, label, num, j))
    if number > 1 and comparison:
        for i in ["1d_posterior", "boxplot", "cdf"]:
            for j in parameters:
                plots.append("%s/plots/combined_%s_%s.png" % (outdir, i, j))
    if gw and extra_gw_plots:
        for num in range(number):
            plots.append("%s/plots/%s%s_skymap.png" % (outdir, label, num))
            if "classification" not in remove_gw_plots:
                plots.append("%s/plots/%s%s.pesummary.em_bright.png" % (outdir, label, num))
                plots.append("%s/plots/%s%s.pesummary.p_astro.png" % (outdir, label, num))
        if number > 1 and comparison:
            plots.append("%s/plots/combined_skymap.png" % (outdir))
    return sorted(plots)
def make_argparse(gw=True, extension="json", bilby=False, lalinference=False, dingo=False,
                  number=1, existing=False, disable_expert=True, outdir="./.outdir"):
    """Create a result file, build a standard command line, and return both
    the parsed options and the corresponding Input object

    Parameters
    ----------
    gw: Bool, optional
        if True, use the gw parser and Input class
    extension: str, optional
        file extension of the generated result file
    bilby: Bool, optional
        if True, write the result file in the bilby format
    lalinference: Bool, optional
        if True, write the result file in the lalinference format
    dingo: Bool, optional
        if True, write the result file in the dingo format
    number: int, optional
        number of analyses to pass on the command line
    existing: Bool, optional
        if True, use '--existing_webdir' rather than '--webdir'
    disable_expert: Bool, optional
        if True, add the '--disable_expert' flag
    outdir: str, optional
        directory to store the result file and webpages
    """
    default_args = []
    if gw:
        from pesummary.gw.cli.parser import ArgumentParser
        default_args.append("--gw")
        default_args.append("--nsamples_for_skymap")
        default_args.append("10")
        default_args.append("--pastro_category_file")
        default_args.append(testing_dir + "/rates.yml")
        default_args.append("--catch_terrestrial_probability_error")
    else:
        from pesummary.core.cli.parser import ArgumentParser
    parser = ArgumentParser()
    parser.add_all_known_options_to_parser()
    # bug fix: the 'dingo' keyword was previously accepted but never
    # forwarded, so dingo=True silently produced a default result file
    params, data = make_result_file(
        extension=extension, gw=gw, bilby=bilby, lalinference=lalinference,
        dingo=dingo, outdir=outdir
    )
    if not existing:
        default_args.append("--webdir")
    else:
        default_args.append("--existing_webdir")
    default_args.append(outdir)
    default_args.append("--samples")
    for _ in range(number):
        default_args.append("%s/test.%s" % (outdir, extension))
    default_args.append("--labels")
    if not existing:
        for i in range(number):
            if not gw:
                default_args.append("core%s" % (i))
            else:
                default_args.append("gw%s" % (i))
    else:
        if not gw:
            default_args.append("core1")
        else:
            default_args.append("gw1")
    default_args.append("--config")
    for _ in range(number):
        default_args.append(testing_dir + "/example_config.ini")
    if disable_expert:
        default_args.append("--disable_expert")
    opts = parser.parse_args(default_args)
    if gw:
        func = WebpagePlusPlottingPlusMetaFileInput
    else:
        func = Input
    return opts, func(opts)
def read_result_file(outdir="./.outdir", extension="json", bilby=False,
                     lalinference=False, pesummary=False):
    """Read a result file previously written by `make_result_file` and
    return the posterior samples as a dictionary keyed on parameter name.
    Returns an empty dictionary when no known format matches.
    """
    if bilby:
        from bilby.core.result import read_in_result

        if extension == "json":
            result = read_in_result(outdir + "/test.json")
        elif extension == "h5" or extension == "hdf5":
            result = read_in_result(outdir + "/test.hdf5")
        # restrict to numeric columns before converting to a dictionary
        numeric = result.posterior.select_dtypes(include=[float, int])
        return {key: val for key, val in numeric.items()}
    if lalinference:
        import h5py

        fobj = h5py.File(outdir + "/test.hdf5", "r")
        table = fobj["lalinference"]["lalinference_nest"]["posterior_samples"]
        return {
            name: [row[idx] for row in table]
            for idx, name in enumerate(table.dtype.names)
        }
    if pesummary:
        from pesummary.gw.file.read import read

        if extension == "json":
            fobj = read(outdir + "/test.json")
        else:
            fobj = read(outdir + "/test.h5")
        return fobj.samples_dict[fobj.labels[0]]
    if extension == "dat":
        import numpy as np

        table = np.genfromtxt(outdir + "/test.dat", names=True)
        return {
            name: [row[idx] for row in table]
            for idx, name in enumerate(table.dtype.names)
        }
    return {}
def make_psd(outdir="./.outdir"):
    """Write a dummy psd file, '{outdir}/psd.dat', containing random strain
    values sampled at 1000 frequencies between 0 and 1024
    """
    freq = np.linspace(0, 1024, 1000)
    strain = np.random.uniform(10, 0.1, 1000)
    np.savetxt(
        "{}/psd.dat".format(outdir), np.column_stack([freq, strain]),
        delimiter="\t", header="\t".join(["frequencies", "strain"])
    )
def make_calibration(outdir="./.outdir"):
    """Write a dummy calibration file, '{outdir}/calibration.dat', with a
    frequency column followed by six columns of random values
    """
    freq = np.linspace(0, 1024, 1000)
    envelope = [np.random.uniform(10, 0.1, 1000) for _ in range(6)]
    np.savetxt(
        "{}/calibration.dat".format(outdir),
        np.column_stack([freq] + envelope), delimiter="\t",
        header="\t".join(["frequencies", "a", "b", "c", "d", "e", "f"])
    )
def make_injection_file(
    outdir="./.outdir", extension="json", return_filename=True,
    return_injection_dict=True
):
    """Write an injection file, '{outdir}/injection.{extension}', containing
    one random value for every gw parameter. Returns a list holding the
    filename and/or the injection dictionary, depending on the flags.
    """
    from pesummary.io import write

    filename = os.path.join(outdir, "injection.{}".format(extension))
    parameters = gw_parameters()
    # one random draw per parameter, shaped as a single (1, N) sample
    samples = np.atleast_2d([np.random.random() for _ in parameters])
    write(parameters, samples, filename=filename, file_format=extension)
    output = []
    if return_filename:
        output.append(filename)
    if return_injection_dict:
        output.append(dict(zip(parameters, samples[0])))
    return output
def make_result_file(outdir="./.outdir/", extension="json", gw=True, bilby=False, dingo=False,
                     lalinference=False, pesummary=False, pesummary_label="label",
                     config=None, psd=None, calibration=None, random_seed=None,
                     n_samples=1000):
    """Make a result file that can be read in by PESummary

    Parameters
    ----------
    outdir: str
        directory where you would like to store the result file
    extension: str
        the file extension of the result file
    gw: Bool
        if True, gw parameters will be used
    bilby: Bool, optional
        if True, write the samples in the bilby result file format
    dingo: Bool, optional
        if True, write the samples in the dingo result file format
    lalinference: Bool, optional
        if True, write the samples in the lalinference result file format
    pesummary: Bool, optional
        if True, write the samples in the pesummary metafile format
    pesummary_label: str, optional
        analysis label to use in the metafile when pesummary=True
    config: dict, optional
        config data to store in the metafile when pesummary=True
    psd: dict, optional
        psd data to store in the metafile when pesummary=True
    calibration: dict, optional
        calibration data to store in the metafile when pesummary=True
    random_seed: int, optional
        seed passed to np.random.seed for reproducible samples
    n_samples: int, optional
        number of posterior samples to draw

    Returns
    -------
    parameters: list
        the parameter names, one per column of `data`
    data: np.ndarray
        (n_samples, 18) array of posterior samples
    """
    if random_seed is not None:
        np.random.seed(random_seed)
    if outdir[-1] != "/":
        outdir += "/"
    data = np.array([np.random.random(18) for i in range(n_samples)])
    if gw:
        parameters = ["mass_1", "mass_2", "a_1", "a_2", "tilt_1", "tilt_2",
                      "phi_jl", "phi_12", "psi", "theta_jn", "ra", "dec",
                      "luminosity_distance", "geocent_time", "redshift",
                      "mass_1_source", "mass_2_source", "log_likelihood"]
        distance = np.random.random(n_samples) * 500
        mass_1 = np.random.random(n_samples) * 100
        q = np.random.random(n_samples) * 100
        a_1 = np.random.uniform(0, 0.99, n_samples)
        a_2 = np.random.uniform(0, 0.99, n_samples)
        # overwrite the relevant columns (indexed per `parameters` above)
        # with physically scaled values
        data[:, 12] = distance
        data[:, 0] = mass_1
        data[:, 1] = mass_1 * q
        data[:, 2] = a_1
        data[:, 3] = a_2
        # source frame masses: m_source = m / (1 + z), redshift in column 14.
        # bug fix: mass_2_source was previously divided by
        # (1 + mass_1_source) (column 15) rather than (1 + redshift)
        data[:, 15] = mass_1 / (1 + data[:, 14])
        data[:, 16] = (mass_1 * q) / (1 + data[:, 14])
    else:
        import string

        parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
    if extension == "dat":
        np.savetxt(
            outdir + "test.dat",
            data,
            delimiter=" ",
            header=" ".join(parameters),
            comments="",
        )
    elif extension == "csv":
        np.savetxt(
            outdir + "test.csv",
            data,
            delimiter=",",
            header=",".join(parameters),
            comments="",
        )
    elif extension == "npy":
        from pesummary.utils.samples_dict import SamplesDict

        samples = SamplesDict(parameters, np.array(data).T).to_structured_array()
        np.save(outdir + "test.npy", samples)
    elif (
        extension == "json"
        and not bilby
        and not pesummary
        and not lalinference
        and not dingo
    ):
        import json

        dictionary = {
            "NameOfCode": {
                "posterior_samples": {
                    key: [i[num] for i in data] for num, key in enumerate(parameters)
                }
            }
        }
        with open(outdir + "test.json", "w") as f:
            json.dump(dictionary, f, indent=4, sort_keys=True)
    elif (
        (extension == "hdf5" or extension == "h5")
        and not bilby
        and not pesummary
        and not lalinference
        and not dingo
    ):
        import h5py

        h5py_data = np.array(
            [tuple(i) for i in data], dtype=[tuple([i, "float64"]) for i in parameters]
        )
        f = h5py.File(outdir + "test.h5", "w")
        # NOTE(review): the dataset is attached to the file root, leaving the
        # 'NameOfCode' group empty — possibly intended to mirror the json
        # layout; kept as-is since the reader accepts the current layout
        name = f.create_group("NameOfCode")
        samples = f.create_dataset("posterior_samples", data=h5py_data)
        f.close()
    elif bilby and not pesummary and not lalinference and not dingo:
        import bilby
        from bilby.core.result import Result
        from bilby.core.prior import PriorDict
        from pandas import DataFrame

        priors = PriorDict()
        priors.update({"%s" % (i): bilby.core.prior.Uniform(0.1, 0.5, 0) for i in parameters})
        posterior_data_frame = DataFrame(data, columns=parameters)
        injection_parameters = {par: 1. for par in parameters}
        bilby_object = Result(
            search_parameter_keys=parameters, samples=data,
            posterior=posterior_data_frame, label="test",
            injection_parameters=injection_parameters,
            priors=priors,
            log_bayes_factor=0.5, log_evidence_err=0.1, log_noise_evidence=0.1,
            log_evidence=0.2, version=["bilby=0.5.3:"],
            meta_data={"likelihood": {"time_marginalization": "True"}})
        if extension == "json":
            bilby_object.save_to_file(
                filename=outdir + "test.json", extension="json")
        elif extension == "hdf5" or extension == "h5":
            bilby_object.save_to_file(
                filename=outdir + "test.hdf5", extension="hdf5")
    elif dingo and not pesummary and not lalinference and not bilby:
        import bilby
        import dingo
        from pandas import DataFrame
        from dingo.gw.result import Result

        intrinsic_parameters = [
            "mass_1",
            "mass_2",
            "a_1",
            "a_2",
            "tilt_1",
            "tilt_2",
            "phi_jl",
            "phi_12",
            "log_likelihood",
            "redshift",
            "mass_1_source",
            "mass_2_source",
        ]
        extrinsic_parameters = [
            "psi",
            "theta_jn",
            "ra",
            "dec",
            "luminosity_distance",
            "geocent_time",
        ]
        # the dingo format only supports the standard gw parameter set
        assert set(parameters) == set(intrinsic_parameters + extrinsic_parameters)
        settings = {
            "train_settings": {
                "data": {
                    "unconditional": False,
                    "extrinsic_prior": {
                        "%s" % (i): "bilby.core.prior.Uniform(0.1, 0.5, 0)"
                        for i in extrinsic_parameters
                    },
                    "window": {"type": "tukey", "f_s": 4096, "T": 8.0, "roll_off": 0.4},
                    "ref_time": 1126259462.0,
                },
            },
            "dataset_settings": {
                "intrinsic_prior": {
                    "%s" % (i): "bilby.core.prior.Uniform(0.1, 0.5, 0)"
                    for i in intrinsic_parameters
                },
                "domain": {
                    "type": "FrequencyDomain",
                    "f_min": 20.0,
                    "f_max": 1024.0,
                    "delta_f": 0.125,
                },
                "waveform_generator": {"spin_conversion_phase": None, "f_ref": 20.0},
            },
        }
        # dingo results carry per-sample importance weights
        weights = np.random.random(n_samples)
        tmp_parameters = parameters + ["weights"]
        tmp_data = np.c_[data, weights]
        posterior_data_frame = DataFrame(tmp_data, columns=tmp_parameters)
        dictionary = {
            "samples": posterior_data_frame,
            "context": None,
            "event_metadata": None,
            "importance_sampling_metadata": None,
            "log_evidence": None,
            "log_noise_evidence": None,
            "settings": settings,
        }
        dingo_object = Result(file_name=None, dictionary=dictionary)
        if extension == "hdf5" or extension == "h5":
            dingo_object.to_file(file_name=outdir + f"test.{extension}")
    elif lalinference and not bilby and not pesummary and not dingo:
        import h5py

        h5py_data = np.array(
            [tuple(i) for i in data], dtype=[tuple([i, "float64"]) for i in parameters]
        )
        f = h5py.File(outdir + "test.hdf5", "w")
        # renamed from 'lalinference' to avoid shadowing the parameter
        li_group = f.create_group("lalinference")
        nest = li_group.create_group("lalinference_nest")
        samples = nest.create_dataset("posterior_samples", data=h5py_data)
        f.close()
    elif pesummary and not lalinference and not bilby and not dingo:
        dictionary = {
            pesummary_label: {
                "posterior_samples": {
                    "parameter_names": parameters,
                    "samples": [list(i) for i in data],
                },
                "injection_data": {
                    "injection_values": [float("nan") for i in range(len(parameters))]
                },
                "version": ["No version information found"],
                "meta_data": {"sampler": {"log_evidence": 0.5}, "meta_data": {}},
            },
            "version": {"pesummary": ["v0.1.7"]},
        }
        if config is not None:
            dictionary[pesummary_label]["config_file"] = config
        if psd is not None:
            # bug fix: previously assigned the undefined name 'psds',
            # raising a NameError whenever a psd was supplied
            dictionary[pesummary_label]["psds"] = psd
        if calibration is not None:
            dictionary[pesummary_label]["calibration_envelope"] = calibration
        if extension == "json":
            import json

            with open(outdir + "test.json", "w") as f:
                json.dump(dictionary, f, indent=4, sort_keys=True)
        elif extension == "hdf5" or extension == "h5":
            import h5py
            from pesummary.core.file.meta_file import recursively_save_dictionary_to_hdf5_file

            f = h5py.File(outdir + "test.h5", "w")
            recursively_save_dictionary_to_hdf5_file(
                f, dictionary, extra_keys=list(dictionary.keys())
            )
            f.close()
    return parameters, data
# Absolute path of the directory containing this file; helpers above use it
# to locate test fixtures (e.g. rates.yml, example_config.ini)
testing_dir = str(Path(__file__).parent)
# Path of the 'files' directory shipped alongside the tests
data_dir = str(Path(__file__).parent / "files")