Architecture
How SGN executes pipelines internally.
The DAG
A pipeline is a directed acyclic graph (DAG) where pads are the nodes and links are the edges. Elements are containers that group pads together, but the execution graph operates at the pad level.
Each element type contributes its own internal subgraph:
    SourceElement          TransformElement          SinkElement
┌───────────────────┐  ┌───────────────────────┐  ┌────────────────┐
│                   │  │   [snk H1] [snk L1]   │  │    [snk H1]    │
│                   │  │        \     /        │  │       |        │
│    [internal]     │  │      [internal]       │  │   [internal]   │
│      /     \      │  │        /     \        │  │                │
│ [src H1] [src L1] │  │   [src H1] [src L1]   │  │                │
└───────────────────┘  └───────────────────────┘  └────────────────┘
When you connect elements, SGN links source pads to sink pads across elements. Consider a pipeline with a two-channel source, a transform, and a sink:
src = MySource(name="src", source_pad_names=["H1", "L1"])
trn = MyTransform(name="trn", sink_pad_names=["H1", "L1"], source_pad_names=["H1", "L1"])
snk = MySink(name="snk", sink_pad_names=["H1", "L1"])
p = Pipeline()
p.connect(src, trn)
p.connect(trn, snk)
SGN builds the following execution graph (data flows top to bottom):
┌─────────────────────────────────────────────────────┐
│               src (SourceElement)                   │
│                                                     │
│                    [internal]                       │
│                     /      \                        │
│             [src:H1]        [src:L1]                │
└─────────────────┬───────────────┬───────────────────┘
                  │               │
                  │ Frame         │ Frame
                  ▼               ▼
┌─────────────────┴───────────────┴───────────────────┐
│             [snk:H1]        [snk:L1]                │
│                     \      /                        │
│              trn (TransformElement)                 │
│                    [internal]                       │
│                     /      \                        │
│             [src:H1]        [src:L1]                │
└─────────────────┬───────────────┬───────────────────┘
                  │               │
                  │ Frame         │ Frame
                  ▼               ▼
┌─────────────────┴───────────────┴───────────────────┐
│             [snk:H1]        [snk:L1]                │
│                     \      /                        │
│                snk (SinkElement)                    │
│                    [internal]                       │
└─────────────────────────────────────────────────────┘
Full pad names follow the pattern element_name:pad_type:pad_name. For example,
trn:snk:H1 is the sink pad named H1 on element trn. Pad types are src,
snk, and inl (internal).
Execution Order
SGN uses graphlib.TopologicalSorter from the Python standard library to
determine execution order. Each iteration of the pipeline:
- The sorter identifies all nodes with no unsatisfied dependencies.
- Those nodes run concurrently as asyncio tasks.
- Once a batch completes, the sorter marks those nodes done and finds the next batch.
- This repeats until all nodes have been executed.
For the pipeline above, one iteration executes in this order (internal pads are written in shorthand as element:internal):
Step 1: src:internal (no dependencies)
Step 2: src:src:H1, src:src:L1 (concurrent — both depend only on src:internal)
Step 3: trn:snk:H1, trn:snk:L1 (concurrent — each depends on its upstream src pad)
Step 4: trn:internal (depends on both trn sink pads)
Step 5: trn:src:H1, trn:src:L1 (concurrent — both depend only on trn:internal)
Step 6: snk:snk:H1, snk:snk:L1 (concurrent — each depends on its upstream src pad)
Step 7: snk:internal (depends on both snk sink pads)
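The batch-by-batch schedule above can be reproduced with the standard library alone. This is a minimal sketch, not SGN's scheduler: the edge dictionary is hand-written from the dependencies listed in the steps, the node names use the same shorthand, and run_node is a hypothetical stand-in for a pad's real work.

```python
import asyncio
from graphlib import TopologicalSorter

# Hand-written edges for the src -> trn -> snk example (an assumption, not
# generated by SGN). Each key maps a node to the set of nodes it depends on.
GRAPH = {
    "src:src:H1": {"src:internal"},
    "src:src:L1": {"src:internal"},
    "trn:snk:H1": {"src:src:H1"},
    "trn:snk:L1": {"src:src:L1"},
    "trn:internal": {"trn:snk:H1", "trn:snk:L1"},
    "trn:src:H1": {"trn:internal"},
    "trn:src:L1": {"trn:internal"},
    "snk:snk:H1": {"trn:src:H1"},
    "snk:snk:L1": {"trn:src:L1"},
    "snk:internal": {"snk:snk:H1", "snk:snk:L1"},
}


async def run_node(name: str, executed: list[str]) -> None:
    # Hypothetical placeholder for pull()/internal()/new() work on one pad.
    executed.append(name)
    await asyncio.sleep(0)


async def run_iteration(graph: dict[str, set[str]]) -> list[list[str]]:
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    executed: list[str] = []
    batches: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes with no unsatisfied deps
        await asyncio.gather(*(run_node(n, executed) for n in ready))
        sorter.done(*ready)  # unlocks the next batch
        batches.append(ready)
    return batches


for step, batch in enumerate(asyncio.run(run_iteration(GRAPH)), start=1):
    print(f"Step {step}: {', '.join(batch)}")
```

Sorting each ready batch only makes the printed output deterministic; within a batch the pads have no ordering constraint, which is exactly why they can run concurrently.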
Within a single element, the execution order is always:
- Sink pads (pull): receive frames from upstream.
- Internal pad (internal): coordinate across pads.
- Source pads (new): produce frames for downstream.
This ordering is enforced by the internal subgraph structure (sink pads are dependencies of the internal pad, which is a dependency of source pads).
Concurrency
Pads that have no dependency relationship run concurrently within the same
iteration. In the example above, src:src:H1 and src:src:L1 run concurrently
via asyncio.gather(), as do trn:snk:H1 and trn:snk:L1.
This is cooperative concurrency on a single event loop, not true parallelism: a pad only yields to other pads at an await. For CPU-bound work across multiple cores, see Run in Parallel.
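To see what "cooperative" means in practice, here is a small standard-library sketch (not SGN code; H1 and L1 are just labels). Two coroutines run under asyncio.gather() interleave only at an await, while one that blocks with time.sleep() finishes before the other even starts.

```python
import asyncio
import time


async def cooperative(name: str, log: list[str]) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(0)  # yields control back to the event loop
    log.append(f"{name} end")


async def blocking(name: str, log: list[str]) -> None:
    log.append(f"{name} start")
    time.sleep(0.01)  # never yields: the whole event loop waits here
    log.append(f"{name} end")


async def run_two(pad_work) -> list[str]:
    log: list[str] = []
    await asyncio.gather(pad_work("H1", log), pad_work("L1", log))
    return log


print(asyncio.run(run_two(cooperative)))
# ['H1 start', 'L1 start', 'H1 end', 'L1 end']
print(asyncio.run(run_two(blocking)))
# ['H1 start', 'H1 end', 'L1 start', 'L1 end']
```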
Frame Propagation
Each source pad produces exactly one frame per iteration. When new(pad)
is called, the returned Frame is stored on the source pad's output
attribute. When a downstream sink pad is called, it reads self.other.output
(where self.other is the linked source pad) and passes it to the element's
pull() method.
┌──────────────┐            ┌──────────────┐
│ src:src:H1   │            │ trn:snk:H1   │
│              │            │              │
│ output: ─────┼───────────▶│ input: ──────┼──▶ trn.pull(pad, frame)
│ Frame(       │            │ (copied      │
│   data=42)   │            │  from src)   │
└──────────────┘            └──────────────┘
This means:
- A source pad linked to multiple sink pads broadcasts the same frame to all of them.
- Each sink pad receives exactly one frame per iteration.
- Frames are not queued — each iteration overwrites the previous frame.
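A toy model of this hand-off, assuming pads are plain objects with the output and other attributes named above (Frame, SourcePad, SinkPad and the "mon" element are hypothetical, not SGN's classes). It shows both broadcast and overwrite: each iteration replaces output, and every linked sink pad reads that single current frame.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Frame:
    data: Any = None
    EOS: bool = False


class SourcePad:
    """Hypothetical source pad: holds the frame produced this iteration."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.output: Optional[Frame] = None


class SinkPad:
    """Hypothetical sink pad: reads its linked source pad's output."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.other: Optional[SourcePad] = None
        self.input: Optional[Frame] = None

    def __call__(self) -> Frame:
        if self.other is None or self.other.output is None:
            raise RuntimeError(f"{self.name} has nothing to read")
        self.input = self.other.output  # no queue: only the current frame
        return self.input


src_h1 = SourcePad("src:src:H1")
trn_h1 = SinkPad("trn:snk:H1")
mon_h1 = SinkPad("mon:snk:H1")  # "mon" is a hypothetical second consumer
trn_h1.other = mon_h1.other = src_h1  # one source pad, two sink pads

for i in range(3):
    src_h1.output = Frame(data=i)  # what new(pad) would return this iteration
    a, b = trn_h1(), mon_h1()
    print(i, a.data, a is b)  # in this sketch both sinks get the same object
```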
EOS Protocol
End of Stream (EOS) is how the pipeline knows when to stop:
- A source element returns Frame(EOS=True) from new().
- The EOS frame propagates through the graph like any other frame.
- Each sink element calls self.mark_eos(pad) when it receives an EOS frame.
- The pipeline loop checks all(sink.at_eos for sink in self.sinks.values()) after every iteration.
- When all sinks report EOS, pipeline.run() returns.
A sink element's at_eos property returns True if any of its sink pads
has been marked EOS. A multi-pad sink therefore counts as finished as soon as
one of its pads receives EOS, even if its other pads are still receiving data.
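The loop below simulates the protocol with hypothetical classes (CountingSource and RecordingSink are illustrations, and the loop is not SGN's actual run loop). Two sources feed one two-pad sink; because at_eos uses the "any pad" rule, the loop ends as soon as H1 reaches EOS, and L1's fifth frame is never produced.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Frame:
    data: Any = None
    EOS: bool = False


class CountingSource:
    """Hypothetical source: emits n data frames, then Frame(EOS=True)."""

    def __init__(self, n: int) -> None:
        self.n = n
        self.sent = 0

    def new(self) -> Frame:
        if self.sent >= self.n:
            return Frame(EOS=True)
        self.sent += 1
        return Frame(data=self.sent)


class RecordingSink:
    """Hypothetical sink whose at_eos follows the 'any pad' rule."""

    def __init__(self) -> None:
        self.eos_pads: set[str] = set()
        self.received: list[tuple[str, Any]] = []

    def mark_eos(self, pad: str) -> None:
        self.eos_pads.add(pad)

    def pull(self, pad: str, frame: Frame) -> None:
        if frame.EOS:
            self.mark_eos(pad)
        else:
            self.received.append((pad, frame.data))

    @property
    def at_eos(self) -> bool:
        return len(self.eos_pads) > 0  # any pad, not all pads


sources = {"H1": CountingSource(3), "L1": CountingSource(5)}
sinks = {"snk": RecordingSink()}

iterations = 0
while True:
    iterations += 1
    for pad, source in sources.items():
        sinks["snk"].pull(pad, source.new())
    # The stop check described above, run after every iteration.
    if all(sink.at_eos for sink in sinks.values()):
        break  # this is where pipeline.run() would return

print(iterations, sinks["snk"].eos_pads)  # 4 {'H1'}
```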
Pad Linking
When sink_pad.link(source_pad) is called:
- The sink pad stores a reference to the source pad (self.other = source_pad).
- Both pads are marked as linked (is_linked = True).
- A graph edge is returned: {sink_pad: {source_pad}}, meaning the sink pad depends on the source pad.
The Pipeline.check() method (called automatically by run()) verifies that
every pad is linked. Unlinked pads raise a RuntimeError.
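A minimal model of link() and the check() rule, assuming the attribute names given above (Pad and check here are hypothetical stand-ins, not SGN's classes). The returned edge has the {node: {predecessors}} shape that graphlib.TopologicalSorter accepts, so edges from many link() calls can be merged into a single graph.

```python
from __future__ import annotations

from typing import Optional


class Pad:
    """Hypothetical pad carrying the linking attributes described above."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.is_linked = False
        self.other: Optional[Pad] = None

    def link(self, source_pad: Pad) -> dict[Pad, set[Pad]]:
        # Called on a sink pad: remember the upstream source pad.
        self.other = source_pad
        self.is_linked = True
        source_pad.is_linked = True
        return {self: {source_pad}}  # edge: this sink pad depends on source_pad

    def __repr__(self) -> str:
        return self.name


def check(pads: list[Pad]) -> None:
    """Sketch of a check()-style validation: every pad must be linked."""
    unlinked = [p.name for p in pads if not p.is_linked]
    if unlinked:
        raise RuntimeError(f"unlinked pads: {unlinked}")


src_h1, trn_h1, trn_l1 = Pad("src:src:H1"), Pad("trn:snk:H1"), Pad("trn:snk:L1")
graph: dict[Pad, set[Pad]] = {}
graph.update(trn_h1.link(src_h1))
print(graph)  # {trn:snk:H1: {src:src:H1}}

try:
    check([src_h1, trn_h1, trn_l1])
except RuntimeError as err:
    print(err)  # unlinked pads: ['trn:snk:L1']
```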
Pad Naming
Every pad has two names:
- pad.pad_name: the short name you provide (e.g., "H1")
- pad.name: the fully qualified name, "element_name:pad_type:pad_name"
Full names are used in the graph registry and for insert()/link() calls.
Short names are used in connect() matching and in element logic (e.g.,
pad.pad_name in pull() and new()).
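A sketch of how the two names relate, and of short-name matching in the spirit of connect(). PadInfo and pair_by_short_name are hypothetical; SGN's actual connect() may handle unmatched or ambiguous pads differently.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PadInfo:
    """Hypothetical pad descriptor holding both kinds of name."""

    element_name: str
    pad_type: str  # "src", "snk", or "inl"
    pad_name: str  # the short name, e.g. "H1"

    @property
    def name(self) -> str:
        # Fully qualified form: element_name:pad_type:pad_name
        return f"{self.element_name}:{self.pad_type}:{self.pad_name}"


def pair_by_short_name(
    source_pads: list[PadInfo], sink_pads: list[PadInfo]
) -> list[tuple[str, str]]:
    """Return (sink full name, source full name) for matching short names."""
    by_short = {p.pad_name: p for p in sink_pads}
    return [
        (by_short[s.pad_name].name, s.name)
        for s in source_pads
        if s.pad_name in by_short
    ]


src_pads = [PadInfo("src", "src", n) for n in ("H1", "L1")]
trn_pads = [PadInfo("trn", "snk", n) for n in ("H1", "L1")]
print(pair_by_short_name(src_pads, trn_pads))
# [('trn:snk:H1', 'src:src:H1'), ('trn:snk:L1', 'src:src:L1')]
```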
DataSpec Consistency
Each sink pad tracks a DataSpec from the first frame it receives. If a
subsequent frame arrives with a different DataSpec, the pad raises a
ValueError. This catches accidental changes in data shape or type mid-stream.
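One way to implement such a check, sketched with a hypothetical DataSpec holding a shape and a dtype string (the fields of SGN's real DataSpec may differ). Specs are compared by dataclass equality here, so a change in any field triggers the error.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DataSpec:
    """Hypothetical spec: properties that must stay fixed for a stream."""

    shape: tuple[int, ...]
    dtype: str


class SpecCheckingSinkPad:
    """Hypothetical sink pad that locks in the first DataSpec it sees."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.data_spec: Optional[DataSpec] = None

    def receive(self, spec: DataSpec) -> None:
        if self.data_spec is None:
            self.data_spec = spec  # the first frame fixes the spec
        elif spec != self.data_spec:
            raise ValueError(
                f"{self.name}: DataSpec changed from {self.data_spec} to {spec}"
            )


pad = SpecCheckingSinkPad("snk:snk:H1")
pad.receive(DataSpec(shape=(1024,), dtype="float64"))
pad.receive(DataSpec(shape=(1024,), dtype="float64"))  # same spec: accepted
try:
    pad.receive(DataSpec(shape=(2048,), dtype="float64"))
except ValueError as err:
    print(err)  # the shape changed mid-stream
```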
Running in Jupyter
If the asyncio event loop is already running (e.g., in a Jupyter notebook),
pipeline.run() automatically creates a new event loop in a background thread.
No code changes are needed.
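A common way to get this behavior, sketched with the standard library only (run_to_completion is a hypothetical helper, not necessarily how pipeline.run() is written): detect a running loop with asyncio.get_running_loop(), and if one exists, call asyncio.run() in a worker thread, since asyncio.run() refuses to start while another loop is running in the same thread.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Coroutine


def run_to_completion(make_coro: Callable[[], Coroutine[Any, Any, Any]]) -> Any:
    """Run a coroutine even when this thread already has a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running (plain script): asyncio.run() is safe here.
        return asyncio.run(make_coro())
    # A loop is already running (e.g. Jupyter): start a fresh loop in a
    # worker thread and block until it produces a result.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(make_coro())).result()


async def which_thread() -> str:
    await asyncio.sleep(0)
    return threading.current_thread().name


# Script case: runs on the calling thread.
print(run_to_completion(which_thread) == threading.current_thread().name)  # True


async def notebook_cell() -> bool:
    # Simulates calling the helper while a loop is running, as in a notebook.
    return run_to_completion(which_thread) != threading.current_thread().name


print(asyncio.run(notebook_cell()))  # True: it ran in a worker thread
```

In this sketch the caller blocks until the worker's loop finishes, so the call still behaves synchronously from the notebook cell's point of view; the trade-off is that the notebook's own loop is paused for that time.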