LALPulsar 7.1.2.1-bf6a62b
lalpulsar_CopyPublicSFTs.py
Go to the documentation of this file.
1##python
2# Copyright (C) 2022 Karl Wette
3#
4# This program is free software; you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation; either version 2 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; see the file COPYING. If not, write to the
16# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17# MA 02110-1301 USA
18
19## \file
20## \ingroup lalpulsar_bin_SFTTools
21"""Copy SFTs between directories. The destination directory is organised
22following the convention detailed in the SFT spec (T040164)."""
23
24import argparse
25import os
26import sys
27import time
28import shutil
29from contextlib import contextmanager
30from concurrent.futures import ProcessPoolExecutor, as_completed
31from tqdm import tqdm
32import logging
33
34from lal import LALERRORBIT, LALWARNINGBIT, LALINFOBIT, LALTRACEBIT
35from lal import GetDebugLevel, ClobberDebugLevel
36
37from lalpulsar import git_version
38from lalpulsar import ValidateSFTFile, SFTErrorMessage
39from lalpulsar.public_sft_directory import public_sft_directory
40from lalpulsar.public_sft_directory import public_sft_directory_readme_md
41
__author__ = "Karl Wette <karl.wette@ligo.org>"
__version__ = git_version.id
__date__ = git_version.date

# Configure logging: use the coloured formatter when the optional
# coloredlogs package can be imported, otherwise the standard formatter
try:
    from coloredlogs import ColoredFormatter as _Formatter
except ImportError:
    _Formatter = logging.Formatter

# The logger is named after the last dotted component of the module name
LOGGER = logging.getLogger(__name__.rsplit(".", 1)[-1])

# Only attach a stream handler if no handler is already available
if not LOGGER.hasHandlers():
    _LOG_HANDLER = logging.StreamHandler()
    _LOG_HANDLER.setFormatter(
        _Formatter(fmt="[%(asctime)s] %(levelname)+8s: %(message)s")
    )
    LOGGER.addHandler(_LOG_HANDLER)

LOGGER.setLevel(logging.INFO)
61
62
@contextmanager
def silence_xlal_error_messages():
    """Context manager that temporarily masks LAL diagnostic output bits.

    On entry the current LAL debug level is saved and replaced by the same
    level with the error, warning, info and trace bits cleared. On exit the
    saved level is restored, even if the managed block raises.

    Yields:
        None.
    """
    saved_level = GetDebugLevel()
    quiet_level = saved_level & ~(
        LALERRORBIT | LALWARNINGBIT | LALINFOBIT | LALTRACEBIT
    )
    ClobberDebugLevel(quiet_level)
    try:
        yield
    finally:
        # always put back the caller's debug level
        ClobberDebugLevel(saved_level)
74
75
def parse_command_line():
    """Parse and check the command-line arguments of this script.

    Returns:
        The argparse namespace with attributes ``processes``, ``force``,
        ``test``, ``validate``, ``readme_md``, ``source_directory`` and
        ``dest_directory``.

    Exits through ``parser.error()`` if ``--processes`` is not strictly
    positive, or if either directory argument is not an existing directory.
    """
    # parse command line
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "-p", "--processes", type=int, default=1, help="number of copying processes"
    )
    parser.add_argument(
        "-f", "--force", action="store_true", help="overwrite existing SFTs"
    )
    parser.add_argument(
        "-t", "--test", action="store_true", help="test finding SFTs without copying"
    )
    parser.add_argument(
        "-n",
        "--no-validate",
        dest="validate",
        action="store_false",
        help="do not validate destination SFTs",
    )
    parser.add_argument(
        "--no-readme-md",
        dest="readme_md",
        action="store_false",
        help="do not write README.md in the destination directory",
    )
    parser.add_argument("source_directory", type=str, help="SFT source directory")
    parser.add_argument("dest_directory", type=str, help="SFT destination directory")
    args = parser.parse_args()

    # check arguments
    if args.processes <= 0:
        parser.error("--processes must be strictly positive")
    if not os.path.isdir(args.source_directory):
        parser.error("source_directory is not a directory")
    if not os.path.isdir(args.dest_directory):
        parser.error("dest_directory is not a directory")

    return args
114
115
def find_SFT_files(source_directory, dest_directory, force):
    """Find the SFT files to copy and work out where each one should go.

    Walks ``source_directory`` recursively looking for files whose name ends
    in ``.sft``. The destination of each file is ``dest_directory`` joined
    with the subdirectory given by ``public_sft_directory()`` for its name.

    Args:
        source_directory: directory tree to search for SFT files.
        dest_directory: root of the destination directory tree.
        force: if false, files whose destination already exists are skipped.

    Returns:
        A tuple ``(dest_dirs, src_dest_paths)``: the set of destination
        directories needed, and a list of ``(src_path, dest_path)`` pairs.
    """
    started = time.time()
    needed_dirs = set()
    copy_plan = []

    # progress is logged every 100 SFTs up to 1000, then every 1000 up to
    # 10000, and so on, with the step growing tenfold each time
    found = 0
    report_at = 100
    report_step = 100
    report_limit = 1000

    # find source SFT files
    for walk_root, _, walk_files in os.walk(source_directory):
        for file_name in walk_files:
            if not file_name.endswith(".sft"):
                continue

            src_path = os.path.join(walk_root, file_name)
            src_name = os.path.basename(src_path)

            # build SFT destination directory
            dest_dir = os.path.join(dest_directory, public_sft_directory(src_name))
            dest_path = os.path.join(dest_dir, src_name)

            # skip file if already exists and force=False
            if os.path.isfile(dest_path) and not force:
                continue

            # add to outputs
            needed_dirs.add(dest_dir)
            copy_plan.append((src_path, dest_path))

            # print progress
            found += 1
            if found % report_at != 0:
                continue
            LOGGER.info(
                "found {n} SFTs in {dt:0.1f} seconds".format(
                    n=found,
                    dt=time.time() - started,
                )
            )
            report_at += report_step
            if report_at == report_limit:
                report_step *= 10
                report_limit *= 10

    LOGGER.info("found {n} SFTs to copy".format(n=found))

    return needed_dirs, copy_plan
161
162
def make_dest_dirs(dest_dirs):
    """Create the destination SFT directories.

    Args:
        dest_dirs: iterable of directory paths to create; intermediate
            directories are created as needed and existing directories are
            left untouched.

    Raises:
        OSError: if a directory cannot be created, e.g. because the path
            exists but is not a directory.
    """
    # make destination SFT directories
    LOGGER.info("making {n} directories ...".format(n=len(dest_dirs)))
    for dest_dir in dest_dirs:
        # exist_ok avoids the race between checking for the directory and
        # creating it when another process creates it in between
        os.makedirs(dest_dir, exist_ok=True)
    LOGGER.info("making {n} directories ... done".format(n=len(dest_dirs)))
170
171
def copy_SFT_file(src_path, dest_path, validate):
    """Copy one SFT file to its destination, optionally validating the copy.

    The file is first copied to ``dest_path`` with a ``_TO_BE_VALIDATED``
    suffix, and only moved to ``dest_path`` once it has passed validation
    (or immediately if validation is not requested).

    Args:
        src_path: path of the source SFT file.
        dest_path: final path of the destination SFT file.
        validate: if true, check the copy with ``ValidateSFTFile()``.

    Returns:
        ``None`` on success. If validation fails, a tuple
        ``(tmp_dest_path, error_message)``; the temporary copy is left in
        place and nothing is written to ``dest_path``.
    """
    # copy SFT with a temporary extension
    tmp_dest_path = dest_path + "_TO_BE_VALIDATED"
    shutil.copyfile(src_path, tmp_dest_path)

    # validate SFT if requested
    if validate:
        with silence_xlal_error_messages():
            validate_errorcode = ValidateSFTFile(tmp_dest_path)
        if validate_errorcode != 0:
            validate_errorstr = SFTErrorMessage(validate_errorcode)
            return (tmp_dest_path, validate_errorstr)

    # move destination SFT to final location; os.replace() overwrites an
    # existing destination on every platform, whereas os.rename() raises on
    # Windows if the destination already exists (the --force case)
    os.replace(tmp_dest_path, dest_path)

    return None
189
190
def copy_all_SFT_files(src_dest_paths, validate, processes):
    """Copy all SFT files using a pool of worker processes.

    Args:
        src_dest_paths: list of ``(src_path, dest_path)`` pairs to copy.
        validate: passed to ``copy_SFT_file()``; if true, each copy is
            validated before being moved to its final location.
        processes: number of worker processes in the pool.

    Exits the program with status 1, after logging each failure, if any
    copied SFT fails validation. An exception raised in a worker is
    re-raised here when that task's result is collected.
    """
    validate_errors = []

    # create executor; use the 'processes' argument rather than relying on
    # a global 'args' that only exists when this file is run as a script
    LOGGER.info("copying {n} SFTs ...".format(n=len(src_dest_paths)))
    with ProcessPoolExecutor(max_workers=processes) as executor:
        # submit tasks
        tasks = [
            executor.submit(copy_SFT_file, src_path, dest_path, validate)
            for src_path, dest_path in src_dest_paths
        ]

        # collect tasks as they finish, showing a progress bar
        for task in tqdm(as_completed(tasks), total=len(tasks)):
            validate_error = task.result()
            if validate_error is not None:
                validate_errors.append(validate_error)

    # show any validation errors
    if validate_errors:
        LOGGER.critical(
            "failed to validate {n} SFTs after copying:".format(n=len(validate_errors))
        )
        for tmp_dest_path, validate_errorstr in validate_errors:
            LOGGER.critical(" {p}".format(p=tmp_dest_path))
            LOGGER.critical(" {e}".format(e=validate_errorstr))
        sys.exit(1)

    LOGGER.info("copying {n} SFTs ... done".format(n=len(src_dest_paths)))
220
221
def write_readme_md(dest_directory):
    """Write ``README.md`` into the destination directory.

    Args:
        dest_directory: directory in which ``README.md`` is created,
            replacing any existing file of that name.
    """
    # write README.md; the contents come from
    # public_sft_directory_readme_md(), imported at the top of this file
    with open(os.path.join(dest_directory, "README.md"), "w") as f:
        f.write(public_sft_directory_readme_md())
226
227
if __name__ == "__main__":
    # parse and check the command line; every step below reads 'args'
    args = parse_command_line()

    # work out which SFTs need copying and where they go
    dest_dirs, src_dest_paths = find_SFT_files(
        args.source_directory, args.dest_directory, args.force
    )

    # NOTE: destination directories are created before the --test check,
    # so they are made even when no SFTs are copied
    make_dest_dirs(dest_dirs)

    if args.test:
        LOGGER.info("TESTING, not copying SFTs")
        sys.exit(0)

    copy_all_SFT_files(src_dest_paths, args.validate, args.processes)

    if args.readme_md:
        write_readme_md(args.dest_directory)

    LOGGER.info("DONE")
const char * SFTErrorMessage(int errorcode)
int ValidateSFTFile(const char *fname)
Verify that the contents of a SFT file are valid.
def copy_all_SFT_files(src_dest_paths, validate, processes)
def copy_SFT_file(src_path, dest_path, validate)
def find_SFT_files(source_directory, dest_directory, force)