gr-lora/python/qa_BasicTest.py

632 wiersze
24 KiB
Python

import lora, socket, pmt, time
import collections
from gnuradio import gr, gr_unittest, blocks
from gnuradio.blocks import socket_pdu
TestResultData = collections.namedtuple('TestResultData', ['passing', 'total', 'rate'])
TestSerieSettings = collections.namedtuple('TestSerieSettings', ['description', 'SpreadingFactor'])
class qa_BasicTest (gr_unittest.TestCase):
#
# Listen on socket for data, append to list if any and return list of captured data.
#
def gatherFromSocket(self):
## or in self.server.settimeout(0.5) ?
# try:
# data = self.server.recv(8192)
# if data:
# total_data.append(data)
# except:
# pass
timeout = 0.5
total_data = []
data = ''
begin = time.time()
while 1:
if total_data and time.time()-begin > timeout:
break
elif time.time()-begin > timeout*2:
break
try:
data = self.server.recv(8192)
if data:
total_data.append(data)
begin = time.time()
else:
time.sleep(0.1)
except:
pass
return total_data
#
# Compare captured data list with the expected data list.
# Print received data, expected data and if they match.
# Also give feedback about successrate in the decoding (percentage correctly captured and decoded).
#
def compareDataSets(self, total_data, expected_data):
total_passing = 0.0
print '\nReceived:'
for idx, val in enumerate(expected_data):
data_str = '?'
passed = True
try:
data_str = " ".join("{:02x}".format(ord(n)) for n in total_data[idx])
self.assertEqual(val, data_str)
except:
passed = False
if passed:
total_passing += 1
print "{0: 3d} :: match:\n {1:s}".format(idx, val)
else:
print "{0: 3d} :: mismatch:\n {1:s}".format(idx, data_str)
print "should be:\n {0:s}".format(expected_data[idx])
results = TestResultData(total_passing, len(expected_data), float(total_passing) / len(expected_data) * 100.0)
testResults[nextSeriesSetting].append(results)
print ("\nPassed rate: {0:d} out of {1:d} ({2:.2f}%)\n"
.format(results[0], results[1], results[2]))
#
# Connect a filesource with the given file and run the flowgraph.
# File is acquired from the file_sink block and saved as .cfile (complex IQ data).
#
def runWithFileSourceAndData(self, file, expected_data):
print "Starting test from data in: %s\n" % (file)
file_source = blocks.file_source(gr.sizeof_gr_complex*1, file, False) # Repeat input: True/False
self.tb.connect((file_source, 0), (self.blocks_throttle_0, 0))
self.tb.run ()
self.tb.disconnect((file_source, 0), (self.blocks_throttle_0, 0))
total_data = self.gatherFromSocket()
self.compareDataSets(total_data, expected_data)
#
# Set up flowgraph before Unit Test. (Gets threrefore called before EVERY test)
#
# 1. Set variables for various blocks (spreading factor from test_series settings)
# 2. Make UDP socket server to listen for outputted messages
# 3. Connect blocks: (file_source) -> throttle -> lora_receiver -> message_socket_sink
#
def setUp (self):
##################################################
# Variables #
##################################################
self.target_freq = 868.1e6
self.sf = test_series[nextSeriesSetting][1] # 7
self.samp_rate = 1e6
self.capture_freq = 868.0e6
#self.bw = 125e3
#self.symbols_per_sec = self.bw / (2**self.sf)
self.offset = -(self.capture_freq - self.target_freq)
#self.bitrate = self.sf * (1 / (2**self.sf / self.bw ))
# Socket connection for sink
self.host = "127.0.0.1"
self.port = 40868
self.server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((self.host, self.port))
self.server.setblocking(0) ## or self.server.settimeout(0.5) ?
self.lastTestComplete = False
##################################################
# Blocks #
##################################################
self.tb = gr.top_block ()
self.lora_lora_receiver_0 = lora.lora_receiver(self.samp_rate, self.capture_freq, self.offset, self.sf, self.samp_rate)
self.blocks_throttle_0 = blocks.throttle(gr.sizeof_gr_complex*1, self.samp_rate, True)
self.blocks_message_socket_sink_0 = lora.message_socket_sink()
self.tb.connect( (self.blocks_throttle_0, 0), (self.lora_lora_receiver_0, 0))
self.tb.msg_connect( (self.lora_lora_receiver_0, 'frames'), (self.blocks_message_socket_sink_0, 'in'))
#
# Clean-up flowgraph after Unit Test. (Gets threrefore called after EVERY test)
#
# If last test was completed, report results.
#
def tearDown (self):
self.tb = None
self.server.close()
if self.lastTestComplete:
passed_t_a = 0
passed_t = 0
total_t_a = 0
total_t = 0
print ("-" * 18) + " Test Results " + ("-" * 19)
for j, serie in enumerate(testResults):
total_t = passed_t = 0
print ("Test serie {0: 3d}: {1:5s} {2:5s} {3:5s} {4:5s} {5:5s}"
.format(j, *test_series[j][0].split("_")))
for i, x in enumerate(serie):
passed_t += x[0]
total_t += x[1]
print (" Test {0: 4d} passing rate: {1: 3d} out of {2: 3d} ({3:6.2f}%)"
.format(i, x[0], x[1], x[2]))
passed_t_a += passed_t
total_t_a += total_t
print (" => Total passed: {0:d} out of {1:d} ({2:.2f}%)\n"
.format(passed_t, total_t, float(passed_t) / total_t * 100.0))
print ("\n ====== Total passed: {0:d} out of {1:d} ({2:.2f}%) ======"
.format(passed_t_a, total_t_a, float(passed_t_a) / total_t_a * 100.0))
###############################################################################################
# Unit tests #
# These assume a directory "lora-samples" with the specified .cfiles existing. #
###############################################################################################
###############################################################################################
# Unit tests series 0 #
# cr4-5_bw125_sf7_crc1_pwr1 #
###############################################################################################
#
# Unit test 1
#
def test_001 (self):
expected_data = [
'71 1b 09 12 12 12 12 12 12 12 10 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12',
'71 1b 09 88 88 88 88 88 88 88 88 88 88 88 88 88 88 88 88 88 88 88 88 88 88 88',
'71 1b 09 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22'
]
self.runWithFileSourceAndData('../../examples/lora-samples/usrpcr5.cfile', expected_data)
nextSeriesSetting = 1
testResults.append([])
###############################################################################################
# Unit tests series 1 #
# cr4-5_bw125_sf7_crc1_pwr1 #
###############################################################################################
#
# Unit test 2
#
def test_002 (self):
expected_data = ["80 0b 01 01 23 45 67 89 ab cd ef"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_000.cfile', expected_data)
# self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_000.cfile', expected_data)
# ...
#
# Unit test 3
#
def test_003 (self):
expected_data = ["30 0b 02 11 11 11"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_001.cfile', expected_data)
#
# Unit test 4
#
def test_004 (self):
expected_data = ["30 0b 02 11 11 11"] * 5
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_002.cfile', expected_data)
#
# Unit test 5
#
def test_005 (self):
expected_data = ["40 0b 07 aa aa aa aa"] * 3
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_003.cfile', expected_data)
#
# Unit test 6
#
def test_006 (self):
expected_data = ["40 0b 07 ff ff ff ff"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_004.cfile', expected_data)
#
# Unit test 7
#
def test_007 (self):
expected_data = ["40 0b 07 ff ff ff ff"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_005.cfile', expected_data)
#
# Unit test 8
#
def test_008 (self):
expected_data = ["40 0b 07 55 55 55 55"] * 3
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_006.cfile', expected_data)
#
# Unit test 9
#
def test_009 (self):
expected_data = ["40 0b 07 55 55 55 55"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_007.cfile', expected_data)
#
# Unit test 10
#
def test_010 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_008.cfile', expected_data)
#
# Unit test 11
#
def test_011 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 5
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_009.cfile', expected_data)
#
# Unit test 12
#
def test_012 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf7_crc1_pwr1_010.cfile', expected_data)
nextSeriesSetting = 2
testResults.append([])
###############################################################################################
# Unit tests series 2 #
# cr4-5_bw125_sf12_crc1_pwr1 #
###############################################################################################
#
# Unit test 13
#
def test_013 (self):
expected_data = ["80 0b 01 01 23 45 67 89 ab cd ef"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_000.cfile', expected_data)
#
# Unit test 14
#
def test_014 (self):
expected_data = ["30 0b 02 11 11 11"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_001.cfile', expected_data)
#
# Unit test 15
#
def test_015 (self):
expected_data = ["30 0b 02 11 11 11"] * 5
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_002.cfile', expected_data)
#
# Unit test 16
#
def test_016 (self):
expected_data = ["40 0b 07 aa aa aa aa"] * 3
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_003.cfile', expected_data)
#
# Unit test 17
#
def test_017 (self):
expected_data = ["40 0b 07 ff ff ff ff"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_004.cfile', expected_data)
#
# Unit test 18
#
def test_018 (self):
expected_data = ["40 0b 07 ff ff ff ff"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_005.cfile', expected_data)
#
# Unit test 19
#
def test_019 (self):
expected_data = ["40 0b 07 55 55 55 55"] * 3
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_006.cfile', expected_data)
#
# Unit test 20
#
def test_020 (self):
expected_data = ["40 0b 07 55 55 55 55"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_007.cfile', expected_data)
#
# Unit test 21
#
def test_021 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_008.cfile', expected_data)
#
# Unit test 22
#
def test_022 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 5
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_009.cfile', expected_data)
#
# Unit test 23
#
def test_023 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf12_crc1_pwr1_010.cfile', expected_data)
nextSeriesSetting = 3
testResults.append([])
###############################################################################################
# Unit tests series 3 #
# cr4-5_bw125_sf8_crc1_pwr1 #
###############################################################################################
#
# Unit test 24
#
def test_024 (self):
expected_data = ["80 0b 01 01 23 45 67 89 ab cd ef"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_000.cfile', expected_data)
#
# Unit test 25
#
def test_025 (self):
expected_data = ["30 0b 02 11 11 11"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_001.cfile', expected_data)
#
# Unit test 26
#
def test_026 (self):
expected_data = ["30 0b 02 11 11 11"] * 5
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_002.cfile', expected_data)
#
# Unit test 27
#
def test_027 (self):
expected_data = ["40 0b 07 aa aa aa aa"] * 3
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_003.cfile', expected_data)
#
# Unit test 28
#
def test_028 (self):
expected_data = ["40 0b 07 ff ff ff ff"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_004.cfile', expected_data)
#
# Unit test 29
#
def test_029 (self):
expected_data = ["40 0b 07 ff ff ff ff"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_005.cfile', expected_data)
#
# Unit test 30
#
def test_030 (self):
expected_data = ["40 0b 07 55 55 55 55"] * 3
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_006.cfile', expected_data)
#
# Unit test 31
#
def test_031 (self):
expected_data = ["40 0b 07 55 55 55 55"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_007.cfile', expected_data)
#
# Unit test 32
#
def test_032 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_008.cfile', expected_data)
#
# Unit test 33
#
def test_033 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 5
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_009.cfile', expected_data)
#
# Unit test 34
#
def test_034 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-5_bw125_sf8_crc1_pwr1_010.cfile', expected_data)
nextSeriesSetting = 4
testResults.append([])
###############################################################################################
# Unit tests series 4 #
# cr4-7_bw125_sf7_crc1_pwr1 #
###############################################################################################
#
# Unit test 35
#
def test_035 (self):
expected_data = ["80 0b 01 01 23 45 67 89 ab cd ef"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_000.cfile', expected_data)
#
# Unit test 36
#
def test_036 (self):
expected_data = ["30 0b 02 11 11 11"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_001.cfile', expected_data)
#
# Unit test 37
#
def test_037 (self):
expected_data = ["30 0b 02 11 11 11"] * 5
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_002.cfile', expected_data)
#
# Unit test 38
#
def test_038 (self):
expected_data = ["40 0b 07 aa aa aa aa"] * 3
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_003.cfile', expected_data)
#
# Unit test 39
#
def test_039 (self):
expected_data = ["40 0b 07 ff ff ff ff"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_004.cfile', expected_data)
#
# Unit test 40
#
def test_040 (self):
expected_data = ["40 0b 07 ff ff ff ff"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_005.cfile', expected_data)
#
# Unit test 41
#
def test_041 (self):
expected_data = ["40 0b 07 55 55 55 55"] * 3
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_006.cfile', expected_data)
#
# Unit test 42
#
def test_042 (self):
expected_data = ["40 0b 07 55 55 55 55"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_007.cfile', expected_data)
#
# Unit test 43
#
def test_043 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_008.cfile', expected_data)
#
# Unit test 44
#
def test_044 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 5
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_009.cfile', expected_data)
#
# Unit test 45
#
def test_045 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-7_bw125_sf7_crc1_pwr1_010.cfile', expected_data)
nextSeriesSetting = 5
testResults.append([])
###############################################################################################
# Unit tests series 5 #
# cr4-6_bw125_sf6_crc1_pwr1 #
###############################################################################################
#
# Unit test 46
#
def test_046 (self):
expected_data = ["80 0b 01 01 23 45 67 89 ab cd ef"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_000.cfile', expected_data)
#
# Unit test 47
#
def test_047 (self):
expected_data = ["30 0b 02 11 11 11"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_001.cfile', expected_data)
#
# Unit test 48
#
def test_048 (self):
expected_data = ["30 0b 02 11 11 11"] * 5
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_002.cfile', expected_data)
#
# Unit test 49
#
def test_049 (self):
expected_data = ["40 0b 07 aa aa aa aa"] * 3
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_003.cfile', expected_data)
#
# Unit test 50
#
def test_050 (self):
expected_data = ["40 0b 07 ff ff ff ff"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_004.cfile', expected_data)
#
# Unit test 51
#
def test_051 (self):
expected_data = ["40 0b 07 ff ff ff ff"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_005.cfile', expected_data)
#
# Unit test 52
#
def test_052 (self):
expected_data = ["40 0b 07 55 55 55 55"] * 3
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_006.cfile', expected_data)
#
# Unit test 53
#
def test_053 (self):
expected_data = ["40 0b 07 55 55 55 55"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_007.cfile', expected_data)
#
# Unit test 54
#
def test_054 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 1
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_008.cfile', expected_data)
#
# Unit test 55
#
def test_055 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 5
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_009.cfile', expected_data)
#
# Unit test 56
#
def test_056 (self):
expected_data = ["40 0b 07 88 88 88 88"] * 10
self.runWithFileSourceAndData('../../examples/lora-samples/hackrf_cr4-6_bw125_sf6_crc1_pwr1_010.cfile', expected_data)
self.lastTestComplete = True
# nextSeriesSetting = 6
# testResults.append([])
if __name__ == '__main__':
testResults = [ [] ]
nextSeriesSetting = 0
test_series = [ TestSerieSettings("cr4-5_bw125_sf7_crc1_pwr1", 7),
TestSerieSettings("cr4-5_bw125_sf7_crc1_pwr1", 7),
TestSerieSettings("cr4-5_bw125_sf12_crc1_pwr1", 12),
TestSerieSettings("cr4-5_bw125_sf8_crc1_pwr1", 8),
TestSerieSettings("cr4-7_bw125_sf7_crc1_pwr1", 7),
TestSerieSettings("cr4-6_bw125_sf6_crc1_pwr1", 6)
]
# Remove second argument, otherwise this would cause each test to run twice
# (once to console and once to xml, both outputs are printed in the console though)
gr_unittest.run(qa_BasicTest) # , "qa_BasicTest.xml")