pull/13/head
Mark Jessop 2020-06-26 22:34:05 +09:30
rodzic 081ae9aa24
commit 3c48b578c2
8 zmienionych plików z 253 dodań i 264 usunięć

Wyświetl plik

@ -18,13 +18,13 @@ help:
# help: style.check - perform code format compliance check
.PHONY: style.check
style.check:
@black src/horusgui apps setup.py --check
@black src/horusgui setup.py --check
# help: style - perform code format compliance changes
.PHONY: style
style:
@black src/horusgui apps setup.py
@black src/horusgui setup.py
# help: test - run tests

Wyświetl plik

@ -1,2 +1 @@
__version__ = "20.6.21"

Wyświetl plik

@ -9,6 +9,7 @@ audioStream = None
audioDevices = {}
def init_audio(widgets):
""" Initialise pyaudio object, and populate list of sound card in GUI """
global pyAudio, audioDevices
@ -18,24 +19,24 @@ def init_audio(widgets):
audioDevices = {}
# Clear list
widgets['audioDeviceSelector'].clear()
widgets["audioDeviceSelector"].clear()
# Iterate through PyAudio devices
for x in range(0, pyAudio.get_device_count()):
_dev = pyAudio.get_device_info_by_index(x)
# Does the device have inputs?
if _dev['maxInputChannels'] > 0:
if _dev["maxInputChannels"] > 0:
# Get the name
_name = _dev['name']
_name = _dev["name"]
# Add to local store of device info
audioDevices[_name] = _dev
# Add to audio device selection list.
widgets['audioDeviceSelector'].addItem(_name)
widgets["audioDeviceSelector"].addItem(_name)
# Select first item.
if len(list(audioDevices.keys())) > 0:
widgets['audioDeviceSelector'].setCurrentIndex(0)
widgets["audioDeviceSelector"].setCurrentIndex(0)
# Initial population of sample rates.
populate_sample_rates(widgets)
@ -48,35 +49,25 @@ def populate_sample_rates(widgets):
global audioDevices
# Clear list of sample rates.
widgets['audioSampleRateSelector'].clear()
widgets["audioSampleRateSelector"].clear()
# Get information on current audio device
_dev_name = widgets['audioDeviceSelector'].currentText()
_dev_name = widgets["audioDeviceSelector"].currentText()
if _dev_name in audioDevices:
# TODO: Determine valid samples rates. For now, just use the default.
_samp_rate = int(audioDevices[_dev_name]['defaultSampleRate'])
widgets['audioSampleRateSelector'].addItem(str(_samp_rate))
widgets['audioSampleRateSelector'].setCurrentIndex(0)
_samp_rate = int(audioDevices[_dev_name]["defaultSampleRate"])
widgets["audioSampleRateSelector"].addItem(str(_samp_rate))
widgets["audioSampleRateSelector"].setCurrentIndex(0)
else:
logging.error("Audio - Unknown Audio Device")
class AudioStream(object):
""" Start up a pyAudio input stream, and pass data around to different callbacks """
def __init__(
self,
audio_device,
fs,
block_size = 8192,
def __init__(self, audio_device, fs, block_size=8192, fft_input=None, modem=None):
fft_input = None,
modem = None
):
self.audio_device = audio_device
self.fs = fs
self.block_size = block_size
@ -84,7 +75,6 @@ class AudioStream(object):
self.fft_input = fft_input
self.modem = modem
# Start audio stream
self.audio = pyaudio.PyAudio()
@ -96,26 +86,20 @@ class AudioStream(object):
input=True,
input_device_index=self.audio_device,
output=False,
stream_callback=self.handle_samples
stream_callback=self.handle_samples,
)
def handle_samples(self, data, frame_count, time_info="", status_flags=""):
""" Handle incoming samples from pyaudio """
# Pass samples directly into fft.
if self.fft_input:
self.fft_input(data)
# TODO: Handle modem sample input.
return (None, pyaudio.paContinue)
def stop(self):
""" Halt stream """
self.stream.close()

Wyświetl plik

@ -12,16 +12,16 @@ from . import __version__
default_config = {
'audio_device': 'None',
'modem': 'Horus Binary v1 (Legacy)',
'habitat_upload_enabled': True,
'habitat_call': 'N0CALL',
'habitat_lat': 0.0,
'habitat_lon': 0.0,
'habitat_antenna': "",
'habitat_radio': "Horus-GUI "+ __version__,
'horus_udp_enabled': True,
'horus_udp_port': 55672
"audio_device": "None",
"modem": "Horus Binary v1 (Legacy)",
"habitat_upload_enabled": True,
"habitat_call": "N0CALL",
"habitat_lat": 0.0,
"habitat_lon": 0.0,
"habitat_antenna": "",
"habitat_radio": "Horus-GUI " + __version__,
"horus_udp_enabled": True,
"horus_udp_port": 55672,
}
@ -33,13 +33,12 @@ def init_config(filename="config.yml"):
yaml = YAML()
try:
with open(filename, 'w') as _outfile:
with open(filename, "w") as _outfile:
yaml.dump(default_config, _outfile)
except Exception as e:
logging.error(f"Could not write configuration file - {str(e)}")
def read_config(widgets, filename="config.yml"):
""" Read in a configuration yml file, and set up all GUI widgets """
if not os.path.exists(filename):
@ -50,7 +49,7 @@ def read_config(widgets, filename="config.yml"):
_config = None
try:
with open(filename, 'r') as _infile:
with open(filename, "r") as _infile:
_config = yaml.load(_infile)
except Exception as e:
logging.error(f"Error reading config file - {str(e)}")
@ -60,25 +59,22 @@ def read_config(widgets, filename="config.yml"):
if widgets:
# Habitat Settings
widgets['habitatUploadSelector'].setChecked(_config['habitat_upload_enabled'])
widgets['userCallEntry'].setText(str(_config['habitat_call']))
widgets['userLatEntry'].setText(str(_config['habitat_lat']))
widgets['userLonEntry'].setText(str(_config['habitat_lon']))
widgets['userAntennaEntry'].setText(str(_config['habitat_antenna']))
widgets['userRadioEntry'].setText(str(_config['habitat_radio']))
widgets["habitatUploadSelector"].setChecked(_config["habitat_upload_enabled"])
widgets["userCallEntry"].setText(str(_config["habitat_call"]))
widgets["userLatEntry"].setText(str(_config["habitat_lat"]))
widgets["userLonEntry"].setText(str(_config["habitat_lon"]))
widgets["userAntennaEntry"].setText(str(_config["habitat_antenna"]))
widgets["userRadioEntry"].setText(str(_config["habitat_radio"]))
# Horus Settings
widgets['horusUploadSelector'].setChecked(_config['horus_udp_enabled'])
widgets['horusUDPEntry'].setText(str(_config['horus_udp_port']))
widgets["horusUploadSelector"].setChecked(_config["horus_udp_enabled"])
widgets["horusUDPEntry"].setText(str(_config["horus_udp_port"]))
# Try and set the audio device.
# If the audio device is not in the available list of devices, this will fail silently.
widgets['audioDeviceSelector'].setCurrentText(_config['audio_device'])
widgets["audioDeviceSelector"].setCurrentText(_config["audio_device"])
# Try and set the modem. If the modem is not valid, this will fail silently.
widgets['horusModemSelector'].setCurrentText(_config['modem'])
widgets["horusModemSelector"].setCurrentText(_config["modem"])
def save_config(widgets, filename="config.yml"):
@ -86,22 +82,22 @@ def save_config(widgets, filename="config.yml"):
global default_config
if widgets:
default_config['habitat_upload_enabled'] = widgets['habitatUploadSelector'].isChecked()
default_config['habitat_call'] = widgets['userCallEntry'].text()
default_config['habitat_lat'] = float(widgets['userLatEntry'].text())
default_config['habitat_lon'] = float(widgets['userLonEntry'].text())
default_config['habitat_antenna'] = widgets['userAntennaEntry'].text()
default_config['habitat_radio'] = widgets['userRadioEntry'].text()
default_config['horus_udp_enabled'] = widgets['horusUploadSelector'].isChecked()
default_config['horus_udp_port'] = int(widgets['horusUDPEntry'].text())
default_config['audio_device'] = widgets['audioDeviceSelector'].currentText()
default_config['modem'] = widgets['horusModemSelector'].currentText()
default_config["habitat_upload_enabled"] = widgets[
"habitatUploadSelector"
].isChecked()
default_config["habitat_call"] = widgets["userCallEntry"].text()
default_config["habitat_lat"] = float(widgets["userLatEntry"].text())
default_config["habitat_lon"] = float(widgets["userLonEntry"].text())
default_config["habitat_antenna"] = widgets["userAntennaEntry"].text()
default_config["habitat_radio"] = widgets["userRadioEntry"].text()
default_config["horus_udp_enabled"] = widgets["horusUploadSelector"].isChecked()
default_config["horus_udp_port"] = int(widgets["horusUDPEntry"].text())
default_config["audio_device"] = widgets["audioDeviceSelector"].currentText()
default_config["modem"] = widgets["horusModemSelector"].currentText()
# Write out to config file
init_config(filename)
if __name__ == "__main__":
read_config(None)

Wyświetl plik

@ -1,4 +1,4 @@
# FFT
# FFT
import logging
import time
import numpy as np
@ -6,6 +6,7 @@ import scipy.signal
from queue import Queue
from threading import Thread
class FFTProcess(object):
""" Process an incoming stream of samples, and calculate FFTs """
@ -15,8 +16,8 @@ class FFTProcess(object):
stride=4096,
fs=48000,
sample_width=2,
range=[100,4000],
callback=None
range=[100, 4000],
callback=None,
):
self.nfft = nfft
self.stride = stride
@ -40,54 +41,46 @@ class FFTProcess(object):
def init_window(self):
""" Initialise Window functions and FFT scales. """
self.window = scipy.signal.blackmanharris(self.nfft)
self.fft_scale = np.fft.fftshift(np.fft.fftfreq(self.nfft))*self.fs
self.mask = (self.fft_scale>self.range[0]) & (self.fft_scale<self.range[1])
self.fft_scale = np.fft.fftshift(np.fft.fftfreq(self.nfft)) * self.fs
self.mask = (self.fft_scale > self.range[0]) & (self.fft_scale < self.range[1])
def perform_fft(self):
""" Perform a FFT on the first NFFT samples in the sample buffer, then shift the buffer along """
# Convert raw data to floats.
raw_data = np.fromstring(
bytes(self.sample_buffer[ : self.nfft*self.sample_width]), dtype=np.int16
bytes(self.sample_buffer[: self.nfft * self.sample_width]), dtype=np.int16
)
raw_data = raw_data.astype(np.float64) / (2**15)
raw_data = raw_data.astype(np.float64) / (2 ** 15)
# Advance sample buffer
self.sample_buffer = self.sample_buffer[self.stride * self.sample_width :]
# Calculate FFT
_fft = 20*np.log10(np.abs(np.fft.fftshift(np.fft.fft(raw_data * self.window)))) - 20*np.log10(self.nfft)
_fft = 20 * np.log10(
np.abs(np.fft.fftshift(np.fft.fft(raw_data * self.window)))
) - 20 * np.log10(self.nfft)
if self.callback != None:
self.callback(
{
'fft': _fft[self.mask],
'scale': self.fft_scale[self.mask]
}
)
self.callback({"fft": _fft[self.mask], "scale": self.fft_scale[self.mask]})
def process_block(self, samples):
""" Add a block of samples to the input buffer. Calculate and process FFTs if the buffer is big enough """
self.sample_buffer.extend(samples)
while len(self.sample_buffer) > self.nfft*self.sample_width:
while len(self.sample_buffer) > self.nfft * self.sample_width:
self.perform_fft()
def processing_thread(self):
while self.processing_thread_running:
if self.input_queue.qsize()>0:
if self.input_queue.qsize() > 0:
data = self.input_queue.get()
self.process_block(data)
else:
time.sleep(0.01)
def add_samples(self, samples):
""" Add a block of samples to the input queue """
try:
@ -95,7 +88,6 @@ class FFTProcess(object):
except:
logging.error("Input overrun!")
def flush(self):
""" Clear the sample buffer """
self.sample_buffer = bytearray(b"")

Wyświetl plik

@ -31,9 +31,7 @@ from .config import *
from . import __version__
# Setup Logging
logging.basicConfig(
format="%(asctime)s %(levelname)s: %(message)s", level=logging.INFO
)
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=logging.INFO)
# Global widget store
widgets = {}
@ -67,17 +65,17 @@ win.setCentralWidget(area)
win.setWindowTitle("Horus Telemetry GUI")
# Create multiple dock areas, for displaying our data.
d0 = Dock("Audio", size=(300,50))
d0_modem = Dock("Modem", size=(300,80))
d0_habitat = Dock("Habitat", size=(300,200))
d0_other = Dock("Other", size=(300,100))
d1 = Dock("Spectrum", size=(800,400))
d2 = Dock("Modem Stats", size=(800,300))
d3 = Dock("Data",size=(800,50))
d4 = Dock("Log",size=(800,150))
d0 = Dock("Audio", size=(300, 50))
d0_modem = Dock("Modem", size=(300, 80))
d0_habitat = Dock("Habitat", size=(300, 200))
d0_other = Dock("Other", size=(300, 100))
d1 = Dock("Spectrum", size=(800, 400))
d2 = Dock("Modem Stats", size=(800, 300))
d3 = Dock("Data", size=(800, 50))
d4 = Dock("Log", size=(800, 150))
# Arrange docks.
area.addDock(d0)
area.addDock(d1, 'right', d0)
area.addDock(d1, "right", d0)
area.addDock(d0_modem, "bottom", d0)
area.addDock(d0_habitat, "bottom", d0_modem)
area.addDock(d0_other, "below", d0_habitat)
@ -90,152 +88,168 @@ d0_habitat.raiseDock()
# Controls
w1_audio = pg.LayoutWidget()
# TNC Connection
widgets['audioDeviceLabel'] = QtGui.QLabel("<b>Audio Device:</b>")
widgets['audioDeviceSelector'] = QtGui.QComboBox()
widgets["audioDeviceLabel"] = QtGui.QLabel("<b>Audio Device:</b>")
widgets["audioDeviceSelector"] = QtGui.QComboBox()
widgets['audioSampleRateLabel'] = QtGui.QLabel("<b>Sample Rate (Hz):</b>")
widgets['audioSampleRateSelector'] = QtGui.QComboBox()
widgets["audioSampleRateLabel"] = QtGui.QLabel("<b>Sample Rate (Hz):</b>")
widgets["audioSampleRateSelector"] = QtGui.QComboBox()
w1_audio.addWidget(widgets['audioDeviceLabel'], 0, 0, 1, 1)
w1_audio.addWidget(widgets['audioDeviceSelector'], 0, 1, 1, 1)
w1_audio.addWidget(widgets['audioSampleRateLabel'], 1, 0, 1, 1)
w1_audio.addWidget(widgets['audioSampleRateSelector'], 1, 1, 1, 1)
w1_audio.addWidget(widgets["audioDeviceLabel"], 0, 0, 1, 1)
w1_audio.addWidget(widgets["audioDeviceSelector"], 0, 1, 1, 1)
w1_audio.addWidget(widgets["audioSampleRateLabel"], 1, 0, 1, 1)
w1_audio.addWidget(widgets["audioSampleRateSelector"], 1, 1, 1, 1)
d0.addWidget(w1_audio)
w1_modem = pg.LayoutWidget()
# Modem Parameters
widgets['horusModemLabel'] = QtGui.QLabel("<b>Mode:</b>")
widgets['horusModemSelector'] = QtGui.QComboBox()
widgets["horusModemLabel"] = QtGui.QLabel("<b>Mode:</b>")
widgets["horusModemSelector"] = QtGui.QComboBox()
widgets['horusModemRateLabel'] = QtGui.QLabel("<b>Baudrate:</b>")
widgets['horusModemRateSelector'] = QtGui.QComboBox()
widgets["horusModemRateLabel"] = QtGui.QLabel("<b>Baudrate:</b>")
widgets["horusModemRateSelector"] = QtGui.QComboBox()
widgets['horusMaskEstimatorLabel'] = QtGui.QLabel("<b>Enable Mask Estim.:</b>")
widgets['horusMaskEstimatorSelector'] = QtGui.QCheckBox()
widgets["horusMaskEstimatorLabel"] = QtGui.QLabel("<b>Enable Mask Estim.:</b>")
widgets["horusMaskEstimatorSelector"] = QtGui.QCheckBox()
widgets['horusMaskSpacingLabel'] = QtGui.QLabel("<b>Tone Spacing (Hz):</b>")
widgets['horusMaskSpacingEntry'] = QtGui.QLineEdit("270")
widgets["horusMaskSpacingLabel"] = QtGui.QLabel("<b>Tone Spacing (Hz):</b>")
widgets["horusMaskSpacingEntry"] = QtGui.QLineEdit("270")
# Start/Stop
widgets['startDecodeButton'] = QtGui.QPushButton("Start")
widgets["startDecodeButton"] = QtGui.QPushButton("Start")
w1_modem.addWidget(widgets['horusModemLabel'], 0, 0, 1, 1)
w1_modem.addWidget(widgets['horusModemSelector'], 0, 1, 1, 1)
w1_modem.addWidget(widgets['horusModemRateLabel'], 1, 0, 1, 1)
w1_modem.addWidget(widgets['horusModemRateSelector'], 1, 1, 1, 1)
w1_modem.addWidget(widgets['horusMaskEstimatorLabel'], 2, 0, 1, 1)
w1_modem.addWidget(widgets['horusMaskEstimatorSelector'], 2, 1, 1, 1)
w1_modem.addWidget(widgets['horusMaskSpacingLabel'], 3, 0, 1, 1)
w1_modem.addWidget(widgets['horusMaskSpacingEntry'], 3, 1, 1, 1)
w1_modem.addWidget(widgets['startDecodeButton'], 4, 0, 2, 2)
w1_modem.addWidget(widgets["horusModemLabel"], 0, 0, 1, 1)
w1_modem.addWidget(widgets["horusModemSelector"], 0, 1, 1, 1)
w1_modem.addWidget(widgets["horusModemRateLabel"], 1, 0, 1, 1)
w1_modem.addWidget(widgets["horusModemRateSelector"], 1, 1, 1, 1)
w1_modem.addWidget(widgets["horusMaskEstimatorLabel"], 2, 0, 1, 1)
w1_modem.addWidget(widgets["horusMaskEstimatorSelector"], 2, 1, 1, 1)
w1_modem.addWidget(widgets["horusMaskSpacingLabel"], 3, 0, 1, 1)
w1_modem.addWidget(widgets["horusMaskSpacingEntry"], 3, 1, 1, 1)
w1_modem.addWidget(widgets["startDecodeButton"], 4, 0, 2, 2)
d0_modem.addWidget(w1_modem)
w1_habitat = pg.LayoutWidget()
# Listener Information
widgets['habitatHeading'] = QtGui.QLabel("<b>Habitat Settings</b>")
widgets['habitatUploadLabel'] = QtGui.QLabel("<b>Enable Habitat Upload:</b>")
widgets['habitatUploadSelector'] = QtGui.QCheckBox()
widgets['habitatUploadSelector'].setChecked(True)
widgets['userCallLabel'] = QtGui.QLabel("<b>Callsign:</b>")
widgets['userCallEntry'] = QtGui.QLineEdit("N0CALL")
widgets['userCallEntry'].setMaxLength(20)
widgets['userLocationLabel'] = QtGui.QLabel("<b>Lat/Lon:</b>")
widgets['userLatEntry'] = QtGui.QLineEdit("0.0")
widgets['userLonEntry'] = QtGui.QLineEdit("0.0")
widgets['userAntennaLabel'] = QtGui.QLabel("<b>Antenna:</b>")
widgets['userAntennaEntry'] = QtGui.QLineEdit("")
widgets['userRadioLabel'] = QtGui.QLabel("<b>Radio:</b>")
widgets['userRadioEntry'] = QtGui.QLineEdit("Horus-GUI " + __version__)
widgets["habitatHeading"] = QtGui.QLabel("<b>Habitat Settings</b>")
widgets["habitatUploadLabel"] = QtGui.QLabel("<b>Enable Habitat Upload:</b>")
widgets["habitatUploadSelector"] = QtGui.QCheckBox()
widgets["habitatUploadSelector"].setChecked(True)
widgets["userCallLabel"] = QtGui.QLabel("<b>Callsign:</b>")
widgets["userCallEntry"] = QtGui.QLineEdit("N0CALL")
widgets["userCallEntry"].setMaxLength(20)
widgets["userLocationLabel"] = QtGui.QLabel("<b>Lat/Lon:</b>")
widgets["userLatEntry"] = QtGui.QLineEdit("0.0")
widgets["userLonEntry"] = QtGui.QLineEdit("0.0")
widgets["userAntennaLabel"] = QtGui.QLabel("<b>Antenna:</b>")
widgets["userAntennaEntry"] = QtGui.QLineEdit("")
widgets["userRadioLabel"] = QtGui.QLabel("<b>Radio:</b>")
widgets["userRadioEntry"] = QtGui.QLineEdit("Horus-GUI " + __version__)
w1_habitat.addWidget(widgets['habitatUploadLabel'],0, 0, 1, 1)
w1_habitat.addWidget(widgets['habitatUploadSelector'],0, 1, 1, 1)
w1_habitat.addWidget(widgets['userCallLabel'], 1, 0, 1, 1)
w1_habitat.addWidget(widgets['userCallEntry'], 1, 1, 1, 2)
w1_habitat.addWidget(widgets['userLocationLabel'], 2, 0, 1, 1)
w1_habitat.addWidget(widgets['userLatEntry'], 2, 1, 1, 1)
w1_habitat.addWidget(widgets['userLonEntry'], 2, 2, 1, 1)
w1_habitat.addWidget(widgets['userAntennaLabel'], 3, 0, 1, 1)
w1_habitat.addWidget(widgets['userAntennaEntry'], 3, 1, 1, 2)
w1_habitat.addWidget(widgets['userRadioLabel'], 4, 0, 1, 1)
w1_habitat.addWidget(widgets['userRadioEntry'], 4, 1, 1, 2)
w1_habitat.addWidget(widgets["habitatUploadLabel"], 0, 0, 1, 1)
w1_habitat.addWidget(widgets["habitatUploadSelector"], 0, 1, 1, 1)
w1_habitat.addWidget(widgets["userCallLabel"], 1, 0, 1, 1)
w1_habitat.addWidget(widgets["userCallEntry"], 1, 1, 1, 2)
w1_habitat.addWidget(widgets["userLocationLabel"], 2, 0, 1, 1)
w1_habitat.addWidget(widgets["userLatEntry"], 2, 1, 1, 1)
w1_habitat.addWidget(widgets["userLonEntry"], 2, 2, 1, 1)
w1_habitat.addWidget(widgets["userAntennaLabel"], 3, 0, 1, 1)
w1_habitat.addWidget(widgets["userAntennaEntry"], 3, 1, 1, 2)
w1_habitat.addWidget(widgets["userRadioLabel"], 4, 0, 1, 1)
w1_habitat.addWidget(widgets["userRadioEntry"], 4, 1, 1, 2)
d0_habitat.addWidget(w1_habitat)
w1_other = pg.LayoutWidget()
widgets['horusUploadLabel'] = QtGui.QLabel("<b>Enable Horus UDP Output:</b>")
widgets['horusUploadSelector'] = QtGui.QCheckBox()
widgets['horusUploadSelector'].setChecked(True)
widgets['horusUDPLabel'] = QtGui.QLabel("<b>Horus UDP Port:</b>")
widgets['horusUDPEntry'] = QtGui.QLineEdit("55672")
widgets['horusUDPEntry'].setMaxLength(5)
widgets["horusUploadLabel"] = QtGui.QLabel("<b>Enable Horus UDP Output:</b>")
widgets["horusUploadSelector"] = QtGui.QCheckBox()
widgets["horusUploadSelector"].setChecked(True)
widgets["horusUDPLabel"] = QtGui.QLabel("<b>Horus UDP Port:</b>")
widgets["horusUDPEntry"] = QtGui.QLineEdit("55672")
widgets["horusUDPEntry"].setMaxLength(5)
w1_other.addWidget(widgets['horusUploadLabel'], 0, 0, 1, 1)
w1_other.addWidget(widgets['horusUploadSelector'], 0, 1, 1, 1)
w1_other.addWidget(widgets['horusUDPLabel'], 1, 0, 1, 1)
w1_other.addWidget(widgets['horusUDPEntry'], 1, 1, 1, 1)
w1_other.addWidget(widgets["horusUploadLabel"], 0, 0, 1, 1)
w1_other.addWidget(widgets["horusUploadSelector"], 0, 1, 1, 1)
w1_other.addWidget(widgets["horusUDPLabel"], 1, 0, 1, 1)
w1_other.addWidget(widgets["horusUDPEntry"], 1, 1, 1, 1)
d0_other.addWidget(w1_other)
# Spectrum Display
widgets['spectrumPlot'] = pg.PlotWidget(title="Spectra")
widgets['spectrumPlot'].setLabel("left", "Power (dB)")
widgets['spectrumPlot'].setLabel("bottom", "Frequency (Hz)")
widgets['spectrumPlotData']= widgets['spectrumPlot'].plot([0])
widgets["spectrumPlot"] = pg.PlotWidget(title="Spectra")
widgets["spectrumPlot"].setLabel("left", "Power (dB)")
widgets["spectrumPlot"].setLabel("bottom", "Frequency (Hz)")
widgets["spectrumPlotData"] = widgets["spectrumPlot"].plot([0])
widgets['spectrumPlot'].setLabel('left', "Power (dBFs)")
widgets['spectrumPlot'].setLabel('bottom', "Frequency", units="Hz")
widgets['spectrumPlot'].setXRange(100,4000)
widgets['spectrumPlot'].setYRange(-100,-20)
widgets['spectrumPlot'].setLimits(xMin=0, xMax=4000, yMin=-120, yMax=0)
widgets["spectrumPlot"].setLabel("left", "Power (dBFs)")
widgets["spectrumPlot"].setLabel("bottom", "Frequency", units="Hz")
widgets["spectrumPlot"].setXRange(100, 4000)
widgets["spectrumPlot"].setYRange(-100, -20)
widgets["spectrumPlot"].setLimits(xMin=0, xMax=4000, yMin=-120, yMax=0)
# Frequency Estiator Outputs
widgets['estimatorLines'] = [
pg.InfiniteLine(pos=-1000,pen=pg.mkPen(color='w', width=2, style=QtCore.Qt.DashLine), label='F1'),
pg.InfiniteLine(pos=-1000,pen=pg.mkPen(color='w', width=2, style=QtCore.Qt.DashLine), label='F2'),
pg.InfiniteLine(pos=-1000,pen=pg.mkPen(color='w', width=2, style=QtCore.Qt.DashLine), label='F3'),
pg.InfiniteLine(pos=-1000,pen=pg.mkPen(color='w', width=2, style=QtCore.Qt.DashLine), label='F4')
widgets["estimatorLines"] = [
pg.InfiniteLine(
pos=-1000,
pen=pg.mkPen(color="w", width=2, style=QtCore.Qt.DashLine),
label="F1",
),
pg.InfiniteLine(
pos=-1000,
pen=pg.mkPen(color="w", width=2, style=QtCore.Qt.DashLine),
label="F2",
),
pg.InfiniteLine(
pos=-1000,
pen=pg.mkPen(color="w", width=2, style=QtCore.Qt.DashLine),
label="F3",
),
pg.InfiniteLine(
pos=-1000,
pen=pg.mkPen(color="w", width=2, style=QtCore.Qt.DashLine),
label="F4",
),
]
for _line in widgets['estimatorLines']:
widgets['spectrumPlot'].addItem(_line)
for _line in widgets["estimatorLines"]:
widgets["spectrumPlot"].addItem(_line)
d1.addWidget(widgets['spectrumPlot'])
d1.addWidget(widgets["spectrumPlot"])
widgets['spectrumPlotRange'] = [-100, -20]
widgets["spectrumPlotRange"] = [-100, -20]
# Waterfall - TBD
w3 = pg.LayoutWidget()
widgets['snrPlot'] = pg.PlotWidget(title="SNR")
widgets['snrPlot'].setLabel("left", "SNR (dB)")
widgets['snrPlot'].setLabel("bottom", "Time (s)")
widgets['snrPlot'].setXRange(-60,0)
widgets['snrPlot'].setYRange(-10,30)
widgets['snrPlot'].setLimits(xMin=0, xMax=60, yMin=-100, yMax=40)
widgets['snrPlotRange'] = [-10, 30]
widgets["snrPlot"] = pg.PlotWidget(title="SNR")
widgets["snrPlot"].setLabel("left", "SNR (dB)")
widgets["snrPlot"].setLabel("bottom", "Time (s)")
widgets["snrPlot"].setXRange(-60, 0)
widgets["snrPlot"].setYRange(-10, 30)
widgets["snrPlot"].setLimits(xMin=0, xMax=60, yMin=-100, yMax=40)
widgets["snrPlotRange"] = [-10, 30]
widgets['eyeDiagramPlot'] = pg.PlotWidget(title="Eye Diagram")
widgets["eyeDiagramPlot"] = pg.PlotWidget(title="Eye Diagram")
w3.addWidget(widgets['snrPlot'],0,0)
w3.addWidget(widgets['eyeDiagramPlot'],0,1)
w3.addWidget(widgets["snrPlot"], 0, 0)
w3.addWidget(widgets["eyeDiagramPlot"], 0, 1)
d2.addWidget(w3)
# Telemetry Data
w4 = pg.LayoutWidget()
widgets['latestSentenceLabel'] = QtGui.QLabel("<b>Latest Sentence:</b>")
widgets['latestSentenceData'] = QtGui.QLabel("NO DATA")
widgets['latestSentenceData'].setFont(QtGui.QFont("Courier New", 18, QtGui.QFont.Bold))
w4.addWidget(widgets['latestSentenceLabel'], 0, 0, 1, 1)
w4.addWidget(widgets['latestSentenceData'], 0, 1, 1, 6)
widgets["latestSentenceLabel"] = QtGui.QLabel("<b>Latest Sentence:</b>")
widgets["latestSentenceData"] = QtGui.QLabel("NO DATA")
widgets["latestSentenceData"].setFont(QtGui.QFont("Courier New", 18, QtGui.QFont.Bold))
w4.addWidget(widgets["latestSentenceLabel"], 0, 0, 1, 1)
w4.addWidget(widgets["latestSentenceData"], 0, 1, 1, 6)
d3.addWidget(w4)
w5 = pg.LayoutWidget()
widgets['console'] = QtWidgets.QPlainTextEdit()
widgets['console'].setReadOnly(True)
w5.addWidget(widgets['console'])
widgets["console"] = QtWidgets.QPlainTextEdit()
widgets["console"].setReadOnly(True)
w5.addWidget(widgets["console"])
d4.addWidget(w5)
# Resize window to final resolution, and display.
@ -243,52 +257,56 @@ logging.info("Starting GUI.")
win.resize(1500, 800)
win.show()
# Audio Initialization
# Audio Initialization
audio_devices = init_audio(widgets)
def update_audio_sample_rates():
""" Update the sample-rate dropdown when a different audio device is selected. """
global widgets
# Pass widgets straight on to function from .audio
populate_sample_rates(widgets)
widgets['audioDeviceSelector'].currentIndexChanged.connect(update_audio_sample_rates)
widgets["audioDeviceSelector"].currentIndexChanged.connect(update_audio_sample_rates)
# Initialize modem list.
init_horus_modem(widgets)
def update_modem_settings():
""" Update the modem setting widgets when a different modem is selected """
global widgets
populate_modem_settings(widgets)
widgets['horusModemSelector'].currentIndexChanged.connect(update_modem_settings)
widgets["horusModemSelector"].currentIndexChanged.connect(update_modem_settings)
# Read in configuration file settings
read_config(widgets)
def handle_fft_update(data):
""" Handle a new FFT update """
global widgets
_scale = data['scale']
_data = data['fft']
_scale = data["scale"]
_data = data["fft"]
widgets['spectrumPlotData'].setData(_scale, _data)
widgets["spectrumPlotData"].setData(_scale, _data)
# Really basic IIR to smoothly adjust scale
_old_max = widgets['spectrumPlotRange'][1]
_old_max = widgets["spectrumPlotRange"][1]
_tc = 0.1
_new_max = float((_old_max*(1-_tc)) + (np.max(_data)*_tc))
_new_max = float((_old_max * (1 - _tc)) + (np.max(_data) * _tc))
# Store new max
widgets['spectrumPlotRange'][1] = _new_max
widgets['spectrumPlot'].setYRange(widgets['spectrumPlotRange'][0], min(0,_new_max)+20)
widgets["spectrumPlotRange"][1] = _new_max
widgets["spectrumPlot"].setYRange(
widgets["spectrumPlotRange"][0], min(0, _new_max) + 20
)
def add_fft_update(data):
@ -305,19 +323,15 @@ def start_decoding():
if not running:
# Grab settings off widgets
_dev_name = widgets['audioDeviceSelector'].currentText()
_sample_rate = int(widgets['audioSampleRateSelector'].currentText())
_dev_index = audio_devices[_dev_name]['index']
_dev_name = widgets["audioDeviceSelector"].currentText()
_sample_rate = int(widgets["audioSampleRateSelector"].currentText())
_dev_index = audio_devices[_dev_name]["index"]
# TODO: Grab horus data here.
# Init FFT Processor
fft_process = FFTProcess(
nfft=8192,
stride=4096,
fs=_sample_rate,
callback=add_fft_update
nfft=8192, stride=4096, fs=_sample_rate, callback=add_fft_update
)
# TODO: Setup modem here
@ -325,13 +339,13 @@ def start_decoding():
# Setup Audio
audio_stream = AudioStream(
_dev_index,
fs = _sample_rate,
fs=_sample_rate,
block_size=fft_process.stride,
fft_input = fft_process.add_samples,
modem=None
fft_input=fft_process.add_samples,
modem=None,
)
widgets['startDecodeButton'].setText('Stop')
widgets["startDecodeButton"].setText("Stop")
running = True
logging.info("Started Audio Processing.")
@ -341,7 +355,7 @@ def start_decoding():
audio_stream.stop()
except Exception as e:
logging.exception("Could not stop audio stream.", exc_info=e)
try:
fft_process.stop()
except Exception as e:
@ -350,14 +364,13 @@ def start_decoding():
fft_update_queue = Queue(256)
status_update_queue = Queue(256)
widgets['startDecodeButton'].setText('Start')
widgets["startDecodeButton"].setText("Start")
running = False
logging.info("Stopped Audio Processing.")
widgets['startDecodeButton'].clicked.connect(start_decoding)
widgets["startDecodeButton"].clicked.connect(start_decoding)
# GUI Update Loop
@ -382,6 +395,7 @@ gui_update_timer.start(100)
class ConsoleHandler(logging.Handler):
""" Logging handler to write to the GUI console """
def __init__(self, consolewidget):
logging.Handler.__init__(self)
self.consolewidget = consolewidget
@ -391,8 +405,9 @@ class ConsoleHandler(logging.Handler):
_text = f"{record.levelname} {_time.strftime('%H:%M:%S')}:\t{record.msg}"
self.consolewidget.appendPlainText(_text)
# Add console handler to top level logger.
console_handler = ConsoleHandler(widgets['console'])
console_handler = ConsoleHandler(widgets["console"])
logging.getLogger().addHandler(console_handler)
@ -405,12 +420,12 @@ def main():
if (sys.flags.interactive != 1) or not hasattr(QtCore, "PYQT_VERSION"):
QtGui.QApplication.instance().exec_()
save_config(widgets)
try:
audio_stream.stop()
except Exception as e:
pass
try:
fft_process.stop()
except Exception as e:
@ -419,5 +434,3 @@ def main():
if __name__ == "__main__":
main()

Wyświetl plik

@ -4,38 +4,39 @@ import logging
# Modem paramers and defaults
HORUS_MODEM_LIST = {
'Horus Binary v1 (Legacy)': {
'id': 0,
'baud_rates': [50, 100, 300],
'default_baud_rate': 100,
'default_tone_spacing': 270,
'use_mask_estimator': False
"Horus Binary v1 (Legacy)": {
"id": 0,
"baud_rates": [50, 100, 300],
"default_baud_rate": 100,
"default_tone_spacing": 270,
"use_mask_estimator": False,
},
"RTTY (7N2)": {
"id": 99,
"baud_rates": [50, 100, 300, 600, 1000],
"default_baud_rate": 100,
"default_tone_spacing": 425,
"use_mask_estimator": False,
},
'RTTY (7N2)': {
'id': 99,
'baud_rates': [50, 100, 300, 600, 1000],
'default_baud_rate': 100,
'default_tone_spacing': 425,
'use_mask_estimator': False
}
}
DEFAULT_MODEM = 'Horus Binary v1 (Legacy)'
DEFAULT_MODEM = "Horus Binary v1 (Legacy)"
horusModem = None
def init_horus_modem(widgets):
""" Initialise the modem drop-down lists """
# Clear modem list.
widgets['horusModemSelector'].clear()
widgets["horusModemSelector"].clear()
# Add items from modem list
for _modem in HORUS_MODEM_LIST:
widgets['horusModemSelector'].addItem(_modem)
widgets["horusModemSelector"].addItem(_modem)
# Select default modem
widgets['horusModemSelector'].setCurrentText(DEFAULT_MODEM)
widgets["horusModemSelector"].setCurrentText(DEFAULT_MODEM)
populate_modem_settings(widgets)
@ -43,21 +44,26 @@ def init_horus_modem(widgets):
def populate_modem_settings(widgets):
""" Populate the modem settings for the current selected modem """
_current_modem = widgets['horusModemSelector'].currentText()
_current_modem = widgets["horusModemSelector"].currentText()
# Clear baud rate dropdown.
widgets['horusModemRateSelector'].clear()
widgets["horusModemRateSelector"].clear()
# Populate
for _rate in HORUS_MODEM_LIST[_current_modem]['baud_rates']:
widgets['horusModemRateSelector'].addItem(str(_rate))
for _rate in HORUS_MODEM_LIST[_current_modem]["baud_rates"]:
widgets["horusModemRateSelector"].addItem(str(_rate))
# Select default rate.
widgets['horusModemRateSelector'].setCurrentText(str(HORUS_MODEM_LIST[_current_modem]['default_baud_rate']))
widgets["horusModemRateSelector"].setCurrentText(
str(HORUS_MODEM_LIST[_current_modem]["default_baud_rate"])
)
# Set Mask Estimator checkbox.
widgets['horusMaskEstimatorSelector'].setChecked(HORUS_MODEM_LIST[_current_modem]['use_mask_estimator'])
widgets["horusMaskEstimatorSelector"].setChecked(
HORUS_MODEM_LIST[_current_modem]["use_mask_estimator"]
)
# Set Tone Spacing Input Box
widgets['horusMaskSpacingEntry'].setText(str(HORUS_MODEM_LIST[_current_modem]['default_tone_spacing']))
widgets["horusMaskSpacingEntry"].setText(
str(HORUS_MODEM_LIST[_current_modem]["default_tone_spacing"])
)

Wyświetl plik

@ -7,4 +7,3 @@ class QHLine(QtGui.QFrame):
super(QHLine, self).__init__()
self.setFrameShape(QtGui.QFrame.HLine)
self.setFrameShadow(QtGui.QFrame.Sunken)