Skip to content

sgn.sinks

Sink elements for the SGN framework.

CollectSink dataclass

Bases: SinkElement


              flowchart TD
              sgn.sinks.CollectSink[CollectSink]
              sgn.base.SinkElement[SinkElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.base.SinkElement --> sgn.sinks.CollectSink
                                sgn.base.ElementLike --> sgn.base.SinkElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                




              click sgn.sinks.CollectSink href "" "sgn.sinks.CollectSink"
              click sgn.base.SinkElement href "" "sgn.base.SinkElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A sink element that has one collection per sink pad.

Each frame that is pulled into the sink is added to the collection for that pad using a ".append" method. If the extract_data flag is set, the data is extracted from the frame and added to the deque, otherwise the frame itself is added to the collection.

Parameters:

Name Type Description Default
collects dict[str, MutableSequence]

dict[str, Collection], a mapping of sink pad names to Collections. The Collection must have an append method.

dict()
extract_data bool

bool, default True, flag to indicate if the data should be extracted from the frame before adding it to the deque

True
skip_empty bool

bool, default True, flag to indicate the frame should be skipped and not collected if its data payload is None.

True
Source code in sgn/sinks.py
@dataclass
class CollectSink(SinkElement):
    """A sink element that has one collection per sink pad.

    Each frame that is pulled into the sink is added to the collection
    for that pad using a ".append" method. If the extract_data flag is
    set, the data is extracted from the frame and added to the deque,
    otherwise the frame itself is added to the collection.

    Args:
        collects:
            dict[str, Collection], a mapping of sink pad names to
            Collections. The Collection must have an append method.
        extract_data:
            bool, default True, flag to indicate if the data should be
            extracted from the frame before adding it to the deque
        skip_empty:
            bool, default True, flag to indicate the frame should be
            skipped and not collected if its `data` payload is None.

    """

    collects: dict[str, MutableSequence] = field(default_factory=dict)
    extract_data: bool = True
    skip_empty: bool = True
    collection_factory: Callable = list

    def __post_init__(self):
        super().__post_init__()
        if not self.collects:
            self.collects = {
                name: self.collection_factory() for name in self.sink_pad_names
            }
        else:
            self.collects = {
                name: self.collection_factory(iterable)
                for name, iterable in self.collects.items()
            }
        assert set(self.collects) == set(
            self.sink_pad_names
        ), "The `collects` attribute keys should match sink_pad_names"

    def _extract_data(self, frame: Frame):
        """Extract data from frame if extract_data is True.

        Args:
            frame:
                Frame, the frame to extract data from

        Returns:
            The data from the frame if extract_data is True, otherwise
            the frame itself.
        """
        if self.extract_data:
            # TODO remove this hack when TSFrames agree to use the "data" attribute
            #  of the base Frame class and treat "buffers" as an alias.
            if hasattr(frame, "buffers"):
                return frame.buffers

            # Otherwise, return the data attribute
            return frame.data

        # Return the frame itself if extract_data is False
        return frame

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Pull in frame and add it to pad collection.

        Args:
            pad:
                SinkPad, the pad that the frame is pulled into
            frame:
                Frame, the frame that is pulled into the sink
        """
        if frame.EOS:
            self.mark_eos(pad)
        if self.skip_empty and frame.is_gap:
            return
        self.collects[self.rsnks[pad]].append(self._extract_data(frame))

pull(pad, frame)

Pull in frame and add it to pad collection.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, the pad that the frame is pulled into

required
frame Frame

Frame, the frame that is pulled into the sink

required
Source code in sgn/sinks.py
def pull(self, pad: SinkPad, frame: Frame) -> None:
    """Pull in frame and add it to pad collection.

    Args:
        pad:
            SinkPad, the pad that the frame is pulled into
        frame:
            Frame, the frame that is pulled into the sink
    """
    if frame.EOS:
        self.mark_eos(pad)
    if self.skip_empty and frame.is_gap:
        return
    self.collects[self.rsnks[pad]].append(self._extract_data(frame))

DequeSink dataclass

Bases: CollectSink


              flowchart TD
              sgn.sinks.DequeSink[DequeSink]
              sgn.sinks.CollectSink[CollectSink]
              sgn.base.SinkElement[SinkElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.sinks.CollectSink --> sgn.sinks.DequeSink
                                sgn.base.SinkElement --> sgn.sinks.CollectSink
                                sgn.base.ElementLike --> sgn.base.SinkElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                





              click sgn.sinks.DequeSink href "" "sgn.sinks.DequeSink"
              click sgn.sinks.CollectSink href "" "sgn.sinks.CollectSink"
              click sgn.base.SinkElement href "" "sgn.base.SinkElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A sink element that has one double-ended-queue (deque) per sink pad.

Each frame that is pulled into the sink is added to the deque for that pad. If the extract_data flag is set, the data is extracted from the frame and added to the deque , otherwise the frame itself is added to the deque.

Parameters:

Name Type Description Default
collects dict[str, MutableSequence]

dict[str, deque], a mapping of sink pads to deques, where the key is the pad name and the value is the deque

dict()
extract_data bool

bool, default True, flag to indicate if the data should be extracted from the frame before adding it to the deque

True
Notes

Ignoring empty frames: If the frame is empty, it is not added to the deque. The motivating principle is that "empty frames preserve the sink deque". An empty deque is equivalent (for our purposes) to a deque filled with "None" values, so we prevent the latter from being possible.

Source code in sgn/sinks.py
@dataclass
class DequeSink(CollectSink):
    """A sink element that has one double-ended-queue (deque) per sink pad.

    Each frame that is pulled into the sink is added to the deque for
    that pad.  If the extract_data flag is set, the data is extracted
    from the frame and added to the deque , otherwise the frame itself
    is added to the deque.

    Args:
        collects:
            dict[str, deque], a mapping of sink pads to deques, where
            the key is the pad name and the value is the deque

        extract_data:
            bool, default True, flag to indicate if the data should be
            extracted from the frame before adding it to the deque

    Notes:
        Ignoring empty frames:
            If the frame is empty, it is not added to the deque. The
            motivating principle is that "empty frames preserve the
            sink deque". An empty deque is equivalent (for our
            purposes) to a deque filled with "None" values, so we
            prevent the latter from being possible.

    """

    collection_factory: Callable = deque

    @property
    def deques(self) -> dict[str, MutableSequence]:
        """Explicit alias for collects.

        Returns:
            dict[str, deque ]: the deques for the sink
        """
        return self.collects

deques property

Explicit alias for collects.

Returns:

Type Description
dict[str, MutableSequence]

dict[str, deque ]: the deques for the sink

NullSink dataclass

Bases: SinkElement


              flowchart TD
              sgn.sinks.NullSink[NullSink]
              sgn.base.SinkElement[SinkElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.base.SinkElement --> sgn.sinks.NullSink
                                sgn.base.ElementLike --> sgn.base.SinkElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                




              click sgn.sinks.NullSink href "" "sgn.sinks.NullSink"
              click sgn.base.SinkElement href "" "sgn.base.SinkElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A sink that does precisely nothing.

It is useful for testing and debugging, or for pipelines that do not need a sink, but require one to be present in the pipeline.

Parameters:

Name Type Description Default
verbose bool

bool, print frames as they pass through the internal pad

False
Source code in sgn/sinks.py
@dataclass
class NullSink(SinkElement):
    """A sink that does precisely nothing.

    It is useful for testing and debugging, or for pipelines that do
    not need a sink, but require one to be present in the pipeline.

    Args:
        verbose:
            bool, print frames as they pass through the internal pad

    """

    verbose: bool = False

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Do nothing on pull.

        Args:
            pad:
                SinkPad, the pad that the frame is pulled into
            frame:
                Frame, the frame that is pulled into the sink
        """
        if frame.EOS:
            self.mark_eos(pad)
        if self.verbose is True:
            print(frame)

pull(pad, frame)

Do nothing on pull.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, the pad that the frame is pulled into

required
frame Frame

Frame, the frame that is pulled into the sink

required
Source code in sgn/sinks.py
def pull(self, pad: SinkPad, frame: Frame) -> None:
    """Do nothing on pull.

    Args:
        pad:
            SinkPad, the pad that the frame is pulled into
        frame:
            Frame, the frame that is pulled into the sink
    """
    if frame.EOS:
        self.mark_eos(pad)
    if self.verbose is True:
        print(frame)