pull/13/head
Mark Jessop 2020-06-22 21:06:55 +09:30
rodzic a1c4e9eb5e
commit c451855fa4
10 zmienionych plików z 767 dodań i 2 usunięć

56
Makefile 100755
Wyświetl plik

@ -0,0 +1,56 @@
# This makefile has been created to help developers perform common actions.
# Most actions assume it is operating in a virtual environment where the
# python command links to the appropriate virtual environment Python.
# Do not remove this block. It is used by the 'help' rule when
# constructing the help output.
# help:
# help: horusgui Makefile help
# help:
# help: help - display this makefile's help information
.PHONY: help
help:
@grep "^# help\:" Makefile | grep -v grep | sed 's/\# help\: //' | sed 's/\# help\://'
# help: style.check - perform code format compliance check
.PHONY: style.check
style.check:
@black src/horusgui apps setup.py --check
# help: style - perform code format compliance changes
.PHONY: style
style:
@black src/horusgui apps setup.py
# help: test - run tests
.PHONY: test
test:
@python -m unittest discover -s tests
# help: test-verbose - run tests [verbosely]
.PHONY: test-verbose
test-verbose:
@python -m unittest discover -s tests -v
# help: dist - create a wheel distribution package
.PHONY: dist
dist:
@python setup.py bdist_wheel
# help: dist-upload - upload a wheel distribution package
.PHONY: dist-upload
dist-upload: dist
@twine upload dist/spew-*-py3-none-any.whl
# Keep these lines at the end of the file to retain nice help
# output formatting.
# help:

37
README.md 100644 → 100755
Wyświetl plik

@ -1,2 +1,35 @@
# horus-gui
Project Horus Telemetry Decoder
# Project Horus Telemetry Decododer
Written by Mark Jessop <vk5qi@rfhead.net>
## Usage
### Dependencies
* [horuslib](https://github.com/projecthorus/horuslib) built, and libhorus.so available either on the system path, or in this directory.
### Create a Virtual Environment
Create a virtual environment and install dependencies.
```console
$ python3 -m venv venv
$ source venv/bin/activate
(venv) $ pip install pip -U (Optional - this updates pip)
(venv) $ pip install -r requirements.txt
```
### Install Package
Install package in a editable state. This type of installation allows a
developer to make changes to the source code while retaining the installation
entry points so it can be used like a normal install.
```console
(venv) $ pip install -e .
```
### Run
`$ python -m horusgui.gui`

7
requirements.txt 100644
Wyświetl plik

@ -0,0 +1,7 @@
numpy
scipy
pyaudio
requests
crcmod
PyQt5
pyqtgraph

50
setup.py 100755
Wyświetl plik

@ -0,0 +1,50 @@
import os
import re
from setuptools import setup, find_packages
regexp = re.compile(r".*__version__ = [\'\"](.*?)[\'\"]", re.S)
init_file = os.path.join(os.path.dirname(__file__), "src", "horusgui", "__init__.py")
with open(init_file, "r") as f:
module_content = f.read()
match = regexp.match(module_content)
if match:
version = match.group(1)
else:
raise RuntimeError(f"Cannot find __version__ in {init_file}")
with open("README.md", "r") as f:
readme = f.read()
with open("requirements.txt", "r") as f:
requirements = []
for line in f.read().split("\n"):
line = line.strip()
if line and not line.startswith("#"):
requirements.append(line)
if __name__ == "__main__":
setup(
name="horusgui",
description="Project Horus Telemetry Decoer",
long_description=readme,
version=version,
install_requires=requirements,
keywords=["horus telemetry radio"],
package_dir={"": "src"},
packages=find_packages("src"),
classifiers=[
"Intended Audience :: Developers",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
]
# TODO: Deal with entry points.
# entry_points={
# "console_scripts": [
# "hfssdv=hfssdv.gui.main",
# ]
# },
)

Wyświetl plik

@ -0,0 +1,2 @@
__version__ = "20.6.21"

Wyświetl plik

@ -0,0 +1,121 @@
# Audio Interfacing
import logging
import pyaudio
# Global PyAudio object
pyAudio = None
audioStream = None
audioDevices = {}
def init_audio(widgets):
""" Initialise pyaudio object, and populate list of sound card in GUI """
global pyAudio, audioDevices
# Init PyAudio
pyAudio = pyaudio.PyAudio()
audioDevices = {}
# Clear list
widgets['audioDeviceSelector'].clear()
# Iterate through PyAudio devices
for x in range(0, pyAudio.get_device_count()):
_dev = pyAudio.get_device_info_by_index(x)
# Does the device have inputs?
if _dev['maxInputChannels'] > 0:
# Get the name
_name = _dev['name']
# Add to local store of device info
audioDevices[_name] = _dev
# Add to audio device selection list.
widgets['audioDeviceSelector'].addItem(_name)
# Select first item.
if len(list(audioDevices.keys())) > 0:
widgets['audioDeviceSelector'].setCurrentIndex(0)
# Initial population of sample rates.
populate_sample_rates(widgets)
return audioDevices
def populate_sample_rates(widgets):
""" Populate the sample rate ComboBox with the sample rates of the currently selected audio device """
global audioDevices
# Clear list of sample rates.
widgets['audioSampleRateSelector'].clear()
# Get information on current audio device
_dev_name = widgets['audioDeviceSelector'].currentText()
if _dev_name in audioDevices:
# TODO: Determine valid samples rates. For now, just use the default.
_samp_rate = int(audioDevices[_dev_name]['defaultSampleRate'])
widgets['audioSampleRateSelector'].addItem(str(_samp_rate))
widgets['audioSampleRateSelector'].setCurrentIndex(0)
else:
logging.error("Audio - Unknown Audio Device")
class AudioStream(object):
""" Start up a pyAudio input stream, and pass data around to different callbacks """
def __init__(
self,
audio_device,
fs,
block_size = 8192,
fft_input = None,
modem = None
):
self.audio_device = audio_device
self.fs = fs
self.block_size = block_size
self.fft_input = fft_input
self.modem = modem
# Start audio stream
self.audio = pyaudio.PyAudio()
self.stream = self.audio.open(
format=pyaudio.paInt16,
channels=1,
rate=self.fs,
frames_per_buffer=self.block_size,
input=True,
input_device_index=self.audio_device,
output=False,
stream_callback=self.handle_samples
)
def handle_samples(self, data, frame_count, time_info="", status_flags=""):
""" Handle incoming samples from pyaudio """
# Pass samples directly into fft.
if self.fft_input:
self.fft_input(data)
# TODO: Handle modem sample input.
return (None, pyaudio.paContinue)
def stop(self):
""" Halt stream """
self.stream.close()

107
src/horusgui/fft.py 100644
Wyświetl plik

@ -0,0 +1,107 @@
# FFT
import logging
import time
import numpy as np
import scipy.signal
from queue import Queue
from threading import Thread
class FFTProcess(object):
""" Process an incoming stream of samples, and calculate FFTs """
def __init__(
self,
nfft=8192,
stride=4096,
fs=48000,
sample_width=2,
range=[100,4000],
callback=None
):
self.nfft = nfft
self.stride = stride
self.fs = fs
self.sample_width = sample_width
self.range = range
self.callback = callback
self.sample_buffer = bytearray(b"")
self.input_queue = Queue(512)
self.init_window()
self.processing_thread_running = True
self.t = Thread(target=self.processing_thread)
self.t.start()
def init_window(self):
""" Initialise Window functions and FFT scales. """
self.window = scipy.signal.blackmanharris(self.nfft)
self.fft_scale = np.fft.fftshift(np.fft.fftfreq(self.nfft))*self.fs
self.mask = (self.fft_scale>self.range[0]) & (self.fft_scale<self.range[1])
def perform_fft(self):
""" Perform a FFT on the first NFFT samples in the sample buffer, then shift the buffer along """
# Convert raw data to floats.
raw_data = np.fromstring(
bytes(self.sample_buffer[ : self.nfft*self.sample_width]), dtype=np.int16
)
raw_data = raw_data.astype(np.float64) / (2**15)
# Advance sample buffer
self.sample_buffer = self.sample_buffer[self.stride * self.sample_width :]
# Calculate FFT
_fft = 20*np.log10(np.abs(np.fft.fftshift(np.fft.fft(raw_data * self.window)))) - 20*np.log10(self.nfft)
logging.debug("Performed FFT.")
if self.callback != None:
self.callback(
{
'fft': _fft[self.mask],
'scale': self.fft_scale[self.mask]
}
)
def process_block(self, samples):
""" Add a block of samples to the input buffer. Calculate and process FFTs if the buffer is big enough """
self.sample_buffer.extend(samples)
while len(self.sample_buffer) > self.nfft*self.sample_width:
self.perform_fft()
def processing_thread(self):
while self.processing_thread_running:
if self.input_queue.qsize()>0:
data = self.input_queue.get()
self.process_block(data)
else:
time.sleep(0.01)
def add_samples(self, samples):
""" Add a block of samples to the input queue """
try:
self.input_queue.put_nowait(samples)
except:
logging.error("Input overrun!")
def flush(self):
""" Clear the sample buffer """
self.sample_buffer = bytearray(b"")
def stop(self):
""" Halt processing """
self.processing_thread_running = False

316
src/horusgui/gui.py 100644
Wyświetl plik

@ -0,0 +1,316 @@
#!/usr/bin/env python
#
# Horus Telemetry GUI
#
# Mark Jessop <vk5qi@rfhead.net>
#
# Python 3 check
import sys
if sys.version_info < (3, 0):
print("This script requires Python 3!")
sys.exit(1)
import glob
import logging
import pyqtgraph as pg
import numpy as np
from queue import Queue
from pyqtgraph.Qt import QtCore, QtGui, QtWidgets
from pyqtgraph.dockarea import *
from threading import Thread
from .widgets import *
from .audio import *
from .fft import *
from .modem import *
# Setup Logging
logging.basicConfig(
format="%(asctime)s %(levelname)s: %(message)s", level=logging.INFO
)
# Defaults
DEFAULT_CALLSIGN = 'N0CALL'
# Global widget store
widgets = {}
# Queues for handling updates to image / status indications.
fft_update_queue = Queue(256)
status_update_queue = Queue(256)
# List of audio devices and their info
audio_devices = {}
# Processor objects
audio_stream = None
fft_process = None
horus_modem = None
# Global running indicator
running = False
#
# GUI Creation - The Bad way.
#
# Create a Qt App.
pg.mkQApp()
# GUI LAYOUT - Gtk Style!
win = QtGui.QMainWindow()
area = DockArea()
win.setCentralWidget(area)
win.setWindowTitle("Horus Telemetry GUI")
# Create multiple dock areas, for displaying our data.
d0 = Dock("Controls", size=(300,800))
d1 = Dock("Spectrum", size=(800,500))
d2 = Dock("Waterfall", size=(800,200))
d3 = Dock("Telemetry",size=(800,200))
area.addDock(d0, "left")
area.addDock(d1, "right", d0)
area.addDock(d2, "bottom", d1)
area.addDock(d3, "bottom", d2)
# Controls
w1 = pg.LayoutWidget()
# TNC Connection
widgets['audioDeviceLabel'] = QtGui.QLabel("<b>Audio Device:</b>")
widgets['audioDeviceSelector'] = QtGui.QComboBox()
widgets['audioSampleRateLabel'] = QtGui.QLabel("<b>Sample Rate (Hz):</b>")
widgets['audioSampleRateSelector'] = QtGui.QComboBox()
# Modem Parameters
widgets['horusModemLabel'] = QtGui.QLabel("<b>Mode:</b>")
widgets['horusModemSelector'] = QtGui.QComboBox()
widgets['horusModemRateLabel'] = QtGui.QLabel("<b>Baudrate:</b>")
widgets['horusModemRateSelector'] = QtGui.QComboBox()
widgets['horusMaskEstimatorLabel'] = QtGui.QLabel("<b>Enable Mask Estim.:</b>")
widgets['horusMaskEstimatorSelector'] = QtGui.QCheckBox()
widgets['horusMaskSpacingLabel'] = QtGui.QLabel("<b>Tone Spacing (Hz):</b>")
widgets['horusMaskSpacingEntry'] = QtGui.QLineEdit("270")
# Start/Stop
widgets['startDecodeButton'] = QtGui.QPushButton("Start")
# Listener Information
widgets['userCallLabel'] = QtGui.QLabel("<b>Callsign:</b>")
widgets['userCallEntry'] = QtGui.QLineEdit(DEFAULT_CALLSIGN)
widgets['userCallEntry'].setMaxLength(20)
# Layout the Control pane.
# Yes this is horrible. Don't @ me.
w1.addWidget(widgets['audioDeviceLabel'], 0, 0, 1, 1)
w1.addWidget(widgets['audioDeviceSelector'], 0, 1, 1, 1)
w1.addWidget(widgets['audioSampleRateLabel'], 1, 0, 1, 1)
w1.addWidget(widgets['audioSampleRateSelector'], 1, 1, 1, 1)
w1.addWidget(QHLine(), 2, 0, 1, 2)
w1.addWidget(widgets['horusModemLabel'], 3, 0, 1, 1)
w1.addWidget(widgets['horusModemSelector'], 3, 1, 1, 1)
w1.addWidget(widgets['horusModemRateLabel'], 4, 0, 1, 1)
w1.addWidget(widgets['horusModemRateSelector'], 4, 1, 1, 1)
w1.addWidget(widgets['horusMaskEstimatorLabel'], 5, 0, 1, 1)
w1.addWidget(widgets['horusMaskEstimatorSelector'], 5, 1, 1, 1)
w1.addWidget(widgets['horusMaskSpacingLabel'], 6, 0, 1, 1)
w1.addWidget(widgets['horusMaskSpacingEntry'], 6, 1, 1, 1)
w1.addWidget(QHLine(), 7, 0, 1, 2)
w1.addWidget(widgets['startDecodeButton'], 8, 0, 1, 2)
w1.addWidget(QHLine(), 9, 0, 1, 2)
w1.addWidget(widgets['userCallLabel'], 10, 0, 1, 1)
w1.addWidget(widgets['userCallEntry'], 10, 1, 1, 1)
w1.layout.setSpacing(1)
d0.addWidget(w1)
# Spectrum Display
widgets['spectrumPlot'] = pg.PlotWidget(title="Spectra")
widgets['spectrumPlot'].setLabel("left", "Power (dB)")
widgets['spectrumPlot'].setLabel("bottom", "Frequency (Hz)")
widgets['spectrumPlotData']= widgets['spectrumPlot'].plot([0])
widgets['spectrumPlot'].setLabel('left', "Power (dBFs)")
widgets['spectrumPlot'].setLabel('bottom', "Frequency", units="Hz")
widgets['spectrumPlot'].setXRange(100,4000)
widgets['spectrumPlot'].setYRange(-110,-20)
widgets['spectrumPlot'].setLimits(xMin=0, xMax=4000, yMin=-120, yMax=0)
d1.addWidget(widgets['spectrumPlot'])
widgets['spectrumPlotRange'] = [-110, -20]
# Waterfall - TBD
w3 = pg.LayoutWidget()
d2.addWidget(w3)
# Telemetry Data
w4 = pg.LayoutWidget()
rxImageStatus = QtGui.QLabel("No Data Yet.")
w4.addWidget(rxImageStatus, 0, 0, 1, 1)
d3.addWidget(w4)
# Resize window to final resolution, and display.
logging.info("Starting GUI.")
win.resize(1500, 800)
win.show()
# Audio Initialization
audio_devices = init_audio(widgets)
def update_audio_sample_rates():
""" Update the sample-rate dropdown when a different audio device is selected. """
global widgets
# Pass widgets straight on to function from .audio
populate_sample_rates(widgets)
widgets['audioDeviceSelector'].currentIndexChanged.connect(update_audio_sample_rates)
# Initialize modem list.
init_horus_modem(widgets)
def update_modem_settings():
""" Update the modem setting widgets when a different modem is selected """
global widgets
populate_modem_settings(widgets)
widgets['horusModemSelector'].currentIndexChanged.connect(update_modem_settings)
def handle_fft_update(data):
""" Handle a new FFT update """
global widgets
_scale = data['scale']
_data = data['fft']
widgets['spectrumPlotData'].setData(_scale, _data)
# Really basic IIR to smoothly adjust scale
_old_max = widgets['spectrumPlotRange'][1]
_tc = 0.1
_new_max = float((_old_max*(1-_tc)) + (np.max(_data)*_tc))
# Store new max
widgets['spectrumPlotRange'][1] = _new_max
widgets['spectrumPlot'].setYRange(-110, min(0,_new_max)+20)
def add_fft_update(data):
""" Try and insert a new set of FFT data into the update queue """
global fft_update_queue
try:
fft_update_queue.put_nowait(data)
except:
logging.error("FFT Update Queue Full!")
def start_decoding():
global widgets, audio_stream, fft_process, horus_modem, audio_devices, running
if not running:
# Grab settings off widgets
_dev_name = widgets['audioDeviceSelector'].currentText()
_sample_rate = int(widgets['audioSampleRateSelector'].currentText())
_dev_index = audio_devices[_dev_name]['index']
# TODO: Grab horus data here.
# Init FFT Processor
fft_process = FFTProcess(
nfft=8192,
stride=4096,
fs=_sample_rate,
callback=add_fft_update
)
# TODO: Setup modem here
# Setup Audio
audio_stream = AudioStream(
_dev_index,
fs = _sample_rate,
block_size=fft_process.stride,
fft_input = fft_process.add_samples,
modem=None
)
widgets['startDecodeButton'].setText('Stop')
running = True
else:
try:
audio_stream.stop()
except Exception as e:
logging.exception("Could not stop audio stream.", exc_info=e)
try:
fft_process.stop()
except Exception as e:
logging.exception("Could not stop fft processing.", exc_info=e)
widgets['startDecodeButton'].setText('Start')
running = False
widgets['startDecodeButton'].clicked.connect(start_decoding)
# GUI Update Loop
def processQueues():
""" Read in data from the queues, this decouples the GUI and async inputs somewhat. """
global fft_update_queue, status_update_queue
while fft_update_queue.qsize() > 0:
_data = fft_update_queue.get()
handle_fft_update(_data)
while status_update_queue.qsize() > 0:
_status = status_update_queue.get()
# Handle Status updates here.
gui_update_timer = QtCore.QTimer()
gui_update_timer.timeout.connect(processQueues)
gui_update_timer.start(100)
# Main
def main():
# Start the Qt Loop
if (sys.flags.interactive != 1) or not hasattr(QtCore, "PYQT_VERSION"):
QtGui.QApplication.instance().exec_()
try:
audio_stream.stop()
except Exception as e:
logging.exception("Could not stop audio stream.", exc_info=e)
try:
fft_process.stop()
except Exception as e:
logging.exception("Could not stop fft processing.", exc_info=e)
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,63 @@
# Modem Interfacing
import logging
# Modem paramers and defaults
HORUS_MODEM_LIST = {
'Horus Binary v1 (Legacy)': {
'id': 0,
'baud_rates': [50, 100, 300],
'default_baud_rate': 100,
'default_tone_spacing': 270,
'use_mask_estimator': False
},
'RTTY (7N2)': {
'id': 99,
'baud_rates': [50, 100, 300, 600, 1000],
'default_baud_rate': 100,
'default_tone_spacing': 425,
'use_mask_estimator': False
}
}
DEFAULT_MODEM = 'Horus Binary v1 (Legacy)'
horusModem = None
def init_horus_modem(widgets):
""" Initialise the modem drop-down lists """
# Clear modem list.
widgets['horusModemSelector'].clear()
# Add items from modem list
for _modem in HORUS_MODEM_LIST:
widgets['horusModemSelector'].addItem(_modem)
# Select default modem
widgets['horusModemSelector'].setCurrentText(DEFAULT_MODEM)
populate_modem_settings(widgets)
def populate_modem_settings(widgets):
""" Populate the modem settings for the current selected modem """
_current_modem = widgets['horusModemSelector'].currentText()
# Clear baud rate dropdown.
widgets['horusModemRateSelector'].clear()
# Populate
for _rate in HORUS_MODEM_LIST[_current_modem]['baud_rates']:
widgets['horusModemRateSelector'].addItem(str(_rate))
# Select default rate.
widgets['horusModemRateSelector'].setCurrentText(str(HORUS_MODEM_LIST[_current_modem]['default_baud_rate']))
# Set Mask Estimator checkbox.
widgets['horusMaskEstimatorSelector'].setChecked(HORUS_MODEM_LIST[_current_modem]['use_mask_estimator'])
# Set Tone Spacing Input Box
widgets['horusMaskSpacingEntry'].setText(str(HORUS_MODEM_LIST[_current_modem]['default_tone_spacing']))

Wyświetl plik

@ -0,0 +1,10 @@
# Useful widgets
from pyqtgraph.Qt import QtCore, QtGui, QtWidgets
# Useful class for adding horizontal lines.
class QHLine(QtGui.QFrame):
def __init__(self):
super(QHLine, self).__init__()
self.setFrameShape(QtGui.QFrame.HLine)
self.setFrameShadow(QtGui.QFrame.Sunken)