Pipeline - Orchestrating SGN Task Graphs¶
The Pipeline class is the central orchestrator for SGN applications. It manages the directed acyclic graph (DAG) of elements, handles pad connections, and executes your data processing pipeline asynchronously.
Overview¶
A Pipeline coordinates three key responsibilities:
- Element Management - Register and track elements in your graph
- Connection Management - Link pads between elements to define data flow
- Execution - Run the async event loop to process frames through the graph
Quick Start: Your First Pipeline¶
Here's a minimal working pipeline:
```python
from sgn.base import SourceElement, SinkElement, Frame
from sgn.apps import Pipeline


class HelloSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)
        self.count = 0

    def new(self, pad):
        self.count += 1
        if self.count > 3:
            return Frame(EOS=True)
        return Frame(data=f"Hello {self.count}")


class PrinterSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
            return
        print(frame.data)


# Build and run pipeline
p = Pipeline()
source = HelloSource()
sink = PrinterSink()
p.insert(source, sink)
p.link({sink.snks["in"]: source.srcs["out"]})
p.run()
# Output:
# Hello 1
# Hello 2
# Hello 3
```
Connecting Elements: Two Approaches¶
Approach 1: insert() + link() (Explicit)¶
The traditional approach gives you fine-grained control over connections:
```python
from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class Counter(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["numbers"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n > 5:
            return Frame(EOS=True)
        return Frame(data=self.n)


class Doubler(TransformElement):
    def __init__(self, **kwargs):
        super().__init__(
            source_pad_names=["doubled"],
            sink_pad_names=["input"],
            **kwargs
        )
        self.value = None

    def pull(self, pad, frame):
        self.value = frame

    def new(self, pad):
        if self.value.EOS:
            return Frame(EOS=True)
        return Frame(data=self.value.data * 2)


class Printer(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["data"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
            return
        print(f"Result: {frame.data}")


# Explicit connection
p = Pipeline()
source = Counter()
transform = Doubler()
sink = Printer()
# Insert all elements
p.insert(source, transform, sink)
# Link pads explicitly (data flows right to left in dict)
p.link({
    transform.snks["input"]: source.srcs["numbers"],  # source -> transform
    sink.snks["data"]: transform.srcs["doubled"]      # transform -> sink
})
p.run()
# Output:
# Result: 2
# Result: 4
# Result: 6
# Result: 8
# Result: 10
```
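link() Dictionary Format
In link_map, data flows from value to key: each entry is written {sink_pad: source_pad}, so the source pad (the value) feeds the sink pad (the key).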
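To make the direction concrete, the sketch below models pads as plain strings and turns a link map into (source, sink) pairs in the order data actually flows. It is a conceptual illustration only: `link_map_to_edges` is a hypothetical helper, not part of SGN, and the pad strings are illustrative labels modeled on the "my_source:src:out" naming used elsewhere on this page (the "snk" middle field for sink pads is an assumption).

```python
def link_map_to_edges(link_map: dict[str, str]) -> list[tuple[str, str]]:
    """Hypothetical helper: convert a {sink: source} link map into
    (source, sink) pairs, i.e. the direction in which data flows."""
    return [(source, sink) for sink, source in link_map.items()]


# Keys are sink pads, values are the source pads that feed them
link_map = {
    "doubler:snk:input": "counter:src:numbers",  # counter -> doubler
    "printer:snk:data": "doubler:src:doubled",   # doubler -> printer
}

for source, sink in link_map_to_edges(link_map):
    print(f"{source} -> {sink}")
```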
Approach 2: connect() (Automatic Matching)¶
The modern connect() method automatically matches pads by name:
```python
from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class NumberSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["data"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n > 3:
            return Frame(EOS=True)
        return Frame(data=self.n)


class Squarer(TransformElement):
    def __init__(self, **kwargs):
        super().__init__(
            source_pad_names=["data"],
            sink_pad_names=["data"],
            **kwargs
        )
        self.input = None

    def pull(self, pad, frame):
        self.input = frame

    def new(self, pad):
        if self.input.EOS:
            return Frame(EOS=True)
        return Frame(data=self.input.data ** 2)


class ResultSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["data"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
            return
        print(f"Squared: {frame.data}")


# Automatic connection (all pads named "data")
p = Pipeline()
source = NumberSource()
transform = Squarer()
sink = ResultSink()
p.connect(source, transform)  # Matches "data" pads
p.connect(transform, sink)    # Matches "data" pads
p.run()
# Output:
# Squared: 1
# Squared: 4
# Squared: 9
```
When to Use connect()
Use connect() when:
- Pad names match between elements
- You want cleaner, more readable code
- Building simple linear pipelines
Advanced Connection Patterns¶
Multiple Source Pads¶
Elements can have multiple output pads for different data streams:
```python
from sgn.base import SourceElement, SinkElement, Frame
from sgn.apps import Pipeline


class DualSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["evens", "odds"], **kwargs)
        # new() runs once per source pad on every iteration, so keep one
        # counter per pad rather than a single shared counter
        self.counts = {"evens": 0, "odds": 0}

    def new(self, pad):
        name = "evens" if pad is self.srcs["evens"] else "odds"
        self.counts[name] += 1
        k = self.counts[name]
        if k > 3:
            return Frame(EOS=True)
        # Different output based on pad: evens -> 2, 4, 6; odds -> 1, 3, 5
        return Frame(data=2 * k if name == "evens" else 2 * k - 1)


class NumberPrinter(SinkElement):
    def __init__(self, label, **kwargs):
        super().__init__(sink_pad_names=["numbers"], **kwargs)
        self.label = label

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
            return
        print(f"{self.label}: {frame.data}")


p = Pipeline()
source = DualSource()
even_sink = NumberPrinter("Even")
odd_sink = NumberPrinter("Odd")
p.insert(source, even_sink, odd_sink)
p.link({
    even_sink.snks["numbers"]: source.srcs["evens"],
    odd_sink.snks["numbers"]: source.srcs["odds"]
})
p.run()
# Output (the two lines printed in each iteration may appear in either order):
# Odd: 1
# Even: 2
# Odd: 3
# Even: 4
# Odd: 5
# Even: 6
```
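Why one counter per pad? Each source pad's new() is called once per iteration, so a single counter incremented inside new() advances once per pad, not once per iteration. The pure-Python simulation below (no SGN required) makes this visible; `simulate` and the pad call order within an iteration are hypothetical stand-ins for the pipeline loop.

```python
def simulate(per_pad: bool, iterations: int = 3) -> dict[str, list[int]]:
    """Call a fake new(pad) for each pad in turn, as a pipeline loop would,
    and record the counter value each pad sees."""
    counts = {"evens": 0, "odds": 0}
    shared = 0
    seen: dict[str, list[int]] = {"evens": [], "odds": []}
    for _ in range(iterations):
        for pad in ("evens", "odds"):  # assumed call order within one iteration
            if per_pad:
                counts[pad] += 1
                seen[pad].append(counts[pad])
            else:
                shared += 1
                seen[pad].append(shared)
    return seen


print(simulate(per_pad=False))  # {'evens': [1, 3, 5], 'odds': [2, 4, 6]}
print(simulate(per_pad=True))   # {'evens': [1, 2, 3], 'odds': [1, 2, 3]}
```

With a shared counter and this call order, the "evens" pad only ever sees odd values, so a filter such as "emit n only if it is even" would never fire on that pad.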
Fan-Out: One Source, Multiple Sinks¶
Multiple sinks can connect to the same source pad:
```python
from sgn.base import SourceElement, SinkElement, Frame
from sgn.apps import Pipeline


class DataSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["data"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n > 3:
            return Frame(EOS=True)
        return Frame(data=self.n * 10)


class Sink1(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
            return
        print(f"Sink1 received: {frame.data}")


class Sink2(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
            return
        print(f"Sink2 received: {frame.data}")


p = Pipeline()
source = DataSource()
sink1 = Sink1()
sink2 = Sink2()
p.insert(source, sink1, sink2)
p.link({
    sink1.snks["in"]: source.srcs["data"],  # Fan-out: same source
    sink2.snks["in"]: source.srcs["data"]   # to multiple sinks
})
p.run()
# Output:
# Sink1 received: 10
# Sink2 received: 10
# Sink1 received: 20
# Sink2 received: 20
# Sink1 received: 30
# Sink2 received: 30
```
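Because a link map is a dictionary keyed by sink pad, fan-out is natural: several keys may share the same source value. The reverse does not hold within one dictionary: a repeated sink key silently overwrites the earlier entry, so a single link_map cannot express two sources for one sink pad. The plain-dict sketch below (strings stand in for pad objects) shows both cases.

```python
# Fan-out: two sink pads keyed to the same source pad
fan_out = {
    "sink1:snk:in": "source:src:data",
    "sink2:snk:in": "source:src:data",
}
print(len(fan_out))  # 2 links, both fed by "source:src:data"

# Attempted fan-in: the same sink key twice keeps only the last entry
fan_in = {
    "sink1:snk:in": "source_a:src:data",
    "sink1:snk:in": "source_b:src:data",  # overwrites the line above
}
print(fan_in)  # {'sink1:snk:in': 'source_b:src:data'}
```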
Pipeline Lifecycle¶
1. Build Phase¶
```python
from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class SimpleSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)

    def new(self, pad):
        return Frame(data=1)


class SimpleTransform(TransformElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], sink_pad_names=["in"], **kwargs)
        self.data = None

    def pull(self, pad, frame):
        self.data = frame

    def new(self, pad):
        return self.data


class SimpleSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        pass


p = Pipeline()
# Add elements
source = SimpleSource()
transform = SimpleTransform()
sink = SimpleSink()
p.insert(source, transform, sink)
# Define connections (link() can be called more than once)
p.link({transform.snks["in"]: source.srcs["out"]})
p.link({sink.snks["in"]: transform.srcs["out"]})
# Build only: SimpleSource never emits EOS and SimpleSink never marks it,
# so calling p.run() on this graph would not terminate
```
2. Execution Phase¶
```python
from sgn.base import SourceElement, SinkElement, Frame
from sgn.apps import Pipeline


class QuickSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n > 2:
            return Frame(EOS=True)
        return Frame(data=self.n)


class QuickSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)


p = Pipeline()
source = QuickSource()
sink = QuickSink()
p.insert(source, sink)
p.link({sink.snks["in"]: source.srcs["out"]})
# Run the pipeline (blocks until all sinks reach EOS)
p.run()
```
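Conceptually, run() repeats one pass over the graph per iteration and stops once every sink has marked EOS. The toy loop below is a simplified, synchronous model of that stopping rule in plain Python; it is not SGN's asynchronous implementation, and `ToyFrame`, `ToySource`, `ToySink` and `toy_run` are hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass
class ToyFrame:
    data: object = None
    EOS: bool = False


@dataclass
class ToySource:
    limit: int
    n: int = 0

    def new(self) -> ToyFrame:
        self.n += 1
        return ToyFrame(EOS=True) if self.n > self.limit else ToyFrame(data=self.n)


@dataclass
class ToySink:
    received: list = field(default_factory=list)
    at_eos: bool = False

    def pull(self, frame: ToyFrame) -> None:
        if frame.EOS:
            self.at_eos = True  # stands in for mark_eos(pad)
            return
        self.received.append(frame.data)


def toy_run(source: ToySource, sinks: list[ToySink]) -> int:
    """Run iterations until every sink is at EOS; return the iteration count."""
    iterations = 0
    while not all(s.at_eos for s in sinks):
        frame = source.new()
        for s in sinks:
            s.pull(frame)
        iterations += 1
    return iterations


sink = ToySink()
print(toy_run(ToySource(limit=2), [sink]), sink.received)  # 3 [1, 2]
```

The final iteration carries only the EOS frame, which is why two data frames take three iterations.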
3. Accessing Results¶
```python
from sgn.base import SourceElement, SinkElement, Frame
from sgn.apps import Pipeline


class DataSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n > 3:
            return Frame(EOS=True)
        return Frame(data=self.n * 10)


class CollectorSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)
        self.collected_data = []

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
            return
        self.collected_data.append(frame.data)


p = Pipeline()
source = DataSource()
sink = CollectorSink(name="sink_name")
p.insert(source, sink)
p.link({sink.snks["in"]: source.srcs["out"]})
p.run()
# Access collected data after run()
sink = p["sink_name"]  # Get element by name
results = sink.collected_data
print(f"Collected: {results}")  # [10, 20, 30]
```
Accessing Pipeline Elements¶
Elements and pads are accessible by name:
```python
from sgn.base import SourceElement, Frame
from sgn.apps import Pipeline


class MySource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)

    def new(self, pad):
        return Frame(data=1)


p = Pipeline()
source = MySource(name="my_source")
p.insert(source)
# Access element by name
elem = p["my_source"]
# Access pad by full name
pad = p["my_source:src:out"]
```
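A full pad name combines the element name, the pad direction and the short pad name, separated by colons, as in "my_source:src:out". The hypothetical helper below splits such a name back into its parts. It assumes element and pad names contain no colons (otherwise the unpacking raises ValueError) and that sink pads use "snk" as the middle field, which this page does not show.

```python
from typing import NamedTuple


class PadName(NamedTuple):
    element: str
    direction: str  # "src" for source pads; "snk" for sink pads is assumed
    pad: str


def parse_pad_name(full_name: str) -> PadName:
    """Hypothetical helper: split '<element>:<direction>:<pad>' into its parts."""
    element, direction, pad = full_name.split(":")
    return PadName(element, direction, pad)


print(parse_pad_name("my_source:src:out"))
# PadName(element='my_source', direction='src', pad='out')
```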
Common Patterns¶
Pattern 1: Linear Pipeline¶
```python
from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class Source(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n > 2:
            return Frame(EOS=True)
        return Frame(data=self.n)


class Transform(TransformElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], sink_pad_names=["in"], **kwargs)
        self.val = None

    def pull(self, pad, frame):
        self.val = frame

    def new(self, pad):
        if self.val.EOS:
            return Frame(EOS=True)
        return Frame(data=self.val.data * 2)


class Sink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)


p = Pipeline()
source = Source()
transform1 = Transform()
transform2 = Transform()
sink = Sink()
p.insert(source, transform1, transform2, sink)
p.link({
    transform1.snks["in"]: source.srcs["out"],
    transform2.snks["in"]: transform1.srcs["out"],
    sink.snks["in"]: transform2.srcs["out"]
})
p.run()
```
Pattern 2: Using connect() for Linear Pipelines¶
```python
from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class DataSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["data"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n > 2:
            return Frame(EOS=True)
        return Frame(data=self.n)


class DataTransform(TransformElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["data"], sink_pad_names=["data"], **kwargs)
        self.val = None

    def pull(self, pad, frame):
        self.val = frame

    def new(self, pad):
        if self.val.EOS:
            return Frame(EOS=True)
        return Frame(data=self.val.data + 1)


class DataSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["data"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)


p = Pipeline()
source = DataSource()
transform1 = DataTransform()
transform2 = DataTransform()
sink = DataSink()
p.connect(source, transform1)
p.connect(transform1, transform2)
p.connect(transform2, sink)
p.run()
```
Pattern 3: Insert with Inline link_map¶
```python
from sgn.base import SourceElement, TransformElement, SinkElement, Frame
from sgn.apps import Pipeline


class NumSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)
        self.n = 0

    def new(self, pad):
        self.n += 1
        if self.n > 2:
            return Frame(EOS=True)
        return Frame(data=self.n)


class NumTransform(TransformElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], sink_pad_names=["in"], **kwargs)
        self.val = None

    def pull(self, pad, frame):
        self.val = frame

    def new(self, pad):
        if self.val.EOS:
            return Frame(EOS=True)
        return Frame(data=self.val.data * 3)


class NumSink(SinkElement):
    def __init__(self, **kwargs):
        super().__init__(sink_pad_names=["in"], **kwargs)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)


p = Pipeline()
source = NumSource()
transform = NumTransform()
sink = NumSink()
p.insert(
    source, transform, sink,
    link_map={
        transform.snks["in"]: source.srcs["out"],
        sink.snks["in"]: transform.srcs["out"]
    }
)
p.run()
```
Error Handling¶
The pipeline validates connections and will raise errors for:
- Duplicate element names
- Invalid pad types (e.g., connecting two source pads)
- Missing elements (accessing non-existent elements)
- Circular dependencies (detected during execution)
```python
from sgn.base import SourceElement, Frame
from sgn.apps import Pipeline


class TestSource(SourceElement):
    def __init__(self, **kwargs):
        super().__init__(source_pad_names=["out"], **kwargs)

    def new(self, pad):
        return Frame(data=1)


# This will raise an error: duplicate names
p = Pipeline()
p.insert(TestSource(name="source"))
try:
    p.insert(TestSource(name="source"))  # AssertionError!
except AssertionError as e:
    print(f"Caught error: {e}")
```
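Cycle detection is a general property of DAG scheduling rather than something specific to SGN. As an illustration (not SGN's code), Python's standard `graphlib` module orders a small pad graph topologically and raises `graphlib.CycleError` once a link closes a loop; the pad strings are illustrative labels.

```python
from graphlib import CycleError, TopologicalSorter

# Each key receives data from (depends on) the nodes in its set
acyclic = {
    "transform:snk:in": {"source:src:out"},
    "transform:src:out": {"transform:snk:in"},
    "sink:snk:in": {"transform:src:out"},
}
print(list(TopologicalSorter(acyclic).static_order()))
# ['source:src:out', 'transform:snk:in', 'transform:src:out', 'sink:snk:in']

# Feeding the transform's output back into its own input closes a loop
cyclic = {**acyclic, "transform:snk:in": {"transform:src:out"}}
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as e:
    print(f"Cycle detected: {e.args[1]}")
```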
Visualization¶
Generate a visual graph of your pipeline:
```python
from sgn.apps import Pipeline

# source, transform and sink are elements like those defined above
p = Pipeline()
p.insert(source, transform, sink)
p.link({...})  # link pads as in the examples above

# Render the pipeline graph with Graphviz and write it to the given path
p.visualize("my_pipeline.pdf")
```
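Requires Graphviz
Install the optional dependency with pip install sgn[visualize]. Rendering also needs the Graphviz system package, which provides the dot executable.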
Related Tutorials¶
- Connection Basics - Detailed guide to connect() vs insert()
- Connection Strategies - Automatic pad matching patterns
- Element Grouping - Working with element groups
- Pipeline Visualization - Creating pipeline diagrams
API Reference¶
sgn.apps¶
Pipeline class and related utilities to establish and execute a graph of element tasks.
Pipeline¶
A Pipeline is essentially a directed acyclic graph of tasks that process frames.
These tasks are grouped using Pads and Elements. The Pipeline class is responsible for registering methods to produce source, transform and sink elements and to assemble those elements in a directed acyclic graph. It also establishes an event loop to execute the graph asynchronously.
Source code in sgn/apps.py
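__getitem__(name)¶
Return the element or pad registered in the pipeline under name, for example p["my_source"] or p["my_source:src:out"].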
__init__()¶
Class to establish and execute a graph of elements that will process frames.
Registers methods to produce source, transform and sink elements and to assemble those elements in a directed acyclic graph. Also establishes an event loop.
check()¶
Check that pipeline elements are connected.
Raises a RuntimeError if unconnected pads are encountered.
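The idea behind check() can be sketched in plain Python: collect the sink pads and report any that no link feeds. The functions below are a hypothetical illustration working on pad-name strings, restricted to sink pads; SGN's actual check may inspect other pads as well and word its error differently.

```python
def unconnected_sink_pads(sink_pads: list[str], link_map: dict[str, str]) -> list[str]:
    """Hypothetical helper: return the sink pads that no link feeds."""
    return [pad for pad in sink_pads if pad not in link_map]


def check(sink_pads: list[str], link_map: dict[str, str]) -> None:
    """Raise RuntimeError if any sink pad is left unconnected."""
    missing = unconnected_sink_pads(sink_pads, link_map)
    if missing:
        raise RuntimeError(f"unconnected sink pads: {missing}")


pads = ["transform:snk:in", "sink:snk:in"]
links = {"transform:snk:in": "source:src:out"}  # sink:snk:in was never linked
try:
    check(pads, links)
except RuntimeError as e:
    print(e)  # unconnected sink pads: ['sink:snk:in']
```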
connect(source, sink, link_map=None)¶
Connect elements, ElementGroups, or PadSelections using implicit linking.
This method supports multiple linking patterns:
1. Element-to-element linking with implicit pad matching: pipeline.connect(source_element, sink_element)
2. Element-to-element linking with explicit mapping: pipeline.connect(source_element, sink_element, link_map={"sink": "source"})
3. ElementGroup linking (supports elements and pad selections): pipeline.connect(group(s1, s2), sink_element) or pipeline.connect(group(source, select(element, "pad1")), sink)
4. Direct PadSelection linking: pipeline.connect(select(source, "pad1"), sink_element)

Implicit linking strategies (when no link_map is provided):
1. Exact match: Connect when source and sink pad names are identical
2. Partial match: Connect all matching pad names (ignores non-matching pads)
3. N-to-1: Single sink pad, connect all source pads to it
4. 1-to-N: Single source pad, connect to all sink pads
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | Element \| ElementGroup \| PadSelection | Element, ElementGroup, or PadSelection, the source for linking | required |
| sink | Element \| ElementGroup \| PadSelection | Element, ElementGroup, or PadSelection, the sink for linking | required |
| link_map | dict[str, str] \| None | dict[str, str], optional, explicit mapping of sink pad names to source pad names. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| Pipeline | Pipeline | The pipeline with the new links added. |

Raises:
| Type | Description |
|---|---|
| ValueError | If implicit linking strategy is ambiguous. |
| TypeError | If arguments are of unexpected types. |
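The four implicit strategies can be read as a small matching function over pad names. The sketch below is a hypothetical interpretation of the docstring above, not SGN's implementation: `implicit_links` is an illustrative name, the strategies are assumed to be tried in the listed order, and the real precedence and error messages may differ. It returns (source_pad, sink_pad) pairs rather than a dictionary because N-to-1 needs several sources for one sink.

```python
def implicit_links(source_pads: list[str], sink_pads: list[str]) -> list[tuple[str, str]]:
    """Hypothetical sketch of connect()'s implicit strategies, tried in the
    order the docstring lists them. Returns (source_pad, sink_pad) pairs."""
    common = [name for name in source_pads if name in sink_pads]
    if common:
        # 1. Exact match / 2. Partial match: link only identically named pads
        return [(name, name) for name in common]
    if len(sink_pads) == 1:
        # 3. N-to-1: every source pad feeds the single sink pad
        return [(src, sink_pads[0]) for src in source_pads]
    if len(source_pads) == 1:
        # 4. 1-to-N: the single source pad feeds every sink pad
        return [(source_pads[0], snk) for snk in sink_pads]
    raise ValueError("ambiguous: no shared pad names and several pads on both sides")


print(implicit_links(["data"], ["data"]))           # [('data', 'data')]
print(implicit_links(["a", "b", "c"], ["b", "x"]))  # [('b', 'b')]
print(implicit_links(["h1", "l1"], ["in"]))         # [('h1', 'in'), ('l1', 'in')]
print(implicit_links(["out"], ["left", "right"]))   # [('out', 'left'), ('out', 'right')]
```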
edges(pads=True, intra=False)¶
Get the edges in the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pads | bool | bool, whether to include pads in the graph. If True, the graph will only consist of pads. If False, the graph will consist only of elements. | True |
| intra | bool | bool, default False, whether or not to include intra-element edges, e.g. from an element's sink pads to its source pads | False |

Returns:
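The pads flag switches between a pad-level and an element-level view of the same graph. Assuming pad names follow the "element:direction:pad" pattern, an element-level edge list can be derived from pad-level edges by keeping only the element part of each name. The hypothetical sketch below does exactly that; the pad names, including "odd_sink:snk:extra", are illustrative.

```python
def element_edges(pad_edges: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Collapse (source_pad, sink_pad) edges to (source_element, sink_element),
    dropping duplicates while preserving first-seen order."""
    seen: dict[tuple[str, str], None] = {}
    for src, snk in pad_edges:
        seen[(src.split(":")[0], snk.split(":")[0])] = None
    return list(seen)


pad_edges = [
    ("source:src:evens", "even_sink:snk:numbers"),
    ("source:src:odds", "odd_sink:snk:numbers"),
    ("source:src:odds", "odd_sink:snk:extra"),  # second link between the same elements
]
print(element_edges(pad_edges))
# [('source', 'even_sink'), ('source', 'odd_sink')]
```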
insert(*elements, link_map=None)¶
Insert element(s) into the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| *elements | Element | Iterable[Element], the ordered elements to insert into the pipeline | () |
| link_map | dict[str \| SinkPad, str \| SourcePad] \| None | dict[str \| SinkPad, str \| SourcePad] \| None, a mapping of sink pads to the source pads that feed them (data flows from value to key) | None |

Returns:
| Type | Description |
|---|---|
| Pipeline | Pipeline, the pipeline with the elements inserted |
link(link_map)¶
Link pads in a pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| link_map | dict[str \| SinkPad, str \| SourcePad] | a mapping of sink pads (or sink pad names) to source pads (or source pad names). The keys are sink pads and the values are source pads, so data flows from value -> key | required |
nodes(pads=True, intra=False)¶
Get the nodes in the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pads | bool | bool, whether to include pads in the graph. If True, the graph will only consist of pads. If False, the graph will consist only of elements. | True |
| intra | bool | bool, default False, whether or not to include intra-element edges, e.g. from an element's sink pads to its source pads. In this case, whether to include Internal Pads in the graph. | False |

Returns:
| Type | Description |
|---|---|
| tuple[str, ...] | the nodes in the pipeline |
run(auto_parallelize=True)¶
Run the pipeline until End Of Stream (EOS).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| auto_parallelize | bool | If True (default), automatically detects if | True |
to_dot(label=None)¶
Convert the pipeline to a graph using graphviz.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| label | str \| None | str, label for the graph | None |

Returns:
| Type | Description |
|---|---|
| str | str, the graph representation of the pipeline |
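to_dot() returns the graph as a string in Graphviz's DOT language. For readers new to DOT, the hypothetical helper below writes a minimal DOT digraph by hand from an edge list; the node names, styling and attributes that SGN actually emits will differ.

```python
from typing import Optional


def edges_to_dot(edges: list[tuple[str, str]], label: Optional[str] = None) -> str:
    """Hypothetical helper: render (source, sink) edges as a DOT digraph."""
    lines = ["digraph {"]
    if label is not None:
        lines.append(f'  label="{label}";')
    for src, snk in edges:
        lines.append(f'  "{src}" -> "{snk}";')
    lines.append("}")
    return "\n".join(lines)


print(edges_to_dot([("source", "transform"), ("transform", "sink")], label="demo"))
```

The output can be pasted into any Graphviz tool, for example `dot -Tpdf`, to render it.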
to_graph(label=None)¶
A graphviz.Digraph representation of the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| label | str \| None | str, label for the graph | None |

Returns:
| Type | Description |
|---|---|
| graphviz.Digraph | the graph object |
visualize(path, label=None)¶
Convert the pipeline to a graph using graphviz, then render it into a visual file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | str, the relative or full path to the file to write the graph to | required |
| label | str \| None | str, label for the graph | None |