"""A set of generic utilities used in bilby_pipe"""importastimportjsonimportloggingimportmathimportosimportpickleimportreimportsubprocessimportsysimporturllibimporturllib.requestfromimportlibimportimport_moduleimportbilby
[docs]defadd_command_line_arguments(self):"""Adds command line arguments given in addition to the ini file"""command_line_args_list=get_command_line_arguments()# Remove the first positional ini-file argumentcommand_line_args_list=command_line_args_list[1:]self.argument_list+=command_line_args_list
[docs]deffrom_pickle(cls,filename=None):"""Loads in a data dump Parameters ---------- filename: str If given, try to load from this filename """withopen(filename,"rb")asfile:res=pickle.load(file)ifres.__class__!=cls:raiseTypeError("The loaded object is not a DataDump")returnres
[docs]classNoneWrapper(object):""" Wrapper around other types so that "None" always evaluates to None. This is needed to properly read None from ini files. Example ------- >>> nonestr = NoneWrapper(str) >>> nonestr("None") None >>> nonestr(None) None >>> nonestr("foo") "foo" >>> noneint = NoneWrapper(int) >>> noneint("None") None >>> noneint(None) None >>> noneint(0) 0 """def__init__(self,type):
[docs]defparse_args(input_args,parser,allow_unknown=True):"""Parse an argument list using parser generated by create_parser() Parameters ---------- input_args: list A list of arguments Returns ------- args: argparse.Namespace A simple object storing the input arguments unknown_args: list A list of any arguments in `input_args` unknown by the parser """iflen(input_args)==0:raiseBilbyPipeError("No command line arguments provided")ini_file=input_args[0]ifos.path.isfile(ini_file)isFalse:ifos.path.isfile(os.path.basename(ini_file)):input_args[0]=os.path.basename(ini_file)args,unknown_args=parser.parse_known_args(input_args)returnargs,unknown_args
[docs]defcheck_directory_exists_and_if_not_mkdir(directory):"""Checks if the given directory exists and creates it if it does not exist Parameters ---------- directory: str Name of the directory """ifnotos.path.exists(directory):os.makedirs(directory)logger.debug(f"Making directory {directory}")else:logger.debug(f"Directory {directory} exists")
[docs]defsetup_logger(outdir=None,label=None,log_level="INFO"):"""Setup logging output: call at the start of the script to use Parameters ---------- outdir, label: str If supplied, write the logging output to outdir/label.log log_level: str, optional ['debug', 'info', 'warning'] Either a string from the list above, or an integer as specified in https://docs.python.org/2/library/logging.html#logging-levels """if"-v"insys.argvor"--verbose"insys.argv:log_level="DEBUG"ifisinstance(log_level,str):try:level=getattr(logging,log_level.upper())exceptAttributeError:raiseValueError(f"log_level {log_level} not understood")else:level=int(log_level)logger=logging.getLogger("bilby_pipe")logger.propagate=Falselogger.setLevel(level)streams=[isinstance(h,logging.StreamHandler)forhinlogger.handlers]iflen(streams)==0ornotall(streams):stream_handler=logging.StreamHandler()stream_handler.setFormatter(logging.Formatter("%(asctime)s%(name)s%(levelname)-8s: %(message)s",datefmt="%H:%M"))stream_handler.setLevel(level)logger.addHandler(stream_handler)ifany([isinstance(h,logging.FileHandler)forhinlogger.handlers])isFalse:iflabel:ifoutdir:check_directory_exists_and_if_not_mkdir(outdir)else:outdir="."log_file=f"{outdir}/{label}.log"file_handler=logging.FileHandler(log_file)file_handler.setFormatter(logging.Formatter("%(asctime)s%(levelname)-8s: %(message)s",datefmt="%H:%M"))file_handler.setLevel(level)logger.addHandler(file_handler)forhandlerinlogger.handlers:handler.setLevel(level)
[docs]defget_outdir_name(outdir,fail_on_match=False,base_increment="A"):# Check if the directory existsifos.path.exists(outdir)isFalse:returnoutdirelse:msg=f"The outdir {outdir} already exists."iffail_on_match:raiseBilbyPipeError(msg)whileos.path.exists(outdir):outdir=generate_new_outdir_name(outdir)msg+=f" Incrementing outdir to {outdir}"logger.warning(get_colored_string(msg))returnoutdir
[docs]defgenerate_new_outdir_name(outdir):"""Generate a new outdir name to avoid naming conflicts The incrementing name format is "_A", "_B", etc. Once "_Z" is reached, we then move to "_AA" etc. """tokens=outdir.split("_")# Check if at least one _ and non-emptyif(len(tokens)<=1orlen(tokens[-1])==0orre.match("[A-Z]{1,2}$",tokens[-1])isNone):new_outdir=outdir+"_A"returnnew_outdir# Check if target matches standard formmatches=re.findall("^[A-Z]+",tokens[-1])iflen(matches)==1andmatches[0]!=tokens[-1]:new_outdir=outdir+"_A"returnnew_outdirnew_chars=[]suffix=tokens[-1]n_chars=len(suffix)stop_idx=-1foridxinrange(n_chars-1,-1,-1):iford(suffix[idx])==ord("Z"):new_chars.append("A")continueelse:new_chars.append(chr(ord(suffix[idx])+1))stop_idx=idxbreakiflen(new_chars)>0:new_chars.reverse()ifstop_idx==-1:new_suffix=""foridxinrange(n_chars):new_suffix=new_suffix+new_chars[idx]new_suffix=new_suffix+"A"# increase one characterelse:new_suffix=""foridxinrange(stop_idx):new_suffix=new_suffix+suffix[idx]foridxinrange(len(new_chars)):new_suffix=new_suffix+new_chars[idx]roots=tokens[:-1]new_outdir="_".join(roots)+"_"+new_suffixreturnnew_outdir
[docs]defconvert_string_to_tuple(string,key=None,n=None):"""Convert a string to a tuple Parameters ---------- string: str The string to convert key: str Name used for printing useful debug messages n: int The length of the string to check against, if None no check performed. Returns ------- tup: tuple A tuple """try:tup=ast.literal_eval(string)exceptValueErrorase:ifkeyisnotNone:raiseBilbyPipeError(f"Error {e}: unable to convert {key}: {string}")else:raiseBilbyPipeError(f"Error {e}: unable to {string}")ifnisnotNone:iflen(tup)!=n:raiseBilbyPipeError(f"Passed string {string} should be a tuple of length {n}")returntup
[docs]defconvert_string_to_dict(string,key=None):"""Convert a string repr of a string to a python dictionary Parameters ---------- string: str The string to convert key: str (None) A key, used for debugging """ifstring=="None":returnNonestring=strip_quotes(string)# Convert equals to colonsstring=string.replace("=",":")string=string.replace(" ","")string=re.sub(r'([A-Za-z/\.0-9\-\+][^\[\],:"}]*)',r'"\g<1>"',string)# Force double quotes around everythingstring=string.replace('""','"')# Evaluate as a dictionary of str: strtry:dic=ast.literal_eval(string)ifisinstance(dic,str):raiseBilbyPipeError(f"Unable to format {string} into a dictionary")except(ValueError,SyntaxError)ase:ifkeyisnotNone:raiseBilbyPipeError(f"Error {e}. Unable to parse {key}: {string}")else:raiseBilbyPipeError(f"Error {e}. Unable to parse {string}")# Convert values to bool/floats/ints where possibledic=convert_dict_values_if_possible(dic)returndic
[docs]defconvert_string_to_list(string):"""Converts a string to a list, e.g. the mode_array waveform argument See tests/utils_test for tested behaviour. Parameters: ----------- string: str The input string to convert Returns ------- new_list: list A list (or lists) """iftype(string)notin[str,list]:returnstringif(string.count("[")==1)and(string.count("]")==1):string=str(sanitize_string_for_list(string))try:new_list=ast.literal_eval(str(string))exceptValueError:returnstringifnotisinstance(new_list,list):returnnew_listforii,ellinenumerate(new_list):new_list[ii]=convert_string_to_list(ell)returnnew_list
[docs]defwrite_config_file(config_dict,filename,comment=None,remove_none=False):"""Writes ini file Parameters ---------- config_dict: dict Dictionary of parameters for ini file filename: str Filename to write the config file to comment: str Additional information on ini file generation remove_none: bool If true, remove None's from the config_dict before writing otherwise a ValueError is raised """logger.warning("write_config_file has been deprecated, it will be removed in a future version")ifremove_none:config_dict={key:valforkey,valinconfig_dict.items()ifvalisnotNone}ifNoneinconfig_dict.values():raiseValueError("config-dict is not complete")withopen(filename,"w+")asfile:ifcommentisnotNone:print(f"{comment}",file=file)forkey,valinconfig_dict.items():print(f"{key}={val}",file=file)
[docs]deftest_connection():"""A generic test to see if the network is reachable"""try:urllib.request.urlopen("https://google.com",timeout=1.0)excepturllib.error.URLError:raiseBilbyPipeError("It appears you are not connected to a network and so won't be ""able to interface with GraceDB. You may wish to specify the "" local-generation argument either in the configuration file ""or by passing the --local-generation command line argument")
[docs]defrequest_memory_generation_lookup(duration,roq=False):"""Function to determine memory required at the data generation step"""ifroq:returnint(max([8,min([60,duration])]))else:return8
[docs]defget_time_prior(time,uncertainty,name="geocent_time",latex_label="$t_c$"):"""Generate a time prior given some uncertainty. Parameters ---------- time: float The GPS geocent_time (time of coalescence at the center of the Earth) uncertainty: float The +/- uncertainty based around the geocenter time. name: str The name of the time parameter latex_label: str The latex label for the time parameter Returns ------- A bilby.core.prior.Uniform for the time parameter. """returnbilby.core.prior.Uniform(minimum=time-uncertainty,maximum=time+uncertainty,name=name,latex_label=latex_label,unit="$s$",)
[docs]defget_geocent_time_with_uncertainty(geocent_time,uncertainty):"""Get a new geocent time within some uncertainty from the original geocent time. Parameters ---------- geocent_time: float The GPS geocent_time (time of coalescence at the center of the Earth) uncertainty: float The +/- uncertainty based around the geocenter time. Returns ------- A geocent GPS time (float) inside the range of geocent time - uncertainty and geocent time + uncertainty. """geocent_time_prior=get_time_prior(geocent_time,uncertainty)returngeocent_time_prior.sample()
[docs]defconvert_detectors_input(string):"""Convert string inputs into a standard form for the detectors Parameters ---------- string: str A string representation to be converted Returns ------- detectors: list A sorted list of detectors """ifstringisNone:raiseBilbyPipeError("No detector input")ifisinstance(string,list):string=",".join(string)ifisinstance(string,str)isFalse:raiseBilbyPipeError(f"Detector input {string} not understood")# Remove square bracketsstring=string.replace("[","").replace("]","")# Remove added quotesstring=strip_quotes(string)# Replace multiple spaces with a single spacestring=" ".join(string.split())# Spaces can be either space or comma in input, convert to commastring=string.replace(" ,",",").replace(", ",",").replace(" ",",")detectors=string.split(",")detectors.sort()detectors=[det.upper()fordetindetectors]returndetectors
[docs]defcomma_partition(s):"""Partitions `s` at top-level commas"""s=s.strip("{").strip("}")in_parens=0ixs=[]fori,cinenumerate(s):ifc=="(":in_parens+=1ifc==")":in_parens-=1ifnotin_parensandc==",":ixs.append(i)return[s[sc]forscinmake_partition_slices(ixs)]
[docs]defmake_partition_slices(ixs):"""Yields partitioning slices, skipping each index of `ixs`"""ix_x=[None]+ixsix_y=ixs+[None]forx,yinzip(ix_x,ix_y):yieldslice(x+1ifxelsex,y)
[docs]defkv_parser(kv_str,remove_leading_namespace=False):"""Takes a string in 'K=V' format and returns dictionary."""try:k,v=kv_str.split("=",1)return{k:v}exceptValueError:raiseBilbyPipeInternalError(f"Error in ini-dict reader when reading {kv_str}")
[docs]defcheck_if_represents_int(s):"""Checks if the string/bytes-like object/number s represents an int"""try:int(s)returnTrueexceptValueError:returnFalse
[docs]defpretty_print_dictionary(dictionary):"""Convert an input dictionary to a pretty-printed string Parameters ---------- dictionary: dict Input dictionary Returns ------- pp: str The dictionary pretty-printed as a string """dict_as_str={key:str(val)forkey,valindictionary.items()}returnjson.dumps(dict_as_str,indent=2)
[docs]classDuplicateErrorDict(dict):"""An dictionary with immutable key-value pairs Once a key-value pair is initialized, any attempt to update the value for an existing key will raise a BilbyPipeError. Raises ------ BilbyPipeError: When a user attempts to update an existing key. """def__init__(self,color=True,*args):dict.__init__(self,args)
[docs]def__setitem__(self,key,val):ifkeyinself:msg=f"Your ini file contains duplicate '{key}' keys"ifself.color:msg=get_colored_string(msg)raiseBilbyPipeError(msg)dict.__setitem__(self,key,val)
[docs]defget_function_from_string_path(python_path):split=python_path.split(".")module_str=".".join(split[:-1])func_str=split[-1]try:returngetattr(import_module(module_str),func_str)exceptImportErrorase:raiseBilbyPipeError(f"Failed to load function {python_path}, full message: {e}")
[docs]defcheck_if_psd_is_from_built_in(psd_file):"""Check if the psd_file can be found in the bilby built-in directory"""psd_file_built_in=os.path.join(os.path.dirname(bilby.gw.detector.psd.__file__),"noise_curves",psd_file)returnos.path.isfile(psd_file_built_in)
[docs]defresolve_filename_with_transfer_fallback(filename):""" When absolute paths are transferred using HTCondor, the absolute path is replaced with the basename of the file. This function checks if the file exists, if not, checks if the basename exists. If the basename exists, returns the basename, otherwise returns None. Parameters ---------- filename: str The filename to check Returns ------- filename: str The filename, or basename, if it exists, otherwise None """ifos.path.isfile(filename):returnfilenameelifos.path.isfile(os.path.basename(filename)):returnos.path.basename(filename)returnNone