Coverage for pesummary/gw/file/calibration.py: 62.0%
79 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import numpy as np
4from scipy.interpolate import interp1d
5from pesummary import conf
6from pesummary.utils.utils import check_file_exists_and_rename, logger
7from pesummary.utils.dict import Dict
9__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
12def _apply_calibration_correction(array=None, amplitude=None, phase=None, type="data"):
13 """Apply the calibration correction based on the calibration type. If
14 type='data', the amplitude is inverted and phase multiplied by -1. If
15 type='template', the amplitude and phase is left unchanged.
17 Parameters
18 ----------
19 array: np.ndarray, optional
20 an opened calibration envelope file (with e.g. np.genfromtxt). Columns must
21 be "Frequency", "Median Mag", "Phase (Rad)", "-1 Sigma Mag", "-1 Sigma Phase",
22 "+1 Sigma Mag", "+1 Sigma Phase"
23 amplitude: np.ndarray, optional
24 array of only calibration amplitudes
25 phase: np.ndarray, optional
26 array of only calibration phases
27 type: str, optional
28 type of calibration data. Must be either 'data' or 'template'
29 """
30 if type not in ["data", "template"]:
31 raise ValueError(f"Unknown calibration type: {type}")
32 if all(_ is None for _ in [array, amplitude, phase]):
33 raise ValueError(
34 "Please provide either an opened calibration file, or amplitude and phase "
35 "corrections"
36 )
37 elif array is None and any(_ is None for _ in [amplitude, phase]):
38 raise ValueError(
39 "Please provide data for both the amplitude and phase corrections"
40 )
41 elif array is not None and any(_ is not None for _ in [amplitude, phase]):
42 logger.warning(
43 "An opened calibration file and amplitude/phase corrections are provided. "
44 "Using opened calibration file"
45 )
46 if array is not None:
47 amplitude = np.array([array.T[1], array.T[3], array.T[5]])
48 phase = np.array([array.T[2], array.T[4], array.T[6]])
49 if type == "data":
50 if amplitude is not None:
51 amplitude = 1. / amplitude
52 if phase is not None:
53 phase *= -1
54 return amplitude, phase
57def _spline_angle_xform(delta_psi):
58 """Returns the angle in degrees corresponding to the spline
59 calibration parameters delta_psi. Code taken from lalinference.bayespputils
61 Parameters
62 ----------
63 delta_psi: array
64 calibration phase uncertainty
65 """
66 rotation = (2.0 + 1.0j * delta_psi) / (2.0 - 1.0j * delta_psi)
67 return 180.0 / np.pi * np.arctan2(np.imag(rotation), np.real(rotation))
70def _interpolate_spline_model(
71 frequencies, data, interpolated_frequencies, nfreqs=100, xform=None,
72 level=0.9, pbar=None
73):
74 """Interpolate calibration posterior estimates for a spline model in log
75 space. Code based upon same function in lalinference.bayespputils
77 Parameters
78 ----------
79 frequencies: array
80 The spline control points
81 data: ndarray
82 Array of posterior samples at each spline control point
83 interpolated_frequencies: array
84 Array of frequencies you wish to evaluate the interpolant for
85 nfreqs: int, optional
86 Number of points to evaluate the interpolates spline. Default 100
87 xform: func, optional
88 Function to transform the spline
89 """
90 interpolated_data = np.zeros((np.asarray(data).shape[0], nfreqs))
91 for num, samp in enumerate(data):
92 interp = interp1d(
93 frequencies, samp, kind="cubic", fill_value=0., bounds_error=False
94 )(interpolated_frequencies)
95 if xform is not None:
96 interp = xform(interp)
97 interpolated_data[num] = interp
98 if pbar is not None:
99 pbar.update(1)
101 mean = np.mean(interpolated_data, axis=0)
102 lower = np.quantile(interpolated_data, (1 - level) / 2., axis=0)
103 upper = np.quantile(interpolated_data, (1 + level) / 2., axis=0)
104 return mean, lower, upper
107def interpolate_calibration_posterior_from_samples(
108 log_frequencies, amplitudes, phases, nfreqs=100, level=0.9, **kwargs
109):
110 """Interpolate calibration posterior estimates for a spline model in log
111 space and return the amplitude and phase uncertainties. Code based upon same
112 function in lalinference.bayespputils
114 Parameters
115 ----------
116 log_frequencies: array
117 The spline control points.
118 amplitudes: ndarray
119 Array of amplitude posterior samples at each of the spline control
120 points
121 phases: ndarray
122 Array of phase posterior samples at each of the spline control points
123 nfreqs: int, optional
124 Number of points to evaluate the interpolates spline. Default 100
125 **kwargs: dict
126 All kwargs passed to _interpolate_spline_model
127 """
128 frequencies = np.exp(log_frequencies)
129 interpolated_frequencies = np.logspace(
130 np.min(log_frequencies), np.max(log_frequencies), nfreqs, base=np.e
131 )
132 amp_mean, amp_lower, amp_upper = (
133 1 + np.array(
134 _interpolate_spline_model(
135 log_frequencies, np.column_stack(amplitudes),
136 np.log(interpolated_frequencies), nfreqs=nfreqs, xform=None,
137 level=level, **kwargs
138 )
139 )
140 )
141 phase_mean, phase_lower, phase_upper = np.array(
142 _interpolate_spline_model(
143 log_frequencies, np.column_stack(phases),
144 np.log(interpolated_frequencies), nfreqs=nfreqs,
145 xform=_spline_angle_xform, level=level, **kwargs
146 )
147 ) * (np.pi / 180)
148 return np.column_stack(
149 [
150 interpolated_frequencies, amp_mean, phase_mean, amp_lower,
151 phase_lower, amp_upper, phase_upper
152 ]
153 )
156class CalibrationDict(Dict):
157 """Class to handle a dictionary of calibration data
159 Parameters
160 ----------
161 detectors: list
162 list of detectors
163 data: nd list
164 list of calibration samples for each detector. Each of the columns
165 should represent Frequency, Median Mag, Phase (Rad), -1 Sigma Mag,
166 -1 Sigma Phase, +1 Sigma Mag, +1 Sigma Phase
168 Attributes
169 ----------
170 detectors: list
171 list of detectors stored in the dictionary
173 Methods
174 -------
175 plot:
176 Generate a plot based on the calibration samples stored
177 """
178 def __init__(self, *args):
179 _columns = [
180 "frequencies", "magnitude", "phase", "magnitude_lower",
181 "phase_lower", "magnitude_upper", "phase_upper"
182 ]
183 super(CalibrationDict, self).__init__(
184 *args, value_class=Calibration, value_columns=_columns
185 )
187 @property
188 def detectors(self):
189 return list(self.keys())
192class Calibration(np.ndarray):
193 """Class to handle Calibration data
194 """
195 def __new__(cls, input_array):
196 obj = np.asarray(input_array).view(cls)
197 if obj.shape[1] != 7:
198 raise ValueError(
199 "Invalid input data. See the docs for instructions"
200 )
201 return obj
203 @classmethod
204 def read(cls, path_to_file, IFO=None, type="data", **kwargs):
205 """Read in a file and initialize the Calibration class
207 Parameters
208 ----------
209 path_to_file: str
210 the path to the file you wish to load
211 IFO: str, optional
212 name of the IFO which relates to the input file
213 type: str, optional
214 the calibration definition. This can either be 'data' or 'template'
215 and determines whether the calibration is applied to the data
216 or the template. Default: 'data'
217 **kwargs: dict
218 all kwargs are passed to the np.genfromtxt method
219 """
220 try:
221 f = np.genfromtxt(path_to_file, **kwargs)
222 amplitudes, phases = _apply_calibration_correction(array=f, type=type)
223 data = np.array([
224 f.T[0], amplitudes[0], phases[0], amplitudes[1], phases[1],
225 amplitudes[2], phases[2]
226 ]).T
227 return cls(data)
228 except Exception:
229 raise
231 @classmethod
232 def from_spline_posterior_samples(
233 cls, log_frequencies, amplitudes, phases, **kwargs
234 ):
235 """Interpolate calibration posterior estimates for a spline model in log
236 space and initialize the Calibration class
238 Parameters
239 ----------
240 """
241 samples = interpolate_calibration_posterior_from_samples(
242 log_frequencies, amplitudes, phases, level=0.68, nfreqs=300,
243 **kwargs
244 )
245 return cls(samples)
247 def save_to_file(self, file_name, comments="#", delimiter=conf.delimiter):
248 """Save the calibration data to file
250 Parameters
251 ----------
252 file_name: str
253 name of the file name that you wish to use
254 comments: str, optional
255 String that will be prepended to the header and footer strings, to
256 mark them as comments. Default is '#'.
257 delimiter: str, optional
258 String or character separating columns.
259 """
260 check_file_exists_and_rename(file_name)
261 header = [
262 "Frequency", "Median Mag", "Phase (Rad)", "-1 Sigma Mag",
263 "-1 Sigma Phase", "+1 Sigma Mag", "+1 Sigma Phase"
264 ]
265 np.savetxt(
266 file_name, self, delimiter=delimiter, comments=comments,
267 header=delimiter.join(header)
268 )
270 def __array_finalize__(self, obj):
271 if obj is None:
272 return