diff --git a/NanoVNASaver/Hardware/NanoVNA.py b/NanoVNASaver/Hardware/NanoVNA.py index 3cf708f..929aded 100644 --- a/NanoVNASaver/Hardware/NanoVNA.py +++ b/NanoVNASaver/Hardware/NanoVNA.py @@ -18,7 +18,6 @@ # along with this program. If not, see . import logging import struct -from time import sleep, time from typing import List import serial @@ -26,7 +25,8 @@ import numpy as np from PyQt5 import QtGui from NanoVNASaver.Hardware.Serial import drain_serial, Interface -from NanoVNASaver.Hardware.VNA import VNA, Version +from NanoVNASaver.Hardware.VNA import VNA +from NanoVNASaver.Version import Version logger = logging.getLogger(__name__) diff --git a/NanoVNASaver/Hardware/NanoVNA_V2.py b/NanoVNASaver/Hardware/NanoVNA_V2.py index 7cc32fd..218e1d7 100644 --- a/NanoVNASaver/Hardware/NanoVNA_V2.py +++ b/NanoVNASaver/Hardware/NanoVNA_V2.py @@ -23,7 +23,8 @@ from time import sleep from typing import List from NanoVNASaver.Hardware.Serial import Interface -from NanoVNASaver.Hardware.VNA import VNA, Version +from NanoVNASaver.Hardware.VNA import VNA +from NanoVNASaver.Version import Version if platform.system() != 'Windows': import tty diff --git a/NanoVNASaver/Hardware/VNA.py b/NanoVNASaver/Hardware/VNA.py index 62574a1..ce62b77 100644 --- a/NanoVNASaver/Hardware/VNA.py +++ b/NanoVNASaver/Hardware/VNA.py @@ -23,7 +23,7 @@ from typing import List, Iterator from PyQt5 import QtGui -from NanoVNASaver.Settings import Version +from NanoVNASaver.Version import Version from NanoVNASaver.Hardware.Serial import Interface, drain_serial logger = logging.getLogger(__name__) diff --git a/NanoVNASaver/Settings/Bands.py b/NanoVNASaver/Settings/Bands.py index 305429b..a4a0aa1 100644 --- a/NanoVNASaver/Settings/Bands.py +++ b/NanoVNASaver/Settings/Bands.py @@ -17,7 +17,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import logging -import re import typing from typing import List, Tuple @@ -152,68 +151,3 @@ class BandsModel(QtCore.QAbstractTableModel): def setColor(self, color): self.color = color - - -class Version: - RXP = re.compile(r"""^ - \D* - (?P\d+)\. - (?P\d+)\. - (?P\d+) - (?P.*) - $""", re.VERBOSE) - - def __init__(self, vstring: str = "0.0.0"): - self.data = { - "major": 0, - "minor": 0, - "revision": 0, - "note": "", - } - try: - self.data = Version.RXP.search(vstring).groupdict() - for name in ("major", "minor", "revision"): - self.data[name] = int(self.data[name]) - except AttributeError: - logger.error("Unable to parse version: %s", vstring) - - def __gt__(self, other: "Version") -> bool: - l, r = self.data, other.data - for name in ("major", "minor", "revision"): - if l[name] > r[name]: - return True - if l[name] < r[name]: - return False - return False - - def __lt__(self, other: "Version") -> bool: - return other > self - - def __ge__(self, other: "Version") -> bool: - return self > other or self == other - - def __le__(self, other: "Version") -> bool: - return self < other or self == other - - def __eq__(self, other: "Version") -> bool: - return self.data == other.data - - def __str__(self) -> str: - return (f'{self.data["major"]}.{self.data["minor"]}' - f'.{self.data["revision"]}{self.data["note"]}') - - @property - def major(self) -> int: - return self.data["major"] - - @property - def minor(self) -> int: - return self.data["minor"] - - @property - def revision(self) -> int: - return self.data["revision"] - - @property - def note(self) -> str: - return self.data["note"] diff --git a/NanoVNASaver/Settings/Sweep.py b/NanoVNASaver/Settings/Sweep.py index 7a9ea44..1a4e018 100644 --- a/NanoVNASaver/Settings/Sweep.py +++ b/NanoVNASaver/Settings/Sweep.py @@ -18,43 +18,11 @@ # along with this program. If not, see . import logging from math import log -from time import sleep -from typing import Iterator, List, Tuple - -import numpy as np -from PyQt5 import QtCore, QtWidgets -from PyQt5.QtCore import pyqtSlot, pyqtSignal - -from NanoVNASaver.Calibration import correct_delay -from NanoVNASaver.RFTools import Datapoint +from typing import Iterator, Tuple logger = logging.getLogger(__name__) -def truncate(values: List[List[Tuple]], count: int) -> List[List[Tuple]]: - """truncate drops extrema from data list if averaging is active""" - keep = len(values) - count - logger.debug("Truncating from %d values to %d", len(values), keep) - if count < 1 or keep < 1: - logger.info("Not doing illegal truncate") - return values - truncated = [] - for valueset in np.swapaxes(values, 0, 1).tolist(): - avg = complex(*np.average(valueset, 0)) - truncated.append( - sorted(valueset, - key=lambda v, a=avg: - abs(a - complex(*v)))[:keep]) - return np.swapaxes(truncated, 0, 1).tolist() - - -class WorkerSignals(QtCore.QObject): - updated = pyqtSignal() - finished = pyqtSignal() - sweepError = pyqtSignal() - fatalSweepError = pyqtSignal() - - class Sweep(): def __init__(self, start: int = 3600000, end: int = 30000000, points: int = 101, segments: int = 1, @@ -116,285 +84,3 @@ class Sweep(): for _ in range(self.points): yield round(freq) freq += step - - -class SweepWorker(QtCore.QRunnable): - def __init__(self, app: QtWidgets.QWidget): - super().__init__() - logger.info("Initializing SweepWorker") - self.signals = WorkerSignals() - self.app = app - self.sweep = Sweep() - self.setAutoDelete(False) - self.percentage = 0 - self.data11: List[Datapoint] = [] - self.data21: List[Datapoint] = [] - self.rawData11: List[Datapoint] = [] - self.rawData21: List[Datapoint] = [] - self.init_data() - self.stopped = False - self.running = False - self.continuousSweep = False - self.averaging = False - self.averages = 3 - self.truncates = 0 - self.error_message = "" - self.offsetDelay = 0 - - @pyqtSlot() - def run(self): - try: - self._run() - except BaseException as exc: # pylint: disable=broad-except - logger.exception("%s", exc) - self.gui_error(f"ERROR during sweep\n\nStopped\n\n{exc}") - return - # raise exc - - def _run(self): - logger.info("Initializing SweepWorker") - self.running = True - self.percentage = 0 - if not self.app.vna.connected(): - logger.debug( - "Attempted to run without being connected to the NanoVNA") - self.running = False - return - try: - sweep = Sweep( - self.app.sweep_control.get_start(), - self.app.sweep_control.get_end(), - self.app.vna.datapoints, - self.app.sweep_control.get_segments(), - ) - except ValueError: - self.gui_error( - "Unable to parse frequency inputs" - " - check start and stop fields.") - return - - averages = 1 - if self.averaging: - logger.info("%d averages", self.averages) - averages = self.averages - - if sweep != self.sweep: # parameters changed - self.sweep = sweep - self.init_data() - - finished = False - while not finished: - for i in range(self.sweep.segments): - logger.debug("Sweep segment no %d", i) - if self.stopped: - logger.debug("Stopping sweeping as signalled") - finished = True - break - start, stop = self.sweep.get_index_range(i) - - try: - freq, values11, values21 = self.readAveragedSegment( - start, stop, averages) - self.percentage = (i + 1) * 100 / self.sweep.segments - self.updateData(freq, values11, values21, i) - except ValueError as e: - self.error_message = str(e) - self.stopped = True - self.running = False - self.signals.sweepError.emit() - - if not self.continuousSweep: - finished = True - - if self.sweep.segments > 1: - start = self.app.sweep_control.get_start() - end = self.app.sweep_control.get_end() - logger.debug("Resetting NanoVNA sweep to full range: %d to %d", - start, end) - self.app.vna.resetSweep(start, end) - - self.percentage = 100 - logger.debug('Sending "finished" signal') - self.signals.finished.emit() - self.running = False - - def init_data(self): - self.data11 = [] - self.data21 = [] - self.rawData11 = [] - self.rawData21 = [] - for freq in self.sweep.get_frequencies(): - self.data11.append(Datapoint(freq, 0.0, 0.0)) - self.data21.append(Datapoint(freq, 0.0, 0.0)) - self.rawData11.append(Datapoint(freq, 0.0, 0.0)) - self.rawData21.append(Datapoint(freq, 0.0, 0.0)) - logger.debug("Init data length: %s", len(self.data11)) - - def updateData(self, frequencies, values11, values21, index): - # Update the data from (i*101) to (i+1)*101 - logger.debug( - "Calculating data and inserting in existing data at index %d", - index) - offset = self.sweep.points * index - v11 = values11[:] - v21 = values21[:] - raw_data11 = [] - raw_data21 = [] - - for freq in frequencies: - real11, imag11 = v11.pop(0) - real21, imag21 = v21.pop(0) - raw_data11.append(Datapoint(freq, real11, imag11)) - raw_data21.append(Datapoint(freq, real21, imag21)) - - data11, data21 = self.applyCalibration(raw_data11, raw_data21) - logger.debug("update Freqs: %s, Offset: %s", len(frequencies), offset) - for i in range(len(frequencies)): - self.data11[offset + i] = data11[i] - self.data21[offset + i] = data21[i] - self.rawData11[offset + i] = raw_data11[i] - self.rawData21[offset + i] = raw_data21[i] - - logger.debug("Saving data to application (%d and %d points)", - len(self.data11), len(self.data21)) - self.app.saveData(self.data11, self.data21) - logger.debug('Sending "updated" signal') - self.signals.updated.emit() - - def applyCalibration(self, - raw_data11: List[Datapoint], - raw_data21: List[Datapoint] - ) -> Tuple[List[Datapoint], List[Datapoint]]: - if self.offsetDelay != 0: - tmp = [] - for dp in raw_data11: - tmp.append(correct_delay(dp, self.offsetDelay, reflect=True)) - raw_data11 = tmp - tmp = [] - for dp in raw_data21: - tmp.append(correct_delay(dp, self.offsetDelay)) - raw_data21 = tmp - - if not self.app.calibration.isCalculated: - return raw_data11, raw_data21 - - data11: List[Datapoint] = [] - data21: List[Datapoint] = [] - - if self.app.calibration.isValid1Port(): - for dp in raw_data11: - data11.append(self.app.calibration.correct11(dp)) - else: - data11 = raw_data11 - - if self.app.calibration.isValid2Port(): - for dp in raw_data21: - data21.append(self.app.calibration.correct21(dp)) - else: - data21 = raw_data21 - return data11, data21 - - def readAveragedSegment(self, start, stop, averages=1): - values11 = [] - values21 = [] - freq = [] - logger.info("Reading from %d to %d. Averaging %d values", - start, stop, averages) - for i in range(averages): - if self.stopped: - logger.debug("Stopping averaging as signalled") - break - logger.debug("Reading average no %d / %d", i+1, averages) - freq, tmp11, tmp21 = self.readSegment(start, stop) - values11.append(tmp11) - values21.append(tmp21) - self.percentage += 100 / (self.sweep.segments * averages) - self.signals.updated.emit() - - if self.truncates and averages > 1: - logger.debug("Truncating %d values by %d", - len(values11), self.truncates) - values11 = truncate(values11, self.truncates) - values21 = truncate(values21, self.truncates) - - logger.debug("Averaging %d values", len(values11)) - values11 = np.average(values11, 0).tolist() - values21 = np.average(values21, 0).tolist() - - return freq, values11, values21 - - def readSegment(self, start, stop): - logger.debug("Setting sweep range to %d to %d", start, stop) - self.app.vna.setSweep(start, stop) - - frequencies = self.app.vna.readFrequencies() - values11 = self.readData("data 0") - values21 = self.readData("data 1") - if (len(frequencies) != len(values11) or - len(frequencies) != len(values21)): - logger.info("No valid data during this run") - return [], [], [] - return frequencies, values11, values21 - - def readData(self, data): - logger.debug("Reading %s", data) - done = False - returndata = [] - count = 0 - while not done: - done = True - returndata = [] - tmpdata = self.app.vna.readValues(data) - logger.debug("Read %d values", len(tmpdata)) - for d in tmpdata: - a, b = d.split(" ") - try: - if self.app.vna.validateInput and ( - abs(float(a)) > 9.5 or - abs(float(b)) > 9.5): - logger.warning( - "Got a non plausible data value: (%s)", d) - done = False - break - returndata.append((float(a), float(b))) - except ValueError as exc: - logger.exception("An exception occurred reading %s: %s", - data, exc) - done = False - if not done: - logger.debug("Re-reading %s", data) - sleep(0.2) - count += 1 - if count == 5: - logger.error("Tried and failed to read %s %d times.", - data, count) - logger.debug("trying to reconnect") - self.app.vna.reconnect() - if count >= 10: - logger.critical( - "Tried and failed to read %s %d times. Giving up.", - data, count) - raise IOError( - f"Failed reading {data} {count} times.\n" - f"Data outside expected valid ranges," - f" or in an unexpected format.\n\n" - f"You can disable data validation on the" - f"device settings screen.") - return returndata - - def setContinuousSweep(self, continuous_sweep: bool): - self.continuousSweep = continuous_sweep - - def setAveraging(self, averaging: bool, averages: str, truncates: str): - self.averaging = averaging - try: - self.averages = int(averages) - self.truncates = int(truncates) - except ValueError: - return - - def gui_error(self, message: str): - self.error_message = message - self.stopped = True - self.running = False - self.signals.sweepError.emit() diff --git a/NanoVNASaver/Settings/__init__.py b/NanoVNASaver/Settings/__init__.py new file mode 100644 index 0000000..38cacea --- /dev/null +++ b/NanoVNASaver/Settings/__init__.py @@ -0,0 +1,2 @@ +from .Bands import BandsModel +from .Sweep import Sweep diff --git a/NanoVNASaver/SweepWorker.py b/NanoVNASaver/SweepWorker.py index 7a9ea44..c2f22b7 100644 --- a/NanoVNASaver/SweepWorker.py +++ b/NanoVNASaver/SweepWorker.py @@ -17,9 +17,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import logging -from math import log from time import sleep -from typing import Iterator, List, Tuple +from typing import List, Tuple import numpy as np from PyQt5 import QtCore, QtWidgets @@ -27,6 +26,7 @@ from PyQt5.QtCore import pyqtSlot, pyqtSignal from NanoVNASaver.Calibration import correct_delay from NanoVNASaver.RFTools import Datapoint +from NanoVNASaver.Settings import Sweep logger = logging.getLogger(__name__) @@ -55,69 +55,6 @@ class WorkerSignals(QtCore.QObject): fatalSweepError = pyqtSignal() -class Sweep(): - def __init__(self, start: int = 3600000, end: int = 30000000, - points: int = 101, segments: int = 1, - logarithmic: bool = False): - self.start = start - self.end = end - self.points = points - self.segments = segments - self.logarithmic = logarithmic - self.span = self.end - self.start - self.step = self.stepsize() - self.check() - - def __repr__(self) -> str: - return ( - f"Sweep({self.start}, {self.end}, {self.points}, {self.segments}," - f" {self.logarithmic})") - - def __eq__(self, other) -> bool: - return(self.start == other.start and - self.end == other.end and - self.points == other.points and - self.segments == other.segments) - - def check(self): - if not(self.segments > 0 and - self.points > 0 and - self.start > 0 and - self.end > 0 and - self.stepsize() >= 1): - raise ValueError(f"Illegal sweep settings: {self}") - - def stepsize(self) -> int: - return round(self.span / ((self.points -1) * self.segments)) - - def _exp_factor(self, index: int) -> int: - return 1 - log(self.segments + 1 - index) / log(self.segments + 1) - - def get_index_range(self, index: int) -> Tuple[int, int]: - if not self.logarithmic: - start = self.start + index * self.points * self.step - end = start + (self.points - 1) * self.step - else: - start = self.start + self.span * self._exp_factor(index) - end = self.start + self.span * self._exp_factor(index + 1) - logger.debug("get_index_range(%s) -> (%s, %s)", index, start, end) - return (start, end) - - - def get_frequencies(self) -> Iterator[int]: - if not self.logarithmic: - for freq in range(self.start, self.end + 1, self.step): - yield freq - return - for i in range(self.segments): - start, stop = self.get_index_range(i) - step = (stop - start) / self.points - freq = start - for _ in range(self.points): - yield round(freq) - freq += step - - class SweepWorker(QtCore.QRunnable): def __init__(self, app: QtWidgets.QWidget): super().__init__() diff --git a/NanoVNASaver/Settings/Version.py b/NanoVNASaver/Version.py similarity index 100% rename from NanoVNASaver/Settings/Version.py rename to NanoVNASaver/Version.py diff --git a/NanoVNASaver/Windows/About.py b/NanoVNASaver/Windows/About.py index c8fb505..40cb45f 100644 --- a/NanoVNASaver/Windows/About.py +++ b/NanoVNASaver/Windows/About.py @@ -23,7 +23,7 @@ from urllib import request, error from PyQt5 import QtWidgets, QtCore from NanoVNASaver.About import VERSION_URL, INFO_URL -from NanoVNASaver.Settings import Version +from NanoVNASaver.Version import Version logger = logging.getLogger(__name__) diff --git a/NanoVNASaver/Windows/SweepSettings.py b/NanoVNASaver/Windows/SweepSettings.py index a39e603..721324d 100644 --- a/NanoVNASaver/Windows/SweepSettings.py +++ b/NanoVNASaver/Windows/SweepSettings.py @@ -72,20 +72,24 @@ class SweepSettingsWindow(QtWidgets.QWidget): settings_layout.addRow("Number of measurements to average", self.averages) settings_layout.addRow("Number to discard", self.truncates) - settings_layout.addRow( - QtWidgets.QLabel( - "Averaging allows discarding outlying samples to get better averages.")) - settings_layout.addRow( - QtWidgets.QLabel("Common values are 3/0, 5/2, 9/4 and 25/6.")) - + label = QtWidgets.QLabel( + "Averaging allows discarding outlying samples to get better" + "averages. Common values are 3/0, 5/2, 9/4 and 25/6.\n") + label.setWordWrap(True) + settings_layout.addRow(label) + self.s21att = QtWidgets.QLineEdit("0") - - settings_layout.addRow(QtWidgets.QLabel("")) - settings_layout.addRow(QtWidgets.QLabel("Some times when you measure amplifiers you need to use an attenuator")) - settings_layout.addRow(QtWidgets.QLabel("in line with the S21 input (CH1) here you can specify it.")) + label = QtWidgets.QLabel( + "Some times when you measure amplifiers you need to use an" + " attenuator in line with the S21 input (CH1) here you can" + " specify it.") + label.setWordWrap(True) + settings_layout.addRow(label) settings_layout.addRow("Attenuator in port CH1 (s21) in dB", self.s21att) - settings_layout.addRow(QtWidgets.QLabel("Common values with un-un are 16.9 (49:1 2450) 9.54 (9:1 450)")) + + # settings_layout.addRow(QtWidgets.QLabel("Common values with un-un are 16.9 (49:1 2450) 9.54 (9:1 450)")) + self.continuous_sweep_radiobutton.toggled.connect( lambda: self.app.worker.setContinuousSweep( self.continuous_sweep_radiobutton.isChecked()))