Added example that can be run without GUI or example trace

pull/61/head
Pieter Robyns 2017-09-20 11:42:47 +02:00
rodzic 025faa8c41
commit 040be7eb48
3 zmienionych plików z 106 dodań i 2 usunięć

1
.gitignore vendored
Wyświetl plik

@ -8,3 +8,4 @@ dev/
apps/test-suites
*.sigmf-data
*.sigmf-meta
apps/test-trace.*

Wyświetl plik

@ -0,0 +1,103 @@
#!/usr/bin/env python2
# Gets a USRP capture trace from my research page and decodes it using gr-lora.
# Author: Pieter Robyns
from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from optparse import OptionParser
from lora.loraconfig import LoRaConfig
from time import sleep
import lora
import argparse
import os
import requests
import json
class lora_receive_file_nogui(gr.top_block):
def __init__(self, sample_file, sample_rate, capture_freq, lc):
gr.top_block.__init__(self, "Lora Receive File, No GUI")
##################################################
# Variables
##################################################
self.internal_sample_rate = 1e6
##################################################
# Blocks
##################################################
self.message_socket_sink = lora.message_socket_sink('127.0.0.1', 40868, False, False)
self.lora_receiver = lora.lora_receiver(sample_rate, capture_freq, ([lc.freq]), lc.sf, self.internal_sample_rate, lc.implicit, lc.cr_num, lc.crc)
self.blocks_throttle = blocks.throttle(gr.sizeof_gr_complex, sample_rate, True)
self.blocks_file_source = blocks.file_source(gr.sizeof_gr_complex, sample_file, False)
##################################################
# Connections
##################################################
self.msg_connect((self.lora_receiver, 'frames'), (self.message_socket_sink, 'in'))
self.connect((self.blocks_file_source, 0), (self.blocks_throttle, 0))
self.connect((self.blocks_throttle, 0), (self.lora_receiver, 0))
def download_file(source, destination):
print("[+] Downloading %s -> %s" % (source, destination)),
try:
response = requests.get(source, stream=True)
with open(destination, 'wb') as f:
for chunk in response.iter_content(1000*1000):
f.write(chunk)
print("."),
print(".")
except Exception as e:
print("[-] " + str(e))
exit(1)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Gets a USRP capture trace from my research page and decodes it using gr-lora.")
parser.add_argument('--file', type=str, default="example-trace", help='File name of the example trace.')
parser.add_argument('--skip-download', default=False, action='store_true', help='Skip downloading the example trace.')
args = parser.parse_args()
data_file_path = os.path.join('./', args.file + '.sigmf-data')
meta_file_path = os.path.join('./', args.file + '.sigmf-meta')
if not args.skip_download and \
not (os.path.exists(data_file_path) and os.path.exists(meta_file_path)) and \
raw_input("[?] Download test LoRa signal to decode? [y/N] ").lower() == "y":
download_file("https://research.edm.uhasselt.be/probyns/lora/usrp-868.1-sf7-cr4-bw125-crc-0.sigmf-data", data_file_path)
download_file("https://research.edm.uhasselt.be/probyns/lora/usrp-868.1-sf7-cr4-bw125-crc-0.sigmf-meta", meta_file_path)
sleep(3)
else:
print("[+] Skipping download.")
if os.path.exists(data_file_path) and os.path.exists(meta_file_path):
metadata = json.load(open(meta_file_path, "r"))
meta_global = metadata["global"]
meta_capture = metadata["captures"][0]
# Parse metadata. Not using SigMF library because of extra dependency
sample_rate = meta_global["core:sample_rate"]
capture_freq = meta_capture["core:frequency"]
transmit_freq = meta_capture["lora:frequency"]
sf = meta_capture["lora:sf"]
cr = meta_capture["lora:cr"]
bw = meta_capture["lora:bw"]
prlen = meta_capture["lora:prlen"]
crc = meta_capture["lora:crc"]
implicit = meta_capture["lora:implicit"]
lora_config = LoRaConfig(transmit_freq, sf, cr, bw, prlen, crc, implicit)
print("[+] Configuration: %s" % lora_config.string_repr())
# Decode the data!
payload = meta_capture["test:expected"]
times = meta_capture["test:times"]
print("[+] Decoding. You should see a header, followed by '%s'%s %d times." % (payload, " and a CRC" if crc else "", times))
tb = lora_receive_file_nogui(data_file_path, sample_rate, capture_freq, lora_config)
tb.start()
tb.wait()
print("[+] Done")
exit(0)
else:
print("[-] Example trace or metadata missing! Exiting.")
exit(1)

Wyświetl plik

@ -3,14 +3,14 @@ class LoRaConfig():
self.freq = freq
self.sf = sf
self.cr = cr
self.cr_num = int(self.cr.rpartition('/')[2])-4
self.bw = bw
self.prlen = prlen
self.crc = crc
self.implicit = implicit
def file_repr(self):
cr_num = int(self.cr.rpartition('/')[2])-4
format_string = "{:n}-sf{:n}-cr{:n}-bw{:n}".format(self.freq/1000000.0, self.sf, cr_num, self.bw/1000.0)
format_string = "{:n}-sf{:n}-cr{:n}-bw{:n}".format(self.freq/1000000.0, self.sf, self.cr_num, self.bw/1000.0)
if self.crc:
format_string += "-crc"
if self.implicit: