kopia lustrzana https://github.com/projecthorus/horus-gui
Add callback-based processing to HorusLib
rodzic
2b5fa0a13e
commit
4e5e2e127a
|
@ -56,7 +56,8 @@ def populate_sample_rates(widgets):
|
|||
|
||||
if _dev_name in audioDevices:
|
||||
# TODO: Determine valid samples rates. For now, just use the default.
|
||||
_samp_rate = int(audioDevices[_dev_name]["defaultSampleRate"])
|
||||
# TODO: Add support for resampling.
|
||||
_samp_rate = 48000 # int(audioDevices[_dev_name]["defaultSampleRate"])
|
||||
widgets["audioSampleRateSelector"].addItem(str(_samp_rate))
|
||||
widgets["audioSampleRateSelector"].setCurrentIndex(0)
|
||||
else:
|
||||
|
|
|
@ -86,6 +86,7 @@ class HorusLib:
|
|||
tone_spacing=-1,
|
||||
stereo_iq=False,
|
||||
verbose=False,
|
||||
callback=None
|
||||
):
|
||||
"""
|
||||
Parameters
|
||||
|
@ -102,6 +103,8 @@ class HorusLib:
|
|||
use stereo (IQ) input (quadrature)
|
||||
verbose : bool
|
||||
Enabled horus_set_verbose
|
||||
callback : function
|
||||
Callback function to run on packet detection.
|
||||
"""
|
||||
|
||||
if sys.platform == "darwin":
|
||||
|
@ -161,6 +164,10 @@ class HorusLib:
|
|||
|
||||
self.stereo_iq = stereo_iq
|
||||
|
||||
self.callback = callback
|
||||
|
||||
self.input_buffer = bytearray(b"")
|
||||
|
||||
# intial nin
|
||||
self.nin = 0
|
||||
|
||||
|
@ -239,7 +246,7 @@ class HorusLib:
|
|||
data_out = (
|
||||
b"" # check if bytes is just null and return an empty bytes instead
|
||||
)
|
||||
elif self.mode != Mode.RTTY:
|
||||
elif self.mode != Mode.RTTY_7N2:
|
||||
try:
|
||||
data_out = bytes.fromhex(data_out.decode("ascii"))
|
||||
except ValueError:
|
||||
|
@ -257,6 +264,35 @@ class HorusLib:
|
|||
extended_stats=stats,
|
||||
)
|
||||
return frame
|
||||
|
||||
def add_samples(self, samples: bytes, rate: int=48000):
|
||||
""" Add samples to a input buffer, to pass on to demodulate when we have nin samples """
|
||||
|
||||
# TODO: Resampling support
|
||||
|
||||
# Add samples to input buffer
|
||||
self.input_buffer.extend(samples)
|
||||
|
||||
_processing = True
|
||||
_frame = None
|
||||
while _processing:
|
||||
# Process data until we have less than _nin samples.
|
||||
_nin = self.nin
|
||||
if len(self.input_buffer) > (_nin * 2):
|
||||
# Demodulate
|
||||
_frame = self.demodulate(self.input_buffer[:(_nin*2)])
|
||||
|
||||
# Advance sample buffer.
|
||||
self.input_buffer = self.input_buffer[(_nin*2):]
|
||||
|
||||
# If we have decoded a packet, send it on to the callback
|
||||
if len(_frame.data) > 0:
|
||||
if self.callback:
|
||||
self.callback(_frame)
|
||||
else:
|
||||
_processing = False
|
||||
|
||||
return _frame
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -264,18 +300,33 @@ if __name__ == "__main__":
|
|||
|
||||
filename = sys.argv[1]
|
||||
|
||||
def frame_callback(frame):
|
||||
print(f"Callback: {frame.data.hex()} SNR: {frame.snr}")
|
||||
|
||||
# Setup Logging
|
||||
logging.basicConfig(
|
||||
format="%(asctime)s %(levelname)s: %(message)s", level=logging.INFO
|
||||
format="%(asctime)s %(levelname)s: %(message)s", level=logging.DEBUG
|
||||
)
|
||||
with HorusLib(libpath=".", mode=Mode.BINARY, verbose=True) as horus:
|
||||
# with HorusLib(libpath=".", mode=Mode.BINARY, verbose=False) as horus:
|
||||
# with open(filename, "rb") as f:
|
||||
# while True:
|
||||
# data = f.read(horus.nin * 2)
|
||||
# if horus.nin != 0 and data == b"": # detect end of file
|
||||
# break
|
||||
# output = horus.demodulate(data)
|
||||
# if output.crc_pass and output.data:
|
||||
# print(f"{output.data.hex()} SNR: {output.snr}")
|
||||
# for x in range(horus.mfsk):
|
||||
# print(f"F{str(x)}: {float(output.extended_stats.f_est[x])}")
|
||||
|
||||
|
||||
with HorusLib(libpath=".", mode=Mode.BINARY, verbose=False, callback=frame_callback) as horus:
|
||||
with open(filename, "rb") as f:
|
||||
while True:
|
||||
data = f.read(horus.nin * 2)
|
||||
# Fixed read size - 2000 samples
|
||||
data = f.read(2000 * 2)
|
||||
if horus.nin != 0 and data == b"": # detect end of file
|
||||
break
|
||||
output = horus.demodulate(data)
|
||||
if output.crc_pass and output.data:
|
||||
print(f"{output.data.hex()} SNR: {output.snr}")
|
||||
for x in range(horus.mfsk):
|
||||
print(f"F{str(x)}: {float(output.extended_stats.f_est[x])}")
|
||||
output = horus.add_samples(data)
|
||||
if output:
|
||||
print(f"Sync: {output.sync} SNR: {output.snr}")
|
Ładowanie…
Reference in New Issue