Element Composition¶
SGN lets you combine multiple elements into a single reusable unit. The resulting composed element behaves like any other element.
Define a dataclass that inherits from ComposedSourceElement,
ComposedTransformElement, or ComposedSinkElement and override _build()
to wire internal elements using self.insert() and self.connect(). The
resulting object IS an element — pass it directly to pipeline.connect().
Composition Types¶
| Use case | Base class |
|---|---|
| Source(s) + optional Transform(s) | ComposedSourceElement |
| Transform(s) only | ComposedTransformElement |
| Optional Transform(s) + Sink(s) | ComposedSinkElement |
Composed Transform¶
from dataclasses import dataclass
from sgn import Pipeline, IterSource, CollectSink
from sgn.compose import ComposedTransformElement
from sgn.transforms import CallableTransform
@dataclass(repr=False, kw_only=True)
class ScaleAndOffset(ComposedTransformElement):
    """Composed transform that computes ``data * factor + offset``.

    Internally two CallableTransform stages are chained: the first
    multiplies the frame data by ``factor`` and the second adds ``offset``.
    Each stage's callable returns None when the frame carries no data.
    """

    factor: float = 1.0
    offset: float = 0.0

    def _validate(self):
        """Reject a zero ``factor``."""
        if self.factor == 0:
            raise ValueError("factor must be non-zero")

    def _build(self):
        """Create the multiply and add stages and link them in that order."""
        # Parameters are bound as lambda defaults so each callable captures
        # the value at build time.
        multiply_stage = CallableTransform.from_callable(
            name=f"{self.name}_scale",
            callable=lambda frame, factor=self.factor: (
                None if frame.data is None else frame.data * factor
            ),
            output_pad_name="data",
            sink_pad_names=["data"],
        )
        offset_stage = CallableTransform.from_callable(
            name=f"{self.name}_add",
            callable=lambda frame, offset=self.offset: (
                None if frame.data is None else frame.data + offset
            ),
            output_pad_name="data",
            sink_pad_names=["data"],
        )
        self.connect(multiply_stage, offset_stage)
# Celsius -> Fahrenheit is a scale by 9/5 followed by an offset of 32.
to_fahrenheit = ScaleAndOffset(name="to_f", factor=9 / 5, offset=32)

celsius_values = [0, 20, 100]
source = IterSource(
    name="celsius",
    source_pad_names=["data"],
    iters={"data": iter(celsius_values)},
    eos_on_empty={"data": True},
)
sink = CollectSink(name="sink", sink_pad_names=["data"])

# The composed element is connected like any other element:
# source -> to_fahrenheit -> sink.
pipeline = Pipeline()
pipeline.connect(source, to_fahrenheit)
pipeline.connect(to_fahrenheit, sink)
pipeline.run()

expected_fahrenheit = [32.0, 68.0, 212.0]
assert list(sink.collects["data"]) == expected_fahrenheit
Composed Source¶
from dataclasses import dataclass
from sgn import Pipeline, IterSource, CollectSink
from sgn.compose import ComposedSourceElement
from sgn.transforms import CallableTransform
@dataclass(repr=False, kw_only=True)
class ScaledSource(ComposedSourceElement):
    """Composed source that emits ``values`` multiplied by ``factor``.

    An internal IterSource iterates over ``values`` and feeds a
    CallableTransform that applies the scaling. The transform's callable
    returns None when the frame carries no data.
    """

    values: tuple = (1, 2, 3)
    factor: float = 1.0

    def _build(self):
        """Create the raw source and the scaling stage, then link them."""
        value_source = IterSource(
            name=f"{self.name}_raw",
            source_pad_names=["data"],
            iters={"data": iter(self.values)},
            eos_on_empty={"data": True},
        )
        # ``factor`` is bound as a lambda default so the callable captures
        # the value at build time.
        scaler = CallableTransform.from_callable(
            name=f"{self.name}_scale",
            callable=lambda frame, factor=self.factor: (
                None if frame.data is None else frame.data * factor
            ),
            output_pad_name="data",
            sink_pad_names=["data"],
        )
        self.connect(value_source, scaler)
# The composed source plugs straight into a sink; no separate transform
# element appears at the pipeline level.
source = ScaledSource(name="src", values=(10, 20, 30), factor=0.5)
sink = CollectSink(name="sink", sink_pad_names=["data"])

pipeline = Pipeline()
pipeline.connect(source, sink)
pipeline.run()

expected_scaled = [5.0, 10.0, 15.0]
assert list(sink.collects["data"]) == expected_scaled
Composed Sink¶
from dataclasses import dataclass
from sgn import Pipeline, IterSource, CollectSink
from sgn.compose import ComposedSinkElement
from sgn.transforms import CallableTransform
@dataclass(repr=False, kw_only=True)
class FormattedCollector(ComposedSinkElement):
    """Formats data as strings before collecting.

    An internal CallableTransform renders each frame's data as
    ``"<prefix>=<data>"`` and feeds an internal CollectSink. The formatting
    callable returns None when the frame carries no data.

    ``repr=False`` matches the other composed-element examples on this page,
    so the dataclass decorator does not generate a ``__repr__`` for this
    subclass.
    """

    prefix: str = "value"

    def _build(self):
        """Create the formatter and the collector, then link them."""
        # ``prefix`` is bound as a lambda default so the callable captures
        # the value at build time.
        fmt = CallableTransform.from_callable(
            name=f"{self.name}_fmt",
            callable=lambda frame, p=self.prefix: (
                f"{p}={frame.data}" if frame.data is not None else None
            ),
            output_pad_name="data",
            sink_pad_names=["data"],
        )
        # Keep a reference to the internal sink so callers can read what it
        # collected after the pipeline has run.
        self._collector = CollectSink(
            name=f"{self.name}_collect", sink_pad_names=["data"]
        )
        self.connect(fmt, self._collector)
input_values = [1, 2, 3]
source = IterSource(
    name="src",
    source_pad_names=["data"],
    iters={"data": iter(input_values)},
    eos_on_empty={"data": True},
)
sink = FormattedCollector(name="sink", prefix="result")

pipeline = Pipeline()
pipeline.connect(source, sink)
pipeline.run()

# The collected values live on the internal CollectSink that the composed
# element stored as ``_collector`` during _build().
expected_strings = ["result=1", "result=2", "result=3"]
assert list(sink._collector.collects["data"]) == expected_strings
Merging Multiple Sources¶
Non-linear compositions are built the same way — call self.connect() once
per edge in the internal graph.
from dataclasses import dataclass
from sgn import Pipeline, IterSource, CollectSink
from sgn.compose import ComposedSourceElement
from sgn.transforms import CallableTransform
@dataclass(repr=False, kw_only=True)
class WeatherSource(ComposedSourceElement):
    """Composed source exposing a temperature and a humidity stream.

    Two internal IterSource elements (pads ``temp`` and ``humid``) feed one
    CallableTransform that converts temperatures with ``* 9/5 + 32`` and
    renders humidities as ``"<value>%"``. Both callables return None when
    the frame carries no data.
    """

    temps: tuple = (20, 25, 30)
    humids: tuple = (50, 60, 70)

    def _build(self):
        """Create both sources and the converter; link each source to it."""
        temperature_source = IterSource(
            name=f"{self.name}_temps",
            source_pad_names=["temp"],
            iters={"temp": iter(self.temps)},
            eos_on_empty={"temp": True},
        )
        humidity_source = IterSource(
            name=f"{self.name}_humids",
            source_pad_names=["humid"],
            iters={"humid": iter(self.humids)},
            eos_on_empty={"humid": True},
        )

        # One entry per output pad: (input pad names, callable, output pad).
        conversions = [
            (
                ("temp",),
                lambda f: None if f.data is None else f.data * 9 / 5 + 32,
                "temp",
            ),
            (
                ("humid",),
                lambda f: None if f.data is None else f"{f.data}%",
                "humid",
            ),
        ]
        converter = CallableTransform.from_combinations(
            name=f"{self.name}_convert",
            combos=conversions,
        )

        # Non-linear graph: one connect() call per edge.
        self.connect(temperature_source, converter)
        self.connect(humidity_source, converter)
# The composed source exposes both pads, so the sink declares both.
source = WeatherSource(name="weather")
sink = CollectSink(name="sink", sink_pad_names=["temp", "humid"])

pipeline = Pipeline()
pipeline.connect(source, sink)
pipeline.run()

expected_temps = [68.0, 77.0, 86.0]
expected_humids = ["50%", "60%", "70%"]
assert list(sink.collects["temp"]) == expected_temps
assert list(sink.collects["humid"]) == expected_humids
Advanced: intermediate taps and boundary-pad control¶
By default, the external pad surface of a composed element is determined by its graph: internal source pads that aren't consumed become the outputs, internal sink pads that aren't fed become the inputs, and the pad names carry through unchanged. Two hooks let you shape that surface beyond the default:
- self.expose_source_pad("element:src:pad") — during _build(), mark an internal source pad for external exposure even when it's also wired to an internal sink. This is the multilink / tap pattern.
- self.update_pad_names = {old: {new_1: full_name_1, ...}} — set during _build() (or by the constructor). Each key must name an existing boundary pad; its value is a dict of replacement pad names mapped to full internal pad names. Use it to rename for a clean external API, or to split a single boundary pad into several (e.g. when two internal elements share a pad name and you need to target each).
A note on pad names¶
Each pad carries two names: a full name like "myelem:snk:data" that is
globally unique within the graph, and a short name like "data" that only
has to be unique within its own element. A composed element creates exactly
one external pad per boundary entry, with its own full name
("composed:snk:data"). Short-name collisions across internal elements are
resolved asymmetrically because of the direction of data flow:
- Source side (one-to-one). An external source emits one frame per tick
from one internal pad, so two internal sources sharing a short name is
ambiguous and produces a warning. Use update_pad_names to split the
colliding name into distinct external names, each bound to a different
internal source.
- Sink side (fan-out). An external sink injects one frame per tick into
potentially many internal sinks. Two (or more) internal sinks sharing a
short name automatically fan out from one external sink — no ambiguity,
no warning. update_pad_names can also go the other direction here,
combining differently-named internal sinks into one external sink.
Example: two-stage amplifier with a diagnostic tap¶
A realistic scenario that needs both hooks: a two-stage amplifier that
applies gain and then clips the signal to a limit. We want the clipped
signal as the main output — named output for a stable external API
regardless of how the internals are wired — and we also want to tap the
pre-clip signal so a monitor can tell when clipping is active.
from dataclasses import dataclass
from sgn import Pipeline, IterSource, CollectSink
from sgn.compose import ComposedTransformElement
from sgn.transforms import CallableTransform
@dataclass(repr=False, kw_only=True)
class AmplifierWithTap(ComposedTransformElement):
    """Two-stage amplifier (gain, then clip) with a pre-clip tap.

    External surface built here: the clip stage's output is renamed to
    ``output``, and the gain stage's ``amplified`` pad is additionally
    exposed even though the clip stage consumes it internally.
    """

    gain: float = 2.0
    limit: float = 10.0

    def _validate(self):
        """Reject a zero or negative ``gain``."""
        if self.gain <= 0:
            raise ValueError("gain must be positive")

    def _build(self):
        """Wire amp -> clip, expose the tap, and rename the main output."""
        amp = CallableTransform.from_callable(
            name=f"{self.name}_amp",
            callable=lambda frame, g=self.gain: (
                None if frame.data is None else frame.data * g
            ),
            output_pad_name="amplified",
            sink_pad_names=["signal"],
        )
        clip = CallableTransform.from_callable(
            name=f"{self.name}_clip",
            callable=lambda frame, lim=self.limit: (
                None if frame.data is None else max(-lim, min(lim, frame.data))
            ),
            output_pad_name="clipped",
            sink_pad_names=["amplified"],
        )

        # Full internal pad names of the two source pads we care about.
        tap_pad = f"{amp.name}:src:amplified"
        clipped_pad = f"{clip.name}:src:clipped"

        # Once amp feeds clip, amp's "amplified" pad is consumed internally
        # and would not be part of the external surface by default.
        self.connect(amp, clip)

        # Keep it reachable from outside as a diagnostic tap.
        self.expose_source_pad(tap_pad)

        # Publish the main output as "output" rather than the internal
        # "clipped" name, so the external API hides the internal naming.
        self.update_pad_names = {"clipped": {"output": clipped_pad}}
amp = AmplifierWithTap(name="gain", gain=3.0, limit=10.0)

input_signal = [1, 2, 5, 10]
source = IterSource(
    name="src",
    source_pad_names=["signal"],
    iters={"signal": iter(input_signal)},
    eos_on_empty={"signal": True},
)
# One sink per external source pad of the composed element.
main_sink = CollectSink(name="main", sink_pad_names=["output"])
tap_sink = CollectSink(name="tap", sink_pad_names=["amplified"])

pipeline = Pipeline()
pipeline.connect(source, amp)
pipeline.connect(amp, main_sink, link_map={"output": "output"})
pipeline.connect(amp, tap_sink, link_map={"amplified": "amplified"})
pipeline.run()

# Clipping hides the real gain: the tap exposes it for monitoring.
expected_clipped = [3.0, 6.0, 10.0, 10.0]
expected_amplified = [3.0, 6.0, 15.0, 30.0]
assert list(main_sink.collects["output"]) == expected_clipped
assert list(tap_sink.collects["amplified"]) == expected_amplified
The resulting pad surface:
- sink: signal (feeds amp; unchanged from the default)
- sources: output (clip's output, renamed via update_pad_names) and amplified (amp's output, kept via expose_source_pad even though it's internally linked to clip)
Without expose_source_pad, the tap wouldn't be reachable; without
update_pad_names, downstream code would have to know and depend on the
internal element's clipped pad name.
Tips¶
- Always handle None data in callables (appears in EOS and gap frames).
- connect() auto-inserts elements — no need to call insert() separately.
- Use meaningful names for composed elements to help with visualization.
- Override _validate() in subclasses to check parameters before the internal graph is built.
- Use self.expose_source_pad("element:src:pad") in _build() for multilink patterns where a source pad feeds both an internal sink and external consumers.