""" Tool for running online bilby PE using GraceDB events
Much of the functionality of this utility assumes the user is running on the
CIT cluster, e.g. that the ROQ and calibration directories are in their usual place
"""
import argparse
import json
import os
import shutil
import time
import numpy as np
from gwpy.timeseries import TimeSeries, TimeSeriesList
from . import parser
from .utils import (
DEFAULT_DISTANCE_LOOKUPS,
BilbyPipeError,
check_directory_exists_and_if_not_mkdir,
logger,
next_power_of_2,
run_command_line,
tcolors,
test_connection,
)
# Default channels from: https://wiki.ligo.org/LSC/JRPComm/ObsRun3
# NOTE: can these be read from gracedb?
CHANNEL_DICTS = dict(
online=dict(
H1="GDS-CALIB_STRAIN_CLEAN", L1="GDS-CALIB_STRAIN_CLEAN", V1="Hrec_hoft_16384Hz"
),
o2replay=dict(
H1="GDS-CALIB_STRAIN_O2Replay",
L1="GDS-CALIB_STRAIN_O2Replay",
V1="Hrec_hoft_16384Hz_O2Replay",
),
o3replay=dict(
H1="GDS-CALIB_STRAIN_INJ1_O3Replay",
L1="GDS-CALIB_STRAIN_INJ1_O3Replay",
V1="Hrec_hoft_16384Hz_INJ1_O3Replay",
),
)
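# Each value is combined with the detector prefix when data are read, e.g. the "online"
# H1 entry becomes the channel "H1:GDS-CALIB_STRAIN_CLEAN" (see copy_and_save_data below).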
def x509userproxy(outdir):
"""Copies X509_USER_PROXY certificate from user's os.environ and
places it inside the outdir, if the X509_USER_PROXY exists.
Parameters
----------
outdir: str
Output directory where X509_USER_PROXY certificate is copied to.
Returns
-------
x509userproxy: str, None
New path to the X509_USER_PROXY certificate file
None if X509_USER_PROXY certificate does not exist, or if
the X509_USER_PROXY cannot be copied.
"""
x509userproxy = None
cert_alias = "X509_USER_PROXY"
try:
cert_path = os.environ[cert_alias]
new_cert_path = os.path.join(outdir, "." + os.path.basename(cert_path))
shutil.copyfile(src=cert_path, dst=new_cert_path)
x509userproxy = new_cert_path
except FileNotFoundError as e:
logger.warning(
"Environment variable X509_USER_PROXY does not point to a file. "
f"Error while copying file: {e}. "
"Try running `$ ligo-proxy-init albert.einstein`"
)
except KeyError:
logger.warning(
"Environment variable X509_USER_PROXY not set. "
"Try running `$ ligo-proxy-init albert.einstein`"
)
return x509userproxy
def read_from_gracedb(gracedb, gracedb_url, outdir):
"""
Read GraceDB events from GraceDB
Parameters
----------
gracedb: str
GraceDB id of event
gracedb_url: str
Service url for GraceDB events
GraceDB 'https://gracedb.ligo.org/api/' (default)
GraceDB-playground 'https://gracedb-playground.ligo.org/api/'
outdir: str
Output directory
Returns
-------
event:
Contains contents of GraceDB event from GraceDB, json format
"""
from urllib.error import HTTPError
from ligo.gracedb.rest import GraceDb
test_connection()
logger.info(f"Connecting to {gracedb_url}")
try:
# TODO: do we need this x509 proxy I'm fairly sure my local jobs aren't using it
client = GraceDb(cred=x509userproxy(outdir), service_url=gracedb_url)
except IOError:
logger.warning("Failed to connect to GraceDB")
raise
logger.info(f"Querying event {gracedb}")
event = client.event(gracedb).json()
with open(f"{outdir}/{gracedb}.json", "w") as ff:
json.dump(event, ff, indent=2)
logger.info(f"Requesting coinc.xml for {gracedb}")
try:
data = client.files(gracedb, filename="coinc.xml")
coinc_filename = f"{outdir}/coinc.xml"
with open(coinc_filename, "wb") as ff:
ff.write(data.data)
event["coinc_file"] = coinc_filename
except HTTPError:
logger.warning(
"Failed to download coinc.xml. PSDs will be generated by bilby_pipe."
)
event["coinc_file"] = None
return event
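# Minimal usage sketch (the event id, service url, and output directory below are hypothetical):
# >>> event = read_from_gracedb("G123456", "https://gracedb-playground.ligo.org/api/", "outdir_G123456")
# >>> event["coinc_file"]  # path to the downloaded coinc.xml, or None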
def read_from_json(json_file):
"""Read GraceDB events from json file
Parameters
----------
json_file: str
Filename of the json file containing the GraceDB event
Returns
-------
candidate: dict
Contains contents of GraceDB event from json, json format
"""
if os.path.isfile(json_file) is False:
raise FileNotFoundError(f"File {json_file} not found")
try:
with open(json_file, "r") as file:
candidate = json.load(file)
except IOError:
logger.warning("Unable to load event contents of json file")
return candidate
def calibration_lookup(trigger_time, detector):
"""Lookup function for the relevant calibration file
Assumes that it is running on CIT where the calibration files are stored
under /home/cbc/pe/O3/calibrationenvelopes
Parameters
----------
trigger_time: float
The trigger time of interest
detector: str [H1, L1, V1]
Detector string
Returns
-------
filepath: str
The path to the relevant calibration envelope file. If no calibration
file can be determined, None is returned.
"""
# FIXME: point to updated calibration envelopes
base = "/home/cbc/pe/O3/calibrationenvelopes"
CALENVS_LOOKUP = dict(
H1=os.path.join(base, "LIGO_Hanford/H_CalEnvs.txt"),
L1=os.path.join(base, "LIGO_Livingston/L_CalEnvs.txt"),
V1=os.path.join(base, "Virgo/V_CalEnvs.txt"),
)
if os.path.isdir(base) is False:
raise BilbyPipeError(f"Unable to read from calibration folder {base}")
calenv = CALENVS_LOOKUP[detector]
times = list()
files = dict()
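# Each line of the CalEnvs lookup file is assumed to contain "<GPS start time> <envelope filename>",
# e.g. "1238166018 calibration_envelope.txt" (the filename here is illustrative only).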
with open(calenv, "r") as f:
for line in f:
time, filename = line.rstrip("\n").rstrip().split(" ")
times.append(float(time))
files[float(time)] = filename
times = sorted(times)
if trigger_time < times[0]:
raise BilbyPipeError(
"Requested trigger time prior to earliest calibration file"
)
for time in times:
if trigger_time > time:
directory = os.path.dirname(calenv)
calib_file = f"{directory}/{files[time]}"
return os.path.abspath(calib_file)
def calibration_dict_lookup(trigger_time, detectors):
"""Dictionary lookup function for the relevant calibration files
Parameters
----------
trigger_time: float
The trigger time of interest
detectors: list
List of detector string
Returns
-------
calibration_model, calibration_dict: str, dict
Calibration model string and dictionary of paths to the relevant
calibration envelope file.
"""
try:
calibration_dict = {
det: calibration_lookup(trigger_time, det) for det in detectors
}
return "CubicSpline", calibration_dict
except BilbyPipeError:
return None, None
def read_candidate(candidate):
"""Read a gracedb candidate json dictionary"""
if "extra_attributes" not in candidate:
raise BilbyPipeError(
"Cannot parse event dictionary, no 'extra_attributes' present."
)
elif "CoincInspiral" in candidate["extra_attributes"]:
return _read_cbc_candidate(candidate)
elif "MultiBurst" in candidate["extra_attributes"]:
return _read_burst_candidate(candidate)
def _read_cbc_candidate(candidate):
if "mchirp" not in candidate["extra_attributes"]["CoincInspiral"]:
raise BilbyPipeError(
f"Unable to determine chirp mass for {candidate['graceid']} from GraceDB"
)
sngl = candidate["extra_attributes"]["SingleInspiral"][0]
trigger_values = {}
trigger_values["chirp_mass"] = sngl["mchirp"]
trigger_values["mass_ratio"] = min(sngl["mass1"], sngl["mass2"]) / max(
sngl["mass1"], sngl["mass2"]
)
trigger_values["spin_1z"] = sngl["spin1z"]
trigger_values["spin_2z"] = sngl["spin2z"]
superevent = candidate["superevent"]
ifos = candidate["extra_attributes"]["CoincInspiral"]["ifos"].split(",")
sngl = candidate["extra_attributes"]["SingleInspiral"]
ifos = [entry["ifo"] for entry in sngl]
snrs = [entry["snr"] for entry in sngl]
# NOTE: the channel name here varies by pipeline and isn't always available
# channels = {entry["ifo"]: entry["channel"][3:] for entry in sngl}
best_to_worst = np.argsort(snrs)[::-1]
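# The single-detector triggers are ordered by decreasing SNR: the loudest detector sets the
# trigger time and time_reference, and the two loudest (if available) define reference_frame.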
best_event = sngl[best_to_worst[0]]
trigger_time = best_event["end_time"] + best_event["end_time_ns"] / 1e9
sorted_ifos = [ifos[idx] for idx in best_to_worst]
time_reference = sorted_ifos[0]
if len(sorted_ifos) > 1:
reference_frame = "".join(sorted_ifos[:2])
else:
reference_frame = "sky"
return (
trigger_values,
superevent,
trigger_time,
ifos,
reference_frame,
time_reference,
)
def _read_burst_candidate(candidate):
central_frequency = candidate["extra_attributes"]["MultiBurst"]["central_freq"]
superevent = candidate["superevent"]
trigger_time = candidate["gpstime"]
ifos = candidate["extra_attributes"]["MultiBurst"]["ifos"].split()
return central_frequency, superevent, trigger_time, ifos
def _get_cbc_likelihood_args(mode, trigger_values):
"""Return cbc likelihood arguments and quantities characterizing likelihood
Parameters
----------
mode: str
trigger_values: dict
Returns
-------
likelihood_args: dict
likelihood_parameter_bounds: dict
bounds of parameter space where likelihood is expected to be accurate
minimum_frequency: float
minimum frequency of likelihood integration
maximum_frequency: float
maximum frequency of likelihood integration
duration: float
inverse of frequency interval of likelihood integration
"""
if mode in ["phenompv2_bbh_roq"]:
return _choose_phenompv2_bbh_roq(trigger_values["chirp_mass"])
elif mode in [
"lowspin_phenomd_narrowmc_roq",
"lowspin_phenomd_broadmc_roq",
"lowspin_phenomd_fhigh1024_roq",
"lowspin_taylorf2_roq",
"phenompv2_bns_roq",
"phenompv2nrtidalv2_roq",
]:
return _choose_bns_roq(trigger_values["chirp_mass"], mode)
elif mode in ["low_q_phenompv2_roq"]:
return _choose_low_q_pv2_roq(trigger_values["chirp_mass"])
elif mode in ["phenomxphm_roq"]:
return _choose_xphm_roq(trigger_values["chirp_mass"])
elif mode in ["test"]:
return _get_default_likelihood_args(trigger_values)
else:
return _get_cbc_likelihood_args_from_json(mode, trigger_values)
def _choose_phenompv2_bbh_roq(chirp_mass, ignore_no_params=False):
"""Choose an appropriate PhenomPv2 ROQ folder, and return likelihood
arguments and quantities characterizing likelihood. The bases were
developed in the work of arXiv:1604.08253. For a high-mass trigger with
chirp mass above 35 solar mass, this returns arguments with the standard
likelihood `GravitationalWaveTransient`, as the analysis is computationally
cheap anyway.
Parameters
----------
chirp_mass: float
ignore_no_params: bool
If True, this ignores FileNotFoundError raised when roq params file is
not found, which is useful for testing this command outside the CIT
cluster.
Returns
-------
likelihood_args: dict
likelihood_parameter_bounds: dict
bounds of parameter space where likelihood is expected to be accurate
minimum_frequency: float
minimum frequency of likelihood integration
maximum_frequency: float
maximum frequency of likelihood integration
duration: float
inverse of frequency interval of likelihood integration
"""
likelihood_args = {
"waveform_approximant": "IMRPhenomPv2",
}
likelihood_parameter_bounds = {"spin_template": "precessing"}
if chirp_mass > 35:
likelihood_args["likelihood_type"] = "GravitationalWaveTransient"
likelihood_args["time_marginalization"] = True
likelihood_args["jitter_time"] = True
likelihood_parameter_bounds["chirp_mass_min"] = 25
likelihood_parameter_bounds["chirp_mass_max"] = 200
likelihood_parameter_bounds["mass_ratio_min"] = 0.125
likelihood_parameter_bounds["a_1_max"] = 0.99
likelihood_parameter_bounds["a_2_max"] = 0.99
minimum_frequency = 20
maximum_frequency = 1024
duration = 4
else:
likelihood_args["likelihood_type"] = "ROQGravitationalWaveTransient"
roq_scale_factor = 1
duration = _get_default_duration(chirp_mass)
likelihood_args["roq_folder"] = f"/home/cbc/ROQ_data/IMRPhenomPv2/{duration}s"
if chirp_mass < 0.9:
roq_scale_factor = 2
elif chirp_mass < 1.43:
roq_scale_factor = 1.6
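# For low chirp masses the same basis is reused with a scale factor s: the chirp-mass and
# component-mass bounds quoted in params.dat are divided by s, while frequencies are
# multiplied by s and the segment length is divided by s (applied below).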
roq_params_file = os.path.join(likelihood_args["roq_folder"], "params.dat")
if os.path.exists(roq_params_file):
roq_params = np.genfromtxt(roq_params_file, names=True)
elif ignore_no_params:
roq_params = {
"chirpmassmin": roq_scale_factor * chirp_mass * 0.9,
"chirpmassmax": roq_scale_factor * chirp_mass * 1.1,
"qmax": 8,
"compmin": 0,
"chiL1min": -0.99,
"chiL1max": 0.99,
"chiL2min": -0.99,
"chiL2max": 0.99,
"flow": 20,
"fhigh": 1024,
"seglen": 4,
}
logger.warning(
f"{roq_params_file} not found. ROQ parameters are set to {roq_params}."
)
else:
raise FileNotFoundError(f"{roq_params_file} not found.")
likelihood_args["roq_scale_factor"] = roq_scale_factor
likelihood_parameter_bounds["chirp_mass_min"] = (
roq_params["chirpmassmin"] / roq_scale_factor
)
likelihood_parameter_bounds["chirp_mass_max"] = (
roq_params["chirpmassmax"] / roq_scale_factor
)
likelihood_parameter_bounds["mass_ratio_min"] = 1 / roq_params["qmax"]
likelihood_parameter_bounds["comp_min"] = (
roq_params["compmin"] / roq_scale_factor
)
likelihood_parameter_bounds["a_1_max"] = roq_params["chiL1max"]
likelihood_parameter_bounds["a_2_max"] = roq_params["chiL2max"]
minimum_frequency = roq_params["flow"] * roq_scale_factor
maximum_frequency = roq_params["fhigh"] * roq_scale_factor
duration = roq_params["seglen"] / roq_scale_factor
return (
likelihood_args,
likelihood_parameter_bounds,
minimum_frequency,
maximum_frequency,
duration,
)
def _choose_bns_roq(chirp_mass, mode):
"""Choose an appropriate BNS-mass ROQ basis file, and return likelihood
arguments and quantities characterizing likelihood. The review information
of those bases is found at https://git.ligo.org/pe/O4/review_bns_roq/-/wikis.
Parameters
----------
chirp_mass: float
mode: str
Allowed options are "lowspin_phenomd_narrowmc_roq",
"lowspin_phenomd_broadmc_roq", "lowspin_phenomd_fhigh1024_roq",
"lowspin_taylorf2_roq", "phenompv2_bns_roq",
and "phenompv2nrtidalv2_roq".
Returns
-------
likelihood_args: dict
likelihood_parameter_bounds: dict
bounds of parameter space where likelihood is expected to be accurate
minimum_frequency: float
minimum frequency of likelihood integration
maximum_frequency: float
maximum frequency of likelihood integration
duration: float
inverse of frequency interval of likelihood integration
"""
# When phase marginalization is used, psi + np.pi / 2 is indistinguishable
# from psi. That is why decreasing its maximum to pi / 2 does not change
# the inference results at all.
likelihood_parameter_bounds = {"mass_ratio_min": 0.125, "psi_max": np.pi / 2}
if mode == "lowspin_phenomd_narrowmc_roq":
waveform_approximant = "IMRPhenomD"
roq_dir = "/home/roq/IMRPhenomD/lowspin_narrowmc_bns"
likelihood_parameter_bounds["a_1_max"] = 0.05
likelihood_parameter_bounds["a_2_max"] = 0.05
likelihood_parameter_bounds["spin_template"] = "aligned"
elif mode == "lowspin_phenomd_broadmc_roq":
waveform_approximant = "IMRPhenomD"
roq_dir = "/home/roq/IMRPhenomD/lowspin_broadmc_bns"
likelihood_parameter_bounds["a_1_max"] = 0.05
likelihood_parameter_bounds["a_2_max"] = 0.05
likelihood_parameter_bounds["spin_template"] = "aligned"
elif mode == "lowspin_phenomd_fhigh1024_roq":
waveform_approximant = "IMRPhenomD"
roq_dir = "/home/roq/IMRPhenomD/lowspin_fhigh1024"
likelihood_parameter_bounds["a_1_max"] = 0.05
likelihood_parameter_bounds["a_2_max"] = 0.05
likelihood_parameter_bounds["spin_template"] = "aligned"
elif mode == "lowspin_taylorf2_roq":
waveform_approximant = "TaylorF2"
roq_dir = "/home/roq/TaylorF2/lowspin_narrowmc_bns"
likelihood_parameter_bounds["a_1_max"] = 0.05
likelihood_parameter_bounds["a_2_max"] = 0.05
likelihood_parameter_bounds["spin_template"] = "aligned"
elif mode == "phenompv2_bns_roq":
waveform_approximant = "IMRPhenomPv2"
roq_dir = "/home/roq/IMRPhenomPv2/bns"
likelihood_parameter_bounds["a_1_max"] = 0.99
likelihood_parameter_bounds["a_2_max"] = 0.99
likelihood_parameter_bounds["spin_template"] = "precessing"
elif mode == "phenompv2nrtidalv2_roq":
waveform_approximant = "IMRPhenomPv2_NRTidalv2"
roq_dir = "/home/roq/IMRPhenomPv2_NRTidalv2/bns"
likelihood_parameter_bounds["a_1_max"] = 0.4
likelihood_parameter_bounds["a_2_max"] = 0.4
likelihood_parameter_bounds["spin_template"] = "precessing"
likelihood_parameter_bounds["lambda_1_max"] = 5000
likelihood_parameter_bounds["lambda_2_max"] = 5000
logger.info(f"Searching for a basis file in {roq_dir} ...")
# The chirp mass boundaries are chosen so that this passes priors.validate_prior.
if 4.0 > chirp_mass > 2.35:
basis = os.path.join(roq_dir, "basis_64s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 2.1
likelihood_parameter_bounds["chirp_mass_max"] = 4.0
maximum_frequency = 2048
duration = 64
elif chirp_mass > 1.54:
basis = os.path.join(roq_dir, "basis_128s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 1.4
likelihood_parameter_bounds["chirp_mass_max"] = 2.6
maximum_frequency = 4096
duration = 128
elif chirp_mass > 1.012:
basis = os.path.join(roq_dir, "basis_256s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 0.92
likelihood_parameter_bounds["chirp_mass_max"] = 1.7
maximum_frequency = 4096
duration = 256
elif chirp_mass > 0.66:
basis = os.path.join(roq_dir, "basis_512s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 0.6
likelihood_parameter_bounds["chirp_mass_max"] = 1.1
maximum_frequency = 4096
duration = 512
else:
raise ValueError(
f"No BNS-mass {waveform_approximant} basis has been found for "
f"chirp_mass={chirp_mass}!"
)
if mode == "lowspin_phenomd_fhigh1024_roq" or mode == "lowspin_taylorf2_roq":
maximum_frequency = 1024
logger.info(f"The selected ROQ basis file is {basis}.")
return (
{
"likelihood_type": "ROQGravitationalWaveTransient",
"roq_linear_matrix": basis,
"roq_quadratic_matrix": basis,
"roq_scale_factor": 1,
"waveform_approximant": waveform_approximant,
"enforce_signal_duration": False,
},
likelihood_parameter_bounds,
20,
maximum_frequency,
duration,
)
def _choose_low_q_pv2_roq(chirp_mass):
"""Choose an appropriate low-mass-ratio IMRPhenomPv2 ROQ basis file and
return likelihood arguments and quantities characterizing likelihood.
Parameters
----------
chirp_mass: float
Returns
-------
likelihood_args: dict
likelihood_parameter_bounds: dict
bounds of parameter space where likelihood is expected to be accurate
minimum_frequency: float
minimum frequency of likelihood integration
maximum_frequency: float
maximum frequency of likelihood integration
duration: float
inverse of frequency interval of likelihood integration
"""
likelihood_parameter_bounds = {
"mass_ratio_min": 0.05,
"a_1_max": 0.99,
"a_2_max": 0.99,
"spin_template": "precessing",
"psi_max": np.pi / 2,
}
roq_dir = "/home/roq/IMRPhenomPv2/low_mass_ratio"
if 21.0 > chirp_mass > 9.57:
basis = os.path.join(roq_dir, "basis_8s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 8.71
likelihood_parameter_bounds["chirp_mass_max"] = 20.99
duration = 8
elif chirp_mass > 5.72:
basis = os.path.join(roq_dir, "basis_16s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 5.21
likelihood_parameter_bounds["chirp_mass_max"] = 10.99
duration = 16
elif chirp_mass > 3.63:
basis = os.path.join(roq_dir, "basis_32s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 3.31
likelihood_parameter_bounds["chirp_mass_max"] = 6.29
duration = 32
elif chirp_mass > 2.35:
basis = os.path.join(roq_dir, "basis_64s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 2.101
likelihood_parameter_bounds["chirp_mass_max"] = 3.999
duration = 64
elif chirp_mass > 1.5:
basis = os.path.join(roq_dir, "basis_128s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 1.401
likelihood_parameter_bounds["chirp_mass_max"] = 2.599
duration = 128
else:
raise ValueError(
f"No low-mass-ratio IMRPhenomPv2 basis has been found for "
f"chirp_mass={chirp_mass}!"
)
logger.info(f"The selected ROQ basis file is {basis}.")
return (
{
"likelihood_type": "ROQGravitationalWaveTransient",
"roq_linear_matrix": basis,
"roq_quadratic_matrix": basis,
"roq_scale_factor": 1,
"waveform_approximant": "IMRPhenomPv2",
"enforce_signal_duration": False,
},
likelihood_parameter_bounds,
20,
1024,
duration,
)
def _choose_xphm_roq(chirp_mass):
"""Choose an appropriate IMRPhenomXPHM ROQ basis file and return likelihood
arguments and quantities characterizing likelihood.
Parameters
----------
chirp_mass: float
Returns
-------
likelihood_args: dict
likelihood_parameter_bounds: dict
bounds of parameter space where likelihood is expected to be accurate
minimum_frequency: float
minimum frequency of likelihood integration
maximum_frequency: float
maximum frequency of likelihood integration
duration: float
inverse of frequency interval of likelihood integration
"""
likelihood_parameter_bounds = {
"mass_ratio_min": 0.05,
"a_1_max": 0.99,
"a_2_max": 0.99,
"spin_template": "precessing",
}
roq_dir = "/home/roq/IMRPhenomXPHM"
if 200.0 > chirp_mass > 45:
basis = os.path.join(roq_dir, "basis_4s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 29.9
likelihood_parameter_bounds["chirp_mass_max"] = 199.9
duration = 4
elif chirp_mass > 25:
basis = os.path.join(roq_dir, "basis_8s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 18.8
likelihood_parameter_bounds["chirp_mass_max"] = 62.8
duration = 8
elif chirp_mass > 16:
basis = os.path.join(roq_dir, "basis_16s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 12.8
likelihood_parameter_bounds["chirp_mass_max"] = 31.8
duration = 16
elif chirp_mass > 10.03:
basis = os.path.join(roq_dir, "basis_32s.hdf5")
likelihood_parameter_bounds["chirp_mass_min"] = 10.03
likelihood_parameter_bounds["chirp_mass_max"] = 19.04
duration = 32
else:
raise ValueError(
f"No IMRPhenomXPHM basis has been found for chirp_mass={chirp_mass}!"
)
logger.info(f"The selected ROQ basis file is {basis}.")
return (
{
"likelihood_type": "ROQGravitationalWaveTransient",
"roq_linear_matrix": basis,
"roq_quadratic_matrix": basis,
"roq_scale_factor": 1,
"waveform_approximant": "IMRPhenomXPHM",
"reference_frequency": 20,
"waveform_arguments_dict": {"PhenomXHMReleaseVersion": 122019},
"phase_marginalization": False,
"enforce_signal_duration": False,
},
likelihood_parameter_bounds,
20,
4096,
duration,
)
def _get_cbc_likelihood_args_from_json(filename, trigger_values):
"""Load input JSON file containing likelihood settings and determine
appropriate likelihood arguments and parameter bounds depending on input
trigger values.
The json file is supposed to contain `likelihood_args`,
`likelihood_parameter_bounds`, and/or `trigger_dependent`. The first two
contain default arguments and parameter bounds respectively. The last item
contains trigger-dependent settings to update the default settings. It
contains `range`, `likelihood_args`, and/or `likelihood_parameter_bounds`.
`range` contains a dictionary of trigger-parameter ranges, whose keys are
parameter names (`chirp_mass`, `mass_ratio`, `spin_1z`, and/or `spin_2z`)
and values are lists of their ranges. `likelihood_args` contains lists of
arguments, one of which is chosen depending on trigger values and used to
update the default likelihood arguments. `likelihood_parameter_bounds`
contains lists of parameter bounds to update their default.
Parameters
----------
filename: str
trigger_values: dict
Returns
-------
likelihood_args: dict
likelihood_parameter_bounds: dict
bounds of parameter space where likelihood is expected to be accurate
minimum_frequency: float
minimum frequency of likelihood integration
maximum_frequency: float
maximum frequency of likelihood integration
duration: float
inverse of frequency interval of likelihood integration
Example
-------
>>> import json
>>> from bilby_pipe.gracedb import _get_cbc_likelihood_args_from_json
>>> settings = {
... "likelihood_args": {
... "likelihood_type": "ROQGravitationalWaveTransient",
... "minimum_frequency": 20,
... "maximum_frequency": 2048,
... "duration": 4,
... },
... "trigger_dependent": {
... "range": {"chirp_mass": [[30, 40], [40, 50]]},
... "likelihood_args": [
... {"roq_folder": "basis_for_30Msun_to_40Msun"},
... {"roq_folder": "basis_for_40Msun_to_50Msun"},
... ]
... },
... }
>>> with open("test.json", "w") as f:
... json.dump(settings, f)
>>> _get_cbc_likelihood_args_from_json("test.json", {"chirp_mass": 35})
09:04 bilby_pipe INFO : Loading likelihood settings from test.json ...
({'likelihood_type': 'ROQGravitationalWaveTransient', 'roq_folder': 'basis_for_30Msun_to_40Msun'}, {}, 20, 2048, 4)
>>> _get_cbc_likelihood_args_from_json("test.json", {"chirp_mass": 45})
09:04 bilby_pipe INFO : Loading likelihood settings from test.json ...
({'likelihood_type': 'ROQGravitationalWaveTransient', 'roq_folder': 'basis_for_40Msun_to_50Msun'}, {}, 20, 2048, 4)
"""
import json
logger.info(f"Loading likelihood settings from {filename} ...")
with open(filename, "r") as ff:
settings = json.load(ff)
likelihood_args = dict()
if "likelihood_args" in settings:
likelihood_args.update(settings["likelihood_args"])
likelihood_parameter_bounds = dict()
if "likelihood_parameter_bounds" in settings:
likelihood_parameter_bounds.update(settings["likelihood_parameter_bounds"])
if "trigger_dependent" in settings:
trigger_range_dict = settings["trigger_dependent"]["range"]
number_of_ranges = len(list(trigger_range_dict.values())[0])
in_range = np.ones(number_of_ranges, dtype=bool)
for key in trigger_range_dict:
trigger = trigger_values[key]
trigger_range = np.array(trigger_range_dict[key])
in_range *= trigger >= trigger_range[:, 0]
in_range *= trigger <= trigger_range[:, 1]
if not any(in_range):
raise ValueError(
"No likelihood settings found for the trigger values: "
f"{trigger_values}!"
)
selected_idx = np.arange(number_of_ranges)[in_range][0]
trigger_dependent_settings = settings["trigger_dependent"]
if "likelihood_args" in trigger_dependent_settings:
likelihood_args.update(
trigger_dependent_settings["likelihood_args"][selected_idx]
)
if "likelihood_parameter_bounds" in trigger_dependent_settings:
likelihood_parameter_bounds.update(
trigger_dependent_settings["likelihood_parameter_bounds"][selected_idx]
)
minimum_frequency = likelihood_args.pop("minimum_frequency")
maximum_frequency = likelihood_args.pop("maximum_frequency")
duration = likelihood_args.pop("duration")
return (
likelihood_args,
likelihood_parameter_bounds,
minimum_frequency,
maximum_frequency,
duration,
)
def _get_default_likelihood_args(trigger_values):
logger.info("Using default likelihood settings, these may not be optimal.")
bounds = dict(
chirp_mass_min=trigger_values["chirp_mass"] / 2,
chirp_mass_max=trigger_values["chirp_mass"] * 2,
spin_template="precessing",
a_1_max=0.99,
a_2_max=0.99,
mass_ratio_min=0.125,
)
duration = _get_default_duration(trigger_values["chirp_mass"])
minimum_frequency = 20
maximum_frequency = 1024
return dict(), bounds, minimum_frequency, maximum_frequency, duration
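# attempt_gwpy_get retries TimeSeries.get up to n_attempts times, sleeping 45 s between
# attempts, and returns False if the data could not be fetched.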
def attempt_gwpy_get(channel, start_time, end_time, n_attempts=5):
for _ in range(n_attempts):
try:
data = TimeSeries.get(channel=channel, start=start_time, end=end_time)
return data
except RuntimeError:
time.sleep(45)
return False
def copy_and_save_data(
ifos,
start_time,
end_time,
channel_dict,
outdir,
gracedbid,
query_kafka=True,
n_attempts=5,
replay=False,
):
"""Attempt to read the strain data from internal servers and save frame files to the run directory.
If `query_kafka` is True, then attempt to fetch the data from `/dev/shm/kafka/` (preferred method
for low-latency/online analyses). If `query_kafka` is False or the data cannot be found in
`/dev/shm/kafka/`, this function will attempt to get the data from TimeSeries.get(), called in a loop
to allow for multiple attempts.
If data cannot be found for all the ifos, returns None, and data reading will be attempted by the bilby_pipe
data_generation stage.
Parameters
----------
ifos: list
List of ifos for this trigger
start_time: float
Safe start time for data segment
end_time: float
Safe end time for data segment
channel_dict: dict
Dictionary of channel names
outdir: str
Directory to save frame files
gracedbid: str
GraceDB id of event
query_kafka: bool
Whether to attempt to copy frame files from `/dev/shm/kafka/`
n_attempts: int
Number of attempts to call TimeSeries.get() before failing to obtain data
replay: bool
Whether to try to fetch O3ReplayMDC data from the kafka directory. Only
relevant if query_kafka = True.
Returns
-------
data_dict: dict
Dictionary with {ifo: path_to_copied_frame_file}.
None, if data were not able to be obtained for all ifos.
"""
ifo_data = dict()
for ifo in ifos:
channel = f"{ifo}:{channel_dict[ifo]}"
if query_kafka:
try:
logger.info(f"Querying kafka directory for {ifo} data")
data = read_and_concat_data_from_kafka(
ifo, int(start_time), int(end_time), channel=channel, replay=replay
)
except FileNotFoundError:
logger.info(
f"Failed to obtain {ifo} data from kafka directory. Calling TimeSeries.get"
)
data = attempt_gwpy_get(
channel=channel,
start_time=start_time,
end_time=end_time,
n_attempts=n_attempts,
)
else:
data = attempt_gwpy_get(
channel=channel,
start_time=start_time,
end_time=end_time,
n_attempts=n_attempts,
)
if data is False:
logger.info(f"Could not find data for {ifo} with channel name {channel}")
break
else:
ifo_data[ifo] = data
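# If any detector's data could not be fetched, the loop above breaks early, the completeness
# check below fails, and no frame files are written; bilby_pipe then reads the data itself in
# its data_generation stage.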
if all(ifo in ifo_data.keys() for ifo in ifos):
data_paths = dict()
for ifo in ifo_data.keys():
actual_start = ifo_data[ifo].times[0].value
actual_duration = ifo_data[ifo].duration.value
if int(actual_duration) == actual_duration:
actual_duration = int(actual_duration)
datapath = os.path.join(
outdir,
f"{ifo[0]}-{ifo}_{gracedbid}_llhoft-{actual_start}-{actual_duration}.gwf",
)
ifo_data[ifo].write(datapath)
data_paths[ifo] = datapath
logger.info(f"Written {ifo} data to {datapath}")
data_dict = data_paths
else:
logger.info(
"Not getting data in pre-generation step. Will get data in data generation stage."
)
data_dict = None
return data_dict
def prepare_run_configurations(
candidate,
gracedb,
outdir,
channel_dict,
sampler_kwargs,
webdir,
search_type="cbc",
cbc_likelihood_mode="phenompv2_bbh_roq",
settings=None,
psd_cut=0.95,
query_kafka=True,
replay=False,
):
"""Creates ini file from defaults and candidate contents
Parameters
----------
candidate:
Contains contents of GraceDB event
gracedb: str
GraceDB id of event
outdir: str
Output directory where the ini file and all output is written
channel_dict: dict
Dictionary of channel names
sampler_kwargs: str
Set of sampler arguments, or option for set of sampler arguments
webdir: str
Directory to store summary pages
search_type: str
What kind of search identified the trigger, options are "cbc" and "burst"
cbc_likelihood_mode: str
Built-in CBC likelihood mode or path to a JSON file containing
likelihood settings. The built-in settings include 'phenompv2_bbh_roq',
'phenompv2_bns_roq', 'phenompv2nrtidalv2_roq',
'lowspin_phenomd_narrowmc_roq', 'lowspin_phenomd_broadmc_roq', and
'test'.
settings: str
JSON filename containing settings to override the defaults
psd_cut: float
Fractional maximum frequency cutoff relative to the maximum frequency of pipeline psd
query_kafka: bool
Whether to first attempt to query the kafka directory for data before attempting a
call to gwpy TimeSeries.get()
replay: bool
Whether to try to fetch O3ReplayMDC data from the kafka directory. Only
relevant if query_kafka = True.
Returns
-------
filename: str
Generated ini filename
"""
if settings is not None:
import json
with open(settings, "r") as ff:
settings = json.load(ff)
else:
settings = dict()
if search_type == "cbc":
(
trigger_values,
superevent,
trigger_time,
ifos,
reference_frame,
time_reference,
) = _read_cbc_candidate(candidate)
(
likelihood_args,
likelihood_parameter_bounds,
minimum_frequency,
maximum_frequency,
duration,
) = _get_cbc_likelihood_args(cbc_likelihood_mode, trigger_values)
(
prior_file,
distance_marginalization_lookup_table,
) = generate_cbc_prior_from_template(
trigger_values["chirp_mass"],
likelihood_parameter_bounds,
outdir,
fast_test=(sampler_kwargs == "FastTest"),
phase_marginalization=likelihood_args.get("phase_marginalization", True),
)
calibration_model, calib_dict = calibration_dict_lookup(trigger_time, ifos)
if calibration_model is None:
spline_calibration_nodes = 0
elif sampler_kwargs == "FastTest":
spline_calibration_nodes = 4
else:
spline_calibration_nodes = 10
extra_config_arguments = dict(
reference_frequency=100,
time_marginalization=False,
distance_marginalization=True,
phase_marginalization=True,
distance_marginalization_lookup_table=distance_marginalization_lookup_table,
plot_trace=True,
plot_data=True,
calibration_model=calibration_model,
spline_calibration_envelope_dict=calib_dict,
spline_calibration_nodes=spline_calibration_nodes,
)
extra_config_arguments.update(likelihood_args)
if (
"lambda_1_max" in likelihood_parameter_bounds
or "lambda_2_max" in likelihood_parameter_bounds
):
extra_config_arguments["default_prior"] = "BNSPriorDict"
extra_config_arguments[
"frequency_domain_source_model"
] = "binary_neutron_star_roq"
elif search_type == "burst":
centre_frequency, superevent, trigger_time, ifos = _read_burst_candidate(
candidate
)
minimum_frequency = min(20, centre_frequency / 2)
maximum_frequency = next_power_of_2(centre_frequency * 2)
duration = 4
extra_config_arguments = dict(
frequency_domain_source_model="bilby.gw.source.sinegaussian",
default_prior="PriorDict",
time_marginalization=False,
phase_marginalization=False,
sampler_kwargs="FastTest",
)
prior_file = generate_burst_prior_from_template(
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency,
outdir=outdir,
)
else:
raise BilbyPipeError(
f"search_type should be either 'cbc' or 'burst', not {search_type}"
)
start_data, end_data = (trigger_time - duration - 2, trigger_time + 4)
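# The pre-fetched segment spans from (duration + 2) s before the trigger to 4 s after it,
# providing padding on both sides of the analysis segment.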
data_dict = copy_and_save_data(
ifos=ifos,
start_time=start_data,
end_time=end_data,
channel_dict=channel_dict,
outdir=outdir,
gracedbid=gracedb,
query_kafka=query_kafka,
replay=replay,
)
config_dict = dict(
label=gracedb,
outdir=outdir,
accounting="ligo.dev.o4.cbc.pe.bilby",
maximum_frequency=maximum_frequency,
minimum_frequency=minimum_frequency,
sampling_frequency=16384,
trigger_time=trigger_time,
detectors=ifos,
channel_dict=channel_dict,
deltaT=0.2,
prior_file=prior_file,
duration=duration,
sampler="dynesty",
sampler_kwargs=sampler_kwargs,
webdir=webdir,
local_generation=False,
local_plot=False,
transfer_files=False,
create_summary=False,
summarypages_arguments={"gracedb": gracedb},
plot_trace=True,
plot_data=True,
plot_calibration=False,
plot_corner=False,
plot_marginal=False,
plot_skymap=False,
plot_waveform=False,
overwrite_outdir=True,
result_format="hdf5",
reference_frame=reference_frame,
time_reference=time_reference,
data_dict=data_dict,
)
if sampler_kwargs == "FastTest":
config_dict["n_parallel"] = 2
else:
config_dict["n_parallel"] = 4
if candidate.get("coinc_file", None) is not None:
psd_dict, psd_maximum_frequency = extract_psds_from_xml(
coinc_file=candidate["coinc_file"], ifos=ifos, outdir=outdir
)
config_dict["psd_dict"] = psd_dict
if psd_maximum_frequency is not None:
psd_maximum_frequency *= min(psd_cut, 1)
if config_dict["maximum_frequency"] > psd_maximum_frequency:
config_dict["maximum_frequency"] = psd_maximum_frequency
logger.info(
f"maximum_frequency is reduced to {psd_maximum_frequency} "
"due to the limitation of the pipeline psd"
)
config_dict.update(extra_config_arguments)
config_dict["summarypages_arguments"]["nsamples_for_skymap"] = 5000
config_dict.update(settings)
comment = (
"# Configuration ini file generated from GraceDB "
f"for event id {gracedb} superevent id {superevent}"
)
filename = f"{outdir}/bilby_config.ini"
_parser = parser.create_parser()
_parser.write_to_file(
filename=filename,
args=config_dict,
overwrite=True,
include_description=False,
exclude_default=True,
comment=comment,
)
return filename
def create_config_file(
candidate,
gracedb,
outdir,
channel_dict,
sampler_kwargs,
webdir,
search_type="cbc",
cbc_likelihood_mode="phenompv2_bbh_roq",
settings=None,
psd_cut=0.95,
query_kafka=True,
replay=False,
):
logger.warning(
"create_config_file is deprecated and will be removed in a future version. "
"Calling prepare_run_configurations instead."
)
return prepare_run_configurations(
candidate=candidate,
gracedb=gracedb,
outdir=outdir,
channel_dict=channel_dict,
sampler_kwargs=sampler_kwargs,
webdir=webdir,
search_type=search_type,
cbc_likelihood_mode=cbc_likelihood_mode,
settings=settings,
psd_cut=psd_cut,
query_kafka=query_kafka,
replay=replay,
)
def _get_default_duration(chirp_mass):
"""Return default duration based on chirp mass
Parameters
----------
chirp_mass: float
Returns
-------
duration: float
"""
if chirp_mass > 13.53:
duration = 4
elif chirp_mass > 8.73:
duration = 8
elif chirp_mass > 5.66:
duration = 16
elif chirp_mass > 3.68:
duration = 32
elif chirp_mass > 2.39:
duration = 64
else:
duration = 128
return duration
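# For example, _get_default_duration(30) returns 4 and _get_default_duration(1.2) returns 128.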
def _get_distance_lookup(chirp_mass, phase_marginalization=True):
"""Return appropriate distance bounds and lookup table
Parameters
----------
chirp_mass: float
phase_marginalization: bool (optional, default is True)
Returns
-------
distance_bounds: tuple
lookup_table: str
"""
duration = _get_default_duration(chirp_mass)
distance_bounds = DEFAULT_DISTANCE_LOOKUPS[f"{duration}s"]
if phase_marginalization:
filename = f"{duration}s_distance_marginalization_lookup_phase.npz"
else:
filename = f"{duration}s_distance_marginalization_lookup.npz"
lookup_table = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"data_files",
filename,
)
return distance_bounds, lookup_table
def generate_cbc_prior_from_template(
chirp_mass,
likelihood_parameter_bounds,
outdir,
fast_test=False,
phase_marginalization=True,
):
"""Generate a cbc prior file from a template and write it to file. This
returns the paths to the prior file and the corresponding distance look-up
table
Parameters
----------
chirp_mass: float
likelihood_parameter_bounds: dict
outdir: str
fast_test: bool (optional, default is False)
phase_marginalization: bool (optional, default is True)
Returns
-------
prior_file: str
lookup_table: str
"""
if chirp_mass < 2:
bounds = (chirp_mass - 0.01, chirp_mass + 0.01)
elif chirp_mass < 4:
bounds = (chirp_mass - 0.1, chirp_mass + 0.1)
elif chirp_mass < 8:
bounds = (chirp_mass * 0.9, chirp_mass * 1.1)
else:
bounds = (0, 1000000)
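# For low-mass triggers the chirp mass is measured precisely, so the prior is narrowed around
# the trigger value; above a chirp mass of 8 the bounds from the likelihood settings are used as-is.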
chirp_mass_min = max(likelihood_parameter_bounds["chirp_mass_min"], bounds[0])
chirp_mass_max = min(likelihood_parameter_bounds["chirp_mass_max"], bounds[1])
to_add = ""
if "comp_min" in likelihood_parameter_bounds:
comp_min = likelihood_parameter_bounds["comp_min"]
to_add += (
f"mass_1 = Constraint(name='mass_1', minimum={comp_min}, maximum=1000)\n"
)
to_add += (
f"mass_2 = Constraint(name='mass_2', minimum={comp_min}, maximum=1000)\n"
)
if "lambda_1_max" in likelihood_parameter_bounds:
lambda_1_max = likelihood_parameter_bounds["lambda_1_max"]
to_add += (
f"lambda_1 = Uniform(name='lambda_1', minimum=0, maximum={lambda_1_max})\n"
)
if "lambda_2_max" in likelihood_parameter_bounds:
lambda_2_max = likelihood_parameter_bounds["lambda_2_max"]
to_add += (
f"lambda_2 = Uniform(name='lambda_2', minimum=0, maximum={lambda_2_max})\n"
)
distance_bounds, lookup_table = _get_distance_lookup(
chirp_mass, phase_marginalization=phase_marginalization
)
if fast_test:
template = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"data_files/fast.prior.template",
)
with open(template, "r") as old_prior:
prior_string = (
old_prior.read().format(
mc_min=chirp_mass_min,
mc_max=chirp_mass_max,
d_min=distance_bounds[0],
d_max=distance_bounds[1],
)
+ to_add
)
else:
spin_template = likelihood_parameter_bounds["spin_template"]
a_1_max = likelihood_parameter_bounds["a_1_max"]
a_2_max = likelihood_parameter_bounds["a_2_max"]
if spin_template == "precessing":
template = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"data_files/precessing_spin.prior.template",
)
elif spin_template == "aligned":
template = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"data_files/aligned_spin.prior.template",
)
else:
raise ValueError(f"Unknown spin template: {spin_template}")
with open(template, "r") as old_prior:
prior_string = (
old_prior.read().format(
mc_min=chirp_mass_min,
mc_max=chirp_mass_max,
q_min=likelihood_parameter_bounds["mass_ratio_min"],
a_1_max=a_1_max,
a_2_max=a_2_max,
d_min=distance_bounds[0],
d_max=distance_bounds[1],
psi_max=likelihood_parameter_bounds.get("psi_max", np.pi),
)
+ to_add
)
prior_file = os.path.join(outdir, "online.prior")
with open(prior_file, "w") as new_prior:
new_prior.write(prior_string)
return prior_file, lookup_table
def generate_burst_prior_from_template(
minimum_frequency, maximum_frequency, outdir, template=None
):
"""Generate a prior file from a template and write it to file
Parameters
----------
minimum_frequency: float
Minimum frequency for prior
maximum_frequency: float
Maximum frequency for prior
outdir: str
Path to the outdir (the prior is written to outdir/online.prior)
template: str
Alternative template file to use, otherwise the
data_files/burst.prior.template file is used
"""
if template is None:
template = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"data_files/burst.prior.template",
)
with open(template, "r") as old_prior:
prior_string = old_prior.read().format(
minimum_frequency=minimum_frequency, maximum_frequency=maximum_frequency
)
prior_file = os.path.join(outdir, "online.prior")
with open(prior_file, "w") as new_prior:
new_prior.write(prior_string)
return prior_file
def read_and_concat_data_from_kafka(ifo, start, end, channel, replay=False):
"""Query the kafka directory for the gwf files with the desired data. Start
and end should be set wide enough to include the entire duration.
This will read in the individual gwf files and concatenate them into
a single gwpy timeseries"""
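# One-second frame files are expected at <kafka_directory>/<X>-<IFO>_llhoft-<gps>-1.gwf,
# e.g. /dev/shm/kafka/H1/H-H1_llhoft-1240215503-1.gwf (the GPS time is illustrative).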
if replay:
kafka_directory = f"/dev/shm/kafka/{ifo}_O3ReplayMDC"
else:
kafka_directory = f"/dev/shm/kafka/{ifo}"
times = np.arange(start, end)
segments = []
for time_sec in times:
ht = TimeSeries.read(
f"{kafka_directory}/{ifo[0]}-{ifo}_llhoft-{time_sec}-1.gwf", channel=channel
)
segments.append(ht)
segmentlist = TimeSeriesList(*segments)
data = segmentlist.join()
return data
def create_parser():
parser = argparse.ArgumentParser(
prog="bilby_pipe gracedb access",
usage=__doc__,
formatter_class=argparse.RawTextHelpFormatter,
)
group1 = parser.add_mutually_exclusive_group(required=True)
group1.add_argument("--gracedb", type=str, help="GraceDB event id")
group1.add_argument("--json", type=str, help="Path to json GraceDB file")
parser.add_argument(
"--psd-file",
type=str,
help="Path to ligolw-xml file containing the PSDs for the interferometers.",
)
parser.add_argument(
"--convert-to-flat-in-component-mass",
action="store_true",
default=False,
help=(
"Convert a flat-in chirp mass and mass-ratio prior file to flat \n"
"in component mass during the post-processing. Note, the prior \n"
"must be uniform in Mc and q with constraints in m1 and m2 for \n"
"this to work. \n"
),
)
parser.add_argument(
"--outdir",
type=str,
help="Output directory where the ini file and all output is written.",
)
parser.add_argument(
"--output",
type=str,
choices=["ini", "full", "full-local", "full-submit"],
help=(
"Flag to create ini, generate directories and/or submit. \n"
" ini : generates ini file \n"
" full : generates ini and dag submission files (default) \n"
" full-local : generates ini and dag submission files and run locally \n"
" full-submit : generates ini and dag submission files and submits to condor \n"
),
default="full",
)
parser.add_argument(
"--gracedb-url",
type=str,
help=(
"GraceDB service url. \n"
" Main page : https://gracedb.ligo.org/api/ (default) \n"
" Playground : https://gracedb-playground.ligo.org/api/ \n"
),
default="https://gracedb.ligo.org/api/",
)
parser.add_argument(
"--channel-dict",
type=str,
default="online",
choices=list(CHANNEL_DICTS.keys()),
help=(
"Channel dictionary. \n"
" online : use for main GraceDB page events (default)\n"
" o2replay : use for playground GraceDB page events\n"
" o3replay : use for playground GraceDB page events\n"
),
)
parser.add_argument(
"--sampler-kwargs",
type=str,
default="DynestyDefault",
help=(
"Dictionary of sampler-kwargs to pass in, e.g., {nlive: 1000} OR "
"pass pre-defined set of sampler-kwargs {DynestyDefault, BilbyMCMCDefault, FastTest}"
),
)
parser.add_argument(
"--cbc-likelihood-mode",
type=str,
default="phenompv2_bbh_roq",
help=(
"Built-in CBC likelihood mode or path to a JSON file containing likelihood settings. "
"The built-in settings include 'phenompv2_bbh_roq', "
"'lowspin_phenomd_narrowmc_roq', 'lowspin_phenomd_broadmc_roq', "
"'lowspin_phenomd_fhigh1024_roq', 'lowspin_taylorf2_roq', "
"'phenompv2_bns_roq', 'phenompv2nrtidalv2_roq', "
"'low_q_phenompv2_roq', 'phenomxphm_roq', and 'test'."
),
)
parser.add_argument(
"--webdir",
type=str,
default=None,
help=(
"Directory to store summary pages. \n"
" If not given, defaults to outdir/results_page"
),
)
parser.add_argument(
"--settings",
type=str,
default=None,
help="JSON file containing extra settings to override the defaults",
)
parser.add_argument(
"--psd-cut",
type=float,
default=0.95,
help=(
"Maximum frequency is set to this value multiplied by the maximum frequency of the psd contained in coinc.xml."
" This is to avoid likelihood overflow caused by the roll-off of the pipeline psd due to its low-pass filter."
),
)
parser.add_argument(
"--query-kafka",
type=bool,
default=True,
help=(
"When fetching the data for analysis, first check whether it is in kafka and, if not, query ifocache. "
"If False, query ifocache (via gwpy TimeSeries.get()) directly."
),
)
return parser
def main(args=None, unknown_args=None):
if args is None:
args, unknown_args = create_parser().parse_known_args()
elif unknown_args is None:
unknown_args = []
if len(unknown_args) > 1 and args.output == "ini":
msg = [
tcolors.WARNING,
f"Unrecognized arguments {unknown_args}, these will be ignored",
tcolors.END,
]
logger.warning(" ".join(msg))
outdir = args.outdir
if args.json:
candidate = read_from_json(args.json)
gracedb = candidate["graceid"]
if outdir is None:
outdir = f"outdir_{gracedb}"
check_directory_exists_and_if_not_mkdir(outdir)
if args.psd_file is not None and os.path.isfile(args.psd_file):
candidate["coinc_file"] = args.psd_file
elif args.gracedb:
gracedb = args.gracedb
gracedb_url = args.gracedb_url
if outdir is None:
outdir = f"outdir_{gracedb}"
check_directory_exists_and_if_not_mkdir(outdir)
candidate = read_from_gracedb(gracedb, gracedb_url, outdir)
else:
raise BilbyPipeError("Either gracedb ID or json file must be provided.")
if args.webdir is not None:
webdir = args.webdir
else:
webdir = os.path.join(outdir, "results_page")
sampler_kwargs = args.sampler_kwargs
channel_dict = CHANNEL_DICTS[args.channel_dict.lower()]
if args.channel_dict.lower() == "o3replay":
replay = True
else:
replay = False
search_type = candidate["group"].lower()
if search_type not in ["cbc", "burst"]:
raise BilbyPipeError(f"Candidate group {candidate['group']} not recognised.")
filename = prepare_run_configurations(
candidate=candidate,
gracedb=gracedb,
outdir=outdir,
channel_dict=channel_dict,
sampler_kwargs=sampler_kwargs,
webdir=webdir,
psd_cut=args.psd_cut,
search_type=search_type,
cbc_likelihood_mode=args.cbc_likelihood_mode,
settings=args.settings,
query_kafka=args.query_kafka,
replay=replay,
)
if args.output == "ini":
logger.info(
"Generating ini with default settings. Run using the command: \n"
f" $ bilby_pipe {filename}"
)
else:
arguments = ["bilby_pipe", filename]
if args.output == "full":
logger.info("Generating dag submission files")
if args.output == "full-local":
logger.info("Generating dag submission files, running locally")
arguments.append("--local")
if args.output == "full-submit":
logger.info("Generating dag submission files, submitting to condor")
arguments.append("--submit")
if len(unknown_args) > 1:
arguments = arguments + unknown_args
run_command_line(arguments)
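# Example invocation (assuming the bilby_pipe_gracedb console script; the event id is hypothetical):
# $ bilby_pipe_gracedb --gracedb G123456 --outdir outdir_G123456 --output ini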