Skip to content

sgn.apps

Pipeline class and related utilities to establish and execute a graph of element tasks.

Pipeline

A Pipeline is essentially a directed acyclic graph of tasks that process frames.

These tasks are grouped using Pads and Elements. The Pipeline class is responsible for registering methods to produce source, transform and sink elements and to assemble those elements in a directed acyclic graph. It also establishes an event loop to execute the graph asynchronously.

Source code in sgn/apps.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
class Pipeline:
    """A Pipeline is essentially a directed acyclic graph of tasks that process frames.

    These tasks are grouped using Pads and Elements. The Pipeline class is responsible
    for registering methods to produce source, transform and sink elements and to
    assemble those elements in a directed acyclic graph. It also establishes an event
    loop to execute the graph asynchronously.
    """

    def __init__(self) -> None:
        """Class to establish and execute a graph of elements that will process frames.

        Registers methods to produce source, transform and sink elements and to assemble
        those elements in a directed acyclic graph. Also establishes an event loop.

        Attributes set here:
            _registry: name -> Pad or Element; used by ``link`` to resolve pad names
                and by ``insert`` to reject duplicate names
            graph: target pad -> set of pads it depends on (its sources)
            loop: the asyncio event loop that ``run`` drives
            sinks: name -> SinkElement; used to detect end of stream
            elements: every inserted element, in insertion order
        """
        self._registry: dict[str, Union[Pad, Element]] = {}
        self.graph: dict[Pad, set[Pad]] = {}
        try:
            # Reuse the current event loop when one is available.
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # get_event_loop() raises when no current loop can be provided (e.g.
            # after set_event_loop(None), and on Python 3.14+ whenever no loop has
            # been set), so create one and install it as the current loop.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        self.loop = loop
        self.__loop_counter = 0
        self.sinks: dict[str, SinkElement] = {}
        self.elements: list[Element] = []

    def insert(
        self,
        *elements: Element,
        link_map: Optional[dict[Union[str, SinkPad], Union[str, SourcePad]]] = None,
    ) -> Pipeline:
        """Insert element(s) into the pipeline.

        Each element and each of its pads is registered by name. Sink elements are
        tracked for end-of-stream detection. The element's own graph is merged into
        the pipeline graph.

        Args:
            *elements:
                Iterable[Element], the ordered elements to insert into the pipeline
            link_map:
                Optional[dict[Union[str, SinkPad], Union[str, SourcePad]]], a mapping
                from sink pads (keys) to the source pads (values) that feed them,
                given as pad objects or registered pad names. If provided, it is
                passed to ``link`` after all elements have been inserted.

        Returns:
            Pipeline, the pipeline with the elements inserted

        Raises:
            AssertionError: if an element is not element-like, or if an element or
                pad name is already registered in this pipeline
        """
        for element in elements:
            assert isinstance(
                element, ElementLike
            ), f"Element {element} is not an instance of a sgn.Element"
            assert (
                element.name not in self._registry
            ), f"Element name '{element.name}' is already in use in this pipeline"
            self._registry[element.name] = element
            for pad in element.pad_list:
                if (
                    pad is not None
                ):  # Stupid mypy kludge, remove once python3.9 is dropped
                    assert (
                        pad.name not in self._registry
                    ), f"Pad name '{pad.name}' is already in use in this pipeline"
                    self._registry[pad.name] = pad
            if isinstance(element, SinkElement):
                self.sinks[element.name] = element
            self.graph.update(element.graph)
            self.elements.append(element)
        if link_map is not None:
            self.link(link_map)
        return self

    def link(
        self, link_map: dict[Union[str, SinkPad], Union[str, SourcePad]]
    ) -> Pipeline:
        """Link pads in a pipeline.

        Args:
            link_map:
                dict[Union[str, SinkPad], Union[str, SourcePad]], a mapping from sink
                pads (keys) to the source pads (values) that feed them. Each pad may
                be given as a pad object or as its registered name. Data flows from
                value -> key.

        Returns:
            Pipeline, this pipeline, so calls can be chained

        Raises:
            KeyError: if a pad name is not registered in this pipeline
            AssertionError: if a key does not resolve to a SinkPad or a value does
                not resolve to a SourcePad
        """
        for sink_ref, source_ref in link_map.items():
            # Names are resolved through the registry; pad objects pass through.
            if isinstance(sink_ref, str):
                sink_pad = self._registry[sink_ref]
            else:
                sink_pad = sink_ref
            if isinstance(source_ref, str):
                source_pad = self._registry[source_ref]
            else:
                source_pad = source_ref

            assert isinstance(sink_pad, SinkPad), f"not a sink pad: {sink_pad}"
            assert isinstance(source_pad, SourcePad), f"not a source pad: {source_pad}"

            self.graph.update(sink_pad.link(source_pad))

        return self

    def nodes(self, pads: bool = True, intra: bool = False) -> tuple[str, ...]:
        """Get the nodes in the pipeline.

        Args:
            pads:
                bool, whether to include pads in the graph. If True, the graph will only
                consist of pads. If False, the graph will consist only of elements.
            intra:
                bool, default False. Only used when ``pads`` is True: whether to also
                include InternalPads, the pads that carry intra-element edges.

        Returns:
            tuple[str, ...], the sorted names of the selected nodes in the pipeline
        """
        if pads:
            pad_types = [SinkPad, SourcePad]
            if intra:
                pad_types.append(InternalPad)
            # Build the isinstance() type tuple once, not once per registry entry.
            wanted_pads = tuple(pad_types)
            return tuple(
                sorted(
                    obj.name
                    for obj in self._registry.values()
                    if isinstance(obj, wanted_pads)
                )
            )

        # TODO remove this kludge when Python3.9 support is dropped
        element_types = [TransformElement, SinkElement, SourceElement, ElementLike]
        if sys.version_info < (3, 10):
            element_types = [SinkElement, SourceElement, TransformElement]
        wanted_elements = tuple(element_types)
        return tuple(
            sorted(
                obj.name
                for obj in self._registry.values()
                if isinstance(obj, wanted_elements)
            )
        )

    def edges(
        self, pads: bool = True, intra: bool = False
    ) -> tuple[tuple[str, str], ...]:
        """Get the edges in the pipeline.

        Args:
            pads:
                bool, whether to include pads in the graph. If True, the graph will only
                consist of pads. If False, the graph will consist only of elements.
            intra:
                bool, default False, whether or not to include intra-element edges, e.g.
                from an element's sink pads to its source pads

        Returns:
            tuple[tuple[str, str], ...], the sorted, de-duplicated (source, target)
            name pairs. Names are pad names when ``pads`` is True, otherwise the
            names of the elements that own the pads.
        """
        found: set[tuple[str, str]] = set()
        for target, sources in self.graph.items():
            for source in sources:
                # Edges whose source is a SinkPad or InternalPad are the
                # intra-element ones; drop them unless requested.
                if not intra and isinstance(source, (SinkPad, InternalPad)):
                    continue
                if pads:
                    found.add((source.name, target.name))
                else:
                    found.add((source.element.name, target.element.name))
        return tuple(sorted(found))

    def to_graph(self, pads: bool = True, intra: bool = False):
        """Build a graphviz Digraph of the pipeline's nodes and edges.

        Args:
            pads:
                bool, whether to include pads in the graph. If True, the graph will only
                consist of pads. If False, the graph will consist only of elements.
            intra:
                bool, default False, whether or not to include intra-element edges, e.g.
                from an element's sink pads to its source pads

        Returns:
            graphviz.Digraph, with one node per name from ``nodes`` and one edge per
            pair from ``edges``

        Raises:
            ImportError: if graphviz is not installed
        """
        try:
            import graphviz
        except ImportError as err:
            raise ImportError(
                "graphviz needs to be installed to visualize pipelines"
            ) from err

        graph = graphviz.Digraph()

        # ':' is replaced in node IDs, presumably because graphviz's edge() parses
        # "node:port"; the original name is kept as the visible label.
        for node in self.nodes(pads=pads, intra=intra):
            graph.node(node.replace(":", "_"), node)

        for source, target in self.edges(pads=pads, intra=intra):
            graph.edge(source.replace(":", "_"), target.replace(":", "_"))

        return graph

    def to_dot(self, pads: bool = False, intra: bool = False) -> str:
        """Convert the pipeline to a graph using graphviz.

        Note that ``pads`` defaults to False here, unlike ``to_graph`` and
        ``visualize``.

        Args:
            pads:
                bool, whether to include pads in the graph. If True, the graph will only
                consist of pads. If False, the graph will consist only of elements.
            intra:
                bool, default False, whether or not to include intra-element edges, e.g.
                from an element's sink pads to its source pads

        Returns:
            str, the DOT source of the graph representation of the pipeline
        """
        return self.to_graph(pads=pads, intra=intra).source

    def visualize(self, path: str, pads: bool = True, intra: bool = False) -> None:
        """Convert the pipeline to a graph using graphviz, then render into a visual
        file.

        The output format is taken from the extension of ``path`` (e.g.
        ``graph.png`` renders PNG). ``cleanup=True`` asks graphviz to delete the
        intermediate DOT source file after rendering.

        Args:
            path:
                str, the relative or full path to the file to write the graph to
            pads:
                bool, whether to include pads in the graph. If True, the graph will only
                consist of pads. If False, the graph will consist only of elements.
            intra:
                bool, default False, whether or not to include intra-element edges, e.g.
                from an element's sink pads to its source pads
        """
        dot = self.to_graph(pads=pads, intra=intra)

        # write to disk
        directory, filename = os.path.split(path)
        name, extension = os.path.splitext(filename)
        dot.render(
            filename=name,
            directory=directory,
            format=extension.strip("."),
            cleanup=True,
        )

    async def _execute_graphs(self) -> None:
        """Async graph execution function.

        Repeatedly walks the pipeline graph in topological order until every sink
        element reports ``at_eos``. Within one pass, every node whose dependencies
        are done is called, and the resulting coroutines run concurrently before
        the next batch is released.
        """
        while not all(sink.at_eos for sink in self.sinks.values()):
            self.__loop_counter += 1
            LOGGER.info("Executing graph loop %s:", self.__loop_counter)
            # A TopologicalSorter can only be prepared once, so a fresh one is
            # built for every pass over the graph.
            ts = graphlib.TopologicalSorter(self.graph)
            ts.prepare()
            while ts.is_active():
                # concurrently execute the next batch of ready nodes
                nodes = ts.get_ready()
                tasks = [self.loop.create_task(node()) for node in nodes]  # type: ignore # noqa: E501
                await asyncio.gather(*tasks)
                ts.done(*nodes)

    def run(self) -> None:
        """Run the pipeline until End Of Stream (EOS).

        Raises:
            AssertionError: if the pipeline has no sink elements, or if any source
                or sink pad of an inserted element is not linked
        """
        assert self.sinks, "Pipeline contains no sink elements."
        for element in self.elements:
            for source_pad in element.source_pads:
                assert source_pad.is_linked, f"Source pad not linked: {source_pad}"
            for sink_pad in element.sink_pads:
                assert sink_pad.is_linked, f"Sink pad not linked: {sink_pad}"
        self.loop.run_until_complete(self._execute_graphs())

__init__()

Class to establish and execute a graph of elements that will process frames.

Registers methods to produce source, transform and sink elements and to assemble those elements in a directed acyclic graph. Also establishes an event loop.

Source code in sgn/apps.py
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(self) -> None:
    """Class to establish and execute a graph of elements that will process frames.

    Registers methods to produce source, transform and sink elements and to assemble
    those elements in a directed acyclic graph. Also establishes an event loop.

    Attributes set here:
        _registry: name -> Pad or Element, initially empty
        graph: pad -> set of pads, initially empty
        loop: the current asyncio event loop, or a newly created and installed one
        sinks: name -> SinkElement, initially empty
        elements: list of inserted elements, initially empty
    """
    self._registry: dict[str, Union[Pad, Element]] = {}
    self.graph: dict[Pad, set[Pad]] = {}
    try:
        # Reuse the current event loop when one is available.
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # get_event_loop() raises when no current loop can be provided (e.g.
        # after set_event_loop(None), and on Python 3.14+ whenever no loop has
        # been set), so create one and install it as the current loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    self.loop = loop
    self.__loop_counter = 0
    self.sinks: dict[str, SinkElement] = {}
    self.elements: list[Element] = []

edges(pads=True, intra=False)

Get the edges in the pipeline.

Parameters:

Name Type Description Default
pads bool

bool, whether to include pads in the graph. If True, the graph will only consist of pads. If False, the graph will consist only of elements.

True
intra bool

bool, default False, whether or not to include intra-element edges, e.g. from an element's sink pads to its source pads

False

Returns:

tuple[tuple[str, str], ...], the sorted (source, target) name pairs of the pipeline's edges

Source code in sgn/apps.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def edges(
    self, pads: bool = True, intra: bool = False
) -> tuple[tuple[str, str], ...]:
    """Get the edges in the pipeline.

    Args:
        pads:
            bool, whether to include pads in the graph. If True, the graph will only
            consist of pads. If False, the graph will consist only of elements.
        intra:
            bool, default False, whether or not to include intra-element edges, e.g.
            from an element's sink pads to its source pads

    Returns:
        tuple[tuple[str, str], ...], the sorted, de-duplicated (source, target)
        name pairs. Names are pad names when ``pads`` is True, otherwise the
        names of the elements that own the pads.
    """
    found: set[tuple[str, str]] = set()
    for target, sources in self.graph.items():
        for source in sources:
            # Edges whose source is a SinkPad or InternalPad are the
            # intra-element ones; drop them unless requested.
            if not intra and isinstance(source, (SinkPad, InternalPad)):
                continue
            if pads:
                found.add((source.name, target.name))
            else:
                found.add((source.element.name, target.element.name))
    return tuple(sorted(found))

insert(*elements, link_map=None)

Insert element(s) into the pipeline.

Parameters:

Name Type Description Default
*elements Element

Iterable[Element], the ordered elements to insert into the pipeline

()
link_map Optional[dict[Union[str, SinkPad], Union[str, SourcePad]]]

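Optional[dict[Union[str, SinkPad], Union[str, SourcePad]]], a mapping from sink pads (keys) to the source pads (values) that feed them, given as pad objects or registered pad names; if provided, the pads are linked after insertion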

None

Returns:

Type Description
Pipeline

Pipeline, the pipeline with the elements inserted

Source code in sgn/apps.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def insert(
    self,
    *elements: Element,
    link_map: Optional[dict[Union[str, SinkPad], Union[str, SourcePad]]] = None,
) -> Pipeline:
    """Insert element(s) into the pipeline.

    Each element and each of its pads is registered by name. Sink elements are
    tracked in ``self.sinks``. The element's own graph is merged into the
    pipeline graph.

    Args:
        *elements:
            Iterable[Element], the ordered elements to insert into the pipeline
        link_map:
            Optional[dict[Union[str, SinkPad], Union[str, SourcePad]]], a mapping
            from sink pads (keys) to the source pads (values) that feed them,
            given as pad objects or registered pad names. If provided, it is
            passed to ``link`` after all elements have been inserted.

    Returns:
        Pipeline, the pipeline with the elements inserted

    Raises:
        AssertionError: if an element is not element-like, or if an element or
            pad name is already registered in this pipeline
    """
    for element in elements:
        assert isinstance(
            element, ElementLike
        ), f"Element {element} is not an instance of a sgn.Element"
        assert (
            element.name not in self._registry
        ), f"Element name '{element.name}' is already in use in this pipeline"
        self._registry[element.name] = element
        for pad in element.pad_list:
            if (
                pad is not None
            ):  # Stupid mypy kludge, remove once python3.9 is dropped
                assert (
                    pad.name not in self._registry
                ), f"Pad name '{pad.name}' is already in use in this pipeline"
                self._registry[pad.name] = pad
        if isinstance(element, SinkElement):
            self.sinks[element.name] = element
        self.graph.update(element.graph)
        self.elements.append(element)
    if link_map is not None:
        self.link(link_map)
    return self

link(link_map)

Link pads in a pipeline.

Parameters:

Name Type Description Default
link_map Dict[Union[str, SinkPad], Union[str, SourcePad]]

dict[Union[str, SinkPad], Union[str, SourcePad]], a mapping from sink pads (keys) to the source pads (values) that feed them; each pad may be given as a pad object or its registered name, so that the data flows from value -> key

required
Source code in sgn/apps.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def link(
    self, link_map: dict[Union[str, SinkPad], Union[str, SourcePad]]
) -> Pipeline:
    """Link pads in a pipeline.

    Args:
        link_map:
            dict[Union[str, SinkPad], Union[str, SourcePad]], a mapping from sink
            pads (keys) to the source pads (values) that feed them. Each pad may
            be given as a pad object or as its registered name. Data flows from
            value -> key.

    Returns:
        Pipeline, this pipeline, so calls can be chained

    Raises:
        KeyError: if a pad name is not registered in this pipeline
        AssertionError: if a key does not resolve to a SinkPad or a value does
            not resolve to a SourcePad
    """
    for sink_ref, source_ref in link_map.items():
        # Names are resolved through the registry; pad objects pass through.
        if isinstance(sink_ref, str):
            sink_pad = self._registry[sink_ref]
        else:
            sink_pad = sink_ref
        if isinstance(source_ref, str):
            source_pad = self._registry[source_ref]
        else:
            source_pad = source_ref

        assert isinstance(sink_pad, SinkPad), f"not a sink pad: {sink_pad}"
        assert isinstance(source_pad, SourcePad), f"not a source pad: {source_pad}"

        self.graph.update(sink_pad.link(source_pad))

    return self

nodes(pads=True, intra=False)

Get the nodes in the pipeline.

Parameters:

Name Type Description Default
pads bool

bool, whether to include pads in the graph. If True, the graph will only consist of pads. If False, the graph will consist only of elements.

True
intra bool

bool, default False. Only used when pads is True: whether to also include Internal Pads (the pads that carry intra-element edges) in the graph.

False

Returns:

Type Description
tuple[str, ...]

tuple[str, ...], the sorted names of the nodes in the pipeline

Source code in sgn/apps.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def nodes(self, pads: bool = True, intra: bool = False) -> tuple[str, ...]:
    """Get the nodes in the pipeline.

    Args:
        pads:
            bool, whether to include pads in the graph. If True, the graph will only
            consist of pads. If False, the graph will consist only of elements.
        intra:
            bool, default False. Only used when ``pads`` is True: whether to also
            include InternalPads, the pads that carry intra-element edges.

    Returns:
        tuple[str, ...], the sorted names of the selected nodes in the pipeline
    """
    if pads:
        pad_types = [SinkPad, SourcePad]
        if intra:
            pad_types.append(InternalPad)
        # Build the isinstance() type tuple once, not once per registry entry.
        wanted_pads = tuple(pad_types)
        return tuple(
            sorted(
                obj.name
                for obj in self._registry.values()
                if isinstance(obj, wanted_pads)
            )
        )

    # TODO remove this kludge when Python3.9 support is dropped
    element_types = [TransformElement, SinkElement, SourceElement, ElementLike]
    if sys.version_info < (3, 10):
        element_types = [SinkElement, SourceElement, TransformElement]
    wanted_elements = tuple(element_types)
    return tuple(
        sorted(
            obj.name
            for obj in self._registry.values()
            if isinstance(obj, wanted_elements)
        )
    )

run()

Run the pipeline until all sink elements reach End Of Stream (EOS).

Source code in sgn/apps.py
280
281
282
283
284
285
286
287
288
def run(self) -> None:
    """Run the pipeline until End Of Stream (EOS).

    Before anything executes, the pipeline must contain at least one sink
    element, and every source and sink pad of every inserted element must be
    linked. The graph-execution coroutine is then driven to completion on
    ``self.loop``.

    Raises:
        AssertionError: if there are no sinks or an element has an unlinked pad
    """
    assert self.sinks, "Pipeline contains no sink elements."
    for elem in self.elements:
        for pad in elem.source_pads:
            assert pad.is_linked, f"Source pad not linked: {pad}"
        for pad in elem.sink_pads:
            assert pad.is_linked, f"Sink pad not linked: {pad}"
    execution = self._execute_graphs()
    self.loop.run_until_complete(execution)

to_dot(pads=False, intra=False)

Convert the pipeline to a graph using graphviz.

Parameters:

Name Type Description Default
pads bool

bool, whether to include pads in the graph. If True, the graph will only consist of pads. If False, the graph will consist only of elements.

False
intra bool

bool, default False, whether or not to include intra-element edges, e.g. from an element's sink pads to its source pads

False

Returns:

Type Description
str

str, the graph representation of the pipeline

Source code in sgn/apps.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def to_dot(self, pads: bool = False, intra: bool = False) -> str:
    """Return the pipeline as DOT source text, built via ``self.to_graph``.

    Note that ``pads`` defaults to False here, unlike ``to_graph``.

    Args:
        pads:
            bool, whether to include pads in the graph. If True, the graph will only
            consist of pads. If False, the graph will consist only of elements.
        intra:
            bool, default False, whether or not to include intra-element edges, e.g.
            from an element's sink pads to its source pads

    Returns:
        str, the graph representation of the pipeline
    """
    digraph = self.to_graph(pads=pads, intra=intra)
    dot_source = digraph.source
    return dot_source

to_graph(pads=True, intra=False)

Build a graphviz Digraph of the pipeline's nodes and edges (raises ImportError if graphviz is not installed).

Parameters:

Name Type Description Default
pads bool

bool, whether to include pads in the graph. If True, the graph will only consist of pads. If False, the graph will consist only of elements.

True
intra bool

bool, default False, whether or not to include intra-element edges, e.g. from an element's sink pads to its source pads

False

Returns:

Type Description

graphviz.Digraph, the graph populated with the pipeline's nodes and edges

Source code in sgn/apps.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def to_graph(self, pads: bool = True, intra: bool = False):
    """Build a graphviz Digraph of the pipeline's nodes and edges.

    Args:
        pads:
            bool, whether to include pads in the graph. If True, the graph will only
            consist of pads. If False, the graph will consist only of elements.
        intra:
            bool, default False, whether or not to include intra-element edges, e.g.
            from an element's sink pads to its source pads

    Returns:
        graphviz.Digraph, with one node per name from ``self.nodes`` and one edge
        per pair from ``self.edges``

    Raises:
        ImportError: if graphviz is not installed
    """
    try:
        import graphviz
    except ImportError as err:
        raise ImportError(
            "graphviz needs to be installed to visualize pipelines"
        ) from err

    graph = graphviz.Digraph()

    # ':' is replaced in node IDs, presumably because graphviz's edge() parses
    # "node:port"; the original name is kept as the visible label.
    for node in self.nodes(pads=pads, intra=intra):
        graph.node(node.replace(":", "_"), node)

    for source, target in self.edges(pads=pads, intra=intra):
        graph.edge(source.replace(":", "_"), target.replace(":", "_"))

    return graph

visualize(path, pads=True, intra=False)

Convert the pipeline to a graph using graphviz, then render into a visual file.

Parameters:

Name Type Description Default
path str

str, the relative or full path to the file to write the graph to

required
Source code in sgn/apps.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def visualize(self, path: str, pads: bool = True, intra: bool = False) -> None:
    """Convert the pipeline to a graph using graphviz, then render into a visual
    file.

    The output format is taken from the extension of ``path`` (e.g.
    ``graph.png`` renders PNG). ``cleanup=True`` asks graphviz to delete the
    intermediate DOT source file after rendering.

    Args:
        path:
            str, the relative or full path to the file to write the graph to
        pads:
            bool, whether to include pads in the graph. If True, the graph will only
            consist of pads. If False, the graph will consist only of elements.
        intra:
            bool, default False, whether or not to include intra-element edges, e.g.
            from an element's sink pads to its source pads
    """
    dot = self.to_graph(pads=pads, intra=intra)

    # write to disk
    directory, filename = os.path.split(path)
    name, extension = os.path.splitext(filename)
    dot.render(
        filename=name,
        directory=directory,
        format=extension.strip("."),
        cleanup=True,
    )