Moved UDP server functionality for receiving LoRa frames to lora.lorasocket.LoRaUDPServer

pull/61/head
Pieter Robyns 2018-01-31 10:49:45 +01:00
rodzic bb0810f13a
commit 60e94eda37
4 zmienionych plików z 39 dodań i 33 usunięć

Wyświetl plik

@ -26,8 +26,6 @@ def iphase(cpx):
return np.unwrap(np.angle(cpx))
def ifreq(cpx):
global fs
iphase_signal = iphase(cpx)
return np.diff(iphase_signal)

Wyświetl plik

@ -32,6 +32,7 @@ GR_PYTHON_INSTALL(
FILES
__init__.py
lora_receiver.py
lorasocket.py
loraconfig.py DESTINATION ${GR_PYTHON_DIR}/lora
)

Wyświetl plik

@ -0,0 +1,34 @@
import socket
import binascii
class LoRaUDPServer():
def __init__(self, ip="127.0.0.1", port=40868, timeout=10):
self.ip = ip
self.port = port
self.timeout = timeout
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.s.bind((self.ip, self.port))
self.s.settimeout(self.timeout)
def __del__(self):
self.s.close()
def get_payloads(self, number_of_payloads):
"""
Returns array of <number_of_payloads> hexadecimal LoRa payload datagrams received on a socket.
"""
total_data = []
data = ''
for i in range(number_of_payloads):
try:
data = self.s.recvfrom(65535)[0]
if data:
total_data.append(binascii.hexlify(data))
except Exception as e:
print(e)
pass
return total_data

Wyświetl plik

@ -16,6 +16,7 @@ from gnuradio import gr, gr_unittest, blocks, filter
from gnuradio.filter import firdes
from sigmf.sigmffile import SigMFFile
from lora.loraconfig import LoRaConfig
from lora.lorasocket import LoRaUDPServer
Test = collections.namedtuple('Test', ['payload', 'times'])
TestResult = collections.namedtuple('TestResult', ['decoded_data', 'lora_config', 'test'])
@ -151,15 +152,8 @@ class qa_testsuite():
"""
Determine installed test suites and setup socket server for receiving payloads decoded by gr-lora.
"""
# Variables
self.host = "127.0.0.1"
self.port = 40868
# Setup socket
self.server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((self.host, self.port))
self.server.settimeout(10)
# Setup UDP server to capture decoded data
self.server = LoRaUDPServer(ip="127.0.0.1", port=40868)
# Determine test suites directory if needed
if path is None:
@ -178,27 +172,6 @@ class qa_testsuite():
else:
print("No test suites found! Skipping...")
def __del__(self):
self.server.close()
def get_payloads(self, number):
"""
Returns array of <number> hexadecimal LoRa payload datagrams received on a socket.
"""
total_data = []
data = ''
for i in range(number):
try:
data = self.server.recvfrom(65535)[0]
if data:
total_data.append(binascii.hexlify(data))
except Exception as e:
print(e)
pass
return total_data
def run(self, suites_to_run, pause=False, write_output=True):
for test_suite in self.test_suites:
# Skip test suites that we don't want to run
@ -264,7 +237,7 @@ class qa_testsuite():
tb.start()
tb.wait()
decoded_data = self.get_payloads(times) # Output from the flowgraph
decoded_data = self.server.get_payloads(times) # Output from the flowgraph
summary.add(TestResult(decoded_data=decoded_data, lora_config=lora_config, test=test), print_intermediate=True)
# Finally, export the result for the suite
summary.export_summary(path=self.reports_directory, write_output=write_output)