Add basic rotator control

pyqt6
Mark Jessop 2022-03-06 15:47:08 +10:30
rodzic e100800f91
commit 252287f54d
5 zmienionych plików z 378 dodań i 3 usunięć

Wyświetl plik

@ -1 +1 @@
__version__ = "0.3.4"
__version__ = "0.3.5"

Wyświetl plik

@ -31,6 +31,9 @@ default_config = {
"horus_udp_port": 55672,
"ozimux_enabled": False,
"ozimux_udp_port": 55683,
"rotator_type": "rotctld",
"rotator_host": "localhost",
"rotator_port": 4533,
"payload_list": json.dumps(horusdemodlib.payloads.HORUS_PAYLOAD_LIST),
"custom_field_list": json.dumps({})
}
@ -77,7 +80,12 @@ def read_config(widgets):
write_config()
for _setting in default_config:
default_config[_setting] = qt_settings.value(_setting)
try:
_new_setting = qt_settings.value(_setting)
if _new_setting:
default_config[_setting] = _new_setting
except Exception as e:
logging.debug("Missing config setting: " + _setting)
if widgets:
# Habitat Settings
@ -107,6 +115,11 @@ def read_config(widgets):
# Populate the default settings.
populate_modem_settings(widgets)
# Rotator Settings
widgets["rotatorTypeSelector"].setCurrentText(default_config["rotator_type"])
widgets["rotatorHostEntry"].setText(str(default_config["rotator_host"]))
widgets["rotatorPortEntry"].setText(str(default_config["rotator_port"]))
if default_config['baud_rate'] != -1:
widgets["horusModemRateSelector"].setCurrentText(str(default_config['baud_rate']))
@ -145,6 +158,9 @@ def save_config(widgets):
default_config["audio_sample_rate"] = widgets["audioSampleRateSelector"].currentText()
default_config["modem"] = widgets["horusModemSelector"].currentText()
default_config["baud_rate"] = int(widgets["horusModemRateSelector"].currentText())
default_config["rotator_type"] = widgets["rotatorTypeSelector"].currentText()
default_config["rotator_host"] = widgets["rotatorHostEntry"].text()
default_config["rotator_port"] = int(widgets["rotatorPortEntry"].text())
default_config["payload_list"] = json.dumps(horusdemodlib.payloads.HORUS_PAYLOAD_LIST)
default_config["custom_field_list"] = json.dumps(horusdemodlib.payloads.HORUS_CUSTOM_FIELDS)

Wyświetl plik

@ -34,6 +34,7 @@ from .config import *
from .habitat import *
from .utils import position_info
from .icon import getHorusIcon
from .rotators import ROTCTLD, PSTRotator
from horusdemodlib.demod import HorusLib, Mode
from horusdemodlib.decoder import decode_packet, parse_ukhas_string
from horusdemodlib.payloads import *
@ -67,6 +68,13 @@ sondehub_uploader = None
decoder_init = False
# Rotator object
rotator = None
rotator_current_az = 0.0
rotator_current_el = 0.0
# Global running indicator
running = False
@ -106,6 +114,7 @@ d0 = Dock("Audio", size=(300, 50))
d0_modem = Dock("Modem", size=(300, 80))
d0_habitat = Dock("Habitat", size=(300, 200))
d0_other = Dock("Other", size=(300, 100))
d0_rotator = Dock("Rotator", size=(300, 100))
d1 = Dock("Spectrum", size=(800, 350))
d2_stats = Dock("SNR (dB)", size=(50, 300))
d2_snr = Dock("SNR Plot", size=(750, 300))
@ -118,6 +127,7 @@ area.addDock(d1, "right", d0)
area.addDock(d0_modem, "bottom", d0)
area.addDock(d0_habitat, "bottom", d0_modem)
area.addDock(d0_other, "below", d0_habitat)
area.addDock(d0_rotator, "below", d0_other)
area.addDock(d2_stats, "bottom", d1)
area.addDock(d3_data, "bottom", d2_stats)
area.addDock(d3_position, "bottom", d3_data)
@ -319,6 +329,66 @@ w1_other.layout.setRowStretch(7, 1)
d0_other.addWidget(w1_other)
w1_rotator = pg.LayoutWidget()
widgets["rotatorHeaderLabel"] = QtGui.QLabel("<b><u>Rotator Control</u></b>")
widgets["rotatorTypeLabel"] = QtGui.QLabel("<b>Rotator Type:</b>")
widgets["rotatorTypeSelector"] = QtGui.QComboBox()
widgets["rotatorTypeSelector"].addItem("rotctld")
widgets["rotatorTypeSelector"].addItem("PSTRotator")
widgets["rotatorHostLabel"] = QtGui.QLabel("<b>Rotator Hostname:</b>")
widgets["rotatorHostEntry"] = QtGui.QLineEdit("localhost")
widgets["rotatorHostEntry"].setToolTip(
"Hostname of the rotctld or PSTRotator Server.\n"\
)
widgets["rotatorPortLabel"] = QtGui.QLabel("<b>Rotator TCP/UDP Port:</b>")
widgets["rotatorPortEntry"] = QtGui.QLineEdit("4533")
widgets["rotatorPortEntry"].setMaxLength(5)
widgets["rotatorPortEntry"].setToolTip(
"TCP (rotctld) or UDP (PSTRotator) port to connect to.\n"\
"Default for rotctld: 4533\n"\
"Default for PSTRotator: 12000"
)
widgets["rotatorThresholdLabel"] = QtGui.QLabel("<b>Rotator Movement Threshold:</b>")
widgets["rotatorThresholdEntry"] = QtGui.QLineEdit("5.0")
widgets["rotatorThresholdEntry"].setToolTip(
"Only move if the angle between the payload position and \n"\
"the current rotator position is more than this, in degrees."
)
widgets["rotatorConnectButton"] = QtGui.QPushButton("Start")
widgets["rotatorCurrentStatusLabel"] = QtGui.QLabel("<b>Status:</b>")
widgets["rotatorCurrentStatusValue"] = QtGui.QLabel("Not Started.")
widgets["rotatorCurrentPositionLabel"] = QtGui.QLabel("<b>Commanded Az/El:</b>")
widgets["rotatorCurrentPositionValue"] = QtGui.QLabel("---˚, --˚")
w1_rotator.addWidget(widgets["rotatorHeaderLabel"], 0, 0, 1, 2)
w1_rotator.addWidget(widgets["rotatorTypeLabel"], 1, 0, 1, 1)
w1_rotator.addWidget(widgets["rotatorTypeSelector"], 1, 1, 1, 1)
w1_rotator.addWidget(widgets["rotatorHostLabel"], 2, 0, 1, 1)
w1_rotator.addWidget(widgets["rotatorHostEntry"], 2, 1, 1, 1)
w1_rotator.addWidget(widgets["rotatorPortLabel"], 3, 0, 1, 1)
w1_rotator.addWidget(widgets["rotatorPortEntry"], 3, 1, 1, 1)
#w1_rotator.addWidget(widgets["rotatorThresholdLabel"], 4, 0, 1, 1)
#w1_rotator.addWidget(widgets["rotatorThresholdEntry"], 4, 1, 1, 1)
w1_rotator.addWidget(widgets["rotatorConnectButton"], 4, 0, 1, 2)
w1_rotator.addWidget(widgets["rotatorCurrentStatusLabel"], 5, 0, 1, 1)
w1_rotator.addWidget(widgets["rotatorCurrentStatusValue"], 5, 1, 1, 1)
w1_rotator.addWidget(widgets["rotatorCurrentPositionLabel"], 6, 0, 1, 1)
w1_rotator.addWidget(widgets["rotatorCurrentPositionValue"], 6, 1, 1, 1)
w1_rotator.layout.setRowStretch(7, 1)
d0_rotator.addWidget(w1_rotator)
# Spectrum Display
widgets["spectrumPlot"] = pg.PlotWidget(title="Spectra")
widgets["spectrumPlot"].setLabel("left", "Power (dB)")
@ -833,6 +903,14 @@ def handle_new_packet(frame):
widgets['latestPacketBearingValue'].setText(f"{_position_info['bearing']:.1f}")
widgets['latestPacketElevationValue'].setText(f"{_position_info['elevation']:.1f}")
widgets['latestPacketRangeValue'].setText(f"{_position_info['straight_distance']/1000.0:.1f}")
if rotator:
try:
rotator.set_azel(_position_info['bearing'], _position_info['elevation'], check_response=False)
widgets["rotatorCurrentPositionValue"].setText(f"{_position_info['bearing']:3.1f}˚, {_position_info['elevation']:2.1f}˚")
except Exception as e:
logging.error("Rotator - Error setting Position: " + str(e))
except Exception as e:
logging.error(f"Could not calculate relative position to payload - {str(e)}")
@ -1046,6 +1124,70 @@ gui_update_timer.timeout.connect(processQueues)
gui_update_timer.start(100)
# Rotator Control
def startstop_rotator():
global rotator, widgets
if rotator is None:
# Start a rotator connection.
try:
_host = widgets["rotatorHostEntry"].text()
_port = int(widgets["rotatorPortEntry"].text())
_threshold = float(widgets["rotatorThresholdEntry"].text())
except:
widgets["rotatorCurrentStatusValue"].setText("Bad Host/Port")
return
if widgets["rotatorTypeSelector"].currentText() == "rotctld":
try:
rotator = ROTCTLD(hostname=_host, port=_port, threshold=_threshold)
rotator.connect()
except Exception as e:
logging.error("Rotctld Connect Error: " + str(e))
rotator = None
return
elif widgets["rotatorTypeSelector"].currentText() == "PSTRotator":
rotator = PSTRotator(hostname=_host, port=_port, threshold=_threshold)
else:
return
widgets["rotatorCurrentStatusValue"].setText("Connected")
widgets["rotatorConnectButton"].setText("Stop")
else:
# Stop the rotator
rotator.close()
rotator = None
widgets["rotatorConnectButton"].setText("Start")
widgets["rotatorCurrentStatusValue"].setText("Not Connected")
widgets["rotatorCurrentPositionValue"].setText(f"---˚, --˚")
widgets["rotatorConnectButton"].clicked.connect(startstop_rotator)
# def poll_rotator():
# global rotator, widgets, rotator_current_az, rotator_current_el
# if rotator:
# _az, _el = rotator.get_azel()
# if _az != None:
# rotator_current_az = _az
# if _el != None:
# rotator_current_el = _el
# widgets["rotatorCurrentPositionValue"].setText(f"{rotator_current_az:3.1f}˚, {rotator_current_el:2.1f}˚")
# rotator_poll_timer = QtCore.QTimer()
# rotator_poll_timer.timeout.connect(poll_rotator)
# rotator_poll_timer.start(2000)
class ConsoleHandler(logging.Handler):
""" Logging handler to write to the GUI console """
@ -1099,6 +1241,12 @@ def main():
except:
pass
if rotator:
try:
rotator.close()
except:
pass
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,211 @@
#!/usr/bin/env python
#
# Project Horus - Rotator Abstraction Layers
#
# Copyright (C) 2022 Mark Jessop <vk5qi@rfhead.net>
# Released under GNU GPL v3 or later
#
import socket
import time
import logging
import traceback
from threading import Thread
class ROTCTLD(object):
""" rotctld (hamlib) communication class """
# Note: This is a massive hack.
def __init__(self, hostname, port=4533, poll_rate=5, timeout=10, az_180 = False, threshold=5.0):
""" Open a connection to rotctld, and test it for validity """
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(timeout)
self.hostname = hostname
self.port = port
def connect(self):
""" Connect to rotctld instance """
self.sock.connect((self.hostname,self.port))
model = self.get_model()
print(model)
if model == None:
# Timeout!
self.close()
raise Exception("Timeout!")
else:
return model
def close(self):
self.sock.close()
def send_command(self, command, check_response = False):
""" Send a command to the connected rotctld instance,
and return the return value.
"""
command = command.encode('ascii')
self.sock.sendall(command+b'\n')
if check_response:
try:
return self.sock.recv(1024).decode('ascii')
except:
return None
else:
return "RPRT 0"
def get_model(self):
""" Get the rotator model from rotctld """
model = self.send_command('_')
return model
def set_azel(self,azimuth,elevation, check_response=False):
""" Command rotator to a particular azimuth/elevation """
# Sanity check inputs.
if elevation > 90.0:
elevation = 90.0
elif elevation < 0.0:
elevation = 0.0
if azimuth > 360.0:
azimuth = azimuth % 360.0
command = "P %3.1f %2.1f" % (azimuth,elevation)
response = self.send_command(command, check_response=check_response)
if "RPRT 0" in response:
return True
else:
return False
def get_azel(self):
""" Poll rotctld for azimuth and elevation """
# Send poll command and read in response.
response = self.send_command('p')
# Attempt to split response by \n (az and el are on separate lines)
try:
response_split = response.split('\n')
_current_azimuth = float(response_split[0])
_current_elevation = float(response_split[1])
return (_current_azimuth, _current_elevation)
except:
logging.error("Could not parse position: %s" % response)
return (None,None)
class PSTRotator(object):
""" PSTRotator communication class """
# Local store of current azimuth/elevation
current_azimuth = None
current_elevation = None
azel_thread_running = False
last_poll_time = 0
def __init__(self, hostname='localhost', port=12000, poll_rate=1, threshold=5.0):
""" Start a PSTRotator connection instance """
self.hostname = hostname
self.port = port
self.poll_rate = poll_rate
self.azel_thread_running = True
self.t_rx = Thread(target=self.azel_rx_loop)
self.t_rx.start()
self.t_poll = Thread(target=self.azel_poll_loop)
self.t_poll.start()
def close(self):
self.azel_thread_running = False
def set_azel(self,azimuth,elevation, check_response=False):
""" Send an Azimuth/Elevation move command to PSTRotator """
# Sanity check inputs.
if elevation > 90.0:
elevation = 90.0
elif elevation < 0.0:
elevation = 0.0
if azimuth > 360.0:
azimuth = azimuth % 360.0
# Generate command
pst_command = "<PST><TRACK>0</TRACK><AZIMUTH>%.1f</AZIMUTH><ELEVATION>%.1f</ELEVATION></PST>" % (azimuth,elevation)
logging.debug("Sent command: %s" % pst_command)
# Send!
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.sendto(pst_command.encode('ascii'), (self.hostname,self.port))
udp_socket.close()
return True
def poll_azel(self):
""" Poll PSTRotator for an Azimuth/Elevation Update """
az_poll_command = "<PST>AZ?</PST>"
el_poll_command = "<PST>EL?</PST>"
try:
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.sendto(az_poll_command.encode('ascii'), (self.hostname,self.port))
time.sleep(0.2)
udp_socket.sendto(el_poll_command.encode('ascii'), (self.hostname,self.port))
udp_socket.close()
except:
pass
def azel_poll_loop(self):
while self.azel_thread_running:
self.poll_azel()
logging.debug("Poll sent to PSTRotator.")
time.sleep(self.poll_rate)
def azel_rx_loop(self):
""" Listen for Azimuth and Elevation reports from PSTRotator"""
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.settimeout(1)
s.bind(('',(self.port+1)))
logging.debug("Started PST Rotator Listener Thread.")
while self.azel_thread_running:
try:
m = s.recvfrom(512)
except socket.timeout:
m = None
if m != None:
# Attempt to parse Azimuth / Elevation
logging.debug("Received: %s" % m[0])
data = m[0].decode('ascii')
if data[:2] == 'EL':
self.current_elevation = float(data[3:])
elif data[:2] == 'AZ':
self.current_azimuth = float(data[3:])
logging.debug("Closing UDP Listener")
s.close()
def get_azel(self):
return (self.current_azimuth, self.current_elevation)
if __name__ == "__main__":
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG)
# rot = PSTRotator()
# time.sleep(10)
# rot.set_azel(45.0,45.0)
# time.sleep(10)
# rot.close()
rot = ROTCTLD(hostname="10.0.0.215")
rot.connect()
time.sleep(10)
rot.set_azel(45.0,45.0)
time.sleep(10)
rot.set_azel(0.0,0.0)
rot.close()

Wyświetl plik

@ -1,6 +1,6 @@
[tool.poetry]
name = "horusgui"
version = "0.3.1"
version = "0.3.5"
description = ""
authors = ["Mark Jessop <vk5qi@rfhead.net>"]