Skip to content

dags

DAG

DAG(name='workflow', formatter=None, *args, **kwargs)

Bases: DAG

Defines a DAGMan workflow including the execution graph and configuration.

Parameters:

Name Type Description Default
name str

The name of the DAG workflow, used for files written to disk and for DAG submission when calling write() and submit(). Defaults to "workflow".

'workflow'
formatter NodeNameFormatter

Controls how node names are constructed and formatted. Defaults to a hex-based formatter with 5 digits.

None
*args

Any positional arguments that htcondor.dags.DAG accepts

()
**kwargs

Any keyword arguments that htcondor.dags.DAG accepts

{}
Source code in ezdag/dags.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    name: str = "workflow",
    formatter: Optional[dags.NodeNameFormatter] = None,
    *args,
    **kwargs,
) -> None:
    # any remaining arguments are forwarded untouched to the parent DAG class
    super().__init__(*args, **kwargs)

    # workflow name; used to name the DAG file (and script) written to disk
    self.name = name
    # location of the DAG file on disk, unset until the DAG has been written
    self._dag_path: Optional[str] = None

    # fall back to the hex-based formatter unless a (truthy) one was supplied
    self.formatter = formatter if formatter else HexFormatter()

    # bookkeeping that gets filled in as layers are attached:
    #   _layers         layer name -> attached Layer
    #   _node_layers    layer name -> node layer created for it in the DAG
    #   _ordered_layers node layers in the order they were attached
    #   _provides       data product -> (providing layer name, node index)
    self._layers: Dict[str, Layer] = {}
    self._node_layers: Dict[str, dags.NodeLayer] = {}
    self._ordered_layers: List[dags.NodeLayer] = []
    self._provides: Dict[str, Tuple[str, int]] = {}

attach

attach(layer)

Attach a layer of related job nodes to this DAG.

Parameters:

Name Type Description Default
layer Layer

The layer to attach.

required
Source code in ezdag/dags.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def attach(self, layer: Layer) -> None:
    """Register a layer with this DAG and connect it to its parent layers.

    Parents are found by matching the inputs required by each node of the
    layer against the outputs provided by nodes of previously attached
    layers. A layer with no matching parent is added without any parents.

    Parameters
    ----------
    layer
        The layer to attach.

    Raises
    ------
    KeyError
        If a layer with the same name has already been attached.

    """
    name = layer.name
    if name in self._layers:
        raise KeyError(f"{name} layer already added to DAG")
    self._layers[layer.name] = layer

    # parent layer name -> set of (parent node index, child node index)
    edges_by_parent = defaultdict(set)
    if layer.has_dependencies:
        for child_idx, node in enumerate(layer.nodes):
            for required in node.requires:
                provider = self._provides.get(required)
                if provider is not None:
                    parent_name, parent_idx = provider
                    edges_by_parent[parent_name].add((parent_idx, child_idx))

    if edges_by_parent:
        linked = iter(edges_by_parent.items())

        # the first parent creates the node layer as one of its children
        first_parent, first_edges = next(linked)
        edge = self._get_edge_type(first_parent, layer.name, first_edges)
        node_layer = self._node_layers[first_parent].child_layer(
            **layer.config(self.formatter), edge=edge
        )
        self._node_layers[name] = node_layer
        self._ordered_layers.append(node_layer)

        # every further parent is connected to the already created layer
        for parent, edges in linked:
            edge = self._get_edge_type(parent, layer.name, edges)
            self._node_layers[name].add_parents(
                self._node_layers[parent], edge=edge
            )
    else:
        # no dependencies, or none that an attached layer provides
        node_layer = self.layer(**layer.config(self.formatter))
        self._node_layers[name] = node_layer
        self._ordered_layers.append(node_layer)

    # record the data products of this layer only now, after its own edges
    # were resolved, so they are available to layers attached afterwards
    for idx, node in enumerate(layer.nodes):
        for output in node.provides:
            self._provides[output] = (name, idx)

create_log_dir

create_log_dir(log_dir=Path('logs'))

Create the log directory where job logs are stored.

Parameters:

Name Type Description Default
log_dir Path

The directory to create logs in. Defaults to ./logs.

Path('logs')
Source code in ezdag/dags.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def create_log_dir(self, log_dir: Path = Path("logs")) -> None:
    """Create the log directory where job logs are stored.

    Deprecated: log directories are created automatically upon DAG
    generation, so calling this method is no longer needed.

    Parameters
    ----------
    log_dir : Path
        The directory to create logs in. Defaults to ./logs.

    Warns
    -----
    DeprecationWarning
        Always, as this method is scheduled for removal.

    """
    warnings.warn(
        "create_log_dir has been deprecated in favor of automatically "
        "creating log directories upon DAG generation. this method "
        "will be removed in a future release",
        DeprecationWarning,
        # attribute the warning to the calling code rather than this module
        stacklevel=2,
    )
    # missing parent directories are created; an existing one is not an error
    os.makedirs(log_dir, exist_ok=True)

submit

submit(path=Path.cwd(), write_script=False, **kwargs)

Submit the DAG via HTCondor.

If the DAG has not already been written to disk, do so as well. This is equivalent to calling write() prior to submission, making use of the path and write_script arguments for doing so. See DAG.write for more information.

Parameters:

Name Type Description Default
path Path

The directory to write the DAG files to. Defaults to the current working directory.

cwd()
write_script bool

Also write out the list of commands for each node to disk. Defaults to false.

False
**kwargs

Any keyword arguments that condor_submit_dag accepts. See htcondor.Submit.from_dag for more information.

{}
Source code in ezdag/dags.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def submit(
    self, path: Optional[Path] = None, write_script: bool = False, **kwargs
) -> None:
    """Submit the DAG via HTCondor.

    If the DAG has not already been written to disk, do so as well.
    This is equivalent to calling write() prior to submission, making
    use of the `path` and `write_script` arguments for doing so. See
    DAG.write for more information.

    If the DAG has already been written, `path` and `write_script` are
    ignored and the previously written DAG file is submitted.

    Parameters
    ----------
    path : Path, optional
        The directory to write the DAG files to. Defaults to the current working
        directory at the time of the call.
    write_script : bool
        Also write out the list of commands for each node to disk. Defaults
        to false.
    **kwargs
        Any keyword arguments that `condor_submit_dag` accepts. See
        [htcondor.Submit.from_dag](https://htcondor.readthedocs.io/en/latest/apis/python-bindings/api/htcondor.html#htcondor.Submit.from_dag)
        for more information.

    """
    # write DAG to disk if not already done
    if not self._dag_path:
        # resolve the default here: a `Path.cwd()` default in the signature
        # is evaluated once at import time and goes stale after a chdir
        path = Path.cwd() if path is None else Path(path)
        self.write(path, write_script)
        self._dag_path = str(path / f"{self.name}.dag")

    # submit the DAG; caller-supplied options take precedence over UseDagDir
    submit_kwargs = {"UseDagDir": True, **kwargs}
    dag_submit = htcondor.Submit.from_dag(self._dag_path, submit_kwargs)
    htcondor.Schedd().submit(dag_submit)

write

write(path=Path.cwd(), write_script=False)

Write out the given DAG to the given directory.

This includes the DAG description file itself, as well as any associated submit descriptions and log directories.

Also optionally writes out the list of commands for each node, which represents commands that would be run on the execute point, after taking into account file location changes where the job would be run if file transfer is enabled.

Parameters:

Name Type Description Default
path Path

The directory to write the DAG files to. Defaults to the current working directory.

cwd()
write_script bool

Also write out the list of commands for each node to disk. Defaults to false.

False
Source code in ezdag/dags.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def write(self, path: Optional[Path] = None, write_script: bool = False) -> None:
    """Write out the given DAG to the given directory.

    This includes the DAG description file itself, as well as any
    associated submit descriptions and log directories.

    Also optionally writes out the list of commands for each node, which
    represents commands that would be run on the execute point, after
    taking into account file location changes where the job would be run if
    file transfer is enabled.

    The DAG description file is named ``<name>.dag`` and the optional script
    ``<name>.sh``, where ``<name>`` is the name of this DAG.

    Parameters
    ----------
    path : Path, optional
        The directory to write the DAG files to. Defaults to the current working
        directory at the time of the call.
    write_script : bool
        Also write out the list of commands for each node to disk. Defaults
        to false.

    """
    # resolve the default here: a `Path.cwd()` default in the signature is
    # evaluated once at import time and goes stale after a chdir. coercing
    # to Path also lets plain string paths through the `/` join below
    path = Path.cwd() if path is None else Path(path)

    dag_file = f"{self.name}.dag"
    self._write_dag(dag_file, path=path)
    # remember where the DAG file lives so submit() does not write it again
    self._dag_path = str(path / dag_file)
    if write_script:
        self._write_script(f"{self.name}.sh", path=path)

write_dag

write_dag(filename, path=Path.cwd(), **kwargs)

Write out the given DAG to the given directory.

This includes the DAG description file itself, as well as any associated submit descriptions and log directories.

Parameters:

Name Type Description Default
filename str

The name of the DAG description file itself, e.g. my_dag.dag.

required
path Path

The directory to write the DAG files to. Defaults to the current working directory.

cwd()
**kwargs

Any other keyword arguments that htcondor.dags.write_dag accepts

{}
Source code in ezdag/dags.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def write_dag(self, filename: str, path: Optional[Path] = None, **kwargs) -> None:
    """Write out the given DAG to the given directory.

    This includes the DAG description file itself, as well as any
    associated submit descriptions and log directories.

    Deprecated: use write() instead.

    Parameters
    ----------
    filename : str
        The name of the DAG description file itself, e.g. my_dag.dag.
    path : Path, optional
        The directory to write the DAG files to. Defaults to the current working
        directory at the time of the call.
    **kwargs
        Any other keyword arguments that htcondor.dags.write_dag accepts

    Warns
    -----
    DeprecationWarning
        Always, as this method is scheduled for removal.

    """
    warnings.warn(
        "write_dag has been deprecated in favor of write. "
        "this method will be removed in a future release",
        DeprecationWarning,
        # attribute the warning to the calling code rather than this module
        stacklevel=2,
    )
    # resolve the default here: a `Path.cwd()` default in the signature is
    # evaluated once at import time and goes stale after a chdir
    if path is None:
        path = Path.cwd()
    self._write_dag(filename, path, **kwargs)

write_script

write_script(filename, path=Path.cwd())

Write out the list of commands for each node to the given directory.

This represents commands that would be run on the execute point, after taking into account file location changes where the job would be run if file transfer is enabled.

Parameters:

Name Type Description Default
filename str

The name of the script file itself, e.g. my_dag.sh.

required
path Path

The directory to write the script file to. Defaults to the current working directory.

cwd()
Source code in ezdag/dags.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def write_script(
    self,
    filename: str,
    path: Optional[Path] = None,
) -> None:
    """Write out the list of commands for each node to the given directory.

    This represents commands that would be run on the execute point, after
    taking into account file location changes where the job would be run if
    file transfer is enabled.

    Deprecated: use write() with ``write_script=True`` instead.

    Parameters
    ----------
    filename : str
        The name of the script file itself, e.g. my_dag.sh.
    path : Path, optional
        The directory to write the script file to. Defaults to the current working
        directory at the time of the call.

    Warns
    -----
    DeprecationWarning
        Always, as this method is scheduled for removal.

    """
    warnings.warn(
        # name this method; the message previously referred to write_dag
        "write_script has been deprecated in favor of write. "
        "this method will be removed in a future release",
        DeprecationWarning,
        # attribute the warning to the calling code rather than this module
        stacklevel=2,
    )
    # resolve the default here: a `Path.cwd()` default in the signature is
    # evaluated once at import time and goes stale after a chdir
    if path is None:
        path = Path.cwd()
    self._write_script(filename, path)

EdgeConnector

EdgeConnector(indices)

Bases: BaseEdge

This edge connects individual nodes in layers given an explicit mapping.

Source code in ezdag/dags.py
335
336
def __init__(self, indices) -> None:
    # explicit mapping describing which individual nodes this edge connects;
    # stored as given, without copying or validation
    # NOTE(review): presumably (parent index, child index) pairs — confirm
    self.indices = indices

write_dag

write_dag(dag, dag_dir=Path.cwd(), formatter=None, **kwargs)

Write out the given DAG to the given directory.

This includes the DAG description file itself, as well as any associated submit descriptions.

Parameters:

Name Type Description Default
dag DAG

The DAG to write.

required
dag_dir Path

The directory to write the DAG files to. Defaults to the current working directory.

cwd()
formatter NodeNameFormatter

Controls how node names are constructed and formatted. Defaults to a hex-based formatter with 5 digits.

None
**kwargs

Any other keyword arguments that htcondor.dags.write_dag accepts

{}
Source code in ezdag/dags.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def write_dag(
    dag: dags.DAG,
    dag_dir: Optional[Path] = None,
    formatter: Optional[dags.NodeNameFormatter] = None,
    **kwargs,
) -> Path:
    """Write out the given DAG to the given directory.

    This includes the DAG description file itself, as well as any associated
    submit descriptions.

    Deprecated: use DAG.write instead.

    Parameters
    ----------
    dag : DAG
        The DAG to write.
    dag_dir : Path, optional
        The directory to write the DAG files to. Defaults to the current working
        directory at the time of the call.
    formatter : htcondor.dags.NodeNameFormatter
        Controls how node names are constructed and formatted. Defaults to a
        hex-based formatter with 5 digits.
    **kwargs
        Any other keyword arguments that htcondor.dags.write_dag accepts

    Returns
    -------
    Path
        Whatever htcondor.dags.write_dag returns for the written DAG.

    Warns
    -----
    DeprecationWarning
        Always, as this function is scheduled for removal.

    """
    warnings.warn(
        "write_dag has been deprecated in favor of DAG.write. "
        "this method will be removed in a future release",
        DeprecationWarning,
        # attribute the warning to the calling code rather than this module
        stacklevel=2,
    )
    # resolve the default here: a `Path.cwd()` default in the signature is
    # evaluated once at import time and goes stale after a chdir
    if dag_dir is None:
        dag_dir = Path.cwd()
    if not formatter:
        formatter = HexFormatter()
    return dags.write_dag(dag, dag_dir, node_name_formatter=formatter, **kwargs)