23"""Creates DAGs to run jobs that generates SFTs"""
29from pathlib
import Path
30from urllib.parse
import urlparse
32from gwdatafind
import find_urls
33from gwdatafind.utils
import filename_metadata, file_segment
35from igwn_segments
import segment, segmentlist
37from lalpulsar
import (
40 FillSFTFilenameSpecStrings,
41 BuildSFTFilenameFromSpec,
# Module metadata; version/date come from the LALSuite git_version module.
__author__ = "Evan Goetz <evan.goetz@ligo.org>, Greg Mendell"
__version__ = git_version.id
__date__ = git_version.date

# Matches one line of a LAL-format frame cache file:
# observatory  description  GPS-start  duration  path-to-.gwf-file
# Group 9 captures the frame file path/URL (the ".gwf" field).
cache_re = re.compile(r"^([A-Z])(\s+)(\w+)(\s+)(\d+)(\s+)(\d+)(\s+)(.+gwf)")
    """Create SFT file name from specification"""
    # Build a lalpulsar filename specification object and fill in the
    # string-valued fields; the detector prefix is the first two
    # characters of the channel name (e.g. "H1").
    spec = SFTFilenameSpec()
    # NOTE(review): the remaining keyword arguments of this call (window,
    # channel, kind, misc description, ...) are not visible in this chunk
    # of the file — confirm against the full source.
    FillSFTFilenameSpecStrings(
        detector=channel[:2],
    # Public-SFT numeric fields; fall back to 0 when unset (private SFTs).
    spec.pubObsRun = obs or 0
    spec.pubRevision = rev or 0
    spec.window_param = par or 0
    # SFT time baseline (seconds) and GPS start time.
    spec.SFTtimebase = Tsft
    spec.gpsStart = gpsstart
    # Delegate the actual filename construction to lalpulsar.
    return BuildSFTFilenameFromSpec(spec)
    """Get frame file URL list from gwdatafind or cache file"""
    # No cache file supplied: query gwdatafind for frame URLs.
    # NOTE(review): the enclosing find_urls(...) call is only partially
    # visible in this chunk (site/start/end arguments not shown).
    if not args.cache_file:
        args.input_data_type,
        match=args.datafind_match,
        urltype=args.datafind_urltype,
    # Cache file supplied: parse each line with cache_re; group 9 is the
    # frame file path, which is collected as a URL.
    with open(args.cache_file, "r") as f:
        m = cache_re.match(line)
        framefile = m.group(9)
        urls.append(framefile)
    # Sort the URLs by the GPS start time of each frame file.
    sorted_urls = sorted(urls, key=lambda x: file_segment(x)[0])
    """Make a frame list and cache list from a list of URLs"""
    for idx, url in enumerate(urls):
        # Extract observatory, description, and the data segment covered
        # by this frame file from its name.
        obs, desc, dataseg = filename_metadata(url)
        dataseg = segment(dataseg)
        # Skip/stop depending on where this frame lies relative to the
        # job segment (segment.disjoint returns -1/0/+1 for before /
        # overlapping / after).
        # NOTE(review): the bodies of these two branches (presumably
        # continue/break) are not visible in this chunk.
        if dataseg.disjoint(job_seg) < 0:
        if dataseg.disjoint(job_seg) > 0:
        if dataseg.intersects(job_seg):
            framefileurl = urlparse(url)
            framefilepath = Path(framefileurl.path)
            # Frames on /home or served via OSDF must be transferred by
            # HTCondor, so the cache entry refers to the bare file name;
            # otherwise the cache entry keeps the full URL.
            if "/home" in str(framefilepath.parent) or "osdf" in framefileurl.scheme:
                f"{obs}\t{desc}\t{dataseg[0]}\t{abs(dataseg)}\t{framefilepath.name}"
                newcache = f"{obs}\t{desc}\t{dataseg[0]}\t{abs(dataseg)}\t{url}"
            cache.append(newcache)
            # Only /home frames are added to the transfer (frames) list.
            if "/home" in str(framefilepath.parent):
                frames.append(framefilepath)
def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args):
    """Write one job to DAG file.

    Writes one MakeSFTs JOB/RETRY/VARS entry to the open DAG file
    ``dagFID``, creating the per-job frame cache file and assembling the
    lalpulsar_MakeSFTs argument string from the parsed ``args``.
    """
    # Unique job name and tag string for this node.
    MakeSFTs = f"MakeSFTs_{nodeCount}"
    tagStringOut = f"{args.tag_string}_{nodeCount}"
    # Pad the job segment by the extra datafind time on both sides.
    job_segment = segment(
        startTimeThisNode - args.extra_datafind_time,
        endTimeThisNode + args.extra_datafind_time,
    # Name the per-job cache file after the observatory and job segment,
    # then write the cache entries for this job.
    obs, desc, dataseg = filename_metadata(urls[0])
    cacheFile = args.cache_path / f"{obs}-{job_segment[0]}-{job_segment[1]}.cache"
    with open(cacheFile, "w") as f:
    # Assemble the lalpulsar_MakeSFTs command-line arguments.
    # NOTE(review): the argList initialization is not visible in this chunk.
    argList.append(f"-O {args.observing_run}")
    # Public SFTs (observing run > 0) carry kind and revision.
    if args.observing_run > 0:
        argList.append(f"-K {args.observing_kind}")
        argList.append(f"-R {args.observing_revision}")
        argList.append(f"-X {args.misc_desc}")
    argList.append(f"-f {args.filter_knee_freq}")
    argList.append(f"-t {args.time_baseline}")
    # Output paths are "." on the execute node; files are remapped later.
    argList.append(f"-p {','.join(['.' for p in args.output_sft_path])}")
    argList.append(f"-C {cacheFile.name}")
    argList.append(f"-s {startTimeThisNode}")
    argList.append(f"-e {endTimeThisNode}")
    argList.append(f"-N {','.join(args.channel_name)}")
    argList.append(f"-F {args.start_freq}")
    argList.append(f"-B {args.band}")
    if args.comment_field:
        argList.append(f"-c {args.comment_field}")
    # Window types like "tukey:0.001" carry a numeric parameter after ':'.
    # NOTE(review): the else-branch marker between the two window cases is
    # not visible in this chunk.
    if ":" in args.window_type:
        window_type, window_param = args.window_type.split(":")
        window_param = float(window_param)
        argList.append(f"-w {window_type} -r {window_param}")
        window_type = args.window_type
        argList.append(f"-w {window_type}")
    if args.overlap_fraction:
        argList.append(f"-P {args.overlap_fraction}")
    if args.allow_skipping:
        argList.append("--allow-skipping TRUE")
    argStr = " ".join(argList)
    # Enumerate every SFT this job will produce so the output files can be
    # remapped from the execute-node working directory to their final
    # per-channel output paths.
    sft_start = startTimeThisNode
    sft_end = sft_start + args.time_baseline
    while sft_end <= endTimeThisNode:
        for idx, c in enumerate(args.channel_name):
                kind=args.observing_kind,
                rev=args.observing_revision,
                miscstr=args.misc_desc,
            outputfiles.append(filename)
            # NOTE(review): "(unknown)" here looks like an extraction
            # placeholder for the local filename half of the remap entry.
            remap.append(f"(unknown)={args.output_sft_path[idx]/filename}")
        # Advance by the (possibly overlapping) SFT stride.
        # NOTE(review): the else-branch marker between these two stride
        # updates is not visible in this chunk.
        if args.overlap_fraction:
            sft_start += int(round((1 - args.overlap_fraction) * args.time_baseline))
            sft_start += args.time_baseline
        sft_end = sft_start + args.time_baseline
    # Emit the DAG entry: JOB, RETRY, and the VARS line consumed by
    # MakeSFTs.sub ($(argList), $(cachefile), optional $(framefiles),
    # $(tagstring)).
    dagFID.write(f"JOB {MakeSFTs} {Path(dagFID.name).parent / 'MakeSFTs.sub'}\n")
    dagFID.write(f"RETRY {MakeSFTs} 1\n")
    dagFID.write(f'VARS {MakeSFTs} argList="{argStr}" cachefile="{cacheFile}" ')
    if args.transfer_frame_files:
        framefiles = ",".join([str(fr) for fr in frames])
        dagFID.write(f'framefiles="{framefiles}" ')
    dagFID.write(f'tagstring="{tagStringOut}"\n')
# Command-line interface. fromfile_prefix_chars="@" allows options to be
# read from a file via @filename.
parser = argparse.ArgumentParser(
    description="This script creates MakeSFTs.sub, MoveSFTs.sub, and a dag \
        file that generates SFTs based on the options given.",
    fromfile_prefix_chars="@",
# Argument groups: workflow control, frame-file discovery, SFT creation,
# and deprecated (no-op) options kept for backward compatibility.
dag_group = parser.add_argument_group(
    "DAG organization", "Options for workflow control"
datafind_group = parser.add_argument_group(
    "Datafind", "Options for locating frame files"
makesfts_group = parser.add_argument_group(
    "SFT creation", "Options for SFT creation and output"
deprecated_group = parser.add_argument_group("DEPRECATED")
# DAG organization options.
# NOTE(review): several option-name strings (e.g. --dag-file, --tag-string,
# --segment-file, --log-path, ...) are not visible in this chunk; only the
# help text survives for those entries.
dag_group.add_argument(
    help="filename for .dag file (should end in .dag)",
dag_group.add_argument(
    help="tag string used in names of various files unique to \
        jobs that will run under the DAG",
dag_group.add_argument(
    "--analysis-start-time",
    help="GPS start time of data from which to generate \
        SFTs (optional and unused if a segment file is given)",
dag_group.add_argument(
    "--analysis-end-time",
    help="GPS end time of data from which to generate SFTs \
        (optional and unused if a segment file is given)",
dag_group.add_argument(
    "--max-length-all-jobs",
    help="maximum total amount of data to process, in seconds \
        (optional and unused if a segment file is given)",
dag_group.add_argument(
    help="alternative file with segments to use, rather than \
dag_group.add_argument(
    help="minimum length segments to process in seconds (used \
        only if a segment file is given)",
dag_group.add_argument(
    "--synchronize-start",
    help="synchronize the start times of the SFTs so that the \
        start times are synchronized when there are gaps in the \
dag_group.add_argument(
    help="path to log, output, and error files (default \
        is $PWD/logs; this directory is created if it does not \
        exist and usually should be under a local file system)",
dag_group.add_argument(
    "--max-num-per-node",
    help="maximum number of SFTs to generate on one node",
dag_group.add_argument(
    help="string specifying the lalpulsar_MakeSFTs executable, \
        or a path to it; if not set, will use \
        MAKESFTS_PATH env variable or system default (in that \
dag_group.add_argument(
    help="string specifying the lalpulsar_MoveSFTs executable, \
        or a path to it; if not set, will use \
        MOVESFTS_PATH env variable or system default (in that \
dag_group.add_argument(
    help="memory allocation in MB to request from condor for \
        lalpulsar_MakeSFTs step",
dag_group.add_argument(
    help="disk space allocation in MB to request from condor \
        for lalpulsar_MakeSFTs step",
dag_group.add_argument(
    "--accounting-group",
    help="Condor tag for the production of SFTs",
dag_group.add_argument(
    "--accounting-group-user",
    help="albert.einstein username (do not add @LIGO.ORG)",
dag_group.add_argument(
    "--transfer-frame-files",
    help="Transfer frame files via HTCondor file transfer system. \
        This should be specified if frames are not visible to the \
        compute node file system. Ex. this should be specified if \
        frames are on /home or running on the open science grid. \
        Usually frame files are visible on CIT, LHO, LLO clusters \
        so that this does not need to be specified in that case.",
# Datafind options controlling how frame files are located.
datafind_group.add_argument(
    help="input data type for use with the gw_data_find --type \
datafind_group.add_argument(
    "--extra-datafind-time",
    help="extra time to subtract/add from/to start/end time \
        arguments of gw_data_find",
datafind_group.add_argument(
    help="string to use with the gw_data_find --match option",
datafind_group.add_argument(
    "--datafind-urltype",
    # Only local-file and OSDF URL schemes are supported downstream.
    choices=["file", "osdf"],
    help="String for the gw_data_find --urltype option. \
        Use 'file' if creating SFTs on a local LDG cluster. \
        Use 'osdf' if creating SFTs on the open science grid",
datafind_group.add_argument(
    help="path and filename to frame cache file to use instead \
# SFT creation options (public/private naming, filtering, windowing,
# channels and output locations).
makesfts_group.add_argument(
    help="For public SFTs, observing run data the SFTs are generated from, or \
        (in the case of mock data challenge data) the observing \
        run on which the data is most closely based",
makesfts_group.add_argument(
    # Allowed public-SFT kinds per the SFT filename specification.
    choices=["RUN", "AUX", "SIM", "DEV"],
    help='For public SFTs, one of: "RUN" for production SFTs of h(t) channels; \
        "AUX" for SFTs of non-h(t) channels; \
        "SIM" for mock data challenge or other simulated data; or \
        "DEV" for development/testing purposes',
makesfts_group.add_argument(
    "--observing-revision",
    help="For public SFTs: revision number starts at 1, and should be incremented once \
        SFTs have been widely distributed across clusters, advertised \
        as being ready for use, etc. For example, if mistakes are found \
        in the initial SFT production run after they have been published, \
        regenerated SFTs should have a revision number of at least 2",
makesfts_group.add_argument(
    help="For private SFTs, miscellaneous part of the SFT \
        description field in the filename",
makesfts_group.add_argument(
    "--filter-knee-freq",
    help="high pass filter knee frequency used on time domain \
        data before generating SFTs",
makesfts_group.add_argument(
    help="time baseline of SFTs (e.g., 60 or 1800 seconds)",
makesfts_group.add_argument(
    help="Path where to save the SFT files. Can specify multiple options, \
        If specifying multiple options then it is required to specify the \
        same number of output-sft-path options as the number of channels. \
        The first listed channel will have the SFTs go into the first \
        listed output-sft-path. Otherwise specify only one output path. \
        If one path is specified and more than 1 channels are specified \
        then --observing-run must be >= 1 and --observing-kind and \
        --observing-revision must be set",
makesfts_group.add_argument(
    help="path to cache files that will be produced by \
        gw_data_find (default is $PWD/cache; this directory is \
        created if it does not exist and must agree with that \
        given in .sub files)",
makesfts_group.add_argument(
    help="Name of input time-domain channel to read from frames. \
        Can specify multiple options. The number of channels must be \
        equal to the number of output-sft-path options given. The \
        first listed channel will have the SFTs go to the first listed \
        output-sft-path. Can only specify one channel when generating \
        private SFTs (--observing-run=0)",
makesfts_group.add_argument(
    "-c", "--comment-field", type=str, help="comment for SFT header"
makesfts_group.add_argument(
    "-F", "--start-freq", type=int, default=10, help="start frequency of the SFTs"
makesfts_group.add_argument(
    "-B", "--band", type=int, default=1990, help="frequency band of the SFTs"
makesfts_group.add_argument(
    # Default window is the LVK production-standard Tukey window.
    default="tukey:0.001",
    help='type of windowing of time-domain to do \
        before generating SFTs, e.g. "rectangular", \
        "hann", "tukey:<beta in [0,1], required>"; \
        (default is "tukey:0.001", standard choice for LVK production SFTs)',
makesfts_group.add_argument(
    "--overlap-fraction",
    help="overlap fraction (for use with windows; e.g., use \
        --overlap-fraction 0.5 with --window-type hann windows)",
makesfts_group.add_argument(
    help="allow channels to be skipped if not in frames or too low sampling \
makesfts_group.add_argument(
    # store_false: validation is ON by default; this flag disables it.
    action="store_false",
    help="do not validate created SFTs",
    # argparse Action hook for deprecated options: emits a deprecation
    # message naming the option instead of storing a value. (The enclosing
    # DeprecateAction class header is not visible in this chunk.)
    def __call__(self, parser, namespace, values, option_string=None):
            f"Argument {self.option_strings} has been deprecated in lalpulsar_MakeSFTs"
629deprecated_group.add_argument(
631 "--frame-struct-type",
633 action=DeprecateAction,
634 help=
"DEPRECATED. No longer required; \
635 the frame channel type is determined automatically",
637deprecated_group.add_argument(
641 action=DeprecateAction,
642 help=
"DEPRECATED. No longer required; \
643 the frame channel type is determined automatically",
645deprecated_group.add_argument(
649 action=DeprecateAction,
650 help=
"DEPRECATED. No longer required; \
651 the detector prefix is deduced from the channel name",
653deprecated_group.add_argument(
657 action=DeprecateAction,
658 help=
"DEPRECATED. No longer supported",
660deprecated_group.add_argument(
664 action=DeprecateAction,
665 help=
"DEPRECATED. Default behaviour",
667deprecated_group.add_argument(
671 action=DeprecateAction,
672 help=
"DEPRECATED. No longer supported",
674deprecated_group.add_argument(
678 action=DeprecateAction,
679 help=
"DEPRECATED. No longer supported",
681deprecated_group.add_argument(
685 action=DeprecateAction,
686 help=
"DEPCRECATED. No longer supported",
688deprecated_group.add_argument(
692 action=DeprecateAction,
693 help=
"DEPCRECATED. No longer supported",
695deprecated_group.add_argument(
697 "--output-jobs-per-node",
700 action=DeprecateAction,
701 help=
"DEPRECATED. No longer supported",
args = parser.parse_args()

# Sanity-check the parsed options; parser.error() exits with a usage
# message on any violation.
if args.observing_run < 0:
    raise parser.error("--observing-run must be >= 0")
# Public SFTs (observing run > 0) require kind and revision metadata.
if args.observing_run > 0 and not args.observing_kind:
    raise parser.error("--observing-run requires --observing-kind")
if args.observing_run > 0 and not args.observing_revision:
    raise parser.error("--observing-run requires --observing-revision")
if args.observing_revision and args.observing_revision <= 0:
    raise parser.error("--observing-revision must be > 0")
# --misc-desc applies only to private SFTs (observing run 0).
if args.observing_run > 0 and args.misc_desc:
        f"--observing-run={args.observing_run} incompatible with --misc-desc"
if args.misc_desc and not re.compile(r"^[A-Za-z0-9]+$").match(args.misc_desc):
    raise parser.error("--misc-desc may only contain A-Z, a-z, 0-9 characters")
if args.extra_datafind_time < 0:
    raise parser.error("--extra-datafind-time must be >= 0")
if args.filter_knee_freq < 0:
    raise parser.error("--filter-knee-freq must be >= 0")
if args.time_baseline <= 0:
    raise parser.error("--time-baseline must be > 0")
if args.overlap_fraction < 0.0 or args.overlap_fraction >= 1.0:
    raise parser.error("--overlap-fraction must be in the range [0,1)")
if args.start_freq < 0.0 or args.start_freq >= 7192.0:
    raise parser.error("--start-freq must be in the range [0,7192)")
if args.band <= 0 or args.band >= 8192.0:
    raise parser.error("--band must be in the range (0,8192)")
if args.start_freq + args.band >= 8192.0:
    raise parser.error("--start-freq + --band must be < 8192")
if args.max_num_per_node <= 0:
    raise parser.error("--max-num-per-node must be > 0")
# Channels and output paths must pair up 1:1, or a single shared path.
    len(args.channel_name) != len(args.output_sft_path)
    and len(args.output_sft_path) != 1
    "--channel-name and --output-sft-path must be the "
    "same length or --output-sft-path must be length of 1"
# Multi-channel runs require the public SFT naming convention.
if len(args.channel_name) > 1 and args.observing_run == 0:
    "When creating SFTs from multiple channels, public SFT naming "
    "convention must be used: --observing-run > 0 and set "
    "--observing-kind and --observing-revision"
# OSDF-served frames are only reachable via HTCondor file transfer.
if args.datafind_urltype == "osdf" and not args.transfer_frame_files:
    "--transfer-frame-files must be specified when --datafind-urltype=osdf"
# Resolve the lalpulsar_MakeSFTs executable: an explicit path wins, then
# the MAKESFTS_PATH environment variable (left as a $ENV() condor macro),
# then the configured LALSuite bindir.
# NOTE(review): the else-branch markers in these chains are not visible in
# this chunk.
makeSFTsExe = "lalpulsar_MakeSFTs"
if args.makesfts_path:
    # A path may point directly at the executable or at its directory.
    if args.makesfts_path.is_file():
        makeSFTsExe = args.makesfts_path
        makeSFTsExe = args.makesfts_path / makeSFTsExe
elif "MAKESFTS_PATH" in os.environ:
    makeSFTsExe = Path("$ENV(MAKESFTS_PATH)") / makeSFTsExe
    makeSFTsExe = Path("@LALSUITE_BINDIR@") / makeSFTsExe

# Same resolution scheme for lalpulsar_MoveSFTs.
moveSFTsExe = "lalpulsar_MoveSFTs"
if args.movesfts_path:
    if args.movesfts_path.is_file():
        moveSFTsExe = args.movesfts_path
        moveSFTsExe = args.movesfts_path / moveSFTsExe
elif "MOVESFTS_PATH" in os.environ:
    moveSFTsExe = Path("$ENV(MOVESFTS_PATH)") / moveSFTsExe
    moveSFTsExe = Path("@LALSUITE_BINDIR@") / moveSFTsExe
# Make sure the log directory, the cache directory, and every SFT output
# directory exist before any job writes into them.
for directory in (args.log_path, args.cache_path, *args.output_sft_path):
    directory.mkdir(exist_ok=True)
# Build the list of analysis segments: either read from a segment file
# (two GPS times per line, keeping segments of at least --min-seg-length),
# or synthesize a single segment from the start/end/max-length options.
# Fix: the missing-end-time error previously reported the wrong option
# name ("--analysis-start-time" instead of "--analysis-end-time").
segList = segmentlist()
adjustSegExtraTime = False
if args.segment_file is not None:
    if args.min_seg_length < 0:
        raise parser.error("--min-seg-length must be >= 0")
    # Segment-file mode trims segment edges to align SFT boundaries.
    adjustSegExtraTime = True
    with open(args.segment_file) as fp_segfile:
        for idx, line in enumerate(fp_segfile):
            splitLine = line.split()
            oneSeg = segment(int(splitLine[0]), int(splitLine[1]))
            if abs(oneSeg) >= args.min_seg_length:
                segList.append(oneSeg)
        # NOTE(review): the emptiness guard wrapping this raise is not
        # visible in this chunk.
        raise ValueError(f"No segments found in segment file: {args.segment_file}")
    # No segment file: the explicit start/end/max-length options are
    # mandatory. NOTE(review): the else: marker and the raise wrappers
    # around these messages are not visible in this chunk.
    if args.analysis_start_time is None:
        "--analysis-start-time must be specified if no segment file is " "given"
    if args.analysis_end_time is None:
        "--analysis-end-time must be specified if no segment file is " "given"
    if args.max_length_all_jobs is None:
        "--max-length-all-jobs must be specified if no segment file is " "given"
    # Clamp the requested span to the maximum total data length.
    if args.analysis_end_time > (args.analysis_start_time + args.max_length_all_jobs):
        args.analysis_end_time = args.analysis_start_time + args.max_length_all_jobs
    oneSeg = segment(args.analysis_start_time, args.analysis_end_time)
    segList.append(oneSeg)
# Single-letter site identifier taken from the first channel name
# (e.g. "H" from "H1:...").
site = args.channel_name[0][0]

# Guard against frames that live on /home without HTCondor file transfer.
# NOTE(review): the condition and raise wrapping this message are only
# partially visible in this chunk.
if not args.transfer_frame_files:
    "--transfer-frame-files must be specified when frame files are in /home"

# Collect the data segment covered by each frame URL.
# NOTE(review): the for-loop header over the URL list is not visible here.
dataSegs = segmentlist()
    dataSegs.append(file_segment(url))

# Submit files are written next to the DAG file.
path_to_dag_file = args.dag_file.parent
dag_filename = args.dag_file.name
makesfts_sub = path_to_dag_file / "MakeSFTs.sub"
movesfts_sub = path_to_dag_file / "MoveSFTs.sub"
# Write the HTCondor submit file for the MakeSFTs jobs. The $(...) macros
# (argList, cachefile, framefiles, tagstring) are filled per-job by the
# VARS lines emitted in writeToDag().
with open(makesfts_sub, "w") as MakeSFTsFID:
    MakeSFTsLogFile = f"{args.log_path}/MakeSFTs_{dag_filename}.log"
    MakeSFTsFID.write("universe = vanilla\n")
    MakeSFTsFID.write(f"executable = {makeSFTsExe}\n")
    MakeSFTsFID.write("arguments = $(argList)\n")
    MakeSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MakeSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MakeSFTsFID.write(f"log = {MakeSFTsLogFile}\n")
    MakeSFTsFID.write(f"error = {args.log_path}/MakeSFTs_$(tagstring).err\n")
    MakeSFTsFID.write(f"output = {args.log_path}/MakeSFTs_$(tagstring).out\n")
    MakeSFTsFID.write("notification = never\n")
    MakeSFTsFID.write(f"request_memory = {args.request_memory}MB\n")
    MakeSFTsFID.write(f"request_disk = {args.request_disk}MB\n")
    MakeSFTsFID.write("RequestCpus = 1\n")
    MakeSFTsFID.write("should_transfer_files = yes\n")
    # The frame files only appear in transfer_input_files when HTCondor
    # file transfer of frames was requested.
    # NOTE(review): the else-branch marker between the two
    # transfer_input_files lines is not visible in this chunk.
    if args.transfer_frame_files:
        MakeSFTsFID.write("transfer_input_files = $(cachefile),$(framefiles)\n")
        MakeSFTsFID.write("transfer_input_files = $(cachefile)\n")
    if "MAKESFTS_PATH" in os.environ and not args.makesfts_path:
        MakeSFTsFID.write("getenv = MAKESFTS_PATH\n")
    # OSDF access needs a scitokens credential exposed to the job.
    if args.datafind_urltype == "osdf":
        MakeSFTsFID.write("use_oauth_services = scitokens\n")
        "environment = BEARER_TOKEN_FILE=$$(CondorScratchDir)/.condor_creds/scitokens.use\n"
    MakeSFTsFID.write("queue 1\n")
# Write the HTCondor submit file for the single MoveSFTs job, which runs
# in the local universe and moves SFTs to their final destinations.
with open(movesfts_sub, "w") as MoveSFTsFID:
    MoveSFTsLogFile = f"{args.log_path}/MoveSFTs_{dag_filename}.log"
    MoveSFTsFID.write("universe = local\n")
    MoveSFTsFID.write(f"executable = {moveSFTsExe}\n")
    MoveSFTsFID.write("arguments = ")
    # $(opts) carries --no-validate when SFT validation is disabled.
    if not args.validate:
        MoveSFTsFID.write("$(opts) ")
    MoveSFTsFID.write("-s $(sourcedirectory) -c $(channels) -d $(destdirectory)\n")
    MoveSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MoveSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MoveSFTsFID.write(f"log = {MoveSFTsLogFile}\n")
    MoveSFTsFID.write(f"error = {args.log_path}/MoveSFTs.err\n")
    MoveSFTsFID.write(f"output = {args.log_path}/MoveSFTs.out\n")
    MoveSFTsFID.write("notification = never\n")
    MoveSFTsFID.write(f"request_memory = 1GB\n")
    MoveSFTsFID.write(f"request_disk = 10MB\n")
    MoveSFTsFID.write("RequestCpus = 1\n")
    MoveSFTsFID.write("getenv = MOVESFTS_PATH\n")
    MoveSFTsFID.write("queue 1\n")
# Main DAG construction: iterate over the analysis segments, carve each
# into nodes of at most --max-num-per-node SFTs, and emit one MakeSFTs job
# per node followed by a single MoveSFTs child job.
# NOTE(review): the loop header over segList and several branch markers
# are not visible in this chunk; fragments are annotated below.
with open(args.dag_file, "w") as dagFID:
    startTimeAllNodes = None
    firstSFTstartTime = 0
    # Segment-file mode without synchronized starts: trim each segment so
    # that an integer number of (possibly overlapping) SFTs fits, sharing
    # the trimmed excess between the segment's two ends.
    if adjustSegExtraTime and not args.synchronize_start:
        segStartTime = seg[0]
        segExtraTime = (segEndTime - segStartTime) % args.time_baseline
        if args.overlap_fraction != 0.0:
            if (segEndTime - segStartTime) > args.time_baseline:
                segEndTime - segStartTime - args.time_baseline
                ) % int((1.0 - args.overlap_fraction) * args.time_baseline)
        # Split the excess between the start and the end of the segment.
        segExtraStart = int(segExtraTime / 2)
        segExtraEnd = segExtraTime - segExtraStart
        args.analysis_start_time = segStartTime + segExtraStart
        if args.analysis_start_time > segEndTime:
            args.analysis_start_time = segEndTime
        args.analysis_end_time = segEndTime - segExtraEnd
        if args.analysis_end_time < segStartTime:
            args.analysis_end_time = segStartTime
    # Synchronized-start mode: align each segment's first SFT start time
    # to the stride grid anchored at the very first segment's start.
    elif args.synchronize_start:
        segStartTime = seg[0]
        if firstSFTstartTime == 0:
            firstSFTstartTime = segStartTime
        args.analysis_start_time = (
            (segStartTime - firstSFTstartTime)
            / ((1.0 - args.overlap_fraction) * args.time_baseline)
            * (1.0 - args.overlap_fraction)
            * args.time_baseline
        if args.analysis_start_time > segEndTime:
            args.analysis_start_time = segEndTime
        # Largest stride-aligned end time that still fits in the segment.
        args.analysis_end_time = (
            (segEndTime - args.analysis_start_time - args.time_baseline)
            / ((1.0 - args.overlap_fraction) * args.time_baseline)
            * (1.0 - args.overlap_fraction)
            * args.time_baseline
            + args.time_baseline
            + args.analysis_start_time
        if args.analysis_end_time < segStartTime:
            args.analysis_end_time = segStartTime
        # Fallback (no trimming): use the segment bounds directly.
        args.analysis_start_time = seg[0]
        args.analysis_end_time = seg[1]
    # Walk through the segment SFT by SFT, accumulating SFTs onto the
    # current node until --max-num-per-node is reached, then emit the job.
    startTimeThisNode = args.analysis_start_time
    endTimeThisNode = args.analysis_start_time
    endTimeAllNodes = args.analysis_start_time
    while endTimeAllNodes < args.analysis_end_time:
        # Advance by one SFT stride (first SFT of a node advances by the
        # full baseline; overlapping SFTs advance by the reduced stride).
        if args.overlap_fraction != 0.0:
            endTimeAllNodes = endTimeAllNodes + args.time_baseline
            endTimeAllNodes = endTimeAllNodes + int(
                (1.0 - args.overlap_fraction) * args.time_baseline
            endTimeAllNodes = endTimeAllNodes + args.time_baseline
        if endTimeAllNodes <= args.analysis_end_time:
            numThisNode = numThisNode + 1
            numThisSeg = numThisSeg + 1
            endTimeThisNode = endTimeAllNodes
            if numThisNode < args.max_num_per_node:
            # Node is full: count it, remember the very first node start,
            # and begin the next node (overlapping back if requested).
            nodeCount = nodeCount + 1
            startTimeAllNodes = startTimeThisNode
            if args.overlap_fraction != 0.0:
                startTimeThisNode = endTimeThisNode - int(
                    (args.overlap_fraction) * args.time_baseline
                startTimeThisNode = endTimeThisNode
        # Segment exhausted with a partial node: emit it as a final job.
        nodeCount = nodeCount + 1
        startTimeAllNodes = startTimeThisNode
        dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args
    # All MakeSFTs jobs feed one MoveSFTs job that relocates the SFTs.
    dagFID.write(f"JOB MoveSFTs {Path(dagFID.name).parent / 'MoveSFTs.sub'}\n")
    dagFID.write(f"RETRY MoveSFTs 1\n")
    dagFID.write(f"VARS MoveSFTs ")
    if not args.validate:
        dagFID.write('opts="--no-validate" ')
        f'sourcedirectory="." '
        f"channels=\"{' '.join(args.channel_name)}\" "
        f"destdirectory=\"{' '.join([str(p) for p in args.output_sft_path])}\"\n"
        f"PARENT {' '.join([f'MakeSFTs_{n}' for n in range(1, nodeCount+1)])} CHILD MoveSFTs\n"
# Final sanity checks: at least one job must have been written to the DAG.
endTimeAllNodes = endTimeThisNode

if startTimeAllNodes is None:
    raise Exception("The startTimeAllNodes == none; the DAG file contains no jobs!")

# NOTE(review): the raise wrapping this message is not visible in this chunk.
if endTimeAllNodes <= startTimeAllNodes:
    "The endTimeAllNodes <= startTimeAllNodes; the DAG file contains no jobs!"

# Report the overall GPS span covered by the generated jobs.
print(startTimeAllNodes, endTimeAllNodes)
##### DEPRECATED OPTIONS #####
def __call__(self, parser, namespace, values, option_string=None)
def make_cache(urls, job_seg)
Make a frame list and cache list from a list of URLs.
def sft_name_from_vars(obs, gpsstart, Tsft, channel=None, kind=None, rev=None, window="unknown", par=None, miscstr=None)
Create SFT file name from specification.
def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args)
Write one job to DAG file.
def get_urls(args)
Get frame file URL list from gwdatafind or cache file.