dags

DAG

DAG(name='workflow', formatter=None, *args, **kwargs)

Bases: htcondor.dags.DAG

Defines a DAGMan workflow including the execution graph and configuration.

Parameters:

    name : str, default 'workflow'
        The name of the DAG workflow, used for files written to disk and for
        DAG submission when calling write() and submit(). Defaults to
        "workflow".
    formatter : NodeNameFormatter, default None
        Defines how node names are generated and formatted. Defaults to a
        hex-based formatter with 5 digits.
    *args
        Any positional arguments that htcondor.dags.DAG accepts.
    **kwargs
        Any keyword arguments that htcondor.dags.DAG accepts.

Source code in ezdag/dags.py, lines 44-63:
def __init__(
    self,
    name: str = "workflow",
    formatter: dags.NodeNameFormatter | None = None,
    *args,
    **kwargs,
) -> None:
    super().__init__(*args, **kwargs)
    self.name = name
    self._node_layers: dict[str, dags.NodeLayer] = {}
    self._ordered_layers: list[dags.NodeLayer] = []
    self._layers: dict[str, Layer] = {}
    self._provides: dict[str, tuple[str, int]] = {}
    self._all_edges: dict[str, dict[str, set[tuple[int, int]]]] = {}
    if formatter:
        self.formatter = formatter
    else:
        self.formatter = HexFormatter()
    self._dag_path: str | None = None
    self._layer_count: Counter[str] = Counter()
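
As a brief usage sketch (assuming DAG is importable from the top-level ezdag package; it is defined in ezdag.dags):

# a minimal sketch; the top-level import path is an assumption
from ezdag import DAG

# "my_analysis" is used for files written to disk (my_analysis.dag) and for
# DAG submission via write() / submit()
dag = DAG(name="my_analysis")

# a custom htcondor.dags.NodeNameFormatter may be passed via formatter=...;
# when omitted, a hex-based formatter with 5 digits is used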

attach

attach(layer)

Attach a layer of related job nodes to this DAG.

Parameters:

    layer : Layer, required
        The layer to attach.

Source code in ezdag/dags.py, lines 65-123:
def attach(self, layer: Layer) -> None:
    """Attach a layer of related job nodes to this DAG.

    Parameters
    ----------
    layer
        The layer to attach.

    """
    self._layer_count[layer.name] += 1

    # layer naming - append number for duplicate layers
    layer_config = layer.config(self.formatter)
    if self._layer_count[layer.name] > 1:
        layer_name = f"{layer.name}{self._layer_count[layer.name]}"
        layer_config["name"] = layer_name
    else:
        layer_name = layer.name
    self._layers[layer_name] = layer

    # determine parent-child relationships and connect accordingly
    all_edges = defaultdict(set)
    if layer.has_dependencies:
        # determine edges
        for child_idx, node in enumerate(layer.nodes):
            for input_ in node.requires:
                if input_ in self._provides:
                    parent_name, parent_idx = self._provides[input_]
                    all_edges[parent_name].add((parent_idx, child_idx))
        self._all_edges[layer_name] = all_edges

        if not all_edges:
            node_layer = self.layer(**layer_config)
            self._node_layers[layer_name] = node_layer
            self._ordered_layers.append(node_layer)

        # determine edge type and connect
        for num, (parent, edges) in enumerate(all_edges.items()):
            edge = self._get_edge_type(parent, layer_name, edges)
            if num == 0:
                node_layer = self._node_layers[parent].child_layer(
                    **layer_config, edge=edge
                )
                self._node_layers[layer_name] = node_layer
                self._ordered_layers.append(node_layer)
            else:
                self._node_layers[layer_name].add_parents(
                    self._node_layers[parent], edge=edge
                )

    else:
        node_layer = self.layer(**layer_config)
        self._node_layers[layer_name] = node_layer
        self._ordered_layers.append(node_layer)

    # register any data products the layer provides
    for idx, node in enumerate(layer.nodes):
        for output in node.provides:
            self._provides[output] = (layer_name, idx)
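
As an illustrative sketch of how attach wires layers together: edges are inferred by matching each node's required inputs against outputs registered by previously attached layers. The Layer, Node, and Argument constructors below are assumptions about the wider ezdag API; only attach itself is documented here.

# hedged sketch; the Layer/Node/Argument usage below is assumed, not documented here
from ezdag import DAG, Layer, Node, Argument

dag = DAG("workflow")

# a layer whose single node registers data.txt as a provided product
producer = Layer("make_data")                              # assumed Layer(executable) signature
producer += Node(outputs=Argument("output", "data.txt"))   # assumed Node/Argument keywords
dag.attach(producer)

# a layer whose node requires data.txt; attach() connects it as a child of
# the producer layer because the input matches a registered output
consumer = Layer("use_data")
consumer += Node(inputs=Argument("input", "data.txt"))
dag.attach(consumer)

# attaching another layer named "use_data" would be stored as "use_data2"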

create_log_dir

create_log_dir(log_dir=Path('logs'))

Create the log directory where job logs are stored.

Parameters:

    log_dir : Path, default Path('logs')
        The directory to create logs in. Defaults to ./logs.

Source code in ezdag/dags.py, lines 125-141:
def create_log_dir(self, log_dir: Path = Path("logs")) -> None:
    """Create the log directory where job logs are stored.

    Parameters
    ----------
    log_dir : Path
        The directory to create logs in. Defaults to ./logs.

    """
    warnings.warn(
        "create_log_dir has been deprecated in favor of automatically "
        "creating log directories upon DAG generation. this method "
        "will be removed in a future release",
        DeprecationWarning,
        stacklevel=2,
    )
    os.makedirs(log_dir, exist_ok=True)

submit

submit(path=None, *, write_script=False, **kwargs)

Submit the DAG via HTCondor.

If the DAG has not already been written to disk, do so as well. This is equivalent to calling write() prior to submission, making use of the path and write_script arguments for doing so. See DAG.write for more information.

Parameters:

    path : Path, default None
        The directory to write the DAG files to. Defaults to the current
        working directory.
    write_script : bool, default False
        Also write out the list of commands for each node to disk. Defaults
        to False.
    **kwargs
        Any keyword arguments that condor_submit_dag accepts. See
        htcondor.Submit.from_dag for more information.

Returns:

    SubmitResult
        The submit result containing the cluster ID and ClassAd of the
        submitted DAG.

Source code in ezdag/dags.py, lines 173-214:
def submit(
    self, path: Path | None = None, *, write_script: bool = False, **kwargs
) -> htcondor.SubmitResult:
    """Submit the DAG via HTCondor.

    If the DAG has not already been written to disk, do so as well.
    This is equivalent to calling write() prior to submission, making
    use of the `path` and `write_script` arguments for doing so. See
    DAG.write for more information.

    Parameters
    ----------
    path : Path
        The directory to write the DAG files to. Defaults to the current working
        directory.
    write_script : bool
        Also write out the list of commands for each node to disk. Defaults
        to false.
    **kwargs
        Any keyword arguments that `condor_submit_dag` accepts. See
        [htcondor.Submit.from_dag](https://htcondor.readthedocs.io/en/latest/apis/python-bindings/api/htcondor.html#htcondor.Submit.from_dag)
        for more information.

    Returns
    -------
    htcondor.SubmitResult
        The submit result containing the cluster ID and ClassAd of the
        submitted DAG.

    """
    if not path:
        path = Path.cwd()

    # write DAG to disk if not already done
    if not self._dag_path:
        self.write(path, write_script=write_script)
        self._dag_path = str(path / f"{self.name}.dag")

    # submit the DAG
    submit_kwargs = {"UseDagDir": True, **kwargs}
    dag_submit = htcondor.Submit.from_dag(self._dag_path, submit_kwargs)
    return htcondor.Schedd().submit(dag_submit)
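
A minimal submission sketch, reusing a dag built as above; the run directory name is hypothetical:

from pathlib import Path

# writes the DAG into rundir (if not already written) and submits it to the
# local schedd; extra keyword arguments are forwarded as condor_submit_dag options
result = dag.submit(Path("rundir"), write_script=True)
print(f"submitted DAG as cluster {result.cluster()}")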

visualize

visualize(path=None, image_format='svg')

Visualize a DAG.

Note: This requires graphviz to be installed.

Parameters:

    path : Path, default None
        The directory to write the graph diagram to. Defaults to the current
        working directory; the file is written as {workflow name}.{image_format}.
    image_format : str, default 'svg'
        The output file format to use. Defaults to 'svg'.

Source code in ezdag/dags.py, lines 216-260:
def visualize(self, path: Path | None = None, image_format: str = "svg") -> None:
    """Visualize a DAG.

    Note: This requires graphviz to be installed.

    Parameters
    ----------
    path : Path
        The directory to write the graph diagram to. Defaults to the
        current working directory as {workflow}.{image_format}.
    image_format : str
        The output file format to use. Defaults to 'svg'.

    """
    try:
        import graphviz
    except ImportError as exc:
        msg = "graphviz needs to be installed to visualize DAGs"
        raise ImportError(msg) from exc

    if not path:
        path = Path.cwd()

    # create the graph
    graph = graphviz.Digraph()
    for layer in self.walk(dags.WalkOrder("BREADTH")):
        for idx in range(len(layer.vars)):
            node_name = self.formatter.generate(layer.name, idx)
            graph.node(node_name.replace(":", "-"))
        for parent, edges in self._all_edges[layer.name].items():
            for parent_idx, child_idx in edges:
                parent_name = self.formatter.generate(parent, parent_idx)
                child_name = self.formatter.generate(layer.name, child_idx)
                graph.edge(
                    parent_name.replace(":", "-"),
                    child_name.replace(":", "-"),
                )

    # write to disk
    graph.render(
        filename=self.name,
        directory=str(path),
        format=image_format,
        cleanup=True,
    )
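
For example, to render the graph next to the DAG files (requires the graphviz Python package; the directory name is hypothetical):

from pathlib import Path

# writes <workflow name>.png into the given directory
dag.visualize(Path("rundir"), image_format="png")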

write

write(path=None, *, write_script=False)

Write out the DAG to the given directory.

This includes the DAG description file itself, as well as any associated submit descriptions and log directories.

Optionally, also write out the list of commands for each node. These are the commands that would be run on the execute point, taking into account where files will be located if file transfer is enabled.

Parameters:

    path : Path, default None
        The directory to write the DAG files to. Defaults to the current
        working directory.
    write_script : bool, default False
        Also write out the list of commands for each node to disk. Defaults
        to False.

Source code in ezdag/dags.py, lines 143-171:
def write(self, path: Path | None = None, *, write_script: bool = False) -> None:
    """Write out the given DAG to the given directory.

    This includes the DAG description file itself, as well as any
    associated submit descriptions and log directories.

    Also optionally writes out the list of commands for each node, which
    represents commands that would be run on the execute point, after
    taking into account file location changes where the job would be run if
    file transfer is enabled.

    Parameters
    ----------
    path : Path
        The directory to write the DAG files to. Defaults to the current working
        directory.
    write_script : bool
        Also write out the list of commands for each node to disk. Defaults
        to false.

    """
    if not path:
        path = Path.cwd()

    dag_file = f"{self.name}.dag"
    self._write_dag(dag_file, path=path)
    self._dag_path = str(path / dag_file)
    if write_script:
        self._write_script(f"{self.name}.sh", path=path)
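
A short sketch; the run directory name is hypothetical:

from pathlib import Path

# writes rundir/<name>.dag plus the associated submit descriptions and log
# directories; write_script=True additionally writes rundir/<name>.sh
dag.write(Path("rundir"), write_script=True)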

write_dag

write_dag(filename, path=None, **kwargs)

Write out the DAG to the given directory.

This includes the DAG description file itself, as well as any associated submit descriptions and log directories.

Parameters:

    filename : str, required
        The name of the DAG description file itself, e.g. my_dag.dag.
    path : Path, default None
        The directory to write the DAG files to. Defaults to the current
        working directory.
    **kwargs
        Any other keyword arguments that htcondor.dags.write_dag accepts.

Source code in ezdag/dags.py, lines 262-287:
def write_dag(self, filename: str, path: Path | None = None, **kwargs) -> None:
    """Write out the given DAG to the given directory.

    This includes the DAG description file itself, as well as any
    associated submit descriptions and log directories.

    Parameters
    ----------
    filename : str
        The name of the DAG description file itself, e.g. my_dag.dag.
    path : Path
        The directory to write the DAG files to. Defaults to the current working
        directory.
    **kwargs
        Any other keyword arguments that htcondor.dags.write_dag accepts

    """
    warnings.warn(
        "write_dag has been deprecated in favor of write. "
        "this method will be removed in a future release",
        DeprecationWarning,
        stacklevel=2,
    )
    if not path:
        path = Path.cwd()
    self._write_dag(filename, path, **kwargs)
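
For comparison, the deprecated call and its supported replacement (file and directory names are hypothetical):

from pathlib import Path

dag.write_dag("workflow.dag", Path("rundir"))   # deprecated; emits a DeprecationWarning
dag.write(Path("rundir"))                       # preferred; writes <name>.dag instead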

write_script

write_script(filename, path=None)

Write out the list of commands for each node to the given directory.

These are the commands that would be run on the execute point, taking into account where files will be located if file transfer is enabled.

Parameters:

    filename : str, required
        The name of the script file itself, e.g. my_dag.sh.
    path : Path, default None
        The directory to write the script file to. Defaults to the current
        working directory.

Source code in ezdag/dags.py, lines 289-317:
def write_script(
    self,
    filename: str,
    path: Path | None = None,
) -> None:
    """Write out the list of commands for each node to the given directory.

    This represents commands that would be run on the execute point, after
    taking into account file location changes where the job would be run if
    file transfer is enabled.

    Parameters
    ----------
    filename : str
        The name of the script file itself, e.g. my_dag.sh.
    path : Path
        The directory to write the script file to. Defaults to the current working
        directory.

    """
    warnings.warn(
        "write_dag has been deprecated in favor of write. "
        "this method will be removed in a future release",
        DeprecationWarning,
        stacklevel=2,
    )
    if not path:
        path = Path.cwd()
    self._write_script(filename, path)

EdgeConnector

EdgeConnector(indices)

Bases: BaseEdge

This edge connects individual nodes across layers using an explicit index mapping.

Source code in ezdag/dags.py, lines 413-414:
def __init__(self, indices) -> None:
    self.indices = indices
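
As an illustration only, assuming indices is a collection of (parent index, child index) pairs matching the edge tuples collected by DAG.attach above:

# assumed shape of the mapping: pairs of (parent node index, child node index)
edge = EdgeConnector({(0, 0), (0, 1), (1, 2)})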

write_dag

write_dag(dag, dag_dir=None, formatter=None, **kwargs)

Write out the given DAG to the given directory.

This includes the DAG description file itself, as well as any associated submit descriptions.

Parameters:

    dag : DAG, required
        The DAG to write.
    dag_dir : Path, default None
        The directory to write the DAG files to. Defaults to the current
        working directory.
    formatter : NodeNameFormatter, default None
        Defines how node names are generated and formatted. Defaults to a
        hex-based formatter with 5 digits.
    **kwargs
        Any other keyword arguments that htcondor.dags.write_dag accepts.

Source code in ezdag/dags.py, lines 421-456:
def write_dag(
    dag: dags.DAG,
    dag_dir: Path | None = None,
    formatter: dags.NodeNameFormatter | None = None,
    **kwargs,
) -> Path:
    """Write out the given DAG to the given directory.

    This includes the DAG description file itself, as well as any associated
    submit descriptions.

    Parameters
    ----------
    dag : DAG
        The DAG to write.
    dag_dir : Path
        The directory to write the DAG files to. Defaults to the current working
        directory.
    formatter : htcondor.dags.NodeNameFormatter
        Defines how the node names are defined and formatted. Defaults to a
        hex-based formatter with 5 digits.
    **kwargs
        Any other keyword arguments that htcondor.dags.write_dag accepts

    """
    warnings.warn(
        "write_dag has been deprecated in favor of DAG.write. "
        "this method will be removed in a future release",
        DeprecationWarning,
        stacklevel=2,
    )
    if not dag_dir:
        dag_dir = Path.cwd()
    if not formatter:
        formatter = HexFormatter()
    return dags.write_dag(dag, dag_dir, node_name_formatter=formatter, **kwargs)