Your First Pipeline

This tutorial walks you through building SGN pipelines from scratch. Each section builds on the previous one, introducing one new concept at a time.

1. A Source and a Sink

Every pipeline needs at least a source (produces data) and a sink (consumes data). Let's build custom ones.

from dataclasses import dataclass
from sgn import SourceElement, SinkElement, SourcePad, SinkPad, Frame, Pipeline


@dataclass
class CountSource(SourceElement):
    """Produces frames counting from 1 to `limit`."""
    limit: int = 3

    def __post_init__(self):
        super().__post_init__()
        self.count = 0

    def new(self, pad: SourcePad) -> Frame:
        self.count += 1
        return Frame(
            data=self.count if self.count <= self.limit else None,
            EOS=self.count > self.limit,
        )


@dataclass
class PrintSink(SinkElement):
    """Prints each frame's data."""
    def pull(self, pad: SinkPad, frame: Frame) -> None:
        if frame.EOS:
            self.mark_eos(pad)
            return
        print(f"{pad.pad_name}: {frame.data}")


src = CountSource(name="src", source_pad_names=["out"])
snk = PrintSink(name="snk", sink_pad_names=["out"])

p = Pipeline()
p.connect(src, snk)
p.run()

Output:

out: 1
out: 2
out: 3

Key points:

  • SourceElement subclasses implement new(pad) and return a Frame.
  • SinkElement subclasses implement pull(pad, frame) and call self.mark_eos(pad) when frame.EOS is True.
  • connect() automatically matches pads with the same name (out to out).
  • The pipeline loops until every sink has received EOS.

2. Stopping the Pipeline (EOS)

The previous example set EOS=True on the frame when the count exceeded the limit. This is the End of Stream protocol:

  1. A source sets Frame(EOS=True) when it has no more data.
  2. Each sink calls self.mark_eos(pad) when it receives an EOS frame.
  3. The pipeline stops when all sinks report EOS.

If a source never sends EOS, the pipeline runs forever. This is useful for streaming applications, where the pipeline is meant to run indefinitely and is combined with signal handling.
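
To make the three steps above concrete, here is a minimal plain-Python sketch of the EOS handshake. It does not use SGN at all: ToyFrame, ToySink, count_source, and run are hypothetical stand-ins invented for this illustration, and the real scheduler inside Pipeline.run() may work quite differently.

```python
from dataclasses import dataclass, field


@dataclass
class ToyFrame:
    """Stand-in for a frame: a payload plus an end-of-stream flag."""
    data: object = None
    EOS: bool = False


@dataclass
class ToySink:
    """Stand-in for a sink that tracks which of its pads have seen EOS."""
    pad_names: list
    at_eos: set = field(default_factory=set)
    received: list = field(default_factory=list)

    def pull(self, pad_name: str, frame: ToyFrame) -> None:
        if frame.EOS:
            self.at_eos.add(pad_name)  # plays the role of self.mark_eos(pad)
            return
        self.received.append(frame.data)

    def done(self) -> bool:
        # The sink is finished once every one of its pads has seen EOS
        return self.at_eos == set(self.pad_names)


def count_source(limit: int):
    """Yield frames 1..limit, then EOS frames forever."""
    count = 0
    while True:
        count += 1
        yield ToyFrame(data=count if count <= limit else None, EOS=count > limit)


def run(source, sinks, pad_name="out") -> int:
    """Loop until every sink reports EOS; return the number of iterations."""
    iterations = 0
    while not all(s.done() for s in sinks):
        frame = next(source)
        for s in sinks:
            s.pull(pad_name, frame)
        iterations += 1
    return iterations


sink = ToySink(pad_names=["out"])
iterations = run(count_source(limit=3), [sink])
print(sink.received, iterations)  # [1, 2, 3] 4
```

Note that the loop takes four iterations to deliver three values: the fourth iteration carries the EOS frame that lets the sink report it is done.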

3. Adding a Transform

A TransformElement sits between a source and a sink. It implements two methods: pull() to receive data and new() to produce data.

from dataclasses import dataclass, field
from sgn import (
    SourceElement, TransformElement, SinkElement,
    SourcePad, SinkPad, Frame, Pipeline,
)


@dataclass
class CountSource(SourceElement):
    limit: int = 3

    def __post_init__(self):
        super().__post_init__()
        self.count = 0

    def new(self, pad: SourcePad) -> Frame:
        self.count += 1
        return Frame(
            data=self.count if self.count <= self.limit else None,
            EOS=self.count > self.limit,
        )


@dataclass
class DoubleTransform(TransformElement):
    """Doubles the data in each frame."""
    in_frame: Frame = field(init=False, default_factory=Frame)

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        self.in_frame = frame

    def new(self, pad: SourcePad) -> Frame:
        result = None if self.in_frame.data is None else self.in_frame.data * 2
        return Frame(data=result, EOS=self.in_frame.EOS)


@dataclass
class CollectSinkCustom(SinkElement):
    """Collects data into a list."""
    results: list = field(default_factory=list)

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        if frame.EOS:
            self.mark_eos(pad)
            return
        self.results.append(frame.data)


src = CountSource(name="src", source_pad_names=["H1"])
trn = DoubleTransform(name="trn", sink_pad_names=["H1"], source_pad_names=["H1"])
snk = CollectSinkCustom(name="snk", sink_pad_names=["H1"])

p = Pipeline()
p.connect(src, trn)
p.connect(trn, snk)
p.run()

assert snk.results == [2, 4, 6]

The execution order within a TransformElement is: sink pads (pull) -> internal -> source pads (new). This guarantees that pull() runs before new() in each iteration, so new() always sees the frame that pull() just stored in self.in_frame.

4. Multiple Pads

Elements can have more than one pad. A source might produce multiple data streams, and a sink might consume multiple streams.

from dataclasses import dataclass, field
from sgn import SourceElement, SinkElement, SourcePad, SinkPad, Frame, Pipeline


@dataclass
class TwoChannelSource(SourceElement):
    limit: int = 2

    def __post_init__(self):
        super().__post_init__()
        self.count = 0

    def new(self, pad: SourcePad) -> Frame:
        self.count += 1
        done = self.count > self.limit * len(self.source_pads)
        # Use the pad's short name to vary the data
        value = f"{pad.pad_name}:{self.count}" if not done else None
        return Frame(data=value, EOS=done)


@dataclass
class TwoChannelSink(SinkElement):
    results: dict = field(default_factory=dict)

    def __post_init__(self):
        super().__post_init__()
        self.results = {name: [] for name in self.sink_pad_names}

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        if frame.EOS:
            self.mark_eos(pad)
            return
        self.results[pad.pad_name].append(frame.data)


src = TwoChannelSource(name="src", source_pad_names=["H1", "L1"])
snk = TwoChannelSink(name="snk", sink_pad_names=["H1", "L1"])

p = Pipeline()
p.connect(src, snk)
p.run()

assert len(snk.results["H1"]) == 2
assert len(snk.results["L1"]) == 2
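
One detail worth noticing in TwoChannelSource: both pads share a single self.count, which is why the limit is multiplied by len(self.source_pads). The plain-Python trace below sketches what each pad ends up producing. It is not SGN code: trace_shared_counter is a hypothetical helper, and it assumes new() is called once per pad per iteration in the order the pads were declared, which the real scheduler may not guarantee.

```python
def trace_shared_counter(pad_names, limit):
    """Replay the counter logic of TwoChannelSource.new() without SGN."""
    count = 0
    per_pad = {name: [] for name in pad_names}
    eos = {name: False for name in pad_names}
    while not all(eos.values()):
        for name in pad_names:  # assumed call order: one new() per pad
            count += 1
            done = count > limit * len(pad_names)
            if done:
                eos[name] = True
            else:
                per_pad[name].append(f"{name}:{count}")
    return per_pad


trace = trace_shared_counter(["H1", "L1"], limit=2)
print(trace)  # {'H1': ['H1:1', 'H1:3'], 'L1': ['L1:2', 'L1:4']}
```

Under that assumption each pad produces two frames, matching the assertions above, but the counter values interleave across pads. If you want each pad to count independently, keep one counter per pad name instead.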

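When pad names match between two elements, connect() links them automatically. For elements with different pad names, pass a link_map that maps each sink pad name to the source pad name it should connect to. For example, if the sink's pads were named x and y: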

p.connect(src, snk, link_map={"x": "H1", "y": "L1"})
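
The two linking modes can be sketched in plain Python as follows. resolve_links is a hypothetical helper written for this illustration, not part of SGN. The direction of the mapping (sink pad name as key, source pad name as value) is inferred from the connect() calls in this tutorial, and the error handling is an assumption about reasonable behavior rather than a description of what SGN does.

```python
def resolve_links(source_pads, sink_pads, link_map=None):
    """Return {sink_pad_name: source_pad_name} for the pads to connect.

    Hypothetical helper for illustration only.
    """
    if link_map is None:
        # Implicit mode: link every pair of pads that share a name
        shared = [name for name in sink_pads if name in source_pads]
        if not shared:
            raise ValueError("no matching pad names; pass a link_map")
        return {name: name for name in shared}

    # Explicit mode: check that every name in the map refers to a real pad
    for sink_name, source_name in link_map.items():
        if sink_name not in sink_pads:
            raise ValueError(f"unknown sink pad: {sink_name}")
        if source_name not in source_pads:
            raise ValueError(f"unknown source pad: {source_name}")
    return dict(link_map)


# Same names on both sides: no link_map needed
same_names = resolve_links(["H1", "L1"], ["H1", "L1"])
print(same_names)  # {'H1': 'H1', 'L1': 'L1'}

# Different names: the link_map spells out each connection
renamed = resolve_links(["H1", "L1"], ["x", "y"], {"x": "H1", "y": "L1"})
print(renamed)  # {'x': 'H1', 'y': 'L1'}
```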

5. The internal() Method

Every element has an internal() method that runs after the sink pads' pull() calls and before the source pads' new() calls. This is useful for transforms that need to coordinate across multiple pads.

from dataclasses import dataclass, field
from sgn import (
    TransformElement, SourceElement, SinkElement,
    SourcePad, SinkPad, Frame, Pipeline,
)


@dataclass
class SumTransform(TransformElement):
    """Sums data from two input pads into one output pad."""
    inputs: dict = field(init=False, default_factory=dict)
    total: float = field(init=False, default=0.0)
    eos: bool = field(init=False, default=False)

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        if frame.EOS:
            self.eos = True
        self.inputs[pad.pad_name] = frame.data

    def internal(self) -> None:
        """Runs after all pull() calls, before any new() calls."""
        values = [v for v in self.inputs.values() if v is not None]
        # Convert to float so total matches its declared type
        self.total = float(sum(values)) if values else 0.0

    def new(self, pad: SourcePad) -> Frame:
        return Frame(data=self.total, EOS=self.eos)


@dataclass
class FixedSource(SourceElement):
    values: list = field(default_factory=list)

    def __post_init__(self):
        super().__post_init__()
        self.idx = 0

    def new(self, pad: SourcePad) -> Frame:
        if self.idx < len(self.values):
            val = self.values[self.idx]
            self.idx += 1
            return Frame(data=val)
        return Frame(EOS=True)


@dataclass
class CollectSinkCustom(SinkElement):
    results: list = field(default_factory=list)

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        if frame.EOS:
            self.mark_eos(pad)
            return
        self.results.append(frame.data)


src_a = FixedSource(name="a", source_pad_names=["val"], values=[1, 2, 3])
src_b = FixedSource(name="b", source_pad_names=["val"], values=[10, 20, 30])

trn = SumTransform(
    name="sum",
    sink_pad_names=["x", "y"],
    source_pad_names=["out"],
)

snk = CollectSinkCustom(name="snk", sink_pad_names=["out"])

p = Pipeline()
p.connect(src_a, trn, link_map={"x": "val"})
p.connect(src_b, trn, link_map={"y": "val"})
p.connect(trn, snk)
p.run()

assert snk.results == [11.0, 22.0, 33.0]

The execution order within any element is always:

  1. Sink pads (pull) - receive data from upstream
  2. Internal pad (internal) - coordinate across pads
  3. Source pads (new) - produce data for downstream
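
The ordering above can be observed with a small plain-Python stand-in. ToySum and step are hypothetical names made up for this sketch, and step models only a single iteration of a single element. It is not how SGN schedules pads internally, just a way to see the pull, internal, new sequence in isolation.

```python
class ToySum:
    """Stand-in for a two-input transform; records the order of its calls."""

    def __init__(self, sink_pad_names, source_pad_names):
        self.sink_pad_names = sink_pad_names
        self.source_pad_names = source_pad_names
        self.inputs = {}
        self.total = 0.0
        self.calls = []

    def pull(self, pad_name, data):
        self.calls.append(f"pull:{pad_name}")
        self.inputs[pad_name] = data

    def internal(self):
        # By now every sink pad has delivered its data for this iteration
        self.calls.append("internal")
        self.total = float(sum(v for v in self.inputs.values() if v is not None))

    def new(self, pad_name):
        self.calls.append(f"new:{pad_name}")
        return self.total


def step(element, incoming):
    """One iteration: all pull() calls, then internal(), then all new() calls."""
    for pad_name in element.sink_pad_names:
        element.pull(pad_name, incoming[pad_name])
    element.internal()
    return {name: element.new(name) for name in element.source_pad_names}


toy = ToySum(sink_pad_names=["x", "y"], source_pad_names=["out"])
outgoing = step(toy, {"x": 1, "y": 10})
print(toy.calls)  # ['pull:x', 'pull:y', 'internal', 'new:out']
print(outgoing)   # {'out': 11.0}
```

Because internal() runs exactly once per iteration, it is a natural place for work that depends on all inputs, such as the sum here, rather than repeating that work in every new() call.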

Next Steps

Now that you understand the building blocks, explore: