Merge pull request #261 from NanoVNA-Saver/Development

v0.3.6
pull/280/head v0.3.6
Holger Müller 2020-07-17 11:47:35 +02:00 zatwierdzone przez GitHub
commit 85e1374d0d
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
40 zmienionych plików z 1229 dodań i 1446 usunięć

1
.gitignore vendored
Wyświetl plik

@ -1,4 +1,5 @@
/venv/
/env/
.idea
.vscode
/build/

Wyświetl plik

@ -1,3 +1,23 @@
v0.3.6
======
- Implemented bandwidth setting in device management
v0.3.5
======
- Sweep worker now initializes full dataset on setting changes.
Therefore no resize of charts when doing multi segment sweep
- Changing datapoints in DeviceSettings are reflected in SweepSettings widget step size
- Simplified calibration code by just using scipy.interp1d with fill\_value
- Established Interface class to ease locking and allow non usb connections in future
- Cleaned up VNA code. Added some pause statements to get more robust readings
- Added MagLoopAnalysis
- Touchstone class can now generate interpolated Datapoints for a given frequency
Will be usefull in future analysis code
- Fixed a bug in Version comparison
v0.3.4
======

Wyświetl plik

@ -17,7 +17,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
VERSION = "0.3.4"
VERSION = "0.3.6"
VERSION_URL = (
"https://raw.githubusercontent.com/"
"NanoVNA-Saver/nanovna-saver/master/NanoVNASaver/About.py")

Wyświetl plik

@ -1,271 +1,64 @@
# NanoVNASaver
#
# A python program to view and export Touchstone data from a NanoVNA
# Copyright (C) 2020 NanoVNA-Saver Authors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Created on 30 giu 2020
Created on May 30th 2020
@author: mauro
'''
from PyQt5 import QtWidgets, QtTest
import logging
import math
from NanoVNASaver.Analysis import Analysis
from PyQt5 import QtWidgets
from NanoVNASaver.Hardware import VNA
import numpy as np
from NanoVNASaver.Marker import Marker
from NanoVNASaver import RFTools
from NanoVNASaver.Analysis.VSWRAnalysis import VSWRAnalysis
from NanoVNASaver.Formatting import format_frequency_sweep
logger = logging.getLogger(__name__)
class Antenna(object):
@staticmethod
def group_consecutives(vals, step=1):
"""
https://stackoverflow.com/questions/7352684/how-to-find-the-groups-of-consecutive-elements-from-an-array-in-numpy
Return list of consecutive lists of numbers from vals (number list).
:param vals:
:param step:
"""
run = []
result = [run]
expect = None
for v in vals:
if (v == expect) or (expect is None):
run.append(v)
else:
run = [v]
result.append(run)
expect = v + step
return result
@classmethod
def analyze(cls, frequencies, values, FIELD_NAME, step=5000, vuoto=False):
'''
dati dati, prova a trovare minimi e bands passanti
:param cls:
'''
if FIELD_NAME == "rl":
BAND_THRESHOLD = -7.0
MIN_CALCOLO_Q = -27
elif FIELD_NAME == "vswr":
BAND_THRESHOLD = 2.62
MIN_CALCOLO_Q = 1.1
else:
raise ValueError("unknown threshold for {}".format(FIELD_NAME))
bands_raw = np.where(values < BAND_THRESHOLD)[0]
# raggruppo posizioni in cui il valore è sotto la soglia
bands = cls.group_consecutives(bands_raw)
# print("raggruppate in ", bands)
out = []
# print "bands", bands
banda_dict = None
for band in bands:
if band:
print("band ", band)
fmin = frequencies[band[0]]
fmax = frequencies[band[-1]]
estensione = fmax - fmin
x = np.argmin(values[band[0]:band[-1] + 1])
prog = x + band[0]
min_val = values[prog]
if banda_dict:
salto = fmin - banda_dict["fmax"]
if salto < (10 * step):
logger.warning("unisco band e proseguo")
if min_val < banda_dict["min"]:
logger.debug("aggiusto nuovo minimo, da %s a %s",
banda_dict["min"], min_val)
banda_dict["min"] = min_val
# invalido eventuale Q e band passante ?!?
banda_dict["q"] = None
banda_dict["banda_passante"] = None
banda_dict["fmax"] = fmax
# non servono ulteriori elaborazioni
continue
else:
logger.warning("finalizzo band precedente")
out.append(banda_dict)
banda_dict = None
# se sono qui è nuova
if estensione == 0 and vuoto:
logger.warning("ritorno minima estensione")
banda_dict = {"fmin": fmin - 30 * step,
"fmax": fmin + 30 * step,
"banda_passante": None,
"q": None,
"min": min_val,
"freq": fmin,
"prog": prog,
}
else:
logger.warning("Nuova band")
if min_val <= MIN_CALCOLO_Q:
# FIXME: verificare che ci siano valori >
# BAND_THRESHOLD?!?
q = np.sqrt(fmax * fmin) / (fmax - fmin)
logger.info("Q=%s", q)
else:
logger.info(
"non calcolo Q perchè minimo %s non è abbastanza", min_val)
q = None
banda_dict = {"fmin": fmin,
"fmax": fmax,
"banda_passante": fmax - fmin,
"q": q,
"min": min_val,
"freq": frequencies[prog],
"prog": prog,
}
if banda_dict:
out.append(banda_dict)
return out
class ChartFactory(object):
@classmethod
def NewChart(cls, chart_class, name, app):
from NanoVNASaver.NanoVNASaver import BandsModel
new_chart = chart_class(name)
new_chart.isPopout = True
new_chart.data = app.data
new_chart.bands = BandsModel()
i=0
default_color = app.default_marker_colors[i]
color = app.settings.value("Marker" + str(i+1) + "Color", default_color)
marker = Marker("Marker " + str(i+1), color)
marker.isMouseControlledRadioButton.setChecked(True)
new_chart.setMarkers([marker])
return new_chart
class MinVswrAnalysis(Antenna, Analysis):
def __init__(self, app):
super().__init__(app)
self._widget = QtWidgets.QWidget()
def runAnalysis(self):
self.reset()
if len(self.app.data) == 0:
logger.debug("No data to analyse")
self.result_label.setText("No data to analyse.")
return
frequencies = []
values = []
for p in self.app.data:
frequencies.append(p.freq)
vswr = p.vswr
values.append(vswr)
res = self.analyze(np.array(frequencies),
np.array(values),
"vswr")
marker = 0
for banda in res:
if marker < 3:
self.app.markers[marker].setFrequency(
str(round(banda["freq"])))
marker += 1
print("min {min} a {freq}".format(**banda))
# Charts
progr = 0
for c in self.app.subscribing_charts:
if c.name == "S11 VSWR":
new_chart = c.copy()
new_chart.isPopout = True
new_chart.show()
new_chart.setWindowTitle("%s %s" % (new_chart.name, progr))
vna = self.app.vna
if isinstance(vna, InvalidVNA):
logger.warning("end analysis, non valid vna")
else:
logger.warning("band zoom")
for banda in res:
progr += 1
# scan
self.app.sweepStartInput.setText(str(banda["fmin"]))
self.app.sweepEndInput.setText(str(banda["fmax"]))
self.app.sweep()
while not self.app.btnSweep.isEnabled():
QtTest.QTest.qWait(500)
for c in self.app.subscribing_charts:
if c.name == "S11 VSWR":
new_chart = c.copy()
new_chart.isPopout = True
new_chart.show()
new_chart.setWindowTitle("%s %s" % (new_chart.name, progr))
class ZeroCrossAnalysis(Antenna, Analysis):
def __init__(self, app):
super().__init__(app)
self._widget = QtWidgets.QWidget()
def runAnalysis(self):
self.reset()
if len(self.app.data) == 0:
logger.debug("No data to analyse")
self.result_label.setText("No data to analyse.")
return
frequencies = []
values = []
for p in self.app.data:
frequencies.append(p.freq)
values.append(p.z.imag)
zero_crossings = np.where(np.diff(np.sign(np.array(values))))[0]
marker = 0
for pos in zero_crossings:
freq = round(frequencies[pos])
if marker < 3:
self.app.markers[marker].setFrequency(
str(freq))
marker += 1
print("cross at {}".format(freq))
class MagLoopAnalysis(VSWRAnalysis):
'''
Find min vswr and change sweep to zoom.
Useful for tuning magloop.
'''
max_dips_shown = 1
vswr_limit_value = 2.56
bandwith = 250000
def runAnalysis(self):
super().runAnalysis()
for m in self.minimums:
start, lowest, end = m
if start != end:
Q = self.app.data[lowest].freq/(self.app.data[end].freq-self.app.data[start].freq)
self.layout.addRow("Q",QtWidgets.QLabel("{}".format(int(Q))))
self.app.sweepStartInput.setText(self.app.data[start].freq)
self.app.sweepEndInput.setText(self.app.data[end].freq)
# self.app.sweepEndInput.textEdited.emit(self.app.sweepEndInput.text())
if len(self.minimums) > 1:
self.layout.addRow("", QtWidgets.QLabel(
"Not magloop or try to lower VSWR limit"))
for m in self.minimums[:1]:
# only one time
start, lowest, end = m
if start != end:
Q = self.app.data11[lowest].freq / \
(self.app.data11[end].freq - self.app.data11[start].freq)
self.layout.addRow(
"Q", QtWidgets.QLabel("{}".format(int(Q))))
self.app.sweep_control.set_start(self.app.data11[start].freq)
self.app.sweep_control.set_end(self.app.data11[end].freq)
else:
self.app.sweep_control.set_start(
self.app.data11[start].freq - self.bandwith)
self.app.sweep_control.set_end(
self.app.data11[end].freq + self.bandwith)

Wyświetl plik

@ -161,7 +161,7 @@ class BandPassAnalysis(Analysis):
peak_db = db
peak_location = i
logger.debug("Found peak of %f at %d", peak_db, self.app.data[peak_location].freq)
logger.debug("Found peak of %f at %d", peak_db, self.app.data11[peak_location].freq)
lower_cutoff_location = -1
pass_band_db = peak_db
@ -355,8 +355,8 @@ class BandPassAnalysis(Analysis):
if upper_cutoff_gain < -4 or lower_cutoff_gain < -4:
self.result_label.setText(
f"Analysis complete ({len(self.app.data)} points)\n"
f"Analysis complete ({len(self.app.data11)} points)\n"
f"Insufficient data for analysis. Increase segment count.")
else:
self.result_label.setText(
f"Analysis complete ({len(self.app.data)} points)")
f"Analysis complete ({len(self.app.data11)} points)")

Wyświetl plik

@ -114,7 +114,7 @@ class BandStopAnalysis(Analysis):
peak_db = db
peak_location = i
logger.debug("Found peak of %f at %d", peak_db, self.app.data[peak_location].freq)
logger.debug("Found peak of %f at %d", peak_db, self.app.data11[peak_location].freq)
lower_cutoff_location = -1
pass_band_db = peak_db
@ -309,8 +309,8 @@ class BandStopAnalysis(Analysis):
if upper_cutoff_gain < -4 or lower_cutoff_gain < -4:
self.result_label.setText(
f"Analysis complete ({len(self.app.data)} points)\n"
f"Analysis complete ({len(self.app.data11)} points)\n"
f"Insufficient data for analysis. Increase segment count.")
else:
self.result_label.setText(
f"Analysis complete ({len(self.app.data)} points)")
f"Analysis complete ({len(self.app.data11)} points)")

Wyświetl plik

@ -102,7 +102,7 @@ class HighPassAnalysis(Analysis):
peak_db = db
peak_location = i
logger.debug("Found peak of %f at %d", peak_db, self.app.data[peak_location].freq)
logger.debug("Found peak of %f at %d", peak_db, self.app.data11[peak_location].freq)
self.app.markers[0].setFrequency(str(self.app.data21[peak_location].freq))
self.app.markers[0].frequencyInput.setText(str(self.app.data21[peak_location].freq))
@ -187,4 +187,4 @@ class HighPassAnalysis(Analysis):
self.db_per_octave_label.setText("Not calculated")
self.db_per_decade_label.setText("Not calculated")
self.result_label.setText("Analysis complete (" + str(len(self.app.data)) + " points)")
self.result_label.setText(f"Analysis complete ({len(self.app.data11)}) points)")

Wyświetl plik

@ -106,7 +106,7 @@ class LowPassAnalysis(Analysis):
peak_db = db
peak_location = i
logger.debug("Found peak of %f at %d", peak_db, self.app.data[peak_location].freq)
logger.debug("Found peak of %f at %d", peak_db, self.app.data11[peak_location].freq)
self.app.markers[0].setFrequency(str(self.app.data21[peak_location].freq))
self.app.markers[0].frequencyInput.setText(str(self.app.data21[peak_location].freq))
@ -202,4 +202,4 @@ class LowPassAnalysis(Analysis):
self.db_per_decade_label.setText("Not calculated")
self.result_label.setText(
"Analysis complete (" + str(len(self.app.data)) + " points)")
"Analysis complete (" + str(len(self.app.data11)) + " points)")

Wyświetl plik

@ -90,8 +90,8 @@ class PeakSearchAnalysis(Analysis):
count = self.input_number_of_peaks.value()
if self.rbtn_data_vswr.isChecked():
data = []
for d in self.app.data:
data.append(d.vswr)
for d in self.app.data11:
data11.append(d.vswr)
elif self.rbtn_data_s21_gain.isChecked():
data = []
for d in self.app.data21:
@ -130,7 +130,7 @@ class PeakSearchAnalysis(Analysis):
logger.debug("Index %d", i)
logger.debug("Prominence %f", prominences[i])
logger.debug("Index in sweep %d", peaks[i])
logger.debug("Frequency %d", self.app.data[peaks[i]].freq)
logger.debug("Frequency %d", self.app.data11[peaks[i]].freq)
logger.debug("Value %f", data[peaks[i]])
if self.checkbox_move_markers:
@ -138,9 +138,9 @@ class PeakSearchAnalysis(Analysis):
logger.warning("More peaks found than there are markers")
for i in range(min(count, len(self.app.markers))):
self.app.markers[i].setFrequency(
str(self.app.data[peaks[indices[i]]].freq))
str(self.app.data11[peaks[indices[i]]].freq))
self.app.markers[i].frequencyInput.setText(
str(self.app.data[peaks[indices[i]]].freq))
str(self.app.data11[peaks[indices[i]]].freq))
max_val = -10**10
max_idx = -1

Wyświetl plik

@ -81,17 +81,17 @@ class SimplePeakSearchAnalysis(Analysis):
if self.rbtn_data_vswr.isChecked():
suffix = ""
data = []
for d in self.app.data:
for d in self.app.data11:
data.append(d.vswr)
elif self.rbtn_data_resistance.isChecked():
suffix = " \N{OHM SIGN}"
data = []
for d in self.app.data:
for d in self.app.data11:
data.append(d.impedance().real)
elif self.rbtn_data_reactance.isChecked():
suffix = " \N{OHM SIGN}"
data = []
for d in self.app.data:
for d in self.app.data11:
data.append(d.impedance().imag)
elif self.rbtn_data_s21_gain.isChecked():
suffix = " dB"
@ -117,10 +117,10 @@ class SimplePeakSearchAnalysis(Analysis):
return
self.peak_frequency.setText(
format_frequency(self.app.data[idx_peak].freq))
format_frequency(self.app.data11[idx_peak].freq))
self.peak_value.setText(str(round(data[idx_peak], 3)) + suffix)
if self.checkbox_move_marker.isChecked() and len(self.app.markers) >= 1:
self.app.markers[0].setFrequency(str(self.app.data[idx_peak].freq))
self.app.markers[0].setFrequency(str(self.app.data11[idx_peak].freq))
self.app.markers[0].frequencyInput.setText(
format_frequency(self.app.data[idx_peak].freq))
format_frequency(self.app.data11[idx_peak].freq))

Wyświetl plik

@ -61,17 +61,17 @@ class VSWRAnalysis(Analysis):
def runAnalysis(self):
max_dips_shown = self.max_dips_shown
data = []
for d in self.app.data:
for d in self.app.data11:
data.append(d.vswr)
# min_idx = np.argmin(data)
#
# logger.debug("Minimum at %d", min_idx)
# logger.debug("Value at minimum: %f", data[min_idx])
# logger.debug("Frequency: %d", self.app.data[min_idx].freq)
# logger.debug("Frequency: %d", self.app.data11[min_idx].freq)
#
# if self.checkbox_move_marker.isChecked():
# self.app.markers[0].setFrequency(str(self.app.data[min_idx].freq))
# self.app.markers[0].frequencyInput.setText(str(self.app.data[min_idx].freq))
# self.app.markers[0].setFrequency(str(self.app.data11[min_idx].freq))
# self.app.markers[0].frequencyInput.setText(str(self.app.data11[min_idx].freq))
minimums = []
min_start = -1
@ -122,23 +122,23 @@ class VSWRAnalysis(Analysis):
logger.debug(
"Section from %d to %d, lowest at %d", start, end, lowest)
self.layout.addRow("Start", QtWidgets.QLabel(
format_frequency(self.app.data[start].freq)))
format_frequency(self.app.data11[start].freq)))
self.layout.addRow(
"Minimum",
QtWidgets.QLabel(
f"{format_frequency(self.app.data[lowest].freq)}"
f"{format_frequency(self.app.data11[lowest].freq)}"
f" ({round(data[lowest], 2)})"))
self.layout.addRow("End", QtWidgets.QLabel(
format_frequency(self.app.data[end].freq)))
format_frequency(self.app.data11[end].freq)))
self.layout.addRow(
"Span",
QtWidgets.QLabel(
format_frequency(self.app.data[end].freq -
self.app.data[start].freq)))
format_frequency(self.app.data11[end].freq -
self.app.data11[start].freq)))
self.layout.addWidget(PeakSearchAnalysis.QHLine())
else:
self.layout.addRow("Low spot", QtWidgets.QLabel(
format_frequency(self.app.data[lowest].freq)))
format_frequency(self.app.data11[lowest].freq)))
self.layout.addWidget(PeakSearchAnalysis.QHLine())
# Remove the final separator line
self.layout.removeRow(self.layout.rowCount()-1)

Wyświetl plik

@ -6,3 +6,4 @@ from .LowPassAnalysis import LowPassAnalysis
from .PeakSearchAnalysis import PeakSearchAnalysis
from .SimplePeakSearchAnalysis import SimplePeakSearchAnalysis
from .VSWRAnalysis import VSWRAnalysis
from .AntennaAnalysis import MagLoopAnalysis

Wyświetl plik

@ -40,6 +40,7 @@ RXP_CAL_LINE = re.compile(r"""^\s*
logger = logging.getLogger(__name__)
def correct_delay(d: Datapoint, delay: float, reflect: bool = False):
mult = 2 if reflect else 1
corr_data = d.z * cmath.exp(
@ -78,12 +79,13 @@ class CalData(UserDict):
f' {d["isolation"].re} {d["isolation"].im}')
return s
class CalDataSet:
def __init__(self):
self.data = defaultdict(CalData)
def insert(self, name: str, dp: Datapoint):
if not name in self.data[dp.freq]:
if name not in self.data[dp.freq]:
raise KeyError(name)
self.data[dp.freq]["freq"] = dp.freq
self.data[dp.freq][name] = dp
@ -292,42 +294,33 @@ class Calibration:
e30.append(caldata["e30"])
e10e32.append(caldata["e10e32"])
self.interp["e00"] = interp1d(freq, e00, kind="slinear")
self.interp["e11"] = interp1d(freq, e11, kind="slinear")
self.interp["delta_e"] = interp1d(freq, delta_e, kind="slinear")
self.interp["e30"] = interp1d(freq, e30, kind="slinear")
self.interp["e10e32"] = interp1d(freq, e10e32, kind="slinear")
self.interp = {
"e00": interp1d(freq, e00,
kind="slinear", bounds_error=False,
fill_value=(e00[0], e00[-1])),
"e11": interp1d(freq, e11,
kind="slinear", bounds_error=False,
fill_value=(e11[0], e11[-1])),
"delta_e": interp1d(freq, delta_e,
kind="slinear", bounds_error=False,
fill_value=(delta_e[0], delta_e[-1])),
"e30": interp1d(freq, e30,
kind="slinear", bounds_error=False,
fill_value=(e30[0], e30[-1])),
"e10e32": interp1d(freq, e10e32,
kind="slinear", bounds_error=False,
fill_value=(e10e32[0], e10e32[-1])),
}
def correct11(self, dp: Datapoint):
i = self.interp
try:
s11 = (dp.z - i["e00"](dp.freq)) / (
(dp.z * i["e11"](dp.freq)) - i["delta_e"](dp.freq))
return Datapoint(dp.freq, s11.real, s11.imag)
except ValueError:
# TODO: implement warn message in gui
logger.info("Data outside calibration")
nearest = sorted(self.dataset.frequencies(),
key=lambda k: abs(dp.freq - k))[0]
ds = self.dataset.get(nearest)
s11 = (dp.z - ds["e00"]) / (
(dp.z * ds["e11"]) - ds["delta_e"])
s11 = (dp.z - i["e00"](dp.freq)) / (
(dp.z * i["e11"](dp.freq)) - i["delta_e"](dp.freq))
return Datapoint(dp.freq, s11.real, s11.imag)
def correct21(self, dp: Datapoint):
i = self.interp
try:
s21 = (dp.z - i["e30"](dp.freq)) / i["e10e32"](dp.freq)
return Datapoint(dp.freq, s21.real, s21.imag)
except ValueError:
# TODO: implement warn message in gui
logger.info("Data outside calibration")
nearest = sorted(self.dataset.frequencies(),
key=lambda k: abs(dp.freq - k))[0]
ds = self.dataset.get(nearest)
s21 = (dp.z - ds["e30"]) / ds["e10e32"]
s21 = (dp.z - i["e30"](dp.freq)) / i["e10e32"](dp.freq)
return Datapoint(dp.freq, s21.real, s21.imag)
# TODO: implement tests

Wyświetl plik

@ -17,12 +17,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
from time import sleep
from typing import List
import serial
from NanoVNASaver.Hardware.VNA import VNA, Version
from NanoVNASaver.Hardware.Serial import Interface
from NanoVNASaver.Hardware.VNA import VNA
logger = logging.getLogger(__name__)
@ -30,66 +27,16 @@ logger = logging.getLogger(__name__)
class AVNA(VNA):
name = "AVNA"
def __init__(self, app, serial_port):
super().__init__(app, serial_port)
self.version = Version(self.readVersion())
def __init__(self, iface: Interface):
super().__init__(iface)
self.features.add("Customizable data points")
def isValid(self):
return True
def getCalibration(self) -> str:
logger.debug("Reading calibration info.")
if not self.serial.is_open:
return "Not connected."
with self.app.serialLock:
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write("cal\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading calibration info: %s", exc)
return "Unknown"
def readFrequencies(self) -> List[str]:
return self.readValues("frequencies")
def resetSweep(self, start: int, stop: int):
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
self.writeSerial("resume")
def readVersion(self):
logger.debug("Reading version info.")
if not self.serial.is_open:
return
with self.app.serialLock:
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write("version\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
logger.debug("Found version info: %s", values[1])
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading firmware version: %s", exc)
return ""
list(self.exec_command(f"sweep {start} {stop} {self.datapoints}"))
list(self.exec_command("resume"))
def setSweep(self, start, stop):
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
sleep(1)
list(self.exec_command(f"sweep {start} {stop} {self.datapoints}"))

Wyświetl plik

@ -18,28 +18,30 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import platform
from typing import List, Tuple
from collections import namedtuple
from time import sleep
from typing import List
import serial
from serial.tools import list_ports
from NanoVNASaver.Hardware.VNA import VNA
from NanoVNASaver.Hardware.AVNA import AVNA
from NanoVNASaver.Hardware.NanoVNA_F import NanoVNA_F
from NanoVNASaver.Hardware.NanoVNA_H import NanoVNA_H, NanoVNA_H4
from NanoVNASaver.Hardware.NanoVNA import NanoVNA
from NanoVNASaver.Hardware.NanoVNA_V2 import NanoVNAV2
from NanoVNASaver.Hardware.Serial import drain_serial
from NanoVNASaver.Hardware.NanoVNA_F import NanoVNA_F
from NanoVNASaver.Hardware.NanoVNA_H import NanoVNA_H
from NanoVNASaver.Hardware.NanoVNA_H4 import NanoVNA_H4
from NanoVNASaver.Hardware.NanoVNA_V2 import NanoVNA_V2
from NanoVNASaver.Hardware.Serial import drain_serial, Interface
logger = logging.getLogger(__name__)
Device = namedtuple("Device", "vid pid name")
USBDevice = namedtuple("Device", "vid pid name")
DEVICETYPES = (
Device(0x0483, 0x5740, "NanoVNA"),
Device(0x16c0, 0x0483, "AVNA"),
Device(0x04b4, 0x0008, "NanaVNA-V2"),
USBDEVICETYPES = (
USBDevice(0x0483, 0x5740, "NanoVNA"),
USBDevice(0x16c0, 0x0483, "AVNA"),
USBDevice(0x04b4, 0x0008, "S-A-A-2"),
)
RETRIES = 3
TIMEOUT = 0.2
@ -56,67 +58,62 @@ def _fix_v2_hwinfo(dev):
# Get list of interfaces with VNAs connected
def get_interfaces() -> List[Tuple[str, str]]:
return_ports = []
def get_interfaces() -> List[Interface]:
interfaces = []
# serial like usb interfaces
for d in list_ports.comports():
if platform.system() == 'Windows' and d.vid is None:
d = _fix_v2_hwinfo(d)
for t in DEVICETYPES:
if d.vid == t.vid and d.pid == t.pid:
port = d.device
logger.info("Found %s (%04x %04x) on port %s",
t.name, d.vid, d.pid, d.device)
return_ports.append((port, f"{port} ({t.name})"))
return return_ports
for t in USBDEVICETYPES:
if d.vid != t.vid or d.pid != t.pid:
continue
logger.debug("Found %s USB:(%04x:%04x) on port %s",
t.name, d.vid, d.pid, d.device)
iface = Interface('serial', t.name)
iface.port = d.device
interfaces.append(iface)
return interfaces
def get_VNA(app, serial_port: serial.Serial) -> 'VNA':
serial_port.timeout = TIMEOUT
def get_VNA(iface: Interface) -> 'VNA':
# serial_port.timeout = TIMEOUT
logger.info("Finding correct VNA type...")
with app.serialLock:
vna_version = detect_version(serial_port)
with iface.lock:
vna_version = detect_version(iface)
if vna_version == 'v2':
logger.info("Type: NanoVNA-V2")
return NanoVNAV2(app, serial_port)
return NanoVNA_V2(iface)
logger.info("Finding firmware variant...")
tmp_vna = VNA(app, serial_port)
tmp_vna.flushSerialBuffers()
firmware = tmp_vna.readFirmware()
if firmware.find("AVNA + Teensy") > 0:
info = get_info(iface)
if info.find("AVNA + Teensy") >= 0:
logger.info("Type: AVNA")
return AVNA(app, serial_port)
if firmware.find("NanoVNA-H 4") > 0:
return AVNA(iface)
if info.find("NanoVNA-H 4") >= 0:
logger.info("Type: NanoVNA-H4")
vna = NanoVNA_H4(app, serial_port)
if vna.readFirmware().find("sweep_points 201") > 0:
logger.info("VNA has 201 datapoints capability")
vna._datapoints = (201, 101)
vna = NanoVNA_H4(iface)
return vna
if firmware.find("NanoVNA-H") > 0:
if info.find("NanoVNA-H") >= 0:
logger.info("Type: NanoVNA-H")
vna = NanoVNA_H(app, serial_port)
if vna.readFirmware().find("sweep_points 201") > 0:
logger.info("VNA has 201 datapoints capability")
vna._datapoints = (201, 101)
vna = NanoVNA_H(iface)
return vna
if firmware.find("NanoVNA-F") > 0:
if info.find("NanoVNA-F") >= 0:
logger.info("Type: NanoVNA-F")
return NanoVNA_F(app, serial_port)
if firmware.find("NanoVNA") > 0:
return NanoVNA_F(iface)
if info.find("NanoVNA") >= 0:
logger.info("Type: Generic NanoVNA")
return NanoVNA(app, serial_port)
return NanoVNA(iface)
logger.warning("Did not recognize NanoVNA type from firmware.")
return NanoVNA(app, serial_port)
return NanoVNA(iface)
def detect_version(serial_port: serial.Serial) -> str:
data = ""
for i in range(RETRIES):
drain_serial(serial_port)
serial_port.write("\r".encode("ascii"))
sleep(0.05)
data = serial_port.read(128).decode("ascii")
if data.startswith("ch> "):
return "v1"
@ -128,3 +125,26 @@ def detect_version(serial_port: serial.Serial) -> str:
logger.debug("Retry detection: %s", i + 1)
logger.error('No VNA detected. Hardware responded to CR with: %s', data)
return ""
def get_info(serial_port: serial.Serial) -> str:
for i in range(RETRIES):
drain_serial(serial_port)
serial_port.write("info\r".encode("ascii"))
lines = []
retries = 0
while True:
line = serial_port.readline()
line = line.decode("ascii").strip()
if not line:
retries += 1
if retries > RETRIES:
return ""
sleep(wait)
continue
if line == "info": # suppress echo
continue
if line.startswith("ch>"):
logger.debug("Needed retries: %s", retries)
break
lines.append(line)
return "\n".join(lines)

Wyświetl plik

@ -18,14 +18,14 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import struct
from time import sleep
from time import sleep, time
from typing import List
import serial
import numpy as np
from PyQt5 import QtGui
from NanoVNASaver.Hardware.Serial import drain_serial
from NanoVNASaver.Hardware.Serial import drain_serial, Interface
from NanoVNASaver.Hardware.VNA import VNA, Version
logger = logging.getLogger(__name__)
@ -36,70 +36,41 @@ class NanoVNA(VNA):
screenwidth = 320
screenheight = 240
def __init__(self, app, serial_port):
super().__init__(app, serial_port)
self.version = Version(self.readVersion())
def __init__(self, iface: Interface):
super().__init__(iface)
self.sweep_method = "sweep"
self.read_features()
self.start = 27000000
self.stop = 30000000
self._sweepdata = []
logger.debug("Testing against 0.2.0")
if self.version.version_string.find("extended with scan") > 0:
logger.debug("Incompatible scan command detected.")
self.features.add("Incompatible scan command")
self.useScan = False
elif self.version >= Version("0.2.0"):
logger.debug("Newer than 0.2.0, using new scan command.")
self.features.add("New scan command")
self.useScan = True
else:
logger.debug("Older than 0.2.0, using old sweep command.")
self.features.add("Original sweep method")
self.useScan = False
self.readFeatures()
def isValid(self):
return True
def getCalibration(self) -> str:
logger.debug("Reading calibration info.")
if not self.serial.is_open:
return "Not connected."
with self.app.serialLock:
try:
drain_serial(self.serial)
self.serial.write("cal\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading calibration info: %s", exc)
return "Unknown"
def _capture_data(self) -> bytes:
timeout = self.serial.timeout
with self.serial.lock:
drain_serial(self.serial)
timeout = self.serial.timeout
self.serial.write("capture\r".encode('ascii'))
self.serial.readline()
self.serial.timeout = 4
image_data = self.serial.read(
self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
self.serial.timeout = timeout
rgb_data = struct.unpack(
f">{self.screenwidth * self.screenheight}H",
image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
return (0xFF000000 +
((rgb_array & 0xF800) << 8) +
((rgb_array & 0x07E0) << 5) +
((rgb_array & 0x001F) << 3))
def getScreenshot(self) -> QtGui.QPixmap:
logger.debug("Capturing screenshot...")
if not self.serial.is_open:
if not self.connected():
return QtGui.QPixmap()
try:
with self.app.serialLock:
drain_serial(self.serial)
timeout = self.serial.timeout
self.serial.write("capture\r".encode('ascii'))
self.serial.timeout = 4
self.serial.readline()
image_data = self.serial.read(
self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
rgb_data = struct.unpack(
f">{self.screenwidth * self.screenheight}H",
image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
rgba_array = (0xFF000000 +
((rgb_array & 0xF800) << 8) +
((rgb_array & 0x07E0) << 5) +
((rgb_array & 0x001F) << 3))
rgba_array = self._capture_data()
image = QtGui.QImage(
rgba_array,
self.screenwidth,
@ -112,37 +83,53 @@ class NanoVNA(VNA):
"Exception while capturing screenshot: %s", exc)
return QtGui.QPixmap()
def readFrequencies(self) -> List[str]:
return self.readValues("frequencies")
def resetSweep(self, start: int, stop: int):
self.writeSerial("sweep {start} {stop} {self.datapoints}")
self.writeSerial("resume")
def readVersion(self):
logger.debug("Reading version info.")
if not self.serial.is_open:
return ""
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write("version\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
logger.debug("Found version info: %s", values[1])
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading firmware version: %s", exc)
return ""
list(self.exec_command(f"sweep {start} {stop} {self.datapoints}"))
list(self.exec_command("resume"))
def setSweep(self, start, stop):
if self.useScan:
self.writeSerial(f"scan {start} {stop} {self.datapoints}")
else:
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
sleep(1)
self.start = start
self.stop = stop
if self.sweep_method == "sweep":
list(self.exec_command(f"sweep {start} {stop} {self.datapoints}"))
elif self.sweep_method == "scan":
list(self.exec_command(f"scan {start} {stop} {self.datapoints}"))
def read_features(self):
super().read_features()
if self.version >= Version("0.7.1"):
logger.debug("Using scan mask command.")
self.features.add("Scan mask command")
self.sweep_method = "scan_mask"
elif self.version >= Version("0.2.0"):
logger.debug("Using new scan command.")
self.features.add("Scan command")
self.sweep_method = "scan"
def readFrequencies(self) -> List[int]:
logger.debug("readFrequencies: %s", self.sweep_method)
if self.sweep_method != "scan_mask":
return super().readFrequencies()
if not self._sweepdata: # on connect
return [int(line) for line in self.exec_command("frequencies")]
step = (self.stop - self.start) / (self.datapoints - 1.0)
return [round(self.start + i * step) for i in range(self.datapoints)]
def readValues(self, value) -> List[str]:
if self.sweep_method != "scan_mask":
return super().readValues(value)
logger.debug("readValue with scan mask (%s)", value)
# Actually grab the data only when requesting channel 0.
# The hardware will return all channels which we will store.
if value == "data 0":
self._sweepdata = []
for line in self.exec_command(
f"scan {self.start} {self.stop} {self.datapoints} 0b110"):
data = line.split()
self._sweepdata.append((
f"{data[0]} {data[1]}",
f"{data[2]} {data[3]}"))
if value == "data 0":
return [x[0] for x in self._sweepdata]
if value == "data 1":
return [x[1] for x in self._sweepdata]

Wyświetl plik

@ -17,14 +17,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import struct
import serial
import numpy as np
from PyQt5 import QtGui
from NanoVNASaver.Hardware.NanoVNA import NanoVNA
from NanoVNASaver.Hardware.Serial import drain_serial
logger = logging.getLogger(__name__)
@ -36,26 +34,10 @@ class NanoVNA_F(NanoVNA):
def getScreenshot(self) -> QtGui.QPixmap:
logger.debug("Capturing screenshot...")
if not self.serial.is_open:
if not self.connected():
return QtGui.QPixmap()
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write("capture\r".encode('ascii'))
timeout = self.serial.timeout
self.serial.timeout = 4
self.serial.readline()
image_data = self.serial.read(
self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
rgb_data = struct.unpack(
f"<{self.screenwidth * self.screenheight}H", image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
rgba_array = (0xFF000000 +
((rgb_array & 0xF800) << 8) + # G?!
((rgb_array & 0x07E0) >> 3) + # B
((rgb_array & 0x001F) << 11)) # G
rgba_array = self._capture_data()
unwrapped_array = np.empty(
self.screenwidth*self.screenheight,
dtype=np.uint32)

Wyświetl plik

@ -25,9 +25,3 @@ logger = logging.getLogger(__name__)
class NanoVNA_H(NanoVNA):
name = "NanoVNA-H"
class NanoVNA_H4(NanoVNA):
name = "NanoVNA-H4"
screenwidth = 480
screenheight = 320

Wyświetl plik

@ -0,0 +1,44 @@
# NanoVNASaver
#
# A python program to view and export Touchstone data from a NanoVNA
# Copyright (C) 2020 NanoVNA-Saver Authors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
from NanoVNASaver.Hardware.Serial import Interface
from NanoVNASaver.Hardware.NanoVNA_H import NanoVNA_H
logger = logging.getLogger(__name__)
class NanoVNA_H4(NanoVNA_H):
name = "NanoVNA-H4"
screenwidth = 480
screenheight = 320
def __init__(self, iface: Interface):
super().__init__(iface)
self.sweep_method = "scan"
if "Scan mask command" in self.features:
self.sweep_method = "scan_mask"
def read_features(self):
logger.debug("read_features")
super().read_features()
if self.readFirmware().find("DiSlord") > 0:
self.features.add("Customizable data points")
logger.info("VNA has 201 datapoints capability")
self.valid_datapoints = (201, 101)
self.datapoints = 201

Wyświetl plik

@ -19,8 +19,10 @@
import logging
import platform
from struct import pack, unpack_from
from time import sleep
from typing import List
from NanoVNASaver.Hardware.Serial import Interface
from NanoVNASaver.Hardware.VNA import VNA, Version
if platform.system() != 'Windows':
@ -52,98 +54,91 @@ _ADDR_HARDWARE_REVISION = 0xf2
_ADDR_FW_MAJOR = 0xf3
_ADDR_FW_MINOR = 0xf4
WRITE_SLEEP = 0.05
class NanoVNAV2(VNA):
class NanoVNA_V2(VNA):
name = "NanoVNA-V2"
_datapoints = (303, 101, 203, 505, 1023)
valid_datapoints = (101, 51, 202, 303, 505, 1023)
screenwidth = 320
screenheight = 240
def __init__(self, app, serialPort):
super().__init__(app, serialPort)
def __init__(self, iface: Interface):
super().__init__(iface)
if platform.system() != 'Windows':
tty.setraw(self.serial.fd)
# reset protocol to known state
with self.app.serialLock:
with self.serial.lock:
self.serial.write(pack("<Q", 0))
sleep(WRITE_SLEEP)
self.version = self.readVersion()
self.firmware = self.readFirmware()
self.features.add("Customizable data points")
# TODO: more than one dp per freq
self.features.add("Multi data points")
# firmware major version of 0xff indicates dfu mode
if self.firmware.major == 0xff:
self._isDFU = True
return
if self.firmware.data["major"] == 0xff:
raise IOError('Device is in DFU mode')
self._isDFU = False
self.sweepStartHz = 200e6
self.sweepStepHz = 1e6
self._sweepdata = []
self._updateSweep()
# self.setSweep(200e6, 300e6)
def isValid(self):
if self.isDFU():
return False
return True
def getCalibration(self) -> str:
return "Unknown"
def isDFU(self):
return self._isDFU
def checkValid(self):
if self.isDFU():
raise IOError('Device is in DFU mode')
def read_features(self):
self.features.add("Customizable data points")
# TODO: more than one dp per freq
self.features.add("Multi data points")
def readFirmware(self) -> str:
# read register 0xf3 and 0xf4 (firmware major and minor version)
cmd = pack("<BBBB",
_CMD_READ, _ADDR_FW_MAJOR,
_CMD_READ, _ADDR_FW_MINOR)
with self.app.serialLock:
with self.serial.lock:
self.serial.write(cmd)
sleep(WRITE_SLEEP)
resp = self.serial.read(2)
if len(resp) != 2:
logger.error("Timeout reading version registers")
return None
return Version(f"{resp[0]}.{resp[1]}.0")
def readFrequencies(self) -> List[str]:
self.checkValid()
def readFrequencies(self) -> List[int]:
return [
str(int(self.sweepStartHz + i * self.sweepStepHz))
int(self.sweepStartHz + i * self.sweepStepHz)
for i in range(self.datapoints)]
def readValues(self, value) -> List[str]:
self.checkValid()
# Actually grab the data only when requesting channel 0.
# The hardware will return all channels which we will store.
if value == "data 0":
# reset protocol to known state
with self.app.serialLock:
self.serial.timeout = 8 # should be enough
timeout = self.serial.timeout
with self.serial.lock:
self.serial.write(pack("<Q", 0))
sleep(WRITE_SLEEP)
# cmd: write register 0x30 to clear FIFO
self.serial.write(pack("<BBB",
_CMD_WRITE, _ADDR_VALUES_FIFO, 0))
_CMD_WRITE, _ADDR_VALUES_FIFO, 0))
sleep(WRITE_SLEEP)
# clear sweepdata
self._sweepdata = [(complex(), complex())] * self.datapoints
pointstodo = self.datapoints
# 8 seconds should be enough for 8k points
self.serial.timeout = min(8.0, (pointstodo / 32) + 0.1)
while pointstodo > 0:
logger.info("reading values")
pointstoread = min(255, pointstodo)
# cmd: read FIFO, addr 0x30
self.serial.write(
pack("<BBB",
_CMD_READFIFO, _ADDR_VALUES_FIFO,
pointstoread))
_CMD_READFIFO, _ADDR_VALUES_FIFO,
pointstoread))
sleep(WRITE_SLEEP)
# each value is 32 bytes
nBytes = pointstoread * 32
@ -151,20 +146,24 @@ class NanoVNAV2(VNA):
arr = self.serial.read(nBytes)
if nBytes != len(arr):
logger.error("expected %d bytes, got %d",
nBytes, len(arr))
nBytes, len(arr))
return []
freq_index = -1
for i in range(pointstoread):
(fwd_real, fwd_imag, rev0_real, rev0_imag, rev1_real,
rev1_imag, freq_index) = unpack_from(
"<iiiiiihxxxxxx", arr, i * 32)
rev1_imag, freq_index) = unpack_from(
"<iiiiiihxxxxxx", arr, i * 32)
fwd = complex(fwd_real, fwd_imag)
refl = complex(rev0_real, rev0_imag)
thru = complex(rev1_real, rev1_imag)
logger.debug("Freq index: %i", freq_index)
if i == 0:
logger.debug("Freq index from: %i", freq_index)
self._sweepdata[freq_index] = (refl / fwd, thru / fwd)
logger.debug("Freq index to: %i", freq_index)
pointstodo = pointstodo - pointstoread
self.serial.timeout = timeout
ret = [x[0] for x in self._sweepdata]
ret = [str(x.real) + ' ' + str(x.imag) for x in ret]
@ -177,14 +176,14 @@ class NanoVNAV2(VNA):
def resetSweep(self, start: int, stop: int):
self.setSweep(start, stop)
return
# returns device variant
def readVersion(self):
def readVersion(self) -> 'Version':
# read register 0xf0 (device type), 0xf2 (board revision)
cmd = b"\x10\xf0\x10\xf2"
with self.app.serialLock:
with self.serial.lock:
self.serial.write(cmd)
sleep(WRITE_SLEEP)
resp = self.serial.read(2)
if len(resp) != 2:
logger.error("Timeout reading version registers")
@ -203,7 +202,6 @@ class NanoVNAV2(VNA):
return
def _updateSweep(self):
self.checkValid()
cmd = pack("<BBQ", _CMD_WRITE8,
_ADDR_SWEEP_START, int(self.sweepStartHz))
cmd += pack("<BBQ", _CMD_WRITE8,
@ -212,5 +210,6 @@ class NanoVNAV2(VNA):
_ADDR_SWEEP_POINTS, self.datapoints)
cmd += pack("<BBH", _CMD_WRITE2,
_ADDR_SWEEP_VALS_PER_FREQ, 1)
with self.app.serialLock:
with self.serial.lock:
self.serial.write(cmd)
sleep(WRITE_SLEEP)

Wyświetl plik

@ -16,10 +16,38 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
from threading import Lock
import serial
logger = logging.getLogger(__name__)
def drain_serial(serial_port: serial.Serial):
"""drain up to 10k outstanding data in the serial incoming buffer"""
for _ in range(80):
if len(serial_port.read(128)) == 0:
break
"""drain up to 64k outstanding data in the serial incoming buffer"""
# logger.debug("Draining: %s", serial_port)
timeout = serial_port.timeout
serial_port.timeout = 0.05
for _ in range(512):
cnt = len(serial_port.read(128))
if not cnt:
serial_port.timeout = timeout
return
serial_port.timeout = timeout
logger.warning("unable to drain all data")
class Interface(serial.Serial):
def __init__(self, interface_type: str, comment, *args, **kwargs):
super().__init__(*args, **kwargs)
assert interface_type in ('serial', 'usb', 'bt', 'network')
self.type = interface_type
self.comment = comment
self.port = None
self.baudrate = 115200
self.timeout = 0.05
self.lock = Lock()
def __str__(self):
return f"{self.port} ({self.comment})"

Wyświetl plik

@ -17,67 +17,132 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
from collections import OrderedDict
from time import sleep
from typing import List
from typing import List, Iterator
import serial
from PyQt5 import QtWidgets, QtGui
from PyQt5 import QtGui
from NanoVNASaver.Settings import Version
from NanoVNASaver.Hardware.Serial import drain_serial
from NanoVNASaver.Hardware.Serial import Interface, drain_serial
logger = logging.getLogger(__name__)
DISLORD_BW = OrderedDict((
(10, 181),
(33, 58),
(100, 19),
(333, 5),
(1000, 1),
(2000, 0),
))
def _max_retries(bandwidth: int, datapoints: int) -> int:
return 20 * (datapoints / 101) + round(
(1000 / bandwidth) ** 1.2 * (datapoints / 101))
class VNA:
name = "VNA"
_datapoints = (101, )
valid_datapoints = (101, )
def __init__(self, app: QtWidgets.QWidget, serial_port: serial.Serial):
self.app = app
self.serial = serial_port
self.version: Version = Version("0.0.0")
def __init__(self, iface: Interface):
self.serial = iface
self.version = Version("0.0.0")
self.features = set()
self.validateInput = True
self.datapoints = VNA._datapoints[0]
self.datapoints = self.valid_datapoints[0]
self.bandwidth = 1000
self.bw_method = "ttrftech"
if self.connected():
self.version = self.readVersion()
self.read_features()
logger.debug("Features: %s", self.features)
# cannot read current bandwidth, so set to highest
# to get initial sweep fast
if "Bandwidth" in self.features:
self.set_bandwidth(self.get_bandwidths()[-1])
def readFeatures(self) -> List[str]:
raw_help = self.readFromCommand("help")
logger.debug("Help command output:")
logger.debug(raw_help)
def exec_command(self, command: str, wait: float = 0.05) -> Iterator[str]:
logger.debug("exec_command(%s)", command)
with self.serial.lock:
drain_serial(self.serial)
self.serial.write(f"{command}\r".encode('ascii'))
sleep(wait)
retries = 0
max_retries = _max_retries(self.bandwidth, self.datapoints)
logger.debug("Max retries: %s", max_retries)
while True:
line = self.serial.readline()
line = line.decode("ascii").strip()
if not line:
retries += 1
if retries > max_retries:
raise IOError("too many retries")
sleep(wait)
continue
if line == command: # suppress echo
continue
if line.startswith("ch>"):
logger.debug("Needed retries: %s", retries)
break
yield line
# Detect features from the help command
if "capture" in raw_help:
def read_features(self):
result = " ".join(self.exec_command("help")).split()
logger.debug("result:\n%s", result)
if "capture" in result:
self.features.add("Screenshots")
if len(self._datapoints) > 1:
if "bandwidth" in result:
self.features.add("Bandwidth")
result = " ".join(list(self.exec_command("bandwidth")))
if "Hz)" in result:
self.bw_method = "dislord"
if len(self.valid_datapoints) > 1:
self.features.add("Customizable data points")
return self.features
def get_bandwidths(self) -> List[int]:
logger.debug("get bandwidths")
if self.bw_method == "dislord":
return list(DISLORD_BW.keys())
result = " ".join(list(self.exec_command("bandwidth")))
try:
result = result.split(" {")[1].strip("}")
return sorted([int(i) for i in result.split("|")])
except IndexError:
return [1000, ]
def set_bandwidth(self, bandwidth: int):
bw_val = bandwidth
if self.bw_method == "dislord":
bw_val = DISLORD_BW[bandwidth]
result = " ".join(self.exec_command(f"bandwidth {bw_val}"))
if self.bw_method == "ttrftech" and result:
raise IOError(f"set_bandwith({bandwidth}: {result}")
self.bandwidth = bandwidth
# TODO: check return types
def readFrequencies(self) -> List[int]:
return []
return [int(f) for f in self.readValues("frequencies")]
def resetSweep(self, start: int, stop: int):
pass
def isValid(self):
return False
def isDFU(self):
return False
def connected(self) -> bool:
return self.serial.is_open
def getFeatures(self) -> List[str]:
return self.features
def getCalibration(self) -> str:
return "Unknown"
return " ".join(list(self.exec_command("cal")))
def getScreenshot(self) -> QtGui.QPixmap:
return QtGui.QPixmap()
def flushSerialBuffers(self):
with self.app.serialLock:
if not self.connected():
return
with self.serial.lock:
self.serial.write("\r\n\r\n".encode("ascii"))
sleep(0.1)
self.serial.reset_input_buffer()
@ -85,102 +150,21 @@ class VNA:
sleep(0.1)
def readFirmware(self) -> str:
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write("info\r".encode('ascii'))
result = ""
data = ""
sleep(0.01)
while data != "ch> ":
data = self.serial.readline().decode('ascii')
result += data
return result
except serial.SerialException as exc:
logger.exception(
"Exception while reading firmware data: %s", exc)
return ""
def readFromCommand(self, command) -> str:
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write(f"{command}\r".encode('ascii'))
result = ""
data = ""
sleep(0.01)
while data != "ch> ":
data = self.serial.readline().decode('ascii')
result += data
return result
except serial.SerialException as exc:
logger.exception(
"Exception while reading %s: %s", command, exc)
return ""
result = "\n".join(list(self.exec_command("info")))
logger.debug("result:\n%s", result)
return result
def readValues(self, value) -> List[str]:
logger.debug("VNA reading %s", value)
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write(f"{value}\r".encode('ascii'))
result = ""
data = ""
sleep(0.05)
while data != "ch> ":
data = self.serial.readline().decode('ascii')
result += data
values = result.split("\r\n")
logger.debug(
"VNA done reading %s (%d values)",
value, len(values)-2)
return values[1:-1]
except serial.SerialException as exc:
logger.exception(
"Exception while reading %s: %s", value, exc)
return []
result = list(self.exec_command(value))
logger.debug("VNA done reading %s (%d values)",
value, len(result))
return result
def writeSerial(self, command):
if not self.serial.is_open:
logger.warning("Writing without serial port being opened (%s)",
command)
return
with self.app.serialLock:
try:
self.serial.write(f"{command}\r".encode('ascii'))
self.serial.readline()
except serial.SerialException as exc:
logger.exception(
"Exception while writing to serial port (%s): %s",
command, exc)
def readVersion(self) -> 'Version':
result = list(self.exec_command("version"))
logger.debug("result:\n%s", result)
return Version(result[0])
def setSweep(self, start, stop):
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
# TODO: should be dropped and the serial part should be a connection class
# which handles unconnected devices
class InvalidVNA(VNA):
name = "Invalid"
_datapoints = (0, )
def setSweep(self, start, stop):
return
def resetSweep(self, start, stop):
return
def writeSerial(self, command):
return
def readFirmware(self):
return
def readFrequencies(self) -> List[int]:
return []
def readValues(self, value):
return
def flushSerialBuffers(self):
return
list(self.exec_command(f"sweep {start} {stop} {self.datapoints}"))

Wyświetl plik

@ -1,157 +0,0 @@
# NanoVNASaver
#
# A python program to view and export Touchstone data from a NanoVNA
# Copyright (C) 2019, 2020 Rune B. Broberg
# Copyright (C) 2020 NanoVNA-Saver Authors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
from PyQt5 import QtWidgets, QtCore, QtGui
from NanoVNASaver.RFTools import Datapoint
from NanoVNASaver.Marker import Marker
from NanoVNASaver.Marker.Values import TYPES, default_label_ids
logger = logging.getLogger(__name__)
class MarkerSettingsWindow(QtWidgets.QWidget):
exampleData11 = [Datapoint(123000000, 0.89, -0.11),
Datapoint(123500000, 0.9, -0.1),
Datapoint(124000000, 0.91, -0.95)]
exampleData21 = [Datapoint(123000000, -0.25, 0.49),
Datapoint(123456000, -0.3, 0.5),
Datapoint(124000000, -0.2, 0.5)]
def __init__(self, app: QtWidgets.QWidget):
super().__init__()
self.app = app
self.setWindowTitle("Marker settings")
self.setWindowIcon(self.app.icon)
QtWidgets.QShortcut(QtCore.Qt.Key_Escape, self, self.cancelButtonClick)
self.exampleMarker = Marker("Example marker")
layout = QtWidgets.QVBoxLayout()
self.setLayout(layout)
settings_group_box = QtWidgets.QGroupBox("Settings")
settings_group_box_layout = QtWidgets.QFormLayout(settings_group_box)
self.checkboxColouredMarker = QtWidgets.QCheckBox("Colored marker name")
self.checkboxColouredMarker.setChecked(
self.app.settings.value("ColoredMarkerNames", True, bool))
self.checkboxColouredMarker.stateChanged.connect(self.updateMarker)
settings_group_box_layout.addRow(self.checkboxColouredMarker)
fields_group_box = QtWidgets.QGroupBox("Displayed data")
fields_group_box_layout = QtWidgets.QFormLayout(fields_group_box)
self.savedFieldSelection = self.app.settings.value(
"MarkerFields", defaultValue=default_label_ids()
)
if self.savedFieldSelection == "":
self.savedFieldSelection = []
self.currentFieldSelection = self.savedFieldSelection[:]
self.active_labels_view = QtWidgets.QListView()
self.update_displayed_data_form()
fields_group_box_layout.addRow(self.active_labels_view)
layout.addWidget(settings_group_box)
layout.addWidget(fields_group_box)
layout.addWidget(self.exampleMarker.getGroupBox())
btn_layout = QtWidgets.QHBoxLayout()
layout.addLayout(btn_layout)
btn_ok = QtWidgets.QPushButton("OK")
btn_apply = QtWidgets.QPushButton("Apply")
btn_default = QtWidgets.QPushButton("Defaults")
btn_cancel = QtWidgets.QPushButton("Cancel")
btn_ok.clicked.connect(self.okButtonClick)
btn_apply.clicked.connect(self.applyButtonClick)
btn_default.clicked.connect(self.defaultButtonClick)
btn_cancel.clicked.connect(self.cancelButtonClick)
btn_layout.addWidget(btn_ok)
btn_layout.addWidget(btn_apply)
btn_layout.addWidget(btn_default)
btn_layout.addWidget(btn_cancel)
self.updateMarker()
for m in self.app.markers:
m.setFieldSelection(self.currentFieldSelection)
m.setColoredText(self.checkboxColouredMarker.isChecked())
def updateMarker(self):
self.exampleMarker.setFrequency(123456000)
self.exampleMarker.setColoredText(self.checkboxColouredMarker.isChecked())
self.exampleMarker.setFieldSelection(self.currentFieldSelection)
self.exampleMarker.findLocation(self.exampleData11)
self.exampleMarker.resetLabels()
self.exampleMarker.updateLabels(self.exampleData11, self.exampleData21)
def updateField(self, field: QtGui.QStandardItem):
if field.checkState() == QtCore.Qt.Checked:
if not field.data() in self.currentFieldSelection:
self.currentFieldSelection = []
for i in range(self.model.rowCount()):
field = self.model.item(i, 0)
if field.checkState() == QtCore.Qt.Checked:
self.currentFieldSelection.append(field.data())
else:
if field.data() in self.currentFieldSelection:
self.currentFieldSelection.remove(field.data())
self.updateMarker()
def applyButtonClick(self):
self.savedFieldSelection = self.currentFieldSelection[:]
self.app.settings.setValue("MarkerFields", self.savedFieldSelection)
self.app.settings.setValue("ColoredMarkerNames", self.checkboxColouredMarker.isChecked())
for m in self.app.markers:
m.setFieldSelection(self.savedFieldSelection)
m.setColoredText(self.checkboxColouredMarker.isChecked())
def okButtonClick(self):
self.applyButtonClick()
self.close()
def cancelButtonClick(self):
self.currentFieldSelection = self.savedFieldSelection[:]
self.update_displayed_data_form()
self.updateMarker()
self.close()
def defaultButtonClick(self):
self.currentFieldSelection = default_label_ids()
self.update_displayed_data_form()
self.updateMarker()
def update_displayed_data_form(self):
self.model = QtGui.QStandardItemModel()
for label in TYPES:
item = QtGui.QStandardItem(label.description)
item.setData(label.label_id)
item.setCheckable(True)
item.setEditable(False)
if label.label_id in self.currentFieldSelection:
item.setCheckState(QtCore.Qt.Checked)
self.model.appendRow(item)
self.active_labels_view.setModel(self.model)
self.model.itemChanged.connect(self.updateField)

Wyświetl plik

@ -1,3 +1,2 @@
from .Widget import Marker # noqa
from .Settings import MarkerSettingsWindow # noqa
from .Values import Value, default_label_ids # noqa

Wyświetl plik

@ -24,7 +24,6 @@ from collections import OrderedDict
from time import sleep, strftime, localtime
from typing import List
import serial
from PyQt5 import QtWidgets, QtCore, QtGui
from .Windows import (
@ -32,12 +31,10 @@ from .Windows import (
DeviceSettingsWindow, DisplaySettingsWindow, SweepSettingsWindow,
TDRWindow
)
from .Formatting import (
format_frequency, format_frequency_short, format_frequency_sweep,
parse_frequency,
)
from .Hardware.Hardware import get_interfaces, get_VNA
from .Hardware.VNA import InvalidVNA
from .Widgets import SweepControl
from .Formatting import format_frequency
from .Hardware.Hardware import Interface, get_interfaces, get_VNA
from .Hardware.VNA import VNA
from .RFTools import Datapoint, corr_att_data
from .Charts.Chart import Chart
from .Charts import (
@ -50,7 +47,6 @@ from .Charts import (
SmithChart, SParameterChart, TDRChart,
)
from .Calibration import Calibration
from .Inputs import FrequencyInputWidget
from .Marker import Marker
from .SweepWorker import SweepWorker
from .Settings import BandsModel
@ -88,17 +84,18 @@ class NanoVNASaver(QtWidgets.QWidget):
self.worker.signals.sweepError.connect(self.showSweepError)
self.worker.signals.fatalSweepError.connect(self.showFatalSweepError)
self.sweep_control = SweepControl(self)
self.bands = BandsModel()
self.noSweeps = 1 # Number of sweeps to run
self.serialLock = threading.Lock()
self.serial = serial.Serial()
self.vna = InvalidVNA(self, serial)
self.interface = Interface("serial", "None")
self.vna = VNA(self.interface)
self.dataLock = threading.Lock()
# TODO: use Touchstone class as data container
self.data: List[Datapoint] = []
self.data11: List[Datapoint] = []
self.data21: List[Datapoint] = []
self.referenceS11data: List[Datapoint] = []
self.referenceS21data: List[Datapoint] = []
@ -110,8 +107,6 @@ class NanoVNASaver(QtWidgets.QWidget):
self.markers = []
self.serialPort = ""
logger.debug("Building user interface")
self.baseTitle = f"NanoVNA Saver {NanoVNASaver.version}"
@ -224,90 +219,9 @@ class NanoVNASaver(QtWidgets.QWidget):
# Sweep control
###############################################################
sweep_control_box = QtWidgets.QGroupBox()
sweep_control_box.setMaximumWidth(250)
sweep_control_box.setTitle("Sweep control")
sweep_control_layout = QtWidgets.QFormLayout(sweep_control_box)
left_column.addWidget(self.sweep_control)
line = QtWidgets.QFrame()
line.setFrameShape(QtWidgets.QFrame.VLine)
sweep_input_layout = QtWidgets.QHBoxLayout()
sweep_input_left_layout = QtWidgets.QFormLayout()
sweep_input_right_layout = QtWidgets.QFormLayout()
sweep_input_layout.addLayout(sweep_input_left_layout)
sweep_input_layout.addWidget(line)
sweep_input_layout.addLayout(sweep_input_right_layout)
sweep_control_layout.addRow(sweep_input_layout)
self.sweepStartInput = FrequencyInputWidget()
self.sweepStartInput.setMinimumWidth(60)
self.sweepStartInput.setAlignment(QtCore.Qt.AlignRight)
self.sweepStartInput.textEdited.connect(self.updateCenterSpan)
self.sweepStartInput.textChanged.connect(self.updateStepSize)
sweep_input_left_layout.addRow(QtWidgets.QLabel("Start"), self.sweepStartInput)
self.sweepEndInput = FrequencyInputWidget()
self.sweepEndInput.setAlignment(QtCore.Qt.AlignRight)
self.sweepEndInput.textEdited.connect(self.updateCenterSpan)
self.sweepEndInput.textChanged.connect(self.updateStepSize)
sweep_input_left_layout.addRow(QtWidgets.QLabel("Stop"), self.sweepEndInput)
self.sweepCenterInput = FrequencyInputWidget()
self.sweepCenterInput.setMinimumWidth(60)
self.sweepCenterInput.setAlignment(QtCore.Qt.AlignRight)
self.sweepCenterInput.textEdited.connect(self.updateStartEnd)
sweep_input_right_layout.addRow(QtWidgets.QLabel("Center"), self.sweepCenterInput)
self.sweepSpanInput = FrequencyInputWidget()
self.sweepSpanInput.setAlignment(QtCore.Qt.AlignRight)
self.sweepSpanInput.textEdited.connect(self.updateStartEnd)
sweep_input_right_layout.addRow(QtWidgets.QLabel("Span"), self.sweepSpanInput)
self.sweepCountInput = QtWidgets.QLineEdit(self.settings.value("Segments", "1"))
self.sweepCountInput.setAlignment(QtCore.Qt.AlignRight)
self.sweepCountInput.setFixedWidth(60)
self.sweepCountInput.textEdited.connect(self.updateStepSize)
self.sweepStepLabel = QtWidgets.QLabel("Hz/step")
self.sweepStepLabel.setAlignment(QtCore.Qt.AlignRight | QtCore.Qt.AlignVCenter)
segment_layout = QtWidgets.QHBoxLayout()
segment_layout.addWidget(self.sweepCountInput)
segment_layout.addWidget(self.sweepStepLabel)
sweep_control_layout.addRow(QtWidgets.QLabel("Segments"), segment_layout)
btn_sweep_settings_window = QtWidgets.QPushButton("Sweep settings ...")
btn_sweep_settings_window.clicked.connect(
lambda: self.display_window("sweep_settings"))
sweep_control_layout.addRow(btn_sweep_settings_window)
self.sweepProgressBar = QtWidgets.QProgressBar()
self.sweepProgressBar.setMaximum(100)
self.sweepProgressBar.setValue(0)
sweep_control_layout.addRow(self.sweepProgressBar)
self.btnSweep = QtWidgets.QPushButton("Sweep")
self.btnSweep.clicked.connect(self.sweep)
self.btnSweep.setShortcut(QtCore.Qt.Key_W | QtCore.Qt.CTRL)
self.btnStopSweep = QtWidgets.QPushButton("Stop")
self.btnStopSweep.clicked.connect(self.stopSweep)
self.btnStopSweep.setShortcut(QtCore.Qt.Key_Escape)
self.btnStopSweep.setDisabled(True)
btn_layout = QtWidgets.QHBoxLayout()
btn_layout.addWidget(self.btnSweep)
btn_layout.addWidget(self.btnStopSweep)
btn_layout.setContentsMargins(0, 0, 0, 0)
btn_layout_widget = QtWidgets.QWidget()
btn_layout_widget.setLayout(btn_layout)
sweep_control_layout.addRow(btn_layout_widget)
left_column.addWidget(sweep_control_box)
###############################################################
# ###############################################################
# Marker control
###############################################################
@ -456,7 +370,7 @@ class NanoVNASaver(QtWidgets.QWidget):
self.rescanSerialPort()
self.serialPortInput.setEditable(True)
btn_rescan_serial_port = QtWidgets.QPushButton("Rescan")
btn_rescan_serial_port.setFixedWidth(60)
btn_rescan_serial_port.setFixedWidth(65)
btn_rescan_serial_port.clicked.connect(self.rescanSerialPort)
serial_port_input_layout = QtWidgets.QHBoxLayout()
serial_port_input_layout.addWidget(self.serialPortInput)
@ -471,6 +385,7 @@ class NanoVNASaver(QtWidgets.QWidget):
serial_button_layout.addWidget(self.btnSerialToggle, stretch=1)
self.btnDeviceSettings = QtWidgets.QPushButton("Manage")
self.btnDeviceSettings.setFixedWidth(65)
self.btnDeviceSettings.clicked.connect(
lambda: self.display_window("device_settings"))
serial_button_layout.addWidget(self.btnDeviceSettings, stretch=0)
@ -555,11 +470,11 @@ class NanoVNASaver(QtWidgets.QWidget):
def rescanSerialPort(self):
self.serialPortInput.clear()
for port in get_interfaces():
self.serialPortInput.insertItem(1, port[1], port[0])
for iface in get_interfaces():
self.serialPortInput.insertItem(1, f"{iface}", iface)
def exportFile(self, nr_params: int = 1):
if len(self.data) == 0:
if len(self.data11) == 0:
QtWidgets.QMessageBox.warning(
self, "No data to save", "There is no data to save.")
return
@ -587,10 +502,10 @@ class NanoVNASaver(QtWidgets.QWidget):
return
ts = Touchstone(filename)
ts.sdata[0] = self.data
ts.sdata[0] = self.data11
if nr_params > 1:
ts.sdata[1] = self.data21
for dp in self.data:
for dp in self.data11:
ts.sdata[2].append(Datapoint(dp.freq, 0, 0))
ts.sdata[3].append(Datapoint(dp.freq, 0, 0))
try:
@ -600,88 +515,76 @@ class NanoVNASaver(QtWidgets.QWidget):
return
def serialButtonClick(self):
if self.serial.is_open:
self.stopSerial()
if not self.vna.connected():
self.connect_device()
else:
self.startSerial()
return
self.disconnect_device()
def startSerial(self):
with self.serialLock:
self.serialPort = self.serialPortInput.currentData()
if self.serialPort == "":
self.serialPort = self.serialPortInput.currentText()
logger.info("Opening serial port %s", self.serialPort)
def connect_device(self):
if not self.interface:
return
with self.interface.lock:
self.interface = self.serialPortInput.currentData()
logger.info("Connection %s", self.interface)
try:
self.serial = serial.Serial(port=self.serialPort, baudrate=115200)
self.serial.timeout = 0.05
except serial.SerialException as exc:
logger.error("Tried to open %s and failed: %s", self.serialPort, exc)
self.interface.open()
self.interface.timeout = 0.05
except (IOError, AttributeError) as exc:
logger.error("Tried to open %s and failed: %s",
self.interface, exc)
return
if not self.serial.isOpen() :
logger.error("Unable to open port %s", self.serialPort)
if not self.interface.isOpen():
logger.error("Unable to open port %s", self.interface)
return
self.btnSerialToggle.setText("Disconnect")
sleep(0.1)
try:
self.vna = get_VNA(self.interface)
except IOError as exc:
logger.error("Unable to connect to VNA: %s", exc)
sleep(0.05)
self.vna = get_VNA(self, self.serial)
self.vna.validateInput = self.settings.value("SerialInputValidation", True, bool)
self.worker.setVNA(self.vna)
logger.info(self.vna.readFirmware())
# connected
self.btnSerialToggle.setText("Disconnect")
frequencies = self.vna.readFrequencies()
if frequencies:
logger.info("Read starting frequency %s and end frequency %s",
frequencies[0], frequencies[100])
if int(frequencies[0]) == int(frequencies[100]) and (
self.sweepStartInput.text() == "" or
self.sweepEndInput.text() == ""):
self.sweepStartInput.setText(
format_frequency_sweep(int(frequencies[0])))
self.sweepEndInput.setText(
format_frequency_sweep(int(frequencies[100]) + 100000))
elif (self.sweepStartInput.text() == "" or
self.sweepEndInput.text() == ""):
self.sweepStartInput.setText(
format_frequency_sweep(int(frequencies[0])))
self.sweepEndInput.setText(
format_frequency_sweep(int(frequencies[100])))
self.sweepStartInput.textEdited.emit(
self.sweepStartInput.text())
self.sweepStartInput.textChanged.emit(
self.sweepStartInput.text())
else:
if not frequencies:
logger.warning("No frequencies read")
return
logger.info("Read starting frequency %s and end frequency %s",
frequencies[0], frequencies[-1])
self.sweep_control.set_start(frequencies[0])
if frequencies[0] < frequencies[-1]:
self.sweep_control.set_end(frequencies[-1])
else:
self.sweep_control.set_end(
frequencies[0] +
self.vna.datapoints * self.sweep_control.get_count())
self.sweep_control.set_count(1) # speed up things
self.sweep_control.update_center_span()
self.sweep_control.update_step_size()
logger.debug("Starting initial sweep")
self.sweep()
return
self.sweep_start()
def stopSerial(self):
with self.serialLock:
logger.info("Closing connection to NanoVNA")
self.serial.close()
self.btnSerialToggle.setText("Connect to NanoVNA")
def disconnect_device(self):
with self.interface.lock:
logger.info("Closing connection to %s", self.interface)
self.interface.close()
self.btnSerialToggle.setText("Connect to device")
def toggleSweepSettings(self, disabled):
self.sweepStartInput.setDisabled(disabled)
self.sweepEndInput.setDisabled(disabled)
self.sweepSpanInput.setDisabled(disabled)
self.sweepCenterInput.setDisabled(disabled)
self.sweepCountInput.setDisabled(disabled)
def sweep(self):
# Run the serial port update
if not self.serial.is_open:
def sweep_start(self):
# Run the device data update
if not self.vna.connected():
return
self.worker.stopped = False
self.sweepProgressBar.setValue(0)
self.btnSweep.setDisabled(True)
self.btnStopSweep.setDisabled(False)
self.toggleSweepSettings(True)
self.sweep_control.progress_bar.setValue(0)
self.sweep_control.btn_start.setDisabled(True)
self.sweep_control.btn_stop.setDisabled(False)
self.sweep_control.toggle_settings(True)
for m in self.markers:
m.resetLabels()
self.s11_min_rl_label.setText("")
@ -690,25 +593,21 @@ class NanoVNASaver(QtWidgets.QWidget):
self.s21_max_gain_label.setText("")
self.tdr_result_label.setText("")
if self.sweepCountInput.text().isdigit():
self.settings.setValue("Segments", self.sweepCountInput.text())
if self.sweep_control.input_count.text().isdigit():
self.settings.setValue("Segments", self.sweep_control.input_count.text())
logger.debug("Starting worker thread")
self.threadpool.start(self.worker)
def stopSweep(self):
def sweep_stop(self):
self.worker.stopped = True
def saveData(self, data, data21, source=None):
if self.dataLock.acquire(blocking=True):
self.data = data
with self.dataLock:
self.data11 = data
self.data21 = data21
if self.s21att > 0:
self.data21 = corr_att_data(data21, self.s21att)
else:
self.data21 = data21
else:
logger.error("Failed acquiring data lock while saving.")
self.dataLock.release()
self.data21 = corr_att_data(self.data21, self.s21att)
if source is not None:
self.sweepSource = source
else:
@ -718,38 +617,37 @@ class NanoVNASaver(QtWidgets.QWidget):
).lstrip()
def markerUpdated(self, marker: Marker):
if self.dataLock.acquire(blocking=True):
marker.findLocation(self.data)
with self.dataLock:
marker.findLocation(self.data11)
for m in self.markers:
m.resetLabels()
m.updateLabels(self.data, self.data21)
m.updateLabels(self.data11, self.data21)
for c in self.subscribing_charts:
c.update()
self.dataLock.release()
def dataUpdated(self):
if self.dataLock.acquire(blocking=True):
with self.dataLock:
for m in self.markers:
m.resetLabels()
m.updateLabels(self.data, self.data21)
m.updateLabels(self.data11, self.data21)
for c in self.s11charts:
c.setData(self.data)
c.setData(self.data11)
for c in self.s21charts:
c.setData(self.data21)
for c in self.combinedCharts:
c.setCombinedData(self.data, self.data21)
c.setCombinedData(self.data11, self.data21)
self.sweepProgressBar.setValue(self.worker.percentage)
self.sweep_control.progress_bar.setValue(self.worker.percentage)
self.windows["tdr"].updateTDR()
# Find the minimum S11 VSWR:
min_vswr = 100
min_vswr_freq = -1
for d in self.data:
for d in self.data11:
vswr = d.vswr
if min_vswr > vswr > 0:
min_vswr = vswr
@ -789,56 +687,24 @@ class NanoVNASaver(QtWidgets.QWidget):
self.s21_min_gain_label.setText("")
self.s21_max_gain_label.setText("")
else:
logger.error("Failed acquiring data lock while updating.")
self.updateTitle()
self.dataLock.release()
self.dataAvailable.emit()
def sweepFinished(self):
self.sweepProgressBar.setValue(100)
self.btnSweep.setDisabled(False)
self.btnStopSweep.setDisabled(True)
self.toggleSweepSettings(False)
self.sweep_control.progress_bar.setValue(100)
self.sweep_control.btn_start.setDisabled(False)
self.sweep_control.btn_stop.setDisabled(True)
self.sweep_control.toggle_settings(False)
def updateCenterSpan(self):
fstart = parse_frequency(self.sweepStartInput.text())
fstop = parse_frequency(self.sweepEndInput.text())
fspan = fstop - fstart
fcenter = int(round((fstart+fstop)/2))
if fspan < 0 or fstart < 0 or fstop < 0:
return
self.sweepSpanInput.setText(format_frequency_sweep(fspan))
self.sweepCenterInput.setText(format_frequency_sweep(fcenter))
def updateStartEnd(self):
fcenter = parse_frequency(self.sweepCenterInput.text())
fspan = parse_frequency(self.sweepSpanInput.text())
if fspan < 0 or fcenter < 0:
return
fstart = int(round(fcenter - fspan/2))
fstop = int(round(fcenter + fspan/2))
if fstart < 0 or fstop < 0:
return
self.sweepStartInput.setText(format_frequency_sweep(fstart))
self.sweepEndInput.setText(format_frequency_sweep(fstop))
def updateStepSize(self):
fspan = parse_frequency(self.sweepSpanInput.text())
if fspan < 0:
return
if self.sweepCountInput.text().isdigit():
segments = int(self.sweepCountInput.text())
if segments > 0:
fstep = fspan / (segments * self.vna.datapoints - 1)
self.sweepStepLabel.setText(
f"{format_frequency_short(fstep)}/step")
for marker in self.markers:
marker.frequencyInput.textEdited.emit(
marker.frequencyInput.text())
def setReference(self, s11data=None, s21data=None, source=None):
if not s11data:
s11data = self.data
if not s21data:
s21data = self.data21
s11data = self.data11[:]
s21data = self.data21[:]
self.referenceS11data = s11data
for c in self.s11charts:
c.setReference(s11data)
@ -863,7 +729,7 @@ class NanoVNASaver(QtWidgets.QWidget):
title = self.baseTitle
insert = ""
if self.sweepSource != "":
insert += f"Sweep: {self.sweepSource} @ {len(self.data)} points"
insert += f"Sweep: {self.sweepSource} @ {len(self.data11)} points"
if self.referenceSource != "":
if insert != "":
insert += ", "
@ -904,7 +770,7 @@ class NanoVNASaver(QtWidgets.QWidget):
filename, _ = QtWidgets.QFileDialog.getOpenFileName(
filter="Touchstone Files (*.s1p *.s2p);;All files (*.*)")
if filename != "":
self.data = []
self.data11 = []
self.data21 = []
t = Touchstone(filename)
t.load()
@ -927,7 +793,7 @@ class NanoVNASaver(QtWidgets.QWidget):
def showSweepError(self):
self.showError(self.worker.error_message)
self.serial.flushInput() # Remove any left-over data
self.vna.flushSerialBuffers() # Remove any left-over data
self.sweepFinished()
def popoutChart(self, chart: Chart):

Wyświetl plik

@ -149,7 +149,7 @@ def serial_to_parallel(z: complex) -> complex:
return complex(z_sq_sum / z.real, z_sq_sum / z.imag)
def corr_att_data(data: List[Datapoint], att: float):
def corr_att_data(data: List[Datapoint], att: float) -> List[Datapoint]:
"""Correct the ratio for a given attenuation on s21 input"""
if att <= 0:
return data
@ -157,8 +157,6 @@ def corr_att_data(data: List[Datapoint], att: float):
att = 10**(att/20)
ndata = []
for dp in data:
freq, re, im = dp
orig = complex(re, im)
corrected = orig * att
ndata.append(Datapoint(freq, corrected.real, corrected.imag))
corrected = dp.z * att
ndata.append(Datapoint(dp.freq, corrected.real, corrected.imag))
return ndata

Wyświetl plik

@ -155,40 +155,39 @@ class BandsModel(QtCore.QAbstractTableModel):
class Version:
RXP = re.compile(r"(.*\s+)?(\d+)\.(\d+)\.(\d+)(.*)")
RXP = re.compile(r"""^
\D*
(?P<major>\d+)\.
(?P<minor>\d+)\.
(?P<revision>\d+)
(?P<note>.*)
$""", re.VERBOSE)
def __init__(self, version_string: str):
self.major = 0
self.minor = 0
self.revision = 0
self.note = ""
self.version_string = version_string
results = Version.RXP.match(version_string)
if results:
self.major = int(results.group(2))
self.minor = int(results.group(3))
self.revision = int(results.group(4))
self.note = results.group(5)
logger.debug(
"Parsed version as \"%d.%d.%d%s\"",
self.major, self.minor, self.revision, self.note)
def __init__(self, vstring: str = "0.0.0"):
self.data = {
"major": 0,
"minor": 0,
"revision": 0,
"note": "",
}
try:
self.data = Version.RXP.search(vstring).groupdict()
for name in ("major", "minor", "revision"):
self.data[name] = int(self.data[name])
except AttributeError:
logger.error("Unable to parse version: %s", vstring)
def __gt__(self, other: "Version") -> bool:
if self.major > other.major:
return True
if self.major < other.major:
return False
if self.minor > other.minor:
return True
if self.minor < other.minor:
return False
if self.revision > other.revision:
return True
l, r = self.data, other.data
for name in ("major", "minor", "revision"):
if l[name] > r[name]:
return True
if l[name] < r[name]:
return False
return False
def __lt__(self, other: "Version") -> bool:
return other > self
return other < self
def __ge__(self, other: "Version") -> bool:
return self > other or self == other
@ -197,11 +196,24 @@ class Version:
return self < other or self == other
def __eq__(self, other: "Version") -> bool:
return (
self.major == other.major and
self.minor == other.minor and
self.revision == other.revision and
self.note == other.note)
return self.data == other.data
def __str__(self) -> str:
return f"{self.major}.{self.minor}.{self.revision}{self.note}"
return (f'{self.data["major"]}.{self.data["minor"]}'
f'.{self.data["revision"]}{self.data["note"]}')
@property
def major(self) -> int:
return self.data["major"]
@property
def minor(self) -> int:
return self.data["minor"]
@property
def revision(self) -> int:
return self.data["revision"]
@property
def note(self) -> str:
return self.data["note"]

Wyświetl plik

@ -18,20 +18,20 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
from time import sleep
from typing import List, Tuple
from typing import Iterator, List, Tuple
import numpy as np
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtCore import pyqtSlot, pyqtSignal
from NanoVNASaver.Calibration import correct_delay
from NanoVNASaver.Formatting import parse_frequency
from NanoVNASaver.RFTools import Datapoint
logger = logging.getLogger(__name__)
def truncate(values: List[List[Tuple]], count: int) -> List[List[Tuple]]:
"""truncate drops extrema from data list if averaging is active"""
keep = len(values) - count
logger.debug("Truncating from %d values to %d", len(values), keep)
if count < 1 or keep < 1:
@ -54,20 +54,62 @@ class WorkerSignals(QtCore.QObject):
fatalSweepError = pyqtSignal()
class Sweep():
def __init__(self, start: int = 3600000, end: int = 30000000,
points: int = 101, sweeps: int = 1):
self.start = start
self.end = end
self.points = points
self.sweeps = sweeps
self.span = self.end - self.start
self.step = self.stepsize()
self.check()
def __repr__(self) -> str:
return (
f"Sweep({self.start}, {self.end}, {self.points} {self.sweeps})")
def __eq__(self, other) -> bool:
return(self.start == other.start and
self.end == other.end and
self.points == other.points and
self.sweeps == other.sweeps)
def check(self):
if not(self.sweeps > 0 and
self.points > 0 and
self.start > 0 and
self.end > 0 and
self.step >= 1):
raise ValueError(f"Illegal sweep settings: {self}")
def stepsize(self) -> int:
return int(self.span / (self.points * self.sweeps - 1))
def get_index_range(self, index: int) -> Tuple[int, int]:
start = self.start + index * self.points * self.step
end = start + (self.points -1) * self.step
return (start, end)
def get_frequencies(self) -> Iterator[int]:
for freq in range(self.start, self.end + 1, self.step):
yield freq
class SweepWorker(QtCore.QRunnable):
def __init__(self, app: QtWidgets.QWidget):
super().__init__()
logger.info("Initializing SweepWorker")
self.signals = WorkerSignals()
self.app = app
self.vna: app.vna
self.noSweeps = 1
self.sweep = Sweep()
self.setAutoDelete(False)
self.percentage = 0
self.data11: List[Datapoint] = []
self.data21: List[Datapoint] = []
self.rawData11: List[Datapoint] = []
self.rawData21: List[Datapoint] = []
self.init_data()
self.stopped = False
self.running = False
self.continuousSweep = False
@ -79,195 +121,126 @@ class SweepWorker(QtCore.QRunnable):
@pyqtSlot()
def run(self):
try:
self._run()
except BaseException as exc:
logger.exception("%s", exc)
raise exc
def _run(self):
logger.info("Initializing SweepWorker")
self.running = True
self.percentage = 0
if not self.app.serial.is_open:
if not self.app.vna.connected():
logger.debug(
"Attempted to run without being connected to the NanoVNA")
self.running = False
return
try:
sweep = Sweep(
self.app.sweep_control.get_start(),
self.app.sweep_control.get_end(),
self.app.vna.datapoints,
self.app.sweep_control.get_count(),
)
except ValueError:
self.gui_error(
"Unable to parse frequency inputs"
" - check start and stop fields.")
return
if int(self.app.sweepCountInput.text()) > 0:
self.noSweeps = int(self.app.sweepCountInput.text())
logger.info("%d sweeps", self.noSweeps)
averages = 1
if self.averaging:
logger.info("%d averages", self.averages)
averages = self.averages
if (self.app.sweepStartInput.text() == "" or
self.app.sweepEndInput.text() == ""):
logger.debug("First sweep - standard range")
# We should handle the first startup by reading frequencies?
sweep_from = 1000000
sweep_to = 800000000
else:
sweep_from = parse_frequency(self.app.sweepStartInput.text())
sweep_to = parse_frequency(self.app.sweepEndInput.text())
logger.debug("Parsed sweep range as %d to %d",
sweep_from, sweep_to)
if sweep_from < 0 or sweep_to < 0 or sweep_from == sweep_to:
logger.warning("Can't sweep from %s to %s",
self.app.sweepStartInput.text(),
self.app.sweepEndInput.text())
self.error_message = (
"Unable to parse frequency inputs"
" - check start and stop fields.")
self.stopped = True
self.running = False
self.signals.sweepError.emit()
return
if sweep != self.sweep: # parameters changed
self.sweep = sweep
self.init_data()
span = sweep_to - sweep_from
stepsize = int(span / (self.noSweeps * self.vna.datapoints - 1))
# Setup complete
values11 = []
values21 = []
frequencies = []
if self.averaging:
for i in range(self.noSweeps):
logger.debug("Sweep segment no %d averaged over %d readings",
i, self.averages)
if self.stopped:
logger.debug("Stopping sweeping as signalled")
break
start = sweep_from + i * self.vna.datapoints * stepsize
freq, val11, val21 = self.readAveragedSegment(
start,
start + (self.vna.datapoints - 1) * stepsize,
self.averages)
frequencies.extend(freq)
values11.extend(val11)
values21.extend(val21)
self.percentage = (i + 1) * (self.vna.datapoints - 1) / \
self.noSweeps
logger.debug("Saving acquired data")
self.saveData(frequencies, values11, values21)
else:
for i in range(self.noSweeps):
finished = False
while not finished:
for i in range(self.sweep.sweeps):
logger.debug("Sweep segment no %d", i)
if self.stopped:
logger.debug("Stopping sweeping as signalled")
finished = True
break
start = sweep_from + i * self.vna.datapoints * stepsize
start, stop = self.sweep.get_index_range(i)
try:
freq, val11, val21 = self.readSegment(
start, start + (self.vna.datapoints - 1) * stepsize)
frequencies.extend(freq)
values11.extend(val11)
values21.extend(val21)
self.percentage = (i + 1) * 100 / self.noSweeps
logger.debug("Saving acquired data")
self.saveData(frequencies, values11, values21)
except NanoVNAValueException as e:
freq, values11, values21 = self.readAveragedSegment(
start, stop, averages)
self.percentage = (i + 1) * 100 / self.sweep.sweeps
self.updateData(freq, values11, values21, i)
except ValueError as e:
self.error_message = str(e)
self.stopped = True
self.running = False
self.signals.sweepError.emit()
except NanoVNASerialException as e:
self.error_message = str(e)
self.stopped = True
self.running = False
self.signals.sweepFatalError.emit()
while self.continuousSweep and not self.stopped:
logger.debug("Continuous sweeping")
for i in range(self.noSweeps):
logger.debug("Sweep segment no %d", i)
if self.stopped:
logger.debug("Stopping sweeping as signalled")
break
start = sweep_from + i * self.vna.datapoints * stepsize
try:
_, values11, values21 = self.readSegment(
start, start + (self.vna.datapoints-1) * stepsize)
logger.debug("Updating acquired data")
self.updateData(values11, values21, i, self.vna.datapoints)
except NanoVNAValueException as e:
self.error_message = str(e)
self.stopped = True
self.running = False
self.signals.sweepError.emit()
except NanoVNASerialException as e:
self.error_message = str(e)
self.stopped = True
self.running = False
self.signals.sweepFatalError.emit()
if not self.continuousSweep:
finished = True
# Reset the device to show the full range if we were multisegment
if self.noSweeps > 1:
if self.sweep.sweeps > 1:
start = self.app.sweep_control.get_start()
end = self.app.sweep_control.get_end()
logger.debug("Resetting NanoVNA sweep to full range: %d to %d",
parse_frequency(
self.app.sweepStartInput.text()),
parse_frequency(self.app.sweepEndInput.text()))
self.vna.resetSweep(
parse_frequency(self.app.sweepStartInput.text()),
parse_frequency(self.app.sweepEndInput.text()))
start, end)
self.app.vna.resetSweep(start, end)
self.percentage = 100
logger.debug("Sending \"finished\" signal")
logger.debug('Sending "finished" signal')
self.signals.finished.emit()
self.running = False
return
def updateData(self, values11, values21, offset, segment_size=101):
def init_data(self):
self.data11 = []
self.data21 = []
self.rawData11 = []
self.rawData21 = []
for freq in self.sweep.get_frequencies():
self.data11.append(Datapoint(freq, 0.0, 0.0))
self.data21.append(Datapoint(freq, 0.0, 0.0))
self.rawData11.append(Datapoint(freq, 0.0, 0.0))
self.rawData21.append(Datapoint(freq, 0.0, 0.0))
logger.debug("Init data length: %s", len(self.data11))
def updateData(self, frequencies, values11, values21, index):
# Update the data from (i*101) to (i+1)*101
logger.debug(
"Calculating data and inserting in existing data at offset %d",
offset)
for i, val11 in enumerate(values11):
re, im = val11
re21, im21 = values21[i]
freq = self.data11[offset * segment_size + i].freq
raw_data11 = Datapoint(freq, re, im)
raw_data21 = Datapoint(freq, re21, im21)
data11, data21 = self.applyCalibration([raw_data11], [raw_data21])
self.data11[offset * segment_size + i] = data11[0]
self.data21[offset * segment_size + i] = data21[0]
self.rawData11[offset * segment_size + i] = raw_data11
self.rawData21[offset * segment_size + i] = raw_data21
logger.debug("Saving data to application (%d and %d points)",
len(self.data11), len(self.data21))
self.app.saveData(self.data11, self.data21)
logger.debug("Sending \"updated\" signal")
self.signals.updated.emit()
def saveData(self, frequencies, values11, values21):
logger.debug("Freqs: %d, values11: %d, values21: %d",
len(frequencies), len(values11), len(values21))
"Calculating data and inserting in existing data at index %d",
index)
offset = self.sweep.points * index
v11 = values11[:]
v21 = values21[:]
raw_data11 = []
raw_data21 = []
logger.debug("Calculating data including corrections")
for freq in frequencies:
real11, imag11 = v11.pop(0)
real21, imag21 = v21.pop(0)
raw_data11.append(Datapoint(freq, real11, imag11))
raw_data21.append(Datapoint(freq, real21, imag21))
self.rawData11 = raw_data11
self.rawData21 = raw_data21
self.data11, self.data21 = self.applyCalibration(
raw_data11, raw_data21)
data11, data21 = self.applyCalibration(raw_data11, raw_data21)
logger.debug("update Freqs: %s, Offset: %s", len(frequencies), offset)
for i in range(len(frequencies)):
self.data11[offset + i] = data11[i]
self.data21[offset + i] = data21[i]
self.rawData11[offset + i] = raw_data11[i]
self.rawData21[offset + i] = raw_data21[i]
logger.debug("Saving data to application (%d and %d points)",
len(self.data11), len(self.data21))
self.app.saveData(self.data11, self.data21)
logger.debug("Sending \"updated\" signal")
logger.debug('Sending "updated" signal')
self.signals.updated.emit()
def applyCalibration(self,
raw_data11: List[Datapoint],
raw_data21: List[Datapoint]
) -> (List[Datapoint], List[Datapoint]):
) -> Tuple[List[Datapoint], List[Datapoint]]:
if self.offsetDelay != 0:
tmp = []
for dp in raw_data11:
@ -297,44 +270,42 @@ class SweepWorker(QtCore.QRunnable):
data21 = raw_data21
return data11, data21
def readAveragedSegment(self, start, stop, averages):
val11 = []
val21 = []
def readAveragedSegment(self, start, stop, averages=1):
values11 = []
values21 = []
freq = []
logger.info("Reading %d averages from %d to %d", averages, start, stop)
logger.info("Reading from %d to %d. Averaging %d values",
start, stop, averages)
for i in range(averages):
if self.stopped:
logger.debug("Stopping averaging as signalled")
break
logger.debug("Reading average no %d / %d", i+1, averages)
freq, tmp11, tmp21 = self.readSegment(start, stop)
val11.append(tmp11)
val21.append(tmp21)
self.percentage += 100/(self.noSweeps*averages)
values11.append(tmp11)
values21.append(tmp21)
self.percentage += 100 / (self.sweep.sweeps * averages)
self.signals.updated.emit()
logger.debug("Post-processing averages")
logger.debug("Truncating %d values by %d", len(val11), self.truncates)
val11 = truncate(val11, self.truncates)
val21 = truncate(val21, self.truncates)
logger.debug("Averaging %d values", len(val11))
if self.truncates and averages > 1:
logger.debug("Truncating %d values by %d",
len(values11), self.truncates)
values11 = truncate(values11, self.truncates)
values21 = truncate(values21, self.truncates)
return11 = np.average(val11, 0).tolist()
return21 = np.average(val21, 0).tolist()
logger.debug("Averaging %d values", len(values11))
values11 = np.average(values11, 0).tolist()
values21 = np.average(values21, 0).tolist()
return freq, return11, return21
return freq, values11, values21
def readSegment(self, start, stop):
logger.debug("Setting sweep range to %d to %d", start, stop)
self.vna.setSweep(start, stop)
self.app.vna.setSweep(start, stop)
# Let's check the frequencies first:
frequencies = self.readFreq()
# S11
frequencies = self.app.vna.readFrequencies()
values11 = self.readData("data 0")
# S21
values21 = self.readData("data 1")
if (len(frequencies) != len(values11) or
len(frequencies) != len(values21)):
logger.info("No valid data during this run")
@ -350,41 +321,35 @@ class SweepWorker(QtCore.QRunnable):
while not done:
done = True
returndata = []
tmpdata = self.vna.readValues(data)
tmpdata = self.app.vna.readValues(data)
logger.debug("Read %d values", len(tmpdata))
for d in tmpdata:
a, b = d.split(" ")
try:
if self.vna.validateInput and (
float(a) < -9.5 or float(a) > 9.5):
if self.app.vna.validateInput and (
abs(float(a)) > 9.5 or
abs(float(b)) > 9.5):
logger.warning(
"Got a non-float data value: %s (%s)", d, a)
logger.debug("Re-reading %s", data)
"Got a non plausible data value: (%s)", d)
done = False
elif self.vna.validateInput and (
float(b) < -9.5 or float(b) > 9.5):
logger.warning(
"Got a non-float data value: %s (%s)", d, b)
logger.debug("Re-reading %s", data)
done = False
else:
returndata.append((float(a), float(b)))
except Exception as e:
break
returndata.append((float(a), float(b)))
except ValueError as exc:
logger.exception("An exception occurred reading %s: %s",
data, e)
logger.debug("Re-reading %s", data)
data, exc)
done = False
if not done:
logger.debug("Re-reading %s", data)
sleep(0.2)
count += 1
if count == 10:
if count == 5:
logger.error("Tried and failed to read %s %d times.",
data, count)
if count >= 20:
if count >= 10:
logger.critical(
"Tried and failed to read %s %d times. Giving up.",
data, count)
raise NanoVNAValueException(
raise IOError(
f"Failed reading {data} {count} times.\n"
f"Data outside expected valid ranges,"
f" or in an unexpected format.\n\n"
@ -392,40 +357,6 @@ class SweepWorker(QtCore.QRunnable):
f"device settings screen.")
return returndata
def readFreq(self):
# TODO: Figure out why frequencies sometimes arrive as non-integers
logger.debug("Reading frequencies")
returnfreq = []
done = False
count = 0
while not done:
done = True
returnfreq = []
tmpfreq = self.vna.readFrequencies()
if not tmpfreq:
logger.warning("Read no frequencies")
raise NanoVNASerialException(
"Failed reading frequencies: Returned no values.")
for f in tmpfreq:
if not f.isdigit():
logger.warning("Got a non-digit frequency: %s", f)
logger.debug("Re-reading frequencies")
done = False
count += 1
if count == 10:
logger.error(
"Tried and failed %d times to read frequencies.",
count)
if count >= 20:
logger.critical(
"Tried and failed to read frequencies from the"
" NanoVNA %d times.", count)
raise NanoVNAValueException(
f"Failed reading frequencies {count} times.")
else:
returnfreq.append(int(f))
return returnfreq
def setContinuousSweep(self, continuous_sweep: bool):
self.continuousSweep = continuous_sweep
@ -437,13 +368,8 @@ class SweepWorker(QtCore.QRunnable):
except ValueError:
return
def setVNA(self, vna):
self.vna = vna
class NanoVNAValueException(Exception):
pass
class NanoVNASerialException(Exception):
pass
def gui_error(self, message: str):
self.error_message = message
self.stopped = True
self.running = False
self.signals.sweepError.emit()

Wyświetl plik

@ -21,6 +21,11 @@ import math
import cmath
import io
from operator import attrgetter
from typing import List
from scipy.interpolate import interp1d
from NanoVNASaver.RFTools import Datapoint
logger = logging.getLogger(__name__)
@ -98,30 +103,78 @@ class Touchstone:
self.sdata = [[], [], [], []] # at max 4 data pairs
self.comments = []
self.opts = Options()
self._interp = {}
@property
def s11data(self) -> list:
def s11data(self) -> List[Datapoint]:
return self.s("11")
@s11data.setter
def s11data(self, value: List[Datapoint]):
self.sdata[0] = value
@property
def s12data(self) -> list:
def s12data(self) -> List[Datapoint]:
return self.s("12")
@property
def s21data(self) -> list:
return self.s("21")
@s12data.setter
def s12data(self, value: List[Datapoint]):
self.sdata[2] = value
@property
def s22data(self) -> list:
def s21data(self) -> List[Datapoint]:
return self.s("21")
@s21data.setter
def s21data(self, value: List[Datapoint]):
self.sdata[1] = value
@property
def s22data(self) -> List[Datapoint]:
return self.s("22")
@s22data.setter
def s22data(self, value: List[Datapoint]):
self.sdata[3] = value
@property
def r(self) -> int:
return self.opts.resistance
def s(self, name: str) -> list:
def s(self, name: str) -> List[Datapoint]:
return self.sdata[Touchstone.FIELD_ORDER.index(name)]
def s_freq(self, name: str, freq: int) -> Datapoint:
return Datapoint(freq,
float(self._interp[name]["real"](freq)),
float(self._interp[name]["imag"](freq)))
def min_freq(self) -> int:
return self.s("11")[0].freq
def max_freq(self) -> int:
return self.s("11")[-1].freq
def gen_interpolation(self):
for i in Touchstone.FIELD_ORDER:
freq = []
real = []
imag = []
for dp in self.s(i):
freq.append(dp.freq)
real.append(dp.re)
imag.append(dp.im)
self._interp[i] = {
"real": interp1d(freq, real,
kind="slinear", bounds_error=False,
fill_value=(real[0], real[-1])),
"imag": interp1d(freq, imag,
kind="slinear", bounds_error=False,
fill_value=(imag[0], imag[-1])),
}
def _parse_comments(self, fp) -> str:
for line in fp:
line = line.strip()

Wyświetl plik

@ -0,0 +1,203 @@
# NanoVNASaver
#
# A python program to view and export Touchstone data from a NanoVNA
# Copyright (C) 2019, 2020 Rune B. Broberg
# Copyright (C) 2020 NanoVNA-Saver Authors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
from PyQt5 import QtWidgets, QtCore
from PyQt5.QtCore import pyqtSignal
from NanoVNASaver.Formatting import (
format_frequency_sweep, format_frequency_short,
parse_frequency)
from NanoVNASaver.Inputs import FrequencyInputWidget
logger = logging.getLogger(__name__)
class SweepControl(QtWidgets.QGroupBox):
updated = pyqtSignal(object)
def __init__(self, app: QtWidgets.QWidget):
super().__init__()
self.app = app
self.setMaximumWidth(250)
self.setTitle("Sweep control")
control_layout = QtWidgets.QFormLayout(self)
line = QtWidgets.QFrame()
line.setFrameShape(QtWidgets.QFrame.VLine)
input_layout = QtWidgets.QHBoxLayout()
input_left_layout = QtWidgets.QFormLayout()
input_right_layout = QtWidgets.QFormLayout()
input_layout.addLayout(input_left_layout)
input_layout.addWidget(line)
input_layout.addLayout(input_right_layout)
control_layout.addRow(input_layout)
self.input_start = FrequencyInputWidget()
self.input_start.setMinimumWidth(60)
self.input_start.setAlignment(QtCore.Qt.AlignRight)
self.input_start.textEdited.connect(self.update_center_span)
self.input_start.textChanged.connect(self.update_step_size)
input_left_layout.addRow(QtWidgets.QLabel("Start"), self.input_start)
self.input_end = FrequencyInputWidget()
self.input_end.setAlignment(QtCore.Qt.AlignRight)
self.input_end.textEdited.connect(self.update_center_span)
self.input_end.textChanged.connect(self.update_step_size)
input_left_layout.addRow(QtWidgets.QLabel("Stop"), self.input_end)
self.input_center = FrequencyInputWidget()
self.input_center.setMinimumWidth(60)
self.input_center.setAlignment(QtCore.Qt.AlignRight)
self.input_center.textEdited.connect(self.update_start_end)
input_right_layout.addRow(QtWidgets.QLabel("Center"), self.input_center)
self.input_span = FrequencyInputWidget()
self.input_span.setAlignment(QtCore.Qt.AlignRight)
self.input_span.textEdited.connect(self.update_start_end)
input_right_layout.addRow(QtWidgets.QLabel("Span"), self.input_span)
self.input_count = QtWidgets.QLineEdit(self.app.settings.value("Segments", "1"))
self.input_count.setAlignment(QtCore.Qt.AlignRight)
self.input_count.setFixedWidth(60)
self.input_count.textEdited.connect(self.update_step_size)
self.label_step = QtWidgets.QLabel("Hz/step")
self.label_step.setAlignment(QtCore.Qt.AlignRight | QtCore.Qt.AlignVCenter)
segment_layout = QtWidgets.QHBoxLayout()
segment_layout.addWidget(self.input_count)
segment_layout.addWidget(self.label_step)
control_layout.addRow(QtWidgets.QLabel("Segments"), segment_layout)
btn_settings_window = QtWidgets.QPushButton("Sweep settings ...")
btn_settings_window.clicked.connect(
lambda: self.app.display_window("sweep_settings"))
control_layout.addRow(btn_settings_window)
self.progress_bar = QtWidgets.QProgressBar()
self.progress_bar.setMaximum(100)
self.progress_bar.setValue(0)
control_layout.addRow(self.progress_bar)
self.btn_start = QtWidgets.QPushButton("Sweep")
self.btn_start.clicked.connect(self.app.sweep_start)
self.btn_start.setShortcut(QtCore.Qt.Key_W | QtCore.Qt.CTRL)
self.btn_stop = QtWidgets.QPushButton("Stop")
self.btn_stop.clicked.connect(self.app.sweep_stop)
self.btn_stop.setShortcut(QtCore.Qt.Key_Escape)
self.btn_stop.setDisabled(True)
btn_layout = QtWidgets.QHBoxLayout()
btn_layout.addWidget(self.btn_start)
btn_layout.addWidget(self.btn_stop)
btn_layout.setContentsMargins(0, 0, 0, 0)
btn_layout_widget = QtWidgets.QWidget()
btn_layout_widget.setLayout(btn_layout)
control_layout.addRow(btn_layout_widget)
self.input_start.textEdited.emit(self.input_start.text())
self.input_start.textChanged.emit(self.input_start.text())
def get_start(self) -> int:
return parse_frequency(self.input_start.text())
def set_start(self, start: int):
self.input_start.setText(format_frequency_sweep(start))
self.input_start.textEdited.emit(self.input_start.text())
self.updated.emit(self)
def get_end(self) -> int:
return parse_frequency(self.input_end.text())
def set_end(self, end: int):
self.input_end.setText(format_frequency_sweep(end))
self.input_end.textEdited.emit(self.input_end.text())
self.updated.emit(self)
def get_center(self) -> int:
return parse_frequency(self.input_center.text())
def set_center(self, center: int):
self.input_center.setText(format_frequency_sweep(center))
self.input_center.textEdited.emit(self.input_center.text())
self.updated.emit(self)
def get_count(self) -> int:
try:
result = int(self.input_count.text())
except ValueError:
result = 1
return result
def set_count(self, count: int):
self.input_count.setText(str(count))
self.input_count.textEdited.emit(self.input_count.text())
self.updated.emit(self)
def get_span(self) -> int:
return parse_frequency(self.input_span.text())
def set_span(self, span: int):
self.input_span.setText(format_frequency_sweep(span))
self.input_span.textEdited.emit(self.input_span.text())
self.updated.emit(self)
def toggle_settings(self, disabled):
self.input_start.setDisabled(disabled)
self.input_end.setDisabled(disabled)
self.input_span.setDisabled(disabled)
self.input_center.setDisabled(disabled)
self.input_count.setDisabled(disabled)
def update_center_span(self):
fstart = self.get_start()
fstop = self.get_end()
fspan = fstop - fstart
fcenter = round((fstart + fstop) / 2)
if fspan < 0 or fstart < 0 or fstop < 0:
return
self.input_span.setText(fspan)
self.input_center.setText(fcenter)
def update_start_end(self):
fcenter = self.get_center()
fspan = self.get_span()
if fspan < 0 or fcenter < 0:
return
fstart = round(fcenter - fspan / 2)
fstop = round(fcenter + fspan / 2)
if fstart < 0 or fstop < 0:
return
self.input_start.setText(fstart)
self.input_end.setText(fstop)
def update_step_size(self):
fspan = self.get_span()
if fspan < 0:
return
segments = self.get_count()
if segments > 0:
fstep = fspan / (segments * self.app.vna.datapoints - 1)
self.label_step.setText(
f"{format_frequency_short(fstep)}/step")

Wyświetl plik

@ -0,0 +1 @@
from .SweepControl import SweepControl

Wyświetl plik

@ -53,7 +53,7 @@ class AboutWindow(QtWidgets.QWidget):
f"NanoVNASaver version {self.app.version}"))
layout.addWidget(QtWidgets.QLabel(""))
layout.addWidget(QtWidgets.QLabel(
"\N{COPYRIGHT SIGN} Copyright 2019, 2020 Rune B. Broberg"
"\N{COPYRIGHT SIGN} Copyright 2019, 2020 Rune B. Broberg\n"
"\N{COPYRIGHT SIGN} Copyright 2020 NanoVNA-Saver Authors"
))
layout.addWidget(QtWidgets.QLabel(
@ -115,12 +115,12 @@ class AboutWindow(QtWidgets.QWidget):
self.updateLabels()
def updateLabels(self):
if self.app.vna.isValid():
logger.debug("Valid VNA")
v: Version = self.app.vna.version
try:
self.versionLabel.setText(
f"NanoVNA Firmware Version: {self.app.vna.name}"
f"{v.version_string}")
f"NanoVNA Firmware Version: {self.app.vna.name} "
f"v{self.app.vna.version}")
except (IOError, AttributeError):
pass
def updateSettings(self):
if self.updateCheckBox.isChecked():
@ -150,7 +150,7 @@ class AboutWindow(QtWidgets.QWidget):
self.app.settings.setValue("CheckForUpdates", "Ask")
def findUpdates(self, automatic=False):
latest_version = Version("")
latest_version = Version()
latest_url = ""
try:
req = request.Request(VERSION_URL)
@ -174,7 +174,7 @@ class AboutWindow(QtWidgets.QWidget):
self.updateLabel.setText("Connection error.")
return
logger.info("Latest version is %s", latest_version.version_string)
logger.info("Latest version is %s", latest_version)
this_version = Version(self.app.version)
logger.info("This is %s", this_version)
if latest_version > this_version:
@ -183,9 +183,9 @@ class AboutWindow(QtWidgets.QWidget):
QtWidgets.QMessageBox.information(
self,
"Updates available",
"There is a new update for NanoVNA-Saver available!\n" +
"Version " + latest_version.version_string + "\n\n" +
"Press \"About\" to find the update.")
f"There is a new update for NanoVNA-Saver available!\n"
f"Version {latest_version}\n\n"
f'Press "About" to find the update.')
else:
QtWidgets.QMessageBox.information(
self, "Updates available",

Wyświetl plik

@ -22,8 +22,7 @@ from PyQt5 import QtWidgets, QtCore
from NanoVNASaver.Analysis import Analysis, LowPassAnalysis, HighPassAnalysis, \
BandPassAnalysis, BandStopAnalysis, VSWRAnalysis, \
SimplePeakSearchAnalysis
from NanoVNASaver.Analysis.AntennaAnalysis import MagLoopAnalysis
SimplePeakSearchAnalysis, MagLoopAnalysis
logger = logging.getLogger(__name__)

Wyświetl plik

@ -232,9 +232,9 @@ class CalibrationWindow(QtWidgets.QWidget):
if name in ("through", "isolation"):
self.app.calibration.insert(name, self.app.data21)
else:
self.app.calibration.insert(name, self.app.data)
self.app.calibration.insert(name, self.app.data11)
self.cal_label[name].setText(
_format_cal_label(len(self.app.data)))
_format_cal_label(len(self.app.data11)))
def manual_save(self, name: str):
if self.checkExpertUser():
@ -452,7 +452,7 @@ class CalibrationWindow(QtWidgets.QWidget):
self.app.worker.signals.updated.emit()
def calculate(self):
if self.app.btnStopSweep.isEnabled():
if self.app.sweep_control.btn_stop.isEnabled():
# Currently sweeping
self.app.showError("Unable to apply calibration while a sweep is running. " +
"Please stop the sweep and try again.")
@ -633,7 +633,7 @@ class CalibrationWindow(QtWidgets.QWidget):
self.btn_automatic.setDisabled(False)
return
logger.info("Starting automatic calibration assistant.")
if not self.app.serial.is_open:
if not self.app.vna.connected():
QtWidgets.QMessageBox(
QtWidgets.QMessageBox.Information,
"NanoVNA not connected",
@ -666,7 +666,7 @@ class CalibrationWindow(QtWidgets.QWidget):
self.app.calibration.source = "Calibration assistant"
self.nextStep = 0
self.app.worker.signals.finished.connect(self.automaticCalibrationStep)
self.app.sweep()
self.app.sweep_start()
return
def automaticCalibrationStep(self):
@ -695,7 +695,7 @@ class CalibrationWindow(QtWidgets.QWidget):
self.app.worker.signals.finished.disconnect(
self.automaticCalibrationStep)
return
self.app.sweep()
self.app.sweep_start()
return
if self.nextStep == 1:
@ -716,7 +716,7 @@ class CalibrationWindow(QtWidgets.QWidget):
self.app.worker.signals.finished.disconnect(
self.automaticCalibrationStep)
return
self.app.sweep()
self.app.sweep_start()
return
if self.nextStep == 2:
@ -760,7 +760,7 @@ class CalibrationWindow(QtWidgets.QWidget):
self.app.worker.signals.finished.disconnect(
self.automaticCalibrationStep)
return
self.app.sweep()
self.app.sweep_start()
return
if self.nextStep == 3:
@ -782,7 +782,7 @@ class CalibrationWindow(QtWidgets.QWidget):
self.app.worker.signals.finished.disconnect(
self.automaticCalibrationStep)
return
self.app.sweep()
self.app.sweep_start()
return
if self.nextStep == 4:

Wyświetl plik

@ -35,6 +35,12 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
QtWidgets.QShortcut(QtCore.Qt.Key_Escape, self, self.hide)
self.label = {
"status": QtWidgets.QLabel("Not connected."),
"firmware": QtWidgets.QLabel("Not connected."),
"calibration": QtWidgets.QLabel("Not connected."),
}
top_layout = QtWidgets.QHBoxLayout()
left_layout = QtWidgets.QVBoxLayout()
right_layout = QtWidgets.QVBoxLayout()
@ -44,13 +50,13 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
status_box = QtWidgets.QGroupBox("Status")
status_layout = QtWidgets.QFormLayout(status_box)
self.statusLabel = QtWidgets.QLabel("Not connected.")
status_layout.addRow("Status:", self.statusLabel)
self.calibrationStatusLabel = QtWidgets.QLabel("Not connected.")
status_layout.addRow("Calibration:", self.calibrationStatusLabel)
status_layout.addRow("Status:", self.label["status"])
status_layout.addRow("Firmware:", self.label["firmware"])
status_layout.addRow("Calibration:", self.label["calibration"])
status_layout.addRow(QtWidgets.QLabel("Features:"))
self.featureList = QtWidgets.QListWidget()
status_layout.addRow(self.featureList)
@ -58,7 +64,7 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
settings_layout = QtWidgets.QFormLayout(settings_box)
self.chkValidateInputData = QtWidgets.QCheckBox("Validate received data")
validate_input = self.app.settings.value("SerialInputValidation", True, bool)
validate_input = self.app.settings.value("SerialInputValidation", False, bool)
self.chkValidateInputData.setChecked(validate_input)
self.chkValidateInputData.stateChanged.connect(self.updateValidation)
settings_layout.addRow("Validation", self.chkValidateInputData)
@ -79,8 +85,16 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
self.datapoints = QtWidgets.QComboBox()
self.datapoints.addItem(str(self.app.vna.datapoints))
self.datapoints.currentIndexChanged.connect(self.updateNrDatapoints)
self.datapoints.currentIndexChanged.connect(
self.app.sweep_control.update_step_size)
self.bandwidth = QtWidgets.QComboBox()
self.bandwidth.addItem(str(self.app.vna.bandwidth))
self.bandwidth.currentIndexChanged.connect(self.updateBandwidth)
form_layout = QtWidgets.QFormLayout()
form_layout.addRow(QtWidgets.QLabel("Datapoints"), self.datapoints)
form_layout.addRow(QtWidgets.QLabel("Bandwidth"), self.bandwidth)
right_layout.addWidget(settings_box)
settings_layout.addRow(form_layout)
@ -88,38 +102,57 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
self.datapoints.setCurrentIndex(
self.datapoints.findText(str(dpoints)))
def _set_bandwidth_index(self, bw: int):
self.bandwidth.setCurrentIndex(
self.bandwidth.findText(str(bw)))
def show(self):
super().show()
self.updateFields()
def updateFields(self):
if self.app.vna.isValid():
self.statusLabel.setText("Connected to " + self.app.vna.name + ".")
if self.app.worker.running:
self.calibrationStatusLabel.setText("(Sweep running)")
else:
self.calibrationStatusLabel.setText(self.app.vna.getCalibration())
if not self.app.vna.connected():
self.label["status"].setText("Not connected.")
self.label["firmware"].setText("Not connected.")
self.label["calibration"].setText("Not connected.")
self.featureList.clear()
self.featureList.addItem(self.app.vna.name + " v" + str(self.app.vna.version))
features = self.app.vna.getFeatures()
for item in features:
self.featureList.addItem(item)
self.btnCaptureScreenshot.setDisabled("Screenshots" not in features)
if "Customizable data points" in features:
self.datapoints.clear()
cur_dps = self.app.vna.datapoints
dplist = self.app.vna._datapoints[:]
for d in sorted(dplist):
self.datapoints.addItem(str(d))
self._set_datapoint_index(cur_dps)
else:
self.statusLabel.setText("Not connected.")
self.calibrationStatusLabel.setText("Not connected.")
self.featureList.clear()
self.featureList.addItem("Not connected.")
self.btnCaptureScreenshot.setDisabled(True)
return
self.label["status"].setText(
f"Connected to {self.app.vna.name}.")
self.label["firmware"].setText(
f"{self.app.vna.name} v{self.app.vna.version}")
if self.app.worker.running:
self.label["calibration"].setText("(Sweep running)")
else:
self.label["calibration"].setText(self.app.vna.getCalibration())
self.featureList.clear()
features = self.app.vna.getFeatures()
for item in features:
self.featureList.addItem(item)
self.btnCaptureScreenshot.setDisabled("Screenshots" not in features)
if "Customizable data points" in features:
self.datapoints.clear()
cur_dps = self.app.vna.datapoints
for d in sorted(self.app.vna.valid_datapoints):
self.datapoints.addItem(str(d))
self._set_datapoint_index(cur_dps)
self.datapoints.setDisabled(False)
else:
self.datapoints.setDisabled(True)
if "Bandwidth" in features:
self.bandwidth.clear()
cur_bw = self.app.vna.bandwidth
for d in sorted(self.app.vna.get_bandwidths()):
self.bandwidth.addItem(str(d))
self._set_bandwidth_index(cur_bw)
self.bandwidth.setDisabled(False)
else:
self.bandwidth.setDisabled(True)
def updateValidation(self, validate_data: bool):
self.app.vna.validateInput = validate_data
@ -139,3 +172,9 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
return
logger.debug("DP: %s", self.datapoints.itemText(i))
self.app.vna.datapoints = int(self.datapoints.itemText(i))
def updateBandwidth(self, i):
if i < 0 or self.app.worker.running:
return
logger.debug("Bandwidth: %s", self.bandwidth.itemText(i))
self.app.vna.set_bandwidth(int(self.bandwidth.itemText(i)))

Wyświetl plik

@ -77,7 +77,7 @@ class SweepSettingsWindow(QtWidgets.QWidget):
"Averaging allows discarding outlying samples to get better averages."))
settings_layout.addRow(
QtWidgets.QLabel("Common values are 3/0, 5/2, 9/4 and 25/6."))
self.s21att = QtWidgets.QLineEdit("0")
settings_layout.addRow(QtWidgets.QLabel(""))
@ -158,20 +158,16 @@ class SweepSettingsWindow(QtWidgets.QWidget):
f" to {format_frequency_short(stop)}")
def setS21Attenuator(self):
try:
s21att = float(self.s21att.text())
except:
except ValueError:
s21att = 0
if (s21att < 0):
if s21att < 0:
logger.warning("Values for attenuator are absolute and with no minus sign, resetting.")
self.s21att.setText("0")
else:
logger.info("Setting an attenuator of %.2f dB inline with the CH1/S21 input", s21att)
self.app.s21att = s21att
self.app.s21att = 0
logger.info("Setting an attenuator of %.2f dB inline with the CH1/S21 input", s21att)
self.app.s21att = s21att
def setBandSweep(self):
index_start = self.band_list.model().index(self.band_list.currentIndex(), 1)
@ -194,9 +190,12 @@ class SweepSettingsWindow(QtWidgets.QWidget):
start = max(1, start)
stop += round(span * padding / 100)
self.app.sweepStartInput.setText(format_frequency_sweep(start))
self.app.sweepEndInput.setText(format_frequency_sweep(stop))
self.app.sweepEndInput.textEdited.emit(self.app.sweepEndInput.text())
self.app.sweep_control.input_start.setText(
format_frequency_sweep(start))
self.app.sweep_control.input_end.setText(
format_frequency_sweep(stop))
self.app.sweep_control.input_end.textEdited.emit(
self.app.sweep_control.input_end.text())
def updateAveraging(self):
self.app.worker.setAveraging(self.averaged_sweep_radiobutton.isChecked(),

Wyświetl plik

@ -107,7 +107,7 @@ class TDRWindow(QtWidgets.QWidget):
# TODO: Let the user select whether to use high or low resolution TDR?
FFT_POINTS = 2**14
if len(self.app.data) < 2:
if len(self.app.data11) < 2:
return
if self.tdr_velocity_dropdown.currentData() == -1:
@ -121,17 +121,17 @@ class TDRWindow(QtWidgets.QWidget):
except ValueError:
return
step_size = self.app.data[1].freq - self.app.data[0].freq
step_size = self.app.data11[1].freq - self.app.data11[0].freq
if step_size == 0:
self.tdr_result_label.setText("")
logger.info("Cannot compute cable length at 0 span")
return
s11 = []
for d in self.app.data:
for d in self.app.data11:
s11.append(np.complex(d.re, d.im))
window = np.blackman(len(self.app.data))
window = np.blackman(len(self.app.data11))
windowed_s11 = window * s11
self.td = np.abs(np.fft.ifft(windowed_s11, FFT_POINTS))

Wyświetl plik

@ -79,8 +79,10 @@ def main():
app = QtWidgets.QApplication(sys.argv)
window = NanoVNASaver()
window.show()
app.exec_()
try:
app.exec_()
except BaseException as exc:
logger.exception("%s", exc)
if __name__ == '__main__':
main()

Wyświetl plik

@ -16,6 +16,23 @@ points, and generally display and analyze the resulting data.
# Latest Changes
## Changes in v0.3.6
- Added bandwidth setting in device manage dialog
## Changes in v0.3.5
- Sweep worker now initializes full dataset on setting changes.
Therefore no resize of charts when doing multi segment sweep
- Changing datapoints in DeviceSettings are reflected in SweepSettings widget step size
- Simplified calibration code by just using scipy.interp1d with fill\_value
- Established Interface class to ease locking and allow non usb connections in future
- Cleaned up VNA code. Added some pause statements to get more robust readings
- Added MagLoopAnalysis
- Touchstone class can now generate interpolated Datapoints for a given frequency
Will be usefull in future analysis code
- Fixed a bug in Version comparison
## Changes in v0.3.4
- Refactored Analysis
- Add Antenna Analysis

Wyświetl plik

@ -69,12 +69,20 @@ class TestTouchstoneTouchstone(unittest.TestCase):
ts = Touchstone("./test/data/valid.s2p")
ts.load()
ts.gen_interpolation()
self.assertEqual(str(ts.opts), "# HZ S RI R 50")
self.assertEqual(len(ts.s11data), 1020)
self.assertEqual(len(ts.s21data), 1020)
self.assertEqual(len(ts.s12data), 1020)
self.assertEqual(len(ts.s22data), 1020)
self.assertIn("! Vector Network Analyzer VNA R2", ts.comments)
self.assertEqual(ts.min_freq(), 500000)
self.assertEqual(ts.max_freq(), 900000000)
self.assertEqual(ts.s_freq("11", 1),
Datapoint(1, -3.33238E-001, 1.80018E-004))
self.assertEqual(ts.s_freq("11", 750000),
Datapoint(750000, -0.3331754099382822,
0.00032433255669243524))
ts = Touchstone("./test/data/ma.s2p")
ts.load()
@ -119,9 +127,13 @@ class TestTouchstoneTouchstone(unittest.TestCase):
with self.assertLogs(level=logging.WARNING) as cm:
ts.load()
self.assertEqual(cm.output, [
'WARNING:NanoVNASaver.Touchstone:Non integer resistance value: 50.0',
'WARNING:NanoVNASaver.Touchstone:Comment after header: !freq ReS11 ImS11 ReS21 ImS21 ReS12 ImS12 ReS22 ImS22',
'WARNING:NanoVNASaver.Touchstone:Frequency not ascending: 15000000.0 0.849810063 -0.4147357 -0.000306106 0.0041482 0.0 0.0 0.0 0.0',
'WARNING:NanoVNASaver.Touchstone:'
'Non integer resistance value: 50.0',
'WARNING:NanoVNASaver.Touchstone:Comment after header:'
' !freq ReS11 ImS11 ReS21 ImS21 ReS12 ImS12 ReS22 ImS22',
'WARNING:NanoVNASaver.Touchstone:Frequency not ascending:'
' 15000000.0 0.849810063 -0.4147357 -0.000306106 0.0041482'
' 0.0 0.0 0.0 0.0',
'WARNING:NanoVNASaver.Touchstone:Reordering data',
])
self.assertEqual(str(ts.opts), "# HZ S RI R 50")
@ -129,6 +141,23 @@ class TestTouchstoneTouchstone(unittest.TestCase):
self.assertIn("!freq ReS11 ImS11 ReS21 ImS21 ReS12 ImS12 ReS22 ImS22",
ts.comments)
def test_setter(self):
ts = Touchstone("")
dp_list = [Datapoint(1, 0.0, 0.0), Datapoint(3, 1.0, 1.0)]
ts.s11data = dp_list[:]
ts.s21data = dp_list[:]
ts.s12data = dp_list[:]
ts.s22data = dp_list[:]
self.assertEqual(ts.s11data, dp_list)
self.assertEqual(ts.s21data, dp_list)
self.assertEqual(ts.s12data, dp_list)
self.assertEqual(ts.s22data, dp_list)
self.assertEqual(ts.min_freq(), 1)
self.assertEqual(ts.max_freq(), 3)
ts.gen_interpolation()
self.assertEqual(ts.s_freq("11", 2), Datapoint(2, 0.5, 0.5))
def test_save(self):
ts = Touchstone("./test/data/valid.s2p")
self.assertEqual(ts.saves(), "# HZ S RI R 50\n")
@ -141,8 +170,12 @@ class TestTouchstoneTouchstone(unittest.TestCase):
lines = ts.saves(4).splitlines()
self.assertEqual(len(lines), 1021)
self.assertEqual(lines[0], "# HZ S RI R 50")
self.assertEqual(lines[1], '500000 -0.333238 0.000180018 0.67478 -8.1951e-07 0.67529 -8.20129e-07 -0.333238 0.000308078')
self.assertEqual(lines[-1], '900000000 -0.127646 0.31969 0.596287 -0.503453 0.599076 -0.50197 -0.122713 0.326965')
self.assertEqual(lines[1],
'500000 -0.333238 0.000180018 0.67478 -8.1951e-07'
' 0.67529 -8.20129e-07 -0.333238 0.000308078')
self.assertEqual(lines[-1],
'900000000 -0.127646 0.31969 0.596287 -0.503453'
' 0.599076 -0.50197 -0.122713 0.326965')
ts.filename = "./test/data/output.s2p"
ts.save(4)
os.remove(ts.filename)