wenet/benchmarking/test_demod.py

161 wiersze
5.7 KiB
Python

#!/usr/bin/env python
#
# Run a set of files through a processing and decode chain, and handle the output.
#
# Copyright (C) 2018 Mark Jessop <vk5qi@rfhead.net>
# Released under GNU GPL v3 or later
#
# Refer to the README.md in this directory for instructions on use.
#
import glob
import argparse
import os
import sys
import time
import traceback
import subprocess
# Dictionary of available processing types.
processing_type = {
# Wenet, RS232 modulation
# Convert to u8 using csdr, then pipe into fsk_demod, then drs232_ldpc.
# Count bytes at the output as a metric of performance.
'wenet_rs232_demod': {
'demod': '| csdr convert_f_u8 | ../rx/fsk_demod --cu8 -s --stats=100 2 921416 115177 - - 2> stats.txt | ',
'decode': '../rx/drs232_ldpc - - 2> /dev/null ',
"post_process" : " | wc -c", #
'files' : "./generated/wenet_sample_fs921416*.bin"
},
'wenet_rs232_demod_c16': {
'demod': '| csdr convert_f_s16 | ../rx/fsk_demod --cs16 -s --stats=100 2 921416 115177 - - 2> stats.txt | ',
'decode': '../rx/drs232_ldpc - - 2> /dev/null ',
"post_process" : " | wc -c", #
'files' : "./generated/wenet_sample_fs921416*.bin"
},
'wenet_i2s_demod': {
'demod': '| csdr convert_f_u8 | ../rx/fsk_demod --cu8 -s --stats=100 2 960000 96000 - - 2> stats.txt | ',
'decode': '../rx/wenet_ldpc - - 2> /dev/null ', #
"post_process" : " | wc -c", #
'files' : "./generated/wenet_sample_i2s_fs960000*.bin"
},
}
def run_analysis(mode, file_mask=None, shift=0.0, resample=1.0, verbose=False, log_output = None, dry_run = False, quick=False, show=False):
_mode = processing_type[mode]
# If we are not supplied with a file mask, use the defaults.
if file_mask is None:
file_mask = _mode['files']
# Get the list of files.
_file_list = glob.glob(file_mask)
if len(_file_list) == 0:
print("No files found matching supplied path.")
return
# Sort the list of files.
_file_list.sort()
# If we are only running a quick test, just process the last file in the list.
if quick:
_file_list = [_file_list[-1]]
_first = True
# Calculate the frequency offset to apply, if defined.
_shiftcmd = "| csdr shift_addition_cc %.5f 2>/dev/null" % (shift/96000.0)
_resamplecmd = f"| csdr convert_f_s16 | ./tsrc - - {resample:.5f} -c | csdr convert_s16_f "
if log_output is not None:
_log = open(log_output,'w')
# Iterate over the files in the supplied list.
for _file in _file_list:
# Generate the command to run.
_cmd = "cat %s "%_file
# Add in an optional frequency error if supplied.
if shift != 0.0:
_cmd += _shiftcmd
if resample != 1.0:
_cmd += _resamplecmd
# Add on the rest of the demodulation and decoding commands.
_cmd += _mode['demod'] + _mode['decode']
if args.show:
_cmd += " | head -n 10"
else:
_cmd += _mode['post_process']
if _first or dry_run:
print("Command: %s" % _cmd)
_first = False
if dry_run:
continue
# Run the command.
try:
_start = time.time()
_output = subprocess.check_output(_cmd, shell=True, stderr=None)
_output = _output.decode()
except:
#traceback.print_exc()
_output = "error"
_runtime = time.time() - _start
_result = "%s, %s, %.3f" % (os.path.basename(_file), _output.strip(), _runtime)
print(_result)
if log_output is not None:
_log.write(_result + '\n')
if verbose:
print("Runtime: %.1d" % _runtime)
if log_output is not None:
_log.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-m", "--mode", type=str, default="rs41_fsk_demod_soft", help="Operation mode.")
parser.add_argument("-f", "--files", type=str, default=None, help="Glob-path to files to run over.")
parser.add_argument("-v", "--verbose", action='store_true', default=False, help="Show additional debug info.")
parser.add_argument("-d", "--dry-run", action='store_true', default=False, help="Show additional debug info.")
parser.add_argument("--shift", type=float, default=0.0, help="Shift the signal-under test by x Hz. Default is 0.")
parser.add_argument("--resample", type=float, default=1.0, help="Resample. Default is 1 (no resampling).")
parser.add_argument("--batch", action='store_true', default=False, help="Run all tests, write results to results directory.")
parser.add_argument("--quick", action='store_true', default=False, help="Only process the last sample file in the list (usually the strongest). Useful for checking the demodulators are still working.")
parser.add_argument("--show", action='store_true', default=False, help="Show the first few lines of output, instead of running the post-processing step.")
args = parser.parse_args()
# Check the mode is valid.
if args.mode not in processing_type:
print("Error - invalid operating mode.")
print("Valid Modes: %s" % str(processing_type.keys()))
sys.exit(1)
batch_modes = []
if args.batch:
for _mode in batch_modes:
_log_name = "./results/" + _mode + ".txt"
run_analysis(_mode, file_mask=None, shift=args.shift, verbose=args.verbose, log_output=_log_name, dry_run=args.dry_run, quick=args.quick, show=args.show)
else:
run_analysis(args.mode, args.files, shift=args.shift, resample=args.resample, verbose=args.verbose, dry_run=args.dry_run, quick=args.quick, show=args.show)