Coverage for pesummary/cli/summarysplit.py: 84.3%
51 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
1#! /usr/bin/env python
3# Licensed under an MIT style license -- see LICENSE.md
5import os
6import numpy as np
7import multiprocessing
9from pesummary.core.cli.actions import CheckFilesExistAction
10from pesummary.core.cli.parser import ArgumentParser as _ArgumentParser
11from pesummary.utils.utils import iterator, logger, make_dir
12from pesummary.io import read, write, available_formats
14__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
15__doc__ = """This executable is used to split the posterior samples contained
16within a file into N separate files. If the input file has more than one
17analysis, the posterior samples for each analysis is split into N separate
18files. This is useful for submitting thousands of summarypages to condor"""
class ArgumentParser(_ArgumentParser):
    """Command line parser for `summarysplit`.

    Extends the core pesummary parser with the options required to split
    a posterior samples file into multiple files.
    """
    def _pesummary_options(self):
        """Return the dictionary of options understood by `summarysplit`.

        Starts from the options provided by the parent parser and adds the
        `summarysplit`-specific ones.
        """
        options = super(ArgumentParser, self)._pesummary_options()
        extra_options = {
            "--samples": {
                "short": "-s",
                "required": True,
                # validate that the supplied path exists before running
                "action": CheckFilesExistAction,
                "help": (
                    "Path to the posterior samples file you wish to split"
                ),
            },
            "--file_format": {
                "type": str,
                "default": "dat",
                "help": "Format of each output file",
                # restrict to the formats pesummary can write
                "choices": available_formats()[1],
            },
            "--outdir": {
                "type": str,
                "default": "./",
                "help": "Directory to save each file"
            },
            "--N_files": {
                "short": "-N",
                "type": int,
                "default": 0,
                "help": (
                    "Number of files to split the posterior samples into. "
                    "Default 0 meaning N_files=n_samples where n_samples "
                    "is the number of posterior samples"
                )
            },
        }
        options.update(extra_options)
        return options
def _write_posterior_samples(
    posterior_samples, split_idxs, file_format, outdir, filename
):
    """Write one contiguous chunk of a set of posterior samples to file

    Parameters
    ----------
    posterior_samples: pesummary.utils.samples_dict.SamplesDict
        set of posterior samples you wish to split and write to file
    split_idxs: np.ndarray
        array of contiguous sample indices belonging to this chunk,
        e.g. one entry of [[1,2,3], [4,5,6], [7,8]]
    file_format: str
        format to write the posterior samples
    outdir: str
        directory to store each file
    filename: str
        filename to use for each file
    """
    # chunk boundaries: split_idxs is assumed contiguous, so a slice
    # from the first index to one past the last covers exactly this chunk
    start = split_idxs[0]
    stop = split_idxs[-1] + 1
    chunk = posterior_samples.samples.T[start:stop]
    write(
        posterior_samples.parameters, chunk, file_format=file_format,
        outdir=outdir, filename=filename
    )
def _wrapper_for_write_posterior_samples(args):
    """Unpack a tuple of arguments and forward to _write_posterior_samples.

    Needed because a multiprocessing pool maps a function over single
    items, while _write_posterior_samples takes several arguments.

    Parameters
    ----------
    args: tuple
        All args passed to _write_posterior_samples
    """
    posterior_samples, split_idxs, file_format, outdir, filename = args
    return _write_posterior_samples(
        posterior_samples, split_idxs, file_format, outdir, filename
    )
def _split_posterior_samples(
    posterior_samples, N_files, file_format="dat", outdir="./",
    filename=None, multi_process=1
):
    """Split a set of posterior samples and write each split to file

    Parameters
    ----------
    posterior_samples: pesummary.utils.samples_dict.SamplesDict
        set of posterior samples you wish to split and write to file
    N_files: int
        number of times to split the posterior samples
    file_format: str, optional
        file format to write split posterior samples. Default 'dat'
    outdir: str, optional
        directory to write split posterior samples. Default './'
    filename: str, optional
        filename to use when writing split posterior samples. Should be of
        the form 'filename_{}.file_format'; '{}' will be replaced by the
        split num. Default 'None' which leads to
        'split_posterior_samples_{}.dat'
    multi_process: int, optional
        number of cpus to use when writing the split posterior samples.
        Default 1
    """
    n_samples = posterior_samples.number_of_samples
    # cap the number of files at the number of available samples; a value
    # of 0 (the CLI default) also means "one file per sample"
    if N_files > n_samples:
        logger.warning(
            "Number of requested files '{}' greater than number of samples "
            "'{}'. Reducing the number of files to '{}'".format(
                N_files, n_samples, n_samples
            )
        )
        N_files = n_samples
    elif not N_files:
        N_files = n_samples
    if filename is None:
        filename = "split_posterior_samples_{}.%s" % (file_format)
    make_dir(outdir)
    logger.info(
        "Splitting posterior samples into {} files".format(N_files)
    )
    # partition the sample indices into N_files (nearly) equal chunks
    split_idxs = np.array_split(np.arange(n_samples), N_files)
    n_splits = len(split_idxs)
    filenames = [filename.format(num) for num in range(n_splits)]
    # one argument tuple per chunk, matching the signature of
    # _write_posterior_samples
    job_args = list(
        zip(
            [posterior_samples] * n_splits, split_idxs,
            [file_format] * n_splits, [outdir] * n_splits, filenames
        )
    )
    with multiprocessing.Pool(multi_process) as pool:
        # list() drains the imap iterator; iterator() adds a progress bar
        list(
            iterator(
                pool.imap(_wrapper_for_write_posterior_samples, job_args),
                tqdm=True, desc="Saving posterior samples to file",
                logger=logger, total=n_splits
            )
        )
def main(args=None):
    """Top level interface for `summarysplit`
    """
    parser = ArgumentParser(description=__doc__)
    parser.add_known_options_to_parser(
        [
            "--samples", "--file_format", "--outdir", "--multi_process",
            "--N_files"
        ]
    )
    opts, _ = parser.parse_known_args(args=args)
    logger.info("Loading file: '{}'".format(opts.samples))
    # skip prior/injection handling: only the posterior samples are needed
    f = read(
        opts.samples, disable_prior=True, disable_injection_conversion=True
    )
    posterior_samples = f.samples_dict
    multi_analysis = (
        hasattr(f, "labels") and f.labels is not None and len(f.labels) > 1
    )
    if not multi_analysis:
        _split_posterior_samples(
            posterior_samples, opts.N_files, outdir=opts.outdir,
            file_format=opts.file_format, multi_process=opts.multi_process
        )
        return
    # one sub-directory of split files per analysis label
    for label in f.labels:
        _split_posterior_samples(
            posterior_samples[label], opts.N_files,
            outdir=os.path.join(opts.outdir, label),
            file_format=opts.file_format, multi_process=opts.multi_process
        )
# Run the `summarysplit` executable when invoked directly as a script
if __name__ == "__main__":
    main()