Coverage for pesummary/tests/base.py: 87.1%
295 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-05-02 08:42 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-05-02 08:42 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import os
4import numpy as np
5from pathlib import Path
6from pesummary.gw.cli.inputs import WebpagePlusPlottingPlusMetaFileInput
7from pesummary.core.cli.inputs import (
8 WebpagePlusPlottingPlusMetaFileInput as Input
9)
11__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
14class Namespace(object):
15 def __init__(self, **kwargs):
16 self.__dict__.update(kwargs)
19def namespace(args):
20 """Generate a namespace for testing purposes
22 Parameters
23 ----------
24 args: dict
25 dictionary of arguments
26 """
27 base = Namespace()
28 for key in list(args.keys()):
29 setattr(base, key, args[key])
30 return base
33def gw_parameters():
34 parameters = [
35 'mass_1', 'mass_2', 'a_1', 'a_2', 'tilt_1', 'tilt_2', 'phi_jl',
36 'phi_12', 'psi', 'theta_jn', 'ra', 'dec', 'luminosity_distance',
37 'geocent_time', 'log_likelihood', 'mass_ratio', 'total_mass',
38 'chirp_mass', 'symmetric_mass_ratio', 'iota', 'spin_1x', 'spin_1y',
39 'spin_1z', 'spin_2x', 'spin_2y', 'spin_2z', 'chi_p', 'chi_eff',
40 'cos_tilt_1', 'cos_tilt_2', 'redshift', 'comoving_distance',
41 'mass_1_source', 'mass_2_source', 'total_mass_source',
42 'chirp_mass_source', 'phi_1', 'phi_2', 'cos_theta_jn', 'cos_iota',
43 'peak_luminosity_non_evolved', 'final_spin_non_evolved',
44 'final_mass_non_evolved', 'final_mass_source_non_evolved',
45 'radiated_energy_non_evolved', 'inverted_mass_ratio',
46 'viewing_angle', 'chi_p_2spin'
47 ]
48 return parameters
51def get_list_of_files(
52 gw=False, number=1, existing_plot=False, parameters=[], sections=[],
53 outdir=".outdir", extra_gw_pages=True
54):
55 """Return a list of files that should be generated from a typical workflow
56 """
57 from pesummary.conf import gw_2d_plots
58 if not gw and not len(parameters):
59 import string
60 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
61 elif not len(parameters):
62 parameters = gw_parameters()
63 if not gw:
64 label = "core"
65 else:
66 label = "gw"
67 html = [
68 "%s/html/error.html" % (outdir),
69 "%s/html/Version.html" % (outdir),
70 "%s/html/Logging.html" % (outdir),
71 "%s/html/About.html" % (outdir),
72 "%s/html/Downloads.html" % (outdir)]
73 if gw and not len(sections):
74 sections = [
75 "spins", "spin_angles", "timings", "source", "remnant", "others",
76 "masses", "location", "inclination", "energy"
77 ]
78 elif not len(sections):
79 sections = ["A-D", "E-F", "I-L", "M-P", "Q-T"]
80 for num in range(number):
81 html.append("%s/html/%s%s_%s%s.html" % (outdir, label, num, label, num))
82 if gw and extra_gw_pages:
83 html.append("%s/html/%s%s_%s%s_Classification.html" % (outdir, label, num, label, num))
84 html.append("%s/html/%s%s_%s%s_Corner.html" % (outdir, label, num, label, num))
85 html.append("%s/html/%s%s_%s%s_Config.html" % (outdir, label, num, label, num))
86 html.append("%s/html/%s%s_%s%s_Custom.html" % (outdir, label, num, label, num))
87 html.append("%s/html/%s%s_%s%s_All.html" % (outdir, label, num, label, num))
88 html.append("%s/html/%s%s_%s%s_Interactive_Corner.html" % (
89 outdir, label, num, label, num
90 ))
91 for section in sections:
92 html.append("%s/html/%s%s_%s%s_%s_all.html" % (outdir, label, num, label, num, section))
93 for j in parameters:
94 html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, j))
95 if gw:
96 # 2d histogram pages
97 for section in ["location", "masses", "spins"]:
98 if section == "location":
99 pairs = [p for p in gw_2d_plots if "luminosity_distance" in p]
100 if not any(all(p in parameters for p in _) for _ in pairs):
101 continue
102 elif section == "masses":
103 pairs = [p for p in gw_2d_plots if "mass_1" in p]
104 if not any(all(p in parameters for p in _) for _ in pairs):
105 continue
106 elif section == "spins":
107 pairs = [p for p in gw_2d_plots if "chi_p" in p]
108 if not any(all(p in parameters for p in _) for _ in pairs):
109 continue
110 html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, section))
111 if existing_plot:
112 html.append("%s/html/%s%s_%s%s_Additional.html" % (outdir, label, num, label, num))
114 if number > 1:
115 html.append("%s/html/Comparison.html" % (outdir))
116 html.append("%s/html/Comparison_Custom.html" % (outdir))
117 html.append("%s/html/Comparison_All.html" % (outdir))
118 html.append("%s/html/Comparison_Interactive_Ridgeline.html" % (outdir))
119 for j in parameters:
120 if j != "classification":
121 html.append("%s/html/Comparison_%s.html" % (outdir, j))
122 for section in sections:
123 html.append("%s/html/Comparison_%s_all.html" % (outdir, section))
124 return sorted(html)
127def get_list_of_plots(
128 gw=False, number=1, mcmc=False, label=None, outdir=".outdir",
129 comparison=True, psd=False, calibration=False, existing_plot=False,
130 expert=False, parameters=[], extra_gw_plots=True
131):
132 """Return a list of plots that should be generated from a typical workflow
133 """
134 from pesummary.conf import gw_2d_plots
135 if not gw and not len(parameters):
136 import string
138 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
139 elif not len(parameters):
140 parameters = gw_parameters()
141 if not gw and label is None:
142 label = "core"
143 elif label is None:
144 label = "gw"
146 plots = []
147 for num in range(number):
148 for i in ["sample_evolution", "autocorrelation", "1d_posterior", "cdf"]:
149 for j in parameters:
150 plots.append("%s/plots/%s%s_%s_%s.png" % (outdir, label, num, i, j))
151 if mcmc:
152 for j in parameters:
153 plots.append("%s/plots/%s%s_1d_posterior_%s_combined.png" % (outdir, label, num, j))
154 if gw:
155 for pair in gw_2d_plots:
156 if not all(p in parameters for p in pair):
157 continue
158 plots.append("%s/plots/%s%s_2d_posterior_%s_%s.png" % (
159 outdir, label, num, pair[0], pair[1]
160 ))
161 if psd:
162 plots.append("%s/plots/%s%s_psd_plot.png" % (outdir, label, num))
163 if calibration:
164 plots.append("%s/plots/%s%s_calibration_plot.png" % (outdir, label, num))
165 if existing_plot:
166 plots.append("%s/plots/test.png" % (outdir))
167 if expert:
168 for j in parameters:
169 plots.append("%s/plots/%s%s_2d_contour_%s_log_likelihood.png" % (outdir, label, num, j))
170 plots.append("%s/plots/%s%s_1d_posterior_%s_bootstrap.png" % (outdir, label, num, j))
171 plots.append("%s/plots/%s%s_sample_evolution_%s_log_likelihood_colored.png" % (outdir, label, num, j))
172 if number > 1 and comparison:
173 for i in ["1d_posterior", "boxplot", "cdf"]:
174 for j in parameters:
175 plots.append("%s/plots/combined_%s_%s.png" % (outdir, i, j))
176 if gw and extra_gw_plots:
177 for num in range(number):
178 plots.append("%s/plots/%s%s_skymap.png" % (outdir, label, num))
179 plots.append("%s/plots/%s%s_default_pepredicates.png" % (outdir, label, num))
180 plots.append("%s/plots/%s%s_default_pepredicates_bar.png" % (outdir, label, num))
181 plots.append("%s/plots/%s%s_population_pepredicates.png" % (outdir, label, num))
182 plots.append("%s/plots/%s%s_population_pepredicates_bar.png" % (outdir, label, num))
183 if number > 1 and comparison:
184 plots.append("%s/plots/combined_skymap.png" % (outdir))
186 return sorted(plots)
189def make_argparse(gw=True, extension="json", bilby=False, lalinference=False,
190 number=1, existing=False, disable_expert=True, outdir="./.outdir"):
191 """
192 """
193 default_args = []
194 if gw:
195 from pesummary.gw.cli.parser import ArgumentParser
196 default_args.append("--gw")
197 default_args.append("--nsamples_for_skymap")
198 default_args.append("10")
199 else:
200 from pesummary.core.cli.parser import ArgumentParser
201 parser = ArgumentParser()
202 parser.add_all_known_options_to_parser()
203 params, data = make_result_file(
204 extension=extension, gw=gw, bilby=bilby, lalinference=lalinference,
205 outdir=outdir
206 )
207 if not existing:
208 default_args.append("--webdir")
209 else:
210 default_args.append("--existing_webdir")
211 default_args.append(outdir)
212 default_args.append("--samples")
213 for i in range(number):
214 default_args.append("%s/test.%s" % (outdir, extension))
215 default_args.append("--labels")
216 if not existing:
217 for i in range(number):
218 if not gw:
219 default_args.append("core%s" % (i))
220 else:
221 default_args.append("gw%s" % (i))
222 else:
223 if not gw:
224 default_args.append("core1")
225 else:
226 default_args.append("gw1")
227 default_args.append("--config")
228 for i in range(number):
229 default_args.append(testing_dir + "/example_config.ini")
230 if disable_expert:
231 default_args.append("--disable_expert")
232 opts = parser.parse_args(default_args)
233 if gw:
234 func = WebpagePlusPlottingPlusMetaFileInput
235 else:
236 func = Input
237 return opts, func(opts)
240def read_result_file(outdir="./.outdir", extension="json", bilby=False,
241 lalinference=False, pesummary=False):
242 """
243 """
244 if bilby:
245 from bilby.core.result import read_in_result
247 if extension == "json":
248 f = read_in_result(outdir + "/test.json")
249 elif extension == "h5" or extension == "hdf5":
250 f = read_in_result(outdir + "/test.h5")
251 posterior = f.posterior
252 posterior = posterior.select_dtypes(include=[float, int])
253 samples = {key: val for key, val in posterior.items()}
254 elif lalinference:
255 import h5py
257 f = h5py.File(outdir + "/test.hdf5", "r")
259 posterior = f["lalinference"]["lalinference_nest"]["posterior_samples"]
260 samples = {
261 i: [j[num] for j in posterior] for num, i in enumerate(
262 posterior.dtype.names
263 )
264 }
265 elif pesummary:
266 from pesummary.gw.file.read import read
268 if extension == "json":
269 f = read(outdir + "/test.json")
270 else:
271 f = read(outdir + "/test.h5")
272 data = f.samples_dict
273 labels = f.labels
274 samples = data[labels[0]]
275 elif extension == "dat":
276 import numpy as np
278 data = np.genfromtxt(outdir + "/test.dat", names=True)
279 samples = {
280 i: [j[num] for j in data] for num, i in enumerate(data.dtype.names)
281 }
282 else:
283 samples = {}
284 return samples
287def make_psd(outdir="./.outdir"):
288 """Make a psd file
289 """
290 frequencies = np.linspace(0, 1024, 1000)
291 strains = np.random.uniform(10, 0.1, 1000)
292 data = np.vstack([frequencies, strains]).T
293 np.savetxt(
294 "{}/psd.dat".format(outdir), data, delimiter="\t",
295 header="\t".join(["frequencies", "strain"])
296 )
299def make_calibration(outdir="./.outdir"):
300 """Make a calibration file
301 """
302 frequencies = np.linspace(0, 1024, 1000)
303 columns = [np.random.uniform(10, 0.1, 1000) for _ in range(6)]
304 data = np.vstack([frequencies] + columns).T
305 np.savetxt(
306 "{}/calibration.dat".format(outdir), data, delimiter="\t",
307 header="\t".join(["frequencies", "a", "b", "c", "d", "e", "f"])
308 )
311def make_injection_file(
312 outdir="./.outdir", extension="json", return_filename=True,
313 return_injection_dict=True
314):
315 """
316 """
317 from pesummary.io import write
319 filename = os.path.join(outdir, "injection.{}".format(extension))
320 parameters = gw_parameters()
321 samples = np.array([[np.random.random()] for i in range(len(parameters))]).T
322 write(parameters, samples, filename=filename, file_format=extension)
323 args = []
324 if return_filename:
325 args.append(filename)
326 if return_injection_dict:
327 args.append({param: samples[0][num] for num, param in enumerate(parameters)})
328 return args
331def make_result_file(outdir="./.outdir/", extension="json", gw=True, bilby=False,
332 lalinference=False, pesummary=False, pesummary_label="label",
333 config=None, psd=None, calibration=None, random_seed=None,
334 n_samples=1000):
335 """Make a result file that can be read in by PESummary
337 Parameters
338 ----------
339 outdir: str
340 directory where you would like to store the result file
341 extension: str
342 the file extension of the result file
343 gw: Bool
344 if True, gw parameters will be used
345 """
346 if random_seed is not None:
347 np.random.seed(random_seed)
348 print(extension, gw, bilby, lalinference, pesummary)
349 if outdir[-1] != "/":
350 outdir += "/"
351 data = np.array([np.random.random(18) for i in range(n_samples)])
352 if gw:
353 parameters = ["mass_1", "mass_2", "a_1", "a_2", "tilt_1", "tilt_2",
354 "phi_jl", "phi_12", "psi", "theta_jn", "ra", "dec",
355 "luminosity_distance", "geocent_time", "redshift",
356 "mass_1_source", "mass_2_source", "log_likelihood"]
357 distance = np.random.random(n_samples) * 500
358 mass_1 = np.random.random(n_samples) * 100
359 q = np.random.random(n_samples) * 100
360 a_1 = np.random.uniform(0, 0.99, n_samples)
361 a_2 = np.random.uniform(0, 0.99, n_samples)
362 for num, i in enumerate(data):
363 data[num][12] = distance[num]
364 data[num][0] = mass_1[num]
365 data[num][1] = mass_1[num] * q[num]
366 data[num][2] = a_1[num]
367 data[num][3] = a_2[num]
368 else:
369 import string
371 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
372 if extension == "dat":
373 np.savetxt(outdir + "test.dat", data, delimiter=" ",
374 header=" ".join(parameters), comments="")
375 elif extension == "csv":
376 np.savetxt(outdir + "test.csv", data, delimiter=",",
377 header=",".join(parameters), comments="")
378 elif extension == "npy":
379 from pesummary.utils.samples_dict import SamplesDict
380 samples = SamplesDict(parameters, np.array(data).T).to_structured_array()
381 np.save(outdir + "test.npy", samples)
382 elif extension == "json" and not bilby and not pesummary and not lalinference:
383 import json
385 dictionary = {"NameOfCode": {"posterior_samples": {key:
386 [i[num] for i in data] for num, key in
387 enumerate(parameters)}}}
388 with open(outdir + "test.json", "w") as f:
389 json.dump(dictionary, f, indent=4, sort_keys=True)
390 elif (extension == "hdf5" or extension == "h5") and not bilby and not pesummary and not lalinference:
391 import h5py
393 h5py_data = np.array(
394 [tuple(i) for i in data], dtype=[tuple([i, 'float64']) for i in parameters])
395 f = h5py.File(outdir + "test.h5", "w")
396 name = f.create_group("NameOfCode")
397 samples = f.create_dataset("posterior_samples", data=h5py_data)
398 f.close()
399 elif bilby and not pesummary and not lalinference:
400 import bilby
401 from bilby.core.result import Result
402 from bilby.core.prior import PriorDict
403 from pandas import DataFrame
405 priors = PriorDict()
406 priors.update({"%s" % (i): bilby.core.prior.Uniform(0.1, 0.5, 0) for i in parameters})
407 posterior_data_frame = DataFrame(data, columns=parameters)
408 injection_parameters = {par: 1. for par in parameters}
409 bilby_object = Result(
410 search_parameter_keys=parameters, samples=data,
411 posterior=posterior_data_frame, label="test",
412 injection_parameters=injection_parameters,
413 priors=priors,
414 log_bayes_factor=0.5, log_evidence_err=0.1, log_noise_evidence=0.1,
415 log_evidence=0.2, version=["bilby=0.5.3:"],
416 meta_data={"likelihood": {"time_marginalization": "True"}})
417 if extension == "json":
418 bilby_object.save_to_file(
419 filename=outdir + "test.json", extension="json")
420 elif extension == "hdf5" or extension == "h5":
421 bilby_object.save_to_file(
422 filename=outdir + "test.h5", extension="hdf5")
423 elif lalinference and not bilby and not pesummary:
424 import h5py
426 h5py_data = np.array(
427 [tuple(i) for i in data], dtype=[tuple([i, 'float64']) for i in parameters])
428 f = h5py.File(outdir + "test.hdf5", "w")
429 lalinference = f.create_group("lalinference")
430 nest = lalinference.create_group("lalinference_nest")
431 samples = nest.create_dataset("posterior_samples", data=h5py_data)
432 f.close()
433 elif pesummary and not lalinference and not bilby:
434 dictionary = {
435 pesummary_label: {
436 "posterior_samples": {
437 "parameter_names": parameters,
438 "samples": [list(i) for i in data]
439 },
440 "injection_data": {
441 "injection_values": [float("nan") for i in range(len(parameters))]
442 },
443 "version": ["No version information found"],
444 "meta_data": {
445 "sampler": {"log_evidence": 0.5},
446 "meta_data": {}
447 }
448 },
449 "version": {
450 "pesummary": ["v0.1.7"]
451 }
452 }
453 if config is not None:
454 dictionary[pesummary_label]["config_file"] = config
455 if psd is not None:
456 dictionary[pesummary_label]["psds"] = psds
457 if calibration is not None:
458 dictionary[pesummary_label]["calibration_envelope"] = calibration
460 if extension == "json":
461 import json
463 with open(outdir + "test.json", "w") as f:
464 json.dump(dictionary, f, indent=4, sort_keys=True)
465 elif extension == "hdf5" or extension == "h5":
466 import h5py
467 from pesummary.core.file.meta_file import recursively_save_dictionary_to_hdf5_file
469 f = h5py.File(outdir + "test.h5", "w")
470 recursively_save_dictionary_to_hdf5_file(
471 f, dictionary, extra_keys=list(dictionary.keys())
472 )
473 f.close()
474 return parameters, data
477testing_dir = str(Path(__file__).parent)
478data_dir = str(Path(__file__).parent / "files")