LALPulsar 7.1.1.1-eeff03c
lalpulsar_MakeSFTDAG.py
##python
# Copyright (C) 2013, 2014, 2020--2024 Evan Goetz
# Copyright (C) 2011, 2021, 2022 Karl Wette
# Copyright (C) 2005, 2007 Gregory Mendell
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; see the file COPYING. If not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA

## \file
## \ingroup lalpulsar_bin_SFTTools
"""Creates DAGs to run jobs that generate SFTs"""

import math
import argparse
import os
import re
from pathlib import Path
from urllib.parse import urlparse

from gwdatafind import find_urls
from gwdatafind.utils import filename_metadata, file_segment

from igwn_segments import segment, segmentlist

from lalpulsar import (
    git_version,
    SFTFilenameSpec,
    FillSFTFilenameSpecStrings,
    BuildSFTFilenameFromSpec,
)

__author__ = "Evan Goetz <evan.goetz@ligo.org>, Greg Mendell"
__version__ = git_version.id
__date__ = git_version.date

cache_re = re.compile(r"^([A-Z])(\s+)(\w+)(\s+)(\d+)(\s+)(\d+)(\s+)(.+gwf)")

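# A cache file line matched by cache_re follows the LAL cache format
# "<obs> <frame-type> <gps-start> <duration> <url>", e.g. (hypothetical values):
#   H H1_HOFT_C00 1368921600 4096 file://localhost/data/H-H1_HOFT_C00-1368921600-4096.gwf
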
# REVISIONS:
# 12/02/05 gam; generate datafind.sub and MakeSFTs.sub as well as dag file in
#               PWD, with log files based on subLogPath and dag filename.
# 12/28/05 gam; Add option --make-gps-dirs, -D <num>, to make directory based
#               on this many GPS digits.
# 12/28/05 gam; Add option --misc-desc, -X <string> giving misc. part of the
#               SFT description field in the filename.
# 12/28/05 gam; Add --start-freq -F and --band -B options to enter these.
# 12/28/05 gam; Add in --window-type, -w options; 0 = no window, 1 = default =
#               Matlab style Tukey window; 2 = make_sfts.c Tukey window; 3 =
#               Hann window.
# 12/28/05 gam; Add option --overlap-fraction -P (for use with windows; e.g.,
#               use -P 0.5 with -w 3 Hann windows; default is 0.0)
# 12/28/05 gam; Add --sft-version, -v option to select output SFT version (1 =
#               default is version 1 SFTs; 2 = version 2 SFTs).
# 12/28/05 gam; Add --comment-field, -c option, for comment for version 2 SFTs.
# 12/28/05 gam; Remove sample rate option
# 01/09/06 gam; Add -Z option; write SFT to .*.tmp file, then move to final
#               file name.
# 01/14/07 gam; Add -u option to specify frame struct and type; add -i option
#               to specify IFO name.
# 07/24/07 gam; Add in -q option to read in list of nodes on which to output
#               SFTs, -Q option to give node path, and -R option for number of
#               jobs per node.
# 04/XX/13 eag; Add -y option to synchronize the start times of SFTs.
# 07/24/14 eag; Change default to version 2 SFTs
# 12/2020 eag; Update script to conform to modern python3 and pep8
# 10/2020 kww; Pass args directly to writeToDag(), use Python f-strings
# 10/2022 kww; Deprecate options that have been removed from MakeSFTs
# 10/2022 kww; Parse window type as a string, parameter separated by colon
# 10/2022 kww; Merge -O and -o log path options to free up -O option
# 10/2022 kww; Implement public SFT file naming convention
# 11/2022 kww; -R command line option now used for --observing-revision
#              instead of --output-jobs-per-node, which now uses -r
# 11/2022 kww; --datafind-path and --makesfts-path accept executable names
# 03/2023 eag; Allow user to pass a frame cache file --cache-file
# 04/2023 kww; Improve documentation of --window-type argument
# 05/2023 eag; Add the --gaps flag to gw_data_find
# 08/2023 eag; Allow for multiple channel names to be provided
# 09/2023 eag; Modify use of environment variables
# 01/2024 eag; Allow skipping of channels if not in frames or too low data
#              rate
# 10/2024 eag; Modify workflow for version 3 SFTs and HTCondor file transfer
#              workflow
# 12/2024 eag; Modify workflow to use lalpulsar_MoveSFTs script instead of
#              remapping files
# 03/2025 eag; Rewrite to generate cache files when running
#              lalpulsar_MakeSFTDAG, enabling frame files on /home or OSDF


def sft_name_from_vars(
    obs,
    gpsstart,
    Tsft,
    channel=None,
    kind=None,
    rev=None,
    window="unknown",
    par=None,
    miscstr=None,
):
113 """Create SFT file name from specification"""
114
115 spec = SFTFilenameSpec()
116
117 FillSFTFilenameSpecStrings(
118 spec=spec,
119 path=None,
120 extn=None,
121 detector=channel[:2],
122 window_type=window,
123 privMisc=miscstr,
124 pubObsKind=kind,
125 pubChannel=channel,
126 )
127 spec.pubObsRun = obs or 0
128 spec.pubRevision = rev or 0
129 spec.window_param = par or 0
130 spec.numSFTs = 1 # MakeSFTDAG will only ever generate 1 SFT per file
131 spec.SFTtimebase = Tsft
132 spec.gpsStart = gpsstart
133 spec.SFTspan = Tsft # MakeSFTDAG will only ever generate 1 SFT per file
134
135 return BuildSFTFilenameFromSpec(spec)
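
# Illustrative use of sft_name_from_vars() (hypothetical values): a call like
#   sft_name_from_vars(4, 1368921618, 1800, "H1:GDS-CALIB_STRAIN",
#                      kind="RUN", rev=1, window="tukey", par=0.001)
# returns a public-convention SFT filename with the detector, timebase,
# observing run/kind/revision, window, and channel encoded in the name.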


def get_urls(args):
    """Get frame file URL list from gwdatafind or cache file"""

    if not args.cache_file:
        urls = find_urls(
            site,
            args.input_data_type,
            segList[0][0],
            segList[-1][-1],
            match=args.datafind_match,
            urltype=args.datafind_urltype,
        )
    else:
        urls = []
        with open(args.cache_file, "r") as f:
            for line in f:
                m = cache_re.match(line)
                if m:
                    framefile = m.group(9)
                    urls.append(framefile)

    # sort the urls by gps time since find_urls() may not return a sorted list
    sorted_urls = sorted(urls, key=lambda x: file_segment(x)[0])

    return sorted_urls

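# Note on make_cache() below: igwn_segments' segment.disjoint() returns a
# value < 0 if the frame segment lies entirely below the job segment, > 0 if
# entirely above, and 0 if the two intersect or touch. Because the URL list
# is sorted by GPS time, frames before the job segment are skipped and the
# loop breaks at the first frame past it. (get_urls() above relies on the
# module-level "site" and "segList" variables defined in the main code.)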
def make_cache(
    urls,
    job_seg,
):
    """Make a frame list and cache list from a list of URLs"""

    cache = []  # list of lines for the cache file
    frames = []  # list of frame filenames used in the job
    for idx, url in enumerate(urls):
        obs, desc, dataseg = filename_metadata(url)
        dataseg = segment(dataseg)
        if dataseg.disjoint(job_seg) < 0:
            continue
        if dataseg.disjoint(job_seg) > 0:
            break
        if dataseg.intersects(job_seg):
            framefileurl = urlparse(url)
            framefilepath = Path(framefileurl.path)

            # list in cache file if files not visible on execute node
            # otherwise use file url
            if "/home" in str(framefilepath.parent) or "osdf" in framefileurl.scheme:
                newcache = (
                    f"{obs}\t{desc}\t{dataseg[0]}\t{abs(dataseg)}\t{framefilepath.name}"
                )
            else:
                newcache = f"{obs}\t{desc}\t{dataseg[0]}\t{abs(dataseg)}\t{url}"
            cache.append(newcache)

            if "/home" in str(framefilepath.parent):
                frames.append(framefilepath)
            else:
                frames.append(url)

    return frames, cache


def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args):
    """Write one job to DAG file"""

    MakeSFTs = f"MakeSFTs_{nodeCount}"
    tagStringOut = f"{args.tag_string}_{nodeCount}"

    job_segment = segment(
        startTimeThisNode - args.extra_datafind_time,
        endTimeThisNode + args.extra_datafind_time,
    )

    frames, cache = make_cache(urls, job_segment)

    obs, desc, dataseg = filename_metadata(urls[0])
    cacheFile = args.cache_path / f"{obs}-{job_segment[0]}-{job_segment[1]}.cache"
    with open(cacheFile, "w") as f:
        for line in cache:
            f.write(f"{line}\n")

    argList = []
    argList.append(f"-O {args.observing_run}")
    if args.observing_run > 0:
        argList.append(f"-K {args.observing_kind}")
        argList.append(f"-R {args.observing_revision}")
    elif args.misc_desc:
        argList.append(f"-X {args.misc_desc}")
    argList.append(f"-f {args.filter_knee_freq}")
    argList.append(f"-t {args.time_baseline}")
    # To work with the condor file transfer protocol, we save everything to the
    # scratch directory because the files will have unique names. Since
    # lalpulsar_MakeSFTs wants to have a path to save the file to, we provide .
    argList.append(f"-p {','.join(['.' for p in args.output_sft_path])}")
    # To work with the condor file transfer protocol, the cache file is saved
    # to the scratch directory on transfer so we just need the name, not the
    # full path
    argList.append(f"-C {cacheFile.name}")
    argList.append(f"-s {startTimeThisNode}")
    argList.append(f"-e {endTimeThisNode}")
    argList.append(f"-N {','.join(args.channel_name)}")
    argList.append(f"-F {args.start_freq}")
    argList.append(f"-B {args.band}")
    if args.comment_field:
        argList.append(f"-c {args.comment_field}")
    if ":" in args.window_type:
        window_type, window_param = args.window_type.split(":")
        window_param = float(window_param)
        argList.append(f"-w {window_type} -r {window_param}")
    else:
        window_type = args.window_type
        window_param = None
        argList.append(f"-w {window_type}")
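    # e.g. --window-type "tukey:0.5" yields "-w tukey -r 0.5", while a
    # parameter-less window such as "hann" yields just "-w hann"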
    if args.overlap_fraction:
        argList.append(f"-P {args.overlap_fraction}")
    if args.allow_skipping:
        argList.append("--allow-skipping TRUE")
    argStr = " ".join(argList)

    # The files are going to go to specific directories, so we need to map
    # the files to their output directories
    outputfiles = []
    remap = []
    sft_start = startTimeThisNode
    sft_end = sft_start + args.time_baseline
    # loop over start times
    while sft_end <= endTimeThisNode:
        # loop over channels
        for idx, c in enumerate(args.channel_name):
            filename = sft_name_from_vars(
                args.observing_run,
                sft_start,
                args.time_baseline,
                c,
                kind=args.observing_kind,
                rev=args.observing_revision,
                window=window_type,
                par=window_param,
                miscstr=args.misc_desc,
            )
            outputfiles.append(filename)
            remap.append(f"{filename}={args.output_sft_path[idx]/filename}")

        # update start and end times
        if args.overlap_fraction:
            sft_start += int(round((1 - args.overlap_fraction) * args.time_baseline))
        else:
            sft_start += args.time_baseline
        sft_end = sft_start + args.time_baseline

    # MakeSFT job
    dagFID.write(f"JOB {MakeSFTs} {Path(dagFID.name).parent / 'MakeSFTs.sub'}\n")
    dagFID.write(f"RETRY {MakeSFTs} 1\n")
    dagFID.write(f'VARS {MakeSFTs} argList="{argStr}" cachefile="{cacheFile}" ')
    if args.transfer_frame_files:
        framefiles = ",".join([str(fr) for fr in frames])
        dagFID.write(f'framefiles="{framefiles}" ')
    dagFID.write(f'tagstring="{tagStringOut}"\n')
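
# For illustration (hypothetical values), each writeToDag() call above emits
# a DAG entry of the form:
#   JOB MakeSFTs_1 /path/to/MakeSFTs.sub
#   RETRY MakeSFTs_1 1
#   VARS MakeSFTs_1 argList="-O 4 -K RUN ..." cachefile="..." tagstring="mytag_1"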


#
# MAIN CODE STARTS HERE
#

parser = argparse.ArgumentParser(
    description="This script creates MakeSFTs.sub, MoveSFTs.sub, and a dag \
    file that generates SFTs based on the options given.",
    fromfile_prefix_chars="@",
)

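# Example invocation (hypothetical values; every option used here is defined
# below):
#   lalpulsar_MakeSFTDAG -f O4_H1.dag -G O4H1SFTs -A accounting.group.tag \
#       -U albert.einstein -d H1_HOFT_C00 -k 14 -T 1800 -m 10 \
#       -p /home/albert.einstein/sfts -N H1:GDS-CALIB_STRAIN \
#       -O 4 -K RUN -R 1 -g segments.txt
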
dag_group = parser.add_argument_group(
    "DAG organization", "Options for workflow control"
)
datafind_group = parser.add_argument_group(
    "Datafind", "Options for locating frame files"
)
makesfts_group = parser.add_argument_group(
    "SFT creation", "Options for SFT creation and output"
)
deprecated_group = parser.add_argument_group("DEPRECATED")

dag_group.add_argument(
    "-f",
    "--dag-file",
    required=True,
    type=Path,
    help="filename for .dag file (should end in .dag)",
)
dag_group.add_argument(
    "-G",
    "--tag-string",
    required=True,
    type=str,
    help="tag string used in names of various files unique to \
    jobs that will run under the DAG",
)
dag_group.add_argument(
    "-a",
    "--analysis-start-time",
    type=int,
    help="GPS start time of data from which to generate \
    SFTs (optional and unused if a segment file is given)",
)
dag_group.add_argument(
    "-b",
    "--analysis-end-time",
    type=int,
    help="GPS end time of data from which to generate SFTs \
    (optional and unused if a segment file is given)",
)
dag_group.add_argument(
    "-L",
    "--max-length-all-jobs",
    type=int,
    help="maximum total amount of data to process, in seconds \
    (optional and unused if a segment file is given)",
)
dag_group.add_argument(
    "-g",
    "--segment-file",
    type=Path,
    help="alternative file with segments to use, rather than \
    the input times",
)
dag_group.add_argument(
    "-l",
    "--min-seg-length",
    type=int,
    default=0,
    help="minimum length segments to process in seconds (used \
    only if a segment file is given)",
)
dag_group.add_argument(
    "-y",
    "--synchronize-start",
    action="store_true",
    help="synchronize the start times of the SFTs so that the \
    start times are synchronized when there are gaps in the \
    data",
)
dag_group.add_argument(
    "-o",
    "--log-path",
    type=Path,
    default="logs",
    help="path to log, output, and error files (default \
    is $PWD/logs; this directory is created if it does not \
    exist and usually should be under a local file system)",
)
dag_group.add_argument(
    "-m",
    "--max-num-per-node",
    type=int,
    default=1,
    help="maximum number of SFTs to generate on one node",
)
dag_group.add_argument(
    "-J",
    "--makesfts-path",
    type=Path,
    help="string specifying the lalpulsar_MakeSFTs executable, \
    or a path to it; if not set, will use \
    MAKESFTS_PATH env variable or system default (in that \
    order)",
)
dag_group.add_argument(
    "--movesfts-path",
    type=Path,
    help="string specifying the lalpulsar_MoveSFTs executable, \
    or a path to it; if not set, will use \
    MOVESFTS_PATH env variable or system default (in that \
    order)",
)
dag_group.add_argument(
    "-Y",
    "--request-memory",
    type=int,
    default=4096,
    help="memory allocation in MB to request from condor for \
    lalpulsar_MakeSFTs step",
)
dag_group.add_argument(
    "-s",
    "--request-disk",
    type=int,
    default=4096,
    help="disk space allocation in MB to request from condor \
    for lalpulsar_MakeSFTs step",
)
dag_group.add_argument(
    "-A",
    "--accounting-group",
    required=True,
    type=str,
    help="Condor tag for the production of SFTs",
)
dag_group.add_argument(
    "-U",
    "--accounting-group-user",
    required=True,
    type=str,
    help="albert.einstein username (do not add @LIGO.ORG)",
)
dag_group.add_argument(
    "-t",
    "--transfer-frame-files",
    action="store_true",
    help="Transfer frame files via HTCondor file transfer system. \
    This should be specified if frames are not visible to the \
    compute node file system, e.g., if frames are on /home or \
    jobs run on the open science grid. Frame files are usually \
    visible on the CIT, LHO, and LLO clusters, so this does not \
    need to be specified in those cases.",
)

datafind_group.add_argument(
    "-d",
    "--input-data-type",
    required=True,
    type=str,
    help="input data type for use with the gw_data_find --type \
    option",
)
datafind_group.add_argument(
    "-x",
    "--extra-datafind-time",
    type=int,
    default=0,
    help="extra time to subtract/add from/to start/end time \
    arguments of gw_data_find",
)
datafind_group.add_argument(
    "-M",
    "--datafind-match",
    type=str,
    help="string to use with the gw_data_find --match option",
)
datafind_group.add_argument(
    "--datafind-urltype",
    type=str,
    default="file",
    choices=["file", "osdf"],
    help="String for the gw_data_find --urltype option. \
    Use 'file' if creating SFTs on a local LDG cluster. \
    Use 'osdf' if creating SFTs on the open science grid",
)
datafind_group.add_argument(
    "-e",
    "--cache-file",
    type=Path,
    help="path and filename to frame cache file to use instead \
    of gw_data_find",
)

makesfts_group.add_argument(
    "-O",
    "--observing-run",
    required=True,
    type=int,
    help="For public SFTs, observing run data the SFTs are generated from, or \
    (in the case of mock data challenge data) the observing \
    run on which the data is most closely based",
)
makesfts_group.add_argument(
    "-K",
    "--observing-kind",
    type=str,
    choices=["RUN", "AUX", "SIM", "DEV"],
    help='For public SFTs, one of: "RUN" for production SFTs of h(t) channels; \
    "AUX" for SFTs of non-h(t) channels; \
    "SIM" for mock data challenge or other simulated data; or \
    "DEV" for development/testing purposes',
)
makesfts_group.add_argument(
    "-R",
    "--observing-revision",
    type=int,
    help="For public SFTs: revision number starts at 1, and should be incremented once \
    SFTs have been widely distributed across clusters, advertised \
    as being ready for use, etc. For example, if mistakes are found \
    in the initial SFT production run after they have been published, \
    regenerated SFTs should have a revision number of at least 2",
)
makesfts_group.add_argument(
    "-X",
    "--misc-desc",
    type=str,
    help="For private SFTs, miscellaneous part of the SFT \
    description field in the filename",
)
makesfts_group.add_argument(
    "-k",
    "--filter-knee-freq",
    required=True,
    type=float,
    help="high pass filter knee frequency used on time domain \
    data before generating SFTs",
)
makesfts_group.add_argument(
    "-T",
    "--time-baseline",
    required=True,
    type=int,
    help="time baseline of SFTs (e.g., 60 or 1800 seconds)",
)
makesfts_group.add_argument(
    "-p",
    "--output-sft-path",
    nargs="+",
    type=Path,
    help="Path where to save the SFT files. Can specify multiple paths. \
    If specifying multiple paths, the number of output-sft-path options \
    must equal the number of channels, and the first listed channel's \
    SFTs go into the first listed output-sft-path. Otherwise specify \
    only one output path. If one path is specified and more than one \
    channel is specified, then --observing-run must be >= 1 and \
    --observing-kind and --observing-revision must be set",
)
makesfts_group.add_argument(
    "-C",
    "--cache-path",
    type=Path,
    default="cache",
    help="path to cache files that will be produced by \
    gw_data_find (default is $PWD/cache; this directory is \
    created if it does not exist and must agree with that \
    given in .sub files)",
)
makesfts_group.add_argument(
    "-N",
    "--channel-name",
    nargs="+",
    type=str,
    help="Name of input time-domain channel to read from frames. \
    Can specify multiple options. The number of channels must be \
    equal to the number of output-sft-path options given. The \
    first listed channel will have the SFTs go to the first listed \
    output-sft-path. Can only specify one channel when generating \
    private SFTs (--observing-run=0)",
)
makesfts_group.add_argument(
    "-c", "--comment-field", type=str, help="comment for SFT header"
)
makesfts_group.add_argument(
    "-F", "--start-freq", type=int, default=10, help="start frequency of the SFTs"
)
makesfts_group.add_argument(
    "-B", "--band", type=int, default=1990, help="frequency band of the SFTs"
)
makesfts_group.add_argument(
    "-w",
    "--window-type",
    type=str,
    default="tukey:0.001",
    help='type of windowing of time-domain to do \
    before generating SFTs, e.g. "rectangular", \
    "hann", "tukey:<beta in [0,1], required>"; \
    (default is "tukey:0.001", standard choice for LVK production SFTs)',
)
makesfts_group.add_argument(
    "-P",
    "--overlap-fraction",
    type=float,
    default=0,
    help="overlap fraction (for use with windows; e.g., use \
    --overlap-fraction 0.5 with --window-type hann windows)",
)
makesfts_group.add_argument(
    "--allow-skipping",
    action="store_true",
    help="allow channels to be skipped if not in frames or too low sampling \
    frequency",
)
makesfts_group.add_argument(
    "--no-validate",
    dest="validate",
    action="store_false",
    help="do not validate created SFTs",
)

##### DEPRECATED OPTIONS #####
class DeprecateAction(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        parser.error(
            f"Argument {self.option_strings} has been deprecated in lalpulsar_MakeSFTs"
        )


deprecated_group.add_argument(
    "-u",
    "--frame-struct-type",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
    the frame channel type is determined automatically",
)
deprecated_group.add_argument(
    "-H",
    "--use-hoft",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
    the frame channel type is determined automatically",
)
deprecated_group.add_argument(
    "-i",
    "--ifo",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
    the detector prefix is deduced from the channel name",
)
deprecated_group.add_argument(
    "-D",
    "--make-gps-dirs",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-Z",
    "--make-tmp-file",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. Default behaviour",
)
deprecated_group.add_argument(
    "-v",
    "--sft-version",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-S",
    "--use-single",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-q",
    "--list-of-nodes",
    type=str,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-Q",
    "--node-path",
    type=Path,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-r",
    "--output-jobs-per-node",
    type=int,
    default=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)

args = parser.parse_args()

# Some basic argument value checking
if args.observing_run < 0:
    raise parser.error("--observing-run must be >= 0")

if args.observing_run > 0 and not args.observing_kind:
    raise parser.error("--observing-run requires --observing-kind")

if args.observing_run > 0 and not args.observing_revision:
    raise parser.error("--observing-run requires --observing-revision")

if args.observing_revision and args.observing_revision <= 0:
    raise parser.error("--observing-revision must be > 0")

if args.observing_run > 0 and args.misc_desc:
    raise parser.error(
        f"--observing-run={args.observing_run} incompatible with --misc-desc"
    )

if args.misc_desc and not re.compile(r"^[A-Za-z0-9]+$").match(args.misc_desc):
    raise parser.error("--misc-desc may only contain A-Z, a-z, 0-9 characters")

if args.extra_datafind_time < 0:
    raise parser.error("--extra-datafind-time must be >= 0")

if args.filter_knee_freq < 0:
    raise parser.error("--filter-knee-freq must be >= 0")

if args.time_baseline <= 0:
    raise parser.error("--time-baseline must be > 0")

if args.overlap_fraction < 0.0 or args.overlap_fraction >= 1.0:
    raise parser.error("--overlap-fraction must be in the range [0,1)")

if args.start_freq < 0.0 or args.start_freq >= 7192.0:
    raise parser.error("--start-freq must be in the range [0,7192)")

if args.band <= 0 or args.band >= 8192.0:
    raise parser.error("--band must be in the range (0,8192)")

if args.start_freq + args.band >= 8192.0:
    raise parser.error("--start-freq + --band must be < 8192")

if args.max_num_per_node <= 0:
    raise parser.error("--max-num-per-node must be > 0")

if (
    len(args.channel_name) != len(args.output_sft_path)
    and len(args.output_sft_path) != 1
):
    raise parser.error(
        "--channel-name and --output-sft-path must be the "
        "same length or --output-sft-path must be length of 1"
    )

if len(args.channel_name) > 1 and args.observing_run == 0:
    raise parser.error(
        "When creating SFTs from multiple channels, public SFT naming "
        "convention must be used: --observing-run > 0 and set "
        "--observing-kind and --observing-revision"
    )

if args.datafind_urltype == "osdf" and not args.transfer_frame_files:
    raise parser.error(
        "--transfer-frame-files must be specified when --datafind-urltype=osdf"
    )

# Set executables for lalpulsar_MakeSFTs and lalpulsar_MoveSFTs
makeSFTsExe = "lalpulsar_MakeSFTs"
if args.makesfts_path:
    if args.makesfts_path.is_file():
        makeSFTsExe = args.makesfts_path
    else:
        makeSFTsExe = args.makesfts_path / makeSFTsExe
elif "MAKESFTS_PATH" in os.environ:
    makeSFTsExe = Path("$ENV(MAKESFTS_PATH)") / makeSFTsExe
else:
    makeSFTsExe = Path("@LALSUITE_BINDIR@") / makeSFTsExe

moveSFTsExe = "lalpulsar_MoveSFTs"
if args.movesfts_path:
    if args.movesfts_path.is_file():
        moveSFTsExe = args.movesfts_path
    else:
        moveSFTsExe = args.movesfts_path / moveSFTsExe
elif "MOVESFTS_PATH" in os.environ:
    moveSFTsExe = Path("$ENV(MOVESFTS_PATH)") / moveSFTsExe
else:
    moveSFTsExe = Path("@LALSUITE_BINDIR@") / moveSFTsExe

# make directories to store the cache files, job logs, and SFTs
args.log_path.mkdir(exist_ok=True)
args.cache_path.mkdir(exist_ok=True)
for p in args.output_sft_path:
    p.mkdir(exist_ok=True)

# Check if segment file was given, else set up one segment from the command line
segList = segmentlist()
adjustSegExtraTime = False
if args.segment_file is not None:
    if args.min_seg_length < 0:
        raise parser.error("--min-seg-length must be >= 0")

    # the next flag causes extra time that cannot be processed to be trimmed
    # from the start and end of a segment
    adjustSegExtraTime = True

    with open(args.segment_file) as fp_segfile:
        for idx, line in enumerate(fp_segfile):
            splitLine = line.split()
            oneSeg = segment(int(splitLine[0]), int(splitLine[1]))
            if abs(oneSeg) >= args.min_seg_length:
                segList.append(oneSeg)

    if len(segList) < 1:
        raise ValueError(f"No segments found in segment file: {args.segment_file}")
else:
    if args.analysis_start_time is None:
        raise parser.error(
            "--analysis-start-time must be specified if no segment file is given"
        )

    if args.analysis_end_time is None:
        raise parser.error(
            "--analysis-end-time must be specified if no segment file is given"
        )

    if args.max_length_all_jobs is None:
        raise parser.error(
            "--max-length-all-jobs must be specified if no segment file is given"
        )

    # Make sure not to exceed the maximum allowed analysis length
    if args.analysis_end_time > (args.analysis_start_time + args.max_length_all_jobs):
        args.analysis_end_time = args.analysis_start_time + args.max_length_all_jobs

    oneSeg = segment(args.analysis_start_time, args.analysis_end_time)
    segList.append(oneSeg)
# END if (args.segment_file != None)
segList.coalesce()
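
# (For reference: each non-empty line of the segment file above is expected
# to hold a GPS start and end time pair, e.g. "1368921618 1368925218";
# these example values are illustrative only.)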

# Get the IFO site, which is the first letter of the channel name.
site = args.channel_name[0][0]

# Get the frame file URL list
urls = get_urls(args)

# Basic check that the frame files in the URL list are visible on the
# execute points (EPs)
if not args.transfer_frame_files:
    for f in urls:
        if "/home" in f:
            raise parser.error(
                "--transfer-frame-files must be specified when frame files are in /home"
            )

# data segments created from the list of frame URLs
dataSegs = segmentlist()
for url in urls:
    dataSegs.append(file_segment(url))
dataSegs.coalesce()

# intersection of segList with dataSegs
segList &= dataSegs
segList.coalesce()  # just in case

# initialize count of nodes
nodeCount = 0

# Create .sub files
path_to_dag_file = args.dag_file.parent
dag_filename = args.dag_file.name
makesfts_sub = path_to_dag_file / "MakeSFTs.sub"
movesfts_sub = path_to_dag_file / "MoveSFTs.sub"

# create MakeSFTs.sub
with open(makesfts_sub, "w") as MakeSFTsFID:
    MakeSFTsLogFile = f"{args.log_path}/MakeSFTs_{dag_filename}.log"
    MakeSFTsFID.write("universe = vanilla\n")
    MakeSFTsFID.write(f"executable = {makeSFTsExe}\n")
    MakeSFTsFID.write("arguments = $(argList)\n")
    MakeSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MakeSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MakeSFTsFID.write(f"log = {MakeSFTsLogFile}\n")
    MakeSFTsFID.write(f"error = {args.log_path}/MakeSFTs_$(tagstring).err\n")
    MakeSFTsFID.write(f"output = {args.log_path}/MakeSFTs_$(tagstring).out\n")
    MakeSFTsFID.write("notification = never\n")
    MakeSFTsFID.write(f"request_memory = {args.request_memory}MB\n")
    MakeSFTsFID.write(f"request_disk = {args.request_disk}MB\n")
    MakeSFTsFID.write("RequestCpus = 1\n")
    MakeSFTsFID.write("should_transfer_files = yes\n")
    if args.transfer_frame_files:
        MakeSFTsFID.write("transfer_input_files = $(cachefile),$(framefiles)\n")
    else:
        MakeSFTsFID.write("transfer_input_files = $(cachefile)\n")
    if "MAKESFTS_PATH" in os.environ and not args.makesfts_path:
        MakeSFTsFID.write("getenv = MAKESFTS_PATH\n")
    if args.datafind_urltype == "osdf":
        MakeSFTsFID.write("use_oauth_services = scitokens\n")
        MakeSFTsFID.write(
            "environment = BEARER_TOKEN_FILE=$$(CondorScratchDir)/.condor_creds/scitokens.use\n"
        )
    MakeSFTsFID.write("queue 1\n")

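# For illustration, the generated MakeSFTs.sub then contains lines roughly
# like the following (exact values depend on the arguments given):
#   universe = vanilla
#   executable = /path/to/lalpulsar_MakeSFTs
#   arguments = $(argList)
#   transfer_input_files = $(cachefile)
#   queue 1
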
# create MoveSFTs.sub
with open(movesfts_sub, "w") as MoveSFTsFID:
    MoveSFTsLogFile = f"{args.log_path}/MoveSFTs_{dag_filename}.log"
    MoveSFTsFID.write("universe = local\n")
    MoveSFTsFID.write(f"executable = {moveSFTsExe}\n")
    MoveSFTsFID.write("arguments = ")
    if not args.validate:
        MoveSFTsFID.write("$(opts) ")
    MoveSFTsFID.write("-s $(sourcedirectory) -c $(channels) -d $(destdirectory)\n")
    MoveSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MoveSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MoveSFTsFID.write(f"log = {MoveSFTsLogFile}\n")
    MoveSFTsFID.write(f"error = {args.log_path}/MoveSFTs.err\n")
    MoveSFTsFID.write(f"output = {args.log_path}/MoveSFTs.out\n")
    MoveSFTsFID.write("notification = never\n")
    MoveSFTsFID.write("request_memory = 1GB\n")
    MoveSFTsFID.write("request_disk = 10MB\n")
    MoveSFTsFID.write("RequestCpus = 1\n")
    if "MOVESFTS_PATH" in os.environ and not args.movesfts_path:
        MoveSFTsFID.write("getenv = MOVESFTS_PATH\n")
    MoveSFTsFID.write("queue 1\n")

# create the DAG file with the jobs to run
with open(args.dag_file, "w") as dagFID:
    startTimeAllNodes = None
    firstSFTstartTime = 0  # need this for the synchronized start option

    # Loop over the segment list to generate the SFTs for each segment
    for seg in segList:
        # Each segment in the segList runs on one or more nodes;
        # initialize the number of SFTs produced by the current node:
        numThisNode = 0
        numThisSeg = 0

        # Case 1: a segment file was given but the SFTs do not need their
        # start times to be synchronized
        if adjustSegExtraTime and not args.synchronize_start:
            segStartTime = seg[0]
            segEndTime = seg[1]

            # First we figure out how much extra time is in the segment so that
            # SFTs are fit within the segment:
            # |..<SFT><SFT><SFT>..|
            # where the .. represent the extra time in the segment
            # The amount of extra time in a segment is given as the remainder
            # of (total segment time) / (SFT time baseline)
            segExtraTime = (segEndTime - segStartTime) % args.time_baseline

            # If there is overlap of SFTs requested, then we compute the extra
            # time as:
            # the remainder of (end - start - Tsft) / (non-overlap time)
            # provided there was at least one SFT that is in the segment
            if args.overlap_fraction != 0.0:
                if (segEndTime - segStartTime) > args.time_baseline:
                    segExtraTime = (
                        segEndTime - segStartTime - args.time_baseline
                    ) % int((1.0 - args.overlap_fraction) * args.time_baseline)
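
            # Worked example (hypothetical numbers): a segment of
            # [1000000000, 1000004000) with -T 1800 and no overlap gives
            # segExtraTime = 4000 % 1800 = 400, so 200 s is trimmed from each
            # end and two SFTs cover [1000000200, 1000003800).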

            # We'll add half the extra time to the start of the SFTs to be
            # created in this segment and half at the end
            segExtraStart = int(segExtraTime / 2)
            segExtraEnd = segExtraTime - segExtraStart
            args.analysis_start_time = segStartTime + segExtraStart

            # This shift may have pushed past the end time of the segment. In
            # that case, just fix the start time to the end time of the segment
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            # shifting the end time by the other portion of the extra time
            # amount ...
            args.analysis_end_time = segEndTime - segExtraEnd

            # Again, this shift could have pushed the end time beyond the start
            # of the segment, so just fix the end time to the segment start
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime

        # Case 2: SFTs need a synchronized start. This is a special case for
        # methods like TwoSpect, where signal periodicity spacing must be
        # maintained
        elif args.synchronize_start:
            segStartTime = seg[0]
            segEndTime = seg[1]

            # If we haven't set the first SFT start time, then set it equal to
            # the start time of the first segment
            if firstSFTstartTime == 0:
                firstSFTstartTime = segStartTime

            # This is a tricky bit of math to set the start time based on the
            # first SFT start time across all the segments
            args.analysis_start_time = (
                int(
                    round(
                        math.ceil(
                            (segStartTime - firstSFTstartTime)
                            / ((1.0 - args.overlap_fraction) * args.time_baseline)
                        )
                        * (1.0 - args.overlap_fraction)
                        * args.time_baseline
                    )
                )
                + firstSFTstartTime
            )
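
            # Worked example (hypothetical numbers): with
            # firstSFTstartTime = 1000000000, segStartTime = 1000000905,
            # -T 1800 and --overlap-fraction 0.5, the stride is 900 s and
            # ceil(905 / 900) = 2, so the start snaps to
            # 1000000000 + 2 * 900 = 1000001800.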

            # This shift may have pushed past the end time of the segment. In
            # that case, just fix the start time to the end time of the segment
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            # This is a tricky bit of math to set the end time based on the
            # first SFT start time across all the segments
            args.analysis_end_time = (
                int(
                    round(
                        math.floor(
                            (segEndTime - args.analysis_start_time - args.time_baseline)
                            / ((1.0 - args.overlap_fraction) * args.time_baseline)
                        )
                        * (1.0 - args.overlap_fraction)
                        * args.time_baseline
                    )
                )
                + args.time_baseline
                + args.analysis_start_time
            )

            # Again, this shift could have pushed the end time beyond the start
            # of the segment, so just fix the end time to the segment start
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime

        # If no segment file given and no synchronized starts, just set the
        # start time and end time to the segment start and end
        else:
            args.analysis_start_time = seg[0]
            args.analysis_end_time = seg[1]

        # Loop through the analysis time; make sure no more than
        # args.max_num_per_node SFTs are produced by any one node
        startTimeThisNode = args.analysis_start_time
        endTimeThisNode = args.analysis_start_time
        endTimeAllNodes = args.analysis_start_time
        while endTimeAllNodes < args.analysis_end_time:
            # increment endTimeAllNodes by the args.time_baseline until we get
            # past the args.analysis_end_time
            if args.overlap_fraction != 0.0:
                # handle overlap
                if numThisSeg == 0:
                    endTimeAllNodes = endTimeAllNodes + args.time_baseline
                else:
                    endTimeAllNodes = endTimeAllNodes + int(
                        (1.0 - args.overlap_fraction) * args.time_baseline
                    )
            else:
                # default case, no overlap
                endTimeAllNodes = endTimeAllNodes + args.time_baseline
            if endTimeAllNodes <= args.analysis_end_time:
                # increment the number of SFTs output from this node, and
                # update the end time this node.
                numThisNode = numThisNode + 1
                numThisSeg = numThisSeg + 1
                endTimeThisNode = endTimeAllNodes
                if numThisNode < args.max_num_per_node:
                    continue
                else:
                    # write jobs to dag for this node
                    nodeCount = nodeCount + 1

                    if nodeCount == 1:
                        startTimeAllNodes = startTimeThisNode
                    writeToDag(
                        dagFID,
                        nodeCount,
                        startTimeThisNode,
                        endTimeThisNode,
                        urls,
                        args,
                    )
                    # Update for next node
                    numThisNode = 0
                    if args.overlap_fraction != 0.0:
                        # handle overlap
                        startTimeThisNode = endTimeThisNode - int(
                            (args.overlap_fraction) * args.time_baseline
                        )
                    else:
                        # default case, no overlap
                        startTimeThisNode = endTimeThisNode
            else:
                # we are at or past the args.analysis_end_time; output job for last
                # node if needed.
                if numThisNode > 0:
                    # write jobs to dag for this node
                    nodeCount = nodeCount + 1

                    if nodeCount == 1:
                        startTimeAllNodes = startTimeThisNode
                    writeToDag(
                        dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args
                    )
        # END while (endTimeAllNodes < args.analysis_end_time)
    # END for seg in segList

    # Write the MoveSFTs job to the DAG
    dagFID.write(f"JOB MoveSFTs {Path(dagFID.name).parent / 'MoveSFTs.sub'}\n")
    dagFID.write("RETRY MoveSFTs 1\n")
    dagFID.write("VARS MoveSFTs ")
    if not args.validate:
        dagFID.write('opts="--no-validate" ')
    dagFID.write(
        'sourcedirectory="." '
        f"channels=\"{' '.join(args.channel_name)}\" "
        f"destdirectory=\"{' '.join([str(p) for p in args.output_sft_path])}\"\n"
    )
    dagFID.write(
        f"PARENT {' '.join([f'MakeSFTs_{n}' for n in range(1, nodeCount+1)])} CHILD MoveSFTs\n"
    )

# The DAG file is closed on exit from the "with" block above

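# For illustration, the completed DAG has the structure (node count varies):
#   JOB MakeSFTs_1 ... MakeSFTs.sub        (one JOB/RETRY/VARS entry per batch)
#   JOB MoveSFTs ... MoveSFTs.sub
#   PARENT MakeSFTs_1 ... MakeSFTs_N CHILD MoveSFTs
# so every SFT-generation job finishes before the SFTs are moved into place.
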
# Update actual end time of the last job and print out the times all jobs will run on:
endTimeAllNodes = endTimeThisNode

if startTimeAllNodes is None:
    raise Exception("startTimeAllNodes is None; the DAG file contains no jobs!")

if endTimeAllNodes <= startTimeAllNodes:
    raise Exception(
        "endTimeAllNodes <= startTimeAllNodes; the DAG file contains no jobs!"
    )

print(startTimeAllNodes, endTimeAllNodes)