LALPulsar 7.1.1.1-eeff03c
lalpulsar_MoveSFTs.py
Go to the documentation of this file.
1##python
2# Copyright (C) 2024 Evan Goetz
3#
4# This program is free software; you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation; either version 2 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with with program; see the file COPYING. If not, write to the
16# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17# MA 02110-1301 USA
18
19## \file
20## \ingroup lalpulsar_bin_SFTTools
21"""Move SFTs between directories."""
22
23import argparse
24import os
25import sys
26import shutil
27from contextlib import contextmanager
28from concurrent.futures import ProcessPoolExecutor, as_completed
29from tqdm import tqdm
30
31from lal import LALERRORBIT, LALWARNINGBIT, LALINFOBIT, LALTRACEBIT
32from lal import GetDebugLevel, ClobberDebugLevel
33
34from lalpulsar import git_version
35from lalpulsar import ValidateSFTFile, SFTErrorMessage
36
37__author__ = "Evan Goetz <evan.goetz@ligo.org>"
38__credits__ = "Karl Wette <karl.wette@ligo.org>"
39__version__ = git_version.id
40__date__ = git_version.date
41
42
43@contextmanager
45 saveDebugLevel = GetDebugLevel()
46 silentDebugLevel = saveDebugLevel & ~(
47 LALERRORBIT | LALWARNINGBIT | LALINFOBIT | LALTRACEBIT
48 )
49 ClobberDebugLevel(silentDebugLevel)
50 try:
51 yield None
52 finally:
53 ClobberDebugLevel(saveDebugLevel)
54
55
57 # parse command line
58 parser = argparse.ArgumentParser(description=__doc__)
59 parser.add_argument(
60 "-p", "--processes", type=int, default=1, help="number of moving processes"
61 )
62 parser.add_argument(
63 "-n",
64 "--no-validate",
65 dest="validate",
66 action="store_false",
67 help="do not validate destination SFTs",
68 )
69 parser.add_argument(
70 "-s", "--source-directory", type=str, help="SFT source directory"
71 )
72 parser.add_argument(
73 "-c",
74 "--channels",
75 type=str,
76 nargs="+",
77 help="Channel names (must be the same number as " "--dest-directory arguments)",
78 )
79 parser.add_argument(
80 "-d",
81 "--dest-directory",
82 type=str,
83 nargs="+",
84 help="SFT destination directory (must be the same "
85 "number as --channels arguments)",
86 )
87 args = parser.parse_args()
88
89 # check arguments
90 if args.processes <= 0:
91 parser.error("--processes must be strictly positive")
92 if not os.path.isdir(args.source_directory):
93 parser.error("source_directory is not a directory")
94 if len(args.channels) != len(args.dest_directory):
95 parser.error(
96 "Number of channel arguments must equal number of " "directory arguments"
97 )
98
99 return args
100
101
102def find_SFT_files(source_directory, channel, dest_directory):
103 # find SFT files for a specific channel and return the source-destiation tuple
104 src_dest_paths = []
105
106 # channel format in filename
107 ifo = channel.split(":")[0]
108 chan = channel.split(":")[1].replace("-", "").replace("_", "")
109
110 # find source SFT files
111 for src_root, _, src_files in os.walk(source_directory):
112 for src_file in src_files:
113 src_file_ifo = src_file.split("_")[1]
114 if (
115 src_file.endswith(".sft")
116 and chan in src_file
117 and src_file.startswith(ifo[0])
118 and src_file_ifo == ifo
119 ):
120 src_path = os.path.join(src_root, src_file)
121 _, src_name = os.path.split(src_path)
122
123 # build SFT destination path
124 dest_path = os.path.join(dest_directory, src_name)
125
126 # add to outputs if not already in the destination
127 if not src_path == dest_path:
128 src_dest_paths.append((src_path, dest_path))
129
130 return src_dest_paths
131
132
133def make_dest_dirs(dest_dirs):
134 # make destination SFT directories
135 print(f"{__file__}: making {len(dest_dirs)} directories ...", flush=True)
136 for dest_dir in dest_dirs:
137 os.makedirs(dest_dir, exist_ok=True)
138 print(f"{__file__}: making {len(dest_dirs)} directories ... done\n", flush=True)
139
140
141def move_SFT_file(src_path, dest_path, validate):
142 # move SFT with a temporary extension
143 tmp_dest_path = dest_path + "_TO_BE_VALIDATED"
144 shutil.move(src_path, tmp_dest_path)
145
146 # validate SFT if requested
147 if validate:
148 with silence_xlal_error_messages() as _:
149 validate_errorcode = ValidateSFTFile(tmp_dest_path)
150 if validate_errorcode != 0:
151 validate_errorstr = SFTErrorMessage(validate_errorcode)
152 return (tmp_dest_path, validate_errorstr)
153
154 # move destination SFT to final location
155 os.rename(tmp_dest_path, dest_path)
156
157 return None
158
159
160def move_all_SFT_files(src_dest_paths, validate, processes):
161 validate_errors = []
162
163 # create executor
164 print(f"{__file__}: copying {len(src_dest_paths)} SFTs ...", flush=True)
165 with ProcessPoolExecutor(max_workers=args.processes) as executor:
166 # submit tasks
167 pool = [
168 executor.submit(move_SFT_file, src_path, dest_path, validate)
169 for src_path, dest_path in src_dest_paths
170 ]
171
172 # collect tasks
173 for task in tqdm(as_completed(pool), total=len(pool)):
174 validate_error = task.result()
175 if validate_error is not None:
176 validate_errors.append(validate_error)
177
178 print("")
179
180 # show any validation errors
181 if validate_errors:
182 print(
183 f"{__file__}: failed to validate {len(validate_errors)} SFTs after copying:",
184 flush=True,
185 )
186 for tmp_dest_path, validate_errorstr in validate_errors:
187 print(f" {tmp_dest_path}\n {validate_errorstr}", flush=True)
188 sys.exit(1)
189
190 print(f"{__file__}: copying {len(src_dest_paths)} SFTs ... done\n", flush=True)
191
192
193if __name__ == "__main__":
195
196 dest_dirs = set()
197 src_dest_paths = []
198 for idx, c in enumerate(args.channels):
199 src_dest = find_SFT_files(args.source_directory, c, args.dest_directory[idx])
200
201 dest_dirs.add(args.dest_directory[idx])
202 src_dest_paths.extend(src_dest)
203
204 make_dest_dirs(dest_dirs)
205
206 move_all_SFT_files(src_dest_paths, args.validate, args.processes)
207
208 print(f"{__file__}: DONE", flush=True)
const char * SFTErrorMessage(int errorcode)
int ValidateSFTFile(const char *fname)
Verify that the contents of a SFT file are valid.
def move_SFT_file(src_path, dest_path, validate)
def move_all_SFT_files(src_dest_paths, validate, processes)
def find_SFT_files(source_directory, channel, dest_directory)
def make_dest_dirs(dest_dirs)