Your First Pipeline¶
This tutorial walks you through building SGN pipelines from scratch. Each section builds on the previous one, introducing one new concept at a time.
1. A Source and a Sink¶
Every pipeline needs at least a source (produces data) and a sink (consumes data). Let's build custom ones.
from dataclasses import dataclass
from sgn import SourceElement, SinkElement, SourcePad, SinkPad, Frame, Pipeline
@dataclass
class CountSource(SourceElement):
    """Source that emits the integers 1 through ``limit`` and then EOS frames."""

    limit: int = 3

    def __post_init__(self):
        super().__post_init__()
        # Number of new() calls served so far.
        self.count = 0

    def new(self, pad: SourcePad) -> Frame:
        """Return the next counter value, or a data-less EOS frame past ``limit``."""
        self.count += 1
        exhausted = self.count > self.limit
        payload = None if exhausted else self.count
        return Frame(data=payload, EOS=exhausted)
@dataclass
class PrintSink(SinkElement):
    """Sink that writes the data of every non-EOS frame to stdout."""

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Print ``frame.data`` prefixed by the pad name, or mark EOS on the pad."""
        if not frame.EOS:
            print(f"{pad.pad_name}: {frame.data}")
        else:
            # An EOS frame is not printed; it only flags this pad as finished.
            self.mark_eos(pad)
# One source and one sink that share the pad name "out".
source = CountSource(name="src", source_pad_names=["out"])
sink = PrintSink(name="snk", sink_pad_names=["out"])

# Wire the two elements together and run the pipeline.
pipeline = Pipeline()
pipeline.connect(source, sink)
pipeline.run()
Output:
out: 1
out: 2
out: 3
Key points:
- SourceElement subclasses implement `new(pad)` and return a `Frame`.
- SinkElement subclasses implement `pull(pad, frame)` and call `self.mark_eos(pad)` when `frame.EOS` is `True`.
- `connect()` automatically matches pads with the same name (`out` to `out`).
- The pipeline loops until every sink has received EOS.
2. Stopping the Pipeline (EOS)¶
The previous example set EOS=True on the frame when the count exceeded the
limit. This is the End of Stream protocol:
- A source sets `Frame(EOS=True)` when it has no more data.
- Each sink calls `self.mark_eos(pad)` when it receives an EOS frame.
- The pipeline stops when all sinks report EOS.
If your source never sends EOS, the pipeline runs forever. This is useful for streaming applications combined with signal handling.
3. Adding a Transform¶
A TransformElement sits between a source and a sink. It implements two
methods: pull() to receive data and new() to produce data.
from dataclasses import dataclass, field
from sgn import (
SourceElement, TransformElement, SinkElement,
SourcePad, SinkPad, Frame, Pipeline,
)
@dataclass
class CountSource(SourceElement):
    """Counting source: yields 1, 2, ..., ``limit`` and then EOS frames."""

    limit: int = 3

    def __post_init__(self):
        super().__post_init__()
        # Incremented on every new() call.
        self.count = 0

    def new(self, pad: SourcePad) -> Frame:
        """Advance the counter and wrap it in a frame; past ``limit`` emit EOS."""
        self.count += 1
        if self.count > self.limit:
            return Frame(data=None, EOS=True)
        return Frame(data=self.count, EOS=False)
@dataclass
class DoubleTransform(TransformElement):
    """Transform that multiplies the data of each incoming frame by two."""

    # Most recent frame handed to pull(); starts as a default-constructed Frame.
    in_frame: Frame = field(init=False, default_factory=Frame)

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Remember the incoming frame so new() can build its output from it."""
        self.in_frame = frame

    def new(self, pad: SourcePad) -> Frame:
        """Return a frame with the stored data doubled and the stored EOS flag."""
        latest = self.in_frame
        if latest.data is None:
            # Nothing to double (e.g. a data-less EOS frame): pass None through.
            doubled = None
        else:
            doubled = latest.data * 2
        return Frame(data=doubled, EOS=latest.EOS)
@dataclass
class CollectSinkCustom(SinkElement):
    """Sink that appends the data of every non-EOS frame to ``results``."""

    results: list = field(default_factory=list)

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Store ``frame.data``, or mark the pad as finished on an EOS frame."""
        if not frame.EOS:
            self.results.append(frame.data)
        else:
            # The data of an EOS frame is deliberately not collected.
            self.mark_eos(pad)
# All three elements use the pad name "H1", so connect() can match them by name.
source = CountSource(name="src", source_pad_names=["H1"])
doubler = DoubleTransform(name="trn", sink_pad_names=["H1"], source_pad_names=["H1"])
sink = CollectSinkCustom(name="snk", sink_pad_names=["H1"])

# Chain source -> transform -> sink and run the pipeline.
pipeline = Pipeline()
pipeline.connect(source, doubler)
pipeline.connect(doubler, sink)
pipeline.run()

# The source counted 1, 2, 3; the transform doubled each value.
assert sink.results == [2, 4, 6]
The execution order within a TransformElement is: sink pads (pull) ->
internal -> source pads (new). This guarantees pull() runs before
new().
4. Multiple Pads¶
Elements can have more than one pad. A source might produce multiple data streams, and a sink might consume multiple streams.
from dataclasses import dataclass, field
from sgn import SourceElement, SinkElement, SourcePad, SinkPad, Frame, Pipeline
@dataclass
class TwoChannelSource(SourceElement):
    """Source whose pads all draw from one shared call counter."""

    limit: int = 2

    def __post_init__(self):
        super().__post_init__()
        # Counts new() calls across all source pads, not per pad.
        self.count = 0

    def new(self, pad: SourcePad) -> Frame:
        """Return a ``"<pad name>:<count>"`` frame, or EOS once the budget is spent."""
        self.count += 1
        # Total number of data frames allowed: ``limit`` times the number of pads.
        budget = self.limit * len(self.source_pads)
        if self.count > budget:
            return Frame(data=None, EOS=True)
        # Use the pad's short name to vary the data
        return Frame(data=f"{pad.pad_name}:{self.count}", EOS=False)
@dataclass
class TwoChannelSink(SinkElement):
    """Sink that collects frame data separately for each of its sink pads.

    ``results`` maps every name in ``sink_pad_names`` to the list of data
    values received on that pad.
    """

    results: dict = field(default_factory=dict)

    def __post_init__(self):
        super().__post_init__()
        # Ensure one list per sink pad. setdefault() keeps any entries the
        # caller passed via ``results=...`` instead of silently replacing the
        # whole dict.
        for name in self.sink_pad_names:
            self.results.setdefault(name, [])

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Append ``frame.data`` to the pad's list, or mark EOS on the pad."""
        if frame.EOS:
            self.mark_eos(pad)
            return
        self.results[pad.pad_name].append(frame.data)
# Source and sink expose the same two pad names, so connect() pairs them up.
source = TwoChannelSource(name="src", source_pad_names=["H1", "L1"])
sink = TwoChannelSink(name="snk", sink_pad_names=["H1", "L1"])

pipeline = Pipeline()
pipeline.connect(source, sink)
pipeline.run()

# Each channel should have collected two data frames.
h1_frames = sink.results["H1"]
l1_frames = sink.results["L1"]
assert len(h1_frames) == 2
assert len(l1_frames) == 2
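When pad names match between two elements, connect() links them
automatically. For elements with different pad names, pass a link_map that
maps each sink pad name to the source pad name that feeds it. For example,
p.connect(src, trn, link_map={"x": "val"}) links source pad val to sink pad x.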
5. The internal() Method¶
Every element has an internal() method that runs between sink pads and source
pads. This is useful for transforms that need to coordinate across multiple
pads.
from dataclasses import dataclass, field
from sgn import (
TransformElement, SourceElement, SinkElement,
SourcePad, SinkPad, Frame, Pipeline,
)
@dataclass
class SumTransform(TransformElement):
    """Sums data from two input pads into one output pad."""

    # Latest data value seen on each sink pad, keyed by the pad's short name.
    inputs: dict = field(init=False, default_factory=dict)
    # Sum computed by internal() for the current iteration.
    total: float = field(init=False, default=0.0)
    # Set once any input pad has delivered an EOS frame; never reset.
    eos: bool = field(init=False, default=False)

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Record the frame's data for this pad and note whether it was EOS."""
        if frame.EOS:
            self.eos = True
        self.inputs[pad.pad_name] = frame.data

    def internal(self) -> None:
        """Runs after all pull() calls, before any new() calls."""
        # Starting the sum at 0.0 keeps ``total`` a float even when every input
        # is an int, and yields 0.0 when no pad currently holds data.
        self.total = sum(
            (value for value in self.inputs.values() if value is not None),
            0.0,
        )

    def new(self, pad: SourcePad) -> Frame:
        """Emit the current total, carrying the EOS flag downstream."""
        return Frame(data=self.total, EOS=self.eos)
@dataclass
class FixedSource(SourceElement):
    """Source that replays ``values`` in order and then emits EOS frames."""

    values: list = field(default_factory=list)

    def __post_init__(self):
        super().__post_init__()
        # Index of the next entry of ``values`` to emit.
        self.idx = 0

    def new(self, pad: SourcePad) -> Frame:
        """Return the next value as a frame, or an EOS frame when none remain."""
        if self.idx >= len(self.values):
            return Frame(EOS=True)
        item = self.values[self.idx]
        self.idx += 1
        return Frame(data=item)
@dataclass
class CollectSinkCustom(SinkElement):
    """Sink that gathers the data of every non-EOS frame into ``results``."""

    results: list = field(default_factory=list)

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Collect ``frame.data``; on an EOS frame mark the pad instead."""
        reached_end = frame.EOS
        if reached_end:
            self.mark_eos(pad)
        else:
            self.results.append(frame.data)
# Two sources that both expose a single pad called "val".
first_source = FixedSource(name="a", source_pad_names=["val"], values=[1, 2, 3])
second_source = FixedSource(name="b", source_pad_names=["val"], values=[10, 20, 30])

# The transform has two differently named inputs and one output.
summer = SumTransform(
    name="sum",
    sink_pad_names=["x", "y"],
    source_pad_names=["out"],
)
sink = CollectSinkCustom(name="snk", sink_pad_names=["out"])

pipeline = Pipeline()
# Pad names differ here, so each link is spelled out: sink pad -> source pad.
pipeline.connect(first_source, summer, link_map={"x": "val"})
pipeline.connect(second_source, summer, link_map={"y": "val"})
# "out" matches on both sides, so no link_map is needed.
pipeline.connect(summer, sink)
pipeline.run()

assert sink.results == [11.0, 22.0, 33.0]
The execution order within any element is always:
- Sink pads (`pull`) - receive data from upstream
- Internal pad (`internal`) - coordinate across pads
- Source pads (`new`) - produce data for downstream
Next Steps¶
Now that you understand the building blocks, explore:
- Connect Elements - `connect()` patterns and strategies
- Group & Select - combine elements and pick specific pads
- Elements - `IterSource`, `CallableTransform`, `CollectSink`, and more
- Core Concepts - deeper explanation of Frame, Pad, Element, Pipeline