2019-07-24 12:36:59 +00:00
|
|
|
#!/usr/bin/env python
|
|
|
|
#
|
2019-07-26 13:02:14 +00:00
|
|
|
# Wenet SSDV Upload Class
|
2019-07-24 12:36:59 +00:00
|
|
|
#
|
2019-07-26 13:02:14 +00:00
|
|
|
# Copyright (C) 2019 Mark Jessop <vk5qi@rfhead.net>
|
|
|
|
# Released under GNU GPL v3 or later
|
2019-07-24 12:36:59 +00:00
|
|
|
#
|
2019-07-26 13:02:14 +00:00
|
|
|
# Somewhat more robust SSDV Uploader class, which can be instantiated from within
|
|
|
|
# another application and monitored.
|
2019-07-27 05:47:12 +00:00
|
|
|
# Compatible with both Python 2 and Python 3
|
2019-07-24 12:36:59 +00:00
|
|
|
#
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
import argparse
|
2019-07-24 12:36:59 +00:00
|
|
|
import datetime
|
|
|
|
import logging
|
2019-07-26 14:00:02 +00:00
|
|
|
import json
|
2019-07-24 12:36:59 +00:00
|
|
|
import os
|
|
|
|
import glob
|
2019-07-26 13:02:14 +00:00
|
|
|
import requests
|
2019-07-26 14:00:02 +00:00
|
|
|
import socket
|
2019-07-24 12:36:59 +00:00
|
|
|
import sys
|
|
|
|
import time
|
|
|
|
import traceback
|
|
|
|
from base64 import b64encode
|
|
|
|
from threading import Thread, Lock
|
|
|
|
try:
|
|
|
|
# Python 2
|
|
|
|
from Queue import Queue
|
|
|
|
except ImportError:
|
|
|
|
# Python 3
|
|
|
|
from queue import Queue
|
|
|
|
|
2019-07-28 07:50:40 +00:00
|
|
|
from WenetPackets import WENET_IMAGE_UDP_PORT
|
2019-07-26 14:00:02 +00:00
|
|
|
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
class SSDVUploader(object):
|
2019-07-26 13:02:14 +00:00
|
|
|
"""
|
|
|
|
Queued SSDV Imagery Uploader Class
|
|
|
|
|
|
|
|
Based on the Queued habitat uploader class from auto_rx.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
SSDV_URL = "http://ssdv.habhub.org/api/v0/packets"
|
|
|
|
|
|
|
|
def __init__(self,
|
|
|
|
uploader_callsign = "N0CALL",
|
|
|
|
enable_file_watch = True,
|
|
|
|
watch_directory = "./rx_images/",
|
|
|
|
file_mask = "*.bin",
|
|
|
|
watch_time = 5,
|
|
|
|
queue_size = 8192,
|
|
|
|
upload_block_size = 256,
|
|
|
|
upload_timeout = 20,
|
|
|
|
upload_retries = 3,
|
|
|
|
upload_anyway = 10
|
|
|
|
):
|
|
|
|
"""
|
|
|
|
Initialise a SSDV Uploader Object
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
|
|
|
|
|
|
upload_retries (int): How many times to retry an upload on a timeout before discarding.
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
self.uploader_callsign = uploader_callsign
|
|
|
|
self.upload_block_size = upload_block_size
|
|
|
|
self.upload_timeout = upload_timeout
|
|
|
|
self.upload_retries = upload_retries
|
|
|
|
self.upload_anyway = upload_anyway
|
|
|
|
self.watch_time = watch_time
|
|
|
|
|
|
|
|
# Generate search mask.
|
|
|
|
self.search_mask = os.path.join(watch_directory, file_mask)
|
|
|
|
|
|
|
|
# Set up Queue
|
|
|
|
self.upload_queue = Queue(queue_size)
|
|
|
|
|
|
|
|
# Count of uploaded packets.
|
|
|
|
self.upload_count = 0
|
|
|
|
# Count of discarded packets due to upload failures.
|
|
|
|
self.discard_count = 0
|
|
|
|
|
|
|
|
|
|
|
|
# Start uploader and file watcher threads.
|
|
|
|
self.uploader_running = True
|
|
|
|
|
|
|
|
self.uploader_thread = Thread(target=self.uploader_loop)
|
|
|
|
self.uploader_thread.start()
|
|
|
|
|
|
|
|
if enable_file_watch:
|
|
|
|
self.file_watch_thread = Thread(target=self.file_watch_loop)
|
|
|
|
self.file_watch_thread.start()
|
|
|
|
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def ssdv_encode_packet(self, packet):
|
|
|
|
''' Convert a packet to a suitable JSON blob. '''
|
|
|
|
_packet_dict = {
|
|
|
|
"type" : "packet",
|
|
|
|
"packet" : b64encode(packet).decode('ascii'), # Note - b64encode accepts bytes objects under Python 3, and strings under Python 2.
|
|
|
|
"encoding": "base64",
|
|
|
|
# Because .isoformat() doesnt give us the right format... (boo)
|
|
|
|
"received": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
|
|
|
|
"receiver": self.uploader_callsign,
|
|
|
|
}
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
return _packet_dict
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def ssdv_upload_single(self, packet):
|
2024-10-04 03:52:04 +00:00
|
|
|
_packet_dict = self.ssdv_encode_packet(packet)
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_attempts = 1
|
|
|
|
while _attempts <= self.upload_retries:
|
|
|
|
try:
|
|
|
|
_r = requests.post(self.SSDV_URL, json=_packet_dict, timeout=self.upload_timeout)
|
|
|
|
return True
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
except requests.exceptions.Timeout:
|
|
|
|
# Timeout! We can re-try.
|
|
|
|
_attempts += 1
|
|
|
|
continue
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
except Exception as e:
|
|
|
|
logging.error("Uploader - Error when uploading: %s" % str(e))
|
|
|
|
break
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
return False
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def ssdv_upload_multiple(self, count):
|
|
|
|
# Sanity check that there are enough packet in the queue to upload.
|
|
|
|
if count > self.upload_queue.qsize():
|
|
|
|
count = self.upload_queue.qsize()
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_encoded_array = []
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
for i in range(count):
|
|
|
|
_encoded_array.append(self.ssdv_encode_packet(self.upload_queue.get()))
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_packet_dict = {
|
|
|
|
"type": "packets",
|
|
|
|
"packets": _encoded_array
|
|
|
|
}
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_attempts = 1
|
|
|
|
while _attempts <= self.upload_retries:
|
|
|
|
try:
|
|
|
|
_r = requests.post(self.SSDV_URL, json=_packet_dict, timeout=self.upload_timeout)
|
|
|
|
logging.debug("Uploader - Successfuly uploaded %d packets." % count)
|
|
|
|
return True
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
except requests.exceptions.Timeout:
|
|
|
|
# Timeout! We can re-try.
|
|
|
|
_attempts += 1
|
|
|
|
logging.debug("Uploader - Upload Timeout (attempt %d/%d)." % (_attempts, self.upload_retries))
|
|
|
|
continue
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
except Exception as e:
|
|
|
|
logging.error("Uploader - Error when uploading: %s" % str(e))
|
|
|
|
return False
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
logging.error("Uploader - Upload timed out after %d attempts." % _attempts)
|
|
|
|
return False
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def uploader_loop(self):
|
|
|
|
logging.info("Uploader - Started uploader thread.")
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_last_upload_time = time.time()
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
while self.uploader_running:
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
if self.upload_queue.qsize() >= self.upload_block_size:
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
if self.ssdv_upload_multiple(self.upload_block_size):
|
|
|
|
# Upload successful!
|
|
|
|
self.upload_count += self.upload_block_size
|
|
|
|
else:
|
|
|
|
# The upload has failed,
|
|
|
|
self.discard_count += self.upload_block_size
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_last_upload_time = time.time()
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
elif (self.upload_queue.qsize() > 0) and ( (time.time() - _last_upload_time) > self.upload_anyway):
|
|
|
|
# We have some packets in the queue, and haven't uploaded in a while. Upload them.
|
|
|
|
_packet_count = self.upload_queue.qsize()
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
if self.ssdv_upload_multiple(_packet_count):
|
|
|
|
# Upload successful!
|
|
|
|
self.upload_count += _packet_count
|
|
|
|
else:
|
|
|
|
# The upload has failed,
|
|
|
|
self.discard_count += _packet_count
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_last_upload_time = time.time()
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
time.sleep(1)
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
logging.info("Uploader - Closed uploader thread.")
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def file_watch_loop(self):
|
|
|
|
logging.info("Directory Watcher - Started Directory Watcher Thread.")
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_rx_images = glob.glob(self.search_mask)
|
|
|
|
_rx_images.sort()
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
while self.uploader_running:
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
# Wait a few seconds before checking for new files.
|
|
|
|
time.sleep(self.watch_time)
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
# Check for new files.
|
|
|
|
_folder_check = glob.glob(self.search_mask)
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
if len(_folder_check) == 0:
|
|
|
|
# No files in directory, continue.
|
|
|
|
continue
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
# Sort list. Image filenames are timestamps, so the last element in the array will be the latest image.
|
|
|
|
_folder_check.sort()
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
# Determine which images are new
|
|
|
|
_folder_check = set(_folder_check)
|
|
|
|
_new_images = [x for x in _folder_check if x not in _rx_images]
|
|
|
|
_new_images.sort()
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
for _image in _new_images:
|
|
|
|
# New file! Wait a short amount of time in case the file is still being written out.
|
|
|
|
time.sleep(0.5)
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
# Add it to the queue!
|
|
|
|
try:
|
|
|
|
self.add_file(_image)
|
|
|
|
except Exception as e:
|
|
|
|
logging.error("Directory Watcher - Error when adding image: %s" % str(e))
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
# Update the list of uploaded images
|
|
|
|
_rx_images.append(_image)
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
time.sleep(1)
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
logging.info("Directory Watcher - Closed Directory Watch Thread.")
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def get_queue_size(self):
|
|
|
|
""" Return the packets remaining in the queue """
|
|
|
|
return self.upload_queue.qsize()
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def get_upload_count(self):
|
|
|
|
""" Return the total number of packets uploaded so far """
|
|
|
|
return self.upload_count
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def get_discard_count(self):
|
|
|
|
""" Return the total number of packets uploaded so far """
|
|
|
|
return self.discard_count
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def add_packet(self, data):
|
|
|
|
""" Add a single packet to the uploader queue.
|
|
|
|
If the queue is full, the packet will be immediately discarded.
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
Under Python 2, this function should be passed strings.
|
|
|
|
Under Python 3, it should be passed bytes objects.
|
|
|
|
"""
|
|
|
|
if len(data) == 256:
|
|
|
|
try:
|
|
|
|
self.upload_queue.put_nowait(data)
|
|
|
|
return True
|
|
|
|
except:
|
|
|
|
# Queue was full.
|
|
|
|
self.discard_count += 1
|
|
|
|
if self.discard_count % 256 == 0:
|
|
|
|
logging.warning("Upload Queue Full - Packets are being dropped.")
|
|
|
|
return False
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def add_file(self, filename):
|
|
|
|
""" Attempt to add a file to the upload queue """
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_file_size = os.path.getsize(filename)
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
if _file_size%256 != 0:
|
|
|
|
logging.error("Directory Watcher - %s size (%d) not a multiple of 256, likely not a SSDV file." % (filename, _file_size))
|
|
|
|
return False
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_packet_count = _file_size // 256
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
logging.info("Directory Watcher - New file %s contains %d packets." % (filename, _packet_count))
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_packets_added = 0
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_f = open(filename, 'rb')
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
for _i in range(_packet_count):
|
|
|
|
_packet = _f.read(256)
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
if self.add_packet(_packet):
|
|
|
|
_packets_added += 1
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_f.close()
|
2019-07-24 12:36:59 +00:00
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
logging.info("Directory Watcher - Added %d packets to queue." % _packets_added)
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
def close(self):
|
|
|
|
""" Stop uploader thread. """
|
|
|
|
logging.info("Shutting down threads.")
|
|
|
|
self.uploader_running = False
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
2019-07-26 14:00:02 +00:00
|
|
|
def telemetry_gui_update(queued, uploaded, discarded):
|
2019-07-26 13:02:14 +00:00
|
|
|
""" Update the SSDV Receiver GUI with information on how many packets have been uploader """
|
2019-07-26 14:00:02 +00:00
|
|
|
message = {'uploader_status': 'running',
|
|
|
|
'queued': queued,
|
|
|
|
'uploaded': uploaded,
|
|
|
|
'discarded': discarded
|
|
|
|
}
|
|
|
|
|
|
|
|
try:
|
|
|
|
gui_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
gui_socket.sendto(json.dumps(message).encode('ascii'), ("127.0.0.1", WENET_IMAGE_UDP_PORT))
|
|
|
|
gui_socket.close()
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
logging.error("Error updating GUI with uploader status: %s" % str(e))
|
2019-07-24 12:36:59 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
# Command line arguments.
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
parser.add_argument("callsign", default="WENETRX", help="User Callsign")
|
|
|
|
parser.add_argument("--watch_dir", default="./rx_images/", help="Directory to watch for new files. (Default: ./rx_images/")
|
|
|
|
parser.add_argument("--file_mask", default="*.bin", help="File mask to watch (Defaut: *.bin)")
|
|
|
|
parser.add_argument("--queue_size", default=8192, type=int, help="Uploader queue size (Default: 8192 packets = ~2MiB)")
|
|
|
|
parser.add_argument("--upload_block_size", default=256, type=int, help="Upload block size (Default: 256 packets uploaded at a time.)")
|
2024-11-02 01:22:11 +00:00
|
|
|
parser.add_argument("--image_port", type=int, default=None, help="UDP port used for communication between Wenet decoder processes. Default: 7890")
|
2019-07-26 13:02:14 +00:00
|
|
|
parser.add_argument("-v", "--verbose", action='store_true', default=False, help="Verbose logging output.")
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
if args.verbose:
|
|
|
|
log_level = logging.DEBUG
|
|
|
|
else:
|
|
|
|
log_level = logging.INFO
|
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=log_level)
|
|
|
|
logging.getLogger("requests").setLevel(logging.CRITICAL)
|
|
|
|
logging.getLogger("urllib3").setLevel(logging.CRITICAL)
|
|
|
|
|
|
|
|
|
2024-11-02 01:22:11 +00:00
|
|
|
# Overwrite the image UDP port if it has been provided
|
|
|
|
if args.image_port:
|
|
|
|
WENET_IMAGE_UDP_PORT = args.image_port
|
|
|
|
|
2019-07-26 13:02:14 +00:00
|
|
|
_uploader = SSDVUploader(
|
|
|
|
uploader_callsign = args.callsign,
|
|
|
|
watch_directory = args.watch_dir,
|
|
|
|
file_mask = args.file_mask,
|
|
|
|
queue_size = args.queue_size,
|
|
|
|
upload_block_size = args.upload_block_size)
|
|
|
|
|
|
|
|
try:
|
|
|
|
while True:
|
2019-07-26 14:00:02 +00:00
|
|
|
time.sleep(5)
|
|
|
|
logging.debug("%d packets in uploader queue, %d packets uploaded, %d packets discarded." % (_uploader.get_queue_size(), _uploader.get_upload_count(), _uploader.get_discard_count()))
|
|
|
|
telemetry_gui_update(_uploader.get_queue_size(), _uploader.get_upload_count(), _uploader.get_discard_count())
|
2019-07-26 13:02:14 +00:00
|
|
|
except KeyboardInterrupt:
|
|
|
|
_uploader.close()
|
2019-07-24 12:36:59 +00:00
|
|
|
|