Source code for bilby_pipe.gracedb

""" Tool for running online bilby PE using GraceDB events

The functionality of much of these utility assumes the user is running on the
CIT cluster, e.g. the ROQ and calibration directories are in there usual place
"""
import argparse
import json
import os
import shutil
import time

import numpy as np
from gwpy.timeseries import TimeSeries, TimeSeriesList

from . import parser
from .utils import (
    DEFAULT_DISTANCE_LOOKUPS,
    BilbyPipeError,
    check_directory_exists_and_if_not_mkdir,
    logger,
    next_power_of_2,
    run_command_line,
    tcolors,
    test_connection,
)

# Default channels from: https://wiki.ligo.org/LSC/JRPComm/ObsRun3
# NOTE: can these be read from gracedb?
[docs] CHANNEL_DICTS = dict( online=dict( H1="GDS-CALIB_STRAIN_CLEAN", L1="GDS-CALIB_STRAIN_CLEAN", V1="Hrec_hoft_16384Hz" ), o2replay=dict( H1="GDS-CALIB_STRAIN_O2Replay", L1="GDS-CALIB_STRAIN_O2Replay", V1="Hrec_hoft_16384Hz_O2Replay", ), o3replay=dict( H1="GDS-CALIB_STRAIN_INJ1_O3Replay", L1="GDS-CALIB_STRAIN_INJ1_O3Replay", V1="Hrec_hoft_16384Hz_INJ1_O3Replay", ), )
[docs] def x509userproxy(outdir): """Copies X509_USER_PROXY certificate from user's os.environ and places it inside the outdir, if the X509_USER_PROXY exists. Parameters ---------- outdir: str Output directory where X509_USER_PROXY certificate is copied to. Returns ------- x509userproxy: str, None New path to X509_USER_PROXY certification file None if X509_USER_PROXY certificate does not exist, or if the X509_USER_PROXY cannot be copied. """ x509userproxy = None cert_alias = "X509_USER_PROXY" try: cert_path = os.environ[cert_alias] new_cert_path = os.path.join(outdir, "." + os.path.basename(cert_path)) shutil.copyfile(src=cert_path, dst=new_cert_path) x509userproxy = new_cert_path except FileNotFoundError as e: logger.warning( "Environment variable X509_USER_PROXY does not point to a file. " f"Error while copying file: {e}. " "Try running `$ ligo-proxy-init albert.einstein`" ) except KeyError: logger.warning( "Environment variable X509_USER_PROXY not set" " Try running `$ ligo-proxy-init albert.einstein`" ) return x509userproxy
[docs] def read_from_gracedb(gracedb, gracedb_url, outdir): """ Read GraceDB events from GraceDB Parameters ---------- gracedb: str GraceDB id of event gracedb_url: str Service url for GraceDB events GraceDB 'https://gracedb.ligo.org/api/' (default) GraceDB-playground 'https://gracedb-playground.ligo.org/api/' outdir: str Output directory Returns ------- event: Contains contents of GraceDB event from GraceDB, json format """ from urllib.error import HTTPError from ligo.gracedb.rest import GraceDb test_connection() logger.info(f"Connecting to {gracedb_url}") try: # TODO: do we need this x509 proxy I'm fairly sure my local jobs aren't using it client = GraceDb(cred=x509userproxy(outdir), service_url=gracedb_url) except IOError: logger.warning("Failed to connect to GraceDB") raise logger.info(f"Querying event {gracedb}") event = client.event(gracedb).json() with open(f"{outdir}/{gracedb}.json", "w") as ff: json.dump(event, ff, indent=2) logger.info(f"Requesting coinc.xml for {gracedb}") try: data = client.files(gracedb, filename="coinc.xml") coinc_filename = f"{outdir}/coinc.xml" with open(coinc_filename, "wb") as ff: ff.write(data.data) event["coinc_file"] = coinc_filename except HTTPError: logger.warning( "Failed to download coinc.xml. PSDs will be generated by bilby_pipe." ) event["coinc_file"] = None return event
[docs] def extract_psds_from_xml(coinc_file, ifos, outdir="."): from gwpy.frequencyseries import FrequencySeries psd_filenames = dict() psd_maximum_frequency = None for ifo in ifos: try: psd = FrequencySeries.read(coinc_file, instrument=ifo) if psd_maximum_frequency is None: psd_maximum_frequency = psd.frequencies.value[-1] else: psd_maximum_frequency = min( psd_maximum_frequency, psd.frequencies.value[-1], ) filename = f"{outdir}/{ifo}_psd.txt" psd.write(target=filename, format="txt") psd_filenames[ifo] = filename except ValueError: logger.warning(f"PSD for {ifo} not found in {coinc_file}.") return psd_filenames, psd_maximum_frequency
[docs] def read_from_json(json_file): """Read GraceDB events from json file Parameters ---------- json_file: str Filename of json json file output Returns ------- candidate: dict Contains contents of GraceDB event from json, json format """ if os.path.isfile(json_file) is False: raise FileNotFoundError(f"File {json_file} not found") try: with open(json_file, "r") as file: candidate = json.load(file) except IOError: logger.warning("Unable to load event contents of json file") return candidate
[docs] def calibration_lookup(trigger_time, detector): """Lookup function for the relevant calibration file Assumes that it is running on CIT where the calibration files are stored under /home/cbc/pe/O3/calibrationenvelopes Parameters ---------- trigger_time: float The trigger time of interest detector: str [H1, L1, V1] Detector string Returns ------- filepath: str The path to the relevant calibration envelope file. If no calibration file can be determined, None is returned. """ # FIXME: point to updated calibration envelopes base = "/home/cbc/pe/O3/calibrationenvelopes" CALENVS_LOOKUP = dict( H1=os.path.join(base, "LIGO_Hanford/H_CalEnvs.txt"), L1=os.path.join(base, "LIGO_Livingston/L_CalEnvs.txt"), V1=os.path.join(base, "Virgo/V_CalEnvs.txt"), ) if os.path.isdir(base) is False: raise BilbyPipeError(f"Unable to read from calibration folder {base}") calenv = CALENVS_LOOKUP[detector] times = list() files = dict() with open(calenv, "r") as f: for line in f: time, filename = line.rstrip("\n").rstrip().split(" ") times.append(float(time)) files[float(time)] = filename times = sorted(times) if trigger_time < times[0]: raise BilbyPipeError( "Requested trigger time prior to earliest calibration file" ) for time in times: if trigger_time > time: directory = os.path.dirname(calenv) calib_file = f"{directory}/{files[time]}" return os.path.abspath(calib_file)
[docs] def calibration_dict_lookup(trigger_time, detectors): """Dictionary lookup function for the relevant calibration files Parameters ---------- trigger_time: float The trigger time of interest detectors: list List of detector string Returns ------- calibration_model, calibration_dict: str, dict Calibration model string and dictionary of paths to the relevant calibration envelope file. """ try: calibration_dict = { det: calibration_lookup(trigger_time, det) for det in detectors } return "CubicSpline", calibration_dict except BilbyPipeError: return None, None
[docs] def read_candidate(candidate): """Read a gracedb candidate json dictionary""" if "extra_attributes" not in candidate: raise BilbyPipeError( "Cannot parse event dictionary, not 'extra_attributes' present." ) elif "CoincInspiral" in candidate["extra_attributes"]: return _read_cbc_candidate(candidate) elif "MultiBurst" in candidate["extra_attributes"]: return _read_burst_candidate(candidate)
[docs] def _read_cbc_candidate(candidate): if "mchirp" not in candidate["extra_attributes"]["CoincInspiral"]: raise BilbyPipeError( f"Unable to determine chirp mass for {candidate['graceid']} from GraceDB" ) sngl = candidate["extra_attributes"]["SingleInspiral"][0] trigger_values = {} trigger_values["chirp_mass"] = sngl["mchirp"] trigger_values["mass_ratio"] = min(sngl["mass1"], sngl["mass2"]) / max( sngl["mass1"], sngl["mass2"] ) trigger_values["spin_1z"] = sngl["spin1z"] trigger_values["spin_2z"] = sngl["spin2z"] superevent = candidate["superevent"] ifos = candidate["extra_attributes"]["CoincInspiral"]["ifos"].split(",") sngl = candidate["extra_attributes"]["SingleInspiral"] ifos = [entry["ifo"] for entry in sngl] snrs = [entry["snr"] for entry in sngl] # NOTE: the channel name here varies by pipeline and isn't always available # channels = {entry["ifo"]: entry["channel"][3:] for entry in sngl} best_to_worst = np.argsort(snrs)[::-1] best_event = sngl[best_to_worst[0]] trigger_time = best_event["end_time"] + best_event["end_time_ns"] / 1e9 sorted_ifos = [ifos[idx] for idx in best_to_worst] time_reference = sorted_ifos[0] if len(sorted_ifos) > 1: reference_frame = "".join(sorted_ifos[:2]) else: reference_frame = "sky" return ( trigger_values, superevent, trigger_time, ifos, reference_frame, time_reference, )
[docs] def _read_burst_candidate(candidate): central_frequency = candidate["extra_attributes"]["MultiBurst"]["central_freq"] superevent = candidate["superevent"] trigger_time = candidate["gpstime"] ifos = candidate["extra_attributes"]["MultiBurst"]["ifos"].split() return central_frequency, superevent, trigger_time, ifos
[docs] def _get_cbc_likelihood_args(mode, trigger_values): """Return cbc likelihood arguments and quantities characterizing likelihood Parameters ---------- mode: str chirp_mass: float Returns ------- likelihood_args: dict likelihood_parameter_bounds: dict bounds of parameter space where likelihood is expected to be accurate minimum_frequency: float minimum frequency of likelihood integration maximum_frequency: float maximum frequency of likelihood integration duration: float inverse of frequency interval of likelihood integration """ if mode in ["phenompv2_bbh_roq"]: return _choose_phenompv2_bbh_roq(trigger_values["chirp_mass"]) elif mode in [ "lowspin_phenomd_narrowmc_roq", "lowspin_phenomd_broadmc_roq", "lowspin_phenomd_fhigh1024_roq", "lowspin_taylorf2_roq", "phenompv2_bns_roq", "phenompv2nrtidalv2_roq", ]: return _choose_bns_roq(trigger_values["chirp_mass"], mode) elif mode in ["low_q_phenompv2_roq"]: return _choose_low_q_pv2_roq(trigger_values["chirp_mass"]) elif mode in ["phenomxphm_roq"]: return _choose_xphm_roq(trigger_values["chirp_mass"]) elif mode in ["test"]: return _get_default_likelihood_args(trigger_values) else: return _get_cbc_likelihood_args_from_json(mode, trigger_values)
[docs] def _choose_phenompv2_bbh_roq(chirp_mass, ignore_no_params=False): """Choose an appropriate PhenomPv2 ROQ folder, and return likelihood arguments and quantities characterizing likelihood. The bases were developed in the work of arXiv:1604.08253. For a high-mass trigger with chirp mass above 35 solar mass, this returns arguments with the standard likelihood `GravitationalWaveTransient`, as the analysis is computationally cheap anyway. Parameters ---------- chirp_mass: float ignore_no_params: bool If True, this ignores FileNotFoundError raised when roq params file is not found, which is useful for testing this command outside the CIT cluster. Returns ------- likelihood_args: dict likelihood_parameter_bounds: dict bounds of parameter space where likelihood is expected to be accurate minimum_frequency: float minimum frequency of likelihood integration maximum_frequency: float maximum frequency of likelihood integration duration: float inverse of frequency interval of likelihood integration """ likelihood_args = { "waveform_approximant": "IMRPhenomPv2", } likelihood_parameter_bounds = {"spin_template": "precessing"} if chirp_mass > 35: likelihood_args["likelihood_type"] = "GravitationalWaveTransient" likelihood_args["time_marginalization"] = True likelihood_args["jitter_time"] = True likelihood_parameter_bounds["chirp_mass_min"] = 25 likelihood_parameter_bounds["chirp_mass_max"] = 200 likelihood_parameter_bounds["mass_ratio_min"] = 0.125 likelihood_parameter_bounds["a_1_max"] = 0.99 likelihood_parameter_bounds["a_2_max"] = 0.99 minimum_frequency = 20 maximum_frequency = 1024 duration = 4 else: likelihood_args["likelihood_type"] = "ROQGravitationalWaveTransient" roq_scale_factor = 1 duration = _get_default_duration(chirp_mass) likelihood_args["roq_folder"] = f"/home/cbc/ROQ_data/IMRPhenomPv2/{duration}s" if chirp_mass < 0.9: roq_scale_factor = 2 elif chirp_mass < 1.43: roq_scale_factor = 1.6 roq_params_file = os.path.join(likelihood_args["roq_folder"], "params.dat") if os.path.exists(roq_params_file): roq_params = np.genfromtxt(roq_params_file, names=True) elif ignore_no_params: roq_params = { "chirpmassmin": roq_scale_factor * chirp_mass * 0.9, "chirpmassmax": roq_scale_factor * chirp_mass * 1.1, "qmax": 8, "compmin": 0, "chiL1min": -0.99, "chiL1max": 0.99, "chiL2min": -0.99, "chiL2max": 0.99, "flow": 20, "fhigh": 1024, "seglen": 4, } logger.warning( f"{roq_params_file} not found. ROQ parameters are set to {roq_params}." ) else: raise FileNotFoundError(f"{roq_params_file} not found.") likelihood_args["roq_scale_factor"] = roq_scale_factor likelihood_parameter_bounds["chirp_mass_min"] = ( roq_params["chirpmassmin"] / roq_scale_factor ) likelihood_parameter_bounds["chirp_mass_max"] = ( roq_params["chirpmassmax"] / roq_scale_factor ) likelihood_parameter_bounds["mass_ratio_min"] = 1 / roq_params["qmax"] likelihood_parameter_bounds["comp_min"] = ( roq_params["compmin"] / roq_scale_factor ) likelihood_parameter_bounds["a_1_max"] = roq_params["chiL1max"] likelihood_parameter_bounds["a_2_max"] = roq_params["chiL2max"] minimum_frequency = roq_params["flow"] * roq_scale_factor maximum_frequency = roq_params["fhigh"] * roq_scale_factor duration = roq_params["seglen"] / roq_scale_factor return ( likelihood_args, likelihood_parameter_bounds, minimum_frequency, maximum_frequency, duration, )
[docs] def _choose_bns_roq(chirp_mass, mode): """Choose an appropriate BNS-mass ROQ basis file, and return likelihood arguments and quantities characterizing likelihood. The review information of those bases are found at https://git.ligo.org/pe/O4/review_bns_roq/-/wikis. Parameters ---------- chirp_mass: float mode: str Allowed options are "lowspin_phenomd_narrowmc_roq", "lowspin_phenomd_broadmc_roq", "phenompv2_bns_roq", and "phenompv2nrtidalv2_roq". Returns ------- likelihood_args: dict likelihood_parameter_bounds: dict bounds of parameter space where likelihood is expected to be accurate minimum_frequency: float minimum frequency of likelihood integration maximum_frequency: float maximum frequency of likelihood integration duration: float inverse of frequency interval of likelihood integration """ # When phase marginalization is used, psi + np.pi / 2 is indistingushable # from psi. That is why decreasing its maximum to pi / 2 does not change # the inference results at all. likelihood_parameter_bounds = {"mass_ratio_min": 0.125, "psi_max": np.pi / 2} if mode == "lowspin_phenomd_narrowmc_roq": waveform_approximant = "IMRPhenomD" roq_dir = "/home/roq/IMRPhenomD/lowspin_narrowmc_bns" likelihood_parameter_bounds["a_1_max"] = 0.05 likelihood_parameter_bounds["a_2_max"] = 0.05 likelihood_parameter_bounds["spin_template"] = "aligned" elif mode == "lowspin_phenomd_broadmc_roq": waveform_approximant = "IMRPhenomD" roq_dir = "/home/roq/IMRPhenomD/lowspin_broadmc_bns" likelihood_parameter_bounds["a_1_max"] = 0.05 likelihood_parameter_bounds["a_2_max"] = 0.05 likelihood_parameter_bounds["spin_template"] = "aligned" elif mode == "lowspin_phenomd_fhigh1024_roq": waveform_approximant = "IMRPhenomD" roq_dir = "/home/roq/IMRPhenomD/lowspin_fhigh1024" likelihood_parameter_bounds["a_1_max"] = 0.05 likelihood_parameter_bounds["a_2_max"] = 0.05 likelihood_parameter_bounds["spin_template"] = "aligned" elif mode == "lowspin_taylorf2_roq": waveform_approximant = "TaylorF2" roq_dir = "/home/roq/TaylorF2/lowspin_narrowmc_bns" likelihood_parameter_bounds["a_1_max"] = 0.05 likelihood_parameter_bounds["a_2_max"] = 0.05 likelihood_parameter_bounds["spin_template"] = "aligned" elif mode == "phenompv2_bns_roq": waveform_approximant = "IMRPhenomPv2" roq_dir = "/home/roq/IMRPhenomPv2/bns" likelihood_parameter_bounds["a_1_max"] = 0.99 likelihood_parameter_bounds["a_2_max"] = 0.99 likelihood_parameter_bounds["spin_template"] = "precessing" elif mode == "phenompv2nrtidalv2_roq": waveform_approximant = "IMRPhenomPv2_NRTidalv2" roq_dir = "/home/roq/IMRPhenomPv2_NRTidalv2/bns" likelihood_parameter_bounds["a_1_max"] = 0.4 likelihood_parameter_bounds["a_2_max"] = 0.4 likelihood_parameter_bounds["spin_template"] = "precessing" likelihood_parameter_bounds["lambda_1_max"] = 5000 likelihood_parameter_bounds["lambda_2_max"] = 5000 logger.info(f"Searching for a basis file in {roq_dir} ...") # The chirp mass boundaries are chosen so that this passes priors.validate_prior. if 4.0 > chirp_mass > 2.35: basis = os.path.join(roq_dir, "basis_64s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 2.1 likelihood_parameter_bounds["chirp_mass_max"] = 4.0 maximum_frequency = 2048 duration = 64 elif chirp_mass > 1.54: basis = os.path.join(roq_dir, "basis_128s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 1.4 likelihood_parameter_bounds["chirp_mass_max"] = 2.6 maximum_frequency = 4096 duration = 128 elif chirp_mass > 1.012: basis = os.path.join(roq_dir, "basis_256s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 0.92 likelihood_parameter_bounds["chirp_mass_max"] = 1.7 maximum_frequency = 4096 duration = 256 elif chirp_mass > 0.66: basis = os.path.join(roq_dir, "basis_512s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 0.6 likelihood_parameter_bounds["chirp_mass_max"] = 1.1 maximum_frequency = 4096 duration = 512 else: raise ValueError( f"No BNS-mass {waveform_approximant} basis has been found for " f"chirp_mass={chirp_mass}!" ) if mode == "lowspin_phenomd_fhigh1024_roq" or mode == "lowspin_taylorf2_roq": maximum_frequency = 1024 logger.info(f"The selected ROQ basis file is {basis}.") return ( { "likelihood_type": "ROQGravitationalWaveTransient", "roq_linear_matrix": basis, "roq_quadratic_matrix": basis, "roq_scale_factor": 1, "waveform_approximant": waveform_approximant, "enforce_signal_duration": False, }, likelihood_parameter_bounds, 20, maximum_frequency, duration, )
[docs] def _choose_low_q_pv2_roq(chirp_mass): """Choose an appropriate low-mass-ratio IMRPhenomPv2 ROQ basis file and return likelihood arguments and quantities characterizing likelihood. Parameters ---------- chirp_mass: float Returns ------- likelihood_args: dict likelihood_parameter_bounds: dict bounds of parameter space where likelihood is expected to be accurate minimum_frequency: float minimum frequency of likelihood integration maximum_frequency: float maximum frequency of likelihood integration duration: float inverse of frequency interval of likelihood integration """ likelihood_parameter_bounds = { "mass_ratio_min": 0.05, "a_1_max": 0.99, "a_2_max": 0.99, "spin_template": "precessing", "psi_max": np.pi / 2, } roq_dir = "/home/roq/IMRPhenomPv2/low_mass_ratio" if 21.0 > chirp_mass > 9.57: basis = os.path.join(roq_dir, "basis_8s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 8.71 likelihood_parameter_bounds["chirp_mass_max"] = 20.99 duration = 8 elif chirp_mass > 5.72: basis = os.path.join(roq_dir, "basis_16s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 5.21 likelihood_parameter_bounds["chirp_mass_max"] = 10.99 duration = 16 elif chirp_mass > 3.63: basis = os.path.join(roq_dir, "basis_32s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 3.31 likelihood_parameter_bounds["chirp_mass_max"] = 6.29 duration = 32 elif chirp_mass > 2.35: basis = os.path.join(roq_dir, "basis_64s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 2.101 likelihood_parameter_bounds["chirp_mass_max"] = 3.999 duration = 64 elif chirp_mass > 1.5: basis = os.path.join(roq_dir, "basis_128s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 1.401 likelihood_parameter_bounds["chirp_mass_max"] = 2.599 duration = 128 else: raise ValueError( f"No low-mass-ratio IMRPhenomPv2 basis has been found for " f"chirp_mass={chirp_mass}!" ) logger.info(f"The selected ROQ basis file is {basis}.") return ( { "likelihood_type": "ROQGravitationalWaveTransient", "roq_linear_matrix": basis, "roq_quadratic_matrix": basis, "roq_scale_factor": 1, "waveform_approximant": "IMRPhenomPv2", "enforce_signal_duration": False, }, likelihood_parameter_bounds, 20, 1024, duration, )
[docs] def _choose_xphm_roq(chirp_mass): """Choose an appropriate IMRPhenomXPHM ROQ basis file and return likelihood arguments and quantities characterizing likelihood. Parameters ---------- chirp_mass: float Returns ------- likelihood_args: dict likelihood_parameter_bounds: dict bounds of parameter space where likelihood is expected to be accurate minimum_frequency: float minimum frequency of likelihood integration maximum_frequency: float maximum frequency of likelihood integration duration: float inverse of frequency interval of likelihood integration """ likelihood_parameter_bounds = { "mass_ratio_min": 0.05, "a_1_max": 0.99, "a_2_max": 0.99, "spin_template": "precessing", } roq_dir = "/home/roq/IMRPhenomXPHM" if 200.0 > chirp_mass > 45: basis = os.path.join(roq_dir, "basis_4s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 29.9 likelihood_parameter_bounds["chirp_mass_max"] = 199.9 duration = 4 elif chirp_mass > 25: basis = os.path.join(roq_dir, "basis_8s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 18.8 likelihood_parameter_bounds["chirp_mass_max"] = 62.8 duration = 8 elif chirp_mass > 16: basis = os.path.join(roq_dir, "basis_16s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 12.8 likelihood_parameter_bounds["chirp_mass_max"] = 31.8 duration = 16 elif chirp_mass > 10.03: basis = os.path.join(roq_dir, "basis_32s.hdf5") likelihood_parameter_bounds["chirp_mass_min"] = 10.03 likelihood_parameter_bounds["chirp_mass_max"] = 19.04 duration = 32 else: raise ValueError( f"No IMRPhenomXPHM basis has been found for chirp_mass={chirp_mass}!" ) logger.info(f"The selected ROQ basis file is {basis}.") return ( { "likelihood_type": "ROQGravitationalWaveTransient", "roq_linear_matrix": basis, "roq_quadratic_matrix": basis, "roq_scale_factor": 1, "waveform_approximant": "IMRPhenomXPHM", "reference_frequency": 20, "waveform_arguments_dict": {"PhenomXHMReleaseVersion": 122019}, "phase_marginalization": False, "enforce_signal_duration": False, }, likelihood_parameter_bounds, 20, 4096, duration, )
[docs] def _get_cbc_likelihood_args_from_json(filename, trigger_values): """Load input JSON file containing likelihood settings and determine appropriate likelihood arguments and parameter bounds depending on input trigger values. The json file is supposed to contain `likelihood_args`, `likelihood_parameter_bounds`, and/or `trigger_dependent`. The first two contain default arguments and parameter bounds respectively. The last item contains trigger-dependent settings to update the default settings. It contains `range`, `likelihood_args`, and/or `likelihood_parameter_bounds`. `range` contains dictionary of trigger-parameter ranges, whose keys are parameter names (`chirp_mass`, `mass_ratio`, `spin_1z`, and/or `spin_2z`) and values are lists of their ranges. `likelihood_args` contains lists of arguments, one of which is chosen depending on trigger values and used to update the default likelihood arguments. `likelihood_parameter_bounds` contains lists of parameter bounds to update their default. Parameters ---------- filename: str trigger_values: dict Returns ------- likelihood_args: dict likelihood_parameter_bounds: dict bounds of parameter space where likelihood is expected to be accurate minimum_frequency: float minimum frequency of likelihood integration maximum_frequency: float maximum frequency of likelihood integration duration: float inverse of frequency interval of likelihood integration Example ------- >>> import json >>> from bilby_pipe.gracedb import _get_cbc_likelihood_args_from_json >>> settings = { ... "likelihood_args": { ... "likelihood_type": "ROQGravitationalWaveTransient", ... "minimum_frequency": 20, ... "maximum_frequency": 2048, ... "duration": 4, ... }, ... "trigger_dependent": { ... "range": {"chirp_mass": [[30, 40], [40, 50]]}, ... "likelihood_args": [ ... {"roq_folder": "basis_for_30Msun_to_40Msun"}, ... {"roq_folder": "basis_for_40Msun_to_50Msun"}, ... ] ... }, ... } >>> with open("test.json", "r") as f: ... json.dump(settings, f) >>> _get_cbc_likelihood_args_from_json("test.json", {"chirp_mass": 35}) 09:04 bilby_pipe INFO : Loading likelihood settings from test.json ... ({'likelihood_type': 'ROQGravitationalWaveTransient', 'roq_folder': 'basis_for_30Msun_to_40Msun'}, {}, 20, 2048, 4) >>> _get_cbc_likelihood_args_from_json("test.json", {"chirp_mass": 45}) 09:04 bilby_pipe INFO : Loading likelihood settings from test.json ... ({'likelihood_type': 'ROQGravitationalWaveTransient', 'roq_folder': 'basis_for_40Msun_to_50Msun'}, {}, 20, 2048, 4) """ import json logger.info(f"Loading likelihood settings from {filename} ...") with open(filename, "r") as ff: settings = json.load(ff) likelihood_args = dict() if "likelihood_args" in settings: likelihood_args.update(settings["likelihood_args"]) likelihood_parameter_bounds = dict() if "likelihood_parameter_bounds" in settings: likelihood_parameter_bounds.update(settings["likelihood_parameter_bounds"]) if "trigger_dependent" in settings: trigger_range_dict = settings["trigger_dependent"]["range"] number_of_ranges = len(list(trigger_range_dict.values())[0]) in_range = np.ones(number_of_ranges, dtype=bool) for key in trigger_range_dict: trigger = trigger_values[key] trigger_range = np.array(trigger_range_dict[key]) in_range *= trigger >= trigger_range[:, 0] in_range *= trigger <= trigger_range[:, 1] if not any(in_range): raise ValueError( "No likelihood settings found for the trigger values: " f"{trigger_values}!" ) selected_idx = np.arange(number_of_ranges)[in_range][0] trigger_dependent_settings = settings["trigger_dependent"] if "likelihood_args" in trigger_dependent_settings: likelihood_args.update( trigger_dependent_settings["likelihood_args"][selected_idx] ) if "likelihood_parameter_bounds" in trigger_dependent_settings: likelihood_parameter_bounds.update( trigger_dependent_settings["likelihood_parameter_bounds"][selected_idx] ) minimum_frequency = likelihood_args.pop("minimum_frequency") maximum_frequency = likelihood_args.pop("maximum_frequency") duration = likelihood_args.pop("duration") return ( likelihood_args, likelihood_parameter_bounds, minimum_frequency, maximum_frequency, duration, )
[docs] def _get_default_likelihood_args(trigger_values): logger.info("Using default likelihood settings, these may not be optimal.") bounds = dict( chirp_mass_min=trigger_values["chirp_mass"] / 2, chirp_mass_max=trigger_values["chirp_mass"] * 2, spin_template="precessing", a_1_max=0.99, a_2_max=0.99, mass_ratio_min=0.125, ) duration = _get_default_duration(trigger_values["chirp_mass"]) minimum_frequency = 20 maximum_frequency = 1024 return dict(), bounds, minimum_frequency, maximum_frequency, duration
[docs] def attempt_gwpy_get(channel, start_time, end_time, n_attempts=5): for _ in range(n_attempts): try: data = TimeSeries.get(channel=channel, start=start_time, end=end_time) return data except RuntimeError: time.sleep(45) return False
[docs] def copy_and_save_data( ifos, start_time, end_time, channel_dict, outdir, gracedbid, query_kafka=True, n_attempts=5, replay=False, ): """Attempt to read the strain data from internal servers and save frame files to the run directory. If `query_kafka` is True, then attempt to fetch the data from `/dev/shm/kafka/` (preferred method for low-latency/online analyses). If `query_kafka` is False or the data cannot be found in `/dev/shm/kafka/`, this function will attempt to get the data from TimeSeries.get(), called in a loop to allow for multiple attempts. If data cannot be found for all the ifos, returns None, and data reading will be attempted by the bilby_pipe data_generation stage. Parameters ---------- ifos: list List of ifos for this trigger start_time: float Safe start time for data segment end_time: float Safe end time for data segment channel_dict: dict Dictionary of channel names outdir: str Directory to save frame files gracedbid: str GraceDB id of event query_kafka: bool Whether to attempt to copy frame files from `/dev/shm/kafka/` n_attempts: int Number of attempts to call TimeSeries.get() before failing to obtain data replay: bool Whether to try to fetch O3ReplayMDC data from the kafka directory. Only relevant if query_kafka = True. Returns ------- data_dict: dict Dictionary with {ifo: path_to_copied_frame_file}. None, if data were not able to be obtained for all ifos. """ ifo_data = dict() for ifo in ifos: channel = f"{ifo}:{channel_dict[ifo]}" if query_kafka: try: logger.info(f"Querying kafka directory for {ifo} data") data = read_and_concat_data_from_kafka( ifo, int(start_time), int(end_time), channel=channel, replay=replay ) except FileNotFoundError: logger.info( f"Failed to obtain {ifo} data from kafka directory. Calling TimeSeries.get" ) data = attempt_gwpy_get( channel=channel, start_time=start_time, end_time=end_time, n_attempts=n_attempts, ) else: data = attempt_gwpy_get( channel=channel, start_time=start_time, end_time=end_time, n_attempts=n_attempts, ) if data is False: logger.info(f"Could not find data for {ifo} with channel name {channel}") break else: ifo_data[ifo] = data if all(ifo in ifo_data.keys() for ifo in ifos): data_paths = dict() for ifo in ifo_data.keys(): actual_start = ifo_data[ifo].times[0].value actual_duration = ifo_data[ifo].duration.value if int(actual_duration) == actual_duration: actual_duration = int(actual_duration) datapath = os.path.join( outdir, f"{ifo[0]}-{ifo}_{gracedbid}_llhoft-{actual_start}-{actual_duration}.gwf", ) ifo_data[ifo].write(datapath) data_paths[ifo] = datapath logger.info(f"Written {ifo} data to {datapath}") data_dict = data_paths else: logger.info( "Not getting data in pre-generation step. Will get data in data generation stage." ) data_dict = None return data_dict
[docs] def prepare_run_configurations( candidate, gracedb, outdir, channel_dict, sampler_kwargs, webdir, search_type="cbc", cbc_likelihood_mode="phenompv2_bbh_roq", settings=None, psd_cut=0.95, query_kafka=True, replay=False, ): """Creates ini file from defaults and candidate contents Parameters ---------- candidate: Contains contents of GraceDB event gracedb: str GraceDB id of event outdir: str Output directory where the ini file and all output is written channel_dict: dict Dictionary of channel names sampler_kwargs: str Set of sampler arguments, or option for set of sampler arguments webdir: str Directory to store summary pages search_type: str What kind of search identified the trigger, options are "cbc" and "burst" cbc_likelihood_mode: str Built-in CBC likelihood mode or path to a JSON file containing likelihood settings. The built-in settings include 'phenompv2_bbh_roq', 'phenompv2_bns_roq', 'phenompv2nrtidalv2_roq', 'lowspin_phenomd_narrowmc_roq', 'lowspin_phenomd_broadmc_roq', and 'test'. settings: str JSON filename containing settings to override the defaults psd_cut: float Fractional maximum frequency cutoff relative to the maximum frequency of pipeline psd query_kafka: bool Whether to first attempt to query the kafka directory for data before attempting a call to gwpy TimeSeries.get() replay: bool Whether to try to fetch O3ReplayMDC data from the kafka directory. Only relevant if query_kafka = True. Returns ------- filename: str Generated ini filename """ if settings is not None: import json with open(settings, "r") as ff: settings = json.load(ff) else: settings = dict() if search_type == "cbc": ( trigger_values, superevent, trigger_time, ifos, reference_frame, time_reference, ) = _read_cbc_candidate(candidate) ( likelihood_args, likelihood_parameter_bounds, minimum_frequency, maximum_frequency, duration, ) = _get_cbc_likelihood_args(cbc_likelihood_mode, trigger_values) ( prior_file, distance_marginalization_lookup_table, ) = generate_cbc_prior_from_template( trigger_values["chirp_mass"], likelihood_parameter_bounds, outdir, fast_test=(sampler_kwargs == "FastTest"), phase_marginalization=likelihood_args.get("phase_marginalization", True), ) calibration_model, calib_dict = calibration_dict_lookup(trigger_time, ifos) if calibration_model is None: spline_calibration_nodes = 0 elif sampler_kwargs == "FastTest": spline_calibration_nodes = 4 else: spline_calibration_nodes = 10 extra_config_arguments = dict( reference_frequency=100, time_marginalization=False, distance_marginalization=True, phase_marginalization=True, distance_marginalization_lookup_table=distance_marginalization_lookup_table, plot_trace=True, plot_data=True, calibration_model=calibration_model, spline_calibration_envelope_dict=calib_dict, spline_calibration_nodes=spline_calibration_nodes, ) extra_config_arguments.update(likelihood_args) if ( "lambda_1_max" in likelihood_parameter_bounds or "lambda_2_max" in likelihood_parameter_bounds ): extra_config_arguments["default_prior"] = "BNSPriorDict" extra_config_arguments[ "frequency_domain_source_model" ] = "binary_neutron_star_roq" elif search_type == "burst": centre_frequency, superevent, trigger_time, ifos = _read_burst_candidate( candidate ) minimum_frequency = min(20, centre_frequency / 2) maximum_frequency = next_power_of_2(centre_frequency * 2) duration = 4 extra_config_arguments = dict( frequency_domain_source_model="bilby.gw.source.sinegaussian", default_prior="PriorDict", time_marginalization=False, phase_marginalization=False, sampler_kwargs="FastTest", ) prior_file = generate_burst_prior_from_template( minimum_frequency=minimum_frequency, maximum_frequency=maximum_frequency, outdir=outdir, ) else: raise BilbyPipeError( f"search_type should be either 'cbc' or 'burst', not {search_type}" ) start_data, end_data = (trigger_time - duration - 2, trigger_time + 4) data_dict = copy_and_save_data( ifos=ifos, start_time=start_data, end_time=end_data, channel_dict=channel_dict, outdir=outdir, gracedbid=gracedb, query_kafka=query_kafka, replay=replay, ) config_dict = dict( label=gracedb, outdir=outdir, accounting="ligo.dev.o4.cbc.pe.bilby", maximum_frequency=maximum_frequency, minimum_frequency=minimum_frequency, sampling_frequency=16384, trigger_time=trigger_time, detectors=ifos, channel_dict=channel_dict, deltaT=0.2, prior_file=prior_file, duration=duration, sampler="dynesty", sampler_kwargs=sampler_kwargs, webdir=webdir, local_generation=False, local_plot=False, transfer_files=False, create_summary=False, summarypages_arguments={"gracedb": gracedb}, plot_trace=True, plot_data=True, plot_calibration=False, plot_corner=False, plot_marginal=False, plot_skymap=False, plot_waveform=False, overwrite_outdir=True, result_format="hdf5", reference_frame=reference_frame, time_reference=time_reference, data_dict=data_dict, ) if sampler_kwargs == "FastTest": config_dict["n_parallel"] = 2 else: config_dict["n_parallel"] = 4 if candidate.get("coinc_file", None) is not None: psd_dict, psd_maximum_frequency = extract_psds_from_xml( coinc_file=candidate["coinc_file"], ifos=ifos, outdir=outdir ) config_dict["psd_dict"] = psd_dict if psd_maximum_frequency is not None: psd_maximum_frequency *= min(psd_cut, 1) if config_dict["maximum_frequency"] > psd_maximum_frequency: config_dict["maximum_frequency"] = psd_maximum_frequency logger.info( f"maximum_frequency is reduced to {psd_maximum_frequency} " "due to the limination of pipeline psd" ) config_dict.update(extra_config_arguments) config_dict["summarypages_arguments"]["nsamples_for_skymap"] = 5000 config_dict.update(settings) comment = ( "# Configuration ini file generated from GraceDB " f"for event id {gracedb} superevent id {superevent}" ) filename = f"{outdir}/bilby_config.ini" _parser = parser.create_parser() _parser.write_to_file( filename=filename, args=config_dict, overwrite=True, include_description=False, exclude_default=True, comment=comment, ) return filename
[docs] def create_config_file( candidate, gracedb, outdir, channel_dict, sampler_kwargs, webdir, search_type="cbc", cbc_likelihood_mode="phenompv2_bbh_roq", settings=None, psd_cut=0.95, query_kafka=True, replay=False, ): logger.warning( "create_config_file is deprecated and will be removed in a future version." "Calling prepare_run_configurations instead." ) return prepare_run_configurations( candidate=candidate, gracedb=gracedb, outdir=outdir, channel_dict=channel_dict, sampler_kwargs=sampler_kwargs, webdir=webdir, search_type=search_type, cbc_likelihood_mode=cbc_likelihood_mode, settings=settings, psd_cut=psd_cut, query_kafka=query_kafka, replay=replay, )
[docs] def _get_default_duration(chirp_mass): """Return default duration based on chirp mass Parameters ---------- chirp_mass: float Returns ------- duration: float """ if chirp_mass > 13.53: duration = 4 elif chirp_mass > 8.73: duration = 8 elif chirp_mass > 5.66: duration = 16 elif chirp_mass > 3.68: duration = 32 elif chirp_mass > 2.39: duration = 64 else: duration = 128 return duration
[docs] def _get_distance_lookup(chirp_mass, phase_marginalization=True): """Return appropriate distance bounds and lookup table Parameters ---------- chirp_mass: float phase_marginalization: bool (optional, default is True) Returns ------- distance_bounds: tuple lookup_table: str """ duration = _get_default_duration(chirp_mass) distance_bounds = DEFAULT_DISTANCE_LOOKUPS[f"{duration}s"] if phase_marginalization: filename = f"{duration}s_distance_marginalization_lookup_phase.npz" else: filename = f"{duration}s_distance_marginalization_lookup.npz" lookup_table = os.path.join( os.path.dirname(os.path.realpath(__file__)), "data_files", filename, ) return distance_bounds, lookup_table
[docs] def generate_cbc_prior_from_template( chirp_mass, likelihood_parameter_bounds, outdir, fast_test=False, phase_marginalization=True, ): """Generate a cbc prior file from a template and write it to file. This returns the paths to the prior file and the corresponding distance look-up table Parameters ---------- chirp_mass: float likelihood_parameter_bounds: dict outdir: str fast_test: bool (optional, default is False) phase_marginalization: bool (optional, default is True) Returns ------- prior_file: str lookup_table: str """ if chirp_mass < 2: bounds = (chirp_mass - 0.01, chirp_mass + 0.01) elif chirp_mass < 4: bounds = (chirp_mass - 0.1, chirp_mass + 0.1) elif chirp_mass < 8: bounds = (chirp_mass * 0.9, chirp_mass * 1.1) else: bounds = (0, 1000000) chirp_mass_min = max(likelihood_parameter_bounds["chirp_mass_min"], bounds[0]) chirp_mass_max = min(likelihood_parameter_bounds["chirp_mass_max"], bounds[1]) to_add = "" if "comp_min" in likelihood_parameter_bounds: comp_min = likelihood_parameter_bounds["comp_min"] to_add += ( f"mass_1 = Constraint(name='mass_1', minimum={comp_min}, maximum=1000)\n" ) to_add += ( f"mass_2 = Constraint(name='mass_2', minimum={comp_min}, maximum=1000)\n" ) if "lambda_1_max" in likelihood_parameter_bounds: lambda_1_max = likelihood_parameter_bounds["lambda_1_max"] to_add += ( f"lambda_1 = Uniform(name='lambda_1', minimum=0, maximum={lambda_1_max})\n" ) if "lambda_2_max" in likelihood_parameter_bounds: lambda_2_max = likelihood_parameter_bounds["lambda_2_max"] to_add += ( f"lambda_2 = Uniform(name='lambda_2', minimum=0, maximum={lambda_2_max})\n" ) distance_bounds, lookup_table = _get_distance_lookup( chirp_mass, phase_marginalization=phase_marginalization ) if fast_test: template = os.path.join( os.path.dirname(os.path.realpath(__file__)), "data_files/fast.prior.template", ) with open(template, "r") as old_prior: prior_string = ( old_prior.read().format( mc_min=chirp_mass_min, mc_max=chirp_mass_max, d_min=distance_bounds[0], d_max=distance_bounds[1], ) + to_add ) else: spin_template = likelihood_parameter_bounds["spin_template"] a_1_max = likelihood_parameter_bounds["a_1_max"] a_2_max = likelihood_parameter_bounds["a_2_max"] if spin_template == "precessing": template = os.path.join( os.path.dirname(os.path.realpath(__file__)), "data_files/precessing_spin.prior.template", ) elif spin_template == "aligned": template = os.path.join( os.path.dirname(os.path.realpath(__file__)), "data_files/aligned_spin.prior.template", ) else: raise ValueError(f"Unknown spin template: {spin_template}") with open(template, "r") as old_prior: prior_string = ( old_prior.read().format( mc_min=chirp_mass_min, mc_max=chirp_mass_max, q_min=likelihood_parameter_bounds["mass_ratio_min"], a_1_max=a_1_max, a_2_max=a_2_max, d_min=distance_bounds[0], d_max=distance_bounds[1], psi_max=likelihood_parameter_bounds.get("psi_max", np.pi), ) + to_add ) prior_file = os.path.join(outdir, "online.prior") with open(prior_file, "w") as new_prior: new_prior.write(prior_string) return prior_file, lookup_table
[docs] def generate_burst_prior_from_template( minimum_frequency, maximum_frequency, outdir, template=None ): """Generate a prior file from a template and write it to file Parameters ---------- minimum_frequency: float Minimum frequency for prior maximum_frequency: float Maximum frequency for prior outdir: str Path to the outdir (the prior is written to outdir/online.prior) template: str Alternative template file to use, otherwise the data_files/roq.prior.template file is used """ if template is None: template = os.path.join( os.path.dirname(os.path.realpath(__file__)), "data_files/burst.prior.template", ) with open(template, "r") as old_prior: prior_string = old_prior.read().format( minimum_frequency=minimum_frequency, maximum_frequency=maximum_frequency ) prior_file = os.path.join(outdir, "online.prior") with open(prior_file, "w") as new_prior: new_prior.write(prior_string) return prior_file
[docs] def read_and_concat_data_from_kafka(ifo, start, end, channel, replay=False): """Query the kafka directory for the gwf files with the desired data. Start and end should be set wide enough to include the entire duration. This will read in the individual gwf files and concatenate them into a single gwpy timeseries""" if replay: kafka_directory = f"/dev/shm/kafka/{ifo}_O3ReplayMDC" else: kafka_directory = f"/dev/shm/kafka/{ifo}" times = np.arange(start, end) segments = [] for time_sec in times: ht = TimeSeries.read( f"{kafka_directory}/{ifo[0]}-{ifo}_llhoft-{time_sec}-1.gwf", channel=channel ) segments.append(ht) segmentlist = TimeSeriesList(*segments) data = segmentlist.join() return data
[docs] def create_parser(): parser = argparse.ArgumentParser( prog="bilby_pipe gracedb access", usage=__doc__, formatter_class=argparse.RawTextHelpFormatter, ) group1 = parser.add_mutually_exclusive_group(required=True) group1.add_argument("--gracedb", type=str, help="GraceDB event id") group1.add_argument("--json", type=str, help="Path to json GraceDB file") parser.add_argument( "--psd-file", type=str, help="Path to ligolw-xml file containing the PSDs for the interferometers.", ) parser.add_argument( "--convert-to-flat-in-component-mass", action="store_true", default=False, help=( "Convert a flat-in chirp mass and mass-ratio prior file to flat \n" "in component mass during the post-processing. Note, the prior \n" "must be uniform in Mc and q with constraints in m1 and m2 for \n" "this to work. \n" ), ) parser.add_argument( "--outdir", type=str, help="Output directory where the ini file and all output is written.", ) parser.add_argument( "--output", type=str, choices=["ini", "full", "full-local", "full-submit"], help=( "Flag to create ini, generate directories and/or submit. \n" " ini : generates ini file \n" " full : generates ini and dag submission files (default) \n" " full-local : generates ini and dag submission files and run locally \n" " full-submit : generates ini and dag submission files and submits to condor \n" ), default="full", ) parser.add_argument( "--gracedb-url", type=str, help=( "GraceDB service url. \n" " Main page : https://gracedb.ligo.org/api/ (default) \n" " Playground : https://gracedb-playground.ligo.org/api/ \n" ), default="https://gracedb.ligo.org/api/", ) parser.add_argument( "--channel-dict", type=str, default="online", choices=list(CHANNEL_DICTS.keys()), help=( "Channel dictionary. \n" " online : use for main GraceDB page events (default)\n" " o2replay : use for playground GraceDB page events\n" " o3replay : use for playground GraceDB page events\n" ), ) parser.add_argument( "--sampler-kwargs", type=str, default="DynestyDefault", help=( "Dictionary of sampler-kwargs to pass in, e.g., {nlive: 1000} OR " "pass pre-defined set of sampler-kwargs {DynestyDefault, BilbyMCMCDefault, FastTest}" ), ) parser.add_argument( "--cbc-likelihood-mode", type=str, default="phenompv2_bbh_roq", help=( "Built-in CBC likelihood mode or path to a JSON file containing likelihood settings. " "The built-in settings include 'phenompv2_bbh_roq', " "'lowspin_phenomd_narrowmc_roq', 'lowspin_phenomd_broadmc_roq', " "'lowspin_phenomd_fhigh1024_roq', 'lowspin_taylorf2_roq', " "'phenompv2_bns_roq', 'phenompv2nrtidalv2_roq', " "'low_q_phenompv2_roq', 'phenomxphm_roq', and 'test'." ), ) parser.add_argument( "--webdir", type=str, default=None, help=( "Directory to store summary pages. \n" " If not given, defaults to outdir/results_page" ), ) parser.add_argument( "--settings", type=str, default=None, help="JSON file containing extra settings to override the defaults", ) parser.add_argument( "--psd-cut", type=float, default=0.95, help=( "maximum frequency is set to this value multiplied by the maximum frequency of psd contained in coinc.xml." " This is to avoid likelihood overflow caused by the roll-off of pipeline psd due to low-pass filter." ), ) parser.add_argument( "--query-kafka", type=bool, default=True, help=( "when fetching the data for analysis, check first it is in kafka, and if not, then try to query ifocache." "If False, query ifocache (via gwpy TimeSeries.get() ) by default." ), ) return parser
[docs] def main(args=None, unknown_args=None): if args is None: args, unknown_args = create_parser().parse_known_args() elif unknown_args is None: unknown_args = [] if len(unknown_args) > 1 and args.output == "ini": msg = [ tcolors.WARNING, f"Unrecognized arguments {unknown_args}, these will be ignored", tcolors.END, ] logger.warning(" ".join(msg)) outdir = args.outdir if args.json: candidate = read_from_json(args.json) gracedb = candidate["graceid"] if outdir is None: outdir = f"outdir_{gracedb}" check_directory_exists_and_if_not_mkdir(outdir) if args.psd_file is not None and os.path.isfile(args.psd_file): candidate["coinc_file"] = args.psd_file elif args.gracedb: gracedb = args.gracedb gracedb_url = args.gracedb_url if outdir is None: outdir = f"outdir_{gracedb}" check_directory_exists_and_if_not_mkdir(outdir) candidate = read_from_gracedb(gracedb, gracedb_url, outdir) else: raise BilbyPipeError("Either gracedb ID or json file must be provided.") if args.webdir is not None: webdir = args.webdir else: webdir = os.path.join(outdir, "results_page") sampler_kwargs = args.sampler_kwargs channel_dict = CHANNEL_DICTS[args.channel_dict.lower()] if args.channel_dict.lower() == "o3replay": replay = True else: replay = False search_type = candidate["group"].lower() if search_type not in ["cbc", "burst"]: raise BilbyPipeError(f"Candidate group {candidate['group']} not recognised.") filename = prepare_run_configurations( candidate=candidate, gracedb=gracedb, outdir=outdir, channel_dict=channel_dict, sampler_kwargs=sampler_kwargs, webdir=webdir, psd_cut=args.psd_cut, search_type=search_type, cbc_likelihood_mode=args.cbc_likelihood_mode, settings=args.settings, query_kafka=args.query_kafka, replay=replay, ) if args.output == "ini": logger.info( "Generating ini with default settings. Run using the command: \n" f" $ bilby_pipe {filename}" ) else: arguments = ["bilby_pipe", filename] if args.output == "full": logger.info("Generating dag submissions files") if args.output == "full-local": logger.info("Generating dag submission files, running locally") arguments.append("--local") if args.output == "full-submit": logger.info("Generating dag submissions files, submitting to condor") arguments.append("--submit") if len(unknown_args) > 1: arguments = arguments + unknown_args run_command_line(arguments)