Stream(): fix "going deaf pn reconnect" issue by subscribing on connect

add on_log, on_disconnect callbacks. asJson flag: pass raw message instead of dict if False
pull/2/head
Michael Haberler 2021-04-02 15:15:57 +02:00
rodzic 81d0a6f6c6
commit ff447ac37a
1 zmienionych plików z 31 dodań i 6 usunięć

Wyświetl plik

@ -15,12 +15,21 @@ S3_BUCKET = "sondehub-open-data"
class Stream:
def __init__(self, sondes: list = ["#"], on_connect=None, on_message=None):
def __init__(self,
sondes: list = ["#"],
on_connect=None,
on_message=None,
on_log=None,
on_disconnect=None, asJson=True):
self.mqttc = mqtt.Client(transport="websockets")
self._sondes = sondes
self.ws_connect()
self.asJson = asJson
self.on_connect = on_connect
self.on_message = on_message
self.on_disconnect = on_disconnect
self.on_log = on_log
self.ws_connect()
def add_sonde(self, sonde):
if sonde not in self._sondes:
@ -53,7 +62,10 @@ class Stream:
self.mqttc.tls_set()
except ValueError:
pass
self.mqttc.connect(urlparts.netloc, 443, 60)
try:
self.mqttc.connect(urlparts.netloc, 443, 60)
except OSError:
pass
self.mqttc.loop_start()
def get_url(self):
@ -65,7 +77,10 @@ class Stream:
def _on_message(self, mqttc, obj, msg):
if self.on_message:
self.on_message(json.loads(msg.payload))
if self.asJson:
self.on_message(json.loads(msg.payload))
else:
self.on_message(msg.payload)
def _on_connect(self, mqttc, obj, flags, rc):
for sonde in self._sondes:
@ -73,10 +88,20 @@ class Stream:
if mqtt.MQTT_ERR_SUCCESS != rc:
self.ws_connect()
if self.on_connect:
self.on_connect()
self.on_connect(mqttc, obj, flags, rc)
def _on_log(self, *args, **kwargs):
if self.on_log:
self.on_log(*args, **kwargs)
def _on_disconnect(self, client, userdata, rc):
self.ws_connect()
try:
self.ws_connect()
except OSError:
pass
if self.on_disconnect:
self.on_disconnect(client, userdata, rc)
def __exit__(self, type, value, traceback):
self.mqttc.disconnect()