Skip to content

sgn.control

HTTPControl

Bases: SignalEOS

A context manager that stores a bottle app running in a separate thread.

If you have a pipeline called p do,

with HTTPControl() as control: p.run()

The bottle process is started and stoped on enter and exit. This class inherits SignalEOS context manager actions too, becuase otherwise bottle will respond to ctrl+C and generally you want to deal with both signals and bottle contexts in a coherent way when executing a pipeline, so this implementation tries to do that.

Source code in sgn/control.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class HTTPControl(SignalEOS):
    """A context manager that stores a bottle app running in a separate thread.

    If you have a pipeline called p do,

    with HTTPControl() as control:
        p.run()

    The bottle process is started and stoped on enter and exit. This class
    inherits SignalEOS context manager actions too, becuase otherwise bottle will
    respond to ctrl+C and generally you want to deal with both signals and bottle
    contexts in a coherent way when executing a pipeline, so this implementation
    tries to do that.
    """

    port = 8080
    try:
        host = socket.gethostbyname(socket.gethostname())
    except socket.gaierror:
        host = socket.gethostbyname("localhost")
    post_queues: dict[str, Queue] = {}
    get_queues: dict[str, Queue] = {}
    http_thread = None
    registry_file = "registry.txt"
    tag = None

    def __enter__(self):
        # The bottle thread doesn't want to die without daemon mode (which
        # doesn't kill it, it just lets the program die) FIXME
        HTTPControl.http_thread = Thread(
            target=run_bottle_app,
            kwargs={
                "post_queues": HTTPControl.post_queues,
                "get_queues": HTTPControl.get_queues,
                "port": HTTPControl.port,
                "host": HTTPControl.host,
                "tag": HTTPControl.tag,
            },
            daemon=True,
        )
        HTTPControl.http_thread.start()  # Start the Bottle app as a subthread
        LOGGER.info(
            "Bottle app running on http://%s:%s", HTTPControl.host, HTTPControl.port
        )
        with open(HTTPControl.registry_file, "w") as f:
            f.write("http://%s:%s" % (HTTPControl.host, HTTPControl.port))
        super().__enter__()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        HTTPControl.http_thread.join(3.0)  # Wait for the subthread to clean up
        super().__exit__(exc_type, exc_value, exc_traceback)

    @classmethod
    def exchange_state(cls, name, state_dict):
        """Automate the common task of reading and writing state to an element.
        name is the name of an element (which is the key of both get and post queues)
        and state_dict is a dictionary of state variables with **correct** types that
        can be coerced out of json.  This will mostly work out of the box for simple
        data types like ints and floats and strings, but complicated data will probably
        not work.  FIXME consider supporting more complex types if it comes up.

        HTTPControl.exchange_state(<elem name>, state_dict) will drain the post
        queue and update matching keys in statdict with the contents of the postqueue.
        The postqueue is not preserved so if you call it again immediately the
        postqueue is likely to be empty resulting in no change to state_dict.
        """
        while not cls.post_queues[name].empty():
            postdata = cls.post_queues[name].get()
            for k in state_dict:
                if k in postdata:
                    state_dict[k] = type(state_dict[k])(postdata[k])
            cls.post_queues[name].task_done()
        cls.post_queues[name].join()
        # drain the get queue and put data into it
        while not cls.get_queues[name].empty():
            cls.get_queues[name].get()
            cls.get_queues[name].task_done()
        cls.get_queues[name].join()
        cls.get_queues[name].put(state_dict)

exchange_state(name, state_dict) classmethod

Automate the common task of reading and writing state to an element. name is the name of an element (which is the key of both get and post queues) and state_dict is a dictionary of state variables with correct types that can be coerced out of json. This will mostly work out of the box for simple data types like ints and floats and strings, but complicated data will probably not work. FIXME consider supporting more complex types if it comes up.

HTTPControl.exchange_state(, state_dict) will drain the post queue and update matching keys in statdict with the contents of the postqueue. The postqueue is not preserved so if you call it again immediately the postqueue is likely to be empty resulting in no change to state_dict.

Source code in sgn/control.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@classmethod
def exchange_state(cls, name, state_dict):
    """Automate the common task of reading and writing state to an element.
    name is the name of an element (which is the key of both get and post queues)
    and state_dict is a dictionary of state variables with **correct** types that
    can be coerced out of json.  This will mostly work out of the box for simple
    data types like ints and floats and strings, but complicated data will probably
    not work.  FIXME consider supporting more complex types if it comes up.

    HTTPControl.exchange_state(<elem name>, state_dict) will drain the post
    queue and update matching keys in statdict with the contents of the postqueue.
    The postqueue is not preserved so if you call it again immediately the
    postqueue is likely to be empty resulting in no change to state_dict.
    """
    while not cls.post_queues[name].empty():
        postdata = cls.post_queues[name].get()
        for k in state_dict:
            if k in postdata:
                state_dict[k] = type(state_dict[k])(postdata[k])
        cls.post_queues[name].task_done()
    cls.post_queues[name].join()
    # drain the get queue and put data into it
    while not cls.get_queues[name].empty():
        cls.get_queues[name].get()
        cls.get_queues[name].task_done()
    cls.get_queues[name].join()
    cls.get_queues[name].put(state_dict)

HTTPControlSinkElement dataclass

Bases: SinkElement, HTTPControl

A lightweight subclass of SinkElement that defaults to setting up post and get routes based on the provided element name. HTTP Queues are limited to a size of queuesize of 1. Posts will always succeed by draining the queue first. Gets will preserve the data in the queue

Source code in sgn/control.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
@dataclass
class HTTPControlSinkElement(SinkElement, HTTPControl):
    """A lightweight subclass of SinkElement that defaults to setting up post
    and get routes based on the provided element name.  HTTP Queues are limited
    to a size of queuesize of 1. Posts will always succeed by draining the queue
    first.  Gets will preserve the data in the queue"""

    queuesize: int = 1

    def __post_init__(self):
        SinkElement.__post_init__(self)
        queuesize = 1
        HTTPControl.post_queues[self.name] = Queue(queuesize)
        HTTPControl.get_queues[self.name] = Queue(queuesize)

HTTPControlSourceElement dataclass

Bases: SourceElement, HTTPControl

A lightweight subclass of SourceElement that defaults to setting up post and get routes based on the provided element name. HTTP Queues are limited to a size of queuesize of 1. Posts will always succeed by draining the queue first. Gets will preserve the data in the queue

Source code in sgn/control.py
202
203
204
205
206
207
208
209
210
211
212
213
@dataclass
class HTTPControlSourceElement(SourceElement, HTTPControl):
    """A lightweight subclass of SourceElement that defaults to setting up post
    and get routes based on the provided element name.  HTTP Queues are limited
    to a size of queuesize of 1. Posts will always succeed by draining the queue
    first.  Gets will preserve the data in the queue"""

    def __post_init__(self):
        SourceElement.__post_init__(self)
        queuesize = 1
        HTTPControl.post_queues[self.name] = Queue(queuesize)
        HTTPControl.get_queues[self.name] = Queue(queuesize)

HTTPControlTransformElement dataclass

Bases: TransformElement, HTTPControl

A lightweight subclass of TransformElement that defaults to setting up post and get routes based on the provided element name. HTTP Queues are limited to a size of queuesize of 1. Posts will always succeed by draining the queue first. Gets will preserve the data in the queue

Source code in sgn/control.py
216
217
218
219
220
221
222
223
224
225
226
227
@dataclass
class HTTPControlTransformElement(TransformElement, HTTPControl):
    """A lightweight subclass of TransformElement that defaults to setting up post
    and get routes based on the provided element name.  HTTP Queues are limited
    to a size of queuesize of 1. Posts will always succeed by draining the queue
    first.  Gets will preserve the data in the queue"""

    def __post_init__(self):
        TransformElement.__post_init__(self)
        queuesize = 1
        HTTPControl.post_queues[self.name] = Queue(queuesize)
        HTTPControl.get_queues[self.name] = Queue(queuesize)

run_bottle_app(post_queues=None, get_queues=None, host='localhost', port=8080, tag=None)

A function that sets up post and get queues for a bottle server running on host:port.

post_queues is a dictionary whose keys define routes of the form http://host:port/post/key. The values are Queue objects where the data that is posted by the external request will be stored.

get_queues define the inverse, e.g., http://host:port/get/key and the Queue values are where the data for the request comes from.

Source code in sgn/control.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def run_bottle_app(
    post_queues=None, get_queues=None, host="localhost", port=8080, tag=None
):
    """A function that sets up post and get queues for a bottle server running
    on host:port.

    post_queues is a dictionary whose keys define routes of the form
    http://host:port/post/key. The values are Queue objects where the data that is
    posted by the external request will be stored.

    get_queues define the inverse, e.g., http://host:port/get/key and the Queue
    values are where the data for the request comes from.
    """
    app = Bottle()

    for postroute, postqueue in post_queues.items():

        def post(postqueue=postqueue):
            data = request.json  # Get JSON payload
            if data:
                # Drain the post queue
                while not postqueue.empty():
                    postqueue.get()
                    postqueue.task_done()
                # Then put in the data we have
                postqueue.join()
                postqueue.put(data)  # Put JSON payload into the queue
                return {"status": "success", "message": "Data received"}
            else:
                return {"status": "error", "message": "Invalid JSON"}

        app.route(
            "%s/post/%s" % ("" if tag is None else f"/{tag}", postroute),
            method="POST",
            callback=post,
        )

    for getroute, getqueue in get_queues.items():

        def get(
            getqueue=getqueue,
            key=None,
            key2=None,
            content_type="application",
            content_subtype="json",
        ):
            data = {}
            # Get the last data in the queue
            while not getqueue.empty():
                data = getqueue.get()
                getqueue.task_done()
            # Put a copy back in
            getqueue.join()
            getqueue.put(data)
            response.content_type = "%s/%s" % (content_type, content_subtype)
            if key is None:
                return json.dumps(data)
            elif key in data:
                if key2 is None:
                    if content_subtype == "json":
                        return json.dumps(data[key])
                    else:
                        return data[key]
                elif key2 in data[key]:
                    if content_subtype == "json":
                        return json.dumps(data[key][key2])
                    else:
                        return data[key][key2]
                else:
                    return {"status": "error", "message": f"{key2} not in data[{key}]"}
            else:
                return {"status": "error", "message": f"{key} not in data"}

        app.route(
            "%s/get/%s" % ("" if tag is None else f"/{tag}", getroute),
            method="GET",
            callback=get,
        )
        app.route(
            "%s/get/%s/<key>" % ("" if tag is None else f"/{tag}", getroute),
            method="GET",
            callback=get,
        )
        app.route(
            "%s/get/%s/<key>/<key2>" % ("" if tag is None else f"/{tag}", getroute),
            method="GET",
            callback=get,
        )
        app.route(
            "%s/get/<content_type>/<content_subtype>/%s/<key>"
            % ("" if tag is None else f"/{tag}", getroute),
            method="GET",
            callback=get,
        )
        app.route(
            "%s/get/<content_type>/<content_subtype>/%s/<key>/<key2>"
            % ("" if tag is None else f"/{tag}", getroute),
            method="GET",
            callback=get,
        )

    run(app, host=host, port=port, debug=True)