Conversions and HTCondor
Some of the conversions available in pesummary can be time-consuming. For this
reason, many of them can be parallelised through Python's multiprocessing
module. However, it is not always ideal to parallelise heavily through the
multiprocessing module alone: if, for example, you requested 100 CPUs when
submitting a job to a scheduler, e.g. HTCondor, all 100 CPUs would need to be
available simultaneously before the job could start. A highly parallelised job
can therefore take a similar wall time to one run in series, since the time
saved in execution is spent waiting in the queue. To avoid this, pesummary
provides all of the tools needed to interact efficiently with schedulers. This
tutorial details how pesummary can interact with the HTCondor scheduler and
reduce the wall time of time-consuming conversions by parallelising over many
CPUs. In this example we consider evolving the spins backwards to negative
infinity. We will interact with HTCondor by writing a Directed Acyclic Graph
(DAG) which uses the summarysplit, summarycombine and
summarycombine_posteriors executables.
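For comparison, the multiprocessing route keeps everything within a single job.
A minimal sketch is shown below; it assumes that the convert function in
pesummary.gw.conversions accepts evolve_spins_backwards and multi_process
keyword arguments mirroring the command line options used later in this
tutorial, so check your installed version before relying on it.

from pesummary.io import read
from pesummary.gw.conversions import convert

# read the posterior samples and run the conversion in-process,
# parallelised over 4 CPUs via Python's multiprocessing module
# (keyword arguments assumed to mirror the CLI options used below)
posterior = read("posterior_samples.dat").samples_dict
evolved = convert(
    posterior, evolve_spins_backwards="hybrid_orbit_averaged", multi_process=4
)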
Designing the DAG
Our job needs to:
1. Split the input file into N files
2. Run the conversion module on each file and evolve the spins to negative infinity
3. Combine the N converted files into a single file
Step 1 can be achieved with the summarysplit executable, e.g.,
$ summarysplit --multi_process 4 --samples PATH_TO_SAMPLES \
    --outdir ./output --file_format dat
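If the input file contains N posterior samples, ./output should then contain
one dat file per sample, following the naming convention assumed in the next
step, e.g.,

$ ls ./output
split_posterior_samples_0.dat
split_posterior_samples_1.dat
...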
Step 2 can be achieved with the summarycombine executable, e.g.,
$ summarycombine --webdir ./output/0 \
    --samples ./output/split_posterior_samples_0.dat \
    --evolve_spins_backwards hybrid_orbit_averaged \
    --labels evolve --gw

This writes the converted, spin-evolved samples to
./output/0/samples/evolve_pesummary.dat.
Finally, we can combine all of the individually converted files with the summarycombine_posteriors executable, e.g.,
$ summarycombine_posteriors --outdir ./output --filename combined_samples.dat \
    --use_all --file_format dat --labels evolve \
    --samples ./output/*/samples/evolve_pesummary.dat
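These three stages map naturally onto a DAG: one SPLIT node, N GENERATE nodes
which each depend on SPLIT, and one COMBINE node which depends on every
GENERATE node. As an illustration (with shortened, hypothetical paths), the
DAG we want to generate for N = 2 looks like,

JOB SPLIT ./output/split.sub
VARS SPLIT samples="./posterior_samples.dat" webdir="./output"
JOB GENERATE0 ./output/generate.sub
VARS GENERATE0 label="evolve" samples="./output/split_posterior_samples_0.dat" webdir="./output/0" f_ref="20"
JOB GENERATE1 ./output/generate.sub
VARS GENERATE1 label="evolve" samples="./output/split_posterior_samples_1.dat" webdir="./output/1" f_ref="20"

JOB COMBINE ./output/combine.sub
VARS COMBINE webdir="./output"
PARENT SPLIT CHILD GENERATE0
PARENT GENERATE0 CHILD COMBINE
PARENT SPLIT CHILD GENERATE1
PARENT GENERATE1 CHILD COMBINE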
Writing the DAG
The following code snippet creates a DAG which (a) splits the input file into N separate analyses, where N is the number of posterior samples contained within the input file, (b) evolves each sample to negative infinity and (c) combines all files into a single dat file.
# Licensed under an MIT style license -- see
# https://git.ligo.org/lscsoft/pesummary/-/blob/master/LICENSE.md
import os
import glob
import argparse
import numpy as np
from pesummary.io import read
from pesummary.core.webpage.main import _WebpageGeneration

__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]


def command_line():
    """Generate an Argument Parser object to control command line options
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-d", "--dir", required=True, help="directory of split posterior samples",
    )
    parser.add_argument(
        "-f", "--f_ref", help="Reference frequency to use for spin evolution in Hz",
        default=20
    )
    parser.add_argument(
        "--filename", help="dat file containing posterior samples to evolve",
        default=None
    )
    return parser


def create_submit_file(
    filename, executable, arguments, stage, cpus=1, memory=2048,
    accounting_group="ligo.prod.o3.cbc.pe.lalinference",
    accounting_user="charlie.hoy"
):
    """Write a basic submit file for HTCondor

    Parameters
    ----------
    filename: str
        name of the submit file
    executable: str
        the executable you wish to use
    arguments: str
        a space separated list of arguments you wish to pass to the executable.
        If you do not wish to pass any arguments simply pass an empty string
    stage: str
        a unique string which is used when naming the log, error and output files
    cpus: int, optional
        number of cpus to use when executing the job. Default 1
    memory: int, optional
        amount of memory to allocate for the job in MB. Default 2048 MB
    accounting_group: str, optional
        accounting group to use for the job. Default
        'ligo.prod.o3.cbc.pe.lalinference'
    accounting_user: str, optional
        accounting user to use for the job. Default 'charlie.hoy'
    """
    with open(filename, "w") as f:
        f.writelines(
            [
                'universe = vanilla\n',
                f'executable = {executable}\n',
                f'arguments = "{arguments}"\n',
                f'request_cpus = {cpus}\n',
                'getenv = True\n',
                'requirements = \n',
                f'request_memory = {memory}\n',
                f'log = $(webdir)/{stage}.log\n',
                f'error = $(webdir)/{stage}.err\n',
                f'output = $(webdir)/{stage}.out\n',
                f'accounting_group = {accounting_group}\n',
                f'accounting_group_user = {accounting_user}\n',
                'notification = never\n',
                'queue 1\n'
            ]
        )
    return


def main():
    """Top level interface for `make_dag.py`
    """
    parser = command_line()
    opts = parser.parse_args()
    _base_dir = os.path.abspath(opts.dir)
    # write submit files for the split, evolve and combine stages
    create_submit_file(
        os.path.join(_base_dir, "split.sub"),
        _WebpageGeneration.get_executable('summarysplit'),
        '--multi_process 4 --samples $(samples) --outdir $(webdir) --file_format dat',
        'split', cpus=4, memory=8192
    )
    create_submit_file(
        os.path.join(_base_dir, "generate.sub"),
        _WebpageGeneration.get_executable('summarycombine'), (
            '--webdir $(webdir) --samples $(samples) --evolve_spins_backwards '
            'hybrid_orbit_averaged --labels $(label) --gw --no_ligo_skymap '
            '--f_ref $(f_ref)'
        ), 'evolve', cpus=1, memory=16384
    )
    create_submit_file(
        os.path.join(_base_dir, "combine.sub"),
        _WebpageGeneration.get_executable('summarycombine_posteriors'), (
            '--samples $(webdir)/*/samples/evolve_pesummary.dat '
            '--use_all --file_format dat --filename combined_samples.dat '
            '--labels evolve --outdir $(webdir)'
        ), 'combine', cpus=1, memory=4096
    )
    if opts.filename is None:
        _base_file = os.path.join(_base_dir, "extracted_samples.dat")
    else:
        _base_file = opts.filename
    # write the dag: one SPLIT node, one GENERATE node per posterior sample
    # and a single COMBINE node
    with open(os.path.join(_base_dir, "generate_hybrid_orbit_averaged.dag"), "w") as f:
        f.writelines(
            [
                f'JOB SPLIT {os.path.join(_base_dir, "split.sub")}\n',
                f'VARS SPLIT samples="{_base_file}" webdir="{_base_dir}"\n'
            ]
        )
        _open = read(_base_file).samples_dict
        N_samples = _open.number_of_samples
        for num in range(N_samples):
            ff = os.path.join(_base_dir, f"split_posterior_samples_{num}.dat")
            f.writelines(
                [
                    f'JOB GENERATE{num} {os.path.join(_base_dir, "generate.sub")}\n', (
                        f'VARS GENERATE{num} label="evolve" samples="{ff}" '
                        f'webdir="{os.path.join(_base_dir, str(num))}" '
                        f'f_ref="{opts.f_ref}"\n'
                    )
                ]
            )
            # each GENERATE job writes to its own numbered directory
            try:
                os.makedirs(os.path.join(_base_dir, str(num)))
            except FileExistsError:
                pass
        f.writelines(
            [
                "\n", f'JOB COMBINE {os.path.join(_base_dir, "combine.sub")}\n',
                f'VARS COMBINE webdir="{_base_dir}"\n'
            ]
        )
        # enforce the SPLIT -> GENERATE -> COMBINE ordering
        for num in range(N_samples):
            f.writelines([f"PARENT SPLIT CHILD GENERATE{num}\n"])
            f.writelines([f"PARENT GENERATE{num} CHILD COMBINE\n"])
    return


if __name__ == "__main__":
    main()
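For reference, the split.sub file written by this script looks like the
following; the executable line will point at wherever summarysplit resolves
to in your environment (/path/to/summarysplit below is a placeholder):

universe = vanilla
executable = /path/to/summarysplit
arguments = "--multi_process 4 --samples $(samples) --outdir $(webdir) --file_format dat"
request_cpus = 4
getenv = True
requirements = 
request_memory = 8192
log = $(webdir)/split.log
error = $(webdir)/split.err
output = $(webdir)/split.out
accounting_group = ligo.prod.o3.cbc.pe.lalinference
accounting_group_user = charlie.hoy
notification = never
queue 1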
We can then create and submit the DAG with the following:
$ python make_dag.py --dir ./output --filename ./posterior_samples.dat
$ condor_submit_dag ./output/generate_hybrid_orbit_averaged.dag
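Progress can be monitored with the standard HTCondor tools; condor_submit_dag
writes a DAGMan log alongside the DAG file, e.g.,
$ condor_q
$ tail -f ./output/generate_hybrid_orbit_averaged.dag.dagman.out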
The evolved and post-processed posterior samples can then be found at
./output/combined_samples.dat.
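As with any pesummary produced file, these can be read back in with
pesummary.io.read (the same interface the script above uses to count the
input samples), e.g.,

from pesummary.io import read

# load the combined, spin-evolved posterior samples
combined = read("./output/combined_samples.dat").samples_dict
print(combined.number_of_samples)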