Expose the mqttc loop functions, and option to avoid the auto start

This also makes a cleaner `main()` by using `loop_forever()`

Default for auto_start_loop is backwards compatible and structured to
allow for changing the default later and/or raising a warning if not
set.
pull/4/head
Lee Begg 2021-05-22 21:11:37 +12:00
rodzic 2ca9e23404
commit 359123b22b
2 zmienionych plików z 30 dodań i 5 usunięć

Wyświetl plik

@ -13,6 +13,8 @@ import queue
S3_BUCKET = "sondehub-open-data" S3_BUCKET = "sondehub-open-data"
class LoopStartedError(Exception):
pass
class Stream: class Stream:
def __init__(self, def __init__(self,
@ -20,7 +22,8 @@ class Stream:
on_connect=None, on_connect=None,
on_message=None, on_message=None,
on_log=None, on_log=None,
on_disconnect=None, asJson=False): on_disconnect=None, asJson=False,
auto_start_loop=None):
self.mqttc = mqtt.Client(transport="websockets") self.mqttc = mqtt.Client(transport="websockets")
self._sondes = sondes self._sondes = sondes
self.asJson = asJson self.asJson = asJson
@ -28,6 +31,9 @@ class Stream:
self.on_message = on_message self.on_message = on_message
self.on_disconnect = on_disconnect self.on_disconnect = on_disconnect
self.on_log = on_log self.on_log = on_log
if auto_start_loop is None:
auto_start_loop = True
self.auto_start_loop = auto_start_loop
self.ws_connect() self.ws_connect()
@ -66,7 +72,8 @@ class Stream:
self.mqttc.connect(urlparts.netloc, 443, 60) self.mqttc.connect(urlparts.netloc, 443, 60)
except OSError: except OSError:
pass pass
self.mqttc.loop_start() if self.auto_start_loop:
self.mqttc.loop_start()
def get_url(self): def get_url(self):
conn = http.client.HTTPSConnection("api.v2.sondehub.org") conn = http.client.HTTPSConnection("api.v2.sondehub.org")
@ -108,6 +115,25 @@ class Stream:
def disconnect(self): def disconnect(self):
self.mqttc.disconnect() self.mqttc.disconnect()
def loop_start(self):
if self.auto_start_loop:
raise LoopStartedError()
self.mqttc.loop_start()
def loop_stop(self):
self.mqttc.loop_stop()
self.auto_start_loop = False
def loop_step(self, timeout=1.0):
if self.auto_start_loop:
raise LoopStartedError()
self.mqttc.loop(timeout)
def loop_forever(self):
if self.auto_start_loop:
raise LoopStartedError()
self.mqttc.loop_forever()
class Downloader(threading.Thread): class Downloader(threading.Thread):

Wyświetl plik

@ -47,9 +47,8 @@ def main():
): # we need to drop the default value if the user specifies specific sondes ): # we need to drop the default value if the user specifies specific sondes
args.sondes = args.sondes[1:] args.sondes = args.sondes[1:]
sondes = [item for sublist in args.sondes for item in sublist] sondes = [item for sublist in args.sondes for item in sublist]
test = sondehub.Stream(on_message=on_message, sondes=sondes) test = sondehub.Stream(on_message=on_message, sondes=sondes, auto_start_loop=False)
while 1: test.loop_forever()
time.sleep(0.01) # don't overwork the CPU waiting for events
if __name__ == "__main__": if __name__ == "__main__":