Skip to content

sgnts.sinks

NullSeriesSink dataclass

Bases: TSSink


              flowchart TD
              sgnts.sinks.NullSeriesSink[NullSeriesSink]
              sgnts.base.base.TSSink[TSSink]
              sgnts.base.base.TimeSeriesMixin[TimeSeriesMixin]

                              sgnts.base.base.TSSink --> sgnts.sinks.NullSeriesSink
                                sgnts.base.base.TimeSeriesMixin --> sgnts.base.base.TSSink
                



              click sgnts.sinks.NullSeriesSink href "" "sgnts.sinks.NullSeriesSink"
              click sgnts.base.base.TSSink href "" "sgnts.base.base.TSSink"
              click sgnts.base.base.TimeSeriesMixin href "" "sgnts.base.base.TimeSeriesMixin"
            

A series sink that does precisely nothing.

Parameters:

Name Type Description Default
verbose bool

bool, print frames as they pass through the internal pad

False
Source code in sgnts/sinks/null.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@dataclass
class NullSeriesSink(TSSink):
    """A series sink that consumes frames and discards their contents.

    Args:
        verbose:
            bool, print each frame (and a latency figure) as it arrives on
            a sink pad

    """

    verbose: bool = False

    def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
        """Mark EOS on pads whose frame carries it; print frames if verbose.

        Args:
            input_frames:
                dict[SinkPad, TSFrame], the incoming frame for each sink pad
        """
        for pad, ts_frame in input_frames.items():
            if ts_frame.EOS:
                self.mark_eos(pad)

            # Nothing else to do for a null sink unless diagnostics are on.
            if not self.verbose:
                continue

            print(f"{pad.name}:")
            print(f"  {ts_frame}")
            # Latency is "now" minus the time of the frame offset advanced
            # by Offset.SAMPLE_STRIDE_AT_MAX_RATE (presumably one sample at
            # the maximum rate -- confirm against Offset).
            now = gpsnow()
            frame_time = Offset.tosec(
                ts_frame.offset + Offset.SAMPLE_STRIDE_AT_MAX_RATE
            )
            print(f"  latency: {now - frame_time} s")

process(input_frames)

Mark EOS on any pad whose frame carries it, and print frames if verbose.

Source code in sgnts/sinks/null.py
21
22
23
24
25
26
27
28
29
30
31
32
def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
    """Mark EOS on pads whose frame carries it; print frames if verbose.

    Args:
        input_frames:
            dict[SinkPad, TSFrame], the incoming frame for each sink pad
    """
    for pad, ts_frame in input_frames.items():
        if ts_frame.EOS:
            self.mark_eos(pad)

        # Nothing else to do for a null sink unless diagnostics are on.
        if not self.verbose:
            continue

        print(f"{pad.name}:")
        print(f"  {ts_frame}")
        # Latency is "now" minus the time of the frame offset advanced by
        # Offset.SAMPLE_STRIDE_AT_MAX_RATE (presumably one sample at the
        # maximum rate -- confirm against Offset).
        now = gpsnow()
        frame_time = Offset.tosec(
            ts_frame.offset + Offset.SAMPLE_STRIDE_AT_MAX_RATE
        )
        print(f"  latency: {now - frame_time} s")

DumpSeriesSink dataclass

Bases: TSSink


              flowchart TD
              sgnts.sinks.DumpSeriesSink[DumpSeriesSink]
              sgnts.base.base.TSSink[TSSink]
              sgnts.base.base.TimeSeriesMixin[TimeSeriesMixin]

                              sgnts.base.base.TSSink --> sgnts.sinks.DumpSeriesSink
                                sgnts.base.base.TimeSeriesMixin --> sgnts.base.base.TSSink
                



              click sgnts.sinks.DumpSeriesSink href "" "sgnts.sinks.DumpSeriesSink"
              click sgnts.base.base.TSSink href "" "sgnts.base.base.TSSink"
              click sgnts.base.base.TimeSeriesMixin href "" "sgnts.base.base.TimeSeriesMixin"
            

A sink element that dumps time series data to a txt file.

Parameters:

Name Type Description Default
fname str

str, output file name

required
verbose bool

bool, be verbose

False
Source code in sgnts/sinks/dump.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass(kw_only=True)
class DumpSeriesSink(TSSink):
    """A sink element that dumps time series data to a txt file.

    The output file is truncated when the element is configured and then
    appended to once per non-gap buffer.

    Args:
        fname:
            str, output file name
        verbose:
            bool, be verbose (print every incoming frame)
    """

    fname: str
    verbose: bool = False

    def configure(self) -> None:
        """Create the output file, discarding any previous contents."""
        # overwrite existing file
        with open(self.fname, "w"):
            pass

    @validator.single_pad
    def validate(self) -> None:
        """No extra checks here; the decorator carries the validation.

        NOTE(review): presumably `validator.single_pad` enforces that the
        element has exactly one pad -- confirm against the validator module.
        """
        pass

    def write_to_file(self, buf) -> None:
        """Write time series data to txt file.

        The buffer data is flattened to two dimensions (all leading axes
        collapsed, last axis kept) and appended to the file as one text row
        per sample: the first column is the sample time, followed by one
        column per flattened series.

        Args:
            buf:
                SeriesBuffer, the buffer with time series data to write out
        """
        t0 = buf.t0
        duration = buf.duration
        data = buf.data
        # FIXME: How to write multi-dimensional data?
        # For now every leading axis is collapsed into a single one.
        data = data.reshape(-1, data.shape[-1])
        # One time stamp per sample, covering [t0, t0 + duration); dividing
        # by Time.SECONDS presumably converts to seconds -- confirm.
        ts = np.linspace(
            t0 / Time.SECONDS,
            (t0 + duration) / Time.SECONDS,
            data.shape[-1],
            endpoint=False,
        )
        # Stack times on top of the data, then transpose to sample-per-row.
        out = np.vstack([ts, data]).T
        # Binary append mode: savetxt accepts a file handle and earlier
        # buffers already written to the file are kept.
        with open(self.fname, "ab") as f:
            np.savetxt(f, out)

    @sink.single_pad
    def process(self, input_frame: TSFrame) -> None:
        """Write out time-series data.

        Marks EOS on the first sink pad when the frame carries it, prints the
        frame when verbose, and writes every non-gap buffer to the file.

        Args:
            input_frame:
                TSFrame, the incoming frame
        """
        if input_frame.EOS:
            self.mark_eos(self.sink_pads[0])
        # Truthiness test rather than `is True`, matching NullSeriesSink.
        if self.verbose:
            print(input_frame)
        for buf in input_frame:
            # Gap buffers are skipped; only non-gap buffers are written out.
            if not buf.is_gap:
                self.write_to_file(buf)

process(input_frame)

Write out time-series data.

Source code in sgnts/sinks/dump.py
55
56
57
58
59
60
61
62
63
64
@sink.single_pad
def process(self, input_frame: TSFrame) -> None:
    """Write out time-series data.

    Marks EOS on the first sink pad when the frame carries it, prints the
    frame when verbose, and writes every non-gap buffer to the file.

    Args:
        input_frame:
            TSFrame, the incoming frame
    """
    if input_frame.EOS:
        self.mark_eos(self.sink_pads[0])
    # Truthiness test rather than `is True`, matching NullSeriesSink.
    if self.verbose:
        print(input_frame)
    for buf in input_frame:
        # Gap buffers are skipped; only non-gap buffers are written out.
        if not buf.is_gap:
            self.write_to_file(buf)

write_to_file(buf)

Write time series data to txt file.

Parameters:

Name Type Description Default
buf

SeriesBuffer, the buffer with time series data to write out

required
Source code in sgnts/sinks/dump.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def write_to_file(self, buf) -> None:
    """Append one buffer of time series data to the output txt file.

    The buffer data is flattened to two dimensions (leading axes collapsed,
    last axis kept) and written as one text row per sample: the sample time
    in the first column, then one column per flattened series.

    Args:
        buf:
            SeriesBuffer, the buffer with time series data to write out
    """
    start, span, samples = buf.t0, buf.duration, buf.data
    # FIXME: How to write multi-dimensional data?
    # For now every leading axis is collapsed into a single one.
    rows = samples.reshape(-1, samples.shape[-1])
    n_samples = rows.shape[-1]
    # One time stamp per sample over [t0, t0 + duration); dividing by
    # Time.SECONDS presumably converts to seconds -- confirm.
    first_time = start / Time.SECONDS
    end_time = (start + span) / Time.SECONDS
    times = np.linspace(first_time, end_time, n_samples, endpoint=False)
    # Times stacked above the data, transposed so each sample is one row.
    table = np.vstack([times, rows]).T
    with open(self.fname, "ab") as out_file:
        np.savetxt(out_file, table)