Skip to content

sgn.base

Base classes for building a graph of elements and pads.

ElementLike dataclass

Bases: UniqueID


              flowchart TD
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.base.UniqueID --> sgn.base.ElementLike
                


              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A basic container to hold source and sink pads. The assumption is that this will be a base class for code that actually does something. It should never be subclassed directly, instead subclass SourceElement, SinkElement or TransformElement.

Parameters:

Name Type Description Default
source_pads list[SourcePad]

list, optional, The list of SourcePad objects. This must be given for SourceElements or TransformElements

list()
sink_pads list[SinkPad]

list, optional, The list of SinkPad objects. This must be given for SinkElements or TransformElements

list()
Note

Subclasses can customize pad configuration by setting class-level attributes: - static_sink_pads/static_source_pads: Pads that are always present on this element type. - allow_dynamic_sink_pads/allow_dynamic_source_pads: Boolean flags controlling whether users can provide additional pad names at instantiation. If False, only static_pads are allowed (fully fixed). If True, user-provided pads are combined with static_pads.

Source code in src/sgn/base.py
@dataclass(repr=False)
class ElementLike(UniqueID):
    """A basic container to hold source and sink pads. The assumption is that this will
    be a base class for code that actually does something. It should never be subclassed
    directly, instead subclass SourceElement, SinkElement or TransformElement.

    Args:
        source_pads:
            list, optional, The list of SourcePad objects. This must be given for
            SourceElements or TransformElements
        sink_pads:
            list, optional, The list of SinkPad objects. This must be given for
            SinkElements or TransformElements

    Note:
        Subclasses can customize pad configuration by setting class-level attributes:
            - static_sink_pads/static_source_pads: Pads that are always present
              on this element type.
            - allow_dynamic_sink_pads/allow_dynamic_source_pads: Boolean flags
              controlling whether users can provide additional pad names at
              instantiation. If False, only static_*_pads are allowed (fully
              fixed). If True, user-provided pads are combined with
              static_*_pads.
    """

    # Class-level attributes for pad configuration
    static_sink_pads: ClassVar[list[str]] = []
    static_source_pads: ClassVar[list[str]] = []
    allow_dynamic_sink_pads: ClassVar[bool] = True
    allow_dynamic_source_pads: ClassVar[bool] = True

    # Opt-in flag asserting this element's pad callbacks (``new``, ``pull``,
    # ``internal``) are safe under *parallel* execution. When the pipeline is
    # run with a thread pool, opted-in callbacks may overlap with those of
    # other opted-in elements, and an element whose multiple source pads are
    # ready in the same batch will have its callbacks invoked across those
    # pads on the pool. Because dispatch is thread-based, the GIL still
    # serializes pure-Python callback bodies; real concurrency is only gained
    # for work that releases the GIL (NumPy, blocking I/O, C extensions).
    # Setting this to True is therefore a promise that the callbacks are not
    # merely free of event-loop access but that any shared mutable state they
    # touch is properly synchronized.
    # Default False: the pipeline only dispatches an element's pad calls onto
    # its executor when the class explicitly sets this to True.
    thread_safe: ClassVar[bool] = False

    source_pads: list[SourcePad] = field(default_factory=list)
    sink_pads: list[SinkPad] = field(default_factory=list)
    internal_pad: InternalPad = field(init=False)
    graph: dict[Pad, set[Pad]] = field(init=False)

    def __post_init__(self):
        """Establish the graph attribute as an empty dictionary."""
        super().__post_init__()
        self.graph = {}
        self.internal_pad = InternalPad(name="inl", element=self, call=self.internal)

    @property
    def source_pad_dict(self) -> dict[str, SourcePad]:
        """Return a dictionary of source pads with the pad name as the key."""
        return {p.name: p for p in self.source_pads}

    @property
    def sink_pad_dict(self) -> dict[str, SinkPad]:
        """Return a dictionary of sink pads with the pad name as the key."""
        return {p.name: p for p in self.sink_pads}

    @property
    def pad_list(self) -> Sequence[Pad]:
        """Return a list of all pads."""
        all_pads: list[Pad] = []
        all_pads.extend(self.source_pads)
        all_pads.extend(self.sink_pads)
        all_pads.append(self.internal_pad)
        return all_pads

    @property
    def logger(self) -> logging.Logger:
        """Return the logger scoped to this element, e.g. sgn.{name}."""
        return logger.getChild(self.name)

    def on_startup(self) -> None:
        """Called after the pipeline is fully constructed and validated, but before the
        first frame is processed. Override for setup that requires the full pipeline
        topology to be in place."""
        pass

    def internal(self) -> None:
        """An optional method to call inbetween sink and source pads of an element, by
        default do nothing."""
        pass

    @staticmethod
    def _validate_pad_class_config(
        cls: type, direction: Literal["sink", "source"]
    ) -> None:
        """Class-time check: allow_dynamic_<dir>_pads=False requires static pads.

        Args:
            cls: The subclass being defined
            direction: Either "sink" or "source"
        """
        static_attr = f"static_{direction}_pads"
        allow_attr = f"allow_dynamic_{direction}_pads"
        if not getattr(cls, allow_attr, True) and not _has_static_pads(
            cls, static_attr
        ):
            raise TypeError(
                f"Element '{cls.__name__}' has {allow_attr}=False but "
                f"does not define {static_attr}. Elements must have at least "
                f"one way to provide pads."
            )

    def _resolve_pad_names(
        self, direction: Literal["sink", "source"], user_names: Sequence[str]
    ) -> list[str]:
        """Validate pad config, merge user pad names with static pads, and reject
        duplicates.

        Args:
            direction: Either "sink" or "source"
            user_names: User-provided pad names from the constructor

        Returns:
            Merged list of pad names (user names followed by static names)
        """
        allow_attr = f"allow_dynamic_{direction}_pads"
        static_attr = f"static_{direction}_pads"
        if not getattr(self, allow_attr) and user_names:
            raise ValueError(
                f"Element '{self.name}' has {allow_attr}=False. "
                f"Cannot specify {direction}_pad_names."
            )
        merged = list(user_names) + list(getattr(self, static_attr))
        if len(set(merged)) != len(merged):
            raise ValueError(
                f"Element '{self.name}' has duplicate {direction} pad names: "
                f"{merged}"
            )
        return merged

logger property

Return the logger scoped to this element, e.g. sgn.{name}.

pad_list property

Return a list of all pads.

sink_pad_dict property

Return a dictionary of sink pads with the pad name as the key.

source_pad_dict property

Return a dictionary of source pads with the pad name as the key.

__post_init__()

Establish the graph attribute as an empty dictionary.

Source code in src/sgn/base.py
def __post_init__(self):
    """Establish the graph attribute as an empty dictionary."""
    super().__post_init__()
    self.graph = {}
    self.internal_pad = InternalPad(name="inl", element=self, call=self.internal)

internal()

An optional method to call inbetween sink and source pads of an element, by default do nothing.

Source code in src/sgn/base.py
def internal(self) -> None:
    """An optional method to call inbetween sink and source pads of an element, by
    default do nothing."""
    pass

on_startup()

Called after the pipeline is fully constructed and validated, but before the first frame is processed. Override for setup that requires the full pipeline topology to be in place.

Source code in src/sgn/base.py
def on_startup(self) -> None:
    """Called after the pipeline is fully constructed and validated, but before the
    first frame is processed. Override for setup that requires the full pipeline
    topology to be in place."""
    pass

InternalPad dataclass

Bases: UniqueID, PadLike


              flowchart TD
              sgn.base.InternalPad[InternalPad]
              sgn.base.UniqueID[UniqueID]
              sgn.base.PadLike[PadLike]

                              sgn.base.UniqueID --> sgn.base.InternalPad
                
                sgn.base.PadLike --> sgn.base.InternalPad
                


              click sgn.base.InternalPad href "" "sgn.base.InternalPad"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
              click sgn.base.PadLike href "" "sgn.base.PadLike"
            

A pad that sits inside an element and is called between sink and source pads. Internal pads are connected in the elements internal graph according to the below (data flows top to bottom)

snk1 ... snkN (if exist) \ ... // internal (always exists) // ... \ src1 ... srcM (if exist)

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad

required
name str

str, optional, The unique name for this object

''
Source code in src/sgn/base.py
@dataclass(eq=False, repr=False)
class InternalPad(UniqueID, PadLike):
    """A pad that sits inside an element and is called between sink and source pads.
    Internal pads are connected in the elements internal graph according to the below
    (data flows top to bottom)

    snk1   ...  snkN     (if exist)
      \\   ...   //
         internal      (always exists)
      //   ...   \\
     src1  ...  srcM     (if exist)

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for
            this pad
        name:
            str, optional, The unique name for this object
    """

    def __post_init__(self):
        PadLike.__post_init__(self)
        UniqueID.__post_init__(self)

    @property
    def pad_type(self) -> str:
        return "inl"

    async def __call__(self) -> None:
        """When called, an internal pad receives a Frame from the element that the pad
        belongs to."""
        await self._dispatch()

__call__() async

When called, an internal pad receives a Frame from the element that the pad belongs to.

Source code in src/sgn/base.py
async def __call__(self) -> None:
    """When called, an internal pad receives a Frame from the element that the pad
    belongs to."""
    await self._dispatch()

PadLike dataclass

Bases: ABC


              flowchart TD
              sgn.base.PadLike[PadLike]

              

              click sgn.base.PadLike href "" "sgn.base.PadLike"
            

Pads are 1:1 with graph nodes but source and sink pads must be grouped into elements in order to exchange data from sink->source. source->sink exchanges happen between elements.

A pad must belong to an element and that element must be provided as a keyword argument called "element". The element must also provide a call function that will be executed when the pad is called. The call function must take a pad as an argument, e.g., def call(pad):

Developers should not subclass or use Pad directly. Instead use SourcePad or SinkPad.

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad

required
Source code in src/sgn/base.py
@dataclass(eq=False, repr=False)
class PadLike(ABC):
    """Pads are 1:1 with graph nodes but source and sink pads must be grouped into
    elements in order to exchange data from sink->source.  source->sink exchanges happen
    between elements.

    A pad must belong to an element and that element must be provided as a
    keyword argument called "element".  The element must also provide a call
    function that will be executed when the pad is called. The call function
    must take a pad as an argument, e.g., def call(pad):

    Developers should not subclass or use Pad directly. Instead use SourcePad
    or SinkPad.

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for
            this pad
    """

    element: Element
    call: Callable
    is_linked: bool = False

    def __post_init__(self):
        self.pad_name = self.name
        self.name = f"{self.element.name}:{self.pad_type}:{self.pad_name}"

    async def _dispatch(self, *args):
        """Run ``self.call(*args)`` on the pipeline executor when active.

        Dispatches onto the executor installed by ``Pipeline.run`` iff one
        is set and the owning element has opted in via ``thread_safe = True``.
        Otherwise calls synchronously on the event-loop thread.
        """
        executor = PIPELINE_EXECUTOR.get()
        if executor is not None and self.element.thread_safe:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(executor, self.call, *args)
        return self.call(*args)

    @abstractmethod
    async def __call__(self) -> None:
        """The call method for a pad must be implemented by the element that the pad
        belongs to.

        This method will be called when the pad is called in the graph.
        """
        ...

    @property
    @abstractmethod
    def pad_type(self) -> str:
        """The pad's type string representation."""
        ...

pad_type abstractmethod property

The pad's type string representation.

__call__() abstractmethod async

The call method for a pad must be implemented by the element that the pad belongs to.

This method will be called when the pad is called in the graph.

Source code in src/sgn/base.py
@abstractmethod
async def __call__(self) -> None:
    """The call method for a pad must be implemented by the element that the pad
    belongs to.

    This method will be called when the pad is called in the graph.
    """
    ...

SinkElement dataclass

Bases: ABC, ElementLike, Generic[FrameLike]


              flowchart TD
              sgn.base.SinkElement[SinkElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.base.ElementLike --> sgn.base.SinkElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                



              click sgn.base.SinkElement href "" "sgn.base.SinkElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

Sink element represents a terminal node in a pipeline, that typically writes data to disk, etc. Sink_pads must exist but not source_pads.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object

''
sink_pad_names Sequence[str]

list, optional, Set the list of sink pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":sink:"

list()
Source code in src/sgn/base.py
@dataclass(kw_only=True)
class SinkElement(ABC, ElementLike, Generic[FrameLike]):
    """Sink element represents a terminal node in a pipeline, that typically writes data
    to disk, etc. Sink_pads must exist but not source_pads.

    Args:
        name:
            str, optional, The unique name for this object
        sink_pad_names:
            list, optional, Set the list of sink pad names. These need to be unique for
            an element but not for an application. The resulting full names will be
            made with "<self.name>:sink:<sink_pad_name>"
    """

    sink_pad_names: Sequence[str] = field(default_factory=list)

    def __init_subclass__(cls, **kwargs):
        """Validate pad configuration at class definition time."""
        super().__init_subclass__(**kwargs)
        ElementLike._validate_pad_class_config(cls, "sink")

    def __post_init__(self):
        """Establish the sink pads and graph attributes."""
        super().__post_init__()

        self.sink_pad_names = self._resolve_pad_names("sink", self.sink_pad_names)
        self.sink_pads = [
            SinkPad(name=pad_name, element=self, call=self.pull)
            for pad_name in self.sink_pad_names
        ]
        # short names for easier recall
        self.snks = dict(zip(self.sink_pad_names, self.sink_pads))
        self.rsnks = {p: n for n, p in self.snks.items()}
        self._at_eos = {p: False for p in self.sink_pads}
        assert self.sink_pads, "SinkElement must specify sink pads"
        assert not self.source_pads, "SinkElement must not specify any source pads"
        self.sink_pad_names_full = [p.name for p in self.sink_pads]

        # Update graph to be (all sinks -> internal)
        self.graph.update({self.internal_pad: set(self.sink_pads)})

    @property
    def at_eos(self) -> bool:
        """If frames on any sink pads are End of Stream (EOS), then mark this whole
        element as EOS.

        Returns:
            bool, True if any sink pad is at EOS, False otherwise
        """
        # TODO generalize this to be able to choose any v. all EOS propagation
        return any(self._at_eos.values())

    def mark_eos(self, pad: SinkPad) -> None:
        """Marks a sink pad as receiving the End of Stream (EOS). The EOS marker signals
        that no more frames will be received on this pad.

        Args:
            pad:
                SinkPad, The sink pad that is receiving the EOS signal
        """
        if pad not in self._at_eos:
            raise ValueError(f"Pad {pad.name} is not a sink pad of element {self.name}")
        self._at_eos[pad] = True

    @abstractmethod
    def pull(self, pad: SinkPad, frame: FrameLike) -> None:
        """Pull for a SinkElement represents the action of associating a frame with a
        particular input source pad a frame. This function must be provided by the
        subclass, and is where any "final" behavior must occur, e.g. writing to disk,
        etc.

        Args:
            pad:
                SinkPad, The sink pad that is receiving the frame
            frame:
                Frame, The frame that is being received
        """
        ...

at_eos property

If frames on any sink pads are End of Stream (EOS), then mark this whole element as EOS.

Returns:

Type Description
bool

bool, True if any sink pad is at EOS, False otherwise

__init_subclass__(**kwargs)

Validate pad configuration at class definition time.

Source code in src/sgn/base.py
def __init_subclass__(cls, **kwargs):
    """Validate pad configuration at class definition time."""
    super().__init_subclass__(**kwargs)
    ElementLike._validate_pad_class_config(cls, "sink")

__post_init__()

Establish the sink pads and graph attributes.

Source code in src/sgn/base.py
def __post_init__(self):
    """Establish the sink pads and graph attributes."""
    super().__post_init__()

    self.sink_pad_names = self._resolve_pad_names("sink", self.sink_pad_names)
    self.sink_pads = [
        SinkPad(name=pad_name, element=self, call=self.pull)
        for pad_name in self.sink_pad_names
    ]
    # short names for easier recall
    self.snks = dict(zip(self.sink_pad_names, self.sink_pads))
    self.rsnks = {p: n for n, p in self.snks.items()}
    self._at_eos = {p: False for p in self.sink_pads}
    assert self.sink_pads, "SinkElement must specify sink pads"
    assert not self.source_pads, "SinkElement must not specify any source pads"
    self.sink_pad_names_full = [p.name for p in self.sink_pads]

    # Update graph to be (all sinks -> internal)
    self.graph.update({self.internal_pad: set(self.sink_pads)})

mark_eos(pad)

Marks a sink pad as receiving the End of Stream (EOS). The EOS marker signals that no more frames will be received on this pad.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, The sink pad that is receiving the EOS signal

required
Source code in src/sgn/base.py
def mark_eos(self, pad: SinkPad) -> None:
    """Marks a sink pad as receiving the End of Stream (EOS). The EOS marker signals
    that no more frames will be received on this pad.

    Args:
        pad:
            SinkPad, The sink pad that is receiving the EOS signal
    """
    if pad not in self._at_eos:
        raise ValueError(f"Pad {pad.name} is not a sink pad of element {self.name}")
    self._at_eos[pad] = True

pull(pad, frame) abstractmethod

Pull for a SinkElement represents the action of associating a frame with a particular input source pad a frame. This function must be provided by the subclass, and is where any "final" behavior must occur, e.g. writing to disk, etc.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, The sink pad that is receiving the frame

required
frame FrameLike

Frame, The frame that is being received

required
Source code in src/sgn/base.py
@abstractmethod
def pull(self, pad: SinkPad, frame: FrameLike) -> None:
    """Pull for a SinkElement represents the action of associating a frame with a
    particular input source pad a frame. This function must be provided by the
    subclass, and is where any "final" behavior must occur, e.g. writing to disk,
    etc.

    Args:
        pad:
            SinkPad, The sink pad that is receiving the frame
        frame:
            Frame, The frame that is being received
    """
    ...

SinkPad dataclass

Bases: UniqueID, PadLike


              flowchart TD
              sgn.base.SinkPad[SinkPad]
              sgn.base.UniqueID[UniqueID]
              sgn.base.PadLike[PadLike]

                              sgn.base.UniqueID --> sgn.base.SinkPad
                
                sgn.base.PadLike --> sgn.base.SinkPad
                


              click sgn.base.SinkPad href "" "sgn.base.SinkPad"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
              click sgn.base.PadLike href "" "sgn.base.PadLike"
            

A pad that receives a data Frame when called. When linked, it returns a dictionary suitable for building a graph in graphlib.

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad, takes two arguments, the pad and the frame

required
name str

str, optional, The unique name for this object

''
other SourcePad | None

SourcePad, optional, This holds the source pad that is linked to this sink pad, default None

None
input Frame | None

Frame, optional, This holds the Frame provided by the linked source pad. Generally it gets set when this SinkPad is called, default None

None
data_spec DataSpec | None

DataSpec, optional, This holds a specification for the data stored in frames, and is expected to be consistent across frames passing through this pad. This is set when this sink pad is first called

None
Source code in src/sgn/base.py
@dataclass(eq=False, repr=False)
class SinkPad(UniqueID, PadLike):
    """A pad that receives a data Frame when called.  When linked, it returns a
    dictionary suitable for building a graph in graphlib.

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for this
            pad, takes two arguments, the pad and the frame
        name:
            str, optional, The unique name for this object
        other:
            SourcePad, optional, This holds the source pad that is linked to this sink
            pad, default None
        input:
            Frame, optional, This holds the Frame provided by the linked source pad.
            Generally it gets set when this SinkPad is called, default None
        data_spec:
            DataSpec, optional, This holds a specification for the data stored
            in frames, and is expected to be consistent across frames passing
            through this pad. This is set when this sink pad is first called
    """

    other: SourcePad | None = None
    input: Frame | None = None
    data_spec: DataSpec | None = None

    def __post_init__(self):
        PadLike.__post_init__(self)
        UniqueID.__post_init__(self)

    @property
    def pad_type(self) -> str:
        return "snk"

    def link(self, other: SourcePad) -> dict[Pad, set[Pad]]:
        """Returns a dictionary of dependencies suitable for adding to a graphlib graph.

        Args:
            other:
                SourcePad, The source pad to link to this sink pad

        Notes:
            Many-to-one (source, sink) Not Supported:
                Only sink pads can be linked. A sink pad can be linked to only one
                source pad, but multiple sink pads may link to the same source pad.
                Calling link() on an already-linked sink pad raises ValueError.

        Returns:
            dict[SinkPad, set[SourcePad]], a dictionary of dependencies suitable for
            adding to a graphlib graph
        """
        if not isinstance(other, SourcePad):
            raise TypeError(
                f"link target must be an instance of SourcePad, "
                f"got {type(other).__name__}"
            )
        if self.is_linked:
            raise ValueError(
                f"sink pad {self.name} is already linked to {self.other}; "
                "a sink pad can only be linked to one source pad"
            )
        self.other = other
        self.is_linked = True
        other.is_linked = True
        return {self: {other}}

    async def __call__(self) -> None:
        """When called, a sink pad gets a Frame from the linked source pad and then
        calls the element's provided call function.

        Notes:
            Pad Call Order:
                pads must be called in the correct order such that the upstream sources
                have new information by the time call is invoked. This should be done
                within a directed acyclic graph such as those provided by the
                apps.Pipeline class.
        """
        if not isinstance(self.other, SourcePad):
            raise RuntimeError(f"sink pad {self.name} has not been linked")
        self.input = self.other.output
        if not isinstance(self.input, Frame):
            raise TypeError(
                f"sink pad {self.name} expected a Frame from its linked source "
                f"pad, got {type(self.input).__name__}"
            )
        if self.data_spec is None:
            self.data_spec = self.input.spec
        elif self.data_spec != self.input.spec:
            raise ValueError(
                f"frame received by {self.name} is inconsistent with "
                "previously received frames. previous data specification: "
                f"{self.data_spec}, current data specification: {self.input.spec}"
            )
        testpoint.record_frame(self, self.input)
        await self._dispatch(self, self.input)
        if self.element is not None:
            self.element.logger.debug("\t%s:%s", self, self.input)

__call__() async

When called, a sink pad gets a Frame from the linked source pad and then calls the element's provided call function.

Notes

Pad Call Order: pads must be called in the correct order such that the upstream sources have new information by the time call is invoked. This should be done within a directed acyclic graph such as those provided by the apps.Pipeline class.

Source code in src/sgn/base.py
async def __call__(self) -> None:
    """When called, a sink pad gets a Frame from the linked source pad and then
    calls the element's provided call function.

    Notes:
        Pad Call Order:
            pads must be called in the correct order such that the upstream sources
            have new information by the time call is invoked. This should be done
            within a directed acyclic graph such as those provided by the
            apps.Pipeline class.
    """
    if not isinstance(self.other, SourcePad):
        raise RuntimeError(f"sink pad {self.name} has not been linked")
    self.input = self.other.output
    if not isinstance(self.input, Frame):
        raise TypeError(
            f"sink pad {self.name} expected a Frame from its linked source "
            f"pad, got {type(self.input).__name__}"
        )
    if self.data_spec is None:
        self.data_spec = self.input.spec
    elif self.data_spec != self.input.spec:
        raise ValueError(
            f"frame received by {self.name} is inconsistent with "
            "previously received frames. previous data specification: "
            f"{self.data_spec}, current data specification: {self.input.spec}"
        )
    testpoint.record_frame(self, self.input)
    await self._dispatch(self, self.input)
    if self.element is not None:
        self.element.logger.debug("\t%s:%s", self, self.input)

Returns a dictionary of dependencies suitable for adding to a graphlib graph.

Parameters:

Name Type Description Default
other SourcePad

SourcePad, The source pad to link to this sink pad

required
Notes

Many-to-one (source, sink) Not Supported: Only sink pads can be linked. A sink pad can be linked to only one source pad, but multiple sink pads may link to the same source pad. Calling link() on an already-linked sink pad raises ValueError.

Returns:

Type Description
dict[Pad, set[Pad]]

dict[SinkPad, set[SourcePad]], a dictionary of dependencies suitable for

dict[Pad, set[Pad]]

adding to a graphlib graph

Source code in src/sgn/base.py
def link(self, other: SourcePad) -> dict[Pad, set[Pad]]:
    """Returns a dictionary of dependencies suitable for adding to a graphlib graph.

    Args:
        other:
            SourcePad, The source pad to link to this sink pad

    Notes:
        Many-to-one (source, sink) Not Supported:
            Only sink pads can be linked. A sink pad can be linked to only one
            source pad, but multiple sink pads may link to the same source pad.
            Calling link() on an already-linked sink pad raises ValueError.

    Returns:
        dict[SinkPad, set[SourcePad]], a dictionary of dependencies suitable for
        adding to a graphlib graph
    """
    if not isinstance(other, SourcePad):
        raise TypeError(
            f"link target must be an instance of SourcePad, "
            f"got {type(other).__name__}"
        )
    if self.is_linked:
        raise ValueError(
            f"sink pad {self.name} is already linked to {self.other}; "
            "a sink pad can only be linked to one source pad"
        )
    self.other = other
    self.is_linked = True
    other.is_linked = True
    return {self: {other}}

SourceElement dataclass

Bases: ABC, ElementLike


              flowchart TD
              sgn.base.SourceElement[SourceElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.base.ElementLike --> sgn.base.SourceElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                



              click sgn.base.SourceElement href "" "sgn.base.SourceElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

Initialize with a list of source pads. Every source pad is added to the graph with no dependencies.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object

''
source_pad_names Sequence[str]

list, optional, Set the list of source pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":src:"

list()
Source code in src/sgn/base.py
@dataclass(repr=False, kw_only=True)
class SourceElement(ABC, ElementLike):
    """Initialize with a list of source pads. Every source pad is added to the graph
    with no dependencies.

    Args:
        name:
            str, optional, The unique name for this object
        source_pad_names:
            list, optional, Set the list of source pad names. These need to be unique
            for an element but not for an application. The resulting full names will be
            made with "<self.name>:src:<source_pad_name>"
    """

    source_pad_names: Sequence[str] = field(default_factory=list)

    def __init_subclass__(cls, **kwargs):
        """Validate pad configuration at class definition time."""
        super().__init_subclass__(**kwargs)
        ElementLike._validate_pad_class_config(cls, "source")

    def __post_init__(self):
        """Establish the source pads and graph attributes."""
        super().__post_init__()

        self.source_pad_names = self._resolve_pad_names("source", self.source_pad_names)
        self.source_pads = [
            SourcePad(name=pad_name, element=self, call=self.new)
            for pad_name in self.source_pad_names
        ]
        # short names for easier recall
        self.srcs = dict(zip(self.source_pad_names, self.source_pads))
        self.rsrcs = {p: n for n, p in self.srcs.items()}
        assert self.source_pads, "SourceElement must specify source pads"
        assert not self.sink_pads, "SourceElement must not specify sink pads"
        self.graph.update({s: {self.internal_pad} for s in self.source_pads})

    @abstractmethod
    def new(self, pad: SourcePad) -> Frame:
        """New frames are created on "pad". Must be provided by subclass.

        Args:
            pad:
                SourcePad, The source pad through which the frame is passed

        Returns:
            Frame, The new frame to be passed through the source pad
        """
        ...

__init_subclass__(**kwargs)

Validate pad configuration at class definition time.

Source code in src/sgn/base.py
def __init_subclass__(cls, **kwargs):
    """Validate pad configuration at class definition time."""
    super().__init_subclass__(**kwargs)
    ElementLike._validate_pad_class_config(cls, "source")

__post_init__()

Establish the source pads and graph attributes.

Source code in src/sgn/base.py
def __post_init__(self):
    """Establish the source pads and graph attributes."""
    super().__post_init__()

    self.source_pad_names = self._resolve_pad_names("source", self.source_pad_names)
    self.source_pads = [
        SourcePad(name=pad_name, element=self, call=self.new)
        for pad_name in self.source_pad_names
    ]
    # short names for easier recall
    self.srcs = dict(zip(self.source_pad_names, self.source_pads))
    self.rsrcs = {p: n for n, p in self.srcs.items()}
    assert self.source_pads, "SourceElement must specify source pads"
    assert not self.sink_pads, "SourceElement must not specify sink pads"
    self.graph.update({s: {self.internal_pad} for s in self.source_pads})

new(pad) abstractmethod

New frames are created on "pad". Must be provided by subclass.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, The source pad through which the frame is passed

required

Returns:

Type Description
Frame

Frame, The new frame to be passed through the source pad

Source code in src/sgn/base.py
@abstractmethod
def new(self, pad: SourcePad) -> Frame:
    """New frames are created on "pad". Must be provided by subclass.

    Args:
        pad:
            SourcePad, The source pad through which the frame is passed

    Returns:
        Frame, The new frame to be passed through the source pad
    """
    ...

SourcePad dataclass

Bases: UniqueID, PadLike


              flowchart TD
              sgn.base.SourcePad[SourcePad]
              sgn.base.UniqueID[UniqueID]
              sgn.base.PadLike[PadLike]

                              sgn.base.UniqueID --> sgn.base.SourcePad
                
                sgn.base.PadLike --> sgn.base.SourcePad
                


              click sgn.base.SourcePad href "" "sgn.base.SourcePad"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
              click sgn.base.PadLike href "" "sgn.base.PadLike"
            

A pad that provides a data Frame when called.

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad

required
name str

str, optional, The unique name for this object

''
output Frame | None

Frame, optional, This attribute is set to be the output Frame when the pad is called.

None
Source code in src/sgn/base.py
@dataclass(eq=False, repr=False)
class SourcePad(UniqueID, PadLike):
    """A pad that provides a data Frame when called.

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for
            this pad
        name:
            str, optional, The unique name for this object
        output:
            Frame, optional, This attribute is set to be the output Frame when the pad
            is called.
    """

    output: Frame | None = None

    def __post_init__(self):
        PadLike.__post_init__(self)
        UniqueID.__post_init__(self)

    @property
    def pad_type(self) -> str:
        return "src"

    async def __call__(self) -> None:
        """When called, a source pad receives a Frame from the element that the pad
        belongs to."""
        self.output = await self._dispatch(self)
        testpoint.record_frame(self, self.output)
        if not isinstance(self.output, Frame):
            raise TypeError(
                f"call function for source pad {self.name} must return a Frame, "
                f"got {type(self.output).__name__}"
            )
        if self.element is not None:
            self.element.logger.debug("\t%s : %s", self, self.output)

__call__() async

When called, a source pad receives a Frame from the element that the pad belongs to.

Source code in src/sgn/base.py
async def __call__(self) -> None:
    """When called, a source pad receives a Frame from the element that the pad
    belongs to."""
    self.output = await self._dispatch(self)
    testpoint.record_frame(self, self.output)
    if not isinstance(self.output, Frame):
        raise TypeError(
            f"call function for source pad {self.name} must return a Frame, "
            f"got {type(self.output).__name__}"
        )
    if self.element is not None:
        self.element.logger.debug("\t%s : %s", self, self.output)

TransformElement dataclass

Bases: ABC, ElementLike, Generic[FrameLike]


              flowchart TD
              sgn.base.TransformElement[TransformElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.base.ElementLike --> sgn.base.TransformElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                



              click sgn.base.TransformElement href "" "sgn.base.TransformElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

Both "source_pads" and "sink_pads" must exist. The execution scheduling flow of the logic within a TransformElement is as follows: 1.) all sink pads, 2.) the internal pad, 3.) all source pads. The execution of all downstream logic will be blocked until logic in all upstream pads within the same TransformElement has exited.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object

''
source_pad_names Sequence[str]

list, optional, Set the list of source pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":src:"

list()
sink_pad_names Sequence[str]

list, optional, Set the list of sink pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":snk:"

list()
Source code in src/sgn/base.py
@dataclass(repr=False, kw_only=True)
class TransformElement(ABC, ElementLike, Generic[FrameLike]):
    """Both "source_pads" and "sink_pads" must exist. The execution scheduling
    flow of the logic within a TransformElement is as follows: 1.) all sink
    pads, 2.) the internal pad, 3.) all source pads. The execution of all
    downstream logic will be blocked until logic in all upstream pads within
    the same TransformElement has exited.

    Args:
        name:
            str, optional, The unique name for this object
        source_pad_names:
            list, optional, Set the list of source pad names. These need to be unique
            for an element but not for an application. The resulting full names will
            be made with "<self.name>:src:<source_pad_name>"
        sink_pad_names:
            list, optional, Set the list of sink pad names. These need to be unique
            for an element but not for an application. The resulting full names will
            be made with "<self.name>:snk:<sink_pad_name>"
    """

    source_pad_names: Sequence[str] = field(default_factory=list)
    sink_pad_names: Sequence[str] = field(default_factory=list)

    def __init_subclass__(cls, **kwargs):
        """Validate pad configuration at class definition time."""
        super().__init_subclass__(**kwargs)
        ElementLike._validate_pad_class_config(cls, "sink")
        ElementLike._validate_pad_class_config(cls, "source")

    def __post_init__(self):
        """Establish the source pads and sink pads and graph attributes."""
        super().__post_init__()

        self.sink_pad_names = self._resolve_pad_names("sink", self.sink_pad_names)
        self.source_pad_names = self._resolve_pad_names("source", self.source_pad_names)

        self.source_pads = [
            SourcePad(name=pad_name, element=self, call=self.new)
            for pad_name in self.source_pad_names
        ]
        self.sink_pads = [
            SinkPad(name=pad_name, element=self, call=self.pull)
            for pad_name in self.sink_pad_names
        ]
        # short names for easier recall
        self.srcs = dict(zip(self.source_pad_names, self.source_pads))
        self.snks = dict(zip(self.sink_pad_names, self.sink_pads))
        self.rsrcs = {p: n for n, p in self.srcs.items()}
        self.rsnks = {p: n for n, p in self.snks.items()}
        assert (
            self.source_pads and self.sink_pads
        ), "TransformElement must specify both sink and source pads"

        # Make maximal bipartite graph in two pieces
        # First, (all sinks -> internal)
        self.graph.update({self.internal_pad: set(self.sink_pads)})
        # Second, (internal -> all sources)
        self.graph.update({s: {self.internal_pad} for s in self.source_pads})

    @abstractmethod
    def pull(self, pad: SinkPad, frame: FrameLike) -> None:
        """Pull data from the input pads (source pads of upstream elements), must be
        implemented by subclasses.

        Args:
            pad:
                SinkPad, The sink pad that is receiving the frame
            frame:
                Frame, The frame that is pulled from the source pad
        """
        ...

    @abstractmethod
    def new(self, pad: SourcePad) -> FrameLike:
        """New frames are created on "pad". Must be provided by subclass.

        Args:
            pad:
                SourcePad, The source pad through which the frame is passed

        Returns:
            Frame, The new frame to be passed through the source pad
        """
        ...

__init_subclass__(**kwargs)

Validate pad configuration at class definition time.

Source code in src/sgn/base.py
def __init_subclass__(cls, **kwargs):
    """Validate pad configuration at class definition time."""
    super().__init_subclass__(**kwargs)
    ElementLike._validate_pad_class_config(cls, "sink")
    ElementLike._validate_pad_class_config(cls, "source")

__post_init__()

Establish the source pads and sink pads and graph attributes.

Source code in src/sgn/base.py
def __post_init__(self):
    """Establish the source pads and sink pads and graph attributes."""
    super().__post_init__()

    self.sink_pad_names = self._resolve_pad_names("sink", self.sink_pad_names)
    self.source_pad_names = self._resolve_pad_names("source", self.source_pad_names)

    self.source_pads = [
        SourcePad(name=pad_name, element=self, call=self.new)
        for pad_name in self.source_pad_names
    ]
    self.sink_pads = [
        SinkPad(name=pad_name, element=self, call=self.pull)
        for pad_name in self.sink_pad_names
    ]
    # short names for easier recall
    self.srcs = dict(zip(self.source_pad_names, self.source_pads))
    self.snks = dict(zip(self.sink_pad_names, self.sink_pads))
    self.rsrcs = {p: n for n, p in self.srcs.items()}
    self.rsnks = {p: n for n, p in self.snks.items()}
    assert (
        self.source_pads and self.sink_pads
    ), "TransformElement must specify both sink and source pads"

    # Make maximal bipartite graph in two pieces
    # First, (all sinks -> internal)
    self.graph.update({self.internal_pad: set(self.sink_pads)})
    # Second, (internal -> all sources)
    self.graph.update({s: {self.internal_pad} for s in self.source_pads})

new(pad) abstractmethod

New frames are created on "pad". Must be provided by subclass.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, The source pad through which the frame is passed

required

Returns:

Type Description
FrameLike

Frame, The new frame to be passed through the source pad

Source code in src/sgn/base.py
@abstractmethod
def new(self, pad: SourcePad) -> FrameLike:
    """New frames are created on "pad". Must be provided by subclass.

    Args:
        pad:
            SourcePad, The source pad through which the frame is passed

    Returns:
        Frame, The new frame to be passed through the source pad
    """
    ...

pull(pad, frame) abstractmethod

Pull data from the input pads (source pads of upstream elements), must be implemented by subclasses.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, The sink pad that is receiving the frame

required
frame FrameLike

Frame, The frame that is pulled from the source pad

required
Source code in src/sgn/base.py
@abstractmethod
def pull(self, pad: SinkPad, frame: FrameLike) -> None:
    """Pull data from the input pads (source pads of upstream elements), must be
    implemented by subclasses.

    Args:
        pad:
            SinkPad, The sink pad that is receiving the frame
        frame:
            Frame, The frame that is pulled from the source pad
    """
    ...

UniqueID dataclass

Generic class from which all classes that participate in an execution graph should be derived. Enforces a unique name and hashes based on that name.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object, defaults to the objects unique uuid4 hex string if not specified

''
Source code in src/sgn/base.py
@dataclass
class UniqueID:
    """Generic class from which all classes that participate in an execution graph
    should be derived. Enforces a unique name and hashes based on that name.

    Args:
        name:
            str, optional, The unique name for this object, defaults to the objects
            unique uuid4 hex string if not specified
    """

    name: str = ""
    _id: str = field(init=False)

    def __post_init__(self):
        """Handle setup of the UniqueID class, including the `._id` attribute."""
        # give every element a truly unique identifier
        self._id = uuid.uuid4().hex
        if not self.name:
            self.name = self._id

    def __hash__(self) -> int:
        """Compute the hash of the object based on the unique id.

        Notes:
            Motivation:
                we need the Base class to be hashable, so that it can be
                used as a key in a dictionary, but mutable dataclasses are not
                hashable by default, so we have to define our own hash function
                here.
            Stability:
                As currently implemented, the hash of a UniqueID object will not be
                stable across python sessions, and should therefore not be used for
                checksum purposes.

        Returns:
            int, hash of the object
        """
        return hash(self._id)

    def __eq__(self, other) -> bool:
        """Check if two objects are equal based on their unique id."""
        if not isinstance(other, UniqueID):
            return NotImplemented
        return self._id == other._id

__eq__(other)

Check if two objects are equal based on their unique id.

Source code in src/sgn/base.py
def __eq__(self, other) -> bool:
    """Check if two objects are equal based on their unique id."""
    if not isinstance(other, UniqueID):
        return NotImplemented
    return self._id == other._id

__hash__()

Compute the hash of the object based on the unique id.

Notes

Motivation: we need the Base class to be hashable, so that it can be used as a key in a dictionary, but mutable dataclasses are not hashable by default, so we have to define our own hash function here. Stability: As currently implemented, the hash of a UniqueID object will not be stable across python sessions, and should therefore not be used for checksum purposes.

Returns:

Type Description
int

int, hash of the object

Source code in src/sgn/base.py
def __hash__(self) -> int:
    """Compute the hash of the object based on the unique id.

    Notes:
        Motivation:
            we need the Base class to be hashable, so that it can be
            used as a key in a dictionary, but mutable dataclasses are not
            hashable by default, so we have to define our own hash function
            here.
        Stability:
            As currently implemented, the hash of a UniqueID object will not be
            stable across python sessions, and should therefore not be used for
            checksum purposes.

    Returns:
        int, hash of the object
    """
    return hash(self._id)

__post_init__()

Handle setup of the UniqueID class, including the ._id attribute.

Source code in src/sgn/base.py
def __post_init__(self):
    """Handle setup of the UniqueID class, including the `._id` attribute."""
    # give every element a truly unique identifier
    self._id = uuid.uuid4().hex
    if not self.name:
        self.name = self._id