kopia lustrzana https://github.com/NanoVNA-Saver/nanovna-saver
Split history NanoVNASaver/SweepWorker.py to NanoVNASaver/Settings/Sweep.py
commit
de7ec1be7d
|
@ -0,0 +1,400 @@
|
|||
# NanoVNASaver
|
||||
#
|
||||
# A python program to view and export Touchstone data from a NanoVNA
|
||||
# Copyright (C) 2019, 2020 Rune B. Broberg
|
||||
# Copyright (C) 2020 NanoVNA-Saver Authors
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
import logging
|
||||
from math import log
|
||||
from time import sleep
|
||||
from typing import Iterator, List, Tuple
|
||||
|
||||
import numpy as np
|
||||
from PyQt5 import QtCore, QtWidgets
|
||||
from PyQt5.QtCore import pyqtSlot, pyqtSignal
|
||||
|
||||
from NanoVNASaver.Calibration import correct_delay
|
||||
from NanoVNASaver.RFTools import Datapoint
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def truncate(values: List[List[Tuple]], count: int) -> List[List[Tuple]]:
|
||||
"""truncate drops extrema from data list if averaging is active"""
|
||||
keep = len(values) - count
|
||||
logger.debug("Truncating from %d values to %d", len(values), keep)
|
||||
if count < 1 or keep < 1:
|
||||
logger.info("Not doing illegal truncate")
|
||||
return values
|
||||
truncated = []
|
||||
for valueset in np.swapaxes(values, 0, 1).tolist():
|
||||
avg = complex(*np.average(valueset, 0))
|
||||
truncated.append(
|
||||
sorted(valueset,
|
||||
key=lambda v, a=avg:
|
||||
abs(a - complex(*v)))[:keep])
|
||||
return np.swapaxes(truncated, 0, 1).tolist()
|
||||
|
||||
|
||||
class WorkerSignals(QtCore.QObject):
|
||||
updated = pyqtSignal()
|
||||
finished = pyqtSignal()
|
||||
sweepError = pyqtSignal()
|
||||
fatalSweepError = pyqtSignal()
|
||||
|
||||
|
||||
class Sweep():
|
||||
def __init__(self, start: int = 3600000, end: int = 30000000,
|
||||
points: int = 101, segments: int = 1,
|
||||
logarithmic: bool = False):
|
||||
self.start = start
|
||||
self.end = end
|
||||
self.points = points
|
||||
self.segments = segments
|
||||
self.logarithmic = logarithmic
|
||||
self.span = self.end - self.start
|
||||
self.step = self.stepsize()
|
||||
self.check()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f"Sweep({self.start}, {self.end}, {self.points}, {self.segments},"
|
||||
f" {self.logarithmic})")
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
return(self.start == other.start and
|
||||
self.end == other.end and
|
||||
self.points == other.points and
|
||||
self.segments == other.segments)
|
||||
|
||||
def check(self):
|
||||
if not(self.segments > 0 and
|
||||
self.points > 0 and
|
||||
self.start > 0 and
|
||||
self.end > 0 and
|
||||
self.stepsize() >= 1):
|
||||
raise ValueError(f"Illegal sweep settings: {self}")
|
||||
|
||||
def stepsize(self) -> int:
|
||||
return round(self.span / ((self.points -1) * self.segments))
|
||||
|
||||
def _exp_factor(self, index: int) -> int:
|
||||
return 1 - log(self.segments + 1 - index) / log(self.segments + 1)
|
||||
|
||||
def get_index_range(self, index: int) -> Tuple[int, int]:
|
||||
if not self.logarithmic:
|
||||
start = self.start + index * self.points * self.step
|
||||
end = start + (self.points - 1) * self.step
|
||||
else:
|
||||
start = self.start + self.span * self._exp_factor(index)
|
||||
end = self.start + self.span * self._exp_factor(index + 1)
|
||||
logger.debug("get_index_range(%s) -> (%s, %s)", index, start, end)
|
||||
return (start, end)
|
||||
|
||||
|
||||
def get_frequencies(self) -> Iterator[int]:
|
||||
if not self.logarithmic:
|
||||
for freq in range(self.start, self.end + 1, self.step):
|
||||
yield freq
|
||||
return
|
||||
for i in range(self.segments):
|
||||
start, stop = self.get_index_range(i)
|
||||
step = (stop - start) / self.points
|
||||
freq = start
|
||||
for _ in range(self.points):
|
||||
yield round(freq)
|
||||
freq += step
|
||||
|
||||
|
||||
class SweepWorker(QtCore.QRunnable):
|
||||
def __init__(self, app: QtWidgets.QWidget):
|
||||
super().__init__()
|
||||
logger.info("Initializing SweepWorker")
|
||||
self.signals = WorkerSignals()
|
||||
self.app = app
|
||||
self.sweep = Sweep()
|
||||
self.setAutoDelete(False)
|
||||
self.percentage = 0
|
||||
self.data11: List[Datapoint] = []
|
||||
self.data21: List[Datapoint] = []
|
||||
self.rawData11: List[Datapoint] = []
|
||||
self.rawData21: List[Datapoint] = []
|
||||
self.init_data()
|
||||
self.stopped = False
|
||||
self.running = False
|
||||
self.continuousSweep = False
|
||||
self.averaging = False
|
||||
self.averages = 3
|
||||
self.truncates = 0
|
||||
self.error_message = ""
|
||||
self.offsetDelay = 0
|
||||
|
||||
@pyqtSlot()
|
||||
def run(self):
|
||||
try:
|
||||
self._run()
|
||||
except BaseException as exc: # pylint: disable=broad-except
|
||||
logger.exception("%s", exc)
|
||||
self.gui_error(f"ERROR during sweep\n\nStopped\n\n{exc}")
|
||||
return
|
||||
# raise exc
|
||||
|
||||
def _run(self):
|
||||
logger.info("Initializing SweepWorker")
|
||||
self.running = True
|
||||
self.percentage = 0
|
||||
if not self.app.vna.connected():
|
||||
logger.debug(
|
||||
"Attempted to run without being connected to the NanoVNA")
|
||||
self.running = False
|
||||
return
|
||||
try:
|
||||
sweep = Sweep(
|
||||
self.app.sweep_control.get_start(),
|
||||
self.app.sweep_control.get_end(),
|
||||
self.app.vna.datapoints,
|
||||
self.app.sweep_control.get_segments(),
|
||||
)
|
||||
except ValueError:
|
||||
self.gui_error(
|
||||
"Unable to parse frequency inputs"
|
||||
" - check start and stop fields.")
|
||||
return
|
||||
|
||||
averages = 1
|
||||
if self.averaging:
|
||||
logger.info("%d averages", self.averages)
|
||||
averages = self.averages
|
||||
|
||||
if sweep != self.sweep: # parameters changed
|
||||
self.sweep = sweep
|
||||
self.init_data()
|
||||
|
||||
finished = False
|
||||
while not finished:
|
||||
for i in range(self.sweep.segments):
|
||||
logger.debug("Sweep segment no %d", i)
|
||||
if self.stopped:
|
||||
logger.debug("Stopping sweeping as signalled")
|
||||
finished = True
|
||||
break
|
||||
start, stop = self.sweep.get_index_range(i)
|
||||
|
||||
try:
|
||||
freq, values11, values21 = self.readAveragedSegment(
|
||||
start, stop, averages)
|
||||
self.percentage = (i + 1) * 100 / self.sweep.segments
|
||||
self.updateData(freq, values11, values21, i)
|
||||
except ValueError as e:
|
||||
self.error_message = str(e)
|
||||
self.stopped = True
|
||||
self.running = False
|
||||
self.signals.sweepError.emit()
|
||||
|
||||
if not self.continuousSweep:
|
||||
finished = True
|
||||
|
||||
if self.sweep.segments > 1:
|
||||
start = self.app.sweep_control.get_start()
|
||||
end = self.app.sweep_control.get_end()
|
||||
logger.debug("Resetting NanoVNA sweep to full range: %d to %d",
|
||||
start, end)
|
||||
self.app.vna.resetSweep(start, end)
|
||||
|
||||
self.percentage = 100
|
||||
logger.debug('Sending "finished" signal')
|
||||
self.signals.finished.emit()
|
||||
self.running = False
|
||||
|
||||
def init_data(self):
|
||||
self.data11 = []
|
||||
self.data21 = []
|
||||
self.rawData11 = []
|
||||
self.rawData21 = []
|
||||
for freq in self.sweep.get_frequencies():
|
||||
self.data11.append(Datapoint(freq, 0.0, 0.0))
|
||||
self.data21.append(Datapoint(freq, 0.0, 0.0))
|
||||
self.rawData11.append(Datapoint(freq, 0.0, 0.0))
|
||||
self.rawData21.append(Datapoint(freq, 0.0, 0.0))
|
||||
logger.debug("Init data length: %s", len(self.data11))
|
||||
|
||||
def updateData(self, frequencies, values11, values21, index):
|
||||
# Update the data from (i*101) to (i+1)*101
|
||||
logger.debug(
|
||||
"Calculating data and inserting in existing data at index %d",
|
||||
index)
|
||||
offset = self.sweep.points * index
|
||||
v11 = values11[:]
|
||||
v21 = values21[:]
|
||||
raw_data11 = []
|
||||
raw_data21 = []
|
||||
|
||||
for freq in frequencies:
|
||||
real11, imag11 = v11.pop(0)
|
||||
real21, imag21 = v21.pop(0)
|
||||
raw_data11.append(Datapoint(freq, real11, imag11))
|
||||
raw_data21.append(Datapoint(freq, real21, imag21))
|
||||
|
||||
data11, data21 = self.applyCalibration(raw_data11, raw_data21)
|
||||
logger.debug("update Freqs: %s, Offset: %s", len(frequencies), offset)
|
||||
for i in range(len(frequencies)):
|
||||
self.data11[offset + i] = data11[i]
|
||||
self.data21[offset + i] = data21[i]
|
||||
self.rawData11[offset + i] = raw_data11[i]
|
||||
self.rawData21[offset + i] = raw_data21[i]
|
||||
|
||||
logger.debug("Saving data to application (%d and %d points)",
|
||||
len(self.data11), len(self.data21))
|
||||
self.app.saveData(self.data11, self.data21)
|
||||
logger.debug('Sending "updated" signal')
|
||||
self.signals.updated.emit()
|
||||
|
||||
def applyCalibration(self,
|
||||
raw_data11: List[Datapoint],
|
||||
raw_data21: List[Datapoint]
|
||||
) -> Tuple[List[Datapoint], List[Datapoint]]:
|
||||
if self.offsetDelay != 0:
|
||||
tmp = []
|
||||
for dp in raw_data11:
|
||||
tmp.append(correct_delay(dp, self.offsetDelay, reflect=True))
|
||||
raw_data11 = tmp
|
||||
tmp = []
|
||||
for dp in raw_data21:
|
||||
tmp.append(correct_delay(dp, self.offsetDelay))
|
||||
raw_data21 = tmp
|
||||
|
||||
if not self.app.calibration.isCalculated:
|
||||
return raw_data11, raw_data21
|
||||
|
||||
data11: List[Datapoint] = []
|
||||
data21: List[Datapoint] = []
|
||||
|
||||
if self.app.calibration.isValid1Port():
|
||||
for dp in raw_data11:
|
||||
data11.append(self.app.calibration.correct11(dp))
|
||||
else:
|
||||
data11 = raw_data11
|
||||
|
||||
if self.app.calibration.isValid2Port():
|
||||
for dp in raw_data21:
|
||||
data21.append(self.app.calibration.correct21(dp))
|
||||
else:
|
||||
data21 = raw_data21
|
||||
return data11, data21
|
||||
|
||||
def readAveragedSegment(self, start, stop, averages=1):
|
||||
values11 = []
|
||||
values21 = []
|
||||
freq = []
|
||||
logger.info("Reading from %d to %d. Averaging %d values",
|
||||
start, stop, averages)
|
||||
for i in range(averages):
|
||||
if self.stopped:
|
||||
logger.debug("Stopping averaging as signalled")
|
||||
break
|
||||
logger.debug("Reading average no %d / %d", i+1, averages)
|
||||
freq, tmp11, tmp21 = self.readSegment(start, stop)
|
||||
values11.append(tmp11)
|
||||
values21.append(tmp21)
|
||||
self.percentage += 100 / (self.sweep.segments * averages)
|
||||
self.signals.updated.emit()
|
||||
|
||||
if self.truncates and averages > 1:
|
||||
logger.debug("Truncating %d values by %d",
|
||||
len(values11), self.truncates)
|
||||
values11 = truncate(values11, self.truncates)
|
||||
values21 = truncate(values21, self.truncates)
|
||||
|
||||
logger.debug("Averaging %d values", len(values11))
|
||||
values11 = np.average(values11, 0).tolist()
|
||||
values21 = np.average(values21, 0).tolist()
|
||||
|
||||
return freq, values11, values21
|
||||
|
||||
def readSegment(self, start, stop):
|
||||
logger.debug("Setting sweep range to %d to %d", start, stop)
|
||||
self.app.vna.setSweep(start, stop)
|
||||
|
||||
frequencies = self.app.vna.readFrequencies()
|
||||
values11 = self.readData("data 0")
|
||||
values21 = self.readData("data 1")
|
||||
if (len(frequencies) != len(values11) or
|
||||
len(frequencies) != len(values21)):
|
||||
logger.info("No valid data during this run")
|
||||
return [], [], []
|
||||
return frequencies, values11, values21
|
||||
|
||||
def readData(self, data):
|
||||
logger.debug("Reading %s", data)
|
||||
done = False
|
||||
returndata = []
|
||||
count = 0
|
||||
while not done:
|
||||
done = True
|
||||
returndata = []
|
||||
tmpdata = self.app.vna.readValues(data)
|
||||
logger.debug("Read %d values", len(tmpdata))
|
||||
for d in tmpdata:
|
||||
a, b = d.split(" ")
|
||||
try:
|
||||
if self.app.vna.validateInput and (
|
||||
abs(float(a)) > 9.5 or
|
||||
abs(float(b)) > 9.5):
|
||||
logger.warning(
|
||||
"Got a non plausible data value: (%s)", d)
|
||||
done = False
|
||||
break
|
||||
returndata.append((float(a), float(b)))
|
||||
except ValueError as exc:
|
||||
logger.exception("An exception occurred reading %s: %s",
|
||||
data, exc)
|
||||
done = False
|
||||
if not done:
|
||||
logger.debug("Re-reading %s", data)
|
||||
sleep(0.2)
|
||||
count += 1
|
||||
if count == 5:
|
||||
logger.error("Tried and failed to read %s %d times.",
|
||||
data, count)
|
||||
logger.debug("trying to reconnect")
|
||||
self.app.vna.reconnect()
|
||||
if count >= 10:
|
||||
logger.critical(
|
||||
"Tried and failed to read %s %d times. Giving up.",
|
||||
data, count)
|
||||
raise IOError(
|
||||
f"Failed reading {data} {count} times.\n"
|
||||
f"Data outside expected valid ranges,"
|
||||
f" or in an unexpected format.\n\n"
|
||||
f"You can disable data validation on the"
|
||||
f"device settings screen.")
|
||||
return returndata
|
||||
|
||||
def setContinuousSweep(self, continuous_sweep: bool):
|
||||
self.continuousSweep = continuous_sweep
|
||||
|
||||
def setAveraging(self, averaging: bool, averages: str, truncates: str):
|
||||
self.averaging = averaging
|
||||
try:
|
||||
self.averages = int(averages)
|
||||
self.truncates = int(truncates)
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
def gui_error(self, message: str):
|
||||
self.error_message = message
|
||||
self.stopped = True
|
||||
self.running = False
|
||||
self.signals.sweepError.emit()
|
Ładowanie…
Reference in New Issue