Skip to content

sgn.sources

Source elements for generating data streams.

New classes need not be subclassed from DequeSource, but should at least be ultimately a subclass of SourceElement.

DequeSource dataclass

Bases: IterSource

A source element that has one double-ended queue (deque) per source pad. The end of stream is controlled by setting an optional limit on the number of times a deque can be empty before EOS is signaled.

Parameters:

Name Type Description Default
iters Optional[dict[str, Iterable[Any]]]

dict[str, deque], a mapping of source pads to deques, where the key is the pad name and the value is the deque

None
eos_on_empty Union[dict[str, bool], bool]

Union[dict[str, bool], bool], default True, a mapping of source pads to boolean values, where the key is the pad name and the value is the boolean. If a bool is given, the value is applied to all pads. If True, EOS is signaled when the deque is empty.

True
Source code in sgn/sources.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
@dataclass
class DequeSource(IterSource):
    """Source element backed by one double-ended queue (deque) per source pad.

    Whatever iterable is supplied for a pad is copied into a new deque, and
    values are handed out by popping from the right-hand end of that deque.
    An empty deque yields ``None``; whether that also signals EOS is decided
    per pad by ``eos_on_empty``.

    Args:
        iters:
            dict[str, deque], a mapping of source pads to deques, where the
            key is the pad name and the value is the deque
        eos_on_empty:
            Union[dict[str, bool], bool], default True, a mapping of source
            pads to boolean values, where the key is the pad name and the value
            is the boolean. If a bool is given, the value is applied to all
            pads. If True, EOS is signaled when the deque is empty.
    """

    def _coerce_iterator(self, iterable):
        """Build the deque that will back a pad.

        Args:
            iterable:
                Iterable, the initial contents for the pad

        Returns:
            deque, a new deque holding the items of ``iterable``
        """
        pad_queue = deque(iterable)
        return pad_queue

    def _get_value(self, deque):
        """Take the next value off the right-hand end of the deque.

        Args:
            deque:
                deque, the deque to take the value from

        Returns:
            Any, the popped value, or None if the deque is empty
        """
        try:
            value = deque.pop()
        except IndexError:
            # pop() on an empty deque raises IndexError: nothing to hand out
            value = None
        return value

    @property
    def deques(self) -> dict[str, Iterable]:
        """The per-pad deques; a more explicit alias for ``iters``."""
        pad_deques = self.iters
        assert isinstance(pad_deques, dict)
        return pad_deques

deques property

Get the iters property with more explicit alias.

IterSource dataclass

Bases: SourceElement

A source element that has one iterable per source pad. The end of stream is controlled by setting an optional limit on the number of times a deque can be empty before EOS is signaled.

Parameters:

Name Type Description Default
iters Optional[dict[str, Iterable[Any]]]

dict[str, Iterable[Any]], a mapping of source pads to iterables, where the key is the pad name and the value is the Iterable. These will be coerced to iterators, so they can be any iterable type.

None
eos_on_empty Union[dict[str, bool], bool]

Union[dict[str, bool], bool], default True, a mapping of source pads to boolean values, where the key is the pad name and the value is the boolean. If a bool is given, the value is applied to all pads. If True, EOS is signaled when the iterator is empty.

True
Source code in sgn/sources.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
@dataclass
class IterSource(SourceElement):
    """A source element that has one iterable per source pad.

    Every call to ``new`` takes the next value from the pad's iterator and wraps
    it in a frame. When the iterator has nothing left, the frame carries
    ``data=None`` and is flagged EOS if ``eos_on_empty`` is true for that pad.

    Args:
        iters:
            dict[str, Iterable[Any]], a mapping of source pads to iterables,
            where the key is the pad name and the value is the Iterable. These
            will be coerced to iterators, so they can be any iterable type. If
            None, every source pad is given an empty iterable.
        eos_on_empty:
            Union[dict[str, bool], bool], default True, a mapping of source
            pads to boolean values, where the key is the pad name and the value
            is the boolean. If a bool is given, the value is applied to all
            pads. If True, EOS is signaled when the iterator is empty.
        frame_factory:
            Callable, default Frame, called with the keyword arguments ``EOS``
            and ``data`` to build each outgoing frame.
    """

    iters: Optional[dict[str, Iterable[Any]]] = None
    eos_on_empty: Union[dict[str, bool], bool] = True
    frame_factory: Callable = Frame

    def __post_init__(self):
        """Set up, then validate, the per-pad iterators and EOS flags."""
        super().__post_init__()
        # Normalise both mappings first so validation sees their final form
        self._setup_iters()
        self._setup_eos_on_empty()
        self._validate_iters()
        self._validate_eos_on_empty()

    def _setup_iters(self):
        """Coerce ``iters`` into a mapping of pad name to iterator.

        When no mapping was given, every source pad gets an empty iterable.
        """
        if self.iters is None:
            self.iters = {
                pad.name: self._coerce_iterator([]) for pad in self.source_pads
            }
        else:
            self.iters = {
                name: self._coerce_iterator(iterable)
                for name, iterable in self.iters.items()
            }

    def _setup_eos_on_empty(self):
        """Expand a single bool ``eos_on_empty`` into a per-pad mapping."""
        if isinstance(self.eos_on_empty, bool):
            self.eos_on_empty = {
                pad.name: self.eos_on_empty for pad in self.source_pads
            }

    def _validate_iters(self):
        """Check that ``iters`` has exactly one entry per source pad.

        Raises:
            ValueError: if the number of entries differs from the number of
                source pads, or an entry is keyed by an unknown pad name.
        """
        if len(self.iters) != len(self.source_pads):
            raise ValueError("The number of deque s must match the number of pads")

        # Collect the known pad names once rather than once per entry
        known_pads = {pad.name for pad in self.source_pads}
        for pad_name in self.iters:
            if pad_name not in known_pads:
                raise ValueError(
                    "DequeSource has a deque for a pad that does not exist, "
                    f"got: {pad_name}, options are: {self.source_pad_names}"
                )

    def _validate_eos_on_empty(self):
        """Check that ``eos_on_empty`` has exactly one entry per source pad.

        Raises:
            ValueError: if the number of entries differs from the number of
                source pads, or an entry is keyed by an unknown pad name.
        """
        if len(self.eos_on_empty) != len(self.source_pads):
            raise ValueError("The number of eos on empty must match the number of pads")

        # Collect the known pad names once rather than once per entry
        known_pads = {pad.name for pad in self.source_pads}
        for pad_name in self.eos_on_empty:
            if pad_name not in known_pads:
                raise ValueError(
                    "DequeSource has a eos on empty for a pad that does not exist, "
                    f"got: {pad_name}, options are: {self.source_pad_names}"
                )

    def _coerce_iterator(self, iterable):
        """Coerce the iterable to an iterator if it is not already one.

        Args:
            iterable:
                Iterable, the iterable to coerce

        Returns:
            Iterator, the iterator
        """
        # Iterators and generators are passed through untouched
        if isinstance(iterable, (Iterator, Generator)):
            return iterable

        return iter(iterable)

    def _get_value(self, iterator):
        """Get the next value from the iterator.

        Args:
            iterator:
                Iterator, the iterator to get the value from

        Returns:
            Any, the next value from the iterator, or None once the iterator
            is exhausted
        """
        try:
            return next(iterator)
        except StopIteration:
            return None

    def update(self, pad: SourcePad):
        """Update the iterator for the pad. This is a no-op for IterSource. For
        subclasses that need to update the iterator, this method should be overridden.
        Examples include reading from a file or network stream.

        Args:
            pad:
                SourcePad, the pad to update
        """
        pass

    def new(self, pad: SourcePad) -> Frame:
        """Produce the next Frame for "pad" from that pad's iterator.

        ``update`` is called first, then the next value is taken from the
        iterator and used as the Frame's data. A value of None is treated as
        "iterator empty": EOS is set when the value is None and ``eos_on_empty``
        is true for the pad. NOTE: an iterator that yields a literal None is
        therefore indistinguishable from an exhausted one.

        Args:
            pad:
                SourcePad, the pad for which to produce a new Frame

        Returns:
            Frame, the Frame with optional data payload
        """
        # Update the pad iterator
        self.update(pad=pad)

        # Narrow the Optional/Union field types before indexing by pad name
        assert isinstance(self.iters, dict)
        assert isinstance(self.eos_on_empty, dict)
        pad_iter = self.iters[pad.name]
        pad_eos_on_empty = self.eos_on_empty[pad.name]

        # Get data from the iterator
        data = self._get_value(pad_iter)

        # Return the frame
        return self.frame_factory(EOS=data is None and pad_eos_on_empty, data=data)

__post_init__()

Post init checks for the IterSource element.

Source code in sgn/sources.py
141
142
143
144
145
146
147
148
def __post_init__(self):
    """Set up, then validate, the per-pad iterators and EOS flags."""
    super().__post_init__()
    # Normalise both mappings first so validation sees their final form
    self._setup_iters()
    self._setup_eos_on_empty()
    self._validate_iters()
    self._validate_eos_on_empty()

new(pad)

A new Frame is created on "pad" from the next value of that pad's iterator. When the iterator has no more values the Frame's data is None, and EOS is set if eos_on_empty is true for that pad.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, the pad for which to produce a new Frame

required

Returns:

Type Description
Frame

Frame, the Frame with optional data payload

Source code in sgn/sources.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def new(self, pad: SourcePad) -> Frame:
    """Build the next Frame for "pad" from that pad's iterator.

    The pad is first passed to ``update``; the next value is then taken from
    the pad's iterator and used as the Frame's data. EOS is set when that value
    is None and ``eos_on_empty`` is true for the pad.

    Args:
        pad:
            SourcePad, the pad for which to produce a new Frame

    Returns:
        Frame, the Frame with optional data payload
    """
    # Let subclasses refresh the iterator before it is read
    self.update(pad=pad)

    # Narrow the Optional/Union field types before indexing by pad name
    assert isinstance(self.iters, dict)
    assert isinstance(self.eos_on_empty, dict)
    source = self.iters[pad.name]
    eos_when_empty = self.eos_on_empty[pad.name]

    payload = self._get_value(source)
    exhausted = payload is None

    return self.frame_factory(EOS=exhausted and eos_when_empty, data=payload)

update(pad)

Update the iterator for the pad. This is a no-op for IterSource. For subclasses that need to update the iterator, this method should be overridden. Examples include reading from a file or network stream.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, the pad to update

required
Source code in sgn/sources.py
226
227
228
229
230
231
232
233
234
235
def update(self, pad: SourcePad):
    """Hook for refreshing the pad's iterator; does nothing in IterSource.

    Subclasses that need to update the iterator should override this method,
    for example to read from a file or a network stream.

    Args:
        pad:
            SourcePad, the pad to update
    """
    return None

NullSource dataclass

Bases: SourceElement, SignalEOS

A source that does precisely nothing.

It is useful for testing and debugging, and will always produce empty frames

frame_factory: Callable = Frame
wait: float = None
num_frames: int = None

If wait is not None the source will block for wait seconds before each new buffer, which is useful for slowing down debugging pipelines. By default this source element handles SIGINT and uses that to set EOS. See SignalEOS. In order to use this feature, the pipeline must be run within the SignalEOS context manager, e.g.,

with SignalEOS() as signal_eos:
    p.run()
Source code in sgn/sources.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@dataclass
class NullSource(SourceElement, SignalEOS):
    """A source that does precisely nothing.

    It is useful for testing and debugging, and always produces frames whose
    data is None.

    If wait is not None the source will block for wait seconds before each new
    buffer, which is useful for slowing down debugging pipelines.  By default
    this source element handles SIGINT and uses that to set EOS. See SignalEOS.
    In order to use this feature, the pipeline must be run within the SignalEOS
    context manager, e.g.,

        with SignalEOS() as signal_eos:
            p.run()

    Args:
        frame_factory:
            Callable, default Frame, called with the keyword arguments ``EOS``
            and ``data`` to build each frame
        wait:
            Optional[float], default None, seconds to sleep before each frame
        num_frames:
            Optional[int], default None, EOS is set once more than this many
            frames have been requested
    """

    frame_factory: Callable = Frame
    wait: Optional[float] = None
    num_frames: Optional[int] = None

    def __post_init__(self):
        """Finish initialisation and start the frame counter at zero."""
        super().__post_init__()
        # Counts every call to new(), whichever pad it is made for
        self.frame_count = 0

    def new(self, pad: SourcePad) -> Frame:
        """Produce an empty Frame, optionally sleeping first.

        The frame counter is incremented on every call. EOS is set if a handled
        signal has been received, or if num_frames is set and the counter has
        gone past it.

        Args:
            pad:
                SourcePad, the pad for which to produce a new Frame

        Returns:
            Frame, the Frame with data set to None
        """
        if self.wait is not None:
            sleep(self.wait)

        self.frame_count += 1

        eos = self.signaled_eos()
        if not eos:
            # Only consult the frame limit when no signal has asked for EOS
            eos = self.num_frames is not None and self.frame_count > self.num_frames

        return self.frame_factory(EOS=eos, data=None)

new(pad)

New Frames are created on "pad" with an instance specific count and a name derived from the pad name. EOS is set if we have surpassed the requested number of Frames.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, the pad for which to produce a new Frame

required

Returns:

Type Description
Frame

Frame, the Frame with optional data payload

Source code in sgn/sources.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def new(self, pad: SourcePad) -> Frame:
    """Produce an empty Frame, optionally sleeping first.

    The frame counter is incremented on every call. EOS is set if a handled
    signal has been received, or if num_frames is set and the counter has gone
    past it.

    Args:
        pad:
            SourcePad, the pad for which to produce a new Frame

    Returns:
        Frame, the Frame with data set to None
    """
    if self.wait is not None:
        sleep(self.wait)

    self.frame_count += 1

    eos = self.signaled_eos()
    if not eos:
        # Only consult the frame limit when no signal has asked for EOS
        eos = self.num_frames is not None and self.frame_count > self.num_frames

    return self.frame_factory(EOS=eos, data=None)

SignalEOS

This class provides global signal handling for an SGN pipeline. If you inherit it for a source element then it will capture SIGINT and provide a method to mark that eos should be flagged. See NullSource as an example.

Additionally, this must be used as a context manager for executing a pipeline and disabling the signal handler after the pipeline is done, e.g.,

with SignalEOS() as signal_eos:
    p.run()
Source code in sgn/sources.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class SignalEOS:
    """
    Global signal handling for an SGN pipeline.  A source element that
    inherits from this class can call signaled_eos() to find out whether a
    handled signal (SIGINT) has been received and EOS should be flagged.  See
    NullSource as an example.

    The class must also be used as a context manager around pipeline
    execution, so that the signal handlers are installed while the pipeline
    runs and the previous handlers are restored afterwards, e.g.,

        with SignalEOS() as signal_eos:
            p.run()

    All state lives on the class itself and is therefore shared by every
    instance.
    """

    # Signals that are intercepted while the context manager is active
    handled_signals = {signal.SIGINT}
    # Signals received so far; presumably filled in by the module-level
    # _handler, which is defined elsewhere in the file
    rcvd_signals: set[int] = set()
    # Handlers that were in place before __enter__, keyed by signal number
    previous_handlers: dict[int, Callable] = {}

    @classmethod
    def signaled_eos(cls):
        """Return True if any received signal is one of the handled signals.
        This can be used by developers to decide if EOS should be set"""
        return not cls.rcvd_signals.isdisjoint(cls.handled_signals)

    def raise_signal(self, sig):
        """Intended to be used after a statement executed as a context manager
        if the application needs to re-raise one of the signals with the previous
        signal handler.  NOTE - this will only raise the signal if it had been
        previously raised"""
        if sig not in SignalEOS.rcvd_signals:
            return
        signal.raise_signal(sig)

    def __enter__(self):
        """Remember the current handler of every handled signal, then install
        the pipeline's own handler in its place"""
        saved = SignalEOS.previous_handlers
        for signum in SignalEOS.handled_signals:
            saved[signum] = signal.getsignal(signum)
            signal.signal(signum, _handler)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Put back the handlers that were saved by __enter__"""
        saved = SignalEOS.previous_handlers
        for signum in SignalEOS.handled_signals:
            signal.signal(signum, saved[signum])

__enter__()

Store the previous signal handlers and setup new ones for the handled signals

Source code in sgn/sources.py
55
56
57
58
59
60
61
def __enter__(self):
    """Remember the current handler of every handled signal, then install
    the pipeline's own handler in its place"""
    saved = SignalEOS.previous_handlers
    for signum in SignalEOS.handled_signals:
        saved[signum] = signal.getsignal(signum)
        signal.signal(signum, _handler)
    return self

__exit__(exc_type, exc_val, exc_tb)

Restore the original signal handlers

Source code in sgn/sources.py
63
64
65
66
def __exit__(self, exc_type, exc_val, exc_tb):
    """Put back the handlers that were saved by __enter__"""
    saved = SignalEOS.previous_handlers
    for signum in SignalEOS.handled_signals:
        signal.signal(signum, saved[signum])

raise_signal(sig)

Intended to be used after a statement executed as a context manager if the application needs to re-raise one of the signals with the previous signal handler. NOTE - this will only raise the signal if it had been previously raised

Source code in sgn/sources.py
47
48
49
50
51
52
53
def raise_signal(self, sig):
    """Intended to be used after a statement executed as a context manager if
    the application needs to re-raise one of the signals with the previous signal
    handler.  NOTE - this will only raise the signal if it had been previously
    raised"""
    if sig not in SignalEOS.rcvd_signals:
        return
    signal.raise_signal(sig)

signaled_eos() classmethod

Returns true if the intersection of received signals and handled signals is non-empty. This can be used by developers to decide if EOS should be set

Source code in sgn/sources.py
40
41
42
43
44
45
@classmethod
def signaled_eos(cls):
    """Return True if any received signal is one of the handled signals.
    This can be used by developers to decide if EOS should be set"""
    return not cls.rcvd_signals.isdisjoint(cls.handled_signals)