Skip to content

Pipeline - Orchestrating SGN Task Graphs

The Pipeline class is the central orchestrator for SGN applications. It manages the directed acyclic graph (DAG) of elements, handles pad connections, and executes your data processing pipeline asynchronously.

Overview

A Pipeline coordinates three key responsibilities:

  1. Element Management - Register and track elements in your graph
  2. Connection Management - Link pads between elements to define data flow
  3. Execution - Run the async event loop to process frames through the graph

Quick Start: Your First Pipeline

Here's a minimal working pipeline:

from sgn.base import SourceElement, SinkElement, Frame
from sgn.apps import Pipeline


class HelloSource(SourceElement):
    """Emit three greeting frames on pad "out", then signal end of stream."""

    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)
        self.count = 0

    def new(self, pad):
        self.count += 1
        finished = self.count > 3
        return Frame(EOS=True) if finished else Frame(data=f"Hello {self.count}")


class PrinterSink(SinkElement):
    """Print every data frame arriving on pad "in"; stop at end of stream."""

    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if not frame.EOS:
            print(frame.data)
            return
        self.mark_eos(pad)


# Build the graph, wire source -> sink, and run until the sink reaches EOS
p = Pipeline()
source = HelloSource()
sink = PrinterSink()

p.insert(source, sink)
p.link({sink.snks["in"]: source.srcs["out"]})
p.run()
# Output:
# Hello 1
# Hello 2
# Hello 3

Connecting Elements: Two Approaches

Approach 1: link() (Explicit Connections)

The link() method gives you fine-grained control: you state exactly which sink pad is fed by which source pad.

from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class Counter(SourceElement):
    """Count 1..5 on pad "numbers", then send end of stream."""

    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["numbers"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        return Frame(EOS=True) if self.n > 5 else Frame(data=self.n)


class Doubler(TransformElement):
    """Double each incoming value; forward end of stream unchanged."""

    def __init__(self, **kwargs):
        super().__init__(
            source_pad_names=["doubled"],
            sink_pad_names=["input"],
            **kwargs,
        )
        self.value = None  # most recent frame received on "input"

    def pull(self, pad, frame):
        # Stash the frame; new() turns it into this iteration's output
        self.value = frame

    def new(self, pad):
        latest = self.value
        if latest.EOS:
            return Frame(EOS=True)
        return Frame(data=latest.data * 2)


class Printer(SinkElement):
    """Print each result; mark the pad finished on end of stream."""

    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["data"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
        else:
            print(f"Result: {frame.data}")


# Explicit connection
p = Pipeline()
source = Counter()
transform = Doubler()
sink = Printer()

# Register every element with the pipeline
p.insert(source, transform, sink)

# Keys are sink pads, values are the source pads feeding them (value -> key)
p.link(
    {
        transform.snks["input"]: source.srcs["numbers"],  # source -> transform
        sink.snks["data"]: transform.srcs["doubled"],  # transform -> sink
    }
)

p.run()
# Output:
# Result: 2
# Result: 4
# Result: 6
# Result: 8
# Result: 10

link() Dictionary Format

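In the dictionary passed to link() (and to insert(link_map=...)), each key is a sink pad and each value is the source pad that feeds it, so data flows from value to key. Pads may be given either as pad objects or as their registered names: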

{sink_pad: source_pad}  # Data flows from source_pad -> sink_pad

Approach 2: connect() (Automatic Matching)

The connect() method matches pads by name automatically. It also inserts any element that is not yet part of the pipeline, so no separate insert() call is needed:

from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class NumberSource(SourceElement):
    """Produce 1, 2, 3 on pad "data", then end the stream."""

    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["data"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n <= 3:
            return Frame(data=self.n)
        return Frame(EOS=True)


class Squarer(TransformElement):
    """Square each value; both pads are named "data" so connect() can match them."""

    def __init__(self, **kwargs):
        super().__init__(
            source_pad_names=["data"],
            sink_pad_names=["data"],
            **kwargs,
        )
        self.input = None  # last frame pulled from the sink pad

    def pull(self, pad, frame):
        self.input = frame

    def new(self, pad):
        incoming = self.input
        return Frame(EOS=True) if incoming.EOS else Frame(data=incoming.data ** 2)


class ResultSink(SinkElement):
    """Print each squared value."""

    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["data"], **kwargs)

    def pull(self, pad, frame):
        if not frame.EOS:
            print(f"Squared: {frame.data}")
        else:
            self.mark_eos(pad)


# Automatic connection: every pad is named "data", and connect() inserts
# any element that is not in the pipeline yet
p = Pipeline()
source = NumberSource()
transform = Squarer()
sink = ResultSink()

p.connect(source, transform)  # source "data" -> transform "data"
p.connect(transform, sink)  # transform "data" -> sink "data"

p.run()
# Output:
# Squared: 1
# Squared: 4
# Squared: 9

When to Use connect()

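Use connect() when:

  • Pad names match between elements
  • You want cleaner, more readable code
  • You are building simple linear pipelines

Use link() when pad names differ or you want to spell out every connection explicitly. (connect() also accepts an explicit link_map of sink pad names to source pad names.)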

Advanced Connection Patterns

Multiple Source Pads

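Elements can have multiple output pads for different data streams. A source element's new() method receives the pad it is producing a frame for and is invoked once per source pad on every iteration, so any per-pad state must be tracked separately: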

from sgn.base import SourceElement, SinkElement, Frame
from sgn.apps import Pipeline


class DualSource(SourceElement):
    """Emit the numbers 1..6, routing evens to pad "evens" and odds to pad "odds".

    new() is called once per source pad on every iteration, so each pad keeps
    its own counter. A single shared counter would advance twice per
    iteration, and the output would depend on the order the pads are serviced.
    """

    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["evens", "odds"], **kwargs)
        # One counter per source pad, keyed by the pad object
        self.counts = {pad: 0 for pad in self.srcs.values()}

    def new(self, pad):
        self.counts[pad] += 1
        n = self.counts[pad]
        if n > 6:
            return Frame(EOS=True)

        # Different output based on pad; data=None means "nothing this round"
        if pad == self.srcs["evens"]:
            return Frame(data=n if n % 2 == 0 else None)
        else:  # odds pad
            return Frame(data=n if n % 2 == 1 else None)


class NumberPrinter(SinkElement):
    """Print non-empty frames, prefixed with a label."""

    def __init__(self, label, **kwargs):
        super().__init__(sink_pad_names=["numbers"], **kwargs)
        self.label = label

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
            return
        if frame.data is not None:
            print(f"{self.label}: {frame.data}")


p = Pipeline()
source = DualSource()
even_sink = NumberPrinter("Even")
odd_sink = NumberPrinter("Odd")

p.insert(source, even_sink, odd_sink)
p.link({
    even_sink.snks["numbers"]: source.srcs["evens"],
    odd_sink.snks["numbers"]: source.srcs["odds"]
})

p.run()
# Output:
# Odd: 1
# Even: 2
# Odd: 3
# Even: 4
# Odd: 5
# Even: 6

Fan-Out: One Source, Multiple Sinks

Multiple sinks can connect to the same source pad:

from sgn.base import SourceElement, SinkElement, Frame
from sgn.apps import Pipeline


class DataSource(SourceElement):
    """Emit 10, 20, 30 on pad "data", then end the stream."""

    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["data"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        return Frame(data=self.n * 10) if self.n <= 3 else Frame(EOS=True)


class Sink1(SinkElement):
    """First consumer of the shared source pad."""

    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if not frame.EOS:
            print(f"Sink1 received: {frame.data}")
        else:
            self.mark_eos(pad)


class Sink2(SinkElement):
    """Second consumer of the shared source pad."""

    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if not frame.EOS:
            print(f"Sink2 received: {frame.data}")
        else:
            self.mark_eos(pad)


p = Pipeline()
source = DataSource()
sink1 = Sink1()
sink2 = Sink2()

p.insert(source, sink1, sink2)

# Fan-out: the same source pad appears as the value for two different sink pads
shared_pad = source.srcs["data"]
p.link({
    sink1.snks["in"]: shared_pad,
    sink2.snks["in"]: shared_pad,
})

p.run()
# Output:
# Sink1 received: 10
# Sink2 received: 10
# Sink1 received: 20
# Sink2 received: 20
# Sink1 received: 30
# Sink2 received: 30

Pipeline Lifecycle

1. Build Phase

from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline

# NOTE: this example only *builds* a pipeline. SimpleSource never sends EOS
# and SimpleSink never marks EOS, so calling p.run() on it would not finish.


class SimpleSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)

    def new(self, pad):
        # Always the same payload; never ends the stream
        return Frame(data=1)


class SimpleTransform(TransformElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], sink_pad_names=["in"], **kwargs)
        self.data = None

    def pull(self, pad, frame):
        self.data = frame

    def new(self, pad):
        # Pass the received frame through unchanged
        return self.data


class SimpleSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        pass  # discard every frame


p = Pipeline()

# Register the elements with the pipeline
source, transform, sink = SimpleSource(), SimpleTransform(), SimpleSink()
p.insert(source, transform, sink)

# Wire source -> transform -> sink, one link() call per connection
p.link({transform.snks["in"]: source.srcs["out"]})
p.link({sink.snks["in"]: transform.srcs["out"]})

2. Execution Phase

from sgn.base import SourceElement, SinkElement, Frame
from sgn.apps import Pipeline


class QuickSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        return Frame(data=self.n) if self.n <= 2 else Frame(EOS=True)


class QuickSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        # Data frames are ignored; only end of stream matters here
        if frame.EOS:
            self.mark_eos(pad)


p = Pipeline()
p.insert(QuickSource(), QuickSink())

# p.elements lists the inserted elements in insertion order
first, second = p.elements
p.link({second.snks["in"]: first.srcs["out"]})

# Run the pipeline (blocking until all sinks reach EOS)
p.run()

3. Accessing Results

from sgn.base import SourceElement, SinkElement, Frame
from sgn.apps import Pipeline


class DataSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n > 3:
            return Frame(EOS=True)
        return Frame(data=10 * self.n)


class CollectorSink(SinkElement):
    """Accumulate every data payload in ``collected_data``."""

    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)
        self.collected_data = []

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
        else:
            self.collected_data.append(frame.data)


p = Pipeline()
source = DataSource()
sink = CollectorSink(name="sink_name")
p.insert(source, sink)
p.link({sink.snks["in"]: source.srcs["out"]})
p.run()

# After run() returns, look the sink up by name and read what it collected
collector = p["sink_name"]
results = collector.collected_data
print(f"Collected: {results}")  # [10, 20, 30]

Accessing Pipeline Elements

Elements and pads are accessible by name:

from sgn.base import SourceElement, Frame
from sgn.apps import Pipeline


class MySource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)

    def new(self, pad):
        return Frame(data=1)


p = Pipeline()
p.insert(MySource(name="my_source"))

# Elements are registered under their own name ...
elem = p["my_source"]

# ... and pads under their full name, which starts with the element name
pad = p["my_source:src:out"]

Common Patterns

Pattern 1: Linear Pipeline

from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class Source(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        return Frame(EOS=True) if self.n > 2 else Frame(data=self.n)


class Transform(TransformElement):
    """Double the value received on "in" and emit it on "out"."""

    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], sink_pad_names=["in"], **kwargs)
        self.val = None

    def pull(self, pad, frame):
        self.val = frame

    def new(self, pad):
        frame = self.val
        if frame.EOS:
            return Frame(EOS=True)
        return Frame(data=frame.data * 2)


class Sink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)


p = Pipeline()
source = Source()
transform1 = Transform()
transform2 = Transform()
sink = Sink()

p.insert(source, transform1, transform2, sink)

# source -> transform1 -> transform2 -> sink (each entry: sink pad: source pad)
p.link(
    {
        transform1.snks["in"]: source.srcs["out"],
        transform2.snks["in"]: transform1.srcs["out"],
        sink.snks["in"]: transform2.srcs["out"],
    }
)
p.run()

Pattern 2: Using connect() for Linear Pipelines

from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class DataSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["data"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n <= 2:
            return Frame(data=self.n)
        return Frame(EOS=True)


class DataTransform(TransformElement):
    """Add one to each value; pads share the name "data" for connect()."""

    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["data"], sink_pad_names=["data"], **kwargs)
        self.val = None

    def pull(self, pad, frame):
        self.val = frame

    def new(self, pad):
        frame = self.val
        return Frame(EOS=True) if frame.EOS else Frame(data=frame.data + 1)


class DataSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["data"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)


p = Pipeline()
source = DataSource()
transform1 = DataTransform()
transform2 = DataTransform()
sink = DataSink()

# connect() returns the pipeline, so the links can be chained
p.connect(source, transform1).connect(transform1, transform2).connect(transform2, sink)
p.run()
# Pattern 3: insert() and link in a single call via link_map
from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class NumSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        return Frame(EOS=True) if self.n > 2 else Frame(data=self.n)


class NumTransform(TransformElement):
    """Triple the value received on "in"."""

    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], sink_pad_names=["in"], **kwargs)
        self.val = None

    def pull(self, pad, frame):
        self.val = frame

    def new(self, pad):
        frame = self.val
        if frame.EOS:
            return Frame(EOS=True)
        return Frame(data=frame.data * 3)


class NumSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)


p = Pipeline()
source = NumSource()
transform = NumTransform()
sink = NumSink()

# link_map is applied right after the elements are inserted
connections = {
    transform.snks["in"]: source.srcs["out"],
    sink.snks["in"]: transform.srcs["out"],
}
p.insert(source, transform, sink, link_map=connections)
p.run()

Error Handling

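The pipeline validates elements and connections and raises errors for:

  • Duplicate element or pad names (AssertionError from insert())
  • Invalid pad types, e.g. linking two source pads (AssertionError from link())
  • Missing elements or pads when looking them up by name (KeyError)
  • Unlinked pads, or a pipeline with no sink elements (RuntimeError from check(), which run() normally calls before executing the graph)
  • Circular dependencies (graphlib.CycleError, raised when the pipeline starts executing)

Because the first two checks use assert statements, they are skipped when Python runs with optimizations enabled (python -O).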
from sgn.base import SourceElement, Frame
from sgn.apps import Pipeline


class TestSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)

    def new(self, pad):
        return Frame(data=1)


# Inserting a second element with an already-used name is rejected
p = Pipeline()
p.insert(TestSource(name="source"))
duplicate = TestSource(name="source")
try:
    p.insert(duplicate)  # AssertionError: name already in use
except AssertionError as err:
    print(f"Caught error: {err}")

Visualization

Generate a visual graph of your pipeline:

from sgn.apps import Pipeline

# `source`, `transform` and `sink` are elements like those in the examples above
p = Pipeline()
p.insert(source, transform, sink)
p.link({
    transform.snks["in"]: source.srcs["out"],
    sink.snks["in"]: transform.srcs["out"],
})

# Render a graphviz diagram of the pipeline and write it to this path
p.visualize("my_pipeline.pdf")

# Or work with the graph directly instead of writing a file
dot_source = p.to_dot(label="my pipeline")  # DOT source as a string
graph = p.to_graph(label="my pipeline")  # graphviz graph object

Requires Graphviz

Install graphviz: pip install sgn[visualize]

API Reference

sgn.apps

Pipeline class and related utilities to establish and execute a graph of element tasks.

Pipeline

A Pipeline is essentially a directed acyclic graph of tasks that process frames.

These tasks are grouped using Pads and Elements. The Pipeline class is responsible for registering methods to produce source, transform and sink elements and to assemble those elements in a directed acyclic graph. It also establishes an event loop to execute the graph asynchronously.

Source code in sgn/apps.py
class Pipeline:
    """A Pipeline is essentially a directed acyclic graph of tasks that process frames.

    These tasks are grouped using Pads and Elements. The Pipeline class is responsible
    for registering methods to produce source, transform and sink elements and to
    assemble those elements in a directed acyclic graph. It also establishes an event
    loop to execute the graph asynchronously.
    """

    def __init__(self) -> None:
        """Class to establish and execute a graph of elements that will process frames.

        Registers methods to produce source, transform and sink elements and to assemble
        those elements in a directed acyclic graph. Also establishes an event loop: the
        current thread's loop when one is available, otherwise a newly created loop that
        is installed as the current loop for this thread.
        """
        # Every registered element and pad, keyed by its unique name.
        self._registry: dict[str, Pad | Element] = {}
        # Maps each pad to the set of pads it depends on (its predecessors);
        # this is the mapping handed to graphlib.TopologicalSorter.
        self.graph: dict[Pad, set[Pad]] = {}
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # There is no current event loop in this thread, e.g. the pipeline
            # is constructed in a worker thread, or on Python versions where
            # get_event_loop() no longer creates a loop implicitly.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        # Number of graph iterations executed so far (used for logging).
        self.__loop_counter = 0
        # Sink elements by name; execution continues until all are at EOS.
        self.sinks: dict[str, SinkElement] = {}
        # All inserted elements, in insertion order.
        self.elements: list[Element] = []

    def __getitem__(self, name):
        """return a pipeline element or pad by name"""
        return self._registry[name]

    def insert(
        self,
        *elements: Element,
        link_map: dict[str | SinkPad, str | SourcePad] | None = None,
    ) -> Pipeline:
        """Insert element(s) into the pipeline.

        Args:
            *elements:
                Iterable[Element], the ordered elements to insert into the pipeline
            link_map:
                dict[str | SinkPad, str | SourcePad] | None, optional mapping passed
                to link() after the elements are inserted. Keys are sink pads (or
                their names) and values are the source pads (or names) that feed
                them.

        Returns:
            Pipeline, the pipeline with the elements inserted

        Raises:
            AssertionError: if an object is not an element, or if an element or
                pad name is already in use in this pipeline.
        """

        for element in elements:
            assert isinstance(
                element, ElementLike
            ), f"Element {element} is not an instance of a sgn.Element"
            assert (
                element.name not in self._registry
            ), f"Element name '{element.name}' is already in use in this pipeline"
            self._registry[element.name] = element
            for pad in element.pad_list:
                assert (
                    pad.name not in self._registry
                ), f"Pad name '{pad.name}' is already in use in this pipeline"
                self._registry[pad.name] = pad
            if isinstance(element, SinkElement):
                self.sinks[element.name] = element
            # Merge the element's internal pad dependencies into the pipeline graph
            self.graph.update(element.graph)
            self.elements.append(element)
        if link_map is not None:
            self.link(link_map)
        return self

    def link(self, link_map: dict[str | SinkPad, str | SourcePad]) -> Pipeline:
        """Link pads in a pipeline.

        Args:
            link_map:
                dict[str | SinkPad, str | SourcePad], a mapping of sink pads to
                source pads to link. Note that the keys of the dictionary are the
                sink pads and the values are the source pads that feed them, so
                that the data flows from value -> key. Pads may be given either as
                pad objects or as the names they are registered under in this
                pipeline.

        Returns:
            Pipeline, this pipeline (so that calls can be chained)

        Raises:
            KeyError: if a pad name is not registered in this pipeline.
            AssertionError: if a key is not a SinkPad or a value is not a SourcePad.
        """
        for sink_pad_name, source_pad_name in link_map.items():
            if isinstance(sink_pad_name, str):
                sink_pad = self._registry[sink_pad_name]
            else:
                sink_pad = sink_pad_name
            if isinstance(source_pad_name, str):
                source_pad = self._registry[source_pad_name]
            else:
                source_pad = source_pad_name

            assert isinstance(sink_pad, SinkPad), f"not a sink pad: {sink_pad}"
            assert isinstance(source_pad, SourcePad), f"not a source pad: {source_pad}"

            graph = sink_pad.link(source_pad)
            self.graph.update(graph)

        return self

    def connect(
        self,
        source: Element | ElementGroup | PadSelection,
        sink: Element | ElementGroup | PadSelection,
        link_map: dict[str, str] | None = None,
    ) -> Pipeline:
        """Connect elements, ElementGroups, or PadSelections using implicit linking.

        This method supports multiple linking patterns:
        1. Element-to-element linking with implicit pad matching:
           pipeline.connect(source_element, sink_element)
        2. Element-to-element linking with explicit mapping:
           pipeline.connect(source_element, sink_element, link_map={"sink": "source"})
        3. ElementGroup linking (supports elements and pad selections):
           pipeline.connect(group(s1, s2), sink_element)
           pipeline.connect(group(source, select(element, "pad1")), sink)
        4. Direct PadSelection linking:
           pipeline.connect(select(source, "pad1"), sink_element)

        Elements that are not yet in the pipeline are inserted automatically.

        Implicit linking strategies (when no link_map provided):
        1. Exact match: Connect when source and sink pad names are identical
        2. Partial match: Connect all matching pad names (ignores non-matching pads)
        3. N-to-1: Single sink pad, connect all source pads to it
        4. 1-to-N: Single source pad, connect to all sink pads

        Args:
            source:
                Element, ElementGroup, or PadSelection, the source for linking
            sink:
                Element, ElementGroup, or PadSelection, the sink for linking
            link_map:
                dict[str, str], optional, explicit mapping of sink pad names to
                source pad names.

        Returns:
            Pipeline: The pipeline with the new links added.

        Raises:
            ValueError: If source is a SinkElement, sink is a SourceElement, or
                the implicit linking strategy is ambiguous.
            KeyError: If link_map names a sink or source pad that does not exist.
        """
        if isinstance(source, SinkElement):
            msg = f"Source '{source.name}' is a SinkElement and has no source pads"
            raise ValueError(msg)
        if isinstance(sink, SourceElement):
            msg = f"Sink '{sink.name}' is a SourceElement and has no sink pads"
            raise ValueError(msg)

        source_pads = source.srcs
        sink_pads = sink.snks

        # Ensure all elements are inserted in pipeline
        def ensure_elements_inserted(obj):
            if isinstance(obj, (SourceElement, TransformElement, SinkElement)):
                if obj.name not in self._registry:
                    self.insert(obj)
            elif isinstance(obj, ElementGroup):
                for element in obj.elements:
                    if element.name not in self._registry:
                        self.insert(element)
            elif isinstance(obj, PadSelection):
                if obj.element.name not in self._registry:
                    self.insert(obj.element)

        ensure_elements_inserted(source)
        ensure_elements_inserted(sink)

        return self._connect_pads(source_pads, sink_pads, link_map)

    def _connect_pads(
        self,
        source_pads: dict[str, SourcePad],
        sink_pads: dict[str, SinkPad],
        link_map: dict[str, str] | None = None,
    ) -> Pipeline:
        """Connect source and sink pads using implicit linking strategies."""
        resolved_link_map: dict[str | SinkPad, str | SourcePad]
        source_pad_names = set(source_pads.keys())
        sink_pad_names = set(sink_pads.keys())

        # Determine linking strategy
        if link_map:
            # Explicit mapping provided
            resolved_link_map = {}
            for sink_pad_name, source_pad_name in link_map.items():
                if sink_pad_name not in sink_pads:
                    msg = f"sink pad '{sink_pad_name}' not found"
                    raise KeyError(msg)
                if source_pad_name not in source_pads:
                    msg = f"source pad '{source_pad_name}' not found"
                    raise KeyError(msg)

                sink_pad = sink_pads[sink_pad_name]
                source_pad = source_pads[source_pad_name]
                resolved_link_map[sink_pad] = source_pad

            return self.link(resolved_link_map)

        elif source_pad_names == sink_pad_names:
            # One-to-one linking strategy: same pad names
            resolved_link_map = {
                sink_pads[name]: source_pads[name] for name in source_pad_names
            }
            return self.link(resolved_link_map)

        elif source_pad_names & sink_pad_names:
            # Partial matching strategy: connect all matching pad names
            matching_names = source_pad_names & sink_pad_names
            resolved_link_map = {
                sink_pads[name]: source_pads[name] for name in matching_names
            }
            return self.link(resolved_link_map)

        elif len(sink_pad_names) == 1:
            # N-to-one linking strategy; linked one at a time because a dict
            # cannot hold the same sink pad key more than once
            sink_pad = next(iter(sink_pads.values()))
            for source_pad in source_pads.values():
                self.link({sink_pad: source_pad})
            return self

        elif len(source_pad_names) == 1:
            # One-to-N linking strategy
            source_pad = next(iter(source_pads.values()))
            resolved_link_map = {
                sink_pad: source_pad for sink_pad in sink_pads.values()
            }
            return self.link(resolved_link_map)

        else:
            msg = (
                "unable to determine unambiguous linking strategy from source "
                "and sink pads. an explicit link_map is required."
            )
            raise ValueError(msg)

    def nodes(self, pads: bool = True, intra: bool = False) -> tuple[str, ...]:
        """Get the nodes in the pipeline.

        Args:
            pads:
                bool, whether to include pads in the graph. If True, the graph will only
                consist of pads. If False, the graph will consist only of elements.
            intra:
                bool, default False, whether or not to include intra-element edges,
                e.g. from an element's sink pads to its source pads. In this case,
                whether to include Internal Pads in the graph.

        Returns:
            tuple[str, ...], the sorted names of the nodes in the pipeline
        """
        if pads:
            pad_types = [SinkPad, SourcePad]
            if intra:
                pad_types.append(InternalPad)

            return tuple(
                sorted(
                    [
                        pad.name
                        for pad in self._registry.values()
                        if isinstance(pad, tuple(pad_types))
                    ]
                )
            )
        element_types = [TransformElement, SinkElement, SourceElement]
        return tuple(
            sorted(
                [
                    element.name
                    for element in self._registry.values()
                    if isinstance(element, tuple(element_types))
                ]
            )
        )

    def edges(
        self, pads: bool = True, intra: bool = False
    ) -> tuple[tuple[str, str], ...]:
        """Get the edges in the pipeline.

        Args:
            pads:
                bool, whether to include pads in the graph. If True, the graph will only
                consist of pads. If False, the graph will consist only of elements.
            intra:
                bool, default False, whether or not to include intra-element edges, e.g.
                from an element's sink pads to its source pads

        Returns:
            tuple[tuple[str, str], ...], sorted, de-duplicated (source, target)
            name pairs; pad names when pads is True, element names otherwise
        """
        edges = set()
        for target, sources in self.graph.items():
            for source in sources:
                if not intra and isinstance(source, (SinkPad, InternalPad)):
                    continue

                if pads:
                    edges.add((source.name, target.name))
                else:
                    source_element = source.element
                    target_element = target.element
                    edges.add((source_element.name, target_element.name))
        return tuple(sorted(edges))

    def to_graph(self, label: str | None = None):
        """graphviz.DiGraph representation of pipeline

        Args:
            label:
                str, label for the graph

        Returns:
            DiGraph, the graph object
        """
        return visualize(self, label=label)

    def to_dot(self, label: str | None = None) -> str:
        """Convert the pipeline to a graph using graphviz.

        Args:
            label:
                str, label for the graph

        Returns:
            str, the graph representation of the pipeline
        """
        return visualize(self, label=label).source

    def visualize(self, path: str, label: str | None = None) -> None:
        """Convert the pipeline to a graph using graphviz, then render into a visual
        file.

        Args:
            path:
                str, the relative or full path to the file to write the graph to
            label:
                str, label for the graph
        """
        visualize(self, label=label, path=Path(path))

    @async_sgn_mem_profile(logger)
    async def __execute_graph_loop(self) -> None:
        """Execute every pad in the graph once, in dependency order.

        Pads whose dependencies are complete are run concurrently in batches.
        """

        async def _partial(node):
            try:
                return await node()
            except Exception as e:
                msg = f"(from pad '{node.name}'): {e}."
                # Re-raise the same exception type, tagged with the pad name so
                # the failure can be traced to a node in the graph.
                try:
                    wrapped = type(e)(msg)
                except Exception:
                    # Some exception types cannot be built from a single message
                    # (e.g. UnicodeDecodeError); keep the original exception
                    # rather than masking it with a constructor TypeError.
                    wrapped = None
                if wrapped is None:
                    raise
                raise wrapped from e

        self.__loop_counter += 1
        logger.info("Executing graph loop %s:", self.__loop_counter)
        ts = graphlib.TopologicalSorter(self.graph)
        ts.prepare()
        while ts.is_active():
            # concurrently execute the next batch of ready nodes
            nodes = ts.get_ready()
            tasks = [
                self.loop.create_task(_partial(node)) for node in nodes  # type: ignore # noqa: E501
            ]
            await asyncio.gather(*tasks)
            ts.done(*nodes)

    async def _execute_graphs(self) -> None:
        """Async graph execution function: repeat until every sink is at EOS."""

        while not all(sink.at_eos for sink in self.sinks.values()):
            await self.__execute_graph_loop()

    def check(self) -> None:
        """Check that pipeline elements are connected.

        Raises:
            RuntimeError: if the pipeline has no sink elements or if any
                unlinked source or sink pad is encountered.
        """
        if not self.sinks:
            msg = "Pipeline contains no sink elements."
            raise RuntimeError(msg)
        for element in self.elements:
            for source_pad in element.source_pads:
                if not source_pad.is_linked:
                    msg = f"Source pad not linked: {source_pad}"
                    raise RuntimeError(msg)
            for sink_pad in element.sink_pads:
                if not sink_pad.is_linked:
                    msg = f"Sink pad not linked: {sink_pad}"
                    raise RuntimeError(msg)

    def run(self, auto_parallelize: bool = True) -> None:
        """Run the pipeline until End Of Stream (EOS)

        Args:
            auto_parallelize: If True (default), automatically detects if
            parallelization is needed and handles it transparently. If False,
            runs the pipeline normally without parallelization detection.

        Raises:
            RuntimeError: from check(), on the non-parallelized path, if the
                pipeline has no sinks or has unlinked pads.
            Any exception raised while executing the graph is propagated to the
            caller, including when the graph runs in a helper thread because
            an event loop is already running.
        """
        configure_sgn_logging()
        if auto_parallelize:
            # Import here to avoid circular imports
            from sgn.subprocess import Parallelize

            # Use automatic parallelization detection
            if Parallelize.needs_parallelization(self):
                with Parallelize(self) as parallelize:
                    parallelize.run()
                return

        # Run normally without parallelization
        self.check()
        if not self.loop.is_running():
            self.loop.run_until_complete(self._execute_graphs())
        else:
            # The event loop is already running (e.g. inside a Jupyter
            # notebook), so run_until_complete() cannot be used on it. Run the
            # pipeline on a fresh event loop in a separate thread and block
            # until that thread finishes.
            import threading

            original_loop = self.loop
            errors: list[BaseException] = []

            def _run_in_thread(pipeline):
                pipeline.loop = asyncio.new_event_loop()
                asyncio.set_event_loop(pipeline.loop)
                try:
                    pipeline.loop.run_until_complete(pipeline._execute_graphs())
                except BaseException as exc:
                    # Exceptions raised in a thread do not propagate to the
                    # thread that joins it; keep it so run() can re-raise it.
                    errors.append(exc)
                finally:
                    pipeline.loop.close()

            thread = threading.Thread(target=_run_in_thread, args=(self,))
            thread.start()
            thread.join()
            # Restore the caller's loop so later calls do not use the worker
            # thread's (now closed) loop.
            self.loop = original_loop
            if errors:
                raise errors[0]

__getitem__(name)

return a pipeline element or pad by name

Source code in sgn/apps.py
def __getitem__(self, name):
    """Look up a registered element or pad by its name.

    Args:
        name:
            str, the name the element or pad was registered under

    Returns:
        The matching Pad or Element.

    Raises:
        KeyError: if nothing with that name has been registered.
    """
    registry = self._registry
    return registry[name]

__init__()

Class to establish and execute a graph of elements that will process frames.

Registers methods to produce source, transform and sink elements and to assemble those elements in a directed acyclic graph. Also establishes an event loop.

Source code in sgn/apps.py
def __init__(self) -> None:
    """Class to establish and execute a graph of elements that will process frames.

    Registers methods to produce source, transform and sink elements and to assemble
    those elements in a directed acyclic graph. Also establishes an event loop.

    The event loop is the calling thread's current loop when one is available.
    Otherwise (e.g. when the pipeline is constructed in a non-main thread, or on
    Python versions where ``asyncio.get_event_loop()`` no longer creates a loop
    implicitly) a new loop is created and installed as the current one.
    """
    # Every inserted element and pad, keyed by its name.
    self._registry: dict[str, Pad | Element] = {}
    # Pad graph: each key pad maps to the set of pads that feed it.
    self.graph: dict[Pad, set[Pad]] = {}
    try:
        self.loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop for this thread; create one and make it
        # current so later asyncio calls in this thread find it.
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
    self.__loop_counter = 0
    # Sink elements by name; check() requires at least one.
    self.sinks: dict[str, SinkElement] = {}
    # Elements in insertion order.
    self.elements: list[Element] = []

check()

Check that pipeline elements are connected.

Raises a RuntimeError if the pipeline contains no sink elements or if any element has an unlinked pad.

Source code in sgn/apps.py
def check(self) -> None:
    """Verify that the pipeline is fully connected before it runs.

    Raises:
        RuntimeError: if the pipeline has no sink elements, or if any element
            has a source pad or sink pad that is not linked. Elements are
            checked in insertion order, source pads before sink pads.
    """
    if not self.sinks:
        msg = "Pipeline contains no sink elements."
        raise RuntimeError(msg)

    for element in self.elements:
        dangling_src = next(
            (pad for pad in element.source_pads if not pad.is_linked), None
        )
        if dangling_src is not None:
            msg = f"Source pad not linked: {dangling_src}"
            raise RuntimeError(msg)

        dangling_snk = next(
            (pad for pad in element.sink_pads if not pad.is_linked), None
        )
        if dangling_snk is not None:
            msg = f"Sink pad not linked: {dangling_snk}"
            raise RuntimeError(msg)

connect(source, sink, link_map=None)

Connect elements, ElementGroups, or PadSelections using implicit linking.

This method supports multiple linking patterns:

1. Element-to-element linking with implicit pad matching: pipeline.connect(source_element, sink_element)
2. Element-to-element linking with explicit mapping: pipeline.connect(source_element, sink_element, link_map={"sink": "source"})
3. ElementGroup linking (supports elements and pad selections): pipeline.connect(group(s1, s2), sink_element) or pipeline.connect(group(source, select(element, "pad1")), sink)
4. Direct PadSelection linking: pipeline.connect(select(source, "pad1"), sink_element)

Implicit linking strategies (when no link_map is provided):

1. Exact match: connect when the source and sink pad names are identical.
2. Partial match: connect all matching pad names (non-matching pads are ignored).
3. N-to-1: if there is a single sink pad, connect all source pads to it.
4. 1-to-N: if there is a single source pad, connect it to all sink pads.

Parameters:

Name Type Description Default
source Element | ElementGroup | PadSelection

Element, ElementGroup, or PadSelection, the source for linking

required
sink Element | ElementGroup | PadSelection

Element, ElementGroup, or PadSelection, the sink for linking

required
link_map dict[str, str] | None

dict[str, str], optional, explicit mapping of sink pad names to source pad names.

None

Returns:

Name Type Description
Pipeline Pipeline

The pipeline with the new links added.

Raises:

Type Description
ValueError

If implicit linking strategy is ambiguous.

TypeError

If arguments are of unexpected types.

Source code in sgn/apps.py
def connect(
    self,
    source: Element | ElementGroup | PadSelection,
    sink: Element | ElementGroup | PadSelection,
    link_map: dict[str, str] | None = None,
) -> Pipeline:
    """Connect elements, ElementGroups, or PadSelections using implicit linking.

    This method supports multiple linking patterns:
    1. Element-to-element linking with implicit pad matching:
       pipeline.connect(source_element, sink_element)
    2. Element-to-element linking with explicit mapping:
       pipeline.connect(source_element, sink_element, link_map={"sink": "source"})
    3. ElementGroup linking (supports elements and pad selections):
       pipeline.connect(group(s1, s2), sink_element)
       pipeline.connect(group(source, select(element, "pad1")), sink)
    4. Direct PadSelection linking:
       pipeline.connect(select(source, "pad1"), sink_element)

    Implicit linking strategies (when no link_map provided):
    1. Exact match: Connect when source and sink pad names are identical
    2. Partial match: Connect all matching pad names (ignores non-matching pads)
    3. N-to-1: Single sink pad, connect all source pads to it
    4. 1-to-N: Single source pad, connect to all sink pads

    Any element referenced by ``source`` or ``sink`` that is not yet part of
    this pipeline is inserted first.

    Args:
        source:
            Element, ElementGroup, or PadSelection, the source for linking
        sink:
            Element, ElementGroup, or PadSelection, the sink for linking
        link_map:
            dict[str, str], optional, explicit mapping of sink pad names to
            source pad names.

    Returns:
        Pipeline: The pipeline with the new links added.

    Raises:
        ValueError: If implicit linking strategy is ambiguous, or if a
            SinkElement is given as the source / a SourceElement as the sink.
        TypeError: If arguments are of unexpected types.
    """
    if isinstance(source, SinkElement):
        msg = f"Source '{source.name}' is a SinkElement and has no source pads"
        raise ValueError(msg)
    if isinstance(sink, SourceElement):
        msg = f"Sink '{sink.name}' is a SourceElement and has no sink pads"
        raise ValueError(msg)

    source_pads = source.srcs
    sink_pads = sink.snks

    def _members(obj) -> list:
        """Return the elements that *obj* (element, group or selection) covers."""
        if isinstance(obj, (SourceElement, TransformElement, SinkElement)):
            return [obj]
        if isinstance(obj, ElementGroup):
            return list(obj.elements)
        if isinstance(obj, PadSelection):
            return [obj.element]
        return []

    # Ensure all elements are inserted in the pipeline. Compare by identity
    # rather than by name alone: a *different* object that merely shares a
    # registered name must not be treated as already inserted; insert() will
    # report the name clash instead of linking an unregistered element.
    for element in (*_members(source), *_members(sink)):
        if self._registry.get(element.name) is not element:
            self.insert(element)

    return self._connect_pads(source_pads, sink_pads, link_map)

edges(pads=True, intra=False)

Get the edges in the pipeline.

Parameters:

Name Type Description Default
pads bool

bool, whether to include pads in the graph. If True, the graph will only consist of pads. If False, the graph will consist only of elements.

True
intra bool

bool, default False, whether or not to include intra-element edges, e.g. from an element's sink pads to its source pads

False

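Returns:

Type Description
tuple[tuple[str, str], ...]

the sorted, de-duplicated (source, target) name pairs; pad names when pads is True, element names otherwise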

Source code in sgn/apps.py
def edges(
    self, pads: bool = True, intra: bool = False
) -> tuple[tuple[str, str], ...]:
    """Get the edges in the pipeline.

    Args:
        pads:
            bool, whether to include pads in the graph. If True, the graph will only
            consist of pads. If False, the graph will consist only of elements.
        intra:
            bool, default False, whether or not to include intra-element edges, e.g.
            from an element's sink pads to its source pads

    Returns:
        tuple[tuple[str, str], ...], the sorted, de-duplicated
        (upstream, downstream) name pairs; pad names when ``pads`` is True,
        otherwise the names of the elements owning those pads.
    """

    def _keep(upstream) -> bool:
        # An edge whose upstream end is a sink pad or an internal pad lies
        # inside one element; keep it only when intra-element edges are wanted.
        return intra or not isinstance(upstream, (SinkPad, InternalPad))

    def _label(pad) -> str:
        return pad.name if pads else pad.element.name

    unique_pairs = {
        (_label(upstream), _label(downstream))
        for downstream, upstreams in self.graph.items()
        for upstream in upstreams
        if _keep(upstream)
    }
    return tuple(sorted(unique_pairs))

insert(*elements, link_map=None)

Insert element(s) into the pipeline.

Parameters:

Name Type Description Default
*elements Element

Iterable[Element], the ordered elements to insert into the pipeline

()
link_map dict[str | SinkPad, str | SourcePad] | None

dict[str | SinkPad, str | SourcePad] | None, an optional mapping of sink pads (or their names) to source pads (or their names) to link after the elements are inserted

None

Returns:

Type Description
Pipeline

Pipeline, the pipeline with the elements inserted

Source code in sgn/apps.py
def insert(
    self,
    *elements: Element,
    link_map: dict[str | SinkPad, str | SourcePad] | None = None,
) -> Pipeline:
    """Insert element(s) into the pipeline.

    Every element is validated before any of them is registered, so a failed
    insert leaves the pipeline unchanged.

    Args:
        *elements:
            Iterable[Element], the ordered elements to insert into the pipeline
        link_map:
            dict[str | SinkPad, str | SourcePad] | None,
            an optional mapping of sink pads (or their names) to source pads
            (or their names), passed to link() after insertion

    Returns:
        Pipeline, the pipeline with the elements inserted

    Raises:
        AssertionError: if an object is not an sgn Element, or if an element
            or pad name is already in use in this pipeline (or repeated within
            this call).
    """
    # Validation pass: nothing is registered until every name checks out.
    # Explicit raises (instead of `assert`) keep these checks active under
    # `python -O` while preserving the AssertionError type callers may catch.
    pending: set[str] = set()
    for element in elements:
        if not isinstance(element, ElementLike):
            msg = f"Element {element} is not an instance of a sgn.Element"
            raise AssertionError(msg)
        if element.name in self._registry or element.name in pending:
            msg = f"Element name '{element.name}' is already in use in this pipeline"
            raise AssertionError(msg)
        pending.add(element.name)
        for pad in element.pad_list:
            if pad.name in self._registry or pad.name in pending:
                msg = f"Pad name '{pad.name}' is already in use in this pipeline"
                raise AssertionError(msg)
            pending.add(pad.name)

    # Commit pass: every name is known to be free.
    for element in elements:
        self._registry[element.name] = element
        for pad in element.pad_list:
            self._registry[pad.name] = pad
        if isinstance(element, SinkElement):
            self.sinks[element.name] = element
        self.graph.update(element.graph)
        self.elements.append(element)

    if link_map is not None:
        self.link(link_map)
    return self

link(link_map)

Link pads in a pipeline.

Parameters:

Name Type Description Default
link_map dict[str | SinkPad, str | SourcePad]

dict[str | SinkPad, str | SourcePad], a mapping of sink pads to source pads to link. Each key is a sink pad (or its name) and each value is a source pad (or its name), so data flows from value -> key.

required
Source code in sgn/apps.py
def link(self, link_map: dict[str | SinkPad, str | SourcePad]) -> Pipeline:
    """Link pads in a pipeline.

    Args:
        link_map:
            dict[str | SinkPad, str | SourcePad], a mapping of sink pads to
            source pads to link. Each key is a SinkPad (or its registered
            name) and each value is a SourcePad (or its registered name), so
            data flows from value -> key.

    Returns:
        Pipeline, the pipeline with the new links added

    Raises:
        KeyError: if a pad name is not registered in this pipeline.
        AssertionError: if a key does not resolve to a SinkPad or a value does
            not resolve to a SourcePad.
    """

    def _resolve(pad_or_name):
        # Names are looked up in the registry; pad objects pass through as-is.
        if isinstance(pad_or_name, str):
            return self._registry[pad_or_name]
        return pad_or_name

    # Resolve and validate every pair before linking anything, so a bad entry
    # does not leave the pipeline partially linked. Explicit raises (instead
    # of `assert`) keep the checks active under `python -O`.
    pairs = []
    for sink_key, source_key in link_map.items():
        sink_pad = _resolve(sink_key)
        source_pad = _resolve(source_key)
        if not isinstance(sink_pad, SinkPad):
            msg = f"not a sink pad: {sink_pad}"
            raise AssertionError(msg)
        if not isinstance(source_pad, SourcePad):
            msg = f"not a source pad: {source_pad}"
            raise AssertionError(msg)
        pairs.append((sink_pad, source_pad))

    for sink_pad, source_pad in pairs:
        graph = sink_pad.link(source_pad)
        self.graph.update(graph)

    return self

nodes(pads=True, intra=False)

Get the nodes in the pipeline.

Parameters:

Name Type Description Default
pads bool

bool, whether to include pads in the graph. If True, the graph will only consist of pads. If False, the graph will consist only of elements.

True
intra bool

bool, default False, whether or not to include intra-element edges, e.g. from an element's sink pads to its source pads. In this case, whether to include Internal Pads in the graph.

False

Returns:

Type Description
tuple[str, ...]

tuple[str, ...], the sorted names of the nodes in the pipeline

Source code in sgn/apps.py
def nodes(self, pads: bool = True, intra: bool = False) -> tuple[str, ...]:
    """Get the nodes in the pipeline.

    Args:
        pads:
            bool, whether to list pads or elements. If True, only pad names
            are returned; if False, only element names are returned.
        intra:
            bool, default False, only meaningful when ``pads`` is True:
            whether InternalPads are included alongside sink and source pads.

    Returns:
        tuple[str, ...], the sorted names of the matching registered nodes
    """
    if pads:
        wanted = (SinkPad, SourcePad, InternalPad) if intra else (SinkPad, SourcePad)
    else:
        wanted = (TransformElement, SinkElement, SourceElement)

    names = [node.name for node in self._registry.values() if isinstance(node, wanted)]
    names.sort()
    return tuple(names)

run(auto_parallelize=True)

Run the pipeline until End Of Stream (EOS)

Parameters:

Name Type Description Default
auto_parallelize bool

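If True (default), automatically detects whether parallelization is needed and handles it transparently. If False, runs the pipeline normally without parallelization detection.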

True
Source code in sgn/apps.py
def run(self, auto_parallelize: bool = True) -> None:
    """Run the pipeline until End Of Stream (EOS)

    Args:
        auto_parallelize: If True (default), automatically detects if
        parallelization is needed and handles it transparently. If False,
        runs the pipeline normally without parallelization detection.

    Raises:
        RuntimeError: if check() finds the pipeline incompletely connected.
        Any exception raised while executing the graphs is propagated to
        the caller, including when execution happens on a helper thread.
    """
    configure_sgn_logging()
    if auto_parallelize:
        # Import here to avoid circular imports
        from sgn.subprocess import Parallelize

        # Use automatic parallelization detection
        if Parallelize.needs_parallelization(self):
            with Parallelize(self) as parallelize:
                parallelize.run()
            return

    # Run normally without parallelization
    self.check()
    if not self.loop.is_running():
        self.loop.run_until_complete(self._execute_graphs())
        return

    # The event loop is already running (e.g. inside a Jupyter notebook),
    # so run_until_complete() cannot be called on it. Execute the graphs
    # on a fresh event loop in a helper thread and block until it is done.
    import threading

    caller_loop = self.loop
    errors: list[BaseException] = []

    def _run_in_thread() -> None:
        thread_loop = asyncio.new_event_loop()
        # Point the pipeline at the loop it actually runs on while the
        # graphs execute.
        self.loop = thread_loop
        asyncio.set_event_loop(thread_loop)
        try:
            thread_loop.run_until_complete(self._execute_graphs())
        except BaseException as err:
            # Exceptions do not cross thread boundaries on their own;
            # capture them so the caller sees the failure (re-raised below).
            errors.append(err)
        finally:
            thread_loop.close()

    thread = threading.Thread(target=_run_in_thread)
    thread.start()
    thread.join()
    # Restore the caller's loop so the pipeline is not left holding the
    # closed helper loop (which would break a subsequent run()).
    self.loop = caller_loop
    if errors:
        raise errors[0]

to_dot(label=None)

Convert the pipeline to a graph using graphviz.

Parameters:

Name Type Description Default
label str | None

str, label for the graph

None

Returns:

Type Description
str

str, the graph representation of the pipeline

Source code in sgn/apps.py
def to_dot(self, label: str | None = None) -> str:
    """Return the Graphviz DOT source text describing the pipeline.

    Args:
        label:
            str, label for the graph

    Returns:
        str, the ``.source`` text of the graph built by visualize()
    """
    graph = visualize(self, label=label)
    return graph.source

to_graph(label=None)

graphviz.Digraph representation of the pipeline

Parameters:

Name Type Description Default
label str | None

str, label for the graph

None

Returns:

Type Description

graphviz.Digraph, the graph object

Source code in sgn/apps.py
def to_graph(self, label: str | None = None):
    """Build a graphviz ``Digraph`` describing the pipeline.

    Args:
        label:
            str, label for the graph

    Returns:
        graphviz.Digraph, the graph object produced by visualize()
    """
    graph = visualize(self, label=label)
    return graph

visualize(path, label=None)

Convert the pipeline to a graph using graphviz, then render into a visual file.

Parameters:

Name Type Description Default
path str

str, the relative or full path to the file to write the graph to

required
label str | None

str, label for the graph

None
Source code in sgn/apps.py
def visualize(self, path: str, label: str | None = None) -> None:
    """Draw the pipeline with graphviz and render it to a file on disk.

    Args:
        path:
            str, relative or absolute path of the file the rendered graph is
            written to
        label:
            str, label for the graph
    """
    output_path = Path(path)
    visualize(self, label=label, path=output_path)