
Core Concepts

The fundamental building blocks of every SGN pipeline.

Frame

A Frame is the unit of data that flows through a pipeline. Every pad produces or consumes one frame per iteration.

from sgn import DataSpec, Frame

frame = Frame(
    EOS=False,        # End of Stream marker
    is_gap=False,     # Marks this frame as a gap (missing data)
    spec=DataSpec(),  # Data specification (type/shape consistency)
    data=None,        # The payload: any Python object
    metadata={},      # Arbitrary key-value metadata
)

| Attribute | Type | Default | Purpose |
|---|---|---|---|
| EOS | bool | False | Signals the end of a data stream |
| is_gap | bool | False | Indicates missing or unavailable data |
| spec | DataSpec | DataSpec() | Enforces consistency across frames on a pad |
| data | Any | None | The actual data payload |
| metadata | dict | {} | Extra information (timestamps, labels, etc.) |

A frame with data=None and EOS=False is a valid "empty" frame — useful for iterations where a source has nothing to report yet.
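The defaults in the table and the "empty frame" rule can be modeled with a plain dataclass. This is a stdlib-only sketch, not SGN's class: `ToyFrame` and `is_empty` are hypothetical names, the `spec` field is omitted, and using `default_factory` for `metadata` is this sketch's choice rather than a documented detail of SGN.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyFrame:
    """Hypothetical stand-in with the defaults from the table (spec omitted)."""
    EOS: bool = False
    is_gap: bool = False
    data: Any = None
    # default_factory gives each frame its own dict, so mutating one
    # frame's metadata cannot leak into another frame
    metadata: dict = field(default_factory=dict)


def is_empty(frame: ToyFrame) -> bool:
    """An "empty" frame has no payload but does not end the stream."""
    return frame.data is None and not frame.EOS


print(is_empty(ToyFrame()))          # True: a valid empty frame
print(is_empty(ToyFrame(EOS=True)))  # False: this frame ends the stream
```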

Gap Frames

A frame with is_gap=True signals that data is missing or unavailable for this iteration. Downstream elements can check frame.is_gap and decide whether to skip processing, interpolate, or propagate the gap:

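def new(self, pad: SourcePad) -> Frame:
    # data_available() and read_data() are placeholders for your own I/O logic
    if not data_available():
        return Frame(is_gap=True)  # nothing to report: emit a gap frame
    return Frame(data=read_data())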
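On the downstream side, two of the policies named above (propagate and fill) can be sketched without SGN. `ToyFrame`, `scale` and `fill_gaps` are hypothetical; `fill_gaps` uses sample-and-hold (repeat the last good value), a deliberately simple stand-in for real interpolation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyFrame:
    """Minimal stand-in for a frame: only the fields this example needs."""
    data: Optional[float] = None
    is_gap: bool = False


def scale(frame: ToyFrame, factor: float) -> ToyFrame:
    """Propagate policy: pass gaps through, transform only real data."""
    if frame.is_gap:
        return ToyFrame(is_gap=True)
    return ToyFrame(data=frame.data * factor)


def fill_gaps(frames: list[ToyFrame], initial: float = 0.0) -> list[float]:
    """Fill policy: replace each gap with the last good value seen."""
    out, last = [], initial
    for frame in frames:
        if not frame.is_gap:
            last = frame.data
        out.append(last)
    return out
```

Propagating keeps the "missing" information visible to later elements, while filling trades that information for a continuous output stream.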

Metadata

The metadata dictionary carries auxiliary information alongside the data payload. Common uses include timestamps, segment identifiers, or quality flags:

Frame(data=samples, metadata={"t0": 1234567890.0, "duration": 1.0})

Metadata is not validated — it passes through the pipeline unchanged unless an element explicitly reads or modifies it.
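The difference between an element that ignores metadata and one that explicitly extends it can be sketched with `dataclasses.replace`. The frame class and both functions are hypothetical stand-ins; the `t0`/`duration` keys follow the example above, and `t_end` is an invented key for illustration.

```python
from dataclasses import dataclass, field, replace
from typing import Any


@dataclass
class ToyFrame:
    data: Any = None
    metadata: dict = field(default_factory=dict)


def double(frame: ToyFrame) -> ToyFrame:
    """Ignores metadata: the same dict travels along untouched."""
    return replace(frame, data=[x * 2 for x in frame.data])


def annotate_end_time(frame: ToyFrame) -> ToyFrame:
    """Explicitly reads metadata and adds a derived key."""
    md = dict(frame.metadata)  # copy so the upstream frame is not mutated
    md["t_end"] = md["t0"] + md["duration"]
    return replace(frame, metadata=md)


frame = ToyFrame(data=[1, 2], metadata={"t0": 1234567890.0, "duration": 1.0})
print(annotate_end_time(double(frame)).metadata["t_end"])  # 1234567891.0
```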

DataSpec

DataSpec is a frozen dataclass that describes the expected shape or type of data on a pad. Sink pads record the DataSpec of the first frame they receive and raise a ValueError if subsequent frames have a different spec.
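The first-frame-wins rule can be sketched with a frozen dataclass and a small pad stand-in. `ToySpec` and `ToySinkPad` are hypothetical; only the behavior (record the first spec, raise `ValueError` on a mismatch) comes from the description above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True, eq=True)
class ToySpec:
    sample_rate: int = 16000


class ToySinkPad:
    """Stand-in sink pad: remembers the first spec it sees, rejects changes."""

    def __init__(self) -> None:
        self.spec: Optional[ToySpec] = None

    def receive(self, spec: ToySpec) -> None:
        if self.spec is None:
            self.spec = spec  # the first frame fixes the spec for this pad
        elif spec != self.spec:  # eq=True: compare field by field
            raise ValueError(f"spec changed from {self.spec} to {spec}")
```

Because the dataclass compares by value, two separately constructed specs with equal fields are accepted; only a genuine change in a field triggers the error.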

Subclass DataSpec to add fields relevant to your domain:

from dataclasses import dataclass
from sgn import DataSpec, Frame

@dataclass(frozen=True, eq=True)
class AudioSpec(DataSpec):
    sample_rate: int = 16000
    channels: int = 1

frame = Frame(data=samples, spec=AudioSpec(sample_rate=44100, channels=2))

Use spec.update() to create a modified copy:

new_spec = frame.spec.update(sample_rate=48000)
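A plausible model of update() on a frozen dataclass is `dataclasses.replace`, which builds a new instance with some fields changed. Whether SGN implements update() this way is an assumption; `ToyAudioSpec` is a hypothetical copy of the AudioSpec example.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True, eq=True)
class ToyAudioSpec:
    sample_rate: int = 16000
    channels: int = 1

    def update(self, **changes) -> "ToyAudioSpec":
        # Assumed semantics: return a new instance with some fields swapped,
        # leaving the original frozen instance exactly as it was
        return replace(self, **changes)


spec = ToyAudioSpec(sample_rate=44100, channels=2)
new_spec = spec.update(sample_rate=48000)

try:
    spec.sample_rate = 48000  # frozen dataclasses refuse in-place mutation
except FrozenInstanceError:
    print("spec is immutable; use update() instead")
```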

IterFrame

IterFrame is a Frame subclass whose data defaults to an empty list instead of None. Use it when your pipeline passes collections of items per iteration. The choice between Frame and IterFrame depends on your data model:

  • Frame — one logical item per iteration (a single array, a single measurement, a single event)
  • IterFrame — a batch of items per iteration (a list of events, a collection of file paths)
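The only difference described above is the default for data, which a dataclass subclass can express with `default_factory=list`. These are hypothetical stand-ins, not SGN's classes; the point is that consumers of a batch can iterate without a `None` check.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyFrame:
    data: Any = None


@dataclass
class ToyIterFrame(ToyFrame):
    # Only the default changes: a fresh empty list instead of None
    data: Any = field(default_factory=list)


def count_items(frame: ToyIterFrame) -> int:
    """Iterating an empty batch is fine; iterating None would raise."""
    return sum(1 for _ in frame.data)
```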

Pad

Pads are the connection points where data enters or leaves an element. There are three types:

| Pad Type | Direction | Method Called | Purpose |
|---|---|---|---|
| SourcePad | Output | new(pad) | Produces a frame for downstream |
| SinkPad | Input | pull(pad, frame) | Receives a frame from upstream |
| InternalPad | Internal | internal() | Coordinates between sink and source pads |

Naming

Every pad has two names:

  • pad.pad_name — the short name you provide at construction (e.g., "H1")
  • pad.name — the fully qualified name: "element_name:pad_type:pad_name"

For example, a source pad named "H1" on element "src" has:

  • pad.pad_name = "H1"
  • pad.name = "src:src:H1"

The pad type in the full name is src for source pads, snk for sink pads, and inl for internal pads.
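The naming scheme can be expressed as a pair of helper functions. `full_pad_name` and `split_pad_name` are hypothetical helpers that follow the "element_name:pad_type:pad_name" format above; they are not part of SGN's API.

```python
PAD_TYPE_ABBREV = {"source": "src", "sink": "snk", "internal": "inl"}


def full_pad_name(element_name: str, pad_type: str, pad_name: str) -> str:
    """Build "element_name:pad_type:pad_name" with the short type code."""
    return f"{element_name}:{PAD_TYPE_ABBREV[pad_type]}:{pad_name}"


def split_pad_name(full_name: str) -> tuple[str, str, str]:
    """Recover (element_name, pad_type, pad_name) from a full name."""
    # maxsplit=2 keeps any ':' inside the short pad name intact
    element_name, pad_type, pad_name = full_name.split(":", 2)
    return element_name, pad_type, pad_name


print(full_pad_name("src", "source", "H1"))  # src:src:H1
```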

Use pad.pad_name in element logic (e.g., to distinguish which channel data came from). Use pad.name when working with the low-level insert()/link() API.

Linking

A sink pad links to exactly one source pad. Multiple sink pads may link to the same source pad (broadcasting). When linked, the sink pad reads from the source pad's output attribute each iteration.

At the low level, link() is called as sink_pad.link(source_pad) — the sink pad is the caller, taking the source pad as its argument. The connect() API handles this automatically.
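The one-source-per-sink rule and broadcasting can be sketched with two tiny pad classes. The `output` attribute mirrors the description above; the class names, the `other` attribute and `read()` are hypothetical.

```python
class ToySourcePad:
    def __init__(self, name: str) -> None:
        self.name = name
        self.output = None  # set each iteration by the element's new()


class ToySinkPad:
    def __init__(self, name: str) -> None:
        self.name = name
        self.other = None  # the single upstream source pad

    def link(self, source: ToySourcePad) -> None:
        # The sink is the caller and accepts exactly one upstream source
        if self.other is not None and self.other is not source:
            raise ValueError(f"{self.name} is already linked to {self.other.name}")
        self.other = source

    def read(self):
        return self.other.output


src = ToySourcePad("src:src:H1")
a, b = ToySinkPad("a:snk:H1"), ToySinkPad("b:snk:H1")
a.link(src)
b.link(src)  # broadcasting: two sinks share one source
src.output = 42
print(a.read(), b.read())  # 42 42
```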

Element

Elements are the processing units of a pipeline. Each element groups pads together and implements the logic that runs when those pads are called.

Every element has a name. If you don't provide one, SGN assigns a unique UUID-based name automatically. Named elements are easier to debug and are required for the registry access pattern described below.

SourceElement

Produces data. Has source pads only. Implement new():

@dataclass
class MySource(SourceElement):
    def new(self, pad: SourcePad) -> Frame:
        return Frame(data=produce_data())

TransformElement

Receives and produces data. Has both sink and source pads. Implement pull() and new():

@dataclass
class MyTransform(TransformElement):
    def pull(self, pad: SinkPad, frame: Frame) -> None:
        self.in_frame = frame  # Store for use in new()

    def new(self, pad: SourcePad) -> Frame:
        return Frame(data=transform(self.in_frame.data))

Execution order within a transform is guaranteed: all pull() calls complete before internal(), which completes before any new() calls.
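The ordering guarantee can be made visible with a toy transform that records each call. `ToyTransform` and its `step()` driver are hypothetical; a real pipeline may run pulls concurrently, but the barrier (all pulls, then internal, then news) is what this sketch shows.

```python
class ToyTransform:
    """Records the order in which the stage methods are invoked."""

    def __init__(self, sink_pads, source_pads):
        self.sink_pads, self.source_pads = list(sink_pads), list(source_pads)
        self.calls = []

    def pull(self, pad, frame):
        self.calls.append(f"pull:{pad}")

    def internal(self):
        self.calls.append("internal")

    def new(self, pad):
        self.calls.append(f"new:{pad}")
        return None

    def step(self, frames):
        # One iteration: every pull(), then internal(), then every new()
        for pad in self.sink_pads:
            self.pull(pad, frames[pad])
        self.internal()
        return {pad: self.new(pad) for pad in self.source_pads}


t = ToyTransform(["H1", "L1"], ["out"])
t.step({"H1": 1, "L1": 2})
print(t.calls)  # ['pull:H1', 'pull:L1', 'internal', 'new:out']
```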

SinkElement

Consumes data. Has sink pads only. Implement pull(), and call mark_eos() when a pad receives an EOS frame:

@dataclass
class MySink(SinkElement):
    def pull(self, pad: SinkPad, frame: Frame) -> None:
        if frame.EOS:
            self.mark_eos(pad)
            return
        process(frame.data)

Calling self.mark_eos(pad) signals to the pipeline that this pad is done. The pipeline stops when all sinks report EOS.
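The stopping rule can be sketched as a loop that keeps pulling until every sink has marked all of its pads. `ToySink`, `stream` and `run` are hypothetical stand-ins; the assumption that a finished source keeps emitting EOS frames is this sketch's, made so that a finished pad can still be pulled while other sinks catch up.

```python
from dataclasses import dataclass
from itertools import chain, repeat


@dataclass
class ToyFrame:
    EOS: bool = False


def stream(n_data: int):
    """n_data ordinary frames, then EOS frames indefinitely."""
    return chain((ToyFrame() for _ in range(n_data)), repeat(ToyFrame(EOS=True)))


class ToySink:
    def __init__(self, pads):
        self.pads = list(pads)
        self.eos_pads = set()

    def mark_eos(self, pad):
        self.eos_pads.add(pad)

    def at_eos(self) -> bool:
        return self.eos_pads == set(self.pads)

    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
            return
        # a real sink would process frame.data here


def run(sinks, streams):
    """Pull one frame per sink pad per iteration until every sink is at EOS."""
    iterations = 0
    while not all(sink.at_eos() for sink in sinks):
        for sink in sinks:
            for pad in sink.pads:
                sink.pull(pad, next(streams[pad]))
        iterations += 1
    return iterations
```

With one sink finishing after one data frame and another after three, the loop runs four iterations: the faster sink stays at EOS while the slower one drains.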

The internal() Method

Every element has an internal() method (defaults to a no-op). Override it to run logic between receiving data and producing data. This is especially useful in transforms that need to combine data from multiple sink pads before producing output on source pads.
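A typical use of internal() is combining inputs that arrive on separate sink pads. `ToyCombiner` is a hypothetical toy transform that stashes values in pull(), sums them once in internal(), and hands the result to new(); the pad names "H1" and "L1" match the examples elsewhere on this page.

```python
class ToyCombiner:
    """Toy transform: sums the latest values from sink pads H1 and L1."""

    def __init__(self):
        self.inputs = {}
        self.combined = None

    def pull(self, pad, value):
        # Only stash: combining needs data from every sink pad first
        self.inputs[pad] = value

    def internal(self):
        # Runs once per iteration, after all pull() calls, before any new()
        self.combined = self.inputs["H1"] + self.inputs["L1"]

    def new(self, pad):
        return self.combined


t = ToyCombiner()
t.pull("H1", 2)
t.pull("L1", 3)
t.internal()
print(t.new("out"))  # 5
```

Doing the work in internal() rather than in new() means it happens once per iteration, even if the element has several source pads that all need the combined result.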

Static Pads

By default, pad names are provided at instantiation via source_pad_names and sink_pad_names. For elements that always have the same pads, use class-level configuration:

from dataclasses import dataclass
from typing import ClassVar

@dataclass
class AlwaysTwoPads(TransformElement):
    static_sink_pads: ClassVar[list[str]] = ["H1", "L1"]
    static_source_pads: ClassVar[list[str]] = ["H1", "L1"]
    allow_dynamic_sink_pads: ClassVar[bool] = False
    allow_dynamic_source_pads: ClassVar[bool] = False

With allow_dynamic_*_pads = False, the element rejects any pad names passed at instantiation and uses only its static pads. With the default True, static pads are combined with any user-provided pads.
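The combination rule for one direction (sink or source) can be sketched as a small function. `resolve_pads` is hypothetical; raising `ValueError` when dynamic pads are disallowed is an assumed reading of "rejects", and plain concatenation (no de-duplication) is this sketch's choice.

```python
def resolve_pads(static: list[str], user: list[str], allow_dynamic: bool) -> list[str]:
    """Return the final pad names for one direction (sink or source)."""
    if not allow_dynamic:
        if user:
            # Assumed behavior: refuse user-provided names outright
            raise ValueError(f"dynamic pads are not allowed, got {user}")
        return list(static)
    # Default case: static pads first, then whatever the user provided
    return list(static) + list(user)


print(resolve_pads(["H1", "L1"], [], allow_dynamic=False))  # ['H1', 'L1']
print(resolve_pads(["H1"], ["V1"], allow_dynamic=True))     # ['H1', 'V1']
```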

Pipeline

Pipeline extends Graph to add execution capabilities. It manages the element registry, validates connections, and runs the event loop.

Building

Three levels of API for building a pipeline:

| Method | Level | Description |
|---|---|---|
| connect() | High | Auto-inserts elements and matches pads by name |
| insert() | Medium | Adds elements to the graph without linking |
| link() | Low | Links specific pads by full name |

connect() is the recommended API. It handles insertion automatically and supports multiple matching strategies.
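One matching strategy, pairing pads that share a short name, can be sketched as a pure function that returns the low-level links it would make. `connect_by_name` is hypothetical and covers only this single strategy, not everything connect() supports.

```python
def connect_by_name(src_elem: str, src_pads: list[str],
                    snk_elem: str, snk_pads: list[str]) -> list[tuple[str, str]]:
    """Pair source and sink pads that share a short name.

    Returns (sink full name, source full name) tuples, mirroring the
    low-level call order sink_pad.link(source_pad).
    """
    common = [name for name in snk_pads if name in src_pads]
    if not common:
        raise ValueError("no pad names in common; link pads explicitly instead")
    return [(f"{snk_elem}:snk:{n}", f"{src_elem}:src:{n}") for n in common]


for sink, source in connect_by_name("src", ["H1", "L1"], "trn", ["H1", "L1"]):
    print(f"{sink} <- {source}")
```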

Registry Access

After inserting elements, you can look up elements and pads by name using bracket notation:

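p = Pipeline()
# Assumes src, trn and snk were created with names "src", "trn" and "snk",
# and that trn has a source pad named "H1"
p.connect(src, trn).connect(trn, snk)

element = p["trn"]          # Element by name
pad = p["trn:src:H1"]       # Pad by full name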

This is useful for inspecting pipeline state after run() completes, or for advanced linking with insert() and link().
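Bracket lookup by either kind of name works with a single flat mapping, because full pad names carry ':' separators that plain element names (assumed here not to contain ':') lack. `ToyElement` and `ToyRegistry` are hypothetical stand-ins for illustration, not the Pipeline's internals.

```python
class ToyElement:
    def __init__(self, name, source_pad_names=(), sink_pad_names=()):
        self.name = name
        # Full names follow "element_name:pad_type:pad_name"
        self.pad_names = [f"{name}:src:{p}" for p in source_pad_names] + [
            f"{name}:snk:{p}" for p in sink_pad_names
        ]


class ToyRegistry:
    """One flat dict holding both element names and full pad names."""

    def __init__(self):
        self._registry = {}

    def insert(self, element):
        self._registry[element.name] = element
        for full_name in element.pad_names:
            self._registry[full_name] = (element, full_name)
        return self  # allow chaining

    def __getitem__(self, key):
        return self._registry[key]


trn = ToyElement("trn", source_pad_names=["H1"], sink_pad_names=["H1"])
registry = ToyRegistry().insert(trn)
print(registry["trn"].name, registry["trn:src:H1"][1])  # trn trn:src:H1
```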

Running

pipeline.run() does the following:

  1. Calls check() to verify all pads are linked and at least one sink exists.
  2. Builds a TopologicalSorter from the graph.
  3. Iterates: executes pads in topological order, concurrently where possible.
  4. Stops when all sinks report EOS.
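Steps 2 to 4 can be sketched with Python's `graphlib.TopologicalSorter`, presumably the class step 2 refers to. The pad graph, the internal pad name "trn:inl:inl", and the `execute`/`at_eos` callbacks are hypothetical; each batch from `get_ready()` holds pads whose inputs are all satisfied, which is where concurrent execution becomes possible.

```python
from graphlib import TopologicalSorter

# Each pad maps to the set of pads it depends on (its predecessors)
GRAPH = {
    "trn:snk:H1": {"src:src:H1"},
    "trn:snk:L1": {"src:src:L1"},
    "trn:inl:inl": {"trn:snk:H1", "trn:snk:L1"},  # hypothetical internal pad name
    "trn:src:out": {"trn:inl:inl"},
    "snk:snk:out": {"trn:src:out"},
}


def execution_batches(graph):
    """Group pads into batches; pads within one batch could run concurrently."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    batches = []
    while ts.is_active():
        ready = sorted(ts.get_ready())  # sorted only for deterministic output
        batches.append(ready)
        ts.done(*ready)
    return batches


def run(graph, execute, at_eos, max_iterations=1000):
    """Repeat the fixed order every iteration until at_eos() reports True."""
    batches = execution_batches(graph)  # the graph is static, so compute once
    for i in range(max_iterations):
        for batch in batches:
            for pad in batch:  # a real scheduler could run these concurrently
                execute(pad)
        if at_eos():
            return i + 1
    raise RuntimeError("sinks never reported EOS")


for batch in execution_batches(GRAPH):
    print(batch)
```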

The building methods return self, enabling chaining:

Pipeline().connect(src, trn).connect(trn, snk).run()

For details on how the execution loop works, see Architecture.