Skip to content

sgnts.sinks

NullSeriesSink dataclass

Bases: TSSink


              flowchart TD
              sgnts.sinks.NullSeriesSink[NullSeriesSink]
              sgnts.base.base.TSSink[TSSink]
              sgnts.base.base.TimeSeriesMixin[TimeSeriesMixin]

                              sgnts.base.base.TSSink --> sgnts.sinks.NullSeriesSink
                                sgnts.base.base.TimeSeriesMixin --> sgnts.base.base.TSSink
                



              click sgnts.sinks.NullSeriesSink href "" "sgnts.sinks.NullSeriesSink"
              click sgnts.base.base.TSSink href "" "sgnts.base.base.TSSink"
              click sgnts.base.base.TimeSeriesMixin href "" "sgnts.base.base.TimeSeriesMixin"
            

A series sink that discards its input; when verbose, it prints each frame and its latency as the frame arrives.

Parameters:

Name Type Description Default
verbose bool

bool, print frames as they pass through the internal pad

False
Source code in sgnts/sinks/null.py
@dataclass
class NullSeriesSink(TSSink):
    """Terminal sink that discards every frame it receives.

    Args:
        verbose:
            bool, when True print each incoming frame together with its
            latency

    """

    verbose: bool = False

    def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
        """Flag end-of-stream pads and, if verbose, report each frame.

        Args:
            input_frames:
                dict[SinkPad, TSFrame], the frame received on each sink pad
        """
        for pad, incoming in input_frames.items():
            if incoming.EOS:
                self.mark_eos(pad)
            if not self.verbose:
                # quiet mode: nothing else to do for this pad
                continue
            print(f"{pad.name}:")
            print(f"  {incoming}")
            # reference point is the frame offset advanced by one
            # SAMPLE_STRIDE_AT_MAX_RATE, converted to seconds
            reference = Offset.tosec(
                incoming.offset + Offset.SAMPLE_STRIDE_AT_MAX_RATE
            )
            latency = gpsnow() - reference
            print(f"  latency: {latency} s")

process(input_frames)

Print frames if verbose.

Source code in sgnts/sinks/null.py
def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
    """Flag end-of-stream pads and, if verbose, report each frame.

    Args:
        input_frames:
            dict[SinkPad, TSFrame], the frame received on each sink pad
    """
    for pad, incoming in input_frames.items():
        if incoming.EOS:
            self.mark_eos(pad)
        if not self.verbose:
            # quiet mode: nothing else to do for this pad
            continue
        print(f"{pad.name}:")
        print(f"  {incoming}")
        # reference point is the frame offset advanced by one
        # SAMPLE_STRIDE_AT_MAX_RATE, converted to seconds
        reference = Offset.tosec(
            incoming.offset + Offset.SAMPLE_STRIDE_AT_MAX_RATE
        )
        latency = gpsnow() - reference
        print(f"  latency: {latency} s")

DumpSeriesSink dataclass

Bases: TSSink


              flowchart TD
              sgnts.sinks.DumpSeriesSink[DumpSeriesSink]
              sgnts.base.base.TSSink[TSSink]
              sgnts.base.base.TimeSeriesMixin[TimeSeriesMixin]

                              sgnts.base.base.TSSink --> sgnts.sinks.DumpSeriesSink
                                sgnts.base.base.TimeSeriesMixin --> sgnts.base.base.TSSink
                



              click sgnts.sinks.DumpSeriesSink href "" "sgnts.sinks.DumpSeriesSink"
              click sgnts.base.base.TSSink href "" "sgnts.base.base.TSSink"
              click sgnts.base.base.TimeSeriesMixin href "" "sgnts.base.base.TimeSeriesMixin"
            

A sink element that dumps time series data to a txt file.

Parameters:

Name Type Description Default
fname str

str, output file name

required
verbose bool

bool, be verbose

False
Source code in sgnts/sinks/dump.py
@dataclass(kw_only=True)
class DumpSeriesSink(TSSink):
    """Sink element that writes incoming time series data to a text file.

    Args:
        fname:
            str, name of the output file
        verbose:
            bool, print every incoming frame when exactly True
    """

    fname: str
    verbose: bool = False

    def configure(self) -> None:
        """Create the output file, emptying it if it already exists."""
        # opening in "w" mode truncates; nothing is written at this point
        with open(self.fname, "w"):
            pass

    @validator.single_pad
    def validate(self) -> None:
        """Perform no additional validation in the method body."""

    def write_to_file(self, buf) -> None:
        """Append the samples of one buffer to the txt file.

        Every output row holds a time stamp followed by the sample values
        belonging to that time stamp.

        Args:
            buf:
                SeriesBuffer, the buffer with time series data to write out
        """
        start = buf.t0
        span = buf.duration
        samples = buf.data
        # FIXME: How to write multi-dimensional data?
        # all leading axes are collapsed so the last axis runs along time
        n_samples = samples.shape[-1]
        rows = samples.reshape(-1, n_samples)
        stamps = np.linspace(
            start / Time.SECONDS,
            (start + span) / Time.SECONDS,
            n_samples,
            endpoint=False,
        )
        table = np.vstack([stamps, rows]).T
        # append mode keeps the rows written for earlier buffers
        with open(self.fname, "ab") as handle:
            np.savetxt(handle, table)

    @sink.single_pad
    def process(self, input_frame: TSFrame) -> None:
        """Write every non-gap buffer of the frame to the output file.

        Args:
            input_frame:
                TSFrame, the frame received on the sink pad
        """
        if input_frame.EOS:
            self.mark_eos(self.sink_pads[0])
        if self.verbose is True:
            print(input_frame)
        for chunk in input_frame:
            if chunk.is_gap:
                # gap buffers are skipped
                continue
            self.write_to_file(chunk)

process(input_frame)

Write out time-series data.

Source code in sgnts/sinks/dump.py
@sink.single_pad
def process(self, input_frame: TSFrame) -> None:
    """Write every non-gap buffer of the frame to the output file.

    Args:
        input_frame:
            TSFrame, the frame received on the sink pad
    """
    if input_frame.EOS:
        self.mark_eos(self.sink_pads[0])
    if self.verbose is True:
        print(input_frame)
    for chunk in input_frame:
        if chunk.is_gap:
            # gap buffers are skipped
            continue
        self.write_to_file(chunk)

write_to_file(buf)

Write time series data to txt file.

Parameters:

Name Type Description Default
buf

SeriesBuffer, the buffer with time series data to write out

required
Source code in sgnts/sinks/dump.py
def write_to_file(self, buf) -> None:
    """Append the samples of one buffer to the txt file.

    Every output row holds a time stamp followed by the sample values
    belonging to that time stamp.

    Args:
        buf:
            SeriesBuffer, the buffer with time series data to write out
    """
    start = buf.t0
    span = buf.duration
    samples = buf.data
    # FIXME: How to write multi-dimensional data?
    # all leading axes are collapsed so the last axis runs along time
    n_samples = samples.shape[-1]
    rows = samples.reshape(-1, n_samples)
    stamps = np.linspace(
        start / Time.SECONDS,
        (start + span) / Time.SECONDS,
        n_samples,
        endpoint=False,
    )
    table = np.vstack([stamps, rows]).T
    # append mode keeps the rows written for earlier buffers
    with open(self.fname, "ab") as handle:
        np.savetxt(handle, table)

TSFrameCollectSink dataclass

Bases: CollectSink


              flowchart TD
              sgnts.sinks.TSFrameCollectSink[TSFrameCollectSink]

              

              click sgnts.sinks.TSFrameCollectSink href "" "sgnts.sinks.TSFrameCollectSink"
            

Sink that collects input SeriesBuffers

sgn.CollectSink with an additional method out_frames that will return a dictionary, keyed by sink pad names, where the values are single TSFrames containing all buffers collected on the sink pads during pipeline operation.

Source code in sgnts/sinks/collect.py
@dataclass
class TSFrameCollectSink(CollectSink):
    """Sink that collects the frames arriving on its sink pads

    An sgn.CollectSink extended with the method `out_frames`, which
    returns a dictionary keyed by sink pad name. Each value is a single
    TSFrame holding all buffers collected on that pad during pipeline
    operation.

    """

    def __post_init__(self):
        """Set the collection options, then run the parent initialisation."""
        # both options are assigned before the parent __post_init__ runs
        self.extract_data = False
        self.skip_empty = False
        super().__post_init__()

    def out_frames(self) -> dict[str, TSFrame]:
        """Return one TSFrame per pad containing all collected buffers."""
        return {
            pad: TSFrame(
                buffers=[
                    buf for collected in per_pad for buf in collected.buffers
                ]
            )
            for pad, per_pad in self.collects.items()
        }

out_frames()

The collected frames.

Source code in sgnts/sinks/collect.py
def out_frames(self) -> dict[str, TSFrame]:
    """Return one TSFrame per pad containing all collected buffers."""
    return {
        pad: TSFrame(
            buffers=[buf for collected in per_pad for buf in collected.buffers]
        )
        for pad, per_pad in self.collects.items()
    }

TSPlotSink dataclass

Bases: TSFrameCollectSink


              flowchart TD
              sgnts.sinks.TSPlotSink[TSPlotSink]
              sgnts.sinks.collect.TSFrameCollectSink[TSFrameCollectSink]

                              sgnts.sinks.collect.TSFrameCollectSink --> sgnts.sinks.TSPlotSink
                


              click sgnts.sinks.TSPlotSink href "" "sgnts.sinks.TSPlotSink"
              click sgnts.sinks.collect.TSFrameCollectSink href "" "sgnts.sinks.collect.TSFrameCollectSink"
            

Sink that collects frames and provides plotting methods.

Extends TSFrameCollectSink with convenient plotting capabilities for visualizing collected data after pipeline completion.

Example::

sink = TSPlotSink(name="detector_data", sink_pad_names=("H1", "L1"))
pipeline.connect(source, sink)
pipeline.run()

# Both pads overlaid, labeled "H1" and "L1"
fig, ax = sink.plot(time_unit="s")
ax.legend()
plt.show()

Parameters:

Name Type Description Default
name

Element name (used as default title for multi-pad plots)

required
sink_pad_names

Names of input pads (used as default labels)

required
Source code in sgnts/sinks/plot.py
@dataclass
class TSPlotSink(TSFrameCollectSink):
    """Frame-collecting sink with built-in plotting helpers.

    Collects frames like TSFrameCollectSink and adds a ``plot`` method
    for visualizing the collected data once the pipeline has completed.

    Example::

        sink = TSPlotSink(name="detector_data", sink_pad_names=("H1", "L1"))
        pipeline.connect(source, sink)
        pipeline.run()

        # Both pads overlaid, labeled "H1" and "L1"
        fig, ax = sink.plot(time_unit="s")
        ax.legend()
        plt.show()

    Args:
        name: Element name (used as default title for multi-pad plots)
        sink_pad_names: Names of input pads (used as default labels)
    """

    def plot(
        self,
        pads: Optional[Sequence[str]] = None,
        layout: Literal["overlay", "subplots"] = "overlay",
        labels: Optional[dict[str, str]] = None,
        title: Optional[str] = None,
        time_unit: Literal["s", "ms", "ns", "gps"] = "gps",
        show_gaps: bool = True,
        gap_color: str = "red",
        gap_alpha: float = 0.3,
        figsize: Optional[tuple[float, float]] = None,
        ax=None,
        **kwargs,
    ):
        """Draw the frames collected on the sink pads.

        Args:
            pads:
                Names of the pads to draw. Default: every pad, in the
                order the pads were defined.
            layout:
                Arrangement used when several pads are drawn:

                - ``"overlay"``: All pads on same axes (default)
                - ``"subplots"``: Vertical stack with shared x-axis

                With a single pad the overlay arrangement is always used.
            labels:
                Label overrides as ``{pad_name: label}``. Pads without
                an entry are labeled with their pad name.
            title:
                Figure title. Default: element name if multiple pads,
                None for single pad.
            time_unit:
                Time unit for x-axis: ``"s"``, ``"ms"``, ``"ns"``, or ``"gps"``.
            show_gaps:
                Show gap regions as shaded areas.
            gap_color:
                Color for gap shading.
            gap_alpha:
                Transparency for gap shading.
            figsize:
                Figure size as ``(width, height)``. Default: auto-calculated
                based on layout and number of pads.
            ax:
                Existing matplotlib axes to plot on. Only used when
                ``layout="overlay"``. If provided, plots on this axes
                instead of creating a new figure.
            **kwargs:
                Additional arguments passed to matplotlib ``plot()``.

        Returns:
            tuple: ``(fig, ax)`` for overlay layout, ``(fig, axes)`` for
            subplots layout where ``axes`` is a list.

        Raises:
            ValueError: If an unknown pad name is specified.
            ImportError: If matplotlib is not installed.
        """
        _check_matplotlib()

        collected = self.out_frames()

        # No explicit selection means every pad, in definition order
        selected = list(self.sink_pad_names if pads is None else pads)

        # Reject the first requested pad that was never collected
        known = set(collected)
        for candidate in selected:
            if candidate in known:
                continue
            raise ValueError(
                f"Unknown pad '{candidate}'. "
                f"Available pads: {sorted(known)}"
            )

        several = len(selected) > 1

        # Pads without a label override fall back to their own name
        overrides = {} if labels is None else labels
        captions = {pad: overrides.get(pad, pad) for pad in selected}

        # Only multi-pad plots get the element name as default title
        if title is None and several:
            title = self.name

        shared = dict(
            frames=collected,
            pads_to_plot=selected,
            pad_labels=captions,
            title=title,
            time_unit=time_unit,
            show_gaps=show_gaps,
            gap_color=gap_color,
            gap_alpha=gap_alpha,
            figsize=figsize,
        )

        # Separate subplots are only used when there is more than one pad
        if layout == "subplots" and several:
            return self._plot_subplots(**shared, **kwargs)
        return self._plot_overlay(**shared, ax=ax, **kwargs)

    def _plot_overlay(
        self,
        frames,
        pads_to_plot,
        pad_labels,
        title,
        time_unit,
        show_gaps,
        gap_color,
        gap_alpha,
        figsize,
        ax,
        **kwargs,
    ):
        """Draw every requested pad onto one shared axes."""
        plt = _check_matplotlib()
        from sgnts.plotting import plot_frame

        # Reuse the caller's axes when given, otherwise open a new figure
        if ax is not None:
            fig = ax.get_figure()
        else:
            fig, ax = plt.subplots(
                figsize=(10, 4) if figsize is None else figsize
            )

        style = dict(
            time_unit=time_unit,
            show_gaps=show_gaps,
            gap_color=gap_color,
            gap_alpha=gap_alpha,
        )
        for pad in pads_to_plot:
            plot_frame(
                frames[pad],
                ax=ax,
                label=pad_labels[pad],
                **style,
                **kwargs,
            )

        if title:
            ax.set_title(title)

        return fig, ax

    def _plot_subplots(
        self,
        frames,
        pads_to_plot,
        pad_labels,
        title,
        time_unit,
        show_gaps,
        gap_color,
        gap_alpha,
        figsize,
        **kwargs,
    ):
        """Draw every requested pad into a subplot of its own."""
        plt = _check_matplotlib()
        from sgnts.plotting import plot_frame

        count = len(pads_to_plot)

        # Default height grows with the number of stacked subplots
        if figsize is None:
            figsize = (10, 2.5 * count)

        fig, grid = plt.subplots(
            count, 1, sharex=True, figsize=figsize, squeeze=False
        )
        # squeeze=False yields a 2D array; keep the single column as a list
        axes = [row[0] for row in grid]

        style = dict(
            time_unit=time_unit,
            show_gaps=show_gaps,
            gap_color=gap_color,
            gap_alpha=gap_alpha,
        )
        for axis, pad in zip(axes, pads_to_plot):
            frame = frames[pad]
            caption = pad_labels[pad]
            plot_frame(frame, ax=axis, label=caption, **style, **kwargs)
            axis.set_ylabel(caption)

        if title:
            fig.suptitle(title)

        return fig, axes

plot(pads=None, layout='overlay', labels=None, title=None, time_unit='gps', show_gaps=True, gap_color='red', gap_alpha=0.3, figsize=None, ax=None, **kwargs)

Plot collected frames.

Parameters:

Name Type Description Default
pads Optional[Sequence[str]]

Which pads to plot. Default: all pads in order they were defined.

None
layout Literal['overlay', 'subplots']

How to arrange multiple pads:

  • "overlay": All pads on same axes (default)
  • "subplots": Vertical stack with shared x-axis
'overlay'
labels Optional[dict[str, str]]

Custom labels for pads as {pad_name: label}. Default: use pad names.

None
title Optional[str]

Figure title. Default: element name if multiple pads, None for single pad.

None
time_unit Literal['s', 'ms', 'ns', 'gps']

Time unit for x-axis: "s", "ms", "ns", or "gps".

'gps'
show_gaps bool

Show gap regions as shaded areas.

True
gap_color str

Color for gap shading.

'red'
gap_alpha float

Transparency for gap shading.

0.3
figsize Optional[tuple[float, float]]

Figure size as (width, height). Default: auto-calculated based on layout and number of pads.

None
ax

Existing matplotlib axes to plot on. Only used when layout="overlay". If provided, plots on this axes instead of creating a new figure.

None
**kwargs

Additional arguments passed to matplotlib plot().

{}

Returns:

Name Type Description
tuple

(fig, ax) for overlay layout, (fig, axes) for subplots layout where axes is a list.

Raises:

Type Description
ValueError

If an unknown pad name is specified.

ImportError

If matplotlib is not installed.

Source code in sgnts/sinks/plot.py
def plot(
    self,
    pads: Optional[Sequence[str]] = None,
    layout: Literal["overlay", "subplots"] = "overlay",
    labels: Optional[dict[str, str]] = None,
    title: Optional[str] = None,
    time_unit: Literal["s", "ms", "ns", "gps"] = "gps",
    show_gaps: bool = True,
    gap_color: str = "red",
    gap_alpha: float = 0.3,
    figsize: Optional[tuple[float, float]] = None,
    ax=None,
    **kwargs,
):
    """Draw the frames collected on the sink pads.

    Args:
        pads:
            Names of the pads to draw. Default: every pad, in the
            order the pads were defined.
        layout:
            Arrangement used when several pads are drawn:

            - ``"overlay"``: All pads on same axes (default)
            - ``"subplots"``: Vertical stack with shared x-axis

            With a single pad the overlay arrangement is always used.
        labels:
            Label overrides as ``{pad_name: label}``. Pads without
            an entry are labeled with their pad name.
        title:
            Figure title. Default: element name if multiple pads,
            None for single pad.
        time_unit:
            Time unit for x-axis: ``"s"``, ``"ms"``, ``"ns"``, or ``"gps"``.
        show_gaps:
            Show gap regions as shaded areas.
        gap_color:
            Color for gap shading.
        gap_alpha:
            Transparency for gap shading.
        figsize:
            Figure size as ``(width, height)``. Default: auto-calculated
            based on layout and number of pads.
        ax:
            Existing matplotlib axes to plot on. Only used when
            ``layout="overlay"``. If provided, plots on this axes
            instead of creating a new figure.
        **kwargs:
            Additional arguments passed to matplotlib ``plot()``.

    Returns:
        tuple: ``(fig, ax)`` for overlay layout, ``(fig, axes)`` for
        subplots layout where ``axes`` is a list.

    Raises:
        ValueError: If an unknown pad name is specified.
        ImportError: If matplotlib is not installed.
    """
    _check_matplotlib()

    collected = self.out_frames()

    # No explicit selection means every pad, in definition order
    selected = list(self.sink_pad_names if pads is None else pads)

    # Reject the first requested pad that was never collected
    known = set(collected)
    for candidate in selected:
        if candidate in known:
            continue
        raise ValueError(
            f"Unknown pad '{candidate}'. "
            f"Available pads: {sorted(known)}"
        )

    several = len(selected) > 1

    # Pads without a label override fall back to their own name
    overrides = {} if labels is None else labels
    captions = {pad: overrides.get(pad, pad) for pad in selected}

    # Only multi-pad plots get the element name as default title
    if title is None and several:
        title = self.name

    shared = dict(
        frames=collected,
        pads_to_plot=selected,
        pad_labels=captions,
        title=title,
        time_unit=time_unit,
        show_gaps=show_gaps,
        gap_color=gap_color,
        gap_alpha=gap_alpha,
        figsize=figsize,
    )

    # Separate subplots are only used when there is more than one pad
    if layout == "subplots" and several:
        return self._plot_subplots(**shared, **kwargs)
    return self._plot_overlay(**shared, ax=ax, **kwargs)