Skip to content

Element Composition

The Compose class lets you combine multiple elements into a single reusable unit. The resulting composed element behaves like any other element.

Composition Types

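| Compose with          | Finalize with     | Result                     |
|-----------------------|-------------------|----------------------------|
| Source + Transform(s) | `.as_source()`    | `ComposedSourceElement`    |
| Transform(s)          | `.as_transform()` | `ComposedTransformElement` |
| Transform(s) + Sink   | `.as_sink()`      | `ComposedSinkElement`      |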

Basic Composition

from sgn import Pipeline, Compose, IterSource, CollectSink
from sgn.transforms import CallableTransform

# Source element fed by an iterator over 1..5 on its single "data" pad.
# NOTE(review): eos_on_empty presumably makes the source signal end-of-stream
# once the iterator is exhausted — confirm against the IterSource docs.
source = IterSource(
    name="src",
    source_pad_names=["data"],
    iters={"data": iter([1, 2, 3, 4, 5])},
    eos_on_empty={"data": True},
)

# First stage: multiply the frame payload by 2. When frame.data is None the
# callable returns None instead of attempting arithmetic.
double = CallableTransform.from_callable(
    name="double",
    callable=lambda frame: frame.data * 2 if frame.data is not None else None,
    output_pad_name="data",
    sink_pad_names=["data"],
)

# Second stage: add 10 to the frame payload, with the same None guard.
add_ten = CallableTransform.from_callable(
    name="add_ten",
    callable=lambda frame: frame.data + 10 if frame.data is not None else None,
    output_pad_name="data",
    sink_pad_names=["data"],
)

# Compose: source -> double -> add_ten as a single source element.
# The chain is finalized with as_source() because it begins with a source.
composed_source = (
    Compose()
    .connect(source, double)
    .connect(double, add_ten)
    .as_source(name="processed_source")
)

# The composed element is connected to a sink exactly like a plain source.
sink = CollectSink(name="sink", sink_pad_names=["data"])
pipeline = Pipeline()
pipeline.connect(composed_source, sink)
pipeline.run()

# Each input x comes out as (x * 2) + 10.
assert list(sink.collects["data"]) == [12, 14, 16, 18, 20]

All Three Types Together

from sgn import Pipeline, Compose, IterSource, CollectSink
from sgn.transforms import CallableTransform

# ComposedSourceElement: source that doubles values
# NOTE(review): eos_on_empty is not passed to this IterSource, so the
# library default applies — confirm that default ends the stream when the
# iterator is exhausted.
raw_source = IterSource(
    name="raw", source_pad_names=["data"],
    iters={"data": iter([1, 2, 3, 4, 5])},
)
# Doubles the payload; returns None when the frame carries no data.
double_t = CallableTransform.from_callable(
    name="double", callable=lambda f: f.data * 2 if f.data is not None else None,
    output_pad_name="data", sink_pad_names=["data"],
)
# Source + transform, finalized with as_source().
composed_source = Compose().connect(raw_source, double_t).as_source("doubled_source")

# ComposedTransformElement: adds 100 then negates
add_100 = CallableTransform.from_callable(
    name="add_100", callable=lambda f: f.data + 100 if f.data is not None else None,
    output_pad_name="data", sink_pad_names=["data"],
)
negate = CallableTransform.from_callable(
    name="negate", callable=lambda f: -f.data if f.data is not None else None,
    output_pad_name="data", sink_pad_names=["data"],
)
# Transform + transform, finalized with as_transform().
composed_transform = Compose().connect(add_100, negate).as_transform("add_and_negate")

# ComposedSinkElement: formats before collecting
formatter = CallableTransform.from_callable(
    name="format",
    callable=lambda f: f"result={f.data}" if f.data is not None else None,
    output_pad_name="data", sink_pad_names=["data"],
)
# A reference to the inner CollectSink is kept so that its collected values
# can be read back in the assertion after the pipeline has run.
collector = CollectSink(name="collector", sink_pad_names=["data"])
# Transform + sink, finalized with as_sink().
composed_sink = Compose().connect(formatter, collector).as_sink("formatting_sink")

# Wire it all up: composed source -> composed transform -> composed sink
pipeline = Pipeline()
pipeline.connect(composed_source, composed_transform)
pipeline.connect(composed_transform, composed_sink)
pipeline.run()

# Each input x becomes the string "result=" followed by -((x * 2) + 100).
assert list(collector.collects["data"]) == [
    "result=-102", "result=-104", "result=-106", "result=-108", "result=-110"
]

Merging Multiple Sources

from sgn import Pipeline, Compose, IterSource, CollectSink
from sgn.transforms import CallableTransform

# Two independent sources, each with a single, differently named pad.
temps = IterSource(name="temps", source_pad_names=["temp"],
                   iters={"temp": iter([20, 25, 30])}, eos_on_empty={"temp": True})
humids = IterSource(name="humid", source_pad_names=["humid"],
                    iters={"humid": iter([50, 60, 70])}, eos_on_empty={"humid": True})

# One transform that handles both pads.
# NOTE(review): each combo looks like (input pad names, callable, output pad
# name) — confirm the tuple layout against the from_combinations docs.
converter = CallableTransform.from_combinations(
    name="convert",
    combos=[
        # "temp" pad: x * 9/5 + 32 (the Celsius-to-Fahrenheit formula); None stays None.
        (("temp",), lambda f: f.data * 9/5 + 32 if f.data is not None else None, "temp"),
        # "humid" pad: render the value as a percentage string; None stays None.
        (("humid",), lambda f: f"{f.data}%" if f.data is not None else None, "humid"),
    ],
)

# Both sources are connected to the same transform, and the composition is
# finalized as one source element named "weather_data".
weather_source = (
    Compose()
    .connect(temps, converter)
    .connect(humids, converter)
    .as_source("weather_data")
)

# The sink declares one pad per output of the composed source.
sink = CollectSink(name="sink", sink_pad_names=["temp", "humid"])
pipeline = Pipeline()
pipeline.connect(weather_source, sink)
pipeline.run()

# 20, 25, 30 -> 68.0, 77.0, 86.0 on "temp"; humidity values gain a "%" suffix.
assert list(sink.collects["temp"]) == [68.0, 77.0, 86.0]
assert list(sink.collects["humid"]) == ["50%", "60%", "70%"]

Factory Pattern

Create parameterized factories for reusable compositions:

from sgn import Pipeline, Compose, IterSource, CollectSink
from sgn.transforms import CallableTransform


def create_scale_and_offset(name: str, scale: float, offset: float):
    """Build a composed transform that computes ``(x * scale) + offset``.

    Two single-pad ``CallableTransform`` stages, named ``{name}_mult`` and
    ``{name}_add``, are chained with ``Compose`` and finalized as a single
    transform called ``name``. Both stages return ``None`` for a frame whose
    ``data`` is ``None``.

    Args:
        name: Name of the composed transform and prefix of the stage names.
        scale: Factor the first stage multiplies the frame data by.
        offset: Value the second stage adds to the frame data.

    Returns:
        The element produced by ``Compose().connect(...).as_transform(name)``.
    """

    def _scaled(frame):
        """Return the frame data times ``scale``, or None for empty frames."""
        if frame.data is None:
            return None
        return frame.data * scale

    def _shifted(frame):
        """Return the frame data plus ``offset``, or None for empty frames."""
        if frame.data is None:
            return None
        return frame.data + offset

    scale_stage = CallableTransform.from_callable(
        name=f"{name}_mult",
        callable=_scaled,
        output_pad_name="data",
        sink_pad_names=["data"],
    )
    offset_stage = CallableTransform.from_callable(
        name=f"{name}_add",
        callable=_shifted,
        output_pad_name="data",
        sink_pad_names=["data"],
    )

    # Multiply first, then add: the connect order fixes the order of operations.
    chain = Compose().connect(scale_stage, offset_stage)
    return chain.as_transform(name)


# Instantiate the factory as a Celsius-to-Fahrenheit converter: x * 9/5 + 32.
to_fahrenheit = create_scale_and_offset("to_f", scale=9/5, offset=32)

# NOTE(review): eos_on_empty is not passed to this IterSource, so the
# library default applies — confirm that default ends the stream when the
# iterator is exhausted.
source = IterSource(name="celsius", source_pad_names=["data"],
                    iters={"data": iter([0, 20, 100])})
sink = CollectSink(name="sink", sink_pad_names=["data"])

# The factory-built composed transform is wired in like any other transform.
pipeline = Pipeline()
pipeline.connect(source, to_fahrenheit)
pipeline.connect(to_fahrenheit, sink)
pipeline.run()

# 0, 20 and 100 map to 32.0, 68.0 and 212.0; results are floats because
# scale (9/5) is a float.
assert list(sink.collects["data"]) == [32.0, 68.0, 212.0]

Tips

  • Always handle None data in callables: frame data is None in EOS and gap frames.
  • connect() auto-inserts elements, so there is no need to call insert() separately.
  • Use meaningful names for composed elements to help with visualization.