diff --git a/sondehub/__init__.py b/sondehub/__init__.py index 199bcf8..f528a2f 100644 --- a/sondehub/__init__.py +++ b/sondehub/__init__.py @@ -15,12 +15,21 @@ S3_BUCKET = "sondehub-open-data" class Stream: - def __init__(self, sondes: list = ["#"], on_connect=None, on_message=None): + def __init__(self, + sondes: list = ["#"], + on_connect=None, + on_message=None, + on_log=None, + on_disconnect=None, asJson=True): self.mqttc = mqtt.Client(transport="websockets") self._sondes = sondes - self.ws_connect() + self.asJson = asJson self.on_connect = on_connect self.on_message = on_message + self.on_disconnect = on_disconnect + self.on_log = on_log + self.ws_connect() + def add_sonde(self, sonde): if sonde not in self._sondes: @@ -53,7 +62,10 @@ class Stream: self.mqttc.tls_set() except ValueError: pass - self.mqttc.connect(urlparts.netloc, 443, 60) + try: + self.mqttc.connect(urlparts.netloc, 443, 60) + except OSError: + pass self.mqttc.loop_start() def get_url(self): @@ -65,7 +77,10 @@ class Stream: def _on_message(self, mqttc, obj, msg): if self.on_message: - self.on_message(json.loads(msg.payload)) + if self.asJson: + self.on_message(json.loads(msg.payload)) + else: + self.on_message(msg.payload) def _on_connect(self, mqttc, obj, flags, rc): for sonde in self._sondes: @@ -73,10 +88,20 @@ class Stream: if mqtt.MQTT_ERR_SUCCESS != rc: self.ws_connect() if self.on_connect: - self.on_connect() + self.on_connect(mqttc, obj, flags, rc) + + def _on_log(self, *args, **kwargs): + if self.on_log: + self.on_log(*args, **kwargs) def _on_disconnect(self, client, userdata, rc): - self.ws_connect() + try: + self.ws_connect() + except OSError: + pass + + if self.on_disconnect: + self.on_disconnect(client, userdata, rc) def __exit__(self, type, value, traceback): self.mqttc.disconnect()