gr-lora/apps/grlora_analyze.py

130 wiersze
4.4 KiB
Python
Executable File

#!/usr/bin/python3
# -----------------------------------------------------------------------------
# grlora_analyze.py
# A Python tool capable of performing a real-time analysis of signals passed
# by GNU Radio.
#
# Author: Pieter Robyns
# -----------------------------------------------------------------------------
import os
import socket
import numpy as np
import matplotlib.pyplot as plt
import struct
import argparse
from threading import Thread
# Pop num bytes from l
def fetch(l, num):
fet = l[0:num]
del l[0:num]
return fet
class State:
READ_HEADER = 0
READ_DATA = 1
class Plotter(Thread):
def __init__(self):
Thread.__init__(self)
self.setDaemon(True)
self.socket_address = "/tmp/gr_lora.sock"
self.socket = None
self.init_socket()
plt.ion()
def init_socket(self):
if self.socket is None:
try:
os.unlink(self.socket_address)
except OSError:
if os.path.exists(self.socket_address):
raise FileExistsError
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.bind(self.socket_address)
self.socket.listen(1)
self.socket.setblocking(0)
print("[gr-lora analyzer]: listening at " + self.socket_address)
else:
return # Socket is already initialized
def get_data(self, connection):
buffer = connection.recv(1024)
if buffer:
return buffer
else:
raise ConnectionResetError
def run(self):
while True:
plt.pause(0.0001) # Give some time to Qt GUI to render
try:
client_socket, client_address = self.socket.accept()
try:
self.state = State.READ_HEADER
data = bytearray()
data_len = 0
draw_over = False
while True:
plt.pause(0.0001)
try:
# Parse buffer data
if self.state == State.READ_HEADER:
while len(data) < 5:
data += self.get_data(client_socket)
data_len, draw_over = struct.unpack(">I?", fetch(data, 5))
self.state = State.READ_DATA
elif self.state == State.READ_DATA:
while len(data) < data_len:
data += self.get_data(client_socket)
plot_data = np.frombuffer(fetch(data, data_len), dtype=np.complex64)
if not draw_over:
plt.gcf().clear()
plt.plot(np.arange(len(plot_data)), np.real(plot_data), "b", np.arange(len(plot_data)), np.imag(plot_data), "g")
self.state = State.READ_HEADER
except ConnectionResetError:
print("[gr-lora analyzer]: connection reset")
break
finally:
# Clean up the connection
client_socket.close()
except BlockingIOError:
pass
# Test operation of the Plotter class
def test():
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect("/tmp/gr_lora.sock")
try:
print("[fake client]: sending!")
with open(os.path.expanduser("~/usrpsf7.cfile"),"rb") as f:
data = f.read()
chunk_size = 524288
for i in range(0, len(data), chunk_size):
chunk = data[i:i+chunk_size]
data_len = len(chunk)
draw_over = False
client.sendall(struct.pack(">I?", data_len, draw_over) + chunk)
finally:
print("[fake client]: done sending!")
client.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='gr-lora debugging / analysis tool for interfacing with GNU Radio over Unix sockets')
parser.add_argument('--test', dest='test', help='Peform a test', action='store_true')
args, unknown = parser.parse_known_args()
plotter = Plotter()
plotter.start()
if args.test:
test()
plotter.join()
exit(0)