From 008160660a84ffa210ae471510002d3e752051f8 Mon Sep 17 00:00:00 2001 From: Mark Jessop Date: Tue, 29 Dec 2020 12:26:28 +1030 Subject: [PATCH 1/2] Habitat uploader - add ability to flush queue at input and output stages if queue is full. --- auto_rx/autorx/habitat.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/auto_rx/autorx/habitat.py b/auto_rx/autorx/habitat.py index 5c96c0b..4b91eb5 100644 --- a/auto_rx/autorx/habitat.py +++ b/auto_rx/autorx/habitat.py @@ -573,12 +573,12 @@ class HabitatUploader(object): _req = requests.put( _url, data=json.dumps(_data), - timeout=self.upload_timeout, + timeout=(self.upload_timeout, 6.1), headers=headers, ) except Exception as e: self.log_error("Upload Failed: %s" % str(e)) - break + return if _req.status_code == 201 or _req.status_code == 403: # 201 = Success, 403 = Success, sentence has already seen by others. @@ -620,7 +620,7 @@ class HabitatUploader(object): sentence = self.habitat_upload_queue.get() self.log_warning( - "Uploader queue was full - possible connectivity issue." + "Upload queue was full when reading from queue, now flushed - possible connectivity issue." ) else: # Otherwise, get the first item in the queue. @@ -684,9 +684,17 @@ class HabitatUploader(object): else: # Attept to add it to the habitat uploader queue. try: + if self.habitat_upload_queue.qsize() == self.upload_queue_size: + # Flush queue. + while self.habitat_upload_queue.qsize() > 0: + self.habitat_upload_queue.get(); + + self.log_error("Upload queue was full when adding to queue, now flushed - possible connectivity issue.") + self.habitat_upload_queue.put_nowait(_sentence) except Exception as e: - self.log_error("Error adding sentence to queue: %s" % str(e)) + self.log_error("Error adding sentence to queue, queue likely full. %s" % str(e)) + self.log_error("Queue Size: %d" % self.habitat_upload_queue.qsize()) self.upload_lock.release() From 7db85cb2a933d76762453719e6dcf383406b423b Mon Sep 17 00:00:00 2001 From: Mark Jessop Date: Tue, 29 Dec 2020 18:14:28 +1030 Subject: [PATCH 2/2] Improvements to queue flushing --- auto_rx/autorx/habitat.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/auto_rx/autorx/habitat.py b/auto_rx/autorx/habitat.py index 4b91eb5..a2b15ae 100644 --- a/auto_rx/autorx/habitat.py +++ b/auto_rx/autorx/habitat.py @@ -617,7 +617,10 @@ class HabitatUploader(object): # If the queue is completely full, jump to the most recent telemetry sentence. if self.habitat_upload_queue.qsize() == self.upload_queue_size: while not self.habitat_upload_queue.empty(): - sentence = self.habitat_upload_queue.get() + try: + sentence = self.habitat_upload_queue.get_nowait() + except: + pass self.log_warning( "Upload queue was full when reading from queue, now flushed - possible connectivity issue." @@ -627,8 +630,9 @@ class HabitatUploader(object): sentence = self.habitat_upload_queue.get() # Attempt to upload it. - self.habitat_upload(sentence) - + if sentence: + self.habitat_upload(sentence) + else: # Wait for a short time before checking the queue again. time.sleep(0.1) @@ -686,12 +690,16 @@ class HabitatUploader(object): try: if self.habitat_upload_queue.qsize() == self.upload_queue_size: # Flush queue. - while self.habitat_upload_queue.qsize() > 0: - self.habitat_upload_queue.get(); + while not self.habitat_upload_queue.empty(): + try: + self.habitat_upload_queue.get_nowait() + except: + pass self.log_error("Upload queue was full when adding to queue, now flushed - possible connectivity issue.") self.habitat_upload_queue.put_nowait(_sentence) + self.log_debug("Upload queue size: %d" % self.habitat_upload_queue.qsize()) except Exception as e: self.log_error("Error adding sentence to queue, queue likely full. %s" % str(e)) self.log_error("Queue Size: %d" % self.habitat_upload_queue.qsize())