Skip to content

sgnts.transforms.resampler

Resampler dataclass

Bases: TSTransform


              flowchart TD
              sgnts.transforms.resampler.Resampler[Resampler]
              sgnts.base.base.TSTransform[TSTransform]
              sgnts.base.base.TimeSeriesMixin[TimeSeriesMixin]

                              sgnts.base.base.TSTransform --> sgnts.transforms.resampler.Resampler
                                sgnts.base.base.TimeSeriesMixin --> sgnts.base.base.TSTransform
                



              click sgnts.transforms.resampler.Resampler href "" "sgnts.transforms.resampler.Resampler"
              click sgnts.base.base.TSTransform href "" "sgnts.base.base.TSTransform"
              click sgnts.base.base.TimeSeriesMixin href "" "sgnts.base.base.TimeSeriesMixin"
            

Up/down samples time-series data

Parameters:

Name Type Description Default
inrate int

int, sample rate of the input frames

required
outrate int

int, sample rate of the output frames

required
backend type[ArrayBackend]

type[ArrayBackend], default NumpyBackend, a wrapper around array operations

NumpyBackend
gstlal_norm bool

boolean: If true it will normalize consistent with SGNL filter matching. If false it have a slightly more accurate normalization

True
use_gstlal_cpu_upsample bool

boolean: If true, use fast C-based gstlal implementation for upsampling

False
Source code in sgnts/transforms/resampler.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
@dataclass(kw_only=True)
class Resampler(TSTransform):
    """Up/down samples time-series data

    Args:
        inrate:
            int, sample rate of the input frames
        outrate:
            int, sample rate of the output frames
        backend:
            type[ArrayBackend], default NumpyBackend, a wrapper around array operations
        gstlal_norm:
            boolean: If true it will normalize consistent with SGNL
            filter matching. If false it have a slightly more accurate normalization
        use_gstlal_cpu_upsample:
            boolean: If true, use fast C-based gstlal implementation for upsampling

    """

    inrate: int
    outrate: int
    backend: type[ArrayBackend] = NumpyBackend
    gstlal_norm: bool = True
    use_gstlal_cpu_upsample: bool = False

    def configure(self) -> None:
        self.next_out_offset = None

        if self.outrate < self.inrate:
            # downsample parameters
            factor = self.inrate // self.outrate
            self.half_length = int(DOWN_HALF_LENGTH * factor)
            self.kernel_length = self.half_length * 2 + 1
            self.thiskernel = self.downkernel(factor)
        elif self.outrate > self.inrate:
            # upsample parameters
            factor = self.outrate // self.inrate
            self.half_length = UP_HALF_LENGTH
            self.kernel_length = self.half_length * 2 + 1
            self.thiskernel = self.upkernel(factor)
        else:
            # same rate
            raise ValueError("Inrate {self.inrate} is the same as outrate {outrate}")

        if self.backend == TorchBackend:
            if not TORCH_AVAILABLE:
                raise ImportError(
                    "PyTorch is not installed. Install it with 'pip install "
                    "sgn-ts[torch]'"
                )

            # Convert the numpy kernel to torch tensors
            if self.outrate < self.inrate:
                # downsample
                self.thiskernel = torch.from_numpy(self.thiskernel).view(1, 1, -1)
            else:
                # upsample
                sub_kernel_length = int(2 * self.half_length + 1)
                self.thiskernel = torch.tensor(self.thiskernel.copy()).view(
                    self.outrate // self.inrate, 1, sub_kernel_length
                )
            self.thiskernel = self.thiskernel.to(TorchBackend.DEVICE).to(
                TorchBackend.DTYPE
            )
            self.resample = self.resample_torch
        else:
            self.resample = self.resample_numpy

        self.adapter_config.backend = self.backend
        self.adapter_config.overlap = (
            Offset.fromsamples(self.half_length, self.inrate),
            Offset.fromsamples(self.half_length, self.inrate),
        )
        self.adapter_config.on_startup(pad_zeros=True)

        self.pad_length = self.half_length

    @validator.one_to_one
    def validate(self) -> None:
        assert (
            self.inrate in Offset.ALLOWED_RATES
        ), f"Input rate {self.inrate} not in ALLOWED_RATES: {Offset.ALLOWED_RATES}"
        assert (
            self.outrate in Offset.ALLOWED_RATES
        ), f"Output rate {self.outrate} not in ALLOWED_RATES: {Offset.ALLOWED_RATES}"

    def downkernel(self, factor: int) -> Array:
        """Compute the kernel for downsampling. Modified from gstlal_interpolator.c

        This is a sinc windowed sinc function kernel
        The baseline kernel is defined as

        g[k] = sin(pi / f * (k-c)) / (pi / f * (k-c)) * (1 - (k-c)^2 / c / c)   k != c
        g[k] = 1                                                                k = c

        Where:

            f: downsample factor, must be power of 2, e.g., 2, 4, 8, ...
            c: defined as half the full kernel length

        You specify the half filter length at the target rate in samples,
        the kernel length is then given by:

            kernel_length = half_length_at_original_rate * 2 * f + 1


        Args:
            factor:
                int, factor = inrate/outrate

        Returns:
            Array, the downsampling kernel
        """
        kernel_length = int(2 * self.half_length + 1)

        # the domain should be the kernel_length divided by two
        c = kernel_length // 2
        x = np.arange(-c, c + 1)
        vecs = np.sinc(x / factor) * np.sinc(x / c)
        if self.gstlal_norm:
            norm = np.linalg.norm(vecs) * factor**0.5
        else:
            norm = sum(vecs)
        vecs = vecs / norm
        return vecs.reshape(1, -1)

    def upkernel(self, factor: int) -> Array:
        """Compute the kernel for upsampling. Modified from gstlal_interpolator.c

        This is a sinc windowed sinc function kernel
        The baseline kernel is defined as

        $$\\begin{align}
        g(k) &= \\sin(\\pi / f * (k-c)) /
                (\\pi / f * (k-c)) * (1 - (k-c)^2 / c / c)  & k != c \\\\
        g(k) &= 1 & k = c
        \\end{align}$$

        Where:

            f: interpolation factor, must be power of 2, e.g., 2, 4, 8, ...
            c: defined as half the full kernel length

        You specify the half filter length at the original rate in samples,
        the kernel length is then given by:

            kernel_length = half_length_at_original_rate * 2 * f + 1

        Interpolation is then defined as a two step process.  First the
        input data is zero filled to bring it up to the new sample rate,
        i.e., the input data, x, is transformed to x' such that:

        x'[i] = x[i/f]	if (i%f) == 0
              = 0       if (i%f) > 0

        y[i] = sum_{k=0}^{2c+1} x'[i-k] g[k]

        Since more than half the terms in this series would be zero, the
        convolution is implemented by breaking up the kernel into f separate
        kernels each 1/f as large as the originalcalled z, i.e.,:

        z[0][k/f] = g[k*f]
        z[1][k/f] = g[k*f+1]
        ...
        z[f-1][k/f] = g[k*f + f-1]

        Now the convolution can be written as:

        y[i] = sum_{k=0}^{2c/f+1} x[i/f] z[i%f][k]

        which avoids multiplying zeros.  Note also that by construction the
        sinc function has its zeros arranged such that z[0][:] had only one
        nonzero sample at its center. Therefore the actual convolution is:

        y[i] = x[i/f]					if i%f == 0
        y[i] = sum_{k=0}^{2c/f+1} x[i/f] z[i%f][k]	otherwise


        Args:
            factor:
                int, factor = outrate/inrate

        Returns:
            Array, the upsampling kernel
        """
        kernel_length = int(2 * self.half_length * factor + 1)
        sub_kernel_length = int(2 * self.half_length + 1)

        # the domain should be the kernel_length divided by two
        c = kernel_length // 2
        x = np.arange(-c, c + 1)
        out = np.sinc(x / factor) * np.sinc(x / c)
        out = np.pad(out, (0, factor - 1))
        # FIXME: check if interleave same as no interleave
        vecs = out.reshape(-1, factor).T[:, ::-1]

        return vecs.reshape(int(factor), 1, sub_kernel_length)

    def upsample_gstlal(self, data):
        """Upsample using gstlal implementation.

        Handles both numpy arrays and torch tensors.

        Args:
            data: Input data (numpy array or torch tensor), shape (-1, n_samples)

        Returns:
            Upsampled data (same type as input), not reshaped
        """
        # Check if input is torch tensor
        is_torch = TORCH_AVAILABLE and torch.is_tensor(data)
        if is_torch:
            # Convert torch -> numpy
            torch_device = data.device
            torch_dtype = data.dtype
            data_np = data.cpu().numpy()
        else:
            data_np = data

        # Call gstlal
        factor = self.outrate // self.inrate
        out_np = upsample_transposed(
            data_np, factor=factor, half_length=self.half_length
        )

        # Convert back to torch if needed
        if is_torch:
            out = torch.from_numpy(out_np).to(torch_device).to(torch_dtype)
        else:
            out = out_np

        return out

    def resample_numpy(
        self, data0: NumpyArray, outshape: tuple[int, ...]
    ) -> NumpyArray:
        """Correlate the data with the kernel.

        Args:
            data0:
                Array, the data to be up/downsampled
            outshape:
                tuple[int, ...], the shape of the output array

        Returns:
            Array, the resulting array of the up/downsamping
        """
        data = data0.reshape(-1, data0.shape[-1])

        if self.outrate > self.inrate:
            # upsample
            if self.use_gstlal_cpu_upsample and GSTLAL_AVAILABLE:
                # Use fast C-based gstlal implementation
                out = self.upsample_gstlal(data)
            else:
                # Fall back to scipy correlate
                os = []
                for i in range(self.outrate // self.inrate):
                    os.append(correlate(data, self.thiskernel[i], mode="valid"))
                out = np.vstack(os)
                out = np.moveaxis(out, -1, -2)
        else:
            # downsample
            # FIXME: implement a strided correlation, rather than doing unnecessary
            # calculations
            out = correlate(data, self.thiskernel, mode="valid")[
                ..., :: self.inrate // self.outrate
            ]
        return out.reshape(outshape)

    def resample_torch(
        self, data0: TorchArray, outshape: tuple[int, ...]
    ) -> TorchArray:
        """Correlate the data with the kernel.

        Args:
            data0:
                TorchArray, the data to be up/downsampled
            outshape:
                tuple[int, ...], the shape of the output array

        Returns:
            TorchArray, the resulting array of the up/downsamping
        """
        if not TORCH_AVAILABLE:
            raise ImportError(
                "PyTorch is not installed. Install it with 'pip install sgn-ts[torch]'"
            )

        if self.outrate > self.inrate:  # upsample
            if self.use_gstlal_cpu_upsample and GSTLAL_AVAILABLE:
                # Use gstlal (handles torch->numpy->torch conversion)
                data = data0.view(-1, data0.shape[-1])
                out = self.upsample_gstlal(data)
                return out.view(outshape)
            else:
                # Use PyTorch conv1d
                data = data0.view(-1, 1, data0.shape[-1])
                thiskernel = self.thiskernel

                # Convert data to match kernel's dtype if necessary
                if data.dtype != thiskernel.dtype:
                    data = data.to(thiskernel.dtype)

                out = Fconv1d(data, thiskernel)
                out = out.mT.reshape(data.shape[0], -1)
                return out.view(outshape)
        else:  # downsample
            data = data0.view(-1, 1, data0.shape[-1])
            thiskernel = self.thiskernel

            # Convert data to match kernel's dtype if necessary
            if data.dtype != thiskernel.dtype:
                data = data.to(thiskernel.dtype)

            out = Fconv1d(data, thiskernel, stride=self.inrate // self.outrate)
            out = out.squeeze(1)

        return out.view(outshape)

    @transform.one_to_one
    def process(self, input_frame: TSFrame, output_frame: TSCollectFrame) -> None:
        """Resample input frame to output sample rate."""
        assert input_frame.sample_rate == self.inrate, (
            f"Frame sample rate {input_frame.sample_rate} doesn't match "
            f"resampler input rate {self.inrate}"
        )

        if input_frame.shape[-1] == 0:
            buf = SeriesBuffer(
                offset=output_frame.offset,
                sample_rate=self.outrate,
                data=None,
                shape=input_frame.shape,
            )
            output_frame.append(buf)
        else:
            for buf in input_frame:
                shape = input_frame.shape[:-1] + (
                    Offset.tosamples(output_frame.noffset, self.outrate),
                )
                if buf.is_gap:
                    data = None
                else:
                    data = self.resample(buf.data, shape)
                buf = buf.copy(
                    offset=output_frame.offset,
                    sample_rate=self.outrate,
                    data=data,
                    shape=shape,
                )
                output_frame.append(buf)

downkernel(factor)

Compute the kernel for downsampling. Modified from gstlal_interpolator.c

This is a sinc windowed sinc function kernel The baseline kernel is defined as

g[k] = sin(pi / f * (k-c)) / (pi / f * (k-c)) * (1 - (k-c)^2 / c / c) k != c g[k] = 1 k = c

Where:

f: downsample factor, must be power of 2, e.g., 2, 4, 8, ...
c: defined as half the full kernel length

You specify the half filter length at the target rate in samples, the kernel length is then given by:

kernel_length = half_length_at_original_rate * 2 * f + 1

Parameters:

Name Type Description Default
factor int

int, factor = inrate/outrate

required

Returns:

Type Description
Array

Array, the downsampling kernel

Source code in sgnts/transforms/resampler.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def downkernel(self, factor: int) -> Array:
    """Compute the kernel for downsampling. Modified from gstlal_interpolator.c

    This is a sinc windowed sinc function kernel
    The baseline kernel is defined as

    g[k] = sin(pi / f * (k-c)) / (pi / f * (k-c)) * (1 - (k-c)^2 / c / c)   k != c
    g[k] = 1                                                                k = c

    Where:

        f: downsample factor, must be power of 2, e.g., 2, 4, 8, ...
        c: defined as half the full kernel length

    You specify the half filter length at the target rate in samples,
    the kernel length is then given by:

        kernel_length = half_length_at_original_rate * 2 * f + 1


    Args:
        factor:
            int, factor = inrate/outrate

    Returns:
        Array, the downsampling kernel
    """
    kernel_length = int(2 * self.half_length + 1)

    # the domain should be the kernel_length divided by two
    c = kernel_length // 2
    x = np.arange(-c, c + 1)
    vecs = np.sinc(x / factor) * np.sinc(x / c)
    if self.gstlal_norm:
        norm = np.linalg.norm(vecs) * factor**0.5
    else:
        norm = sum(vecs)
    vecs = vecs / norm
    return vecs.reshape(1, -1)

process(input_frame, output_frame)

Resample input frame to output sample rate.

Source code in sgnts/transforms/resampler.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
@transform.one_to_one
def process(self, input_frame: TSFrame, output_frame: TSCollectFrame) -> None:
    """Resample input frame to output sample rate."""
    assert input_frame.sample_rate == self.inrate, (
        f"Frame sample rate {input_frame.sample_rate} doesn't match "
        f"resampler input rate {self.inrate}"
    )

    if input_frame.shape[-1] == 0:
        buf = SeriesBuffer(
            offset=output_frame.offset,
            sample_rate=self.outrate,
            data=None,
            shape=input_frame.shape,
        )
        output_frame.append(buf)
    else:
        for buf in input_frame:
            shape = input_frame.shape[:-1] + (
                Offset.tosamples(output_frame.noffset, self.outrate),
            )
            if buf.is_gap:
                data = None
            else:
                data = self.resample(buf.data, shape)
            buf = buf.copy(
                offset=output_frame.offset,
                sample_rate=self.outrate,
                data=data,
                shape=shape,
            )
            output_frame.append(buf)

resample_numpy(data0, outshape)

Correlate the data with the kernel.

Parameters:

Name Type Description Default
data0 NumpyArray

Array, the data to be up/downsampled

required
outshape tuple[int, ...]

tuple[int, ...], the shape of the output array

required

Returns:

Type Description
NumpyArray

Array, the resulting array of the up/downsamping

Source code in sgnts/transforms/resampler.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def resample_numpy(
    self, data0: NumpyArray, outshape: tuple[int, ...]
) -> NumpyArray:
    """Correlate the data with the kernel.

    Args:
        data0:
            Array, the data to be up/downsampled
        outshape:
            tuple[int, ...], the shape of the output array

    Returns:
        Array, the resulting array of the up/downsamping
    """
    data = data0.reshape(-1, data0.shape[-1])

    if self.outrate > self.inrate:
        # upsample
        if self.use_gstlal_cpu_upsample and GSTLAL_AVAILABLE:
            # Use fast C-based gstlal implementation
            out = self.upsample_gstlal(data)
        else:
            # Fall back to scipy correlate
            os = []
            for i in range(self.outrate // self.inrate):
                os.append(correlate(data, self.thiskernel[i], mode="valid"))
            out = np.vstack(os)
            out = np.moveaxis(out, -1, -2)
    else:
        # downsample
        # FIXME: implement a strided correlation, rather than doing unnecessary
        # calculations
        out = correlate(data, self.thiskernel, mode="valid")[
            ..., :: self.inrate // self.outrate
        ]
    return out.reshape(outshape)

resample_torch(data0, outshape)

Correlate the data with the kernel.

Parameters:

Name Type Description Default
data0 TorchArray

TorchArray, the data to be up/downsampled

required
outshape tuple[int, ...]

tuple[int, ...], the shape of the output array

required

Returns:

Type Description
TorchArray

TorchArray, the resulting array of the up/downsamping

Source code in sgnts/transforms/resampler.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def resample_torch(
    self, data0: TorchArray, outshape: tuple[int, ...]
) -> TorchArray:
    """Correlate the data with the kernel.

    Args:
        data0:
            TorchArray, the data to be up/downsampled
        outshape:
            tuple[int, ...], the shape of the output array

    Returns:
        TorchArray, the resulting array of the up/downsamping
    """
    if not TORCH_AVAILABLE:
        raise ImportError(
            "PyTorch is not installed. Install it with 'pip install sgn-ts[torch]'"
        )

    if self.outrate > self.inrate:  # upsample
        if self.use_gstlal_cpu_upsample and GSTLAL_AVAILABLE:
            # Use gstlal (handles torch->numpy->torch conversion)
            data = data0.view(-1, data0.shape[-1])
            out = self.upsample_gstlal(data)
            return out.view(outshape)
        else:
            # Use PyTorch conv1d
            data = data0.view(-1, 1, data0.shape[-1])
            thiskernel = self.thiskernel

            # Convert data to match kernel's dtype if necessary
            if data.dtype != thiskernel.dtype:
                data = data.to(thiskernel.dtype)

            out = Fconv1d(data, thiskernel)
            out = out.mT.reshape(data.shape[0], -1)
            return out.view(outshape)
    else:  # downsample
        data = data0.view(-1, 1, data0.shape[-1])
        thiskernel = self.thiskernel

        # Convert data to match kernel's dtype if necessary
        if data.dtype != thiskernel.dtype:
            data = data.to(thiskernel.dtype)

        out = Fconv1d(data, thiskernel, stride=self.inrate // self.outrate)
        out = out.squeeze(1)

    return out.view(outshape)

upkernel(factor)

Compute the kernel for upsampling. Modified from gstlal_interpolator.c

This is a sinc windowed sinc function kernel The baseline kernel is defined as

\[\begin{align} g(k) &= \sin(\pi / f * (k-c)) / (\pi / f * (k-c)) * (1 - (k-c)^2 / c / c) & k != c \\ g(k) &= 1 & k = c \end{align}\]

Where:

f: interpolation factor, must be power of 2, e.g., 2, 4, 8, ...
c: defined as half the full kernel length

You specify the half filter length at the original rate in samples, the kernel length is then given by:

kernel_length = half_length_at_original_rate * 2 * f + 1

Interpolation is then defined as a two step process. First the input data is zero filled to bring it up to the new sample rate, i.e., the input data, x, is transformed to x' such that:

x'[i] = x[i/f] if (i%f) == 0 = 0 if (i%f) > 0

y[i] = sum_{k=0}^{2c+1} x'[i-k] g[k]

Since more than half the terms in this series would be zero, the convolution is implemented by breaking up the kernel into f separate kernels each 1/f as large as the originalcalled z, i.e.,:

z[0][k/f] = g[kf] z[1][k/f] = g[kf+1] ... z[f-1][k/f] = g[k*f + f-1]

Now the convolution can be written as:

y[i] = sum_{k=0}^{2c/f+1} x[i/f] z[i%f][k]

which avoids multiplying zeros. Note also that by construction the sinc function has its zeros arranged such that z[0][:] had only one nonzero sample at its center. Therefore the actual convolution is:

y[i] = x[i/f] if i%f == 0 y[i] = sum_{k=0}^{2c/f+1} x[i/f] z[i%f][k] otherwise

Parameters:

Name Type Description Default
factor int

int, factor = outrate/inrate

required

Returns:

Type Description
Array

Array, the upsampling kernel

Source code in sgnts/transforms/resampler.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def upkernel(self, factor: int) -> Array:
    """Compute the kernel for upsampling. Modified from gstlal_interpolator.c

    This is a sinc windowed sinc function kernel
    The baseline kernel is defined as

    $$\\begin{align}
    g(k) &= \\sin(\\pi / f * (k-c)) /
            (\\pi / f * (k-c)) * (1 - (k-c)^2 / c / c)  & k != c \\\\
    g(k) &= 1 & k = c
    \\end{align}$$

    Where:

        f: interpolation factor, must be power of 2, e.g., 2, 4, 8, ...
        c: defined as half the full kernel length

    You specify the half filter length at the original rate in samples,
    the kernel length is then given by:

        kernel_length = half_length_at_original_rate * 2 * f + 1

    Interpolation is then defined as a two step process.  First the
    input data is zero filled to bring it up to the new sample rate,
    i.e., the input data, x, is transformed to x' such that:

    x'[i] = x[i/f]	if (i%f) == 0
          = 0       if (i%f) > 0

    y[i] = sum_{k=0}^{2c+1} x'[i-k] g[k]

    Since more than half the terms in this series would be zero, the
    convolution is implemented by breaking up the kernel into f separate
    kernels each 1/f as large as the originalcalled z, i.e.,:

    z[0][k/f] = g[k*f]
    z[1][k/f] = g[k*f+1]
    ...
    z[f-1][k/f] = g[k*f + f-1]

    Now the convolution can be written as:

    y[i] = sum_{k=0}^{2c/f+1} x[i/f] z[i%f][k]

    which avoids multiplying zeros.  Note also that by construction the
    sinc function has its zeros arranged such that z[0][:] had only one
    nonzero sample at its center. Therefore the actual convolution is:

    y[i] = x[i/f]					if i%f == 0
    y[i] = sum_{k=0}^{2c/f+1} x[i/f] z[i%f][k]	otherwise


    Args:
        factor:
            int, factor = outrate/inrate

    Returns:
        Array, the upsampling kernel
    """
    kernel_length = int(2 * self.half_length * factor + 1)
    sub_kernel_length = int(2 * self.half_length + 1)

    # the domain should be the kernel_length divided by two
    c = kernel_length // 2
    x = np.arange(-c, c + 1)
    out = np.sinc(x / factor) * np.sinc(x / c)
    out = np.pad(out, (0, factor - 1))
    # FIXME: check if interleave same as no interleave
    vecs = out.reshape(-1, factor).T[:, ::-1]

    return vecs.reshape(int(factor), 1, sub_kernel_length)

upsample_gstlal(data)

Upsample using gstlal implementation.

Handles both numpy arrays and torch tensors.

Parameters:

Name Type Description Default
data

Input data (numpy array or torch tensor), shape (-1, n_samples)

required

Returns:

Type Description

Upsampled data (same type as input), not reshaped

Source code in sgnts/transforms/resampler.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def upsample_gstlal(self, data):
    """Upsample using gstlal implementation.

    Handles both numpy arrays and torch tensors.

    Args:
        data: Input data (numpy array or torch tensor), shape (-1, n_samples)

    Returns:
        Upsampled data (same type as input), not reshaped
    """
    # Check if input is torch tensor
    is_torch = TORCH_AVAILABLE and torch.is_tensor(data)
    if is_torch:
        # Convert torch -> numpy
        torch_device = data.device
        torch_dtype = data.dtype
        data_np = data.cpu().numpy()
    else:
        data_np = data

    # Call gstlal
    factor = self.outrate // self.inrate
    out_np = upsample_transposed(
        data_np, factor=factor, half_length=self.half_length
    )

    # Convert back to torch if needed
    if is_torch:
        out = torch.from_numpy(out_np).to(torch_device).to(torch_dtype)
    else:
        out = out_np

    return out