Core Concepts¶
The fundamental building blocks of every SGN pipeline.
Frame¶
A Frame is the unit of data that flows through a pipeline. Every pad produces
or consumes one frame per iteration.
from sgn import DataSpec, Frame

frame = Frame(
    EOS=False,        # End of Stream marker
    is_gap=False,     # Marks this frame as a gap (missing data)
    spec=DataSpec(),  # Data specification (type/shape consistency)
    data=None,        # The payload: any Python object
    metadata={},      # Arbitrary key-value metadata
)
| Attribute | Type | Default | Purpose |
|---|---|---|---|
| EOS | bool | False | Signals the end of a data stream |
| is_gap | bool | False | Indicates missing or unavailable data |
| spec | DataSpec | DataSpec() | Enforces consistency across frames on a pad |
| data | Any | None | The actual data payload |
| metadata | dict | {} | Extra information (timestamps, labels, etc.) |
A frame with data=None and EOS=False is a valid "empty" frame — useful for
iterations where a source has nothing to report yet.
Gap Frames¶
A frame with is_gap=True signals that data is missing or unavailable for this
iteration. Downstream elements can check frame.is_gap and decide whether to
skip processing, interpolate, or propagate the gap:
def new(self, pad: SourcePad) -> Frame:
    if not data_available():
        return Frame(is_gap=True)
    return Frame(data=read_data())
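A downstream element's handling of a gap could look roughly like the sketch below. The Frame class here is a minimal stand-in dataclass so the snippet runs without sgn installed, and handle_frame with its repeat-the-last-value policy is a hypothetical illustration, not part of the library.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Frame:
    """Stand-in for sgn.Frame with only the fields used here (assumption)."""

    EOS: bool = False
    is_gap: bool = False
    data: Any = None
    metadata: dict = field(default_factory=dict)


def handle_frame(frame: Frame, last_good: Optional[float]) -> Frame:
    """Hypothetical policy: fill a gap with the last good value, else propagate it."""
    if not frame.is_gap:
        return frame  # normal data passes through unchanged
    if last_good is None:
        return Frame(is_gap=True)  # nothing to fill with, so propagate the gap
    return Frame(data=last_good, metadata={"filled": True})


filled = handle_frame(Frame(is_gap=True), last_good=1.5)
propagated = handle_frame(Frame(is_gap=True), last_good=None)
passed = handle_frame(Frame(data=2.0), last_good=1.5)
```

Skipping the iteration or interpolating between neighbours are equally valid policies; which one fits depends on what the element computes.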
Metadata¶
The metadata dictionary carries auxiliary information alongside the data
payload. Common uses include timestamps, segment identifiers, and quality flags.
Metadata is not validated — it passes through the pipeline unchanged unless an element explicitly reads or modifies it.
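The pass-through behavior can be pictured with a small sketch. The Frame class below is a stand-in dataclass rather than the sgn class, and the keys "timestamp", "segment", and "quality_ok" as well as the helpers scale and flag_quality are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Frame:
    """Stand-in for sgn.Frame with only data and metadata (assumption)."""

    data: Any = None
    metadata: dict = field(default_factory=dict)


def scale(frame: Frame, factor: float) -> Frame:
    """Transforms the payload and forwards the metadata untouched."""
    return Frame(data=frame.data * factor, metadata=frame.metadata)


def flag_quality(frame: Frame, threshold: float) -> Frame:
    """Explicitly adds a quality flag, copying so the input metadata is not mutated."""
    meta = {**frame.metadata, "quality_ok": frame.data <= threshold}
    return Frame(data=frame.data, metadata=meta)


raw = Frame(data=2.0, metadata={"timestamp": 1000.0, "segment": "A"})
scaled = scale(raw, 3.0)
flagged = flag_quality(scaled, threshold=5.0)
```

Only flag_quality touches the dictionary; scale forwards it as received, which matches the "unchanged unless an element explicitly reads or modifies it" rule.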
DataSpec¶
DataSpec is a frozen dataclass that describes the expected shape or type of
data on a pad. Sink pads record the DataSpec of the first frame they receive
and raise a ValueError if subsequent frames have a different spec.
Subclass DataSpec to add fields relevant to your domain:
from dataclasses import dataclass
from sgn import DataSpec, Frame

@dataclass(frozen=True, eq=True)
class AudioSpec(DataSpec):
    sample_rate: int = 16000
    channels: int = 1

# samples is the audio payload, defined elsewhere
frame = Frame(data=samples, spec=AudioSpec(sample_rate=44100, channels=2))
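The first-frame consistency check described for sink pads can be sketched as follows. DataSpec is redefined here as an empty frozen dataclass so the snippet runs on its own, and SpecChecker is a hypothetical helper that mirrors the described behavior; it is not how sgn implements the check internally.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DataSpec:
    """Stand-in for sgn.DataSpec (assumption)."""


@dataclass(frozen=True)
class AudioSpec(DataSpec):
    sample_rate: int = 16000
    channels: int = 1


class SpecChecker:
    """Hypothetical helper: remembers the first spec and rejects different ones."""

    def __init__(self) -> None:
        self.expected: Optional[DataSpec] = None

    def check(self, spec: DataSpec) -> None:
        if self.expected is None:
            self.expected = spec  # the first frame fixes the spec
        elif spec != self.expected:
            raise ValueError(f"spec changed from {self.expected} to {spec}")


checker = SpecChecker()
checker.check(AudioSpec(sample_rate=44100, channels=2))
checker.check(AudioSpec(sample_rate=44100, channels=2))  # equal by value, accepted
try:
    checker.check(AudioSpec(sample_rate=48000, channels=2))
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True
```

Because the specs are frozen dataclasses, equality is by field value, so two separately constructed specs with the same fields count as consistent.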
Use spec.update() to create a modified copy.
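The exact signature of update() is not shown here. The sketch below assumes it accepts keyword arguments naming the fields to change and returns a new instance, which is what the standard library's dataclasses.replace does for frozen dataclasses. The DataSpec class is a stand-in written for this example, not the sgn class.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class DataSpec:
    """Stand-in for sgn.DataSpec; update() here is an assumed implementation."""

    def update(self, **changes):
        # replace() builds a new instance, so the frozen original is untouched.
        return replace(self, **changes)


@dataclass(frozen=True)
class AudioSpec(DataSpec):
    sample_rate: int = 16000
    channels: int = 1


stereo = AudioSpec(sample_rate=44100, channels=2)
mono = stereo.update(channels=1)
```

A copy-on-update design is the natural fit for a frozen dataclass, since its fields cannot be assigned after construction.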
IterFrame¶
IterFrame is a Frame subclass whose data defaults to an empty list
instead of None. Use it when your pipeline passes collections of items per
iteration. The choice between Frame and IterFrame depends on your data
model:
- Frame: one logical item per iteration (a single array, a single measurement, a single event)
- IterFrame: a batch of items per iteration (a list of events, a collection of file paths)
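The difference in defaults can be illustrated with two stand-in dataclasses. These are simplified sketches written for this example, not the sgn definitions; the point is only that each IterFrame starts with its own empty list.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Frame:
    """Stand-in for sgn.Frame: data defaults to None (assumption)."""

    data: Any = None


@dataclass
class IterFrame(Frame):
    """Stand-in for sgn.IterFrame: data defaults to a fresh empty list."""

    data: list = field(default_factory=list)


single = Frame(data=3.14)         # one logical item
batch = IterFrame()
batch.data.append("event-1")      # append without checking for None first
other = IterFrame()               # gets its own list, not a shared one
```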
Pad¶
Pads are the connection points where data enters or leaves an element. There are three types:
| Pad Type | Direction | Method Called | Purpose |
|---|---|---|---|
| SourcePad | Output | new(pad) | Produces a frame for downstream |
| SinkPad | Input | pull(pad, frame) | Receives a frame from upstream |
| InternalPad | Internal | internal() | Coordinates between sink and source pads |
Naming¶
Every pad has two names:
- pad.pad_name: the short name you provide at construction (e.g., "H1")
- pad.name: the fully qualified name, "element_name:pad_type:pad_name"
For example, a source pad named "H1" on element "src" has:
- pad.pad_name = "H1"
- pad.name = "src:src:H1"
The pad type in the full name is src for source pads, snk for sink pads,
and inl for internal pads.
Use pad.pad_name in element logic (e.g., to distinguish which channel data
came from). Use pad.name when working with the low-level insert()/link()
API.
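The naming scheme is simple enough to express as two small functions. Both helpers below are hypothetical and written only to illustrate the format; sgn builds these names itself.

```python
# Pad type codes as they appear in the fully qualified name.
PAD_TYPE_CODES = {"source": "src", "sink": "snk", "internal": "inl"}


def full_pad_name(element_name: str, pad_type: str, pad_name: str) -> str:
    """Builds "element_name:pad_type:pad_name" from its three parts."""
    return f"{element_name}:{PAD_TYPE_CODES[pad_type]}:{pad_name}"


def short_pad_name(full_name: str) -> str:
    """Recovers the short pad name from a fully qualified name."""
    return full_name.split(":", 2)[2]


qualified = full_pad_name("src", "source", "H1")
short = short_pad_name("trn:snk:L1")
```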
Linking¶
A sink pad links to exactly one source pad. Multiple sink pads may link to the
same source pad (broadcasting). When linked, the sink pad reads from the source
pad's output attribute each iteration.
At the low level, link() is called as sink_pad.link(source_pad) — the sink
pad is the caller, taking the source pad as its argument. The connect() API
handles this automatically.
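The direction of the link() call and the broadcasting case can be sketched with two stand-in classes. Only the output attribute and the sink_pad.link(source_pad) call shape come from the description above; the other attribute and the read() helper are hypothetical names.

```python
class SourcePad:
    """Stand-in source pad: holds the most recent frame in `output`."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.output = None


class SinkPad:
    """Stand-in sink pad: remembers the single source pad it reads from."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.other = None  # hypothetical attribute name for the linked pad

    def link(self, source_pad: SourcePad) -> None:
        # The sink pad is the caller; the source pad is the argument.
        self.other = source_pad

    def read(self):
        # Hypothetical helper: fetch whatever the linked source pad produced.
        return self.other.output


source = SourcePad("src:src:H1")
sink_a = SinkPad("a:snk:H1")
sink_b = SinkPad("b:snk:H1")
sink_a.link(source)
sink_b.link(source)  # broadcasting: two sinks, one source

source.output = {"data": 42}  # one iteration's frame
```

Both sinks see the same object, which is why broadcasting costs nothing extra but also why a sink should not mutate a frame it shares.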
Element¶
Elements are the processing units of a pipeline. Each element groups pads together and implements the logic that runs when those pads are called.
Every element has a name. If you don't provide one, SGN assigns a unique
UUID-based name automatically. Named elements are easier to debug and are
required for the registry access pattern described below.
SourceElement¶
Produces data. Has source pads only. Implement new():
@dataclass
class MySource(SourceElement):
    def new(self, pad: SourcePad) -> Frame:
        return Frame(data=produce_data())
TransformElement¶
Receives and produces data. Has both sink and source pads. Implement pull()
and new():
@dataclass
class MyTransform(TransformElement):
    def pull(self, pad: SinkPad, frame: Frame) -> None:
        self.in_frame = frame  # Store for use in new()

    def new(self, pad: SourcePad) -> Frame:
        return Frame(data=transform(self.in_frame.data))
Execution order within a transform is guaranteed: all pull() calls complete
before internal(), which completes before any new() calls.
SinkElement¶
Consumes data. Has sink pads only. Implement pull() and call mark_eos():
@dataclass
class MySink(SinkElement):
    def pull(self, pad: SinkPad, frame: Frame) -> None:
        if frame.EOS:
            self.mark_eos(pad)
            return
        process(frame.data)
Calling self.mark_eos(pad) signals to the pipeline that this pad is done.
The pipeline stops when all sinks report EOS.
The internal() Method¶
Every element has an internal() method (defaults to a no-op). Override it to
run logic between receiving data and producing data. This is especially useful
in transforms that need to combine data from multiple sink pads before
producing output on source pads.
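A rough picture of how internal() fits between pull() and new() is given below. SumTransform is a plain class, not a TransformElement subclass, and run_once is a hypothetical driver that imitates the documented call order for a single iteration; pad names are passed as strings to keep the sketch self-contained.

```python
class SumTransform:
    """Stand-in transform that adds the values arriving on its sink pads."""

    def __init__(self) -> None:
        self.inputs = {}
        self.combined = None
        self.calls = []  # records the call order for inspection

    def pull(self, pad_name: str, data: float) -> None:
        self.calls.append(f"pull:{pad_name}")
        self.inputs[pad_name] = data

    def internal(self) -> None:
        # Runs once, after every sink pad has delivered its frame.
        self.calls.append("internal")
        self.combined = sum(self.inputs.values())

    def new(self, pad_name: str) -> float:
        self.calls.append(f"new:{pad_name}")
        return self.combined


def run_once(element, sink_data, source_pads):
    """Hypothetical driver: all pull() calls, then internal(), then all new() calls."""
    for pad_name, data in sink_data.items():
        element.pull(pad_name, data)
    element.internal()
    return {pad_name: element.new(pad_name) for pad_name in source_pads}


transform = SumTransform()
outputs = run_once(transform, {"H1": 1.0, "L1": 2.0}, ["sum"])
```

Doing the combination in internal() rather than in the last pull() means the element does not need to know which sink pad is called last.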
Static Pads¶
By default, pad names are provided at instantiation via source_pad_names and
sink_pad_names. For elements that always have the same pads, use class-level
configuration:
@dataclass
class AlwaysTwoPads(TransformElement):
    static_sink_pads: ClassVar[list[str]] = ["H1", "L1"]
    static_source_pads: ClassVar[list[str]] = ["H1", "L1"]
    allow_dynamic_sink_pads: ClassVar[bool] = False
    allow_dynamic_source_pads: ClassVar[bool] = False
With allow_dynamic_*_pads = False, the element rejects any pad names passed
at instantiation and uses only its static pads. With the default True, static
pads are combined with any user-provided pads.
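The two modes can be summarized in a short function. resolve_pad_names is a hypothetical helper; the ordering (static pads first) and the removal of duplicates are assumptions made for this sketch, not documented sgn behavior.

```python
from typing import Iterable, List, Optional


def resolve_pad_names(
    static_pads: Iterable[str],
    user_pads: Optional[Iterable[str]] = None,
    allow_dynamic: bool = True,
) -> List[str]:
    """Returns the pad names an element would end up with under each mode."""
    static = list(static_pads)
    requested = list(user_pads or [])
    if not allow_dynamic:
        if requested:
            raise ValueError(f"dynamic pads not allowed, got {requested}")
        return static
    # Assumed policy: static pads first, then new user pads, no duplicates.
    return static + [name for name in requested if name not in static]


combined = resolve_pad_names(["H1", "L1"], ["V1"])
fixed = resolve_pad_names(["H1", "L1"], allow_dynamic=False)
try:
    resolve_pad_names(["H1", "L1"], ["V1"], allow_dynamic=False)
    rejected = False
except ValueError:
    rejected = True
```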
Pipeline¶
Pipeline extends Graph to add execution capabilities. It manages the
element registry, validates connections, and runs the event loop.
Building¶
There are three levels of API for building a pipeline:
| Method | Level | Description |
|---|---|---|
| connect() | High | Auto-inserts elements and matches pads by name |
| insert() | Medium | Adds elements to the graph without linking |
| link() | Low | Links specific pads by full name |
connect() is the recommended API. It handles insertion automatically and
supports multiple matching strategies.
Registry Access¶
After inserting elements, you can look up elements and pads by name using bracket notation:
p = Pipeline()
p.connect(src, trn).connect(trn, snk)
element = p["trn"] # Element by name
pad = p["trn:src:H1"] # Pad by full name
This is useful for inspecting pipeline state after run() completes, or for
advanced linking with insert() and link().
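One plausible way to support this kind of lookup is a single dictionary keyed by both element names and full pad names. The classes below are stand-ins written for this sketch (MiniPipeline, Element, and Pad are not the sgn classes), and the insert() signature shown is an assumption.

```python
class Pad:
    """Stand-in pad carrying the short and fully qualified names."""

    def __init__(self, element_name: str, pad_type: str, pad_name: str) -> None:
        self.pad_name = pad_name
        self.name = f"{element_name}:{pad_type}:{pad_name}"


class Element:
    """Stand-in element that builds its pads from lists of short names."""

    def __init__(self, name, source_pad_names=(), sink_pad_names=()):
        self.name = name
        self.pads = [Pad(name, "src", n) for n in source_pad_names]
        self.pads += [Pad(name, "snk", n) for n in sink_pad_names]


class MiniPipeline:
    """Stand-in pipeline with one registry for elements and pads."""

    def __init__(self) -> None:
        self._registry = {}

    def insert(self, *elements):
        for element in elements:
            self._registry[element.name] = element
            for pad in element.pads:
                self._registry[pad.name] = pad
        return self  # returning self allows chained calls

    def __getitem__(self, name: str):
        return self._registry[name]


p = MiniPipeline().insert(Element("src", source_pad_names=["H1"])).insert(
    Element("trn", source_pad_names=["H1"], sink_pad_names=["H1"])
)
```

Because full pad names embed the element name and pad type, they cannot collide with element names in a shared dictionary unless an element name itself contains colons.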
Running¶
pipeline.run() does the following:
- Calls check() to verify all pads are linked and at least one sink exists.
- Builds a TopologicalSorter from the graph.
- Iterates: executes pads in topological order, concurrently where possible.
- Stops when all sinks report EOS.
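The ordering step can be tried out directly with the standard library's graphlib.TopologicalSorter (Python 3.9+). The pad graph below is a made-up example with two sources feeding one mixer, and execution_batches is a hypothetical helper; the sketch shows only how batches of mutually independent pads fall out of the sorter, not how sgn schedules them.

```python
from graphlib import TopologicalSorter

# Each key is a pad; its value is the set of pads that must run before it.
dependencies = {
    "a:src:out": set(),
    "b:src:out": set(),
    "mix:snk:a": {"a:src:out"},
    "mix:snk:b": {"b:src:out"},
    "mix:src:out": {"mix:snk:a", "mix:snk:b"},
    "snk:snk:in": {"mix:src:out"},
}


def execution_batches(deps):
    """Groups pads into batches whose members do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        batches.append(ready)
        sorter.done(*ready)  # unblocks the pads that waited on this batch
    return batches


batches = execution_batches(dependencies)
```

Pads within one batch are the ones that could run concurrently; each later batch has to wait for the previous one to finish.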
All method calls return self, which enables chaining, as in the p.connect(src, trn).connect(trn, snk) call under Registry Access.
For details on how the execution loop works, see Architecture.