Skip to content

sgn.base

Base classes for building a graph of elements and pads.

ElementLike dataclass

Bases: UniqueID


              flowchart TD
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.base.UniqueID --> sgn.base.ElementLike
                


              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A basic container to hold source and sink pads. The assumption is that this will be a base class for code that actually does something. It should never be subclassed directly, instead subclass SourceElement, SinkElement or TransformElement.

Parameters:

Name Type Description Default
source_pads list[SourcePad]

list, optional, The list of SourcePad objects. This must be given for SourceElements or TransformElements

list()
sink_pads list[SinkPad]

list, optional, The list of SinkPad objects. This must be given for SinkElements or TransformElements

list()
Note

Subclasses can customize pad configuration by setting class-level attributes: - static_sink_pads/static_source_pads: Pads that are always present on this element type. - allow_dynamic_sink_pads/allow_dynamic_source_pads: Boolean flags controlling whether users can provide additional pad names at instantiation. If False, only static_pads are allowed (fully fixed). If True, user-provided pads are combined with static_pads.

Source code in src/sgn/base.py
@dataclass(repr=False)
class ElementLike(UniqueID):
    """A basic container to hold source and sink pads. The assumption is that this will
    be a base class for code that actually does something. It should never be subclassed
    directly, instead subclass SourceElement, SinkElement or TransformElement.

    Args:
        source_pads:
            list, optional, The list of SourcePad objects. This must be given for
            SourceElements or TransformElements
        sink_pads:
            list, optional, The list of SinkPad objects. This must be given for
            SinkElements or TransformElements

    Note:
        Subclasses can customize pad configuration by setting class-level attributes:
            - static_sink_pads/static_source_pads: Pads that are always present
              on this element type.
            - allow_dynamic_sink_pads/allow_dynamic_source_pads: Boolean flags
              controlling whether users can provide additional pad names at
              instantiation. If False, only static_*_pads are allowed (fully
              fixed). If True, user-provided pads are combined with
              static_*_pads.
    """

    # Class-level attributes for pad configuration
    static_sink_pads: ClassVar[list[str]] = []
    static_source_pads: ClassVar[list[str]] = []
    allow_dynamic_sink_pads: ClassVar[bool] = True
    allow_dynamic_source_pads: ClassVar[bool] = True

    source_pads: list[SourcePad] = field(default_factory=list)
    sink_pads: list[SinkPad] = field(default_factory=list)
    internal_pad: InternalPad = field(init=False)
    graph: dict[Pad, set[Pad]] = field(init=False)

    def __post_init__(self):
        """Establish the graph attribute as an empty dictionary."""
        super().__post_init__()
        self.graph = {}
        self.internal_pad = InternalPad(name="inl", element=self, call=self.internal)

    @property
    def source_pad_dict(self) -> dict[str, SourcePad]:
        """Return a dictionary of source pads with the pad name as the key."""
        return {p.name: p for p in self.source_pads}

    @property
    def sink_pad_dict(self) -> dict[str, SinkPad]:
        """Return a dictionary of sink pads with the pad name as the key."""
        return {p.name: p for p in self.sink_pads}

    @property
    def pad_list(self) -> Sequence[Pad]:
        """Return a list of all pads."""
        all_pads: list[Pad] = []
        all_pads.extend(self.source_pads)
        all_pads.extend(self.sink_pads)
        all_pads.append(self.internal_pad)
        return all_pads

    @property
    def logger(self) -> logging.Logger:
        """Return the logger scoped to this element, e.g. sgn.{name}."""
        return logger.getChild(self.name)

    def internal(self) -> None:
        """An optional method to call inbetween sink and source pads of an element, by
        default do nothing."""
        pass

logger property

Return the logger scoped to this element, e.g. sgn.{name}.

pad_list property

Return a list of all pads.

sink_pad_dict property

Return a dictionary of sink pads with the pad name as the key.

source_pad_dict property

Return a dictionary of source pads with the pad name as the key.

__post_init__()

Establish the graph attribute as an empty dictionary.

Source code in src/sgn/base.py
def __post_init__(self):
    """Establish the graph attribute as an empty dictionary."""
    super().__post_init__()
    self.graph = {}
    self.internal_pad = InternalPad(name="inl", element=self, call=self.internal)

internal()

An optional method to call inbetween sink and source pads of an element, by default do nothing.

Source code in src/sgn/base.py
def internal(self) -> None:
    """An optional method to call inbetween sink and source pads of an element, by
    default do nothing."""
    pass

InternalPad dataclass

Bases: UniqueID, PadLike


              flowchart TD
              sgn.base.InternalPad[InternalPad]
              sgn.base.UniqueID[UniqueID]
              sgn.base.PadLike[PadLike]

                              sgn.base.UniqueID --> sgn.base.InternalPad
                
                sgn.base.PadLike --> sgn.base.InternalPad
                


              click sgn.base.InternalPad href "" "sgn.base.InternalPad"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
              click sgn.base.PadLike href "" "sgn.base.PadLike"
            

A pad that sits inside an element and is called between sink and source pads. Internal pads are connected in the elements internal graph according to the below (data flows top to bottom)

snk1 ... snkN (if exist) \ ... // internal (always exists) // ... \ src1 ... srcM (if exist)

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad

required
name str

str, optional, The unique name for this object

''
Source code in src/sgn/base.py
@dataclass(eq=False, repr=False)
class InternalPad(UniqueID, PadLike):
    """A pad that sits inside an element and is called between sink and source pads.
    Internal pads are connected in the elements internal graph according to the below
    (data flows top to bottom)

    snk1   ...  snkN     (if exist)
      \\   ...   //
         internal      (always exists)
      //   ...   \\
     src1  ...  srcM     (if exist)

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for
            this pad
        name:
            str, optional, The unique name for this object
    """

    def __post_init__(self):
        PadLike.__post_init__(self)
        UniqueID.__post_init__(self)

    @property
    def pad_type(self) -> str:
        return "inl"

    async def __call__(self) -> None:
        """When called, an internal pad receives a Frame from the element that the pad
        belongs to."""
        self.call()

__call__() async

When called, an internal pad receives a Frame from the element that the pad belongs to.

Source code in src/sgn/base.py
async def __call__(self) -> None:
    """When called, an internal pad receives a Frame from the element that the pad
    belongs to."""
    self.call()

PadLike dataclass

Bases: ABC


              flowchart TD
              sgn.base.PadLike[PadLike]

              

              click sgn.base.PadLike href "" "sgn.base.PadLike"
            

Pads are 1:1 with graph nodes but source and sink pads must be grouped into elements in order to exchange data from sink->source. source->sink exchanges happen between elements.

A pad must belong to an element and that element must be provided as a keyword argument called "element". The element must also provide a call function that will be executed when the pad is called. The call function must take a pad as an argument, e.g., def call(pad):

Developers should not subclass or use Pad directly. Instead use SourcePad or SinkPad.

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad

required
Source code in src/sgn/base.py
@dataclass(eq=False, repr=False)
class PadLike(ABC):
    """Pads are 1:1 with graph nodes but source and sink pads must be grouped into
    elements in order to exchange data from sink->source.  source->sink exchanges happen
    between elements.

    A pad must belong to an element and that element must be provided as a
    keyword argument called "element".  The element must also provide a call
    function that will be executed when the pad is called. The call function
    must take a pad as an argument, e.g., def call(pad):

    Developers should not subclass or use Pad directly. Instead use SourcePad
    or SinkPad.

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for
            this pad
    """

    element: Element
    call: Callable
    is_linked: bool = False

    def __post_init__(self):
        self.pad_name = self.name
        self.name = f"{self.element.name}:{self.pad_type}:{self.pad_name}"

    @abstractmethod
    async def __call__(self) -> None:
        """The call method for a pad must be implemented by the element that the pad
        belongs to.

        This method will be called when the pad is called in the graph.
        """
        ...

    @property
    @abstractmethod
    def pad_type(self) -> str:
        """The pad's type string representation."""
        ...

pad_type abstractmethod property

The pad's type string representation.

__call__() abstractmethod async

The call method for a pad must be implemented by the element that the pad belongs to.

This method will be called when the pad is called in the graph.

Source code in src/sgn/base.py
@abstractmethod
async def __call__(self) -> None:
    """The call method for a pad must be implemented by the element that the pad
    belongs to.

    This method will be called when the pad is called in the graph.
    """
    ...

SinkElement dataclass

Bases: ABC, ElementLike, Generic[FrameLike]


              flowchart TD
              sgn.base.SinkElement[SinkElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.base.ElementLike --> sgn.base.SinkElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                



              click sgn.base.SinkElement href "" "sgn.base.SinkElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

Sink element represents a terminal node in a pipeline, that typically writes data to disk, etc. Sink_pads must exist but not source_pads.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object

''
sink_pad_names Sequence[str]

list, optional, Set the list of sink pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":sink:"

list()
Source code in src/sgn/base.py
@dataclass(kw_only=True)
class SinkElement(ABC, ElementLike, Generic[FrameLike]):
    """Sink element represents a terminal node in a pipeline, that typically writes data
    to disk, etc. Sink_pads must exist but not source_pads.

    Args:
        name:
            str, optional, The unique name for this object
        sink_pad_names:
            list, optional, Set the list of sink pad names. These need to be unique for
            an element but not for an application. The resulting full names will be
            made with "<self.name>:sink:<sink_pad_name>"
    """

    sink_pad_names: Sequence[str] = field(default_factory=list)

    def __init_subclass__(cls, **kwargs):
        """Validate pad configuration at class definition time."""
        super().__init_subclass__(**kwargs)

        # Check: allow_dynamic_*_pads=False requires static_*_pads to be set
        has_static_sink = _has_static_pads(cls, "static_sink_pads")
        allow_dynamic_sink = getattr(cls, "allow_dynamic_sink_pads", True)

        if not allow_dynamic_sink and not has_static_sink:
            raise TypeError(
                f"Element '{cls.__name__}' has allow_dynamic_sink_pads=False but "
                f"does not define static_sink_pads. Elements must have at least one "
                f"way to provide pads."
            )

    def __post_init__(self):
        """Establish the sink pads and graph attributes."""
        super().__post_init__()

        # Check if user provided pads when allow_dynamic is False
        if not self.allow_dynamic_sink_pads and self.sink_pad_names:
            raise ValueError(
                f"Element '{self.name}' has allow_dynamic_sink_pads=False. "
                f"Cannot specify sink_pad_names."
            )

        # Add static pads to user-provided pads
        self.sink_pad_names = list(self.sink_pad_names) + list(self.static_sink_pads)

        self.sink_pads = [
            SinkPad(
                name=pad_name,
                element=self,
                call=self.pull,
            )
            for pad_name in self.sink_pad_names
        ]
        # short names for easier recall
        self.snks = {n: p for n, p in zip(self.sink_pad_names, self.sink_pads)}
        self.rsnks = {p: n for n, p in zip(self.sink_pad_names, self.sink_pads)}
        self._at_eos = {p: False for p in self.sink_pads}
        assert self.sink_pads, "SinkElement must specify sink pads"
        assert not self.source_pads, "SinkElement must not specify any source pads"
        self.sink_pad_names_full = [p.name for p in self.sink_pads]

        # Update graph to be (all sinks -> internal)
        self.graph.update({self.internal_pad: set(self.sink_pads)})

    @property
    def at_eos(self) -> bool:
        """If frames on any sink pads are End of Stream (EOS), then mark this whole
        element as EOS.

        Returns:
            bool, True if any sink pad is at EOS, False otherwise
        """
        # TODO generalize this to be able to choose any v. all EOS propagation
        return any(self._at_eos.values())

    def mark_eos(self, pad: SinkPad) -> None:
        """Marks a sink pad as receiving the End of Stream (EOS). The EOS marker signals
        that no more frames will be received on this pad.

        Args:
            pad:
                SinkPad, The sink pad that is receiving the
        """
        self._at_eos[pad] = True

    @abstractmethod
    def pull(self, pad: SinkPad, frame: FrameLike) -> None:
        """Pull for a SinkElement represents the action of associating a frame with a
        particular input source pad a frame. This function must be provided by the
        subclass, and is where any "final" behavior must occur, e.g. writing to disk,
        etc.

        Args:
            pad:
                SinkPad, The sink pad that is receiving the frame
            frame:
                Frame, The frame that is being received
        """
        ...

at_eos property

If frames on any sink pads are End of Stream (EOS), then mark this whole element as EOS.

Returns:

Type Description
bool

bool, True if any sink pad is at EOS, False otherwise

__init_subclass__(**kwargs)

Validate pad configuration at class definition time.

Source code in src/sgn/base.py
def __init_subclass__(cls, **kwargs):
    """Validate pad configuration at class definition time."""
    super().__init_subclass__(**kwargs)

    # Check: allow_dynamic_*_pads=False requires static_*_pads to be set
    has_static_sink = _has_static_pads(cls, "static_sink_pads")
    allow_dynamic_sink = getattr(cls, "allow_dynamic_sink_pads", True)

    if not allow_dynamic_sink and not has_static_sink:
        raise TypeError(
            f"Element '{cls.__name__}' has allow_dynamic_sink_pads=False but "
            f"does not define static_sink_pads. Elements must have at least one "
            f"way to provide pads."
        )

__post_init__()

Establish the sink pads and graph attributes.

Source code in src/sgn/base.py
def __post_init__(self):
    """Establish the sink pads and graph attributes."""
    super().__post_init__()

    # Check if user provided pads when allow_dynamic is False
    if not self.allow_dynamic_sink_pads and self.sink_pad_names:
        raise ValueError(
            f"Element '{self.name}' has allow_dynamic_sink_pads=False. "
            f"Cannot specify sink_pad_names."
        )

    # Add static pads to user-provided pads
    self.sink_pad_names = list(self.sink_pad_names) + list(self.static_sink_pads)

    self.sink_pads = [
        SinkPad(
            name=pad_name,
            element=self,
            call=self.pull,
        )
        for pad_name in self.sink_pad_names
    ]
    # short names for easier recall
    self.snks = {n: p for n, p in zip(self.sink_pad_names, self.sink_pads)}
    self.rsnks = {p: n for n, p in zip(self.sink_pad_names, self.sink_pads)}
    self._at_eos = {p: False for p in self.sink_pads}
    assert self.sink_pads, "SinkElement must specify sink pads"
    assert not self.source_pads, "SinkElement must not specify any source pads"
    self.sink_pad_names_full = [p.name for p in self.sink_pads]

    # Update graph to be (all sinks -> internal)
    self.graph.update({self.internal_pad: set(self.sink_pads)})

mark_eos(pad)

Marks a sink pad as receiving the End of Stream (EOS). The EOS marker signals that no more frames will be received on this pad.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, The sink pad that is receiving the

required
Source code in src/sgn/base.py
def mark_eos(self, pad: SinkPad) -> None:
    """Marks a sink pad as receiving the End of Stream (EOS). The EOS marker signals
    that no more frames will be received on this pad.

    Args:
        pad:
            SinkPad, The sink pad that is receiving the
    """
    self._at_eos[pad] = True

pull(pad, frame) abstractmethod

Pull for a SinkElement represents the action of associating a frame with a particular input source pad a frame. This function must be provided by the subclass, and is where any "final" behavior must occur, e.g. writing to disk, etc.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, The sink pad that is receiving the frame

required
frame FrameLike

Frame, The frame that is being received

required
Source code in src/sgn/base.py
@abstractmethod
def pull(self, pad: SinkPad, frame: FrameLike) -> None:
    """Pull for a SinkElement represents the action of associating a frame with a
    particular input source pad a frame. This function must be provided by the
    subclass, and is where any "final" behavior must occur, e.g. writing to disk,
    etc.

    Args:
        pad:
            SinkPad, The sink pad that is receiving the frame
        frame:
            Frame, The frame that is being received
    """
    ...

SinkPad dataclass

Bases: UniqueID, PadLike


              flowchart TD
              sgn.base.SinkPad[SinkPad]
              sgn.base.UniqueID[UniqueID]
              sgn.base.PadLike[PadLike]

                              sgn.base.UniqueID --> sgn.base.SinkPad
                
                sgn.base.PadLike --> sgn.base.SinkPad
                


              click sgn.base.SinkPad href "" "sgn.base.SinkPad"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
              click sgn.base.PadLike href "" "sgn.base.PadLike"
            

A pad that receives a data Frame when called. When linked, it returns a dictionary suitable for building a graph in graphlib.

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad, takes two arguments, the pad and the frame

required
name str

str, optional, The unique name for this object

''
other SourcePad | None

SourcePad, optional, This holds the source pad that is linked to this sink pad, default None

None
input Frame | None

Frame, optional, This holds the Frame provided by the linked source pad. Generally it gets set when this SinkPad is called, default None

None
data_spec DataSpec | None

DataSpec, optional, This holds a specification for the data stored in frames, and is expected to be consistent across frames passing through this pad. This is set when this sink pad is first called

None
Source code in src/sgn/base.py
@dataclass(eq=False, repr=False)
class SinkPad(UniqueID, PadLike):
    """A pad that receives a data Frame when called.  When linked, it returns a
    dictionary suitable for building a graph in graphlib.

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for this
            pad, takes two arguments, the pad and the frame
        name:
            str, optional, The unique name for this object
        other:
            SourcePad, optional, This holds the source pad that is linked to this sink
            pad, default None
        input:
            Frame, optional, This holds the Frame provided by the linked source pad.
            Generally it gets set when this SinkPad is called, default None
        data_spec:
            DataSpec, optional, This holds a specification for the data stored
            in frames, and is expected to be consistent across frames passing
            through this pad. This is set when this sink pad is first called
    """

    other: SourcePad | None = None
    input: Frame | None = None
    data_spec: DataSpec | None = None

    def __post_init__(self):
        PadLike.__post_init__(self)
        UniqueID.__post_init__(self)

    @property
    def pad_type(self) -> str:
        return "snk"

    def link(self, other: SourcePad) -> dict[Pad, set[Pad]]:
        """Returns a dictionary of dependencies suitable for adding to a graphlib graph.

        Args:
            other:
                SourcePad, The source pad to link to this sink pad

        Notes:
            Many-to-one (source, sink) Not Supported:
                Only sink pads can be linked. A sink pad can be linked to only one
                source pad, but multiple sink pads may link to the same source pad.

        Returns:
            dict[SinkPad, set[SourcePad]], a dictionary of dependencies suitable for
            adding to a graphlib graph
        """
        assert isinstance(other, SourcePad), "other is not an instance of SourcePad"
        self.other = other
        self.is_linked = True
        other.is_linked = True
        return {self: {other}}

    async def __call__(self) -> None:
        """When called, a sink pad gets a Frame from the linked source pad and then
        calls the element's provided call function.

        Notes:
            Pad Call Order:
                pads must be called in the correct order such that the upstream sources
                have new information by the time call is invoked. This should be done
                within a directed acyclic graph such as those provided by the
                apps.Pipeline class.
        """
        assert isinstance(self.other, SourcePad), "Sink pad has not been linked"
        self.input = self.other.output
        assert isinstance(self.input, Frame)
        if self.data_spec is None:
            self.data_spec = self.input.spec
        if not self.data_spec == self.input.spec:
            msg = (
                f"frame received by {self.name} is inconsistent with "
                "previously received frames. previous data specification: "
                f"{self.data_spec}, current data specification: {self.input.spec}"
            )
            raise ValueError(msg)
        self.call(self, self.input)
        if self.element is not None:
            self.element.logger.debug("\t%s:%s", self, self.input)

__call__() async

When called, a sink pad gets a Frame from the linked source pad and then calls the element's provided call function.

Notes

Pad Call Order: pads must be called in the correct order such that the upstream sources have new information by the time call is invoked. This should be done within a directed acyclic graph such as those provided by the apps.Pipeline class.

Source code in src/sgn/base.py
async def __call__(self) -> None:
    """When called, a sink pad gets a Frame from the linked source pad and then
    calls the element's provided call function.

    Notes:
        Pad Call Order:
            pads must be called in the correct order such that the upstream sources
            have new information by the time call is invoked. This should be done
            within a directed acyclic graph such as those provided by the
            apps.Pipeline class.
    """
    assert isinstance(self.other, SourcePad), "Sink pad has not been linked"
    self.input = self.other.output
    assert isinstance(self.input, Frame)
    if self.data_spec is None:
        self.data_spec = self.input.spec
    if not self.data_spec == self.input.spec:
        msg = (
            f"frame received by {self.name} is inconsistent with "
            "previously received frames. previous data specification: "
            f"{self.data_spec}, current data specification: {self.input.spec}"
        )
        raise ValueError(msg)
    self.call(self, self.input)
    if self.element is not None:
        self.element.logger.debug("\t%s:%s", self, self.input)

Returns a dictionary of dependencies suitable for adding to a graphlib graph.

Parameters:

Name Type Description Default
other SourcePad

SourcePad, The source pad to link to this sink pad

required
Notes

Many-to-one (source, sink) Not Supported: Only sink pads can be linked. A sink pad can be linked to only one source pad, but multiple sink pads may link to the same source pad.

Returns:

Type Description
dict[Pad, set[Pad]]

dict[SinkPad, set[SourcePad]], a dictionary of dependencies suitable for

dict[Pad, set[Pad]]

adding to a graphlib graph

Source code in src/sgn/base.py
def link(self, other: SourcePad) -> dict[Pad, set[Pad]]:
    """Returns a dictionary of dependencies suitable for adding to a graphlib graph.

    Args:
        other:
            SourcePad, The source pad to link to this sink pad

    Notes:
        Many-to-one (source, sink) Not Supported:
            Only sink pads can be linked. A sink pad can be linked to only one
            source pad, but multiple sink pads may link to the same source pad.

    Returns:
        dict[SinkPad, set[SourcePad]], a dictionary of dependencies suitable for
        adding to a graphlib graph
    """
    assert isinstance(other, SourcePad), "other is not an instance of SourcePad"
    self.other = other
    self.is_linked = True
    other.is_linked = True
    return {self: {other}}

SourceElement dataclass

Bases: ABC, ElementLike


              flowchart TD
              sgn.base.SourceElement[SourceElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.base.ElementLike --> sgn.base.SourceElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                



              click sgn.base.SourceElement href "" "sgn.base.SourceElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

Initialize with a list of source pads. Every source pad is added to the graph with no dependencies.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object

''
source_pad_names Sequence[str]

list, optional, Set the list of source pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":src:"

list()
Source code in src/sgn/base.py
@dataclass(repr=False, kw_only=True)
class SourceElement(ABC, ElementLike):
    """Initialize with a list of source pads. Every source pad is added to the graph
    with no dependencies.

    Args:
        name:
            str, optional, The unique name for this object
        source_pad_names:
            list, optional, Set the list of source pad names. These need to be unique
            for an element but not for an application. The resulting full names will be
            made with "<self.name>:src:<source_pad_name>"
    """

    source_pad_names: Sequence[str] = field(default_factory=list)

    def __init_subclass__(cls, **kwargs):
        """Validate pad configuration at class definition time."""
        super().__init_subclass__(**kwargs)

        # Check: allow_dynamic_*_pads=False requires static_*_pads to be set
        has_static_source = _has_static_pads(cls, "static_source_pads")
        allow_dynamic_source = getattr(cls, "allow_dynamic_source_pads", True)

        if not allow_dynamic_source and not has_static_source:
            raise TypeError(
                f"Element '{cls.__name__}' has allow_dynamic_source_pads=False but "
                f"does not define static_source_pads. Elements must have at least one "
                f"way to provide pads."
            )

    def __post_init__(self):
        """Establish the source pads and graph attributes."""
        super().__post_init__()

        # Check if user provided pads when allow_dynamic is False
        if not self.allow_dynamic_source_pads and self.source_pad_names:
            raise ValueError(
                f"Element '{self.name}' has allow_dynamic_source_pads=False. "
                f"Cannot specify source_pad_names."
            )

        # Add static pads to user-provided pads
        self.source_pad_names = list(self.source_pad_names) + list(
            self.static_source_pads
        )

        self.source_pads = [
            SourcePad(
                name=pad_name,
                element=self,
                call=self.new,
            )
            for pad_name in self.source_pad_names
        ]
        # short names for easier recall
        self.srcs = {n: p for n, p in zip(self.source_pad_names, self.source_pads)}
        self.rsrcs = {p: n for n, p in zip(self.source_pad_names, self.source_pads)}
        assert self.source_pads, "SourceElement must specify source pads"
        assert not self.sink_pads, "SourceElement must not specify sink pads"
        self.graph.update({s: {self.internal_pad} for s in self.source_pads})

    @abstractmethod
    def new(self, pad: SourcePad) -> Frame:
        """New frames are created on "pad". Must be provided by subclass.

        Args:
            pad:
                SourcePad, The source pad through which the frame is passed

        Returns:
            Frame, The new frame to be passed through the source pad
        """
        ...

__init_subclass__(**kwargs)

Validate pad configuration at class definition time.

Source code in src/sgn/base.py
def __init_subclass__(cls, **kwargs):
    """Validate pad configuration at class definition time."""
    super().__init_subclass__(**kwargs)

    # Check: allow_dynamic_*_pads=False requires static_*_pads to be set
    has_static_source = _has_static_pads(cls, "static_source_pads")
    allow_dynamic_source = getattr(cls, "allow_dynamic_source_pads", True)

    if not allow_dynamic_source and not has_static_source:
        raise TypeError(
            f"Element '{cls.__name__}' has allow_dynamic_source_pads=False but "
            f"does not define static_source_pads. Elements must have at least one "
            f"way to provide pads."
        )

__post_init__()

Establish the source pads and graph attributes.

Source code in src/sgn/base.py
def __post_init__(self):
    """Establish the source pads and graph attributes."""
    super().__post_init__()

    # Check if user provided pads when allow_dynamic is False
    if not self.allow_dynamic_source_pads and self.source_pad_names:
        raise ValueError(
            f"Element '{self.name}' has allow_dynamic_source_pads=False. "
            f"Cannot specify source_pad_names."
        )

    # Add static pads to user-provided pads
    self.source_pad_names = list(self.source_pad_names) + list(
        self.static_source_pads
    )

    self.source_pads = [
        SourcePad(
            name=pad_name,
            element=self,
            call=self.new,
        )
        for pad_name in self.source_pad_names
    ]
    # short names for easier recall
    self.srcs = {n: p for n, p in zip(self.source_pad_names, self.source_pads)}
    self.rsrcs = {p: n for n, p in zip(self.source_pad_names, self.source_pads)}
    assert self.source_pads, "SourceElement must specify source pads"
    assert not self.sink_pads, "SourceElement must not specify sink pads"
    self.graph.update({s: {self.internal_pad} for s in self.source_pads})

new(pad) abstractmethod

New frames are created on "pad". Must be provided by subclass.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, The source pad through which the frame is passed

required

Returns:

Type Description
Frame

Frame, The new frame to be passed through the source pad

Source code in src/sgn/base.py
@abstractmethod
def new(self, pad: SourcePad) -> Frame:
    """New frames are created on "pad". Must be provided by subclass.

    Args:
        pad:
            SourcePad, The source pad through which the frame is passed

    Returns:
        Frame, The new frame to be passed through the source pad
    """
    ...

SourcePad dataclass

Bases: UniqueID, PadLike


              flowchart TD
              sgn.base.SourcePad[SourcePad]
              sgn.base.UniqueID[UniqueID]
              sgn.base.PadLike[PadLike]

                              sgn.base.UniqueID --> sgn.base.SourcePad
                
                sgn.base.PadLike --> sgn.base.SourcePad
                


              click sgn.base.SourcePad href "" "sgn.base.SourcePad"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
              click sgn.base.PadLike href "" "sgn.base.PadLike"
            

A pad that provides a data Frame when called.

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad

required
name str

str, optional, The unique name for this object

''
output Frame | None

Frame, optional, This attribute is set to be the output Frame when the pad is called.

None
Source code in src/sgn/base.py
@dataclass(eq=False, repr=False)
class SourcePad(UniqueID, PadLike):
    """A pad that provides a data Frame when called.

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for
            this pad
        name:
            str, optional, The unique name for this object
        output:
            Frame, optional, This attribute is set to be the output Frame when the pad
            is called.
    """

    output: Frame | None = None

    def __post_init__(self):
        PadLike.__post_init__(self)
        UniqueID.__post_init__(self)

    @property
    def pad_type(self) -> str:
        return "src"

    async def __call__(self) -> None:
        """When called, a source pad receives a Frame from the element that the pad
        belongs to."""
        self.output = self.call(pad=self)
        assert isinstance(self.output, Frame)
        if self.element is not None:
            self.element.logger.debug("\t%s : %s", self, self.output)

__call__() async

When called, a source pad receives a Frame from the element that the pad belongs to.

Source code in src/sgn/base.py
async def __call__(self) -> None:
    """When called, a source pad receives a Frame from the element that the pad
    belongs to."""
    self.output = self.call(pad=self)
    assert isinstance(self.output, Frame)
    if self.element is not None:
        self.element.logger.debug("\t%s : %s", self, self.output)

TransformElement dataclass

Bases: ABC, ElementLike, Generic[FrameLike]


              flowchart TD
              sgn.base.TransformElement[TransformElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.base.ElementLike --> sgn.base.TransformElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                



              click sgn.base.TransformElement href "" "sgn.base.TransformElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

Both "source_pads" and "sink_pads" must exist. The execution scheduling flow of the logic within a TransformElement is as follows: 1.) all sink pads, 2.) the internal pad, 3.) all source pads. The execution of all downstream logic will be blocked until logic in all upstream pads within the same TransformElement has exited.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object

''
source_pad_names Sequence[str]

list, optional, Set the list of source pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":src:"

list()
sink_pad_names Sequence[str]

list, optional, Set the list of sink pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":snk:"

list()
Source code in src/sgn/base.py
@dataclass(repr=False, kw_only=True)
class TransformElement(ABC, ElementLike, Generic[FrameLike]):
    """Both "source_pads" and "sink_pads" must exist. The execution scheduling
    flow of the logic within a TransformElement is as follows: 1.) all sink
    pads, 2.) the internal pad, 3.) all source pads. The execution of all
    downstream logic will be blocked until logic in all upstream pads within
    the same TransformElement has exited.

    Args:
        name:
            str, optional, The unique name for this object
        source_pad_names:
            list, optional, Set the list of source pad names. These need to be unique
            for an element but not for an application. The resulting full names will
            be made with "<self.name>:src:<source_pad_name>"
        sink_pad_names:
            list, optional, Set the list of sink pad names. These need to be unique
            for an element but not for an application. The resulting full names will
            be made with "<self.name>:snk:<sink_pad_name>"
    """

    source_pad_names: Sequence[str] = field(default_factory=list)
    sink_pad_names: Sequence[str] = field(default_factory=list)

    def __init_subclass__(cls, **kwargs):
        """Validate pad configuration at class definition time."""
        super().__init_subclass__(**kwargs)

        # Check: allow_dynamic_*_pads=False requires static_*_pads to be set
        has_static_sink = _has_static_pads(cls, "static_sink_pads")
        allow_dynamic_sink = getattr(cls, "allow_dynamic_sink_pads", True)
        has_static_source = _has_static_pads(cls, "static_source_pads")
        allow_dynamic_source = getattr(cls, "allow_dynamic_source_pads", True)

        if not allow_dynamic_sink and not has_static_sink:
            raise TypeError(
                f"Element '{cls.__name__}' has allow_dynamic_sink_pads=False but "
                f"does not define static_sink_pads. Elements must have at least one "
                f"way to provide pads."
            )
        if not allow_dynamic_source and not has_static_source:
            raise TypeError(
                f"Element '{cls.__name__}' has allow_dynamic_source_pads=False but "
                f"does not define static_source_pads. Elements must have at least one "
                f"way to provide pads."
            )

    def __post_init__(self):
        """Establish the source pads and sink pads and graph attributes."""
        super().__post_init__()

        # Check if user provided pads when allow_dynamic is False
        if not self.allow_dynamic_sink_pads and self.sink_pad_names:
            raise ValueError(
                f"Element '{self.name}' has allow_dynamic_sink_pads=False. "
                f"Cannot specify sink_pad_names."
            )
        if not self.allow_dynamic_source_pads and self.source_pad_names:
            raise ValueError(
                f"Element '{self.name}' has allow_dynamic_source_pads=False. "
                f"Cannot specify source_pad_names."
            )

        # Add static pads to user-provided pads
        self.sink_pad_names = list(self.sink_pad_names) + list(self.static_sink_pads)
        self.source_pad_names = list(self.source_pad_names) + list(
            self.static_source_pads
        )

        self.source_pads = [
            SourcePad(
                name=pad_name,
                element=self,
                call=self.new,
            )
            for pad_name in self.source_pad_names
        ]
        self.sink_pads = [
            SinkPad(
                name=pad_name,
                element=self,
                call=self.pull,
            )
            for pad_name in self.sink_pad_names
        ]
        # short names for easier recall
        self.srcs = {n: p for n, p in zip(self.source_pad_names, self.source_pads)}
        self.snks = {n: p for n, p in zip(self.sink_pad_names, self.sink_pads)}
        self.rsrcs = {p: n for n, p in zip(self.source_pad_names, self.source_pads)}
        self.rsnks = {p: n for n, p in zip(self.sink_pad_names, self.sink_pads)}
        assert (
            self.source_pads and self.sink_pads
        ), "TransformElement must specify both sink and source pads"

        # Make maximal bipartite graph in two pieces
        # First, (all sinks -> internal)
        self.graph.update({self.internal_pad: set(self.sink_pads)})
        # Second, (internal -> all sources)
        self.graph.update({s: {self.internal_pad} for s in self.source_pads})

    @abstractmethod
    def pull(self, pad: SinkPad, frame: FrameLike) -> None:
        """Pull data from the input pads (source pads of upstream elements), must be
        implemented by subclasses.

        Args:
            pad:
                SinkPad, The sink pad that is receiving the frame
            frame:
                Frame, The frame that is pulled from the source pad
        """
        ...

    @abstractmethod
    def new(self, pad: SourcePad) -> FrameLike:
        """New frames are created on "pad". Must be provided by subclass.

        Args:
            pad:
                SourcePad, The source pad through which the frame is passed

        Returns:
            Frame, The new frame to be passed through the source pad
        """
        ...

__init_subclass__(**kwargs)

Validate pad configuration at class definition time.

Source code in src/sgn/base.py
def __init_subclass__(cls, **kwargs):
    """Validate pad configuration at class definition time."""
    super().__init_subclass__(**kwargs)

    # Check: allow_dynamic_*_pads=False requires static_*_pads to be set
    has_static_sink = _has_static_pads(cls, "static_sink_pads")
    allow_dynamic_sink = getattr(cls, "allow_dynamic_sink_pads", True)
    has_static_source = _has_static_pads(cls, "static_source_pads")
    allow_dynamic_source = getattr(cls, "allow_dynamic_source_pads", True)

    if not allow_dynamic_sink and not has_static_sink:
        raise TypeError(
            f"Element '{cls.__name__}' has allow_dynamic_sink_pads=False but "
            f"does not define static_sink_pads. Elements must have at least one "
            f"way to provide pads."
        )
    if not allow_dynamic_source and not has_static_source:
        raise TypeError(
            f"Element '{cls.__name__}' has allow_dynamic_source_pads=False but "
            f"does not define static_source_pads. Elements must have at least one "
            f"way to provide pads."
        )

__post_init__()

Establish the source pads and sink pads and graph attributes.

Source code in src/sgn/base.py
def __post_init__(self):
    """Establish the source pads and sink pads and graph attributes."""
    super().__post_init__()

    # Check if user provided pads when allow_dynamic is False
    if not self.allow_dynamic_sink_pads and self.sink_pad_names:
        raise ValueError(
            f"Element '{self.name}' has allow_dynamic_sink_pads=False. "
            f"Cannot specify sink_pad_names."
        )
    if not self.allow_dynamic_source_pads and self.source_pad_names:
        raise ValueError(
            f"Element '{self.name}' has allow_dynamic_source_pads=False. "
            f"Cannot specify source_pad_names."
        )

    # Add static pads to user-provided pads
    self.sink_pad_names = list(self.sink_pad_names) + list(self.static_sink_pads)
    self.source_pad_names = list(self.source_pad_names) + list(
        self.static_source_pads
    )

    self.source_pads = [
        SourcePad(
            name=pad_name,
            element=self,
            call=self.new,
        )
        for pad_name in self.source_pad_names
    ]
    self.sink_pads = [
        SinkPad(
            name=pad_name,
            element=self,
            call=self.pull,
        )
        for pad_name in self.sink_pad_names
    ]
    # short names for easier recall
    self.srcs = {n: p for n, p in zip(self.source_pad_names, self.source_pads)}
    self.snks = {n: p for n, p in zip(self.sink_pad_names, self.sink_pads)}
    self.rsrcs = {p: n for n, p in zip(self.source_pad_names, self.source_pads)}
    self.rsnks = {p: n for n, p in zip(self.sink_pad_names, self.sink_pads)}
    assert (
        self.source_pads and self.sink_pads
    ), "TransformElement must specify both sink and source pads"

    # Make maximal bipartite graph in two pieces
    # First, (all sinks -> internal)
    self.graph.update({self.internal_pad: set(self.sink_pads)})
    # Second, (internal -> all sources)
    self.graph.update({s: {self.internal_pad} for s in self.source_pads})

new(pad) abstractmethod

New frames are created on "pad". Must be provided by subclass.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, The source pad through which the frame is passed

required

Returns:

Type Description
FrameLike

Frame, The new frame to be passed through the source pad

Source code in src/sgn/base.py
@abstractmethod
def new(self, pad: SourcePad) -> FrameLike:
    """New frames are created on "pad". Must be provided by subclass.

    Args:
        pad:
            SourcePad, The source pad through which the frame is passed

    Returns:
        Frame, The new frame to be passed through the source pad
    """
    ...

pull(pad, frame) abstractmethod

Pull data from the input pads (source pads of upstream elements), must be implemented by subclasses.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, The sink pad that is receiving the frame

required
frame FrameLike

Frame, The frame that is pulled from the source pad

required
Source code in src/sgn/base.py
@abstractmethod
def pull(self, pad: SinkPad, frame: FrameLike) -> None:
    """Pull data from the input pads (source pads of upstream elements), must be
    implemented by subclasses.

    Args:
        pad:
            SinkPad, The sink pad that is receiving the frame
        frame:
            Frame, The frame that is pulled from the source pad
    """
    ...

UniqueID dataclass

Generic class from which all classes that participate in an execution graph should be derived. Enforces a unique name and hashes based on that name.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object, defaults to the objects unique uuid4 hex string if not specified

''
Source code in src/sgn/base.py
@dataclass
class UniqueID:
    """Generic class from which all classes that participate in an execution graph
    should be derived. Enforces a unique name and hashes based on that name.

    Args:
        name:
            str, optional, The unique name for this object, defaults to the objects
            unique uuid4 hex string if not specified
    """

    name: str = ""
    _id: str = field(init=False)

    def __post_init__(self):
        """Handle setup of the UniqueID class, including the `._id` attribute."""
        # give every element a truly unique identifier
        self._id = uuid.uuid4().hex
        if not self.name:
            self.name = self._id

    def __hash__(self) -> int:
        """Compute the hash of the object based on the unique id.

        Notes:
            Motivation:
                we need the Base class to be hashable, so that it can be
                used as a key in a dictionary, but mutable dataclasses are not
                hashable by default, so we have to define our own hash function
                here.
            Stability:
                As currently implemented, the hash of a UniqueID object will not be
                stable across python sessions, and should therefore not be used for
                checksum purposes.

        Returns:
            int, hash of the object
        """
        return hash(self._id)

    def __eq__(self, other) -> bool:
        """Check if two objects are equal based on their unique id and types."""
        return hash(self) == hash(other)

__eq__(other)

Check if two objects are equal based on their unique id and types.

Source code in src/sgn/base.py
def __eq__(self, other) -> bool:
    """Check if two objects are equal based on their unique id and types."""
    return hash(self) == hash(other)

__hash__()

Compute the hash of the object based on the unique id.

Notes

Motivation: we need the Base class to be hashable, so that it can be used as a key in a dictionary, but mutable dataclasses are not hashable by default, so we have to define our own hash function here. Stability: As currently implemented, the hash of a UniqueID object will not be stable across python sessions, and should therefore not be used for checksum purposes.

Returns:

Type Description
int

int, hash of the object

Source code in src/sgn/base.py
def __hash__(self) -> int:
    """Compute the hash of the object based on the unique id.

    Notes:
        Motivation:
            we need the Base class to be hashable, so that it can be
            used as a key in a dictionary, but mutable dataclasses are not
            hashable by default, so we have to define our own hash function
            here.
        Stability:
            As currently implemented, the hash of a UniqueID object will not be
            stable across python sessions, and should therefore not be used for
            checksum purposes.

    Returns:
        int, hash of the object
    """
    return hash(self._id)

__post_init__()

Handle setup of the UniqueID class, including the ._id attribute.

Source code in src/sgn/base.py
def __post_init__(self):
    """Handle setup of the UniqueID class, including the `._id` attribute."""
    # give every element a truly unique identifier
    self._id = uuid.uuid4().hex
    if not self.name:
        self.name = self._id