"""
A set of generic utilities used in bilby_pipe
"""
import ast
import logging
import math
import os
import pickle
import re
import subprocess
import sys
import urllib
import urllib.request
from pathlib import Path
import bilby
[docs]
CHECKPOINT_EXIT_CODE = 77
[docs]
def get_colored_string(msg_list, color="WARNING"):
if isinstance(msg_list, str):
msg_list = [msg_list]
colstr = getattr(tcolors, color)
msg = [colstr] + msg_list + [tcolors.END]
return " ".join(msg)
[docs]
class BilbyPipeError(Exception):
def __init__(self, message):
super().__init__(message)
[docs]
class BilbyPipeInternalError(Exception):
def __init__(self, message):
super().__init__(message)
[docs]
class ArgumentsString(object):
""" A convenience object to aid in the creation of argument strings """
def __init__(self):
[docs]
self.argument_list = []
[docs]
def append(self, argument):
self.argument_list.append(argument)
[docs]
def add_positional_argument(self, value):
self.argument_list.append(f"{value}")
[docs]
def add_flag(self, flag):
self.argument_list.append(f"--{flag}")
[docs]
def add(self, argument, value):
self.argument_list.append(f"--{argument}")
self.argument_list.append(f"{value}")
[docs]
def add_unknown_args(self, unknown_args):
self.argument_list += unknown_args
[docs]
def add_command_line_arguments(self):
""" Adds command line arguments given in addition to the ini file """
command_line_args_list = get_command_line_arguments()
# Remove the first positional ini-file argument
command_line_args_list = command_line_args_list[1:]
self.argument_list += command_line_args_list
[docs]
def print(self):
return " ".join(self.argument_list)
[docs]
class DataDump(object):
def __init__(
self,
label,
outdir,
trigger_time,
likelihood_lookup_table,
likelihood_roq_weights,
likelihood_roq_params,
priors_dict,
priors_class,
interferometers,
meta_data,
idx,
):
[docs]
self.trigger_time = trigger_time
[docs]
self.interferometers = interferometers
[docs]
self.likelihood_lookup_table = likelihood_lookup_table
[docs]
self.likelihood_roq_weights = likelihood_roq_weights
[docs]
self.likelihood_roq_params = likelihood_roq_params
[docs]
self.priors_dict = priors_dict
[docs]
self.priors_class = priors_class
@staticmethod
[docs]
def get_filename(outdir, label):
return os.path.join(outdir, "_".join([label, "data_dump.pickle"]))
@property
[docs]
def filename(self):
return self.get_filename(self.outdir, self.label)
[docs]
def to_pickle(self):
with open(self.filename, "wb+") as file:
pickle.dump(self, file)
@classmethod
[docs]
def from_pickle(cls, filename=None):
""" Loads in a data dump
Parameters
----------
filename: str
If given, try to load from this filename
"""
with open(filename, "rb") as file:
res = pickle.load(file)
if res.__class__ != cls:
raise TypeError("The loaded object is not a DataDump")
return res
[docs]
class NoneWrapper(object):
"""
Wrapper around other types so that "None" always evaluates to None.
This is needed to properly read None from ini files.
Example
-------
>>> nonestr = NoneWrapper(str)
>>> nonestr("None")
None
>>> nonestr(None)
None
>>> nonestr("foo")
"foo"
>>> noneint = NoneWrapper(int)
>>> noneint("None")
None
>>> noneint(None)
None
>>> noneint(0)
0
"""
def __init__(self, type):
[docs]
def __call__(self, val):
if val == "None" or val is None:
return None
else:
return self.type(val)
[docs]
nonestr = NoneWrapper(str)
[docs]
noneint = NoneWrapper(int)
[docs]
nonefloat = NoneWrapper(float)
[docs]
DEFAULT_DISTANCE_LOOKUPS = {
"high_mass": (1e2, 5e3),
"4s": (1e2, 5e3),
"8s": (1e2, 5e3),
"16s": (1e2, 4e3),
"32s": (1e2, 3e3),
"64s": (50, 2e3),
"128s": (1, 5e2),
"128s_tidal": (1, 5e2),
}
[docs]
DURATION_LOOKUPS = {
"high_mass": 4,
"4s": 4,
"8s": 8,
"16s": 16,
"32s": 32,
"64s": 64,
"128s": 128,
"128s_tidal": 128,
}
[docs]
MAXIMUM_FREQUENCY_LOOKUPS = {
"high_mass": 1024,
"4s": 1024,
"8s": 2048,
"16s": 2048,
"32s": 2048,
"64s": 2048,
"128s": 4096,
"128s_tidal": 2048,
}
[docs]
SAMPLER_SETTINGS = {
"Default": {
"nlive": 1000,
"walks": 50,
"check_point_plot": True,
"n_check_point": 10000,
},
"FastTest": {
"nlive": 500,
"walks": 50,
"dlogz": 2,
"check_point_plot": True,
"n_check_point": 1000,
},
}
[docs]
def get_command_line_arguments():
""" Helper function to return the list of command line arguments """
return sys.argv[1:]
[docs]
def run_command_line(arguments, directory=None):
if directory:
pwd = os.path.abspath(".")
os.chdir(directory)
else:
pwd = None
print(f"\nRunning command $ {' '.join(arguments)}\n")
subprocess.call(arguments)
if pwd:
os.chdir(pwd)
[docs]
def parse_args(input_args, parser, allow_unknown=True):
""" Parse an argument list using parser generated by create_parser()
Parameters
----------
input_args: list
A list of arguments
Returns
-------
args: argparse.Namespace
A simple object storing the input arguments
unknown_args: list
A list of any arguments in `input_args` unknown by the parser
"""
if len(input_args) == 0:
raise BilbyPipeError("No command line arguments provided")
ini_file = input_args[0]
if os.path.isfile(ini_file) is False:
if os.path.isfile(os.path.basename(ini_file)):
input_args[0] = os.path.basename(ini_file)
args, unknown_args = parser.parse_known_args(input_args)
return args, unknown_args
[docs]
def check_directory_exists_and_if_not_mkdir(directory):
""" Checks if the given directory exists and creates it if it does not exist
Parameters
----------
directory: str
Name of the directory
"""
if not os.path.exists(directory):
os.makedirs(directory)
logger.debug(f"Making directory {directory}")
else:
logger.debug(f"Directory {directory} exists")
[docs]
def setup_logger(outdir=None, label=None, log_level="INFO"):
""" Setup logging output: call at the start of the script to use
Parameters
----------
outdir, label: str
If supplied, write the logging output to outdir/label.log
log_level: str, optional
['debug', 'info', 'warning']
Either a string from the list above, or an integer as specified
in https://docs.python.org/2/library/logging.html#logging-levels
"""
if "-v" in sys.argv:
log_level = "DEBUG"
if isinstance(log_level, str):
try:
level = getattr(logging, log_level.upper())
except AttributeError:
raise ValueError(f"log_level {log_level} not understood")
else:
level = int(log_level)
logger = logging.getLogger("bilby_pipe")
logger.propagate = False
logger.setLevel(level)
streams = [isinstance(h, logging.StreamHandler) for h in logger.handlers]
if len(streams) == 0 or not all(streams):
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(
logging.Formatter(
"%(asctime)s %(name)s %(levelname)-8s: %(message)s", datefmt="%H:%M"
)
)
stream_handler.setLevel(level)
logger.addHandler(stream_handler)
if any([isinstance(h, logging.FileHandler) for h in logger.handlers]) is False:
if label:
if outdir:
check_directory_exists_and_if_not_mkdir(outdir)
else:
outdir = "."
log_file = f"{outdir}/{label}.log"
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)-8s: %(message)s", datefmt="%H:%M"
)
)
file_handler.setLevel(level)
logger.addHandler(file_handler)
for handler in logger.handlers:
handler.setLevel(level)
[docs]
def convert_string_to_tuple(string, key=None, n=None):
"""" Convert a string to a tuple
Parameters
----------
string: str
The string to convert
key: str
Name used for printing useful debug messages
n: int
The length of the string to check against, if None no check performed.
Returns
-------
tup: tuple
A tuple
"""
try:
tup = ast.literal_eval(string)
except ValueError as e:
if key is not None:
raise BilbyPipeError(f"Error {e}: unable to convert {key}: {string}")
else:
raise BilbyPipeError(f"Error {e}: unable to {string}")
if n is not None:
if len(tup) != n:
raise BilbyPipeError(
f"Passed string {string} should be a tuple of length {n}"
)
return tup
[docs]
def convert_string_to_dict(string, key=None):
""" Convert a string repr of a string to a python dictionary
Parameters
----------
string: str
The string to convert
key: str (None)
A key, used for debugging
"""
if string == "None":
return None
string = strip_quotes(string)
# Convert equals to colons
string = string.replace("=", ":")
string = string.replace(" ", "")
string = re.sub(r'([A-Za-z/\.0-9\-\+][^\[\],:"}]*)', r'"\g<1>"', string)
# Force double quotes around everything
string = string.replace('""', '"')
# Evaluate as a dictionary of str: str
try:
dic = ast.literal_eval(string)
if isinstance(dic, str):
raise BilbyPipeError(f"Unable to format {string} into a dictionary")
except (ValueError, SyntaxError) as e:
if key is not None:
raise BilbyPipeError(f"Error {e}. Unable to parse {key}: {string}")
else:
raise BilbyPipeError(f"Error {e}. Unable to parse {string}")
# Convert values to bool/floats/ints where possible
dic = convert_dict_values_if_possible(dic)
return dic
[docs]
def convert_string_to_list(string):
""" Converts a string to a list, e.g. the mode_array waveform argument
See tests/utils_test for tested behaviour.
Parameters:
-----------
string: str
The input string to convert
Returns
-------
new_list: list
A list (or lists)
"""
if type(string) not in [str, list]:
return string
if (string.count("[") == 1) and (string.count("]") == 1):
string = str(sanitize_string_for_list(string))
try:
new_list = ast.literal_eval(str(string))
except ValueError:
return string
if not isinstance(new_list, list):
return new_list
for ii, ell in enumerate(new_list):
new_list[ii] = convert_string_to_list(ell)
return new_list
[docs]
def sanitize_string_for_list(string):
string = string.replace(",", " ")
string = string.replace("[", "")
string = string.replace("]", "")
string = string.replace('"', "")
string = string.replace("'", "")
string_list = string.split()
return string_list
[docs]
def convert_dict_values_if_possible(dic):
for key in dic:
if isinstance(dic[key], str) and dic[key].lower() == "true":
dic[key] = True
elif isinstance(dic[key], str) and dic[key].lower() == "false":
dic[key] = False
elif isinstance(dic[key], str):
dic[key] = string_to_int_float(dic[key])
elif isinstance(dic[key], dict):
dic[key] = convert_dict_values_if_possible(dic[key])
return dic
[docs]
def write_config_file(config_dict, filename, comment=None, remove_none=False):
""" Writes ini file
Parameters
----------
config_dict: dict
Dictionary of parameters for ini file
filename: str
Filename to write the config file to
comment: str
Additional information on ini file generation
remove_none: bool
If true, remove None's from the config_dict before writing otherwise
a ValueError is raised
"""
logger.warning(
"write_config_file has been deprecated, it will be removed in a future version"
)
if remove_none:
config_dict = {key: val for key, val in config_dict.items() if val is not None}
if None in config_dict.values():
raise ValueError("config-dict is not complete")
with open(filename, "w+") as file:
if comment is not None:
print(f"{comment}", file=file)
for key, val in config_dict.items():
print(f"{key}={val}", file=file)
[docs]
def test_connection():
""" A generic test to see if the network is reachable """
try:
urllib.request.urlopen("https://google.com", timeout=1.0)
except urllib.error.URLError:
raise BilbyPipeError(
"It appears you are not connected to a network and so won't be "
"able to interface with GraceDB. You may wish to specify the "
" local-generation argument either in the configuration file "
"or by passing the --local-generation command line argument"
)
[docs]
def strip_quotes(string):
try:
return string.replace('"', "").replace("'", "")
except AttributeError:
return string
[docs]
def string_to_int_float(s):
try:
return int(s)
except ValueError:
try:
return float(s)
except ValueError:
return s
[docs]
def is_a_power_of_2(num):
num = int(num)
return num != 0 and ((num & (num - 1)) == 0)
[docs]
def next_power_of_2(x):
return 1 if x == 0 else 2 ** math.ceil(math.log2(x))
[docs]
def request_memory_generation_lookup(duration, roq=False):
""" Function to determine memory required at the data generation step """
if roq:
return int(max([8, min([60, duration])]))
else:
return 8
[docs]
def get_time_prior(time, uncertainty, name="geocent_time", latex_label="$t_c$"):
""""Generate a time prior given some uncertainty.
Parameters
----------
time: float
The GPS geocent_time (time of coalescence at the center of the Earth)
uncertainty: float
The +/- uncertainty based around the geocenter time.
name: str
The name of the time parameter
latex_label: str
The latex label for the time parameter
Returns
-------
A bilby.core.prior.Uniform for the time parameter.
"""
return bilby.core.prior.Uniform(
minimum=time - uncertainty,
maximum=time + uncertainty,
name=name,
latex_label=latex_label,
unit="$s$",
)
[docs]
def get_geocent_time_with_uncertainty(geocent_time, uncertainty):
"""Get a new geocent time within some uncertainty from the original geocent time.
Parameters
----------
geocent_time: float
The GPS geocent_time (time of coalescence at the center of the Earth)
uncertainty: float
The +/- uncertainty based around the geocenter time.
Returns
-------
A geocent GPS time (float) inside the range of geocent time - uncertainty and
geocent time + uncertainty.
"""
geocent_time_prior = get_time_prior(geocent_time, uncertainty)
return geocent_time_prior.sample()
[docs]
def comma_partition(s):
"""Partitions `s` at top-level commas"""
s = s.strip("{").strip("}")
in_parens = 0
ixs = []
for i, c in enumerate(s):
if c == "(":
in_parens += 1
if c == ")":
in_parens -= 1
if not in_parens and c == ",":
ixs.append(i)
return [s[sc] for sc in make_partition_slices(ixs)]
[docs]
def make_partition_slices(ixs):
"""Yields partitioning slices, skipping each index of `ixs`"""
ix_x = [None] + ixs
ix_y = ixs + [None]
for x, y in zip(ix_x, ix_y):
yield slice(x + 1 if x else x, y)
[docs]
def kv_parser(kv_str, remove_leading_namespace=False):
"""Takes a string in 'K=V' format and returns dictionary.
"""
try:
k, v = kv_str.split("=", 1)
return {k: v}
except ValueError:
raise BilbyPipeInternalError(f"Error in ini-dict reader when reading {kv_str}")
[docs]
def check_if_represents_int(s):
"""Checks if the string/bytes-like object/number s represents an int"""
try:
int(s)
return True
except ValueError:
return False
setup_logger()
[docs]
logger = logging.getLogger("bilby_pipe")