Coverage for pesummary/tests/base.py: 85.4%
302 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import os
4import numpy as np
5from pathlib import Path
6from pesummary.gw.cli.inputs import WebpagePlusPlottingPlusMetaFileInput
7from pesummary.core.cli.inputs import (
8 WebpagePlusPlottingPlusMetaFileInput as Input
9)
11__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
14class Namespace(object):
15 def __init__(self, **kwargs):
16 self.__dict__.update(kwargs)
19def namespace(args):
20 """Generate a namespace for testing purposes
22 Parameters
23 ----------
24 args: dict
25 dictionary of arguments
26 """
27 base = Namespace()
28 for key in list(args.keys()):
29 setattr(base, key, args[key])
30 return base
33def gw_parameters():
34 parameters = [
35 'mass_1', 'mass_2', 'a_1', 'a_2', 'tilt_1', 'tilt_2', 'phi_jl',
36 'phi_12', 'psi', 'theta_jn', 'ra', 'dec', 'luminosity_distance',
37 'geocent_time', 'log_likelihood', 'mass_ratio', 'total_mass',
38 'chirp_mass', 'symmetric_mass_ratio', 'iota', 'spin_1x', 'spin_1y',
39 'spin_1z', 'spin_2x', 'spin_2y', 'spin_2z', 'chi_p', 'chi_eff',
40 'cos_tilt_1', 'cos_tilt_2', 'redshift', 'comoving_distance',
41 'mass_1_source', 'mass_2_source', 'total_mass_source',
42 'chirp_mass_source', 'phi_1', 'phi_2', 'cos_theta_jn', 'cos_iota',
43 'peak_luminosity_non_evolved', 'final_spin_non_evolved',
44 'final_mass_non_evolved', 'final_mass_source_non_evolved',
45 'radiated_energy_non_evolved', 'inverted_mass_ratio',
46 'viewing_angle', 'chi_p_2spin', 'comoving_volume'
47 ]
48 return parameters
51def get_list_of_files(
52 gw=False, number=1, existing_plot=False, parameters=[], sections=[],
53 outdir=".outdir", extra_gw_pages=True
54):
55 """Return a list of files that should be generated from a typical workflow
56 """
57 from pesummary.conf import gw_2d_plots
58 if not gw and not len(parameters):
59 import string
60 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
61 elif not len(parameters):
62 parameters = gw_parameters()
63 if not gw:
64 label = "core"
65 else:
66 label = "gw"
67 html = [
68 "%s/html/error.html" % (outdir),
69 "%s/html/Version.html" % (outdir),
70 "%s/html/Logging.html" % (outdir),
71 "%s/html/About.html" % (outdir),
72 "%s/html/Downloads.html" % (outdir)]
73 if gw and not len(sections):
74 sections = [
75 "spins", "spin_angles", "timings", "source", "remnant", "others",
76 "masses", "location", "inclination", "energy"
77 ]
78 elif not len(sections):
79 sections = ["A-D", "E-F", "I-L", "M-P", "Q-T"]
80 for num in range(number):
81 html.append("%s/html/%s%s_%s%s.html" % (outdir, label, num, label, num))
82 if gw and extra_gw_pages:
83 html.append("%s/html/%s%s_%s%s_Classification.html" % (outdir, label, num, label, num))
84 html.append("%s/html/%s%s_%s%s_Corner.html" % (outdir, label, num, label, num))
85 html.append("%s/html/%s%s_%s%s_Config.html" % (outdir, label, num, label, num))
86 html.append("%s/html/%s%s_%s%s_Custom.html" % (outdir, label, num, label, num))
87 html.append("%s/html/%s%s_%s%s_All.html" % (outdir, label, num, label, num))
88 html.append("%s/html/%s%s_%s%s_Interactive_Corner.html" % (
89 outdir, label, num, label, num
90 ))
91 for section in sections:
92 html.append("%s/html/%s%s_%s%s_%s_all.html" % (outdir, label, num, label, num, section))
93 for j in parameters:
94 html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, j))
95 if gw:
96 # 2d histogram pages
97 for section in ["location", "masses", "spins", "timings"]:
98 if section == "location":
99 pairs = [p for p in gw_2d_plots if "luminosity_distance" in p]
100 if not any(all(p in parameters for p in _) for _ in pairs):
101 continue
102 elif section == "masses":
103 pairs = [p for p in gw_2d_plots if "mass_1" in p]
104 if not any(all(p in parameters for p in _) for _ in pairs):
105 continue
106 elif section == "spins":
107 pairs = [p for p in gw_2d_plots if "chi_p" in p]
108 if not any(all(p in parameters for p in _) for _ in pairs):
109 continue
110 elif section == "timings":
111 pairs = [p for p in gw_2d_plots if "geocent_time" in p]
112 if not any(all(p in parameters for p in _) for _ in pairs):
113 continue
114 html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, section))
115 if existing_plot:
116 html.append("%s/html/%s%s_%s%s_Additional.html" % (outdir, label, num, label, num))
118 if number > 1:
119 html.append("%s/html/Comparison.html" % (outdir))
120 html.append("%s/html/Comparison_Custom.html" % (outdir))
121 html.append("%s/html/Comparison_All.html" % (outdir))
122 html.append("%s/html/Comparison_Interactive_Ridgeline.html" % (outdir))
123 for j in parameters:
124 if j != "classification":
125 html.append("%s/html/Comparison_%s.html" % (outdir, j))
126 for section in sections:
127 html.append("%s/html/Comparison_%s_all.html" % (outdir, section))
128 return sorted(html)
131def get_list_of_plots(
132 gw=False, number=1, mcmc=False, label=None, outdir=".outdir",
133 comparison=True, psd=False, calibration=False, existing_plot=False,
134 expert=False, waveform=False, parameters=[], extra_gw_plots=True
135):
136 """Return a list of plots that should be generated from a typical workflow
137 """
138 from pesummary.conf import gw_2d_plots
139 if not gw and not len(parameters):
140 import string
142 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
143 elif not len(parameters):
144 parameters = gw_parameters()
145 if not gw and label is None:
146 label = "core"
147 elif label is None:
148 label = "gw"
150 plots = []
151 for num in range(number):
152 for i in ["sample_evolution", "autocorrelation", "1d_posterior", "cdf"]:
153 for j in parameters:
154 plots.append("%s/plots/%s%s_%s_%s.png" % (outdir, label, num, i, j))
155 if mcmc:
156 for j in parameters:
157 plots.append("%s/plots/%s%s_1d_posterior_%s_combined.png" % (outdir, label, num, j))
158 if gw:
159 for pair in gw_2d_plots:
160 if not all(p in parameters for p in pair):
161 continue
162 plots.append("%s/plots/%s%s_2d_posterior_%s_%s.png" % (
163 outdir, label, num, pair[0], pair[1]
164 ))
165 if psd:
166 plots.append("%s/plots/%s%s_psd_plot.png" % (outdir, label, num))
167 if calibration:
168 plots.append("%s/plots/%s%s_calibration_plot.png" % (outdir, label, num))
169 if waveform:
170 plots.append("%s/plots/%s%s_waveform.png" % (outdir, label, num))
171 plots.append("%s/plots/%s%s_waveform_time_domain.png" % (outdir, label, num))
172 if existing_plot:
173 plots.append("%s/plots/test.png" % (outdir))
174 if expert:
175 for j in parameters:
176 plots.append("%s/plots/%s%s_2d_contour_%s_log_likelihood.png" % (outdir, label, num, j))
177 plots.append("%s/plots/%s%s_1d_posterior_%s_bootstrap.png" % (outdir, label, num, j))
178 plots.append("%s/plots/%s%s_sample_evolution_%s_log_likelihood_colored.png" % (outdir, label, num, j))
179 if number > 1 and comparison:
180 for i in ["1d_posterior", "boxplot", "cdf"]:
181 for j in parameters:
182 plots.append("%s/plots/combined_%s_%s.png" % (outdir, i, j))
183 if gw and extra_gw_plots:
184 for num in range(number):
185 plots.append("%s/plots/%s%s_skymap.png" % (outdir, label, num))
186 plots.append("%s/plots/%s%s_default_pepredicates.png" % (outdir, label, num))
187 plots.append("%s/plots/%s%s_default_pepredicates_bar.png" % (outdir, label, num))
188 plots.append("%s/plots/%s%s_population_pepredicates.png" % (outdir, label, num))
189 plots.append("%s/plots/%s%s_population_pepredicates_bar.png" % (outdir, label, num))
190 if number > 1 and comparison:
191 plots.append("%s/plots/combined_skymap.png" % (outdir))
193 return sorted(plots)
196def make_argparse(gw=True, extension="json", bilby=False, lalinference=False,
197 number=1, existing=False, disable_expert=True, outdir="./.outdir"):
198 """
199 """
200 default_args = []
201 if gw:
202 from pesummary.gw.cli.parser import ArgumentParser
203 default_args.append("--gw")
204 default_args.append("--nsamples_for_skymap")
205 default_args.append("10")
206 else:
207 from pesummary.core.cli.parser import ArgumentParser
208 parser = ArgumentParser()
209 parser.add_all_known_options_to_parser()
210 params, data = make_result_file(
211 extension=extension, gw=gw, bilby=bilby, lalinference=lalinference,
212 outdir=outdir
213 )
214 if not existing:
215 default_args.append("--webdir")
216 else:
217 default_args.append("--existing_webdir")
218 default_args.append(outdir)
219 default_args.append("--samples")
220 for i in range(number):
221 default_args.append("%s/test.%s" % (outdir, extension))
222 default_args.append("--labels")
223 if not existing:
224 for i in range(number):
225 if not gw:
226 default_args.append("core%s" % (i))
227 else:
228 default_args.append("gw%s" % (i))
229 else:
230 if not gw:
231 default_args.append("core1")
232 else:
233 default_args.append("gw1")
234 default_args.append("--config")
235 for i in range(number):
236 default_args.append(testing_dir + "/example_config.ini")
237 if disable_expert:
238 default_args.append("--disable_expert")
239 opts = parser.parse_args(default_args)
240 if gw:
241 func = WebpagePlusPlottingPlusMetaFileInput
242 else:
243 func = Input
244 return opts, func(opts)
247def read_result_file(outdir="./.outdir", extension="json", bilby=False,
248 lalinference=False, pesummary=False):
249 """
250 """
251 if bilby:
252 from bilby.core.result import read_in_result
254 if extension == "json":
255 f = read_in_result(outdir + "/test.json")
256 elif extension == "h5" or extension == "hdf5":
257 f = read_in_result(outdir + "/test.hdf5")
258 posterior = f.posterior
259 posterior = posterior.select_dtypes(include=[float, int])
260 samples = {key: val for key, val in posterior.items()}
261 elif lalinference:
262 import h5py
264 f = h5py.File(outdir + "/test.hdf5", "r")
266 posterior = f["lalinference"]["lalinference_nest"]["posterior_samples"]
267 samples = {
268 i: [j[num] for j in posterior] for num, i in enumerate(
269 posterior.dtype.names
270 )
271 }
272 elif pesummary:
273 from pesummary.gw.file.read import read
275 if extension == "json":
276 f = read(outdir + "/test.json")
277 else:
278 f = read(outdir + "/test.h5")
279 data = f.samples_dict
280 labels = f.labels
281 samples = data[labels[0]]
282 elif extension == "dat":
283 import numpy as np
285 data = np.genfromtxt(outdir + "/test.dat", names=True)
286 samples = {
287 i: [j[num] for j in data] for num, i in enumerate(data.dtype.names)
288 }
289 else:
290 samples = {}
291 return samples
294def make_psd(outdir="./.outdir"):
295 """Make a psd file
296 """
297 frequencies = np.linspace(0, 1024, 1000)
298 strains = np.random.uniform(10, 0.1, 1000)
299 data = np.vstack([frequencies, strains]).T
300 np.savetxt(
301 "{}/psd.dat".format(outdir), data, delimiter="\t",
302 header="\t".join(["frequencies", "strain"])
303 )
306def make_calibration(outdir="./.outdir"):
307 """Make a calibration file
308 """
309 frequencies = np.linspace(0, 1024, 1000)
310 columns = [np.random.uniform(10, 0.1, 1000) for _ in range(6)]
311 data = np.vstack([frequencies] + columns).T
312 np.savetxt(
313 "{}/calibration.dat".format(outdir), data, delimiter="\t",
314 header="\t".join(["frequencies", "a", "b", "c", "d", "e", "f"])
315 )
318def make_injection_file(
319 outdir="./.outdir", extension="json", return_filename=True,
320 return_injection_dict=True
321):
322 """
323 """
324 from pesummary.io import write
326 filename = os.path.join(outdir, "injection.{}".format(extension))
327 parameters = gw_parameters()
328 samples = np.array([[np.random.random()] for i in range(len(parameters))]).T
329 write(parameters, samples, filename=filename, file_format=extension)
330 args = []
331 if return_filename:
332 args.append(filename)
333 if return_injection_dict:
334 args.append({param: samples[0][num] for num, param in enumerate(parameters)})
335 return args
338def make_result_file(outdir="./.outdir/", extension="json", gw=True, bilby=False,
339 lalinference=False, pesummary=False, pesummary_label="label",
340 config=None, psd=None, calibration=None, random_seed=None,
341 n_samples=1000):
342 """Make a result file that can be read in by PESummary
344 Parameters
345 ----------
346 outdir: str
347 directory where you would like to store the result file
348 extension: str
349 the file extension of the result file
350 gw: Bool
351 if True, gw parameters will be used
352 """
353 if random_seed is not None:
354 np.random.seed(random_seed)
355 print(extension, gw, bilby, lalinference, pesummary)
356 if outdir[-1] != "/":
357 outdir += "/"
358 data = np.array([np.random.random(18) for i in range(n_samples)])
359 if gw:
360 parameters = ["mass_1", "mass_2", "a_1", "a_2", "tilt_1", "tilt_2",
361 "phi_jl", "phi_12", "psi", "theta_jn", "ra", "dec",
362 "luminosity_distance", "geocent_time", "redshift",
363 "mass_1_source", "mass_2_source", "log_likelihood"]
364 distance = np.random.random(n_samples) * 500
365 mass_1 = np.random.random(n_samples) * 100
366 q = np.random.random(n_samples) * 100
367 a_1 = np.random.uniform(0, 0.99, n_samples)
368 a_2 = np.random.uniform(0, 0.99, n_samples)
369 for num, i in enumerate(data):
370 data[num][12] = distance[num]
371 data[num][0] = mass_1[num]
372 data[num][1] = mass_1[num] * q[num]
373 data[num][2] = a_1[num]
374 data[num][3] = a_2[num]
375 else:
376 import string
378 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
379 if extension == "dat":
380 np.savetxt(outdir + "test.dat", data, delimiter=" ",
381 header=" ".join(parameters), comments="")
382 elif extension == "csv":
383 np.savetxt(outdir + "test.csv", data, delimiter=",",
384 header=",".join(parameters), comments="")
385 elif extension == "npy":
386 from pesummary.utils.samples_dict import SamplesDict
387 samples = SamplesDict(parameters, np.array(data).T).to_structured_array()
388 np.save(outdir + "test.npy", samples)
389 elif extension == "json" and not bilby and not pesummary and not lalinference:
390 import json
392 dictionary = {"NameOfCode": {"posterior_samples": {key:
393 [i[num] for i in data] for num, key in
394 enumerate(parameters)}}}
395 with open(outdir + "test.json", "w") as f:
396 json.dump(dictionary, f, indent=4, sort_keys=True)
397 elif (extension == "hdf5" or extension == "h5") and not bilby and not pesummary and not lalinference:
398 import h5py
400 h5py_data = np.array(
401 [tuple(i) for i in data], dtype=[tuple([i, 'float64']) for i in parameters])
402 f = h5py.File(outdir + "test.h5", "w")
403 name = f.create_group("NameOfCode")
404 samples = f.create_dataset("posterior_samples", data=h5py_data)
405 f.close()
406 elif bilby and not pesummary and not lalinference:
407 import bilby
408 from bilby.core.result import Result
409 from bilby.core.prior import PriorDict
410 from pandas import DataFrame
412 priors = PriorDict()
413 priors.update({"%s" % (i): bilby.core.prior.Uniform(0.1, 0.5, 0) for i in parameters})
414 posterior_data_frame = DataFrame(data, columns=parameters)
415 injection_parameters = {par: 1. for par in parameters}
416 bilby_object = Result(
417 search_parameter_keys=parameters, samples=data,
418 posterior=posterior_data_frame, label="test",
419 injection_parameters=injection_parameters,
420 priors=priors,
421 log_bayes_factor=0.5, log_evidence_err=0.1, log_noise_evidence=0.1,
422 log_evidence=0.2, version=["bilby=0.5.3:"],
423 meta_data={"likelihood": {"time_marginalization": "True"}})
424 if extension == "json":
425 bilby_object.save_to_file(
426 filename=outdir + "test.json", extension="json")
427 elif extension == "hdf5" or extension == "h5":
428 bilby_object.save_to_file(
429 filename=outdir + "test.hdf5", extension="hdf5")
430 elif lalinference and not bilby and not pesummary:
431 import h5py
433 h5py_data = np.array(
434 [tuple(i) for i in data], dtype=[tuple([i, 'float64']) for i in parameters])
435 f = h5py.File(outdir + "test.hdf5", "w")
436 lalinference = f.create_group("lalinference")
437 nest = lalinference.create_group("lalinference_nest")
438 samples = nest.create_dataset("posterior_samples", data=h5py_data)
439 f.close()
440 elif pesummary and not lalinference and not bilby:
441 dictionary = {
442 pesummary_label: {
443 "posterior_samples": {
444 "parameter_names": parameters,
445 "samples": [list(i) for i in data]
446 },
447 "injection_data": {
448 "injection_values": [float("nan") for i in range(len(parameters))]
449 },
450 "version": ["No version information found"],
451 "meta_data": {
452 "sampler": {"log_evidence": 0.5},
453 "meta_data": {}
454 }
455 },
456 "version": {
457 "pesummary": ["v0.1.7"]
458 }
459 }
460 if config is not None:
461 dictionary[pesummary_label]["config_file"] = config
462 if psd is not None:
463 dictionary[pesummary_label]["psds"] = psds
464 if calibration is not None:
465 dictionary[pesummary_label]["calibration_envelope"] = calibration
467 if extension == "json":
468 import json
470 with open(outdir + "test.json", "w") as f:
471 json.dump(dictionary, f, indent=4, sort_keys=True)
472 elif extension == "hdf5" or extension == "h5":
473 import h5py
474 from pesummary.core.file.meta_file import recursively_save_dictionary_to_hdf5_file
476 f = h5py.File(outdir + "test.h5", "w")
477 recursively_save_dictionary_to_hdf5_file(
478 f, dictionary, extra_keys=list(dictionary.keys())
479 )
480 f.close()
481 return parameters, data
484testing_dir = str(Path(__file__).parent)
485data_dir = str(Path(__file__).parent / "files")