From 7248a88fb2b4fbf313d8a7f409fda65ae7209a8e Mon Sep 17 00:00:00 2001 From: Mark Jessop Date: Wed, 24 Jul 2019 22:06:59 +0930 Subject: [PATCH] Started working on more robust ssdv upload class. --- rx/ssdvuploader.py | 294 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 294 insertions(+) create mode 100644 rx/ssdvuploader.py diff --git a/rx/ssdvuploader.py b/rx/ssdvuploader.py new file mode 100644 index 0000000..3239d48 --- /dev/null +++ b/rx/ssdvuploader.py @@ -0,0 +1,294 @@ +#!/usr/bin/env python +# +# Wenet SSDV Upload Class +# +# Copyright (C) 2019 Mark Jessop +# Released under GNU GPL v3 or later +# +# Somewhat more robust SSDV Uploader class, which can be instantiated from within +# another application and monitored. +# + +import requests +import datetime +import logging +import os +import glob +import sys +import time +import traceback +from base64 import b64encode +from threading import Thread, Lock +try: + # Python 2 + from Queue import Queue +except ImportError: + # Python 3 + from queue import Queue + + +class SSDVUploader(object): + """ + Queued SSDV Imagery Uploader Class + + Based on the Queued habitat uploader class from auto_rx. + + """ + + + SSDV_URL = "http://ssdv.habhub.org/api/v0/packets" + + def __init__(self, + uploader_callsign = "N0CALL", + enable_file_watch = True, + watch_directory = "./rx_images/", + file_mask = "*.bin", + queue_size = 8192, + upload_block_size = 256, + upload_timeout = 10, + upload_retries = 2, + upload_anyway = 10 + ): + """ + Initialise a SSDV Uploader Object + + Args: + + + upload_retries (int): How many times to retry an upload on a timeout before discarding. + + + """ + + self.uploader_callsign = uploader_callsign + self.upload_block_size = upload_block_size + self.upload_timeout = upload_timeout + self.upload_retries = upload_retries + self.upload_anyway = upload_anyway + + # Generate search mask. + self.search_mask = os.path.join(watch_directory, file_mask) + + # Set up Queue + self.upload_queue = Queue(queue_size) + + # Count of uploaded packets. + self.upload_count = 0 + # Count of discarded packets due to upload failures. + self.discard_count = 0 + + + # Start uploader and file watcher threads. + self.uploader_running = True + + self.uploader_thread = Thread(target=self.uploader_loop) + self.uploader_thread.start() + + if enable_file_watch: + self.file_watch_thread = Thread(target=self.file_watch_loop) + self.file_watch_thread.start() + + + + def ssdv_encode_packet(self, packet): + ''' Convert a packet to a suitable JSON blob. ''' + _packet_dict = { + "type" : "packet", + "packet" : b64encode(packet), # Note - b64encode accepts bytes objects under Python 3, and strings under Python 2. + "encoding": "base64", + # Because .isoformat() doesnt give us the right format... (boo) + "received": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), + "receiver": self.uploader_callsign, + } + + return _packet_dict + + + def ssdv_upload_single(self, packet): + _packet_dict = self.ssdv_encode_packet(packet,callsign) + + _attempts = 1 + while _attempts <= self.upload_retries: + try: + _r = requests.post(self.SSDV_URL, json=_packet_dict, timeout=self.upload_timeout) + return True + + except requests.exceptions.Timeout: + # Timeout! We can re-try. + _attempts += 1 + continue + + except Exception as e: + logging.error("Uploader - Error when uploading: %s" % str(e)) + break + + return False + + + def ssdv_upload_multiple(self, count): + # Sanity check that there are enough packet in the queue to upload. + if count > self.upload_queue.qsize(): + count = self.upload_queue.qsize() + + _encoded_array = [] + + for i in range(count): + _encoded_array.append(self.ssdv_encode_packet(self.upload_queue.get())) + + _packet_dict = { + "type": "packets", + "packets": _encoded_array + } + + _attempts = 1 + while _attempts <= self.upload_retries: + try: + _r = requests.post(self.SSDV_URL, json=_packet_dict, timeout=self.upload_timeout) + logging.debug("Uploader - Successfuly uploaded %d packets." % count) + return True + + except requests.exceptions.Timeout: + # Timeout! We can re-try. + _attempts += 1 + continue + + except Exception as e: + logging.error("Uploader - Error when uploading: %s" % str(e)) + break + + return False + + + def uploader_loop(self): + logging.info("Started uploader thread.") + + + _last_upload_time = time.time() + + while self.uploader_running: + + if self.upload_queue.qsize() >= self.upload_block_size: + + if self.ssdv_upload_multiple(self.upload_block_size): + # Upload successful! + self.upload_count += self.upload_block_size + else: + # The upload has failed, + self.discard_count += self.upload_block_size + + _last_upload_time = time.time() + + elif (self.upload_queue.qsize() > 0) and ( (time.time() - _last_upload_time) > self.upload_anyway): + # We have some packets in the queue, and haven't uploaded in a while. Upload them. + _packet_count = self.upload_queue.qsize() + + if self.ssdv_upload_multiple(_packet_count): + # Upload successful! + self.upload_count += _packet_count + else: + # The upload has failed, + self.discard_count += _packet_count + + _last_upload_time = time.time() + + time.sleep(1) + + + logging.info("Closed uploader thread.") + + + + def file_watch_loop(self): + logging.info("Started File Watcher Thread.") + + while self.uploader_running: + + # File watcher stuff here. + + time.sleep(1) + + + logging.info("Closed File Watch Thread.") + + + def get_queue_size(self): + """ Return the packets remaining in the queue """ + return self.upload_queue.qsize() + + + def get_upload_count(self): + """ Return the total number of packets uploaded so far """ + return self.upload_count + + + def add_packet(self, data): + """ Add a single packet to the uploader queue. + If the queue is full, the packet will be immediately discarded. + + Under Python 2, this function should be passed strings. + Under Python 3, it should be passed bytes objects. + """ + if len(data) == 256: + try: + self.upload_queue.put_nowait(data) + return True + except: + # Queue was full. + self.discard_count += 1 + if self.discard_count % 256 == 0: + logging.warning("Upload Queue Full - Packets are being dropped.") + return False + + + def add_file(self, filename): + """ Attempt to add a file to the upload queue """ + + _file_size = os.path.getsize(filename) + + if _file_size%256 != 0: + logging.error("%s size (%d) not a multiple of 256, likely not a SSDV file." % (filename, _file_size)) + return False + + _packet_count = _file_size / 256 + _packets_added = 0 + + _f = open(filename, 'rb') + + for _i in range(_packet_count): + _packet = _f.read(256) + + if self.add_packet(_packet): + _packets_added += 1 + + _f.close() + + logging.info("Added %d packets to queue." % _packets_added) + + + def close(self): + """ Stop uploader thread. """ + logging.info("Shutting down threads.") + self.uploader_running = False + + + + + +if __name__ == "__main__": + + logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.DEBUG) + + if len(sys.argv) == 1: + print("Usage: python ssdvuploader.py YOUR_CALLSIGN_HERE") + sys.exit(1) + else: + _callsign = sys.argv[1] + _uploader = SSDVUploader(uploader_callsign=_callsign) + + try: + while True: + time.sleep(5) + print("%d packets in uploader queue. %d packets uploaded." % (_uploader.get_queue_size(), _uploader.get_upload_count())) + except KeyboardInterrupt: + _uploader.close() +