Source code for pesummary.gw.conversions.tgr

# Licensed under an MIT style license -- see LICENSE.md

from pesummary.utils.utils import logger
import numpy as np
from scipy.stats import gaussian_kde
import multiprocessing
from pesummary.utils.probability_dict import ProbabilityDict2D
from pesummary.utils.bounded_interp import RectBivariateSpline

__author__ = [
    "Aditya Vijaykumar <aditya.vijaykumar@ligo.org>",
    "Charlie Hoy <charlie.hoy@ligo.org>"
]


def _wrapper_for_multiprocessing_kde(kde, *args):
    """Wrapper to evaluate a KDE on multiple cpus

    Parameters
    ----------
    kde: func
        KDE you wish to evaluate
    *args: tuple
        all args are passed to the KDE
    """
    _reshape = (len(args[0]), len(args[1]))
    yy, xx = np.meshgrid(args[1], args[0])
    _args = [xx.ravel(), yy.ravel()]
    return kde(_args).reshape(_reshape)


def _wrapper_for_multiprocessing_interp(interp, *args):
    """Wrapper to evaluate an interpolant on multiple cpus

    Parameters
    ----------
    interp: func
        interpolant you wish to use
    *args: tuple
        all args are passed to the interpolant
    """
    return interp(*args).T


def _imrct_deviation_parameters_integrand_vectorized(
    final_mass,
    final_spin,
    v1,
    v2,
    P_final_mass_final_spin_i_interp_object,
    P_final_mass_final_spin_r_interp_object,
    multi_process=1,
    wrapper_function_for_multiprocess=None,
):
    """Compute the integrand of P(delta_final_mass/final_mass_bar,
    delta_final_spin/final_spin_bar).

    Parameters
    ----------
    final_mass: np.ndarray
        vector of values of final mass
    final_spin: np.ndarray
        vector of values of final spin
    v1: np.ndarray
        array of delta_final_mass/final_mass_bar values
    v2: np.ndarray
        array of delta_final_spin/final_spin_bar values
    P_final_mass_final_spin_i_interp_object:
        interpolated P_i(final_mass, final_spin)
    P_final_mass_final_spin_r_interp_object:
        interpolated P_r(final_mass, final_spin)
    multi_process: int
        Number of parallel processes. Default: 1
    wrapper_function_for_multiprocess: method
        Wrapper function for the multiprocessing. Default: None

    Returns
    -------
    np.array
        integrand of P(delta_final_mass/final_mass_bar,
        delta_final_spin/final_spin_bar)
    """
    final_mass_mat, final_spin_mat = np.meshgrid(final_mass, final_spin)
    _abs = np.abs(final_mass_mat * final_spin_mat)
    _reshape = (len(v1), len(v1))
    _v2, _v1 = np.meshgrid(v2, v1)
    v1, v2 = _v1.ravel(), _v2.ravel()
    v1, v2 = v1.reshape(len(v1), 1), v2.reshape(len(v2), 1)

    # Create delta_final_mass and delta_final_spin vectors corresponding
    # to the given v1 and v2.

    # These vectors have to be monotonically increasing in order to
    # evaluate the interpolated prob densities. Hence, for v1, v2 < 0,
    # flip them, evaluate the prob density (in column or row) and flip it back

    # The definition of the delta_* parameters is taken from eq A1 of
    # Ghosh et al 2018, arXiv:1704.06784.

    delta_final_mass_i = ((1.0 + v1 / 2.0)) * final_mass
    delta_final_spin_i = ((1.0 + v2 / 2.0)) * final_spin
    delta_final_mass_r = ((1.0 - v1 / 2.0)) * final_mass
    delta_final_spin_r = ((1.0 - v2 / 2.0)) * final_spin

    for num in range(len(v1)):
        if (1.0 + v1[num] / 2.0) < 0.0:
            delta_final_mass_i[num] = np.flipud(delta_final_mass_i[num])
        if (1.0 + v2[num] / 2.0) < 0.0:
            delta_final_spin_i[num] = np.flipud(delta_final_spin_i[num])
        if (1.0 - v1[num] / 2.0) < 0.0:
            delta_final_mass_r[num] = np.flipud(delta_final_mass_r[num])
        if (1.0 - v2[num] / 2.0) < 0.0:
            delta_final_spin_r[num] = np.flipud(delta_final_spin_r[num])

    if multi_process > 1:
        with multiprocessing.Pool(multi_process) as pool:
            _length = len(delta_final_mass_i)
            _P_i = pool.starmap(
                wrapper_function_for_multiprocess,
                zip(
                    [P_final_mass_final_spin_i_interp_object] * _length,
                    delta_final_mass_i,
                    delta_final_spin_i,
                ),
            )
            _P_r = pool.starmap(
                wrapper_function_for_multiprocess,
                zip(
                    [P_final_mass_final_spin_r_interp_object] * _length,
                    delta_final_mass_r,
                    delta_final_spin_r,
                ),
            )
        P_i = np.array([i for i in _P_i])
        P_r = np.array([i for i in _P_r])
    else:
        P_i, P_r = [], []
        for num in range(len(delta_final_mass_i)):
            P_i.append(
                wrapper_function_for_multiprocess(
                    P_final_mass_final_spin_i_interp_object,
                    delta_final_mass_i[num],
                    delta_final_spin_i[num],
                )
            )
            P_r.append(
                wrapper_function_for_multiprocess(
                    P_final_mass_final_spin_r_interp_object,
                    delta_final_mass_r[num],
                    delta_final_spin_r[num],
                )
            )

    for num in range(len(v1)):
        if (1.0 + v1[num] / 2.0) < 0.0:
            P_i[num] = np.fliplr(P_i[num])
        if (1.0 + v2[num] / 2.0) < 0.0:
            P_i[num] = np.flipud(P_i[num])
        if (1.0 - v1[num] / 2.0) < 0.0:
            P_r[num] = np.fliplr(P_r[num])
        if (1.0 - v2[num] / 2.0) < 0.0:
            P_r[num] = np.flipud(P_r[num])

    # The integration is performed according to eq A2 of Ghosh et al,
    # arXiv:1704.06784

    _prod = np.array(
        [np.sum(_P_i * _P_r * _abs) for _P_i, _P_r in zip(P_i, P_r)]
    ).reshape(_reshape)
    return _prod


def _apply_args_and_kwargs(function, args, kwargs):
    """Apply a tuple of args and a dictionary of kwargs to a function

    Parameters
    ----------
    function: func
        function you wish to use
    args: tuple
        all args passed to function
    kwargs: dict
        all kwargs passed to function
    """
    return function(*args, **kwargs)


def _imrct_deviation_parameters_integrand_series(
    final_mass,
    final_spin,
    v1,
    v2,
    P_final_mass_final_spin_i_interp_object,
    P_final_mass_final_spin_r_interp_object,
    multi_process=1,
    **kwargs
):
    """
    Creates the over the deviation parameter space.

    Parameters
    ----------
    final_mass: np.ndarray
        vector of values of final mass
    final_spin: np.ndarray
        vector of values of final spin
    v1: np.ndarray
        array of delta_final_mass/final_mass_bar values
    v2: np.ndarray
        array of delta_final_spin/final_spin_bar values
    P_final_mass_final_spin_i_interp_object:
        interpolated P_i(final_mass, final_spin)
    P_final_massfinal_spin_r_interp_object:
        interpolated P_r(final_mass, final_spin)
    """
    P = np.zeros(shape=(len(v1), len(v2)))
    if multi_process == 1:
        logger.debug("Performing calculation on a single cpu. This may take some time")
        for i, _v2 in enumerate(v2):
            for j, _v1 in enumerate(v1):
                P[i, j] = _imrct_deviation_parameters_integrand_vectorized(
                    final_mass,
                    final_spin,
                    [_v1],
                    [_v2],
                    P_final_mass_final_spin_i_interp_object,
                    P_final_mass_final_spin_r_interp_object,
                    multi_process=1,
                    **kwargs
                )
    else:
        logger.debug("Splitting the calculation across {} cpus".format(multi_process))
        _v1, _v2 = np.meshgrid(v1, v2)
        _v1, _v2 = _v1.ravel(), _v2.ravel()
        with multiprocessing.Pool(multi_process) as pool:
            args = [
                [final_mass] * len(_v1),
                [final_spin] * len(_v1),
                np.atleast_2d(_v1).T.tolist(),
                np.atleast_2d(_v2).T.tolist(),
                [P_final_mass_final_spin_i_interp_object] * len(_v1),
                [P_final_mass_final_spin_r_interp_object] * len(_v1),
            ]
            kwargs["multi_process"] = 1
            _args = np.array(args, dtype=object).T
            _P = pool.starmap(
                _apply_args_and_kwargs,
                zip(
                    [_imrct_deviation_parameters_integrand_vectorized] * len(_args),
                    _args,
                    [kwargs] * len(_args),
                ),
            )
        P = np.array([i[0] for i in _P]).reshape(len(v1), len(v2))
    return P


[docs] def imrct_deviation_parameters_integrand(*args, vectorize=False, **kwargs): """Compute the final mass and final spin deviation parameters Parameters ---------- *args: tuple all args passed to either _imrct_deviation_parameters_integrand_vectorized or _imrct_deviation_parameters_integrand_series vectorize: bool Vectorize the calculation. Note that vectorize=True uses up a lot of memory kwargs: dict, optional all kwargs passed to either _imrct_deviation_parameters_integrand_vectorized or _imrct_deviation_parameters_integrand_series """ if vectorize: return _imrct_deviation_parameters_integrand_vectorized(*args, **kwargs) return _imrct_deviation_parameters_integrand_series(*args, **kwargs)
[docs] def imrct_deviation_parameters_from_final_mass_final_spin( final_mass_inspiral, final_spin_inspiral, final_mass_postinspiral, final_spin_postinspiral, N_bins=101, final_mass_deviation_lim=1, final_spin_deviation_lim=1, multi_process=1, use_kde=False, kde=gaussian_kde, kde_kwargs=dict(), interp_method=RectBivariateSpline, interp_kwargs=dict(fill_value=0.0, bounds_error=False, kx=1, ky=1), vectorize=False, ): """Compute the IMR Consistency Test deviation parameters. Code borrows from the implementation in lalsuite: https://git.ligo.org/lscsoft/lalsuite/-/blob/master/lalinference/python/lalinference/imrtgr/imrtgrutils.py and https://git.ligo.org/lscsoft/lalsuite/-/blob/master/lalinference/bin/imrtgr_imr_consistency_test.py Parameters ---------- final_mass_inspiral: np.ndarray values of final mass calculated from the inspiral part final_spin_inspiral: np.ndarray values of final spin calculated from the inspiral part final_mass_postinspiral: np.ndarray values of final mass calculated from the post-inspiral part final_spin_postinspiral: np.ndarray values of final spin calculated from the post-inspiral part final_mass_deviation_lim: float Maximum absolute value of the final mass deviation parameter. Default 1. final_spin_deviation_lim: float Maximum absolute value of the final spin deviation parameter. Default 1. N_bins: int, optional Number of equally spaced bins between [-final_mass_deviation_lim, final_mass_deviation_lim] and [-final_spin_deviation_lim, final_spin_deviation_lim]. Default is 101. multi_process: int Number of parallel processes. Default is 1. use_kde: bool If `True`, uses kde instead of interpolation. Default is False. kde: method KDE method to use. Default is scipy.stats.gaussian_kde kde_kwargs: dict Arguments to be passed to the KDE method interp_method: method Interpolation method to use. Default is scipy.interpolate.interp2d interp_kwargs: dict, optional Arguments to be passed to the interpolation method Default is `dict(fill_value=0.0, bounds_error=False)` vectorize: bool if True, use vectorized imrct_deviation_parameters_integrand function. This is quicker but does consume more memory. Default: False Returns ------- imrct_deviations: ProbabilityDict2d Contains the 2d pdf of the IMRCT deviation parameters """ # Find the maximum values final_mass_lim = np.max(np.append(final_mass_inspiral, final_mass_postinspiral)) final_spin_lim = np.max( np.abs(np.append(final_spin_inspiral, final_spin_postinspiral)) ) # bin the data final_mass_bins = np.linspace(-final_mass_lim, final_mass_lim, N_bins) diff_final_mass = final_mass_bins[1] - final_mass_bins[0] final_spin_bins = np.linspace(-final_spin_lim, final_spin_lim, N_bins) diff_final_spin = final_spin_bins[1] - final_spin_bins[0] final_mass_intp = (final_mass_bins[:-1] + final_mass_bins[1:]) * 0.5 final_spin_intp = (final_spin_bins[:-1] + final_spin_bins[1:]) * 0.5 if use_kde: logger.debug("Using KDE to interpolate data") # kde the samples for final mass and final spin final_mass_intp = np.append( final_mass_intp, final_mass_bins[-1] + diff_final_mass ) final_spin_intp = np.append( final_spin_intp, final_spin_bins[-1] + diff_final_spin ) inspiral_interp = kde( np.array([final_mass_inspiral, final_spin_inspiral]), **kde_kwargs ) postinspiral_interp = kde( np.array([final_mass_postinspiral, final_spin_postinspiral]), **kde_kwargs ) _wrapper_function = _wrapper_for_multiprocessing_kde else: logger.debug("Interpolating 2d histogram data") _inspiral_2d_histogram, _, _ = np.histogram2d( final_mass_inspiral, final_spin_inspiral, bins=(final_mass_bins, final_spin_bins), density=True, ) _postinspiral_2d_histogram, _, _ = np.histogram2d( final_mass_postinspiral, final_spin_postinspiral, bins=(final_mass_bins, final_spin_bins), density=True, ) inspiral_interp = interp_method( final_mass_intp, final_spin_intp, _inspiral_2d_histogram, **interp_kwargs ) postinspiral_interp = interp_method( final_mass_intp, final_spin_intp, _postinspiral_2d_histogram, **interp_kwargs ) _wrapper_function = _wrapper_for_multiprocessing_interp final_mass_deviation_vec = np.linspace( -final_mass_deviation_lim, final_mass_deviation_lim, N_bins ) final_spin_deviation_vec = np.linspace( -final_spin_deviation_lim, final_spin_deviation_lim, N_bins ) diff_final_mass_deviation = final_mass_deviation_vec[1] - final_mass_deviation_vec[0] diff_final_spin_deviation = final_spin_deviation_vec[1] - final_spin_deviation_vec[0] P_final_mass_deviation_final_spin_deviation = imrct_deviation_parameters_integrand( final_mass_intp, final_spin_intp, final_mass_deviation_vec, final_spin_deviation_vec, inspiral_interp, postinspiral_interp, multi_process=multi_process, vectorize=vectorize, wrapper_function_for_multiprocess=_wrapper_function, ) imrct_deviations = ProbabilityDict2D( { "final_mass_final_spin_deviations": [ final_mass_deviation_vec, final_spin_deviation_vec, P_final_mass_deviation_final_spin_deviation / np.sum(P_final_mass_deviation_final_spin_deviation), ] } ) return imrct_deviations
[docs] def generate_imrct_deviation_parameters( samples, evolve_spins_forward=True, inspiral_string="inspiral", postinspiral_string="postinspiral", approximant=None, f_low=None, return_samples_used=False, **kwargs ): """Generate deviation parameter pdfs for the IMR Consistency Test Parameters ---------- samples: MultiAnalysisSamplesDict Dictionary containing inspiral and postinspiral samples evolve_spins_forward: bool If `True`, evolve spins to the ISCO frequency. Default: True. inspiral_string: string Identifier for the inspiral samples postinspiral_string: string Identifier for the post-inspiral samples approximant: dict, optional The approximant used for the inspiral and postinspiral analyses. Keys of the dictionary must be the same as the inspiral_string and postinspiral_string. Default None f_low: dict, optional The low frequency cut-off used for the inspiral and postinspiral analyses. Keys of the dictionary must be the same as the inspiral_string and postinspiral_string. Default None return_samples_used: Bool, optional if True, return the samples which were used to generate the IMRCT deviation parameters. These samples will match the input but may include remnant samples if they were not previously present kwargs: dict, optional Keywords to be passed to imrct_deviation_parameters_from_final_mass_final_spin Returns ------- imrct_deviations: ProbabilityDict2d 2d pdf of the IMRCT deviation parameters data: dict Metadata """ import time remnant_condition = lambda _dictionary, _suffix: all( "{}{}".format(param, _suffix) not in _dictionary.keys() for param in ["final_mass", "final_spin"] ) evolved = np.ones(2, dtype=bool) suffix = [""] evolve_spins = ["ISCO"] if not evolve_spins_forward: suffix = ["_non_evolved"] + suffix evolve_spins = [False, False] fits_data = {} for idx, (key, sample) in enumerate(samples.items()): zipped = zip(suffix, evolve_spins) for num, (_suffix, _evolve_spins) in enumerate(zipped): cond = remnant_condition(sample, _suffix) _found_msg = ( "Found {} remnant properties in the posterior table " "for {}. Using these for calculation." ) if not cond: logger.info( _found_msg.format( "evolved" if not len(_suffix) else "non-evolved", key ) ) if len(_suffix): evolved[idx] = False break elif not remnant_condition(sample, ""): logger.info(_found_msg.format("evolved", key)) evolved[idx] = True break else: logger.warning( "{} remnant properties not found in the posterior " "table for {}. Trying to calculate them.".format( "Evolved" if not len(_suffix) else "Non-evolved", key ) ) returned_extra_kwargs = sample.generate_all_posterior_samples( evolve_spins=_evolve_spins, return_kwargs=True, approximant=( approximant[key] if approximant is not None else None ), f_low=f_low[key] if f_low is not None else None ) _cond = remnant_condition(sample, _suffix) if not _cond: logger.info( "{} remnant properties generated. Using these " "samples for calculation".format( "Evolved" if not len(_suffix) else "Non-evolved" ) ) for fit in ["final_mass_NR_fits", "final_spin_NR_fits"]: fits_data["{} {}".format(key, fit)] = returned_extra_kwargs[ "meta_data" ][fit] if len(_suffix): evolved[idx] = False break if num == 1: raise ValueError( "Unable to compute the remnant properties" ) if not all(_evolved == evolved[0] for _evolved in evolved): keys = list(samples.keys()) _inspiral_index = keys.index(inspiral_string) _postinspiral_index = keys.index(postinspiral_string) raise ValueError( "Using {} remnant properties for the inspiral and {} remnant " "properties for the postinspiral. This must be the same for " "the calculation".format( "evolved" if evolved[_inspiral_index] else "non-evolved", "non-evolved" if not evolved[_postinspiral_index] else "evolved" ) ) samples_string = "final_{}" if not evolved[0]: samples_string += "_non_evolved" logger.info("Calculating IMRCT deviation parameters and GR Quantile") t0 = time.time() imrct_deviations = imrct_deviation_parameters_from_final_mass_final_spin( samples[inspiral_string][samples_string.format("mass")], samples[inspiral_string][samples_string.format("spin")], samples[postinspiral_string][samples_string.format("mass")], samples[postinspiral_string][samples_string.format("spin")], **kwargs, ) gr_quantile = ( imrct_deviations[ "final_mass_final_spin_deviations" ].minimum_encompassing_contour_level(0.0, 0.0) * 100 ) t1 = time.time() data = kwargs.copy() data["evolve_spins"] = evolved data["Time (seconds)"] = round(t1 - t0, 2) data["GR Quantile (%)"] = gr_quantile[0] data.update(fits_data) logger.info( "Calculation Finished in {} seconds. GR Quantile is {} %.".format( data["Time (seconds)"], round(data["GR Quantile (%)"], 2) ) ) if return_samples_used: return imrct_deviations, data, evolved[0], samples return imrct_deviations, data, evolved[0]