
sgnts.base

AdapterConfig dataclass

Config to hold parameters used for the audioadapter in _TSTransSink.

Parameters:

- enable (bool | None, default: None): controls whether adapter processing is enabled.
  - None: auto-detect based on configuration.
    - Disabled if all config values remain at defaults.
    - Enabled if any non-default config is provided.
    - Enabled if any configuration method is called.
  - True: force enable adapter processing.
  - False: force disable adapter processing.
- overlap (tuple[int, int], default: (0, 0)): the overlap before and after the data segment to process, in offsets.
- stride (int, default: 0): the stride to produce, in offsets.
- pad_zeros_startup (bool, default: False): when overlap is provided, whether to pad zeros in front of the first buffer, or wait until there is enough data.
- skip_gaps (bool, default: False): produce a whole gap buffer if there are any gaps in the copied data segment.
- backend (type[ArrayBackend], default: NumpyBackend): the ArrayBackend wrapper.
- align_to (int | None, default: None): alignment boundary in offsets. When set, output offsets are aligned to multiples of this value; None means no alignment. For example:
  - Offset.fromsec(1) aligns to integer seconds.
  - Offset.fromsamples(1024, rate) aligns to 1024-sample boundaries.
- align_buffers (bool, default: False): when True, aligns buffer slices to the minimum sampling rate across all pads. This expands gaps and shrinks data slices to ensure all buffers align to integer sample boundaries at the lowest rate.
- offset_shift (int, default: 0): offset shift to apply to output buffers, in offsets. This is used for transforms that introduce latency or phase shifts. The output offset is shifted by this amount: offset + offset_shift. Positive values shift forward in time, negative values shift backward. For example, a filter with latency=2 samples at rate=1 Hz would use offset_shift=-Offset.fromsamples(2, 1) to shift output backward by 2 samples. A value of 0 means no shift.
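The offset_shift arithmetic can be worked through numerically. The sketch below is only an illustration: MAX_RATE (the number of offsets per second) and the helper from_samples are hypothetical stand-ins for the real Offset machinery, whose constants may differ.

```python
# Hypothetical offset clock: number of offsets per second. This value is an
# assumption for the sketch, not taken from the sgnts source.
MAX_RATE = 16384


def from_samples(samples: int, rate: int) -> int:
    """Stand-in for Offset.fromsamples: convert a sample count to offsets."""
    return samples * (MAX_RATE // rate)


def shifted_offset(offset: int, offset_shift: int) -> int:
    """Apply the documented rule: output offset = offset + offset_shift."""
    return offset + offset_shift


# A filter with a latency of 2 samples at 1 Hz shifts its output backward.
latency_shift = -from_samples(2, 1)
input_offset = from_samples(10, 1)
output_offset = shifted_offset(input_offset, latency_shift)
```

Under these assumptions, a buffer that starts 10 samples in is reported as starting 8 samples in, which compensates for the filter latency.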
Source code in sgnts/base/audioadapter.py
@dataclass
class AdapterConfig:
    """Config to hold parameters used for the audioadapter in _TSTransSink.

    Args:
        enable:
            bool | None, controls whether adapter processing is enabled.
            - None (default): Auto-detect based on configuration
              - Disabled if all config values remain at defaults
              - Enabled if any non-default config is provided
              - Enabled if any configuration method is called
            - True: Force enable adapter processing
            - False: Force disable adapter processing
            Default: None
        overlap:
            tuple[int, int], the overlap before and after the data segment to process,
            in offsets
        stride:
            int, the stride to produce, in offsets
        pad_zeros_startup:
            bool, when overlap is provided, whether to pad zeros in front of the
            first buffer, or wait until there is enough data.
        skip_gaps:
            bool, produce a whole gap buffer if there are any gaps in the copied data
            segment
        backend:
            type[ArrayBackend], the ArrayBackend wrapper
        align_to:
            int or None, alignment boundary in offsets.
            When set, output offsets will be aligned to multiples of this value.
            For example:
            - Offset.fromsec(1) aligns to integer seconds
            - Offset.fromsamples(1024, rate) aligns to 1024-sample boundaries
            Default: None (no alignment)
        align_buffers:
            bool, when True, aligns buffer slices to the minimum sampling rate
            across all pads. This expands gaps and shrinks data slices to ensure
            all buffers align to integer sample boundaries at the lowest rate.
            Default: False
        offset_shift:
            int, offset shift to apply to output buffers, in offsets. This is
            used for transforms that introduce latency or phase shifts. The
            output offset will be shifted by this amount: offset +
            offset_shift. Positive values shift forward in time, negative
            values shift backward. For example, a filter with latency=2 samples
            at rate=1 Hz would use offset_shift=-Offset.fromsamples(2, 1) to
            shift output backward by 2 samples.
            Default: 0 (no shift)
    """

    enable: bool | None = None
    overlap: tuple[int, int] = (0, 0)
    stride: int = 0
    pad_zeros_startup: bool = False
    skip_gaps: bool = False
    backend: type[ArrayBackend] = NumpyBackend
    align_to: int | None = None
    align_buffers: bool = False
    offset_shift: int = 0

    @property
    def is_enabled(self) -> bool:
        """Check if adapter should be enabled.

        Returns:
            True if adapter is explicitly enabled or has non-default configuration.
            False if adapter is explicitly disabled or has all default values.
        """
        if self.enable is False:
            return False
        if self.enable is True:
            return True
        # enable is None - auto-detect based on configuration
        return (
            self.overlap != (0, 0)
            or self.stride != 0
            or self.pad_zeros_startup
            or self.skip_gaps
            or self.backend != NumpyBackend
            or self.align_to is not None
            or self.align_buffers
            or self.offset_shift != 0
        )

    def alignment(
        self,
        overlap: Optional[tuple[int, int]] = None,
        stride: Optional[int] = None,
        align_to: Optional[int] = None,
        align_buffers: Optional[bool] = None,
        shift: Optional[int] = None,
    ) -> AdapterConfig:
        """Configure alignment and buffering parameters.

        Enables the adapter when called.

        Args:
            overlap: tuple[int, int], the overlap before and after the data segment
            stride: int, the stride to produce, in offsets
            align_to: int, alignment boundary in offsets
            align_buffers: bool, align buffer slices to minimum sampling rate
            shift: int, offset shift to apply to output buffers

        Returns:
            AdapterConfig, self for method chaining
        """
        if self.enable is None:
            self.enable = True

        if overlap is not None:
            self.overlap = overlap
        if stride is not None:
            self.stride = stride
        if align_to is not None:
            self.align_to = align_to
        if align_buffers is not None:
            self.align_buffers = align_buffers
        if shift is not None:
            self.offset_shift = shift
        return self

    def on_gap(self, skip: Optional[bool] = None) -> AdapterConfig:
        """Configure gap handling.

        Enables the adapter when called.

        Args:
            skip: bool, produce a whole gap buffer if there are any gaps

        Returns:
            AdapterConfig, self for method chaining
        """
        if self.enable is None:
            self.enable = True

        if skip is not None:
            self.skip_gaps = skip
        return self

    def on_startup(self, pad_zeros: Optional[bool] = None) -> AdapterConfig:
        """Configure startup behavior.

        Enables the adapter when called.

        Args:
            pad_zeros: bool, whether to pad zeros in front of the first buffer

        Returns:
            AdapterConfig, self for method chaining
        """
        if self.enable is None:
            self.enable = True

        if pad_zeros is not None:
            self.pad_zeros_startup = pad_zeros
        return self

    def valid_buffer(self, buf, data: Optional[Union[int, Array]] = 0):
        """
        Return a new buffer corresponding to the non-overlapping part of a
        buffer "buf", as defined by this class's overlap properties. As a
        special case, if the buffer has shape zero (a heartbeat buffer), a new
        heartbeat buffer is returned with the offsets shifted by overlap[0].
        Otherwise, for the buffer to be valid, its shape must match what is
        expected from the adapter's overlap and stride.
        """

        if buf.shape == (0,):
            new_slice = TSSlice(
                buf.slice[0] + self.overlap[0], buf.slice[0] + self.overlap[0]
            )
            return buf.new(new_slice, data=None)
        else:
            expected_shape = (
                Offset.tosamples(self.overlap[0], buf.sample_rate)
                + Offset.tosamples(self.overlap[1], buf.sample_rate)
                + Offset.sample_stride(buf.sample_rate),
            )
            assert buf.shape == expected_shape
            new_slice = TSSlice(
                buf.slice[0] + self.overlap[0], buf.slice[1] - self.overlap[1]
            )
            return buf.new(new_slice, data)

is_enabled property

Check if adapter should be enabled.

Returns:

- bool: True if the adapter is explicitly enabled or has non-default configuration; False if the adapter is explicitly disabled or has all default values.
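The tri-state behaviour of enable can be illustrated with a reduced model. MiniConfig below is a hypothetical stand-in that keeps only three of the AdapterConfig fields; it is a sketch of the decision rule, not the library class.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class MiniConfig:
    """Hypothetical stand-in with a subset of the AdapterConfig fields."""

    enable: Optional[bool] = None
    overlap: Tuple[int, int] = (0, 0)
    stride: int = 0

    @property
    def is_enabled(self) -> bool:
        # An explicit True or False always wins.
        if self.enable is not None:
            return self.enable
        # enable is None: enabled only if some field differs from its default.
        return self.overlap != (0, 0) or self.stride != 0


all_defaults = MiniConfig()                        # auto-detect: disabled
non_default = MiniConfig(stride=16)                # auto-detect: enabled
forced_off = MiniConfig(enable=False, stride=16)   # explicit False wins
forced_on = MiniConfig(enable=True)                # explicit True wins
```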

alignment(overlap=None, stride=None, align_to=None, align_buffers=None, shift=None)

Configure alignment and buffering parameters.

Enables the adapter when called.

Parameters:

- overlap (Optional[tuple[int, int]], default: None): the overlap before and after the data segment.
- stride (Optional[int], default: None): the stride to produce, in offsets.
- align_to (Optional[int], default: None): alignment boundary in offsets.
- align_buffers (Optional[bool], default: None): align buffer slices to the minimum sampling rate.
- shift (Optional[int], default: None): offset shift to apply to output buffers.

Returns:

- AdapterConfig: self, for method chaining.
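Because each configuration method returns self, calls can be chained. The sketch below uses ChainConfig, a hypothetical stand-in that mirrors two of the methods with fewer parameters, to show three details visible in the source: an argument left as None keeps the current value, a call turns enable from None into True, and an explicit enable=False is not overridden.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ChainConfig:
    """Hypothetical stand-in mirroring two of the configuration methods."""

    enable: Optional[bool] = None
    stride: int = 0
    skip_gaps: bool = False

    def alignment(self, stride: Optional[int] = None) -> "ChainConfig":
        # A call turns auto-detect (None) into True, but leaves an
        # explicit False untouched.
        if self.enable is None:
            self.enable = True
        # None means "keep the current value".
        if stride is not None:
            self.stride = stride
        return self

    def on_gap(self, skip: Optional[bool] = None) -> "ChainConfig":
        if self.enable is None:
            self.enable = True
        if skip is not None:
            self.skip_gaps = skip
        return self


chained = ChainConfig().alignment(stride=16).on_gap(skip=True)
untouched = ChainConfig(stride=8).alignment()
still_off = ChainConfig(enable=False).alignment(stride=16)
```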

Source code in sgnts/base/audioadapter.py
def alignment(
    self,
    overlap: Optional[tuple[int, int]] = None,
    stride: Optional[int] = None,
    align_to: Optional[int] = None,
    align_buffers: Optional[bool] = None,
    shift: Optional[int] = None,
) -> AdapterConfig:
    """Configure alignment and buffering parameters.

    Enables the adapter when called.

    Args:
        overlap: tuple[int, int], the overlap before and after the data segment
        stride: int, the stride to produce, in offsets
        align_to: int, alignment boundary in offsets
        align_buffers: bool, align buffer slices to minimum sampling rate
        shift: int, offset shift to apply to output buffers

    Returns:
        AdapterConfig, self for method chaining
    """
    if self.enable is None:
        self.enable = True

    if overlap is not None:
        self.overlap = overlap
    if stride is not None:
        self.stride = stride
    if align_to is not None:
        self.align_to = align_to
    if align_buffers is not None:
        self.align_buffers = align_buffers
    if shift is not None:
        self.offset_shift = shift
    return self

on_gap(skip=None)

Configure gap handling.

Enables the adapter when called.

Parameters:

- skip (Optional[bool], default: None): produce a whole gap buffer if there are any gaps.

Returns:

- AdapterConfig: self, for method chaining.

Source code in sgnts/base/audioadapter.py
def on_gap(self, skip: Optional[bool] = None) -> AdapterConfig:
    """Configure gap handling.

    Enables the adapter when called.

    Args:
        skip: bool, produce a whole gap buffer if there are any gaps

    Returns:
        AdapterConfig, self for method chaining
    """
    if self.enable is None:
        self.enable = True

    if skip is not None:
        self.skip_gaps = skip
    return self

on_startup(pad_zeros=None)

Configure startup behavior.

Enables the adapter when called.

Parameters:

- pad_zeros (Optional[bool], default: None): whether to pad zeros in front of the first buffer.

Returns:

- AdapterConfig: self, for method chaining.

Source code in sgnts/base/audioadapter.py
def on_startup(self, pad_zeros: Optional[bool] = None) -> AdapterConfig:
    """Configure startup behavior.

    Enables the adapter when called.

    Args:
        pad_zeros: bool, whether to pad zeros in front of the first buffer

    Returns:
        AdapterConfig, self for method chaining
    """
    if self.enable is None:
        self.enable = True

    if pad_zeros is not None:
        self.pad_zeros_startup = pad_zeros
    return self

valid_buffer(buf, data=0)

Return a new buffer corresponding to the non-overlapping part of a buffer "buf", as defined by this class's overlap properties. As a special case, if the buffer has shape zero (a heartbeat buffer), a new heartbeat buffer is returned with the offsets shifted by overlap[0]. Otherwise, for the buffer to be valid, its shape must match what is expected from the adapter's overlap and stride.
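The shape check and the trimming can be sketched with plain integers. Everything named here is hypothetical: MAX_RATE stands for an assumed offset clock, to_samples stands in for Offset.tosamples, and the stride is passed explicitly, whereas the source obtains it from Offset.sample_stride.

```python
# Hypothetical offset clock: offsets per second (an assumption for this sketch).
MAX_RATE = 16384


def to_samples(offset: int, rate: int) -> int:
    """Stand-in for Offset.tosamples: convert offsets to samples at `rate`."""
    return offset // (MAX_RATE // rate)


def expected_samples(overlap, stride, rate):
    """Samples a full buffer must hold: both overlaps plus one stride."""
    before, after = overlap
    return (
        to_samples(before, rate)
        + to_samples(after, rate)
        + to_samples(stride, rate)
    )


def trimmed_slice(buf_slice, overlap):
    """Drop the overlap from each end of a (start, stop) offset slice."""
    start, stop = buf_slice
    return (start + overlap[0], stop - overlap[1])


def heartbeat_slice(buf_slice, overlap):
    """Zero-length slice shifted by the leading overlap."""
    start = buf_slice[0] + overlap[0]
    return (start, start)


rate = 2048
overlap = (MAX_RATE // 4, MAX_RATE // 8)  # 0.25 s before, 0.125 s after
stride = MAX_RATE                         # 1 s of output per buffer
n = expected_samples(overlap, stride, rate)
valid = trimmed_slice((0, overlap[0] + stride + overlap[1]), overlap)
beat = heartbeat_slice((0, 0), overlap)
```

With these numbers the input buffer holds 512 + 256 + 2048 samples, and the trimmed slice spans exactly one stride.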

Source code in sgnts/base/audioadapter.py
def valid_buffer(self, buf, data: Optional[Union[int, Array]] = 0):
    """
    Return a new buffer corresponding to the non-overlapping part of a
    buffer "buf", as defined by this class's overlap properties. As a
    special case, if the buffer has shape zero (a heartbeat buffer), a new
    heartbeat buffer is returned with the offsets shifted by overlap[0].
    Otherwise, for the buffer to be valid, its shape must match what is
    expected from the adapter's overlap and stride.
    """

    if buf.shape == (0,):
        new_slice = TSSlice(
            buf.slice[0] + self.overlap[0], buf.slice[0] + self.overlap[0]
        )
        return buf.new(new_slice, data=None)
    else:
        expected_shape = (
            Offset.tosamples(self.overlap[0], buf.sample_rate)
            + Offset.tosamples(self.overlap[1], buf.sample_rate)
            + Offset.sample_stride(buf.sample_rate),
        )
        assert buf.shape == expected_shape
        new_slice = TSSlice(
            buf.slice[0] + self.overlap[0], buf.slice[1] - self.overlap[1]
        )
        return buf.new(new_slice, data)

TSTransform dataclass

Bases: TimeSeriesMixin[TSFrame], TransformElement[TimeSpanFrame]


Inheritance diagram: sgnts.base.base.TimeSeriesMixin --> sgnts.base.TSTransform

A time-series transform element.

Source code in sgnts/base/base.py
@dataclass
class TSTransform(TimeSeriesMixin[TSFrame], TransformElement[TimeSpanFrame]):
    """A time-series transform element."""

    def internal(self) -> None:
        """Process frames by calling child class implementation.

        If the child class defines a process() method, it will be called with
        input and output frame dictionaries. Otherwise, child classes should
        override internal() directly.
        """
        super().internal()

        # Check if the element defines a process() method
        if hasattr(self, "process"):
            # Collect all input frames (both TSFrame and EventFrame)
            inframes: dict[SinkPad, TimeSpanFrame] = {}
            inframes.update(self.next_ts_inputs())
            inframes.update(self.next_event_inputs())

            # Collect all output collectors/frames (TSCollectFrame or EventFrame)
            ts_collectors = self.next_ts_outputs()
            outframes: dict[SourcePad, TimeSpanFrame | TSCollectFrame] = {}
            outframes.update(ts_collectors)
            outframes.update(self.next_event_outputs())

            # Call the process method
            self.process(inframes, outframes)  # type: ignore[attr-defined]

            # Close all TS collectors to commit buffers to parent frames
            for collector in ts_collectors.values():
                collector.close()

    def new(self, pad: SourcePad) -> TimeSpanFrame:
        """Return the output frame for the given pad.

        It should take the source pad as an argument and return a new
        TSFrame or EventFrame.

        Args:
            pad:
                SourcePad, The source pad that is producing the transformed frame

        Returns:
            TSFrame or EventFrame, The transformed frame

        """
        frame = self.outframes.get(pad)
        assert frame is not None
        return frame

internal()

Process frames by calling child class implementation.

If the child class defines a process() method, it will be called with input and output frame dictionaries. Otherwise, child classes should override internal() directly.
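The process() hook can be sketched without the pipeline machinery. MiniTransform and Collector below are hypothetical stand-ins; they only reproduce the order of operations seen in the source: check for process(), call it with the input and output dictionaries, then close every collector.

```python
class Collector:
    """Hypothetical stand-in for an output collector such as TSCollectFrame."""

    def __init__(self):
        self.items = []
        self.closed = False

    def append(self, item):
        self.items.append(item)

    def close(self):
        # In the real element, closing commits buffers to the parent frame.
        self.closed = True


class MiniTransform:
    """Hypothetical base class reproducing the process() hook pattern."""

    def __init__(self, inputs):
        self.inputs = inputs
        self.collectors = {"out": Collector()}

    def internal(self):
        # Dispatch only if the subclass defines process().
        if hasattr(self, "process"):
            self.process(self.inputs, self.collectors)
            # Close every collector after process() returns.
            for collector in self.collectors.values():
                collector.close()


class Doubler(MiniTransform):
    def process(self, inframes, outframes):
        for value in inframes["in"]:
            outframes["out"].append(2 * value)


element = Doubler({"in": [1, 2, 3]})
element.internal()

# A subclass without process() gets no dispatch and no collector closing.
plain = MiniTransform({"in": []})
plain.internal()
```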

Source code in sgnts/base/base.py
def internal(self) -> None:
    """Process frames by calling child class implementation.

    If the child class defines a process() method, it will be called with
    input and output frame dictionaries. Otherwise, child classes should
    override internal() directly.
    """
    super().internal()

    # Check if the element defines a process() method
    if hasattr(self, "process"):
        # Collect all input frames (both TSFrame and EventFrame)
        inframes: dict[SinkPad, TimeSpanFrame] = {}
        inframes.update(self.next_ts_inputs())
        inframes.update(self.next_event_inputs())

        # Collect all output collectors/frames (TSCollectFrame or EventFrame)
        ts_collectors = self.next_ts_outputs()
        outframes: dict[SourcePad, TimeSpanFrame | TSCollectFrame] = {}
        outframes.update(ts_collectors)
        outframes.update(self.next_event_outputs())

        # Call the process method
        self.process(inframes, outframes)  # type: ignore[attr-defined]

        # Close all TS collectors to commit buffers to parent frames
        for collector in ts_collectors.values():
            collector.close()

new(pad)

Return the output frame for the given pad.

It should take the source pad as an argument and return a new TSFrame or EventFrame.

Parameters:

- pad (SourcePad, required): the source pad that is producing the transformed frame.

Returns:

- TimeSpanFrame: the transformed frame, a TSFrame or EventFrame.

Source code in sgnts/base/base.py
def new(self, pad: SourcePad) -> TimeSpanFrame:
    """Return the output frame for the given pad.

    It should take the source pad as an argument and return a new
    TSFrame or EventFrame.

    Args:
        pad:
            SourcePad, The source pad that is producing the transformed frame

    Returns:
        TSFrame or EventFrame, The transformed frame

    """
    frame = self.outframes.get(pad)
    assert frame is not None
    return frame

TSSink dataclass

Bases: TimeSeriesMixin[TSFrame], SinkElement[TimeSpanFrame]


Inheritance diagram: sgnts.base.base.TimeSeriesMixin --> sgnts.base.TSSink

A time-series sink element.

Source code in sgnts/base/base.py
@dataclass
class TSSink(TimeSeriesMixin[TSFrame], SinkElement[TimeSpanFrame]):
    """A time-series sink element."""

    def internal(self) -> None:
        """Process frames by calling child class implementation.

        If the child class defines a process() method, it will be called with
        input frame dictionaries. Otherwise, child classes should override
        internal() directly.
        """
        super().internal()

        # Check if the element defines a process() method
        if hasattr(self, "process"):
            # Collect all input frames (both TSFrame and EventFrame)
            inframes: dict[SinkPad, TimeSpanFrame] = {}
            inframes.update(self.next_ts_inputs())
            inframes.update(self.next_event_inputs())

            # Call the process method
            self.process(inframes)  # type: ignore[attr-defined]

internal()

Process frames by calling child class implementation.

If the child class defines a process() method, it will be called with input frame dictionaries. Otherwise, child classes should override internal() directly.
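The two ways of customising a sink can be contrasted in a small sketch. MiniSink is a hypothetical stand-in for the dispatch in internal(); the subclasses show one element that defines process() and one that overrides internal() directly.

```python
class MiniSink:
    """Hypothetical stand-in for the sink-side dispatch in internal()."""

    def __init__(self):
        self.pending = {}

    def internal(self):
        # A sink has no outputs, so process() receives only the inputs.
        if hasattr(self, "process"):
            self.process(dict(self.pending))


class SummingSink(MiniSink):
    """Option 1: define process() and let the base class call it."""

    def __init__(self):
        super().__init__()
        self.total = 0

    def process(self, inframes):
        self.total += sum(inframes.values())


class CountingSink(MiniSink):
    """Option 2: override internal() directly and skip the hook."""

    def __init__(self):
        super().__init__()
        self.calls = 0

    def internal(self):
        super().internal()  # no process() defined, so nothing is dispatched
        self.calls += 1


summing = SummingSink()
summing.pending = {"a": 1, "b": 2}
summing.internal()

counting = CountingSink()
counting.internal()
```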

Source code in sgnts/base/base.py
def internal(self) -> None:
    """Process frames by calling child class implementation.

    If the child class defines a process() method, it will be called with
    input frame dictionaries. Otherwise, child classes should override
    internal() directly.
    """
    super().internal()

    # Check if the element defines a process() method
    if hasattr(self, "process"):
        # Collect all input frames (both TSFrame and EventFrame)
        inframes: dict[SinkPad, TimeSpanFrame] = {}
        inframes.update(self.next_ts_inputs())
        inframes.update(self.next_event_inputs())

        # Call the process method
        self.process(inframes)  # type: ignore[attr-defined]

TSSource dataclass

Bases: _TSSource


Inheritance diagram: sgnts.base.base._TSSource --> sgnts.base.TSSource

A time-series source that generates data in fixed-size buffers where the user can specify the start time and end time. If you want a data driven source consider using TSResourceSource.

Parameters:

- t0 (float | None, default: None): start time of the first buffer, in seconds. Required in practice: a ValueError is raised if it is left as None.
- end (float | None, default: None): end time of the last buffer, in seconds.
- duration (float | None, default: None): alternative to the end option; the duration of time to be covered, in seconds. Cannot be given if end is given.
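The interplay between t0, end, and duration can be sketched with a reduced dataclass. MiniSource is a hypothetical stand-in that reproduces only the checks in __post_init__; the final check raises ValueError here, whereas the source uses an assert.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MiniSource:
    """Hypothetical stand-in reproducing the t0/end/duration checks."""

    t0: Optional[float] = None
    end: Optional[float] = None
    duration: Optional[float] = None

    def __post_init__(self):
        if self.t0 is None:
            raise ValueError("You must specify a t0")
        if self.end is not None and self.duration is not None:
            raise ValueError("may specify either end or duration, not both")
        # duration is converted into an absolute end time.
        if self.duration is not None:
            self.end = self.t0 + self.duration
        # The source asserts here; this sketch raises instead.
        if self.end is not None and self.end <= self.t0:
            raise ValueError("end is before t0")


def try_build(**kwargs):
    """Return the error message, or None if construction succeeds."""
    try:
        MiniSource(**kwargs)
    except ValueError as err:
        return str(err)
    return None


from_duration = MiniSource(t0=10.0, duration=5.0)  # end becomes 15.0
open_ended = MiniSource(t0=10.0)                   # end stays None
```

When neither end nor duration is given, end stays None, which the end_offset property in the source maps to an infinite end offset.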
Source code in sgnts/base/base.py
@dataclass
class TSSource(_TSSource):
    """A time-series source that generates data in fixed-size buffers where the
       user can specify the start time and end time. If you want a data driven
       source consider using TSResourceSource.

    Args:
        t0:
            float, start time of first buffer, in seconds
        end:
            float, end time of the last buffer, in seconds
        duration:
            float, alternative to end option, specify the duration of
            time to be covered in seconds. Cannot be given if end is given.
    """

    t0: float | None = None
    end: float | None = None
    duration: float | None = None

    def __post_init__(self):
        super().__post_init__()

        if self.t0 is None:
            raise ValueError("You must specify a t0")

        if self.end is not None and self.duration is not None:
            raise ValueError("may specify either end or duration, not both")

        if self.duration is not None:
            self.end = self.t0 + self.duration

        if self.end is not None:
            assert self.end > self.t0, "end is before t0"

    @property
    def end_offset(self):
        if self.end is None:
            return float("inf")
        return Offset.fromsec(self.end - Offset.offset_ref_t0 / Time.SECONDS)

    @property
    def start_offset(self):
        assert self.t0 is not None
        return Offset.fromsec(self.t0 - Offset.offset_ref_t0 / Time.SECONDS)

    def set_pad_buffer_params(
        self,
        pad: SourcePad,
        sample_shape: tuple[int, ...],
        rate: int,
    ) -> None:
        """Set variables on the pad that are needed to construct SeriesBuffers.

        These should remain constant throughout the duration of the
        pipeline so this method may only be called once.

        Args:
            pad:
                SourcePad, the pad to setup buffers on
            sample_shape:
                tuple[int, ...], the shape of a single sample of the
                data, or put another way, the shape of the data except
                for the last (time) dimension,
                i.e. sample_shape=data.shape[:-1]
            rate:
                int, the sample rate of the data the pad will produce

        """
        # Make sure this has only been called once per pad
        assert (
            pad not in self._new_buffer_dict
        ), f"Pad {pad.name} already exists in _new_buffer_dict - duplicate pad entry"

        self._new_buffer_dict[pad] = {
            "sample_rate": rate,
            "shape": sample_shape + (self.num_samples(rate),),
        }
        self._next_frame_dict[pad] = TSFrame.from_buffer_kwargs(
            offset=self.start_offset, data=None, **self._new_buffer_dict[pad]
        )

set_pad_buffer_params(pad, sample_shape, rate)

Set variables on the pad that are needed to construct SeriesBuffers.

These should remain constant throughout the duration of the pipeline, so this method may only be called once per pad.

Parameters:

- pad (SourcePad, required): the pad to set up buffers on.
- sample_shape (tuple[int, ...], required): the shape of a single sample of the data, or put another way, the shape of the data except for the last (time) dimension, i.e. sample_shape=data.shape[:-1].
- rate (int, required): the sample rate of the data the pad will produce.
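How the buffer shape is assembled can be sketched with plain tuples. MiniBufferParams is a hypothetical stand-in: pads are plain strings, the duplicate-pad guard raises ValueError instead of asserting, and num_samples is assumed to be rate times a fixed buffer duration, since the real num_samples is not shown on this page.

```python
class MiniBufferParams:
    """Hypothetical stand-in for the per-pad bookkeeping."""

    def __init__(self, buffer_duration: int = 1):
        # Assumed: every buffer spans `buffer_duration` seconds.
        self.buffer_duration = buffer_duration
        self.params = {}

    def num_samples(self, rate: int) -> int:
        # Assumed definition, for illustration only.
        return rate * self.buffer_duration

    def set_pad_buffer_params(self, pad: str, sample_shape: tuple, rate: int) -> None:
        # Parameters are fixed for the life of the pipeline: one call per pad.
        if pad in self.params:
            raise ValueError(f"Pad {pad} already configured")
        self.params[pad] = {
            "sample_rate": rate,
            # Time is the last axis, appended to the per-sample shape.
            "shape": sample_shape + (self.num_samples(rate),),
        }


source = MiniBufferParams()
source.set_pad_buffer_params("H1", sample_shape=(), rate=2048)        # 1-D series
source.set_pad_buffer_params("multi", sample_shape=(2, 3), rate=512)  # 2x3 per sample
```

An empty sample_shape describes a one-dimensional time series, so the resulting shape is just the number of samples per buffer.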
Source code in sgnts/base/base.py
def set_pad_buffer_params(
    self,
    pad: SourcePad,
    sample_shape: tuple[int, ...],
    rate: int,
) -> None:
    """Set variables on the pad that are needed to construct SeriesBuffers.

    These should remain constant throughout the duration of the
    pipeline so this method may only be called once.

    Args:
        pad:
            SourcePad, the pad to setup buffers on
        sample_shape:
            tuple[int, ...], the shape of a single sample of the
            data, or put another way, the shape of the data except
            for the last (time) dimension,
            i.e. sample_shape=data.shape[:-1]
        rate:
            int, the sample rate of the data the pad will produce

    """
    # Make sure this has only been called once per pad
    assert (
        pad not in self._new_buffer_dict
    ), f"Pad {pad.name} already exists in _new_buffer_dict - duplicate pad entry"

    self._new_buffer_dict[pad] = {
        "sample_rate": rate,
        "shape": sample_shape + (self.num_samples(rate),),
    }
    self._next_frame_dict[pad] = TSFrame.from_buffer_kwargs(
        offset=self.start_offset, data=None, **self._new_buffer_dict[pad]
    )