""" Tool for running online bilby PE using GraceDB events
The functionality of much of these utility assumes the user is running on the
CIT cluster, e.g. the ROQ and calibration directories are in there usual place
"""
import argparse
import json
import os
import shutil
import numpy as np
from . import parser
from .utils import (
DEFAULT_DISTANCE_LOOKUPS,
BilbyPipeError,
check_directory_exists_and_if_not_mkdir,
logger,
next_power_of_2,
run_command_line,
tcolors,
test_connection,
)
# Default channels from: https://wiki.ligo.org/LSC/JRPComm/ObsRun3
# NOTE: can these be read from gracedb?
[docs]
CHANNEL_DICTS = dict(
online=dict(
H1="GDS-CALIB_STRAIN_CLEAN", L1="GDS-CALIB_STRAIN_CLEAN", V1="Hrec_hoft_16384Hz"
),
o2replay=dict(
H1="GDS-CALIB_STRAIN_O2Replay",
L1="GDS-CALIB_STRAIN_O2Replay",
V1="Hrec_hoft_16384Hz_O2Replay",
),
o3replay=dict(
H1="GDS-CALIB_STRAIN_INJ1_O3Replay",
L1="GDS-CALIB_STRAIN_INJ1_O3Replay",
V1="Hrec_hoft_16384Hz_INJ1_O3Replay",
),
)
[docs]
def x509userproxy(outdir):
"""Copies X509_USER_PROXY certificate from user's os.environ and
places it inside the outdir, if the X509_USER_PROXY exists.
Parameters
----------
outdir: str
Output directory where X509_USER_PROXY certificate is copied to.
Returns
-------
x509userproxy: str, None
New path to X509_USER_PROXY certification file
None if X509_USER_PROXY certificate does not exist, or if
the X509_USER_PROXY cannot be copied.
"""
x509userproxy = None
cert_alias = "X509_USER_PROXY"
try:
cert_path = os.environ[cert_alias]
new_cert_path = os.path.join(outdir, "." + os.path.basename(cert_path))
shutil.copyfile(src=cert_path, dst=new_cert_path)
x509userproxy = new_cert_path
except FileNotFoundError as e:
logger.warning(
"Environment variable X509_USER_PROXY does not point to a file. "
f"Error while copying file: {e}. "
"Try running `$ ligo-proxy-init albert.einstein`"
)
except KeyError:
logger.warning(
"Environment variable X509_USER_PROXY not set"
" Try running `$ ligo-proxy-init albert.einstein`"
)
return x509userproxy
[docs]
def read_from_gracedb(gracedb, gracedb_url, outdir):
"""
Read GraceDB events from GraceDB
Parameters
----------
gracedb: str
GraceDB id of event
gracedb_url: str
Service url for GraceDB events
GraceDB 'https://gracedb.ligo.org/api/' (default)
GraceDB-playground 'https://gracedb-playground.ligo.org/api/'
outdir: str
Output directory
Returns
-------
event:
Contains contents of GraceDB event from GraceDB, json format
"""
from urllib.error import HTTPError
from ligo.gracedb.rest import GraceDb
test_connection()
logger.info(f"Connecting to {gracedb_url}")
try:
# TODO: do we need this x509 proxy I'm fairly sure my local jobs aren't using it
client = GraceDb(cred=x509userproxy(outdir), service_url=gracedb_url)
except IOError:
logger.warning("Failed to connect to GraceDB")
raise
logger.info(f"Querying event {gracedb}")
event = client.event(gracedb).json()
with open(f"{outdir}/{gracedb}.json", "w") as ff:
json.dump(event, ff, indent=2)
logger.info(f"Requesting coinc.xml for {gracedb}")
try:
data = client.files(gracedb, filename="coinc.xml")
coinc_filename = f"{outdir}/coinc.xml"
with open(coinc_filename, "wb") as ff:
ff.write(data.data)
event["coinc_file"] = coinc_filename
except HTTPError:
logger.warning(
"Failed to download coinc.xml. PSDs will be generated by bilby_pipe."
)
event["coinc_file"] = None
return event
[docs]
def read_from_json(json_file):
"""Read GraceDB events from json file
Parameters
----------
json_file: str
Filename of json json file output
Returns
-------
candidate: dict
Contains contents of GraceDB event from json, json format
"""
if os.path.isfile(json_file) is False:
raise FileNotFoundError(f"File {json_file} not found")
try:
with open(json_file, "r") as file:
candidate = json.load(file)
except IOError:
logger.warning("Unable to load event contents of json file")
return candidate
[docs]
def calibration_lookup(trigger_time, detector):
"""Lookup function for the relevant calibration file
Assumes that it is running on CIT where the calibration files are stored
under /home/cbc/pe/O3/calibrationenvelopes
Parameters
----------
trigger_time: float
The trigger time of interest
detector: str [H1, L1, V1]
Detector string
Returns
-------
filepath: str
The path to the relevant calibration envelope file. If no calibration
file can be determined, None is returned.
"""
# FIXME: point to updated calibration envelopes
base = "/home/cbc/pe/O3/calibrationenvelopes"
CALENVS_LOOKUP = dict(
H1=os.path.join(base, "LIGO_Hanford/H_CalEnvs.txt"),
L1=os.path.join(base, "LIGO_Livingston/L_CalEnvs.txt"),
V1=os.path.join(base, "Virgo/V_CalEnvs.txt"),
)
if os.path.isdir(base) is False:
raise BilbyPipeError(f"Unable to read from calibration folder {base}")
calenv = CALENVS_LOOKUP[detector]
times = list()
files = dict()
with open(calenv, "r") as f:
for line in f:
time, filename = line.rstrip("\n").rstrip().split(" ")
times.append(float(time))
files[float(time)] = filename
times = sorted(times)
if trigger_time < times[0]:
raise BilbyPipeError(
"Requested trigger time prior to earliest calibration file"
)
for time in times:
if trigger_time > time:
directory = os.path.dirname(calenv)
calib_file = f"{directory}/{files[time]}"
return os.path.abspath(calib_file)
[docs]
def calibration_dict_lookup(trigger_time, detectors):
"""Dictionary lookup function for the relevant calibration files
Parameters
----------
trigger_time: float
The trigger time of interest
detectors: list
List of detector string
Returns
-------
calibration_model, calibration_dict: str, dict
Calibration model string and dictionary of paths to the relevant
calibration envelope file.
"""
try:
calibration_dict = {
det: calibration_lookup(trigger_time, det) for det in detectors
}
return "CubicSpline", calibration_dict
except BilbyPipeError:
return None, None
[docs]
def read_candidate(candidate):
"""Read a gracedb candidate json dictionary"""
if "extra_attributes" not in candidate:
raise BilbyPipeError(
"Cannot parse event dictionary, not 'extra_attributes' present."
)
elif "CoincInspiral" in candidate["extra_attributes"]:
return _read_cbc_candidate(candidate)
elif "MultiBurst" in candidate["extra_attributes"]:
return _read_burst_candidate(candidate)
[docs]
def _read_cbc_candidate(candidate):
if "mchirp" not in candidate["extra_attributes"]["CoincInspiral"]:
raise BilbyPipeError(
f"Unable to determine chirp mass for {candidate['graceid']} from GraceDB"
)
sngl = candidate["extra_attributes"]["SingleInspiral"][0]
trigger_values = {}
trigger_values["chirp_mass"] = sngl["mchirp"]
trigger_values["mass_ratio"] = min(sngl["mass1"], sngl["mass2"]) / max(
sngl["mass1"], sngl["mass2"]
)
trigger_values["spin_1z"] = sngl["spin1z"]
trigger_values["spin_2z"] = sngl["spin2z"]
superevent = candidate["superevent"]
ifos = candidate["extra_attributes"]["CoincInspiral"]["ifos"].split(",")
sngl = candidate["extra_attributes"]["SingleInspiral"]
ifos = [entry["ifo"] for entry in sngl]
snrs = [entry["snr"] for entry in sngl]
# NOTE: the channel name here varies by pipeline and isn't always available
# channels = {entry["ifo"]: entry["channel"][3:] for entry in sngl}
best_to_worst = np.argsort(snrs)[::-1]
best_event = sngl[best_to_worst[0]]
trigger_time = best_event["end_time"] + best_event["end_time_ns"] / 1e9
sorted_ifos = [ifos[idx] for idx in best_to_worst]
time_reference = sorted_ifos[0]
if len(sorted_ifos) > 1:
reference_frame = "".join(sorted_ifos[:2])
else:
reference_frame = "sky"
return (
trigger_values,
superevent,
trigger_time,
ifos,
reference_frame,
time_reference,
)
[docs]
def _read_burst_candidate(candidate):
central_frequency = candidate["extra_attributes"]["MultiBurst"]["central_freq"]
superevent = candidate["superevent"]
trigger_time = candidate["gpstime"]
ifos = candidate["extra_attributes"]["MultiBurst"]["ifos"].split()
return central_frequency, superevent, trigger_time, ifos
[docs]
def _get_cbc_likelihood_args(mode, trigger_values):
"""Return cbc likelihood arguments and quantities characterizing likelihood
Parameters
----------
mode: str
chirp_mass: float
Returns
-------
likelihood_args: dict
likelihood_parameter_bounds: dict
bounds of parameter space where likelihood is expected to be accurate
minimum_frequency: float
minimum frequency of likelihood integration
maximum_frequency: float
maximum frequency of likelihood integration
duration: float
inverse of frequency interval of likelihood integration
"""
if mode in ["phenompv2_bbh_roq", "test"]:
return _choose_phenompv2_bbh_roq(
trigger_values["chirp_mass"], ignore_no_params=(mode == "test")
)
elif mode in [
"lowspin_phenomd_narrowmc_roq",
"lowspin_phenomd_broadmc_roq",
"phenompv2_bns_roq",
"phenompv2nrtidalv2_roq",
]:
return _choose_bns_roq(trigger_values["chirp_mass"], mode)
else:
return _get_cbc_likelihood_args_from_json(mode, trigger_values)
[docs]
def _choose_phenompv2_bbh_roq(chirp_mass, ignore_no_params=False):
"""Choose an appropriate PhenomPv2 ROQ folder, and return likelihood
arguments and quantities characterizing likelihood. The bases were
developed in the work of arXiv:1604.08253. For a high-mass trigger with
chirp mass above 35 solar mass, this returns arguments with the standard
likelihood `GravitationalWaveTransient`, as the analysis is computationally
cheap anyway.
Parameters
----------
chirp_mass: float
ignore_no_params: bool
If True, this ignores FileNotFoundError raised when roq params file is
not found, which is useful for testing this command outside the CIT
cluster.
Returns
-------
likelihood_args: dict
likelihood_parameter_bounds: dict
bounds of parameter space where likelihood is expected to be accurate
minimum_frequency: float
minimum frequency of likelihood integration
maximum_frequency: float
maximum frequency of likelihood integration
duration: float
inverse of frequency interval of likelihood integration
"""
likelihood_args = {
"waveform_approximant": "IMRPhenomPv2",
}
likelihood_parameter_bounds = {"spin_template": "precessing"}
if chirp_mass > 35:
likelihood_args["likelihood_type"] = "GravitationalWaveTransient"
likelihood_args["time_marginalization"] = True
likelihood_args["jitter_time"] = True
likelihood_parameter_bounds["chirp_mass_min"] = 25
likelihood_parameter_bounds["chirp_mass_max"] = 200
likelihood_parameter_bounds["mass_ratio_min"] = 0.125
likelihood_parameter_bounds["a_1_max"] = 0.99
likelihood_parameter_bounds["a_2_max"] = 0.99
minimum_frequency = 20
maximum_frequency = 1024
duration = 4
else:
likelihood_args["likelihood_type"] = "ROQGravitationalWaveTransient"
roq_scale_factor = 1
if chirp_mass > 13.53:
likelihood_args["roq_folder"] = "/home/cbc/ROQ_data/IMRPhenomPv2/4s"
elif chirp_mass > 8.73:
likelihood_args["roq_folder"] = "/home/cbc/ROQ_data/IMRPhenomPv2/8s"
elif chirp_mass > 5.66:
likelihood_args["roq_folder"] = "/home/cbc/ROQ_data/IMRPhenomPv2/16s"
elif chirp_mass > 3.68:
likelihood_args["roq_folder"] = "/home/cbc/ROQ_data/IMRPhenomPv2/32s"
elif chirp_mass > 2.39:
likelihood_args["roq_folder"] = "/home/cbc/ROQ_data/IMRPhenomPv2/64s"
elif chirp_mass > 1.43:
likelihood_args["roq_folder"] = "/home/cbc/ROQ_data/IMRPhenomPv2/128s"
elif chirp_mass > 0.9:
likelihood_args["roq_folder"] = "/home/cbc/ROQ_data/IMRPhenomPv2/128s"
roq_scale_factor = 1.6
else:
likelihood_args["roq_folder"] = "/home/cbc/ROQ_data/IMRPhenomPv2/128s"
roq_scale_factor = 2
roq_params_file = os.path.join(likelihood_args["roq_folder"], "params.dat")
if os.path.exists(roq_params_file):
roq_params = np.genfromtxt(roq_params_file, names=True)
elif ignore_no_params:
roq_params = {
"chirpmassmin": roq_scale_factor * chirp_mass * 0.9,
"chirpmassmax": roq_scale_factor * chirp_mass * 1.1,
"qmax": 8,
"compmin": 0,
"chiL1min": -0.99,
"chiL1max": 0.99,
"chiL2min": -0.99,
"chiL2max": 0.99,
"flow": 20,
"fhigh": 1024,
"seglen": 4,
}
logger.warning(
f"{roq_params_file} not found. ROQ parameters are set to {roq_params}."
)
else:
raise FileNotFoundError(f"{roq_params_file} not found.")
likelihood_args["roq_scale_factor"] = roq_scale_factor
likelihood_parameter_bounds["chirp_mass_min"] = (
roq_params["chirpmassmin"] / roq_scale_factor
)
likelihood_parameter_bounds["chirp_mass_max"] = (
roq_params["chirpmassmax"] / roq_scale_factor
)
likelihood_parameter_bounds["mass_ratio_min"] = 1 / roq_params["qmax"]
likelihood_parameter_bounds["comp_min"] = (
roq_params["compmin"] / roq_scale_factor
)
likelihood_parameter_bounds["a_1_max"] = roq_params["chiL1max"]
likelihood_parameter_bounds["a_2_max"] = roq_params["chiL2max"]
minimum_frequency = roq_params["flow"] * roq_scale_factor
maximum_frequency = roq_params["fhigh"] * roq_scale_factor
duration = roq_params["seglen"] / roq_scale_factor
return (
likelihood_args,
likelihood_parameter_bounds,
minimum_frequency,
maximum_frequency,
duration,
)
[docs]
def _choose_bns_roq(chirp_mass, mode):
"""Choose an appropriate BNS-mass ROQ basis file, and return likelihood
arguments and quantities characterizing likelihood. The review information
of those bases are found at https://git.ligo.org/pe/O4/review_bns_roq/-/wikis.
Parameters
----------
chirp_mass: float
mode: str
Allowed options are "lowspin_phenomd_narrowmc_roq",
"lowspin_phenomd_broadmc_roq", "phenompv2_bns_roq",
and "phenompv2nrtidalv2_roq".
Returns
-------
likelihood_args: dict
likelihood_parameter_bounds: dict
bounds of parameter space where likelihood is expected to be accurate
minimum_frequency: float
minimum frequency of likelihood integration
maximum_frequency: float
maximum frequency of likelihood integration
duration: float
inverse of frequency interval of likelihood integration
"""
likelihood_parameter_bounds = {"mass_ratio_min": 0.125}
# 2.31, 1.54, and 1.012 are 1.1 times the minimum chirp mass values of 64s,
# 128s, and 256s bases respectively.
if mode == "lowspin_phenomd_narrowmc_roq":
waveform_approximant = "IMRPhenomD"
roq_dir = "/home/roq/IMRPhenomD/lowspin_narrowmc_bns"
likelihood_parameter_bounds["a_1_max"] = 0.05
likelihood_parameter_bounds["a_2_max"] = 0.05
likelihood_parameter_bounds["spin_template"] = "aligned"
elif mode == "lowspin_phenomd_broadmc_roq":
waveform_approximant = "IMRPhenomD"
roq_dir = "/home/roq/IMRPhenomD/lowspin_broadmc_bns"
likelihood_parameter_bounds["a_1_max"] = 0.05
likelihood_parameter_bounds["a_2_max"] = 0.05
likelihood_parameter_bounds["spin_template"] = "aligned"
elif mode == "phenompv2_bns_roq":
waveform_approximant = "IMRPhenomPv2"
roq_dir = "/home/roq/IMRPhenomPv2/bns"
likelihood_parameter_bounds["a_1_max"] = 0.99
likelihood_parameter_bounds["a_2_max"] = 0.99
likelihood_parameter_bounds["spin_template"] = "precessing"
elif mode == "phenompv2nrtidalv2_roq":
waveform_approximant = "IMRPhenomPv2_NRTidalv2"
roq_dir = "/home/roq/IMRPhenomPv2_NRTidalv2/bns"
likelihood_parameter_bounds["a_1_max"] = 0.4
likelihood_parameter_bounds["a_2_max"] = 0.4
likelihood_parameter_bounds["spin_template"] = "precessing"
likelihood_parameter_bounds["lambda_1_max"] = 5000
likelihood_parameter_bounds["lambda_2_max"] = 5000
logger.info(f"Searching for a basis file in {roq_dir} ...")
if 4.0 > chirp_mass > 2.31:
basis = os.path.join(roq_dir, "basis_64s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 2.1
likelihood_parameter_bounds["chirp_mass_max"] = 4.0
maximum_frequency = 2048
duration = 64
elif chirp_mass > 1.54:
basis = os.path.join(roq_dir, "basis_128s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 1.4
likelihood_parameter_bounds["chirp_mass_max"] = 2.6
maximum_frequency = 4096
duration = 128
elif chirp_mass > 1.012:
basis = os.path.join(roq_dir, "basis_256s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 0.92
likelihood_parameter_bounds["chirp_mass_max"] = 1.7
maximum_frequency = 4096
duration = 256
elif chirp_mass > 0.6:
basis = os.path.join(roq_dir, "basis_512s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 0.6
likelihood_parameter_bounds["chirp_mass_max"] = 1.1
maximum_frequency = 4096
duration = 512
else:
raise ValueError(
f"No BNS-mass {waveform_approximant} basis has been found for "
f"chirp_mass={chirp_mass}!"
)
logger.info(f"The selected ROQ basis file is {basis}.")
return (
{
"likelihood_type": "ROQGravitationalWaveTransient",
"roq_linear_matrix": basis,
"roq_quadratic_matrix": basis,
"roq_scale_factor": 1,
"waveform_approximant": waveform_approximant,
},
likelihood_parameter_bounds,
20,
maximum_frequency,
duration,
)
[docs]
def _get_cbc_likelihood_args_from_json(filename, trigger_values):
"""Load input JSON file containing likelihood settings and determine
appropriate likelihood arguments and parameter bounds depending on input
trigger values.
The json file is supposed to contain `likelihood_args`,
`likelihood_parameter_bounds`, and/or `trigger_dependent`. The first two
contain default arguments and parameter bounds respectively. The last item
contains trigger-dependent settings to update the default settings. It
contains `range`, `likelihood_args`, and/or `likelihood_parameter_bounds`.
`range` contains dictionary of trigger-parameter ranges, whose keys are
parameter names (`chirp_mass`, `mass_ratio`, `spin_1z`, and/or `spin_2z`)
and values are lists of their ranges. `likelihood_args` contains lists of
arguments, one of which is chosen depending on trigger values and used to
update the default likelihood arguments. `likelihood_parameter_bounds`
contains lists of parameter bounds to update their default.
Parameters
----------
filename: str
trigger_values: dict
Returns
-------
likelihood_args: dict
likelihood_parameter_bounds: dict
bounds of parameter space where likelihood is expected to be accurate
minimum_frequency: float
minimum frequency of likelihood integration
maximum_frequency: float
maximum frequency of likelihood integration
duration: float
inverse of frequency interval of likelihood integration
Example
-------
>>> import json
>>> from bilby_pipe.gracedb import _get_cbc_likelihood_args_from_json
>>> settings = {
... "likelihood_args": {
... "likelihood_type": "ROQGravitationalWaveTransient",
... "minimum_frequency": 20,
... "maximum_frequency": 2048,
... "duration": 4,
... },
... "trigger_dependent": {
... "range": {"chirp_mass": [[30, 40], [40, 50]]},
... "likelihood_args": [
... {"roq_folder": "basis_for_30Msun_to_40Msun"},
... {"roq_folder": "basis_for_40Msun_to_50Msun"},
... ]
... },
... }
>>> with open("test.json", "r") as f:
... json.dump(settings, f)
>>> _get_cbc_likelihood_args_from_json("test.json", {"chirp_mass": 35})
09:04 bilby_pipe INFO : Loading likelihood settings from test.json ...
({'likelihood_type': 'ROQGravitationalWaveTransient', 'roq_folder': 'basis_for_30Msun_to_40Msun'}, {}, 20, 2048, 4)
>>> _get_cbc_likelihood_args_from_json("test.json", {"chirp_mass": 45})
09:04 bilby_pipe INFO : Loading likelihood settings from test.json ...
({'likelihood_type': 'ROQGravitationalWaveTransient', 'roq_folder': 'basis_for_40Msun_to_50Msun'}, {}, 20, 2048, 4)
"""
import json
logger.info(f"Loading likelihood settings from {filename} ...")
with open(filename, "r") as ff:
settings = json.load(ff)
likelihood_args = dict()
if "likelihood_args" in settings:
likelihood_args.update(settings["likelihood_args"])
likelihood_parameter_bounds = dict()
if "likelihood_parameter_bounds" in settings:
likelihood_parameter_bounds.update(settings["likelihood_parameter_bounds"])
if "trigger_dependent" in settings:
trigger_range_dict = settings["trigger_dependent"]["range"]
number_of_ranges = len(list(trigger_range_dict.values())[0])
in_range = np.ones(number_of_ranges, dtype=bool)
for key in trigger_range_dict:
trigger = trigger_values[key]
trigger_range = np.array(trigger_range_dict[key])
in_range *= trigger >= trigger_range[:, 0]
in_range *= trigger <= trigger_range[:, 1]
if not any(in_range):
raise ValueError(
"No likelihood settings found for the trigger values: "
f"{trigger_values}!"
)
selected_idx = np.arange(number_of_ranges)[in_range][0]
trigger_dependent_settings = settings["trigger_dependent"]
if "likelihood_args" in trigger_dependent_settings:
likelihood_args.update(
trigger_dependent_settings["likelihood_args"][selected_idx]
)
if "likelihood_parameter_bounds" in trigger_dependent_settings:
likelihood_parameter_bounds.update(
trigger_dependent_settings["likelihood_parameter_bounds"][selected_idx]
)
minimum_frequency = likelihood_args["minimum_frequency"]
likelihood_args.pop("minimum_frequency")
maximum_frequency = likelihood_args["maximum_frequency"]
likelihood_args.pop("maximum_frequency")
duration = likelihood_args["duration"]
likelihood_args.pop("duration")
return (
likelihood_args,
likelihood_parameter_bounds,
minimum_frequency,
maximum_frequency,
duration,
)
[docs]
def create_config_file(
candidate,
gracedb,
outdir,
channel_dict,
sampler_kwargs,
webdir,
search_type="cbc",
cbc_likelihood_mode="phenompv2_bbh_roq",
settings=None,
psd_cut=0.95,
):
"""Creates ini file from defaults and candidate contents
Parameters
----------
candidate:
Contains contents of GraceDB event
gracedb: str
GraceDB id of event
outdir: str
Output directory where the ini file and all output is written
channel_dict: dict
Dictionary of channel names
sampler_kwargs: str
Set of sampler arguments, or option for set of sampler arguments
webdir: str
Directory to store summary pages
search_type: str
What kind of search identified the trigger, options are "cbc" and "burst"
cbc_likelihood_mode: str
Built-in CBC likelihood mode or path to a JSON file containing
likelihood settings. The built-in settings include 'phenompv2_bbh_roq',
'phenompv2_bns_roq', 'phenompv2nrtidalv2_roq',
'lowspin_phenomd_narrowmc_roq', 'lowspin_phenomd_broadmc_roq', and
'test'.
settings: str
JSON filename containing settings to override the defaults
psd_cut: float
Fractional maximum frequency cutoff relative to the maximum frequency of pipeline psd
Returns
-------
filename: str
Generated ini filename
"""
if settings is not None:
import json
with open(settings, "r") as ff:
settings = json.load(ff)
else:
settings = dict()
if search_type == "cbc":
(
trigger_values,
superevent,
trigger_time,
ifos,
reference_frame,
time_reference,
) = _read_cbc_candidate(candidate)
(
likelihood_args,
likelihood_parameter_bounds,
minimum_frequency,
maximum_frequency,
duration,
) = _get_cbc_likelihood_args(cbc_likelihood_mode, trigger_values)
(
prior_file,
distance_marginalization_lookup_table,
) = generate_cbc_prior_from_template(
trigger_values["chirp_mass"],
likelihood_parameter_bounds,
outdir,
fast_test=(sampler_kwargs == "FastTest"),
phase_marginalization=likelihood_args.get("phase_marginalization", True),
)
calibration_model, calib_dict = calibration_dict_lookup(trigger_time, ifos)
if calibration_model is None:
spline_calibration_nodes = 0
elif sampler_kwargs == "FastTest":
spline_calibration_nodes = 4
else:
spline_calibration_nodes = 10
extra_config_arguments = dict(
reference_frequency=100,
time_marginalization=False,
distance_marginalization=True,
phase_marginalization=True,
distance_marginalization_lookup_table=distance_marginalization_lookup_table,
plot_trace=True,
plot_data=True,
calibration_model=calibration_model,
spline_calibration_envelope_dict=calib_dict,
spline_calibration_nodes=spline_calibration_nodes,
)
extra_config_arguments.update(likelihood_args)
if (
"lambda_1_max" in likelihood_parameter_bounds
or "lambda_2_max" in likelihood_parameter_bounds
):
extra_config_arguments["default_prior"] = "BNSPriorDict"
extra_config_arguments[
"frequency_domain_source_model"
] = "binary_neutron_star_roq"
elif search_type == "burst":
centre_frequency, superevent, trigger_time, ifos = _read_burst_candidate(
candidate
)
minimum_frequency = min(20, centre_frequency / 2)
maximum_frequency = next_power_of_2(centre_frequency * 2)
duration = 4
extra_config_arguments = dict(
frequency_domain_source_model="bilby.gw.source.sinegaussian",
default_prior="PriorDict",
time_marginalization=False,
phase_marginalization=False,
sampler_kwargs="FastTest",
)
prior_file = generate_burst_prior_from_template(
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency,
outdir=outdir,
)
else:
raise BilbyPipeError(
f"search_type should be either 'cbc' or 'burst', not {search_type}"
)
config_dict = dict(
label=gracedb,
outdir=outdir,
accounting="ligo.dev.o4.cbc.pe.bilby",
maximum_frequency=maximum_frequency,
minimum_frequency=minimum_frequency,
sampling_frequency=16384,
trigger_time=trigger_time,
detectors=ifos,
channel_dict=channel_dict,
deltaT=0.2,
prior_file=prior_file,
duration=duration,
sampler="dynesty",
sampler_kwargs=sampler_kwargs,
webdir=webdir,
local_generation=False,
local_plot=False,
transfer_files=False,
create_summary=False,
summarypages_arguments={"gracedb": gracedb},
plot_trace=True,
plot_data=True,
plot_calibration=False,
plot_corner=False,
plot_marginal=False,
plot_skymap=False,
plot_waveform=False,
overwrite_outdir=True,
result_format="hdf5",
reference_frame=reference_frame,
time_reference=time_reference,
)
if sampler_kwargs == "FastTest":
config_dict["n_parallel"] = 2
else:
config_dict["n_parallel"] = 4
if candidate.get("coinc_file", None) is not None:
psd_dict, psd_maximum_frequency = extract_psds_from_xml(
coinc_file=candidate["coinc_file"], ifos=ifos, outdir=outdir
)
config_dict["psd_dict"] = psd_dict
if psd_maximum_frequency is not None:
psd_maximum_frequency *= min(psd_cut, 1)
if config_dict["maximum_frequency"] > psd_maximum_frequency:
config_dict["maximum_frequency"] = psd_maximum_frequency
logger.info(
f"maximum_frequency is reduced to {psd_maximum_frequency} "
"due to the limination of pipeline psd"
)
config_dict.update(extra_config_arguments)
config_dict["summarypages_arguments"]["nsamples_for_skymap"] = 5000
config_dict.update(settings)
comment = (
"# Configuration ini file generated from GraceDB "
f"for event id {gracedb} superevent id {superevent}"
)
filename = f"{outdir}/bilby_config.ini"
_parser = parser.create_parser()
_parser.write_to_file(
filename=filename,
args=config_dict,
overwrite=True,
include_description=False,
exclude_default=True,
comment=comment,
)
return filename
[docs]
def _get_distance_lookup(chirp_mass, phase_marginalization=True):
"""Return appropriate distance bounds and lookup table
Parameters
----------
chirp_mass: float
phase_marginalization: bool (optional, default is True)
Returns
-------
distance_bounds: tuple
lookup_table: str
"""
if chirp_mass > 13.53:
_duration = 4
elif chirp_mass > 8.73:
_duration = 8
elif chirp_mass > 5.66:
_duration = 16
elif chirp_mass > 3.68:
_duration = 32
elif chirp_mass > 2.39:
_duration = 64
else:
_duration = 128
distance_bounds = DEFAULT_DISTANCE_LOOKUPS[str(_duration) + "s"]
if phase_marginalization:
filename = f"{_duration}s_distance_marginalization_lookup_phase.npz"
else:
filename = f"{_duration}s_distance_marginalization_lookup.npz"
lookup_table = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"data_files",
filename,
)
return distance_bounds, lookup_table
[docs]
def generate_cbc_prior_from_template(
chirp_mass,
likelihood_parameter_bounds,
outdir,
fast_test=False,
phase_marginalization=True,
):
"""Generate a cbc prior file from a template and write it to file. This
returns the paths to the prior file and the corresponding distance look-up
table
Parameters
----------
chirp_mass: float
likelihood_parameter_bounds: dict
outdir: str
fast_test: bool (optional, default is False)
phase_marginalization: bool (optional, default is True)
Returns
-------
prior_file: str
lookup_table: str
"""
if chirp_mass < 2:
bounds = (chirp_mass - 0.01, chirp_mass + 0.01)
elif chirp_mass < 4:
bounds = (chirp_mass - 0.1, chirp_mass + 0.1)
elif chirp_mass < 8:
bounds = (chirp_mass * 0.9, chirp_mass * 1.1)
else:
bounds = (0, 1000000)
chirp_mass_min = max(likelihood_parameter_bounds["chirp_mass_min"], bounds[0])
chirp_mass_max = min(likelihood_parameter_bounds["chirp_mass_max"], bounds[1])
to_add = ""
if "comp_min" in likelihood_parameter_bounds:
comp_min = likelihood_parameter_bounds["comp_min"]
to_add += (
f"mass_1 = Constraint(name='mass_1', minimum={comp_min}, maximum=1000)\n"
)
to_add += (
f"mass_2 = Constraint(name='mass_2', minimum={comp_min}, maximum=1000)\n"
)
if "lambda_1_max" in likelihood_parameter_bounds:
lambda_1_max = likelihood_parameter_bounds["lambda_1_max"]
to_add += (
f"lambda_1 = Uniform(name='lambda_1', minimum=0, maximum={lambda_1_max})\n"
)
if "lambda_2_max" in likelihood_parameter_bounds:
lambda_2_max = likelihood_parameter_bounds["lambda_2_max"]
to_add += (
f"lambda_2 = Uniform(name='lambda_2', minimum=0, maximum={lambda_2_max})\n"
)
distance_bounds, lookup_table = _get_distance_lookup(
chirp_mass, phase_marginalization=phase_marginalization
)
if fast_test:
template = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"data_files/fast.prior.template",
)
with open(template, "r") as old_prior:
prior_string = (
old_prior.read().format(
mc_min=chirp_mass_min,
mc_max=chirp_mass_max,
d_min=distance_bounds[0],
d_max=distance_bounds[1],
)
+ to_add
)
else:
spin_template = likelihood_parameter_bounds["spin_template"]
a_1_max = likelihood_parameter_bounds["a_1_max"]
a_2_max = likelihood_parameter_bounds["a_2_max"]
if spin_template == "precessing":
template = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"data_files/precessing_spin.prior.template",
)
elif spin_template == "aligned":
template = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"data_files/aligned_spin.prior.template",
)
else:
raise ValueError(f"Unknown spin template: {spin_template}")
with open(template, "r") as old_prior:
prior_string = (
old_prior.read().format(
mc_min=chirp_mass_min,
mc_max=chirp_mass_max,
q_min=likelihood_parameter_bounds["mass_ratio_min"],
a_1_max=a_1_max,
a_2_max=a_2_max,
d_min=distance_bounds[0],
d_max=distance_bounds[1],
)
+ to_add
)
prior_file = os.path.join(outdir, "online.prior")
with open(prior_file, "w") as new_prior:
new_prior.write(prior_string)
return prior_file, lookup_table
[docs]
def generate_burst_prior_from_template(
minimum_frequency, maximum_frequency, outdir, template=None
):
"""Generate a prior file from a template and write it to file
Parameters
----------
minimum_frequency: float
Minimum frequency for prior
maximum_frequency: float
Maximum frequency for prior
outdir: str
Path to the outdir (the prior is written to outdir/online.prior)
template: str
Alternative template file to use, otherwise the
data_files/roq.prior.template file is used
"""
if template is None:
template = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"data_files/burst.prior.template",
)
with open(template, "r") as old_prior:
prior_string = old_prior.read().format(
minimum_frequency=minimum_frequency, maximum_frequency=maximum_frequency
)
prior_file = os.path.join(outdir, "online.prior")
with open(prior_file, "w") as new_prior:
new_prior.write(prior_string)
return prior_file
[docs]
def create_parser():
parser = argparse.ArgumentParser(
prog="bilby_pipe gracedb access",
usage=__doc__,
formatter_class=argparse.RawTextHelpFormatter,
)
group1 = parser.add_mutually_exclusive_group(required=True)
group1.add_argument("--gracedb", type=str, help="GraceDB event id")
group1.add_argument("--json", type=str, help="Path to json GraceDB file")
parser.add_argument(
"--psd-file",
type=str,
help="Path to ligolw-xml file containing the PSDs for the interferometers.",
)
parser.add_argument(
"--convert-to-flat-in-component-mass",
action="store_true",
default=False,
help=(
"Convert a flat-in chirp mass and mass-ratio prior file to flat \n"
"in component mass during the post-processing. Note, the prior \n"
"must be uniform in Mc and q with constraints in m1 and m2 for \n"
"this to work. \n"
),
)
parser.add_argument(
"--outdir",
type=str,
help="Output directory where the ini file and all output is written.",
)
parser.add_argument(
"--output",
type=str,
choices=["ini", "full", "full-local", "full-submit"],
help=(
"Flag to create ini, generate directories and/or submit. \n"
" ini : generates ini file \n"
" full : generates ini and dag submission files (default) \n"
" full-local : generates ini and dag submission files and run locally \n"
" full-submit : generates ini and dag submission files and submits to condor \n"
),
default="full",
)
parser.add_argument(
"--gracedb-url",
type=str,
help=(
"GraceDB service url. \n"
" Main page : https://gracedb.ligo.org/api/ (default) \n"
" Playground : https://gracedb-playground.ligo.org/api/ \n"
),
default="https://gracedb.ligo.org/api/",
)
parser.add_argument(
"--channel-dict",
type=str,
default="online",
choices=list(CHANNEL_DICTS.keys()),
help=(
"Channel dictionary. \n"
" online : use for main GraceDB page events (default)\n"
" o2replay : use for playground GraceDB page events\n"
" o3replay : use for playground GraceDB page events\n"
),
)
parser.add_argument(
"--sampler-kwargs",
type=str,
default="DynestyDefault",
help=(
"Dictionary of sampler-kwargs to pass in, e.g., {nlive: 1000} OR "
"pass pre-defined set of sampler-kwargs {DynestyDefault, BilbyMCMCDefault, FastTest}"
),
)
parser.add_argument(
"--cbc-likelihood-mode",
type=str,
default="phenompv2_bbh_roq",
help=(
"Built-in CBC likelihood mode or path to a JSON file containing likelihood settings. "
"The built-in settings include 'phenompv2_bbh_roq', 'phenompv2_bns_roq', 'phenompv2nrtidalv2_roq', "
"'lowspin_phenomd_narrowmc_roq', 'lowspin_phenomd_broadmc_roq', and 'test'."
),
)
parser.add_argument(
"--webdir",
type=str,
default=None,
help=(
"Directory to store summary pages. \n"
" If not given, defaults to outdir/results_page"
),
)
parser.add_argument(
"--settings",
type=str,
default=None,
help="JSON file containing extra settings to override the defaults",
)
parser.add_argument(
"--psd-cut",
type=float,
default=0.95,
help=(
"maximum frequency is set to this value multiplied by the maximum frequency of psd contained in coinc.xml."
" This is to avoid likelihood overflow caused by the roll-off of pipeline psd due to low-pass filter."
),
)
return parser
[docs]
def main(args=None, unknown_args=None):
if args is None:
args, unknown_args = create_parser().parse_known_args()
elif unknown_args is None:
unknown_args = []
if len(unknown_args) > 1 and args.output == "ini":
msg = [
tcolors.WARNING,
f"Unrecognized arguments {unknown_args}, these will be ignored",
tcolors.END,
]
logger.warning(" ".join(msg))
outdir = args.outdir
if args.json:
candidate = read_from_json(args.json)
gracedb = candidate["graceid"]
if outdir is None:
outdir = f"outdir_{gracedb}"
check_directory_exists_and_if_not_mkdir(outdir)
if args.psd_file is not None and os.path.isfile(args.psd_file):
candidate["coinc_file"] = args.psd_file
elif args.gracedb:
gracedb = args.gracedb
gracedb_url = args.gracedb_url
if outdir is None:
outdir = f"outdir_{gracedb}"
check_directory_exists_and_if_not_mkdir(outdir)
candidate = read_from_gracedb(gracedb, gracedb_url, outdir)
else:
raise BilbyPipeError("Either gracedb ID or json file must be provided.")
if args.webdir is not None:
webdir = args.webdir
else:
webdir = os.path.join(outdir, "results_page")
sampler_kwargs = args.sampler_kwargs
channel_dict = CHANNEL_DICTS[args.channel_dict.lower()]
search_type = candidate["group"].lower()
if search_type not in ["cbc", "burst"]:
raise BilbyPipeError(f"Candidate group {candidate['group']} not recognised.")
filename = create_config_file(
candidate=candidate,
gracedb=gracedb,
outdir=outdir,
channel_dict=channel_dict,
sampler_kwargs=sampler_kwargs,
webdir=webdir,
psd_cut=args.psd_cut,
search_type=search_type,
cbc_likelihood_mode=args.cbc_likelihood_mode,
settings=args.settings,
)
if args.output == "ini":
logger.info(
"Generating ini with default settings. Run using the command: \n"
f" $ bilby_pipe {filename}"
)
else:
arguments = ["bilby_pipe", filename]
if args.output == "full":
logger.info("Generating dag submissions files")
if args.output == "full-local":
logger.info("Generating dag submission files, running locally")
arguments.append("--local")
if args.output == "full-submit":
logger.info("Generating dag submissions files, submitting to condor")
arguments.append("--submit")
if len(unknown_args) > 1:
arguments = arguments + unknown_args
run_command_line(arguments)