kopia lustrzana https://github.com/projecthorus/horus-gui
211 wiersze
6.3 KiB
Python
211 wiersze
6.3 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Project Horus - Rotator Abstraction Layers
|
|
#
|
|
# Copyright (C) 2022 Mark Jessop <vk5qi@rfhead.net>
|
|
# Released under GNU GPL v3 or later
|
|
#
|
|
import socket
|
|
import time
|
|
import logging
|
|
import traceback
|
|
from threading import Thread
|
|
|
|
class ROTCTLD(object):
|
|
""" rotctld (hamlib) communication class """
|
|
# Note: This is a massive hack.
|
|
|
|
def __init__(self, hostname, port=4533, poll_rate=5, timeout=10, az_180 = False, threshold=5.0):
|
|
""" Open a connection to rotctld, and test it for validity """
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.sock.settimeout(timeout)
|
|
|
|
self.hostname = hostname
|
|
self.port = port
|
|
|
|
|
|
def connect(self):
|
|
""" Connect to rotctld instance """
|
|
self.sock.connect((self.hostname,self.port))
|
|
model = self.get_model()
|
|
print(model)
|
|
if model == None:
|
|
# Timeout!
|
|
self.close()
|
|
raise Exception("Timeout!")
|
|
else:
|
|
return model
|
|
|
|
def close(self):
|
|
self.sock.close()
|
|
|
|
def send_command(self, command, check_response = False):
|
|
""" Send a command to the connected rotctld instance,
|
|
and return the return value.
|
|
"""
|
|
command = command.encode('ascii')
|
|
self.sock.sendall(command+b'\n')
|
|
|
|
if check_response:
|
|
try:
|
|
return self.sock.recv(1024).decode('ascii')
|
|
except:
|
|
return None
|
|
else:
|
|
return "RPRT 0"
|
|
|
|
def get_model(self):
|
|
""" Get the rotator model from rotctld """
|
|
model = self.send_command('_')
|
|
return model
|
|
|
|
def set_azel(self,azimuth,elevation, check_response=False):
|
|
""" Command rotator to a particular azimuth/elevation """
|
|
# Sanity check inputs.
|
|
if elevation > 90.0:
|
|
elevation = 90.0
|
|
elif elevation < 0.0:
|
|
elevation = 0.0
|
|
|
|
if azimuth > 360.0:
|
|
azimuth = azimuth % 360.0
|
|
|
|
|
|
command = "P %3.1f %2.1f" % (azimuth,elevation)
|
|
response = self.send_command(command, check_response=check_response)
|
|
if "RPRT 0" in response:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def get_azel(self):
|
|
""" Poll rotctld for azimuth and elevation """
|
|
# Send poll command and read in response.
|
|
response = self.send_command('p')
|
|
|
|
# Attempt to split response by \n (az and el are on separate lines)
|
|
try:
|
|
response_split = response.split('\n')
|
|
_current_azimuth = float(response_split[0])
|
|
_current_elevation = float(response_split[1])
|
|
return (_current_azimuth, _current_elevation)
|
|
except:
|
|
logging.error("Could not parse position: %s" % response)
|
|
return (None,None)
|
|
|
|
|
|
|
|
class PSTRotator(object):
|
|
""" PSTRotator communication class """
|
|
|
|
# Local store of current azimuth/elevation
|
|
current_azimuth = None
|
|
current_elevation = None
|
|
|
|
azel_thread_running = False
|
|
last_poll_time = 0
|
|
|
|
def __init__(self, hostname='localhost', port=12000, poll_rate=1, threshold=5.0):
|
|
""" Start a PSTRotator connection instance """
|
|
self.hostname = hostname
|
|
self.port = port
|
|
self.poll_rate = poll_rate
|
|
self.azel_thread_running = True
|
|
|
|
self.t_rx = Thread(target=self.azel_rx_loop)
|
|
self.t_rx.start()
|
|
|
|
self.t_poll = Thread(target=self.azel_poll_loop)
|
|
self.t_poll.start()
|
|
|
|
|
|
def close(self):
|
|
self.azel_thread_running = False
|
|
|
|
def set_azel(self,azimuth,elevation, check_response=False):
|
|
""" Send an Azimuth/Elevation move command to PSTRotator """
|
|
|
|
# Sanity check inputs.
|
|
if elevation > 90.0:
|
|
elevation = 90.0
|
|
elif elevation < 0.0:
|
|
elevation = 0.0
|
|
|
|
if azimuth > 360.0:
|
|
azimuth = azimuth % 360.0
|
|
|
|
# Generate command
|
|
pst_command = "<PST><TRACK>0</TRACK><AZIMUTH>%.1f</AZIMUTH><ELEVATION>%.1f</ELEVATION></PST>" % (azimuth,elevation)
|
|
logging.debug("Sent command: %s" % pst_command)
|
|
# Send!
|
|
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
udp_socket.sendto(pst_command.encode('ascii'), (self.hostname,self.port))
|
|
udp_socket.close()
|
|
|
|
return True
|
|
|
|
def poll_azel(self):
|
|
""" Poll PSTRotator for an Azimuth/Elevation Update """
|
|
az_poll_command = "<PST>AZ?</PST>"
|
|
el_poll_command = "<PST>EL?</PST>"
|
|
try:
|
|
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
udp_socket.sendto(az_poll_command.encode('ascii'), (self.hostname,self.port))
|
|
time.sleep(0.2)
|
|
udp_socket.sendto(el_poll_command.encode('ascii'), (self.hostname,self.port))
|
|
udp_socket.close()
|
|
except:
|
|
pass
|
|
|
|
def azel_poll_loop(self):
|
|
while self.azel_thread_running:
|
|
self.poll_azel()
|
|
logging.debug("Poll sent to PSTRotator.")
|
|
time.sleep(self.poll_rate)
|
|
|
|
def azel_rx_loop(self):
|
|
""" Listen for Azimuth and Elevation reports from PSTRotator"""
|
|
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
|
|
s.settimeout(1)
|
|
s.bind(('',(self.port+1)))
|
|
logging.debug("Started PST Rotator Listener Thread.")
|
|
while self.azel_thread_running:
|
|
try:
|
|
m = s.recvfrom(512)
|
|
except socket.timeout:
|
|
m = None
|
|
|
|
if m != None:
|
|
# Attempt to parse Azimuth / Elevation
|
|
logging.debug("Received: %s" % m[0])
|
|
|
|
data = m[0].decode('ascii')
|
|
if data[:2] == 'EL':
|
|
self.current_elevation = float(data[3:])
|
|
elif data[:2] == 'AZ':
|
|
self.current_azimuth = float(data[3:])
|
|
|
|
|
|
logging.debug("Closing UDP Listener")
|
|
s.close()
|
|
|
|
|
|
def get_azel(self):
|
|
return (self.current_azimuth, self.current_elevation)
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG)
|
|
|
|
# rot = PSTRotator()
|
|
# time.sleep(10)
|
|
# rot.set_azel(45.0,45.0)
|
|
# time.sleep(10)
|
|
# rot.close()
|
|
|
|
rot = ROTCTLD(hostname="10.0.0.215")
|
|
rot.connect()
|
|
time.sleep(10)
|
|
rot.set_azel(45.0,45.0)
|
|
time.sleep(10)
|
|
rot.set_azel(0.0,0.0)
|
|
rot.close() |