#!/usr/bin/env python"""bilby_pipe is a command line tools for taking user input (as command linearguments or an ini file) and creating DAG files for submitting bilby parameterestimation jobs. To get started, write an ini file `config.ini` and run$ bilby_pipe config.iniInstruction for how to submit the job are printed in a log message. You canalso specify extra arguments from the command line, e.g.$ bilby_pipe config.ini --submitwill build and submit the job."""importimportlibimportjsonimportosimportnumpyasnpimportpandasaspdfrom.create_injectionsimportcreate_injection_filefrom.inputimportInputfrom.job_creationimportgenerate_dagfrom.parserimportcreate_parserfrom.utilsimport(BilbyPipeError,get_colored_string,get_command_line_arguments,get_outdir_name,log_version_information,logger,parse_args,request_memory_generation_lookup,tcolors,)
class MainInput(Input):
    """An object to hold all the inputs to bilby_pipe"""

    def __init__(self, args, unknown_args, perform_checks=True):
        """Initialise the parent ``Input`` and copy the plotting options.

        Parameters
        ----------
        args:
            Parsed arguments object; the ``plot_*`` attributes listed below
            are read from it with ``getattr``.
        unknown_args:
            Passed straight through to the parent constructor.
        perform_checks: bool
            Not referenced in the portion of the constructor shown here.

        NOTE(review): only part of this constructor is visible in this
        excerpt; other attribute assignments presumably exist — confirm
        against the full source.
        """
        # The parent constructor is asked not to print its message.
        super().__init__(args, unknown_args, print_msg=False)

        # Plot options which, when truthy, flag that a plot node is needed.
        # NOTE(review): `plot_node_needed` is only ever set to True here; its
        # initial value is presumably assigned elsewhere — confirm.
        for plot_attr in [
            "calibration",
            "corner",
            "marginal",
            "skymap",
            "waveform",
        ]:
            attr = f"plot_{plot_attr}"
            setattr(self, attr, getattr(args, attr))
            if getattr(self, attr):
                self.plot_node_needed = True

        # Set all other plotting options
        for plot_attr in [
            "trace",
            "data",
            "injection",
            "spectrogram",
            "format",
        ]:
            attr = f"plot_{plot_attr}"
            setattr(self, attr, getattr(args, attr))
@notification.setterdefnotification(self,notification):valid_settings=["Always","Complete","Error","Never"]ifnotificationinvalid_settings:self._notification=notificationelse:raiseBilbyPipeError("'{}' is not a valid notification setting. ""Valid settings are {}.".format(notification,valid_settings))@property
@n_simulation.setterdefn_simulation(self,n_simulation):logger.debug(f"Setting n_simulation={n_simulation}")ifisinstance(n_simulation,int)andn_simulation>=0:self._n_simulation=n_simulationelifn_simulationisNone:self._n_simulation=0else:raiseBilbyPipeError(f"Input n_simulation={n_simulation} not understood")@property
@use_mpi.setterdefuse_mpi(self,use_mpi):ifuse_mpi:logger.debug(f"Turning on MPI for {self.sampler}")self._use_mpi=use_mpi@staticmethod
[docs]defcheck_source_model(args):"""Check the source model consistency with the approximant"""if"tidal"inargs.waveform_approximant.lower():if"neutron_star"notinargs.frequency_domain_source_model.lower():msg=[tcolors.WARNING,"You appear to be using a tidal waveform with the",f"{args.frequency_domain_source_model} source model.","You may want to use `frequency-domain-source-model=","lal_binary_neutron_star`.",tcolors.END,]logger.warning(" ".join(msg))
@staticmethod
[docs]defcheck_calibration_prior_boundary(args):# List of recommendations: print warning if these are not adhered torecs=dict(bilby_mcmc=None,dynesty="reflective")suggested_boundary=recs.get(args.sampler,args.calibration_prior_boundary)ifargs.calibration_prior_boundary!=suggested_boundary:msg=("You have requested a calibration prior boundary "f"{args.calibration_prior_boundary}, but {suggested_boundary} ""is recommended.")logger.warning(get_colored_string(msg))
[docs]defcheck_cpu_parallelisation(self):request_cpus=self.request_cpusnpool=self.sampler_kwargs.get("npool",request_cpus)ifrequest_cpus!=npool:msg=(f"request-cpus={request_cpus}, but sampler_kwargs[npool]={npool}:""this may cause inefficient performance")logger.warning(get_colored_string(msg))
    def check_injection(self):
        """Check injection behaviour

        If injections are requested, either use the injection-dict,
        injection-file, or create an injection-file.

        The sources are tried in this order: ``self.injection_dict``,
        ``self.injection_file``, an existing file at
        ``<data_directory>/<label>_injection_file.dat``, and finally a file
        newly written to that path by ``create_injection_file``. Afterwards
        the number of injections is checked against ``gpstimes`` and
        ``n_simulation``; when ``n_simulation`` is 0 and no gps_file is set,
        ``n_simulation`` is updated to ``len(self.injection_df)``.

        Raises
        ------
        BilbyPipeError
            If ``n_simulation`` and the number of GPS times disagree, if the
            injection table length differs from the number of GPS times, or
            if it differs from a positive ``n_simulation``.
        """
        # NOTE(review): the nesting below was reconstructed from
        # whitespace-stripped text — confirm against the original file.
        default_injection_file_name = "{}/{}_injection_file.dat".format(
            self.data_directory, self.label
        )
        if self.injection_dict is not None:
            logger.debug(
                "Using injection dict from ini file {}".format(
                    json.dumps(self.injection_dict, indent=2)
                )
            )
        elif self.injection_file is not None:
            logger.debug(f"Using injection file {self.injection_file}")
        elif os.path.isfile(default_injection_file_name):
            # This is done to avoid overwriting the injection file
            logger.debug(f"Using injection file {default_injection_file_name}")
            self.injection_file = default_injection_file_name
        else:
            logger.debug("No injection file found, generating one now")

            # With explicit GPS times, one injection is generated per time.
            if self.gps_file is not None or self.gps_tuple is not None:
                if self.n_simulation > 0 and self.n_simulation != len(self.gpstimes):
                    raise BilbyPipeError(
                        "gps_file/gps_tuple option and n_simulation are not matched"
                    )
                gpstimes = self.gpstimes
                n_injection = len(gpstimes)
            else:
                gpstimes = None
                n_injection = self.n_simulation

            # A missing trigger time is passed on as 0.
            if self.trigger_time is None:
                trigger_time_injections = 0
            else:
                trigger_time_injections = self.trigger_time

            create_injection_file(
                filename=default_injection_file_name,
                prior_file=self.prior_file,
                prior_dict=self.prior_dict,
                n_injection=n_injection,
                trigger_time=trigger_time_injections,
                deltaT=self.deltaT,
                gpstimes=gpstimes,
                duration=self.duration,
                post_trigger_duration=self.post_trigger_duration,
                generation_seed=self.generation_seed,
                extension="dat",
                default_prior=self.default_prior,
            )
            self.injection_file = default_injection_file_name

        # Check the gps_file has the sample length as number of simulation
        # NOTE(review): assumed to run for every branch above — confirm.
        if self.gps_file is not None:
            if len(self.gpstimes) != len(self.injection_df):
                raise BilbyPipeError("Injection file length does not match gps_file")

        if self.n_simulation > 0:
            if self.n_simulation != len(self.injection_df):
                raise BilbyPipeError(
                    "n-simulation does not match the number of injections: "
                    "please check your ini file"
                )
        elif self.n_simulation == 0 and self.gps_file is None:
            # No explicit count was given: adopt the number of injections.
            self.n_simulation = len(self.injection_df)
            logger.debug(
                f"Setting n_simulation={self.n_simulation} to match injections"
            )
def _config_values_match(original, rewritten):
    """Return True when a source value and its re-parsed counterpart agree."""
    if isinstance(original, pd.DataFrame):
        # `all(df == other)` would iterate the column labels rather than the
        # cell values, so the contents are compared explicitly instead.
        return isinstance(rewritten, pd.DataFrame) and original.equals(rewritten)
    if isinstance(original, np.ndarray):
        # Shape-safe: mismatched shapes compare unequal instead of raising.
        return bool(np.array_equal(original, np.asarray(rewritten)))
    if isinstance(rewritten, (pd.DataFrame, np.ndarray)):
        # Different container types; a plain `==` would yield an array whose
        # truth value is ambiguous.
        return False
    if (
        isinstance(original, str)
        and isinstance(rewritten, str)
        and os.path.isfile(original)
    ):
        # Check if it is relpath vs abspath
        if os.path.abspath(original) == os.path.abspath(rewritten):
            return True
    return bool(original == rewritten)


def write_complete_config_file(parser, args, inputs, input_cls=MainInput):
    """Write the fully-resolved configuration file and verify it round-trips.

    ``args`` is modified in place before writing: string values naming an
    existing file or directory become absolute paths, lists are rendered as
    bracketed strings, ``sampler_kwargs`` is replaced by
    ``str(inputs.sampler_kwargs)`` and ``submit`` is set to False. The file
    is written to ``inputs.complete_ini_file`` with ``overwrite=False``, then
    parsed again and compared attribute by attribute with ``inputs``.

    Parameters
    ----------
    parser:
        Parser providing ``write_to_file`` and ``parse``.
    args:
        Parsed arguments object (mutated, see above).
    inputs:
        The input object built from ``args``.
    input_cls:
        Class used to rebuild the inputs from the written file; called as
        ``input_cls(complete_args, "", perform_checks=False)``.

    Raises
    ------
    BilbyPipeError
        If any compared attribute differs between the source inputs and the
        inputs rebuilt from the written file.
    """
    # Iterate over a copy because `args` is updated while looping.
    for key, val in vars(args).copy().items():
        if key == "label":
            continue
        if isinstance(val, str):
            if os.path.isfile(val) or os.path.isdir(val):
                setattr(args, key, os.path.abspath(val))
        if isinstance(val, list):
            if len(val) == 0:
                setattr(args, key, "[]")
            elif isinstance(val[0], str):
                setattr(args, key, f"[{', '.join(val)}]")
    args.sampler_kwargs = str(inputs.sampler_kwargs)
    args.submit = False
    parser.write_to_file(
        filename=inputs.complete_ini_file,
        args=args,
        overwrite=False,
        include_description=False,
    )

    # Verify that the written complete config is identical to the source config
    complete_args = parser.parse([inputs.complete_ini_file])
    complete_inputs = input_cls(complete_args, "", perform_checks=False)
    ignore_keys = ["scheduler_module", "submit"]
    differences = []
    for key, val in inputs.__dict__.items():
        if key in ignore_keys:
            continue
        # Only attributes that correspond to a parsed argument are compared.
        if key not in complete_args:
            continue
        if not _config_values_match(val, complete_inputs.__dict__[key]):
            differences.append(key)

    if differences:
        for key in differences:
            print(key, f"{inputs.__dict__[key]} -> {complete_inputs.__dict__[key]}")
        # The written file is `complete_ini_file`; the source is `ini`.
        raise BilbyPipeError(
            "The written config file {} differs from the source {} in {}".format(
                inputs.complete_ini_file, inputs.ini, differences
            )
        )
    logger.info(f"To see full configuration, check {inputs.complete_ini_file}")
def main():
    """Top-level interface for bilby_pipe

    Parses the command line, optionally re-parses it with an alternative
    parser named by ``analysis_executable_parser``, resolves the output
    directory, builds the inputs, writes the complete config file and
    generates the DAG. Unrecognised arguments are reported as a warning at
    the end.
    """
    parser = create_parser(top_level=True)
    args, unknown_args = parse_args(get_command_line_arguments(), parser)

    parser_path = args.analysis_executable_parser
    if parser_path is not None:
        # Alternative parser requested, reload args
        module_name, _, factory_name = parser_path.rpartition(".")
        parser_factory = getattr(importlib.import_module(module_name), factory_name)
        parser = parser_factory()
        args, unknown_args = parse_args(get_command_line_arguments(), parser)

    # Check and sort outdir: strip any quote characters first
    outdir = args.outdir
    for quote in ("'", '"'):
        outdir = outdir.replace(quote, "")
    args.outdir = outdir
    if args.overwrite_outdir is False:
        args.outdir = get_outdir_name(args.outdir)

    log_version_information()
    inputs = MainInput(args, unknown_args)
    write_complete_config_file(parser, args, inputs)
    generate_dag(inputs)

    if len(unknown_args) > 0:
        warning_parts = [
            tcolors.WARNING,
            f"Unrecognized arguments {unknown_args}",
            tcolors.END,
        ]
        logger.warning(" ".join(warning_parts))