Skip to content

Element Composition

SGN lets you combine multiple elements into a single reusable unit. The resulting composed element behaves like any other element.

Define a dataclass that inherits from ComposedSourceElement, ComposedTransformElement, or ComposedSinkElement and override _build() to wire internal elements using self.insert() and self.connect(). The resulting object IS an element — pass it directly to pipeline.connect().

Composition Types

| Use case | Base class |
|---|---|
| Source(s) + optional Transform(s) | `ComposedSourceElement` |
| Transform(s) only | `ComposedTransformElement` |
| Optional Transform(s) + Sink(s) | `ComposedSinkElement` |

Composed Transform

from dataclasses import dataclass

from sgn import Pipeline, IterSource, CollectSink
from sgn.compose import ComposedTransformElement
from sgn.transforms import CallableTransform


@dataclass(repr=False, kw_only=True)
class ScaleAndOffset(ComposedTransformElement):
    """Affine transform on the ``data`` pad: ``data * factor + offset``.

    Internally two ``CallableTransform`` stages are chained (scale first,
    then add), each reading and writing a pad named ``"data"``. Both stage
    callables return ``None`` when ``frame.data`` is ``None``.

    Raises:
        ValueError: from ``_validate`` when ``factor`` is zero.
    """

    # Multiplier applied by the first internal stage.
    factor: float = 1.0
    # Constant added by the second internal stage.
    offset: float = 0.0

    def _validate(self):
        # Reject a zero factor before any internal element is created.
        if self.factor == 0:
            raise ValueError("factor must be non-zero")

    def _build(self):
        # Parameters are bound as default arguments so each stage keeps the
        # value it had when the graph was built.
        def _multiply(frame, f=self.factor):
            if frame.data is None:
                return None
            return frame.data * f

        def _shift(frame, o=self.offset):
            if frame.data is None:
                return None
            return frame.data + o

        first_stage = CallableTransform.from_callable(
            name=f"{self.name}_scale",
            callable=_multiply,
            output_pad_name="data",
            sink_pad_names=["data"],
        )
        second_stage = CallableTransform.from_callable(
            name=f"{self.name}_add",
            callable=_shift,
            output_pad_name="data",
            sink_pad_names=["data"],
        )
        # connect() also inserts both stages into this composed element.
        self.connect(first_stage, second_stage)


# F = C * 9/5 + 32, packaged as one reusable transform element.
to_fahrenheit = ScaleAndOffset(name="to_f", factor=9 / 5, offset=32)

pad_name = "data"
source = IterSource(
    name="celsius",
    source_pad_names=[pad_name],
    iters={pad_name: iter([0, 20, 100])},
    eos_on_empty={pad_name: True},
)
sink = CollectSink(name="sink", sink_pad_names=[pad_name])

# The composed element sits between source and sink like any other element.
pipeline = Pipeline()
pipeline.connect(source, to_fahrenheit)
pipeline.connect(to_fahrenheit, sink)
pipeline.run()

fahrenheit = list(sink.collects[pad_name])
assert fahrenheit == [32.0, 68.0, 212.0]

Composed Source

from dataclasses import dataclass

from sgn import Pipeline, IterSource, CollectSink
from sgn.compose import ComposedSourceElement
from sgn.transforms import CallableTransform


@dataclass(repr=False, kw_only=True)
class ScaledSource(ComposedSourceElement):
    """Source that emits each item of ``values`` multiplied by ``factor``.

    Built from an internal ``IterSource`` (pad ``"data"``, EOS once the
    values are exhausted) feeding a ``CallableTransform`` that scales.
    """

    # Items emitted, in order, by the internal IterSource.
    values: tuple = (1, 2, 3)
    # Multiplier applied to every non-None item.
    factor: float = 1.0

    def _build(self):
        def _times_factor(frame, f=self.factor):
            # None data (e.g. EOS/gap frames) is returned as None.
            if frame.data is None:
                return None
            return frame.data * f

        pad = "data"
        producer = IterSource(
            name=f"{self.name}_raw",
            source_pad_names=[pad],
            iters={pad: iter(self.values)},
            eos_on_empty={pad: True},
        )
        scaler = CallableTransform.from_callable(
            name=f"{self.name}_scale",
            callable=_times_factor,
            output_pad_name=pad,
            sink_pad_names=[pad],
        )
        self.connect(producer, scaler)


# Halve each raw value: 10, 20, 30 -> 5.0, 10.0, 15.0.
halved = [5.0, 10.0, 15.0]

sink = CollectSink(name="sink", sink_pad_names=["data"])
source = ScaledSource(name="src", values=(10, 20, 30), factor=0.5)

# A composed source has no upstream; it plugs straight into the sink.
pipeline = Pipeline()
pipeline.connect(source, sink)
pipeline.run()

collected = list(sink.collects["data"])
assert collected == halved

Composed Sink

from dataclasses import dataclass

from sgn import Pipeline, IterSource, CollectSink
from sgn.compose import ComposedSinkElement
from sgn.transforms import CallableTransform


# repr=False, matching the other composed elements: without it the dataclass
# decorator generates its own __repr__ and overrides the inherited one.
@dataclass(repr=False, kw_only=True)
class FormattedCollector(ComposedSinkElement):
    """Formats data as strings before collecting.

    Each incoming ``data`` frame is turned into ``f"{prefix}={data}"`` by an
    internal ``CallableTransform``, then stored by an internal
    ``CollectSink``. Collected values are read back through
    ``self._collector.collects["data"]``.
    """

    # Label placed before "=" in every formatted string.
    prefix: str = "value"

    def _build(self):
        fmt = CallableTransform.from_callable(
            name=f"{self.name}_fmt",
            # prefix is bound as a default so the build-time value is captured.
            callable=lambda frame, p=self.prefix: (
                f"{p}={frame.data}" if frame.data is not None else None
            ),
            output_pad_name="data",
            sink_pad_names=["data"],
        )
        # Keep a handle on the inner sink so callers can read what it collected.
        self._collector = CollectSink(
            name=f"{self.name}_collect", sink_pad_names=["data"]
        )
        self.connect(fmt, self._collector)


pad_name = "data"
source = IterSource(
    name="src",
    source_pad_names=[pad_name],
    iters={pad_name: iter([1, 2, 3])},
    eos_on_empty={pad_name: True},
)
# From the outside, the composed sink is just a sink with a "data" pad.
sink = FormattedCollector(name="sink", prefix="result")

pipeline = Pipeline()
pipeline.connect(source, sink)
pipeline.run()

# Results live on the inner CollectSink created in _build().
expected = [f"result={n}" for n in (1, 2, 3)]
assert list(sink._collector.collects["data"]) == expected

Merging Multiple Sources

Non-linear compositions are built the same way — call self.connect() once per edge in the internal graph.

from dataclasses import dataclass

from sgn import Pipeline, IterSource, CollectSink
from sgn.compose import ComposedSourceElement
from sgn.transforms import CallableTransform


@dataclass(repr=False, kw_only=True)
class WeatherSource(ComposedSourceElement):
    """Composed source exposing two streams on pads ``temp`` and ``humid``.

    Two internal ``IterSource`` elements feed one ``CallableTransform`` built
    with ``from_combinations``. Temperatures are converted from Celsius to
    Fahrenheit, and humidities are formatted as percent strings.
    """

    # Celsius temperatures, emitted on the "temp" pad.
    temps: tuple = (20, 25, 30)
    # Relative humidities, emitted on the "humid" pad.
    humids: tuple = (50, 60, 70)

    def _build(self):
        def _celsius_to_fahrenheit(f):
            if f.data is None:
                return None
            return f.data * 9 / 5 + 32

        def _as_percent(f):
            if f.data is None:
                return None
            return f"{f.data}%"

        temp_src = IterSource(
            name=f"{self.name}_temps",
            source_pad_names=["temp"],
            iters={"temp": iter(self.temps)},
            eos_on_empty={"temp": True},
        )
        humid_src = IterSource(
            name=f"{self.name}_humids",
            source_pad_names=["humid"],
            iters={"humid": iter(self.humids)},
            eos_on_empty={"humid": True},
        )
        # Each combo appears to be (input pad names, function, output pad
        # name) -- confirm against the from_combinations docs.
        converter = CallableTransform.from_combinations(
            name=f"{self.name}_convert",
            combos=[
                (("temp",), _celsius_to_fahrenheit, "temp"),
                (("humid",), _as_percent, "humid"),
            ],
        )
        # One connect() per internal edge: both sources fan in to converter.
        for upstream in (temp_src, humid_src):
            self.connect(upstream, converter)


source = WeatherSource(name="weather")
# One sink pad per stream the composed source exposes.
sink = CollectSink(name="sink", sink_pad_names=["temp", "humid"])

pipeline = Pipeline()
pipeline.connect(source, sink)
pipeline.run()

# Default temps (20, 25, 30) C become Fahrenheit floats; default humidities
# (50, 60, 70) become percent strings.
expected = {
    "temp": [68.0, 77.0, 86.0],
    "humid": ["50%", "60%", "70%"],
}
for pad_name, values in expected.items():
    assert list(sink.collects[pad_name]) == values

Advanced: intermediate taps and boundary-pad control

By default, the external pad surface of a composed element is determined by its graph: internal source pads that aren't consumed become the outputs, internal sink pads that aren't fed become the inputs, and the pad names carry through unchanged. Two hooks let you shape that surface beyond the default:

  • self.expose_source_pad("element:src:pad") — during _build(), mark an internal source pad for external exposure even when it's also wired to an internal sink. This is the multilink / tap pattern.
  • self.update_pad_names = {old: {new_1: full_name_1, ...}} — set during _build() (or by the constructor). Each key must name an existing boundary pad; its value is a dict of replacement pad names mapped to full internal pad names. Use it to rename for a clean external API, or to split a single boundary pad into several (e.g. when two internal elements share a pad name and you need to target each).

A note on pad names

Each pad carries two names: a full name like "myelem:snk:data" that is globally unique within the graph, and a short name like "data" that only has to be unique within its own element. A composed element creates exactly one external pad per boundary entry, with its own full name ("composed:snk:data"). Short-name collisions across internal elements are resolved asymmetrically because of the direction of data flow:

  • Source side (one-to-one). An external source emits one frame per tick from one internal pad, so two internal sources sharing a short name are ambiguous and trigger a warning. Use update_pad_names to split the colliding name into distinct external names, each bound to a different internal source.
  • Sink side (fan-out). An external sink injects one frame per tick into potentially many internal sinks. Two (or more) internal sinks sharing a short name automatically fan out from one external sink — no ambiguity, no warning. update_pad_names can also go the other direction here, combining differently-named internal sinks into one external sink.

Example: two-stage amplifier with a diagnostic tap

A realistic scenario that needs both hooks: a two-stage amplifier that applies gain and then clips the signal to a limit. We want the clipped signal as the main output, exposed under the name `output` so the external API stays stable regardless of how the internals are wired. We also want to tap the pre-clip signal so a monitor can tell when clipping is active.

from dataclasses import dataclass

from sgn import Pipeline, IterSource, CollectSink
from sgn.compose import ComposedTransformElement
from sgn.transforms import CallableTransform


@dataclass(repr=False, kw_only=True)
class AmplifierWithTap(ComposedTransformElement):
    """Gain stage followed by a clip, with a tap on the pre-clip signal.

    External pads: sink ``signal`` (feeds the gain stage); sources
    ``output`` (clipped signal, renamed from the internal ``clipped`` pad)
    and ``amplified`` (pre-clip signal, exposed as a diagnostic tap).

    Raises:
        ValueError: from ``_validate`` when ``gain`` is not positive or
            ``limit`` is negative.
    """

    # Multiplier applied by the gain stage.
    gain: float = 2.0
    # Clip bound: outputs are constrained to [-limit, limit].
    limit: float = 10.0

    def _validate(self):
        if self.gain <= 0:
            raise ValueError("gain must be positive")
        # A negative limit inverts the clip bounds (-limit > limit), so
        # max(-lim, min(lim, x)) would pin every output to -limit.
        if self.limit < 0:
            raise ValueError("limit must be non-negative")

    def _build(self):
        amp = CallableTransform.from_callable(
            name=f"{self.name}_amp",
            callable=lambda frame, g=self.gain: (
                frame.data * g if frame.data is not None else None
            ),
            output_pad_name="amplified",
            sink_pad_names=["signal"],
        )
        clip = CallableTransform.from_callable(
            name=f"{self.name}_clip",
            callable=lambda frame, lim=self.limit: (
                max(-lim, min(lim, frame.data)) if frame.data is not None else None
            ),
            output_pad_name="clipped",
            sink_pad_names=["amplified"],
        )
        # Wire amp → clip. amp:src:amplified is now internally consumed
        # by clip, so by default it would NOT be exposed externally.
        self.connect(amp, clip)

        # Expose it anyway, as a diagnostic tap.
        self.expose_source_pad(f"{amp.name}:src:amplified")

        # Rename the main output from the internal "clipped" to "output"
        # so the external API doesn't leak our internal naming.
        self.update_pad_names = {
            "clipped": {"output": f"{clip.name}:src:clipped"},
        }


amplifier = AmplifierWithTap(name="gain", gain=3.0, limit=10.0)
source = IterSource(
    name="src",
    source_pad_names=["signal"],
    iters={"signal": iter([1, 2, 5, 10])},
    eos_on_empty={"signal": True},
)
# One collector for the renamed main output, one for the diagnostic tap.
main_sink = CollectSink(name="main", sink_pad_names=["output"])
tap_sink = CollectSink(name="tap", sink_pad_names=["amplified"])

pipeline = Pipeline()
pipeline.connect(source, amplifier)
# Each downstream sink selects the external pad it consumes via link_map.
pipeline.connect(amplifier, main_sink, link_map={"output": "output"})
pipeline.connect(amplifier, tap_sink, link_map={"amplified": "amplified"})
pipeline.run()

# With gain 3 and limit 10, inputs 5 and 10 saturate on the main output,
# while the tap still shows the unclipped 15 and 30.
assert list(main_sink.collects["output"]) == [3.0, 6.0, 10.0, 10.0]
assert list(tap_sink.collects["amplified"]) == [3.0, 6.0, 15.0, 30.0]

The resulting pad surface:

  • sink: signal (feeds amp; unchanged from the default)
  • sources: output (clip's output, renamed via update_pad_names) and amplified (amp's output, kept via expose_source_pad even though it's internally linked to clip)

Without expose_source_pad, the tap wouldn't be reachable. Without update_pad_names, downstream code would have to know, and depend on, the internal clip element's `clipped` pad name.

Tips

  • Always handle `None` data in callables; it appears in EOS and gap frames.
  • connect() auto-inserts elements — no need to call insert() separately.
  • Use meaningful names for composed elements to help with visualization.
  • Override _validate() in subclasses to check parameters before the internal graph is built.
  • Use self.expose_source_pad("element:src:pad") in _build() for multilink patterns where a source pad feeds both an internal sink and external consumers.