Element Composition¶
The Compose class lets you combine multiple elements into a single reusable
unit. The resulting composed element behaves like any other element.
Composition Types¶
| Compose with | Finalize with | Result |
|---|---|---|
| Source + Transform(s) | `.as_source()` | `ComposedSourceElement` |
| Transform(s) | `.as_transform()` | `ComposedTransformElement` |
| Transform(s) + Sink | `.as_sink()` | `ComposedSinkElement` |
Basic Composition¶
from sgn import Pipeline, Compose, IterSource, CollectSink
from sgn.transforms import CallableTransform
def _times_two(frame):
    """Double the frame payload, leaving a None payload as None."""
    if frame.data is None:
        return None
    return frame.data * 2


def _plus_ten(frame):
    """Add ten to the frame payload, leaving a None payload as None."""
    if frame.data is None:
        return None
    return frame.data + 10


# Stage 1: iterator-backed source feeding the values 1..5 to the "data" pad.
numbers = IterSource(
    name="src",
    source_pad_names=["data"],
    iters={"data": iter([1, 2, 3, 4, 5])},
    eos_on_empty={"data": True},
)

# Stages 2 and 3: two single-pad transforms, each wrapping a plain function.
doubler = CallableTransform.from_callable(
    name="double",
    callable=_times_two,
    output_pad_name="data",
    sink_pad_names=["data"],
)
ten_adder = CallableTransform.from_callable(
    name="add_ten",
    callable=_plus_ten,
    output_pad_name="data",
    sink_pad_names=["data"],
)

# Chain src -> double -> add_ten, then finalize the chain as one source element.
chain = Compose()
chain = chain.connect(numbers, doubler)
chain = chain.connect(doubler, ten_adder)
processed = chain.as_source(name="processed_source")

# The composed element is wired into the pipeline like any other source.
collected = CollectSink(name="sink", sink_pad_names=["data"])
graph = Pipeline()
graph.connect(processed, collected)
graph.run()

# Each input x arrives at the sink as 2 * x + 10.
expected = [12, 14, 16, 18, 20]
assert list(collected.collects["data"]) == expected
All Three Types Together¶
from sgn import Pipeline, Compose, IterSource, CollectSink
from sgn.transforms import CallableTransform
def _double(frame):
    """Return twice the payload, or None when the payload is None."""
    if frame.data is None:
        return None
    return frame.data * 2


def _add_hundred(frame):
    """Return the payload plus 100, or None when the payload is None."""
    if frame.data is None:
        return None
    return frame.data + 100


def _negate(frame):
    """Return the negated payload, or None when the payload is None."""
    if frame.data is None:
        return None
    return -frame.data


def _format_result(frame):
    """Render the payload as "result=<value>", or None when it is None."""
    if frame.data is None:
        return None
    return f"result={frame.data}"


# ComposedSourceElement: a raw iterator source followed by a doubling stage.
raw = IterSource(
    name="raw",
    source_pad_names=["data"],
    iters={"data": iter([1, 2, 3, 4, 5])},
)
doubling_stage = CallableTransform.from_callable(
    name="double",
    callable=_double,
    output_pad_name="data",
    sink_pad_names=["data"],
)
source_part = Compose().connect(raw, doubling_stage).as_source("doubled_source")

# ComposedTransformElement: add 100, then flip the sign.
plus_hundred = CallableTransform.from_callable(
    name="add_100",
    callable=_add_hundred,
    output_pad_name="data",
    sink_pad_names=["data"],
)
sign_flip = CallableTransform.from_callable(
    name="negate",
    callable=_negate,
    output_pad_name="data",
    sink_pad_names=["data"],
)
transform_part = (
    Compose().connect(plus_hundred, sign_flip).as_transform("add_and_negate")
)

# ComposedSinkElement: format each value as text before it is collected.
text_stage = CallableTransform.from_callable(
    name="format",
    callable=_format_result,
    output_pad_name="data",
    sink_pad_names=["data"],
)
results = CollectSink(name="collector", sink_pad_names=["data"])
sink_part = Compose().connect(text_stage, results).as_sink("formatting_sink")

# Wire the three composed elements together: source -> transform -> sink.
graph = Pipeline()
graph.connect(source_part, transform_part)
graph.connect(transform_part, sink_part)
graph.run()

# Each input x ends up as the string "result=" + str(-(2 * x + 100)).
expected = [
    "result=-102",
    "result=-104",
    "result=-106",
    "result=-108",
    "result=-110",
]
assert list(results.collects["data"]) == expected
Merging Multiple Sources¶
from sgn import Pipeline, Compose, IterSource, CollectSink
from sgn.transforms import CallableTransform
def _scale_temp(frame):
    """Apply x * 9/5 + 32 to the payload; a None payload stays None."""
    if frame.data is None:
        return None
    return frame.data * 9/5 + 32


def _as_percent(frame):
    """Render the payload with a trailing "%"; a None payload stays None."""
    if frame.data is None:
        return None
    return f"{frame.data}%"


# Two independent sources, each exposing a single, differently named pad.
temperature_feed = IterSource(
    name="temps",
    source_pad_names=["temp"],
    iters={"temp": iter([20, 25, 30])},
    eos_on_empty={"temp": True},
)
humidity_feed = IterSource(
    name="humid",
    source_pad_names=["humid"],
    iters={"humid": iter([50, 60, 70])},
    eos_on_empty={"humid": True},
)

# One transform handling both pads; each combo is (inputs, callable, output).
pad_rules = [
    (("temp",), _scale_temp, "temp"),
    (("humid",), _as_percent, "humid"),
]
unit_converter = CallableTransform.from_combinations(
    name="convert",
    combos=pad_rules,
)

# Both sources feed the same transform; the result is exposed as one source.
merged = Compose()
merged = merged.connect(temperature_feed, unit_converter)
merged = merged.connect(humidity_feed, unit_converter)
weather = merged.as_source("weather_data")

collected = CollectSink(name="sink", sink_pad_names=["temp", "humid"])
graph = Pipeline()
graph.connect(weather, collected)
graph.run()

assert list(collected.collects["temp"]) == [68.0, 77.0, 86.0]
assert list(collected.collects["humid"]) == ["50%", "60%", "70%"]
Factory Pattern¶
Create parameterized factories for reusable compositions:
from sgn import Pipeline, Compose, IterSource, CollectSink
from sgn.transforms import CallableTransform
def create_scale_and_offset(name: str, scale: float, offset: float):
    """Build a composed transform that computes ``(x * scale) + offset``.

    Two single-pad ``CallableTransform`` stages (multiply, then add) are
    connected and finalized as one transform element.

    Args:
        name: Name of the composed element. The inner stages are named
            ``f"{name}_mult"`` and ``f"{name}_add"``.
        scale: Factor each frame's data is multiplied by first.
        offset: Value added to the scaled data.

    Returns:
        The element produced by ``Compose().connect(...).as_transform(name)``.
    """

    def _scaled(frame):
        # None payloads are passed through unchanged rather than multiplied.
        if frame.data is None:
            return None
        return frame.data * scale

    def _shifted(frame):
        # None payloads are passed through unchanged rather than offset.
        if frame.data is None:
            return None
        return frame.data + offset

    scale_stage = CallableTransform.from_callable(
        name=f"{name}_mult",
        callable=_scaled,
        output_pad_name="data",
        sink_pad_names=["data"],
    )
    offset_stage = CallableTransform.from_callable(
        name=f"{name}_add",
        callable=_shifted,
        output_pad_name="data",
        sink_pad_names=["data"],
    )

    builder = Compose().connect(scale_stage, offset_stage)
    return builder.as_transform(name)
# Instantiate the factory with scale 9/5 and offset 32: x * 9/5 + 32.
c_to_f = create_scale_and_offset("to_f", scale=9/5, offset=32)

readings = IterSource(
    name="celsius",
    source_pad_names=["data"],
    iters={"data": iter([0, 20, 100])},
)
results = CollectSink(name="sink", sink_pad_names=["data"])

# The composed transform sits between source and sink like a regular element.
flow = Pipeline()
flow.connect(readings, c_to_f)
flow.connect(c_to_f, results)
flow.run()

expected = [32.0, 68.0, 212.0]
assert list(results.collects["data"]) == expected
Tips¶
- Always handle `None` data in callables (appears in EOS and gap frames).
- `connect()` auto-inserts elements - no need to call `insert()` separately.
- Use meaningful names for composed elements to help with visualization.