diff --git a/NanoVNASaver/Hardware/AVNA.py b/NanoVNASaver/Hardware/AVNA.py
index b6cac6e..75947e3 100644
--- a/NanoVNASaver/Hardware/AVNA.py
+++ b/NanoVNASaver/Hardware/AVNA.py
@@ -20,8 +20,7 @@ import logging
from time import sleep
from typing import List
-import serial
-
+from NanoVNASaver.Hardware.Serial import Interface
from NanoVNASaver.Hardware.VNA import VNA, Version
logger = logging.getLogger(__name__)
@@ -30,66 +29,18 @@ logger = logging.getLogger(__name__)
class AVNA(VNA):
name = "AVNA"
- def __init__(self, app, serial_port):
- super().__init__(app, serial_port)
+ def __init__(self, iface: Interface):
+ super().__init__(iface)
self.version = Version(self.readVersion())
self.features.add("Customizable data points")
def isValid(self):
return True
- def getCalibration(self) -> str:
- logger.debug("Reading calibration info.")
- if not self.serial.is_open:
- return "Not connected."
- with self.app.serialLock:
- try:
- data = "a"
- while data != "":
- data = self.serial.readline().decode('ascii')
- self.serial.write("cal\r".encode('ascii'))
- result = ""
- data = ""
- sleep(0.1)
- while "ch>" not in data:
- data = self.serial.readline().decode('ascii')
- result += data
- values = result.splitlines()
- return values[1]
- except serial.SerialException as exc:
- logger.exception("Exception while reading calibration info: %s", exc)
- return "Unknown"
-
- def readFrequencies(self) -> List[str]:
- return self.readValues("frequencies")
-
def resetSweep(self, start: int, stop: int):
- self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
+ self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
self.writeSerial("resume")
- def readVersion(self):
- logger.debug("Reading version info.")
- if not self.serial.is_open:
- return
- with self.app.serialLock:
- try:
- data = "a"
- while data != "":
- data = self.serial.readline().decode('ascii')
- self.serial.write("version\r".encode('ascii'))
- result = ""
- data = ""
- sleep(0.1)
- while "ch>" not in data:
- data = self.serial.readline().decode('ascii')
- result += data
- values = result.splitlines()
- logger.debug("Found version info: %s", values[1])
- return values[1]
- except serial.SerialException as exc:
- logger.exception("Exception while reading firmware version: %s", exc)
- return ""
-
def setSweep(self, start, stop):
- self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
+ self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
sleep(1)
diff --git a/NanoVNASaver/Hardware/Hardware.py b/NanoVNASaver/Hardware/Hardware.py
index 996c3de..d4b1304 100644
--- a/NanoVNASaver/Hardware/Hardware.py
+++ b/NanoVNASaver/Hardware/Hardware.py
@@ -18,8 +18,8 @@
# along with this program. If not, see .
import logging
import platform
-from typing import List, Tuple
from collections import namedtuple
+from typing import List
import serial
from serial.tools import list_ports
@@ -30,16 +30,16 @@ from NanoVNASaver.Hardware.NanoVNA_F import NanoVNA_F
from NanoVNASaver.Hardware.NanoVNA_H import NanoVNA_H, NanoVNA_H4
from NanoVNASaver.Hardware.NanoVNA import NanoVNA
from NanoVNASaver.Hardware.NanoVNA_V2 import NanoVNAV2
-from NanoVNASaver.Hardware.Serial import drain_serial
+from NanoVNASaver.Hardware.Serial import drain_serial, Interface
logger = logging.getLogger(__name__)
-Device = namedtuple("Device", "vid pid name")
+USBDevice = namedtuple("Device", "vid pid name")
-DEVICETYPES = (
- Device(0x0483, 0x5740, "NanoVNA"),
- Device(0x16c0, 0x0483, "AVNA"),
- Device(0x04b4, 0x0008, "NanaVNA-V2"),
+USBDEVICETYPES = (
+ USBDevice(0x0483, 0x5740, "NanoVNA"),
+ USBDevice(0x16c0, 0x0483, "AVNA"),
+ USBDevice(0x04b4, 0x0008, "S-A-A-2"),
)
RETRIES = 3
TIMEOUT = 0.2
@@ -56,60 +56,63 @@ def _fix_v2_hwinfo(dev):
# Get list of interfaces with VNAs connected
-def get_interfaces() -> List[Tuple[str, str]]:
- return_ports = []
+def get_interfaces() -> List[Interface]:
+ interfaces = []
+ # serial like usb interfaces
for d in list_ports.comports():
if platform.system() == 'Windows' and d.vid is None:
d = _fix_v2_hwinfo(d)
- for t in DEVICETYPES:
- if d.vid == t.vid and d.pid == t.pid:
- port = d.device
- logger.info("Found %s (%04x %04x) on port %s",
- t.name, d.vid, d.pid, d.device)
- return_ports.append((port, f"{port} ({t.name})"))
- return return_ports
+ for t in USBDEVICETYPES:
+ if d.vid != t.vid or d.pid != t.pid:
+ continue
+ logger.debug("Found %s USB:(%04x:%04x) on port %s",
+ t.name, d.vid, d.pid, d.device)
+ iface = Interface('serial', t.name)
+ iface.port = d.device
+ interfaces.append(iface)
+ return interfaces
-def get_VNA(app, serial_port: serial.Serial) -> 'VNA':
- serial_port.timeout = TIMEOUT
+def get_VNA(iface: Interface) -> 'VNA':
+ # serial_port.timeout = TIMEOUT
logger.info("Finding correct VNA type...")
- with app.serialLock:
- vna_version = detect_version(serial_port)
+ with iface.lock:
+ vna_version = detect_version(iface)
if vna_version == 'v2':
logger.info("Type: NanoVNA-V2")
- return NanoVNAV2(app, serial_port)
+ return NanoVNAV2(iface)
logger.info("Finding firmware variant...")
- tmp_vna = VNA(app, serial_port)
+ tmp_vna = VNA(iface)
tmp_vna.flushSerialBuffers()
firmware = tmp_vna.readFirmware()
if firmware.find("AVNA + Teensy") > 0:
logger.info("Type: AVNA")
- return AVNA(app, serial_port)
+ return AVNA(iface)
if firmware.find("NanoVNA-H 4") > 0:
logger.info("Type: NanoVNA-H4")
- vna = NanoVNA_H4(app, serial_port)
+ vna = NanoVNA_H4(iface)
if vna.readFirmware().find("sweep_points 201") > 0:
logger.info("VNA has 201 datapoints capability")
- vna._datapoints = (201, 101)
+ vna.datapoints = (201, 101)
return vna
if firmware.find("NanoVNA-H") > 0:
logger.info("Type: NanoVNA-H")
- vna = NanoVNA_H(app, serial_port)
+ vna = NanoVNA_H(iface)
if vna.readFirmware().find("sweep_points 201") > 0:
logger.info("VNA has 201 datapoints capability")
- vna._datapoints = (201, 101)
+ vna.datapoints = (201, 101)
return vna
if firmware.find("NanoVNA-F") > 0:
logger.info("Type: NanoVNA-F")
- return NanoVNA_F(app, serial_port)
+ return NanoVNA_F(iface)
if firmware.find("NanoVNA") > 0:
logger.info("Type: Generic NanoVNA")
- return NanoVNA(app, serial_port)
+ return NanoVNA(iface)
logger.warning("Did not recognize NanoVNA type from firmware.")
- return NanoVNA(app, serial_port)
+ return NanoVNA(iface)
def detect_version(serial_port: serial.Serial) -> str:
diff --git a/NanoVNASaver/Hardware/NanoVNA.py b/NanoVNASaver/Hardware/NanoVNA.py
index e3b715e..1f4a871 100644
--- a/NanoVNASaver/Hardware/NanoVNA.py
+++ b/NanoVNASaver/Hardware/NanoVNA.py
@@ -25,7 +25,7 @@ import serial
import numpy as np
from PyQt5 import QtGui
-from NanoVNASaver.Hardware.Serial import drain_serial
+from NanoVNASaver.Hardware.Serial import drain_serial, Interface
from NanoVNASaver.Hardware.VNA import VNA, Version
logger = logging.getLogger(__name__)
@@ -36,70 +36,53 @@ class NanoVNA(VNA):
screenwidth = 320
screenheight = 240
- def __init__(self, app, serial_port):
- super().__init__(app, serial_port)
- self.version = Version(self.readVersion())
+ def __init__(self, iface: Interface):
+ super().__init__(iface)
+ version_string = self.readVersion()
+ self.version = Version(version_string)
+ self.sweep_method = "sweep"
+ self.start = 27000000
+ self.stop = 30000000
+ self._sweepdata = []
- logger.debug("Testing against 0.2.0")
- if self.version.version_string.find("extended with scan") > 0:
- logger.debug("Incompatible scan command detected.")
- self.features.add("Incompatible scan command")
- self.useScan = False
+ if self.version >= Version("0.7.1"):
+ self.features.add("Scan mask command")
+ self.sweep_method = "scan_mask"
elif self.version >= Version("0.2.0"):
logger.debug("Newer than 0.2.0, using new scan command.")
- self.features.add("New scan command")
- self.useScan = True
- else:
- logger.debug("Older than 0.2.0, using old sweep command.")
- self.features.add("Original sweep method")
- self.useScan = False
+ self.features.add("Scan command")
+ self.sweep_method = "scan"
self.readFeatures()
def isValid(self):
return True
- def getCalibration(self) -> str:
- logger.debug("Reading calibration info.")
- if not self.serial.is_open:
- return "Not connected."
- with self.app.serialLock:
- try:
- drain_serial(self.serial)
- self.serial.write("cal\r".encode('ascii'))
- result = ""
- data = ""
- sleep(0.1)
- while "ch>" not in data:
- data = self.serial.readline().decode('ascii')
- result += data
- values = result.splitlines()
- return values[1]
- except serial.SerialException as exc:
- logger.exception("Exception while reading calibration info: %s", exc)
- return "Unknown"
+
+ def _capture_data(self) -> bytes:
+ with self.serial.lock:
+ drain_serial(self.serial)
+ timeout = self.serial.timeout
+ self.serial.write("capture\r".encode('ascii'))
+ self.serial.timeout = 4
+ self.serial.readline()
+ image_data = self.serial.read(
+ self.screenwidth * self.screenheight * 2)
+ self.serial.timeout = timeout
+ rgb_data = struct.unpack(
+ f">{self.screenwidth * self.screenheight}H",
+ image_data)
+ rgb_array = np.array(rgb_data, dtype=np.uint32)
+ return (0xFF000000 +
+ ((rgb_array & 0xF800) << 8) +
+ ((rgb_array & 0x07E0) << 5) +
+ ((rgb_array & 0x001F) << 3))
def getScreenshot(self) -> QtGui.QPixmap:
logger.debug("Capturing screenshot...")
- if not self.serial.is_open:
+ if not self.connected():
return QtGui.QPixmap()
try:
- with self.app.serialLock:
- drain_serial(self.serial)
- timeout = self.serial.timeout
- self.serial.write("capture\r".encode('ascii'))
- self.serial.timeout = 4
- self.serial.readline()
- image_data = self.serial.read(
- self.screenwidth * self.screenheight * 2)
- self.serial.timeout = timeout
- rgb_data = struct.unpack(
- f">{self.screenwidth * self.screenheight}H",
- image_data)
- rgb_array = np.array(rgb_data, dtype=np.uint32)
- rgba_array = (0xFF000000 +
- ((rgb_array & 0xF800) << 8) +
- ((rgb_array & 0x07E0) << 5) +
- ((rgb_array & 0x001F) << 3))
+ rgba_array = self._capture_data()
image = QtGui.QImage(
rgba_array,
self.screenwidth,
@@ -112,37 +95,61 @@ class NanoVNA(VNA):
"Exception while capturing screenshot: %s", exc)
return QtGui.QPixmap()
- def readFrequencies(self) -> List[str]:
- return self.readValues("frequencies")
-
def resetSweep(self, start: int, stop: int):
- self.writeSerial("sweep {start} {stop} {self.datapoints}")
+ self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
self.writeSerial("resume")
- def readVersion(self):
- logger.debug("Reading version info.")
- if not self.serial.is_open:
- return ""
- try:
- with self.app.serialLock:
- drain_serial(self.serial)
- self.serial.write("version\r".encode('ascii'))
- result = ""
- data = ""
- sleep(0.1)
- while "ch>" not in data:
- data = self.serial.readline().decode('ascii')
- result += data
- values = result.splitlines()
- logger.debug("Found version info: %s", values[1])
- return values[1]
- except serial.SerialException as exc:
- logger.exception("Exception while reading firmware version: %s", exc)
- return ""
-
def setSweep(self, start, stop):
- if self.useScan:
- self.writeSerial(f"scan {start} {stop} {self.datapoints}")
- else:
+ self.start = start
+ self.stop = stop
+ if self.sweep_method == "sweep":
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
- sleep(1)
+ elif self.sweep_method == "scan":
+ self.writeSerial(f"scan {start} {stop} {self.datapoints}")
+
+ def readFrequencies(self) -> List[int]:
+ if self.sweep_method != "scan_mask":
+ return super().readFrequencies()
+ step = (self.stop - self.start) / (self.datapoints - 1.0)
+ return [round(self.start + i * step) for i in range(self.datapoints)]
+
+ def readValues(self, value) -> List[str]:
+ if self.sweep_method != "scan_mask":
+ return super().readValues(value)
+ logger.debug("readValue with scan mask (%s)", value)
+ # Actually grab the data only when requesting channel 0.
+ # The hardware will return all channels which we will store.
+ if value == "data 0":
+ self._sweepdata = []
+ try:
+ with self.serial.lock:
+ drain_serial(self.serial)
+ self.serial.write(
+ (f"scan {self.start} {self.stop}"
+ f' {self.datapoints} 0b110\r'
+ ).encode("ascii"))
+ self.serial.readline()
+ logger.info("reading values")
+ retries = 0
+ while True:
+ line = self.serial.readline()
+ line = line.decode("ascii").strip()
+ if not line:
+ retries += 1
+ logger.info("Retry nr: %s", retries)
+ if retries > 10:
+ raise IOError("too many retries")
+ sleep(0.2)
+ continue
+ if line.startswith("ch>"):
+ break
+ data = line.split()
+ self._sweepdata.append((
+ f"{data[0]} {data[1]}",
+ f"{data[2]} {data[3]}"))
+ except IOError as exc:
+ logger.error("Error readValues: %s", exc)
+ if value == "data 0":
+ return [x[0] for x in self._sweepdata]
+ if value == "data 1":
+ return [x[1] for x in self._sweepdata]
diff --git a/NanoVNASaver/Hardware/NanoVNA_F.py b/NanoVNASaver/Hardware/NanoVNA_F.py
index b71634d..e3fe1cd 100644
--- a/NanoVNASaver/Hardware/NanoVNA_F.py
+++ b/NanoVNASaver/Hardware/NanoVNA_F.py
@@ -17,14 +17,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import logging
-import struct
import serial
import numpy as np
from PyQt5 import QtGui
from NanoVNASaver.Hardware.NanoVNA import NanoVNA
-from NanoVNASaver.Hardware.Serial import drain_serial
logger = logging.getLogger(__name__)
@@ -36,26 +34,10 @@ class NanoVNA_F(NanoVNA):
def getScreenshot(self) -> QtGui.QPixmap:
logger.debug("Capturing screenshot...")
- if not self.serial.is_open:
+ if not self.connected():
return QtGui.QPixmap()
try:
- with self.app.serialLock:
- drain_serial(self.serial)
- self.serial.write("capture\r".encode('ascii'))
- timeout = self.serial.timeout
- self.serial.timeout = 4
- self.serial.readline()
- image_data = self.serial.read(
- self.screenwidth * self.screenheight * 2)
- self.serial.timeout = timeout
- rgb_data = struct.unpack(
- f"<{self.screenwidth * self.screenheight}H", image_data)
- rgb_array = np.array(rgb_data, dtype=np.uint32)
- rgba_array = (0xFF000000 +
- ((rgb_array & 0xF800) << 8) + # G?!
- ((rgb_array & 0x07E0) >> 3) + # B
- ((rgb_array & 0x001F) << 11)) # G
-
+ rgba_array = self._capture_data()
unwrapped_array = np.empty(
self.screenwidth*self.screenheight,
dtype=np.uint32)
diff --git a/NanoVNASaver/Hardware/NanoVNA_V2.py b/NanoVNASaver/Hardware/NanoVNA_V2.py
index 4d651e8..bbea8e5 100644
--- a/NanoVNASaver/Hardware/NanoVNA_V2.py
+++ b/NanoVNASaver/Hardware/NanoVNA_V2.py
@@ -21,6 +21,7 @@ import platform
from struct import pack, unpack_from
from typing import List
+from NanoVNASaver.Hardware.Serial import Interface
from NanoVNASaver.Hardware.VNA import VNA, Version
if platform.system() != 'Windows':
@@ -59,14 +60,14 @@ class NanoVNAV2(VNA):
screenwidth = 320
screenheight = 240
- def __init__(self, app, serialPort):
- super().__init__(app, serialPort)
+ def __init__(self, iface: Interface):
+ super().__init__(iface)
if platform.system() != 'Windows':
tty.setraw(self.serial.fd)
# reset protocol to known state
- with self.app.serialLock:
+ with self.serial.lock:
self.serial.write(pack(" str:
+ return "Unknown"
def readFirmware(self) -> str:
# read register 0xf3 and 0xf4 (firmware major and minor version)
cmd = pack(" List[str]:
- self.checkValid()
+ def readFrequencies(self) -> List[int]:
return [
- str(int(self.sweepStartHz + i * self.sweepStepHz))
+ int(self.sweepStartHz + i * self.sweepStepHz)
for i in range(self.datapoints)]
def readValues(self, value) -> List[str]:
- self.checkValid()
-
# Actually grab the data only when requesting channel 0.
# The hardware will return all channels which we will store.
if value == "data 0":
# reset protocol to known state
- with self.app.serialLock:
+ with self.serial.lock:
self.serial.timeout = 8 # should be enough
self.serial.write(pack(".
+import logging
+from threading import Lock
+
import serial
+logger = logging.getLogger(__name__)
+
def drain_serial(serial_port: serial.Serial):
"""drain up to 10k outstanding data in the serial incoming buffer"""
for _ in range(80):
if len(serial_port.read(128)) == 0:
break
+
+class Interface(serial.Serial):
+ def __init__(self, interface_type: str, comment, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ assert interface_type in ('serial', 'usb', 'bt', 'network')
+ self.type = interface_type
+ self.comment = comment
+ self.port = None
+ self.baudrate = 115200
+ self.lock = Lock()
+
+ def __str__(self):
+ return f"{self.port} ({self.comment})"
diff --git a/NanoVNASaver/Hardware/VNA.py b/NanoVNASaver/Hardware/VNA.py
index 3d61254..d5d8e60 100644
--- a/NanoVNASaver/Hardware/VNA.py
+++ b/NanoVNASaver/Hardware/VNA.py
@@ -21,10 +21,10 @@ from time import sleep
from typing import List
import serial
-from PyQt5 import QtWidgets, QtGui
+from PyQt5 import QtGui
from NanoVNASaver.Settings import Version
-from NanoVNASaver.Hardware.Serial import drain_serial
+from NanoVNASaver.Hardware.Serial import Interface, drain_serial
logger = logging.getLogger(__name__)
@@ -33,10 +33,9 @@ class VNA:
name = "VNA"
_datapoints = (101, )
- def __init__(self, app: QtWidgets.QWidget, serial_port: serial.Serial):
- self.app = app
- self.serial = serial_port
- self.version: Version = Version("0.0.0")
+ def __init__(self, iface: Interface):
+ self.serial = iface
+ self.version = Version("0.0.0")
self.features = set()
self.validateInput = True
self.datapoints = VNA._datapoints[0]
@@ -54,9 +53,8 @@ class VNA:
return self.features
- # TODO: check return types
def readFrequencies(self) -> List[int]:
- return []
+ return [int(f) for f in self.readValues("frequencies")]
def resetSweep(self, start: int, stop: int):
pass
@@ -64,20 +62,37 @@ class VNA:
def isValid(self):
return False
- def isDFU(self):
- return False
+ def connected(self) -> bool:
+ return self.serial.is_open
def getFeatures(self) -> List[str]:
return self.features
def getCalibration(self) -> str:
+ logger.debug("Reading calibration info.")
+ if not self.connected():
+ return "Not connected."
+ with self.serial.lock:
+ try:
+ drain_serial(self.serial)
+ self.serial.write("cal\r".encode('ascii'))
+ result = ""
+ data = ""
+ sleep(0.1)
+ while "ch>" not in data:
+ data = self.serial.readline().decode('ascii')
+ result += data
+ values = result.splitlines()
+ return values[1]
+ except serial.SerialException as exc:
+ logger.exception("Exception while reading calibration info: %s", exc)
return "Unknown"
def getScreenshot(self) -> QtGui.QPixmap:
return QtGui.QPixmap()
def flushSerialBuffers(self):
- with self.app.serialLock:
+ with self.serial.lock:
self.serial.write("\r\n\r\n".encode("ascii"))
sleep(0.1)
self.serial.reset_input_buffer()
@@ -86,7 +101,7 @@ class VNA:
def readFirmware(self) -> str:
try:
- with self.app.serialLock:
+ with self.serial.lock:
drain_serial(self.serial)
self.serial.write("info\r".encode('ascii'))
result = ""
@@ -103,7 +118,7 @@ class VNA:
def readFromCommand(self, command) -> str:
try:
- with self.app.serialLock:
+ with self.serial.lock:
drain_serial(self.serial)
self.serial.write(f"{command}\r".encode('ascii'))
result = ""
@@ -121,7 +136,7 @@ class VNA:
def readValues(self, value) -> List[str]:
logger.debug("VNA reading %s", value)
try:
- with self.app.serialLock:
+ with self.serial.lock:
drain_serial(self.serial)
self.serial.write(f"{value}\r".encode('ascii'))
result = ""
@@ -140,12 +155,33 @@ class VNA:
"Exception while reading %s: %s", value, exc)
return []
+ def readVersion(self) -> str:
+ logger.debug("Reading version info.")
+ if not self.connected():
+ return ""
+ try:
+ with self.serial.lock:
+ drain_serial(self.serial)
+ self.serial.write("version\r".encode('ascii'))
+ result = ""
+ data = ""
+ sleep(0.1)
+ while "ch>" not in data:
+ data = self.serial.readline().decode('ascii')
+ result += data
+ values = result.splitlines()
+ logger.debug("Found version info: %s", values[1])
+ return values[1]
+ except serial.SerialException as exc:
+ logger.exception("Exception while reading firmware version: %s", exc)
+ return ""
+
def writeSerial(self, command):
- if not self.serial.is_open:
+ if not self.connected():
logger.warning("Writing without serial port being opened (%s)",
command)
return
- with self.app.serialLock:
+ with self.serial.lock:
try:
self.serial.write(f"{command}\r".encode('ascii'))
self.serial.readline()
@@ -156,31 +192,3 @@ class VNA:
def setSweep(self, start, stop):
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
-
-
-# TODO: should be dropped and the serial part should be a connection class
-# which handles unconnected devices
-class InvalidVNA(VNA):
- name = "Invalid"
- _datapoints = (0, )
-
- def setSweep(self, start, stop):
- return
-
- def resetSweep(self, start, stop):
- return
-
- def writeSerial(self, command):
- return
-
- def readFirmware(self):
- return
-
- def readFrequencies(self) -> List[int]:
- return []
-
- def readValues(self, value):
- return
-
- def flushSerialBuffers(self):
- return
diff --git a/NanoVNASaver/NanoVNASaver.py b/NanoVNASaver/NanoVNASaver.py
index e101e29..3cb190f 100644
--- a/NanoVNASaver/NanoVNASaver.py
+++ b/NanoVNASaver/NanoVNASaver.py
@@ -24,7 +24,6 @@ from collections import OrderedDict
from time import sleep, strftime, localtime
from typing import List
-import serial
from PyQt5 import QtWidgets, QtCore, QtGui
from .Windows import (
@@ -36,8 +35,8 @@ from .Formatting import (
format_frequency, format_frequency_short, format_frequency_sweep,
parse_frequency,
)
-from .Hardware.Hardware import get_interfaces, get_VNA
-from .Hardware.VNA import InvalidVNA
+from .Hardware.Hardware import Interface, get_interfaces, get_VNA
+from .Hardware.VNA import VNA
from .RFTools import Datapoint, corr_att_data
from .Charts.Chart import Chart
from .Charts import (
@@ -92,9 +91,8 @@ class NanoVNASaver(QtWidgets.QWidget):
self.noSweeps = 1 # Number of sweeps to run
- self.serialLock = threading.Lock()
- self.serial = serial.Serial()
- self.vna = InvalidVNA(self, serial)
+ self.interface = Interface("serial", "None")
+ self.vna = VNA(self.interface)
self.dataLock = threading.Lock()
# TODO: use Touchstone class as data container
@@ -110,8 +108,6 @@ class NanoVNASaver(QtWidgets.QWidget):
self.markers = []
- self.serialPort = ""
-
logger.debug("Building user interface")
self.baseTitle = f"NanoVNA Saver {NanoVNASaver.version}"
@@ -456,7 +452,7 @@ class NanoVNASaver(QtWidgets.QWidget):
self.rescanSerialPort()
self.serialPortInput.setEditable(True)
btn_rescan_serial_port = QtWidgets.QPushButton("Rescan")
- btn_rescan_serial_port.setFixedWidth(60)
+ btn_rescan_serial_port.setFixedWidth(65)
btn_rescan_serial_port.clicked.connect(self.rescanSerialPort)
serial_port_input_layout = QtWidgets.QHBoxLayout()
serial_port_input_layout.addWidget(self.serialPortInput)
@@ -471,6 +467,7 @@ class NanoVNASaver(QtWidgets.QWidget):
serial_button_layout.addWidget(self.btnSerialToggle, stretch=1)
self.btnDeviceSettings = QtWidgets.QPushButton("Manage")
+ self.btnDeviceSettings.setFixedWidth(65)
self.btnDeviceSettings.clicked.connect(
lambda: self.display_window("device_settings"))
serial_button_layout.addWidget(self.btnDeviceSettings, stretch=0)
@@ -555,8 +552,8 @@ class NanoVNASaver(QtWidgets.QWidget):
def rescanSerialPort(self):
self.serialPortInput.clear()
- for port in get_interfaces():
- self.serialPortInput.insertItem(1, port[1], port[0])
+ for iface in get_interfaces():
+ self.serialPortInput.insertItem(1, f"{iface}", iface)
def exportFile(self, nr_params: int = 1):
if len(self.data) == 0:
@@ -600,70 +597,63 @@ class NanoVNASaver(QtWidgets.QWidget):
return
def serialButtonClick(self):
- if self.serial.is_open:
- self.stopSerial()
+ if not self.vna.connected():
+ self.connect_device()
else:
- self.startSerial()
+ self.disconnect_device()
return
- def startSerial(self):
- with self.serialLock:
- self.serialPort = self.serialPortInput.currentData()
- if self.serialPort == "":
- self.serialPort = self.serialPortInput.currentText()
- logger.info("Opening serial port %s", self.serialPort)
+ def connect_device(self):
+ with self.interface.lock:
+ self.interface = self.serialPortInput.currentData()
+ logger.info("Connection %s", self.interface)
try:
- self.serial = serial.Serial(port=self.serialPort, baudrate=115200)
- self.serial.timeout = 0.05
- except serial.SerialException as exc:
- logger.error("Tried to open %s and failed: %s", self.serialPort, exc)
+ self.interface.open()
+ self.interface.timeout = 0.05
+ except (IOError, AttributeError) as exc:
+ logger.error("Tried to open %s and failed: %s",
+ self.interface, exc)
return
- if not self.serial.isOpen() :
- logger.error("Unable to open port %s", self.serialPort)
+ if not self.interface.isOpen():
+ logger.error("Unable to open port %s", self.interface)
return
- self.btnSerialToggle.setText("Disconnect")
+ sleep(0.1)
+ try:
+ self.vna = get_VNA(self.interface)
+ except IOError as exc:
+ logger.error("Unable to connect to VNA: %s", exc)
- sleep(0.05)
-
- self.vna = get_VNA(self, self.serial)
self.vna.validateInput = self.settings.value("SerialInputValidation", True, bool)
self.worker.setVNA(self.vna)
- logger.info(self.vna.readFirmware())
+ # connected
+ self.btnSerialToggle.setText("Disconnect")
frequencies = self.vna.readFrequencies()
- if frequencies:
- logger.info("Read starting frequency %s and end frequency %s",
- frequencies[0], frequencies[100])
- if int(frequencies[0]) == int(frequencies[100]) and (
- self.sweepStartInput.text() == "" or
- self.sweepEndInput.text() == ""):
- self.sweepStartInput.setText(
- format_frequency_sweep(int(frequencies[0])))
- self.sweepEndInput.setText(
- format_frequency_sweep(int(frequencies[100]) + 100000))
- elif (self.sweepStartInput.text() == "" or
- self.sweepEndInput.text() == ""):
- self.sweepStartInput.setText(
- format_frequency_sweep(int(frequencies[0])))
- self.sweepEndInput.setText(
- format_frequency_sweep(int(frequencies[100])))
- self.sweepStartInput.textEdited.emit(
- self.sweepStartInput.text())
- self.sweepStartInput.textChanged.emit(
- self.sweepStartInput.text())
- else:
+ if not frequencies:
logger.warning("No frequencies read")
return
+ logger.info("Read starting frequency %s and end frequency %s",
+ frequencies[0], frequencies[-1])
+ self.sweepStartInput.setText(
+ format_frequency_sweep(frequencies[0]))
+ if frequencies[0] < frequencies[-1]:
+ self.sweepEndInput.setText(
+ format_frequency_sweep(frequencies[-1]))
+ else:
+ self.sweepEndInput.setText(
+ format_frequency_sweep(frequencies[-1] + 100000))
+ self.sweepStartInput.textEdited.emit(self.sweepStartInput.text())
+ self.sweepStartInput.textChanged.emit(self.sweepStartInput.text())
+
logger.debug("Starting initial sweep")
self.sweep()
- return
- def stopSerial(self):
- with self.serialLock:
- logger.info("Closing connection to NanoVNA")
- self.serial.close()
- self.btnSerialToggle.setText("Connect to NanoVNA")
+ def disconnect_device(self):
+ with self.interface.lock:
+ logger.info("Closing connection to %s", self.interface)
+ self.interface.close()
+ self.btnSerialToggle.setText("Connect to device")
def toggleSweepSettings(self, disabled):
self.sweepStartInput.setDisabled(disabled)
@@ -673,8 +663,8 @@ class NanoVNASaver(QtWidgets.QWidget):
self.sweepCountInput.setDisabled(disabled)
def sweep(self):
- # Run the serial port update
- if not self.serial.is_open:
+ # Run the device data update
+ if not self.vna.connected():
return
self.worker.stopped = False
diff --git a/NanoVNASaver/Settings.py b/NanoVNASaver/Settings.py
index 4bbbd98..e518f08 100644
--- a/NanoVNASaver/Settings.py
+++ b/NanoVNASaver/Settings.py
@@ -155,36 +155,35 @@ class BandsModel(QtCore.QAbstractTableModel):
class Version:
- RXP = re.compile(r"(.*\s+)?(\d+)\.(\d+)\.(\d+)(.*)")
+ RXP = re.compile(r"""^
+ \D*
+ (?P\d+)\.
+ (?P\d+)\.
+ (?P\d+)
+ (?P.*)
+ $""", re.VERBOSE)
- def __init__(self, version_string: str):
- self.major = 0
- self.minor = 0
- self.revision = 0
- self.note = ""
- self.version_string = version_string
-
- results = Version.RXP.match(version_string)
- if results:
- self.major = int(results.group(2))
- self.minor = int(results.group(3))
- self.revision = int(results.group(4))
- self.note = results.group(5)
- logger.debug(
- "Parsed version as \"%d.%d.%d%s\"",
- self.major, self.minor, self.revision, self.note)
+ def __init__(self, vstring: str = "0.0.0"):
+ self.data = {
+ "major": 0,
+ "minor": 0,
+ "revision": 0,
+ "note": "",
+ }
+ try:
+ self.data = Version.RXP.search(vstring).groupdict()
+ for name in ("major", "minor", "revision"):
+ self.data[name] = int(self.data[name])
+ except AttributeError:
+ logger.error("Unable to parse version: %s", vstring)
def __gt__(self, other: "Version") -> bool:
- if self.major > other.major:
- return True
- if self.major < other.major:
- return False
- if self.minor > other.minor:
- return True
- if self.minor < other.minor:
- return False
- if self.revision > other.revision:
- return True
+ l, r = self.data, other.data
+ for name in ("major", "minor", "revision"):
+ if l[name] > r[name]:
+ return True
+ if l[name] < r[name]:
+ return False
return False
def __lt__(self, other: "Version") -> bool:
@@ -197,11 +196,8 @@ class Version:
return self < other or self == other
def __eq__(self, other: "Version") -> bool:
- return (
- self.major == other.major and
- self.minor == other.minor and
- self.revision == other.revision and
- self.note == other.note)
+ return self.data == other.data
def __str__(self) -> str:
- return f"{self.major}.{self.minor}.{self.revision}{self.note}"
+ return (f'{self.data["major"]}.{self.data["minor"]}'
+ f'.{self.data["revision"]}{self.data["note"]}')
diff --git a/NanoVNASaver/SweepWorker.py b/NanoVNASaver/SweepWorker.py
index b0be9c1..358158c 100644
--- a/NanoVNASaver/SweepWorker.py
+++ b/NanoVNASaver/SweepWorker.py
@@ -32,6 +32,7 @@ logger = logging.getLogger(__name__)
def truncate(values: List[List[Tuple]], count: int) -> List[List[Tuple]]:
+ """truncate drops extrema from data list if averaging is active"""
keep = len(values) - count
logger.debug("Truncating from %d values to %d", len(values), keep)
if count < 1 or keep < 1:
@@ -82,7 +83,7 @@ class SweepWorker(QtCore.QRunnable):
logger.info("Initializing SweepWorker")
self.running = True
self.percentage = 0
- if not self.app.serial.is_open:
+ if not self.app.vna.connected():
logger.debug(
"Attempted to run without being connected to the NanoVNA")
self.running = False
@@ -127,82 +128,52 @@ class SweepWorker(QtCore.QRunnable):
values21 = []
frequencies = []
- if self.averaging:
- for i in range(self.noSweeps):
- logger.debug("Sweep segment no %d averaged over %d readings",
- i, self.averages)
- if self.stopped:
- logger.debug("Stopping sweeping as signalled")
- break
- start = sweep_from + i * self.vna.datapoints * stepsize
- freq, val11, val21 = self.readAveragedSegment(
- start,
- start + (self.vna.datapoints - 1) * stepsize,
- self.averages)
-
- frequencies.extend(freq)
- values11.extend(val11)
- values21.extend(val21)
-
- self.percentage = (i + 1) * (self.vna.datapoints - 1) / \
- self.noSweeps
- logger.debug("Saving acquired data")
- self.saveData(frequencies, values11, values21)
-
- else:
+ first_sweep = True
+ finished = False
+ while not finished:
for i in range(self.noSweeps):
logger.debug("Sweep segment no %d", i)
if self.stopped:
logger.debug("Stopping sweeping as signalled")
+ finished = True
break
start = sweep_from + i * self.vna.datapoints * stepsize
+
try:
- freq, val11, val21 = self.readSegment(
- start, start + (self.vna.datapoints - 1) * stepsize)
+ if self.continuousSweep and not first_sweep:
+ _, values11, values21 = self.readSegment(
+ start, start + (self.vna.datapoints-1) * stepsize)
+ self.percentage = (i + 1) * 100 / self.noSweeps
+ self.updateData(values11, values21, i,
+ self.vna.datapoints)
+ continue
+
+ if self.averaging:
+ freq, val11, val21 = self.readAveragedSegment(
+ start,
+ start + (self.vna.datapoints - 1) * stepsize,
+ self.averages)
+ else:
+ freq, val11, val21 = self.readSegment(
+ start, start + (self.vna.datapoints - 1) * stepsize)
frequencies.extend(freq)
values11.extend(val11)
values21.extend(val21)
self.percentage = (i + 1) * 100 / self.noSweeps
- logger.debug("Saving acquired data")
self.saveData(frequencies, values11, values21)
- except NanoVNAValueException as e:
+
+ except ValueError as e:
self.error_message = str(e)
self.stopped = True
self.running = False
self.signals.sweepError.emit()
- except NanoVNASerialException as e:
- self.error_message = str(e)
- self.stopped = True
- self.running = False
- self.signals.sweepFatalError.emit()
- while self.continuousSweep and not self.stopped:
- logger.debug("Continuous sweeping")
- for i in range(self.noSweeps):
- logger.debug("Sweep segment no %d", i)
- if self.stopped:
- logger.debug("Stopping sweeping as signalled")
- break
- start = sweep_from + i * self.vna.datapoints * stepsize
- try:
- _, values11, values21 = self.readSegment(
- start, start + (self.vna.datapoints-1) * stepsize)
- logger.debug("Updating acquired data")
- self.updateData(values11, values21, i, self.vna.datapoints)
- except NanoVNAValueException as e:
- self.error_message = str(e)
- self.stopped = True
- self.running = False
- self.signals.sweepError.emit()
- except NanoVNASerialException as e:
- self.error_message = str(e)
- self.stopped = True
- self.running = False
- self.signals.sweepFatalError.emit()
+ if not self.continuousSweep:
+ finished = True
+ first_sweep = False
- # Reset the device to show the full range if we were multisegment
if self.noSweeps > 1:
logger.debug("Resetting NanoVNA sweep to full range: %d to %d",
parse_frequency(
@@ -267,7 +238,7 @@ class SweepWorker(QtCore.QRunnable):
def applyCalibration(self,
raw_data11: List[Datapoint],
raw_data21: List[Datapoint]
- ) -> (List[Datapoint], List[Datapoint]):
+ ) -> Tuple[List[Datapoint], List[Datapoint]]:
if self.offsetDelay != 0:
tmp = []
for dp in raw_data11:
@@ -329,7 +300,7 @@ class SweepWorker(QtCore.QRunnable):
self.vna.setSweep(start, stop)
# Let's check the frequencies first:
- frequencies = self.readFreq()
+ frequencies = self.vna.readFrequencies()
# S11
values11 = self.readData("data 0")
# S21
@@ -356,25 +327,19 @@ class SweepWorker(QtCore.QRunnable):
a, b = d.split(" ")
try:
if self.vna.validateInput and (
- float(a) < -9.5 or float(a) > 9.5):
+ abs(float(a)) > 9.5 or
+ abs(float(b)) > 9.5):
logger.warning(
- "Got a non-float data value: %s (%s)", d, a)
- logger.debug("Re-reading %s", data)
+ "Got a non plausible data value: (%s)", d)
done = False
- elif self.vna.validateInput and (
- float(b) < -9.5 or float(b) > 9.5):
- logger.warning(
- "Got a non-float data value: %s (%s)", d, b)
- logger.debug("Re-reading %s", data)
- done = False
- else:
- returndata.append((float(a), float(b)))
- except Exception as e:
+ break
+ returndata.append((float(a), float(b)))
+ except ValueError as exc:
logger.exception("An exception occurred reading %s: %s",
- data, e)
- logger.debug("Re-reading %s", data)
+ data, exc)
done = False
if not done:
+ logger.debug("Re-reading %s", data)
sleep(0.2)
count += 1
if count == 10:
@@ -384,7 +349,7 @@ class SweepWorker(QtCore.QRunnable):
logger.critical(
"Tried and failed to read %s %d times. Giving up.",
data, count)
- raise NanoVNAValueException(
+ raise IOError(
f"Failed reading {data} {count} times.\n"
f"Data outside expected valid ranges,"
f" or in an unexpected format.\n\n"
@@ -392,40 +357,6 @@ class SweepWorker(QtCore.QRunnable):
f"device settings screen.")
return returndata
- def readFreq(self):
- # TODO: Figure out why frequencies sometimes arrive as non-integers
- logger.debug("Reading frequencies")
- returnfreq = []
- done = False
- count = 0
- while not done:
- done = True
- returnfreq = []
- tmpfreq = self.vna.readFrequencies()
- if not tmpfreq:
- logger.warning("Read no frequencies")
- raise NanoVNASerialException(
- "Failed reading frequencies: Returned no values.")
- for f in tmpfreq:
- if not f.isdigit():
- logger.warning("Got a non-digit frequency: %s", f)
- logger.debug("Re-reading frequencies")
- done = False
- count += 1
- if count == 10:
- logger.error(
- "Tried and failed %d times to read frequencies.",
- count)
- if count >= 20:
- logger.critical(
- "Tried and failed to read frequencies from the"
- " NanoVNA %d times.", count)
- raise NanoVNAValueException(
- f"Failed reading frequencies {count} times.")
- else:
- returnfreq.append(int(f))
- return returnfreq
-
def setContinuousSweep(self, continuous_sweep: bool):
self.continuousSweep = continuous_sweep
@@ -439,11 +370,3 @@ class SweepWorker(QtCore.QRunnable):
def setVNA(self, vna):
self.vna = vna
-
-
-class NanoVNAValueException(Exception):
- pass
-
-
-class NanoVNASerialException(Exception):
- pass
diff --git a/NanoVNASaver/Windows/About.py b/NanoVNASaver/Windows/About.py
index 22dbbd1..c8fb505 100644
--- a/NanoVNASaver/Windows/About.py
+++ b/NanoVNASaver/Windows/About.py
@@ -53,7 +53,7 @@ class AboutWindow(QtWidgets.QWidget):
f"NanoVNASaver version {self.app.version}"))
layout.addWidget(QtWidgets.QLabel(""))
layout.addWidget(QtWidgets.QLabel(
- "\N{COPYRIGHT SIGN} Copyright 2019, 2020 Rune B. Broberg"
+ "\N{COPYRIGHT SIGN} Copyright 2019, 2020 Rune B. Broberg\n"
"\N{COPYRIGHT SIGN} Copyright 2020 NanoVNA-Saver Authors"
))
layout.addWidget(QtWidgets.QLabel(
@@ -115,12 +115,12 @@ class AboutWindow(QtWidgets.QWidget):
self.updateLabels()
def updateLabels(self):
- if self.app.vna.isValid():
- logger.debug("Valid VNA")
- v: Version = self.app.vna.version
+ try:
self.versionLabel.setText(
- f"NanoVNA Firmware Version: {self.app.vna.name}"
- f"{v.version_string}")
+ f"NanoVNA Firmware Version: {self.app.vna.name} "
+ f"v{self.app.vna.version}")
+ except (IOError, AttributeError):
+ pass
def updateSettings(self):
if self.updateCheckBox.isChecked():
@@ -150,7 +150,7 @@ class AboutWindow(QtWidgets.QWidget):
self.app.settings.setValue("CheckForUpdates", "Ask")
def findUpdates(self, automatic=False):
- latest_version = Version("")
+ latest_version = Version()
latest_url = ""
try:
req = request.Request(VERSION_URL)
@@ -174,7 +174,7 @@ class AboutWindow(QtWidgets.QWidget):
self.updateLabel.setText("Connection error.")
return
- logger.info("Latest version is %s", latest_version.version_string)
+ logger.info("Latest version is %s", latest_version)
this_version = Version(self.app.version)
logger.info("This is %s", this_version)
if latest_version > this_version:
@@ -183,9 +183,9 @@ class AboutWindow(QtWidgets.QWidget):
QtWidgets.QMessageBox.information(
self,
"Updates available",
- "There is a new update for NanoVNA-Saver available!\n" +
- "Version " + latest_version.version_string + "\n\n" +
- "Press \"About\" to find the update.")
+ f"There is a new update for NanoVNA-Saver available!\n"
+ f"Version {latest_version}\n\n"
+ f'Press "About" to find the update.')
else:
QtWidgets.QMessageBox.information(
self, "Updates available",
diff --git a/NanoVNASaver/Windows/CalibrationSettings.py b/NanoVNASaver/Windows/CalibrationSettings.py
index 15bb3cb..5ef5c2d 100644
--- a/NanoVNASaver/Windows/CalibrationSettings.py
+++ b/NanoVNASaver/Windows/CalibrationSettings.py
@@ -633,7 +633,7 @@ class CalibrationWindow(QtWidgets.QWidget):
self.btn_automatic.setDisabled(False)
return
logger.info("Starting automatic calibration assistant.")
- if not self.app.serial.is_open:
+ if not self.app.vna.connected():
QtWidgets.QMessageBox(
QtWidgets.QMessageBox.Information,
"NanoVNA not connected",
diff --git a/NanoVNASaver/Windows/DeviceSettings.py b/NanoVNASaver/Windows/DeviceSettings.py
index e2fb7e8..9981b4d 100644
--- a/NanoVNASaver/Windows/DeviceSettings.py
+++ b/NanoVNASaver/Windows/DeviceSettings.py
@@ -35,6 +35,12 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
QtWidgets.QShortcut(QtCore.Qt.Key_Escape, self, self.hide)
+ self.label = {
+ "status": QtWidgets.QLabel("Not connected."),
+ "firmware": QtWidgets.QLabel("Not connected."),
+ "calibration": QtWidgets.QLabel("Not connected."),
+ }
+
top_layout = QtWidgets.QHBoxLayout()
left_layout = QtWidgets.QVBoxLayout()
right_layout = QtWidgets.QVBoxLayout()
@@ -44,13 +50,13 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
status_box = QtWidgets.QGroupBox("Status")
status_layout = QtWidgets.QFormLayout(status_box)
- self.statusLabel = QtWidgets.QLabel("Not connected.")
- status_layout.addRow("Status:", self.statusLabel)
- self.calibrationStatusLabel = QtWidgets.QLabel("Not connected.")
- status_layout.addRow("Calibration:", self.calibrationStatusLabel)
+ status_layout.addRow("Status:", self.label["status"])
+ status_layout.addRow("Firmware:", self.label["firmware"])
+ status_layout.addRow("Calibration:", self.label["calibration"])
status_layout.addRow(QtWidgets.QLabel("Features:"))
+
self.featureList = QtWidgets.QListWidget()
status_layout.addRow(self.featureList)
@@ -93,33 +99,36 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
self.updateFields()
def updateFields(self):
- if self.app.vna.isValid():
- self.statusLabel.setText("Connected to " + self.app.vna.name + ".")
- if self.app.worker.running:
- self.calibrationStatusLabel.setText("(Sweep running)")
- else:
- self.calibrationStatusLabel.setText(self.app.vna.getCalibration())
-
+ if not self.app.vna.connected():
+ self.label["status"].setText("Not connected.")
+ self.label["firmware"].setText("Not connected.")
+ self.label["calibration"].setText("Not connected.")
self.featureList.clear()
- self.featureList.addItem(self.app.vna.name + " v" + str(self.app.vna.version))
- features = self.app.vna.getFeatures()
- for item in features:
- self.featureList.addItem(item)
-
- self.btnCaptureScreenshot.setDisabled("Screenshots" not in features)
- if "Customizable data points" in features:
- self.datapoints.clear()
- cur_dps = self.app.vna.datapoints
- dplist = self.app.vna._datapoints[:]
- for d in sorted(dplist):
- self.datapoints.addItem(str(d))
- self._set_datapoint_index(cur_dps)
- else:
- self.statusLabel.setText("Not connected.")
- self.calibrationStatusLabel.setText("Not connected.")
- self.featureList.clear()
- self.featureList.addItem("Not connected.")
self.btnCaptureScreenshot.setDisabled(True)
+ return
+
+ self.label["status"].setText(
+ f"Connected to {self.app.vna.name}.")
+ self.label["firmware"].setText(
+ f"{self.app.vna.name} v{self.app.vna.version}")
+ if self.app.worker.running:
+ self.label["calibration"].setText("(Sweep running)")
+ else:
+ self.label["calibration"].setText(self.app.vna.getCalibration())
+ self.featureList.clear()
+ features = self.app.vna.getFeatures()
+ for item in features:
+ self.featureList.addItem(item)
+
+ self.btnCaptureScreenshot.setDisabled("Screenshots" not in features)
+
+ if "Customizable data points" in features:
+ self.datapoints.clear()
+ cur_dps = self.app.vna.datapoints
+ dplist = self.app.vna._datapoints[:]
+ for d in sorted(dplist):
+ self.datapoints.addItem(str(d))
+ self._set_datapoint_index(cur_dps)
def updateValidation(self, validate_data: bool):
self.app.vna.validateInput = validate_data