# Licensed under an MIT style license -- see LICENSE.md
import numpy as np
from pesummary.utils.decorators import array_input
from pesummary.utils.utils import logger
from pesummary.gw.pycbc import optimal_snr, compute_the_overlap
from pesummary.gw.conversions.angles import _dphi, _dpsi
__author__ = [
"Stephen Fairhurst <stephen.fairhurst@ligo.org>",
"Rhys Green <rhys.green@ligo.org>",
"Charlie Hoy <charlie.hoy@ligo.org>",
"Cameron Mills <joseph.mills@ligo.org>"
]
@array_input()
def _ifo_snr(IFO_abs_snr, IFO_snr_angle):
"""Return the matched filter SNR for a given IFO given samples for the
absolute SNR and the angle
"""
return IFO_abs_snr * np.cos(IFO_snr_angle)
@array_input()
def _ifo_snr_from_real_and_imaginary(IFO_real_snr, IFO_imag_snr):
"""Return the matched filter SNR for a given IFO given samples for the
real and imaginary SNR
"""
_complex = IFO_real_snr + IFO_imag_snr * 1j
_abs = np.abs(_complex)
return _ifo_snr(_abs, np.angle(_complex))
[docs]
@array_input()
def network_snr(snrs):
"""Return the network SNR for N IFOs
Parameters
----------
snrs: list
list of numpy.array objects containing the snrs samples for a particular
IFO
"""
squares = np.square(snrs)
network_snr = np.sqrt(np.sum(squares, axis=0))
return network_snr
[docs]
@array_input()
def network_matched_filter_snr(IFO_matched_filter_snrs, IFO_optimal_snrs):
"""Return the network matched filter SNR for a given detector network. Code
adapted from Christopher Berry's python notebook
Parameters
----------
IFO_matched_filter_snrs: list
list of matched filter SNRs for each IFO in the network
IFO_optimal_snrs: list
list of optimal SNRs
"""
for num, det_snr in enumerate(IFO_matched_filter_snrs):
complex_snr = np.iscomplex(det_snr)
convert_mf_snr = False
try:
if complex_snr:
convert_mf_snr = True
except ValueError:
if any(_complex for _complex in complex_snr):
convert_mf_snr = True
if convert_mf_snr:
IFO_matched_filter_snrs[num] = np.real(det_snr)
network_optimal_snr = network_snr(IFO_optimal_snrs)
network_matched_filter_snr = np.sum(
[
mf_snr * opt_snr / network_optimal_snr for mf_snr, opt_snr in zip(
IFO_matched_filter_snrs, IFO_optimal_snrs
)
], axis=0
)
return network_matched_filter_snr
def _setup_frequencies(f_low, f_final, f_ref, psd):
"""Setup the final frequency and reference frequency to be compatible with
the provided PSD.
Parameters
----------
f_low: float
starting frequency you wish to use
f_final: float
final frequency you wish to use
f_ref: float
reference frequency you wish to use
psd: dict
dictionary of pycbc.frequencyseries.FrequencySeries objects containing
the psd. Keys must be the detector identifier, e.g. H1, L1, ...
"""
detectors = list(psd.keys())
_f_final = psd[detectors[0]].sample_frequencies[-1]
if f_final is None:
f_final = _f_final
elif f_final > _f_final:
logger.warning(
"The provided final frequency: {} does not match the final "
"frequency in the PSD: {}. Using the final frequency stored in the "
"PSD to prevent interpolation errors".format(f_final, _f_final)
)
f_final = _f_final
if f_ref is None:
logger.warning("No reference frequency provided. Using f_low as default")
f_ref = f_low
elif isinstance(f_ref, (list, np.ndarray)):
f_ref = f_ref[0]
return f_final, f_ref
def _setup_psd(psd, psd_default, df=1. / 256, **psd_default_kwargs):
"""Setup the PSD dictionary. If the provided PSD is empty, construct a
PSD based on the default, this could either be analytic or not. If the
provided PSD is not empty, simply return the provided PSD unchanged.
Parameters
----------
psd: dict
dictionary containing the psd. Keys are the IFO and items are a
pycbc.frequencyseries.FrequencySeries object
psd_default: str/dict
The default PSD to use. This can either be a string describing the
analytic PSD or a dictionary with keyes showing the IFO and items
either a pesummary.gw.file.psd.PSD or a
pycbc.frequencyseries.FrequencySeries object
psd_default_kwargs: dict, optional
kwargs to pass to pesummary.gw.file.psd.pycbc_default_psd when
a PSD is constructured based on the analytic default
"""
from pesummary.gw.file.psd import PSD
ANALYTIC = False
condition = isinstance(psd_default, dict) and len(psd_default) and (
all(isinstance(value, PSD) for ifo, value in psd_default.items())
)
f_low = psd_default_kwargs.get("f_low", None)
f_final = psd_default_kwargs.get("f_final", None)
psd_default_kwargs.update({"df": df})
frequency_cond = any(param is None for param in [f_low, f_final])
if psd == {} and condition:
if frequency_cond:
raise ValueError(
"Please provide f_low and f_final as keyword arguments"
)
for ifo, data in psd_default.items():
psd[ifo] = data.to_pycbc(
f_low, f_high=f_final, f_high_override=True
)
elif psd == {} and isinstance(psd_default, dict) and len(psd_default):
for ifo, data in psd_default.items():
psd[ifo] = data
elif psd == {}:
from pesummary.gw.pycbc import pycbc_default_psd
if isinstance(psd_default, dict):
from pesummary import conf
psd_default = conf.psd
ANALYTIC = True
psd_default_kwargs.update({"psd": psd_default})
psd = pycbc_default_psd(**psd_default_kwargs)
detectors = list(psd.keys())
if df != psd[detectors[0]].delta_f:
from pesummary.gw.file.psd import PSDDict
logger.warning(
"Provided PSD has df={} and {} has been specified. Interpolation "
"will be used".format(psd[detectors[0]].delta_f, df)
)
if isinstance(psd, PSDDict):
psd = psd.interpolate(f_low, df)
else:
from pesummary.gw.pycbc import interpolate_psd
psd = {
ifo: interpolate_psd(psd[ifo], f_low, df) for ifo in
psd.keys()
}
return psd, ANALYTIC
def _make_waveform(
approx, theta_jn, phi_jl, phase, psi_J, mass_1, mass_2, tilt_1, tilt_2,
phi_12, a_1, a_2, beta, distance, apply_detector_response=True, **kwargs
):
"""Generate a frequency domain waveform
Parameters
----------
approx: str
Name of the approximant you wish to use when generating the waveform
theta_jn: float
Angle between the total angular momentum and the line of sight
phi_jl: float
Azimuthal angle of the total orbital angular momentum around the
total angular momentum
phase: float
The phase of the binary at coaelescence
mass_1: float
Primary mass of the binary
mass_2: float
Secondary mass of the binary
tilt_1: float
The angle between the total orbital angular momentum and the primary
spin
tilt_2: float
The angle between the total orbital angular momentum and the primary
spin
phi_12: float
The angle between the primary spin and the secondary spin
a_1: float
The spin magnitude on the larger object
a_2: float
The spin magnitude on the secondary object
beta: float
The opening angle of the system. Defined as the angle between the
orbital angular momentum, L, and the total angular momentum J.
apply_detector_response: Bool, optional
if True apply an effective detector response and return
fp * hp + fc * hc else return hp, hc. Default True
**kwargs: dict
All additional kwargs are passed to the
pesummary.gw.waveform.fd_waveform function
"""
from pesummary.gw.waveform import fd_waveform
_samples = {
"theta_jn": [theta_jn], "phi_jl": [phi_jl], "phase": [phase],
"mass_1": [mass_1], "mass_2": [mass_2], "tilt_1": [tilt_1],
"tilt_2": [tilt_2], "phi_12": [phi_12], "a_1": [a_1],
"a_2": [a_2], "luminosity_distance": [distance]
}
waveforms = fd_waveform(
_samples, approx, kwargs.get("df", 1. / 256),
kwargs.get("f_low", 20.), kwargs.get("f_final", 1024.),
f_ref=kwargs.get("f_ref", 20.), ind=0, pycbc=True,
mode_array=kwargs.get("mode_array", None)
)
hp, hc = waveforms["h_plus"], waveforms["h_cross"]
if kwargs.get("flen", None) is not None:
flen = kwargs.get("flen")
hp.resize(flen)
hc.resize(flen)
if not apply_detector_response:
return hp, hc
dpsi = _dpsi(theta_jn, phi_jl, beta)
fp = np.cos(2 * (psi_J - dpsi))
fc = -1. * np.sin(2 * (psi_J - dpsi))
h = (fp * hp + fc * hc)
h *= np.exp(2j * _dphi(theta_jn, phi_jl, beta))
return h
def _calculate_precessing_harmonics(
mass_1, mass_2, a_1, a_2, tilt_1, tilt_2, phi_12, beta, distance,
harmonics=[0, 1], approx="IMRPhenomPv2", **kwargs
):
"""Decompose a precessing waveform into a series of harmonics as defined
in Fairhurst et al. arXiv:1908.05707
Parameters
----------
mass_1: np.ndarray
Primary mass of the bianry
mass_2: np.ndarray
Secondary mass of the binary
a_1: np.ndarray
The spin magnitude on the larger object
a_2: np.ndarray
The spin magnitude on the secondary object
tilt_1: np.ndarray
The angle between the total orbital angular momentum and the primary
spin
tilt_2: np.ndarray
The angle between the total orbital angular momentum and the secondary
spin
phi_12: np.ndarray
The angle between the primary spin and the secondary spin
beta: np.ndarray
The angle between the total angular momentum and the total orbital
angular momentum
harmonics: list, optional
List of harmonics which you wish to calculate. Default [0, 1]
approximant: str, optional
Approximant to use for the decomposition. Default IMRPhenomPv2
"""
harm = {}
if (0 in harmonics) or (4 in harmonics):
h0 = _make_waveform(
approx, 0., 0., 0., 0., mass_1, mass_2, tilt_1, tilt_2,
phi_12, a_1, a_2, beta, distance, **kwargs
)
hpi4 = _make_waveform(
approx, 0., 0., np.pi / 4, np.pi / 4, mass_1, mass_2,
tilt_1, tilt_2, phi_12, a_1, a_2, beta, distance, **kwargs
)
if (0 in harmonics):
harm[0] = (h0 - hpi4) / 2
if (4 in harmonics):
harm[4] = (h0 + hpi4) / 2
if (1 in harmonics) or (3 in harmonics):
h0 = _make_waveform(
approx, np.pi / 2, 0., np.pi / 4, np.pi / 4, mass_1, mass_2,
tilt_1, tilt_2, phi_12, a_1, a_2, beta, distance,
**kwargs
)
hpi2 = _make_waveform(
approx, np.pi / 2, np.pi / 2, 0., np.pi / 4, mass_1, mass_2,
tilt_1, tilt_2, phi_12, a_1, a_2, beta, distance,
**kwargs
)
if (1 in harmonics):
harm[1] = -1. * (h0 + hpi2) / 4
if (3 in harmonics):
harm[3] = -1. * (h0 - hpi2) / 4
if (2 in harmonics):
h0 = _make_waveform(
approx, np.pi / 2, 0., 0., 0., mass_1, mass_2,
tilt_1, tilt_2, phi_12, a_1, a_2, beta,
distance, **kwargs
)
hpi2 = _make_waveform(
approx, np.pi / 2, np.pi / 2, 0., 0., mass_1, mass_2,
tilt_1, tilt_2, phi_12, a_1, a_2, beta, distance,
**kwargs
)
harm[2] = (h0 + hpi2) / 6
return harm
def _make_waveform_from_precessing_harmonics(
harmonic_dict, theta_jn, phi_jl, phase, f_plus_j, f_cross_j
):
"""Generate waveform for a binary merger with given precessing harmonics and
orientation
Parameters
----------
harmonic_dict: dict
harmonics to include
theta_jn: np.ndarray
the angle between total angular momentum and line of sight
phi_jl: np.ndarray
the initial polarization phase
phase: np.ndarray
the initial orbital phase
psi_J: np.ndarray
the polarization angle in the J-aligned frame
f_plus_j: np.ndarray
The Detector plus response function as defined using the J-aligned frame
f_cross_j: np.ndarray
The Detector cross response function as defined using the J-aligned
frame
"""
A = _harmonic_amplitudes(
theta_jn, phi_jl, f_plus_j, f_cross_j, harmonic_dict
)
h_app = 0
for k, harm in harmonic_dict.items():
if h_app:
h_app += A[k] * harm
else:
h_app = A[k] * harm
h_app *= np.exp(2j * phase + 2j * phi_jl)
return h_app
def _harmonic_amplitudes(
theta_jn, phi_jl, f_plus_j, f_cross_j, harmonics=[0, 1]
):
"""Calculate the amplitudes of the precessing harmonics as a function of
orientation
Parameters
----------
theta_jn: np.ndarray
the angle between J and line of sight
phi_jl: np.ndarray
the precession phase
f_plus_j: np.ndarray
The Detector plus response function as defined using the J-aligned frame
f_cross_j: np.ndarray
The Detector cross response function as defined using the J-aligned
frame
harmonics: list, optional
The list of harmonics you wish to return. Default is [0, 1]
"""
amp = {}
if 0 in harmonics:
amp[0] = (
(1 + np.cos(theta_jn)**2) / 2 * f_plus_j
- 1j * np.cos(theta_jn) * f_cross_j
)
if 1 in harmonics:
amp[1] = 2 * np.exp(-1j * phi_jl) * (
np.sin(theta_jn) * np.cos(theta_jn) * f_plus_j
- 1j * np.sin(theta_jn) * f_cross_j
)
if 2 in harmonics:
amp[2] = 3 * np.exp(-2j * phi_jl) * (np.sin(theta_jn)**2) * f_plus_j
if 3 in harmonics:
amp[3] = 2 * np.exp(-3j * phi_jl) * (
-np.sin(theta_jn) * np.cos(theta_jn) * f_plus_j
- 1j * np.sin(theta_jn) * f_cross_j
)
if 4 in harmonics:
amp[4] = np.exp(-4j * phi_jl) * (
(1 + np.cos(theta_jn)**2) / 2 * f_plus_j
+ 1j * np.cos(theta_jn) * f_cross_j
)
return amp
def _prec_ratio(theta_jn, phi_jl, psi_J, b_bar, ra, dec, time, detector):
"""Calculate the ratio between the leading and first precession terms: Zeta
as defined in Fairhurst et al. arXiv:1908.05707
Parameters
----------
theta_jn: float
The angle between the total angular momentum and the line of sight
phi_jl: np.ndarray
the precession phase
psi_J: float
The polarization of the binary defined with respect to the total
angular momentum
b_bar: float
Tangent of the average angle between the total angular momentum and the
total orbital angular momentum during inspiral
ra: float
The right ascension of the binary
dec: float
The declinartion of the binary
time: float
The merger time of the binary
detector: str
The name of the detector you wish to calculate the ratio for
"""
from pesummary.gw.waveform import antenna_response
samples = {"ra": [ra], "dec": [dec], "psi": [psi_J], "geocent_time": [time]}
f_plus, f_cross = antenna_response(samples, detector)
ratio = _prec_ratio_plus_cross(theta_jn, phi_jl, f_plus, f_cross, b_bar)
return ratio
def _prec_ratio_plus_cross(theta_jn, phi_jl, f_plus, f_cross, b_bar):
"""Calculate the ratio between the leading and first precession harmonics
given an antenna response pattern. Zeta as defined in Fairhurst et al.
arXiv:1908.05707
Parameters
----------
theta_jn: float/np.ndarray
The angle between the total angular momentum and the line of sight
phi_jl: np.ndarray
the precession phase
f_plus: float/np.ndarray
The plus polarization factor for a given sky location / orientation
f_cross: float/np.ndarray
The cross polarization factor for a given sky location / orientation
b_bar: float/np.ndarray
Tangent of the average angle between the total angular momentum and the
total orbital angular momentum during inspiral
"""
amplitudes = _harmonic_amplitudes(
theta_jn, phi_jl, f_plus, f_cross, harmonics=[0, 1]
)
A0 = amplitudes[0]
A1 = amplitudes[1]
return b_bar * A1 / A0
[docs]
@array_input(
ignore_kwargs=[
"f_low", "f_final", "psd", "approx", "f_ref", "return_data_used",
"multi_process", "df", "multipole"
]
)
def multipole_snr(
mass_1, mass_2, spin_1z, spin_2z, psi, iota, ra, dec, time, distance, phase,
f_low=20., f_final=1024., psd={}, approx="IMRPhenomXHM", f_ref=None,
return_data_used=False, multi_process=1, df=1. / 256,
multipole=[21, 33, 44], psd_default="aLIGOZeroDetHighPower"
):
"""Calculate the multipole snr as defined in Mills et al.
arXiv:
Parameters
----------
mass_1: float/np.ndarray
Primary mass of the bianry
mass_2: float/np.ndarray
Secondary mass of the binary
spin_1z: float/np.ndarray
The primary spin aligned with the total orbital angular momentum
spin_2z: float/np.ndarray
The secondary spin aligned with the total orbital angular momentum
psi: float/np.ndarray
The polarization angle of the binary
iota: float/np.ndarray
The angle between the total orbital angular momentum and the line of
sight
ra: float/np.ndarray
The right ascension of the source
dec: float/np.ndarray
The declination of the source
time: float/np.ndarray
The merger time of the binary
distance: float/np.ndarray
The luminosity distance of the source
phase: float/np.ndarray
The phase of the source
f_low: float, optional
Low frequency to use for integration. Default 20Hz
f_final: float, optional
Final frequency to use for integration. Default 1024Hz
psd: dict, optional
Dictionary of pycbc.types.frequencyseries.FrequencySeries objects, one
for each detector. Default is to use the aLIGOZeroDetHighPower PSD
approx: str, optional
The aligned spin higher order multipole approximant you wish to use.
Default IMRPhenomXHM
f_ref: float, optional
Reference frequency where the spins are defined. Default is f_low
return_data_used: Bool, optional
if True, return a dictionary containing information about what data was
used. Default False
multi_process: int, optional
The number of cpus to use when computing the precessing_snr. Default 1
df: float, optional
Frequency spacing between frequency samples. Default 1./256
multipole: list, optional
List of multipoles to calculate the SNR for. Default [21, 33, 44]
"""
from pesummary.gw.waveform import antenna_response
from pesummary.utils.utils import iterator
import multiprocessing
if isinstance(f_low, (list, np.ndarray)):
f_low = f_low[0]
if any(mm not in [21, 33, 44] for mm in multipole):
raise ValueError(
"Currently, only the SNR in the (l, m) = [(2, 1), (3, 3), (4, 4)] "
"multipoles can be calculated. Please provide any multipole within "
"multipole=[21, 33, 44]"
)
multipole += [22]
psd, ANALYTIC = _setup_psd(
psd, psd_default, mass_1=mass_1, mass_2=mass_2, spin_1z=spin_1z,
spin_2z=spin_2z, f_low=f_low, detectors=["H1", "L1"], f_final=f_final,
df=df
)
f_final, f_ref = _setup_frequencies(f_low, f_final, f_ref, psd)
flen = int(f_final / df) + 1
_samples = {"ra": ra, "dec": dec, "psi": psi, "geocent_time": time}
detectors = list(psd.keys())
antenna = {
detector: antenna_response(_samples, detector) for detector in detectors
}
_f_plus = {key: value[0] for key, value in antenna.items()}
_f_cross = {key: value[1] for key, value in antenna.items()}
with multiprocessing.Pool(multi_process) as pool:
f_plus = np.array(
[dict(zip(_f_plus, item)) for item in zip(*_f_plus.values())]
)
f_cross = np.array(
[dict(zip(_f_cross, item)) for item in zip(*_f_cross.values())]
)
args = np.array([
mass_1, mass_2, spin_1z, spin_2z, psi, iota, ra, dec, time, distance,
phase, [f_low] * len(mass_1), [f_final] * len(mass_1),
[psd] * len(mass_1), [approx] * len(mass_1), [f_ref] * len(mass_1),
[df] * len(mass_1), [flen] * len(mass_1), [multipole] * len(mass_1),
f_plus, f_cross
], dtype=object).T
rho_hm = np.array(
list(
iterator(
pool.imap(_wrapper_for_multipole_snr, args), tqdm=True,
desc="Calculating rho_hm", logger=logger, total=len(mass_1)
)
), dtype=object
)
rho_hm = np.asarray(np.nan_to_num(rho_hm, 0).T, dtype=np.float64)
rho_hm = np.sqrt(rho_hm)
if return_data_used:
psd_used = "stored" if not ANALYTIC else list(psd.values())[0].__name__
data_used = {"psd": psd_used, "approximant": approx, "f_final": f_final}
return rho_hm, data_used
return rho_hm
[docs]
@array_input(
ignore_kwargs=[
"f_low", "psd", "approx", "f_final", "f_ref", "return_data_used",
"multi_process", "duration", "df", "psd_default", "debug"
]
)
def precessing_snr(
mass_1, mass_2, beta, psi_J, a_1, a_2, tilt_1, tilt_2, phi_12, theta_jn,
ra, dec, time, phi_jl, distance, phase, f_low=20., psd={}, spin_1z=None,
spin_2z=None, chi_eff=None, approx="IMRPhenomPv2", f_final=1024.,
f_ref=None, return_data_used=False, multi_process=1, duration=None,
df=1. / 256, psd_default="aLIGOZeroDetHighPower", debug=True
):
"""Calculate the precessing snr as defined in Fairhurst et al.
arXiv:1908.05707
Parameters
----------
mass_1: np.ndarray
Primary mass of the bianry
mass_2: np.ndarray
Secondary mass of the binary
beta: np.ndarray
The angle between the total angular momentum and the total orbital
angular momentum
psi_J: np.ndarray
The polarization angle as defined with respect to the total angular
momentum
a_1: np.ndarray
The spin magnitude on the larger object
a_2: np.ndarray
The spin magnitude on the secondary object
tilt_1: np.ndarray
The angle between the total orbital angular momentum and the primary
spin
tilt_2: np.ndarray
The angle between the total orbital angular momentum and the secondary
spin
phi_12: np.ndarray
The angle between the primary spin and the secondary spin
theta_jn: np.ndarray
The angle between the total orbital angular momentum and the line of
sight
ra: np.ndarray
The right ascension of the source
dec: np.ndarray
The declination of the source
time: np.ndarray
The merger time of the binary
phi_jl: np.ndarray
the precession phase
f_low: float, optional
The low frequency cut-off to use for integration. Default is 20Hz
psd: dict, optional
Dictionary of pycbc.types.frequencyseries.FrequencySeries objects, one
for each detector. Default is to use the aLIGOZeroDetHighPower PSD
spin_1z: np.ndarray, optional
The primary spin aligned with the total orbital angular momentum
spin_2z: np.ndarray, optional
The secondary spin aligned with the total orbital angular momentum
chi_eff: np.ndarray, optional
Effective spin of the binary
approx: str, optional
The approximant you wish to use. Default IMRPhenomPv2
f_final: float, optional
Final frequency to use for integration. Default 1024Hz
f_ref: float, optional
Reference frequency where the spins are defined. Default is f_low
return_data_used: Bool, optional
if True, return a dictionary containing information about what data was
used. Default False
multi_process: int, optional
The number of cpus to use when computing the precessing_snr. Default 1
duration: float, optional
maximum IMR duration to use to estimate delta_f when PSD is not
provided.
debug: Bool, optional
if True, return posteriors for b_bar and the overlap between the 0th
and 1st harmonics. These are useful for debugging.
"""
from pesummary.gw.waveform import antenna_response
from pesummary.utils.utils import iterator
import multiprocessing
if isinstance(f_low, (list, np.ndarray)):
f_low = f_low[0]
psd, ANALYTIC = _setup_psd(
psd, psd_default, mass_1=mass_1, mass_2=mass_2, spin_1z=spin_1z,
spin_2z=spin_2z, chi_eff=chi_eff, f_low=f_low, duration=duration,
detectors=["H1", "L1"], f_final=f_final, df=df
)
f_final, f_ref = _setup_frequencies(f_low, f_final, f_ref, psd)
flen = int(f_final / df) + 1
_samples = {"ra": ra, "dec": dec, "psi": psi_J, "geocent_time": time}
detectors = list(psd.keys())
antenna = {
detector: antenna_response(_samples, detector) for detector in detectors
}
_f_plus_j = {key: value[0] for key, value in antenna.items()}
_f_cross_j = {key: value[1] for key, value in antenna.items()}
with multiprocessing.Pool(multi_process) as pool:
f_plus_j = np.array(
[dict(zip(_f_plus_j, item)) for item in zip(*_f_plus_j.values())]
)
f_cross_j = np.array(
[dict(zip(_f_cross_j, item)) for item in zip(*_f_cross_j.values())]
)
dphi = _dphi(theta_jn, phi_jl, beta)
args = np.array([
mass_1, mass_2, a_1, a_2, tilt_1, tilt_2, phi_12,
theta_jn, beta, psi_J, ra, dec, time, [approx] * len(mass_1),
[psd] * len(mass_1), [detectors] * len(mass_1), phi_jl, distance,
phase - dphi, f_plus_j, f_cross_j, [f_low] * len(mass_1),
[df] * len(mass_1), [f_final] * len(mass_1), [flen] * len(mass_1),
[f_ref] * len(mass_1), [debug] * len(mass_1)
], dtype=object).T
rho_p = np.array(
list(
iterator(
pool.imap(_wrapper_for_precessing_snr, args), tqdm=True,
desc="Calculating rho_p", logger=logger, total=len(mass_1)
)
), dtype=object
)
if debug:
rho_ps, b_bars, overlaps, snrs = {}, {}, {}, {}
for num, _dict in enumerate([rho_ps, b_bars, overlaps, snrs]):
for key in rho_p[0][0]:
_dict[key] = np.nan_to_num(
[dictionary[num][key] for dictionary in rho_p], 0
)
_return = [
np.sqrt(np.sum([_dict[i] for i in detectors], axis=0)) if num == 0
or num == 3 else np.mean([_dict[i] for i in detectors], axis=0) for
num, _dict in enumerate([rho_ps, b_bars, overlaps, snrs])
]
else:
rho_p = {
key: np.nan_to_num([dictionary[key] for dictionary in rho_p], 0) for
key in rho_p[0]
}
_return = np.sqrt(np.sum([rho_p[i] for i in detectors], axis=0))
if return_data_used:
psd_used = "stored" if not ANALYTIC else list(psd.values())[0].__name__
data_used = {"psd": psd_used, "approximant": approx, "f_final": f_final}
return _return, data_used
return _return
def _wrapper_for_multipole_snr(args):
"""Wrapper function for _multipole_snr for a pool of workers
Parameters
----------
args: tuple
All args passed to _precessing_snr
"""
return _multipole_snr(*args)
def _wrapper_for_precessing_snr(args):
"""Wrapper function for _precessing_snr for a pool of workers
Parameters
----------
args: tuple
All args passed to _precessing_snr
"""
return _precessing_snr(*args)
def _calculate_b_bar(
harmonic_dict, psd_dict, low_frequency_cutoff=20.,
high_frequency_cutoff=1024., return_snrs=False
):
"""Calculate the tangent of half the opening angle as defined in
Fairhurst et al. arXiv:1908.05707
Parameters
----------
harmonic_dict: dict
dictionary of precessing harmonics. Key is the harmonic number
(0, 1, 2...) and item is the
`pycbc.types.frequencyseries.FrequencySeries` object
psd_dict: dict
dictionary of psds. Key is the IFO and item is the
`pycbc.types.frequencyseries.FrequencySeries` object
low_frequency_cutoff: float, optional
Low frequency-cutoff to use for integration. Default 20Hz
high_frequency_cutoff: float, optional
The final frequency to use for integration. Default 1024Hz
return_snrs: Bool, optional
if True, return the snrs of each harmonic
"""
_optimal_snr = lambda harmonic, psd: optimal_snr(
harmonic, psd, low_frequency_cutoff=low_frequency_cutoff,
high_frequency_cutoff=high_frequency_cutoff
)
rhos = {
detector: {
0: _optimal_snr(harmonic_dict[0], psd_dict[detector]),
1: _optimal_snr(harmonic_dict[1], psd_dict[detector])
} for detector in psd_dict.keys()
}
b_bar = {detector: snr[1] / snr[0] for detector, snr in rhos.items()}
if return_snrs:
return b_bar, rhos
return b_bar
def _mode_array_map(mode_key, approx):
"""Return the mode_array in pycbc format for requested mode
Parameters
----------
mode_key: int/str
Mode key e.g. 22/'22'.
approx: str
Waveform approximant.
Returns
-------
mode_array: list
pesummary.gw.waveform.fd_waveform appropriate list of lm modes.
"""
mode_array_dict = {
"22": [[2, 2], [2, -2]],
"32": [[3, 2], [3, -2]],
"21": [[2, 1], [2, -1]],
"44": [[4, 4], [4, -4]],
"33": [[3, 3], [3, -3]],
"43": [[4, 3], [4, -3]],
}
# don't include negative m modes in Cardiff Phenom models
# as will throw an error - they are automatically
# added by these models. Note: Must include
# negative m modes for phenomXHM.
if approx in ["IMRPhenomPv3HM", "IMRPhenomHM"]:
mode_array_idx = -1
else:
mode_array_idx = None
return mode_array_dict[str(mode_key)][:mode_array_idx]
def _multipole_snr(
mass_1, mass_2, spin_1z, spin_2z, psi, iota, ra, dec, time, distance, phase,
f_low, f_final, psd, approx, f_ref, df, flen, multipole, f_plus, f_cross
):
"""Calculate the square of the multipole SNR for a given detector network
Parameters
----------
mass_1: float
Primary mass of the bianry
mass_2: float
Secondary mass of the binary
spin_1z: float
The primary spin aligned with the total orbital angular momentum
spin_2z: float
The secondary spin aligned with the total orbital angular momentum
psi: float
The polarization angle of the binary
iota: float
The angle between the total orbital angular momentum and the line of
sight
ra: float
The right ascension of the source
dec: float
The declination of the source
time: float
The merger time of the binary
distance: float
The luminosity distance of the source
phase: float
The phase of the source
f_low: float
Low frequency to use for integration
f_final: float
Final frequency to use for integration
psd: dict
Dictionary of pycbc.types.frequencyseries.FrequencySeries objects, one
for each detector.
approx: str
The aligned spin higher order multipole approximant you wish to use.
f_ref: float
Reference frequency where the spins are defined.
df: float
Frequency spacing between frequency samples
flen: int
Length of frequency array to use when generating the frequency domain
waveform
multipole: list
List of multipoles to calculate the SNR for.
f_plus: float
Detector response function for the plus polarization
f_cross:
Detector response function for the cross polarization
"""
from pesummary.gw.pycbc import compute_the_overlap
from pycbc.filter import sigmasq
h = {}
# geocenter waveforms
for mode in multipole:
mode_array = _mode_array_map(mode, approx)
tilt_1, tilt_2 = np.arccos(np.sign(spin_1z)), np.arccos(np.sign(spin_2z))
a_1, a_2 = np.abs(spin_1z), np.abs(spin_2z)
h[mode] = _make_waveform(
approx, iota, 0., phase, psi, mass_1, mass_2, tilt_1, tilt_2,
0., a_1, a_2, 0., distance, df=df, f_low=f_low, f_final=f_final,
f_ref=f_ref, mode_array=mode_array, apply_detector_response=False
)
rhosq_22 = 0
rhosq_hm = {mode: 0 for mode in multipole}
for det, _psd in psd.items():
_22 = h[22][0] * f_plus[det] + h[22][1] * f_cross[det]
for mode in multipole:
if mode == 22:
continue
_mode = h[mode][0] * f_plus[det] + h[mode][1] * f_cross[det]
_rhosq_hm = sigmasq(
_mode, _psd, low_frequency_cutoff=f_low,
high_frequency_cutoff=f_final
)
# calculate the mode power perpindicular to the 22 mode
# don't calculate the overlap if SNR ~ 0
if (_rhosq_hm < 1e-14):
oo = 0
else:
oo = compute_the_overlap(
_22, _mode, _psd, low_frequency_cutoff=f_low,
high_frequency_cutoff=f_final, normalized=True
)
rhosq_hm[mode] += _rhosq_hm * (1 - abs(oo) ** 2)
return [snr for mode, snr in rhosq_hm.items() if mode != 22]
def _precessing_snr(
mass_1, mass_2, a_1, a_2, tilt_1, tilt_2, phi_12, theta_jn,
beta, psi_J, ra, dec, time, approx, psd, detectors, phi_jl, distance,
phase, f_plus_j, f_cross_j, f_low, df, f_final, flen, f_ref, debug
):
"""Calculate the square of the precessing SNR for a given detector network
Parameters
----------
mass_1: float
Primary mass of the bianry
mass_2: float
Secondary mass of the binary
a_1: float
The spin magnitude on the larger object
a_2: float
The spin magnitude on the secondary object
tilt_1: float
The angle between the total orbital angular momentum and the primary
spin
tilt_2: float
The angle between the total orbital angular momentum and the secondary
spin
phi_12: float
The angle between the primary spin and the secondary spin
theta_jn: float
The angle between the total orbital angular momentum and the line of
sight
beta: float
The angle between the total angular momentum and the total orbital
angular momentum
psi_J: float
The polarization angle as defined with respect to the total angular
momentum
ra: float
The right ascension of the source
dec: float
The declination of the source
time: float
The merger time of the binary
flow: float
Low frequency-cutoff to use for integration
df: float
The difference between consecutive frequency samples
f_final: float
The final frequency to use for integration
flen: int
Length of the frequency series in samples. Default is None
f_ref: float, optional
Reference frequency where the spins are defined. Default is f_low
approx: str
Name of the approximant to use when calculating the harmonic
decomposition
psd: dict
Dictionary of PSDs for each detector
detector: list
List of detectors to analyse
phi_jl: float
the precession phase
"""
rho_p_dict = {detector: 0 for detector in detectors}
b_bar_dict = {detector: 0 for detector in detectors}
overlap_dict = {detector: 0 for detector in detectors}
snr_dict = {detector: 0 for detector in detectors}
harmonics = _calculate_precessing_harmonics(
mass_1, mass_2, a_1, a_2, tilt_1,
tilt_2, phi_12, beta, distance, approx=approx, f_final=f_final,
flen=flen, f_ref=f_ref, f_low=f_low, df=df
)
for detector in detectors:
rho_0 = optimal_snr(
harmonics[0], psd[detector], low_frequency_cutoff=f_low,
high_frequency_cutoff=f_final
)
rho_1 = optimal_snr(
harmonics[1], psd[detector], low_frequency_cutoff=f_low,
high_frequency_cutoff=f_final
)
b_bar = rho_1 / rho_0
pr = _prec_ratio_plus_cross(
theta_jn, phi_jl, f_plus_j[detector], f_cross_j[detector], b_bar
)
overlap = compute_the_overlap(
harmonics[0], harmonics[1], psd[detector],
low_frequency_cutoff=f_low, high_frequency_cutoff=f_final,
normalized=True
)
overlap_squared = np.abs(overlap)**2
prec_squared = np.abs(pr)**2
real_prec_overlap = 2 * (pr * overlap).real
h_2harm = _make_waveform_from_precessing_harmonics(
harmonics, theta_jn, phi_jl, phase, f_plus_j[detector],
f_cross_j[detector]
)
snr = optimal_snr(
h_2harm, psd[detector], low_frequency_cutoff=f_low,
high_frequency_cutoff=f_final
)
normalization = snr**2 / (1 + prec_squared + real_prec_overlap)
rho_0 = (
1 + np.abs(pr * overlap)**2 + real_prec_overlap
) * normalization
rho_0_perp = (prec_squared * (1 - overlap_squared)) * normalization
rho_1 = (
overlap_squared + real_prec_overlap + prec_squared
) * normalization
rho_1_perp = (1 - overlap_squared) * normalization
_rho_p = np.min([rho_0_perp, rho_1_perp])
if _rho_p > snr**2 / 2.:
_rho_p = snr**2 - _rho_p
rho_p_dict[detector] = _rho_p
b_bar_dict[detector] = b_bar
overlap_dict[detector] = np.sqrt(overlap_squared)
snr_dict[detector] = snr**2
if debug:
return rho_p_dict, b_bar_dict, overlap_dict, snr_dict
return rho_p_dict